source: rtems/cpukit/libfs/src/pipe/fifo.c @ a7d1992c

Last change on this file since a7d1992c was a7d1992c, checked in by Sebastian Huber <sebastian.huber@…>, on May 15, 2012 at 8:06:18 AM

Merge branch 'upstream'

  • Property mode set to 100644
File size: 13.7 KB
Line 
1/*
2 * fifo.c: POSIX FIFO/pipe for RTEMS
3 *
4 * Author: Wei Shen <cquark@gmail.com>
5 *
6 * The license and distribution terms for this file may be
7 * found in the file LICENSE in this distribution or at
8 * http://www.rtems.com/license/LICENSE.
9 */
10
11
12#if HAVE_CONFIG_H
13#include "config.h"
14#endif
15
16#ifdef RTEMS_POSIX_API
17#define __RTEMS_VIOLATE_KERNEL_VISIBILITY__
18#endif
19
20#include <errno.h>
21#include <stdlib.h>
22
23#include <rtems.h>
24#include <rtems/libio_.h>
25
26#include "pipe.h"
27
28
/* Smaller of two values; either argument may be evaluated twice. */
#define MIN(a, b) (((a) < (b)) ? (a) : (b))

/* Read/write access bits of a libio descriptor entry. */
#define LIBIO_ACCMODE(_iop) (((_iop)->flags) & LIBIO_FLAGS_READ_WRITE)
/* Non-zero when the descriptor entry has LIBIO_FLAGS_NO_DELAY set. */
#define LIBIO_NODELAY(_iop) (((_iop)->flags) & LIBIO_FLAGS_NO_DELAY)
33
34static rtems_id pipe_semaphore = RTEMS_ID_NONE;
35
36
/*
 * Ring buffer state queries. _pipe may be any expression yielding a pointer
 * to a pipe control structure; it is parenthesized so that arguments such as
 * &pipe_object or ptr + 1 expand correctly.
 */
#define PIPE_EMPTY(_pipe) ((_pipe)->Length == 0)
#define PIPE_FULL(_pipe)  ((_pipe)->Length == (_pipe)->Size)
#define PIPE_SPACE(_pipe) ((_pipe)->Size - (_pipe)->Length)
/* Index at which the next written byte goes. */
#define PIPE_WSTART(_pipe) \
  (((_pipe)->Start + (_pipe)->Length) % (_pipe)->Size)

/* Obtain the per-pipe semaphore; evaluates to non-zero on success. */
#define PIPE_LOCK(_pipe)  \
  ( rtems_semaphore_obtain((_pipe)->Semaphore, RTEMS_WAIT, RTEMS_NO_TIMEOUT)  \
   == RTEMS_SUCCESSFUL )

#define PIPE_UNLOCK(_pipe)  rtems_semaphore_release((_pipe)->Semaphore)

/* Block on the reader barrier; evaluates to non-zero on success. */
#define PIPE_READWAIT(_pipe)  \
  ( rtems_barrier_wait((_pipe)->readBarrier, RTEMS_NO_TIMEOUT)  \
   == RTEMS_SUCCESSFUL)

/* Block on the writer barrier; evaluates to non-zero on success. */
#define PIPE_WRITEWAIT(_pipe)  \
  ( rtems_barrier_wait((_pipe)->writeBarrier, RTEMS_NO_TIMEOUT)  \
   == RTEMS_SUCCESSFUL)

/* Release the tasks waiting on the reader barrier; the count is discarded. */
#define PIPE_WAKEUPREADERS(_pipe) \
  do { \
    uint32_t pipe_released_; \
    rtems_barrier_release((_pipe)->readBarrier, &pipe_released_); \
  } while (0)

/* Release the tasks waiting on the writer barrier; the count is discarded. */
#define PIPE_WAKEUPWRITERS(_pipe) \
  do { \
    uint32_t pipe_released_; \
    rtems_barrier_release((_pipe)->writeBarrier, &pipe_released_); \
  } while (0)
61
62
63#ifdef RTEMS_POSIX_API
64#define __RTEMS_VIOLATE_KERNEL_VISIBILITY__
65
66#include <rtems/rtems/barrier.h>
67#include <rtems/score/thread.h>
68
69static void pipe_select_wakeup(rtems_id *id_ptr)
70{
71  rtems_id id = *id_ptr;
72
73  *id_ptr = 0;
74
75  if (id != 0) {
76    rtems_status_code sc = rtems_event_send(id, RTEMS_IOCTL_SELECT_EVENT);
77
78    if (sc != RTEMS_SUCCESSFUL) {
79      rtems_fatal_error_occurred(sc);
80    }
81  }
82}
83
84/* Set barriers to be interruptible by signals. */
85static void pipe_interruptible(pipe_control_t *pipe)
86{
87  Objects_Locations location;
88
89  _Barrier_Get(pipe->readBarrier, &location)->Barrier.Wait_queue.state
90    |= STATES_INTERRUPTIBLE_BY_SIGNAL;
91  _Thread_Enable_dispatch();
92  _Barrier_Get(pipe->writeBarrier, &location)->Barrier.Wait_queue.state
93    |= STATES_INTERRUPTIBLE_BY_SIGNAL;
94  _Thread_Enable_dispatch();
95}
96#endif
97
98/*
99 * Alloc pipe control structure, buffer, and resources.
100 * Called with pipe_semaphore held.
101 */
102static int pipe_alloc(
103  pipe_control_t **pipep
104)
105{
106  static char c = 'a';
107  pipe_control_t *pipe;
108  int err = -ENOMEM;
109
110  pipe = calloc(1, sizeof(pipe_control_t));
111  if (pipe == NULL)
112    return err;
113
114  pipe->Size = PIPE_BUF;
115  pipe->Buffer = malloc(pipe->Size);
116  if (! pipe->Buffer)
117    goto err_buf;
118
119  err = -ENOMEM;
120
121  if (rtems_barrier_create(
122        rtems_build_name ('P', 'I', 'r', c),
123        RTEMS_BARRIER_MANUAL_RELEASE, 0,
124        &pipe->readBarrier) != RTEMS_SUCCESSFUL)
125    goto err_rbar;
126  if (rtems_barrier_create(
127        rtems_build_name ('P', 'I', 'w', c),
128        RTEMS_BARRIER_MANUAL_RELEASE, 0,
129        &pipe->writeBarrier) != RTEMS_SUCCESSFUL)
130    goto err_wbar;
131  if (rtems_semaphore_create(
132        rtems_build_name ('P', 'I', 's', c), 1,
133        RTEMS_BINARY_SEMAPHORE | RTEMS_FIFO,
134        RTEMS_NO_PRIORITY, &pipe->Semaphore) != RTEMS_SUCCESSFUL)
135    goto err_sem;
136
137#ifdef RTEMS_POSIX_API
138  pipe_interruptible(pipe);
139#endif
140
141  *pipep = pipe;
142  if (c ++ == 'z')
143    c = 'a';
144  return 0;
145
146err_sem:
147  rtems_barrier_delete(pipe->writeBarrier);
148err_wbar:
149  rtems_barrier_delete(pipe->readBarrier);
150err_rbar:
151  free(pipe->Buffer);
152err_buf:
153  free(pipe);
154  return err;
155}
156
157/* Called with pipe_semaphore held. */
158static inline void pipe_free(
159  pipe_control_t *pipe
160)
161{
162  rtems_barrier_delete(pipe->readBarrier);
163  rtems_barrier_delete(pipe->writeBarrier);
164  rtems_semaphore_delete(pipe->Semaphore);
165  free(pipe->Buffer);
166  free(pipe);
167}
168
169static int pipe_lock(void)
170{
171  rtems_status_code sc = RTEMS_SUCCESSFUL;
172
173  if (pipe_semaphore == RTEMS_ID_NONE) {
174    rtems_libio_lock();
175
176    if (pipe_semaphore == RTEMS_ID_NONE) {
177      sc = rtems_semaphore_create(
178        rtems_build_name('P', 'I', 'P', 'E'),
179        1,
180        RTEMS_BINARY_SEMAPHORE | RTEMS_INHERIT_PRIORITY | RTEMS_PRIORITY,
181        RTEMS_NO_PRIORITY,
182        &pipe_semaphore
183      );
184    }
185
186    rtems_libio_unlock();
187  }
188
189  if (sc == RTEMS_SUCCESSFUL) {
190    sc = rtems_semaphore_obtain(pipe_semaphore, RTEMS_WAIT, RTEMS_NO_TIMEOUT);
191  }
192
193  if (sc == RTEMS_SUCCESSFUL) {
194    return 0;
195  } else {
196    return -ENOMEM;
197  }
198}
199
200static void pipe_unlock(void)
201{
202#ifdef RTEMS_DEBUG
203  rtems_status_code sc = RTEMS_SUCCESSFUL;
204
205  sc =
206#endif
207   rtems_semaphore_release(pipe_semaphore);
208  #ifdef RTEMS_DEBUG
209    if (sc != RTEMS_SUCCESSFUL) {
210      rtems_fatal_error_occurred(0xdeadbeef);
211    }
212  #endif
213}
214
215/*
216 * If called with *pipep = NULL, pipe_new will call pipe_alloc to allocate a
217 * pipe control structure and set *pipep to its address.
218 * pipe is locked, when pipe_new returns with no error.
219 */
220static int pipe_new(
221  pipe_control_t **pipep
222)
223{
224  pipe_control_t *pipe;
225  int err = 0;
226
227  err = pipe_lock();
228  if (err)
229    return err;
230
231  pipe = *pipep;
232  if (pipe == NULL) {
233    err = pipe_alloc(&pipe);
234    if (err)
235      goto out;
236  }
237
238  if (! PIPE_LOCK(pipe))
239    err = -EINTR;
240
241  if (*pipep == NULL) {
242    if (err)
243      pipe_free(pipe);
244    else
245      *pipep = pipe;
246  }
247
248out:
249  pipe_unlock();
250  return err;
251}
252
253/*
254 * Interface to file system close.
255 *
256 * *pipep points to pipe control structure. When the last user releases pipe,
257 * it will be set to NULL.
258 */
259void pipe_release(
260  pipe_control_t **pipep,
261  rtems_libio_t *iop
262)
263{
264  pipe_control_t *pipe = *pipep;
265  uint32_t mode;
266
267  #if defined(RTEMS_DEBUG)
268    /* WARN pipe not freed and pipep not set to NULL! */
269    if (pipe_lock())
270      rtems_fatal_error_occurred(0xdeadbeef);
271
272    /* WARN pipe not released! */
273    if (!PIPE_LOCK(pipe))
274      rtems_fatal_error_occurred(0xdeadbeef);
275  #endif
276
277  mode = LIBIO_ACCMODE(iop);
278  if (mode & LIBIO_FLAGS_READ)
279     pipe->Readers --;
280  if (mode & LIBIO_FLAGS_WRITE)
281     pipe->Writers --;
282
283  PIPE_UNLOCK(pipe);
284
285  if (pipe->Readers == 0 && pipe->Writers == 0) {
286#if 0
287    /* To delete an anonymous pipe file when all users closed it */
288    if (pipe->Anonymous)
289      delfile = TRUE;
290#endif
291    pipe_free(pipe);
292    *pipep = NULL;
293  }
294  else if (pipe->Readers == 0 && mode != LIBIO_FLAGS_WRITE)
295    /* Notify waiting Writers that all their partners left */
296    PIPE_WAKEUPWRITERS(pipe);
297  else if (pipe->Writers == 0 && mode != LIBIO_FLAGS_READ)
298    PIPE_WAKEUPREADERS(pipe);
299
300  pipe_unlock();
301
302#if 0
303  if (! delfile)
304    return;
305  if (iop->pathinfo.ops->unlink_h == NULL)
306    return;
307
308  /* This is safe for IMFS, but how about other FSes? */
309  iop->flags &= ~LIBIO_FLAGS_OPEN;
310  if(iop->pathinfo.ops->unlink_h(&iop->pathinfo))
311    return;
312#endif
313
314}
315
316/*
317 * Interface to file system open.
318 *
319 * *pipep points to pipe control structure. If called with *pipep = NULL,
320 * fifo_open will try allocating and initializing a control structure. If the
321 * call succeeds, *pipep will be set to address of new control structure.
322 */
323int fifo_open(
324  pipe_control_t **pipep,
325  rtems_libio_t *iop
326)
327{
328  pipe_control_t *pipe;
329  unsigned int prevCounter;
330  int err;
331
332  err = pipe_new(pipep);
333  if (err)
334    return err;
335  pipe = *pipep;
336
337  switch (LIBIO_ACCMODE(iop)) {
338    case LIBIO_FLAGS_READ:
339      pipe->readerCounter ++;
340      if (pipe->Readers ++ == 0)
341        PIPE_WAKEUPWRITERS(pipe);
342
343      if (pipe->Writers == 0) {
344        /* Not an error */
345        if (LIBIO_NODELAY(iop))
346          break;
347
348        prevCounter = pipe->writerCounter;
349        err = -EINTR;
350        /* Wait until a writer opens the pipe */
351        do {
352          PIPE_UNLOCK(pipe);
353          if (! PIPE_READWAIT(pipe))
354            goto out_error;
355          if (! PIPE_LOCK(pipe))
356            goto out_error;
357        } while (prevCounter == pipe->writerCounter);
358      }
359      break;
360
361    case LIBIO_FLAGS_WRITE:
362      pipe->writerCounter ++;
363
364      if (pipe->Writers ++ == 0)
365        PIPE_WAKEUPREADERS(pipe);
366
367      if (pipe->Readers == 0 && LIBIO_NODELAY(iop)) {
368        PIPE_UNLOCK(pipe);
369        err = -ENXIO;
370        goto out_error;
371      }
372
373      if (pipe->Readers == 0) {
374        prevCounter = pipe->readerCounter;
375        err = -EINTR;
376        do {
377          PIPE_UNLOCK(pipe);
378          if (! PIPE_WRITEWAIT(pipe))
379            goto out_error;
380          if (! PIPE_LOCK(pipe))
381            goto out_error;
382        } while (prevCounter == pipe->readerCounter);
383      }
384      break;
385
386    case LIBIO_FLAGS_READ_WRITE:
387      pipe->readerCounter ++;
388      if (pipe->Readers ++ == 0)
389        PIPE_WAKEUPWRITERS(pipe);
390      pipe->writerCounter ++;
391      if (pipe->Writers ++ == 0)
392        PIPE_WAKEUPREADERS(pipe);
393      break;
394  }
395
396  PIPE_UNLOCK(pipe);
397  return 0;
398
399out_error:
400  pipe_release(pipep, iop);
401  return err;
402}
403
404/*
405 * Interface to file system read.
406 */
407ssize_t pipe_read(
408  pipe_control_t *pipe,
409  void           *buffer,
410  size_t          count,
411  rtems_libio_t  *iop
412)
413{
414  int chunk, chunk1, read = 0, ret = 0;
415
416  if (! PIPE_LOCK(pipe))
417    return -EINTR;
418
419  while (read < count) {
420    while (PIPE_EMPTY(pipe)) {
421      /* Not an error */
422      if (pipe->Writers == 0)
423        goto out_locked;
424
425      if (LIBIO_NODELAY(iop)) {
426        ret = -EAGAIN;
427        goto out_locked;
428      }
429
430      /* Wait until pipe is no more empty or no writer exists */
431      pipe->waitingReaders ++;
432      PIPE_UNLOCK(pipe);
433      if (! PIPE_READWAIT(pipe))
434        ret = -EINTR;
435      if (! PIPE_LOCK(pipe)) {
436        /* WARN waitingReaders not restored! */
437        ret = -EINTR;
438        goto out_nolock;
439      }
440      pipe->waitingReaders --;
441      if (ret != 0)
442        goto out_locked;
443    }
444
445    /* Read chunk bytes */
446    chunk = MIN(count - read,  pipe->Length);
447    chunk1 = pipe->Size - pipe->Start;
448    if (chunk > chunk1) {
449      memcpy(buffer + read, pipe->Buffer + pipe->Start, chunk1);
450      memcpy(buffer + read + chunk1, pipe->Buffer, chunk - chunk1);
451    }
452    else
453      memcpy(buffer + read, pipe->Buffer + pipe->Start, chunk);
454
455    pipe->Start += chunk;
456    pipe->Start %= pipe->Size;
457    pipe->Length -= chunk;
458    /* For buffering optimization */
459    if (PIPE_EMPTY(pipe))
460      pipe->Start = 0;
461
462    pipe_select_wakeup(&pipe->select_write_task_id);
463    if (pipe->waitingWriters > 0)
464      PIPE_WAKEUPWRITERS(pipe);
465    read += chunk;
466  }
467
468out_locked:
469  PIPE_UNLOCK(pipe);
470
471out_nolock:
472  if (read > 0)
473    return read;
474  return ret;
475}
476
477/*
478 * Interface to file system write.
479 */
480ssize_t pipe_write(
481  pipe_control_t *pipe,
482  const void     *buffer,
483  size_t          count,
484  rtems_libio_t  *iop
485)
486{
487  int chunk, chunk1, written = 0, ret = 0;
488
489  /* Write nothing */
490  if (count == 0)
491    return 0;
492
493  if (! PIPE_LOCK(pipe))
494    return -EINTR;
495
496  if (pipe->Readers == 0) {
497    ret = -EPIPE;
498    goto out_locked;
499  }
500
501  /* Write of PIPE_BUF bytes or less shall not be interleaved */
502  chunk = count <= pipe->Size ? count : 1;
503
504  while (written < count) {
505    while (PIPE_SPACE(pipe) < chunk) {
506      if (LIBIO_NODELAY(iop)) {
507        ret = -EAGAIN;
508        goto out_locked;
509      }
510
511      /* Wait until there is chunk bytes space or no reader exists */
512      pipe->waitingWriters ++;
513      PIPE_UNLOCK(pipe);
514      if (! PIPE_WRITEWAIT(pipe))
515        ret = -EINTR;
516      if (! PIPE_LOCK(pipe)) {
517        /* WARN waitingWriters not restored! */
518        ret = -EINTR;
519        goto out_nolock;
520      }
521      pipe->waitingWriters --;
522      if (ret != 0)
523        goto out_locked;
524
525      if (pipe->Readers == 0) {
526        ret = -EPIPE;
527        goto out_locked;
528      }
529    }
530
531    chunk = MIN(count - written, PIPE_SPACE(pipe));
532    chunk1 = pipe->Size - PIPE_WSTART(pipe);
533    if (chunk > chunk1) {
534      memcpy(pipe->Buffer + PIPE_WSTART(pipe), buffer + written, chunk1);
535      memcpy(pipe->Buffer, buffer + written + chunk1, chunk - chunk1);
536    }
537    else
538      memcpy(pipe->Buffer + PIPE_WSTART(pipe), buffer + written, chunk);
539
540    pipe->Length += chunk;
541    pipe_select_wakeup(&pipe->select_read_task_id);
542    if (pipe->waitingReaders > 0)
543      PIPE_WAKEUPREADERS(pipe);
544    written += chunk;
545    /* Write of more than PIPE_BUF bytes can be interleaved */
546    chunk = 1;
547  }
548
549out_locked:
550  PIPE_UNLOCK(pipe);
551
552out_nolock:
553#ifdef RTEMS_POSIX_API
554  /* Signal SIGPIPE */
555  if (ret == -EPIPE)
556    kill(getpid(), SIGPIPE);
557#endif
558
559  if (written > 0)
560    return written;
561  return ret;
562}
563
564static int pipe_register_select_wakeup(
565  rtems_id *id_ptr,
566  const rtems_ioctl_select_request *request
567)
568{
569  int rv = 0;
570  rtems_id current_id = *id_ptr;
571  rtems_id request_id = request->request_task_id;
572
573  if (current_id == 0 || current_id == request_id) {
574    *id_ptr = request_id;
575  } else {
576    rv = -EINVAL;
577  }
578
579  return rv;
580}
581
582static int pipe_select(
583  pipe_control_t *pipe,
584  const rtems_ioctl_select_request *request
585)
586{
587  int rv = 0;
588
589  switch (request->kind) {
590    case RTEMS_IOCTL_SELECT_READ:
591      if (!PIPE_EMPTY(pipe)) {
592        rv = 1;
593      } else {
594        rv = pipe_register_select_wakeup(&pipe->select_read_task_id, request);
595      }
596      break;
597    case RTEMS_IOCTL_SELECT_WRITE:
598      if (PIPE_SPACE(pipe) > 0) {
599        rv = 1;
600      } else {
601        rv = pipe_register_select_wakeup(&pipe->select_write_task_id, request);
602      }
603      break;
604    default:
605      break;
606  }
607
608  return rv;
609}
610
611/*
612 * Interface to file system ioctl.
613 */
614int pipe_ioctl(
615  pipe_control_t  *pipe,
616  ioctl_command_t  cmd,
617  void            *buffer,
618  rtems_libio_t   *iop
619)
620{
621  int rv = 0;
622
623  if (PIPE_LOCK(pipe)) {
624    switch (cmd) {
625      case RTEMS_IOCTL_SELECT:
626        rv = pipe_select(pipe, buffer);
627        break;
628      case FIONREAD:
629        /* Return length of pipe */
630        *(unsigned int *) buffer = pipe->Length;
631        break;
632      default:
633        rv = -EINVAL;
634        break;
635    }
636
637    PIPE_UNLOCK(pipe);
638  } else {
639    rv = -EINTR;
640  }
641
642  return rv;
643}
Note: See TracBrowser for help on using the repository browser.