#!/usr/bin/env python # ***************************************************************************** # This program is free software; you can redistribute it and/or modify it under # the terms of the GNU General Public License as published by the Free Software # Foundation; either version 2 of the License, or (at your option) any later # version. # # This program is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS # FOR A PARTICULAR PURPOSE. See the GNU General Public License for more # details. # # You should have received a copy of the GNU General Public License along with # this program; if not, write to the Free Software Foundation, Inc., # 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA # # Module authors: # Markus Zolliker # ***************************************************************************** """Andeen Hagerling capacitance bridge two modules: the capacitance itself and the loss angle in the configuration file, only the capacitance module needs to be configured, while the loss module will be created automatically. the name of the loss module may be configured, or disabled by choosing an empty name """ import re import time import math from frappy.core import FloatRange, HasIO, Parameter, Readable, StringIO, \ Attached, Property, StringType, Command, Writable, IntRange, BUSY, IDLE, WARN, ERROR from frappy.modules import Acquisition from frappy.dynamic import Pinata from frappy.errors import ProgrammingError from frappy.lib import clamp class IO(StringIO): end_of_line = '\r\n' timeout = 1 # default_settings = {'timeout': 0.1} last_command = None # @Command(result=StringType()) # def readline(self): # """async read""" # with self._lock: # reply = self._conn.readline() # if reply: # result = reply.decode(self.encoding) # self.comLog('< %s', result) # return result # return '' # # @Command(StringType()) # def writeline(self, command): # """no flush before""" # cmd = command.encode(self.encoding) # self.check_connection() # try: # self.comLog('> %s', command) # self._conn.send(cmd + self._eol_write) # self.last_command = command # except ConnectionClosed: # self.closeConnection() # raise CommunicationFailedError('disconnected') from None # except Exception as e: # if self._conn is None: # raise SilentError('disconnected') from None # if repr(e) != self._last_error: # self._last_error = repr(e) # self.log.error(self._last_error) # raise SilentError(repr(e)) from e @Command(StringType(), result=StringType()) def communicate(self, command, noreply=False): """communicate and save the last command""" # this is also called by writeline self.last_time = time.time() return super().communicate(command, noreply) class AH2550(HasIO, Pinata, Acquisition): value = Parameter('capacitance', FloatRange(unit='pF')) voltage = Parameter('upper voltage limit', FloatRange(0, 15, unit='V', fmtstr='%.1f'), readonly=False, default=0) loss = Parameter('loss', FloatRange(unit=''), default=0) averexp = Parameter('base 2 exponent of goal', IntRange(0, 15), readonly=False, default=0) goal = Parameter('number of samples to average', FloatRange(0, 50000, unit='samples'), readonly=False, default=0) meas_time = Parameter('measured measuring time', FloatRange(unit='s', fmtstr='%.1f'), default=0) loss_module = Property('''name of loss module (default: _loss) configure '' to disable the creation of the loss module ''', StringType(), default='') pollinterval = Parameter(export=False, value=0.1) export = True # for a Pinata module, the default is False! ioClass = IO MODEL = 'AH2550' PATTERN = [ r'AVERAGE *AVEREXP=(?P[0-9]*)', r'VOLTAGE HIGHEST *(?P[0-9.E+-]+)', ] MEAS_PAT = ( r'C= *(?P[0-9.E+-]+) *PF ' r'L= *(?P[0-9.E+-]+) *(?P[A-Z]*) ' f'V= *(?P[0-9.E+-]+) *V *(?P.*)$' # must not collide with ) COMMANDS = ['AV', 'VO', 'SI' ,'SH'] _started = 0 _meas_state = 0 _todo = None _error = '' def initModule(self): self.io.setProperty('identification', [('\r\nSERIAL ECHO OFF;SH MODEL', f'MODEL/OPTIONS *{self.MODEL}')]) super().initModule() self.log.info('INIT') pattern = self.PATTERN + [self.MEAS_PAT] self.pattern = {p[:2]: re.compile(p) for p in pattern} if len(self.pattern) != len(pattern): raise ProgrammingError('need more than two letters to distinguish patterns') self.echo = re.compile('|'.join(self.COMMANDS)) # first commands: # UN DS does also return the resukts of the last measurement (including the frequency for AH2700) self._todo = {'averexp': 'SH AV', '': 'UN DS'} def doPoll(self): if self.io.last_time: # a command was sent self._started = 0 # trigger start reply = self.io.readline(0) if reply: # parse reply of (here unknown) command self.io.last_time = 0 pattern = self.pattern.get(reply[:2]) if pattern: match = pattern.match(reply) if not match: self.log.warning('unexpected reply syntax: %r', reply) return values = match.groupdict() if len(values) == 1: key, value = next(iter(values.items())) getattr(self, f'update_{key}')(float(value)) return self.update_meas(**values) return if self.echo.match(reply): # this is probably an echo # we may have lost connection, so query again averexp self.writeline('\r\nSERIAL ECHO OFF;SH AV') return self.log.warning('unknown reply %r', reply) return now = time.time() if now < self.io.last_time + 1: # send no more commands before we have the reply of the previous return if self._todo: # we have parameters to be changed # this will interrupt the current measurement command = self._todo.pop(next(iter(self._todo))) self.io.writeline(command) return if self._started: # we are measuring, the last command was SI return # nothing else to do - we start measuring if self._meas_state == 1: self._meas_state = 2 self.status = BUSY, 'measuring' self.trigger_measurement() def trigger_measurement(self): self._started = time.time() self.writeline('SI') self.io.last_time = 0 # do not retrigger again def update_meas(self, cap, loss, lossunit, voltage, error, freq=None): """update given arguments (these are strings!)""" now = time.time() self.meas_time = now - self._started self.value = float(cap) self.voltage = float(voltage) self._error = error if self._meas_state == 2: self._meas_state = 0 if self._error: self.status = WARN, self._error else: self.status = IDLE, '' if lossunit != 'DS': self.io.writeline('UN DS') # this will trigger a measuremement reply return self.loss = float(loss) self.trigger_measurement() def update_lossunit(self, unit): if unit != 'DS': self.log.warning('can not change unit to DS') def update_averexp(self, averexp): self.averexp = int(averexp) self.goal = 2 ** self.averexp def scanModules(self): if self.loss_module: # if loss_module is not empty, we tell the framework to create # a module for the loss with this name, and config below yield self.loss_module.replace('$', self.name), { 'cls': Loss, 'description': f'loss value of {self.name}', 'cap': self.name} def write_voltage(self, value): self._todo['voltage'] = f'VO {value:g};SH VO' return round(value, 1) def write_averexp(self, value): self._todo['averexp'] = f'AV {value};SH AV' self.goal = 2 ** value def write_goal(self, value): self.averexp = clamp(0, 15, round(math.log2(value))) self._todo['averexp'] = f'AV {self.averexp};SH AV' return 2 ** self.averexp def go(self): """start acquisition""" self.status = BUSY, 'started' self._started = 0 # interrupt current measurement self._meas_state = 1 # retrigger a measurement class Loss(Readable): cap = Attached() value = Parameter('loss', FloatRange(unit=''), default=0) def initModule(self): super().initModule() self.cap.addCallback('loss', self.update_loss) # auto update status def update_freq(self, freq): self.freq = float(freq) def update_loss(self, loss): self.value = float(loss) class Freq(Writable): cap = Attached() value = Parameter('', FloatRange(unit='Hz'), default=0) def initModule(self): super().initModule() self.cap.addCallback('freq', self.update_freq) # auto update status def update_freq(self, freq): self.value = float(freq) def write_target(self, target): self.cap.write_freq(target) class AH2700(AH2550): MODEL = 'AH2700' freq = Parameter('frequency', FloatRange(50, 20000, unit='Hz', fmtstr='%.1f'), readonly=False, default=50) freq_module = Property('''name of freq module default: not created ''', StringType(), default='') MEAS_PAT = r'F= *(?P[0-9.E+-]+) *HZ ' + AH2550.MEAS_PAT PATTERN = AH2550.PATTERN + [r'FREQUENCY *(?P[0-9.E+-]+)'] COMMANDS = AH2550.COMMANDS + ['FR'] def scanModules(self): yield from super().scanModules() if self.freq_module: yield self.freq_module.replace('$', self.name), { 'cls': Freq, 'description': f'freq module of {self.name}', 'cap': self.name} def write_freq(self, value): self._todo['freq'] = f'FR {value:g};SH FR' return round(value, 1) def update_meas(self, freq, **kwds): self.freq = float(freq) super().update_meas(**kwds)