diff --git a/frappy_psi/ahcapbridge.py b/frappy_psi/ahcapbridge.py index 4009ba48..df33159a 100644 --- a/frappy_psi/ahcapbridge.py +++ b/frappy_psi/ahcapbridge.py @@ -18,6 +18,9 @@ # ***************************************************************************** """Andeen Hagerling capacitance bridge +The speciality of this capacitance bridge is, that a measurement might take +between factions of seconds up to more than half an hour. + creates up to two additional modules for 'loss' and 'freq' in the configuration file, only the capacitance module needs to be configured, @@ -35,22 +38,7 @@ from frappy.core import HasIO, Parameter, Readable, StringIO, \ from frappy.datatypes import FloatRange, IntRange, StringType, TupleOf from frappy.modules import Acquisition from frappy.dynamic import Pinata -from frappy.errors import ProgrammingError, IsBusyError - - -class IO(StringIO): - end_of_line = ('\r\n', '\r') - # for writing, '\r\n' also accepted on AH2700 - # for reading, '\r\n' would be correct, but '\r' does also works - # when stripping the reply - timeout = 5 - - @Command(StringType(), result=StringType()) - def communicate(self, command, noreply=False): - """communicate and save the last command""" - # this is also called by writeline - reply = super().communicate(command, noreply) - return reply and reply.strip() +from frappy.errors import IsBusyError, CommunicationFailedError, HardwareError CONTINUOUS = 0 @@ -59,6 +47,19 @@ RUNNING = 2 FINISHED = 3 +class IO(StringIO): + end_of_line = ('\r\n', '\r') + timeout = 5 + sent_command = False # used to detect that communicate was called directly + + @Command(StringType(), result=StringType()) + def communicate(self, command, noreply=False): + """communicate and remind that a command was sent""" + # this is also called by writeline + self.sent_command = True + return super().communicate(command, noreply) + + class AHBase(HasIO, Pinata, Acquisition): value = Parameter('capacitance', FloatRange(unit='pF')) freq = Parameter('frequency', FloatRange(unit='Hz'), default=1000) @@ -67,7 +68,7 @@ class AHBase(HasIO, Pinata, Acquisition): readonly=False, default=0) loss = Parameter('loss', FloatRange(unit=''), default=0) - averexp = Parameter('base 2 exponent of goal', + averexp = Parameter('average exponent - roughly log2 of number of samples averaged', IntRange(0, 15), readonly=False, default=0) goal = Parameter('value for averexp for the next go()', IntRange(0, 15), readonly=False, default=0) @@ -80,8 +81,9 @@ class AHBase(HasIO, Pinata, Acquisition): configure '' to disable the creation of the loss module ''', StringType(), default='') - pollinterval = Parameter(datatype=FloatRange(0.001, 1), - export=False, value=0.001) + pollinterval = Parameter('minimum pollinterval - the polling rate is determined by averaging', + value=0.1) + export = True # for a Pinata module, the default is False! ioClass = IO COMMANDS = ['AV', 'VO', 'SI', 'SH', 'FR'] @@ -94,6 +96,7 @@ class AHBase(HasIO, Pinata, Acquisition): # to be overridden: PATTERN = None # a list of patterns to parse replies MEAS_PAT = None # the pattern to parse the measurement reply + UNIT = None # our desired loss unit def scanModules(self): if self.loss_module: @@ -109,11 +112,6 @@ class AHBase(HasIO, Pinata, Acquisition): [('\rSERIAL ECHO OFF;SH MODEL', 'ILLEGAL WORD: MODEL')]) super().initModule() - pattern = self.PATTERN + [self.MEAS_PAT] - self.pattern = {p[:2]: re.compile(p) for p in pattern} - if len(self.pattern) != len(pattern): - raise ProgrammingError('need more than two letters ' - 'to distinguish patterns') self.echo = re.compile('|'.join(self.COMMANDS)) self._params = {} self._lock = threading.RLock() @@ -121,67 +119,49 @@ class AHBase(HasIO, Pinata, Acquisition): def initialReads(self): # UN 2 does also return the results of the last measurement # (including the frequency for AH2700) - self.io.writeline('SH FR;UN 2') - self.freq = self.interprete('freq') + self.freq = self.get_param('FR', 'freq') + self.set_lossunit() self.verify_averexp() self.goal = self.averexp self.single_meas() - def interprete(self, wait_for=None, tmo=None): - """ + def communciate(self, command): + reply = self.io.communciate(command) + self.io.sent_command = False + return reply - :param wait_for: name of parameter to wait for or None to wait - for measurement - :param tmo: - :return: - """ - if tmo is None: - tmo = self.io.timeout - reply = self.io.readline(tmo) - now = time.time() - while reply: - pattern = self.pattern.get(reply[:2]) - if pattern: - match = pattern.match(reply) - if not match: - self.log.warning('unexpected reply syntax: %r', reply) - break - values = match.groupdict() - if len(values) == 1: - key, value = next(iter(values.items())) - self._params[key] = float(value) - else: - self._params['meas'] = values - elif self.echo.match(reply): - # this is probably an echo - # we may have lost connection, so query again averexp - self.writeline('\r\nSERIAL ECHO OFF;SH AV') - elif reply: - self.log.warning('unknown reply %r', reply) - if (wait_for or 'meas') in self._params: - break - reply = self.io.readline(tmo) - self.log.debug('doPoll %r params %r wait_for %r %d', reply, - list(self._params), wait_for, self._mode) - result = self._params.pop(wait_for, None) - if self._mode == FINISHED and now > self._cont_deadline: - self._mode = CONTINUOUS - self.status = IDLE, '' - self.single_meas() - return - if result is None and wait_for: - self.log.info(f'missing reply for {wait_for}') - return getattr(self, wait_for, None) - return result + def set_lossunit(self): + self._lossunit = self.UNIT + reply = self.communicate('UN 2') + # this should be a measurement reply + mdict = self.get_meas_reply(reply) + self._lossunit = mdict.get('lossunit', 'undefined') - def change(self, short, value, param): + def change_param(self, short, value, param): if self._mode == RUNNING: raise IsBusyError('can not change parameters while measuring') with self._lock: - self.io.writeline(f'{short} {value};SH {short}') - result = self.interprete(param) + for _ in range(3): + reply = self.communicate(f'{short} {value};SH {short}') + match = self.PATTERN[param].match(reply) + if match: + result = match.group(1) + self.retrigger_meas() + return float(result) self.retrigger_meas() - return result + raise CommunicationFailedError(f'can not change {param} to {value}') + + def get_param(self, short, param): + with self._lock: + for _ in range(3): + reply = self.communicate(f'SH {short}') + match = self.PATTERN[param].match(reply) + if match: + result = match.group(1) + self.retrigger_meas() + return float(result) + self.retrigger_meas() + raise CommunicationFailedError(f'can not get {param}') def retrigger_meas(self): if self._mode == CONTINUOUS: @@ -189,25 +169,38 @@ class AHBase(HasIO, Pinata, Acquisition): def single_meas(self): self._last_start = time.time() - self.writeline('SI') + self.io.writeline('SI') + self.io.sent_command = False + + def get_meas_reply(self, reply): + match = self.MEAS_PAT.match(reply) + if match: + return match.groupdict() + self.log.warn('got unexpected message %r from SI', reply) + return {} def doPoll(self): - # this polls measurement results - # we can not do polling of other parameters, as they would - # interrupt measurements. averexp needs a special treatment - self.interprete(tmo=1) - with self._lock: - for param in list(self._params): - value = self._params.pop(param, None) - if param == 'meas': - self.update_meas(**value) - self.retrigger_meas() - elif param == 'averexp': - self.update_averexp(value) - elif param == 'freq': - self.update_freq(value) - elif param == 'voltage': - self.voltage = value + # this typically waits longer than the low pollinterval + # -> after returning, doPoll is called again immediately + reply = self.io.readline() + if reply: + meas = self.get_meas_reply(reply) + if meas: + self.update_meas(**meas) + else: + self.log.warn('unexpected reply: %r', reply) + self.retrigger_meas() + elif self._mode == FINISHED and time.time() > self._cont_deadline: + self._mode = CONTINUOUS + self.status = IDLE, '' + self.single_meas() + elif self.io.sent_command: + # self.io.communicate was called directly + # -> we have to retrigger SI again + if self._mode == CONTINUOUS: + self.single_meas() + elif self._mode == RUNNING: + self.finish(WARN, 'interrupted') def update_freq(self, value): self.freq = value @@ -219,52 +212,50 @@ class AHBase(HasIO, Pinata, Acquisition): self._calculate_time(value, self.freq) def update_meas(self, cap, loss, lossunit, voltage, error, freq=None): - """update given arguments (these are strings!)""" + """update given arguments from a measurement reply (these are strings!)""" self._error = error if self._error: status = WARN, self._error else: status = IDLE, '' if self._mode == CONTINUOUS else 'finished' - if status != self.status: - self.status = status now = time.time() if self._mode == RUNNING: - self._cont_deadline = now + 5 - self._mode = FINISHED + self.finish(*status) + elif status != self.status: + self.status = status if freq: self.freq = float(freq) self._calculate_time(self.averexp, self.freq) self.value = float(cap) self.voltage = float(voltage) if lossunit != self.UNIT: - if self._last_start == 0: - self.log.warning('change unit to %s failed', self.UNIT) - else: - self.io.writeline('UN 2') - # this will trigger a measurement reply - # skip calculation of meas_time while interpreting result - self._last_start = 0 - self.interprete('meas') - self.retrigger_meas() + self.set_lossunit() + self.retrigger_meas() return + self.loss = float(loss) if self._last_start: self.meas_time = now - self._last_start self._last_start = 0 if now > self._averexp_deadline and self._mode == CONTINUOUS: self.verify_averexp() - self.loss = float(loss) + else: + self.retrigger_meas() + + def read_loss(self): + if self._lossunit != self.UNIT: + raise HardwareError(f'bad loss unit: {self._lossunit!r}') + return self.loss def write_voltage(self, value): - return round(self.change('VO', f'{value:.1f}', 'voltage'), 1) + return round(self.change_param('VO', f'{value:.1f}', 'voltage'), 1) def write_averexp(self, value): - self.update_averexp(self.change('AV', f'{value}', 'averexp')) + self.update_averexp(self.change_param('AV', f'{value}', 'averexp')) def verify_averexp(self): # we do not want to use read_averexp for this, # as it will stop the measurement when polled - self.io.writeline('SH AV') - self.update_averexp(self.interprete('averexp')) + self.update_averexp(self.get_param('AV', 'averexp')) def _calculate_time(self, averexp, freq): self.calculated_time = self.calculate_time(averexp, freq) @@ -277,19 +268,25 @@ class AHBase(HasIO, Pinata, Acquisition): # this also makes sure we catch a previous meas reply self.verify_averexp() if self.averexp != self.goal: - self.log.info('changed averexp') self.write_averexp(self.goal) self.status = BUSY, 'started' self.single_meas() self._mode = RUNNING + def finish(self, statuscode, statustext): + self.status = statuscode, statustext + self._mode = FINISHED + self._cont_deadline = time.time() + 5 + def stop(self): """stops measurement""" if self._mode == RUNNING: self.verify_averexp() - self.status = WARN, 'stopped' - self._mode = FINISHED - self._cont_deadline = time.time() + 5 + self.finish(WARN, 'stopped') + + def calculate_time(self, averexp, freq): + """estimate measuring time""" + raise NotImplementedError class Loss(Readable): @@ -323,12 +320,12 @@ class Freq(Writable): class AH2550(AHBase): - PATTERN = [ - r'AVERAGE_AVEREXP *(?P[0-9]*)', - r'VOLTAGE_HIGHEST *(?P[0-9.E+-]+)', - r'FREQUENCY *(?P[0-9.E+-]+)', - ] - MEAS_PAT = ( + PATTERN = { + 'averexp': re.compile(r'AVERAGE_AVEREXP *([0-9]*)'), + 'voltage': re.compile(r'VOLTAGE_HIGHEST *([0-9.E+-]+)'), + 'freq': re.compile(r'FREQUENCY *([0-9.E+-]+)'), + } + MEAS_PAT = re.compile( r'C= *(?P[0-9.E+-]+) *PF,' r'L= *(?P[0-9.E+-]+) *(?P[A-Z]*),' r'V= *(?P[0-9.E+-]+) *V,A,*(?P.*)$' @@ -363,18 +360,18 @@ class AH2700(AHBase): default: not created ''', StringType(), default='') - PATTERN = [ - r'AVERAGE *AVEREXP=(?P[0-9]*)', - r'VOLTAGE HIGHEST *(?P[0-9.E+-]+)', - r'FREQUENCY *(?P[0-9.E+-]+)', - ] - UNIT = 'DS' - MEAS_PAT = ( + PATTERN = { + 'averexp': re.compile(r'AVERAGE *AVEREXP=([0-9]*)'), + 'voltage': re.compile(r'VOLTAGE HIGHEST *([0-9.E+-]+)'), + 'freq': re.compile(r'FREQUENCY *([0-9.E+-]+)'), + } + MEAS_PAT = re.compile( r'F= *(?P[0-9.E+-]+) *HZ ' r'C= *(?P[0-9.E+-]+) *PF ' r'L= *(?P[0-9.E+-]+) *(?P[A-Z]*) ' f'V= *(?P[0-9.E+-]+) *V *(?P.*)$' ) + UNIT = 'DS' def initModule(self): super().initModule() @@ -391,19 +388,21 @@ class AH2700(AHBase): 'cap': self.name} def write_freq(self, value): - self.change('FR', f'{value:g}', 'freq') + self.change_param('FR', f'{value:g}', 'freq') self.update_freq(value) return round(value, 1) + # empirically determined - may vary with noise + # differs drastically from the table in the manual MEAS_TIME_CONST = [ - # (upper freq limit, meas time @ avrexp=7 - 0.8) - (75, 20), - (150, 10), - (270, 5.62), - (550, 2.34), - (1100, 2.73), - (4500, 1.02), - (20000, 0.51), + # (upper freq limit, meas time @ avrexp=7 ) + (75, 20.8), + (150, 10.8), + (270, 6.42), + (550, 3.14), + (1100, 3.53), + (4500, 1.82), + (20000, 1.31), ] @Command(TupleOf(IntRange(0, 15), FloatRange(50, 20000)), @@ -412,7 +411,7 @@ class AH2700(AHBase): """calculate estimated measuring time from time efficiency considerations averexp > 7 is recommended - especially for freq < 550 no time may be saved with averexp <= 7 + especially for freq < 550 no time is saved with averexp <= 7 """ for f, c in self.MEAS_TIME_CONST: if f > freq: @@ -421,9 +420,9 @@ class AH2700(AHBase): else: const = self.MEAS_TIME_CONST[-1][1] if averexp >= 8: - result = 0.8 + const * (0.5 + 2 ** (averexp - 8)) + result = 0.8 + (const - 0.8) * (0.5 + 2 ** (averexp - 8)) elif freq < 550: - result = 0.8 + const + result = const else: - result = 0.6 + 2 ** (averexp - 7) * const + result = 0.6 + 2 ** (averexp - 7) * (const - 0.8) return round(result, 1)