diff --git a/frappy_psi/mercury.py b/frappy_psi/mercury.py index 304f2a1..2ffade5 100644 --- a/frappy_psi/mercury.py +++ b/frappy_psi/mercury.py @@ -28,7 +28,7 @@ import time from frappy.core import Drivable, HasIO, Writable, \ Parameter, Property, Readable, StringIO, Attached, IDLE, nopoll from frappy.datatypes import EnumType, FloatRange, StringType, StructOf, BoolType, TupleOf -from frappy.errors import HardwareError +from frappy.errors import HardwareError, ProgrammingError, ConfigError from frappy_psi.convergence import HasConvergence from frappy.lib.enum import Enum @@ -66,39 +66,43 @@ class IO(StringIO): class MercuryChannel(HasIO): - slot = Property('''slot uids - - example: DB6.T1,DB1.H1 - slot ids for sensor (and control output)''', - StringType()) - channel_type = '' #: channel type(s) for sensor (and control) e.g. TEMP,HTR or PRES,AUX + slot = Property('comma separated slot id(s), e.g. DB6.T1', StringType()) + kind = '' #: used slot kind(s) + slots = () #: dict[] of + + def earlyInit(self): + super().earlyInit() + self.kinds = self.kind.split(',') + slots = self.slot.split(',') + if len(slots) != len(self.kinds): + raise ConfigError(f'slot needs {len(self.kinds)} comma separated names') + self.slots = dict(zip(self.kinds, slots)) + self.setProperty('slot', slots[0]) def _complete_adr(self, adr): - """complete address from channel_type and slot""" - head, sep, tail = adr.partition(':') - for i, (channel_type, slot) in enumerate(zip(self.channel_type.split(','), self.slot.split(','))): - if head == str(i): - return f'DEV:{slot}:{channel_type}{sep}{tail}' - if head == channel_type: - return f'DEV:{slot}:{head}{sep}{tail}' + """insert slot to adr""" + spl = adr.split(':') + if spl[0] == 'DEV': + if spl[1] == '': + spl[1] = self.slots[spl[2]] + return ':'.join(spl) + elif spl[0] != 'SYS': + raise ProgrammingError('using old style adr?') return adr def multiquery(self, adr, names=(), convert=as_float, debug=None): """get parameter(s) in mercury syntax :param adr: the 'address part' of the SCPI command - the DEV: is added automatically, when adr starts with the channel type - in addition, when adr starts with '0:' or '1:', channel type and slot are added + READ: is added automatically, slot is inserted when adr starts with DEV:: :param names: the SCPI names of the parameter(s), for example ['TEMP'] :param convert: a converter function (converts replied string to value) :return: the values as tuple - Example: - adr='AUX:SIG' + Example (kind=PRES,AUX slot=DB5.P1,DB3.G1): + adr='DEV::AUX:SIG' names = ('PERC',) - self.channel_type='PRES,AUX' # adr starts with 'AUX' - self.slot='DB5.P1,DB3.G1' # -> take second slot - -> query command will be READ:DEV:DB3.G1:PRES:SIG:PERC + -> query command will be READ:DEV:DB3.G1:AUX:SIG:PERC """ # TODO: if the need arises: allow convert to be a list adr = self._complete_adr(adr) @@ -125,18 +129,16 @@ class MercuryChannel(HasIO): def multichange(self, adr, values, convert=as_float, tolerance=0, n_retry=3): """set parameter(s) in mercury syntax - :param adr: as in see multiquery method + :param adr: as in multiquery method. SET: is added automatically :param values: [(name1, value1), (name2, value2) ...] :param convert: a converter function (converts given value to string and replied string to value) :param tolerance: tolerance for readback check :param n_retry: number of retries or 0 for no readback check :return: the values as tuple - Example: - adr='0:LOOP' + Example (kind=TEMP, slot=DB6.T1: + adr='DEV::TEMP:LOOP' values = [('P', 5), ('I', 2), ('D', 0)] - self.channel_type='TEMP,HTR' # adr starts with 0: take TEMP - self.slot='DB6.T1,DB1.H1' # and take first slot -> change command will be SET:DEV:DB6.T1:TEMP:LOOP:P:5:I:2:D:0 """ # TODO: if the need arises: allow convert and or tolerance to be a list @@ -178,7 +180,7 @@ class MercuryChannel(HasIO): def query(self, adr, convert=as_float): """query a single parameter - 'adr' and 'convert' areg + 'adr' and 'convert' as in multiquery """ adr, _, name = adr.rpartition(':') return self.multiquery(adr, [name], convert)[0] @@ -189,15 +191,15 @@ class MercuryChannel(HasIO): class TemperatureSensor(MercuryChannel, Readable): - channel_type = 'TEMP' + kind = 'TEMP' value = Parameter(unit='K') raw = Parameter('raw value', FloatRange(unit='Ohm')) def read_value(self): - return self.query('TEMP:SIG:TEMP') + return self.query('DEV::TEMP:SIG:TEMP') def read_raw(self): - return self.query('TEMP:SIG:RES') + return self.query('DEV::TEMP:SIG:RES') class HasInput(MercuryChannel): @@ -270,18 +272,18 @@ class Loop(HasConvergence, MercuryChannel, Drivable): self.start_state() def read_enable_pid_table(self): - return self.query('0:LOOP:PIDT', off_on) + return self.query(f'DEV::{self.kinds[0]}:LOOP:PIDT', off_on) def write_enable_pid_table(self, value): - return self.change('0:LOOP:PIDT', value, off_on) + return self.change(f'DEV::{self.kinds[0]}:LOOP:PIDT', value, off_on) def read_ctrlpars(self): # read all in one go, in order to reduce comm. traffic - pid = self.multiquery('0:LOOP', ('P', 'I', 'D')) + pid = self.multiquery(f'DEV::{self.kinds[0]}:LOOP', ('P', 'I', 'D')) return {k: float(v) for k, v in zip('pid', pid)} def write_ctrlpars(self, value): - pid = self.multichange('0:LOOP', [(k, value[k.lower()]) for k in 'PID']) + pid = self.multichange(f'DEV::{self.kinds[0]}:LOOP', [(k, value[k.lower()]) for k in 'PID']) return {k.lower(): v for k, v in zip('PID', pid)} @@ -293,7 +295,7 @@ class HeaterOutput(HasInput, MercuryChannel, Writable): resistivity. As the measured heater current is available, the resistivity will be adjusted automatically, when true_power is True. """ - channel_type = 'HTR' + kind = 'HTR' value = Parameter('heater output', FloatRange(unit='W'), readonly=False) status = Parameter(update_unchanged='never') target = Parameter('heater output', FloatRange(0, 100, unit='$'), readonly=False, update_unchanged='never') @@ -306,23 +308,23 @@ class HeaterOutput(HasInput, MercuryChannel, Writable): _volt_target = None def read_limit(self): - return self.query('HTR:VLIM') ** 2 / self.resistivity + return self.query('DEV::HTR:VLIM') ** 2 / self.resistivity def write_limit(self, value): - result = self.change('HTR:VLIM', math.sqrt(value * self.resistivity)) + result = self.change('DEV::HTR:VLIM', math.sqrt(value * self.resistivity)) return result ** 2 / self.resistivity def read_resistivity(self): if self.true_power: return self.resistivity - return max(10, self.query('HTR:RES')) + return max(10.0, self.query('DEV::HTR:RES')) def write_resistivity(self, value): - self.resistivity = self.change('HTR:RES', max(10, value)) + self.resistivity = self.change('DEV::HTR:RES', max(10.0, value)) if self._last_target is not None: if not self.true_power: self._volt_target = math.sqrt(self._last_target * self.resistivity) - self.change('HTR:SIG:VOLT', self._volt_target) + self.change('DEV::HTR:SIG:VOLT', self._volt_target) return self.resistivity def read_status(self): @@ -332,9 +334,9 @@ class HeaterOutput(HasInput, MercuryChannel, Writable): if self._last_target is None: # on init self.read_target() if not self.true_power: - volt = self.query('HTR:SIG:VOLT') - return volt ** 2 / max(10, self.resistivity) - volt, current = self.multiquery('HTR:SIG', ('VOLT', 'CURR')) + volt = self.query('DEV::HTR:SIG:VOLT') + return volt ** 2 / max(10.0, self.resistivity) + volt, current = self.multiquery('DEV::HTR:SIG', ('VOLT', 'CURR')) if volt > 0 and current > 0.0001 and self._last_target: res = volt / current tol = res * max(max(0.0003, abs(volt - self._volt_target)) / volt, 0.0001 / current, 0.0001) @@ -342,16 +344,16 @@ class HeaterOutput(HasInput, MercuryChannel, Writable): self.write_resistivity(round(res, 1)) if self.controlled_by == 0: self._volt_target = math.sqrt(self._last_target * self.resistivity) - self.change('HTR:SIG:VOLT', self._volt_target) + self.change('DEV::HTR:SIG:VOLT', self._volt_target) return volt * current def read_target(self): if self.controlled_by != 0 or self._last_target is not None: # read back only when not yet initialized return self.target - self._volt_target = self.query('HTR:SIG:VOLT') - self.resistivity = max(10, self.query('HTR:RES')) - self._last_target = self._volt_target ** 2 / max(10, self.resistivity) + self._volt_target = self.query('DEV::HTR:SIG:VOLT') + self.resistivity = max(10.0, self.query('DEV::HTR:RES')) + self._last_target = self._volt_target ** 2 / max(10.0, self.resistivity) return self._last_target def set_target(self, target): @@ -360,7 +362,7 @@ class HeaterOutput(HasInput, MercuryChannel, Writable): might be used by a software loop """ self._volt_target = math.sqrt(target * self.resistivity) - self.change('HTR:SIG:VOLT', self._volt_target) + self.change('DEV::HTR:SIG:VOLT', self._volt_target) self._last_target = target return target @@ -370,13 +372,14 @@ class HeaterOutput(HasInput, MercuryChannel, Writable): class TemperatureLoop(TemperatureSensor, Loop, Drivable): - channel_type = 'TEMP' + kind = 'TEMP' output_module = Attached(HasInput, mandatory=False) ramp = Parameter('ramp rate', FloatRange(0, unit='$/min'), readonly=False) enable_ramp = Parameter('enable ramp rate', BoolType(), readonly=False) setpoint = Parameter('working setpoint (differs from target when ramping)', FloatRange(0, unit='$')) tolerance = Parameter(default=0.1) _last_setpoint_change = None + # overridden in subclass frappy_psi.triton.TemperatureLoop ENABLE = 'TEMP:LOOP:ENAB' ENABLE_RAMP = 'TEMP:LOOP:RENA' RAMP_RATE = 'TEMP:LOOP:RSET' @@ -386,23 +389,23 @@ class TemperatureLoop(TemperatureSensor, Loop, Drivable): self.read_setpoint() def read_control_active(self): - active = self.query(self.ENABLE, off_on) + active = self.query(f'DEV::{self.ENABLE}', off_on) self.set_output(active) return active def write_control_active(self, value): self.set_output(value) - return self.change(self.ENABLE, value, off_on) + return self.change(f'DEV::{self.ENABLE}', value, off_on) @nopoll # polled by read_setpoint def read_target(self): if self.read_enable_ramp(): return self.target - self.setpoint = self.query('TEMP:LOOP:TSET') + self.setpoint = self.query('DEV::TEMP:LOOP:TSET') return self.setpoint def read_setpoint(self): - setpoint = self.query('TEMP:LOOP:TSET') + setpoint = self.query('DEV::TEMP:LOOP:TSET') if self.enable_ramp: if setpoint == self.setpoint: # update target when working setpoint does no longer change @@ -417,7 +420,7 @@ class TemperatureLoop(TemperatureSensor, Loop, Drivable): return setpoint def write_target(self, value): - target = self.change('TEMP:LOOP:TSET', value) + target = self.change('DEV::TEMP:LOOP:TSET', value) if self.enable_ramp: self._last_setpoint_change = None self.set_target(value) @@ -426,10 +429,10 @@ class TemperatureLoop(TemperatureSensor, Loop, Drivable): return self.target def read_enable_ramp(self): - return self.query(self.ENABLE_RAMP, off_on) + return self.query(f'DEV::{self.ENABLE_RAMP}', off_on) def write_enable_ramp(self, value): - return self.change(self.ENABLE_RAMP, value, off_on) + return self.change(f'DEV::{self.ENABLE_RAMP}', value, off_on) def set_output(self, active): if active: @@ -443,7 +446,7 @@ class TemperatureLoop(TemperatureSensor, Loop, Drivable): self.status = status def read_ramp(self): - result = self.query(self.RAMP_RATE) + result = self.query(f'DEV::{self.RAMP_RATE}') return min(9e99, result) def write_ramp(self, value): @@ -452,23 +455,23 @@ class TemperatureLoop(TemperatureSensor, Loop, Drivable): self.write_enable_ramp(0) return 0 if value >= 9e99: - self.change(self.RAMP_RATE, 'inf', as_string) + self.change(f'DEV::{self.RAMP_RATE}', 'inf', as_string) self.write_enable_ramp(0) return 9e99 self.write_enable_ramp(1) - return self.change(self.RAMP_RATE, max(1e-4, value)) + return self.change(f'DEV::{self.RAMP_RATE}', max(1e-4, value)) class PressureSensor(MercuryChannel, Readable): - channel_type = 'PRES' + kind = 'PRES' value = Parameter(unit='mbar') def read_value(self): - return self.query('PRES:SIG:PRES') + return self.query('DEV::PRES:SIG:PRES') class ValvePos(HasInput, MercuryChannel, Drivable): - channel_type = 'PRES,AUX' + kind = 'PRES,AUX' value = Parameter('value pos', FloatRange(unit='%'), readonly=False) target = Parameter('valve pos target', FloatRange(0, 100, unit='$'), readonly=False) @@ -476,7 +479,7 @@ class ValvePos(HasInput, MercuryChannel, Drivable): self.read_status() def read_value(self): - return self.query('AUX:SIG:PERC') + return self.query(f'DEV:{self.slots["AUX"]}:AUX:SIG:PERC') def read_status(self): self.read_value() @@ -485,36 +488,36 @@ class ValvePos(HasInput, MercuryChannel, Drivable): return 'BUSY', 'moving' def read_target(self): - return self.query('PRES:LOOP:FSET') + return self.query('DEV::PRES:LOOP:FSET') def write_target(self, value): self.write_controlled_by(SELF) - return self.change('PRES:LOOP:FSET', value) + return self.change('DEV::PRES:LOOP:FSET', value) class PressureLoop(HasInput, PressureSensor, Loop, Drivable): - channel_type = 'PRES' + kind = 'PRES' output_module = Attached(ValvePos, mandatory=False) tolerance = Parameter(default=0.1) def read_control_active(self): - active = self.query('PRES:LOOP:FAUT', off_on) + active = self.query('DEV::PRES:LOOP:FAUT', off_on) self.set_output(active) return active def write_control_active(self, value): self.set_output(value) - return self.change('PRES:LOOP:FAUT', value, off_on) + return self.change('DEV::PRES:LOOP:FAUT', value, off_on) def read_target(self): - return self.query('PRES:LOOP:PRST') + return self.query('DEV::PRES:LOOP:PRST') def set_target(self, target): """set the target without switching to manual might be used by a software loop """ - self.change('PRES:LOOP:PRST', target) + self.change('DEV::PRES:LOOP:PRST', target) super().set_target(target) def write_target(self, value): @@ -571,7 +574,7 @@ class HeLevel(MercuryChannel, Readable): The Mercury system does not support automatic switching between fast (when filling) and slow (when consuming). We have to handle this by software. """ - channel_type = 'LVL' + kind = 'LVL' value = Parameter(unit='%') sample_rate = Parameter('_', EnumType(slow=0, fast=1), readonly=False) hysteresis = Parameter('hysteresis for detection of increase', FloatRange(0, 100, unit='%'), @@ -598,14 +601,14 @@ class HeLevel(MercuryChannel, Readable): return sample_rate def read_sample_rate(self): - return self.check_rate(self.query('LVL:HEL:PULS:SLOW', fast_slow)) + return self.check_rate(self.query('DEV::LVL:HEL:PULS:SLOW', fast_slow)) def write_sample_rate(self, value): self.check_rate(value) - return self.change('LVL:HEL:PULS:SLOW', value, fast_slow) + return self.change('DEV::LVL:HEL:PULS:SLOW', value, fast_slow) def read_value(self): - level = self.query('LVL:SIG:HEL:LEV') + level = self.query('DEV::LVL:SIG:HEL:LEV') # handle automatic switching depending on increase now = time.time() if self._last_increase: # fast mode @@ -625,8 +628,8 @@ class HeLevel(MercuryChannel, Readable): class N2Level(MercuryChannel, Readable): - channel_type = 'LVL' + kind = 'LVL' value = Parameter(unit='%') def read_value(self): - return self.query('LVL:SIG:NIT:LEV') + return self.query('DEV::LVL:SIG:NIT:LEV')