#!/usr/bin/env python
# *****************************************************************************
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
#
# Module authors:
#   Markus Zolliker
#   Anik Stark
# *****************************************************************************

"""oxford instruments old (classic) devices (ILM, IGH, IPS)"""

import time
import re
from frappy.core import Parameter, Property, EnumType, FloatRange, IntRange, BoolType, StringType, \
    StringIO, HasIO, Readable, Writable, Drivable, IDLE, BUSY, WARN, ERROR, Attached
from frappy.lib import formatStatusBits
from frappy.lib.enum import Enum
from frappy.errors import BadValueError, HardwareError, CommunicationFailedError
from frappy_psi.magfield import Magfield, Status
from frappy.states import Retry


def bit(x, pos):
    """Check if the bit at a certain position is set

    :param x: integer holding the bit field
    :param pos: bit position, 0 is the least significant bit
    :return: True when the bit is set
    """
    return bool(x & (1 << pos))


class OxBase(HasIO):
    """command helpers shared by all modules in this file

    A reply is considered valid when it starts with the first character
    of the command which was sent.
    """

    def query(self, cmd, scale=None):
        """send a read command and return the numeric part of the reply

        :param cmd: the command, e.g. 'R7'
        :param scale: factor to be applied to the reply. If None, the reply
            is parsed as an integer
        :return: an int (scale is None) or a float (reply * scale)
        :raises CommunicationFailedError: on an empty reply or when the
            reply does not start with the first character of cmd
        """
        reply = self.communicate(cmd)
        # an empty reply is reported as a communication error, not as IndexError
        if not reply or reply[0] != cmd[0]:
            raise CommunicationFailedError(f'bad reply: {reply} to command {cmd}')
        if scale is None:
            # NOTE(review): Field uses this branch for values given in T in its
            # docstring - confirm that the device replies with integers here
            return int(reply[1:])
        return float(reply[1:]) * scale

    def change(self, cmd, value, scale=None):
        """send a command changing a value, enclosed by 'C3' and 'C0'

        :param cmd: the command prefix, e.g. 'J'
        :param value: the value to be sent
        :param scale: if given, round(value / scale) is sent. If None,
            value is sent as it is
        :raises CommunicationFailedError: on an empty reply or when the
            reply does not start with the first character of cmd
        """
        try:
            self.communicate('C3')
            # without scale the value must not be divided (value / None fails)
            arg = value if scale is None else round(value / scale)
            reply = self.communicate(f'{cmd}{arg}')
            if not reply or reply[0] != cmd[0]:
                raise CommunicationFailedError(f'bad reply: {reply}')
        finally:
            # always sent, also when the command above failed
            self.communicate('C0')

    def command(self, *cmds):
        """send one or several commands, enclosed by 'C3' and 'C0'

        the replies are not checked

        :param cmds: the commands to be sent in the given order
        """
        try:
            self.communicate('C3')
            for cmd in cmds:
                self.communicate(cmd)
        finally:
            self.communicate('C0')


class IPS_IO(StringIO):
    """oxford instruments power supply IPS120-10"""
    end_of_line = '\r'
    identification = [('V', r'IPS120-10.*')]  # instrument type and software version
    default_settings = {'baudrate': 9600}


Action = Enum(hold=0, run_to_set=1, run_to_zero=2, clamped=4)

# meaning of the first digit of the status in the X reply (see Field.read_status)
status_map = {'0': (IDLE, ''),
              '1': (ERROR, 'quenched'),
              '2': (ERROR, 'overheated'),
              '4': (WARN, 'warming up'),
              '8': (ERROR, ''),
              }

# meaning of the second digit of the status in the X reply (see Field.read_status)
limit_map = {'0': (IDLE, ''),
             '1': (WARN, 'on positive voltage limit'),
             '2': (WARN, 'on negative voltage limit'),
             '4': (ERROR, 'outside negative current limit'),
             '8': (ERROR, 'outside positive current limit'),
             }


class Field(OxBase, Magfield):
    """magnetic field driven by an IPS power supply

    read commands:
      R1    measured power supply voltage (V)
      R7    demand field (output field) (T)
      R8    setpoint (target field) (T)
      R9    sweep field rate (T/min)
      R18   persistent field (T)
      X     Status

    control commands:
      A     set activity
      T     set field sweep rate
      H     set switch heater
      J     set target field
    """
    ioClass = IPS_IO
    action = Parameter('action', EnumType(Action), readonly=False)
    setpoint = Parameter('field setpoint', FloatRange(unit='T'), default=0)
    voltage = Parameter('leads voltage', FloatRange(unit='V'), default=0)
    persistent_field = Parameter(
        'persistent field at last switch off', FloatRange(unit='T'), readonly=False)
    wait_switch_on = Parameter(default=15)
    wait_switch_off = Parameter(default=15)
    wait_stable_field = Parameter(default=10)
    forced_persistent_field = Parameter(
        'manual indication that persistent field is bad', BoolType(), readonly=False, default=False)
    switch_heater = Parameter('turn switch heater on/off', EnumType(off=0, on=1, forced=2), default=0)

    _field_mismatch = None
    __persistent_field = None  # internal value of persistent field
    _status = '00'  # the two status digits of the last X reply

    def initModule(self):
        """try to put the power supply on hold, log an error if this fails"""
        super().initModule()
        try:
            self.write_action(Action.hold)
        except Exception as e:
            # best effort: a failure here must not prevent the module from starting
            self.log.error('can not set to hold %r', e)

    def doPoll(self):
        """poll the base class and read R7 in addition"""
        super().doPoll()
        self.read_current()

    def initialReads(self):
        """initialize the switch times before the initial reads"""
        # on restart, assume the switch was changed a long time ago. if not, the
        # device will complain and this will be handled in start_ramp_to_field
        self.switch_on_time = 0
        self.switch_off_time = 0
        super().initialReads()

    def read_value(self):
        """return the field

        With the switch heater on, R7 is read and taken as the persistent
        field. Otherwise R18 is read and compared with the internally kept
        persistent field, which is returned.
        """
        if self.switch_heater:
            self.__persistent_field = self.query('R7')
            self.forced_persistent_field = False
            self._field_mismatch = False
            return self.__persistent_field
        pf = self.query('R18')
        if self.__persistent_field is None:
            # first read: trust the value reported by the device
            self.__persistent_field = pf
            self._field_mismatch = False
        else:
            self._field_mismatch = abs(self.__persistent_field - pf) > self.tolerance * 10
        self.persistent_field = self.__persistent_field
        return self.__persistent_field

    def read_ramp(self):
        """read the sweep rate (R9)"""
        return self.query('R9')

    def write_ramp(self, value):
        """set the sweep rate (T) and return the value read back"""
        self.change('T', value)
        return self.read_ramp()

    def write_action(self, value):
        """set the activity (A) and update the status

        NOTE(review): nothing is returned here, but callers compare the result
        with the requested action - presumably the write wrapper of the
        framework returns the written value, confirm
        """
        self.change('A', int(value))
        self.read_status()

    def read_voltage(self):
        """read the power supply voltage (R1)"""
        return self.query('R1')

    def read_setpoint(self):
        """read the setpoint (R8)"""
        return self.query('R8')

    def read_current(self):
        """read the demand field (R7)"""
        return self.query('R7')

    def write_persistent_field(self, value):
        """set the internally kept persistent field

        :raises BadValueError: when value differs by more than 10 * tolerance
            from the current one and forced_persistent_field is not set
        """
        if self.forced_persistent_field or abs(self.__persistent_field - value) <= self.tolerance * 10:
            self._field_mismatch = False
            self.__persistent_field = value
            return value
        raise BadValueError('changing persistent field needs forced_persistent_field=True')

    def write_target(self, target):
        """start changing the field

        :raises BadValueError: when a persistent field mismatch was detected
        """
        if self._field_mismatch:
            self.forced_persistent_field = True
            raise BadValueError('persistent field does not match - set persistent field to guessed value first')
        return super().write_target(target)

    def read_switch_heater(self):
        """return the switch heater state, which is updated by read_status"""
        self.read_status()
        return self.switch_heater

    def read_status(self):
        """read the X reply, update action and switch_heater, return the status

        :raises CommunicationFailedError: when the reply has an unexpected format
        """
        status = self.communicate('X')
        match = re.match(r'X(\d\d)A(\d)C\dH(\d)M\d\dP\d\d', status)
        if match is None:
            raise CommunicationFailedError(f'unexpected status: {status}')
        self._status = match.group(1)
        self.action = int(match.group(2))
        self.switch_heater = match.group(3) == '1'
        if self._status[0] != '0':
            self._state_machine.stop()
            return status_map.get(self._status[0], (ERROR, f'bad status: {self._status}'))
        if self._status[1] != '0':
            # need to stop sm too?
            return limit_map.get(self._status[1], (ERROR, f'bad status: {self._status}'))
        return super().read_status()

    def write_switch_heater(self, value):
        """switch the heater (H), unless it is already in the requested state"""
        if value == self.read_switch_heater():
            self.log.info('switch heater already %r', value)
            # we do not want to restart the timer
            return value
        self.log.debug('switch time fixed for 10 sec')
        self.change('H', int(value))
        return int(value)

    def set_and_go(self, value):
        """set the target field (J) and start running to it

        :raises AssertionError: when an action could not be set
        """
        self.change('J', value)
        # NOTE(review): the setpoint is taken from R7 (read_current), while the
        # class docstring names R8 as the setpoint - confirm this is intended
        self.setpoint = self.read_current()
        assert self.write_action(Action.hold) == Action.hold
        assert self.write_action(Action.run_to_set) == Action.run_to_set

    def ramp_to_target(self, sm):
        """state: ramp to target, restarting the ramp on HardwareError

        the error is raised again when sm.try_cnt is used up
        """
        try:
            return super().ramp_to_target(sm)
        except HardwareError:
            sm.try_cnt -= 1
            if sm.try_cnt < 0:
                raise
            self.set_and_go(sm.target)
            return Retry

    def final_status(self, *args, **kwds):
        """put the power supply on hold before reporting the final status"""
        self.write_action(Action.hold)
        return super().final_status(*args, **kwds)

    def on_restart(self, sm):
        """put the power supply on hold when the state machine is restarted"""
        self.write_action(Action.hold)
        return super().on_restart(sm)

    def start_ramp_to_field(self, sm):
        """state: start ramping the leads to the persistent field"""
        if abs(self.current - self.__persistent_field) <= self.tolerance:
            self.log.info('leads %g are already at %g', self.current, self.__persistent_field)
            return self.ramp_to_field
        try:
            self.set_and_go(self.__persistent_field)
        except (HardwareError, AssertionError) as e:
            if self.switch_heater:
                self.log.warning('switch is already on!')
                return self.ramp_to_field
            self.log.warning('wait first for switch off current=%g pf=%g %r',
                             self.current, self.__persistent_field, e)
            sm.after_wait = self.ramp_to_field
            return self.wait_for_switch
        return self.ramp_to_field

    def start_ramp_to_target(self, sm):
        """state: start ramping to the target, allowing 5 retries"""
        sm.try_cnt = 5
        try:
            self.set_and_go(sm.target)
        except (HardwareError, AssertionError) as e:
            self.log.warning('switch not yet ready %r', e)
            self.status = Status.PREPARING, 'wait for switch on'
            sm.after_wait = self.ramp_to_target
            return self.wait_for_switch
        return self.ramp_to_target

    def ramp_to_field(self, sm):
        """state: ramp to the persistent field, restarting on HardwareError

        the error is raised again when sm.try_cnt is used up
        """
        try:
            return super().ramp_to_field(sm)
        except HardwareError:
            sm.try_cnt -= 1
            if sm.try_cnt < 0:
                raise
            self.set_and_go(self.__persistent_field)
            return Retry

    def wait_for_switch(self, sm):
        """state: retry set_and_go until it succeeds, then go to sm.after_wait"""
        if not sm.delta(10):
            return Retry
        try:
            self.log.warning('try again')
            self.set_and_go(self.__persistent_field)
        except (HardwareError, AssertionError):
            return Retry
        return sm.after_wait

    def wait_for_switch_on(self, sm):
        """state: wait for switch on, switch on again if it was turned off"""
        self.read_switch_heater()  # trigger switch_on/off_time
        if self.switch_heater == self.switch_heater.off:
            if sm.init:  # avoid too many states chained
                return Retry
            self.log.warning('switch turned off manually?')
            return self.start_switch_on
        return super().wait_for_switch_on(sm)

    def wait_for_switch_off(self, sm):
        """state: wait for switch off, switch off again if it was turned on"""
        self.read_switch_heater()
        if self.switch_heater == self.switch_heater.on:
            if sm.init:  # avoid too many states chained
                return Retry
            self.log.warning('switch turned on manually?')
            return self.start_switch_off
        return super().wait_for_switch_off(sm)

    def start_ramp_to_zero(self, sm):
        """state: start ramping the leads to zero"""
        pf = self.query('R18')
        if abs(pf - self.value) > self.tolerance * 10:
            self.log.warning('persistent field %g does not match %g after switch off', pf, self.value)
        try:
            assert self.write_action(Action.hold) == Action.hold
            assert self.write_action(Action.run_to_zero) == Action.run_to_zero
        except (HardwareError, AssertionError) as e:
            self.log.warning('switch not yet ready %r', e)
            self.status = Status.PREPARING, 'wait for switch off'
            sm.after_wait = self.ramp_to_zero
            return self.wait_for_switch
        return self.ramp_to_zero

    def ramp_to_zero(self, sm):
        """state: ramp the leads to zero, restarting on HardwareError

        the error is raised again when sm.try_cnt is used up
        """
        try:
            return super().ramp_to_zero(sm)
        except HardwareError:
            sm.try_cnt -= 1
            if sm.try_cnt < 0:
                raise
            assert self.write_action(Action.hold) == Action.hold
            assert self.write_action(Action.run_to_zero) == Action.run_to_zero
            return Retry

    def write_trainmode(self, value):
        """send M5 when value is 'off', else M1"""
        self.change('M', '5' if value == 'off' else '1')


class ILM_IO(StringIO):
    """oxford instruments level meter ILM200"""
    end_of_line = '\r'
    identification = [('V', r'ILM200.*')]  # instrument type and software version
    default_settings = {'baudrate': 9600}
    timeout = 5


class Level(OxBase, Readable):
    """base class for the level channels of the ILM

    X code: XcccSuuvvwwRzz
      c: position corresponds to channel 1, 2, 3
         possible values in each position are 0, 1, 2, 3, 9
      uu, vv, ww: channel status for channel 1, 2, 3 respectively, 2 hex digits each
      zz: relay status
    """
    ioClass = ILM_IO
    value = Parameter('level', datatype=FloatRange(unit='%'))
    CHANNEL = None  # channel number (1, 2 or 3), to be set in subclasses
    X_PATTERN = re.compile(r'X(\d)(\d)(\d)S([0-9A-F]{2})([0-9A-F]{2})([0-9A-F]{2})R\d\d$')
    MEDIUM = None  # 'He' or 'N2', to be set in subclasses
    _statusbits = None  # status byte of the channel from the last X reply

    def read_value(self):
        """read the level of the channel (reply scaled by 0.1)"""
        return self.query(f'R{self.CHANNEL}', 0.1)

    def write_fast(self, fast):
        """send T<channel> when fast is true, else S<channel>"""
        self.command(f'T{self.CHANNEL}' if fast else f'S{self.CHANNEL}')

    def get_status(self):
        """read the X reply and check the configuration of the channel

        :return: None on success (self._statusbits is updated) or a
            status tuple describing the error
        """
        reply = self.communicate('X')
        match = self.X_PATTERN.match(reply)
        if not match:
            return ERROR, f'bad status message {reply}'
        # regexp groups are numbered from 1, like the channels: group 1..3 hold
        # the channel usage, group 4..6 the channel status
        usage = match.group(self.CHANNEL)
        if usage == '9':
            return ERROR, f'error on {self.MEDIUM} level channel (not connected?)'
        if (usage == '1') != (self.MEDIUM == 'N2'):
            # '1': channel is used for N2
            return ERROR, f'{self.MEDIUM} level channel not configured properly'
        self._statusbits = int(match.group(self.CHANNEL + 3), 16)
        return None


class HeLevel(Level):
    """helium level (channel 1)"""
    value = Parameter('He level', FloatRange(unit='%'))
    fast = Parameter('switching fast/slow', datatype=BoolType(), readonly=False)
    CHANNEL = 1
    MEDIUM = 'He'

    def read_status(self):
        """return the error from get_status or IDLE with the status bits as text"""
        status = self.get_status()
        if status is not None:
            return status
        return IDLE, formatStatusBits(self._statusbits, ['meas', 'fast', 'slow'])


class N2Level(Level):
    """nitrogen level (channel 2)"""
    ioClass = ILM_IO
    value = Parameter('N2 level', FloatRange(unit='%'))
    CHANNEL = 2
    MEDIUM = 'N2'

    def read_status(self):
        """return the error from get_status or IDLE"""
        status = self.get_status()
        if status is not None:
            return status
        return IDLE, ''


# valve / pump name -> number N used in the P command and
# (as N - 1) as bit position in the valve status of the X reply
VALVE_MAP = {'V9': 1,
             'V8': 2,
             'V7': 3,
             'V11A': 4,
             'V13A': 5,
             'V13B': 6,
             'V11B': 7,
             'V12B': 8,
             'rotary_pump_He4': 9,
             'V1': 10,
             'V5': 11,
             'V4': 12,
             'V3': 13,
             'V14': 14,
             'V10': 15,
             'V2': 16,
             'V2A_He4': 17,
             'V1A_He4': 18,
             'V5A_He4': 19,
             'V4A_He4': 20,
             'V3A_He4': 21,
             'roots_pump': 22,
             'unlabeled_pump': 23,
             'rotary_pump_He3': 24,
             }


class IGH_IO(StringIO):
    """ oxford instruments dilution gas handling Kelvinox IGH

    X code: XxAaCcPpppSsOoEe
      x    motorized valves are still initializing
      a    mix heater activity
      c    control status (0, 1, 2, 3)
      pppp 4 hex numbers (two digits each), state of solenoid valves and pumps
      s    hex digit, state of the 3 motorized valves
      o    still and sorb heater information
      e    mix heater power range
    """
    end_of_line = '\r'
    identification = [('V', r'IGH.*')]
    default_settings = {'baudrate': 9600}

    X_PATTERN = re.compile(r'X(\d)A\dC\dP([0-9A-F]{8})S([0-9A-F])O(\d)E(\d)$')
    # values from the last valid X reply. the integer defaults make sure the
    # modules using them do not fail before the first successful poll
    _ini_valves = 0  # ini status of motorized valves
    _valves = 0  # status of solenoid valves and pumps
    _motor_status = 0  # state of the motorized valves
    _heater_status = 0  # still and sorb heater information
    _heater_range = 0  # mix heater power range

    def doPoll(self):
        """read the X reply and keep its decoded fields

        a reply not matching X_PATTERN is ignored
        """
        reply = self.communicate('X')
        match = self.X_PATTERN.match(reply)
        if match:
            ini_valves, valves, motor_status, heater_status, heater_range = match.groups()
            self._ini_valves = int(ini_valves, 16)
            self._valves = int(valves, 16)
            self._motor_status = int(motor_status, 16)
            self._heater_status = int(heater_status)
            self._heater_range = int(heater_range)


class Valve(OxBase, Writable):
    """solenoid valve or pump of the IGH"""
    ioClass = IGH_IO
    value = Parameter('state of valve (open or close)', datatype=IntRange(0, 1))
    target = Parameter('open or close valve', datatype=EnumType(open=1, close=0))
    addr = Property('valve name', datatype=EnumType(VALVE_MAP))

    def read_value(self):
        """return the valve state from the last status poll of the io module"""
        # hex -> int -> check if bit in bin(integer) is set at the addr position
        return bit(self.io._valves, self.addr.value - 1)

    def write_target(self, target):
        """open or close the valve"""
        # open: 2N, close: 2N + 1
        self.change('P', (2 * self.addr.value + 1 - int(target)), 1)


class PulsedValve(Valve):
    """valve which is closed automatically after a delay"""
    delay = Parameter('delay (time valve is open)', FloatRange(unit='s'))
    _start = 0  # time of opening, 0 when no pulse is active

    def write_target(self, target):
        """open or close the valve, on opening start fast polling"""
        if target:
            self._start = time.time()
            self.setFastPoll(True, 0.01)
        else:
            self.setFastPoll(False)
        self.change('P', (2 * self.addr.value + 1 - int(target)), 1)

    def doPoll(self):
        """close the valve when the delay is over"""
        super().doPoll()
        if self._start:
            if time.time() > self._start + self.delay:
                self.write_target(0)
                self._start = 0


class MotorValve(OxBase, Writable):
    ioClass = IGH_IO
    # TODO: class for valve 12 --> based on SlowMotorValve, but arrives instantaneously, no busy state


class SlowMotorValve(OxBase, Drivable):
    """motorized valve with an estimated position"""
    ioClass = IGH_IO
    target = Parameter('target of motor valve', datatype=FloatRange(0, 100, unit='%'))
    motor_valve = Property('motor valves', datatype=StringType('V6'))
    value = Parameter('position of valve', datatype=FloatRange(0, 100, unit='%'))
    _prev_time = 0  # time of the last position estimate
    _prev = 0  # last target
    _direction = 0  # +1: opening, -1: closing, 0: target unchanged

    def write_target(self, target):
        """send the target (in units of 0.1 %) and remember the direction"""
        self._prev_time = time.time()
        self.change('G', target, 0.1)
        self._direction = (target > self._prev) - (target < self._prev)
        self._prev = target

    def read_status(self):
        """update the estimated position and return the status"""
        # the initializing state is kept as an integer, comparing its string
        # representation with '001' could never be true.
        # NOTE(review): bit 0 is assumed to belong to valve 6 - confirm
        if bit(self.io._ini_valves, 0):
            return BUSY, 'valve 6 is initializing'
        # TODO: estimate position of valve (update, and adapt if changing direction in between)
        # integrate over the time since the last poll only and stop at the
        # target, so that the estimate stays within the range of value
        now = time.time()
        estimate = self.value + self._direction * (now - self._prev_time) * 100
        self._prev_time = now
        if self._direction > 0:
            estimate = min(estimate, self._prev)
        elif self._direction < 0:
            estimate = max(estimate, self._prev)
        self.value = estimate
        if (self.io._motor_status >> 1) & 1:
            return BUSY, 'valve is moving'
        return IDLE, ''


# gauge name -> number of the read command
GAUGE_MAP = {'G1': 14,
             'G2': 15,
             'G3': 16,
             'P1': 20,
             'P2': 21,
             }


class Pressure(OxBase, Readable):
    """pressure gauge"""
    # NOTE(review): no ioClass is given here, in contrast to the other
    # modules in this file - confirm that this is intended
    gauge_addr = Property('pressure gauge address', datatype=EnumType(GAUGE_MAP))

    def read_value(self):
        """read the gauge, the reply of the G gauges is scaled by 0.1"""
        # gauge_addr is an enum member (used the same way as Valve.addr):
        # its value is the number of the read command, its name the gauge name
        nr = self.gauge_addr.value
        if self.gauge_addr.name.startswith('G'):
            return self.query(f'R{nr}', 0.1)
        return self.query(f'R{nr}', 1)


class MixPower(OxBase, Writable):
    """mix heater power"""
    ioClass = IGH_IO
    target = Parameter('mix power', datatype=FloatRange(0, 0.02, unit='W'))

    def read_value(self):
        """read the mix power (R4), scaled depending on the heater range"""
        # NOTE(review): check this scale against the unit W of the parameters,
        # it looks like it might be off by a factor 1e3 - to be confirmed
        scale = 10**-(7 - self.io._heater_range)
        return self.query('R4', scale)

    def write_target(self, target):
        """set the mix power, a target of 0 turns the heater off"""
        if target:
            self.command('A1')  # on, fixed heater power
            target = min(0.01999, target)
            # the range is derived from the number of digits of the power in nW
            target_nW = str(int(target * 1e9))
            range_mix = max(1, len(target_nW) - 3)
            scale = 10**-(7 - range_mix)
            self.command(f'E{range_mix}')
            self.change('M', target, scale)
        else:
            self.command('A0')  # turn off

    def read_status(self):
        """return IDLE

        the original method had no body (a syntax error)
        """
        # TODO: report the mix heater activity
        return IDLE, ''


class SorbPower(OxBase, Writable):
    """sorb heater power"""
    ioClass = IGH_IO
    target = Parameter('sorb power', datatype=FloatRange(0.001, 2, unit='W'))
    writecmd = 'B'  # in units of 1mW (range 0000 to 1999)
    scale = 1e-3

    def read_value(self):
        """read the sorb power (R6), 0 when bits 1 and 2 of the heater status are cleared"""
        if self.io._heater_status & 6:
            return self.query('R6', self.scale)
        return 0

    def write_target(self, target):
        """switch the sorb heater and set its power

        bit 0 of the heater status is kept, bit 2 is set for a non zero target
        """
        if target:
            self.change('O', self.io._heater_status & 1 | 4)
        else:
            self.change('O', self.io._heater_status & 1)
        self.change('B', target, self.scale)

    def read_status(self):
        """return the status derived from bits 1 and 2 of the heater status"""
        sorb_status = self.io._heater_status & 6
        if sorb_status == 2:
            return WARN, 'sorb in control mode'
        return IDLE, ('on' if sorb_status else 'off')


class StillPower(OxBase, Writable):
    """still heater power"""
    ioClass = IGH_IO
    target = Parameter('still power', datatype=FloatRange(0.0001, 0.2, unit='W'))
    readcmd = 'R5'
    writecmd = 'S'  # in units of 0.1mW (range 0000 to 1999)
    scale = 1e-4
    # TODO: bit-wise arithmetic analogous to SorbPower (with another bit position to check)


class N2Sensor(Readable):
    """resistance of an LN2 sensor, its value is set by PumpFeedback"""
    value = Parameter(datatype=FloatRange(unit='Ohm'))


class PumpFeedback(Valve):
    """pump with feedback

    reading the value updates the two attached LN2 sensors also
    """
    value = Parameter('pump feedback', datatype=BoolType())
    upper_LN2 = Attached()
    lower_LN2 = Attached()

    # '?' has to be escaped: a pattern starting with a bare '?' does not compile
    PATTERN = re.compile(r'\?(\d+),(\d+),(\d+)')

    def read_value(self):
        """read the pump feedback and update the values of the LN2 sensors

        :return: the pump state, the previous value when the reply does not match
        """
        # read: {r}
        # N2 sensors and pump are reached over an arduino, command {cmd},
        # the reply comes back via the IGH, which prepends '?'
        # reply is '?{\d,\d,\d}'
        # 0,1 pump on/off, resistance * 0.1 (lower), resistance * 0.1 (upper)
        # NOTE(review): the comment above gives the order (lower, upper) while
        # the code, as before, takes (upper, lower), and the pattern contains
        # no braces - confirm against the device
        reply = self.communicate('{r}')
        match = self.PATTERN.match(reply)
        if not match:
            return self.value
        pump, upper, lower = match.groups()
        # the groups are strings and have to be converted before scaling.
        # the result goes to the value of the attached sensor modules
        self.upper_LN2.value = 0.1 * int(upper)
        self.lower_LN2.value = 0.1 * int(lower)
        return bool(int(pump))

    def read_target(self):
        """return the pump state from the last status poll of the io module"""
        # hex -> int -> check if bit in bin(integer) is set at the addr position
        return bit(self.io._valves, self.addr.value - 1)

    def write_target(self, target):
        """switch the pump and set the value to the target"""
        # open: 2 * 24, close: 2 * 24 + 1
        self.change('P', 2 * self.addr.value + 1 - int(target), 1)
        self.value = target

    def read_status(self):
        """return a warning when the pump should be on but is not"""
        if self.target and not self.value:
            return WARN, 'pump switched off'
        return IDLE, ''