frappy_psi: add oiclassic (not finished)

This commit is contained in:
2025-12-10 17:10:54 +01:00
parent 75c3161035
commit 6ea8bc6e52

514
frappy_psi/oiclassic.py Normal file
View File

@@ -0,0 +1,514 @@
#!/usr/bin/env python
# *****************************************************************************
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Module authors:
# Markus Zolliker <markus.zolliker@psi.ch>
# Anik Stark <anik.stark@psi.ch>
# *****************************************************************************
"""older generation (classic) oxford instruments"""
import time
import re
from frappy.core import Parameter, Property, EnumType, FloatRange, IntRange, BoolType, \
StringIO, HasIO, Readable, Writable, Drivable, IDLE, WARN, ERROR
from frappy.lib import formatStatusBits
from frappy.lib.enum import Enum
from frappy.errors import BadValueError, HardwareError, CommunicationFailedError
from frappy_psi.magfield import Magfield, Status
from frappy.states import Retry
def bit(x, pos):
return bool(x & (1 << pos))
class Base(HasIO):
def query(self, cmd, scale=None):
reply = self.communicate(cmd)
if reply[0] != cmd[0]:
raise CommunicationFailedError(f'bad reply: {reply} to command {cmd}')
try:
return float(reply[1:]) * scale
except Exception:
pass
def change(self, cmd, value, scale=None):
try:
self.communicate('C3')
reply = self.communicate(f'{cmd}{round(value / scale)}')
if reply[0] != cmd[0]:
raise CommunicationFailedError(f'bad reply: {reply}')
finally:
self.communicate('C0')
def command(self, *cmds):
try:
self.communicate('C3')
for cmd in cmds:
self.communicate(cmd)
finally:
self.communicate('C0')
class IPS_IO(StringIO):
"""oxford instruments power supply IPS120-10"""
end_of_line = '\r'
identification = [('V', r'IPS120-10.*')] # instrument type and software version
default_settings = {'baudrate': 9600}
Action = Enum(hold=0, run_to_set=1, run_to_zero=2, clamped=4)
status_map = {'0': (IDLE, ''),
'1': (ERROR, 'quenched'),
'2': (ERROR, 'overheated'),
'4': (WARN, 'warming up'),
'8': (ERROR, '')
}
limit_map = {'0': (IDLE, ''),
'1': (WARN, 'on positive voltage limit'),
'2': (WARN, 'on negative voltage limit'),
'4': (ERROR, 'outside negative current limit'),
'8': (ERROR, 'outside positive current limit')
}
class Field(Base, Magfield):
""" read commands:
R1 measured power supply voltage (V)
R7 demand field (output field) (T)
R8 setpoint (target field) (T)
R9 sweep field rate (T/min)
R18 persistent field (T)
X Status
control commands:
A set activity
T set field sweep rate
H set switch heater
J set target field """
ioClass = IPS_IO
action = Parameter('action', EnumType(Action), readonly=False)
setpoint = Parameter('field setpoint', FloatRange(unit='T'), default=0)
voltage = Parameter('leads voltage', FloatRange(unit='V'), default=0)
persistent_field = Parameter(
'persistent field at last switch off', FloatRange(unit='T'), readonly=False)
wait_switch_on = Parameter(default=15)
wait_switch_off = Parameter(default=15)
wait_stable_field = Parameter(default=10)
forced_persistent_field = Parameter(
'manual indication that persistent field is bad', BoolType(), readonly=False, default=False)
switch_heater = Parameter('turn switch heater on/off', EnumType(off=0, on=1, forced=2), default=0)
_field_mismatch = None
__persistent_field = None # internal value of persistent field
_status = '00'
def initModule(self):
super().initModule()
try:
self.write_action(Action.hold)
except Exception as e:
self.log.error('can not set to hold %r', e)
def doPoll(self):
super().doPoll()
self.read_current()
def initialReads(self):
# on restart, assume switch is changed long time ago, if not, the mercury
# will complain and this will be handled in start_ramp_to_field
self.switch_on_time = 0
self.switch_off_time = 0
super().initialReads()
def read_value(self):
if self.switch_heater:
self.__persistent_field = self.query('R7')
self.forced_persistent_field = False
self._field_mismatch = False
return self.__persistent_field
pf = self.query('R18')
if self.__persistent_field is None:
self.__persistent_field = pf
self._field_mismatch = False
else:
self._field_mismatch = abs(self.__persistent_field - pf) > self.tolerance * 10
self.persistent_field = self.__persistent_field
return self.__persistent_field
def read_ramp(self):
return self.query('R9')
def write_ramp(self, value):
self.change('T', value)
return self.read_ramp()
def write_action(self, value):
self.change('A', int(value))
self.read_status()
def read_voltage(self):
return self.query('R1')
def read_setpoint(self):
return self.query('R8')
def read_current(self):
return self.query('R7')
def write_persistent_field(self, value):
if self.forced_persistent_field or abs(self.__persistent_field - value) <= self.tolerance * 10:
self._field_mismatch = False
self.__persistent_field = value
return value
raise BadValueError('changing persistent field needs forced_persistent_field=True')
def write_target(self, target):
if self._field_mismatch:
self.forced_persistent_field = True
raise BadValueError('persistent field does not match - set persistent field to guessed value first')
return super().write_target(target)
def read_switch_heater(self):
self.read_status()
return self.switch_heater
def read_status(self):
status = self.communicate('X')
match = re.match(r'X(\d\d)A(\d)C\dH(\d)M\d\dP\d\d', status)
if match is None:
raise CommunicationFailedError(f'unexpected status: {status}')
self._status = match.group(1)
self.action = int(match.group(2))
self.switch_heater = match.group(3) == '1'
if self._status[0] != '0':
self._state_machine.stop()
return status_map.get(self._status[0], (ERROR, f'bad status: {self._status}'))
if self._status[1] != '0':
return limit_map.get(self._status[1], (ERROR, f'bad status: {self._status}')) # need to stop sm too?
return super().read_status()
def write_switch_heater(self, value):
if value == self.read_switch_heater():
self.log.info('switch heater already %r', value)
# we do not want to restart the timer
return value
self.log.debug('switch time fixed for 10 sec')
self.change('H', int(value))
#return result
return int(value)
def set_and_go(self, value):
self.change('J', value)
self.setpoint = self.read_current()
assert self.write_action(Action.hold) == Action.hold
assert self.write_action(Action.run_to_set) == Action.run_to_set
def ramp_to_target(self, sm):
try:
return super().ramp_to_target(sm)
except HardwareError:
sm.try_cnt -= 1
if sm.try_cnt < 0:
raise
self.set_and_go(sm.target)
return Retry
def final_status(self, *args, **kwds):
self.write_action(Action.hold)
return super().final_status(*args, **kwds)
def on_restart(self, sm):
self.write_action(Action.hold)
return super().on_restart(sm)
def start_ramp_to_field(self, sm):
if abs(self.current - self.__persistent_field) <= self.tolerance:
self.log.info('leads %g are already at %g', self.current, self.__persistent_field)
return self.ramp_to_field
try:
self.set_and_go(self.__persistent_field)
except (HardwareError, AssertionError) as e:
if self.switch_heater:
self.log.warn('switch is already on!')
return self.ramp_to_field
self.log.warn('wait first for switch off current=%g pf=%g %r', self.current, self.__persistent_field, e)
sm.after_wait = self.ramp_to_field
return self.wait_for_switch
return self.ramp_to_field
def start_ramp_to_target(self, sm):
sm.try_cnt = 5
try:
self.set_and_go(sm.target)
except (HardwareError, AssertionError) as e:
self.log.warn('switch not yet ready %r', e)
self.status = Status.PREPARING, 'wait for switch on'
sm.after_wait = self.ramp_to_target
return self.wait_for_switch
return self.ramp_to_target
def ramp_to_field(self, sm):
try:
return super().ramp_to_field(sm)
except HardwareError:
sm.try_cnt -= 1
if sm.try_cnt < 0:
raise
self.set_and_go(self.__persistent_field)
return Retry
def wait_for_switch(self, sm):
if not sm.delta(10):
return Retry
try:
self.log.warn('try again')
# try again
self.set_and_go(self.__persistent_field)
except (HardwareError, AssertionError):
return Retry
return sm.after_wait
def wait_for_switch_on(self, sm):
self.read_switch_heater() # trigger switch_on/off_time
if self.switch_heater == self.switch_heater.off:
if sm.init: # avoid too many states chained
return Retry
self.log.warning('switch turned off manually?')
return self.start_switch_on
return super().wait_for_switch_on(sm)
def wait_for_switch_off(self, sm):
self.read_switch_heater()
if self.switch_heater == self.switch_heater.on:
if sm.init: # avoid too many states chained
return Retry
self.log.warning('switch turned on manually?')
return self.start_switch_off
return super().wait_for_switch_off(sm)
def start_ramp_to_zero(self, sm):
pf = self.query('R18')
if abs(pf - self.value) > self.tolerance * 10:
self.log.warning('persistent field %g does not match %g after switch off', pf, self.value)
try:
assert self.write_action(Action.hold) == Action.hold
assert self.write_action(Action.run_to_zero) == Action.run_to_zero
except (HardwareError, AssertionError) as e:
self.log.warn('switch not yet ready %r', e)
self.status = Status.PREPARING, 'wait for switch off'
sm.after_wait = self.ramp_to_zero
return self.wait_for_switch
return self.ramp_to_zero
def ramp_to_zero(self, sm):
try:
return super().ramp_to_zero(sm)
except HardwareError:
sm.try_cnt -= 1
if sm.try_cnt < 0:
raise
assert self.write_action(Action.hold) == Action.hold
assert self.write_action(Action.run_to_zero) == Action.run_to_zero
return Retry
def write_trainmode(self, value):
self.change('M', '5' if value == 'off' else '1')
class ILM_IO(StringIO):
"""oxford instruments level meter ILM200"""
end_of_line = '\r'
identification = [('V', r'ILM200.*')] # instrument type and software version
default_settings = {'baudrate': 9600}
timeout = 5
class Level(Base, Readable):
""" X code: XcccSuuvvwwRzz
c: position corresponds to channel 1, 2, 3
possible values in each position are 0, 1, 2, 3, 9
vv, uu, ww: channel status for channel 1, 2, 3 respectively, 2 bits each
zz: relay status """
ioClass = ILM_IO
value = Parameter('level', datatype=FloatRange(unit='%'))
CHANNEL = None
X_PATTERN = re.compile(r'X(\d)(\d)(\d)S([0-9A-F]{2})([0-9A-F]{2})([0-9A-F]{2})R\d\d$')
MEDIUM = None
_statusbits = None
def read_value(self):
return self.query(f'R{self.CHANNEL}', 0.1)
def write_fast(self, fast):
self.command(f'T{self.CHANNEL}' if fast else f'S{self.CHANNEL}')
def get_status(self):
reply = self.communicate('X')
match = self.X_PATTERN.match(reply)
if match:
statuslist = match.groups()
if statuslist[self.CHANNEL] == '9':
return ERROR, f'error on {self.MEDIUM} level channel (not connected?)'
if (statuslist[self.CHANNEL] == '1') != (self.MEDIUM == 'N2'):
# '1': channel is used for N2
return ERROR, f'{self.MEDIUM} level channel not configured properly'
self._statusbits = int(statuslist[self.CHANNEL + 3], 16)
return None
return ERROR, f'bad status message {reply}'
class HeLevel(Level):
fast = Parameter('switching fast/slow', datatype=BoolType(), readonly=False)
CHANNEL = 1
MEDIUM = 'He'
def read_status(self):
status = self.get_status()
if status is not None:
return status
return IDLE, formatStatusBits(self._statusbits, ['meas', 'fast', 'slow'])
class N2Level(Level):
ioClass = ILM_IO
value = Parameter('N2 level', FloatRange(unit='%'))
CHANNEL = 2
MEDIUM = 'N2'
def read_status(self):
status = self.get_status()
if status is not None:
return status
return IDLE, ''
VALVE_MAP = {'01': 'V9',
'02': 'V8',
'03': 'V7',
'04': 'V11A',
'05': 'V13A',
'06': 'V13B',
'07': 'V11B',
'08': 'V12B',
'09': 'He4 rotary pump',
'10': 'V1',
'11': 'V5',
'12': 'V4',
'13': 'V3',
'14': 'V14',
'15': 'V10',
'16': 'V2',
'17': 'V2A He4',
'18': 'V1A He4',
'19': 'V5A He4',
'20': 'V4A He4',
'21': 'V3A He4',
'22': 'roots pump',
'23': 'unlabeled pump',
'24': 'He3 rotary pump',
}
class IGH_IO(StringIO):
"""oxford instruments dilution gas handling Kelvinox IGH"""
end_of_line = '\r'
identification = [('V', r'IGH.*')]
default_settings = {'baudrate': 9600}
X_PATTERN = re.compile(r'X(\d)A\dC\dP([0-9A-F]{2})([0-9A-F]{2})([0-9A-F]{2})([0-9A-F]{2})S[0-9A-F]O\dE\d$')
statusbits = 0
ini_valves = 0
def doPoll(self):
reply = self.communicate('X')
match = self.X_PATTERN.match(reply)
if match:
statuslist = match.groups()
# REVISE THE METHOD FROM HERE ON
# CHECK JUPYTER NOTEBOOK
# a hex digit indicating if motorized valves are still initializing
self.ini_valves = int(statuslist[0], 16)
class Valve(Base, Writable):
ioClass = IGH_IO
target = Parameter('open or close valve', EnumType(open=1, close=0))
addr = Property('valve number', IntRange(1,24))
def write_target(self, val):
# open: 2N, close: 2N + 1
self.change('P', (2 * self.addr + 1 - int(val)), 1)
def read_status(self):
# CHECK
status = self.io.statusbits & (1 << self.addr)
if status is not None:
return status
return IDLE, ''
class PulsedValve(Valve):
delay = Parameter('time valve is open', FloatRange(unit='s'))
_start = 0
def write_target(self, val):
if val:
self._start = time.time()
self.setFastPoll(True, 0.01)
else:
self.setFastPoll(False)
self.change('P', (2 * self.addr + 1 - int(val)), 1)
def doPoll(self):
super().doPoll()
if self._start:
if time.time() > self._start + self.delay:
self.write_target(False) # turn valve off
#self.setFastPoll(False)
self._start = 0
class MotorValve(Base, Drivable):
pass
class Pressure(Base, Readable):
pass
class Heater(Base, Writable):
pass
class N2Sensor():
# ask Markus
pass
class Pump(Base, Writable):
pass