frappy_psi.nanov: minor fixes

Change-Id: I0c6f1f09358155f49d4dedfe0c254dbe6b514b36
This commit is contained in:
2025-11-05 16:37:42 +01:00
parent ebfb8a005d
commit 308283412e
2 changed files with 309 additions and 3 deletions

306
frappy_psi/ahcapbridge.py Normal file
View File

@@ -0,0 +1,306 @@
#!/usr/bin/env python
# *****************************************************************************
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Module authors:
# Markus Zolliker <markus.zolliker@psi.ch>
# *****************************************************************************
"""Andeen Hagerling capacitance bridge
two modules: the capacitance itself and the loss angle
in the configuration file, only the capacitance module needs to be configured,
while the loss module will be created automatically.
the name of the loss module may be configured, or disabled by choosing
an empty name
"""
import re
import time
import math
from frappy.core import FloatRange, HasIO, Parameter, Readable, StringIO, \
Attached, Property, StringType, Command, Writable, IntRange, BUSY, IDLE, WARN, ERROR
from frappy.modules import Acquisition
from frappy.dynamic import Pinata
from frappy.errors import ProgrammingError
from frappy.lib import clamp
class IO(StringIO):
end_of_line = '\r\n'
timeout = 1
# default_settings = {'timeout': 0.1}
last_command = None
# @Command(result=StringType())
# def readline(self):
# """async read"""
# with self._lock:
# reply = self._conn.readline()
# if reply:
# result = reply.decode(self.encoding)
# self.comLog('< %s', result)
# return result
# return ''
#
# @Command(StringType())
# def writeline(self, command):
# """no flush before"""
# cmd = command.encode(self.encoding)
# self.check_connection()
# try:
# self.comLog('> %s', command)
# self._conn.send(cmd + self._eol_write)
# self.last_command = command
# except ConnectionClosed:
# self.closeConnection()
# raise CommunicationFailedError('disconnected') from None
# except Exception as e:
# if self._conn is None:
# raise SilentError('disconnected') from None
# if repr(e) != self._last_error:
# self._last_error = repr(e)
# self.log.error(self._last_error)
# raise SilentError(repr(e)) from e
@Command(StringType(), result=StringType())
def communicate(self, command, noreply=False):
"""communicate and save the last command"""
# this is also called by writeline
self.last_time = time.time()
return super().communicate(command, noreply)
class AH2550(HasIO, Pinata, Acquisition):
value = Parameter('capacitance', FloatRange(unit='pF'))
voltage = Parameter('upper voltage limit',
FloatRange(0, 15, unit='V', fmtstr='%.1f'),
readonly=False, default=0)
loss = Parameter('loss',
FloatRange(unit=''), default=0)
averexp = Parameter('base 2 exponent of goal',
IntRange(0, 15), readonly=False, default=0)
goal = Parameter('number of samples to average',
FloatRange(0, 50000, unit='samples'),
readonly=False, default=0)
meas_time = Parameter('measured measuring time',
FloatRange(unit='s', fmtstr='%.1f'), default=0)
loss_module = Property('''name of loss module (default: <name>_loss)
configure '' to disable the creation of the loss module
''',
StringType(), default='')
pollinterval = Parameter(export=False, value=0.1)
export = True # for a Pinata module, the default is False!
ioClass = IO
MODEL = 'AH2550'
PATTERN = [
r'AVERAGE *AVEREXP=(?P<averexp>[0-9]*)',
r'VOLTAGE HIGHEST *(?P<voltage>[0-9.E+-]+)',
]
MEAS_PAT = (
r'C= *(?P<cap>[0-9.E+-]+) *PF '
r'L= *(?P<loss>[0-9.E+-]+) *(?P<lossunit>[A-Z]*) '
f'V= *(?P<voltage>[0-9.E+-]+) *V *(?P<error>.*)$'
# <volt> must not collide with <voltage>
)
COMMANDS = ['AV', 'VO', 'SI' ,'SH']
_started = 0
_meas_state = 0
_todo = None
_error = ''
def initModule(self):
self.io.setProperty('identification',
[('\r\nSERIAL ECHO OFF;SH MODEL', f'MODEL/OPTIONS *{self.MODEL}')])
super().initModule()
self.log.info('INIT')
pattern = self.PATTERN + [self.MEAS_PAT]
self.pattern = {p[:2]: re.compile(p) for p in pattern}
if len(self.pattern) != len(pattern):
raise ProgrammingError('need more than two letters to distinguish patterns')
self.echo = re.compile('|'.join(self.COMMANDS))
# first commands:
# UN DS does also return the resukts of the last measurement (including the frequency for AH2700)
self._todo = {'averexp': 'SH AV', '<unit>': 'UN DS'}
def doPoll(self):
if self.io.last_time: # a command was sent
self._started = 0 # trigger start
reply = self.io.readline(0)
if reply:
# parse reply of (here unknown) command
self.io.last_time = 0
pattern = self.pattern.get(reply[:2])
if pattern:
match = pattern.match(reply)
if not match:
self.log.warning('unexpected reply syntax: %r', reply)
return
values = match.groupdict()
if len(values) == 1:
key, value = next(iter(values.items()))
getattr(self, f'update_{key}')(float(value))
return
self.update_meas(**values)
return
if self.echo.match(reply):
# this is probably an echo
# we may have lost connection, so query again averexp
self.writeline('\r\nSERIAL ECHO OFF;SH AV')
return
self.log.warning('unknown reply %r', reply)
return
now = time.time()
if now < self.io.last_time + 1:
# send no more commands before we have the reply of the previous
return
if self._todo:
# we have parameters to be changed
# this will interrupt the current measurement
command = self._todo.pop(next(iter(self._todo)))
self.io.writeline(command)
return
if self._started:
# we are measuring, the last command was SI
return
# nothing else to do - we start measuring
if self._meas_state == 1:
self._meas_state = 2
self.status = BUSY, 'measuring'
self.trigger_measurement()
def trigger_measurement(self):
self._started = time.time()
self.writeline('SI')
self.io.last_time = 0 # do not retrigger again
def update_meas(self, cap, loss, lossunit, voltage, error, freq=None):
"""update given arguments (these are strings!)"""
now = time.time()
self.meas_time = now - self._started
self.value = float(cap)
self.voltage = float(voltage)
self._error = error
if self._meas_state == 2:
self._meas_state = 0
if self._error:
self.status = WARN, self._error
else:
self.status = IDLE, ''
if lossunit != 'DS':
self.io.writeline('UN DS') # this will trigger a measuremement reply
return
self.loss = float(loss)
self.trigger_measurement()
def update_lossunit(self, unit):
if unit != 'DS':
self.log.warning('can not change unit to DS')
def update_averexp(self, averexp):
self.averexp = int(averexp)
self.goal = 2 ** self.averexp
def scanModules(self):
if self.loss_module:
# if loss_module is not empty, we tell the framework to create
# a module for the loss with this name, and config below
yield self.loss_module.replace('$', self.name), {
'cls': Loss,
'description': f'loss value of {self.name}',
'cap': self.name}
def write_voltage(self, value):
self._todo['voltage'] = f'VO {value:g};SH VO'
return round(value, 1)
def write_averexp(self, value):
self._todo['averexp'] = f'AV {value};SH AV'
self.goal = 2 ** value
def write_goal(self, value):
self.averexp = clamp(0, 15, round(math.log2(value)))
self._todo['averexp'] = f'AV {self.averexp};SH AV'
return 2 ** self.averexp
def go(self):
"""start acquisition"""
self.status = BUSY, 'started'
self._started = 0 # interrupt current measurement
self._meas_state = 1 # retrigger a measurement
class Loss(Readable):
cap = Attached()
value = Parameter('loss', FloatRange(unit=''), default=0)
def initModule(self):
super().initModule()
self.cap.addCallback('loss', self.update_loss) # auto update status
def update_freq(self, freq):
self.freq = float(freq)
def update_loss(self, loss):
self.value = float(loss)
class Freq(Writable):
cap = Attached()
value = Parameter('', FloatRange(unit='Hz'), default=0)
def initModule(self):
super().initModule()
self.cap.addCallback('freq', self.update_freq) # auto update status
def update_freq(self, freq):
self.value = float(freq)
def write_target(self, target):
self.cap.write_freq(target)
class AH2700(AH2550):
MODEL = 'AH2700'
freq = Parameter('frequency',
FloatRange(50, 20000, unit='Hz', fmtstr='%.1f'),
readonly=False, default=50)
freq_module = Property('''name of freq module
default: not created
''',
StringType(), default='')
MEAS_PAT = r'F= *(?P<freq>[0-9.E+-]+) *HZ ' + AH2550.MEAS_PAT
PATTERN = AH2550.PATTERN + [r'FREQUENCY *(?P<freq>[0-9.E+-]+)']
COMMANDS = AH2550.COMMANDS + ['FR']
def scanModules(self):
yield from super().scanModules()
if self.freq_module:
yield self.freq_module.replace('$', self.name), {
'cls': Freq,
'description': f'freq module of {self.name}',
'cap': self.name}
def write_freq(self, value):
self._todo['freq'] = f'FR {value:g};SH FR'
return round(value, 1)
def update_meas(self, freq, **kwds):
self.freq = float(freq)
super().update_meas(**kwds)

View File

@@ -45,18 +45,18 @@ class NanovIO(StringIO):
channel = self._channels[idx]
reply = float(self.communicate('FETCH?'))
if abs(reply) > 1000:
print('BAD', reply)
self.log.warning('bad reply: %s', reply)
return
channel.value = float(reply)
except IndexError:
idx = -1
now = time.time()
if now < self._last_change + 5 or (idx >= 0 and len(self._channels) == 1):
return idx
return
self._last_change = now
self._channel_index = idx = (idx + 1) % len(self._channels)
self.channel = self._channels[idx].channel
result = self.communicate(';SENS:CHAN %i;:SENS:CHAN?' % self.channel)
self.communicate(';SENS:CHAN %i;:SENS:CHAN?' % self.channel)
class Volt(HasIO, Readable):