source: rtems/cpukit/libfs/src/pipe/fifo.c @ fe6c170c

4.115
Last change on this file since fe6c170c was fe6c170c, checked in by Sebastian Huber <sebastian.huber@…>, on Jul 24, 2013 at 2:19:52 PM

score: Create states implementation header

Move implementation specific parts of states.h and states.inl into new
header file statesimpl.h. The states.h contains now only the
application visible API.

  • Property mode set to 100644
File size: 12.1 KB
Line 
1/**
2 * @file
3 *
4 * @brief FIFO/Pipe Support
5 * @ingroup FIFO_PIPE
6 */
7
8/*
9 * Author: Wei Shen <cquark@gmail.com>
10 *
11 * The license and distribution terms for this file may be
12 * found in the file LICENSE in this distribution or at
13 * http://www.rtems.com/license/LICENSE.
14 */
15
16
17#if HAVE_CONFIG_H
18#include "config.h"
19#endif
20
21#ifdef RTEMS_POSIX_API
22#define __RTEMS_VIOLATE_KERNEL_VISIBILITY__
23#endif
24
25#include <errno.h>
26#include <stdlib.h>
27#include <string.h>
28
29#include <rtems.h>
30#include <rtems/libio_.h>
31#include <rtems/rtems/barrierimpl.h>
32#include <rtems/score/statesimpl.h>
33
34#include "pipe.h"
35
36
/* Smaller of the two arguments; each argument may be evaluated twice. */
#define MIN(a, b) (((b) > (a)) ? (a) : (b))

/* Value of the iop flags masked with LIBIO_FLAGS_READ_WRITE. */
#define LIBIO_ACCMODE(_iop) (LIBIO_FLAGS_READ_WRITE & (_iop)->flags)
/* Non-zero when LIBIO_FLAGS_NO_DELAY is set in the iop flags. */
#define LIBIO_NODELAY(_iop) (LIBIO_FLAGS_NO_DELAY & (_iop)->flags)
41
42static rtems_id pipe_semaphore = RTEMS_ID_NONE;
43
44
/*
 * Ring-buffer state helpers.  The argument is a pointer to a pipe control
 * structure; it is parenthesized in every expansion so that arbitrary
 * pointer expressions (not only plain identifiers) can be passed.  The
 * argument may be evaluated more than once.
 */

/* True when the pipe holds no data. */
#define PIPE_EMPTY(_pipe) ((_pipe)->Length == 0)
/* True when the pipe holds Size bytes. */
#define PIPE_FULL(_pipe)  ((_pipe)->Length == (_pipe)->Size)
/* Number of bytes that can still be stored. */
#define PIPE_SPACE(_pipe) ((_pipe)->Size - (_pipe)->Length)
/* Buffer offset at which the next written byte is stored. */
#define PIPE_WSTART(_pipe) \
  (((_pipe)->Start + (_pipe)->Length) % (_pipe)->Size)

/* Obtain the per-pipe semaphore; evaluates to true on success. */
#define PIPE_LOCK(_pipe)  \
  ( rtems_semaphore_obtain((_pipe)->Semaphore, RTEMS_WAIT, RTEMS_NO_TIMEOUT)  \
   == RTEMS_SUCCESSFUL )

/* Release the per-pipe semaphore. */
#define PIPE_UNLOCK(_pipe)  rtems_semaphore_release((_pipe)->Semaphore)

/* Block on the read barrier; evaluates to true on a successful wait. */
#define PIPE_READWAIT(_pipe)  \
  ( rtems_barrier_wait((_pipe)->readBarrier, RTEMS_NO_TIMEOUT)  \
   == RTEMS_SUCCESSFUL)

/* Block on the write barrier; evaluates to true on a successful wait. */
#define PIPE_WRITEWAIT(_pipe)  \
  ( rtems_barrier_wait((_pipe)->writeBarrier, RTEMS_NO_TIMEOUT)  \
   == RTEMS_SUCCESSFUL)

/* Release the read barrier; the released count is discarded. */
#define PIPE_WAKEUPREADERS(_pipe) \
  do { \
    uint32_t pipe_released_; \
    rtems_barrier_release((_pipe)->readBarrier, &pipe_released_); \
  } while (0)

/* Release the write barrier; the released count is discarded. */
#define PIPE_WAKEUPWRITERS(_pipe) \
  do { \
    uint32_t pipe_released_; \
    rtems_barrier_release((_pipe)->writeBarrier, &pipe_released_); \
  } while (0)
69
70
71#ifdef RTEMS_POSIX_API
72#define __RTEMS_VIOLATE_KERNEL_VISIBILITY__
73
74#include <rtems/rtems/barrier.h>
75#include <rtems/score/thread.h>
76
77/* Set barriers to be interruptible by signals. */
78static void pipe_interruptible(pipe_control_t *pipe)
79{
80  Objects_Locations  location;
81  Barrier_Control   *the_barrier;
82
83  the_barrier = _Barrier_Get(pipe->readBarrier, &location);
84  the_barrier->Barrier.Wait_queue.state |= STATES_INTERRUPTIBLE_BY_SIGNAL;
85  _Objects_Put( &the_barrier->Object );
86
87  the_barrier = _Barrier_Get(pipe->writeBarrier, &location);
88  the_barrier->Barrier.Wait_queue.state |= STATES_INTERRUPTIBLE_BY_SIGNAL;
89  _Objects_Put( &the_barrier->Object );
90}
91#endif
92
93/*
94 * Alloc pipe control structure, buffer, and resources.
95 * Called with pipe_semaphore held.
96 */
97static int pipe_alloc(
98  pipe_control_t **pipep
99)
100{
101  static char c = 'a';
102  pipe_control_t *pipe;
103  int err = -ENOMEM;
104
105  pipe = malloc(sizeof(pipe_control_t));
106  if (pipe == NULL)
107    return err;
108  memset(pipe, 0, sizeof(pipe_control_t));
109
110  pipe->Size = PIPE_BUF;
111  pipe->Buffer = malloc(pipe->Size);
112  if (! pipe->Buffer)
113    goto err_buf;
114
115  err = -ENOMEM;
116
117  if (rtems_barrier_create(
118        rtems_build_name ('P', 'I', 'r', c),
119        RTEMS_BARRIER_MANUAL_RELEASE, 0,
120        &pipe->readBarrier) != RTEMS_SUCCESSFUL)
121    goto err_rbar;
122  if (rtems_barrier_create(
123        rtems_build_name ('P', 'I', 'w', c),
124        RTEMS_BARRIER_MANUAL_RELEASE, 0,
125        &pipe->writeBarrier) != RTEMS_SUCCESSFUL)
126    goto err_wbar;
127  if (rtems_semaphore_create(
128        rtems_build_name ('P', 'I', 's', c), 1,
129        RTEMS_BINARY_SEMAPHORE | RTEMS_FIFO,
130        RTEMS_NO_PRIORITY, &pipe->Semaphore) != RTEMS_SUCCESSFUL)
131    goto err_sem;
132
133#ifdef RTEMS_POSIX_API
134  pipe_interruptible(pipe);
135#endif
136
137  *pipep = pipe;
138  if (c ++ == 'z')
139    c = 'a';
140  return 0;
141
142err_sem:
143  rtems_barrier_delete(pipe->writeBarrier);
144err_wbar:
145  rtems_barrier_delete(pipe->readBarrier);
146err_rbar:
147  free(pipe->Buffer);
148err_buf:
149  free(pipe);
150  return err;
151}
152
153/* Called with pipe_semaphore held. */
154static inline void pipe_free(
155  pipe_control_t *pipe
156)
157{
158  rtems_barrier_delete(pipe->readBarrier);
159  rtems_barrier_delete(pipe->writeBarrier);
160  rtems_semaphore_delete(pipe->Semaphore);
161  free(pipe->Buffer);
162  free(pipe);
163}
164
165static int pipe_lock(void)
166{
167  rtems_status_code sc = RTEMS_SUCCESSFUL;
168
169  if (pipe_semaphore == RTEMS_ID_NONE) {
170    rtems_libio_lock();
171
172    if (pipe_semaphore == RTEMS_ID_NONE) {
173      sc = rtems_semaphore_create(
174        rtems_build_name('P', 'I', 'P', 'E'),
175        1,
176        RTEMS_BINARY_SEMAPHORE | RTEMS_INHERIT_PRIORITY | RTEMS_PRIORITY,
177        RTEMS_NO_PRIORITY,
178        &pipe_semaphore
179      );
180    }
181
182    rtems_libio_unlock();
183  }
184
185  if (sc == RTEMS_SUCCESSFUL) {
186    sc = rtems_semaphore_obtain(pipe_semaphore, RTEMS_WAIT, RTEMS_NO_TIMEOUT);
187  }
188
189  if (sc == RTEMS_SUCCESSFUL) {
190    return 0;
191  } else {
192    return -ENOMEM;
193  }
194}
195
196static void pipe_unlock(void)
197{
198#ifdef RTEMS_DEBUG
199  rtems_status_code sc = RTEMS_SUCCESSFUL;
200
201  sc =
202#endif
203   rtems_semaphore_release(pipe_semaphore);
204  #ifdef RTEMS_DEBUG
205    if (sc != RTEMS_SUCCESSFUL) {
206      rtems_fatal_error_occurred(0xdeadbeef);
207    }
208  #endif
209}
210
211/*
212 * If called with *pipep = NULL, pipe_new will call pipe_alloc to allocate a
213 * pipe control structure and set *pipep to its address.
214 * pipe is locked, when pipe_new returns with no error.
215 */
216static int pipe_new(
217  pipe_control_t **pipep
218)
219{
220  pipe_control_t *pipe;
221  int err = 0;
222
223  err = pipe_lock();
224  if (err)
225    return err;
226
227  pipe = *pipep;
228  if (pipe == NULL) {
229    err = pipe_alloc(&pipe);
230    if (err)
231      goto out;
232  }
233
234  if (! PIPE_LOCK(pipe))
235    err = -EINTR;
236
237  if (*pipep == NULL) {
238    if (err)
239      pipe_free(pipe);
240    else
241      *pipep = pipe;
242  }
243
244out:
245  pipe_unlock();
246  return err;
247}
248
249void pipe_release(
250  pipe_control_t **pipep,
251  rtems_libio_t *iop
252)
253{
254  pipe_control_t *pipe = *pipep;
255  uint32_t mode;
256
257  #if defined(RTEMS_DEBUG)
258    /* WARN pipe not freed and pipep not set to NULL! */
259    if (pipe_lock())
260      rtems_fatal_error_occurred(0xdeadbeef);
261
262    /* WARN pipe not released! */
263    if (!PIPE_LOCK(pipe))
264      rtems_fatal_error_occurred(0xdeadbeef);
265  #endif
266
267  mode = LIBIO_ACCMODE(iop);
268  if (mode & LIBIO_FLAGS_READ)
269     pipe->Readers --;
270  if (mode & LIBIO_FLAGS_WRITE)
271     pipe->Writers --;
272
273  PIPE_UNLOCK(pipe);
274
275  if (pipe->Readers == 0 && pipe->Writers == 0) {
276#if 0
277    /* To delete an anonymous pipe file when all users closed it */
278    if (pipe->Anonymous)
279      delfile = TRUE;
280#endif
281    pipe_free(pipe);
282    *pipep = NULL;
283  }
284  else if (pipe->Readers == 0 && mode != LIBIO_FLAGS_WRITE)
285    /* Notify waiting Writers that all their partners left */
286    PIPE_WAKEUPWRITERS(pipe);
287  else if (pipe->Writers == 0 && mode != LIBIO_FLAGS_READ)
288    PIPE_WAKEUPREADERS(pipe);
289
290  pipe_unlock();
291
292#if 0
293  if (! delfile)
294    return;
295  if (iop->pathinfo.ops->unlink_h == NULL)
296    return;
297
298  /* This is safe for IMFS, but how about other FSes? */
299  iop->flags &= ~LIBIO_FLAGS_OPEN;
300  if(iop->pathinfo.ops->unlink_h(&iop->pathinfo))
301    return;
302#endif
303
304}
305
306int fifo_open(
307  pipe_control_t **pipep,
308  rtems_libio_t *iop
309)
310{
311  pipe_control_t *pipe;
312  unsigned int prevCounter;
313  int err;
314
315  err = pipe_new(pipep);
316  if (err)
317    return err;
318  pipe = *pipep;
319
320  switch (LIBIO_ACCMODE(iop)) {
321    case LIBIO_FLAGS_READ:
322      pipe->readerCounter ++;
323      if (pipe->Readers ++ == 0)
324        PIPE_WAKEUPWRITERS(pipe);
325
326      if (pipe->Writers == 0) {
327        /* Not an error */
328        if (LIBIO_NODELAY(iop))
329          break;
330
331        prevCounter = pipe->writerCounter;
332        err = -EINTR;
333        /* Wait until a writer opens the pipe */
334        do {
335          PIPE_UNLOCK(pipe);
336          if (! PIPE_READWAIT(pipe))
337            goto out_error;
338          if (! PIPE_LOCK(pipe))
339            goto out_error;
340        } while (prevCounter == pipe->writerCounter);
341      }
342      break;
343
344    case LIBIO_FLAGS_WRITE:
345      pipe->writerCounter ++;
346
347      if (pipe->Writers ++ == 0)
348        PIPE_WAKEUPREADERS(pipe);
349
350      if (pipe->Readers == 0 && LIBIO_NODELAY(iop)) {
351        PIPE_UNLOCK(pipe);
352        err = -ENXIO;
353        goto out_error;
354      }
355
356      if (pipe->Readers == 0) {
357        prevCounter = pipe->readerCounter;
358        err = -EINTR;
359        do {
360          PIPE_UNLOCK(pipe);
361          if (! PIPE_WRITEWAIT(pipe))
362            goto out_error;
363          if (! PIPE_LOCK(pipe))
364            goto out_error;
365        } while (prevCounter == pipe->readerCounter);
366      }
367      break;
368
369    case LIBIO_FLAGS_READ_WRITE:
370      pipe->readerCounter ++;
371      if (pipe->Readers ++ == 0)
372        PIPE_WAKEUPWRITERS(pipe);
373      pipe->writerCounter ++;
374      if (pipe->Writers ++ == 0)
375        PIPE_WAKEUPREADERS(pipe);
376      break;
377  }
378
379  PIPE_UNLOCK(pipe);
380  return 0;
381
382out_error:
383  pipe_release(pipep, iop);
384  return err;
385}
386
387ssize_t pipe_read(
388  pipe_control_t *pipe,
389  void           *buffer,
390  size_t          count,
391  rtems_libio_t  *iop
392)
393{
394  int chunk, chunk1, read = 0, ret = 0;
395
396  if (! PIPE_LOCK(pipe))
397    return -EINTR;
398
399  while (read < count) {
400    while (PIPE_EMPTY(pipe)) {
401      /* Not an error */
402      if (pipe->Writers == 0)
403        goto out_locked;
404
405      if (LIBIO_NODELAY(iop)) {
406        ret = -EAGAIN;
407        goto out_locked;
408      }
409
410      /* Wait until pipe is no more empty or no writer exists */
411      pipe->waitingReaders ++;
412      PIPE_UNLOCK(pipe);
413      if (! PIPE_READWAIT(pipe))
414        ret = -EINTR;
415      if (! PIPE_LOCK(pipe)) {
416        /* WARN waitingReaders not restored! */
417        ret = -EINTR;
418        goto out_nolock;
419      }
420      pipe->waitingReaders --;
421      if (ret != 0)
422        goto out_locked;
423    }
424
425    /* Read chunk bytes */
426    chunk = MIN(count - read,  pipe->Length);
427    chunk1 = pipe->Size - pipe->Start;
428    if (chunk > chunk1) {
429      memcpy(buffer + read, pipe->Buffer + pipe->Start, chunk1);
430      memcpy(buffer + read + chunk1, pipe->Buffer, chunk - chunk1);
431    }
432    else
433      memcpy(buffer + read, pipe->Buffer + pipe->Start, chunk);
434
435    pipe->Start += chunk;
436    pipe->Start %= pipe->Size;
437    pipe->Length -= chunk;
438    /* For buffering optimization */
439    if (PIPE_EMPTY(pipe))
440      pipe->Start = 0;
441
442    if (pipe->waitingWriters > 0)
443      PIPE_WAKEUPWRITERS(pipe);
444    read += chunk;
445  }
446
447out_locked:
448  PIPE_UNLOCK(pipe);
449
450out_nolock:
451  if (read > 0)
452    return read;
453  return ret;
454}
455
456ssize_t pipe_write(
457  pipe_control_t *pipe,
458  const void     *buffer,
459  size_t          count,
460  rtems_libio_t  *iop
461)
462{
463  int chunk, chunk1, written = 0, ret = 0;
464
465  /* Write nothing */
466  if (count == 0)
467    return 0;
468
469  if (! PIPE_LOCK(pipe))
470    return -EINTR;
471
472  if (pipe->Readers == 0) {
473    ret = -EPIPE;
474    goto out_locked;
475  }
476
477  /* Write of PIPE_BUF bytes or less shall not be interleaved */
478  chunk = count <= pipe->Size ? count : 1;
479
480  while (written < count) {
481    while (PIPE_SPACE(pipe) < chunk) {
482      if (LIBIO_NODELAY(iop)) {
483        ret = -EAGAIN;
484        goto out_locked;
485      }
486
487      /* Wait until there is chunk bytes space or no reader exists */
488      pipe->waitingWriters ++;
489      PIPE_UNLOCK(pipe);
490      if (! PIPE_WRITEWAIT(pipe))
491        ret = -EINTR;
492      if (! PIPE_LOCK(pipe)) {
493        /* WARN waitingWriters not restored! */
494        ret = -EINTR;
495        goto out_nolock;
496      }
497      pipe->waitingWriters --;
498      if (ret != 0)
499        goto out_locked;
500
501      if (pipe->Readers == 0) {
502        ret = -EPIPE;
503        goto out_locked;
504      }
505    }
506
507    chunk = MIN(count - written, PIPE_SPACE(pipe));
508    chunk1 = pipe->Size - PIPE_WSTART(pipe);
509    if (chunk > chunk1) {
510      memcpy(pipe->Buffer + PIPE_WSTART(pipe), buffer + written, chunk1);
511      memcpy(pipe->Buffer, buffer + written + chunk1, chunk - chunk1);
512    }
513    else
514      memcpy(pipe->Buffer + PIPE_WSTART(pipe), buffer + written, chunk);
515
516    pipe->Length += chunk;
517    if (pipe->waitingReaders > 0)
518      PIPE_WAKEUPREADERS(pipe);
519    written += chunk;
520    /* Write of more than PIPE_BUF bytes can be interleaved */
521    chunk = 1;
522  }
523
524out_locked:
525  PIPE_UNLOCK(pipe);
526
527out_nolock:
528#ifdef RTEMS_POSIX_API
529  /* Signal SIGPIPE */
530  if (ret == -EPIPE)
531    kill(getpid(), SIGPIPE);
532#endif
533
534  if (written > 0)
535    return written;
536  return ret;
537}
538
539int pipe_ioctl(
540  pipe_control_t  *pipe,
541  ioctl_command_t  cmd,
542  void            *buffer,
543  rtems_libio_t   *iop
544)
545{
546  if (cmd == FIONREAD) {
547    if (buffer == NULL)
548      return -EFAULT;
549
550    if (! PIPE_LOCK(pipe))
551      return -EINTR;
552
553    /* Return length of pipe */
554    *(unsigned int *)buffer = pipe->Length;
555    PIPE_UNLOCK(pipe);
556    return 0;
557  }
558
559  return -EINVAL;
560}
Note: See TracBrowser for help on using the repository browser.