From bc6a99e11bcbae9536ad223f533e0cfe52c5e5cb Mon Sep 17 00:00:00 2001 From: Markus Zolliker Date: Wed, 14 Jul 2021 16:17:10 +0200 Subject: [PATCH] introduce BytesIO rename secop.stringio to secop.io, which includes now also BytesIO and the common base class IOBase + a small fix in error handling Change-Id: I8e305e2c164f4ed131f4b36ef45edd8bd222336d Reviewed-on: https://forge.frm2.tum.de/review/c/sine2020/secop/playground/+/26393 Tested-by: Jenkins Automated Tests Reviewed-by: Enrico Faulhaber Reviewed-by: Markus Zolliker --- secop/{stringio.py => io.py} | 304 ++++++++++++++++++++++++----------- secop/lib/asynconn.py | 16 +- secop/modules.py | 2 +- secop/proxy.py | 2 +- secop_psi/ls370res.py | 4 +- secop_psi/ppms.py | 2 +- 6 files changed, 227 insertions(+), 103 deletions(-) rename secop/{stringio.py => io.py} (60%) diff --git a/secop/stringio.py b/secop/io.py similarity index 60% rename from secop/stringio.py rename to secop/io.py index 5554990..90e0717 100644 --- a/secop/stringio.py +++ b/secop/io.py @@ -18,85 +18,81 @@ # Module authors: # Markus Zolliker # ***************************************************************************** -"""Line oriented stream communication +"""stream oriented input / output -implements TCP/IP and is be used as a base for SerialIO +May be used for TCP/IP as well for serial IO or +other future extensions of AsynConn """ import re -import threading import time +import threading -from secop.datatypes import ArrayOf, BoolType, \ - FloatRange, StringType, TupleOf, ValueType -from secop.errors import CommunicationFailedError, \ - CommunicationSilentError, ConfigError from secop.lib.asynconn import AsynConn, ConnectionClosed +from secop.datatypes import ArrayOf, BLOBType, BoolType, FloatRange, IntRange, StringType, TupleOf, ValueType +from secop.errors import CommunicationFailedError, CommunicationSilentError, ConfigError from secop.modules import Attached, Command, \ Communicator, Done, Module, Parameter, Property from secop.poller import REGULAR -class StringIO(Communicator): - """line oriented communicator +HEX_CODE = re.compile(r'[0-9a-fA-F][0-9a-fA-F]$') - self healing is assured by polling the parameter 'is_connected' - """ + +class HasIodev(Module): + """Mixin for modules using a communicator""" + iodev = Attached() + uri = Property('uri for automatic creation of the attached communication module', + StringType(), default='') + + iodevDict = {} + + def __init__(self, name, logger, opts, srv): + iodev = opts.get('iodev') + Module.__init__(self, name, logger, opts, srv) + if self.uri: + opts = {'uri': self.uri, 'description': 'communication device for %s' % name, + 'export': False} + ioname = self.iodevDict.get(self.uri) + if not ioname: + ioname = iodev or name + '_iodev' + iodev = self.iodevClass(ioname, srv.log.getChild(ioname), opts, srv) + srv.modules[ioname] = iodev + self.iodevDict[self.uri] = ioname + self.iodev = ioname + elif not self.iodev: + raise ConfigError("Module %s needs a value for either 'uri' or 'iodev'" % name) + + def initModule(self): + try: + self._iodev.read_is_connected() + except (CommunicationFailedError, AttributeError): + # AttributeError: for missing _iodev? + pass + super().initModule() + + def sendRecv(self, command): + return self._iodev.communicate(command) + + +class IOBase(Communicator): + """base of StringIO and BytesIO""" uri = Property('hostname:portnumber', datatype=StringType()) - end_of_line = Property('end_of_line character', datatype=ValueType(), - default='\n', settable=True) - encoding = Property('used encoding', datatype=StringType(), - default='ascii', settable=True) - identification = Property(''' - identification - - a list of tuples with commands and expected responses as regexp, - to be sent on connect''', - datatype=ArrayOf(TupleOf(StringType(), StringType())), default=[], export=False) - timeout = Parameter('timeout', datatype=FloatRange(0), default=2) wait_before = Parameter('wait time before sending', datatype=FloatRange(), default=0) is_connected = Parameter('connection state', datatype=BoolType(), readonly=False, poll=REGULAR) pollinterval = Parameter('reconnect interval', datatype=FloatRange(0), readonly=False, default=10) _reconnectCallbacks = None + _conn = None + _last_error = None + _lock = None def earlyInit(self): - self._conn = None self._lock = threading.RLock() - eol = self.end_of_line - if isinstance(eol, (tuple, list)): - if len(eol) not in (1, 2): - raise ValueError('invalid end_of_line: %s' % eol) - else: - eol = [eol] - # eol for read and write might be distinct - self._eol_read = self._convert_eol(eol[0]) - if not self._eol_read: - raise ValueError('end_of_line for read must not be empty') - self._eol_write = self._convert_eol(eol[-1]) - self._last_error = None - - def _convert_eol(self, value): - if isinstance(value, str): - return value.encode(self.encoding) - if isinstance(value, int): - return bytes([value]) - if isinstance(value, bytes): - return value - raise ValueError('invalid end_of_line: %s' % repr(value)) def connectStart(self): - if not self.is_connected: - uri = self.uri - self._conn = AsynConn(uri, self._eol_read) - self.is_connected = True - for command, regexp in self.identification: - reply = self.communicate(command) - if not re.match(regexp, reply): - self.closeConnection() - raise CommunicationFailedError('bad response: %s does not match %s' % - (reply, regexp)) + raise NotImplementedError def closeConnection(self): """close connection @@ -123,7 +119,7 @@ class StringIO(Communicator): return Done except Exception as e: if str(e) == self._last_error: - raise CommunicationSilentError(str(e)) + raise CommunicationSilentError(str(e)) from e self._last_error = str(e) self.log.error(self._last_error) raise @@ -158,6 +154,59 @@ class StringIO(Communicator): if removeme: self._reconnectCallbacks.pop(key) + +class StringIO(IOBase): + """line oriented communicator + + self healing is assured by polling the parameter 'is_connected' + """ + end_of_line = Property('end_of_line character', datatype=ValueType(), + default='\n', settable=True) + encoding = Property('used encoding', datatype=StringType(), + default='ascii', settable=True) + identification = Property(''' + identification + + a list of tuples with commands and expected responses as regexp, + to be sent on connect''', + datatype=ArrayOf(TupleOf(StringType(), StringType())), default=[], export=False) + + def _convert_eol(self, value): + if isinstance(value, str): + return value.encode(self.encoding) + if isinstance(value, int): + return bytes([value]) + if isinstance(value, bytes): + return value + raise ValueError('invalid end_of_line: %s' % repr(value)) + + def earlyInit(self): + super().earlyInit() + eol = self.end_of_line + if isinstance(eol, (tuple, list)): + if len(eol) not in (1, 2): + raise ValueError('invalid end_of_line: %s' % eol) + else: + eol = [eol] + # eol for read and write might be distinct + self._eol_read = self._convert_eol(eol[0]) + if not self._eol_read: + raise ValueError('end_of_line for read must not be empty') + self._eol_write = self._convert_eol(eol[-1]) + + def connectStart(self): + if not self.is_connected: + uri = self.uri + self._conn = AsynConn(uri, self._eol_read) + self.is_connected = True + for command, regexp in self.identification: + reply = self.communicate(command) + if not re.match(regexp, reply): + self.closeConnection() + raise CommunicationFailedError('bad response: %s does not match %s' % + (reply, regexp)) + + @Command(StringType(), result=StringType()) def communicate(self, command): """send a command and receive a reply @@ -165,6 +214,7 @@ class StringIO(Communicator): for commands without reply, the command must be joined with a query command, wait_before is respected for end_of_lines within a command. """ + command = command.encode(self.encoding) if not self.is_connected: self.read_is_connected() # try to reconnect if not self._conn: @@ -173,9 +223,9 @@ class StringIO(Communicator): with self._lock: # read garbage and wait before send if self.wait_before and self._eol_write: - cmds = command.encode(self.encoding).split(self._eol_write) + cmds = command.split(self._eol_write) else: - cmds = [command.encode(self.encoding)] + cmds = [command] garbage = None try: for cmd in cmds: @@ -186,16 +236,17 @@ class StringIO(Communicator): if garbage: self.log.debug('garbage: %r', garbage) self._conn.send(cmd + self._eol_write) + self.log.debug('send: %s', cmd + self._eol_write) reply = self._conn.readline(self.timeout) - except ConnectionClosed: + except ConnectionClosed as e: self.closeConnection() - raise CommunicationFailedError('disconnected') + raise CommunicationFailedError('disconnected') from None reply = reply.decode(self.encoding) self.log.debug('recv: %s', reply) return reply except Exception as e: if str(e) == self._last_error: - raise CommunicationSilentError(str(e)) + raise CommunicationSilentError(str(e)) from None self._last_error = str(e) self.log.error(self._last_error) raise @@ -210,40 +261,113 @@ class StringIO(Communicator): return replies -class HasIodev(Module): - """Mixin for modules using a communicator +def make_regexp(string): + """create a bytes regexp pattern from a string describing a bytes pattern - not only StringIO ! + :param string: a string containing white space separated items containing either + - a two digit hexadecimal number (byte value) + - a character from first unicode page, to be replaced by its code + - ?? indicating any byte + + :return: a tuple of length and compiled re pattern + Example: make_regexp('00 ff A ??') == (4, re.compile(b'\x00\xffA.')) """ - iodev = Attached() - uri = Property('uri for automatic creation of the attached communication module', - StringType(), default='') + relist = [b'.' if c == '??' else + re.escape(bytes([int(c, 16) if HEX_CODE.match(c) else ord(c)])) + for c in string.split()] + return len(relist), re.compile(b''.join(relist) + b'$') - iodevDict = {} - def __init__(self, name, logger, opts, srv): - iodev = opts.get('iodev') - Module.__init__(self, name, logger, opts, srv) - if self.uri: - opts = {'uri': self.uri, 'description': 'communication device for %s' % name, - 'export': False} - ioname = self.iodevDict.get(self.uri) - if not ioname: - ioname = iodev or name + '_iodev' - iodev = self.iodevClass(ioname, srv.log.getChild(ioname), opts, srv) - srv.modules[ioname] = iodev - self.iodevDict[self.uri] = ioname - self.iodev = ioname - elif not self.iodev: - raise ConfigError("Module %s needs a value for either 'uri' or 'iodev'" % name) +def make_bytes(string): + """create bytes from a string describing bytes - def initModule(self): + :param string: a string containing white space separated items containing either + - a two digit hexadecimal number (byte value) + - a character from first unicode page, to be replaced by its code + + :return: the bytes + Example: make_bytes('02 A 20 B 03') == b'\x02A B\x03' + """ + return bytes([int(c, 16) if HEX_CODE.match(c) else ord(c) for c in string.split()]) + + +class BytesIO(IOBase): + identification = Property( + """identification + + a list of tuples with requests and expected responses, to be sent on connect. + requests and responses are whitespace separated items + an item is either: + - a two digit hexadecimal number (byte value) + - a character + - ?? indicating ignored bytes in responses + """, datatype=ArrayOf(TupleOf(StringType(), StringType())), default=[], export=False) + + def connectStart(self): + if not self.is_connected: + uri = self.uri + self._conn = AsynConn(uri, b'') + self.is_connected = True + for request, expected in self.identification: + replylen, replypat = make_regexp(expected) + reply = self.communicate(make_bytes(request), replylen) + if not replypat.match(reply): + self.closeConnection() + raise CommunicationFailedError('bad response: %r does not match %r' % (reply, expected)) + + @Command((BLOBType(), IntRange(0)), result=BLOBType()) + def communicate(self, request, replylen): # pylint: disable=arguments-differ + """send a request and receive (at least) bytes as reply""" + if not self.is_connected: + self.read_is_connected() # try to reconnect + if not self._conn: + raise CommunicationSilentError('can not connect to %r' % self.uri) try: - self._iodev.read_is_connected() - except (CommunicationFailedError, AttributeError): - # AttributeError: for missing _iodev? - pass - super().initModule() + with self._lock: + # read garbage and wait before send + try: + if self.wait_before: + time.sleep(self.wait_before) + garbage = self._conn.flush_recv() + if garbage: + self.log.debug('garbage: %r', garbage) + self._conn.send(request) + self.log.debug('send: %r', request) + reply = self._conn.readbytes(replylen, self.timeout) + except ConnectionClosed as e: + self.closeConnection() + raise CommunicationFailedError('disconnected') from None + self.log.debug('recv: %r', reply) + return self.getFullReply(request, reply) + except Exception as e: + if str(e) == self._last_error: + raise CommunicationSilentError(str(e)) from None + self._last_error = str(e) + self.log.error(self._last_error) + raise - def sendRecv(self, command): - return self._iodev.communicate(command) + def readBytes(self, nbytes): + """read bytes + + :param nbytes: the number of expected bytes + :return: the returned bytes + """ + return self._conn.readbytes(nbytes, self.timeout) + + def getFullReply(self, request, replyheader): + """to be overwritten in case the reply length is variable + + :param request: the request + :param replyheader: the already received bytes + :return: the full reply (replyheader + additional bytes) + + When the reply length is variable, :meth:`communicate` should be called + with the `replylen` argument set to minimum expected length of the reply. + Typically this method determines then the length of additional bytes from + the already received bytes (replyheader) and/or the request and calls + :meth:`readBytes` to get the remaining bytes. + + Remark: this mechanism avoids the need to call readBytes after communicate + separately, which would not honour the lock properly. + """ + return replyheader diff --git a/secop/lib/asynconn.py b/secop/lib/asynconn.py index 348f971..6d9fc5c 100644 --- a/secop/lib/asynconn.py +++ b/secop/lib/asynconn.py @@ -25,7 +25,7 @@ generic class for byte oriented communication includes implementation for TCP and Serial connections support for asynchronous communication, but may be used also for -synchronous IO (see secop.stringio.StringIO) +synchronous IO (see secop.io) """ import ast @@ -62,11 +62,11 @@ class AsynConn: except (ValueError, TypeError, AssertionError): if 'COM' in uri: raise ValueError("the correct uri for a COM port is: " - "'serial://COM[?