Commit f96ce94e f96ce94e2ad041c0f6011eafb74e1799e1f90eb0 by Sergey Poznyakoff

Optimize readline/readdelim/getdelim calls.

This speeds up common reading operations by factor of 4-6.

* include/mailutils/stream.h (mu_stream_readdelim): New proto.
* include/mailutils/sys/stream.h (_mu_stream) <readdelim>: New method.
* mailbox/stream.c (_stream_scandelim, _stream_readdelim): New functions.
(mu_stream_readdelim): New function.
(mu_stream_readline): Rewrite using mu_stream_readdelim.
(mu_stream_getdelim): Optimize.

* mailbox/amd.c (amd_body_stream_readdelim): New function.
(_amd_attach_message): Set the readdelim method.
* mailbox/header.c: Add a placeholder for readdelim method.
* mailbox/message.c (_message_stream_readdelim): New function.
(_message_stream_create): Set the readdelim method.
* mailbox/streamref.c (_streamref_readdelim): New function.
(mu_streamref_create_abridged): Set the readdelim method.
1 parent 03aa45f1
......@@ -74,6 +74,8 @@ int mu_stream_seek (mu_stream_t stream, mu_off_t offset, int whence,
int mu_stream_set_buffer (mu_stream_t stream, enum mu_buffer_type type,
size_t size);
int mu_stream_read (mu_stream_t stream, void *buf, size_t size, size_t *pread);
int mu_stream_readdelim (mu_stream_t stream, char *buf, size_t size,
int delim, size_t *pread);
int mu_stream_readline (mu_stream_t stream, char *buf, size_t size, size_t *pread);
int mu_stream_getdelim (mu_stream_t stream, char **pbuf, size_t *psize,
int delim, size_t *pread);
......
......@@ -41,6 +41,7 @@ struct _mu_stream
int last_err;
int (*read) (struct _mu_stream *, char *, size_t, size_t *);
int (*readdelim) (struct _mu_stream *, char *, size_t, int, size_t *);
int (*write) (struct _mu_stream *, const char *, size_t, size_t *);
int (*flush) (struct _mu_stream *);
int (*open) (struct _mu_stream *);
......
......@@ -117,6 +117,10 @@ static int amd_envelope_sender (mu_envelope_t envelope, char *buf, size_t len,
static int amd_body_stream_read (mu_stream_t str, char *buffer,
size_t buflen,
size_t *pnread);
static int amd_body_stream_readdelim (mu_stream_t is,
char *buffer, size_t buflen,
int delim,
size_t *pnread);
static int amd_body_stream_size (mu_stream_t str, mu_off_t *psize);
static int amd_body_stream_seek (mu_stream_t str, mu_off_t off, int whence,
mu_off_t *presult);
......@@ -487,6 +491,7 @@ _amd_attach_message (mu_mailbox_t mailbox, struct _amd_message *mhm,
return ENOMEM;
}
str->stream.read = amd_body_stream_read;
str->stream.readdelim = amd_body_stream_readdelim;
str->stream.size = amd_body_stream_size;
str->stream.seek = amd_body_stream_seek;
mu_body_set_stream (body, (mu_stream_t) str, msg);
......@@ -1635,13 +1640,75 @@ amd_body_stream_read (mu_stream_t is, char *buffer, size_t buflen,
}
static int
amd_body_stream_readdelim (mu_stream_t is, char *buffer, size_t buflen,
int delim,
size_t *pnread)
{
struct _amd_body_stream *amdstr = (struct _amd_body_stream *)is;
mu_body_t body = amdstr->body;
mu_message_t msg = mu_body_get_owner (body);
struct _amd_message *mhm = mu_message_get_owner (msg);
size_t nread = 0;
int status = 0;
amd_pool_open (mhm);
if (buffer == NULL || buflen == 0)
{
if (pnread)
*pnread = nread;
return 0;
}
mu_monitor_rdlock (mhm->amd->mailbox->monitor);
#ifdef WITH_PTHREAD
/* read() is cancellation point since we're doing a potentially
long operation. Lets make sure we clean the state. */
pthread_cleanup_push (amd_cleanup, (void *)mhm->amd->mailbox);
#endif
status = mu_stream_seek (mhm->stream, mhm->body_start + amdstr->off,
MU_SEEK_SET, NULL);
if (status)
{
buflen--;
while (buflen)
{
size_t ln, rdsize;
ln = mhm->body_end - (mhm->body_start + amdstr->off);
if (ln > 0)
{
rdsize = ((size_t)ln < buflen) ? (size_t)ln : buflen;
status = mu_stream_readdelim (mhm->stream, buffer, rdsize,
delim, &rdsize);
amdstr->off += nread;
nread += rdsize;
if (status)
break;
buflen -= rdsize;
buffer += rdsize;
}
}
}
mu_monitor_unlock (mhm->amd->mailbox->monitor);
#ifdef WITH_PTHREAD
pthread_cleanup_pop (0);
#endif
if (pnread)
*pnread = nread;
return status;
}
static int
amd_body_stream_seek (mu_stream_t str, mu_off_t off, int whence,
mu_off_t *presult)
{
size_t size;
struct _amd_body_stream *amdstr = (struct _amd_body_stream *)str;
mu_message_t msg = mu_body_get_owner (amdstr->body);
struct _amd_message *mhm = mu_message_get_owner (msg);
amd_body_size (amdstr->body, &size);
switch (whence)
......
......@@ -999,6 +999,59 @@ header_read (mu_stream_t is, char *buffer, size_t buflen, size_t *pnread)
return 0;
}
#if 0
/* FIXME: Implement header_readdelim based on this: */
static int
_header_readline (mu_stream_t is, char *buffer, size_t buflen, size_t *pnread)
{
struct _mu_header_stream *hstr = (struct _mu_header_stream *) is;
mu_header_t header = hstr->hdr;
struct mu_hdrent *ent;
size_t ent_off;
int status;
size_t strsize;
char *start, *end;
if (buflen == 0)
return EINVAL;
header = mu_stream_get_owner (is);
status = mu_header_fill (header);
if (status)
return status;
if (mu_hdrent_find_stream_pos (header, hstr->off, &ent, &ent_off))
{
if (pnread)
*pnread = 0;
return 0;
}
buflen--; /* Account for the terminating nul */
mu_hdrent_fixup (header, ent);
strsize = MU_STR_SIZE (ent->nlen, ent->vlen) - ent_off;
start = MU_HDRENT_NAME (header, ent) + ent_off;
end = strchr (start, '\n');
if (end)
{
size_t len = end - start + 1;
if (len < strsize)
strsize = len;
}
if (strsize < buflen)
buflen = strsize;
memcpy (buffer, start, buflen);
buffer[buflen] = 0;
hstr->off += buflen;
mu_hdrent_unroll_fixup (header, ent);
if (pnread)
*pnread = buflen;
return 0;
}
#endif
static int
header_write (mu_stream_t os, const char *buf, size_t buflen, size_t *pnwrite)
{
......
......@@ -243,6 +243,31 @@ _message_stream_read (struct _mu_stream *str, char *buf, size_t bufsize,
return rc;
}
static int
_message_stream_readdelim (struct _mu_stream *str, char *buf, size_t bufsize,
int delim, size_t *pnread)
{
struct _mu_message_stream *sp = (struct _mu_message_stream *)str;
size_t nread = 0;
int rc;
while (bufsize)
{
size_t n;
rc = _check_stream_state (sp);
if (rc)
break;
if (sp->state == _mss_eof)
break;
rc = mu_stream_readdelim (sp->transport, buf, bufsize, delim, &n);
nread += n;
buf += n;
bufsize -= n;
}
*pnread = nread;
return rc;
}
#if 0
static int
_message_stream_write (struct _mu_stream *str,
......@@ -266,6 +291,7 @@ _message_stream_create (mu_stream_t *pmsg, mu_message_t msg, int flags)
return ENOMEM;
sp->stream.read = _message_stream_read;
sp->stream.readdelim = _message_stream_readdelim;
/* FIXME: Write is not defined */
/* sp->stream.write = _message_stream_write;*/
sp->stream.done = _message_stream_done;
......
......@@ -536,22 +536,54 @@ mu_stream_read (mu_stream_t stream, void *buf, size_t size, size_t *pread)
}
int
mu_stream_readline (mu_stream_t stream, char *buf, size_t size, size_t *pread)
_stream_scandelim (mu_stream_t stream, char *buf, size_t size, int delim,
size_t *pnread)
{
int rc = 0;
size_t nread = 0;
size--;
while (size)
{
char *p;
size_t len;
if (stream->level == 0)
{
if ((rc = _stream_fill_buffer (stream)) || stream->level == 0)
break;
}
p = memchr (stream->cur, delim, stream->level);
len = p ? p - stream->cur + 1 : stream->level;
if (len > size)
len = size;
memcpy (buf, stream->cur, len);
_stream_advance_buffer (stream, len);
buf += len;
size -= len;
nread += len;
}
*buf = 0;
*pnread = nread;
return rc;
}
static int
_stream_readdelim (mu_stream_t stream, char *buf, size_t size,
int delim, size_t *pread)
{
int rc;
char c;
size_t n = 0, rdn;
if (size == 0)
return EINVAL;
size--;
for (n = 0;
n < size && (rc = mu_stream_read (stream, &c, 1, &rdn)) == 0 && rdn;)
{
*buf++ = c;
n++;
if (c == '\n')
if (c == delim)
break;
}
*buf = 0;
......@@ -561,6 +593,30 @@ mu_stream_readline (mu_stream_t stream, char *buf, size_t size, size_t *pread)
}
int
mu_stream_readdelim (mu_stream_t stream, char *buf, size_t size,
int delim, size_t *pread)
{
int rc;
if (size == 0)
return EINVAL;
if (stream->readdelim)
rc = stream->readdelim (stream, buf, size, delim, pread);
else if (stream->buftype != mu_buffer_none)
rc = _stream_scandelim (stream, buf, size, delim, pread);
else
rc = _stream_readdelim (stream, buf, size, delim, pread);
return rc;
}
int
mu_stream_readline (mu_stream_t stream, char *buf, size_t size, size_t *pread)
{
return mu_stream_readdelim (stream, buf, size, '\n', pread);
}
int
mu_stream_getdelim (mu_stream_t stream, char **pbuf, size_t *psize,
int delim, size_t *pread)
{
......@@ -581,13 +637,8 @@ mu_stream_getdelim (mu_stream_t stream, char **pbuf, size_t *psize,
for (;;)
{
char c;
size_t rdn;
rc = mu_stream_read (stream, &c, 1, &rdn);
if (rc || rdn == 0)
break;
/* Make enough space for len+1 (for final NUL) bytes. */
if (cur_len + 1 >= n)
{
......@@ -615,10 +666,20 @@ mu_stream_getdelim (mu_stream_t stream, char **pbuf, size_t *psize,
n = needed;
}
lineptr[cur_len] = c;
cur_len++;
if (stream->readdelim)
rc = stream->readdelim (stream, lineptr + cur_len, n - cur_len, delim,
&rdn);
else if (stream->buftype != mu_buffer_none)
rc = _stream_scandelim (stream, lineptr + cur_len, n - cur_len, delim,
&rdn);
else
rc = mu_stream_read (stream, lineptr + cur_len, 1, &rdn);
if (rc || rdn == 0)
break;
cur_len += rdn;
if (c == delim)
if (lineptr[cur_len - 1] == delim)
break;
}
lineptr[cur_len] = '\0';
......
......@@ -69,6 +69,16 @@ _streamref_read (struct _mu_stream *str, char *buf, size_t bufsize,
}
static int
_streamref_readdelim (struct _mu_stream *str, char *buf, size_t bufsize,
int delim, size_t *pnread)
{
struct _mu_streamref *sp = (struct _mu_streamref *)str;
return streamref_return (sp, mu_stream_readdelim (sp->transport,
buf, bufsize,
delim, pnread));
}
static int
_streamref_write (struct _mu_stream *str, const char *buf, size_t bufsize,
size_t *pnwrite)
{
......@@ -264,6 +274,7 @@ mu_streamref_create_abridged (mu_stream_t *pref, mu_stream_t str,
mu_stream_ref (str);
sp->stream.read = _streamref_read;
sp->stream.readdelim = _streamref_readdelim;
sp->stream.write = _streamref_write;
sp->stream.flush = _streamref_flush;
sp->stream.open = _streamref_open;
......