source: rtems-libbsd/freebsd/sys/rpc/svc.c @ c6dbc96

6-freebsd-12
Last change on this file since c6dbc96 was c6dbc96, checked in by Chris Johns <chrisj@…>, on 07/29/21 at 03:24:49

kern/sys: Add the kernel RPC and XDR support

Updates #4475

  • Property mode set to 100644
File size: 35.5 KB
Line 
1#include <machine/rtems-bsd-kernel-space.h>
2
3/*      $NetBSD: svc.c,v 1.21 2000/07/06 03:10:35 christos Exp $        */
4
5/*-
6 * SPDX-License-Identifier: BSD-3-Clause
7 *
8 * Copyright (c) 2009, Sun Microsystems, Inc.
9 * All rights reserved.
10 *
11 * Redistribution and use in source and binary forms, with or without
12 * modification, are permitted provided that the following conditions are met:
13 * - Redistributions of source code must retain the above copyright notice,
14 *   this list of conditions and the following disclaimer.
15 * - Redistributions in binary form must reproduce the above copyright notice,
16 *   this list of conditions and the following disclaimer in the documentation
17 *   and/or other materials provided with the distribution.
18 * - Neither the name of Sun Microsystems, Inc. nor the names of its
19 *   contributors may be used to endorse or promote products derived
20 *   from this software without specific prior written permission.
21 *
22 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
23 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
24 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
25 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
26 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
27 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
28 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
29 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
30 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
31 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32 * POSSIBILITY OF SUCH DAMAGE.
33 */
34
35#if defined(LIBC_SCCS) && !defined(lint)
36static char *sccsid2 = "@(#)svc.c 1.44 88/02/08 Copyr 1984 Sun Micro";
37static char *sccsid = "@(#)svc.c        2.4 88/08/11 4.0 RPCSRC";
38#endif
39#include <sys/cdefs.h>
40__FBSDID("$FreeBSD$");
41
42/*
43 * svc.c, Server-side remote procedure call interface.
44 *
45 * There are two sets of procedures here.  The xprt routines are
46 * for handling transport handles.  The svc routines handle the
47 * list of service routines.
48 *
49 * Copyright (C) 1984, Sun Microsystems, Inc.
50 */
51
52#include <sys/param.h>
53#include <sys/lock.h>
54#include <sys/kernel.h>
55#include <sys/kthread.h>
56#include <sys/malloc.h>
57#include <sys/mbuf.h>
58#include <sys/mutex.h>
59#include <sys/proc.h>
60#include <sys/queue.h>
61#include <sys/socketvar.h>
62#include <sys/systm.h>
63#include <sys/smp.h>
64#include <sys/sx.h>
65#include <sys/ucred.h>
66
67#include <rpc/rpc.h>
68#include <rpc/rpcb_clnt.h>
69#include <rpc/replay.h>
70
71#include <rpc/rpc_com.h>
72
73#define SVC_VERSQUIET 0x0001            /* keep quiet about vers mismatch */
74#define version_keepquiet(xp) (SVC_EXT(xp)->xp_flags & SVC_VERSQUIET)
75
76static struct svc_callout *svc_find(SVCPOOL *pool, rpcprog_t, rpcvers_t,
77    char *);
78static void svc_new_thread(SVCGROUP *grp);
79static void xprt_unregister_locked(SVCXPRT *xprt);
80static void svc_change_space_used(SVCPOOL *pool, long delta);
81static bool_t svc_request_space_available(SVCPOOL *pool);
82static void svcpool_cleanup(SVCPOOL *pool);
83
84/* ***************  SVCXPRT related stuff **************** */
85
86static int svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS);
87static int svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS);
88static int svcpool_threads_sysctl(SYSCTL_HANDLER_ARGS);
89
90SVCPOOL*
91svcpool_create(const char *name, struct sysctl_oid_list *sysctl_base)
92{
93        SVCPOOL *pool;
94        SVCGROUP *grp;
95        int g;
96
97        pool = malloc(sizeof(SVCPOOL), M_RPC, M_WAITOK|M_ZERO);
98       
99        mtx_init(&pool->sp_lock, "sp_lock", NULL, MTX_DEF);
100        pool->sp_name = name;
101        pool->sp_state = SVCPOOL_INIT;
102        pool->sp_proc = NULL;
103        TAILQ_INIT(&pool->sp_callouts);
104        TAILQ_INIT(&pool->sp_lcallouts);
105        pool->sp_minthreads = 1;
106        pool->sp_maxthreads = 1;
107        pool->sp_groupcount = 1;
108        for (g = 0; g < SVC_MAXGROUPS; g++) {
109                grp = &pool->sp_groups[g];
110                mtx_init(&grp->sg_lock, "sg_lock", NULL, MTX_DEF);
111                grp->sg_pool = pool;
112                grp->sg_state = SVCPOOL_ACTIVE;
113                TAILQ_INIT(&grp->sg_xlist);
114                TAILQ_INIT(&grp->sg_active);
115                LIST_INIT(&grp->sg_idlethreads);
116                grp->sg_minthreads = 1;
117                grp->sg_maxthreads = 1;
118        }
119
120        /*
121         * Don't use more than a quarter of mbuf clusters.  Nota bene:
122         * nmbclusters is an int, but nmbclusters*MCLBYTES may overflow
123         * on LP64 architectures, so cast to u_long to avoid undefined
124         * behavior.  (ILP32 architectures cannot have nmbclusters
125         * large enough to overflow for other reasons.)
126         */
127        pool->sp_space_high = (u_long)nmbclusters * MCLBYTES / 4;
128        pool->sp_space_low = (pool->sp_space_high / 3) * 2;
129
130        sysctl_ctx_init(&pool->sp_sysctl);
131        if (sysctl_base) {
132                SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
133                    "minthreads", CTLTYPE_INT | CTLFLAG_RW,
134                    pool, 0, svcpool_minthread_sysctl, "I",
135                    "Minimal number of threads");
136                SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
137                    "maxthreads", CTLTYPE_INT | CTLFLAG_RW,
138                    pool, 0, svcpool_maxthread_sysctl, "I",
139                    "Maximal number of threads");
140                SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
141                    "threads", CTLTYPE_INT | CTLFLAG_RD,
142                    pool, 0, svcpool_threads_sysctl, "I",
143                    "Current number of threads");
144                SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
145                    "groups", CTLFLAG_RD, &pool->sp_groupcount, 0,
146                    "Number of thread groups");
147
148                SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
149                    "request_space_used", CTLFLAG_RD,
150                    &pool->sp_space_used,
151                    "Space in parsed but not handled requests.");
152
153                SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
154                    "request_space_used_highest", CTLFLAG_RD,
155                    &pool->sp_space_used_highest,
156                    "Highest space used since reboot.");
157
158                SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
159                    "request_space_high", CTLFLAG_RW,
160                    &pool->sp_space_high,
161                    "Maximum space in parsed but not handled requests.");
162
163                SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
164                    "request_space_low", CTLFLAG_RW,
165                    &pool->sp_space_low,
166                    "Low water mark for request space.");
167
168                SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
169                    "request_space_throttled", CTLFLAG_RD,
170                    &pool->sp_space_throttled, 0,
171                    "Whether nfs requests are currently throttled");
172
173                SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
174                    "request_space_throttle_count", CTLFLAG_RD,
175                    &pool->sp_space_throttle_count, 0,
176                    "Count of times throttling based on request space has occurred");
177        }
178
179        return pool;
180}
181
182/*
183 * Code common to svcpool_destroy() and svcpool_close(), which cleans up
184 * the pool data structures.
185 */
186static void
187svcpool_cleanup(SVCPOOL *pool)
188{
189        SVCGROUP *grp;
190        SVCXPRT *xprt, *nxprt;
191        struct svc_callout *s;
192        struct svc_loss_callout *sl;
193        struct svcxprt_list cleanup;
194        int g;
195
196        TAILQ_INIT(&cleanup);
197
198        for (g = 0; g < SVC_MAXGROUPS; g++) {
199                grp = &pool->sp_groups[g];
200                mtx_lock(&grp->sg_lock);
201                while ((xprt = TAILQ_FIRST(&grp->sg_xlist)) != NULL) {
202                        xprt_unregister_locked(xprt);
203                        TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link);
204                }
205                mtx_unlock(&grp->sg_lock);
206        }
207        TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) {
208                SVC_RELEASE(xprt);
209        }
210
211        mtx_lock(&pool->sp_lock);
212        while ((s = TAILQ_FIRST(&pool->sp_callouts)) != NULL) {
213                mtx_unlock(&pool->sp_lock);
214                svc_unreg(pool, s->sc_prog, s->sc_vers);
215                mtx_lock(&pool->sp_lock);
216        }
217        while ((sl = TAILQ_FIRST(&pool->sp_lcallouts)) != NULL) {
218                mtx_unlock(&pool->sp_lock);
219                svc_loss_unreg(pool, sl->slc_dispatch);
220                mtx_lock(&pool->sp_lock);
221        }
222        mtx_unlock(&pool->sp_lock);
223}
224
225void
226svcpool_destroy(SVCPOOL *pool)
227{
228        SVCGROUP *grp;
229        int g;
230
231        svcpool_cleanup(pool);
232
233        for (g = 0; g < SVC_MAXGROUPS; g++) {
234                grp = &pool->sp_groups[g];
235                mtx_destroy(&grp->sg_lock);
236        }
237        mtx_destroy(&pool->sp_lock);
238
239        if (pool->sp_rcache)
240                replay_freecache(pool->sp_rcache);
241
242        sysctl_ctx_free(&pool->sp_sysctl);
243        free(pool, M_RPC);
244}
245
246/*
247 * Similar to svcpool_destroy(), except that it does not destroy the actual
248 * data structures.  As such, "pool" may be used again.
249 */
250void
251svcpool_close(SVCPOOL *pool)
252{
253        SVCGROUP *grp;
254        int g;
255
256        svcpool_cleanup(pool);
257
258        /* Now, initialize the pool's state for a fresh svc_run() call. */
259        mtx_lock(&pool->sp_lock);
260        pool->sp_state = SVCPOOL_INIT;
261        mtx_unlock(&pool->sp_lock);
262        for (g = 0; g < SVC_MAXGROUPS; g++) {
263                grp = &pool->sp_groups[g];
264                mtx_lock(&grp->sg_lock);
265                grp->sg_state = SVCPOOL_ACTIVE;
266                mtx_unlock(&grp->sg_lock);
267        }
268}
269
270/*
271 * Sysctl handler to get the present thread count on a pool
272 */
273static int
274svcpool_threads_sysctl(SYSCTL_HANDLER_ARGS)
275{
276        SVCPOOL *pool;
277        int threads, error, g;
278
279        pool = oidp->oid_arg1;
280        threads = 0;
281        mtx_lock(&pool->sp_lock);
282        for (g = 0; g < pool->sp_groupcount; g++)
283                threads += pool->sp_groups[g].sg_threadcount;
284        mtx_unlock(&pool->sp_lock);
285        error = sysctl_handle_int(oidp, &threads, 0, req);
286        return (error);
287}
288
289/*
290 * Sysctl handler to set the minimum thread count on a pool
291 */
292static int
293svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS)
294{
295        SVCPOOL *pool;
296        int newminthreads, error, g;
297
298        pool = oidp->oid_arg1;
299        newminthreads = pool->sp_minthreads;
300        error = sysctl_handle_int(oidp, &newminthreads, 0, req);
301        if (error == 0 && newminthreads != pool->sp_minthreads) {
302                if (newminthreads > pool->sp_maxthreads)
303                        return (EINVAL);
304                mtx_lock(&pool->sp_lock);
305                pool->sp_minthreads = newminthreads;
306                for (g = 0; g < pool->sp_groupcount; g++) {
307                        pool->sp_groups[g].sg_minthreads = max(1,
308                            pool->sp_minthreads / pool->sp_groupcount);
309                }
310                mtx_unlock(&pool->sp_lock);
311        }
312        return (error);
313}
314
315/*
316 * Sysctl handler to set the maximum thread count on a pool
317 */
318static int
319svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS)
320{
321        SVCPOOL *pool;
322        int newmaxthreads, error, g;
323
324        pool = oidp->oid_arg1;
325        newmaxthreads = pool->sp_maxthreads;
326        error = sysctl_handle_int(oidp, &newmaxthreads, 0, req);
327        if (error == 0 && newmaxthreads != pool->sp_maxthreads) {
328                if (newmaxthreads < pool->sp_minthreads)
329                        return (EINVAL);
330                mtx_lock(&pool->sp_lock);
331                pool->sp_maxthreads = newmaxthreads;
332                for (g = 0; g < pool->sp_groupcount; g++) {
333                        pool->sp_groups[g].sg_maxthreads = max(1,
334                            pool->sp_maxthreads / pool->sp_groupcount);
335                }
336                mtx_unlock(&pool->sp_lock);
337        }
338        return (error);
339}
340
341/*
342 * Activate a transport handle.
343 */
344void
345xprt_register(SVCXPRT *xprt)
346{
347        SVCPOOL *pool = xprt->xp_pool;
348        SVCGROUP *grp;
349        int g;
350
351        SVC_ACQUIRE(xprt);
352        g = atomic_fetchadd_int(&pool->sp_nextgroup, 1) % pool->sp_groupcount;
353        xprt->xp_group = grp = &pool->sp_groups[g];
354        mtx_lock(&grp->sg_lock);
355        xprt->xp_registered = TRUE;
356        xprt->xp_active = FALSE;
357        TAILQ_INSERT_TAIL(&grp->sg_xlist, xprt, xp_link);
358        mtx_unlock(&grp->sg_lock);
359}
360
361/*
362 * De-activate a transport handle. Note: the locked version doesn't
363 * release the transport - caller must do that after dropping the pool
364 * lock.
365 */
366static void
367xprt_unregister_locked(SVCXPRT *xprt)
368{
369        SVCGROUP *grp = xprt->xp_group;
370
371        mtx_assert(&grp->sg_lock, MA_OWNED);
372        KASSERT(xprt->xp_registered == TRUE,
373            ("xprt_unregister_locked: not registered"));
374        xprt_inactive_locked(xprt);
375        TAILQ_REMOVE(&grp->sg_xlist, xprt, xp_link);
376        xprt->xp_registered = FALSE;
377}
378
379void
380xprt_unregister(SVCXPRT *xprt)
381{
382        SVCGROUP *grp = xprt->xp_group;
383
384        mtx_lock(&grp->sg_lock);
385        if (xprt->xp_registered == FALSE) {
386                /* Already unregistered by another thread */
387                mtx_unlock(&grp->sg_lock);
388                return;
389        }
390        xprt_unregister_locked(xprt);
391        mtx_unlock(&grp->sg_lock);
392
393        SVC_RELEASE(xprt);
394}
395
396/*
397 * Attempt to assign a service thread to this transport.
398 */
399static int
400xprt_assignthread(SVCXPRT *xprt)
401{
402        SVCGROUP *grp = xprt->xp_group;
403        SVCTHREAD *st;
404
405        mtx_assert(&grp->sg_lock, MA_OWNED);
406        st = LIST_FIRST(&grp->sg_idlethreads);
407        if (st) {
408                LIST_REMOVE(st, st_ilink);
409                SVC_ACQUIRE(xprt);
410                xprt->xp_thread = st;
411                st->st_xprt = xprt;
412                cv_signal(&st->st_cond);
413                return (TRUE);
414        } else {
415                /*
416                 * See if we can create a new thread. The
417                 * actual thread creation happens in
418                 * svc_run_internal because our locking state
419                 * is poorly defined (we are typically called
420                 * from a socket upcall). Don't create more
421                 * than one thread per second.
422                 */
423                if (grp->sg_state == SVCPOOL_ACTIVE
424                    && grp->sg_lastcreatetime < time_uptime
425                    && grp->sg_threadcount < grp->sg_maxthreads) {
426                        grp->sg_state = SVCPOOL_THREADWANTED;
427                }
428        }
429        return (FALSE);
430}
431
432void
433xprt_active(SVCXPRT *xprt)
434{
435        SVCGROUP *grp = xprt->xp_group;
436
437        mtx_lock(&grp->sg_lock);
438
439        if (!xprt->xp_registered) {
440                /*
441                 * Race with xprt_unregister - we lose.
442                 */
443                mtx_unlock(&grp->sg_lock);
444                return;
445        }
446
447        if (!xprt->xp_active) {
448                xprt->xp_active = TRUE;
449                if (xprt->xp_thread == NULL) {
450                        if (!svc_request_space_available(xprt->xp_pool) ||
451                            !xprt_assignthread(xprt))
452                                TAILQ_INSERT_TAIL(&grp->sg_active, xprt,
453                                    xp_alink);
454                }
455        }
456
457        mtx_unlock(&grp->sg_lock);
458}
459
460void
461xprt_inactive_locked(SVCXPRT *xprt)
462{
463        SVCGROUP *grp = xprt->xp_group;
464
465        mtx_assert(&grp->sg_lock, MA_OWNED);
466        if (xprt->xp_active) {
467                if (xprt->xp_thread == NULL)
468                        TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink);
469                xprt->xp_active = FALSE;
470        }
471}
472
473void
474xprt_inactive(SVCXPRT *xprt)
475{
476        SVCGROUP *grp = xprt->xp_group;
477
478        mtx_lock(&grp->sg_lock);
479        xprt_inactive_locked(xprt);
480        mtx_unlock(&grp->sg_lock);
481}
482
483/*
484 * Variant of xprt_inactive() for use only when sure that port is
485 * assigned to thread. For example, within receive handlers.
486 */
487void
488xprt_inactive_self(SVCXPRT *xprt)
489{
490
491        KASSERT(xprt->xp_thread != NULL,
492            ("xprt_inactive_self(%p) with NULL xp_thread", xprt));
493        xprt->xp_active = FALSE;
494}
495
496/*
497 * Add a service program to the callout list.
498 * The dispatch routine will be called when a rpc request for this
499 * program number comes in.
500 */
501bool_t
502svc_reg(SVCXPRT *xprt, const rpcprog_t prog, const rpcvers_t vers,
503    void (*dispatch)(struct svc_req *, SVCXPRT *),
504    const struct netconfig *nconf)
505{
506        SVCPOOL *pool = xprt->xp_pool;
507        struct svc_callout *s;
508        char *netid = NULL;
509        int flag = 0;
510
511/* VARIABLES PROTECTED BY svc_lock: s, svc_head */
512
513        if (xprt->xp_netid) {
514                netid = strdup(xprt->xp_netid, M_RPC);
515                flag = 1;
516        } else if (nconf && nconf->nc_netid) {
517                netid = strdup(nconf->nc_netid, M_RPC);
518                flag = 1;
519        } /* must have been created with svc_raw_create */
520        if ((netid == NULL) && (flag == 1)) {
521                return (FALSE);
522        }
523
524        mtx_lock(&pool->sp_lock);
525        if ((s = svc_find(pool, prog, vers, netid)) != NULL) {
526                if (netid)
527                        free(netid, M_RPC);
528                if (s->sc_dispatch == dispatch)
529                        goto rpcb_it; /* he is registering another xptr */
530                mtx_unlock(&pool->sp_lock);
531                return (FALSE);
532        }
533        s = malloc(sizeof (struct svc_callout), M_RPC, M_NOWAIT);
534        if (s == NULL) {
535                if (netid)
536                        free(netid, M_RPC);
537                mtx_unlock(&pool->sp_lock);
538                return (FALSE);
539        }
540
541        s->sc_prog = prog;
542        s->sc_vers = vers;
543        s->sc_dispatch = dispatch;
544        s->sc_netid = netid;
545        TAILQ_INSERT_TAIL(&pool->sp_callouts, s, sc_link);
546
547        if ((xprt->xp_netid == NULL) && (flag == 1) && netid)
548                ((SVCXPRT *) xprt)->xp_netid = strdup(netid, M_RPC);
549
550rpcb_it:
551        mtx_unlock(&pool->sp_lock);
552        /* now register the information with the local binder service */
553        if (nconf) {
554                bool_t dummy;
555                struct netconfig tnc;
556                struct netbuf nb;
557                tnc = *nconf;
558                nb.buf = &xprt->xp_ltaddr;
559                nb.len = xprt->xp_ltaddr.ss_len;
560                dummy = rpcb_set(prog, vers, &tnc, &nb);
561                return (dummy);
562        }
563        return (TRUE);
564}
565
566/*
567 * Remove a service program from the callout list.
568 */
569void
570svc_unreg(SVCPOOL *pool, const rpcprog_t prog, const rpcvers_t vers)
571{
572        struct svc_callout *s;
573
574        /* unregister the information anyway */
575        (void) rpcb_unset(prog, vers, NULL);
576        mtx_lock(&pool->sp_lock);
577        while ((s = svc_find(pool, prog, vers, NULL)) != NULL) {
578                TAILQ_REMOVE(&pool->sp_callouts, s, sc_link);
579                if (s->sc_netid)
580                        mem_free(s->sc_netid, sizeof (s->sc_netid) + 1);
581                mem_free(s, sizeof (struct svc_callout));
582        }
583        mtx_unlock(&pool->sp_lock);
584}
585
586/*
587 * Add a service connection loss program to the callout list.
588 * The dispatch routine will be called when some port in ths pool die.
589 */
590bool_t
591svc_loss_reg(SVCXPRT *xprt, void (*dispatch)(SVCXPRT *))
592{
593        SVCPOOL *pool = xprt->xp_pool;
594        struct svc_loss_callout *s;
595
596        mtx_lock(&pool->sp_lock);
597        TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link) {
598                if (s->slc_dispatch == dispatch)
599                        break;
600        }
601        if (s != NULL) {
602                mtx_unlock(&pool->sp_lock);
603                return (TRUE);
604        }
605        s = malloc(sizeof(struct svc_loss_callout), M_RPC, M_NOWAIT);
606        if (s == NULL) {
607                mtx_unlock(&pool->sp_lock);
608                return (FALSE);
609        }
610        s->slc_dispatch = dispatch;
611        TAILQ_INSERT_TAIL(&pool->sp_lcallouts, s, slc_link);
612        mtx_unlock(&pool->sp_lock);
613        return (TRUE);
614}
615
616/*
617 * Remove a service connection loss program from the callout list.
618 */
619void
620svc_loss_unreg(SVCPOOL *pool, void (*dispatch)(SVCXPRT *))
621{
622        struct svc_loss_callout *s;
623
624        mtx_lock(&pool->sp_lock);
625        TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link) {
626                if (s->slc_dispatch == dispatch) {
627                        TAILQ_REMOVE(&pool->sp_lcallouts, s, slc_link);
628                        free(s, M_RPC);
629                        break;
630                }
631        }
632        mtx_unlock(&pool->sp_lock);
633}
634
635/* ********************** CALLOUT list related stuff ************* */
636
637/*
638 * Search the callout list for a program number, return the callout
639 * struct.
640 */
641static struct svc_callout *
642svc_find(SVCPOOL *pool, rpcprog_t prog, rpcvers_t vers, char *netid)
643{
644        struct svc_callout *s;
645
646        mtx_assert(&pool->sp_lock, MA_OWNED);
647        TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) {
648                if (s->sc_prog == prog && s->sc_vers == vers
649                    && (netid == NULL || s->sc_netid == NULL ||
650                        strcmp(netid, s->sc_netid) == 0))
651                        break;
652        }
653
654        return (s);
655}
656
657/* ******************* REPLY GENERATION ROUTINES  ************ */
658
659static bool_t
660svc_sendreply_common(struct svc_req *rqstp, struct rpc_msg *rply,
661    struct mbuf *body)
662{
663        SVCXPRT *xprt = rqstp->rq_xprt;
664        bool_t ok;
665
666        if (rqstp->rq_args) {
667                m_freem(rqstp->rq_args);
668                rqstp->rq_args = NULL;
669        }
670
671        if (xprt->xp_pool->sp_rcache)
672                replay_setreply(xprt->xp_pool->sp_rcache,
673                    rply, svc_getrpccaller(rqstp), body);
674
675        if (!SVCAUTH_WRAP(&rqstp->rq_auth, &body))
676                return (FALSE);
677
678        ok = SVC_REPLY(xprt, rply, rqstp->rq_addr, body, &rqstp->rq_reply_seq);
679        if (rqstp->rq_addr) {
680                free(rqstp->rq_addr, M_SONAME);
681                rqstp->rq_addr = NULL;
682        }
683
684        return (ok);
685}
686
687/*
688 * Send a reply to an rpc request
689 */
690bool_t
691svc_sendreply(struct svc_req *rqstp, xdrproc_t xdr_results, void * xdr_location)
692{
693        struct rpc_msg rply;
694        struct mbuf *m;
695        XDR xdrs;
696        bool_t ok;
697
698        rply.rm_xid = rqstp->rq_xid;
699        rply.rm_direction = REPLY; 
700        rply.rm_reply.rp_stat = MSG_ACCEPTED;
701        rply.acpted_rply.ar_verf = rqstp->rq_verf;
702        rply.acpted_rply.ar_stat = SUCCESS;
703        rply.acpted_rply.ar_results.where = NULL;
704        rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void;
705
706        m = m_getcl(M_WAITOK, MT_DATA, 0);
707        xdrmbuf_create(&xdrs, m, XDR_ENCODE);
708        ok = xdr_results(&xdrs, xdr_location);
709        XDR_DESTROY(&xdrs);
710
711        if (ok) {
712                return (svc_sendreply_common(rqstp, &rply, m));
713        } else {
714                m_freem(m);
715                return (FALSE);
716        }
717}
718
719bool_t
720svc_sendreply_mbuf(struct svc_req *rqstp, struct mbuf *m)
721{
722        struct rpc_msg rply;
723
724        rply.rm_xid = rqstp->rq_xid;
725        rply.rm_direction = REPLY; 
726        rply.rm_reply.rp_stat = MSG_ACCEPTED;
727        rply.acpted_rply.ar_verf = rqstp->rq_verf;
728        rply.acpted_rply.ar_stat = SUCCESS;
729        rply.acpted_rply.ar_results.where = NULL;
730        rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void;
731
732        return (svc_sendreply_common(rqstp, &rply, m));
733}
734
735/*
736 * No procedure error reply
737 */
738void
739svcerr_noproc(struct svc_req *rqstp)
740{
741        SVCXPRT *xprt = rqstp->rq_xprt;
742        struct rpc_msg rply;
743
744        rply.rm_xid = rqstp->rq_xid;
745        rply.rm_direction = REPLY;
746        rply.rm_reply.rp_stat = MSG_ACCEPTED;
747        rply.acpted_rply.ar_verf = rqstp->rq_verf;
748        rply.acpted_rply.ar_stat = PROC_UNAVAIL;
749
750        if (xprt->xp_pool->sp_rcache)
751                replay_setreply(xprt->xp_pool->sp_rcache,
752                    &rply, svc_getrpccaller(rqstp), NULL);
753
754        svc_sendreply_common(rqstp, &rply, NULL);
755}
756
757/*
758 * Can't decode args error reply
759 */
760void
761svcerr_decode(struct svc_req *rqstp)
762{
763        SVCXPRT *xprt = rqstp->rq_xprt;
764        struct rpc_msg rply;
765
766        rply.rm_xid = rqstp->rq_xid;
767        rply.rm_direction = REPLY;
768        rply.rm_reply.rp_stat = MSG_ACCEPTED;
769        rply.acpted_rply.ar_verf = rqstp->rq_verf;
770        rply.acpted_rply.ar_stat = GARBAGE_ARGS;
771
772        if (xprt->xp_pool->sp_rcache)
773                replay_setreply(xprt->xp_pool->sp_rcache,
774                    &rply, (struct sockaddr *) &xprt->xp_rtaddr, NULL);
775
776        svc_sendreply_common(rqstp, &rply, NULL);
777}
778
779/*
780 * Some system error
781 */
782void
783svcerr_systemerr(struct svc_req *rqstp)
784{
785        SVCXPRT *xprt = rqstp->rq_xprt;
786        struct rpc_msg rply;
787
788        rply.rm_xid = rqstp->rq_xid;
789        rply.rm_direction = REPLY;
790        rply.rm_reply.rp_stat = MSG_ACCEPTED;
791        rply.acpted_rply.ar_verf = rqstp->rq_verf;
792        rply.acpted_rply.ar_stat = SYSTEM_ERR;
793
794        if (xprt->xp_pool->sp_rcache)
795                replay_setreply(xprt->xp_pool->sp_rcache,
796                    &rply, svc_getrpccaller(rqstp), NULL);
797
798        svc_sendreply_common(rqstp, &rply, NULL);
799}
800
801/*
802 * Authentication error reply
803 */
804void
805svcerr_auth(struct svc_req *rqstp, enum auth_stat why)
806{
807        SVCXPRT *xprt = rqstp->rq_xprt;
808        struct rpc_msg rply;
809
810        rply.rm_xid = rqstp->rq_xid;
811        rply.rm_direction = REPLY;
812        rply.rm_reply.rp_stat = MSG_DENIED;
813        rply.rjcted_rply.rj_stat = AUTH_ERROR;
814        rply.rjcted_rply.rj_why = why;
815
816        if (xprt->xp_pool->sp_rcache)
817                replay_setreply(xprt->xp_pool->sp_rcache,
818                    &rply, svc_getrpccaller(rqstp), NULL);
819
820        svc_sendreply_common(rqstp, &rply, NULL);
821}
822
823/*
824 * Auth too weak error reply
825 */
826void
827svcerr_weakauth(struct svc_req *rqstp)
828{
829
830        svcerr_auth(rqstp, AUTH_TOOWEAK);
831}
832
833/*
834 * Program unavailable error reply
835 */
836void
837svcerr_noprog(struct svc_req *rqstp)
838{
839        SVCXPRT *xprt = rqstp->rq_xprt;
840        struct rpc_msg rply; 
841
842        rply.rm_xid = rqstp->rq_xid;
843        rply.rm_direction = REPLY;   
844        rply.rm_reply.rp_stat = MSG_ACCEPTED; 
845        rply.acpted_rply.ar_verf = rqstp->rq_verf; 
846        rply.acpted_rply.ar_stat = PROG_UNAVAIL;
847
848        if (xprt->xp_pool->sp_rcache)
849                replay_setreply(xprt->xp_pool->sp_rcache,
850                    &rply, svc_getrpccaller(rqstp), NULL);
851
852        svc_sendreply_common(rqstp, &rply, NULL);
853}
854
855/*
856 * Program version mismatch error reply
857 */
858void 
859svcerr_progvers(struct svc_req *rqstp, rpcvers_t low_vers, rpcvers_t high_vers)
860{
861        SVCXPRT *xprt = rqstp->rq_xprt;
862        struct rpc_msg rply;
863
864        rply.rm_xid = rqstp->rq_xid;
865        rply.rm_direction = REPLY;
866        rply.rm_reply.rp_stat = MSG_ACCEPTED;
867        rply.acpted_rply.ar_verf = rqstp->rq_verf;
868        rply.acpted_rply.ar_stat = PROG_MISMATCH;
869        rply.acpted_rply.ar_vers.low = (uint32_t)low_vers;
870        rply.acpted_rply.ar_vers.high = (uint32_t)high_vers;
871
872        if (xprt->xp_pool->sp_rcache)
873                replay_setreply(xprt->xp_pool->sp_rcache,
874                    &rply, svc_getrpccaller(rqstp), NULL);
875
876        svc_sendreply_common(rqstp, &rply, NULL);
877}
878
879/*
880 * Allocate a new server transport structure. All fields are
881 * initialized to zero and xp_p3 is initialized to point at an
882 * extension structure to hold various flags and authentication
883 * parameters.
884 */
885SVCXPRT *
886svc_xprt_alloc(void)
887{
888        SVCXPRT *xprt;
889        SVCXPRT_EXT *ext;
890
891        xprt = mem_alloc(sizeof(SVCXPRT));
892        ext = mem_alloc(sizeof(SVCXPRT_EXT));
893        xprt->xp_p3 = ext;
894        refcount_init(&xprt->xp_refs, 1);
895
896        return (xprt);
897}
898
899/*
900 * Free a server transport structure.
901 */
902void
903svc_xprt_free(SVCXPRT *xprt)
904{
905
906        mem_free(xprt->xp_p3, sizeof(SVCXPRT_EXT));
907        mem_free(xprt, sizeof(SVCXPRT));
908}
909
910/* ******************* SERVER INPUT STUFF ******************* */
911
912/*
913 * Read RPC requests from a transport and queue them to be
914 * executed. We handle authentication and replay cache replies here.
915 * Actually dispatching the RPC is deferred till svc_executereq.
916 */
917static enum xprt_stat
918svc_getreq(SVCXPRT *xprt, struct svc_req **rqstp_ret)
919{
920        SVCPOOL *pool = xprt->xp_pool;
921        struct svc_req *r;
922        struct rpc_msg msg;
923        struct mbuf *args;
924        struct svc_loss_callout *s;
925        enum xprt_stat stat;
926
927        /* now receive msgs from xprtprt (support batch calls) */
928        r = malloc(sizeof(*r), M_RPC, M_WAITOK|M_ZERO);
929
930        msg.rm_call.cb_cred.oa_base = r->rq_credarea;
931        msg.rm_call.cb_verf.oa_base = &r->rq_credarea[MAX_AUTH_BYTES];
932        r->rq_clntcred = &r->rq_credarea[2*MAX_AUTH_BYTES];
933        if (SVC_RECV(xprt, &msg, &r->rq_addr, &args)) {
934                enum auth_stat why;
935
936                /*
937                 * Handle replays and authenticate before queuing the
938                 * request to be executed.
939                 */
940                SVC_ACQUIRE(xprt);
941                r->rq_xprt = xprt;
942                if (pool->sp_rcache) {
943                        struct rpc_msg repmsg;
944                        struct mbuf *repbody;
945                        enum replay_state rs;
946                        rs = replay_find(pool->sp_rcache, &msg,
947                            svc_getrpccaller(r), &repmsg, &repbody);
948                        switch (rs) {
949                        case RS_NEW:
950                                break;
951                        case RS_DONE:
952                                SVC_REPLY(xprt, &repmsg, r->rq_addr,
953                                    repbody, &r->rq_reply_seq);
954                                if (r->rq_addr) {
955                                        free(r->rq_addr, M_SONAME);
956                                        r->rq_addr = NULL;
957                                }
958                                m_freem(args);
959                                goto call_done;
960
961                        default:
962                                m_freem(args);
963                                goto call_done;
964                        }
965                }
966
967                r->rq_xid = msg.rm_xid;
968                r->rq_prog = msg.rm_call.cb_prog;
969                r->rq_vers = msg.rm_call.cb_vers;
970                r->rq_proc = msg.rm_call.cb_proc;
971                r->rq_size = sizeof(*r) + m_length(args, NULL);
972                r->rq_args = args;
973                if ((why = _authenticate(r, &msg)) != AUTH_OK) {
974                        /*
975                         * RPCSEC_GSS uses this return code
976                         * for requests that form part of its
977                         * context establishment protocol and
978                         * should not be dispatched to the
979                         * application.
980                         */
981                        if (why != RPCSEC_GSS_NODISPATCH)
982                                svcerr_auth(r, why);
983                        goto call_done;
984                }
985
986                if (!SVCAUTH_UNWRAP(&r->rq_auth, &r->rq_args)) {
987                        svcerr_decode(r);
988                        goto call_done;
989                }
990
991                /*
992                 * Everything checks out, return request to caller.
993                 */
994                *rqstp_ret = r;
995                r = NULL;
996        }
997call_done:
998        if (r) {
999                svc_freereq(r);
1000                r = NULL;
1001        }
1002        if ((stat = SVC_STAT(xprt)) == XPRT_DIED) {
1003                TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link)
1004                        (*s->slc_dispatch)(xprt);
1005                xprt_unregister(xprt);
1006        }
1007
1008        return (stat);
1009}
1010
1011static void
1012svc_executereq(struct svc_req *rqstp)
1013{
1014        SVCXPRT *xprt = rqstp->rq_xprt;
1015        SVCPOOL *pool = xprt->xp_pool;
1016        int prog_found;
1017        rpcvers_t low_vers;
1018        rpcvers_t high_vers;
1019        struct svc_callout *s;
1020
1021        /* now match message with a registered service*/
1022        prog_found = FALSE;
1023        low_vers = (rpcvers_t) -1L;
1024        high_vers = (rpcvers_t) 0L;
1025        TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) {
1026                if (s->sc_prog == rqstp->rq_prog) {
1027                        if (s->sc_vers == rqstp->rq_vers) {
1028                                /*
1029                                 * We hand ownership of r to the
1030                                 * dispatch method - they must call
1031                                 * svc_freereq.
1032                                 */
1033                                (*s->sc_dispatch)(rqstp, xprt);
1034                                return;
1035                        }  /* found correct version */
1036                        prog_found = TRUE;
1037                        if (s->sc_vers < low_vers)
1038                                low_vers = s->sc_vers;
1039                        if (s->sc_vers > high_vers)
1040                                high_vers = s->sc_vers;
1041                }   /* found correct program */
1042        }
1043
1044        /*
1045         * if we got here, the program or version
1046         * is not served ...
1047         */
1048        if (prog_found)
1049                svcerr_progvers(rqstp, low_vers, high_vers);
1050        else
1051                svcerr_noprog(rqstp);
1052
1053        svc_freereq(rqstp);
1054}
1055
1056static void
1057svc_checkidle(SVCGROUP *grp)
1058{
1059        SVCXPRT *xprt, *nxprt;
1060        time_t timo;
1061        struct svcxprt_list cleanup;
1062
1063        TAILQ_INIT(&cleanup);
1064        TAILQ_FOREACH_SAFE(xprt, &grp->sg_xlist, xp_link, nxprt) {
1065                /*
1066                 * Only some transports have idle timers. Don't time
1067                 * something out which is just waking up.
1068                 */
1069                if (!xprt->xp_idletimeout || xprt->xp_thread)
1070                        continue;
1071
1072                timo = xprt->xp_lastactive + xprt->xp_idletimeout;
1073                if (time_uptime > timo) {
1074                        xprt_unregister_locked(xprt);
1075                        TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link);
1076                }
1077        }
1078
1079        mtx_unlock(&grp->sg_lock);
1080        TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) {
1081                SVC_RELEASE(xprt);
1082        }
1083        mtx_lock(&grp->sg_lock);
1084}
1085
1086static void
1087svc_assign_waiting_sockets(SVCPOOL *pool)
1088{
1089        SVCGROUP *grp;
1090        SVCXPRT *xprt;
1091        int g;
1092
1093        for (g = 0; g < pool->sp_groupcount; g++) {
1094                grp = &pool->sp_groups[g];
1095                mtx_lock(&grp->sg_lock);
1096                while ((xprt = TAILQ_FIRST(&grp->sg_active)) != NULL) {
1097                        if (xprt_assignthread(xprt))
1098                                TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink);
1099                        else
1100                                break;
1101                }
1102                mtx_unlock(&grp->sg_lock);
1103        }
1104}
1105
1106static void
1107svc_change_space_used(SVCPOOL *pool, long delta)
1108{
1109        unsigned long value;
1110
1111        value = atomic_fetchadd_long(&pool->sp_space_used, delta) + delta;
1112        if (delta > 0) {
1113                if (value >= pool->sp_space_high && !pool->sp_space_throttled) {
1114                        pool->sp_space_throttled = TRUE;
1115                        pool->sp_space_throttle_count++;
1116                }
1117                if (value > pool->sp_space_used_highest)
1118                        pool->sp_space_used_highest = value;
1119        } else {
1120                if (value < pool->sp_space_low && pool->sp_space_throttled) {
1121                        pool->sp_space_throttled = FALSE;
1122                        svc_assign_waiting_sockets(pool);
1123                }
1124        }
1125}
1126
1127static bool_t
1128svc_request_space_available(SVCPOOL *pool)
1129{
1130
1131        if (pool->sp_space_throttled)
1132                return (FALSE);
1133        return (TRUE);
1134}
1135
1136static void
1137svc_run_internal(SVCGROUP *grp, bool_t ismaster)
1138{
1139        SVCPOOL *pool = grp->sg_pool;
1140        SVCTHREAD *st, *stpref;
1141        SVCXPRT *xprt;
1142        enum xprt_stat stat;
1143        struct svc_req *rqstp;
1144        struct proc *p;
1145        long sz;
1146        int error;
1147
1148        st = mem_alloc(sizeof(*st));
1149        mtx_init(&st->st_lock, "st_lock", NULL, MTX_DEF);
1150        st->st_pool = pool;
1151        st->st_xprt = NULL;
1152        STAILQ_INIT(&st->st_reqs);
1153        cv_init(&st->st_cond, "rpcsvc");
1154
1155        mtx_lock(&grp->sg_lock);
1156
1157        /*
1158         * If we are a new thread which was spawned to cope with
1159         * increased load, set the state back to SVCPOOL_ACTIVE.
1160         */
1161        if (grp->sg_state == SVCPOOL_THREADSTARTING)
1162                grp->sg_state = SVCPOOL_ACTIVE;
1163
1164        while (grp->sg_state != SVCPOOL_CLOSING) {
1165                /*
1166                 * Create new thread if requested.
1167                 */
1168                if (grp->sg_state == SVCPOOL_THREADWANTED) {
1169                        grp->sg_state = SVCPOOL_THREADSTARTING;
1170                        grp->sg_lastcreatetime = time_uptime;
1171                        mtx_unlock(&grp->sg_lock);
1172                        svc_new_thread(grp);
1173                        mtx_lock(&grp->sg_lock);
1174                        continue;
1175                }
1176
1177                /*
1178                 * Check for idle transports once per second.
1179                 */
1180                if (time_uptime > grp->sg_lastidlecheck) {
1181                        grp->sg_lastidlecheck = time_uptime;
1182                        svc_checkidle(grp);
1183                }
1184
1185                xprt = st->st_xprt;
1186                if (!xprt) {
1187                        /*
1188                         * Enforce maxthreads count.
1189                         */
1190                        if (!ismaster && grp->sg_threadcount >
1191                            grp->sg_maxthreads)
1192                                break;
1193
1194                        /*
1195                         * Before sleeping, see if we can find an
1196                         * active transport which isn't being serviced
1197                         * by a thread.
1198                         */
1199                        if (svc_request_space_available(pool) &&
1200                            (xprt = TAILQ_FIRST(&grp->sg_active)) != NULL) {
1201                                TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink);
1202                                SVC_ACQUIRE(xprt);
1203                                xprt->xp_thread = st;
1204                                st->st_xprt = xprt;
1205                                continue;
1206                        }
1207
1208                        LIST_INSERT_HEAD(&grp->sg_idlethreads, st, st_ilink);
1209                        if (ismaster || (!ismaster &&
1210                            grp->sg_threadcount > grp->sg_minthreads))
1211                                error = cv_timedwait_sig(&st->st_cond,
1212                                    &grp->sg_lock, 5 * hz);
1213                        else
1214                                error = cv_wait_sig(&st->st_cond,
1215                                    &grp->sg_lock);
1216                        if (st->st_xprt == NULL)
1217                                LIST_REMOVE(st, st_ilink);
1218
1219                        /*
1220                         * Reduce worker thread count when idle.
1221                         */
1222                        if (error == EWOULDBLOCK) {
1223                                if (!ismaster
1224                                    && (grp->sg_threadcount
1225                                        > grp->sg_minthreads)
1226                                        && !st->st_xprt)
1227                                        break;
1228                        } else if (error != 0) {
1229                                KASSERT(error == EINTR || error == ERESTART,
1230                                    ("non-signal error %d", error));
1231#ifdef __rtems__
1232                                mtx_unlock(&grp->sg_lock);
1233                                svc_exit(pool);
1234                                mtx_lock(&grp->sg_lock);
1235#else /* __rtems__ */
1236                                mtx_unlock(&grp->sg_lock);
1237                                p = curproc;
1238                                PROC_LOCK(p);
1239                                if (P_SHOULDSTOP(p) ||
1240                                    (p->p_flag & P_TOTAL_STOP) != 0) {
1241                                        thread_suspend_check(0);
1242                                        PROC_UNLOCK(p);
1243                                        mtx_lock(&grp->sg_lock);
1244                                } else {
1245                                        PROC_UNLOCK(p);
1246                                        svc_exit(pool);
1247                                        mtx_lock(&grp->sg_lock);
1248                                        break;
1249                                }
1250#endif /* __rtems__ */
1251                        }
1252                        continue;
1253                }
1254                mtx_unlock(&grp->sg_lock);
1255
1256                /*
1257                 * Drain the transport socket and queue up any RPCs.
1258                 */
1259                xprt->xp_lastactive = time_uptime;
1260                do {
1261                        if (!svc_request_space_available(pool))
1262                                break;
1263                        rqstp = NULL;
1264                        stat = svc_getreq(xprt, &rqstp);
1265                        if (rqstp) {
1266                                svc_change_space_used(pool, rqstp->rq_size);
1267                                /*
1268                                 * See if the application has a preference
1269                                 * for some other thread.
1270                                 */
1271                                if (pool->sp_assign) {
1272                                        stpref = pool->sp_assign(st, rqstp);
1273                                        rqstp->rq_thread = stpref;
1274                                        STAILQ_INSERT_TAIL(&stpref->st_reqs,
1275                                            rqstp, rq_link);
1276                                        mtx_unlock(&stpref->st_lock);
1277                                        if (stpref != st)
1278                                                rqstp = NULL;
1279                                } else {
1280                                        rqstp->rq_thread = st;
1281                                        STAILQ_INSERT_TAIL(&st->st_reqs,
1282                                            rqstp, rq_link);
1283                                }
1284                        }
1285                } while (rqstp == NULL && stat == XPRT_MOREREQS
1286                    && grp->sg_state != SVCPOOL_CLOSING);
1287
1288                /*
1289                 * Move this transport to the end of the active list to
1290                 * ensure fairness when multiple transports are active.
1291                 * If this was the last queued request, svc_getreq will end
1292                 * up calling xprt_inactive to remove from the active list.
1293                 */
1294                mtx_lock(&grp->sg_lock);
1295                xprt->xp_thread = NULL;
1296                st->st_xprt = NULL;
1297                if (xprt->xp_active) {
1298                        if (!svc_request_space_available(pool) ||
1299                            !xprt_assignthread(xprt))
1300                                TAILQ_INSERT_TAIL(&grp->sg_active,
1301                                    xprt, xp_alink);
1302                }
1303                mtx_unlock(&grp->sg_lock);
1304                SVC_RELEASE(xprt);
1305
1306                /*
1307                 * Execute what we have queued.
1308                 */
1309                mtx_lock(&st->st_lock);
1310                while ((rqstp = STAILQ_FIRST(&st->st_reqs)) != NULL) {
1311                        STAILQ_REMOVE_HEAD(&st->st_reqs, rq_link);
1312                        mtx_unlock(&st->st_lock);
1313                        sz = (long)rqstp->rq_size;
1314                        svc_executereq(rqstp);
1315                        svc_change_space_used(pool, -sz);
1316                        mtx_lock(&st->st_lock);
1317                }
1318                mtx_unlock(&st->st_lock);
1319                mtx_lock(&grp->sg_lock);
1320        }
1321
1322        if (st->st_xprt) {
1323                xprt = st->st_xprt;
1324                st->st_xprt = NULL;
1325                SVC_RELEASE(xprt);
1326        }
1327        KASSERT(STAILQ_EMPTY(&st->st_reqs), ("stray reqs on exit"));
1328        mtx_destroy(&st->st_lock);
1329        cv_destroy(&st->st_cond);
1330        mem_free(st, sizeof(*st));
1331
1332        grp->sg_threadcount--;
1333        if (!ismaster)
1334                wakeup(grp);
1335        mtx_unlock(&grp->sg_lock);
1336}
1337
1338static void
1339svc_thread_start(void *arg)
1340{
1341
1342        svc_run_internal((SVCGROUP *) arg, FALSE);
1343        kthread_exit();
1344}
1345
1346static void
1347svc_new_thread(SVCGROUP *grp)
1348{
1349        SVCPOOL *pool = grp->sg_pool;
1350        struct thread *td;
1351
1352        mtx_lock(&grp->sg_lock);
1353        grp->sg_threadcount++;
1354        mtx_unlock(&grp->sg_lock);
1355        kthread_add(svc_thread_start, grp, pool->sp_proc, &td, 0, 0,
1356            "%s: service", pool->sp_name);
1357}
1358
1359void
1360svc_run(SVCPOOL *pool)
1361{
1362        int g, i;
1363#ifndef __rtems__
1364        struct proc *p;
1365        struct thread *td;
1366#endif /* __rtems__ */
1367        SVCGROUP *grp;
1368
1369#ifndef __rtems__
1370        p = curproc;
1371        td = curthread;
1372        snprintf(td->td_name, sizeof(td->td_name),
1373            "%s: master", pool->sp_name);
1374#endif /* __rtems__ */
1375
1376        pool->sp_state = SVCPOOL_ACTIVE;
1377#ifndef __rtems__
1378        pool->sp_proc = p;
1379#else /* __rtems__ */
1380        pool->sp_proc = NULL;
1381#endif /* __rtems__ */
1382
1383        /* Choose group count based on number of threads and CPUs. */
1384        pool->sp_groupcount = max(1, min(SVC_MAXGROUPS,
1385            min(pool->sp_maxthreads / 2, mp_ncpus) / 6));
1386        for (g = 0; g < pool->sp_groupcount; g++) {
1387                grp = &pool->sp_groups[g];
1388                grp->sg_minthreads = max(1,
1389                    pool->sp_minthreads / pool->sp_groupcount);
1390                grp->sg_maxthreads = max(1,
1391                    pool->sp_maxthreads / pool->sp_groupcount);
1392                grp->sg_lastcreatetime = time_uptime;
1393        }
1394
1395        /* Starting threads */
1396        pool->sp_groups[0].sg_threadcount++;
1397        for (g = 0; g < pool->sp_groupcount; g++) {
1398                grp = &pool->sp_groups[g];
1399                for (i = ((g == 0) ? 1 : 0); i < grp->sg_minthreads; i++)
1400                        svc_new_thread(grp);
1401        }
1402        svc_run_internal(&pool->sp_groups[0], TRUE);
1403
1404        /* Waiting for threads to stop. */
1405        for (g = 0; g < pool->sp_groupcount; g++) {
1406                grp = &pool->sp_groups[g];
1407                mtx_lock(&grp->sg_lock);
1408                while (grp->sg_threadcount > 0)
1409                        msleep(grp, &grp->sg_lock, 0, "svcexit", 0);
1410                mtx_unlock(&grp->sg_lock);
1411        }
1412}
1413
1414void
1415svc_exit(SVCPOOL *pool)
1416{
1417        SVCGROUP *grp;
1418        SVCTHREAD *st;
1419        int g;
1420
1421        pool->sp_state = SVCPOOL_CLOSING;
1422        for (g = 0; g < pool->sp_groupcount; g++) {
1423                grp = &pool->sp_groups[g];
1424                mtx_lock(&grp->sg_lock);
1425                if (grp->sg_state != SVCPOOL_CLOSING) {
1426                        grp->sg_state = SVCPOOL_CLOSING;
1427                        LIST_FOREACH(st, &grp->sg_idlethreads, st_ilink)
1428                                cv_signal(&st->st_cond);
1429                }
1430                mtx_unlock(&grp->sg_lock);
1431        }
1432}
1433
1434bool_t
1435svc_getargs(struct svc_req *rqstp, xdrproc_t xargs, void *args)
1436{
1437        struct mbuf *m;
1438        XDR xdrs;
1439        bool_t stat;
1440
1441        m = rqstp->rq_args;
1442        rqstp->rq_args = NULL;
1443
1444        xdrmbuf_create(&xdrs, m, XDR_DECODE);
1445        stat = xargs(&xdrs, args);
1446        XDR_DESTROY(&xdrs);
1447
1448        return (stat);
1449}
1450
1451bool_t
1452svc_freeargs(struct svc_req *rqstp, xdrproc_t xargs, void *args)
1453{
1454        XDR xdrs;
1455
1456        if (rqstp->rq_addr) {
1457                free(rqstp->rq_addr, M_SONAME);
1458                rqstp->rq_addr = NULL;
1459        }
1460
1461        xdrs.x_op = XDR_FREE;
1462        return (xargs(&xdrs, args));
1463}
1464
1465void
1466svc_freereq(struct svc_req *rqstp)
1467{
1468        SVCTHREAD *st;
1469        SVCPOOL *pool;
1470
1471        st = rqstp->rq_thread;
1472        if (st) {
1473                pool = st->st_pool;
1474                if (pool->sp_done)
1475                        pool->sp_done(st, rqstp);
1476        }
1477
1478        if (rqstp->rq_auth.svc_ah_ops)
1479                SVCAUTH_RELEASE(&rqstp->rq_auth);
1480
1481        if (rqstp->rq_xprt) {
1482                SVC_RELEASE(rqstp->rq_xprt);
1483        }
1484
1485        if (rqstp->rq_addr)
1486                free(rqstp->rq_addr, M_SONAME);
1487
1488        if (rqstp->rq_args)
1489                m_freem(rqstp->rq_args);
1490
1491        free(rqstp, M_RPC);
1492}
Note: See TracBrowser for help on using the repository browser.