source: rtems-tools/tester/rt/config.py @ 0737b46

5
Last change on this file since 0737b46 was 0737b46, checked in by Chris Johns <chrisj@…>, on 10/05/17 at 03:21:01

tester: Add a target reset regular expression to detect a reset.

  • Property mode set to 100644
File size: 11.6 KB
Line 
1#
2# RTEMS Tools Project (http://www.rtems.org/)
3# Copyright 2013-2017 Chris Johns (chrisj@rtems.org)
4# All rights reserved.
5#
6# This file is part of the RTEMS Tools package in 'rtems-tools'.
7#
8# Redistribution and use in source and binary forms, with or without
9# modification, are permitted provided that the following conditions are met:
10#
11# 1. Redistributions of source code must retain the above copyright notice,
12# this list of conditions and the following disclaimer.
13#
14# 2. Redistributions in binary form must reproduce the above copyright notice,
15# this list of conditions and the following disclaimer in the documentation
16# and/or other materials provided with the distribution.
17#
18# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
19# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
22# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
23# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
24# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
25# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
26# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
27# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28# POSSIBILITY OF SUCH DAMAGE.
29#
30
31#
32# RTEMS Testing Config
33#
34
35from __future__ import print_function
36
37import datetime
38import os
39import re
40import threading
41
42from rtemstoolkit import config
43from rtemstoolkit import error
44from rtemstoolkit import execute
45from rtemstoolkit import log
46from rtemstoolkit import path
47
48from . import console
49from . import gdb
50from . import tftp
51
52timeout = 15
53
54class file(config.file):
55    """RTEMS Testing configuration."""
56
57    _directives = ['%execute',
58                   '%gdb',
59                   '%tftp',
60                   '%console']
61
62    def __init__(self, index, report, name, opts, _directives = _directives):
63        super(file, self).__init__(name, opts, directives = _directives)
64        self.lock = threading.Lock()
65        self.realtime_trace = self.debug_trace('output')
66        self.process = None
67        self.console = None
68        self.output = None
69        self.index = index
70        self.report = report
71        self.name = name
72        self.timedout = False
73        self.test_started = False
74        self.kill_good = False
75        self.kill_on_end = False
76        self.test_label = None
77        self.target_reset_regx = None
78
79    def __del__(self):
80        if self.console:
81            del self.console
82        super(file, self).__del__()
83
84    def _lock(self):
85        self.lock.acquire()
86
87    def _unlock(self):
88        self.lock.release()
89
90    def _timeout(self):
91        self._lock()
92        self.timedout = True
93        self._unlock()
94        self.capture('*** TIMEOUT TIMEOUT')
95
96    def _ok_kill(self):
97        self._lock()
98        self.kill_good = True
99        self._unlock()
100        try:
101            self.process.kill()
102        except:
103            pass
104
105    def _target_reset(self):
106        if self.defined('target_reset_command'):
107            reset_cmd = self.expand('%{target_reset_command}').strip()
108            if len(reset_cmd) > 0:
109                rs_proc = execute.capture_execution()
110                ec, proc, output = rs_proc.open(reset_cmd, shell = True)
111                self._capture_console('target reset: %s: %s' % (reset_cmd, output))
112
113    def _output_length(self):
114        self._lock()
115        if self.test_started:
116            l = len(self.output)
117            self._unlock()
118            return l
119        self._unlock()
120        return 0
121
122    def _capture_console(self, text):
123        text = [('>', l) for l in text.replace(chr(13), '').splitlines()]
124        if self.output is not None:
125            self._realtime_trace(text)
126            self.output += text
127
128    def _dir_console(self, data):
129        if self.console is not None:
130            raise error.general(self._name_line_msg('console already configured'))
131        if len(data) == 0:
132            raise error.general(self._name_line_msg('no console configuration provided'))
133        console_trace = trace = self.debug_trace('console')
134        if data[0] == 'stdio':
135            self.console = console.stdio(trace = console_trace)
136        elif data[0] == 'tty':
137            if len(data) < 2 or len(data) >3:
138                raise error.general(self._name_line_msg('no tty configuration provided'))
139            if len(data) == 3:
140                settings = data[2]
141            else:
142                settings = None
143            self.console = console.tty(data[1],
144                                       output = self.capture,
145                                       setup = settings,
146                                       trace = console_trace)
147        else:
148            raise error.general(self._name_line_msg('invalid console type'))
149
150    def _dir_execute(self, data, total, index, exe, bsp_arch, bsp):
151        self.process = execute.execute(output = self.capture)
152        if not self.in_error:
153            if self.console:
154                self.console.open()
155            self.capture_console('run: %s' % (' '.join(data)))
156            ec, proc = self.process.open(data,
157                                         timeout = (int(self.expand('%{timeout}')),
158                                                    self._timeout))
159            self._lock()
160            if not self.kill_good and ec > 0:
161                self._error('execute failed: %s: exit-code:%d' % (' '.join(data), ec))
162            elif self.timedout:
163                self.process.kill()
164            self._unlock()
165            if self.console:
166                self.console.close()
167
168    def _dir_gdb(self, data, total, index, exe, bsp_arch, bsp):
169        if len(data) < 3 or len(data) > 4:
170            raise error.general('invalid %gdb arguments')
171        self.process = gdb.gdb(bsp_arch, bsp,
172                               trace = self.debug_trace('gdb'),
173                               mi_trace = self.debug_trace('gdb-mi'))
174        script = self.expand('%%{%s}' % data[2])
175        if script:
176            script = [l.strip() for l in script.splitlines()]
177        if not self.in_error:
178            if self.console:
179                self.console.open()
180            self.process.open(data[0], data[1],
181                              script = script,
182                              output = self.capture,
183                              gdb_console = self.capture_console,
184                              timeout = int(self.expand('%{timeout}')))
185            if self.console:
186                self.console.close()
187
188    def _dir_tftp(self, data, total, index, exe, bsp_arch, bsp):
189        if len(data) != 2:
190            raise error.general('invalid %tftp arguments')
191        try:
192            port = int(data[1])
193        except:
194            raise error.general('invalid %tftp port')
195        self.kill_on_end = True
196        self.process = tftp.tftp(bsp_arch, bsp,
197                                 trace = self.debug_trace('gdb'))
198        if not self.in_error:
199            if self.console:
200                self.console.open()
201            self.process.open(executable = data[0],
202                              port = port,
203                              output_length = self._output_length,
204                              console = self.capture_console,
205                              timeout = (int(self.expand('%{timeout}')),
206                                         self._timeout))
207            if self.console:
208                self.console.close()
209
210    def _directive_filter(self, results, directive, info, data):
211        if results[0] == 'directive':
212            _directive = results[1]
213            _data = results[2]
214            ds = []
215            if len(_data):
216                ds = [_data[0]]
217                if len(_data) > 1:
218                    ds += _data[1].split()
219            ds = self.expand(ds)
220
221            if _directive == '%console':
222                self._dir_console(ds)
223            else:
224                self._lock()
225                try:
226                    self.output = []
227                    total = int(self.expand('%{test_total}'))
228                    index = int(self.expand('%{test_index}'))
229                    exe = self.expand('%{test_executable}')
230                    bsp_arch = self.expand('%{bsp_arch}')
231                    bsp = self.expand('%{bsp}')
232                    self.report.start(index, total, exe, exe, bsp_arch, bsp)
233                    if self.index == 1:
234                        self._target_reset()
235                finally:
236                    self._unlock()
237                if _directive == '%execute':
238                    self._dir_execute(ds, total, index, exe, bsp_arch, bsp)
239                elif _directive == '%gdb':
240                    self._dir_gdb(ds, total, index, exe, bsp_arch, bsp)
241                elif _directive == '%tftp':
242                    self._dir_tftp(ds, total, index, exe, bsp_arch, bsp)
243                else:
244                    raise error.general(self._name_line_msg('invalid directive'))
245                self._lock()
246                try:
247                    status = self.report.end(exe, self.output)
248                    if status == 'timeout':
249                        self._target_reset()
250                    self.process = None
251                    self.output = None
252                finally:
253                    self._unlock()
254        return None, None, None
255
256    def _realtime_trace(self, text):
257        if self.realtime_trace:
258            for l in text:
259                print(' '.join(l))
260
261    def run(self):
262        if self.defined('target_reset_regex'):
263            try:
264                regex = self.expand('%{target_reset_regex}')
265                self.target_reset_regx = re.compile(regex, re.MULTILINE)
266            except:
267                msg = 'invalid target reset regex: %s' % (regex)
268                raise error.general(msg)
269        self.load(self.name)
270
271    def capture(self, text):
272        if not self.test_started:
273            self.test_started = '*** BEGIN OF TEST ' in text
274        ok_to_kill = '*** TEST STATE: USER_INPUT' in text or \
275                     '*** TEST STATE: BENCHMARK' in text
276        if ok_to_kill:
277            reset_target = True
278        else:
279            reset_target = False
280        if self.test_started and self.target_reset_regx is not None:
281            if self.target_reset_regx.match(text):
282                self.capture_console('target reset detected')
283                ok_to_kill = True
284        if self.kill_on_end:
285            if self.test_label is None:
286                s = text.find('*** BEGIN OF TEST ')
287                if s >= 0:
288                    e = text[s + 3:].find('***')
289                    if e >= 0:
290                        self.test_label = text[s + len('*** BEGIN OF TEST '):s + e + 3 - 1]
291            if not ok_to_kill and self.test_label is not None:
292                ok_to_kill = '*** END OF TEST %s ***' % (self.test_label) in text
293        text = [(']', l) for l in text.replace(chr(13), '').splitlines()]
294        self._lock()
295        if self.output is not None:
296            self._realtime_trace(text)
297            self.output += text
298        if reset_target:
299            self._target_reset()
300        self._unlock()
301        if ok_to_kill:
302            self._ok_kill()
303
304    def capture_console(self, text):
305        self._lock()
306        self._capture_console(text)
307        self._unlock()
308
309    def debug_trace(self, flag):
310        dt = self.macros['debug_trace']
311        if dt:
312            if flag in dt.split(','):
313                return True
314        return False
315
316    def kill(self):
317        if self.process:
318            try:
319                self.process.kill()
320            except:
321                pass
Note: See TracBrowser for help on using the repository browser.