source: rtems/cpukit/libfs/src/pipe/fifo.c @ b0f524f

Last change on this file since b0f524f was b0f524f, checked in by Joel Sherrill <joel@…>, on Mar 18, 2019 at 1:46:56 PM

libfs/src/pipe/fifo.c: Fix warning.

  • Property mode set to 100644
File size: 9.5 KB
Line 
1/**
2 * @file
3 *
4 * @brief FIFO/Pipe Support
5 * @ingroup FIFO_PIPE
6 */
7
8/*
9 * Author: Wei Shen <cquark@gmail.com>
10 *
11 * The license and distribution terms for this file may be
12 * found in the file LICENSE in this distribution or at
13 * http://www.rtems.org/license/LICENSE.
14 */
15
16
17#if HAVE_CONFIG_H
18#include "config.h"
19#endif
20
21#include <sys/param.h>
22#include <sys/filio.h>
23#include <errno.h>
24#include <stdlib.h>
25#include <string.h>
26
27#include <rtems.h>
28#include <rtems/libio_.h>
29#include <rtems/pipe.h>
30#include <rtems/rtems/barrierimpl.h>
31#include <rtems/score/statesimpl.h>
32
33#define LIBIO_ACCMODE(_iop) (rtems_libio_iop_flags(_iop) & LIBIO_FLAGS_READ_WRITE)
34#define LIBIO_NODELAY(_iop) rtems_libio_iop_is_no_delay(_iop)
35
36static rtems_mutex pipe_mutex = RTEMS_MUTEX_INITIALIZER("Pipes");
37
38
39#define PIPE_EMPTY(_pipe) (_pipe->Length == 0)
40#define PIPE_FULL(_pipe)  (_pipe->Length == _pipe->Size)
41#define PIPE_SPACE(_pipe) (_pipe->Size - _pipe->Length)
42#define PIPE_WSTART(_pipe) ((_pipe->Start + _pipe->Length) % _pipe->Size)
43
44#define PIPE_LOCK(_pipe) rtems_mutex_lock(&(_pipe)->Mutex)
45
46#define PIPE_UNLOCK(_pipe) rtems_mutex_unlock(&(_pipe)->Mutex)
47
48#define PIPE_READWAIT(_pipe)  \
49  ( rtems_barrier_wait(_pipe->readBarrier, RTEMS_NO_TIMEOUT)  \
50   == RTEMS_SUCCESSFUL)
51
52#define PIPE_WRITEWAIT(_pipe)  \
53  ( rtems_barrier_wait(_pipe->writeBarrier, RTEMS_NO_TIMEOUT)  \
54   == RTEMS_SUCCESSFUL)
55
56#define PIPE_WAKEUPREADERS(_pipe) \
57  do {uint32_t n; rtems_barrier_release(_pipe->readBarrier, &n); } while(0)
58
59#define PIPE_WAKEUPWRITERS(_pipe) \
60  do {uint32_t n; rtems_barrier_release(_pipe->writeBarrier, &n); } while(0)
61
62/*
63 * Alloc pipe control structure, buffer, and resources.
64 * Called with pipe_semaphore held.
65 */
66static int pipe_alloc(
67  pipe_control_t **pipep
68)
69{
70  static char c = 'a';
71  pipe_control_t *pipe;
72  int err = -ENOMEM;
73
74  pipe = malloc(sizeof(pipe_control_t));
75  if (pipe == NULL)
76    return err;
77  memset(pipe, 0, sizeof(pipe_control_t));
78
79  pipe->Size = PIPE_BUF;
80  pipe->Buffer = malloc(pipe->Size);
81  if (! pipe->Buffer)
82    goto err_buf;
83
84  err = -ENOMEM;
85
86  if (rtems_barrier_create(
87        rtems_build_name ('P', 'I', 'r', c),
88        RTEMS_BARRIER_MANUAL_RELEASE, 0,
89        &pipe->readBarrier) != RTEMS_SUCCESSFUL)
90    goto err_rbar;
91  if (rtems_barrier_create(
92        rtems_build_name ('P', 'I', 'w', c),
93        RTEMS_BARRIER_MANUAL_RELEASE, 0,
94        &pipe->writeBarrier) != RTEMS_SUCCESSFUL)
95    goto err_wbar;
96  rtems_mutex_init(&pipe->Mutex, "Pipe");
97
98  *pipep = pipe;
99  if (c ++ == 'z')
100    c = 'a';
101  return 0;
102
103err_wbar:
104  rtems_barrier_delete(pipe->readBarrier);
105err_rbar:
106  free(pipe->Buffer);
107err_buf:
108  free(pipe);
109  return err;
110}
111
112/* Called with pipe_semaphore held. */
113static inline void pipe_free(
114  pipe_control_t *pipe
115)
116{
117  rtems_barrier_delete(pipe->readBarrier);
118  rtems_barrier_delete(pipe->writeBarrier);
119  rtems_mutex_destroy(&pipe->Mutex);
120  free(pipe->Buffer);
121  free(pipe);
122}
123
124static void pipe_lock(void)
125{
126  rtems_mutex_lock(&pipe_mutex);
127}
128
129static void pipe_unlock(void)
130{
131  rtems_mutex_unlock(&pipe_mutex);
132}
133
134/*
135 * If called with *pipep = NULL, pipe_new will call pipe_alloc to allocate a
136 * pipe control structure and set *pipep to its address.
137 * pipe is locked, when pipe_new returns with no error.
138 */
139static int pipe_new(
140  pipe_control_t **pipep
141)
142{
143  pipe_control_t *pipe;
144  int err = 0;
145
146  _Assert( pipep );
147  pipe_lock();
148
149  pipe = *pipep;
150  if (pipe == NULL) {
151    err = pipe_alloc(&pipe);
152    if (err) {
153      pipe_unlock();
154      return err;
155    }
156  }
157
158  PIPE_LOCK(pipe);
159
160  *pipep = pipe;
161  pipe_unlock();
162  return err;
163}
164
165void pipe_release(
166  pipe_control_t **pipep,
167  rtems_libio_t *iop
168)
169{
170  pipe_control_t *pipe = *pipep;
171  uint32_t mode;
172
173  pipe_lock();
174  PIPE_LOCK(pipe);
175
176  mode = LIBIO_ACCMODE(iop);
177  if (mode & LIBIO_FLAGS_READ)
178     pipe->Readers --;
179  if (mode & LIBIO_FLAGS_WRITE)
180     pipe->Writers --;
181
182  PIPE_UNLOCK(pipe);
183
184  if (pipe->Readers == 0 && pipe->Writers == 0) {
185#if 0
186    /* To delete an anonymous pipe file when all users closed it */
187    if (pipe->Anonymous)
188      delfile = TRUE;
189#endif
190    pipe_free(pipe);
191    *pipep = NULL;
192  }
193  else if (pipe->Readers == 0 && mode != LIBIO_FLAGS_WRITE)
194    /* Notify waiting Writers that all their partners left */
195    PIPE_WAKEUPWRITERS(pipe);
196  else if (pipe->Writers == 0 && mode != LIBIO_FLAGS_READ)
197    PIPE_WAKEUPREADERS(pipe);
198
199  pipe_unlock();
200
201#if 0
202  if (! delfile)
203    return;
204  if (iop->pathinfo.ops->unlink_h == NULL)
205    return;
206
207  /* This is safe for IMFS, but how about other FSes? */
208  rtems_libio_iop_flags_clear( iop, LIBIO_FLAGS_OPEN );
209  if(iop->pathinfo.ops->unlink_h(&iop->pathinfo))
210    return;
211#endif
212
213}
214
215int fifo_open(
216  pipe_control_t **pipep,
217  rtems_libio_t *iop
218)
219{
220  pipe_control_t *pipe;
221  unsigned int prevCounter;
222  int err;
223
224  err = pipe_new(pipep);
225  if (err)
226    return err;
227  pipe = *pipep;
228
229  switch (LIBIO_ACCMODE(iop)) {
230    case LIBIO_FLAGS_READ:
231      pipe->readerCounter ++;
232      if (pipe->Readers ++ == 0)
233        PIPE_WAKEUPWRITERS(pipe);
234
235      if (pipe->Writers == 0) {
236        /* Not an error */
237        if (LIBIO_NODELAY(iop))
238          break;
239
240        prevCounter = pipe->writerCounter;
241        err = -EINTR;
242        /* Wait until a writer opens the pipe */
243        do {
244          PIPE_UNLOCK(pipe);
245          if (! PIPE_READWAIT(pipe))
246            goto out_error;
247          PIPE_LOCK(pipe);
248        } while (prevCounter == pipe->writerCounter);
249      }
250      break;
251
252    case LIBIO_FLAGS_WRITE:
253      pipe->writerCounter ++;
254
255      if (pipe->Writers ++ == 0)
256        PIPE_WAKEUPREADERS(pipe);
257
258      if (pipe->Readers == 0 && LIBIO_NODELAY(iop)) {
259        PIPE_UNLOCK(pipe);
260        err = -ENXIO;
261        goto out_error;
262      }
263
264      if (pipe->Readers == 0) {
265        prevCounter = pipe->readerCounter;
266        err = -EINTR;
267        do {
268          PIPE_UNLOCK(pipe);
269          if (! PIPE_WRITEWAIT(pipe))
270            goto out_error;
271          PIPE_LOCK(pipe);
272        } while (prevCounter == pipe->readerCounter);
273      }
274      break;
275
276    case LIBIO_FLAGS_READ_WRITE:
277      pipe->readerCounter ++;
278      if (pipe->Readers ++ == 0)
279        PIPE_WAKEUPWRITERS(pipe);
280      pipe->writerCounter ++;
281      if (pipe->Writers ++ == 0)
282        PIPE_WAKEUPREADERS(pipe);
283      break;
284  }
285
286  PIPE_UNLOCK(pipe);
287  return 0;
288
289out_error:
290  pipe_release(pipep, iop);
291  return err;
292}
293
294ssize_t pipe_read(
295  pipe_control_t *pipe,
296  void           *buffer,
297  size_t          count,
298  rtems_libio_t  *iop
299)
300{
301  int chunk, chunk1, read = 0, ret = 0;
302
303  PIPE_LOCK(pipe);
304
305  while (PIPE_EMPTY(pipe)) {
306    /* Not an error */
307    if (pipe->Writers == 0)
308      goto out_locked;
309
310    if (LIBIO_NODELAY(iop)) {
311      ret = -EAGAIN;
312      goto out_locked;
313    }
314
315    /* Wait until pipe is no more empty or no writer exists */
316    pipe->waitingReaders ++;
317    PIPE_UNLOCK(pipe);
318    if (! PIPE_READWAIT(pipe))
319      ret = -EINTR;
320    PIPE_LOCK(pipe);
321    pipe->waitingReaders --;
322    if (ret != 0)
323      goto out_locked;
324  }
325
326  /* Read chunk bytes */
327  chunk = MIN(count - read,  pipe->Length);
328  chunk1 = pipe->Size - pipe->Start;
329  if (chunk > chunk1) {
330    memcpy(buffer + read, pipe->Buffer + pipe->Start, chunk1);
331    memcpy(buffer + read + chunk1, pipe->Buffer, chunk - chunk1);
332  }
333  else
334    memcpy(buffer + read, pipe->Buffer + pipe->Start, chunk);
335
336  pipe->Start += chunk;
337  pipe->Start %= pipe->Size;
338  pipe->Length -= chunk;
339  /* For buffering optimization */
340  if (PIPE_EMPTY(pipe))
341    pipe->Start = 0;
342
343  if (pipe->waitingWriters > 0)
344    PIPE_WAKEUPWRITERS(pipe);
345  read += chunk;
346
347out_locked:
348  PIPE_UNLOCK(pipe);
349
350  if (read > 0)
351    return read;
352  return ret;
353}
354
355ssize_t pipe_write(
356  pipe_control_t *pipe,
357  const void     *buffer,
358  size_t          count,
359  rtems_libio_t  *iop
360)
361{
362  int chunk, chunk1, written = 0, ret = 0;
363
364  /* Write nothing */
365  if (count == 0)
366    return 0;
367
368  PIPE_LOCK(pipe);
369
370  if (pipe->Readers == 0) {
371    ret = -EPIPE;
372    goto out_locked;
373  }
374
375  /* Write of PIPE_BUF bytes or less shall not be interleaved */
376  chunk = count <= pipe->Size ? count : 1;
377
378  while (written < count) {
379    while (PIPE_SPACE(pipe) < chunk) {
380      if (LIBIO_NODELAY(iop)) {
381        ret = -EAGAIN;
382        goto out_locked;
383      }
384
385      /* Wait until there is chunk bytes space or no reader exists */
386      pipe->waitingWriters ++;
387      PIPE_UNLOCK(pipe);
388      if (! PIPE_WRITEWAIT(pipe))
389        ret = -EINTR;
390      PIPE_LOCK(pipe);
391      pipe->waitingWriters --;
392      if (ret != 0)
393        goto out_locked;
394
395      if (pipe->Readers == 0) {
396        ret = -EPIPE;
397        goto out_locked;
398      }
399    }
400
401    chunk = MIN(count - written, PIPE_SPACE(pipe));
402    chunk1 = pipe->Size - PIPE_WSTART(pipe);
403    if (chunk > chunk1) {
404      memcpy(pipe->Buffer + PIPE_WSTART(pipe), buffer + written, chunk1);
405      memcpy(pipe->Buffer, buffer + written + chunk1, chunk - chunk1);
406    }
407    else
408      memcpy(pipe->Buffer + PIPE_WSTART(pipe), buffer + written, chunk);
409
410    pipe->Length += chunk;
411    if (pipe->waitingReaders > 0)
412      PIPE_WAKEUPREADERS(pipe);
413    written += chunk;
414    /* Write of more than PIPE_BUF bytes can be interleaved */
415    chunk = 1;
416  }
417
418out_locked:
419  PIPE_UNLOCK(pipe);
420
421#ifdef RTEMS_POSIX_API
422  /* Signal SIGPIPE */
423  if (ret == -EPIPE)
424    kill(getpid(), SIGPIPE);
425#endif
426
427  if (written > 0)
428    return written;
429  return ret;
430}
431
432int pipe_ioctl(
433  pipe_control_t  *pipe,
434  ioctl_command_t  cmd,
435  void            *buffer,
436  rtems_libio_t   *iop
437)
438{
439  if (cmd == FIONREAD) {
440    if (buffer == NULL)
441      return -EFAULT;
442
443    PIPE_LOCK(pipe);
444
445    /* Return length of pipe */
446    *(unsigned int *)buffer = pipe->Length;
447    PIPE_UNLOCK(pipe);
448    return 0;
449  }
450
451  return -EINVAL;
452}
Note: See TracBrowser for help on using the repository browser.