Files
frappy/frappy_psi/ahcapbridge.py
Markus Zolliker dfb8037a65 frappy_psi.ahcapbridge: fix loss unit mechanism
Change-Id: Iba4ca4988146a71a01dcc6dbe911be5588bfe292
2025-11-21 07:58:24 +01:00

451 lines
16 KiB
Python

# *****************************************************************************
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Module authors:
# Markus Zolliker <markus.zolliker@psi.ch>
# *****************************************************************************
"""Andeen Hagerling capacitance bridge
The speciality of this capacitance bridge is that a measurement might take
anything from fractions of a second up to more than half an hour.
creates up to two additional modules for 'loss' and 'freq'
in the configuration file, only the capacitance module needs to be configured,
while the loss module will be created automatically.
the name of the loss (and freq) module may be configured, or disabled by
choosing an empty name
"""
import re
import time
import threading
from frappy.core import HasIO, Parameter, Readable, StringIO, \
Attached, Property, Command, Writable, BUSY, IDLE, WARN
from frappy.datatypes import FloatRange, IntRange, StringType, TupleOf
from frappy.modules import Acquisition
from frappy.dynamic import Pinata
from frappy.errors import IsBusyError, CommunicationFailedError, HardwareError
# values of AHBase._mode
# continuous: retrigger_meas() starts a new single measurement only in this mode
CONTINUOUS = 0
# transient value set at the beginning of go(), before RUNNING is reached
STARTING = 1
# set at the end of go(); change_param() refuses to work in this mode
RUNNING = 2
# set by finish(); doPoll() switches back to CONTINUOUS after _cont_deadline
FINISHED = 3
class IO(StringIO):
    """string IO which detects a command echo and tries to switch it off

    ``sent_command`` is set whenever :meth:`communicate` is called, so that
    the module using this IO can detect commands it did not send itself.
    """
    end_of_line = ('\r\n', '\r')
    timeout = 5
    sent_command = False  # used to detect that communicate was called directly
    ECHO = re.compile('>|AV |VO |FR |SI |SH ')  # this is recognized as an echo
    MEAS = None  # overridden by the module

    @Command(StringType(), result=StringType())
    def communicate(self, command, noreply=False):
        """communicate and remind that a command was sent"""
        # this is also called by writeline
        self.sent_command = True
        # up to three attempts: an attempt is repeated when the reply
        # turned out to be an echo (which is then switched off)
        for _attempt in range(3):
            answer = super().communicate(command, noreply)
            if answer:
                answer = answer.strip()
            if self.check_echo_off(answer):
                return answer
        raise CommunicationFailedError('detected echo but can not switch off')

    def check_echo_off(self, reply):
        """check whether reply is an echo, and switch echo off if so

        :param reply: the reply to be checked (None or empty is accepted)
        :return: True when reply is not an echo, False when an echo was
            detected and the command for switching it off was sent
        """
        if not self.ECHO.match(reply or ''):
            return True
        super().writeline('\rSERIAL ECHO OFF;UN 2')
        # read at most 3 lines, stop early on a measurement reply
        # or when the bridge tells there is no data
        for _attempt in range(3):
            line = self.readline()
            if self.MEAS.match(line or '') or line == 'NO DATA FOUND':
                break
        return False
class AHBase(HasIO, Pinata, Acquisition):
    """common base of the capacitance bridge modules

    Subclasses have to provide PATTERN, MEAS_PAT, UNIT, MODEL_PAT, MODEL
    and an implementation of calculate_time.
    """
    value = Parameter('capacitance', FloatRange(unit='pF'))
    freq = Parameter('frequency', FloatRange(unit='Hz'), default=1000)
    voltage = Parameter('upper voltage limit',
                        FloatRange(0, 15, unit='V', fmtstr='%.1f'),
                        readonly=False, default=0)
    loss = Parameter('loss',
                     FloatRange(unit=''), default=0)
    averexp = Parameter('average exponent - roughly log2 of number of samples averaged',
                        IntRange(0, 15), readonly=False, default=0)
    goal = Parameter('value for averexp for the next go()',
                     IntRange(0, 15), readonly=False, default=0)
    meas_time = Parameter('measured measuring time',
                          FloatRange(unit='s', fmtstr='%.4g'), default=0)
    calculated_time = Parameter('calculated measuring time',
                                FloatRange(unit='s', fmtstr='%.4g'), default=0)
    # NOTE(review): the description mentions a default of <name>_loss, but the
    # declared default is '' (no loss module) - confirm which one is intended
    loss_module = Property(
        'name of loss module (default: <name>_loss)\n'
        "configure '' to disable the creation of the loss module\n",
        StringType(), default='')
    pollinterval = Parameter('minimum pollinterval - the polling rate is determined by averaging',
                             value=0.1)
    export = True  # for a Pinata module, the default is False!
    ioClass = IO
    _error = ''
    _last_start = None
    _params = None
    _mode = CONTINUOUS  # or STARTING, RUNNING or FINISHED
    _cont_deadline = 0  # when to switch back to continuous after finished
    _averexp_deadline = 0  # to make sure averexp is polled periodically
    _lossunit = 'undefined'
    # to be overridden:
    PATTERN = None  # a dict of patterns to parse replies, keyed by parameter name
    MEAS_PAT = None  # the pattern to parse the measurement reply
    UNIT = None  # our desired loss unit
    MODEL_PAT = None
    MODEL = None

    def scanModules(self):
        """yield (name, config) of the loss module, if one is configured"""
        if not self.loss_module:
            return
        # loss_module is not empty: we tell the framework to create
        # a module for the loss with this name, and the config below
        loss_name = self.loss_module.replace('$', self.name)
        yield loss_name, {
            'cls': Loss,
            'description': f'loss value of {self.name}',
            'cap': self.name}

    def initModule(self):
        """create the lock and hand over pattern and ident check to the io"""
        super().initModule()
        self._lock = threading.RLock()
        self.io.MEAS = self.MEAS_PAT
        self.io.checkHWIdent = self.checkHWIdent

    def checkHWIdent(self):
        """verify the model of the connected device (up to 3 tries)

        :raises CommunicationFailedError: when no reply matches MODEL_PAT
        """
        for _attempt in range(3):
            answer = self.communicate('SH MODEL')
            if self.MODEL_PAT.match(answer):
                return
        raise CommunicationFailedError(f'we are not connected to a {self.MODEL}')

    def initialReads(self):
        """read frequency, set the loss unit, read averexp and start measuring"""
        # UN 2 does also return the results of the last measurement
        # (including the frequency for AH2700)
        self.freq = self.get_param('FR', 'freq')
        self.set_lossunit()
        self.verify_averexp()
        self.goal = self.averexp
        self.single_meas()

    def communicate(self, command):
        """communicate through the io without leaving sent_command set

        :param command: the command to be sent
        :return: the reply
        """
        answer = self.io.communicate(command)
        # this command was sent by ourselves, not by a direct call on the io
        self.io.sent_command = False
        return answer

    def set_lossunit(self):
        """send 'UN 2' and remember the loss unit found in the reply"""
        self._lossunit = self.UNIT
        reply = self.communicate('UN 2')
        # this should be a measurement reply
        reported = self.get_meas_reply(reply).get('lossunit', 'undefined')
        if reported != self.UNIT:
            self.log.warn('bad reply for UN 2: %r', reply)
        elif self._lossunit != self.UNIT:
            # NOTE(review): _lossunit was assigned UNIT at the top of this
            # method, so this warning looks unreachable - confirm the intent
            self.log.warn('changed loss unit from %r to %r', self._lossunit, self.UNIT)
        self._lossunit = reported

    def change_param(self, short, value, param):
        """change a parameter on the device and return the value read back

        :param short: the short command name (e.g. 'AV')
        :param value: the formatted value to be sent
        :param param: the key into PATTERN for parsing the reply
        :return: the value read back, as float
        :raises IsBusyError: while a started measurement is running
        :raises CommunicationFailedError: when 3 replies in a row do not match
        """
        if self._mode == RUNNING:
            raise IsBusyError('can not change parameters while measuring')
        command = f'{short} {value};SH {short}'
        with self._lock:
            for _attempt in range(3):
                reply = self.communicate(command)
                found = self.PATTERN[param].match(reply)
                if not found:
                    continue
                text = found.group(1)
                self.retrigger_meas()
                return float(text)
            self.retrigger_meas()
            raise CommunicationFailedError(f'can not change {param} to {value}')

    def get_param(self, short, param):
        """read a parameter from the device

        :param short: the short command name (e.g. 'AV')
        :param param: the key into PATTERN for parsing the reply
        :return: the value, as float
        :raises CommunicationFailedError: when 3 replies in a row do not match
        """
        command = f'SH {short}'
        with self._lock:
            for _attempt in range(3):
                reply = self.communicate(command)
                found = self.PATTERN[param].match(reply)
                if not found:
                    continue
                text = found.group(1)
                self.retrigger_meas()
                return float(text)
            self.retrigger_meas()
            raise CommunicationFailedError(f'can not get {param}')

    def retrigger_meas(self):
        """start the next single measurement, but only in continuous mode"""
        if self._mode != CONTINUOUS:
            return
        self.single_meas()

    def single_meas(self):
        """remember the start time and send the 'SI' command"""
        self._last_start = time.time()
        self.io.writeline('SI')
        # writeline goes through io.communicate, which sets sent_command
        self.io.sent_command = False

    def get_meas_reply(self, reply):
        """parse a measurement reply

        :param reply: the reply string
        :return: a dict of the named groups of MEAS_PAT, or an empty dict
            when the reply does not match
        """
        found = self.MEAS_PAT.match(reply)
        return found.groupdict() if found else {}

    def doPoll(self):
        """wait for a line from the device and treat it"""
        # this typically waits longer than the low pollinterval
        # -> after returning, doPoll is called again immediately
        reply = self.io.readline()
        if reply:
            meas = self.get_meas_reply(reply)
            if meas:
                self.update_meas(**meas)
                return
            # not a measurement: it might be an echo
            self.io.check_echo_off(reply)
            self.retrigger_meas()
            return
        if self._mode == FINISHED and time.time() > self._cont_deadline:
            # some time after a finished measurement: back to continuous
            self._mode = CONTINUOUS
            self.status = IDLE, ''
            self.single_meas()
            return
        if not self.io.sent_command:
            return
        # self.io.communicate was called directly
        # -> we have to retrigger SI again
        if self._mode == CONTINUOUS:
            self.single_meas()
        elif self._mode == RUNNING:
            self.finish(WARN, 'interrupted')

    def update_freq(self, value):
        """set freq and recalculate the estimated measuring time"""
        self.freq = value
        self._calculate_time(self.averexp, value)

    def update_averexp(self, value):
        """set averexp, restart its poll deadline and recalculate the time"""
        self.averexp = value
        self._averexp_deadline = time.time() + 15
        self._calculate_time(value, self.freq)

    def update_meas(self, cap, loss, lossunit, voltage, error, freq=None):
        """update given arguments from a measurement reply (these are strings!)

        the arguments are the named groups of MEAS_PAT
        """
        self._error = error
        if error:
            status = WARN, error
        elif self._mode == CONTINUOUS:
            status = IDLE, ''
        else:
            status = IDLE, 'finished'
        now = time.time()
        if self._mode == RUNNING:
            self.finish(*status)
        elif status != self.status:
            self.status = status
        if freq:
            self.freq = float(freq)
            self._calculate_time(self.averexp, self.freq)
        self.value = float(cap)
        self.voltage = float(voltage)
        if lossunit != self.UNIT:
            # loss is not updated, as it is given in an unexpected unit
            self.set_lossunit()
            self.retrigger_meas()
            return
        self.loss = float(loss)
        if self._last_start:
            self.meas_time = now - self._last_start
            self._last_start = 0
        averexp_poll_due = now > self._averexp_deadline and self._mode == CONTINUOUS
        if averexp_poll_due:
            # get_param, which is used for this, does also retrigger
            self.verify_averexp()
        else:
            self.retrigger_meas()

    def read_loss(self):
        """return the loss

        :raises HardwareError: when the last known loss unit is not UNIT
        """
        if self._lossunit != self.UNIT:
            raise HardwareError(f'bad loss unit: {self._lossunit!r}')
        return self.loss

    def write_voltage(self, value):
        """change the voltage, return the value read back (1 decimal)"""
        readback = self.change_param('VO', f'{value:.1f}', 'voltage')
        return round(readback, 1)

    def write_averexp(self, value):
        """change averexp and update it with the value read back"""
        readback = self.change_param('AV', f'{value}', 'averexp')
        self.update_averexp(readback)

    def verify_averexp(self):
        """get averexp from the device and update the parameter"""
        # we do not want to use read_averexp for this,
        # as it will stop the measurement when polled
        self.update_averexp(self.get_param('AV', 'averexp'))

    def _calculate_time(self, averexp, freq):
        """update calculated_time from averexp and freq"""
        self.calculated_time = self.calculate_time(averexp, freq)

    def go(self):
        """start acquisition"""
        prevmode = self._mode
        # NOTE(review): when one of the calls below raises, _mode is left
        # at STARTING - confirm that this is acceptable
        self._mode = STARTING
        recently_verified = (prevmode == FINISHED
                             and time.time() <= self._averexp_deadline)
        if not recently_verified:
            # this also makes sure we catch a previous meas reply
            self.verify_averexp()
        if self.averexp != self.goal:
            self.write_averexp(self.goal)
        self.status = BUSY, 'started'
        self.single_meas()
        self._mode = RUNNING

    def finish(self, statuscode, statustext):
        """set the given status and enter FINISHED mode for 5 seconds"""
        self.status = statuscode, statustext
        self._mode = FINISHED
        self._cont_deadline = time.time() + 5

    def stop(self):
        """stops measurement"""
        if self._mode != RUNNING:
            return
        self.verify_averexp()
        self.finish(WARN, 'stopped')

    def calculate_time(self, averexp, freq):
        """estimate measuring time"""
        raise NotImplementedError
class Loss(Readable):
    """module for the loss, following the 'loss' parameter of the attached module"""
    cap = Attached()
    value = Parameter('loss', FloatRange(unit=''), default=0)

    def initModule(self):
        """register update_loss as callback for 'loss' on the attached module"""
        super().initModule()
        bridge = self.cap
        bridge.addCallback('loss', self.update_loss)  # auto update status

    def update_freq(self, freq):
        """store the given frequency (as float) in the attribute freq"""
        as_float = float(freq)
        self.freq = as_float

    def update_loss(self, loss):
        """callback: take over the loss (as float) as our value"""
        as_float = float(loss)
        self.value = as_float
class Freq(Writable):
    """module for the frequency, following the 'freq' parameter of the attached module"""
    cap = Attached()
    value = Parameter('', FloatRange(unit='Hz'), default=0)

    def initModule(self):
        """register update_freq as callback for 'freq' on the attached module"""
        super().initModule()
        bridge = self.cap
        bridge.addCallback('freq', self.update_freq)  # auto update status

    def update_freq(self, freq):
        """callback: take over the frequency as our value"""
        self.value = freq

    def write_target(self, target):
        """forward the target to write_freq of the attached module"""
        bridge = self.cap
        bridge.write_freq(target)
class AH2550(AHBase):
    """capacitance bridge AH2550

    the estimated measuring time depends on averexp only
    """
    PATTERN = {
        # at least one digit is required: an empty capture would make
        # float() in get_param / change_param raise a ValueError instead of
        # being treated as a bad reply (which is retried)
        'averexp': re.compile(r'AVERAGE_AVEREXP *([0-9]+)'),
        'voltage': re.compile(r'VOLTAGE_HIGHEST *([0-9.E+-]+)'),
        'freq': re.compile(r'FREQUENCY *([0-9.E+-]+)'),
    }
    # the named groups are the arguments of update_meas
    MEAS_PAT = re.compile(
        r'C= *(?P<cap>[0-9.E+-]+) *PF,'
        r'L= *(?P<loss>[0-9.E+-]+) *(?P<lossunit>[A-Z]*),'
        r'V= *(?P<voltage>[0-9.E+-]+) *V,A,*(?P<error>.*)$'
    )
    UNIT = 'DF'
    # checkHWIdent sends 'SH MODEL' and expects this reply from an AH2550
    MODEL_PAT = re.compile('ILLEGAL WORD: MODEL')
    MODEL = 'AH2550'
    # empirically determined - may vary with noise
    # differs drastically from the table in the manual
    # the index is averexp
    MEAS_TIME_CONST = [0.2, 0.3, 0.4, 1.0, 1.3, 1.6, 2.2, 3.3,
                       5.5, 8.3, 14, 25, 47, 91, 180, 360]

    def _calculate_time(self, averexp, freq):
        """update calculated_time - freq is not used for this model"""
        self.calculated_time = self.calculate_time(averexp)

    @Command(TupleOf(IntRange(0, 15)), result=FloatRange())
    def calculate_time(self, averexp):
        """calculate estimated measuring time"""
        index = int(averexp)
        return self.MEAS_TIME_CONST[index]
class AH2700(AHBase):
    """capacitance bridge AH2700

    in contrast to the base class, the frequency is changeable and
    optionally a frequency module is created
    """
    freq = Parameter(datatype=FloatRange(50, 20000, unit='Hz', fmtstr='%.1f'),
                     readonly=False)
    freq_module = Property('name of freq module\n'
                           'default: not created\n',
                           StringType(), default='')
    PATTERN = {
        # at least one digit is required: an empty capture would make
        # float() in get_param / change_param raise a ValueError instead of
        # being treated as a bad reply (which is retried)
        'averexp': re.compile(r'AVERAGE *AVEREXP=([0-9]+)'),
        'voltage': re.compile(r'VOLTAGE HIGHEST *([0-9.E+-]+)'),
        'freq': re.compile(r'FREQUENCY *([0-9.E+-]+)'),
    }
    # the named groups are the arguments of update_meas
    MEAS_PAT = re.compile(
        r'F= *(?P<freq>[0-9.E+-]+) *HZ '
        r'C= *(?P<cap>[0-9.E+-]+) *PF '
        r'L= *(?P<loss>[0-9.E+-]+) *(?P<lossunit>[A-Z]*) '
        r'V= *(?P<voltage>[0-9.E+-]+) *V *(?P<error>.*)$'
    )
    UNIT = 'DS'
    MODEL_PAT = re.compile('MODEL/OPTIONS *AH2700')
    MODEL = 'AH2700'

    def scanModules(self):
        """yield the modules of the base class and, if configured, the freq module"""
        yield from super().scanModules()
        if not self.freq_module:
            return
        freq_name = self.freq_module.replace('$', self.name)
        yield freq_name, {
            'cls': Freq,
            'description': f'freq module of {self.name}',
            'cap': self.name}

    def write_freq(self, value):
        """change the frequency on the device

        :param value: the frequency to be set
        :return: the given value, rounded to 1 decimal
        """
        self.change_param('FR', f'{value:g}', 'freq')
        self.update_freq(value)
        return round(value, 1)

    # empirically determined - may vary with noise
    # differs drastically from the table in the manual
    MEAS_TIME_CONST = [
        # (upper freq limit, meas time @ avrexp=7 )
        (75, 20.8),
        (150, 10.8),
        (270, 6.42),
        (550, 3.14),
        (1100, 3.53),
        (4500, 1.82),
        (20000, 1.31),
    ]

    @Command(TupleOf(IntRange(0, 15), FloatRange(50, 20000)),
             result=FloatRange())
    def calculate_time(self, averexp, freq):
        """calculate estimated measuring time

        from time efficiency considerations averexp > 7 is recommended
        especially for freq < 550 no time is saved with averexp <= 7
        """
        table = self.MEAS_TIME_CONST
        # take the constant of the first range with an upper limit above freq,
        # falling back to the last entry when freq is not below any limit
        const = next((c for limit, c in table if limit > freq), table[-1][1])
        if averexp >= 8:
            result = 0.8 + (const - 0.8) * (0.5 + 2 ** (averexp - 8))
        elif freq < 550:
            result = const
        else:
            result = 0.6 + 2 ** (averexp - 7) * (const - 0.8)
        return round(result, 1)