From a8f1495bc8db0579e5c64cfea0015557dd9fa49e Mon Sep 17 00:00:00 2001 From: Markus Zolliker Date: Fri, 16 Sep 2022 14:53:42 +0200 Subject: [PATCH] [WIP] uniax after changing to StateMachine Change-Id: I0173f8c8eaaeb2526477d05803a615673297667d --- secop_psi/uniax.py | 281 +++++++++++++++++++++------------------------ 1 file changed, 132 insertions(+), 149 deletions(-) diff --git a/secop_psi/uniax.py b/secop_psi/uniax.py index 51a2182..fa819f2 100644 --- a/secop_psi/uniax.py +++ b/secop_psi/uniax.py @@ -26,6 +26,11 @@ import math from secop.core import Drivable, Parameter, FloatRange, Done, \ Attached, Command, PersistentMixin, PersistentParam, BoolType from secop.errors import BadValueError +from secop.lib.statemachine import Retry, StateMachine, Restart + + +class Error(Exception): + pass class Uniax(PersistentMixin, Drivable): @@ -52,29 +57,33 @@ class Uniax(PersistentMixin, Drivable): default=0.2, persistent='auto') low_pos = Parameter('max. position for positive forces', FloatRange(unit='deg'), readonly=False, needscfg=False) high_pos = Parameter('min. position for negative forces', FloatRange(unit='deg'), readonly=False, needscfg=False) - motor_play = Parameter('summed steps without substantial change', FloatRange(), default=0) - max_play = Parameter('max. summed steps without substantial change', FloatRange(), readonly=False, default=70) + substantial_force = Parameter('min. force change expected within motor play', FloatRange(), default=0) + motor_play = Parameter('acceptable motor play within hysteresis', FloatRange(), readonly=False, default=10) + motor_max_play = Parameter('acceptable motor play outside hysteresis', FloatRange(), readonly=False, default=70) pollinterval = 0.1 - fast_pollfactor = 1 _mot_target = None # for detecting manual motor manipulations _filter_start = 0 + _filtered = False _cnt = 0 _sum = 0 _cnt_rderr = 0 _cnt_wrerr = 0 - _action = None - _last_force = 0 - _expected_step = 1 _in_cnt = 0 - _init_action = False _zero_pos_tol = None - _find_target = 0 + _state = None def earlyInit(self): super().earlyInit() self._zero_pos_tol = {} - self._action = self.idle + + def initModule(self): + super().initModule() + self._state = StateMachine(logger=self.log, threaded=False, cleanup=self.cleanup) + + def doPoll(self): + self.read_value() + self._state.cycle() def drive_relative(self, step, ntry=3): """drive relative, try 3 times""" @@ -105,31 +114,14 @@ class Uniax(PersistentMixin, Drivable): mot = self.motor if mot.isBusy(): if mot.target != self._mot_target: - self.action = self.idle + raise Error('control stopped - motor moved directly') return True return False - def next_action(self, action): - """call next action - - :param action: function to be called next time - :param do_now: do next action in the same cycle - """ - self._action = action - self._init_action = True - self.log.info('action %r', action.__name__) - - def init_action(self): - """return true when called the first time after next_action""" - if self._init_action: - self._init_action = False - return True - return False - - def zero_pos(self, value,): + def zero_pos(self, value): """get high_pos or low_pos, depending on sign of value - :param force: when not 0, return an estimate for a good starting position + :param value: return an estimate for a good starting position """ name = 'high_pos' if value > 0 else 'low_pos' @@ -158,145 +150,141 @@ class Uniax(PersistentMixin, Drivable): setattr(self, name, pos) return pos - def find(self, force, target): + def do_find(self, state): """find active (engaged) range""" - sign = math.copysign(1, target) - if force * sign > self.hysteresis or force * sign > target * sign: + if state.init: + state.prev_direction = 0 # find not yet started + direction = math.copysign(1, self.target) + if self.value * direction > self.hysteresis or self.value * direction > self.target * direction: if self.motor_busy(): - self.log.info('motor stopped - substantial force detected: %g', force) + self.log.info('motor stopped - substantial force detected: %g', self.value) self.motor.stop() - elif self.init_action(): - self.next_action(self.adjust) - return - if abs(force) > self.hysteresis: - self.set_zero_pos(force, self.motor.read_value()) - self.next_action(self.adjust) - return - if force * sign < -self.hysteresis: - self._previous_force = force - self.next_action(self.free) - return + elif state.prev_direction == 0: + return self.do_adjust + if abs(self.value) > self.hysteresis: + self.set_zero_pos(self.value, self.motor.read_value()) + return self.do_adjust + if self.value * direction < -self.hysteresis: + state.force_before_free = self.value + return self.do_free if self.motor_busy(): - if sign * self._find_target < 0: # target sign changed + if direction == -state.prev_direction: # target direction changed self.motor.stop() - self.next_action(self.find) # restart find - return + state.init_find = True # restart find + return Retry() + zero_pos = self.zero_pos(self.target) + if state.prev_direction: # find already started + if abs(self.motor.target - self.motor.value) > self.motor.tolerance: + # no success on last find try, try short and strong step + self.write_adjusting(True) + self.log.info('one step to %g', self.motor.value + self.safe_step) + self.drive_relative(direction * self.safe_step) + return Retry() else: - self._find_target = target - zero_pos = self.zero_pos(target) - side_name = 'positive' if target > 0 else 'negative' - if not self.init_action(): - if abs(self.motor.target - self.motor.value) > self.motor.tolerance: - # no success on last find try, try short and strong step - self.write_adjusting(True) - self.log.info('one step to %g', self.motor.value + self.safe_step) - self.drive_relative(sign * self.safe_step) - return - if zero_pos is not None: - self.status = 'BUSY', 'change to %s side' % side_name - zero_pos += sign * (self.hysteresis * self.slope - self.motor.tolerance) - if (self.motor.value - zero_pos) * sign < -self.motor.tolerance: - self.write_adjusting(False) - self.log.info('change side to %g', zero_pos) - self.drive_relative(zero_pos - self.motor.value) - return - # we are already at or beyond zero_pos - self.next_action(self.adjust) - return - self.write_adjusting(False) - self.status = 'BUSY', 'find %s side' % side_name - self.log.info('one turn to %g', self.motor.value + sign * 360) - self.drive_relative(sign * 360) + state.prev_direction = math.copysign(1, self.target) + side_name = 'negative' if direction == -1 else 'positive' + if zero_pos is not None: + self.status = 'BUSY', 'change to %s side' % side_name + zero_pos += direction * (self.hysteresis * self.slope - self.motor.tolerance) + if (self.motor.value - zero_pos) * direction < -self.motor.tolerance: + self.write_adjusting(False) + self.log.info('change side to %g', zero_pos) + self.drive_relative(zero_pos - self.motor.value) + return Retry() + # we are already at or beyond zero_pos + return self.do_adjust + self.write_adjusting(False) + self.status = 'BUSY', 'find %s side' % side_name + self.log.info('one turn to %g', self.motor.value + direction * 360) + self.drive_relative(direction * 360) + return Retry() - def free(self, force, target): + def cleanup(self, state): + """in case of error, set error status""" + if state.stopped: # stop or restart + if state.stopped is Restart: + return + self.status = 'IDLE', 'stopped' + self.log.warning('stopped') + else: + self.status = 'ERROR', str(state.last_error) + if isinstance(state.last_error, Error): + self.log.error('%s', state.last_error) + else: + self.log.error('%r raised in state %r', str(state.last_error), state.status_string) + self.motor.stop() + self.write_adjusting(False) + + def do_free(self, state): """free from high force at other end""" + if state.init: + state.free_way = None if self.motor_busy(): - return - if abs(force) > abs(self._previous_force) + self.tolerance: + return Retry() + if abs(self.value) > abs(state.force_before_free) + self.tolerance: self.stop() self.status = 'ERROR', 'force increase while freeing' self.log.error(self.status[1]) - return - if abs(force) < self.hysteresis: - self.next_action(self.find) - return - if self.init_action(): - self._free_way = 0 - self.log.info('free from high force %g', force) + return None + if abs(self.value) < self.hysteresis: + return self.do_find + if state.free_way is None: + state.free_way = 0 + self.log.info('free from high force %g', self.value) self.write_adjusting(True) - sign = math.copysign(1, target) - if self._free_way > (abs(self._previous_force) + self.hysteresis) * self.slope: + direction = math.copysign(1, self.target) + if state.free_way > (abs(state.force_before_free) + self.hysteresis) * self.slope: self.stop() self.status = 'ERROR', 'freeing failed' self.log.error(self.status[1]) - return - self._free_way += self.safe_step - self.drive_relative(sign * self.safe_step) + return None + state.free_way += self.safe_step + self.drive_relative(direction * self.safe_step) + return Retry() - def within_tolerance(self, force, target): + def do_within_tolerance(self, state): """within tolerance""" if self.motor_busy(): - return - if abs(target - force) > self.tolerance: - self.next_action(self.adjust) - elif self.init_action(): - self.status = 'IDLE', 'within tolerance' + return Retry() + if abs(self.target - self.value) > self.tolerance: + return self.do_adjust + self.status = 'IDLE', 'within tolerance' - def adjust(self, force, target): + def do_adjust(self, state): """adjust force""" + if state.init: + state.prev_force = None if self.motor_busy(): return - if abs(target - force) < self.tolerance: + if abs(self.target - self.value) < self.tolerance: self._in_cnt += 1 if self._in_cnt >= 3: - self.motor_play = 0 - self.next_action(self.within_tolerance) - return + return self.do_within_tolerance else: self._in_cnt = 0 - if self.init_action(): - self.motor_play = 0 + if state.prev_force is None: + state.prev_force = self.value + state.prev_pos = self.motor.pos self.write_adjusting(True) self.status = 'BUSY', 'adjusting force' elif not self._filtered: return else: - force_step = force - self._last_force - if self._expected_step: - # compare detected / expected step - q = force_step / self._expected_step - mstep = self._expected_step * self.slope - if q < 0.1: - self.motor_play += mstep - elif q > 0.5: - if abs(self.motor_play) <= abs(mstep): - self.motor_play = 0 - else: - self.motor_play = self.motor_play * (1 - abs(mstep / self.motor_play)) - if abs(self.motor_play) >= 10: - if force < self.hysteresis: + if abs(self.value - state.prev_force) > self.substantial_force: + state.prev_force = self.value + state.prev_pos = self.motor.value + else: + motor_dif = abs(self.value - state.prev_pos) + if abs(self.value) < self.hysteresis: + if motor_dif > self.motor_play: self.log.warning('adjusting failed - try to find zero pos') - self.set_zero_pos(target, None) - self.next_action(self.find) - elif abs(self.motor_play) > self.max_play: - self.stop() - self.status = 'ERROR', 'force seems not to change substantially' - self.log.error(self.status[1]) - return - self._last_force = force - force_step = (target - force) * self.pid_i - if abs(target - force) < self.tolerance * 0.5: - self._expected_step = 0 - return - self._expected_step = force_step - step = force_step * self.slope - self.drive_relative(step) - - def idle(self, *args): - if self.init_action(): - self.write_adjusting(False) - if self.status[0] == 'BUSY': - self.status = 'IDLE', 'stopped' + self.set_zero_pos(self.target, None) + return self.do_find + elif motor_dif > self.motor_max_play: + raise Error('force seems not to change substantially') + force_step = (self.target - self.value) * self.pid_i + self.drive_relative(force_step * self.slope) + return Retry() def read_value(self): try: @@ -313,7 +301,6 @@ class Uniax(PersistentMixin, Drivable): now = time.time() if self.motor_busy(): # do not filter while driving - self.value = force self.reset_filter() self._filtered = False else: @@ -322,46 +309,42 @@ class Uniax(PersistentMixin, Drivable): if now < self._filter_start + self.filter_interval: return Done force = self._sum / self._cnt - self.value = force self.reset_filter(now) self._filtered = True if abs(force) > self.limit + self.hysteresis: + self.motor.stop() self.status = 'ERROR', 'above max limit' self.log.error(self.status[1]) return Done if self.zero_pos(force) is None and abs(force) > self.hysteresis and self._filtered: self.set_zero_pos(force, self.motor.read_value()) - self._action(self.value, self.target) - return Done + return force def write_target(self, target): if abs(target) > self.limit: raise BadValueError('force above limit') if abs(target - self.value) <= self.tolerance: - if self.isBusy(): - self.stop() - self.next_action(self.within_tolerance) - else: + if not self.isBusy(): self.status = 'IDLE', 'already at target' - self.next_action(self.within_tolerance) - return target + self._state.start(self.do_within_tolerance) + return target self.log.info('new target %g', target) self._cnt_rderr = 0 self._cnt_wrerr = 0 self.status = 'BUSY', 'changed target' if self.value * math.copysign(1, target) > self.hysteresis: - self.next_action(self.adjust) + self._state.start(self.do_adjust) else: - self.next_action(self.find) + self._state.start(self.do_find) return target @Command() def stop(self): - self._action = self.idle if self.motor.isBusy(): self.log.info('stop motor') self.motor.stop() - self.next_action(self.idle) + self.status = 'IDLE', 'stopped' + self._state.stop() def write_force_offset(self, value): self.force_offset = value