source: rtems/cpukit/libfs/src/pipe/fifo.c @ 0ec9bbc

5
Last change on this file since 0ec9bbc was 8ddd92d, checked in by Sebastian Huber <sebastian.huber@…>, on 12/14/17 at 05:12:59

pipe: Use self-contained mutex

Update #2843.

  • Property mode set to 100644
File size: 9.6 KB
Line 
1/**
2 * @file
3 *
4 * @brief FIFO/Pipe Support
5 * @ingroup FIFO_PIPE
6 */
7
8/*
9 * Author: Wei Shen <cquark@gmail.com>
10 *
11 * The license and distribution terms for this file may be
12 * found in the file LICENSE in this distribution or at
13 * http://www.rtems.org/license/LICENSE.
14 */
15
16
17#if HAVE_CONFIG_H
18#include "config.h"
19#endif
20
21#include <sys/param.h>
22#include <sys/filio.h>
23#include <errno.h>
24#include <stdlib.h>
25#include <string.h>
26
27#include <rtems.h>
28#include <rtems/libio_.h>
29#include <rtems/pipe.h>
30#include <rtems/rtems/barrierimpl.h>
31#include <rtems/score/statesimpl.h>
32
33#define LIBIO_ACCMODE(_iop) (rtems_libio_iop_flags(_iop) & LIBIO_FLAGS_READ_WRITE)
34#define LIBIO_NODELAY(_iop) rtems_libio_iop_is_no_delay(_iop)
35
36static rtems_mutex pipe_mutex = RTEMS_MUTEX_INITIALIZER("Pipes");
37
38
/*
 * Ring-buffer bookkeeping helpers.  Each takes a pointer expression to a
 * structure with Start, Length and Size members.  The argument is
 * parenthesised on every use so that expressions such as &obj or *pp expand
 * correctly.
 */
#define PIPE_EMPTY(_pipe) ((_pipe)->Length == 0)
#define PIPE_FULL(_pipe)  ((_pipe)->Length == (_pipe)->Size)
#define PIPE_SPACE(_pipe) ((_pipe)->Size - (_pipe)->Length)
/* Offset in the buffer at which the next written byte is stored. */
#define PIPE_WSTART(_pipe) \
  (((_pipe)->Start + (_pipe)->Length) % (_pipe)->Size)

/* Acquire / release the mutex embedded in the pipe. */
#define PIPE_LOCK(_pipe) rtems_mutex_lock(&(_pipe)->Mutex)

#define PIPE_UNLOCK(_pipe) rtems_mutex_unlock(&(_pipe)->Mutex)

/*
 * Wait on the pipe's read barrier without timeout.  Evaluates to true when
 * rtems_barrier_wait() returned RTEMS_SUCCESSFUL.
 */
#define PIPE_READWAIT(_pipe)  \
  ( rtems_barrier_wait((_pipe)->readBarrier, RTEMS_NO_TIMEOUT)  \
   == RTEMS_SUCCESSFUL)

/* Same as PIPE_READWAIT, but on the pipe's write barrier. */
#define PIPE_WRITEWAIT(_pipe)  \
  ( rtems_barrier_wait((_pipe)->writeBarrier, RTEMS_NO_TIMEOUT)  \
   == RTEMS_SUCCESSFUL)

/*
 * Release the pipe's read barrier.  The released-task count reported by
 * rtems_barrier_release() is discarded.
 */
#define PIPE_WAKEUPREADERS(_pipe) \
  do { \
    uint32_t pipe_released_; \
    rtems_barrier_release((_pipe)->readBarrier, &pipe_released_); \
  } while (0)

/* Release the pipe's write barrier; the released-task count is discarded. */
#define PIPE_WAKEUPWRITERS(_pipe) \
  do { \
    uint32_t pipe_released_; \
    rtems_barrier_release((_pipe)->writeBarrier, &pipe_released_); \
  } while (0)
61
62/*
63 * Alloc pipe control structure, buffer, and resources.
64 * Called with pipe_semaphore held.
65 */
66static int pipe_alloc(
67  pipe_control_t **pipep
68)
69{
70  static char c = 'a';
71  pipe_control_t *pipe;
72  int err = -ENOMEM;
73
74  pipe = malloc(sizeof(pipe_control_t));
75  if (pipe == NULL)
76    return err;
77  memset(pipe, 0, sizeof(pipe_control_t));
78
79  pipe->Size = PIPE_BUF;
80  pipe->Buffer = malloc(pipe->Size);
81  if (! pipe->Buffer)
82    goto err_buf;
83
84  err = -ENOMEM;
85
86  if (rtems_barrier_create(
87        rtems_build_name ('P', 'I', 'r', c),
88        RTEMS_BARRIER_MANUAL_RELEASE, 0,
89        &pipe->readBarrier) != RTEMS_SUCCESSFUL)
90    goto err_rbar;
91  if (rtems_barrier_create(
92        rtems_build_name ('P', 'I', 'w', c),
93        RTEMS_BARRIER_MANUAL_RELEASE, 0,
94        &pipe->writeBarrier) != RTEMS_SUCCESSFUL)
95    goto err_wbar;
96  rtems_mutex_init(&pipe->Mutex, "Pipe");
97
98  *pipep = pipe;
99  if (c ++ == 'z')
100    c = 'a';
101  return 0;
102
103  rtems_barrier_delete(pipe->writeBarrier);
104err_wbar:
105  rtems_barrier_delete(pipe->readBarrier);
106err_rbar:
107  free(pipe->Buffer);
108err_buf:
109  free(pipe);
110  return err;
111}
112
113/* Called with pipe_semaphore held. */
114static inline void pipe_free(
115  pipe_control_t *pipe
116)
117{
118  rtems_barrier_delete(pipe->readBarrier);
119  rtems_barrier_delete(pipe->writeBarrier);
120  rtems_mutex_destroy(&pipe->Mutex);
121  free(pipe->Buffer);
122  free(pipe);
123}
124
125static void pipe_lock(void)
126{
127  rtems_mutex_lock(&pipe_mutex);
128}
129
130static void pipe_unlock(void)
131{
132  rtems_mutex_unlock(&pipe_mutex);
133}
134
135/*
136 * If called with *pipep = NULL, pipe_new will call pipe_alloc to allocate a
137 * pipe control structure and set *pipep to its address.
138 * pipe is locked, when pipe_new returns with no error.
139 */
140static int pipe_new(
141  pipe_control_t **pipep
142)
143{
144  pipe_control_t *pipe;
145  int err = 0;
146
147  _Assert( pipep );
148  pipe_lock();
149
150  pipe = *pipep;
151  if (pipe == NULL) {
152    err = pipe_alloc(&pipe);
153    if (err)
154      goto out;
155  }
156
157  PIPE_LOCK(pipe);
158
159  if (*pipep == NULL) {
160    if (err)
161      pipe_free(pipe);
162    else
163      *pipep = pipe;
164  }
165
166out:
167  pipe_unlock();
168  return err;
169}
170
171void pipe_release(
172  pipe_control_t **pipep,
173  rtems_libio_t *iop
174)
175{
176  pipe_control_t *pipe = *pipep;
177  uint32_t mode;
178
179  pipe_lock();
180  PIPE_LOCK(pipe);
181
182  mode = LIBIO_ACCMODE(iop);
183  if (mode & LIBIO_FLAGS_READ)
184     pipe->Readers --;
185  if (mode & LIBIO_FLAGS_WRITE)
186     pipe->Writers --;
187
188  PIPE_UNLOCK(pipe);
189
190  if (pipe->Readers == 0 && pipe->Writers == 0) {
191#if 0
192    /* To delete an anonymous pipe file when all users closed it */
193    if (pipe->Anonymous)
194      delfile = TRUE;
195#endif
196    pipe_free(pipe);
197    *pipep = NULL;
198  }
199  else if (pipe->Readers == 0 && mode != LIBIO_FLAGS_WRITE)
200    /* Notify waiting Writers that all their partners left */
201    PIPE_WAKEUPWRITERS(pipe);
202  else if (pipe->Writers == 0 && mode != LIBIO_FLAGS_READ)
203    PIPE_WAKEUPREADERS(pipe);
204
205  pipe_unlock();
206
207#if 0
208  if (! delfile)
209    return;
210  if (iop->pathinfo.ops->unlink_h == NULL)
211    return;
212
213  /* This is safe for IMFS, but how about other FSes? */
214  rtems_libio_iop_flags_clear( iop, LIBIO_FLAGS_OPEN );
215  if(iop->pathinfo.ops->unlink_h(&iop->pathinfo))
216    return;
217#endif
218
219}
220
221int fifo_open(
222  pipe_control_t **pipep,
223  rtems_libio_t *iop
224)
225{
226  pipe_control_t *pipe;
227  unsigned int prevCounter;
228  int err;
229
230  err = pipe_new(pipep);
231  if (err)
232    return err;
233  pipe = *pipep;
234
235  switch (LIBIO_ACCMODE(iop)) {
236    case LIBIO_FLAGS_READ:
237      pipe->readerCounter ++;
238      if (pipe->Readers ++ == 0)
239        PIPE_WAKEUPWRITERS(pipe);
240
241      if (pipe->Writers == 0) {
242        /* Not an error */
243        if (LIBIO_NODELAY(iop))
244          break;
245
246        prevCounter = pipe->writerCounter;
247        err = -EINTR;
248        /* Wait until a writer opens the pipe */
249        do {
250          PIPE_UNLOCK(pipe);
251          if (! PIPE_READWAIT(pipe))
252            goto out_error;
253          PIPE_LOCK(pipe);
254        } while (prevCounter == pipe->writerCounter);
255      }
256      break;
257
258    case LIBIO_FLAGS_WRITE:
259      pipe->writerCounter ++;
260
261      if (pipe->Writers ++ == 0)
262        PIPE_WAKEUPREADERS(pipe);
263
264      if (pipe->Readers == 0 && LIBIO_NODELAY(iop)) {
265        PIPE_UNLOCK(pipe);
266        err = -ENXIO;
267        goto out_error;
268      }
269
270      if (pipe->Readers == 0) {
271        prevCounter = pipe->readerCounter;
272        err = -EINTR;
273        do {
274          PIPE_UNLOCK(pipe);
275          if (! PIPE_WRITEWAIT(pipe))
276            goto out_error;
277          PIPE_LOCK(pipe);
278        } while (prevCounter == pipe->readerCounter);
279      }
280      break;
281
282    case LIBIO_FLAGS_READ_WRITE:
283      pipe->readerCounter ++;
284      if (pipe->Readers ++ == 0)
285        PIPE_WAKEUPWRITERS(pipe);
286      pipe->writerCounter ++;
287      if (pipe->Writers ++ == 0)
288        PIPE_WAKEUPREADERS(pipe);
289      break;
290  }
291
292  PIPE_UNLOCK(pipe);
293  return 0;
294
295out_error:
296  pipe_release(pipep, iop);
297  return err;
298}
299
300ssize_t pipe_read(
301  pipe_control_t *pipe,
302  void           *buffer,
303  size_t          count,
304  rtems_libio_t  *iop
305)
306{
307  int chunk, chunk1, read = 0, ret = 0;
308
309  PIPE_LOCK(pipe);
310
311  while (PIPE_EMPTY(pipe)) {
312    /* Not an error */
313    if (pipe->Writers == 0)
314      goto out_locked;
315
316    if (LIBIO_NODELAY(iop)) {
317      ret = -EAGAIN;
318      goto out_locked;
319    }
320
321    /* Wait until pipe is no more empty or no writer exists */
322    pipe->waitingReaders ++;
323    PIPE_UNLOCK(pipe);
324    if (! PIPE_READWAIT(pipe))
325      ret = -EINTR;
326    PIPE_LOCK(pipe);
327    pipe->waitingReaders --;
328    if (ret != 0)
329      goto out_locked;
330  }
331
332  /* Read chunk bytes */
333  chunk = MIN(count - read,  pipe->Length);
334  chunk1 = pipe->Size - pipe->Start;
335  if (chunk > chunk1) {
336    memcpy(buffer + read, pipe->Buffer + pipe->Start, chunk1);
337    memcpy(buffer + read + chunk1, pipe->Buffer, chunk - chunk1);
338  }
339  else
340    memcpy(buffer + read, pipe->Buffer + pipe->Start, chunk);
341
342  pipe->Start += chunk;
343  pipe->Start %= pipe->Size;
344  pipe->Length -= chunk;
345  /* For buffering optimization */
346  if (PIPE_EMPTY(pipe))
347    pipe->Start = 0;
348
349  if (pipe->waitingWriters > 0)
350    PIPE_WAKEUPWRITERS(pipe);
351  read += chunk;
352
353out_locked:
354  PIPE_UNLOCK(pipe);
355
356  if (read > 0)
357    return read;
358  return ret;
359}
360
361ssize_t pipe_write(
362  pipe_control_t *pipe,
363  const void     *buffer,
364  size_t          count,
365  rtems_libio_t  *iop
366)
367{
368  int chunk, chunk1, written = 0, ret = 0;
369
370  /* Write nothing */
371  if (count == 0)
372    return 0;
373
374  PIPE_LOCK(pipe);
375
376  if (pipe->Readers == 0) {
377    ret = -EPIPE;
378    goto out_locked;
379  }
380
381  /* Write of PIPE_BUF bytes or less shall not be interleaved */
382  chunk = count <= pipe->Size ? count : 1;
383
384  while (written < count) {
385    while (PIPE_SPACE(pipe) < chunk) {
386      if (LIBIO_NODELAY(iop)) {
387        ret = -EAGAIN;
388        goto out_locked;
389      }
390
391      /* Wait until there is chunk bytes space or no reader exists */
392      pipe->waitingWriters ++;
393      PIPE_UNLOCK(pipe);
394      if (! PIPE_WRITEWAIT(pipe))
395        ret = -EINTR;
396      PIPE_LOCK(pipe);
397      pipe->waitingWriters --;
398      if (ret != 0)
399        goto out_locked;
400
401      if (pipe->Readers == 0) {
402        ret = -EPIPE;
403        goto out_locked;
404      }
405    }
406
407    chunk = MIN(count - written, PIPE_SPACE(pipe));
408    chunk1 = pipe->Size - PIPE_WSTART(pipe);
409    if (chunk > chunk1) {
410      memcpy(pipe->Buffer + PIPE_WSTART(pipe), buffer + written, chunk1);
411      memcpy(pipe->Buffer, buffer + written + chunk1, chunk - chunk1);
412    }
413    else
414      memcpy(pipe->Buffer + PIPE_WSTART(pipe), buffer + written, chunk);
415
416    pipe->Length += chunk;
417    if (pipe->waitingReaders > 0)
418      PIPE_WAKEUPREADERS(pipe);
419    written += chunk;
420    /* Write of more than PIPE_BUF bytes can be interleaved */
421    chunk = 1;
422  }
423
424out_locked:
425  PIPE_UNLOCK(pipe);
426
427#ifdef RTEMS_POSIX_API
428  /* Signal SIGPIPE */
429  if (ret == -EPIPE)
430    kill(getpid(), SIGPIPE);
431#endif
432
433  if (written > 0)
434    return written;
435  return ret;
436}
437
438int pipe_ioctl(
439  pipe_control_t  *pipe,
440  ioctl_command_t  cmd,
441  void            *buffer,
442  rtems_libio_t   *iop
443)
444{
445  if (cmd == FIONREAD) {
446    if (buffer == NULL)
447      return -EFAULT;
448
449    PIPE_LOCK(pipe);
450
451    /* Return length of pipe */
452    *(unsigned int *)buffer = pipe->Length;
453    PIPE_UNLOCK(pipe);
454    return 0;
455  }
456
457  return -EINVAL;
458}
Note: See TracBrowser for help on using the repository browser.