diff --git a/frappy_psi/ccu4.py b/frappy_psi/ccu4.py index 19bccf0..4fb4176 100644 --- a/frappy_psi/ccu4.py +++ b/frappy_psi/ccu4.py @@ -22,6 +22,7 @@ """drivers for CCU4, the cryostat control unit at SINQ""" import time import math +from frappy.lib.enum import Enum # the most common Frappy classes can be imported from frappy.core from frappy.core import HasIO, Parameter, Command, Readable, Writable, Drivable, \ Property, StringIO, BUSY, IDLE, WARN, ERROR, DISABLED, Attached @@ -32,6 +33,10 @@ from frappy.errors import CommunicationFailedError from frappy.states import HasStates, status_code, Retry +M = Enum(idle=0, opening=1, closing=2, opened=3, closed=5, no_motor=6) +A = Enum(disabled=0, manual=1, auto=2) + + class CCU4IO(StringIO): """communication with CCU4""" # for completeness: (not needed, as it is the default) @@ -43,25 +48,34 @@ class CCU4IO(StringIO): class CCU4Base(HasIO): ioClass = CCU4IO - def command(self, *args, **kwds): + def command(self, **kwds): """send a command and get the response - :param args: the name of the parameters to query - :param kwds: = for changing a parameter + :param kwds: = for changing a parameter = for querying a parameter :returns: the (new) values of the parameters """ - cmds = [f'{k}={v:g}' for k, v in kwds.items()] + list(args) + types = {} + cmds = [] + for key, value in kwds.items(): + if isinstance(value, type): + types[key] = value + cmds.append(key) + elif isinstance(value, str): + types[key] = str + cmds.append(f'{key}={value}') + else: + types[key] = float + cmds.append(f'{key}={value:g}') reply = self.io.communicate(' '.join(cmds)).split() - names = list(args) + list(kwds) - if len(reply) != len(names): + if len(reply) != len(types): raise CommunicationFailedError('number of reply items does not match') result = [] - for given, item in zip(names, reply): - name, txtvalue = item.split('=') + for (given, typ), res in zip(types.items(), reply): + name, txtvalue = res.split('=') if given != name: raise CommunicationFailedError('result keys do not match given keys') - result.append(float(txtvalue)) - if len(result) == 1: + result.append(typ(txtvalue)) + if len(kwds) == 1: return result[0] return result @@ -89,13 +103,13 @@ class HeLevel(CCU4Base, Readable): } def read_value(self): - return self.command('h') + return self.command(h=float) def read_status(self): - return self.STATUS_MAP[int(self.command('hsf'))] + return self.STATUS_MAP[int(self.command(hsf=int))] def read_sample_rate(self): - value, self.empty_length, self.full_length = self.command('hf', 'hem', 'hfu') + value, self.empty_length, self.full_length = self.command(hf=int, hem=float, hfu=float) return value def write_sample_rate(self, value): @@ -138,13 +152,13 @@ class Valve(CCU4Base, Writable): class HeFillValve(Valve): _open_command = {'hcd': 1, 'hf': 1} _close_command = {'hcd': 0, 'hf': 0} - _query_state = 'hv' + _query_state = {'hv': int} class N2FillValve(Valve): _open_command = {'nc': 1} _close_command = {'nc': 0} - _query_state = 'nv' + _query_state = {'nv': int} class AuxValve(Valve): @@ -153,7 +167,7 @@ class AuxValve(Valve): def initModule(self): self._open_command = {f'vc{self.channel}': 1} self._close_command = {f'vc{self.channel}': 0} - self._query_state = f'v{self.channel}' + self._query_state = {f'v{self.channel}': int} class N2TempSensor(Readable): @@ -167,7 +181,7 @@ class N2Level(CCU4Base, Pinata, Readable): value = Parameter('vessel state', EnumType(empty=0, ok=1, full=2)) status = Parameter(datatype=StatusType(Readable, 'BUSY')) - mode = Parameter('auto mode', EnumType(disabled=0, manual=1, auto=2), readonly=False) + mode = Parameter('auto mode', EnumType(A), readonly=False) threshold = Parameter('threshold triggering start/stop filling', FloatRange(unit='K'), readonly=False) @@ -206,17 +220,17 @@ class N2Level(CCU4Base, Pinata, Readable): super().initialReads() def read_status(self): - auto, nstate = self.command('na', 'ns') + auto, nstate = self.command(na=int, ns=int) if not self.valve or not auto: - if self.mode == 'auto': + if self.mode == A.auto: # no valve assigned - self.mode = 'manual' - if self.mode == 'disabled': + self.mode = A.manual + if self.mode == A.disabled: return DISABLED, '' status = self.STATUS_MAP[nstate] if status[0] // 100 != IDLE // 100: return status - if self.mode == 'manual': + if self.mode == A.manual: return IDLE, '' vstatus = self.valve.status if vstatus[0] // 100 == WARN // 100: @@ -229,7 +243,7 @@ class N2Level(CCU4Base, Pinata, Readable): def read_value(self): # read sensors - lower, upper = self.command('nl', 'nu') + lower, upper = self.command(nl=float, nu=float) if self.lower: self.lower.value = lower if self.upper: @@ -241,7 +255,7 @@ class N2Level(CCU4Base, Pinata, Readable): return 'empty' def write_mode(self, mode): - if mode == 'auto': + if mode == A.auto: if self.isBusy(): return mode # set to watching @@ -252,7 +266,7 @@ class N2Level(CCU4Base, Pinata, Readable): return mode def read_threshold(self): - value, self.cool_delay, self.fill_timeout = self.command('nth', 'ntc', 'ntm') + value, self.cool_delay, self.fill_timeout = self.command(nth=float, ntc=float, ntm=float) return value def write_threshold(self, value): @@ -266,12 +280,12 @@ class N2Level(CCU4Base, Pinata, Readable): @Command() def fill(self): - self.mode = 'auto' + self.mode = A.auto self.io.write(nc=1) @Command() def stop(self): - if self.mode == 'auto': + if self.mode == A.auto: # set to watching self.command(nc=3) else: @@ -285,7 +299,7 @@ class FlowPressure(CCU4Base, Readable): pollinterval = Parameter(default=0.25) def read_value(self): - return self.filter(self.command('f')) - self.mbar_offset + return self.filter(self.command(f=float)) - self.mbar_offset class NeedleValve(HasStates, CCU4Base, Drivable): @@ -298,9 +312,7 @@ class NeedleValve(HasStates, CCU4Base, Drivable): lnm_per_mbar = Parameter(unit='ln/min/mbar', default=0.6, readonly=False) use_pressure = Parameter('use flow from pressure', BoolType(), default=False, readonly=False) - motor_state = Parameter('motor_state', - EnumType(idle=0, opening=1, closing=2, - opened=3, closed=5, no_motor=6)) + motor_state = Parameter('motor_state', EnumType(M)) tolerance = Parameter('tolerance', FloatRange(0), value=0.25, readonly=False) tolerance2 = Parameter('tolerance limit above 2 lnm', FloatRange(0), value=0.5, readonly=False) prop = Parameter('proportional term', FloatRange(unit='s/lnm'), readonly=False) @@ -341,12 +353,12 @@ class NeedleValve(HasStates, CCU4Base, Drivable): def update_flow(self, value): if not self.use_pressure: self.value = value - self.doPoll() + self.cycle_machine() def update_flow_pressure(self, value): if self.use_pressure: self.value = value * self.lnm_per_mbar - self.doPoll() + self.cycle_machine() def write_target(self, value): self.start_machine(self.controlling, in_tol_time=0, @@ -354,7 +366,7 @@ class NeedleValve(HasStates, CCU4Base, Drivable): @status_code(BUSY) def unblock_from_open(self, state): - self.motor_state = self.command('fm') + self.motor_state = self.command(fm=int) if self.motor_state == 'opened': self.command(mp=-60) return Retry @@ -372,7 +384,7 @@ class NeedleValve(HasStates, CCU4Base, Drivable): @status_code(BUSY) def unblock_open(self, state): - self.motor_state = self.command('fm') + self.motor_state = self.command(fm=int) if self.value < state.flow_before: state.flow_before_open = self.value elif self.value > state.flow_before + 1: @@ -393,7 +405,7 @@ class NeedleValve(HasStates, CCU4Base, Drivable): @status_code(BUSY) def unblock_close(self, state): - self.motor_state = self.command('fm') + self.motor_state = self.command(fm=int) if self.value > state.flow_before: state.flow_before_open = self.value elif self.value < state.flow_before - 1: @@ -435,7 +447,7 @@ class NeedleValve(HasStates, CCU4Base, Drivable): dif = self.target - self.value difdif = dif - state.prev_dif state.prev_dif = dif - self.motor_state = self.command('fm') + self.motor_state = self.command(fm=int) if self.motor_state == 'closed': if dif < 0 or difdif < 0: return Retry @@ -467,5 +479,3 @@ class NeedleValve(HasStates, CCU4Base, Drivable): return Retry self.command(mp=state.step) return Retry - -