From 600d11d3bbae31d61d37718cd9399c8b7fb16123 Mon Sep 17 00:00:00 2001 From: Anik Stark Date: Wed, 7 Jan 2026 18:09:15 +0100 Subject: [PATCH] frappy_psi.oiclassic: add IGH (not finished) --- frappy_psi/oiclassic.py | 295 ++++++++++++++++++++++++++++++---------- 1 file changed, 223 insertions(+), 72 deletions(-) diff --git a/frappy_psi/oiclassic.py b/frappy_psi/oiclassic.py index 9ed2fd80..ed2aaf71 100644 --- a/frappy_psi/oiclassic.py +++ b/frappy_psi/oiclassic.py @@ -18,12 +18,12 @@ # Markus Zolliker # Anik Stark # ***************************************************************************** -"""older generation (classic) oxford instruments""" +"""oxford instruments old (classic) devices (ILM, IGH, IPS)""" import time import re -from frappy.core import Parameter, Property, EnumType, FloatRange, IntRange, BoolType, \ - StringIO, HasIO, Readable, Writable, Drivable, IDLE, WARN, ERROR +from frappy.core import Parameter, Property, EnumType, FloatRange, IntRange, BoolType, StringType, \ + StringIO, HasIO, Readable, Writable, Drivable, IDLE, BUSY, WARN, ERROR, Attached from frappy.lib import formatStatusBits from frappy.lib.enum import Enum from frappy.errors import BadValueError, HardwareError, CommunicationFailedError @@ -32,19 +32,19 @@ from frappy.states import Retry def bit(x, pos): + """Check if the bit at a certain position is set""" return bool(x & (1 << pos)) -class Base(HasIO): +class OxBase(HasIO): def query(self, cmd, scale=None): reply = self.communicate(cmd) if reply[0] != cmd[0]: raise CommunicationFailedError(f'bad reply: {reply} to command {cmd}') - try: - return float(reply[1:]) * scale - except Exception: - pass + if scale is None: + return int(reply[1:]) + return float(reply[1:]) * scale def change(self, cmd, value, scale=None): try: @@ -86,7 +86,7 @@ limit_map = {'0': (IDLE, ''), } -class Field(Base, Magfield): +class Field(OxBase, Magfield): """ read commands: R1 measured power supply voltage (V) R7 demand field (output field) (T) @@ -340,7 +340,7 @@ class ILM_IO(StringIO): timeout = 5 -class Level(Base, Readable): +class Level(OxBase, Readable): """ X code: XcccSuuvvwwRzz c: position corresponds to channel 1, 2, 3 @@ -379,6 +379,7 @@ class Level(Base, Readable): class HeLevel(Level): + value = Parameter('He level', FloatRange(unit='%')) fast = Parameter('switching fast/slow', datatype=BoolType(), readonly=False) CHANNEL = 1 MEDIUM = 'He' @@ -405,110 +406,260 @@ class N2Level(Level): return IDLE, '' -VALVE_MAP = {'01': 'V9', - '02': 'V8', - '03': 'V7', - '04': 'V11A', - '05': 'V13A', - '06': 'V13B', - '07': 'V11B', - '08': 'V12B', - '09': 'He4 rotary pump', - '10': 'V1', - '11': 'V5', - '12': 'V4', - '13': 'V3', - '14': 'V14', - '15': 'V10', - '16': 'V2', - '17': 'V2A He4', - '18': 'V1A He4', - '19': 'V5A He4', - '20': 'V4A He4', - '21': 'V3A He4', - '22': 'roots pump', - '23': 'unlabeled pump', - '24': 'He3 rotary pump', +VALVE_MAP = {'V9': 1, + 'V8': 2, + 'V7': 3, + 'V11A': 4, + 'V13A': 5, + 'V13B': 6, + 'V11B': 7, + 'V12B': 8, + 'rotary_pump_He4': 9, + 'V1': 10, + 'V5': 11, + 'V4': 12, + 'V3': 13, + 'V14' : 14, + 'V10': 15, + 'V2': 16, + 'V2A_He4': 17, + 'V1A_He4': 18, + 'V5A_He4': 19, + 'V4A_He4': 20, + 'V3A_He4': 21, + 'roots_pump': 22, + 'unlabeled_pump': 23, + 'rotary_pump_He3': 24, } class IGH_IO(StringIO): - """oxford instruments dilution gas handling Kelvinox IGH""" + """ oxford instruments dilution gas handling Kelvinox IGH + + X code: XxAaCcPpppSsOoEe + x motorized valves are still initializing + a mix heater activity + c control status (0, 1, 2, 3) + pppp 4 hex numbers (two digits each), state of solenoid valves and pumps + s hex digit, state of the 3 motorized valves + o still and sorb heater information + e mix heater power range """ + end_of_line = '\r' identification = [('V', r'IGH.*')] default_settings = {'baudrate': 9600} - X_PATTERN = re.compile(r'X(\d)A\dC\dP([0-9A-F]{2})([0-9A-F]{2})([0-9A-F]{2})([0-9A-F]{2})S[0-9A-F]O\dE\d$') - statusbits = 0 - ini_valves = 0 + X_PATTERN = re.compile(r'X(\d)A\dC\dP([0-9A-F]{8})S([0-9A-F])O(\d)E(\d)$') + _ini_valves = 0 # ini status of motorized valves + _valves = '' # status of solenoid valves and pumps + _heater_range = 0 def doPoll(self): reply = self.communicate('X') match = self.X_PATTERN.match(reply) if match: - statuslist = match.groups() - # REVISE THE METHOD FROM HERE ON - # CHECK JUPYTER NOTEBOOK - # a hex digit indicating if motorized valves are still initializing - self.ini_valves = int(statuslist[0], 16) + ini_valves, valves, motor_status, heater_status, heater_range = match.groups() + self._ini_valves = int(ini_valves, 16) + self._valves = int(valves, 16) + self._motor_status = int(motor_status, 16) + self._heater_status = int(heater_status) + self._heater_range = int(heater_range) -class Valve(Base, Writable): +class Valve(OxBase, Writable): ioClass = IGH_IO - target = Parameter('open or close valve', EnumType(open=1, close=0)) - addr = Property('valve number', IntRange(1,24)) + value = Parameter('state of valve (open or close)', datatype=IntRange(0,1)) + target = Parameter('open or close valve', datatype=EnumType(open=1, close=0)) + addr = Property('valve name', datatype=EnumType(VALVE_MAP)) - def write_target(self, val): + def read_value(self): + # hex -> int -> check if bit in bin(integer) is set at the addr position + return bit(self.io._valves, self.addr.value - 1) + + def write_target(self, target): # open: 2N, close: 2N + 1 - self.change('P', (2 * self.addr + 1 - int(val)), 1) - - def read_status(self): - # CHECK - status = self.io.statusbits & (1 << self.addr) - if status is not None: - return status - return IDLE, '' + self.change('P', (2 * self.addr.value + 1 - int(target)), 1) class PulsedValve(Valve): - delay = Parameter('time valve is open', FloatRange(unit='s')) + delay = Parameter('delay (time valve is open)', FloatRange(unit='s')) _start = 0 - def write_target(self, val): - if val: + def write_target(self, target): + if target: self._start = time.time() self.setFastPoll(True, 0.01) else: self.setFastPoll(False) - self.change('P', (2 * self.addr + 1 - int(val)), 1) + self.change('P', (2 * self.addr.value + 1 - int(target)), 1) def doPoll(self): super().doPoll() if self._start: if time.time() > self._start + self.delay: - self.write_target(False) # turn valve off - #self.setFastPoll(False) + self.write_target(0) self._start = 0 -class MotorValve(Base, Drivable): - pass +class MotorValve(OxBase, Writable): + + ioClass = IGH_IO + + # TODO: class for valve 12 --> based on SlowMotorValve, but arrives instantanous, no busy state -class Pressure(Base, Readable): - pass +class SlowMotorValve(OxBase, Drivable): + + ioClass = IGH_IO + + target = Parameter('target of motor valve', datatype=FloatRange(0, 100, unit='%')) + motor_valve = Property('motor valves', datatype=StringType('V6')) + value = Parameter('position of valve', datatype=FloatRange(0, 100, unit='%')) + _prev_time = 0 + _prev = 0 + _direction = 0 + + def write_target(self, target): + self._prev_time = time.time() + self.change('G', target, 0.1) + self._direction = (target > self._prev) - (target < self._prev) + self._prev = target -class Heater(Base, Writable): - pass + def read_status(self): + if str(self.io._ini_valves) == '001': + return BUSY, 'valve 6 is initializing' + # TODO: estimate position of valve (update, and adapt if changing direction inbetween) + self.value = self.value + self._direction * (time.time() - self._prev_time) * 100 + if (self.io._motor_status >> 1) & 1: + return BUSY, 'valve is moving' + else: + return IDLE, '' -class N2Sensor(): - # ask Markus - pass +GAUGE_MAP = {'G1': 14, + 'G2': 15, + 'G3': 16, + 'P1': 20, + 'P2': 21, + } -class Pump(Base, Writable): - pass + +class Pressure(OxBase, Readable): + + gauge_addr = Property('pressure gauge address', datatype=EnumType(GAUGE_MAP)) + + def read_value(self): + nr = GAUGE_MAP[self.gauge_addr] + if self.gauge_addr.startswith('G'): + return self.query(f'R{nr}', 0.1) + return self.query(f'R{nr}', 1) + + +class MixPower(OxBase, Writable): + + ioClass = IGH_IO + + target = Parameter('mix power', datatype=FloatRange(0, 0.02, unit='W')) + + def read_value(self): + scale = 10**-(7 - self.io._heater_range) + return self.query('R4', scale) + + def write_target(self, target): + if target: + self.command('A1') # on, fixed heater power + target = min(0.01999, target) + target_nW = str(int(target * 1e9)) + range_mix = max(1, len(target_nW) - 3) + scale = 10**-(7 - range_mix) + self.command(f'E{range_mix}') + self.change('M', target, scale) + else: + self.command('A0') # turn off + + def read_status(self): + # + pass + + +class SorbPower(OxBase, Writable): + + ioClass = IGH_IO + + target = Parameter('sorb power', datatype=FloatRange(0.001, 2, unit='W')) + writecmd = 'B' # in units of 1mW (range 0000 to 1999) + scale = 1e-3 + + def read_value(self): + if self.io._heater_status & 6: + return self.query('R6', self.scale) + return 0 + + def write_target(self, target): + if target: + self.change('O', self.io._heater_status & 1 | 4) + else: + self.change('O', self.io._heater_status & 1) + self.change('B', target, self.scale) + + def read_status(self): + sorb_status = self.io._heater_status & 6 + if sorb_status == 2: + return WARN, 'sorb in control mode' + return IDLE, ('on' if sorb_status else 'off') + + +class StillPower(OxBase, Writable): + + ioClass = IGH_IO + + target = Parameter('still power', datatype=FloatRange(0.0001, 0.2, unit='W')) + readcmd = 'R5' + writecmd = 'S' # in units of 0.1mW (range 0000 to 1999) + scale = 1e-4 + + # bit-wise arithmetik analog SorbPower (zu checkende position wechseln) + + +class N2Sensor(Readable): + + value = Parameter(datatype=FloatRange(unit='Ohm')) + + +class PumpFeedback(Valve): + + value = Parameter('pump feedback', datatype=BoolType()) + upper_LN2 = Attached() + lower_LN2 = Attached() + PATTERN = re.compile(r'?(\d),(\d),(\d)') + + def read_value(self): + reply = self.communicate('{r}') + match = self.PATTERN.match(reply) + if match: + self.value, self.upper_LN2, self.lower_LN2 = match.groups() + self.upper_LN2 = 0.1 * self.upper_LN2 + self.lower_LN2 = 0.1 * self.lower_LN2 + # lesen: {r} + # N2 sensor und pumpe laufen hin ueber arduino, kommando {cmd}, + # zuruck via IGH, welches ? voranstellt, + # antwort ist '?{\d,\d,\d}' # 0,1 pumpe on/off , widerstand * 0.1 (lower), widerstand * 0.1 (upper) + return self.value + + def read_target(self): + # hex -> int -> check if bit in bin(integer) is set at the addr position + return bit(self.io._valves, self.addr.value - 1) + + def write_target(self, target): + # open: 2 * 24, close: 2 * 24 + 1 + self.change('P', 2 * self.addr.value + 1 - target, 1) + self.value = target + + def read_status(self): + if self.target and not self.value: + return WARN, 'pump switched off' + return IDLE, ''