source: rtems/cpukit/libfs/src/nfsclient/src/rpcio.c @ a2097c5

4.115
Last change on this file since a2097c5 was b657ea4d, checked in by Sebastian Huber <sebastian.huber@…>, on 12/05/14 at 12:05:34

nfsclient: Avoid FILE and LINE

The FILE prevents reproducible builds.

  • Property mode set to 100644
File size: 44.9 KB
Line 
1/**
2 * @file
3 *
4 * @brief RPC Multiplexor for a Multitasking Environment
5 * @ingroup libfs
6 *
7 * This code funnels arbitrary task's UDP/RPC requests
8 * through one socket to arbitrary servers.
9 * The replies are gathered and dispatched to the
10 * requestors.
11 * One task handles all the sending and receiving
12 * work including retries.
13 * It is up to the requestor, however, to do
14 * the XDR encoding of the arguments / decoding
15 * of the results (except for the RPC header which
16 * is handled by the daemon).
17 */
18
19/*
20 * Author: Till Straumann <strauman@slac.stanford.edu>, 2002
21 *
22 * Authorship
23 * ----------
24 * This software (NFS-2 client implementation for RTEMS) was created by
25 *     Till Straumann <strauman@slac.stanford.edu>, 2002-2007,
26 *         Stanford Linear Accelerator Center, Stanford University.
27 *
28 * Acknowledgement of sponsorship
29 * ------------------------------
30 * The NFS-2 client implementation for RTEMS was produced by
31 *     the Stanford Linear Accelerator Center, Stanford University,
32 *         under Contract DE-AC03-76SFO0515 with the Department of Energy.
33 *
34 * Government disclaimer of liability
35 * ----------------------------------
36 * Neither the United States nor the United States Department of Energy,
37 * nor any of their employees, makes any warranty, express or implied, or
38 * assumes any legal liability or responsibility for the accuracy,
39 * completeness, or usefulness of any data, apparatus, product, or process
40 * disclosed, or represents that its use would not infringe privately owned
41 * rights.
42 *
43 * Stanford disclaimer of liability
44 * --------------------------------
45 * Stanford University makes no representations or warranties, express or
46 * implied, nor assumes any liability for the use of this software.
47 *
48 * Stanford disclaimer of copyright
49 * --------------------------------
50 * Stanford University, owner of the copyright, hereby disclaims its
51 * copyright and all other rights in this software.  Hence, anyone may
52 * freely use it for any purpose without restriction.
53 *
54 * Maintenance of notices
55 * ----------------------
56 * In the interest of clarity regarding the origin and status of this
57 * SLAC software, this and all the preceding Stanford University notices
58 * are to remain affixed to any copy or derivative of this software made
59 * or distributed by the recipient and are to be affixed to any copy of
60 * software made or distributed by the recipient that contains a copy or
61 * derivative of this software.
62 *
63 * ------------------ SLAC Software Notices, Set 4 OTT.002a, 2004 FEB 03
64 */
65
66#if HAVE_CONFIG_H
67#include "config.h"
68#endif
69
70#include <inttypes.h>
71
72#include <rtems.h>
73#include <rtems/error.h>
74#include <rtems/rtems_bsdnet.h>
75#include <stdlib.h>
76#include <time.h>
77#include <rpc/rpc.h>
78#include <rpc/pmap_prot.h>
79#include <errno.h>
80#include <sys/ioctl.h>
81#include <assert.h>
82#include <stdio.h>
83#include <errno.h>
84#include <string.h>
85#include <netinet/in.h>
86#include <arpa/inet.h>
87#include <sys/cpuset.h>
88
89#include "rpcio.h"
90
91/****************************************************************/
92/* CONFIGURABLE PARAMETERS                                      */
93/****************************************************************/
94
95#define MBUF_RX                 /* If defined: use mbuf XDR stream for
96                                                 *  decoding directly out of mbufs
97                                                 *  Otherwise, the regular 'recvfrom()'
98                                                 *  interface will be used involving an
99                                                 *  extra buffer allocation + copy step.
100                                                 */
101
102#define MBUF_TX                 /* If defined: avoid copying data when
103                                                 *  sending. Instead, use a wrapper to
104                                                 *  'sosend()' which will point an MBUF
105                                                 *  directly to our buffer space.
106                                                 *  Note that the BSD stack does not copy
107                                                 *  data when fragmenting packets - it
108                                                 *  merely uses an mbuf chain pointing
109                                                 *  into different areas of the data.
110                                                 *
111                                                 * If undefined, the regular 'sendto()'
112                                                 *  interface is used.
113                                                 */
114
115#undef REJECT_SERVERIP_MISMATCH
116                                                /* If defined, RPC replies must come from the server
117                                                 * that was queried. Eric Norum has reported problems
118                                                 * with clustered NFS servers. So we disable this
119                                                 * reducing paranoia...
120                                                 */
121
122/* daemon task parameters */
123#define RPCIOD_STACK            10000
124#define RPCIOD_PRIO                     100     /* *fallback* priority */
125
126/* depth of the message queue for sending
127 * RPC requests to the daemon
128 */
129#define RPCIOD_QDEPTH           20
130
131/* Maximum retry limit for retransmission */
132#define RPCIOD_RETX_CAP_S       3 /* seconds */
133
134/* Default timeout for RPC calls */
135#define RPCIOD_DEFAULT_TIMEOUT  (&_rpc_default_timeout)
136static struct timeval _rpc_default_timeout = { 10 /* secs */, 0 /* usecs */ };
137
138/* how many times should we try to resend a failed
139 * transaction with refreshed AUTHs
140 */
141#define RPCIOD_REFRESH          2
142
143/* Events we are using; the RPC_EVENT
144 * MUST NOT be used by any application
145 * thread doing RPC IO (e.g. NFS)
146 */
147#define RTEMS_RPC_EVENT         RTEMS_EVENT_30  /* THE event used by RPCIO. Every task doing
148                                                                                         * RPC IO will receive this - hence it is
149                                                                                         * RESERVED
150                                                                                         */
151#define RPCIOD_RX_EVENT         RTEMS_EVENT_1   /* Events the RPCIOD is using/waiting for */
152#define RPCIOD_TX_EVENT         RTEMS_EVENT_2
153#define RPCIOD_KILL_EVENT       RTEMS_EVENT_3   /* send to the daemon to kill it          */
154
155#define LD_XACT_HASH            8                               /* ld of the size of the transaction hash table  */
156
157
158/* Debugging Flags                                              */
159
160/* NOTE: defining DEBUG 0 leaves some 'assert()' paranoia checks
161 *       but produces no output
162 */
163
164#define DEBUG_TRACE_XACT        (1<<0)
165#define DEBUG_EVENTS            (1<<1)
166#define DEBUG_MALLOC            (1<<2)
167#define DEBUG_TIMEOUT           (1<<3)
168#define DEBUG_PACKLOSS          (1<<4)  /* This introduces random, artificial packet losses to test retransmission */
169
170#define DEBUG_PACKLOSS_FRACT (0xffffffff/10)
171
172/* USE PARENTHESIS WHEN 'or'ing MULTIPLE FLAGS: (DEBUG_XX | DEBUG_YY) */
173#define DEBUG                           (0)
174
175/****************************************************************/
176/* END OF CONFIGURABLE SECTION                                  */
177/****************************************************************/
178
179/* prevent rollover of our timers by readjusting the epoch on the fly */
180#if     (DEBUG) & DEBUG_TIMEOUT
181#define RPCIOD_EPOCH_SECS       10
182#else
183#define RPCIOD_EPOCH_SECS       10000
184#endif
185
186#ifdef  DEBUG
187#define ASSERT(arg)                     assert(arg)
188#else
189#define ASSERT(arg)                     if (arg)
190#endif
191
192/****************************************************************/
193/* MACROS                                                       */
194/****************************************************************/
195
196
197#define XACT_HASHS              (1<<(LD_XACT_HASH))     /* the hash table size derived from the ld       */
198#define XACT_HASH_MSK   ((XACT_HASHS)-1)        /* mask to extract the hash index from a RPC-XID */
199
200
201#define MU_LOCK(mutex)          do {                                                    \
202                                                        assert(                                                 \
203                                                                RTEMS_SUCCESSFUL ==                     \
204                                                                rtems_semaphore_obtain(         \
205                                                                                (mutex),                        \
206                                                                                RTEMS_WAIT,                     \
207                                                                                RTEMS_NO_TIMEOUT        \
208                                                                                ) );                            \
209                                                        } while(0)
210
211#define MU_UNLOCK(mutex)        do {                                                    \
212                                                        assert(                                                 \
213                                                                RTEMS_SUCCESSFUL ==                     \
214                                                                rtems_semaphore_release(        \
215                                                                                (mutex)                         \
216                                                                                ) );                            \
217                                                        } while(0)
218
219#define MU_CREAT(pmutex)        do {                                                    \
220                                                        assert(                                                 \
221                                                                RTEMS_SUCCESSFUL ==                     \
222                                                                rtems_semaphore_create(         \
223                                                                                rtems_build_name(       \
224                                                                                        'R','P','C','l' \
225                                                                                        ),                              \
226                                                                                1,                                      \
227                                                                                MUTEX_ATTRIBUTES,       \
228                                                                                0,                                      \
229                                                                                (pmutex)) );            \
230                                                        } while (0)
231
232
233#define MU_DESTROY(mutex)       do {                                                    \
234                                                        assert(                                                 \
235                                                                RTEMS_SUCCESSFUL ==                     \
236                                                                rtems_semaphore_delete(         \
237                                                                                mutex                           \
238                                                                                ) );                            \
239                                                        } while (0)
240
241#define MUTEX_ATTRIBUTES        (RTEMS_LOCAL           |                \
242                                                        RTEMS_PRIORITY         |                \
243                                                        RTEMS_INHERIT_PRIORITY |                \
244                                                        RTEMS_BINARY_SEMAPHORE)
245
246#define FIRST_ATTEMPT           0x88888888 /* some time that is never reached */
247
248/****************************************************************/
249/* TYPE DEFINITIONS                                             */
250/****************************************************************/
251
252typedef rtems_interval          TimeoutT;
253
254/* 100000th implementation of a doubly linked list;
255 * since only one thread is looking at these,
256 * we need no locking
257 */
258typedef struct ListNodeRec_ {
259        struct ListNodeRec_ *next, *prev;
260} ListNodeRec, *ListNode;
261
262
263/* Structure representing an RPC server */
264typedef struct RpcUdpServerRec_ {
265                RpcUdpServer            next;                   /* linked list of all servers; protected by hlock */
266                union {
267                struct sockaddr_in      sin;
268                struct sockaddr     sa;
269                }                                       addr;
270                AUTH                            *auth;
271                rtems_id                        authlock;               /* must MUTEX the auth object - it's not clear
272                                                                                         *  what is better:
273                                                                                         *   1 having one (MUTEXed) auth per server
274                                                                                         *         who is shared among all transactions
275                                                                                         *         using that server
276                                                                                         *       2 maintaining an AUTH per transaction
277                                                                                         *         (there are then other options: manage
278                                                                                         *         XACT pools on a per-server basis instead
279                                                                                         *         of associating a server with a XACT when
280                                                                                         *   sending)
281                                                                                         * experience will show if the current (1)
282                                                                                         * approach has to be changed.
283                                                                                         */
284                TimeoutT                        retry_period;   /* dynamically adjusted retry period
285                                                                                         * (based on packet roundtrip time)
286                                                                                         */
287                /* STATISTICS */
288                unsigned long           retrans;                /* how many retries were issued by this server         */
289                unsigned long           requests;               /* how many requests have been sent                    */
290                unsigned long       timeouts;           /* how many requests have timed out                    */
291                unsigned long       errors;         /* how many errors have occurred (other than timeouts) */
292                char                            name[20];               /* server's address in IP 'dot' notation               */
293} RpcUdpServerRec;
294
295typedef union  RpcBufU_ {
296                uint32_t                        xid;
297                char                            buf[1];
298} RpcBufU, *RpcBuf;
299
300/* RX Buffer implementation; this is either
301 * an MBUF chain (MBUF_RX configuration)
302 * or a buffer allocated from the heap
303 * where recvfrom copies the (encoded) reply
304 * to. The XDR routines the copy/decode
305 * it into the user's data structures.
306 */
307#ifdef MBUF_RX
308typedef struct mbuf *           RxBuf;  /* an MBUF chain */
309static  void                            bufFree(struct mbuf **m);
310#define XID(ibuf)                       (*(mtod((ibuf), u_long *)))
311extern void                             xdrmbuf_create(XDR *, struct mbuf *, enum xdr_op);
312#else
313typedef RpcBuf                          RxBuf;
314#define bufFree(b)                      do { MY_FREE(*(b)); *(b)=0; } while(0)
315#define XID(ibuf)                       ((ibuf)->xid)
316#endif
317
318/* A RPC 'transaction' consisting
319 * of server and requestor information,
320 * buffer space and an XDR object
321 * (for encoding arguments).
322 */
323typedef struct RpcUdpXactRec_ {
324                ListNodeRec                     node;           /* so we can put XACTs on a list                */
325                RpcUdpServer            server;         /* server this XACT goes to                     */
326                long                            lifetime;       /* during the lifetime, retry attempts are made */
327                long                            tolive;         /* lifetime timer                               */
328                struct rpc_err          status;         /* RPC reply error status                       */
329                long                            age;            /* age info; needed to manage retransmission    */
330                long                            trip;           /* record round trip time in ticks              */
331                rtems_id                        requestor;      /* the task waiting for this XACT to complete   */
332                RpcUdpXactPool          pool;           /* if this XACT belong to a pool, this is it    */
333                XDR                                     xdrs;           /* argument encoder stream                      */
334                int                                     xdrpos;     /* stream position after the (permanent) header */
335                xdrproc_t                       xres;           /* reply decoder proc - TODO needn't be here    */
336                caddr_t                         pres;           /* reply decoded obj  - TODO needn't be here    */
337#ifndef MBUF_RX
338                int                                     ibufsize;       /* size of the ibuf (bytes)                     */
339#endif
340#ifdef  MBUF_TX
341                int                                     refcnt;         /* mbuf external storage reference count        */
342#endif
343                int                                     obufsize;       /* size of the obuf (bytes)                     */
344                RxBuf                           ibuf;           /* pointer to input buffer assigned by daemon   */
345                RpcBufU                         obuf;       /* output buffer (encoded args) APPENDED HERE   */
346} RpcUdpXactRec;
347
348typedef struct RpcUdpXactPoolRec_ {
349        rtems_id        box;
350        int                     prog;
351        int                     version;
352        int                     xactSize;
353} RpcUdpXactPoolRec;
354
355/* a global hash table where all 'living' transaction
356 * objects are registered.
357 * A number of bits in a transaction's XID maps 1:1 to
358 * an index in this table. Hence, the XACT matching
359 * an RPC/UDP reply packet can quickly be found
360 * The size of this table imposes a hard limit on the
361 * number of all created transactions in the system.
362 */
363static RpcUdpXact xactHashTbl[XACT_HASHS]={0};
364static u_long     xidUpper   [XACT_HASHS]={0};
365static unsigned   xidHashSeed            = 0 ;
366
367/* forward declarations */
368static RpcUdpXact
369sockRcv(void);
370
371static void
372rpcio_daemon(rtems_task_argument);
373
374#ifdef MBUF_TX
375ssize_t
376sendto_nocpy (
377                int s,
378                const void *buf, size_t buflen,
379                int flags,
380                const struct sockaddr *toaddr, int tolen,
381                void *closure,
382                void (*freeproc)(caddr_t, u_int),
383                void (*refproc)(caddr_t, u_int)
384);
385static void paranoia_free(caddr_t closure, u_int size);
386static void paranoia_ref (caddr_t closure, u_int size);
387#define SENDTO  sendto_nocpy
388#else
389#define SENDTO  sendto
390#endif
391
392static RpcUdpServer             rpcUdpServers = 0;      /* linked list of all servers; protected by llock */
393
394static int                              ourSock = -1;           /* the socket we are using for communication */
395static rtems_id                 rpciod  = 0;            /* task id of the RPC daemon                 */
396static rtems_id                 msgQ    = 0;            /* message queue where the daemon picks up
397                                                                                         * requests
398                                                                                         */
399#ifndef NDEBUG
400static rtems_id                 llock   = 0;            /* MUTEX protecting the server list */
401static rtems_id                 hlock   = 0;            /* MUTEX protecting the hash table and the list of servers */
402#endif
403static rtems_id                 fini    = 0;            /* a synchronization semaphore we use during
404                                                                                         * module cleanup / driver unloading
405                                                                                         */
406static rtems_interval   ticksPerSec;            /* cached system clock rate (WHO IS ASSUMED NOT
407                                                                                         * TO CHANGE)
408                                                                                         */
409
410rtems_task_priority             rpciodPriority = 0;
411#ifdef RTEMS_SMP
412const cpu_set_t                 *rpciodCpuset = 0;
413size_t                          rpciodCpusetSize = 0;
414#endif
415
416#if (DEBUG) & DEBUG_MALLOC
417/* malloc wrappers for debugging */
418static int nibufs = 0;
419
420static inline void *MY_MALLOC(int s)
421{
422        if (s) {
423                void *rval;
424                MU_LOCK(hlock);
425                assert(nibufs++ < 2000);
426                MU_UNLOCK(hlock);
427                assert((rval = malloc(s)) != 0);
428                return rval;
429        }
430        return 0;
431}
432
433static inline void *MY_CALLOC(int n, int s)
434{
435        if (s) {
436                void *rval;
437                MU_LOCK(hlock);
438                assert(nibufs++ < 2000);
439                MU_UNLOCK(hlock);
440                assert((rval = calloc(n,s)) != 0);
441                return rval;
442        }
443        return 0;
444}
445
446
447static inline void MY_FREE(void *p)
448{
449        if (p) {
450                MU_LOCK(hlock);
451                nibufs--;
452                MU_UNLOCK(hlock);
453                free(p);
454        }
455}
456#else
457#define MY_MALLOC       malloc
458#define MY_CALLOC       calloc
459#define MY_FREE         free
460#endif
461
462static inline bool_t
463locked_marshal(RpcUdpServer s, XDR *xdrs)
464{
465bool_t rval;
466        MU_LOCK(s->authlock);
467        rval = AUTH_MARSHALL(s->auth, xdrs);
468        MU_UNLOCK(s->authlock);
469        return rval;
470}
471
472/* Locked operations on a server's auth object */
473static inline bool_t
474locked_validate(RpcUdpServer s, struct opaque_auth *v)
475{
476bool_t rval;
477        MU_LOCK(s->authlock);
478        rval = AUTH_VALIDATE(s->auth, v);
479        MU_UNLOCK(s->authlock);
480        return rval;
481}
482
483static inline bool_t
484locked_refresh(RpcUdpServer s)
485{
486bool_t rval;
487        MU_LOCK(s->authlock);
488        rval = AUTH_REFRESH(s->auth);
489        MU_UNLOCK(s->authlock);
490        return rval;
491}
492
493/* Create a server object
494 *
495 */
496enum clnt_stat
497rpcUdpServerCreate(
498        struct sockaddr_in      *paddr,
499        rpcprog_t               prog,
500        rpcvers_t               vers,
501        u_long                  uid,
502        u_long                  gid,
503        RpcUdpServer            *psrv
504        )
505{
506RpcUdpServer    rval;
507u_short                 port;
508char                    hname[MAX_MACHINE_NAME + 1];
509int                             theuid, thegid;
510int                             thegids[NGRPS];
511gid_t                   gids[NGROUPS];
512int                             len,i;
513AUTH                    *auth;
514enum clnt_stat  pmap_err;
515struct pmap             pmaparg;
516
517        if ( gethostname(hname, MAX_MACHINE_NAME) ) {
518                fprintf(stderr,
519                                "RPCIO - error: I have no hostname ?? (%s)\n",
520                                strerror(errno));
521                return RPC_UNKNOWNHOST;
522        }
523
524        if ( (len = getgroups(NGROUPS, gids) < 0 ) ) {
525                fprintf(stderr,
526                                "RPCIO - error: I unable to get group ids (%s)\n",
527                                strerror(errno));
528                return RPC_FAILED;
529        }
530
531        if ( len > NGRPS )
532                len = NGRPS;
533
534        for (i=0; i<len; i++)
535                thegids[i] = (int)gids[i];
536
537        theuid = (int) ((RPCIOD_DEFAULT_ID == uid) ? geteuid() : uid);
538        thegid = (int) ((RPCIOD_DEFAULT_ID == gid) ? getegid() : gid);
539
540        if ( !(auth = authunix_create(hname, theuid, thegid, len, thegids)) ) {
541                fprintf(stderr,
542                                "RPCIO - error: unable to create RPC AUTH\n");
543                return RPC_FAILED;
544        }
545
546        /* if they specified no port try to ask the portmapper */
547        if (!paddr->sin_port) {
548
549                paddr->sin_port = htons(PMAPPORT);
550
551        pmaparg.pm_prog = prog;
552        pmaparg.pm_vers = vers;
553        pmaparg.pm_prot = IPPROTO_UDP;
554        pmaparg.pm_port = 0;  /* not needed or used */
555
556
557                /* dont use non-reentrant pmap_getport ! */
558
559                pmap_err = rpcUdpCallRp(
560                                                paddr,
561                                                PMAPPROG,
562                                                PMAPVERS,
563                                                PMAPPROC_GETPORT,
564                                                xdr_pmap,
565                                                &pmaparg,
566                                                xdr_u_short,
567                                                &port,
568                                                uid,
569                                                gid,
570                                                0);
571
572                if ( RPC_SUCCESS != pmap_err ) {
573                        paddr->sin_port = 0;
574                        return pmap_err;
575                }
576
577                paddr->sin_port = htons(port);
578        }
579
580        if (0==paddr->sin_port) {
581                        return RPC_PROGNOTREGISTERED;
582        }
583
584        rval                            = (RpcUdpServer)MY_MALLOC(sizeof(*rval));
585        memset(rval, 0, sizeof(*rval));
586
587        if (!inet_ntop(AF_INET, &paddr->sin_addr, rval->name, sizeof(rval->name)))
588                sprintf(rval->name,"?.?.?.?");
589        rval->addr.sin          = *paddr;
590
591        /* start with a long retransmission interval - it
592         * will be adapted dynamically
593         */
594        rval->retry_period  = RPCIOD_RETX_CAP_S * ticksPerSec;
595
596        rval->auth                      = auth;
597
598        MU_CREAT( &rval->authlock );
599
600        /* link into list */
601        MU_LOCK( llock );
602        rval->next = rpcUdpServers;
603        rpcUdpServers = rval;
604        MU_UNLOCK( llock );
605
606        *psrv                           = rval;
607        return RPC_SUCCESS;
608}
609
610void
611rpcUdpServerDestroy(RpcUdpServer s)
612{
613RpcUdpServer prev;
614        if (!s)
615                return;
616        /* we should probably verify (but how?) that nobody
617         * (at least: no outstanding XACTs) is using this
618         * server;
619         */
620
621        /* remove from server list */
622        MU_LOCK(llock);
623        prev = rpcUdpServers;
624        if ( s == prev ) {
625                rpcUdpServers = s->next;
626        } else {
627                for ( ; prev ; prev = prev->next) {
628                        if (prev->next == s) {
629                                prev->next = s->next;
630                                break;
631                        }
632                }
633        }
634        MU_UNLOCK(llock);
635
636        /* MUST have found it */
637        assert(prev);
638
639        auth_destroy(s->auth);
640
641        MU_DESTROY(s->authlock);
642        MY_FREE(s);
643}
644
645int
646rpcUdpStats(FILE *f)
647{
648RpcUdpServer s;
649
650        if (!f) f = stdout;
651
652        fprintf(f,"RPCIOD statistics:\n");
653
654        MU_LOCK(llock);
655        for (s = rpcUdpServers; s; s=s->next) {
656                fprintf(f,"\nServer -- %s:\n", s->name);
657                fprintf(f,"  requests    sent: %10ld, retransmitted: %10ld\n",
658                                                s->requests, s->retrans);
659                fprintf(f,"         timed out: %10ld,   send errors: %10ld\n",
660                                                s->timeouts, s->errors);
661                fprintf(f,"  current retransmission interval: %dms\n",
662                                                (unsigned)(s->retry_period * 1000 / ticksPerSec) );
663        }
664        MU_UNLOCK(llock);
665
666        return 0;
667}
668
669RpcUdpXact
670rpcUdpXactCreate(
671        u_long  program,
672        u_long  version,
673        u_long  size
674        )
675{
676RpcUdpXact              rval=0;
677struct rpc_msg  header;
678register int    i,j;
679
680        if (!size)
681                size = UDPMSGSIZE;
682        /* word align */
683        size = (size + 3) & ~3;
684
685        rval = (RpcUdpXact)MY_CALLOC(1,sizeof(*rval) - sizeof(rval->obuf) + size);
686
687        if (rval) {
688
689                header.rm_xid             = 0;
690                header.rm_direction       = CALL;
691                header.rm_call.cb_rpcvers = RPC_MSG_VERSION;
692                header.rm_call.cb_prog    = program;
693                header.rm_call.cb_vers    = version;
694                xdrmem_create(&(rval->xdrs), rval->obuf.buf, size, XDR_ENCODE);
695
696                if (!xdr_callhdr(&(rval->xdrs), &header)) {
697                        MY_FREE(rval);
698                        return 0;
699                }
700                /* pick a free table slot and initialize the XID */
701                MU_LOCK(hlock);
702                rval->obuf.xid = (xidHashSeed++ ^ ((uintptr_t)rval>>10)) & XACT_HASH_MSK;
703                i=j=(rval->obuf.xid & XACT_HASH_MSK);
704                if (msgQ) {
705                        /* if there's no message queue, refuse to
706                         * give them transactions; we might be in the process to
707                         * go away...
708                         */
709                        do {
710                                i=(i+1) & XACT_HASH_MSK; /* cheap modulo */
711                                if (!xactHashTbl[i]) {
712#if (DEBUG) & DEBUG_TRACE_XACT
713                                        fprintf(stderr,"RPCIO: entering index %i, val %x\n",i,rval);
714#endif
715                                        xactHashTbl[i]=rval;
716                                        j=-1;
717                                        break;
718                                }
719                        } while (i!=j);
720                }
721                MU_UNLOCK(hlock);
722                if (i==j) {
723                        XDR_DESTROY(&rval->xdrs);
724                        MY_FREE(rval);
725                        return 0;
726                }
727                rval->obuf.xid  = xidUpper[i] | i;
728                rval->xdrpos    = XDR_GETPOS(&(rval->xdrs));
729                rval->obufsize  = size;
730        }
731        return rval;
732}
733
734void
735rpcUdpXactDestroy(RpcUdpXact xact)
736{
737int i = xact->obuf.xid & XACT_HASH_MSK;
738
739#if (DEBUG) & DEBUG_TRACE_XACT
740                fprintf(stderr,"RPCIO: removing index %i, val %x\n",i,xact);
741#endif
742
743                ASSERT( xactHashTbl[i]==xact );
744
745                MU_LOCK(hlock);
746                xactHashTbl[i]=0;
747                /* remember XID we used last time so we can avoid
748                 * reusing the same one (incremented by rpcUdpSend routine)
749                 */
750                xidUpper[i]   = xact->obuf.xid;
751                MU_UNLOCK(hlock);
752
753                bufFree(&xact->ibuf);
754
755                XDR_DESTROY(&xact->xdrs);
756                MY_FREE(xact);
757}
758
759
760
761/* Send a transaction, i.e. enqueue it to the
762 * RPC daemon who will actually send it.
763 */
764enum clnt_stat
765rpcUdpSend(
766        RpcUdpXact              xact,
767        RpcUdpServer    srvr,
768        struct timeval  *timeout,
769        u_long                  proc,
770        xdrproc_t               xres, caddr_t pres,
771        xdrproc_t               xargs, caddr_t pargs,
772        ...
773   )
774{
775register XDR    *xdrs;
776unsigned long   ms;
777va_list                 ap;
778
779        va_start(ap,pargs);
780
781        if (!timeout)
782                timeout = RPCIOD_DEFAULT_TIMEOUT;
783
784        ms = 1000 * timeout->tv_sec + timeout->tv_usec/1000;
785
786        /* round lifetime to closest # of ticks */
787        xact->lifetime  = (ms * ticksPerSec + 500) / 1000;
788        if ( 0 == xact->lifetime )
789                xact->lifetime = 1;
790
791#if (DEBUG) & DEBUG_TIMEOUT
792        {
793        static int once=0;
794        if (!once++) {
795                fprintf(stderr,
796                                "Initial lifetime: %i (ticks)\n",
797                                xact->lifetime);
798        }
799        }
800#endif
801
802        xact->tolive    = xact->lifetime;
803
804        xact->xres      = xres;
805        xact->pres      = pres;
806        xact->server    = srvr;
807
808        xdrs            = &xact->xdrs;
809        xdrs->x_op      = XDR_ENCODE;
810        /* increment transaction ID */
811        xact->obuf.xid += XACT_HASHS;
812        XDR_SETPOS(xdrs, xact->xdrpos);
813        if ( !XDR_PUTLONG(xdrs,(long*)&proc) || !locked_marshal(srvr, xdrs) ||
814                 !xargs(xdrs, pargs) ) {
815                va_end(ap);
816                return(xact->status.re_status=RPC_CANTENCODEARGS);
817        }
818        while ((xargs=va_arg(ap,xdrproc_t))) {
819                if (!xargs(xdrs, va_arg(ap,caddr_t)))
820                va_end(ap);
821                return(xact->status.re_status=RPC_CANTENCODEARGS);
822        }
823
824        va_end(ap);
825
826        rtems_task_ident(RTEMS_SELF, RTEMS_WHO_AM_I, &xact->requestor);
827        if ( rtems_message_queue_send( msgQ, &xact, sizeof(xact)) ) {
828                return RPC_CANTSEND;
829        }
830        /* wakeup the rpciod */
831        ASSERT( RTEMS_SUCCESSFUL==rtems_event_send(rpciod, RPCIOD_TX_EVENT) );
832
833        return RPC_SUCCESS;
834}
835
836/* Block for the RPC reply to an outstanding
837 * transaction.
838 * The caller is woken by the RPC daemon either
839 * upon reception of the reply or on timeout.
840 */
841enum clnt_stat
842rpcUdpRcv(RpcUdpXact xact)
843{
844int                                     refresh;
845XDR                     reply_xdrs;
846struct rpc_msg          reply_msg;
847rtems_status_code       status;
848rtems_event_set         gotEvents;
849
850        refresh = 0;
851
852        do {
853
854        /* block for the reply */
855        status = rtems_event_receive(
856                RTEMS_RPC_EVENT,
857                RTEMS_WAIT | RTEMS_EVENT_ANY,
858                RTEMS_NO_TIMEOUT,
859                &gotEvents);
860        ASSERT( status == RTEMS_SUCCESSFUL );
861
862        if (xact->status.re_status) {
863#ifdef MBUF_RX
864                /* add paranoia */
865                ASSERT( !xact->ibuf );
866#endif
867                return xact->status.re_status;
868        }
869
870#ifdef MBUF_RX
871        xdrmbuf_create(&reply_xdrs, xact->ibuf, XDR_DECODE);
872#else
873        xdrmem_create(&reply_xdrs, xact->ibuf->buf, xact->ibufsize, XDR_DECODE);
874#endif
875
876        reply_msg.acpted_rply.ar_verf          = _null_auth;
877        reply_msg.acpted_rply.ar_results.where = xact->pres;
878        reply_msg.acpted_rply.ar_results.proc  = xact->xres;
879
880        if (xdr_replymsg(&reply_xdrs, &reply_msg)) {
881                /* OK */
882                _seterr_reply(&reply_msg, &xact->status);
883                if (RPC_SUCCESS == xact->status.re_status) {
884                        if ( !locked_validate(xact->server,
885                                                                &reply_msg.acpted_rply.ar_verf) ) {
886                                xact->status.re_status = RPC_AUTHERROR;
887                                xact->status.re_why    = AUTH_INVALIDRESP;
888                        }
889                        if (reply_msg.acpted_rply.ar_verf.oa_base) {
890                                reply_xdrs.x_op = XDR_FREE;
891                                xdr_opaque_auth(&reply_xdrs, &reply_msg.acpted_rply.ar_verf);
892                        }
893                        refresh = 0;
894                } else {
895                        /* should we try to refresh our credentials ? */
896                        if ( !refresh ) {
897                                /* had never tried before */
898                                refresh = RPCIOD_REFRESH;
899                        }
900                }
901        } else {
902                reply_xdrs.x_op        = XDR_FREE;
903                xdr_replymsg(&reply_xdrs, &reply_msg);
904                xact->status.re_status = RPC_CANTDECODERES;
905        }
906        XDR_DESTROY(&reply_xdrs);
907
908        bufFree(&xact->ibuf);
909
910#ifndef MBUF_RX
911        xact->ibufsize = 0;
912#endif
913
914        if (refresh && locked_refresh(xact->server)) {
915                rtems_task_ident(RTEMS_SELF, RTEMS_WHO_AM_I, &xact->requestor);
916                if ( rtems_message_queue_send(msgQ, &xact, sizeof(xact)) ) {
917                        return RPC_CANTSEND;
918                }
919                /* wakeup the rpciod */
920                fprintf(stderr,"RPCIO INFO: refreshing my AUTH\n");
921                ASSERT( RTEMS_SUCCESSFUL==rtems_event_send(rpciod, RPCIOD_TX_EVENT) );
922        }
923
924        } while ( 0 &&  refresh-- > 0 );
925
926        return xact->status.re_status;
927}
928
929
930/* On RTEMS, I'm told to avoid select(); this seems to
931 * be more efficient
932 */
933static void
934rxWakeupCB(struct socket *sock, void *arg)
935{
936  rtems_id *rpciod = (rtems_id*) arg;
937  rtems_event_send(*rpciod, RPCIOD_RX_EVENT);
938}
939
940void
941rpcSetXIDs(uint32_t xid)
942{
943        uint32_t i;
944
945        xid &= ~XACT_HASH_MSK;
946
947        for (i = 0; i < XACT_HASHS; ++i) {
948                xidUpper[i] = xid | i;
949        }
950}
951
952int
953rpcUdpInit(void)
954{
955int                     s;
956rtems_status_code       status;
957int                     noblock = 1;
958struct sockwakeup       wkup;
959
960        if (ourSock < 0) {
961    fprintf(stderr,"RTEMS-RPCIOD $Release$, " \
962            "Till Straumann, Stanford/SLAC/SSRL 2002, " \
963            "See LICENSE file for licensing info.\n");
964
965                ourSock=socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
966                if (ourSock>=0) {
967                        bindresvport(ourSock,(struct sockaddr_in*)0);
968                        s = ioctl(ourSock, FIONBIO, (char*)&noblock);
969                        assert( s == 0 );
970                        /* assume nobody tampers with the clock !! */
971                        ticksPerSec = rtems_clock_get_ticks_per_second();
972                        MU_CREAT( &hlock );
973                        MU_CREAT( &llock );
974
975                        if ( !rpciodPriority ) {
976                                /* use configured networking priority */
977                                if ( ! (rpciodPriority = rtems_bsdnet_config.network_task_priority) )
978                                        rpciodPriority = RPCIOD_PRIO;   /* fallback value */
979                        }
980
981                        status = rtems_task_create(
982                                                                                        rtems_build_name('R','P','C','d'),
983                                                                                        rpciodPriority,
984                                                                                        RPCIOD_STACK,
985                                                                                        RTEMS_DEFAULT_MODES,
986                                                                                        /* fprintf saves/restores FP registers on PPC :-( */
987                                                                                        RTEMS_DEFAULT_ATTRIBUTES | RTEMS_FLOATING_POINT,
988                                                                                        &rpciod);
989                        assert( status == RTEMS_SUCCESSFUL );
990
991#ifdef RTEMS_SMP
992                        if ( rpciodCpuset == 0 ) {
993                                rpciodCpuset = rtems_bsdnet_config.network_task_cpuset;
994                                rpciodCpusetSize = rtems_bsdnet_config.network_task_cpuset_size;
995                        }
996                        if ( rpciodCpuset != 0 )
997                                rtems_task_set_affinity( rpciod, rpciodCpusetSize, rpciodCpuset );
998#endif
999
1000                        wkup.sw_pfn = rxWakeupCB;
1001                        wkup.sw_arg = &rpciod;
1002                        assert( 0==setsockopt(ourSock, SOL_SOCKET, SO_RCVWAKEUP, &wkup, sizeof(wkup)) );
1003                        status = rtems_message_queue_create(
1004                                                                                        rtems_build_name('R','P','C','q'),
1005                                                                                        RPCIOD_QDEPTH,
1006                                                                                        sizeof(RpcUdpXact),
1007                                                                                        RTEMS_DEFAULT_ATTRIBUTES,
1008                                                                                        &msgQ);
1009                        assert( status == RTEMS_SUCCESSFUL );
1010                        status = rtems_task_start( rpciod, rpcio_daemon, 0 );
1011                        assert( status == RTEMS_SUCCESSFUL );
1012
1013                } else {
1014                        return -1;
1015                }
1016        }
1017        return 0;
1018}
1019
1020int
1021rpcUdpCleanup(void)
1022{
1023        rtems_semaphore_create(
1024                        rtems_build_name('R','P','C','f'),
1025                        0,
1026                        RTEMS_DEFAULT_ATTRIBUTES,
1027                        0,
1028                        &fini);
1029        rtems_event_send(rpciod, RPCIOD_KILL_EVENT);
1030        /* synchronize with daemon */
1031        rtems_semaphore_obtain(fini, RTEMS_WAIT, 5*ticksPerSec);
1032        /* if the message queue is still there, something went wrong */
1033        if (!msgQ) {
1034                rtems_task_delete(rpciod);
1035        }
1036        rtems_semaphore_delete(fini);
1037        return (msgQ !=0);
1038}
1039
1040/* Another API - simpler but less efficient.
1041 * For each RPCall, a server and a Xact
1042 * are created and destroyed on the fly.
1043 *
1044 * This should be used for infrequent calls
1045 * (e.g. a NFS mount request).
1046 *
1047 * This is roughly compatible with the original
1048 * clnt_call() etc. API - but it uses our
1049 * daemon and is fully reentrant.
1050 */
1051enum clnt_stat
1052rpcUdpClntCreate(
1053                struct sockaddr_in      *psaddr,
1054                rpcprog_t               prog,
1055                rpcvers_t               vers,
1056                u_long                  uid,
1057                u_long                  gid,
1058                RpcUdpClnt              *pclnt
1059)
1060{
1061RpcUdpXact              x;
1062RpcUdpServer    s;
1063enum clnt_stat  err;
1064
1065        if ( RPC_SUCCESS != (err=rpcUdpServerCreate(psaddr, prog, vers, uid, gid, &s)) )
1066                return err;
1067
1068        if ( !(x=rpcUdpXactCreate(prog, vers, UDPMSGSIZE)) ) {
1069                rpcUdpServerDestroy(s);
1070                return RPC_FAILED;
1071        }
1072        /* TODO: could maintain a server cache */
1073
1074        x->server = s;
1075
1076        *pclnt = x;
1077
1078        return RPC_SUCCESS;
1079}
1080
1081static void
1082rpcUdpClntDestroy(RpcUdpClnt xact)
1083{
1084        rpcUdpServerDestroy(xact->server);
1085        rpcUdpXactDestroy(xact);
1086}
1087
1088enum clnt_stat
1089rpcUdpClntCall(
1090        RpcUdpClnt              xact,
1091        u_long                  proc,
1092        XdrProcT                xargs,
1093        CaddrT                  pargs,
1094        XdrProcT                xres,
1095        CaddrT                  pres,
1096        struct timeval  *timeout
1097        )
1098{
1099enum clnt_stat  stat;
1100
1101                if ( (stat = rpcUdpSend(xact, xact->server, timeout, proc,
1102                                                                xres, pres,
1103                                                                xargs, pargs,
1104                                                                0)) ) {
1105                        fprintf(stderr,"RPCIO Send failed: %i\n",stat);
1106                        return stat;
1107                }
1108                return rpcUdpRcv(xact);
1109}
1110
1111/* a yet simpler interface */
1112enum clnt_stat
1113rpcUdpCallRp(
1114        struct sockaddr_in      *psrvr,
1115        u_long                          prog,
1116        u_long                          vers,
1117        u_long                          proc,
1118        XdrProcT                        xargs,
1119        CaddrT                          pargs,
1120        XdrProcT                        xres,
1121        CaddrT                          pres,
1122        u_long                          uid,            /* RPCIO_DEFAULT_ID picks default */
1123        u_long                          gid,            /* RPCIO_DEFAULT_ID picks default */
1124        struct timeval          *timeout        /* NULL picks default           */
1125)
1126{
1127RpcUdpClnt                      clp;
1128enum clnt_stat          stat;
1129
1130        stat = rpcUdpClntCreate(
1131                                psrvr,
1132                                prog,
1133                                vers,
1134                                uid,
1135                                gid,
1136                                &clp);
1137
1138        if ( RPC_SUCCESS != stat )
1139                return stat;
1140
1141        stat = rpcUdpClntCall(
1142                                clp,
1143                                proc,
1144                                xargs, pargs,
1145                                xres,  pres,
1146                                timeout);
1147
1148        rpcUdpClntDestroy(clp);
1149
1150        return stat;
1151}
1152
1153/* linked list primitives */
1154static void
1155nodeXtract(ListNode n)
1156{
1157        if (n->prev)
1158                n->prev->next = n->next;
1159        if (n->next)
1160                n->next->prev = n->prev;
1161        n->next = n->prev = 0;
1162}
1163
1164static void
1165nodeAppend(ListNode l, ListNode n)
1166{
1167        if ( (n->next = l->next) )
1168                n->next->prev = n;
1169        l->next = n;
1170        n->prev = l;
1171
1172}
1173
1174/* this code does the work */
1175static void
1176rpcio_daemon(rtems_task_argument arg)
1177{
1178rtems_status_code stat;
1179RpcUdpXact        xact;
1180RpcUdpServer      srv;
1181rtems_interval    next_retrans, then, unow;
1182long                                            now;    /* need to do signed comparison with age! */
1183rtems_event_set   events;
1184ListNode          newList;
1185size_t            size;
1186rtems_id          q          =  0;
1187ListNodeRec       listHead   = {0, 0};
1188unsigned long     epoch      = RPCIOD_EPOCH_SECS * ticksPerSec;
1189unsigned long                   max_period = RPCIOD_RETX_CAP_S * ticksPerSec;
1190rtems_status_code       status;
1191
1192
1193        then = rtems_clock_get_ticks_since_boot();
1194
1195        for (next_retrans = epoch;;) {
1196
1197                if ( RTEMS_SUCCESSFUL !=
1198                         (stat = rtems_event_receive(
1199                                                RPCIOD_RX_EVENT | RPCIOD_TX_EVENT | RPCIOD_KILL_EVENT,
1200                                                RTEMS_WAIT | RTEMS_EVENT_ANY,
1201                                                next_retrans,
1202                                                &events)) ) {
1203                        ASSERT( RTEMS_TIMEOUT == stat );
1204                        events = 0;
1205                }
1206
1207                if (events & RPCIOD_KILL_EVENT) {
1208                        int i;
1209
1210#if (DEBUG) & DEBUG_EVENTS
1211                        fprintf(stderr,"RPCIO: got KILL event\n");
1212#endif
1213
1214                        MU_LOCK(hlock);
1215                        for (i=XACT_HASHS-1; i>=0; i--) {
1216                                if (xactHashTbl[i]) {
1217                                        break;
1218                                }
1219                        }
1220                        if (i<0) {
1221                                /* prevent them from creating and enqueueing more messages */
1222                                q=msgQ;
1223                                /* messages queued after we executed this assignment will fail */
1224                                msgQ=0;
1225                        }
1226                        MU_UNLOCK(hlock);
1227                        if (i>=0) {
1228                                fprintf(stderr,"RPCIO There are still transactions circulating; I refuse to go away\n");
1229                                fprintf(stderr,"(1st in slot %i)\n",i);
1230                                rtems_semaphore_release(fini);
1231                        } else {
1232                                break;
1233                        }
1234                }
1235
1236                unow = rtems_clock_get_ticks_since_boot();
1237
1238                /* measure everything relative to then to protect against
1239                 * rollover
1240                 */
1241                now = unow - then;
1242
1243                /* NOTE: we don't lock the hash table while we are operating
1244                 * on transactions; the paradigm is that we 'own' a particular
1245                 * transaction (and hence it's hash table slot) from the
1246                 * time the xact was put into the message queue until we
1247                 * wake up the requestor.
1248                 */
1249
1250                if (RPCIOD_RX_EVENT & events) {
1251
1252#if (DEBUG) & DEBUG_EVENTS
1253                        fprintf(stderr,"RPCIO: got RX event\n");
1254#endif
1255
1256                        while ((xact=sockRcv())) {
1257
1258                                /* extract from the retransmission list */
1259                                nodeXtract(&xact->node);
1260
1261                                /* change the ID - there might already be
1262                                 * a retransmission on the way. When it's
1263                                 * reply arrives we must not find it's ID
1264                                 * in the hashtable
1265                                 */
1266                                xact->obuf.xid        += XACT_HASHS;
1267
1268                                xact->status.re_status = RPC_SUCCESS;
1269
1270                                /* calculate roundtrip ticks */
1271                                xact->trip             = now - xact->trip;
1272
1273                                srv                    = xact->server;
1274
1275                                /* adjust the server's retry period */
1276                                {
1277                                        register TimeoutT rtry = srv->retry_period;
1278                                        register TimeoutT trip = xact->trip;
1279
1280                                        ASSERT( trip >= 0 );
1281
1282                                        if ( 0==trip )
1283                                                trip = 1;
1284
1285                                        /* retry_new = 0.75*retry_old + 0.25 * 8 * roundrip */
1286                                        rtry   = (3*rtry + (trip << 3)) >> 2;
1287
1288                                        if ( rtry > max_period )
1289                                                rtry = max_period;
1290
1291                                        srv->retry_period = rtry;
1292                                }
1293
1294                                /* wakeup requestor */
1295                                rtems_event_send(xact->requestor, RTEMS_RPC_EVENT);
1296                        }
1297                }
1298
1299                if (RPCIOD_TX_EVENT & events) {
1300
1301#if (DEBUG) & DEBUG_EVENTS
1302                        fprintf(stderr,"RPCIO: got TX event\n");
1303#endif
1304
1305                        while (RTEMS_SUCCESSFUL == rtems_message_queue_receive(
1306                                                                                        msgQ,
1307                                                                                        &xact,
1308                                                                                        &size,
1309                                                                                        RTEMS_NO_WAIT,
1310                                                                                        RTEMS_NO_TIMEOUT)) {
1311                                /* put to the head of timeout q */
1312                                nodeAppend(&listHead, &xact->node);
1313
1314                                xact->age  = now;
1315                                xact->trip = FIRST_ATTEMPT;
1316                        }
1317                }
1318
1319
1320                /* work the timeout q */
1321                newList = 0;
1322                for ( xact=(RpcUdpXact)listHead.next;
1323                          xact && xact->age <= now;
1324                          xact=(RpcUdpXact)listHead.next ) {
1325
1326                                /* extract from the list */
1327                                nodeXtract(&xact->node);
1328
1329                                srv = xact->server;
1330
1331                                if (xact->tolive < 0) {
1332                                        /* this one timed out */
1333                                        xact->status.re_errno  = ETIMEDOUT;
1334                                        xact->status.re_status = RPC_TIMEDOUT;
1335
1336                                        srv->timeouts++;
1337
1338                                        /* Change the ID - there might still be
1339                                         * a reply on the way. When it arrives we
1340                                         * must not find it's ID in the hash table
1341                                         *
1342                                         * Thanks to Steven Johnson for hunting this
1343                                         * one down.
1344                                         */
1345                                        xact->obuf.xid        += XACT_HASHS;
1346
1347#if (DEBUG) & DEBUG_TIMEOUT
1348                                        fprintf(stderr,"RPCIO XACT timed out; waking up requestor\n");
1349#endif
1350                                        if ( rtems_event_send(xact->requestor, RTEMS_RPC_EVENT) ) {
1351                                                rtems_panic("RPCIO PANIC: requestor id was 0x%08x",
1352                                                                        xact->requestor);
1353                                        }
1354
1355                                } else {
1356                                        int len;
1357
1358                                        len = (int)XDR_GETPOS(&xact->xdrs);
1359
1360#ifdef MBUF_TX
1361                                        xact->refcnt = 1;       /* sendto itself */
1362#endif
1363                                        if ( len != SENDTO( ourSock,
1364                                                                                xact->obuf.buf,
1365                                                                                len,
1366                                                                                0,
1367                                                                                &srv->addr.sa,
1368                                                                                sizeof(srv->addr.sin)
1369#ifdef MBUF_TX
1370                                                                                , xact,
1371                                                                                paranoia_free,
1372                                                                                paranoia_ref
1373#endif
1374                                                                                ) ) {
1375
1376                                                xact->status.re_errno  = errno;
1377                                                xact->status.re_status = RPC_CANTSEND;
1378                                                srv->errors++;
1379
1380                                                /* wakeup requestor */
1381                                                fprintf(stderr,"RPCIO: SEND failure\n");
1382                                                status = rtems_event_send(xact->requestor, RTEMS_RPC_EVENT);
1383                                                assert( status == RTEMS_SUCCESSFUL );
1384
1385                                        } else {
1386                                                /* send successful; calculate retransmission time
1387                                                 * and enqueue to temporary list
1388                                                 */
1389                                                if (FIRST_ATTEMPT != xact->trip) {
1390#if (DEBUG) & DEBUG_TIMEOUT
1391                                                        fprintf(stderr,
1392                                                                "timed out; tolive is %i (ticks), retry period is %i (ticks)\n",
1393                                                                        xact->tolive,
1394                                                                        srv->retry_period);
1395#endif
1396                                                        /* this is a real retry; we backup
1397                                                         * the server's retry interval
1398                                                         */
1399                                                        if ( srv->retry_period < max_period ) {
1400
1401                                                                /* If multiple transactions for this server
1402                                                                 * fail (e.g. because it died) this will
1403                                                                 * back-off very agressively (doubling
1404                                                                 * the retransmission period for every
1405                                                                 * timed out transaction up to the CAP limit)
1406                                                                 * which is desirable - single packet failure
1407                                                                 * is treated more gracefully by this algorithm.
1408                                                                 */
1409
1410                                                                srv->retry_period<<=1;
1411#if (DEBUG) & DEBUG_TIMEOUT
1412                                                                fprintf(stderr,
1413                                                                                "adjusted to; retry period %i\n",
1414                                                                                srv->retry_period);
1415#endif
1416                                                        } else {
1417                                                                /* never wait longer than RPCIOD_RETX_CAP_S seconds */
1418                                                                fprintf(stderr,
1419                                                                                "RPCIO: server '%s' not responding - still trying\n",
1420                                                                                srv->name);
1421                                                        }
1422                                                        if ( 0 == ++srv->retrans % 1000) {
1423                                                                fprintf(stderr,
1424                                                                                "RPCIO - statistics: already %li retries to server %s\n",
1425                                                                                srv->retrans,
1426                                                                                srv->name);
1427                                                        }
1428                                                } else {
1429                                                        srv->requests++;
1430                                                }
1431                                                xact->trip      = now;
1432                                                {
1433                                                long capped_period = srv->retry_period;
1434                                                        if ( xact->lifetime < capped_period )
1435                                                                capped_period = xact->lifetime;
1436                                                xact->age       = now + capped_period;
1437                                                xact->tolive   -= capped_period;
1438                                                }
1439                                                /* enqueue to the list of newly sent transactions */
1440                                                xact->node.next = newList;
1441                                                newList         = &xact->node;
1442#if (DEBUG) & DEBUG_TIMEOUT
1443                                                fprintf(stderr,
1444                                                                "XACT (0x%08x) age is 0x%x, now: 0x%x\n",
1445                                                                xact,
1446                                                                xact->age,
1447                                                                now);
1448#endif
1449                                        }
1450                                }
1451            }
1452
1453                /* insert the newly sent transactions into the
1454                 * sorted retransmission list
1455                 */
1456                for (; (xact = (RpcUdpXact)newList); ) {
1457                        register ListNode p,n;
1458                        newList = newList->next;
1459                        for ( p=&listHead; (n=p->next) && xact->age > ((RpcUdpXact)n)->age; p=n )
1460                                /* nothing else to do */;
1461                        nodeAppend(p, &xact->node);
1462                }
1463
1464                if (now > epoch) {
1465                        /* every now and then, readjust the epoch */
1466                        register ListNode n;
1467                        then += now;
1468                        for (n=listHead.next; n; n=n->next) {
1469                                /* readjust outstanding time intervals subject to the
1470                                 * condition that the 'absolute' time must remain
1471                                 * the same. 'age' and 'trip' are measured with
1472                                 * respect to 'then' - hence:
1473                                 *
1474                                 * abs_age == old_age + old_then == new_age + new_then
1475                                 *
1476                                 * ==> new_age = old_age + old_then - new_then == old_age - 'now'
1477                                 */
1478                                ((RpcUdpXact)n)->age  -= now;
1479                                ((RpcUdpXact)n)->trip -= now;
1480#if (DEBUG) & DEBUG_TIMEOUT
1481                                fprintf(stderr,
1482                                                "readjusted XACT (0x%08x); age is 0x%x, trip: 0x%x now: 0x%x\n",
1483                                                (RpcUdpXact)n,
1484                                                ((RpcUdpXact)n)->trip,
1485                                                ((RpcUdpXact)n)->age,
1486                                                now);
1487#endif
1488                        }
1489                        now = 0;
1490                }
1491
1492                next_retrans = listHead.next ?
1493                                                        ((RpcUdpXact)listHead.next)->age - now :
1494                                                        epoch;  /* make sure we don't miss updating the epoch */
1495#if (DEBUG) & DEBUG_TIMEOUT
1496                fprintf(stderr,"RPCIO: next timeout is %x\n",next_retrans);
1497#endif
1498        }
1499        /* close our socket; shut down the receiver */
1500        close(ourSock);
1501
1502#if 0 /* if we get here, no transactions exist, hence there can be none
1503           * in the queue whatsoever
1504           */
1505        /* flush the message queue */
1506        while (RTEMS_SUCCESSFUL == rtems_message_queue_receive(
1507                                                                                q,
1508                                                                                &xact,
1509                                                                                &size,
1510                                                                                RTEMS_NO_WAIT,
1511                                                                                RTEMS_NO_TIMEOUT)) {
1512                        /* TODO enque xact */
1513        }
1514
1515        /* flush all outstanding transactions */
1516
1517        for (xact=((RpcUdpXact)listHead.next); xact; xact=((RpcUdpXact)xact->node.next)) {
1518                        xact->status.re_status = RPC_TIMEDOUT;
1519                        rtems_event_send(xact->requestor, RTEMS_RPC_EVENT);
1520        }
1521#endif
1522
1523        rtems_message_queue_delete(q);
1524
1525        MU_DESTROY(hlock);
1526
1527        fprintf(stderr,"RPC daemon exited...\n");
1528
1529        rtems_semaphore_release(fini);
1530        rtems_task_suspend(RTEMS_SELF);
1531}
1532
1533
1534/* support for transaction 'pools'. A number of XACT objects
1535 * is always kept around. The initial number is 0 but it
1536 * is allowed to grow up to a maximum.
1537 * If the need grows beyond the maximum, behavior depends:
1538 * Users can either block until a transaction becomes available,
1539 * they can create a new XACT on the fly or get an error
1540 * if no free XACT is available from the pool.
1541 */
1542
1543RpcUdpXactPool
1544rpcUdpXactPoolCreate(
1545        rpcprog_t prog,                 rpcvers_t version,
1546        int xactsize,   int poolsize)
1547{
1548RpcUdpXactPool  rval = MY_MALLOC(sizeof(*rval));
1549rtems_status_code       status;
1550
1551        ASSERT( rval );
1552        status = rtems_message_queue_create(
1553                                        rtems_build_name('R','P','C','p'),
1554                                        poolsize,
1555                                        sizeof(RpcUdpXact),
1556                                        RTEMS_DEFAULT_ATTRIBUTES,
1557                                        &rval->box);
1558        assert( status == RTEMS_SUCCESSFUL );
1559
1560        rval->prog     = prog;
1561        rval->version  = version;
1562        rval->xactSize = xactsize;
1563        return rval;
1564}
1565
1566void
1567rpcUdpXactPoolDestroy(RpcUdpXactPool pool)
1568{
1569RpcUdpXact xact;
1570
1571        while ((xact = rpcUdpXactPoolGet(pool, XactGetFail))) {
1572                rpcUdpXactDestroy(xact);
1573        }
1574        rtems_message_queue_delete(pool->box);
1575        MY_FREE(pool);
1576}
1577
1578RpcUdpXact
1579rpcUdpXactPoolGet(RpcUdpXactPool pool, XactPoolGetMode mode)
1580{
1581RpcUdpXact       xact = 0;
1582size_t           size;
1583
1584        if (RTEMS_SUCCESSFUL != rtems_message_queue_receive(
1585                                                                pool->box,
1586                                                                &xact,
1587                                                                &size,
1588                                                                XactGetWait == mode ?
1589                                                                        RTEMS_WAIT : RTEMS_NO_WAIT,
1590                                                                RTEMS_NO_TIMEOUT)) {
1591
1592                /* nothing found in box; should we create a new one ? */
1593
1594                xact = (XactGetCreate == mode) ?
1595                                        rpcUdpXactCreate(
1596                                                        pool->prog,
1597                                                        pool->version,
1598                                                        pool->xactSize) : 0 ;
1599                if (xact)
1600                                xact->pool = pool;
1601
1602        }
1603        return xact;
1604}
1605
1606void
1607rpcUdpXactPoolPut(RpcUdpXact xact)
1608{
1609RpcUdpXactPool pool;
1610
1611        pool = xact->pool;
1612        ASSERT( pool );
1613
1614        if (RTEMS_SUCCESSFUL != rtems_message_queue_send(
1615                                                                pool->box,
1616                                                                &xact,
1617                                                                sizeof(xact)))
1618                rpcUdpXactDestroy(xact);
1619}
1620
1621#ifdef MBUF_RX
1622
1623/* WORKAROUND: include sys/mbuf.h (or other bsdnet headers) only
1624 *             _after_ using malloc()/free() & friends because
1625 *             the RTEMS/BSDNET headers redefine those :-(
1626 */
1627
1628#define _KERNEL
1629#include <sys/mbuf.h>
1630
1631ssize_t
1632recv_mbuf_from(int s, struct mbuf **ppm, long len, struct sockaddr *fromaddr, int *fromlen);
1633
1634static void
1635bufFree(struct mbuf **m)
1636{
1637        if (*m) {
1638                rtems_bsdnet_semaphore_obtain();
1639                m_freem(*m);
1640                rtems_bsdnet_semaphore_release();
1641                *m = 0;
1642        }
1643}
1644#endif
1645
1646#ifdef MBUF_TX
1647static void
1648paranoia_free(caddr_t closure, u_int size)
1649{
1650#if (DEBUG)
1651RpcUdpXact xact = (RpcUdpXact)closure;
1652int        len  = (int)XDR_GETPOS(&xact->xdrs);
1653
1654        ASSERT( --xact->refcnt >= 0 && size == len );
1655#endif
1656}
1657
1658static void
1659paranoia_ref (caddr_t closure, u_int size)
1660{
1661#if (DEBUG)
1662RpcUdpXact xact = (RpcUdpXact)closure;
1663int        len  = (int)XDR_GETPOS(&xact->xdrs);
1664        ASSERT( size == len );
1665        xact->refcnt++;
1666#endif
1667}
1668#endif
1669
1670/* receive from a socket and find
1671 * the transaction corresponding to the
1672 * transaction ID received in the server
1673 * reply.
1674 *
1675 * The semantics of the 'pibuf' pointer are
1676 * as follows:
1677 *
1678 * MBUF_RX:
1679 *
1680 */
1681
1682#define RPCIOD_RXBUFSZ  UDPMSGSIZE
1683
1684static RpcUdpXact
1685sockRcv(void)
1686{
1687int                                     len,i;
1688uint32_t                                xid;
1689union {
1690        struct sockaddr_in      sin;
1691        struct sockaddr     sa;
1692}                                       fromAddr;
1693int                                     fromLen  = sizeof(fromAddr.sin);
1694RxBuf                           ibuf     = 0;
1695RpcUdpXact                      xact     = 0;
1696
1697        do {
1698
1699        /* rcv_mbuf() and recvfrom() differ in that the
1700         * former allocates buffers and passes them back
1701         * to us whereas the latter requires us to provide
1702         * buffer space.
1703         * Hence, in the first case whe have to make sure
1704         * no old buffer is leaked - in the second case,
1705         * we might well re-use an old buffer but must
1706         * make sure we have one allocated
1707         */
1708#ifdef MBUF_RX
1709        if (ibuf)
1710                bufFree(&ibuf);
1711
1712        len  = recv_mbuf_from(
1713                                        ourSock,
1714                                        &ibuf,
1715                                        RPCIOD_RXBUFSZ,
1716                                    &fromAddr.sa,
1717                                    &fromLen);
1718#else
1719        if ( !ibuf )
1720                ibuf = (RpcBuf)MY_MALLOC(RPCIOD_RXBUFSZ);
1721        if ( !ibuf )
1722                goto cleanup; /* no memory - drop this message */
1723
1724        len  = recvfrom(ourSock,
1725                                    ibuf->buf,
1726                                    RPCIOD_RXBUFSZ,
1727                                    0,
1728                                    &fromAddr.sa,
1729                                        &fromLen);
1730#endif
1731
1732        if (len <= 0) {
1733                if (EAGAIN != errno)
1734                        fprintf(stderr,"RECV failed: %s\n",strerror(errno));
1735                goto cleanup;
1736        }
1737
1738#if (DEBUG) & DEBUG_PACKLOSS
1739        if ( (unsigned)rand() < DEBUG_PACKLOSS_FRACT ) {
1740                /* lose packets once in a while */
1741                static int xxx = 0;
1742                if ( ++xxx % 16 == 0 )
1743                        fprintf(stderr,"DEBUG: dropped %i packets, so far...\n",xxx);
1744                if ( ibuf )
1745                        bufFree( &ibuf );
1746                continue;
1747        }
1748#endif
1749
1750        i = (xid=XID(ibuf)) & XACT_HASH_MSK;
1751
1752        if ( !(xact=xactHashTbl[i])                                             ||
1753                   xact->obuf.xid                     != xid                        ||
1754#ifdef REJECT_SERVERIP_MISMATCH
1755                   xact->server->addr.sin.sin_addr.s_addr != fromAddr.sin.sin_addr.s_addr       ||
1756#endif
1757                   xact->server->addr.sin.sin_port        != fromAddr.sin.sin_port ) {
1758
1759                if (xact) {
1760                        if (
1761#ifdef REJECT_SERVERIP_MISMATCH
1762                            xact->server->addr.sin.sin_addr.s_addr == fromAddr.sin.sin_addr.s_addr &&
1763#endif
1764                        xact->server->addr.sin.sin_port        == fromAddr.sin.sin_port        &&
1765                            ( xact->obuf.xid                   == xid + XACT_HASHS   ||
1766                                  xact->obuf.xid                   == xid + 2*XACT_HASHS    )
1767                                ) {
1768#ifndef DEBUG /* don't complain if it's just a late arrival of a retry */
1769                        fprintf(stderr,"RPCIO - FYI sockRcv(): dropping late/redundant retry answer\n");
1770#endif
1771                        } else {
1772                        fprintf(stderr,"RPCIO WARNING sockRcv(): transaction mismatch\n");
1773                        fprintf(stderr,"xact: xid  0x%08" PRIx32 "  -- got 0x%08" PRIx32 "\n",
1774                                                        xact->obuf.xid, xid);
1775                        fprintf(stderr,"xact: addr 0x%08" PRIx32 "  -- got 0x%08" PRIx32 "\n",
1776                                                        xact->server->addr.sin.sin_addr.s_addr,
1777                                                        fromAddr.sin.sin_addr.s_addr);
1778                        fprintf(stderr,"xact: port 0x%08x  -- got 0x%08x\n",
1779                                                        xact->server->addr.sin.sin_port,
1780                                                        fromAddr.sin.sin_port);
1781                        }
1782                } else {
1783                        fprintf(stderr,
1784                                        "RPCIO WARNING sockRcv(): got xid 0x%08" PRIx32 " but its slot is empty\n",
1785                                        xid);
1786                }
1787                /* forget about this one and try again */
1788                xact = 0;
1789        }
1790
1791        } while ( !xact );
1792
1793        xact->ibuf     = ibuf;
1794#ifndef MBUF_RX
1795        xact->ibufsize = RPCIOD_RXBUFSZ;
1796#endif
1797
1798        return xact;
1799
1800cleanup:
1801
1802        bufFree(&ibuf);
1803
1804        return 0;
1805}
1806
1807
1808#include <rtems/rtems_bsdnet_internal.h>
1809/* double check the event configuration; should probably globally
1810 * manage system events!!
1811 * We do this at the end of the file for the same reason we had
1812 * included mbuf.h only a couple of lines above - see comment up
1813 * there...
1814 */
1815#if RTEMS_RPC_EVENT & SOSLEEP_EVENT & SBWAIT_EVENT & NETISR_EVENTS
1816#error ILLEGAL EVENT CONFIGURATION
1817#endif
Note: See TracBrowser for help on using the repository browser.