Commit 29d2e390 29d2e3904c897c01191072db85985b93f9ea1704 by Sergey Poznyakoff

Implement an ioctl for replace bottom-level transport layers in a stream chain.

* include/mailutils/stream.h (MU_IOCTL_SWAP_STREAM): New ioctl op.
(mu_stream_seterr): New function.
* include/mailutils/sys/stream.h (_MU_SWAP_FIRST_ONLY)
(_MU_SWAP_IOCTL_MUST_SUCCEED): New defines.
(_mu_stream_swap_streams): New proto.
* mailbox/amd.c (amd_body_stream_read): Always update *pnread.
* mailbox/errors (MU_ERR_NO_TRANSPORT): New error code.

* mailbox/stream.c (_stream_seterror): Rename to mu_stream_seterr,
remove static qualifier. All uses updated.
(_mu_stream_swap_streams): New function.

* mailbox/filter_iconv.c (_icvt_ioctl): Implement MU_IOCTL_SWAP_STREAM.
* mailbox/iostream.c (_iostream_ctl): Likewise.
* mailbox/xscript-stream.c (_xscript_ctl): Likewise.

* pop3d/extra.c (real_istream, real_ostream): Remove statics.
(pop3d_init_tls_server): Use MU_IOCTL_SWAP_STREAM to replace
bottom-level transport layers without recreating the entire stream
chain.
1 parent baea3184
......@@ -32,8 +32,6 @@ enum mu_buffer_type
#define MU_SEEK_CUR 1
#define MU_SEEK_END 2
#define MU_STREAM_NO_CHECK 0 /* for backward compatibility */
#define MU_STREAM_READ 0x00000001
#define MU_STREAM_WRITE 0x00000002
#define MU_STREAM_RDWR (MU_STREAM_READ|MU_STREAM_WRITE)
......@@ -63,6 +61,7 @@ enum mu_buffer_type
#define MU_IOCTL_ABRIDGE_SEEK MU_IOCTL_SET_SEEK_LIMITS
#define MU_IOCTL_GET_SEEK_LIMITS 5
#define MU_IOCTL_SET_TRANSPORT 6
#define MU_IOCTL_SWAP_STREAM 7
void mu_stream_ref (mu_stream_t stream);
void mu_stream_unref (mu_stream_t stream);
......@@ -72,6 +71,8 @@ const char *mu_stream_strerror (mu_stream_t stream, int rc);
int mu_stream_err (mu_stream_t stream);
int mu_stream_last_error (mu_stream_t stream);
void mu_stream_clearerr (mu_stream_t stream);
int mu_stream_seterr (struct _mu_stream *stream, int code, int perm);
int mu_stream_eof (mu_stream_t stream);
int mu_stream_seek (mu_stream_t stream, mu_off_t offset, int whence,
mu_off_t *pres);
......
......@@ -65,4 +65,11 @@ int mu_stream_write_unbuffered (mu_stream_t stream,
const void *buf, size_t size,
int full_write, size_t *pnwritten);
#define _MU_SWAP_FIRST_ONLY 0x01
#define _MU_SWAP_IOCTL_MUST_SUCCEED 0x02
int _mu_stream_swap_streams (mu_stream_t stream, mu_stream_t *curtrans,
mu_stream_t *newtrans, int flags);
#endif
......
......@@ -679,7 +679,7 @@ _mu_tls_stream_create (mu_stream_t *pstream,
free (sp);
return rc;
}
rc = _mu_tls_io_stream_create (&sp->transport[1], strout,
MU_STREAM_WRITE | autoclose, sp);
if (rc)
......@@ -688,7 +688,7 @@ _mu_tls_stream_create (mu_stream_t *pstream,
free (sp->transport[0]);
return rc;
}
mu_stream_set_buffer ((mu_stream_t) sp, mu_buffer_line, 1024);
*pstream = (mu_stream_t) sp;
return 0;
......
......@@ -1604,8 +1604,7 @@ amd_body_stream_read (mu_stream_t is, char *buffer, size_t buflen,
if (buffer == NULL || buflen == 0)
{
if (pnread)
*pnread = nread;
*pnread = nread;
return 0;
}
......@@ -1626,11 +1625,11 @@ amd_body_stream_read (mu_stream_t is, char *buffer, size_t buflen,
{
status = mu_stream_read (mhm->stream, buffer, nread, &nread);
amdstr->off += nread;
if (pnread)
*pnread = nread;
}
}
*pnread = nread;
mu_monitor_unlock (mhm->amd->mailbox->monitor);
#ifdef WITH_PTHREAD
pthread_cleanup_pop (0);
......
......@@ -83,3 +83,5 @@ MU_ERR_NO_INTERFACE _("No such interface")
MU_ERR_BADOP _("Inappropriate operation for this mode")
MU_ERR_BAD_FILENAME _("Badly formed file or directory name")
MU_ERR_READ _("Read error")
MU_ERR_NO_TRANSPORT _("Transport stream not set")
......
......@@ -400,6 +400,9 @@ _icvt_ioctl (mu_stream_t stream, int code, void *ptr)
ptrans[1] = NULL;
break;
case MU_IOCTL_SWAP_STREAM:
return mu_stream_ioctl (s->transport, code, ptr);
default:
return EINVAL;
}
......
......@@ -148,6 +148,11 @@ _iostream_ctl (struct _mu_stream *str, int op, void *arg)
sp->transport[_MU_STREAM_INPUT] = (mu_stream_t) ptrans[0];
sp->transport[_MU_STREAM_OUTPUT] = (mu_stream_t) ptrans[1];
break;
case MU_IOCTL_SWAP_STREAM:
if (!arg)
return EINVAL;
return _mu_stream_swap_streams (str, sp->transport, arg, 0);
default:
return EINVAL;
......
......@@ -32,8 +32,8 @@
#include <mailutils/stream.h>
#include <mailutils/sys/stream.h>
static int
_stream_seterror (struct _mu_stream *stream, int code, int perm)
int
mu_stream_seterr (struct _mu_stream *stream, int code, int perm)
{
stream->last_err = code;
switch (code)
......@@ -236,7 +236,7 @@ mu_stream_open (mu_stream_t stream)
int rc;
if (stream->open && (rc = stream->open (stream)))
return _stream_seterror (stream, rc, 1);
return mu_stream_seterr (stream, rc, 1);
stream->bytes_in = stream->bytes_out = 0;
return 0;
}
......@@ -286,10 +286,10 @@ mu_stream_seek (mu_stream_t stream, mu_off_t offset, int whence,
mu_off_t size;
if (!stream->seek)
return _stream_seterror (stream, ENOSYS, 0);
return mu_stream_seterr (stream, ENOSYS, 0);
if (!(stream->flags & MU_STREAM_SEEK))
return _stream_seterror (stream, EACCES, 1);
return mu_stream_seterr (stream, EACCES, 1);
switch (whence)
{
......@@ -308,12 +308,12 @@ mu_stream_seek (mu_stream_t stream, mu_off_t offset, int whence,
case MU_SEEK_END:
rc = mu_stream_size (stream, &size);
if (rc)
return _stream_seterror (stream, rc, 1);
return mu_stream_seterr (stream, rc, 1);
offset += size;
break;
default:
return _stream_seterror (stream, EINVAL, 1);
return mu_stream_seterr (stream, EINVAL, 1);
}
if (stream->buftype == mu_buffer_none
......@@ -326,7 +326,7 @@ mu_stream_seek (mu_stream_t stream, mu_off_t offset, int whence,
if (rc == ESPIPE)
return rc;
if (rc)
return _stream_seterror (stream, rc, 1);
return mu_stream_seterr (stream, rc, 1);
_stream_cleareof (stream);
}
......@@ -352,7 +352,7 @@ mu_stream_skip_input_bytes (mu_stream_t stream, mu_off_t count, mu_off_t *pres)
int rc;
if (!(stream->flags & MU_STREAM_READ))
return _stream_seterror (stream, EACCES, 1);
return mu_stream_seterr (stream, EACCES, 1);
if (stream->buftype == mu_buffer_none)
{
......@@ -426,7 +426,7 @@ mu_stream_set_buffer (mu_stream_t stream, enum mu_buffer_type type,
if (stream->buffer == NULL)
{
stream->buftype = mu_buffer_none;
return _stream_seterror (stream, ENOMEM, 1);
return mu_stream_seterr (stream, ENOMEM, 1);
}
stream->bufsize = size;
stream->cur = stream->buffer;
......@@ -444,10 +444,10 @@ mu_stream_read_unbuffered (mu_stream_t stream, void *buf, size_t size,
size_t nread;
if (!stream->read)
return _stream_seterror (stream, ENOSYS, 0);
return mu_stream_seterr (stream, ENOSYS, 0);
if (!(stream->flags & MU_STREAM_READ))
return _stream_seterror (stream, EACCES, 1);
return mu_stream_seterr (stream, EACCES, 1);
if (stream->flags & _MU_STR_ERR)
return stream->last_err;
......@@ -479,7 +479,7 @@ mu_stream_read_unbuffered (mu_stream_t stream, void *buf, size_t size,
}
if (size && rc)
rc = _stream_seterror (stream, rc, 0);
rc = mu_stream_seterr (stream, rc, 0);
}
else
{
......@@ -490,7 +490,7 @@ mu_stream_read_unbuffered (mu_stream_t stream, void *buf, size_t size,
stream->flags |= _MU_STR_EOF;
stream->bytes_in += nread;
}
_stream_seterror (stream, rc, rc != 0);
mu_stream_seterr (stream, rc, rc != 0);
}
stream->offset += nread;
if (pnread)
......@@ -509,10 +509,10 @@ mu_stream_write_unbuffered (mu_stream_t stream,
size_t nwritten;
if (!stream->write)
return _stream_seterror (stream, ENOSYS, 0);
return mu_stream_seterr (stream, ENOSYS, 0);
if (!(stream->flags & MU_STREAM_WRITE))
return _stream_seterror (stream, EACCES, 1);
return mu_stream_seterr (stream, EACCES, 1);
if (stream->flags & _MU_STR_ERR)
return stream->last_err;
......@@ -555,7 +555,7 @@ mu_stream_write_unbuffered (mu_stream_t stream,
stream->offset += nwritten;
if (pnwritten)
*pnwritten = nwritten;
_stream_seterror (stream, rc, rc != 0);
mu_stream_seterr (stream, rc, rc != 0);
return rc;
}
......@@ -861,9 +861,9 @@ mu_stream_size (mu_stream_t stream, mu_off_t *psize)
int rc;
if (!stream->size)
return _stream_seterror (stream, ENOSYS, 0);
return mu_stream_seterr (stream, ENOSYS, 0);
rc = stream->size (stream, psize);
return _stream_seterror (stream, rc, rc != 0);
return mu_stream_seterr (stream, rc, rc != 0);
}
mu_off_t
......@@ -948,8 +948,88 @@ mu_stream_clr_flags (mu_stream_t stream, int fl)
return 0;
}
static void
swapstr (mu_stream_t stream, mu_stream_t *curstr, mu_stream_t *newstr)
{
mu_stream_t tmp;
tmp = *newstr;
*newstr = *curstr;
*curstr = tmp;
if (!(stream->flags & MU_STREAM_AUTOCLOSE))
{
if (*newstr)
mu_stream_unref (*newstr);
if (tmp)
mu_stream_ref (tmp);
}
if (!tmp)
mu_stream_seterr (stream, MU_ERR_NO_TRANSPORT, 1);
else if (stream->last_err == MU_ERR_NO_TRANSPORT)
mu_stream_clearerr (stream);
}
static int
swapstr_recursive (mu_stream_t stream, mu_stream_t *curstr,
mu_stream_t *newstr, int flags)
{
mu_stream_t strtab[2];
int rc = ENOSYS;
if (*curstr == NULL && *newstr == NULL)
return 0;
if (*curstr)
{
strtab[0] = *newstr;
strtab[1] = NULL;
rc = mu_stream_ioctl (*curstr, MU_IOCTL_SWAP_STREAM, strtab);
if (rc)
{
if ((flags & _MU_SWAP_IOCTL_MUST_SUCCEED)
|| !(rc == ENOSYS || rc == EINVAL))
return rc;
}
}
if (rc == 0)
*newstr = strtab[0];
else
swapstr (stream, curstr, newstr);
return 0;
}
/* CURTRANS[2] contains I/O transport streams used by STREAM,
NEWTRANS[2] contains another pair of streams.
This function swaps the items of these two arrays using the
MU_IOCTL_SWAP_STREAM ioctl. It is intended for use by STREAM's
ioctl method and is currently used by iostream.c */
int
_mu_stream_swap_streams (mu_stream_t stream, mu_stream_t *curtrans,
mu_stream_t *newtrans, int flags)
{
int rc;
rc = swapstr_recursive (stream, &curtrans[0], &newtrans[0], flags);
if (rc)
return rc;
if (flags & _MU_SWAP_FIRST_ONLY)
return 0;
rc = swapstr_recursive (stream, &curtrans[1], &newtrans[1], flags);
if (rc)
{
int rc1 = swapstr_recursive (stream, &curtrans[0], &newtrans[0], flags);
if (rc1)
{
mu_diag_output (MU_DIAG_CRIT,
_("restoring streams on %p failed: %s"),
stream, mu_strerror (rc1));
abort ();
}
}
return rc;
}
......
......@@ -176,7 +176,8 @@ _xscript_ctl (struct _mu_stream *str, int op, void *arg)
{
struct _mu_xscript_stream *sp = (struct _mu_xscript_stream *)str;
mu_transport_t *ptrans;
mu_stream_t strtab[2];
switch (op)
{
case MU_IOCTL_GET_TRANSPORT:
......@@ -196,7 +197,9 @@ _xscript_ctl (struct _mu_stream *str, int op, void *arg)
if (ptrans[1])
sp->logstr = (mu_stream_t) ptrans[1];
break;
case MU_IOCTL_SWAP_STREAM:
/* fall through */
default:
return mu_stream_ioctl (sp->transport, op, arg);
}
......
......@@ -126,14 +126,6 @@ pop3d_abquit (int reason)
exit (code);
}
/* Keeps the *real* output stream. Ostream is a RFC822 filter built over
real_ostream, or even over a TLS stream, which in turn is based on this
real_ostream.
FIXME: This is sorta kludge: we could use MU_IOCTL_GET_TRANSPORT call
to retrieve the bottom-level stream, if filter streams supported it.
*/
static mu_stream_t real_istream, real_ostream;
void
pop3d_setio (FILE *in, FILE *out)
{
......@@ -147,20 +139,24 @@ pop3d_setio (FILE *in, FILE *out)
if (mu_stdio_stream_create (&istream, fileno (in),
MU_STREAM_READ | MU_STREAM_AUTOCLOSE))
pop3d_abquit (ERR_NO_IFILE);
real_istream = istream;
mu_stream_set_buffer (istream, mu_buffer_line, 1024);
if (mu_stdio_stream_create (&str, fileno (out),
if (mu_stdio_stream_create (&ostream, fileno (out),
MU_STREAM_WRITE | MU_STREAM_AUTOCLOSE))
pop3d_abquit (ERR_NO_OFILE);
real_ostream = str;
if (mu_filter_create (&ostream, str, "rfc822", MU_FILTER_ENCODE,
MU_STREAM_WRITE))
pop3d_abquit (ERR_NO_IFILE);
mu_stream_set_buffer (ostream, mu_buffer_line, 1024);
if (mu_iostream_create (&iostream, istream, ostream))
/* Combine the two streams into an I/O one. */
if (mu_iostream_create (&str, istream, ostream))
pop3d_abquit (ERR_FILE);
/* Convert all writes to CRLF form.
There is no need to convert reads, as the code ignores extra \r anyway.
This also installs an extra full buffering, which is needed for TLS
code (see below). */
if (mu_filter_create (&iostream, str, "rfc822", MU_FILTER_ENCODE,
MU_STREAM_WRITE | MU_STREAM_RDTHRU))
pop3d_abquit (ERR_NO_IFILE);
if (pop3d_transcript)
{
int rc;
......@@ -192,36 +188,40 @@ pop3d_setio (FILE *in, FILE *out)
int
pop3d_init_tls_server ()
{
mu_stream_t stream;
mu_stream_t tlsstream, stream[2];
int rc;
rc = mu_tls_server_stream_create (&stream, real_istream, real_ostream, 0);
stream[0] = stream[1] = NULL;
rc = mu_stream_ioctl (iostream, MU_IOCTL_SWAP_STREAM, stream);
if (rc)
{
mu_error (_("%s failed: %s"), "MU_IOCTL_SWAP_STREAM",
mu_stream_strerror (iostream, rc));
return 1;
}
rc = mu_tls_server_stream_create (&tlsstream, stream[0], stream[1], 0);
if (rc)
return 1;
rc = mu_stream_open (stream);
rc = mu_stream_open (tlsstream);
if (rc)
{
mu_diag_output (MU_DIAG_ERROR, _("cannot open TLS stream: %s"),
mu_stream_strerror (stream, rc));
mu_stream_destroy (&stream);
mu_stream_strerror (tlsstream, rc));
mu_stream_destroy (&tlsstream);
return 1;
}
else
stream[0] = stream[1] = tlsstream;
if (mu_filter_create (&stream, stream, "rfc822", MU_FILTER_ENCODE,
MU_STREAM_WRITE | MU_STREAM_RDTHRU))
pop3d_abquit (ERR_NO_IFILE);
if (pop3d_transcript)
rc = mu_stream_ioctl (iostream, MU_IOCTL_SWAP_STREAM, stream);
if (rc)
{
mu_transport_t trans[2];
trans[0] = (mu_transport_t) stream;
trans[1] = NULL;
mu_stream_ioctl (iostream, MU_IOCTL_SET_TRANSPORT, trans);
mu_error (_("%s failed: %s"), "MU_IOCTL_SWAP_STREAM",
mu_stream_strerror (iostream, rc));
pop3d_abquit (ERR_IO);
}
else
iostream = stream;
return 0;
}
#endif
......