Commit baea3184 baea3184ad1cf5c6124ac1697a52d782ed7bebab by Sergey Poznyakoff

Redo the support for transport-based streams.

If a stream takes another stream(s) as a transport, it
always increases its reference count.  This means that
when that stream is destroyed the underlying stream is
not destroyed by default (its refcount is decreased, that's
all). To force destruction of the underlying stream,
the caller must explicitly unreference it after creating
the stream that uses it (or give its creation function
the MU_STREAM_AUTOCLOSE flag, if it supports flags).

Similarly, if a stream uses a non-stream object (e.g. a file
descriptor) as the transport, it will not destroy it, unless
it has been created with the MU_STREAM_AUTOCLOSE flag. This
differs from the previous behavior.

The MU_STREAM_NO_CHECK and MU_STREAM_NO_CLOSE flags are removed.

* examples/base64.c (main): Call mu_filter_create with the
MU_STREAM_AUTOCLOSE flag.
* examples/mimetest.c (message_display_parts): Likewise.
* examples/murun.c (main): Unref the input stream after
passing it to mu_filter_prog_stream_create.
* imap4d/fetch.c (fetch_io): Update arguments to mu_filter_create
and mu_memory_stream_create.
* imap4d/preauth.c (decode64_buf)
(do_preauth_ident): Update arguments to mu_memory_stream_create and
mu_tcp_stream_create.
* imap4d/util.c (util_setio): Update arguments to mu_filter_create and
mu_stdio_stream_create.
* include/mailutils/stream.h (MU_STREAM_NO_CLOSE)
(MU_STREAM_NO_CHECK): Remove.
(MU_STREAM_AUTOCLOSE): New flag.
(mu_iostream_create): Remove the `flags' argument.
* libmu_argp/muinit.c (mu_app_init): Update arguments to
mu_stdio_stream_create.
* libmu_auth/ldap.c: Update arguments to
mu_memory_stream_create and mu_filter_create.
* libmu_auth/tls.c (_tls_io_close, _tls_close): Always try to close the
transport stream. Rely on refcount to protect it.
(_tls_io_done, _tls_done): Likewise, always unreference it.
(_mu_tls_io_stream_create): Increase reference counter on the
transport stream, unless MU_STREAM_AUTOCLOSE is requested.
(_mu_tls_stream_create): Rewrite using the new MU_STREAM_AUTOCLOSE
logic.
* libmu_sieve/extensions/spamd.c (spamd_connect_tcp): Update arguments to
mu_tcp_stream_create.
(spamd_connect_tcp): Update arguments to mu_socket_stream_create and
mu_filter_create.
* libmu_sieve/extensions/vacation.c (build_mime): Update arguments to
mu_filter_create.
* mail/decode.c (display_submessage): Update arguments to
mu_filter_create.
* mailbox/attachment.c (mu_message_save_attachment): Update arguments to
mu_filter_create.
* mailbox/cfg_driver.c (mu_cfg_tree_reduce): Update arguments to
mu_stdio_stream_create.
* mailbox/dbgstream.c (_dbg_done): Use MU_STREAM_AUTOCLOSE bit to
decide whether to destroy the debug object.
(mu_dbgstream_create): Use MU_STREAM_AUTOCLOSE instead of
MU_STREAM_NO_CLOSE.
* mailbox/file_stream.c (fd_close): Use MU_STREAM_AUTOCLOSE bit to
decide whether to close the descriptor.
(mu_file_stream_create): Force MU_STREAM_AUTOCLOSE bit.
* mailbox/filter.c (filter_create_rd, filter_create_wr): Change
substream creation logic.
* mailbox/filter_iconv.c (_icvt_close): Always try to close the
transport stream. Rely on refcount to protect it.
(_icvt_done): Ditto for destroying it.
(mu_filter_iconv_create): Increase refcount on the transport stream,
unless MU_STREAM_AUTOCLOSE is requested.
* mailbox/fltstream.c (filter_done): Always dereference the
transport stream. Rely on refcount to protect it.
(filter_close): Ditto for closing it.
(mu_filter_stream_create): Increase refcount on the transport stream,
unless MU_STREAM_AUTOCLOSE is requested.
* mailbox/iostream.c (_iostream_close) : Always try to close the
transport stream. Rely on refcount to protect it.
(_iostream_done): Ditto for closing it.
(mu_iostream_create): Remove the use of MU_STREAM_NO_CLOSE.
* mailbox/mimehdr.c (mu_mimehdr_decode_param): Remove the use of
MU_STREAM_NO_CLOSE.
* mailbox/mutil.c (mu_decode_filter) Change substream creation logic.
* mailbox/prog_stream.c (_prog_open): Use MU_STREAM_AUTOCLOSE bit
in arguments to mu_stdio_stream_create.
(mu_filter_prog_stream_create): Increase refcount on the transport
(input) stream.
* mailbox/rfc2047.c (mu_rfc2047_decode): Dereference in_stream after
passing it to mu_decode_filter.
Pass MU_STREAM_AUTOCLOSE in flags to mu_filter_create.
* mailbox/socket_stream.c (mu_socket_stream_create): Force
MU_STREAM_AUTOCLOSE bit.
* mailbox/streamref.c (_streamref_close): Always close the
transport stream (refcount will protect it, if necessary).
(mu_streamref_create_abridged): Mask out the MU_STREAM_AUTOCLOSE bit.
* mailbox/temp_file_stream.c (mu_temp_file_stream_create: Force
MU_STREAM_AUTOCLOSE bit.
* mailbox/xscript-stream.c (_xscript_close): Always close the
transport stream (refcount will protect it, if necessary).
(mu_xscript_stream_create): Increase refcounts on both underlying streams,
unless MU_STREAM_AUTOCLOSE is set.
* pop3d/extra.c (pop3d_setio): Remove uses of MU_STREAM_NO_CLOSE.

* examples/nntpclient.c: Remove uses of MU_STREAM_NO_CHECK/MU_STREAM_NO_CLOSE.
* examples/pop3client.c: Likewise.
* libmu_auth/gsasl.c: Likewise.
* libproto/nntp/nntp_stream.c: Likewise.
* libproto/pop/pop3_stream.c: Likewise.
* mailbox/tcp.c: Likewise.
* mailbox/vartab.c: Likewise.
* mh/mh_list.c: Likewise.
* mimeview/mimeview.c: Likewise.
* mh/mhn.c: Likewise.
(edit_mime): Use MU_STREAM_AUTOCLOSE.

Bugfixes:

* mailbox/fltstream.c (init_iobuf): Fix input initialization.
(filter_write_internal): Bugfix.
* mailbox/stream.c (_stream_buffer_freespace): New macro.
(_stream_buffer_is_full): New macro.
(BUFFER_FULL_P): Remove, use _stream_buffer_is_full instead. All callers
updated.
(_stream_flush_buffer): Operation for full buffered streams does not
depend on the `all' flag.
(mu_stream_write): Fix calculation of the bytes available in the
buffer.
1 parent 814b9791
......@@ -166,7 +166,8 @@ main (int argc, char * argv [])
if (flags == MU_STREAM_READ)
{
MU_ASSERT (mu_filter_create (&flt, in, encoding, mode,
MU_STREAM_READ|MU_STREAM_SEEK));
MU_STREAM_READ|MU_STREAM_SEEK|
MU_STREAM_AUTOCLOSE));
if (shift)
MU_ASSERT (mu_stream_seek (flt, shift, MU_SEEK_SET, NULL));
c_copy (out, flt);
......@@ -174,7 +175,7 @@ main (int argc, char * argv [])
else
{
MU_ASSERT (mu_filter_create (&flt, out, encoding, mode,
MU_STREAM_WRITE));
MU_STREAM_WRITE|MU_STREAM_AUTOCLOSE));
if (shift)
MU_ASSERT (mu_stream_seek (in, shift, MU_SEEK_SET, NULL));
c_copy (flt, in);
......
......@@ -253,7 +253,7 @@ message_display_parts (mu_message_t msg, int indent)
/* Make sure the original body stream is not closed when
str gets destroyed */
mu_filter_create (&str, str, encoding, MU_FILTER_DECODE,
MU_STREAM_READ | MU_STREAM_NO_CLOSE);
MU_STREAM_READ);
while (mu_stream_readline (str, buf, sizeof (buf), &nbytes) == 0
&& nbytes)
......
......@@ -71,6 +71,8 @@ main (int argc, char *argv[])
MU_ASSERT (mu_stdio_stream_create (&in, MU_STDIN_FD, 0));
MU_ASSERT (mu_stream_open (in));
rc = mu_filter_prog_stream_create (&stream, cmdline, in);
/* Make sure closing/destroying stream will close/destroy in */
mu_stream_unref (in);
}
else
rc = mu_prog_stream_create (&stream, cmdline, flags);
......
......@@ -1004,8 +1004,7 @@ com_connect (char *arg)
if (verbose)
com_verbose ("on");
status =
mu_tcp_stream_create (&tcp, host, port,
MU_STREAM_READ | MU_STREAM_NO_CHECK);
mu_tcp_stream_create (&tcp, host, port, MU_STREAM_READ);
if (status == 0)
{
mu_nntp_set_carrier (nntp, tcp);
......
......@@ -800,8 +800,7 @@ com_connect (char *arg)
if (verbose)
com_verbose ("on");
status =
mu_tcp_stream_create (&tcp, argv[0], n,
MU_STREAM_READ | MU_STREAM_NO_CHECK);
mu_tcp_stream_create (&tcp, argv[0], n, MU_STREAM_READ);
if (status == 0)
{
mu_pop3_set_carrier (pop3, tcp);
......
......@@ -694,7 +694,7 @@ fetch_io (mu_stream_t stream, size_t start, size_t size, size_t max)
size_t n = 0;
mu_filter_create (&rfc, stream, "rfc822", MU_FILTER_ENCODE,
MU_STREAM_READ|MU_STREAM_SEEK|MU_STREAM_NO_CLOSE);
MU_STREAM_READ|MU_STREAM_SEEK);
if (start == 0 && size == (size_t) -1)
{
......@@ -1038,7 +1038,7 @@ _frt_header_fields (struct fetch_function_closure *ffc,
return RESP_OK;
}
status = mu_memory_stream_create (&stream, MU_STREAM_NO_CHECK);
status = mu_memory_stream_create (&stream, 0);
if (status != 0)
imap4d_bye (ERR_NO_MEM);
......
......@@ -210,9 +210,9 @@ decode64_buf (const char *name, unsigned char **pbuf, size_t *psize)
name++;
namelen = strlen (name) - 1;
mu_memory_stream_create (&str, MU_STREAM_NO_CHECK);
mu_memory_stream_create (&str, 0);
mu_filter_create (&flt, str, "base64", MU_FILTER_DECODE,
MU_STREAM_READ | MU_STREAM_NO_CHECK);
MU_STREAM_READ | MU_STREAM_AUTOCLOSE);
mu_stream_open (str);
mu_stream_write (str, name, namelen, NULL);
mu_stream_read (flt, buf, sizeof buf, &size);
......@@ -349,8 +349,7 @@ do_preauth_ident (struct sockaddr *clt_sa, struct sockaddr *srv_sa)
memcpy (hostaddr, p, 15);
hostaddr[15] = 0;
rc = mu_tcp_stream_create (&stream, hostaddr, ident_port,
MU_STREAM_RDWR | MU_STREAM_NO_CHECK);
rc = mu_tcp_stream_create (&stream, hostaddr, ident_port, MU_STREAM_RDWR);
if (rc)
{
mu_diag_output (MU_DIAG_INFO, _("cannot create TCP stream: %s"),
......
......@@ -813,17 +813,18 @@ util_setio (FILE *in, FILE *out)
if (!out)
imap4d_bye (ERR_NO_OFILE);
if (mu_stdio_stream_create (&tmp, fileno (in), MU_STREAM_NO_CLOSE))
if (mu_stdio_stream_create (&tmp, fileno (in), 0))
imap4d_bye (ERR_NO_IFILE);
mu_stream_set_buffer (tmp, mu_buffer_line, 1024);
mu_filter_create (&istream, tmp, "rfc822", MU_FILTER_DECODE, MU_STREAM_READ);
mu_filter_create (&istream, tmp, "rfc822", MU_FILTER_DECODE,
MU_STREAM_READ | MU_STREAM_AUTOCLOSE);
mu_stream_set_buffer (istream, mu_buffer_line, 1024);
if (mu_stdio_stream_create (&tmp, fileno (out), MU_STREAM_NO_CLOSE))
if (mu_stdio_stream_create (&tmp, fileno (out), 0))
imap4d_bye (ERR_NO_OFILE);
mu_stream_set_buffer (tmp, mu_buffer_line, 1024);
mu_filter_create (&ostream, tmp, "rfc822", MU_FILTER_ENCODE,
MU_STREAM_WRITE);
MU_STREAM_WRITE | MU_STREAM_AUTOCLOSE);
mu_stream_set_buffer (ostream, mu_buffer_line, 1024);
}
......
......@@ -41,7 +41,7 @@ enum mu_buffer_type
#define MU_STREAM_APPEND 0x00000008
#define MU_STREAM_CREAT 0x00000010
#define MU_STREAM_NONBLOCK 0x00000020
#define MU_STREAM_NO_CLOSE 0x00000040
#define MU_STREAM_AUTOCLOSE 0x00000040
#define MU_STREAM_NONLOCK 0x00000080
#define MU_STREAM_ALLOW_LINKS 0x00000100
/* FIXME: This one affects only mailboxes */
......@@ -152,8 +152,7 @@ int mu_tcp_stream_create (mu_stream_t *stream, const char *host, int port,
int mu_xscript_stream_create(mu_stream_t *pref, mu_stream_t transport,
mu_stream_t logstr,
const char *prefix[]);
int mu_iostream_create (mu_stream_t *pref, mu_stream_t in, mu_stream_t out,
int flags);
int mu_iostream_create (mu_stream_t *pref, mu_stream_t in, mu_stream_t out);
int mu_dbgstream_create(mu_stream_t *pref, mu_debug_t debug,
mu_log_level_t level, int flags);
......
......@@ -132,8 +132,7 @@ mu_app_init (struct argp *myargp, const char **capa,
char *comment;
char *canonical_name = get_canonical_name ();
mu_stream_t stream;
mu_stdio_stream_create (&stream, MU_STDOUT_FD,
MU_STREAM_NO_CHECK|MU_STREAM_NO_CLOSE);
mu_stdio_stream_create (&stream, MU_STDOUT_FD, 0);
mu_stream_open (stream);
asprintf (&comment,
"Configuration file structure for %s utility.",
......
......@@ -256,7 +256,7 @@ mu_gsasl_stream_create (mu_stream_t *stream, mu_stream_t transport,
s->stream = transport;
s->sess_ctx = ctx;
rc = mu_stream_create (stream, flags|MU_STREAM_NO_CHECK, s);
rc = mu_stream_create (stream, flags, s);
if (rc)
{
free (s);
......
......@@ -594,9 +594,9 @@ chk_md5 (const char *db_pass, const char *pass)
mu_md5_process_bytes (pass, strlen (pass), &md5context);
mu_md5_finish_ctx (&md5context, md5digest);
mu_memory_stream_create (&str, MU_STREAM_NO_CHECK);
mu_memory_stream_create (&str, 0);
mu_filter_create (&flt, str, "base64", MU_FILTER_DECODE,
MU_STREAM_READ | MU_STREAM_NO_CHECK);
MU_STREAM_READ | MU_STREAM_AUTOCLOSE);
mu_stream_open (str);
mu_stream_write (str, db_pass, strlen (db_pass), NULL);
......@@ -617,9 +617,9 @@ chk_smd5 (const char *db_pass, const char *pass)
mu_stream_t str = NULL, flt = NULL;
size_t size;
mu_memory_stream_create (&str, MU_STREAM_NO_CHECK);
mu_memory_stream_create (&str, 0);
mu_filter_create (&flt, str, "base64", MU_FILTER_DECODE,
MU_STREAM_READ | MU_STREAM_NO_CHECK);
MU_STREAM_READ | MU_STREAM_AUTOCLOSE);
mu_stream_open (str);
size = strlen (db_pass);
mu_stream_write (str, db_pass, size, NULL);
......@@ -663,9 +663,9 @@ chk_sha (const char *db_pass, const char *pass)
mu_sha1_process_bytes (pass, strlen (pass), &sha1context);
mu_sha1_finish_ctx (&sha1context, sha1digest);
mu_memory_stream_create (&str, MU_STREAM_NO_CHECK);
mu_memory_stream_create (&str, 0);
mu_filter_create (&flt, str, "base64", MU_FILTER_DECODE,
MU_STREAM_READ | MU_STREAM_NO_CHECK);
MU_STREAM_READ | MU_STREAM_AUTOCLOSE);
mu_stream_open (str);
mu_stream_write (str, db_pass, strlen (db_pass), NULL);
......@@ -686,9 +686,9 @@ chk_ssha (const char *db_pass, const char *pass)
mu_stream_t str = NULL, flt = NULL;
size_t size;
mu_memory_stream_create (&str, MU_STREAM_NO_CHECK);
mu_memory_stream_create (&str, 0);
mu_filter_create (&flt, str, "base64", MU_FILTER_DECODE,
MU_STREAM_READ | MU_STREAM_NO_CHECK);
MU_STREAM_READ | MU_STREAM_AUTOCLOSE);
mu_stream_open (str);
size = strlen (db_pass);
mu_stream_write (str, db_pass, size, NULL);
......
......@@ -241,16 +241,13 @@ _tls_io_close (mu_stream_t stream)
{
struct _mu_tls_io_stream *sp = (struct _mu_tls_io_stream *) stream;
if (!(sp->stream.flags & MU_STREAM_NO_CLOSE))
return mu_stream_close (sp->transport);
return 0;
}
static void
_tls_io_done (struct _mu_stream *stream)
{
struct _mu_tls_io_stream *sp = (struct _mu_tls_io_stream *) stream;
if (!(sp->stream.flags & MU_STREAM_NO_CLOSE))
mu_stream_unref (sp->transport);
}
......@@ -364,8 +361,7 @@ _mu_tls_io_stream_create (mu_stream_t *pstream,
struct _mu_tls_io_stream *sp;
sp = (struct _mu_tls_io_stream *)
_mu_stream_create (sizeof (*sp),
flags & (MU_STREAM_RDWR | MU_STREAM_NO_CLOSE));
_mu_stream_create (sizeof (*sp), flags & MU_STREAM_RDWR);
if (!sp)
return ENOMEM;
......@@ -388,6 +384,8 @@ _mu_tls_io_stream_create (mu_stream_t *pstream,
/* FIXME:
sp->stream.error_string = _tls_error_string;*/
if (!(flags & MU_STREAM_AUTOCLOSE))
mu_stream_ref (transport);
sp->transport = transport;
sp->up = master;
*pstream = (mu_stream_t) sp;
......@@ -625,11 +623,8 @@ _tls_close (mu_stream_t stream)
sp->state = state_closed;
}
if (!(sp->stream.flags & MU_STREAM_NO_CLOSE))
{
mu_stream_close (sp->transport[0]);
mu_stream_close (sp->transport[1]);
}
return 0;
}
......@@ -646,11 +641,8 @@ _tls_done (struct _mu_stream *stream)
sp->state = state_destroyed;
}
if (!(sp->stream.flags & MU_STREAM_NO_CLOSE))
{
mu_stream_unref (sp->transport[0]);
mu_stream_unref (sp->transport[1]);
}
mu_stream_destroy (&sp->transport[0]);
mu_stream_destroy (&sp->transport[1]);
}
static int
......@@ -659,12 +651,11 @@ _mu_tls_stream_create (mu_stream_t *pstream,
mu_stream_t strin, mu_stream_t strout, int flags)
{
struct _mu_tls_stream *sp;
int noclose = flags & MU_STREAM_NO_CLOSE;
int autoclose = flags & MU_STREAM_AUTOCLOSE;
int rc;
sp = (struct _mu_tls_stream *)
_mu_stream_create (sizeof (*sp),
MU_STREAM_RDWR | noclose);
_mu_stream_create (sizeof (*sp), MU_STREAM_RDWR);
if (!sp)
return ENOMEM;
......@@ -679,13 +670,10 @@ _mu_tls_stream_create (mu_stream_t *pstream,
/* FIXME:
sp->stream.error_string = _tls_error_string;*/
if (!noclose && strin == strout)
mu_stream_ref (strin);
mu_stream_set_buffer (strin, mu_buffer_none, 0);
mu_stream_set_buffer (strout, mu_buffer_none, 0);
rc = _mu_tls_io_stream_create (&sp->transport[0], strin,
MU_STREAM_READ | noclose, sp);
MU_STREAM_READ | autoclose, sp);
if (rc)
{
free (sp);
......@@ -693,7 +681,7 @@ _mu_tls_stream_create (mu_stream_t *pstream,
}
rc = _mu_tls_io_stream_create (&sp->transport[1], strout,
MU_STREAM_WRITE | noclose, sp);
MU_STREAM_WRITE | autoclose, sp);
if (rc)
{
free (sp);
......
......@@ -46,7 +46,7 @@ static int
spamd_connect_tcp (mu_sieve_machine_t mach, mu_stream_t *stream,
char *host, int port)
{
int rc = mu_tcp_stream_create (stream, host, port, MU_STREAM_NO_CHECK);
int rc = mu_tcp_stream_create (stream, host, port, 0);
if (rc)
{
mu_sieve_error (mach, "mu_tcp_stream_create: %s", mu_strerror (rc));
......@@ -64,7 +64,7 @@ spamd_connect_tcp (mu_sieve_machine_t mach, mu_stream_t *stream,
static int
spamd_connect_socket (mu_sieve_machine_t mach, mu_stream_t *stream, char *path)
{
int rc = mu_socket_stream_create (stream, path, MU_STREAM_NO_CHECK);
int rc = mu_socket_stream_create (stream, path, 0);
if (rc)
{
mu_sieve_error (mach, "mu_socket_stream_create: %s", mu_strerror (rc));
......@@ -110,7 +110,7 @@ spamd_send_message (mu_stream_t stream, mu_message_t msg)
if (rc)
return rc;
rc = mu_filter_create (&flt, mstr, "rfc822", MU_FILTER_ENCODE,
MU_STREAM_READ|MU_STREAM_SEEK|MU_STREAM_NO_CLOSE);
MU_STREAM_READ|MU_STREAM_SEEK);
if (rc)
{
mu_stream_destroy (&mstr);
......
......@@ -88,7 +88,8 @@ build_mime (mu_sieve_machine_t mach, mu_list_t tags, mu_mime_t *pmime,
{
mu_stream_t fstr;
rc = mu_filter_create (&fstr, input, "base64",
MU_FILTER_ENCODE, MU_STREAM_READ);
MU_FILTER_ENCODE,
MU_STREAM_READ | MU_STREAM_AUTOCLOSE);
if (rc == 0)
{
header = "Content-Type: text/plain;charset=" MU_SIEVE_CHARSET "\n"
......
......@@ -131,7 +131,7 @@ mu_nntp_stream_create (mu_nntp_t nntp, mu_stream_t *pstream)
nntp_stream->nntp = nntp;
nntp_stream->done = 0;
status = mu_stream_create (pstream, MU_STREAM_READ | MU_STREAM_NO_CLOSE | MU_STREAM_NO_CHECK, nntp_stream);
status = mu_stream_create (pstream, MU_STREAM_READ, nntp_stream);
if (status != 0)
{
free (nntp_stream);
......
......@@ -131,7 +131,7 @@ mu_pop3_stream_create (mu_pop3_t pop3, mu_stream_t *pstream)
pop3_stream->pop3 = pop3;
pop3_stream->done = 0;
status = mu_stream_create (pstream, MU_STREAM_READ | MU_STREAM_NO_CLOSE | MU_STREAM_NO_CHECK, pop3_stream);
status = mu_stream_create (pstream, MU_STREAM_READ, pop3_stream);
if (status != 0)
{
free (pop3_stream);
......
......@@ -262,7 +262,7 @@ display_submessage (struct mime_descend_closure *closure, void *data)
/* Can we decode. */
if (mu_filter_create (&d_stream, b_stream, closure->encoding,
MU_FILTER_DECODE,
MU_STREAM_READ|MU_STREAM_NO_CLOSE) == 0)
MU_STREAM_READ | MU_STREAM_AUTOCLOSE) == 0)
stream = d_stream;
else
stream = b_stream;
......
......@@ -314,7 +314,7 @@ mu_message_save_attachment (mu_message_t msg, const char *filename,
ret =
mu_filter_create (&info->stream, istream, content_encoding,
MU_FILTER_DECODE,
MU_STREAM_READ | MU_STREAM_NO_CLOSE);
MU_STREAM_READ);
free (content_encoding_mem);
}
}
......
......@@ -589,8 +589,7 @@ mu_cfg_tree_reduce (mu_cfg_tree_t *parse_tree, const char *progname,
if (flags & MU_PARSE_CONFIG_DUMP)
{
mu_stream_t stream;
mu_stdio_stream_create (&stream, MU_STDERR_FD,
MU_STREAM_NO_CHECK|MU_STREAM_NO_CLOSE);
mu_stdio_stream_create (&stream, MU_STDERR_FD, 0);
mu_stream_open (stream);
mu_cfg_format_parse_tree (stream, parse_tree, MU_CFG_FMT_LOCUS);
mu_stream_destroy (&stream);
......
......@@ -58,7 +58,7 @@ static void
_dbg_done (struct _mu_stream *str)
{
struct _mu_dbgstream *sp = (struct _mu_dbgstream *)str;
if (!(str->flags & MU_STREAM_NO_CLOSE))
if (str->flags & MU_STREAM_AUTOCLOSE)
mu_debug_destroy (&sp->debug, NULL);
}
......@@ -70,7 +70,7 @@ mu_dbgstream_create(mu_stream_t *pref, mu_debug_t debug, mu_log_level_t level,
sp = (struct _mu_dbgstream *)
_mu_stream_create (sizeof (*sp), MU_STREAM_WRITE |
(flags & MU_STREAM_NO_CLOSE));
(flags & MU_STREAM_AUTOCLOSE));
if (!sp)
return ENOMEM;
sp->stream.write = _dbg_write;
......
......@@ -62,9 +62,12 @@ static int
fd_close (struct _mu_stream *str)
{
struct _mu_file_stream *fstr = (struct _mu_file_stream *) str;
if (close (fstr->fd))
if (fstr->fd != -1)
{
if ((str->flags & MU_STREAM_AUTOCLOSE) && close (fstr->fd))
return errno;
fstr->fd = -1;
}
return 0;
}
......@@ -234,8 +237,8 @@ _mu_file_stream_create (mu_stream_t *pstream, size_t size,
char *filename, int flags)
{
struct _mu_file_stream *str =
(struct _mu_file_stream *) _mu_stream_create (size,
flags | MU_STREAM_SEEK);
(struct _mu_file_stream *)
_mu_stream_create (size, flags | MU_STREAM_SEEK);
if (!str)
return ENOMEM;
......@@ -267,7 +270,7 @@ mu_file_stream_create (mu_stream_t *pstream, const char *filename, int flags)
return ENOMEM;
rc = _mu_file_stream_create (pstream,
sizeof (struct _mu_file_stream),
fname, flags);
fname, flags | MU_STREAM_AUTOCLOSE);
if (rc)
free (fname);
return rc;
......
......@@ -93,26 +93,27 @@ filter_create_rd (mu_stream_t *pstream, mu_stream_t stream,
int status;
mu_stream_t fltstream;
flags &= ~MU_STREAM_AUTOCLOSE;
status = mu_filter_stream_create (&fltstream, stream,
mode, xcode, xdata,
flags & ~MU_STREAM_NO_CLOSE);
flags);
if (status == 0)
{
if (max_line_length)
{
status = mu_linelen_filter_create (pstream, fltstream,
max_line_length,
flags & ~MU_STREAM_NO_CLOSE);
flags);
mu_stream_unref (fltstream);
if (status)
mu_stream_destroy (&fltstream);
else if (flags & MU_STREAM_NO_CLOSE)
mu_stream_set_flags (*pstream, MU_STREAM_NO_CLOSE);
return status;
}
else
*pstream = fltstream;
if (flags & MU_STREAM_NO_CLOSE)
mu_stream_set_flags (fltstream, MU_STREAM_NO_CLOSE);
if (flags & MU_STREAM_AUTOCLOSE)
mu_stream_unref (stream);
}
return status;
}
......@@ -125,29 +126,31 @@ filter_create_wr (mu_stream_t *pstream, mu_stream_t stream,
int flags)
{
int status;
mu_stream_t fltstream, instream = NULL;
mu_stream_t fltstream, instream = NULL, tmpstr;
flags &= ~MU_STREAM_AUTOCLOSE;
if (max_line_length)
{
status = mu_linelen_filter_create (&instream, stream,
max_line_length,
flags & ~MU_STREAM_NO_CLOSE);
flags);
if (status)
return status;
stream = instream;
tmpstr = instream;
}
else
tmpstr = stream;
status = mu_filter_stream_create (&fltstream, stream,
status = mu_filter_stream_create (&fltstream, tmpstr,
mode, xcode, xdata,
flags);
if (instream)
{
mu_stream_unref (instream);
if (status)
mu_stream_destroy (&instream);
else if (flags & MU_STREAM_NO_CLOSE)
mu_stream_set_flags (fltstream, MU_STREAM_NO_CLOSE);
}
return status;
*pstream = fltstream;
if (flags & MU_STREAM_AUTOCLOSE)
mu_stream_unref (stream);
return status;
}
......
......@@ -90,7 +90,6 @@ _icvt_close (mu_stream_t stream)
struct icvt_stream *s = (struct icvt_stream *)stream;
if (s->state != state_closed)
{
if (!(stream->flags & MU_STREAM_NO_CLOSE))
mu_stream_close (s->transport);
iconv_close (s->cd);
s->cd = (iconv_t) -1;
......@@ -106,7 +105,6 @@ _icvt_done (mu_stream_t stream)
if (s->state != state_closed)
_icvt_close (stream);
if (!(stream->flags & MU_STREAM_NO_CLOSE))
mu_stream_destroy (&s->transport);
free (s->buf);
}
......@@ -442,6 +440,8 @@ mu_filter_iconv_create (mu_stream_t *s, mu_stream_t transport,
return ENOMEM;
}
if (!(flags & MU_STREAM_AUTOCLOSE))
mu_stream_ref (transport);
iptr->transport = transport;
iptr->fallback_mode = fallback_mode;
iptr->cd = cd;
......
......@@ -45,8 +45,8 @@
static void
init_iobuf (struct mu_filter_io *io, struct _mu_filter_stream *fs)
{
io->input = MFB_CURPTR (fs->inbuf);
io->isize = MFB_RDBYTES (fs->inbuf);
io->input = MFB_BASE (fs->inbuf);
io->isize = MFB_LEVEL (fs->inbuf);
io->output = MFB_ENDPTR (fs->outbuf);
io->osize = MFB_FREESIZE (fs->outbuf);
}
......@@ -234,7 +234,7 @@ filter_write_internal (mu_stream_t stream, enum mu_filter_command cmd,
break;
if (rdsize > MFB_FREESIZE (fs->inbuf))
rdsize = MFB_FREESIZE (fs->inbuf);
memcpy (MFB_BASE (fs->inbuf), buf + total, rdsize);
memcpy (MFB_ENDPTR (fs->inbuf), buf + total, rdsize);
MFB_advance_level (&fs->inbuf, rdsize);
total += rdsize;
}
......@@ -374,8 +374,6 @@ filter_done (mu_stream_t stream)
fs->xcode (fs->xdata, mu_filter_done, NULL);
free (fs->xdata);
}
if (stream->flags & MU_STREAM_NO_CLOSE)
return;
mu_stream_destroy (&fs->transport);
}
......@@ -383,8 +381,6 @@ static int
filter_close (mu_stream_t stream)
{
struct _mu_filter_stream *fs = (struct _mu_filter_stream *)stream;
if (stream->flags & MU_STREAM_NO_CLOSE)
return 0;
MBF_CLEAR (fs->inbuf);
MBF_CLEAR (fs->outbuf);
return mu_stream_close (fs->transport);
......@@ -464,6 +460,8 @@ mu_filter_stream_create (mu_stream_t *pflt,
fs->stream.error_string = filter_error_string;
fs->stream.flags = flags;
if (!(flags & MU_STREAM_AUTOCLOSE))
mu_stream_ref (str);
fs->transport = str;
fs->xcode = xcode;
fs->xdata = xdata;
......
......@@ -112,10 +112,7 @@ static int
_iostream_close (struct _mu_stream *str)
{
struct _mu_iostream *sp = (struct _mu_iostream *)str;
if (sp->stream.flags & MU_STREAM_NO_CLOSE)
return 0;
mu_stream_close (sp->transport[_MU_STREAM_INPUT]);
if (sp->transport[_MU_STREAM_INPUT] != sp->transport[_MU_STREAM_OUTPUT])
mu_stream_close (sp->transport[_MU_STREAM_OUTPUT]);
return 0;
}
......@@ -125,7 +122,6 @@ _iostream_done (struct _mu_stream *str)
{
struct _mu_iostream *sp = (struct _mu_iostream *)str;
mu_stream_unref (sp->transport[_MU_STREAM_INPUT]);
if (sp->transport[_MU_STREAM_INPUT] != sp->transport[_MU_STREAM_OUTPUT])
mu_stream_unref (sp->transport[_MU_STREAM_OUTPUT]);
}
......@@ -212,15 +208,13 @@ _iostream_error_string (struct _mu_stream *str, int rc)
}
int
mu_iostream_create (mu_stream_t *pref, mu_stream_t in, mu_stream_t out,
int flags)
mu_iostream_create (mu_stream_t *pref, mu_stream_t in, mu_stream_t out)
{
struct _mu_iostream *sp;
sp = (struct _mu_iostream *)
_mu_stream_create (sizeof (*sp),
MU_STREAM_READ | MU_STREAM_WRITE |
(flags & MU_STREAM_NO_CLOSE));
MU_STREAM_READ | MU_STREAM_WRITE);
if (!sp)
return ENOMEM;
......
......@@ -520,7 +520,7 @@ mu_mimehdr_decode_param (const char *value, int flags,
break;
rc = mu_filter_iconv_create (&cvt, instr, source_cs, charset,
MU_STREAM_NO_CLOSE,
0,
mu_default_fallback_mode);
if (rc)
break;
......
......@@ -1328,16 +1328,16 @@ mu_decode_filter (mu_stream_t *pfilter, mu_stream_t input,
if (fromcode && tocode && mu_c_strcasecmp (fromcode, tocode))
{
mu_stream_t cvt;
status = mu_filter_iconv_create (&cvt, filter, fromcode, tocode,
MU_STREAM_NO_CLOSE,
mu_default_fallback_mode);
0, mu_default_fallback_mode);
if (status == 0)
{
if (mu_stream_open (cvt))
mu_stream_destroy (&cvt);
else
{
mu_stream_clr_flags (cvt, MU_STREAM_NO_CLOSE);
mu_stream_unref (filter);
filter = cvt;
}
}
......
......@@ -301,7 +301,7 @@ _prog_open (mu_stream_t stream)
if (REDIRECT_STDOUT_P (flags))
{
rc = mu_stdio_stream_create (&fs->in, pfd[0],
MU_STREAM_READ|seekable_flag);
MU_STREAM_READ|MU_STREAM_AUTOCLOSE|seekable_flag);
if (rc)
{
_prog_close (stream);
......@@ -318,7 +318,7 @@ _prog_open (mu_stream_t stream)
if (REDIRECT_STDIN_P (flags))
{
rc = mu_stdio_stream_create (&fs->out, pfd[1],
MU_STREAM_WRITE|seekable_flag);
MU_STREAM_WRITE|MU_STREAM_AUTOCLOSE|seekable_flag);
if (rc)
{
_prog_close (stream);
......@@ -454,6 +454,7 @@ mu_filter_prog_stream_create (mu_stream_t *pstream, const char *progname,
fs = _prog_stream_create (progname, MU_STREAM_RDWR);
if (!fs)
return ENOMEM;
mu_stream_ref (input);
fs->input = input;
*pstream = (mu_stream_t) fs;
return 0;
......
......@@ -166,6 +166,7 @@ mu_rfc2047_decode (const char *tocode, const char *input, char **ptostr)
mu_stream_seek (in_stream, 0, MU_SEEK_SET, NULL);
status = mu_decode_filter (&filter, in_stream, filter_type, fromcode,
tocode);
mu_stream_unref (in_stream);
if (status != 0)
break;
......@@ -273,7 +274,8 @@ mu_rfc2047_encode (const char *charset, const char *encoding,
mu_stream_write (input_stream, text, strlen (text), NULL);
mu_stream_seek (input_stream, 0, MU_SEEK_SET, NULL);
rc = mu_filter_create (&output_stream, input_stream,
encoding, MU_FILTER_ENCODE, MU_STREAM_READ);
encoding, MU_FILTER_ENCODE,
MU_STREAM_READ | MU_STREAM_AUTOCLOSE);
if (rc == 0)
{
/* Assume strlen(qp_encoded_text) <= strlen(text) * 3 */
......
......@@ -114,7 +114,8 @@ mu_socket_stream_create (mu_stream_t *pstream, const char *filename, int flags)
struct _mu_socket_stream *s;
int rc;
rc = _mu_stdio_stream_create (pstream, sizeof (*s), flags);
rc = _mu_stdio_stream_create (pstream, sizeof (*s),
flags | MU_STREAM_AUTOCLOSE);
if (rc)
return rc;
s = (struct _mu_socket_stream *) *pstream;
......
......@@ -54,6 +54,9 @@ _stream_seterror (struct _mu_stream *stream, int code, int perm)
#define _stream_advance_buffer(s,n) ((s)->cur += n, (s)->level -= n)
#define _stream_buffer_offset(s) ((s)->cur - (s)->buffer)
#define _stream_orig_level(s) ((s)->level + _stream_buffer_offset (s))
#define _stream_buffer_freespace(s) \
((s)->bufsize - (s)->level - _stream_buffer_offset(s))
#define _stream_buffer_is_full(s) (_stream_buffer_freespace(s) == 0)
static int
_stream_fill_buffer (struct _mu_stream *stream)
......@@ -93,9 +96,6 @@ _stream_fill_buffer (struct _mu_stream *stream)
return rc;
}
#define BUFFER_FULL_P(s) \
((s)->cur + (s)->level == (s)->buffer + (s)->bufsize)
static int
_stream_buffer_full_p (struct _mu_stream *stream)
{
......@@ -105,11 +105,11 @@ _stream_buffer_full_p (struct _mu_stream *stream)
break;
case mu_buffer_line:
return BUFFER_FULL_P (stream)
return _stream_buffer_is_full (stream)
|| memchr (stream->cur, '\n', stream->level) != NULL;
case mu_buffer_full:
return BUFFER_FULL_P (stream);
return _stream_buffer_is_full (stream);
}
return 0;
}
......@@ -135,7 +135,6 @@ _stream_flush_buffer (struct _mu_stream *stream, int all)
if ((rc = mu_stream_write_unbuffered (stream, stream->cur,
stream->level, 1, NULL)))
return rc;
if (all)
_stream_advance_buffer (stream, stream->level);
break;
......@@ -154,7 +153,7 @@ _stream_flush_buffer (struct _mu_stream *stream, int all)
return rc;
_stream_advance_buffer (stream, size);
}
if ((all && stream->level) || BUFFER_FULL_P (stream))
if ((all && stream->level) || _stream_buffer_is_full (stream))
{
rc = mu_stream_write_unbuffered (stream,
stream->cur,
......@@ -799,7 +798,7 @@ mu_stream_write (mu_stream_t stream, const void *buf, size_t size,
if (size == 0)
break;
n = stream->bufsize - stream->level;
n = _stream_buffer_freespace (stream);
if (n > size)
n = size;
memcpy (stream->cur + stream->level, bufp, n);
......
......@@ -140,8 +140,6 @@ static int
_streamref_close (struct _mu_stream *str)
{
struct _mu_streamref *sp = (struct _mu_streamref *)str;
if (sp->stream.flags & MU_STREAM_NO_CLOSE)
return 0;
return streamref_return (sp, mu_stream_close (sp->transport));
}
......@@ -271,9 +269,11 @@ mu_streamref_create_abridged (mu_stream_t *pref, mu_stream_t str,
if (rc)
return rc;
mu_stream_get_flags (str, &flags);
sp = (struct _mu_streamref *) _mu_stream_create (sizeof (*sp), flags);
sp = (struct _mu_streamref *)
_mu_stream_create (sizeof (*sp), flags & ~MU_STREAM_AUTOCLOSE);
if (!sp)
return ENOMEM;
mu_stream_ref (str);
sp->stream.read = _streamref_read;
......
......@@ -317,7 +317,7 @@ mu_tcp_stream_create_with_source_ip (mu_stream_t *stream,
if (port < 1)
return MU_ERR_TCP_NO_PORT;
tcp = _create_tcp_stream (flags | MU_STREAM_NO_CHECK | MU_STREAM_RDWR);
tcp = _create_tcp_stream (flags | MU_STREAM_RDWR);
if (!tcp)
return ENOMEM;
tcp->host = strdup (host);
......
......@@ -58,7 +58,9 @@ mu_temp_file_stream_create (mu_stream_t *pstream, const char *dir)
rc = _mu_file_stream_create (pstream,
sizeof (struct _mu_file_stream),
fname, MU_STREAM_RDWR|MU_STREAM_CREAT);
fname,
MU_STREAM_RDWR | MU_STREAM_CREAT |
MU_STREAM_AUTOCLOSE);
if (rc)
{
free (fname);
......
......@@ -224,7 +224,7 @@ mu_vartab_expand (mu_vartab_t vt, const char *str, char **pres)
return EINVAL;
if (!vt->stream)
{
rc = mu_memory_stream_create (&vt->stream, MU_STREAM_NO_CHECK);
rc = mu_memory_stream_create (&vt->stream, 0);
if (rc)
return rc;
rc = mu_stream_open (vt->stream);
......
......@@ -144,8 +144,6 @@ static int
_xscript_close (struct _mu_stream *str)
{
struct _mu_xscript_stream *sp = (struct _mu_xscript_stream *)str;
if (sp->stream.flags & MU_STREAM_NO_CLOSE)
return 0;
return mu_stream_close (sp->transport);
}
......@@ -268,9 +266,12 @@ mu_xscript_stream_create(mu_stream_t *pref, mu_stream_t transport,
sp->stream.shutdown = _xscript_shutdown;
sp->stream.error_string = _xscript_error_string;
if (!(flags & MU_STREAM_AUTOCLOSE))
{
mu_stream_ref (transport);
sp->transport = transport;
mu_stream_ref (logstr);
}
sp->transport = transport;
sp->logstr = logstr;
sp->flags = TRANS_READ | TRANS_WRITE;
......
......@@ -698,9 +698,9 @@ eval_body (struct eval_env *env)
mu_header_aget_value (hdr, MU_HEADER_CONTENT_TRANSFER_ENCODING, &encoding);
if (encoding)
{
int rc = mu_filter_create(&dstr, input, encoding,
int rc = mu_filter_create (&dstr, input, encoding,
MU_FILTER_DECODE,
MU_STREAM_READ | MU_STREAM_NO_CLOSE);
MU_STREAM_READ);
if (rc == 0)
input = dstr;
free (encoding);
......
......@@ -1119,7 +1119,7 @@ mhn_message_size (mu_message_t msg, size_t *psize)
rc = mu_filter_create (&dstr, bstr, encoding,
MU_FILTER_DECODE,
MU_STREAM_READ | MU_STREAM_NO_CLOSE);
MU_STREAM_READ);
free (encoding);
if (rc == 0)
{
......@@ -1256,7 +1256,7 @@ show_internal (mu_message_t msg, msg_part_t part, char *encoding, mu_stream_t ou
}
mu_body_get_streamref (body, &bstr);
rc = mu_filter_create (&dstr, bstr, encoding,
MU_FILTER_DECODE, MU_STREAM_READ | MU_STREAM_NO_CLOSE);
MU_FILTER_DECODE, MU_STREAM_READ);
if (rc == 0)
bstr = dstr;
rc = mu_stream_copy (out, bstr, 0);
......@@ -2001,7 +2001,7 @@ finish_text_msg (struct compose_env *env, mu_message_t *msg, int ascii)
mu_body_get_streamref (body, &input);
rc = mu_filter_create (&fstr, input, "quoted-printable",
MU_FILTER_ENCODE,
MU_STREAM_READ | MU_STREAM_NO_CLOSE);
MU_STREAM_READ);
if (rc == 0)
{
mu_stream_copy (output, fstr, 0);
......@@ -2276,7 +2276,8 @@ edit_mime (char *cmd, struct compose_env *env, mu_message_t *msg, int level)
free (subtype);
}
rc = mu_filter_create (&fstr, in, encoding, MU_FILTER_ENCODE, MU_STREAM_READ);
rc = mu_filter_create (&fstr, in, encoding, MU_FILTER_ENCODE,
MU_STREAM_READ | MU_STREAM_AUTOCLOSE);
if (rc)
{
fstr = in;
......
......@@ -259,7 +259,7 @@ display_file (const char *type)
{
mu_stdio_stream_create (&stream, fileno (mimeview_fp),
MU_STREAM_READ|
MU_STREAM_SEEK|MU_STREAM_NO_CLOSE);
MU_STREAM_SEEK);
mu_stream_open (stream);
display_stream_mailcap (mimeview_file, stream, hdr,
......
......@@ -145,21 +145,21 @@ pop3d_setio (FILE *in, FILE *out)
pop3d_abquit (ERR_NO_OFILE);
if (mu_stdio_stream_create (&istream, fileno (in),
MU_STREAM_READ | MU_STREAM_NO_CLOSE))
MU_STREAM_READ | MU_STREAM_AUTOCLOSE))
pop3d_abquit (ERR_NO_IFILE);
real_istream = istream;
mu_stream_set_buffer (istream, mu_buffer_line, 1024);
if (mu_stdio_stream_create (&str, fileno (out),
MU_STREAM_WRITE | MU_STREAM_NO_CLOSE))
MU_STREAM_WRITE | MU_STREAM_AUTOCLOSE))
pop3d_abquit (ERR_NO_OFILE);
real_ostream = str;
if (mu_filter_create (&ostream, str, "rfc822", MU_FILTER_ENCODE,
MU_STREAM_WRITE | MU_STREAM_NO_CLOSE))
MU_STREAM_WRITE))
pop3d_abquit (ERR_NO_IFILE);
mu_stream_set_buffer (ostream, mu_buffer_line, 1024);
if (mu_iostream_create (&iostream, istream, ostream, 0))
if (mu_iostream_create (&iostream, istream, ostream))
pop3d_abquit (ERR_FILE);
if (pop3d_transcript)
{
......@@ -169,8 +169,7 @@ pop3d_setio (FILE *in, FILE *out)
mu_diag_get_debug (&debug);
rc = mu_dbgstream_create (&dstr, debug, MU_DIAG_DEBUG,
MU_STREAM_NO_CLOSE);
rc = mu_dbgstream_create (&dstr, debug, MU_DIAG_DEBUG, 0);
if (rc)
mu_error (_("cannot create debug stream; transcript disabled: %s"),
mu_strerror (rc));
......@@ -181,9 +180,12 @@ pop3d_setio (FILE *in, FILE *out)
mu_error (_("cannot create transcript stream: %s"),
mu_strerror (rc));
else
{
mu_stream_unref (iostream);
iostream = xstr;
}
}
}
}
#ifdef WITH_TLS
......@@ -193,8 +195,7 @@ pop3d_init_tls_server ()
mu_stream_t stream;
int rc;
rc = mu_tls_server_stream_create (&stream, real_istream, real_ostream,
MU_STREAM_NO_CLOSE);
rc = mu_tls_server_stream_create (&stream, real_istream, real_ostream, 0);
if (rc)
return 1;
......@@ -208,8 +209,7 @@ pop3d_init_tls_server ()
}
if (mu_filter_create (&stream, stream, "rfc822", MU_FILTER_ENCODE,
MU_STREAM_WRITE | MU_STREAM_RDTHRU |
MU_STREAM_NO_CLOSE))
MU_STREAM_WRITE | MU_STREAM_RDTHRU))
pop3d_abquit (ERR_NO_IFILE);
if (pop3d_transcript)
......