source: rtems/c/src/exec/score/src/coremsg.c @ be650a84

4.104.114.84.95
Last change on this file since be650a84 was be650a84, checked in by Joel Sherrill <joel.sherrill@…>, on 09/21/95 at 16:22:25

moving files around

  • Property mode set to 100644
File size: 13.6 KB
Line 
1/*
2 *  CORE Message Queue Handler
3 *
4 *  DESCRIPTION:
5 *
6 *  This package is the implementation of the CORE Message Queue Handler.
7 *  This core object provides task synchronization and communication functions
8 *  via messages passed to queue objects.
9 *
10 *  COPYRIGHT (c) 1989, 1990, 1991, 1992, 1993, 1994.
11 *  On-Line Applications Research Corporation (OAR).
12 *  All rights assigned to U.S. Government, 1994.
13 *
14 *  This material may be reproduced by or for the U.S. Government pursuant
15 *  to the copyright license under the clause at DFARS 252.227-7013.  This
16 *  notice must appear in all copies of this file and its derivatives.
17 *
18 *  $Id$
19 */
20
21#include <rtems/system.h>
22#include <rtems/core/chain.h>
23#include <rtems/core/isr.h>
24#include <rtems/core/object.h>
25#include <rtems/core/coremsg.h>
26#include <rtems/core/states.h>
27#include <rtems/core/thread.h>
28#include <rtems/core/wkspace.h>
29#include <rtems/core/mpci.h>
30
31/*PAGE
32 *
33 *  _CORE_message_queue_Initialize
34 *
35 *  This routine initializes a newly created message queue based on the
36 *  specified data.
37 *
38 *  Input parameters:
39 *    the_message_queue            - the message queue to initialize
40 *    the_class                    - the API specific object class
41 *    the_message_queue_attributes - the message queue's attributes
42 *    maximum_pending_messages     - maximum message and reserved buffer count
43 *    maximum_message_size         - maximum size of each message
44 *    proxy_extract_callout        - remote extract support
45 *
46 *  Output parameters:
47 *    TRUE   - if the message queue is initialized
48 *    FALSE  - if the message queue is NOT initialized
49 */
50
51boolean _CORE_message_queue_Initialize(
52  CORE_message_queue_Control    *the_message_queue,
53  Objects_Classes                the_class,
54  CORE_message_queue_Attributes *the_message_queue_attributes,
55  unsigned32                     maximum_pending_messages,
56  unsigned32                     maximum_message_size,
57  Thread_queue_Extract_callout   proxy_extract_callout
58)
59{
60  unsigned32 message_buffering_required;
61  unsigned32 allocated_message_size;
62
63  the_message_queue->maximum_pending_messages   = maximum_pending_messages;
64  the_message_queue->number_of_pending_messages = 0;
65  the_message_queue->maximum_message_size       = maximum_message_size;
66 
67  /*
68   * round size up to multiple of a ptr for chain init
69   */
70 
71  allocated_message_size = maximum_message_size;
72  if (allocated_message_size & (sizeof(unsigned32) - 1)) {
73      allocated_message_size += sizeof(unsigned32);
74      allocated_message_size &= ~(sizeof(unsigned32) - 1);
75  }
76   
77  message_buffering_required = maximum_pending_messages *
78       (allocated_message_size + sizeof(CORE_message_queue_Buffer_control));
79 
80  the_message_queue->message_buffers = (CORE_message_queue_Buffer *)
81     _Workspace_Allocate( message_buffering_required );
82 
83  if (the_message_queue->message_buffers == 0)
84    return FALSE;
85 
86  _Chain_Initialize (
87    &the_message_queue->Inactive_messages,
88    the_message_queue->message_buffers,
89    maximum_pending_messages,
90    allocated_message_size + sizeof( CORE_message_queue_Buffer_control )
91  );
92 
93  _Chain_Initialize_empty( &the_message_queue->Pending_messages );
94 
95  _Thread_queue_Initialize(
96    &the_message_queue->Wait_queue,
97    the_class,
98    _CORE_message_queue_Is_priority( the_message_queue_attributes ) ?
99       THREAD_QUEUE_DISCIPLINE_PRIORITY : THREAD_QUEUE_DISCIPLINE_FIFO,
100    STATES_WAITING_FOR_MESSAGE,
101    proxy_extract_callout,
102    CORE_MESSAGE_QUEUE_STATUS_TIMEOUT
103  );
104
105  return TRUE;
106}
107
108/*PAGE
109 *
110 *  _CORE_message_queue_Close
111 *
112 *  This function closes a message by returning all allocated space and
113 *  flushing the message_queue's task wait queue.
114 *
115 *  Input parameters:
116 *    the_message_queue      - the message_queue to be flushed
117 *    remote_extract_callout - function to invoke remotely
118 *    status                 - status to pass to thread
119 *
120 *  Output parameters:  NONE
121 */
122 
123void _CORE_message_queue_Close(
124  CORE_message_queue_Control *the_message_queue,
125  Thread_queue_Flush_callout  remote_extract_callout,
126  unsigned32                  status
127)
128{
129 
130  if ( the_message_queue->number_of_pending_messages != 0 )
131        (void) _CORE_message_queue_Flush_support( the_message_queue );
132  else
133    _Thread_queue_Flush(
134      &the_message_queue->Wait_queue,
135      remote_extract_callout,
136      status
137    );
138
139  (void) _Workspace_Free( the_message_queue->message_buffers );
140
141}
142
143/*PAGE
144 *
145 *  _CORE_message_queue_Flush
146 *
147 *  This function flushes the message_queue's task wait queue.  The number
148 *  of messages flushed from the queue is returned.
149 *
150 *  Input parameters:
151 *    the_message_queue - the message_queue to be flushed
152 *
153 *  Output parameters:
154 *    returns - the number of messages flushed from the queue
155 */
156 
157unsigned32 _CORE_message_queue_Flush(
158  CORE_message_queue_Control *the_message_queue
159)
160{
161  if ( the_message_queue->number_of_pending_messages != 0 )
162    return _CORE_message_queue_Flush_support( the_message_queue );
163  else
164    return 0;
165}
166
167/*PAGE
168 *
169 *  _CORE_message_queue_Broadcast
170 *
171 *  This function sends a message for every thread waiting on the queue and
172 *  returns the number of threads made ready by the message.
173 *
174 *  Input parameters:
175 *    the_message_queue            - message is submitted to this message queue
176 *    buffer                       - pointer to message buffer
177 *    size                         - size in bytes of message to send
178 *    id                           - id of message queue
179 *    api_message_queue_mp_support - api specific mp support callout
180 *    count                        - area to store number of threads made ready
181 *
182 *  Output parameters:
183 *    count                         - number of threads made ready
184 *    CORE_MESSAGE_QUEUE_SUCCESSFUL - if successful
185 *    error code                    - if unsuccessful
186 */
187
188CORE_message_queue_Status _CORE_message_queue_Broadcast(
189  CORE_message_queue_Control                *the_message_queue,
190  void                                      *buffer,
191  unsigned32                                 size,
192  Objects_Id                                 id,
193  CORE_message_queue_API_mp_support_callout  api_message_queue_mp_support,
194  unsigned32                                *count
195)
196{
197  Thread_Control          *the_thread;
198  unsigned32               number_broadcasted;
199  Thread_Wait_information *waitp;
200  unsigned32               constrained_size;
201
202  number_broadcasted = 0;
203  while ((the_thread = _Thread_queue_Dequeue(&the_message_queue->Wait_queue))) {
204    waitp = &the_thread->Wait;
205    number_broadcasted += 1;
206
207    constrained_size = size;
208    if ( size > the_message_queue->maximum_message_size )
209        constrained_size = the_message_queue->maximum_message_size;
210
211    _CORE_message_queue_Copy_buffer(
212      buffer,
213      waitp->return_argument,
214      constrained_size
215    );
216
217    *(unsigned32 *)the_thread->Wait.return_argument_1 = size;
218
219    if ( !_Objects_Is_local_id( the_thread->Object.id ) )
220      (*api_message_queue_mp_support) ( the_thread, id );
221
222  }
223  *count = number_broadcasted;
224  return CORE_MESSAGE_QUEUE_STATUS_SUCCESSFUL;
225}
226
227/*PAGE
228 *
229 *  _CORE_message_queue_Seize
230 *
231 *  This kernel routine dequeues a message, copies the message buffer to
232 *  a given destination buffer, and frees the message buffer to the
233 *  inactive message pool.  The thread will be blocked if wait is TRUE,
234 *  otherwise an error will be given to the thread if no messages are available.
235 *
236 *  Input parameters:
237 *    the_message_queue - pointer to message queue
238 *    id                - id of object we are waitig on
239 *    buffer            - pointer to message buffer to be filled
240 *    size              - pointer to the size of buffer to be filled
241 *    wait              - TRUE if wait is allowed, FALSE otherwise
242 *    timeout           - time to wait for a message
243 *
244 *  Output parameters:  NONE
245 *
246 *  NOTE: Dependent on BUFFER_LENGTH
247 *
248 *  INTERRUPT LATENCY:
249 *    available
250 *    wait
251 */
252
253void _CORE_message_queue_Seize(
254  CORE_message_queue_Control *the_message_queue,
255  Objects_Id                  id,
256  void                       *buffer,
257  unsigned32                 *size,
258  boolean                     wait,
259  Watchdog_Interval           timeout
260)
261{
262  ISR_Level                          level;
263  CORE_message_queue_Buffer_control *the_message;
264  Thread_Control                    *executing;
265
266  executing = _Thread_Executing;
267  executing->Wait.return_code = CORE_MESSAGE_QUEUE_STATUS_SUCCESSFUL;
268  _ISR_Disable( level );
269  if ( the_message_queue->number_of_pending_messages != 0 ) {
270    the_message_queue->number_of_pending_messages -= 1;
271
272    the_message = _CORE_message_queue_Get_pending_message( the_message_queue );
273    _ISR_Enable( level );
274    *size = the_message->Contents.size;
275    _CORE_message_queue_Copy_buffer(the_message->Contents.buffer,buffer,*size );
276    _CORE_message_queue_Free_message_buffer(the_message_queue, the_message );
277    return;
278  }
279
280  if ( !wait ) {
281    _ISR_Enable( level );
282    executing->Wait.return_code = CORE_MESSAGE_QUEUE_STATUS_UNSATISFIED_NOWAIT;
283    return;
284  }
285
286  the_message_queue->Wait_queue.sync = TRUE;
287  executing->Wait.queue              = &the_message_queue->Wait_queue;
288  executing->Wait.id                 = id;
289  executing->Wait.return_argument    = (void *)buffer;
290  executing->Wait.return_argument_1  = (void *)size;
291  _ISR_Enable( level );
292
293  _Thread_queue_Enqueue( &the_message_queue->Wait_queue, timeout );
294}
295
296/*PAGE
297 *
298 *  _CORE_message_queue_Flush_support
299 *
300 *  This message handler routine removes all messages from a message queue
301 *  and returns them to the inactive message pool.  The number of messages
302 *  flushed from the queue is returned
303 *
304 *  Input parameters:
305 *    the_message_queue - pointer to message queue
306 *
307 *  Output parameters:
308 *    returns - number of messages placed on inactive chain
309 *
310 *  INTERRUPT LATENCY:
311 *    only case
312 */
313
314unsigned32 _CORE_message_queue_Flush_support(
315  CORE_message_queue_Control *the_message_queue
316)
317{
318  ISR_Level   level;
319  Chain_Node *inactive_first;
320  Chain_Node *message_queue_first;
321  Chain_Node *message_queue_last;
322  unsigned32  count;
323
324  _ISR_Disable( level );
325    inactive_first      = the_message_queue->Inactive_messages.first;
326    message_queue_first = the_message_queue->Pending_messages.first;
327    message_queue_last  = the_message_queue->Pending_messages.last;
328
329    the_message_queue->Inactive_messages.first = message_queue_first;
330    message_queue_last->next = inactive_first;
331    inactive_first->previous = message_queue_last;
332    message_queue_first->previous          =
333               _Chain_Head( &the_message_queue->Inactive_messages );
334
335    _Chain_Initialize_empty( &the_message_queue->Pending_messages );
336
337    count = the_message_queue->number_of_pending_messages;
338    the_message_queue->number_of_pending_messages = 0;
339  _ISR_Enable( level );
340  return count;
341}
342
343/*PAGE
344 *
345 *  _CORE_message_queue_Submit
346 *
347 *  This routine implements the send and urgent message functions. It
348 *  processes a message that is to be submitted to the designated
349 *  message queue.  The message will either be processed as a
350 *  send message which it will be inserted at the rear of the queue
351 *  or it will be processed as an urgent message which will be inserted
352 *  at the front of the queue.
353 *
354 *  Input parameters:
355 *    the_message_queue            - message is submitted to this message queue
356 *    buffer                       - pointer to message buffer
357 *    size                         - size in bytes of message to send
358 *    id                           - id of message queue
359 *    api_message_queue_mp_support - api specific mp support callout
360 *    submit_type                  - send or urgent message
361 *
362 *  Output parameters:
363 *    CORE_MESSAGE_QUEUE_SUCCESSFUL - if successful
364 *    error code                    - if unsuccessful
365 */
366
367CORE_message_queue_Status _CORE_message_queue_Submit(
368  CORE_message_queue_Control                *the_message_queue,
369  void                                      *buffer,
370  unsigned32                                 size,
371  Objects_Id                                 id,
372  CORE_message_queue_API_mp_support_callout  api_message_queue_mp_support,
373  CORE_message_queue_Submit_types            submit_type
374)
375{
376  CORE_message_queue_Buffer_control   *the_message;
377  Thread_Control                      *the_thread;
378
379  if ( size > the_message_queue->maximum_message_size )
380    return CORE_MESSAGE_QUEUE_STATUS_INVALID_SIZE;
381
382  /*
383   * Is there a thread currently waiting on this message queue?
384   */
385     
386  the_thread = _Thread_queue_Dequeue( &the_message_queue->Wait_queue );
387  if ( the_thread )
388  {
389    _CORE_message_queue_Copy_buffer(
390      buffer,
391      the_thread->Wait.return_argument,
392      size
393    );
394    *(unsigned32 *)the_thread->Wait.return_argument_1 = size;
395   
396    if ( !_Objects_Is_local_id( the_thread->Object.id ) )
397      (*api_message_queue_mp_support) ( the_thread, id );
398
399    return CORE_MESSAGE_QUEUE_STATUS_SUCCESSFUL;
400  }
401
402  /*
403   * No one waiting on this one currently.
404   * Allocate a message buffer and store it away
405   */
406
407  if ( the_message_queue->number_of_pending_messages ==
408       the_message_queue->maximum_pending_messages ) {
409    return CORE_MESSAGE_QUEUE_STATUS_TOO_MANY;
410  }
411
412  the_message = _CORE_message_queue_Allocate_message_buffer(the_message_queue);
413  if ( the_message == 0)
414    return CORE_MESSAGE_QUEUE_STATUS_UNSATISFIED;
415
416  _CORE_message_queue_Copy_buffer( buffer, the_message->Contents.buffer, size );
417  the_message->Contents.size = size;
418 
419  the_message_queue->number_of_pending_messages += 1;
420
421  switch ( submit_type ) {
422    case CORE_MESSAGE_QUEUE_SEND_REQUEST:
423      _CORE_message_queue_Append( the_message_queue, the_message );
424      break;
425    case CORE_MESSAGE_QUEUE_URGENT_REQUEST:
426      _CORE_message_queue_Prepend( the_message_queue, the_message );
427      break;
428  }
429
430  return CORE_MESSAGE_QUEUE_STATUS_SUCCESSFUL;
431}
Note: See TracBrowser for help on using the repository browser.