Commit 2203753f 2203753f72c89a194889baa199a513e3ed02e5b3 by Sergey Poznyakoff

Implement seek on filters in read-only mode.

* include/mailutils/stream.h (mu_stream_skip_input_bytes): New proto.
* mailbox/stream.c (mu_stream_skip_input_bytes): New function.
* mailbox/fltstream.c (filter_seek): Re-implement on top of
mu_stream_skip_input_bytes.
(mu_filter_stream_create): Fix flag validity checking.

* examples/base64.c: Add new option (-s) for testing seek operations
on filters.
1 parent 0f2fc18a
...@@ -53,8 +53,9 @@ main (int argc, char * argv []) ...@@ -53,8 +53,9 @@ main (int argc, char * argv [])
53 int flags = MU_STREAM_READ; 53 int flags = MU_STREAM_READ;
54 char *input = NULL, *output = NULL; 54 char *input = NULL, *output = NULL;
55 char *encoding = "base64"; 55 char *encoding = "base64";
56 56 mu_off_t shift = 0;
57 while ((c = getopt (argc, argv, "deE:hi:o:pvw")) != EOF) 57
58 while ((c = getopt (argc, argv, "deE:hi:o:ps:vw")) != EOF)
58 switch (c) 59 switch (c)
59 { 60 {
60 case 'i': 61 case 'i':
...@@ -80,6 +81,10 @@ main (int argc, char * argv []) ...@@ -80,6 +81,10 @@ main (int argc, char * argv [])
80 case 'p': 81 case 'p':
81 printable = 1; /* FIXME: Not implemented */ 82 printable = 1; /* FIXME: Not implemented */
82 break; 83 break;
84
85 case 's':
86 shift = strtoul (optarg, NULL, 0);
87 break;
83 88
84 case 'v': 89 case 'v':
85 verbose = 1; 90 verbose = 1;
...@@ -98,7 +103,7 @@ main (int argc, char * argv []) ...@@ -98,7 +103,7 @@ main (int argc, char * argv [])
98 } 103 }
99 104
100 if (input) 105 if (input)
101 MU_ASSERT (mu_file_stream_create (&in, input, MU_STREAM_READ)); 106 MU_ASSERT (mu_file_stream_create (&in, input, MU_STREAM_READ|MU_STREAM_SEEK));
102 else 107 else
103 MU_ASSERT (mu_stdio_stream_create (&in, MU_STDIN_FD, 0)); 108 MU_ASSERT (mu_stdio_stream_create (&in, MU_STDIN_FD, 0));
104 MU_ASSERT (mu_stream_open (in)); 109 MU_ASSERT (mu_stream_open (in));
...@@ -112,13 +117,18 @@ main (int argc, char * argv []) ...@@ -112,13 +117,18 @@ main (int argc, char * argv [])
112 117
113 if (flags == MU_STREAM_READ) 118 if (flags == MU_STREAM_READ)
114 { 119 {
115 MU_ASSERT (mu_filter_create (&flt, in, encoding, mode, MU_STREAM_READ)); 120 MU_ASSERT (mu_filter_create (&flt, in, encoding, mode,
121 MU_STREAM_READ|MU_STREAM_SEEK));
122 if (shift)
123 MU_ASSERT (mu_stream_seek (flt, shift, MU_SEEK_SET, NULL));
116 c_copy (out, flt); 124 c_copy (out, flt);
117 } 125 }
118 else 126 else
119 { 127 {
120 MU_ASSERT (mu_filter_create (&flt, out, encoding, mode, 128 MU_ASSERT (mu_filter_create (&flt, out, encoding, mode,
121 MU_STREAM_WRITE)); 129 MU_STREAM_WRITE));
130 if (shift)
131 MU_ASSERT (mu_stream_seek (in, shift, MU_SEEK_SET, NULL));
122 c_copy (flt, in); 132 c_copy (flt, in);
123 } 133 }
124 134
......
...@@ -71,6 +71,9 @@ void mu_stream_clearerr (mu_stream_t stream); ...@@ -71,6 +71,9 @@ void mu_stream_clearerr (mu_stream_t stream);
71 int mu_stream_eof (mu_stream_t stream); 71 int mu_stream_eof (mu_stream_t stream);
72 int mu_stream_seek (mu_stream_t stream, mu_off_t offset, int whence, 72 int mu_stream_seek (mu_stream_t stream, mu_off_t offset, int whence,
73 mu_off_t *pres); 73 mu_off_t *pres);
74 int mu_stream_skip_input_bytes (mu_stream_t stream, mu_off_t count,
75 mu_off_t *pres);
76
74 int mu_stream_set_buffer (mu_stream_t stream, enum mu_buffer_type type, 77 int mu_stream_set_buffer (mu_stream_t stream, enum mu_buffer_type type,
75 size_t size); 78 size_t size);
76 int mu_stream_read (mu_stream_t stream, void *buf, size_t size, size_t *pread); 79 int mu_stream_read (mu_stream_t stream, void *buf, size_t size, size_t *pread);
......
...@@ -313,12 +313,17 @@ filter_wr_flush (mu_stream_t stream) ...@@ -313,12 +313,17 @@ filter_wr_flush (mu_stream_t stream)
313 return rc; 313 return rc;
314 } 314 }
315 315
316 /* FIXME: Seeks in the *transport* stream */
317 static int 316 static int
318 filter_seek (struct _mu_stream *stream, mu_off_t off, mu_off_t *ppos) 317 filter_seek (struct _mu_stream *stream, mu_off_t off, mu_off_t *ppos)
319 { 318 {
320 struct _mu_filter_stream *fs = (struct _mu_filter_stream *)stream; 319 struct _mu_filter_stream *fs = (struct _mu_filter_stream *)stream;
321 return mu_stream_seek (fs->transport, off, MU_SEEK_SET, ppos); 320 int status;
321
322 status = mu_stream_seek (fs->transport, 0, MU_SEEK_SET, NULL);
323 if (status)
324 return status;
325 stream->offset = 0;
326 return mu_stream_skip_input_bytes (stream, off, ppos);
322 } 327 }
323 328
324 static int 329 static int
...@@ -392,7 +397,7 @@ mu_filter_stream_create (mu_stream_t *pflt, ...@@ -392,7 +397,7 @@ mu_filter_stream_create (mu_stream_t *pflt,
392 397
393 if ((flags & MU_STREAM_RDWR) == MU_STREAM_RDWR 398 if ((flags & MU_STREAM_RDWR) == MU_STREAM_RDWR
394 || !(flags & MU_STREAM_RDWR) 399 || !(flags & MU_STREAM_RDWR)
395 || (flags & MU_STREAM_SEEK)) 400 || (flags & (MU_STREAM_WRITE|MU_STREAM_SEEK)) == (MU_STREAM_WRITE|MU_STREAM_SEEK))
396 return EINVAL; 401 return EINVAL;
397 402
398 fs = (struct _mu_filter_stream *) _mu_stream_create (sizeof (*fs), flags); 403 fs = (struct _mu_filter_stream *) _mu_stream_create (sizeof (*fs), flags);
......
...@@ -334,6 +334,71 @@ mu_stream_seek (mu_stream_t stream, mu_off_t offset, int whence, ...@@ -334,6 +334,71 @@ mu_stream_seek (mu_stream_t stream, mu_off_t offset, int whence,
334 return 0; 334 return 0;
335 } 335 }
336 336
337 /* Skip COUNT bytes from the current position in stream by reading from
338 it. Return new offset in PRES.
339
340 Return 0 on success, EACCES if STREAM was not opened for reading.
341 Another non-zero exit codes are propagated from the underlying
342 input operations.
343
344 This function is designed to help implement seek method in otherwise
345 unseekable streams (such as filters). Do not use it if you absolutely
346 have to. Using it on an unbuffered stream is a terrible waste of CPU. */
347 int
348 mu_stream_skip_input_bytes (mu_stream_t stream, mu_off_t count, mu_off_t *pres)
349 {
350 mu_off_t pos;
351 int rc;
352
353 if (!(stream->flags & MU_STREAM_READ))
354 return _stream_seterror (stream, EACCES, 1);
355
356 if (stream->buftype == mu_buffer_none)
357 {
358 for (pos = 0; pos < count; pos++)
359 {
360 char c;
361 size_t nrd;
362 rc = mu_stream_read (stream, &c, 1, &nrd);
363 if (nrd == 0)
364 rc = ESPIPE;
365 if (rc)
366 break;
367 }
368 }
369 else
370 {
371 if ((rc = _stream_flush_buffer (stream, 1)))
372 return rc;
373 for (pos = 0;;)
374 {
375 if (stream->level == 0)
376 {
377 rc = _stream_fill_buffer (stream);
378 if (rc)
379 break;
380 if (stream->level == 0)
381 {
382 rc = ESPIPE;
383 break;
384 }
385 }
386 if (pos <= count && count < pos + stream->level)
387 {
388 rc = 0;
389 stream->cur = stream->buffer + count - pos;
390 pos = count;
391 break;
392 }
393 }
394 }
395
396 stream->offset += pos;
397 if (pres)
398 *pres = stream->offset;
399 return rc;
400 }
401
337 int 402 int
338 mu_stream_set_buffer (mu_stream_t stream, enum mu_buffer_type type, 403 mu_stream_set_buffer (mu_stream_t stream, enum mu_buffer_type type,
339 size_t size) 404 size_t size)
......