# ***************************************************************************** # This program is free software; you can redistribute it and/or modify it under # the terms of the GNU General Public License as published by the Free Software # Foundation; either version 2 of the License, or (at your option) any later # version. # # This program is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS # FOR A PARTICULAR PURPOSE. See the GNU General Public License for more # details. # # You should have received a copy of the GNU General Public License along with # this program; if not, write to the Free Software Foundation, Inc., # 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA # # Module authors: # Markus Zolliker # ***************************************************************************** """oxford instruments old devices (ILM, IGH, IPS)""" import re import time from frappy.core import StringIO, HasIO, Readable, Parameter, ERROR, IDLE, PREPARING from frappy.datatypes import FloatRange, BoolType, EnumType from frappy.errors import HardwareError, RangeError from frappy.lib import formatStatusBits, clamp from frappy.lib.enum import Enum from frappy_psi.magfield import Magfield from frappy.states import Retry class IlmIO(StringIO): end_of_line = '\r' identification = [('V', r'ILM200.*')] timeout = 5 class OxiBase(HasIO): def query(self, cmd, dig=0): reply = self.communicate(cmd) if reply[0] == cmd[0]: if '.' not in reply and dig > 0: # add decimal point if not already there (for older systems) reply = f'{reply[1:-dig]}.{reply[-dig:]}' try: value = float(reply) return value except Exception: pass raise HardwareError(f'bad reply {reply!r} to {cmd!r}') def command(self, *cmds): try: self.communicate('C3') for cmd in cmds: self.communicate(cmd) finally: self.communicate('C0') def change(self, cmd, query, value): try: self.communicate('C3') self.communicate(f'{cmd}{value:g}') return self.query(query) finally: self.communicate('C0') class Level(OxiBase, Readable): ioClass = IlmIO value = Parameter(datatype=FloatRange(unit='%')) CHANNEL = None XPAT = re.compile(r'X(\d)(\d)(\d)S([0-9A-F]{2}){3}R\d\d$') FLUID = None _statusbits = None def read_value(self): return self.query(f'R{self.CHANNEL}', 1) def write_fast(self, fast): self.command(f'T{self.CHANNEL}' if fast else f'S{self.CHANNEL}') def get_status(self): reply = self.communicate('X') match = self.XPAT.match(reply) if match: statuslist = match.groups() if statuslist[self.CHANNEL] == '9': return ERROR, f'error on {self.FLUID} level channel (not connected?)' if statuslist[self.CHANNEL] != '2': return ERROR, f'{self.FLUID} level channel not configured properly' self._statusbits = int(statuslist[self.CHANNEL + 3], 16) return None return ERROR, f'bad status message {reply}' class HeLevel: CHANNEL = 1 FLUID = 'He' fast = Parameter('measuring mode: True is fast', BoolType()) def read_status(self): status = self.get_status() if status is not None: return status return IDLE, formatStatusBits(self._statusbits, ['meas', 'fast', 'slow']) class N2Level: CHANNEL = 2 MEDIUM = 'N2' def read_status(self): status = self.get_status() if status is not None: return status return IDLE, '' A = Enum(hold=0, run_to_set=1, run_to_zero=2, clamped=4) class Field(OxiBase, Magfield): action = Parameter('action', EnumType(A), readonly=False) setpoint = Parameter('field setpoint', FloatRange(unit='T'), default=0) voltage = Parameter('leads voltage', FloatRange(unit='V'), default=0) atob = Parameter('field to amp', FloatRange(0, unit='A/T'), default=0) working_ramp = Parameter('effective ramp', FloatRange(0, unit='T/min'), default=0) persistent_field = Parameter( 'persistent field at last switch off', FloatRange(unit='$'), readonly=False) wait_switch_on = Parameter(default=60) wait_switch_off = Parameter(default=60) forced_persistent_field = Parameter( 'manual indication that persistent field is bad', BoolType(), readonly=False, default=False) XPAT = re.compile(r'X(\d)(\d)A(\d)C\dH(\d)M(\d\d)P\d\d$') def startModule(self, start_events): # on restart, assume switch is changed long time ago, if not, the mercury # will complain and this will be handled in start_ramp_to_field self.switch_on_time = 0 self.switch_off_time = 0 super().startModule(start_events) def read_value(self): current = self.query('R7') if self.switch_heater == self.switch_heater.on: self.__persistent_field = current self.forced_persistent_field = False self._field_mismatch = False return current pf = self.query('R18') if self.__persistent_field is None: self.__persistent_field = pf self._field_mismatch = False else: self._field_mismatch = abs(self.__persistent_field - pf) > self.tolerance * 10 self.persistent_field = self.__persistent_field return self.__persistent_field def read_current(self): current = self.query('R2') return current / self.atob def write_persistent_field(self, value): if self.forced_persistent_field or abs(self.__persistent_field - value) <= self.tolerance * 10: self._field_mismatch = False self.__persistent_field = value return value raise RangeError('changing persistent field needs forced_persistent_field=True') def write_target(self, target): if self._field_mismatch: self.forced_persistent_field = True raise RangeError('persistent field does not match - set persistent field to guessed value first') return super().write_target(target) def read_status(self): status = super().read_status() # from HasStates reply = self.communicate('X') match = self.XPAT.match(reply) statuslist = match.group() if statuslist[0] != '0': return ERROR, formatStatusBits(int(statuslist[0]), ['quenched', 'overheated', 'warming up', 'fault']) # TODO: statuslist[1]: voltage / current limit status self.action = int(statuslist[2]) if statuslist[3] >= '4': return ERROR, 'auto run-down' self.switch_heater = statuslist[3] == '1' if statuslist[3] == '5': return ERROR, 'switch heater failure' # TODO: sweep mode (fast, slow), sweep limits return status def read_ramp(self): return self.query('R9') def write_ramp(self, value): return self.change('T', 'R9', value) def write_action(self, value): return self.change(f'A{int(value)}') def read_voltage(self): return self.query('R1') def read_working_ramp(self): return self.query('R6') def read_setpoint(self): return self.query('R8') def write_setpoint(self, value): return self.change('J', 'R8', value) def write_switch_heater(self, value): self.read_status() if value == self.switch_heater: self.log.info('switch heater already %r', value) # we do not want to restart the timer return value self.command('H1') # inherit Magfield.start_field_change def start_ramp_to_field(self, sm): if abs(self.current - self.__persistent_field) <= self.tolerance: self.log.info('leads %g are already at %g', self.current, self.__persistent_field) return self.ramp_to_field self.read_value() self.write_setpoint(self.__persistent_field) self.write_action(A.run_to_set) return self.ramp_to_field # inherit from Magfield: ramp_to_field, stabilize_current, start_switch_on def wait_for_switch_on(self, sm): self.read_status() if self.switch_heater == self.switch_heater.off: if sm.init: # avoid too many states chained return Retry self.log.warning('switch turned off manually?') return self.start_switch_on return super().wait_for_switch_on(sm) # will eventually return start_ramp_to_target def start_ramp_to_target(self, sm): self.write_action(A.run_to_set) return self.ramp_to_target def ramp_to_target(self, sm): step = self.ramp / 4 # step to be done in 15 seconds # change the setpoint only gradually, ramping stoppes soon after connection is lost self.write_setpoint(clamp(self.target, self.setpoint + step, self.setpoint - step)) return super().ramp_to_target() # will eventually return stabilize_field # inherit from Magfield: stabilize_field, check_switch_off, start_switch_off def wait_for_switch_off(self, sm): self.read_status() if self.switch_heater == self.switch_heater.on: if sm.init: # avoid too many states chained return Retry self.log.warning('switch turned on manually?') return self.start_switch_off return super().wait_for_switch_off(sm) # will eventually return start_ramp_to_zero def start_ramp_to_zero(self, sm): self.write_action(A.run_to_zero) return self.ramp_to_zero # inherit from Magfield: ramp_to_zero def final_status(self, *args, **kwds): self.write_action(A.hold) return super().final_status(*args, **kwds)