Commit e9580ebf e9580ebf7bd77a9d1c456c2138d38258275b454c by Sergey Poznyakoff

Improve stream statistics interface. Introduce "null stream".

* include/mailutils/stream.h (MU_IOCTL_NULLSTREAM_SET_PATTERN)
(MU_IOCTL_NULLSTREAM_SET_PATCLASS)
(MU_IOCTL_NULLSTREAM_SETSIZE)
(MU_IOCTL_NULLSTREAM_CLRSIZE): New ioctl opcodes.
(mu_nullstream_pattern): New struct.
(MU_STREAM_STAT_IN,MU_STREAM_STAT_OUT)
(MU_STREAM_STAT_READS,MU_STREAM_STAT_WRITES)
(MU_STREAM_STAT_SEEKS,_MU_STREAM_STAT_MAX): New defines.
(MU_STREAM_STAT_MASK): New macro.
(MU_STREAM_STAT_MASK_ALL): New define.
(mu_stream_stat_buffer): New typedef.
(mu_stream_set_stat, mu_stream_get_stat)
(mu_nullstream_create): New protos.
(mu_stream_bytes_in, mu_stream_bytes_out): Remove protos.

* include/mailutils/sys/nullstream.h: New file.
* include/mailutils/sys/stream.h (_mu_stream) <bytes_in>,
<bytes_out>: Remove.
<statmask,statbuf>: New members.
* include/mailutils/sys/Makefile.am (sysinclude_HEADERS): Add nullstream.h.

* libmailutils/stream/nullstream.c: New file.
* libmailutils/stream/stream.c (_stream_read, _stream_write)
(_stream_seek, _stream_stat_incr): New macros.
(_stream_fill_buffer, mu_stream_seek)
(_stream_read_unbuffered)
(_stream_write_unbuffered): Use _stream_read, _stream_write
and _stream_seek instead of calling the corresponding methods directly.
(mu_stream_bytes_in, mu_stream_bytes_out): Remove.
(mu_stream_unref): Clear statbuf, if provided.
(mu_stream_set_stat, mu_stream_get_stat): New functions.

* libmailutils/stream/Makefile.am (libstream_la_SOURCES): Add nullstream.c.

* libmailutils/stream/fltstream.c (filter_wr_close): Check for fs->eof.

* examples/base64.c (c_copy): Use new statistics interface.
* libmailutils/tests/fltst.c: Likewise.
1 parent 90af346d
......@@ -33,6 +33,22 @@ int printable = 0;
static void
c_copy (mu_stream_t out, mu_stream_t in)
{
mu_stream_stat_buffer instat, outstat;
if (verbose)
{
mu_stream_set_stat (in,
MU_STREAM_STAT_MASK (MU_STREAM_STAT_IN) |
MU_STREAM_STAT_MASK (MU_STREAM_STAT_READS) |
MU_STREAM_STAT_MASK (MU_STREAM_STAT_SEEKS),
instat);
mu_stream_set_stat (out,
MU_STREAM_STAT_MASK (MU_STREAM_STAT_OUT) |
MU_STREAM_STAT_MASK (MU_STREAM_STAT_WRITES) |
MU_STREAM_STAT_MASK (MU_STREAM_STAT_SEEKS),
outstat);
}
if (printable)
{
char c;
......@@ -58,9 +74,23 @@ c_copy (mu_stream_t out, mu_stream_t in)
mu_stream_close (out);
mu_stream_close (in);
if (verbose)
fprintf (stderr, "\ntotal: %lu/%lu bytes\n",
(unsigned long) mu_stream_bytes_in (in),
(unsigned long) mu_stream_bytes_out (out));
{
fprintf (stderr, "\nInput stats:\n");
fprintf (stderr, "Bytes in: %lu\n",
(unsigned long) instat[MU_STREAM_STAT_IN]);
fprintf (stderr, "Reads: %lu\n",
(unsigned long) instat[MU_STREAM_STAT_READS]);
fprintf (stderr, "Seeks: %lu\n",
(unsigned long) instat[MU_STREAM_STAT_SEEKS]);
fprintf (stderr, "\nOutput stats:\n");
fprintf (stderr, "Bytes out: %lu\n",
(unsigned long) outstat[MU_STREAM_STAT_OUT]);
fprintf (stderr, "Writes: %lu\n",
(unsigned long) outstat[MU_STREAM_STAT_WRITES]);
fprintf (stderr, "Seeks: %lu\n",
(unsigned long) outstat[MU_STREAM_STAT_SEEKS]);
}
}
/* Set the maximum line length for the filter NAME to LENGTH.
......
......@@ -76,12 +76,31 @@ enum mu_buffer_type
#define MU_IOCTL_GET_ECHO 12
#define MU_IOCTL_SET_ECHO 13
/* The following ioctls are for nullstreams only: */
#define MU_IOCTL_NULLSTREAM_SET_PATTERN 14
/* Set read pattern.
Argument: struct mu_nullstream_pattern *pat.
If pat==NULL, any reads from that stream will return EOF. */
#define MU_IOCTL_NULLSTREAM_SET_PATCLASS 15
/* Set pattern class for reads: Argument int *pclass (a class mask
from mailutils/cctype.h */
#define MU_IOCTL_NULLSTREAM_SETSIZE 16
/* Limit stream size. Argument: mu_off_t *psize; */
#define MU_IOCTL_NULLSTREAM_CLRSIZE 17
/* Lift the size limit. Argument: NULL */
#define MU_TRANSPORT_INPUT 0
#define MU_TRANSPORT_OUTPUT 1
#define MU_TRANSPORT_VALID_TYPE(n) \
((n) == MU_TRANSPORT_INPUT || (n) == MU_TRANSPORT_OUTPUT)
struct mu_nullstream_pattern
{
char *pattern;
size_t size;
};
struct mu_buffer_query
{
int type; /* One of MU_TRANSPORT_ defines */
......@@ -89,6 +108,28 @@ struct mu_buffer_query
size_t bufsize; /* Buffer size */
};
/* Statistics */
#define MU_STREAM_STAT_IN 0
#define MU_STREAM_STAT_OUT 1
#define MU_STREAM_STAT_READS 2
#define MU_STREAM_STAT_WRITES 3
#define MU_STREAM_STAT_SEEKS 4
#define _MU_STREAM_STAT_MAX 5
#define MU_STREAM_STAT_MASK(n) (1U<<(n+1))
#define MU_STREAM_STAT_MASK_ALL \
(MU_STREAM_STAT_MASK (MU_STREAM_STAT_IN) | \
MU_STREAM_STAT_MASK (MU_STREAM_STAT_OUT) | \
MU_STREAM_STAT_MASK (MU_STREAM_STAT_READS) | \
MU_STREAM_STAT_MASK (MU_STREAM_STAT_WRITES) | \
MU_STREAM_STAT_MASK (MU_STREAM_STAT_SEEKS))
typedef mu_off_t mu_stream_stat_buffer[_MU_STREAM_STAT_MAX];
int mu_stream_set_stat (mu_stream_t stream, int statmask,
mu_stream_stat_buffer statbuf);
int mu_stream_get_stat (mu_stream_t stream, int *pstatmask,
mu_off_t **pstatbuf);
#define MU_STREAM_DEFBUFSIZ 8192
extern size_t mu_stream_default_buffer_size;
......@@ -127,8 +168,6 @@ int mu_stream_writeline (mu_stream_t stream, const char *buf, size_t size);
int mu_stream_flush (mu_stream_t stream);
int mu_stream_close (mu_stream_t stream);
int mu_stream_size (mu_stream_t stream, mu_off_t *psize);
mu_off_t mu_stream_bytes_in (mu_stream_t stream);
mu_off_t mu_stream_bytes_out (mu_stream_t stream);
int mu_stream_ioctl (mu_stream_t stream, int code, void *ptr);
int mu_stream_truncate (mu_stream_t stream, mu_off_t);
int mu_stream_shutdown (mu_stream_t stream, int how);
......@@ -202,6 +241,8 @@ int mu_dbgstream_create(mu_stream_t *pref, mu_debug_t debug,
int mu_rdcache_stream_create (mu_stream_t *pstream, mu_stream_t transport,
int flags);
int mu_nullstream_create (mu_stream_t *pref, int flags);
#ifdef __cplusplus
}
#endif
......
......@@ -42,6 +42,7 @@ sysinclude_HEADERS = \
mime.h\
monitor.h\
nntp.h\
nullstream.h\
observer.h\
pop3.h\
prog_stream.h\
......
/* GNU Mailutils -- a suite of utilities for electronic mail
Copyright (C) 2010 Free Software Foundation, Inc.
This library is free software; you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3, or (at your option)
any later version.
This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with GNU Mailutils. If not, see <http://www.gnu.org/licenses/>. */
#ifndef _MAILUTILS_SYS_NULLSTREAM_H
#define _MAILUTILS_SYS_NULLSTREAM_H
#include <mailutils/types.h>
#include <mailutils/sys/stream.h>
#define MU_NULLSTREAM_SIZE 0x0001 /* Stream has a limited size */
#define MU_NULLSTREAM_PATSTAT 0x0002 /* Pattern is allocated statically */
struct _mu_nullstream
{
struct _mu_stream base; /* Superclass */
int mode; /* Stream mode */
mu_off_t size; /* Stream size */
char *pattern; /* Fill pattern */
size_t patsize; /* Size of pattern */
};
#endif
......@@ -45,7 +45,9 @@ struct _mu_stream
int flags;
mu_off_t offset;
mu_off_t bytes_in, bytes_out;
int statmask;
mu_off_t *statbuf;
int last_err;
......
......@@ -25,6 +25,7 @@ libstream_la_SOURCES = \
mapfile_stream.c\
memory_stream.c\
message_stream.c\
nullstream.c\
prog_stream.c\
rdcache_stream.c\
socket_stream.c\
......
......@@ -407,7 +407,7 @@ static int
filter_wr_close (mu_stream_t stream)
{
struct _mu_filter_stream *fs = (struct _mu_filter_stream *)stream;
if (!mu_stream_eof (stream))
if (!mu_stream_eof (stream) && !fs->eof)
{
size_t dummy;
int rc = filter_write_internal (stream, mu_filter_lastbuf, NULL, 0,
......
/* GNU Mailutils -- a suite of utilities for electronic mail
Copyright (C) 2010 Free Software Foundation, Inc.
This library is free software; you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3, or (at your option)
any later version.
This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with GNU Mailutils. If not, see <http://www.gnu.org/licenses/>. */
#ifdef HAVE_CONFIG_H
# include <config.h>
#endif
#include <stdlib.h>
#include <errno.h>
#include <string.h>
#include <mailutils/types.h>
#include <mailutils/stream.h>
#include <mailutils/errno.h>
#include <mailutils/sys/nullstream.h>
#include <mailutils/stream.h>
#include <mailutils/cctype.h>
static int
_nullstream_read (struct _mu_stream *str, char *buf, size_t bufsize,
size_t *pnread)
{
struct _mu_nullstream *np = (struct _mu_nullstream *)str;
size_t i;
mu_off_t off;
if (np->pattern == NULL)
{
*pnread = 0;
return 0;
}
off = np->base.offset + np->base.pos;
for (i = 0; i < bufsize; i++, off++)
{
if ((np->mode & MU_NULLSTREAM_SIZE) && off >= np->size)
break;
*buf++ = np->pattern[off % np->patsize];
}
*pnread = i;
return 0;
}
static int
_nullstream_write (struct _mu_stream *str, const char *buf, size_t bufsize,
size_t *pnwrite)
{
*pnwrite = bufsize;
return 0;
}
static void
_nullstream_free_pattern (struct _mu_nullstream *np)
{
if (!(np->mode & MU_NULLSTREAM_PATSTAT))
{
free (np->pattern);
np->pattern = NULL;
np->patsize = 0;
np->mode &= ~MU_NULLSTREAM_PATSTAT;
}
}
static void
_nullstream_done (struct _mu_stream *str)
{
struct _mu_nullstream *np = (struct _mu_nullstream *)str;
_nullstream_free_pattern (np);
}
static int
_nullstream_seek (struct _mu_stream *str, mu_off_t off, mu_off_t *ppos)
{
struct _mu_nullstream *np = (struct _mu_nullstream *)str;
if ((np->mode & MU_NULLSTREAM_SIZE) && off >= np->size)
return ESPIPE;
*ppos = off;
return 0;
}
static int
_nullstream_size (struct _mu_stream *str, mu_off_t *psize)
{
struct _mu_nullstream *np = (struct _mu_nullstream *)str;
*psize = (np->mode & MU_NULLSTREAM_SIZE) ? np->size : 0;
return 0;
}
static int
_nullstream_truncate (struct _mu_stream *str, mu_off_t size)
{
struct _mu_nullstream *np = (struct _mu_nullstream *)str;
np->base.size = _nullstream_size;
np->size = size;
np->mode |= MU_NULLSTREAM_SIZE;
return 0;
}
static int
_nullstream_ctl (struct _mu_stream *str, int op, void *arg)
{
struct _mu_nullstream *np = (struct _mu_nullstream *)str;
switch (op)
{
case MU_IOCTL_NULLSTREAM_SET_PATTERN:
if (!arg)
_nullstream_free_pattern (np);
else
{
struct mu_nullstream_pattern *pat = arg;
char *p;
p = malloc (pat->size);
if (!p)
return ENOMEM;
memcpy (p, pat->pattern, pat->size);
_nullstream_free_pattern (np);
np->pattern = p;
np->patsize = pat->size;
}
break;
case MU_IOCTL_NULLSTREAM_SET_PATCLASS:
if (!arg)
return EINVAL;
else
{
char buf[256];
int cnt = 0, i;
int class = *(int*)arg;
char *p;
for (i = 0; i < 256; i++)
{
if (mu_c_is_class (i, class))
buf[cnt++] = i;
}
p = malloc (cnt);
if (!p)
return ENOMEM;
memcpy (p, buf, cnt);
_nullstream_free_pattern (np);
np->pattern = p;
np->patsize = cnt;
}
break;
case MU_IOCTL_NULLSTREAM_SETSIZE:
if (!arg)
return EINVAL;
else
return _nullstream_truncate (str, *(mu_off_t*)arg);
break;
case MU_IOCTL_NULLSTREAM_CLRSIZE:
np->mode &= ~MU_NULLSTREAM_SIZE;
np->base.size = NULL;
break;
default:
return ENOSYS;
}
return 0;
}
int
mu_nullstream_create (mu_stream_t *pref, int flags)
{
struct _mu_nullstream *np;
np = (struct _mu_nullstream *)
_mu_stream_create (sizeof (*np),
flags | MU_STREAM_SEEK | _MU_STR_OPEN);
if (!np)
return ENOMEM;
np->base.read = _nullstream_read;
np->base.write = _nullstream_write;
np->base.seek = _nullstream_seek;
np->base.ctl = _nullstream_ctl;
np->base.truncate = _nullstream_truncate;
np->base.done = _nullstream_done;
np->pattern = "";
np->patsize = 0;
np->mode = MU_NULLSTREAM_PATSTAT;
*pref = (mu_stream_t) np;
mu_stream_set_buffer (*pref, mu_buffer_full, 0);
return 0;
}
......@@ -46,6 +46,19 @@ size_t mu_stream_default_buffer_size = MU_STREAM_DEFBUFSIZ;
} \
while (0)
#define _stream_stat_incr(s, k, n) \
(((s)->statmask & MU_STREAM_STAT_MASK(k)) ? ((s)->statbuf[k] += n) : 0)
#define _stream_read(str, buf, size, rdbytes) \
(_stream_stat_incr ((str), MU_STREAM_STAT_READS, 1), \
(str)->read (str, buf, size, rdbytes))
#define _stream_write(str, buf, size, wrbytes) \
(_stream_stat_incr ((str), MU_STREAM_STAT_WRITES, 1), \
(str)->write (str, buf, size, wrbytes))
#define _stream_seek(str, pos, poff) \
(_stream_stat_incr ((str), MU_STREAM_STAT_SEEKS, 1), \
(str)->seek (str, pos, poff))
static int _stream_read_unbuffered (mu_stream_t stream, void *buf, size_t size,
int full_read, size_t *pnread);
static int _stream_write_unbuffered (mu_stream_t stream,
......@@ -180,7 +193,7 @@ _stream_flush_buffer (struct _mu_stream *stream, int flags)
if ((stream->flags & MU_STREAM_SEEK) && stream->seek)
{
mu_off_t off;
rc = stream->seek (stream, stream->offset, &off);
rc = _stream_seek (stream, stream->offset, &off);
if (rc)
return rc;
}
......@@ -306,7 +319,9 @@ mu_stream_unref (mu_stream_t stream)
static void
_stream_init (mu_stream_t stream)
{
stream->bytes_in = stream->bytes_out = 0;
if (stream->statmask)
memset (stream->statbuf, 0,
_MU_STREAM_STAT_MAX * sizeof (stream->statbuf[0]));
stream->flags &= ~_MU_STR_INTERN_MASK;
_stream_setflag (stream, _MU_STR_OPEN);
stream->offset = 0;
......@@ -431,7 +446,7 @@ mu_stream_seek (mu_stream_t stream, mu_off_t offset, int whence,
return rc;
if (stream->offset != offset)
{
rc = stream->seek (stream, offset, &stream->offset);
rc = _stream_seek (stream, offset, &stream->offset);
if (rc == ESPIPE)
return rc;
if (rc)
......@@ -602,7 +617,7 @@ _stream_read_unbuffered (mu_stream_t stream, void *buf, size_t size,
nread = 0;
while (size > 0
&& (rc = stream->read (stream, buf, size, &rdbytes)) == 0)
&& (rc = _stream_read (stream, buf, size, &rdbytes)) == 0)
{
if (rdbytes == 0)
{
......@@ -612,19 +627,19 @@ _stream_read_unbuffered (mu_stream_t stream, void *buf, size_t size,
buf += rdbytes;
nread += rdbytes;
size -= rdbytes;
stream->bytes_in += rdbytes;
_stream_stat_incr (stream, MU_STREAM_STAT_IN, rdbytes);
}
if (size && rc)
rc = mu_stream_seterr (stream, rc, 0);
}
else
{
rc = stream->read (stream, buf, size, &nread);
rc = _stream_read (stream, buf, size, &nread);
if (rc == 0)
{
if (nread == 0)
_stream_setflag (stream, _MU_STR_EOF);
stream->bytes_in += nread;
_stream_stat_incr (stream, MU_STREAM_STAT_IN, nread);
}
mu_stream_seterr (stream, rc, rc != 0);
}
......@@ -665,7 +680,7 @@ _stream_write_unbuffered (mu_stream_t stream,
nwritten = 0;
while (size > 0
&& (rc = stream->write (stream, bufp, size, &wrbytes))
&& (rc = _stream_write (stream, bufp, size, &wrbytes))
== 0)
{
if (wrbytes == 0)
......@@ -676,14 +691,14 @@ _stream_write_unbuffered (mu_stream_t stream,
bufp += wrbytes;
nwritten += wrbytes;
size -= wrbytes;
stream->bytes_out += wrbytes;
_stream_stat_incr (stream, MU_STREAM_STAT_OUT, wrbytes);
}
}
else
{
rc = stream->write (stream, buf, size, &nwritten);
rc = _stream_write (stream, buf, size, &nwritten);
if (rc == 0)
stream->bytes_out += nwritten;
_stream_stat_incr (stream, MU_STREAM_STAT_OUT, nwritten);
}
_stream_setflag (stream, _MU_STR_WRT);
if (pnwritten)
......@@ -1101,18 +1116,6 @@ mu_stream_size (mu_stream_t stream, mu_off_t *psize)
return mu_stream_seterr (stream, rc, rc != 0);
}
mu_off_t
mu_stream_bytes_in (mu_stream_t stream)
{
return stream->bytes_in;
}
mu_off_t
mu_stream_bytes_out (mu_stream_t stream)
{
return stream->bytes_out;
}
int
mu_stream_ioctl (mu_stream_t stream, int code, void *ptr)
{
......@@ -1215,3 +1218,27 @@ mu_stream_clr_flags (mu_stream_t stream, int fl)
return 0;
}
int
mu_stream_set_stat (mu_stream_t stream, int statmask,
mu_stream_stat_buffer statbuf)
{
if (stream == NULL)
return EINVAL;
stream->statmask = statmask;
stream->statbuf = statbuf;
memset (stream->statbuf, 0,
_MU_STREAM_STAT_MAX * sizeof (stream->statbuf[0]));
return 0;
}
int
mu_stream_get_stat (mu_stream_t stream, int *pstatmask,
mu_off_t **pstatbuf)
{
if (stream == NULL)
return EINVAL;
*pstatmask = stream->statmask;
*pstatbuf = stream->statbuf;
return 0;
}
......
......@@ -29,10 +29,21 @@
int verbose = 0;
int printable = 0;
mu_stream_stat_buffer instat, outstat;
static void
c_copy (mu_stream_t out, mu_stream_t in)
{
if (verbose)
{
mu_stream_set_stat (in,
MU_STREAM_STAT_MASK_ALL,
instat);
mu_stream_set_stat (out,
MU_STREAM_STAT_MASK_ALL,
outstat);
}
if (printable)
{
char c;
......@@ -55,11 +66,6 @@ c_copy (mu_stream_t out, mu_stream_t in)
else
MU_ASSERT (mu_stream_copy (out, in, 0, NULL));
if (verbose)
fprintf (stderr, "\ntotal: %lu/%lu bytes\n",
(unsigned long) mu_stream_bytes_in (in),
(unsigned long) mu_stream_bytes_out (out));
}
/* Set the maximum line length for the filter NAME to LENGTH.
......@@ -186,5 +192,28 @@ main (int argc, char * argv [])
mu_stream_close (out);
mu_stream_destroy (&out);
if (verbose)
{
fprintf (stderr, "\nInput stream stats:\n");
fprintf (stderr, "Bytes in: %lu\n",
(unsigned long) instat[MU_STREAM_STAT_IN]);
fprintf (stderr, "Bytes out: %lu\n",
(unsigned long) instat[MU_STREAM_STAT_OUT]);
fprintf (stderr, "Reads: %lu\n",
(unsigned long) instat[MU_STREAM_STAT_READS]);
fprintf (stderr, "Seeks: %lu\n",
(unsigned long) instat[MU_STREAM_STAT_SEEKS]);
fprintf (stderr, "\nOutput stream stats:\n");
fprintf (stderr, "Bytes in: %lu\n",
(unsigned long) outstat[MU_STREAM_STAT_IN]);
fprintf (stderr, "Bytes out: %lu\n",
(unsigned long) outstat[MU_STREAM_STAT_OUT]);
fprintf (stderr, "Writes: %lu\n",
(unsigned long) outstat[MU_STREAM_STAT_WRITES]);
fprintf (stderr, "Seeks: %lu\n",
(unsigned long) outstat[MU_STREAM_STAT_SEEKS]);
}
return 0;
}
......