source: rtems/cpukit/posix/src/aio_misc.c @ 5cb175bb

4.115
Last change on this file since 5cb175bb was 5cb175bb, checked in by Joel Sherrill <joel.sherrill@…>, on 01/10/13 at 19:22:31

cpukit/posix: Doxygen group is POSIXAPI

  • Property mode set to 100644
File size: 14.9 KB
Line 
1/**
2 * @file
3 *
4 * @brief Actual request being processed
5 * @ingroup POSIXAPI
6 */
7
8/*
9 * Copyright 2010-2011, Alin Rus <alin.codejunkie@gmail.com>
10 *
11 * The license and distribution terms for this file may be
12 * found in the file LICENSE in this distribution or at
13 * http://www.rtems.com/license/LICENSE.
14 */
15
16#include <pthread.h>
17#include <stdlib.h>
18#include <unistd.h>
19#include <time.h>
20#include <rtems/posix/aio_misc.h>
21#include <errno.h>
22
23static void *rtems_aio_handle (void *arg);
24
25rtems_aio_queue aio_request_queue;
26
27/*
28 *  rtems_aio_init
29 *
30 * Initialize the request queue for aio
31 *
32 *  Input parameters:
33 *        NONE
34 *
35 *  Output parameters:
36 *        0    -    if initialization succeeded
37 */
38
39int
40rtems_aio_init (void)
41{
42  int result = 0;
43
44  result = pthread_attr_init (&aio_request_queue.attr);
45  if (result != 0)
46    return result;
47
48  result =
49    pthread_attr_setdetachstate (&aio_request_queue.attr,
50                                 PTHREAD_CREATE_DETACHED);
51  if (result != 0)
52    pthread_attr_destroy (&aio_request_queue.attr);
53
54
55  result = pthread_mutex_init (&aio_request_queue.mutex, NULL);
56  if (result != 0)
57    pthread_attr_destroy (&aio_request_queue.attr);
58
59
60  result = pthread_cond_init (&aio_request_queue.new_req, NULL);
61  if (result != 0) {
62    pthread_mutex_destroy (&aio_request_queue.mutex);
63    pthread_attr_destroy (&aio_request_queue.attr);
64  }
65
66  rtems_chain_initialize_empty (&aio_request_queue.work_req);
67  rtems_chain_initialize_empty (&aio_request_queue.idle_req);
68
69  aio_request_queue.active_threads = 0;
70  aio_request_queue.idle_threads = 0;
71  aio_request_queue.initialized = AIO_QUEUE_INITIALIZED;
72
73  return result;
74}
75
76/*
77 *  rtems_aio_search_fd
78 *
79 * Search and create chain of requests for given FD
80 *
81 *  Input parameters:
82 *        chain        - chain of FD chains
83 *        fildes       - file descriptor to search
84 *        create       - if 1 search and create
85 *                     - if 0 just search
86 *
87 *  Output parameters:
88 *        r_chain      - NULL if create == 0 and there is
89 *                       no chain for given fildes
90 *                     - pointer to chain is there exists
91 *                       a chain for given fildes
92 *                     - pointer to newly create chain if
93 *                       create == 1
94 *
95 */
96
97rtems_aio_request_chain *
98rtems_aio_search_fd (rtems_chain_control *chain, int fildes, int create)
99{
100  rtems_aio_request_chain *r_chain;
101  rtems_chain_node *node;
102
103  node = rtems_chain_first (chain);
104  r_chain = (rtems_aio_request_chain *) node;
105
106  while (r_chain->fildes < fildes && !rtems_chain_is_tail (chain, node)) {
107    node = rtems_chain_next (node);
108    r_chain = (rtems_aio_request_chain *) node;
109  }
110
111  if (r_chain->fildes == fildes)
112    r_chain->new_fd = 0;
113  else {
114    if (create == 0)
115      r_chain = NULL;
116    else {
117      r_chain = malloc (sizeof (rtems_aio_request_chain));
118      rtems_chain_initialize_empty (&r_chain->perfd);
119
120      if (rtems_chain_is_empty (chain))
121        rtems_chain_prepend (chain, &r_chain->next_fd);
122      else
123        rtems_chain_insert (rtems_chain_previous (node), &r_chain->next_fd);
124
125      r_chain->new_fd = 1;
126          r_chain->fildes = fildes;
127    }
128  }
129  return r_chain;
130}
131
132/*
133 *  rtems_aio_move_to_work
134 *
135 * Move chain of requests from IQ to WQ
136 *
137 *  Input parameters:
138 *        r_chain      - chain of requests
139 *
140 *  Output paramteres:
141 *        NONE
142 */
143
144static void
145rtems_aio_move_to_work (rtems_aio_request_chain *r_chain)
146{
147  rtems_aio_request_chain *temp;
148  rtems_chain_node *node;
149 
150  node = rtems_chain_first (&aio_request_queue.work_req);
151  temp = (rtems_aio_request_chain *) node;
152
153  while (temp->fildes < r_chain->fildes &&
154         !rtems_chain_is_tail (&aio_request_queue.work_req, node)) {
155    node = rtems_chain_next (node);
156    temp = (rtems_aio_request_chain *) node;
157  }
158 
159  rtems_chain_insert (rtems_chain_previous (node), &r_chain->next_fd);
160}
161 
162
163/*
164 *  rtems_aio_insert_prio
165 *
166 * Add request to given FD chain. The chain is ordered
167 * by priority
168 *
169 *  Input parameters:
170 *        chain        - chain of requests for a given FD
171 *        req          - request (see aio_misc.h)
172 *
173 *  Output parameters:
174 *        NONE
175 */
176
177static void
178rtems_aio_insert_prio (rtems_chain_control *chain, rtems_aio_request *req)
179{
180  rtems_chain_node *node;
181
182  AIO_printf ("FD exists \n");
183  node = rtems_chain_first (chain);
184
185  if (rtems_chain_is_empty (chain)) {
186    AIO_printf ("First in chain \n");
187    rtems_chain_prepend (chain, &req->next_prio);
188  } else {
189    AIO_printf ("Add by priority \n");
190    int prio = ((rtems_aio_request *) node)->aiocbp->aio_reqprio;
191
192    while (req->aiocbp->aio_reqprio > prio &&
193           !rtems_chain_is_tail (chain, node)) {
194      node = rtems_chain_next (node);
195      prio = ((rtems_aio_request *) node)->aiocbp->aio_reqprio;
196    }
197
198    rtems_chain_insert (node->previous, &req->next_prio);
199
200  }
201}
202
203/*
204 *  rtems_aio_remove_fd
205 *
206 * Removes all the requests in a fd chain
207 *
208 *  Input parameters:
209 *        r_chain        - pointer to the fd chain request
210 *
211 *  Output parameters:
212 *        NONE
213 */
214
215void rtems_aio_remove_fd (rtems_aio_request_chain *r_chain)
216{
217  rtems_chain_control *chain;
218  rtems_chain_node *node;
219  chain = &r_chain->perfd;
220  node = rtems_chain_first (chain);
221 
222  while (!rtems_chain_is_tail (chain, node))
223    {
224      rtems_chain_extract (node);
225      rtems_aio_request *req = (rtems_aio_request *) node;
226      node = rtems_chain_next (node);
227      req->aiocbp->error_code = ECANCELED;
228      req->aiocbp->return_value = -1;
229      free (req);
230    }
231}
232
233/*
234 *  rtems_aio_remove_req
235 *
236 * Removes request from given chain
237 *
238 *  Input parameters:
239 *        chain      - pointer to fd chain which may contain
240 *                     the request
241 *        aiocbp     - pointer to request that needs to be
242 *                     canceled
243 *
244 *  Output parameters:
245 *         AIO_NOTCANCELED   - if request was not canceled
246 *         AIO_CANCELED      - if request was canceled
247 */
248
249int rtems_aio_remove_req (rtems_chain_control *chain, struct aiocb *aiocbp)
250{
251  if (rtems_chain_is_empty (chain))
252    return AIO_ALLDONE;
253
254  rtems_chain_node *node = rtems_chain_first (chain);
255  rtems_aio_request *current;
256 
257  current = (rtems_aio_request *) node;
258
259  while (!rtems_chain_is_tail (chain, node) && current->aiocbp != aiocbp) {
260    node = rtems_chain_next (node);
261    current = (rtems_aio_request *) node;
262  }
263 
264  if (rtems_chain_is_tail (chain, node))
265    return AIO_NOTCANCELED;
266  else
267    {
268      rtems_chain_extract (node);
269      current->aiocbp->error_code = ECANCELED;
270      current->aiocbp->return_value = -1;
271      free (current);
272    }
273   
274  return AIO_CANCELED;
275}
276
277/*
278 *  rtems_aio_enqueue
279 *
280 * Enqueue requests, and creates threads to process them
281 *
282 *  Input parameters:
283 *        req        - see aio_misc.h
284 *
285 *  Output parameters:
286 *         0         - if request was added to queue
287 *         errno     - otherwise
288 */
289
290int
291rtems_aio_enqueue (rtems_aio_request *req)
292{
293
294  rtems_aio_request_chain *r_chain;
295  rtems_chain_control *chain;
296  pthread_t thid;
297  int result, policy;
298  struct sched_param param;
299
300  /* The queue should be initialized */
301  AIO_assert (aio_request_queue.initialized == AIO_QUEUE_INITIALIZED);
302
303  result = pthread_mutex_lock (&aio_request_queue.mutex);
304  if (result != 0) {
305    free (req);
306    return result;
307  }
308
309  /* _POSIX_PRIORITIZED_IO and _POSIX_PRIORITY_SCHEDULING are defined,
310     we can use aio_reqprio to lower the priority of the request */
311  pthread_getschedparam (pthread_self(), &policy, &param);
312
313  req->caller_thread = pthread_self ();
314  req->priority = param.sched_priority - req->aiocbp->aio_reqprio;
315  req->policy = policy;
316  req->aiocbp->error_code = EINPROGRESS;
317  req->aiocbp->return_value = 0;
318
319  if ((aio_request_queue.idle_threads == 0) &&
320      aio_request_queue.active_threads < AIO_MAX_THREADS)
321    /* we still have empty places on the active_threads chain */
322    {
323      chain = &aio_request_queue.work_req;
324      r_chain = rtems_aio_search_fd (chain, req->aiocbp->aio_fildes, 1);
325     
326      if (r_chain->new_fd == 1) {
327        rtems_chain_prepend (&r_chain->perfd, &req->next_prio);
328        r_chain->new_fd = 0;
329        pthread_mutex_init (&r_chain->mutex, NULL);
330        pthread_cond_init (&r_chain->cond, NULL);
331           
332        AIO_printf ("New thread \n");
333        result = pthread_create (&thid, &aio_request_queue.attr,
334                                 rtems_aio_handle, (void *) r_chain);
335        if (result != 0) {
336          pthread_mutex_unlock (&aio_request_queue.mutex);
337          return result;
338        }
339        ++aio_request_queue.active_threads;
340      }
341      else {
342        /* put request in the fd chain it belongs to */
343        pthread_mutex_lock (&r_chain->mutex);
344        rtems_aio_insert_prio (&r_chain->perfd, req);
345        pthread_cond_signal (&r_chain->cond);
346        pthread_mutex_unlock (&r_chain->mutex);
347      }
348    }
349  else
350    {
351      /* the maximum number of threads has been already created
352         even though some of them might be idle.
353         The request belongs to one of the active fd chain */
354      r_chain = rtems_aio_search_fd (&aio_request_queue.work_req,
355                                     req->aiocbp->aio_fildes, 0);
356      if (r_chain != NULL)
357        {
358          pthread_mutex_lock (&r_chain->mutex);
359          rtems_aio_insert_prio (&r_chain->perfd, req);
360          pthread_cond_signal (&r_chain->cond);
361          pthread_mutex_unlock (&r_chain->mutex);
362           
363        } else {
364     
365        /* or to the idle chain */
366        chain = &aio_request_queue.idle_req;
367        r_chain = rtems_aio_search_fd (chain, req->aiocbp->aio_fildes, 1);
368     
369        if (r_chain->new_fd == 1) {
370          /* If this is a new fd chain we signal the idle threads that
371             might be waiting for requests */
372          AIO_printf (" New chain on waiting queue \n ");
373          rtems_chain_prepend (&r_chain->perfd, &req->next_prio);
374          r_chain->new_fd = 0;
375          pthread_mutex_init (&r_chain->mutex, NULL);
376          pthread_cond_init (&r_chain->cond, NULL);
377        } else
378          /* just insert the request in the existing fd chain */
379          rtems_aio_insert_prio (&r_chain->perfd, req);
380        if (aio_request_queue.idle_threads > 0)
381          pthread_cond_signal (&aio_request_queue.new_req);
382      }
383    }
384
385  pthread_mutex_unlock (&aio_request_queue.mutex);
386  return 0;
387}
388
389/*
390 *  rtems_aio_handle
391 *
392 * Thread processing requests
393 *
394 *  Input parameters:
395 *        arg        - the chain for the fd to be worked on
396 *
397 *  Output parameters:
398 *        NULL       - if error
399 */
400
401static void *
402rtems_aio_handle (void *arg)
403{
404
405  rtems_aio_request_chain *r_chain = arg;
406  rtems_aio_request *req;
407  rtems_chain_control *chain;
408  rtems_chain_node *node;
409  int result, policy;
410  struct sched_param param;
411
412  AIO_printf ("Thread started\n");
413 
414  while (1) {
415   
416    /* acquire the mutex of the current fd chain.
417       we don't need to lock the queue mutex since we can
418       add requests to idle fd chains or even active ones
419       if the working request has been extracted from the
420       chain */
421    result = pthread_mutex_lock (&r_chain->mutex);
422    if (result != 0)
423      return NULL;
424   
425    chain = &r_chain->perfd;   
426
427    /* If the locked chain is not empty, take the first
428       request extract it, unlock the chain and process
429       the request, in this way the user can supply more
430       requests to this fd chain */
431    if (!rtems_chain_is_empty (chain)) {
432
433      AIO_printf ("Get new request from not empty chain\n");   
434      node = rtems_chain_first (chain);
435      req = (rtems_aio_request *) node;
436     
437      /* See _POSIX_PRIORITIZE_IO and _POSIX_PRIORITY_SCHEDULING
438         discussion in rtems_aio_enqueue () */
439      pthread_getschedparam (pthread_self(), &policy, &param);
440      param.sched_priority = req->priority;
441      pthread_setschedparam (pthread_self(), req->policy, &param);
442
443      rtems_chain_extract (node);
444
445      pthread_mutex_unlock (&r_chain->mutex);
446
447      switch (req->aiocbp->aio_lio_opcode) {
448      case LIO_READ:
449        AIO_printf ("read\n");
450        result = pread (req->aiocbp->aio_fildes,
451                        (void *) req->aiocbp->aio_buf,
452                        req->aiocbp->aio_nbytes, req->aiocbp->aio_offset);
453        break;
454
455      case LIO_WRITE:
456        AIO_printf ("write\n");
457        result = pwrite (req->aiocbp->aio_fildes,
458                         (void *) req->aiocbp->aio_buf,
459                         req->aiocbp->aio_nbytes, req->aiocbp->aio_offset);
460        break;
461       
462      case LIO_SYNC:
463        AIO_printf ("sync\n");
464        result = fsync (req->aiocbp->aio_fildes);
465        break;
466
467      default:
468        result = -1;
469      }
470      if (result == -1) {
471        req->aiocbp->return_value = -1;
472        req->aiocbp->error_code = errno;
473      } else {
474        req->aiocbp->return_value = result;
475        req->aiocbp->error_code = 0;
476      }
477
478      // notification needed for lio
479
480    } else {
481      /* If the fd chain is empty we unlock the fd chain
482         and we lock the queue chain, this will ensure that
483         we have at most one request comming to our fd chain
484         when we check.
485         
486         If there was no request added sleep for 3 seconds and
487         wait for a signal on chain, this will unlock the queue.
488         The fd chain is already unlocked */
489
490      struct timespec timeout;
491     
492      AIO_printf ("Chain is empty [WQ], wait for work\n");
493     
494      pthread_mutex_unlock (&r_chain->mutex);
495      pthread_mutex_lock (&aio_request_queue.mutex);
496     
497      if (rtems_chain_is_empty (chain))
498        {
499          clock_gettime (CLOCK_REALTIME, &timeout);
500          timeout.tv_sec += 3;
501          timeout.tv_nsec = 0;
502          result = pthread_cond_timedwait (&r_chain->cond,
503                                           &aio_request_queue.mutex,
504                                           &timeout);
505
506          /* If no requests were added to the chain we delete the fd chain from
507             the queue and start working with idle fd chains */
508          if (result == ETIMEDOUT) {
509            rtems_chain_extract (&r_chain->next_fd);
510            pthread_mutex_destroy (&r_chain->mutex);
511            pthread_cond_destroy (&r_chain->cond);
512            free (r_chain);
513           
514            /* If the idle chain is empty sleep for 3 seconds and wait for a
515               signal. The thread now becomes idle. */
516            if (rtems_chain_is_empty (&aio_request_queue.idle_req)) {
517              AIO_printf ("Chain is empty [IQ], wait for work\n");           
518
519              ++aio_request_queue.idle_threads;
520              --aio_request_queue.active_threads;
521              clock_gettime (CLOCK_REALTIME, &timeout);
522              timeout.tv_sec += 3;
523              timeout.tv_nsec = 0;
524
525              result = pthread_cond_timedwait (&aio_request_queue.new_req,
526                                               &aio_request_queue.mutex,
527                                               &timeout);
528             
529              /* If no new fd chain was added in the idle requests
530                 then this thread is finished */
531              if (result == ETIMEDOUT) {
532                AIO_printf ("Etimeout\n");
533                --aio_request_queue.idle_threads;
534                pthread_mutex_unlock (&aio_request_queue.mutex);
535                return NULL;
536              }
537            }
538            /* Otherwise move this chain to the working chain and
539               start the loop all over again */
540            AIO_printf ("Work on idle\n");
541            --aio_request_queue.idle_threads;
542            ++aio_request_queue.active_threads;
543
544            node = rtems_chain_first (&aio_request_queue.idle_req);
545            rtems_chain_extract (node);
546
547            r_chain = (rtems_aio_request_chain *) node;
548            rtems_aio_move_to_work (r_chain);
549           
550          }
551        }
552      /* If there was a request added in the initial fd chain then release
553         the mutex and process it */
554      pthread_mutex_unlock (&aio_request_queue.mutex);
555     
556    }
557  }
558 
559  AIO_printf ("Thread finished\n");
560  return NULL;
561}
Note: See TracBrowser for help on using the repository browser.