source: rtems/cpukit/libfs/src/pipe/fifo.c @ 1f16a9f

Last change on this file since 1f16a9f was 1f16a9f, checked in by Sebastian Huber <sebastian.huber@…>, on 04/12/12 at 15:15:53

Filesystem: Add select() support for pipes

  • Property mode set to 100644
File size: 14.0 KB
Line 
1/*
2 * fifo.c: POSIX FIFO/pipe for RTEMS
3 *
4 * Author: Wei Shen <cquark@gmail.com>
5 *
6 * The license and distribution terms for this file may be
7 * found in the file LICENSE in this distribution or at
8 * http://www.rtems.com/license/LICENSE.
9 *
10 * $Id$
11 */
12
13
14#if HAVE_CONFIG_H
15#include "config.h"
16#endif
17
18#ifdef RTEMS_POSIX_API
19#define __RTEMS_VIOLATE_KERNEL_VISIBILITY__
20#endif
21
22#include <errno.h>
23#include <stdlib.h>
24
25#include <rtems.h>
26#include <rtems/libio_.h>
27
28#include "pipe.h"
29
30
/* Smaller of two values.  Each argument may be evaluated more than once. */
#define MIN(a, b) (((b) > (a)) ? (a) : (b))

/* Bits of the descriptor flags selected by LIBIO_FLAGS_READ_WRITE. */
#define LIBIO_ACCMODE(_iop) (LIBIO_FLAGS_READ_WRITE & (_iop)->flags)
/* Non-zero if LIBIO_FLAGS_NO_DELAY is set in the descriptor flags. */
#define LIBIO_NODELAY(_iop) (LIBIO_FLAGS_NO_DELAY & (_iop)->flags)
35
36static rtems_id pipe_semaphore = RTEMS_ID_NONE;
37
38
/*
 * Ring buffer state of a pipe.  Start is the index of the oldest byte, Length
 * the number of buffered bytes and Size the capacity of the buffer.
 *
 * The macro argument is parenthesized everywhere so that any pointer
 * expression (for example "&object" or "array + i") can be passed.
 */
#define PIPE_EMPTY(_pipe) ((_pipe)->Length == 0)
#define PIPE_FULL(_pipe)  ((_pipe)->Length == (_pipe)->Size)
#define PIPE_SPACE(_pipe) ((_pipe)->Size - (_pipe)->Length)
/* Index at which the next written byte is stored. */
#define PIPE_WSTART(_pipe) \
  (((_pipe)->Start + (_pipe)->Length) % (_pipe)->Size)

/* Obtain the per-pipe semaphore; evaluates to non-zero on success. */
#define PIPE_LOCK(_pipe)  \
  ( rtems_semaphore_obtain((_pipe)->Semaphore, RTEMS_WAIT, RTEMS_NO_TIMEOUT)  \
   == RTEMS_SUCCESSFUL )

/* Release the per-pipe semaphore. */
#define PIPE_UNLOCK(_pipe)  rtems_semaphore_release((_pipe)->Semaphore)

/* Block on the read barrier; evaluates to non-zero on success. */
#define PIPE_READWAIT(_pipe)  \
  ( rtems_barrier_wait((_pipe)->readBarrier, RTEMS_NO_TIMEOUT)  \
   == RTEMS_SUCCESSFUL)

/* Block on the write barrier; evaluates to non-zero on success. */
#define PIPE_WRITEWAIT(_pipe)  \
  ( rtems_barrier_wait((_pipe)->writeBarrier, RTEMS_NO_TIMEOUT)  \
   == RTEMS_SUCCESSFUL)

/* Release all tasks waiting at the read barrier. */
#define PIPE_WAKEUPREADERS(_pipe) \
  do { \
    uint32_t pipe_released_; \
    rtems_barrier_release((_pipe)->readBarrier, &pipe_released_); \
  } while (0)

/* Release all tasks waiting at the write barrier. */
#define PIPE_WAKEUPWRITERS(_pipe) \
  do { \
    uint32_t pipe_released_; \
    rtems_barrier_release((_pipe)->writeBarrier, &pipe_released_); \
  } while (0)
63
64
65#ifdef RTEMS_POSIX_API
66#define __RTEMS_VIOLATE_KERNEL_VISIBILITY__
67
68#include <rtems/rtems/barrier.h>
69#include <rtems/score/thread.h>
70
71static void pipe_select_wakeup(rtems_id *id_ptr)
72{
73  rtems_id id = *id_ptr;
74
75  *id_ptr = 0;
76
77  if (id != 0) {
78    rtems_status_code sc = rtems_event_send(id, RTEMS_IOCTL_SELECT_EVENT);
79
80    if (sc != RTEMS_SUCCESSFUL) {
81      rtems_fatal_error_occurred(sc);
82    }
83  }
84}
85
86/* Set barriers to be interruptible by signals. */
87static void pipe_interruptible(pipe_control_t *pipe)
88{
89  Objects_Locations location;
90
91  _Barrier_Get(pipe->readBarrier, &location)->Barrier.Wait_queue.state
92    |= STATES_INTERRUPTIBLE_BY_SIGNAL;
93  _Thread_Enable_dispatch();
94  _Barrier_Get(pipe->writeBarrier, &location)->Barrier.Wait_queue.state
95    |= STATES_INTERRUPTIBLE_BY_SIGNAL;
96  _Thread_Enable_dispatch();
97}
98#endif
99
100/*
101 * Alloc pipe control structure, buffer, and resources.
102 * Called with pipe_semaphore held.
103 */
104static int pipe_alloc(
105  pipe_control_t **pipep
106)
107{
108  static char c = 'a';
109  pipe_control_t *pipe;
110  int err = -ENOMEM;
111
112  pipe = calloc(1, sizeof(pipe_control_t));
113  if (pipe == NULL)
114    return err;
115
116  pipe->Size = PIPE_BUF;
117  pipe->Buffer = malloc(pipe->Size);
118  if (! pipe->Buffer)
119    goto err_buf;
120
121  err = -ENOMEM;
122
123  if (rtems_barrier_create(
124        rtems_build_name ('P', 'I', 'r', c),
125        RTEMS_BARRIER_MANUAL_RELEASE, 0,
126        &pipe->readBarrier) != RTEMS_SUCCESSFUL)
127    goto err_rbar;
128  if (rtems_barrier_create(
129        rtems_build_name ('P', 'I', 'w', c),
130        RTEMS_BARRIER_MANUAL_RELEASE, 0,
131        &pipe->writeBarrier) != RTEMS_SUCCESSFUL)
132    goto err_wbar;
133  if (rtems_semaphore_create(
134        rtems_build_name ('P', 'I', 's', c), 1,
135        RTEMS_BINARY_SEMAPHORE | RTEMS_FIFO,
136        RTEMS_NO_PRIORITY, &pipe->Semaphore) != RTEMS_SUCCESSFUL)
137    goto err_sem;
138
139#ifdef RTEMS_POSIX_API
140  pipe_interruptible(pipe);
141#endif
142
143  *pipep = pipe;
144  if (c ++ == 'z')
145    c = 'a';
146  return 0;
147
148err_sem:
149  rtems_barrier_delete(pipe->writeBarrier);
150err_wbar:
151  rtems_barrier_delete(pipe->readBarrier);
152err_rbar:
153  free(pipe->Buffer);
154err_buf:
155  free(pipe);
156  return err;
157}
158
159/* Called with pipe_semaphore held. */
160static inline void pipe_free(
161  pipe_control_t *pipe
162)
163{
164  rtems_barrier_delete(pipe->readBarrier);
165  rtems_barrier_delete(pipe->writeBarrier);
166  rtems_semaphore_delete(pipe->Semaphore);
167  free(pipe->Buffer);
168  free(pipe);
169}
170
171static int pipe_lock(void)
172{
173  rtems_status_code sc = RTEMS_SUCCESSFUL;
174
175  if (pipe_semaphore == RTEMS_ID_NONE) {
176    rtems_libio_lock();
177
178    if (pipe_semaphore == RTEMS_ID_NONE) {
179      sc = rtems_semaphore_create(
180        rtems_build_name('P', 'I', 'P', 'E'),
181        1,
182        RTEMS_BINARY_SEMAPHORE | RTEMS_INHERIT_PRIORITY | RTEMS_PRIORITY,
183        RTEMS_NO_PRIORITY,
184        &pipe_semaphore
185      );
186    }
187
188    rtems_libio_unlock();
189  }
190
191  if (sc == RTEMS_SUCCESSFUL) {
192    sc = rtems_semaphore_obtain(pipe_semaphore, RTEMS_WAIT, RTEMS_NO_TIMEOUT);
193  }
194
195  if (sc == RTEMS_SUCCESSFUL) {
196    return 0;
197  } else {
198    return -ENOMEM;
199  }
200}
201
202static void pipe_unlock(void)
203{
204#ifdef RTEMS_DEBUG
205  rtems_status_code sc = RTEMS_SUCCESSFUL;
206
207  sc =
208#endif
209   rtems_semaphore_release(pipe_semaphore);
210  #ifdef RTEMS_DEBUG
211    if (sc != RTEMS_SUCCESSFUL) {
212      rtems_fatal_error_occurred(0xdeadbeef);
213    }
214  #endif
215}
216
217/*
218 * If called with *pipep = NULL, pipe_new will call pipe_alloc to allocate a
219 * pipe control structure and set *pipep to its address.
220 * pipe is locked, when pipe_new returns with no error.
221 */
222static int pipe_new(
223  pipe_control_t **pipep
224)
225{
226  pipe_control_t *pipe;
227  int err = 0;
228
229  err = pipe_lock();
230  if (err)
231    return err;
232
233  pipe = *pipep;
234  if (pipe == NULL) {
235    err = pipe_alloc(&pipe);
236    if (err)
237      goto out;
238  }
239
240  if (! PIPE_LOCK(pipe))
241    err = -EINTR;
242
243  if (*pipep == NULL) {
244    if (err)
245      pipe_free(pipe);
246    else
247      *pipep = pipe;
248  }
249
250out:
251  pipe_unlock();
252  return err;
253}
254
255/*
256 * Interface to file system close.
257 *
258 * *pipep points to pipe control structure. When the last user releases pipe,
259 * it will be set to NULL.
260 */
261void pipe_release(
262  pipe_control_t **pipep,
263  rtems_libio_t *iop
264)
265{
266  pipe_control_t *pipe = *pipep;
267  uint32_t mode;
268
269  #if defined(RTEMS_DEBUG)
270    /* WARN pipe not freed and pipep not set to NULL! */
271    if (pipe_lock())
272      rtems_fatal_error_occurred(0xdeadbeef);
273
274    /* WARN pipe not released! */
275    if (!PIPE_LOCK(pipe))
276      rtems_fatal_error_occurred(0xdeadbeef);
277  #endif
278
279  mode = LIBIO_ACCMODE(iop);
280  if (mode & LIBIO_FLAGS_READ)
281     pipe->Readers --;
282  if (mode & LIBIO_FLAGS_WRITE)
283     pipe->Writers --;
284
285  PIPE_UNLOCK(pipe);
286
287  if (pipe->Readers == 0 && pipe->Writers == 0) {
288#if 0
289    /* To delete an anonymous pipe file when all users closed it */
290    if (pipe->Anonymous)
291      delfile = TRUE;
292#endif
293    pipe_free(pipe);
294    *pipep = NULL;
295  }
296  else if (pipe->Readers == 0 && mode != LIBIO_FLAGS_WRITE)
297    /* Notify waiting Writers that all their partners left */
298    PIPE_WAKEUPWRITERS(pipe);
299  else if (pipe->Writers == 0 && mode != LIBIO_FLAGS_READ)
300    PIPE_WAKEUPREADERS(pipe);
301
302  pipe_unlock();
303
304#if 0
305  if (! delfile)
306    return;
307  if (iop->pathinfo.ops->unlink_h == NULL)
308    return;
309
310  /* This is safe for IMFS, but how about other FSes? */
311  iop->flags &= ~LIBIO_FLAGS_OPEN;
312  if(iop->pathinfo.ops->unlink_h(&iop->pathinfo))
313    return;
314#endif
315
316}
317
318/*
319 * Interface to file system open.
320 *
321 * *pipep points to pipe control structure. If called with *pipep = NULL,
322 * fifo_open will try allocating and initializing a control structure. If the
323 * call succeeds, *pipep will be set to address of new control structure.
324 */
325int fifo_open(
326  pipe_control_t **pipep,
327  rtems_libio_t *iop
328)
329{
330  pipe_control_t *pipe;
331  unsigned int prevCounter;
332  int err;
333
334  err = pipe_new(pipep);
335  if (err)
336    return err;
337  pipe = *pipep;
338
339  switch (LIBIO_ACCMODE(iop)) {
340    case LIBIO_FLAGS_READ:
341      pipe->readerCounter ++;
342      if (pipe->Readers ++ == 0)
343        PIPE_WAKEUPWRITERS(pipe);
344
345      if (pipe->Writers == 0) {
346        /* Not an error */
347        if (LIBIO_NODELAY(iop))
348          break;
349
350        prevCounter = pipe->writerCounter;
351        err = -EINTR;
352        /* Wait until a writer opens the pipe */
353        do {
354          PIPE_UNLOCK(pipe);
355          if (! PIPE_READWAIT(pipe))
356            goto out_error;
357          if (! PIPE_LOCK(pipe))
358            goto out_error;
359        } while (prevCounter == pipe->writerCounter);
360      }
361      break;
362
363    case LIBIO_FLAGS_WRITE:
364      pipe->writerCounter ++;
365
366      if (pipe->Writers ++ == 0)
367        PIPE_WAKEUPREADERS(pipe);
368
369      if (pipe->Readers == 0 && LIBIO_NODELAY(iop)) {
370        PIPE_UNLOCK(pipe);
371        err = -ENXIO;
372        goto out_error;
373      }
374
375      if (pipe->Readers == 0) {
376        prevCounter = pipe->readerCounter;
377        err = -EINTR;
378        do {
379          PIPE_UNLOCK(pipe);
380          if (! PIPE_WRITEWAIT(pipe))
381            goto out_error;
382          if (! PIPE_LOCK(pipe))
383            goto out_error;
384        } while (prevCounter == pipe->readerCounter);
385      }
386      break;
387
388    case LIBIO_FLAGS_READ_WRITE:
389      pipe->readerCounter ++;
390      if (pipe->Readers ++ == 0)
391        PIPE_WAKEUPWRITERS(pipe);
392      pipe->writerCounter ++;
393      if (pipe->Writers ++ == 0)
394        PIPE_WAKEUPREADERS(pipe);
395      break;
396  }
397
398  PIPE_UNLOCK(pipe);
399  return 0;
400
401out_error:
402  pipe_release(pipep, iop);
403  return err;
404}
405
406/*
407 * Interface to file system read.
408 */
409ssize_t pipe_read(
410  pipe_control_t *pipe,
411  void           *buffer,
412  size_t          count,
413  rtems_libio_t  *iop
414)
415{
416  int chunk, chunk1, read = 0, ret = 0;
417
418  if (! PIPE_LOCK(pipe))
419    return -EINTR;
420
421  while (read < count) {
422    while (PIPE_EMPTY(pipe)) {
423      /* Not an error */
424      if (pipe->Writers == 0)
425        goto out_locked;
426
427      if (LIBIO_NODELAY(iop)) {
428        ret = -EAGAIN;
429        goto out_locked;
430      }
431
432      /* Wait until pipe is no more empty or no writer exists */
433      pipe->waitingReaders ++;
434      PIPE_UNLOCK(pipe);
435      if (! PIPE_READWAIT(pipe))
436        ret = -EINTR;
437      if (! PIPE_LOCK(pipe)) {
438        /* WARN waitingReaders not restored! */
439        ret = -EINTR;
440        goto out_nolock;
441      }
442      pipe->waitingReaders --;
443      if (ret != 0)
444        goto out_locked;
445    }
446
447    /* Read chunk bytes */
448    chunk = MIN(count - read,  pipe->Length);
449    chunk1 = pipe->Size - pipe->Start;
450    if (chunk > chunk1) {
451      memcpy(buffer + read, pipe->Buffer + pipe->Start, chunk1);
452      memcpy(buffer + read + chunk1, pipe->Buffer, chunk - chunk1);
453    }
454    else
455      memcpy(buffer + read, pipe->Buffer + pipe->Start, chunk);
456
457    pipe->Start += chunk;
458    pipe->Start %= pipe->Size;
459    pipe->Length -= chunk;
460    /* For buffering optimization */
461    if (PIPE_EMPTY(pipe))
462      pipe->Start = 0;
463
464    pipe_select_wakeup(&pipe->select_write_task_id);
465    if (pipe->waitingWriters > 0)
466      PIPE_WAKEUPWRITERS(pipe);
467    read += chunk;
468  }
469
470out_locked:
471  PIPE_UNLOCK(pipe);
472
473out_nolock:
474  if (read > 0)
475    return read;
476  return ret;
477}
478
479/*
480 * Interface to file system write.
481 */
482ssize_t pipe_write(
483  pipe_control_t *pipe,
484  const void     *buffer,
485  size_t          count,
486  rtems_libio_t  *iop
487)
488{
489  int chunk, chunk1, written = 0, ret = 0;
490
491  /* Write nothing */
492  if (count == 0)
493    return 0;
494
495  if (! PIPE_LOCK(pipe))
496    return -EINTR;
497
498  if (pipe->Readers == 0) {
499    ret = -EPIPE;
500    goto out_locked;
501  }
502
503  /* Write of PIPE_BUF bytes or less shall not be interleaved */
504  chunk = count <= pipe->Size ? count : 1;
505
506  while (written < count) {
507    while (PIPE_SPACE(pipe) < chunk) {
508      if (LIBIO_NODELAY(iop)) {
509        ret = -EAGAIN;
510        goto out_locked;
511      }
512
513      /* Wait until there is chunk bytes space or no reader exists */
514      pipe->waitingWriters ++;
515      PIPE_UNLOCK(pipe);
516      if (! PIPE_WRITEWAIT(pipe))
517        ret = -EINTR;
518      if (! PIPE_LOCK(pipe)) {
519        /* WARN waitingWriters not restored! */
520        ret = -EINTR;
521        goto out_nolock;
522      }
523      pipe->waitingWriters --;
524      if (ret != 0)
525        goto out_locked;
526
527      if (pipe->Readers == 0) {
528        ret = -EPIPE;
529        goto out_locked;
530      }
531    }
532
533    chunk = MIN(count - written, PIPE_SPACE(pipe));
534    chunk1 = pipe->Size - PIPE_WSTART(pipe);
535    if (chunk > chunk1) {
536      memcpy(pipe->Buffer + PIPE_WSTART(pipe), buffer + written, chunk1);
537      memcpy(pipe->Buffer, buffer + written + chunk1, chunk - chunk1);
538    }
539    else
540      memcpy(pipe->Buffer + PIPE_WSTART(pipe), buffer + written, chunk);
541
542    pipe->Length += chunk;
543    pipe_select_wakeup(&pipe->select_read_task_id);
544    if (pipe->waitingReaders > 0)
545      PIPE_WAKEUPREADERS(pipe);
546    written += chunk;
547    /* Write of more than PIPE_BUF bytes can be interleaved */
548    chunk = 1;
549  }
550
551out_locked:
552  PIPE_UNLOCK(pipe);
553
554out_nolock:
555#ifdef RTEMS_POSIX_API
556  /* Signal SIGPIPE */
557  if (ret == -EPIPE)
558    kill(getpid(), SIGPIPE);
559#endif
560
561  if (written > 0)
562    return written;
563  return ret;
564}
565
566static int pipe_register_select_wakeup(
567  rtems_id *id_ptr,
568  const rtems_ioctl_select_request *request
569)
570{
571  int rv = 0;
572  rtems_id current_id = *id_ptr;
573  rtems_id request_id = request->request_task_id;
574
575  if (current_id == 0 || current_id == request_id) {
576    *id_ptr = request_id;
577  } else {
578    rv = -EINVAL;
579  }
580
581  return rv;
582}
583
584static int pipe_select(
585  pipe_control_t *pipe,
586  const rtems_ioctl_select_request *request
587)
588{
589  int rv = 0;
590
591  switch (request->kind) {
592    case RTEMS_IOCTL_SELECT_READ:
593      if (!PIPE_EMPTY(pipe)) {
594        rv = 1;
595      } else {
596        rv = pipe_register_select_wakeup(&pipe->select_read_task_id, request);
597      }
598      break;
599    case RTEMS_IOCTL_SELECT_WRITE:
600      if (PIPE_SPACE(pipe) > 0) {
601        rv = 1;
602      } else {
603        rv = pipe_register_select_wakeup(&pipe->select_write_task_id, request);
604      }
605      break;
606    default:
607      break;
608  }
609
610  return rv;
611}
612
613/*
614 * Interface to file system ioctl.
615 */
616int pipe_ioctl(
617  pipe_control_t *pipe,
618  uint32_t        cmd,
619  void           *buffer,
620  rtems_libio_t  *iop
621)
622{
623  int rv = 0;
624
625  if (buffer != NULL) {
626    if (PIPE_LOCK(pipe)) {
627      switch (cmd) {
628        case RTEMS_IOCTL_SELECT:
629          rv = pipe_select(pipe, buffer);
630          break;
631        case FIONREAD:
632          /* Return length of pipe */
633          *(unsigned int *) buffer = pipe->Length;
634          break;
635      }
636
637      PIPE_UNLOCK(pipe);
638    } else {
639      rv = -EINTR;
640    }
641  } else {
642    rv = -EFAULT;
643  }
644
645  return rv;
646}
647
648/*
649 * Interface to file system lseek.
650 */
651int pipe_lseek(
652  pipe_control_t *pipe,
653  off_t           offset,
654  int             whence,
655  rtems_libio_t  *iop
656)
657{
658  /* Seek on pipe is not supported */
659  return -ESPIPE;
660}
Note: See TracBrowser for help on using the repository browser.