source: rtems/cpukit/libfs/src/nfsclient/src/rpcio.c @ 8b389473

4.104.115
Last change on this file since 8b389473 was 8b389473, checked in by Ralf Corsepius <ralf.corsepius@…>, on 05/30/10 at 10:18:41

2010-05-30 Ralf Corsépius <ralf.corsepius@…>

  • libfs/src/nfsclient/src/rpcio.c: Warning removal.
  • Property mode set to 100644
File size: 44.4 KB
Line 
1/* $Id$ */
2
3/* RPC multiplexor for a multitasking environment */
4
5/* Author: Till Straumann <strauman@slac.stanford.edu>, 2002 */
6
7/* This code funnels arbitrary task's UDP/RPC requests
8 * through one socket to arbitrary servers.
9 * The replies are gathered and dispatched to the
10 * requestors.
11 * One task handles all the sending and receiving
12 * work including retries.
13 * It is up to the requestor, however, to do
14 * the XDR encoding of the arguments / decoding
15 * of the results (except for the RPC header which
16 * is handled by the daemon).
17 */
18
19/*
20 * Authorship
21 * ----------
22 * This software (NFS-2 client implementation for RTEMS) was created by
23 *     Till Straumann <strauman@slac.stanford.edu>, 2002-2007,
24 *         Stanford Linear Accelerator Center, Stanford University.
25 *
26 * Acknowledgement of sponsorship
27 * ------------------------------
28 * The NFS-2 client implementation for RTEMS was produced by
29 *     the Stanford Linear Accelerator Center, Stanford University,
30 *         under Contract DE-AC03-76SFO0515 with the Department of Energy.
31 *
32 * Government disclaimer of liability
33 * ----------------------------------
34 * Neither the United States nor the United States Department of Energy,
35 * nor any of their employees, makes any warranty, express or implied, or
36 * assumes any legal liability or responsibility for the accuracy,
37 * completeness, or usefulness of any data, apparatus, product, or process
38 * disclosed, or represents that its use would not infringe privately owned
39 * rights.
40 *
41 * Stanford disclaimer of liability
42 * --------------------------------
43 * Stanford University makes no representations or warranties, express or
44 * implied, nor assumes any liability for the use of this software.
45 *
46 * Stanford disclaimer of copyright
47 * --------------------------------
48 * Stanford University, owner of the copyright, hereby disclaims its
49 * copyright and all other rights in this software.  Hence, anyone may
50 * freely use it for any purpose without restriction.
51 *
52 * Maintenance of notices
53 * ----------------------
54 * In the interest of clarity regarding the origin and status of this
55 * SLAC software, this and all the preceding Stanford University notices
56 * are to remain affixed to any copy or derivative of this software made
57 * or distributed by the recipient and are to be affixed to any copy of
58 * software made or distributed by the recipient that contains a copy or
59 * derivative of this software.
60 *
61 * ------------------ SLAC Software Notices, Set 4 OTT.002a, 2004 FEB 03
62 */
63
64#if HAVE_CONFIG_H
65#include "config.h"
66#endif
67
68#include <rtems.h>
69#include <rtems/error.h>
70#include <rtems/rtems_bsdnet.h>
71#include <stdlib.h>
72#include <time.h>
73#include <rpc/rpc.h>
74#include <rpc/pmap_prot.h>
75#include <errno.h>
76#include <sys/ioctl.h>
77#include <assert.h>
78#include <stdio.h>
79#include <errno.h>
80#include <string.h>
81#include <netinet/in.h>
82#include <arpa/inet.h>
83
84#include "rpcio.h"
85
86/****************************************************************/
87/* CONFIGURABLE PARAMETERS                                      */
88/****************************************************************/
89
90#define MBUF_RX                 /* If defined: use mbuf XDR stream for
91                                                 *  decoding directly out of mbufs
92                                                 *  Otherwise, the regular 'recvfrom()'
93                                                 *  interface will be used involving an
94                                                 *  extra buffer allocation + copy step.
95                                                 */
96
97#define MBUF_TX                 /* If defined: avoid copying data when
98                                                 *  sending. Instead, use a wrapper to
99                                                 *  'sosend()' which will point an MBUF
100                                                 *  directly to our buffer space.
101                                                 *  Note that the BSD stack does not copy
102                                                 *  data when fragmenting packets - it
103                                                 *  merely uses an mbuf chain pointing
104                                                 *  into different areas of the data.
105                                                 *
106                                                 * If undefined, the regular 'sendto()'
107                                                 *  interface is used.
108                                                 */
109
110#undef REJECT_SERVERIP_MISMATCH
111                                                /* If defined, RPC replies must come from the server
112                                                 * that was queried. Eric Norum has reported problems
113                                                 * with clustered NFS servers. So we disable this
114                                                 * reducing paranoia...
115                                                 */
116
117/* daemon task parameters */
118#define RPCIOD_STACK            10000
119#define RPCIOD_PRIO                     100     /* *fallback* priority */
120
121/* depth of the message queue for sending
122 * RPC requests to the daemon
123 */
124#define RPCIOD_QDEPTH           20
125
126/* Maximum retry limit for retransmission */
127#define RPCIOD_RETX_CAP_S       3 /* seconds */
128
129/* Default timeout for RPC calls */
130#define RPCIOD_DEFAULT_TIMEOUT  (&_rpc_default_timeout)
131static struct timeval _rpc_default_timeout = { 10 /* secs */, 0 /* usecs */ };
132
133/* how many times should we try to resend a failed
134 * transaction with refreshed AUTHs
135 */
136#define RPCIOD_REFRESH          2
137
138/* Events we are using; the RPC_EVENT
139 * MUST NOT be used by any application
140 * thread doing RPC IO (e.g. NFS)
141 */
142#define RTEMS_RPC_EVENT         RTEMS_EVENT_30  /* THE event used by RPCIO. Every task doing
143                                                                                         * RPC IO will receive this - hence it is
144                                                                                         * RESERVED
145                                                                                         */
146#define RPCIOD_RX_EVENT         RTEMS_EVENT_1   /* Events the RPCIOD is using/waiting for */
147#define RPCIOD_TX_EVENT         RTEMS_EVENT_2
148#define RPCIOD_KILL_EVENT       RTEMS_EVENT_3   /* send to the daemon to kill it          */
149
150#define LD_XACT_HASH            8                               /* ld of the size of the transaction hash table  */
151
152
153/* Debugging Flags                                              */
154
155/* NOTE: defining DEBUG 0 leaves some 'assert()' paranoia checks
156 *       but produces no output
157 */
158
159#define DEBUG_TRACE_XACT        (1<<0)
160#define DEBUG_EVENTS            (1<<1)
161#define DEBUG_MALLOC            (1<<2)
162#define DEBUG_TIMEOUT           (1<<3)
163#define DEBUG_PACKLOSS          (1<<4)  /* This introduces random, artificial packet losses to test retransmission */
164
165#define DEBUG_PACKLOSS_FRACT (0xffffffff/10)
166
167/* USE PARENTHESIS WHEN 'or'ing MULTIPLE FLAGS: (DEBUG_XX | DEBUG_YY) */
168#define DEBUG                           (0)
169
170/****************************************************************/
171/* END OF CONFIGURABLE SECTION                                  */
172/****************************************************************/
173
174/* prevent rollover of our timers by readjusting the epoch on the fly */
175#if     (DEBUG) & DEBUG_TIMEOUT
176#define RPCIOD_EPOCH_SECS       10
177#else
178#define RPCIOD_EPOCH_SECS       10000
179#endif
180
181#ifdef  DEBUG
182#define ASSERT(arg)                     assert(arg)
183#else
184#define ASSERT(arg)                     if (arg)
185#endif
186
187/****************************************************************/
188/* MACROS                                                       */
189/****************************************************************/
190
191
192#define XACT_HASHS              (1<<(LD_XACT_HASH))     /* the hash table size derived from the ld       */
193#define XACT_HASH_MSK   ((XACT_HASHS)-1)        /* mask to extract the hash index from a RPC-XID */
194
195
196#define MU_LOCK(mutex)          do {                                                    \
197                                                        assert(                                                 \
198                                                                RTEMS_SUCCESSFUL ==                     \
199                                                                rtems_semaphore_obtain(         \
200                                                                                (mutex),                        \
201                                                                                RTEMS_WAIT,                     \
202                                                                                RTEMS_NO_TIMEOUT        \
203                                                                                ) );                            \
204                                                        } while(0)
205
206#define MU_UNLOCK(mutex)        do {                                                    \
207                                                        assert(                                                 \
208                                                                RTEMS_SUCCESSFUL ==                     \
209                                                                rtems_semaphore_release(        \
210                                                                                (mutex)                         \
211                                                                                ) );                            \
212                                                        } while(0)
213
214#define MU_CREAT(pmutex)        do {                                                    \
215                                                        assert(                                                 \
216                                                                RTEMS_SUCCESSFUL ==                     \
217                                                                rtems_semaphore_create(         \
218                                                                                rtems_build_name(       \
219                                                                                        'R','P','C','l' \
220                                                                                        ),                              \
221                                                                                1,                                      \
222                                                                                MUTEX_ATTRIBUTES,       \
223                                                                                0,                                      \
224                                                                                (pmutex)) );            \
225                                                        } while (0)
226
227
228#define MU_DESTROY(mutex)       do {                                                    \
229                                                        assert(                                                 \
230                                                                RTEMS_SUCCESSFUL ==                     \
231                                                                rtems_semaphore_delete(         \
232                                                                                mutex                           \
233                                                                                ) );                            \
234                                                        } while (0)
235
236#define MUTEX_ATTRIBUTES        (RTEMS_LOCAL           |                \
237                                                        RTEMS_PRIORITY         |                \
238                                                        RTEMS_INHERIT_PRIORITY |                \
239                                                        RTEMS_BINARY_SEMAPHORE)
240
241#define FIRST_ATTEMPT           0x88888888 /* some time that is never reached */
242
243/****************************************************************/
244/* TYPE DEFINITIONS                                             */
245/****************************************************************/
246
247typedef rtems_interval          TimeoutT;
248
249/* 100000th implementation of a doubly linked list;
250 * since only one thread is looking at these,
251 * we need no locking
252 */
253typedef struct ListNodeRec_ {
254        struct ListNodeRec_ *next, *prev;
255} ListNodeRec, *ListNode;
256
257
258/* Structure representing an RPC server */
259typedef struct RpcUdpServerRec_ {
260                RpcUdpServer            next;                   /* linked list of all servers; protected by hlock */
261                union {
262                struct sockaddr_in      sin;
263                struct sockaddr     sa;
264                }                                       addr;
265                AUTH                            *auth;
266                rtems_id                        authlock;               /* must MUTEX the auth object - it's not clear
267                                                                                         *  what is better:
268                                                                                         *   1 having one (MUTEXed) auth per server
269                                                                                         *         who is shared among all transactions
270                                                                                         *         using that server
271                                                                                         *       2 maintaining an AUTH per transaction
272                                                                                         *         (there are then other options: manage
273                                                                                         *         XACT pools on a per-server basis instead
274                                                                                         *         of associating a server with a XACT when
275                                                                                         *   sending)
276                                                                                         * experience will show if the current (1)
277                                                                                         * approach has to be changed.
278                                                                                         */
279                TimeoutT                        retry_period;   /* dynamically adjusted retry period
280                                                                                         * (based on packet roundtrip time)
281                                                                                         */
282                /* STATISTICS */
283                unsigned long           retrans;                /* how many retries were issued by this server         */
284                unsigned long           requests;               /* how many requests have been sent                    */
285                unsigned long       timeouts;           /* how many requests have timed out                    */
286                unsigned long       errors;         /* how many errors have occurred (other than timeouts) */
287                char                            name[20];               /* server's address in IP 'dot' notation               */
288} RpcUdpServerRec;
289
290typedef union  RpcBufU_ {
291                uint32_t                        xid;
292                char                            buf[1];
293} RpcBufU, *RpcBuf;
294
295/* RX Buffer implementation; this is either
296 * an MBUF chain (MBUF_RX configuration)
297 * or a buffer allocated from the heap
298 * where recvfrom copies the (encoded) reply
299 * to. The XDR routines the copy/decode
300 * it into the user's data structures.
301 */
302#ifdef MBUF_RX
303typedef struct mbuf *           RxBuf;  /* an MBUF chain */
304static  void                            bufFree(struct mbuf **m);
305#define XID(ibuf)                       (*(mtod((ibuf), u_long *)))
306extern void                             xdrmbuf_create(XDR *, struct mbuf *, enum xdr_op);
307#else
308typedef RpcBuf                          RxBuf;
309#define bufFree(b)                      do { MY_FREE(*(b)); *(b)=0; } while(0)
310#define XID(ibuf)                       ((ibuf)->xid)
311#endif
312
313/* A RPC 'transaction' consisting
314 * of server and requestor information,
315 * buffer space and an XDR object
316 * (for encoding arguments).
317 */
318typedef struct RpcUdpXactRec_ {
319                ListNodeRec                     node;           /* so we can put XACTs on a list                */
320                RpcUdpServer            server;         /* server this XACT goes to                     */
321                long                            lifetime;       /* during the lifetime, retry attempts are made */
322                long                            tolive;         /* lifetime timer                               */
323                struct rpc_err          status;         /* RPC reply error status                       */
324                long                            age;            /* age info; needed to manage retransmission    */
325                long                            trip;           /* record round trip time in ticks              */
326                rtems_id                        requestor;      /* the task waiting for this XACT to complete   */
327                RpcUdpXactPool          pool;           /* if this XACT belong to a pool, this is it    */
328                XDR                                     xdrs;           /* argument encoder stream                      */
329                int                                     xdrpos;     /* stream position after the (permanent) header */
330                xdrproc_t                       xres;           /* reply decoder proc - TODO needn't be here    */
331                caddr_t                         pres;           /* reply decoded obj  - TODO needn't be here    */
332#ifndef MBUF_RX
333                int                                     ibufsize;       /* size of the ibuf (bytes)                     */
334#endif
335#ifdef  MBUF_TX
336                int                                     refcnt;         /* mbuf external storage reference count        */
337#endif
338                int                                     obufsize;       /* size of the obuf (bytes)                     */
339                RxBuf                           ibuf;           /* pointer to input buffer assigned by daemon   */
340                RpcBufU                         obuf;       /* output buffer (encoded args) APPENDED HERE   */
341} RpcUdpXactRec;
342
343typedef struct RpcUdpXactPoolRec_ {
344        rtems_id        box;
345        int                     prog;
346        int                     version;
347        int                     xactSize;
348} RpcUdpXactPoolRec;
349
350/* a global hash table where all 'living' transaction
351 * objects are registered.
352 * A number of bits in a transaction's XID maps 1:1 to
353 * an index in this table. Hence, the XACT matching
354 * an RPC/UDP reply packet can quickly be found
355 * The size of this table imposes a hard limit on the
356 * number of all created transactions in the system.
357 */
358static RpcUdpXact xactHashTbl[XACT_HASHS]={0};
359static u_long     xidUpper   [XACT_HASHS]={0};
360static unsigned   xidHashSeed            = 0 ;
361
362/* forward declarations */
363static RpcUdpXact
364sockRcv(void);
365
366static void
367rpcio_daemon(rtems_task_argument);
368
369#ifdef MBUF_TX
370ssize_t
371sendto_nocpy (
372                int s,
373                const void *buf, size_t buflen,
374                int flags,
375                const struct sockaddr *toaddr, int tolen,
376                void *closure,
377                void (*freeproc)(caddr_t, u_int),
378                void (*refproc)(caddr_t, u_int)
379);
380static void paranoia_free(caddr_t closure, u_int size);
381static void paranoia_ref (caddr_t closure, u_int size);
382#define SENDTO  sendto_nocpy
383#else
384#define SENDTO  sendto
385#endif
386
387static RpcUdpServer             rpcUdpServers = 0;      /* linked list of all servers; protected by llock */
388
389static int                              ourSock = -1;           /* the socket we are using for communication */
390static rtems_id                 rpciod  = 0;            /* task id of the RPC daemon                 */
391static rtems_id                 msgQ    = 0;            /* message queue where the daemon picks up
392                                                                                         * requests
393                                                                                         */
394#ifndef NDEBUG
395static rtems_id                 llock   = 0;            /* MUTEX protecting the server list */
396static rtems_id                 hlock   = 0;            /* MUTEX protecting the hash table and the list of servers */
397#endif
398static rtems_id                 fini    = 0;            /* a synchronization semaphore we use during
399                                                                                         * module cleanup / driver unloading
400                                                                                         */
401static rtems_interval   ticksPerSec;            /* cached system clock rate (WHO IS ASSUMED NOT
402                                                                                         * TO CHANGE)
403                                                                                         */
404
405rtems_task_priority             rpciodPriority = 0;
406
407#if (DEBUG) & DEBUG_MALLOC
408/* malloc wrappers for debugging */
409static int nibufs = 0;
410
411static inline void *MY_MALLOC(int s)
412{
413        if (s) {
414                void *rval;
415                MU_LOCK(hlock);
416                assert(nibufs++ < 2000);
417                MU_UNLOCK(hlock);
418                assert((rval = malloc(s)) != 0);
419                return rval;
420        }
421        return 0;
422}
423
424static inline void *MY_CALLOC(int n, int s)
425{
426        if (s) {
427                void *rval;
428                MU_LOCK(hlock);
429                assert(nibufs++ < 2000);
430                MU_UNLOCK(hlock);
431                assert((rval = calloc(n,s)) != 0);
432                return rval;
433        }
434        return 0;
435}
436
437
438static inline void MY_FREE(void *p)
439{
440        if (p) {
441                MU_LOCK(hlock);
442                nibufs--;
443                MU_UNLOCK(hlock);
444                free(p);
445        }
446}
447#else
448#define MY_MALLOC       malloc
449#define MY_CALLOC       calloc
450#define MY_FREE         free
451#endif
452
453static inline bool_t
454locked_marshal(RpcUdpServer s, XDR *xdrs)
455{
456bool_t rval;
457        MU_LOCK(s->authlock);
458        rval = AUTH_MARSHALL(s->auth, xdrs);
459        MU_UNLOCK(s->authlock);
460        return rval;
461}
462
463/* Locked operations on a server's auth object */
464static inline bool_t
465locked_validate(RpcUdpServer s, struct opaque_auth *v)
466{
467bool_t rval;
468        MU_LOCK(s->authlock);
469        rval = AUTH_VALIDATE(s->auth, v);
470        MU_UNLOCK(s->authlock);
471        return rval;
472}
473
474static inline bool_t
475locked_refresh(RpcUdpServer s)
476{
477bool_t rval;
478        MU_LOCK(s->authlock);
479        rval = AUTH_REFRESH(s->auth);
480        MU_UNLOCK(s->authlock);
481        return rval;
482}
483
484/* Create a server object
485 *
486 */
487enum clnt_stat
488rpcUdpServerCreate(
489        struct sockaddr_in      *paddr,
490        rpcprog_t               prog,
491        rpcvers_t               vers,
492        u_long                  uid,
493        u_long                  gid,
494        RpcUdpServer            *psrv
495        )
496{
497RpcUdpServer    rval;
498u_short                 port;
499char                    hname[MAX_MACHINE_NAME + 1];
500int                             theuid, thegid;
501int                             thegids[NGRPS];
502gid_t                   gids[NGROUPS];
503int                             len,i;
504AUTH                    *auth;
505enum clnt_stat  pmap_err;
506struct pmap             pmaparg;
507
508        if ( gethostname(hname, MAX_MACHINE_NAME) ) {
509                fprintf(stderr,
510                                "RPCIO - error: I have no hostname ?? (%s)\n",
511                                strerror(errno));
512                return RPC_UNKNOWNHOST;
513        }
514
515        if ( (len = getgroups(NGROUPS, gids) < 0 ) ) {
516                fprintf(stderr,
517                                "RPCIO - error: I unable to get group ids (%s)\n",
518                                strerror(errno));
519                return RPC_FAILED;
520        }
521
522        if ( len > NGRPS )
523                len = NGRPS;
524
525        for (i=0; i<len; i++)
526                thegids[i] = (int)gids[i];
527
528        theuid = (int) ((RPCIOD_DEFAULT_ID == uid) ? geteuid() : uid);
529        thegid = (int) ((RPCIOD_DEFAULT_ID == gid) ? getegid() : gid);
530
531        if ( !(auth = authunix_create(hname, theuid, thegid, len, thegids)) ) {
532                fprintf(stderr,
533                                "RPCIO - error: unable to create RPC AUTH\n");
534                return RPC_FAILED;
535        }
536
537        /* if they specified no port try to ask the portmapper */
538        if (!paddr->sin_port) {
539
540                paddr->sin_port = htons(PMAPPORT);
541
542        pmaparg.pm_prog = prog;
543        pmaparg.pm_vers = vers;
544        pmaparg.pm_prot = IPPROTO_UDP;
545        pmaparg.pm_port = 0;  /* not needed or used */
546
547
548                /* dont use non-reentrant pmap_getport ! */
549
550                pmap_err = rpcUdpCallRp(
551                                                paddr,
552                                                PMAPPROG,
553                                                PMAPVERS,
554                                                PMAPPROC_GETPORT,
555                                                xdr_pmap,
556                                                &pmaparg,
557                                                xdr_u_short,
558                                                &port,
559                                                uid,
560                                                gid,
561                                                0);
562
563                if ( RPC_SUCCESS != pmap_err ) {
564                        paddr->sin_port = 0;
565                        return pmap_err;
566                }
567
568                paddr->sin_port = htons(port);
569        }
570
571        if (0==paddr->sin_port) {
572                        return RPC_PROGNOTREGISTERED;
573        }
574
575        rval                            = (RpcUdpServer)MY_MALLOC(sizeof(*rval));
576        memset(rval, 0, sizeof(*rval));
577
578        if (!inet_ntop(AF_INET, &paddr->sin_addr, rval->name, sizeof(rval->name)))
579                sprintf(rval->name,"?.?.?.?");
580        rval->addr.sin          = *paddr;
581
582        /* start with a long retransmission interval - it
583         * will be adapted dynamically
584         */
585        rval->retry_period  = RPCIOD_RETX_CAP_S * ticksPerSec;
586
587        rval->auth                      = auth;
588
589        MU_CREAT( &rval->authlock );
590
591        /* link into list */
592        MU_LOCK( llock );
593        rval->next = rpcUdpServers;
594        rpcUdpServers = rval;
595        MU_UNLOCK( llock );
596
597        *psrv                           = rval;
598        return RPC_SUCCESS;
599}
600
601void
602rpcUdpServerDestroy(RpcUdpServer s)
603{
604RpcUdpServer prev;
605        if (!s)
606                return;
607        /* we should probably verify (but how?) that nobody
608         * (at least: no outstanding XACTs) is using this
609         * server;
610         */
611
612        /* remove from server list */
613        MU_LOCK(llock);
614        prev = rpcUdpServers;
615        if ( s == prev ) {
616                rpcUdpServers = s->next;
617        } else {
618                for ( ; prev ; prev = prev->next) {
619                        if (prev->next == s) {
620                                prev->next = s->next;
621                                break;
622                        }
623                }
624        }
625        MU_UNLOCK(llock);
626
627        /* MUST have found it */
628        assert(prev);
629
630        auth_destroy(s->auth);
631
632        MU_DESTROY(s->authlock);
633        MY_FREE(s);
634}
635
636int
637rpcUdpStats(FILE *f)
638{
639RpcUdpServer s;
640
641        if (!f) f = stdout;
642
643        fprintf(f,"RPCIOD statistics:\n");
644
645        MU_LOCK(llock);
646        for (s = rpcUdpServers; s; s=s->next) {
647                fprintf(f,"\nServer -- %s:\n", s->name);
648                fprintf(f,"  requests    sent: %10ld, retransmitted: %10ld\n",
649                                                s->requests, s->retrans);
650                fprintf(f,"         timed out: %10ld,   send errors: %10ld\n",
651                                                s->timeouts, s->errors);
652                fprintf(f,"  current retransmission interval: %dms\n",
653                                                (unsigned)(s->retry_period * 1000 / ticksPerSec) );
654        }
655        MU_UNLOCK(llock);
656
657        return 0;
658}
659
660RpcUdpXact
661rpcUdpXactCreate(
662        u_long  program,
663        u_long  version,
664        u_long  size
665        )
666{
667RpcUdpXact              rval=0;
668struct rpc_msg  header;
669register int    i,j;
670
671        if (!size)
672                size = UDPMSGSIZE;
673        /* word align */
674        size = (size + 3) & ~3;
675
676        rval = (RpcUdpXact)MY_CALLOC(1,sizeof(*rval) - sizeof(rval->obuf) + size);
677
678        if (rval) {
679
680                header.rm_xid             = 0;
681                header.rm_direction       = CALL;
682                header.rm_call.cb_rpcvers = RPC_MSG_VERSION;
683                header.rm_call.cb_prog    = program;
684                header.rm_call.cb_vers    = version;
685                xdrmem_create(&(rval->xdrs), rval->obuf.buf, size, XDR_ENCODE);
686
687                if (!xdr_callhdr(&(rval->xdrs), &header)) {
688                        MY_FREE(rval);
689                        return 0;
690                }
691                /* pick a free table slot and initialize the XID */
692                rval->obuf.xid = time(0) ^ (uintptr_t)rval;
693                MU_LOCK(hlock);
694                rval->obuf.xid = (xidHashSeed++ ^ ((uintptr_t)rval>>10)) & XACT_HASH_MSK;
695                i=j=(rval->obuf.xid & XACT_HASH_MSK);
696                if (msgQ) {
697                        /* if there's no message queue, refuse to
698                         * give them transactions; we might be in the process to
699                         * go away...
700                         */
701                        do {
702                                i=(i+1) & XACT_HASH_MSK; /* cheap modulo */
703                                if (!xactHashTbl[i]) {
704#if (DEBUG) & DEBUG_TRACE_XACT
705                                        fprintf(stderr,"RPCIO: entering index %i, val %x\n",i,rval);
706#endif
707                                        xactHashTbl[i]=rval;
708                                        j=-1;
709                                        break;
710                                }
711                        } while (i!=j);
712                }
713                MU_UNLOCK(hlock);
714                if (i==j) {
715                        XDR_DESTROY(&rval->xdrs);
716                        MY_FREE(rval);
717                        return 0;
718                }
719                rval->obuf.xid  = xidUpper[i] | i;
720                rval->xdrpos    = XDR_GETPOS(&(rval->xdrs));
721                rval->obufsize  = size;
722        }
723        return rval;
724}
725
726void
727rpcUdpXactDestroy(RpcUdpXact xact)
728{
729int i = xact->obuf.xid & XACT_HASH_MSK;
730
731#if (DEBUG) & DEBUG_TRACE_XACT
732                fprintf(stderr,"RPCIO: removing index %i, val %x\n",i,xact);
733#endif
734
735                ASSERT( xactHashTbl[i]==xact );
736
737                MU_LOCK(hlock);
738                xactHashTbl[i]=0;
739                /* remember XID we used last time so we can avoid
740                 * reusing the same one (incremented by rpcUdpSend routine)
741                 */
742                xidUpper[i]   = xact->obuf.xid & ~XACT_HASH_MSK;
743                MU_UNLOCK(hlock);
744
745                bufFree(&xact->ibuf);
746
747                XDR_DESTROY(&xact->xdrs);
748                MY_FREE(xact);
749}
750
751
752
753/* Send a transaction, i.e. enqueue it to the
754 * RPC daemon who will actually send it.
755 */
756enum clnt_stat
757rpcUdpSend(
758        RpcUdpXact              xact,
759        RpcUdpServer    srvr,
760        struct timeval  *timeout,
761        u_long                  proc,
762        xdrproc_t               xres, caddr_t pres,
763        xdrproc_t               xargs, caddr_t pargs,
764        ...
765   )
766{
767register XDR    *xdrs;
768unsigned long   ms;
769va_list                 ap;
770
771        va_start(ap,pargs);
772
773        if (!timeout)
774                timeout = RPCIOD_DEFAULT_TIMEOUT;
775
776        ms = 1000 * timeout->tv_sec + timeout->tv_usec/1000;
777
778        /* round lifetime to closest # of ticks */
779        xact->lifetime  = (ms * ticksPerSec + 500) / 1000;
780        if ( 0 == xact->lifetime )
781                xact->lifetime = 1;
782
783#if (DEBUG) & DEBUG_TIMEOUT
784        {
785        static int once=0;
786        if (!once++) {
787                fprintf(stderr,
788                                "Initial lifetime: %i (ticks)\n",
789                                xact->lifetime);
790        }
791        }
792#endif
793
794        xact->tolive    = xact->lifetime;
795
796        xact->xres      = xres;
797        xact->pres      = pres;
798        xact->server    = srvr;
799
800        xdrs            = &xact->xdrs;
801        xdrs->x_op      = XDR_ENCODE;
802        /* increment transaction ID */
803        xact->obuf.xid += XACT_HASHS;
804        XDR_SETPOS(xdrs, xact->xdrpos);
805        if ( !XDR_PUTLONG(xdrs,(long*)&proc) || !locked_marshal(srvr, xdrs) ||
806                 !xargs(xdrs, pargs) ) {
807                va_end(ap);
808                return(xact->status.re_status=RPC_CANTENCODEARGS);
809        }
810        while ((xargs=va_arg(ap,xdrproc_t))) {
811                if (!xargs(xdrs, va_arg(ap,caddr_t)))
812                va_end(ap);
813                return(xact->status.re_status=RPC_CANTENCODEARGS);
814        }
815
816        va_end(ap);
817
818        rtems_task_ident(RTEMS_SELF, RTEMS_WHO_AM_I, &xact->requestor);
819        if ( rtems_message_queue_send( msgQ, &xact, sizeof(xact)) ) {
820                return RPC_CANTSEND;
821        }
822        /* wakeup the rpciod */
823        ASSERT( RTEMS_SUCCESSFUL==rtems_event_send(rpciod, RPCIOD_TX_EVENT) );
824
825        return RPC_SUCCESS;
826}
827
828/* Block for the RPC reply to an outstanding
829 * transaction.
830 * The caller is woken by the RPC daemon either
831 * upon reception of the reply or on timeout.
832 */
833enum clnt_stat
834rpcUdpRcv(RpcUdpXact xact)
835{
836int                                     refresh;
837XDR                     reply_xdrs;
838struct rpc_msg          reply_msg;
839rtems_status_code       status;
840rtems_event_set         gotEvents;
841
842        refresh = 0;
843
844        do {
845
846        /* block for the reply */
847        status = rtems_event_receive(
848                RTEMS_RPC_EVENT,
849                RTEMS_WAIT | RTEMS_EVENT_ANY,
850                RTEMS_NO_TIMEOUT,
851                &gotEvents);
852        ASSERT( status == RTEMS_SUCCESSFUL );
853
854        if (xact->status.re_status) {
855#ifdef MBUF_RX
856                /* add paranoia */
857                ASSERT( !xact->ibuf );
858#endif
859                return xact->status.re_status;
860        }
861
862#ifdef MBUF_RX
863        xdrmbuf_create(&reply_xdrs, xact->ibuf, XDR_DECODE);
864#else
865        xdrmem_create(&reply_xdrs, xact->ibuf->buf, xact->ibufsize, XDR_DECODE);
866#endif
867
868        reply_msg.acpted_rply.ar_verf          = _null_auth;
869        reply_msg.acpted_rply.ar_results.where = xact->pres;
870        reply_msg.acpted_rply.ar_results.proc  = xact->xres;
871
872        if (xdr_replymsg(&reply_xdrs, &reply_msg)) {
873                /* OK */
874                _seterr_reply(&reply_msg, &xact->status);
875                if (RPC_SUCCESS == xact->status.re_status) {
876                        if ( !locked_validate(xact->server,
877                                                                &reply_msg.acpted_rply.ar_verf) ) {
878                                xact->status.re_status = RPC_AUTHERROR;
879                                xact->status.re_why    = AUTH_INVALIDRESP;
880                        }
881                        if (reply_msg.acpted_rply.ar_verf.oa_base) {
882                                reply_xdrs.x_op = XDR_FREE;
883                                xdr_opaque_auth(&reply_xdrs, &reply_msg.acpted_rply.ar_verf);
884                        }
885                        refresh = 0;
886                } else {
887                        /* should we try to refresh our credentials ? */
888                        if ( !refresh ) {
889                                /* had never tried before */
890                                refresh = RPCIOD_REFRESH;
891                        }
892                }
893        } else {
894                reply_xdrs.x_op        = XDR_FREE;
895                xdr_replymsg(&reply_xdrs, &reply_msg);
896                xact->status.re_status = RPC_CANTDECODERES;
897        }
898        XDR_DESTROY(&reply_xdrs);
899
900        bufFree(&xact->ibuf);
901
902#ifndef MBUF_RX
903        xact->ibufsize = 0;
904#endif
905
906        if (refresh && locked_refresh(xact->server)) {
907                rtems_task_ident(RTEMS_SELF, RTEMS_WHO_AM_I, &xact->requestor);
908                if ( rtems_message_queue_send(msgQ, &xact, sizeof(xact)) ) {
909                        return RPC_CANTSEND;
910                }
911                /* wakeup the rpciod */
912                fprintf(stderr,"RPCIO INFO: refreshing my AUTH\n");
913                ASSERT( RTEMS_SUCCESSFUL==rtems_event_send(rpciod, RPCIOD_TX_EVENT) );
914        }
915
916        } while ( 0 &&  refresh-- > 0 );
917
918        return xact->status.re_status;
919}
920
921
922/* On RTEMS, I'm told to avoid select(); this seems to
923 * be more efficient
924 */
925static void
926rxWakeupCB(struct socket *sock, void *arg)
927{
928  rtems_id *rpciod = (rtems_id*) arg;
929  rtems_event_send(*rpciod, RPCIOD_RX_EVENT);
930}
931
932int
933rpcUdpInit(void)
934{
935int                     s;
936rtems_status_code       status;
937int                     noblock = 1;
938struct sockwakeup       wkup;
939
940        if (ourSock < 0) {
941    fprintf(stderr,"RTEMS-RPCIOD $Release$, " \
942            "Till Straumann, Stanford/SLAC/SSRL 2002, " \
943            "See LICENSE file for licensing info.\n");
944
945                ourSock=socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
946                if (ourSock>=0) {
947                        bindresvport(ourSock,(struct sockaddr_in*)0);
948                        s = ioctl(ourSock, FIONBIO, (char*)&noblock);
949                        assert( s == 0 );
950                        /* assume nobody tampers with the clock !! */
951                        ticksPerSec = rtems_clock_get_ticks_per_second();
952                        MU_CREAT( &hlock );
953                        MU_CREAT( &llock );
954
955                        if ( !rpciodPriority ) {
956                                /* use configured networking priority */
957                                if ( ! (rpciodPriority = rtems_bsdnet_config.network_task_priority) )
958                                        rpciodPriority = RPCIOD_PRIO;   /* fallback value */
959                        }
960
961                        status = rtems_task_create(
962                                                                                        rtems_build_name('R','P','C','d'),
963                                                                                        rpciodPriority,
964                                                                                        RPCIOD_STACK,
965                                                                                        RTEMS_DEFAULT_MODES,
966                                                                                        /* fprintf saves/restores FP registers on PPC :-( */
967                                                                                        RTEMS_DEFAULT_ATTRIBUTES | RTEMS_FLOATING_POINT,
968                                                                                        &rpciod);
969                        assert( status == RTEMS_SUCCESSFUL );
970
971                        wkup.sw_pfn = rxWakeupCB;
972                        wkup.sw_arg = &rpciod;
973                        assert( 0==setsockopt(ourSock, SOL_SOCKET, SO_RCVWAKEUP, &wkup, sizeof(wkup)) );
974                        status = rtems_message_queue_create(
975                                                                                        rtems_build_name('R','P','C','q'),
976                                                                                        RPCIOD_QDEPTH,
977                                                                                        sizeof(RpcUdpXact),
978                                                                                        RTEMS_DEFAULT_ATTRIBUTES,
979                                                                                        &msgQ);
980                        assert( status == RTEMS_SUCCESSFUL );
981                        status = rtems_task_start( rpciod, rpcio_daemon, 0 );
982                        assert( status == RTEMS_SUCCESSFUL );
983
984                } else {
985                        return -1;
986                }
987        }
988        return 0;
989}
990
991int
992rpcUdpCleanup(void)
993{
994        rtems_semaphore_create(
995                        rtems_build_name('R','P','C','f'),
996                        0,
997                        RTEMS_DEFAULT_ATTRIBUTES,
998                        0,
999                        &fini);
1000        rtems_event_send(rpciod, RPCIOD_KILL_EVENT);
1001        /* synchronize with daemon */
1002        rtems_semaphore_obtain(fini, RTEMS_WAIT, 5*ticksPerSec);
1003        /* if the message queue is still there, something went wrong */
1004        if (!msgQ) {
1005                rtems_task_delete(rpciod);
1006        }
1007        rtems_semaphore_delete(fini);
1008        return (msgQ !=0);
1009}
1010
1011/* Another API - simpler but less efficient.
1012 * For each RPCall, a server and a Xact
1013 * are created and destroyed on the fly.
1014 *
1015 * This should be used for infrequent calls
1016 * (e.g. a NFS mount request).
1017 *
1018 * This is roughly compatible with the original
1019 * clnt_call() etc. API - but it uses our
1020 * daemon and is fully reentrant.
1021 */
1022enum clnt_stat
1023rpcUdpClntCreate(
1024                struct sockaddr_in      *psaddr,
1025                rpcprog_t               prog,
1026                rpcvers_t               vers,
1027                u_long                  uid,
1028                u_long                  gid,
1029                RpcUdpClnt              *pclnt
1030)
1031{
1032RpcUdpXact              x;
1033RpcUdpServer    s;
1034enum clnt_stat  err;
1035
1036        if ( RPC_SUCCESS != (err=rpcUdpServerCreate(psaddr, prog, vers, uid, gid, &s)) )
1037                return err;
1038
1039        if ( !(x=rpcUdpXactCreate(prog, vers, UDPMSGSIZE)) ) {
1040                rpcUdpServerDestroy(s);
1041                return RPC_FAILED;
1042        }
1043        /* TODO: could maintain a server cache */
1044
1045        x->server = s;
1046
1047        *pclnt = x;
1048
1049        return RPC_SUCCESS;
1050}
1051
1052void
1053rpcUdpClntDestroy(RpcUdpClnt xact)
1054{
1055        rpcUdpServerDestroy(xact->server);
1056        rpcUdpXactDestroy(xact);
1057}
1058
1059enum clnt_stat
1060rpcUdpClntCall(
1061        RpcUdpClnt              xact,
1062        u_long                  proc,
1063        XdrProcT                xargs,
1064        CaddrT                  pargs,
1065        XdrProcT                xres,
1066        CaddrT                  pres,
1067        struct timeval  *timeout
1068        )
1069{
1070enum clnt_stat  stat;
1071
1072                if ( (stat = rpcUdpSend(xact, xact->server, timeout, proc,
1073                                                                xres, pres,
1074                                                                xargs, pargs,
1075                                                                0)) ) {
1076                        fprintf(stderr,"RPCIO Send failed: %i\n",stat);
1077                        return stat;
1078                }
1079                return rpcUdpRcv(xact);
1080}
1081
1082/* a yet simpler interface */
1083enum clnt_stat
1084rpcUdpCallRp(
1085        struct sockaddr_in      *psrvr,
1086        u_long                          prog,
1087        u_long                          vers,
1088        u_long                          proc,
1089        XdrProcT                        xargs,
1090        CaddrT                          pargs,
1091        XdrProcT                        xres,
1092        CaddrT                          pres,
1093        u_long                          uid,            /* RPCIO_DEFAULT_ID picks default */
1094        u_long                          gid,            /* RPCIO_DEFAULT_ID picks default */
1095        struct timeval          *timeout        /* NULL picks default           */
1096)
1097{
1098RpcUdpClnt                      clp;
1099enum clnt_stat          stat;
1100
1101        stat = rpcUdpClntCreate(
1102                                psrvr,
1103                                prog,
1104                                vers,
1105                                uid,
1106                                gid,
1107                                &clp);
1108
1109        if ( RPC_SUCCESS != stat )
1110                return stat;
1111
1112        stat = rpcUdpClntCall(
1113                                clp,
1114                                proc,
1115                                xargs, pargs,
1116                                xres,  pres,
1117                                timeout);
1118
1119        rpcUdpClntDestroy(clp);
1120
1121        return stat;
1122}
1123
1124/* linked list primitives */
1125static void
1126nodeXtract(ListNode n)
1127{
1128        if (n->prev)
1129                n->prev->next = n->next;
1130        if (n->next)
1131                n->next->prev = n->prev;
1132        n->next = n->prev = 0;
1133}
1134
1135static void
1136nodeAppend(ListNode l, ListNode n)
1137{
1138        if ( (n->next = l->next) )
1139                n->next->prev = n;
1140        l->next = n;
1141        n->prev = l;
1142
1143}
1144
1145/* this code does the work */
1146static void
1147rpcio_daemon(rtems_task_argument arg)
1148{
1149rtems_status_code stat;
1150RpcUdpXact        xact;
1151RpcUdpServer      srv;
1152rtems_interval    next_retrans, then, unow;
1153long                                            now;    /* need to do signed comparison with age! */
1154rtems_event_set   events;
1155ListNode          newList;
1156size_t            size;
1157rtems_id          q          =  0;
1158ListNodeRec       listHead   = {0, 0};
1159unsigned long     epoch      = RPCIOD_EPOCH_SECS * ticksPerSec;
1160unsigned long                   max_period = RPCIOD_RETX_CAP_S * ticksPerSec;
1161rtems_status_code       status;
1162
1163
1164        then = rtems_clock_get_ticks_since_boot();
1165
1166        for (next_retrans = epoch;;) {
1167
1168                if ( RTEMS_SUCCESSFUL !=
1169                         (stat = rtems_event_receive(
1170                                                RPCIOD_RX_EVENT | RPCIOD_TX_EVENT | RPCIOD_KILL_EVENT,
1171                                                RTEMS_WAIT | RTEMS_EVENT_ANY,
1172                                                next_retrans,
1173                                                &events)) ) {
1174                        ASSERT( RTEMS_TIMEOUT == stat );
1175                        events = 0;
1176                }
1177
1178                if (events & RPCIOD_KILL_EVENT) {
1179                        int i;
1180
1181#if (DEBUG) & DEBUG_EVENTS
1182                        fprintf(stderr,"RPCIO: got KILL event\n");
1183#endif
1184
1185                        MU_LOCK(hlock);
1186                        for (i=XACT_HASHS-1; i>=0; i--) {
1187                                if (xactHashTbl[i]) {
1188                                        break;
1189                                }
1190                        }
1191                        if (i<0) {
1192                                /* prevent them from creating and enqueueing more messages */
1193                                q=msgQ;
1194                                /* messages queued after we executed this assignment will fail */
1195                                msgQ=0;
1196                        }
1197                        MU_UNLOCK(hlock);
1198                        if (i>=0) {
1199                                fprintf(stderr,"RPCIO There are still transactions circulating; I refuse to go away\n");
1200                                fprintf(stderr,"(1st in slot %i)\n",i);
1201                                rtems_semaphore_release(fini);
1202                        } else {
1203                                break;
1204                        }
1205                }
1206
1207                unow = rtems_clock_get_ticks_since_boot();
1208
1209                /* measure everything relative to then to protect against
1210                 * rollover
1211                 */
1212                now = unow - then;
1213
1214                /* NOTE: we don't lock the hash table while we are operating
1215                 * on transactions; the paradigm is that we 'own' a particular
1216                 * transaction (and hence it's hash table slot) from the
1217                 * time the xact was put into the message queue until we
1218                 * wake up the requestor.
1219                 */
1220
1221                if (RPCIOD_RX_EVENT & events) {
1222
1223#if (DEBUG) & DEBUG_EVENTS
1224                        fprintf(stderr,"RPCIO: got RX event\n");
1225#endif
1226
1227                        while ((xact=sockRcv())) {
1228
1229                                /* extract from the retransmission list */
1230                                nodeXtract(&xact->node);
1231
1232                                /* change the ID - there might already be
1233                                 * a retransmission on the way. When it's
1234                                 * reply arrives we must not find it's ID
1235                                 * in the hashtable
1236                                 */
1237                                xact->obuf.xid        += XACT_HASHS;
1238
1239                                xact->status.re_status = RPC_SUCCESS;
1240
1241                                /* calculate roundtrip ticks */
1242                                xact->trip             = now - xact->trip;
1243
1244                                srv                    = xact->server;
1245
1246                                /* adjust the server's retry period */
1247                                {
1248                                        register TimeoutT rtry = srv->retry_period;
1249                                        register TimeoutT trip = xact->trip;
1250
1251                                        ASSERT( trip >= 0 );
1252
1253                                        if ( 0==trip )
1254                                                trip = 1;
1255
1256                                        /* retry_new = 0.75*retry_old + 0.25 * 8 * roundrip */
1257                                        rtry   = (3*rtry + (trip << 3)) >> 2;
1258
1259                                        if ( rtry > max_period )
1260                                                rtry = max_period;
1261
1262                                        srv->retry_period = rtry;
1263                                }
1264
1265                                /* wakeup requestor */
1266                                rtems_event_send(xact->requestor, RTEMS_RPC_EVENT);
1267                        }
1268                }
1269
1270                if (RPCIOD_TX_EVENT & events) {
1271
1272#if (DEBUG) & DEBUG_EVENTS
1273                        fprintf(stderr,"RPCIO: got TX event\n");
1274#endif
1275
1276                        while (RTEMS_SUCCESSFUL == rtems_message_queue_receive(
1277                                                                                        msgQ,
1278                                                                                        &xact,
1279                                                                                        &size,
1280                                                                                        RTEMS_NO_WAIT,
1281                                                                                        RTEMS_NO_TIMEOUT)) {
1282                                /* put to the head of timeout q */
1283                                nodeAppend(&listHead, &xact->node);
1284
1285                                xact->age  = now;
1286                                xact->trip = FIRST_ATTEMPT;
1287                        }
1288                }
1289
1290
1291                /* work the timeout q */
1292                newList = 0;
1293                for ( xact=(RpcUdpXact)listHead.next;
1294                          xact && xact->age <= now;
1295                          xact=(RpcUdpXact)listHead.next ) {
1296
1297                                /* extract from the list */
1298                                nodeXtract(&xact->node);
1299
1300                                srv = xact->server;
1301
1302                                if (xact->tolive < 0) {
1303                                        /* this one timed out */
1304                                        xact->status.re_errno  = ETIMEDOUT;
1305                                        xact->status.re_status = RPC_TIMEDOUT;
1306
1307                                        srv->timeouts++;
1308
1309                                        /* Change the ID - there might still be
1310                                         * a reply on the way. When it arrives we
1311                                         * must not find it's ID in the hash table
1312                                         *
1313                                         * Thanks to Steven Johnson for hunting this
1314                                         * one down.
1315                                         */
1316                                        xact->obuf.xid        += XACT_HASHS;
1317
1318#if (DEBUG) & DEBUG_TIMEOUT
1319                                        fprintf(stderr,"RPCIO XACT timed out; waking up requestor\n");
1320#endif
1321                                        if ( rtems_event_send(xact->requestor, RTEMS_RPC_EVENT) ) {
1322                                                rtems_panic("RPCIO PANIC file %s line: %i, requestor id was 0x%08x",
1323                                                                        __FILE__,
1324                                                                        __LINE__,
1325                                                                        xact->requestor);
1326                                        }
1327
1328                                } else {
1329                                        int len;
1330
1331                                        len = (int)XDR_GETPOS(&xact->xdrs);
1332
1333#ifdef MBUF_TX
1334                                        xact->refcnt = 1;       /* sendto itself */
1335#endif
1336                                        if ( len != SENDTO( ourSock,
1337                                                                                xact->obuf.buf,
1338                                                                                len,
1339                                                                                0,
1340                                                                                &srv->addr.sa,
1341                                                                                sizeof(srv->addr.sin)
1342#ifdef MBUF_TX
1343                                                                                , xact,
1344                                                                                paranoia_free,
1345                                                                                paranoia_ref
1346#endif
1347                                                                                ) ) {
1348
1349                                                xact->status.re_errno  = errno;
1350                                                xact->status.re_status = RPC_CANTSEND;
1351                                                srv->errors++;
1352
1353                                                /* wakeup requestor */
1354                                                fprintf(stderr,"RPCIO: SEND failure\n");
1355                                                status = rtems_event_send(xact->requestor, RTEMS_RPC_EVENT);
1356                                                assert( status == RTEMS_SUCCESSFUL );
1357
1358                                        } else {
1359                                                /* send successful; calculate retransmission time
1360                                                 * and enqueue to temporary list
1361                                                 */
1362                                                if (FIRST_ATTEMPT != xact->trip) {
1363#if (DEBUG) & DEBUG_TIMEOUT
1364                                                        fprintf(stderr,
1365                                                                "timed out; tolive is %i (ticks), retry period is %i (ticks)\n",
1366                                                                        xact->tolive,
1367                                                                        srv->retry_period);
1368#endif
1369                                                        /* this is a real retry; we backup
1370                                                         * the server's retry interval
1371                                                         */
1372                                                        if ( srv->retry_period < max_period ) {
1373
1374                                                                /* If multiple transactions for this server
1375                                                                 * fail (e.g. because it died) this will
1376                                                                 * back-off very agressively (doubling
1377                                                                 * the retransmission period for every
1378                                                                 * timed out transaction up to the CAP limit)
1379                                                                 * which is desirable - single packet failure
1380                                                                 * is treated more gracefully by this algorithm.
1381                                                                 */
1382
1383                                                                srv->retry_period<<=1;
1384#if (DEBUG) & DEBUG_TIMEOUT
1385                                                                fprintf(stderr,
1386                                                                                "adjusted to; retry period %i\n",
1387                                                                                srv->retry_period);
1388#endif
1389                                                        } else {
1390                                                                /* never wait longer than RPCIOD_RETX_CAP_S seconds */
1391                                                                fprintf(stderr,
1392                                                                                "RPCIO: server '%s' not responding - still trying\n",
1393                                                                                srv->name);
1394                                                        }
1395                                                        if ( 0 == ++srv->retrans % 1000) {
1396                                                                fprintf(stderr,
1397                                                                                "RPCIO - statistics: already %li retries to server %s\n",
1398                                                                                srv->retrans,
1399                                                                                srv->name);
1400                                                        }
1401                                                } else {
1402                                                        srv->requests++;
1403                                                }
1404                                                xact->trip      = now;
1405                                                {
1406                                                long capped_period = srv->retry_period;
1407                                                        if ( xact->lifetime < capped_period )
1408                                                                capped_period = xact->lifetime;
1409                                                xact->age       = now + capped_period;
1410                                                xact->tolive   -= capped_period;
1411                                                }
1412                                                /* enqueue to the list of newly sent transactions */
1413                                                xact->node.next = newList;
1414                                                newList         = &xact->node;
1415#if (DEBUG) & DEBUG_TIMEOUT
1416                                                fprintf(stderr,
1417                                                                "XACT (0x%08x) age is 0x%x, now: 0x%x\n",
1418                                                                xact,
1419                                                                xact->age,
1420                                                                now);
1421#endif
1422                                        }
1423                                }
1424            }
1425
1426                /* insert the newly sent transactions into the
1427                 * sorted retransmission list
1428                 */
1429                for (; (xact = (RpcUdpXact)newList); ) {
1430                        register ListNode p,n;
1431                        newList = newList->next;
1432                        for ( p=&listHead; (n=p->next) && xact->age > ((RpcUdpXact)n)->age; p=n )
1433                                /* nothing else to do */;
1434                        nodeAppend(p, &xact->node);
1435                }
1436
1437                if (now > epoch) {
1438                        /* every now and then, readjust the epoch */
1439                        register ListNode n;
1440                        then += now;
1441                        for (n=listHead.next; n; n=n->next) {
1442                                /* readjust outstanding time intervals subject to the
1443                                 * condition that the 'absolute' time must remain
1444                                 * the same. 'age' and 'trip' are measured with
1445                                 * respect to 'then' - hence:
1446                                 *
1447                                 * abs_age == old_age + old_then == new_age + new_then
1448                                 *
1449                                 * ==> new_age = old_age + old_then - new_then == old_age - 'now'
1450                                 */
1451                                ((RpcUdpXact)n)->age  -= now;
1452                                ((RpcUdpXact)n)->trip -= now;
1453#if (DEBUG) & DEBUG_TIMEOUT
1454                                fprintf(stderr,
1455                                                "readjusted XACT (0x%08x); age is 0x%x, trip: 0x%x now: 0x%x\n",
1456                                                (RpcUdpXact)n,
1457                                                ((RpcUdpXact)n)->trip,
1458                                                ((RpcUdpXact)n)->age,
1459                                                now);
1460#endif
1461                        }
1462                        now = 0;
1463                }
1464
1465                next_retrans = listHead.next ?
1466                                                        ((RpcUdpXact)listHead.next)->age - now :
1467                                                        epoch;  /* make sure we don't miss updating the epoch */
1468#if (DEBUG) & DEBUG_TIMEOUT
1469                fprintf(stderr,"RPCIO: next timeout is %x\n",next_retrans);
1470#endif
1471        }
1472        /* close our socket; shut down the receiver */
1473        close(ourSock);
1474
1475#if 0 /* if we get here, no transactions exist, hence there can be none
1476           * in the queue whatsoever
1477           */
1478        /* flush the message queue */
1479        while (RTEMS_SUCCESSFUL == rtems_message_queue_receive(
1480                                                                                q,
1481                                                                                &xact,
1482                                                                                &size,
1483                                                                                RTEMS_NO_WAIT,
1484                                                                                RTEMS_NO_TIMEOUT)) {
1485                        /* TODO enque xact */
1486        }
1487
1488        /* flush all outstanding transactions */
1489
1490        for (xact=((RpcUdpXact)listHead.next); xact; xact=((RpcUdpXact)xact->node.next)) {
1491                        xact->status.re_status = RPC_TIMEDOUT;
1492                        rtems_event_send(xact->requestor, RTEMS_RPC_EVENT);
1493        }
1494#endif
1495
1496        rtems_message_queue_delete(q);
1497
1498        MU_DESTROY(hlock);
1499
1500        fprintf(stderr,"RPC daemon exited...\n");
1501
1502        rtems_semaphore_release(fini);
1503        rtems_task_suspend(RTEMS_SELF);
1504}
1505
1506
1507/* support for transaction 'pools'. A number of XACT objects
1508 * is always kept around. The initial number is 0 but it
1509 * is allowed to grow up to a maximum.
1510 * If the need grows beyond the maximum, behavior depends:
1511 * Users can either block until a transaction becomes available,
1512 * they can create a new XACT on the fly or get an error
1513 * if no free XACT is available from the pool.
1514 */
1515
1516RpcUdpXactPool
1517rpcUdpXactPoolCreate(
1518        rpcprog_t prog,                 rpcvers_t version,
1519        int xactsize,   int poolsize)
1520{
1521RpcUdpXactPool  rval = MY_MALLOC(sizeof(*rval));
1522rtems_status_code       status;
1523
1524        ASSERT( rval );
1525        status = rtems_message_queue_create(
1526                                        rtems_build_name('R','P','C','p'),
1527                                        poolsize,
1528                                        sizeof(RpcUdpXact),
1529                                        RTEMS_DEFAULT_ATTRIBUTES,
1530                                        &rval->box);
1531        assert( status == RTEMS_SUCCESSFUL );
1532
1533        rval->prog     = prog;
1534        rval->version  = version;
1535        rval->xactSize = xactsize;
1536        return rval;
1537}
1538
1539void
1540rpcUdpXactPoolDestroy(RpcUdpXactPool pool)
1541{
1542RpcUdpXact xact;
1543
1544        while ((xact = rpcUdpXactPoolGet(pool, XactGetFail))) {
1545                rpcUdpXactDestroy(xact);
1546        }
1547        rtems_message_queue_delete(pool->box);
1548        MY_FREE(pool);
1549}
1550
1551RpcUdpXact
1552rpcUdpXactPoolGet(RpcUdpXactPool pool, XactPoolGetMode mode)
1553{
1554RpcUdpXact       xact = 0;
1555size_t           size;
1556
1557        if (RTEMS_SUCCESSFUL != rtems_message_queue_receive(
1558                                                                pool->box,
1559                                                                &xact,
1560                                                                &size,
1561                                                                XactGetWait == mode ?
1562                                                                        RTEMS_WAIT : RTEMS_NO_WAIT,
1563                                                                RTEMS_NO_TIMEOUT)) {
1564
1565                /* nothing found in box; should we create a new one ? */
1566
1567                xact = (XactGetCreate == mode) ?
1568                                        rpcUdpXactCreate(
1569                                                        pool->prog,
1570                                                        pool->version,
1571                                                        pool->xactSize) : 0 ;
1572                if (xact)
1573                                xact->pool = pool;
1574
1575        }
1576        return xact;
1577}
1578
1579void
1580rpcUdpXactPoolPut(RpcUdpXact xact)
1581{
1582RpcUdpXactPool pool;
1583
1584        pool = xact->pool;
1585        ASSERT( pool );
1586
1587        if (RTEMS_SUCCESSFUL != rtems_message_queue_send(
1588                                                                pool->box,
1589                                                                &xact,
1590                                                                sizeof(xact)))
1591                rpcUdpXactDestroy(xact);
1592}
1593
1594#ifdef MBUF_RX
1595
1596/* WORKAROUND: include sys/mbuf.h (or other bsdnet headers) only
1597 *             _after_ using malloc()/free() & friends because
1598 *             the RTEMS/BSDNET headers redefine those :-(
1599 */
1600
1601#define _KERNEL
1602#include <sys/mbuf.h>
1603
1604ssize_t
1605recv_mbuf_from(int s, struct mbuf **ppm, long len, struct sockaddr *fromaddr, int *fromlen);
1606
1607static void
1608bufFree(struct mbuf **m)
1609{
1610        if (*m) {
1611                rtems_bsdnet_semaphore_obtain();
1612                m_freem(*m);
1613                rtems_bsdnet_semaphore_release();
1614                *m = 0;
1615        }
1616}
1617#endif
1618
1619#ifdef MBUF_TX
1620static void
1621paranoia_free(caddr_t closure, u_int size)
1622{
1623#if (DEBUG)
1624RpcUdpXact xact = (RpcUdpXact)closure;
1625int        len  = (int)XDR_GETPOS(&xact->xdrs);
1626
1627        ASSERT( --xact->refcnt >= 0 && size == len );
1628#endif
1629}
1630
1631static void
1632paranoia_ref (caddr_t closure, u_int size)
1633{
1634#if (DEBUG)
1635RpcUdpXact xact = (RpcUdpXact)closure;
1636int        len  = (int)XDR_GETPOS(&xact->xdrs);
1637        ASSERT( size == len );
1638        xact->refcnt++;
1639#endif
1640}
1641#endif
1642
1643/* receive from a socket and find
1644 * the transaction corresponding to the
1645 * transaction ID received in the server
1646 * reply.
1647 *
1648 * The semantics of the 'pibuf' pointer are
1649 * as follows:
1650 *
1651 * MBUF_RX:
1652 *
1653 */
1654
1655#define RPCIOD_RXBUFSZ  UDPMSGSIZE
1656
1657static RpcUdpXact
1658sockRcv(void)
1659{
1660int                                     len,i;
1661u_long                          xid;
1662union {
1663        struct sockaddr_in      sin;
1664        struct sockaddr     sa;
1665}                                       fromAddr;
1666int                                     fromLen  = sizeof(fromAddr.sin);
1667RxBuf                           ibuf     = 0;
1668RpcUdpXact                      xact     = 0;
1669
1670        do {
1671
1672        /* rcv_mbuf() and recvfrom() differ in that the
1673         * former allocates buffers and passes them back
1674         * to us whereas the latter requires us to provide
1675         * buffer space.
1676         * Hence, in the first case whe have to make sure
1677         * no old buffer is leaked - in the second case,
1678         * we might well re-use an old buffer but must
1679         * make sure we have one allocated
1680         */
1681#ifdef MBUF_RX
1682        if (ibuf)
1683                bufFree(&ibuf);
1684
1685        len  = recv_mbuf_from(
1686                                        ourSock,
1687                                        &ibuf,
1688                                        RPCIOD_RXBUFSZ,
1689                                    &fromAddr.sa,
1690                                    &fromLen);
1691#else
1692        if ( !ibuf )
1693                ibuf = (RpcBuf)MY_MALLOC(RPCIOD_RXBUFSZ);
1694        if ( !ibuf )
1695                goto cleanup; /* no memory - drop this message */
1696
1697        len  = recvfrom(ourSock,
1698                                    ibuf->buf,
1699                                    RPCIOD_RXBUFSZ,
1700                                    0,
1701                                    &fromAddr.sa,
1702                                        &fromLen);
1703#endif
1704
1705        if (len <= 0) {
1706                if (EAGAIN != errno)
1707                        fprintf(stderr,"RECV failed: %s\n",strerror(errno));
1708                goto cleanup;
1709        }
1710
1711#if (DEBUG) & DEBUG_PACKLOSS
1712        if ( (unsigned)rand() < DEBUG_PACKLOSS_FRACT ) {
1713                /* lose packets once in a while */
1714                static int xxx = 0;
1715                if ( ++xxx % 16 == 0 )
1716                        fprintf(stderr,"DEBUG: dropped %i packets, so far...\n",xxx);
1717                if ( ibuf )
1718                        bufFree( &ibuf );
1719                continue;
1720        }
1721#endif
1722
1723        i = (xid=XID(ibuf)) & XACT_HASH_MSK;
1724
1725        if ( !(xact=xactHashTbl[i])                                             ||
1726                   xact->obuf.xid                     != xid                        ||
1727#ifdef REJECT_SERVERIP_MISMATCH
1728                   xact->server->addr.sin.sin_addr.s_addr != fromAddr.sin.sin_addr.s_addr       ||
1729#endif
1730                   xact->server->addr.sin.sin_port        != fromAddr.sin.sin_port ) {
1731
1732                if (xact) {
1733                        if (
1734#ifdef REJECT_SERVERIP_MISMATCH
1735                            xact->server->addr.sin.sin_addr.s_addr == fromAddr.sin.sin_addr.s_addr &&
1736#endif
1737                        xact->server->addr.sin.sin_port        == fromAddr.sin.sin_port        &&
1738                            ( xact->obuf.xid                   == xid + XACT_HASHS   ||
1739                                  xact->obuf.xid                   == xid + 2*XACT_HASHS    )
1740                                ) {
1741#ifndef DEBUG /* don't complain if it's just a late arrival of a retry */
1742                        fprintf(stderr,"RPCIO - FYI sockRcv(): dropping late/redundant retry answer\n");
1743#endif
1744                        } else {
1745                        fprintf(stderr,"RPCIO WARNING sockRcv(): transaction mismatch\n");
1746                        fprintf(stderr,"xact: xid  0x%08lx  -- got 0x%08lx\n",
1747                                                        xact->obuf.xid, xid);
1748                        fprintf(stderr,"xact: addr 0x%08lx  -- got 0x%08lx\n",
1749                                                        xact->server->addr.sin.sin_addr.s_addr,
1750                                                        fromAddr.sin.sin_addr.s_addr);
1751                        fprintf(stderr,"xact: port 0x%08x  -- got 0x%08x\n",
1752                                                        xact->server->addr.sin.sin_port,
1753                                                        fromAddr.sin.sin_port);
1754                        }
1755                } else {
1756                        fprintf(stderr,
1757                                        "RPCIO WARNING sockRcv(): got xid 0x%08lx but its slot is empty\n",
1758                                        xid);
1759                }
1760                /* forget about this one and try again */
1761                xact = 0;
1762        }
1763
1764        } while ( !xact );
1765
1766        xact->ibuf     = ibuf;
1767#ifndef MBUF_RX
1768        xact->ibufsize = RPCIOD_RXBUFSZ;
1769#endif
1770
1771        return xact;
1772
1773cleanup:
1774
1775        bufFree(&ibuf);
1776
1777        return 0;
1778}
1779
1780
1781#include <rtems/rtems_bsdnet_internal.h>
1782/* double check the event configuration; should probably globally
1783 * manage system events!!
1784 * We do this at the end of the file for the same reason we had
1785 * included mbuf.h only a couple of lines above - see comment up
1786 * there...
1787 */
1788#if RTEMS_RPC_EVENT & SOSLEEP_EVENT & SBWAIT_EVENT & NETISR_EVENTS
1789#error ILLEGAL EVENT CONFIGURATION
1790#endif
Note: See TracBrowser for help on using the repository browser.