#!/usr/bin/env python
# *****************************************************************************
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
#
# Module authors:
#   Markus Zolliker <markus.zolliker@psi.ch>
#   Anik Stark <anik.stark@psi.ch>
# *****************************************************************************
"""oxford instruments old (classic) devices (ILM, IGH, IPS)"""
|
|
|
|
import time
|
|
import re
|
|
from frappy.core import Parameter, Property, EnumType, FloatRange, BoolType, \
|
|
StringIO, HasIO, Readable, Writable, Drivable, IDLE, BUSY, WARN, ERROR, Attached
|
|
from frappy.lib import formatStatusBits
|
|
from frappy.lib.enum import Enum
|
|
from frappy.errors import BadValueError, HardwareError, CommunicationFailedError
|
|
from frappy_psi.magfield import Magfield, Status
|
|
from frappy.states import Retry
|
|
|
|
|
|
def bit(x, pos):
    """Tell whether bit number `pos` (0 = least significant) of integer `x` is set."""
    mask = 1 << pos
    return (x & mask) != 0
|
|
|
|
|
|
class OxBase(HasIO):
    """Command helpers shared by the classic Oxford Instruments modules.

    A valid reply is expected to start with the first character of the
    command it answers. Write commands are bracketed by 'C3' before and
    'C0' after (presumably switching remote/local control - confirm against
    the device manual).
    """

    def query(self, cmd, scale=None):
        """Send a read command and return the numeric part of the reply.

        :param cmd: the command, e.g. 'R7'
        :param scale: factor applied to the reply converted to float;
            when None, the reply is converted to int and returned unscaled
        :return: the (scaled) number following the first reply character
        :raises CommunicationFailedError: on an empty reply or a reply not
            starting with the first character of cmd
        """
        reply = self.communicate(cmd)
        # test for an empty reply first, indexing it would raise IndexError
        if not reply or reply[0] != cmd[0]:
            raise CommunicationFailedError(f'bad reply: {reply} to command {cmd}')
        if scale is None:
            return int(reply[1:])
        return float(reply[1:]) * scale

    def change(self, cmd, value, scale=None):
        """Send a write command with an argument.

        :param cmd: the command letter(s), e.g. 'J'
        :param value: the value to be sent
        :param scale: when given, the integer round(value / scale) is sent;
            when None, value is sent as it is (formatted with str())
        :raises CommunicationFailedError: on an empty reply or a reply not
            starting with the first character of cmd
        """
        # scale=None is the default and is used by many callers: dividing by
        # None would raise a TypeError, so the value is passed through instead
        arg = value if scale is None else round(value / scale)
        try:
            self.communicate('C3')
            reply = self.communicate(f'{cmd}{arg}')
            if not reply or reply[0] != cmd[0]:
                raise CommunicationFailedError(f'bad reply: {reply}')
        finally:
            # always sent, even when the command above failed
            self.communicate('C0')

    def command(self, *cmds):
        """Send one or more commands without checking their replies.

        :param cmds: the complete command strings, sent in the given order
        """
        try:
            self.communicate('C3')
            for cmd in cmds:
                self.communicate(cmd)
        finally:
            # always sent, even when one of the commands failed
            self.communicate('C0')
|
|
|
|
|
|
class IPS_IO(StringIO):
    """communication with the oxford instruments power supply IPS120-10"""

    default_settings = dict(baudrate=9600)
    end_of_line = '\r'
    # the reply to 'V' contains instrument type and software version
    identification = [('V', r'IPS120-10.*')]
|
|
|
|
|
|
# values of the 'action' parameter of Field (argument of the 'A' command)
Action = Enum(hold=0, run_to_set=1, run_to_zero=2, clamped=4)

# first digit of the status code in the reply to 'X' -> (status code, text)
status_map = {
    '0': (IDLE, ''),
    '1': (ERROR, 'quenched'),
    '2': (ERROR, 'overheated'),
    '4': (WARN, 'warming up'),
    '8': (ERROR, ''),
}

# second digit of the status code in the reply to 'X' -> (status code, text)
limit_map = {
    '0': (IDLE, ''),
    '1': (WARN, 'on positive voltage limit'),
    '2': (WARN, 'on negative voltage limit'),
    '4': (ERROR, 'outside negative current limit'),
    '8': (ERROR, 'outside positive current limit'),
}
|
|
|
|
|
|
class Field(OxBase, Magfield):
    """magnetic field driven by an IPS120-10 power supply

    read commands:
        R1   measured power supply voltage (V)
        R7   demand field (output field) (T)
        R8   setpoint (target field) (T)
        R9   sweep field rate (T/min)
        R18  persistent field (T)
        X    Status

    control commands:
        A    set activity
        T    set field sweep rate
        H    set switch heater
        J    set target field
    """

    ioClass = IPS_IO

    action = Parameter('action', EnumType(Action), readonly=False)
    setpoint = Parameter('field setpoint', FloatRange(unit='T'), default=0)
    voltage = Parameter('leads voltage', FloatRange(unit='V'), default=0)
    persistent_field = Parameter(
        'persistent field at last switch off', FloatRange(unit='T'), readonly=False)
    wait_switch_on = Parameter(default=15)
    wait_switch_off = Parameter(default=15)
    wait_stable_field = Parameter(default=10)
    forced_persistent_field = Parameter(
        'manual indication that persistent field is bad', BoolType(), readonly=False, default=False)
    switch_heater = Parameter('turn switch heater on/off', EnumType(off=0, on=1, forced=2), default=0)

    # True when the persistent field reported by the power supply differs
    # from the internally remembered one (see read_value)
    _field_mismatch = None
    __persistent_field = None  # internal value of persistent field
    _status = '00'  # the two status digits of the last reply to 'X'

    def initModule(self):
        """put the power supply on hold at startup, a failure is only logged"""
        super().initModule()
        try:
            self.write_action(Action.hold)
        except Exception as e:
            # deliberately best effort: startup must not fail because of this
            self.log.error('can not set to hold %r', e)

    def doPoll(self):
        """regular poll, reading the leads current in addition"""
        super().doPoll()
        self.read_current()

    def initialReads(self):
        """initialize the switch times before the first reads"""
        # on restart, assume switch is changed long time ago, if not, the
        # power supply will complain and this will be handled in
        # start_ramp_to_field
        self.switch_on_time = 0
        self.switch_off_time = 0
        super().initialReads()

    def read_value(self):
        """return the field in the magnet

        With the switch heater on, this is the demand field (R7). Else the
        internally remembered persistent field is returned, and a deviation
        of the persistent field reported by the power supply (R18) by more
        than 10 times the tolerance is flagged in _field_mismatch.
        """
        if self.switch_heater:
            self.__persistent_field = self.query('R7')
            self.forced_persistent_field = False
            self._field_mismatch = False
            return self.__persistent_field
        pf = self.query('R18')
        if self.__persistent_field is None:
            # nothing remembered yet: trust the power supply
            self.__persistent_field = pf
            self._field_mismatch = False
        else:
            self._field_mismatch = abs(self.__persistent_field - pf) > self.tolerance * 10
        self.persistent_field = self.__persistent_field
        return self.__persistent_field

    def read_ramp(self):
        """read the sweep field rate (R9)"""
        return self.query('R9')

    def write_ramp(self, value):
        """set the sweep field rate and return the value read back"""
        self.change('T', value)
        return self.read_ramp()

    def write_action(self, value):
        """set the activity and update the status from the power supply"""
        self.change('A', int(value))
        self.read_status()

    def read_voltage(self):
        """read the measured power supply voltage (R1)"""
        return self.query('R1')

    def read_setpoint(self):
        """read the setpoint / target field (R8)"""
        return self.query('R8')

    def read_current(self):
        """read the demand field / output field (R7)"""
        return self.query('R7')

    def write_persistent_field(self, value):
        """overwrite the internally remembered persistent field

        This is accepted only when forced_persistent_field is set or when
        value is within 10 times the tolerance of the remembered value.

        :raises BadValueError: when the change is not accepted
        """
        known = self.__persistent_field
        # known is None as long as the field was never read: there is nothing
        # to compare with, so this is treated like a too big change instead
        # of failing with a TypeError
        if self.forced_persistent_field or (
                known is not None and abs(known - value) <= self.tolerance * 10):
            self._field_mismatch = False
            self.__persistent_field = value
            return value
        raise BadValueError('changing persistent field needs forced_persistent_field=True')

    def write_target(self, target):
        """start changing the field, refused while the persistent field is doubtful

        :raises BadValueError: when a persistent field mismatch was detected;
            forced_persistent_field is then set, so that the persistent field
            may be overwritten by the user
        """
        if self._field_mismatch:
            self.forced_persistent_field = True
            raise BadValueError('persistent field does not match - set persistent field to guessed value first')
        return super().write_target(target)

    def read_switch_heater(self):
        """the switch heater state is updated by read_status"""
        self.read_status()
        return self.switch_heater

    def read_status(self):
        """read and decode the reply to 'X'

        Updates _status, action and switch_heater as a side effect.

        :raises CommunicationFailedError: when the reply has an unexpected format
        """
        status = self.communicate('X')
        match = re.match(r'X(\d\d)A(\d)C\dH(\d)M\d\dP\d\d', status)
        if match is None:
            raise CommunicationFailedError(f'unexpected status: {status}')
        self._status = match.group(1)
        self.action = int(match.group(2))
        self.switch_heater = match.group(3) == '1'
        if self._status[0] != '0':
            # see status_map: the state machine is stopped in these cases
            self._state_machine.stop()
            return status_map.get(self._status[0], (ERROR, f'bad status: {self._status}'))
        if self._status[1] != '0':
            # TODO: need to stop the state machine here too?
            return limit_map.get(self._status[1], (ERROR, f'bad status: {self._status}'))
        return super().read_status()

    def write_switch_heater(self, value):
        """switch the heater, unless it is already in the requested state"""
        if value == self.read_switch_heater():
            self.log.info('switch heater already %r', value)
            # we do not want to restart the timer
            return value
        self.log.debug('switch time fixed for 10 sec')
        self.change('H', int(value))
        return int(value)

    def set_and_go(self, value):
        """set the target field and start running to it

        The asserts are caught as AssertionError by some of the callers.
        NOTE: they are stripped when python runs with -O.
        """
        self.change('J', value)
        # read back the setpoint (R8) and not the demand field (R7), which
        # is still at its previous value right after a change
        self.setpoint = self.read_setpoint()
        assert self.write_action(Action.hold) == Action.hold
        assert self.write_action(Action.run_to_set) == Action.run_to_set

    def ramp_to_target(self, sm):
        """state: ramping to target, retried on hardware errors

        The number of retries left is kept in sm.try_cnt.
        """
        try:
            return super().ramp_to_target(sm)
        except HardwareError:
            sm.try_cnt -= 1
            if sm.try_cnt < 0:
                raise
            self.set_and_go(sm.target)
            return Retry

    def final_status(self, *args, **kwds):
        """put the power supply on hold before finishing"""
        self.write_action(Action.hold)
        return super().final_status(*args, **kwds)

    def on_restart(self, sm):
        """put the power supply on hold before restarting the state machine"""
        self.write_action(Action.hold)
        return super().on_restart(sm)

    def start_ramp_to_field(self, sm):
        """state: start ramping the leads to the persistent field"""
        if abs(self.current - self.__persistent_field) <= self.tolerance:
            self.log.info('leads %g are already at %g', self.current, self.__persistent_field)
            return self.ramp_to_field
        try:
            self.set_and_go(self.__persistent_field)
        except (HardwareError, AssertionError) as e:
            if self.switch_heater:
                self.log.warning('switch is already on!')
                return self.ramp_to_field
            self.log.warning('wait first for switch off current=%g pf=%g %r',
                             self.current, self.__persistent_field, e)
            sm.after_wait = self.ramp_to_field
            return self.wait_for_switch
        return self.ramp_to_field

    def start_ramp_to_target(self, sm):
        """state: start ramping to the target, allowing for 5 retries"""
        sm.try_cnt = 5
        try:
            self.set_and_go(sm.target)
        except (HardwareError, AssertionError) as e:
            self.log.warning('switch not yet ready %r', e)
            self.status = Status.PREPARING, 'wait for switch on'
            sm.after_wait = self.ramp_to_target
            return self.wait_for_switch
        return self.ramp_to_target

    def ramp_to_field(self, sm):
        """state: ramping to the persistent field, retried on hardware errors"""
        try:
            return super().ramp_to_field(sm)
        except HardwareError:
            sm.try_cnt -= 1
            if sm.try_cnt < 0:
                raise
            self.set_and_go(self.__persistent_field)
            return Retry

    def wait_for_switch(self, sm):
        """state: retry set_and_go until accepted, then continue with sm.after_wait"""
        if not sm.delta(10):
            return Retry
        try:
            self.log.warning('try again')
            # try again
            self.set_and_go(self.__persistent_field)
        except (HardwareError, AssertionError):
            return Retry
        return sm.after_wait

    def wait_for_switch_on(self, sm):
        """state: waiting for the switch to open, restarted when the heater is off"""
        self.read_switch_heater()  # trigger switch_on/off_time
        if self.switch_heater == self.switch_heater.off:
            if sm.init:  # avoid too many states chained
                return Retry
            self.log.warning('switch turned off manually?')
            return self.start_switch_on
        return super().wait_for_switch_on(sm)

    def wait_for_switch_off(self, sm):
        """state: waiting for the switch to close, restarted when the heater is on"""
        self.read_switch_heater()
        if self.switch_heater == self.switch_heater.on:
            if sm.init:  # avoid too many states chained
                return Retry
            self.log.warning('switch turned on manually?')
            return self.start_switch_off
        return super().wait_for_switch_off(sm)

    def start_ramp_to_zero(self, sm):
        """state: start ramping the leads to zero"""
        pf = self.query('R18')
        if abs(pf - self.value) > self.tolerance * 10:
            self.log.warning('persistent field %g does not match %g after switch off', pf, self.value)
        try:
            assert self.write_action(Action.hold) == Action.hold
            assert self.write_action(Action.run_to_zero) == Action.run_to_zero
        except (HardwareError, AssertionError) as e:
            self.log.warning('switch not yet ready %r', e)
            self.status = Status.PREPARING, 'wait for switch off'
            sm.after_wait = self.ramp_to_zero
            return self.wait_for_switch
        return self.ramp_to_zero

    def ramp_to_zero(self, sm):
        """state: ramping the leads to zero, retried on hardware errors"""
        try:
            return super().ramp_to_zero(sm)
        except HardwareError:
            sm.try_cnt -= 1
            if sm.try_cnt < 0:
                raise
            assert self.write_action(Action.hold) == Action.hold
            assert self.write_action(Action.run_to_zero) == Action.run_to_zero
            return Retry

    def write_trainmode(self, value):
        """send 'M5' for trainmode 'off' and 'M1' otherwise"""
        self.change('M', '5' if value == 'off' else '1')
|
|
|
|
|
|
class ILM_IO(StringIO):
    """communication with the oxford instruments level meter ILM200"""

    default_settings = dict(baudrate=9600)
    end_of_line = '\r'
    timeout = 5
    # the reply to 'V' contains instrument type and software version
    identification = [('V', r'ILM200.*')]
|
|
|
|
|
|
class Level(OxBase, Readable):
    """common base of the level channels

    X code: XcccSuuvvwwRzz
        ccc: one digit per channel, possible values are 0, 1, 2, 3, 9
        uu, vv, ww: channel status, 2 hex digits each
        zz: relay status
    """

    ioClass = ILM_IO

    value = Parameter('level', datatype=FloatRange(unit='%'))
    fast = Parameter('fast reading', datatype=BoolType())

    X_PATTERN = re.compile(r'X(\d)(\d)(\d)S([0-9A-F]{2})([0-9A-F]{2})([0-9A-F]{2})R\d\d$')
    CHANNEL = None  # channel number, to be defined in subclasses
    MEDIUM = None  # 'He' or 'N2', to be defined in subclasses
    _statusbits = None  # channel status of the last successful get_status

    def read_value(self):
        """read the level, which is sent in units of 0.1 %"""
        return self.query(f'R{self.CHANNEL}', 0.1)

    def write_fast(self, fast):
        """send 'T<channel>' for fast or 'S<channel>' for slow"""
        letter = 'T' if fast else 'S'
        self.command(f'{letter}{self.CHANNEL}')

    def get_status(self):
        """read the reply to 'X' and check the configuration of the channel

        On success the channel status is stored in _statusbits.

        :return: None on success, else a tuple (ERROR, <text>)
        """
        reply = self.communicate('X')
        match = self.X_PATTERN.match(reply)
        if not match:
            return ERROR, f'bad status message {reply}'
        fields = match.groups()
        # NOTE(review): the index into fields is 0-based and CHANNEL is 1 or
        # 2 in the subclasses visible in this file, so this picks the 2nd or
        # 3rd digit (and below the 2nd or 3rd status byte) - confirm that
        # this is the intended mapping
        usage = fields[self.CHANNEL]
        if usage == '9':
            return ERROR, f'error on {self.MEDIUM} level channel (not connected?)'
        # '1': channel is used for N2
        if (usage == '1') != (self.MEDIUM == 'N2'):
            return ERROR, f'{self.MEDIUM} level channel not configured properly'
        self._statusbits = int(fields[self.CHANNEL + 3], 16)
        return None
|
|
|
|
|
|
class HeLevel(Level):
    """helium level, with switchable fast/slow reading"""

    CHANNEL = 1
    MEDIUM = 'He'

    value = Parameter('He level', FloatRange(unit='%'))
    fast = Parameter('switching fast/slow', datatype=BoolType(), readonly=False)

    def read_status(self):
        """return the error from get_status or IDLE with the channel status bits as text"""
        # get_status returns None on success, the bits are formatted only then
        return self.get_status() or (
            IDLE, formatStatusBits(self._statusbits, ['meas', 'fast', 'slow']))
|
|
|
|
|
|
class N2Level(Level):
    """nitrogen level"""

    ioClass = ILM_IO

    CHANNEL = 2
    MEDIUM = 'N2'

    value = Parameter('N2 level', FloatRange(unit='%'))

    def read_status(self):
        """return the error from get_status or IDLE without text"""
        # get_status returns None on success
        return self.get_status() or (IDLE, '')
|
|
|
|
|
|
# names of the solenoid valves and pumps with their numbers, used as
# datatype of Valve.addr
VALVE_MAP = dict(
    V9=1,
    V8=2,
    V7=3,
    V11A=4,
    V13A=5,
    V13B=6,
    V11B=7,
    V12B=8,
    rotary_pump_He4=9,
    V1=10,
    V5=11,
    V4=12,
    V3=13,
    V14=14,
    V10=15,
    V2=16,
    V2A_He4=17,
    V1A_He4=18,
    V5A_He4=19,
    V4A_He4=20,
    V3A_He4=21,
    roots_pump=22,
    unlabeled_pump=23,
    rotary_pump_He3=24,
)
|
|
|
|
|
|
class IGH_IO(StringIO):
    """communication with the oxford instruments dilution gas handling Kelvinox IGH

    X code: XxAaCcPpppSsOoEe
        x     motorized valves are still initializing
        a     mix heater activity
        c     control status (0, 1, 2, 3)
        pppp  4 hex numbers (two digits each), state of solenoid valves and pumps
        s     hex digit, state of the 3 motorized valves
        o     still and sorb heater information
        e     mix heater power range

    The decoded items are kept in attributes read by the modules using this
    io. They are refreshed on each poll.
    """

    default_settings = dict(baudrate=9600)
    end_of_line = '\r'
    identification = [('V', r'IGH.*')]

    X_PATTERN = re.compile(r'X(\d)A(\d)C\dP([0-9A-F]{8})S([0-9A-F])O(\d)E(\d)$')

    _ini_valves = 0  # ini status of motorized valves
    _mix_status = 0
    _valves = 0  # status of solenoid valves and pumps
    _motor_status = 0
    _heater_status = 0
    _heater_range = 0

    def doPoll(self):
        """read and decode the reply to 'X'

        On a reply with an unexpected format the previous values are kept.
        """
        parsed = self.X_PATTERN.match(self.communicate('X'))
        if not parsed:
            return
        x, a, p, s, o, e = parsed.groups()
        self._ini_valves = int(x, 16)
        self._mix_status = int(a)
        self._valves = int(p, 16)
        self._motor_status = int(s, 16)
        self._heater_status = int(o)
        self._heater_range = int(e)
|
|
|
|
|
|
class Valve(OxBase, Writable):
    """a solenoid valve, selected by its name given in addr"""

    ioClass = IGH_IO

    value = Parameter('state of valve (open or close)', datatype=EnumType(open=1, close=0))
    target = Parameter('open or close valve', datatype=EnumType(open=1, close=0))
    addr = Property('valve name', datatype=EnumType(VALVE_MAP))

    def read_value(self):
        """return the bit of this valve in the state polled by the io module"""
        # bit 0 belongs to the valve with number 1
        position = self.addr.value - 1
        return bit(self.io._valves, position)

    def write_target(self, target):
        """open or close the valve"""
        # open: 2N, close: 2N + 1
        offset = 1 - int(target)
        self.change('P', 2 * self.addr.value + offset, 1)
|
|
|
|
|
|
class PulsedValve(Valve):
    """a valve which is closed again after the time given in delay"""

    delay = Parameter('delay (time valve is open)', FloatRange(unit='s'), readonly=False)

    _start = 0  # time of the last opening, 0 when no pulse is pending

    def write_target(self, target):
        """open or close the valve, opening starts a pulse with fast polling"""
        if not target:
            self.setFastPoll(False)
        else:
            self._start = time.time()
            self.setFastPoll(True, 0.01)
        # open: 2N, close: 2N + 1
        offset = 1 - int(target)
        self.change('P', 2 * self.addr.value + offset, 1)

    def doPoll(self):
        """close the valve when the delay since opening is over"""
        super().doPoll()
        if self._start and time.time() > self._start + self.delay:
            self.write_target(0)
            self._start = 0
|
|
|
|
|
|
class MotorValve(OxBase, Writable):
    """motorized valve V12A

    The position is not read from the device: value is a copy of target.
    """

    ioClass = IGH_IO

    target = Parameter('target of motor valve', datatype=FloatRange(0, 100, unit='%'))
    value = Parameter('position of fast valve', datatype=FloatRange(0, 100, unit='%'))

    def write_target(self, target):
        """send the new position (in units of 0.1 %) and take it over as value"""
        self.change('H', target, 0.1)  # valve V12A
        self.value = target

    def read_value(self):
        """return the target, as no position is read back"""
        return self.target

    def read_status(self):
        """BUSY while bit 1 of the ini status is set, value is then set to 0"""
        if not bit(self.io._ini_valves, 1):
            return IDLE, ''
        self.value = 0
        return BUSY, 'valve V12A is initializing'
|
|
|
|
|
|
class SlowMotorValve(OxBase, Drivable):
    """motorized valve V6

    While the valve is moving, its position is estimated from the elapsed
    time, advancing 100 % in 300 s.
    """

    ioClass = IGH_IO

    target = Parameter('target of slow motor valve', datatype=FloatRange(0, 100, unit='%', fmtstr='%.1f'))
    value = Parameter('position of slow valve', datatype=FloatRange(0, 100, unit='%', fmtstr='%.1f'))

    _prev_time = 0  # time of the previous read_status, 0 before the first one

    def read_target(self):
        """read the target (R7, in units of 0.1 %)"""
        return self.query('R7', 0.1)

    def write_target(self, target):
        """send the new target (in units of 0.1 %) and update the status"""
        self.change('G', target, 0.1)  # valve V6
        self.read_status()

    def read_status(self):
        """determine the status and update the estimated position in value"""
        if bit(self.io._ini_valves, 0):
            self.value = 0
            return BUSY, 'valve V6 is initializing'
        now = time.time()
        if self._prev_time == 0:
            # first call: the valve is assumed to be at the target
            self.value = self.read_target()
            elapsed = 0
        else:
            elapsed = now - self._prev_time
        self._prev_time = now
        if not self.io._motor_status & 1:
            # not moving: the target is reached
            self.value = self.target
            return IDLE, ''
        step = elapsed / 300 * 100
        if self.target > self.value:
            self.value = min(self.target, self.value + step)
        else:
            self.value = max(self.target, self.value - step)
        return BUSY, 'valve V6 is moving'

    def stop(self):
        """stop moving"""
        # the estimated position becomes the new target
        self.write_target(self.value)
|
|
|
|
|
|
# names of the pressure gauges with the number of their read command,
# used as datatype of Pressure.addr
GAUGE_MAP = dict(
    G1=14,
    G2=15,
    G3=16,
    P1=20,
    P2=21,
)
|
|
|
|
|
|
class Pressure(OxBase, Readable):
    """a pressure gauge, selected by its name given in addr"""

    addr = Property('pressure gauge address', datatype=EnumType(GAUGE_MAP))

    def read_value(self):
        """read the gauge with command R<number>

        The reply is scaled by 0.1 for the gauges with a name starting
        with 'G' and by 1 for the others.
        """
        scale = 0.1 if self.addr.name.startswith('G') else 1
        return self.query(f'R{self.addr.value}', scale)
|
|
|
|
|
|
class MixPower(OxBase, Writable):
    """mix heater power

    The power is sent as a number of up to 1999 counts. The unit of a count
    depends on the power range 1..5: 1e-9 W for range 1, ten times more for
    each following range.
    """

    ioClass = IGH_IO

    target = Parameter('mix power', datatype=FloatRange(0, 0.02, unit='W'))
    value = Parameter('mix power', datatype=FloatRange(0, 0.02, unit='W'))

    def read_value(self):
        """read the power (R4), scaled depending on the range polled by the io module"""
        scale = 10**-(7 - self.io._heater_range)
        return self.query('R4', scale)

    def write_target(self, target):
        """set the heater power, a target of 0 switches the heater off

        The most sensitive power range able to hold the target is selected
        before the power is sent.
        """
        if not target:
            self.command('A0')  # turn off
            return
        self.command('A1')  # on, fixed heater power
        target = min(0.01999, target)
        # Take the first range where the rounded number of counts does not
        # exceed 1999. The decision is based on numbers: comparing strings
        # of digits selects a too coarse range for targets below 1000 nW
        # with a first digit of 2 or higher (3 nW would be sent as 0).
        # With the target limited as above, range 5 always fits.
        for range_mix in range(1, 6):
            scale = 10**-(10 - range_mix)
            if round(target / scale) <= 1999:
                break
        self.command(f'E{range_mix}')
        self.change('M', target, scale)

    def read_status(self):
        """always IDLE, the text tells whether the heater is on"""
        if self.io._mix_status:
            return IDLE, 'on'
        return IDLE, 'off'
|
|
|
|
|
|
class SorbPower(OxBase, Writable):
    """sorb heater power

    heater status:
        bit 0  still on
        bit 1  sorb in temperature control (this ctr mode is not used)
        bit 2  sorb in power control
    """

    ioClass = IGH_IO

    target = Parameter('sorb power', datatype=FloatRange(0, 2, unit='W'))  # values 0.001, 2
    writecmd = 'B'  # in units of 1mW (range 0000 to 1999)
    scale = 1e-3

    def read_value(self):
        """read the power (R6), or return 0 when the sorb heater is off"""
        if not self.io._heater_status & 6:
            return 0
        return self.query('R6', self.scale)

    def write_target(self, target):
        """select sorb power control (or sorb off for target 0) and send the power"""
        # keep the still bit, bit 2 selects sorb power control
        still_bit = self.io._heater_status & 1
        sorb_bit = 4 if target > 0 else 0
        self.change('O', still_bit | sorb_bit, 1)
        self.change('B', target, self.scale)

    def read_status(self):
        """WARN in temperature control mode, else IDLE with the text 'on' or 'off'"""
        sorb_bits = self.io._heater_status & 6
        if sorb_bits == 2:
            return WARN, 'sorb in temperature control mode'
        if sorb_bits:
            return IDLE, 'on'
        return IDLE, 'off'
|
|
|
|
|
|
class StillPower(OxBase, Writable):
    """still heater power

    heater status:
        bit 0  still on
        bit 1  sorb in temperature control (this ctr mode is not used)
        bit 2  sorb in power control
    """

    ioClass = IGH_IO

    target = Parameter('still power', datatype=FloatRange(0, 0.2, unit='W'))
    readcmd = 'R5'
    writecmd = 'S'  # in units of 0.1mW (range 0000 to 1999)
    scale = 1e-4

    def read_value(self):
        """read the power (R5), or return 0 when the still heater is off"""
        if not self.io._heater_status & 1:
            return 0
        return self.query('R5', self.scale)

    def write_target(self, target):
        """switch the still heater on (or off for target 0) and send the power"""
        # keep the sorb bits, bit 0 switches the still heater
        sorb_bits = self.io._heater_status & 6
        still_bit = 1 if target > 0 else 0
        self.change('O', sorb_bits | still_bit, 1)
        self.change('S', target, self.scale)

    def read_status(self):
        """always IDLE, the text tells whether the still heater is on"""
        still_on = self.io._heater_status & 1
        if still_on:
            return IDLE, 'on'
        return IDLE, 'off'
|
|
|
|
|
|
class N2Sensor(Readable):
    """a sensor with a value in K

    No read method is defined here, the value has to be set from outside.
    """

    value = Parameter(datatype=FloatRange(unit='K'))
|
|
|
|
|
|
class Pump(Valve):
    """a pump, reporting its state together with the values of two attached modules"""

    value = Parameter('state of valve (open or close)', datatype=EnumType(on=1, off=0))
    target = Parameter('open or close valve', datatype=EnumType(on=1, off=0))
    upper_LN2 = Attached()
    lower_LN2 = Attached()

    PATTERN = re.compile(r'\?\{(\d),(\d+),(\d+)\}')

    def read_value(self):
        """send '{r}' and decode the reply

        The second and third number of the reply, multiplied by 0.1, are
        stored as value of the attached modules upper_LN2 and lower_LN2.

        :return: the first number of the reply
        :raises CommunicationFailedError: when the reply has an unexpected format
        """
        parsed = self.PATTERN.match(self.communicate('{r}'))
        if not parsed:
            raise CommunicationFailedError('bad reply to {r}')
        state, upper, lower = parsed.groups()
        self.upper_LN2.value = 0.1 * int(upper)
        self.lower_LN2.value = 0.1 * int(lower)
        return int(state)

    def read_target(self):
        """return the bit of this pump in the state polled by the io module"""
        # hex -> int -> check if bit in bin(integer) is set at the addr position
        position = self.addr.value - 1
        return bit(self.io._valves, position)

    def write_target(self, target):
        """switch the pump and take over the target as value"""
        # open: 2 * 24, close: 2 * 24 + 1
        number = 2 * self.addr.value + 1 - target
        self.change('P', number, 1)
        self.value = target

    def read_status(self):
        """WARN when the pump should be on but is off, else IDLE"""
        if not self.target or self.value:
            return IDLE, ''
        return WARN, 'pump switched off'
|