source: rtems/cpukit/libfs/src/pipe/fifo.c @ 0b32bb8

4.115
Last change on this file since 0b32bb8 was 0b32bb8, checked in by Sebastian Huber <sebastian.huber@…>, on 07/19/13 at 08:42:39

rtems: Create barrier implementation header

Move the implementation-specific parts of barrier.h and barrier.inl into the
new header file barrierimpl.h. barrier.h now contains only the
application-visible API.

  • Property mode set to 100644
File size: 12.0 KB
Line 
1/**
2 * @file
3 *
4 * @brief FIFO/Pipe Support
5 * @ingroup FIFO_PIPE
6 */
7
8/*
9 * Author: Wei Shen <cquark@gmail.com>
10 *
11 * The license and distribution terms for this file may be
12 * found in the file LICENSE in this distribution or at
13 * http://www.rtems.com/license/LICENSE.
14 */
15
16
17#if HAVE_CONFIG_H
18#include "config.h"
19#endif
20
21#ifdef RTEMS_POSIX_API
22#define __RTEMS_VIOLATE_KERNEL_VISIBILITY__
23#endif
24
25#include <errno.h>
26#include <stdlib.h>
27
28#include <rtems.h>
29#include <rtems/libio_.h>
30#include <rtems/rtems/barrierimpl.h>
31
32#include "pipe.h"
33
34
35#define MIN(a, b) ((a) < (b)? (a): (b))
36
37#define LIBIO_ACCMODE(_iop) ((_iop)->flags & LIBIO_FLAGS_READ_WRITE)
38#define LIBIO_NODELAY(_iop) ((_iop)->flags & LIBIO_FLAGS_NO_DELAY)
39
40static rtems_id pipe_semaphore = RTEMS_ID_NONE;
41
42
/*
 * Ring-buffer state helpers.  The macro argument is parenthesized at every
 * use so that any pointer expression (not just a plain identifier) may be
 * passed.
 *
 * PIPE_EMPTY  - no bytes are buffered.
 * PIPE_FULL   - the buffered length equals the buffer size.
 * PIPE_SPACE  - number of bytes that can still be stored.
 * PIPE_WSTART - buffer index at which the next byte is stored.
 */
#define PIPE_EMPTY(_pipe) ((_pipe)->Length == 0)
#define PIPE_FULL(_pipe)  ((_pipe)->Length == (_pipe)->Size)
#define PIPE_SPACE(_pipe) ((_pipe)->Size - (_pipe)->Length)
#define PIPE_WSTART(_pipe) \
  (((_pipe)->Start + (_pipe)->Length) % (_pipe)->Size)

/* Obtain the per-pipe semaphore; evaluates to true on success. */
#define PIPE_LOCK(_pipe)  \
  ( rtems_semaphore_obtain((_pipe)->Semaphore, RTEMS_WAIT, RTEMS_NO_TIMEOUT)  \
   == RTEMS_SUCCESSFUL )

/* Release the per-pipe semaphore. */
#define PIPE_UNLOCK(_pipe)  rtems_semaphore_release((_pipe)->Semaphore)

/* Block on the read barrier; evaluates to true on a successful wait. */
#define PIPE_READWAIT(_pipe)  \
  ( rtems_barrier_wait((_pipe)->readBarrier, RTEMS_NO_TIMEOUT)  \
   == RTEMS_SUCCESSFUL)

/* Block on the write barrier; evaluates to true on a successful wait. */
#define PIPE_WRITEWAIT(_pipe)  \
  ( rtems_barrier_wait((_pipe)->writeBarrier, RTEMS_NO_TIMEOUT)  \
   == RTEMS_SUCCESSFUL)

/* Release every task currently waiting on the read barrier. */
#define PIPE_WAKEUPREADERS(_pipe) \
  do {uint32_t n; rtems_barrier_release((_pipe)->readBarrier, &n); } while(0)

/* Release every task currently waiting on the write barrier. */
#define PIPE_WAKEUPWRITERS(_pipe) \
  do {uint32_t n; rtems_barrier_release((_pipe)->writeBarrier, &n); } while(0)
67
68
#ifdef RTEMS_POSIX_API
#define __RTEMS_VIOLATE_KERNEL_VISIBILITY__

#include <rtems/rtems/barrier.h>
#include <rtems/score/thread.h>

/*
 * Look up one barrier by id, add STATES_INTERRUPTIBLE_BY_SIGNAL to the state
 * of its wait queue and hand the object back with _Objects_Put().
 */
static void pipe_barrier_interruptible(rtems_id barrier_id)
{
  Objects_Locations  where;
  Barrier_Control   *barrier = _Barrier_Get(barrier_id, &where);

  barrier->Barrier.Wait_queue.state |= STATES_INTERRUPTIBLE_BY_SIGNAL;
  _Objects_Put( &barrier->Object );
}

/* Make both barriers of the pipe interruptible by signals. */
static void pipe_interruptible(pipe_control_t *pipe)
{
  pipe_barrier_interruptible(pipe->readBarrier);
  pipe_barrier_interruptible(pipe->writeBarrier);
}
#endif
90
91/*
92 * Alloc pipe control structure, buffer, and resources.
93 * Called with pipe_semaphore held.
94 */
95static int pipe_alloc(
96  pipe_control_t **pipep
97)
98{
99  static char c = 'a';
100  pipe_control_t *pipe;
101  int err = -ENOMEM;
102
103  pipe = malloc(sizeof(pipe_control_t));
104  if (pipe == NULL)
105    return err;
106  memset(pipe, 0, sizeof(pipe_control_t));
107
108  pipe->Size = PIPE_BUF;
109  pipe->Buffer = malloc(pipe->Size);
110  if (! pipe->Buffer)
111    goto err_buf;
112
113  err = -ENOMEM;
114
115  if (rtems_barrier_create(
116        rtems_build_name ('P', 'I', 'r', c),
117        RTEMS_BARRIER_MANUAL_RELEASE, 0,
118        &pipe->readBarrier) != RTEMS_SUCCESSFUL)
119    goto err_rbar;
120  if (rtems_barrier_create(
121        rtems_build_name ('P', 'I', 'w', c),
122        RTEMS_BARRIER_MANUAL_RELEASE, 0,
123        &pipe->writeBarrier) != RTEMS_SUCCESSFUL)
124    goto err_wbar;
125  if (rtems_semaphore_create(
126        rtems_build_name ('P', 'I', 's', c), 1,
127        RTEMS_BINARY_SEMAPHORE | RTEMS_FIFO,
128        RTEMS_NO_PRIORITY, &pipe->Semaphore) != RTEMS_SUCCESSFUL)
129    goto err_sem;
130
131#ifdef RTEMS_POSIX_API
132  pipe_interruptible(pipe);
133#endif
134
135  *pipep = pipe;
136  if (c ++ == 'z')
137    c = 'a';
138  return 0;
139
140err_sem:
141  rtems_barrier_delete(pipe->writeBarrier);
142err_wbar:
143  rtems_barrier_delete(pipe->readBarrier);
144err_rbar:
145  free(pipe->Buffer);
146err_buf:
147  free(pipe);
148  return err;
149}
150
151/* Called with pipe_semaphore held. */
152static inline void pipe_free(
153  pipe_control_t *pipe
154)
155{
156  rtems_barrier_delete(pipe->readBarrier);
157  rtems_barrier_delete(pipe->writeBarrier);
158  rtems_semaphore_delete(pipe->Semaphore);
159  free(pipe->Buffer);
160  free(pipe);
161}
162
163static int pipe_lock(void)
164{
165  rtems_status_code sc = RTEMS_SUCCESSFUL;
166
167  if (pipe_semaphore == RTEMS_ID_NONE) {
168    rtems_libio_lock();
169
170    if (pipe_semaphore == RTEMS_ID_NONE) {
171      sc = rtems_semaphore_create(
172        rtems_build_name('P', 'I', 'P', 'E'),
173        1,
174        RTEMS_BINARY_SEMAPHORE | RTEMS_INHERIT_PRIORITY | RTEMS_PRIORITY,
175        RTEMS_NO_PRIORITY,
176        &pipe_semaphore
177      );
178    }
179
180    rtems_libio_unlock();
181  }
182
183  if (sc == RTEMS_SUCCESSFUL) {
184    sc = rtems_semaphore_obtain(pipe_semaphore, RTEMS_WAIT, RTEMS_NO_TIMEOUT);
185  }
186
187  if (sc == RTEMS_SUCCESSFUL) {
188    return 0;
189  } else {
190    return -ENOMEM;
191  }
192}
193
194static void pipe_unlock(void)
195{
196#ifdef RTEMS_DEBUG
197  rtems_status_code sc = RTEMS_SUCCESSFUL;
198
199  sc =
200#endif
201   rtems_semaphore_release(pipe_semaphore);
202  #ifdef RTEMS_DEBUG
203    if (sc != RTEMS_SUCCESSFUL) {
204      rtems_fatal_error_occurred(0xdeadbeef);
205    }
206  #endif
207}
208
209/*
210 * If called with *pipep = NULL, pipe_new will call pipe_alloc to allocate a
211 * pipe control structure and set *pipep to its address.
212 * pipe is locked, when pipe_new returns with no error.
213 */
214static int pipe_new(
215  pipe_control_t **pipep
216)
217{
218  pipe_control_t *pipe;
219  int err = 0;
220
221  err = pipe_lock();
222  if (err)
223    return err;
224
225  pipe = *pipep;
226  if (pipe == NULL) {
227    err = pipe_alloc(&pipe);
228    if (err)
229      goto out;
230  }
231
232  if (! PIPE_LOCK(pipe))
233    err = -EINTR;
234
235  if (*pipep == NULL) {
236    if (err)
237      pipe_free(pipe);
238    else
239      *pipep = pipe;
240  }
241
242out:
243  pipe_unlock();
244  return err;
245}
246
247void pipe_release(
248  pipe_control_t **pipep,
249  rtems_libio_t *iop
250)
251{
252  pipe_control_t *pipe = *pipep;
253  uint32_t mode;
254
255  #if defined(RTEMS_DEBUG)
256    /* WARN pipe not freed and pipep not set to NULL! */
257    if (pipe_lock())
258      rtems_fatal_error_occurred(0xdeadbeef);
259
260    /* WARN pipe not released! */
261    if (!PIPE_LOCK(pipe))
262      rtems_fatal_error_occurred(0xdeadbeef);
263  #endif
264
265  mode = LIBIO_ACCMODE(iop);
266  if (mode & LIBIO_FLAGS_READ)
267     pipe->Readers --;
268  if (mode & LIBIO_FLAGS_WRITE)
269     pipe->Writers --;
270
271  PIPE_UNLOCK(pipe);
272
273  if (pipe->Readers == 0 && pipe->Writers == 0) {
274#if 0
275    /* To delete an anonymous pipe file when all users closed it */
276    if (pipe->Anonymous)
277      delfile = TRUE;
278#endif
279    pipe_free(pipe);
280    *pipep = NULL;
281  }
282  else if (pipe->Readers == 0 && mode != LIBIO_FLAGS_WRITE)
283    /* Notify waiting Writers that all their partners left */
284    PIPE_WAKEUPWRITERS(pipe);
285  else if (pipe->Writers == 0 && mode != LIBIO_FLAGS_READ)
286    PIPE_WAKEUPREADERS(pipe);
287
288  pipe_unlock();
289
290#if 0
291  if (! delfile)
292    return;
293  if (iop->pathinfo.ops->unlink_h == NULL)
294    return;
295
296  /* This is safe for IMFS, but how about other FSes? */
297  iop->flags &= ~LIBIO_FLAGS_OPEN;
298  if(iop->pathinfo.ops->unlink_h(&iop->pathinfo))
299    return;
300#endif
301
302}
303
304int fifo_open(
305  pipe_control_t **pipep,
306  rtems_libio_t *iop
307)
308{
309  pipe_control_t *pipe;
310  unsigned int prevCounter;
311  int err;
312
313  err = pipe_new(pipep);
314  if (err)
315    return err;
316  pipe = *pipep;
317
318  switch (LIBIO_ACCMODE(iop)) {
319    case LIBIO_FLAGS_READ:
320      pipe->readerCounter ++;
321      if (pipe->Readers ++ == 0)
322        PIPE_WAKEUPWRITERS(pipe);
323
324      if (pipe->Writers == 0) {
325        /* Not an error */
326        if (LIBIO_NODELAY(iop))
327          break;
328
329        prevCounter = pipe->writerCounter;
330        err = -EINTR;
331        /* Wait until a writer opens the pipe */
332        do {
333          PIPE_UNLOCK(pipe);
334          if (! PIPE_READWAIT(pipe))
335            goto out_error;
336          if (! PIPE_LOCK(pipe))
337            goto out_error;
338        } while (prevCounter == pipe->writerCounter);
339      }
340      break;
341
342    case LIBIO_FLAGS_WRITE:
343      pipe->writerCounter ++;
344
345      if (pipe->Writers ++ == 0)
346        PIPE_WAKEUPREADERS(pipe);
347
348      if (pipe->Readers == 0 && LIBIO_NODELAY(iop)) {
349        PIPE_UNLOCK(pipe);
350        err = -ENXIO;
351        goto out_error;
352      }
353
354      if (pipe->Readers == 0) {
355        prevCounter = pipe->readerCounter;
356        err = -EINTR;
357        do {
358          PIPE_UNLOCK(pipe);
359          if (! PIPE_WRITEWAIT(pipe))
360            goto out_error;
361          if (! PIPE_LOCK(pipe))
362            goto out_error;
363        } while (prevCounter == pipe->readerCounter);
364      }
365      break;
366
367    case LIBIO_FLAGS_READ_WRITE:
368      pipe->readerCounter ++;
369      if (pipe->Readers ++ == 0)
370        PIPE_WAKEUPWRITERS(pipe);
371      pipe->writerCounter ++;
372      if (pipe->Writers ++ == 0)
373        PIPE_WAKEUPREADERS(pipe);
374      break;
375  }
376
377  PIPE_UNLOCK(pipe);
378  return 0;
379
380out_error:
381  pipe_release(pipep, iop);
382  return err;
383}
384
385ssize_t pipe_read(
386  pipe_control_t *pipe,
387  void           *buffer,
388  size_t          count,
389  rtems_libio_t  *iop
390)
391{
392  int chunk, chunk1, read = 0, ret = 0;
393
394  if (! PIPE_LOCK(pipe))
395    return -EINTR;
396
397  while (read < count) {
398    while (PIPE_EMPTY(pipe)) {
399      /* Not an error */
400      if (pipe->Writers == 0)
401        goto out_locked;
402
403      if (LIBIO_NODELAY(iop)) {
404        ret = -EAGAIN;
405        goto out_locked;
406      }
407
408      /* Wait until pipe is no more empty or no writer exists */
409      pipe->waitingReaders ++;
410      PIPE_UNLOCK(pipe);
411      if (! PIPE_READWAIT(pipe))
412        ret = -EINTR;
413      if (! PIPE_LOCK(pipe)) {
414        /* WARN waitingReaders not restored! */
415        ret = -EINTR;
416        goto out_nolock;
417      }
418      pipe->waitingReaders --;
419      if (ret != 0)
420        goto out_locked;
421    }
422
423    /* Read chunk bytes */
424    chunk = MIN(count - read,  pipe->Length);
425    chunk1 = pipe->Size - pipe->Start;
426    if (chunk > chunk1) {
427      memcpy(buffer + read, pipe->Buffer + pipe->Start, chunk1);
428      memcpy(buffer + read + chunk1, pipe->Buffer, chunk - chunk1);
429    }
430    else
431      memcpy(buffer + read, pipe->Buffer + pipe->Start, chunk);
432
433    pipe->Start += chunk;
434    pipe->Start %= pipe->Size;
435    pipe->Length -= chunk;
436    /* For buffering optimization */
437    if (PIPE_EMPTY(pipe))
438      pipe->Start = 0;
439
440    if (pipe->waitingWriters > 0)
441      PIPE_WAKEUPWRITERS(pipe);
442    read += chunk;
443  }
444
445out_locked:
446  PIPE_UNLOCK(pipe);
447
448out_nolock:
449  if (read > 0)
450    return read;
451  return ret;
452}
453
454ssize_t pipe_write(
455  pipe_control_t *pipe,
456  const void     *buffer,
457  size_t          count,
458  rtems_libio_t  *iop
459)
460{
461  int chunk, chunk1, written = 0, ret = 0;
462
463  /* Write nothing */
464  if (count == 0)
465    return 0;
466
467  if (! PIPE_LOCK(pipe))
468    return -EINTR;
469
470  if (pipe->Readers == 0) {
471    ret = -EPIPE;
472    goto out_locked;
473  }
474
475  /* Write of PIPE_BUF bytes or less shall not be interleaved */
476  chunk = count <= pipe->Size ? count : 1;
477
478  while (written < count) {
479    while (PIPE_SPACE(pipe) < chunk) {
480      if (LIBIO_NODELAY(iop)) {
481        ret = -EAGAIN;
482        goto out_locked;
483      }
484
485      /* Wait until there is chunk bytes space or no reader exists */
486      pipe->waitingWriters ++;
487      PIPE_UNLOCK(pipe);
488      if (! PIPE_WRITEWAIT(pipe))
489        ret = -EINTR;
490      if (! PIPE_LOCK(pipe)) {
491        /* WARN waitingWriters not restored! */
492        ret = -EINTR;
493        goto out_nolock;
494      }
495      pipe->waitingWriters --;
496      if (ret != 0)
497        goto out_locked;
498
499      if (pipe->Readers == 0) {
500        ret = -EPIPE;
501        goto out_locked;
502      }
503    }
504
505    chunk = MIN(count - written, PIPE_SPACE(pipe));
506    chunk1 = pipe->Size - PIPE_WSTART(pipe);
507    if (chunk > chunk1) {
508      memcpy(pipe->Buffer + PIPE_WSTART(pipe), buffer + written, chunk1);
509      memcpy(pipe->Buffer, buffer + written + chunk1, chunk - chunk1);
510    }
511    else
512      memcpy(pipe->Buffer + PIPE_WSTART(pipe), buffer + written, chunk);
513
514    pipe->Length += chunk;
515    if (pipe->waitingReaders > 0)
516      PIPE_WAKEUPREADERS(pipe);
517    written += chunk;
518    /* Write of more than PIPE_BUF bytes can be interleaved */
519    chunk = 1;
520  }
521
522out_locked:
523  PIPE_UNLOCK(pipe);
524
525out_nolock:
526#ifdef RTEMS_POSIX_API
527  /* Signal SIGPIPE */
528  if (ret == -EPIPE)
529    kill(getpid(), SIGPIPE);
530#endif
531
532  if (written > 0)
533    return written;
534  return ret;
535}
536
537int pipe_ioctl(
538  pipe_control_t  *pipe,
539  ioctl_command_t  cmd,
540  void            *buffer,
541  rtems_libio_t   *iop
542)
543{
544  if (cmd == FIONREAD) {
545    if (buffer == NULL)
546      return -EFAULT;
547
548    if (! PIPE_LOCK(pipe))
549      return -EINTR;
550
551    /* Return length of pipe */
552    *(unsigned int *)buffer = pipe->Length;
553    PIPE_UNLOCK(pipe);
554    return 0;
555  }
556
557  return -EINVAL;
558}
Note: See TracBrowser for help on using the repository browser.