source: rtems/cpukit/libfs/src/nfsclient/src/rpcio.c @ 4a3db517

4.115
Last change on this file since 4a3db517 was e4d8513, checked in by Daniel Cederman <cederman@…>, on 11/14/14 at 07:58:00

nfs: Add RPCd task affinity config option

Similar to the task priority option, the new CPU affinity
option is first controlled by the RPCI specific rpciodCpuset
option. If that is not set, it uses the global network task config.
If that is also not set, it falls back to not setting the affinity
at all, using all CPUs.

  • Property mode set to 100644
File size: 44.9 KB
Line 
1/**
2 * @file
3 *
4 * @brief RPC Multiplexor for a Multitasking Environment
5 * @ingroup libfs
6 *
7 * This code funnels arbitrary task's UDP/RPC requests
8 * through one socket to arbitrary servers.
9 * The replies are gathered and dispatched to the
10 * requestors.
11 * One task handles all the sending and receiving
12 * work including retries.
13 * It is up to the requestor, however, to do
14 * the XDR encoding of the arguments / decoding
15 * of the results (except for the RPC header which
16 * is handled by the daemon).
17 */
18
19/*
20 * Author: Till Straumann <strauman@slac.stanford.edu>, 2002
21 *
22 * Authorship
23 * ----------
24 * This software (NFS-2 client implementation for RTEMS) was created by
25 *     Till Straumann <strauman@slac.stanford.edu>, 2002-2007,
26 *         Stanford Linear Accelerator Center, Stanford University.
27 *
28 * Acknowledgement of sponsorship
29 * ------------------------------
30 * The NFS-2 client implementation for RTEMS was produced by
31 *     the Stanford Linear Accelerator Center, Stanford University,
32 *         under Contract DE-AC03-76SFO0515 with the Department of Energy.
33 *
34 * Government disclaimer of liability
35 * ----------------------------------
36 * Neither the United States nor the United States Department of Energy,
37 * nor any of their employees, makes any warranty, express or implied, or
38 * assumes any legal liability or responsibility for the accuracy,
39 * completeness, or usefulness of any data, apparatus, product, or process
40 * disclosed, or represents that its use would not infringe privately owned
41 * rights.
42 *
43 * Stanford disclaimer of liability
44 * --------------------------------
45 * Stanford University makes no representations or warranties, express or
46 * implied, nor assumes any liability for the use of this software.
47 *
48 * Stanford disclaimer of copyright
49 * --------------------------------
50 * Stanford University, owner of the copyright, hereby disclaims its
51 * copyright and all other rights in this software.  Hence, anyone may
52 * freely use it for any purpose without restriction.
53 *
54 * Maintenance of notices
55 * ----------------------
56 * In the interest of clarity regarding the origin and status of this
57 * SLAC software, this and all the preceding Stanford University notices
58 * are to remain affixed to any copy or derivative of this software made
59 * or distributed by the recipient and are to be affixed to any copy of
60 * software made or distributed by the recipient that contains a copy or
61 * derivative of this software.
62 *
63 * ------------------ SLAC Software Notices, Set 4 OTT.002a, 2004 FEB 03
64 */
65
66#if HAVE_CONFIG_H
67#include "config.h"
68#endif
69
70#include <inttypes.h>
71
72#include <rtems.h>
73#include <rtems/error.h>
74#include <rtems/rtems_bsdnet.h>
75#include <stdlib.h>
76#include <time.h>
77#include <rpc/rpc.h>
78#include <rpc/pmap_prot.h>
79#include <errno.h>
80#include <sys/ioctl.h>
81#include <assert.h>
82#include <stdio.h>
83#include <errno.h>
84#include <string.h>
85#include <netinet/in.h>
86#include <arpa/inet.h>
87#include <sys/cpuset.h>
88
89#include "rpcio.h"
90
91/****************************************************************/
92/* CONFIGURABLE PARAMETERS                                      */
93/****************************************************************/
94
95#define MBUF_RX                 /* If defined: use mbuf XDR stream for
96                                                 *  decoding directly out of mbufs
97                                                 *  Otherwise, the regular 'recvfrom()'
98                                                 *  interface will be used involving an
99                                                 *  extra buffer allocation + copy step.
100                                                 */
101
102#define MBUF_TX                 /* If defined: avoid copying data when
103                                                 *  sending. Instead, use a wrapper to
104                                                 *  'sosend()' which will point an MBUF
105                                                 *  directly to our buffer space.
106                                                 *  Note that the BSD stack does not copy
107                                                 *  data when fragmenting packets - it
108                                                 *  merely uses an mbuf chain pointing
109                                                 *  into different areas of the data.
110                                                 *
111                                                 * If undefined, the regular 'sendto()'
112                                                 *  interface is used.
113                                                 */
114
115#undef REJECT_SERVERIP_MISMATCH
116                                                /* If defined, RPC replies must come from the server
117                                                 * that was queried. Eric Norum has reported problems
118                                                 * with clustered NFS servers. So we disable this
119                                                 * reducing paranoia...
120                                                 */
121
122/* daemon task parameters */
123#define RPCIOD_STACK            10000
124#define RPCIOD_PRIO                     100     /* *fallback* priority */
125
126/* depth of the message queue for sending
127 * RPC requests to the daemon
128 */
129#define RPCIOD_QDEPTH           20
130
131/* Maximum retry limit for retransmission */
132#define RPCIOD_RETX_CAP_S       3 /* seconds */
133
134/* Default timeout for RPC calls */
135#define RPCIOD_DEFAULT_TIMEOUT  (&_rpc_default_timeout)
136static struct timeval _rpc_default_timeout = { 10 /* secs */, 0 /* usecs */ };
137
138/* how many times should we try to resend a failed
139 * transaction with refreshed AUTHs
140 */
141#define RPCIOD_REFRESH          2
142
143/* Events we are using; the RPC_EVENT
144 * MUST NOT be used by any application
145 * thread doing RPC IO (e.g. NFS)
146 */
147#define RTEMS_RPC_EVENT         RTEMS_EVENT_30  /* THE event used by RPCIO. Every task doing
148                                                                                         * RPC IO will receive this - hence it is
149                                                                                         * RESERVED
150                                                                                         */
151#define RPCIOD_RX_EVENT         RTEMS_EVENT_1   /* Events the RPCIOD is using/waiting for */
152#define RPCIOD_TX_EVENT         RTEMS_EVENT_2
153#define RPCIOD_KILL_EVENT       RTEMS_EVENT_3   /* send to the daemon to kill it          */
154
155#define LD_XACT_HASH            8                               /* ld of the size of the transaction hash table  */
156
157
158/* Debugging Flags                                              */
159
160/* NOTE: defining DEBUG 0 leaves some 'assert()' paranoia checks
161 *       but produces no output
162 */
163
164#define DEBUG_TRACE_XACT        (1<<0)
165#define DEBUG_EVENTS            (1<<1)
166#define DEBUG_MALLOC            (1<<2)
167#define DEBUG_TIMEOUT           (1<<3)
168#define DEBUG_PACKLOSS          (1<<4)  /* This introduces random, artificial packet losses to test retransmission */
169
170#define DEBUG_PACKLOSS_FRACT (0xffffffff/10)
171
172/* USE PARENTHESIS WHEN 'or'ing MULTIPLE FLAGS: (DEBUG_XX | DEBUG_YY) */
173#define DEBUG                           (0)
174
175/****************************************************************/
176/* END OF CONFIGURABLE SECTION                                  */
177/****************************************************************/
178
179/* prevent rollover of our timers by readjusting the epoch on the fly */
180#if     (DEBUG) & DEBUG_TIMEOUT
181#define RPCIOD_EPOCH_SECS       10
182#else
183#define RPCIOD_EPOCH_SECS       10000
184#endif
185
186#ifdef  DEBUG
187#define ASSERT(arg)                     assert(arg)
188#else
189#define ASSERT(arg)                     if (arg)
190#endif
191
192/****************************************************************/
193/* MACROS                                                       */
194/****************************************************************/
195
196
197#define XACT_HASHS              (1<<(LD_XACT_HASH))     /* the hash table size derived from the ld       */
198#define XACT_HASH_MSK   ((XACT_HASHS)-1)        /* mask to extract the hash index from a RPC-XID */
199
200
201#define MU_LOCK(mutex)          do {                                                    \
202                                                        assert(                                                 \
203                                                                RTEMS_SUCCESSFUL ==                     \
204                                                                rtems_semaphore_obtain(         \
205                                                                                (mutex),                        \
206                                                                                RTEMS_WAIT,                     \
207                                                                                RTEMS_NO_TIMEOUT        \
208                                                                                ) );                            \
209                                                        } while(0)
210
211#define MU_UNLOCK(mutex)        do {                                                    \
212                                                        assert(                                                 \
213                                                                RTEMS_SUCCESSFUL ==                     \
214                                                                rtems_semaphore_release(        \
215                                                                                (mutex)                         \
216                                                                                ) );                            \
217                                                        } while(0)
218
219#define MU_CREAT(pmutex)        do {                                                    \
220                                                        assert(                                                 \
221                                                                RTEMS_SUCCESSFUL ==                     \
222                                                                rtems_semaphore_create(         \
223                                                                                rtems_build_name(       \
224                                                                                        'R','P','C','l' \
225                                                                                        ),                              \
226                                                                                1,                                      \
227                                                                                MUTEX_ATTRIBUTES,       \
228                                                                                0,                                      \
229                                                                                (pmutex)) );            \
230                                                        } while (0)
231
232
233#define MU_DESTROY(mutex)       do {                                                    \
234                                                        assert(                                                 \
235                                                                RTEMS_SUCCESSFUL ==                     \
236                                                                rtems_semaphore_delete(         \
237                                                                                mutex                           \
238                                                                                ) );                            \
239                                                        } while (0)
240
241#define MUTEX_ATTRIBUTES        (RTEMS_LOCAL           |                \
242                                                        RTEMS_PRIORITY         |                \
243                                                        RTEMS_INHERIT_PRIORITY |                \
244                                                        RTEMS_BINARY_SEMAPHORE)
245
246#define FIRST_ATTEMPT           0x88888888 /* some time that is never reached */
247
248/****************************************************************/
249/* TYPE DEFINITIONS                                             */
250/****************************************************************/
251
252typedef rtems_interval          TimeoutT;
253
254/* 100000th implementation of a doubly linked list;
255 * since only one thread is looking at these,
256 * we need no locking
257 */
258typedef struct ListNodeRec_ {
259        struct ListNodeRec_ *next, *prev;
260} ListNodeRec, *ListNode;
261
262
263/* Structure representing an RPC server */
264typedef struct RpcUdpServerRec_ {
265                RpcUdpServer            next;                   /* linked list of all servers; protected by hlock */
266                union {
267                struct sockaddr_in      sin;
268                struct sockaddr     sa;
269                }                                       addr;
270                AUTH                            *auth;
271                rtems_id                        authlock;               /* must MUTEX the auth object - it's not clear
272                                                                                         *  what is better:
273                                                                                         *   1 having one (MUTEXed) auth per server
274                                                                                         *         who is shared among all transactions
275                                                                                         *         using that server
276                                                                                         *       2 maintaining an AUTH per transaction
277                                                                                         *         (there are then other options: manage
278                                                                                         *         XACT pools on a per-server basis instead
279                                                                                         *         of associating a server with a XACT when
280                                                                                         *   sending)
281                                                                                         * experience will show if the current (1)
282                                                                                         * approach has to be changed.
283                                                                                         */
284                TimeoutT                        retry_period;   /* dynamically adjusted retry period
285                                                                                         * (based on packet roundtrip time)
286                                                                                         */
287                /* STATISTICS */
288                unsigned long           retrans;                /* how many retries were issued by this server         */
289                unsigned long           requests;               /* how many requests have been sent                    */
290                unsigned long       timeouts;           /* how many requests have timed out                    */
291                unsigned long       errors;         /* how many errors have occurred (other than timeouts) */
292                char                            name[20];               /* server's address in IP 'dot' notation               */
293} RpcUdpServerRec;
294
295typedef union  RpcBufU_ {
296                uint32_t                        xid;
297                char                            buf[1];
298} RpcBufU, *RpcBuf;
299
300/* RX Buffer implementation; this is either
301 * an MBUF chain (MBUF_RX configuration)
302 * or a buffer allocated from the heap
303 * where recvfrom copies the (encoded) reply
304 * to. The XDR routines the copy/decode
305 * it into the user's data structures.
306 */
307#ifdef MBUF_RX
308typedef struct mbuf *           RxBuf;  /* an MBUF chain */
309static  void                            bufFree(struct mbuf **m);
310#define XID(ibuf)                       (*(mtod((ibuf), u_long *)))
311extern void                             xdrmbuf_create(XDR *, struct mbuf *, enum xdr_op);
312#else
313typedef RpcBuf                          RxBuf;
314#define bufFree(b)                      do { MY_FREE(*(b)); *(b)=0; } while(0)
315#define XID(ibuf)                       ((ibuf)->xid)
316#endif
317
318/* A RPC 'transaction' consisting
319 * of server and requestor information,
320 * buffer space and an XDR object
321 * (for encoding arguments).
322 */
323typedef struct RpcUdpXactRec_ {
324                ListNodeRec                     node;           /* so we can put XACTs on a list                */
325                RpcUdpServer            server;         /* server this XACT goes to                     */
326                long                            lifetime;       /* during the lifetime, retry attempts are made */
327                long                            tolive;         /* lifetime timer                               */
328                struct rpc_err          status;         /* RPC reply error status                       */
329                long                            age;            /* age info; needed to manage retransmission    */
330                long                            trip;           /* record round trip time in ticks              */
331                rtems_id                        requestor;      /* the task waiting for this XACT to complete   */
332                RpcUdpXactPool          pool;           /* if this XACT belong to a pool, this is it    */
333                XDR                                     xdrs;           /* argument encoder stream                      */
334                int                                     xdrpos;     /* stream position after the (permanent) header */
335                xdrproc_t                       xres;           /* reply decoder proc - TODO needn't be here    */
336                caddr_t                         pres;           /* reply decoded obj  - TODO needn't be here    */
337#ifndef MBUF_RX
338                int                                     ibufsize;       /* size of the ibuf (bytes)                     */
339#endif
340#ifdef  MBUF_TX
341                int                                     refcnt;         /* mbuf external storage reference count        */
342#endif
343                int                                     obufsize;       /* size of the obuf (bytes)                     */
344                RxBuf                           ibuf;           /* pointer to input buffer assigned by daemon   */
345                RpcBufU                         obuf;       /* output buffer (encoded args) APPENDED HERE   */
346} RpcUdpXactRec;
347
348typedef struct RpcUdpXactPoolRec_ {
349        rtems_id        box;
350        int                     prog;
351        int                     version;
352        int                     xactSize;
353} RpcUdpXactPoolRec;
354
355/* a global hash table where all 'living' transaction
356 * objects are registered.
357 * A number of bits in a transaction's XID maps 1:1 to
358 * an index in this table. Hence, the XACT matching
359 * an RPC/UDP reply packet can quickly be found
360 * The size of this table imposes a hard limit on the
361 * number of all created transactions in the system.
362 */
363static RpcUdpXact xactHashTbl[XACT_HASHS]={0};
364static u_long     xidUpper   [XACT_HASHS]={0};
365static unsigned   xidHashSeed            = 0 ;
366
367/* forward declarations */
368static RpcUdpXact
369sockRcv(void);
370
371static void
372rpcio_daemon(rtems_task_argument);
373
374#ifdef MBUF_TX
375ssize_t
376sendto_nocpy (
377                int s,
378                const void *buf, size_t buflen,
379                int flags,
380                const struct sockaddr *toaddr, int tolen,
381                void *closure,
382                void (*freeproc)(caddr_t, u_int),
383                void (*refproc)(caddr_t, u_int)
384);
385static void paranoia_free(caddr_t closure, u_int size);
386static void paranoia_ref (caddr_t closure, u_int size);
387#define SENDTO  sendto_nocpy
388#else
389#define SENDTO  sendto
390#endif
391
392static RpcUdpServer             rpcUdpServers = 0;      /* linked list of all servers; protected by llock */
393
394static int                              ourSock = -1;           /* the socket we are using for communication */
395static rtems_id                 rpciod  = 0;            /* task id of the RPC daemon                 */
396static rtems_id                 msgQ    = 0;            /* message queue where the daemon picks up
397                                                                                         * requests
398                                                                                         */
399#ifndef NDEBUG
400static rtems_id                 llock   = 0;            /* MUTEX protecting the server list */
401static rtems_id                 hlock   = 0;            /* MUTEX protecting the hash table and the list of servers */
402#endif
403static rtems_id                 fini    = 0;            /* a synchronization semaphore we use during
404                                                                                         * module cleanup / driver unloading
405                                                                                         */
406static rtems_interval   ticksPerSec;            /* cached system clock rate (WHO IS ASSUMED NOT
407                                                                                         * TO CHANGE)
408                                                                                         */
409
410rtems_task_priority             rpciodPriority = 0;
411#ifdef RTEMS_SMP
412const cpu_set_t                 *rpciodCpuset = 0;
413size_t                          rpciodCpusetSize = 0;
414#endif
415
416#if (DEBUG) & DEBUG_MALLOC
417/* malloc wrappers for debugging */
418static int nibufs = 0;
419
420static inline void *MY_MALLOC(int s)
421{
422        if (s) {
423                void *rval;
424                MU_LOCK(hlock);
425                assert(nibufs++ < 2000);
426                MU_UNLOCK(hlock);
427                assert((rval = malloc(s)) != 0);
428                return rval;
429        }
430        return 0;
431}
432
433static inline void *MY_CALLOC(int n, int s)
434{
435        if (s) {
436                void *rval;
437                MU_LOCK(hlock);
438                assert(nibufs++ < 2000);
439                MU_UNLOCK(hlock);
440                assert((rval = calloc(n,s)) != 0);
441                return rval;
442        }
443        return 0;
444}
445
446
447static inline void MY_FREE(void *p)
448{
449        if (p) {
450                MU_LOCK(hlock);
451                nibufs--;
452                MU_UNLOCK(hlock);
453                free(p);
454        }
455}
456#else
457#define MY_MALLOC       malloc
458#define MY_CALLOC       calloc
459#define MY_FREE         free
460#endif
461
462static inline bool_t
463locked_marshal(RpcUdpServer s, XDR *xdrs)
464{
465bool_t rval;
466        MU_LOCK(s->authlock);
467        rval = AUTH_MARSHALL(s->auth, xdrs);
468        MU_UNLOCK(s->authlock);
469        return rval;
470}
471
472/* Locked operations on a server's auth object */
473static inline bool_t
474locked_validate(RpcUdpServer s, struct opaque_auth *v)
475{
476bool_t rval;
477        MU_LOCK(s->authlock);
478        rval = AUTH_VALIDATE(s->auth, v);
479        MU_UNLOCK(s->authlock);
480        return rval;
481}
482
483static inline bool_t
484locked_refresh(RpcUdpServer s)
485{
486bool_t rval;
487        MU_LOCK(s->authlock);
488        rval = AUTH_REFRESH(s->auth);
489        MU_UNLOCK(s->authlock);
490        return rval;
491}
492
493/* Create a server object
494 *
495 */
496enum clnt_stat
497rpcUdpServerCreate(
498        struct sockaddr_in      *paddr,
499        rpcprog_t               prog,
500        rpcvers_t               vers,
501        u_long                  uid,
502        u_long                  gid,
503        RpcUdpServer            *psrv
504        )
505{
506RpcUdpServer    rval;
507u_short                 port;
508char                    hname[MAX_MACHINE_NAME + 1];
509int                             theuid, thegid;
510int                             thegids[NGRPS];
511gid_t                   gids[NGROUPS];
512int                             len,i;
513AUTH                    *auth;
514enum clnt_stat  pmap_err;
515struct pmap             pmaparg;
516
517        if ( gethostname(hname, MAX_MACHINE_NAME) ) {
518                fprintf(stderr,
519                                "RPCIO - error: I have no hostname ?? (%s)\n",
520                                strerror(errno));
521                return RPC_UNKNOWNHOST;
522        }
523
524        if ( (len = getgroups(NGROUPS, gids) < 0 ) ) {
525                fprintf(stderr,
526                                "RPCIO - error: I unable to get group ids (%s)\n",
527                                strerror(errno));
528                return RPC_FAILED;
529        }
530
531        if ( len > NGRPS )
532                len = NGRPS;
533
534        for (i=0; i<len; i++)
535                thegids[i] = (int)gids[i];
536
537        theuid = (int) ((RPCIOD_DEFAULT_ID == uid) ? geteuid() : uid);
538        thegid = (int) ((RPCIOD_DEFAULT_ID == gid) ? getegid() : gid);
539
540        if ( !(auth = authunix_create(hname, theuid, thegid, len, thegids)) ) {
541                fprintf(stderr,
542                                "RPCIO - error: unable to create RPC AUTH\n");
543                return RPC_FAILED;
544        }
545
546        /* if they specified no port try to ask the portmapper */
547        if (!paddr->sin_port) {
548
549                paddr->sin_port = htons(PMAPPORT);
550
551        pmaparg.pm_prog = prog;
552        pmaparg.pm_vers = vers;
553        pmaparg.pm_prot = IPPROTO_UDP;
554        pmaparg.pm_port = 0;  /* not needed or used */
555
556
557                /* dont use non-reentrant pmap_getport ! */
558
559                pmap_err = rpcUdpCallRp(
560                                                paddr,
561                                                PMAPPROG,
562                                                PMAPVERS,
563                                                PMAPPROC_GETPORT,
564                                                xdr_pmap,
565                                                &pmaparg,
566                                                xdr_u_short,
567                                                &port,
568                                                uid,
569                                                gid,
570                                                0);
571
572                if ( RPC_SUCCESS != pmap_err ) {
573                        paddr->sin_port = 0;
574                        return pmap_err;
575                }
576
577                paddr->sin_port = htons(port);
578        }
579
580        if (0==paddr->sin_port) {
581                        return RPC_PROGNOTREGISTERED;
582        }
583
584        rval                            = (RpcUdpServer)MY_MALLOC(sizeof(*rval));
585        memset(rval, 0, sizeof(*rval));
586
587        if (!inet_ntop(AF_INET, &paddr->sin_addr, rval->name, sizeof(rval->name)))
588                sprintf(rval->name,"?.?.?.?");
589        rval->addr.sin          = *paddr;
590
591        /* start with a long retransmission interval - it
592         * will be adapted dynamically
593         */
594        rval->retry_period  = RPCIOD_RETX_CAP_S * ticksPerSec;
595
596        rval->auth                      = auth;
597
598        MU_CREAT( &rval->authlock );
599
600        /* link into list */
601        MU_LOCK( llock );
602        rval->next = rpcUdpServers;
603        rpcUdpServers = rval;
604        MU_UNLOCK( llock );
605
606        *psrv                           = rval;
607        return RPC_SUCCESS;
608}
609
610void
611rpcUdpServerDestroy(RpcUdpServer s)
612{
613RpcUdpServer prev;
614        if (!s)
615                return;
616        /* we should probably verify (but how?) that nobody
617         * (at least: no outstanding XACTs) is using this
618         * server;
619         */
620
621        /* remove from server list */
622        MU_LOCK(llock);
623        prev = rpcUdpServers;
624        if ( s == prev ) {
625                rpcUdpServers = s->next;
626        } else {
627                for ( ; prev ; prev = prev->next) {
628                        if (prev->next == s) {
629                                prev->next = s->next;
630                                break;
631                        }
632                }
633        }
634        MU_UNLOCK(llock);
635
636        /* MUST have found it */
637        assert(prev);
638
639        auth_destroy(s->auth);
640
641        MU_DESTROY(s->authlock);
642        MY_FREE(s);
643}
644
645int
646rpcUdpStats(FILE *f)
647{
648RpcUdpServer s;
649
650        if (!f) f = stdout;
651
652        fprintf(f,"RPCIOD statistics:\n");
653
654        MU_LOCK(llock);
655        for (s = rpcUdpServers; s; s=s->next) {
656                fprintf(f,"\nServer -- %s:\n", s->name);
657                fprintf(f,"  requests    sent: %10ld, retransmitted: %10ld\n",
658                                                s->requests, s->retrans);
659                fprintf(f,"         timed out: %10ld,   send errors: %10ld\n",
660                                                s->timeouts, s->errors);
661                fprintf(f,"  current retransmission interval: %dms\n",
662                                                (unsigned)(s->retry_period * 1000 / ticksPerSec) );
663        }
664        MU_UNLOCK(llock);
665
666        return 0;
667}
668
669RpcUdpXact
670rpcUdpXactCreate(
671        u_long  program,
672        u_long  version,
673        u_long  size
674        )
675{
676RpcUdpXact              rval=0;
677struct rpc_msg  header;
678register int    i,j;
679
680        if (!size)
681                size = UDPMSGSIZE;
682        /* word align */
683        size = (size + 3) & ~3;
684
685        rval = (RpcUdpXact)MY_CALLOC(1,sizeof(*rval) - sizeof(rval->obuf) + size);
686
687        if (rval) {
688
689                header.rm_xid             = 0;
690                header.rm_direction       = CALL;
691                header.rm_call.cb_rpcvers = RPC_MSG_VERSION;
692                header.rm_call.cb_prog    = program;
693                header.rm_call.cb_vers    = version;
694                xdrmem_create(&(rval->xdrs), rval->obuf.buf, size, XDR_ENCODE);
695
696                if (!xdr_callhdr(&(rval->xdrs), &header)) {
697                        MY_FREE(rval);
698                        return 0;
699                }
700                /* pick a free table slot and initialize the XID */
701                MU_LOCK(hlock);
702                rval->obuf.xid = (xidHashSeed++ ^ ((uintptr_t)rval>>10)) & XACT_HASH_MSK;
703                i=j=(rval->obuf.xid & XACT_HASH_MSK);
704                if (msgQ) {
705                        /* if there's no message queue, refuse to
706                         * give them transactions; we might be in the process to
707                         * go away...
708                         */
709                        do {
710                                i=(i+1) & XACT_HASH_MSK; /* cheap modulo */
711                                if (!xactHashTbl[i]) {
712#if (DEBUG) & DEBUG_TRACE_XACT
713                                        fprintf(stderr,"RPCIO: entering index %i, val %x\n",i,rval);
714#endif
715                                        xactHashTbl[i]=rval;
716                                        j=-1;
717                                        break;
718                                }
719                        } while (i!=j);
720                }
721                MU_UNLOCK(hlock);
722                if (i==j) {
723                        XDR_DESTROY(&rval->xdrs);
724                        MY_FREE(rval);
725                        return 0;
726                }
727                rval->obuf.xid  = xidUpper[i] | i;
728                rval->xdrpos    = XDR_GETPOS(&(rval->xdrs));
729                rval->obufsize  = size;
730        }
731        return rval;
732}
733
734void
735rpcUdpXactDestroy(RpcUdpXact xact)
736{
737int i = xact->obuf.xid & XACT_HASH_MSK;
738
739#if (DEBUG) & DEBUG_TRACE_XACT
740                fprintf(stderr,"RPCIO: removing index %i, val %x\n",i,xact);
741#endif
742
743                ASSERT( xactHashTbl[i]==xact );
744
745                MU_LOCK(hlock);
746                xactHashTbl[i]=0;
747                /* remember XID we used last time so we can avoid
748                 * reusing the same one (incremented by rpcUdpSend routine)
749                 */
750                xidUpper[i]   = xact->obuf.xid;
751                MU_UNLOCK(hlock);
752
753                bufFree(&xact->ibuf);
754
755                XDR_DESTROY(&xact->xdrs);
756                MY_FREE(xact);
757}
758
759
760
761/* Send a transaction, i.e. enqueue it to the
762 * RPC daemon who will actually send it.
763 */
764enum clnt_stat
765rpcUdpSend(
766        RpcUdpXact              xact,
767        RpcUdpServer    srvr,
768        struct timeval  *timeout,
769        u_long                  proc,
770        xdrproc_t               xres, caddr_t pres,
771        xdrproc_t               xargs, caddr_t pargs,
772        ...
773   )
774{
775register XDR    *xdrs;
776unsigned long   ms;
777va_list                 ap;
778
779        va_start(ap,pargs);
780
781        if (!timeout)
782                timeout = RPCIOD_DEFAULT_TIMEOUT;
783
784        ms = 1000 * timeout->tv_sec + timeout->tv_usec/1000;
785
786        /* round lifetime to closest # of ticks */
787        xact->lifetime  = (ms * ticksPerSec + 500) / 1000;
788        if ( 0 == xact->lifetime )
789                xact->lifetime = 1;
790
791#if (DEBUG) & DEBUG_TIMEOUT
792        {
793        static int once=0;
794        if (!once++) {
795                fprintf(stderr,
796                                "Initial lifetime: %i (ticks)\n",
797                                xact->lifetime);
798        }
799        }
800#endif
801
802        xact->tolive    = xact->lifetime;
803
804        xact->xres      = xres;
805        xact->pres      = pres;
806        xact->server    = srvr;
807
808        xdrs            = &xact->xdrs;
809        xdrs->x_op      = XDR_ENCODE;
810        /* increment transaction ID */
811        xact->obuf.xid += XACT_HASHS;
812        XDR_SETPOS(xdrs, xact->xdrpos);
813        if ( !XDR_PUTLONG(xdrs,(long*)&proc) || !locked_marshal(srvr, xdrs) ||
814                 !xargs(xdrs, pargs) ) {
815                va_end(ap);
816                return(xact->status.re_status=RPC_CANTENCODEARGS);
817        }
818        while ((xargs=va_arg(ap,xdrproc_t))) {
819                if (!xargs(xdrs, va_arg(ap,caddr_t)))
820                va_end(ap);
821                return(xact->status.re_status=RPC_CANTENCODEARGS);
822        }
823
824        va_end(ap);
825
826        rtems_task_ident(RTEMS_SELF, RTEMS_WHO_AM_I, &xact->requestor);
827        if ( rtems_message_queue_send( msgQ, &xact, sizeof(xact)) ) {
828                return RPC_CANTSEND;
829        }
830        /* wakeup the rpciod */
831        ASSERT( RTEMS_SUCCESSFUL==rtems_event_send(rpciod, RPCIOD_TX_EVENT) );
832
833        return RPC_SUCCESS;
834}
835
836/* Block for the RPC reply to an outstanding
837 * transaction.
838 * The caller is woken by the RPC daemon either
839 * upon reception of the reply or on timeout.
840 */
841enum clnt_stat
842rpcUdpRcv(RpcUdpXact xact)
843{
844int                                     refresh;
845XDR                     reply_xdrs;
846struct rpc_msg          reply_msg;
847rtems_status_code       status;
848rtems_event_set         gotEvents;
849
850        refresh = 0;
851
852        do {
853
854        /* block for the reply */
855        status = rtems_event_receive(
856                RTEMS_RPC_EVENT,
857                RTEMS_WAIT | RTEMS_EVENT_ANY,
858                RTEMS_NO_TIMEOUT,
859                &gotEvents);
860        ASSERT( status == RTEMS_SUCCESSFUL );
861
862        if (xact->status.re_status) {
863#ifdef MBUF_RX
864                /* add paranoia */
865                ASSERT( !xact->ibuf );
866#endif
867                return xact->status.re_status;
868        }
869
870#ifdef MBUF_RX
871        xdrmbuf_create(&reply_xdrs, xact->ibuf, XDR_DECODE);
872#else
873        xdrmem_create(&reply_xdrs, xact->ibuf->buf, xact->ibufsize, XDR_DECODE);
874#endif
875
876        reply_msg.acpted_rply.ar_verf          = _null_auth;
877        reply_msg.acpted_rply.ar_results.where = xact->pres;
878        reply_msg.acpted_rply.ar_results.proc  = xact->xres;
879
880        if (xdr_replymsg(&reply_xdrs, &reply_msg)) {
881                /* OK */
882                _seterr_reply(&reply_msg, &xact->status);
883                if (RPC_SUCCESS == xact->status.re_status) {
884                        if ( !locked_validate(xact->server,
885                                                                &reply_msg.acpted_rply.ar_verf) ) {
886                                xact->status.re_status = RPC_AUTHERROR;
887                                xact->status.re_why    = AUTH_INVALIDRESP;
888                        }
889                        if (reply_msg.acpted_rply.ar_verf.oa_base) {
890                                reply_xdrs.x_op = XDR_FREE;
891                                xdr_opaque_auth(&reply_xdrs, &reply_msg.acpted_rply.ar_verf);
892                        }
893                        refresh = 0;
894                } else {
895                        /* should we try to refresh our credentials ? */
896                        if ( !refresh ) {
897                                /* had never tried before */
898                                refresh = RPCIOD_REFRESH;
899                        }
900                }
901        } else {
902                reply_xdrs.x_op        = XDR_FREE;
903                xdr_replymsg(&reply_xdrs, &reply_msg);
904                xact->status.re_status = RPC_CANTDECODERES;
905        }
906        XDR_DESTROY(&reply_xdrs);
907
908        bufFree(&xact->ibuf);
909
910#ifndef MBUF_RX
911        xact->ibufsize = 0;
912#endif
913
914        if (refresh && locked_refresh(xact->server)) {
915                rtems_task_ident(RTEMS_SELF, RTEMS_WHO_AM_I, &xact->requestor);
916                if ( rtems_message_queue_send(msgQ, &xact, sizeof(xact)) ) {
917                        return RPC_CANTSEND;
918                }
919                /* wakeup the rpciod */
920                fprintf(stderr,"RPCIO INFO: refreshing my AUTH\n");
921                ASSERT( RTEMS_SUCCESSFUL==rtems_event_send(rpciod, RPCIOD_TX_EVENT) );
922        }
923
924        } while ( 0 &&  refresh-- > 0 );
925
926        return xact->status.re_status;
927}
928
929
930/* On RTEMS, I'm told to avoid select(); this seems to
931 * be more efficient
932 */
933static void
934rxWakeupCB(struct socket *sock, void *arg)
935{
936  rtems_id *rpciod = (rtems_id*) arg;
937  rtems_event_send(*rpciod, RPCIOD_RX_EVENT);
938}
939
940void
941rpcSetXIDs(uint32_t xid)
942{
943        uint32_t i;
944
945        xid &= ~XACT_HASH_MSK;
946
947        for (i = 0; i < XACT_HASHS; ++i) {
948                xidUpper[i] = xid | i;
949        }
950}
951
952int
953rpcUdpInit(void)
954{
955int                     s;
956rtems_status_code       status;
957int                     noblock = 1;
958struct sockwakeup       wkup;
959
960        if (ourSock < 0) {
961    fprintf(stderr,"RTEMS-RPCIOD $Release$, " \
962            "Till Straumann, Stanford/SLAC/SSRL 2002, " \
963            "See LICENSE file for licensing info.\n");
964
965                ourSock=socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
966                if (ourSock>=0) {
967                        bindresvport(ourSock,(struct sockaddr_in*)0);
968                        s = ioctl(ourSock, FIONBIO, (char*)&noblock);
969                        assert( s == 0 );
970                        /* assume nobody tampers with the clock !! */
971                        ticksPerSec = rtems_clock_get_ticks_per_second();
972                        MU_CREAT( &hlock );
973                        MU_CREAT( &llock );
974
975                        if ( !rpciodPriority ) {
976                                /* use configured networking priority */
977                                if ( ! (rpciodPriority = rtems_bsdnet_config.network_task_priority) )
978                                        rpciodPriority = RPCIOD_PRIO;   /* fallback value */
979                        }
980
981                        status = rtems_task_create(
982                                                                                        rtems_build_name('R','P','C','d'),
983                                                                                        rpciodPriority,
984                                                                                        RPCIOD_STACK,
985                                                                                        RTEMS_DEFAULT_MODES,
986                                                                                        /* fprintf saves/restores FP registers on PPC :-( */
987                                                                                        RTEMS_DEFAULT_ATTRIBUTES | RTEMS_FLOATING_POINT,
988                                                                                        &rpciod);
989                        assert( status == RTEMS_SUCCESSFUL );
990
991#ifdef RTEMS_SMP
992                        if ( rpciodCpuset == 0 ) {
993                                rpciodCpuset = rtems_bsdnet_config.network_task_cpuset;
994                                rpciodCpusetSize = rtems_bsdnet_config.network_task_cpuset_size;
995                        }
996                        if ( rpciodCpuset != 0 )
997                                rtems_task_set_affinity( rpciod, rpciodCpusetSize, rpciodCpuset );
998#endif
999
1000                        wkup.sw_pfn = rxWakeupCB;
1001                        wkup.sw_arg = &rpciod;
1002                        assert( 0==setsockopt(ourSock, SOL_SOCKET, SO_RCVWAKEUP, &wkup, sizeof(wkup)) );
1003                        status = rtems_message_queue_create(
1004                                                                                        rtems_build_name('R','P','C','q'),
1005                                                                                        RPCIOD_QDEPTH,
1006                                                                                        sizeof(RpcUdpXact),
1007                                                                                        RTEMS_DEFAULT_ATTRIBUTES,
1008                                                                                        &msgQ);
1009                        assert( status == RTEMS_SUCCESSFUL );
1010                        status = rtems_task_start( rpciod, rpcio_daemon, 0 );
1011                        assert( status == RTEMS_SUCCESSFUL );
1012
1013                } else {
1014                        return -1;
1015                }
1016        }
1017        return 0;
1018}
1019
1020int
1021rpcUdpCleanup(void)
1022{
1023        rtems_semaphore_create(
1024                        rtems_build_name('R','P','C','f'),
1025                        0,
1026                        RTEMS_DEFAULT_ATTRIBUTES,
1027                        0,
1028                        &fini);
1029        rtems_event_send(rpciod, RPCIOD_KILL_EVENT);
1030        /* synchronize with daemon */
1031        rtems_semaphore_obtain(fini, RTEMS_WAIT, 5*ticksPerSec);
1032        /* if the message queue is still there, something went wrong */
1033        if (!msgQ) {
1034                rtems_task_delete(rpciod);
1035        }
1036        rtems_semaphore_delete(fini);
1037        return (msgQ !=0);
1038}
1039
1040/* Another API - simpler but less efficient.
1041 * For each RPCall, a server and a Xact
1042 * are created and destroyed on the fly.
1043 *
1044 * This should be used for infrequent calls
1045 * (e.g. a NFS mount request).
1046 *
1047 * This is roughly compatible with the original
1048 * clnt_call() etc. API - but it uses our
1049 * daemon and is fully reentrant.
1050 */
1051enum clnt_stat
1052rpcUdpClntCreate(
1053                struct sockaddr_in      *psaddr,
1054                rpcprog_t               prog,
1055                rpcvers_t               vers,
1056                u_long                  uid,
1057                u_long                  gid,
1058                RpcUdpClnt              *pclnt
1059)
1060{
1061RpcUdpXact              x;
1062RpcUdpServer    s;
1063enum clnt_stat  err;
1064
1065        if ( RPC_SUCCESS != (err=rpcUdpServerCreate(psaddr, prog, vers, uid, gid, &s)) )
1066                return err;
1067
1068        if ( !(x=rpcUdpXactCreate(prog, vers, UDPMSGSIZE)) ) {
1069                rpcUdpServerDestroy(s);
1070                return RPC_FAILED;
1071        }
1072        /* TODO: could maintain a server cache */
1073
1074        x->server = s;
1075
1076        *pclnt = x;
1077
1078        return RPC_SUCCESS;
1079}
1080
1081static void
1082rpcUdpClntDestroy(RpcUdpClnt xact)
1083{
1084        rpcUdpServerDestroy(xact->server);
1085        rpcUdpXactDestroy(xact);
1086}
1087
1088enum clnt_stat
1089rpcUdpClntCall(
1090        RpcUdpClnt              xact,
1091        u_long                  proc,
1092        XdrProcT                xargs,
1093        CaddrT                  pargs,
1094        XdrProcT                xres,
1095        CaddrT                  pres,
1096        struct timeval  *timeout
1097        )
1098{
1099enum clnt_stat  stat;
1100
1101                if ( (stat = rpcUdpSend(xact, xact->server, timeout, proc,
1102                                                                xres, pres,
1103                                                                xargs, pargs,
1104                                                                0)) ) {
1105                        fprintf(stderr,"RPCIO Send failed: %i\n",stat);
1106                        return stat;
1107                }
1108                return rpcUdpRcv(xact);
1109}
1110
1111/* a yet simpler interface */
1112enum clnt_stat
1113rpcUdpCallRp(
1114        struct sockaddr_in      *psrvr,
1115        u_long                          prog,
1116        u_long                          vers,
1117        u_long                          proc,
1118        XdrProcT                        xargs,
1119        CaddrT                          pargs,
1120        XdrProcT                        xres,
1121        CaddrT                          pres,
1122        u_long                          uid,            /* RPCIO_DEFAULT_ID picks default */
1123        u_long                          gid,            /* RPCIO_DEFAULT_ID picks default */
1124        struct timeval          *timeout        /* NULL picks default           */
1125)
1126{
1127RpcUdpClnt                      clp;
1128enum clnt_stat          stat;
1129
1130        stat = rpcUdpClntCreate(
1131                                psrvr,
1132                                prog,
1133                                vers,
1134                                uid,
1135                                gid,
1136                                &clp);
1137
1138        if ( RPC_SUCCESS != stat )
1139                return stat;
1140
1141        stat = rpcUdpClntCall(
1142                                clp,
1143                                proc,
1144                                xargs, pargs,
1145                                xres,  pres,
1146                                timeout);
1147
1148        rpcUdpClntDestroy(clp);
1149
1150        return stat;
1151}
1152
1153/* linked list primitives */
1154static void
1155nodeXtract(ListNode n)
1156{
1157        if (n->prev)
1158                n->prev->next = n->next;
1159        if (n->next)
1160                n->next->prev = n->prev;
1161        n->next = n->prev = 0;
1162}
1163
1164static void
1165nodeAppend(ListNode l, ListNode n)
1166{
1167        if ( (n->next = l->next) )
1168                n->next->prev = n;
1169        l->next = n;
1170        n->prev = l;
1171
1172}
1173
1174/* this code does the work */
1175static void
1176rpcio_daemon(rtems_task_argument arg)
1177{
1178rtems_status_code stat;
1179RpcUdpXact        xact;
1180RpcUdpServer      srv;
1181rtems_interval    next_retrans, then, unow;
1182long                                            now;    /* need to do signed comparison with age! */
1183rtems_event_set   events;
1184ListNode          newList;
1185size_t            size;
1186rtems_id          q          =  0;
1187ListNodeRec       listHead   = {0, 0};
1188unsigned long     epoch      = RPCIOD_EPOCH_SECS * ticksPerSec;
1189unsigned long                   max_period = RPCIOD_RETX_CAP_S * ticksPerSec;
1190rtems_status_code       status;
1191
1192
1193        then = rtems_clock_get_ticks_since_boot();
1194
1195        for (next_retrans = epoch;;) {
1196
1197                if ( RTEMS_SUCCESSFUL !=
1198                         (stat = rtems_event_receive(
1199                                                RPCIOD_RX_EVENT | RPCIOD_TX_EVENT | RPCIOD_KILL_EVENT,
1200                                                RTEMS_WAIT | RTEMS_EVENT_ANY,
1201                                                next_retrans,
1202                                                &events)) ) {
1203                        ASSERT( RTEMS_TIMEOUT == stat );
1204                        events = 0;
1205                }
1206
1207                if (events & RPCIOD_KILL_EVENT) {
1208                        int i;
1209
1210#if (DEBUG) & DEBUG_EVENTS
1211                        fprintf(stderr,"RPCIO: got KILL event\n");
1212#endif
1213
1214                        MU_LOCK(hlock);
1215                        for (i=XACT_HASHS-1; i>=0; i--) {
1216                                if (xactHashTbl[i]) {
1217                                        break;
1218                                }
1219                        }
1220                        if (i<0) {
1221                                /* prevent them from creating and enqueueing more messages */
1222                                q=msgQ;
1223                                /* messages queued after we executed this assignment will fail */
1224                                msgQ=0;
1225                        }
1226                        MU_UNLOCK(hlock);
1227                        if (i>=0) {
1228                                fprintf(stderr,"RPCIO There are still transactions circulating; I refuse to go away\n");
1229                                fprintf(stderr,"(1st in slot %i)\n",i);
1230                                rtems_semaphore_release(fini);
1231                        } else {
1232                                break;
1233                        }
1234                }
1235
1236                unow = rtems_clock_get_ticks_since_boot();
1237
1238                /* measure everything relative to then to protect against
1239                 * rollover
1240                 */
1241                now = unow - then;
1242
1243                /* NOTE: we don't lock the hash table while we are operating
1244                 * on transactions; the paradigm is that we 'own' a particular
1245                 * transaction (and hence it's hash table slot) from the
1246                 * time the xact was put into the message queue until we
1247                 * wake up the requestor.
1248                 */
1249
1250                if (RPCIOD_RX_EVENT & events) {
1251
1252#if (DEBUG) & DEBUG_EVENTS
1253                        fprintf(stderr,"RPCIO: got RX event\n");
1254#endif
1255
1256                        while ((xact=sockRcv())) {
1257
1258                                /* extract from the retransmission list */
1259                                nodeXtract(&xact->node);
1260
1261                                /* change the ID - there might already be
1262                                 * a retransmission on the way. When it's
1263                                 * reply arrives we must not find it's ID
1264                                 * in the hashtable
1265                                 */
1266                                xact->obuf.xid        += XACT_HASHS;
1267
1268                                xact->status.re_status = RPC_SUCCESS;
1269
1270                                /* calculate roundtrip ticks */
1271                                xact->trip             = now - xact->trip;
1272
1273                                srv                    = xact->server;
1274
1275                                /* adjust the server's retry period */
1276                                {
1277                                        register TimeoutT rtry = srv->retry_period;
1278                                        register TimeoutT trip = xact->trip;
1279
1280                                        ASSERT( trip >= 0 );
1281
1282                                        if ( 0==trip )
1283                                                trip = 1;
1284
1285                                        /* retry_new = 0.75*retry_old + 0.25 * 8 * roundrip */
1286                                        rtry   = (3*rtry + (trip << 3)) >> 2;
1287
1288                                        if ( rtry > max_period )
1289                                                rtry = max_period;
1290
1291                                        srv->retry_period = rtry;
1292                                }
1293
1294                                /* wakeup requestor */
1295                                rtems_event_send(xact->requestor, RTEMS_RPC_EVENT);
1296                        }
1297                }
1298
1299                if (RPCIOD_TX_EVENT & events) {
1300
1301#if (DEBUG) & DEBUG_EVENTS
1302                        fprintf(stderr,"RPCIO: got TX event\n");
1303#endif
1304
1305                        while (RTEMS_SUCCESSFUL == rtems_message_queue_receive(
1306                                                                                        msgQ,
1307                                                                                        &xact,
1308                                                                                        &size,
1309                                                                                        RTEMS_NO_WAIT,
1310                                                                                        RTEMS_NO_TIMEOUT)) {
1311                                /* put to the head of timeout q */
1312                                nodeAppend(&listHead, &xact->node);
1313
1314                                xact->age  = now;
1315                                xact->trip = FIRST_ATTEMPT;
1316                        }
1317                }
1318
1319
1320                /* work the timeout q */
1321                newList = 0;
1322                for ( xact=(RpcUdpXact)listHead.next;
1323                          xact && xact->age <= now;
1324                          xact=(RpcUdpXact)listHead.next ) {
1325
1326                                /* extract from the list */
1327                                nodeXtract(&xact->node);
1328
1329                                srv = xact->server;
1330
1331                                if (xact->tolive < 0) {
1332                                        /* this one timed out */
1333                                        xact->status.re_errno  = ETIMEDOUT;
1334                                        xact->status.re_status = RPC_TIMEDOUT;
1335
1336                                        srv->timeouts++;
1337
1338                                        /* Change the ID - there might still be
1339                                         * a reply on the way. When it arrives we
1340                                         * must not find it's ID in the hash table
1341                                         *
1342                                         * Thanks to Steven Johnson for hunting this
1343                                         * one down.
1344                                         */
1345                                        xact->obuf.xid        += XACT_HASHS;
1346
1347#if (DEBUG) & DEBUG_TIMEOUT
1348                                        fprintf(stderr,"RPCIO XACT timed out; waking up requestor\n");
1349#endif
1350                                        if ( rtems_event_send(xact->requestor, RTEMS_RPC_EVENT) ) {
1351                                                rtems_panic("RPCIO PANIC file %s line: %i, requestor id was 0x%08x",
1352                                                                        __FILE__,
1353                                                                        __LINE__,
1354                                                                        xact->requestor);
1355                                        }
1356
1357                                } else {
1358                                        int len;
1359
1360                                        len = (int)XDR_GETPOS(&xact->xdrs);
1361
1362#ifdef MBUF_TX
1363                                        xact->refcnt = 1;       /* sendto itself */
1364#endif
1365                                        if ( len != SENDTO( ourSock,
1366                                                                                xact->obuf.buf,
1367                                                                                len,
1368                                                                                0,
1369                                                                                &srv->addr.sa,
1370                                                                                sizeof(srv->addr.sin)
1371#ifdef MBUF_TX
1372                                                                                , xact,
1373                                                                                paranoia_free,
1374                                                                                paranoia_ref
1375#endif
1376                                                                                ) ) {
1377
1378                                                xact->status.re_errno  = errno;
1379                                                xact->status.re_status = RPC_CANTSEND;
1380                                                srv->errors++;
1381
1382                                                /* wakeup requestor */
1383                                                fprintf(stderr,"RPCIO: SEND failure\n");
1384                                                status = rtems_event_send(xact->requestor, RTEMS_RPC_EVENT);
1385                                                assert( status == RTEMS_SUCCESSFUL );
1386
1387                                        } else {
1388                                                /* send successful; calculate retransmission time
1389                                                 * and enqueue to temporary list
1390                                                 */
1391                                                if (FIRST_ATTEMPT != xact->trip) {
1392#if (DEBUG) & DEBUG_TIMEOUT
1393                                                        fprintf(stderr,
1394                                                                "timed out; tolive is %i (ticks), retry period is %i (ticks)\n",
1395                                                                        xact->tolive,
1396                                                                        srv->retry_period);
1397#endif
1398                                                        /* this is a real retry; we backup
1399                                                         * the server's retry interval
1400                                                         */
1401                                                        if ( srv->retry_period < max_period ) {
1402
1403                                                                /* If multiple transactions for this server
1404                                                                 * fail (e.g. because it died) this will
1405                                                                 * back-off very agressively (doubling
1406                                                                 * the retransmission period for every
1407                                                                 * timed out transaction up to the CAP limit)
1408                                                                 * which is desirable - single packet failure
1409                                                                 * is treated more gracefully by this algorithm.
1410                                                                 */
1411
1412                                                                srv->retry_period<<=1;
1413#if (DEBUG) & DEBUG_TIMEOUT
1414                                                                fprintf(stderr,
1415                                                                                "adjusted to; retry period %i\n",
1416                                                                                srv->retry_period);
1417#endif
1418                                                        } else {
1419                                                                /* never wait longer than RPCIOD_RETX_CAP_S seconds */
1420                                                                fprintf(stderr,
1421                                                                                "RPCIO: server '%s' not responding - still trying\n",
1422                                                                                srv->name);
1423                                                        }
1424                                                        if ( 0 == ++srv->retrans % 1000) {
1425                                                                fprintf(stderr,
1426                                                                                "RPCIO - statistics: already %li retries to server %s\n",
1427                                                                                srv->retrans,
1428                                                                                srv->name);
1429                                                        }
1430                                                } else {
1431                                                        srv->requests++;
1432                                                }
1433                                                xact->trip      = now;
1434                                                {
1435                                                long capped_period = srv->retry_period;
1436                                                        if ( xact->lifetime < capped_period )
1437                                                                capped_period = xact->lifetime;
1438                                                xact->age       = now + capped_period;
1439                                                xact->tolive   -= capped_period;
1440                                                }
1441                                                /* enqueue to the list of newly sent transactions */
1442                                                xact->node.next = newList;
1443                                                newList         = &xact->node;
1444#if (DEBUG) & DEBUG_TIMEOUT
1445                                                fprintf(stderr,
1446                                                                "XACT (0x%08x) age is 0x%x, now: 0x%x\n",
1447                                                                xact,
1448                                                                xact->age,
1449                                                                now);
1450#endif
1451                                        }
1452                                }
1453            }
1454
1455                /* insert the newly sent transactions into the
1456                 * sorted retransmission list
1457                 */
1458                for (; (xact = (RpcUdpXact)newList); ) {
1459                        register ListNode p,n;
1460                        newList = newList->next;
1461                        for ( p=&listHead; (n=p->next) && xact->age > ((RpcUdpXact)n)->age; p=n )
1462                                /* nothing else to do */;
1463                        nodeAppend(p, &xact->node);
1464                }
1465
1466                if (now > epoch) {
1467                        /* every now and then, readjust the epoch */
1468                        register ListNode n;
1469                        then += now;
1470                        for (n=listHead.next; n; n=n->next) {
1471                                /* readjust outstanding time intervals subject to the
1472                                 * condition that the 'absolute' time must remain
1473                                 * the same. 'age' and 'trip' are measured with
1474                                 * respect to 'then' - hence:
1475                                 *
1476                                 * abs_age == old_age + old_then == new_age + new_then
1477                                 *
1478                                 * ==> new_age = old_age + old_then - new_then == old_age - 'now'
1479                                 */
1480                                ((RpcUdpXact)n)->age  -= now;
1481                                ((RpcUdpXact)n)->trip -= now;
1482#if (DEBUG) & DEBUG_TIMEOUT
1483                                fprintf(stderr,
1484                                                "readjusted XACT (0x%08x); age is 0x%x, trip: 0x%x now: 0x%x\n",
1485                                                (RpcUdpXact)n,
1486                                                ((RpcUdpXact)n)->trip,
1487                                                ((RpcUdpXact)n)->age,
1488                                                now);
1489#endif
1490                        }
1491                        now = 0;
1492                }
1493
1494                next_retrans = listHead.next ?
1495                                                        ((RpcUdpXact)listHead.next)->age - now :
1496                                                        epoch;  /* make sure we don't miss updating the epoch */
1497#if (DEBUG) & DEBUG_TIMEOUT
1498                fprintf(stderr,"RPCIO: next timeout is %x\n",next_retrans);
1499#endif
1500        }
1501        /* close our socket; shut down the receiver */
1502        close(ourSock);
1503
1504#if 0 /* if we get here, no transactions exist, hence there can be none
1505           * in the queue whatsoever
1506           */
1507        /* flush the message queue */
1508        while (RTEMS_SUCCESSFUL == rtems_message_queue_receive(
1509                                                                                q,
1510                                                                                &xact,
1511                                                                                &size,
1512                                                                                RTEMS_NO_WAIT,
1513                                                                                RTEMS_NO_TIMEOUT)) {
1514                        /* TODO enque xact */
1515        }
1516
1517        /* flush all outstanding transactions */
1518
1519        for (xact=((RpcUdpXact)listHead.next); xact; xact=((RpcUdpXact)xact->node.next)) {
1520                        xact->status.re_status = RPC_TIMEDOUT;
1521                        rtems_event_send(xact->requestor, RTEMS_RPC_EVENT);
1522        }
1523#endif
1524
1525        rtems_message_queue_delete(q);
1526
1527        MU_DESTROY(hlock);
1528
1529        fprintf(stderr,"RPC daemon exited...\n");
1530
1531        rtems_semaphore_release(fini);
1532        rtems_task_suspend(RTEMS_SELF);
1533}
1534
1535
1536/* support for transaction 'pools'. A number of XACT objects
1537 * is always kept around. The initial number is 0 but it
1538 * is allowed to grow up to a maximum.
1539 * If the need grows beyond the maximum, behavior depends:
1540 * Users can either block until a transaction becomes available,
1541 * they can create a new XACT on the fly or get an error
1542 * if no free XACT is available from the pool.
1543 */
1544
1545RpcUdpXactPool
1546rpcUdpXactPoolCreate(
1547        rpcprog_t prog,                 rpcvers_t version,
1548        int xactsize,   int poolsize)
1549{
1550RpcUdpXactPool  rval = MY_MALLOC(sizeof(*rval));
1551rtems_status_code       status;
1552
1553        ASSERT( rval );
1554        status = rtems_message_queue_create(
1555                                        rtems_build_name('R','P','C','p'),
1556                                        poolsize,
1557                                        sizeof(RpcUdpXact),
1558                                        RTEMS_DEFAULT_ATTRIBUTES,
1559                                        &rval->box);
1560        assert( status == RTEMS_SUCCESSFUL );
1561
1562        rval->prog     = prog;
1563        rval->version  = version;
1564        rval->xactSize = xactsize;
1565        return rval;
1566}
1567
1568void
1569rpcUdpXactPoolDestroy(RpcUdpXactPool pool)
1570{
1571RpcUdpXact xact;
1572
1573        while ((xact = rpcUdpXactPoolGet(pool, XactGetFail))) {
1574                rpcUdpXactDestroy(xact);
1575        }
1576        rtems_message_queue_delete(pool->box);
1577        MY_FREE(pool);
1578}
1579
1580RpcUdpXact
1581rpcUdpXactPoolGet(RpcUdpXactPool pool, XactPoolGetMode mode)
1582{
1583RpcUdpXact       xact = 0;
1584size_t           size;
1585
1586        if (RTEMS_SUCCESSFUL != rtems_message_queue_receive(
1587                                                                pool->box,
1588                                                                &xact,
1589                                                                &size,
1590                                                                XactGetWait == mode ?
1591                                                                        RTEMS_WAIT : RTEMS_NO_WAIT,
1592                                                                RTEMS_NO_TIMEOUT)) {
1593
1594                /* nothing found in box; should we create a new one ? */
1595
1596                xact = (XactGetCreate == mode) ?
1597                                        rpcUdpXactCreate(
1598                                                        pool->prog,
1599                                                        pool->version,
1600                                                        pool->xactSize) : 0 ;
1601                if (xact)
1602                                xact->pool = pool;
1603
1604        }
1605        return xact;
1606}
1607
1608void
1609rpcUdpXactPoolPut(RpcUdpXact xact)
1610{
1611RpcUdpXactPool pool;
1612
1613        pool = xact->pool;
1614        ASSERT( pool );
1615
1616        if (RTEMS_SUCCESSFUL != rtems_message_queue_send(
1617                                                                pool->box,
1618                                                                &xact,
1619                                                                sizeof(xact)))
1620                rpcUdpXactDestroy(xact);
1621}
1622
1623#ifdef MBUF_RX
1624
1625/* WORKAROUND: include sys/mbuf.h (or other bsdnet headers) only
1626 *             _after_ using malloc()/free() & friends because
1627 *             the RTEMS/BSDNET headers redefine those :-(
1628 */
1629
1630#define _KERNEL
1631#include <sys/mbuf.h>
1632
1633ssize_t
1634recv_mbuf_from(int s, struct mbuf **ppm, long len, struct sockaddr *fromaddr, int *fromlen);
1635
1636static void
1637bufFree(struct mbuf **m)
1638{
1639        if (*m) {
1640                rtems_bsdnet_semaphore_obtain();
1641                m_freem(*m);
1642                rtems_bsdnet_semaphore_release();
1643                *m = 0;
1644        }
1645}
1646#endif
1647
1648#ifdef MBUF_TX
1649static void
1650paranoia_free(caddr_t closure, u_int size)
1651{
1652#if (DEBUG)
1653RpcUdpXact xact = (RpcUdpXact)closure;
1654int        len  = (int)XDR_GETPOS(&xact->xdrs);
1655
1656        ASSERT( --xact->refcnt >= 0 && size == len );
1657#endif
1658}
1659
1660static void
1661paranoia_ref (caddr_t closure, u_int size)
1662{
1663#if (DEBUG)
1664RpcUdpXact xact = (RpcUdpXact)closure;
1665int        len  = (int)XDR_GETPOS(&xact->xdrs);
1666        ASSERT( size == len );
1667        xact->refcnt++;
1668#endif
1669}
1670#endif
1671
1672/* receive from a socket and find
1673 * the transaction corresponding to the
1674 * transaction ID received in the server
1675 * reply.
1676 *
1677 * The semantics of the 'pibuf' pointer are
1678 * as follows:
1679 *
1680 * MBUF_RX:
1681 *
1682 */
1683
1684#define RPCIOD_RXBUFSZ  UDPMSGSIZE
1685
1686static RpcUdpXact
1687sockRcv(void)
1688{
1689int                                     len,i;
1690uint32_t                                xid;
1691union {
1692        struct sockaddr_in      sin;
1693        struct sockaddr     sa;
1694}                                       fromAddr;
1695int                                     fromLen  = sizeof(fromAddr.sin);
1696RxBuf                           ibuf     = 0;
1697RpcUdpXact                      xact     = 0;
1698
1699        do {
1700
1701        /* rcv_mbuf() and recvfrom() differ in that the
1702         * former allocates buffers and passes them back
1703         * to us whereas the latter requires us to provide
1704         * buffer space.
1705         * Hence, in the first case whe have to make sure
1706         * no old buffer is leaked - in the second case,
1707         * we might well re-use an old buffer but must
1708         * make sure we have one allocated
1709         */
1710#ifdef MBUF_RX
1711        if (ibuf)
1712                bufFree(&ibuf);
1713
1714        len  = recv_mbuf_from(
1715                                        ourSock,
1716                                        &ibuf,
1717                                        RPCIOD_RXBUFSZ,
1718                                    &fromAddr.sa,
1719                                    &fromLen);
1720#else
1721        if ( !ibuf )
1722                ibuf = (RpcBuf)MY_MALLOC(RPCIOD_RXBUFSZ);
1723        if ( !ibuf )
1724                goto cleanup; /* no memory - drop this message */
1725
1726        len  = recvfrom(ourSock,
1727                                    ibuf->buf,
1728                                    RPCIOD_RXBUFSZ,
1729                                    0,
1730                                    &fromAddr.sa,
1731                                        &fromLen);
1732#endif
1733
1734        if (len <= 0) {
1735                if (EAGAIN != errno)
1736                        fprintf(stderr,"RECV failed: %s\n",strerror(errno));
1737                goto cleanup;
1738        }
1739
1740#if (DEBUG) & DEBUG_PACKLOSS
1741        if ( (unsigned)rand() < DEBUG_PACKLOSS_FRACT ) {
1742                /* lose packets once in a while */
1743                static int xxx = 0;
1744                if ( ++xxx % 16 == 0 )
1745                        fprintf(stderr,"DEBUG: dropped %i packets, so far...\n",xxx);
1746                if ( ibuf )
1747                        bufFree( &ibuf );
1748                continue;
1749        }
1750#endif
1751
1752        i = (xid=XID(ibuf)) & XACT_HASH_MSK;
1753
1754        if ( !(xact=xactHashTbl[i])                                             ||
1755                   xact->obuf.xid                     != xid                        ||
1756#ifdef REJECT_SERVERIP_MISMATCH
1757                   xact->server->addr.sin.sin_addr.s_addr != fromAddr.sin.sin_addr.s_addr       ||
1758#endif
1759                   xact->server->addr.sin.sin_port        != fromAddr.sin.sin_port ) {
1760
1761                if (xact) {
1762                        if (
1763#ifdef REJECT_SERVERIP_MISMATCH
1764                            xact->server->addr.sin.sin_addr.s_addr == fromAddr.sin.sin_addr.s_addr &&
1765#endif
1766                        xact->server->addr.sin.sin_port        == fromAddr.sin.sin_port        &&
1767                            ( xact->obuf.xid                   == xid + XACT_HASHS   ||
1768                                  xact->obuf.xid                   == xid + 2*XACT_HASHS    )
1769                                ) {
1770#ifndef DEBUG /* don't complain if it's just a late arrival of a retry */
1771                        fprintf(stderr,"RPCIO - FYI sockRcv(): dropping late/redundant retry answer\n");
1772#endif
1773                        } else {
1774                        fprintf(stderr,"RPCIO WARNING sockRcv(): transaction mismatch\n");
1775                        fprintf(stderr,"xact: xid  0x%08" PRIx32 "  -- got 0x%08" PRIx32 "\n",
1776                                                        xact->obuf.xid, xid);
1777                        fprintf(stderr,"xact: addr 0x%08" PRIx32 "  -- got 0x%08" PRIx32 "\n",
1778                                                        xact->server->addr.sin.sin_addr.s_addr,
1779                                                        fromAddr.sin.sin_addr.s_addr);
1780                        fprintf(stderr,"xact: port 0x%08x  -- got 0x%08x\n",
1781                                                        xact->server->addr.sin.sin_port,
1782                                                        fromAddr.sin.sin_port);
1783                        }
1784                } else {
1785                        fprintf(stderr,
1786                                        "RPCIO WARNING sockRcv(): got xid 0x%08" PRIx32 " but its slot is empty\n",
1787                                        xid);
1788                }
1789                /* forget about this one and try again */
1790                xact = 0;
1791        }
1792
1793        } while ( !xact );
1794
1795        xact->ibuf     = ibuf;
1796#ifndef MBUF_RX
1797        xact->ibufsize = RPCIOD_RXBUFSZ;
1798#endif
1799
1800        return xact;
1801
1802cleanup:
1803
1804        bufFree(&ibuf);
1805
1806        return 0;
1807}
1808
1809
1810#include <rtems/rtems_bsdnet_internal.h>
1811/* double check the event configuration; should probably globally
1812 * manage system events!!
1813 * We do this at the end of the file for the same reason we had
1814 * included mbuf.h only a couple of lines above - see comment up
1815 * there...
1816 */
1817#if RTEMS_RPC_EVENT & SOSLEEP_EVENT & SBWAIT_EVENT & NETISR_EVENTS
1818#error ILLEGAL EVENT CONFIGURATION
1819#endif
Note: See TracBrowser for help on using the repository browser.