source: rtems/cpukit/libfs/src/pipe/fifo.c @ 9efa012

4.104.115
Last change on this file since 9efa012 was 9efa012, checked in by Joel Sherrill <joel.sherrill@…>, on 12/03/08 at 17:38:21

2008-12-03 Joel Sherrill <joel.sherrill@…>

  • libfs/src/pipe/fifo.c: Actually disable initializing pipes as requested in confdefs.h.
  • Property mode set to 100644
File size: 12.4 KB
Line 
1/*
2 * fifo.c: POSIX FIFO/pipe for RTEMS
3 *
4 * Author: Wei Shen <cquark@gmail.com>
5 *
6 * The license and distribution terms for this file may be
7 * found in the file LICENSE in this distribution or at
8 * http://www.rtems.com/license/LICENSE.
9 *
10 * $Id$
11 */
12
13#include <errno.h>
14#include <stdlib.h>
15#include "pipe.h"
16
17
18#define MIN(a, b) ((a) < (b)? (a): (b))
19
20#define LIBIO_ACCMODE(_iop) ((_iop)->flags & LIBIO_FLAGS_READ_WRITE)
21#define LIBIO_NODELAY(_iop) ((_iop)->flags & LIBIO_FLAGS_NO_DELAY)
22
23extern uint16_t rtems_pipe_no;
24static rtems_id rtems_pipe_semaphore = 0;
25extern bool rtems_pipe_configured;
26
27
28#define PIPE_EMPTY(_pipe) (_pipe->Length == 0)
29#define PIPE_FULL(_pipe)  (_pipe->Length == _pipe->Size)
30#define PIPE_SPACE(_pipe) (_pipe->Size - _pipe->Length)
31#define PIPE_WSTART(_pipe) ((_pipe->Start + _pipe->Length) % _pipe->Size)
32
33#define PIPE_LOCK(_pipe)  \
34  ( rtems_semaphore_obtain(_pipe->Semaphore, RTEMS_WAIT, RTEMS_NO_TIMEOUT)  \
35   == RTEMS_SUCCESSFUL )
36
37#define PIPE_UNLOCK(_pipe)  rtems_semaphore_release(_pipe->Semaphore)
38
39#define PIPE_READWAIT(_pipe)  \
40  ( rtems_barrier_wait(_pipe->readBarrier, RTEMS_NO_TIMEOUT)  \
41   == RTEMS_SUCCESSFUL)
42
43#define PIPE_WRITEWAIT(_pipe)  \
44  ( rtems_barrier_wait(_pipe->writeBarrier, RTEMS_NO_TIMEOUT)  \
45   == RTEMS_SUCCESSFUL)
46
47#define PIPE_WAKEUPREADERS(_pipe) \
48  do {uint32_t n; rtems_barrier_release(_pipe->readBarrier, &n); } while(0)
49
50#define PIPE_WAKEUPWRITERS(_pipe) \
51  do {uint32_t n; rtems_barrier_release(_pipe->writeBarrier, &n); } while(0)
52
53
54#ifdef RTEMS_POSIX_API
55
56#include <rtems/rtems/barrier.inl>
57#include <rtems/score/thread.inl>
58
59/* Set barriers to be interruptible by signals. */
60static void pipe_interruptible(pipe_control_t *pipe)
61{
62  Objects_Locations location;
63
64  _Barrier_Get(pipe->readBarrier, &location)->Barrier.Wait_queue.state
65    |= STATES_INTERRUPTIBLE_BY_SIGNAL;
66  _Thread_Enable_dispatch();
67  _Barrier_Get(pipe->writeBarrier, &location)->Barrier.Wait_queue.state
68    |= STATES_INTERRUPTIBLE_BY_SIGNAL;
69  _Thread_Enable_dispatch();
70}
71#endif
72
73/*
74 * Alloc pipe control structure, buffer, and resources.
75 * Called with rtems_pipe_semaphore held.
76 */
77static int pipe_alloc(
78  pipe_control_t **pipep
79)
80{
81  static char c = 'a';
82  pipe_control_t *pipe;
83  int err = -ENOMEM;
84
85  pipe = malloc(sizeof(pipe_control_t));
86  if (pipe == NULL)
87    return err;
88  memset(pipe, 0, sizeof(pipe_control_t));
89
90  pipe->Size = PIPE_BUF;
91  pipe->Buffer = malloc(pipe->Size);
92  if (! pipe->Buffer)
93    goto err_buf;
94
95  err = -EINTR;
96  if (rtems_barrier_create(
97        rtems_build_name ('P', 'I', 'r', c),
98        RTEMS_BARRIER_MANUAL_RELEASE, 0,
99        &pipe->readBarrier) != RTEMS_SUCCESSFUL)
100    goto err_rbar;
101  if (rtems_barrier_create(
102        rtems_build_name ('P', 'I', 'w', c),
103        RTEMS_BARRIER_MANUAL_RELEASE, 0,
104        &pipe->writeBarrier) != RTEMS_SUCCESSFUL)
105    goto err_wbar;
106  if (rtems_semaphore_create(
107        rtems_build_name ('P', 'I', 's', c), 1,
108        RTEMS_BINARY_SEMAPHORE | RTEMS_FIFO,
109        RTEMS_NO_PRIORITY, &pipe->Semaphore) != RTEMS_SUCCESSFUL)
110    goto err_sem;
111
112#ifdef RTEMS_POSIX_API
113  pipe_interruptible(pipe);
114#endif
115
116  *pipep = pipe;
117  if (c ++ == 'z')
118    c = 'a';
119  return 0;
120
121err_sem:
122  rtems_barrier_delete(pipe->writeBarrier);
123err_wbar:
124  rtems_barrier_delete(pipe->readBarrier);
125err_rbar:
126  free(pipe->Buffer);
127err_buf:
128  free(pipe);
129  return err;
130}
131
132/* Called with rtems_pipe_semaphore held. */
133static inline void pipe_free(
134  pipe_control_t *pipe
135)
136{
137  rtems_barrier_delete(pipe->readBarrier);
138  rtems_barrier_delete(pipe->writeBarrier);
139  rtems_semaphore_delete(pipe->Semaphore);
140  free(pipe->Buffer);
141  free(pipe);
142}
143
144/*
145 * If called with *pipep = NULL, pipe_new will call pipe_alloc to allocate a
146 * pipe control structure and set *pipep to its address.
147 * pipe is locked, when pipe_new returns with no error.
148 */
149static int pipe_new(
150  pipe_control_t **pipep
151)
152{
153  pipe_control_t *pipe;
154  int err = 0;
155
156  if (rtems_semaphore_obtain(rtems_pipe_semaphore,
157        RTEMS_WAIT, RTEMS_NO_TIMEOUT) != RTEMS_SUCCESSFUL)
158    return -EINTR;
159
160  pipe = *pipep;
161  if (pipe == NULL) {
162    err = pipe_alloc(&pipe);
163    if (err)
164      goto out;
165  }
166
167  if (! PIPE_LOCK(pipe))
168    err = -EINTR;
169
170  if (*pipep == NULL) {
171    if (err)
172      pipe_free(pipe);
173    else
174      *pipep = pipe;
175  }
176
177out:
178  rtems_semaphore_release(rtems_pipe_semaphore);
179  return err;
180}
181
182/*
183 * Interface to file system close.
184 *
185 * *pipep points to pipe control structure. When the last user releases pipe,
186 * it will be set to NULL.
187 */
188int pipe_release(
189  pipe_control_t **pipep,
190  rtems_libio_t *iop
191)
192{
193  pipe_control_t *pipe = *pipep;
194  uint32_t mode;
195
196  rtems_status_code sc;
197  sc = rtems_semaphore_obtain(pipe->Semaphore,
198                              RTEMS_WAIT, RTEMS_NO_TIMEOUT);
199  /* WARN pipe not released! */
200  if(sc != RTEMS_SUCCESSFUL)
201    rtems_fatal_error_occurred(sc);
202
203  mode = LIBIO_ACCMODE(iop);
204  if (mode & LIBIO_FLAGS_READ)
205     pipe->Readers --;
206  if (mode & LIBIO_FLAGS_WRITE)
207     pipe->Writers --;
208
209  sc = rtems_semaphore_obtain(rtems_pipe_semaphore,
210                              RTEMS_WAIT, RTEMS_NO_TIMEOUT);
211  /* WARN pipe not freed and pipep not set to NULL! */
212  if(sc != RTEMS_SUCCESSFUL)
213    rtems_fatal_error_occurred(sc);
214
215  PIPE_UNLOCK(pipe);
216
217  if (pipe->Readers == 0 && pipe->Writers == 0) {
218#if 0
219    /* To delete an anonymous pipe file when all users closed it */
220    if (pipe->Anonymous)
221      delfile = TRUE;
222#endif
223    pipe_free(pipe);
224    *pipep = NULL;
225  }
226  else if (pipe->Readers == 0 && mode != LIBIO_FLAGS_WRITE)
227    /* Notify waiting Writers that all their partners left */
228    PIPE_WAKEUPWRITERS(pipe);
229  else if (pipe->Writers == 0 && mode != LIBIO_FLAGS_READ)
230    PIPE_WAKEUPREADERS(pipe);
231
232  rtems_semaphore_release(rtems_pipe_semaphore);
233
234#if 0
235  if (! delfile)
236    return 0;
237  if (iop->pathinfo.ops->unlink_h == NULL)
238    return 0;
239
240  /* This is safe for IMFS, but how about other FSes? */
241  iop->flags &= ~LIBIO_FLAGS_OPEN;
242  if(iop->pathinfo.ops->unlink_h(&iop->pathinfo))
243    return -errno;
244#endif
245
246  return 0;
247}
248
249/*
250 * Interface to file system open.
251 *
252 * *pipep points to pipe control structure. If called with *pipep = NULL,
253 * fifo_open will try allocating and initializing a control structure. If the
254 * call succeeds, *pipep will be set to address of new control structure.
255 */
256int fifo_open(
257  pipe_control_t **pipep,
258  rtems_libio_t *iop
259)
260{
261  pipe_control_t *pipe;
262  uint prevCounter;
263  int err;
264
265  err = pipe_new(pipep);
266  if (err)
267    return err;
268  pipe = *pipep;
269
270  switch (LIBIO_ACCMODE(iop)) {
271    case LIBIO_FLAGS_READ:
272      pipe->readerCounter ++;
273      if (pipe->Readers ++ == 0)
274        PIPE_WAKEUPWRITERS(pipe);
275
276      if (pipe->Writers == 0) {
277        /* Not an error */
278        if (LIBIO_NODELAY(iop))
279          break;
280
281        prevCounter = pipe->writerCounter;
282        err = -EINTR;
283        /* Wait until a writer opens the pipe */
284        do {
285          PIPE_UNLOCK(pipe);
286          if (! PIPE_READWAIT(pipe))
287            goto out_error;
288          if (! PIPE_LOCK(pipe))
289            goto out_error;
290        } while (prevCounter == pipe->writerCounter);
291      }
292      break;
293
294    case LIBIO_FLAGS_WRITE:
295      if (pipe->Readers == 0 && LIBIO_NODELAY(iop)) {
296        err = -ENXIO;
297        goto out_error;
298      }
299
300      pipe->writerCounter ++;
301      if (pipe->Writers ++ == 0)
302        PIPE_WAKEUPREADERS(pipe);
303
304      if (pipe->Readers == 0) {
305        prevCounter = pipe->readerCounter;
306        err = -EINTR;
307        do {
308          PIPE_UNLOCK(pipe);
309          if (! PIPE_WRITEWAIT(pipe))
310            goto out_error;
311          if (! PIPE_LOCK(pipe))
312            goto out_error;
313        } while (prevCounter == pipe->readerCounter);
314      }
315      break;
316
317    case LIBIO_FLAGS_READ_WRITE:
318      pipe->readerCounter ++;
319      if (pipe->Readers ++ == 0)
320        PIPE_WAKEUPWRITERS(pipe);
321      pipe->writerCounter ++;
322      if (pipe->Writers ++ == 0)
323        PIPE_WAKEUPREADERS(pipe);
324      break;
325  }
326
327  PIPE_UNLOCK(pipe);
328  return 0;
329
330out_error:
331  pipe_release(pipep, iop);
332  return err;
333}
334
335/*
336 * Interface to file system read.
337 */
338ssize_t pipe_read(
339  pipe_control_t *pipe,
340  void           *buffer,
341  size_t          count,
342  rtems_libio_t  *iop
343)
344{
345  int chunk, chunk1, read = 0, ret = 0;
346
347  if (! PIPE_LOCK(pipe))
348    return -EINTR;
349
350  while (read < count) {
351    while (PIPE_EMPTY(pipe)) {
352      /* Not an error */
353      if (pipe->Writers == 0)
354        goto out_locked;
355
356      if (LIBIO_NODELAY(iop)) {
357        ret = -EAGAIN;
358        goto out_locked;
359      }
360
361      /* Wait until pipe is no more empty or no writer exists */
362      pipe->waitingReaders ++;
363      PIPE_UNLOCK(pipe);
364      if (! PIPE_READWAIT(pipe))
365        ret = -EINTR;
366      if (! PIPE_LOCK(pipe)) {
367        /* WARN waitingReaders not restored! */
368        ret = -EINTR;
369        goto out_nolock;
370      }
371      pipe->waitingReaders --;
372      if (ret != 0)
373        goto out_locked;
374    }
375
376    /* Read chunk bytes */
377    chunk = MIN(count - read,  pipe->Length);
378    chunk1 = pipe->Size - pipe->Start;
379    if (chunk > chunk1) {
380      memcpy(buffer + read, pipe->Buffer + pipe->Start, chunk1);
381      memcpy(buffer + read + chunk1, pipe->Buffer, chunk - chunk1);
382    }
383    else
384      memcpy(buffer + read, pipe->Buffer + pipe->Start, chunk);
385
386    pipe->Start += chunk;
387    pipe->Start %= pipe->Size;
388    pipe->Length -= chunk;
389    /* For buffering optimization */
390    if (PIPE_EMPTY(pipe))
391      pipe->Start = 0;
392
393    if (pipe->waitingWriters > 0)
394      PIPE_WAKEUPWRITERS(pipe);
395    read += chunk;
396  }
397
398out_locked:
399  PIPE_UNLOCK(pipe);
400
401out_nolock:
402  if (read > 0)
403    return read;
404  return ret;
405}
406
407/*
408 * Interface to file system write.
409 */
410ssize_t pipe_write(
411  pipe_control_t *pipe,
412  const void     *buffer,
413  size_t          count,
414  rtems_libio_t  *iop
415)
416{
417  int chunk, chunk1, written = 0, ret = 0;
418
419  /* Write nothing */
420  if (count == 0)
421    return 0;
422
423  if (! PIPE_LOCK(pipe))
424    return -EINTR;
425
426  if (pipe->Readers == 0) {
427    ret = -EPIPE;
428    goto out_locked;
429  }
430
431  /* Write of PIPE_BUF bytes or less shall not be interleaved */
432  chunk = count <= pipe->Size ? count : 1;
433
434  while (written < count) {
435    while (PIPE_SPACE(pipe) < chunk) {
436      if (LIBIO_NODELAY(iop)) {
437        ret = -EAGAIN;
438        goto out_locked;
439      }
440
441      /* Wait until there is chunk bytes space or no reader exists */
442      pipe->waitingWriters ++;
443      PIPE_UNLOCK(pipe);
444      if (! PIPE_WRITEWAIT(pipe))
445        ret = -EINTR;
446      if (! PIPE_LOCK(pipe)) {
447        /* WARN waitingWriters not restored! */
448        ret = -EINTR;
449        goto out_nolock;
450      }
451      pipe->waitingWriters --;
452      if (ret != 0)
453        goto out_locked;
454
455      if (pipe->Readers == 0) {
456        ret = -EPIPE;
457        goto out_locked;
458      }
459    }
460
461    chunk = MIN(count - written, PIPE_SPACE(pipe));
462    chunk1 = pipe->Size - PIPE_WSTART(pipe);
463    if (chunk > chunk1) {
464      memcpy(pipe->Buffer + PIPE_WSTART(pipe), buffer + written, chunk1);
465      memcpy(pipe->Buffer, buffer + written + chunk1, chunk - chunk1);
466    }
467    else
468      memcpy(pipe->Buffer + PIPE_WSTART(pipe), buffer + written, chunk);
469
470    pipe->Length += chunk;
471    if (pipe->waitingReaders > 0)
472      PIPE_WAKEUPREADERS(pipe);
473    written += chunk;
474    /* Write of more than PIPE_BUF bytes can be interleaved */
475    chunk = 1;
476  }
477
478out_locked:
479  PIPE_UNLOCK(pipe);
480
481out_nolock:
482#ifdef RTEMS_POSIX_API
483  /* Signal SIGPIPE */
484  if (ret == -EPIPE)
485    kill(getpid(), SIGPIPE);
486#endif
487
488  if (written > 0)
489    return written;
490  return ret;
491}
492
493/*
494 * Interface to file system ioctl.
495 */
496int pipe_ioctl(
497  pipe_control_t *pipe,
498  uint32_t        cmd,
499  void           *buffer,
500  rtems_libio_t  *iop
501)
502{
503  if (cmd == FIONREAD) {
504    if (buffer == NULL)
505      return -EFAULT;
506
507    if (! PIPE_LOCK(pipe))
508      return -EINTR;
509
510    /* Return length of pipe */
511    *(uint *)buffer = pipe->Length;
512    PIPE_UNLOCK(pipe);
513    return 0;
514  }
515
516  return -EINVAL;
517}
518
519/*
520 * Interface to file system lseek.
521 */
522int pipe_lseek(
523  pipe_control_t *pipe,
524  off_t           offset,
525  int             whence,
526  rtems_libio_t  *iop
527)
528{
529  /* Seek on pipe is not supported */
530  return -ESPIPE;
531}
532
533/*
534 * Initialization of FIFO/pipe module.
535 */
536void rtems_pipe_initialize (void)
537{
538  if (!rtems_pipe_configured)
539    return;
540
541  if (rtems_pipe_semaphore)
542    return;
543
544  rtems_status_code sc;
545  sc = rtems_semaphore_create(
546        rtems_build_name ('P', 'I', 'P', 'E'), 1,
547        RTEMS_BINARY_SEMAPHORE | RTEMS_INHERIT_PRIORITY | RTEMS_PRIORITY,
548        RTEMS_NO_PRIORITY, &rtems_pipe_semaphore);
549  if (sc != RTEMS_SUCCESSFUL)
550    rtems_fatal_error_occurred (sc);
551
552  rtems_interval now;
553  rtems_clock_get(RTEMS_CLOCK_GET_TICKS_SINCE_BOOT, &now);
554  rtems_pipe_no = now;
555}
Note: See TracBrowser for help on using the repository browser.