Commit 3733cba7 3733cba7d0a04600f1119858956f923bbf820681 by Sergey Poznyakoff

Rewrite stream flushing code

* libmailutils/stream/stream.c (_stream_flush_buffer): Rewrite.
1 parent 07be38a0
...@@ -34,9 +34,6 @@ ...@@ -34,9 +34,6 @@
34 34
35 size_t mu_stream_default_buffer_size = MU_STREAM_DEFBUFSIZ; 35 size_t mu_stream_default_buffer_size = MU_STREAM_DEFBUFSIZ;
36 36
37 #define _MU_STR_FLUSH_ALL 0x01
38 #define _MU_STR_FLUSH_KEEP 0x02
39
40 #define _stream_event(stream, code, n, p) \ 37 #define _stream_event(stream, code, n, p) \
41 do \ 38 do \
42 { \ 39 { \
...@@ -194,12 +191,17 @@ _stream_buffer_full_p (struct _mu_stream *stream) ...@@ -194,12 +191,17 @@ _stream_buffer_full_p (struct _mu_stream *stream)
194 return 0; 191 return 0;
195 } 192 }
196 193
194 enum
195 {
196 FLUSH_WRITE, /* Flush only modified data. Keep buffer level and position
197 intact */
198 FLUSH_RDWR /* Flush modified data and reset buffer level and position */
199 };
200
197 static int 201 static int
198 _stream_flush_buffer (struct _mu_stream *stream, int flags) 202 _stream_flush_buffer (struct _mu_stream *stream, int what)
199 { 203 {
200 int rc; 204 int rc;
201 char *start, *end;
202 size_t wrsize;
203 205
204 if (stream->flags & _MU_STR_DIRTY) 206 if (stream->flags & _MU_STR_DIRTY)
205 { 207 {
...@@ -211,73 +213,28 @@ _stream_flush_buffer (struct _mu_stream *stream, int flags) ...@@ -211,73 +213,28 @@ _stream_flush_buffer (struct _mu_stream *stream, int flags)
211 return rc; 213 return rc;
212 } 214 }
213 215
214 switch (stream->buftype) 216 if ((rc = _stream_write_unbuffered (stream, stream->buffer,
215 { 217 stream->level,
216 case mu_buffer_none: 218 1, NULL)))
217 abort(); /* should not happen */ 219 return rc;
218 220 _stream_event (stream, _MU_STR_EVENT_FLUSHBUF,
219 case mu_buffer_full: 221 stream->level, stream->buffer);
220 if ((rc = _stream_write_unbuffered (stream, stream->buffer,
221 stream->level,
222 1, NULL)))
223 return rc;
224 _stream_event (stream, _MU_STR_EVENT_FLUSHBUF,
225 stream->level, stream->buffer);
226 break;
227
228 case mu_buffer_line:
229 if (stream->level == 0)
230 break;
231 wrsize = stream->level;
232 for (start = stream->buffer, end = memchr (start, '\n', wrsize);
233 end;
234 end = memchr (start, '\n', wrsize))
235 {
236 size_t size = end - start + 1;
237 rc = _stream_write_unbuffered (stream, start, size,
238 1, NULL);
239 if (rc)
240 return rc;
241 _stream_event (stream, _MU_STR_EVENT_FLUSHBUF,
242 size, start);
243 start += size;
244 wrsize -= size;
245 if (wrsize == 0)
246 break;
247 }
248 if (((flags & _MU_STR_FLUSH_ALL) && wrsize) ||
249 wrsize == stream->level)
250 {
251 rc = _stream_write_unbuffered (stream,
252 stream->buffer,
253 wrsize,
254 1, NULL);
255 if (rc)
256 return rc;
257 _stream_event (stream, _MU_STR_EVENT_FLUSHBUF,
258 wrsize, stream->buffer);
259 wrsize = 0;
260 }
261 if (!(flags & _MU_STR_FLUSH_KEEP))
262 {
263 if (wrsize)
264 memmove (stream->buffer, start, wrsize);
265 else
266 _stream_clrflag (stream, _MU_STR_DIRTY);
267 stream->offset += stream->level - wrsize;
268 stream->level = stream->pos = wrsize;
269 return 0;
270 }
271 }
272 _stream_clrflag (stream, _MU_STR_DIRTY); 222 _stream_clrflag (stream, _MU_STR_DIRTY);
223
224 if (stream->pos < stream->level)
225 memmove (stream->buffer, stream->buffer + stream->pos,
226 stream->level - stream->pos);
227 stream->offset += stream->pos;
228 stream->level -= stream->pos;
229 stream->pos = 0;
273 } 230 }
274 231
275 if (!(flags & _MU_STR_FLUSH_KEEP)) 232 if (what)
276 { 233 {
277 stream->offset += stream->level; 234 stream->offset += stream->level;
278 stream->pos = stream->level = 0; 235 stream->pos = stream->level = 0;
279 } 236 }
280 237
281 return 0; 238 return 0;
282 } 239 }
283 240
...@@ -462,7 +419,7 @@ mu_stream_seek (mu_stream_t stream, mu_off_t offset, int whence, ...@@ -462,7 +419,7 @@ mu_stream_seek (mu_stream_t stream, mu_off_t offset, int whence,
462 : (stream->offset <= offset && 419 : (stream->offset <= offset &&
463 offset < stream->offset + stream->level))) 420 offset < stream->offset + stream->level)))
464 { 421 {
465 if ((rc = _stream_flush_buffer (stream, _MU_STR_FLUSH_ALL))) 422 if ((rc = _stream_flush_buffer (stream, FLUSH_RDWR)))
466 return rc; 423 return rc;
467 if (stream->offset != offset) 424 if (stream->offset != offset)
468 { 425 {
...@@ -524,7 +481,7 @@ _stream_skip_input_bytes (mu_stream_t stream, mu_off_t count, mu_off_t *pres) ...@@ -524,7 +481,7 @@ _stream_skip_input_bytes (mu_stream_t stream, mu_off_t count, mu_off_t *pres)
524 { 481 {
525 if (pos || stream->level == 0) 482 if (pos || stream->level == 0)
526 { 483 {
527 if ((rc = _stream_flush_buffer (stream, _MU_STR_FLUSH_ALL))) 484 if ((rc = _stream_flush_buffer (stream, FLUSH_RDWR)))
528 return rc; 485 return rc;
529 rc = _stream_fill_buffer (stream); 486 rc = _stream_fill_buffer (stream);
530 if (rc) 487 if (rc)
...@@ -762,10 +719,6 @@ mu_stream_read (mu_stream_t stream, void *buf, size_t size, size_t *pread) ...@@ -762,10 +719,6 @@ mu_stream_read (mu_stream_t stream, void *buf, size_t size, size_t *pread)
762 char *bufp = buf; 719 char *bufp = buf;
763 size_t nbytes = 0; 720 size_t nbytes = 0;
764 int rc; 721 int rc;
765
766 if ((rc = _stream_flush_buffer (stream,
767 _MU_STR_FLUSH_ALL|_MU_STR_FLUSH_KEEP)))
768 return rc;
769 722
770 while (size) 723 while (size)
771 { 724 {
...@@ -773,7 +726,7 @@ mu_stream_read (mu_stream_t stream, void *buf, size_t size, size_t *pread) ...@@ -773,7 +726,7 @@ mu_stream_read (mu_stream_t stream, void *buf, size_t size, size_t *pread)
773 726
774 if (stream->pos == stream->level) 727 if (stream->pos == stream->level)
775 { 728 {
776 if ((rc = _stream_flush_buffer (stream, _MU_STR_FLUSH_ALL))) 729 if ((rc = _stream_flush_buffer (stream, FLUSH_RDWR)))
777 { 730 {
778 if (nbytes) 731 if (nbytes)
779 break; 732 break;
...@@ -824,7 +777,7 @@ _stream_scandelim (mu_stream_t stream, char *buf, size_t size, int delim, ...@@ -824,7 +777,7 @@ _stream_scandelim (mu_stream_t stream, char *buf, size_t size, int delim,
824 777
825 if (stream->pos == stream->level) 778 if (stream->pos == stream->level)
826 { 779 {
827 if ((rc = _stream_flush_buffer (stream, _MU_STR_FLUSH_ALL))) 780 if ((rc = _stream_flush_buffer (stream, FLUSH_RDWR)))
828 break; 781 break;
829 if ((rc = _stream_fill_buffer (stream)) || stream->level == 0) 782 if ((rc = _stream_fill_buffer (stream)) || stream->level == 0)
830 break; 783 break;
...@@ -908,8 +861,7 @@ mu_stream_readdelim (mu_stream_t stream, char *buf, size_t size, ...@@ -908,8 +861,7 @@ mu_stream_readdelim (mu_stream_t stream, char *buf, size_t size,
908 } 861 }
909 else 862 else
910 { 863 {
911 if ((rc = _stream_flush_buffer (stream, 864 if ((rc = _stream_flush_buffer (stream, FLUSH_WRITE)))
912 _MU_STR_FLUSH_ALL|_MU_STR_FLUSH_KEEP)))
913 return rc; 865 return rc;
914 rc = _stream_scandelim (stream, buf, size, delim, pread); 866 rc = _stream_scandelim (stream, buf, size, delim, pread);
915 } 867 }
...@@ -940,8 +892,7 @@ mu_stream_getdelim (mu_stream_t stream, char **pbuf, size_t *psize, ...@@ -940,8 +892,7 @@ mu_stream_getdelim (mu_stream_t stream, char **pbuf, size_t *psize,
940 _stream_init (stream); 892 _stream_init (stream);
941 } 893 }
942 894
943 if ((rc = _stream_flush_buffer (stream, 895 if ((rc = _stream_flush_buffer (stream, FLUSH_WRITE)))
944 _MU_STR_FLUSH_ALL|_MU_STR_FLUSH_KEEP)))
945 return rc; 896 return rc;
946 897
947 if (lineptr == NULL || n == 0) 898 if (lineptr == NULL || n == 0)
...@@ -1052,7 +1003,7 @@ mu_stream_write (mu_stream_t stream, const void *buf, size_t size, ...@@ -1052,7 +1003,7 @@ mu_stream_write (mu_stream_t stream, const void *buf, size_t size,
1052 size_t n; 1003 size_t n;
1053 1004
1054 if (_stream_buffer_full_p (stream) 1005 if (_stream_buffer_full_p (stream)
1055 && (rc = _stream_flush_buffer (stream, 0))) 1006 && (rc = _stream_flush_buffer (stream, FLUSH_RDWR)))
1056 break; 1007 break;
1057 1008
1058 if (size == 0) 1009 if (size == 0)
...@@ -1100,7 +1051,7 @@ mu_stream_flush (mu_stream_t stream) ...@@ -1100,7 +1051,7 @@ mu_stream_flush (mu_stream_t stream)
1100 return MU_ERR_NOT_OPEN; 1051 return MU_ERR_NOT_OPEN;
1101 _stream_init (stream); 1052 _stream_init (stream);
1102 } 1053 }
1103 rc = _stream_flush_buffer (stream, _MU_STR_FLUSH_ALL); 1054 rc = _stream_flush_buffer (stream, FLUSH_RDWR);
1104 if (rc) 1055 if (rc)
1105 return rc; 1056 return rc;
1106 if ((stream->flags & _MU_STR_WRT) && stream->flush) 1057 if ((stream->flags & _MU_STR_WRT) && stream->flush)
...@@ -1160,7 +1111,7 @@ mu_stream_ioctl (mu_stream_t stream, int family, int opcode, void *ptr) ...@@ -1160,7 +1111,7 @@ mu_stream_ioctl (mu_stream_t stream, int family, int opcode, void *ptr)
1160 { 1111 {
1161 int rc; 1112 int rc;
1162 _bootstrap_event (stream); 1113 _bootstrap_event (stream);
1163 if ((rc = _stream_flush_buffer (stream, _MU_STR_FLUSH_ALL|_MU_STR_FLUSH_KEEP))) 1114 if ((rc = _stream_flush_buffer (stream, FLUSH_WRITE)))
1164 return rc; 1115 return rc;
1165 if (stream->ctl == NULL) 1116 if (stream->ctl == NULL)
1166 return ENOSYS; 1117 return ENOSYS;
...@@ -1231,7 +1182,7 @@ mu_stream_truncate (mu_stream_t stream, mu_off_t size) ...@@ -1231,7 +1182,7 @@ mu_stream_truncate (mu_stream_t stream, mu_off_t size)
1231 { 1182 {
1232 int rc; 1183 int rc;
1233 1184
1234 if ((rc = _stream_flush_buffer (stream, _MU_STR_FLUSH_ALL))) 1185 if ((rc = _stream_flush_buffer (stream, FLUSH_RDWR)))
1235 return rc; 1186 return rc;
1236 return stream->truncate (stream, size); 1187 return stream->truncate (stream, size);
1237 } 1188 }
......