Merge branch 'wip' of gitlab.psi.ch:samenv/frappy into wip

This commit is contained in:
l_samenv
2022-12-19 16:09:40 +01:00
15 changed files with 959 additions and 217 deletions

264
secop_psi/attocube.py Normal file
View File

@@ -0,0 +1,264 @@
# *****************************************************************************
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Module authors:
# Markus Zolliker <markus.zolliker@psi.ch>
# *****************************************************************************
import sys
import time
from secop.core import Drivable, Parameter, Command, Property, ERROR, WARN, BUSY, IDLE, Done, nopoll
from secop.features import HasTargetLimits, HasSimpleOffset
from secop.datatypes import IntRange, FloatRange, StringType, BoolType
from secop.errors import ConfigError, BadValueError
sys.path.append('/home/l_samenv/Documents/anc350/Linux64/userlib/lib')
from PyANC350v4 import Positioner
DIRECTION_NAME = {1: 'forward', -1: 'backward'}
class FreezeStatus:
"""freeze status for some time
hardware quite often does not treat status correctly: on a target change it
may take some time to return the 'busy' status correctly.
in classes with this mixin, within :meth:`write_target` call
self.freeze_status(0.5, BUSY, 'changed target')
a wrapper around read_status will take care that the status will be the given value,
for at least the given delay. This does NOT cover the case when self.status is set
directly from an other method.
"""
__freeze_status_until = 0
def __init_subclass__(cls):
def wrapped(self, inner=cls.read_status):
if time.time() < self.__freeze_status_until:
return Done
return inner(self)
cls.read_status = wrapped
super().__init_subclass__()
def freeze_status(self, delay, code=BUSY, text='changed target'):
"""freezze status to the given value for the given delay"""
self.__freeze_status_until = time.time() + delay
self.status = code, text
class Axis(HasTargetLimits, FreezeStatus, Drivable):
axis = Property('axis number', IntRange(0, 2), 0)
value = Parameter('axis position', FloatRange(unit='deg'))
frequency = Parameter('frequency', FloatRange(1, unit='Hz'), readonly=False)
amplitude = Parameter('amplitude', FloatRange(0, unit='V'), readonly=False)
gear = Parameter('gear factor', FloatRange(), readonly=False, default=1, initwrite=True)
tolerance = Parameter('positioning tolerance', FloatRange(0, unit='$'), readonly=False, default=0.01)
output = Parameter('enable output', BoolType(), readonly=False)
info = Parameter('axis info', StringType())
statusbits = Parameter('status bits', StringType())
_hw = Positioner()
_scale = 1 # scale for custom units
_move_steps = 0 # number of steps to move (used by move command)
SCALES = {'deg': 1, 'm': 1, 'mm': 1000, 'um': 1000000, 'µm': 1000000}
_direction = 1 # move direction
_idle_status = IDLE, ''
_error_state = '' # empty string: no error
_history = None
_check_sensor = False
_try_count = 0
def __init__(self, name, logger, opts, srv):
unit = opts.pop('unit', 'deg')
opts['value.unit'] = unit
try:
self._scale = self.SCALES[unit] * opts.get('gear', 1)
except KeyError as e:
raise ConfigError('unsupported unit: %s' % unit)
super().__init__(name, logger, opts, srv)
def write_gear(self, value):
self._scale = self.SCALES[self.parameters['value'].datatype.unit] * self.gear
return value
def startModule(self, start_events):
super().startModule(start_events)
start_events.queue(self.read_info)
def check_value(self, value):
"""check if value allows moving in current direction"""
if self._direction > 0:
if value > self.target_limits[1]:
raise BadValueError('above upper limit')
elif value < self.target_limits[0]:
raise BadValueError('below lower limit')
def read_value(self):
pos = self._hw.getPosition(self.axis) * self._scale
if self.isBusy():
try:
self.check_value(pos)
except BadValueError as e:
self._stop()
self._idle_status = ERROR, str(e)
return pos
def read_frequency(self):
return self._hw.getFrequency(self.axis)
def write_frequency(self, value):
self._hw.setFrequency(self.axis, value)
return self._hw.getFrequency(self.axis)
def read_amplitude(self):
return self._hw.getAmplitude(self.axis)
def write_amplitude(self, value):
self._hw.setAmplitude(self.axis, value)
return self._hw.getAmplitude(self.axis)
def write_tolerance(self, value):
self._hw.setTargetRange(self.axis, value / self._scale)
return value
def write_output(self, value):
self._hw.setAxisOutput(self.axis, enable=value, autoDisable=0)
return value
def read_status(self):
statusbits = self._hw.getAxisStatus(self.axis)
sensor, self.output, moving, attarget, eot_fwd, eot_bwd, sensor_error = statusbits
self.statusbits = ''.join((k for k, v in zip('SOMTFBE', statusbits) if v))
if self._move_steps:
if not (eot_fwd or eot_bwd):
return BUSY, 'moving by steps'
if not sensor:
self._error_state = 'no sensor connected'
elif sensor_error:
self._error_state = 'sensor error'
elif eot_fwd:
self._error_state = 'end of travel forward'
elif eot_bwd:
self._error_state = 'end of travel backward'
else:
if self._error_state and not DIRECTION_NAME[self._direction] in self._error_state:
self._error_state = ''
status_text = 'moving' if self._try_count == 0 else 'moving (retry %d)' % self._try_count
if moving and self._history is not None: # history None: moving by steps
self._history.append(self.value)
if len(self._history) < 5:
return BUSY, status_text
beg = self._history.pop(0)
if abs(beg - self.target) < self.tolerance:
# reset normal tolerance
self._stop()
self._idle_status = IDLE, 'in tolerance'
return self._idle_status
# self._hw.setTargetRange(self.axis, self.tolerance / self._scale)
if (self.value - beg) * self._direction > 0:
return BUSY, status_text
self._try_count += 1
if self._try_count < 10:
self.log.warn('no progress retry %d', self._try_count)
return BUSY, status_text
self._idle_status = WARN, 'no progress'
if self._error_state:
self._try_count += 1
if self._try_count < 10 and self._history is not None:
self.log.warn('end of travel retry %d', self._try_count)
self.write_target(self.target)
return Done
self._idle_status = WARN, self._error_state
if self.status[0] != IDLE:
self._stop()
return self._idle_status
def write_target(self, value):
if value == self.read_value():
return value
self.check_limits(value)
self._try_count = 0
self._direction = 1 if value > self.value else -1
# if self._error_state and DIRECTION_NAME[-self._direction] not in self._error_state:
# raise BadValueError('can not move (%s)' % self._error_state)
self._move_steps = 0
self.write_output(1)
# try first with 50 % of tolerance
self._hw.setTargetRange(self.axis, self.tolerance * 0.5 / self._scale)
for itry in range(5):
try:
self._hw.setTargetPosition(self.axis, value / self._scale)
self._hw.startAutoMove(self.axis, enable=1, relative=0)
except Exception as e:
if itry == 4:
raise
self.log.warn('%r', e)
self._history = [self.value]
self._idle_status = IDLE, ''
self.freeze_status(1, BUSY, 'changed target')
self.setFastPoll(True, 1)
return value
def doPoll(self):
if self._move_steps == 0:
super().doPoll()
return
self._hw.startSingleStep(self.axis, self._direction < 0)
self._move_steps -= self._direction
if self._move_steps % int(self.frequency) == 0: # poll value and status every second
super().doPoll()
@nopoll
def read_info(self):
"""read info from controller"""
cap = self._hw.measureCapacitance(self.axis) * 1e9
axistype = ['linear', 'gonio', 'rotator'][self._hw.getActuatorType(self.axis)]
return '%s %s %.3gnF' % (self._hw.getActuatorName(self.axis), axistype, cap)
def _stop(self):
self._move_steps = 0
self._history = None
for _ in range(5):
try:
self._hw.startAutoMove(self.axis, enable=0, relative=0)
break
except Exception as e:
if itry == 4:
raise
self.log.warn('%r', e)
self._hw.setTargetRange(self.axis, self.tolerance / self._scale)
self.setFastPoll(False)
@Command()
def stop(self):
self._idle_status = IDLE, 'stopped' if self.isBusy() else ''
self._stop()
self.status = self._idle_status
@Command(IntRange())
def move(self, value):
"""relative move by number of steps"""
self._direction = 1 if value > 0 else -1
self.check_value(self.value)
self._history = None
if DIRECTION_NAME[self._direction] in self._error_state:
raise BadValueError('can not move (%s)' % self._error_state)
self._move_steps = value
self._idle_status = IDLE, ''
self.read_status()
self.setFastPoll(True, 1/self.frequency)

View File

@@ -26,7 +26,7 @@ from secop.lib.enum import Enum
from secop.errors import BadValueError, HardwareError
from secop_psi.magfield import Magfield, SimpleMagfield, Status
from secop_psi.mercury import MercuryChannel, off_on, Mapped
from secop.lib.statemachine import Retry
from secop.states import Retry
Action = Enum(hold=0, run_to_set=1, run_to_zero=2, clamped=3)
hold_rtoz_rtos_clmp = Mapped(HOLD=Action.hold, RTOS=Action.run_to_set,
@@ -65,6 +65,13 @@ class SimpleField(MercuryChannel, SimpleMagfield):
obj = object.__new__(newclass)
return obj
def initModule(self):
super().initModule()
try:
self.write_action(Action.hold)
except Exception as e:
self.log.error('can not set to hold %r', e)
def read_value(self):
return self.query('PSU:SIG:FLD')
@@ -94,12 +101,12 @@ class SimpleField(MercuryChannel, SimpleMagfield):
def set_and_go(self, value):
self.setpoint = self.change('PSU:SIG:FSET', value)
assert self.write_action('hold') == 'hold'
assert self.write_action('run_to_set') == 'run_to_set'
assert self.write_action(Action.hold) == Action.hold
assert self.write_action(Action.run_to_set) == Action.run_to_set
def start_ramp_to_target(self, sm):
# if self.action != 'hold':
# assert self.write_action('hold') == 'hold'
# if self.action != Action.hold:
# assert self.write_action(Action.hold) == Action.hold
# return Retry
self.set_and_go(sm.target)
sm.try_cnt = 5
@@ -116,17 +123,17 @@ class SimpleField(MercuryChannel, SimpleMagfield):
return Retry
def final_status(self, *args, **kwds):
print('FINAL-hold')
self.write_action('hold')
self.write_action(Action.hold)
return super().final_status(*args, **kwds)
def on_restart(self, sm):
print('ON_RESTART-hold', sm.sm)
self.write_action('hold')
self.write_action(Action.hold)
return super().on_restart(sm)
class Field(SimpleField, Magfield):
persistent_field = Parameter(
'persistent field', FloatRange(unit='$'), readonly=False)
wait_switch_on = Parameter(
'wait time to ensure switch is on', FloatRange(0, unit='s'), readonly=True, default=60)
wait_switch_off = Parameter(
@@ -136,7 +143,7 @@ class Field(SimpleField, Magfield):
_field_mismatch = None
__init = True
__switch_heater_fix = 0
__switch_fixed_until = 0
def doPoll(self):
super().doPoll()
@@ -208,7 +215,8 @@ class Field(SimpleField, Magfield):
value = self.query('PSU:SIG:SWHT', off_on)
now = time.time()
if value != self.switch_heater:
if now < self.__switch_heater_fix:
if now < self.__switch_fixed_until:
self.log.debug('correct fixed switch time')
# probably switch heater was changed, but IPS reply is not yet updated
if self.switch_heater:
self.switch_on_time = time.time()
@@ -228,8 +236,10 @@ class Field(SimpleField, Magfield):
self.log.info('switch heater already %r', value)
# we do not want to restart the timer
return value
self.__switch_heater_fix = time.time() + 10
return self.change('PSU:SIG:SWHT', value, off_on)
self.__switch_fixed_until = time.time() + 10
self.log.debug('switch time fixed for 10 sec')
result = self.change('PSU:SIG:SWHT', value, off_on, n_retry=0) # no readback check
return result
def start_ramp_to_field(self, sm):
if abs(self.current - self.persistent_field) <= self.tolerance:
@@ -241,9 +251,7 @@ class Field(SimpleField, Magfield):
if self.switch_heater:
self.log.warn('switch is already on!')
return self.ramp_to_field
self.log.warn('wait first for switch off current=%g pf=%g', self.current, self.persistent_field)
return Retry
self.status = Status.PREPARING, 'wait for switch off'
self.log.warn('wait first for switch off current=%g pf=%g %r', self.current, self.persistent_field, e)
sm.after_wait = self.ramp_to_field
return self.wait_for_switch
return self.ramp_to_field
@@ -270,7 +278,7 @@ class Field(SimpleField, Magfield):
return Retry
def wait_for_switch(self, sm):
if not self.delay(10):
if not sm.delta(10):
return Retry
try:
self.log.warn('try again')
@@ -282,8 +290,8 @@ class Field(SimpleField, Magfield):
def start_ramp_to_zero(self, sm):
try:
assert self.write_action('hold') == 'hold'
assert self.write_action('run_to_zero') == 'run_to_zero'
assert self.write_action(Action.hold) == Action.hold
assert self.write_action(Action.run_to_zero) == Action.run_to_zero
except (HardwareError, AssertionError) as e:
self.log.warn('switch not yet ready %r', e)
self.status = Status.PREPARING, 'wait for switch off'
@@ -298,6 +306,6 @@ class Field(SimpleField, Magfield):
sm.try_cnt -= 1
if sm.try_cnt < 0:
raise
assert self.write_action('hold') == 'hold'
assert self.write_action('run_to_zero') == 'run_to_zero'
assert self.write_action(Action.hold) == Action.hold
assert self.write_action(Action.run_to_zero) == Action.run_to_zero
return Retry

View File

@@ -23,9 +23,9 @@ import time
from secop.core import Drivable, Parameter, Done, IDLE, BUSY, ERROR
from secop.datatypes import FloatRange, EnumType, ArrayOf, TupleOf, StatusType
from secop.features import HasLimits
from secop.errors import ConfigError, ProgrammingError, HardwareError
from secop.errors import ConfigError, ProgrammingError, HardwareError, BadValueError
from secop.lib.enum import Enum
from secop.states import Retry, HasStates, status_code
from secop.states import Retry, HasStates, status_code, Start
UNLIMITED = FloatRange()
@@ -130,8 +130,6 @@ class Magfield(SimpleMagfield):
'persistent mode', EnumType(Mode), readonly=False, default=Mode.PERSISTENT)
switch_heater = Parameter('switch heater', EnumType(off=OFF, on=ON),
readonly=False, default=0)
persistent_field = Parameter(
'persistent field', FloatRange(unit='$'), readonly=False)
current = Parameter(
'leads current (in units of field)', FloatRange(unit='$'))
# TODO: time_to_target
@@ -157,54 +155,73 @@ class Magfield(SimpleMagfield):
ramp_tmo = Parameter(
'timeout for field ramp progress',
FloatRange(0, unit='s'), readonly=False, default=30)
__init = True
__init_persistency = True
switch_on_time = None
switch_off_time = None
def doPoll(self):
if self.__init:
self.__init = False
if self.read_switch_heater() and self.mode == Mode.PERSISTENT:
if self.__init_persistency:
if self.__init_persistency is True:
self._last_target = self.value
self.__init_persistency = time.time() + 60
self.read_value() # check for persistent field mismatch
# switch off heater from previous live or manual intervention
self.write_target(self.persistent_field)
elif self.read_switch_heater() and self.mode != Mode.DRIVEN:
if time.time() > self.__init_persistency:
# switch off heater from previous live or manual intervention
self.log.info('fix mode after startup')
self.write_mode(self.mode)
else:
self._last_target = self.persistent_field
else:
super().doPoll()
self.__init_persistency = False
super().doPoll()
def initModule(self):
super().initModule()
self.registerCallbacks(self) # for update_switch_heater
def write_mode(self, value):
self.start_machine(self.start_field_change, cleanup=self.cleanup, target=self.target, mode=value)
self.__init_persistency = False
target = self.value
func = self.start_field_change
if value == Mode.DISABLED:
target = 0
if abs(self.value) < self.tolerance:
func = self.start_switch_off
elif value == Mode.PERSISTENT:
func = self.start_switch_off
self.start_machine(func, target=target, mode=value)
return value
def write_target(self, target):
self.__init_persistency = False
if self.mode == Mode.DISABLED:
if target == 0:
return 0
self.log.info('raise error %r', target)
raise BadValueError('disabled')
self.check_limits(target)
self.start_machine(self.start_field_change, cleanup=self.cleanup, target=target, mode=self.mode)
self.start_machine(self.start_field_change, target=target, mode=self.mode)
return target
def cleanup(self, sm): # sm is short for statemachine
if self.switch_heater != 0:
self.persistent_field = self.read_value()
def on_error(self, sm): # sm is short for statemachine
if self.switch_heater == ON:
self.read_value()
if sm.mode != Mode.DRIVEN:
self.log.warning('turn switch heater off')
self.write_switch_heater(0)
self.write_switch_heater(OFF)
return self.on_error(sm)
@status_code('PREPARING')
@status_code(Status.PREPARING)
def start_field_change(self, sm):
self.setFastPoll(True, 1.0)
if sm.target == self.persistent_field or (
if sm.target == self.value or (
sm.target == self._last_target and
abs(sm.target - self.persistent_field) <= self.tolerance): # short cut
abs(sm.target - self.value) <= self.tolerance): # short cut
return self.check_switch_off
if self.switch_heater:
if self.switch_heater == ON:
return self.start_switch_on
return self.start_ramp_to_field
@status_code('PREPARING')
@status_code(Status.PREPARING)
def start_ramp_to_field(self, sm):
"""start ramping current to persistent field
@@ -213,12 +230,12 @@ class Magfield(SimpleMagfield):
"""
raise NotImplementedError
@status_code('PREPARING', 'ramp leads to match field')
@status_code(Status.PREPARING, 'ramp leads to match field')
def ramp_to_field(self, sm):
if sm.init:
sm.stabilize_start = 0 # in case current is already at field
self.init_progress(sm, self.current)
dif = abs(self.current - self.persistent_field)
dif = abs(self.current - self.value)
if dif > self.tolerance:
tdif = self.get_progress(sm, self.current)
if tdif > self.leads_ramp_tmo:
@@ -229,7 +246,7 @@ class Magfield(SimpleMagfield):
sm.stabilize_start = sm.now
return self.stabilize_current
@status_code('PREPARING')
@status_code(Status.PREPARING)
def stabilize_current(self, sm):
if sm.now - sm.stabilize_start < self.wait_stable_leads:
return Retry
@@ -237,7 +254,6 @@ class Magfield(SimpleMagfield):
def update_switch_heater(self, value):
"""is called whenever switch heater was changed"""
print('SW', value)
if value == 0:
if self.switch_off_time is None:
self.log.info('restart switch_off_time')
@@ -249,12 +265,12 @@ class Magfield(SimpleMagfield):
self.switch_on_time = time.time()
self.switch_off_time = None
@status_code('PREPARING')
@status_code(Status.PREPARING)
def start_switch_on(self, sm):
if self.read_switch_heater() == 0:
if self.read_switch_heater() == OFF:
self.status = Status.PREPARING, 'turn switch heater on'
try:
self.write_switch_heater(True)
self.write_switch_heater(ON)
except Exception as e:
self.log.warning('write_switch_heater %r', e)
return Retry
@@ -262,13 +278,15 @@ class Magfield(SimpleMagfield):
self.status = Status.PREPARING, 'wait for heater on'
return self.wait_for_switch_on
@status_code('PREPARING')
@status_code(Status.PREPARING)
def wait_for_switch_on(self, sm):
if (sm.target == self._last_target and
abs(sm.target - self.persistent_field) <= self.tolerance): # short cut
abs(sm.target - self.value) <= self.tolerance): # short cut
return self.check_switch_off
self.read_switch_heater() # trigger switch_on/off_time
if self.switch_heater == 0:
if self.switch_heater == OFF:
if sm.init: # avoid too many states chained
return Retry
self.log.warning('switch turned off manually?')
return self.start_switch_on
if sm.now - self.switch_on_time < self.wait_switch_on:
@@ -278,7 +296,7 @@ class Magfield(SimpleMagfield):
self._last_target = sm.target
return self.start_ramp_to_target
@status_code('RAMPING')
@status_code(Status.RAMPING)
def start_ramp_to_target(self, sm):
"""start ramping current to target field
@@ -287,9 +305,8 @@ class Magfield(SimpleMagfield):
"""
raise NotImplementedError
@status_code('RAMPING')
@status_code(Status.RAMPING)
def ramp_to_target(self, sm):
self.persistent_field = self.value
dif = abs(self.value - sm.target)
if sm.init:
sm.stabilize_start = 0 # in case current is already at target
@@ -298,6 +315,7 @@ class Magfield(SimpleMagfield):
sm.stabilize_start = sm.now
tdif = self.get_progress(sm, self.value)
if tdif > self.workingramp / self.tolerance * 60 + self.ramp_tmo:
self.log.warn('no progress')
raise HardwareError('no progress')
sm.stabilize_start = None
return Retry
@@ -305,10 +323,9 @@ class Magfield(SimpleMagfield):
sm.stabilize_start = sm.now
return self.stabilize_field
@status_code('STABILIZING')
@status_code(Status.STABILIZING)
def stabilize_field(self, sm):
self.persistent_field = self.value
if sm.now > sm.stablize_start + self.wait_stable_field:
if sm.now < sm.stabilize_start + self.wait_stable_field:
return Retry
return self.check_switch_off
@@ -317,17 +334,18 @@ class Magfield(SimpleMagfield):
return self.final_status(Status.PREPARED, 'driven')
return self.start_switch_off
@status_code('FINALIZING')
@status_code(Status.FINALIZING)
def start_switch_off(self, sm):
if self.switch_heater == 1:
self.write_switch_heater(False)
if self.switch_heater == ON:
self.write_switch_heater(OFF)
return self.wait_for_switch_off
@status_code('FINALIZING')
@status_code(Status.FINALIZING)
def wait_for_switch_off(self, sm):
self.persistent_field = self.value
self.read_switch_heater()
if self.switch_off_time is None:
if self.switch_heater == ON:
if sm.init: # avoid too many states chained
return Retry
self.log.warning('switch turned on manually?')
return self.start_switch_off
if sm.now - self.switch_off_time < self.wait_switch_off:
@@ -336,7 +354,7 @@ class Magfield(SimpleMagfield):
return self.final_status(Status.IDLE, 'leads current at field, switch off')
return self.start_ramp_to_zero
@status_code('FINALIZING')
@status_code(Status.FINALIZING)
def start_ramp_to_zero(self, sm):
"""start ramping current to zero
@@ -345,15 +363,15 @@ class Magfield(SimpleMagfield):
"""
raise NotImplementedError
@status_code('FINALIZING')
@status_code(Status.FINALIZING)
def ramp_to_zero(self, sm):
"""ramp field to zero"""
if sm.init:
self.init_progress(sm, self.current)
if abs(self.current) > self.tolerance:
if self.get_progress(sm, self.current, self.ramp) > self.leads_ramp_tmo:
if self.get_progress(sm, self.value) > self.leads_ramp_tmo:
raise HardwareError('no progress')
return Retry
if sm.mode == Mode.DISABLED and self.persistent_field == 0:
if sm.mode == Mode.DISABLED and abs(self.value) < self.tolerance:
return self.final_status(Status.DISABLED, 'disabled')
return self.final_status(Status.IDLE, 'persistent mode')

View File

@@ -123,13 +123,14 @@ class MercuryChannel(HasIO):
else:
raise HardwareError(msg) from None
def multichange(self, adr, values, convert=as_float, tolerance=0):
def multichange(self, adr, values, convert=as_float, tolerance=0, n_retry=3):
"""set parameter(s) in mercury syntax
:param adr: as in see multiquery method
:param values: [(name1, value1), (name2, value2) ...]
:param convert: a converter function (converts given value to string and replied string to value)
:param tolerance: tolerance for readback check
:param n_retry: number of retries or 0 for no readback check
:return: the values as tuple
Example:
@@ -143,7 +144,7 @@ class MercuryChannel(HasIO):
adr = self._complete_adr(adr)
params = ['%s:%s' % (k, convert(v)) for k, v in values]
cmd = 'SET:%s:%s' % (adr, ':'.join(params))
for _ in range(3): # try 3 times or until readback result matches
for _ in range(max(1, n_retry)): # try n_retry times or until readback result matches
t = time.time()
reply = self.communicate(cmd)
head = 'STAT:SET:%s:' % adr
@@ -158,6 +159,8 @@ class MercuryChannel(HasIO):
except (AssertionError, AttributeError, ValueError) as e:
time.sleep(0.1) # in case of missed replies this might help to skip garbage
raise HardwareError('invalid reply %r to cmd %r' % (reply, cmd)) from e
if n_retry == 0:
return [v[1] for v in values] # no readback check
keys = [v[0] for v in values]
debug = []
readback = self.multiquery(adr, keys, convert, debug)
@@ -182,9 +185,9 @@ class MercuryChannel(HasIO):
adr, _, name = adr.rpartition(':')
return self.multiquery(adr, [name], convert)[0]
def change(self, adr, value, convert=as_float, tolerance=0):
def change(self, adr, value, convert=as_float, tolerance=0, n_retry=3):
adr, _, name = adr.rpartition(':')
return self.multichange(adr, [(name, value)], convert, tolerance)[0]
return self.multichange(adr, [(name, value)], convert, tolerance, n_retry)[0]
class TemperatureSensor(MercuryChannel, Readable):