source: rtems-tools/tester/rt/console.py @ 3a867a4

Last change on this file since 3a867a4 was 3a867a4, checked in by Chris Johns <chrisj@…>, on Sep 21, 2017 at 8:26:20 AM

Add TFTP as a back end option for testing. Add telnet as a console option.

TFTP runs a local TFTP server on port 69 or another specified port and
serves each test for any requested file.

Telnet is now a console option.

  • Property mode set to 100644
File size: 4.1 KB
Line 
1#
2# RTEMS Tools Project (http://www.rtems.org/)
3# Copyright 2013-2014 Chris Johns (chrisj@rtems.org)
4# All rights reserved.
5#
6# This file is part of the RTEMS Tools package in 'rtems-tools'.
7#
8# Redistribution and use in source and binary forms, with or without
9# modification, are permitted provided that the following conditions are met:
10#
11# 1. Redistributions of source code must retain the above copyright notice,
12# this list of conditions and the following disclaimer.
13#
14# 2. Redistributions in binary form must reproduce the above copyright notice,
15# this list of conditions and the following disclaimer in the documentation
16# and/or other materials provided with the distribution.
17#
18# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
19# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
22# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
23# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
24# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
25# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
26# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
27# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28# POSSIBILITY OF SUCH DAMAGE.
29#
30
31#
32# RTEMS Testing Consoles
33#
34
35from __future__ import print_function
36
37import errno
38import os
39import threading
40import time
41
42from rtemstoolkit import path
43
44from . import telnet
45
46#
47# Not available on Windows. Not sure what this means.
48#
49if os.name != 'nt':
50    from . import stty
51else:
52    stty = None
53
54def save():
55    if stty is not None:
56        return stty.save()
57    return None
58
59def restore(attributes):
60    if attributes is not None and stty is not None:
61        stty.restore(attributes)
62
63class console(object):
64    '''RTEMS Testing console base.'''
65
66    def __init__(self, name, trace):
67        self.name = name
68        self.trace = trace
69
70    def __del__(self):
71        pass
72
73    def _tracing(self):
74        return self.trace
75
76    def open(self):
77        pass
78
79    def close(self):
80        pass
81
82class stdio(console):
83    '''STDIO console.'''
84
85    def __init__(self, trace = False):
86        super(stdio, self).__init__('stdio', trace)
87
88class tty(console):
89    '''TTY console connects to the target's console.'''
90
91    def __init__(self, dev, output, setup = None, trace = False):
92        self.tty = None
93        self.read_thread = None
94        self.dev = dev
95        self.output = output
96        self.setup = setup
97        super(tty, self).__init__(dev, trace)
98
99    def __del__(self):
100        super(tty, self).__del__()
101        self.close()
102
103    def open(self):
104        def _readthread(me, x):
105            line = ''
106            while me.running:
107                time.sleep(0.05)
108                try:
109                    data = me.tty.read()
110                except IOError as ioe:
111                    if ioe.errno == errno.EAGAIN:
112                        continue
113                    raise
114                except:
115                    raise
116                for c in data:
117                    if len(c) == 0:
118                        continue
119                    if c != chr(0):
120                        line += c
121                    if c == '\n':
122                        me.output(line)
123                        line = ''
124        if stty and path.exists(self.dev):
125            self.tty = stty.tty(self.dev)
126        else:
127            self.tty = telnet.tty(self.dev)
128        self.tty.set(self.setup)
129        self.tty.on()
130        self.read_thread = threading.Thread(target = _readthread,
131                                            name = 'tty[%s]' % (self.dev),
132                                            args = (self, 0))
133        self.read_thread.daemon = True
134        self.running = True
135        self.read_thread.start()
136
137    def close(self):
138        if self.tty:
139            time.sleep(1)
140            if self.read_thread:
141                self.running = False
142                self.read_thread.join(1)
143            self.tty = None
Note: See TracBrowser for help on using the repository browser.