From 308283412e86ff6e67d3194b6bff5557007c7de7 Mon Sep 17 00:00:00 2001 From: Markus Zolliker Date: Wed, 5 Nov 2025 16:37:42 +0100 Subject: [PATCH] frappy_psi.nanov: minor fixes Change-Id: I0c6f1f09358155f49d4dedfe0c254dbe6b514b36 --- frappy_psi/ahcapbridge.py | 306 ++++++++++++++++++++++++++++++++++++++ frappy_psi/nanov.py | 6 +- 2 files changed, 309 insertions(+), 3 deletions(-) create mode 100644 frappy_psi/ahcapbridge.py diff --git a/frappy_psi/ahcapbridge.py b/frappy_psi/ahcapbridge.py new file mode 100644 index 00000000..3d1ad7d4 --- /dev/null +++ b/frappy_psi/ahcapbridge.py @@ -0,0 +1,306 @@ +#!/usr/bin/env python +# ***************************************************************************** +# This program is free software; you can redistribute it and/or modify it under +# the terms of the GNU General Public License as published by the Free Software +# Foundation; either version 2 of the License, or (at your option) any later +# version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more +# details. +# +# You should have received a copy of the GNU General Public License along with +# this program; if not, write to the Free Software Foundation, Inc., +# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +# +# Module authors: +# Markus Zolliker +# ***************************************************************************** +"""Andeen Hagerling capacitance bridge + +two modules: the capacitance itself and the loss angle + +in the configuration file, only the capacitance module needs to be configured, +while the loss module will be created automatically. + +the name of the loss module may be configured, or disabled by choosing +an empty name +""" + +import re +import time +import math +from frappy.core import FloatRange, HasIO, Parameter, Readable, StringIO, \ + Attached, Property, StringType, Command, Writable, IntRange, BUSY, IDLE, WARN, ERROR +from frappy.modules import Acquisition +from frappy.dynamic import Pinata +from frappy.errors import ProgrammingError +from frappy.lib import clamp + + +class IO(StringIO): + end_of_line = '\r\n' + timeout = 1 + # default_settings = {'timeout': 0.1} + last_command = None + + # @Command(result=StringType()) + # def readline(self): + # """async read""" + # with self._lock: + # reply = self._conn.readline() + # if reply: + # result = reply.decode(self.encoding) + # self.comLog('< %s', result) + # return result + # return '' + # + # @Command(StringType()) + # def writeline(self, command): + # """no flush before""" + # cmd = command.encode(self.encoding) + # self.check_connection() + # try: + # self.comLog('> %s', command) + # self._conn.send(cmd + self._eol_write) + # self.last_command = command + # except ConnectionClosed: + # self.closeConnection() + # raise CommunicationFailedError('disconnected') from None + # except Exception as e: + # if self._conn is None: + # raise SilentError('disconnected') from None + # if repr(e) != self._last_error: + # self._last_error = repr(e) + # self.log.error(self._last_error) + # raise SilentError(repr(e)) from e + + @Command(StringType(), result=StringType()) + def communicate(self, command, noreply=False): + """communicate and save the last command""" + # this is also called by writeline + self.last_time = time.time() + return super().communicate(command, noreply) + + +class AH2550(HasIO, Pinata, Acquisition): + value = Parameter('capacitance', FloatRange(unit='pF')) + voltage = Parameter('upper voltage limit', + FloatRange(0, 15, unit='V', fmtstr='%.1f'), + readonly=False, default=0) + loss = Parameter('loss', + FloatRange(unit=''), default=0) + averexp = Parameter('base 2 exponent of goal', + IntRange(0, 15), readonly=False, default=0) + goal = Parameter('number of samples to average', + FloatRange(0, 50000, unit='samples'), + readonly=False, default=0) + meas_time = Parameter('measured measuring time', + FloatRange(unit='s', fmtstr='%.1f'), default=0) + loss_module = Property('''name of loss module (default: _loss) + + configure '' to disable the creation of the loss module + ''', + StringType(), default='') + pollinterval = Parameter(export=False, value=0.1) + export = True # for a Pinata module, the default is False! + ioClass = IO + MODEL = 'AH2550' + PATTERN = [ + r'AVERAGE *AVEREXP=(?P[0-9]*)', + r'VOLTAGE HIGHEST *(?P[0-9.E+-]+)', + ] + MEAS_PAT = ( + r'C= *(?P[0-9.E+-]+) *PF ' + r'L= *(?P[0-9.E+-]+) *(?P[A-Z]*) ' + f'V= *(?P[0-9.E+-]+) *V *(?P.*)$' + # must not collide with + ) + COMMANDS = ['AV', 'VO', 'SI' ,'SH'] + _started = 0 + _meas_state = 0 + _todo = None + _error = '' + + def initModule(self): + self.io.setProperty('identification', + [('\r\nSERIAL ECHO OFF;SH MODEL', f'MODEL/OPTIONS *{self.MODEL}')]) + super().initModule() + self.log.info('INIT') + pattern = self.PATTERN + [self.MEAS_PAT] + self.pattern = {p[:2]: re.compile(p) for p in pattern} + if len(self.pattern) != len(pattern): + raise ProgrammingError('need more than two letters to distinguish patterns') + self.echo = re.compile('|'.join(self.COMMANDS)) + # first commands: + # UN DS does also return the resukts of the last measurement (including the frequency for AH2700) + self._todo = {'averexp': 'SH AV', '': 'UN DS'} + + def doPoll(self): + if self.io.last_time: # a command was sent + self._started = 0 # trigger start + reply = self.io.readline(0) + if reply: + # parse reply of (here unknown) command + self.io.last_time = 0 + pattern = self.pattern.get(reply[:2]) + if pattern: + match = pattern.match(reply) + if not match: + self.log.warning('unexpected reply syntax: %r', reply) + return + values = match.groupdict() + if len(values) == 1: + key, value = next(iter(values.items())) + getattr(self, f'update_{key}')(float(value)) + return + self.update_meas(**values) + return + if self.echo.match(reply): + # this is probably an echo + # we may have lost connection, so query again averexp + self.writeline('\r\nSERIAL ECHO OFF;SH AV') + return + self.log.warning('unknown reply %r', reply) + return + now = time.time() + if now < self.io.last_time + 1: + # send no more commands before we have the reply of the previous + return + if self._todo: + # we have parameters to be changed + # this will interrupt the current measurement + command = self._todo.pop(next(iter(self._todo))) + self.io.writeline(command) + return + if self._started: + # we are measuring, the last command was SI + return + # nothing else to do - we start measuring + if self._meas_state == 1: + self._meas_state = 2 + self.status = BUSY, 'measuring' + self.trigger_measurement() + + def trigger_measurement(self): + self._started = time.time() + self.writeline('SI') + self.io.last_time = 0 # do not retrigger again + + def update_meas(self, cap, loss, lossunit, voltage, error, freq=None): + """update given arguments (these are strings!)""" + now = time.time() + self.meas_time = now - self._started + self.value = float(cap) + self.voltage = float(voltage) + self._error = error + if self._meas_state == 2: + self._meas_state = 0 + if self._error: + self.status = WARN, self._error + else: + self.status = IDLE, '' + if lossunit != 'DS': + self.io.writeline('UN DS') # this will trigger a measuremement reply + return + self.loss = float(loss) + self.trigger_measurement() + + def update_lossunit(self, unit): + if unit != 'DS': + self.log.warning('can not change unit to DS') + + def update_averexp(self, averexp): + self.averexp = int(averexp) + self.goal = 2 ** self.averexp + + def scanModules(self): + if self.loss_module: + # if loss_module is not empty, we tell the framework to create + # a module for the loss with this name, and config below + yield self.loss_module.replace('$', self.name), { + 'cls': Loss, + 'description': f'loss value of {self.name}', + 'cap': self.name} + + def write_voltage(self, value): + self._todo['voltage'] = f'VO {value:g};SH VO' + return round(value, 1) + + def write_averexp(self, value): + self._todo['averexp'] = f'AV {value};SH AV' + self.goal = 2 ** value + + def write_goal(self, value): + self.averexp = clamp(0, 15, round(math.log2(value))) + self._todo['averexp'] = f'AV {self.averexp};SH AV' + return 2 ** self.averexp + + def go(self): + """start acquisition""" + self.status = BUSY, 'started' + self._started = 0 # interrupt current measurement + self._meas_state = 1 # retrigger a measurement + + +class Loss(Readable): + cap = Attached() + value = Parameter('loss', FloatRange(unit=''), default=0) + + def initModule(self): + super().initModule() + self.cap.addCallback('loss', self.update_loss) # auto update status + + def update_freq(self, freq): + self.freq = float(freq) + + def update_loss(self, loss): + self.value = float(loss) + + +class Freq(Writable): + cap = Attached() + value = Parameter('', FloatRange(unit='Hz'), default=0) + + def initModule(self): + super().initModule() + self.cap.addCallback('freq', self.update_freq) # auto update status + + def update_freq(self, freq): + self.value = float(freq) + + def write_target(self, target): + self.cap.write_freq(target) + + +class AH2700(AH2550): + MODEL = 'AH2700' + freq = Parameter('frequency', + FloatRange(50, 20000, unit='Hz', fmtstr='%.1f'), + readonly=False, default=50) + freq_module = Property('''name of freq module + + default: not created + ''', + StringType(), default='') + MEAS_PAT = r'F= *(?P[0-9.E+-]+) *HZ ' + AH2550.MEAS_PAT + PATTERN = AH2550.PATTERN + [r'FREQUENCY *(?P[0-9.E+-]+)'] + COMMANDS = AH2550.COMMANDS + ['FR'] + + def scanModules(self): + yield from super().scanModules() + if self.freq_module: + yield self.freq_module.replace('$', self.name), { + 'cls': Freq, + 'description': f'freq module of {self.name}', + 'cap': self.name} + + def write_freq(self, value): + self._todo['freq'] = f'FR {value:g};SH FR' + return round(value, 1) + + def update_meas(self, freq, **kwds): + self.freq = float(freq) + super().update_meas(**kwds) + diff --git a/frappy_psi/nanov.py b/frappy_psi/nanov.py index e2744042..26d5f6fe 100644 --- a/frappy_psi/nanov.py +++ b/frappy_psi/nanov.py @@ -45,18 +45,18 @@ class NanovIO(StringIO): channel = self._channels[idx] reply = float(self.communicate('FETCH?')) if abs(reply) > 1000: - print('BAD', reply) + self.log.warning('bad reply: %s', reply) return channel.value = float(reply) except IndexError: idx = -1 now = time.time() if now < self._last_change + 5 or (idx >= 0 and len(self._channels) == 1): - return idx + return self._last_change = now self._channel_index = idx = (idx + 1) % len(self._channels) self.channel = self._channels[idx].channel - result = self.communicate(';SENS:CHAN %i;:SENS:CHAN?' % self.channel) + self.communicate(';SENS:CHAN %i;:SENS:CHAN?' % self.channel) class Volt(HasIO, Readable):