Changeset 3b438fa in rtems


Ignore:
Timestamp:
Aug 17, 1995, 7:39:31 PM (25 years ago)
Author:
Joel Sherrill <joel.sherrill@…>
Branches:
4.10, 4.11, 4.8, 4.9, master
Children:
b06e68ef
Parents:
4b374f36
Message:

variable length messages

Files:
4 edited

Legend:

Unmodified
Added
Removed
  • c/src/exec/rtems/src/msg.c

    r4b374f36 r3b438fa  
    3636 *  Input parameters:
    3737 *    maximum_message_queues - number of message queues to initialize
    38  *    maximum_message        - number of messages per queue
    3938 *
    4039 *  Output parameters:  NONE
     
    4241
    4342void _Message_queue_Manager_initialization(
    44   unsigned32 maximum_message_queues,
    45   unsigned32 maximum_messages
    46 )
    47 {
    48 
     43  unsigned32 maximum_message_queues
     44)
     45{
    4946  _Objects_Initialize_information(
    5047    &_Message_queue_Information,
     
    5350    sizeof( Message_queue_Control )
    5451  );
    55 
    56   if ( maximum_messages == 0 ) {
    57 
    58     _Chain_Initialize_empty( &_Message_queue_Inactive_messages );
    59 
    60   } else {
    61 
    62 
    63     _Chain_Initialize(
    64       &_Message_queue_Inactive_messages,
    65       _Workspace_Allocate_or_fatal_error(
    66         maximum_messages * sizeof( Message_queue_Buffer_control )
    67       ),
    68       maximum_messages,
    69       sizeof( Message_queue_Buffer_control )
    70     );
    71 
    72   }
     52}
     53
     54/*PAGE
     55 *
     56 *  _Message_queue_Allocate
     57 *
     58 *  Allocate a message queue and the space for its messages
     59 */
     60
     61Message_queue_Control *_Message_queue_Allocate (
     62  unsigned32          count,
     63  unsigned32          max_message_size
     64)
     65{
     66    Message_queue_Control *mq = 0;
     67    unsigned32 message_buffering_required;
     68    unsigned32 allocated_message_size;
     69
     70    mq = (Message_queue_Control *)_Objects_Allocate(&_Message_queue_Information);
     71    if (mq == 0)
     72        goto failed;
     73
     74    mq->maximum_message_size = max_message_size;
     75
     76    /*
     77     * round size up to multiple of a ptr for chain init
     78     */
     79   
     80    allocated_message_size = max_message_size;
     81    if (allocated_message_size & (sizeof(unsigned32) - 1))
     82    {
     83        allocated_message_size += sizeof(unsigned32);
     84        allocated_message_size &= ~(sizeof(unsigned32) - 1);
     85    }
     86   
     87    message_buffering_required = count * (allocated_message_size + sizeof(Message_queue_Buffer_control));
     88 
     89    mq->message_buffers = (Message_queue_Buffer *) _Workspace_Allocate(message_buffering_required);
     90    if (mq->message_buffers == 0)
     91        goto failed;
     92 
     93    _Chain_Initialize(&mq->Inactive_messages,
     94                      mq->message_buffers,
     95                      count,
     96                      allocated_message_size + sizeof(Message_queue_Buffer_control));
     97    return mq;
     98
     99failed:
     100    if (mq)
     101        _Message_queue_Free(mq);
     102    return (Message_queue_Control *) 0;
    73103}
    74104
     
    83113 *    name          - user defined queue name
    84114 *    count         - maximum message and reserved buffer count
     115 *    max_message_size - maximum size of each message
    85116 *    attribute_set - process method
    86117 *    id            - pointer to queue
     
    95126  Objects_Name        name,
    96127  unsigned32          count,
    97   rtems_attribute  attribute_set,
     128  unsigned32          max_message_size,
     129  rtems_attribute     attribute_set,
    98130  Objects_Id         *id
    99131)
     
    108140    return( RTEMS_MP_NOT_CONFIGURED );
    109141
     142  if (count == 0)
     143      return RTEMS_INVALID_NUMBER;
     144
     145  if (max_message_size == 0)
     146      return RTEMS_INVALID_SIZE;
     147
     148#if 1
     149  /*
     150   * I am not 100% sure this should be an error.
     151   * It seems reasonable to create a que with a large max size,
     152   * and then just send smaller msgs from remote (or all) nodes.
     153   */
     154 
     155  if ( _Attributes_Is_global( attribute_set ) &&
     156       _Configuration_MPCI_table &&
     157       (_Configuration_MPCI_table->maximum_packet_size < max_message_size))
     158  {
     159      return RTEMS_INVALID_SIZE;
     160  }
     161#endif
     162       
    110163  _Thread_Disable_dispatch();              /* protects object pointer */
    111164
    112   the_message_queue = _Message_queue_Allocate();
     165  the_message_queue = _Message_queue_Allocate(count, max_message_size);
    113166
    114167  if ( !the_message_queue ) {
     
    122175    _Message_queue_Free( the_message_queue );
    123176    _Thread_Enable_dispatch();
    124     return( RTEMS_TOO_MANY );
    125   }
    126 
    127   if ( _Attributes_Is_limit( attribute_set ) )
    128     the_message_queue->maximum_pending_messages = count;
    129   else
    130     the_message_queue->maximum_pending_messages = 0xffffffff;
    131 
    132   the_message_queue->attribute_set              = attribute_set;
     177    return RTEMS_TOO_MANY;
     178  }
     179
     180  the_message_queue->maximum_pending_messages = count;
     181
     182  the_message_queue->attribute_set = attribute_set;
    133183  the_message_queue->number_of_pending_messages = 0;
    134184
     
    265315rtems_status_code rtems_message_queue_send(
    266316  Objects_Id            id,
    267   void                 *buffer
    268 )
    269 {
    270   return( _Message_queue_Submit(
    271       id,
    272       (Message_queue_Buffer *) buffer,
    273       MESSAGE_QUEUE_SEND_REQUEST
    274     )
    275   );
     317  void                 *buffer,
     318  unsigned32            size
     319)
     320{
     321  return _Message_queue_Submit(id, buffer, size, MESSAGE_QUEUE_SEND_REQUEST);
    276322}
    277323
     
    294340rtems_status_code rtems_message_queue_urgent(
    295341  Objects_Id            id,
    296   void                 *buffer
    297 )
    298 {
    299   return( _Message_queue_Submit(
    300       id,
    301       (Message_queue_Buffer *) buffer,
    302       MESSAGE_QUEUE_URGENT_REQUEST
    303     )
    304   );
     342  void                 *buffer,
     343  unsigned32            size
     344)
     345{
     346  return _Message_queue_Submit(id, buffer, size, MESSAGE_QUEUE_URGENT_REQUEST);
    305347}
    306348
     
    326368  Objects_Id            id,
    327369  void                 *buffer,
     370  unsigned32            size,
    328371  unsigned32           *count
    329372)
     
    345388          MESSAGE_QUEUE_MP_BROADCAST_REQUEST,
    346389          id,
    347           (Message_queue_Buffer *) buffer,
    348           0,                               /* Not used */
     390          buffer,
     391          &size,
     392          0,                               /* option_set not used */
    349393          MPCI_DEFAULT_TIMEOUT
    350394        );
    351395
    352396    case OBJECTS_LOCAL:
     397    {
     398      Thread_Wait_information *waitp;
     399      unsigned32 constrained_size;
     400
    353401      number_broadcasted = 0;
    354402      while ( (the_thread =
    355403                 _Thread_queue_Dequeue(&the_message_queue->Wait_queue)) ) {
     404        waitp = &the_thread->Wait;
    356405        number_broadcasted += 1;
    357         _Message_queue_Copy_buffer(
    358           (Message_queue_Buffer *) buffer,
    359           the_thread->Wait.return_argument
    360         );
    361 
     406
     407        constrained_size = size;
     408        if (size > the_message_queue->maximum_message_size)
     409            constrained_size = the_message_queue->maximum_message_size;
     410
     411        _Message_queue_Copy_buffer(buffer,
     412                                   waitp->return_argument,
     413                                   constrained_size);
     414
     415        *waitp->Extra.message_size_p = constrained_size;
     416       
    362417        if ( !_Objects_Is_local_id( the_thread->Object.id ) ) {
    363418          the_thread->receive_packet->return_code = RTEMS_SUCCESSFUL;
     
    373428      *count = number_broadcasted;
    374429      return( RTEMS_SUCCESSFUL );
    375   }
    376 
    377   return( RTEMS_INTERNAL_ERROR );   /* unreached - only to remove warnings */
     430    }
     431
     432    default:
     433      return RTEMS_INTERNAL_ERROR;
     434  }
    378435}
    379436
     
    399456  Objects_Id            id,
    400457  void                 *buffer,
     458  unsigned32           *size_p,
    401459  unsigned32            option_set,
    402   rtems_interval     timeout
     460  rtems_interval        timeout
    403461)
    404462{
    405463  register Message_queue_Control *the_message_queue;
    406   Objects_Locations                      location;
     464  Objects_Locations               location;
    407465
    408466  the_message_queue = _Message_queue_Get( id, &location );
    409467  switch ( location ) {
     468
    410469    case OBJECTS_ERROR:
    411470      return( RTEMS_INVALID_ID );
     471
    412472    case OBJECTS_REMOTE:
    413473      _Thread_Executing->Wait.return_argument = buffer;
    414       return
    415         _Message_queue_MP_Send_request_packet(
     474     
     475      return _Message_queue_MP_Send_request_packet(
    416476          MESSAGE_QUEUE_MP_RECEIVE_REQUEST,
    417477          id,
    418478          buffer,
     479          size_p,
    419480          option_set,
    420481          timeout
     
    422483
    423484    case OBJECTS_LOCAL:
    424       if ( !_Message_queue_Seize( the_message_queue, option_set, buffer ) )
     485      if ( ! _Message_queue_Seize(the_message_queue,
     486                                  option_set,
     487                                  buffer,
     488                                  size_p))
     489      {
    425490        _Thread_queue_Enqueue( &the_message_queue->Wait_queue, timeout );
     491      }
    426492      _Thread_Enable_dispatch();
    427       return( _Thread_Executing->Wait.return_code );
     493      return _Thread_Executing->Wait.return_code;
    428494  }
    429495
     
    468534          MESSAGE_QUEUE_MP_FLUSH_REQUEST,
    469535          id,
    470           0,                               /* Not used */
    471           0,                               /* Not used */
     536          0,                               /* buffer not used */
     537          0,                               /* size_p */
     538          0,                               /* option_set not used */
    472539          MPCI_DEFAULT_TIMEOUT
    473540        );
     
    512579  Message_queue_Control  *the_message_queue,
    513580  rtems_option            option_set,
    514   Message_queue_Buffer   *buffer
     581  void                   *buffer,
     582  unsigned32             *size_p
    515583)
    516584{
     
    527595    the_message = _Message_queue_Get_pending_message( the_message_queue );
    528596    _ISR_Enable( level );
    529     _Message_queue_Copy_buffer( &the_message->Contents, buffer );
    530     _Message_queue_Free_message_buffer( the_message );
     597    *size_p = the_message->Contents.size;
     598    _Message_queue_Copy_buffer( the_message->Contents.buffer, buffer, *size_p );
     599    _Message_queue_Free_message_buffer(the_message_queue, the_message );
    531600    return( TRUE );
    532601  }
     
    543612  executing->Wait.option_set         = option_set;
    544613  executing->Wait.return_argument    = (unsigned32 *)buffer;
     614  executing->Wait.Extra.message_size_p = size_p;
    545615  _ISR_Enable( level );
    546   return( FALSE );
     616  return FALSE;
    547617}
    548618
     
    568638)
    569639{
    570   ISR_Level          level;
     640  ISR_Level   level;
    571641  Chain_Node *inactive_first;
    572642  Chain_Node *message_queue_first;
    573643  Chain_Node *message_queue_last;
    574   unsigned32         count;
     644  unsigned32  count;
    575645
    576646  _ISR_Disable( level );
    577     inactive_first      = _Message_queue_Inactive_messages.first;
     647    inactive_first      = the_message_queue->Inactive_messages.first;
    578648    message_queue_first = the_message_queue->Pending_messages.first;
    579649    message_queue_last  = the_message_queue->Pending_messages.last;
    580650
    581     _Message_queue_Inactive_messages.first = message_queue_first;
    582     message_queue_last->next               = inactive_first;
    583     inactive_first->previous               = message_queue_last;
     651    the_message_queue->Inactive_messages.first = message_queue_first;
     652    message_queue_last->next = inactive_first;
     653    inactive_first->previous = message_queue_last;
    584654    message_queue_first->previous          =
    585                _Chain_Head( &_Message_queue_Inactive_messages );
     655               _Chain_Head( &the_message_queue->Inactive_messages );
    586656
    587657    _Chain_Initialize_empty( &the_message_queue->Pending_messages );
     
    590660    the_message_queue->number_of_pending_messages = 0;
    591661  _ISR_Enable( level );
    592   return( count );
     662  return count;
    593663}
    594664
     
    606676 *  Input parameters:
    607677 *    id          - pointer to message queue
    608  *    the_buffer  - pointer to message buffer
     678 *    buffer      - pointer to message buffer
     679 *    size        - size in bytes of message to send
    609680 *    submit_type - send or urgent message
    610681 *
    611682 *  Output parameters:
    612683 *    RTEMS_SUCCESSFUL - if successful
    613  *    error code        - if unsuccessful
     684 *    error code       - if unsuccessful
    614685 */
    615686
    616687rtems_status_code _Message_queue_Submit(
    617688  Objects_Id                  id,
    618   Message_queue_Buffer       *buffer,
     689  void                       *buffer,
     690  unsigned32                  size,
    619691  Message_queue_Submit_types  submit_type
    620692)
     
    626698
    627699  the_message_queue = _Message_queue_Get( id, &location );
    628   switch ( location ) {
     700  switch ( location )
     701  {
    629702    case OBJECTS_ERROR:
    630703      return( RTEMS_INVALID_ID );
     704
    631705    case OBJECTS_REMOTE:
    632706      switch ( submit_type ) {
     
    637711              id,
    638712              buffer,
    639               0,                               /* Not used */
     713              &size,
     714              0,                               /* option_set */
    640715              MPCI_DEFAULT_TIMEOUT
    641716            );
     
    647722              id,
    648723              buffer,
    649               0,                               /* Not used */
     724              &size,
     725              0,                               /* option_set */
    650726              MPCI_DEFAULT_TIMEOUT
    651727            );
    652728      }
     729
    653730    case OBJECTS_LOCAL:
     731      if (size > the_message_queue->maximum_message_size)
     732      {
     733          _Thread_Enable_dispatch();
     734          return RTEMS_INVALID_SIZE;
     735      }
     736
     737      /*
     738       * Is there a thread currently waiting on this message queue?
     739       */
     740     
    654741      the_thread = _Thread_queue_Dequeue( &the_message_queue->Wait_queue );
    655 
    656       if ( the_thread ) {
    657 
     742      if ( the_thread )
     743      {
    658744        _Message_queue_Copy_buffer(
    659745          buffer,
    660           the_thread->Wait.return_argument
     746          the_thread->Wait.return_argument,
     747          size
    661748        );
    662 
     749        *the_thread->Wait.Extra.message_size_p = size;
     750       
    663751        if ( !_Objects_Is_local_id( the_thread->Object.id ) ) {
    664752          the_thread->receive_packet->return_code = RTEMS_SUCCESSFUL;
     
    675763      }
    676764
     765      /*
     766       * No one waiting on this one currently.
     767       * Allocate a message buffer and store it away
     768       */
     769
    677770      if ( the_message_queue->number_of_pending_messages ==
    678771           the_message_queue->maximum_pending_messages ) {
     
    681774      }
    682775
    683       the_message = _Message_queue_Allocate_message_buffer();
    684 
    685       if ( !the_message ) {
     776      the_message = _Message_queue_Allocate_message_buffer(the_message_queue);
     777      if ( the_message == 0) {
    686778        _Thread_Enable_dispatch();
    687779        return( RTEMS_UNSATISFIED );
    688780      }
    689781
    690       _Message_queue_Copy_buffer( buffer, &the_message->Contents );
    691 
     782      _Message_queue_Copy_buffer( buffer, the_message->Contents.buffer, size );
     783      the_message->Contents.size = size;
     784     
    692785      the_message_queue->number_of_pending_messages += 1;
    693786
     
    703796      _Thread_Enable_dispatch();
    704797      return( RTEMS_SUCCESSFUL );
    705   }
    706 
    707   return( RTEMS_INTERNAL_ERROR );   /* unreached - only to remove warnings */
    708 }
     798         
     799    default:
     800      return RTEMS_INTERNAL_ERROR;       /* And they were such nice boys, too! */
     801  }
     802}
  • c/src/exec/rtems/src/msgmp.c

    r4b374f36 r3b438fa  
    2222#include <rtems/thread.h>
    2323#include <rtems/watchdog.h>
     24#include <rtems/config.h>
    2425
    2526/*PAGE
     
    8687  Message_queue_MP_Remote_operations  operation,
    8788  Objects_Id                          message_queue_id,
    88   Message_queue_Buffer               *buffer,
    89   rtems_option                     option_set,
    90   rtems_interval                   timeout
     89  void                               *buffer,
     90  unsigned32                         *size_p,
     91  rtems_option                        option_set,
     92  rtems_interval                      timeout
    9193)
    9294{
     
    9597  switch ( operation ) {
    9698
    97     case MESSAGE_QUEUE_MP_RECEIVE_REQUEST:
    9899    case MESSAGE_QUEUE_MP_SEND_REQUEST:
    99100    case MESSAGE_QUEUE_MP_URGENT_REQUEST:
     
    103104      the_packet                    = _Message_queue_MP_Get_packet();
    104105      the_packet->Prefix.the_class  = RTEMS_MP_PACKET_MESSAGE_QUEUE;
    105       the_packet->Prefix.length     = sizeof ( Message_queue_MP_Packet );
    106       the_packet->Prefix.to_convert = sizeof ( Message_queue_MP_Packet ) -
    107                          sizeof ( Message_queue_Buffer );
     106      the_packet->Prefix.length     = sizeof(Message_queue_MP_Packet);
     107      if ( size_p )
     108      the_packet->Prefix.length     += *size_p;
     109      the_packet->Prefix.to_convert = sizeof(Message_queue_MP_Packet);
     110
     111      /*
     112       * make sure message is not too big for our MPCI driver
     113       * We have to check it here instead of waiting for MPCI because
     114       * we are about to slam in the payload
     115       */
     116
     117      if (the_packet->Prefix.length >
     118          _Configuration_MPCI_table->maximum_packet_size)
     119      {
     120          _Thread_Enable_dispatch();
     121          return RTEMS_INVALID_SIZE;
     122      }
     123
    108124      if ( ! _Options_Is_no_wait(option_set))
    109125          the_packet->Prefix.timeout = timeout;
    110126
    111       the_packet->operation         = operation;
    112       the_packet->Prefix.id         = message_queue_id;
    113       the_packet->option_set        = option_set;
    114 
    115       if ( buffer )
    116         _Message_queue_Copy_buffer( buffer, &the_packet->Buffer );
    117 
    118       return
    119         _MPCI_Send_request_packet(
    120           rtems_get_node( message_queue_id ),
    121           &the_packet->Prefix,
    122           STATES_WAITING_FOR_MESSAGE
    123         );
     127      the_packet->operation  = operation;
     128      the_packet->Prefix.id  = message_queue_id;
     129      the_packet->option_set = option_set;
     130
     131      /*
     132       * Copy the data into place if needed
     133       */
     134     
     135      if (buffer)
     136      {
     137          the_packet->Buffer.size = *size_p;
     138          _Message_queue_Copy_buffer(buffer,
     139                                     the_packet->Buffer.buffer,
     140                                     *size_p);
     141      }
     142
     143      return _MPCI_Send_request_packet(rtems_get_node(message_queue_id),
     144                                       &the_packet->Prefix,
     145                                       STATES_WAITING_FOR_MESSAGE);
     146      break;
     147
     148    case MESSAGE_QUEUE_MP_RECEIVE_REQUEST:
     149
     150      the_packet                    = _Message_queue_MP_Get_packet();
     151      the_packet->Prefix.the_class  = RTEMS_MP_PACKET_MESSAGE_QUEUE;
     152      the_packet->Prefix.length     = sizeof(Message_queue_MP_Packet);
     153      the_packet->Prefix.to_convert = sizeof(Message_queue_MP_Packet);
     154
     155      if ( ! _Options_Is_no_wait(option_set))
     156          the_packet->Prefix.timeout = timeout;
     157
     158      the_packet->operation  = MESSAGE_QUEUE_MP_RECEIVE_REQUEST;
     159      the_packet->Prefix.id  = message_queue_id;
     160      the_packet->option_set = option_set;
     161      the_packet->size       = 0;        /* just in case of an error */
     162
     163      _Thread_Executing->Wait.return_argument      = (unsigned32 *)buffer;
     164      _Thread_Executing->Wait.Extra.message_size_p = size_p;
     165     
     166      return _MPCI_Send_request_packet(rtems_get_node(message_queue_id),
     167                                       &the_packet->Prefix,
     168                                       STATES_WAITING_FOR_MESSAGE);
    124169      break;
    125170
     
    133178    case MESSAGE_QUEUE_MP_FLUSH_RESPONSE:
    134179      break;
    135 
    136180  }
    137   /*
    138    *  The following line is included to satisfy compilers which
    139    *  produce warnings when a function does not end with a return.
    140    */
     181
    141182  return RTEMS_SUCCESSFUL;
    142183}
     
    169210 *  The packet being returned already contains the class, length, and
    170211 *  to_convert fields, therefore they are not set in this routine.
     212 *
     213 *  Exception: MESSAGE_QUEUE_MP_RECEIVE_RESPONSE needs payload length
     214 *             added to 'length'
    171215 */
    172216      the_packet->operation = operation;
    173217      the_packet->Prefix.id = the_packet->Prefix.source_tid;
    174218
     219      if (operation == MESSAGE_QUEUE_MP_RECEIVE_RESPONSE)
     220          the_packet->Prefix.length += the_packet->size;
     221     
    175222      _MPCI_Send_response_packet(
    176223        rtems_get_node( the_packet->Prefix.source_tid ),
     
    244291      the_packet->Prefix.return_code = rtems_message_queue_receive(
    245292        the_packet->Prefix.id,
    246         &the_packet->Buffer,
     293        the_packet->Buffer.buffer,
     294        &the_packet->size,
    247295        the_packet->option_set,
    248296        the_packet->Prefix.timeout
     
    261309      the_thread = _MPCI_Process_response( the_packet_prefix );
    262310
    263       _Message_queue_Copy_buffer(
    264         &the_packet->Buffer,
    265         (Message_queue_Buffer *) the_thread->Wait.return_argument
    266       );
     311      if (the_packet->Prefix.return_code == RTEMS_SUCCESSFUL) {
     312        *the_thread->Wait.Extra.message_size_p = the_packet->size;
     313
     314        _Message_queue_Copy_buffer(
     315          the_packet->Buffer.buffer,
     316          the_thread->Wait.return_argument,
     317          the_packet->size
     318        );
     319      }
    267320
    268321      _MPCI_Return_packet( the_packet_prefix );
     
    273326      the_packet->Prefix.return_code = rtems_message_queue_send(
    274327        the_packet->Prefix.id,
    275         &the_packet->Buffer
     328        the_packet->Buffer.buffer,
     329        the_packet->Buffer.size
    276330      );
    277331
     
    295349      the_packet->Prefix.return_code = rtems_message_queue_urgent(
    296350        the_packet->Prefix.id,
    297         &the_packet->Buffer
     351        the_packet->Buffer.buffer,
     352        the_packet->Buffer.size
    298353      );
    299354
     
    309364      the_packet->Prefix.return_code = rtems_message_queue_broadcast(
    310365        the_packet->Prefix.id,
    311         &the_packet->Buffer,
     366        the_packet->Buffer.buffer,
     367        the_packet->Buffer.size,
    312368        &the_packet->count
    313369      );
  • cpukit/rtems/src/msg.c

    r4b374f36 r3b438fa  
    3636 *  Input parameters:
    3737 *    maximum_message_queues - number of message queues to initialize
    38  *    maximum_message        - number of messages per queue
    3938 *
    4039 *  Output parameters:  NONE
     
    4241
    4342void _Message_queue_Manager_initialization(
    44   unsigned32 maximum_message_queues,
    45   unsigned32 maximum_messages
    46 )
    47 {
    48 
     43  unsigned32 maximum_message_queues
     44)
     45{
    4946  _Objects_Initialize_information(
    5047    &_Message_queue_Information,
     
    5350    sizeof( Message_queue_Control )
    5451  );
    55 
    56   if ( maximum_messages == 0 ) {
    57 
    58     _Chain_Initialize_empty( &_Message_queue_Inactive_messages );
    59 
    60   } else {
    61 
    62 
    63     _Chain_Initialize(
    64       &_Message_queue_Inactive_messages,
    65       _Workspace_Allocate_or_fatal_error(
    66         maximum_messages * sizeof( Message_queue_Buffer_control )
    67       ),
    68       maximum_messages,
    69       sizeof( Message_queue_Buffer_control )
    70     );
    71 
    72   }
     52}
     53
     54/*PAGE
     55 *
     56 *  _Message_queue_Allocate
     57 *
     58 *  Allocate a message queue and the space for its messages
     59 */
     60
     61Message_queue_Control *_Message_queue_Allocate (
     62  unsigned32          count,
     63  unsigned32          max_message_size
     64)
     65{
     66    Message_queue_Control *mq = 0;
     67    unsigned32 message_buffering_required;
     68    unsigned32 allocated_message_size;
     69
     70    mq = (Message_queue_Control *)_Objects_Allocate(&_Message_queue_Information);
     71    if (mq == 0)
     72        goto failed;
     73
     74    mq->maximum_message_size = max_message_size;
     75
     76    /*
     77     * round size up to multiple of a ptr for chain init
     78     */
     79   
     80    allocated_message_size = max_message_size;
     81    if (allocated_message_size & (sizeof(unsigned32) - 1))
     82    {
     83        allocated_message_size += sizeof(unsigned32);
     84        allocated_message_size &= ~(sizeof(unsigned32) - 1);
     85    }
     86   
     87    message_buffering_required = count * (allocated_message_size + sizeof(Message_queue_Buffer_control));
     88 
     89    mq->message_buffers = (Message_queue_Buffer *) _Workspace_Allocate(message_buffering_required);
     90    if (mq->message_buffers == 0)
     91        goto failed;
     92 
     93    _Chain_Initialize(&mq->Inactive_messages,
     94                      mq->message_buffers,
     95                      count,
     96                      allocated_message_size + sizeof(Message_queue_Buffer_control));
     97    return mq;
     98
     99failed:
     100    if (mq)
     101        _Message_queue_Free(mq);
     102    return (Message_queue_Control *) 0;
    73103}
    74104
     
    83113 *    name          - user defined queue name
    84114 *    count         - maximum message and reserved buffer count
     115 *    max_message_size - maximum size of each message
    85116 *    attribute_set - process method
    86117 *    id            - pointer to queue
     
    95126  Objects_Name        name,
    96127  unsigned32          count,
    97   rtems_attribute  attribute_set,
     128  unsigned32          max_message_size,
     129  rtems_attribute     attribute_set,
    98130  Objects_Id         *id
    99131)
     
    108140    return( RTEMS_MP_NOT_CONFIGURED );
    109141
     142  if (count == 0)
     143      return RTEMS_INVALID_NUMBER;
     144
     145  if (max_message_size == 0)
     146      return RTEMS_INVALID_SIZE;
     147
     148#if 1
     149  /*
     150   * I am not 100% sure this should be an error.
     151   * It seems reasonable to create a que with a large max size,
     152   * and then just send smaller msgs from remote (or all) nodes.
     153   */
     154 
     155  if ( _Attributes_Is_global( attribute_set ) &&
     156       _Configuration_MPCI_table &&
     157       (_Configuration_MPCI_table->maximum_packet_size < max_message_size))
     158  {
     159      return RTEMS_INVALID_SIZE;
     160  }
     161#endif
     162       
    110163  _Thread_Disable_dispatch();              /* protects object pointer */
    111164
    112   the_message_queue = _Message_queue_Allocate();
     165  the_message_queue = _Message_queue_Allocate(count, max_message_size);
    113166
    114167  if ( !the_message_queue ) {
     
    122175    _Message_queue_Free( the_message_queue );
    123176    _Thread_Enable_dispatch();
    124     return( RTEMS_TOO_MANY );
    125   }
    126 
    127   if ( _Attributes_Is_limit( attribute_set ) )
    128     the_message_queue->maximum_pending_messages = count;
    129   else
    130     the_message_queue->maximum_pending_messages = 0xffffffff;
    131 
    132   the_message_queue->attribute_set              = attribute_set;
     177    return RTEMS_TOO_MANY;
     178  }
     179
     180  the_message_queue->maximum_pending_messages = count;
     181
     182  the_message_queue->attribute_set = attribute_set;
    133183  the_message_queue->number_of_pending_messages = 0;
    134184
     
    265315rtems_status_code rtems_message_queue_send(
    266316  Objects_Id            id,
    267   void                 *buffer
    268 )
    269 {
    270   return( _Message_queue_Submit(
    271       id,
    272       (Message_queue_Buffer *) buffer,
    273       MESSAGE_QUEUE_SEND_REQUEST
    274     )
    275   );
     317  void                 *buffer,
     318  unsigned32            size
     319)
     320{
     321  return _Message_queue_Submit(id, buffer, size, MESSAGE_QUEUE_SEND_REQUEST);
    276322}
    277323
     
    294340rtems_status_code rtems_message_queue_urgent(
    295341  Objects_Id            id,
    296   void                 *buffer
    297 )
    298 {
    299   return( _Message_queue_Submit(
    300       id,
    301       (Message_queue_Buffer *) buffer,
    302       MESSAGE_QUEUE_URGENT_REQUEST
    303     )
    304   );
     342  void                 *buffer,
     343  unsigned32            size
     344)
     345{
     346  return _Message_queue_Submit(id, buffer, size, MESSAGE_QUEUE_URGENT_REQUEST);
    305347}
    306348
     
    326368  Objects_Id            id,
    327369  void                 *buffer,
     370  unsigned32            size,
    328371  unsigned32           *count
    329372)
     
    345388          MESSAGE_QUEUE_MP_BROADCAST_REQUEST,
    346389          id,
    347           (Message_queue_Buffer *) buffer,
    348           0,                               /* Not used */
     390          buffer,
     391          &size,
     392          0,                               /* option_set not used */
    349393          MPCI_DEFAULT_TIMEOUT
    350394        );
    351395
    352396    case OBJECTS_LOCAL:
     397    {
     398      Thread_Wait_information *waitp;
     399      unsigned32 constrained_size;
     400
    353401      number_broadcasted = 0;
    354402      while ( (the_thread =
    355403                 _Thread_queue_Dequeue(&the_message_queue->Wait_queue)) ) {
     404        waitp = &the_thread->Wait;
    356405        number_broadcasted += 1;
    357         _Message_queue_Copy_buffer(
    358           (Message_queue_Buffer *) buffer,
    359           the_thread->Wait.return_argument
    360         );
    361 
     406
     407        constrained_size = size;
     408        if (size > the_message_queue->maximum_message_size)
     409            constrained_size = the_message_queue->maximum_message_size;
     410
     411        _Message_queue_Copy_buffer(buffer,
     412                                   waitp->return_argument,
     413                                   constrained_size);
     414
     415        *waitp->Extra.message_size_p = constrained_size;
     416       
    362417        if ( !_Objects_Is_local_id( the_thread->Object.id ) ) {
    363418          the_thread->receive_packet->return_code = RTEMS_SUCCESSFUL;
     
    373428      *count = number_broadcasted;
    374429      return( RTEMS_SUCCESSFUL );
    375   }
    376 
    377   return( RTEMS_INTERNAL_ERROR );   /* unreached - only to remove warnings */
     430    }
     431
     432    default:
     433      return RTEMS_INTERNAL_ERROR;
     434  }
    378435}
    379436
     
    399456  Objects_Id            id,
    400457  void                 *buffer,
     458  unsigned32           *size_p,
    401459  unsigned32            option_set,
    402   rtems_interval     timeout
     460  rtems_interval        timeout
    403461)
    404462{
    405463  register Message_queue_Control *the_message_queue;
    406   Objects_Locations                      location;
     464  Objects_Locations               location;
    407465
    408466  the_message_queue = _Message_queue_Get( id, &location );
    409467  switch ( location ) {
     468
    410469    case OBJECTS_ERROR:
    411470      return( RTEMS_INVALID_ID );
     471
    412472    case OBJECTS_REMOTE:
    413473      _Thread_Executing->Wait.return_argument = buffer;
    414       return
    415         _Message_queue_MP_Send_request_packet(
     474     
     475      return _Message_queue_MP_Send_request_packet(
    416476          MESSAGE_QUEUE_MP_RECEIVE_REQUEST,
    417477          id,
    418478          buffer,
     479          size_p,
    419480          option_set,
    420481          timeout
     
    422483
    423484    case OBJECTS_LOCAL:
    424       if ( !_Message_queue_Seize( the_message_queue, option_set, buffer ) )
     485      if ( ! _Message_queue_Seize(the_message_queue,
     486                                  option_set,
     487                                  buffer,
     488                                  size_p))
     489      {
    425490        _Thread_queue_Enqueue( &the_message_queue->Wait_queue, timeout );
     491      }
    426492      _Thread_Enable_dispatch();
    427       return( _Thread_Executing->Wait.return_code );
     493      return _Thread_Executing->Wait.return_code;
    428494  }
    429495
     
    468534          MESSAGE_QUEUE_MP_FLUSH_REQUEST,
    469535          id,
    470           0,                               /* Not used */
    471           0,                               /* Not used */
     536          0,                               /* buffer not used */
     537          0,                               /* size_p */
     538          0,                               /* option_set not used */
    472539          MPCI_DEFAULT_TIMEOUT
    473540        );
     
    512579  Message_queue_Control  *the_message_queue,
    513580  rtems_option            option_set,
    514   Message_queue_Buffer   *buffer
     581  void                   *buffer,
     582  unsigned32             *size_p
    515583)
    516584{
     
    527595    the_message = _Message_queue_Get_pending_message( the_message_queue );
    528596    _ISR_Enable( level );
    529     _Message_queue_Copy_buffer( &the_message->Contents, buffer );
    530     _Message_queue_Free_message_buffer( the_message );
     597    *size_p = the_message->Contents.size;
     598    _Message_queue_Copy_buffer( the_message->Contents.buffer, buffer, *size_p );
     599    _Message_queue_Free_message_buffer(the_message_queue, the_message );
    531600    return( TRUE );
    532601  }
     
    543612  executing->Wait.option_set         = option_set;
    544613  executing->Wait.return_argument    = (unsigned32 *)buffer;
     614  executing->Wait.Extra.message_size_p = size_p;
    545615  _ISR_Enable( level );
    546   return( FALSE );
     616  return FALSE;
    547617}
    548618
     
    568638)
    569639{
    570   ISR_Level          level;
     640  ISR_Level   level;
    571641  Chain_Node *inactive_first;
    572642  Chain_Node *message_queue_first;
    573643  Chain_Node *message_queue_last;
    574   unsigned32         count;
     644  unsigned32  count;
    575645
    576646  _ISR_Disable( level );
    577     inactive_first      = _Message_queue_Inactive_messages.first;
     647    inactive_first      = the_message_queue->Inactive_messages.first;
    578648    message_queue_first = the_message_queue->Pending_messages.first;
    579649    message_queue_last  = the_message_queue->Pending_messages.last;
    580650
    581     _Message_queue_Inactive_messages.first = message_queue_first;
    582     message_queue_last->next               = inactive_first;
    583     inactive_first->previous               = message_queue_last;
     651    the_message_queue->Inactive_messages.first = message_queue_first;
     652    message_queue_last->next = inactive_first;
     653    inactive_first->previous = message_queue_last;
    584654    message_queue_first->previous          =
    585                _Chain_Head( &_Message_queue_Inactive_messages );
     655               _Chain_Head( &the_message_queue->Inactive_messages );
    586656
    587657    _Chain_Initialize_empty( &the_message_queue->Pending_messages );
     
    590660    the_message_queue->number_of_pending_messages = 0;
    591661  _ISR_Enable( level );
    592   return( count );
     662  return count;
    593663}
    594664
     
    606676 *  Input parameters:
    607677 *    id          - pointer to message queue
    608  *    the_buffer  - pointer to message buffer
     678 *    buffer      - pointer to message buffer
     679 *    size        - size in bytes of message to send
    609680 *    submit_type - send or urgent message
    610681 *
    611682 *  Output parameters:
    612683 *    RTEMS_SUCCESSFUL - if successful
    613  *    error code        - if unsuccessful
     684 *    error code       - if unsuccessful
    614685 */
    615686
    616687rtems_status_code _Message_queue_Submit(
    617688  Objects_Id                  id,
    618   Message_queue_Buffer       *buffer,
     689  void                       *buffer,
     690  unsigned32                  size,
    619691  Message_queue_Submit_types  submit_type
    620692)
     
    626698
    627699  the_message_queue = _Message_queue_Get( id, &location );
    628   switch ( location ) {
     700  switch ( location )
     701  {
    629702    case OBJECTS_ERROR:
    630703      return( RTEMS_INVALID_ID );
     704
    631705    case OBJECTS_REMOTE:
    632706      switch ( submit_type ) {
     
    637711              id,
    638712              buffer,
    639               0,                               /* Not used */
     713              &size,
     714              0,                               /* option_set */
    640715              MPCI_DEFAULT_TIMEOUT
    641716            );
     
    647722              id,
    648723              buffer,
    649               0,                               /* Not used */
     724              &size,
     725              0,                               /* option_set */
    650726              MPCI_DEFAULT_TIMEOUT
    651727            );
    652728      }
     729
    653730    case OBJECTS_LOCAL:
     731      if (size > the_message_queue->maximum_message_size)
     732      {
     733          _Thread_Enable_dispatch();
     734          return RTEMS_INVALID_SIZE;
     735      }
     736
     737      /*
     738       * Is there a thread currently waiting on this message queue?
     739       */
     740     
    654741      the_thread = _Thread_queue_Dequeue( &the_message_queue->Wait_queue );
    655 
    656       if ( the_thread ) {
    657 
     742      if ( the_thread )
     743      {
    658744        _Message_queue_Copy_buffer(
    659745          buffer,
    660           the_thread->Wait.return_argument
     746          the_thread->Wait.return_argument,
     747          size
    661748        );
    662 
     749        *the_thread->Wait.Extra.message_size_p = size;
     750       
    663751        if ( !_Objects_Is_local_id( the_thread->Object.id ) ) {
    664752          the_thread->receive_packet->return_code = RTEMS_SUCCESSFUL;
     
    675763      }
    676764
     765      /*
     766       * No one waiting on this one currently.
     767       * Allocate a message buffer and store it away
     768       */
     769
    677770      if ( the_message_queue->number_of_pending_messages ==
    678771           the_message_queue->maximum_pending_messages ) {
     
    681774      }
    682775
    683       the_message = _Message_queue_Allocate_message_buffer();
    684 
    685       if ( !the_message ) {
     776      the_message = _Message_queue_Allocate_message_buffer(the_message_queue);
     777      if ( the_message == 0) {
    686778        _Thread_Enable_dispatch();
    687779        return( RTEMS_UNSATISFIED );
    688780      }
    689781
    690       _Message_queue_Copy_buffer( buffer, &the_message->Contents );
    691 
     782      _Message_queue_Copy_buffer( buffer, the_message->Contents.buffer, size );
     783      the_message->Contents.size = size;
     784     
    692785      the_message_queue->number_of_pending_messages += 1;
    693786
     
    703796      _Thread_Enable_dispatch();
    704797      return( RTEMS_SUCCESSFUL );
    705   }
    706 
    707   return( RTEMS_INTERNAL_ERROR );   /* unreached - only to remove warnings */
    708 }
     798         
     799    default:
     800      return RTEMS_INTERNAL_ERROR;       /* And they were such nice boys, too! */
     801  }
     802}
  • cpukit/rtems/src/msgmp.c

    r4b374f36 r3b438fa  
    2222#include <rtems/thread.h>
    2323#include <rtems/watchdog.h>
     24#include <rtems/config.h>
    2425
    2526/*PAGE
     
    8687  Message_queue_MP_Remote_operations  operation,
    8788  Objects_Id                          message_queue_id,
    88   Message_queue_Buffer               *buffer,
    89   rtems_option                     option_set,
    90   rtems_interval                   timeout
     89  void                               *buffer,
     90  unsigned32                         *size_p,
     91  rtems_option                        option_set,
     92  rtems_interval                      timeout
    9193)
    9294{
     
    9597  switch ( operation ) {
    9698
    97     case MESSAGE_QUEUE_MP_RECEIVE_REQUEST:
    9899    case MESSAGE_QUEUE_MP_SEND_REQUEST:
    99100    case MESSAGE_QUEUE_MP_URGENT_REQUEST:
     
    103104      the_packet                    = _Message_queue_MP_Get_packet();
    104105      the_packet->Prefix.the_class  = RTEMS_MP_PACKET_MESSAGE_QUEUE;
    105       the_packet->Prefix.length     = sizeof ( Message_queue_MP_Packet );
    106       the_packet->Prefix.to_convert = sizeof ( Message_queue_MP_Packet ) -
    107                          sizeof ( Message_queue_Buffer );
     106      the_packet->Prefix.length     = sizeof(Message_queue_MP_Packet);
     107      if ( size_p )
     108      the_packet->Prefix.length     += *size_p;
     109      the_packet->Prefix.to_convert = sizeof(Message_queue_MP_Packet);
     110
     111      /*
     112       * make sure message is not too big for our MPCI driver
     113       * We have to check it here instead of waiting for MPCI because
     114       * we are about to slam in the payload
     115       */
     116
     117      if (the_packet->Prefix.length >
     118          _Configuration_MPCI_table->maximum_packet_size)
     119      {
     120          _Thread_Enable_dispatch();
     121          return RTEMS_INVALID_SIZE;
     122      }
     123
    108124      if ( ! _Options_Is_no_wait(option_set))
    109125          the_packet->Prefix.timeout = timeout;
    110126
    111       the_packet->operation         = operation;
    112       the_packet->Prefix.id         = message_queue_id;
    113       the_packet->option_set        = option_set;
    114 
    115       if ( buffer )
    116         _Message_queue_Copy_buffer( buffer, &the_packet->Buffer );
    117 
    118       return
    119         _MPCI_Send_request_packet(
    120           rtems_get_node( message_queue_id ),
    121           &the_packet->Prefix,
    122           STATES_WAITING_FOR_MESSAGE
    123         );
     127      the_packet->operation  = operation;
     128      the_packet->Prefix.id  = message_queue_id;
     129      the_packet->option_set = option_set;
     130
     131      /*
     132       * Copy the data into place if needed
     133       */
     134     
     135      if (buffer)
     136      {
     137          the_packet->Buffer.size = *size_p;
     138          _Message_queue_Copy_buffer(buffer,
     139                                     the_packet->Buffer.buffer,
     140                                     *size_p);
     141      }
     142
     143      return _MPCI_Send_request_packet(rtems_get_node(message_queue_id),
     144                                       &the_packet->Prefix,
     145                                       STATES_WAITING_FOR_MESSAGE);
     146      break;
     147
     148    case MESSAGE_QUEUE_MP_RECEIVE_REQUEST:
     149
     150      the_packet                    = _Message_queue_MP_Get_packet();
     151      the_packet->Prefix.the_class  = RTEMS_MP_PACKET_MESSAGE_QUEUE;
     152      the_packet->Prefix.length     = sizeof(Message_queue_MP_Packet);
     153      the_packet->Prefix.to_convert = sizeof(Message_queue_MP_Packet);
     154
     155      if ( ! _Options_Is_no_wait(option_set))
     156          the_packet->Prefix.timeout = timeout;
     157
     158      the_packet->operation  = MESSAGE_QUEUE_MP_RECEIVE_REQUEST;
     159      the_packet->Prefix.id  = message_queue_id;
     160      the_packet->option_set = option_set;
     161      the_packet->size       = 0;        /* just in case of an error */
     162
     163      _Thread_Executing->Wait.return_argument      = (unsigned32 *)buffer;
     164      _Thread_Executing->Wait.Extra.message_size_p = size_p;
     165     
     166      return _MPCI_Send_request_packet(rtems_get_node(message_queue_id),
     167                                       &the_packet->Prefix,
     168                                       STATES_WAITING_FOR_MESSAGE);
    124169      break;
    125170
     
    133178    case MESSAGE_QUEUE_MP_FLUSH_RESPONSE:
    134179      break;
    135 
    136180  }
    137   /*
    138    *  The following line is included to satisfy compilers which
    139    *  produce warnings when a function does not end with a return.
    140    */
     181
    141182  return RTEMS_SUCCESSFUL;
    142183}
     
    169210 *  The packet being returned already contains the class, length, and
    170211 *  to_convert fields, therefore they are not set in this routine.
     212 *
     213 *  Exception: MESSAGE_QUEUE_MP_RECEIVE_RESPONSE needs payload length
     214 *             added to 'length'
    171215 */
    172216      the_packet->operation = operation;
    173217      the_packet->Prefix.id = the_packet->Prefix.source_tid;
    174218
     219      if (operation == MESSAGE_QUEUE_MP_RECEIVE_RESPONSE)
     220          the_packet->Prefix.length += the_packet->size;
     221     
    175222      _MPCI_Send_response_packet(
    176223        rtems_get_node( the_packet->Prefix.source_tid ),
     
    244291      the_packet->Prefix.return_code = rtems_message_queue_receive(
    245292        the_packet->Prefix.id,
    246         &the_packet->Buffer,
     293        the_packet->Buffer.buffer,
     294        &the_packet->size,
    247295        the_packet->option_set,
    248296        the_packet->Prefix.timeout
     
    261309      the_thread = _MPCI_Process_response( the_packet_prefix );
    262310
    263       _Message_queue_Copy_buffer(
    264         &the_packet->Buffer,
    265         (Message_queue_Buffer *) the_thread->Wait.return_argument
    266       );
     311      if (the_packet->Prefix.return_code == RTEMS_SUCCESSFUL) {
     312        *the_thread->Wait.Extra.message_size_p = the_packet->size;
     313
     314        _Message_queue_Copy_buffer(
     315          the_packet->Buffer.buffer,
     316          the_thread->Wait.return_argument,
     317          the_packet->size
     318        );
     319      }
    267320
    268321      _MPCI_Return_packet( the_packet_prefix );
     
    273326      the_packet->Prefix.return_code = rtems_message_queue_send(
    274327        the_packet->Prefix.id,
    275         &the_packet->Buffer
     328        the_packet->Buffer.buffer,
     329        the_packet->Buffer.size
    276330      );
    277331
     
    295349      the_packet->Prefix.return_code = rtems_message_queue_urgent(
    296350        the_packet->Prefix.id,
    297         &the_packet->Buffer
     351        the_packet->Buffer.buffer,
     352        the_packet->Buffer.size
    298353      );
    299354
     
    309364      the_packet->Prefix.return_code = rtems_message_queue_broadcast(
    310365        the_packet->Prefix.id,
    311         &the_packet->Buffer,
     366        the_packet->Buffer.buffer,
     367        the_packet->Buffer.size,
    312368        &the_packet->count
    313369      );
Note: See TracChangeset for help on using the changeset viewer.