source: rtems/cpukit/posix/src/aio_misc.c @ 6c22fa1

4.115
Last change on this file since 6c22fa1 was 6c22fa1, checked in by Ralf Corsepius <ralf.corsepius@…>, on 08/20/10 at 14:37:08

2010-08-16 Ralf Corsépius <ralf.corsepius@…>

  • posix/include/rtems/posix/aio_misc.h: Add decl for aio_request_queue.
  • posix/src/aio_misc.c: Add aio_request_queue.
  • Property mode set to 100644
File size: 13.6 KB
Line 
1/*
2 * Copyright 2010, Alin Rus <alin.codejunkie@gmail.com>
3 *
4 * The license and distribution terms for this file may be
5 * found in the file LICENSE in this distribution or at
6 * http://www.rtems.com/license/LICENSE.
7 *
8 * $Id$
9 */
10
#include <errno.h>
#include <pthread.h>
#include <stdlib.h>
#include <time.h>
#include <unistd.h>

#include <rtems/posix/aio_misc.h>
16
/* Forward declaration of the worker thread entry point; it is defined at
   the end of this file and handed to pthread_create () by
   rtems_aio_enqueue (). */
17static void *rtems_aio_handle (void *arg);
18
/* The single global AIO request queue used by every function in this
   file (thread attributes, queue mutex, condition variable, work/idle
   chains and thread counters). */
19rtems_aio_queue aio_request_queue;
20
21/*
22 *  rtems_aio_init
23 *
24 * Initialize the request queue for aio
25 *
26 *  Input parameters:
27 *        NONE
28 *
29 *  Output parameters:
30 *        0    -    if initialization succeeded
31 */
32
33int
34rtems_aio_init (void)
35{
36  int result = 0;
37
38  result = pthread_attr_init (&aio_request_queue.attr);
39  if (result != 0)
40    return result;
41
42  result =
43    pthread_attr_setdetachstate (&aio_request_queue.attr,
44                                 PTHREAD_CREATE_DETACHED);
45  if (result != 0)
46    pthread_attr_destroy (&aio_request_queue.attr);
47
48
49  result = pthread_mutex_init (&aio_request_queue.mutex, NULL);
50  if (result != 0)
51    pthread_attr_destroy (&aio_request_queue.attr);
52
53
54  result = pthread_cond_init (&aio_request_queue.new_req, NULL);
55  if (result != 0) {
56    pthread_mutex_destroy (&aio_request_queue.mutex);
57    pthread_attr_destroy (&aio_request_queue.attr);
58  }
59
60  rtems_chain_initialize_empty (&aio_request_queue.work_req);
61  rtems_chain_initialize_empty (&aio_request_queue.idle_req);
62
63  aio_request_queue.active_threads = 0;
64  aio_request_queue.idle_threads = 0;
65  aio_request_queue.initialized = AIO_QUEUE_INITIALIZED;
66
67  return result;
68}
69
70/*
71 *  rtems_aio_search_fd
72 *
73 * Search and create chain of requests for given FD
74 *
75 *  Input parameters:
76 *        chain        - chain of FD chains
77 *        fildes       - file descriptor to search
78 *        create       - if 1 search and create
79 *                     - if 0 just search
80 *
81 *  Output parameters:
82 *        r_chain      - NULL if create == 0 and there is
83 *                       no chain for given fildes
84 *                     - pointer to chain is there exists
85 *                       a chain for given fildes
86 *                     - pointer to newly create chain if
87 *                       create == 1
88 *
89 */
90
91rtems_aio_request_chain *
92rtems_aio_search_fd (rtems_chain_control *chain, int fildes, int create)
93{
94  rtems_aio_request_chain *r_chain;
95  rtems_chain_node *node;
96
97  node = chain->first;
98  r_chain = (rtems_aio_request_chain *) node;
99
100  while (r_chain->fildes < fildes && !rtems_chain_is_tail (chain, node)) {
101    node = node->next;
102    r_chain = (rtems_aio_request_chain *) node;
103  }
104
105  if (r_chain->fildes == fildes)
106    r_chain->new_fd = 0;
107  else {
108    if (create == 0)
109      r_chain = NULL;
110    else {
111      r_chain = malloc (sizeof (rtems_aio_request_chain));
112      rtems_chain_initialize_empty (&r_chain->perfd);
113
114      if (rtems_chain_is_empty (chain))
115        rtems_chain_prepend (chain, &r_chain->next_fd);
116      else
117        rtems_chain_insert (node->previous, &r_chain->next_fd);
118
119      r_chain->new_fd = 1;
120    }
121  }
122  return r_chain;
123}
124
125/*
126 *  rtems_aio_insert_prio
127 *
128 * Add request to given FD chain. The chain is ordered
129 * by priority
130 *
131 *  Input parameters:
132 *        chain        - chain of requests for a given FD
133 *        req          - request (see aio_misc.h)
134 *
135 *  Output parameters:
136 *        NONE
137 */
138
139void
140rtems_aio_insert_prio (rtems_chain_control *chain, rtems_aio_request *req)
141{
142  rtems_chain_node *node;
143
144  AIO_printf ("FD exists \n");
145  node = chain->first;
146
147  if (rtems_chain_is_empty (chain)) {
148    AIO_printf ("First in chain \n");
149    rtems_chain_prepend (chain, &req->next_prio);
150  } else {
151    AIO_printf ("Add by priority \n");
152    int prio = ((rtems_aio_request *) node)->aiocbp->aio_reqprio;
153
154    while (req->aiocbp->aio_reqprio > prio &&
155           !rtems_chain_is_tail (chain, node)) {
156      node = node->next;
157      prio = ((rtems_aio_request *) node)->aiocbp->aio_reqprio;
158    }
159
160    rtems_chain_insert (node->previous, &req->next_prio);
161
162  }
163}
164
165/*
166 *  rtems_aio_remove_fd
167 *
168 * Removes all the requests in a fd chain
169 *
170 *  Input parameters:
171 *        r_chain        - pointer to the fd chain request
172 *
173 *  Output parameters:
174 *        NONE
175 */
176
177void rtems_aio_remove_fd (rtems_aio_request_chain *r_chain)
178{
179  rtems_chain_control *chain;
180  rtems_chain_node *node;
181
182  chain = &r_chain->perfd;
183  node = chain->first;
184 
185  while (!rtems_chain_is_tail (chain, node))
186    {
187      rtems_chain_extract (node);
188      rtems_aio_request *req = (rtems_aio_request *) node;
189      req->aiocbp->error_code = ECANCELED;
190      req->aiocbp->return_value = -1;
191      free (req);
192    }
193}
194
195/*
196 *  rtems_aio_remove_req
197 *
198 * Removes request from given chain
199 *
200 *  Input parameters:
201 *        chain      - pointer to fd chain which may contain
202 *                     the request
203 *        aiocbp     - pointer to request that needs to be
204 *                     canceled
205 *
206 *  Output parameters:
207 *         AIO_NOTCANCELED   - if request was not canceled
208 *         AIO_CANCELED      - if request was canceled
209 */
210
211int rtems_aio_remove_req (rtems_chain_control *chain, struct aiocb *aiocbp)
212{
213  rtems_chain_node *node = chain->first;
214  rtems_aio_request *current;
215 
216  current = (rtems_aio_request *) node;
217
218  while (!rtems_chain_is_tail (chain, node) && current->aiocbp != aiocbp) {
219    node = node->next;
220    current = (rtems_aio_request *) node;
221  }
222 
223  if (rtems_chain_is_tail (chain, node))
224    return AIO_NOTCANCELED;
225  else
226    {
227      rtems_chain_extract (node);
228      current->aiocbp->error_code = ECANCELED;
229      current->aiocbp->return_value = -1;
230      free (current);
231    }
232   
233  return AIO_CANCELED;
234}
235
236/*
237 *  rtems_aio_enqueue
238 *
239 * Enqueue requests, and creates threads to process them
240 *
241 *  Input parameters:
242 *        req        - see aio_misc.h
243 *
244 *  Output parameters:
245 *         0         - if request was added to queue
246 *         errno     - otherwise
247 */
248
249int
250rtems_aio_enqueue (rtems_aio_request *req)
251{
252
253  rtems_aio_request_chain *r_chain;
254  rtems_chain_control *chain;
255  pthread_t thid;
256  int result, policy;
257  struct sched_param param;
258
259  /* The queue should be initialized */
260  AIO_assert (aio_request_queue.initialized != AIO_QUEUE_INITIALIZED);
261
262  result = pthread_mutex_lock (&aio_request_queue.mutex);
263  if (result != 0) {
264    free (req);
265    return result;
266  }
267
268  /* _POSIX_PRIORITIZED_IO and _POSIX_PRIORITY_SCHEDULING are defined,
269     we can use aio_reqprio to lower the priority of the request */
270  pthread_getschedparam (pthread_self(), &policy, &param);
271
272  req->caller_thread = pthread_self ();
273  req->priority = param.sched_priority - req->aiocbp->aio_reqprio;
274  req->policy = policy;
275  req->aiocbp->error_code = EINPROGRESS;
276  req->aiocbp->return_value = 0;
277
278  if ((aio_request_queue.idle_threads == 0) &&
279      aio_request_queue.active_threads < AIO_MAX_THREADS)
280    /* we still have empty places on the active_threads chain */
281    {
282      chain = &aio_request_queue.work_req;
283      r_chain = rtems_aio_search_fd (chain, req->aiocbp->aio_fildes, 1);
284     
285      if (r_chain->new_fd == 1) {
286        rtems_chain_prepend (&r_chain->perfd, &req->next_prio);
287        r_chain->new_fd = 0;
288        pthread_mutex_init (&r_chain->mutex, NULL);
289        pthread_cond_init (&r_chain->cond, NULL);
290       
291        AIO_printf ("New thread");
292        result = pthread_create (&thid, &aio_request_queue.attr,
293                                 rtems_aio_handle, (void *) r_chain);
294        if (result != 0) {
295          pthread_mutex_unlock (&aio_request_queue.mutex);
296          return result;
297        }
298        ++aio_request_queue.active_threads;
299      }
300      else {
301        /* put request in the fd chain it belongs to */
302        pthread_mutex_lock (&r_chain->mutex);
303        rtems_aio_insert_prio (&r_chain->perfd, req);
304        pthread_cond_signal (&r_chain->cond);
305        pthread_mutex_unlock (&r_chain->mutex);
306      }
307    }
308  else
309    {
310      /* the maximum number of threads has been already created
311         even though some of them might be idle.
312         The request belongs to one of the active fd chain */
313      r_chain = rtems_aio_search_fd (&aio_request_queue.work_req,
314                                     req->aiocbp->aio_fildes, 0);
315      if (r_chain != NULL)
316        {
317          pthread_mutex_lock (&r_chain->mutex);
318          rtems_aio_insert_prio (&r_chain->perfd, req);
319          pthread_cond_signal (&r_chain->cond);
320          pthread_mutex_unlock (&r_chain->mutex);
321           
322        } else {
323     
324        /* or to the idle chain */
325        chain = &aio_request_queue.idle_req;
326        r_chain = rtems_aio_search_fd (chain, req->aiocbp->aio_fildes, 1);
327     
328        if (r_chain->new_fd == 1) {
329          /* If this is a new fd chain we signal the idle threads that
330             might be waiting for requests */
331          rtems_chain_prepend (&r_chain->perfd, &req->next_prio);
332          r_chain->new_fd = 0;
333          pthread_mutex_init (&r_chain->mutex, NULL);
334          pthread_cond_init (&r_chain->cond, NULL);
335          pthread_cond_signal (&aio_request_queue.new_req);
336        } else
337          /* just insert the request in the existing fd chain */
338          rtems_aio_insert_prio (&r_chain->perfd, req);
339      }
340    }
341
342  pthread_mutex_unlock (&aio_request_queue.mutex);
343  return 0;
344}
345
346/*
347 *  rtems_aio_handle
348 *
349 * Thread processing requests
350 *
351 *  Input parameters:
352 *        arg        - the chain for the fd to be worked on
353 *
354 *  Output parameters:
355 *        NULL       - if error
356 */
357
358static void *
359rtems_aio_handle (void *arg)
360{
361
362  rtems_aio_request_chain *r_chain = arg;
363  rtems_aio_request *req;
364  rtems_chain_control *chain;
365  rtems_chain_node *node;
366  int result, policy;
367  struct sched_param param;
368
369  AIO_printf ("Thread started\n");
370 
371  while (1) {
372   
373    /* acquire the mutex of the current fd chain.
374       we don't need to lock the queue mutex since we can
375       add requests to idle fd chains or even active ones
376       if the working request has been extracted from the
377       chain */
378    result = pthread_mutex_lock (&r_chain->mutex);
379    if (result != 0)
380      return NULL;
381   
382    chain = &r_chain->perfd;
383
384    /* If the locked chain is not empty, take the first
385       request extract it, unlock the chain and process
386       the request, in this way the user can supply more
387       requests to this fd chain */
388    if (!rtems_chain_is_empty (chain)) {
389
390      node = chain->first;
391      req = (rtems_aio_request *) node;
392     
393      /* See _POSIX_PRIORITIZE_IO and _POSIX_PRIORITY_SCHEDULING
394         discussion in rtems_aio_enqueue () */
395      pthread_getschedparam (pthread_self(), &policy, &param);
396      param.sched_priority = req->priority;
397      pthread_setschedparam (pthread_self(), req->policy, &param);
398
399      rtems_chain_extract (node);
400
401      pthread_mutex_unlock (&r_chain->mutex);
402
403      switch (req->aiocbp->aio_lio_opcode) {
404      case LIO_READ:
405        result = pread (req->aiocbp->aio_fildes,
406                        (void *) req->aiocbp->aio_buf,
407                        req->aiocbp->aio_nbytes, req->aiocbp->aio_offset);
408        break;
409
410      case LIO_WRITE:
411        result = pwrite (req->aiocbp->aio_fildes,
412                         (void *) req->aiocbp->aio_buf,
413                         req->aiocbp->aio_nbytes, req->aiocbp->aio_offset);
414        break;
415       
416      case LIO_SYNC:
417        result = fsync (req->aiocbp->aio_fildes);
418        break;
419
420      default:
421        result = -1;
422      }
423      if (result == -1) {
424        req->aiocbp->return_value = -1;
425        req->aiocbp->error_code = errno;
426      } else {
427        req->aiocbp->return_value = result;
428        req->aiocbp->error_code = 0;
429      }
430
431      // notification needed for lio
432
433    } else {
434      /* If the fd chain is empty we unlock the fd chain
435         and we lock the queue chain, this will ensure that
436         we have at most one request comming to our fd chain
437         when we check.
438         
439         If there was no request added sleep for 3 seconds and
440         wait for a signal on chain, this will unlock the queue.
441         The fd chain is already unlocked */
442     
443      struct timespec timeout;
444     
445      pthread_mutex_unlock (&r_chain->mutex);
446      pthread_mutex_lock (&aio_request_queue.mutex);
447      if (rtems_chain_is_empty (chain))
448        {
449          clock_gettime (CLOCK_REALTIME, &timeout);
450          timeout.tv_sec += 3;
451          timeout.tv_nsec = 0;
452          result = pthread_cond_timedwait (&r_chain->cond,
453                                           &aio_request_queue.mutex, &timeout);
454         
455          /* If no requests were added to the chain we delete the fd chain from
456             the queue and start working with idle fd chains */
457          if (result == ETIMEDOUT) {
458            rtems_chain_extract (&r_chain->next_fd);
459            pthread_mutex_destroy (&r_chain->mutex);
460            pthread_cond_destroy (&r_chain->cond);
461            free (r_chain);
462           
463            /* If the idle chain is empty sleep for 3 seconds and wait for a
464               signal. The thread now becomes idle. */
465            if (rtems_chain_is_empty (&aio_request_queue.idle_req)) {
466              ++aio_request_queue.idle_threads;
467              clock_gettime (CLOCK_REALTIME, &timeout);
468              timeout.tv_sec += 3;
469              timeout.tv_nsec = 0;
470              result = pthread_cond_timedwait (&aio_request_queue.new_req,
471                                               &aio_request_queue.mutex,
472                                               &timeout);
473             
474              /* If no new fd chain was added in the idle requests
475                 then this thread is finished */
476              if (result == ETIMEDOUT) {
477                pthread_mutex_unlock (&aio_request_queue.mutex);
478                return NULL;
479              }
480             
481              /* Otherwise move this chain to the working chain and
482                 start the loop all over again */
483              --aio_request_queue.idle_threads;
484              node = aio_request_queue.idle_req.first;
485              rtems_chain_extract (node);
486              r_chain = rtems_aio_search_fd (&aio_request_queue.work_req,
487                                             ((rtems_aio_request_chain *)node)->fildes,
488                                             1);
489              r_chain->new_fd = 0;
490              pthread_mutex_init (&r_chain->mutex, NULL);
491              pthread_cond_init (&r_chain->cond, NULL);
492             
493              r_chain->perfd = ((rtems_aio_request_chain *)node)->perfd;
494            }
495            else
496              /* If there was a request added in the initial fd chain then release
497                 the mutex and process it */
498              pthread_mutex_unlock (&aio_request_queue.mutex);
499          }
500        }
501    }
502  }
503 
504
505  AIO_printf ("Thread finished\n");
506  return NULL;
507}
Note: See TracBrowser for help on using the repository browser.