source: rtems-tools/tester/rt/tftpy/TftpServer.py @ bf5fdab

5
Last change on this file since bf5fdab was bf5fdab, checked in by Chris Johns <chrisj@…>, on 11/27/18 at 00:24:38

tester/tftpd: Set the socket option to reuse.

  • Property mode set to 100644
File size: 11.6 KB
Line 
1# vim: ts=4 sw=4 et ai:
2# -*- coding: utf8 -*-
3"""This module implements the TFTP Server functionality. Instantiate an
4instance of the server, and then run the listen() method to listen for client
5requests. Logging is performed via the standard logging module, using the
6module-level 'tftpy.TftpServer' logger."""
7
8
9import socket, os, time
10import select
11import threading
12import logging
13from errno import EINTR
14from .TftpShared import *
15from .TftpPacketTypes import *
16from .TftpPacketFactory import TftpPacketFactory
17from .TftpContexts import TftpContextServer
18
19log = logging.getLogger('tftpy.TftpServer')
20
class TftpServer(TftpSession):
    """This class implements a tftp server object. Run the listen() method to
    listen for client requests.

    tftproot is the path to the tftproot directory to serve files from and/or
    write them to.

    dyn_file_func is a callable that takes a requested download
    path that is not present on the file system and must return either a
    file-like object to read from or None if the path should appear as not
    found. This permits the serving of dynamic content.

    upload_open is a callable that is triggered on every upload with the
    requested destination path and server context. It must either return a
    file-like object ready for writing or None if the path is invalid."""

    def __init__(self,
                 tftproot='/tftpboot',
                 dyn_file_func=None,
                 upload_open=None):
        """Record the configuration and validate it. No socket is opened
        here; that happens in listen().

        Raises TftpException if dyn_file_func or upload_open is supplied but
        is not callable, or if tftproot does not exist, is not a directory,
        or is not readable. A tftproot that is not writable only produces a
        warning."""
        self.listenip = None
        self.listenport = None
        self.sock = None
        # FIXME: What about multiple roots?
        self.root = os.path.abspath(tftproot)
        self.dyn_file_func = dyn_file_func
        self.upload_open = upload_open
        # A dict of sessions, where each session is keyed by a string like
        # ip:tid for the remote end.
        self.sessions = {}
        # A threading event to help threads synchronize with the server
        # is_running state.
        self.is_running = threading.Event()

        self.shutdown_gracefully = False
        self.shutdown_immediately = False

        for name in 'dyn_file_func', 'upload_open':
            attr = getattr(self, name)
            if attr and not callable(attr):
                raise TftpException("{} supplied, but it is not callable.".format(name))

        # Validate the root with guard clauses, most fundamental check first.
        if not os.path.exists(self.root):
            raise TftpException("The tftproot does not exist.")
        log.debug("tftproot %s does exist", self.root)

        if not os.path.isdir(self.root):
            raise TftpException("The tftproot must be a directory.")
        log.debug("tftproot %s is a directory", self.root)

        if not os.access(self.root, os.R_OK):
            raise TftpException("The tftproot must be readable")
        log.debug("tftproot %s is readable", self.root)

        # A read-only root is tolerated, so this is only a warning.
        if os.access(self.root, os.W_OK):
            log.debug("tftproot %s is writable", self.root)
        else:
            log.warning("The tftproot %s is not writable", self.root)

    def __del__(self):
        """Best-effort close of the listening socket when the server object
        is garbage collected."""
        # getattr() because __del__ can run on an instance whose __init__
        # never got as far as assigning self.sock.
        sock = getattr(self, 'sock', None)
        if sock is not None:
            try:
                sock.close()
            except Exception:
                # Cleanup errors must never escape from a finalizer.
                pass

    def listen(self, listenip="", listenport=DEF_TFTP_PORT,
               timeout=SOCK_TIMEOUT):
        """Start a server listening on the supplied interface and port, and
        run the receive loop until stop() is called.

        listenip defaults to all interfaces ('0.0.0.0') and listenport to
        DEF_TFTP_PORT. The port actually bound is stored in self.listenport.
        timeout is handed to every session context created for a client; the
        select() call in the main loop always waits at most SOCK_TIMEOUT.

        is_running is set once the socket is bound and cleared again when the
        loop exits after a shutdown request.

        Raises socket.error if the socket cannot be created or bound; in that
        case the socket is closed and self.sock is reset to None."""
        listenip = listenip or '0.0.0.0'
        log.info("Server requested on ip %s, port %s", listenip, listenport)
        try:
            # FIXME - sockets should be non-blocking
            self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            self.sock.bind((listenip, listenport))
            _, self.listenport = self.sock.getsockname()
        except socket.error:
            # Do not leave a half-configured socket behind; re-raise with the
            # original traceback.
            if self.sock is not None:
                self.sock.close()
                self.sock = None
            raise

        self.is_running.set()

        log.info("Starting receive loop...")
        while True:
            log.debug("shutdown_immediately is %s", self.shutdown_immediately)
            log.debug("shutdown_gracefully is %s", self.shutdown_gracefully)
            if self.shutdown_immediately:
                log.warning("Shutting down now. Session count: %d",
                            len(self.sessions))
                self.sock.close()
                for session in self.sessions.values():
                    session.end()
                # Must remain a dict: the rest of this class indexes it by
                # session key, including any later call to listen().
                self.sessions = {}
                break

            elif self.shutdown_gracefully:
                if not self.sessions:
                    log.warning("In graceful shutdown mode and all "
                                "sessions complete.")
                    self.sock.close()
                    break

            # Build the inputlist array of sockets to select() on: the main
            # socket first, then one socket per active session.
            inputlist = [self.sock]
            inputlist.extend(session.sock
                             for session in self.sessions.values())

            # Block until some socket has input on it.
            log.debug("Performing select on this inputlist: %s", inputlist)
            try:
                readyinput, _, _ = select.select(inputlist, [], [],
                                                 SOCK_TIMEOUT)
            except select.error as err:
                # select.error is an alias of OSError on Python 3 and is not
                # subscriptable there; args[0] carries the errno on both
                # Python 2 and Python 3.
                if err.args and err.args[0] == EINTR:
                    # Interrupted system call
                    log.debug("Interrupted syscall, retrying")
                    continue
                else:
                    raise

            # Keys of sessions to tear down at the end of this iteration.
            deletion_list = []

            # Handle the available data, if any. Maybe we timed-out.
            for readysock in readyinput:
                # Is the traffic on the main server socket? ie. new session?
                if readysock == self.sock:
                    log.debug("Data ready on our main socket")
                    data, (raddress, rport) = self.sock.recvfrom(MAX_BLKSIZE)

                    log.debug("Read %d bytes", len(data))

                    if self.shutdown_gracefully:
                        log.warning("Discarding data on main port, "
                                    "in graceful shutdown mode")
                        continue

                    # Forge a session key based on the client's IP and port,
                    # which should safely work through NAT.
                    key = "%s:%s" % (raddress, rport)

                    if key not in self.sessions:
                        log.debug("Creating new server context for "
                                  "session key = %s", key)
                        self.sessions[key] = TftpContextServer(raddress,
                                                               rport,
                                                               timeout,
                                                               self.root,
                                                               self.dyn_file_func,
                                                               self.upload_open)
                        try:
                            self.sessions[key].start(data)
                        except TftpException as err:
                            deletion_list.append(key)
                            log.error("Fatal exception thrown from "
                                      "session %s: %s", key, str(err))
                    else:
                        log.warning("received traffic on main socket for "
                                    "existing session??")
                    log.info("Currently handling these sessions:")
                    for session in self.sessions.values():
                        log.info("    %s", session)

                else:
                    # Must find the owner of this traffic.
                    for key in self.sessions:
                        session = self.sessions[key]
                        if readysock == session.sock:
                            log.debug("Matched input to session key %s", key)
                            try:
                                session.cycle()
                                # A state of None marks a finished transfer.
                                if session.state is None:
                                    log.info("Successful transfer.")
                                    deletion_list.append(key)
                            except TftpException as err:
                                deletion_list.append(key)
                                log.error("Fatal exception thrown from "
                                          "session %s: %s", key, str(err))
                            # Break out of for loop since we found the correct
                            # session.
                            break
                    else:
                        log.error("Can't find the owner for this packet. "
                                  "Discarding.")

            log.debug("Looping on all sessions to check for timeouts")
            now = time.time()
            for key in self.sessions:
                # Sessions already scheduled for removal are finished or
                # failed: their state may be None, so there is nothing to
                # resend, and queueing them twice would be wrong.
                if key in deletion_list:
                    continue
                session = self.sessions[key]
                try:
                    session.checkTimeout(now)
                except TftpTimeout as err:
                    log.error(str(err))
                    session.retry_count += 1
                    if session.retry_count >= TIMEOUT_RETRIES:
                        log.debug("hit max retries on %s, giving up", session)
                        deletion_list.append(key)
                    else:
                        log.debug("resending on session %s", session)
                        session.state.resendLast()

            log.debug("Iterating deletion list.")
            for key in deletion_list:
                log.info('')
                log.info("Session %s complete", key)
                if key in self.sessions:
                    log.debug("Gathering up metrics from session before deleting")
                    session = self.sessions[key]
                    session.end()
                    metrics = session.metrics
                    if metrics.duration == 0:
                        log.info("Duration too short, rate undetermined")
                    else:
                        log.info("Transferred %d bytes in %.2f seconds",
                                 metrics.bytes, metrics.duration)
                        log.info("Average rate: %.2f kbps", metrics.kbps)
                    log.info("%.2f bytes in resent data", metrics.resent_bytes)
                    log.info("%d duplicate packets", metrics.dupcount)
                    log.debug("Deleting session %s", key)
                    del self.sessions[key]
                    log.debug("Session list is now %s", self.sessions)
                else:
                    log.warning(
                        "Strange, session %s is not in the session list", key)

        self.is_running.clear()

        log.debug("server returning from while loop")
        # Reset both flags so the same object can be asked to listen() again.
        self.shutdown_gracefully = self.shutdown_immediately = False

    def stop(self, now=False):
        """Stop the server gracefully. Do not take any new transfers,
        but complete the existing ones. If now is True, drop everything
        and stop. Note, immediately will not interrupt the select loop, it
        will happen when the server returns on ready data, or a timeout.
        ie. SOCK_TIMEOUT"""
        if now:
            self.shutdown_immediately = True
        else:
            self.shutdown_gracefully = True
Note: See TracBrowser for help on using the repository browser.