source: rtems-libbsd/freebsd/sys/kern/subr_taskqueue.c @ 4a8f953

55-freebsd-126-freebsd-12
Last change on this file since 4a8f953 was 4a8f953, checked in by Kevin Kirspel <kevin-kirspel@…>, on 05/04/17 at 12:27:58

Updating FREEBSD for tty support

  • Property mode set to 100644
File size: 21.1 KB
Line 
1#include <machine/rtems-bsd-kernel-space.h>
2
3/*-
4 * Copyright (c) 2000 Doug Rabson
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions
9 * are met:
10 * 1. Redistributions of source code must retain the above copyright
11 *    notice, this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright
13 *    notice, this list of conditions and the following disclaimer in the
14 *    documentation and/or other materials provided with the distribution.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
17 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
20 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
21 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
22 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
23 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
24 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
25 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
26 * SUCH DAMAGE.
27 */
28
29#include <sys/cdefs.h>
30__FBSDID("$FreeBSD$");
31
32#include <rtems/bsd/sys/param.h>
33#include <sys/systm.h>
34#include <sys/bus.h>
35#include <rtems/bsd/sys/cpuset.h>
36#include <sys/interrupt.h>
37#include <sys/kernel.h>
38#include <sys/kthread.h>
39#include <sys/libkern.h>
40#include <sys/limits.h>
41#include <rtems/bsd/sys/lock.h>
42#include <sys/malloc.h>
43#include <sys/mutex.h>
44#include <sys/proc.h>
45#include <sys/sched.h>
46#include <sys/smp.h>
47#include <sys/taskqueue.h>
48#include <rtems/bsd/sys/unistd.h>
49#include <machine/stdarg.h>
50
51static MALLOC_DEFINE(M_TASKQUEUE, "taskqueue", "Task Queues");
52static void     *taskqueue_giant_ih;
53static void     *taskqueue_ih;
54static void      taskqueue_fast_enqueue(void *);
55static void      taskqueue_swi_enqueue(void *);
56static void      taskqueue_swi_giant_enqueue(void *);
57
58struct taskqueue_busy {
59        struct task     *tb_running;
60        TAILQ_ENTRY(taskqueue_busy) tb_link;
61};
62
/*
 * Sentinel stored in taskqueue_busy::tb_running to mark a tq_active entry
 * that belongs to a drain waiter rather than to a running task.  The value
 * is only ever compared, never dereferenced, in this file.
 */
struct task * const TB_DRAIN_WAITER = (struct task *)0x1;
64
65struct taskqueue {
66        STAILQ_HEAD(, task)     tq_queue;
67        taskqueue_enqueue_fn    tq_enqueue;
68        void                    *tq_context;
69        char                    *tq_name;
70        TAILQ_HEAD(, taskqueue_busy) tq_active;
71        struct mtx              tq_mutex;
72        struct thread           **tq_threads;
73        int                     tq_tcount;
74#ifndef __rtems__
75        int                     tq_spin;
76#endif /* __rtems__ */
77        int                     tq_flags;
78        int                     tq_callouts;
79        taskqueue_callback_fn   tq_callbacks[TASKQUEUE_NUM_CALLBACKS];
80        void                    *tq_cb_contexts[TASKQUEUE_NUM_CALLBACKS];
81};
82
/*
 * taskqueue::tq_flags bits.
 *
 * ACTIVE            set at creation, cleared by taskqueue_free(); the
 *                   thread loop keeps running while it is set.
 * BLOCKED           set by taskqueue_block(); suppresses the tq_enqueue hook.
 * UNLOCKED_ENQUEUE  the tq_enqueue hook is called after tq_mutex is dropped.
 */
#define	TQ_FLAGS_ACTIVE			0x01
#define	TQ_FLAGS_BLOCKED		0x02
#define	TQ_FLAGS_UNLOCKED_ENQUEUE	0x04

/*
 * timeout_task::f bits.
 *
 * CALLOUT_ARMED      the callout is scheduled and counted in tq_callouts.
 * DRAIN_IN_PROGRESS  taskqueue_drain_timeout() is running; re-arming is
 *                    refused by taskqueue_enqueue_timeout().
 */
#define	DT_CALLOUT_ARMED		0x01
#define	DT_DRAIN_IN_PROGRESS		0x02
89
/*
 * Acquire the queue mutex.  On RTEMS mtx_lock() is always used; elsewhere
 * a queue whose tq_spin is set uses the spin-mutex primitive instead.
 */
#ifdef __rtems__
#define	TQ_LOCK(tq)							\
	do {								\
		mtx_lock(&(tq)->tq_mutex);				\
	} while (0)
#else /* __rtems__ */
#define	TQ_LOCK(tq)							\
	do {								\
		if ((tq)->tq_spin)					\
			mtx_lock_spin(&(tq)->tq_mutex);			\
		else							\
			mtx_lock(&(tq)->tq_mutex);			\
	} while (0)
#endif /* __rtems__ */

/* Assert that the calling thread owns the queue mutex. */
#define	TQ_ASSERT_LOCKED(tq)	mtx_assert(&(tq)->tq_mutex, MA_OWNED)
105
/*
 * Release the queue mutex; the counterpart of TQ_LOCK() with the same
 * spin/non-spin selection.
 */
#ifdef __rtems__
#define	TQ_UNLOCK(tq)							\
	do {								\
		mtx_unlock(&(tq)->tq_mutex);				\
	} while (0)
#else /* __rtems__ */
#define	TQ_UNLOCK(tq)							\
	do {								\
		if ((tq)->tq_spin)					\
			mtx_unlock_spin(&(tq)->tq_mutex);		\
		else							\
			mtx_unlock(&(tq)->tq_mutex);			\
	} while (0)
#endif /* __rtems__ */

/* Assert that the calling thread does not own the queue mutex. */
#define	TQ_ASSERT_UNLOCKED(tq)	mtx_assert(&(tq)->tq_mutex, MA_NOTOWNED)
121
122void
123_timeout_task_init(struct taskqueue *queue, struct timeout_task *timeout_task,
124    int priority, task_fn_t func, void *context)
125{
126
127        TASK_INIT(&timeout_task->t, priority, func, context);
128        callout_init_mtx(&timeout_task->c, &queue->tq_mutex,
129            CALLOUT_RETURNUNLOCKED);
130        timeout_task->q = queue;
131        timeout_task->f = 0;
132}
133
134static __inline int
135TQ_SLEEP(struct taskqueue *tq, void *p, struct mtx *m, int pri, const char *wm,
136    int t)
137{
138#ifndef __rtems__
139        if (tq->tq_spin)
140                return (msleep_spin(p, m, wm, t));
141#endif /* __rtems__ */
142        return (msleep(p, m, pri, wm, t));
143}
144
145static struct taskqueue *
146_taskqueue_create(const char *name, int mflags,
147                 taskqueue_enqueue_fn enqueue, void *context,
148                 int mtxflags, const char *mtxname __unused)
149{
150        struct taskqueue *queue;
151        char *tq_name;
152
153        tq_name = malloc(TASKQUEUE_NAMELEN, M_TASKQUEUE, mflags | M_ZERO);
154        if (tq_name == NULL)
155                return (NULL);
156
157        queue = malloc(sizeof(struct taskqueue), M_TASKQUEUE, mflags | M_ZERO);
158        if (queue == NULL) {
159                free(tq_name, M_TASKQUEUE);
160                return (NULL);
161        }
162
163        snprintf(tq_name, TASKQUEUE_NAMELEN, "%s", (name) ? name : "taskqueue");
164
165        STAILQ_INIT(&queue->tq_queue);
166        TAILQ_INIT(&queue->tq_active);
167        queue->tq_enqueue = enqueue;
168        queue->tq_context = context;
169        queue->tq_name = tq_name;
170#ifndef __rtems__
171        queue->tq_spin = (mtxflags & MTX_SPIN) != 0;
172#else /* __rtems__ */
173        /*
174         * FIXME: Here is a potential performance optimization.  Maybe also an
175         * issue for correctness.
176         */
177#endif /* __rtems__ */
178        queue->tq_flags |= TQ_FLAGS_ACTIVE;
179        if (enqueue == taskqueue_fast_enqueue ||
180            enqueue == taskqueue_swi_enqueue ||
181            enqueue == taskqueue_swi_giant_enqueue ||
182            enqueue == taskqueue_thread_enqueue)
183                queue->tq_flags |= TQ_FLAGS_UNLOCKED_ENQUEUE;
184        mtx_init(&queue->tq_mutex, tq_name, NULL, mtxflags);
185
186        return (queue);
187}
188
189struct taskqueue *
190taskqueue_create(const char *name, int mflags,
191                 taskqueue_enqueue_fn enqueue, void *context)
192{
193
194        return _taskqueue_create(name, mflags, enqueue, context,
195                        MTX_DEF, name);
196}
197
198void
199taskqueue_set_callback(struct taskqueue *queue,
200    enum taskqueue_callback_type cb_type, taskqueue_callback_fn callback,
201    void *context)
202{
203
204        KASSERT(((cb_type >= TASKQUEUE_CALLBACK_TYPE_MIN) &&
205            (cb_type <= TASKQUEUE_CALLBACK_TYPE_MAX)),
206            ("Callback type %d not valid, must be %d-%d", cb_type,
207            TASKQUEUE_CALLBACK_TYPE_MIN, TASKQUEUE_CALLBACK_TYPE_MAX));
208        KASSERT((queue->tq_callbacks[cb_type] == NULL),
209            ("Re-initialization of taskqueue callback?"));
210
211        queue->tq_callbacks[cb_type] = callback;
212        queue->tq_cb_contexts[cb_type] = context;
213}
214
215/*
216 * Signal a taskqueue thread to terminate.
217 */
218static void
219taskqueue_terminate(struct thread **pp, struct taskqueue *tq)
220{
221
222        while (tq->tq_tcount > 0 || tq->tq_callouts > 0) {
223                wakeup(tq);
224                TQ_SLEEP(tq, pp, &tq->tq_mutex, PWAIT, "taskqueue_destroy", 0);
225        }
226}
227
228void
229taskqueue_free(struct taskqueue *queue)
230{
231
232        TQ_LOCK(queue);
233        queue->tq_flags &= ~TQ_FLAGS_ACTIVE;
234        taskqueue_terminate(queue->tq_threads, queue);
235        KASSERT(TAILQ_EMPTY(&queue->tq_active), ("Tasks still running?"));
236        KASSERT(queue->tq_callouts == 0, ("Armed timeout tasks"));
237        mtx_destroy(&queue->tq_mutex);
238        free(queue->tq_threads, M_TASKQUEUE);
239        free(queue->tq_name, M_TASKQUEUE);
240        free(queue, M_TASKQUEUE);
241}
242
243static int
244taskqueue_enqueue_locked(struct taskqueue *queue, struct task *task)
245{
246        struct task *ins;
247        struct task *prev;
248
249        KASSERT(task->ta_func != NULL, ("enqueueing task with NULL func"));
250        /*
251         * Count multiple enqueues.
252         */
253        if (task->ta_pending) {
254                if (task->ta_pending < USHRT_MAX)
255                        task->ta_pending++;
256                TQ_UNLOCK(queue);
257                return (0);
258        }
259
260        /*
261         * Optimise the case when all tasks have the same priority.
262         */
263        prev = STAILQ_LAST(&queue->tq_queue, task, ta_link);
264        if (!prev || prev->ta_priority >= task->ta_priority) {
265                STAILQ_INSERT_TAIL(&queue->tq_queue, task, ta_link);
266        } else {
267                prev = NULL;
268                for (ins = STAILQ_FIRST(&queue->tq_queue); ins;
269                     prev = ins, ins = STAILQ_NEXT(ins, ta_link))
270                        if (ins->ta_priority < task->ta_priority)
271                                break;
272
273                if (prev)
274                        STAILQ_INSERT_AFTER(&queue->tq_queue, prev, task, ta_link);
275                else
276                        STAILQ_INSERT_HEAD(&queue->tq_queue, task, ta_link);
277        }
278
279        task->ta_pending = 1;
280        if ((queue->tq_flags & TQ_FLAGS_UNLOCKED_ENQUEUE) != 0)
281                TQ_UNLOCK(queue);
282        if ((queue->tq_flags & TQ_FLAGS_BLOCKED) == 0)
283                queue->tq_enqueue(queue->tq_context);
284        if ((queue->tq_flags & TQ_FLAGS_UNLOCKED_ENQUEUE) == 0)
285                TQ_UNLOCK(queue);
286
287        /* Return with lock released. */
288        return (0);
289}
290
/*
 * Queue `task' on `queue'.  Takes the queue mutex and lets
 * taskqueue_enqueue_locked() release it.  Returns that function's result.
 */
int
taskqueue_enqueue(struct taskqueue *queue, struct task *task)
{

	TQ_LOCK(queue);
	/* The callee drops the lock on every path. */
	return (taskqueue_enqueue_locked(queue, task));
}
302
303static void
304taskqueue_timeout_func(void *arg)
305{
306        struct taskqueue *queue;
307        struct timeout_task *timeout_task;
308
309        timeout_task = arg;
310        queue = timeout_task->q;
311        KASSERT((timeout_task->f & DT_CALLOUT_ARMED) != 0, ("Stray timeout"));
312        timeout_task->f &= ~DT_CALLOUT_ARMED;
313        queue->tq_callouts--;
314        taskqueue_enqueue_locked(timeout_task->q, &timeout_task->t);
315        /* The lock is released inside. */
316}
317
318int
319taskqueue_enqueue_timeout(struct taskqueue *queue,
320    struct timeout_task *timeout_task, int ticks)
321{
322        int res;
323
324        TQ_LOCK(queue);
325        KASSERT(timeout_task->q == NULL || timeout_task->q == queue,
326            ("Migrated queue"));
327#ifndef __rtems__
328        KASSERT(!queue->tq_spin, ("Timeout for spin-queue"));
329#endif /* __rtems__ */
330        timeout_task->q = queue;
331        res = timeout_task->t.ta_pending;
332        if (timeout_task->f & DT_DRAIN_IN_PROGRESS) {
333                /* Do nothing */
334                TQ_UNLOCK(queue);
335                res = -1;
336        } else if (ticks == 0) {
337                taskqueue_enqueue_locked(queue, &timeout_task->t);
338                /* The lock is released inside. */
339        } else {
340                if ((timeout_task->f & DT_CALLOUT_ARMED) != 0) {
341                        res++;
342                } else {
343                        queue->tq_callouts++;
344                        timeout_task->f |= DT_CALLOUT_ARMED;
345                        if (ticks < 0)
346                                ticks = -ticks; /* Ignore overflow. */
347                }
348                if (ticks > 0) {
349                        callout_reset(&timeout_task->c, ticks,
350                            taskqueue_timeout_func, timeout_task);
351                }
352                TQ_UNLOCK(queue);
353        }
354        return (res);
355}
356
/*
 * Task function that does nothing; used as the body of the barrier task
 * in taskqueue_drain_tq_queue().  Both arguments are ignored.
 */
static void
taskqueue_task_nop_fn(void *context, int pending)
{
	/* Intentionally empty. */
}
361
362/*
363 * Block until all currently queued tasks in this taskqueue
364 * have begun execution.  Tasks queued during execution of
365 * this function are ignored.
366 */
367static void
368taskqueue_drain_tq_queue(struct taskqueue *queue)
369{
370        struct task t_barrier;
371
372        if (STAILQ_EMPTY(&queue->tq_queue))
373                return;
374
375        /*
376         * Enqueue our barrier after all current tasks, but with
377         * the highest priority so that newly queued tasks cannot
378         * pass it.  Because of the high priority, we can not use
379         * taskqueue_enqueue_locked directly (which drops the lock
380         * anyway) so just insert it at tail while we have the
381         * queue lock.
382         */
383        TASK_INIT(&t_barrier, USHRT_MAX, taskqueue_task_nop_fn, &t_barrier);
384        STAILQ_INSERT_TAIL(&queue->tq_queue, &t_barrier, ta_link);
385        t_barrier.ta_pending = 1;
386
387        /*
388         * Once the barrier has executed, all previously queued tasks
389         * have completed or are currently executing.
390         */
391        while (t_barrier.ta_pending != 0)
392                TQ_SLEEP(queue, &t_barrier, &queue->tq_mutex, PWAIT, "-", 0);
393}
394
395/*
396 * Block until all currently executing tasks for this taskqueue
397 * complete.  Tasks that begin execution during the execution
398 * of this function are ignored.
399 */
400static void
401taskqueue_drain_tq_active(struct taskqueue *queue)
402{
403        struct taskqueue_busy tb_marker, *tb_first;
404
405        if (TAILQ_EMPTY(&queue->tq_active))
406                return;
407
408        /* Block taskq_terminate().*/
409        queue->tq_callouts++;
410
411        /*
412         * Wait for all currently executing taskqueue threads
413         * to go idle.
414         */
415        tb_marker.tb_running = TB_DRAIN_WAITER;
416        TAILQ_INSERT_TAIL(&queue->tq_active, &tb_marker, tb_link);
417        while (TAILQ_FIRST(&queue->tq_active) != &tb_marker)
418                TQ_SLEEP(queue, &tb_marker, &queue->tq_mutex, PWAIT, "-", 0);
419        TAILQ_REMOVE(&queue->tq_active, &tb_marker, tb_link);
420
421        /*
422         * Wakeup any other drain waiter that happened to queue up
423         * without any intervening active thread.
424         */
425        tb_first = TAILQ_FIRST(&queue->tq_active);
426        if (tb_first != NULL && tb_first->tb_running == TB_DRAIN_WAITER)
427                wakeup(tb_first);
428
429        /* Release taskqueue_terminate(). */
430        queue->tq_callouts--;
431        if ((queue->tq_flags & TQ_FLAGS_ACTIVE) == 0)
432                wakeup_one(queue->tq_threads);
433}
434
435void
436taskqueue_block(struct taskqueue *queue)
437{
438
439        TQ_LOCK(queue);
440        queue->tq_flags |= TQ_FLAGS_BLOCKED;
441        TQ_UNLOCK(queue);
442}
443
444void
445taskqueue_unblock(struct taskqueue *queue)
446{
447
448        TQ_LOCK(queue);
449        queue->tq_flags &= ~TQ_FLAGS_BLOCKED;
450        if (!STAILQ_EMPTY(&queue->tq_queue))
451                queue->tq_enqueue(queue->tq_context);
452        TQ_UNLOCK(queue);
453}
454
455static void
456taskqueue_run_locked(struct taskqueue *queue)
457{
458        struct taskqueue_busy tb;
459        struct taskqueue_busy *tb_first;
460        struct task *task;
461        int pending;
462
463        KASSERT(queue != NULL, ("tq is NULL"));
464        TQ_ASSERT_LOCKED(queue);
465        tb.tb_running = NULL;
466
467        while (STAILQ_FIRST(&queue->tq_queue)) {
468                TAILQ_INSERT_TAIL(&queue->tq_active, &tb, tb_link);
469
470                /*
471                 * Carefully remove the first task from the queue and
472                 * zero its pending count.
473                 */
474                task = STAILQ_FIRST(&queue->tq_queue);
475                KASSERT(task != NULL, ("task is NULL"));
476                STAILQ_REMOVE_HEAD(&queue->tq_queue, ta_link);
477                pending = task->ta_pending;
478                task->ta_pending = 0;
479                tb.tb_running = task;
480                TQ_UNLOCK(queue);
481
482                KASSERT(task->ta_func != NULL, ("task->ta_func is NULL"));
483                task->ta_func(task->ta_context, pending);
484
485                TQ_LOCK(queue);
486                tb.tb_running = NULL;
487                wakeup(task);
488
489                TAILQ_REMOVE(&queue->tq_active, &tb, tb_link);
490                tb_first = TAILQ_FIRST(&queue->tq_active);
491                if (tb_first != NULL &&
492                    tb_first->tb_running == TB_DRAIN_WAITER)
493                        wakeup(tb_first);
494        }
495}
496
/*
 * Run all tasks currently queued on `queue' in the calling context.
 * Takes and releases the queue mutex around taskqueue_run_locked().
 */
void
taskqueue_run(struct taskqueue *queue)
{

	TQ_LOCK(queue);
	taskqueue_run_locked(queue);
	TQ_UNLOCK(queue);
}
505
506static int
507task_is_running(struct taskqueue *queue, struct task *task)
508{
509        struct taskqueue_busy *tb;
510
511        TQ_ASSERT_LOCKED(queue);
512        TAILQ_FOREACH(tb, &queue->tq_active, tb_link) {
513                if (tb->tb_running == task)
514                        return (1);
515        }
516        return (0);
517}
518
519/*
520 * Only use this function in single threaded contexts. It returns
521 * non-zero if the given task is either pending or running. Else the
522 * task is idle and can be queued again or freed.
523 */
524int
525taskqueue_poll_is_busy(struct taskqueue *queue, struct task *task)
526{
527        int retval;
528
529        TQ_LOCK(queue);
530        retval = task->ta_pending > 0 || task_is_running(queue, task);
531        TQ_UNLOCK(queue);
532
533        return (retval);
534}
535
536static int
537taskqueue_cancel_locked(struct taskqueue *queue, struct task *task,
538    u_int *pendp)
539{
540
541        if (task->ta_pending > 0)
542                STAILQ_REMOVE(&queue->tq_queue, task, task, ta_link);
543        if (pendp != NULL)
544                *pendp = task->ta_pending;
545        task->ta_pending = 0;
546        return (task_is_running(queue, task) ? EBUSY : 0);
547}
548
549int
550taskqueue_cancel(struct taskqueue *queue, struct task *task, u_int *pendp)
551{
552        int error;
553
554        TQ_LOCK(queue);
555        error = taskqueue_cancel_locked(queue, task, pendp);
556        TQ_UNLOCK(queue);
557
558        return (error);
559}
560
561int
562taskqueue_cancel_timeout(struct taskqueue *queue,
563    struct timeout_task *timeout_task, u_int *pendp)
564{
565        u_int pending, pending1;
566        int error;
567
568        TQ_LOCK(queue);
569        pending = !!(callout_stop(&timeout_task->c) > 0);
570        error = taskqueue_cancel_locked(queue, &timeout_task->t, &pending1);
571        if ((timeout_task->f & DT_CALLOUT_ARMED) != 0) {
572                timeout_task->f &= ~DT_CALLOUT_ARMED;
573                queue->tq_callouts--;
574        }
575        TQ_UNLOCK(queue);
576
577        if (pendp != NULL)
578                *pendp = pending + pending1;
579        return (error);
580}
581
582void
583taskqueue_drain(struct taskqueue *queue, struct task *task)
584{
585
586#ifndef __rtems__
587        if (!queue->tq_spin)
588                WITNESS_WARN(WARN_GIANTOK | WARN_SLEEPOK, NULL, __func__);
589#endif /* __rtems__ */
590
591        TQ_LOCK(queue);
592        while (task->ta_pending != 0 || task_is_running(queue, task))
593                TQ_SLEEP(queue, task, &queue->tq_mutex, PWAIT, "-", 0);
594        TQ_UNLOCK(queue);
595}
596
597void
598taskqueue_drain_all(struct taskqueue *queue)
599{
600
601#ifndef __rtems__
602        if (!queue->tq_spin)
603                WITNESS_WARN(WARN_GIANTOK | WARN_SLEEPOK, NULL, __func__);
604#endif /* __rtems__ */
605
606        TQ_LOCK(queue);
607        taskqueue_drain_tq_queue(queue);
608        taskqueue_drain_tq_active(queue);
609        TQ_UNLOCK(queue);
610}
611
612void
613taskqueue_drain_timeout(struct taskqueue *queue,
614    struct timeout_task *timeout_task)
615{
616
617        /*
618         * Set flag to prevent timer from re-starting during drain:
619         */
620        TQ_LOCK(queue);
621        KASSERT((timeout_task->f & DT_DRAIN_IN_PROGRESS) == 0,
622            ("Drain already in progress"));
623        timeout_task->f |= DT_DRAIN_IN_PROGRESS;
624        TQ_UNLOCK(queue);
625
626        callout_drain(&timeout_task->c);
627        taskqueue_drain(queue, &timeout_task->t);
628
629        /*
630         * Clear flag to allow timer to re-start:
631         */
632        TQ_LOCK(queue);
633        timeout_task->f &= ~DT_DRAIN_IN_PROGRESS;
634        TQ_UNLOCK(queue);
635}
636
637static void
638taskqueue_swi_enqueue(void *context)
639{
640        swi_sched(taskqueue_ih, 0);
641}
642
643static void
644taskqueue_swi_run(void *dummy)
645{
646        taskqueue_run(taskqueue_swi);
647}
648
649static void
650taskqueue_swi_giant_enqueue(void *context)
651{
652        swi_sched(taskqueue_giant_ih, 0);
653}
654
655static void
656taskqueue_swi_giant_run(void *dummy)
657{
658        taskqueue_run(taskqueue_swi_giant);
659}
660
661static int
662_taskqueue_start_threads(struct taskqueue **tqp, int count, int pri,
663    cpuset_t *mask, const char *name, va_list ap)
664{
665        char ktname[MAXCOMLEN + 1];
666        struct thread *td;
667        struct taskqueue *tq;
668        int i, error;
669
670        if (count <= 0)
671                return (EINVAL);
672
673        vsnprintf(ktname, sizeof(ktname), name, ap);
674        tq = *tqp;
675
676        tq->tq_threads = malloc(sizeof(struct thread *) * count, M_TASKQUEUE,
677            M_NOWAIT | M_ZERO);
678        if (tq->tq_threads == NULL) {
679                printf("%s: no memory for %s threads\n", __func__, ktname);
680                return (ENOMEM);
681        }
682
683        for (i = 0; i < count; i++) {
684                if (count == 1)
685                        error = kthread_add(taskqueue_thread_loop, tqp, NULL,
686                            &tq->tq_threads[i], RFSTOPPED, 0, "%s", ktname);
687                else
688                        error = kthread_add(taskqueue_thread_loop, tqp, NULL,
689                            &tq->tq_threads[i], RFSTOPPED, 0,
690                            "%s_%d", ktname, i);
691                if (error) {
692                        /* should be ok to continue, taskqueue_free will dtrt */
693                        printf("%s: kthread_add(%s): error %d", __func__,
694                            ktname, error);
695                        tq->tq_threads[i] = NULL;               /* paranoid */
696                } else
697                        tq->tq_tcount++;
698        }
699        if (tq->tq_tcount == 0) {
700                free(tq->tq_threads, M_TASKQUEUE);
701                tq->tq_threads = NULL;
702                return (ENOMEM);
703        }
704#ifndef __rtems__
705        for (i = 0; i < count; i++) {
706                if (tq->tq_threads[i] == NULL)
707                        continue;
708                td = tq->tq_threads[i];
709                if (mask) {
710                        error = cpuset_setthread(td->td_tid, mask);
711                        /*
712                         * Failing to pin is rarely an actual fatal error;
713                         * it'll just affect performance.
714                         */
715                        if (error)
716                                printf("%s: curthread=%llu: can't pin; "
717                                    "error=%d\n",
718                                    __func__,
719                                    (unsigned long long) td->td_tid,
720                                    error);
721                }
722                thread_lock(td);
723                sched_prio(td, pri);
724                sched_add(td, SRQ_BORING);
725                thread_unlock(td);
726        }
727#else /* __rtems__ */
728        (void) td;
729#endif /* __rtems__ */
730
731        return (0);
732}
733
734int
735taskqueue_start_threads(struct taskqueue **tqp, int count, int pri,
736    const char *name, ...)
737{
738        va_list ap;
739        int error;
740
741        va_start(ap, name);
742        error = _taskqueue_start_threads(tqp, count, pri, NULL, name, ap);
743        va_end(ap);
744        return (error);
745}
746
747int
748taskqueue_start_threads_cpuset(struct taskqueue **tqp, int count, int pri,
749    cpuset_t *mask, const char *name, ...)
750{
751        va_list ap;
752        int error;
753
754        va_start(ap, name);
755        error = _taskqueue_start_threads(tqp, count, pri, mask, name, ap);
756        va_end(ap);
757        return (error);
758}
759
760static inline void
761taskqueue_run_callback(struct taskqueue *tq,
762    enum taskqueue_callback_type cb_type)
763{
764        taskqueue_callback_fn tq_callback;
765
766        TQ_ASSERT_UNLOCKED(tq);
767        tq_callback = tq->tq_callbacks[cb_type];
768        if (tq_callback != NULL)
769                tq_callback(tq->tq_cb_contexts[cb_type]);
770}
771
772void
773taskqueue_thread_loop(void *arg)
774{
775        struct taskqueue **tqp, *tq;
776
777        tqp = arg;
778        tq = *tqp;
779        taskqueue_run_callback(tq, TASKQUEUE_CALLBACK_TYPE_INIT);
780        TQ_LOCK(tq);
781        while ((tq->tq_flags & TQ_FLAGS_ACTIVE) != 0) {
782                /* XXX ? */
783                taskqueue_run_locked(tq);
784                /*
785                 * Because taskqueue_run() can drop tq_mutex, we need to
786                 * check if the TQ_FLAGS_ACTIVE flag wasn't removed in the
787                 * meantime, which means we missed a wakeup.
788                 */
789                if ((tq->tq_flags & TQ_FLAGS_ACTIVE) == 0)
790                        break;
791                TQ_SLEEP(tq, tq, &tq->tq_mutex, 0, "-", 0);
792        }
793        taskqueue_run_locked(tq);
794        /*
795         * This thread is on its way out, so just drop the lock temporarily
796         * in order to call the shutdown callback.  This allows the callback
797         * to look at the taskqueue, even just before it dies.
798         */
799        TQ_UNLOCK(tq);
800        taskqueue_run_callback(tq, TASKQUEUE_CALLBACK_TYPE_SHUTDOWN);
801        TQ_LOCK(tq);
802
803        /* rendezvous with thread that asked us to terminate */
804        tq->tq_tcount--;
805        wakeup_one(tq->tq_threads);
806        TQ_UNLOCK(tq);
807        kthread_exit();
808}
809
/*
 * Enqueue hook for thread-backed taskqueues.  `context' points at the queue
 * pointer; one thread sleeping on the queue (see taskqueue_thread_loop())
 * is woken.
 */
void
taskqueue_thread_enqueue(void *context)
{
	struct taskqueue **tqp = context;

	wakeup_one(*tqp);
}
819
820TASKQUEUE_DEFINE(swi, taskqueue_swi_enqueue, NULL,
821                 swi_add(NULL, "task queue", taskqueue_swi_run, NULL, SWI_TQ,
822                     INTR_MPSAFE, &taskqueue_ih));
823
824TASKQUEUE_DEFINE(swi_giant, taskqueue_swi_giant_enqueue, NULL,
825                 swi_add(NULL, "Giant taskq", taskqueue_swi_giant_run,
826                     NULL, SWI_TQ_GIANT, 0, &taskqueue_giant_ih));
827
828TASKQUEUE_DEFINE_THREAD(thread);
829
830struct taskqueue *
831taskqueue_create_fast(const char *name, int mflags,
832                 taskqueue_enqueue_fn enqueue, void *context)
833{
834        return _taskqueue_create(name, mflags, enqueue, context,
835                        MTX_SPIN, "fast_taskqueue");
836}
837
/*
 * Software-interrupt handle of the "fast" taskqueue; filled in through
 * swi_add() below and used by taskqueue_fast_enqueue().
 */
static void	*taskqueue_fast_ih;
839
840static void
841taskqueue_fast_enqueue(void *context)
842{
843        swi_sched(taskqueue_fast_ih, 0);
844}
845
846static void
847taskqueue_fast_run(void *dummy)
848{
849        taskqueue_run(taskqueue_fast);
850}
851
852TASKQUEUE_FAST_DEFINE(fast, taskqueue_fast_enqueue, NULL,
853        swi_add(NULL, "fast taskq", taskqueue_fast_run, NULL,
854        SWI_TQ_FAST, INTR_MPSAFE, &taskqueue_fast_ih));
855
856int
857taskqueue_member(struct taskqueue *queue, struct thread *td)
858{
859        int i, j, ret = 0;
860
861        for (i = 0, j = 0; ; i++) {
862                if (queue->tq_threads[i] == NULL)
863                        continue;
864                if (queue->tq_threads[i] == td) {
865                        ret = 1;
866                        break;
867                }
868                if (++j >= queue->tq_tcount)
869                        break;
870        }
871        return (ret);
872}
Note: See TracBrowser for help on using the repository browser.