source: rtems/cpukit/libfs/src/pipe/fifo.c @ 9da8740

4.11
Last change on this file since 9da8740 was 9da8740, checked in by Chris Johns <chrisj@…>, on Dec 10, 2013 at 1:33:22 AM

PR2159: Have the FIFO driver read follow POSIX standard.

The read call was only returning once the requested buffer was full.
The change returns any available data.

  • Property mode set to 100644
File size: 11.8 KB
Line 
1/**
2 * @file
3 *
4 * @brief FIFO/Pipe Support
5 * @ingroup FIFO_PIPE
6 */
7
8/*
9 * Author: Wei Shen <cquark@gmail.com>
10 *
11 * The license and distribution terms for this file may be
12 * found in the file LICENSE in this distribution or at
13 * http://www.rtems.com/license/LICENSE.
14 */
15
16
17#if HAVE_CONFIG_H
18#include "config.h"
19#endif
20
21#include <errno.h>
22#include <stdlib.h>
23#include <string.h>
24
25#include <rtems.h>
26#include <rtems/libio_.h>
27#include <rtems/rtems/barrierimpl.h>
28#include <rtems/score/statesimpl.h>
29
30#include "pipe.h"
31
32
33#define MIN(a, b) ((a) < (b)? (a): (b))
34
35#define LIBIO_ACCMODE(_iop) ((_iop)->flags & LIBIO_FLAGS_READ_WRITE)
36#define LIBIO_NODELAY(_iop) ((_iop)->flags & LIBIO_FLAGS_NO_DELAY)
37
38static rtems_id pipe_semaphore = RTEMS_ID_NONE;
39
40
/*
 * Ring-buffer state queries.  The macro argument is parenthesized at every
 * use so that any pointer-valued expression (for example "&ctl") may be
 * passed, not only a plain identifier.
 */
#define PIPE_EMPTY(_pipe) ((_pipe)->Length == 0)
#define PIPE_FULL(_pipe)  ((_pipe)->Length == (_pipe)->Size)
#define PIPE_SPACE(_pipe) ((_pipe)->Size - (_pipe)->Length)
/* Buffer offset at which the next written byte is stored. */
#define PIPE_WSTART(_pipe) \
  (((_pipe)->Start + (_pipe)->Length) % (_pipe)->Size)

/* Obtain the per-pipe semaphore; evaluates to non-zero on success. */
#define PIPE_LOCK(_pipe)  \
  ( rtems_semaphore_obtain((_pipe)->Semaphore, RTEMS_WAIT, RTEMS_NO_TIMEOUT)  \
   == RTEMS_SUCCESSFUL )

/* Release the per-pipe semaphore. */
#define PIPE_UNLOCK(_pipe)  rtems_semaphore_release((_pipe)->Semaphore)

/* Block on the read barrier; evaluates to non-zero on success. */
#define PIPE_READWAIT(_pipe)  \
  ( rtems_barrier_wait((_pipe)->readBarrier, RTEMS_NO_TIMEOUT)  \
   == RTEMS_SUCCESSFUL)

/* Block on the write barrier; evaluates to non-zero on success. */
#define PIPE_WRITEWAIT(_pipe)  \
  ( rtems_barrier_wait((_pipe)->writeBarrier, RTEMS_NO_TIMEOUT)  \
   == RTEMS_SUCCESSFUL)

/* Release every task blocked on the read barrier (count is discarded). */
#define PIPE_WAKEUPREADERS(_pipe) \
  do {uint32_t n; rtems_barrier_release((_pipe)->readBarrier, &n); } while(0)

/* Release every task blocked on the write barrier (count is discarded). */
#define PIPE_WAKEUPWRITERS(_pipe) \
  do {uint32_t n; rtems_barrier_release((_pipe)->writeBarrier, &n); } while(0)
65
66
67#ifdef RTEMS_POSIX_API
68#include <rtems/rtems/barrier.h>
69#include <rtems/score/thread.h>
70
71/* Set barriers to be interruptible by signals. */
72static void pipe_interruptible(pipe_control_t *pipe)
73{
74  Objects_Locations  location;
75  Barrier_Control   *the_barrier;
76
77  the_barrier = _Barrier_Get(pipe->readBarrier, &location);
78  the_barrier->Barrier.Wait_queue.state |= STATES_INTERRUPTIBLE_BY_SIGNAL;
79  _Objects_Put( &the_barrier->Object );
80
81  the_barrier = _Barrier_Get(pipe->writeBarrier, &location);
82  the_barrier->Barrier.Wait_queue.state |= STATES_INTERRUPTIBLE_BY_SIGNAL;
83  _Objects_Put( &the_barrier->Object );
84}
85#endif
86
87/*
88 * Alloc pipe control structure, buffer, and resources.
89 * Called with pipe_semaphore held.
90 */
91static int pipe_alloc(
92  pipe_control_t **pipep
93)
94{
95  static char c = 'a';
96  pipe_control_t *pipe;
97  int err = -ENOMEM;
98
99  pipe = malloc(sizeof(pipe_control_t));
100  if (pipe == NULL)
101    return err;
102  memset(pipe, 0, sizeof(pipe_control_t));
103
104  pipe->Size = PIPE_BUF;
105  pipe->Buffer = malloc(pipe->Size);
106  if (! pipe->Buffer)
107    goto err_buf;
108
109  err = -ENOMEM;
110
111  if (rtems_barrier_create(
112        rtems_build_name ('P', 'I', 'r', c),
113        RTEMS_BARRIER_MANUAL_RELEASE, 0,
114        &pipe->readBarrier) != RTEMS_SUCCESSFUL)
115    goto err_rbar;
116  if (rtems_barrier_create(
117        rtems_build_name ('P', 'I', 'w', c),
118        RTEMS_BARRIER_MANUAL_RELEASE, 0,
119        &pipe->writeBarrier) != RTEMS_SUCCESSFUL)
120    goto err_wbar;
121  if (rtems_semaphore_create(
122        rtems_build_name ('P', 'I', 's', c), 1,
123        RTEMS_BINARY_SEMAPHORE | RTEMS_FIFO,
124        RTEMS_NO_PRIORITY, &pipe->Semaphore) != RTEMS_SUCCESSFUL)
125    goto err_sem;
126
127#ifdef RTEMS_POSIX_API
128  pipe_interruptible(pipe);
129#endif
130
131  *pipep = pipe;
132  if (c ++ == 'z')
133    c = 'a';
134  return 0;
135
136err_sem:
137  rtems_barrier_delete(pipe->writeBarrier);
138err_wbar:
139  rtems_barrier_delete(pipe->readBarrier);
140err_rbar:
141  free(pipe->Buffer);
142err_buf:
143  free(pipe);
144  return err;
145}
146
147/* Called with pipe_semaphore held. */
148static inline void pipe_free(
149  pipe_control_t *pipe
150)
151{
152  rtems_barrier_delete(pipe->readBarrier);
153  rtems_barrier_delete(pipe->writeBarrier);
154  rtems_semaphore_delete(pipe->Semaphore);
155  free(pipe->Buffer);
156  free(pipe);
157}
158
159static int pipe_lock(void)
160{
161  rtems_status_code sc = RTEMS_SUCCESSFUL;
162
163  if (pipe_semaphore == RTEMS_ID_NONE) {
164    rtems_libio_lock();
165
166    if (pipe_semaphore == RTEMS_ID_NONE) {
167      sc = rtems_semaphore_create(
168        rtems_build_name('P', 'I', 'P', 'E'),
169        1,
170        RTEMS_BINARY_SEMAPHORE | RTEMS_INHERIT_PRIORITY | RTEMS_PRIORITY,
171        RTEMS_NO_PRIORITY,
172        &pipe_semaphore
173      );
174    }
175
176    rtems_libio_unlock();
177  }
178
179  if (sc == RTEMS_SUCCESSFUL) {
180    sc = rtems_semaphore_obtain(pipe_semaphore, RTEMS_WAIT, RTEMS_NO_TIMEOUT);
181  }
182
183  if (sc == RTEMS_SUCCESSFUL) {
184    return 0;
185  } else {
186    return -ENOMEM;
187  }
188}
189
190static void pipe_unlock(void)
191{
192#ifdef RTEMS_DEBUG
193  rtems_status_code sc = RTEMS_SUCCESSFUL;
194
195  sc =
196#endif
197   rtems_semaphore_release(pipe_semaphore);
198  #ifdef RTEMS_DEBUG
199    if (sc != RTEMS_SUCCESSFUL) {
200      rtems_fatal_error_occurred(0xdeadbeef);
201    }
202  #endif
203}
204
205/*
206 * If called with *pipep = NULL, pipe_new will call pipe_alloc to allocate a
207 * pipe control structure and set *pipep to its address.
208 * pipe is locked, when pipe_new returns with no error.
209 */
210static int pipe_new(
211  pipe_control_t **pipep
212)
213{
214  pipe_control_t *pipe;
215  int err = 0;
216
217  err = pipe_lock();
218  if (err)
219    return err;
220
221  pipe = *pipep;
222  if (pipe == NULL) {
223    err = pipe_alloc(&pipe);
224    if (err)
225      goto out;
226  }
227
228  if (! PIPE_LOCK(pipe))
229    err = -EINTR;
230
231  if (*pipep == NULL) {
232    if (err)
233      pipe_free(pipe);
234    else
235      *pipep = pipe;
236  }
237
238out:
239  pipe_unlock();
240  return err;
241}
242
243void pipe_release(
244  pipe_control_t **pipep,
245  rtems_libio_t *iop
246)
247{
248  pipe_control_t *pipe = *pipep;
249  uint32_t mode;
250
251  #if defined(RTEMS_DEBUG)
252    /* WARN pipe not freed and pipep not set to NULL! */
253    if (pipe_lock())
254      rtems_fatal_error_occurred(0xdeadbeef);
255
256    /* WARN pipe not released! */
257    if (!PIPE_LOCK(pipe))
258      rtems_fatal_error_occurred(0xdeadbeef);
259  #endif
260
261  mode = LIBIO_ACCMODE(iop);
262  if (mode & LIBIO_FLAGS_READ)
263     pipe->Readers --;
264  if (mode & LIBIO_FLAGS_WRITE)
265     pipe->Writers --;
266
267  PIPE_UNLOCK(pipe);
268
269  if (pipe->Readers == 0 && pipe->Writers == 0) {
270#if 0
271    /* To delete an anonymous pipe file when all users closed it */
272    if (pipe->Anonymous)
273      delfile = TRUE;
274#endif
275    pipe_free(pipe);
276    *pipep = NULL;
277  }
278  else if (pipe->Readers == 0 && mode != LIBIO_FLAGS_WRITE)
279    /* Notify waiting Writers that all their partners left */
280    PIPE_WAKEUPWRITERS(pipe);
281  else if (pipe->Writers == 0 && mode != LIBIO_FLAGS_READ)
282    PIPE_WAKEUPREADERS(pipe);
283
284  pipe_unlock();
285
286#if 0
287  if (! delfile)
288    return;
289  if (iop->pathinfo.ops->unlink_h == NULL)
290    return;
291
292  /* This is safe for IMFS, but how about other FSes? */
293  iop->flags &= ~LIBIO_FLAGS_OPEN;
294  if(iop->pathinfo.ops->unlink_h(&iop->pathinfo))
295    return;
296#endif
297
298}
299
300int fifo_open(
301  pipe_control_t **pipep,
302  rtems_libio_t *iop
303)
304{
305  pipe_control_t *pipe;
306  unsigned int prevCounter;
307  int err;
308
309  err = pipe_new(pipep);
310  if (err)
311    return err;
312  pipe = *pipep;
313
314  switch (LIBIO_ACCMODE(iop)) {
315    case LIBIO_FLAGS_READ:
316      pipe->readerCounter ++;
317      if (pipe->Readers ++ == 0)
318        PIPE_WAKEUPWRITERS(pipe);
319
320      if (pipe->Writers == 0) {
321        /* Not an error */
322        if (LIBIO_NODELAY(iop))
323          break;
324
325        prevCounter = pipe->writerCounter;
326        err = -EINTR;
327        /* Wait until a writer opens the pipe */
328        do {
329          PIPE_UNLOCK(pipe);
330          if (! PIPE_READWAIT(pipe))
331            goto out_error;
332          if (! PIPE_LOCK(pipe))
333            goto out_error;
334        } while (prevCounter == pipe->writerCounter);
335      }
336      break;
337
338    case LIBIO_FLAGS_WRITE:
339      pipe->writerCounter ++;
340
341      if (pipe->Writers ++ == 0)
342        PIPE_WAKEUPREADERS(pipe);
343
344      if (pipe->Readers == 0 && LIBIO_NODELAY(iop)) {
345        PIPE_UNLOCK(pipe);
346        err = -ENXIO;
347        goto out_error;
348      }
349
350      if (pipe->Readers == 0) {
351        prevCounter = pipe->readerCounter;
352        err = -EINTR;
353        do {
354          PIPE_UNLOCK(pipe);
355          if (! PIPE_WRITEWAIT(pipe))
356            goto out_error;
357          if (! PIPE_LOCK(pipe))
358            goto out_error;
359        } while (prevCounter == pipe->readerCounter);
360      }
361      break;
362
363    case LIBIO_FLAGS_READ_WRITE:
364      pipe->readerCounter ++;
365      if (pipe->Readers ++ == 0)
366        PIPE_WAKEUPWRITERS(pipe);
367      pipe->writerCounter ++;
368      if (pipe->Writers ++ == 0)
369        PIPE_WAKEUPREADERS(pipe);
370      break;
371  }
372
373  PIPE_UNLOCK(pipe);
374  return 0;
375
376out_error:
377  pipe_release(pipep, iop);
378  return err;
379}
380
381ssize_t pipe_read(
382  pipe_control_t *pipe,
383  void           *buffer,
384  size_t          count,
385  rtems_libio_t  *iop
386)
387{
388  int chunk, chunk1, read = 0, ret = 0;
389
390  if (! PIPE_LOCK(pipe))
391    return -EINTR;
392
393  while (PIPE_EMPTY(pipe)) {
394    /* Not an error */
395    if (pipe->Writers == 0)
396      goto out_locked;
397
398    if (LIBIO_NODELAY(iop)) {
399      ret = -EAGAIN;
400      goto out_locked;
401    }
402
403    /* Wait until pipe is no more empty or no writer exists */
404    pipe->waitingReaders ++;
405    PIPE_UNLOCK(pipe);
406    if (! PIPE_READWAIT(pipe))
407      ret = -EINTR;
408    if (! PIPE_LOCK(pipe)) {
409      /* WARN waitingReaders not restored! */
410      ret = -EINTR;
411      goto out_nolock;
412    }
413    pipe->waitingReaders --;
414    if (ret != 0)
415      goto out_locked;
416  }
417
418  /* Read chunk bytes */
419  chunk = MIN(count - read,  pipe->Length);
420  chunk1 = pipe->Size - pipe->Start;
421  if (chunk > chunk1) {
422    memcpy(buffer + read, pipe->Buffer + pipe->Start, chunk1);
423    memcpy(buffer + read + chunk1, pipe->Buffer, chunk - chunk1);
424  }
425  else
426    memcpy(buffer + read, pipe->Buffer + pipe->Start, chunk);
427
428  pipe->Start += chunk;
429  pipe->Start %= pipe->Size;
430  pipe->Length -= chunk;
431  /* For buffering optimization */
432  if (PIPE_EMPTY(pipe))
433    pipe->Start = 0;
434
435  if (pipe->waitingWriters > 0)
436    PIPE_WAKEUPWRITERS(pipe);
437  read += chunk;
438
439out_locked:
440  PIPE_UNLOCK(pipe);
441
442out_nolock:
443  if (read > 0)
444    return read;
445  return ret;
446}
447
448ssize_t pipe_write(
449  pipe_control_t *pipe,
450  const void     *buffer,
451  size_t          count,
452  rtems_libio_t  *iop
453)
454{
455  int chunk, chunk1, written = 0, ret = 0;
456
457  /* Write nothing */
458  if (count == 0)
459    return 0;
460
461  if (! PIPE_LOCK(pipe))
462    return -EINTR;
463
464  if (pipe->Readers == 0) {
465    ret = -EPIPE;
466    goto out_locked;
467  }
468
469  /* Write of PIPE_BUF bytes or less shall not be interleaved */
470  chunk = count <= pipe->Size ? count : 1;
471
472  while (written < count) {
473    while (PIPE_SPACE(pipe) < chunk) {
474      if (LIBIO_NODELAY(iop)) {
475        ret = -EAGAIN;
476        goto out_locked;
477      }
478
479      /* Wait until there is chunk bytes space or no reader exists */
480      pipe->waitingWriters ++;
481      PIPE_UNLOCK(pipe);
482      if (! PIPE_WRITEWAIT(pipe))
483        ret = -EINTR;
484      if (! PIPE_LOCK(pipe)) {
485        /* WARN waitingWriters not restored! */
486        ret = -EINTR;
487        goto out_nolock;
488      }
489      pipe->waitingWriters --;
490      if (ret != 0)
491        goto out_locked;
492
493      if (pipe->Readers == 0) {
494        ret = -EPIPE;
495        goto out_locked;
496      }
497    }
498
499    chunk = MIN(count - written, PIPE_SPACE(pipe));
500    chunk1 = pipe->Size - PIPE_WSTART(pipe);
501    if (chunk > chunk1) {
502      memcpy(pipe->Buffer + PIPE_WSTART(pipe), buffer + written, chunk1);
503      memcpy(pipe->Buffer, buffer + written + chunk1, chunk - chunk1);
504    }
505    else
506      memcpy(pipe->Buffer + PIPE_WSTART(pipe), buffer + written, chunk);
507
508    pipe->Length += chunk;
509    if (pipe->waitingReaders > 0)
510      PIPE_WAKEUPREADERS(pipe);
511    written += chunk;
512    /* Write of more than PIPE_BUF bytes can be interleaved */
513    chunk = 1;
514  }
515
516out_locked:
517  PIPE_UNLOCK(pipe);
518
519out_nolock:
520#ifdef RTEMS_POSIX_API
521  /* Signal SIGPIPE */
522  if (ret == -EPIPE)
523    kill(getpid(), SIGPIPE);
524#endif
525
526  if (written > 0)
527    return written;
528  return ret;
529}
530
531int pipe_ioctl(
532  pipe_control_t  *pipe,
533  ioctl_command_t  cmd,
534  void            *buffer,
535  rtems_libio_t   *iop
536)
537{
538  if (cmd == FIONREAD) {
539    if (buffer == NULL)
540      return -EFAULT;
541
542    if (! PIPE_LOCK(pipe))
543      return -EINTR;
544
545    /* Return length of pipe */
546    *(unsigned int *)buffer = pipe->Length;
547    PIPE_UNLOCK(pipe);
548    return 0;
549  }
550
551  return -EINVAL;
552}
Note: See TracBrowser for help on using the repository browser.