source: rtems-tools/tester/rt/console.py @ e058db0

Last change on this file since e058db0 was e058db0, checked in by Chris Johns <chrisj@…>, on Nov 7, 2018 at 3:55:20 AM

python: Provide support to select a valid python version.

  • Update imports after wrapping the code.
  • Fix python3 issues.
  • Fix config path issues for in repo and install runs.

Closes #3537

  • Property mode set to 100644
File size: 4.3 KB
Line 
1#
2# RTEMS Tools Project (http://www.rtems.org/)
3# Copyright 2013-2014 Chris Johns (chrisj@rtems.org)
4# All rights reserved.
5#
6# This file is part of the RTEMS Tools package in 'rtems-tools'.
7#
8# Redistribution and use in source and binary forms, with or without
9# modification, are permitted provided that the following conditions are met:
10#
11# 1. Redistributions of source code must retain the above copyright notice,
12# this list of conditions and the following disclaimer.
13#
14# 2. Redistributions in binary form must reproduce the above copyright notice,
15# this list of conditions and the following disclaimer in the documentation
16# and/or other materials provided with the distribution.
17#
18# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
19# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
22# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
23# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
24# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
25# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
26# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
27# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28# POSSIBILITY OF SUCH DAMAGE.
29#
30
31#
32# RTEMS Testing Consoles
33#
34
35from __future__ import print_function
36
37import errno
38import os
39import threading
40import time
41
42from rtemstoolkit import path
43
44import telnet
45
#
# The stty module is only imported on non-Windows hosts. On Windows
# (os.name == 'nt') the name 'stty' is bound to None instead, so the rest
# of this module can test for it before using it.
#
stty = None
if os.name != 'nt':
    import stty
53
def save():
    '''Return the result of stty.save(), or None when stty is unavailable.'''
    return None if stty is None else stty.save()
58
def restore(attributes):
    '''Hand attributes to stty.restore().

    Does nothing when attributes is None or when the stty module is not
    available.
    '''
    # The attributes are tested first, so a None value returns without
    # looking at stty at all.
    if attributes is None or stty is None:
        return
    stty.restore(attributes)
62
class console(object):
    '''Base class for the RTEMS Testing consoles.

    Stores the console name and the trace flag. open() and close() do
    nothing here and are provided for subclasses to override.
    '''

    def __init__(self, name, trace):
        '''Remember the console name and the trace flag.'''
        self.trace = trace
        self.name = name

    def __del__(self):
        # Nothing to release in the base class; subclasses chain to this.
        return None

    def _tracing(self):
        '''Return the trace flag given to the constructor.'''
        flag = self.trace
        return flag

    def open(self):
        '''Open the console; a no-op in the base class.'''
        return None

    def close(self):
        '''Close the console; a no-op in the base class.'''
        return None
81
class stdio(console):
    '''Console for standard I/O; it is always named 'stdio'.'''

    def __init__(self, trace = False):
        '''Create the console with the fixed name and the given trace flag.'''
        super(stdio, self).__init__(name = 'stdio', trace = trace)
87
class tty(console):
    '''TTY console connects to the target's console.

    open() creates the device with the stty module when that module is
    available and the device exists as a path, otherwise the device name
    is handed to telnet.tty. A daemon thread then polls the device and
    passes every newline terminated line to the output callable.
    '''

    def __init__(self, dev, output, setup = None, trace = False):
        '''Record the settings; nothing is opened until open() is called.

        dev:    the device name; also used as the console name.
        output: callable invoked with each complete line of text, the
                trailing newline included.
        setup:  passed unchanged to the device's set() method in open().
        trace:  trace flag stored by the console base class.
        '''
        self.tty = None
        self.read_thread = None
        # Set here so the flag exists before open() has ever been called.
        self.running = False
        self.dev = dev
        self.output = output
        self.setup = setup
        super(tty, self).__init__(dev, trace)

    def __del__(self):
        super(tty, self).__del__()
        self.close()

    def _read_loop(self, port):
        '''Body of the reader thread: poll port and emit complete lines.

        port is the device object created by open(). The thread keeps its
        own reference to it, so close() clearing self.tty after a join
        that timed out cannot leave this loop calling read() on None.

        An IOError with errno EAGAIN is treated as no data available; any
        other exception propagates and ends the thread.
        '''
        nul = chr(0)
        pending = ''
        while self.running:
            time.sleep(0.05)
            try:
                data = port.read()
            except IOError as ioe:
                if ioe.errno == errno.EAGAIN:
                    continue
                raise
            if not data:
                # Nothing was read on this poll.
                continue
            if isinstance(data, bytes):
                data = data.decode('utf-8', 'ignore')
            # Keep 7-bit characters only and drop NULs.
            pending += ''.join(c for c in data if ord(c) < 128 and c != nul)
            # Emit each complete line; a trailing partial line stays
            # buffered until its newline arrives.
            while '\n' in pending:
                line, eol, pending = pending.partition('\n')
                self.output(line + eol)

    def open(self):
        '''Open the device, configure it and start the reader thread.'''
        if stty and path.exists(self.dev):
            self.tty = stty.tty(self.dev)
        else:
            self.tty = telnet.tty(self.dev)
        self.tty.set(self.setup)
        self.tty.on()
        self.running = True
        self.read_thread = threading.Thread(target = self._read_loop,
                                            name = 'tty[%s]' % (self.dev),
                                            args = (self.tty,))
        self.read_thread.daemon = True
        self.read_thread.start()

    def close(self):
        '''Stop the reader thread and drop the device.

        Does nothing when the console is not open. Text received without
        a terminating newline is discarded when the reader stops.
        '''
        if self.tty:
            # NOTE(review): presumably this delay lets output still in
            # flight reach the reader before it is stopped - confirm.
            time.sleep(1)
            if self.read_thread:
                self.running = False
                # Bounded wait; the reader is a daemon thread so it cannot
                # hold up interpreter exit if it is stuck in read().
                self.read_thread.join(1)
            self.tty = None
Note: See TracBrowser for help on using the repository browser.