frappy_psi.ahcapbridge: fix behaviour when serial echo is on

This commit is contained in:
2025-11-20 17:20:49 +01:00
parent 77bda6a72f
commit e8e5d2743a
2 changed files with 43 additions and 29 deletions

View File

@@ -51,13 +51,31 @@ class IO(StringIO):
end_of_line = ('\r\n', '\r')
timeout = 5
sent_command = False # used to detect that communicate was called directly
ECHO = re.compile('>|AV |VO |FR |SI |SH ') # this is recognized as an echo
MEAS = re.compile(' *([FC]=|NO DATA)') # overriden by the module
@Command(StringType(), result=StringType())
def communicate(self, command, noreply=False):
"""communicate and remind that a command was sent"""
# this is also called by writeline
self.sent_command = True
return super().communicate(command, noreply)
for _ in range(3):
reply = super().communicate(command, noreply)
reply = reply and reply.strip()
if self.check_echo_off(reply):
return reply
raise CommunicationFailedError('detected echo but can not switch off')
def check_echo_off(self, reply):
if self.ECHO.match(reply or ''):
super().writeline('\rSERIAL ECHO OFF;UN 2')
for _ in range(3):
reply = self.readline()
if self.MEAS.match(reply or ''):
# this is a meas reply
break
return False
return True
class AHBase(HasIO, Pinata, Acquisition):
@@ -86,17 +104,19 @@ class AHBase(HasIO, Pinata, Acquisition):
export = True # for a Pinata module, the default is False!
ioClass = IO
COMMANDS = ['AV', 'VO', 'SI', 'SH', 'FR']
_error = ''
_last_start = None
_params = None
_mode = CONTINUOUS # or RUNNING or FINISHED
_cont_deadline = 0 # when to switch back to continuous after finished
_averexp_deadline = 0 # to make sure averexp is polled periodically
_lossunit = 'undefined'
# to be overridden:
PATTERN = None # a list of patterns to parse replies
MEAS_PAT = None # the pattern to parse the measurement reply
UNIT = None # our desired loss unit
MODEL_PAT = None
MODEL = None
def scanModules(self):
if self.loss_module:
@@ -108,14 +128,17 @@ class AHBase(HasIO, Pinata, Acquisition):
'cap': self.name}
def initModule(self):
self.io.setProperty('identification',
[('\rSERIAL ECHO OFF;SH MODEL',
'ILLEGAL WORD: MODEL')])
super().initModule()
self.echo = re.compile('|'.join(self.COMMANDS))
self._params = {}
self._lock = threading.RLock()
self.io.MEAS = self.MEAS_PAT
self.io.checkHWIdent = self.checkHWIdent
def checkHWIdent(self):
for _ in range(3):
if self.MODEL_PAT.match(self.communicate('SH MODEL')):
return
raise CommunicationFailedError(f'we are not connected to a {self.MODEL}')
def initialReads(self):
# UN 2 does also return the results of the last measurement
# (including the frequency for AH2700)
@@ -125,8 +148,8 @@ class AHBase(HasIO, Pinata, Acquisition):
self.goal = self.averexp
self.single_meas()
def communciate(self, command):
reply = self.io.communciate(command)
def communicate(self, command):
reply = self.io.communicate(command)
self.io.sent_command = False
return reply
@@ -176,7 +199,6 @@ class AHBase(HasIO, Pinata, Acquisition):
match = self.MEAS_PAT.match(reply)
if match:
return match.groupdict()
self.log.warn('got unexpected message %r from SI', reply)
return {}
def doPoll(self):
@@ -188,7 +210,7 @@ class AHBase(HasIO, Pinata, Acquisition):
if meas:
self.update_meas(**meas)
else:
self.log.warn('unexpected reply: %r', reply)
self.io.check_echo_off(reply)
self.retrigger_meas()
elif self._mode == FINISHED and time.time() > self._cont_deadline:
self._mode = CONTINUOUS
@@ -331,18 +353,14 @@ class AH2550(AHBase):
r'V= *(?P<voltage>[0-9.E+-]+) *V,A,*(?P<error>.*)$'
)
UNIT = 'DF'
MODEL_PAT = re.compile('ILLEGAL WORD: MODEL')
MODEL = 'AH2550'
# empirically determined - may vary with noise
# differs drastically from the table in the manual
MEAS_TIME_CONST = [0.2, 0.3, 0.4, 1.0, 1.3, 1.6, 2.2, 3.3,
5.5, 8.3, 14, 25, 47, 91, 180, 360]
def initModule(self):
self.io.setProperty('identification',
[('\rSERIAL ECHO OFF;SH MODEL',
'ILLEGAL WORD: MODEL')])
super().initModule()
def _calculate_time(self, averexp, freq):
self.calculated_time = self.calculate_time(averexp)
@@ -372,12 +390,8 @@ class AH2700(AHBase):
f'V= *(?P<voltage>[0-9.E+-]+) *V *(?P<error>.*)$'
)
UNIT = 'DS'
def initModule(self):
super().initModule()
self.io.setProperty('identification',
[('\r\nSERIAL ECHO OFF;SH MODEL',
'MODEL/OPTIONS *AH2700')])
MODEL_PAT = re.compile('MODEL/OPTIONS *AH2700')
MODEL = 'AH2700'
def scanModules(self):
yield from super().scanModules()