source: rtems/cpukit/posix/src/aio_misc.c @ f16c059

4.115
Last change on this file since f16c059 was f16c059, checked in by Joel Sherrill <joel.sherrill@…>, on 01/17/11 at 22:12:48

2011-01-17 Alin Rus <alin.codejunkie@…>

  • posix/src/aio_cancel.c: Fixed ending of if braces.

2011-01-17 Alin Rus <alin.codejunkie@…>

  • posix/src/aio_misc.c: Added debug information. Fixed idle_threads/active_threads issues. Fixed infinite loop in rtems_aio_handle().
  • Property mode set to 100644
File size: 14.8 KB
Line 
1/*
2 * Copyright 2010-2011, Alin Rus <alin.codejunkie@gmail.com>
3 *
4 * The license and distribution terms for this file may be
5 * found in the file LICENSE in this distribution or at
6 * http://www.rtems.com/license/LICENSE.
7 *
8 * $Id$
9 */
10
11#include <pthread.h>
12#include <stdlib.h>
13#include <unistd.h>
14#include <time.h>
15#include <rtems/posix/aio_misc.h>
16#include <errno.h>
17
18static void *rtems_aio_handle (void *arg);
19
20rtems_aio_queue aio_request_queue;
21
22/*
23 *  rtems_aio_init
24 *
25 * Initialize the request queue for aio
26 *
27 *  Input parameters:
28 *        NONE
29 *
30 *  Output parameters:
31 *        0    -    if initialization succeeded
32 */
33
34int
35rtems_aio_init (void)
36{
37  int result = 0;
38
39  result = pthread_attr_init (&aio_request_queue.attr);
40  if (result != 0)
41    return result;
42
43  result =
44    pthread_attr_setdetachstate (&aio_request_queue.attr,
45                                 PTHREAD_CREATE_DETACHED);
46  if (result != 0)
47    pthread_attr_destroy (&aio_request_queue.attr);
48
49
50  result = pthread_mutex_init (&aio_request_queue.mutex, NULL);
51  if (result != 0)
52    pthread_attr_destroy (&aio_request_queue.attr);
53
54
55  result = pthread_cond_init (&aio_request_queue.new_req, NULL);
56  if (result != 0) {
57    pthread_mutex_destroy (&aio_request_queue.mutex);
58    pthread_attr_destroy (&aio_request_queue.attr);
59  }
60
61  rtems_chain_initialize_empty (&aio_request_queue.work_req);
62  rtems_chain_initialize_empty (&aio_request_queue.idle_req);
63
64  aio_request_queue.active_threads = 0;
65  aio_request_queue.idle_threads = 0;
66  aio_request_queue.initialized = AIO_QUEUE_INITIALIZED;
67
68  return result;
69}
70
71/*
72 *  rtems_aio_search_fd
73 *
74 * Search and create chain of requests for given FD
75 *
76 *  Input parameters:
77 *        chain        - chain of FD chains
78 *        fildes       - file descriptor to search
79 *        create       - if 1 search and create
80 *                     - if 0 just search
81 *
82 *  Output parameters:
83 *        r_chain      - NULL if create == 0 and there is
84 *                       no chain for given fildes
85 *                     - pointer to chain is there exists
86 *                       a chain for given fildes
87 *                     - pointer to newly create chain if
88 *                       create == 1
89 *
90 */
91
92rtems_aio_request_chain *
93rtems_aio_search_fd (rtems_chain_control *chain, int fildes, int create)
94{
95  rtems_aio_request_chain *r_chain;
96  rtems_chain_node *node;
97
98  node = rtems_chain_first (chain);
99  r_chain = (rtems_aio_request_chain *) node;
100
101  while (r_chain->fildes < fildes && !rtems_chain_is_tail (chain, node)) {
102    node = rtems_chain_next (node);
103    r_chain = (rtems_aio_request_chain *) node;
104  }
105
106  if (r_chain->fildes == fildes)
107    r_chain->new_fd = 0;
108  else {
109    if (create == 0)
110      r_chain = NULL;
111    else {
112      r_chain = malloc (sizeof (rtems_aio_request_chain));
113      rtems_chain_initialize_empty (&r_chain->perfd);
114
115      if (rtems_chain_is_empty (chain))
116        rtems_chain_prepend (chain, &r_chain->next_fd);
117      else
118        rtems_chain_insert (rtems_chain_previous (node), &r_chain->next_fd);
119
120      r_chain->new_fd = 1;
121          r_chain->fildes = fildes;
122    }
123  }
124  return r_chain;
125}
126
127/*
128 *  rtems_aio_move_to_work
129 *
130 * Move chain of requests from IQ to WQ
131 *
132 *  Input parameters:
133 *        r_chain      - chain of requests
134 *
135 *  Output paramteres:
136 *        NONE
137 */
138
139void
140rtems_aio_move_to_work (rtems_aio_request_chain *r_chain)
141{
142  rtems_aio_request_chain *temp;
143  rtems_chain_node *node;
144 
145  node = rtems_chain_first (&aio_request_queue.work_req);
146  temp = (rtems_aio_request_chain *) node;
147
148  while (temp->fildes < r_chain->fildes &&
149         !rtems_chain_is_tail (&aio_request_queue.work_req, node)) {
150    node = rtems_chain_next (node);
151    temp = (rtems_aio_request_chain *) node;
152  }
153 
154  rtems_chain_insert (rtems_chain_previous (node), &r_chain->next_fd);
155}
156 
157
158/*
159 *  rtems_aio_insert_prio
160 *
161 * Add request to given FD chain. The chain is ordered
162 * by priority
163 *
164 *  Input parameters:
165 *        chain        - chain of requests for a given FD
166 *        req          - request (see aio_misc.h)
167 *
168 *  Output parameters:
169 *        NONE
170 */
171
172void
173rtems_aio_insert_prio (rtems_chain_control *chain, rtems_aio_request *req)
174{
175  rtems_chain_node *node;
176
177  AIO_printf ("FD exists \n");
178  node = rtems_chain_first (chain);
179
180  if (rtems_chain_is_empty (chain)) {
181    AIO_printf ("First in chain \n");
182    rtems_chain_prepend (chain, &req->next_prio);
183  } else {
184    AIO_printf ("Add by priority \n");
185    int prio = ((rtems_aio_request *) node)->aiocbp->aio_reqprio;
186
187    while (req->aiocbp->aio_reqprio > prio &&
188           !rtems_chain_is_tail (chain, node)) {
189      node = rtems_chain_next (node);
190      prio = ((rtems_aio_request *) node)->aiocbp->aio_reqprio;
191    }
192
193    rtems_chain_insert (node->previous, &req->next_prio);
194
195  }
196}
197
198/*
199 *  rtems_aio_remove_fd
200 *
201 * Removes all the requests in a fd chain
202 *
203 *  Input parameters:
204 *        r_chain        - pointer to the fd chain request
205 *
206 *  Output parameters:
207 *        NONE
208 */
209
210void rtems_aio_remove_fd (rtems_aio_request_chain *r_chain)
211{
212  rtems_chain_control *chain;
213  rtems_chain_node *node;
214  chain = &r_chain->perfd;
215  node = rtems_chain_first (chain);
216 
217  while (!rtems_chain_is_tail (chain, node))
218    {
219      rtems_chain_extract (node);
220      rtems_aio_request *req = (rtems_aio_request *) node;
221      node = rtems_chain_next (node);
222      req->aiocbp->error_code = ECANCELED;
223      req->aiocbp->return_value = -1;
224      free (req);
225    }
226}
227
228/*
229 *  rtems_aio_remove_req
230 *
231 * Removes request from given chain
232 *
233 *  Input parameters:
234 *        chain      - pointer to fd chain which may contain
235 *                     the request
236 *        aiocbp     - pointer to request that needs to be
237 *                     canceled
238 *
239 *  Output parameters:
240 *         AIO_NOTCANCELED   - if request was not canceled
241 *         AIO_CANCELED      - if request was canceled
242 */
243
244int rtems_aio_remove_req (rtems_chain_control *chain, struct aiocb *aiocbp)
245{
246  if (rtems_chain_is_empty (chain))
247    return AIO_ALLDONE;
248
249  rtems_chain_node *node = rtems_chain_first (chain);
250  rtems_aio_request *current;
251 
252  current = (rtems_aio_request *) node;
253
254  while (!rtems_chain_is_tail (chain, node) && current->aiocbp != aiocbp) {
255    node = rtems_chain_next (node);
256    current = (rtems_aio_request *) node;
257  }
258 
259  if (rtems_chain_is_tail (chain, node))
260    return AIO_NOTCANCELED;
261  else
262    {
263      rtems_chain_extract (node);
264      current->aiocbp->error_code = ECANCELED;
265      current->aiocbp->return_value = -1;
266      free (current);
267    }
268   
269  return AIO_CANCELED;
270}
271
272/*
273 *  rtems_aio_enqueue
274 *
275 * Enqueue requests, and creates threads to process them
276 *
277 *  Input parameters:
278 *        req        - see aio_misc.h
279 *
280 *  Output parameters:
281 *         0         - if request was added to queue
282 *         errno     - otherwise
283 */
284
285int
286rtems_aio_enqueue (rtems_aio_request *req)
287{
288
289  rtems_aio_request_chain *r_chain;
290  rtems_chain_control *chain;
291  pthread_t thid;
292  int result, policy;
293  struct sched_param param;
294
295  /* The queue should be initialized */
296  AIO_assert (aio_request_queue.initialized == AIO_QUEUE_INITIALIZED);
297
298  result = pthread_mutex_lock (&aio_request_queue.mutex);
299  if (result != 0) {
300    free (req);
301    return result;
302  }
303
304  /* _POSIX_PRIORITIZED_IO and _POSIX_PRIORITY_SCHEDULING are defined,
305     we can use aio_reqprio to lower the priority of the request */
306  pthread_getschedparam (pthread_self(), &policy, &param);
307
308  req->caller_thread = pthread_self ();
309  req->priority = param.sched_priority - req->aiocbp->aio_reqprio;
310  req->policy = policy;
311  req->aiocbp->error_code = EINPROGRESS;
312  req->aiocbp->return_value = 0;
313
314  if ((aio_request_queue.idle_threads == 0) &&
315      aio_request_queue.active_threads < AIO_MAX_THREADS)
316    /* we still have empty places on the active_threads chain */
317    {
318      chain = &aio_request_queue.work_req;
319      r_chain = rtems_aio_search_fd (chain, req->aiocbp->aio_fildes, 1);
320     
321      if (r_chain->new_fd == 1) {
322        rtems_chain_prepend (&r_chain->perfd, &req->next_prio);
323        r_chain->new_fd = 0;
324        pthread_mutex_init (&r_chain->mutex, NULL);
325        pthread_cond_init (&r_chain->cond, NULL);
326           
327        AIO_printf ("New thread \n");
328        result = pthread_create (&thid, &aio_request_queue.attr,
329                                 rtems_aio_handle, (void *) r_chain);
330        if (result != 0) {
331          pthread_mutex_unlock (&aio_request_queue.mutex);
332          return result;
333        }
334        ++aio_request_queue.active_threads;
335      }
336      else {
337        /* put request in the fd chain it belongs to */
338        pthread_mutex_lock (&r_chain->mutex);
339        rtems_aio_insert_prio (&r_chain->perfd, req);
340        pthread_cond_signal (&r_chain->cond);
341        pthread_mutex_unlock (&r_chain->mutex);
342      }
343    }
344  else
345    {
346      /* the maximum number of threads has been already created
347         even though some of them might be idle.
348         The request belongs to one of the active fd chain */
349      r_chain = rtems_aio_search_fd (&aio_request_queue.work_req,
350                                     req->aiocbp->aio_fildes, 0);
351      if (r_chain != NULL)
352        {
353          pthread_mutex_lock (&r_chain->mutex);
354          rtems_aio_insert_prio (&r_chain->perfd, req);
355          pthread_cond_signal (&r_chain->cond);
356          pthread_mutex_unlock (&r_chain->mutex);
357           
358        } else {
359     
360        /* or to the idle chain */
361        chain = &aio_request_queue.idle_req;
362        r_chain = rtems_aio_search_fd (chain, req->aiocbp->aio_fildes, 1);
363     
364        if (r_chain->new_fd == 1) {
365          /* If this is a new fd chain we signal the idle threads that
366             might be waiting for requests */
367          AIO_printf (" New chain on waiting queue \n ");
368          rtems_chain_prepend (&r_chain->perfd, &req->next_prio);
369          r_chain->new_fd = 0;
370          pthread_mutex_init (&r_chain->mutex, NULL);
371          pthread_cond_init (&r_chain->cond, NULL);
372        } else
373          /* just insert the request in the existing fd chain */
374          rtems_aio_insert_prio (&r_chain->perfd, req);
375        if (aio_request_queue.idle_threads > 0)
376          pthread_cond_signal (&aio_request_queue.new_req);
377      }
378    }
379
380  pthread_mutex_unlock (&aio_request_queue.mutex);
381  return 0;
382}
383
384/*
385 *  rtems_aio_handle
386 *
387 * Thread processing requests
388 *
389 *  Input parameters:
390 *        arg        - the chain for the fd to be worked on
391 *
392 *  Output parameters:
393 *        NULL       - if error
394 */
395
396static void *
397rtems_aio_handle (void *arg)
398{
399
400  rtems_aio_request_chain *r_chain = arg;
401  rtems_aio_request *req;
402  rtems_chain_control *chain;
403  rtems_chain_node *node;
404  int result, policy;
405  struct sched_param param;
406
407  AIO_printf ("Thread started\n");
408 
409  while (1) {
410   
411    /* acquire the mutex of the current fd chain.
412       we don't need to lock the queue mutex since we can
413       add requests to idle fd chains or even active ones
414       if the working request has been extracted from the
415       chain */
416    result = pthread_mutex_lock (&r_chain->mutex);
417    if (result != 0)
418      return NULL;
419   
420    chain = &r_chain->perfd;   
421
422    /* If the locked chain is not empty, take the first
423       request extract it, unlock the chain and process
424       the request, in this way the user can supply more
425       requests to this fd chain */
426    if (!rtems_chain_is_empty (chain)) {
427
428      AIO_printf ("Get new request from not empty chain\n");   
429      node = rtems_chain_first (chain);
430      req = (rtems_aio_request *) node;
431     
432      /* See _POSIX_PRIORITIZE_IO and _POSIX_PRIORITY_SCHEDULING
433         discussion in rtems_aio_enqueue () */
434      pthread_getschedparam (pthread_self(), &policy, &param);
435      param.sched_priority = req->priority;
436      pthread_setschedparam (pthread_self(), req->policy, &param);
437
438      rtems_chain_extract (node);
439
440      pthread_mutex_unlock (&r_chain->mutex);
441
442      switch (req->aiocbp->aio_lio_opcode) {
443      case LIO_READ:
444        AIO_printf ("read\n");
445        result = pread (req->aiocbp->aio_fildes,
446                        (void *) req->aiocbp->aio_buf,
447                        req->aiocbp->aio_nbytes, req->aiocbp->aio_offset);
448        break;
449
450      case LIO_WRITE:
451        AIO_printf ("write\n");
452        result = pwrite (req->aiocbp->aio_fildes,
453                         (void *) req->aiocbp->aio_buf,
454                         req->aiocbp->aio_nbytes, req->aiocbp->aio_offset);
455        break;
456       
457      case LIO_SYNC:
458        AIO_printf ("sync\n");
459        result = fsync (req->aiocbp->aio_fildes);
460        break;
461
462      default:
463        result = -1;
464      }
465      if (result == -1) {
466        req->aiocbp->return_value = -1;
467        req->aiocbp->error_code = errno;
468      } else {
469        req->aiocbp->return_value = result;
470        req->aiocbp->error_code = 0;
471      }
472
473      // notification needed for lio
474
475    } else {
476      /* If the fd chain is empty we unlock the fd chain
477         and we lock the queue chain, this will ensure that
478         we have at most one request comming to our fd chain
479         when we check.
480         
481         If there was no request added sleep for 3 seconds and
482         wait for a signal on chain, this will unlock the queue.
483         The fd chain is already unlocked */
484
485      struct timespec timeout;
486     
487      AIO_printf ("Chain is empty [WQ], wait for work\n");
488     
489      pthread_mutex_unlock (&r_chain->mutex);
490      pthread_mutex_lock (&aio_request_queue.mutex);
491     
492      if (rtems_chain_is_empty (chain))
493        {
494          clock_gettime (CLOCK_REALTIME, &timeout);
495          timeout.tv_sec += 3;
496          timeout.tv_nsec = 0;
497          result = pthread_cond_timedwait (&r_chain->cond,
498                                           &aio_request_queue.mutex,
499                                           &timeout);
500
501          /* If no requests were added to the chain we delete the fd chain from
502             the queue and start working with idle fd chains */
503          if (result == ETIMEDOUT) {
504            rtems_chain_extract (&r_chain->next_fd);
505            pthread_mutex_destroy (&r_chain->mutex);
506            pthread_cond_destroy (&r_chain->cond);
507            free (r_chain);
508           
509            /* If the idle chain is empty sleep for 3 seconds and wait for a
510               signal. The thread now becomes idle. */
511            if (rtems_chain_is_empty (&aio_request_queue.idle_req)) {
512              AIO_printf ("Chain is empty [IQ], wait for work\n");           
513
514              ++aio_request_queue.idle_threads;
515              --aio_request_queue.active_threads;
516              clock_gettime (CLOCK_REALTIME, &timeout);
517              timeout.tv_sec += 3;
518              timeout.tv_nsec = 0;
519
520              result = pthread_cond_timedwait (&aio_request_queue.new_req,
521                                               &aio_request_queue.mutex,
522                                               &timeout);
523             
524              /* If no new fd chain was added in the idle requests
525                 then this thread is finished */
526              if (result == ETIMEDOUT) {
527                AIO_printf ("Etimeout\n");
528                --aio_request_queue.idle_threads;
529                pthread_mutex_unlock (&aio_request_queue.mutex);
530                return NULL;
531              }
532            }
533            /* Otherwise move this chain to the working chain and
534               start the loop all over again */
535            AIO_printf ("Work on idle\n");
536            --aio_request_queue.idle_threads;
537            ++aio_request_queue.active_threads;
538
539            node = rtems_chain_first (&aio_request_queue.idle_req);
540            rtems_chain_extract (node);
541
542            r_chain = (rtems_aio_request_chain *) node;
543            rtems_aio_move_to_work (r_chain);
544           
545          }
546        }
547      /* If there was a request added in the initial fd chain then release
548         the mutex and process it */
549      pthread_mutex_unlock (&aio_request_queue.mutex);
550     
551    }
552  }
553 
554  AIO_printf ("Thread finished\n");
555  return NULL;
556}
Note: See TracBrowser for help on using the repository browser.