source: rtems-tools/tester/rt/config.py @ 43843b8

Last change on this file since 43843b8 was 43843b8, checked in by Karel Gardas <karel@…>, on 04/19/22 at 12:28:51

tester/gdb: allow kill on the test end

Sponsored-By: Precidata

  • Property mode set to 100644
File size: 21.2 KB
Line 
1#
2# RTEMS Tools Project (http://www.rtems.org/)
3# Copyright 2013-2020 Chris Johns (chrisj@rtems.org)
4# All rights reserved.
5#
6# This file is part of the RTEMS Tools package in 'rtems-tools'.
7#
8# Redistribution and use in source and binary forms, with or without
9# modification, are permitted provided that the following conditions are met:
10#
11# 1. Redistributions of source code must retain the above copyright notice,
12# this list of conditions and the following disclaimer.
13#
14# 2. Redistributions in binary form must reproduce the above copyright notice,
15# this list of conditions and the following disclaimer in the documentation
16# and/or other materials provided with the distribution.
17#
18# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
19# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
22# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
23# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
24# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
25# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
26# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
27# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28# POSSIBILITY OF SUCH DAMAGE.
29#
30
31#
32# RTEMS Testing Config
33#
34
35from __future__ import print_function
36
37import datetime
38import os
39import re
40import threading
41
42from rtemstoolkit import configuration
43from rtemstoolkit import config
44from rtemstoolkit import error
45from rtemstoolkit import execute
46from rtemstoolkit import log
47from rtemstoolkit import path
48
49from rtemstoolkit import stacktraces
50
51import tester.rt.console
52import tester.rt.exe
53import tester.rt.gdb
54import tester.rt.tftp
55import tester.rt.wait
56
57timeout = 15
58
59class file(config.file):
60    """RTEMS Testing configuration."""
61
62    _directives = ['%execute',
63                   '%gdb',
64                   '%tftp',
65                   '%wait',
66                   '%console']
67
68    def __init__(self, index, total, report, name, opts,
69                 console_prefix = ']', _directives = _directives):
70        super(file, self).__init__(name, opts, directives = _directives)
71        self.lock = threading.Lock()
72        self.realtime_trace = self.exe_trace('output')
73        self.console_trace = self.exe_trace('console')
74        self.console_prefix = console_prefix
75        self.show_header = not self.defined('test_disable_header')
76        self.process = None
77        self.console = None
78        self.output = None
79        self.index = index
80        self.total = total
81        self.report = report
82        self.name = name
83        self.timedout = False
84        self.test_too_long = False
85        self.test_started = False
86        self.kill_good = False
87        self.kill_on_end = False
88        self.test_label = None
89        self.target_start_regx = None
90        self.target_reset_regx = None
91        self.restarts = 0
92        self.max_restarts = int(self.expand('%{max_restarts}'))
93
94    def __del__(self):
95        if self.console:
96            del self.console
97        super(file, self).__del__()
98
99    def _lock(self):
100        self.lock.acquire()
101
102    def _unlock(self):
103        self.lock.release()
104
105    def _timeout(self):
106        self._lock()
107        self.timedout = True
108        self._unlock()
109        self.capture('*** TIMEOUT TIMEOUT')
110
111    def _test_too_long(self):
112        self._lock()
113        self.test_too_long = True
114        self._unlock()
115        self.capture('*** TEST TOO LONG')
116
117    def _ok_kill(self):
118        self._lock()
119        self.kill_good = True
120        self._unlock()
121        try:
122            self.process.kill()
123        except:
124            pass
125
126    def _target_regex(self, label):
127        regex = None
128        if self.defined(label):
129            r = self.expand('%%{%s}' % (label))
130            try:
131                regex = re.compile(r, re.MULTILINE)
132            except:
133                msg = 'invalid target regex: %s: %s' % (label, r)
134                raise error.general(msg)
135        return regex
136
137    def _target_command(self, command, bsp_arch = None, bsp = None, exe = None, fexe = None):
138        if self.defined('target_%s_command' % (command)):
139            cmd = self.expand('%%{target_%s_command}' % (command)).strip()
140            if bsp_arch is not None and '@ARCH@' in cmd:
141                cmd = cmd.replace('@ARCH@', bsp_arch)
142            if bsp is not None and '@BSP@' in cmd:
143                cmd = cmd.replace('@BSP@', bsp)
144            if exe is not None and '@EXE@' in cmd:
145                cmd = cmd.replace('@EXE@', exe)
146            if fexe is not None and '@FEXE@' in cmd:
147                cmd = cmd.replace('@FEXE@', exe)
148            if len(cmd) > 0:
149                output = ''
150                if not self.opts.dry_run():
151                    rs_proc = execute.capture_execution()
152                    ec, proc, output = rs_proc.open(cmd, shell = True)
153                self._capture_console('target %s: %s' % (command, cmd))
154                if len(output) > 0:
155                    output = os.linesep.join([' ' + l for l in output.splitlines()])
156                    self._capture_console(output)
157
158    def _target_exe_filter(self, exe):
159        if self.defined('target_exe_filter'):
160            f = self.expand('%{target_exe_filter}').strip()
161            # Be like sed and use the first character as the delmiter.
162            if len(f) > 0:
163                delimiter = f[0]
164                pat = ''
165                repl = ''
166                repl_not_pat = False
167                esc = False
168                for c in f[1:]:
169                    add = True
170                    if not esc and c == '\\':
171                        esc = True
172                        add = False
173                    elif esc:
174                        if c == delimiter:
175                            c = delimiter
176                        else:
177                            c = '\\' + c
178                            esc = False
179                    elif c == delimiter:
180                        if repl_not_pat:
181                            exe = re.sub(r'%s' % (pat), repl, exe)
182                            self._capture_console('target exe filter: find:%s subst:%s -> %s' % \
183                                                  (pat, repl, exe))
184                            return exe
185                        repl_not_pat = True
186                        add = False
187                    if add:
188                        if repl_not_pat:
189                            repl += c
190                        else:
191                            pat += c
192                raise error.general('invalid exe filter: %s' % (f))
193        return exe
194
195    def _output_length(self):
196        self._lock()
197        try:
198            if self.test_started:
199                l = len(self.output)
200                return l
201        finally:
202            self._unlock()
203        return 0
204
205    def _capture_console(self, text):
206        text = [('=> ', l) for l in text.replace(chr(13), '').splitlines()]
207        if self.output is not None:
208            if self.console_trace:
209                self._realtime_trace(text)
210            self.output += text
211
212    def _dir_console(self, data):
213        if self.console is not None:
214            raise error.general(self._name_line_msg('console already configured'))
215        if len(data) == 0:
216            raise error.general(self._name_line_msg('no console configuration provided'))
217        console_trace = trace = self.exe_trace('console')
218        if not self.opts.dry_run():
219            if data[0] == 'stdio':
220                self.console = tester.rt.console.stdio(trace = console_trace)
221            elif data[0] == 'tty':
222                if len(data) < 2 or len(data) > 3:
223                    raise error.general(self._name_line_msg('invalid tty configuration provided'))
224                if len(data) == 3:
225                    settings = data[2]
226                else:
227                    settings = None
228                self.console = tester.rt.console.tty(data[1],
229                                                     output = self.capture,
230                                                     setup = settings,
231                                                     trace = console_trace)
232            else:
233                raise error.general(self._name_line_msg('invalid console type'))
234
235    def _dir_execute(self, data, total, index, rexe, bsp_arch, bsp):
236        self.process = tester.rt.exe.exe(bsp_arch, bsp, trace = self.exe_trace('exe'))
237        if not self.in_error:
238            if self.console:
239                self.console.open()
240            if not self.opts.dry_run():
241                self.process.open(data,
242                                  ignore_exit_code = self.defined('exe_ignore_ret'),
243                                  output = self.capture,
244                                  console = self.capture_console,
245                                  timeout = (int(self.expand('%{timeout}')),
246                                             int(self.expand('%{max_test_period}')),
247                                             self._timeout,
248                                             self._test_too_long))
249            if self.console:
250                self.console.close()
251
252    def _dir_gdb(self, data, total, index, exe, bsp_arch, bsp):
253        if len(data) < 3 or len(data) > 4:
254            raise error.general('invalid %gdb arguments')
255        self.process = tester.rt.gdb.gdb(bsp_arch, bsp,
256                                         trace = self.exe_trace('gdb'),
257                                         mi_trace = self.exe_trace('gdb-mi'))
258        script = self.expand('%%{%s}' % data[2])
259        if script:
260            script = [l.strip() for l in script.splitlines()]
261        self.kill_on_end = True
262        if not self.in_error:
263            if self.console:
264                self.console.open()
265            if not self.opts.dry_run():
266                self.process.open(data[0], data[1],
267                                  script = script,
268                                  output = self.capture,
269                                  gdb_console = self.capture_console,
270                                  timeout = (int(self.expand('%{timeout}')),
271                                             int(self.expand('%{max_test_period}')),
272                                             self._timeout,
273                                             self._test_too_long))
274            if self.console:
275                self.console.close()
276
277    def _dir_tftp(self, data, total, index, exe, bsp_arch, bsp):
278        if len(data) != 2:
279            raise error.general('invalid %tftp arguments')
280        try:
281            port = int(data[1])
282        except:
283            raise error.general('invalid %tftp port')
284        self.kill_on_end = True
285        if not self.opts.dry_run():
286            self.process = tester.rt.tftp.tftp(bsp_arch, bsp,
287                                               trace = self.exe_trace('tftp'))
288            if not self.in_error:
289                if self.console:
290                    self.console.open()
291                self.process.open(executable = exe,
292                                  port = port,
293                                  output_length = self._output_length,
294                                  console = self.capture_console,
295                                  timeout = (int(self.expand('%{timeout}')),
296                                             int(self.expand('%{max_test_period}')),
297                                             self._timeout,
298                                             self._test_too_long))
299                if self.console:
300                    self.console.close()
301
302    def _dir_wait(self, data, total, index, exe, bsp_arch, bsp):
303        if len(data) != 0:
304            raise error.general('%wait has no arguments')
305        self.kill_on_end = True
306        if not self.opts.dry_run():
307            self.process = tester.rt.wait.wait(bsp_arch, bsp,
308                                               trace = self.exe_trace('wait'))
309            if not self.in_error:
310                if self.console:
311                    self.console.open()
312                self.process.open(output_length = self._output_length,
313                                  console = self.capture_console,
314                                  timeout = (int(self.expand('%{timeout}')),
315                                             int(self.expand('%{max_test_period}')),
316                                             self._timeout,
317                                             self._test_too_long))
318                if self.console:
319                    self.console.close()
320
321    def _directive_filter(self, results, directive, info, data):
322        if results[0] == 'directive':
323            _directive = results[1]
324            _data = results[2]
325            ds = []
326            if len(_data):
327                ds = [_data[0]]
328                if len(_data) > 1:
329                    ds += _data[1].split()
330            ds = self.expand(ds)
331
332            if _directive == '%console':
333                self._dir_console(ds)
334            else:
335                self._lock()
336                try:
337                    self.output = []
338                    total = int(self.expand('%{test_total}'))
339                    index = int(self.expand('%{test_index}'))
340                    exe = self.expand('%{test_executable}')
341                    bsp_arch = self.expand('%{arch}')
342                    bsp = self.expand('%{bsp}')
343                    fexe = self._target_exe_filter(exe)
344                    self.report.start(index, total, exe, fexe,
345                                      bsp_arch, bsp, self.show_header)
346                    if self.index == 1:
347                        self._target_command('on', bsp_arch, bsp, exe, fexe)
348                    self._target_command('pretest', bsp_arch, bsp, exe, fexe)
349                finally:
350                    self._unlock()
351                if _directive == '%execute':
352                    self._dir_execute(ds, total, index, fexe, bsp_arch, bsp)
353                elif _directive == '%gdb':
354                    self._dir_gdb(ds, total, index, fexe, bsp_arch, bsp)
355                elif _directive == '%tftp':
356                    self._dir_tftp(ds, total, index, fexe, bsp_arch, bsp)
357                elif _directive == '%wait':
358                    self._dir_wait(ds, total, index, fexe, bsp_arch, bsp)
359                else:
360                    raise error.general(self._name_line_msg('invalid directive'))
361                self._lock()
362                if self.index == self.total:
363                    self._target_command('off', bsp_arch, bsp, exe, fexe)
364                self._target_command('posttest', bsp_arch, bsp, exe, fexe)
365                try:
366                    status = self.report.end(exe, self.output, self.console_prefix)
367                    version = self.report.get_config('version', not_found = 'n/p')
368                    build = self.report.get_config('build', not_found = 'n/p')
369                    tools = self.report.get_config('tools', not_found = 'n/p')
370                    self._capture_console('test result: %s' % (status))
371                    self._capture_console('test version: %s' % (version))
372                    self._capture_console('test build: %s' % (build))
373                    self._capture_console('test tools: %s' % (tools))
374                    if status == 'timeout':
375                        if self.index != self.total:
376                            self._target_command('reset', bsp_arch, bsp, exe, fexe)
377                    self.process = None
378                    self.output = None
379                finally:
380                    self._unlock()
381        return None, None, None
382
383    def _realtime_trace(self, text):
384        self._unlock()
385        try:
386            for l in text:
387                log.output(''.join(l))
388        except:
389            stacktraces.trace()
390            raise
391        finally:
392            self._lock()
393
394    def run(self):
395        self.target_start_regx = self._target_regex('target_start_regex')
396        self.target_reset_regx = self._target_regex('target_reset_regex')
397        self.load(self.name)
398
399    def capture(self, text):
400        self._lock()
401        try:
402            if not self.test_started:
403                s = text.find('*** BEGIN OF TEST ')
404                if s >= 0:
405                    self.test_started = True
406                    e = text[s + 3:].find('***')
407                    if e >= 0:
408                        self.test_label = text[s + len('*** BEGIN OF TEST '):s + e + 3 - 1]
409                    self._capture_console('test start: %s' % (self.test_label))
410                    self.process.target_start()
411            ok_to_kill = '*** TEST STATE: USER_INPUT' in text or \
412                         '*** TEST STATE: BENCHMARK' in text
413            if ok_to_kill:
414                reset_target = True
415            else:
416                reset_target = False
417            if self.target_start_regx is not None:
418                if self.target_start_regx.match(text):
419                    if self.test_started:
420                        self._capture_console('target start detected')
421                        ok_to_kill = True
422                    else:
423                        self.restarts += 1
424                        if self.restarts > self.max_restarts:
425                            self._capture_console('target restart maximum count reached')
426                            ok_to_kill = True
427                        else:
428                            self.process.target_restart(self.test_started)
429            if not reset_target and self.target_reset_regx is not None:
430                if self.target_reset_regx.match(text):
431                    self._capture_console('target reset condition detected')
432                    self._target_command('reset')
433                    self.process.target_reset(self.test_started)
434            if self.kill_on_end:
435                if not ok_to_kill and '*** END OF TEST ' in text:
436                    self._capture_console('test end: %s' % (self.test_label))
437                    if self.test_label is not None:
438                        ok_to_kill = '*** END OF TEST %s ***' % (self.test_label) in text
439                    self.process.target_end()
440            text = [(self.console_prefix, l) for l in text.replace(chr(13), '').splitlines()]
441            if self.output is not None:
442                if self.realtime_trace:
443                    self._realtime_trace(text)
444                self.output += text
445            if reset_target:
446                if self.index == self.total:
447                    self._target_command('off')
448                else:
449                    self._target_command('reset')
450        finally:
451            self._unlock()
452        if ok_to_kill:
453            self._ok_kill()
454
455    def capture_console(self, text):
456        self._lock()
457        try:
458            self._capture_console(text)
459        finally:
460            self._unlock()
461
462    def exe_trace(self, flag):
463        dt = self.macros['exe_trace']
464        if dt:
465            if 'all' in dt or flag in dt.split(','):
466                return True
467        return False
468
469    def kill(self):
470        if self.process:
471            try:
472                self.process.kill()
473            except:
474                pass
475
476def load(bsp, opts):
477    mandatory = ['bsp', 'arch', 'tester']
478    cfg = configuration.configuration()
479    path_ = opts.defaults.expand('%%{_configdir}/bsps/%s.ini' % (bsp))
480    ini_name = path.basename(path_)
481    for p in path.dirname(path_).split(':'):
482        if path.exists(path.join(p, ini_name)):
483            cfg.load(path.join(p, ini_name))
484            if not cfg.has_section(bsp):
485                raise error.general('bsp section not found in ini: [%s]' % (bsp))
486            item_names = cfg.get_item_names(bsp, err = False)
487            for m in mandatory:
488                if m not in item_names:
489                    raise error.general('mandatory item not found in bsp section: %s' % (m))
490            opts.defaults.set_write_map(bsp, add = True)
491            for i in cfg.get_items(bsp, flatten = False):
492                opts.defaults[i[0]] = i[1]
493            if not opts.defaults.set_read_map(bsp):
494                raise error.general('cannot set BSP read map: %s' % (bsp))
495            # Get a copy of the required fields we need
496            requires = cfg.comma_list(bsp, 'requires', err = False)
497            del cfg
498            user_config = opts.find_arg('--user-config')
499            if user_config is not None:
500                user_config = path.expanduser(user_config[1])
501                if not path.exists(user_config):
502                    raise error.general('cannot find user configuration file: %s' % (user_config))
503            else:
504                if 'HOME' in os.environ:
505                    user_config = path.join(os.environ['HOME'], '.rtemstesterrc')
506            if user_config:
507                if path.exists(user_config):
508                    cfg = configuration.configuration()
509                    cfg.load(user_config)
510                    if cfg.has_section(bsp):
511                        for i in cfg.get_items(bsp, flatten = False):
512                            opts.defaults[i[0]] = i[1]
513            # Check for the required values.
514            for r in requires:
515                if opts.defaults.get(r) is None:
516                    raise error.general('user value missing, BSP %s requires \'%s\': missing: %s' % \
517                                        (bsp, ', '.join(requires), r))
518            return opts.defaults['bsp']
519    raise error.general('cannot find bsp configuration file: %s.ini' % (bsp))
Note: See TracBrowser for help on using the repository browser.