source: rtems-tools/rtemstoolkit/execute.py @ 6fa09650

5
Last change on this file since 6fa09650 was 6fa09650, checked in by Chris Johns <chrisj@…>, on 11/23/18 at 03:59:18

rtemstoolkit/execute: Use buffered output and improve performance

  • Use buffered output on the stdout and stderr streams from child processes.
  • Simplify the read thread line processing to improve performance.
  • Disable 'close_fds' as it slows down python3's popen call.
  • Update the importing of rtemstoolkit modules.
  • Property mode set to 100755
File size: 23.4 KB
Line 
1#
2# RTEMS Tools Project (http://www.rtems.org/)
3# Copyright 2010-2017 Chris Johns (chrisj@rtems.org)
4# All rights reserved.
5#
6# This file is part of the RTEMS Tools package in 'rtems-tools'.
7#
8# Redistribution and use in source and binary forms, with or without
9# modification, are permitted provided that the following conditions are met:
10#
11# 1. Redistributions of source code must retain the above copyright notice,
12# this list of conditions and the following disclaimer.
13#
14# 2. Redistributions in binary form must reproduce the above copyright notice,
15# this list of conditions and the following disclaimer in the documentation
16# and/or other materials provided with the distribution.
17#
18# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
19# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
22# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
23# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
24# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
25# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
26# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
27# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28# POSSIBILITY OF SUCH DAMAGE.
29#
30
31#
32# Execute commands or scripts.
33#
34# Note, the subprocess module is only in Python 2.4 or higher.
35#
36
37from __future__ import print_function
38
39import functools
40import io
41import os
42import re
43import sys
44import subprocess
45import threading
46import time
47import traceback
48
49from rtemstoolkit import error
50from rtemstoolkit import log
51
# When True the worker threads print trace messages to stdout.
trace_threads = False

# Re-export the PIPE marker from subprocess.
PIPE = subprocess.PIPE

# Regular expression to find a quoted string: single or double quotes, an
# optional r/R prefix, and backslash escapes allowed inside the quotes.
qstr = re.compile('[rR]?\'([^\\n\'\\\\]|\\\\.)*\''
                  '|'
                  '[rR]?"([^\\n"\\\\]|\\\\.)*"')
60
def check_type(command):
    """Return True if 'command' is one of the known command types,
    'spawn' or 'shell', and False for anything else."""
    return command in ('spawn', 'shell')
67
def arg_list(args):
    """Turn a string of arguments into a list suitable for
    spawning a command. If the args are already a list return
    it.

    The string is split on white space except for quoted sections
    (as matched by 'qstr') which are kept as a single argument with
    the surrounding quotes removed. An unterminated quote is not
    treated as a quoted section and is split on white space like any
    other text."""
    if isinstance(args, list):
        return args
    remaining = args
    result = []
    while remaining:
        found = qstr.search(remaining)
        if not found:
            # No more quoted strings, the rest is plain words.
            result.extend(remaining.split())
            break
        # Split the text in front of the quoted string on white space,
        # add the quoted string as a single argument and carry on with
        # whatever follows it.
        result.extend(remaining[:found.start()].split())
        quoted = found.group(0)
        # The match may start with an r or R prefix in front of the
        # opening quote. Drop it so the quotes are stripped correctly
        # rather than leaving the opening quote in the argument.
        if quoted[0] in 'rR':
            quoted = quoted[1:]
        result.append(quoted[1:-1])
        remaining = remaining[found.end():]
    return result
91
def arg_subst(command, substs):
    """Substitute the %[0-9] in the command with the subst values.

    The command is converted to an argument list with arg_list() and
    each '%N' marker in every argument is replaced with substs[N]. The
    substitutions are applied in order starting with %0. A new list is
    returned; a list passed in as 'command' is not modified.

    The values are inserted literally so a value containing a
    backslash, for example a Windows path, is copied as is and not
    interpreted as a regular expression replacement template."""
    # Copy so a caller's list is never changed in place.
    args = list(arg_list(command))
    if substs:
        for index, value in enumerate(substs):
            marker = '%%%d' % (index)
            args = [arg.replace(marker, value) for arg in args]
    return args
100
def arg_subst_str(command, subst):
    """Substitute the %[0-9] markers in the command and return the
    result as a single string. Each argument, including the first,
    is preceded by a space."""
    words = arg_subst(command, subst)
    return ''.join(' ' + str(word) for word in words)
105
class execute(object):
    """Execute commands or scripts. The 'output' is a function that handles the
    output from the process. The 'input' is a function that blocks and returns
    data to be written to stdin. The 'cleanup' is a function called with the
    process object once the process has been waited on."""

    def __init__(self, output = None, input = None, cleanup = None,
                 error_prefix = '', verbose = False):
        # The lock guards 'proc' and the 'outputting' activity flag.
        self.lock = threading.Lock()
        self.output = output
        self.input = input
        self.cleanup = cleanup
        self.error_prefix = error_prefix
        self.verbose = verbose
        self.shell_exe = None
        self.shell_commands = False
        self.path = None
        self.environment = None
        # Set by the reader threads when output arrives and cleared by the
        # timer thread when it restarts the timeout count.
        self.outputting = False
        # True while a timer thread should keep running.
        self.timing_out = False
        self.proc = None

    def capture(self, proc, command = 'pipe', timeout = None):
        """Create threads to read stdout and stderr and send to the output
        handler and call an input handler if provided. Based on the
        'communicate' code in the subprocess module.

        The 'proc' is the process object to service. The 'command' is only
        used to name the threads. The 'timeout', if given, is a pair of the
        interval in seconds and a function to call once the process has been
        killed because no output was seen for that interval.

        The call waits for the process to finish and returns its exit code.
        If waiting raises an exception the process is killed and the
        exception is raised again."""
        def _writethread(exe, fh, input):
            """Call the input handler and write it to the stdin. The input handler should
            block and return None or False if this thread is to exit and True if this
            is a timeout check."""
            if trace_threads:
                print('execute:_writethread: start')
            encoding = getattr(sys.stdin, 'encoding', None) or 'utf-8'
            try:
                while True:
                    if trace_threads:
                        print('execute:_writethread: call input', input)
                    lines = input()
                    if type(lines) == str or type(lines) == bytes:
                        try:
                            data = lines
                            # Only text needs encoding. Data that is
                            # already bytes (a str in Python2) is written
                            # as is.
                            if not isinstance(data, bytes):
                                data = data.encode(encoding)
                            fh.write(data)
                            # The pipe may be buffered so push the data
                            # to the child now.
                            fh.flush()
                        except Exception:
                            break
                    if lines == None or \
                       lines == False or \
                       (lines == True and fh.closed):
                        break
            except Exception:
                if trace_threads:
                    print('execute:_writethread: exception')
                    print(traceback.format_exc())
            finally:
                try:
                    fh.close()
                except Exception:
                    pass
                if trace_threads:
                    print('execute:_writethread: finished')

        def _readthread(exe, fh, out, prefix = ''):
            """Read from a file handle and write to the output handler
            until the file closes."""
            import codecs

            def _output_line(line, exe, prefix, out, count):
                if out:
                    out(prefix + line)
                else:
                    log.output(prefix + line)
                    if count > 10:
                        log.flush()

            if trace_threads:
                print('execute:_readthread: start')
            count = 0
            line = ''
            decoder = None
            try:
                while True:
                    #
                    # Read what is available up to the size passed in. A
                    # plain read on a buffered handle waits for the full
                    # size or the end of the file which holds back output
                    # from a running process. On any error assume the
                    # handle has gone and the process is shutting down.
                    #
                    try:
                        data = fh.read1(4096)
                    except Exception:
                        data = ''
                    if len(data) == 0:
                        break
                    # Tell the timer thread the process is still producing
                    # output. This is done once per read and not once per
                    # line to keep the locking cost low.
                    with exe.lock:
                        exe.outputting = True
                    # str and bytes are the same type in Python2
                    if type(data) is not str and type(data) is bytes:
                        # A read can end part way through a multi-byte
                        # character so decode incrementally.
                        if decoder is None:
                            encoding = getattr(sys.stdout, 'encoding', None)
                            decoder = codecs.getincrementaldecoder(encoding or 'utf-8')()
                        data = decoder.decode(data)
                        if len(data) == 0:
                            continue
                    # The last piece is the start of a line that has not
                    # finished yet. It is empty if the data ended with a
                    # new line.
                    sd = (line + data).split('\n')
                    line = sd.pop()
                    if len(sd) > 0:
                        for text in sd:
                            _output_line(text + '\n', exe, prefix, out, count)
                            count += 1
                        if count > 10:
                            count -= 10
                if len(line) > 0:
                    # The output ended without a new line. Pass on what is
                    # left once, as a complete line, and force a flush.
                    _output_line(line + '\n', exe, prefix, out, 100)
                    line = ''
            except Exception:
                if trace_threads:
                    print('execute:_readthread: exception')
                    print(traceback.format_exc())
                raise
            finally:
                try:
                    fh.close()
                except Exception:
                    pass
                if trace_threads:
                    print('execute:_readthread: finished')

        def _timerthread(exe, interval, function):
            """Timer thread is used to timeout a process if no output is
            produced for the timeout interval."""
            count = interval
            while exe.timing_out:
                time.sleep(1)
                if count > 0:
                    count -= 1
                with exe.lock:
                    if exe.outputting:
                        count = interval
                        exe.outputting = False
                # Use <= so an interval that is not a whole number of
                # seconds still expires.
                if count <= 0:
                    try:
                        proc.kill()
                    except Exception:
                        pass
                    else:
                        function()
                    break

        # Name the threads after the program being run. The command may be
        # a list of arguments or a single string.
        if isinstance(command, (list, tuple)):
            words = command
        else:
            words = str(command).split()
        if len(words) > 0:
            name = os.path.basename(str(words[0]))
        else:
            name = ''

        stdin_thread = None
        stdout_thread = None
        stderr_thread = None
        timeout_thread = None

        if proc.stdout:
            stdout_thread = threading.Thread(target = _readthread,
                                             name = '_stdout[%s]' % (name),
                                             args = (self,
                                                     io.open(proc.stdout.fileno(),
                                                             mode = 'rb',
                                                             closefd = False),
                                                     self.output,
                                                     ''))
            stdout_thread.daemon = True
            stdout_thread.start()
        if proc.stderr:
            stderr_thread = threading.Thread(target = _readthread,
                                             name = '_stderr[%s]' % (name),
                                             args = (self,
                                                     io.open(proc.stderr.fileno(),
                                                             mode = 'rb',
                                                             closefd = False),
                                                     self.output,
                                                     self.error_prefix))
            stderr_thread.daemon = True
            stderr_thread.start()
        if self.input and proc.stdin:
            stdin_thread = threading.Thread(target = _writethread,
                                            name = '_stdin[%s]' % (name),
                                            args = (self,
                                                    proc.stdin,
                                                    self.input))
            stdin_thread.daemon = True
            stdin_thread.start()
        if timeout:
            self.timing_out = True
            timeout_thread = threading.Thread(target = _timerthread,
                                              name = '_timeout[%s]' % (name),
                                              args = (self,
                                                      timeout[0],
                                                      timeout[1]))
            timeout_thread.daemon = True
            timeout_thread.start()
        try:
            # Publish the process so kill(), terminate() and send_signal()
            # can reach it while it is running.
            with self.lock:
                self.proc = proc
            exitcode = proc.wait()
        except:
            # Catch everything, including a keyboard interrupt, so the
            # child is not left running.
            proc.kill()
            raise
        finally:
            with self.lock:
                self.proc = None
            if self.cleanup:
                self.cleanup(proc)
            if timeout_thread:
                self.timing_out = False
                timeout_thread.join(10)
            if stdin_thread:
                stdin_thread.join(2)
            if stdout_thread:
                stdout_thread.join(2)
            if stderr_thread:
                stderr_thread.join(2)
        return exitcode

    def open(self, command, capture = True, shell = False,
             cwd = None, env = None,
             stdin = None, stdout = None, stderr = None,
             timeout = None):
        """Open a command with arguments. Provide the arguments as a list or
        a string.

        Returns a tuple of the exit code and the process object. If 'capture'
        is False the process is started and (0, proc) is returned without
        waiting for it. If the process cannot be started because of an
        OSError the exit code is the error number and the process object is
        None. An error is raised if no output handler is set. A list passed
        in as 'command' is not modified."""
        if self.verbose:
            s = command
            if isinstance(command, (list, tuple)):
                s = ' '.join([str(c) for c in command])
            what = 'spawn'
            if shell:
                what = 'shell'
            log.output(what + ': ' + s)
        if self.output is None:
            raise error.general('capture needs an output handler')
        if shell and self.shell_exe:
            # Build a new list so the caller's command is left as is.
            command = list(self.shell_exe) + list(arg_list(command))
        if not stdin and self.input:
            stdin = subprocess.PIPE
        if not stdout:
            stdout = subprocess.PIPE
        if not stderr:
            stderr = subprocess.PIPE
        proc = None
        if cwd is None:
            cwd = self.path
        if env is None:
            env = self.environment
        try:
            # Work around a problem on Windows with commands that
            # have a '.' and no extension. Windows needs the full
            # command name.
            if sys.platform == "win32" and type(command) is list:
                if command[0].find('.') >= 0:
                    r, e = os.path.splitext(command[0])
                    # Extensions are not case sensitive on Windows.
                    if e.lower() not in ['.exe', '.com', '.bat']:
                        command = [command[0] + '.exe'] + command[1:]
            log.trace('exe: %s' % (command))
            # The close_fds is disabled as it slows down python3's
            # popen call.
            proc = subprocess.Popen(command, shell = shell,
                                    cwd = cwd, env = env,
                                    stdin = stdin, stdout = stdout,
                                    stderr = stderr,
                                    close_fds = False)
            if not capture:
                return (0, proc)
            exit_code = self.capture(proc, command, timeout)
            if self.verbose:
                log.output('exit: ' + str(exit_code))
        except OSError as ose:
            exit_code = ose.errno
            if self.verbose:
                log.output('exit: ' + str(ose))
        return (exit_code, proc)

    def spawn(self, command, capture = True, cwd = None, env = None,
              stdin = None, stdout = None, stderr = None,
              timeout = None):
        """Spawn a command with arguments. Provide the arguments as a list or
        a string. This is open() with 'shell' set to False."""
        return self.open(command, capture, False, cwd, env,
                         stdin, stdout, stderr, timeout)

    def shell(self, command, capture = True, cwd = None, env = None,
              stdin = None, stdout = None, stderr = None,
              timeout = None):
        """Execute a command within a shell context. The command can contain
        arguments. The shell is specific to the operating system. For example
        it is cmd.exe on Windows XP. This is open() with 'shell' set to
        True."""
        return self.open(command, capture, True, cwd, env,
                         stdin, stdout, stderr, timeout)

    def command(self, command, args = None, capture = True, shell = False,
                cwd = None, env = None,
                stdin = None, stdout = None, stderr = None,
                timeout = None):
        """Run the command with the args. The args can be a list
        or a string. A string is split into arguments with arg_list()."""
        cmd = [command]
        if args:
            if isinstance(args, (list, tuple)):
                cmd.extend(args)
            else:
                cmd.extend(arg_list(args))
        return self.open(cmd, capture = capture, shell = shell,
                         cwd = cwd, env = env,
                         stdin = stdin, stdout = stdout, stderr = stderr,
                         timeout = timeout)

    def command_subst(self, command, substs, capture = True, shell = False,
                      cwd = None, env = None,
                      stdin = None, stdout = None, stderr = None,
                      timeout = None):
        """Run the command from the config data with the
        option format string substituted with the subst variables. A shell
        is used if asked for here or if command_use_shell() has been
        called."""
        args = arg_subst(command, substs)
        return self.command(args[0], args[1:], capture = capture,
                            shell = shell or self.shell_commands,
                            cwd = cwd, env = env,
                            stdin = stdin, stdout = stdout, stderr = stderr,
                            timeout = timeout)

    def set_shell(self, execute):
        """Set the shell to execute when issuing a shell command. The first
        argument must be an existing file or an error is raised."""
        args = arg_list(execute)
        if len(args) == 0 or not os.path.isfile(args[0]):
            raise error.general('could not find shell: ' + execute)
        self.shell_exe = args

    def command_use_shell(self):
        """Force all commands to use a shell. This can be used with set_shell
        to allow Unix commands be executed on Windows with a Unix shell such
        as Cygwin or MSYS. This may cause piping to fail."""
        self.shell_commands = True

    def set_output(self, output):
        """Set the output handler. The stdout of the last process in a pipe
        line is passed to this handler. Returns the previous handler."""
        old_output = self.output
        self.output = output
        return old_output

    def set_path(self, path):
        """Set the path changed to before the child process is created.
        Returns the previous path."""
        old_path = self.path
        self.path = path
        return old_path

    def set_environ(self, environment):
        """Set the environment passed to the child process when created.
        Returns the previous environment."""
        old_environment = self.environment
        self.environment = environment
        return old_environment

    def kill(self):
        """Kill the process being captured, if there is one."""
        with self.lock:
            if self.proc is not None:
                self.proc.kill()

    def terminate(self):
        """Terminate the process being captured, if there is one."""
        with self.lock:
            if self.proc is not None:
                self.proc.terminate()

    def send_signal(self, signal):
        """Send the signal to the process being captured, if there is one."""
        with self.lock:
            if self.proc is not None:
                if trace_threads:
                    print('execute:send_signal: sending sig')
                self.proc.send_signal(signal)
502
class capture_execution(execute):
    """Capture all output as a string and return it."""

    class _output_snapper(object):
        """Collect the text passed to the handler. If a log is provided
        the text is sent to the log and not collected. If dump is True
        the text is discarded."""

        def __init__(self, log = None, dump = False):
            self.output = ''
            self.log = log
            self.dump = dump
            # The handler is called from the stdout and the stderr reader
            # threads so updates to the collected text are serialised.
            self.lock = threading.Lock()

        def handler(self, text):
            if self.dump:
                return
            if self.log is not None:
                self.log.output(text)
                return
            with self.lock:
                self.output += text

        def get_and_clear(self):
            """Return the collected text with leading and trailing white
            space removed and reset the collected text to empty."""
            with self.lock:
                text = self.output
                self.output = ''
            return text.strip()

    def __init__(self, log = None, dump = False, error_prefix = '', verbose = False):
        self.snapper = capture_execution._output_snapper(log = log, dump = dump)
        execute.__init__(self, output = self.snapper.handler,
                         error_prefix = error_prefix,
                         verbose = verbose)

    def open(self, command, capture = True, shell = False, cwd = None, env = None,
             stdin = None, stdout = None, stderr = None, timeout = None):
        """Open a command and capture its output. Returns a tuple of the
        exit code, the process object and the captured output. The
        'capture' argument must be left True or an error is raised."""
        if not capture:
            raise error.general('output capture must be true; leave as default')
        exit_code, proc = execute.open(self, command, capture = True, shell = shell,
                                       cwd = cwd, env = env,
                                       stdin = stdin, stdout = stdout, stderr = stderr,
                                       timeout = timeout)
        return (exit_code, proc, self.snapper.get_and_clear())

    def set_output(self, output):
        """The output handler is fixed for a capture; always raises an
        error."""
        raise error.general('output capture cannot be overridden')
543
if __name__ == "__main__":
    # Self test: run a set of platform specific commands through an
    # executor and print what they output.

    def run_tests(e, commands, use_shell):
        """Run each group of commands in 'commands' with the executor 'e'
        then start the 'pipe' command and feed it input by hand."""
        for shell_cmd in commands['shell']:
            e.shell(shell_cmd)
        for spawn_cmd in commands['spawn']:
            e.spawn(spawn_cmd)
        for entry in commands['cmd']:
            if type(entry) is str:
                e.command(entry, shell = use_shell)
            else:
                e.command(entry[0], entry[1], shell = use_shell)
        for entry in commands['csubsts']:
            e.command_subst(entry[0], entry[1], shell = use_shell)
        pipe = commands['pipe']
        ec, proc = e.command(pipe[0], pipe[1],
                             capture = False, stdin = subprocess.PIPE)
        if ec != 0:
            return
        print('piping input into ' + pipe[0] + ': ' + pipe[2])
        # Python3 needs bytes for the pipe; Python2 takes the string.
        try:
            out = bytes(pipe[2], sys.stdin.encoding)
        except:
            out = pipe[2]
        proc.stdin.write(out)
        proc.stdin.close()
        e.capture(proc)
        del proc

    def capture_output(text):
        """Output handler that prints the text as it arrives."""
        print(text, end = '')

    cmd_shell_test = 'if "%OS%" == "Windows_NT" (echo It is WinNT) else echo Is is not WinNT'
    sh_shell_test = 'x="me"; if [ $x = "me" ]; then echo "It was me"; else "It was him"; fi'

    commands = {
        'windows': {
            'shell': ['cd', 'dir /w', '.\\xyz', cmd_shell_test],
            'spawn': ['hostname', 'hostnameZZ', ['netstat', '/e']],
            'cmd': ['ipconfig', ('nslookup', 'www.python.org')],
            'csubsts': [('netstat %0', ['-a']),
                        ('netstat %0 %1', ['-a', '-n'])],
            'pipe': ('ftp', None, 'help\nquit'),
        },
        'unix': {
            'shell': ['pwd', 'ls -las', './xyz', sh_shell_test],
            'spawn': ['ls', 'execute.pyc', ['ls', '-i']],
            'cmd': ['date',
                    ('date', '-R'),
                    ('date', ['-u', '+%d %D']),
                    ('date', '-u "+%d %D %S"')],
            'csubsts': [('date %0 "+%d %D %S"', ['-u']),
                        ('date %0 %1', ['-u', '+%d %D %S'])],
            'pipe': ('grep', 'hello', 'hello world'),
        },
    }

    print(arg_list('cmd a1 a2 "a3 is a string" a4'))
    print(arg_list('cmd b1 b2 "b3 is a string a4'))
    print(arg_subst(['nothing', 'xx-%0-yyy', '%1', '%2-something'],
                    ['subst0', 'subst1', 'subst2']))

    e = execute(error_prefix = 'ERR: ', output = capture_output, verbose = True)
    if sys.platform != "win32":
        run_tests(e, commands['unix'], False)
    else:
        run_tests(e, commands['windows'], False)
        # With MSYS installed also run the Unix commands via its shell.
        msys_sh = 'c:\\msys\\1.0\\bin\\sh.exe'
        if os.path.exists(msys_sh):
            e.set_shell('c:\\msys\\1.0\\bin\\sh.exe --login -c')
            commands['unix']['pipe'] = ('c:\\msys\\1.0\\bin\\grep',
                                        'hello', 'hello world')
            run_tests(e, commands['unix'], True)
    del e
Note: See TracBrowser for help on using the repository browser.