source: rtems/cpukit/rtems/src/msg.c @ ac7d5ef0

4.104.114.84.95
Last change on this file since ac7d5ef0 was ac7d5ef0, checked in by Joel Sherrill <joel.sherrill@…>, on May 11, 1995 at 5:39:37 PM

Initial revision

  • Property mode set to 100644
File size: 19.0 KB
Line 
1/*
2 *  Message Queue Manager
3 *
4 *
5 *  COPYRIGHT (c) 1989, 1990, 1991, 1992, 1993, 1994.
6 *  On-Line Applications Research Corporation (OAR).
7 *  All rights assigned to U.S. Government, 1994.
8 *
9 *  This material may be reproduced by or for the U.S. Government pursuant
10 *  to the copyright license under the clause at DFARS 252.227-7013.  This
11 *  notice must appear in all copies of this file and its derivatives.
12 *
13 *  $Id$
14 */
15
16#include <rtems/system.h>
17#include <rtems/attr.h>
18#include <rtems/chain.h>
19#include <rtems/config.h>
20#include <rtems/isr.h>
21#include <rtems/message.h>
22#include <rtems/object.h>
23#include <rtems/options.h>
24#include <rtems/states.h>
25#include <rtems/thread.h>
26#include <rtems/wkspace.h>
27#include <rtems/mpci.h>
28
29/*PAGE
30 *
31 *  _Message_queue_Manager_initialization
32 *
33 *  This routine initializes all message queue manager related
34 *  data structures.
35 *
36 *  Input parameters:
37 *    maximum_message_queues - number of message queues to initialize
38 *    maximum_message        - number of messages per queue
39 *
40 *  Output parameters:  NONE
41 */
42
43void _Message_queue_Manager_initialization(
44  unsigned32 maximum_message_queues,
45  unsigned32 maximum_messages
46)
47{
48
49  _Objects_Initialize_information(
50    &_Message_queue_Information,
51    TRUE,
52    maximum_message_queues,
53    sizeof( Message_queue_Control )
54  );
55
56  if ( maximum_messages == 0 ) {
57
58    _Chain_Initialize_empty( &_Message_queue_Inactive_messages );
59
60  } else {
61
62
63    _Chain_Initialize(
64      &_Message_queue_Inactive_messages,
65      _Workspace_Allocate_or_fatal_error(
66        maximum_messages * sizeof( Message_queue_Buffer_control )
67      ),
68      maximum_messages,
69      sizeof( Message_queue_Buffer_control )
70    );
71
72  }
73}
74
75/*PAGE
76 *
77 *  rtems_message_queue_create
78 *
79 *  This directive creates a message queue by allocating and initializing
80 *  a message queue data structure.
81 *
82 *  Input parameters:
83 *    name          - user defined queue name
84 *    count         - maximum message and reserved buffer count
85 *    attribute_set - process method
86 *    id            - pointer to queue
87 *
88 *  Output parameters:
89 *    id                - queue id
90 *    RTEMS_SUCCESSFUL - if successful
91 *    error code        - if unsuccessful
92 */
93
94rtems_status_code rtems_message_queue_create(
95  Objects_Name        name,
96  unsigned32          count,
97  rtems_attribute  attribute_set,
98  Objects_Id         *id
99)
100{
101  register Message_queue_Control *the_message_queue;
102
103  if ( !_Objects_Is_name_valid( name ) )
104    return ( RTEMS_INVALID_NAME );
105
106  if ( _Attributes_Is_global( attribute_set ) &&
107       !_Configuration_Is_multiprocessing() )
108    return( RTEMS_MP_NOT_CONFIGURED );
109
110  _Thread_Disable_dispatch();              /* protects object pointer */
111
112  the_message_queue = _Message_queue_Allocate();
113
114  if ( !the_message_queue ) {
115    _Thread_Enable_dispatch();
116    return( RTEMS_TOO_MANY );
117  }
118
119  if ( _Attributes_Is_global( attribute_set ) &&
120       !( _Objects_MP_Open( &_Message_queue_Information, name,
121                            the_message_queue->Object.id, FALSE ) ) ) {
122    _Message_queue_Free( the_message_queue );
123    _Thread_Enable_dispatch();
124    return( RTEMS_TOO_MANY );
125  }
126
127  if ( _Attributes_Is_limit( attribute_set ) )
128    the_message_queue->maximum_pending_messages = count;
129  else
130    the_message_queue->maximum_pending_messages = 0xffffffff;
131
132  the_message_queue->attribute_set              = attribute_set;
133  the_message_queue->number_of_pending_messages = 0;
134
135  _Chain_Initialize_empty( &the_message_queue->Pending_messages );
136
137  _Thread_queue_Initialize( &the_message_queue->Wait_queue, attribute_set,
138                            STATES_WAITING_FOR_MESSAGE );
139
140  _Objects_Open( &_Message_queue_Information,
141                 &the_message_queue->Object, name );
142
143  *id = the_message_queue->Object.id;
144
145  if ( _Attributes_Is_global( attribute_set ) )
146    _Message_queue_MP_Send_process_packet(
147      MESSAGE_QUEUE_MP_ANNOUNCE_CREATE,
148      the_message_queue->Object.id,
149      name,
150      0
151    );
152
153  _Thread_Enable_dispatch();
154  return( RTEMS_SUCCESSFUL );
155}
156
157/*PAGE
158 *
159 *  rtems_message_queue_ident
160 *
161 *  This directive returns the system ID associated with
162 *  the message queue name.
163 *
164 *  Input parameters:
165 *    name - user defined message queue name
166 *    node - node(s) to be searched
167 *    id   - pointer to message queue id
168 *
169 *  Output parameters:
170 *    *id               - message queue id
171 *    RTEMS_SUCCESSFUL - if successful
172 *    error code        - if unsuccessful
173 */
174
175rtems_status_code rtems_message_queue_ident(
176  Objects_Name  name,
177  unsigned32    node,
178  Objects_Id   *id
179)
180{
181  return( _Objects_Name_to_id( &_Message_queue_Information, name,
182                               node, id ) );
183}
184
185/*PAGE
186 *
187 *  rtems_message_queue_delete
188 *
189 *  This directive allows a thread to delete the message queue specified
190 *  by the given queue identifier.
191 *
192 *  Input parameters:
193 *    id - queue id
194 *
195 *  Output parameters:
196 *    RTEMS_SUCCESSFUL - if successful
197 *    error code        - if unsuccessful
198 */
199
200rtems_status_code rtems_message_queue_delete(
201  Objects_Id id
202)
203{
204  register Message_queue_Control *the_message_queue;
205  Objects_Locations               location;
206
207  the_message_queue = _Message_queue_Get( id, &location );
208  switch ( location ) {
209    case OBJECTS_ERROR:
210      return( RTEMS_INVALID_ID );
211    case OBJECTS_REMOTE:
212      _Thread_Dispatch();
213      return( RTEMS_ILLEGAL_ON_REMOTE_OBJECT );
214    case OBJECTS_LOCAL:
215      _Objects_Close( &_Message_queue_Information,
216                      &the_message_queue->Object );
217
218      if ( the_message_queue->number_of_pending_messages != 0 )
219        (void) _Message_queue_Flush_support( the_message_queue );
220      else
221        _Thread_queue_Flush(
222          &the_message_queue->Wait_queue,
223          _Message_queue_MP_Send_object_was_deleted
224        );
225
226      _Message_queue_Free( the_message_queue );
227
228      if ( _Attributes_Is_global( the_message_queue->attribute_set ) ) {
229        _Objects_MP_Close(
230          &_Message_queue_Information,
231          the_message_queue->Object.id
232        );
233
234        _Message_queue_MP_Send_process_packet(
235          MESSAGE_QUEUE_MP_ANNOUNCE_DELETE,
236          the_message_queue->Object.id,
237          0,                                 /* Not used */
238          MPCI_DEFAULT_TIMEOUT
239        );
240      }
241
242      _Thread_Enable_dispatch();
243      return( RTEMS_SUCCESSFUL );
244  }
245
246  return( RTEMS_INTERNAL_ERROR );   /* unreached - only to remove warnings */
247}
248
249/*PAGE
250 *
251 *  rtems_message_queue_send
252 *
253 *  This routine implements the directives q_send.  It sends a
254 *  message to the specified message queue.
255 *
256 *  Input parameters:
257 *    id     - pointer to message queue
258 *    buffer - pointer to message buffer
259 *
260 *  Output parameters:
261 *    RTEMS_SUCCESSFUL - if successful
262 *    error code        - if unsuccessful
263 */
264
265rtems_status_code rtems_message_queue_send(
266  Objects_Id            id,
267  void                 *buffer
268)
269{
270  return( _Message_queue_Submit(
271      id,
272      (Message_queue_Buffer *) buffer,
273      MESSAGE_QUEUE_SEND_REQUEST
274    )
275  );
276}
277
278/*PAGE
279 *
280 *  rtems_message_queue_urgent
281 *
282 *  This routine implements the directives q_urgent.  It urgents a
283 *  message to the specified message queue.
284 *
285 *  Input parameters:
286 *    id     - pointer to message queue
287 *    buffer - pointer to message buffer
288 *
289 *  Output parameters:
290 *    RTEMS_SUCCESSFUL - if successful
291 *    error code - if unsuccessful
292 */
293
294rtems_status_code rtems_message_queue_urgent(
295  Objects_Id            id,
296  void                 *buffer
297)
298{
299  return( _Message_queue_Submit(
300      id,
301      (Message_queue_Buffer *) buffer,
302      MESSAGE_QUEUE_URGENT_REQUEST
303    )
304  );
305}
306
307/*PAGE
308 *
309 *  rtems_message_queue_broadcast
310 *
311 *  This directive sends a message for every thread waiting on the queue
312 *  designated by id.
313 *
314 *  Input parameters:
315 *    id     - pointer to message queue
316 *    buffer - pointer to message buffer
317 *    count  - pointer to area to store number of threads made ready
318 *
319 *  Output parameters:
320 *    count             - number of threads made ready
321 *    RTEMS_SUCCESSFUL - if successful
322 *    error code        - if unsuccessful
323 */
324
325rtems_status_code rtems_message_queue_broadcast(
326  Objects_Id            id,
327  void                 *buffer,
328  unsigned32           *count
329)
330{
331  register Message_queue_Control *the_message_queue;
332  Objects_Locations               location;
333  Thread_Control                 *the_thread;
334  unsigned32                      number_broadcasted;
335
336  the_message_queue = _Message_queue_Get( id, &location );
337  switch ( location ) {
338    case OBJECTS_ERROR:
339      return( RTEMS_INVALID_ID );
340    case OBJECTS_REMOTE:
341      _Thread_Executing->Wait.return_argument = count;
342
343      return
344        _Message_queue_MP_Send_request_packet(
345          MESSAGE_QUEUE_MP_BROADCAST_REQUEST,
346          id,
347          (Message_queue_Buffer *) buffer,
348          0,                               /* Not used */
349          MPCI_DEFAULT_TIMEOUT
350        );
351
352    case OBJECTS_LOCAL:
353      number_broadcasted = 0;
354      while ( (the_thread =
355                 _Thread_queue_Dequeue(&the_message_queue->Wait_queue)) ) {
356        number_broadcasted += 1;
357        _Message_queue_Copy_buffer(
358          (Message_queue_Buffer *) buffer,
359          the_thread->Wait.return_argument
360        );
361
362        if ( !_Objects_Is_local_id( the_thread->Object.id ) ) {
363          the_thread->receive_packet->return_code = RTEMS_SUCCESSFUL;
364
365          _Message_queue_MP_Send_response_packet(
366            MESSAGE_QUEUE_MP_RECEIVE_RESPONSE,
367            id,
368            the_thread
369          );
370        }
371      }
372      _Thread_Enable_dispatch();
373      *count = number_broadcasted;
374      return( RTEMS_SUCCESSFUL );
375  }
376
377  return( RTEMS_INTERNAL_ERROR );   /* unreached - only to remove warnings */
378}
379
380/*PAGE
381 *
382 *  rtems_message_queue_receive
383 *
384 *  This directive dequeues a message from the designated message queue
385 *  and copies it into the requesting thread's buffer.
386 *
387 *  Input parameters:
388 *    id         - queue id
389 *    buffer     - pointer to message buffer
390 *    option_set - options on receive
391 *    timeout    - number of ticks to wait
392 *
393 *  Output parameters:
394 *    RTEMS_SUCCESSFUL - if successful
395 *    error code        - if unsuccessful
396 */
397
398rtems_status_code rtems_message_queue_receive(
399  Objects_Id            id,
400  void                 *buffer,
401  unsigned32            option_set,
402  rtems_interval     timeout
403)
404{
405  register Message_queue_Control *the_message_queue;
406  Objects_Locations                      location;
407
408  the_message_queue = _Message_queue_Get( id, &location );
409  switch ( location ) {
410    case OBJECTS_ERROR:
411      return( RTEMS_INVALID_ID );
412    case OBJECTS_REMOTE:
413      _Thread_Executing->Wait.return_argument = buffer;
414      return
415        _Message_queue_MP_Send_request_packet(
416          MESSAGE_QUEUE_MP_RECEIVE_REQUEST,
417          id,
418          buffer,
419          option_set,
420          timeout
421        );
422
423    case OBJECTS_LOCAL:
424      if ( !_Message_queue_Seize( the_message_queue, option_set, buffer ) )
425        _Thread_queue_Enqueue( &the_message_queue->Wait_queue, timeout );
426      _Thread_Enable_dispatch();
427      return( _Thread_Executing->Wait.return_code );
428  }
429
430  return( RTEMS_INTERNAL_ERROR );   /* unreached - only to remove warnings */
431}
432
433/*PAGE
434 *
435 *  rtems_message_queue_flush
436 *
437 *  This directive removes all pending messages from a queue and returns
438 *  the number of messages removed.  If no messages were present then
439 *  a count of zero is returned.
440 *
441 *  Input parameters:
442 *    id    - queue id
443 *    count - return area for count
444 *
445 *  Output parameters:
446 *    count             - number of messages removed ( 0 = empty queue )
447 *    RTEMS_SUCCESSFUL - if successful
448 *    error code        - if unsuccessful
449 */
450
451rtems_status_code rtems_message_queue_flush(
452  Objects_Id  id,
453  unsigned32 *count
454)
455{
456  register Message_queue_Control *the_message_queue;
457  Objects_Locations               location;
458
459  the_message_queue = _Message_queue_Get( id, &location );
460  switch ( location ) {
461    case OBJECTS_ERROR:
462      return( RTEMS_INVALID_ID );
463    case OBJECTS_REMOTE:
464      _Thread_Executing->Wait.return_argument = count;
465
466      return
467        _Message_queue_MP_Send_request_packet(
468          MESSAGE_QUEUE_MP_FLUSH_REQUEST,
469          id,
470          0,                               /* Not used */
471          0,                               /* Not used */
472          MPCI_DEFAULT_TIMEOUT
473        );
474
475    case OBJECTS_LOCAL:
476      if ( the_message_queue->number_of_pending_messages != 0 )
477        *count = _Message_queue_Flush_support( the_message_queue );
478      else
479        *count = 0;
480      _Thread_Enable_dispatch();
481      return( RTEMS_SUCCESSFUL );
482  }
483
484  return( RTEMS_INTERNAL_ERROR );   /* unreached - only to remove warnings */
485}
486
487/*PAGE
488 *
489 *  _Message_queue_Seize
490 *
491 *  This kernel routine dequeues a message, copies the message buffer to
492 *  a given destination buffer, and frees the message buffer to the
493 *  inactive message pool.
494 *
495 *  Input parameters:
496 *    the_message_queue - pointer to message queue
497 *    option_set        - options on receive
498 *    the_buffer        - pointer to message buffer to be filled
499 *
500 *  Output parameters:
501 *    TRUE  - if message received or RTEMS_NO_WAIT and no message
502 *    FALSE - if thread is to block
503 *
504 *  NOTE: Dependent on BUFFER_LENGTH
505 *
506 *  INTERRUPT LATENCY:
507 *    available
508 *    wait
509 */
510
511boolean _Message_queue_Seize(
512  Message_queue_Control  *the_message_queue,
513  rtems_option            option_set,
514  Message_queue_Buffer   *buffer
515)
516{
517  ISR_Level                     level;
518  Message_queue_Buffer_control *the_message;
519  Thread_Control               *executing;
520
521  executing = _Thread_Executing;
522  executing->Wait.return_code = RTEMS_SUCCESSFUL;
523  _ISR_Disable( level );
524  if ( the_message_queue->number_of_pending_messages != 0 ) {
525    the_message_queue->number_of_pending_messages -= 1;
526
527    the_message = _Message_queue_Get_pending_message( the_message_queue );
528    _ISR_Enable( level );
529    _Message_queue_Copy_buffer( &the_message->Contents, buffer );
530    _Message_queue_Free_message_buffer( the_message );
531    return( TRUE );
532  }
533
534  if ( _Options_Is_no_wait( option_set ) ) {
535    _ISR_Enable( level );
536    executing->Wait.return_code = RTEMS_UNSATISFIED;
537    return( TRUE );
538  }
539
540  the_message_queue->Wait_queue.sync = TRUE;
541  executing->Wait.queue              = &the_message_queue->Wait_queue;
542  executing->Wait.id                 = the_message_queue->Object.id;
543  executing->Wait.option_set         = option_set;
544  executing->Wait.return_argument    = (unsigned32 *)buffer;
545  _ISR_Enable( level );
546  return( FALSE );
547}
548
549/*PAGE
550 *
551 *  _Message_queue_Flush_support
552 *
553 *  This message manager routine removes all messages from a message queue
554 *  and returns them to the inactive message pool.
555 *
556 *  Input parameters:
557 *    the_message_queue - pointer to message queue
558 *
559 *  Output parameters:
560 *    returns - number of messages placed on inactive chain
561 *
562 *  INTERRUPT LATENCY:
563 *    only case
564 */
565
566unsigned32 _Message_queue_Flush_support(
567  Message_queue_Control *the_message_queue
568)
569{
570  ISR_Level          level;
571  Chain_Node *inactive_first;
572  Chain_Node *message_queue_first;
573  Chain_Node *message_queue_last;
574  unsigned32         count;
575
576  _ISR_Disable( level );
577    inactive_first      = _Message_queue_Inactive_messages.first;
578    message_queue_first = the_message_queue->Pending_messages.first;
579    message_queue_last  = the_message_queue->Pending_messages.last;
580
581    _Message_queue_Inactive_messages.first = message_queue_first;
582    message_queue_last->next               = inactive_first;
583    inactive_first->previous               = message_queue_last;
584    message_queue_first->previous          =
585               _Chain_Head( &_Message_queue_Inactive_messages );
586
587    _Chain_Initialize_empty( &the_message_queue->Pending_messages );
588
589    count = the_message_queue->number_of_pending_messages;
590    the_message_queue->number_of_pending_messages = 0;
591  _ISR_Enable( level );
592  return( count );
593}
594
595/*PAGE
596 *
597 *  _Message_queue_Submit
598 *
599 *  This routine implements the directives q_send and q_urgent.  It
600 *  processes a message that is to be submitted to the designated
601 *  message queue.  The message will either be processed as a send
602 *  send message which it will be inserted at the rear of the queue
603 *  or it will be processed as an urgent message which will be inserted
604 *  at the front of the queue.
605 *
606 *  Input parameters:
607 *    id          - pointer to message queue
608 *    the_buffer  - pointer to message buffer
609 *    submit_type - send or urgent message
610 *
611 *  Output parameters:
612 *    RTEMS_SUCCESSFUL - if successful
613 *    error code        - if unsuccessful
614 */
615
616rtems_status_code _Message_queue_Submit(
617  Objects_Id                  id,
618  Message_queue_Buffer       *buffer,
619  Message_queue_Submit_types  submit_type
620)
621{
622  register Message_queue_Control *the_message_queue;
623  Objects_Locations               location;
624  Thread_Control                 *the_thread;
625  Message_queue_Buffer_control   *the_message;
626
627  the_message_queue = _Message_queue_Get( id, &location );
628  switch ( location ) {
629    case OBJECTS_ERROR:
630      return( RTEMS_INVALID_ID );
631    case OBJECTS_REMOTE:
632      switch ( submit_type ) {
633        case MESSAGE_QUEUE_SEND_REQUEST:
634          return
635            _Message_queue_MP_Send_request_packet(
636              MESSAGE_QUEUE_MP_SEND_REQUEST,
637              id,
638              buffer,
639              0,                               /* Not used */
640              MPCI_DEFAULT_TIMEOUT
641            );
642
643        case MESSAGE_QUEUE_URGENT_REQUEST:
644          return
645            _Message_queue_MP_Send_request_packet(
646              MESSAGE_QUEUE_MP_URGENT_REQUEST,
647              id,
648              buffer,
649              0,                               /* Not used */
650              MPCI_DEFAULT_TIMEOUT
651            );
652      }
653    case OBJECTS_LOCAL:
654      the_thread = _Thread_queue_Dequeue( &the_message_queue->Wait_queue );
655
656      if ( the_thread ) {
657
658        _Message_queue_Copy_buffer(
659          buffer,
660          the_thread->Wait.return_argument
661        );
662
663        if ( !_Objects_Is_local_id( the_thread->Object.id ) ) {
664          the_thread->receive_packet->return_code = RTEMS_SUCCESSFUL;
665
666          _Message_queue_MP_Send_response_packet(
667            MESSAGE_QUEUE_MP_RECEIVE_RESPONSE,
668            id,
669            the_thread
670          );
671
672        }
673        _Thread_Enable_dispatch();
674        return( RTEMS_SUCCESSFUL );
675      }
676
677      if ( the_message_queue->number_of_pending_messages ==
678           the_message_queue->maximum_pending_messages ) {
679        _Thread_Enable_dispatch();
680        return( RTEMS_TOO_MANY );
681      }
682
683      the_message = _Message_queue_Allocate_message_buffer();
684
685      if ( !the_message ) {
686        _Thread_Enable_dispatch();
687        return( RTEMS_UNSATISFIED );
688      }
689
690      _Message_queue_Copy_buffer( buffer, &the_message->Contents );
691
692      the_message_queue->number_of_pending_messages += 1;
693
694      switch ( submit_type ) {
695        case MESSAGE_QUEUE_SEND_REQUEST:
696          _Message_queue_Append( the_message_queue, the_message );
697          break;
698        case MESSAGE_QUEUE_URGENT_REQUEST:
699          _Message_queue_Prepend( the_message_queue, the_message );
700          break;
701      }
702
703      _Thread_Enable_dispatch();
704      return( RTEMS_SUCCESSFUL );
705  }
706
707  return( RTEMS_INTERNAL_ERROR );   /* unreached - only to remove warnings */
708}
Note: See TracBrowser for help on using the repository browser.