215 lines
7.3 KiB
Python
215 lines
7.3 KiB
Python
# *****************************************************************************
|
|
#
|
|
# This program is free software; you can redistribute it and/or modify it under
|
|
# the terms of the GNU General Public License as published by the Free Software
|
|
# Foundation; either version 2 of the License, or (at your option) any later
|
|
# version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful, but WITHOUT
|
|
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
|
|
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
|
|
# details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License along with
|
|
# this program; if not, write to the Free Software Foundation, Inc.,
|
|
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
|
|
#
|
|
# Module authors:
|
|
# Markus Zolliker <markus.zolliker@psi.ch>
|
|
#
|
|
# *****************************************************************************
|
|
|
|
"""drivers for CCU4, the cryostat control unit at SINQ"""
|
|
import time
|
|
# the most common Frappy classes can be imported from frappy.core
|
|
from frappy.core import EnumType, FloatRange, TupleOf, \
|
|
HasIO, Parameter, Command, Readable, StringIO, StatusType, \
|
|
BUSY, IDLE, ERROR, DISABLED
|
|
from frappy.states import HasStates, status_code, Retry
|
|
|
|
|
|
class CCU4IO(StringIO):
    """communication with CCU4"""
    # line terminator, given explicitly for completeness only:
    # '\n' is the default anyway
    end_of_line = '\n'
    # handshake on connect: 'cid' is sent, and the reply is expected
    # to start with 'CCU4'
    identification = [('cid', r'CCU4.*')]
|
|
|
|
|
|
# inheriting HasIO allows us to use the communicate method for talking with the hardware
|
|
# Readable as a base class defines the value and status parameters
|
|
class HeLevel(HasIO, Readable):
    """He Level channel of CCU4"""

    # define the communication class to create the IO module
    ioClass = CCU4IO

    # define or alter the parameters
    # as Readable.value exists already, we give only the modified property 'unit'
    value = Parameter(unit='%')
    empty_length = Parameter('warm length when empty', FloatRange(0, 2000, unit='mm'),
                             readonly=False)
    full_length = Parameter('warm length when full', FloatRange(0, 2000, unit='mm'),
                            readonly=False)
    sample_rate = Parameter('sample rate', EnumType(slow=0, fast=1), readonly=False)

    status = Parameter(datatype=StatusType(Readable, 'DISABLED'))

    # conversion of the code from the CCU4 parameter 'hsf'
    STATUS_MAP = {
        0: (IDLE, 'sensor ok'),
        1: (ERROR, 'sensor warm'),
        2: (ERROR, 'no sensor'),
        3: (ERROR, 'timeout'),
        4: (ERROR, 'not yet read'),
        5: (DISABLED, 'disabled'),
    }

    def query(self, cmd):
        """send a query and get the response

        :param cmd: the name of the parameter to query or '<parameter>=<value>'
                    for changing a parameter
        :returns: the (new) value of the parameter, as a float
        :raises ValueError: when the reply is not of the form '<name>=<number>'
                    or when <name> does not match the parameter name in cmd
        """
        reply = self.communicate(cmd)
        name, separator, txtvalue = reply.partition('=')
        # check that we got a reply to our command
        # (an explicit check instead of an assert, as asserts are removed
        # when python runs with -O)
        if not separator or name != cmd.partition('=')[0]:
            raise ValueError(f'unexpected reply {reply!r} to command {cmd!r}')
        # float() raises ValueError on anything which is not a plain number,
        # including a reply containing more than one '='
        return float(txtvalue)

    def read_value(self):
        """get the value from the CCU4 parameter 'h'"""
        return self.query('h')

    def read_status(self):
        """get the CCU4 parameter 'hsf' and convert it with STATUS_MAP"""
        return self.STATUS_MAP[int(self.query('hsf'))]

    def read_empty_length(self):
        """get empty_length from the CCU4 parameter 'hem'"""
        return self.query('hem')

    def write_empty_length(self, value):
        """set the CCU4 parameter 'hem' and return the value replied"""
        return self.query(f'hem={value:g}')

    def read_full_length(self):
        """get full_length from the CCU4 parameter 'hfu'"""
        return self.query('hfu')

    def write_full_length(self, value):
        """set the CCU4 parameter 'hfu' and return the value replied"""
        return self.query(f'hfu={value:g}')

    def read_sample_rate(self):
        """get sample_rate from the CCU4 parameter 'hf'"""
        return self.query('hf')

    def write_sample_rate(self, value):
        """set the CCU4 parameter 'hf' and return the value replied"""
        return self.query(f'hf={int(value)}')
|
|
|
|
|
|
|
|
class HeLevelAuto(HasStates, HeLevel):
    """He level channel of CCU4 with a state machine for filling

    The state machine has the states watching, precooling and filling.
    While it is active, value is a smoothed level which is moved towards
    the unsmoothed reading (parameter raw) with a limited rate.

    NOTE(review): this class looks like work in progress, see the TODO and
    NOTE(review) remarks below.
    """
    fill_level = Parameter('low threshold triggering start filling',
                           FloatRange(unit='%'), readonly=False)
    full_level = Parameter('high threshold triggering stop filling',
                           FloatRange(unit='%'), readonly=False)
    raw = Parameter('unsmoothed level', FloatRange(unit='%'))
    fill_minutes_range = Parameter('range of possible fill rate',
                                   TupleOf(FloatRange(unit='min'), FloatRange(unit='min')),
                                   readonly=False)
    hold_hours_range = Parameter('range of possible consumption rate',
                                 TupleOf(FloatRange(unit='h'), FloatRange(unit='h')),
                                 readonly=False)
    fill_delay = Parameter('delay for cooling the transfer line',
                           FloatRange(unit='min'), readonly=False)
    status = Parameter(datatype=StatusType(HeLevel, 'BUSY'))

    # NOTE(review): nothing in this class assigns _filling, and
    # _fillstart / _last_read are not used at all
    _filling = False
    _fillstart = 0
    _last_read = 0

    def doPoll(self):
        """do the regular poll, and send 'hcd=1' while _filling is set"""
        super().doPoll()
        # TODO: this block was left unfinished. It contained two incomplete
        # conditions (one starting with 'self._filling == 1 and', one starting
        # with 'self.value'), which made the module fail on import. They are
        # dropped until the intended logic is known.
        if self._filling:
            self.query('hcd=1')

    def read_value(self):
        """update raw from the hardware and return the value to be shown

        :returns: the raw reading when the state machine is not active,
            else the unchanged (smoothed) value, which is updated by the
            state functions
        """
        self.raw = super().read_value()
        if not self._state_machine.is_active:
            return self.raw
        return self.value

    def read_status(self):
        """combine the sensor status with the status of the state machine

        :returns: the status of the state machine when the sensor status is
            IDLE, else the sensor status (the state machine is stopped then)
        """
        status = HeLevel.read_status(self)
        if status[0] == IDLE:
            return HasStates.read_status(self)
        self.stop_machine(status)
        return status

    @status_code(BUSY)
    def watching(self, state):
        """state function: not filling, go to precooling below fill_level"""
        delta = state.delta(10)
        # fixed: this was reading self.fill_hours_range, which does not exist,
        # the parameter given in hours is hold_hours_range
        if self.raw > self.value:
            self.value -= delta / (3600 * self.hold_hours_range[1])
        elif self.raw < self.value:
            self.value -= delta / (3600 * self.hold_hours_range[0])
        else:
            self.value = self.raw
        if self.value < self.fill_level:
            self.query('hcd=1 hf=1')
            state.fillstart = state.now
            return self.precooling
        # NOTE(review): the same command as for starting a fill is sent here,
        # while precooling and filling send 'hcd=0 hf=0' before returning to
        # this state - presumably a copy/paste leftover, to be confirmed
        self.query('hcd=1 hf=1')
        return Retry

    @status_code(BUSY)
    def precooling(self, state):
        """state function: wait fill_delay minutes, then go to filling

        goes back to watching when value exceeds full_level
        """
        delta = state.delta(1)
        if self.raw > self.value:
            self.value += delta / (60 * self.fill_minutes_range[0])
        elif self.raw < self.value:
            self.value -= delta / (60 * self.fill_minutes_range[0])
        else:
            self.value = self.raw
        if self.value > self.full_level:
            self.query('hcd=0 hf=0')
            return self.watching
        self.query('hcd=1 hf=1')
        # fill_delay is given in minutes, state.now / state.fillstart in seconds
        if state.now > state.fillstart + self.fill_delay * 60:
            return self.filling
        return Retry

    @status_code(BUSY)
    def filling(self, state):
        """state function: filling, go to watching above full_level"""
        delta = state.delta(1)
        if self.raw > self.value:
            self.value += delta / (60 * self.fill_minutes_range[0])
        elif self.raw < self.value:
            self.value += delta / (60 * self.fill_minutes_range[1])
        else:
            self.value = self.raw
        if self.value > self.full_level:
            self.query('hcd=0 hf=0')
            return self.watching
        self.query('hcd=1 hf=1')
        return Retry

    @Command()
    def fill(self):
        """start the state machine in the precooling state"""
        self.start_machine(self.precooling, fillstart=time.time())
        self.query('hcd=1 hf=1')

    @Command()
    def stop(self):
        """not implemented yet (does nothing)"""
        # NOTE(review): presumably this should stop the state machine and
        # the filling - to be confirmed
|
|
|
|
|
|
|
|
class N2Sensor(HasIO, Readable):
    # so far, this class holds nothing but the status conversion table

    # conversion of the code from the CCU4 parameter 'ns'
    # (the code is the position in the sequence below, starting at 0)
    STATUS_MAP = dict(enumerate((
        (IDLE, 'sensor ok'),
        (ERROR, 'no sensor'),
        (ERROR, 'short circuit'),
        (ERROR, 'upside down'),
        (ERROR, 'sensor warm'),
        (ERROR, 'empty'),
    )))
|
|
|