Commit f24df125 f24df125b4a5f7e29eee39e89127871a38856281 by Sergey Poznyakoff

Fixes in stream subsystem. Rewrite and optimize maidag lmtp mode using streams.

* libmailutils/file_stream.c (mu_fd_stream_create): Mark stream as
open, do not call mu_stream_open explicitly.
* libmailutils/message_stream.c (mu_stream_to_message): Bugfixes,
wrong owner given to mu_envelope_set_ calls.
* libmailutils/stream.c (_MU_STR_FLUSH_ALL)
(_MU_STR_FLUSH_KEEP): New macros for _stream_flush_buffer.
(_stream_flush_buffer): Change the meaning of the last argument.
All callers updated.
(mu_stream_seek): Fix operation with MU_SEEK_END.
Call _mu_stream_cleareof on success.
(mu_stream_read): Call _stream_flush_buffer in buffered mode.
(mu_stream_getdelim, mu_stream_readdelim): Call _stream_flush_buffer.
* libmailutils/streamcpy.c (mu_stream_copy): Reset size if mu_stream_seek
fails.
* libmailutils/temp_file_stream.c (mu_temp_file_stream_create): Set
full buffering mode by default.

* maidag/mailtmp.c: Remove.
* maidag/Makefile.am (maidag_SOURCES): Remove mailtmp.c
* po/POTFILES.in: Likewise.
* maidag/deliver.c (make_tmp): Rewrite.  Return mu_mailbox_t.
All callers changed.
* maidag/lmtp.c (lmtp_transcript): Remove static.
(lmpt_transcript): New function.
(lmtp_reply): Use mu_stream_t instead of FILE.
(xlatnl): Remove.  Superseded by mu_rtrim_cset and family.
(mtmp, mbox): Remove globals.
(mesg): New global.
(cfun_unknown, cfun_mail_from, cfun_rcpt_to)
(dot_temp_fail, dot_deliver, cfun_rset)
(cfun_lhlo, cfun_quit, cfun_help): Use mu_stream_t instead of FILE.
(cfun_data): Rewrite.
(cfun_dot): Remove.
(to_fgets): Rewrite using mu_stream_t.
(lmtp_loop): Change signature. Rewrite using mu_stream_t.
(lmtp_connection, maidag_lmtp_server): Update accordingly.
* maidag/maidag.c (maidag_transcript): New global.
(options, parse_opt): New option --transcript.
* maidag/maidag.h (maidag_transcript): New extern.
(mail_tmp_begin, mail_tmp_add_line, mail_tmp_finish)
(mail_tmp_destroy): Remove.
1 parent 6b7badca
......@@ -390,16 +390,12 @@ mu_fd_stream_create (mu_stream_t *pstream, char *filename, int fd, int flags)
struct _mu_file_stream *fstr;
int rc = _mu_file_stream_create (&fstr,
sizeof (struct _mu_file_stream),
filename, fd, flags);
filename, fd, flags|_MU_STR_OPEN);
if (rc == 0)
{
mu_stream_t stream = (mu_stream_t) fstr;
mu_stream_set_buffer (stream, mu_buffer_full, 0);
rc = mu_stream_open (stream);
if (rc)
mu_stream_unref (stream);
else
*pstream = stream;
*pstream = stream;
}
return rc;
}
......
......@@ -414,8 +414,8 @@ mu_stream_to_message (mu_stream_t instream, mu_message_t *pmsg)
return rc;
}
mu_envelope_set_date (env, _env_msg_date, msg);
mu_envelope_set_sender (env, _env_msg_sender, msg);
mu_envelope_set_date (env, _env_msg_date, draftstream);
mu_envelope_set_sender (env, _env_msg_sender, draftstream);
mu_message_set_envelope (msg, env, draftstream);
mu_body_create (&body, msg);
......
......@@ -34,6 +34,9 @@
size_t mu_stream_default_buffer_size = MU_STREAM_DEFBUFSIZ;
#define _MU_STR_FLUSH_ALL 0x01
#define _MU_STR_FLUSH_KEEP 0x02
#define _stream_event(stream, code, n, p) \
do \
{ \
......@@ -167,12 +170,12 @@ _stream_buffer_full_p (struct _mu_stream *stream)
}
static int
_stream_flush_buffer (struct _mu_stream *stream, int all)
_stream_flush_buffer (struct _mu_stream *stream, int flags)
{
int rc;
char *start, *end;
size_t wrsize;
if (stream->flags & _MU_STR_DIRTY)
{
if ((stream->flags & MU_STREAM_SEEK) && stream->seek)
......@@ -215,7 +218,8 @@ _stream_flush_buffer (struct _mu_stream *stream, int all)
if (wrsize == 0)
break;
}
if ((all && wrsize) || wrsize == stream->level)
if (((flags & _MU_STR_FLUSH_ALL) && wrsize) ||
wrsize == stream->level)
{
rc = _stream_write_unbuffered (stream,
stream->buffer,
......@@ -234,10 +238,11 @@ _stream_flush_buffer (struct _mu_stream *stream, int all)
stream->level = stream->pos = wrsize;
return 0;
}
_stream_clrflag (stream, _MU_STR_DIRTY);
}
_stream_clrflag (stream, _MU_STR_DIRTY);
stream->pos = stream->level = 0;
if (!(flags & _MU_STR_FLUSH_KEEP))
stream->pos = stream->level = 0;
return 0;
}
......@@ -400,8 +405,8 @@ mu_stream_seek (mu_stream_t stream, mu_off_t offset, int whence,
case MU_SEEK_END:
rc = mu_stream_size (stream, &size);
if (rc)
return mu_stream_seterr (stream, rc, 1);
offset += size + stream->pos;
return rc;
offset += size;
break;
default:
......@@ -414,7 +419,7 @@ mu_stream_seek (mu_stream_t stream, mu_off_t offset, int whence,
|| offset < stream->offset
|| offset > stream->offset + stream->level))
{
if ((rc = _stream_flush_buffer (stream, 1)))
if ((rc = _stream_flush_buffer (stream, _MU_STR_FLUSH_ALL)))
return rc;
rc = stream->seek (stream, offset, &stream->offset);
if (rc == ESPIPE)
......@@ -425,7 +430,9 @@ mu_stream_seek (mu_stream_t stream, mu_off_t offset, int whence,
}
else if (stream->buftype != mu_buffer_none)
stream->pos = offset - stream->offset;
_mu_stream_cleareof (stream);
if (pres)
*pres = stream->offset + stream->pos;
return 0;
......@@ -469,7 +476,7 @@ _stream_skip_input_bytes (mu_stream_t stream, mu_off_t count, mu_off_t *pres)
{
for (pos = 0;;)
{
if ((rc = _stream_flush_buffer (stream, 1)))
if ((rc = _stream_flush_buffer (stream, _MU_STR_FLUSH_ALL)))
return rc;
if (stream->pos == stream->level)
{
......@@ -693,10 +700,15 @@ mu_stream_read (mu_stream_t stream, void *buf, size_t size, size_t *pread)
{
char *bufp = buf;
size_t nbytes = 0;
int rc;
if ((rc = _stream_flush_buffer (stream,
_MU_STR_FLUSH_ALL|_MU_STR_FLUSH_KEEP)))
return rc;
while (size)
{
size_t n;
int rc;
if (stream->pos == stream->level)
{
......@@ -824,7 +836,12 @@ mu_stream_readdelim (mu_stream_t stream, char *buf, size_t size,
rc = _stream_readdelim (stream, buf, size, delim, pread);
}
else
rc = _stream_scandelim (stream, buf, size, delim, pread);
{
if ((rc = _stream_flush_buffer (stream,
_MU_STR_FLUSH_ALL|_MU_STR_FLUSH_KEEP)))
return rc;
rc = _stream_scandelim (stream, buf, size, delim, pread);
}
return rc;
}
......@@ -850,6 +867,10 @@ mu_stream_getdelim (mu_stream_t stream, char **pbuf, size_t *psize,
_stream_init (stream);
}
if ((rc = _stream_flush_buffer (stream,
_MU_STR_FLUSH_ALL|_MU_STR_FLUSH_KEEP)))
return rc;
if (lineptr == NULL || n == 0)
{
char *new_lineptr;
......@@ -997,7 +1018,7 @@ mu_stream_flush (mu_stream_t stream)
return MU_ERR_NOT_OPEN;
_stream_init (stream);
}
rc = _stream_flush_buffer (stream, 1);
rc = _stream_flush_buffer (stream, _MU_STR_FLUSH_ALL);
if (rc)
return rc;
if ((stream->flags & _MU_STR_WRT) && stream->flush)
......@@ -1031,7 +1052,8 @@ int
mu_stream_size (mu_stream_t stream, mu_off_t *psize)
{
int rc;
mu_off_t size;
if (!(stream->flags & _MU_STR_OPEN))
{
if (stream->open)
......@@ -1040,7 +1062,13 @@ mu_stream_size (mu_stream_t stream, mu_off_t *psize)
}
if (!stream->size)
return mu_stream_seterr (stream, ENOSYS, 0);
rc = stream->size (stream, psize);
rc = stream->size (stream, &size);
if (rc == 0)
{
if (stream->buftype != mu_buffer_none && stream->offset == size)
size += stream->level;
*psize = size;
}
return mu_stream_seterr (stream, rc, rc != 0);
}
......@@ -1118,7 +1146,7 @@ mu_stream_truncate (mu_stream_t stream, mu_off_t size)
{
int rc;
if ((rc = _stream_flush_buffer (stream, 1)))
if ((rc = _stream_flush_buffer (stream, _MU_STR_FLUSH_ALL)))
return rc;
return stream->truncate (stream, size);
}
......
......@@ -72,6 +72,7 @@ mu_stream_copy (mu_stream_t dst, mu_stream_t src, mu_off_t size,
case EACCES:
mu_stream_clearerr (src);
case ENOSYS:
size = 0;
break;
default:
......
......@@ -66,7 +66,10 @@ mu_temp_file_stream_create (mu_stream_t *pstream, const char *dir)
if (rc)
mu_stream_unref (stream);
else
*pstream = stream;
{
mu_stream_set_buffer (stream, mu_buffer_full, 0);
*pstream = stream;
}
}
return 0;
}
......
......@@ -26,7 +26,6 @@ maidag_SOURCES=\
lmtp.c\
maidag.c\
maidag.h\
mailtmp.c\
mailquota.c\
python.c\
sieve.c\
......
......@@ -19,26 +19,110 @@
#include "maidag.h"
void
make_tmp (const char *from, mu_mailbox_t *mbox)
static mu_mailbox_t
make_tmp (const char *from)
{
struct mail_tmp *mtmp;
char *buf = NULL;
size_t n = 0;
int rc;
mu_stream_t in, out;
char *buf = NULL;
size_t size = 0, n;
mu_mailbox_t mbox;
rc = mu_stdio_stream_create (&in, MU_STDIN_FD, MU_STREAM_READ);
if (rc)
{
mu_diag_funcall (MU_DIAG_ERROR, "mu_stdio_stream_create",
"MU_STDIN_FD", rc);
exit (EX_TEMPFAIL);
}
rc = mu_temp_file_stream_create (&out, NULL);
if (rc)
{
maidag_error (_("unable to open temporary file: %s"), mu_strerror (rc));
exit (EX_TEMPFAIL);
}
if (mail_tmp_begin (&mtmp, from))
exit (EX_TEMPFAIL);
rc = mu_stream_getline (in, &buf, &size, &n);
if (rc)
{
maidag_error (_("read error: %s"), mu_strerror (rc));
mu_stream_destroy (&in);
mu_stream_destroy (&out);
exit (EX_TEMPFAIL);
}
if (n == 0)
{
maidag_error (_("unexpected EOF on input"));
mu_stream_destroy (&in);
mu_stream_destroy (&out);
exit (EX_TEMPFAIL);
}
while (getline (&buf, &n, stdin) > 0)
if ((rc = mail_tmp_add_line (mtmp, buf, strlen (buf))))
break;
if (n >= 5 && memcmp (buf, "From ", 5))
{
struct mu_auth_data *auth = NULL;
if (!from)
{
auth = mu_get_auth_by_uid (getuid ());
if (auth)
from = auth->name;
}
if (from)
{
time_t t;
time (&t);
mu_stream_printf (out, "From %s %s", from, ctime (&t));
}
else
{
maidag_error (_("cannot determine sender address"));
mu_stream_destroy (&in);
mu_stream_destroy (&out);
exit (EX_TEMPFAIL);
}
if (auth)
mu_auth_data_free (auth);
}
mu_stream_write (out, buf, n, NULL);
free (buf);
if (rc == 0)
rc = mail_tmp_finish (mtmp, mbox);
mail_tmp_destroy (&mtmp);
rc = mu_stream_copy (out, in, 0, NULL);
mu_stream_destroy (&in);
if (rc)
exit (EX_TEMPFAIL);
{
maidag_error (_("copy error: %s"), mu_strerror (rc));
mu_stream_destroy (&out);
exit (EX_TEMPFAIL);
}
mu_stream_flush (out);
if ((rc = mu_mailbox_create (&mbox, "mbox:/dev/null"))
|| (rc = mu_mailbox_open (mbox, MU_STREAM_READ))
|| (rc = mu_mailbox_set_stream (mbox, out)))
{
maidag_error (_("error opening temporary file: %s"),
mu_strerror (rc));
mu_stream_destroy (&out);
exit (EX_TEMPFAIL);
}
rc = mu_mailbox_messages_count (mbox, &n);
if (rc)
{
errno = rc;
maidag_error (_("error creating temporary message: %s"),
mu_strerror (rc));
mu_stream_destroy (&out);
exit (EX_TEMPFAIL);
}
/* FIXME: mu_stream_unref (out); But mu_mailbox_set_stream
steals the reference */
return mbox;
}
int
......@@ -65,9 +149,7 @@ mda (mu_mailbox_t mbx, char *username)
int
maidag_stdio_delivery (int argc, char **argv)
{
mu_mailbox_t mbox;
make_tmp (sender_address, &mbox);
mu_mailbox_t mbox = make_tmp (sender_address);
if (multiple_delivery)
multiple_delivery = argc > 1;
......
......@@ -53,6 +53,7 @@ int lmtp_mode;
int url_option;
char *lmtp_url_string;
int reuse_lmtp_address = 1;
int maidag_transcript;
const char *program_version = "maidag (" PACKAGE_STRING ")";
static char doc[] =
......@@ -72,6 +73,7 @@ static char args_doc[] = N_("[recipient...]");
#define LMTP_OPTION 258
#define FOREGROUND_OPTION 260
#define URL_OPTION 261
#define TRANSCRIPT_OPTION 262
static struct argp_option options[] =
{
......@@ -79,21 +81,23 @@ static struct argp_option options[] =
{ NULL, 0, NULL, 0,
N_("General options"), GRID },
{ "foreground", FOREGROUND_OPTION, 0, 0, N_("remain in foreground"),
GRID + 1 },
{ "inetd", 'i', 0, 0, N_("run in inetd mode"), GRID + 1 },
{ "daemon", 'd', N_("NUMBER"), OPTION_ARG_OPTIONAL,
N_("runs in daemon mode with a maximum of NUMBER children"), GRID + 1 },
{ "url", URL_OPTION, 0, 0, N_("deliver to given URLs"), GRID + 1 },
{ "from", 'f', N_("EMAIL"), 0,
N_("specify the sender's name"), GRID + 1 },
{ NULL, 'r', NULL, OPTION_ALIAS, NULL },
{ "lmtp", LMTP_OPTION, N_("URL"), OPTION_ARG_OPTIONAL,
N_("operate in LMTP mode"), GRID + 1 },
{ "debug", 'x', N_("FLAGS"), 0,
N_("enable debugging"), GRID + 1 },
{ "stderr", STDERR_OPTION, NULL, 0,
N_("log to standard error"), GRID + 1 },
{ "foreground", FOREGROUND_OPTION, 0, 0, N_("remain in foreground"),
GRID + 1 },
{ "inetd", 'i', 0, 0, N_("run in inetd mode"), GRID + 1 },
{ "daemon", 'd', N_("NUMBER"), OPTION_ARG_OPTIONAL,
N_("runs in daemon mode with a maximum of NUMBER children"), GRID + 1 },
{ "url", URL_OPTION, 0, 0, N_("deliver to given URLs"), GRID + 1 },
{ "from", 'f', N_("EMAIL"), 0,
N_("specify the sender's name"), GRID + 1 },
{ NULL, 'r', NULL, OPTION_ALIAS, NULL },
{ "lmtp", LMTP_OPTION, N_("URL"), OPTION_ARG_OPTIONAL,
N_("operate in LMTP mode"), GRID + 1 },
{ "debug", 'x', N_("FLAGS"), 0,
N_("enable debugging"), GRID + 1 },
{ "stderr", STDERR_OPTION, NULL, 0,
N_("log to standard error"), GRID + 1 },
{ "transcript", TRANSCRIPT_OPTION, NULL, 0,
N_("enable session transcript"), GRID + 1 },
#undef GRID
#define GRID 2
......@@ -212,6 +216,10 @@ parse_opt (int key, char *arg, struct argp_state *state)
mu_argp_node_list_new (lst, "listen", arg);
break;
case TRANSCRIPT_OPTION:
maidag_transcript = 1;
break;
case 'r':
case 'f':
if (sender_address != NULL)
......
......@@ -142,6 +142,7 @@ extern char *lmtp_url_string;
extern int reuse_lmtp_address;
extern mu_list_t lmtp_groups;
extern mu_acl_t maidag_acl;
extern int maidag_transcript;
void close_fds (void);
int switch_user_id (struct mu_auth_data *auth, int user);
......@@ -160,12 +161,6 @@ int deliver (mu_message_t msg, char *name, char **errp);
int sieve_test (struct mu_auth_data *auth, mu_message_t msg);
int check_quota (struct mu_auth_data *auth, mu_off_t size, mu_off_t *rest);
struct mail_tmp;
int mail_tmp_begin (struct mail_tmp **pmtmp, const char *from);
int mail_tmp_add_line (struct mail_tmp *mtmp, char *buf, size_t buflen);
int mail_tmp_finish (struct mail_tmp *mtmp, mu_mailbox_t *mbox);
void mail_tmp_destroy (struct mail_tmp **pmtmp);
enum maidag_forward_result
{
maidag_forward_none,
......
/* GNU Mailutils -- a suite of utilities for electronic mail
Copyright (C) 1999, 2000, 2001, 2002, 2005, 2007, 2009, 2010 Free
Software Foundation, Inc.
GNU Mailutils is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 3, or (at your option)
any later version.
GNU Mailutils is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with GNU Mailutils; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301 USA */
#include "maidag.h"
struct mail_tmp
{
mu_stream_t stream;
size_t line;
char *tempfile;
const char *from;
int had_nl;
};
int
mail_tmp_begin (struct mail_tmp **pmtmp, const char *from)
{
int status;
struct mail_tmp *mtmp = malloc (sizeof *mtmp);
if (!mtmp)
return ENOMEM;
memset (mtmp, 0, sizeof *mtmp);
mtmp->tempfile = mu_tempname (NULL);
if ((status = mu_file_stream_create (&mtmp->stream, mtmp->tempfile,
MU_STREAM_RDWR)))
{
free (mtmp);
maidag_error (_("unable to open temporary file: %s"),
mu_strerror (status));
return status;
}
mtmp->from = from;
*pmtmp = mtmp;
return 0;
}
int
mail_tmp_add_line (struct mail_tmp *mtmp, char *buf, size_t buflen)
{
int status = 0;
mtmp->line++;
if (mtmp->line == 1)
{
const char *from = mtmp->from;
if (buflen >= 5 && memcmp (buf, "From ", 5))
{
struct mu_auth_data *auth = NULL;
if (!from)
{
auth = mu_get_auth_by_uid (getuid ());
if (auth)
from = auth->name;
}
if (from)
{
time_t t;
char *envs;
time (&t);
asprintf (&envs, "From %s %s", from, ctime (&t));
status = mu_stream_write (mtmp->stream,
envs,
strlen (envs), NULL);
free (envs);
}
else
{
maidag_error (_("cannot determine sender address"));
return EINVAL;
}
if (auth)
mu_auth_data_free (auth);
}
}
else if (buflen >= 5 && !memcmp (buf, "From ", 5))
{
static char *escape = ">";
status = mu_stream_write (mtmp->stream, escape, 1, NULL);
}
if (!status)
status = mu_stream_write (mtmp->stream, buf, buflen, NULL);
if (status)
{
maidag_error (_("error writing temporary file: %s"),
mu_strerror (status));
mu_stream_destroy (&mtmp->stream);
}
mtmp->had_nl = buf[buflen-1] == '\n';
return status;
}
int
mail_tmp_finish (struct mail_tmp *mtmp, mu_mailbox_t *mbox)
{
int status;
static char *newline = "\n";
size_t n;
if (!mtmp->had_nl)
status = mu_stream_write (mtmp->stream, newline, 1, NULL);
status = mu_stream_write (mtmp->stream, newline, 1, NULL);
unlink (mtmp->tempfile);
free (mtmp->tempfile);
mtmp->tempfile = NULL;
if (status)
{
errno = status;
maidag_error (_("error writing temporary file: %s"),
mu_strerror (status));
mu_stream_destroy (&mtmp->stream);
return status;
}
mu_stream_flush (mtmp->stream);
if ((status = mu_mailbox_create (mbox, "mbox:/dev/null"))
|| (status = mu_mailbox_open (*mbox, MU_STREAM_READ))
|| (status = mu_mailbox_set_stream (*mbox, mtmp->stream)))
{
maidag_error (_("error opening temporary file: %s"),
mu_strerror (status));
mu_stream_destroy (&mtmp->stream);
return status;
}
status = mu_mailbox_messages_count (*mbox, &n);
if (status)
{
errno = status;
maidag_error (_("error creating temporary message: %s"),
mu_strerror (status));
mu_stream_destroy (&mtmp->stream);
return status;
}
mtmp->stream = NULL;
mtmp->line = 0;
return status;
}
void
mail_tmp_destroy (struct mail_tmp **pmtmp)
{
struct mail_tmp *mtmp = *pmtmp;
if (mtmp)
{
if (mtmp->tempfile)
{
unlink (mtmp->tempfile);
free (mtmp->tempfile);
}
mu_stream_destroy (&mtmp->stream);
free (*pmtmp);
*pmtmp = NULL;
}
}
......@@ -90,7 +90,6 @@ maidag/deliver.c
maidag/lmtp.c
maidag/maidag.c
maidag/mailquota.c
maidag/mailtmp.c
maidag/script.c
mail/alias.c
......