stringio now works with serial connections

also allow SECoP client connections via serial

Change-Id: I10c02532a9f8e9b8f16599b98c439742da6d8f5c
Reviewed-on: https://forge.frm2.tum.de/review/c/sine2020/secop/playground/+/22525
Tested-by: JenkinsCodeReview <bjoern_pedersen@frm2.tum.de>
Reviewed-by: Markus Zolliker <markus.zolliker@psi.ch>
This commit is contained in:
zolliker 2020-02-26 11:36:28 +01:00
parent d021a116f1
commit 4bb11e249d
2 changed files with 141 additions and 111 deletions

View File

@ -16,21 +16,25 @@
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA # 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
# #
# Module authors: # Module authors:
# Enrico Faulhaber <enrico.faulhaber@frm2.tum.de>
# Markus Zolliker <markus.zolliker@psi.ch> # Markus Zolliker <markus.zolliker@psi.ch>
# #
# ***************************************************************************** # *****************************************************************************
"""asynchonous connections """asynchronous connections
generic class for byte oriented communication generic class for byte oriented communication
includes implementation for TCP connections includes implementation for TCP connections
support for asynchronous communication, but may be used also for StringIO
""" """
import socket import socket
import select
import time import time
import ast
from serial import Serial
from secop.lib import parseHostPort, tcpSocket, closeSocket from secop.lib import parseHostPort, tcpSocket, closeSocket
from secop.errors import ConfigError
class ConnectionClosed(ConnectionError): class ConnectionClosed(ConnectionError):
@ -43,7 +47,7 @@ class AsynConn:
connection = None # is not None, if connected connection = None # is not None, if connected
defaultport = None defaultport = None
def __new__(cls, uri): def __new__(cls, uri, end_of_line=b'\n'):
scheme = uri.split('://')[0] scheme = uri.split('://')[0]
iocls = cls.SCHEME_MAP.get(scheme, None) iocls = cls.SCHEME_MAP.get(scheme, None)
if not iocls: if not iocls:
@ -51,12 +55,19 @@ class AsynConn:
try: try:
host_port = parseHostPort(uri, cls.defaultport) host_port = parseHostPort(uri, cls.defaultport)
except (ValueError, TypeError, AssertionError): except (ValueError, TypeError, AssertionError):
if 'COM' in uri:
raise ValueError("the correct uri for a COM port is: "
"'serial://COM<i>[?<option>=<value>[+<option>=value ...]]'" )
if '/dev' in uri:
raise ValueError("the correct uri for a serial port is: "
"'serial:///dev/<tty>[?<option>=<value>[+<option>=value ...]]'" )
raise ValueError('invalid uri: %s' % uri) raise ValueError('invalid uri: %s' % uri)
iocls = cls.SCHEME_MAP['tcp'] iocls = cls.SCHEME_MAP['tcp']
uri = 'tcp://%s:%d' % host_port uri = 'tcp://%s:%d' % host_port
return object.__new__(iocls) return object.__new__(iocls)
def __init__(self, *args): def __init__(self, uri, end_of_line=b'\n'):
self.end_of_line = end_of_line
self._rxbuffer = b'' self._rxbuffer = b''
def __del__(self): def __del__(self):
@ -84,33 +95,40 @@ class AsynConn:
""" """
raise NotImplementedError raise NotImplementedError
def flush_recv(self):
"""flush all available bytes (return them)"""
raise NotImplementedError
def readline(self, timeout=None): def readline(self, timeout=None):
"""read one line """read one line
return either a complete line or None in case of timeout return either a complete line or None if no data available within 1 sec (self.timeout)
the timeout argument may increase, but not decrease the default timeout if a non-zero timeout is given, a timeout error is raised instead of returning None
the timeout effectively used will not be lower than self.timeout (1 sec)
""" """
if timeout: if timeout:
end = time.time() + timeout end = time.time() + timeout
while b'\n' not in self._rxbuffer: while True:
splitted = self._rxbuffer.split(self.end_of_line, 1)
if len(splitted) == 2:
line, self._rxbuffer = splitted
return line
data = self.recv() data = self.recv()
if not data: if not data:
if timeout: if timeout:
if time.time() < end: if time.time() < end:
continue continue
raise TimeoutError('timeout in readline') raise TimeoutError('timeout in readline (%g sec)' % timeout)
return None return None
self._rxbuffer += data self._rxbuffer += data
line, self._rxbuffer = self._rxbuffer.split(b'\n', 1)
return line
def writeline(self, line): def writeline(self, line):
self.send(line + b'\n') self.send(line + self.end_of_line)
class AsynTcp(AsynConn): class AsynTcp(AsynConn):
def __init__(self, uri): def __init__(self, uri, *args, **kwargs):
super().__init__() super().__init__(uri, *args, **kwargs)
self.uri = uri self.uri = uri
if uri.startswith('tcp://'): if uri.startswith('tcp://'):
# should be the case always # should be the case always
@ -126,6 +144,13 @@ class AsynTcp(AsynConn):
"""send data (bytes!)""" """send data (bytes!)"""
self.connection.sendall(data) self.connection.sendall(data)
def flush_recv(self):
"""flush recv buffer"""
data = []
while select.select([self.connection], [], [], 0)[0]:
data.append(self.recv())
return b''.join(data)
def recv(self): def recv(self):
"""return bytes received within 1 sec""" """return bytes received within 1 sec"""
try: try:
@ -138,3 +163,80 @@ class AsynTcp(AsynConn):
raise ConnectionClosed() # marks end of connection raise ConnectionClosed() # marks end of connection
AsynTcp.register_scheme('tcp') AsynTcp.register_scheme('tcp')
class AsynSerial(AsynConn):
"""a serial connection using pyserial
uri syntax:
serial://<path>?[<option>=<value>[+<option>=<value> ...]]
options (defaults, other examples):
baudrate=9600 # 4800, 115200
bytesize=8 # 5,6,7
parity=none # even, odd, mark, space
stopbits=1 # 1.5, 2
xonxoff=False # True
and others (see documentation of serial.Serial)
"""
PARITY_NAMES = {name[0]: name for name in ['NONE', 'ODD', 'EVEN', 'MASK', 'SPACE']}
def __init__(self, uri, *args, **kwargs):
super().__init__(uri, *args, **kwargs)
self.uri = uri
if uri.startswith('serial://'):
# should be the case always
uri = uri[9:]
uri = uri.split('?', 1)
dev = uri[0]
try:
options = dict((kv.split('=') for kv in uri[1].split('+')))
except IndexError: # no uri[1], no options
options = {}
except ValueError:
raise ConfigError('illegal serial options')
parity = options.pop('parity', None) # only parity is to be treated as text
for k, v in options.items():
try:
options[k] = ast.literal_eval(v.title()) # title(): turn false/true into False/True
except ValueError:
pass
if parity is not None:
name = parity.upper()
fullname = self.PARITY_NAMES[name[0]]
if not fullname.startswith(name):
raise ConfigError('illegal parity: %s' % parity)
options['parity'] = name[0]
if 'timeout' not in options:
options['timeout'] = self.timeout
try:
self.connection = Serial(dev, **options)
except ValueError as e:
raise ConfigError(e)
def disconnect(self):
if self.connection:
self.connection.close()
self.connection = None
def send(self, data):
"""send data (bytes!)"""
self.connection.write(data)
def flush_recv(self):
return self.connection.read(self.connection.in_waiting)
def recv(self):
"""return bytes received within 1 sec"""
if not self.connection: # disconnect() might have been called in between
raise ConnectionClosed()
n = self.connection.in_waiting
if n:
return self.connection.read(n)
data = self.connection.read(1)
return data + self.connection.read(self.connection.in_waiting)
AsynSerial.register_scheme('serial')

View File

@ -24,9 +24,9 @@ implements TCP/IP and is be used as a base for SerialIO
""" """
import time import time
import socket
import threading import threading
import re import re
from secop.lib.asynconn import AsynConn, ConnectionClosed
from secop.modules import Module, Communicator, Parameter, Command, Property, Attached from secop.modules import Module, Communicator, Parameter, Command, Property, Attached
from secop.datatypes import StringType, FloatRange, ArrayOf, BoolType, TupleOf from secop.datatypes import StringType, FloatRange, ArrayOf, BoolType, TupleOf
from secop.errors import CommunicationFailedError, CommunicationSilentError from secop.errors import CommunicationFailedError, CommunicationSilentError
@ -34,14 +34,9 @@ from secop.poller import REGULAR
from secop.metaclass import Done from secop.metaclass import Done
class StringIO(Communicator): class StringIO(Communicator):
"""line oriented communicator """line oriented communicator
implementation for TCP/IP streams.
other types have to override the following methods:
createConnection, readWithTimeout, writeBytes, closeConnection
self healing is assured by polling the parameter 'is_connected' self healing is assured by polling the parameter 'is_connected'
""" """
properties = { properties = {
@ -76,82 +71,35 @@ class StringIO(Communicator):
_reconnectCallbacks = None _reconnectCallbacks = None
def earlyInit(self): def earlyInit(self):
self._stream = None self._conn = None
self._lock = threading.RLock() self._lock = threading.RLock()
self._end_of_line = self.end_of_line.encode(self.encoding) self._end_of_line = self.end_of_line.encode(self.encoding)
self._connect_error = None self._connect_error = None
self._last_error = None self._last_error = None
def createConnection(self): def connectStart(self):
"""create connection
in case of success, self.is_connected MUST be set to True by implementors
"""
uri = self.uri
if uri.startswith('tcp://'):
uri = uri[6:]
try:
host, port = uri.split(':')
self._stream = socket.create_connection((host, int(port)), 10)
self.is_connected = True
except (ConnectionRefusedError, socket.gaierror) as e:
raise CommunicationFailedError(str(e))
except Exception as e:
# this is really bad, do not try again
self._connect_error = e
raise
def readWithTimeout(self, timeout):
"""read with timeout
Read bytes available now, or wait at most the specified timeout until some bytes
are available. Throw an error, if disconnected.
If no bytes are available, return b''
to be overwritten for other stream types
"""
if timeout is None or timeout < 0:
raise ValueError('illegal timeout %r' % timeout)
if not self.is_connected: if not self.is_connected:
raise CommunicationSilentError(self._last_error or 'not connected') uri = self.uri
self._stream.settimeout(timeout) try:
try: self._conn = AsynConn(uri, self._end_of_line)
reply = self._stream.recv(4096) self.is_connected = True
if reply: except Exception as e:
return reply # this is really bad, do not try again
except (BlockingIOError, socket.timeout): self._connect_error = e
return b'' raise
except Exception as e: for command, regexp in self.identification:
self.closeConnection() reply = self.do_communicate(command)
raise CommunicationFailedError('disconnected because of %s' % e) if not re.match(regexp, reply):
# other end disconnected self.closeConnection()
self.closeConnection() raise CommunicationFailedError('bad response: %s does not match %s' %
raise CommunicationFailedError('other end disconnected') (reply, regexp))
def writeBytes(self, data):
"""write bytes
to be overwritten for other stream types
"""
self._stream.sendall(data)
def closeConnection(self): def closeConnection(self):
"""close connection """close connection
self.is_connected MUST be set to False by implementors self.is_connected MUST be set to False by implementors
""" """
if not self._stream: self._conn.disconnect()
return self._conn = None
self.log.debug('disconnect %s' % self.uri)
try:
self._stream.shutdown(socket.SHUT_RDWR)
except socket.error:
pass
try:
self._stream.close()
except socket.error:
pass
self._stream = None
self.is_connected = False self.is_connected = False
def read_is_connected(self): def read_is_connected(self):
@ -185,16 +133,6 @@ class StringIO(Communicator):
return False return False
return self.read_is_connected() return self.read_is_connected()
def connectStart(self):
if not self.is_connected:
self.createConnection()
for command, regexp in self.identification:
reply = self.do_communicate(command)
if not re.match(regexp, reply):
self.closeConnection()
raise CommunicationFailedError('bad response: %s does not match %s' %
(reply, regexp))
def registerReconnectCallback(self, name, func): def registerReconnectCallback(self, name, func):
"""register reconnect callback """register reconnect callback
@ -236,25 +174,15 @@ class StringIO(Communicator):
if self.wait_before: if self.wait_before:
time.sleep(self.wait_before) time.sleep(self.wait_before)
if garbage is None: # read garbage only once if garbage is None: # read garbage only once
garbage = b'' garbage = self._conn.flush_recv()
data = self.readWithTimeout(0)
while data:
garbage += data
data = self.readWithTimeout(0)
if garbage: if garbage:
self.log.debug('garbage: %s', garbage.decode(self.encoding)) self.log.debug('garbage: %s', garbage.decode(self.encoding))
self.writeBytes((cmd + self.end_of_line).encode(self.encoding)) self._conn.send((cmd + self.end_of_line).encode(self.encoding))
timeout = self.timeout try:
buffer = b'' reply = self._conn.readline(self.timeout)
data = True except ConnectionClosed:
while data: raise CommunicationFailedError('disconnected')
data = self.readWithTimeout(timeout) reply = reply.decode(self.encoding)
buffer += data
if self._end_of_line in buffer:
break
else:
raise CommunicationFailedError('timeout')
reply = buffer.split(self._end_of_line, 1)[0].decode(self.encoding)
self.log.debug('recv: %s', reply) self.log.debug('recv: %s', reply)
return reply return reply
except Exception as e: except Exception as e: