[WIP] uniax after changing to StateMachine
Change-Id: I0173f8c8eaaeb2526477d05803a615673297667d
This commit is contained in:
parent
3ab9821860
commit
a8f1495bc8
@ -26,6 +26,11 @@ import math
|
|||||||
from secop.core import Drivable, Parameter, FloatRange, Done, \
|
from secop.core import Drivable, Parameter, FloatRange, Done, \
|
||||||
Attached, Command, PersistentMixin, PersistentParam, BoolType
|
Attached, Command, PersistentMixin, PersistentParam, BoolType
|
||||||
from secop.errors import BadValueError
|
from secop.errors import BadValueError
|
||||||
|
from secop.lib.statemachine import Retry, StateMachine, Restart
|
||||||
|
|
||||||
|
|
||||||
|
class Error(Exception):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
class Uniax(PersistentMixin, Drivable):
|
class Uniax(PersistentMixin, Drivable):
|
||||||
@ -52,29 +57,33 @@ class Uniax(PersistentMixin, Drivable):
|
|||||||
default=0.2, persistent='auto')
|
default=0.2, persistent='auto')
|
||||||
low_pos = Parameter('max. position for positive forces', FloatRange(unit='deg'), readonly=False, needscfg=False)
|
low_pos = Parameter('max. position for positive forces', FloatRange(unit='deg'), readonly=False, needscfg=False)
|
||||||
high_pos = Parameter('min. position for negative forces', FloatRange(unit='deg'), readonly=False, needscfg=False)
|
high_pos = Parameter('min. position for negative forces', FloatRange(unit='deg'), readonly=False, needscfg=False)
|
||||||
motor_play = Parameter('summed steps without substantial change', FloatRange(), default=0)
|
substantial_force = Parameter('min. force change expected within motor play', FloatRange(), default=0)
|
||||||
max_play = Parameter('max. summed steps without substantial change', FloatRange(), readonly=False, default=70)
|
motor_play = Parameter('acceptable motor play within hysteresis', FloatRange(), readonly=False, default=10)
|
||||||
|
motor_max_play = Parameter('acceptable motor play outside hysteresis', FloatRange(), readonly=False, default=70)
|
||||||
pollinterval = 0.1
|
pollinterval = 0.1
|
||||||
fast_pollfactor = 1
|
|
||||||
|
|
||||||
_mot_target = None # for detecting manual motor manipulations
|
_mot_target = None # for detecting manual motor manipulations
|
||||||
_filter_start = 0
|
_filter_start = 0
|
||||||
|
_filtered = False
|
||||||
_cnt = 0
|
_cnt = 0
|
||||||
_sum = 0
|
_sum = 0
|
||||||
_cnt_rderr = 0
|
_cnt_rderr = 0
|
||||||
_cnt_wrerr = 0
|
_cnt_wrerr = 0
|
||||||
_action = None
|
|
||||||
_last_force = 0
|
|
||||||
_expected_step = 1
|
|
||||||
_in_cnt = 0
|
_in_cnt = 0
|
||||||
_init_action = False
|
|
||||||
_zero_pos_tol = None
|
_zero_pos_tol = None
|
||||||
_find_target = 0
|
_state = None
|
||||||
|
|
||||||
def earlyInit(self):
|
def earlyInit(self):
|
||||||
super().earlyInit()
|
super().earlyInit()
|
||||||
self._zero_pos_tol = {}
|
self._zero_pos_tol = {}
|
||||||
self._action = self.idle
|
|
||||||
|
def initModule(self):
|
||||||
|
super().initModule()
|
||||||
|
self._state = StateMachine(logger=self.log, threaded=False, cleanup=self.cleanup)
|
||||||
|
|
||||||
|
def doPoll(self):
|
||||||
|
self.read_value()
|
||||||
|
self._state.cycle()
|
||||||
|
|
||||||
def drive_relative(self, step, ntry=3):
|
def drive_relative(self, step, ntry=3):
|
||||||
"""drive relative, try 3 times"""
|
"""drive relative, try 3 times"""
|
||||||
@ -105,31 +114,14 @@ class Uniax(PersistentMixin, Drivable):
|
|||||||
mot = self.motor
|
mot = self.motor
|
||||||
if mot.isBusy():
|
if mot.isBusy():
|
||||||
if mot.target != self._mot_target:
|
if mot.target != self._mot_target:
|
||||||
self.action = self.idle
|
raise Error('control stopped - motor moved directly')
|
||||||
return True
|
return True
|
||||||
return False
|
return False
|
||||||
|
|
||||||
def next_action(self, action):
|
def zero_pos(self, value):
|
||||||
"""call next action
|
|
||||||
|
|
||||||
:param action: function to be called next time
|
|
||||||
:param do_now: do next action in the same cycle
|
|
||||||
"""
|
|
||||||
self._action = action
|
|
||||||
self._init_action = True
|
|
||||||
self.log.info('action %r', action.__name__)
|
|
||||||
|
|
||||||
def init_action(self):
|
|
||||||
"""return true when called the first time after next_action"""
|
|
||||||
if self._init_action:
|
|
||||||
self._init_action = False
|
|
||||||
return True
|
|
||||||
return False
|
|
||||||
|
|
||||||
def zero_pos(self, value,):
|
|
||||||
"""get high_pos or low_pos, depending on sign of value
|
"""get high_pos or low_pos, depending on sign of value
|
||||||
|
|
||||||
:param force: when not 0, return an estimate for a good starting position
|
:param value: return an estimate for a good starting position
|
||||||
"""
|
"""
|
||||||
|
|
||||||
name = 'high_pos' if value > 0 else 'low_pos'
|
name = 'high_pos' if value > 0 else 'low_pos'
|
||||||
@ -158,145 +150,141 @@ class Uniax(PersistentMixin, Drivable):
|
|||||||
setattr(self, name, pos)
|
setattr(self, name, pos)
|
||||||
return pos
|
return pos
|
||||||
|
|
||||||
def find(self, force, target):
|
def do_find(self, state):
|
||||||
"""find active (engaged) range"""
|
"""find active (engaged) range"""
|
||||||
sign = math.copysign(1, target)
|
if state.init:
|
||||||
if force * sign > self.hysteresis or force * sign > target * sign:
|
state.prev_direction = 0 # find not yet started
|
||||||
|
direction = math.copysign(1, self.target)
|
||||||
|
if self.value * direction > self.hysteresis or self.value * direction > self.target * direction:
|
||||||
if self.motor_busy():
|
if self.motor_busy():
|
||||||
self.log.info('motor stopped - substantial force detected: %g', force)
|
self.log.info('motor stopped - substantial force detected: %g', self.value)
|
||||||
self.motor.stop()
|
self.motor.stop()
|
||||||
elif self.init_action():
|
elif state.prev_direction == 0:
|
||||||
self.next_action(self.adjust)
|
return self.do_adjust
|
||||||
return
|
if abs(self.value) > self.hysteresis:
|
||||||
if abs(force) > self.hysteresis:
|
self.set_zero_pos(self.value, self.motor.read_value())
|
||||||
self.set_zero_pos(force, self.motor.read_value())
|
return self.do_adjust
|
||||||
self.next_action(self.adjust)
|
if self.value * direction < -self.hysteresis:
|
||||||
return
|
state.force_before_free = self.value
|
||||||
if force * sign < -self.hysteresis:
|
return self.do_free
|
||||||
self._previous_force = force
|
|
||||||
self.next_action(self.free)
|
|
||||||
return
|
|
||||||
if self.motor_busy():
|
if self.motor_busy():
|
||||||
if sign * self._find_target < 0: # target sign changed
|
if direction == -state.prev_direction: # target direction changed
|
||||||
self.motor.stop()
|
self.motor.stop()
|
||||||
self.next_action(self.find) # restart find
|
state.init_find = True # restart find
|
||||||
return
|
return Retry()
|
||||||
|
zero_pos = self.zero_pos(self.target)
|
||||||
|
if state.prev_direction: # find already started
|
||||||
|
if abs(self.motor.target - self.motor.value) > self.motor.tolerance:
|
||||||
|
# no success on last find try, try short and strong step
|
||||||
|
self.write_adjusting(True)
|
||||||
|
self.log.info('one step to %g', self.motor.value + self.safe_step)
|
||||||
|
self.drive_relative(direction * self.safe_step)
|
||||||
|
return Retry()
|
||||||
else:
|
else:
|
||||||
self._find_target = target
|
state.prev_direction = math.copysign(1, self.target)
|
||||||
zero_pos = self.zero_pos(target)
|
side_name = 'negative' if direction == -1 else 'positive'
|
||||||
side_name = 'positive' if target > 0 else 'negative'
|
if zero_pos is not None:
|
||||||
if not self.init_action():
|
self.status = 'BUSY', 'change to %s side' % side_name
|
||||||
if abs(self.motor.target - self.motor.value) > self.motor.tolerance:
|
zero_pos += direction * (self.hysteresis * self.slope - self.motor.tolerance)
|
||||||
# no success on last find try, try short and strong step
|
if (self.motor.value - zero_pos) * direction < -self.motor.tolerance:
|
||||||
self.write_adjusting(True)
|
self.write_adjusting(False)
|
||||||
self.log.info('one step to %g', self.motor.value + self.safe_step)
|
self.log.info('change side to %g', zero_pos)
|
||||||
self.drive_relative(sign * self.safe_step)
|
self.drive_relative(zero_pos - self.motor.value)
|
||||||
return
|
return Retry()
|
||||||
if zero_pos is not None:
|
# we are already at or beyond zero_pos
|
||||||
self.status = 'BUSY', 'change to %s side' % side_name
|
return self.do_adjust
|
||||||
zero_pos += sign * (self.hysteresis * self.slope - self.motor.tolerance)
|
self.write_adjusting(False)
|
||||||
if (self.motor.value - zero_pos) * sign < -self.motor.tolerance:
|
self.status = 'BUSY', 'find %s side' % side_name
|
||||||
self.write_adjusting(False)
|
self.log.info('one turn to %g', self.motor.value + direction * 360)
|
||||||
self.log.info('change side to %g', zero_pos)
|
self.drive_relative(direction * 360)
|
||||||
self.drive_relative(zero_pos - self.motor.value)
|
return Retry()
|
||||||
return
|
|
||||||
# we are already at or beyond zero_pos
|
|
||||||
self.next_action(self.adjust)
|
|
||||||
return
|
|
||||||
self.write_adjusting(False)
|
|
||||||
self.status = 'BUSY', 'find %s side' % side_name
|
|
||||||
self.log.info('one turn to %g', self.motor.value + sign * 360)
|
|
||||||
self.drive_relative(sign * 360)
|
|
||||||
|
|
||||||
def free(self, force, target):
|
def cleanup(self, state):
|
||||||
|
"""in case of error, set error status"""
|
||||||
|
if state.stopped: # stop or restart
|
||||||
|
if state.stopped is Restart:
|
||||||
|
return
|
||||||
|
self.status = 'IDLE', 'stopped'
|
||||||
|
self.log.warning('stopped')
|
||||||
|
else:
|
||||||
|
self.status = 'ERROR', str(state.last_error)
|
||||||
|
if isinstance(state.last_error, Error):
|
||||||
|
self.log.error('%s', state.last_error)
|
||||||
|
else:
|
||||||
|
self.log.error('%r raised in state %r', str(state.last_error), state.status_string)
|
||||||
|
self.motor.stop()
|
||||||
|
self.write_adjusting(False)
|
||||||
|
|
||||||
|
def do_free(self, state):
|
||||||
"""free from high force at other end"""
|
"""free from high force at other end"""
|
||||||
|
if state.init:
|
||||||
|
state.free_way = None
|
||||||
if self.motor_busy():
|
if self.motor_busy():
|
||||||
return
|
return Retry()
|
||||||
if abs(force) > abs(self._previous_force) + self.tolerance:
|
if abs(self.value) > abs(state.force_before_free) + self.tolerance:
|
||||||
self.stop()
|
self.stop()
|
||||||
self.status = 'ERROR', 'force increase while freeing'
|
self.status = 'ERROR', 'force increase while freeing'
|
||||||
self.log.error(self.status[1])
|
self.log.error(self.status[1])
|
||||||
return
|
return None
|
||||||
if abs(force) < self.hysteresis:
|
if abs(self.value) < self.hysteresis:
|
||||||
self.next_action(self.find)
|
return self.do_find
|
||||||
return
|
if state.free_way is None:
|
||||||
if self.init_action():
|
state.free_way = 0
|
||||||
self._free_way = 0
|
self.log.info('free from high force %g', self.value)
|
||||||
self.log.info('free from high force %g', force)
|
|
||||||
self.write_adjusting(True)
|
self.write_adjusting(True)
|
||||||
sign = math.copysign(1, target)
|
direction = math.copysign(1, self.target)
|
||||||
if self._free_way > (abs(self._previous_force) + self.hysteresis) * self.slope:
|
if state.free_way > (abs(state.force_before_free) + self.hysteresis) * self.slope:
|
||||||
self.stop()
|
self.stop()
|
||||||
self.status = 'ERROR', 'freeing failed'
|
self.status = 'ERROR', 'freeing failed'
|
||||||
self.log.error(self.status[1])
|
self.log.error(self.status[1])
|
||||||
return
|
return None
|
||||||
self._free_way += self.safe_step
|
state.free_way += self.safe_step
|
||||||
self.drive_relative(sign * self.safe_step)
|
self.drive_relative(direction * self.safe_step)
|
||||||
|
return Retry()
|
||||||
|
|
||||||
def within_tolerance(self, force, target):
|
def do_within_tolerance(self, state):
|
||||||
"""within tolerance"""
|
"""within tolerance"""
|
||||||
if self.motor_busy():
|
if self.motor_busy():
|
||||||
return
|
return Retry()
|
||||||
if abs(target - force) > self.tolerance:
|
if abs(self.target - self.value) > self.tolerance:
|
||||||
self.next_action(self.adjust)
|
return self.do_adjust
|
||||||
elif self.init_action():
|
self.status = 'IDLE', 'within tolerance'
|
||||||
self.status = 'IDLE', 'within tolerance'
|
|
||||||
|
|
||||||
def adjust(self, force, target):
|
def do_adjust(self, state):
|
||||||
"""adjust force"""
|
"""adjust force"""
|
||||||
|
if state.init:
|
||||||
|
state.prev_force = None
|
||||||
if self.motor_busy():
|
if self.motor_busy():
|
||||||
return
|
return
|
||||||
if abs(target - force) < self.tolerance:
|
if abs(self.target - self.value) < self.tolerance:
|
||||||
self._in_cnt += 1
|
self._in_cnt += 1
|
||||||
if self._in_cnt >= 3:
|
if self._in_cnt >= 3:
|
||||||
self.motor_play = 0
|
return self.do_within_tolerance
|
||||||
self.next_action(self.within_tolerance)
|
|
||||||
return
|
|
||||||
else:
|
else:
|
||||||
self._in_cnt = 0
|
self._in_cnt = 0
|
||||||
if self.init_action():
|
if state.prev_force is None:
|
||||||
self.motor_play = 0
|
state.prev_force = self.value
|
||||||
|
state.prev_pos = self.motor.pos
|
||||||
self.write_adjusting(True)
|
self.write_adjusting(True)
|
||||||
self.status = 'BUSY', 'adjusting force'
|
self.status = 'BUSY', 'adjusting force'
|
||||||
elif not self._filtered:
|
elif not self._filtered:
|
||||||
return
|
return
|
||||||
else:
|
else:
|
||||||
force_step = force - self._last_force
|
if abs(self.value - state.prev_force) > self.substantial_force:
|
||||||
if self._expected_step:
|
state.prev_force = self.value
|
||||||
# compare detected / expected step
|
state.prev_pos = self.motor.value
|
||||||
q = force_step / self._expected_step
|
else:
|
||||||
mstep = self._expected_step * self.slope
|
motor_dif = abs(self.value - state.prev_pos)
|
||||||
if q < 0.1:
|
if abs(self.value) < self.hysteresis:
|
||||||
self.motor_play += mstep
|
if motor_dif > self.motor_play:
|
||||||
elif q > 0.5:
|
|
||||||
if abs(self.motor_play) <= abs(mstep):
|
|
||||||
self.motor_play = 0
|
|
||||||
else:
|
|
||||||
self.motor_play = self.motor_play * (1 - abs(mstep / self.motor_play))
|
|
||||||
if abs(self.motor_play) >= 10:
|
|
||||||
if force < self.hysteresis:
|
|
||||||
self.log.warning('adjusting failed - try to find zero pos')
|
self.log.warning('adjusting failed - try to find zero pos')
|
||||||
self.set_zero_pos(target, None)
|
self.set_zero_pos(self.target, None)
|
||||||
self.next_action(self.find)
|
return self.do_find
|
||||||
elif abs(self.motor_play) > self.max_play:
|
elif motor_dif > self.motor_max_play:
|
||||||
self.stop()
|
raise Error('force seems not to change substantially')
|
||||||
self.status = 'ERROR', 'force seems not to change substantially'
|
force_step = (self.target - self.value) * self.pid_i
|
||||||
self.log.error(self.status[1])
|
self.drive_relative(force_step * self.slope)
|
||||||
return
|
return Retry()
|
||||||
self._last_force = force
|
|
||||||
force_step = (target - force) * self.pid_i
|
|
||||||
if abs(target - force) < self.tolerance * 0.5:
|
|
||||||
self._expected_step = 0
|
|
||||||
return
|
|
||||||
self._expected_step = force_step
|
|
||||||
step = force_step * self.slope
|
|
||||||
self.drive_relative(step)
|
|
||||||
|
|
||||||
def idle(self, *args):
|
|
||||||
if self.init_action():
|
|
||||||
self.write_adjusting(False)
|
|
||||||
if self.status[0] == 'BUSY':
|
|
||||||
self.status = 'IDLE', 'stopped'
|
|
||||||
|
|
||||||
def read_value(self):
|
def read_value(self):
|
||||||
try:
|
try:
|
||||||
@ -313,7 +301,6 @@ class Uniax(PersistentMixin, Drivable):
|
|||||||
now = time.time()
|
now = time.time()
|
||||||
if self.motor_busy():
|
if self.motor_busy():
|
||||||
# do not filter while driving
|
# do not filter while driving
|
||||||
self.value = force
|
|
||||||
self.reset_filter()
|
self.reset_filter()
|
||||||
self._filtered = False
|
self._filtered = False
|
||||||
else:
|
else:
|
||||||
@ -322,46 +309,42 @@ class Uniax(PersistentMixin, Drivable):
|
|||||||
if now < self._filter_start + self.filter_interval:
|
if now < self._filter_start + self.filter_interval:
|
||||||
return Done
|
return Done
|
||||||
force = self._sum / self._cnt
|
force = self._sum / self._cnt
|
||||||
self.value = force
|
|
||||||
self.reset_filter(now)
|
self.reset_filter(now)
|
||||||
self._filtered = True
|
self._filtered = True
|
||||||
if abs(force) > self.limit + self.hysteresis:
|
if abs(force) > self.limit + self.hysteresis:
|
||||||
|
self.motor.stop()
|
||||||
self.status = 'ERROR', 'above max limit'
|
self.status = 'ERROR', 'above max limit'
|
||||||
self.log.error(self.status[1])
|
self.log.error(self.status[1])
|
||||||
return Done
|
return Done
|
||||||
if self.zero_pos(force) is None and abs(force) > self.hysteresis and self._filtered:
|
if self.zero_pos(force) is None and abs(force) > self.hysteresis and self._filtered:
|
||||||
self.set_zero_pos(force, self.motor.read_value())
|
self.set_zero_pos(force, self.motor.read_value())
|
||||||
self._action(self.value, self.target)
|
return force
|
||||||
return Done
|
|
||||||
|
|
||||||
def write_target(self, target):
|
def write_target(self, target):
|
||||||
if abs(target) > self.limit:
|
if abs(target) > self.limit:
|
||||||
raise BadValueError('force above limit')
|
raise BadValueError('force above limit')
|
||||||
if abs(target - self.value) <= self.tolerance:
|
if abs(target - self.value) <= self.tolerance:
|
||||||
if self.isBusy():
|
if not self.isBusy():
|
||||||
self.stop()
|
|
||||||
self.next_action(self.within_tolerance)
|
|
||||||
else:
|
|
||||||
self.status = 'IDLE', 'already at target'
|
self.status = 'IDLE', 'already at target'
|
||||||
self.next_action(self.within_tolerance)
|
self._state.start(self.do_within_tolerance)
|
||||||
return target
|
return target
|
||||||
self.log.info('new target %g', target)
|
self.log.info('new target %g', target)
|
||||||
self._cnt_rderr = 0
|
self._cnt_rderr = 0
|
||||||
self._cnt_wrerr = 0
|
self._cnt_wrerr = 0
|
||||||
self.status = 'BUSY', 'changed target'
|
self.status = 'BUSY', 'changed target'
|
||||||
if self.value * math.copysign(1, target) > self.hysteresis:
|
if self.value * math.copysign(1, target) > self.hysteresis:
|
||||||
self.next_action(self.adjust)
|
self._state.start(self.do_adjust)
|
||||||
else:
|
else:
|
||||||
self.next_action(self.find)
|
self._state.start(self.do_find)
|
||||||
return target
|
return target
|
||||||
|
|
||||||
@Command()
|
@Command()
|
||||||
def stop(self):
|
def stop(self):
|
||||||
self._action = self.idle
|
|
||||||
if self.motor.isBusy():
|
if self.motor.isBusy():
|
||||||
self.log.info('stop motor')
|
self.log.info('stop motor')
|
||||||
self.motor.stop()
|
self.motor.stop()
|
||||||
self.next_action(self.idle)
|
self.status = 'IDLE', 'stopped'
|
||||||
|
self._state.stop()
|
||||||
|
|
||||||
def write_force_offset(self, value):
|
def write_force_offset(self, value):
|
||||||
self.force_offset = value
|
self.force_offset = value
|
||||||
|
Loading…
x
Reference in New Issue
Block a user