source: rtems/cpukit/score/src/coremsg.c @ 5e9b32b

4.104.114.84.95
Last change on this file since 5e9b32b was 5e9b32b, checked in by Joel Sherrill <joel.sherrill@…>, on Sep 26, 1995 at 7:27:15 PM

posix support initially added

  • Property mode set to 100644
File size: 13.7 KB
Line 
1/*
2 *  CORE Message Queue Handler
3 *
4 *  DESCRIPTION:
5 *
6 *  This package is the implementation of the CORE Message Queue Handler.
7 *  This core object provides task synchronization and communication functions
8 *  via messages passed to queue objects.
9 *
10 *  COPYRIGHT (c) 1989, 1990, 1991, 1992, 1993, 1994.
11 *  On-Line Applications Research Corporation (OAR).
12 *  All rights assigned to U.S. Government, 1994.
13 *
14 *  This material may be reproduced by or for the U.S. Government pursuant
15 *  to the copyright license under the clause at DFARS 252.227-7013.  This
16 *  notice must appear in all copies of this file and its derivatives.
17 *
18 *  $Id$
19 */
20
21#include <rtems/system.h>
22#include <rtems/score/chain.h>
23#include <rtems/score/isr.h>
24#include <rtems/score/object.h>
25#include <rtems/score/coremsg.h>
26#include <rtems/score/states.h>
27#include <rtems/score/thread.h>
28#include <rtems/score/wkspace.h>
29#include <rtems/score/mpci.h>
30
31/*PAGE
32 *
33 *  _CORE_message_queue_Initialize
34 *
35 *  This routine initializes a newly created message queue based on the
36 *  specified data.
37 *
38 *  Input parameters:
39 *    the_message_queue            - the message queue to initialize
40 *    the_class                    - the API specific object class
41 *    the_message_queue_attributes - the message queue's attributes
42 *    maximum_pending_messages     - maximum message and reserved buffer count
43 *    maximum_message_size         - maximum size of each message
44 *    proxy_extract_callout        - remote extract support
45 *
46 *  Output parameters:
47 *    TRUE   - if the message queue is initialized
48 *    FALSE  - if the message queue is NOT initialized
49 */
50
51boolean _CORE_message_queue_Initialize(
52  CORE_message_queue_Control    *the_message_queue,
53  Objects_Classes                the_class,
54  CORE_message_queue_Attributes *the_message_queue_attributes,
55  unsigned32                     maximum_pending_messages,
56  unsigned32                     maximum_message_size,
57  Thread_queue_Extract_callout   proxy_extract_callout
58)
59{
60  unsigned32 message_buffering_required;
61  unsigned32 allocated_message_size;
62
63  the_message_queue->maximum_pending_messages   = maximum_pending_messages;
64  the_message_queue->number_of_pending_messages = 0;
65  the_message_queue->maximum_message_size       = maximum_message_size;
66  _CORE_message_queue_Set_notify( the_message_queue, NULL, NULL );
67 
68  /*
69   * round size up to multiple of a ptr for chain init
70   */
71 
72  allocated_message_size = maximum_message_size;
73  if (allocated_message_size & (sizeof(unsigned32) - 1)) {
74      allocated_message_size += sizeof(unsigned32);
75      allocated_message_size &= ~(sizeof(unsigned32) - 1);
76  }
77   
78  message_buffering_required = maximum_pending_messages *
79       (allocated_message_size + sizeof(CORE_message_queue_Buffer_control));
80 
81  the_message_queue->message_buffers = (CORE_message_queue_Buffer *) 
82     _Workspace_Allocate( message_buffering_required );
83 
84  if (the_message_queue->message_buffers == 0)
85    return FALSE;
86 
87  _Chain_Initialize (
88    &the_message_queue->Inactive_messages,
89    the_message_queue->message_buffers,
90    maximum_pending_messages,
91    allocated_message_size + sizeof( CORE_message_queue_Buffer_control )
92  );
93 
94  _Chain_Initialize_empty( &the_message_queue->Pending_messages );
95 
96  _Thread_queue_Initialize(
97    &the_message_queue->Wait_queue,
98    the_class,
99    _CORE_message_queue_Is_priority( the_message_queue_attributes ) ?
100       THREAD_QUEUE_DISCIPLINE_PRIORITY : THREAD_QUEUE_DISCIPLINE_FIFO,
101    STATES_WAITING_FOR_MESSAGE,
102    proxy_extract_callout,
103    CORE_MESSAGE_QUEUE_STATUS_TIMEOUT
104  );
105
106  return TRUE;
107}
108
109/*PAGE
110 *
111 *  _CORE_message_queue_Close
112 *
113 *  This function closes a message by returning all allocated space and
114 *  flushing the message_queue's task wait queue.
115 *
116 *  Input parameters:
117 *    the_message_queue      - the message_queue to be flushed
118 *    remote_extract_callout - function to invoke remotely
119 *    status                 - status to pass to thread
120 *
121 *  Output parameters:  NONE
122 */
123 
124void _CORE_message_queue_Close(
125  CORE_message_queue_Control *the_message_queue,
126  Thread_queue_Flush_callout  remote_extract_callout,
127  unsigned32                  status
128)
129{
130 
131  if ( the_message_queue->number_of_pending_messages != 0 )
132        (void) _CORE_message_queue_Flush_support( the_message_queue );
133  else
134    _Thread_queue_Flush(
135      &the_message_queue->Wait_queue,
136      remote_extract_callout,
137      status
138    );
139
140  (void) _Workspace_Free( the_message_queue->message_buffers );
141
142}
143
144/*PAGE
145 *
146 *  _CORE_message_queue_Flush
147 *
148 *  This function flushes the message_queue's task wait queue.  The number
149 *  of messages flushed from the queue is returned.
150 *
151 *  Input parameters:
152 *    the_message_queue - the message_queue to be flushed
153 *
154 *  Output parameters:
155 *    returns - the number of messages flushed from the queue
156 */
157 
158unsigned32 _CORE_message_queue_Flush(
159  CORE_message_queue_Control *the_message_queue
160)
161{
162  if ( the_message_queue->number_of_pending_messages != 0 )
163    return _CORE_message_queue_Flush_support( the_message_queue );
164  else
165    return 0;
166}
167
168/*PAGE
169 *
170 *  _CORE_message_queue_Broadcast
171 *
172 *  This function sends a message for every thread waiting on the queue and
173 *  returns the number of threads made ready by the message.
174 *
175 *  Input parameters:
176 *    the_message_queue            - message is submitted to this message queue
177 *    buffer                       - pointer to message buffer
178 *    size                         - size in bytes of message to send
179 *    id                           - id of message queue
180 *    api_message_queue_mp_support - api specific mp support callout
181 *    count                        - area to store number of threads made ready
182 *
183 *  Output parameters:
184 *    count                         - number of threads made ready
185 *    CORE_MESSAGE_QUEUE_SUCCESSFUL - if successful
186 *    error code                    - if unsuccessful
187 */
188
189CORE_message_queue_Status _CORE_message_queue_Broadcast(
190  CORE_message_queue_Control                *the_message_queue,
191  void                                      *buffer,
192  unsigned32                                 size,
193  Objects_Id                                 id,
194  CORE_message_queue_API_mp_support_callout  api_message_queue_mp_support,
195  unsigned32                                *count
196)
197{
198  Thread_Control          *the_thread;
199  unsigned32               number_broadcasted;
200  Thread_Wait_information *waitp;
201  unsigned32               constrained_size;
202
203  number_broadcasted = 0;
204  while ((the_thread = _Thread_queue_Dequeue(&the_message_queue->Wait_queue))) {
205    waitp = &the_thread->Wait;
206    number_broadcasted += 1;
207
208    constrained_size = size;
209    if ( size > the_message_queue->maximum_message_size )
210        constrained_size = the_message_queue->maximum_message_size;
211
212    _CORE_message_queue_Copy_buffer(
213      buffer,
214      waitp->return_argument,
215      constrained_size
216    );
217
218    *(unsigned32 *)the_thread->Wait.return_argument_1 = size;
219
220    if ( !_Objects_Is_local_id( the_thread->Object.id ) )
221      (*api_message_queue_mp_support) ( the_thread, id );
222
223  }
224  *count = number_broadcasted;
225  return CORE_MESSAGE_QUEUE_STATUS_SUCCESSFUL;
226}
227
228/*PAGE
229 *
230 *  _CORE_message_queue_Seize
231 *
232 *  This kernel routine dequeues a message, copies the message buffer to
233 *  a given destination buffer, and frees the message buffer to the
234 *  inactive message pool.  The thread will be blocked if wait is TRUE,
235 *  otherwise an error will be given to the thread if no messages are available.
236 *
237 *  Input parameters:
238 *    the_message_queue - pointer to message queue
239 *    id                - id of object we are waitig on
240 *    buffer            - pointer to message buffer to be filled
241 *    size              - pointer to the size of buffer to be filled
242 *    wait              - TRUE if wait is allowed, FALSE otherwise
243 *    timeout           - time to wait for a message
244 *
245 *  Output parameters:  NONE
246 *
247 *  NOTE: Dependent on BUFFER_LENGTH
248 *
249 *  INTERRUPT LATENCY:
250 *    available
251 *    wait
252 */
253
254void _CORE_message_queue_Seize(
255  CORE_message_queue_Control *the_message_queue,
256  Objects_Id                  id,
257  void                       *buffer,
258  unsigned32                 *size,
259  boolean                     wait,
260  Watchdog_Interval           timeout
261)
262{
263  ISR_Level                          level;
264  CORE_message_queue_Buffer_control *the_message;
265  Thread_Control                    *executing;
266
267  executing = _Thread_Executing;
268  executing->Wait.return_code = CORE_MESSAGE_QUEUE_STATUS_SUCCESSFUL;
269  _ISR_Disable( level );
270  if ( the_message_queue->number_of_pending_messages != 0 ) {
271    the_message_queue->number_of_pending_messages -= 1;
272
273    the_message = _CORE_message_queue_Get_pending_message( the_message_queue );
274    _ISR_Enable( level );
275    *size = the_message->Contents.size;
276    _CORE_message_queue_Copy_buffer(the_message->Contents.buffer,buffer,*size );
277    _CORE_message_queue_Free_message_buffer(the_message_queue, the_message );
278    return;
279  }
280
281  if ( !wait ) {
282    _ISR_Enable( level );
283    executing->Wait.return_code = CORE_MESSAGE_QUEUE_STATUS_UNSATISFIED_NOWAIT;
284    return;
285  }
286
287  the_message_queue->Wait_queue.sync = TRUE;
288  executing->Wait.queue              = &the_message_queue->Wait_queue;
289  executing->Wait.id                 = id;
290  executing->Wait.return_argument    = (void *)buffer;
291  executing->Wait.return_argument_1  = (void *)size;
292  _ISR_Enable( level );
293
294  _Thread_queue_Enqueue( &the_message_queue->Wait_queue, timeout );
295}
296
297/*PAGE
298 *
299 *  _CORE_message_queue_Flush_support
300 *
301 *  This message handler routine removes all messages from a message queue
302 *  and returns them to the inactive message pool.  The number of messages
303 *  flushed from the queue is returned
304 *
305 *  Input parameters:
306 *    the_message_queue - pointer to message queue
307 *
308 *  Output parameters:
309 *    returns - number of messages placed on inactive chain
310 *
311 *  INTERRUPT LATENCY:
312 *    only case
313 */
314
315unsigned32 _CORE_message_queue_Flush_support(
316  CORE_message_queue_Control *the_message_queue
317)
318{
319  ISR_Level   level;
320  Chain_Node *inactive_first;
321  Chain_Node *message_queue_first;
322  Chain_Node *message_queue_last;
323  unsigned32  count;
324
325  _ISR_Disable( level );
326    inactive_first      = the_message_queue->Inactive_messages.first;
327    message_queue_first = the_message_queue->Pending_messages.first;
328    message_queue_last  = the_message_queue->Pending_messages.last;
329
330    the_message_queue->Inactive_messages.first = message_queue_first;
331    message_queue_last->next = inactive_first;
332    inactive_first->previous = message_queue_last;
333    message_queue_first->previous          =
334               _Chain_Head( &the_message_queue->Inactive_messages );
335
336    _Chain_Initialize_empty( &the_message_queue->Pending_messages );
337
338    count = the_message_queue->number_of_pending_messages;
339    the_message_queue->number_of_pending_messages = 0;
340  _ISR_Enable( level );
341  return count;
342}
343
344/*PAGE
345 *
346 *  _CORE_message_queue_Submit
347 *
348 *  This routine implements the send and urgent message functions. It
349 *  processes a message that is to be submitted to the designated
350 *  message queue.  The message will either be processed as a
351 *  send message which it will be inserted at the rear of the queue
352 *  or it will be processed as an urgent message which will be inserted
353 *  at the front of the queue.
354 *
355 *  Input parameters:
356 *    the_message_queue            - message is submitted to this message queue
357 *    buffer                       - pointer to message buffer
358 *    size                         - size in bytes of message to send
359 *    id                           - id of message queue
360 *    api_message_queue_mp_support - api specific mp support callout
361 *    submit_type                  - send or urgent message
362 *
363 *  Output parameters:
364 *    CORE_MESSAGE_QUEUE_SUCCESSFUL - if successful
365 *    error code                    - if unsuccessful
366 */
367
368CORE_message_queue_Status _CORE_message_queue_Submit(
369  CORE_message_queue_Control                *the_message_queue,
370  void                                      *buffer,
371  unsigned32                                 size,
372  Objects_Id                                 id,
373  CORE_message_queue_API_mp_support_callout  api_message_queue_mp_support,
374  CORE_message_queue_Submit_types            submit_type
375)
376{
377  CORE_message_queue_Buffer_control   *the_message;
378  Thread_Control                      *the_thread;
379
380  if ( size > the_message_queue->maximum_message_size )
381    return CORE_MESSAGE_QUEUE_STATUS_INVALID_SIZE;
382
383  /*
384   * Is there a thread currently waiting on this message queue?
385   */
386     
387  the_thread = _Thread_queue_Dequeue( &the_message_queue->Wait_queue );
388  if ( the_thread )
389  {
390    _CORE_message_queue_Copy_buffer(
391      buffer,
392      the_thread->Wait.return_argument,
393      size
394    );
395    *(unsigned32 *)the_thread->Wait.return_argument_1 = size;
396   
397    if ( !_Objects_Is_local_id( the_thread->Object.id ) )
398      (*api_message_queue_mp_support) ( the_thread, id );
399
400    return CORE_MESSAGE_QUEUE_STATUS_SUCCESSFUL;
401  }
402
403  /*
404   * No one waiting on this one currently.
405   * Allocate a message buffer and store it away
406   */
407
408  if ( the_message_queue->number_of_pending_messages ==
409       the_message_queue->maximum_pending_messages ) {
410    return CORE_MESSAGE_QUEUE_STATUS_TOO_MANY;
411  }
412
413  the_message = _CORE_message_queue_Allocate_message_buffer(the_message_queue);
414  if ( the_message == 0)
415    return CORE_MESSAGE_QUEUE_STATUS_UNSATISFIED;
416
417  _CORE_message_queue_Copy_buffer( buffer, the_message->Contents.buffer, size );
418  the_message->Contents.size = size;
419 
420  the_message_queue->number_of_pending_messages += 1;
421
422  switch ( submit_type ) {
423    case CORE_MESSAGE_QUEUE_SEND_REQUEST:
424      _CORE_message_queue_Append( the_message_queue, the_message );
425      break;
426    case CORE_MESSAGE_QUEUE_URGENT_REQUEST:
427      _CORE_message_queue_Prepend( the_message_queue, the_message );
428      break;
429  }
430
431  return CORE_MESSAGE_QUEUE_STATUS_SUCCESSFUL;
432}
Note: See TracBrowser for help on using the repository browser.