Commit 8c76d675 8c76d675333b70a0887de506073d64cd2c14643a by Sergey Poznyakoff

Optimize stream seeks and I/O. Expand stream event handlers. Rewrite xscript stream.

* include/mailutils/sys/stream.h (_MU_STR_EVENT_SET): Rename
to _MU_STR_EVENT_SET.
(_MU_STR_EVENT_CLR): Rename to _MU_STR_EVENT_CLRFLAG.
(_MU_STR_EVENT_FILLBUF, _MU_STR_EVENT_FLUSHBUF): New event codes.
(_MU_STR_EVMASK): New macro.
(_mu_stream) <cur>: Replace with pos, indicating current position
in the buffer.
(event_cb): Change signature.
(mu_stream_read_unbuffered): Remove.
(mu_stream_write_unbuffered): Remove.

* mailbox/iostream.c (mu_iostream_create): Provide the readdelim method
only if the underlying input transpor stream provides it.
* mailbox/streamref.c (mu_streamref_create_abridged): Likewise.

* mailbox/stream.c (_stream_event): New macro.
(mu_stream_read_unbuffered): Rename to static _stream_read_unbuffered.
(mu_stream_write_unbuffered): Rename to static _stream_write_unbuffered.
(_stream_advance_buffer, _stream_buffer_offset)
(_stream_orig_level): Remove macros.
(_stream_buffer_freespace): Rewrite.
(_stream_curp): New macro.
(_stream_fill_buffer): Make sure the `offset' indicates the offset
int the transport, corresponding to the beginning of the current
buffer.
(_stream_flush_buffer): Essentially rewritten.
(mu_stream_seek): Reflect changes to the _mu_stream structure. Optimize
calls to the seek method.
(_stream_skip_input_bytes): Likewise.
(mu_stream_read, _stream_scandelim, mu_stream_write): Rewrite using
new _mu_stream structure.

* mailbox/xscript-stream.c: Rewrite using stream events.

* mailbox/base64.c (_base64_encoder): Bugfix.

* libproto/pop/pop3_stream.c (_pop3_event_cb): Update signature to
match the changes above.
* examples/mimetest.c (main): Add more error checking.
* mail/testsuite/mail/write.exp: Minor fix.
1 parent fd032c6f
......@@ -116,7 +116,7 @@ main (int argc, char **argv)
MU_ASSERT (mu_mailbox_open (mbox, MU_STREAM_RDWR));
/* Iterate through the entire message set. */
mu_mailbox_messages_count (mbox, &count);
MU_ASSERT (mu_mailbox_messages_count (mbox, &count));
for (i = 1; i <= count; ++i)
{
......
......@@ -24,8 +24,12 @@
#define _MU_STR_INTERN_MASK 0xf0000000
#define _MU_STR_EVENT_SET 1
#define _MU_STR_EVENT_CLR 2
#define _MU_STR_EVENT_SETFLAG 0
#define _MU_STR_EVENT_CLRFLAG 1
#define _MU_STR_EVENT_FILLBUF 2
#define _MU_STR_EVENT_FLUSHBUF 3
#define _MU_STR_EVMASK(n) (1<<(n))
struct _mu_stream
{
......@@ -35,7 +39,7 @@ struct _mu_stream
size_t bufsize;
char *buffer;
size_t level;
char *cur;
size_t pos;
int flags;
mu_off_t offset;
......@@ -57,7 +61,7 @@ struct _mu_stream
int (*truncate) (struct _mu_stream *, mu_off_t);
int (*shutdown) (struct _mu_stream *, int);
void (*event_cb) (struct _mu_stream *, int, int);
void (*event_cb) (struct _mu_stream *, int code, unsigned long, void *);
int event_mask;
const char *(*error_string) (struct _mu_stream *, int);
......@@ -65,11 +69,6 @@ struct _mu_stream
};
mu_stream_t _mu_stream_create (size_t size, int flags);
int mu_stream_read_unbuffered (mu_stream_t stream, void *buf, size_t size,
int full_read, size_t *pnread);
int mu_stream_write_unbuffered (mu_stream_t stream,
const void *buf, size_t size,
int full_write, size_t *pnwritten);
void _mu_stream_cleareof (mu_stream_t str);
void _mu_stream_seteof (mu_stream_t str);
......
......@@ -41,10 +41,12 @@ struct mu_pop3_stream
struct mu_buffer_query oldbuf;
};
/* Called on _MU_STR_EVENT_SETFLAG */
static void
_pop3_event_cb (mu_stream_t str, int ev, int flags)
_pop3_event_cb (mu_stream_t str, int ev, unsigned long flags,
void *ptr MU_ARG_UNUSED)
{
if (ev == _MU_STR_EVENT_SET)
if (flags & _MU_STR_EOF)
{
mu_transport_t trans[2];
......@@ -78,7 +80,7 @@ mu_pop3_filter_create (mu_stream_t *pstream, mu_stream_t stream)
mu_stream_t str = *pstream;
str->event_cb = _pop3_event_cb;
str->event_mask = _MU_STR_EOF;
str->event_mask = _MU_STR_EVMASK(_MU_STR_EVENT_SETFLAG);
sp->oldbuf.type = MU_TRANSPORT_OUTPUT;
if (mu_stream_ioctl (sp->pop3->carrier, MU_IOCTL_GET_TRANSPORT_BUFFER,
......
......@@ -27,7 +27,7 @@ mail_test -noprompt "quit" \
# Start again using the same mailbox
mail_start -reuse-spool "--file=%mbox1"
# Go to the last message and do delete 4 times
mail_command "4"
mail_command "3"
mail_command "delete"
mail_command "delete"
mail_command "delete"
......
......@@ -244,6 +244,9 @@ _base64_encoder (void *xd MU_ARG_UNUSED,
pad = 0;
}
/* Consumed may grow bigger than isize if cmd is mu_filter_lastbuf */
if (consumed > iobuf->isize)
consumed = iobuf->isize;
iobuf->isize = consumed;
iobuf->osize = nbytes;
return mu_filter_ok;
......
......@@ -236,6 +236,7 @@ mu_iostream_create (mu_stream_t *pref, mu_stream_t in, mu_stream_t out)
return ENOMEM;
sp->stream.read = _iostream_read;
if (in->readdelim)
sp->stream.readdelim = _iostream_readdelim;
sp->stream.write = _iostream_write;
sp->stream.flush = _iostream_flush;
......
......@@ -34,19 +34,32 @@
size_t mu_stream_default_buffer_size = MU_STREAM_DEFBUFSIZ;
#define _stream_event(stream, code, n, p) \
do \
{ \
if ((stream)->event_cb && \
((stream)->event_mask & _MU_STR_EVMASK(code))) \
(stream)->event_cb (stream, code, n, p); \
} \
while (0)
static int _stream_read_unbuffered (mu_stream_t stream, void *buf, size_t size,
int full_read, size_t *pnread);
static int _stream_write_unbuffered (mu_stream_t stream,
const void *buf, size_t size,
int full_write, size_t *pnwritten);
static void
_stream_setflag (struct _mu_stream *stream, int flag)
{
if (stream->event_cb && (stream->event_mask & flag))
stream->event_cb (stream, _MU_STR_EVENT_SET, flag);
_stream_event (stream, _MU_STR_EVENT_SETFLAG, flag, NULL);
stream->flags |= flag;
}
static void
_stream_clrflag (struct _mu_stream *stream, int flag)
{
if (stream->event_cb && (stream->event_mask & flag))
stream->event_cb (stream, _MU_STR_EVENT_CLR, flag);
_stream_event (stream, _MU_STR_EVENT_CLRFLAG, flag, NULL);
stream->flags &= ~flag;
}
......@@ -80,13 +93,12 @@ _mu_stream_seteof (mu_stream_t str)
_stream_setflag (str, _MU_STR_EOF);
}
#define _stream_advance_buffer(s,n) ((s)->cur += n, (s)->level -= n)
#define _stream_buffer_offset(s) ((s)->cur - (s)->buffer)
#define _stream_orig_level(s) ((s)->level + _stream_buffer_offset (s))
#define _stream_buffer_freespace(s) \
((s)->bufsize - (s)->level - _stream_buffer_offset(s))
((s)->bufsize - (s)->level)
#define _stream_buffer_is_full(s) (_stream_buffer_freespace(s) == 0)
#define _stream_curp(s) ((s)->buffer + (s)->pos)
static int
_stream_fill_buffer (struct _mu_stream *stream)
{
......@@ -95,13 +107,14 @@ _stream_fill_buffer (struct _mu_stream *stream)
int rc = 0;
char c;
stream->offset += stream->level;
switch (stream->buftype)
{
case mu_buffer_none:
return 0;
case mu_buffer_full:
rc = mu_stream_read_unbuffered (stream,
rc = _stream_read_unbuffered (stream,
stream->buffer, stream->bufsize,
0,
&stream->level);
......@@ -110,7 +123,7 @@ _stream_fill_buffer (struct _mu_stream *stream)
case mu_buffer_line:
for (n = 0;
n < stream->bufsize
&& (rc = mu_stream_read_unbuffered (stream,
&& (rc = _stream_read_unbuffered (stream,
&c, 1, 0, &rdn)) == 0;)
{
if (rdn == 0)
......@@ -125,7 +138,13 @@ _stream_fill_buffer (struct _mu_stream *stream)
stream->level = n;
break;
}
stream->cur = stream->buffer;
if (rc == 0)
{
stream->offset -= stream->level;
stream->pos = 0;
_stream_event (stream, _MU_STR_EVENT_FILLBUF,
stream->level, _stream_curp (stream));
}
return rc;
}
......@@ -139,7 +158,7 @@ _stream_buffer_full_p (struct _mu_stream *stream)
case mu_buffer_line:
return _stream_buffer_is_full (stream)
|| memchr (stream->cur, '\n', stream->level) != NULL;
|| memchr (stream->buffer, '\n', stream->level) != NULL;
case mu_buffer_full:
return _stream_buffer_is_full (stream);
......@@ -151,7 +170,8 @@ static int
_stream_flush_buffer (struct _mu_stream *stream, int all)
{
int rc;
char *end;
char *start, *end;
size_t wrsize;
if (stream->flags & _MU_STR_DIRTY)
{
......@@ -169,53 +189,55 @@ _stream_flush_buffer (struct _mu_stream *stream, int all)
abort(); /* should not happen */
case mu_buffer_full:
if ((rc = mu_stream_write_unbuffered (stream, stream->cur,
if ((rc = _stream_write_unbuffered (stream, stream->buffer,
stream->level, 1, NULL)))
return rc;
_stream_advance_buffer (stream, stream->level);
_stream_event (stream, _MU_STR_EVENT_FLUSHBUF,
stream->level, stream->buffer);
break;
case mu_buffer_line:
if (stream->level == 0)
break;
for (end = memchr (stream->cur, '\n', stream->level);
wrsize = stream->level;
for (start = stream->buffer, end = memchr (start, '\n', wrsize);
end;
end = memchr (stream->cur, '\n', stream->level))
end = memchr (start, '\n', wrsize))
{
size_t size = end - stream->cur + 1;
rc = mu_stream_write_unbuffered (stream,
stream->cur,
size, 1, NULL);
size_t size = end - start + 1;
rc = _stream_write_unbuffered (stream, start, size, 1, NULL);
if (rc)
return rc;
_stream_advance_buffer (stream, size);
_stream_event (stream, _MU_STR_EVENT_FLUSHBUF,
size, start);
start += size;
wrsize -= size;
if (wrsize == 0)
break;
}
if ((all && stream->level) || _stream_buffer_is_full (stream))
if ((all && wrsize) || wrsize == stream->level)
{
rc = mu_stream_write_unbuffered (stream,
stream->cur,
stream->level,
rc = _stream_write_unbuffered (stream,
stream->buffer,
wrsize,
1, NULL);
if (rc)
return rc;
_stream_advance_buffer (stream, stream->level);
_stream_event (stream, _MU_STR_EVENT_FLUSHBUF,
wrsize, stream->buffer);
wrsize = 0;
}
if (wrsize)
memmove (stream->buffer, start, wrsize);
else
_stream_clrflag (stream, _MU_STR_DIRTY);
stream->level = stream->pos = wrsize;
return 0;
}
}
else if (all)
_stream_advance_buffer (stream, stream->level);
if (stream->level)
{
if (stream->cur > stream->buffer)
memmove (stream->buffer, stream->cur, stream->level);
}
else
{
_stream_clrflag (stream, _MU_STR_DIRTY);
stream->level = 0;
}
stream->cur = stream->buffer;
stream->pos = stream->level = 0;
return 0;
}
......@@ -336,7 +358,7 @@ mu_stream_seek (mu_stream_t stream, mu_off_t offset, int whence,
case MU_SEEK_CUR:
if (offset == 0)
{
*pres = stream->offset + _stream_buffer_offset (stream);
*pres = stream->offset + stream->pos;
return 0;
}
offset += stream->offset;
......@@ -346,7 +368,7 @@ mu_stream_seek (mu_stream_t stream, mu_off_t offset, int whence,
rc = mu_stream_size (stream, &size);
if (rc)
return mu_stream_seterr (stream, rc, 1);
offset += size;
offset += size + stream->pos;
break;
default:
......@@ -354,8 +376,9 @@ mu_stream_seek (mu_stream_t stream, mu_off_t offset, int whence,
}
if ((stream->buftype == mu_buffer_none && offset != stream->offset)
|| stream->level == 0
|| offset < stream->offset
|| offset > stream->offset + _stream_buffer_offset (stream))
|| offset > stream->offset + stream->level)
{
if ((rc = _stream_flush_buffer (stream, 1)))
return rc;
......@@ -366,9 +389,11 @@ mu_stream_seek (mu_stream_t stream, mu_off_t offset, int whence,
return mu_stream_seterr (stream, rc, 1);
_mu_stream_cleareof (stream);
}
else if (stream->buftype != mu_buffer_none)
stream->pos = offset - stream->offset;
if (pres)
*pres = stream->offset + _stream_buffer_offset (stream);
*pres = stream->offset + stream->pos;
return 0;
}
......@@ -386,11 +411,13 @@ static int
_stream_skip_input_bytes (mu_stream_t stream, mu_off_t count, mu_off_t *pres)
{
mu_off_t pos;
int rc;
int rc = 0;
if (!(stream->flags & MU_STREAM_READ))
return mu_stream_seterr (stream, EACCES, 1);
if (count)
{
if (stream->buftype == mu_buffer_none)
{
for (pos = 0; pos < count; pos++)
......@@ -410,7 +437,7 @@ _stream_skip_input_bytes (mu_stream_t stream, mu_off_t count, mu_off_t *pres)
{
if ((rc = _stream_flush_buffer (stream, 1)))
return rc;
if (stream->level == 0)
if (stream->pos == stream->level)
{
rc = _stream_fill_buffer (stream);
if (rc)
......@@ -423,19 +450,17 @@ _stream_skip_input_bytes (mu_stream_t stream, mu_off_t count, mu_off_t *pres)
}
if (pos <= count && count < pos + stream->level)
{
size_t delta = count - pos;
_stream_advance_buffer (stream, delta);
pos = count;
stream->pos = count - pos;
rc = 0;
break;
}
pos += stream->level;
}
}
}
stream->offset += pos;
if (pres)
*pres = stream->offset;
*pres = stream->offset + stream->pos;
return rc;
}
......@@ -482,7 +507,7 @@ mu_stream_set_buffer (mu_stream_t stream, enum mu_buffer_type type,
return mu_stream_seterr (stream, ENOMEM, 1);
}
stream->bufsize = size;
stream->cur = stream->buffer;
stream->pos = 0;
stream->level = 0;
return 0;
......@@ -496,10 +521,9 @@ mu_stream_get_buffer (mu_stream_t stream, struct mu_buffer_query *qry)
return 0;
}
int
mu_stream_read_unbuffered (mu_stream_t stream, void *buf, size_t size,
int full_read,
size_t *pnread)
static int
_stream_read_unbuffered (mu_stream_t stream, void *buf, size_t size,
int full_read, size_t *pnread)
{
int rc;
size_t nread;
......@@ -560,11 +584,10 @@ mu_stream_read_unbuffered (mu_stream_t stream, void *buf, size_t size,
return rc;
}
int
mu_stream_write_unbuffered (mu_stream_t stream,
static int
_stream_write_unbuffered (mu_stream_t stream,
const void *buf, size_t size,
int full_write,
size_t *pnwritten)
int full_write, size_t *pnwritten)
{
int rc;
size_t nwritten;
......@@ -624,7 +647,7 @@ int
mu_stream_read (mu_stream_t stream, void *buf, size_t size, size_t *pread)
{
if (stream->buftype == mu_buffer_none)
return mu_stream_read_unbuffered (stream, buf, size, !pread, pread);
return _stream_read_unbuffered (stream, buf, size, !pread, pread);
else
{
char *bufp = buf;
......@@ -634,7 +657,7 @@ mu_stream_read (mu_stream_t stream, void *buf, size_t size, size_t *pread)
size_t n;
int rc;
if (stream->level == 0)
if (stream->pos == stream->level)
{
if ((rc = _stream_fill_buffer (stream)))
{
......@@ -647,10 +670,10 @@ mu_stream_read (mu_stream_t stream, void *buf, size_t size, size_t *pread)
}
n = size;
if (n > stream->level)
n = stream->level;
memcpy (bufp, stream->cur, n);
_stream_advance_buffer (stream, n);
if (n > stream->level - stream->pos)
n = stream->level - stream->pos;
memcpy (bufp, _stream_curp (stream), n);
stream->pos += n;
nbytes += n;
bufp += n;
size -= n;
......@@ -676,21 +699,24 @@ _stream_scandelim (mu_stream_t stream, char *buf, size_t size, int delim,
return MU_ERR_BUFSPACE;
while (size)
{
char *p;
char *p, *q;
size_t len;
if (stream->level == 0)
if (stream->pos == stream->level)
{
if ((rc = _stream_fill_buffer (stream)) || stream->level == 0)
break;
}
p = memchr (stream->cur, delim, stream->level);
len = p ? p - stream->cur + 1 : stream->level;
q = _stream_curp (stream);
len = stream->level - stream->pos;
p = memchr (q, delim, len);
if (p)
len = p - q + 1;
if (len > size)
len = size;
memcpy (buf, stream->cur, len);
_stream_advance_buffer (stream, len);
memcpy (buf, _stream_curp (stream), len);
stream->pos += len;
buf += len;
size -= len;
nread += len;
......@@ -736,6 +762,8 @@ mu_stream_readdelim (mu_stream_t stream, char *buf, size_t size,
if (size == 0)
return EINVAL;
if (stream->buftype == mu_buffer_none)
{
if (stream->readdelim)
{
size_t nread;
......@@ -744,10 +772,11 @@ mu_stream_readdelim (mu_stream_t stream, char *buf, size_t size,
*pread = nread;
stream->offset += nread;
}
else if (stream->buftype != mu_buffer_none)
rc = _stream_scandelim (stream, buf, size, delim, pread);
else
rc = _stream_readdelim (stream, buf, size, delim, pread);
}
else
rc = _stream_scandelim (stream, buf, size, delim, pread);
return rc;
}
......@@ -847,7 +876,7 @@ mu_stream_write (mu_stream_t stream, const void *buf, size_t size,
int rc = 0;
if (stream->buftype == mu_buffer_none)
rc = mu_stream_write_unbuffered (stream, buf, size,
rc = _stream_write_unbuffered (stream, buf, size,
!pnwritten, pnwritten);
else
{
......@@ -868,8 +897,11 @@ mu_stream_write (mu_stream_t stream, const void *buf, size_t size,
n = _stream_buffer_freespace (stream);
if (n > size)
n = size;
memcpy (stream->cur + stream->level, bufp, n);
stream->level += n;
memcpy (_stream_curp (stream), bufp, n);
stream->pos += n;
if (stream->pos > stream->level)
stream->level = stream->pos;
nbytes += n;
bufp += n;
size -= n;
......
......@@ -279,6 +279,7 @@ mu_streamref_create_abridged (mu_stream_t *pref, mu_stream_t str,
mu_stream_ref (str);
sp->stream.read = _streamref_read;
if (str->readdelim)
sp->stream.readdelim = _streamref_readdelim;
sp->stream.write = _streamref_write;
sp->stream.flush = _streamref_flush;
......
......@@ -167,21 +167,28 @@ print_transcript (struct _mu_xscript_stream *str, int flag,
}
}
static int
_xscript_read (struct _mu_stream *str, char *buf, size_t bufsize,
size_t *pnread)
static void
_xscript_event_cb (mu_stream_t str, int ev, unsigned long size, void *ptr)
{
struct _mu_xscript_stream *sp = (struct _mu_xscript_stream *)str;
size_t nbytes;
int rc = mu_stream_read (sp->transport, buf, bufsize, &nbytes);
if (rc == 0)
switch (ev)
{
print_transcript (sp, TRANS_READ, buf, nbytes);
if (pnread)
*pnread = nbytes;
case _MU_STR_EVENT_FILLBUF:
print_transcript (sp, TRANS_READ, ptr, size);
break;
case _MU_STR_EVENT_FLUSHBUF:
print_transcript (sp, TRANS_WRITE, ptr, size);
}
return rc;
}
static int
_xscript_read (struct _mu_stream *str, char *buf, size_t bufsize,
size_t *pnread)
{
struct _mu_xscript_stream *sp = (struct _mu_xscript_stream *)str;
return mu_stream_read (sp->transport, buf, bufsize, pnread);
}
static int
......@@ -189,15 +196,7 @@ _xscript_readdelim (struct _mu_stream *str, char *buf, size_t bufsize,
int delim, size_t *pnread)
{
struct _mu_xscript_stream *sp = (struct _mu_xscript_stream *)str;
size_t nread;
int rc = mu_stream_readdelim (sp->transport, buf, bufsize, delim, &nread);
if (rc == 0)
{
print_transcript (sp, TRANS_READ, buf, nread);
if (pnread)
*pnread = nread;
}
return rc;
return mu_stream_readdelim (sp->transport, buf, bufsize, delim, pnread);
}
static int
......@@ -205,11 +204,7 @@ _xscript_write (struct _mu_stream *str, const char *buf, size_t bufsize,
size_t *pnwrite)
{
struct _mu_xscript_stream *sp = (struct _mu_xscript_stream *)str;
int rc = mu_stream_write (sp->transport, buf, bufsize, pnwrite);
if (rc == 0)
print_transcript (sp, TRANS_WRITE, buf, pnwrite ? *pnwrite : bufsize);
return rc;
return mu_stream_write (sp->transport, buf, bufsize, pnwrite);
}
static int
......@@ -318,7 +313,6 @@ _xscript_ctl (struct _mu_stream *str, int op, void *arg)
case MU_IOCTL_GET_TRANSPORT_BUFFER:
case MU_IOCTL_SET_TRANSPORT_BUFFER:
{
struct mu_transport_buffer_query *qp = arg;
if (!sp->transport)
return EINVAL;
return mu_stream_ioctl (sp->transport, op, arg);
......@@ -391,6 +385,7 @@ mu_xscript_stream_create(mu_stream_t *pref, mu_stream_t transport,
return ENOMEM;
sp->stream.read = _xscript_read;
if (transport->readdelim)
sp->stream.readdelim = _xscript_readdelim;
sp->stream.write = _xscript_write;
sp->stream.flush = _xscript_flush;
......@@ -404,7 +399,9 @@ mu_xscript_stream_create(mu_stream_t *pref, mu_stream_t transport,
sp->stream.truncate = _xscript_truncate;
sp->stream.shutdown = _xscript_shutdown;
sp->stream.error_string = _xscript_error_string;
sp->stream.event_cb = _xscript_event_cb;
sp->stream.event_mask = _MU_STR_EVMASK(_MU_STR_EVENT_FILLBUF) |
_MU_STR_EVMASK(_MU_STR_EVENT_FLUSHBUF);
if (!(flags & MU_STREAM_AUTOCLOSE))
{
mu_stream_ref (transport);
......