diff --git a/frappy_psi/oxinst.py b/frappy_psi/oxinst.py new file mode 100644 index 0000000..69fa539 --- /dev/null +++ b/frappy_psi/oxinst.py @@ -0,0 +1,276 @@ +# ***************************************************************************** +# This program is free software; you can redistribute it and/or modify it under +# the terms of the GNU General Public License as published by the Free Software +# Foundation; either version 2 of the License, or (at your option) any later +# version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more +# details. +# +# You should have received a copy of the GNU General Public License along with +# this program; if not, write to the Free Software Foundation, Inc., +# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +# +# Module authors: +# Markus Zolliker +# ***************************************************************************** +"""oxford instruments old devices (ILM, IGH, IPS)""" + +import re +import time +from frappy.core import StringIO, HasIO, Readable, Parameter, ERROR, IDLE, PREPARING +from frappy.datatypes import FloatRange, BoolType, EnumType +from frappy.errors import HardwareError, RangeError +from frappy.lib import formatStatusBits, clamp +from frappy.lib.enum import Enum +from frappy_psi.magfield import Magfield +from frappy.states import Retry + + +class IlmIO(StringIO): + end_of_line = '\r' + identification = [('V', r'ILM200.*')] + timeout = 5 + + +class OxiBase(HasIO): + def query(self, cmd, dig=0): + reply = self.communicate(cmd) + if reply[0] == cmd[0]: + if '.' not in reply and dig > 0: + # add decimal point if not already there (for older systems) + reply = f'{reply[1:-dig]}.{reply[-dig:]}' + try: + value = float(reply) + return value + except Exception: + pass + raise HardwareError(f'bad reply {reply!r} to {cmd!r}') + + def command(self, *cmds): + try: + self.communicate('C3') + for cmd in cmds: + self.communicate(cmd) + finally: + self.communicate('C0') + + def change(self, cmd, query, value): + try: + self.communicate('C3') + self.communicate(f'{cmd}{value:g}') + return self.query(query) + finally: + self.communicate('C0') + + +class Level(OxiBase, Readable): + ioClass = IlmIO + value = Parameter(datatype=FloatRange(unit='%')) + CHANNEL = None + XPAT = re.compile(r'X(\d)(\d)(\d)S([0-9A-F]{2}){3}R\d\d$') + FLUID = None + _statusbits = None + + def read_value(self): + return self.query(f'R{self.CHANNEL}', 1) + + def write_fast(self, fast): + self.command(f'T{self.CHANNEL}' if fast else f'S{self.CHANNEL}') + + def get_status(self): + reply = self.communicate('X') + match = self.XPAT.match(reply) + if match: + statuslist = match.groups() + if statuslist[self.CHANNEL] == '9': + return ERROR, f'error on {self.FLUID} level channel (not connected?)' + if statuslist[self.CHANNEL] != '2': + return ERROR, f'{self.FLUID} level channel not configured properly' + self._statusbits = int(statuslist[self.CHANNEL + 3], 16) + return None + return ERROR, f'bad status message {reply}' + + +class HeLevel: + CHANNEL = 1 + FLUID = 'He' + fast = Parameter('measuring mode: True is fast', BoolType()) + + def read_status(self): + status = self.get_status() + if status is not None: + return status + return IDLE, formatStatusBits(self._statusbits, ['meas', 'fast', 'slow']) + + +class N2Level: + CHANNEL = 2 + MEDIUM = 'N2' + + def read_status(self): + status = self.get_status() + if status is not None: + return status + return IDLE, '' + + +A = Enum(hold=0, run_to_set=1, run_to_zero=2, clamped=4) + + +class Field(OxiBase, Magfield): + action = Parameter('action', EnumType(A), readonly=False) + setpoint = Parameter('field setpoint', FloatRange(unit='T'), default=0) + voltage = Parameter('leads voltage', FloatRange(unit='V'), default=0) + atob = Parameter('field to amp', FloatRange(0, unit='A/T'), default=0) + working_ramp = Parameter('effective ramp', FloatRange(0, unit='T/min'), default=0) + persistent_field = Parameter( + 'persistent field at last switch off', FloatRange(unit='$'), readonly=False) + wait_switch_on = Parameter(default=60) + wait_switch_off = Parameter(default=60) + forced_persistent_field = Parameter( + 'manual indication that persistent field is bad', BoolType(), readonly=False, default=False) + + XPAT = re.compile(r'X(\d)(\d)A(\d)C\dH(\d)M(\d\d)P\d\d$') + + def startModule(self, start_events): + # on restart, assume switch is changed long time ago, if not, the mercury + # will complain and this will be handled in start_ramp_to_field + self.switch_on_time = 0 + self.switch_off_time = 0 + super().startModule(start_events) + + def read_value(self): + current = self.query('R7') + if self.switch_heater == self.switch_heater.on: + self.__persistent_field = current + self.forced_persistent_field = False + self._field_mismatch = False + return current + pf = self.query('R18') + if self.__persistent_field is None: + self.__persistent_field = pf + self._field_mismatch = False + else: + self._field_mismatch = abs(self.__persistent_field - pf) > self.tolerance * 10 + self.persistent_field = self.__persistent_field + return self.__persistent_field + + def read_current(self): + current = self.query('R2') + return current / self.atob + + def write_persistent_field(self, value): + if self.forced_persistent_field or abs(self.__persistent_field - value) <= self.tolerance * 10: + self._field_mismatch = False + self.__persistent_field = value + return value + raise RangeError('changing persistent field needs forced_persistent_field=True') + + def write_target(self, target): + if self._field_mismatch: + self.forced_persistent_field = True + raise RangeError('persistent field does not match - set persistent field to guessed value first') + return super().write_target(target) + + def read_status(self): + status = super().read_status() # from HasStates + reply = self.communicate('X') + match = self.XPAT.match(reply) + statuslist = match.group() + if statuslist[0] != '0': + return ERROR, formatStatusBits(int(statuslist[0]), + ['quenched', 'overheated', 'warming up', 'fault']) + # TODO: statuslist[1]: voltage / current limit status + self.action = int(statuslist[2]) + if statuslist[3] >= '4': + return ERROR, 'auto run-down' + self.switch_heater = statuslist[3] == '1' + if statuslist[3] == '5': + return ERROR, 'switch heater failure' + # TODO: sweep mode (fast, slow), sweep limits + return status + + def read_ramp(self): + return self.query('R9') + + def write_ramp(self, value): + return self.change('T', 'R9', value) + + def write_action(self, value): + return self.change(f'A{int(value)}') + + def read_voltage(self): + return self.query('R1') + + def read_working_ramp(self): + return self.query('R6') + + def read_setpoint(self): + return self.query('R8') + + def write_setpoint(self, value): + return self.change('J', 'R8', value) + + def write_switch_heater(self, value): + self.read_status() + if value == self.switch_heater: + self.log.info('switch heater already %r', value) + # we do not want to restart the timer + return value + self.command('H1') + + # inherit Magfield.start_field_change + + def start_ramp_to_field(self, sm): + if abs(self.current - self.__persistent_field) <= self.tolerance: + self.log.info('leads %g are already at %g', self.current, self.__persistent_field) + return self.ramp_to_field + self.read_value() + self.write_setpoint(self.__persistent_field) + self.write_action(A.run_to_set) + return self.ramp_to_field + + # inherit from Magfield: ramp_to_field, stabilize_current, start_switch_on + + def wait_for_switch_on(self, sm): + self.read_status() + if self.switch_heater == self.switch_heater.off: + if sm.init: # avoid too many states chained + return Retry + self.log.warning('switch turned off manually?') + return self.start_switch_on + return super().wait_for_switch_on(sm) # will eventually return start_ramp_to_target + + def start_ramp_to_target(self, sm): + self.write_action(A.run_to_set) + return self.ramp_to_target + + def ramp_to_target(self, sm): + step = self.ramp / 4 # step to be done in 15 seconds + # change the setpoint only gradually, ramping stoppes soon after connection is lost + self.write_setpoint(clamp(self.target, self.setpoint + step, self.setpoint - step)) + return super().ramp_to_target() # will eventually return stabilize_field + + # inherit from Magfield: stabilize_field, check_switch_off, start_switch_off + + def wait_for_switch_off(self, sm): + self.read_status() + if self.switch_heater == self.switch_heater.on: + if sm.init: # avoid too many states chained + return Retry + self.log.warning('switch turned on manually?') + return self.start_switch_off + return super().wait_for_switch_off(sm) # will eventually return start_ramp_to_zero + + def start_ramp_to_zero(self, sm): + self.write_action(A.run_to_zero) + return self.ramp_to_zero + + # inherit from Magfield: ramp_to_zero + + def final_status(self, *args, **kwds): + self.write_action(A.hold) + return super().final_status(*args, **kwds)