source: rtems/cpukit/libfs/src/pipe/fifo.c @ c75aa86

5
Last change on this file since c75aa86 was c75aa86, checked in by Sebastian Huber <sebastian.huber@…>, on 06/06/16 at 10:51:59

pipe: Use proper semaphore attr for mutex

Close #2728.

  • Property mode set to 100644
File size: 11.2 KB
Line 
/**
 * @file
 *
 * @brief FIFO/Pipe Support
 * @ingroup FIFO_PIPE
 */
7
/*
 * Author: Wei Shen <cquark@gmail.com>
 *
 * The license and distribution terms for this file may be
 * found in the file LICENSE in this distribution or at
 * http://www.rtems.org/license/LICENSE.
 */
15
16
17#if HAVE_CONFIG_H
18#include "config.h"
19#endif
20
21#include <sys/param.h>
22#include <errno.h>
23#include <stdlib.h>
24#include <string.h>
25
26#include <rtems.h>
27#include <rtems/libio_.h>
28#include <rtems/rtems/barrierimpl.h>
29#include <rtems/score/statesimpl.h>
30
31#include "pipe.h"
32
33#define LIBIO_ACCMODE(_iop) ((_iop)->flags & LIBIO_FLAGS_READ_WRITE)
34#define LIBIO_NODELAY(_iop) ((_iop)->flags & LIBIO_FLAGS_NO_DELAY)
35
36static rtems_id pipe_semaphore = RTEMS_ID_NONE;
37
38
/*
 * Buffer bookkeeping helpers.  Each takes a pointer to an object with the
 * fields Start, Length and Size.  The argument is parenthesized so that any
 * pointer expression (for example "p + 1" or "&obj") can be passed safely.
 *
 *   PIPE_EMPTY  - true when no bytes are stored (Length == 0)
 *   PIPE_FULL   - true when Length has reached Size
 *   PIPE_SPACE  - number of bytes still free (Size - Length)
 *   PIPE_WSTART - index at which the next write begins, wrapped at Size
 */
#define PIPE_EMPTY(_pipe)  ((_pipe)->Length == 0)
#define PIPE_FULL(_pipe)   ((_pipe)->Length == (_pipe)->Size)
#define PIPE_SPACE(_pipe)  ((_pipe)->Size - (_pipe)->Length)
#define PIPE_WSTART(_pipe) \
  (((_pipe)->Start + (_pipe)->Length) % (_pipe)->Size)
43
/*
 * Per-pipe synchronization helpers.  Every macro takes a pointer to the pipe
 * control structure; the argument is parenthesized so that arbitrary pointer
 * expressions can be passed.
 */

/* Obtain the pipe's semaphore, waiting without timeout; true on success. */
#define PIPE_LOCK(_pipe)  \
  ( rtems_semaphore_obtain((_pipe)->Semaphore, RTEMS_WAIT, RTEMS_NO_TIMEOUT)  \
   == RTEMS_SUCCESSFUL )

/* Release the pipe's semaphore; evaluates to the RTEMS status code. */
#define PIPE_UNLOCK(_pipe)  rtems_semaphore_release((_pipe)->Semaphore)

/* Wait at the pipe's read barrier without timeout; true on success. */
#define PIPE_READWAIT(_pipe)  \
  ( rtems_barrier_wait((_pipe)->readBarrier, RTEMS_NO_TIMEOUT)  \
   == RTEMS_SUCCESSFUL)

/* Wait at the pipe's write barrier without timeout; true on success. */
#define PIPE_WRITEWAIT(_pipe)  \
  ( rtems_barrier_wait((_pipe)->writeBarrier, RTEMS_NO_TIMEOUT)  \
   == RTEMS_SUCCESSFUL)

/*
 * Release the read barrier.  The released-task count is discarded; the
 * scratch variable has a distinctive name so it cannot capture an identifier
 * used in the macro argument.
 */
#define PIPE_WAKEUPREADERS(_pipe) \
  do { \
    uint32_t pipe_released_; \
    rtems_barrier_release((_pipe)->readBarrier, &pipe_released_); \
  } while (0)

/* Release the write barrier; the released-task count is discarded. */
#define PIPE_WAKEUPWRITERS(_pipe) \
  do { \
    uint32_t pipe_released_; \
    rtems_barrier_release((_pipe)->writeBarrier, &pipe_released_); \
  } while (0)
63
64/*
65 * Alloc pipe control structure, buffer, and resources.
66 * Called with pipe_semaphore held.
67 */
68static int pipe_alloc(
69  pipe_control_t **pipep
70)
71{
72  static char c = 'a';
73  pipe_control_t *pipe;
74  int err = -ENOMEM;
75
76  pipe = malloc(sizeof(pipe_control_t));
77  if (pipe == NULL)
78    return err;
79  memset(pipe, 0, sizeof(pipe_control_t));
80
81  pipe->Size = PIPE_BUF;
82  pipe->Buffer = malloc(pipe->Size);
83  if (! pipe->Buffer)
84    goto err_buf;
85
86  err = -ENOMEM;
87
88  if (rtems_barrier_create(
89        rtems_build_name ('P', 'I', 'r', c),
90        RTEMS_BARRIER_MANUAL_RELEASE, 0,
91        &pipe->readBarrier) != RTEMS_SUCCESSFUL)
92    goto err_rbar;
93  if (rtems_barrier_create(
94        rtems_build_name ('P', 'I', 'w', c),
95        RTEMS_BARRIER_MANUAL_RELEASE, 0,
96        &pipe->writeBarrier) != RTEMS_SUCCESSFUL)
97    goto err_wbar;
98  if (rtems_semaphore_create(
99        rtems_build_name ('P', 'I', 's', c), 1,
100        RTEMS_BINARY_SEMAPHORE | RTEMS_PRIORITY | RTEMS_INHERIT_PRIORITY,
101        0, &pipe->Semaphore) != RTEMS_SUCCESSFUL)
102    goto err_sem;
103
104  *pipep = pipe;
105  if (c ++ == 'z')
106    c = 'a';
107  return 0;
108
109err_sem:
110  rtems_barrier_delete(pipe->writeBarrier);
111err_wbar:
112  rtems_barrier_delete(pipe->readBarrier);
113err_rbar:
114  free(pipe->Buffer);
115err_buf:
116  free(pipe);
117  return err;
118}
119
120/* Called with pipe_semaphore held. */
121static inline void pipe_free(
122  pipe_control_t *pipe
123)
124{
125  rtems_barrier_delete(pipe->readBarrier);
126  rtems_barrier_delete(pipe->writeBarrier);
127  rtems_semaphore_delete(pipe->Semaphore);
128  free(pipe->Buffer);
129  free(pipe);
130}
131
132static int pipe_lock(void)
133{
134  rtems_status_code sc = RTEMS_SUCCESSFUL;
135
136  if (pipe_semaphore == RTEMS_ID_NONE) {
137    rtems_libio_lock();
138
139    if (pipe_semaphore == RTEMS_ID_NONE) {
140      sc = rtems_semaphore_create(
141        rtems_build_name('P', 'I', 'P', 'E'),
142        1,
143        RTEMS_BINARY_SEMAPHORE | RTEMS_INHERIT_PRIORITY | RTEMS_PRIORITY,
144        RTEMS_NO_PRIORITY,
145        &pipe_semaphore
146      );
147    }
148
149    rtems_libio_unlock();
150  }
151
152  if (sc == RTEMS_SUCCESSFUL) {
153    sc = rtems_semaphore_obtain(pipe_semaphore, RTEMS_WAIT, RTEMS_NO_TIMEOUT);
154  }
155
156  if (sc == RTEMS_SUCCESSFUL) {
157    return 0;
158  } else {
159    return -ENOMEM;
160  }
161}
162
163static void pipe_unlock(void)
164{
165#ifdef RTEMS_DEBUG
166  rtems_status_code sc = RTEMS_SUCCESSFUL;
167
168  sc =
169#endif
170   rtems_semaphore_release(pipe_semaphore);
171  #ifdef RTEMS_DEBUG
172    if (sc != RTEMS_SUCCESSFUL) {
173      rtems_fatal_error_occurred(0xdeadbeef);
174    }
175  #endif
176}
177
178/*
179 * If called with *pipep = NULL, pipe_new will call pipe_alloc to allocate a
180 * pipe control structure and set *pipep to its address.
181 * pipe is locked, when pipe_new returns with no error.
182 */
183static int pipe_new(
184  pipe_control_t **pipep
185)
186{
187  pipe_control_t *pipe;
188  int err = 0;
189
190  _Assert( pipep );
191  err = pipe_lock();
192  if (err)
193    return err;
194
195  pipe = *pipep;
196  if (pipe == NULL) {
197    err = pipe_alloc(&pipe);
198    if (err)
199      goto out;
200  }
201
202  if (!PIPE_LOCK(pipe))
203    err = -EINTR;
204
205  if (*pipep == NULL) {
206    if (err)
207      pipe_free(pipe);
208    else
209      *pipep = pipe;
210  }
211
212out:
213  pipe_unlock();
214  return err;
215}
216
217void pipe_release(
218  pipe_control_t **pipep,
219  rtems_libio_t *iop
220)
221{
222  pipe_control_t *pipe = *pipep;
223  uint32_t mode;
224
225  #if defined(RTEMS_DEBUG)
226    /* WARN pipe not freed and pipep not set to NULL! */
227    if (pipe_lock())
228      rtems_fatal_error_occurred(0xdeadbeef);
229
230    /* WARN pipe not released! */
231    if (!PIPE_LOCK(pipe))
232      rtems_fatal_error_occurred(0xdeadbeef);
233  #endif
234
235  mode = LIBIO_ACCMODE(iop);
236  if (mode & LIBIO_FLAGS_READ)
237     pipe->Readers --;
238  if (mode & LIBIO_FLAGS_WRITE)
239     pipe->Writers --;
240
241  PIPE_UNLOCK(pipe);
242
243  if (pipe->Readers == 0 && pipe->Writers == 0) {
244#if 0
245    /* To delete an anonymous pipe file when all users closed it */
246    if (pipe->Anonymous)
247      delfile = TRUE;
248#endif
249    pipe_free(pipe);
250    *pipep = NULL;
251  }
252  else if (pipe->Readers == 0 && mode != LIBIO_FLAGS_WRITE)
253    /* Notify waiting Writers that all their partners left */
254    PIPE_WAKEUPWRITERS(pipe);
255  else if (pipe->Writers == 0 && mode != LIBIO_FLAGS_READ)
256    PIPE_WAKEUPREADERS(pipe);
257
258  pipe_unlock();
259
260#if 0
261  if (! delfile)
262    return;
263  if (iop->pathinfo.ops->unlink_h == NULL)
264    return;
265
266  /* This is safe for IMFS, but how about other FSes? */
267  iop->flags &= ~LIBIO_FLAGS_OPEN;
268  if(iop->pathinfo.ops->unlink_h(&iop->pathinfo))
269    return;
270#endif
271
272}
273
274int fifo_open(
275  pipe_control_t **pipep,
276  rtems_libio_t *iop
277)
278{
279  pipe_control_t *pipe;
280  unsigned int prevCounter;
281  int err;
282
283  err = pipe_new(pipep);
284  if (err)
285    return err;
286  pipe = *pipep;
287
288  switch (LIBIO_ACCMODE(iop)) {
289    case LIBIO_FLAGS_READ:
290      pipe->readerCounter ++;
291      if (pipe->Readers ++ == 0)
292        PIPE_WAKEUPWRITERS(pipe);
293
294      if (pipe->Writers == 0) {
295        /* Not an error */
296        if (LIBIO_NODELAY(iop))
297          break;
298
299        prevCounter = pipe->writerCounter;
300        err = -EINTR;
301        /* Wait until a writer opens the pipe */
302        do {
303          PIPE_UNLOCK(pipe);
304          if (! PIPE_READWAIT(pipe))
305            goto out_error;
306          if (! PIPE_LOCK(pipe))
307            goto out_error;
308        } while (prevCounter == pipe->writerCounter);
309      }
310      break;
311
312    case LIBIO_FLAGS_WRITE:
313      pipe->writerCounter ++;
314
315      if (pipe->Writers ++ == 0)
316        PIPE_WAKEUPREADERS(pipe);
317
318      if (pipe->Readers == 0 && LIBIO_NODELAY(iop)) {
319        PIPE_UNLOCK(pipe);
320        err = -ENXIO;
321        goto out_error;
322      }
323
324      if (pipe->Readers == 0) {
325        prevCounter = pipe->readerCounter;
326        err = -EINTR;
327        do {
328          PIPE_UNLOCK(pipe);
329          if (! PIPE_WRITEWAIT(pipe))
330            goto out_error;
331          if (! PIPE_LOCK(pipe))
332            goto out_error;
333        } while (prevCounter == pipe->readerCounter);
334      }
335      break;
336
337    case LIBIO_FLAGS_READ_WRITE:
338      pipe->readerCounter ++;
339      if (pipe->Readers ++ == 0)
340        PIPE_WAKEUPWRITERS(pipe);
341      pipe->writerCounter ++;
342      if (pipe->Writers ++ == 0)
343        PIPE_WAKEUPREADERS(pipe);
344      break;
345  }
346
347  PIPE_UNLOCK(pipe);
348  return 0;
349
350out_error:
351  pipe_release(pipep, iop);
352  return err;
353}
354
355ssize_t pipe_read(
356  pipe_control_t *pipe,
357  void           *buffer,
358  size_t          count,
359  rtems_libio_t  *iop
360)
361{
362  int chunk, chunk1, read = 0, ret = 0;
363
364  if (! PIPE_LOCK(pipe))
365    return -EINTR;
366
367  while (PIPE_EMPTY(pipe)) {
368    /* Not an error */
369    if (pipe->Writers == 0)
370      goto out_locked;
371
372    if (LIBIO_NODELAY(iop)) {
373      ret = -EAGAIN;
374      goto out_locked;
375    }
376
377    /* Wait until pipe is no more empty or no writer exists */
378    pipe->waitingReaders ++;
379    PIPE_UNLOCK(pipe);
380    if (! PIPE_READWAIT(pipe))
381      ret = -EINTR;
382    if (! PIPE_LOCK(pipe)) {
383      /* WARN waitingReaders not restored! */
384      ret = -EINTR;
385      goto out_nolock;
386    }
387    pipe->waitingReaders --;
388    if (ret != 0)
389      goto out_locked;
390  }
391
392  /* Read chunk bytes */
393  chunk = MIN(count - read,  pipe->Length);
394  chunk1 = pipe->Size - pipe->Start;
395  if (chunk > chunk1) {
396    memcpy(buffer + read, pipe->Buffer + pipe->Start, chunk1);
397    memcpy(buffer + read + chunk1, pipe->Buffer, chunk - chunk1);
398  }
399  else
400    memcpy(buffer + read, pipe->Buffer + pipe->Start, chunk);
401
402  pipe->Start += chunk;
403  pipe->Start %= pipe->Size;
404  pipe->Length -= chunk;
405  /* For buffering optimization */
406  if (PIPE_EMPTY(pipe))
407    pipe->Start = 0;
408
409  if (pipe->waitingWriters > 0)
410    PIPE_WAKEUPWRITERS(pipe);
411  read += chunk;
412
413out_locked:
414  PIPE_UNLOCK(pipe);
415
416out_nolock:
417  if (read > 0)
418    return read;
419  return ret;
420}
421
422ssize_t pipe_write(
423  pipe_control_t *pipe,
424  const void     *buffer,
425  size_t          count,
426  rtems_libio_t  *iop
427)
428{
429  int chunk, chunk1, written = 0, ret = 0;
430
431  /* Write nothing */
432  if (count == 0)
433    return 0;
434
435  if (! PIPE_LOCK(pipe))
436    return -EINTR;
437
438  if (pipe->Readers == 0) {
439    ret = -EPIPE;
440    goto out_locked;
441  }
442
443  /* Write of PIPE_BUF bytes or less shall not be interleaved */
444  chunk = count <= pipe->Size ? count : 1;
445
446  while (written < count) {
447    while (PIPE_SPACE(pipe) < chunk) {
448      if (LIBIO_NODELAY(iop)) {
449        ret = -EAGAIN;
450        goto out_locked;
451      }
452
453      /* Wait until there is chunk bytes space or no reader exists */
454      pipe->waitingWriters ++;
455      PIPE_UNLOCK(pipe);
456      if (! PIPE_WRITEWAIT(pipe))
457        ret = -EINTR;
458      if (! PIPE_LOCK(pipe)) {
459        /* WARN waitingWriters not restored! */
460        ret = -EINTR;
461        goto out_nolock;
462      }
463      pipe->waitingWriters --;
464      if (ret != 0)
465        goto out_locked;
466
467      if (pipe->Readers == 0) {
468        ret = -EPIPE;
469        goto out_locked;
470      }
471    }
472
473    chunk = MIN(count - written, PIPE_SPACE(pipe));
474    chunk1 = pipe->Size - PIPE_WSTART(pipe);
475    if (chunk > chunk1) {
476      memcpy(pipe->Buffer + PIPE_WSTART(pipe), buffer + written, chunk1);
477      memcpy(pipe->Buffer, buffer + written + chunk1, chunk - chunk1);
478    }
479    else
480      memcpy(pipe->Buffer + PIPE_WSTART(pipe), buffer + written, chunk);
481
482    pipe->Length += chunk;
483    if (pipe->waitingReaders > 0)
484      PIPE_WAKEUPREADERS(pipe);
485    written += chunk;
486    /* Write of more than PIPE_BUF bytes can be interleaved */
487    chunk = 1;
488  }
489
490out_locked:
491  PIPE_UNLOCK(pipe);
492
493out_nolock:
494#ifdef RTEMS_POSIX_API
495  /* Signal SIGPIPE */
496  if (ret == -EPIPE)
497    kill(getpid(), SIGPIPE);
498#endif
499
500  if (written > 0)
501    return written;
502  return ret;
503}
504
505int pipe_ioctl(
506  pipe_control_t  *pipe,
507  ioctl_command_t  cmd,
508  void            *buffer,
509  rtems_libio_t   *iop
510)
511{
512  if (cmd == FIONREAD) {
513    if (buffer == NULL)
514      return -EFAULT;
515
516    if (! PIPE_LOCK(pipe))
517      return -EINTR;
518
519    /* Return length of pipe */
520    *(unsigned int *)buffer = pipe->Length;
521    PIPE_UNLOCK(pipe);
522    return 0;
523  }
524
525  return -EINVAL;
526}
Note: See TracBrowser for help on using the repository browser.