Commit 1f6c71fc 1f6c71fcb9f5b51cf79c0cb5212031f14adb631e by Sergey Poznyakoff

* mailbox/socket_stream.c: New file.

* include/mailutils/stream.h (mu_socket_stream_create)
(mu_stream_shutdown, mu_stream_set_shutdown): New functions.
* libproto/include/stream0.h (struct _mu_stream): New member
`_shutdown'.
* mailbox/file_stream.c (mu_stream_create): Bugfix.
* mailbox/stream.c (mu_stream_shutdown)
(mu_stream_set_shutdown): New functions.
* mailbox/tcp.c (_tcp_shutdown): New function.
(_tcp_stream_init): New function.
(mu_tcp_stream_create_with_source_ip): Register _tcp_shutdown.
1 parent a1dc0f56
...@@ -56,6 +56,8 @@ extern int mu_tcp_stream_create_with_source_host (mu_stream_t *stream, ...@@ -56,6 +56,8 @@ extern int mu_tcp_stream_create_with_source_host (mu_stream_t *stream,
56 const char *host, int port, 56 const char *host, int port,
57 const char *source_host, 57 const char *source_host,
58 int flags); 58 int flags);
59 extern int mu_socket_stream_create (mu_stream_t *stream, const char *filename,
60 int flags);
59 61
60 extern int mu_mapfile_stream_create (mu_stream_t *stream, const char* filename, 62 extern int mu_mapfile_stream_create (mu_stream_t *stream, const char* filename,
61 int flags); 63 int flags);
...@@ -91,6 +93,7 @@ extern int mu_stream_write (mu_stream_t, const char *, size_t, mu_off_t, ...@@ -91,6 +93,7 @@ extern int mu_stream_write (mu_stream_t, const char *, size_t, mu_off_t,
91 size_t *); 93 size_t *);
92 extern int mu_stream_setbufsiz (mu_stream_t stream, size_t size); 94 extern int mu_stream_setbufsiz (mu_stream_t stream, size_t size);
93 extern int mu_stream_flush (mu_stream_t); 95 extern int mu_stream_flush (mu_stream_t);
96 extern int mu_stream_shutdown (mu_stream_t stream, int how);
94 97
95 extern int mu_stream_vprintf (mu_stream_t os, mu_off_t *poff, 98 extern int mu_stream_vprintf (mu_stream_t os, mu_off_t *poff,
96 const char *fmt, va_list ap); 99 const char *fmt, va_list ap);
...@@ -166,6 +169,9 @@ extern int mu_stream_set_strerror (mu_stream_t stream, ...@@ -166,6 +169,9 @@ extern int mu_stream_set_strerror (mu_stream_t stream,
166 169
167 extern int mu_stream_set_wait (mu_stream_t stream, 170 extern int mu_stream_set_wait (mu_stream_t stream,
168 int (*wait) (mu_stream_t, int *, struct timeval *), void *owner); 171 int (*wait) (mu_stream_t, int *, struct timeval *), void *owner);
172
173 extern int mu_stream_set_shutdown (mu_stream_t stream,
174 int (*_shutdown) (mu_stream_t, int how), void *owner);
169 175
170 extern int mu_stream_sequential_read (mu_stream_t stream, 176 extern int mu_stream_sequential_read (mu_stream_t stream,
171 char *buf, size_t size, size_t *nbytes); 177 char *buf, size_t size, size_t *nbytes);
......
...@@ -65,6 +65,7 @@ struct _mu_stream ...@@ -65,6 +65,7 @@ struct _mu_stream
65 int (*_setbufsiz)(mu_stream_t, size_t); 65 int (*_setbufsiz)(mu_stream_t, size_t);
66 int (*_strerror) (mu_stream_t, const char **); 66 int (*_strerror) (mu_stream_t, const char **);
67 int (*_wait) (mu_stream_t, int *pflags, struct timeval *tvp); 67 int (*_wait) (mu_stream_t, int *pflags, struct timeval *tvp);
68 int (*_shutdown) (mu_stream_t, int how);
68 }; 69 };
69 70
70 #ifdef __cplusplus 71 #ifdef __cplusplus
......
...@@ -546,8 +546,8 @@ mu_file_stream_create (mu_stream_t *stream, const char* filename, int flags) ...@@ -546,8 +546,8 @@ mu_file_stream_create (mu_stream_t *stream, const char* filename, int flags)
546 ret = mu_stream_create (stream, flags|MU_STREAM_NO_CHECK, fs); 546 ret = mu_stream_create (stream, flags|MU_STREAM_NO_CHECK, fs);
547 if (ret != 0) 547 if (ret != 0)
548 { 548 {
549 free (fs);
550 free (fs->filename); 549 free (fs->filename);
550 free (fs);
551 return ret; 551 return ret;
552 } 552 }
553 553
......
1 /* GNU Mailutils -- a suite of utilities for electronic mail
2 Copyright (C) 1999, 2000, 2001, 2002, 2004,
3 2005, 2006, 2007 Free Software Foundation, Inc.
4
5 This library is free software; you can redistribute it and/or
6 modify it under the terms of the GNU Lesser General Public
7 License as published by the Free Software Foundation; either
8 version 3 of the License, or (at your option) any later version.
9
10 This library is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Lesser General Public License for more details.
14
15 You should have received a copy of the GNU Lesser General
16 Public License along with this library; if not, write to the
17 Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18 Boston, MA 02110-1301 USA */
19
20 #ifdef HAVE_CONFIG_H
21 # include <config.h>
22 #endif
23
24 #include <unistd.h>
25 #include <stdlib.h>
26 #include <sys/types.h>
27 #include <sys/socket.h>
28 #include <sys/un.h>
29 #include <string.h>
30 #include <signal.h>
31 #include <errno.h>
32
33 #include <mailutils/stream.h>
34 #include <mailutils/error.h>
35 #include <mailutils/errno.h>
36
37 struct _socket_stream
38 {
39 mu_stream_t fstream;
40 char *filename;
41 int ec; /* Last error code if fstream == NULL */
42 };
43
44 static void
45 _s_destroy (mu_stream_t stream)
46 {
47 struct _socket_stream *s = mu_stream_get_owner (stream);
48
49 if (s->filename)
50 free (s->filename);
51 mu_stream_destroy (&s->fstream, mu_stream_get_owner (s->fstream));
52 free (s);
53 }
54
55 static int
56 _s_read (mu_stream_t stream, char *optr, size_t osize,
57 mu_off_t offset, size_t *nbytes)
58 {
59 struct _socket_stream *s = mu_stream_get_owner (stream);
60 return mu_stream_read (s->fstream, optr, osize, offset, nbytes);
61 }
62
63 static int
64 _s_readline (mu_stream_t stream, char *optr, size_t osize,
65 mu_off_t offset, size_t *nbytes)
66 {
67 struct _socket_stream *s = mu_stream_get_owner (stream);
68 return mu_stream_readline (s->fstream, optr, osize, offset, nbytes);
69 }
70
71 static int
72 _s_write (mu_stream_t stream, const char *iptr, size_t isize,
73 mu_off_t offset, size_t *nbytes)
74 {
75 struct _socket_stream *s = mu_stream_get_owner (stream);
76 return mu_stream_write (s->fstream, iptr, isize, offset, nbytes);
77 }
78
79 static int
80 _s_open (mu_stream_t stream)
81 {
82 struct _socket_stream *s = mu_stream_get_owner (stream);
83 int fd, rc;
84 FILE *fp;
85 struct sockaddr_un addr;
86 char *fstr;
87 int flags;
88
89 if (!s)
90 return EINVAL;
91
92 fd = socket (PF_UNIX, SOCK_STREAM, 0);
93 if (fd < 0)
94 return errno;
95
96 memset (&addr, 0, sizeof addr);
97 addr.sun_family = AF_UNIX;
98 strncpy (addr.sun_path, s->filename, sizeof addr.sun_path - 1);
99 addr.sun_path[sizeof addr.sun_path - 1] = 0;
100 if (connect (fd, (struct sockaddr *) &addr, sizeof(addr)))
101 {
102 close (fd);
103 return errno;
104 }
105
106 mu_stream_get_flags(stream, &flags);
107 if (flags & MU_STREAM_WRITE)
108 fstr = "w";
109 else if (flags & MU_STREAM_RDWR)
110 fstr = "w+";
111 else /* default, also if flags & MU_STREAM_READ */
112 fstr = "r";
113
114 fp = fdopen (fd, fstr);
115 if (!fp)
116 {
117 close (fd);
118 return errno;
119 }
120
121 rc = mu_stdio_stream_create (&s->fstream, fp, flags);
122 if (rc)
123 {
124 fclose (fp);
125 return rc;
126 }
127
128 rc = mu_stream_open (s->fstream);
129 if (rc)
130 {
131 mu_stream_destroy (&s->fstream, mu_stream_get_owner (s->fstream));
132 fclose (fp);
133 }
134 return rc;
135 }
136
137 static int
138 _s_close (mu_stream_t stream)
139 {
140 struct _socket_stream *s = mu_stream_get_owner (stream);
141 return mu_stream_close (s->fstream);
142 }
143
144 static int
145 _s_flush (mu_stream_t stream)
146 {
147 struct _socket_stream *s = mu_stream_get_owner (stream);
148 return mu_stream_flush (s->fstream);
149 }
150
151 int
152 _s_wait (mu_stream_t stream, int *pflags, struct timeval *tvp)
153 {
154 struct _socket_stream *s = mu_stream_get_owner (stream);
155 return mu_stream_wait (s->fstream, pflags, tvp);
156 }
157
158 int
159 _s_strerror (mu_stream_t stream, const char **pstr)
160 {
161 struct _socket_stream *s = mu_stream_get_owner (stream);
162 return mu_stream_strerror (s->fstream, pstr);
163 }
164
165 static int
166 _s_get_transport2 (mu_stream_t stream,
167 mu_transport_t *pin, mu_transport_t *pout)
168 {
169 struct _socket_stream *s = mu_stream_get_owner (stream);
170 return mu_stream_get_transport2 (s->fstream, pin, pout);
171 }
172
173 int
174 _s_shutdown (mu_stream_t stream, int how)
175 {
176 struct _socket_stream *s = mu_stream_get_owner (stream);
177 int flag;
178 mu_transport_t trans;
179
180 if (s->fstream == NULL)
181 return EINVAL;
182
183 mu_stream_get_transport(s->fstream, &trans);
184 switch (how)
185 {
186 case MU_STREAM_READ:
187 flag = SHUT_RD;
188 break;
189
190 case MU_STREAM_WRITE:
191 flag = SHUT_WR;
192 }
193
194 if (shutdown ((int) trans, flag))
195 return errno;
196 return 0;
197 }
198
199 int
200 mu_socket_stream_create (mu_stream_t *stream, const char *filename, int flags)
201 {
202 struct _socket_stream *s;
203 int rc;
204
205 if (stream == NULL)
206 return MU_ERR_OUT_PTR_NULL;
207
208 s = calloc (1, sizeof (struct _socket_stream));
209 if (s == NULL)
210 return ENOMEM;
211
212 if ((s->filename = strdup (filename)) == NULL)
213 {
214 free (s);
215 return ENOMEM;
216 }
217
218 rc = mu_stream_create (stream, flags | MU_STREAM_NO_CHECK, s);
219 if (rc)
220 {
221 free (s);
222 free (s->filename);
223 return rc;
224 }
225
226 mu_stream_set_open (*stream, _s_open, s);
227 mu_stream_set_close (*stream, _s_close, s);
228 mu_stream_set_get_transport2 (*stream, _s_get_transport2, s);
229 mu_stream_set_read (*stream, _s_read, s);
230 mu_stream_set_readline (*stream, _s_readline, s);
231 mu_stream_set_write (*stream, _s_write, s);
232 mu_stream_set_flush (*stream, _s_flush, s);
233 mu_stream_set_destroy (*stream, _s_destroy, s);
234 mu_stream_set_strerror (*stream, _s_strerror, s);
235 mu_stream_set_wait (*stream, _s_wait, s);
236 mu_stream_set_shutdown (*stream, _s_shutdown, s);
237
238 return 0;
239 }
240
241
242
243
...@@ -669,6 +669,25 @@ mu_stream_get_state (mu_stream_t stream, int *pstate) ...@@ -669,6 +669,25 @@ mu_stream_get_state (mu_stream_t stream, int *pstate)
669 } 669 }
670 670
671 int 671 int
672 mu_stream_shutdown (mu_stream_t stream, int how)
673 {
674 if (stream == NULL)
675 return EINVAL;
676 if (!stream->_shutdown)
677 return ENOSYS;
678 switch (how)
679 {
680 case MU_STREAM_READ:
681 case MU_STREAM_WRITE:
682 break;
683
684 default:
685 return EINVAL;
686 }
687 return stream->_shutdown (stream, how);
688 }
689
690 int
672 mu_stream_set_destroy (mu_stream_t stream, 691 mu_stream_set_destroy (mu_stream_t stream,
673 void (*_destroy) (mu_stream_t), void *owner) 692 void (*_destroy) (mu_stream_t), void *owner)
674 { 693 {
...@@ -849,6 +868,20 @@ mu_stream_set_wait (mu_stream_t stream, ...@@ -849,6 +868,20 @@ mu_stream_set_wait (mu_stream_t stream,
849 } 868 }
850 869
851 int 870 int
871 mu_stream_set_shutdown (mu_stream_t stream,
872 int (*_shutdown) (mu_stream_t, int how), void *owner)
873 {
874 if (stream == NULL)
875 return EINVAL;
876 if (owner == stream->owner)
877 {
878 stream->_shutdown = _shutdown;
879 return 0;
880 }
881 return EACCES;
882 }
883
884 int
852 mu_stream_sequential_read (mu_stream_t stream, char *buf, size_t size, 885 mu_stream_sequential_read (mu_stream_t stream, char *buf, size_t size,
853 size_t *nbytes) 886 size_t *nbytes)
854 { 887 {
......
...@@ -266,6 +266,42 @@ _tcp_wait (mu_stream_t stream, int *pflags, struct timeval *tvp) ...@@ -266,6 +266,42 @@ _tcp_wait (mu_stream_t stream, int *pflags, struct timeval *tvp)
266 } 266 }
267 267
268 int 268 int
269 _tcp_shutdown (mu_stream_t stream, int how)
270 {
271 struct _tcp_instance *tcp = mu_stream_get_owner (stream);
272 int flag;
273 if (tcp->fd == -1)
274 return EINVAL;
275
276 switch (how)
277 {
278 case MU_STREAM_READ:
279 flag = SHUT_RD;
280 break;
281
282 case MU_STREAM_WRITE:
283 flag = SHUT_WR;
284 }
285
286 if (shutdown (tcp->fd, flag))
287 return errno;
288 return 0;
289 }
290
291 static void
292 _tcp_stream_init (mu_stream_t stream, struct _tcp_instance *tcp)
293 {
294 mu_stream_set_open (stream, _tcp_open, tcp);
295 mu_stream_set_close (stream, _tcp_close, tcp);
296 mu_stream_set_read (stream, _tcp_read, tcp);
297 mu_stream_set_write (stream, _tcp_write, tcp);
298 mu_stream_set_get_transport2 (stream, _tcp_get_transport2, tcp);
299 mu_stream_set_destroy (stream, _tcp_destroy, tcp);
300 mu_stream_set_wait (stream, _tcp_wait, tcp);
301 mu_stream_set_shutdown (stream, _tcp_shutdown, tcp);
302 }
303
304 int
269 mu_tcp_stream_create_with_source_ip (mu_stream_t *stream, 305 mu_tcp_stream_create_with_source_ip (mu_stream_t *stream,
270 const char *host, int port, 306 const char *host, int port,
271 unsigned long source_ip, 307 unsigned long source_ip,
...@@ -301,14 +337,7 @@ mu_tcp_stream_create_with_source_ip (mu_stream_t *stream, ...@@ -301,14 +337,7 @@ mu_tcp_stream_create_with_source_ip (mu_stream_t *stream,
301 return ret; 337 return ret;
302 } 338 }
303 339
304 mu_stream_set_open (*stream, _tcp_open, tcp); 340 _tcp_stream_init (*stream, tcp);
305 mu_stream_set_close (*stream, _tcp_close, tcp);
306 mu_stream_set_read (*stream, _tcp_read, tcp);
307 mu_stream_set_write (*stream, _tcp_write, tcp);
308 mu_stream_set_get_transport2 (*stream, _tcp_get_transport2, tcp);
309 mu_stream_set_destroy (*stream, _tcp_destroy, tcp);
310 mu_stream_set_wait (*stream, _tcp_wait, tcp);
311
312 return 0; 341 return 0;
313 } 342 }
314 343
......