Blame view

mailbox/stream.c 15.8 KB
1
/* GNU Mailutils -- a suite of utilities for electronic mail
2
   Copyright (C) 2009, 2010 Free Software Foundation, Inc.
Alain Magloire authored
3

4 5 6 7
   GNU Mailutils is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 3, or (at your option)
   any later version.
Alain Magloire authored
8

9
   GNU Mailutils is distributed in the hope that it will be useful,
Alain Magloire authored
10
   but WITHOUT ANY WARRANTY; without even the implied warranty of
11 12
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.
13

14 15
   You should have received a copy of the GNU General Public License
   along with GNU Mailutils.  If not, see <http://www.gnu.org/licenses/>. */
16

17 18 19
#ifdef HAVE_CONFIG_H
# include <config.h>
#endif
20
#include <string.h>
21 22 23 24 25
#include <stdlib.h>
#include <limits.h>
#ifndef SIZE_MAX
# define SIZE_MAX (~((size_t)0))
#endif
Alain Magloire authored
26

27 28 29
#include <mailutils/types.h>
#include <mailutils/alloc.h>
#include <mailutils/error.h>
30
#include <mailutils/errno.h>
31 32 33 34 35 36
#include <mailutils/nls.h>
#include <mailutils/stream.h>
#include <mailutils/sys/stream.h>

static int
_stream_seterror (struct _mu_stream *stream, int code, int perm)
Alain Magloire authored
37
{
38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
  stream->last_err = code;
  switch (code)
    {
    case 0:
    case EAGAIN:
    case EINPROGRESS:
      break;

    default:
      if (perm)
	stream->flags |= _MU_STR_ERR;
    }
  return code;
}

Sergey Poznyakoff authored
53 54 55
#define _stream_cleareof(s) ((s)->flags &= ~_MU_STR_EOF)
#define _stream_advance_buffer(s,n) ((s)->cur += n, (s)->level -= n)
#define _stream_buffer_offset(s) ((s)->cur - (s)->buffer)
56
#define _stream_orig_level(s) ((s)->level + _stream_buffer_offset (s))
Sergey Poznyakoff authored
57 58 59 60 61 62 63 64 65 66 67 68 69 70 71

static int
_stream_fill_buffer (struct _mu_stream *stream)
{
  size_t n;
  size_t rdn;
  int rc = 0;
  char c;
    
  switch (stream->buftype)
    {
    case mu_buffer_none:
      return 0;
	
    case mu_buffer_full:
72 73 74 75
      rc = mu_stream_read_unbuffered (stream,
				      stream->buffer, stream->bufsize,
				      0,
				      &stream->level);
Sergey Poznyakoff authored
76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125
      break;
	
    case mu_buffer_line:
      for (n = 0;
	   n < stream->bufsize
	     && (rc = mu_stream_read_unbuffered (stream,
						 &c, 1, 0, &rdn)) == 0
	     && rdn; n++)
	{
	  stream->buffer[n] = c;
	  if (c == '\n')
	    break;
	}
      stream->level = n;
      break;
    }
  stream->cur = stream->buffer;
  return rc;
}

#define BUFFER_FULL_P(s) \
  ((s)->cur + (s)->level == (s)->buffer + (s)->bufsize)

static int
_stream_buffer_full_p (struct _mu_stream *stream)
{
    switch (stream->buftype)
      {
      case mu_buffer_none:
	break;
	
      case mu_buffer_line:
	return BUFFER_FULL_P (stream)
	       || memchr (stream->cur, '\n', stream->level) != NULL;

      case mu_buffer_full:
	return BUFFER_FULL_P (stream);
      }
    return 0;
}

static int
_stream_flush_buffer (struct _mu_stream *stream, int all)
{
  int rc;
  char *end;
		  
  if (stream->flags & _MU_STR_DIRTY)
    {
      if ((stream->flags & MU_STREAM_SEEK)
126
	  && (rc = mu_stream_seek (stream, stream->offset, MU_SEEK_SET, NULL)))
Sergey Poznyakoff authored
127 128 129 130 131 132 133 134 135 136 137
	return rc;

      switch (stream->buftype)
	{
	case mu_buffer_none:
	    abort(); /* should not happen */
	    
	case mu_buffer_full:
	  if ((rc = mu_stream_write_unbuffered (stream, stream->cur,
						stream->level, 1, NULL)))
	    return rc;
138 139
	  if (all)
	    _stream_advance_buffer (stream, stream->level);
Sergey Poznyakoff authored
140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168
	  break;
	    
	case mu_buffer_line:
	  if (stream->level == 0)
	    break;
	  for (end = memchr (stream->cur, '\n', stream->level);
	       end;
	       end = memchr (stream->cur, '\n', stream->level))
	    {
	      size_t size = end - stream->cur + 1;
	      rc = mu_stream_write_unbuffered (stream,
					       stream->cur,
					       size, 1, NULL);
	      if (rc)
		return rc;
	      _stream_advance_buffer (stream, size);
	    }
	  if ((all && stream->level) || BUFFER_FULL_P (stream))
	    {
	      rc = mu_stream_write_unbuffered (stream,
					       stream->cur,
					       stream->level,
					       1, NULL);
	      if (rc)
		return rc;
	      _stream_advance_buffer (stream, stream->level);
	    }
	}
    }
169 170 171
  else if (all)
    _stream_advance_buffer (stream, stream->level);
  
Sergey Poznyakoff authored
172 173 174 175 176 177 178 179 180 181 182 183 184 185 186
  if (stream->level)
    {
      if (stream->cur > stream->buffer)
	memmove (stream->buffer, stream->cur, stream->level);
    }
  else
    {
      stream->flags &= ~_MU_STR_DIRTY;
      stream->level = 0;
    }
  stream->cur = stream->buffer;
  return 0;
}


187 188 189 190 191 192 193 194
mu_stream_t
_mu_stream_create (size_t size, int flags)
{
  struct _mu_stream *str;
  if (size < sizeof (str))
    abort ();
  str = mu_zalloc (size);
  str->flags = flags;
Sergey Poznyakoff authored
195
  mu_stream_ref (str);
196
  return str;
Alain Magloire authored
197 198
}

199
void
200
mu_stream_destroy (mu_stream_t *pstream)
201
{
202
  if (pstream)
203
    {
204 205
      mu_stream_t str = *pstream;
      if (str && (str->ref_count == 0 || --str->ref_count == 0))
206
	{
207
	  mu_stream_close (str);
208 209 210 211
	  if (str->done)
	    str->done (str);
	  free (str);
	  *pstream = NULL;
212
	}
213 214 215
    }
}

216 217
void
mu_stream_get_flags (mu_stream_t str, int *pflags)
218
{
219
  *pflags = str->flags & ~_MU_STR_INTERN_MASK;
220 221 222 223 224 225 226 227 228 229 230 231
}
  
void
mu_stream_ref (mu_stream_t stream)
{
  stream->ref_count++;
}

void
mu_stream_unref (mu_stream_t stream)
{
  mu_stream_destroy (&stream);
232 233
}

234
int
235
mu_stream_open (mu_stream_t stream)
236
{
237
  int rc;
238

239 240 241 242
  if (stream->open && (rc = stream->open (stream)))
    return _stream_seterror (stream, rc, 1);
  stream->bytes_in = stream->bytes_out = 0;
  return 0;
243 244
}

245 246
const char *
mu_stream_strerror (mu_stream_t stream, int rc)
247
{
248
  const char *str;
249

250 251 252 253 254
  if (stream->error_string)
    str = stream->error_string (stream, rc);
  else
    str = mu_strerror (rc);
  return str;
255 256 257
}

int
258 259 260 261 262 263
mu_stream_err (mu_stream_t stream)
{
  return stream->flags & _MU_STR_ERR;
}

int
264 265 266 267 268 269 270
mu_stream_last_error (mu_stream_t stream)
{
  return stream->last_err;
}

void
mu_stream_clearerr (mu_stream_t stream)
271
{
272 273
  stream->last_err = 0;
  stream->flags &= ~_MU_STR_ERR;
274 275 276
}

int
277
mu_stream_eof (mu_stream_t stream)
278
{
279
  return stream->flags & _MU_STR_EOF;
280 281 282
}

int
283 284
mu_stream_seek (mu_stream_t stream, mu_off_t offset, int whence,
		mu_off_t *pres)
285
{    
286
  int rc;
287 288
  mu_off_t size;
  
289 290
  if (!stream->seek)
    return _stream_seterror (stream, ENOSYS, 0);
Alain Magloire authored
291

292 293
  if (!(stream->flags & MU_STREAM_SEEK))
    return _stream_seterror (stream, EACCES, 1);
294

295
  switch (whence)
296
    {
297 298
    case MU_SEEK_SET:
      break;
Alain Magloire authored
299

300
    case MU_SEEK_CUR:
301
      if (offset == 0)
302
	{
303
	  *pres = stream->offset + _stream_buffer_offset (stream);
304
	  return 0;
305
	}
306 307 308 309 310 311 312 313
      offset += stream->offset;
      break;

    case MU_SEEK_END:
      rc = mu_stream_size (stream, &size);
      if (rc)
	return _stream_seterror (stream, rc, 1);
      offset += size;
314
      break;
315

316 317
    default:
      return _stream_seterror (stream, EINVAL, 1);
318
    }
319

320 321 322 323 324 325 326 327 328 329 330 331
  if (stream->buftype == mu_buffer_none
      || offset < stream->offset
      || offset > stream->offset + _stream_buffer_offset (stream))
    {
      if ((rc = _stream_flush_buffer (stream, 1)))
	return rc;
      rc = stream->seek (stream, offset, MU_SEEK_SET, &stream->offset);
      if (rc)
	return _stream_seterror (stream, rc, 1);
      _stream_cleareof (stream);
    }
  
332
  if (pres)
333
    *pres = stream->offset + _stream_buffer_offset (stream);
334
  return 0;
Alain Magloire authored
335 336 337
}

int
338 339
mu_stream_set_buffer (mu_stream_t stream, enum mu_buffer_type type,
		      size_t size)
340
{
341 342
  if (size == 0)
    type = mu_buffer_none;
343

344 345 346 347 348
  if (stream->buffer)
    {
      mu_stream_flush (stream);
      free (stream->buffer);
    }
349

350 351
  stream->buftype = type;
  if (type == mu_buffer_none)
352
    {
353
      stream->buffer = NULL;
354 355
      return 0;
    }
356

357 358
  stream->buffer = mu_alloc (size);
  if (stream->buffer == NULL)
359
    {
360 361
      stream->buftype = mu_buffer_none;
      return _stream_seterror (stream, ENOMEM, 1);
362
    }
363 364 365 366 367 368
  stream->bufsize = size;
  stream->cur = stream->buffer;
  stream->level = 0;
    
  return 0;
}
369

370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389
int
mu_stream_read_unbuffered (mu_stream_t stream, void *buf, size_t size,
			   int full_read,
			   size_t *pnread)
{
  int rc;
  size_t nread;
    
  if (!stream->read) 
    return _stream_seterror (stream, ENOSYS, 0);

  if (!(stream->flags & MU_STREAM_READ)) 
    return _stream_seterror (stream, EACCES, 1);
    
  if (stream->flags & _MU_STR_ERR)
    return stream->last_err;
    
  if ((stream->flags & _MU_STR_EOF) || size == 0)
    {
      if (pnread)
Sergey Poznyakoff authored
390 391
	*pnread = 0;
      return 0;
392
    }
393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412
    
    if (full_read)
      {
	size_t rdbytes;

	nread = 0;
	while (size > 0
	       && (rc = stream->read (stream, buf, size, &rdbytes)) == 0)
	  {
	    if (rdbytes == 0)
	      {
		stream->flags |= _MU_STR_EOF;
		break;
	      }
	    buf += rdbytes;
	    nread += rdbytes;
	    size -= rdbytes;
	    stream->bytes_in += rdbytes;
	    
	  }
Sergey Poznyakoff authored
413 414
	if (size && rc)
	  rc = _stream_seterror (stream, rc, 0);
415 416 417 418 419 420 421 422 423 424 425 426
      }
    else
      {
	rc = stream->read (stream, buf, size, &nread);
	if (rc == 0)
	  {
	    if (nread == 0)
	      stream->flags |= _MU_STR_EOF;
	    stream->bytes_in += nread;
	  }
	_stream_seterror (stream, rc, rc != 0);
      }
427
    stream->offset += nread;
428 429 430 431
    if (pnread)
      *pnread = nread;
    
    return rc;
432 433 434
}

int
435 436 437 438
mu_stream_write_unbuffered (mu_stream_t stream,
			    const void *buf, size_t size,
			    int full_write,
			    size_t *pnwritten)
439
{
440 441
  int rc;
  size_t nwritten;
442
  
443 444 445 446 447 448 449 450 451 452
  if (!stream->write) 
    return _stream_seterror (stream, ENOSYS, 0);

  if (!(stream->flags & MU_STREAM_WRITE)) 
    return _stream_seterror (stream, EACCES, 1);

  if (stream->flags & _MU_STR_ERR)
    return stream->last_err;

  if (size == 0)
453
    {
454 455 456
      if (pnwritten)
	*pnwritten = 0;
      return 0;
457
    }
458 459
    
  if (full_write)
460
    {
461 462
      size_t wrbytes;
      const char *bufp = buf;
463

464 465 466 467
      nwritten = 0;
      while (size > 0
	     && (rc = stream->write (stream, bufp, size, &wrbytes))
	             == 0)
468
	{
469
	  if (wrbytes == 0)
470
	    {
471
	      rc = EIO;
472 473
	      break;
	    }
474 475 476 477
	  bufp += wrbytes;
	  nwritten += wrbytes;
	  size -= wrbytes;
	  stream->bytes_out += wrbytes;
478 479 480 481
	}
    }
  else
    {
482 483 484
      rc = stream->write (stream, buf, size, &nwritten);
      if (rc == 0)
	stream->bytes_out += nwritten;
485
    }
Sergey Poznyakoff authored
486
  stream->flags |= _MU_STR_WRT;
487
  stream->offset += nwritten;
488 489 490
  if (pnwritten)
    *pnwritten = nwritten;
  _stream_seterror (stream, rc, rc != 0);
491 492 493
  return rc;
}

494
int
495
mu_stream_read (mu_stream_t stream, void *buf, size_t size, size_t *pread)
496
{
497 498 499
  if (stream->buftype == mu_buffer_none)
    return mu_stream_read_unbuffered (stream, buf, size, !pread, pread);
  else
500
    {
501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533
      char *bufp = buf;
      size_t nbytes = 0;
      while (size)
	{
	  size_t n;
	  int rc;
	  
	  if (stream->level == 0)
	    {
	      if ((rc = _stream_fill_buffer (stream)))
		{
		  if (nbytes)
		    break;
		  return rc;
		}
	      if (stream->level == 0)
		break;
	    }
	  
	  n = size;
	  if (n > stream->level)
	    n = stream->level;
	  memcpy (bufp, stream->cur, n);
	  _stream_advance_buffer (stream, n);
	  nbytes += n;
	  bufp += n;
	  size -= n;
	  if (stream->buftype == mu_buffer_line && bufp[-1] == '\n')
	    break;
	}
      
      if (pread)
	*pread = nbytes;
534
    }
535
  return 0;
536
}
537 538

int
539
mu_stream_readline (mu_stream_t stream, char *buf, size_t size, size_t *pread)
540
{
541 542
  int rc;
  char c;
Sergey Poznyakoff authored
543
  size_t n = 0, rdn;
544 545
    
  if (size == 0)
546
    return EINVAL;
547 548
    
  size--;
Sergey Poznyakoff authored
549 550
  for (n = 0;
       n < size && (rc = mu_stream_read (stream, &c, 1, &rdn)) == 0 && rdn;)
551
    {
552
      *buf++ = c;
Sergey Poznyakoff authored
553
      n++;
554 555
      if (c == '\n')
	break;
556
    }
557 558 559 560
  *buf = 0;
  if (pread)
    *pread = n;
  return rc;
561
}
562

563
int
564 565
mu_stream_getdelim (mu_stream_t stream, char **pbuf, size_t *psize,
		    int delim, size_t *pread)
566
{
567 568 569 570 571 572
  int rc;
  char *lineptr = *pbuf;
  size_t n = *psize;
  size_t cur_len = 0;
    
  if (lineptr == NULL || n == 0)
573
    {
574 575 576 577 578 579
      char *new_lineptr;
      n = 120;
      new_lineptr = mu_realloc (lineptr, n);
      if (new_lineptr == NULL) 
	return ENOMEM;
      lineptr = new_lineptr;
580
    }
581 582
    
  for (;;)
583
    {
584
      char c;
Sergey Poznyakoff authored
585
      size_t rdn;
586

Sergey Poznyakoff authored
587 588
      rc = mu_stream_read (stream, &c, 1, &rdn);
      if (rc || rdn == 0)
589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622
	break;
	
      /* Make enough space for len+1 (for final NUL) bytes.  */
      if (cur_len + 1 >= n)
	{
	  size_t needed_max =
	    SSIZE_MAX < SIZE_MAX ? (size_t) SSIZE_MAX + 1 : SIZE_MAX;
	  size_t needed = 2 * n + 1;   /* Be generous. */
	  char *new_lineptr;
	  
	  if (needed_max < needed)
	    needed = needed_max;
	  if (cur_len + 1 >= needed)
	    {
	      rc = EOVERFLOW;
	      break;
	    }
	    
	  new_lineptr = mu_realloc (lineptr, needed);
	  if (new_lineptr == NULL)
	    {
	      rc = ENOMEM;
	      break;
	    }
	    
	  lineptr = new_lineptr;
	  n = needed;
	}

      lineptr[cur_len] = c;
      cur_len++;
      
      if (c == delim)
	break;
623
    }
624 625 626 627 628 629 630 631
  lineptr[cur_len] = '\0';
    
  *pbuf = lineptr;
  *psize = n;
  
  if (pread)
    *pread = cur_len;
  return rc;
632 633 634
}

int
635 636
mu_stream_getline (mu_stream_t stream, char **pbuf, size_t *psize,
		   size_t *pread)
637
{
638
    return mu_stream_getdelim (stream, pbuf, psize, '\n', pread);
639 640 641
}

int
642 643
mu_stream_write (mu_stream_t stream, const void *buf, size_t size,
		 size_t *pnwritten)
644
{
645 646 647 648 649 650
  int rc = 0;
  
  if (stream->buftype == mu_buffer_none)
    rc = mu_stream_write_unbuffered (stream, buf, size,
				     !pnwritten, pnwritten);
  else
651
    {
652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677
      size_t nbytes = 0;
      const char *bufp = buf;
	
      while (1)
	{
	  size_t n;
	  
	  if (_stream_buffer_full_p (stream)
	      && (rc = _stream_flush_buffer (stream, 0)))
	    break;

	  if (size == 0)
	    break;
	    
	  n = stream->bufsize - stream->level;
	  if (n > size)
	    n = size;
	  memcpy (stream->cur + stream->level, bufp, n);
	  stream->level += n;
	  nbytes += n;
	  bufp += n;
	  size -= n;
	  stream->flags |= _MU_STR_DIRTY;
	}
      if (pnwritten)
	*pnwritten = nbytes;
678
    }
679
  return rc;
680 681 682
}

int
683
mu_stream_writeline (mu_stream_t stream, const char *buf, size_t size)
684
{
685 686 687 688
  int rc;
  if ((rc = mu_stream_write (stream, buf, size, NULL)) == 0)
    rc = mu_stream_write (stream, "\r\n", 2, NULL);
  return rc;
689 690 691
}

int
692
mu_stream_flush (mu_stream_t stream)
693
{
694 695 696
  int rc;
  
  if (!stream)
697
    return EINVAL;
698 699 700
  rc = _stream_flush_buffer (stream, 1);
  if (rc)
    return rc;
Sergey Poznyakoff authored
701
  if ((stream->flags & _MU_STR_WRT) && stream->flush)
702
    return stream->flush (stream);
Sergey Poznyakoff authored
703
  stream->flags &= ~_MU_STR_WRT;
704
  return 0;
705 706 707
}

int
708
mu_stream_close (mu_stream_t stream)
709
{
710 711 712
  int rc = 0;
    
  if (!stream)
713
    return EINVAL;
714
  mu_stream_flush (stream);
715 716 717
  /* Do close the stream only if it is not used by anyone else */
  if (stream->ref_count > 1)
    return 0;
718 719 720
  if (stream->close)
    rc = stream->close (stream);
  return rc;
721
}
722 723

int
724
mu_stream_size (mu_stream_t stream, mu_off_t *psize)
725
{
726 727 728 729 730 731
  int rc;
    
  if (!stream->size)
    return _stream_seterror (stream, ENOSYS, 0);
  rc = stream->size (stream, psize);
  return _stream_seterror (stream, rc, rc != 0);
732
}
733

734 735
mu_off_t
mu_stream_bytes_in (mu_stream_t stream)
736
{
737
  return stream->bytes_in;
738 739
}

740 741
mu_off_t
mu_stream_bytes_out (mu_stream_t stream)
742
{
743
  return stream->bytes_out;
744 745 746
}

int
747
mu_stream_ioctl (mu_stream_t stream, int code, void *ptr)
Sergey Poznyakoff authored
748
{
749 750 751
  if (stream->ctl == NULL)
    return ENOSYS;
  return stream->ctl (stream, code, ptr);
Sergey Poznyakoff authored
752 753 754
}

int
755
mu_stream_wait (mu_stream_t stream, int *pflags, struct timeval *tvp)
756
{
757
  int flg = 0;
758 759
  if (stream == NULL)
    return EINVAL;
760 761 762 763 764 765
  
  /* Take to acount if we have any buffering.  */
  /* FIXME: How about MU_STREAM_READY_WR? */
  if ((*pflags) & MU_STREAM_READY_RD 
      && stream->buftype != mu_buffer_none
      && stream->level > 0)
766
    {
767 768
      flg = MU_STREAM_READY_RD;
      *pflags &= ~MU_STREAM_READY_RD;
769 770
    }

771
  if (stream->wait)
772
    {
773 774 775 776
      int rc = stream->wait (stream, pflags, tvp);
      if (rc == 0)
	*pflags |= flg;
      return rc;
777
    }
778 779
  
  return ENOSYS;
780 781 782
}

int
783
mu_stream_truncate (mu_stream_t stream, mu_off_t size)
784
{
785 786 787
  if (stream->truncate)
    return stream->truncate (stream, size);
  return ENOSYS;
788 789 790
}

int
791
mu_stream_shutdown (mu_stream_t stream, int how)
792
{
793 794 795 796
  if (stream->shutdown)
    return stream->shutdown (stream, how);
  return ENOSYS;
}
797 798

int
799
mu_stream_set_flags (mu_stream_t stream, int fl)
800
{
801 802
  if (stream == NULL)
    return EINVAL;
803
  stream->flags |= (fl & ~_MU_STR_INTERN_MASK);
804 805 806 807
  return 0;
}

int
808
mu_stream_clr_flags (mu_stream_t stream, int fl)
809
{
810 811
  if (stream == NULL)
    return EINVAL;
812
  stream->flags &= ~(fl & ~_MU_STR_INTERN_MASK);
813 814 815
  return 0;
}

816 817


Sergey Poznyakoff authored
818

819

820