source: rtems/c/src/exec/rtems/src/msg.c @ 13f09e6

4.104.114.84.95
Last change on this file since 13f09e6 was e7d8b58, checked in by Joel Sherrill <joel.sherrill@…>, on 07/31/97 at 19:01:42

Added rtems_message_queue_get_number_pending directive.

  • Property mode set to 100644
File size: 20.2 KB
Line 
1/*
2 *  Message Queue Manager
3 *
4 *
5 *  COPYRIGHT (c) 1989-1997.
6 *  On-Line Applications Research Corporation (OAR).
7 *  Copyright assigned to U.S. Government, 1994.
8 *
9 *  The license and distribution terms for this file may in
10 *  the file LICENSE in this distribution or at
11 *  http://www.OARcorp.com/rtems/license.html.
12 *
13 *  $Id$
14 */
15
16#include <rtems/system.h>
17#include <rtems/score/sysstate.h>
18#include <rtems/score/chain.h>
19#include <rtems/score/isr.h>
20#include <rtems/score/coremsg.h>
21#include <rtems/score/object.h>
22#include <rtems/score/states.h>
23#include <rtems/score/thread.h>
24#include <rtems/score/wkspace.h>
25#include <rtems/score/mpci.h>
26#include <rtems/rtems/status.h>
27#include <rtems/rtems/attr.h>
28#include <rtems/rtems/message.h>
29#include <rtems/rtems/options.h>
30#include <rtems/rtems/support.h>
31
32/*PAGE
33 *
34 *  _Message_queue_Manager_initialization
35 *
36 *  This routine initializes all message queue manager related
37 *  data structures.
38 *
39 *  Input parameters:
40 *    maximum_message_queues - number of message queues to initialize
41 *
42 *  Output parameters:  NONE
43 */
44
45void _Message_queue_Manager_initialization(
46  unsigned32 maximum_message_queues
47)
48{
49  _Objects_Initialize_information(
50    &_Message_queue_Information,
51    OBJECTS_RTEMS_MESSAGE_QUEUES,
52    TRUE,
53    maximum_message_queues,
54    sizeof( Message_queue_Control ),
55    FALSE,
56    RTEMS_MAXIMUM_NAME_LENGTH,
57    FALSE
58  );
59
60  /*
61   *  Register the MP Process Packet routine.
62   */
63
64  _MPCI_Register_packet_processor(
65    MP_PACKET_MESSAGE_QUEUE,
66    _Message_queue_MP_Process_packet
67  );
68
69}
70
71/*PAGE
72 *
73 *  _Message_queue_Allocate
74 *
75 *  Allocate a message queue and the space for its messages
76 *
77 *  Input parameters:
78 *    the_message_queue - the message queue to allocate message buffers
79 *    count             - maximum message and reserved buffer count
80 *    max_message_size  - maximum size of each message
81 *
82 *  Output parameters:
83 *    the_message_queue - set if successful, NULL otherwise
84 */
85
86Message_queue_Control *_Message_queue_Allocate (
87  unsigned32           count,
88  unsigned32           max_message_size
89)
90{
91  return
92    (Message_queue_Control *)_Objects_Allocate(&_Message_queue_Information);
93
94}
95
96/*PAGE
97 *
98 *  rtems_message_queue_create
99 *
100 *  This directive creates a message queue by allocating and initializing
101 *  a message queue data structure.
102 *
103 *  Input parameters:
104 *    name             - user defined queue name
105 *    count            - maximum message and reserved buffer count
106 *    max_message_size - maximum size of each message
107 *    attribute_set    - process method
108 *    id               - pointer to queue
109 *
110 *  Output parameters:
111 *    id                - queue id
112 *    RTEMS_SUCCESSFUL  - if successful
113 *    error code        - if unsuccessful
114 */
115
116rtems_status_code rtems_message_queue_create(
117  rtems_name          name,
118  unsigned32          count,
119  unsigned32          max_message_size,
120  rtems_attribute     attribute_set,
121  Objects_Id         *id
122)
123{
124  register Message_queue_Control *the_message_queue;
125  CORE_message_queue_Attributes   the_message_queue_attributes;
126  boolean                         is_global;
127
128  if ( !rtems_is_name_valid( name ) )
129    return RTEMS_INVALID_NAME;
130
131  if ( (is_global = _Attributes_Is_global( attribute_set ) ) &&
132       !_System_state_Is_multiprocessing )
133    return RTEMS_MP_NOT_CONFIGURED;
134
135  if (count == 0)
136      return RTEMS_INVALID_NUMBER;
137
138  if (max_message_size == 0)
139      return RTEMS_INVALID_SIZE;
140
141#if 1
142  /*
143   * I am not 100% sure this should be an error.
144   * It seems reasonable to create a que with a large max size,
145   * and then just send smaller msgs from remote (or all) nodes.
146   */
147 
148  if ( is_global && (_MPCI_table->maximum_packet_size < max_message_size) )
149    return RTEMS_INVALID_SIZE;
150
151#endif
152       
153  _Thread_Disable_dispatch();              /* protects object pointer */
154
155  the_message_queue = _Message_queue_Allocate( count, max_message_size );
156
157  if ( !the_message_queue ) {
158    _Thread_Enable_dispatch();
159    return RTEMS_TOO_MANY;
160  }
161
162  if ( is_global &&
163    !( _Objects_MP_Allocate_and_open( &_Message_queue_Information,
164                              name, the_message_queue->Object.id, FALSE ) ) ) {
165    _Message_queue_Free( the_message_queue );
166    _Thread_Enable_dispatch();
167    return RTEMS_TOO_MANY;
168  }
169
170  the_message_queue->attribute_set = attribute_set;
171
172  if (_Attributes_Is_priority( attribute_set ) )
173    the_message_queue_attributes.discipline =
174                                      CORE_MESSAGE_QUEUE_DISCIPLINES_PRIORITY;
175  else
176    the_message_queue_attributes.discipline =
177                                      CORE_MESSAGE_QUEUE_DISCIPLINES_FIFO;
178
179  if ( ! _CORE_message_queue_Initialize(
180           &the_message_queue->message_queue,
181           OBJECTS_RTEMS_MESSAGE_QUEUES,
182           &the_message_queue_attributes,
183           count,
184           max_message_size,
185           _Message_queue_MP_Send_extract_proxy ) ) {
186    if ( is_global )
187        _Objects_MP_Close(
188          &_Message_queue_Information, the_message_queue->Object.id);
189
190    _Message_queue_Free( the_message_queue );
191    _Thread_Enable_dispatch();
192    return RTEMS_TOO_MANY;
193  }
194
195  _Objects_Open(
196    &_Message_queue_Information,
197    &the_message_queue->Object,
198    &name
199  );
200
201  *id = the_message_queue->Object.id;
202
203  if ( is_global )
204    _Message_queue_MP_Send_process_packet(
205      MESSAGE_QUEUE_MP_ANNOUNCE_CREATE,
206      the_message_queue->Object.id,
207      name,
208      0
209    );
210
211  _Thread_Enable_dispatch();
212  return RTEMS_SUCCESSFUL;
213}
214
215/*PAGE
216 *
217 *  rtems_message_queue_ident
218 *
219 *  This directive returns the system ID associated with
220 *  the message queue name.
221 *
222 *  Input parameters:
223 *    name - user defined message queue name
224 *    node - node(s) to be searched
225 *    id   - pointer to message queue id
226 *
227 *  Output parameters:
228 *    *id               - message queue id
229 *    RTEMS_SUCCESSFUL - if successful
230 *    error code        - if unsuccessful
231 */
232
233rtems_status_code rtems_message_queue_ident(
234  rtems_name    name,
235  unsigned32    node,
236  Objects_Id   *id
237)
238{
239  Objects_Name_to_id_errors  status;
240
241  status = _Objects_Name_to_id(
242    &_Message_queue_Information,
243    &name,
244    node,
245    id
246  );
247
248  return _Status_Object_name_errors_to_status[ status ];
249}
250
251/*PAGE
252 *
253 *  rtems_message_queue_delete
254 *
255 *  This directive allows a thread to delete the message queue specified
256 *  by the given queue identifier.
257 *
258 *  Input parameters:
259 *    id - queue id
260 *
261 *  Output parameters:
262 *    RTEMS_SUCCESSFUL - if successful
263 *    error code        - if unsuccessful
264 */
265
266rtems_status_code rtems_message_queue_delete(
267  Objects_Id id
268)
269{
270  register Message_queue_Control *the_message_queue;
271  Objects_Locations               location;
272
273  the_message_queue = _Message_queue_Get( id, &location );
274  switch ( location ) {
275    case OBJECTS_ERROR:
276      return RTEMS_INVALID_ID;
277    case OBJECTS_REMOTE:
278      _Thread_Dispatch();
279      return RTEMS_ILLEGAL_ON_REMOTE_OBJECT;
280    case OBJECTS_LOCAL:
281      _Objects_Close( &_Message_queue_Information,
282                      &the_message_queue->Object );
283
284      _CORE_message_queue_Close(
285        &the_message_queue->message_queue,
286        _Message_queue_MP_Send_object_was_deleted,
287        CORE_MESSAGE_QUEUE_STATUS_WAS_DELETED
288      );
289
290      _Message_queue_Free( the_message_queue );
291
292      if ( _Attributes_Is_global( the_message_queue->attribute_set ) ) {
293        _Objects_MP_Close(
294          &_Message_queue_Information,
295          the_message_queue->Object.id
296        );
297
298        _Message_queue_MP_Send_process_packet(
299          MESSAGE_QUEUE_MP_ANNOUNCE_DELETE,
300          the_message_queue->Object.id,
301          0,                                 /* Not used */
302          0
303        );
304      }
305
306      _Thread_Enable_dispatch();
307      return RTEMS_SUCCESSFUL;
308  }
309
310  return RTEMS_INTERNAL_ERROR;   /* unreached - only to remove warnings */
311}
312
313/*PAGE
314 *
315 *  rtems_message_queue_send
316 *
317 *  This routine implements the directives q_send.  It sends a
318 *  message to the specified message queue.
319 *
320 *  Input parameters:
321 *    id     - pointer to message queue
322 *    buffer - pointer to message buffer
323 *    size   - size of message to sent urgently
324 *
325 *  Output parameters:
326 *    RTEMS_SUCCESSFUL - if successful
327 *    error code        - if unsuccessful
328 */
329
330rtems_status_code rtems_message_queue_send(
331  Objects_Id            id,
332  void                 *buffer,
333  unsigned32            size
334)
335{
336  return( _Message_queue_Submit(id, buffer, size, MESSAGE_QUEUE_SEND_REQUEST) );
337}
338
339/*PAGE
340 *
341 *  rtems_message_queue_urgent
342 *
343 *  This routine implements the directives q_urgent.  It urgents a
344 *  message to the specified message queue.
345 *
346 *  Input parameters:
347 *    id     - pointer to message queue
348 *    buffer - pointer to message buffer
349 *    size   - size of message to sent urgently
350 *
351 *  Output parameters:
352 *    RTEMS_SUCCESSFUL - if successful
353 *    error code       - if unsuccessful
354 */
355
356rtems_status_code rtems_message_queue_urgent(
357  Objects_Id            id,
358  void                 *buffer,
359  unsigned32            size
360)
361{
362  return(_Message_queue_Submit(id, buffer, size, MESSAGE_QUEUE_URGENT_REQUEST));
363}
364
365/*PAGE
366 *
367 *  rtems_message_queue_broadcast
368 *
369 *  This directive sends a message for every thread waiting on the queue
370 *  designated by id.
371 *
372 *  Input parameters:
373 *    id     - pointer to message queue
374 *    buffer - pointer to message buffer
375 *    size   - size of message to broadcast
376 *    count  - pointer to area to store number of threads made ready
377 *
378 *  Output parameters:
379 *    count             - number of threads made ready
380 *    RTEMS_SUCCESSFUL  - if successful
381 *    error code        - if unsuccessful
382 */
383
384rtems_status_code rtems_message_queue_broadcast(
385  Objects_Id            id,
386  void                 *buffer,
387  unsigned32            size,
388  unsigned32           *count
389)
390{
391  register Message_queue_Control *the_message_queue;
392  Objects_Locations               location;
393  CORE_message_queue_Status       core_status;
394
395  the_message_queue = _Message_queue_Get( id, &location );
396  switch ( location ) {
397    case OBJECTS_ERROR:
398      return RTEMS_INVALID_ID;
399    case OBJECTS_REMOTE:
400      _Thread_Executing->Wait.return_argument = count;
401
402      return
403        _Message_queue_MP_Send_request_packet(
404          MESSAGE_QUEUE_MP_BROADCAST_REQUEST,
405          id,
406          buffer,
407          &size,
408          0,                               /* option_set not used */
409          MPCI_DEFAULT_TIMEOUT
410        );
411
412    case OBJECTS_LOCAL:
413      core_status = _CORE_message_queue_Broadcast(
414                      &the_message_queue->message_queue,
415                      buffer,
416                      size,
417                      id,
418                      _Message_queue_Core_message_queue_mp_support,
419                      count
420                    );
421                     
422      _Thread_Enable_dispatch();
423      return
424        _Message_queue_Translate_core_message_queue_return_code( core_status );
425
426  }
427  return RTEMS_INTERNAL_ERROR;   /* unreached - only to remove warnings */
428}
429
430/*PAGE
431 *
432 *  rtems_message_queue_receive
433 *
434 *  This directive dequeues a message from the designated message queue
435 *  and copies it into the requesting thread's buffer.
436 *
437 *  Input parameters:
438 *    id         - queue id
439 *    buffer     - pointer to message buffer
440 *    size       - size of message receive
441 *    option_set - options on receive
442 *    timeout    - number of ticks to wait
443 *
444 *  Output parameters:
445 *    RTEMS_SUCCESSFUL - if successful
446 *    error code       - if unsuccessful
447 */
448
449rtems_status_code rtems_message_queue_receive(
450  Objects_Id            id,
451  void                 *buffer,
452  unsigned32           *size,
453  unsigned32            option_set,
454  rtems_interval        timeout
455)
456{
457  register Message_queue_Control *the_message_queue;
458  Objects_Locations               location;
459  boolean                         wait;
460
461  the_message_queue = _Message_queue_Get( id, &location );
462  switch ( location ) {
463
464    case OBJECTS_ERROR:
465      return RTEMS_INVALID_ID;
466
467    case OBJECTS_REMOTE:
468      return _Message_queue_MP_Send_request_packet(
469          MESSAGE_QUEUE_MP_RECEIVE_REQUEST,
470          id,
471          buffer,
472          size,
473          option_set,
474          timeout
475        );
476
477    case OBJECTS_LOCAL:
478      if ( _Options_Is_no_wait( option_set ) )
479        wait = FALSE;
480      else
481        wait = TRUE;
482 
483      _CORE_message_queue_Seize(
484        &the_message_queue->message_queue,
485        the_message_queue->Object.id,
486        buffer,
487        size,
488        wait,
489        timeout
490      );
491      _Thread_Enable_dispatch();
492      return( _Message_queue_Translate_core_message_queue_return_code(
493                  _Thread_Executing->Wait.return_code ) );
494
495  }
496
497  return RTEMS_INTERNAL_ERROR;   /* unreached - only to remove warnings */
498}
499
500/*PAGE
501 *
502 *  rtems_message_queue_flush
503 *
504 *  This directive removes all pending messages from a queue and returns
505 *  the number of messages removed.  If no messages were present then
506 *  a count of zero is returned.
507 *
508 *  Input parameters:
509 *    id    - queue id
510 *    count - return area for count
511 *
512 *  Output parameters:
513 *    count             - number of messages removed ( 0 = empty queue )
514 *    RTEMS_SUCCESSFUL - if successful
515 *    error code        - if unsuccessful
516 */
517
518rtems_status_code rtems_message_queue_flush(
519  Objects_Id  id,
520  unsigned32 *count
521)
522{
523  register Message_queue_Control *the_message_queue;
524  Objects_Locations               location;
525
526  the_message_queue = _Message_queue_Get( id, &location );
527  switch ( location ) {
528    case OBJECTS_ERROR:
529      return RTEMS_INVALID_ID;
530    case OBJECTS_REMOTE:
531      _Thread_Executing->Wait.return_argument = count;
532
533      return
534        _Message_queue_MP_Send_request_packet(
535          MESSAGE_QUEUE_MP_FLUSH_REQUEST,
536          id,
537          0,                               /* buffer not used */
538          0,                               /* size */
539          0,                               /* option_set not used */
540          MPCI_DEFAULT_TIMEOUT
541        );
542
543    case OBJECTS_LOCAL:
544      *count = _CORE_message_queue_Flush( &the_message_queue->message_queue );
545      _Thread_Enable_dispatch();
546      return RTEMS_SUCCESSFUL;
547  }
548
549  return RTEMS_INTERNAL_ERROR;   /* unreached - only to remove warnings */
550}
551
552/*PAGE
553 *
554 *  rtems_message_queue_get_number_pending
555 *
556 *  This directive returns the number of messages pending.
557 *
558 *  Input parameters:
559 *    id    - queue id
560 *    count - return area for count
561 *
562 *  Output parameters:
563 *    count             - number of messages removed ( 0 = empty queue )
564 *    RTEMS_SUCCESSFUL - if successful
565 *    error code        - if unsuccessful
566 */
567
568rtems_status_code rtems_message_queue_get_number_pending(
569  Objects_Id  id,
570  unsigned32 *count
571)
572{
573  register Message_queue_Control *the_message_queue;
574  Objects_Locations               location;
575
576  the_message_queue = _Message_queue_Get( id, &location );
577  switch ( location ) {
578    case OBJECTS_ERROR:
579      return RTEMS_INVALID_ID;
580    case OBJECTS_REMOTE:
581      _Thread_Executing->Wait.return_argument = count;
582
583      return
584        _Message_queue_MP_Send_request_packet(
585          MESSAGE_QUEUE_MP_GET_NUMBER_PENDING_REQUEST,
586          id,
587          0,                               /* buffer not used */
588          0,                               /* size */
589          0,                               /* option_set not used */
590          MPCI_DEFAULT_TIMEOUT
591        );
592
593    case OBJECTS_LOCAL:
594      *count = the_message_queue->message_queue.number_of_pending_messages;
595      _Thread_Enable_dispatch();
596      return RTEMS_SUCCESSFUL;
597  }
598
599  return RTEMS_INTERNAL_ERROR;   /* unreached - only to remove warnings */
600}
601
602/*PAGE
603 *
604 *  _Message_queue_Submit
605 *
606 *  This routine implements the directives rtems_message_queue_send
607 *  and rtems_message_queue_urgent.  It processes a message that is
608 *  to be submitted to the designated message queue.  The message will
609 *  either be processed as a send send message which it will be inserted
610 *  at the rear of the queue or it will be processed as an urgent message
611 *  which will be inserted at the front of the queue.
612 *
613 *  Input parameters:
614 *    id          - pointer to message queue
615 *    buffer      - pointer to message buffer
616 *    size        - size in bytes of message to send
617 *    submit_type - send or urgent message
618 *
619 *  Output parameters:
620 *    RTEMS_SUCCESSFUL - if successful
621 *    error code       - if unsuccessful
622 */
623
624rtems_status_code _Message_queue_Submit(
625  Objects_Id                  id,
626  void                       *buffer,
627  unsigned32                  size,
628  Message_queue_Submit_types  submit_type
629)
630{
631  register Message_queue_Control  *the_message_queue;
632  Objects_Locations                location;
633  CORE_message_queue_Status        core_status;
634
635  the_message_queue = _Message_queue_Get( id, &location );
636  switch ( location )
637  {
638    case OBJECTS_ERROR:
639      return RTEMS_INVALID_ID;
640
641    case OBJECTS_REMOTE:
642      switch ( submit_type ) {
643        case MESSAGE_QUEUE_SEND_REQUEST:
644          return
645            _Message_queue_MP_Send_request_packet(
646              MESSAGE_QUEUE_MP_SEND_REQUEST,
647              id,
648              buffer,
649              &size,
650              0,                               /* option_set */
651              MPCI_DEFAULT_TIMEOUT
652            );
653
654        case MESSAGE_QUEUE_URGENT_REQUEST:
655          return
656            _Message_queue_MP_Send_request_packet(
657              MESSAGE_QUEUE_MP_URGENT_REQUEST,
658              id,
659              buffer,
660              &size,
661              0,                               /* option_set */
662              MPCI_DEFAULT_TIMEOUT
663            );
664      }
665
666    case OBJECTS_LOCAL:
667      switch ( submit_type ) {
668        case MESSAGE_QUEUE_SEND_REQUEST:
669          core_status = _CORE_message_queue_Send(
670                          &the_message_queue->message_queue,
671                          buffer,
672                          size,
673                          id,
674                          _Message_queue_Core_message_queue_mp_support
675                        );
676          break;
677        case MESSAGE_QUEUE_URGENT_REQUEST:
678          core_status = _CORE_message_queue_Urgent(
679                          &the_message_queue->message_queue,
680                          buffer,
681                          size,
682                          id,
683                          _Message_queue_Core_message_queue_mp_support
684                        );
685          break;
686        default:
687          core_status = CORE_MESSAGE_QUEUE_STATUS_SUCCESSFUL;
688          return RTEMS_INTERNAL_ERROR;   /* should never get here */
689      }
690
691      _Thread_Enable_dispatch();
692      return _Message_queue_Translate_core_message_queue_return_code(
693                core_status );
694         
695  }
696  return RTEMS_INTERNAL_ERROR;   /* unreached - only to remove warnings */
697}
698
699/*PAGE
700 *
701 *  _Message_queue_Translate_core_message_queue_return_code
702 *
703 *  Input parameters:
704 *    the_message_queue_status - message_queue status code to translate
705 *
706 *  Output parameters:
707 *    rtems status code - translated RTEMS status code
708 *
709 */
710 
711rtems_status_code _Message_queue_Translate_core_message_queue_return_code (
712  unsigned32 the_message_queue_status
713)
714{
715  switch ( the_message_queue_status ) {
716    case  CORE_MESSAGE_QUEUE_STATUS_SUCCESSFUL:
717      return RTEMS_SUCCESSFUL;
718    case  CORE_MESSAGE_QUEUE_STATUS_INVALID_SIZE:
719      return RTEMS_INVALID_SIZE;
720    case  CORE_MESSAGE_QUEUE_STATUS_TOO_MANY:
721      return RTEMS_TOO_MANY;
722    case CORE_MESSAGE_QUEUE_STATUS_UNSATISFIED:
723      return RTEMS_UNSATISFIED;
724    case CORE_MESSAGE_QUEUE_STATUS_UNSATISFIED_NOWAIT:
725      return RTEMS_UNSATISFIED;
726    case CORE_MESSAGE_QUEUE_STATUS_WAS_DELETED:
727      return RTEMS_OBJECT_WAS_DELETED;
728    case CORE_MESSAGE_QUEUE_STATUS_TIMEOUT:
729      return RTEMS_TIMEOUT;
730    case THREAD_STATUS_PROXY_BLOCKING:
731      return RTEMS_PROXY_BLOCKING;
732  }
733  _Internal_error_Occurred(         /* XXX */
734    INTERNAL_ERROR_RTEMS_API,
735    TRUE,
736    the_message_queue_status
737  );
738  return RTEMS_INTERNAL_ERROR;   /* unreached - only to remove warnings */
739}
740
741/*PAGE
742 *
743 *  _Message_queue_Core_message_queue_mp_support
744 *
745 *  Input parameters:
746 *    the_thread - the remote thread the message was submitted to
747 *    id         - id of the message queue
748 *
749 *  Output parameters: NONE
750 */
751 
752void  _Message_queue_Core_message_queue_mp_support (
753  Thread_Control *the_thread,
754  Objects_Id      id
755)
756{
757  the_thread->receive_packet->return_code = RTEMS_SUCCESSFUL;
758 
759  _Message_queue_MP_Send_response_packet(
760    MESSAGE_QUEUE_MP_RECEIVE_RESPONSE,
761    id,
762    the_thread
763  );
764}
Note: See TracBrowser for help on using the repository browser.