1 | """This module implements the TFTP Server functionality. Instantiate an |
---|
2 | instance of the server, and then run the listen() method to listen for client |
---|
3 | requests. Logging is performed via a standard logging object set in |
---|
4 | TftpShared.""" |
---|
5 | |
---|
6 | from __future__ import absolute_import, division, print_function, unicode_literals |
---|
7 | import socket, os, time |
---|
8 | import select |
---|
9 | import threading |
---|
10 | from errno import EINTR |
---|
11 | from .TftpShared import * |
---|
12 | from .TftpPacketTypes import * |
---|
13 | from .TftpPacketFactory import TftpPacketFactory |
---|
14 | from .TftpContexts import TftpContextServer |
---|
15 | |
---|
16 | class TftpServer(TftpSession): |
---|
17 | """This class implements a tftp server object. Run the listen() method to |
---|
18 | listen for client requests. |
---|
19 | |
---|
20 | tftproot is the path to the tftproot directory to serve files from and/or |
---|
21 | write them to. |
---|
22 | |
---|
23 | dyn_file_func is a callable that takes a requested download |
---|
24 | path that is not present on the file system and must return either a |
---|
25 | file-like object to read from or None if the path should appear as not |
---|
26 | found. This permits the serving of dynamic content. |
---|
27 | |
---|
28 | upload_open is a callable that is triggered on every upload with the |
---|
29 | requested destination path and server context. It must either return a |
---|
30 | file-like object ready for writing or None if the path is invalid.""" |
---|
31 | |
---|
32 | def __init__(self, |
---|
33 | tftproot='/tftpboot', |
---|
34 | dyn_file_func=None, |
---|
35 | upload_open=None): |
---|
36 | self.listenip = None |
---|
37 | self.listenport = None |
---|
38 | self.sock = None |
---|
39 | # FIXME: What about multiple roots? |
---|
40 | self.root = os.path.abspath(tftproot) |
---|
41 | self.dyn_file_func = dyn_file_func |
---|
42 | self.upload_open = upload_open |
---|
43 | # A dict of sessions, where each session is keyed by a string like |
---|
44 | # ip:tid for the remote end. |
---|
45 | self.sessions = {} |
---|
46 | # A threading event to help threads synchronize with the server |
---|
47 | # is_running state. |
---|
48 | self.is_running = threading.Event() |
---|
49 | |
---|
50 | self.shutdown_gracefully = False |
---|
51 | self.shutdown_immediately = False |
---|
52 | |
---|
53 | for name in 'dyn_file_func', 'upload_open': |
---|
54 | attr = getattr(self, name) |
---|
55 | if attr and not callable(attr): |
---|
56 | raise TftpException, "%s supplied, but it is not callable." % ( |
---|
57 | name,) |
---|
58 | if os.path.exists(self.root): |
---|
59 | log.debug("tftproot %s does exist", self.root) |
---|
60 | if not os.path.isdir(self.root): |
---|
61 | raise TftpException("The tftproot must be a directory.") |
---|
62 | else: |
---|
63 | log.debug("tftproot %s is a directory" % self.root) |
---|
64 | if os.access(self.root, os.R_OK): |
---|
65 | log.debug("tftproot %s is readable" % self.root) |
---|
66 | else: |
---|
67 | raise TftpException("The tftproot must be readable") |
---|
68 | if os.access(self.root, os.W_OK): |
---|
69 | log.debug("tftproot %s is writable" % self.root) |
---|
70 | else: |
---|
71 | log.warning("The tftproot %s is not writable" % self.root) |
---|
72 | else: |
---|
73 | raise TftpException("The tftproot does not exist.") |
---|
74 | |
---|
75 | def listen(self, listenip="", listenport=DEF_TFTP_PORT, |
---|
76 | timeout=SOCK_TIMEOUT): |
---|
77 | """Start a server listening on the supplied interface and port. This |
---|
78 | defaults to INADDR_ANY (all interfaces) and UDP port 69. You can also |
---|
79 | supply a different socket timeout value, if desired.""" |
---|
80 | tftp_factory = TftpPacketFactory() |
---|
81 | |
---|
82 | # Don't use new 2.5 ternary operator yet |
---|
83 | # listenip = listenip if listenip else '0.0.0.0' |
---|
84 | if not listenip: listenip = '0.0.0.0' |
---|
85 | log.info("Server requested on ip %s, port %s" % (listenip, listenport)) |
---|
86 | try: |
---|
87 | # FIXME - sockets should be non-blocking |
---|
88 | self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) |
---|
89 | self.sock.bind((listenip, listenport)) |
---|
90 | _, self.listenport = self.sock.getsockname() |
---|
91 | except socket.error as err: |
---|
92 | # Reraise it for now. |
---|
93 | raise err |
---|
94 | |
---|
95 | self.is_running.set() |
---|
96 | |
---|
97 | log.info("Starting receive loop...") |
---|
98 | while True: |
---|
99 | log.debug("shutdown_immediately is %s" % self.shutdown_immediately) |
---|
100 | log.debug("shutdown_gracefully is %s" % self.shutdown_gracefully) |
---|
101 | if self.shutdown_immediately: |
---|
102 | log.warn("Shutting down now. Session count: %d" % |
---|
103 | len(self.sessions)) |
---|
104 | self.sock.close() |
---|
105 | for key in self.sessions: |
---|
106 | self.sessions[key].end() |
---|
107 | self.sessions = [] |
---|
108 | break |
---|
109 | |
---|
110 | elif self.shutdown_gracefully: |
---|
111 | if not self.sessions: |
---|
112 | log.warn("In graceful shutdown mode and all " |
---|
113 | "sessions complete.") |
---|
114 | self.sock.close() |
---|
115 | break |
---|
116 | |
---|
117 | # Build the inputlist array of sockets to select() on. |
---|
118 | inputlist = [] |
---|
119 | inputlist.append(self.sock) |
---|
120 | for key in self.sessions: |
---|
121 | inputlist.append(self.sessions[key].sock) |
---|
122 | |
---|
123 | # Block until some socket has input on it. |
---|
124 | log.debug("Performing select on this inputlist: %s", inputlist) |
---|
125 | try: |
---|
126 | readyinput, readyoutput, readyspecial = \ |
---|
127 | select.select(inputlist, [], [], timeout) |
---|
128 | except select.error as err: |
---|
129 | if err[0] == EINTR: |
---|
130 | # Interrupted system call |
---|
131 | log.debug("Interrupted syscall, retrying") |
---|
132 | continue |
---|
133 | else: |
---|
134 | raise |
---|
135 | |
---|
136 | deletion_list = [] |
---|
137 | |
---|
138 | # Handle the available data, if any. Maybe we timed-out. |
---|
139 | for readysock in readyinput: |
---|
140 | # Is the traffic on the main server socket? ie. new session? |
---|
141 | if readysock == self.sock: |
---|
142 | log.debug("Data ready on our main socket") |
---|
143 | buffer, (raddress, rport) = self.sock.recvfrom(MAX_BLKSIZE) |
---|
144 | |
---|
145 | log.debug("Read %d bytes", len(buffer)) |
---|
146 | |
---|
147 | if self.shutdown_gracefully: |
---|
148 | log.warn("Discarding data on main port, " |
---|
149 | "in graceful shutdown mode") |
---|
150 | continue |
---|
151 | |
---|
152 | # Forge a session key based on the client's IP and port, |
---|
153 | # which should safely work through NAT. |
---|
154 | key = "%s:%s" % (raddress, rport) |
---|
155 | |
---|
156 | if not key in self.sessions: |
---|
157 | log.debug("Creating new server context for " |
---|
158 | "session key = %s" % key) |
---|
159 | self.sessions[key] = TftpContextServer(raddress, |
---|
160 | rport, |
---|
161 | timeout, |
---|
162 | self.root, |
---|
163 | self.dyn_file_func, |
---|
164 | self.upload_open) |
---|
165 | try: |
---|
166 | self.sessions[key].start(buffer) |
---|
167 | except TftpException as err: |
---|
168 | deletion_list.append(key) |
---|
169 | log.error("Fatal exception thrown from " |
---|
170 | "session %s: %s" % (key, str(err))) |
---|
171 | else: |
---|
172 | log.warn("received traffic on main socket for " |
---|
173 | "existing session??") |
---|
174 | log.info("Currently handling these sessions:") |
---|
175 | for session_key, session in self.sessions.items(): |
---|
176 | log.info(" %s" % session) |
---|
177 | |
---|
178 | else: |
---|
179 | # Must find the owner of this traffic. |
---|
180 | for key in self.sessions: |
---|
181 | if readysock == self.sessions[key].sock: |
---|
182 | log.debug("Matched input to session key %s" |
---|
183 | % key) |
---|
184 | try: |
---|
185 | self.sessions[key].cycle() |
---|
186 | if self.sessions[key].state == None: |
---|
187 | log.info("Successful transfer.") |
---|
188 | deletion_list.append(key) |
---|
189 | except TftpException as err: |
---|
190 | deletion_list.append(key) |
---|
191 | log.error("Fatal exception thrown from " |
---|
192 | "session %s: %s" |
---|
193 | % (key, str(err))) |
---|
194 | # Break out of for loop since we found the correct |
---|
195 | # session. |
---|
196 | break |
---|
197 | else: |
---|
198 | log.error("Can't find the owner for this packet. " |
---|
199 | "Discarding.") |
---|
200 | |
---|
201 | log.debug("Looping on all sessions to check for timeouts") |
---|
202 | now = time.time() |
---|
203 | for key in self.sessions: |
---|
204 | try: |
---|
205 | self.sessions[key].checkTimeout(now) |
---|
206 | except TftpTimeout as err: |
---|
207 | log.error(str(err)) |
---|
208 | self.sessions[key].retry_count += 1 |
---|
209 | if self.sessions[key].retry_count >= TIMEOUT_RETRIES: |
---|
210 | log.debug("hit max retries on %s, giving up" % |
---|
211 | self.sessions[key]) |
---|
212 | deletion_list.append(key) |
---|
213 | else: |
---|
214 | log.debug("resending on session %s" % self.sessions[key]) |
---|
215 | self.sessions[key].state.resendLast() |
---|
216 | |
---|
217 | log.debug("Iterating deletion list.") |
---|
218 | for key in deletion_list: |
---|
219 | log.info('') |
---|
220 | log.info("Session %s complete" % key) |
---|
221 | if key in self.sessions: |
---|
222 | log.debug("Gathering up metrics from session before deleting") |
---|
223 | self.sessions[key].end() |
---|
224 | metrics = self.sessions[key].metrics |
---|
225 | if metrics.duration == 0: |
---|
226 | log.info("Duration too short, rate undetermined") |
---|
227 | else: |
---|
228 | log.info("Transferred %d bytes in %.2f seconds" |
---|
229 | % (metrics.bytes, metrics.duration)) |
---|
230 | log.info("Average rate: %.2f kbps" % metrics.kbps) |
---|
231 | log.info("%.2f bytes in resent data" % metrics.resent_bytes) |
---|
232 | log.info("%d duplicate packets" % metrics.dupcount) |
---|
233 | log.debug("Deleting session %s" % key) |
---|
234 | del self.sessions[key] |
---|
235 | log.debug("Session list is now %s" % self.sessions) |
---|
236 | else: |
---|
237 | log.warn( |
---|
238 | "Strange, session %s is not on the deletion list" % key) |
---|
239 | |
---|
240 | self.is_running.clear() |
---|
241 | |
---|
242 | log.debug("server returning from while loop") |
---|
243 | self.shutdown_gracefully = self.shutdown_immediately = False |
---|
244 | |
---|
245 | def stop(self, now=False): |
---|
246 | """Stop the server gracefully. Do not take any new transfers, |
---|
247 | but complete the existing ones. If force is True, drop everything |
---|
248 | and stop. Note, immediately will not interrupt the select loop, it |
---|
249 | will happen when the server returns on ready data, or a timeout. |
---|
250 | ie. SOCK_TIMEOUT""" |
---|
251 | if now: |
---|
252 | self.shutdown_immediately = True |
---|
253 | else: |
---|
254 | self.shutdown_gracefully = True |
---|