source: rtems/cpukit/libfs/src/pipe/fifo.c @ 11109ea

4.115
Last change on this file since 11109ea was 11109ea, checked in by Alex Ivanov <alexivanov97@…>, on 12/18/12 at 20:46:38

libfs: Doxygen Enhancement Task #2

http://www.google-melange.com/gci/task/view/google/gci2012/8032207

  • Property mode set to 100644
File size: 11.9 KB
Line 
1/**
2 * @file
3 *
4 * @brief FIFO/Pipe Support
5 * @ingroup FIFO_PIPE
6 */
7
8/*
9 * Author: Wei Shen <cquark@gmail.com>
10 *
11 * The license and distribution terms for this file may be
12 * found in the file LICENSE in this distribution or at
13 * http://www.rtems.com/license/LICENSE.
14 */
15
16
17#if HAVE_CONFIG_H
18#include "config.h"
19#endif
20
21#ifdef RTEMS_POSIX_API
22#define __RTEMS_VIOLATE_KERNEL_VISIBILITY__
23#endif
24
25#include <errno.h>
26#include <stdlib.h>
27
28#include <rtems.h>
29#include <rtems/libio_.h>
30
31#include "pipe.h"
32
33
34#define MIN(a, b) ((a) < (b)? (a): (b))
35
36#define LIBIO_ACCMODE(_iop) ((_iop)->flags & LIBIO_FLAGS_READ_WRITE)
37#define LIBIO_NODELAY(_iop) ((_iop)->flags & LIBIO_FLAGS_NO_DELAY)
38
39static rtems_id pipe_semaphore = RTEMS_ID_NONE;
40
41
42#define PIPE_EMPTY(_pipe) (_pipe->Length == 0)
43#define PIPE_FULL(_pipe)  (_pipe->Length == _pipe->Size)
44#define PIPE_SPACE(_pipe) (_pipe->Size - _pipe->Length)
45#define PIPE_WSTART(_pipe) ((_pipe->Start + _pipe->Length) % _pipe->Size)
46
47#define PIPE_LOCK(_pipe)  \
48  ( rtems_semaphore_obtain(_pipe->Semaphore, RTEMS_WAIT, RTEMS_NO_TIMEOUT)  \
49   == RTEMS_SUCCESSFUL )
50
51#define PIPE_UNLOCK(_pipe)  rtems_semaphore_release(_pipe->Semaphore)
52
53#define PIPE_READWAIT(_pipe)  \
54  ( rtems_barrier_wait(_pipe->readBarrier, RTEMS_NO_TIMEOUT)  \
55   == RTEMS_SUCCESSFUL)
56
57#define PIPE_WRITEWAIT(_pipe)  \
58  ( rtems_barrier_wait(_pipe->writeBarrier, RTEMS_NO_TIMEOUT)  \
59   == RTEMS_SUCCESSFUL)
60
61#define PIPE_WAKEUPREADERS(_pipe) \
62  do {uint32_t n; rtems_barrier_release(_pipe->readBarrier, &n); } while(0)
63
64#define PIPE_WAKEUPWRITERS(_pipe) \
65  do {uint32_t n; rtems_barrier_release(_pipe->writeBarrier, &n); } while(0)
66
67
68#ifdef RTEMS_POSIX_API
69#define __RTEMS_VIOLATE_KERNEL_VISIBILITY__
70
71#include <rtems/rtems/barrier.h>
72#include <rtems/score/thread.h>
73
74/* Set barriers to be interruptible by signals. */
75static void pipe_interruptible(pipe_control_t *pipe)
76{
77  Objects_Locations location;
78
79  _Barrier_Get(pipe->readBarrier, &location)->Barrier.Wait_queue.state
80    |= STATES_INTERRUPTIBLE_BY_SIGNAL;
81  _Thread_Enable_dispatch();
82  _Barrier_Get(pipe->writeBarrier, &location)->Barrier.Wait_queue.state
83    |= STATES_INTERRUPTIBLE_BY_SIGNAL;
84  _Thread_Enable_dispatch();
85}
86#endif
87
88/*
89 * Alloc pipe control structure, buffer, and resources.
90 * Called with pipe_semaphore held.
91 */
92static int pipe_alloc(
93  pipe_control_t **pipep
94)
95{
96  static char c = 'a';
97  pipe_control_t *pipe;
98  int err = -ENOMEM;
99
100  pipe = malloc(sizeof(pipe_control_t));
101  if (pipe == NULL)
102    return err;
103  memset(pipe, 0, sizeof(pipe_control_t));
104
105  pipe->Size = PIPE_BUF;
106  pipe->Buffer = malloc(pipe->Size);
107  if (! pipe->Buffer)
108    goto err_buf;
109
110  err = -ENOMEM;
111
112  if (rtems_barrier_create(
113        rtems_build_name ('P', 'I', 'r', c),
114        RTEMS_BARRIER_MANUAL_RELEASE, 0,
115        &pipe->readBarrier) != RTEMS_SUCCESSFUL)
116    goto err_rbar;
117  if (rtems_barrier_create(
118        rtems_build_name ('P', 'I', 'w', c),
119        RTEMS_BARRIER_MANUAL_RELEASE, 0,
120        &pipe->writeBarrier) != RTEMS_SUCCESSFUL)
121    goto err_wbar;
122  if (rtems_semaphore_create(
123        rtems_build_name ('P', 'I', 's', c), 1,
124        RTEMS_BINARY_SEMAPHORE | RTEMS_FIFO,
125        RTEMS_NO_PRIORITY, &pipe->Semaphore) != RTEMS_SUCCESSFUL)
126    goto err_sem;
127
128#ifdef RTEMS_POSIX_API
129  pipe_interruptible(pipe);
130#endif
131
132  *pipep = pipe;
133  if (c ++ == 'z')
134    c = 'a';
135  return 0;
136
137err_sem:
138  rtems_barrier_delete(pipe->writeBarrier);
139err_wbar:
140  rtems_barrier_delete(pipe->readBarrier);
141err_rbar:
142  free(pipe->Buffer);
143err_buf:
144  free(pipe);
145  return err;
146}
147
148/* Called with pipe_semaphore held. */
149static inline void pipe_free(
150  pipe_control_t *pipe
151)
152{
153  rtems_barrier_delete(pipe->readBarrier);
154  rtems_barrier_delete(pipe->writeBarrier);
155  rtems_semaphore_delete(pipe->Semaphore);
156  free(pipe->Buffer);
157  free(pipe);
158}
159
160static int pipe_lock(void)
161{
162  rtems_status_code sc = RTEMS_SUCCESSFUL;
163
164  if (pipe_semaphore == RTEMS_ID_NONE) {
165    rtems_libio_lock();
166
167    if (pipe_semaphore == RTEMS_ID_NONE) {
168      sc = rtems_semaphore_create(
169        rtems_build_name('P', 'I', 'P', 'E'),
170        1,
171        RTEMS_BINARY_SEMAPHORE | RTEMS_INHERIT_PRIORITY | RTEMS_PRIORITY,
172        RTEMS_NO_PRIORITY,
173        &pipe_semaphore
174      );
175    }
176
177    rtems_libio_unlock();
178  }
179
180  if (sc == RTEMS_SUCCESSFUL) {
181    sc = rtems_semaphore_obtain(pipe_semaphore, RTEMS_WAIT, RTEMS_NO_TIMEOUT);
182  }
183
184  if (sc == RTEMS_SUCCESSFUL) {
185    return 0;
186  } else {
187    return -ENOMEM;
188  }
189}
190
191static void pipe_unlock(void)
192{
193#ifdef RTEMS_DEBUG
194  rtems_status_code sc = RTEMS_SUCCESSFUL;
195
196  sc =
197#endif
198   rtems_semaphore_release(pipe_semaphore);
199  #ifdef RTEMS_DEBUG
200    if (sc != RTEMS_SUCCESSFUL) {
201      rtems_fatal_error_occurred(0xdeadbeef);
202    }
203  #endif
204}
205
206/*
207 * If called with *pipep = NULL, pipe_new will call pipe_alloc to allocate a
208 * pipe control structure and set *pipep to its address.
209 * pipe is locked, when pipe_new returns with no error.
210 */
211static int pipe_new(
212  pipe_control_t **pipep
213)
214{
215  pipe_control_t *pipe;
216  int err = 0;
217
218  err = pipe_lock();
219  if (err)
220    return err;
221
222  pipe = *pipep;
223  if (pipe == NULL) {
224    err = pipe_alloc(&pipe);
225    if (err)
226      goto out;
227  }
228
229  if (! PIPE_LOCK(pipe))
230    err = -EINTR;
231
232  if (*pipep == NULL) {
233    if (err)
234      pipe_free(pipe);
235    else
236      *pipep = pipe;
237  }
238
239out:
240  pipe_unlock();
241  return err;
242}
243
244void pipe_release(
245  pipe_control_t **pipep,
246  rtems_libio_t *iop
247)
248{
249  pipe_control_t *pipe = *pipep;
250  uint32_t mode;
251
252  #if defined(RTEMS_DEBUG)
253    /* WARN pipe not freed and pipep not set to NULL! */
254    if (pipe_lock())
255      rtems_fatal_error_occurred(0xdeadbeef);
256
257    /* WARN pipe not released! */
258    if (!PIPE_LOCK(pipe))
259      rtems_fatal_error_occurred(0xdeadbeef);
260  #endif
261
262  mode = LIBIO_ACCMODE(iop);
263  if (mode & LIBIO_FLAGS_READ)
264     pipe->Readers --;
265  if (mode & LIBIO_FLAGS_WRITE)
266     pipe->Writers --;
267
268  PIPE_UNLOCK(pipe);
269
270  if (pipe->Readers == 0 && pipe->Writers == 0) {
271#if 0
272    /* To delete an anonymous pipe file when all users closed it */
273    if (pipe->Anonymous)
274      delfile = TRUE;
275#endif
276    pipe_free(pipe);
277    *pipep = NULL;
278  }
279  else if (pipe->Readers == 0 && mode != LIBIO_FLAGS_WRITE)
280    /* Notify waiting Writers that all their partners left */
281    PIPE_WAKEUPWRITERS(pipe);
282  else if (pipe->Writers == 0 && mode != LIBIO_FLAGS_READ)
283    PIPE_WAKEUPREADERS(pipe);
284
285  pipe_unlock();
286
287#if 0
288  if (! delfile)
289    return;
290  if (iop->pathinfo.ops->unlink_h == NULL)
291    return;
292
293  /* This is safe for IMFS, but how about other FSes? */
294  iop->flags &= ~LIBIO_FLAGS_OPEN;
295  if(iop->pathinfo.ops->unlink_h(&iop->pathinfo))
296    return;
297#endif
298
299}
300
301int fifo_open(
302  pipe_control_t **pipep,
303  rtems_libio_t *iop
304)
305{
306  pipe_control_t *pipe;
307  unsigned int prevCounter;
308  int err;
309
310  err = pipe_new(pipep);
311  if (err)
312    return err;
313  pipe = *pipep;
314
315  switch (LIBIO_ACCMODE(iop)) {
316    case LIBIO_FLAGS_READ:
317      pipe->readerCounter ++;
318      if (pipe->Readers ++ == 0)
319        PIPE_WAKEUPWRITERS(pipe);
320
321      if (pipe->Writers == 0) {
322        /* Not an error */
323        if (LIBIO_NODELAY(iop))
324          break;
325
326        prevCounter = pipe->writerCounter;
327        err = -EINTR;
328        /* Wait until a writer opens the pipe */
329        do {
330          PIPE_UNLOCK(pipe);
331          if (! PIPE_READWAIT(pipe))
332            goto out_error;
333          if (! PIPE_LOCK(pipe))
334            goto out_error;
335        } while (prevCounter == pipe->writerCounter);
336      }
337      break;
338
339    case LIBIO_FLAGS_WRITE:
340      pipe->writerCounter ++;
341
342      if (pipe->Writers ++ == 0)
343        PIPE_WAKEUPREADERS(pipe);
344
345      if (pipe->Readers == 0 && LIBIO_NODELAY(iop)) {
346        PIPE_UNLOCK(pipe);
347        err = -ENXIO;
348        goto out_error;
349      }
350
351      if (pipe->Readers == 0) {
352        prevCounter = pipe->readerCounter;
353        err = -EINTR;
354        do {
355          PIPE_UNLOCK(pipe);
356          if (! PIPE_WRITEWAIT(pipe))
357            goto out_error;
358          if (! PIPE_LOCK(pipe))
359            goto out_error;
360        } while (prevCounter == pipe->readerCounter);
361      }
362      break;
363
364    case LIBIO_FLAGS_READ_WRITE:
365      pipe->readerCounter ++;
366      if (pipe->Readers ++ == 0)
367        PIPE_WAKEUPWRITERS(pipe);
368      pipe->writerCounter ++;
369      if (pipe->Writers ++ == 0)
370        PIPE_WAKEUPREADERS(pipe);
371      break;
372  }
373
374  PIPE_UNLOCK(pipe);
375  return 0;
376
377out_error:
378  pipe_release(pipep, iop);
379  return err;
380}
381
382ssize_t pipe_read(
383  pipe_control_t *pipe,
384  void           *buffer,
385  size_t          count,
386  rtems_libio_t  *iop
387)
388{
389  int chunk, chunk1, read = 0, ret = 0;
390
391  if (! PIPE_LOCK(pipe))
392    return -EINTR;
393
394  while (read < count) {
395    while (PIPE_EMPTY(pipe)) {
396      /* Not an error */
397      if (pipe->Writers == 0)
398        goto out_locked;
399
400      if (LIBIO_NODELAY(iop)) {
401        ret = -EAGAIN;
402        goto out_locked;
403      }
404
405      /* Wait until pipe is no more empty or no writer exists */
406      pipe->waitingReaders ++;
407      PIPE_UNLOCK(pipe);
408      if (! PIPE_READWAIT(pipe))
409        ret = -EINTR;
410      if (! PIPE_LOCK(pipe)) {
411        /* WARN waitingReaders not restored! */
412        ret = -EINTR;
413        goto out_nolock;
414      }
415      pipe->waitingReaders --;
416      if (ret != 0)
417        goto out_locked;
418    }
419
420    /* Read chunk bytes */
421    chunk = MIN(count - read,  pipe->Length);
422    chunk1 = pipe->Size - pipe->Start;
423    if (chunk > chunk1) {
424      memcpy(buffer + read, pipe->Buffer + pipe->Start, chunk1);
425      memcpy(buffer + read + chunk1, pipe->Buffer, chunk - chunk1);
426    }
427    else
428      memcpy(buffer + read, pipe->Buffer + pipe->Start, chunk);
429
430    pipe->Start += chunk;
431    pipe->Start %= pipe->Size;
432    pipe->Length -= chunk;
433    /* For buffering optimization */
434    if (PIPE_EMPTY(pipe))
435      pipe->Start = 0;
436
437    if (pipe->waitingWriters > 0)
438      PIPE_WAKEUPWRITERS(pipe);
439    read += chunk;
440  }
441
442out_locked:
443  PIPE_UNLOCK(pipe);
444
445out_nolock:
446  if (read > 0)
447    return read;
448  return ret;
449}
450
451ssize_t pipe_write(
452  pipe_control_t *pipe,
453  const void     *buffer,
454  size_t          count,
455  rtems_libio_t  *iop
456)
457{
458  int chunk, chunk1, written = 0, ret = 0;
459
460  /* Write nothing */
461  if (count == 0)
462    return 0;
463
464  if (! PIPE_LOCK(pipe))
465    return -EINTR;
466
467  if (pipe->Readers == 0) {
468    ret = -EPIPE;
469    goto out_locked;
470  }
471
472  /* Write of PIPE_BUF bytes or less shall not be interleaved */
473  chunk = count <= pipe->Size ? count : 1;
474
475  while (written < count) {
476    while (PIPE_SPACE(pipe) < chunk) {
477      if (LIBIO_NODELAY(iop)) {
478        ret = -EAGAIN;
479        goto out_locked;
480      }
481
482      /* Wait until there is chunk bytes space or no reader exists */
483      pipe->waitingWriters ++;
484      PIPE_UNLOCK(pipe);
485      if (! PIPE_WRITEWAIT(pipe))
486        ret = -EINTR;
487      if (! PIPE_LOCK(pipe)) {
488        /* WARN waitingWriters not restored! */
489        ret = -EINTR;
490        goto out_nolock;
491      }
492      pipe->waitingWriters --;
493      if (ret != 0)
494        goto out_locked;
495
496      if (pipe->Readers == 0) {
497        ret = -EPIPE;
498        goto out_locked;
499      }
500    }
501
502    chunk = MIN(count - written, PIPE_SPACE(pipe));
503    chunk1 = pipe->Size - PIPE_WSTART(pipe);
504    if (chunk > chunk1) {
505      memcpy(pipe->Buffer + PIPE_WSTART(pipe), buffer + written, chunk1);
506      memcpy(pipe->Buffer, buffer + written + chunk1, chunk - chunk1);
507    }
508    else
509      memcpy(pipe->Buffer + PIPE_WSTART(pipe), buffer + written, chunk);
510
511    pipe->Length += chunk;
512    if (pipe->waitingReaders > 0)
513      PIPE_WAKEUPREADERS(pipe);
514    written += chunk;
515    /* Write of more than PIPE_BUF bytes can be interleaved */
516    chunk = 1;
517  }
518
519out_locked:
520  PIPE_UNLOCK(pipe);
521
522out_nolock:
523#ifdef RTEMS_POSIX_API
524  /* Signal SIGPIPE */
525  if (ret == -EPIPE)
526    kill(getpid(), SIGPIPE);
527#endif
528
529  if (written > 0)
530    return written;
531  return ret;
532}
533
534int pipe_ioctl(
535  pipe_control_t  *pipe,
536  ioctl_command_t  cmd,
537  void            *buffer,
538  rtems_libio_t   *iop
539)
540{
541  if (cmd == FIONREAD) {
542    if (buffer == NULL)
543      return -EFAULT;
544
545    if (! PIPE_LOCK(pipe))
546      return -EINTR;
547
548    /* Return length of pipe */
549    *(unsigned int *)buffer = pipe->Length;
550    PIPE_UNLOCK(pipe);
551    return 0;
552  }
553
554  return -EINVAL;
555}
Note: See TracBrowser for help on using the repository browser.