diff --git a/frappy_psi/ccu4.py b/frappy_psi/ccu4.py index 202feec..19bccf0 100644 --- a/frappy_psi/ccu4.py +++ b/frappy_psi/ccu4.py @@ -21,10 +21,14 @@ """drivers for CCU4, the cryostat control unit at SINQ""" import time +import math # the most common Frappy classes can be imported from frappy.core -from frappy.core import EnumType, FloatRange, TupleOf, \ - HasIO, Parameter, Command, Readable, StringIO, StatusType, \ - BUSY, IDLE, ERROR, DISABLED +from frappy.core import HasIO, Parameter, Command, Readable, Writable, Drivable, \ + Property, StringIO, BUSY, IDLE, WARN, ERROR, DISABLED, Attached +from frappy.datatypes import BoolType, EnumType, FloatRange, StructOf, \ + StatusType, IntRange, StringType, TupleOf +from frappy.dynamic import Pinata +from frappy.errors import CommunicationFailedError from frappy.states import HasStates, status_code, Retry @@ -36,16 +40,35 @@ class CCU4IO(StringIO): identification = [('cid', r'CCU4.*')] -# inheriting HasIO allows us to use the communicate method for talking with the hardware -# Readable as a base class defines the value and status parameters -class HeLevel(HasIO, Readable): - """He Level channel of CCU4""" - - # define the communication class to create the IO module +class CCU4Base(HasIO): ioClass = CCU4IO - # define or alter the parameters - # as Readable.value exists already, we give only the modified property 'unit' + def command(self, *args, **kwds): + """send a command and get the response + + :param args: the name of the parameters to query + :param kwds: = for changing a parameter + :returns: the (new) values of the parameters + """ + cmds = [f'{k}={v:g}' for k, v in kwds.items()] + list(args) + reply = self.io.communicate(' '.join(cmds)).split() + names = list(args) + list(kwds) + if len(reply) != len(names): + raise CommunicationFailedError('number of reply items does not match') + result = [] + for given, item in zip(names, reply): + name, txtvalue = item.split('=') + if given != name: + raise CommunicationFailedError('result keys do not match given keys') + result.append(float(txtvalue)) + if len(result) == 1: + return result[0] + return result + + +class HeLevel(CCU4Base, Readable): + """He Level channel of CCU4""" + value = Parameter(unit='%') empty_length = Parameter('warm length when empty', FloatRange(0, 2000, unit='mm'), readonly=False) @@ -65,143 +88,100 @@ class HeLevel(HasIO, Readable): 5: (DISABLED, 'disabled'), } - def query(self, cmd): - """send a query and get the response - - :param cmd: the name of the parameter to query or '= self.value: - self.value -= delta / (3600 * self.fill_hours_range[1]) - elif self.raw < self.value: - self.value -= delta / (3600 * self.fill_hours_range[0]) - else: - self.value = self.raw - if self.value < self.fill_level: - self.query('hcd=1 hf=1') - state.fillstart = state.now - return self.precooling - self.query('hcd=1 hf=1') - return Retry - @status_code(BUSY) - def precooling(self, state): - delta = state.delta(1) - if self.raw > self.value: - self.value += delta / (60 * self.fill_minutes_range[0]) - elif self.raw < self.value: - self.value -= delta / (60 * self.fill_minutes_range[0]) - else: - self.value = self.raw - if self.value > self.full_level: - self.query('hcd=0 hf=0') - return self.watching - self.query('hcd=1 hf=1') - if state.now > state.fillstart + self.fill_delay * 60: - return self.filling - return Retry - - @status_code(BUSY) - def filling(self, state): - delta = state.delta(1) - if self.raw > self.value: - self.value += delta / (60 * self.fill_minutes_range[0]) - elif self.raw < self.value: - self.value += delta / (60 * self.fill_minutes_range[1]) - else: - self.value = self.raw - if self.value > self.full_level: - self.query('hcd=0 hf=0') - return self.watching - self.query('hcd=1 hf=1') - return Retry - - @Command() - def fill(self): - self.start_machine(self.precooling, fillstart=time.time()) - self.query('hcd=1 hf=1') - - @Command() - def stop(self): - pass +class HeFillValve(Valve): + _open_command = {'hcd': 1, 'hf': 1} + _close_command = {'hcd': 0, 'hf': 0} + _query_state = 'hv' +class N2FillValve(Valve): + _open_command = {'nc': 1} + _close_command = {'nc': 0} + _query_state = 'nv' + + +class AuxValve(Valve): + channel = Property('valve number', IntRange(1, 12)) + + def initModule(self): + self._open_command = {f'vc{self.channel}': 1} + self._close_command = {f'vc{self.channel}': 0} + self._query_state = f'v{self.channel}' + + +class N2TempSensor(Readable): + value = Parameter('LN2 T sensor', FloatRange(unit='K'), default=0) + + +class N2Level(CCU4Base, Pinata, Readable): + valve = Attached(Writable, mandatory=False) + lower = Attached(Readable, mandatory=False) + upper = Attached(Readable, mandatory=False) + + value = Parameter('vessel state', EnumType(empty=0, ok=1, full=2)) + status = Parameter(datatype=StatusType(Readable, 'BUSY')) + mode = Parameter('auto mode', EnumType(disabled=0, manual=1, auto=2), readonly=False) + + threshold = Parameter('threshold triggering start/stop filling', + FloatRange(unit='K'), readonly=False) + cool_delay = Parameter('max. minutes needed to cool the lower sensor', + FloatRange(unit='s'), readonly=False) + fill_timeout = Parameter('max. minutes needed to fill', + FloatRange(unit='s'), readonly=False) + names = Property('''names of attached modules + + configure members as empty strings to disable the creation + ''', + StructOf(valve=StringType(), lower=StringType(), upper=StringType()), + default={'valve': '$_valve', 'lower': '$_lower', 'upper': '$_upper'}) -class N2Sensor(HasIO, Readable): # conversion of the code from the CCU4 parameter 'ns' STATUS_MAP = { 0: (IDLE, 'sensor ok'), @@ -209,6 +189,283 @@ class N2Sensor(HasIO, Readable): 2: (ERROR, 'short circuit'), 3: (ERROR, 'upside down'), 4: (ERROR, 'sensor warm'), - 5: (ERROR, 'empty'), + 5: (WARN, 'empty'), } + def scanModules(self): + for modname, name in self.names.items(): + if name: + sensor_name = name.replace('$', self.name) + self.setProperty(modname, sensor_name) + yield sensor_name, { + 'cls': N2FillValve if modname == 'valve' else N2TempSensor, + 'description': f'LN2 {modname} T sensor'} + + def initialReads(self): + self.command(nav=1) # tell CCU4 to activate LN2 sensor readings + super().initialReads() + + def read_status(self): + auto, nstate = self.command('na', 'ns') + if not self.valve or not auto: + if self.mode == 'auto': + # no valve assigned + self.mode = 'manual' + if self.mode == 'disabled': + return DISABLED, '' + status = self.STATUS_MAP[nstate] + if status[0] // 100 != IDLE // 100: + return status + if self.mode == 'manual': + return IDLE, '' + vstatus = self.valve.status + if vstatus[0] // 100 == WARN // 100: + return ERROR, vstatus[1] + if vstatus[0] // 100 != IDLE // 100: + return vstatus + if self.valve.value: + return BUSY, 'filling' + return IDLE, 'watching' + + def read_value(self): + # read sensors + lower, upper = self.command('nl', 'nu') + if self.lower: + self.lower.value = lower + if self.upper: + self.upper.value = upper + if upper < self.threshold: + return 'full' + if lower < self.threshold: + return 'ok' + return 'empty' + + def write_mode(self, mode): + if mode == 'auto': + if self.isBusy(): + return mode + # set to watching + self.command(nc=3) + else: + # set to off + self.command(nc=2) + return mode + + def read_threshold(self): + value, self.cool_delay, self.fill_timeout = self.command('nth', 'ntc', 'ntm') + return value + + def write_threshold(self, value): + return self.command(nth=value) + + def write_cool_delay(self, value): + return self.command(ntc=value) + + def write_fill_timeout(self, value): + return self.command(ntm=value) + + @Command() + def fill(self): + self.mode = 'auto' + self.io.write(nc=1) + + @Command() + def stop(self): + if self.mode == 'auto': + # set to watching + self.command(nc=3) + else: + # set to off + self.io.write(nc=0) + + +class FlowPressure(CCU4Base, Readable): + value = Parameter(unit='mbar') + mbar_offset = Parameter(unit='mbar', default=0.8, readonly=False) + pollinterval = Parameter(default=0.25) + + def read_value(self): + return self.filter(self.command('f')) - self.mbar_offset + + +class NeedleValve(HasStates, CCU4Base, Drivable): + flow = Attached(Readable, mandatory=False) + flow_pressure = Attached(Readable, mandatory=False) + + value = Parameter(unit='ln/min') + target = Parameter(unit='ln/min') + + lnm_per_mbar = Parameter(unit='ln/min/mbar', default=0.6, readonly=False) + use_pressure = Parameter('use flow from pressure', BoolType(), + default=False, readonly=False) + motor_state = Parameter('motor_state', + EnumType(idle=0, opening=1, closing=2, + opened=3, closed=5, no_motor=6)) + tolerance = Parameter('tolerance', FloatRange(0), value=0.25, readonly=False) + tolerance2 = Parameter('tolerance limit above 2 lnm', FloatRange(0), value=0.5, readonly=False) + prop = Parameter('proportional term', FloatRange(unit='s/lnm'), readonly=False) + deriv = Parameter('min progress time constant', FloatRange(unit='s'), + default=30, readonly=False) + settle = Parameter('time within tolerance before getting quiet', FloatRange(unit='s'), + default=30, readonly=False) + step_factor = Parameter('factor (no progress time) / (min step size)', FloatRange(), default=300) + control_active = Parameter('control active flag', BoolType(), readonly=False) + pollinterval = Parameter(default=1) + _ref_time = 0 + _ref_dif = 0 + _last_cycle = 0 + _last_progress = 0 + _step = 0 + + def initModule(self): + super().initModule() + if self.flow_pressure: + self.flow_pressure.addCallback('value', self.update_flow_pressure) + if self.flow: + self.flow.addCallback('value', self.update_flow) + self.write_tolerance(self.tolerance) + + def write_tolerance(self, tolerance): + if hasattr(self.flow_pressure, 'tolerance'): + self.flow_pressure.tolerance = tolerance / self.lnm_per_mbar + if hasattr(self.flow, 'tolerance'): + self.flow.tolerance = tolerance + + def read_use_pressure(self): + if self.flow_pressure: + if self.flow: + return self.use_pressure + return True + return False + + def update_flow(self, value): + if not self.use_pressure: + self.value = value + self.doPoll() + + def update_flow_pressure(self, value): + if self.use_pressure: + self.value = value * self.lnm_per_mbar + self.doPoll() + + def write_target(self, value): + self.start_machine(self.controlling, in_tol_time=0, + ref_time=0, ref_dif=0, prev_dif=0) + + @status_code(BUSY) + def unblock_from_open(self, state): + self.motor_state = self.command('fm') + if self.motor_state == 'opened': + self.command(mp=-60) + return Retry + if self.motor_state == 'closing': + return Retry + if self.motor_state == 'closed': + if self.value > max(1, self.target): + return Retry + state.flow_before = self.value + state.wiggle = 1 + state.start_wiggle = state.now + self.command(mp=60) + return self.unblock_open + return self.approaching + + @status_code(BUSY) + def unblock_open(self, state): + self.motor_state = self.command('fm') + if self.value < state.flow_before: + state.flow_before_open = self.value + elif self.value > state.flow_before + 1: + state.wiggle = -state.wiggle / 2 + self.command(mp=state.wiggle) + state.start_wiggle = state.now + return self.unblock_close + if self.motor_state == 'opening': + return Retry + if self.motor_state == 'idle': + self.command(mp=state.wiggle) + return Retry + if self.motor_state == 'opened': + if state.now < state.start_wiggle + 20: + return Retry + return self.final_status(ERROR, 'can not open') + return self.controlling + + @status_code(BUSY) + def unblock_close(self, state): + self.motor_state = self.command('fm') + if self.value > state.flow_before: + state.flow_before_open = self.value + elif self.value < state.flow_before - 1: + if state.wiggle < self.prop * 2: + return self.final_status(IDLE, '') + state.wiggle = -state.wiggle / 2 + self.command(mp=state.wiggle) + state.start_wiggle = state.now + return self.unblock_open + if self.motor_state == 'closing': + return Retry + if self.motor_state == 'idle': + self.command(mp=state.wiggle) + return Retry + if self.motor_state == 'closed': + if state.now < state.start_wiggle + 20: + return Retry + return self.final_status(ERROR, 'can not close') + return self.final_status(WARN, 'unblock interrupted') + + def _tolerance(self): + return min(self.tolerance * min(1, self.value / 2), self.tolerance2) + + @status_code(IDLE) + def at_target(self, state): + dif = self.target - self.value + if abs(dif) > self._tolerance(): + state.in_tol_time = 0 + return self.unstable + return Retry + + @status_code(IDLE, 'unstable') + def unstable(self, state): + return self.controlling(state) + + @status_code(BUSY) + def controlling(self, state): + delta = state.delta(0) + dif = self.target - self.value + difdif = dif - state.prev_dif + state.prev_dif = dif + self.motor_state = self.command('fm') + if self.motor_state == 'closed': + if dif < 0 or difdif < 0: + return Retry + return self.unblock_from_open + elif self.motor_state == 'opened': # trigger also when flow too high? + if dif > 0 or difdif > 0: + return Retry + self.command(mp=-60) + return self.unblock_from_open + + tolerance = self._tolerance() + if abs(dif) < tolerance: + state.in_tol_time += delta + if state.in_tol_time > self.settle: + return self.at_target + return Retry + expected_dif = state.ref_dif * math.exp((state.now - state.ref_time) / self.deriv) + if abs(dif) < expected_dif: + if abs(dif) < expected_dif / 1.25: + state.ref_time = state.now + state.ref_dif = abs(dif) * 1.25 + state.last_progress = state.now + return Retry # progress is fast enough + state.ref_time = state.now + state.ref_dif = abs(dif) + state.step += dif * delta * self.prop + if abs(state.step) < (state.now - state.last_progress) / self.step_factor: + # wait until step size is big enough + return Retry + self.command(mp=state.step) + return Retry + +