frappy_psi.ccu4: some smaller updates

Change-Id: I128ac57aad951fd8ad3bdf663c69c85644063645
This commit is contained in:
zolliker 2024-12-18 15:40:05 +01:00
parent 2a617fbaf0
commit 0b5b40cfba

View File

@ -22,6 +22,7 @@
"""drivers for CCU4, the cryostat control unit at SINQ"""
import time
import math
from frappy.lib.enum import Enum
# the most common Frappy classes can be imported from frappy.core
from frappy.core import HasIO, Parameter, Command, Readable, Writable, Drivable, \
Property, StringIO, BUSY, IDLE, WARN, ERROR, DISABLED, Attached
@ -32,6 +33,10 @@ from frappy.errors import CommunicationFailedError
from frappy.states import HasStates, status_code, Retry
M = Enum(idle=0, opening=1, closing=2, opened=3, closed=5, no_motor=6)
A = Enum(disabled=0, manual=1, auto=2)
class CCU4IO(StringIO):
"""communication with CCU4"""
# for completeness: (not needed, as it is the default)
@ -43,25 +48,34 @@ class CCU4IO(StringIO):
class CCU4Base(HasIO):
ioClass = CCU4IO
def command(self, *args, **kwds):
def command(self, **kwds):
"""send a command and get the response
:param args: the name of the parameters to query
:param kwds: <parameter>=<value> for changing a parameter
:param kwds: <parameter>=<value> for changing a parameter <parameter>=<type> for querying a parameter
:returns: the (new) values of the parameters
"""
cmds = [f'{k}={v:g}' for k, v in kwds.items()] + list(args)
types = {}
cmds = []
for key, value in kwds.items():
if isinstance(value, type):
types[key] = value
cmds.append(key)
elif isinstance(value, str):
types[key] = str
cmds.append(f'{key}={value}')
else:
types[key] = float
cmds.append(f'{key}={value:g}')
reply = self.io.communicate(' '.join(cmds)).split()
names = list(args) + list(kwds)
if len(reply) != len(names):
if len(reply) != len(types):
raise CommunicationFailedError('number of reply items does not match')
result = []
for given, item in zip(names, reply):
name, txtvalue = item.split('=')
for (given, typ), res in zip(types.items(), reply):
name, txtvalue = res.split('=')
if given != name:
raise CommunicationFailedError('result keys do not match given keys')
result.append(float(txtvalue))
if len(result) == 1:
result.append(typ(txtvalue))
if len(kwds) == 1:
return result[0]
return result
@ -89,13 +103,13 @@ class HeLevel(CCU4Base, Readable):
}
def read_value(self):
return self.command('h')
return self.command(h=float)
def read_status(self):
return self.STATUS_MAP[int(self.command('hsf'))]
return self.STATUS_MAP[int(self.command(hsf=int))]
def read_sample_rate(self):
value, self.empty_length, self.full_length = self.command('hf', 'hem', 'hfu')
value, self.empty_length, self.full_length = self.command(hf=int, hem=float, hfu=float)
return value
def write_sample_rate(self, value):
@ -138,13 +152,13 @@ class Valve(CCU4Base, Writable):
class HeFillValve(Valve):
_open_command = {'hcd': 1, 'hf': 1}
_close_command = {'hcd': 0, 'hf': 0}
_query_state = 'hv'
_query_state = {'hv': int}
class N2FillValve(Valve):
_open_command = {'nc': 1}
_close_command = {'nc': 0}
_query_state = 'nv'
_query_state = {'nv': int}
class AuxValve(Valve):
@ -153,7 +167,7 @@ class AuxValve(Valve):
def initModule(self):
self._open_command = {f'vc{self.channel}': 1}
self._close_command = {f'vc{self.channel}': 0}
self._query_state = f'v{self.channel}'
self._query_state = {f'v{self.channel}': int}
class N2TempSensor(Readable):
@ -167,7 +181,7 @@ class N2Level(CCU4Base, Pinata, Readable):
value = Parameter('vessel state', EnumType(empty=0, ok=1, full=2))
status = Parameter(datatype=StatusType(Readable, 'BUSY'))
mode = Parameter('auto mode', EnumType(disabled=0, manual=1, auto=2), readonly=False)
mode = Parameter('auto mode', EnumType(A), readonly=False)
threshold = Parameter('threshold triggering start/stop filling',
FloatRange(unit='K'), readonly=False)
@ -206,17 +220,17 @@ class N2Level(CCU4Base, Pinata, Readable):
super().initialReads()
def read_status(self):
auto, nstate = self.command('na', 'ns')
auto, nstate = self.command(na=int, ns=int)
if not self.valve or not auto:
if self.mode == 'auto':
if self.mode == A.auto:
# no valve assigned
self.mode = 'manual'
if self.mode == 'disabled':
self.mode = A.manual
if self.mode == A.disabled:
return DISABLED, ''
status = self.STATUS_MAP[nstate]
if status[0] // 100 != IDLE // 100:
return status
if self.mode == 'manual':
if self.mode == A.manual:
return IDLE, ''
vstatus = self.valve.status
if vstatus[0] // 100 == WARN // 100:
@ -229,7 +243,7 @@ class N2Level(CCU4Base, Pinata, Readable):
def read_value(self):
# read sensors
lower, upper = self.command('nl', 'nu')
lower, upper = self.command(nl=float, nu=float)
if self.lower:
self.lower.value = lower
if self.upper:
@ -241,7 +255,7 @@ class N2Level(CCU4Base, Pinata, Readable):
return 'empty'
def write_mode(self, mode):
if mode == 'auto':
if mode == A.auto:
if self.isBusy():
return mode
# set to watching
@ -252,7 +266,7 @@ class N2Level(CCU4Base, Pinata, Readable):
return mode
def read_threshold(self):
value, self.cool_delay, self.fill_timeout = self.command('nth', 'ntc', 'ntm')
value, self.cool_delay, self.fill_timeout = self.command(nth=float, ntc=float, ntm=float)
return value
def write_threshold(self, value):
@ -266,12 +280,12 @@ class N2Level(CCU4Base, Pinata, Readable):
@Command()
def fill(self):
self.mode = 'auto'
self.mode = A.auto
self.io.write(nc=1)
@Command()
def stop(self):
if self.mode == 'auto':
if self.mode == A.auto:
# set to watching
self.command(nc=3)
else:
@ -285,7 +299,7 @@ class FlowPressure(CCU4Base, Readable):
pollinterval = Parameter(default=0.25)
def read_value(self):
return self.filter(self.command('f')) - self.mbar_offset
return self.filter(self.command(f=float)) - self.mbar_offset
class NeedleValve(HasStates, CCU4Base, Drivable):
@ -298,9 +312,7 @@ class NeedleValve(HasStates, CCU4Base, Drivable):
lnm_per_mbar = Parameter(unit='ln/min/mbar', default=0.6, readonly=False)
use_pressure = Parameter('use flow from pressure', BoolType(),
default=False, readonly=False)
motor_state = Parameter('motor_state',
EnumType(idle=0, opening=1, closing=2,
opened=3, closed=5, no_motor=6))
motor_state = Parameter('motor_state', EnumType(M))
tolerance = Parameter('tolerance', FloatRange(0), value=0.25, readonly=False)
tolerance2 = Parameter('tolerance limit above 2 lnm', FloatRange(0), value=0.5, readonly=False)
prop = Parameter('proportional term', FloatRange(unit='s/lnm'), readonly=False)
@ -341,12 +353,12 @@ class NeedleValve(HasStates, CCU4Base, Drivable):
def update_flow(self, value):
if not self.use_pressure:
self.value = value
self.doPoll()
self.cycle_machine()
def update_flow_pressure(self, value):
if self.use_pressure:
self.value = value * self.lnm_per_mbar
self.doPoll()
self.cycle_machine()
def write_target(self, value):
self.start_machine(self.controlling, in_tol_time=0,
@ -354,7 +366,7 @@ class NeedleValve(HasStates, CCU4Base, Drivable):
@status_code(BUSY)
def unblock_from_open(self, state):
self.motor_state = self.command('fm')
self.motor_state = self.command(fm=int)
if self.motor_state == 'opened':
self.command(mp=-60)
return Retry
@ -372,7 +384,7 @@ class NeedleValve(HasStates, CCU4Base, Drivable):
@status_code(BUSY)
def unblock_open(self, state):
self.motor_state = self.command('fm')
self.motor_state = self.command(fm=int)
if self.value < state.flow_before:
state.flow_before_open = self.value
elif self.value > state.flow_before + 1:
@ -393,7 +405,7 @@ class NeedleValve(HasStates, CCU4Base, Drivable):
@status_code(BUSY)
def unblock_close(self, state):
self.motor_state = self.command('fm')
self.motor_state = self.command(fm=int)
if self.value > state.flow_before:
state.flow_before_open = self.value
elif self.value < state.flow_before - 1:
@ -435,7 +447,7 @@ class NeedleValve(HasStates, CCU4Base, Drivable):
dif = self.target - self.value
difdif = dif - state.prev_dif
state.prev_dif = dif
self.motor_state = self.command('fm')
self.motor_state = self.command(fm=int)
if self.motor_state == 'closed':
if dif < 0 or difdif < 0:
return Retry
@ -467,5 +479,3 @@ class NeedleValve(HasStates, CCU4Base, Drivable):
return Retry
self.command(mp=state.step)
return Retry