source: rtems/cpukit/posix/src/aio_misc.c @ 04da96c7

5
Last change on this file since 04da96c7 was c499856, checked in by Chris Johns <chrisj@…>, on 03/20/14 at 21:10:47

Change all references of rtems.com to rtems.org.

  • Property mode set to 100644
File size: 15.0 KB
Line 
1/**
2 * @file
3 *
4 * @brief Actual request being processed
5 * @ingroup POSIXAPI
6 */
7
8/*
9 * Copyright 2010-2011, Alin Rus <alin.codejunkie@gmail.com>
10 *
11 * The license and distribution terms for this file may be
12 * found in the file LICENSE in this distribution or at
13 * http://www.rtems.org/license/LICENSE.
14 */
15
16#include <pthread.h>
17#include <stdlib.h>
18#include <unistd.h>
19#include <time.h>
20#include <rtems/posix/aio_misc.h>
21#include <errno.h>
22
/* Entry point of the worker threads created by rtems_aio_enqueue ();
   the definition is at the end of this file. */
static void *rtems_aio_handle (void *arg);

/* The AIO request queue shared by the functions in this file.  Its
   members (attr, mutex, new_req, work_req, idle_req, the thread counters
   and the initialized flag) are set up by rtems_aio_init (). */
rtems_aio_queue aio_request_queue;
26
27/*
28 *  rtems_aio_init
29 *
30 * Initialize the request queue for aio
31 *
32 *  Input parameters:
33 *        NONE
34 *
35 *  Output parameters:
36 *        0    -    if initialization succeeded
37 */
38
39int
40rtems_aio_init (void)
41{
42  int result = 0;
43
44  result = pthread_attr_init (&aio_request_queue.attr);
45  if (result != 0)
46    return result;
47
48  result =
49    pthread_attr_setdetachstate (&aio_request_queue.attr,
50                                 PTHREAD_CREATE_DETACHED);
51  if (result != 0)
52    pthread_attr_destroy (&aio_request_queue.attr);
53
54
55  result = pthread_mutex_init (&aio_request_queue.mutex, NULL);
56  if (result != 0)
57    pthread_attr_destroy (&aio_request_queue.attr);
58
59
60  result = pthread_cond_init (&aio_request_queue.new_req, NULL);
61  if (result != 0) {
62    pthread_mutex_destroy (&aio_request_queue.mutex);
63    pthread_attr_destroy (&aio_request_queue.attr);
64  }
65
66  rtems_chain_initialize_empty (&aio_request_queue.work_req);
67  rtems_chain_initialize_empty (&aio_request_queue.idle_req);
68
69  aio_request_queue.active_threads = 0;
70  aio_request_queue.idle_threads = 0;
71  aio_request_queue.initialized = AIO_QUEUE_INITIALIZED;
72
73  return result;
74}
75
76/*
77 *  rtems_aio_search_fd
78 *
79 * Search and create chain of requests for given FD
80 *
81 *  Input parameters:
82 *        chain        - chain of FD chains
83 *        fildes       - file descriptor to search
84 *        create       - if 1 search and create
85 *                     - if 0 just search
86 *
87 *  Output parameters:
88 *        r_chain      - NULL if create == 0 and there is
89 *                       no chain for given fildes
90 *                     - pointer to chain is there exists
91 *                       a chain for given fildes
92 *                     - pointer to newly create chain if
93 *                       create == 1
94 *
95 */
96
97rtems_aio_request_chain *
98rtems_aio_search_fd (rtems_chain_control *chain, int fildes, int create)
99{
100  rtems_aio_request_chain *r_chain;
101  rtems_chain_node *node;
102
103  node = rtems_chain_first (chain);
104  r_chain = (rtems_aio_request_chain *) node;
105
106  while (r_chain->fildes < fildes && !rtems_chain_is_tail (chain, node)) {
107    node = rtems_chain_next (node);
108    r_chain = (rtems_aio_request_chain *) node;
109  }
110
111  if (r_chain->fildes == fildes)
112    r_chain->new_fd = 0;
113  else {
114    if (create == 0)
115      r_chain = NULL;
116    else {
117      r_chain = malloc (sizeof (rtems_aio_request_chain));
118      rtems_chain_initialize_empty (&r_chain->perfd);
119
120      if (rtems_chain_is_empty (chain))
121        rtems_chain_prepend (chain, &r_chain->next_fd);
122      else
123        rtems_chain_insert (rtems_chain_previous (node), &r_chain->next_fd);
124
125      r_chain->new_fd = 1;
126          r_chain->fildes = fildes;
127    }
128  }
129  return r_chain;
130}
131
132/*
133 *  rtems_aio_move_to_work
134 *
135 * Move chain of requests from IQ to WQ
136 *
137 *  Input parameters:
138 *        r_chain      - chain of requests
139 *
140 *  Output paramteres:
141 *        NONE
142 */
143
144static void
145rtems_aio_move_to_work (rtems_aio_request_chain *r_chain)
146{
147  rtems_chain_control *work_req_chain = &aio_request_queue.work_req;
148  rtems_aio_request_chain *temp;
149  rtems_chain_node *node;
150
151  node = rtems_chain_first (work_req_chain);
152  temp = (rtems_aio_request_chain *) node;
153
154  while (temp->fildes < r_chain->fildes &&
155         !rtems_chain_is_tail (work_req_chain, node)) {
156    node = rtems_chain_next (node);
157    temp = (rtems_aio_request_chain *) node;
158  }
159
160  rtems_chain_insert (rtems_chain_previous (node), &r_chain->next_fd);
161}
162 
163
164/*
165 *  rtems_aio_insert_prio
166 *
167 * Add request to given FD chain. The chain is ordered
168 * by priority
169 *
170 *  Input parameters:
171 *        chain        - chain of requests for a given FD
172 *        req          - request (see aio_misc.h)
173 *
174 *  Output parameters:
175 *        NONE
176 */
177
178static void
179rtems_aio_insert_prio (rtems_chain_control *chain, rtems_aio_request *req)
180{
181  rtems_chain_node *node;
182
183  AIO_printf ("FD exists \n");
184  node = rtems_chain_first (chain);
185
186  if (rtems_chain_is_empty (chain)) {
187    AIO_printf ("First in chain \n");
188    rtems_chain_prepend (chain, &req->next_prio);
189  } else {
190    AIO_printf ("Add by priority \n");
191    int prio = ((rtems_aio_request *) node)->aiocbp->aio_reqprio;
192
193    while (req->aiocbp->aio_reqprio > prio &&
194           !rtems_chain_is_tail (chain, node)) {
195      node = rtems_chain_next (node);
196      prio = ((rtems_aio_request *) node)->aiocbp->aio_reqprio;
197    }
198
199    rtems_chain_insert (node->previous, &req->next_prio);
200
201  }
202}
203
204/*
205 *  rtems_aio_remove_fd
206 *
207 * Removes all the requests in a fd chain
208 *
209 *  Input parameters:
210 *        r_chain        - pointer to the fd chain request
211 *
212 *  Output parameters:
213 *        NONE
214 */
215
216void rtems_aio_remove_fd (rtems_aio_request_chain *r_chain)
217{
218  rtems_chain_control *chain;
219  rtems_chain_node *node;
220  chain = &r_chain->perfd;
221  node = rtems_chain_first (chain);
222 
223  while (!rtems_chain_is_tail (chain, node))
224    {
225      rtems_chain_extract (node);
226      rtems_aio_request *req = (rtems_aio_request *) node;
227      node = rtems_chain_next (node);
228      req->aiocbp->error_code = ECANCELED;
229      req->aiocbp->return_value = -1;
230      free (req);
231    }
232}
233
234/*
235 *  rtems_aio_remove_req
236 *
237 * Removes request from given chain
238 *
239 *  Input parameters:
240 *        chain      - pointer to fd chain which may contain
241 *                     the request
242 *        aiocbp     - pointer to request that needs to be
243 *                     canceled
244 *
245 *  Output parameters:
246 *         AIO_NOTCANCELED   - if request was not canceled
247 *         AIO_CANCELED      - if request was canceled
248 */
249
250int rtems_aio_remove_req (rtems_chain_control *chain, struct aiocb *aiocbp)
251{
252  if (rtems_chain_is_empty (chain))
253    return AIO_ALLDONE;
254
255  rtems_chain_node *node = rtems_chain_first (chain);
256  rtems_aio_request *current;
257 
258  current = (rtems_aio_request *) node;
259
260  while (!rtems_chain_is_tail (chain, node) && current->aiocbp != aiocbp) {
261    node = rtems_chain_next (node);
262    current = (rtems_aio_request *) node;
263  }
264 
265  if (rtems_chain_is_tail (chain, node))
266    return AIO_NOTCANCELED;
267  else
268    {
269      rtems_chain_extract (node);
270      current->aiocbp->error_code = ECANCELED;
271      current->aiocbp->return_value = -1;
272      free (current);
273    }
274   
275  return AIO_CANCELED;
276}
277
278/*
279 *  rtems_aio_enqueue
280 *
281 * Enqueue requests, and creates threads to process them
282 *
283 *  Input parameters:
284 *        req        - see aio_misc.h
285 *
286 *  Output parameters:
287 *         0         - if request was added to queue
288 *         errno     - otherwise
289 */
290
291int
292rtems_aio_enqueue (rtems_aio_request *req)
293{
294
295  rtems_aio_request_chain *r_chain;
296  rtems_chain_control *chain;
297  pthread_t thid;
298  int result, policy;
299  struct sched_param param;
300
301  /* The queue should be initialized */
302  AIO_assert (aio_request_queue.initialized == AIO_QUEUE_INITIALIZED);
303
304  result = pthread_mutex_lock (&aio_request_queue.mutex);
305  if (result != 0) {
306    free (req);
307    return result;
308  }
309
310  /* _POSIX_PRIORITIZED_IO and _POSIX_PRIORITY_SCHEDULING are defined,
311     we can use aio_reqprio to lower the priority of the request */
312  pthread_getschedparam (pthread_self(), &policy, &param);
313
314  req->caller_thread = pthread_self ();
315  req->priority = param.sched_priority - req->aiocbp->aio_reqprio;
316  req->policy = policy;
317  req->aiocbp->error_code = EINPROGRESS;
318  req->aiocbp->return_value = 0;
319
320  if ((aio_request_queue.idle_threads == 0) &&
321      aio_request_queue.active_threads < AIO_MAX_THREADS)
322    /* we still have empty places on the active_threads chain */
323    {
324      chain = &aio_request_queue.work_req;
325      r_chain = rtems_aio_search_fd (chain, req->aiocbp->aio_fildes, 1);
326     
327      if (r_chain->new_fd == 1) {
328        rtems_chain_prepend (&r_chain->perfd, &req->next_prio);
329        r_chain->new_fd = 0;
330        pthread_mutex_init (&r_chain->mutex, NULL);
331        pthread_cond_init (&r_chain->cond, NULL);
332           
333        AIO_printf ("New thread \n");
334        result = pthread_create (&thid, &aio_request_queue.attr,
335                                 rtems_aio_handle, (void *) r_chain);
336        if (result != 0) {
337          pthread_mutex_unlock (&aio_request_queue.mutex);
338          return result;
339        }
340        ++aio_request_queue.active_threads;
341      }
342      else {
343        /* put request in the fd chain it belongs to */
344        pthread_mutex_lock (&r_chain->mutex);
345        rtems_aio_insert_prio (&r_chain->perfd, req);
346        pthread_cond_signal (&r_chain->cond);
347        pthread_mutex_unlock (&r_chain->mutex);
348      }
349    }
350  else
351    {
352      /* the maximum number of threads has been already created
353         even though some of them might be idle.
354         The request belongs to one of the active fd chain */
355      r_chain = rtems_aio_search_fd (&aio_request_queue.work_req,
356                                     req->aiocbp->aio_fildes, 0);
357      if (r_chain != NULL)
358        {
359          pthread_mutex_lock (&r_chain->mutex);
360          rtems_aio_insert_prio (&r_chain->perfd, req);
361          pthread_cond_signal (&r_chain->cond);
362          pthread_mutex_unlock (&r_chain->mutex);
363           
364        } else {
365     
366        /* or to the idle chain */
367        chain = &aio_request_queue.idle_req;
368        r_chain = rtems_aio_search_fd (chain, req->aiocbp->aio_fildes, 1);
369     
370        if (r_chain->new_fd == 1) {
371          /* If this is a new fd chain we signal the idle threads that
372             might be waiting for requests */
373          AIO_printf (" New chain on waiting queue \n ");
374          rtems_chain_prepend (&r_chain->perfd, &req->next_prio);
375          r_chain->new_fd = 0;
376          pthread_mutex_init (&r_chain->mutex, NULL);
377          pthread_cond_init (&r_chain->cond, NULL);
378        } else
379          /* just insert the request in the existing fd chain */
380          rtems_aio_insert_prio (&r_chain->perfd, req);
381        if (aio_request_queue.idle_threads > 0)
382          pthread_cond_signal (&aio_request_queue.new_req);
383      }
384    }
385
386  pthread_mutex_unlock (&aio_request_queue.mutex);
387  return 0;
388}
389
390/*
391 *  rtems_aio_handle
392 *
393 * Thread processing requests
394 *
395 *  Input parameters:
396 *        arg        - the chain for the fd to be worked on
397 *
398 *  Output parameters:
399 *        NULL       - if error
400 */
401
402static void *
403rtems_aio_handle (void *arg)
404{
405
406  rtems_aio_request_chain *r_chain = arg;
407  rtems_aio_request *req;
408  rtems_chain_control *chain;
409  rtems_chain_node *node;
410  int result, policy;
411  struct sched_param param;
412
413  AIO_printf ("Thread started\n");
414 
415  while (1) {
416   
417    /* acquire the mutex of the current fd chain.
418       we don't need to lock the queue mutex since we can
419       add requests to idle fd chains or even active ones
420       if the working request has been extracted from the
421       chain */
422    result = pthread_mutex_lock (&r_chain->mutex);
423    if (result != 0)
424      return NULL;
425   
426    chain = &r_chain->perfd;   
427
428    /* If the locked chain is not empty, take the first
429       request extract it, unlock the chain and process
430       the request, in this way the user can supply more
431       requests to this fd chain */
432    if (!rtems_chain_is_empty (chain)) {
433
434      AIO_printf ("Get new request from not empty chain\n");   
435      node = rtems_chain_first (chain);
436      req = (rtems_aio_request *) node;
437     
438      /* See _POSIX_PRIORITIZE_IO and _POSIX_PRIORITY_SCHEDULING
439         discussion in rtems_aio_enqueue () */
440      pthread_getschedparam (pthread_self(), &policy, &param);
441      param.sched_priority = req->priority;
442      pthread_setschedparam (pthread_self(), req->policy, &param);
443
444      rtems_chain_extract (node);
445
446      pthread_mutex_unlock (&r_chain->mutex);
447
448      switch (req->aiocbp->aio_lio_opcode) {
449      case LIO_READ:
450        AIO_printf ("read\n");
451        result = pread (req->aiocbp->aio_fildes,
452                        (void *) req->aiocbp->aio_buf,
453                        req->aiocbp->aio_nbytes, req->aiocbp->aio_offset);
454        break;
455
456      case LIO_WRITE:
457        AIO_printf ("write\n");
458        result = pwrite (req->aiocbp->aio_fildes,
459                         (void *) req->aiocbp->aio_buf,
460                         req->aiocbp->aio_nbytes, req->aiocbp->aio_offset);
461        break;
462       
463      case LIO_SYNC:
464        AIO_printf ("sync\n");
465        result = fsync (req->aiocbp->aio_fildes);
466        break;
467
468      default:
469        result = -1;
470      }
471      if (result == -1) {
472        req->aiocbp->return_value = -1;
473        req->aiocbp->error_code = errno;
474      } else {
475        req->aiocbp->return_value = result;
476        req->aiocbp->error_code = 0;
477      }
478
479      // notification needed for lio
480
481    } else {
482      /* If the fd chain is empty we unlock the fd chain
483         and we lock the queue chain, this will ensure that
484         we have at most one request comming to our fd chain
485         when we check.
486         
487         If there was no request added sleep for 3 seconds and
488         wait for a signal on chain, this will unlock the queue.
489         The fd chain is already unlocked */
490
491      struct timespec timeout;
492     
493      AIO_printf ("Chain is empty [WQ], wait for work\n");
494     
495      pthread_mutex_unlock (&r_chain->mutex);
496      pthread_mutex_lock (&aio_request_queue.mutex);
497     
498      if (rtems_chain_is_empty (chain))
499        {
500          clock_gettime (CLOCK_REALTIME, &timeout);
501          timeout.tv_sec += 3;
502          timeout.tv_nsec = 0;
503          result = pthread_cond_timedwait (&r_chain->cond,
504                                           &aio_request_queue.mutex,
505                                           &timeout);
506
507          /* If no requests were added to the chain we delete the fd chain from
508             the queue and start working with idle fd chains */
509          if (result == ETIMEDOUT) {
510            rtems_chain_extract (&r_chain->next_fd);
511            pthread_mutex_destroy (&r_chain->mutex);
512            pthread_cond_destroy (&r_chain->cond);
513            free (r_chain);
514           
515            /* If the idle chain is empty sleep for 3 seconds and wait for a
516               signal. The thread now becomes idle. */
517            if (rtems_chain_is_empty (&aio_request_queue.idle_req)) {
518              AIO_printf ("Chain is empty [IQ], wait for work\n");           
519
520              ++aio_request_queue.idle_threads;
521              --aio_request_queue.active_threads;
522              clock_gettime (CLOCK_REALTIME, &timeout);
523              timeout.tv_sec += 3;
524              timeout.tv_nsec = 0;
525
526              result = pthread_cond_timedwait (&aio_request_queue.new_req,
527                                               &aio_request_queue.mutex,
528                                               &timeout);
529             
530              /* If no new fd chain was added in the idle requests
531                 then this thread is finished */
532              if (result == ETIMEDOUT) {
533                AIO_printf ("Etimeout\n");
534                --aio_request_queue.idle_threads;
535                pthread_mutex_unlock (&aio_request_queue.mutex);
536                return NULL;
537              }
538            }
539            /* Otherwise move this chain to the working chain and
540               start the loop all over again */
541            AIO_printf ("Work on idle\n");
542            --aio_request_queue.idle_threads;
543            ++aio_request_queue.active_threads;
544
545            node = rtems_chain_first (&aio_request_queue.idle_req);
546            rtems_chain_extract (node);
547
548            r_chain = (rtems_aio_request_chain *) node;
549            rtems_aio_move_to_work (r_chain);
550           
551          }
552        }
553      /* If there was a request added in the initial fd chain then release
554         the mutex and process it */
555      pthread_mutex_unlock (&aio_request_queue.mutex);
556     
557    }
558  }
559 
560  AIO_printf ("Thread finished\n");
561  return NULL;
562}
Note: See TracBrowser for help on using the repository browser.