From a2905d9fbce828bbe51a16b60863a7017ddac2e1 Mon Sep 17 00:00:00 2001 From: Markus Zolliker Date: Fri, 22 Mar 2024 14:38:23 +0100 Subject: [PATCH] improve attocube driver - driving in an extra thread, hoping not to miss end of travel status bits (does not work always) - maxtry parameter for trying several times TODO: move by step (in an other thread) Change-Id: I89b51d1f6926f2fd26ec22d43aede377b5231583 --- frappy_psi/attocube.py | 275 +++++++++++++++++++++++------------------ 1 file changed, 154 insertions(+), 121 deletions(-) diff --git a/frappy_psi/attocube.py b/frappy_psi/attocube.py index ee0957e..6e862ef 100644 --- a/frappy_psi/attocube.py +++ b/frappy_psi/attocube.py @@ -19,6 +19,7 @@ import sys import time +import threading from frappy.core import Drivable, Parameter, Command, Property, \ ERROR, WARN, BUSY, IDLE, Done, nopoll, Limit # from frappy.features import HasSimpleOffset @@ -28,7 +29,19 @@ sys.path.append('/home/l_samenv/Documents/anc350/Linux64/userlib/lib') from PyANC350v4 import Positioner -DIRECTION_NAME = {1: 'forward', -1: 'backward'} +DIRECTION_NAME = {1: 'forward', -1: 'backward', 0: 'idle'} + + +class DriveInfo: + def __init__(self, value, target, status=(BUSY, 'changed target')): + self.pos = value + self.direction = -1 if target < value else 1 + self.target = target + self.status = status + self.thread = None + self.statusbits = '' + self.output = False + self.sensor_connected = False class Axis(Drivable): @@ -41,46 +54,130 @@ class Axis(Drivable): output = Parameter('enable output', BoolType(), readonly=False) info = Parameter('axis info', StringType()) statusbits = Parameter('status bits', StringType()) + maxtry = Parameter('max. number of move tries', IntRange(), default=3, readonly=False) + settling_time = Property('time to be spent wihtin tolerance', FloatRange(0), default=0.25) + fast_interval = Property('waiting time with in internal drive polls', FloatRange(0), default=0.001) target_min = Limit() target_max = Limit() - _hw = Positioner() + _bad_status_max = 0.05 + _settling_time = 0.1 + _hw = None _scale = 1 # scale for custom units _move_steps = 0 # number of steps to move (used by move command) SCALES = {'deg': 1, 'm': 1, 'mm': 1000, 'um': 1000000, 'µm': 1000000} - _direction = 1 # move direction + _moving = None # None when not moving, else status text _idle_status = IDLE, '' _error_state = '' # empty string: no error - _history = None + # _history = None _check_sensor = False - _try_count = 0 - __freeze_until = 0 + _drive_info = DriveInfo(0, 0, (IDLE, '')) + STATUSBIT_NAMES = ['sensor', 'output', 'moving', 'at_target', 'fwd_stuck', 'bwd_stuck', 'error'] + + def initModule(self): + super().initModule() + # TODO: catch timeout + self._hw = Positioner() def write_gear(self, value): self._scale = self.SCALES[self.parameters['value'].datatype.unit] * self.gear return value def startModule(self, start_events): + # self._history = [0] super().startModule(start_events) start_events.queue(self.read_info) + def _get_state(self, info): + """get axis status + + - update _drive_info.output ans _drive_info.sensor_connected + - return , , + + is True whn moving + is True when in error + is an error text, when in error, 'at target' or '' otherwise + """ + statusbits = self._hw.getAxisStatus(self.axis) + info.sensor_connected, info.output, moving, at_target, fwd_stuck, bwd_stuck, error = statusbits + info.statusbits = statusbits + if error: + return False, True, 'other error' + if bwd_stuck: + return False, True, 'end of travel backward' + if bwd_stuck: + return False, True, 'end of travel forward' + if at_target: + return False, False, 'at target' + if moving: + #if not info.output: + # return True, True, 'output off' + return True, False, '' + return False, False, '' + + def _drive_thread(self, info): + try: + for ntry in range(self.maxtry, 0, -1): + self._hw.setAxisOutput(self.axis, enable=True, autoDisable=0) + self._hw.setTargetRange(self.axis, 0 / self._scale) + self._hw.setTargetPosition(self.axis, info.target / self._scale) + self._hw.startAutoMove(self.axis, enable=1, relative=0) + inside_since = 0 + saved_inside_time = 0 + last_inside = 0 + start_time = time.time() + status = None + while info.thread: + time.sleep(self.fast_interval) + moving, in_error, reason = self._get_state(info) + if time.time() > start_time + 0.05: + if reason: + if in_error: + status = ERROR, reason + else: + status = IDLE, reason + break + if not moving: + status = WARN, 'stopped by unknown reason' + break + info.pos = self._hw.getPosition(self.axis) * self._scale + self.check_value(info.pos) + if abs(info.pos - info.target) < self.tolerance: + last_inside = time.time() + if not inside_since: + inside_since = last_inside - saved_inside_time + if last_inside > inside_since + self._settling_time: + info.status = IDLE, 'in tolerance' + self.doPoll() + return + else: + info.status = BUSY, 'driving' + if inside_since: + saved_inside_time = last_inside - inside_since + inside_since = 0 + if ntry == 1: + info.status = status + else: + info.status = BUSY, f'retry after {status[1]} ({ntry})' + self.doPoll() + except Exception as e: + info.status = ERROR, repr(e) + finally: + info.thread = None + def check_value(self, value): """check if value allows moving in current direction""" - if self._direction > 0: - if value > self.target_limits[1]: + if self._drive_info.direction > 0: + if value > self.target_max: raise BadValueError('above upper limit') - elif value < self.target_limits[0]: + elif value < self.target_min: raise BadValueError('below lower limit') def read_value(self): - pos = self._hw.getPosition(self.axis) * self._scale - if self.isBusy(): - try: - self.check_value(pos) - except BadValueError as e: - self._stop() - self._idle_status = ERROR, str(e) - return pos + drv = self._drive_info + if drv.thread: + return drv.pos + return self._hw.getPosition(self.axis) * self._scale def read_frequency(self): return self._hw.getFrequency(self.axis) @@ -96,104 +193,49 @@ class Axis(Drivable): self._hw.setAmplitude(self.axis, value) return self._hw.getAmplitude(self.axis) - def write_tolerance(self, value): - self._hw.setTargetRange(self.axis, value / self._scale) - return value + #def write_tolerance(self, value): + # self._hw.setTargetRange(self.axis, value / self._scale) + # return value def write_output(self, value): self._hw.setAxisOutput(self.axis, enable=value, autoDisable=0) return value - def freeze_status(self, delay, code=BUSY, text='changed target'): - """freeze status to the given value for the given delay""" - self.__freeze_until = time.time() + delay - self.status = code, text - def read_status(self): - if time.time() < self.__freeze_until: - return self.status - self.__freeze_until = 0 - statusbits = self._hw.getAxisStatus(self.axis) - sensor, self.output, moving, attarget, eot_fwd, eot_bwd, sensor_error = statusbits - self.statusbits = ''.join((k for k, v in zip('SOMTFBE', statusbits) if v)) - if self._move_steps: - if not (eot_fwd or eot_bwd): - return BUSY, 'moving by steps' - if not sensor: - self._error_state = 'no sensor connected' - elif sensor_error: - self._error_state = 'sensor error' - elif eot_fwd: - self._error_state = 'end of travel forward' - elif eot_bwd: - self._error_state = 'end of travel backward' - else: - if self._error_state and not DIRECTION_NAME[self._direction] in self._error_state: - self._error_state = '' - status_text = 'moving' if self._try_count == 0 else 'moving (retry %d)' % self._try_count - if moving and self._history is not None: # history None: moving by steps - self._history.append(self.value) - if len(self._history) < 5: - return BUSY, status_text - beg = self._history.pop(0) - if abs(beg - self.target) < self.tolerance: - # reset normal tolerance - self._stop() - self._idle_status = IDLE, 'in tolerance' - return self._idle_status - # self._hw.setTargetRange(self.axis, self.tolerance / self._scale) - if (self.value - beg) * self._direction > 0: - return BUSY, status_text - self._try_count += 1 - if self._try_count < 10: - self.log.warn('no progress retry %d', self._try_count) - return BUSY, status_text - self._idle_status = WARN, 'no progress' - if self._error_state: - self._try_count += 1 - if self._try_count < 10 and self._history is not None: - self.log.warn('end of travel retry %d', self._try_count) - self.write_target(self.target) - return Done - self._idle_status = WARN, self._error_state - if self.status[0] != IDLE: - self._stop() - return self._idle_status + drv = self._drive_info + if not drv.thread: + self._get_state(drv) + self.setFastPoll(False) + self.statusbits = ''.join(k for k, v in zip('SOMTFBE', drv.statusbits) if v) + return drv.status - def write_target(self, value): - if value == self.read_value(): - return value - # self.check_limits(value) - self._try_count = 0 - self._direction = 1 if value > self.value else -1 - # if self._error_state and DIRECTION_NAME[-self._direction] not in self._error_state: - # raise BadValueError('can not move (%s)' % self._error_state) + def _start_motor(self, target): + self._stop_thread() + self._drive_info = DriveInfo(self.value, target) + self._drive_info.thread = threading.Thread(target=self._drive_thread, args=(self._drive_info,)) + self._drive_info.thread.start() + + def _stop_thread(self): + try: + drv = self._drive_info + drv.thread.join() + except AttributeError: + pass + + def _stop(self): self._move_steps = 0 - self.write_output(1) - # try first with 50 % of tolerance - self._hw.setTargetRange(self.axis, self.tolerance * 0.5 / self._scale) - for itry in range(5): - try: - self._hw.setTargetPosition(self.axis, value / self._scale) - self._hw.startAutoMove(self.axis, enable=1, relative=0) - except Exception as e: - if itry == 4: - raise - self.log.warn('%r', e) - self._history = [self.value] - self._idle_status = IDLE, '' - self.freeze_status(1) - self.setFastPoll(True, 1) - return value + self.setFastPoll(False) + self._stop_thread() + self.read_status() - def doPoll(self): - if self._move_steps == 0: - super().doPoll() - return - self._hw.startSingleStep(self.axis, self._direction < 0) - self._move_steps -= self._direction - if self._move_steps % int(self.frequency) == 0: # poll value and status every second - super().doPoll() + def write_target(self, target): + # self.check_limits(value) + # self._try_count = 0 + self._move_steps = 0 + self._start_motor(target) + # self._history = [self.value] + self.setFastPoll(True, 0.25) + return target @nopoll def read_info(self): @@ -202,20 +244,6 @@ class Axis(Drivable): axistype = ['linear', 'gonio', 'rotator'][self._hw.getActuatorType(self.axis)] return '%s %s %.3gnF' % (self._hw.getActuatorName(self.axis), axistype, cap) - def _stop(self): - self._move_steps = 0 - self._history = None - for _ in range(5): - try: - self._hw.startAutoMove(self.axis, enable=0, relative=0) - break - except Exception as e: - if itry == 4: - raise - self.log.warn('%r', e) - self._hw.setTargetRange(self.axis, self.tolerance / self._scale) - self.setFastPoll(False) - @Command() def stop(self): self._idle_status = IDLE, 'stopped' if self.isBusy() else '' @@ -227,10 +255,15 @@ class Axis(Drivable): """relative move by number of steps""" self._direction = 1 if value > 0 else -1 self.check_value(self.value) - self._history = None if DIRECTION_NAME[self._direction] in self._error_state: raise BadValueError('can not move (%s)' % self._error_state) self._move_steps = value self._idle_status = IDLE, '' self.read_status() self.setFastPoll(True, 1/self.frequency) + + @Command(IntRange()) + def automove(self, flag): + """switch automove and output""" + self._hw.setAxisOutput(self.axis, enable=flag, autoDisable=0) + self._hw.startAutoMove(self.axis, enable=flag, relative=0) \ No newline at end of file