source: rtems/cpukit/libfs/src/nfsclient/src/rpcio.c @ 4ac5ffbb

5
Last change on this file since 4ac5ffbb was 03e5a780, checked in by Sebastian Huber <sebastian.huber@…>, on 12/15/17 at 05:20:09

NFS: Use self-contained recursive mutex

Update #2843.

  • Property mode set to 100644
File size: 43.9 KB
Line 
1/**
2 * @file
3 *
4 * @brief RPC Multiplexor for a Multitasking Environment
5 * @ingroup libfs
6 *
7 * This code funnels arbitrary task's UDP/RPC requests
8 * through one socket to arbitrary servers.
9 * The replies are gathered and dispatched to the
10 * requestors.
11 * One task handles all the sending and receiving
12 * work including retries.
13 * It is up to the requestor, however, to do
14 * the XDR encoding of the arguments / decoding
15 * of the results (except for the RPC header which
16 * is handled by the daemon).
17 */
18
19/*
20 * Author: Till Straumann <strauman@slac.stanford.edu>, 2002
21 *
22 * Authorship
23 * ----------
24 * This software (NFS-2 client implementation for RTEMS) was created by
25 *     Till Straumann <strauman@slac.stanford.edu>, 2002-2007,
26 *         Stanford Linear Accelerator Center, Stanford University.
27 *
28 * Acknowledgement of sponsorship
29 * ------------------------------
30 * The NFS-2 client implementation for RTEMS was produced by
31 *     the Stanford Linear Accelerator Center, Stanford University,
32 *         under Contract DE-AC03-76SFO0515 with the Department of Energy.
33 *
34 * Government disclaimer of liability
35 * ----------------------------------
36 * Neither the United States nor the United States Department of Energy,
37 * nor any of their employees, makes any warranty, express or implied, or
38 * assumes any legal liability or responsibility for the accuracy,
39 * completeness, or usefulness of any data, apparatus, product, or process
40 * disclosed, or represents that its use would not infringe privately owned
41 * rights.
42 *
43 * Stanford disclaimer of liability
44 * --------------------------------
45 * Stanford University makes no representations or warranties, express or
46 * implied, nor assumes any liability for the use of this software.
47 *
48 * Stanford disclaimer of copyright
49 * --------------------------------
50 * Stanford University, owner of the copyright, hereby disclaims its
51 * copyright and all other rights in this software.  Hence, anyone may
52 * freely use it for any purpose without restriction.
53 *
54 * Maintenance of notices
55 * ----------------------
56 * In the interest of clarity regarding the origin and status of this
57 * SLAC software, this and all the preceding Stanford University notices
58 * are to remain affixed to any copy or derivative of this software made
59 * or distributed by the recipient and are to be affixed to any copy of
60 * software made or distributed by the recipient that contains a copy or
61 * derivative of this software.
62 *
63 * ------------------ SLAC Software Notices, Set 4 OTT.002a, 2004 FEB 03
64 */
65
66#if HAVE_CONFIG_H
67#include "config.h"
68#endif
69
70#include <inttypes.h>
71
72#include <rtems.h>
73#include <rtems/error.h>
74#include <rtems/rtems_bsdnet.h>
75#include <rtems/thread.h>
76#include <stdlib.h>
77#include <time.h>
78#include <rpc/rpc.h>
79#include <rpc/pmap_prot.h>
80#include <errno.h>
81#include <sys/ioctl.h>
82#include <assert.h>
83#include <stdio.h>
84#include <errno.h>
85#include <inttypes.h>
86#include <string.h>
87#include <netinet/in.h>
88#include <arpa/inet.h>
89#include <sys/cpuset.h>
90
91#include "rpcio.h"
92#include "nfsclient-private.h"
93
94/****************************************************************/
95/* CONFIGURABLE PARAMETERS                                      */
96/****************************************************************/
97
98#define MBUF_RX                 /* If defined: use mbuf XDR stream for
99                                                 *  decoding directly out of mbufs
100                                                 *  Otherwise, the regular 'recvfrom()'
101                                                 *  interface will be used involving an
102                                                 *  extra buffer allocation + copy step.
103                                                 */
104
105#define MBUF_TX                 /* If defined: avoid copying data when
106                                                 *  sending. Instead, use a wrapper to
107                                                 *  'sosend()' which will point an MBUF
108                                                 *  directly to our buffer space.
109                                                 *  Note that the BSD stack does not copy
110                                                 *  data when fragmenting packets - it
111                                                 *  merely uses an mbuf chain pointing
112                                                 *  into different areas of the data.
113                                                 *
114                                                 * If undefined, the regular 'sendto()'
115                                                 *  interface is used.
116                                                 */
117
118#undef REJECT_SERVERIP_MISMATCH
119                                                /* If defined, RPC replies must come from the server
120                                                 * that was queried. Eric Norum has reported problems
121                                                 * with clustered NFS servers. So we disable this
122                                                 * reducing paranoia...
123                                                 */
124
125/* daemon task parameters */
126#define RPCIOD_STACK            10000
127#define RPCIOD_PRIO                     100     /* *fallback* priority */
128
129/* depth of the message queue for sending
130 * RPC requests to the daemon
131 */
132#define RPCIOD_QDEPTH           20
133
134/* Maximum retry limit for retransmission */
135#define RPCIOD_RETX_CAP_S       3 /* seconds */
136
137/* Default timeout for RPC calls */
138#define RPCIOD_DEFAULT_TIMEOUT  (&_rpc_default_timeout)
139static struct timeval _rpc_default_timeout = { 10 /* secs */, 0 /* usecs */ };
140
141/* how many times should we try to resend a failed
142 * transaction with refreshed AUTHs
143 */
144#define RPCIOD_REFRESH          2
145
146/* Events we are using; the RPC_EVENT
147 * MUST NOT be used by any application
148 * thread doing RPC IO (e.g. NFS)
149 */
150#define RTEMS_RPC_EVENT         RTEMS_EVENT_30  /* THE event used by RPCIO. Every task doing
151                                                                                         * RPC IO will receive this - hence it is
152                                                                                         * RESERVED
153                                                                                         */
154#define RPCIOD_RX_EVENT         RTEMS_EVENT_1   /* Events the RPCIOD is using/waiting for */
155#define RPCIOD_TX_EVENT         RTEMS_EVENT_2
156#define RPCIOD_KILL_EVENT       RTEMS_EVENT_3   /* send to the daemon to kill it          */
157
158#define LD_XACT_HASH            8                               /* ld of the size of the transaction hash table  */
159
160
161/* Debugging Flags                                              */
162
163/* NOTE: defining DEBUG 0 leaves some 'assert()' paranoia checks
164 *       but produces no output
165 */
166
167#define DEBUG_TRACE_XACT        (1<<0)
168#define DEBUG_EVENTS            (1<<1)
169#define DEBUG_MALLOC            (1<<2)
170#define DEBUG_TIMEOUT           (1<<3)
171#define DEBUG_PACKLOSS          (1<<4)  /* This introduces random, artificial packet losses to test retransmission */
172
173#define DEBUG_PACKLOSS_FRACT (0xffffffff/10)
174
175/* USE PARENTHESIS WHEN 'or'ing MULTIPLE FLAGS: (DEBUG_XX | DEBUG_YY) */
176#define DEBUG                           (0)
177
178/****************************************************************/
179/* END OF CONFIGURABLE SECTION                                  */
180/****************************************************************/
181
182/* prevent rollover of our timers by readjusting the epoch on the fly */
183#if     (DEBUG) & DEBUG_TIMEOUT
184#define RPCIOD_EPOCH_SECS       10
185#else
186#define RPCIOD_EPOCH_SECS       10000
187#endif
188
189#ifdef  DEBUG
190#define ASSERT(arg)                     assert(arg)
191#else
192#define ASSERT(arg)                     if (arg)
193#endif
194
195/****************************************************************/
196/* MACROS                                                       */
197/****************************************************************/
198
199
200#define XACT_HASHS              (1<<(LD_XACT_HASH))     /* the hash table size derived from the ld       */
201#define XACT_HASH_MSK   ((XACT_HASHS)-1)        /* mask to extract the hash index from a RPC-XID */
202
203
204#define MU_LOCK(mutex)          rtems_recursive_mutex_lock(&(mutex))
205
206#define MU_UNLOCK(mutex)        rtems_recursive_mutex_unlock(&(mutex))
207
208#define MU_CREAT(pmutex)        rtems_recursive_mutex_init((pmutex), "RPCl")
209
210#define MU_DESTROY(mutex)       rtems_recursive_mutex_destroy(&(mutex))
211
212#define FIRST_ATTEMPT           0x88888888 /* some time that is never reached */
213
214/****************************************************************/
215/* TYPE DEFINITIONS                                             */
216/****************************************************************/
217
218typedef rtems_interval          TimeoutT;
219
220/* 100000th implementation of a doubly linked list;
221 * since only one thread is looking at these,
222 * we need no locking
223 */
224typedef struct ListNodeRec_ {
225        struct ListNodeRec_ *next, *prev;
226} ListNodeRec, *ListNode;
227
228
229/* Structure representing an RPC server */
230typedef struct RpcUdpServerRec_ {
231                RpcUdpServer            next;                   /* linked list of all servers; protected by hlock */
232                union {
233                struct sockaddr_in      sin;
234                struct sockaddr     sa;
235                }                                       addr;
236                AUTH                            *auth;
237                rtems_recursive_mutex                   authlock;               /* must MUTEX the auth object - it's not clear
238                                                                                         *  what is better:
239                                                                                         *   1 having one (MUTEXed) auth per server
240                                                                                         *         who is shared among all transactions
241                                                                                         *         using that server
242                                                                                         *       2 maintaining an AUTH per transaction
243                                                                                         *         (there are then other options: manage
244                                                                                         *         XACT pools on a per-server basis instead
245                                                                                         *         of associating a server with a XACT when
246                                                                                         *   sending)
247                                                                                         * experience will show if the current (1)
248                                                                                         * approach has to be changed.
249                                                                                         */
250                TimeoutT                        retry_period;   /* dynamically adjusted retry period
251                                                                                         * (based on packet roundtrip time)
252                                                                                         */
253                /* STATISTICS */
254                unsigned long           retrans;                /* how many retries were issued by this server         */
255                unsigned long           requests;               /* how many requests have been sent                    */
256                unsigned long       timeouts;           /* how many requests have timed out                    */
257                unsigned long       errors;         /* how many errors have occurred (other than timeouts) */
258                char                            name[20];               /* server's address in IP 'dot' notation               */
259} RpcUdpServerRec;
260
261typedef union  RpcBufU_ {
262                uint32_t                        xid;
263                char                            buf[1];
264} RpcBufU, *RpcBuf;
265
266/* RX Buffer implementation; this is either
267 * an MBUF chain (MBUF_RX configuration)
268 * or a buffer allocated from the heap
269 * where recvfrom copies the (encoded) reply
270 * to. The XDR routines the copy/decode
271 * it into the user's data structures.
272 */
273#ifdef MBUF_RX
274typedef struct mbuf *           RxBuf;  /* an MBUF chain */
275static  void                            bufFree(struct mbuf **m);
276#define XID(ibuf)                       (*(mtod((ibuf), u_long *)))
277#else
278typedef RpcBuf                          RxBuf;
279#define bufFree(b)                      do { MY_FREE(*(b)); *(b)=0; } while(0)
280#define XID(ibuf)                       ((ibuf)->xid)
281#endif
282
283/* A RPC 'transaction' consisting
284 * of server and requestor information,
285 * buffer space and an XDR object
286 * (for encoding arguments).
287 */
288typedef struct RpcUdpXactRec_ {
289                ListNodeRec                     node;           /* so we can put XACTs on a list                */
290                RpcUdpServer            server;         /* server this XACT goes to                     */
291                long                            lifetime;       /* during the lifetime, retry attempts are made */
292                long                            tolive;         /* lifetime timer                               */
293                struct rpc_err          status;         /* RPC reply error status                       */
294                long                            age;            /* age info; needed to manage retransmission    */
295                long                            trip;           /* record round trip time in ticks              */
296                rtems_id                        requestor;      /* the task waiting for this XACT to complete   */
297                RpcUdpXactPool          pool;           /* if this XACT belong to a pool, this is it    */
298                XDR                                     xdrs;           /* argument encoder stream                      */
299                int                                     xdrpos;     /* stream position after the (permanent) header */
300                xdrproc_t                       xres;           /* reply decoder proc - TODO needn't be here    */
301                caddr_t                         pres;           /* reply decoded obj  - TODO needn't be here    */
302#ifndef MBUF_RX
303                int                                     ibufsize;       /* size of the ibuf (bytes)                     */
304#endif
305#ifdef  MBUF_TX
306                int                                     refcnt;         /* mbuf external storage reference count        */
307#endif
308                int                                     obufsize;       /* size of the obuf (bytes)                     */
309                RxBuf                           ibuf;           /* pointer to input buffer assigned by daemon   */
310                RpcBufU                         obuf;       /* output buffer (encoded args) APPENDED HERE   */
311} RpcUdpXactRec;
312
313typedef struct RpcUdpXactPoolRec_ {
314        rtems_id        box;
315        int                     prog;
316        int                     version;
317        int                     xactSize;
318} RpcUdpXactPoolRec;
319
320/* a global hash table where all 'living' transaction
321 * objects are registered.
322 * A number of bits in a transaction's XID maps 1:1 to
323 * an index in this table. Hence, the XACT matching
324 * an RPC/UDP reply packet can quickly be found
325 * The size of this table imposes a hard limit on the
326 * number of all created transactions in the system.
327 */
328static RpcUdpXact xactHashTbl[XACT_HASHS]={0};
329static u_long     xidUpper   [XACT_HASHS]={0};
330static unsigned   xidHashSeed            = 0 ;
331
332/* forward declarations */
333static RpcUdpXact
334sockRcv(void);
335
336static void
337rpcio_daemon(rtems_task_argument);
338
339#ifdef MBUF_TX
340ssize_t
341sendto_nocpy (
342                int s,
343                const void *buf, size_t buflen,
344                int flags,
345                const struct sockaddr *toaddr, int tolen,
346                void *closure,
347                void (*freeproc)(caddr_t, u_int),
348                void (*refproc)(caddr_t, u_int)
349);
350static void paranoia_free(caddr_t closure, u_int size);
351static void paranoia_ref (caddr_t closure, u_int size);
352#define SENDTO  sendto_nocpy
353#else
354#define SENDTO  sendto
355#endif
356
357static RpcUdpServer             rpcUdpServers = 0;      /* linked list of all servers; protected by llock */
358
359static int                              ourSock = -1;           /* the socket we are using for communication */
360static rtems_id                 rpciod  = 0;            /* task id of the RPC daemon                 */
361static rtems_id                 msgQ    = 0;            /* message queue where the daemon picks up
362                                                                                         * requests
363                                                                                         */
364#ifndef NDEBUG
365static rtems_recursive_mutex    llock;          /* MUTEX protecting the server list */
366static rtems_recursive_mutex    hlock;          /* MUTEX protecting the hash table and the list of servers */
367#endif
368static rtems_binary_semaphore   fini    = RTEMS_BINARY_SEMAPHORE_INITIALIZER("RPCf");   /* a synchronization semaphore we use during
369                                                                                         * module cleanup / driver unloading
370                                                                                         */
371static rtems_interval   ticksPerSec;            /* cached system clock rate (WHO IS ASSUMED NOT
372                                                                                         * TO CHANGE)
373                                                                                         */
374
375rtems_task_priority             rpciodPriority = 0;
376#ifdef RTEMS_SMP
377const cpu_set_t                 *rpciodCpuset = 0;
378size_t                          rpciodCpusetSize = 0;
379#endif
380
381#if (DEBUG) & DEBUG_MALLOC
382/* malloc wrappers for debugging */
383static int nibufs = 0;
384
385static inline void *MY_MALLOC(int s)
386{
387        if (s) {
388                void *rval;
389                MU_LOCK(hlock);
390                assert(nibufs++ < 2000);
391                MU_UNLOCK(hlock);
392                assert((rval = malloc(s)) != 0);
393                return rval;
394        }
395        return 0;
396}
397
398static inline void *MY_CALLOC(int n, int s)
399{
400        if (s) {
401                void *rval;
402                MU_LOCK(hlock);
403                assert(nibufs++ < 2000);
404                MU_UNLOCK(hlock);
405                assert((rval = calloc(n,s)) != 0);
406                return rval;
407        }
408        return 0;
409}
410
411
412static inline void MY_FREE(void *p)
413{
414        if (p) {
415                MU_LOCK(hlock);
416                nibufs--;
417                MU_UNLOCK(hlock);
418                free(p);
419        }
420}
421#else
422#define MY_MALLOC       malloc
423#define MY_CALLOC       calloc
424#define MY_FREE         free
425#endif
426
427static inline bool_t
428locked_marshal(RpcUdpServer s, XDR *xdrs)
429{
430bool_t rval;
431        MU_LOCK(s->authlock);
432        rval = AUTH_MARSHALL(s->auth, xdrs);
433        MU_UNLOCK(s->authlock);
434        return rval;
435}
436
437/* Locked operations on a server's auth object */
438static inline bool_t
439locked_validate(RpcUdpServer s, struct opaque_auth *v)
440{
441bool_t rval;
442        MU_LOCK(s->authlock);
443        rval = AUTH_VALIDATE(s->auth, v);
444        MU_UNLOCK(s->authlock);
445        return rval;
446}
447
448static inline bool_t
449locked_refresh(RpcUdpServer s)
450{
451bool_t rval;
452        MU_LOCK(s->authlock);
453        rval = AUTH_REFRESH(s->auth);
454        MU_UNLOCK(s->authlock);
455        return rval;
456}
457
458/* Create a server object
459 *
460 */
461enum clnt_stat
462rpcUdpServerCreate(
463        struct sockaddr_in      *paddr,
464        rpcprog_t               prog,
465        rpcvers_t               vers,
466        u_long                  uid,
467        u_long                  gid,
468        RpcUdpServer            *psrv
469        )
470{
471RpcUdpServer    rval;
472u_short                 port;
473char                    hname[MAX_MACHINE_NAME + 1];
474int                             theuid, thegid;
475int                             thegids[NGRPS];
476gid_t                   gids[NGROUPS];
477int                             len,i;
478AUTH                    *auth;
479enum clnt_stat  pmap_err;
480struct pmap             pmaparg;
481
482        if ( gethostname(hname, MAX_MACHINE_NAME) ) {
483                fprintf(stderr,
484                                "RPCIO - error: I have no hostname ?? (%s)\n",
485                                strerror(errno));
486                return RPC_UNKNOWNHOST;
487        }
488
489        len = getgroups(NGROUPS, gids);
490        if (len < 0 ) {
491                fprintf(stderr,
492                                "RPCIO - error: I unable to get group ids (%s)\n",
493                                strerror(errno));
494                return RPC_FAILED;
495        }
496
497        if ( len > NGRPS )
498                len = NGRPS;
499
500        for (i=0; i<len; i++)
501                thegids[i] = (int)gids[i];
502
503        theuid = (int) ((RPCIOD_DEFAULT_ID == uid) ? geteuid() : uid);
504        thegid = (int) ((RPCIOD_DEFAULT_ID == gid) ? getegid() : gid);
505
506        if ( !(auth = authunix_create(hname, theuid, thegid, len, thegids)) ) {
507                fprintf(stderr,
508                                "RPCIO - error: unable to create RPC AUTH\n");
509                return RPC_FAILED;
510        }
511
512        /* if they specified no port try to ask the portmapper */
513        if (!paddr->sin_port) {
514
515                paddr->sin_port = htons(PMAPPORT);
516
517        pmaparg.pm_prog = prog;
518        pmaparg.pm_vers = vers;
519        pmaparg.pm_prot = IPPROTO_UDP;
520        pmaparg.pm_port = 0;  /* not needed or used */
521
522
523                /* dont use non-reentrant pmap_getport ! */
524
525                pmap_err = rpcUdpCallRp(
526                                                paddr,
527                                                PMAPPROG,
528                                                PMAPVERS,
529                                                PMAPPROC_GETPORT,
530                                                xdr_pmap,
531                                                &pmaparg,
532                                                xdr_u_short,
533                                                &port,
534                                                uid,
535                                                gid,
536                                                0);
537
538                if ( RPC_SUCCESS != pmap_err ) {
539                        paddr->sin_port = 0;
540                        return pmap_err;
541                }
542
543                paddr->sin_port = htons(port);
544        }
545
546        if (0==paddr->sin_port) {
547                        return RPC_PROGNOTREGISTERED;
548        }
549
550        rval                            = (RpcUdpServer)MY_MALLOC(sizeof(*rval));
551        memset(rval, 0, sizeof(*rval));
552
553        if (!inet_ntop(AF_INET, &paddr->sin_addr, rval->name, sizeof(rval->name)))
554                sprintf(rval->name,"?.?.?.?");
555        rval->addr.sin          = *paddr;
556
557        /* start with a long retransmission interval - it
558         * will be adapted dynamically
559         */
560        rval->retry_period  = RPCIOD_RETX_CAP_S * ticksPerSec;
561
562        rval->auth                      = auth;
563
564        MU_CREAT( &rval->authlock );
565
566        /* link into list */
567        MU_LOCK( llock );
568        rval->next = rpcUdpServers;
569        rpcUdpServers = rval;
570        MU_UNLOCK( llock );
571
572        *psrv                           = rval;
573        return RPC_SUCCESS;
574}
575
576void
577rpcUdpServerDestroy(RpcUdpServer s)
578{
579RpcUdpServer prev;
580        if (!s)
581                return;
582        /* we should probably verify (but how?) that nobody
583         * (at least: no outstanding XACTs) is using this
584         * server;
585         */
586
587        /* remove from server list */
588        MU_LOCK(llock);
589        prev = rpcUdpServers;
590        if ( s == prev ) {
591                rpcUdpServers = s->next;
592        } else {
593                for ( ; prev ; prev = prev->next) {
594                        if (prev->next == s) {
595                                prev->next = s->next;
596                                break;
597                        }
598                }
599        }
600        MU_UNLOCK(llock);
601
602        /* MUST have found it */
603        assert(prev);
604
605        auth_destroy(s->auth);
606
607        MU_DESTROY(s->authlock);
608        MY_FREE(s);
609}
610
611int
612rpcUdpStats(FILE *f)
613{
614RpcUdpServer s;
615
616        if (!f) f = stdout;
617
618        fprintf(f,"RPCIOD statistics:\n");
619
620        MU_LOCK(llock);
621        for (s = rpcUdpServers; s; s=s->next) {
622                fprintf(f,"\nServer -- %s:\n", s->name);
623                fprintf(f,"  requests    sent: %10ld, retransmitted: %10ld\n",
624                                                s->requests, s->retrans);
625                fprintf(f,"         timed out: %10ld,   send errors: %10ld\n",
626                                                s->timeouts, s->errors);
627                fprintf(f,"  current retransmission interval: %dms\n",
628                                                (unsigned)(s->retry_period * 1000 / ticksPerSec) );
629        }
630        MU_UNLOCK(llock);
631
632        return 0;
633}
634
635RpcUdpXact
636rpcUdpXactCreate(
637        u_long  program,
638        u_long  version,
639        u_long  size
640        )
641{
642RpcUdpXact              rval=0;
643struct rpc_msg  header;
644register int    i,j;
645
646        if (!size)
647                size = UDPMSGSIZE;
648        /* word align */
649        size = (size + 3) & ~3;
650
651        rval = (RpcUdpXact)MY_CALLOC(1,sizeof(*rval) - sizeof(rval->obuf) + size);
652
653        if (rval) {
654
655                header.rm_xid             = 0;
656                header.rm_direction       = CALL;
657                header.rm_call.cb_rpcvers = RPC_MSG_VERSION;
658                header.rm_call.cb_prog    = program;
659                header.rm_call.cb_vers    = version;
660                xdrmem_create(&(rval->xdrs), rval->obuf.buf, size, XDR_ENCODE);
661
662                if (!xdr_callhdr(&(rval->xdrs), &header)) {
663                        MY_FREE(rval);
664                        return 0;
665                }
666                /* pick a free table slot and initialize the XID */
667                MU_LOCK(hlock);
668                rval->obuf.xid = (xidHashSeed++ ^ ((uintptr_t)rval>>10)) & XACT_HASH_MSK;
669                i=j=(rval->obuf.xid & XACT_HASH_MSK);
670                if (msgQ) {
671                        /* if there's no message queue, refuse to
672                         * give them transactions; we might be in the process to
673                         * go away...
674                         */
675                        do {
676                                i=(i+1) & XACT_HASH_MSK; /* cheap modulo */
677                                if (!xactHashTbl[i]) {
678#if (DEBUG) & DEBUG_TRACE_XACT
679                                        fprintf(stderr,"RPCIO: entering index %i, val %x\n",i,rval);
680#endif
681                                        xactHashTbl[i]=rval;
682                                        j=-1;
683                                        break;
684                                }
685                        } while (i!=j);
686                }
687                MU_UNLOCK(hlock);
688                if (i==j) {
689                        XDR_DESTROY(&rval->xdrs);
690                        MY_FREE(rval);
691                        return 0;
692                }
693                rval->obuf.xid  = xidUpper[i] | i;
694                rval->xdrpos    = XDR_GETPOS(&(rval->xdrs));
695                rval->obufsize  = size;
696        }
697        return rval;
698}
699
700void
701rpcUdpXactDestroy(RpcUdpXact xact)
702{
703int i = xact->obuf.xid & XACT_HASH_MSK;
704
705#if (DEBUG) & DEBUG_TRACE_XACT
706                fprintf(stderr,"RPCIO: removing index %i, val %x\n",i,xact);
707#endif
708
709                ASSERT( xactHashTbl[i]==xact );
710
711                MU_LOCK(hlock);
712                xactHashTbl[i]=0;
713                /* remember XID we used last time so we can avoid
714                 * reusing the same one (incremented by rpcUdpSend routine)
715                 */
716                xidUpper[i]   = xact->obuf.xid;
717                MU_UNLOCK(hlock);
718
719                bufFree(&xact->ibuf);
720
721                XDR_DESTROY(&xact->xdrs);
722                MY_FREE(xact);
723}
724
725
726
727/* Send a transaction, i.e. enqueue it to the
728 * RPC daemon who will actually send it.
729 */
730enum clnt_stat
731rpcUdpSend(
732        RpcUdpXact              xact,
733        RpcUdpServer    srvr,
734        struct timeval  *timeout,
735        u_long                  proc,
736        xdrproc_t               xres, caddr_t pres,
737        xdrproc_t               xargs, caddr_t pargs,
738        ...
739   )
740{
741register XDR    *xdrs;
742unsigned long   ms;
743va_list                 ap;
744
745        va_start(ap,pargs);
746
747        if (!timeout)
748                timeout = RPCIOD_DEFAULT_TIMEOUT;
749
750        ms = 1000 * timeout->tv_sec + timeout->tv_usec/1000;
751
752        /* round lifetime to closest # of ticks */
753        xact->lifetime  = (ms * ticksPerSec + 500) / 1000;
754        if ( 0 == xact->lifetime )
755                xact->lifetime = 1;
756
757#if (DEBUG) & DEBUG_TIMEOUT
758        {
759        static int once=0;
760        if (!once++) {
761                fprintf(stderr,
762                                "Initial lifetime: %i (ticks)\n",
763                                xact->lifetime);
764        }
765        }
766#endif
767
768        xact->tolive    = xact->lifetime;
769
770        xact->xres      = xres;
771        xact->pres      = pres;
772        xact->server    = srvr;
773
774        xdrs            = &xact->xdrs;
775        xdrs->x_op      = XDR_ENCODE;
776        /* increment transaction ID */
777        xact->obuf.xid += XACT_HASHS;
778        XDR_SETPOS(xdrs, xact->xdrpos);
779        if ( !XDR_PUTLONG(xdrs,(long*)&proc) || !locked_marshal(srvr, xdrs) ||
780                 !xargs(xdrs, pargs) ) {
781                va_end(ap);
782                return(xact->status.re_status=RPC_CANTENCODEARGS);
783        }
784        while ((xargs=va_arg(ap,xdrproc_t))) {
785                if (!xargs(xdrs, va_arg(ap,caddr_t)))
786                va_end(ap);
787                return(xact->status.re_status=RPC_CANTENCODEARGS);
788        }
789
790        va_end(ap);
791
792        rtems_task_ident(RTEMS_SELF, RTEMS_WHO_AM_I, &xact->requestor);
793        if ( rtems_message_queue_send( msgQ, &xact, sizeof(xact)) ) {
794                return RPC_CANTSEND;
795        }
796        /* wakeup the rpciod */
797        ASSERT( RTEMS_SUCCESSFUL==rtems_event_send(rpciod, RPCIOD_TX_EVENT) );
798
799        return RPC_SUCCESS;
800}
801
802/* Block for the RPC reply to an outstanding
803 * transaction.
804 * The caller is woken by the RPC daemon either
805 * upon reception of the reply or on timeout.
806 */
807enum clnt_stat
808rpcUdpRcv(RpcUdpXact xact)
809{
810int                                     refresh;
811XDR                     reply_xdrs;
812struct rpc_msg          reply_msg;
813rtems_status_code       status;
814rtems_event_set         gotEvents;
815
816        refresh = 0;
817
818        do {
819
820        /* block for the reply */
821        status = rtems_event_receive(
822                RTEMS_RPC_EVENT,
823                RTEMS_WAIT | RTEMS_EVENT_ANY,
824                RTEMS_NO_TIMEOUT,
825                &gotEvents);
826        ASSERT( status == RTEMS_SUCCESSFUL );
827
828        if (xact->status.re_status) {
829#ifdef MBUF_RX
830                /* add paranoia */
831                ASSERT( !xact->ibuf );
832#endif
833                return xact->status.re_status;
834        }
835
836#ifdef MBUF_RX
837        xdrmbuf_create(&reply_xdrs, xact->ibuf, XDR_DECODE);
838#else
839        xdrmem_create(&reply_xdrs, xact->ibuf->buf, xact->ibufsize, XDR_DECODE);
840#endif
841
842        reply_msg.acpted_rply.ar_verf          = _null_auth;
843        reply_msg.acpted_rply.ar_results.where = xact->pres;
844        reply_msg.acpted_rply.ar_results.proc  = xact->xres;
845
846        if (xdr_replymsg(&reply_xdrs, &reply_msg)) {
847                /* OK */
848                _seterr_reply(&reply_msg, &xact->status);
849                if (RPC_SUCCESS == xact->status.re_status) {
850                        if ( !locked_validate(xact->server,
851                                                                &reply_msg.acpted_rply.ar_verf) ) {
852                                xact->status.re_status = RPC_AUTHERROR;
853                                xact->status.re_why    = AUTH_INVALIDRESP;
854                        }
855                        if (reply_msg.acpted_rply.ar_verf.oa_base) {
856                                reply_xdrs.x_op = XDR_FREE;
857                                xdr_opaque_auth(&reply_xdrs, &reply_msg.acpted_rply.ar_verf);
858                        }
859                        refresh = 0;
860                } else {
861                        /* should we try to refresh our credentials ? */
862                        if ( !refresh ) {
863                                /* had never tried before */
864                                refresh = RPCIOD_REFRESH;
865                        }
866                }
867        } else {
868                reply_xdrs.x_op        = XDR_FREE;
869                xdr_replymsg(&reply_xdrs, &reply_msg);
870                xact->status.re_status = RPC_CANTDECODERES;
871        }
872        XDR_DESTROY(&reply_xdrs);
873
874        bufFree(&xact->ibuf);
875
876#ifndef MBUF_RX
877        xact->ibufsize = 0;
878#endif
879
880        if (refresh && locked_refresh(xact->server)) {
881                rtems_task_ident(RTEMS_SELF, RTEMS_WHO_AM_I, &xact->requestor);
882                if ( rtems_message_queue_send(msgQ, &xact, sizeof(xact)) ) {
883                        return RPC_CANTSEND;
884                }
885                /* wakeup the rpciod */
886                fprintf(stderr,"RPCIO INFO: refreshing my AUTH\n");
887                ASSERT( RTEMS_SUCCESSFUL==rtems_event_send(rpciod, RPCIOD_TX_EVENT) );
888        }
889
890        } while ( 0 &&  refresh-- > 0 );
891
892        return xact->status.re_status;
893}
894
895
896/* On RTEMS, I'm told to avoid select(); this seems to
897 * be more efficient
898 */
899static void
900rxWakeupCB(struct socket *sock, void *arg)
901{
902  rtems_id *rpciod = (rtems_id*) arg;
903  rtems_event_send(*rpciod, RPCIOD_RX_EVENT);
904}
905
906void
907rpcSetXIDs(uint32_t xid)
908{
909        uint32_t i;
910
911        xid &= ~XACT_HASH_MSK;
912
913        for (i = 0; i < XACT_HASHS; ++i) {
914                xidUpper[i] = xid | i;
915        }
916}
917
918int
919rpcUdpInit(void)
920{
921int                     s;
922rtems_status_code       status;
923int                     noblock = 1;
924struct sockwakeup       wkup;
925
926        if (ourSock < 0) {
927    fprintf(stderr,"RTEMS-RPCIOD, " \
928            "Till Straumann, Stanford/SLAC/SSRL 2002, " \
929            "See LICENSE file for licensing info.\n");
930
931                ourSock=socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
932                if (ourSock>=0) {
933                        bindresvport(ourSock,(struct sockaddr_in*)0);
934                        s = ioctl(ourSock, FIONBIO, (char*)&noblock);
935                        assert( s == 0 );
936                        /* assume nobody tampers with the clock !! */
937                        ticksPerSec = rtems_clock_get_ticks_per_second();
938                        MU_CREAT( &hlock );
939                        MU_CREAT( &llock );
940
941                        if ( !rpciodPriority ) {
942                                /* use configured networking priority */
943                                if ( ! (rpciodPriority = rtems_bsdnet_config.network_task_priority) )
944                                        rpciodPriority = RPCIOD_PRIO;   /* fallback value */
945                        }
946
947                        status = rtems_task_create(
948                                                                                        rtems_build_name('R','P','C','d'),
949                                                                                        rpciodPriority,
950                                                                                        RPCIOD_STACK,
951                                                                                        RTEMS_DEFAULT_MODES,
952                                                                                        /* fprintf saves/restores FP registers on PPC :-( */
953                                                                                        RTEMS_DEFAULT_ATTRIBUTES | RTEMS_FLOATING_POINT,
954                                                                                        &rpciod);
955                        assert( status == RTEMS_SUCCESSFUL );
956
957#ifdef RTEMS_SMP
958                        if ( rpciodCpuset == 0 ) {
959                                rpciodCpuset = rtems_bsdnet_config.network_task_cpuset;
960                                rpciodCpusetSize = rtems_bsdnet_config.network_task_cpuset_size;
961                        }
962                        if ( rpciodCpuset != 0 )
963                                rtems_task_set_affinity( rpciod, rpciodCpusetSize, rpciodCpuset );
964#endif
965
966                        wkup.sw_pfn = rxWakeupCB;
967                        wkup.sw_arg = &rpciod;
968                        assert( 0==setsockopt(ourSock, SOL_SOCKET, SO_RCVWAKEUP, &wkup, sizeof(wkup)) );
969                        status = rtems_message_queue_create(
970                                                                                        rtems_build_name('R','P','C','q'),
971                                                                                        RPCIOD_QDEPTH,
972                                                                                        sizeof(RpcUdpXact),
973                                                                                        RTEMS_DEFAULT_ATTRIBUTES,
974                                                                                        &msgQ);
975                        assert( status == RTEMS_SUCCESSFUL );
976                        status = rtems_task_start( rpciod, rpcio_daemon, 0 );
977                        assert( status == RTEMS_SUCCESSFUL );
978
979                } else {
980                        return -1;
981                }
982        }
983        return 0;
984}
985
986int
987rpcUdpCleanup(void)
988{
989        rtems_event_send(rpciod, RPCIOD_KILL_EVENT);
990        /* synchronize with daemon */
991        rtems_binary_semaphore_wait_timed_ticks(&fini, 5*ticksPerSec);
992        /* if the message queue is still there, something went wrong */
993        if (!msgQ) {
994                rtems_task_delete(rpciod);
995        }
996        return (msgQ !=0);
997}
998
999/* Another API - simpler but less efficient.
1000 * For each RPCall, a server and a Xact
1001 * are created and destroyed on the fly.
1002 *
1003 * This should be used for infrequent calls
1004 * (e.g. a NFS mount request).
1005 *
1006 * This is roughly compatible with the original
1007 * clnt_call() etc. API - but it uses our
1008 * daemon and is fully reentrant.
1009 */
1010enum clnt_stat
1011rpcUdpClntCreate(
1012                struct sockaddr_in      *psaddr,
1013                rpcprog_t               prog,
1014                rpcvers_t               vers,
1015                u_long                  uid,
1016                u_long                  gid,
1017                RpcUdpClnt              *pclnt
1018)
1019{
1020RpcUdpXact              x;
1021RpcUdpServer    s;
1022enum clnt_stat  err;
1023
1024        if ( RPC_SUCCESS != (err=rpcUdpServerCreate(psaddr, prog, vers, uid, gid, &s)) )
1025                return err;
1026
1027        if ( !(x=rpcUdpXactCreate(prog, vers, UDPMSGSIZE)) ) {
1028                rpcUdpServerDestroy(s);
1029                return RPC_FAILED;
1030        }
1031        /* TODO: could maintain a server cache */
1032
1033        x->server = s;
1034
1035        *pclnt = x;
1036
1037        return RPC_SUCCESS;
1038}
1039
1040static void
1041rpcUdpClntDestroy(RpcUdpClnt xact)
1042{
1043        rpcUdpServerDestroy(xact->server);
1044        rpcUdpXactDestroy(xact);
1045}
1046
1047enum clnt_stat
1048rpcUdpClntCall(
1049        RpcUdpClnt              xact,
1050        u_long                  proc,
1051        XdrProcT                xargs,
1052        CaddrT                  pargs,
1053        XdrProcT                xres,
1054        CaddrT                  pres,
1055        struct timeval  *timeout
1056        )
1057{
1058enum clnt_stat  stat;
1059
1060                if ( (stat = rpcUdpSend(xact, xact->server, timeout, proc,
1061                                                                xres, pres,
1062                                                                xargs, pargs,
1063                                                                0)) ) {
1064                        fprintf(stderr,"RPCIO Send failed: %i\n",stat);
1065                        return stat;
1066                }
1067                return rpcUdpRcv(xact);
1068}
1069
1070/* a yet simpler interface */
1071enum clnt_stat
1072rpcUdpCallRp(
1073        struct sockaddr_in      *psrvr,
1074        u_long                          prog,
1075        u_long                          vers,
1076        u_long                          proc,
1077        XdrProcT                        xargs,
1078        CaddrT                          pargs,
1079        XdrProcT                        xres,
1080        CaddrT                          pres,
1081        u_long                          uid,            /* RPCIO_DEFAULT_ID picks default */
1082        u_long                          gid,            /* RPCIO_DEFAULT_ID picks default */
1083        struct timeval          *timeout        /* NULL picks default           */
1084)
1085{
1086RpcUdpClnt                      clp;
1087enum clnt_stat          stat;
1088
1089        stat = rpcUdpClntCreate(
1090                                psrvr,
1091                                prog,
1092                                vers,
1093                                uid,
1094                                gid,
1095                                &clp);
1096
1097        if ( RPC_SUCCESS != stat )
1098                return stat;
1099
1100        stat = rpcUdpClntCall(
1101                                clp,
1102                                proc,
1103                                xargs, pargs,
1104                                xres,  pres,
1105                                timeout);
1106
1107        rpcUdpClntDestroy(clp);
1108
1109        return stat;
1110}
1111
1112/* linked list primitives */
1113static void
1114nodeXtract(ListNode n)
1115{
1116        if (n->prev)
1117                n->prev->next = n->next;
1118        if (n->next)
1119                n->next->prev = n->prev;
1120        n->next = n->prev = 0;
1121}
1122
1123static void
1124nodeAppend(ListNode l, ListNode n)
1125{
1126        if ( (n->next = l->next) )
1127                n->next->prev = n;
1128        l->next = n;
1129        n->prev = l;
1130
1131}
1132
1133/* this code does the work */
1134static void
1135rpcio_daemon(rtems_task_argument arg)
1136{
1137rtems_status_code stat;
1138RpcUdpXact        xact;
1139RpcUdpServer      srv;
1140rtems_interval    next_retrans, then, unow;
1141long                                            now;    /* need to do signed comparison with age! */
1142rtems_event_set   events;
1143ListNode          newList;
1144size_t            size;
1145rtems_id          q          =  0;
1146ListNodeRec       listHead   = {0, 0};
1147unsigned long     epoch      = RPCIOD_EPOCH_SECS * ticksPerSec;
1148unsigned long                   max_period = RPCIOD_RETX_CAP_S * ticksPerSec;
1149rtems_status_code       status;
1150
1151
1152        then = rtems_clock_get_ticks_since_boot();
1153
1154        for (next_retrans = epoch;;) {
1155
1156                if ( RTEMS_SUCCESSFUL !=
1157                         (stat = rtems_event_receive(
1158                                                RPCIOD_RX_EVENT | RPCIOD_TX_EVENT | RPCIOD_KILL_EVENT,
1159                                                RTEMS_WAIT | RTEMS_EVENT_ANY,
1160                                                next_retrans,
1161                                                &events)) ) {
1162                        ASSERT( RTEMS_TIMEOUT == stat );
1163                        events = 0;
1164                }
1165
1166                if (events & RPCIOD_KILL_EVENT) {
1167                        int i;
1168
1169#if (DEBUG) & DEBUG_EVENTS
1170                        fprintf(stderr,"RPCIO: got KILL event\n");
1171#endif
1172
1173                        MU_LOCK(hlock);
1174                        for (i=XACT_HASHS-1; i>=0; i--) {
1175                                if (xactHashTbl[i]) {
1176                                        break;
1177                                }
1178                        }
1179                        if (i<0) {
1180                                /* prevent them from creating and enqueueing more messages */
1181                                q=msgQ;
1182                                /* messages queued after we executed this assignment will fail */
1183                                msgQ=0;
1184                        }
1185                        MU_UNLOCK(hlock);
1186                        if (i>=0) {
1187                                fprintf(stderr,"RPCIO There are still transactions circulating; I refuse to go away\n");
1188                                fprintf(stderr,"(1st in slot %i)\n",i);
1189                                rtems_binary_semaphore_post(&fini);
1190                        } else {
1191                                break;
1192                        }
1193                }
1194
1195                unow = rtems_clock_get_ticks_since_boot();
1196
1197                /* measure everything relative to then to protect against
1198                 * rollover
1199                 */
1200                now = unow - then;
1201
1202                /* NOTE: we don't lock the hash table while we are operating
1203                 * on transactions; the paradigm is that we 'own' a particular
1204                 * transaction (and hence it's hash table slot) from the
1205                 * time the xact was put into the message queue until we
1206                 * wake up the requestor.
1207                 */
1208
1209                if (RPCIOD_RX_EVENT & events) {
1210
1211#if (DEBUG) & DEBUG_EVENTS
1212                        fprintf(stderr,"RPCIO: got RX event\n");
1213#endif
1214
1215                        while ((xact=sockRcv())) {
1216
1217                                /* extract from the retransmission list */
1218                                nodeXtract(&xact->node);
1219
1220                                /* change the ID - there might already be
1221                                 * a retransmission on the way. When it's
1222                                 * reply arrives we must not find it's ID
1223                                 * in the hashtable
1224                                 */
1225                                xact->obuf.xid        += XACT_HASHS;
1226
1227                                xact->status.re_status = RPC_SUCCESS;
1228
1229                                /* calculate roundtrip ticks */
1230                                xact->trip             = now - xact->trip;
1231
1232                                srv                    = xact->server;
1233
1234                                /* adjust the server's retry period */
1235                                {
1236                                        register TimeoutT rtry = srv->retry_period;
1237                                        register TimeoutT trip = xact->trip;
1238
1239                                        ASSERT( trip >= 0 );
1240
1241                                        if ( 0==trip )
1242                                                trip = 1;
1243
1244                                        /* retry_new = 0.75*retry_old + 0.25 * 8 * roundrip */
1245                                        rtry   = (3*rtry + (trip << 3)) >> 2;
1246
1247                                        if ( rtry > max_period )
1248                                                rtry = max_period;
1249
1250                                        srv->retry_period = rtry;
1251                                }
1252
1253                                /* wakeup requestor */
1254                                rtems_event_send(xact->requestor, RTEMS_RPC_EVENT);
1255                        }
1256                }
1257
1258                if (RPCIOD_TX_EVENT & events) {
1259
1260#if (DEBUG) & DEBUG_EVENTS
1261                        fprintf(stderr,"RPCIO: got TX event\n");
1262#endif
1263
1264                        while (RTEMS_SUCCESSFUL == rtems_message_queue_receive(
1265                                                                                        msgQ,
1266                                                                                        &xact,
1267                                                                                        &size,
1268                                                                                        RTEMS_NO_WAIT,
1269                                                                                        RTEMS_NO_TIMEOUT)) {
1270                                /* put to the head of timeout q */
1271                                nodeAppend(&listHead, &xact->node);
1272
1273                                xact->age  = now;
1274                                xact->trip = FIRST_ATTEMPT;
1275                        }
1276                }
1277
1278
1279                /* work the timeout q */
1280                newList = 0;
1281                for ( xact=(RpcUdpXact)listHead.next;
1282                          xact && xact->age <= now;
1283                          xact=(RpcUdpXact)listHead.next ) {
1284
1285                                /* extract from the list */
1286                                nodeXtract(&xact->node);
1287
1288                                srv = xact->server;
1289
1290                                if (xact->tolive < 0) {
1291                                        /* this one timed out */
1292                                        xact->status.re_errno  = ETIMEDOUT;
1293                                        xact->status.re_status = RPC_TIMEDOUT;
1294
1295                                        srv->timeouts++;
1296
1297                                        /* Change the ID - there might still be
1298                                         * a reply on the way. When it arrives we
1299                                         * must not find it's ID in the hash table
1300                                         *
1301                                         * Thanks to Steven Johnson for hunting this
1302                                         * one down.
1303                                         */
1304                                        xact->obuf.xid        += XACT_HASHS;
1305
1306#if (DEBUG) & DEBUG_TIMEOUT
1307                                        fprintf(stderr,"RPCIO XACT timed out; waking up requestor\n");
1308#endif
1309                                        if ( rtems_event_send(xact->requestor, RTEMS_RPC_EVENT) ) {
1310                                                rtems_panic("RPCIO PANIC: requestor id was 0x%08" PRIx32,
1311                                                                        xact->requestor);
1312                                        }
1313
1314                                } else {
1315                                        int len;
1316
1317                                        len = (int)XDR_GETPOS(&xact->xdrs);
1318
1319#ifdef MBUF_TX
1320                                        xact->refcnt = 1;       /* sendto itself */
1321#endif
1322                                        if ( len != SENDTO( ourSock,
1323                                                                                xact->obuf.buf,
1324                                                                                len,
1325                                                                                0,
1326                                                                                &srv->addr.sa,
1327                                                                                sizeof(srv->addr.sin)
1328#ifdef MBUF_TX
1329                                                                                , xact,
1330                                                                                paranoia_free,
1331                                                                                paranoia_ref
1332#endif
1333                                                                                ) ) {
1334
1335                                                xact->status.re_errno  = errno;
1336                                                xact->status.re_status = RPC_CANTSEND;
1337                                                srv->errors++;
1338
1339                                                /* wakeup requestor */
1340                                                fprintf(stderr,"RPCIO: SEND failure\n");
1341                                                status = rtems_event_send(xact->requestor, RTEMS_RPC_EVENT);
1342                                                assert( status == RTEMS_SUCCESSFUL );
1343
1344                                        } else {
1345                                                /* send successful; calculate retransmission time
1346                                                 * and enqueue to temporary list
1347                                                 */
1348                                                if (FIRST_ATTEMPT != xact->trip) {
1349#if (DEBUG) & DEBUG_TIMEOUT
1350                                                        fprintf(stderr,
1351                                                                "timed out; tolive is %i (ticks), retry period is %i (ticks)\n",
1352                                                                        xact->tolive,
1353                                                                        srv->retry_period);
1354#endif
1355                                                        /* this is a real retry; we backup
1356                                                         * the server's retry interval
1357                                                         */
1358                                                        if ( srv->retry_period < max_period ) {
1359
1360                                                                /* If multiple transactions for this server
1361                                                                 * fail (e.g. because it died) this will
1362                                                                 * back-off very agressively (doubling
1363                                                                 * the retransmission period for every
1364                                                                 * timed out transaction up to the CAP limit)
1365                                                                 * which is desirable - single packet failure
1366                                                                 * is treated more gracefully by this algorithm.
1367                                                                 */
1368
1369                                                                srv->retry_period<<=1;
1370#if (DEBUG) & DEBUG_TIMEOUT
1371                                                                fprintf(stderr,
1372                                                                                "adjusted to; retry period %i\n",
1373                                                                                srv->retry_period);
1374#endif
1375                                                        } else {
1376                                                                /* never wait longer than RPCIOD_RETX_CAP_S seconds */
1377                                                                fprintf(stderr,
1378                                                                                "RPCIO: server '%s' not responding - still trying\n",
1379                                                                                srv->name);
1380                                                        }
1381                                                        if ( 0 == ++srv->retrans % 1000) {
1382                                                                fprintf(stderr,
1383                                                                                "RPCIO - statistics: already %li retries to server %s\n",
1384                                                                                srv->retrans,
1385                                                                                srv->name);
1386                                                        }
1387                                                } else {
1388                                                        srv->requests++;
1389                                                }
1390                                                xact->trip      = now;
1391                                                {
1392                                                long capped_period = srv->retry_period;
1393                                                        if ( xact->lifetime < capped_period )
1394                                                                capped_period = xact->lifetime;
1395                                                xact->age       = now + capped_period;
1396                                                xact->tolive   -= capped_period;
1397                                                }
1398                                                /* enqueue to the list of newly sent transactions */
1399                                                xact->node.next = newList;
1400                                                newList         = &xact->node;
1401#if (DEBUG) & DEBUG_TIMEOUT
1402                                                fprintf(stderr,
1403                                                                "XACT (0x%08x) age is 0x%x, now: 0x%x\n",
1404                                                                xact,
1405                                                                xact->age,
1406                                                                now);
1407#endif
1408                                        }
1409                                }
1410            }
1411
1412                /* insert the newly sent transactions into the
1413                 * sorted retransmission list
1414                 */
1415                for (; (xact = (RpcUdpXact)newList); ) {
1416                        register ListNode p,n;
1417                        newList = newList->next;
1418                        for ( p=&listHead; (n=p->next) && xact->age > ((RpcUdpXact)n)->age; p=n )
1419                                /* nothing else to do */;
1420                        nodeAppend(p, &xact->node);
1421                }
1422
1423                if (now > epoch) {
1424                        /* every now and then, readjust the epoch */
1425                        register ListNode n;
1426                        then += now;
1427                        for (n=listHead.next; n; n=n->next) {
1428                                /* readjust outstanding time intervals subject to the
1429                                 * condition that the 'absolute' time must remain
1430                                 * the same. 'age' and 'trip' are measured with
1431                                 * respect to 'then' - hence:
1432                                 *
1433                                 * abs_age == old_age + old_then == new_age + new_then
1434                                 *
1435                                 * ==> new_age = old_age + old_then - new_then == old_age - 'now'
1436                                 */
1437                                ((RpcUdpXact)n)->age  -= now;
1438                                ((RpcUdpXact)n)->trip -= now;
1439#if (DEBUG) & DEBUG_TIMEOUT
1440                                fprintf(stderr,
1441                                                "readjusted XACT (0x%08x); age is 0x%x, trip: 0x%x now: 0x%x\n",
1442                                                (RpcUdpXact)n,
1443                                                ((RpcUdpXact)n)->trip,
1444                                                ((RpcUdpXact)n)->age,
1445                                                now);
1446#endif
1447                        }
1448                        now = 0;
1449                }
1450
1451                next_retrans = listHead.next ?
1452                                                        ((RpcUdpXact)listHead.next)->age - now :
1453                                                        epoch;  /* make sure we don't miss updating the epoch */
1454#if (DEBUG) & DEBUG_TIMEOUT
1455                fprintf(stderr,"RPCIO: next timeout is %x\n",next_retrans);
1456#endif
1457        }
1458        /* close our socket; shut down the receiver */
1459        close(ourSock);
1460
1461#if 0 /* if we get here, no transactions exist, hence there can be none
1462           * in the queue whatsoever
1463           */
1464        /* flush the message queue */
1465        while (RTEMS_SUCCESSFUL == rtems_message_queue_receive(
1466                                                                                q,
1467                                                                                &xact,
1468                                                                                &size,
1469                                                                                RTEMS_NO_WAIT,
1470                                                                                RTEMS_NO_TIMEOUT)) {
1471                        /* TODO enque xact */
1472        }
1473
1474        /* flush all outstanding transactions */
1475
1476        for (xact=((RpcUdpXact)listHead.next); xact; xact=((RpcUdpXact)xact->node.next)) {
1477                        xact->status.re_status = RPC_TIMEDOUT;
1478                        rtems_event_send(xact->requestor, RTEMS_RPC_EVENT);
1479        }
1480#endif
1481
1482        rtems_message_queue_delete(q);
1483
1484        MU_DESTROY(hlock);
1485
1486        fprintf(stderr,"RPC daemon exited...\n");
1487
1488        rtems_binary_semaphore_post(&fini);
1489        rtems_task_suspend(RTEMS_SELF);
1490}
1491
1492
1493/* support for transaction 'pools'. A number of XACT objects
1494 * is always kept around. The initial number is 0 but it
1495 * is allowed to grow up to a maximum.
1496 * If the need grows beyond the maximum, behavior depends:
1497 * Users can either block until a transaction becomes available,
1498 * they can create a new XACT on the fly or get an error
1499 * if no free XACT is available from the pool.
1500 */
1501
1502RpcUdpXactPool
1503rpcUdpXactPoolCreate(
1504        rpcprog_t prog,                 rpcvers_t version,
1505        int xactsize,   int poolsize)
1506{
1507RpcUdpXactPool  rval = MY_MALLOC(sizeof(*rval));
1508rtems_status_code       status;
1509
1510        ASSERT( rval );
1511        status = rtems_message_queue_create(
1512                                        rtems_build_name('R','P','C','p'),
1513                                        poolsize,
1514                                        sizeof(RpcUdpXact),
1515                                        RTEMS_DEFAULT_ATTRIBUTES,
1516                                        &rval->box);
1517        assert( status == RTEMS_SUCCESSFUL );
1518
1519        rval->prog     = prog;
1520        rval->version  = version;
1521        rval->xactSize = xactsize;
1522        return rval;
1523}
1524
1525void
1526rpcUdpXactPoolDestroy(RpcUdpXactPool pool)
1527{
1528RpcUdpXact xact;
1529
1530        while ((xact = rpcUdpXactPoolGet(pool, XactGetFail))) {
1531                rpcUdpXactDestroy(xact);
1532        }
1533        rtems_message_queue_delete(pool->box);
1534        MY_FREE(pool);
1535}
1536
1537RpcUdpXact
1538rpcUdpXactPoolGet(RpcUdpXactPool pool, XactPoolGetMode mode)
1539{
1540RpcUdpXact       xact = 0;
1541size_t           size;
1542
1543        if (RTEMS_SUCCESSFUL != rtems_message_queue_receive(
1544                                                                pool->box,
1545                                                                &xact,
1546                                                                &size,
1547                                                                XactGetWait == mode ?
1548                                                                        RTEMS_WAIT : RTEMS_NO_WAIT,
1549                                                                RTEMS_NO_TIMEOUT)) {
1550
1551                /* nothing found in box; should we create a new one ? */
1552
1553                xact = (XactGetCreate == mode) ?
1554                                        rpcUdpXactCreate(
1555                                                        pool->prog,
1556                                                        pool->version,
1557                                                        pool->xactSize) : 0 ;
1558                if (xact)
1559                                xact->pool = pool;
1560
1561        }
1562        return xact;
1563}
1564
1565void
1566rpcUdpXactPoolPut(RpcUdpXact xact)
1567{
1568RpcUdpXactPool pool;
1569
1570        pool = xact->pool;
1571        ASSERT( pool );
1572
1573        if (RTEMS_SUCCESSFUL != rtems_message_queue_send(
1574                                                                pool->box,
1575                                                                &xact,
1576                                                                sizeof(xact)))
1577                rpcUdpXactDestroy(xact);
1578}
1579
1580#ifdef MBUF_RX
1581
1582/* WORKAROUND: include sys/mbuf.h (or other bsdnet headers) only
1583 *             _after_ using malloc()/free() & friends because
1584 *             the RTEMS/BSDNET headers redefine those :-(
1585 */
1586
1587#define _KERNEL
1588#include <sys/mbuf.h>
1589
1590static void
1591bufFree(struct mbuf **m)
1592{
1593        if (*m) {
1594                rtems_bsdnet_semaphore_obtain();
1595                m_freem(*m);
1596                rtems_bsdnet_semaphore_release();
1597                *m = 0;
1598        }
1599}
1600#endif
1601
1602#ifdef MBUF_TX
1603static void
1604paranoia_free(caddr_t closure, u_int size)
1605{
1606#if (DEBUG)
1607RpcUdpXact xact = (RpcUdpXact)closure;
1608int        len  = (int)XDR_GETPOS(&xact->xdrs);
1609
1610        ASSERT( --xact->refcnt >= 0 && size == len );
1611#endif
1612}
1613
1614static void
1615paranoia_ref (caddr_t closure, u_int size)
1616{
1617#if (DEBUG)
1618RpcUdpXact xact = (RpcUdpXact)closure;
1619int        len  = (int)XDR_GETPOS(&xact->xdrs);
1620        ASSERT( size == len );
1621        xact->refcnt++;
1622#endif
1623}
1624#endif
1625
1626/* receive from a socket and find
1627 * the transaction corresponding to the
1628 * transaction ID received in the server
1629 * reply.
1630 *
1631 * The semantics of the 'pibuf' pointer are
1632 * as follows:
1633 *
1634 * MBUF_RX:
1635 *
1636 */
1637
1638#define RPCIOD_RXBUFSZ  UDPMSGSIZE
1639
1640static RpcUdpXact
1641sockRcv(void)
1642{
1643int                                     len,i;
1644uint32_t                                xid;
1645union {
1646        struct sockaddr_in      sin;
1647        struct sockaddr     sa;
1648}                                       fromAddr;
1649int                                     fromLen  = sizeof(fromAddr.sin);
1650RxBuf                           ibuf     = 0;
1651RpcUdpXact                      xact     = 0;
1652
1653        do {
1654
1655        /* rcv_mbuf() and recvfrom() differ in that the
1656         * former allocates buffers and passes them back
1657         * to us whereas the latter requires us to provide
1658         * buffer space.
1659         * Hence, in the first case whe have to make sure
1660         * no old buffer is leaked - in the second case,
1661         * we might well re-use an old buffer but must
1662         * make sure we have one allocated
1663         */
1664#ifdef MBUF_RX
1665        if (ibuf)
1666                bufFree(&ibuf);
1667
1668        len  = recv_mbuf_from(
1669                                        ourSock,
1670                                        &ibuf,
1671                                        RPCIOD_RXBUFSZ,
1672                                    &fromAddr.sa,
1673                                    &fromLen);
1674#else
1675        if ( !ibuf )
1676                ibuf = (RpcBuf)MY_MALLOC(RPCIOD_RXBUFSZ);
1677        if ( !ibuf )
1678                goto cleanup; /* no memory - drop this message */
1679
1680        len  = recvfrom(ourSock,
1681                                    ibuf->buf,
1682                                    RPCIOD_RXBUFSZ,
1683                                    0,
1684                                    &fromAddr.sa,
1685                                        &fromLen);
1686#endif
1687
1688        if (len <= 0) {
1689                if (EAGAIN != errno)
1690                        fprintf(stderr,"RECV failed: %s\n",strerror(errno));
1691                goto cleanup;
1692        }
1693
1694#if (DEBUG) & DEBUG_PACKLOSS
1695        if ( (unsigned)rand() < DEBUG_PACKLOSS_FRACT ) {
1696                /* lose packets once in a while */
1697                static int xxx = 0;
1698                if ( ++xxx % 16 == 0 )
1699                        fprintf(stderr,"DEBUG: dropped %i packets, so far...\n",xxx);
1700                if ( ibuf )
1701                        bufFree( &ibuf );
1702                continue;
1703        }
1704#endif
1705
1706        i = (xid=XID(ibuf)) & XACT_HASH_MSK;
1707
1708        if ( !(xact=xactHashTbl[i])                                             ||
1709                   xact->obuf.xid                     != xid                        ||
1710#ifdef REJECT_SERVERIP_MISMATCH
1711                   xact->server->addr.sin.sin_addr.s_addr != fromAddr.sin.sin_addr.s_addr       ||
1712#endif
1713                   xact->server->addr.sin.sin_port        != fromAddr.sin.sin_port ) {
1714
1715                if (xact) {
1716                        if (
1717#ifdef REJECT_SERVERIP_MISMATCH
1718                            xact->server->addr.sin.sin_addr.s_addr == fromAddr.sin.sin_addr.s_addr &&
1719#endif
1720                        xact->server->addr.sin.sin_port        == fromAddr.sin.sin_port        &&
1721                            ( xact->obuf.xid                   == xid + XACT_HASHS   ||
1722                                  xact->obuf.xid                   == xid + 2*XACT_HASHS    )
1723                                ) {
1724#ifndef DEBUG /* don't complain if it's just a late arrival of a retry */
1725                        fprintf(stderr,"RPCIO - FYI sockRcv(): dropping late/redundant retry answer\n");
1726#endif
1727                        } else {
1728                        fprintf(stderr,"RPCIO WARNING sockRcv(): transaction mismatch\n");
1729                        fprintf(stderr,"xact: xid  0x%08" PRIx32 "  -- got 0x%08" PRIx32 "\n",
1730                                                        xact->obuf.xid, xid);
1731                        fprintf(stderr,"xact: addr 0x%08" PRIx32 "  -- got 0x%08" PRIx32 "\n",
1732                                                        xact->server->addr.sin.sin_addr.s_addr,
1733                                                        fromAddr.sin.sin_addr.s_addr);
1734                        fprintf(stderr,"xact: port 0x%08x  -- got 0x%08x\n",
1735                                                        xact->server->addr.sin.sin_port,
1736                                                        fromAddr.sin.sin_port);
1737                        }
1738                } else {
1739                        fprintf(stderr,
1740                                        "RPCIO WARNING sockRcv(): got xid 0x%08" PRIx32 " but its slot is empty\n",
1741                                        xid);
1742                }
1743                /* forget about this one and try again */
1744                xact = 0;
1745        }
1746
1747        } while ( !xact );
1748
1749        xact->ibuf     = ibuf;
1750#ifndef MBUF_RX
1751        xact->ibufsize = RPCIOD_RXBUFSZ;
1752#endif
1753
1754        return xact;
1755
1756cleanup:
1757
1758        bufFree(&ibuf);
1759
1760        return 0;
1761}
1762
1763
1764#include <rtems/rtems_bsdnet_internal.h>
1765/* double check the event configuration; should probably globally
1766 * manage system events!!
1767 * We do this at the end of the file for the same reason we had
1768 * included mbuf.h only a couple of lines above - see comment up
1769 * there...
1770 */
1771#if RTEMS_RPC_EVENT & SOSLEEP_EVENT & SBWAIT_EVENT & NETISR_EVENTS
1772#error ILLEGAL EVENT CONFIGURATION
1773#endif
Note: See TracBrowser for help on using the repository browser.