frappy_psi.oiclassic: make communication more robust

Change-Id: I4de0c7d1e2db930e38c464e797f23befb7679e31
This commit is contained in:
2026-05-15 17:37:41 +02:00
parent 7a5d71d1cf
commit 68d678b32a
+75 -61
View File
@@ -37,34 +37,57 @@ def bit(x, pos):
class OxBase(HasIO):
def query(self, cmd, scale=None):
reply = self.communicate(cmd)
if reply[0] != cmd[0]:
raise CommunicationFailedError(f'bad reply: {reply} to command {cmd}')
pat = cmd[0] + r'([-+]?\d+\.?\d*)$'
value = self.io.request(cmd, pat)
if scale is None:
return int(reply[1:])
return float(reply[1:]) * scale
return int(value)
try:
return float(value) * scale
except Exception as e:
self.log.error('query cmd=%r pat=%r value=%r %r', cmd, pat, value, e)
def change(self, cmd, value, scale=None):
try:
self.communicate('C3')
reply = self.communicate(f'{cmd}{round(value / scale)}')
if reply[0] != cmd[0]:
raise CommunicationFailedError(f'bad reply: {reply}')
self.io.request('C3', 'C$')
self.io.request(f'{cmd}{round(value / scale)}', cmd)
finally:
self.communicate('C0')
self.io.request('C0', 'C$')
def command(self, *cmds):
try:
self.communicate('C3')
self.io.request('C3', 'C$')
for cmd in cmds:
self.communicate(cmd)
self.io.request(cmd, cmd[0] + '$')
finally:
self.communicate('C0')
self.io.request('C0', 'C$')
class IPS_IO(StringIO):
class OI_IO(StringIO):
def request(self, cmd, pattern):
reply = None
for _ in range(5):
try:
reply = self.communicate(cmd)
except Exception as e:
self.log.warn('commerror %r', e)
continue
match = re.match(pattern, reply)
if match:
result = match.groups()
return result[0] if len(result) == 1 else result
raise CommunicationFailedError('can not parse %r', reply)
def checkHWIdent(self):
for _ in range(9):
try:
return super().checkHWIdent()
except Exception as e:
pass
return super().checkHWIdent()
class IPS_IO(OI_IO):
"""oxford instruments power supply IPS120-10"""
end_of_line = '\r'
identification = [('V', r'IPS120-10.*')] # instrument type and software version
@@ -190,13 +213,9 @@ class Field(OxBase, Magfield):
return self.switch_heater
def read_status(self):
status = self.communicate('X')
match = re.match(r'X(\d\d)A(\d)C\dH(\d)M\d\dP\d\d', status)
if match is None:
raise CommunicationFailedError(f'unexpected status: {status}')
self._status = match.group(1)
self.action = int(match.group(2))
self.switch_heater = match.group(3) == '1'
self._status, action, sw = self.io.request('X', r'X(\d\d)A(\d)C\dH(\d)M\d\dP\d\d')
self.action = int(action)
self.switch_heater = sw == '1'
if self._status[0] != '0':
self._state_machine.stop()
return status_map.get(self._status[0], (ERROR, f'bad status: {self._status}'))
@@ -332,7 +351,7 @@ class Field(OxBase, Magfield):
self.change('M', '5' if value == 'off' else '1')
class ILM_IO(StringIO):
class ILM_IO(OI_IO):
"""oxford instruments level meter ILM200"""
end_of_line = '\r'
identification = [('V', r'ILM200.*')] # instrument type and software version
@@ -353,7 +372,6 @@ class Level(OxBase, Readable):
value = Parameter('level', datatype=FloatRange(unit='%'))
fast = Parameter('fast reading', datatype=BoolType())
CHANNEL = None
X_PATTERN = re.compile(r'X(\d)(\d)(\d)S([0-9A-F]{2})([0-9A-F]{2})([0-9A-F]{2})R\d\d$')
MEDIUM = None
_statusbits = None
@@ -364,18 +382,17 @@ class Level(OxBase, Readable):
self.command(f'T{self.CHANNEL}' if fast else f'S{self.CHANNEL}')
def get_status(self):
reply = self.communicate('X')
match = self.X_PATTERN.match(reply)
if match:
statuslist = match.groups()
if statuslist[self.CHANNEL] == '9':
return ERROR, f'error on {self.MEDIUM} level channel (not connected?)'
if (statuslist[self.CHANNEL] == '1') != (self.MEDIUM == 'N2'):
# '1': channel is used for N2
return ERROR, f'{self.MEDIUM} level channel not configured properly'
self._statusbits = int(statuslist[self.CHANNEL + 3], 16)
return None
return ERROR, f'bad status message {reply}'
try:
statuslist = self.io.request('X', r'X(\d)(\d)(\d)S([0-9A-F]{2})([0-9A-F]{2})([0-9A-F]{2})R\d\d$')
except Exception as e:
return ERROR, repr(e)
if statuslist[self.CHANNEL] == '9':
return ERROR, f'error on {self.MEDIUM} level channel (not connected?)'
if (statuslist[self.CHANNEL] == '1') != (self.MEDIUM == 'N2'):
# '1': channel is used for N2
return ERROR, f'{self.MEDIUM} level channel not configured properly'
self._statusbits = int(statuslist[self.CHANNEL + 3], 16)
return None
class HeLevel(Level):
@@ -434,7 +451,7 @@ VALVE_MAP = {'V9': 1,
}
class IGH_IO(StringIO):
class IGH_IO(OI_IO):
""" oxford instruments dilution gas handling Kelvinox IGH
X code: XxAaCcPpppSsOoEe
@@ -450,7 +467,6 @@ class IGH_IO(StringIO):
identification = [('V', r'IGH.*')]
default_settings = {'baudrate': 9600}
X_PATTERN = re.compile(r'X(\d)A(\d)C\dP([0-9A-F]{8})S([0-9A-F])O(\d)E(\d)$')
_ini_valves = 0 # ini status of motorized valves
_mix_status = 0
_valves = 0 # status of solenoid valves and pumps
@@ -459,16 +475,14 @@ class IGH_IO(StringIO):
_heater_range = 0
def doPoll(self):
reply = self.communicate('X')
match = self.X_PATTERN.match(reply)
if match:
ini_valves, mix_status, valves, motor_status, heater_status, heater_range = match.groups()
self._ini_valves = int(ini_valves, 16)
self._mix_status = int(mix_status)
self._valves = int(valves, 16)
self._motor_status = int(motor_status, 16)
self._heater_status = int(heater_status)
self._heater_range = int(heater_range)
reply = self.request('X', r'X(\d)A(\d)C\dP([0-9A-F]{8})S([0-9A-F])O(\d)E(\d)$')
ini_valves, mix_status, valves, motor_status, heater_status, heater_range = reply
self._ini_valves = int(ini_valves, 16)
self._mix_status = int(mix_status)
self._valves = int(valves, 16)
self._motor_status = int(motor_status, 16)
self._heater_status = int(heater_status)
self._heater_range = int(heater_range)
class Valve(OxBase, Writable):
@@ -513,11 +527,11 @@ class MotorValve(OxBase, Writable):
ioClass = IGH_IO
target = Parameter('target of motor valve', datatype=FloatRange(0, 100, unit='%'))
value = Parameter('position of fast valve', datatype=FloatRange(0, 100, unit='%'))
target = Parameter('target of motor valve', datatype=FloatRange(0, 1))
value = Parameter('position of fast valve', datatype=FloatRange(0, 1))
def write_target(self, target):
self.change('H', target, 0.1) # valve V12A
self.change('H', target, 1/999.) # valve V12A
self.value = target
def read_value(self):
@@ -534,21 +548,21 @@ class SlowMotorValve(OxBase, Drivable):
ioClass = IGH_IO
target = Parameter('target of slow motor valve', datatype=FloatRange(0, 100, unit='%', fmtstr='%.1f'))
value = Parameter('position of slow valve', datatype=FloatRange(0, 100, unit='%', fmtstr='%.1f'))
target = Parameter('target of slow motor valve', datatype=FloatRange(0, 1, fmtstr='%.3g'))
value = Parameter('position of slow valve', datatype=FloatRange(0, 1, fmtstr='%.3g'))
_prev_time = 0
def read_target(self):
return self.query('R7', 0.1)
return self.query('R7', 1/999.)
def write_target(self, target):
self.change('G', target, 0.1) # valve V6
self.change('G', target, 1/999.) # valve V6
self.read_status()
def read_status(self):
if bit(self.io._ini_valves, 0):
self.value = 0
return BUSY, 'valve V6 is initializing'
return BUSY, 'initializing'
now = time.time()
if self._prev_time == 0:
self.value = self.read_target()
@@ -558,10 +572,10 @@ class SlowMotorValve(OxBase, Drivable):
self._prev_time = now
if (self.io._motor_status >> 0) & 1:
if self.target > self.value:
self.value = min(self.target, self.value + delta_t / 300 * 100)
else:
self.value = max(self.target, self.value - delta_t / 300 * 100)
return BUSY, 'valve V6 is moving'
self.value = min(self.target, self.value + delta_t / 300)
return BUSY, 'opening'
self.value = max(self.target, self.value - delta_t / 300)
return BUSY, 'closing'
self.value = self.target
return IDLE, ''
@@ -603,7 +617,7 @@ class MixPower(OxBase, Writable):
def write_target(self, target):
if target:
self.command('A1') # on, fixed heater power
target = min(0.01999, target)
target = min(0.0199, target)
target_nW = str(int(target * 1e9))
range_mix = max(1, len(target_nW) - 3)
if target_nW >= '2000':