source: rtems/cpukit/libfs/src/pipe/fifo.c @ 241f4c96

4.115
Last change on this file since 241f4c96 was 241f4c96, checked in by Sebastian Huber <sebastian.huber@…>, on 06/08/10 at 10:25:46

2010-06-08 Sebastian Huber <sebastian.huber@…>

  • libfs/src/imfs/fifoimfs_init.c: New file.
  • libfs/Makefile.am: Reflect change above.
  • libfs/src/imfs/imfs.h, libfs/src/imfs/imfs_eval.c, libfs/src/imfs/imfs_init.c, libfs/src/imfs/imfs_initsupp.c, libfs/src/imfs/miniimfs_init.c, libfs/src/pipe/fifo.c, libfs/src/pipe/pipe.c, libfs/src/pipe/pipe.h: Pipe support is now link-time optional.
  • sapi/include/confdefs.h: Reflect changes above.
  • Property mode set to 100644
File size: 12.5 KB
Line 
1/*
2 * fifo.c: POSIX FIFO/pipe for RTEMS
3 *
4 * Author: Wei Shen <cquark@gmail.com>
5 *
6 * The license and distribution terms for this file may be
7 * found in the file LICENSE in this distribution or at
8 * http://www.rtems.com/license/LICENSE.
9 *
10 * $Id$
11 */
12
13
14#if HAVE_CONFIG_H
15#include "config.h"
16#endif
17
18#ifdef RTEMS_POSIX_API
19#define __RTEMS_VIOLATE_KERNEL_VISIBILITY__
20#endif
21
22#include <errno.h>
23#include <stdlib.h>
24
25#include <rtems.h>
26#include <rtems/libio_.h>
27
28#include "pipe.h"
29
30
/* Smaller of two values.  Each argument is evaluated twice. */
#define MIN(a, b) (((a) < (b)) ? (a) : (b))

/* Read/write access bits taken from the flags of an open file. */
#define LIBIO_ACCMODE(_iop) ((_iop)->flags & LIBIO_FLAGS_READ_WRITE)
/* Non-zero when LIBIO_FLAGS_NO_DELAY is set in the flags of an open file. */
#define LIBIO_NODELAY(_iop) ((_iop)->flags & LIBIO_FLAGS_NO_DELAY)
35
36static rtems_id pipe_semaphore = RTEMS_ID_NONE;
37
38
/*
 * Ring buffer state queries.  _pipe is any expression yielding a pointer to
 * a pipe control structure; it is parenthesized so that arguments such as
 * &pipe or pipes + i expand correctly.
 */
#define PIPE_EMPTY(_pipe) ((_pipe)->Length == 0)
#define PIPE_FULL(_pipe)  ((_pipe)->Length == (_pipe)->Size)
#define PIPE_SPACE(_pipe) ((_pipe)->Size - (_pipe)->Length)
/* Buffer index at which the next written byte is stored. */
#define PIPE_WSTART(_pipe) \
  (((_pipe)->Start + (_pipe)->Length) % (_pipe)->Size)

/* Obtain the per-pipe semaphore; evaluates to non-zero on success. */
#define PIPE_LOCK(_pipe)  \
  ( rtems_semaphore_obtain((_pipe)->Semaphore, RTEMS_WAIT, RTEMS_NO_TIMEOUT)  \
   == RTEMS_SUCCESSFUL )

/* Release the per-pipe semaphore. */
#define PIPE_UNLOCK(_pipe)  rtems_semaphore_release((_pipe)->Semaphore)

/* Block on the read barrier; evaluates to non-zero on success. */
#define PIPE_READWAIT(_pipe)  \
  ( rtems_barrier_wait((_pipe)->readBarrier, RTEMS_NO_TIMEOUT)  \
   == RTEMS_SUCCESSFUL)

/* Block on the write barrier; evaluates to non-zero on success. */
#define PIPE_WRITEWAIT(_pipe)  \
  ( rtems_barrier_wait((_pipe)->writeBarrier, RTEMS_NO_TIMEOUT)  \
   == RTEMS_SUCCESSFUL)

/*
 * Release every task blocked on the read or write barrier.  The number of
 * released tasks is stored in a scratch variable and discarded; the variable
 * has a long name so it cannot shadow an identifier used in the argument.
 */
#define PIPE_WAKEUPREADERS(_pipe) \
  do { \
    uint32_t pipe_released_count_; \
    rtems_barrier_release((_pipe)->readBarrier, &pipe_released_count_); \
  } while (0)

#define PIPE_WAKEUPWRITERS(_pipe) \
  do { \
    uint32_t pipe_released_count_; \
    rtems_barrier_release((_pipe)->writeBarrier, &pipe_released_count_); \
  } while (0)
63
64
#ifdef RTEMS_POSIX_API
#define __RTEMS_VIOLATE_KERNEL_VISIBILITY__

#include <rtems/rtems/barrier.h>
#include <rtems/score/thread.h>

/*
 * Set STATES_INTERRUPTIBLE_BY_SIGNAL in the wait queue state of one barrier.
 * NOTE(review): _Thread_Enable_dispatch() presumably balances dispatch
 * disabling done inside _Barrier_Get() -- confirm against the score API.
 */
static void pipe_barrier_interruptible(rtems_id barrier)
{
  Objects_Locations where;

  _Barrier_Get(barrier, &where)->Barrier.Wait_queue.state
    |= STATES_INTERRUPTIBLE_BY_SIGNAL;
  _Thread_Enable_dispatch();
}

/* Make both barriers of a pipe interruptible by signals. */
static void pipe_interruptible(pipe_control_t *pipe)
{
  pipe_barrier_interruptible(pipe->readBarrier);
  pipe_barrier_interruptible(pipe->writeBarrier);
}
#endif
84
85/*
86 * Alloc pipe control structure, buffer, and resources.
87 * Called with pipe_semaphore held.
88 */
89static int pipe_alloc(
90  pipe_control_t **pipep
91)
92{
93  static char c = 'a';
94  pipe_control_t *pipe;
95  int err = -ENOMEM;
96
97  pipe = malloc(sizeof(pipe_control_t));
98  if (pipe == NULL)
99    return err;
100  memset(pipe, 0, sizeof(pipe_control_t));
101
102  pipe->Size = PIPE_BUF;
103  pipe->Buffer = malloc(pipe->Size);
104  if (! pipe->Buffer)
105    goto err_buf;
106
107  err = -EINTR;
108  if (rtems_barrier_create(
109        rtems_build_name ('P', 'I', 'r', c),
110        RTEMS_BARRIER_MANUAL_RELEASE, 0,
111        &pipe->readBarrier) != RTEMS_SUCCESSFUL)
112    goto err_rbar;
113  if (rtems_barrier_create(
114        rtems_build_name ('P', 'I', 'w', c),
115        RTEMS_BARRIER_MANUAL_RELEASE, 0,
116        &pipe->writeBarrier) != RTEMS_SUCCESSFUL)
117    goto err_wbar;
118  if (rtems_semaphore_create(
119        rtems_build_name ('P', 'I', 's', c), 1,
120        RTEMS_BINARY_SEMAPHORE | RTEMS_FIFO,
121        RTEMS_NO_PRIORITY, &pipe->Semaphore) != RTEMS_SUCCESSFUL)
122    goto err_sem;
123
124#ifdef RTEMS_POSIX_API
125  pipe_interruptible(pipe);
126#endif
127
128  *pipep = pipe;
129  if (c ++ == 'z')
130    c = 'a';
131  return 0;
132
133err_sem:
134  rtems_barrier_delete(pipe->writeBarrier);
135err_wbar:
136  rtems_barrier_delete(pipe->readBarrier);
137err_rbar:
138  free(pipe->Buffer);
139err_buf:
140  free(pipe);
141  return err;
142}
143
144/* Called with pipe_semaphore held. */
145static inline void pipe_free(
146  pipe_control_t *pipe
147)
148{
149  rtems_barrier_delete(pipe->readBarrier);
150  rtems_barrier_delete(pipe->writeBarrier);
151  rtems_semaphore_delete(pipe->Semaphore);
152  free(pipe->Buffer);
153  free(pipe);
154}
155
156static rtems_status_code pipe_lock(void)
157{
158  rtems_status_code sc = RTEMS_SUCCESSFUL;
159
160  if (pipe_semaphore == RTEMS_ID_NONE) {
161    rtems_libio_lock();
162
163    if (pipe_semaphore == RTEMS_ID_NONE) {
164      sc = rtems_semaphore_create(
165        rtems_build_name('P', 'I', 'P', 'E'),
166        1,
167        RTEMS_BINARY_SEMAPHORE | RTEMS_INHERIT_PRIORITY | RTEMS_PRIORITY,
168        RTEMS_NO_PRIORITY,
169        &pipe_semaphore
170      );
171    }
172
173    rtems_libio_unlock();
174  }
175
176  if (sc == RTEMS_SUCCESSFUL) {
177    sc = rtems_semaphore_obtain(pipe_semaphore, RTEMS_WAIT, RTEMS_NO_TIMEOUT);
178  }
179
180  if (sc == RTEMS_SUCCESSFUL) {
181    return 0;
182  } else {
183    return -EINTR;
184  }
185}
186
187static void pipe_unlock(void)
188{
189  rtems_status_code sc = RTEMS_SUCCESSFUL;
190
191  sc = rtems_semaphore_release(pipe_semaphore);
192  if (sc != RTEMS_SUCCESSFUL) {
193    /* FIXME */
194    rtems_fatal_error_occurred(0xdeadbeef);
195  }
196}
197
198/*
199 * If called with *pipep = NULL, pipe_new will call pipe_alloc to allocate a
200 * pipe control structure and set *pipep to its address.
201 * pipe is locked, when pipe_new returns with no error.
202 */
203static int pipe_new(
204  pipe_control_t **pipep
205)
206{
207  pipe_control_t *pipe;
208  int err = 0;
209
210  err = pipe_lock();
211  if (err)
212    return err;
213
214  pipe = *pipep;
215  if (pipe == NULL) {
216    err = pipe_alloc(&pipe);
217    if (err)
218      goto out;
219  }
220
221  if (! PIPE_LOCK(pipe))
222    err = -EINTR;
223
224  if (*pipep == NULL) {
225    if (err)
226      pipe_free(pipe);
227    else
228      *pipep = pipe;
229  }
230
231out:
232  pipe_unlock();
233  return err;
234}
235
236/*
237 * Interface to file system close.
238 *
239 * *pipep points to pipe control structure. When the last user releases pipe,
240 * it will be set to NULL.
241 */
242int pipe_release(
243  pipe_control_t **pipep,
244  rtems_libio_t *iop
245)
246{
247  pipe_control_t *pipe = *pipep;
248  uint32_t mode;
249
250  rtems_status_code sc;
251
252  if (pipe_lock())
253    /* WARN pipe not freed and pipep not set to NULL! */
254    /* FIXME */
255    rtems_fatal_error_occurred(0xdeadbeef);
256
257  if (!PIPE_LOCK(pipe))
258    /* WARN pipe not released! */
259    /* FIXME */
260    rtems_fatal_error_occurred(0xdeadbeef);
261
262  mode = LIBIO_ACCMODE(iop);
263  if (mode & LIBIO_FLAGS_READ)
264     pipe->Readers --;
265  if (mode & LIBIO_FLAGS_WRITE)
266     pipe->Writers --;
267
268  PIPE_UNLOCK(pipe);
269
270  if (pipe->Readers == 0 && pipe->Writers == 0) {
271#if 0
272    /* To delete an anonymous pipe file when all users closed it */
273    if (pipe->Anonymous)
274      delfile = TRUE;
275#endif
276    pipe_free(pipe);
277    *pipep = NULL;
278  }
279  else if (pipe->Readers == 0 && mode != LIBIO_FLAGS_WRITE)
280    /* Notify waiting Writers that all their partners left */
281    PIPE_WAKEUPWRITERS(pipe);
282  else if (pipe->Writers == 0 && mode != LIBIO_FLAGS_READ)
283    PIPE_WAKEUPREADERS(pipe);
284
285  pipe_unlock();
286
287#if 0
288  if (! delfile)
289    return 0;
290  if (iop->pathinfo.ops->unlink_h == NULL)
291    return 0;
292
293  /* This is safe for IMFS, but how about other FSes? */
294  iop->flags &= ~LIBIO_FLAGS_OPEN;
295  if(iop->pathinfo.ops->unlink_h(&iop->pathinfo))
296    return -errno;
297#endif
298
299  return 0;
300}
301
302/*
303 * Interface to file system open.
304 *
305 * *pipep points to pipe control structure. If called with *pipep = NULL,
306 * fifo_open will try allocating and initializing a control structure. If the
307 * call succeeds, *pipep will be set to address of new control structure.
308 */
309int fifo_open(
310  pipe_control_t **pipep,
311  rtems_libio_t *iop
312)
313{
314  pipe_control_t *pipe;
315  uint prevCounter;
316  int err;
317
318  err = pipe_new(pipep);
319  if (err)
320    return err;
321  pipe = *pipep;
322
323  switch (LIBIO_ACCMODE(iop)) {
324    case LIBIO_FLAGS_READ:
325      pipe->readerCounter ++;
326      if (pipe->Readers ++ == 0)
327        PIPE_WAKEUPWRITERS(pipe);
328
329      if (pipe->Writers == 0) {
330        /* Not an error */
331        if (LIBIO_NODELAY(iop))
332          break;
333
334        prevCounter = pipe->writerCounter;
335        err = -EINTR;
336        /* Wait until a writer opens the pipe */
337        do {
338          PIPE_UNLOCK(pipe);
339          if (! PIPE_READWAIT(pipe))
340            goto out_error;
341          if (! PIPE_LOCK(pipe))
342            goto out_error;
343        } while (prevCounter == pipe->writerCounter);
344      }
345      break;
346
347    case LIBIO_FLAGS_WRITE:
348      if (pipe->Readers == 0 && LIBIO_NODELAY(iop)) {
349        err = -ENXIO;
350        goto out_error;
351      }
352
353      pipe->writerCounter ++;
354      if (pipe->Writers ++ == 0)
355        PIPE_WAKEUPREADERS(pipe);
356
357      if (pipe->Readers == 0) {
358        prevCounter = pipe->readerCounter;
359        err = -EINTR;
360        do {
361          PIPE_UNLOCK(pipe);
362          if (! PIPE_WRITEWAIT(pipe))
363            goto out_error;
364          if (! PIPE_LOCK(pipe))
365            goto out_error;
366        } while (prevCounter == pipe->readerCounter);
367      }
368      break;
369
370    case LIBIO_FLAGS_READ_WRITE:
371      pipe->readerCounter ++;
372      if (pipe->Readers ++ == 0)
373        PIPE_WAKEUPWRITERS(pipe);
374      pipe->writerCounter ++;
375      if (pipe->Writers ++ == 0)
376        PIPE_WAKEUPREADERS(pipe);
377      break;
378  }
379
380  PIPE_UNLOCK(pipe);
381  return 0;
382
383out_error:
384  pipe_release(pipep, iop);
385  return err;
386}
387
388/*
389 * Interface to file system read.
390 */
391ssize_t pipe_read(
392  pipe_control_t *pipe,
393  void           *buffer,
394  size_t          count,
395  rtems_libio_t  *iop
396)
397{
398  int chunk, chunk1, read = 0, ret = 0;
399
400  if (! PIPE_LOCK(pipe))
401    return -EINTR;
402
403  while (read < count) {
404    while (PIPE_EMPTY(pipe)) {
405      /* Not an error */
406      if (pipe->Writers == 0)
407        goto out_locked;
408
409      if (LIBIO_NODELAY(iop)) {
410        ret = -EAGAIN;
411        goto out_locked;
412      }
413
414      /* Wait until pipe is no more empty or no writer exists */
415      pipe->waitingReaders ++;
416      PIPE_UNLOCK(pipe);
417      if (! PIPE_READWAIT(pipe))
418        ret = -EINTR;
419      if (! PIPE_LOCK(pipe)) {
420        /* WARN waitingReaders not restored! */
421        ret = -EINTR;
422        goto out_nolock;
423      }
424      pipe->waitingReaders --;
425      if (ret != 0)
426        goto out_locked;
427    }
428
429    /* Read chunk bytes */
430    chunk = MIN(count - read,  pipe->Length);
431    chunk1 = pipe->Size - pipe->Start;
432    if (chunk > chunk1) {
433      memcpy(buffer + read, pipe->Buffer + pipe->Start, chunk1);
434      memcpy(buffer + read + chunk1, pipe->Buffer, chunk - chunk1);
435    }
436    else
437      memcpy(buffer + read, pipe->Buffer + pipe->Start, chunk);
438
439    pipe->Start += chunk;
440    pipe->Start %= pipe->Size;
441    pipe->Length -= chunk;
442    /* For buffering optimization */
443    if (PIPE_EMPTY(pipe))
444      pipe->Start = 0;
445
446    if (pipe->waitingWriters > 0)
447      PIPE_WAKEUPWRITERS(pipe);
448    read += chunk;
449  }
450
451out_locked:
452  PIPE_UNLOCK(pipe);
453
454out_nolock:
455  if (read > 0)
456    return read;
457  return ret;
458}
459
460/*
461 * Interface to file system write.
462 */
463ssize_t pipe_write(
464  pipe_control_t *pipe,
465  const void     *buffer,
466  size_t          count,
467  rtems_libio_t  *iop
468)
469{
470  int chunk, chunk1, written = 0, ret = 0;
471
472  /* Write nothing */
473  if (count == 0)
474    return 0;
475
476  if (! PIPE_LOCK(pipe))
477    return -EINTR;
478
479  if (pipe->Readers == 0) {
480    ret = -EPIPE;
481    goto out_locked;
482  }
483
484  /* Write of PIPE_BUF bytes or less shall not be interleaved */
485  chunk = count <= pipe->Size ? count : 1;
486
487  while (written < count) {
488    while (PIPE_SPACE(pipe) < chunk) {
489      if (LIBIO_NODELAY(iop)) {
490        ret = -EAGAIN;
491        goto out_locked;
492      }
493
494      /* Wait until there is chunk bytes space or no reader exists */
495      pipe->waitingWriters ++;
496      PIPE_UNLOCK(pipe);
497      if (! PIPE_WRITEWAIT(pipe))
498        ret = -EINTR;
499      if (! PIPE_LOCK(pipe)) {
500        /* WARN waitingWriters not restored! */
501        ret = -EINTR;
502        goto out_nolock;
503      }
504      pipe->waitingWriters --;
505      if (ret != 0)
506        goto out_locked;
507
508      if (pipe->Readers == 0) {
509        ret = -EPIPE;
510        goto out_locked;
511      }
512    }
513
514    chunk = MIN(count - written, PIPE_SPACE(pipe));
515    chunk1 = pipe->Size - PIPE_WSTART(pipe);
516    if (chunk > chunk1) {
517      memcpy(pipe->Buffer + PIPE_WSTART(pipe), buffer + written, chunk1);
518      memcpy(pipe->Buffer, buffer + written + chunk1, chunk - chunk1);
519    }
520    else
521      memcpy(pipe->Buffer + PIPE_WSTART(pipe), buffer + written, chunk);
522
523    pipe->Length += chunk;
524    if (pipe->waitingReaders > 0)
525      PIPE_WAKEUPREADERS(pipe);
526    written += chunk;
527    /* Write of more than PIPE_BUF bytes can be interleaved */
528    chunk = 1;
529  }
530
531out_locked:
532  PIPE_UNLOCK(pipe);
533
534out_nolock:
535#ifdef RTEMS_POSIX_API
536  /* Signal SIGPIPE */
537  if (ret == -EPIPE)
538    kill(getpid(), SIGPIPE);
539#endif
540
541  if (written > 0)
542    return written;
543  return ret;
544}
545
546/*
547 * Interface to file system ioctl.
548 */
549int pipe_ioctl(
550  pipe_control_t *pipe,
551  uint32_t        cmd,
552  void           *buffer,
553  rtems_libio_t  *iop
554)
555{
556  if (cmd == FIONREAD) {
557    if (buffer == NULL)
558      return -EFAULT;
559
560    if (! PIPE_LOCK(pipe))
561      return -EINTR;
562
563    /* Return length of pipe */
564    *(uint *)buffer = pipe->Length;
565    PIPE_UNLOCK(pipe);
566    return 0;
567  }
568
569  return -EINVAL;
570}
571
572/*
573 * Interface to file system lseek.
574 */
575int pipe_lseek(
576  pipe_control_t *pipe,
577  off_t           offset,
578  int             whence,
579  rtems_libio_t  *iop
580)
581{
582  /* Seek on pipe is not supported */
583  return -ESPIPE;
584}
Note: See TracBrowser for help on using the repository browser.