default settings on the IO class

allow to define default settings on the IO class:
- a default 'port' may be given for tcp
- defaults like 'baudrate' or 'parity' might be given
  for serial connections

this avoids explicit settings in the config file in case
the settings can not be changed or have a typical value
other than the defaults in serial.Serial

Change-Id: I990f47d63e785f8cc48c4af197944a8eebe91fb4
Reviewed-on: https://forge.frm2.tum.de/review/c/secop/frappy/+/30555
Reviewed-by: Georg Brandl <g.brandl@fz-juelich.de>
Tested-by: Jenkins Automated Tests <pedersen+jenkins@frm2.tum.de>
Reviewed-by: Markus Zolliker <markus.zolliker@psi.ch>
This commit is contained in:
zolliker 2023-03-02 09:24:29 +01:00
parent 0acb80380a
commit c8f30582a5
4 changed files with 33 additions and 22 deletions

View File

@ -102,6 +102,7 @@ Communication
.. autoclass:: frappy.io.IOBase
:show-inheritance:
:members: default_settings
.. autoclass:: frappy.io.StringIO
:show-inheritance:

View File

@ -112,6 +112,12 @@ class IOBase(Communicator):
wait_before = Parameter('wait time before sending', datatype=FloatRange(), default=0)
is_connected = Parameter('connection state', datatype=BoolType(), readonly=False, default=False)
pollinterval = Parameter('reconnect interval', datatype=FloatRange(0), readonly=False, default=10)
#: a dict of default settings for a device, e.g. for a LakeShore 336:
#:
#: ``default_settings = {'port': 7777, 'baudrate': 57600, 'parity': 'O', 'bytesize': 7}``
#:
#: port is used in case of tcp, the others for serial over USB
default_settings = {}
_reconnectCallbacks = None
_conn = None
@ -243,7 +249,7 @@ class StringIO(IOBase):
def connectStart(self):
if not self.is_connected:
uri = self.uri
self._conn = AsynConn(uri, self._eol_read)
self._conn = AsynConn(uri, self._eol_read, default_settings=self.default_settings)
self.is_connected = True
for command, regexp in self.identification:
reply = self.communicate(command)
@ -354,7 +360,7 @@ class BytesIO(IOBase):
def connectStart(self):
if not self.is_connected:
uri = self.uri
self._conn = AsynConn(uri, b'')
self._conn = AsynConn(uri, b'', default_settings=self.default_settings)
self.is_connected = True
for request, expected in self.identification:
replylen, replypat = make_regexp(expected)

View File

@ -32,6 +32,7 @@ import ast
import select
import socket
import time
import re
from frappy.errors import CommunicationFailedError, ConfigError
from frappy.lib import closeSocket, parseHostPort, tcpSocket
@ -51,15 +52,14 @@ class AsynConn:
scheme = None
SCHEME_MAP = {}
connection = None # is not None, if connected
defaultport = None
def __new__(cls, uri, end_of_line=b'\n'):
def __new__(cls, uri, end_of_line=b'\n', default_settings=None):
scheme = uri.split('://')[0]
iocls = cls.SCHEME_MAP.get(scheme, None)
if not iocls:
# try tcp, if scheme not given
try:
host_port = parseHostPort(uri, cls.defaultport)
host_port = parseHostPort(uri, None)
except (ValueError, TypeError, AssertionError):
if 'COM' in uri:
raise ValueError("the correct uri for a COM port is: "
@ -72,8 +72,9 @@ class AsynConn:
uri = 'tcp://%s:%d' % host_port
return object.__new__(iocls)
def __init__(self, uri, end_of_line=b'\n'):
def __init__(self, uri, end_of_line=b'\n', default_settings=None):
self.end_of_line = end_of_line
self.default_settings = default_settings or {}
self._rxbuffer = b''
def __del__(self):
@ -172,7 +173,7 @@ class AsynTcp(AsynConn):
# should be the case always
uri = uri[6:]
try:
self.connection = tcpSocket(uri, self.defaultport, self.timeout)
self.connection = tcpSocket(uri, self.default_settings.get('port'), self.timeout)
except (ConnectionRefusedError, socket.gaierror, socket.timeout) as e:
# indicate that retrying might make sense
raise CommunicationFailedError(str(e)) from None
@ -218,7 +219,7 @@ class AsynSerial(AsynConn):
uri syntax::
serial://<serial device>?[<option>=<value>[+<option>=<value> ...]]
serial://<serial device>?[<option>=<value>[&<option>=<value> ...]]
options (defaults, other examples):
@ -231,7 +232,9 @@ class AsynSerial(AsynConn):
and others (see documentation of serial.Serial)
"""
scheme = 'serial'
PARITY_NAMES = {name[0]: name for name in ['NONE', 'ODD', 'EVEN', 'MASK', 'SPACE']}
PARITY_NAMES = {name[0]: name for name in ['NONE', 'ODD', 'EVEN', 'MARK', 'SPACE']}
ARG_SEP = re.compile('[+&]') # allow + or & as options separator in uri
SETTINGS = set(Serial(None).get_settings()) # keys of valid Serial settings
def __init__(self, uri, *args, **kwargs):
if Serial is None:
@ -243,19 +246,19 @@ class AsynSerial(AsynConn):
uri = uri[9:]
uri = uri.split('?', 1)
dev = uri[0]
options = {k: v for k, v in self.default_settings.items() if k in self.SETTINGS}
if len(uri) > 1:
for kv in self.ARG_SEP.split(uri[1]):
try:
options = dict((kv.split('=') for kv in uri[1].split('+')))
except IndexError: # no uri[1], no options
options = {}
except ValueError as e:
raise ConfigError('illegal serial options') from e
parity = options.pop('parity', None) # only parity is to be treated as text
for k, v in options.items():
try:
options[k] = ast.literal_eval(v.title()) # title(): turn false/true into False/True
except ValueError:
pass
if parity is not None:
key, value = kv.split('=')
except TypeError:
raise ConfigError('%r must be <key>=<value>' % kv) from None
if key == 'parity':
options[key] = value
else:
options[key] = ast.literal_eval(value.title()) # title(): turn false/true into False/True
parity = options.get('parity')
if parity:
name = parity.upper()
fullname = self.PARITY_NAMES[name[0]]
if not fullname.startswith(name):

View File

@ -32,6 +32,7 @@ class LakeshoreIO(StringIO):
# Lakeshore commands (see manual)
# '*IDN?' is sent on connect, and the reply is checked to match the regexp 'LSCI,.*'
identification = [('*IDN?', 'LSCI,.*')]
default_settings = {'port': 7777, 'baudrate': 57600, 'parity': 'O', 'bytesize': 7}
class TemperatureSensor(HasIO, Readable):