frappy_psi.ahcapbridge: simplify

- remove the auto detect from replies
- improve readability
- add not implemented attributes in AHBase to avoid pylint
  complaints

Change-Id: I20aabe6c5cfaad94845cdfff22b889fb7ff7e257
This commit is contained in:
2025-11-18 12:00:07 +01:00
parent f00d37b7a6
commit 765218eed2

View File

@@ -18,6 +18,9 @@
# *****************************************************************************
"""Andeen Hagerling capacitance bridge
The speciality of this capacitance bridge is, that a measurement might take
between factions of seconds up to more than half an hour.
creates up to two additional modules for 'loss' and 'freq'
in the configuration file, only the capacitance module needs to be configured,
@@ -35,22 +38,7 @@ from frappy.core import HasIO, Parameter, Readable, StringIO, \
from frappy.datatypes import FloatRange, IntRange, StringType, TupleOf
from frappy.modules import Acquisition
from frappy.dynamic import Pinata
from frappy.errors import ProgrammingError, IsBusyError
class IO(StringIO):
end_of_line = ('\r\n', '\r')
# for writing, '\r\n' also accepted on AH2700
# for reading, '\r\n' would be correct, but '\r' does also works
# when stripping the reply
timeout = 5
@Command(StringType(), result=StringType())
def communicate(self, command, noreply=False):
"""communicate and save the last command"""
# this is also called by writeline
reply = super().communicate(command, noreply)
return reply and reply.strip()
from frappy.errors import IsBusyError, CommunicationFailedError, HardwareError
CONTINUOUS = 0
@@ -59,6 +47,19 @@ RUNNING = 2
FINISHED = 3
class IO(StringIO):
end_of_line = ('\r\n', '\r')
timeout = 5
sent_command = False # used to detect that communicate was called directly
@Command(StringType(), result=StringType())
def communicate(self, command, noreply=False):
"""communicate and remind that a command was sent"""
# this is also called by writeline
self.sent_command = True
return super().communicate(command, noreply)
class AHBase(HasIO, Pinata, Acquisition):
value = Parameter('capacitance', FloatRange(unit='pF'))
freq = Parameter('frequency', FloatRange(unit='Hz'), default=1000)
@@ -67,7 +68,7 @@ class AHBase(HasIO, Pinata, Acquisition):
readonly=False, default=0)
loss = Parameter('loss',
FloatRange(unit=''), default=0)
averexp = Parameter('base 2 exponent of goal',
averexp = Parameter('average exponent - roughly log2 of number of samples averaged',
IntRange(0, 15), readonly=False, default=0)
goal = Parameter('value for averexp for the next go()',
IntRange(0, 15), readonly=False, default=0)
@@ -80,8 +81,9 @@ class AHBase(HasIO, Pinata, Acquisition):
configure '' to disable the creation of the loss module
''', StringType(), default='')
pollinterval = Parameter(datatype=FloatRange(0.001, 1),
export=False, value=0.001)
pollinterval = Parameter('minimum pollinterval - the polling rate is determined by averaging',
value=0.1)
export = True # for a Pinata module, the default is False!
ioClass = IO
COMMANDS = ['AV', 'VO', 'SI', 'SH', 'FR']
@@ -94,6 +96,7 @@ class AHBase(HasIO, Pinata, Acquisition):
# to be overridden:
PATTERN = None # a list of patterns to parse replies
MEAS_PAT = None # the pattern to parse the measurement reply
UNIT = None # our desired loss unit
def scanModules(self):
if self.loss_module:
@@ -109,11 +112,6 @@ class AHBase(HasIO, Pinata, Acquisition):
[('\rSERIAL ECHO OFF;SH MODEL',
'ILLEGAL WORD: MODEL')])
super().initModule()
pattern = self.PATTERN + [self.MEAS_PAT]
self.pattern = {p[:2]: re.compile(p) for p in pattern}
if len(self.pattern) != len(pattern):
raise ProgrammingError('need more than two letters '
'to distinguish patterns')
self.echo = re.compile('|'.join(self.COMMANDS))
self._params = {}
self._lock = threading.RLock()
@@ -121,67 +119,49 @@ class AHBase(HasIO, Pinata, Acquisition):
def initialReads(self):
# UN 2 does also return the results of the last measurement
# (including the frequency for AH2700)
self.io.writeline('SH FR;UN 2')
self.freq = self.interprete('freq')
self.freq = self.get_param('FR', 'freq')
self.set_lossunit()
self.verify_averexp()
self.goal = self.averexp
self.single_meas()
def interprete(self, wait_for=None, tmo=None):
"""
def communciate(self, command):
reply = self.io.communciate(command)
self.io.sent_command = False
return reply
:param wait_for: name of parameter to wait for or None to wait
for measurement
:param tmo:
:return:
"""
if tmo is None:
tmo = self.io.timeout
reply = self.io.readline(tmo)
now = time.time()
while reply:
pattern = self.pattern.get(reply[:2])
if pattern:
match = pattern.match(reply)
if not match:
self.log.warning('unexpected reply syntax: %r', reply)
break
values = match.groupdict()
if len(values) == 1:
key, value = next(iter(values.items()))
self._params[key] = float(value)
else:
self._params['meas'] = values
elif self.echo.match(reply):
# this is probably an echo
# we may have lost connection, so query again averexp
self.writeline('\r\nSERIAL ECHO OFF;SH AV')
elif reply:
self.log.warning('unknown reply %r', reply)
if (wait_for or 'meas') in self._params:
break
reply = self.io.readline(tmo)
self.log.debug('doPoll %r params %r wait_for %r %d', reply,
list(self._params), wait_for, self._mode)
result = self._params.pop(wait_for, None)
if self._mode == FINISHED and now > self._cont_deadline:
self._mode = CONTINUOUS
self.status = IDLE, ''
self.single_meas()
return
if result is None and wait_for:
self.log.info(f'missing reply for {wait_for}')
return getattr(self, wait_for, None)
return result
def set_lossunit(self):
self._lossunit = self.UNIT
reply = self.communicate('UN 2')
# this should be a measurement reply
mdict = self.get_meas_reply(reply)
self._lossunit = mdict.get('lossunit', 'undefined')
def change(self, short, value, param):
def change_param(self, short, value, param):
if self._mode == RUNNING:
raise IsBusyError('can not change parameters while measuring')
with self._lock:
self.io.writeline(f'{short} {value};SH {short}')
result = self.interprete(param)
for _ in range(3):
reply = self.communicate(f'{short} {value};SH {short}')
match = self.PATTERN[param].match(reply)
if match:
result = match.group(1)
self.retrigger_meas()
return float(result)
self.retrigger_meas()
return result
raise CommunicationFailedError(f'can not change {param} to {value}')
def get_param(self, short, param):
with self._lock:
for _ in range(3):
reply = self.communicate(f'SH {short}')
match = self.PATTERN[param].match(reply)
if match:
result = match.group(1)
self.retrigger_meas()
return float(result)
self.retrigger_meas()
raise CommunicationFailedError(f'can not get {param}')
def retrigger_meas(self):
if self._mode == CONTINUOUS:
@@ -189,25 +169,38 @@ class AHBase(HasIO, Pinata, Acquisition):
def single_meas(self):
self._last_start = time.time()
self.writeline('SI')
self.io.writeline('SI')
self.io.sent_command = False
def get_meas_reply(self, reply):
match = self.MEAS_PAT.match(reply)
if match:
return match.groupdict()
self.log.warn('got unexpected message %r from SI', reply)
return {}
def doPoll(self):
# this polls measurement results
# we can not do polling of other parameters, as they would
# interrupt measurements. averexp needs a special treatment
self.interprete(tmo=1)
with self._lock:
for param in list(self._params):
value = self._params.pop(param, None)
if param == 'meas':
self.update_meas(**value)
self.retrigger_meas()
elif param == 'averexp':
self.update_averexp(value)
elif param == 'freq':
self.update_freq(value)
elif param == 'voltage':
self.voltage = value
# this typically waits longer than the low pollinterval
# -> after returning, doPoll is called again immediately
reply = self.io.readline()
if reply:
meas = self.get_meas_reply(reply)
if meas:
self.update_meas(**meas)
else:
self.log.warn('unexpected reply: %r', reply)
self.retrigger_meas()
elif self._mode == FINISHED and time.time() > self._cont_deadline:
self._mode = CONTINUOUS
self.status = IDLE, ''
self.single_meas()
elif self.io.sent_command:
# self.io.communicate was called directly
# -> we have to retrigger SI again
if self._mode == CONTINUOUS:
self.single_meas()
elif self._mode == RUNNING:
self.finish(WARN, 'interrupted')
def update_freq(self, value):
self.freq = value
@@ -219,52 +212,50 @@ class AHBase(HasIO, Pinata, Acquisition):
self._calculate_time(value, self.freq)
def update_meas(self, cap, loss, lossunit, voltage, error, freq=None):
"""update given arguments (these are strings!)"""
"""update given arguments from a measurement reply (these are strings!)"""
self._error = error
if self._error:
status = WARN, self._error
else:
status = IDLE, '' if self._mode == CONTINUOUS else 'finished'
if status != self.status:
self.status = status
now = time.time()
if self._mode == RUNNING:
self._cont_deadline = now + 5
self._mode = FINISHED
self.finish(*status)
elif status != self.status:
self.status = status
if freq:
self.freq = float(freq)
self._calculate_time(self.averexp, self.freq)
self.value = float(cap)
self.voltage = float(voltage)
if lossunit != self.UNIT:
if self._last_start == 0:
self.log.warning('change unit to %s failed', self.UNIT)
else:
self.io.writeline('UN 2')
# this will trigger a measurement reply
# skip calculation of meas_time while interpreting result
self._last_start = 0
self.interprete('meas')
self.retrigger_meas()
self.set_lossunit()
self.retrigger_meas()
return
self.loss = float(loss)
if self._last_start:
self.meas_time = now - self._last_start
self._last_start = 0
if now > self._averexp_deadline and self._mode == CONTINUOUS:
self.verify_averexp()
self.loss = float(loss)
else:
self.retrigger_meas()
def read_loss(self):
if self._lossunit != self.UNIT:
raise HardwareError(f'bad loss unit: {self._lossunit!r}')
return self.loss
def write_voltage(self, value):
return round(self.change('VO', f'{value:.1f}', 'voltage'), 1)
return round(self.change_param('VO', f'{value:.1f}', 'voltage'), 1)
def write_averexp(self, value):
self.update_averexp(self.change('AV', f'{value}', 'averexp'))
self.update_averexp(self.change_param('AV', f'{value}', 'averexp'))
def verify_averexp(self):
# we do not want to use read_averexp for this,
# as it will stop the measurement when polled
self.io.writeline('SH AV')
self.update_averexp(self.interprete('averexp'))
self.update_averexp(self.get_param('AV', 'averexp'))
def _calculate_time(self, averexp, freq):
self.calculated_time = self.calculate_time(averexp, freq)
@@ -277,19 +268,25 @@ class AHBase(HasIO, Pinata, Acquisition):
# this also makes sure we catch a previous meas reply
self.verify_averexp()
if self.averexp != self.goal:
self.log.info('changed averexp')
self.write_averexp(self.goal)
self.status = BUSY, 'started'
self.single_meas()
self._mode = RUNNING
def finish(self, statuscode, statustext):
self.status = statuscode, statustext
self._mode = FINISHED
self._cont_deadline = time.time() + 5
def stop(self):
"""stops measurement"""
if self._mode == RUNNING:
self.verify_averexp()
self.status = WARN, 'stopped'
self._mode = FINISHED
self._cont_deadline = time.time() + 5
self.finish(WARN, 'stopped')
def calculate_time(self, averexp, freq):
"""estimate measuring time"""
raise NotImplementedError
class Loss(Readable):
@@ -323,12 +320,12 @@ class Freq(Writable):
class AH2550(AHBase):
PATTERN = [
r'AVERAGE_AVEREXP *(?P<averexp>[0-9]*)',
r'VOLTAGE_HIGHEST *(?P<voltage>[0-9.E+-]+)',
r'FREQUENCY *(?P<freq>[0-9.E+-]+)',
]
MEAS_PAT = (
PATTERN = {
'averexp': re.compile(r'AVERAGE_AVEREXP *([0-9]*)'),
'voltage': re.compile(r'VOLTAGE_HIGHEST *([0-9.E+-]+)'),
'freq': re.compile(r'FREQUENCY *([0-9.E+-]+)'),
}
MEAS_PAT = re.compile(
r'C= *(?P<cap>[0-9.E+-]+) *PF,'
r'L= *(?P<loss>[0-9.E+-]+) *(?P<lossunit>[A-Z]*),'
r'V= *(?P<voltage>[0-9.E+-]+) *V,A,*(?P<error>.*)$'
@@ -363,18 +360,18 @@ class AH2700(AHBase):
default: not created
''',
StringType(), default='')
PATTERN = [
r'AVERAGE *AVEREXP=(?P<averexp>[0-9]*)',
r'VOLTAGE HIGHEST *(?P<voltage>[0-9.E+-]+)',
r'FREQUENCY *(?P<freq>[0-9.E+-]+)',
]
UNIT = 'DS'
MEAS_PAT = (
PATTERN = {
'averexp': re.compile(r'AVERAGE *AVEREXP=([0-9]*)'),
'voltage': re.compile(r'VOLTAGE HIGHEST *([0-9.E+-]+)'),
'freq': re.compile(r'FREQUENCY *([0-9.E+-]+)'),
}
MEAS_PAT = re.compile(
r'F= *(?P<freq>[0-9.E+-]+) *HZ '
r'C= *(?P<cap>[0-9.E+-]+) *PF '
r'L= *(?P<loss>[0-9.E+-]+) *(?P<lossunit>[A-Z]*) '
f'V= *(?P<voltage>[0-9.E+-]+) *V *(?P<error>.*)$'
)
UNIT = 'DS'
def initModule(self):
super().initModule()
@@ -391,19 +388,21 @@ class AH2700(AHBase):
'cap': self.name}
def write_freq(self, value):
self.change('FR', f'{value:g}', 'freq')
self.change_param('FR', f'{value:g}', 'freq')
self.update_freq(value)
return round(value, 1)
# empirically determined - may vary with noise
# differs drastically from the table in the manual
MEAS_TIME_CONST = [
# (upper freq limit, meas time @ avrexp=7 - 0.8)
(75, 20),
(150, 10),
(270, 5.62),
(550, 2.34),
(1100, 2.73),
(4500, 1.02),
(20000, 0.51),
# (upper freq limit, meas time @ avrexp=7 )
(75, 20.8),
(150, 10.8),
(270, 6.42),
(550, 3.14),
(1100, 3.53),
(4500, 1.82),
(20000, 1.31),
]
@Command(TupleOf(IntRange(0, 15), FloatRange(50, 20000)),
@@ -412,7 +411,7 @@ class AH2700(AHBase):
"""calculate estimated measuring time
from time efficiency considerations averexp > 7 is recommended
especially for freq < 550 no time may be saved with averexp <= 7
especially for freq < 550 no time is saved with averexp <= 7
"""
for f, c in self.MEAS_TIME_CONST:
if f > freq:
@@ -421,9 +420,9 @@ class AH2700(AHBase):
else:
const = self.MEAS_TIME_CONST[-1][1]
if averexp >= 8:
result = 0.8 + const * (0.5 + 2 ** (averexp - 8))
result = 0.8 + (const - 0.8) * (0.5 + 2 ** (averexp - 8))
elif freq < 550:
result = 0.8 + const
result = const
else:
result = 0.6 + 2 ** (averexp - 7) * const
result = 0.6 + 2 ** (averexp - 7) * (const - 0.8)
return round(result, 1)