[WIP] work on CCU4
- nv control Change-Id: I34e634d1e167f0a60d55b014c7117c9274d83c50
This commit is contained in:
parent
09f4f1d192
commit
46f815af38
@ -21,10 +21,14 @@
|
||||
|
||||
"""drivers for CCU4, the cryostat control unit at SINQ"""
|
||||
import time
|
||||
import math
|
||||
# the most common Frappy classes can be imported from frappy.core
|
||||
from frappy.core import EnumType, FloatRange, TupleOf, \
|
||||
HasIO, Parameter, Command, Readable, StringIO, StatusType, \
|
||||
BUSY, IDLE, ERROR, DISABLED
|
||||
from frappy.core import HasIO, Parameter, Command, Readable, Writable, Drivable, \
|
||||
Property, StringIO, BUSY, IDLE, WARN, ERROR, DISABLED, Attached
|
||||
from frappy.datatypes import BoolType, EnumType, FloatRange, StructOf, \
|
||||
StatusType, IntRange, StringType, TupleOf
|
||||
from frappy.dynamic import Pinata
|
||||
from frappy.errors import CommunicationFailedError
|
||||
from frappy.states import HasStates, status_code, Retry
|
||||
|
||||
|
||||
@ -36,16 +40,35 @@ class CCU4IO(StringIO):
|
||||
identification = [('cid', r'CCU4.*')]
|
||||
|
||||
|
||||
# inheriting HasIO allows us to use the communicate method for talking with the hardware
|
||||
# Readable as a base class defines the value and status parameters
|
||||
class HeLevel(HasIO, Readable):
|
||||
"""He Level channel of CCU4"""
|
||||
|
||||
# define the communication class to create the IO module
|
||||
class CCU4Base(HasIO):
|
||||
ioClass = CCU4IO
|
||||
|
||||
# define or alter the parameters
|
||||
# as Readable.value exists already, we give only the modified property 'unit'
|
||||
def command(self, *args, **kwds):
|
||||
"""send a command and get the response
|
||||
|
||||
:param args: the name of the parameters to query
|
||||
:param kwds: <parameter>=<value> for changing a parameter
|
||||
:returns: the (new) values of the parameters
|
||||
"""
|
||||
cmds = [f'{k}={v:g}' for k, v in kwds.items()] + list(args)
|
||||
reply = self.io.communicate(' '.join(cmds)).split()
|
||||
names = list(args) + list(kwds)
|
||||
if len(reply) != len(names):
|
||||
raise CommunicationFailedError('number of reply items does not match')
|
||||
result = []
|
||||
for given, item in zip(names, reply):
|
||||
name, txtvalue = item.split('=')
|
||||
if given != name:
|
||||
raise CommunicationFailedError('result keys do not match given keys')
|
||||
result.append(float(txtvalue))
|
||||
if len(result) == 1:
|
||||
return result[0]
|
||||
return result
|
||||
|
||||
|
||||
class HeLevel(CCU4Base, Readable):
|
||||
"""He Level channel of CCU4"""
|
||||
|
||||
value = Parameter(unit='%')
|
||||
empty_length = Parameter('warm length when empty', FloatRange(0, 2000, unit='mm'),
|
||||
readonly=False)
|
||||
@ -65,143 +88,100 @@ class HeLevel(HasIO, Readable):
|
||||
5: (DISABLED, 'disabled'),
|
||||
}
|
||||
|
||||
def query(self, cmd):
|
||||
"""send a query and get the response
|
||||
|
||||
:param cmd: the name of the parameter to query or '<parameter>=<value'
|
||||
for changing a parameter
|
||||
:returns: the (new) value of the parameter
|
||||
"""
|
||||
name, txtvalue = self.communicate(cmd).split('=')
|
||||
assert name == cmd.split('=')[0] # check that we got a reply to our command
|
||||
return float(txtvalue)
|
||||
|
||||
def read_value(self):
|
||||
return self.query('h')
|
||||
return self.command('h')
|
||||
|
||||
def read_status(self):
|
||||
return self.STATUS_MAP[int(self.query('hsf'))]
|
||||
|
||||
def read_empty_length(self):
|
||||
return self.query('hem')
|
||||
|
||||
def write_empty_length(self, value):
|
||||
return self.query(f'hem={value:g}')
|
||||
|
||||
def read_full_length(self):
|
||||
return self.query('hfu')
|
||||
|
||||
def write_full_length(self, value):
|
||||
return self.query(f'hfu={value:g}')
|
||||
return self.STATUS_MAP[int(self.command('hsf'))]
|
||||
|
||||
def read_sample_rate(self):
|
||||
return self.query('hf')
|
||||
value, self.empty_length, self.full_length = self.command('hf', 'hem', 'hfu')
|
||||
return value
|
||||
|
||||
def write_sample_rate(self, value):
|
||||
return self.query(f'hf={int(value)}')
|
||||
return self.command(hf=value)
|
||||
|
||||
def write_empty_length(self, value):
|
||||
return self.command(hem=value)
|
||||
|
||||
def write_full_length(self, value):
|
||||
return self.command(hfu=value)
|
||||
|
||||
|
||||
class Valve(CCU4Base, Writable):
|
||||
value = Parameter('relay state', BoolType())
|
||||
target = Parameter('relay target', BoolType())
|
||||
ioClass = CCU4IO
|
||||
STATE_MAP = {0: (0, (IDLE, 'off')),
|
||||
1: (1, (IDLE, 'on')),
|
||||
2: (0, (ERROR, 'no valve')),
|
||||
3: (0, (WARN, 'timeout')), # timeout in filling process (takes too long to fill)
|
||||
4: (0, (WARN, 'timeout1')), # timeout in filling process (takes too long to start)
|
||||
5: (1, (IDLE, 'boost')),
|
||||
}
|
||||
_open_command = None
|
||||
_close_command = None
|
||||
_query_state = None
|
||||
|
||||
class HeLevelAuto(HasStates, HeLevel):
|
||||
fill_level = Parameter('low threshold triggering start filling',
|
||||
FloatRange(unit='%'), readonly=False)
|
||||
full_level = Parameter('high threshold triggering stop filling',
|
||||
FloatRange(unit='%'), readonly=False)
|
||||
raw = Parameter('unsmoothed level', FloatRange(unit='%'))
|
||||
fill_minutes_range = Parameter('range of possible fill rate',
|
||||
TupleOf(FloatRange(unit='min'), FloatRange(unit='min')),
|
||||
readonly=False)
|
||||
hold_hours_range = Parameter('range of possible consumption rate',
|
||||
TupleOf(FloatRange(unit='h'), FloatRange(unit='h')),
|
||||
readonly=False)
|
||||
fill_delay = Parameter('delay for cooling the transfer line',
|
||||
FloatRange(unit='min'), readonly=False)
|
||||
status = Parameter(datatype=StatusType(HeLevel, 'BUSY'))
|
||||
|
||||
_filling = False
|
||||
_fillstart = 0
|
||||
_last_read = 0
|
||||
|
||||
def doPoll(self):
|
||||
super().doPoll()
|
||||
if self._filling:
|
||||
if self._filling == 1 and
|
||||
if self.value
|
||||
self.query('hcd=1')
|
||||
|
||||
def read_value(self):
|
||||
self.raw = super().read_value()
|
||||
if not self._state_machine.is_active:
|
||||
return self.raw
|
||||
return self.value
|
||||
def write_target(self, target):
|
||||
if target:
|
||||
self.command(**self._open_command)
|
||||
else:
|
||||
self.command(**self._close_command)
|
||||
|
||||
def read_status(self):
|
||||
status = HeLevel.read_status(self)
|
||||
if status[0] == IDLE:
|
||||
return HasStates.read_status(self)
|
||||
self.stop_machine(status)
|
||||
state = self.command(self._query_state)
|
||||
self.value, status = self.STATE_MAP[state]
|
||||
return status
|
||||
|
||||
@status_code(BUSY)
|
||||
def watching(self, state):
|
||||
delta = state.delta(10)
|
||||
if self.raw > self.value:
|
||||
self.value -= delta / (3600 * self.fill_hours_range[1])
|
||||
elif self.raw < self.value:
|
||||
self.value -= delta / (3600 * self.fill_hours_range[0])
|
||||
else:
|
||||
self.value = self.raw
|
||||
if self.value < self.fill_level:
|
||||
self.query('hcd=1 hf=1')
|
||||
state.fillstart = state.now
|
||||
return self.precooling
|
||||
self.query('hcd=1 hf=1')
|
||||
return Retry
|
||||
|
||||
@status_code(BUSY)
|
||||
def precooling(self, state):
|
||||
delta = state.delta(1)
|
||||
if self.raw > self.value:
|
||||
self.value += delta / (60 * self.fill_minutes_range[0])
|
||||
elif self.raw < self.value:
|
||||
self.value -= delta / (60 * self.fill_minutes_range[0])
|
||||
else:
|
||||
self.value = self.raw
|
||||
if self.value > self.full_level:
|
||||
self.query('hcd=0 hf=0')
|
||||
return self.watching
|
||||
self.query('hcd=1 hf=1')
|
||||
if state.now > state.fillstart + self.fill_delay * 60:
|
||||
return self.filling
|
||||
return Retry
|
||||
|
||||
@status_code(BUSY)
|
||||
def filling(self, state):
|
||||
delta = state.delta(1)
|
||||
if self.raw > self.value:
|
||||
self.value += delta / (60 * self.fill_minutes_range[0])
|
||||
elif self.raw < self.value:
|
||||
self.value += delta / (60 * self.fill_minutes_range[1])
|
||||
else:
|
||||
self.value = self.raw
|
||||
if self.value > self.full_level:
|
||||
self.query('hcd=0 hf=0')
|
||||
return self.watching
|
||||
self.query('hcd=1 hf=1')
|
||||
return Retry
|
||||
|
||||
@Command()
|
||||
def fill(self):
|
||||
self.start_machine(self.precooling, fillstart=time.time())
|
||||
self.query('hcd=1 hf=1')
|
||||
|
||||
@Command()
|
||||
def stop(self):
|
||||
pass
|
||||
class HeFillValve(Valve):
|
||||
_open_command = {'hcd': 1, 'hf': 1}
|
||||
_close_command = {'hcd': 0, 'hf': 0}
|
||||
_query_state = 'hv'
|
||||
|
||||
|
||||
class N2FillValve(Valve):
|
||||
_open_command = {'nc': 1}
|
||||
_close_command = {'nc': 0}
|
||||
_query_state = 'nv'
|
||||
|
||||
|
||||
class AuxValve(Valve):
|
||||
channel = Property('valve number', IntRange(1, 12))
|
||||
|
||||
def initModule(self):
|
||||
self._open_command = {f'vc{self.channel}': 1}
|
||||
self._close_command = {f'vc{self.channel}': 0}
|
||||
self._query_state = f'v{self.channel}'
|
||||
|
||||
|
||||
class N2TempSensor(Readable):
|
||||
value = Parameter('LN2 T sensor', FloatRange(unit='K'), default=0)
|
||||
|
||||
|
||||
class N2Level(CCU4Base, Pinata, Readable):
|
||||
valve = Attached(Writable, mandatory=False)
|
||||
lower = Attached(Readable, mandatory=False)
|
||||
upper = Attached(Readable, mandatory=False)
|
||||
|
||||
value = Parameter('vessel state', EnumType(empty=0, ok=1, full=2))
|
||||
status = Parameter(datatype=StatusType(Readable, 'BUSY'))
|
||||
mode = Parameter('auto mode', EnumType(disabled=0, manual=1, auto=2), readonly=False)
|
||||
|
||||
threshold = Parameter('threshold triggering start/stop filling',
|
||||
FloatRange(unit='K'), readonly=False)
|
||||
cool_delay = Parameter('max. minutes needed to cool the lower sensor',
|
||||
FloatRange(unit='s'), readonly=False)
|
||||
fill_timeout = Parameter('max. minutes needed to fill',
|
||||
FloatRange(unit='s'), readonly=False)
|
||||
names = Property('''names of attached modules
|
||||
|
||||
configure members as empty strings to disable the creation
|
||||
''',
|
||||
StructOf(valve=StringType(), lower=StringType(), upper=StringType()),
|
||||
default={'valve': '$_valve', 'lower': '$_lower', 'upper': '$_upper'})
|
||||
|
||||
class N2Sensor(HasIO, Readable):
|
||||
# conversion of the code from the CCU4 parameter 'ns'
|
||||
STATUS_MAP = {
|
||||
0: (IDLE, 'sensor ok'),
|
||||
@ -209,6 +189,283 @@ class N2Sensor(HasIO, Readable):
|
||||
2: (ERROR, 'short circuit'),
|
||||
3: (ERROR, 'upside down'),
|
||||
4: (ERROR, 'sensor warm'),
|
||||
5: (ERROR, 'empty'),
|
||||
5: (WARN, 'empty'),
|
||||
}
|
||||
|
||||
def scanModules(self):
|
||||
for modname, name in self.names.items():
|
||||
if name:
|
||||
sensor_name = name.replace('$', self.name)
|
||||
self.setProperty(modname, sensor_name)
|
||||
yield sensor_name, {
|
||||
'cls': N2FillValve if modname == 'valve' else N2TempSensor,
|
||||
'description': f'LN2 {modname} T sensor'}
|
||||
|
||||
def initialReads(self):
|
||||
self.command(nav=1) # tell CCU4 to activate LN2 sensor readings
|
||||
super().initialReads()
|
||||
|
||||
def read_status(self):
|
||||
auto, nstate = self.command('na', 'ns')
|
||||
if not self.valve or not auto:
|
||||
if self.mode == 'auto':
|
||||
# no valve assigned
|
||||
self.mode = 'manual'
|
||||
if self.mode == 'disabled':
|
||||
return DISABLED, ''
|
||||
status = self.STATUS_MAP[nstate]
|
||||
if status[0] // 100 != IDLE // 100:
|
||||
return status
|
||||
if self.mode == 'manual':
|
||||
return IDLE, ''
|
||||
vstatus = self.valve.status
|
||||
if vstatus[0] // 100 == WARN // 100:
|
||||
return ERROR, vstatus[1]
|
||||
if vstatus[0] // 100 != IDLE // 100:
|
||||
return vstatus
|
||||
if self.valve.value:
|
||||
return BUSY, 'filling'
|
||||
return IDLE, 'watching'
|
||||
|
||||
def read_value(self):
|
||||
# read sensors
|
||||
lower, upper = self.command('nl', 'nu')
|
||||
if self.lower:
|
||||
self.lower.value = lower
|
||||
if self.upper:
|
||||
self.upper.value = upper
|
||||
if upper < self.threshold:
|
||||
return 'full'
|
||||
if lower < self.threshold:
|
||||
return 'ok'
|
||||
return 'empty'
|
||||
|
||||
def write_mode(self, mode):
|
||||
if mode == 'auto':
|
||||
if self.isBusy():
|
||||
return mode
|
||||
# set to watching
|
||||
self.command(nc=3)
|
||||
else:
|
||||
# set to off
|
||||
self.command(nc=2)
|
||||
return mode
|
||||
|
||||
def read_threshold(self):
|
||||
value, self.cool_delay, self.fill_timeout = self.command('nth', 'ntc', 'ntm')
|
||||
return value
|
||||
|
||||
def write_threshold(self, value):
|
||||
return self.command(nth=value)
|
||||
|
||||
def write_cool_delay(self, value):
|
||||
return self.command(ntc=value)
|
||||
|
||||
def write_fill_timeout(self, value):
|
||||
return self.command(ntm=value)
|
||||
|
||||
@Command()
|
||||
def fill(self):
|
||||
self.mode = 'auto'
|
||||
self.io.write(nc=1)
|
||||
|
||||
@Command()
|
||||
def stop(self):
|
||||
if self.mode == 'auto':
|
||||
# set to watching
|
||||
self.command(nc=3)
|
||||
else:
|
||||
# set to off
|
||||
self.io.write(nc=0)
|
||||
|
||||
|
||||
class FlowPressure(CCU4Base, Readable):
|
||||
value = Parameter(unit='mbar')
|
||||
mbar_offset = Parameter(unit='mbar', default=0.8, readonly=False)
|
||||
pollinterval = Parameter(default=0.25)
|
||||
|
||||
def read_value(self):
|
||||
return self.filter(self.command('f')) - self.mbar_offset
|
||||
|
||||
|
||||
class NeedleValve(HasStates, CCU4Base, Drivable):
|
||||
flow = Attached(Readable, mandatory=False)
|
||||
flow_pressure = Attached(Readable, mandatory=False)
|
||||
|
||||
value = Parameter(unit='ln/min')
|
||||
target = Parameter(unit='ln/min')
|
||||
|
||||
lnm_per_mbar = Parameter(unit='ln/min/mbar', default=0.6, readonly=False)
|
||||
use_pressure = Parameter('use flow from pressure', BoolType(),
|
||||
default=False, readonly=False)
|
||||
motor_state = Parameter('motor_state',
|
||||
EnumType(idle=0, opening=1, closing=2,
|
||||
opened=3, closed=5, no_motor=6))
|
||||
tolerance = Parameter('tolerance', FloatRange(0), value=0.25, readonly=False)
|
||||
tolerance2 = Parameter('tolerance limit above 2 lnm', FloatRange(0), value=0.5, readonly=False)
|
||||
prop = Parameter('proportional term', FloatRange(unit='s/lnm'), readonly=False)
|
||||
deriv = Parameter('min progress time constant', FloatRange(unit='s'),
|
||||
default=30, readonly=False)
|
||||
settle = Parameter('time within tolerance before getting quiet', FloatRange(unit='s'),
|
||||
default=30, readonly=False)
|
||||
step_factor = Parameter('factor (no progress time) / (min step size)', FloatRange(), default=300)
|
||||
control_active = Parameter('control active flag', BoolType(), readonly=False)
|
||||
pollinterval = Parameter(default=1)
|
||||
_ref_time = 0
|
||||
_ref_dif = 0
|
||||
_last_cycle = 0
|
||||
_last_progress = 0
|
||||
_step = 0
|
||||
|
||||
def initModule(self):
|
||||
super().initModule()
|
||||
if self.flow_pressure:
|
||||
self.flow_pressure.addCallback('value', self.update_flow_pressure)
|
||||
if self.flow:
|
||||
self.flow.addCallback('value', self.update_flow)
|
||||
self.write_tolerance(self.tolerance)
|
||||
|
||||
def write_tolerance(self, tolerance):
|
||||
if hasattr(self.flow_pressure, 'tolerance'):
|
||||
self.flow_pressure.tolerance = tolerance / self.lnm_per_mbar
|
||||
if hasattr(self.flow, 'tolerance'):
|
||||
self.flow.tolerance = tolerance
|
||||
|
||||
def read_use_pressure(self):
|
||||
if self.flow_pressure:
|
||||
if self.flow:
|
||||
return self.use_pressure
|
||||
return True
|
||||
return False
|
||||
|
||||
def update_flow(self, value):
|
||||
if not self.use_pressure:
|
||||
self.value = value
|
||||
self.doPoll()
|
||||
|
||||
def update_flow_pressure(self, value):
|
||||
if self.use_pressure:
|
||||
self.value = value * self.lnm_per_mbar
|
||||
self.doPoll()
|
||||
|
||||
def write_target(self, value):
|
||||
self.start_machine(self.controlling, in_tol_time=0,
|
||||
ref_time=0, ref_dif=0, prev_dif=0)
|
||||
|
||||
@status_code(BUSY)
|
||||
def unblock_from_open(self, state):
|
||||
self.motor_state = self.command('fm')
|
||||
if self.motor_state == 'opened':
|
||||
self.command(mp=-60)
|
||||
return Retry
|
||||
if self.motor_state == 'closing':
|
||||
return Retry
|
||||
if self.motor_state == 'closed':
|
||||
if self.value > max(1, self.target):
|
||||
return Retry
|
||||
state.flow_before = self.value
|
||||
state.wiggle = 1
|
||||
state.start_wiggle = state.now
|
||||
self.command(mp=60)
|
||||
return self.unblock_open
|
||||
return self.approaching
|
||||
|
||||
@status_code(BUSY)
|
||||
def unblock_open(self, state):
|
||||
self.motor_state = self.command('fm')
|
||||
if self.value < state.flow_before:
|
||||
state.flow_before_open = self.value
|
||||
elif self.value > state.flow_before + 1:
|
||||
state.wiggle = -state.wiggle / 2
|
||||
self.command(mp=state.wiggle)
|
||||
state.start_wiggle = state.now
|
||||
return self.unblock_close
|
||||
if self.motor_state == 'opening':
|
||||
return Retry
|
||||
if self.motor_state == 'idle':
|
||||
self.command(mp=state.wiggle)
|
||||
return Retry
|
||||
if self.motor_state == 'opened':
|
||||
if state.now < state.start_wiggle + 20:
|
||||
return Retry
|
||||
return self.final_status(ERROR, 'can not open')
|
||||
return self.controlling
|
||||
|
||||
@status_code(BUSY)
|
||||
def unblock_close(self, state):
|
||||
self.motor_state = self.command('fm')
|
||||
if self.value > state.flow_before:
|
||||
state.flow_before_open = self.value
|
||||
elif self.value < state.flow_before - 1:
|
||||
if state.wiggle < self.prop * 2:
|
||||
return self.final_status(IDLE, '')
|
||||
state.wiggle = -state.wiggle / 2
|
||||
self.command(mp=state.wiggle)
|
||||
state.start_wiggle = state.now
|
||||
return self.unblock_open
|
||||
if self.motor_state == 'closing':
|
||||
return Retry
|
||||
if self.motor_state == 'idle':
|
||||
self.command(mp=state.wiggle)
|
||||
return Retry
|
||||
if self.motor_state == 'closed':
|
||||
if state.now < state.start_wiggle + 20:
|
||||
return Retry
|
||||
return self.final_status(ERROR, 'can not close')
|
||||
return self.final_status(WARN, 'unblock interrupted')
|
||||
|
||||
def _tolerance(self):
|
||||
return min(self.tolerance * min(1, self.value / 2), self.tolerance2)
|
||||
|
||||
@status_code(IDLE)
|
||||
def at_target(self, state):
|
||||
dif = self.target - self.value
|
||||
if abs(dif) > self._tolerance():
|
||||
state.in_tol_time = 0
|
||||
return self.unstable
|
||||
return Retry
|
||||
|
||||
@status_code(IDLE, 'unstable')
|
||||
def unstable(self, state):
|
||||
return self.controlling(state)
|
||||
|
||||
@status_code(BUSY)
|
||||
def controlling(self, state):
|
||||
delta = state.delta(0)
|
||||
dif = self.target - self.value
|
||||
difdif = dif - state.prev_dif
|
||||
state.prev_dif = dif
|
||||
self.motor_state = self.command('fm')
|
||||
if self.motor_state == 'closed':
|
||||
if dif < 0 or difdif < 0:
|
||||
return Retry
|
||||
return self.unblock_from_open
|
||||
elif self.motor_state == 'opened': # trigger also when flow too high?
|
||||
if dif > 0 or difdif > 0:
|
||||
return Retry
|
||||
self.command(mp=-60)
|
||||
return self.unblock_from_open
|
||||
|
||||
tolerance = self._tolerance()
|
||||
if abs(dif) < tolerance:
|
||||
state.in_tol_time += delta
|
||||
if state.in_tol_time > self.settle:
|
||||
return self.at_target
|
||||
return Retry
|
||||
expected_dif = state.ref_dif * math.exp((state.now - state.ref_time) / self.deriv)
|
||||
if abs(dif) < expected_dif:
|
||||
if abs(dif) < expected_dif / 1.25:
|
||||
state.ref_time = state.now
|
||||
state.ref_dif = abs(dif) * 1.25
|
||||
state.last_progress = state.now
|
||||
return Retry # progress is fast enough
|
||||
state.ref_time = state.now
|
||||
state.ref_dif = abs(dif)
|
||||
state.step += dif * delta * self.prop
|
||||
if abs(state.step) < (state.now - state.last_progress) / self.step_factor:
|
||||
# wait until step size is big enough
|
||||
return Retry
|
||||
self.command(mp=state.step)
|
||||
return Retry
|
||||
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user