frappy.io: add readline method

For devices sending messages asynchronously, a simple
readline method is added. Waiting for characters is not
blocking writeline - only the actual reading is using
a lock.
This needs also some rework of frappy.lib.asynconn.
This commit is contained in:
2025-11-17 15:48:45 +01:00
parent ee26c72ed4
commit 151627b4f4
2 changed files with 107 additions and 47 deletions

View File

@@ -196,7 +196,7 @@ class IOBase(Communicator):
now = time.time() now = time.time()
if now >= self._last_connect_attempt + self.pollinterval: if now >= self._last_connect_attempt + self.pollinterval:
# we do not try to reconnect more often than pollinterval # we do not try to reconnect more often than pollinterval
_last_connect_attempt = now self._last_connect_attempt = now
if self.read_is_connected(): if self.read_is_connected():
return return
raise SilentError('disconnected') from None raise SilentError('disconnected') from None
@@ -236,7 +236,8 @@ class StringIO(IOBase):
self-healing is assured by polling the parameter 'is_connected' self-healing is assured by polling the parameter 'is_connected'
""" """
end_of_line = Property('end_of_line character', datatype=ValueType(), end_of_line = Property('end_of_line character or tuple(eol_read, eol_write)',
datatype=ValueType(),
default='\n', settable=True) default='\n', settable=True)
encoding = Property('used encoding', datatype=StringType(), encoding = Property('used encoding', datatype=StringType(),
default='ascii', settable=True) default='ascii', settable=True)
@@ -317,12 +318,14 @@ class StringIO(IOBase):
cmds = command.split(self._eol_write) cmds = command.split(self._eol_write)
else: else:
cmds = [command] cmds = [command]
garbage = None # do not skip garbage when no reply is expected
skip_garbage = not noreply
try: try:
for cmd in cmds: for cmd in cmds:
if self.wait_before: if self.wait_before:
time.sleep(self.wait_before) time.sleep(self.wait_before)
if garbage is None: # read garbage only once if skip_garbage:
skip_garbage = False # read garbage only once
garbage = self._conn.flush_recv() garbage = self._conn.flush_recv()
if garbage: if garbage:
self.comLog('garbage: %r', garbage) self.comLog('garbage: %r', garbage)
@@ -417,6 +420,34 @@ class StringIO(IOBase):
time.sleep(delay) time.sleep(delay)
return replies return replies
@Command(result=StringType())
def readline(self):
"""read a line, if available within self.timeout
remark: the call might return earlier, when an other
thread consumed the data in parallel
"""
self.check_connection()
try:
if self._conn.read_ready(self.timeout):
with self._lock: # important: lock only after waiting
reply = self._conn.readline(0)
if reply:
reply = reply.decode(self.encoding)
self.comLog('< %s', reply)
return reply
return ''
except ConnectionClosed:
self.closeConnection()
raise CommunicationFailedError('disconnected') from None
except Exception as e:
if self._conn is None:
raise SilentError('disconnected') from None
if repr(e) != self._last_error:
self._last_error = repr(e)
self.log.error(self._last_error)
raise SilentError(repr(e)) from e
def make_regexp(string): def make_regexp(string):
"""create a bytes regexp pattern from a string describing a bytes pattern """create a bytes regexp pattern from a string describing a bytes pattern

View File

@@ -28,10 +28,10 @@ synchronous IO (see frappy.io)
""" """
import ast import ast
import select
import socket import socket
import time import time
import re import re
from select import select
from frappy.errors import CommunicationFailedError, ConfigError from frappy.errors import CommunicationFailedError, ConfigError
from frappy.lib import closeSocket, parse_host_port, SECoP_DEFAULT_PORT from frappy.lib import closeSocket, parse_host_port, SECoP_DEFAULT_PORT
@@ -50,8 +50,7 @@ class AsynConn:
timeout = 1 # inter byte timeout timeout = 1 # inter byte timeout
scheme = None scheme = None
SCHEME_MAP = {} SCHEME_MAP = {}
connection = None # is not None, if connected _connection = None # None means disconnected
HOSTNAMEPAT = re.compile(r'[a-z0-9_.-]+$', re.IGNORECASE) # roughly checking if it is a valid hostname
def __new__(cls, uri, end_of_line=b'\n', default_settings=None): def __new__(cls, uri, end_of_line=b'\n', default_settings=None):
scheme = uri.split('://')[0] scheme = uri.split('://')[0]
@@ -85,6 +84,12 @@ class AsynConn:
if cls.scheme: if cls.scheme:
cls.SCHEME_MAP[cls.scheme] = cls cls.SCHEME_MAP[cls.scheme] = cls
@property
def connection(self):
if self._connection is None:
raise ConnectionClosed()
return self._connection
def shutdown(self): def shutdown(self):
"""prepare connection for disconnect, can be empty""" """prepare connection for disconnect, can be empty"""
@@ -106,25 +111,34 @@ class AsynConn:
""" """
raise NotImplementedError raise NotImplementedError
def recv_nowait(self):
"""return bytes in buffer without waiting"""
raise NotImplementedError
def flush_recv(self): def flush_recv(self):
"""flush all available bytes (return them)""" """flush all available bytes (return them)"""
raise NotImplementedError result = self._rxbuffer + self.recv_nowait()
self._rxbuffer = b''
return result
def readline(self, timeout=None): def readline(self, timeout=None):
"""read one line """read one line
return either a complete line or None if no data available within 1 sec (self.timeout) Return either a complete line or None if not enough is available
if a non-zero timeout is given, a timeout error is raised instead of returning None within 1 sec (self.timeout).
the timeout effectively used will not be lower than self.timeout (1 sec) With timeout=0 no waiting happens at all.
If a non-zero timeout is given, a timeout error is raised instead
of returning None.
The timeout resolution is self.timeout (1 sec by default)
""" """
if timeout: end = time.time() + timeout if timeout else 0
end = time.time() + timeout recv = self.recv_nowait if timeout == 0 else self.recv
while True: while True:
splitted = self._rxbuffer.split(self.end_of_line, 1) splitted = self._rxbuffer.split(self.end_of_line, 1)
if len(splitted) == 2: if len(splitted) == 2:
line, self._rxbuffer = splitted line, self._rxbuffer = splitted
return line return line
data = self.recv() data = recv()
if not data: if not data:
if timeout: if timeout:
if time.time() < end: if time.time() < end:
@@ -136,14 +150,17 @@ class AsynConn:
def readbytes(self, nbytes, timeout=None): def readbytes(self, nbytes, timeout=None):
"""read a fixed number of bytes """read a fixed number of bytes
return either <nbytes> bytes or None if not enough data available within 1 sec (self.timeout) Return either <nbytes> bytes or None if not enough data is available
if a non-zero timeout is given, a timeout error is raised instead of returning None within 1 sec (self.timeout).
the timeout effectively used will not be lower than self.timeout (1 sec) With timeout=0 no waiting happens at all.
If a non-zero timeout is given, a timeout error is raised instead
of returning None
The timeout resolution is self.timeout (1 sec by default)
""" """
if timeout: end = time.time() + timeout if timeout else 0
end = time.time() + timeout recv = self.recv_nowait if timeout == 0 else self.recv
while len(self._rxbuffer) < nbytes: while len(self._rxbuffer) < nbytes:
data = self.recv() data = recv()
if not data: if not data:
if timeout: if timeout:
if time.time() < end: if time.time() < end:
@@ -158,6 +175,9 @@ class AsynConn:
def writeline(self, line): def writeline(self, line):
self.send(line + self.end_of_line) self.send(line + self.end_of_line)
def read_ready(self, timeout=1):
return bool(select([self.connection], [], [], timeout)[0])
class AsynTcp(AsynConn): class AsynTcp(AsynConn):
"""a tcp/ip connection """a tcp/ip connection
@@ -174,42 +194,35 @@ class AsynTcp(AsynConn):
if uri.startswith('tcp://'): if uri.startswith('tcp://'):
uri = uri[6:] uri = uri[6:]
try: try:
host, port = parse_host_port(
host, port = parse_host_port(uri, self.default_settings.get('port', SECoP_DEFAULT_PORT)) uri, self.default_settings.get('port', SECoP_DEFAULT_PORT))
self.connection = socket.create_connection((host, port), timeout=self.timeout) self._connection = socket.create_connection(
(host, port), timeout=self.timeout)
except (ConnectionRefusedError, socket.gaierror, socket.timeout) as e: except (ConnectionRefusedError, socket.gaierror, socket.timeout) as e:
# indicate that retrying might make sense # indicate that retrying might make sense
raise CommunicationFailedError(f'can not connect to {host}:{port}, {e}') from None raise CommunicationFailedError(f'can not connect to {host}:{port}, {e}') from None
def shutdown(self): def shutdown(self):
if self.connection: if self._connection:
try: try:
self.connection.shutdown(socket.SHUT_RDWR) self.connection.shutdown(socket.SHUT_RDWR)
except OSError: except OSError:
pass # in case socket is already disconnected pass # in case socket is already disconnected
def disconnect(self): def disconnect(self):
if self.connection: if self._connection:
closeSocket(self.connection) closeSocket(self.connection)
self.connection = None self._connection = None
def send(self, data): def send(self, data):
"""send data (bytes!)""" """send data (bytes!)"""
# remark: will raise socket.timeout when output buffer is full and blocked for 1 sec # remark: will raise socket.timeout when output buffer is full and blocked for self.timeout
self.connection.sendall(data) self.connection.sendall(data)
def flush_recv(self):
"""flush recv buffer"""
data = [self._rxbuffer]
while select.select([self.connection], [], [], 0)[0]:
data.append(self.recv())
self._rxbuffer = b''
return b''.join(data)
def recv(self): def recv(self):
"""return bytes in the recv buffer """return bytes in the recv buffer
or bytes received within 1 sec or bytes received within self.timeout
""" """
try: try:
data = self.connection.recv(1024*1024) data = self.connection.recv(1024*1024)
@@ -225,6 +238,12 @@ class AsynTcp(AsynConn):
# help in this case. # help in this case.
raise ConnectionClosed() # marks end of connection raise ConnectionClosed() # marks end of connection
def recv_nowait(self):
"""return bytes in the recv buffer"""
if select([self.connection], [], [], 0)[0]:
return self.recv()
return b''
class AsynSerial(AsynConn): class AsynSerial(AsynConn):
"""a serial connection using pyserial """a serial connection using pyserial
@@ -279,31 +298,41 @@ class AsynSerial(AsynConn):
if 'timeout' not in options: if 'timeout' not in options:
options['timeout'] = self.timeout options['timeout'] = self.timeout
try: try:
self.connection = Serial(dev, **options) self._connection = Serial(dev, **options)
except ValueError as e: except ValueError as e:
raise ConfigError(e) from None raise ConfigError(e) from None
# TODO: turn exceptions into ConnectionFailedError, where a retry makes sense # TODO: turn exceptions into ConnectionFailedError, where a retry makes sense
def disconnect(self): def disconnect(self):
if self.connection: if self._connection:
self.connection.close() self.connection.close()
self.connection = None self._connection = None
def send(self, data): def send(self, data):
"""send data (bytes!)""" """send data (bytes!)"""
self.connection.write(data) self.connection.write(data)
def flush_recv(self):
result = self._rxbuffer + self.connection.read(self.connection.in_waiting)
self._rxbuffer = b''
return result
def recv(self): def recv(self):
"""return bytes received within 1 sec""" """return bytes received within self.timeout"""
if not self.connection: # disconnect() might have been called in between
raise ConnectionClosed()
n = self.connection.in_waiting n = self.connection.in_waiting
if n: if n:
return self.connection.read(n) return self.connection.read(n)
data = self.connection.read(1) data = self.connection.read(1)
return data + self.connection.read(self.connection.in_waiting) return data + self.connection.read(self.connection.in_waiting)
def recv_nowait(self):
n = self.connection.in_waiting
return self.connection.read(n) if n else b''
if not hasattr(Serial, 'fileno'):
# select is not supported for serial objects (Windows)
# create a less efficient workaround
def read_ready(self, timeout=1):
if self.connection.in_waiting:
return True
deadline = time.time() + timeout
while time.time() < deadline:
if self.connection.in_waiting:
return True
time.sleep(0.05)
return False