Commit ce118732 ce11873215acb4cb1a17656c8c3effa4850dedcb by Alain Magloire

* mailbox2/tcp.c: make it thread-safe.

	* mailbox2/bstream.c: Implement buffered stream.
1 parent 515c5cfa
1 /* GNU mailutils - a suite of utilities for electronic mail
2 Copyright (C) 1999, 2000, 2001 Free Software Foundation, Inc.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Library Public License as published by
6 the Free Software Foundation; either version 2, or (at your option)
7 any later version.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU Library General Public License for more details.
13
14 You should have received a copy of the GNU Library General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. */
17
18 /* Credits. Some of the Readline an buffering scheme was taken
19 from 4.4BSDLite2.
20
21 Copyright (c) 1990, 1993
22 The Regents of the University of California. All rights reserved.
23
24 This code is derived from software contributed to Berkeley by
25 Chris Torek.
26 */
27
28 #ifdef HAVE_CONFIG_H
29 # include <config.h>
30 #endif
31
32 #include <stdlib.h>
33 #include <string.h>
34
35 #include <mailutils/monitor.h>
36 #include <mailutils/error.h>
37 #include <mailutils/sys/bstream.h>
38
39 static void
40 _bs_cleanup (void *arg)
41 {
42 struct _bs *bs = arg;
43 monitor_unlock (bs->lock);
44 }
45
46 static int
47 refill (struct _bs *bs)
48 {
49 int status;
50 if (bs->rbuffer.base == NULL)
51 {
52 bs->rbuffer.base = calloc (1, bs->rbuffer.bufsize);
53 if (bs->rbuffer.base == NULL)
54 return ENOMEM;
55 }
56 bs->rbuffer.ptr = bs->rbuffer.base;
57 bs->rbuffer.count = 0;
58 status = stream_read (bs->stream, bs->rbuffer.ptr, bs->rbuffer.bufsize,
59 (size_t *)&(bs->rbuffer.count));
60 return status;
61 }
62
63 static int
64 _bs_add_ref (stream_t stream)
65 {
66 struct _bs *bs = (struct _bs *)stream;
67 return stream_add_ref (bs->stream);
68 }
69
70 static int
71 _bs_destroy (stream_t stream)
72 {
73 struct _bs *bs = (struct _bs *)stream;
74 stream_destroy (bs->stream);
75 monitor_destroy (bs->lock);
76 free (bs);
77 return 0;
78 }
79
80 static int
81 _bs_release (stream_t stream)
82 {
83 struct _bs *bs = (struct _bs *)stream;
84 int status = stream_release (bs->stream);
85 if (status == 0)
86 {
87 _bs_destroy (stream);
88 return 0;
89 }
90 return status;
91 }
92
93 static int
94 _bs_open (stream_t stream, const char *name, int port, int flags)
95 {
96 struct _bs *bs = (struct _bs *)stream;
97 return stream_open (bs->stream, name, port, flags);
98 }
99
100 static int
101 _bs_close (stream_t stream)
102 {
103 struct _bs *bs = (struct _bs *)stream;
104 monitor_lock (bs->lock);
105 /* Clear the buffer of any residue left. */
106 if (bs->rbuffer.base && bs->rbuffer.bufsize)
107 {
108 bs->rbuffer.ptr = bs->rbuffer.base;
109 bs->rbuffer.count = 0;
110 memset (bs->rbuffer.base, '\0', bs->rbuffer.bufsize);
111 }
112 monitor_unlock (bs->lock);
113 return stream_close (bs->stream);
114 }
115
116 /* We have to be clear about the buffering scheme, it is not designed to be
117 use as a fully fledge buffer mechanism. It is a simple mechanims for
118 networking. Lots of code between POP and IMAP can be share this way.
119 The buffering is on the read only, the writes fall through. */
120 static int
121 _bs_read (stream_t stream, char *buf, size_t count, size_t *pnread)
122 {
123 int status = 0;
124 struct _bs *bs = (struct _bs *)stream;
125
126 /* Noop. */
127 if (count == 0)
128 {
129 if (pnread)
130 *pnread = 0;
131 }
132 else if (bs->rbuffer.bufsize == 0)
133 {
134 /* If rbuffer.bufsize == 0. It means they did not want the buffer
135 mechanism, then what are we doing here? */
136 status = stream_read (bs->stream, buf, count, pnread);
137 }
138 else
139 {
140 size_t residue = count;
141 int r;
142
143 monitor_lock (bs->lock);
144 monitor_cleanup_push (_bs_cleanup, bs);
145
146 /* If the amount requested is bigger then the buffer cache size
147 bypass it. Do no waste time and let it through. */
148 if (count > bs->rbuffer.bufsize)
149 {
150 r = 0;
151 /* Drain our buffer first. */
152 if (bs->rbuffer.count > 0)
153 {
154 memcpy(buf, bs->rbuffer.ptr, bs->rbuffer.count);
155 residue -= bs->rbuffer.count;
156 buf += bs->rbuffer.count;
157 }
158 bs->rbuffer.count = 0; /* Signal we will need to refill. */
159 status = stream_read (bs->stream, buf, residue, &r);
160 residue -= r;
161 if (pnread)
162 *pnread = count - residue;
163 }
164 else
165 {
166 int done = 0;
167
168 /* Should not be necessary but guard never the less. */
169 if (bs->rbuffer.count < 0)
170 bs->rbuffer.count = 0;
171
172 /* Drain the buffer, if we have less then requested. */
173 while (residue > (size_t)(r = bs->rbuffer.count))
174 {
175 (void)memcpy (buf, bs->rbuffer.ptr, (size_t)r);
176 bs->rbuffer.ptr += r;
177 /* bs->rbuffer.count = 0 ... done in refill */
178 buf += r;
179 residue -= r;
180 status = refill (bs);
181 /* Did we reach the end. */
182 if (status != 0 || bs->rbuffer.count == 0)
183 {
184 /* We have something in the buffer return the error on the
185 next call . */
186 if (count != residue)
187 status = 0;
188 if (pnread)
189 *pnread = count - residue;
190 done = 1;
191 }
192 }
193 if (!done)
194 {
195 memcpy(buf, bs->rbuffer.ptr, residue);
196 bs->rbuffer.count -= residue;
197 bs->rbuffer.ptr += residue;
198 if (pnread)
199 *pnread = count;
200 }
201 }
202 monitor_unlock (bs->lock);
203 monitor_cleanup_pop (0);
204 }
205 return status;
206 }
207
208 /*
209 * Read at most n-1 characters.
210 * Stop when a newline has been read, or the count runs out.
211 */
212 static int
213 _bs_readline (stream_t stream, char *buf, size_t count, size_t *pnread)
214 {
215 int status = 0;
216 struct _bs *bs = (struct _bs *)stream;
217
218 /* Noop. */
219 if (count == 0)
220 {
221 if (pnread)
222 *pnread = 0;
223 }
224 else if (bs->rbuffer.bufsize == 0)
225 {
226 /* Use the provided readline. */
227 status = stream_readline (bs->stream, buf, count, pnread);
228 }
229 else /* Buffered. */
230 {
231 char *s = buf;
232 char *p, *nl;
233 size_t len;
234 size_t total = 0;
235
236 monitor_lock (bs->lock);
237 monitor_cleanup_push (_bs_cleanup, bs);
238
239 count--; /* Leave space for the null. */
240
241 while (count != 0)
242 {
243 /* If the buffer is empty refill it. */
244 len = bs->rbuffer.count;
245 if (len <= 0)
246 {
247 status = refill (bs);
248 if (status != 0 || bs->rbuffer.count == 0)
249 {
250 break;
251 }
252 len = bs->rbuffer.count;
253 }
254 p = bs->rbuffer.ptr;
255
256 /* Scan through at most n bytes of the current buffer,
257 looking for '\n'. If found, copy up to and including
258 newline, and stop. Otherwise, copy entire chunk
259 and loop. */
260 if (len > count)
261 len = count;
262 nl = memchr (p, '\n', len);
263 if (nl != NULL)
264 {
265 len = ++nl - p;
266 bs->rbuffer.count -= len;
267 bs->rbuffer.ptr = nl;
268 memcpy (s, p, len);
269 total += len;
270 s[len] = '\0';
271 if (pnread)
272 *pnread = total;
273 break;
274 }
275 bs->rbuffer.count -= len;
276 bs->rbuffer.ptr += len;
277 memcpy(s, p, len);
278 total += len;
279 s += len;
280 count -= len;
281 }
282 *s = 0;
283 if (pnread)
284 *pnread = s - buf;
285
286 monitor_unlock (bs->lock);
287 monitor_cleanup_pop (0);
288 }
289 return status;
290 }
291
292 static int
293 _bs_write (stream_t stream, const char *buf, size_t count, size_t *pnwrite)
294 {
295 struct _bs *bs = (struct _bs *)stream;
296 int err = 0;
297 size_t nwriten = 0;
298 size_t total = 0;
299 int nleft = count;
300
301 /* First try to send it all. */
302 while (nleft > 0)
303 {
304 err = stream_write (bs->stream, buf, nleft, &nwriten);
305 if (err != 0 || nwriten == 0)
306 break;
307 nleft -= nwriten;
308 total += nwriten;
309 buf += nwriten;
310 }
311 if (pnwrite)
312 *pnwrite = total;
313 return err;
314 }
315
316 static int
317 _bs_get_fd (stream_t stream, int *pfd)
318 {
319 struct _bs *bs = (struct _bs *)stream;
320 return stream_get_fd (bs->stream, pfd);
321 }
322
323 static int
324 _bs_get_flags (stream_t stream, int *pfl)
325 {
326 struct _bs *bs = (struct _bs *)stream;
327 return stream_get_flags (bs->stream, pfl);
328 }
329
330 static int
331 _bs_get_size (stream_t stream, off_t *psize)
332 {
333 struct _bs *bs = (struct _bs *)stream;
334 return stream_get_size (bs->stream, psize);
335 }
336
337 static int
338 _bs_truncate (stream_t stream, off_t len)
339 {
340 struct _bs *bs = (struct _bs *)stream;
341 return stream_truncate (bs->stream, len);
342 }
343
344 static int
345 _bs_flush (stream_t stream)
346 {
347 struct _bs *bs = (struct _bs *)stream;
348 return stream_flush (bs->stream);
349 }
350
351 static int
352 _bs_get_state (stream_t stream, enum stream_state *pstate)
353 {
354 struct _bs *bs = (struct _bs *)stream;
355 return stream_get_state (bs->stream, pstate);
356 }
357
358 static int
359 _bs_seek (stream_t stream, off_t off, enum stream_whence whence)
360 {
361 struct _bs *bs = (struct _bs *)stream;
362 return stream_seek (bs->stream, off, whence);
363 }
364
365 static int
366 _bs_tell (stream_t stream, off_t *off)
367 {
368 struct _bs *bs = (struct _bs *)stream;
369 return stream_tell (bs->stream, off);
370 }
371
372 static struct _stream_vtable _bs_vtable =
373 {
374 _bs_add_ref,
375 _bs_release,
376 _bs_destroy,
377
378 _bs_open,
379 _bs_close,
380
381 _bs_read,
382 _bs_readline,
383 _bs_write,
384
385 _bs_seek,
386 _bs_tell,
387
388 _bs_get_size,
389 _bs_truncate,
390 _bs_flush,
391
392 _bs_get_fd,
393 _bs_get_flags,
394 _bs_get_state,
395 };
396
397 int
398 stream_buffer_create (stream_t *pstream, stream_t stream, size_t bufsize)
399 {
400 struct _bs *bs;
401
402 if (pstream == NULL || stream == NULL || bufsize == 0)
403 return MU_ERROR_INVALID_PARAMETER;
404
405 bs = calloc (1, sizeof *bs);
406 if (bs == NULL)
407 return MU_ERROR_NO_MEMORY;
408
409 bs->base.vtable = &_bs_vtable;
410 bs->ref = 1;
411 bs->stream = stream;
412 bs->rbuffer.bufsize = bufsize;
413 monitor_create (&(bs->lock));
414 *pstream = &bs->base;
415 return 0;
416 }
...@@ -73,6 +73,7 @@ extern int stream_file_create __P ((stream_t *)); ...@@ -73,6 +73,7 @@ extern int stream_file_create __P ((stream_t *));
73 extern int stream_mapfile_create __P ((stream_t *)); 73 extern int stream_mapfile_create __P ((stream_t *));
74 extern int stream_memory_create __P ((stream_t *)); 74 extern int stream_memory_create __P ((stream_t *));
75 extern int stream_tcp_create __P ((stream_t *)); 75 extern int stream_tcp_create __P ((stream_t *));
76 extern int stream_buffer_create __P ((stream_t *, stream_t, size_t));
76 77
77 __MAILUTILS_END_DECLS 78 __MAILUTILS_END_DECLS
78 79
......
1 /* GNU mailutils - a suite of utilities for electronic mail
2 Copyright (C) 1999, 2000, 2001 Free Software Foundation, Inc.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU Library General Public License as published by
6 the Free Software Foundation; either version 2, or (at your option)
7 any later version.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
13
14 You should have received a copy of the GNU Library General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. */
17
18 #ifndef MAILUTILS_SYS_TCP_H
19 #define MAILUTILS_SYS_TCP_H
20
21 #include <mailutils/sys/stream.h>
22 #include <mailutils/monitor.h>
23
24 /* Read buffer */
25 struct _rbuffer
26 {
27 char *base;
28 char *ptr;
29 int count;
30 size_t bufsize;
31 };
32
33 struct _bs
34 {
35 struct _stream base;
36 int ref;
37 stream_t stream;
38 struct _rbuffer rbuffer;
39 monitor_t lock;
40 };
41
42 #endif /* _MAILUTILS_SYS_TCP_H */
...@@ -45,9 +45,9 @@ struct _iterator ...@@ -45,9 +45,9 @@ struct _iterator
45 #define iterator_release(i) ((i)->vtable->release)(i) 45 #define iterator_release(i) ((i)->vtable->release)(i)
46 #define iterator_destroy(i) ((i)->vtable->destroy)(i) 46 #define iterator_destroy(i) ((i)->vtable->destroy)(i)
47 47
48 #define iterator_firs(i) ((i)->vtable->first)(i) 48 #define iterator_first(i) ((i)->vtable->first)(i)
49 #define iterator_next(i) ((i)->vtable->next)(i) 49 #define iterator_next(i) ((i)->vtable->next)(i)
50 #define iterator_current(i, a) ((i)->vtable->current)(i, a) 50 #define iterator_current(i,a) ((i)->vtable->current)(i,a)
51 #define iterator_is_done(i) ((i)->vtable->is_done)(i) 51 #define iterator_is_done(i) ((i)->vtable->is_done)(i)
52 52
53 __MAILUTILS_END_DECLS 53 __MAILUTILS_END_DECLS
......
...@@ -53,26 +53,27 @@ struct _stream ...@@ -53,26 +53,27 @@ struct _stream
53 struct _stream_vtable *vtable; 53 struct _stream_vtable *vtable;
54 }; 54 };
55 55
56 #define stream_destroy(s) ((s)->vtable->destroy)(s) 56 #define stream_add_ref(s) ((s)->vtable->add_ref)(s)
57 #define stream_release(s) ((s)->vtable->release)(s) 57 #define stream_release(s) ((s)->vtable->release)(s)
58 #define stream_destroy(s) ((s)->vtable->destroy)(s)
58 59
59 #define stream_open(s,h,p,f) ((s)->vtable->open)(s,h,p,f) 60 #define stream_open(s,h,p,f) ((s)->vtable->open)(s,h,p,f)
60 #define stream_close(s) ((s)->vtable->close)(s) 61 #define stream_close(s) ((s)->vtable->close)(s)
61 62
62 #define stream_read(s, b, l, n) ((s)->vtable->read)(s, b, l, n) 63 #define stream_read(s,b,l,n) ((s)->vtable->read)(s,b,l,n)
63 #define stream_readline(s, b, l, n) ((s)->vtable->readline)(s, b, l, n) 64 #define stream_readline(s,b,l,n) ((s)->vtable->readline)(s,b,l,n)
64 #define stream_write(s, b, l, n) ((s)->vtable->write)(s, b, l, n) 65 #define stream_write(s,b,l,n) ((s)->vtable->write)(s,b,l,n)
65 66
66 #define stream_seek(s, o, w) ((s)->vtable->seek)(s, o, w) 67 #define stream_seek(s,o,w) ((s)->vtable->seek)(s,o,w)
67 #define stream_tell(s, o) ((s)->vtable->tell)(s, o) 68 #define stream_tell(s,o) ((s)->vtable->tell)(s,o)
68 69
69 #define stream_get_size(s, o) ((s)->vtable->get_size)(s, o) 70 #define stream_get_size(s,o) ((s)->vtable->get_size)(s,o)
70 #define stream_flush(s) ((s)->vtable->flush)(s) 71 #define stream_flush(s) ((s)->vtable->flush)(s)
71 #define stream_truncate(s, o) ((s)->vtable->truncate)(s, o) 72 #define stream_truncate(s,o) ((s)->vtable->truncate)(s,o)
72 73
73 #define stream_get_fd(s, f) ((s)->vtable->get_fd)(s, f) 74 #define stream_get_fd(s,f) ((s)->vtable->get_fd)(s,f)
74 #define stream_get_flags(s, f) ((s)->vtable->get_flags)(s, f) 75 #define stream_get_flags(s,f) ((s)->vtable->get_flags)(s,f)
75 #define stream_get_state(s, f) ((s)->vtable->get_state)(s, f) 76 #define stream_get_state(s,f) ((s)->vtable->get_state)(s,f)
76 77
77 __MAILUTILS_END_DECLS 78 __MAILUTILS_END_DECLS
78 79
......
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
18 #ifndef MAILUTILS_SYS_TCP_H 18 #ifndef MAILUTILS_SYS_TCP_H
19 #define MAILUTILS_SYS_TCP_H 19 #define MAILUTILS_SYS_TCP_H
20 20
21 #include <mailutils/monitor.h>
21 #include <mailutils/sys/stream.h> 22 #include <mailutils/sys/stream.h>
22 23
23 enum _tcp_state 24 enum _tcp_state
...@@ -39,6 +40,7 @@ struct _tcp_instance ...@@ -39,6 +40,7 @@ struct _tcp_instance
39 int state; 40 int state;
40 int flags; 41 int flags;
41 unsigned long address; 42 unsigned long address;
43 monitor_t lock;
42 }; 44 };
43 45
44 #endif /* _MAILUTILS_SYS_TCP_H */ 46 #endif /* _MAILUTILS_SYS_TCP_H */
......
...@@ -41,23 +41,33 @@ ...@@ -41,23 +41,33 @@
41 # define INADDR_NONE (unsigned long)-1 41 # define INADDR_NONE (unsigned long)-1
42 #endif 42 #endif
43 43
44 static void
45 _tcp_cleanup (void *arg)
46 {
47 struct _tcp_instance *tcp = arg;
48 monitor_unlock (tcp->lock);
49 }
50
44 static int 51 static int
45 _tcp_add_ref (stream_t stream) 52 _tcp_add_ref (stream_t stream)
46 { 53 {
47 struct _tcp_instance *tcp = (struct _tcp_instance *)stream; 54 struct _tcp_instance *tcp = (struct _tcp_instance *)stream;
48 return ++tcp->ref; 55 int status;
56 monitor_lock (tcp->lock);
57 status = ++tcp->ref;
58 monitor_unlock (tcp->lock);
59 return status;
49 } 60 }
50 61
51 static int 62 static int
52 _tcp_destroy (stream_t stream) 63 _tcp_destroy (stream_t stream)
53 { 64 {
54 struct _tcp_instance *tcp = (struct _tcp_instance *)stream; 65 struct _tcp_instance *tcp = (struct _tcp_instance *)stream;
55
56 if (tcp->host) 66 if (tcp->host)
57 free (tcp->host); 67 free (tcp->host);
58 if (tcp->fd != -1) 68 if (tcp->fd != -1)
59 close (tcp->fd); 69 close (tcp->fd);
60 70 monitor_destroy (tcp->lock);
61 free (tcp); 71 free (tcp);
62 return 0; 72 return 0;
63 } 73 }
...@@ -65,20 +75,24 @@ _tcp_destroy (stream_t stream) ...@@ -65,20 +75,24 @@ _tcp_destroy (stream_t stream)
65 static int 75 static int
66 _tcp_release (stream_t stream) 76 _tcp_release (stream_t stream)
67 { 77 {
78 int status;
68 struct _tcp_instance *tcp = (struct _tcp_instance *)stream; 79 struct _tcp_instance *tcp = (struct _tcp_instance *)stream;
69 if (--tcp->ref == 0) 80 monitor_lock (tcp->lock);
81 status = --tcp->ref;
82 if (status <= 0)
70 { 83 {
84 monitor_unlock (tcp->lock);
71 _tcp_destroy (stream); 85 _tcp_destroy (stream);
72 return 0; 86 return 0;
73 } 87 }
74 return tcp->ref; 88 monitor_unlock (tcp->lock);
89 return status;
75 } 90 }
76 91
77 static int 92 static int
78 _tcp_close (stream_t stream) 93 _tcp_close0 (stream_t stream)
79 { 94 {
80 struct _tcp_instance *tcp = (struct _tcp_instance *)stream; 95 struct _tcp_instance *tcp = (struct _tcp_instance *)stream;
81
82 if (tcp->fd != -1) 96 if (tcp->fd != -1)
83 close (tcp->fd); 97 close (tcp->fd);
84 tcp->fd = -1; 98 tcp->fd = -1;
...@@ -87,7 +101,20 @@ _tcp_close (stream_t stream) ...@@ -87,7 +101,20 @@ _tcp_close (stream_t stream)
87 } 101 }
88 102
89 static int 103 static int
90 _tcp_open (stream_t stream, const char *host, int port, int flags) 104 _tcp_close (stream_t stream)
105 {
106 struct _tcp_instance *tcp = (struct _tcp_instance *)stream;
107
108 monitor_lock (tcp->lock);
109 monitor_cleanup_push (_tcp_cleanup, tcp);
110 _tcp_close0 (stream);
111 monitor_unlock (tcp->lock);
112 monitor_cleanup_pop (0);
113 return 0;
114 }
115
116 static int
117 _tcp_open0 (stream_t stream, const char *host, int port, int flags)
91 { 118 {
92 struct _tcp_instance *tcp = (struct _tcp_instance *)stream; 119 struct _tcp_instance *tcp = (struct _tcp_instance *)stream;
93 int flgs, ret; 120 int flgs, ret;
...@@ -131,7 +158,7 @@ _tcp_open (stream_t stream, const char *host, int port, int flags) ...@@ -131,7 +158,7 @@ _tcp_open (stream_t stream, const char *host, int port, int flags)
131 phe = gethostbyname (tcp->host); 158 phe = gethostbyname (tcp->host);
132 if (!phe) 159 if (!phe)
133 { 160 {
134 _tcp_close (stream); 161 _tcp_close0 (stream);
135 return MU_ERROR_INVALID_PARAMETER; 162 return MU_ERROR_INVALID_PARAMETER;
136 } 163 }
137 tcp->address = *(((unsigned long **)phe->h_addr_list)[0]); 164 tcp->address = *(((unsigned long **)phe->h_addr_list)[0]);
...@@ -153,7 +180,7 @@ _tcp_open (stream_t stream, const char *host, int port, int flags) ...@@ -153,7 +180,7 @@ _tcp_open (stream_t stream, const char *host, int port, int flags)
153 ret = MU_ERROR_TRY_AGAIN; 180 ret = MU_ERROR_TRY_AGAIN;
154 } 181 }
155 else 182 else
156 _tcp_close (stream); 183 _tcp_close0 (stream);
157 return ret; 184 return ret;
158 } 185 }
159 tcp->state = TCP_STATE_CONNECTING; 186 tcp->state = TCP_STATE_CONNECTING;
...@@ -165,7 +192,7 @@ _tcp_open (stream_t stream, const char *host, int port, int flags) ...@@ -165,7 +192,7 @@ _tcp_open (stream_t stream, const char *host, int port, int flags)
165 else 192 else
166 { 193 {
167 ret = errno; 194 ret = errno;
168 _tcp_close (stream); 195 _tcp_close0 (stream);
169 return ret; 196 return ret;
170 } 197 }
171 break; 198 break;
...@@ -174,6 +201,19 @@ _tcp_open (stream_t stream, const char *host, int port, int flags) ...@@ -174,6 +201,19 @@ _tcp_open (stream_t stream, const char *host, int port, int flags)
174 } 201 }
175 202
176 static int 203 static int
204 _tcp_open (stream_t stream, const char *host, int port, int flags)
205 {
206 int status;
207 struct _tcp_instance *tcp = (struct _tcp_instance *)stream;
208 monitor_lock (tcp->lock);
209 monitor_cleanup_push (_tcp_cleanup, tcp);
210 status = _tcp_open0 (stream, host, port, flags);
211 monitor_unlock (tcp->lock);
212 monitor_cleanup_pop (0);
213 return status;
214 }
215
216 static int
177 _tcp_get_fd (stream_t stream, int *fd) 217 _tcp_get_fd (stream_t stream, int *fd)
178 { 218 {
179 struct _tcp_instance *tcp = (struct _tcp_instance *)stream; 219 struct _tcp_instance *tcp = (struct _tcp_instance *)stream;
...@@ -292,7 +332,7 @@ static int ...@@ -292,7 +332,7 @@ static int
292 _tcp_flush (stream_t stream) 332 _tcp_flush (stream_t stream)
293 { 333 {
294 (void)stream; 334 (void)stream;
295 return MU_ERROR_NOT_SUPPORTED; 335 return 0;
296 } 336 }
297 337
298 static int 338 static int
......