introduce BytesIO

rename secop.stringio to secop.io, which includes now
also BytesIO and the common base class IOBase

+ a small fix in error handling

Change-Id: I8e305e2c164f4ed131f4b36ef45edd8bd222336d
Reviewed-on: https://forge.frm2.tum.de/review/c/sine2020/secop/playground/+/26393
Tested-by: Jenkins Automated Tests <pedersen+jenkins@frm2.tum.de>
Reviewed-by: Enrico Faulhaber <enrico.faulhaber@frm2.tum.de>
Reviewed-by: Markus Zolliker <markus.zolliker@psi.ch>
This commit is contained in:
zolliker 2021-07-14 16:17:10 +02:00
parent 6b610f1e25
commit bc6a99e11b
6 changed files with 227 additions and 103 deletions

View File

@ -18,85 +18,81 @@
# Module authors:
# Markus Zolliker <markus.zolliker@psi.ch>
# *****************************************************************************
"""Line oriented stream communication
"""stream oriented input / output
implements TCP/IP and is be used as a base for SerialIO
May be used for TCP/IP as well for serial IO or
other future extensions of AsynConn
"""
import re
import threading
import time
import threading
from secop.datatypes import ArrayOf, BoolType, \
FloatRange, StringType, TupleOf, ValueType
from secop.errors import CommunicationFailedError, \
CommunicationSilentError, ConfigError
from secop.lib.asynconn import AsynConn, ConnectionClosed
from secop.datatypes import ArrayOf, BLOBType, BoolType, FloatRange, IntRange, StringType, TupleOf, ValueType
from secop.errors import CommunicationFailedError, CommunicationSilentError, ConfigError
from secop.modules import Attached, Command, \
Communicator, Done, Module, Parameter, Property
from secop.poller import REGULAR
class StringIO(Communicator):
"""line oriented communicator
HEX_CODE = re.compile(r'[0-9a-fA-F][0-9a-fA-F]$')
self healing is assured by polling the parameter 'is_connected'
"""
class HasIodev(Module):
"""Mixin for modules using a communicator"""
iodev = Attached()
uri = Property('uri for automatic creation of the attached communication module',
StringType(), default='')
iodevDict = {}
def __init__(self, name, logger, opts, srv):
iodev = opts.get('iodev')
Module.__init__(self, name, logger, opts, srv)
if self.uri:
opts = {'uri': self.uri, 'description': 'communication device for %s' % name,
'export': False}
ioname = self.iodevDict.get(self.uri)
if not ioname:
ioname = iodev or name + '_iodev'
iodev = self.iodevClass(ioname, srv.log.getChild(ioname), opts, srv)
srv.modules[ioname] = iodev
self.iodevDict[self.uri] = ioname
self.iodev = ioname
elif not self.iodev:
raise ConfigError("Module %s needs a value for either 'uri' or 'iodev'" % name)
def initModule(self):
try:
self._iodev.read_is_connected()
except (CommunicationFailedError, AttributeError):
# AttributeError: for missing _iodev?
pass
super().initModule()
def sendRecv(self, command):
return self._iodev.communicate(command)
class IOBase(Communicator):
"""base of StringIO and BytesIO"""
uri = Property('hostname:portnumber', datatype=StringType())
end_of_line = Property('end_of_line character', datatype=ValueType(),
default='\n', settable=True)
encoding = Property('used encoding', datatype=StringType(),
default='ascii', settable=True)
identification = Property('''
identification
a list of tuples with commands and expected responses as regexp,
to be sent on connect''',
datatype=ArrayOf(TupleOf(StringType(), StringType())), default=[], export=False)
timeout = Parameter('timeout', datatype=FloatRange(0), default=2)
wait_before = Parameter('wait time before sending', datatype=FloatRange(), default=0)
is_connected = Parameter('connection state', datatype=BoolType(), readonly=False, poll=REGULAR)
pollinterval = Parameter('reconnect interval', datatype=FloatRange(0), readonly=False, default=10)
_reconnectCallbacks = None
_conn = None
_last_error = None
_lock = None
def earlyInit(self):
self._conn = None
self._lock = threading.RLock()
eol = self.end_of_line
if isinstance(eol, (tuple, list)):
if len(eol) not in (1, 2):
raise ValueError('invalid end_of_line: %s' % eol)
else:
eol = [eol]
# eol for read and write might be distinct
self._eol_read = self._convert_eol(eol[0])
if not self._eol_read:
raise ValueError('end_of_line for read must not be empty')
self._eol_write = self._convert_eol(eol[-1])
self._last_error = None
def _convert_eol(self, value):
if isinstance(value, str):
return value.encode(self.encoding)
if isinstance(value, int):
return bytes([value])
if isinstance(value, bytes):
return value
raise ValueError('invalid end_of_line: %s' % repr(value))
def connectStart(self):
if not self.is_connected:
uri = self.uri
self._conn = AsynConn(uri, self._eol_read)
self.is_connected = True
for command, regexp in self.identification:
reply = self.communicate(command)
if not re.match(regexp, reply):
self.closeConnection()
raise CommunicationFailedError('bad response: %s does not match %s' %
(reply, regexp))
raise NotImplementedError
def closeConnection(self):
"""close connection
@ -123,7 +119,7 @@ class StringIO(Communicator):
return Done
except Exception as e:
if str(e) == self._last_error:
raise CommunicationSilentError(str(e))
raise CommunicationSilentError(str(e)) from e
self._last_error = str(e)
self.log.error(self._last_error)
raise
@ -158,6 +154,59 @@ class StringIO(Communicator):
if removeme:
self._reconnectCallbacks.pop(key)
class StringIO(IOBase):
"""line oriented communicator
self healing is assured by polling the parameter 'is_connected'
"""
end_of_line = Property('end_of_line character', datatype=ValueType(),
default='\n', settable=True)
encoding = Property('used encoding', datatype=StringType(),
default='ascii', settable=True)
identification = Property('''
identification
a list of tuples with commands and expected responses as regexp,
to be sent on connect''',
datatype=ArrayOf(TupleOf(StringType(), StringType())), default=[], export=False)
def _convert_eol(self, value):
if isinstance(value, str):
return value.encode(self.encoding)
if isinstance(value, int):
return bytes([value])
if isinstance(value, bytes):
return value
raise ValueError('invalid end_of_line: %s' % repr(value))
def earlyInit(self):
super().earlyInit()
eol = self.end_of_line
if isinstance(eol, (tuple, list)):
if len(eol) not in (1, 2):
raise ValueError('invalid end_of_line: %s' % eol)
else:
eol = [eol]
# eol for read and write might be distinct
self._eol_read = self._convert_eol(eol[0])
if not self._eol_read:
raise ValueError('end_of_line for read must not be empty')
self._eol_write = self._convert_eol(eol[-1])
def connectStart(self):
if not self.is_connected:
uri = self.uri
self._conn = AsynConn(uri, self._eol_read)
self.is_connected = True
for command, regexp in self.identification:
reply = self.communicate(command)
if not re.match(regexp, reply):
self.closeConnection()
raise CommunicationFailedError('bad response: %s does not match %s' %
(reply, regexp))
@Command(StringType(), result=StringType())
def communicate(self, command):
"""send a command and receive a reply
@ -165,6 +214,7 @@ class StringIO(Communicator):
for commands without reply, the command must be joined with a query command,
wait_before is respected for end_of_lines within a command.
"""
command = command.encode(self.encoding)
if not self.is_connected:
self.read_is_connected() # try to reconnect
if not self._conn:
@ -173,9 +223,9 @@ class StringIO(Communicator):
with self._lock:
# read garbage and wait before send
if self.wait_before and self._eol_write:
cmds = command.encode(self.encoding).split(self._eol_write)
cmds = command.split(self._eol_write)
else:
cmds = [command.encode(self.encoding)]
cmds = [command]
garbage = None
try:
for cmd in cmds:
@ -186,16 +236,17 @@ class StringIO(Communicator):
if garbage:
self.log.debug('garbage: %r', garbage)
self._conn.send(cmd + self._eol_write)
self.log.debug('send: %s', cmd + self._eol_write)
reply = self._conn.readline(self.timeout)
except ConnectionClosed:
except ConnectionClosed as e:
self.closeConnection()
raise CommunicationFailedError('disconnected')
raise CommunicationFailedError('disconnected') from None
reply = reply.decode(self.encoding)
self.log.debug('recv: %s', reply)
return reply
except Exception as e:
if str(e) == self._last_error:
raise CommunicationSilentError(str(e))
raise CommunicationSilentError(str(e)) from None
self._last_error = str(e)
self.log.error(self._last_error)
raise
@ -210,40 +261,113 @@ class StringIO(Communicator):
return replies
class HasIodev(Module):
"""Mixin for modules using a communicator
def make_regexp(string):
"""create a bytes regexp pattern from a string describing a bytes pattern
not only StringIO !
:param string: a string containing white space separated items containing either
- a two digit hexadecimal number (byte value)
- a character from first unicode page, to be replaced by its code
- ?? indicating any byte
:return: a tuple of length and compiled re pattern
Example: make_regexp('00 ff A ??') == (4, re.compile(b'\x00\xffA.'))
"""
iodev = Attached()
uri = Property('uri for automatic creation of the attached communication module',
StringType(), default='')
relist = [b'.' if c == '??' else
re.escape(bytes([int(c, 16) if HEX_CODE.match(c) else ord(c)]))
for c in string.split()]
return len(relist), re.compile(b''.join(relist) + b'$')
iodevDict = {}
def __init__(self, name, logger, opts, srv):
iodev = opts.get('iodev')
Module.__init__(self, name, logger, opts, srv)
if self.uri:
opts = {'uri': self.uri, 'description': 'communication device for %s' % name,
'export': False}
ioname = self.iodevDict.get(self.uri)
if not ioname:
ioname = iodev or name + '_iodev'
iodev = self.iodevClass(ioname, srv.log.getChild(ioname), opts, srv)
srv.modules[ioname] = iodev
self.iodevDict[self.uri] = ioname
self.iodev = ioname
elif not self.iodev:
raise ConfigError("Module %s needs a value for either 'uri' or 'iodev'" % name)
def make_bytes(string):
"""create bytes from a string describing bytes
def initModule(self):
:param string: a string containing white space separated items containing either
- a two digit hexadecimal number (byte value)
- a character from first unicode page, to be replaced by its code
:return: the bytes
Example: make_bytes('02 A 20 B 03') == b'\x02A B\x03'
"""
return bytes([int(c, 16) if HEX_CODE.match(c) else ord(c) for c in string.split()])
class BytesIO(IOBase):
identification = Property(
"""identification
a list of tuples with requests and expected responses, to be sent on connect.
requests and responses are whitespace separated items
an item is either:
- a two digit hexadecimal number (byte value)
- a character
- ?? indicating ignored bytes in responses
""", datatype=ArrayOf(TupleOf(StringType(), StringType())), default=[], export=False)
def connectStart(self):
if not self.is_connected:
uri = self.uri
self._conn = AsynConn(uri, b'')
self.is_connected = True
for request, expected in self.identification:
replylen, replypat = make_regexp(expected)
reply = self.communicate(make_bytes(request), replylen)
if not replypat.match(reply):
self.closeConnection()
raise CommunicationFailedError('bad response: %r does not match %r' % (reply, expected))
@Command((BLOBType(), IntRange(0)), result=BLOBType())
def communicate(self, request, replylen): # pylint: disable=arguments-differ
"""send a request and receive (at least) <replylen> bytes as reply"""
if not self.is_connected:
self.read_is_connected() # try to reconnect
if not self._conn:
raise CommunicationSilentError('can not connect to %r' % self.uri)
try:
self._iodev.read_is_connected()
except (CommunicationFailedError, AttributeError):
# AttributeError: for missing _iodev?
pass
super().initModule()
with self._lock:
# read garbage and wait before send
try:
if self.wait_before:
time.sleep(self.wait_before)
garbage = self._conn.flush_recv()
if garbage:
self.log.debug('garbage: %r', garbage)
self._conn.send(request)
self.log.debug('send: %r', request)
reply = self._conn.readbytes(replylen, self.timeout)
except ConnectionClosed as e:
self.closeConnection()
raise CommunicationFailedError('disconnected') from None
self.log.debug('recv: %r', reply)
return self.getFullReply(request, reply)
except Exception as e:
if str(e) == self._last_error:
raise CommunicationSilentError(str(e)) from None
self._last_error = str(e)
self.log.error(self._last_error)
raise
def sendRecv(self, command):
return self._iodev.communicate(command)
def readBytes(self, nbytes):
"""read bytes
:param nbytes: the number of expected bytes
:return: the returned bytes
"""
return self._conn.readbytes(nbytes, self.timeout)
def getFullReply(self, request, replyheader):
"""to be overwritten in case the reply length is variable
:param request: the request
:param replyheader: the already received bytes
:return: the full reply (replyheader + additional bytes)
When the reply length is variable, :meth:`communicate` should be called
with the `replylen` argument set to minimum expected length of the reply.
Typically this method determines then the length of additional bytes from
the already received bytes (replyheader) and/or the request and calls
:meth:`readBytes` to get the remaining bytes.
Remark: this mechanism avoids the need to call readBytes after communicate
separately, which would not honour the lock properly.
"""
return replyheader

View File

@ -25,7 +25,7 @@
generic class for byte oriented communication
includes implementation for TCP and Serial connections
support for asynchronous communication, but may be used also for
synchronous IO (see secop.stringio.StringIO)
synchronous IO (see secop.io)
"""
import ast
@ -62,11 +62,11 @@ class AsynConn:
except (ValueError, TypeError, AssertionError):
if 'COM' in uri:
raise ValueError("the correct uri for a COM port is: "
"'serial://COM<i>[?<option>=<value>[+<option>=value ...]]'")
"'serial://COM<i>[?<option>=<value>[+<option>=value ...]]'") from None
if '/dev' in uri:
raise ValueError("the correct uri for a serial port is: "
"'serial:///dev/<tty>[?<option>=<value>[+<option>=value ...]]'")
raise ValueError('invalid uri: %s' % uri)
"'serial:///dev/<tty>[?<option>=<value>[+<option>=value ...]]'") from None
raise ValueError('invalid uri: %s' % uri) from None
iocls = cls.SCHEME_MAP['tcp']
uri = 'tcp://%s:%d' % host_port
return object.__new__(iocls)
@ -164,7 +164,7 @@ class AsynTcp(AsynConn):
self.connection = tcpSocket(uri, self.defaultport, self.timeout)
except (ConnectionRefusedError, socket.gaierror) as e:
# indicate that retrying might make sense
raise CommunicationFailedError(str(e))
raise CommunicationFailedError(str(e)) from None
def disconnect(self):
if self.connection:
@ -237,8 +237,8 @@ class AsynSerial(AsynConn):
options = dict((kv.split('=') for kv in uri[1].split('+')))
except IndexError: # no uri[1], no options
options = {}
except ValueError:
raise ConfigError('illegal serial options')
except ValueError as e:
raise ConfigError('illegal serial options') from e
parity = options.pop('parity', None) # only parity is to be treated as text
for k, v in options.items():
try:
@ -256,7 +256,7 @@ class AsynSerial(AsynConn):
try:
self.connection = Serial(dev, **options)
except ValueError as e:
raise ConfigError(e)
raise ConfigError(e) from None
# TODO: turn exceptions into ConnectionFailedError, where a retry makes sense
def disconnect(self):

View File

@ -410,7 +410,7 @@ class Module(HasAccessibles):
for pname, p in self.parameters.items():
try:
p.checkProperties()
except ConfigError:
except ConfigError as e:
errors.append('%s: %s' % (pname, e))
if errors:
raise ConfigError(errors)

View File

@ -29,7 +29,7 @@ from secop.lib import get_class
from secop.modules import Drivable, Module, Readable, Writable
from secop.params import Command, Parameter
from secop.properties import Property
from secop.stringio import HasIodev
from secop.io import HasIodev
class ProxyModule(HasIodev, Module):

View File

@ -28,7 +28,7 @@ from secop.lib import formatStatusBits
from secop.modules import Attached, Done, \
Drivable, Parameter, Property, Readable
from secop.poller import REGULAR, Poller
from secop.stringio import HasIodev
from secop.io import HasIodev
Status = Drivable.Status
@ -53,7 +53,7 @@ scan = IOHandler('scan', 'SCAN?', '%d,%d')
STATUS_BIT_LABELS = 'CS_OVL VCM_OVL VMIX_OVL VDIF_OVL R_OVER R_UNDER T_OVER T_UNDER'.split()
class StringIO(secop.stringio.StringIO):
class StringIO(secop.io.StringIO):
identification = [('*IDN?', 'LSCI,MODEL370,.*')]
wait_before = 0.05

View File

@ -43,7 +43,7 @@ from secop.lib.enum import Enum
from secop.modules import Attached, Communicator, Done, \
Drivable, Parameter, Property, Readable
from secop.poller import Poller
from secop.stringio import HasIodev
from secop.io import HasIodev
try:
import secop_psi.ppmswindows as ppmshw