source: rtems-tools/tester/rt/config.py @ 04a5204

4.105
Last change on this file since 04a5204 was 04a5204, checked in by Sebastian Huber <sebastian.huber@…>, on 11/12/15 at 10:15:23

Python 3 compatibility

  • Property mode set to 100644
File size: 7.8 KB
Line 
1#
2# RTEMS Tools Project (http://www.rtems.org/)
3# Copyright 2013-2014 Chris Johns (chrisj@rtems.org)
4# All rights reserved.
5#
6# This file is part of the RTEMS Tools package in 'rtems-tools'.
7#
8# Redistribution and use in source and binary forms, with or without
9# modification, are permitted provided that the following conditions are met:
10#
11# 1. Redistributions of source code must retain the above copyright notice,
12# this list of conditions and the following disclaimer.
13#
14# 2. Redistributions in binary form must reproduce the above copyright notice,
15# this list of conditions and the following disclaimer in the documentation
16# and/or other materials provided with the distribution.
17#
18# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
19# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
22# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
23# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
24# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
25# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
26# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
27# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28# POSSIBILITY OF SUCH DAMAGE.
29#
30
31#
32# RTEMS Testing Config
33#
34
35import datetime
36import os
37import threading
38
39from rtemstoolkit import config
40from rtemstoolkit import error
41from rtemstoolkit import execute
42from rtemstoolkit import log
43from rtemstoolkit import path
44
45import console
46import gdb
47
# Module-level default timeout value. The directive handlers in this file
# read the '%{timeout}' macro rather than this name, so any consumers of it
# live outside this file. Units are presumably seconds -- TODO confirm.
timeout = 15
49
class file(config.file):
    """RTEMS Testing configuration.

    Extends ``config.file`` with the ``%execute``, ``%gdb`` and ``%console``
    directives. While an ``%execute`` or ``%gdb`` directive runs, captured
    text is collected in ``self.output`` as ``(prefix, line)`` tuples and is
    handed to the report object when the directive finishes.
    """

    _directives = ['%execute',
                   '%gdb',
                   '%console']

    def __init__(self, report, name, opts, _directives = _directives):
        """Create the configuration; nothing is loaded until run().

        report: object whose start() and end() methods are called around
                every %execute and %gdb directive.
        name: configuration file name; run() passes it to load().
        opts: options, forwarded unchanged to the base class.
        _directives: directive names forwarded to the base class.
        """
        super(file, self).__init__(name, opts, directives = _directives)
        # Serialises access to self.output and self.timedout between the
        # directive handlers and the capture/timeout callbacks (which are
        # presumably invoked from other threads -- TODO confirm).
        self.lock = threading.Lock()
        self.realtime_trace = self.debug_trace('output')
        self.process = None
        self.console = None
        self.output = None
        self.report = report
        self.name = name
        self.timedout = False

    def __del__(self):
        """Drop the console reference, then run the base class finaliser."""
        # getattr() because __del__ also runs on an instance whose __init__
        # raised before 'console' was assigned.
        if getattr(self, 'console', None):
            del self.console
        super(file, self).__del__()

    def _lock(self):
        """Acquire the instance lock."""
        self.lock.acquire()

    def _unlock(self):
        """Release the instance lock."""
        self.lock.release()

    def _timeout(self):
        """Record that a timeout happened and note it in the captured output.

        Passed together with the timeout value to the process opened by
        _dir_execute().
        """
        with self.lock:
            self.timedout = True
        # capture() takes the lock itself, so it is called after releasing.
        self.capture('*** TIMEOUT TIMEOUT')

    def _dir_console(self, data):
        """Handle the %console directive.

        data: list of directive arguments. data[0] selects the console
              type: 'stdio', or 'tty' followed by one argument passed to
              console.tty() (presumably the device) and an optional
              settings argument.

        Raises error.general if a console is already configured or the
        arguments are not acceptable.
        """
        if self.console is not None:
            raise error.general(self._name_line_msg('console already configured'))
        if len(data) == 0:
            raise error.general(self._name_line_msg('no console configuration provided'))
        console_trace = self.debug_trace('console')
        if data[0] == 'stdio':
            self.console = console.stdio(trace = console_trace)
        elif data[0] == 'tty':
            if len(data) < 2:
                raise error.general(self._name_line_msg('no tty configuration provided'))
            if len(data) > 3:
                raise error.general(self._name_line_msg('too many tty configuration arguments'))
            settings = data[2] if len(data) == 3 else None
            self.console = console.tty(data[1],
                                       output = self.capture,
                                       setup = settings,
                                       trace = console_trace)
        else:
            raise error.general(self._name_line_msg('invalid console type'))

    def _dir_execute(self, data, total, index, exe, bsp_arch, bsp):
        """Handle the %execute directive by running the command in data.

        data: command and arguments as a list of strings.
        total, index, exe, bsp_arch, bsp: accepted so the signature matches
              _dir_gdb(); not used here.

        Nothing is run when the configuration is already in error. A
        positive exit code is reported through _error(); if the timeout
        handler fired instead, the process is killed.
        """
        self.process = execute.execute(output = self.capture)
        if self.in_error:
            return
        if self.console:
            self.console.open()
        try:
            self.capture_console('run: %s' % (' '.join(data)))
            ec, _proc = self.process.open(data,
                                          timeout = (int(self.expand('%{timeout}')),
                                                     self._timeout))
            with self.lock:
                if ec > 0:
                    self._error('execute failed: %s: exit-code:%d' % (' '.join(data), ec))
                elif self.timedout:
                    self.process.kill()
        finally:
            # Close the console even when the run raised.
            if self.console:
                self.console.close()

    def _dir_gdb(self, data, total, index, exe, bsp_arch, bsp):
        """Handle the %gdb directive by running a gdb session.

        data: three or four arguments. data[0] and data[1] are passed
              positionally to the gdb process open(); data[2] names a macro
              whose expansion supplies the script, one entry per line.
        bsp_arch, bsp: passed to the gdb.gdb() constructor.
        total, index, exe: not used here.

        Raises error.general when the argument count is wrong.
        """
        if len(data) < 3 or len(data) > 4:
            raise error.general(self._name_line_msg('invalid %gdb arguments'))
        self.process = gdb.gdb(bsp_arch, bsp,
                               trace = self.debug_trace('gdb'),
                               mi_trace = self.debug_trace('gdb-mi'))
        script = self.expand('%%{%s}' % data[2])
        if script:
            script = [l.strip() for l in script.splitlines()]
        if self.in_error:
            return
        if self.console:
            self.console.open()
        try:
            self.process.open(data[0], data[1],
                              script = script,
                              output = self.capture,
                              gdb_console = self.capture_console,
                              timeout = int(self.expand('%{timeout}')))
        finally:
            # Close the console even when the session raised.
            if self.console:
                self.console.close()

    def _directive_filter(self, results, directive, info, data):
        """Dispatch a parsed directive to its handler.

        results: sequence whose first item is the result kind; when that is
                 'directive', results[1] is the directive name and
                 results[2] its data.
        directive, info, data: not used here.

        %console is handled directly. %execute and %gdb are bracketed by
        report.start() and report.end(), with self.output collecting the
        captured lines in between.

        Always returns (None, None, None).
        """
        if results[0] == 'directive':
            _directive = results[1]
            _data = results[2]
            # First data item is kept whole; the second is split on
            # whitespace into further arguments.
            ds = []
            if len(_data):
                ds = [_data[0]]
                if len(_data) > 1:
                    ds += _data[1].split()
            ds = self.expand(ds)

            if _directive == '%console':
                self._dir_console(ds)
            else:
                with self.lock:
                    total = int(self.expand('%{test_total}'))
                    index = int(self.expand('%{test_index}'))
                    exe = self.expand('%{test_executable}')
                    bsp_arch = self.expand('%{bsp_arch}')
                    bsp = self.expand('%{bsp}')
                    self.report.start(index, total, exe, exe, bsp_arch, bsp)
                    # A list (rather than None) switches capturing on.
                    self.output = []
                if _directive == '%execute':
                    self._dir_execute(ds, total, index, exe, bsp_arch, bsp)
                elif _directive == '%gdb':
                    self._dir_gdb(ds, total, index, exe, bsp_arch, bsp)
                else:
                    raise error.general(self._name_line_msg('invalid directive'))
                with self.lock:
                    self.report.end(exe, self.output)
                    self.process = None
                    self.output = None
        return None, None, None

    def _realtime_trace(self, text):
        """Print captured lines as they arrive when 'output' tracing is on.

        text: iterable of (prefix, line) tuples.
        """
        if self.realtime_trace:
            for l in text:
                print(' '.join(l))

    def run(self):
        """Load the configuration file named at construction."""
        self.load(self.name)

    def _capture_tagged(self, prefix, text):
        """Split text into lines, tag each with prefix and store them.

        Carriage returns are removed before splitting. Lines are dropped
        when no directive is collecting output (self.output is None).
        """
        tagged = [(prefix, l) for l in text.replace(chr(13), '').splitlines()]
        with self.lock:
            if self.output is not None:
                self._realtime_trace(tagged)
                self.output += tagged

    def capture(self, text):
        """Store text in the captured output, each line tagged with ']'."""
        self._capture_tagged(']', text)

    def capture_console(self, text):
        """Store text in the captured output, each line tagged with '>'."""
        self._capture_tagged('>', text)

    def debug_trace(self, flag):
        """Return True when flag is listed in the 'debug_trace' macro.

        The macro value is treated as a comma separated list.
        """
        dt = self.macros['debug_trace']
        return bool(dt) and flag in dt.split(',')

    def kill(self):
        """Kill the current process, if there is one."""
        # Read the attribute once: _directive_filter() resets it to None
        # when a directive finishes.
        process = self.process
        if process:
            process.kill()
Note: See TracBrowser for help on using the repository browser.