diff --git a/secop/io.py b/secop/io.py index a860047..90e0717 100644 --- a/secop/io.py +++ b/secop/io.py @@ -18,27 +18,64 @@ # Module authors: # Markus Zolliker # ***************************************************************************** -"""Line oriented stream communication +"""stream oriented input / output -StringIO: string oriented IO. May be used for TCP/IP as well for serial IO or +May be used for TCP/IP as well for serial IO or other future extensions of AsynConn """ import re -import threading import time +import threading -from secop.datatypes import ArrayOf, BoolType, \ - FloatRange, StringType, TupleOf, ValueType -from secop.errors import CommunicationFailedError, \ - CommunicationSilentError, ConfigError from secop.lib.asynconn import AsynConn, ConnectionClosed +from secop.datatypes import ArrayOf, BLOBType, BoolType, FloatRange, IntRange, StringType, TupleOf, ValueType +from secop.errors import CommunicationFailedError, CommunicationSilentError, ConfigError from secop.modules import Attached, Command, \ Communicator, Done, Module, Parameter, Property from secop.poller import REGULAR -class BaseIO(Communicator): +HEX_CODE = re.compile(r'[0-9a-fA-F][0-9a-fA-F]$') + + +class HasIodev(Module): + """Mixin for modules using a communicator""" + iodev = Attached() + uri = Property('uri for automatic creation of the attached communication module', + StringType(), default='') + + iodevDict = {} + + def __init__(self, name, logger, opts, srv): + iodev = opts.get('iodev') + Module.__init__(self, name, logger, opts, srv) + if self.uri: + opts = {'uri': self.uri, 'description': 'communication device for %s' % name, + 'export': False} + ioname = self.iodevDict.get(self.uri) + if not ioname: + ioname = iodev or name + '_iodev' + iodev = self.iodevClass(ioname, srv.log.getChild(ioname), opts, srv) + srv.modules[ioname] = iodev + self.iodevDict[self.uri] = ioname + self.iodev = ioname + elif not self.iodev: + raise ConfigError("Module %s needs a value for either 'uri' or 'iodev'" % name) + + def initModule(self): + try: + self._iodev.read_is_connected() + except (CommunicationFailedError, AttributeError): + # AttributeError: for missing _iodev? + pass + super().initModule() + + def sendRecv(self, command): + return self._iodev.communicate(command) + + +class IOBase(Communicator): """base of StringIO and BytesIO""" uri = Property('hostname:portnumber', datatype=StringType()) timeout = Parameter('timeout', datatype=FloatRange(0), default=2) @@ -49,6 +86,7 @@ class BaseIO(Communicator): _reconnectCallbacks = None _conn = None _last_error = None + _lock = None def earlyInit(self): self._lock = threading.RLock() @@ -81,7 +119,7 @@ class BaseIO(Communicator): return Done except Exception as e: if str(e) == self._last_error: - raise CommunicationSilentError(str(e)) + raise CommunicationSilentError(str(e)) from e self._last_error = str(e) self.log.error(self._last_error) raise @@ -116,11 +154,8 @@ class BaseIO(Communicator): if removeme: self._reconnectCallbacks.pop(key) - def communicate(self, command): - return NotImplementedError - -class StringIO(BaseIO): +class StringIO(IOBase): """line oriented communicator self healing is assured by polling the parameter 'is_connected' @@ -203,15 +238,15 @@ class StringIO(BaseIO): self._conn.send(cmd + self._eol_write) self.log.debug('send: %s', cmd + self._eol_write) reply = self._conn.readline(self.timeout) - except ConnectionClosed: + except ConnectionClosed as e: self.closeConnection() - raise CommunicationFailedError('disconnected') + raise CommunicationFailedError('disconnected') from None reply = reply.decode(self.encoding) self.log.debug('recv: %s', reply) return reply except Exception as e: if str(e) == self._last_error: - raise CommunicationSilentError(str(e)) + raise CommunicationSilentError(str(e)) from None self._last_error = str(e) self.log.error(self._last_error) raise @@ -226,40 +261,113 @@ class StringIO(BaseIO): return replies -class HasIodev(Module): - """Mixin for modules using a communicator +def make_regexp(string): + """create a bytes regexp pattern from a string describing a bytes pattern - not only StringIO ! + :param string: a string containing white space separated items containing either + - a two digit hexadecimal number (byte value) + - a character from first unicode page, to be replaced by its code + - ?? indicating any byte + + :return: a tuple of length and compiled re pattern + Example: make_regexp('00 ff A ??') == (4, re.compile(b'\x00\xffA.')) """ - iodev = Attached() - uri = Property('uri for automatic creation of the attached communication module', - StringType(), default='') + relist = [b'.' if c == '??' else + re.escape(bytes([int(c, 16) if HEX_CODE.match(c) else ord(c)])) + for c in string.split()] + return len(relist), re.compile(b''.join(relist) + b'$') - iodevDict = {} - def __init__(self, name, logger, opts, srv): - iodev = opts.get('iodev') - Module.__init__(self, name, logger, opts, srv) - if self.uri: - opts = {'uri': self.uri, 'description': 'communication device for %s' % name, - 'export': False} - ioname = self.iodevDict.get(self.uri) - if not ioname: - ioname = iodev or name + '_iodev' - iodev = self.iodevClass(ioname, srv.log.getChild(ioname), opts, srv) - srv.modules[ioname] = iodev - self.iodevDict[self.uri] = ioname - self.iodev = ioname - elif not self.iodev: - raise ConfigError("Module %s needs a value for either 'uri' or 'iodev'" % name) +def make_bytes(string): + """create bytes from a string describing bytes - def initModule(self): + :param string: a string containing white space separated items containing either + - a two digit hexadecimal number (byte value) + - a character from first unicode page, to be replaced by its code + + :return: the bytes + Example: make_bytes('02 A 20 B 03') == b'\x02A B\x03' + """ + return bytes([int(c, 16) if HEX_CODE.match(c) else ord(c) for c in string.split()]) + + +class BytesIO(IOBase): + identification = Property( + """identification + + a list of tuples with requests and expected responses, to be sent on connect. + requests and responses are whitespace separated items + an item is either: + - a two digit hexadecimal number (byte value) + - a character + - ?? indicating ignored bytes in responses + """, datatype=ArrayOf(TupleOf(StringType(), StringType())), default=[], export=False) + + def connectStart(self): + if not self.is_connected: + uri = self.uri + self._conn = AsynConn(uri, b'') + self.is_connected = True + for request, expected in self.identification: + replylen, replypat = make_regexp(expected) + reply = self.communicate(make_bytes(request), replylen) + if not replypat.match(reply): + self.closeConnection() + raise CommunicationFailedError('bad response: %r does not match %r' % (reply, expected)) + + @Command((BLOBType(), IntRange(0)), result=BLOBType()) + def communicate(self, request, replylen): # pylint: disable=arguments-differ + """send a request and receive (at least) bytes as reply""" + if not self.is_connected: + self.read_is_connected() # try to reconnect + if not self._conn: + raise CommunicationSilentError('can not connect to %r' % self.uri) try: - self._iodev.read_is_connected() - except (CommunicationFailedError, AttributeError): - # AttributeError: for missing _iodev? - pass - super().initModule() + with self._lock: + # read garbage and wait before send + try: + if self.wait_before: + time.sleep(self.wait_before) + garbage = self._conn.flush_recv() + if garbage: + self.log.debug('garbage: %r', garbage) + self._conn.send(request) + self.log.debug('send: %r', request) + reply = self._conn.readbytes(replylen, self.timeout) + except ConnectionClosed as e: + self.closeConnection() + raise CommunicationFailedError('disconnected') from None + self.log.debug('recv: %r', reply) + return self.getFullReply(request, reply) + except Exception as e: + if str(e) == self._last_error: + raise CommunicationSilentError(str(e)) from None + self._last_error = str(e) + self.log.error(self._last_error) + raise - def sendRecv(self, command): - return self._iodev.communicate(command) + def readBytes(self, nbytes): + """read bytes + + :param nbytes: the number of expected bytes + :return: the returned bytes + """ + return self._conn.readbytes(nbytes, self.timeout) + + def getFullReply(self, request, replyheader): + """to be overwritten in case the reply length is variable + + :param request: the request + :param replyheader: the already received bytes + :return: the full reply (replyheader + additional bytes) + + When the reply length is variable, :meth:`communicate` should be called + with the `replylen` argument set to minimum expected length of the reply. + Typically this method determines then the length of additional bytes from + the already received bytes (replyheader) and/or the request and calls + :meth:`readBytes` to get the remaining bytes. + + Remark: this mechanism avoids the need to call readBytes after communicate + separately, which would not honour the lock properly. + """ + return replyheader