diff --git a/secop_psi/mercury.py b/secop_psi/mercury.py index 9347850..991d0fb 100644 --- a/secop_psi/mercury.py +++ b/secop_psi/mercury.py @@ -25,80 +25,6 @@ import math import re import time -<<<<<<< HEAD -from secop.core import Drivable, HasIodev, \ - Parameter, Property, Readable, StringIO -from secop.datatypes import EnumType, FloatRange, StringType, StructOf -from secop.errors import HardwareError -from secop.lib.statemachine import StateMachine - - -class MercuryIO(StringIO): - identification = [('*IDN?', r'IDN:OXFORD INSTRUMENTS:MERCURY*')] - - -VALUE_UNIT = re.compile(r'(.*\d)([A-Za-z]*)$') - - -def make_map(**kwds): - """create a dict converting internal names to values and vice versa""" - kwds.update({v: k for k, v in kwds.items()}) - return kwds - - -MODE_MAP = make_map(OFF=0, ON=1) -SAMPLE_RATE = make_map(OFF=1, ON=0) # invert the codes used by OI - - -class MercuryChannel(HasIodev): - slots = Property('''slot uids - - example: DB6.T1,DB1.H1 - slot ids for sensor (and control output)''', - StringType()) - channel_name = Parameter('mercury nick name', StringType()) - channel_type = '' #: channel type(s) for sensor (and control) - - def query(self, adr, value=None): - """get or set a parameter in mercury syntax - - :param adr: for example "TEMP:SIG:TEMP" - :param value: if given and not None, a write command is executed - :return: the value - - remark: the DEV: is added automatically, when adr starts with the channel type - in addition, when addr starts with '0:' or '1:', the channel type is added - """ - for i, (channel_type, slot) in enumerate(zip(self.channel_type.split(','), self.slots.split(','))): - if adr.startswith('%d:' % i): - adr = 'DEV:%s:%s:%s' % (slot, channel_type, adr[2:]) # assume i <= 9 - break - if adr.startswith(channel_type + ':'): - adr = 'DEV:%s:%s' % (slot, adr) - break - if value is not None: - try: - value = '%g' % value # this works for float, integers and enums - except ValueError: - value = str(value) # this alone would not work for enums, and not be nice for floats - cmd = 'SET:%s:%s' % (adr, value) - reply = self._iodev.communicate(cmd) - if reply != 'STAT:%s:VALID' % cmd: - raise HardwareError('bad response %r to %r' % (reply, cmd)) - # chain a read command anyway - cmd = 'READ:%s' % adr - reply = self._iodev.communicate(cmd) - head, _, result = reply.rpartition(':') - if head != 'STAT:%s' % adr: - raise HardwareError('bad response %r to %r' % (reply, cmd)) - match = VALUE_UNIT.match(result) - if match: # result can be interpreted as a float with optional units - return float(match.group(1)) - return result - - def read_channel_name(self): - return self.query('') -======= from secop.core import Drivable, HasIO, Writable, \ Parameter, Property, Readable, StringIO, Attached, Done, IDLE, nopoll from secop.datatypes import EnumType, FloatRange, StringType, StructOf, BoolType @@ -234,17 +160,12 @@ class MercuryChannel(HasIO): if self.channel_name: return Done # channel name will not change return self.query('0:NICK', as_string) ->>>>>>> d3379d5... support for OI mercury series class TemperatureSensor(MercuryChannel, Readable): channel_type = 'TEMP' value = Parameter(unit='K') -<<<<<<< HEAD - raw = Parameter('raw value', FloatRange()) -======= raw = Parameter('raw value', FloatRange(unit='Ohm')) ->>>>>>> d3379d5... support for OI mercury series def read_value(self): return self.query('TEMP:SIG:TEMP') @@ -253,242 +174,6 @@ class TemperatureSensor(MercuryChannel, Readable): return self.query('TEMP:SIG:RES') -<<<<<<< HEAD -class HasProgressCheck: - """mixin for progress checks - - Implements progress checks based on tolerance, settling time and timeout. - The algorithm does its best to support changes of these parameters on the - fly. However, the full history is not considered, which means for example - that the spent time inside tolerance stored already is not altered when - changing tolerance. - """ - tolerance = Parameter('absolute tolerance', FloatRange(0), readonly=False, default=0) - min_slope = Parameter('minimal abs(slope)', FloatRange(0), readonly=False, default=0) - settling_time = Parameter( - '''settling time - - total amount of time the value has to be within tolerance before switching to idle. - ''', FloatRange(0), readonly=False, default=0) - timeout = Parameter( - '''timeout - - timeout = 0: disabled, else: - A timeout event happens, when the difference (target - value) is not improved by - at least min_slope * timeout over any interval (t, t + timeout). - As soon as the value is the first time within tolerance, the criterium is changed: - then the timeout event happens after this time + settling_time + timeout. - ''', FloatRange(0, unit='sec'), readonly=False, default=3600) - status = Parameter('status determined from progress check') - value = Parameter() - target = Parameter() - - def earlyInit(self): - super().earlyInit() - self.__state = StateMachine() - - def prepare_state(self, state): - tol = self.tolerance - if not tol: - tol = 0.01 * max(abs(self.target), abs(self.value)) - dif = abs(self.target - self.value) - return dif, tol, state.now, state.delta(0) - - def state_approaching(self, state): - if self.init(): - self.status = 'BUSY', 'approaching' - dif, tol, now, delta = self.prepare_state(state) - if dif < tol: - state.timeout_base = now - state.next_step(self.state_inside) - return - if not self.timeout: - return - if state.init(): - state.timeout_base = now - state.dif_crit = dif - return - min_slope = getattr(self, 'ramp', 0) or getattr('min_slope', 0) - state.dif_crit -= min_slope * delta - if dif < state.dif_crit: - state.timeout_base = now - elif now > state.timeout_base: - self.status = 'WARNING', 'convergence timeout' - state.next_action(self.state_idle) - - def state_inside(self, state): - if state.init(): - self.status = 'BUSY', 'inside tolerance' - dif, tol, now, delta = self.prepare_state(state) - if dif > tol: - state.next_action(self.state_outside) - state.spent_inside += delta - if state.spent_inside > self.settling_time: - self.status = 'IDLE', 'reached target' - state.next_action(self.state_idle) - - def state_outside(self, state, now, dif, tol, delta): - if state.init(): - self.status = 'BUSY', 'outside tolerance' - dif, tol, now, delta = self.prepare_state(state) - if dif < tol: - state.next_action(self.state_inside) - elif now > self.timeout_base + self.settling_time + self.timeout: - self.status = 'WARNING', 'settling timeout' - state.next_action(self.state_idle) - - def start_state(self): - """must be called from write_target, whenever the target changes""" - self.__state.start(self.state_approach) - - def poll(self): - super().poll() - self.__state.poll() - - def read_status(self): - if self.status[0] == 'IDLE': - # do not change when idle already - return self.status - return self.check_progress(self.value, self.target) - - def write_target(self, value): - raise NotImplementedError() - - -class Loop(HasProgressCheck, MercuryChannel): - """common base class for loops""" - mode = Parameter('control mode', EnumType(manual=0, pid=1), readonly=False) - ctrlpars = Parameter( - 'pid (proportional nad, integral time, differential time', - StructOf(p=FloatRange(0, unit='$'), i=FloatRange(0, unit='min'), d=FloatRange(0, unit='min')), - readonly=False, poll=True - ) - """pid = Parameter('control parameters', StructOf(p=FloatRange(), i=FloatRange(), d=FloatRange()),readonly=False)""" - pid_table_mode = Parameter('', EnumType(off=0, on=1), readonly=False) - - def read_ctrlpars(self): - return self.query('0:LOOP:P') - - def read_integ(self): - return self.query('0:LOOP:I') - - def read_deriv(self): - return self.query('0:LOOP:D') - - def write_prop(self, value): - return self.query('0:LOOP:P', value) - - def write_integ(self, value): - return self.query('0:LOOP:I', value) - - def write_deriv(self, value): - return self.query('0:LOOP:D', value) - - def read_enable_pid_table(self): - return self.query('0:LOOP:PIDT').lower() - - def write_enable_pid_table(self, value): - return self.query('0:LOOP:PIDT', value.upper()).lower() - - def read_mode(self): - return MODE_MAP[self.query('0:LOOP:ENAB')] - - def write_mode(self, value): - if value == 'manual': - self.status = 'IDLE', 'manual mode' - elif self.status[0] == 'IDLE': - self.status = 'IDLE', '' - return MODE_MAP[self.query('0:LOOP:ENAB', value)] - - def write_target(self, value): - raise NotImplementedError - - # def read_pid(self): - # # read all in one go, in order to reduce comm. traffic - # cmd = 'READ:DEV:%s:TEMP:LOOP:P:I:D' % self.slots.split(',')[0] - # reply = self._iodev.communicate(cmd) - # result = list(reply.split(':')) - # pid = result[6::2] - # del result[6::2] - # if ':'.join(result) != cmd: - # raise HardwareError('bad response %r to %r' % (reply, cmd)) - # return dict(zip('pid', pid)) - # - # def write_pid(self, value): - # # for simplicity use single writes - # return {k: self.query('LOOP:%s' % k.upper(), value[k]) for k in 'pid'} - - -class TemperatureLoop(Loop, TemperatureSensor, Drivable): - channel_type = 'TEMP,HTR' - heater_limit = Parameter('heater output limit', FloatRange(0, 100, unit='W'), readonly=False) - heater_resistivity = Parameter('heater resistivity', FloatRange(10, 1000, unit='Ohm'), readonly=False) - ramp = Parameter('ramp rate', FloatRange(0, unit='K/min'), readonly=False) - enable_ramp = Parameter('enable ramp rate', EnumType(off=0, on=1), readonly=False) - auto_flow = Parameter('enable auto flow', EnumType(off=0, on=1), readonly=False) - heater_output = Parameter('heater output', FloatRange(0, 100, unit='W'), readonly=False) - - def read_heater_limit(self): - return self.query('HTR:VLIM') ** 2 / self.heater_resistivity - - def write_heater_limit(self, value): - result = self.query('HTR:VLIM', math.sqrt(value * self.heater_resistivity)) - return result ** 2 / self.heater_resistivity - - def read_heater_resistivity(self): - value = self.query('HTR:RES') - if value: - return value - return self.heater_resistivity - - def write_heater_resistivity(self, value): - return self.query('HTR:RES', value) - - def read_enable_ramp(self): - return self.query('TEMP:LOOP:RENA').lower() - - def write_enable_ramp(self, value): - return self.query('TEMP:LOOP:RENA', EnumType(off=0, on=1)(value).name).lower() - - def read_auto_flow(self): - return self.query('TEMP:LOOP:FAUT').lower() - - def write_auto_flow(self, value): - return self.query('TEMP:LOOP:FAUT', EnumType(off=0, on=1)(value).name).lower() - - def read_ramp(self): - return self.query('TEMP:LOOP:RSET') - - def write_ramp(self, value): - if not value: - self.write_enable_ramp(0) - return 0 - if value: - self.write_enable_ramp(1) - return self.query('TEMP:LOOP:RSET', value) - - def read_target(self): - # TODO: check about working setpoint - return self.query('TEMP:LOOP:TSET') - - def write_target(self, value): - if self.mode != 'pid': - self.log.warning('switch to pid loop mode') - self.write_mode('pid') - self.reset_progress(self.value, value) - return self.query('TEMP:LOOP:TSET', value) - - def read_heater_output(self): - # TODO: check that this really works, else next line - return self.query('HTR:SIG:POWR') - # return self.query('HTR:SIG:VOLT') ** 2 / self.heater_resistivity - - def write_heater_output(self, value): - if self.mode != 'manual': - self.log.warning('switch to manual heater mode') - self.write_mode('manual') - return self.query('HTR:SIG:VOLT', math.sqrt(value * self.heater_resistivity)) -======= class HasInput(MercuryChannel): controlled_by = Parameter('source of target value', EnumType(members={'self': SELF}), default=0) target = Parameter(readonly=False) @@ -733,7 +418,6 @@ class TemperatureLoop(TemperatureSensor, Loop, Drivable): return 9e99 self.write_enable_ramp(1) return self.change('TEMP:LOOP:RSET', max(1e-4, value)) ->>>>>>> d3379d5... support for OI mercury series class PressureSensor(MercuryChannel, Readable): @@ -744,34 +428,6 @@ class PressureSensor(MercuryChannel, Readable): return self.query('PRES:SIG:PRES') -<<<<<<< HEAD -class PressureLoop(Loop, PressureSensor, Drivable): - channel_type = 'PRES,AUX' - - valve_pos = Parameter('valve position', FloatRange(0, 100, unit='%'), readonly=False) - - def read_valve_pos(self): - return self.query('AUX:SIG:PERC') - - def write_valve_pos(self, value): - if self.mode != 'manual': - self.log.warning('switch to manual valve mode') - self.write_mode('manual') - return self.query('AUX:SIG:PERC', value) - - def write_target(self, value): - self.reset_progress(self.value, value) - return self.query('PRES:LOOP:PRST', value) - - -class HeLevel(MercuryChannel, Readable): - channel_type = 'LVL' - sample_rate = Parameter('_', EnumType(slow=0, fast=1), readonly=False, poll=True) - hysteresis = Parameter('hysteresis for detection of increase', FloatRange(0, 100, unit='%'), readonly=False) - fast_timeout = Parameter('timeout for switching to slow', FloatRange(0, unit='sec'), readonly=False) - _min_level = 200 - _max_level = -100 -======= class ValvePos(HasInput, MercuryChannel, Drivable): channel_type = 'PRES,AUX' value = Parameter('value pos', FloatRange(unit='%'), readonly=False) @@ -833,7 +489,6 @@ class HeLevel(MercuryChannel, Readable): default=300, readonly=False) _min_level = 999 _max_level = -999 ->>>>>>> d3379d5... support for OI mercury series _last_increase = None # None when in slow mode, last increase time in fast mode def check_rate(self, sample_rate): @@ -845,20 +500,6 @@ class HeLevel(MercuryChannel, Readable): if sample_rate != 0: # fast if not self._last_increase: self._last_increase = time.time() -<<<<<<< HEAD - self._max_level = -100 - elif self._last_increase: - self._last_increase = None - self._min_level = 200 - return sample_rate - - def read_sample_rate(self): - return self.check_rate(SAMPLE_RATE[self.query('LVL:HEL:PULS:SLOW')]) - - def write_sample_rate(self, value): - self.check_rate(value) - return SAMPLE_RATE[self.query('LVL:HEL:PULS:SLOW', SAMPLE_RATE[value])] -======= self._max_level = -999 elif self._last_increase: self._last_increase = None @@ -871,7 +512,6 @@ class HeLevel(MercuryChannel, Readable): def write_sample_rate(self, value): self.check_rate(value) return self.change('LVL:HEL:PULS:SLOW', value, fast_slow) ->>>>>>> d3379d5... support for OI mercury series def read_value(self): level = self.query('LVL:SIG:HEL:LEV') @@ -883,19 +523,11 @@ class HeLevel(MercuryChannel, Readable): self._max_level = level elif now > self._last_increase + self.fast_timeout: # no increase since fast timeout -> slow -<<<<<<< HEAD - self.write_sample_rate('slow') - else: - if level > self._min_level + self.hysteresis: - # substantial increase -> fast - self.write_sample_rate('fast') -======= self.write_sample_rate(self.sample_rate.slow) else: if level > self._min_level + self.hysteresis: # substantial increase -> fast self.write_sample_rate(self.sample_rate.fast) ->>>>>>> d3379d5... support for OI mercury series else: self._min_level = min(self._min_level, level) return level @@ -908,9 +540,4 @@ class N2Level(MercuryChannel, Readable): return self.query('LVL:SIG:NIT:LEV') -<<<<<<< HEAD -class MagnetOutput(MercuryChannel, Drivable): - pass -======= # TODO: magnet power supply ->>>>>>> d3379d5... support for OI mercury series