frappy_psi.oiclassic: extend and test IGH

This commit is contained in:
2026-01-14 17:29:11 +01:00
parent 600d11d3bb
commit 07263281fd

View File

@@ -22,7 +22,7 @@
import time
import re
from frappy.core import Parameter, Property, EnumType, FloatRange, IntRange, BoolType, StringType, \
from frappy.core import Parameter, Property, EnumType, FloatRange, BoolType, \
StringIO, HasIO, Readable, Writable, Drivable, IDLE, BUSY, WARN, ERROR, Attached
from frappy.lib import formatStatusBits
from frappy.lib.enum import Enum
@@ -351,6 +351,7 @@ class Level(OxBase, Readable):
ioClass = ILM_IO
value = Parameter('level', datatype=FloatRange(unit='%'))
fast = Parameter('fast reading', datatype=BoolType())
CHANNEL = None
X_PATTERN = re.compile(r'X(\d)(\d)(\d)S([0-9A-F]{2})([0-9A-F]{2})([0-9A-F]{2})R\d\d$')
MEDIUM = None
@@ -449,17 +450,21 @@ class IGH_IO(StringIO):
identification = [('V', r'IGH.*')]
default_settings = {'baudrate': 9600}
X_PATTERN = re.compile(r'X(\d)A\dC\dP([0-9A-F]{8})S([0-9A-F])O(\d)E(\d)$')
X_PATTERN = re.compile(r'X(\d)A(\d)C\dP([0-9A-F]{8})S([0-9A-F])O(\d)E(\d)$')
_ini_valves = 0 # ini status of motorized valves
_valves = '' # status of solenoid valves and pumps
_mix_status = 0
_valves = 0 # status of solenoid valves and pumps
_motor_status = 0
_heater_status = 0
_heater_range = 0
def doPoll(self):
reply = self.communicate('X')
match = self.X_PATTERN.match(reply)
if match:
ini_valves, valves, motor_status, heater_status, heater_range = match.groups()
ini_valves, mix_status, valves, motor_status, heater_status, heater_range = match.groups()
self._ini_valves = int(ini_valves, 16)
self._mix_status = int(mix_status)
self._valves = int(valves, 16)
self._motor_status = int(motor_status, 16)
self._heater_status = int(heater_status)
@@ -470,7 +475,7 @@ class Valve(OxBase, Writable):
ioClass = IGH_IO
value = Parameter('state of valve (open or close)', datatype=IntRange(0,1))
value = Parameter('state of valve (open or close)', datatype=EnumType(open=1, close=0))
target = Parameter('open or close valve', datatype=EnumType(open=1, close=0))
addr = Property('valve name', datatype=EnumType(VALVE_MAP))
@@ -485,7 +490,7 @@ class Valve(OxBase, Writable):
class PulsedValve(Valve):
delay = Parameter('delay (time valve is open)', FloatRange(unit='s'))
delay = Parameter('delay (time valve is open)', FloatRange(unit='s'), readonly=False)
_start = 0
def write_target(self, target):
@@ -508,36 +513,61 @@ class MotorValve(OxBase, Writable):
ioClass = IGH_IO
# TODO: class for valve 12 --> based on SlowMotorValve, but arrives instantanous, no busy state
target = Parameter('target of motor valve', datatype=FloatRange(0, 100, unit='%'))
value = Parameter('position of fast valve', datatype=FloatRange(0, 100, unit='%'))
def write_target(self, target):
self.change('H', target, 0.1) # valve V12A
self.value = target
def read_value(self):
return self.target
def read_status(self):
if bit(self.io._ini_valves, 1):
self.value = 0
return BUSY, 'valve V12A is initializing'
return IDLE, ''
class SlowMotorValve(OxBase, Drivable):
ioClass = IGH_IO
target = Parameter('target of motor valve', datatype=FloatRange(0, 100, unit='%'))
motor_valve = Property('motor valves', datatype=StringType('V6'))
value = Parameter('position of valve', datatype=FloatRange(0, 100, unit='%'))
target = Parameter('target of slow motor valve', datatype=FloatRange(0, 100, unit='%', fmtstr='%.1f'))
value = Parameter('position of slow valve', datatype=FloatRange(0, 100, unit='%', fmtstr='%.1f'))
_prev_time = 0
_prev = 0
_direction = 0
def read_target(self):
return self.query('R7', 0.1)
def write_target(self, target):
self._prev_time = time.time()
self.change('G', target, 0.1)
self._direction = (target > self._prev) - (target < self._prev)
self._prev = target
self.change('G', target, 0.1) # valve V6
self.read_status()
def read_status(self):
if str(self.io._ini_valves) == '001':
return BUSY, 'valve 6 is initializing'
# TODO: estimate position of valve (update, and adapt if changing direction inbetween)
self.value = self.value + self._direction * (time.time() - self._prev_time) * 100
if (self.io._motor_status >> 1) & 1:
return BUSY, 'valve is moving'
if bit(self.io._ini_valves, 0):
self.value = 0
return BUSY, 'valve V6 is initializing'
now = time.time()
if self._prev_time == 0:
self.value = self.read_target()
delta_t = 0
else:
return IDLE, ''
delta_t = now - self._prev_time
self._prev_time = now
if (self.io._motor_status >> 0) & 1:
if self.target > self.value:
self.value = min(self.target, self.value + delta_t / 300 * 100)
else:
self.value = max(self.target, self.value - delta_t / 300 * 100)
return BUSY, 'valve V6 is moving'
self.value = self.target
return IDLE, ''
def stop(self):
"""stop moving"""
self.write_target(self.value)
GAUGE_MAP = {'G1': 14,
@@ -550,13 +580,13 @@ GAUGE_MAP = {'G1': 14,
class Pressure(OxBase, Readable):
gauge_addr = Property('pressure gauge address', datatype=EnumType(GAUGE_MAP))
addr = Property('pressure gauge address', datatype=EnumType(GAUGE_MAP))
def read_value(self):
nr = GAUGE_MAP[self.gauge_addr]
if self.gauge_addr.startswith('G'):
nr = self.addr.value
if self.addr.name.startswith('G'):
return self.query(f'R{nr}', 0.1)
return self.query(f'R{nr}', 1)
return self.query(f'R{nr}', 1)
class MixPower(OxBase, Writable):
@@ -564,6 +594,7 @@ class MixPower(OxBase, Writable):
ioClass = IGH_IO
target = Parameter('mix power', datatype=FloatRange(0, 0.02, unit='W'))
value = Parameter('mix power', datatype=FloatRange(0, 0.02, unit='W'))
def read_value(self):
scale = 10**-(7 - self.io._heater_range)
@@ -575,22 +606,30 @@ class MixPower(OxBase, Writable):
target = min(0.01999, target)
target_nW = str(int(target * 1e9))
range_mix = max(1, len(target_nW) - 3)
scale = 10**-(7 - range_mix)
if target_nW >= '2000':
range_mix += 1
scale = 10**-(10 - range_mix)
self.command(f'E{range_mix}')
self.change('M', target, scale)
else:
self.command('A0') # turn off
def read_status(self):
#
pass
if self.io._mix_status:
return IDLE, 'on'
return IDLE, 'off'
class SorbPower(OxBase, Writable):
""" heater status:
bit 0 still on
bit 1 sorb in temperature control (this ctr mode is not used)
bit 2 sorb in power control """
ioClass = IGH_IO
target = Parameter('sorb power', datatype=FloatRange(0.001, 2, unit='W'))
target = Parameter('sorb power', datatype=FloatRange(0, 2, unit='W')) # Werte 0.001, 2
writecmd = 'B' # in units of 1mW (range 0000 to 1999)
scale = 1e-3
@@ -600,55 +639,66 @@ class SorbPower(OxBase, Writable):
return 0
def write_target(self, target):
if target:
self.change('O', self.io._heater_status & 1 | 4)
else:
self.change('O', self.io._heater_status & 1)
self.change('O', self.io._heater_status & 1 | 4 * (target > 0), 1)
self.change('B', target, self.scale)
def read_status(self):
sorb_status = self.io._heater_status & 6
if sorb_status == 2:
return WARN, 'sorb in control mode'
return WARN, 'sorb in temperature control mode'
return IDLE, ('on' if sorb_status else 'off')
class StillPower(OxBase, Writable):
""" heater status:
bit 0 still on
bit 1 sorb in temperature control (this ctr mode is not used)
bit 2 sorb in power control """
ioClass = IGH_IO
target = Parameter('still power', datatype=FloatRange(0.0001, 0.2, unit='W'))
target = Parameter('still power', datatype=FloatRange(0, 0.2, unit='W'))
readcmd = 'R5'
writecmd = 'S' # in units of 0.1mW (range 0000 to 1999)
scale = 1e-4
# bit-wise arithmetik analog SorbPower (zu checkende position wechseln)
def read_value(self):
if self.io._heater_status & 1:
return self.query('R5', self.scale)
return 0
def write_target(self, target):
self.change('O', self.io._heater_status & 6 | (target > 0), 1)
self.change('S', target, self.scale)
def read_status(self):
sorb_status = self.io._heater_status & 1
return IDLE, ('on' if sorb_status else 'off')
class N2Sensor(Readable):
value = Parameter(datatype=FloatRange(unit='Ohm'))
value = Parameter(datatype=FloatRange(unit='K'))
class PumpFeedback(Valve):
class Pump(Valve):
value = Parameter('pump feedback', datatype=BoolType())
value = Parameter('state of valve (open or close)', datatype=EnumType(on=1, off=0))
target = Parameter('open or close valve', datatype=EnumType(on=1, off=0))
upper_LN2 = Attached()
lower_LN2 = Attached()
PATTERN = re.compile(r'?(\d),(\d),(\d)')
PATTERN = re.compile(r'\?\{(\d),(\d+),(\d+)\}')
def read_value(self):
reply = self.communicate('{r}')
match = self.PATTERN.match(reply)
if match:
self.value, self.upper_LN2, self.lower_LN2 = match.groups()
self.upper_LN2 = 0.1 * self.upper_LN2
self.lower_LN2 = 0.1 * self.lower_LN2
# lesen: {r}
# N2 sensor und pumpe laufen hin ueber arduino, kommando {cmd},
# zuruck via IGH, welches ? voranstellt,
# antwort ist '?{\d,\d,\d}' # 0,1 pumpe on/off , widerstand * 0.1 (lower), widerstand * 0.1 (upper)
return self.value
value, upper_LN2, lower_LN2 = match.groups()
self.upper_LN2.value = 0.1 * int(upper_LN2)
self.lower_LN2.value = 0.1 * int(lower_LN2)
return int(value)
raise CommunicationFailedError('bad reply to {r}')
def read_target(self):
# hex -> int -> check if bit in bin(integer) is set at the addr position