frappy/frappy_psi/oxinst.py
Markus Zolliker 8eaad86b66 WIP: old oxford devices (ILM, IPS, IGH...)
Change-Id: I4ca0dc6149d257818d300db4d886a1e33e8210be
2025-02-13 09:39:49 +01:00

277 lines
9.9 KiB
Python

# *****************************************************************************
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Module authors:
# Markus Zolliker <markus.zolliker@psi.ch>
# *****************************************************************************
"""oxford instruments old devices (ILM, IGH, IPS)"""
import re
import time
from frappy.core import StringIO, HasIO, Readable, Parameter, ERROR, IDLE, PREPARING
from frappy.datatypes import FloatRange, BoolType, EnumType
from frappy.errors import HardwareError, RangeError
from frappy.lib import formatStatusBits, clamp
from frappy.lib.enum import Enum
from frappy_psi.magfield import Magfield
from frappy.states import Retry
class IlmIO(StringIO):
end_of_line = '\r'
identification = [('V', r'ILM200.*')]
timeout = 5
class OxiBase(HasIO):
def query(self, cmd, dig=0):
reply = self.communicate(cmd)
if reply[0] == cmd[0]:
if '.' not in reply and dig > 0:
# add decimal point if not already there (for older systems)
reply = f'{reply[1:-dig]}.{reply[-dig:]}'
try:
value = float(reply)
return value
except Exception:
pass
raise HardwareError(f'bad reply {reply!r} to {cmd!r}')
def command(self, *cmds):
try:
self.communicate('C3')
for cmd in cmds:
self.communicate(cmd)
finally:
self.communicate('C0')
def change(self, cmd, query, value):
try:
self.communicate('C3')
self.communicate(f'{cmd}{value:g}')
return self.query(query)
finally:
self.communicate('C0')
class Level(OxiBase, Readable):
ioClass = IlmIO
value = Parameter(datatype=FloatRange(unit='%'))
CHANNEL = None
XPAT = re.compile(r'X(\d)(\d)(\d)S([0-9A-F]{2}){3}R\d\d$')
FLUID = None
_statusbits = None
def read_value(self):
return self.query(f'R{self.CHANNEL}', 1)
def write_fast(self, fast):
self.command(f'T{self.CHANNEL}' if fast else f'S{self.CHANNEL}')
def get_status(self):
reply = self.communicate('X')
match = self.XPAT.match(reply)
if match:
statuslist = match.groups()
if statuslist[self.CHANNEL] == '9':
return ERROR, f'error on {self.FLUID} level channel (not connected?)'
if statuslist[self.CHANNEL] != '2':
return ERROR, f'{self.FLUID} level channel not configured properly'
self._statusbits = int(statuslist[self.CHANNEL + 3], 16)
return None
return ERROR, f'bad status message {reply}'
class HeLevel:
CHANNEL = 1
FLUID = 'He'
fast = Parameter('measuring mode: True is fast', BoolType())
def read_status(self):
status = self.get_status()
if status is not None:
return status
return IDLE, formatStatusBits(self._statusbits, ['meas', 'fast', 'slow'])
class N2Level:
CHANNEL = 2
MEDIUM = 'N2'
def read_status(self):
status = self.get_status()
if status is not None:
return status
return IDLE, ''
A = Enum(hold=0, run_to_set=1, run_to_zero=2, clamped=4)
class Field(OxiBase, Magfield):
action = Parameter('action', EnumType(A), readonly=False)
setpoint = Parameter('field setpoint', FloatRange(unit='T'), default=0)
voltage = Parameter('leads voltage', FloatRange(unit='V'), default=0)
atob = Parameter('field to amp', FloatRange(0, unit='A/T'), default=0)
working_ramp = Parameter('effective ramp', FloatRange(0, unit='T/min'), default=0)
persistent_field = Parameter(
'persistent field at last switch off', FloatRange(unit='$'), readonly=False)
wait_switch_on = Parameter(default=60)
wait_switch_off = Parameter(default=60)
forced_persistent_field = Parameter(
'manual indication that persistent field is bad', BoolType(), readonly=False, default=False)
XPAT = re.compile(r'X(\d)(\d)A(\d)C\dH(\d)M(\d\d)P\d\d$')
def startModule(self, start_events):
# on restart, assume switch is changed long time ago, if not, the mercury
# will complain and this will be handled in start_ramp_to_field
self.switch_on_time = 0
self.switch_off_time = 0
super().startModule(start_events)
def read_value(self):
current = self.query('R7')
if self.switch_heater == self.switch_heater.on:
self.__persistent_field = current
self.forced_persistent_field = False
self._field_mismatch = False
return current
pf = self.query('R18')
if self.__persistent_field is None:
self.__persistent_field = pf
self._field_mismatch = False
else:
self._field_mismatch = abs(self.__persistent_field - pf) > self.tolerance * 10
self.persistent_field = self.__persistent_field
return self.__persistent_field
def read_current(self):
current = self.query('R2')
return current / self.atob
def write_persistent_field(self, value):
if self.forced_persistent_field or abs(self.__persistent_field - value) <= self.tolerance * 10:
self._field_mismatch = False
self.__persistent_field = value
return value
raise RangeError('changing persistent field needs forced_persistent_field=True')
def write_target(self, target):
if self._field_mismatch:
self.forced_persistent_field = True
raise RangeError('persistent field does not match - set persistent field to guessed value first')
return super().write_target(target)
def read_status(self):
status = super().read_status() # from HasStates
reply = self.communicate('X')
match = self.XPAT.match(reply)
statuslist = match.group()
if statuslist[0] != '0':
return ERROR, formatStatusBits(int(statuslist[0]),
['quenched', 'overheated', 'warming up', 'fault'])
# TODO: statuslist[1]: voltage / current limit status
self.action = int(statuslist[2])
if statuslist[3] >= '4':
return ERROR, 'auto run-down'
self.switch_heater = statuslist[3] == '1'
if statuslist[3] == '5':
return ERROR, 'switch heater failure'
# TODO: sweep mode (fast, slow), sweep limits
return status
def read_ramp(self):
return self.query('R9')
def write_ramp(self, value):
return self.change('T', 'R9', value)
def write_action(self, value):
return self.change(f'A{int(value)}')
def read_voltage(self):
return self.query('R1')
def read_working_ramp(self):
return self.query('R6')
def read_setpoint(self):
return self.query('R8')
def write_setpoint(self, value):
return self.change('J', 'R8', value)
def write_switch_heater(self, value):
self.read_status()
if value == self.switch_heater:
self.log.info('switch heater already %r', value)
# we do not want to restart the timer
return value
self.command('H1')
# inherit Magfield.start_field_change
def start_ramp_to_field(self, sm):
if abs(self.current - self.__persistent_field) <= self.tolerance:
self.log.info('leads %g are already at %g', self.current, self.__persistent_field)
return self.ramp_to_field
self.read_value()
self.write_setpoint(self.__persistent_field)
self.write_action(A.run_to_set)
return self.ramp_to_field
# inherit from Magfield: ramp_to_field, stabilize_current, start_switch_on
def wait_for_switch_on(self, sm):
self.read_status()
if self.switch_heater == self.switch_heater.off:
if sm.init: # avoid too many states chained
return Retry
self.log.warning('switch turned off manually?')
return self.start_switch_on
return super().wait_for_switch_on(sm) # will eventually return start_ramp_to_target
def start_ramp_to_target(self, sm):
self.write_action(A.run_to_set)
return self.ramp_to_target
def ramp_to_target(self, sm):
step = self.ramp / 4 # step to be done in 15 seconds
# change the setpoint only gradually, ramping stoppes soon after connection is lost
self.write_setpoint(clamp(self.target, self.setpoint + step, self.setpoint - step))
return super().ramp_to_target() # will eventually return stabilize_field
# inherit from Magfield: stabilize_field, check_switch_off, start_switch_off
def wait_for_switch_off(self, sm):
self.read_status()
if self.switch_heater == self.switch_heater.on:
if sm.init: # avoid too many states chained
return Retry
self.log.warning('switch turned on manually?')
return self.start_switch_off
return super().wait_for_switch_off(sm) # will eventually return start_ramp_to_zero
def start_ramp_to_zero(self, sm):
self.write_action(A.run_to_zero)
return self.ramp_to_zero
# inherit from Magfield: ramp_to_zero
def final_status(self, *args, **kwds):
self.write_action(A.hold)
return super().final_status(*args, **kwds)