source: rtems/cpukit/libfs/src/pipe/fifo.c @ b1b6dd71

5
Last change on this file since b1b6dd71 was b1b6dd71, checked in by Sebastian Huber <sebastian.huber@…>, on 12/11/19 at 15:45:37

pipe: Use condition variables

Use self-contained condition variables instead of Classic API barriers.
This simplifies the implementation and configuration.

Update #3840.

  • Property mode set to 100644
File size: 8.7 KB
Line 
1/**
2 * @file
3 *
4 * @brief FIFO/Pipe Support
5 * @ingroup FIFO_PIPE
6 */
7
8/*
9 * Author: Wei Shen <cquark@gmail.com>
10 *
11 * The license and distribution terms for this file may be
12 * found in the file LICENSE in this distribution or at
13 * http://www.rtems.org/license/LICENSE.
14 */
15
16
17#if HAVE_CONFIG_H
18#include "config.h"
19#endif
20
21#include <sys/param.h>
22#include <sys/filio.h>
23#include <errno.h>
24#include <stdlib.h>
25#include <string.h>
26
27#include <rtems.h>
28#include <rtems/libio_.h>
29#include <rtems/pipe.h>
30
31#define LIBIO_ACCMODE(_iop) (rtems_libio_iop_flags(_iop) & LIBIO_FLAGS_READ_WRITE)
32#define LIBIO_NODELAY(_iop) rtems_libio_iop_is_no_delay(_iop)
33
34static rtems_mutex pipe_mutex = RTEMS_MUTEX_INITIALIZER("Pipes");
35
36
/*
 * Ring buffer state helpers.  _pipe is any pointer expression whose target
 * has the members Start, Length and Size.  The argument is parenthesized at
 * every use so that expressions such as &object or base + i expand
 * correctly.  Note that _pipe is evaluated more than once; do not pass an
 * expression with side effects.
 */

/* True when no bytes are stored. */
#define PIPE_EMPTY(_pipe)  ((_pipe)->Length == 0)

/* True when the stored byte count equals the buffer capacity. */
#define PIPE_FULL(_pipe)   ((_pipe)->Length == (_pipe)->Size)

/* Number of bytes that can still be stored. */
#define PIPE_SPACE(_pipe)  ((_pipe)->Size - (_pipe)->Length)

/* Buffer index at which the next written byte is placed (wraps at Size). */
#define PIPE_WSTART(_pipe) \
  (((_pipe)->Start + (_pipe)->Length) % (_pipe)->Size)
41
/* Acquire the mutex embedded in the pipe control structure. */
#define PIPE_LOCK(_pipe) \
  rtems_mutex_lock(&(_pipe)->Mutex)

/* Release the mutex embedded in the pipe control structure. */
#define PIPE_UNLOCK(_pipe) \
  rtems_mutex_unlock(&(_pipe)->Mutex)

/*
 * Wait on the read condition variable of the pipe.  The pipe mutex is passed
 * to the wait call, so the caller is expected to hold it.
 */
#define PIPE_READWAIT(_pipe) \
  rtems_condition_variable_wait( \
    &(_pipe)->readBarrier, \
    &(_pipe)->Mutex \
  )

/*
 * Wait on the write condition variable of the pipe.  The pipe mutex is passed
 * to the wait call, so the caller is expected to hold it.
 */
#define PIPE_WRITEWAIT(_pipe) \
  rtems_condition_variable_wait( \
    &(_pipe)->writeBarrier, \
    &(_pipe)->Mutex \
  )

/* Broadcast on the read condition variable of the pipe. */
#define PIPE_WAKEUPREADERS(_pipe) \
  rtems_condition_variable_broadcast(&(_pipe)->readBarrier)

/* Broadcast on the write condition variable of the pipe. */
#define PIPE_WAKEUPWRITERS(_pipe) \
  rtems_condition_variable_broadcast(&(_pipe)->writeBarrier)
57
58/*
59 * Alloc pipe control structure, buffer, and resources.
60 * Called with pipe_semaphore held.
61 */
62static int pipe_alloc(
63  pipe_control_t **pipep
64)
65{
66  static char c = 'a';
67  pipe_control_t *pipe;
68  int err = -ENOMEM;
69
70  pipe = malloc(sizeof(pipe_control_t));
71  if (pipe == NULL)
72    return err;
73  memset(pipe, 0, sizeof(pipe_control_t));
74
75  pipe->Size = PIPE_BUF;
76  pipe->Buffer = malloc(pipe->Size);
77  if (pipe->Buffer == NULL) {
78    free(pipe);
79    return -ENOMEM;
80  }
81
82  rtems_condition_variable_init(&pipe->readBarrier, "Pipe Read");
83  rtems_condition_variable_init(&pipe->writeBarrier, "Pipe Write");
84  rtems_mutex_init(&pipe->Mutex, "Pipe");
85
86  *pipep = pipe;
87  if (c ++ == 'z')
88    c = 'a';
89  return 0;
90}
91
92/* Called with pipe_semaphore held. */
93static inline void pipe_free(
94  pipe_control_t *pipe
95)
96{
97  rtems_condition_variable_destroy(&pipe->readBarrier);
98  rtems_condition_variable_destroy(&pipe->writeBarrier);
99  rtems_mutex_destroy(&pipe->Mutex);
100  free(pipe->Buffer);
101  free(pipe);
102}
103
104static void pipe_lock(void)
105{
106  rtems_mutex_lock(&pipe_mutex);
107}
108
109static void pipe_unlock(void)
110{
111  rtems_mutex_unlock(&pipe_mutex);
112}
113
114/*
115 * If called with *pipep = NULL, pipe_new will call pipe_alloc to allocate a
116 * pipe control structure and set *pipep to its address.
117 * pipe is locked, when pipe_new returns with no error.
118 */
119static int pipe_new(
120  pipe_control_t **pipep
121)
122{
123  pipe_control_t *pipe;
124  int err = 0;
125
126  _Assert( pipep );
127  pipe_lock();
128
129  pipe = *pipep;
130  if (pipe == NULL) {
131    err = pipe_alloc(&pipe);
132    if (err) {
133      pipe_unlock();
134      return err;
135    }
136  }
137
138  PIPE_LOCK(pipe);
139
140  *pipep = pipe;
141  pipe_unlock();
142  return err;
143}
144
145void pipe_release(
146  pipe_control_t **pipep,
147  rtems_libio_t *iop
148)
149{
150  pipe_control_t *pipe = *pipep;
151  uint32_t mode;
152
153  pipe_lock();
154  PIPE_LOCK(pipe);
155
156  mode = LIBIO_ACCMODE(iop);
157  if (mode & LIBIO_FLAGS_READ)
158     pipe->Readers --;
159  if (mode & LIBIO_FLAGS_WRITE)
160     pipe->Writers --;
161
162  PIPE_UNLOCK(pipe);
163
164  if (pipe->Readers == 0 && pipe->Writers == 0) {
165#if 0
166    /* To delete an anonymous pipe file when all users closed it */
167    if (pipe->Anonymous)
168      delfile = TRUE;
169#endif
170    pipe_free(pipe);
171    *pipep = NULL;
172  }
173  else if (pipe->Readers == 0 && mode != LIBIO_FLAGS_WRITE)
174    /* Notify waiting Writers that all their partners left */
175    PIPE_WAKEUPWRITERS(pipe);
176  else if (pipe->Writers == 0 && mode != LIBIO_FLAGS_READ)
177    PIPE_WAKEUPREADERS(pipe);
178
179  pipe_unlock();
180
181#if 0
182  if (! delfile)
183    return;
184  if (iop->pathinfo.ops->unlink_h == NULL)
185    return;
186
187  /* This is safe for IMFS, but how about other FSes? */
188  rtems_libio_iop_flags_clear( iop, LIBIO_FLAGS_OPEN );
189  if(iop->pathinfo.ops->unlink_h(&iop->pathinfo))
190    return;
191#endif
192
193}
194
195int fifo_open(
196  pipe_control_t **pipep,
197  rtems_libio_t *iop
198)
199{
200  pipe_control_t *pipe;
201  unsigned int prevCounter;
202  int err;
203
204  err = pipe_new(pipep);
205  if (err)
206    return err;
207  pipe = *pipep;
208
209  switch (LIBIO_ACCMODE(iop)) {
210    case LIBIO_FLAGS_READ:
211      pipe->readerCounter ++;
212      if (pipe->Readers ++ == 0)
213        PIPE_WAKEUPWRITERS(pipe);
214
215      if (pipe->Writers == 0) {
216        /* Not an error */
217        if (LIBIO_NODELAY(iop))
218          break;
219
220        prevCounter = pipe->writerCounter;
221        err = -EINTR;
222        /* Wait until a writer opens the pipe */
223        do {
224          PIPE_READWAIT(pipe);
225        } while (prevCounter == pipe->writerCounter);
226      }
227      break;
228
229    case LIBIO_FLAGS_WRITE:
230      pipe->writerCounter ++;
231
232      if (pipe->Writers ++ == 0)
233        PIPE_WAKEUPREADERS(pipe);
234
235      if (pipe->Readers == 0 && LIBIO_NODELAY(iop)) {
236        PIPE_UNLOCK(pipe);
237        err = -ENXIO;
238        goto out_error;
239      }
240
241      if (pipe->Readers == 0) {
242        prevCounter = pipe->readerCounter;
243        err = -EINTR;
244        do {
245          PIPE_WRITEWAIT(pipe);
246        } while (prevCounter == pipe->readerCounter);
247      }
248      break;
249
250    case LIBIO_FLAGS_READ_WRITE:
251      pipe->readerCounter ++;
252      if (pipe->Readers ++ == 0)
253        PIPE_WAKEUPWRITERS(pipe);
254      pipe->writerCounter ++;
255      if (pipe->Writers ++ == 0)
256        PIPE_WAKEUPREADERS(pipe);
257      break;
258  }
259
260  PIPE_UNLOCK(pipe);
261  return 0;
262
263out_error:
264  pipe_release(pipep, iop);
265  return err;
266}
267
268ssize_t pipe_read(
269  pipe_control_t *pipe,
270  void           *buffer,
271  size_t          count,
272  rtems_libio_t  *iop
273)
274{
275  int chunk, chunk1, read = 0, ret = 0;
276
277  PIPE_LOCK(pipe);
278
279  while (PIPE_EMPTY(pipe)) {
280    /* Not an error */
281    if (pipe->Writers == 0)
282      goto out_locked;
283
284    if (LIBIO_NODELAY(iop)) {
285      ret = -EAGAIN;
286      goto out_locked;
287    }
288
289    /* Wait until pipe is no more empty or no writer exists */
290    pipe->waitingReaders ++;
291    PIPE_READWAIT(pipe);
292    pipe->waitingReaders --;
293    if (ret != 0)
294      goto out_locked;
295  }
296
297  /* Read chunk bytes */
298  chunk = MIN(count - read,  pipe->Length);
299  chunk1 = pipe->Size - pipe->Start;
300  if (chunk > chunk1) {
301    memcpy(buffer + read, pipe->Buffer + pipe->Start, chunk1);
302    memcpy(buffer + read + chunk1, pipe->Buffer, chunk - chunk1);
303  }
304  else
305    memcpy(buffer + read, pipe->Buffer + pipe->Start, chunk);
306
307  pipe->Start += chunk;
308  pipe->Start %= pipe->Size;
309  pipe->Length -= chunk;
310  /* For buffering optimization */
311  if (PIPE_EMPTY(pipe))
312    pipe->Start = 0;
313
314  if (pipe->waitingWriters > 0)
315    PIPE_WAKEUPWRITERS(pipe);
316  read += chunk;
317
318out_locked:
319  PIPE_UNLOCK(pipe);
320
321  if (read > 0)
322    return read;
323  return ret;
324}
325
326ssize_t pipe_write(
327  pipe_control_t *pipe,
328  const void     *buffer,
329  size_t          count,
330  rtems_libio_t  *iop
331)
332{
333  int chunk, chunk1, written = 0, ret = 0;
334
335  /* Write nothing */
336  if (count == 0)
337    return 0;
338
339  PIPE_LOCK(pipe);
340
341  if (pipe->Readers == 0) {
342    ret = -EPIPE;
343    goto out_locked;
344  }
345
346  /* Write of PIPE_BUF bytes or less shall not be interleaved */
347  chunk = count <= pipe->Size ? count : 1;
348
349  while (written < count) {
350    while (PIPE_SPACE(pipe) < chunk) {
351      if (LIBIO_NODELAY(iop)) {
352        ret = -EAGAIN;
353        goto out_locked;
354      }
355
356      /* Wait until there is chunk bytes space or no reader exists */
357      pipe->waitingWriters ++;
358      PIPE_WRITEWAIT(pipe);
359      pipe->waitingWriters --;
360      if (ret != 0)
361        goto out_locked;
362
363      if (pipe->Readers == 0) {
364        ret = -EPIPE;
365        goto out_locked;
366      }
367    }
368
369    chunk = MIN(count - written, PIPE_SPACE(pipe));
370    chunk1 = pipe->Size - PIPE_WSTART(pipe);
371    if (chunk > chunk1) {
372      memcpy(pipe->Buffer + PIPE_WSTART(pipe), buffer + written, chunk1);
373      memcpy(pipe->Buffer, buffer + written + chunk1, chunk - chunk1);
374    }
375    else
376      memcpy(pipe->Buffer + PIPE_WSTART(pipe), buffer + written, chunk);
377
378    pipe->Length += chunk;
379    if (pipe->waitingReaders > 0)
380      PIPE_WAKEUPREADERS(pipe);
381    written += chunk;
382    /* Write of more than PIPE_BUF bytes can be interleaved */
383    chunk = 1;
384  }
385
386out_locked:
387  PIPE_UNLOCK(pipe);
388
389#ifdef RTEMS_POSIX_API
390  /* Signal SIGPIPE */
391  if (ret == -EPIPE)
392    kill(getpid(), SIGPIPE);
393#endif
394
395  if (written > 0)
396    return written;
397  return ret;
398}
399
400int pipe_ioctl(
401  pipe_control_t  *pipe,
402  ioctl_command_t  cmd,
403  void            *buffer,
404  rtems_libio_t   *iop
405)
406{
407  if (cmd == FIONREAD) {
408    if (buffer == NULL)
409      return -EFAULT;
410
411    PIPE_LOCK(pipe);
412
413    /* Return length of pipe */
414    *(unsigned int *)buffer = pipe->Length;
415    PIPE_UNLOCK(pipe);
416    return 0;
417  }
418
419  return -EINVAL;
420}
Note: See TracBrowser for help on using the repository browser.