source: rtems/cpukit/posix/src/mqueue.c @ 93994fdb

4.104.114.84.95
Last change on this file since 93994fdb was 93994fdb, checked in by Joel Sherrill <joel.sherrill@…>, on Apr 15, 1999 at 6:57:31 PM

Now compiles and is included in normal build even though it is untested.

Added multiprocessing conditional.

  • Property mode set to 100644
File size: 17.0 KB
Line 
1/*
2 *  NOTE:  The structure of the routines is identical to that of POSIX
3 *         Message_queues to leave the option of having unnamed message
4 *         queues at a future date.  They are currently not part of the
5 *         POSIX standard but unnamed message_queues are.  This is also
6 *         the reason for the apparently unnecessary tracking of
7 *         the process_shared attribute.  [In addition to the fact that
8 *         it would be trivial to add pshared to the mq_attr structure
9 *         and have process private message queues.]
10 *
11 *         This code ignores the O_RDONLY/O_WRONLY/O_RDWR flag at open
12 *         time.
13 *
14 *  $Id$
15 */
16
17#include <stdarg.h>
18
19#include <pthread.h>
20#include <limits.h>
21#include <errno.h>
22#include <fcntl.h>
23#include <mqueue.h>
24
25#include <rtems/system.h>
26#include <rtems/score/watchdog.h>
27#include <rtems/posix/seterr.h>
28#include <rtems/posix/mqueue.h>
29#include <rtems/posix/time.h>
30
31/*PAGE
32 *
33 *  _POSIX_Message_queue_Manager_initialization
34 *
35 *  This routine initializes all message_queue manager related data structures.
36 *
37 *  Input parameters:
38 *    maximum_message_queues - maximum configured message_queues
39 *
40 *  Output parameters:  NONE
41 */
42 
43void _POSIX_Message_queue_Manager_initialization(
44  unsigned32 maximum_message_queues
45)
46{
47  _Objects_Initialize_information(
48    &_POSIX_Message_queue_Information,
49    OBJECTS_POSIX_MESSAGE_QUEUES,
50    TRUE,
51    maximum_message_queues,
52    sizeof( POSIX_Message_queue_Control ),
53    TRUE,
54    _POSIX_PATH_MAX,
55    FALSE
56  );
57}
58
59/*PAGE
60 *
61 *  _POSIX_Message_queue_Create_support
62 */
63 
64int _POSIX_Message_queue_Create_support(
65  const char                    *name,
66  int                            pshared,
67  unsigned int                   oflag,
68  struct mq_attr                *attr,
69  POSIX_Message_queue_Control  **message_queue
70)
71{
72  POSIX_Message_queue_Control   *the_mq;
73 
74  _Thread_Disable_dispatch();
75 
76  the_mq = _POSIX_Message_queue_Allocate();
77 
78  if ( !the_mq ) {
79    _Thread_Enable_dispatch();
80    set_errno_and_return_minus_one( ENFILE );
81  }
82 
83#if defined(RTEMS_MULTIPROCESSING)
84  if ( pshared == PTHREAD_PROCESS_SHARED &&
85       !( _Objects_MP_Allocate_and_open( &_POSIX_Message_queue_Information, 0,
86                            the_mq->Object.id, FALSE ) ) ) {
87    _POSIX_Message_queue_Free( the_mq );
88    _Thread_Enable_dispatch();
89    set_errno_and_return_minus_one( ENFILE );
90  }
91#endif
92 
93  the_mq->process_shared  = pshared;
94 
95  if ( name ) {
96    the_mq->named = TRUE;
97    the_mq->open_count = 1;
98    the_mq->linked = TRUE;
99  }
100  else
101    the_mq->named = FALSE;
102 
103  if ( oflag & O_NONBLOCK ) 
104    the_mq->blocking = FALSE;
105  else
106    the_mq->blocking = TRUE;
107 
108  /* XXX
109   *
110   *  Note that this should be based on the current scheduling policy.
111   */
112
113  /* XXX
114   *
115   *  Message and waiting disciplines are not distinguished.
116   */
117/*
118  the_mq_attr->message_discipline = CORE_MESSAGE_QUEUE_DISCIPLINES_FIFO;
119  the_mq_attr->waiting_discipline = CORE_MESSAGE_QUEUE_DISCIPLINES_FIFO;
120 */
121
122  the_mq->Message_queue.Attributes.discipline =
123                                         CORE_MESSAGE_QUEUE_DISCIPLINES_FIFO;
124 
125  if ( ! _CORE_message_queue_Initialize(
126           &the_mq->Message_queue,
127           OBJECTS_POSIX_MESSAGE_QUEUES,
128           &the_mq->Message_queue.Attributes,
129           attr->mq_maxmsg,
130           attr->mq_msgsize,
131#if defined(RTEMS_MULTIPROCESSING)
132           _POSIX_Message_queue_MP_Send_extract_proxy
133#else
134           NULL
135#endif
136      ) ) {
137
138#if defined(RTEMS_MULTIPROCESSING)
139    if ( pshared == PTHREAD_PROCESS_SHARED )
140      _Objects_MP_Close( &_POSIX_Message_queue_Information, the_mq->Object.id );
141#endif
142 
143    _POSIX_Message_queue_Free( the_mq );
144    _Thread_Enable_dispatch();
145    set_errno_and_return_minus_one( ENOSPC );
146  }
147
148 
149  /* XXX - need Names to be a string!!! */
150  _Objects_Open(
151    &_POSIX_Message_queue_Information,
152    &the_mq->Object,
153    (char *) name
154  );
155 
156  *message_queue = the_mq;
157 
158#if defined(RTEMS_MULTIPROCESSING)
159  if ( pshared == PTHREAD_PROCESS_SHARED )
160    _POSIX_Message_queue_MP_Send_process_packet(
161      POSIX_MESSAGE_QUEUE_MP_ANNOUNCE_CREATE,
162      the_mq->Object.id,
163      (char *) name,
164      0                          /* Not used */
165    );
166#endif
167 
168  _Thread_Enable_dispatch();
169  return 0;
170}
171
172/*PAGE
173 *
174 *  15.2.2 Open a Message Queue, P1003.1b-1993, p. 272
175 */
176
177mqd_t mq_open(
178  const char *name,
179  int         oflag,
180  ...
181  /* mode_t mode, */
182  /* struct mq_attr  attr */
183)
184{
185  va_list arg;
186  mode_t mode;
187  struct mq_attr *attr;
188  int                        status;
189  Objects_Id                 the_mq_id;
190  POSIX_Message_queue_Control   *the_mq;
191 
192  if ( oflag & O_CREAT ) {
193    va_start(arg, oflag);
194    mode = (mode_t) va_arg( arg, mode_t * );
195    attr = (struct mq_attr *) va_arg( arg, struct mq_attr ** );
196    va_end(arg);
197  }
198 
199  status = _POSIX_Message_queue_Name_to_id( name, &the_mq_id );
200 
201  /*
202   *  If the name to id translation worked, then the message queue exists
203   *  and we can just return a pointer to the id.  Otherwise we may
204   *  need to check to see if this is a "message queue does not exist"
205   *  or some other miscellaneous error on the name.
206   */
207 
208  if ( status ) {
209 
210    if ( status == EINVAL ) {      /* name -> ID translation failed */
211      if ( !(oflag & O_CREAT) ) {  /* willing to create it? */
212        set_errno_and_return_minus_one( ENOENT );
213        return (mqd_t) -1;
214      }
215      /* we are willing to create it */
216    }
217    set_errno_and_return_minus_one( status ); /* some type of error */
218    return (mqd_t) -1;
219 
220  } else {                /* name -> ID translation succeeded */
221 
222    if ( (oflag & (O_CREAT | O_EXCL)) == (O_CREAT | O_EXCL) ) {
223      set_errno_and_return_minus_one( EEXIST );
224      return (mqd_t) -1;
225    }
226 
227    /*
228     * XXX In this case we need to do an ID->pointer conversion to
229     *     check the mode.   This is probably a good place for a subroutine.
230     */
231 
232    the_mq->open_count += 1;
233 
234    return (mqd_t)&the_mq->Object.id;
235 
236  }
237 
238  /* XXX verify this comment...
239   *
240   *  At this point, the message queue does not exist and everything has been
241   *  checked. We should go ahead and create a message queue.
242   */
243 
244  status = _POSIX_Message_queue_Create_support(
245    name,
246    TRUE,         /* shared across processes */
247    oflag,
248    attr,
249    &the_mq
250  );
251 
252  if ( status == -1 )
253    return (mqd_t) -1;
254 
255  return (mqd_t) &the_mq->Object.id;
256}
257
258/*PAGE
259 *
260 *  _POSIX_Message_queue_Delete
261 */
262 
263void _POSIX_Message_queue_Delete(
264  POSIX_Message_queue_Control *the_mq
265)
266{
267  if ( !the_mq->linked && !the_mq->open_count ) {
268    _POSIX_Message_queue_Free( the_mq );
269 
270#if defined(RTEMS_MULTIPROCESSING)
271    if ( the_mq->process_shared == PTHREAD_PROCESS_SHARED ) {
272 
273      _Objects_MP_Close(
274        &_POSIX_Message_queue_Information,
275        the_mq->Object.id
276      );
277 
278      _POSIX_Message_queue_MP_Send_process_packet(
279        POSIX_MESSAGE_QUEUE_MP_ANNOUNCE_DELETE,
280        the_mq->Object.id,
281        0,                         /* Not used */
282        0                          /* Not used */
283      );
284    }
285#endif
286 
287  }
288}
289
290/*PAGE
291 *
292 *  15.2.2 Close a Message Queue, P1003.1b-1993, p. 275
293 */
294
295int mq_close(
296  mqd_t  mqdes
297)
298{
299  register POSIX_Message_queue_Control *the_mq;
300  Objects_Locations                     location;
301 
302  the_mq = _POSIX_Message_queue_Get( mqdes, &location );
303  switch ( location ) {
304    case OBJECTS_ERROR:
305      set_errno_and_return_minus_one( EINVAL );
306    case OBJECTS_REMOTE:
307      _Thread_Dispatch();
308      return POSIX_MP_NOT_IMPLEMENTED();
309      set_errno_and_return_minus_one( EINVAL );
310    case OBJECTS_LOCAL:
311      the_mq->open_count -= 1;
312      _POSIX_Message_queue_Delete( the_mq );
313      _Thread_Enable_dispatch();
314      return 0;
315  }
316  return POSIX_BOTTOM_REACHED();
317}
318
319/*PAGE
320 *
321 *  15.2.2 Remove a Message Queue, P1003.1b-1993, p. 276
322 */
323
324int mq_unlink(
325  const char *name
326)
327{
328  int  status;
329  register POSIX_Message_queue_Control *the_mq;
330  Objects_Id                        the_mq_id;
331  Objects_Locations                 location;
332 
333  status = _POSIX_Message_queue_Name_to_id( name, &the_mq_id );
334 
335  if ( !status )
336    set_errno_and_return_minus_one( status );
337 
338  the_mq = _POSIX_Message_queue_Get( the_mq_id, &location );
339  switch ( location ) {
340    case OBJECTS_ERROR:
341      set_errno_and_return_minus_one( EINVAL );
342    case OBJECTS_REMOTE:
343      _Thread_Dispatch();
344      return POSIX_MP_NOT_IMPLEMENTED();
345      set_errno_and_return_minus_one( EINVAL );
346    case OBJECTS_LOCAL:
347 
348#if defined(RTEMS_MULTIPROCESSING)
349      _Objects_MP_Close(
350        &_POSIX_Message_queue_Information,
351        the_mq->Object.id
352      );
353#endif
354 
355      the_mq->linked = FALSE;
356 
357      _POSIX_Message_queue_Delete( the_mq );
358 
359      _Thread_Enable_dispatch();
360      return 0;
361  }
362  return POSIX_BOTTOM_REACHED();
363}
364
365/*PAGE
366 *
367 *  _POSIX_Message_queue_Send_support
368 */
369 
370int _POSIX_Message_queue_Send_support(
371  mqd_t               mqdes,
372  const char         *msg_ptr,
373  unsigned32          msg_len,
374  Priority_Control    msg_prio,
375  Watchdog_Interval   timeout
376)
377{
378  register POSIX_Message_queue_Control *the_mq;
379  Objects_Locations                     location;
380 
381  the_mq = _POSIX_Message_queue_Get( mqdes, &location );
382  switch ( location ) {
383    case OBJECTS_ERROR:
384      set_errno_and_return_minus_one( EINVAL );
385    case OBJECTS_REMOTE:
386      _Thread_Dispatch();
387      return POSIX_MP_NOT_IMPLEMENTED();
388      set_errno_and_return_minus_one( EINVAL );
389    case OBJECTS_LOCAL:
390      /* XXX must add support for timeout and priority */
391      _CORE_message_queue_Send(
392        &the_mq->Message_queue,
393        (void *) msg_ptr,
394        msg_len,
395        mqdes,
396#if defined(RTEMS_MULTIPROCESSING)
397        NULL       /* XXX _POSIX_Message_queue_Core_message_queue_mp_support*/
398#else
399        NULL
400#endif
401      );
402      _Thread_Enable_dispatch();
403      return _Thread_Executing->Wait.return_code;
404  }
405  return POSIX_BOTTOM_REACHED();
406}
407
408/*PAGE
409 *
410 *  15.2.4 Send a Message to a Message Queue, P1003.1b-1993, p. 277
411 *
412 *  NOTE: P1003.4b/D8, p. 45 adds mq_timedsend().
413 */
414
415int mq_send(
416  mqd_t         mqdes,
417  const char   *msg_ptr,
418  size_t        msg_len,
419  unsigned int  msg_prio
420)
421{
422  return _POSIX_Message_queue_Send_support(
423    mqdes,
424    msg_ptr,
425    msg_len,
426    msg_prio,
427    THREAD_QUEUE_WAIT_FOREVER
428  );
429}
430
431/*PAGE
432 *
433 *  15.2.4 Send a Message to a Message Queue, P1003.1b-1993, p. 277
434 *
435 *  NOTE: P1003.4b/D8, p. 45 adds mq_timedsend().
436 */
437
438int mq_timedsend(
439  mqd_t                  mqdes,
440  const char            *msg_ptr,
441  size_t                 msg_len,
442  unsigned int           msg_prio,
443  const struct timespec *timeout
444)
445{
446  return _POSIX_Message_queue_Send_support(
447    mqdes,
448    msg_ptr,
449    msg_len,
450    msg_prio,
451    _POSIX_Timespec_to_interval( timeout )
452  );
453}
454
455/*PAGE
456 *
457 *  _POSIX_Message_queue_Receive_support
458 */
459 
460/* XXX be careful ... watch the size going through all the layers ... */
461
462ssize_t _POSIX_Message_queue_Receive_support(
463  mqd_t               mqdes,
464  char               *msg_ptr,
465  size_t              msg_len,
466  unsigned int       *msg_prio,
467  Watchdog_Interval   timeout
468)
469{
470  register POSIX_Message_queue_Control *the_mq;
471  Objects_Locations                     location;
472  unsigned32                            status = 0;
473  unsigned32                            length_out;
474 
475  the_mq = _POSIX_Message_queue_Get( mqdes, &location );
476  switch ( location ) {
477    case OBJECTS_ERROR:
478      set_errno_and_return_minus_one( EINVAL );
479    case OBJECTS_REMOTE:
480      _Thread_Dispatch();
481      return POSIX_MP_NOT_IMPLEMENTED();
482      set_errno_and_return_minus_one( EINVAL );
483    case OBJECTS_LOCAL:
484      /* XXX need to define the options argument to this */
485      length_out = msg_len;
486      _CORE_message_queue_Seize(
487        &the_mq->Message_queue,
488        mqdes,
489        msg_ptr,
490        &length_out,
491        /* msg_prio,    XXXX */
492        the_mq->blocking,
493        timeout
494      );
495      _Thread_Enable_dispatch();
496      if ( !status )
497        return length_out;
498      /* XXX --- the return codes gotta be looked at .. fix this */
499      return _Thread_Executing->Wait.return_code;
500  }
501  return POSIX_BOTTOM_REACHED();
502}
503
504/*PAGE
505 *
506 *  15.2.5 Receive a Message From a Message Queue, P1003.1b-1993, p. 279
507 *
508 *  NOTE: P1003.4b/D8, p. 45 adds mq_timedreceive().
509 */
510
511ssize_t mq_receive(
512  mqd_t         mqdes,
513  char         *msg_ptr,
514  size_t        msg_len,
515  unsigned int *msg_prio
516)
517{
518  return _POSIX_Message_queue_Receive_support(
519    mqdes,
520    msg_ptr,
521    msg_len,
522    msg_prio,
523    THREAD_QUEUE_WAIT_FOREVER
524  );
525}
526
527/*PAGE
528 *
529 *  15.2.5 Receive a Message From a Message Queue, P1003.1b-1993, p. 279
530 *
531 *  NOTE: P1003.4b/D8, p. 45 adds mq_timedreceive().
532 */
533
534int mq_timedreceive(                  /* XXX: should this be ssize_t */
535  mqd_t                  mqdes,
536  char                  *msg_ptr,
537  size_t                 msg_len,
538  unsigned int          *msg_prio,
539  const struct timespec *timeout
540)
541{
542  return _POSIX_Message_queue_Receive_support(
543    mqdes,
544    msg_ptr,
545    msg_len,
546    msg_prio,
547    _POSIX_Timespec_to_interval( timeout )
548  );
549}
550
551/*PAGE
552 *
553 *  _POSIX_Message_queue_Notify_handler
554 *
555 */
556
557void _POSIX_Message_queue_Notify_handler(
558  void    *user_data
559)
560{
561  POSIX_Message_queue_Control *the_mq;
562
563  the_mq = user_data;
564
565  /* XXX do something with signals here!!!! */
566}
567
568/*PAGE
569 *
570 *  15.2.6 Notify Process that a Message is Available on a Queue,
571 *         P1003.1b-1993, p. 280
572 */
573
574int mq_notify(
575  mqd_t                  mqdes,
576  const struct sigevent *notification
577)
578{
579  register POSIX_Message_queue_Control *the_mq;
580  Objects_Locations                     location;
581 
582  the_mq = _POSIX_Message_queue_Get( mqdes, &location );
583  switch ( location ) {
584    case OBJECTS_ERROR:
585      set_errno_and_return_minus_one( EBADF );
586    case OBJECTS_REMOTE:
587      _Thread_Dispatch();
588      return POSIX_MP_NOT_IMPLEMENTED();
589      set_errno_and_return_minus_one( EINVAL );
590    case OBJECTS_LOCAL:
591      if ( notification ) {
592        if ( _CORE_message_queue_Is_notify_enabled( &the_mq->Message_queue ) ) {
593          _Thread_Enable_dispatch();
594          set_errno_and_return_minus_one( EBUSY );
595        }
596
597        _CORE_message_queue_Set_notify( &the_mq->Message_queue, NULL, NULL );
598
599        the_mq->notification = *notification;
600     
601        _CORE_message_queue_Set_notify(
602          &the_mq->Message_queue,
603          _POSIX_Message_queue_Notify_handler,
604          the_mq
605        );
606      } else {
607
608        _CORE_message_queue_Set_notify( &the_mq->Message_queue, NULL, NULL );
609
610      }
611
612      _Thread_Enable_dispatch();
613      return 0;
614  }
615  return POSIX_BOTTOM_REACHED();
616}
617
618/*PAGE
619 *
620 *  15.2.7 Set Message Queue Attributes, P1003.1b-1993, p. 281
621 */
622
623int mq_setattr(
624  mqd_t                 mqdes,
625  const struct mq_attr *mqstat,
626  struct mq_attr       *omqstat
627)
628{
629  register POSIX_Message_queue_Control *the_mq;
630  Objects_Locations                     location;
631  CORE_message_queue_Attributes        *the_mq_attr;
632 
633  the_mq = _POSIX_Message_queue_Get( mqdes, &location );
634  switch ( location ) {
635    case OBJECTS_ERROR:
636      set_errno_and_return_minus_one( EINVAL );
637    case OBJECTS_REMOTE:
638      _Thread_Dispatch();
639      return POSIX_MP_NOT_IMPLEMENTED();
640      set_errno_and_return_minus_one( EINVAL );
641    case OBJECTS_LOCAL:
642      /*
643       *  Return the old values.
644       */
645
646      /* XXX this is the same stuff as is in mq_getattr... and probably */
647      /* XXX should be in an inlined private routine */
648
649      the_mq_attr = &the_mq->Message_queue.Attributes;
650
651      omqstat->mq_flags   = the_mq->flags;
652      omqstat->mq_msgsize = the_mq->Message_queue.maximum_message_size;
653      omqstat->mq_maxmsg  = the_mq->Message_queue.maximum_pending_messages;
654      omqstat->mq_curmsgs = the_mq->Message_queue.number_of_pending_messages;
655 
656      /*
657       *  Ignore everything except the O_NONBLOCK bit.
658       */
659
660      if (  mqstat->mq_flags & O_NONBLOCK ) 
661        the_mq->blocking = FALSE;
662      else
663        the_mq->blocking = TRUE;
664 
665      the_mq->flags = mqstat->mq_flags;
666
667      _Thread_Enable_dispatch();
668      return 0;
669  }
670  return POSIX_BOTTOM_REACHED();
671}
672
673/*PAGE
674 *
675 *  15.2.8 Get Message Queue Attributes, P1003.1b-1993, p. 283
676 */
677
678int mq_getattr(
679  mqd_t           mqdes,
680  struct mq_attr *mqstat
681)
682{
683  register POSIX_Message_queue_Control *the_mq;
684  Objects_Locations                     location;
685  CORE_message_queue_Attributes        *the_mq_attr;
686 
687  the_mq = _POSIX_Message_queue_Get( mqdes, &location );
688  switch ( location ) {
689    case OBJECTS_ERROR:
690      set_errno_and_return_minus_one( EINVAL );
691    case OBJECTS_REMOTE:
692      _Thread_Dispatch();
693      return POSIX_MP_NOT_IMPLEMENTED();
694      set_errno_and_return_minus_one( EINVAL );
695    case OBJECTS_LOCAL:
696      /*
697       *  Return the old values.
698       */
699 
700      /* XXX this is the same stuff as is in mq_setattr... and probably */
701      /* XXX should be in an inlined private routine */
702 
703      the_mq_attr = &the_mq->Message_queue.Attributes;
704 
705      mqstat->mq_flags   = the_mq->flags;
706      mqstat->mq_msgsize = the_mq->Message_queue.maximum_message_size;
707      mqstat->mq_maxmsg  = the_mq->Message_queue.maximum_pending_messages;
708      mqstat->mq_curmsgs = the_mq->Message_queue.number_of_pending_messages;
709 
710      _Thread_Enable_dispatch();
711      return 0;
712  }
713  return POSIX_BOTTOM_REACHED();
714}
Note: See TracBrowser for help on using the repository browser.