source: rtems/cpukit/libfs/src/nfsclient/src/rpcio.c @ 36ae561

4.115
Last change on this file since 36ae561 was 36ae561, checked in by Sebastian Huber <sebastian.huber@…>, on 06/04/13 at 13:51:35

nfsclient: Add rpcSetXIDs()

  • Property mode set to 100644
File size: 44.6 KB
Line 
1/**
2 * @file
3 *
4 * @brief RPC Multiplexor for a Multitasking Environment
5 * @ingroup libfs
6 *
7 * This code funnels arbitrary task's UDP/RPC requests
8 * through one socket to arbitrary servers.
9 * The replies are gathered and dispatched to the
10 * requestors.
11 * One task handles all the sending and receiving
12 * work including retries.
13 * It is up to the requestor, however, to do
14 * the XDR encoding of the arguments / decoding
15 * of the results (except for the RPC header which
16 * is handled by the daemon).
17 */
18
19/*
20 * Author: Till Straumann <strauman@slac.stanford.edu>, 2002
21 *
22 * Authorship
23 * ----------
24 * This software (NFS-2 client implementation for RTEMS) was created by
25 *     Till Straumann <strauman@slac.stanford.edu>, 2002-2007,
26 *         Stanford Linear Accelerator Center, Stanford University.
27 *
28 * Acknowledgement of sponsorship
29 * ------------------------------
30 * The NFS-2 client implementation for RTEMS was produced by
31 *     the Stanford Linear Accelerator Center, Stanford University,
32 *         under Contract DE-AC03-76SFO0515 with the Department of Energy.
33 *
34 * Government disclaimer of liability
35 * ----------------------------------
36 * Neither the United States nor the United States Department of Energy,
37 * nor any of their employees, makes any warranty, express or implied, or
38 * assumes any legal liability or responsibility for the accuracy,
39 * completeness, or usefulness of any data, apparatus, product, or process
40 * disclosed, or represents that its use would not infringe privately owned
41 * rights.
42 *
43 * Stanford disclaimer of liability
44 * --------------------------------
45 * Stanford University makes no representations or warranties, express or
46 * implied, nor assumes any liability for the use of this software.
47 *
48 * Stanford disclaimer of copyright
49 * --------------------------------
50 * Stanford University, owner of the copyright, hereby disclaims its
51 * copyright and all other rights in this software.  Hence, anyone may
52 * freely use it for any purpose without restriction.
53 *
54 * Maintenance of notices
55 * ----------------------
56 * In the interest of clarity regarding the origin and status of this
57 * SLAC software, this and all the preceding Stanford University notices
58 * are to remain affixed to any copy or derivative of this software made
59 * or distributed by the recipient and are to be affixed to any copy of
60 * software made or distributed by the recipient that contains a copy or
61 * derivative of this software.
62 *
63 * ------------------ SLAC Software Notices, Set 4 OTT.002a, 2004 FEB 03
64 */
65
66#if HAVE_CONFIG_H
67#include "config.h"
68#endif
69
70#include <inttypes.h>
71
72#include <rtems.h>
73#include <rtems/error.h>
74#include <rtems/rtems_bsdnet.h>
75#include <stdlib.h>
76#include <time.h>
77#include <rpc/rpc.h>
78#include <rpc/pmap_prot.h>
79#include <errno.h>
80#include <sys/ioctl.h>
81#include <assert.h>
82#include <stdio.h>
83#include <errno.h>
84#include <string.h>
85#include <netinet/in.h>
86#include <arpa/inet.h>
87
88#include "rpcio.h"
89
90/****************************************************************/
91/* CONFIGURABLE PARAMETERS                                      */
92/****************************************************************/
93
94#define MBUF_RX                 /* If defined: use mbuf XDR stream for
95                                                 *  decoding directly out of mbufs
96                                                 *  Otherwise, the regular 'recvfrom()'
97                                                 *  interface will be used involving an
98                                                 *  extra buffer allocation + copy step.
99                                                 */
100
101#define MBUF_TX                 /* If defined: avoid copying data when
102                                                 *  sending. Instead, use a wrapper to
103                                                 *  'sosend()' which will point an MBUF
104                                                 *  directly to our buffer space.
105                                                 *  Note that the BSD stack does not copy
106                                                 *  data when fragmenting packets - it
107                                                 *  merely uses an mbuf chain pointing
108                                                 *  into different areas of the data.
109                                                 *
110                                                 * If undefined, the regular 'sendto()'
111                                                 *  interface is used.
112                                                 */
113
114#undef REJECT_SERVERIP_MISMATCH
115                                                /* If defined, RPC replies must come from the server
116                                                 * that was queried. Eric Norum has reported problems
117                                                 * with clustered NFS servers. So we disable this
118                                                 * reducing paranoia...
119                                                 */
120
121/* daemon task parameters */
122#define RPCIOD_STACK            10000
123#define RPCIOD_PRIO                     100     /* *fallback* priority */
124
125/* depth of the message queue for sending
126 * RPC requests to the daemon
127 */
128#define RPCIOD_QDEPTH           20
129
130/* Maximum retry limit for retransmission */
131#define RPCIOD_RETX_CAP_S       3 /* seconds */
132
133/* Default timeout for RPC calls */
134#define RPCIOD_DEFAULT_TIMEOUT  (&_rpc_default_timeout)
135static struct timeval _rpc_default_timeout = { 10 /* secs */, 0 /* usecs */ };
136
137/* how many times should we try to resend a failed
138 * transaction with refreshed AUTHs
139 */
140#define RPCIOD_REFRESH          2
141
142/* Events we are using; the RPC_EVENT
143 * MUST NOT be used by any application
144 * thread doing RPC IO (e.g. NFS)
145 */
146#define RTEMS_RPC_EVENT         RTEMS_EVENT_30  /* THE event used by RPCIO. Every task doing
147                                                                                         * RPC IO will receive this - hence it is
148                                                                                         * RESERVED
149                                                                                         */
150#define RPCIOD_RX_EVENT         RTEMS_EVENT_1   /* Events the RPCIOD is using/waiting for */
151#define RPCIOD_TX_EVENT         RTEMS_EVENT_2
152#define RPCIOD_KILL_EVENT       RTEMS_EVENT_3   /* send to the daemon to kill it          */
153
154#define LD_XACT_HASH            8                               /* ld of the size of the transaction hash table  */
155
156
157/* Debugging Flags                                              */
158
159/* NOTE: defining DEBUG 0 leaves some 'assert()' paranoia checks
160 *       but produces no output
161 */
162
163#define DEBUG_TRACE_XACT        (1<<0)
164#define DEBUG_EVENTS            (1<<1)
165#define DEBUG_MALLOC            (1<<2)
166#define DEBUG_TIMEOUT           (1<<3)
167#define DEBUG_PACKLOSS          (1<<4)  /* This introduces random, artificial packet losses to test retransmission */
168
169#define DEBUG_PACKLOSS_FRACT (0xffffffff/10)
170
171/* USE PARENTHESIS WHEN 'or'ing MULTIPLE FLAGS: (DEBUG_XX | DEBUG_YY) */
172#define DEBUG                           (0)
173
174/****************************************************************/
175/* END OF CONFIGURABLE SECTION                                  */
176/****************************************************************/
177
178/* prevent rollover of our timers by readjusting the epoch on the fly */
179#if     (DEBUG) & DEBUG_TIMEOUT
180#define RPCIOD_EPOCH_SECS       10
181#else
182#define RPCIOD_EPOCH_SECS       10000
183#endif
184
185#ifdef  DEBUG
186#define ASSERT(arg)                     assert(arg)
187#else
188#define ASSERT(arg)                     if (arg)
189#endif
190
191/****************************************************************/
192/* MACROS                                                       */
193/****************************************************************/
194
195
196#define XACT_HASHS              (1<<(LD_XACT_HASH))     /* the hash table size derived from the ld       */
197#define XACT_HASH_MSK   ((XACT_HASHS)-1)        /* mask to extract the hash index from a RPC-XID */
198
199
200#define MU_LOCK(mutex)          do {                                                    \
201                                                        assert(                                                 \
202                                                                RTEMS_SUCCESSFUL ==                     \
203                                                                rtems_semaphore_obtain(         \
204                                                                                (mutex),                        \
205                                                                                RTEMS_WAIT,                     \
206                                                                                RTEMS_NO_TIMEOUT        \
207                                                                                ) );                            \
208                                                        } while(0)
209
210#define MU_UNLOCK(mutex)        do {                                                    \
211                                                        assert(                                                 \
212                                                                RTEMS_SUCCESSFUL ==                     \
213                                                                rtems_semaphore_release(        \
214                                                                                (mutex)                         \
215                                                                                ) );                            \
216                                                        } while(0)
217
218#define MU_CREAT(pmutex)        do {                                                    \
219                                                        assert(                                                 \
220                                                                RTEMS_SUCCESSFUL ==                     \
221                                                                rtems_semaphore_create(         \
222                                                                                rtems_build_name(       \
223                                                                                        'R','P','C','l' \
224                                                                                        ),                              \
225                                                                                1,                                      \
226                                                                                MUTEX_ATTRIBUTES,       \
227                                                                                0,                                      \
228                                                                                (pmutex)) );            \
229                                                        } while (0)
230
231
232#define MU_DESTROY(mutex)       do {                                                    \
233                                                        assert(                                                 \
234                                                                RTEMS_SUCCESSFUL ==                     \
235                                                                rtems_semaphore_delete(         \
236                                                                                mutex                           \
237                                                                                ) );                            \
238                                                        } while (0)
239
240#define MUTEX_ATTRIBUTES        (RTEMS_LOCAL           |                \
241                                                        RTEMS_PRIORITY         |                \
242                                                        RTEMS_INHERIT_PRIORITY |                \
243                                                        RTEMS_BINARY_SEMAPHORE)
244
245#define FIRST_ATTEMPT           0x88888888 /* some time that is never reached */
246
247/****************************************************************/
248/* TYPE DEFINITIONS                                             */
249/****************************************************************/
250
251typedef rtems_interval          TimeoutT;
252
253/* 100000th implementation of a doubly linked list;
254 * since only one thread is looking at these,
255 * we need no locking
256 */
257typedef struct ListNodeRec_ {
258        struct ListNodeRec_ *next, *prev;
259} ListNodeRec, *ListNode;
260
261
262/* Structure representing an RPC server */
263typedef struct RpcUdpServerRec_ {
264                RpcUdpServer            next;                   /* linked list of all servers; protected by hlock */
265                union {
266                struct sockaddr_in      sin;
267                struct sockaddr     sa;
268                }                                       addr;
269                AUTH                            *auth;
270                rtems_id                        authlock;               /* must MUTEX the auth object - it's not clear
271                                                                                         *  what is better:
272                                                                                         *   1 having one (MUTEXed) auth per server
273                                                                                         *         who is shared among all transactions
274                                                                                         *         using that server
275                                                                                         *       2 maintaining an AUTH per transaction
276                                                                                         *         (there are then other options: manage
277                                                                                         *         XACT pools on a per-server basis instead
278                                                                                         *         of associating a server with a XACT when
279                                                                                         *   sending)
280                                                                                         * experience will show if the current (1)
281                                                                                         * approach has to be changed.
282                                                                                         */
283                TimeoutT                        retry_period;   /* dynamically adjusted retry period
284                                                                                         * (based on packet roundtrip time)
285                                                                                         */
286                /* STATISTICS */
287                unsigned long           retrans;                /* how many retries were issued by this server         */
288                unsigned long           requests;               /* how many requests have been sent                    */
289                unsigned long       timeouts;           /* how many requests have timed out                    */
290                unsigned long       errors;         /* how many errors have occurred (other than timeouts) */
291                char                            name[20];               /* server's address in IP 'dot' notation               */
292} RpcUdpServerRec;
293
294typedef union  RpcBufU_ {
295                uint32_t                        xid;
296                char                            buf[1];
297} RpcBufU, *RpcBuf;
298
299/* RX Buffer implementation; this is either
300 * an MBUF chain (MBUF_RX configuration)
301 * or a buffer allocated from the heap
302 * where recvfrom copies the (encoded) reply
303 * to. The XDR routines the copy/decode
304 * it into the user's data structures.
305 */
306#ifdef MBUF_RX
307typedef struct mbuf *           RxBuf;  /* an MBUF chain */
308static  void                            bufFree(struct mbuf **m);
309#define XID(ibuf)                       (*(mtod((ibuf), u_long *)))
310extern void                             xdrmbuf_create(XDR *, struct mbuf *, enum xdr_op);
311#else
312typedef RpcBuf                          RxBuf;
313#define bufFree(b)                      do { MY_FREE(*(b)); *(b)=0; } while(0)
314#define XID(ibuf)                       ((ibuf)->xid)
315#endif
316
317/* A RPC 'transaction' consisting
318 * of server and requestor information,
319 * buffer space and an XDR object
320 * (for encoding arguments).
321 */
322typedef struct RpcUdpXactRec_ {
323                ListNodeRec                     node;           /* so we can put XACTs on a list                */
324                RpcUdpServer            server;         /* server this XACT goes to                     */
325                long                            lifetime;       /* during the lifetime, retry attempts are made */
326                long                            tolive;         /* lifetime timer                               */
327                struct rpc_err          status;         /* RPC reply error status                       */
328                long                            age;            /* age info; needed to manage retransmission    */
329                long                            trip;           /* record round trip time in ticks              */
330                rtems_id                        requestor;      /* the task waiting for this XACT to complete   */
331                RpcUdpXactPool          pool;           /* if this XACT belong to a pool, this is it    */
332                XDR                                     xdrs;           /* argument encoder stream                      */
333                int                                     xdrpos;     /* stream position after the (permanent) header */
334                xdrproc_t                       xres;           /* reply decoder proc - TODO needn't be here    */
335                caddr_t                         pres;           /* reply decoded obj  - TODO needn't be here    */
336#ifndef MBUF_RX
337                int                                     ibufsize;       /* size of the ibuf (bytes)                     */
338#endif
339#ifdef  MBUF_TX
340                int                                     refcnt;         /* mbuf external storage reference count        */
341#endif
342                int                                     obufsize;       /* size of the obuf (bytes)                     */
343                RxBuf                           ibuf;           /* pointer to input buffer assigned by daemon   */
344                RpcBufU                         obuf;       /* output buffer (encoded args) APPENDED HERE   */
345} RpcUdpXactRec;
346
347typedef struct RpcUdpXactPoolRec_ {
348        rtems_id        box;
349        int                     prog;
350        int                     version;
351        int                     xactSize;
352} RpcUdpXactPoolRec;
353
354/* a global hash table where all 'living' transaction
355 * objects are registered.
356 * A number of bits in a transaction's XID maps 1:1 to
357 * an index in this table. Hence, the XACT matching
358 * an RPC/UDP reply packet can quickly be found
359 * The size of this table imposes a hard limit on the
360 * number of all created transactions in the system.
361 */
362static RpcUdpXact xactHashTbl[XACT_HASHS]={0};
363static u_long     xidUpper   [XACT_HASHS]={0};
364static unsigned   xidHashSeed            = 0 ;
365
366/* forward declarations */
367static RpcUdpXact
368sockRcv(void);
369
370static void
371rpcio_daemon(rtems_task_argument);
372
373#ifdef MBUF_TX
374ssize_t
375sendto_nocpy (
376                int s,
377                const void *buf, size_t buflen,
378                int flags,
379                const struct sockaddr *toaddr, int tolen,
380                void *closure,
381                void (*freeproc)(caddr_t, u_int),
382                void (*refproc)(caddr_t, u_int)
383);
384static void paranoia_free(caddr_t closure, u_int size);
385static void paranoia_ref (caddr_t closure, u_int size);
386#define SENDTO  sendto_nocpy
387#else
388#define SENDTO  sendto
389#endif
390
391static RpcUdpServer             rpcUdpServers = 0;      /* linked list of all servers; protected by llock */
392
393static int                              ourSock = -1;           /* the socket we are using for communication */
394static rtems_id                 rpciod  = 0;            /* task id of the RPC daemon                 */
395static rtems_id                 msgQ    = 0;            /* message queue where the daemon picks up
396                                                                                         * requests
397                                                                                         */
398#ifndef NDEBUG
399static rtems_id                 llock   = 0;            /* MUTEX protecting the server list */
400static rtems_id                 hlock   = 0;            /* MUTEX protecting the hash table and the list of servers */
401#endif
402static rtems_id                 fini    = 0;            /* a synchronization semaphore we use during
403                                                                                         * module cleanup / driver unloading
404                                                                                         */
405static rtems_interval   ticksPerSec;            /* cached system clock rate (WHO IS ASSUMED NOT
406                                                                                         * TO CHANGE)
407                                                                                         */
408
409rtems_task_priority             rpciodPriority = 0;
410
411#if (DEBUG) & DEBUG_MALLOC
412/* malloc wrappers for debugging */
413static int nibufs = 0;
414
415static inline void *MY_MALLOC(int s)
416{
417        if (s) {
418                void *rval;
419                MU_LOCK(hlock);
420                assert(nibufs++ < 2000);
421                MU_UNLOCK(hlock);
422                assert((rval = malloc(s)) != 0);
423                return rval;
424        }
425        return 0;
426}
427
428static inline void *MY_CALLOC(int n, int s)
429{
430        if (s) {
431                void *rval;
432                MU_LOCK(hlock);
433                assert(nibufs++ < 2000);
434                MU_UNLOCK(hlock);
435                assert((rval = calloc(n,s)) != 0);
436                return rval;
437        }
438        return 0;
439}
440
441
442static inline void MY_FREE(void *p)
443{
444        if (p) {
445                MU_LOCK(hlock);
446                nibufs--;
447                MU_UNLOCK(hlock);
448                free(p);
449        }
450}
451#else
452#define MY_MALLOC       malloc
453#define MY_CALLOC       calloc
454#define MY_FREE         free
455#endif
456
457static inline bool_t
458locked_marshal(RpcUdpServer s, XDR *xdrs)
459{
460bool_t rval;
461        MU_LOCK(s->authlock);
462        rval = AUTH_MARSHALL(s->auth, xdrs);
463        MU_UNLOCK(s->authlock);
464        return rval;
465}
466
467/* Locked operations on a server's auth object */
468static inline bool_t
469locked_validate(RpcUdpServer s, struct opaque_auth *v)
470{
471bool_t rval;
472        MU_LOCK(s->authlock);
473        rval = AUTH_VALIDATE(s->auth, v);
474        MU_UNLOCK(s->authlock);
475        return rval;
476}
477
478static inline bool_t
479locked_refresh(RpcUdpServer s)
480{
481bool_t rval;
482        MU_LOCK(s->authlock);
483        rval = AUTH_REFRESH(s->auth);
484        MU_UNLOCK(s->authlock);
485        return rval;
486}
487
488/* Create a server object
489 *
490 */
491enum clnt_stat
492rpcUdpServerCreate(
493        struct sockaddr_in      *paddr,
494        rpcprog_t               prog,
495        rpcvers_t               vers,
496        u_long                  uid,
497        u_long                  gid,
498        RpcUdpServer            *psrv
499        )
500{
501RpcUdpServer    rval;
502u_short                 port;
503char                    hname[MAX_MACHINE_NAME + 1];
504int                             theuid, thegid;
505int                             thegids[NGRPS];
506gid_t                   gids[NGROUPS];
507int                             len,i;
508AUTH                    *auth;
509enum clnt_stat  pmap_err;
510struct pmap             pmaparg;
511
512        if ( gethostname(hname, MAX_MACHINE_NAME) ) {
513                fprintf(stderr,
514                                "RPCIO - error: I have no hostname ?? (%s)\n",
515                                strerror(errno));
516                return RPC_UNKNOWNHOST;
517        }
518
519        if ( (len = getgroups(NGROUPS, gids) < 0 ) ) {
520                fprintf(stderr,
521                                "RPCIO - error: I unable to get group ids (%s)\n",
522                                strerror(errno));
523                return RPC_FAILED;
524        }
525
526        if ( len > NGRPS )
527                len = NGRPS;
528
529        for (i=0; i<len; i++)
530                thegids[i] = (int)gids[i];
531
532        theuid = (int) ((RPCIOD_DEFAULT_ID == uid) ? geteuid() : uid);
533        thegid = (int) ((RPCIOD_DEFAULT_ID == gid) ? getegid() : gid);
534
535        if ( !(auth = authunix_create(hname, theuid, thegid, len, thegids)) ) {
536                fprintf(stderr,
537                                "RPCIO - error: unable to create RPC AUTH\n");
538                return RPC_FAILED;
539        }
540
541        /* if they specified no port try to ask the portmapper */
542        if (!paddr->sin_port) {
543
544                paddr->sin_port = htons(PMAPPORT);
545
546        pmaparg.pm_prog = prog;
547        pmaparg.pm_vers = vers;
548        pmaparg.pm_prot = IPPROTO_UDP;
549        pmaparg.pm_port = 0;  /* not needed or used */
550
551
552                /* dont use non-reentrant pmap_getport ! */
553
554                pmap_err = rpcUdpCallRp(
555                                                paddr,
556                                                PMAPPROG,
557                                                PMAPVERS,
558                                                PMAPPROC_GETPORT,
559                                                xdr_pmap,
560                                                &pmaparg,
561                                                xdr_u_short,
562                                                &port,
563                                                uid,
564                                                gid,
565                                                0);
566
567                if ( RPC_SUCCESS != pmap_err ) {
568                        paddr->sin_port = 0;
569                        return pmap_err;
570                }
571
572                paddr->sin_port = htons(port);
573        }
574
575        if (0==paddr->sin_port) {
576                        return RPC_PROGNOTREGISTERED;
577        }
578
579        rval                            = (RpcUdpServer)MY_MALLOC(sizeof(*rval));
580        memset(rval, 0, sizeof(*rval));
581
582        if (!inet_ntop(AF_INET, &paddr->sin_addr, rval->name, sizeof(rval->name)))
583                sprintf(rval->name,"?.?.?.?");
584        rval->addr.sin          = *paddr;
585
586        /* start with a long retransmission interval - it
587         * will be adapted dynamically
588         */
589        rval->retry_period  = RPCIOD_RETX_CAP_S * ticksPerSec;
590
591        rval->auth                      = auth;
592
593        MU_CREAT( &rval->authlock );
594
595        /* link into list */
596        MU_LOCK( llock );
597        rval->next = rpcUdpServers;
598        rpcUdpServers = rval;
599        MU_UNLOCK( llock );
600
601        *psrv                           = rval;
602        return RPC_SUCCESS;
603}
604
605void
606rpcUdpServerDestroy(RpcUdpServer s)
607{
608RpcUdpServer prev;
609        if (!s)
610                return;
611        /* we should probably verify (but how?) that nobody
612         * (at least: no outstanding XACTs) is using this
613         * server;
614         */
615
616        /* remove from server list */
617        MU_LOCK(llock);
618        prev = rpcUdpServers;
619        if ( s == prev ) {
620                rpcUdpServers = s->next;
621        } else {
622                for ( ; prev ; prev = prev->next) {
623                        if (prev->next == s) {
624                                prev->next = s->next;
625                                break;
626                        }
627                }
628        }
629        MU_UNLOCK(llock);
630
631        /* MUST have found it */
632        assert(prev);
633
634        auth_destroy(s->auth);
635
636        MU_DESTROY(s->authlock);
637        MY_FREE(s);
638}
639
640int
641rpcUdpStats(FILE *f)
642{
643RpcUdpServer s;
644
645        if (!f) f = stdout;
646
647        fprintf(f,"RPCIOD statistics:\n");
648
649        MU_LOCK(llock);
650        for (s = rpcUdpServers; s; s=s->next) {
651                fprintf(f,"\nServer -- %s:\n", s->name);
652                fprintf(f,"  requests    sent: %10ld, retransmitted: %10ld\n",
653                                                s->requests, s->retrans);
654                fprintf(f,"         timed out: %10ld,   send errors: %10ld\n",
655                                                s->timeouts, s->errors);
656                fprintf(f,"  current retransmission interval: %dms\n",
657                                                (unsigned)(s->retry_period * 1000 / ticksPerSec) );
658        }
659        MU_UNLOCK(llock);
660
661        return 0;
662}
663
664RpcUdpXact
665rpcUdpXactCreate(
666        u_long  program,
667        u_long  version,
668        u_long  size
669        )
670{
671RpcUdpXact              rval=0;
672struct rpc_msg  header;
673register int    i,j;
674
675        if (!size)
676                size = UDPMSGSIZE;
677        /* word align */
678        size = (size + 3) & ~3;
679
680        rval = (RpcUdpXact)MY_CALLOC(1,sizeof(*rval) - sizeof(rval->obuf) + size);
681
682        if (rval) {
683
684                header.rm_xid             = 0;
685                header.rm_direction       = CALL;
686                header.rm_call.cb_rpcvers = RPC_MSG_VERSION;
687                header.rm_call.cb_prog    = program;
688                header.rm_call.cb_vers    = version;
689                xdrmem_create(&(rval->xdrs), rval->obuf.buf, size, XDR_ENCODE);
690
691                if (!xdr_callhdr(&(rval->xdrs), &header)) {
692                        MY_FREE(rval);
693                        return 0;
694                }
695                /* pick a free table slot and initialize the XID */
696                MU_LOCK(hlock);
697                rval->obuf.xid = (xidHashSeed++ ^ ((uintptr_t)rval>>10)) & XACT_HASH_MSK;
698                i=j=(rval->obuf.xid & XACT_HASH_MSK);
699                if (msgQ) {
700                        /* if there's no message queue, refuse to
701                         * give them transactions; we might be in the process to
702                         * go away...
703                         */
704                        do {
705                                i=(i+1) & XACT_HASH_MSK; /* cheap modulo */
706                                if (!xactHashTbl[i]) {
707#if (DEBUG) & DEBUG_TRACE_XACT
708                                        fprintf(stderr,"RPCIO: entering index %i, val %x\n",i,rval);
709#endif
710                                        xactHashTbl[i]=rval;
711                                        j=-1;
712                                        break;
713                                }
714                        } while (i!=j);
715                }
716                MU_UNLOCK(hlock);
717                if (i==j) {
718                        XDR_DESTROY(&rval->xdrs);
719                        MY_FREE(rval);
720                        return 0;
721                }
722                rval->obuf.xid  = xidUpper[i] | i;
723                rval->xdrpos    = XDR_GETPOS(&(rval->xdrs));
724                rval->obufsize  = size;
725        }
726        return rval;
727}
728
729void
730rpcUdpXactDestroy(RpcUdpXact xact)
731{
732int i = xact->obuf.xid & XACT_HASH_MSK;
733
734#if (DEBUG) & DEBUG_TRACE_XACT
735                fprintf(stderr,"RPCIO: removing index %i, val %x\n",i,xact);
736#endif
737
738                ASSERT( xactHashTbl[i]==xact );
739
740                MU_LOCK(hlock);
741                xactHashTbl[i]=0;
742                /* remember XID we used last time so we can avoid
743                 * reusing the same one (incremented by rpcUdpSend routine)
744                 */
745                xidUpper[i]   = xact->obuf.xid;
746                MU_UNLOCK(hlock);
747
748                bufFree(&xact->ibuf);
749
750                XDR_DESTROY(&xact->xdrs);
751                MY_FREE(xact);
752}
753
754
755
756/* Send a transaction, i.e. enqueue it to the
757 * RPC daemon who will actually send it.
758 */
759enum clnt_stat
760rpcUdpSend(
761        RpcUdpXact              xact,
762        RpcUdpServer    srvr,
763        struct timeval  *timeout,
764        u_long                  proc,
765        xdrproc_t               xres, caddr_t pres,
766        xdrproc_t               xargs, caddr_t pargs,
767        ...
768   )
769{
770register XDR    *xdrs;
771unsigned long   ms;
772va_list                 ap;
773
774        va_start(ap,pargs);
775
776        if (!timeout)
777                timeout = RPCIOD_DEFAULT_TIMEOUT;
778
779        ms = 1000 * timeout->tv_sec + timeout->tv_usec/1000;
780
781        /* round lifetime to closest # of ticks */
782        xact->lifetime  = (ms * ticksPerSec + 500) / 1000;
783        if ( 0 == xact->lifetime )
784                xact->lifetime = 1;
785
786#if (DEBUG) & DEBUG_TIMEOUT
787        {
788        static int once=0;
789        if (!once++) {
790                fprintf(stderr,
791                                "Initial lifetime: %i (ticks)\n",
792                                xact->lifetime);
793        }
794        }
795#endif
796
797        xact->tolive    = xact->lifetime;
798
799        xact->xres      = xres;
800        xact->pres      = pres;
801        xact->server    = srvr;
802
803        xdrs            = &xact->xdrs;
804        xdrs->x_op      = XDR_ENCODE;
805        /* increment transaction ID */
806        xact->obuf.xid += XACT_HASHS;
807        XDR_SETPOS(xdrs, xact->xdrpos);
808        if ( !XDR_PUTLONG(xdrs,(long*)&proc) || !locked_marshal(srvr, xdrs) ||
809                 !xargs(xdrs, pargs) ) {
810                va_end(ap);
811                return(xact->status.re_status=RPC_CANTENCODEARGS);
812        }
813        while ((xargs=va_arg(ap,xdrproc_t))) {
814                if (!xargs(xdrs, va_arg(ap,caddr_t)))
815                va_end(ap);
816                return(xact->status.re_status=RPC_CANTENCODEARGS);
817        }
818
819        va_end(ap);
820
821        rtems_task_ident(RTEMS_SELF, RTEMS_WHO_AM_I, &xact->requestor);
822        if ( rtems_message_queue_send( msgQ, &xact, sizeof(xact)) ) {
823                return RPC_CANTSEND;
824        }
825        /* wakeup the rpciod */
826        ASSERT( RTEMS_SUCCESSFUL==rtems_event_send(rpciod, RPCIOD_TX_EVENT) );
827
828        return RPC_SUCCESS;
829}
830
831/* Block for the RPC reply to an outstanding
832 * transaction.
833 * The caller is woken by the RPC daemon either
834 * upon reception of the reply or on timeout.
835 */
836enum clnt_stat
837rpcUdpRcv(RpcUdpXact xact)
838{
839int                                     refresh;
840XDR                     reply_xdrs;
841struct rpc_msg          reply_msg;
842rtems_status_code       status;
843rtems_event_set         gotEvents;
844
845        refresh = 0;
846
847        do {
848
849        /* block for the reply */
850        status = rtems_event_receive(
851                RTEMS_RPC_EVENT,
852                RTEMS_WAIT | RTEMS_EVENT_ANY,
853                RTEMS_NO_TIMEOUT,
854                &gotEvents);
855        ASSERT( status == RTEMS_SUCCESSFUL );
856
857        if (xact->status.re_status) {
858#ifdef MBUF_RX
859                /* add paranoia */
860                ASSERT( !xact->ibuf );
861#endif
862                return xact->status.re_status;
863        }
864
865#ifdef MBUF_RX
866        xdrmbuf_create(&reply_xdrs, xact->ibuf, XDR_DECODE);
867#else
868        xdrmem_create(&reply_xdrs, xact->ibuf->buf, xact->ibufsize, XDR_DECODE);
869#endif
870
871        reply_msg.acpted_rply.ar_verf          = _null_auth;
872        reply_msg.acpted_rply.ar_results.where = xact->pres;
873        reply_msg.acpted_rply.ar_results.proc  = xact->xres;
874
875        if (xdr_replymsg(&reply_xdrs, &reply_msg)) {
876                /* OK */
877                _seterr_reply(&reply_msg, &xact->status);
878                if (RPC_SUCCESS == xact->status.re_status) {
879                        if ( !locked_validate(xact->server,
880                                                                &reply_msg.acpted_rply.ar_verf) ) {
881                                xact->status.re_status = RPC_AUTHERROR;
882                                xact->status.re_why    = AUTH_INVALIDRESP;
883                        }
884                        if (reply_msg.acpted_rply.ar_verf.oa_base) {
885                                reply_xdrs.x_op = XDR_FREE;
886                                xdr_opaque_auth(&reply_xdrs, &reply_msg.acpted_rply.ar_verf);
887                        }
888                        refresh = 0;
889                } else {
890                        /* should we try to refresh our credentials ? */
891                        if ( !refresh ) {
892                                /* had never tried before */
893                                refresh = RPCIOD_REFRESH;
894                        }
895                }
896        } else {
897                reply_xdrs.x_op        = XDR_FREE;
898                xdr_replymsg(&reply_xdrs, &reply_msg);
899                xact->status.re_status = RPC_CANTDECODERES;
900        }
901        XDR_DESTROY(&reply_xdrs);
902
903        bufFree(&xact->ibuf);
904
905#ifndef MBUF_RX
906        xact->ibufsize = 0;
907#endif
908
909        if (refresh && locked_refresh(xact->server)) {
910                rtems_task_ident(RTEMS_SELF, RTEMS_WHO_AM_I, &xact->requestor);
911                if ( rtems_message_queue_send(msgQ, &xact, sizeof(xact)) ) {
912                        return RPC_CANTSEND;
913                }
914                /* wakeup the rpciod */
915                fprintf(stderr,"RPCIO INFO: refreshing my AUTH\n");
916                ASSERT( RTEMS_SUCCESSFUL==rtems_event_send(rpciod, RPCIOD_TX_EVENT) );
917        }
918
919        } while ( 0 &&  refresh-- > 0 );
920
921        return xact->status.re_status;
922}
923
924
925/* On RTEMS, I'm told to avoid select(); this seems to
926 * be more efficient
927 */
928static void
929rxWakeupCB(struct socket *sock, void *arg)
930{
931  rtems_id *rpciod = (rtems_id*) arg;
932  rtems_event_send(*rpciod, RPCIOD_RX_EVENT);
933}
934
935void
936rpcSetXIDs(uint32_t xid)
937{
938        uint32_t i;
939
940        xid &= ~XACT_HASH_MSK;
941
942        for (i = 0; i < XACT_HASHS; ++i) {
943                xidUpper[i] = xid | i;
944        }
945}
946
947int
948rpcUdpInit(void)
949{
950int                     s;
951rtems_status_code       status;
952int                     noblock = 1;
953struct sockwakeup       wkup;
954
955        if (ourSock < 0) {
956    fprintf(stderr,"RTEMS-RPCIOD $Release$, " \
957            "Till Straumann, Stanford/SLAC/SSRL 2002, " \
958            "See LICENSE file for licensing info.\n");
959
960                ourSock=socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
961                if (ourSock>=0) {
962                        bindresvport(ourSock,(struct sockaddr_in*)0);
963                        s = ioctl(ourSock, FIONBIO, (char*)&noblock);
964                        assert( s == 0 );
965                        /* assume nobody tampers with the clock !! */
966                        ticksPerSec = rtems_clock_get_ticks_per_second();
967                        MU_CREAT( &hlock );
968                        MU_CREAT( &llock );
969
970                        if ( !rpciodPriority ) {
971                                /* use configured networking priority */
972                                if ( ! (rpciodPriority = rtems_bsdnet_config.network_task_priority) )
973                                        rpciodPriority = RPCIOD_PRIO;   /* fallback value */
974                        }
975
976                        status = rtems_task_create(
977                                                                                        rtems_build_name('R','P','C','d'),
978                                                                                        rpciodPriority,
979                                                                                        RPCIOD_STACK,
980                                                                                        RTEMS_DEFAULT_MODES,
981                                                                                        /* fprintf saves/restores FP registers on PPC :-( */
982                                                                                        RTEMS_DEFAULT_ATTRIBUTES | RTEMS_FLOATING_POINT,
983                                                                                        &rpciod);
984                        assert( status == RTEMS_SUCCESSFUL );
985
986                        wkup.sw_pfn = rxWakeupCB;
987                        wkup.sw_arg = &rpciod;
988                        assert( 0==setsockopt(ourSock, SOL_SOCKET, SO_RCVWAKEUP, &wkup, sizeof(wkup)) );
989                        status = rtems_message_queue_create(
990                                                                                        rtems_build_name('R','P','C','q'),
991                                                                                        RPCIOD_QDEPTH,
992                                                                                        sizeof(RpcUdpXact),
993                                                                                        RTEMS_DEFAULT_ATTRIBUTES,
994                                                                                        &msgQ);
995                        assert( status == RTEMS_SUCCESSFUL );
996                        status = rtems_task_start( rpciod, rpcio_daemon, 0 );
997                        assert( status == RTEMS_SUCCESSFUL );
998
999                } else {
1000                        return -1;
1001                }
1002        }
1003        return 0;
1004}
1005
1006int
1007rpcUdpCleanup(void)
1008{
1009        rtems_semaphore_create(
1010                        rtems_build_name('R','P','C','f'),
1011                        0,
1012                        RTEMS_DEFAULT_ATTRIBUTES,
1013                        0,
1014                        &fini);
1015        rtems_event_send(rpciod, RPCIOD_KILL_EVENT);
1016        /* synchronize with daemon */
1017        rtems_semaphore_obtain(fini, RTEMS_WAIT, 5*ticksPerSec);
1018        /* if the message queue is still there, something went wrong */
1019        if (!msgQ) {
1020                rtems_task_delete(rpciod);
1021        }
1022        rtems_semaphore_delete(fini);
1023        return (msgQ !=0);
1024}
1025
1026/* Another API - simpler but less efficient.
1027 * For each RPCall, a server and a Xact
1028 * are created and destroyed on the fly.
1029 *
1030 * This should be used for infrequent calls
1031 * (e.g. a NFS mount request).
1032 *
1033 * This is roughly compatible with the original
1034 * clnt_call() etc. API - but it uses our
1035 * daemon and is fully reentrant.
1036 */
1037enum clnt_stat
1038rpcUdpClntCreate(
1039                struct sockaddr_in      *psaddr,
1040                rpcprog_t               prog,
1041                rpcvers_t               vers,
1042                u_long                  uid,
1043                u_long                  gid,
1044                RpcUdpClnt              *pclnt
1045)
1046{
1047RpcUdpXact              x;
1048RpcUdpServer    s;
1049enum clnt_stat  err;
1050
1051        if ( RPC_SUCCESS != (err=rpcUdpServerCreate(psaddr, prog, vers, uid, gid, &s)) )
1052                return err;
1053
1054        if ( !(x=rpcUdpXactCreate(prog, vers, UDPMSGSIZE)) ) {
1055                rpcUdpServerDestroy(s);
1056                return RPC_FAILED;
1057        }
1058        /* TODO: could maintain a server cache */
1059
1060        x->server = s;
1061
1062        *pclnt = x;
1063
1064        return RPC_SUCCESS;
1065}
1066
1067static void
1068rpcUdpClntDestroy(RpcUdpClnt xact)
1069{
1070        rpcUdpServerDestroy(xact->server);
1071        rpcUdpXactDestroy(xact);
1072}
1073
1074enum clnt_stat
1075rpcUdpClntCall(
1076        RpcUdpClnt              xact,
1077        u_long                  proc,
1078        XdrProcT                xargs,
1079        CaddrT                  pargs,
1080        XdrProcT                xres,
1081        CaddrT                  pres,
1082        struct timeval  *timeout
1083        )
1084{
1085enum clnt_stat  stat;
1086
1087                if ( (stat = rpcUdpSend(xact, xact->server, timeout, proc,
1088                                                                xres, pres,
1089                                                                xargs, pargs,
1090                                                                0)) ) {
1091                        fprintf(stderr,"RPCIO Send failed: %i\n",stat);
1092                        return stat;
1093                }
1094                return rpcUdpRcv(xact);
1095}
1096
1097/* a yet simpler interface */
1098enum clnt_stat
1099rpcUdpCallRp(
1100        struct sockaddr_in      *psrvr,
1101        u_long                          prog,
1102        u_long                          vers,
1103        u_long                          proc,
1104        XdrProcT                        xargs,
1105        CaddrT                          pargs,
1106        XdrProcT                        xres,
1107        CaddrT                          pres,
1108        u_long                          uid,            /* RPCIO_DEFAULT_ID picks default */
1109        u_long                          gid,            /* RPCIO_DEFAULT_ID picks default */
1110        struct timeval          *timeout        /* NULL picks default           */
1111)
1112{
1113RpcUdpClnt                      clp;
1114enum clnt_stat          stat;
1115
1116        stat = rpcUdpClntCreate(
1117                                psrvr,
1118                                prog,
1119                                vers,
1120                                uid,
1121                                gid,
1122                                &clp);
1123
1124        if ( RPC_SUCCESS != stat )
1125                return stat;
1126
1127        stat = rpcUdpClntCall(
1128                                clp,
1129                                proc,
1130                                xargs, pargs,
1131                                xres,  pres,
1132                                timeout);
1133
1134        rpcUdpClntDestroy(clp);
1135
1136        return stat;
1137}
1138
1139/* linked list primitives */
1140static void
1141nodeXtract(ListNode n)
1142{
1143        if (n->prev)
1144                n->prev->next = n->next;
1145        if (n->next)
1146                n->next->prev = n->prev;
1147        n->next = n->prev = 0;
1148}
1149
1150static void
1151nodeAppend(ListNode l, ListNode n)
1152{
1153        if ( (n->next = l->next) )
1154                n->next->prev = n;
1155        l->next = n;
1156        n->prev = l;
1157
1158}
1159
1160/* this code does the work */
1161static void
1162rpcio_daemon(rtems_task_argument arg)
1163{
1164rtems_status_code stat;
1165RpcUdpXact        xact;
1166RpcUdpServer      srv;
1167rtems_interval    next_retrans, then, unow;
1168long                                            now;    /* need to do signed comparison with age! */
1169rtems_event_set   events;
1170ListNode          newList;
1171size_t            size;
1172rtems_id          q          =  0;
1173ListNodeRec       listHead   = {0, 0};
1174unsigned long     epoch      = RPCIOD_EPOCH_SECS * ticksPerSec;
1175unsigned long                   max_period = RPCIOD_RETX_CAP_S * ticksPerSec;
1176rtems_status_code       status;
1177
1178
1179        then = rtems_clock_get_ticks_since_boot();
1180
1181        for (next_retrans = epoch;;) {
1182
1183                if ( RTEMS_SUCCESSFUL !=
1184                         (stat = rtems_event_receive(
1185                                                RPCIOD_RX_EVENT | RPCIOD_TX_EVENT | RPCIOD_KILL_EVENT,
1186                                                RTEMS_WAIT | RTEMS_EVENT_ANY,
1187                                                next_retrans,
1188                                                &events)) ) {
1189                        ASSERT( RTEMS_TIMEOUT == stat );
1190                        events = 0;
1191                }
1192
1193                if (events & RPCIOD_KILL_EVENT) {
1194                        int i;
1195
1196#if (DEBUG) & DEBUG_EVENTS
1197                        fprintf(stderr,"RPCIO: got KILL event\n");
1198#endif
1199
1200                        MU_LOCK(hlock);
1201                        for (i=XACT_HASHS-1; i>=0; i--) {
1202                                if (xactHashTbl[i]) {
1203                                        break;
1204                                }
1205                        }
1206                        if (i<0) {
1207                                /* prevent them from creating and enqueueing more messages */
1208                                q=msgQ;
1209                                /* messages queued after we executed this assignment will fail */
1210                                msgQ=0;
1211                        }
1212                        MU_UNLOCK(hlock);
1213                        if (i>=0) {
1214                                fprintf(stderr,"RPCIO There are still transactions circulating; I refuse to go away\n");
1215                                fprintf(stderr,"(1st in slot %i)\n",i);
1216                                rtems_semaphore_release(fini);
1217                        } else {
1218                                break;
1219                        }
1220                }
1221
1222                unow = rtems_clock_get_ticks_since_boot();
1223
1224                /* measure everything relative to then to protect against
1225                 * rollover
1226                 */
1227                now = unow - then;
1228
1229                /* NOTE: we don't lock the hash table while we are operating
1230                 * on transactions; the paradigm is that we 'own' a particular
1231                 * transaction (and hence it's hash table slot) from the
1232                 * time the xact was put into the message queue until we
1233                 * wake up the requestor.
1234                 */
1235
1236                if (RPCIOD_RX_EVENT & events) {
1237
1238#if (DEBUG) & DEBUG_EVENTS
1239                        fprintf(stderr,"RPCIO: got RX event\n");
1240#endif
1241
1242                        while ((xact=sockRcv())) {
1243
1244                                /* extract from the retransmission list */
1245                                nodeXtract(&xact->node);
1246
1247                                /* change the ID - there might already be
1248                                 * a retransmission on the way. When it's
1249                                 * reply arrives we must not find it's ID
1250                                 * in the hashtable
1251                                 */
1252                                xact->obuf.xid        += XACT_HASHS;
1253
1254                                xact->status.re_status = RPC_SUCCESS;
1255
1256                                /* calculate roundtrip ticks */
1257                                xact->trip             = now - xact->trip;
1258
1259                                srv                    = xact->server;
1260
1261                                /* adjust the server's retry period */
1262                                {
1263                                        register TimeoutT rtry = srv->retry_period;
1264                                        register TimeoutT trip = xact->trip;
1265
1266                                        ASSERT( trip >= 0 );
1267
1268                                        if ( 0==trip )
1269                                                trip = 1;
1270
1271                                        /* retry_new = 0.75*retry_old + 0.25 * 8 * roundrip */
1272                                        rtry   = (3*rtry + (trip << 3)) >> 2;
1273
1274                                        if ( rtry > max_period )
1275                                                rtry = max_period;
1276
1277                                        srv->retry_period = rtry;
1278                                }
1279
1280                                /* wakeup requestor */
1281                                rtems_event_send(xact->requestor, RTEMS_RPC_EVENT);
1282                        }
1283                }
1284
1285                if (RPCIOD_TX_EVENT & events) {
1286
1287#if (DEBUG) & DEBUG_EVENTS
1288                        fprintf(stderr,"RPCIO: got TX event\n");
1289#endif
1290
1291                        while (RTEMS_SUCCESSFUL == rtems_message_queue_receive(
1292                                                                                        msgQ,
1293                                                                                        &xact,
1294                                                                                        &size,
1295                                                                                        RTEMS_NO_WAIT,
1296                                                                                        RTEMS_NO_TIMEOUT)) {
1297                                /* put to the head of timeout q */
1298                                nodeAppend(&listHead, &xact->node);
1299
1300                                xact->age  = now;
1301                                xact->trip = FIRST_ATTEMPT;
1302                        }
1303                }
1304
1305
1306                /* work the timeout q */
1307                newList = 0;
1308                for ( xact=(RpcUdpXact)listHead.next;
1309                          xact && xact->age <= now;
1310                          xact=(RpcUdpXact)listHead.next ) {
1311
1312                                /* extract from the list */
1313                                nodeXtract(&xact->node);
1314
1315                                srv = xact->server;
1316
1317                                if (xact->tolive < 0) {
1318                                        /* this one timed out */
1319                                        xact->status.re_errno  = ETIMEDOUT;
1320                                        xact->status.re_status = RPC_TIMEDOUT;
1321
1322                                        srv->timeouts++;
1323
1324                                        /* Change the ID - there might still be
1325                                         * a reply on the way. When it arrives we
1326                                         * must not find it's ID in the hash table
1327                                         *
1328                                         * Thanks to Steven Johnson for hunting this
1329                                         * one down.
1330                                         */
1331                                        xact->obuf.xid        += XACT_HASHS;
1332
1333#if (DEBUG) & DEBUG_TIMEOUT
1334                                        fprintf(stderr,"RPCIO XACT timed out; waking up requestor\n");
1335#endif
1336                                        if ( rtems_event_send(xact->requestor, RTEMS_RPC_EVENT) ) {
1337                                                rtems_panic("RPCIO PANIC file %s line: %i, requestor id was 0x%08x",
1338                                                                        __FILE__,
1339                                                                        __LINE__,
1340                                                                        xact->requestor);
1341                                        }
1342
1343                                } else {
1344                                        int len;
1345
1346                                        len = (int)XDR_GETPOS(&xact->xdrs);
1347
1348#ifdef MBUF_TX
1349                                        xact->refcnt = 1;       /* sendto itself */
1350#endif
1351                                        if ( len != SENDTO( ourSock,
1352                                                                                xact->obuf.buf,
1353                                                                                len,
1354                                                                                0,
1355                                                                                &srv->addr.sa,
1356                                                                                sizeof(srv->addr.sin)
1357#ifdef MBUF_TX
1358                                                                                , xact,
1359                                                                                paranoia_free,
1360                                                                                paranoia_ref
1361#endif
1362                                                                                ) ) {
1363
1364                                                xact->status.re_errno  = errno;
1365                                                xact->status.re_status = RPC_CANTSEND;
1366                                                srv->errors++;
1367
1368                                                /* wakeup requestor */
1369                                                fprintf(stderr,"RPCIO: SEND failure\n");
1370                                                status = rtems_event_send(xact->requestor, RTEMS_RPC_EVENT);
1371                                                assert( status == RTEMS_SUCCESSFUL );
1372
1373                                        } else {
1374                                                /* send successful; calculate retransmission time
1375                                                 * and enqueue to temporary list
1376                                                 */
1377                                                if (FIRST_ATTEMPT != xact->trip) {
1378#if (DEBUG) & DEBUG_TIMEOUT
1379                                                        fprintf(stderr,
1380                                                                "timed out; tolive is %i (ticks), retry period is %i (ticks)\n",
1381                                                                        xact->tolive,
1382                                                                        srv->retry_period);
1383#endif
1384                                                        /* this is a real retry; we backup
1385                                                         * the server's retry interval
1386                                                         */
1387                                                        if ( srv->retry_period < max_period ) {
1388
1389                                                                /* If multiple transactions for this server
1390                                                                 * fail (e.g. because it died) this will
1391                                                                 * back-off very agressively (doubling
1392                                                                 * the retransmission period for every
1393                                                                 * timed out transaction up to the CAP limit)
1394                                                                 * which is desirable - single packet failure
1395                                                                 * is treated more gracefully by this algorithm.
1396                                                                 */
1397
1398                                                                srv->retry_period<<=1;
1399#if (DEBUG) & DEBUG_TIMEOUT
1400                                                                fprintf(stderr,
1401                                                                                "adjusted to; retry period %i\n",
1402                                                                                srv->retry_period);
1403#endif
1404                                                        } else {
1405                                                                /* never wait longer than RPCIOD_RETX_CAP_S seconds */
1406                                                                fprintf(stderr,
1407                                                                                "RPCIO: server '%s' not responding - still trying\n",
1408                                                                                srv->name);
1409                                                        }
1410                                                        if ( 0 == ++srv->retrans % 1000) {
1411                                                                fprintf(stderr,
1412                                                                                "RPCIO - statistics: already %li retries to server %s\n",
1413                                                                                srv->retrans,
1414                                                                                srv->name);
1415                                                        }
1416                                                } else {
1417                                                        srv->requests++;
1418                                                }
1419                                                xact->trip      = now;
1420                                                {
1421                                                long capped_period = srv->retry_period;
1422                                                        if ( xact->lifetime < capped_period )
1423                                                                capped_period = xact->lifetime;
1424                                                xact->age       = now + capped_period;
1425                                                xact->tolive   -= capped_period;
1426                                                }
1427                                                /* enqueue to the list of newly sent transactions */
1428                                                xact->node.next = newList;
1429                                                newList         = &xact->node;
1430#if (DEBUG) & DEBUG_TIMEOUT
1431                                                fprintf(stderr,
1432                                                                "XACT (0x%08x) age is 0x%x, now: 0x%x\n",
1433                                                                xact,
1434                                                                xact->age,
1435                                                                now);
1436#endif
1437                                        }
1438                                }
1439            }
1440
1441                /* insert the newly sent transactions into the
1442                 * sorted retransmission list
1443                 */
1444                for (; (xact = (RpcUdpXact)newList); ) {
1445                        register ListNode p,n;
1446                        newList = newList->next;
1447                        for ( p=&listHead; (n=p->next) && xact->age > ((RpcUdpXact)n)->age; p=n )
1448                                /* nothing else to do */;
1449                        nodeAppend(p, &xact->node);
1450                }
1451
1452                if (now > epoch) {
1453                        /* every now and then, readjust the epoch */
1454                        register ListNode n;
1455                        then += now;
1456                        for (n=listHead.next; n; n=n->next) {
1457                                /* readjust outstanding time intervals subject to the
1458                                 * condition that the 'absolute' time must remain
1459                                 * the same. 'age' and 'trip' are measured with
1460                                 * respect to 'then' - hence:
1461                                 *
1462                                 * abs_age == old_age + old_then == new_age + new_then
1463                                 *
1464                                 * ==> new_age = old_age + old_then - new_then == old_age - 'now'
1465                                 */
1466                                ((RpcUdpXact)n)->age  -= now;
1467                                ((RpcUdpXact)n)->trip -= now;
1468#if (DEBUG) & DEBUG_TIMEOUT
1469                                fprintf(stderr,
1470                                                "readjusted XACT (0x%08x); age is 0x%x, trip: 0x%x now: 0x%x\n",
1471                                                (RpcUdpXact)n,
1472                                                ((RpcUdpXact)n)->trip,
1473                                                ((RpcUdpXact)n)->age,
1474                                                now);
1475#endif
1476                        }
1477                        now = 0;
1478                }
1479
1480                next_retrans = listHead.next ?
1481                                                        ((RpcUdpXact)listHead.next)->age - now :
1482                                                        epoch;  /* make sure we don't miss updating the epoch */
1483#if (DEBUG) & DEBUG_TIMEOUT
1484                fprintf(stderr,"RPCIO: next timeout is %x\n",next_retrans);
1485#endif
1486        }
1487        /* close our socket; shut down the receiver */
1488        close(ourSock);
1489
1490#if 0 /* if we get here, no transactions exist, hence there can be none
1491           * in the queue whatsoever
1492           */
1493        /* flush the message queue */
1494        while (RTEMS_SUCCESSFUL == rtems_message_queue_receive(
1495                                                                                q,
1496                                                                                &xact,
1497                                                                                &size,
1498                                                                                RTEMS_NO_WAIT,
1499                                                                                RTEMS_NO_TIMEOUT)) {
1500                        /* TODO enque xact */
1501        }
1502
1503        /* flush all outstanding transactions */
1504
1505        for (xact=((RpcUdpXact)listHead.next); xact; xact=((RpcUdpXact)xact->node.next)) {
1506                        xact->status.re_status = RPC_TIMEDOUT;
1507                        rtems_event_send(xact->requestor, RTEMS_RPC_EVENT);
1508        }
1509#endif
1510
1511        rtems_message_queue_delete(q);
1512
1513        MU_DESTROY(hlock);
1514
1515        fprintf(stderr,"RPC daemon exited...\n");
1516
1517        rtems_semaphore_release(fini);
1518        rtems_task_suspend(RTEMS_SELF);
1519}
1520
1521
1522/* support for transaction 'pools'. A number of XACT objects
1523 * is always kept around. The initial number is 0 but it
1524 * is allowed to grow up to a maximum.
1525 * If the need grows beyond the maximum, behavior depends:
1526 * Users can either block until a transaction becomes available,
1527 * they can create a new XACT on the fly or get an error
1528 * if no free XACT is available from the pool.
1529 */
1530
1531RpcUdpXactPool
1532rpcUdpXactPoolCreate(
1533        rpcprog_t prog,                 rpcvers_t version,
1534        int xactsize,   int poolsize)
1535{
1536RpcUdpXactPool  rval = MY_MALLOC(sizeof(*rval));
1537rtems_status_code       status;
1538
1539        ASSERT( rval );
1540        status = rtems_message_queue_create(
1541                                        rtems_build_name('R','P','C','p'),
1542                                        poolsize,
1543                                        sizeof(RpcUdpXact),
1544                                        RTEMS_DEFAULT_ATTRIBUTES,
1545                                        &rval->box);
1546        assert( status == RTEMS_SUCCESSFUL );
1547
1548        rval->prog     = prog;
1549        rval->version  = version;
1550        rval->xactSize = xactsize;
1551        return rval;
1552}
1553
1554void
1555rpcUdpXactPoolDestroy(RpcUdpXactPool pool)
1556{
1557RpcUdpXact xact;
1558
1559        while ((xact = rpcUdpXactPoolGet(pool, XactGetFail))) {
1560                rpcUdpXactDestroy(xact);
1561        }
1562        rtems_message_queue_delete(pool->box);
1563        MY_FREE(pool);
1564}
1565
1566RpcUdpXact
1567rpcUdpXactPoolGet(RpcUdpXactPool pool, XactPoolGetMode mode)
1568{
1569RpcUdpXact       xact = 0;
1570size_t           size;
1571
1572        if (RTEMS_SUCCESSFUL != rtems_message_queue_receive(
1573                                                                pool->box,
1574                                                                &xact,
1575                                                                &size,
1576                                                                XactGetWait == mode ?
1577                                                                        RTEMS_WAIT : RTEMS_NO_WAIT,
1578                                                                RTEMS_NO_TIMEOUT)) {
1579
1580                /* nothing found in box; should we create a new one ? */
1581
1582                xact = (XactGetCreate == mode) ?
1583                                        rpcUdpXactCreate(
1584                                                        pool->prog,
1585                                                        pool->version,
1586                                                        pool->xactSize) : 0 ;
1587                if (xact)
1588                                xact->pool = pool;
1589
1590        }
1591        return xact;
1592}
1593
1594void
1595rpcUdpXactPoolPut(RpcUdpXact xact)
1596{
1597RpcUdpXactPool pool;
1598
1599        pool = xact->pool;
1600        ASSERT( pool );
1601
1602        if (RTEMS_SUCCESSFUL != rtems_message_queue_send(
1603                                                                pool->box,
1604                                                                &xact,
1605                                                                sizeof(xact)))
1606                rpcUdpXactDestroy(xact);
1607}
1608
1609#ifdef MBUF_RX
1610
1611/* WORKAROUND: include sys/mbuf.h (or other bsdnet headers) only
1612 *             _after_ using malloc()/free() & friends because
1613 *             the RTEMS/BSDNET headers redefine those :-(
1614 */
1615
1616#define _KERNEL
1617#include <sys/mbuf.h>
1618
1619ssize_t
1620recv_mbuf_from(int s, struct mbuf **ppm, long len, struct sockaddr *fromaddr, int *fromlen);
1621
1622static void
1623bufFree(struct mbuf **m)
1624{
1625        if (*m) {
1626                rtems_bsdnet_semaphore_obtain();
1627                m_freem(*m);
1628                rtems_bsdnet_semaphore_release();
1629                *m = 0;
1630        }
1631}
1632#endif
1633
1634#ifdef MBUF_TX
1635static void
1636paranoia_free(caddr_t closure, u_int size)
1637{
1638#if (DEBUG)
1639RpcUdpXact xact = (RpcUdpXact)closure;
1640int        len  = (int)XDR_GETPOS(&xact->xdrs);
1641
1642        ASSERT( --xact->refcnt >= 0 && size == len );
1643#endif
1644}
1645
1646static void
1647paranoia_ref (caddr_t closure, u_int size)
1648{
1649#if (DEBUG)
1650RpcUdpXact xact = (RpcUdpXact)closure;
1651int        len  = (int)XDR_GETPOS(&xact->xdrs);
1652        ASSERT( size == len );
1653        xact->refcnt++;
1654#endif
1655}
1656#endif
1657
1658/* receive from a socket and find
1659 * the transaction corresponding to the
1660 * transaction ID received in the server
1661 * reply.
1662 *
1663 * The semantics of the 'pibuf' pointer are
1664 * as follows:
1665 *
1666 * MBUF_RX:
1667 *
1668 */
1669
1670#define RPCIOD_RXBUFSZ  UDPMSGSIZE
1671
1672static RpcUdpXact
1673sockRcv(void)
1674{
1675int                                     len,i;
1676uint32_t                                xid;
1677union {
1678        struct sockaddr_in      sin;
1679        struct sockaddr     sa;
1680}                                       fromAddr;
1681int                                     fromLen  = sizeof(fromAddr.sin);
1682RxBuf                           ibuf     = 0;
1683RpcUdpXact                      xact     = 0;
1684
1685        do {
1686
1687        /* rcv_mbuf() and recvfrom() differ in that the
1688         * former allocates buffers and passes them back
1689         * to us whereas the latter requires us to provide
1690         * buffer space.
1691         * Hence, in the first case whe have to make sure
1692         * no old buffer is leaked - in the second case,
1693         * we might well re-use an old buffer but must
1694         * make sure we have one allocated
1695         */
1696#ifdef MBUF_RX
1697        if (ibuf)
1698                bufFree(&ibuf);
1699
1700        len  = recv_mbuf_from(
1701                                        ourSock,
1702                                        &ibuf,
1703                                        RPCIOD_RXBUFSZ,
1704                                    &fromAddr.sa,
1705                                    &fromLen);
1706#else
1707        if ( !ibuf )
1708                ibuf = (RpcBuf)MY_MALLOC(RPCIOD_RXBUFSZ);
1709        if ( !ibuf )
1710                goto cleanup; /* no memory - drop this message */
1711
1712        len  = recvfrom(ourSock,
1713                                    ibuf->buf,
1714                                    RPCIOD_RXBUFSZ,
1715                                    0,
1716                                    &fromAddr.sa,
1717                                        &fromLen);
1718#endif
1719
1720        if (len <= 0) {
1721                if (EAGAIN != errno)
1722                        fprintf(stderr,"RECV failed: %s\n",strerror(errno));
1723                goto cleanup;
1724        }
1725
1726#if (DEBUG) & DEBUG_PACKLOSS
1727        if ( (unsigned)rand() < DEBUG_PACKLOSS_FRACT ) {
1728                /* lose packets once in a while */
1729                static int xxx = 0;
1730                if ( ++xxx % 16 == 0 )
1731                        fprintf(stderr,"DEBUG: dropped %i packets, so far...\n",xxx);
1732                if ( ibuf )
1733                        bufFree( &ibuf );
1734                continue;
1735        }
1736#endif
1737
1738        i = (xid=XID(ibuf)) & XACT_HASH_MSK;
1739
1740        if ( !(xact=xactHashTbl[i])                                             ||
1741                   xact->obuf.xid                     != xid                        ||
1742#ifdef REJECT_SERVERIP_MISMATCH
1743                   xact->server->addr.sin.sin_addr.s_addr != fromAddr.sin.sin_addr.s_addr       ||
1744#endif
1745                   xact->server->addr.sin.sin_port        != fromAddr.sin.sin_port ) {
1746
1747                if (xact) {
1748                        if (
1749#ifdef REJECT_SERVERIP_MISMATCH
1750                            xact->server->addr.sin.sin_addr.s_addr == fromAddr.sin.sin_addr.s_addr &&
1751#endif
1752                        xact->server->addr.sin.sin_port        == fromAddr.sin.sin_port        &&
1753                            ( xact->obuf.xid                   == xid + XACT_HASHS   ||
1754                                  xact->obuf.xid                   == xid + 2*XACT_HASHS    )
1755                                ) {
1756#ifndef DEBUG /* don't complain if it's just a late arrival of a retry */
1757                        fprintf(stderr,"RPCIO - FYI sockRcv(): dropping late/redundant retry answer\n");
1758#endif
1759                        } else {
1760                        fprintf(stderr,"RPCIO WARNING sockRcv(): transaction mismatch\n");
1761                        fprintf(stderr,"xact: xid  0x%08" PRIx32 "  -- got 0x%08" PRIx32 "\n",
1762                                                        xact->obuf.xid, xid);
1763                        fprintf(stderr,"xact: addr 0x%08" PRIx32 "  -- got 0x%08" PRIx32 "\n",
1764                                                        xact->server->addr.sin.sin_addr.s_addr,
1765                                                        fromAddr.sin.sin_addr.s_addr);
1766                        fprintf(stderr,"xact: port 0x%08x  -- got 0x%08x\n",
1767                                                        xact->server->addr.sin.sin_port,
1768                                                        fromAddr.sin.sin_port);
1769                        }
1770                } else {
1771                        fprintf(stderr,
1772                                        "RPCIO WARNING sockRcv(): got xid 0x%08" PRIx32 " but its slot is empty\n",
1773                                        xid);
1774                }
1775                /* forget about this one and try again */
1776                xact = 0;
1777        }
1778
1779        } while ( !xact );
1780
1781        xact->ibuf     = ibuf;
1782#ifndef MBUF_RX
1783        xact->ibufsize = RPCIOD_RXBUFSZ;
1784#endif
1785
1786        return xact;
1787
1788cleanup:
1789
1790        bufFree(&ibuf);
1791
1792        return 0;
1793}
1794
1795
1796#include <rtems/rtems_bsdnet_internal.h>
1797/* double check the event configuration; should probably globally
1798 * manage system events!!
1799 * We do this at the end of the file for the same reason we had
1800 * included mbuf.h only a couple of lines above - see comment up
1801 * there...
1802 */
1803#if RTEMS_RPC_EVENT & SOSLEEP_EVENT & SBWAIT_EVENT & NETISR_EVENTS
1804#error ILLEGAL EVENT CONFIGURATION
1805#endif
Note: See TracBrowser for help on using the repository browser.