Commit 62c5c62a 62c5c62a7e8fd3524227cd241986d6ce4711b239 by Sergey Poznyakoff

Implement "read cache" streams. Rewrite stdio and socket streams.

* include/mailutils/sys/stdio_stream.h: Remove.
* include/mailutils/sys/socket_stream.h: Remove.
* include/mailutils/sys/rdcache_stream.h: New file.
* include/mailutils/sys/Makefile.am: Update.
* mailbox/rdcache_stream.c: New file.
* mailbox/Makefile.am: Update.

* examples/mucat.c: New example.
* examples/musocio.c: New example.
* examples/Makefile.am (noinst_PROGRAMS): Build new examples.

* include/mailutils/stream.h (mu_fd_stream_create): New proto.
(mu_rdcache_stream_create): New proto.
* include/mailutils/sys/file_stream.h (_mu_file_stream_create): Change
prototype.
* mailbox/file_stream.c (fd_open): Raise the MU_STREAM_AUTOCLOSE bit.
(fd_ioctl): Support MU_IOCTL_SET_TRANSPORT.
(_mu_file_stream_create): Change signature. All uses updated.
Allocate a copy of the filename argument, unless it is NULL.
(mu_fd_stream_create): New function.

* mailbox/socket_stream.c: Rewrite using file_stream directly.
* mailbox/stdio_stream.c: Rewrite. Use rdcache_stream if
the seek capability is required on an input stream.
* mailbox/streamcpy.c (mu_stream_copy): Handle eventual
EACCES return from mu_stream_seek as equivalent to ENOSYS.
1 parent 69f7dcae
...@@ -49,9 +49,11 @@ noinst_PROGRAMS = \ ...@@ -49,9 +49,11 @@ noinst_PROGRAMS = \
49 mimetest\ 49 mimetest\
50 msg-send\ 50 msg-send\
51 mta\ 51 mta\
52 mucat\
52 muauth\ 53 muauth\
53 muemail\ 54 muemail\
54 murun\ 55 murun\
56 musocio\
55 $(NNTPCLIENT)\ 57 $(NNTPCLIENT)\
56 $(POP3CLIENT)\ 58 $(POP3CLIENT)\
57 sfrom\ 59 sfrom\
......
1 /* GNU Mailutils -- a suite of utilities for electronic mail
2 Copyright (C) 1999, 2000, 2001, 2002, 2005, 2007, 2010 Free Software
3 Foundation, Inc.
4
5 GNU Mailutils is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation; either version 3, or (at your option)
8 any later version.
9
10 GNU Mailutils is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with GNU Mailutils; if not, write to the Free Software
17 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
18 MA 02110-1301 USA */
19
20 #ifdef HAVE_CONFIG_H
21 # include <config.h>
22 #endif
23 #include <unistd.h>
24 #include <stdio.h>
25 #include <assert.h>
26 #include <ctype.h>
27 #include <string.h>
28 #include <mailutils/mailutils.h>
29
30 int
31 main (int argc, char * argv [])
32 {
33 int c;
34 mu_stream_t in, out;
35 int reread_option = 0;
36 mu_off_t reread_off;
37 int skip_option = 0;
38 mu_off_t skip_off;
39
40 while ((c = getopt (argc, argv, "hs:r:")) != EOF)
41 switch (c)
42 {
43 case 'r':
44 reread_option = 1;
45 reread_off = strtoul (optarg, NULL, 10);
46 break;
47
48 case 's':
49 skip_option = 1;
50 skip_off = strtoul (optarg, NULL, 10);
51 break;
52
53 case 'h':
54 printf ("usage: cat [-s off]\n");
55 exit (0);
56
57 default:
58 exit (1);
59 }
60
61
62 MU_ASSERT (mu_stdio_stream_create (&in, MU_STDIN_FD, MU_STREAM_SEEK));
63 MU_ASSERT (mu_stdio_stream_create (&out, MU_STDOUT_FD, 0));
64
65 if (skip_option)
66 {
67 mu_stream_printf (out, "skipping to %lu:\n",
68 (unsigned long) skip_off);
69 MU_ASSERT (mu_stream_seek (in, skip_off, MU_SEEK_SET, NULL));
70 }
71
72 MU_ASSERT (mu_stream_copy (out, in, 0));
73
74 if (reread_option)
75 {
76 mu_stream_printf (out, "rereading from %lu:\n",
77 (unsigned long) reread_off);
78 MU_ASSERT (mu_stream_seek (in, reread_off, MU_SEEK_SET, NULL));
79 MU_ASSERT (mu_stream_copy (out, in, 0));
80 }
81
82 mu_stream_close (in);
83 mu_stream_destroy (&in);
84 mu_stream_close (out);
85 mu_stream_destroy (&out);
86 return 0;
87 }
1 /* GNU Mailutils -- a suite of utilities for electronic mail
2 Copyright (C) 1999, 2000, 2001, 2002, 2005, 2007, 2010 Free Software
3 Foundation, Inc.
4
5 GNU Mailutils is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation; either version 3, or (at your option)
8 any later version.
9
10 GNU Mailutils is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with GNU Mailutils; if not, write to the Free Software
17 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
18 MA 02110-1301 USA */
19
20 #ifdef HAVE_CONFIG_H
21 # include <config.h>
22 #endif
23
24 #include <sys/types.h>
25 #include <sys/wait.h>
26 #include <unistd.h>
27 #include <stdio.h>
28 #include <assert.h>
29 #include <ctype.h>
30 #include <string.h>
31 #include <mailutils/mailutils.h>
32
33 int verbose;
34
35 void
36 ioloop (char *id, mu_stream_t in, mu_stream_t out)
37 {
38 char *buf = NULL;
39 size_t size = 0, n;
40 int rc;
41
42 while ((rc = mu_stream_getline (in, &buf, &size, &n)) == 0 && n > 0)
43 {
44 if (rc)
45 {
46 mu_error("%s: read error: %s", id, mu_stream_strerror (in, rc));
47 exit (1);
48 }
49 MU_ASSERT (mu_stream_write (out, buf, n, NULL));
50 }
51 mu_stream_flush (out);
52 if (verbose)
53 fprintf (stderr, "%s exited\n", id);
54 }
55
56 int
57 main (int argc, char * argv [])
58 {
59 mu_stream_t in, out, sock;
60 pid_t pid;
61 int status, c;
62
63 while ((c = getopt (argc, argv, "v")) != EOF)
64 switch (c)
65 {
66 case 'v':
67 verbose++;
68 break;
69
70 case 'h':
71 printf ("usage: musocio file\n");
72 return 0;
73
74 default:
75 return 1;
76 }
77
78 argc -= optind;
79 argv += optind;
80
81 if (argc != 2)
82 {
83 mu_error ("usage: musocio file");
84 return 1;
85 }
86
87 MU_ASSERT (mu_stdio_stream_create (&in, MU_STDIN_FD, 0));
88 mu_stream_set_buffer (in, mu_buffer_line, 1024);
89 MU_ASSERT (mu_stdio_stream_create (&out, MU_STDOUT_FD, 0));
90 mu_stream_set_buffer (out, mu_buffer_line, 1024);
91 MU_ASSERT (mu_socket_stream_create (&sock, argv[1], MU_STREAM_RDWR));
92 mu_stream_set_buffer (sock, mu_buffer_line, 1024);
93 MU_ASSERT (mu_stream_open (sock));
94
95 pid = fork ();
96 if (pid == -1)
97 {
98 mu_error ("fork failed: %s", mu_strerror (errno));
99 return 1;
100 }
101
102 if (pid == 0)
103 {
104 mu_stream_close (in);
105 mu_stream_destroy (&in);
106 ioloop ("reader", sock, out);
107 exit (0);
108 }
109
110 ioloop ("writer", in, sock);
111
112 mu_stream_close (in);
113 mu_stream_destroy (&in);
114
115 mu_stream_shutdown (sock, MU_STREAM_WRITE);
116 waitpid (pid, &status, 0);
117
118 mu_stream_close (sock);
119 mu_stream_destroy (&sock);
120
121 mu_stream_close (out);
122 mu_stream_destroy (&out);
123 return 0;
124 }
...@@ -120,6 +120,8 @@ int mu_stream_copy (mu_stream_t dst, mu_stream_t src, size_t size); ...@@ -120,6 +120,8 @@ int mu_stream_copy (mu_stream_t dst, mu_stream_t src, size_t size);
120 120
121 int mu_file_stream_create (mu_stream_t *pstream, const char *filename, int flags); 121 int mu_file_stream_create (mu_stream_t *pstream, const char *filename, int flags);
122 int mu_temp_file_stream_create (mu_stream_t *pstream, const char *dir); 122 int mu_temp_file_stream_create (mu_stream_t *pstream, const char *dir);
123 int mu_fd_stream_create (mu_stream_t *pstream, char *filename, int fd,
124 int flags);
123 125
124 #define MU_STDIN_FD 0 126 #define MU_STDIN_FD 0
125 #define MU_STDOUT_FD 1 127 #define MU_STDOUT_FD 1
...@@ -157,4 +159,7 @@ int mu_iostream_create (mu_stream_t *pref, mu_stream_t in, mu_stream_t out); ...@@ -157,4 +159,7 @@ int mu_iostream_create (mu_stream_t *pref, mu_stream_t in, mu_stream_t out);
157 int mu_dbgstream_create(mu_stream_t *pref, mu_debug_t debug, 159 int mu_dbgstream_create(mu_stream_t *pref, mu_debug_t debug,
158 mu_log_level_t level, int flags); 160 mu_log_level_t level, int flags);
159 161
162 int mu_rdcache_stream_create (mu_stream_t *pstream, mu_stream_t transport,
163 int flags);
164
160 #endif 165 #endif
......
...@@ -49,9 +49,8 @@ sysinclude_HEADERS = \ ...@@ -49,9 +49,8 @@ sysinclude_HEADERS = \
49 pop3.h\ 49 pop3.h\
50 prog_stream.h\ 50 prog_stream.h\
51 property.h\ 51 property.h\
52 rdcache_stream.h\
52 registrar.h\ 53 registrar.h\
53 socket_stream.h\
54 stdio_stream.h\
55 streamref.h\ 54 streamref.h\
56 streamtrans.h\ 55 streamtrans.h\
57 stream.h\ 56 stream.h\
......
...@@ -31,7 +31,7 @@ struct _mu_file_stream ...@@ -31,7 +31,7 @@ struct _mu_file_stream
31 char *filename; 31 char *filename;
32 }; 32 };
33 33
34 int _mu_file_stream_create (mu_stream_t *pstream, size_t size, 34 int _mu_file_stream_create (struct _mu_file_stream **pstream, size_t size,
35 char *filename, int flags); 35 const char *filename, int fd, int flags);
36 36
37 #endif 37 #endif
......
1 /* GNU Mailutils -- a suite of utilities for electronic mail 1 /* GNU Mailutils -- a suite of utilities for electronic mail
2 Copyright (C) 2009 Free Software Foundation, Inc. 2 Copyright (C) 2010 Free Software Foundation, Inc.
3 3
4 This library is free software; you can redistribute it and/or modify 4 This library is free software; you can redistribute it and/or modify
5 it under the terms of the GNU Lesser General Public License as published by 5 it under the terms of the GNU Lesser General Public License as published by
...@@ -14,21 +14,20 @@ ...@@ -14,21 +14,20 @@
14 You should have received a copy of the GNU Lesser General Public License 14 You should have received a copy of the GNU Lesser General Public License
15 along with GNU Mailutils. If not, see <http://www.gnu.org/licenses/>. */ 15 along with GNU Mailutils. If not, see <http://www.gnu.org/licenses/>. */
16 16
17 #ifndef _MAILUTILS_SYS_STDIO_STREAM_H 17 #ifndef _MAILUTILS_SYS_RDCACHE_STREAM_H
18 #define _MAILUTILS_SYS_STDIO_STREAM_H 18 # define _MAILUTILS_SYS_RDCACHE_STREAM_H
19 19
20 #include <mailutils/sys/file_stream.h> 20 # include <mailutils/types.h>
21 # include <mailutils/stream.h>
22 # include <mailutils/sys/stream.h>
21 23
22 #define _MU_STDIO_SIZE_COMPUTED 0x02 24 struct _mu_rdcache_stream
23
24 struct _mu_stdio_stream
25 { 25 {
26 struct _mu_file_stream file_stream; 26 struct _mu_stream stream;
27 mu_stream_t transport;
27 mu_stream_t cache; 28 mu_stream_t cache;
28 mu_off_t size; 29 mu_off_t size;
29 mu_off_t offset; 30 mu_off_t offset;
30 }; 31 };
31 32
32 int _mu_stdio_stream_create (mu_stream_t *pstream, size_t size, int flags);
33
34 #endif 33 #endif
......
1 /* GNU Mailutils -- a suite of utilities for electronic mail
2 Copyright (C) 2009 Free Software Foundation, Inc.
3
4 This library is free software; you can redistribute it and/or modify
5 it under the terms of the GNU Lesser General Public License as published by
6 the Free Software Foundation; either version 3, or (at your option)
7 any later version.
8
9 This library is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU Lesser General Public License for more details.
13
14 You should have received a copy of the GNU Lesser General Public License
15 along with GNU Mailutils. If not, see <http://www.gnu.org/licenses/>. */
16
17 #ifndef _MAILUTILS_SYS_SOCKET_STREAM_H
18 #define _MAILUTILS_SYS_SOCKET_STREAM_H
19
20 #include <mailutils/sys/stdio_stream.h>
21
22 struct _mu_socket_stream
23 {
24 struct _mu_stdio_stream stdio_stream;
25 char *filename;
26 };
27
28 #endif
...@@ -126,6 +126,7 @@ libmailutils_la_SOURCES = \ ...@@ -126,6 +126,7 @@ libmailutils_la_SOURCES = \
126 prog_stream.c\ 126 prog_stream.c\
127 property.c\ 127 property.c\
128 qpflt.c\ 128 qpflt.c\
129 rdcache_stream.c\
129 registrar.c\ 130 registrar.c\
130 refcount.c\ 131 refcount.c\
131 rfc2047.c\ 132 rfc2047.c\
......
...@@ -146,6 +146,9 @@ fd_open (struct _mu_stream *str) ...@@ -146,6 +146,9 @@ fd_open (struct _mu_stream *str)
146 if (fd < 0) 146 if (fd < 0)
147 return errno; 147 return errno;
148 148
149 /* Make sure it will be closed */
150 fstr->flags |= MU_STREAM_AUTOCLOSE;
151
149 fstr->fd = fd; 152 fstr->fd = fd;
150 return 0; 153 return 0;
151 } 154 }
...@@ -207,6 +210,13 @@ fd_ioctl (struct _mu_stream *str, int code, void *ptr) ...@@ -207,6 +210,13 @@ fd_ioctl (struct _mu_stream *str, int code, void *ptr)
207 ptrans[1] = NULL; 210 ptrans[1] = NULL;
208 break; 211 break;
209 212
213 case MU_IOCTL_SET_TRANSPORT:
214 if (!ptr)
215 return EINVAL;
216 ptrans = ptr;
217 fstr->fd = (int) ptrans[0];
218 break;
219
210 default: 220 default:
211 return EINVAL; 221 return EINVAL;
212 } 222 }
...@@ -233,12 +243,11 @@ fd_truncate (mu_stream_t stream, mu_off_t size) ...@@ -233,12 +243,11 @@ fd_truncate (mu_stream_t stream, mu_off_t size)
233 } 243 }
234 244
235 int 245 int
236 _mu_file_stream_create (mu_stream_t *pstream, size_t size, 246 _mu_file_stream_create (struct _mu_file_stream **pstream, size_t size,
237 char *filename, int flags) 247 const char *filename, int fd, int flags)
238 { 248 {
239 struct _mu_file_stream *str = 249 struct _mu_file_stream *str =
240 (struct _mu_file_stream *) 250 (struct _mu_file_stream *) _mu_stream_create (size, flags);
241 _mu_stream_create (size, flags | MU_STREAM_SEEK);
242 if (!str) 251 if (!str)
243 return ENOMEM; 252 return ENOMEM;
244 253
...@@ -254,25 +263,38 @@ _mu_file_stream_create (mu_stream_t *pstream, size_t size, ...@@ -254,25 +263,38 @@ _mu_file_stream_create (mu_stream_t *pstream, size_t size,
254 str->stream.truncate = fd_truncate; 263 str->stream.truncate = fd_truncate;
255 str->stream.error_string = fd_error_string; 264 str->stream.error_string = fd_error_string;
256 265
257 str->filename = filename; 266 if (filename)
258 str->fd = -1; 267 str->filename = mu_strdup (filename);
268 else
269 str->filename = NULL;
270 str->fd = fd;
259 str->flags = 0; 271 str->flags = 0;
260 *pstream = (mu_stream_t) str; 272 *pstream = str;
261 return 0; 273 return 0;
262 } 274 }
263 275
264 int 276 int
265 mu_file_stream_create (mu_stream_t *pstream, const char *filename, int flags) 277 mu_file_stream_create (mu_stream_t *pstream, const char *filename, int flags)
266 { 278 {
267 int rc; 279 struct _mu_file_stream *fstr;
268 char *fname = mu_strdup (filename); 280 int rc = _mu_file_stream_create (&fstr,
269 if (!fname) 281 sizeof (struct _mu_file_stream),
270 return ENOMEM; 282 filename, -1,
271 rc = _mu_file_stream_create (pstream, 283 flags | MU_STREAM_SEEK | MU_STREAM_AUTOCLOSE);
272 sizeof (struct _mu_file_stream), 284 if (rc == 0)
273 fname, flags | MU_STREAM_AUTOCLOSE); 285 *pstream = (mu_stream_t) fstr;
274 if (rc) 286 return rc;
275 free (fname); 287 }
288
289 int
290 mu_fd_stream_create (mu_stream_t *pstream, char *filename, int fd, int flags)
291 {
292 struct _mu_file_stream *fstr;
293 int rc = _mu_file_stream_create (&fstr,
294 sizeof (struct _mu_file_stream),
295 filename, fd, flags);
296 if (rc == 0)
297 *pstream = (mu_stream_t) fstr;
276 return rc; 298 return rc;
277 } 299 }
278 300
......
1 /* GNU Mailutils -- a suite of utilities for electronic mail
2 Copyright (C) 2009, 2010 Free Software Foundation, Inc.
3
4 This library is free software; you can redistribute it and/or modify
5 it under the terms of the GNU Lesser General Public License as published by
6 the Free Software Foundation; either version 3, or (at your option)
7 any later version.
8
9 This library is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU Lesser General Public License for more details.
13
14 You should have received a copy of the GNU Lesser General Public License
15 along with GNU Mailutils. If not, see <http://www.gnu.org/licenses/>. */
16
17 #ifdef HAVE_CONFIG_H
18 # include <config.h>
19 #endif
20
21 #include <stdlib.h>
22 #include <errno.h>
23 #include <mailutils/types.h>
24 #include <mailutils/sys/rdcache_stream.h>
25
26 size_t mu_rdcache_stream_max_memory_size = 4096;
27
28 static int
29 rdcache_read (struct _mu_stream *str, char *buf, size_t size, size_t *pnbytes)
30 {
31 struct _mu_rdcache_stream *sp = (struct _mu_rdcache_stream *) str;
32 int status = 0;
33 size_t nbytes = 0;
34
35 if (sp->offset < sp->size)
36 {
37 status = mu_stream_read (sp->cache, buf, size, &nbytes);
38 if (status)
39 return status;
40 sp->offset += nbytes;
41 sp->size += nbytes;
42 buf += nbytes;
43 size -= nbytes;
44 }
45 else if (sp->offset > sp->size)
46 {
47 size_t left = sp->offset - sp->size;
48 status = mu_stream_seek (sp->cache, 0, MU_SEEK_END, NULL);
49 if (status)
50 return status;
51 status = mu_stream_copy (sp->cache, sp->transport, left);
52 if (status)
53 return status;
54 sp->size = sp->offset;
55 }
56
57 if (size)
58 {
59 size_t rdbytes;
60 status = mu_stream_read (sp->transport, buf, size, &rdbytes);
61 if (rdbytes)
62 {
63 int rc;
64
65 sp->offset += rdbytes;
66 sp->size += rdbytes;
67 nbytes += rdbytes;
68 rc = mu_stream_write (sp->cache, buf, rdbytes, NULL);
69 if (rc)
70 {
71 if (status == 0)
72 status = rc;
73 }
74 }
75 }
76 if (pnbytes)
77 *pnbytes = nbytes;
78 return status;
79 }
80
81 static int
82 rdcache_size (struct _mu_stream *str, off_t *psize)
83 {
84 struct _mu_rdcache_stream *sp = (struct _mu_rdcache_stream *) str;
85 *psize = sp->size;
86 return 0;
87 }
88
89 static int
90 rdcache_seek (struct _mu_stream *str, mu_off_t off, mu_off_t *presult)
91 {
92 struct _mu_rdcache_stream *sp = (struct _mu_rdcache_stream *) str;
93
94 if (off < 0)
95 return ESPIPE;
96
97 if (off < sp->size)
98 {
99 int status = mu_stream_seek (sp->cache, off, MU_SEEK_SET, NULL);
100 if (status)
101 return status;
102 }
103
104 sp->offset = off;
105 *presult = sp->offset;
106 return 0;
107 }
108
109 static int
110 rdcache_wait (struct _mu_stream *str, int *pflags, struct timeval *tvp)
111 {
112 struct _mu_rdcache_stream *sp = (struct _mu_rdcache_stream *) str;
113 return mu_stream_wait (sp->transport, pflags, tvp);
114 }
115
116 /* FIXME: Truncate? */
117
118 static int
119 rdcache_ioctl (struct _mu_stream *str, int op, void *arg)
120 {
121 struct _mu_rdcache_stream *sp = (struct _mu_rdcache_stream *) str;
122 mu_transport_t *ptrans;
123
124 switch (op)
125 {
126 case MU_IOCTL_GET_TRANSPORT:
127 if (!arg)
128 return EINVAL;
129 ptrans = arg;
130 ptrans[0] = (mu_transport_t) sp->transport;
131 ptrans[1] = NULL;
132 break;
133
134 default:
135 return EINVAL;
136 }
137 return 0;
138 }
139
140 static int
141 rdcache_open (struct _mu_stream *str)
142 {
143 struct _mu_rdcache_stream *sp = (struct _mu_rdcache_stream *) str;
144 return mu_stream_open (sp->transport);
145 }
146
147 static int
148 rdcache_close (struct _mu_stream *str)
149 {
150 struct _mu_rdcache_stream *sp = (struct _mu_rdcache_stream *) str;
151 return mu_stream_close (sp->transport);
152 }
153
154 static void
155 rdcache_done (struct _mu_stream *str)
156 {
157 struct _mu_rdcache_stream *sp = (struct _mu_rdcache_stream *) str;
158 mu_stream_unref (sp->transport);
159 mu_stream_unref (sp->cache);
160 }
161
162 int
163 mu_rdcache_stream_create (mu_stream_t *pstream, mu_stream_t transport,
164 int flags)
165 {
166 struct _mu_rdcache_stream *sp;
167 int rc;
168 int sflags = MU_STREAM_READ | MU_STREAM_SEEK | (flags & MU_STREAM_AUTOCLOSE);
169
170 if (flags & ~sflags)
171 return EINVAL;
172
173 sp = (struct _mu_rdcache_stream *)
174 _mu_stream_create (sizeof (*sp), sflags);
175 if (!sp)
176 return ENOMEM;
177
178 sp->stream.read = rdcache_read;
179 sp->stream.open = rdcache_open;
180 sp->stream.close = rdcache_close;
181 sp->stream.done = rdcache_done;
182 sp->stream.seek = rdcache_seek;
183 sp->stream.size = rdcache_size;
184 sp->stream.ctl = rdcache_ioctl;
185 sp->stream.wait = rdcache_wait;
186
187 if (!(flags & MU_STREAM_AUTOCLOSE))
188 mu_stream_ref (transport);
189 sp->transport = transport;
190
191 if ((rc = mu_memory_stream_create (&sp->cache, MU_STREAM_RDWR))
192 || (rc = mu_stream_open (sp->cache)))
193 {
194 mu_stream_destroy ((mu_stream_t*) &sp);
195 return rc;
196 }
197
198 *pstream = (mu_stream_t) sp;
199 return 0;
200 }
201
...@@ -31,22 +31,13 @@ ...@@ -31,22 +31,13 @@
31 #include <mailutils/errno.h> 31 #include <mailutils/errno.h>
32 #include <mailutils/nls.h> 32 #include <mailutils/nls.h>
33 #include <mailutils/stream.h> 33 #include <mailutils/stream.h>
34 #include <mailutils/sys/socket_stream.h> 34 #include <mailutils/sys/file_stream.h>
35
36 static void
37 _socket_done (mu_stream_t stream)
38 {
39 struct _mu_socket_stream *s = (struct _mu_socket_stream *) stream;
40
41 if (s->filename)
42 free (s->filename);
43 }
44 35
45 static int 36 static int
46 _socket_open (mu_stream_t stream) 37 _socket_open (mu_stream_t stream)
47 { 38 {
48 struct _mu_socket_stream *s = (struct _mu_socket_stream *) stream; 39 struct _mu_file_stream *s = (struct _mu_file_stream *) stream;
49 int fd, rc; 40 int fd;
50 struct sockaddr_un addr; 41 struct sockaddr_un addr;
51 42
52 if (!s) 43 if (!s)
...@@ -66,31 +57,15 @@ _socket_open (mu_stream_t stream) ...@@ -66,31 +57,15 @@ _socket_open (mu_stream_t stream)
66 return errno; 57 return errno;
67 } 58 }
68 59
69 s->stdio_stream.file_stream.fd = fd; 60 s->fd = fd;
70 s->stdio_stream.size = 0;
71 s->stdio_stream.offset = 0;
72 if (s->stdio_stream.cache)
73 mu_stream_truncate (s->stdio_stream.cache, 0);
74 61
75 return rc;
76 }
77
78 static int
79 _socket_close (mu_stream_t stream)
80 {
81 struct _mu_socket_stream *s = (struct _mu_socket_stream *) stream;
82 if (s->stdio_stream.file_stream.fd != -1)
83 {
84 close (s->stdio_stream.file_stream.fd);
85 s->stdio_stream.file_stream.fd = -1;
86 }
87 return 0; 62 return 0;
88 } 63 }
89 64
90 int 65 int
91 _socket_shutdown (mu_stream_t stream, int how) 66 _socket_shutdown (mu_stream_t stream, int how)
92 { 67 {
93 struct _mu_socket_stream *s = (struct _mu_socket_stream *) stream; 68 struct _mu_file_stream *s = (struct _mu_file_stream *) stream;
94 int flag; 69 int flag;
95 70
96 switch (how) 71 switch (how)
...@@ -103,7 +78,7 @@ _socket_shutdown (mu_stream_t stream, int how) ...@@ -103,7 +78,7 @@ _socket_shutdown (mu_stream_t stream, int how)
103 flag = SHUT_WR; 78 flag = SHUT_WR;
104 } 79 }
105 80
106 if (shutdown (s->stdio_stream.file_stream.fd, flag)) 81 if (shutdown (s->fd, flag))
107 return errno; 82 return errno;
108 return 0; 83 return 0;
109 } 84 }
...@@ -111,17 +86,37 @@ _socket_shutdown (mu_stream_t stream, int how) ...@@ -111,17 +86,37 @@ _socket_shutdown (mu_stream_t stream, int how)
111 int 86 int
112 mu_socket_stream_create (mu_stream_t *pstream, const char *filename, int flags) 87 mu_socket_stream_create (mu_stream_t *pstream, const char *filename, int flags)
113 { 88 {
114 struct _mu_socket_stream *s;
115 int rc; 89 int rc;
90 mu_stream_t transport;
91 int need_cache;
92 struct _mu_file_stream *fstr;
116 93
117 rc = _mu_stdio_stream_create (pstream, sizeof (*s), 94 need_cache = flags & MU_STREAM_SEEK;
118 flags | MU_STREAM_AUTOCLOSE); 95 if (need_cache && (flags & MU_STREAM_WRITE))
96 /* Write caches are not supported */
97 return EINVAL;
98
99 /* Create transport stream. */
100 rc = _mu_file_stream_create (&fstr, sizeof (*fstr),
101 filename, -1,
102 (flags | MU_STREAM_AUTOCLOSE) & ~MU_STREAM_SEEK);
119 if (rc) 103 if (rc)
120 return rc; 104 return rc;
121 s = (struct _mu_socket_stream *) *pstream; 105 fstr->stream.open = _socket_open;
122 s->stdio_stream.file_stream.stream.done = _socket_done; 106 fstr->stream.shutdown = _socket_shutdown;
123 s->stdio_stream.file_stream.stream.open = _socket_open; 107 transport = (mu_stream_t) fstr;
124 s->stdio_stream.file_stream.stream.close = _socket_close; 108
125 s->stdio_stream.file_stream.stream.shutdown = _socket_shutdown; 109 /* Wrap it in cache, if required */
110 if (need_cache)
111 {
112 mu_stream_t str;
113 rc = mu_rdcache_stream_create (&str, transport, flags);
114 mu_stream_unref (transport);
115 if (rc)
116 return rc;
117 transport = str;
118 }
119
120 *pstream = transport;
126 return 0; 121 return 0;
127 } 122 }
......
...@@ -22,166 +22,21 @@ ...@@ -22,166 +22,21 @@
22 #include <unistd.h> 22 #include <unistd.h>
23 #include <fcntl.h> 23 #include <fcntl.h>
24 24
25 #include <errno.h>
25 #include <mailutils/types.h> 26 #include <mailutils/types.h>
26 #include <mailutils/alloc.h>
27 #include <mailutils/error.h>
28 #include <mailutils/errno.h>
29 #include <mailutils/nls.h>
30 #include <mailutils/stream.h> 27 #include <mailutils/stream.h>
31 #include <mailutils/sys/stream.h> 28 #include <mailutils/sys/stream.h>
32 #include <mailutils/sys/stdio_stream.h> 29 #include <mailutils/sys/file_stream.h>
33 #include <mailutils/mutil.h>
34
35 static int
36 stdin_read (struct _mu_stream *str, char *buf, size_t size, size_t *pnbytes)
37 {
38 struct _mu_stdio_stream *fs = (struct _mu_stdio_stream *) str;
39 int fd = fs->file_stream.fd;
40 int status = 0;
41 size_t nbytes;
42 ssize_t rdbytes;
43
44 if (fs->offset < fs->size)
45 {
46 status = mu_stream_read (fs->cache, buf, size, &nbytes);
47 if (status)
48 fs->offset += nbytes;
49 }
50 else if (fs->offset > fs->size)
51 {
52 size_t left = fs->offset - fs->size + 1;
53 char sbuf[1024];
54 size_t bufsize;
55 char *tmpbuf = malloc (left);
56 if (tmpbuf)
57 bufsize = left;
58 else
59 {
60 tmpbuf = sbuf;
61 bufsize = sizeof sbuf;
62 }
63
64 while (left > 0)
65 {
66 size_t n;
67
68 rdbytes = read (fd, tmpbuf, bufsize);
69 if (rdbytes < 0)
70 {
71 status = errno;
72 break;
73 }
74 if (rdbytes == 0)
75 {
76 status = EIO; /* FIXME: EOF?? */
77 break;
78 }
79
80 status = mu_stream_write (fs->cache, tmpbuf, rdbytes, &n);
81 fs->offset += n;
82 left -= n;
83 if (status)
84 break;
85 }
86 if (tmpbuf != sbuf)
87 free (tmpbuf);
88 if (status)
89 return status;
90 }
91
92 nbytes = read (fd, buf, size);
93 if (nbytes <= 0)
94 return EIO;
95 else
96 {
97 status = mu_stream_write (fs->cache, buf, nbytes, NULL);
98 if (status)
99 return status;
100 }
101 fs->offset += nbytes;
102 fs->size += nbytes;
103
104 if (pnbytes)
105 *pnbytes = nbytes;
106 return status;
107 }
108
109 static int
110 stdout_write (struct _mu_stream *str, const char *buf, size_t size,
111 size_t *pret)
112 {
113 struct _mu_stdio_stream *fstr = (struct _mu_stdio_stream *) str;
114 int n = write (fstr->file_stream.fd, (char*) buf, size);
115 if (n == -1)
116 return errno;
117 fstr->size += n;
118 fstr->offset += n;
119 return 0;
120 }
121
122 static int
123 stdio_size (struct _mu_stream *str, off_t *psize)
124 {
125 struct _mu_stdio_stream *fs = (struct _mu_stdio_stream *) str;
126 *psize = fs->size;
127 return 0;
128 }
129
130 static int
131 stdio_seek (struct _mu_stream *str, mu_off_t off, mu_off_t *presult)
132 {
133 struct _mu_stdio_stream *fs = (struct _mu_stdio_stream *) str;
134
135 if (off < 0)
136 return ESPIPE;
137
138 fs->offset = off;
139 *presult = fs->offset;
140 return 0;
141 }
142
143 int
144 _mu_stdio_stream_create (mu_stream_t *pstream, size_t size, int flags)
145 {
146 struct _mu_stdio_stream *fs;
147 int rc;
148
149 rc = _mu_file_stream_create (pstream, size, NULL, flags);
150 if (rc)
151 return rc;
152 fs = (struct _mu_stdio_stream *) *pstream;
153
154 if (flags & MU_STREAM_SEEK)
155 {
156 if ((rc = mu_memory_stream_create (&fs->cache, MU_STREAM_RDWR))
157 || (rc = mu_stream_open (fs->cache)))
158 {
159 mu_stream_destroy ((mu_stream_t*) &fs);
160 return rc;
161 }
162 fs->file_stream.stream.read = stdin_read;
163 fs->file_stream.stream.write = stdout_write;
164 fs->file_stream.stream.size = stdio_size;
165 fs->file_stream.stream.seek = stdio_seek;
166 }
167 else
168 {
169 fs->file_stream.stream.flags &= ~MU_STREAM_SEEK;
170 fs->file_stream.stream.seek = NULL;
171 }
172 fs->file_stream.stream.open = NULL;
173 fs->file_stream.fd = -1;
174 return 0;
175 }
176 30
177 int 31 int
178 mu_stdio_stream_create (mu_stream_t *pstream, int fd, int flags) 32 mu_stdio_stream_create (mu_stream_t *pstream, int fd, int flags)
179 { 33 {
180 int rc; 34 int rc;
181 struct _mu_stdio_stream *fs; 35 char *filename;
182 36 mu_stream_t transport;
183 if (flags & MU_STREAM_SEEK && lseek (fd, 0, 0) == 0) 37 int need_cache;
184 flags &= ~MU_STREAM_SEEK; 38 struct _mu_file_stream *fstr;
39
185 switch (fd) 40 switch (fd)
186 { 41 {
187 case MU_STDIN_FD: 42 case MU_STDIN_FD:
...@@ -193,11 +48,36 @@ mu_stdio_stream_create (mu_stream_t *pstream, int fd, int flags) ...@@ -193,11 +48,36 @@ mu_stdio_stream_create (mu_stream_t *pstream, int fd, int flags)
193 flags |= MU_STREAM_WRITE; 48 flags |= MU_STREAM_WRITE;
194 } 49 }
195 50
196 rc = _mu_stdio_stream_create (pstream, sizeof (*fs), flags); 51 need_cache = flags & MU_STREAM_SEEK;
197 if (rc == 0) 52 if (need_cache && (flags & MU_STREAM_WRITE))
53 /* Write caches are not supported */
54 return EINVAL;
55
56 if (flags & MU_STREAM_READ)
57 filename = "<stdin>";
58 else
59 filename = "<stdout>";
60
61 /* Create transport stream. */
62 rc = _mu_file_stream_create (&fstr, sizeof (*fstr),
63 filename, fd, flags & ~MU_STREAM_SEEK);
64 if (rc)
65 return rc;
66 fstr->stream.open = NULL;
67 transport = (mu_stream_t) fstr;
68
69 /* Wrap it in cache, if required */
70 if (need_cache)
198 { 71 {
199 fs = (struct _mu_stdio_stream *) *pstream; 72 mu_stream_t str;
200 fs->file_stream.fd = fd; 73 rc = mu_rdcache_stream_create (&str, transport, flags);
74 mu_stream_unref (transport);
75 if (rc)
76 return rc;
77 transport = str;
201 } 78 }
202 return rc; 79
80 *pstream = transport;
81 return 0;
203 } 82 }
83
......
...@@ -69,6 +69,8 @@ mu_stream_copy (mu_stream_t dst, mu_stream_t src, size_t size) ...@@ -69,6 +69,8 @@ mu_stream_copy (mu_stream_t dst, mu_stream_t src, size_t size)
69 size -= pos; 69 size -= pos;
70 break; 70 break;
71 71
72 case EACCES:
73 mu_stream_clearerr (src);
72 case ENOSYS: 74 case ENOSYS:
73 break; 75 break;
74 76
......
...@@ -48,28 +48,18 @@ int ...@@ -48,28 +48,18 @@ int
48 mu_temp_file_stream_create (mu_stream_t *pstream, const char *dir) 48 mu_temp_file_stream_create (mu_stream_t *pstream, const char *dir)
49 { 49 {
50 int rc; 50 int rc;
51 char *fname;
52 struct _mu_file_stream *str; 51 struct _mu_file_stream *str;
53 52
54 if (!dir) 53 rc = _mu_file_stream_create (&str,
55 fname = NULL;
56 else if ((fname = mu_strdup (dir)) == NULL)
57 return ENOMEM;
58
59 rc = _mu_file_stream_create (pstream,
60 sizeof (struct _mu_file_stream), 54 sizeof (struct _mu_file_stream),
61 fname, 55 dir,
62 MU_STREAM_RDWR | MU_STREAM_CREAT | 56 -1,
57 MU_STREAM_RDWR | MU_STREAM_SEEK |
58 MU_STREAM_CREAT |
63 MU_STREAM_AUTOCLOSE); 59 MU_STREAM_AUTOCLOSE);
64 if (rc)
65 {
66 free (fname);
67 return rc;
68 }
69 str = (struct _mu_file_stream *) *pstream;
70 60
71 str->stream.open = fd_temp_open; 61 str->stream.open = fd_temp_open;
72 str->flags = _MU_FILE_STREAM_TEMP; 62 str->flags = _MU_FILE_STREAM_TEMP;
73 63 *pstream = (mu_stream_t) str;
74 return 0; 64 return 0;
75 } 65 }
......