From 6ea8bc6e52daae12bc0a47be1a262785ce0c45cb Mon Sep 17 00:00:00 2001 From: Anik Stark Date: Wed, 10 Dec 2025 17:10:54 +0100 Subject: [PATCH] frappy_psi: add oiclassic (not finished) --- frappy_psi/oiclassic.py | 514 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 514 insertions(+) create mode 100644 frappy_psi/oiclassic.py diff --git a/frappy_psi/oiclassic.py b/frappy_psi/oiclassic.py new file mode 100644 index 00000000..9ed2fd80 --- /dev/null +++ b/frappy_psi/oiclassic.py @@ -0,0 +1,514 @@ +#!/usr/bin/env python +# ***************************************************************************** +# This program is free software; you can redistribute it and/or modify it under +# the terms of the GNU General Public License as published by the Free Software +# Foundation; either version 2 of the License, or (at your option) any later +# version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more +# details. +# +# You should have received a copy of the GNU General Public License along with +# this program; if not, write to the Free Software Foundation, Inc., +# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +# +# Module authors: +# Markus Zolliker +# Anik Stark +# ***************************************************************************** +"""older generation (classic) oxford instruments""" + +import time +import re +from frappy.core import Parameter, Property, EnumType, FloatRange, IntRange, BoolType, \ + StringIO, HasIO, Readable, Writable, Drivable, IDLE, WARN, ERROR +from frappy.lib import formatStatusBits +from frappy.lib.enum import Enum +from frappy.errors import BadValueError, HardwareError, CommunicationFailedError +from frappy_psi.magfield import Magfield, Status +from frappy.states import Retry + + +def bit(x, pos): + return bool(x & (1 << pos)) + + +class Base(HasIO): + + def query(self, cmd, scale=None): + reply = self.communicate(cmd) + if reply[0] != cmd[0]: + raise CommunicationFailedError(f'bad reply: {reply} to command {cmd}') + try: + return float(reply[1:]) * scale + except Exception: + pass + + def change(self, cmd, value, scale=None): + try: + self.communicate('C3') + reply = self.communicate(f'{cmd}{round(value / scale)}') + if reply[0] != cmd[0]: + raise CommunicationFailedError(f'bad reply: {reply}') + finally: + self.communicate('C0') + + def command(self, *cmds): + try: + self.communicate('C3') + for cmd in cmds: + self.communicate(cmd) + finally: + self.communicate('C0') + + +class IPS_IO(StringIO): + """oxford instruments power supply IPS120-10""" + end_of_line = '\r' + identification = [('V', r'IPS120-10.*')] # instrument type and software version + default_settings = {'baudrate': 9600} + + +Action = Enum(hold=0, run_to_set=1, run_to_zero=2, clamped=4) +status_map = {'0': (IDLE, ''), + '1': (ERROR, 'quenched'), + '2': (ERROR, 'overheated'), + '4': (WARN, 'warming up'), + '8': (ERROR, '') + } +limit_map = {'0': (IDLE, ''), + '1': (WARN, 'on positive voltage limit'), + '2': (WARN, 'on negative voltage limit'), + '4': (ERROR, 'outside negative current limit'), + '8': (ERROR, 'outside positive current limit') + } + + +class Field(Base, Magfield): + """ read commands: + R1 measured power supply voltage (V) + R7 demand field (output field) (T) + R8 setpoint (target field) (T) + R9 sweep field rate (T/min) + R18 persistent field (T) + X Status + + control commands: + A set activity + T set field sweep rate + H set switch heater + J set target field """ + + ioClass = IPS_IO + + action = Parameter('action', EnumType(Action), readonly=False) + setpoint = Parameter('field setpoint', FloatRange(unit='T'), default=0) + voltage = Parameter('leads voltage', FloatRange(unit='V'), default=0) + persistent_field = Parameter( + 'persistent field at last switch off', FloatRange(unit='T'), readonly=False) + wait_switch_on = Parameter(default=15) + wait_switch_off = Parameter(default=15) + wait_stable_field = Parameter(default=10) + forced_persistent_field = Parameter( + 'manual indication that persistent field is bad', BoolType(), readonly=False, default=False) + switch_heater = Parameter('turn switch heater on/off', EnumType(off=0, on=1, forced=2), default=0) + + _field_mismatch = None + __persistent_field = None # internal value of persistent field + _status = '00' + + def initModule(self): + super().initModule() + try: + self.write_action(Action.hold) + except Exception as e: + self.log.error('can not set to hold %r', e) + + def doPoll(self): + super().doPoll() + self.read_current() + + def initialReads(self): + # on restart, assume switch is changed long time ago, if not, the mercury + # will complain and this will be handled in start_ramp_to_field + self.switch_on_time = 0 + self.switch_off_time = 0 + super().initialReads() + + def read_value(self): + if self.switch_heater: + self.__persistent_field = self.query('R7') + self.forced_persistent_field = False + self._field_mismatch = False + return self.__persistent_field + pf = self.query('R18') + if self.__persistent_field is None: + self.__persistent_field = pf + self._field_mismatch = False + else: + self._field_mismatch = abs(self.__persistent_field - pf) > self.tolerance * 10 + self.persistent_field = self.__persistent_field + return self.__persistent_field + + def read_ramp(self): + return self.query('R9') + + def write_ramp(self, value): + self.change('T', value) + return self.read_ramp() + + def write_action(self, value): + self.change('A', int(value)) + self.read_status() + + def read_voltage(self): + return self.query('R1') + + def read_setpoint(self): + return self.query('R8') + + def read_current(self): + return self.query('R7') + + def write_persistent_field(self, value): + if self.forced_persistent_field or abs(self.__persistent_field - value) <= self.tolerance * 10: + self._field_mismatch = False + self.__persistent_field = value + return value + raise BadValueError('changing persistent field needs forced_persistent_field=True') + + def write_target(self, target): + if self._field_mismatch: + self.forced_persistent_field = True + raise BadValueError('persistent field does not match - set persistent field to guessed value first') + return super().write_target(target) + + def read_switch_heater(self): + self.read_status() + return self.switch_heater + + def read_status(self): + status = self.communicate('X') + match = re.match(r'X(\d\d)A(\d)C\dH(\d)M\d\dP\d\d', status) + if match is None: + raise CommunicationFailedError(f'unexpected status: {status}') + self._status = match.group(1) + self.action = int(match.group(2)) + self.switch_heater = match.group(3) == '1' + if self._status[0] != '0': + self._state_machine.stop() + return status_map.get(self._status[0], (ERROR, f'bad status: {self._status}')) + if self._status[1] != '0': + return limit_map.get(self._status[1], (ERROR, f'bad status: {self._status}')) # need to stop sm too? + return super().read_status() + + def write_switch_heater(self, value): + if value == self.read_switch_heater(): + self.log.info('switch heater already %r', value) + # we do not want to restart the timer + return value + self.log.debug('switch time fixed for 10 sec') + self.change('H', int(value)) + #return result + return int(value) + + def set_and_go(self, value): + self.change('J', value) + self.setpoint = self.read_current() + assert self.write_action(Action.hold) == Action.hold + assert self.write_action(Action.run_to_set) == Action.run_to_set + + def ramp_to_target(self, sm): + try: + return super().ramp_to_target(sm) + except HardwareError: + sm.try_cnt -= 1 + if sm.try_cnt < 0: + raise + self.set_and_go(sm.target) + return Retry + + def final_status(self, *args, **kwds): + self.write_action(Action.hold) + return super().final_status(*args, **kwds) + + def on_restart(self, sm): + self.write_action(Action.hold) + return super().on_restart(sm) + + def start_ramp_to_field(self, sm): + if abs(self.current - self.__persistent_field) <= self.tolerance: + self.log.info('leads %g are already at %g', self.current, self.__persistent_field) + return self.ramp_to_field + try: + self.set_and_go(self.__persistent_field) + except (HardwareError, AssertionError) as e: + if self.switch_heater: + self.log.warn('switch is already on!') + return self.ramp_to_field + self.log.warn('wait first for switch off current=%g pf=%g %r', self.current, self.__persistent_field, e) + sm.after_wait = self.ramp_to_field + return self.wait_for_switch + return self.ramp_to_field + + def start_ramp_to_target(self, sm): + sm.try_cnt = 5 + try: + self.set_and_go(sm.target) + except (HardwareError, AssertionError) as e: + self.log.warn('switch not yet ready %r', e) + self.status = Status.PREPARING, 'wait for switch on' + sm.after_wait = self.ramp_to_target + return self.wait_for_switch + return self.ramp_to_target + + def ramp_to_field(self, sm): + try: + return super().ramp_to_field(sm) + except HardwareError: + sm.try_cnt -= 1 + if sm.try_cnt < 0: + raise + self.set_and_go(self.__persistent_field) + return Retry + + def wait_for_switch(self, sm): + if not sm.delta(10): + return Retry + try: + self.log.warn('try again') + # try again + self.set_and_go(self.__persistent_field) + except (HardwareError, AssertionError): + return Retry + return sm.after_wait + + def wait_for_switch_on(self, sm): + self.read_switch_heater() # trigger switch_on/off_time + if self.switch_heater == self.switch_heater.off: + if sm.init: # avoid too many states chained + return Retry + self.log.warning('switch turned off manually?') + return self.start_switch_on + return super().wait_for_switch_on(sm) + + def wait_for_switch_off(self, sm): + self.read_switch_heater() + if self.switch_heater == self.switch_heater.on: + if sm.init: # avoid too many states chained + return Retry + self.log.warning('switch turned on manually?') + return self.start_switch_off + return super().wait_for_switch_off(sm) + + def start_ramp_to_zero(self, sm): + pf = self.query('R18') + if abs(pf - self.value) > self.tolerance * 10: + self.log.warning('persistent field %g does not match %g after switch off', pf, self.value) + try: + assert self.write_action(Action.hold) == Action.hold + assert self.write_action(Action.run_to_zero) == Action.run_to_zero + except (HardwareError, AssertionError) as e: + self.log.warn('switch not yet ready %r', e) + self.status = Status.PREPARING, 'wait for switch off' + sm.after_wait = self.ramp_to_zero + return self.wait_for_switch + return self.ramp_to_zero + + def ramp_to_zero(self, sm): + try: + return super().ramp_to_zero(sm) + except HardwareError: + sm.try_cnt -= 1 + if sm.try_cnt < 0: + raise + assert self.write_action(Action.hold) == Action.hold + assert self.write_action(Action.run_to_zero) == Action.run_to_zero + return Retry + + def write_trainmode(self, value): + self.change('M', '5' if value == 'off' else '1') + + +class ILM_IO(StringIO): + """oxford instruments level meter ILM200""" + end_of_line = '\r' + identification = [('V', r'ILM200.*')] # instrument type and software version + default_settings = {'baudrate': 9600} + timeout = 5 + + +class Level(Base, Readable): + + """ X code: XcccSuuvvwwRzz + c: position corresponds to channel 1, 2, 3 + possible values in each position are 0, 1, 2, 3, 9 + vv, uu, ww: channel status for channel 1, 2, 3 respectively, 2 bits each + zz: relay status """ + + ioClass = ILM_IO + + value = Parameter('level', datatype=FloatRange(unit='%')) + CHANNEL = None + X_PATTERN = re.compile(r'X(\d)(\d)(\d)S([0-9A-F]{2})([0-9A-F]{2})([0-9A-F]{2})R\d\d$') + MEDIUM = None + _statusbits = None + + def read_value(self): + return self.query(f'R{self.CHANNEL}', 0.1) + + def write_fast(self, fast): + self.command(f'T{self.CHANNEL}' if fast else f'S{self.CHANNEL}') + + def get_status(self): + reply = self.communicate('X') + match = self.X_PATTERN.match(reply) + if match: + statuslist = match.groups() + if statuslist[self.CHANNEL] == '9': + return ERROR, f'error on {self.MEDIUM} level channel (not connected?)' + if (statuslist[self.CHANNEL] == '1') != (self.MEDIUM == 'N2'): + # '1': channel is used for N2 + return ERROR, f'{self.MEDIUM} level channel not configured properly' + self._statusbits = int(statuslist[self.CHANNEL + 3], 16) + return None + return ERROR, f'bad status message {reply}' + + +class HeLevel(Level): + + fast = Parameter('switching fast/slow', datatype=BoolType(), readonly=False) + CHANNEL = 1 + MEDIUM = 'He' + + def read_status(self): + status = self.get_status() + if status is not None: + return status + return IDLE, formatStatusBits(self._statusbits, ['meas', 'fast', 'slow']) + + +class N2Level(Level): + + ioClass = ILM_IO + + value = Parameter('N2 level', FloatRange(unit='%')) + CHANNEL = 2 + MEDIUM = 'N2' + + def read_status(self): + status = self.get_status() + if status is not None: + return status + return IDLE, '' + + +VALVE_MAP = {'01': 'V9', + '02': 'V8', + '03': 'V7', + '04': 'V11A', + '05': 'V13A', + '06': 'V13B', + '07': 'V11B', + '08': 'V12B', + '09': 'He4 rotary pump', + '10': 'V1', + '11': 'V5', + '12': 'V4', + '13': 'V3', + '14': 'V14', + '15': 'V10', + '16': 'V2', + '17': 'V2A He4', + '18': 'V1A He4', + '19': 'V5A He4', + '20': 'V4A He4', + '21': 'V3A He4', + '22': 'roots pump', + '23': 'unlabeled pump', + '24': 'He3 rotary pump', + } + + +class IGH_IO(StringIO): + """oxford instruments dilution gas handling Kelvinox IGH""" + end_of_line = '\r' + identification = [('V', r'IGH.*')] + default_settings = {'baudrate': 9600} + + X_PATTERN = re.compile(r'X(\d)A\dC\dP([0-9A-F]{2})([0-9A-F]{2})([0-9A-F]{2})([0-9A-F]{2})S[0-9A-F]O\dE\d$') + statusbits = 0 + ini_valves = 0 + + def doPoll(self): + reply = self.communicate('X') + match = self.X_PATTERN.match(reply) + if match: + statuslist = match.groups() + # REVISE THE METHOD FROM HERE ON + # CHECK JUPYTER NOTEBOOK + # a hex digit indicating if motorized valves are still initializing + self.ini_valves = int(statuslist[0], 16) + + +class Valve(Base, Writable): + + ioClass = IGH_IO + + target = Parameter('open or close valve', EnumType(open=1, close=0)) + addr = Property('valve number', IntRange(1,24)) + + def write_target(self, val): + # open: 2N, close: 2N + 1 + self.change('P', (2 * self.addr + 1 - int(val)), 1) + + def read_status(self): + # CHECK + status = self.io.statusbits & (1 << self.addr) + if status is not None: + return status + return IDLE, '' + + +class PulsedValve(Valve): + + delay = Parameter('time valve is open', FloatRange(unit='s')) + _start = 0 + + def write_target(self, val): + if val: + self._start = time.time() + self.setFastPoll(True, 0.01) + else: + self.setFastPoll(False) + self.change('P', (2 * self.addr + 1 - int(val)), 1) + + def doPoll(self): + super().doPoll() + if self._start: + if time.time() > self._start + self.delay: + self.write_target(False) # turn valve off + #self.setFastPoll(False) + self._start = 0 + + +class MotorValve(Base, Drivable): + pass + + +class Pressure(Base, Readable): + pass + + +class Heater(Base, Writable): + pass + + +class N2Sensor(): + # ask Markus + pass + +class Pump(Base, Writable): + pass