source: rtems/cpukit/libfs/src/nfsclient/src/rpcio.c @ 58d38a0

4.104.114.95
Last change on this file since 58d38a0 was 58d38a0, checked in by Joel Sherrill <joel.sherrill@…>, on 02/26/08 at 19:23:53

2008-02-26 Joel Sherrill <joel.sherrill@…>

  • configure.ac, libfs/Makefile.am: Add nfsclient to cpukit. Although the use of RPC/XDR could be an issue, the code does build multilib across all targets. There are a few remaining warnings to deal with.
  • libfs/src/nfsclient/.cvsignore, libfs/src/nfsclient/ChangeLog.slac, libfs/src/nfsclient/LICENSE, libfs/src/nfsclient/Makefile.am, libfs/src/nfsclient/README, libfs/src/nfsclient/preinstall.am, libfs/src/nfsclient/rfc1094.txt, libfs/src/nfsclient/proto/mount_prot.h, libfs/src/nfsclient/proto/mount_prot.x, libfs/src/nfsclient/proto/mount_prot_xdr.c, libfs/src/nfsclient/proto/nfs_prot.h, libfs/src/nfsclient/proto/nfs_prot.x, libfs/src/nfsclient/proto/nfs_prot_xdr.c, libfs/src/nfsclient/src/cexphelp.c, libfs/src/nfsclient/src/dirutils.c, libfs/src/nfsclient/src/librtemsNfs.h, libfs/src/nfsclient/src/nfs.c, libfs/src/nfsclient/src/nfs.modini.c, libfs/src/nfsclient/src/nfsTest.c, libfs/src/nfsclient/src/rpcio.c, libfs/src/nfsclient/src/rpcio.h, libfs/src/nfsclient/src/rpcio.modini.c, libfs/src/nfsclient/src/sock_mbuf.c, libfs/src/nfsclient/src/xdr_mbuf.c: New files.
  • Property mode set to 100644
File size: 44.0 KB
Line 
1/* $Id$ */
2
3/* RPC multiplexor for a multitasking environment */
4
5/* Author: Till Straumann <strauman@slac.stanford.edu>, 2002 */
6
7/* This code funnels arbitrary task's UDP/RPC requests
8 * through one socket to arbitrary servers.
9 * The replies are gathered and dispatched to the
10 * requestors.
11 * One task handles all the sending and receiving
12 * work including retries.
13 * It is up to the requestor, however, to do
14 * the XDR encoding of the arguments / decoding
15 * of the results (except for the RPC header which
16 * is handled by the daemon).
17 */
18 
19/*
20 * Authorship
21 * ----------
22 * This software (NFS-2 client implementation for RTEMS) was created by
23 *     Till Straumann <strauman@slac.stanford.edu>, 2002-2007,
24 *         Stanford Linear Accelerator Center, Stanford University.
25 *
26 * Acknowledgement of sponsorship
27 * ------------------------------
28 * The NFS-2 client implementation for RTEMS was produced by
29 *     the Stanford Linear Accelerator Center, Stanford University,
30 *         under Contract DE-AC03-76SFO0515 with the Department of Energy.
31 *
32 * Government disclaimer of liability
33 * ----------------------------------
34 * Neither the United States nor the United States Department of Energy,
35 * nor any of their employees, makes any warranty, express or implied, or
36 * assumes any legal liability or responsibility for the accuracy,
37 * completeness, or usefulness of any data, apparatus, product, or process
38 * disclosed, or represents that its use would not infringe privately owned
39 * rights.
40 *
41 * Stanford disclaimer of liability
42 * --------------------------------
43 * Stanford University makes no representations or warranties, express or
44 * implied, nor assumes any liability for the use of this software.
45 *
46 * Stanford disclaimer of copyright
47 * --------------------------------
48 * Stanford University, owner of the copyright, hereby disclaims its
49 * copyright and all other rights in this software.  Hence, anyone may
50 * freely use it for any purpose without restriction. 
51 *
52 * Maintenance of notices
53 * ----------------------
54 * In the interest of clarity regarding the origin and status of this
55 * SLAC software, this and all the preceding Stanford University notices
56 * are to remain affixed to any copy or derivative of this software made
57 * or distributed by the recipient and are to be affixed to any copy of
58 * software made or distributed by the recipient that contains a copy or
59 * derivative of this software.
60 *
61 * ------------------ SLAC Software Notices, Set 4 OTT.002a, 2004 FEB 03
62 */
63
64#include <rtems.h>
65#include <rtems/error.h>
66#include <rtems/rtems_bsdnet.h>
67#include <stdlib.h>
68#include <time.h>
69#include <rpc/rpc.h>
70#include <rpc/pmap_prot.h>
71#include <errno.h>
72#include <sys/ioctl.h>
73#include <assert.h>
74#include <stdio.h>
75#include <errno.h>
76#include <string.h>
77#include <netinet/in.h>
78#include <arpa/inet.h>
79
80#include "rpcio.h"
81
82/****************************************************************/
83/* CONFIGURABLE PARAMETERS                                      */
84/****************************************************************/
85
86#define MBUF_RX                 /* If defined: use mbuf XDR stream for
87                                                 *  decoding directly out of mbufs
88                                                 *  Otherwise, the regular 'recvfrom()'
89                                                 *  interface will be used involving an
90                                                 *  extra buffer allocation + copy step.
91                                                 */
92
93#define MBUF_TX                 /* If defined: avoid copying data when
94                                                 *  sending. Instead, use a wrapper to
95                                                 *  'sosend()' which will point an MBUF
96                                                 *  directly to our buffer space.
97                                                 *  Note that the BSD stack does not copy
98                                                 *  data when fragmenting packets - it
99                                                 *  merely uses an mbuf chain pointing
100                                                 *  into different areas of the data.
101                                                 *
102                                                 * If undefined, the regular 'sendto()'
103                                                 *  interface is used.
104                                                 */
105
106#undef REJECT_SERVERIP_MISMATCH
107                                                /* If defined, RPC replies must come from the server
108                                                 * that was queried. Eric Norum has reported problems
109                                                 * with clustered NFS servers. So we disable this
110                                                 * reducing paranoia...
111                                                 */
112
113/* daemon task parameters */
114#define RPCIOD_STACK            10000
115#define RPCIOD_PRIO                     100     /* *fallback* priority */
116
117/* depth of the message queue for sending
118 * RPC requests to the daemon
119 */
120#define RPCIOD_QDEPTH           20
121
122/* Maximum retry limit for retransmission */
123#define RPCIOD_RETX_CAP_S       3 /* seconds */
124
125/* Default timeout for RPC calls */
126#define RPCIOD_DEFAULT_TIMEOUT  (&_rpc_default_timeout)
127static struct timeval _rpc_default_timeout = { 10 /* secs */, 0 /* usecs */ };
128
129/* how many times should we try to resend a failed
130 * transaction with refreshed AUTHs
131 */
132#define RPCIOD_REFRESH          2
133
134/* Events we are using; the RPC_EVENT
135 * MUST NOT be used by any application
136 * thread doing RPC IO (e.g. NFS)
137 */
138#define RTEMS_RPC_EVENT         RTEMS_EVENT_30  /* THE event used by RPCIO. Every task doing
139                                                                                         * RPC IO will receive this - hence it is
140                                                                                         * RESERVED
141                                                                                         */
142#define RPCIOD_RX_EVENT         RTEMS_EVENT_1   /* Events the RPCIOD is using/waiting for */
143#define RPCIOD_TX_EVENT         RTEMS_EVENT_2
144#define RPCIOD_KILL_EVENT       RTEMS_EVENT_3   /* send to the daemon to kill it          */
145
146#define LD_XACT_HASH            8                               /* ld of the size of the transaction hash table  */
147
148
149/* Debugging Flags                                              */
150
151/* NOTE: defining DEBUG 0 leaves some 'assert()' paranoia checks
152 *       but produces no output
153 */
154
155#define DEBUG_TRACE_XACT        (1<<0)
156#define DEBUG_EVENTS            (1<<1)
157#define DEBUG_MALLOC            (1<<2)
158#define DEBUG_TIMEOUT           (1<<3)
159#define DEBUG_PACKLOSS          (1<<4)  /* This introduces random, artificial packet losses to test retransmission */
160
161#define DEBUG_PACKLOSS_FRACT (0xffffffff/10)
162
163/* USE PARENTHESIS WHEN 'or'ing MULTIPLE FLAGS: (DEBUG_XX | DEBUG_YY) */
164#define DEBUG                           (0)
165
166/****************************************************************/
167/* END OF CONFIGURABLE SECTION                                  */
168/****************************************************************/
169
170/* prevent rollover of our timers by readjusting the epoch on the fly */
171#if     (DEBUG) & DEBUG_TIMEOUT
172#define RPCIOD_EPOCH_SECS       10
173#else
174#define RPCIOD_EPOCH_SECS       10000
175#endif
176
177#ifdef  DEBUG
178#define ASSERT(arg)                     assert(arg)
179#else
180#define ASSERT(arg)                     if (arg)
181#endif
182
183/****************************************************************/
184/* MACROS                                                       */
185/****************************************************************/
186
187
188#define XACT_HASHS              (1<<(LD_XACT_HASH))     /* the hash table size derived from the ld       */
189#define XACT_HASH_MSK   ((XACT_HASHS)-1)        /* mask to extract the hash index from a RPC-XID */
190
191
192#define MU_LOCK(mutex)          do {                                                    \
193                                                        assert(                                                 \
194                                                                RTEMS_SUCCESSFUL ==                     \
195                                                                rtems_semaphore_obtain(         \
196                                                                                (mutex),                        \
197                                                                                RTEMS_WAIT,                     \
198                                                                                RTEMS_NO_TIMEOUT        \
199                                                                                ) );                            \
200                                                        } while(0)
201
202#define MU_UNLOCK(mutex)        do {                                                    \
203                                                        assert(                                                 \
204                                                                RTEMS_SUCCESSFUL ==                     \
205                                                                rtems_semaphore_release(        \
206                                                                                (mutex)                         \
207                                                                                ) );                            \
208                                                        } while(0)
209
210#define MU_CREAT(pmutex)        do {                                                    \
211                                                        assert(                                                 \
212                                                                RTEMS_SUCCESSFUL ==                     \
213                                                                rtems_semaphore_create(         \
214                                                                                rtems_build_name(       \
215                                                                                        'R','P','C','l' \
216                                                                                        ),                              \
217                                                                                1,                                      \
218                                                                                MUTEX_ATTRIBUTES,       \
219                                                                                0,                                      \
220                                                                                (pmutex)) );            \
221                                                        } while (0)
222
223
224#define MU_DESTROY(mutex)       do {                                                    \
225                                                        assert(                                                 \
226                                                                RTEMS_SUCCESSFUL ==                     \
227                                                                rtems_semaphore_delete(         \
228                                                                                mutex                           \
229                                                                                ) );                            \
230                                                        } while (0)
231
232#define MUTEX_ATTRIBUTES        (RTEMS_LOCAL           |                \
233                                                        RTEMS_PRIORITY         |                \
234                                                        RTEMS_INHERIT_PRIORITY |                \
235                                                        RTEMS_BINARY_SEMAPHORE)
236
237#define FIRST_ATTEMPT           0x88888888 /* some time that is never reached */
238
239/****************************************************************/
240/* TYPE DEFINITIONS                                             */
241/****************************************************************/
242
243typedef rtems_interval          TimeoutT;
244
245/* 100000th implementation of a doubly linked list;
246 * since only one thread is looking at these,
247 * we need no locking
248 */
249typedef struct ListNodeRec_ {
250        struct ListNodeRec_ *next, *prev;
251} ListNodeRec, *ListNode;
252
253
254/* Structure representing an RPC server */
255typedef struct RpcUdpServerRec_ {
256                RpcUdpServer            next;                   /* linked list of all servers; protected by hlock */
257                union {
258                struct sockaddr_in      sin;
259                struct sockaddr     sa;
260                }                                       addr;
261                AUTH                            *auth;
262                rtems_id                        authlock;               /* must MUTEX the auth object - it's not clear
263                                                                                         *  what is better:
264                                                                                         *   1 having one (MUTEXed) auth per server
265                                                                                         *         who is shared among all transactions
266                                                                                         *         using that server
267                                                                                         *       2 maintaining an AUTH per transaction
268                                                                                         *         (there are then other options: manage
269                                                                                         *         XACT pools on a per-server basis instead
270                                                                                         *         of associating a server with a XACT when
271                                                                                         *   sending)
272                                                                                         * experience will show if the current (1)
273                                                                                         * approach has to be changed.
274                                                                                         */
275                TimeoutT                        retry_period;   /* dynamically adjusted retry period
276                                                                                         * (based on packet roundtrip time)
277                                                                                         */
278                /* STATISTICS */
279                unsigned long           retrans;                /* how many retries were issued by this server         */
280                unsigned long           requests;               /* how many requests have been sent                    */
281                unsigned long       timeouts;           /* how many requests have timed out                    */
282                unsigned long       errors;         /* how many errors have occurred (other than timeouts) */
283                char                            name[20];               /* server's address in IP 'dot' notation               */
284} RpcUdpServerRec;
285
286typedef union  RpcBufU_ {
287                u_long                          xid;
288                char                            buf[1];
289} RpcBufU, *RpcBuf;
290
291/* RX Buffer implementation; this is either
292 * an MBUF chain (MBUF_RX configuration)
293 * or a buffer allocated from the heap
294 * where recvfrom copies the (encoded) reply
295 * to. The XDR routines the copy/decode
296 * it into the user's data structures.
297 */
298#ifdef MBUF_RX
299typedef struct mbuf *           RxBuf;  /* an MBUF chain */
300static  void                            bufFree(struct mbuf **m);
301#define XID(ibuf)                       (*(mtod((ibuf), u_long *)))
302extern void                             xdrmbuf_create(XDR *, struct mbuf *, enum xdr_op);
303#else
304typedef RpcBuf                          RxBuf;
305#define bufFree(b)                      do { MY_FREE(*(b)); *(b)=0; } while(0)
306#define XID(ibuf)                       ((ibuf)->xid)
307#endif
308
309/* A RPC 'transaction' consisting
310 * of server and requestor information,
311 * buffer space and an XDR object
312 * (for encoding arguments).
313 */
314typedef struct RpcUdpXactRec_ {
315                ListNodeRec                     node;           /* so we can put XACTs on a list                */
316                RpcUdpServer            server;         /* server this XACT goes to                     */
317                long                            lifetime;       /* during the lifetime, retry attempts are made */
318                long                            tolive;         /* lifetime timer                               */
319                struct rpc_err          status;         /* RPC reply error status                       */
320                long                            age;            /* age info; needed to manage retransmission    */
321                long                            trip;           /* record round trip time in ticks              */
322                rtems_id                        requestor;      /* the task waiting for this XACT to complete   */
323                RpcUdpXactPool          pool;           /* if this XACT belong to a pool, this is it    */
324                XDR                                     xdrs;           /* argument encoder stream                      */
325                int                                     xdrpos;     /* stream position after the (permanent) header */
326                xdrproc_t                       xres;           /* reply decoder proc - TODO needn't be here    */
327                caddr_t                         pres;           /* reply decoded obj  - TODO needn't be here    */
328#ifndef MBUF_RX
329                int                                     ibufsize;       /* size of the ibuf (bytes)                     */
330#endif
331#ifdef  MBUF_TX
332                int                                     refcnt;         /* mbuf external storage reference count        */
333#endif
334                int                                     obufsize;       /* size of the obuf (bytes)                     */
335                RxBuf                           ibuf;           /* pointer to input buffer assigned by daemon   */
336                RpcBufU                         obuf;       /* output buffer (encoded args) APPENDED HERE   */
337} RpcUdpXactRec;
338
339typedef struct RpcUdpXactPoolRec_ {
340        rtems_id        box;
341        int                     prog;
342        int                     version;
343        int                     xactSize;
344} RpcUdpXactPoolRec;
345
346/* a global hash table where all 'living' transaction
347 * objects are registered.
348 * A number of bits in a transaction's XID maps 1:1 to
349 * an index in this table. Hence, the XACT matching
350 * an RPC/UDP reply packet can quickly be found
351 * The size of this table imposes a hard limit on the
352 * number of all created transactions in the system.
353 */
354static RpcUdpXact xactHashTbl[XACT_HASHS]={0};
355
356/* forward declarations */
357static RpcUdpXact
358sockRcv(void);
359
360static void
361rpcio_daemon(rtems_task_argument);
362
363#ifdef MBUF_TX
364ssize_t
365sendto_nocpy (
366                int s,
367                const void *buf, size_t buflen,
368                int flags,
369                const struct sockaddr *toaddr, int tolen,
370                void *closure,
371                void (*freeproc)(caddr_t, u_int),
372                void (*refproc)(caddr_t, u_int)
373);
374static void paranoia_free(caddr_t closure, u_int size);
375static void paranoia_ref (caddr_t closure, u_int size);
376#define SENDTO  sendto_nocpy
377#else
378#define SENDTO  sendto
379#endif
380
381static RpcUdpServer             rpcUdpServers = 0;      /* linked list of all servers; protected by llock */
382
383static int                              ourSock = -1;           /* the socket we are using for communication */
384static rtems_id                 rpciod  = 0;            /* task id of the RPC daemon                 */
385static rtems_id                 msgQ    = 0;            /* message queue where the daemon picks up
386                                                                                         * requests
387                                                                                         */
388static rtems_id                 llock   = 0;            /* MUTEX protecting the server list */
389static rtems_id                 hlock   = 0;            /* MUTEX protecting the hash table and the list of servers */
390static rtems_id                 fini    = 0;            /* a synchronization semaphore we use during
391                                                                                         * module cleanup / driver unloading
392                                                                                         */
393static rtems_interval   ticksPerSec;            /* cached system clock rate (WHO IS ASSUMED NOT
394                                                                                         * TO CHANGE)
395                                                                                         */
396
397rtems_task_priority             rpciodPriority = 0;
398
399#if (DEBUG) & DEBUG_MALLOC
400/* malloc wrappers for debugging */
401static int nibufs = 0;
402
403static inline void *MY_MALLOC(int s)
404{
405        if (s) {
406                void *rval;
407                MU_LOCK(hlock);
408                assert(nibufs++ < 2000);
409                MU_UNLOCK(hlock);
410                assert(rval = malloc(s));
411                return rval;
412        }
413        return 0;
414}
415
416static inline void *MY_CALLOC(int n, int s)
417{
418        if (s) {
419                void *rval;
420                MU_LOCK(hlock);
421                assert(nibufs++ < 2000);
422                MU_UNLOCK(hlock);
423                assert(rval = calloc(n,s));
424                return rval;
425        }
426        return 0;
427}
428
429
430static inline void MY_FREE(void *p)
431{
432        if (p) {
433                MU_LOCK(hlock);
434                nibufs--;
435                MU_UNLOCK(hlock);
436                free(p);
437        }
438}
439#else
440#define MY_MALLOC       malloc
441#define MY_CALLOC       calloc
442#define MY_FREE         free
443#endif
444
445static inline bool_t
446locked_marshal(RpcUdpServer s, XDR *xdrs)
447{
448bool_t rval;
449        MU_LOCK(s->authlock);
450        rval = AUTH_MARSHALL(s->auth, xdrs);
451        MU_UNLOCK(s->authlock);
452        return rval;
453}
454
455/* Locked operations on a server's auth object */
456static inline bool_t
457locked_validate(RpcUdpServer s, struct opaque_auth *v)
458{
459bool_t rval;
460        MU_LOCK(s->authlock);
461        rval = AUTH_VALIDATE(s->auth, v);
462        MU_UNLOCK(s->authlock);
463        return rval;
464}
465
466static inline bool_t
467locked_refresh(RpcUdpServer s)
468{
469bool_t rval;
470        MU_LOCK(s->authlock);
471        rval = AUTH_REFRESH(s->auth);
472        MU_UNLOCK(s->authlock);
473        return rval;
474}
475
476/* Create a server object
477 *
478 */
479enum clnt_stat
480rpcUdpServerCreate(
481        struct sockaddr_in      *paddr,
482        int                                     prog,
483        int                                     vers,
484        u_long                          uid,
485        u_long                          gid,
486        RpcUdpServer            *psrv
487        )
488{
489RpcUdpServer    rval;
490u_short                 port;
491char                    hname[MAX_MACHINE_NAME + 1];
492int                             theuid, thegid;
493int                             thegids[NGRPS];
494gid_t                   gids[NGROUPS];
495int                             len,i;
496AUTH                    *auth;
497enum clnt_stat  pmap_err;
498struct pmap             pmaparg;
499
500        if ( gethostname(hname, MAX_MACHINE_NAME) ) {
501                fprintf(stderr,
502                                "RPCIO - error: I have no hostname ?? (%s)\n",
503                                strerror(errno));
504                return RPC_UNKNOWNHOST;
505        }
506
507        if ( (len = getgroups(NGROUPS, gids) < 0 ) ) {
508                fprintf(stderr,
509                                "RPCIO - error: I unable to get group ids (%s)\n",
510                                strerror(errno));
511                return RPC_FAILED;
512        }
513
514        if ( len > NGRPS )
515                len = NGRPS;
516
517        for (i=0; i<len; i++)
518                thegids[i] = (int)gids[i];
519
520        theuid = (int) ((RPCIOD_DEFAULT_ID == uid) ? geteuid() : uid);
521        thegid = (int) ((RPCIOD_DEFAULT_ID == gid) ? getegid() : gid);
522
523        if ( !(auth = authunix_create(hname, theuid, thegid, len, thegids)) ) {
524                fprintf(stderr,
525                                "RPCIO - error: unable to create RPC AUTH\n");
526                return RPC_FAILED;
527        }
528
529        /* if they specified no port try to ask the portmapper */
530        if (!paddr->sin_port) {
531
532                paddr->sin_port = htons(PMAPPORT);
533
534        pmaparg.pm_prog = prog;
535        pmaparg.pm_vers = vers;
536        pmaparg.pm_prot = IPPROTO_UDP;
537        pmaparg.pm_port = 0;  /* not needed or used */
538
539
540                /* dont use non-reentrant pmap_getport ! */
541
542                pmap_err = rpcUdpCallRp(
543                                                paddr,
544                                                PMAPPROG,
545                                                PMAPVERS,
546                                                PMAPPROC_GETPORT,
547                                                xdr_pmap,
548                                                &pmaparg,
549                                                xdr_u_short,
550                                                &port,
551                                                uid,
552                                                gid,
553                                                0);
554
555                if ( RPC_SUCCESS != pmap_err ) {
556                        paddr->sin_port = 0;
557                        return pmap_err;
558                }
559
560                paddr->sin_port = htons(port);
561        }
562
563        if (0==paddr->sin_port) {
564                        return RPC_PROGNOTREGISTERED;
565        }
566
567        rval                            = (RpcUdpServer)MY_MALLOC(sizeof(*rval));
568        memset(rval, 0, sizeof(*rval));
569
570        if (!inet_ntop(AF_INET, &paddr->sin_addr, rval->name, sizeof(rval->name)))
571                sprintf(rval->name,"?.?.?.?");
572        rval->addr.sin          = *paddr;
573
574        /* start with a long retransmission interval - it
575         * will be adapted dynamically
576         */
577        rval->retry_period  = RPCIOD_RETX_CAP_S * ticksPerSec;
578
579        rval->auth                      = auth;
580
581        MU_CREAT( &rval->authlock );
582
583        /* link into list */
584        MU_LOCK( llock );
585        rval->next = rpcUdpServers;
586        rpcUdpServers = rval;
587        MU_UNLOCK( llock );
588
589        *psrv                           = rval;
590        return RPC_SUCCESS;
591}
592
593void
594rpcUdpServerDestroy(RpcUdpServer s)
595{
596RpcUdpServer prev;
597        if (!s)
598                return;
599        /* we should probably verify (but how?) that nobody
600         * (at least: no outstanding XACTs) is using this
601         * server;
602         */
603
604        /* remove from server list */
605        MU_LOCK(llock);
606        prev = rpcUdpServers;
607        if ( s == prev ) {
608                rpcUdpServers = s->next;
609        } else {
610                for ( ; prev ; prev = prev->next) {
611                        if (prev->next == s) {
612                                prev->next = s->next;
613                                break;
614                        }
615                }
616        }
617        MU_UNLOCK(llock);
618
619        /* MUST have found it */
620        assert(prev);
621
622        auth_destroy(s->auth);
623
624        MU_DESTROY(s->authlock);
625        MY_FREE(s);
626}
627
628int
629rpcUdpStats(FILE *f)
630{
631RpcUdpServer s;
632
633        if (!f) f = stdout;
634
635        fprintf(f,"RPCIOD statistics:\n");
636
637        MU_LOCK(llock);
638        for (s = rpcUdpServers; s; s=s->next) {
639                fprintf(f,"\nServer -- %s:\n", s->name);
640                fprintf(f,"  requests    sent: %10ld, retransmitted: %10ld\n",
641                                                s->requests, s->retrans);
642                fprintf(f,"         timed out: %10ld,   send errors: %10ld\n",
643                                                s->timeouts, s->errors);
644                fprintf(f,"  current retransmission interval: %dms\n",
645                                                (unsigned)(s->retry_period * 1000 / ticksPerSec) );
646        }
647        MU_UNLOCK(llock);
648
649        return 0;
650}
651
652RpcUdpXact
653rpcUdpXactCreate(
654        u_long  program,
655        u_long  version,
656        u_long  size
657        )
658{
659RpcUdpXact              rval=0;
660struct rpc_msg  header;
661register int    i,j;
662
663        if (!size)
664                size = UDPMSGSIZE;
665        /* word align */
666        size = (size + 3) & ~3;
667
668        rval = (RpcUdpXact)MY_CALLOC(1,sizeof(*rval) - sizeof(rval->obuf) + size);
669
670        if (rval) {
671       
672                header.rm_xid             = 0;
673                header.rm_direction       = CALL;
674                header.rm_call.cb_rpcvers = RPC_MSG_VERSION;
675                header.rm_call.cb_prog    = program;
676                header.rm_call.cb_vers    = version;
677                xdrmem_create(&(rval->xdrs), rval->obuf.buf, size, XDR_ENCODE);
678
679                if (!xdr_callhdr(&(rval->xdrs), &header)) {
680                        MY_FREE(rval);
681                        return 0;
682                }
683                /* pick a free table slot and initialize the XID */
684                rval->obuf.xid = time(0) ^ (unsigned long)rval;
685                MU_LOCK(hlock);
686                i=j=(rval->obuf.xid & XACT_HASH_MSK);
687                if (msgQ) {
688                        /* if there's no message queue, refuse to
689                         * give them transactions; we might be in the process to
690                         * go away...
691                         */
692                        do {
693                                i=(i+1) & XACT_HASH_MSK; /* cheap modulo */
694                                if (!xactHashTbl[i]) {
695#if (DEBUG) & DEBUG_TRACE_XACT
696                                        fprintf(stderr,"RPCIO: entering index %i, val %x\n",i,rval);
697#endif
698                                        xactHashTbl[i]=rval;
699                                        j=-1;
700                                        break;
701                                }
702                        } while (i!=j);
703                }
704                MU_UNLOCK(hlock);
705                if (i==j) {
706                        XDR_DESTROY(&rval->xdrs);
707                        MY_FREE(rval);
708                        return 0;
709                }
710                rval->obuf.xid  = (rval->obuf.xid << LD_XACT_HASH) | i;
711                rval->xdrpos    = XDR_GETPOS(&(rval->xdrs));
712                rval->obufsize  = size;
713        }
714        return rval;
715}
716
717void
718rpcUdpXactDestroy(RpcUdpXact xact)
719{
720int i = xact->obuf.xid & XACT_HASH_MSK;
721
722#if (DEBUG) & DEBUG_TRACE_XACT
723                fprintf(stderr,"RPCIO: removing index %i, val %x\n",i,xact);
724#endif
725
726                ASSERT( xactHashTbl[i]==xact );
727
728                MU_LOCK(hlock);
729                xactHashTbl[i]=0;
730                MU_UNLOCK(hlock);
731
732                bufFree(&xact->ibuf);
733
734                XDR_DESTROY(&xact->xdrs);
735                MY_FREE(xact);
736}
737
738
739
740/* Send a transaction, i.e. enqueue it to the
741 * RPC daemon who will actually send it.
742 */
743enum clnt_stat
744rpcUdpSend(
745        RpcUdpXact              xact,
746        RpcUdpServer    srvr,
747        struct timeval  *timeout,
748        u_long                  proc,
749        xdrproc_t               xres, caddr_t pres,
750        xdrproc_t               xargs, caddr_t pargs,
751        ...
752   )
753{
754register XDR    *xdrs;
755unsigned long   ms;
756va_list                 ap;
757
758        va_start(ap,pargs);
759
760        if (!timeout)
761                timeout = RPCIOD_DEFAULT_TIMEOUT;
762
763        ms = 1000 * timeout->tv_sec + timeout->tv_usec/1000;
764
765        /* round lifetime to closest # of ticks */
766        xact->lifetime  = (ms * ticksPerSec + 500) / 1000;
767        if ( 0 == xact->lifetime )
768                xact->lifetime = 1;
769
770#if (DEBUG) & DEBUG_TIMEOUT
771        {
772        static int once=0;
773        if (!once++) {
774                fprintf(stderr,
775                                "Initial lifetime: %i (ticks)\n",
776                                xact->lifetime);
777        }
778        }
779#endif
780
781        xact->tolive    = xact->lifetime;
782
783        xact->xres      = xres;
784        xact->pres      = pres;
785        xact->server    = srvr;
786
787        xdrs            = &xact->xdrs;
788        xdrs->x_op      = XDR_ENCODE;
789        /* increment transaction ID */
790        xact->obuf.xid += XACT_HASHS;
791        XDR_SETPOS(xdrs, xact->xdrpos);
792        if ( !XDR_PUTLONG(xdrs,(long*)&proc) || !locked_marshal(srvr, xdrs) ||
793                 !xargs(xdrs, pargs) ) {
794                va_end(ap);
795                return(xact->status.re_status=RPC_CANTENCODEARGS);
796        }
797        while ((xargs=va_arg(ap,xdrproc_t))) {
798                if (!xargs(xdrs, va_arg(ap,caddr_t)))
799                va_end(ap);
800                return(xact->status.re_status=RPC_CANTENCODEARGS);
801        }
802
803        va_end(ap);
804
805        rtems_task_ident(RTEMS_SELF, RTEMS_WHO_AM_I, &xact->requestor);
806        if ( rtems_message_queue_send( msgQ, &xact, sizeof(xact)) ) {
807                return RPC_CANTSEND;
808        }
809        /* wakeup the rpciod */
810        ASSERT( RTEMS_SUCCESSFUL==rtems_event_send(rpciod, RPCIOD_TX_EVENT) );
811
812        return RPC_SUCCESS;
813}
814
815/* Block for the RPC reply to an outstanding
816 * transaction.
817 * The caller is woken by the RPC daemon either
818 * upon reception of the reply or on timeout.
819 */
820enum clnt_stat
821rpcUdpRcv(RpcUdpXact xact)
822{
823int                                     refresh;
824XDR                                     reply_xdrs;
825struct rpc_msg          reply_msg;
826rtems_event_set         gotEvents;
827
828        refresh = 0;
829
830        do {
831
832        /* block for the reply */
833        ASSERT( RTEMS_SUCCESSFUL ==
834                        rtems_event_receive(
835                                        RTEMS_RPC_EVENT,
836                                        RTEMS_WAIT | RTEMS_EVENT_ANY,
837                                        RTEMS_NO_TIMEOUT,
838                                        &gotEvents) );
839
840        if (xact->status.re_status) {
841#ifdef MBUF_RX
842                /* add paranoia */
843                ASSERT( !xact->ibuf );
844#endif
845                return xact->status.re_status;
846        }
847
848#ifdef MBUF_RX
849        xdrmbuf_create(&reply_xdrs, xact->ibuf, XDR_DECODE);
850#else
851        xdrmem_create(&reply_xdrs, xact->ibuf->buf, xact->ibufsize, XDR_DECODE);
852#endif
853
854        reply_msg.acpted_rply.ar_verf          = _null_auth;
855        reply_msg.acpted_rply.ar_results.where = xact->pres;
856        reply_msg.acpted_rply.ar_results.proc  = xact->xres;
857
858        if (xdr_replymsg(&reply_xdrs, &reply_msg)) {
859                /* OK */
860                _seterr_reply(&reply_msg, &xact->status);
861                if (RPC_SUCCESS == xact->status.re_status) {
862                        if ( !locked_validate(xact->server,
863                                                                &reply_msg.acpted_rply.ar_verf) ) {
864                                xact->status.re_status = RPC_AUTHERROR;
865                                xact->status.re_why    = AUTH_INVALIDRESP;
866                        }
867                        if (reply_msg.acpted_rply.ar_verf.oa_base) {
868                                reply_xdrs.x_op = XDR_FREE;
869                                xdr_opaque_auth(&reply_xdrs, &reply_msg.acpted_rply.ar_verf);
870                        }
871                        refresh = 0;
872                } else {
873                        /* should we try to refresh our credentials ? */
874                        if ( !refresh ) {
875                                /* had never tried before */
876                                refresh = RPCIOD_REFRESH;
877                        }
878                }
879        } else {
880                reply_xdrs.x_op        = XDR_FREE;
881                xdr_replymsg(&reply_xdrs, &reply_msg);
882                xact->status.re_status = RPC_CANTDECODERES;
883        }
884        XDR_DESTROY(&reply_xdrs);
885
886        bufFree(&xact->ibuf);
887
888#ifndef MBUF_RX
889        xact->ibufsize = 0;
890#endif
891       
892        if (refresh && locked_refresh(xact->server)) {
893                rtems_task_ident(RTEMS_SELF, RTEMS_WHO_AM_I, &xact->requestor);
894                if ( rtems_message_queue_send(msgQ, &xact, sizeof(xact)) ) {
895                        return RPC_CANTSEND;
896                }
897                /* wakeup the rpciod */
898                fprintf(stderr,"RPCIO INFO: refreshing my AUTH\n");
899                ASSERT( RTEMS_SUCCESSFUL==rtems_event_send(rpciod, RPCIOD_TX_EVENT) );
900        }
901
902        } while ( 0 &&  refresh-- > 0 );
903
904        return xact->status.re_status;
905}
906
907
908/* On RTEMS, I'm told to avoid select(); this seems to
909 * be more efficient
910 */
911static void
912rxWakeupCB(struct socket *sock, caddr_t arg)
913{
914rtems_event_send((rtems_id)arg, RPCIOD_RX_EVENT);
915}
916
917int
918rpcUdpInit(void)
919{
920int                                     noblock = 1;
921struct sockwakeup       wkup;
922
923        if (ourSock < 0) {
924    fprintf(stderr,"RTEMS-RPCIOD $Release$, " \
925            "Till Straumann, Stanford/SLAC/SSRL 2002, " \
926            "See LICENSE file for licensing info.\n");
927
928                ourSock=socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
929                if (ourSock>=0) {
930                        bindresvport(ourSock,(struct sockaddr_in*)0);
931                        assert( 0==ioctl(ourSock, FIONBIO, (char*)&noblock) );
932                        /* assume nobody tampers with the clock !! */
933                        assert( RTEMS_SUCCESSFUL == rtems_clock_get(
934                                                                                        RTEMS_CLOCK_GET_TICKS_PER_SECOND,
935                                                                                        &ticksPerSec));
936                        MU_CREAT( &hlock );
937                        MU_CREAT( &llock );
938
939                        if ( !rpciodPriority ) {
940                                /* use configured networking priority */
941                                if ( ! (rpciodPriority = rtems_bsdnet_config.network_task_priority) )
942                                        rpciodPriority = RPCIOD_PRIO;   /* fallback value */
943                        }
944
945                        assert( RTEMS_SUCCESSFUL == rtems_task_create(
946                                                                                        rtems_build_name('R','P','C','d'),
947                                                                                        rpciodPriority,
948                                                                                        RPCIOD_STACK,
949                                                                                        RTEMS_DEFAULT_MODES,
950                                                                                        /* fprintf saves/restores FP registers on PPC :-( */
951                                                                                        RTEMS_DEFAULT_ATTRIBUTES | RTEMS_FLOATING_POINT,
952                                                                                        &rpciod) );
953                        wkup.sw_pfn = rxWakeupCB;
954                        wkup.sw_arg = (caddr_t)rpciod;
955                        assert( 0==setsockopt(ourSock, SOL_SOCKET, SO_RCVWAKEUP, &wkup, sizeof(wkup)) );
956                        assert( RTEMS_SUCCESSFUL == rtems_message_queue_create(
957                                                                                        rtems_build_name('R','P','C','q'),
958                                                                                        RPCIOD_QDEPTH,
959                                                                                        sizeof(RpcUdpXact),
960                                                                                        RTEMS_DEFAULT_ATTRIBUTES,
961                                                                                        &msgQ) );
962                        assert( RTEMS_SUCCESSFUL == rtems_task_start(
963                                                                                        rpciod,
964                                                                                        rpcio_daemon,
965                                                                                        0 ) );
966
967                } else {
968                        return -1;
969                }
970        }
971        return 0;
972}
973
974int
975rpcUdpCleanup(void)
976{
977        rtems_semaphore_create(
978                        rtems_build_name('R','P','C','f'),
979                        0,
980                        RTEMS_DEFAULT_ATTRIBUTES,
981                        0,
982                        &fini);
983        rtems_event_send(rpciod, RPCIOD_KILL_EVENT);
984        /* synchronize with daemon */
985        rtems_semaphore_obtain(fini, RTEMS_WAIT, 5*ticksPerSec);
986        /* if the message queue is still there, something went wrong */
987        if (!msgQ) {
988                rtems_task_delete(rpciod);
989        }
990        rtems_semaphore_delete(fini);
991        return (msgQ !=0);
992}
993
994/* Another API - simpler but less efficient.
995 * For each RPCall, a server and a Xact
996 * are created and destroyed on the fly.
997 *
998 * This should be used for infrequent calls
999 * (e.g. a NFS mount request).
1000 *
1001 * This is roughly compatible with the original
1002 * clnt_call() etc. API - but it uses our
1003 * daemon and is fully reentrant.
1004 */
1005enum clnt_stat
1006rpcUdpClntCreate(
1007                struct sockaddr_in *psaddr,
1008                int                                     prog,
1009                int                                     vers,
1010                u_long                          uid,
1011                u_long                          gid,
1012                RpcUdpClnt                      *pclnt
1013)
1014{
1015RpcUdpXact              x;
1016RpcUdpServer    s;
1017enum clnt_stat  err;
1018
1019        if ( RPC_SUCCESS != (err=rpcUdpServerCreate(psaddr, prog, vers, uid, gid, &s)) )
1020                return err;
1021
1022        if ( !(x=rpcUdpXactCreate(prog, vers, UDPMSGSIZE)) ) {
1023                rpcUdpServerDestroy(s);
1024                return RPC_FAILED;
1025        }
1026        /* TODO: could maintain a server cache */
1027
1028        x->server = s;
1029       
1030        *pclnt = x;
1031
1032        return RPC_SUCCESS;
1033}
1034
1035void
1036rpcUdpClntDestroy(RpcUdpClnt xact)
1037{
1038        rpcUdpServerDestroy(xact->server);
1039        rpcUdpXactDestroy(xact);
1040}
1041
1042enum clnt_stat
1043rpcUdpClntCall(
1044        RpcUdpClnt              xact,
1045        u_long                  proc,
1046        XdrProcT                xargs,
1047        CaddrT                  pargs,
1048        XdrProcT                xres,
1049        CaddrT                  pres,
1050        struct timeval  *timeout
1051        )
1052{
1053enum clnt_stat  stat;
1054
1055                if ( (stat = rpcUdpSend(xact, xact->server, timeout, proc,
1056                                                                xres, pres,
1057                                                                xargs, pargs,
1058                                                                0)) ) {
1059                        fprintf(stderr,"RPCIO Send failed: %i\n",stat);
1060                        return stat;
1061                }
1062                return rpcUdpRcv(xact);
1063}
1064
1065/* a yet simpler interface */
1066enum clnt_stat
1067rpcUdpCallRp(
1068        struct sockaddr_in      *psrvr,
1069        u_long                          prog,
1070        u_long                          vers,
1071        u_long                          proc,
1072        XdrProcT                        xargs,
1073        CaddrT                          pargs,
1074        XdrProcT                        xres,
1075        CaddrT                          pres,
1076        u_long                          uid,            /* RPCIO_DEFAULT_ID picks default */
1077        u_long                          gid,            /* RPCIO_DEFAULT_ID picks default */
1078        struct timeval          *timeout        /* NULL picks default           */
1079)
1080{
1081RpcUdpClnt                      clp;
1082enum clnt_stat          stat;
1083
1084        stat = rpcUdpClntCreate(
1085                                psrvr,
1086                                prog,
1087                                vers,
1088                                uid,
1089                                gid,
1090                                &clp);
1091
1092        if ( RPC_SUCCESS != stat )
1093                return stat;
1094
1095        stat = rpcUdpClntCall(
1096                                clp,
1097                                proc,
1098                                xargs, pargs,
1099                                xres,  pres,
1100                                timeout);
1101
1102        rpcUdpClntDestroy(clp);
1103
1104        return stat;
1105}
1106
1107/* linked list primitives */
1108static void
1109nodeXtract(ListNode n)
1110{
1111        if (n->prev)
1112                n->prev->next = n->next;
1113        if (n->next)
1114                n->next->prev = n->prev;
1115        n->next = n->prev = 0;
1116}
1117
1118static void
1119nodeAppend(ListNode l, ListNode n)
1120{
1121        if ( (n->next = l->next) )
1122                n->next->prev = n;
1123        l->next = n;
1124        n->prev = l;
1125       
1126}
1127
1128/* this code does the work */
1129static void
1130rpcio_daemon(rtems_task_argument arg)
1131{
1132rtems_status_code stat;
1133RpcUdpXact        xact;
1134RpcUdpServer      srv;
1135rtems_interval    next_retrans, then, unow;
1136long                      now;  /* need to do signed comparison with age! */
1137rtems_event_set   events;
1138ListNode          newList;
1139size_t            size;
1140rtems_id          q          =  0;
1141ListNodeRec       listHead   = {0};
1142unsigned long     epoch      = RPCIOD_EPOCH_SECS * ticksPerSec;
1143unsigned long     max_period = RPCIOD_RETX_CAP_S * ticksPerSec;
1144
1145        assert( RTEMS_SUCCESSFUL == rtems_clock_get(
1146                                                                        RTEMS_CLOCK_GET_TICKS_SINCE_BOOT,
1147                                                                        &then) );
1148
1149        for (next_retrans = epoch;;) {
1150
1151                if ( RTEMS_SUCCESSFUL !=
1152                         (stat = rtems_event_receive(
1153                                                RPCIOD_RX_EVENT | RPCIOD_TX_EVENT | RPCIOD_KILL_EVENT,
1154                                                RTEMS_WAIT | RTEMS_EVENT_ANY,
1155                                                next_retrans,
1156                                                &events)) ) {
1157                        ASSERT( RTEMS_TIMEOUT == stat );
1158                        events = 0;
1159                }
1160
1161                if (events & RPCIOD_KILL_EVENT) {
1162                        int i;
1163
1164#if (DEBUG) & DEBUG_EVENTS
1165                        fprintf(stderr,"RPCIO: got KILL event\n");
1166#endif
1167
1168                        MU_LOCK(hlock);
1169                        for (i=XACT_HASHS-1; i>=0; i--) {
1170                                if (xactHashTbl[i]) {
1171                                        break;
1172                                }
1173                        }
1174                        if (i<0) {
1175                                /* prevent them from creating and enqueueing more messages */
1176                                q=msgQ;
1177                                /* messages queued after we executed this assignment will fail */
1178                                msgQ=0;
1179                        }
1180                        MU_UNLOCK(hlock);
1181                        if (i>=0) {
1182                                fprintf(stderr,"RPCIO There are still transactions circulating; I refuse to go away\n");
1183                                fprintf(stderr,"(1st in slot %i)\n",i);
1184                                rtems_semaphore_release(fini);
1185                        } else {
1186                                break;
1187                        }
1188                }
1189
1190                ASSERT( RTEMS_SUCCESSFUL == rtems_clock_get(
1191                                                                                RTEMS_CLOCK_GET_TICKS_SINCE_BOOT,
1192                                                                                &unow ) );
1193
1194                /* measure everything relative to then to protect against
1195                 * rollover
1196                 */
1197                now = unow - then;
1198
1199                /* NOTE: we don't lock the hash table while we are operating
1200                 * on transactions; the paradigm is that we 'own' a particular
1201                 * transaction (and hence it's hash table slot) from the
1202                 * time the xact was put into the message queue until we
1203                 * wake up the requestor.
1204                 */
1205
1206                if (RPCIOD_RX_EVENT & events) {
1207
1208#if (DEBUG) & DEBUG_EVENTS
1209                        fprintf(stderr,"RPCIO: got RX event\n");
1210#endif
1211
1212                        while ((xact=sockRcv())) {
1213
1214                                /* extract from the retransmission list */
1215                                nodeXtract(&xact->node);
1216
1217                                /* change the ID - there might already be
1218                                 * a retransmission on the way. When it's
1219                                 * reply arrives we must not find it's ID
1220                                 * in the hashtable
1221                                 */
1222                                xact->obuf.xid        += XACT_HASHS;
1223
1224                                xact->status.re_status = RPC_SUCCESS;
1225
1226                                /* calculate roundtrip ticks */
1227                                xact->trip             = now - xact->trip;
1228
1229                                srv                    = xact->server;
1230
1231                                /* adjust the server's retry period */
1232                                {
1233                                        register TimeoutT rtry = srv->retry_period;
1234                                        register TimeoutT trip = xact->trip;
1235
1236                                        ASSERT( trip >= 0 );
1237
1238                                        if ( 0==trip )
1239                                                trip = 1;
1240
1241                                        /* retry_new = 0.75*retry_old + 0.25 * 8 * roundrip */
1242                                        rtry   = (3*rtry + (trip << 3)) >> 2;
1243
1244                                        if ( rtry > max_period )
1245                                                rtry = max_period;
1246
1247                                        srv->retry_period = rtry;
1248                                }
1249
1250                                /* wakeup requestor */
1251                                rtems_event_send(xact->requestor, RTEMS_RPC_EVENT);
1252                        }
1253                }
1254
1255                if (RPCIOD_TX_EVENT & events) {
1256
1257#if (DEBUG) & DEBUG_EVENTS
1258                        fprintf(stderr,"RPCIO: got TX event\n");
1259#endif
1260
1261                        while (RTEMS_SUCCESSFUL == rtems_message_queue_receive(
1262                                                                                        msgQ,
1263                                                                                        &xact,
1264                                                                                        &size,
1265                                                                                        RTEMS_NO_WAIT,
1266                                                                                        RTEMS_NO_TIMEOUT)) {
1267                                /* put to the head of timeout q */
1268                                nodeAppend(&listHead, &xact->node);
1269
1270                                xact->age  = now;
1271                                xact->trip = FIRST_ATTEMPT;
1272                        }
1273                }
1274
1275
1276                /* work the timeout q */
1277                newList = 0;
1278                for ( xact=(RpcUdpXact)listHead.next;
1279                          xact && xact->age <= now;
1280                          xact=(RpcUdpXact)listHead.next ) {
1281
1282                                /* extract from the list */
1283                                nodeXtract(&xact->node);
1284
1285                                srv = xact->server;
1286
1287                                if (xact->tolive < 0) {
1288                                        /* this one timed out */
1289                                        xact->status.re_errno  = ETIMEDOUT;
1290                                        xact->status.re_status = RPC_TIMEDOUT;
1291
1292                                        srv->timeouts++;
1293
1294                                        /* Change the ID - there might still be
1295                                         * a reply on the way. When it arrives we
1296                                         * must not find it's ID in the hash table
1297                                         *
1298                                         * Thanks to Steven Johnson for hunting this
1299                                         * one down.
1300                                         */
1301                                        xact->obuf.xid        += XACT_HASHS;
1302
1303#if (DEBUG) & DEBUG_TIMEOUT
1304                                        fprintf(stderr,"RPCIO XACT timed out; waking up requestor\n");
1305#endif
1306                                        if ( rtems_event_send(xact->requestor, RTEMS_RPC_EVENT) ) {
1307                                                rtems_panic("RPCIO PANIC file %s line: %i, requestor id was 0x%08x",
1308                                                                        __FILE__,
1309                                                                        __LINE__,
1310                                                                        xact->requestor);
1311                                        }       
1312
1313                                } else {
1314                                        int len;
1315
1316                                        len = (int)XDR_GETPOS(&xact->xdrs);
1317                               
1318#ifdef MBUF_TX
1319                                        xact->refcnt = 1;       /* sendto itself */
1320#endif
1321                                        if ( len != SENDTO( ourSock,
1322                                                                                xact->obuf.buf,
1323                                                                                len,
1324                                                                                0,
1325                                                                                &srv->addr.sa,
1326                                                                                sizeof(srv->addr.sin)
1327#ifdef MBUF_TX
1328                                                                                , xact,
1329                                                                                paranoia_free,
1330                                                                                paranoia_ref
1331#endif
1332                                                                                ) ) {
1333
1334                                                xact->status.re_errno  = errno;
1335                                                xact->status.re_status = RPC_CANTSEND;
1336                                                srv->errors++;
1337
1338                                                /* wakeup requestor */
1339                                                fprintf(stderr,"RPCIO: SEND failure\n");
1340                                                ASSERT( RTEMS_SUCCESSFUL ==
1341                                                                        rtems_event_send(xact->requestor, RTEMS_RPC_EVENT) );
1342
1343                                        } else {
1344                                                /* send successful; calculate retransmission time
1345                                                 * and enqueue to temporary list
1346                                                 */
1347                                                if (FIRST_ATTEMPT != xact->trip) {
1348#if (DEBUG) & DEBUG_TIMEOUT
1349                                                        fprintf(stderr,
1350                                                                "timed out; tolive is %i (ticks), retry period is %i (ticks)\n",
1351                                                                        xact->tolive,
1352                                                                        srv->retry_period);
1353#endif
1354                                                        /* this is a real retry; we backup
1355                                                         * the server's retry interval
1356                                                         */
1357                                                        if ( srv->retry_period < max_period ) {
1358
1359                                                                /* If multiple transactions for this server
1360                                                                 * fail (e.g. because it died) this will
1361                                                                 * back-off very agressively (doubling
1362                                                                 * the retransmission period for every
1363                                                                 * timed out transaction up to the CAP limit)
1364                                                                 * which is desirable - single packet failure
1365                                                                 * is treated more gracefully by this algorithm.
1366                                                                 */
1367
1368                                                                srv->retry_period<<=1;
1369#if (DEBUG) & DEBUG_TIMEOUT
1370                                                                fprintf(stderr,
1371                                                                                "adjusted to; retry period %i\n",
1372                                                                                srv->retry_period);
1373#endif
1374                                                        } else {
1375                                                                /* never wait longer than RPCIOD_RETX_CAP_S seconds */
1376                                                                fprintf(stderr,
1377                                                                                "RPCIO: server '%s' not responding - still trying\n",
1378                                                                                srv->name);
1379                                                        }
1380                                                        if ( 0 == ++srv->retrans % 1000) {
1381                                                                fprintf(stderr,
1382                                                                                "RPCIO - statistics: already %li retries to server %s\n",
1383                                                                                srv->retrans,
1384                                                                                srv->name);
1385                                                        }
1386                                                } else {
1387                                                        srv->requests++;
1388                                                }
1389                                                xact->trip      = now;
1390                                                {
1391                                                long capped_period = srv->retry_period;
1392                                                        if ( xact->lifetime < capped_period )
1393                                                                capped_period = xact->lifetime;
1394                                                xact->age       = now + capped_period;
1395                                                xact->tolive   -= capped_period;
1396                                                }
1397                                                /* enqueue to the list of newly sent transactions */
1398                                                xact->node.next = newList;
1399                                                newList         = &xact->node;
1400#if (DEBUG) & DEBUG_TIMEOUT
1401                                                fprintf(stderr,
1402                                                                "XACT (0x%08x) age is 0x%x, now: 0x%x\n",
1403                                                                xact,
1404                                                                xact->age,
1405                                                                now);
1406#endif
1407                                        }
1408                                }
1409            }
1410
1411                /* insert the newly sent transactions into the
1412                 * sorted retransmission list
1413                 */
1414                for (; (xact = (RpcUdpXact)newList); ) {
1415                        register ListNode p,n;
1416                        newList = newList->next;
1417                        for ( p=&listHead; (n=p->next) && xact->age > ((RpcUdpXact)n)->age; p=n )
1418                                /* nothing else to do */;
1419                        nodeAppend(p, &xact->node);
1420                }
1421
1422                if (now > epoch) {
1423                        /* every now and then, readjust the epoch */
1424                        register ListNode n;
1425                        then += now;
1426                        for (n=listHead.next; n; n=n->next) {
1427                                /* readjust outstanding time intervals subject to the
1428                                 * condition that the 'absolute' time must remain
1429                                 * the same. 'age' and 'trip' are measured with
1430                                 * respect to 'then' - hence:
1431                                 *
1432                                 * abs_age == old_age + old_then == new_age + new_then
1433                                 *
1434                                 * ==> new_age = old_age + old_then - new_then == old_age - 'now'
1435                                 */
1436                                ((RpcUdpXact)n)->age  -= now;
1437                                ((RpcUdpXact)n)->trip -= now;
1438#if (DEBUG) & DEBUG_TIMEOUT
1439                                fprintf(stderr,
1440                                                "readjusted XACT (0x%08x); age is 0x%x, trip: 0x%x now: 0x%x\n",
1441                                                (RpcUdpXact)n,
1442                                                ((RpcUdpXact)n)->trip,
1443                                                ((RpcUdpXact)n)->age,
1444                                                now);
1445#endif
1446                        }
1447                        now = 0;
1448                }
1449
1450                next_retrans = listHead.next ?
1451                                                        ((RpcUdpXact)listHead.next)->age - now :
1452                                                        epoch;  /* make sure we don't miss updating the epoch */
1453#if (DEBUG) & DEBUG_TIMEOUT
1454                fprintf(stderr,"RPCIO: next timeout is %x\n",next_retrans);
1455#endif
1456        }
1457        /* close our socket; shut down the receiver */
1458        close(ourSock);
1459
1460#if 0 /* if we get here, no transactions exist, hence there can be none
1461           * in the queue whatsoever
1462           */
1463        /* flush the message queue */
1464        while (RTEMS_SUCCESSFUL == rtems_message_queue_receive(
1465                                                                                q,
1466                                                                                &xact,
1467                                                                                &size,
1468                                                                                RTEMS_NO_WAIT,
1469                                                                                RTEMS_NO_TIMEOUT)) {
1470                        /* TODO enque xact */
1471        }
1472
1473        /* flush all outstanding transactions */
1474
1475        for (xact=((RpcUdpXact)listHead.next); xact; xact=((RpcUdpXact)xact->node.next)) {
1476                        xact->status.re_status = RPC_TIMEDOUT;
1477                        rtems_event_send(xact->requestor, RTEMS_RPC_EVENT);
1478        }
1479#endif
1480
1481        rtems_message_queue_delete(q);
1482
1483        MU_DESTROY(hlock);
1484
1485        fprintf(stderr,"RPC daemon exited...\n");
1486
1487        rtems_semaphore_release(fini);
1488        rtems_task_suspend(RTEMS_SELF);
1489}
1490
1491
1492/* support for transaction 'pools'. A number of XACT objects
1493 * is always kept around. The initial number is 0 but it
1494 * is allowed to grow up to a maximum.
1495 * If the need grows beyond the maximum, behavior depends:
1496 * Users can either block until a transaction becomes available,
1497 * they can create a new XACT on the fly or get an error
1498 * if no free XACT is available from the pool.
1499 */
1500
1501RpcUdpXactPool
1502rpcUdpXactPoolCreate(
1503        int prog,               int version,
1504        int xactsize,   int poolsize)
1505{
1506RpcUdpXactPool  rval = MY_MALLOC(sizeof(*rval));
1507
1508        ASSERT( rval &&
1509                        RTEMS_SUCCESSFUL == rtems_message_queue_create(
1510                                                                        rtems_build_name('R','P','C','p'),
1511                                                                        poolsize,
1512                                                                        sizeof(RpcUdpXact),
1513                                                                        RTEMS_DEFAULT_ATTRIBUTES,
1514                                                                        &rval->box) );
1515        rval->prog     = prog;
1516        rval->version  = version;
1517        rval->xactSize = xactsize;
1518        return rval;
1519}
1520
1521void
1522rpcUdpXactPoolDestroy(RpcUdpXactPool pool)
1523{
1524RpcUdpXact xact;
1525
1526        while ((xact = rpcUdpXactPoolGet(pool, XactGetFail))) {
1527                rpcUdpXactDestroy(xact);
1528        }
1529        rtems_message_queue_delete(pool->box);
1530        MY_FREE(pool);
1531}
1532
1533RpcUdpXact
1534rpcUdpXactPoolGet(RpcUdpXactPool pool, XactPoolGetMode mode)
1535{
1536RpcUdpXact       xact = 0;
1537size_t           size;
1538
1539        if (RTEMS_SUCCESSFUL != rtems_message_queue_receive(
1540                                                                pool->box,
1541                                                                &xact,
1542                                                                &size,
1543                                                                XactGetWait == mode ?
1544                                                                        RTEMS_WAIT : RTEMS_NO_WAIT,
1545                                                                RTEMS_NO_TIMEOUT)) {
1546
1547                /* nothing found in box; should we create a new one ? */
1548
1549                xact = (XactGetCreate == mode) ?
1550                                        rpcUdpXactCreate(
1551                                                        pool->prog,
1552                                                        pool->version,
1553                                                        pool->xactSize) : 0 ;
1554                if (xact)
1555                                xact->pool = pool;
1556
1557        }
1558        return xact;
1559}
1560
1561void
1562rpcUdpXactPoolPut(RpcUdpXact xact)
1563{
1564RpcUdpXactPool pool;
1565        ASSERT( pool=xact->pool );
1566        if (RTEMS_SUCCESSFUL != rtems_message_queue_send(
1567                                                                pool->box,
1568                                                                &xact,
1569                                                                sizeof(xact)))
1570                rpcUdpXactDestroy(xact);
1571}
1572
1573#ifdef MBUF_RX
1574
1575/* WORKAROUND: include sys/mbuf.h (or other bsdnet headers) only
1576 *             _after_ using malloc()/free() & friends because
1577 *             the RTEMS/BSDNET headers redefine those :-(
1578 */
1579
1580#define KERNEL
1581#define _KERNEL
1582#include <sys/mbuf.h>
1583
1584ssize_t
1585recv_mbuf_from(int s, struct mbuf **ppm, long len, struct sockaddr *fromaddr, int *fromlen);
1586
1587static void
1588bufFree(struct mbuf **m)
1589{
1590        if (*m) {
1591                rtems_bsdnet_semaphore_obtain();
1592                m_freem(*m);
1593                rtems_bsdnet_semaphore_release();
1594                *m = 0;
1595        }
1596}
1597#endif
1598
1599#ifdef MBUF_TX
1600static void
1601paranoia_free(caddr_t closure, u_int size)
1602{
1603#if (DEBUG)
1604RpcUdpXact xact = (RpcUdpXact)closure;
1605int        len  = (int)XDR_GETPOS(&xact->xdrs);
1606
1607        ASSERT( --xact->refcnt >= 0 && size == len );
1608#endif
1609}
1610
1611static void
1612paranoia_ref (caddr_t closure, u_int size)
1613{
1614#if (DEBUG)
1615RpcUdpXact xact = (RpcUdpXact)closure;
1616int        len  = (int)XDR_GETPOS(&xact->xdrs);
1617        ASSERT( size == len );
1618        xact->refcnt++;
1619#endif
1620}
1621#endif
1622
1623/* receive from a socket and find
1624 * the transaction corresponding to the
1625 * transaction ID received in the server
1626 * reply.
1627 *
1628 * The semantics of the 'pibuf' pointer are
1629 * as follows:
1630 *
1631 * MBUF_RX:
1632 *
1633 */
1634
1635#define RPCIOD_RXBUFSZ  UDPMSGSIZE
1636
1637static RpcUdpXact
1638sockRcv(void)
1639{
1640int                                     len,i;
1641u_long                          xid;
1642union {
1643        struct sockaddr_in      sin;
1644        struct sockaddr     sa;
1645}                                       fromAddr;
1646int                                     fromLen  = sizeof(fromAddr.sin);
1647RxBuf                           ibuf     = 0;
1648RpcUdpXact                      xact     = 0;
1649
1650        do {
1651
1652        /* rcv_mbuf() and recvfrom() differ in that the
1653         * former allocates buffers and passes them back
1654         * to us whereas the latter requires us to provide
1655         * buffer space.
1656         * Hence, in the first case whe have to make sure
1657         * no old buffer is leaked - in the second case,
1658         * we might well re-use an old buffer but must
1659         * make sure we have one allocated
1660         */
1661#ifdef MBUF_RX
1662        if (ibuf)
1663                bufFree(&ibuf);
1664
1665        len  = recv_mbuf_from(
1666                                        ourSock,
1667                                        &ibuf,
1668                                        RPCIOD_RXBUFSZ,
1669                                    &fromAddr.sa,
1670                                    &fromLen);
1671#else
1672        if ( !ibuf )
1673                ibuf = (RpcBuf)MY_MALLOC(RPCIOD_RXBUFSZ);
1674        if ( !ibuf )
1675                goto cleanup; /* no memory - drop this message */
1676
1677        len  = recvfrom(ourSock,
1678                                    ibuf->buf,
1679                                    RPCIOD_RXBUFSZ,
1680                                    0,
1681                                    &fromAddr.sa,
1682                                        &fromLen);
1683#endif
1684
1685        if (len <= 0) {
1686                if (EAGAIN != errno)
1687                        fprintf(stderr,"RECV failed: %s\n",strerror(errno));
1688                goto cleanup;
1689        }
1690
1691#if (DEBUG) & DEBUG_PACKLOSS
1692        if ( (unsigned)rand() < DEBUG_PACKLOSS_FRACT ) {
1693                /* lose packets once in a while */
1694                static int xxx = 0;
1695                if ( ++xxx % 16 == 0 )
1696                        fprintf(stderr,"DEBUG: dropped %i packets, so far...\n",xxx);
1697                if ( ibuf )
1698                        bufFree( &ibuf );
1699                continue;
1700        }
1701#endif
1702
1703        i = (xid=XID(ibuf)) & XACT_HASH_MSK;
1704
1705        if ( !(xact=xactHashTbl[i])                                             ||
1706                   xact->obuf.xid                     != xid                        ||
1707#ifdef REJECT_SERVERIP_MISMATCH
1708                   xact->server->addr.sin.sin_addr.s_addr != fromAddr.sin.sin_addr.s_addr       ||
1709#endif
1710                   xact->server->addr.sin.sin_port        != fromAddr.sin.sin_port ) {
1711
1712                if (xact) {
1713                        if (
1714#ifdef REJECT_SERVERIP_MISMATCH
1715                            xact->server->addr.sin.sin_addr.s_addr == fromAddr.sin.sin_addr.s_addr &&
1716#endif
1717                        xact->server->addr.sin.sin_port        == fromAddr.sin.sin_port        &&
1718                            ( xact->obuf.xid                   == xid + XACT_HASHS   ||
1719                                  xact->obuf.xid                   == xid + 2*XACT_HASHS    )
1720                                ) {
1721#ifndef DEBUG /* don't complain if it's just a late arrival of a retry */
1722                        fprintf(stderr,"RPCIO - FYI sockRcv(): dropping late/redundant retry answer\n");
1723#endif
1724                        } else {
1725                        fprintf(stderr,"RPCIO WARNING sockRcv(): transaction mismatch\n");
1726                        fprintf(stderr,"xact: xid  0x%08lx  -- got 0x%08lx\n",
1727                                                        xact->obuf.xid, xid);
1728                        fprintf(stderr,"xact: addr 0x%08lx  -- got 0x%08lx\n",
1729                                                        xact->server->addr.sin.sin_addr.s_addr,
1730                                                        fromAddr.sin.sin_addr.s_addr);
1731                        fprintf(stderr,"xact: port 0x%08x  -- got 0x%08x\n",
1732                                                        xact->server->addr.sin.sin_port,
1733                                                        fromAddr.sin.sin_port);
1734                        }
1735                } else {
1736                        fprintf(stderr,
1737                                        "RPCIO WARNING sockRcv(): got xid 0x%08lx but its slot is empty\n",
1738                                        xid);
1739                }
1740                /* forget about this one and try again */
1741                xact = 0;
1742        }
1743
1744        } while ( !xact );
1745
1746        xact->ibuf     = ibuf;
1747#ifndef MBUF_RX
1748        xact->ibufsize = RPCIOD_RXBUFSZ;
1749#endif
1750
1751        return xact;
1752
1753cleanup:
1754
1755        bufFree(&ibuf);
1756
1757        return 0;
1758}
1759
1760
1761#include <rtems/rtems_bsdnet_internal.h>
1762/* double check the event configuration; should probably globally
1763 * manage system events!!
1764 * We do this at the end of the file for the same reason we had
1765 * included mbuf.h only a couple of lines above - see comment up
1766 * there...
1767 */
1768#if RTEMS_RPC_EVENT & SOSLEEP_EVENT & SBWAIT_EVENT & NETISR_EVENTS
1769#error ILLEGAL EVENT CONFIGURATION
1770#endif
Note: See TracBrowser for help on using the repository browser.