source: rtems/cpukit/posix/src/aio_misc.c @ 0e16fa45

5
Last change on this file since 0e16fa45 was 059529e, checked in by Sebastian Huber <sebastian.huber@…>, on 07/21/16 at 08:15:02

score: Add debug support to chains

This helps to detect

  • double insert, append, prepend errors, and
  • get from empty chain errors.
  • Property mode set to 100644
File size: 15.1 KB
Line 
1/**
2 * @file
3 *
4 * @brief Actual request being processed
5 * @ingroup POSIXAPI
6 */
7
8/*
9 * Copyright 2010-2011, Alin Rus <alin.codejunkie@gmail.com>
10 *
11 * The license and distribution terms for this file may be
12 * found in the file LICENSE in this distribution or at
13 * http://www.rtems.org/license/LICENSE.
14 */
15
16#include <pthread.h>
17#include <stdlib.h>
18#include <unistd.h>
19#include <time.h>
20#include <rtems/posix/aio_misc.h>
21#include <errno.h>
22
/* Worker thread entry point. Forward declared because rtems_aio_enqueue()
   hands it to pthread_create() before its definition further down. */
23static void *rtems_aio_handle (void *arg);
24
/* The AIO request queue shared by every function in this file: it holds
   the thread attributes, the queue mutex and condition variable, the work
   and idle chains and the active/idle thread counters. */
25rtems_aio_queue aio_request_queue;
26
27/*
28 *  rtems_aio_init
29 *
30 * Initialize the request queue for aio
31 *
32 *  Input parameters:
33 *        NONE
34 *
35 *  Output parameters:
36 *        0    -    if initialization succeeded
37 */
38
39int
40rtems_aio_init (void)
41{
42  int result = 0;
43
44  result = pthread_attr_init (&aio_request_queue.attr);
45  if (result != 0)
46    return result;
47
48  result =
49    pthread_attr_setdetachstate (&aio_request_queue.attr,
50                                 PTHREAD_CREATE_DETACHED);
51  if (result != 0)
52    pthread_attr_destroy (&aio_request_queue.attr);
53
54
55  result = pthread_mutex_init (&aio_request_queue.mutex, NULL);
56  if (result != 0)
57    pthread_attr_destroy (&aio_request_queue.attr);
58
59
60  result = pthread_cond_init (&aio_request_queue.new_req, NULL);
61  if (result != 0) {
62    pthread_mutex_destroy (&aio_request_queue.mutex);
63    pthread_attr_destroy (&aio_request_queue.attr);
64  }
65
66  rtems_chain_initialize_empty (&aio_request_queue.work_req);
67  rtems_chain_initialize_empty (&aio_request_queue.idle_req);
68
69  aio_request_queue.active_threads = 0;
70  aio_request_queue.idle_threads = 0;
71  aio_request_queue.initialized = AIO_QUEUE_INITIALIZED;
72
73  return result;
74}
75
76/*
77 *  rtems_aio_search_fd
78 *
79 * Search and create chain of requests for given FD
80 *
81 *  Input parameters:
82 *        chain        - chain of FD chains
83 *        fildes       - file descriptor to search
84 *        create       - if 1 search and create
85 *                     - if 0 just search
86 *
87 *  Output parameters:
88 *        r_chain      - NULL if create == 0 and there is
89 *                       no chain for given fildes
90 *                     - pointer to chain is there exists
91 *                       a chain for given fildes
92 *                     - pointer to newly create chain if
93 *                       create == 1
94 *
95 */
96
97rtems_aio_request_chain *
98rtems_aio_search_fd (rtems_chain_control *chain, int fildes, int create)
99{
100  rtems_aio_request_chain *r_chain;
101  rtems_chain_node *node;
102
103  node = rtems_chain_first (chain);
104  r_chain = (rtems_aio_request_chain *) node;
105
106  while (r_chain->fildes < fildes && !rtems_chain_is_tail (chain, node)) {
107    node = rtems_chain_next (node);
108    r_chain = (rtems_aio_request_chain *) node;
109  }
110
111  if (r_chain->fildes == fildes)
112    r_chain->new_fd = 0;
113  else {
114    if (create == 0)
115      r_chain = NULL;
116    else {
117      r_chain = malloc (sizeof (rtems_aio_request_chain));
118      rtems_chain_initialize_empty (&r_chain->perfd);
119      rtems_chain_initialize_node (&r_chain->next_fd);
120
121      if (rtems_chain_is_empty (chain))
122        rtems_chain_prepend (chain, &r_chain->next_fd);
123      else
124        rtems_chain_insert (rtems_chain_previous (node), &r_chain->next_fd);
125
126      r_chain->new_fd = 1;
127          r_chain->fildes = fildes;
128    }
129  }
130  return r_chain;
131}
132
133/*
134 *  rtems_aio_move_to_work
135 *
136 * Move chain of requests from IQ to WQ
137 *
138 *  Input parameters:
139 *        r_chain      - chain of requests
140 *
141 *  Output paramteres:
142 *        NONE
143 */
144
145static void
146rtems_aio_move_to_work (rtems_aio_request_chain *r_chain)
147{
148  rtems_chain_control *work_req_chain = &aio_request_queue.work_req;
149  rtems_aio_request_chain *temp;
150  rtems_chain_node *node;
151
152  node = rtems_chain_first (work_req_chain);
153  temp = (rtems_aio_request_chain *) node;
154
155  while (temp->fildes < r_chain->fildes &&
156         !rtems_chain_is_tail (work_req_chain, node)) {
157    node = rtems_chain_next (node);
158    temp = (rtems_aio_request_chain *) node;
159  }
160
161  rtems_chain_insert (rtems_chain_previous (node), &r_chain->next_fd);
162}
163 
164
165/*
166 *  rtems_aio_insert_prio
167 *
168 * Add request to given FD chain. The chain is ordered
169 * by priority
170 *
171 *  Input parameters:
172 *        chain        - chain of requests for a given FD
173 *        req          - request (see aio_misc.h)
174 *
175 *  Output parameters:
176 *        NONE
177 */
178
179static void
180rtems_aio_insert_prio (rtems_chain_control *chain, rtems_aio_request *req)
181{
182  rtems_chain_node *node;
183
184  AIO_printf ("FD exists \n");
185  node = rtems_chain_first (chain);
186
187  if (rtems_chain_is_empty (chain)) {
188    AIO_printf ("First in chain \n");
189    rtems_chain_prepend (chain, &req->next_prio);
190  } else {
191    AIO_printf ("Add by priority \n");
192    int prio = ((rtems_aio_request *) node)->aiocbp->aio_reqprio;
193
194    while (req->aiocbp->aio_reqprio > prio &&
195           !rtems_chain_is_tail (chain, node)) {
196      node = rtems_chain_next (node);
197      prio = ((rtems_aio_request *) node)->aiocbp->aio_reqprio;
198    }
199
200    rtems_chain_insert (node->previous, &req->next_prio);
201
202  }
203}
204
205/*
206 *  rtems_aio_remove_fd
207 *
208 * Removes all the requests in a fd chain
209 *
210 *  Input parameters:
211 *        r_chain        - pointer to the fd chain request
212 *
213 *  Output parameters:
214 *        NONE
215 */
216
217void rtems_aio_remove_fd (rtems_aio_request_chain *r_chain)
218{
219  rtems_chain_control *chain;
220  rtems_chain_node *node;
221  chain = &r_chain->perfd;
222  node = rtems_chain_first (chain);
223 
224  while (!rtems_chain_is_tail (chain, node))
225    {
226      rtems_aio_request *req = (rtems_aio_request *) node;
227      node = rtems_chain_next (node);
228      rtems_chain_extract (&req->next_prio);
229      req->aiocbp->error_code = ECANCELED;
230      req->aiocbp->return_value = -1;
231      free (req);
232    }
233}
234
235/*
236 *  rtems_aio_remove_req
237 *
238 * Removes request from given chain
239 *
240 *  Input parameters:
241 *        chain      - pointer to fd chain which may contain
242 *                     the request
243 *        aiocbp     - pointer to request that needs to be
244 *                     canceled
245 *
246 *  Output parameters:
247 *         AIO_NOTCANCELED   - if request was not canceled
248 *         AIO_CANCELED      - if request was canceled
249 */
250
251int rtems_aio_remove_req (rtems_chain_control *chain, struct aiocb *aiocbp)
252{
253  if (rtems_chain_is_empty (chain))
254    return AIO_ALLDONE;
255
256  rtems_chain_node *node = rtems_chain_first (chain);
257  rtems_aio_request *current;
258 
259  current = (rtems_aio_request *) node;
260
261  while (!rtems_chain_is_tail (chain, node) && current->aiocbp != aiocbp) {
262    node = rtems_chain_next (node);
263    current = (rtems_aio_request *) node;
264  }
265 
266  if (rtems_chain_is_tail (chain, node))
267    return AIO_NOTCANCELED;
268  else
269    {
270      rtems_chain_extract (node);
271      current->aiocbp->error_code = ECANCELED;
272      current->aiocbp->return_value = -1;
273      free (current);
274    }
275   
276  return AIO_CANCELED;
277}
278
279/*
280 *  rtems_aio_enqueue
281 *
282 * Enqueue requests, and creates threads to process them
283 *
284 *  Input parameters:
285 *        req        - see aio_misc.h
286 *
287 *  Output parameters:
288 *         0         - if request was added to queue
289 *         errno     - otherwise
290 */
291
292int
293rtems_aio_enqueue (rtems_aio_request *req)
294{
295
296  rtems_aio_request_chain *r_chain;
297  rtems_chain_control *chain;
298  pthread_t thid;
299  int result, policy;
300  struct sched_param param;
301
302  /* The queue should be initialized */
303  AIO_assert (aio_request_queue.initialized == AIO_QUEUE_INITIALIZED);
304
305  result = pthread_mutex_lock (&aio_request_queue.mutex);
306  if (result != 0) {
307    free (req);
308    return result;
309  }
310
311  /* _POSIX_PRIORITIZED_IO and _POSIX_PRIORITY_SCHEDULING are defined,
312     we can use aio_reqprio to lower the priority of the request */
313  pthread_getschedparam (pthread_self(), &policy, &param);
314
315  rtems_chain_initialize_node (&req->next_prio);
316  req->caller_thread = pthread_self ();
317  req->priority = param.sched_priority - req->aiocbp->aio_reqprio;
318  req->policy = policy;
319  req->aiocbp->error_code = EINPROGRESS;
320  req->aiocbp->return_value = 0;
321
322  if ((aio_request_queue.idle_threads == 0) &&
323      aio_request_queue.active_threads < AIO_MAX_THREADS)
324    /* we still have empty places on the active_threads chain */
325    {
326      chain = &aio_request_queue.work_req;
327      r_chain = rtems_aio_search_fd (chain, req->aiocbp->aio_fildes, 1);
328     
329      if (r_chain->new_fd == 1) {
330        rtems_chain_prepend (&r_chain->perfd, &req->next_prio);
331        r_chain->new_fd = 0;
332        pthread_mutex_init (&r_chain->mutex, NULL);
333        pthread_cond_init (&r_chain->cond, NULL);
334           
335        AIO_printf ("New thread \n");
336        result = pthread_create (&thid, &aio_request_queue.attr,
337                                 rtems_aio_handle, (void *) r_chain);
338        if (result != 0) {
339          pthread_mutex_unlock (&aio_request_queue.mutex);
340          return result;
341        }
342        ++aio_request_queue.active_threads;
343      }
344      else {
345        /* put request in the fd chain it belongs to */
346        pthread_mutex_lock (&r_chain->mutex);
347        rtems_aio_insert_prio (&r_chain->perfd, req);
348        pthread_cond_signal (&r_chain->cond);
349        pthread_mutex_unlock (&r_chain->mutex);
350      }
351    }
352  else
353    {
354      /* the maximum number of threads has been already created
355         even though some of them might be idle.
356         The request belongs to one of the active fd chain */
357      r_chain = rtems_aio_search_fd (&aio_request_queue.work_req,
358                                     req->aiocbp->aio_fildes, 0);
359      if (r_chain != NULL)
360        {
361          pthread_mutex_lock (&r_chain->mutex);
362          rtems_aio_insert_prio (&r_chain->perfd, req);
363          pthread_cond_signal (&r_chain->cond);
364          pthread_mutex_unlock (&r_chain->mutex);
365           
366        } else {
367     
368        /* or to the idle chain */
369        chain = &aio_request_queue.idle_req;
370        r_chain = rtems_aio_search_fd (chain, req->aiocbp->aio_fildes, 1);
371     
372        if (r_chain->new_fd == 1) {
373          /* If this is a new fd chain we signal the idle threads that
374             might be waiting for requests */
375          AIO_printf (" New chain on waiting queue \n ");
376          rtems_chain_prepend (&r_chain->perfd, &req->next_prio);
377          r_chain->new_fd = 0;
378          pthread_mutex_init (&r_chain->mutex, NULL);
379          pthread_cond_init (&r_chain->cond, NULL);
380        } else
381          /* just insert the request in the existing fd chain */
382          rtems_aio_insert_prio (&r_chain->perfd, req);
383        if (aio_request_queue.idle_threads > 0)
384          pthread_cond_signal (&aio_request_queue.new_req);
385      }
386    }
387
388  pthread_mutex_unlock (&aio_request_queue.mutex);
389  return 0;
390}
391
392/*
393 *  rtems_aio_handle
394 *
395 * Thread processing requests
396 *
397 *  Input parameters:
398 *        arg        - the chain for the fd to be worked on
399 *
400 *  Output parameters:
401 *        NULL       - if error
402 */
403
404static void *
405rtems_aio_handle (void *arg)
406{
407
408  rtems_aio_request_chain *r_chain = arg;
409  rtems_aio_request *req;
410  rtems_chain_control *chain;
411  rtems_chain_node *node;
412  int result, policy;
413  struct sched_param param;
414
415  AIO_printf ("Thread started\n");
416 
417  while (1) {
418   
419    /* acquire the mutex of the current fd chain.
420       we don't need to lock the queue mutex since we can
421       add requests to idle fd chains or even active ones
422       if the working request has been extracted from the
423       chain */
424    result = pthread_mutex_lock (&r_chain->mutex);
425    if (result != 0)
426      return NULL;
427   
428    chain = &r_chain->perfd;   
429
430    /* If the locked chain is not empty, take the first
431       request extract it, unlock the chain and process
432       the request, in this way the user can supply more
433       requests to this fd chain */
434    if (!rtems_chain_is_empty (chain)) {
435
436      AIO_printf ("Get new request from not empty chain\n");   
437      node = rtems_chain_first (chain);
438      req = (rtems_aio_request *) node;
439     
440      /* See _POSIX_PRIORITIZE_IO and _POSIX_PRIORITY_SCHEDULING
441         discussion in rtems_aio_enqueue () */
442      pthread_getschedparam (pthread_self(), &policy, &param);
443      param.sched_priority = req->priority;
444      pthread_setschedparam (pthread_self(), req->policy, &param);
445
446      rtems_chain_extract (node);
447
448      pthread_mutex_unlock (&r_chain->mutex);
449
450      switch (req->aiocbp->aio_lio_opcode) {
451      case LIO_READ:
452        AIO_printf ("read\n");
453        result = pread (req->aiocbp->aio_fildes,
454                        (void *) req->aiocbp->aio_buf,
455                        req->aiocbp->aio_nbytes, req->aiocbp->aio_offset);
456        break;
457
458      case LIO_WRITE:
459        AIO_printf ("write\n");
460        result = pwrite (req->aiocbp->aio_fildes,
461                         (void *) req->aiocbp->aio_buf,
462                         req->aiocbp->aio_nbytes, req->aiocbp->aio_offset);
463        break;
464       
465      case LIO_SYNC:
466        AIO_printf ("sync\n");
467        result = fsync (req->aiocbp->aio_fildes);
468        break;
469
470      default:
471        result = -1;
472      }
473      if (result == -1) {
474        req->aiocbp->return_value = -1;
475        req->aiocbp->error_code = errno;
476      } else {
477        req->aiocbp->return_value = result;
478        req->aiocbp->error_code = 0;
479      }
480
481      // notification needed for lio
482
483    } else {
484      /* If the fd chain is empty we unlock the fd chain
485         and we lock the queue chain, this will ensure that
486         we have at most one request comming to our fd chain
487         when we check.
488         
489         If there was no request added sleep for 3 seconds and
490         wait for a signal on chain, this will unlock the queue.
491         The fd chain is already unlocked */
492
493      struct timespec timeout;
494     
495      AIO_printf ("Chain is empty [WQ], wait for work\n");
496     
497      pthread_mutex_unlock (&r_chain->mutex);
498      pthread_mutex_lock (&aio_request_queue.mutex);
499     
500      if (rtems_chain_is_empty (chain))
501        {
502          clock_gettime (CLOCK_REALTIME, &timeout);
503          timeout.tv_sec += 3;
504          timeout.tv_nsec = 0;
505          result = pthread_cond_timedwait (&r_chain->cond,
506                                           &aio_request_queue.mutex,
507                                           &timeout);
508
509          /* If no requests were added to the chain we delete the fd chain from
510             the queue and start working with idle fd chains */
511          if (result == ETIMEDOUT) {
512            rtems_chain_extract (&r_chain->next_fd);
513            pthread_mutex_destroy (&r_chain->mutex);
514            pthread_cond_destroy (&r_chain->cond);
515            free (r_chain);
516           
517            /* If the idle chain is empty sleep for 3 seconds and wait for a
518               signal. The thread now becomes idle. */
519            if (rtems_chain_is_empty (&aio_request_queue.idle_req)) {
520              AIO_printf ("Chain is empty [IQ], wait for work\n");           
521
522              ++aio_request_queue.idle_threads;
523              --aio_request_queue.active_threads;
524              clock_gettime (CLOCK_REALTIME, &timeout);
525              timeout.tv_sec += 3;
526              timeout.tv_nsec = 0;
527
528              result = pthread_cond_timedwait (&aio_request_queue.new_req,
529                                               &aio_request_queue.mutex,
530                                               &timeout);
531             
532              /* If no new fd chain was added in the idle requests
533                 then this thread is finished */
534              if (result == ETIMEDOUT) {
535                AIO_printf ("Etimeout\n");
536                --aio_request_queue.idle_threads;
537                pthread_mutex_unlock (&aio_request_queue.mutex);
538                return NULL;
539              }
540            }
541            /* Otherwise move this chain to the working chain and
542               start the loop all over again */
543            AIO_printf ("Work on idle\n");
544            --aio_request_queue.idle_threads;
545            ++aio_request_queue.active_threads;
546
547            node = rtems_chain_first (&aio_request_queue.idle_req);
548            rtems_chain_extract (node);
549
550            r_chain = (rtems_aio_request_chain *) node;
551            rtems_aio_move_to_work (r_chain);
552           
553          }
554        }
555      /* If there was a request added in the initial fd chain then release
556         the mutex and process it */
557      pthread_mutex_unlock (&aio_request_queue.mutex);
558     
559    }
560  }
561 
562  AIO_printf ("Thread finished\n");
563  return NULL;
564}
Note: See TracBrowser for help on using the repository browser.