io: add option to retry first ident request

Change-Id: I524c15387eaf2461e3dfe690250a55f058467b0b
Reviewed-on: https://forge.frm2.tum.de/review/c/secop/frappy/+/31291
Tested-by: Jenkins Automated Tests <pedersen+jenkins@frm2.tum.de>
Reviewed-by: Bjoern Pedersen <bjoern.pedersen@frm2.tum.de>
Reviewed-by: Alexander Zaft <a.zaft@fz-juelich.de>
This commit is contained in:
Alexander Zaft 2023-06-06 10:30:19 +02:00 committed by Markus Zolliker
parent bf43858031
commit fd917724d8

View File

@ -133,6 +133,14 @@ class IOBase(Communicator):
self._lock = threading.RLock() self._lock = threading.RLock()
def connectStart(self): def connectStart(self):
if not self.is_connected:
uri = self.uri
self._conn = AsynConn(uri, self._eol_read,
default_settings=self.default_settings)
self.is_connected = True
self.checkHWIdent()
def checkHWIdent(self):
raise NotImplementedError raise NotImplementedError
def closeConnection(self): def closeConnection(self):
@ -218,12 +226,19 @@ class StringIO(IOBase):
default='\n', settable=True) default='\n', settable=True)
encoding = Property('used encoding', datatype=StringType(), encoding = Property('used encoding', datatype=StringType(),
default='ascii', settable=True) default='ascii', settable=True)
identification = Property(''' identification = Property(
identification '''identification
a list of tuples with commands and expected responses as regexp, a list of tuples with commands and expected responses as regexp,
to be sent on connect''', to be sent on connect''',
datatype=ArrayOf(TupleOf(StringType(), StringType())), default=[], export=False) datatype=ArrayOf(TupleOf(StringType(), StringType())),
default=[], export=False)
retry_first_idn = Property(
'''retry first identification message
a flag to indicate whether the first message should be resent once to
avoid data that may still be in the buffer to garble the message''',
datatype=BoolType(), default=False)
def _convert_eol(self, value): def _convert_eol(self, value):
if isinstance(value, str): if isinstance(value, str):
@ -248,16 +263,22 @@ class StringIO(IOBase):
raise ValueError('end_of_line for read must not be empty') raise ValueError('end_of_line for read must not be empty')
self._eol_write = self._convert_eol(eol[-1]) self._eol_write = self._convert_eol(eol[-1])
def connectStart(self): def checkHWIdent(self):
if not self.is_connected: if not self.identification:
uri = self.uri return
self._conn = AsynConn(uri, self._eol_read, default_settings=self.default_settings) idents = iter(self.identification)
self.is_connected = True command, regexp = next(idents)
for command, regexp in self.identification: reply = self.communicate(command)
reply = self.communicate(command) if self.retry_first_idn and not re.match(regexp, reply):
if not re.match(regexp, reply): self.log.debug('first ident command not successful.'
self.closeConnection() ' retrying in case of garbage data.')
raise CommunicationFailedError(f'bad response: {reply} does not match {regexp}') idents = iter(self.identification)
for command, regexp in idents:
reply = self.communicate(command)
if not re.match(regexp, reply):
self.closeConnection()
raise CommunicationFailedError(f'bad response: {reply}'
' does not match {regexp}')
@Command(StringType(), result=StringType()) @Command(StringType(), result=StringType())
def communicate(self, command): def communicate(self, command):
@ -356,19 +377,19 @@ class BytesIO(IOBase):
- a two digit hexadecimal number (byte value) - a two digit hexadecimal number (byte value)
- a character - a character
- ?? indicating ignored bytes in responses - ?? indicating ignored bytes in responses
""", datatype=ArrayOf(TupleOf(StringType(), StringType())), default=[], export=False) """, datatype=ArrayOf(TupleOf(StringType(), StringType())),
default=[], export=False)
def connectStart(self): _eol_read = b''
if not self.is_connected:
uri = self.uri def checkHWIdent(self):
self._conn = AsynConn(uri, b'', default_settings=self.default_settings) for request, expected in self.identification:
self.is_connected = True replylen, replypat = make_regexp(expected)
for request, expected in self.identification: reply = self.communicate(make_bytes(request), replylen)
replylen, replypat = make_regexp(expected) if not replypat.match(reply):
reply = self.communicate(make_bytes(request), replylen) self.closeConnection()
if not replypat.match(reply): raise CommunicationFailedError(f'bad response: {reply!r}'
self.closeConnection() ' does not match {expected!r}')
raise CommunicationFailedError(f'bad response: {reply!r} does not match {expected!r}')
@Command((BLOBType(), IntRange(0)), result=BLOBType()) @Command((BLOBType(), IntRange(0)), result=BLOBType())
def communicate(self, request, replylen): # pylint: disable=arguments-differ def communicate(self, request, replylen): # pylint: disable=arguments-differ