source: rtems/cpukit/score/src/threadqenqueue.c @ aaaf9610

Revision 5
Last change on this file since aaaf9610 was aaaf9610, checked in by Sebastian Huber <sebastian.huber@…>, on 08/08/16 at 06:44:51

score: Add debug support to red-black trees

This helps to detect double insert and extract errors.

  • Property mode set to 100644
File size: 15.6 KB
Line 
1/**
2 * @file
3 *
4 * @brief Thread Queue Operations
5 * @ingroup ScoreThreadQ
6 */
7
8/*
9 *  COPYRIGHT (c) 1989-2014.
10 *  On-Line Applications Research Corporation (OAR).
11 *
12 *  Copyright (c) 2015, 2016 embedded brains GmbH.
13 *
14 *  The license and distribution terms for this file may be
15 *  found in the file LICENSE in this distribution or at
16 *  http://www.rtems.org/license/LICENSE.
17 */
18
19#if HAVE_CONFIG_H
20#include "config.h"
21#endif
22
23#include <rtems/score/threadqimpl.h>
24#include <rtems/score/assert.h>
25#include <rtems/score/threaddispatch.h>
26#include <rtems/score/threadimpl.h>
27#include <rtems/score/status.h>
28#include <rtems/score/watchdogimpl.h>
29
/*
 * Thread wait flag combinations used below to track the progress of a thread
 * through a blocking thread queue operation (set and tested via
 * _Thread_Wait_flags_set() and _Thread_Wait_flags_try_change_*()).
 */
#define THREAD_QUEUE_INTEND_TO_BLOCK \
  (THREAD_WAIT_CLASS_OBJECT | THREAD_WAIT_STATE_INTEND_TO_BLOCK)

#define THREAD_QUEUE_BLOCKED \
  (THREAD_WAIT_CLASS_OBJECT | THREAD_WAIT_STATE_BLOCKED)

#define THREAD_QUEUE_READY_AGAIN \
  (THREAD_WAIT_CLASS_OBJECT | THREAD_WAIT_STATE_READY_AGAIN)
38
39#if defined(RTEMS_SMP)
40/*
41 * A global registry of active thread queue links is used to provide deadlock
42 * detection on SMP configurations.  This is simple to implement and no
43 * additional storage is required for the thread queues.  The disadvantage is
44 * the global registry is not scalable and may lead to lock contention.
45 * However, the registry is only used in case of nested resource conflicts.  In
46 * this case, the application is already in trouble.
47 */
48
typedef struct {
  /* Protects the registry of active thread queue links below */
  ISR_lock_Control Lock;

  /* Red-black tree of active links, keyed by the link's source queue */
  RBTree_Control Links;
} Thread_queue_Links;

/* The single global link registry used for SMP deadlock detection */
static Thread_queue_Links _Thread_queue_Links = {
  ISR_LOCK_INITIALIZER( "Thread Queue Links" ),
  RBTREE_INITIALIZER_EMPTY( _Thread_queue_Links.Links )
};
59
60static bool _Thread_queue_Link_equal(
61  const void        *left,
62  const RBTree_Node *right
63)
64{
65  const Thread_queue_Queue *the_left;
66  const Thread_queue_Link  *the_right;
67
68  the_left = left;
69  the_right = (Thread_queue_Link *) right;
70
71  return the_left == the_right->source;
72}
73
74static bool _Thread_queue_Link_less(
75  const void        *left,
76  const RBTree_Node *right
77)
78{
79  const Thread_queue_Queue *the_left;
80  const Thread_queue_Link  *the_right;
81
82  the_left = left;
83  the_right = (Thread_queue_Link *) right;
84
85  return (uintptr_t) the_left < (uintptr_t) the_right->source;
86}
87
88static void *_Thread_queue_Link_map( RBTree_Node *node )
89{
90  return node;
91}
92
93static Thread_queue_Link *_Thread_queue_Link_find(
94  Thread_queue_Links *links,
95  Thread_queue_Queue *source
96)
97{
98  return _RBTree_Find_inline(
99    &links->Links,
100    source,
101    _Thread_queue_Link_equal,
102    _Thread_queue_Link_less,
103    _Thread_queue_Link_map
104  );
105}
106
/*
 * Registers a source -> target wait dependency in the global link registry.
 *
 * Before inserting, the chain of already registered dependencies starting at
 * the target is followed.  If it leads back to the source queue, adding the
 * link would close a cycle, i.e. a deadlock.
 *
 * Returns true if the link was added, false if a deadlock was detected (in
 * which case the registry is left unchanged).
 */
static bool _Thread_queue_Link_add(
  Thread_queue_Link  *link,
  Thread_queue_Queue *source,
  Thread_queue_Queue *target
)
{
  Thread_queue_Links *links;
  Thread_queue_Queue *recursive_target;
  ISR_lock_Context    lock_context;

  links = &_Thread_queue_Links;
  recursive_target = target;

  _ISR_lock_Acquire( &links->Lock, &lock_context );

  /* Walk the transitive closure of registered dependencies of the target */
  while ( true ) {
    Thread_queue_Link *recursive_link;

    recursive_link = _Thread_queue_Link_find( links, recursive_target );

    if ( recursive_link == NULL ) {
      break;
    }

    recursive_target = recursive_link->target;

    if ( recursive_target == source ) {
      /* The dependency chain leads back to us: deadlock */
      _ISR_lock_Release( &links->Lock, &lock_context );
      return false;
    }
  }

  link->source = source;
  link->target = target;
  _RBTree_Insert_inline(
    &links->Links,
    &link->Registry_node,
    source,
    _Thread_queue_Link_less
  );

  _ISR_lock_Release( &links->Lock, &lock_context );
  return true;
}
151
152static void _Thread_queue_Link_remove( Thread_queue_Link *link )
153{
154  Thread_queue_Links *links;
155  ISR_lock_Context    lock_context;
156
157  links = &_Thread_queue_Links;
158
159  _ISR_lock_Acquire( &links->Lock, &lock_context );
160  _RBTree_Extract( &links->Links, &link->Registry_node );
161  _ISR_lock_Release( &links->Lock, &lock_context );
162}
163#endif
164
/*
 * Maps a Path_node chain node back to its containing Thread_queue_Link.
 *
 * Note: the macro replacement list must not end in a semicolon; the previous
 * trailing ';' produced an extra empty statement at every use site and would
 * break usage inside an unbraced if/else.
 */
#define THREAD_QUEUE_LINK_OF_PATH_NODE( node ) \
  RTEMS_CONTAINER_OF( node, Thread_queue_Link, Path_node )
167
/*
 * Releases all thread wait locks and registry links acquired by
 * _Thread_queue_Path_acquire(), walking the path chain from last to first.
 * On non-SMP configurations no locks were taken, so this is a no-op.
 */
static void _Thread_queue_Path_release( Thread_queue_Path *path )
{
#if defined(RTEMS_SMP)
  Chain_Node *head;
  Chain_Node *node;

  head = _Chain_Head( &path->Links );
  node = _Chain_Last( &path->Links );

  if ( head != node ) {
    Thread_queue_Link *link;

    /*
     * The terminal link may have an owner which does not wait on a thread
     * queue.
     */

    link = THREAD_QUEUE_LINK_OF_PATH_NODE( node );

    if ( link->Queue_context.Wait.queue == NULL ) {
      /* Only the owner's default wait lock was acquired for this link */
      _Thread_Wait_release_default_critical(
        link->owner,
        &link->Queue_context.Lock_context
      );

      node = _Chain_Previous( node );
#if defined(RTEMS_DEBUG)
      _Chain_Set_off_chain( &link->Path_node );
#endif
    }

    while ( head != node ) {
      /* The other links have an owner which waits on a thread queue */
      link = THREAD_QUEUE_LINK_OF_PATH_NODE( node );
      _Assert( link->Queue_context.Wait.queue != NULL );

      /* Undo registry insert, queue lock, and pending request in order */
      _Thread_queue_Link_remove( link );
      _Thread_Wait_release_queue_critical(
        link->Queue_context.Wait.queue,
        &link->Queue_context
      );
      _Thread_Wait_remove_request( link->owner, &link->Queue_context );

      node = _Chain_Previous( node );
#if defined(RTEMS_DEBUG)
      _Chain_Set_off_chain( &link->Path_node );
#endif
    }
  }
#else
  (void) path;
#endif
}
221
/*
 * Follows the chain of queue owners starting at the given queue to detect a
 * deadlock: if the chain leads back to the_thread, enqueueing would deadlock.
 *
 * Returns true if the path was acquired (no deadlock), false on deadlock.
 * On SMP configurations the per-link wait locks stay acquired on a true
 * return and must later be released with _Thread_queue_Path_release().
 */
static bool _Thread_queue_Path_acquire(
  Thread_Control     *the_thread,
  Thread_queue_Queue *queue,
  Thread_queue_Path  *path
)
{
  Thread_Control     *owner;

#if defined(RTEMS_SMP)
  Thread_queue_Link  *link;
  Thread_queue_Queue *target;

  /*
   * For an overview please look at the non-SMP part below.  We basically do
   * the same on SMP configurations.  The fact that we may have more than one
   * executing thread and each thread queue has its own SMP lock makes the task
   * a bit more difficult.  We have to avoid deadlocks at SMP lock level, since
   * this would result in an unrecoverable deadlock of the overall system.
   */

  _Chain_Initialize_empty( &path->Links );

  owner = queue->owner;

  if ( owner == NULL ) {
    return true;
  }

  if ( owner == the_thread ) {
    /* Direct self-deadlock: we already own this queue */
    return false;
  }

  _RBTree_Initialize_node( &path->Start.Registry_node );
  _Chain_Initialize_node( &path->Start.Path_node );
  _Thread_queue_Context_initialize( &path->Start.Queue_context );
  link = &path->Start;

  do {
    _Chain_Append_unprotected( &path->Links, &link->Path_node );
    link->owner = owner;

    _Thread_Wait_acquire_default_critical(
      owner,
      &link->Queue_context.Lock_context
    );

    target = owner->Wait.queue;
    link->Queue_context.Wait.queue = target;

    if ( target != NULL ) {
      if ( _Thread_queue_Link_add( link, queue, target ) ) {
        /*
         * Switch from the owner's default wait lock to the lock of the queue
         * the owner waits on, with a gate to catch concurrent wait changes.
         */
        _Thread_queue_Gate_add(
          &owner->Wait.Lock.Pending_requests,
          &link->Queue_context.Wait.Gate
        );
        _Thread_Wait_release_default_critical(
          owner,
          &link->Queue_context.Lock_context
        );
        _Thread_Wait_acquire_queue_critical( target, &link->Queue_context );

        if ( link->Queue_context.Wait.queue == NULL ) {
          /* The owner stopped waiting meanwhile; undo this link and stop */
          _Thread_queue_Link_remove( link );
          _Thread_Wait_release_queue_critical( target, &link->Queue_context );
          _Thread_Wait_acquire_default_critical(
            owner,
            &link->Queue_context.Lock_context
          );
          _Thread_Wait_remove_request_locked( owner, &link->Queue_context );
          _Assert( owner->Wait.queue == NULL );
          return true;
        }
      } else {
        /* Registry detected a dependency cycle: deadlock */
        link->Queue_context.Wait.queue = NULL;
        _Thread_queue_Path_release( path );
        return false;
      }
    } else {
      /* The owner does not wait on a queue; the chain ends here */
      return true;
    }

    link = &owner->Wait.Link;
    queue = target;
    owner = queue->owner;
  } while ( owner != NULL );
#else
  /* Uniprocessor: simply walk owner -> wait queue -> owner looking for us */
  do {
    owner = queue->owner;

    if ( owner == NULL ) {
      return true;
    }

    if ( owner == the_thread ) {
      return false;
    }

    queue = owner->Wait.queue;
  } while ( queue != NULL );
#endif

  return true;
}
325
/*
 * Deadlock callout which reports the deadlock to the caller via the thread's
 * wait return code (used as queue_context->deadlock_callout).
 */
void _Thread_queue_Deadlock_status( Thread_Control *the_thread )
{
  the_thread->Wait.return_code = STATUS_DEADLOCK;
}
330
331void _Thread_queue_Deadlock_fatal( Thread_Control *the_thread )
332{
333  _Terminate(
334    INTERNAL_ERROR_CORE,
335    false,
336    INTERNAL_ERROR_THREAD_QUEUE_DEADLOCK
337  );
338}
339
/*
 * Enqueues the thread on the thread queue and blocks it, with deadlock
 * detection via the path acquire/release protocol above.  Called with the
 * queue lock held (released before return).  On deadlock the thread is not
 * enqueued and queue_context->deadlock_callout is invoked instead.  A timeout
 * watchdog is scheduled according to queue_context->timeout_discipline.
 */
void _Thread_queue_Enqueue_critical(
  Thread_queue_Queue            *queue,
  const Thread_queue_Operations *operations,
  Thread_Control                *the_thread,
  States_Control                 state,
  Thread_queue_Context          *queue_context
)
{
  Thread_queue_Path  path;
  Per_CPU_Control   *cpu_self;
  bool               success;

#if defined(RTEMS_MULTIPROCESSING)
  /* A remote MPCI receive thread blocks via a proxy */
  if ( _Thread_MP_Is_receive( the_thread ) && the_thread->receive_packet ) {
    the_thread = _Thread_MP_Allocate_proxy( state );
  }
#endif

  _Thread_Wait_claim( the_thread, queue, operations );

  if ( !_Thread_queue_Path_acquire( the_thread, queue, &path ) ) {
    /* Deadlock detected: undo the claim and notify via the callout */
    _Thread_Wait_restore_default( the_thread );
    _Thread_queue_Queue_release( queue, &queue_context->Lock_context );
    _Thread_Wait_tranquilize( the_thread );
    ( *queue_context->deadlock_callout )( the_thread );
    return;
  }

  ( *operations->enqueue )( queue, the_thread, &path );

  _Thread_queue_Path_release( &path );

  the_thread->Wait.return_code = STATUS_SUCCESSFUL;
  _Thread_Wait_flags_set( the_thread, THREAD_QUEUE_INTEND_TO_BLOCK );
  cpu_self = _Thread_Dispatch_disable_critical( &queue_context->Lock_context );
  _Thread_queue_Queue_release( queue, &queue_context->Lock_context );

  if (
    cpu_self->thread_dispatch_disable_level
      != queue_context->expected_thread_dispatch_disable_level
  ) {
    /* Blocking from an invalid context (e.g. interrupt) is fatal */
    _Terminate(
      INTERNAL_ERROR_CORE,
      false,
      INTERNAL_ERROR_THREAD_QUEUE_ENQUEUE_FROM_BAD_STATE
    );
  }

  /*
   *  Set the blocking state for this thread queue in the thread.
   */
  _Thread_Set_state( the_thread, state );

  /*
   *  If the thread wants to timeout, then schedule its timer.
   */
  switch ( queue_context->timeout_discipline ) {
    case WATCHDOG_RELATIVE:
      /* A relative timeout of 0 is a special case indefinite (no) timeout */
      if ( queue_context->timeout != 0 ) {
        _Thread_Timer_insert_relative(
          the_thread,
          cpu_self,
          _Thread_Timeout,
          (Watchdog_Interval) queue_context->timeout
        );
      }
      break;
    case WATCHDOG_ABSOLUTE:
      _Thread_Timer_insert_absolute(
        the_thread,
        cpu_self,
        _Thread_Timeout,
        queue_context->timeout
      );
      break;
    default:
      break;
  }

  /*
   * At this point thread dispatching is disabled, however, we already released
   * the thread queue lock.  Thus, interrupts or threads on other processors
   * may already changed our state with respect to the thread queue object.
   * The request could be satisfied or timed out.  This situation is indicated
   * by the thread wait flags.  Other parties must not modify our thread state
   * as long as we are in the THREAD_QUEUE_INTEND_TO_BLOCK thread wait state,
   * thus we have to cancel the blocking operation ourself if necessary.
   */
  success = _Thread_Wait_flags_try_change_acquire(
    the_thread,
    THREAD_QUEUE_INTEND_TO_BLOCK,
    THREAD_QUEUE_BLOCKED
  );
  if ( !success ) {
    /* The request was already satisfied or timed out; cancel the block */
    _Thread_Remove_timer_and_unblock( the_thread, queue );
  }

  _Thread_Update_priority( path.update_priority );
  _Thread_Dispatch_enable( cpu_self );
}
441
442bool _Thread_queue_Do_extract_locked(
443  Thread_queue_Queue            *queue,
444  const Thread_queue_Operations *operations,
445  Thread_Control                *the_thread
446#if defined(RTEMS_MULTIPROCESSING)
447  ,
448  const Thread_queue_Context    *queue_context
449#endif
450)
451{
452  bool success;
453  bool unblock;
454
455#if defined(RTEMS_MULTIPROCESSING)
456  if ( !_Objects_Is_local_id( the_thread->Object.id ) ) {
457    Thread_Proxy_control    *the_proxy;
458    Thread_queue_MP_callout  mp_callout;
459
460    the_proxy = (Thread_Proxy_control *) the_thread;
461    mp_callout = queue_context->mp_callout;
462    _Assert( mp_callout != NULL );
463    the_proxy->thread_queue_callout = queue_context->mp_callout;
464  }
465#endif
466
467  ( *operations->extract )( queue, the_thread );
468
469  /*
470   * We must update the wait flags under protection of the current thread lock,
471   * otherwise a _Thread_Timeout() running on another processor may interfere.
472   */
473  success = _Thread_Wait_flags_try_change_release(
474    the_thread,
475    THREAD_QUEUE_INTEND_TO_BLOCK,
476    THREAD_QUEUE_READY_AGAIN
477  );
478  if ( success ) {
479    unblock = false;
480  } else {
481    _Assert( _Thread_Wait_flags_get( the_thread ) == THREAD_QUEUE_BLOCKED );
482    _Thread_Wait_flags_set( the_thread, THREAD_QUEUE_READY_AGAIN );
483    unblock = true;
484  }
485
486  _Thread_Wait_restore_default( the_thread );
487
488  return unblock;
489}
490
491void _Thread_queue_Unblock_critical(
492  bool                unblock,
493  Thread_queue_Queue *queue,
494  Thread_Control     *the_thread,
495  ISR_lock_Context   *lock_context
496)
497{
498  if ( unblock ) {
499    Per_CPU_Control *cpu_self;
500
501    cpu_self = _Thread_Dispatch_disable_critical( lock_context );
502    _Thread_queue_Queue_release( queue, lock_context );
503
504    _Thread_Remove_timer_and_unblock( the_thread, queue );
505
506    _Thread_Dispatch_enable( cpu_self );
507  } else {
508    _Thread_queue_Queue_release( queue, lock_context );
509  }
510}
511
512void _Thread_queue_Extract_critical(
513  Thread_queue_Queue            *queue,
514  const Thread_queue_Operations *operations,
515  Thread_Control                *the_thread,
516  Thread_queue_Context          *queue_context
517)
518{
519  bool unblock;
520
521  unblock = _Thread_queue_Extract_locked(
522    queue,
523    operations,
524    the_thread,
525    queue_context
526  );
527
528  _Thread_queue_Unblock_critical(
529    unblock,
530    queue,
531    the_thread,
532    &queue_context->Lock_context
533  );
534}
535
536void _Thread_queue_Extract( Thread_Control *the_thread )
537{
538  Thread_queue_Context  queue_context;
539  Thread_queue_Queue   *queue;
540
541  _Thread_queue_Context_initialize( &queue_context );
542  _Thread_Wait_acquire( the_thread, &queue_context );
543
544  queue = the_thread->Wait.queue;
545
546  if ( queue != NULL ) {
547    bool unblock;
548
549    _Thread_Wait_remove_request( the_thread, &queue_context );
550    _Thread_queue_Context_set_MP_callout(
551      &queue_context,
552      _Thread_queue_MP_callout_do_nothing
553    );
554    unblock = _Thread_queue_Extract_locked(
555      queue,
556      the_thread->Wait.operations,
557      the_thread,
558      &queue_context.Lock_context
559    );
560    _Thread_queue_Unblock_critical(
561      unblock,
562      queue,
563      the_thread,
564      &queue_context.Lock_context
565    );
566  } else {
567    _Thread_Wait_release( the_thread, &queue_context );
568  }
569}
570
571Thread_Control *_Thread_queue_Do_dequeue(
572  Thread_queue_Control          *the_thread_queue,
573  const Thread_queue_Operations *operations
574#if defined(RTEMS_MULTIPROCESSING)
575  ,
576  Thread_queue_MP_callout        mp_callout
577#endif
578)
579{
580  Thread_queue_Context  queue_context;
581  Thread_Control       *the_thread;
582
583  _Thread_queue_Context_initialize( &queue_context );
584  _Thread_queue_Context_set_MP_callout( &queue_context, mp_callout );
585  _Thread_queue_Acquire( the_thread_queue, &queue_context.Lock_context );
586
587  the_thread = _Thread_queue_First_locked( the_thread_queue, operations );
588
589  if ( the_thread != NULL ) {
590    _Thread_queue_Extract_critical(
591      &the_thread_queue->Queue,
592      operations,
593      the_thread,
594      &queue_context
595    );
596  } else {
597    _Thread_queue_Release( the_thread_queue, &queue_context.Lock_context );
598  }
599
600  return the_thread;
601}
602
#if defined(RTEMS_MULTIPROCESSING)
/*
 * Invokes the thread queue callout stored in the proxy with the id of the
 * thread queue's object and frees the proxy afterwards.
 */
void _Thread_queue_Unblock_proxy(
  Thread_queue_Queue *queue,
  Thread_Control     *the_thread
)
{
  Thread_Proxy_control      *proxy;
  Thread_queue_MP_callout    callout;
  const Thread_queue_Object *queue_object;

  proxy = (Thread_Proxy_control *) the_thread;
  queue_object = THREAD_QUEUE_QUEUE_TO_OBJECT( queue );
  callout = proxy->thread_queue_callout;

  ( *callout )( the_thread, queue_object->Object.id );
  _Thread_MP_Free_proxy( the_thread );
}
#endif
Note: See TracBrowser for help on using the repository browser.