From 151627b4f4dc83f9cce96a407871090c8ae11ced Mon Sep 17 00:00:00 2001 From: Markus Zolliker Date: Mon, 17 Nov 2025 15:48:45 +0100 Subject: [PATCH] frappy.io: add readline method For devices sending messages asynchronously, a simple readline method is added. Waiting for characters is not blocking writeline - only the actual reading is using a lock. This needs also some rework of frappy.lib.asynconn. --- frappy/io.py | 39 ++++++++++++-- frappy/lib/asynconn.py | 115 ++++++++++++++++++++++++++--------------- 2 files changed, 107 insertions(+), 47 deletions(-) diff --git a/frappy/io.py b/frappy/io.py index cb52d083..ba14e4f6 100644 --- a/frappy/io.py +++ b/frappy/io.py @@ -196,7 +196,7 @@ class IOBase(Communicator): now = time.time() if now >= self._last_connect_attempt + self.pollinterval: # we do not try to reconnect more often than pollinterval - _last_connect_attempt = now + self._last_connect_attempt = now if self.read_is_connected(): return raise SilentError('disconnected') from None @@ -236,7 +236,8 @@ class StringIO(IOBase): self-healing is assured by polling the parameter 'is_connected' """ - end_of_line = Property('end_of_line character', datatype=ValueType(), + end_of_line = Property('end_of_line character or tuple(eol_read, eol_write)', + datatype=ValueType(), default='\n', settable=True) encoding = Property('used encoding', datatype=StringType(), default='ascii', settable=True) @@ -317,12 +318,14 @@ class StringIO(IOBase): cmds = command.split(self._eol_write) else: cmds = [command] - garbage = None + # do not skip garbage when no reply is expected + skip_garbage = not noreply try: for cmd in cmds: if self.wait_before: time.sleep(self.wait_before) - if garbage is None: # read garbage only once + if skip_garbage: + skip_garbage = False # read garbage only once garbage = self._conn.flush_recv() if garbage: self.comLog('garbage: %r', garbage) @@ -417,6 +420,34 @@ class StringIO(IOBase): time.sleep(delay) return replies + @Command(result=StringType()) + def readline(self): + """read a line, if available within self.timeout + + remark: the call might return earlier, when an other + thread consumed the data in parallel + """ + self.check_connection() + try: + if self._conn.read_ready(self.timeout): + with self._lock: # important: lock only after waiting + reply = self._conn.readline(0) + if reply: + reply = reply.decode(self.encoding) + self.comLog('< %s', reply) + return reply + return '' + except ConnectionClosed: + self.closeConnection() + raise CommunicationFailedError('disconnected') from None + except Exception as e: + if self._conn is None: + raise SilentError('disconnected') from None + if repr(e) != self._last_error: + self._last_error = repr(e) + self.log.error(self._last_error) + raise SilentError(repr(e)) from e + def make_regexp(string): """create a bytes regexp pattern from a string describing a bytes pattern diff --git a/frappy/lib/asynconn.py b/frappy/lib/asynconn.py index 3f4ba818..1abe5fa8 100644 --- a/frappy/lib/asynconn.py +++ b/frappy/lib/asynconn.py @@ -28,10 +28,10 @@ synchronous IO (see frappy.io) """ import ast -import select import socket import time import re +from select import select from frappy.errors import CommunicationFailedError, ConfigError from frappy.lib import closeSocket, parse_host_port, SECoP_DEFAULT_PORT @@ -50,8 +50,7 @@ class AsynConn: timeout = 1 # inter byte timeout scheme = None SCHEME_MAP = {} - connection = None # is not None, if connected - HOSTNAMEPAT = re.compile(r'[a-z0-9_.-]+$', re.IGNORECASE) # roughly checking if it is a valid hostname + _connection = None # None means disconnected def __new__(cls, uri, end_of_line=b'\n', default_settings=None): scheme = uri.split('://')[0] @@ -85,6 +84,12 @@ class AsynConn: if cls.scheme: cls.SCHEME_MAP[cls.scheme] = cls + @property + def connection(self): + if self._connection is None: + raise ConnectionClosed() + return self._connection + def shutdown(self): """prepare connection for disconnect, can be empty""" @@ -106,25 +111,34 @@ class AsynConn: """ raise NotImplementedError + def recv_nowait(self): + """return bytes in buffer without waiting""" + raise NotImplementedError + def flush_recv(self): """flush all available bytes (return them)""" - raise NotImplementedError + result = self._rxbuffer + self.recv_nowait() + self._rxbuffer = b'' + return result def readline(self, timeout=None): """read one line - return either a complete line or None if no data available within 1 sec (self.timeout) - if a non-zero timeout is given, a timeout error is raised instead of returning None - the timeout effectively used will not be lower than self.timeout (1 sec) + Return either a complete line or None if not enough is available + within 1 sec (self.timeout). + With timeout=0 no waiting happens at all. + If a non-zero timeout is given, a timeout error is raised instead + of returning None. + The timeout resolution is self.timeout (1 sec by default) """ - if timeout: - end = time.time() + timeout + end = time.time() + timeout if timeout else 0 + recv = self.recv_nowait if timeout == 0 else self.recv while True: splitted = self._rxbuffer.split(self.end_of_line, 1) if len(splitted) == 2: line, self._rxbuffer = splitted return line - data = self.recv() + data = recv() if not data: if timeout: if time.time() < end: @@ -136,14 +150,17 @@ class AsynConn: def readbytes(self, nbytes, timeout=None): """read a fixed number of bytes - return either bytes or None if not enough data available within 1 sec (self.timeout) - if a non-zero timeout is given, a timeout error is raised instead of returning None - the timeout effectively used will not be lower than self.timeout (1 sec) + Return either bytes or None if not enough data is available + within 1 sec (self.timeout). + With timeout=0 no waiting happens at all. + If a non-zero timeout is given, a timeout error is raised instead + of returning None + The timeout resolution is self.timeout (1 sec by default) """ - if timeout: - end = time.time() + timeout + end = time.time() + timeout if timeout else 0 + recv = self.recv_nowait if timeout == 0 else self.recv while len(self._rxbuffer) < nbytes: - data = self.recv() + data = recv() if not data: if timeout: if time.time() < end: @@ -158,6 +175,9 @@ class AsynConn: def writeline(self, line): self.send(line + self.end_of_line) + def read_ready(self, timeout=1): + return bool(select([self.connection], [], [], timeout)[0]) + class AsynTcp(AsynConn): """a tcp/ip connection @@ -174,42 +194,35 @@ class AsynTcp(AsynConn): if uri.startswith('tcp://'): uri = uri[6:] try: - - host, port = parse_host_port(uri, self.default_settings.get('port', SECoP_DEFAULT_PORT)) - self.connection = socket.create_connection((host, port), timeout=self.timeout) + host, port = parse_host_port( + uri, self.default_settings.get('port', SECoP_DEFAULT_PORT)) + self._connection = socket.create_connection( + (host, port), timeout=self.timeout) except (ConnectionRefusedError, socket.gaierror, socket.timeout) as e: # indicate that retrying might make sense raise CommunicationFailedError(f'can not connect to {host}:{port}, {e}') from None def shutdown(self): - if self.connection: + if self._connection: try: self.connection.shutdown(socket.SHUT_RDWR) except OSError: pass # in case socket is already disconnected def disconnect(self): - if self.connection: + if self._connection: closeSocket(self.connection) - self.connection = None + self._connection = None def send(self, data): """send data (bytes!)""" - # remark: will raise socket.timeout when output buffer is full and blocked for 1 sec + # remark: will raise socket.timeout when output buffer is full and blocked for self.timeout self.connection.sendall(data) - def flush_recv(self): - """flush recv buffer""" - data = [self._rxbuffer] - while select.select([self.connection], [], [], 0)[0]: - data.append(self.recv()) - self._rxbuffer = b'' - return b''.join(data) - def recv(self): """return bytes in the recv buffer - or bytes received within 1 sec + or bytes received within self.timeout """ try: data = self.connection.recv(1024*1024) @@ -225,6 +238,12 @@ class AsynTcp(AsynConn): # help in this case. raise ConnectionClosed() # marks end of connection + def recv_nowait(self): + """return bytes in the recv buffer""" + if select([self.connection], [], [], 0)[0]: + return self.recv() + return b'' + class AsynSerial(AsynConn): """a serial connection using pyserial @@ -279,31 +298,41 @@ class AsynSerial(AsynConn): if 'timeout' not in options: options['timeout'] = self.timeout try: - self.connection = Serial(dev, **options) + self._connection = Serial(dev, **options) except ValueError as e: raise ConfigError(e) from None # TODO: turn exceptions into ConnectionFailedError, where a retry makes sense def disconnect(self): - if self.connection: + if self._connection: self.connection.close() - self.connection = None + self._connection = None def send(self, data): """send data (bytes!)""" self.connection.write(data) - def flush_recv(self): - result = self._rxbuffer + self.connection.read(self.connection.in_waiting) - self._rxbuffer = b'' - return result - def recv(self): - """return bytes received within 1 sec""" - if not self.connection: # disconnect() might have been called in between - raise ConnectionClosed() + """return bytes received within self.timeout""" n = self.connection.in_waiting if n: return self.connection.read(n) data = self.connection.read(1) return data + self.connection.read(self.connection.in_waiting) + + def recv_nowait(self): + n = self.connection.in_waiting + return self.connection.read(n) if n else b'' + + if not hasattr(Serial, 'fileno'): + # select is not supported for serial objects (Windows) + # create a less efficient workaround + def read_ready(self, timeout=1): + if self.connection.in_waiting: + return True + deadline = time.time() + timeout + while time.time() < deadline: + if self.connection.in_waiting: + return True + time.sleep(0.05) + return False