source: rtems-libbsd/rtemsbsd/nfsclient/rpcio.c @ 58c4e1c5

55-freebsd-126-freebsd-12freebsd-9.3
Last change on this file since 58c4e1c5 was 58c4e1c5, checked in by Sebastian Huber <sebastian.huber@…>, on 06/10/16 at 12:11:40

nfsclient: Port to LibBSD

  • Property mode set to 100644
File size: 44.3 KB
Line 
1/**
2 * @file
3 *
4 * @brief RPC Multiplexor for a Multitasking Environment
5 * @ingroup libfs
6 *
7 * This code funnels arbitrary task's UDP/RPC requests
8 * through one socket to arbitrary servers.
9 * The replies are gathered and dispatched to the
10 * requestors.
11 * One task handles all the sending and receiving
12 * work including retries.
13 * It is up to the requestor, however, to do
14 * the XDR encoding of the arguments / decoding
15 * of the results (except for the RPC header which
16 * is handled by the daemon).
17 */
18
19/*
20 * Author: Till Straumann <strauman@slac.stanford.edu>, 2002
21 *
22 * Authorship
23 * ----------
24 * This software (NFS-2 client implementation for RTEMS) was created by
25 *     Till Straumann <strauman@slac.stanford.edu>, 2002-2007,
26 *         Stanford Linear Accelerator Center, Stanford University.
27 *
28 * Acknowledgement of sponsorship
29 * ------------------------------
30 * The NFS-2 client implementation for RTEMS was produced by
31 *     the Stanford Linear Accelerator Center, Stanford University,
32 *         under Contract DE-AC03-76SFO0515 with the Department of Energy.
33 *
34 * Government disclaimer of liability
35 * ----------------------------------
36 * Neither the United States nor the United States Department of Energy,
37 * nor any of their employees, makes any warranty, express or implied, or
38 * assumes any legal liability or responsibility for the accuracy,
39 * completeness, or usefulness of any data, apparatus, product, or process
40 * disclosed, or represents that its use would not infringe privately owned
41 * rights.
42 *
43 * Stanford disclaimer of liability
44 * --------------------------------
45 * Stanford University makes no representations or warranties, express or
46 * implied, nor assumes any liability for the use of this software.
47 *
48 * Stanford disclaimer of copyright
49 * --------------------------------
50 * Stanford University, owner of the copyright, hereby disclaims its
51 * copyright and all other rights in this software.  Hence, anyone may
52 * freely use it for any purpose without restriction.
53 *
54 * Maintenance of notices
55 * ----------------------
56 * In the interest of clarity regarding the origin and status of this
57 * SLAC software, this and all the preceding Stanford University notices
58 * are to remain affixed to any copy or derivative of this software made
59 * or distributed by the recipient and are to be affixed to any copy of
60 * software made or distributed by the recipient that contains a copy or
61 * derivative of this software.
62 *
63 * ------------------ SLAC Software Notices, Set 4 OTT.002a, 2004 FEB 03
64 */
65
66#if HAVE_CONFIG_H
67#include "config.h"
68#endif
69
70#include <inttypes.h>
71
72#include <rtems.h>
73#include <rtems/error.h>
74#include <rtems/bsd/bsd.h>
75#include <stdlib.h>
76#include <time.h>
77#include <rpc/rpc.h>
78#include <rpc/pmap_prot.h>
79#include <errno.h>
80#include <sys/ioctl.h>
81#include <assert.h>
82#include <stdio.h>
83#include <errno.h>
84#include <string.h>
85#include <netinet/in.h>
86#include <arpa/inet.h>
87#include <sys/cpuset.h>
88#include <sys/event.h>
89
90#include "rpcio.h"
91#include "nfsclient-private.h"
92
93/****************************************************************/
94/* CONFIGURABLE PARAMETERS                                      */
95/****************************************************************/
96
97#undef MBUF_RX                  /* If defined: use mbuf XDR stream for
98                                                 *  decoding directly out of mbufs
99                                                 *  Otherwise, the regular 'recvfrom()'
100                                                 *  interface will be used involving an
101                                                 *  extra buffer allocation + copy step.
102                                                 */
103
104#undef MBUF_TX                  /* If defined: avoid copying data when
105                                                 *  sending. Instead, use a wrapper to
106                                                 *  'sosend()' which will point an MBUF
107                                                 *  directly to our buffer space.
108                                                 *  Note that the BSD stack does not copy
109                                                 *  data when fragmenting packets - it
110                                                 *  merely uses an mbuf chain pointing
111                                                 *  into different areas of the data.
112                                                 *
113                                                 * If undefined, the regular 'sendto()'
114                                                 *  interface is used.
115                                                 */
116
117#undef REJECT_SERVERIP_MISMATCH
118                                                /* If defined, RPC replies must come from the server
119                                                 * that was queried. Eric Norum has reported problems
120                                                 * with clustered NFS servers. So we disable this
121                                                 * reducing paranoia...
122                                                 */
123
124/* daemon task parameters */
125#define RPCIOD_NAME             "RPCD"
126
127/* depth of the message queue for sending
128 * RPC requests to the daemon
129 */
130#define RPCIOD_QDEPTH           20
131
132/* Maximum retry limit for retransmission */
133#define RPCIOD_RETX_CAP_S       3 /* seconds */
134
135/* Default timeout for RPC calls */
136#define RPCIOD_DEFAULT_TIMEOUT  (&_rpc_default_timeout)
137static struct timeval _rpc_default_timeout = { 10 /* secs */, 0 /* usecs */ };
138
139/* how many times should we try to resend a failed
140 * transaction with refreshed AUTHs
141 */
142#define RPCIOD_REFRESH          2
143
144/* Events we are using; the RPC_EVENT
145 * MUST NOT be used by any application
146 * thread doing RPC IO (e.g. NFS)
147 */
148#define RTEMS_RPC_EVENT         RTEMS_EVENT_30  /* THE event used by RPCIO. Every task doing
149                                                                                         * RPC IO will receive this - hence it is
150                                                                                         * RESERVED
151                                                                                         */
152#define RPCIOD_KQ_IDENT         0xeb
153#define RPCIOD_RX_EVENT         0x1     /* Events the RPCIOD is using/waiting for */
154#define RPCIOD_TX_EVENT         0x2
155#define RPCIOD_KILL_EVENT       0x4     /* send to the daemon to kill it          */
156
157#define LD_XACT_HASH            8                               /* ld of the size of the transaction hash table  */
158
159
160/* Debugging Flags                                              */
161
162/* NOTE: defining DEBUG 0 leaves some 'assert()' paranoia checks
163 *       but produces no output
164 */
165
166#define DEBUG_TRACE_XACT        (1<<0)
167#define DEBUG_EVENTS            (1<<1)
168#define DEBUG_MALLOC            (1<<2)
169#define DEBUG_TIMEOUT           (1<<3)
170#define DEBUG_PACKLOSS          (1<<4)  /* This introduces random, artificial packet losses to test retransmission */
171
172#define DEBUG_PACKLOSS_FRACT (0xffffffff/10)
173
174/* USE PARENTHESIS WHEN 'or'ing MULTIPLE FLAGS: (DEBUG_XX | DEBUG_YY) */
175#define DEBUG                           (0)
176
177/****************************************************************/
178/* END OF CONFIGURABLE SECTION                                  */
179/****************************************************************/
180
181/* prevent rollover of our timers by readjusting the epoch on the fly */
182#if     (DEBUG) & DEBUG_TIMEOUT
183#define RPCIOD_EPOCH_SECS       10
184#else
185#define RPCIOD_EPOCH_SECS       10000
186#endif
187
188#ifdef  DEBUG
189#define ASSERT(arg)                     assert(arg)
190#else
191#define ASSERT(arg)                     if (arg)
192#endif
193
194/****************************************************************/
195/* MACROS                                                       */
196/****************************************************************/
197
198
199#define XACT_HASHS              (1<<(LD_XACT_HASH))     /* the hash table size derived from the ld       */
200#define XACT_HASH_MSK   ((XACT_HASHS)-1)        /* mask to extract the hash index from a RPC-XID */
201
202
203#define MU_LOCK(mutex)          do {                                                    \
204                                                        assert(                                                 \
205                                                                RTEMS_SUCCESSFUL ==                     \
206                                                                rtems_semaphore_obtain(         \
207                                                                                (mutex),                        \
208                                                                                RTEMS_WAIT,                     \
209                                                                                RTEMS_NO_TIMEOUT        \
210                                                                                ) );                            \
211                                                        } while(0)
212
213#define MU_UNLOCK(mutex)        do {                                                    \
214                                                        assert(                                                 \
215                                                                RTEMS_SUCCESSFUL ==                     \
216                                                                rtems_semaphore_release(        \
217                                                                                (mutex)                         \
218                                                                                ) );                            \
219                                                        } while(0)
220
221#define MU_CREAT(pmutex)        do {                                                    \
222                                                        assert(                                                 \
223                                                                RTEMS_SUCCESSFUL ==                     \
224                                                                rtems_semaphore_create(         \
225                                                                                rtems_build_name(       \
226                                                                                        'R','P','C','l' \
227                                                                                        ),                              \
228                                                                                1,                                      \
229                                                                                MUTEX_ATTRIBUTES,       \
230                                                                                0,                                      \
231                                                                                (pmutex)) );            \
232                                                        } while (0)
233
234
235#define MU_DESTROY(mutex)       do {                                                    \
236                                                        assert(                                                 \
237                                                                RTEMS_SUCCESSFUL ==                     \
238                                                                rtems_semaphore_delete(         \
239                                                                                mutex                           \
240                                                                                ) );                            \
241                                                        } while (0)
242
243#define MUTEX_ATTRIBUTES        (RTEMS_LOCAL           |                \
244                                                        RTEMS_PRIORITY         |                \
245                                                        RTEMS_INHERIT_PRIORITY |                \
246                                                        RTEMS_BINARY_SEMAPHORE)
247
248#define FIRST_ATTEMPT           0x88888888 /* some time that is never reached */
249
250/****************************************************************/
251/* TYPE DEFINITIONS                                             */
252/****************************************************************/
253
254typedef rtems_interval          TimeoutT;
255
256/* 100000th implementation of a doubly linked list;
257 * since only one thread is looking at these,
258 * we need no locking
259 */
260typedef struct ListNodeRec_ {
261        struct ListNodeRec_ *next, *prev;
262} ListNodeRec, *ListNode;
263
264
265/* Structure representing an RPC server */
266typedef struct RpcUdpServerRec_ {
267                RpcUdpServer            next;                   /* linked list of all servers; protected by hlock */
268                union {
269                struct sockaddr_in      sin;
270                struct sockaddr     sa;
271                }                                       addr;
272                AUTH                            *auth;
273                rtems_id                        authlock;               /* must MUTEX the auth object - it's not clear
274                                                                                         *  what is better:
275                                                                                         *   1 having one (MUTEXed) auth per server
276                                                                                         *         who is shared among all transactions
277                                                                                         *         using that server
278                                                                                         *       2 maintaining an AUTH per transaction
279                                                                                         *         (there are then other options: manage
280                                                                                         *         XACT pools on a per-server basis instead
281                                                                                         *         of associating a server with a XACT when
282                                                                                         *   sending)
283                                                                                         * experience will show if the current (1)
284                                                                                         * approach has to be changed.
285                                                                                         */
286                TimeoutT                        retry_period;   /* dynamically adjusted retry period
287                                                                                         * (based on packet roundtrip time)
288                                                                                         */
289                /* STATISTICS */
290                unsigned long           retrans;                /* how many retries were issued by this server         */
291                unsigned long           requests;               /* how many requests have been sent                    */
292                unsigned long       timeouts;           /* how many requests have timed out                    */
293                unsigned long       errors;         /* how many errors have occurred (other than timeouts) */
294                char                            name[20];               /* server's address in IP 'dot' notation               */
295} RpcUdpServerRec;
296
297typedef union  RpcBufU_ {
298                uint32_t                        xid;
299                char                            buf[1];
300} RpcBufU, *RpcBuf;
301
302/* RX Buffer implementation; this is either
303 * an MBUF chain (MBUF_RX configuration)
304 * or a buffer allocated from the heap
305 * where recvfrom copies the (encoded) reply
306 * to. The XDR routines the copy/decode
307 * it into the user's data structures.
308 */
309#ifdef MBUF_RX
310typedef struct mbuf *           RxBuf;  /* an MBUF chain */
311static  void                            bufFree(struct mbuf **m);
312#define XID(ibuf)                       (*(mtod((ibuf), u_long *)))
313#else
314typedef RpcBuf                          RxBuf;
315#define bufFree(b)                      do { MY_FREE(*(b)); *(b)=0; } while(0)
316#define XID(ibuf)                       ((ibuf)->xid)
317#endif
318
319/* A RPC 'transaction' consisting
320 * of server and requestor information,
321 * buffer space and an XDR object
322 * (for encoding arguments).
323 */
324typedef struct RpcUdpXactRec_ {
325                ListNodeRec                     node;           /* so we can put XACTs on a list                */
326                RpcUdpServer            server;         /* server this XACT goes to                     */
327                long                            lifetime;       /* during the lifetime, retry attempts are made */
328                long                            tolive;         /* lifetime timer                               */
329                struct rpc_err          status;         /* RPC reply error status                       */
330                long                            age;            /* age info; needed to manage retransmission    */
331                long                            trip;           /* record round trip time in ticks              */
332                rtems_id                        requestor;      /* the task waiting for this XACT to complete   */
333                RpcUdpXactPool          pool;           /* if this XACT belong to a pool, this is it    */
334                XDR                                     xdrs;           /* argument encoder stream                      */
335                int                                     xdrpos;     /* stream position after the (permanent) header */
336                xdrproc_t                       xres;           /* reply decoder proc - TODO needn't be here    */
337                caddr_t                         pres;           /* reply decoded obj  - TODO needn't be here    */
338#ifndef MBUF_RX
339                int                                     ibufsize;       /* size of the ibuf (bytes)                     */
340#endif
341#ifdef  MBUF_TX
342                int                                     refcnt;         /* mbuf external storage reference count        */
343#endif
344                int                                     obufsize;       /* size of the obuf (bytes)                     */
345                RxBuf                           ibuf;           /* pointer to input buffer assigned by daemon   */
346                RpcBufU                         obuf;       /* output buffer (encoded args) APPENDED HERE   */
347} RpcUdpXactRec;
348
349typedef struct RpcUdpXactPoolRec_ {
350        rtems_id        box;
351        int                     prog;
352        int                     version;
353        int                     xactSize;
354} RpcUdpXactPoolRec;
355
356/* a global hash table where all 'living' transaction
357 * objects are registered.
358 * A number of bits in a transaction's XID maps 1:1 to
359 * an index in this table. Hence, the XACT matching
360 * an RPC/UDP reply packet can quickly be found
361 * The size of this table imposes a hard limit on the
362 * number of all created transactions in the system.
363 */
364static RpcUdpXact xactHashTbl[XACT_HASHS]={0};
365static u_long     xidUpper   [XACT_HASHS]={0};
366static unsigned   xidHashSeed            = 0 ;
367
368/* forward declarations */
369static RpcUdpXact
370sockRcv(void);
371
372static void
373rpcio_daemon(rtems_task_argument);
374
375#ifdef MBUF_TX
376ssize_t
377sendto_nocpy (
378                int s,
379                const void *buf, size_t buflen,
380                int flags,
381                const struct sockaddr *toaddr, int tolen,
382                void *closure,
383                void (*freeproc)(caddr_t, u_int),
384                void (*refproc)(caddr_t, u_int)
385);
386static void paranoia_free(caddr_t closure, u_int size);
387static void paranoia_ref (caddr_t closure, u_int size);
388#define SENDTO  sendto_nocpy
389#else
390#define SENDTO  sendto
391#endif
392
393static RpcUdpServer             rpcUdpServers = 0;      /* linked list of all servers; protected by llock */
394
395static int                              ourSock = -1;           /* the socket we are using for communication */
396static rtems_id                 rpciod  = 0;            /* task id of the RPC daemon                 */
397static int                      rpcKq = -1;             /* the kqueue of the RPC daemon */
398static rtems_id                 msgQ    = 0;            /* message queue where the daemon picks up
399                                                                                         * requests
400                                                                                         */
401#ifndef NDEBUG
402static rtems_id                 llock   = 0;            /* MUTEX protecting the server list */
403static rtems_id                 hlock   = 0;            /* MUTEX protecting the hash table and the list of servers */
404#endif
405static rtems_id                 fini    = 0;            /* a synchronization semaphore we use during
406                                                                                         * module cleanup / driver unloading
407                                                                                         */
408static rtems_interval   ticksPerSec;            /* cached system clock rate (WHO IS ASSUMED NOT
409                                                                                         * TO CHANGE)
410                                                                                         */
411
412#if (DEBUG) & DEBUG_MALLOC
413/* malloc wrappers for debugging */
414static int nibufs = 0;
415
416static inline void *MY_MALLOC(int s)
417{
418        if (s) {
419                void *rval;
420                MU_LOCK(hlock);
421                assert(nibufs++ < 2000);
422                MU_UNLOCK(hlock);
423                assert((rval = malloc(s)) != 0);
424                return rval;
425        }
426        return 0;
427}
428
429static inline void *MY_CALLOC(int n, int s)
430{
431        if (s) {
432                void *rval;
433                MU_LOCK(hlock);
434                assert(nibufs++ < 2000);
435                MU_UNLOCK(hlock);
436                assert((rval = calloc(n,s)) != 0);
437                return rval;
438        }
439        return 0;
440}
441
442
443static inline void MY_FREE(void *p)
444{
445        if (p) {
446                MU_LOCK(hlock);
447                nibufs--;
448                MU_UNLOCK(hlock);
449                free(p);
450        }
451}
452#else
453#define MY_MALLOC       malloc
454#define MY_CALLOC       calloc
455#define MY_FREE         free
456#endif
457
458static inline bool_t
459locked_marshal(RpcUdpServer s, XDR *xdrs)
460{
461bool_t rval;
462        MU_LOCK(s->authlock);
463        rval = AUTH_MARSHALL(s->auth, xdrs);
464        MU_UNLOCK(s->authlock);
465        return rval;
466}
467
468/* Locked operations on a server's auth object */
469static inline bool_t
470locked_validate(RpcUdpServer s, struct opaque_auth *v)
471{
472bool_t rval;
473        MU_LOCK(s->authlock);
474        rval = AUTH_VALIDATE(s->auth, v);
475        MU_UNLOCK(s->authlock);
476        return rval;
477}
478
479static inline bool_t
480locked_refresh(RpcUdpServer s, struct rpc_msg *msg)
481{
482bool_t rval;
483        MU_LOCK(s->authlock);
484        rval = AUTH_REFRESH(s->auth, msg);
485        MU_UNLOCK(s->authlock);
486        return rval;
487}
488
489static void
490sendEventToRpcServer(u_int events)
491{
492struct kevent   trigger;
493int             s;
494
495        EV_SET(
496                &trigger,
497                RPCIOD_KQ_IDENT,
498                EVFILT_USER,
499                0,
500                NOTE_TRIGGER | NOTE_FFOR | events,
501                0,
502                0);
503
504        s = kevent(rpcKq, &trigger, 1, NULL, 0, NULL);
505        assert(s == 0);
506}
507
508/* Create a server object
509 *
510 */
511enum clnt_stat
512rpcUdpServerCreate(
513        struct sockaddr_in      *paddr,
514        rpcprog_t               prog,
515        rpcvers_t               vers,
516        u_long                  uid,
517        u_long                  gid,
518        RpcUdpServer            *psrv
519        )
520{
521RpcUdpServer    rval;
522u_short                 port;
523char                    hname[MAX_MACHINE_NAME + 1];
524int                             theuid, thegid;
525u_int                           thegids[NGRPS];
526gid_t                   gids[NGROUPS];
527int                             len,i;
528AUTH                    *auth;
529enum clnt_stat  pmap_err;
530struct pmap             pmaparg;
531
532        if ( gethostname(hname, MAX_MACHINE_NAME) ) {
533                fprintf(stderr,
534                                "RPCIO - error: I have no hostname ?? (%s)\n",
535                                strerror(errno));
536                return RPC_UNKNOWNHOST;
537        }
538
539        if ( (len = getgroups(NGROUPS, gids) < 0 ) ) {
540                fprintf(stderr,
541                                "RPCIO - error: I unable to get group ids (%s)\n",
542                                strerror(errno));
543                return RPC_FAILED;
544        }
545
546        if ( len > NGRPS )
547                len = NGRPS;
548
549        for (i=0; i<len; i++)
550                thegids[i] = (int)gids[i];
551
552        theuid = (int) ((RPCIOD_DEFAULT_ID == uid) ? geteuid() : uid);
553        thegid = (int) ((RPCIOD_DEFAULT_ID == gid) ? getegid() : gid);
554
555        if ( !(auth = authunix_create(hname, theuid, thegid, len, thegids)) ) {
556                fprintf(stderr,
557                                "RPCIO - error: unable to create RPC AUTH\n");
558                return RPC_FAILED;
559        }
560
561        /* if they specified no port try to ask the portmapper */
562        if (!paddr->sin_port) {
563
564                paddr->sin_port = htons(PMAPPORT);
565
566        pmaparg.pm_prog = prog;
567        pmaparg.pm_vers = vers;
568        pmaparg.pm_prot = IPPROTO_UDP;
569        pmaparg.pm_port = 0;  /* not needed or used */
570
571
572                /* dont use non-reentrant pmap_getport ! */
573
574                pmap_err = rpcUdpCallRp(
575                                                paddr,
576                                                PMAPPROG,
577                                                PMAPVERS,
578                                                PMAPPROC_GETPORT,
579                                                xdr_pmap,
580                                                &pmaparg,
581                                                xdr_u_short,
582                                                &port,
583                                                uid,
584                                                gid,
585                                                0);
586
587                if ( RPC_SUCCESS != pmap_err ) {
588                        paddr->sin_port = 0;
589                        return pmap_err;
590                }
591
592                paddr->sin_port = htons(port);
593        }
594
595        if (0==paddr->sin_port) {
596                        return RPC_PROGNOTREGISTERED;
597        }
598
599        rval                            = (RpcUdpServer)MY_MALLOC(sizeof(*rval));
600        memset(rval, 0, sizeof(*rval));
601
602        if (!inet_ntop(AF_INET, &paddr->sin_addr, rval->name, sizeof(rval->name)))
603                sprintf(rval->name,"?.?.?.?");
604        rval->addr.sin          = *paddr;
605
606        /* start with a long retransmission interval - it
607         * will be adapted dynamically
608         */
609        rval->retry_period  = RPCIOD_RETX_CAP_S * ticksPerSec;
610
611        rval->auth                      = auth;
612
613        MU_CREAT( &rval->authlock );
614
615        /* link into list */
616        MU_LOCK( llock );
617        rval->next = rpcUdpServers;
618        rpcUdpServers = rval;
619        MU_UNLOCK( llock );
620
621        *psrv                           = rval;
622        return RPC_SUCCESS;
623}
624
625void
626rpcUdpServerDestroy(RpcUdpServer s)
627{
628RpcUdpServer prev;
629        if (!s)
630                return;
631        /* we should probably verify (but how?) that nobody
632         * (at least: no outstanding XACTs) is using this
633         * server;
634         */
635
636        /* remove from server list */
637        MU_LOCK(llock);
638        prev = rpcUdpServers;
639        if ( s == prev ) {
640                rpcUdpServers = s->next;
641        } else {
642                for ( ; prev ; prev = prev->next) {
643                        if (prev->next == s) {
644                                prev->next = s->next;
645                                break;
646                        }
647                }
648        }
649        MU_UNLOCK(llock);
650
651        /* MUST have found it */
652        assert(prev);
653
654        auth_destroy(s->auth);
655
656        MU_DESTROY(s->authlock);
657        MY_FREE(s);
658}
659
660int
661rpcUdpStats(FILE *f)
662{
663RpcUdpServer s;
664
665        if (!f) f = stdout;
666
667        fprintf(f,"RPCIOD statistics:\n");
668
669        MU_LOCK(llock);
670        for (s = rpcUdpServers; s; s=s->next) {
671                fprintf(f,"\nServer -- %s:\n", s->name);
672                fprintf(f,"  requests    sent: %10ld, retransmitted: %10ld\n",
673                                                s->requests, s->retrans);
674                fprintf(f,"         timed out: %10ld,   send errors: %10ld\n",
675                                                s->timeouts, s->errors);
676                fprintf(f,"  current retransmission interval: %dms\n",
677                                                (unsigned)(s->retry_period * 1000 / ticksPerSec) );
678        }
679        MU_UNLOCK(llock);
680
681        return 0;
682}
683
684RpcUdpXact
685rpcUdpXactCreate(
686        u_long  program,
687        u_long  version,
688        u_long  size
689        )
690{
691RpcUdpXact              rval=0;
692struct rpc_msg  header;
693register int    i,j;
694
695        if (!size)
696                size = UDPMSGSIZE;
697        /* word align */
698        size = (size + 3) & ~3;
699
700        rval = (RpcUdpXact)MY_CALLOC(1,sizeof(*rval) - sizeof(rval->obuf) + size);
701
702        if (rval) {
703
704                header.rm_xid             = 0;
705                header.rm_direction       = CALL;
706                header.rm_call.cb_rpcvers = RPC_MSG_VERSION;
707                header.rm_call.cb_prog    = program;
708                header.rm_call.cb_vers    = version;
709                xdrmem_create(&(rval->xdrs), rval->obuf.buf, size, XDR_ENCODE);
710
711                if (!xdr_callhdr(&(rval->xdrs), &header)) {
712                        MY_FREE(rval);
713                        return 0;
714                }
715                /* pick a free table slot and initialize the XID */
716                MU_LOCK(hlock);
717                rval->obuf.xid = (xidHashSeed++ ^ ((uintptr_t)rval>>10)) & XACT_HASH_MSK;
718                i=j=(rval->obuf.xid & XACT_HASH_MSK);
719                if (msgQ) {
720                        /* if there's no message queue, refuse to
721                         * give them transactions; we might be in the process to
722                         * go away...
723                         */
724                        do {
725                                i=(i+1) & XACT_HASH_MSK; /* cheap modulo */
726                                if (!xactHashTbl[i]) {
727#if (DEBUG) & DEBUG_TRACE_XACT
728                                        fprintf(stderr,"RPCIO: entering index %i, val %x\n",i,rval);
729#endif
730                                        xactHashTbl[i]=rval;
731                                        j=-1;
732                                        break;
733                                }
734                        } while (i!=j);
735                }
736                MU_UNLOCK(hlock);
737                if (i==j) {
738                        XDR_DESTROY(&rval->xdrs);
739                        MY_FREE(rval);
740                        return 0;
741                }
742                rval->obuf.xid  = xidUpper[i] | i;
743                rval->xdrpos    = XDR_GETPOS(&(rval->xdrs));
744                rval->obufsize  = size;
745        }
746        return rval;
747}
748
749void
750rpcUdpXactDestroy(RpcUdpXact xact)
751{
752int i = xact->obuf.xid & XACT_HASH_MSK;
753
754#if (DEBUG) & DEBUG_TRACE_XACT
755                fprintf(stderr,"RPCIO: removing index %i, val %x\n",i,xact);
756#endif
757
758                ASSERT( xactHashTbl[i]==xact );
759
760                MU_LOCK(hlock);
761                xactHashTbl[i]=0;
762                /* remember XID we used last time so we can avoid
763                 * reusing the same one (incremented by rpcUdpSend routine)
764                 */
765                xidUpper[i]   = xact->obuf.xid;
766                MU_UNLOCK(hlock);
767
768                bufFree(&xact->ibuf);
769
770                XDR_DESTROY(&xact->xdrs);
771                MY_FREE(xact);
772}
773
774
775
776/* Send a transaction, i.e. enqueue it to the
777 * RPC daemon who will actually send it.
778 */
779enum clnt_stat
780rpcUdpSend(
781        RpcUdpXact              xact,
782        RpcUdpServer    srvr,
783        struct timeval  *timeout,
784        u_long                  proc,
785        xdrproc_t               xres, caddr_t pres,
786        xdrproc_t               xargs, caddr_t pargs,
787        ...
788   )
789{
790register XDR    *xdrs;
791unsigned long   ms;
792va_list                 ap;
793
794        va_start(ap,pargs);
795
796        if (!timeout)
797                timeout = RPCIOD_DEFAULT_TIMEOUT;
798
799        ms = 1000 * timeout->tv_sec + timeout->tv_usec/1000;
800
801        /* round lifetime to closest # of ticks */
802        xact->lifetime  = (ms * ticksPerSec + 500) / 1000;
803        if ( 0 == xact->lifetime )
804                xact->lifetime = 1;
805
806#if (DEBUG) & DEBUG_TIMEOUT
807        {
808        static int once=0;
809        if (!once++) {
810                fprintf(stderr,
811                                "Initial lifetime: %i (ticks)\n",
812                                xact->lifetime);
813        }
814        }
815#endif
816
817        xact->tolive    = xact->lifetime;
818
819        xact->xres      = xres;
820        xact->pres      = pres;
821        xact->server    = srvr;
822
823        xdrs            = &xact->xdrs;
824        xdrs->x_op      = XDR_ENCODE;
825        /* increment transaction ID */
826        xact->obuf.xid += XACT_HASHS;
827        XDR_SETPOS(xdrs, xact->xdrpos);
828        if ( !XDR_PUTLONG(xdrs,(long*)&proc) || !locked_marshal(srvr, xdrs) ||
829                 !xargs(xdrs, pargs) ) {
830                va_end(ap);
831                return(xact->status.re_status=RPC_CANTENCODEARGS);
832        }
833        while ((xargs=va_arg(ap,xdrproc_t))) {
834                if (!xargs(xdrs, va_arg(ap,caddr_t)))
835                va_end(ap);
836                return(xact->status.re_status=RPC_CANTENCODEARGS);
837        }
838
839        va_end(ap);
840
841        rtems_task_ident(RTEMS_SELF, RTEMS_WHO_AM_I, &xact->requestor);
842        if ( rtems_message_queue_send( msgQ, &xact, sizeof(xact)) ) {
843                return RPC_CANTSEND;
844        }
845        /* wakeup the rpciod */
846        sendEventToRpcServer(RPCIOD_TX_EVENT);
847
848        return RPC_SUCCESS;
849}
850
851/* Block for the RPC reply to an outstanding
852 * transaction.
853 * The caller is woken by the RPC daemon either
854 * upon reception of the reply or on timeout.
855 */
856enum clnt_stat
857rpcUdpRcv(RpcUdpXact xact)
858{
859int                                     refresh;
860XDR                     reply_xdrs;
861struct rpc_msg          reply_msg;
862rtems_status_code       status;
863rtems_event_set         gotEvents;
864
865        refresh = 0;
866
867        do {
868
869        /* block for the reply */
870        status = rtems_event_receive(
871                RTEMS_RPC_EVENT,
872                RTEMS_WAIT | RTEMS_EVENT_ANY,
873                RTEMS_NO_TIMEOUT,
874                &gotEvents);
875        ASSERT( status == RTEMS_SUCCESSFUL );
876
877        if (xact->status.re_status) {
878#ifdef MBUF_RX
879                /* add paranoia */
880                ASSERT( !xact->ibuf );
881#endif
882                return xact->status.re_status;
883        }
884
885#ifdef MBUF_RX
886        xdrmbuf_create(&reply_xdrs, xact->ibuf, XDR_DECODE);
887#else
888        xdrmem_create(&reply_xdrs, xact->ibuf->buf, xact->ibufsize, XDR_DECODE);
889#endif
890
891        reply_msg.acpted_rply.ar_verf          = _null_auth;
892        reply_msg.acpted_rply.ar_results.where = xact->pres;
893        reply_msg.acpted_rply.ar_results.proc  = xact->xres;
894
895        if (xdr_replymsg(&reply_xdrs, &reply_msg)) {
896                /* OK */
897                _seterr_reply(&reply_msg, &xact->status);
898                if (RPC_SUCCESS == xact->status.re_status) {
899                        if ( !locked_validate(xact->server,
900                                                                &reply_msg.acpted_rply.ar_verf) ) {
901                                xact->status.re_status = RPC_AUTHERROR;
902                                xact->status.re_why    = AUTH_INVALIDRESP;
903                        }
904                        if (reply_msg.acpted_rply.ar_verf.oa_base) {
905                                reply_xdrs.x_op = XDR_FREE;
906                                xdr_opaque_auth(&reply_xdrs, &reply_msg.acpted_rply.ar_verf);
907                        }
908                        refresh = 0;
909                } else {
910                        /* should we try to refresh our credentials ? */
911                        if ( !refresh ) {
912                                /* had never tried before */
913                                refresh = RPCIOD_REFRESH;
914                        }
915                }
916        } else {
917                reply_xdrs.x_op        = XDR_FREE;
918                xdr_replymsg(&reply_xdrs, &reply_msg);
919                xact->status.re_status = RPC_CANTDECODERES;
920        }
921        XDR_DESTROY(&reply_xdrs);
922
923        bufFree(&xact->ibuf);
924
925#ifndef MBUF_RX
926        xact->ibufsize = 0;
927#endif
928
929        if (refresh && locked_refresh(xact->server, &reply_msg)) {
930                rtems_task_ident(RTEMS_SELF, RTEMS_WHO_AM_I, &xact->requestor);
931                if ( rtems_message_queue_send(msgQ, &xact, sizeof(xact)) ) {
932                        return RPC_CANTSEND;
933                }
934                /* wakeup the rpciod */
935                fprintf(stderr,"RPCIO INFO: refreshing my AUTH\n");
936                sendEventToRpcServer(RPCIOD_TX_EVENT);
937        }
938
939        } while ( 0 &&  refresh-- > 0 );
940
941        return xact->status.re_status;
942}
943
944void
945rpcSetXIDs(uint32_t xid)
946{
947        uint32_t i;
948
949        xid &= ~XACT_HASH_MSK;
950
951        for (i = 0; i < XACT_HASHS; ++i) {
952                xidUpper[i] = xid | i;
953        }
954}
955
956int
957rpcUdpInit(void)
958{
959int                     s;
960rtems_status_code       status;
961int                     noblock = 1;
962struct kevent           change;
963
964        if (ourSock < 0) {
965    fprintf(stderr,"RTEMS-RPCIOD $Release$, " \
966            "Till Straumann, Stanford/SLAC/SSRL 2002, " \
967            "See LICENSE file for licensing info.\n");
968
969                ourSock=socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
970                if (ourSock>=0) {
971                        bindresvport(ourSock,(struct sockaddr_in*)0);
972                        s = ioctl(ourSock, FIONBIO, (char*)&noblock);
973                        assert( s == 0 );
974                        /* assume nobody tampers with the clock !! */
975                        ticksPerSec = rtems_clock_get_ticks_per_second();
976                        MU_CREAT( &hlock );
977                        MU_CREAT( &llock );
978
979                        rpcKq = kqueue();
980                        assert( rpcKq >= 0 );
981
982                        EV_SET(
983                                &change,
984                                RPCIOD_KQ_IDENT,
985                                EVFILT_USER, EV_ADD | EV_ENABLE | EV_CLEAR,
986                                NOTE_FFNOP,
987                                0,
988                                0);
989
990                        s = kevent( rpcKq, &change, 1, NULL, 0, NULL );
991                        assert( s == 0 );
992
993                        EV_SET(
994                                &change,
995                                ourSock,
996                                EVFILT_READ, EV_ADD | EV_ENABLE,
997                                0,
998                                0,
999                                0);
1000
1001                        s = kevent( rpcKq, &change, 1, NULL, 0, NULL );
1002                        assert( s == 0 );
1003
1004                        status = rtems_task_create(
1005                                                                                        rtems_build_name('R','P','C','d'),
1006                                                                                        rtems_bsd_get_task_priority(RPCIOD_NAME),
1007                                                                                        rtems_bsd_get_task_stack_size(RPCIOD_NAME),
1008                                                                                        RTEMS_DEFAULT_MODES,
1009                                                                                        /* fprintf saves/restores FP registers on PPC :-( */
1010                                                                                        RTEMS_DEFAULT_ATTRIBUTES | RTEMS_FLOATING_POINT,
1011                                                                                        &rpciod);
1012                        assert( status == RTEMS_SUCCESSFUL );
1013
1014                        status = rtems_message_queue_create(
1015                                                                                        rtems_build_name('R','P','C','q'),
1016                                                                                        RPCIOD_QDEPTH,
1017                                                                                        sizeof(RpcUdpXact),
1018                                                                                        RTEMS_DEFAULT_ATTRIBUTES,
1019                                                                                        &msgQ);
1020                        assert( status == RTEMS_SUCCESSFUL );
1021                        status = rtems_task_start( rpciod, rpcio_daemon, 0 );
1022                        assert( status == RTEMS_SUCCESSFUL );
1023
1024                } else {
1025                        return -1;
1026                }
1027        }
1028        return 0;
1029}
1030
1031int
1032rpcUdpCleanup(void)
1033{
1034        rtems_semaphore_create(
1035                        rtems_build_name('R','P','C','f'),
1036                        0,
1037                        RTEMS_DEFAULT_ATTRIBUTES,
1038                        0,
1039                        &fini);
1040        sendEventToRpcServer(RPCIOD_KILL_EVENT);
1041        /* synchronize with daemon */
1042        rtems_semaphore_obtain(fini, RTEMS_WAIT, 5*ticksPerSec);
1043        /* if the message queue is still there, something went wrong */
1044        if (!msgQ) {
1045                rtems_task_delete(rpciod);
1046        }
1047        rtems_semaphore_delete(fini);
1048        return (msgQ !=0);
1049}
1050
1051/* Another API - simpler but less efficient.
1052 * For each RPCall, a server and a Xact
1053 * are created and destroyed on the fly.
1054 *
1055 * This should be used for infrequent calls
1056 * (e.g. a NFS mount request).
1057 *
1058 * This is roughly compatible with the original
1059 * clnt_call() etc. API - but it uses our
1060 * daemon and is fully reentrant.
1061 */
1062enum clnt_stat
1063rpcUdpClntCreate(
1064                struct sockaddr_in      *psaddr,
1065                rpcprog_t               prog,
1066                rpcvers_t               vers,
1067                u_long                  uid,
1068                u_long                  gid,
1069                RpcUdpClnt              *pclnt
1070)
1071{
1072RpcUdpXact              x;
1073RpcUdpServer    s;
1074enum clnt_stat  err;
1075
1076        if ( RPC_SUCCESS != (err=rpcUdpServerCreate(psaddr, prog, vers, uid, gid, &s)) )
1077                return err;
1078
1079        if ( !(x=rpcUdpXactCreate(prog, vers, UDPMSGSIZE)) ) {
1080                rpcUdpServerDestroy(s);
1081                return RPC_FAILED;
1082        }
1083        /* TODO: could maintain a server cache */
1084
1085        x->server = s;
1086
1087        *pclnt = x;
1088
1089        return RPC_SUCCESS;
1090}
1091
1092static void
1093rpcUdpClntDestroy(RpcUdpClnt xact)
1094{
1095        rpcUdpServerDestroy(xact->server);
1096        rpcUdpXactDestroy(xact);
1097}
1098
1099enum clnt_stat
1100rpcUdpClntCall(
1101        RpcUdpClnt              xact,
1102        u_long                  proc,
1103        XdrProcT                xargs,
1104        CaddrT                  pargs,
1105        XdrProcT                xres,
1106        CaddrT                  pres,
1107        struct timeval  *timeout
1108        )
1109{
1110enum clnt_stat  stat;
1111
1112                if ( (stat = rpcUdpSend(xact, xact->server, timeout, proc,
1113                                                                xres, pres,
1114                                                                xargs, pargs,
1115                                                                0)) ) {
1116                        fprintf(stderr,"RPCIO Send failed: %i\n",stat);
1117                        return stat;
1118                }
1119                return rpcUdpRcv(xact);
1120}
1121
1122/* a yet simpler interface */
1123enum clnt_stat
1124rpcUdpCallRp(
1125        struct sockaddr_in      *psrvr,
1126        u_long                          prog,
1127        u_long                          vers,
1128        u_long                          proc,
1129        XdrProcT                        xargs,
1130        CaddrT                          pargs,
1131        XdrProcT                        xres,
1132        CaddrT                          pres,
1133        u_long                          uid,            /* RPCIO_DEFAULT_ID picks default */
1134        u_long                          gid,            /* RPCIO_DEFAULT_ID picks default */
1135        struct timeval          *timeout        /* NULL picks default           */
1136)
1137{
1138RpcUdpClnt                      clp;
1139enum clnt_stat          stat;
1140
1141        stat = rpcUdpClntCreate(
1142                                psrvr,
1143                                prog,
1144                                vers,
1145                                uid,
1146                                gid,
1147                                &clp);
1148
1149        if ( RPC_SUCCESS != stat )
1150                return stat;
1151
1152        stat = rpcUdpClntCall(
1153                                clp,
1154                                proc,
1155                                xargs, pargs,
1156                                xres,  pres,
1157                                timeout);
1158
1159        rpcUdpClntDestroy(clp);
1160
1161        return stat;
1162}
1163
1164/* linked list primitives */
1165static void
1166nodeXtract(ListNode n)
1167{
1168        if (n->prev)
1169                n->prev->next = n->next;
1170        if (n->next)
1171                n->next->prev = n->prev;
1172        n->next = n->prev = 0;
1173}
1174
1175static void
1176nodeAppend(ListNode l, ListNode n)
1177{
1178        if ( (n->next = l->next) )
1179                n->next->prev = n;
1180        l->next = n;
1181        n->prev = l;
1182
1183}
1184
1185/* this code does the work */
1186static void
1187rpcio_daemon(rtems_task_argument arg)
1188{
1189RpcUdpXact        xact;
1190RpcUdpServer      srv;
1191rtems_interval    next_retrans, then, unow;
1192long                                            now;    /* need to do signed comparison with age! */
1193u_int             events;
1194ListNode          newList;
1195size_t            size;
1196rtems_id          q          =  0;
1197ListNodeRec       listHead   = {0, 0};
1198unsigned long     epoch      = RPCIOD_EPOCH_SECS * ticksPerSec;
1199unsigned long                   max_period = RPCIOD_RETX_CAP_S * ticksPerSec;
1200rtems_status_code       status;
1201
1202
1203        then = rtems_clock_get_ticks_since_boot();
1204
1205        for (next_retrans = epoch;;) {
1206                {
1207                        struct timespec timeout = {
1208                                .tv_sec = (next_retrans + ticksPerSec - 1) / ticksPerSec,
1209                                .tv_nsec = 0
1210                        };
1211                        struct kevent event[2];
1212                        int i;
1213                        int n;
1214
1215                        n = kevent(rpcKq, NULL, 0, &event[0], 2, &timeout);
1216                        assert(n >= 0);
1217
1218                        events = 0;
1219
1220                        for (i = 0; i < n; ++i) {
1221                                if (event[i].filter == EVFILT_USER) {
1222                                        events |= event[i].fflags;
1223                                } else {
1224                                        events |= RPCIOD_RX_EVENT;
1225                                }
1226                        }
1227                }
1228
1229                if (events & RPCIOD_KILL_EVENT) {
1230                        int i;
1231
1232#if (DEBUG) & DEBUG_EVENTS
1233                        fprintf(stderr,"RPCIO: got KILL event\n");
1234#endif
1235
1236                        MU_LOCK(hlock);
1237                        for (i=XACT_HASHS-1; i>=0; i--) {
1238                                if (xactHashTbl[i]) {
1239                                        break;
1240                                }
1241                        }
1242                        if (i<0) {
1243                                /* prevent them from creating and enqueueing more messages */
1244                                q=msgQ;
1245                                /* messages queued after we executed this assignment will fail */
1246                                msgQ=0;
1247                        }
1248                        MU_UNLOCK(hlock);
1249                        if (i>=0) {
1250                                fprintf(stderr,"RPCIO There are still transactions circulating; I refuse to go away\n");
1251                                fprintf(stderr,"(1st in slot %i)\n",i);
1252                                rtems_semaphore_release(fini);
1253                        } else {
1254                                break;
1255                        }
1256                }
1257
1258                unow = rtems_clock_get_ticks_since_boot();
1259
1260                /* measure everything relative to then to protect against
1261                 * rollover
1262                 */
1263                now = unow - then;
1264
1265                /* NOTE: we don't lock the hash table while we are operating
1266                 * on transactions; the paradigm is that we 'own' a particular
1267                 * transaction (and hence it's hash table slot) from the
1268                 * time the xact was put into the message queue until we
1269                 * wake up the requestor.
1270                 */
1271
1272                if (RPCIOD_RX_EVENT & events) {
1273
1274#if (DEBUG) & DEBUG_EVENTS
1275                        fprintf(stderr,"RPCIO: got RX event\n");
1276#endif
1277
1278                        while ((xact=sockRcv())) {
1279
1280                                /* extract from the retransmission list */
1281                                nodeXtract(&xact->node);
1282
1283                                /* change the ID - there might already be
1284                                 * a retransmission on the way. When it's
1285                                 * reply arrives we must not find it's ID
1286                                 * in the hashtable
1287                                 */
1288                                xact->obuf.xid        += XACT_HASHS;
1289
1290                                xact->status.re_status = RPC_SUCCESS;
1291
1292                                /* calculate roundtrip ticks */
1293                                xact->trip             = now - xact->trip;
1294
1295                                srv                    = xact->server;
1296
1297                                /* adjust the server's retry period */
1298                                {
1299                                        register TimeoutT rtry = srv->retry_period;
1300                                        register TimeoutT trip = xact->trip;
1301
1302                                        ASSERT( trip >= 0 );
1303
1304                                        if ( 0==trip )
1305                                                trip = 1;
1306
1307                                        /* retry_new = 0.75*retry_old + 0.25 * 8 * roundrip */
1308                                        rtry   = (3*rtry + (trip << 3)) >> 2;
1309
1310                                        if ( rtry > max_period )
1311                                                rtry = max_period;
1312
1313                                        srv->retry_period = rtry;
1314                                }
1315
1316                                /* wakeup requestor */
1317                                rtems_event_send(xact->requestor, RTEMS_RPC_EVENT);
1318                        }
1319                }
1320
1321                if (RPCIOD_TX_EVENT & events) {
1322
1323#if (DEBUG) & DEBUG_EVENTS
1324                        fprintf(stderr,"RPCIO: got TX event\n");
1325#endif
1326
1327                        while (RTEMS_SUCCESSFUL == rtems_message_queue_receive(
1328                                                                                        msgQ,
1329                                                                                        &xact,
1330                                                                                        &size,
1331                                                                                        RTEMS_NO_WAIT,
1332                                                                                        RTEMS_NO_TIMEOUT)) {
1333                                /* put to the head of timeout q */
1334                                nodeAppend(&listHead, &xact->node);
1335
1336                                xact->age  = now;
1337                                xact->trip = FIRST_ATTEMPT;
1338                        }
1339                }
1340
1341
1342                /* work the timeout q */
1343                newList = 0;
1344                for ( xact=(RpcUdpXact)listHead.next;
1345                          xact && xact->age <= now;
1346                          xact=(RpcUdpXact)listHead.next ) {
1347
1348                                /* extract from the list */
1349                                nodeXtract(&xact->node);
1350
1351                                srv = xact->server;
1352
1353                                if (xact->tolive < 0) {
1354                                        /* this one timed out */
1355                                        xact->status.re_errno  = ETIMEDOUT;
1356                                        xact->status.re_status = RPC_TIMEDOUT;
1357
1358                                        srv->timeouts++;
1359
1360                                        /* Change the ID - there might still be
1361                                         * a reply on the way. When it arrives we
1362                                         * must not find it's ID in the hash table
1363                                         *
1364                                         * Thanks to Steven Johnson for hunting this
1365                                         * one down.
1366                                         */
1367                                        xact->obuf.xid        += XACT_HASHS;
1368
1369#if (DEBUG) & DEBUG_TIMEOUT
1370                                        fprintf(stderr,"RPCIO XACT timed out; waking up requestor\n");
1371#endif
1372                                        if ( rtems_event_send(xact->requestor, RTEMS_RPC_EVENT) ) {
1373                                                rtems_panic("RPCIO PANIC: requestor id was 0x%08x",
1374                                                                        xact->requestor);
1375                                        }
1376
1377                                } else {
1378                                        int len;
1379
1380                                        len = (int)XDR_GETPOS(&xact->xdrs);
1381
1382#ifdef MBUF_TX
1383                                        xact->refcnt = 1;       /* sendto itself */
1384#endif
1385                                        if ( len != SENDTO( ourSock,
1386                                                                                xact->obuf.buf,
1387                                                                                len,
1388                                                                                0,
1389                                                                                &srv->addr.sa,
1390                                                                                sizeof(srv->addr.sin)
1391#ifdef MBUF_TX
1392                                                                                , xact,
1393                                                                                paranoia_free,
1394                                                                                paranoia_ref
1395#endif
1396                                                                                ) ) {
1397
1398                                                xact->status.re_errno  = errno;
1399                                                xact->status.re_status = RPC_CANTSEND;
1400                                                srv->errors++;
1401
1402                                                /* wakeup requestor */
1403                                                fprintf(stderr,"RPCIO: SEND failure\n");
1404                                                status = rtems_event_send(xact->requestor, RTEMS_RPC_EVENT);
1405                                                assert( status == RTEMS_SUCCESSFUL );
1406
1407                                        } else {
1408                                                /* send successful; calculate retransmission time
1409                                                 * and enqueue to temporary list
1410                                                 */
1411                                                if (FIRST_ATTEMPT != xact->trip) {
1412#if (DEBUG) & DEBUG_TIMEOUT
1413                                                        fprintf(stderr,
1414                                                                "timed out; tolive is %i (ticks), retry period is %i (ticks)\n",
1415                                                                        xact->tolive,
1416                                                                        srv->retry_period);
1417#endif
1418                                                        /* this is a real retry; we backup
1419                                                         * the server's retry interval
1420                                                         */
1421                                                        if ( srv->retry_period < max_period ) {
1422
1423                                                                /* If multiple transactions for this server
1424                                                                 * fail (e.g. because it died) this will
1425                                                                 * back-off very agressively (doubling
1426                                                                 * the retransmission period for every
1427                                                                 * timed out transaction up to the CAP limit)
1428                                                                 * which is desirable - single packet failure
1429                                                                 * is treated more gracefully by this algorithm.
1430                                                                 */
1431
1432                                                                srv->retry_period<<=1;
1433#if (DEBUG) & DEBUG_TIMEOUT
1434                                                                fprintf(stderr,
1435                                                                                "adjusted to; retry period %i\n",
1436                                                                                srv->retry_period);
1437#endif
1438                                                        } else {
1439                                                                /* never wait longer than RPCIOD_RETX_CAP_S seconds */
1440                                                                fprintf(stderr,
1441                                                                                "RPCIO: server '%s' not responding - still trying\n",
1442                                                                                srv->name);
1443                                                        }
1444                                                        if ( 0 == ++srv->retrans % 1000) {
1445                                                                fprintf(stderr,
1446                                                                                "RPCIO - statistics: already %li retries to server %s\n",
1447                                                                                srv->retrans,
1448                                                                                srv->name);
1449                                                        }
1450                                                } else {
1451                                                        srv->requests++;
1452                                                }
1453                                                xact->trip      = now;
1454                                                {
1455                                                long capped_period = srv->retry_period;
1456                                                        if ( xact->lifetime < capped_period )
1457                                                                capped_period = xact->lifetime;
1458                                                xact->age       = now + capped_period;
1459                                                xact->tolive   -= capped_period;
1460                                                }
1461                                                /* enqueue to the list of newly sent transactions */
1462                                                xact->node.next = newList;
1463                                                newList         = &xact->node;
1464#if (DEBUG) & DEBUG_TIMEOUT
1465                                                fprintf(stderr,
1466                                                                "XACT (0x%08x) age is 0x%x, now: 0x%x\n",
1467                                                                xact,
1468                                                                xact->age,
1469                                                                now);
1470#endif
1471                                        }
1472                                }
1473            }
1474
1475                /* insert the newly sent transactions into the
1476                 * sorted retransmission list
1477                 */
1478                for (; (xact = (RpcUdpXact)newList); ) {
1479                        register ListNode p,n;
1480                        newList = newList->next;
1481                        for ( p=&listHead; (n=p->next) && xact->age > ((RpcUdpXact)n)->age; p=n )
1482                                /* nothing else to do */;
1483                        nodeAppend(p, &xact->node);
1484                }
1485
1486                if (now > epoch) {
1487                        /* every now and then, readjust the epoch */
1488                        register ListNode n;
1489                        then += now;
1490                        for (n=listHead.next; n; n=n->next) {
1491                                /* readjust outstanding time intervals subject to the
1492                                 * condition that the 'absolute' time must remain
1493                                 * the same. 'age' and 'trip' are measured with
1494                                 * respect to 'then' - hence:
1495                                 *
1496                                 * abs_age == old_age + old_then == new_age + new_then
1497                                 *
1498                                 * ==> new_age = old_age + old_then - new_then == old_age - 'now'
1499                                 */
1500                                ((RpcUdpXact)n)->age  -= now;
1501                                ((RpcUdpXact)n)->trip -= now;
1502#if (DEBUG) & DEBUG_TIMEOUT
1503                                fprintf(stderr,
1504                                                "readjusted XACT (0x%08x); age is 0x%x, trip: 0x%x now: 0x%x\n",
1505                                                (RpcUdpXact)n,
1506                                                ((RpcUdpXact)n)->trip,
1507                                                ((RpcUdpXact)n)->age,
1508                                                now);
1509#endif
1510                        }
1511                        now = 0;
1512                }
1513
1514                next_retrans = listHead.next ?
1515                                                        ((RpcUdpXact)listHead.next)->age - now :
1516                                                        epoch;  /* make sure we don't miss updating the epoch */
1517#if (DEBUG) & DEBUG_TIMEOUT
1518                fprintf(stderr,"RPCIO: next timeout is %x\n",next_retrans);
1519#endif
1520        }
1521        /* close our socket; shut down the receiver */
1522        close(ourSock);
1523        close(rpcKq);
1524
1525#if 0 /* if we get here, no transactions exist, hence there can be none
1526           * in the queue whatsoever
1527           */
1528        /* flush the message queue */
1529        while (RTEMS_SUCCESSFUL == rtems_message_queue_receive(
1530                                                                                q,
1531                                                                                &xact,
1532                                                                                &size,
1533                                                                                RTEMS_NO_WAIT,
1534                                                                                RTEMS_NO_TIMEOUT)) {
1535                        /* TODO enque xact */
1536        }
1537
1538        /* flush all outstanding transactions */
1539
1540        for (xact=((RpcUdpXact)listHead.next); xact; xact=((RpcUdpXact)xact->node.next)) {
1541                        xact->status.re_status = RPC_TIMEDOUT;
1542                        rtems_event_send(xact->requestor, RTEMS_RPC_EVENT);
1543        }
1544#endif
1545
1546        rtems_message_queue_delete(q);
1547
1548        MU_DESTROY(hlock);
1549
1550        fprintf(stderr,"RPC daemon exited...\n");
1551
1552        rtems_semaphore_release(fini);
1553        rtems_task_suspend(RTEMS_SELF);
1554}
1555
1556
1557/* support for transaction 'pools'. A number of XACT objects
1558 * is always kept around. The initial number is 0 but it
1559 * is allowed to grow up to a maximum.
1560 * If the need grows beyond the maximum, behavior depends:
1561 * Users can either block until a transaction becomes available,
1562 * they can create a new XACT on the fly or get an error
1563 * if no free XACT is available from the pool.
1564 */
1565
1566RpcUdpXactPool
1567rpcUdpXactPoolCreate(
1568        rpcprog_t prog,                 rpcvers_t version,
1569        int xactsize,   int poolsize)
1570{
1571RpcUdpXactPool  rval = MY_MALLOC(sizeof(*rval));
1572rtems_status_code       status;
1573
1574        ASSERT( rval );
1575        status = rtems_message_queue_create(
1576                                        rtems_build_name('R','P','C','p'),
1577                                        poolsize,
1578                                        sizeof(RpcUdpXact),
1579                                        RTEMS_DEFAULT_ATTRIBUTES,
1580                                        &rval->box);
1581        assert( status == RTEMS_SUCCESSFUL );
1582
1583        rval->prog     = prog;
1584        rval->version  = version;
1585        rval->xactSize = xactsize;
1586        return rval;
1587}
1588
1589void
1590rpcUdpXactPoolDestroy(RpcUdpXactPool pool)
1591{
1592RpcUdpXact xact;
1593
1594        while ((xact = rpcUdpXactPoolGet(pool, XactGetFail))) {
1595                rpcUdpXactDestroy(xact);
1596        }
1597        rtems_message_queue_delete(pool->box);
1598        MY_FREE(pool);
1599}
1600
1601RpcUdpXact
1602rpcUdpXactPoolGet(RpcUdpXactPool pool, XactPoolGetMode mode)
1603{
1604RpcUdpXact       xact = 0;
1605size_t           size;
1606
1607        if (RTEMS_SUCCESSFUL != rtems_message_queue_receive(
1608                                                                pool->box,
1609                                                                &xact,
1610                                                                &size,
1611                                                                XactGetWait == mode ?
1612                                                                        RTEMS_WAIT : RTEMS_NO_WAIT,
1613                                                                RTEMS_NO_TIMEOUT)) {
1614
1615                /* nothing found in box; should we create a new one ? */
1616
1617                xact = (XactGetCreate == mode) ?
1618                                        rpcUdpXactCreate(
1619                                                        pool->prog,
1620                                                        pool->version,
1621                                                        pool->xactSize) : 0 ;
1622                if (xact)
1623                                xact->pool = pool;
1624
1625        }
1626        return xact;
1627}
1628
1629void
1630rpcUdpXactPoolPut(RpcUdpXact xact)
1631{
1632RpcUdpXactPool pool;
1633
1634        pool = xact->pool;
1635        ASSERT( pool );
1636
1637        if (RTEMS_SUCCESSFUL != rtems_message_queue_send(
1638                                                                pool->box,
1639                                                                &xact,
1640                                                                sizeof(xact)))
1641                rpcUdpXactDestroy(xact);
1642}
1643
1644#ifdef MBUF_RX
1645
1646/* WORKAROUND: include sys/mbuf.h (or other bsdnet headers) only
1647 *             _after_ using malloc()/free() & friends because
1648 *             the RTEMS/BSDNET headers redefine those :-(
1649 */
1650
1651#include <machine/rtems-bsd-kernel-space.h>
1652#include <sys/mbuf.h>
1653
1654static void
1655bufFree(struct mbuf **m)
1656{
1657        if (*m) {
1658                rtems_bsdnet_semaphore_obtain();
1659                m_freem(*m);
1660                rtems_bsdnet_semaphore_release();
1661                *m = 0;
1662        }
1663}
1664#endif
1665
1666#ifdef MBUF_TX
1667static void
1668paranoia_free(caddr_t closure, u_int size)
1669{
1670#if (DEBUG)
1671RpcUdpXact xact = (RpcUdpXact)closure;
1672int        len  = (int)XDR_GETPOS(&xact->xdrs);
1673
1674        ASSERT( --xact->refcnt >= 0 && size == len );
1675#endif
1676}
1677
1678static void
1679paranoia_ref (caddr_t closure, u_int size)
1680{
1681#if (DEBUG)
1682RpcUdpXact xact = (RpcUdpXact)closure;
1683int        len  = (int)XDR_GETPOS(&xact->xdrs);
1684        ASSERT( size == len );
1685        xact->refcnt++;
1686#endif
1687}
1688#endif
1689
1690/* receive from a socket and find
1691 * the transaction corresponding to the
1692 * transaction ID received in the server
1693 * reply.
1694 *
1695 * The semantics of the 'pibuf' pointer are
1696 * as follows:
1697 *
1698 * MBUF_RX:
1699 *
1700 */
1701
1702#define RPCIOD_RXBUFSZ  UDPMSGSIZE
1703
1704static RpcUdpXact
1705sockRcv(void)
1706{
1707int                                     len,i;
1708uint32_t                                xid;
1709union {
1710        struct sockaddr_in      sin;
1711        struct sockaddr     sa;
1712}                                       fromAddr;
1713socklen_t                               fromLen  = sizeof(fromAddr.sin);
1714RxBuf                           ibuf     = 0;
1715RpcUdpXact                      xact     = 0;
1716
1717        do {
1718
1719        /* rcv_mbuf() and recvfrom() differ in that the
1720         * former allocates buffers and passes them back
1721         * to us whereas the latter requires us to provide
1722         * buffer space.
1723         * Hence, in the first case whe have to make sure
1724         * no old buffer is leaked - in the second case,
1725         * we might well re-use an old buffer but must
1726         * make sure we have one allocated
1727         */
1728#ifdef MBUF_RX
1729        if (ibuf)
1730                bufFree(&ibuf);
1731
1732        len  = recv_mbuf_from(
1733                                        ourSock,
1734                                        &ibuf,
1735                                        RPCIOD_RXBUFSZ,
1736                                    &fromAddr.sa,
1737                                    &fromLen);
1738#else
1739        if ( !ibuf )
1740                ibuf = (RpcBuf)MY_MALLOC(RPCIOD_RXBUFSZ);
1741        if ( !ibuf )
1742                goto cleanup; /* no memory - drop this message */
1743
1744        len  = recvfrom(ourSock,
1745                                    ibuf->buf,
1746                                    RPCIOD_RXBUFSZ,
1747                                    0,
1748                                    &fromAddr.sa,
1749                                        &fromLen);
1750#endif
1751
1752        if (len <= 0) {
1753                if (EAGAIN != errno)
1754                        fprintf(stderr,"RECV failed: %s\n",strerror(errno));
1755                goto cleanup;
1756        }
1757
1758#if (DEBUG) & DEBUG_PACKLOSS
1759        if ( (unsigned)rand() < DEBUG_PACKLOSS_FRACT ) {
1760                /* lose packets once in a while */
1761                static int xxx = 0;
1762                if ( ++xxx % 16 == 0 )
1763                        fprintf(stderr,"DEBUG: dropped %i packets, so far...\n",xxx);
1764                if ( ibuf )
1765                        bufFree( &ibuf );
1766                continue;
1767        }
1768#endif
1769
1770        i = (xid=XID(ibuf)) & XACT_HASH_MSK;
1771
1772        if ( !(xact=xactHashTbl[i])                                             ||
1773                   xact->obuf.xid                     != xid                        ||
1774#ifdef REJECT_SERVERIP_MISMATCH
1775                   xact->server->addr.sin.sin_addr.s_addr != fromAddr.sin.sin_addr.s_addr       ||
1776#endif
1777                   xact->server->addr.sin.sin_port        != fromAddr.sin.sin_port ) {
1778
1779                if (xact) {
1780                        if (
1781#ifdef REJECT_SERVERIP_MISMATCH
1782                            xact->server->addr.sin.sin_addr.s_addr == fromAddr.sin.sin_addr.s_addr &&
1783#endif
1784                        xact->server->addr.sin.sin_port        == fromAddr.sin.sin_port        &&
1785                            ( xact->obuf.xid                   == xid + XACT_HASHS   ||
1786                                  xact->obuf.xid                   == xid + 2*XACT_HASHS    )
1787                                ) {
1788#ifndef DEBUG /* don't complain if it's just a late arrival of a retry */
1789                        fprintf(stderr,"RPCIO - FYI sockRcv(): dropping late/redundant retry answer\n");
1790#endif
1791                        } else {
1792                        fprintf(stderr,"RPCIO WARNING sockRcv(): transaction mismatch\n");
1793                        fprintf(stderr,"xact: xid  0x%08" PRIx32 "  -- got 0x%08" PRIx32 "\n",
1794                                                        xact->obuf.xid, xid);
1795                        fprintf(stderr,"xact: addr 0x%08" PRIx32 "  -- got 0x%08" PRIx32 "\n",
1796                                                        xact->server->addr.sin.sin_addr.s_addr,
1797                                                        fromAddr.sin.sin_addr.s_addr);
1798                        fprintf(stderr,"xact: port 0x%08x  -- got 0x%08x\n",
1799                                                        xact->server->addr.sin.sin_port,
1800                                                        fromAddr.sin.sin_port);
1801                        }
1802                } else {
1803                        fprintf(stderr,
1804                                        "RPCIO WARNING sockRcv(): got xid 0x%08" PRIx32 " but its slot is empty\n",
1805                                        xid);
1806                }
1807                /* forget about this one and try again */
1808                xact = 0;
1809        }
1810
1811        } while ( !xact );
1812
1813        xact->ibuf     = ibuf;
1814#ifndef MBUF_RX
1815        xact->ibufsize = RPCIOD_RXBUFSZ;
1816#endif
1817
1818        return xact;
1819
1820cleanup:
1821
1822        bufFree(&ibuf);
1823
1824        return 0;
1825}
Note: See TracBrowser for help on using the repository browser.