diff --git a/frappy_psi/mercury.py b/frappy_psi/mercury.py index 38c8aa7..304f2a1 100644 --- a/frappy_psi/mercury.py +++ b/frappy_psi/mercury.py @@ -27,7 +27,7 @@ import time from frappy.core import Drivable, HasIO, Writable, \ Parameter, Property, Readable, StringIO, Attached, IDLE, nopoll -from frappy.datatypes import EnumType, FloatRange, StringType, StructOf, BoolType +from frappy.datatypes import EnumType, FloatRange, StringType, StructOf, BoolType, TupleOf from frappy.errors import HardwareError from frappy_psi.convergence import HasConvergence from frappy.lib.enum import Enum @@ -38,6 +38,7 @@ SELF = 0 def as_float(value): + """converts string (with unit) to float and float to string""" if isinstance(value, str): return float(VALUE_UNIT.match(value).group(1)) return f'{value:g}' @@ -61,7 +62,7 @@ fast_slow = Mapped(ON=0, OFF=1) # maps OIs slow=ON/fast=OFF to sample_rate.slow class IO(StringIO): - identification = [('*IDN?', r'IDN:OXFORD INSTRUMENTS:MERCURY*')] + identification = [('*IDN?', r'IDN:OXFORD INSTRUMENTS:*')] class MercuryChannel(HasIO): @@ -70,7 +71,6 @@ class MercuryChannel(HasIO): example: DB6.T1,DB1.H1 slot ids for sensor (and control output)''', StringType()) - channel_name = Parameter('mercury nick name', StringType(), default='', update_unchanged='never') channel_type = '' #: channel type(s) for sensor (and control) e.g. TEMP,HTR or PRES,AUX def _complete_adr(self, adr): @@ -83,7 +83,7 @@ class MercuryChannel(HasIO): return f'DEV:{slot}:{head}{sep}{tail}' return adr - def multiquery(self, adr, names=(), convert=as_float): + def multiquery(self, adr, names=(), convert=as_float, debug=None): """get parameter(s) in mercury syntax :param adr: the 'address part' of the SCPI command @@ -100,25 +100,36 @@ class MercuryChannel(HasIO): self.slot='DB5.P1,DB3.G1' # -> take second slot -> query command will be READ:DEV:DB3.G1:PRES:SIG:PERC """ + # TODO: if the need arises: allow convert to be a list adr = self._complete_adr(adr) cmd = f"READ:{adr}:{':'.join(names)}" - reply = self.communicate(cmd) - head = f'STAT:{adr}:' - try: - assert reply.startswith(head) - replyiter = iter(reply[len(head):].split(':')) - keys, result = zip(*zip(replyiter, replyiter)) - assert keys == tuple(names) - return tuple(convert(r) for r in result) - except (AssertionError, AttributeError, ValueError): - raise HardwareError(f'invalid reply {reply!r} to cmd {cmd!r}') from None + msg = '' + for _ in range(3): + if msg: + self.log.warning('%s', msg) + reply = self.communicate(cmd) + if debug is not None: + debug.append(reply) + head = f'STAT:{adr}:' + try: + assert reply.startswith(head) + replyiter = iter(reply[len(head):].split(':')) + keys, result = zip(*zip(replyiter, replyiter)) + assert keys == tuple(names) + return tuple(convert(r) for r in result) + except (AssertionError, AttributeError, ValueError): + time.sleep(0.1) # in case this was the answer of a previous command + msg = f'invalid reply {reply!r} to cmd {cmd!r}' + raise HardwareError(msg) from None - def multichange(self, adr, values, convert=as_float): + def multichange(self, adr, values, convert=as_float, tolerance=0, n_retry=3): """set parameter(s) in mercury syntax :param adr: as in see multiquery method :param values: [(name1, value1), (name2, value2) ...] :param convert: a converter function (converts given value to string and replied string to value) + :param tolerance: tolerance for readback check + :param n_retry: number of retries or 0 for no readback check :return: the values as tuple Example: @@ -128,21 +139,41 @@ class MercuryChannel(HasIO): self.slot='DB6.T1,DB1.H1' # and take first slot -> change command will be SET:DEV:DB6.T1:TEMP:LOOP:P:5:I:2:D:0 """ + # TODO: if the need arises: allow convert and or tolerance to be a list adr = self._complete_adr(adr) params = [f'{k}:{convert(v)}' for k, v in values] cmd = f"SET:{adr}:{':'.join(params)}" - reply = self.communicate(cmd) - head = f'STAT:SET:{adr}:' - - try: - assert reply.startswith(head) - replyiter = iter(reply[len(head):].split(':')) - keys, result, valid = zip(*zip(replyiter, replyiter, replyiter)) - assert keys == tuple(k for k, _ in values) - assert any(v == 'VALID' for v in valid) - return tuple(convert(r) for r in result) - except (AssertionError, AttributeError, ValueError) as e: - raise HardwareError(f'invalid reply {reply!r} to cmd {cmd!r}') from e + for _ in range(max(1, n_retry)): # try n_retry times or until readback result matches + reply = self.communicate(cmd) + head = f'STAT:SET:{adr}:' + try: + assert reply.startswith(head) + replyiter = iter(reply[len(head):].split(':')) + # reshuffle reply=(k1, r1, v1, k2, r2, v1) --> keys = (k1, k2), result = (r1, r2), valid = (v1, v2) + keys, result, valid = zip(*zip(replyiter, replyiter, replyiter)) + assert keys == tuple(k for k, _ in values) + assert any(v == 'VALID' for v in valid) + result = tuple(convert(r) for r in result) + except (AssertionError, AttributeError, ValueError) as e: + time.sleep(0.1) # in case of missed replies this might help to skip garbage + raise HardwareError(f'invalid reply {reply!r} to cmd {cmd!r}') from e + if n_retry == 0: + return [v[1] for v in values] # no readback check + keys = [v[0] for v in values] + debug = [] + readback = self.multiquery(adr, keys, convert, debug) + for k, r, b in zip(keys, result, readback): + if convert is as_float: + tol = max(abs(r) * 1e-3, abs(b) * 1e-3, tolerance) + if abs(r - b) > tol: + break + elif r != b: + break + else: + return readback + self.log.warning('sent: %s', cmd) + self.log.warning('got: %s', debug[0]) + return readback def query(self, adr, convert=as_float): """query a single parameter @@ -152,14 +183,9 @@ class MercuryChannel(HasIO): adr, _, name = adr.rpartition(':') return self.multiquery(adr, [name], convert)[0] - def change(self, adr, value, convert=as_float): + def change(self, adr, value, convert=as_float, tolerance=0, n_retry=3): adr, _, name = adr.rpartition(':') - return self.multichange(adr, [(name, value)], convert)[0] - - def read_channel_name(self): - if self.channel_name: - return self.channel_name # channel name will not change - return self.query('0:NICK', as_string) + return self.multichange(adr, [(name, value)], convert, tolerance, n_retry)[0] class TemperatureSensor(MercuryChannel, Readable): @@ -176,26 +202,29 @@ class TemperatureSensor(MercuryChannel, Readable): class HasInput(MercuryChannel): controlled_by = Parameter('source of target value', EnumType(members={'self': SELF}), default=0) - target = Parameter(readonly=False) - input_modules = () + # do not know why this? target = Parameter(readonly=False) + input_callbacks = () - def add_input(self, modobj): - if not self.input_modules: - self.input_modules = [] - self.input_modules.append(modobj) + def register_input(self, name, control_off): + """register input + + :param name: the name of the module (for controlled_by enum) + :param control_off: a method on the input module to switch off control + """ + if not self.input_callbacks: + self.input_callbacks = [] + self.input_callbacks.append(control_off) prev_enum = self.parameters['controlled_by'].datatype._enum # add enum member, using autoincrement feature of Enum - self.parameters['controlled_by'].datatype = EnumType(Enum(prev_enum, **{modobj.name: None})) + self.parameters['controlled_by'].datatype = EnumType(Enum(prev_enum, **{name: None})) def write_controlled_by(self, value): if self.controlled_by == value: return value self.controlled_by = value if value == SELF: - self.log.warning('switch to manual mode') - for input_module in self.input_modules: - if input_module.control_active: - input_module.write_control_active(False) + for control_off in self.input_callbacks: + control_off() return value @@ -213,12 +242,17 @@ class Loop(HasConvergence, MercuryChannel, Drivable): def initModule(self): super().initModule() if self.output_module: - self.output_module.add_input(self) + self.output_module.register_input(self.name, self.control_off) + + def control_off(self): + if self.control_active: + self.log.warning('switch to manual mode') + self.write_control_active(False) def set_output(self, active): if active: if self.output_module and self.output_module.controlled_by != self.name: - self.output_module.controlled_by = self.name + self.output_module.write_controlled_by(self.name) else: if self.output_module and self.output_module.controlled_by != SELF: self.output_module.write_controlled_by(SELF) @@ -231,7 +265,7 @@ class Loop(HasConvergence, MercuryChannel, Drivable): self.set_output(True) else: self.log.warning('switch loop control on') - self.write_control_active(True) + self.write_control_active(True) self.target = target self.start_state() @@ -312,24 +346,23 @@ class HeaterOutput(HasInput, MercuryChannel, Writable): return volt * current def read_target(self): - if self.controlled_by != 0 and self.target: - return 0 - if self._last_target is not None: + if self.controlled_by != 0 or self._last_target is not None: + # read back only when not yet initialized return self.target self._volt_target = self.query('HTR:SIG:VOLT') self.resistivity = max(10, self.query('HTR:RES')) self._last_target = self._volt_target ** 2 / max(10, self.resistivity) return self._last_target - def set_target(self, value): + def set_target(self, target): """set the target without switching to manual might be used by a software loop """ - self._volt_target = math.sqrt(value * self.resistivity) + self._volt_target = math.sqrt(target * self.resistivity) self.change('HTR:SIG:VOLT', self._volt_target) - self._last_target = value - return value + self._last_target = target + return target def write_target(self, value): self.write_controlled_by(SELF) @@ -337,26 +370,29 @@ class HeaterOutput(HasInput, MercuryChannel, Writable): class TemperatureLoop(TemperatureSensor, Loop, Drivable): - channel_type = 'TEMP,HTR' - output_module = Attached(HeaterOutput, mandatory=False) - ramp = Parameter('ramp rate', FloatRange(0, unit='K/min'), readonly=False) + channel_type = 'TEMP' + output_module = Attached(HasInput, mandatory=False) + ramp = Parameter('ramp rate', FloatRange(0, unit='$/min'), readonly=False) enable_ramp = Parameter('enable ramp rate', BoolType(), readonly=False) setpoint = Parameter('working setpoint (differs from target when ramping)', FloatRange(0, unit='$')) - auto_flow = Parameter('enable auto flow', BoolType(), readonly=False) + tolerance = Parameter(default=0.1) _last_setpoint_change = None + ENABLE = 'TEMP:LOOP:ENAB' + ENABLE_RAMP = 'TEMP:LOOP:RENA' + RAMP_RATE = 'TEMP:LOOP:RSET' def doPoll(self): super().doPoll() self.read_setpoint() def read_control_active(self): - active = self.query('TEMP:LOOP:ENAB', off_on) + active = self.query(self.ENABLE, off_on) self.set_output(active) return active def write_control_active(self, value): self.set_output(value) - return self.change('TEMP:LOOP:ENAB', value, off_on) + return self.change(self.ENABLE, value, off_on) @nopoll # polled by read_setpoint def read_target(self): @@ -390,19 +426,24 @@ class TemperatureLoop(TemperatureSensor, Loop, Drivable): return self.target def read_enable_ramp(self): - return self.query('TEMP:LOOP:RENA', off_on) + return self.query(self.ENABLE_RAMP, off_on) def write_enable_ramp(self, value): - return self.change('TEMP:LOOP:RENA', value, off_on) + return self.change(self.ENABLE_RAMP, value, off_on) - def read_auto_flow(self): - return self.query('TEMP:LOOP:FAUT', off_on) - - def write_auto_flow(self, value): - return self.change('TEMP:LOOP:FAUT', value, off_on) + def set_output(self, active): + if active: + if self.output_module and self.output_module.controlled_by != self.name: + self.output_module.write_controlled_by(self.name) + else: + if self.output_module and self.output_module.controlled_by != SELF: + self.output_module.write_controlled_by(SELF) + status = IDLE, 'control inactive' + if self.status != status: + self.status = status def read_ramp(self): - result = self.query('TEMP:LOOP:RSET') + result = self.query(self.RAMP_RATE) return min(9e99, result) def write_ramp(self, value): @@ -411,11 +452,11 @@ class TemperatureLoop(TemperatureSensor, Loop, Drivable): self.write_enable_ramp(0) return 0 if value >= 9e99: - self.change('TEMP:LOOP:RSET', 'inf', as_string) + self.change(self.RAMP_RATE, 'inf', as_string) self.write_enable_ramp(0) return 9e99 self.write_enable_ramp(1) - return self.change('TEMP:LOOP:RSET', max(1e-4, value)) + return self.change(self.RAMP_RATE, max(1e-4, value)) class PressureSensor(MercuryChannel, Readable): @@ -451,9 +492,10 @@ class ValvePos(HasInput, MercuryChannel, Drivable): return self.change('PRES:LOOP:FSET', value) -class PressureLoop(PressureSensor, Loop, Drivable): - channel_type = 'PRES,AUX' +class PressureLoop(HasInput, PressureSensor, Loop, Drivable): + channel_type = 'PRES' output_module = Attached(ValvePos, mandatory=False) + tolerance = Parameter(default=0.1) def read_control_active(self): active = self.query('PRES:LOOP:FAUT', off_on) @@ -467,10 +509,60 @@ class PressureLoop(PressureSensor, Loop, Drivable): def read_target(self): return self.query('PRES:LOOP:PRST') + def set_target(self, target): + """set the target without switching to manual + + might be used by a software loop + """ + self.change('PRES:LOOP:PRST', target) + super().set_target(target) + def write_target(self, value): - target = self.change('PRES:LOOP:PRST', value) - self.set_target(target) - return self.target + self.write_controlled_by(SELF) + self.set_target(value) + return value + + +class HasAutoFlow: + needle_valve = Attached(PressureLoop, mandatory=False) + auto_flow = Parameter('enable auto flow', BoolType(), readonly=False, default=0) + flowpars = Parameter('Tdif(min, max), FlowSet(min, max)', + TupleOf(TupleOf(FloatRange(unit='K'), FloatRange(unit='K')), + TupleOf(FloatRange(unit='mbar'), FloatRange(unit='mbar'))), + readonly=False, default=((1, 5), (4, 20))) + + def read_value(self): + value = super().read_value() + if self.auto_flow: + (dmin, dmax), (fmin, fmax) = self.flowpars + flowset = min(dmax - dmin, max(0, value - self.target - dmin)) / (dmax - dmin) * (fmax - fmin) + fmin + self.needle_valve.set_target(flowset) + return value + + def initModule(self): + super().initModule() + if self.needle_valve: + self.needle_valve.register_input(self.name, self.auto_flow_off) + + def write_auto_flow(self, value): + if value: + if self.needle_valve and self.needle_valve.controlled_by != self.name: + self.needle_valve.write_controlled_by(self.name) + else: + if self.needle_valve and self.needle_valve.controlled_by != SELF: + self.needle_valve.write_controlled_by(SELF) + _, (fmin, _) = self.flowpars + self.needle_valve.write_target(fmin) + return value + + def auto_flow_off(self): + if self.auto_flow: + self.log.warning('switch auto flow off') + self.write_auto_flow(False) + + +class TemperatureAutoFlow(HasAutoFlow, TemperatureLoop): + pass class HeLevel(MercuryChannel, Readable): @@ -480,6 +572,7 @@ class HeLevel(MercuryChannel, Readable): (when filling) and slow (when consuming). We have to handle this by software. """ channel_type = 'LVL' + value = Parameter(unit='%') sample_rate = Parameter('_', EnumType(slow=0, fast=1), readonly=False) hysteresis = Parameter('hysteresis for detection of increase', FloatRange(0, 100, unit='%'), default=5, readonly=False) @@ -533,9 +626,7 @@ class HeLevel(MercuryChannel, Readable): class N2Level(MercuryChannel, Readable): channel_type = 'LVL' + value = Parameter(unit='%') def read_value(self): return self.query('LVL:SIG:NIT:LEV') - - -# TODO: magnet power supply