source: rtems/cpukit/rtems/src/msgmp.c @ 52a0641

4.104.114.84.95
Last change on this file since 52a0641 was 5e9b32b, checked in by Joel Sherrill <joel.sherrill@…>, on 09/26/95 at 19:27:15

posix support initially added

  • Property mode set to 100644
File size: 12.3 KB
Line 
1/*
2 *  Multiprocessing Support for the Message Queue Manager
3 *
4 *
5 *  COPYRIGHT (c) 1989, 1990, 1991, 1992, 1993, 1994.
6 *  On-Line Applications Research Corporation (OAR).
7 *  All rights assigned to U.S. Government, 1994.
8 *
9 *  This material may be reproduced by or for the U.S. Government pursuant
10 *  to the copyright license under the clause at DFARS 252.227-7013.  This
11 *  notice must appear in all copies of this file and its derivatives.
12 *
13 *  $Id$
14 */
15
16#include <rtems/system.h>
17#include <rtems/rtems/status.h>
18#include <rtems/rtems/message.h>
19#include <rtems/score/mpci.h>
20#include <rtems/rtems/msgmp.h>
21#include <rtems/score/object.h>
22#include <rtems/rtems/options.h>
23#include <rtems/score/thread.h>
24#include <rtems/score/watchdog.h>
25#include <rtems/rtems/support.h>
26
27/*PAGE
28 *
29 *  _Message_queue_MP_Send_process_packet
30 *
31 */
32
33void _Message_queue_MP_Send_process_packet (
34  Message_queue_MP_Remote_operations  operation,
35  Objects_Id                          message_queue_id,
36  rtems_name                          name,
37  Objects_Id                          proxy_id
38)
39{
40  Message_queue_MP_Packet *the_packet;
41  unsigned32               node;
42
43  switch ( operation ) {
44
45    case MESSAGE_QUEUE_MP_ANNOUNCE_CREATE:
46    case MESSAGE_QUEUE_MP_ANNOUNCE_DELETE:
47    case MESSAGE_QUEUE_MP_EXTRACT_PROXY:
48
49      the_packet                    = _Message_queue_MP_Get_packet();
50      the_packet->Prefix.the_class  = MP_PACKET_MESSAGE_QUEUE;
51      the_packet->Prefix.length     = sizeof ( Message_queue_MP_Packet );
52      the_packet->Prefix.to_convert = sizeof ( Message_queue_MP_Packet );
53      the_packet->operation         = operation;
54      the_packet->Prefix.id         = message_queue_id;
55      the_packet->name              = name;
56      the_packet->proxy_id          = proxy_id;
57
58      if ( operation == MESSAGE_QUEUE_MP_EXTRACT_PROXY )
59         node = rtems_get_node( message_queue_id );
60      else
61         node = MPCI_ALL_NODES;
62
63      _MPCI_Send_process_packet( node, &the_packet->Prefix );
64      break;
65
66    case MESSAGE_QUEUE_MP_RECEIVE_REQUEST:
67    case MESSAGE_QUEUE_MP_RECEIVE_RESPONSE:
68    case MESSAGE_QUEUE_MP_SEND_REQUEST:
69    case MESSAGE_QUEUE_MP_SEND_RESPONSE:
70    case MESSAGE_QUEUE_MP_URGENT_REQUEST:
71    case MESSAGE_QUEUE_MP_URGENT_RESPONSE:
72    case MESSAGE_QUEUE_MP_BROADCAST_REQUEST:
73    case MESSAGE_QUEUE_MP_BROADCAST_RESPONSE:
74    case MESSAGE_QUEUE_MP_FLUSH_REQUEST:
75    case MESSAGE_QUEUE_MP_FLUSH_RESPONSE:
76      break;
77
78  }
79}
80
81/*PAGE
82 *
83 *  _Message_queue_MP_Send_request_packet
84 *
85 */
86
87rtems_status_code _Message_queue_MP_Send_request_packet (
88  Message_queue_MP_Remote_operations  operation,
89  Objects_Id                          message_queue_id,
90  void                               *buffer,
91  unsigned32                         *size_p,
92  rtems_option                        option_set,
93  rtems_interval                      timeout
94)
95{
96  Message_queue_MP_Packet *the_packet;
97
98  switch ( operation ) {
99
100    case MESSAGE_QUEUE_MP_SEND_REQUEST:
101    case MESSAGE_QUEUE_MP_URGENT_REQUEST:
102    case MESSAGE_QUEUE_MP_BROADCAST_REQUEST:
103    case MESSAGE_QUEUE_MP_FLUSH_REQUEST:
104
105      the_packet                    = _Message_queue_MP_Get_packet();
106      the_packet->Prefix.the_class  = MP_PACKET_MESSAGE_QUEUE;
107      the_packet->Prefix.length     = sizeof(Message_queue_MP_Packet);
108      if ( size_p )
109        the_packet->Prefix.length     += *size_p;
110      the_packet->Prefix.to_convert = sizeof(Message_queue_MP_Packet);
111
112      /*
113       * make sure message is not too big for our MPCI driver
114       * We have to check it here instead of waiting for MPCI because
115       * we are about to slam in the payload
116       */
117
118      if (the_packet->Prefix.length > _MPCI_table->maximum_packet_size) {
119          _Thread_Enable_dispatch();
120          return RTEMS_INVALID_SIZE;
121      }
122
123      if ( ! _Options_Is_no_wait(option_set))
124          the_packet->Prefix.timeout = timeout;
125
126      the_packet->operation  = operation;
127      the_packet->Prefix.id  = message_queue_id;
128      the_packet->option_set = option_set;
129
130      /*
131       * Copy the data into place if needed
132       */
133     
134      if (buffer) {
135          the_packet->Buffer.size = *size_p;
136          _CORE_message_queue_Copy_buffer(
137            buffer,
138            the_packet->Buffer.buffer,
139            *size_p
140          );
141      }
142
143      return _MPCI_Send_request_packet(rtems_get_node(message_queue_id),
144                                       &the_packet->Prefix,
145                                       STATES_WAITING_FOR_MESSAGE);
146      break;
147
148    case MESSAGE_QUEUE_MP_RECEIVE_REQUEST:
149
150      the_packet                    = _Message_queue_MP_Get_packet();
151      the_packet->Prefix.the_class  = MP_PACKET_MESSAGE_QUEUE;
152      the_packet->Prefix.length     = sizeof(Message_queue_MP_Packet);
153      the_packet->Prefix.to_convert = sizeof(Message_queue_MP_Packet);
154
155      if ( ! _Options_Is_no_wait(option_set))
156          the_packet->Prefix.timeout = timeout;
157
158      the_packet->operation  = MESSAGE_QUEUE_MP_RECEIVE_REQUEST;
159      the_packet->Prefix.id  = message_queue_id;
160      the_packet->option_set = option_set;
161      the_packet->size       = 0;        /* just in case of an error */
162
163      _Thread_Executing->Wait.return_argument   = (unsigned32 *)buffer;
164      _Thread_Executing->Wait.return_argument_1 = size_p;
165     
166      return _MPCI_Send_request_packet(rtems_get_node(message_queue_id),
167                                       &the_packet->Prefix,
168                                       STATES_WAITING_FOR_MESSAGE);
169      break;
170
171    case MESSAGE_QUEUE_MP_ANNOUNCE_CREATE:
172    case MESSAGE_QUEUE_MP_ANNOUNCE_DELETE:
173    case MESSAGE_QUEUE_MP_EXTRACT_PROXY:
174    case MESSAGE_QUEUE_MP_RECEIVE_RESPONSE:
175    case MESSAGE_QUEUE_MP_SEND_RESPONSE:
176    case MESSAGE_QUEUE_MP_URGENT_RESPONSE:
177    case MESSAGE_QUEUE_MP_BROADCAST_RESPONSE:
178    case MESSAGE_QUEUE_MP_FLUSH_RESPONSE:
179      break;
180  }
181
182  return RTEMS_SUCCESSFUL;
183}
184
185/*PAGE
186 *
187 *  _Message_queue_MP_Send_response_packet
188 *
189 */
190
191void _Message_queue_MP_Send_response_packet (
192  Message_queue_MP_Remote_operations  operation,
193  Objects_Id                          message_queue_id,
194  Thread_Control                     *the_thread
195)
196{
197  Message_queue_MP_Packet *the_packet;
198
199  switch ( operation ) {
200
201    case MESSAGE_QUEUE_MP_RECEIVE_RESPONSE:
202    case MESSAGE_QUEUE_MP_SEND_RESPONSE:
203    case MESSAGE_QUEUE_MP_URGENT_RESPONSE:
204    case MESSAGE_QUEUE_MP_BROADCAST_RESPONSE:
205    case MESSAGE_QUEUE_MP_FLUSH_RESPONSE:
206
207      the_packet = ( Message_queue_MP_Packet *) the_thread->receive_packet;
208
209/*
210 *  The packet being returned already contains the class, length, and
211 *  to_convert fields, therefore they are not set in this routine.
212 *
213 *  Exception: MESSAGE_QUEUE_MP_RECEIVE_RESPONSE needs payload length
214 *             added to 'length'
215 */
216      the_packet->operation = operation;
217      the_packet->Prefix.id = the_packet->Prefix.source_tid;
218
219      if (operation == MESSAGE_QUEUE_MP_RECEIVE_RESPONSE)
220          the_packet->Prefix.length += the_packet->size;
221     
222      _MPCI_Send_response_packet(
223        rtems_get_node( the_packet->Prefix.source_tid ),
224        &the_packet->Prefix
225      );
226      break;
227
228    case MESSAGE_QUEUE_MP_ANNOUNCE_CREATE:
229    case MESSAGE_QUEUE_MP_ANNOUNCE_DELETE:
230    case MESSAGE_QUEUE_MP_EXTRACT_PROXY:
231    case MESSAGE_QUEUE_MP_RECEIVE_REQUEST:
232    case MESSAGE_QUEUE_MP_SEND_REQUEST:
233    case MESSAGE_QUEUE_MP_URGENT_REQUEST:
234    case MESSAGE_QUEUE_MP_BROADCAST_REQUEST:
235    case MESSAGE_QUEUE_MP_FLUSH_REQUEST:
236      break;
237
238  }
239}
240
241/*PAGE
242 *
243 *
244 *  _Message_queue_MP_Process_packet
245 *
246 */
247
248void _Message_queue_MP_Process_packet (
249  rtems_packet_prefix   *the_packet_prefix
250)
251{
252  Message_queue_MP_Packet *the_packet;
253  Thread_Control          *the_thread;
254  boolean                  ignored;
255
256  the_packet = (Message_queue_MP_Packet *) the_packet_prefix;
257
258  switch ( the_packet->operation ) {
259
260    case MESSAGE_QUEUE_MP_ANNOUNCE_CREATE:
261
262      ignored = _Objects_MP_Allocate_and_open(
263                  &_Message_queue_Information,
264                  the_packet->name,
265                  the_packet->Prefix.id,
266                  TRUE
267                );
268
269      _MPCI_Return_packet( the_packet_prefix );
270      break;
271
272    case MESSAGE_QUEUE_MP_ANNOUNCE_DELETE:
273
274      _Objects_MP_Close( &_Message_queue_Information, the_packet->Prefix.id );
275
276      _MPCI_Return_packet( the_packet_prefix );
277      break;
278
279    case MESSAGE_QUEUE_MP_EXTRACT_PROXY:
280
281      the_thread = _Thread_MP_Find_proxy( the_packet->proxy_id );
282
283      if ( ! _Thread_Is_null( the_thread ) )
284         _Thread_queue_Extract( the_thread->Wait.queue, the_thread );
285
286      _MPCI_Return_packet( the_packet_prefix );
287      break;
288
289    case MESSAGE_QUEUE_MP_RECEIVE_REQUEST:
290
291      the_packet->Prefix.return_code = rtems_message_queue_receive(
292        the_packet->Prefix.id,
293        the_packet->Buffer.buffer,
294        &the_packet->size,
295        the_packet->option_set,
296        the_packet->Prefix.timeout
297      );
298
299      if ( ! _Thread_Is_proxy_blocking( the_packet->Prefix.return_code ) )
300        _Message_queue_MP_Send_response_packet(
301          MESSAGE_QUEUE_MP_RECEIVE_RESPONSE,
302          the_packet->Prefix.id,
303          _Thread_Executing
304        );
305      break;
306
307    case MESSAGE_QUEUE_MP_RECEIVE_RESPONSE:
308
309      the_thread = _MPCI_Process_response( the_packet_prefix );
310
311      if (the_packet->Prefix.return_code == RTEMS_SUCCESSFUL) {
312        *(rtems_unsigned32 *)the_thread->Wait.return_argument_1 =
313           the_packet->size;
314
315        _CORE_message_queue_Copy_buffer(
316          the_packet->Buffer.buffer,
317          the_thread->Wait.return_argument,
318          the_packet->size
319        );
320      }
321
322      _MPCI_Return_packet( the_packet_prefix );
323      break;
324
325    case MESSAGE_QUEUE_MP_SEND_REQUEST:
326
327      the_packet->Prefix.return_code = rtems_message_queue_send(
328        the_packet->Prefix.id,
329        the_packet->Buffer.buffer,
330        the_packet->Buffer.size
331      );
332
333      _Message_queue_MP_Send_response_packet(
334        MESSAGE_QUEUE_MP_SEND_RESPONSE,
335        the_packet->Prefix.id,
336        _Thread_Executing
337      );
338      break;
339
340    case MESSAGE_QUEUE_MP_SEND_RESPONSE:
341    case MESSAGE_QUEUE_MP_URGENT_RESPONSE:
342
343      the_thread = _MPCI_Process_response( the_packet_prefix );
344
345      _MPCI_Return_packet( the_packet_prefix );
346      break;
347
348    case MESSAGE_QUEUE_MP_URGENT_REQUEST:
349
350      the_packet->Prefix.return_code = rtems_message_queue_urgent(
351        the_packet->Prefix.id,
352        the_packet->Buffer.buffer,
353        the_packet->Buffer.size
354      );
355
356      _Message_queue_MP_Send_response_packet(
357        MESSAGE_QUEUE_MP_URGENT_RESPONSE,
358        the_packet->Prefix.id,
359        _Thread_Executing
360      );
361      break;
362
363    case MESSAGE_QUEUE_MP_BROADCAST_REQUEST:
364
365      the_packet->Prefix.return_code = rtems_message_queue_broadcast(
366        the_packet->Prefix.id,
367        the_packet->Buffer.buffer,
368        the_packet->Buffer.size,
369        &the_packet->count
370      );
371
372      _Message_queue_MP_Send_response_packet(
373        MESSAGE_QUEUE_MP_BROADCAST_RESPONSE,
374        the_packet->Prefix.id,
375        _Thread_Executing
376      );
377      break;
378
379    case MESSAGE_QUEUE_MP_BROADCAST_RESPONSE:
380    case MESSAGE_QUEUE_MP_FLUSH_RESPONSE:
381
382      the_thread = _MPCI_Process_response( the_packet_prefix );
383
384      *(unsigned32 *)the_thread->Wait.return_argument = the_packet->count;
385
386      _MPCI_Return_packet( the_packet_prefix );
387      break;
388
389    case MESSAGE_QUEUE_MP_FLUSH_REQUEST:
390
391      the_packet->Prefix.return_code = rtems_message_queue_flush(
392        the_packet->Prefix.id,
393        &the_packet->count
394      );
395
396      _Message_queue_MP_Send_response_packet(
397        MESSAGE_QUEUE_MP_FLUSH_RESPONSE,
398        the_packet->Prefix.id,
399        _Thread_Executing
400      );
401      break;
402
403  }
404}
405
406/*PAGE
407 *
408 *  _Message_queue_MP_Send_object_was_deleted
409 *
410 */
411
412void _Message_queue_MP_Send_object_was_deleted (
413  Thread_Control  *the_proxy
414)
415{
416  the_proxy->receive_packet->return_code = RTEMS_OBJECT_WAS_DELETED;
417
418  _Message_queue_MP_Send_response_packet(
419    MESSAGE_QUEUE_MP_RECEIVE_RESPONSE,
420    the_proxy->Wait.id,
421    the_proxy
422  );
423}
424
425/*PAGE
426 *
427 *  _Message_queue_MP_Send_extract_proxy
428 *
429 */
430
431void _Message_queue_MP_Send_extract_proxy (
432  Thread_Control  *the_thread
433)
434{
435  _Message_queue_MP_Send_process_packet(
436    MESSAGE_QUEUE_MP_EXTRACT_PROXY,
437    the_thread->Wait.id,
438    (rtems_name) 0,
439    the_thread->Object.id
440  );
441}
442
443/*PAGE
444 *
445 *  _Message_queue_MP_Get_packet
446 *
447 */
448
449Message_queue_MP_Packet *_Message_queue_MP_Get_packet ( void )
450{
451  return ( (Message_queue_MP_Packet *) _MPCI_Get_packet() );
452}
453
454/* end of file */
Note: See TracBrowser for help on using the repository browser.