diff --git a/secop_psi/uniax.py b/secop_psi/uniax.py index 1da6d6d..21c1fa6 100644 --- a/secop_psi/uniax.py +++ b/secop_psi/uniax.py @@ -22,7 +22,7 @@ """use transducer and motor to adjust force""" import time -from secop.lib import clamp +import math from secop.core import Drivable, Parameter, FloatRange, EnumType, Done, \ Attached, Command, PersistentMixin, PersistentParam, BoolType @@ -32,17 +32,25 @@ class Uniax(PersistentMixin, Drivable): motor = Attached() transducer = Attached() tolerance = Parameter('force tolerance', FloatRange(), readonly=False, default=0.1) - pid_p = PersistentParam('proportial term', FloatRange(unit='deg/N'), readonly=False, default=0.25, persistent='auto') + slope = PersistentParam('spring constant', FloatRange(unit='deg/N'), readonly=False, + default=0.5, persistent='auto') + pid_i = PersistentParam('integral', FloatRange(), readonly=False, default=0.5, persistent='auto') filter_interval = Parameter('filter time', FloatRange(0, 60, unit='s'), readonly=False, default=1) current_sign = Parameter('', EnumType(negative=-1, undefined=0, positive=1), default=0) current_step = Parameter('', FloatRange(), default=0) - force_offset = PersistentParam('transducer offset', FloatRange(), readonly=False, default=0, initwrite=True, persistent='auto') - hysteresis = PersistentParam('force hysteresis', FloatRange(0), readonly=False, default=5, persistent='auto') - release_step = PersistentParam('step when releasing force', FloatRange(0), readonly=False, default=20, persistent='auto') + force_offset = PersistentParam('transducer offset', FloatRange(), readonly=False, default=0, + initwrite=True, persistent='auto') + hysteresis = PersistentParam('force hysteresis', FloatRange(0), readonly=False, default=5, + persistent='auto') + release_step = PersistentParam('step when releasing force', FloatRange(0), readonly=False, default=20, + persistent='auto') adjusting = Parameter('', BoolType(), readonly=False, default=False) - adjusting_current = PersistentParam('current when adjusting force', FloatRange(0, 2.8, unit='A'), readonly=False, default=0.5, persistent='auto') - adjusting_step = PersistentParam('max. motor step when adjusting force', FloatRange(0, unit='deg'), readonly=False, default=5, persistent='auto') - safe_current = PersistentParam('current when moving far', FloatRange(0, 2.8, unit='A'), readonly=False, default=0.2, persistent='auto') + adjusting_current = PersistentParam('current when adjusting force', FloatRange(0, 2.8, unit='A'), readonly=False, + default=0.5, persistent='auto') + adjusting_step = PersistentParam('max. motor step when adjusting force', FloatRange(0, unit='deg'), readonly=False, + default=5, persistent='auto') + safe_current = PersistentParam('current when moving far', FloatRange(0, 2.8, unit='A'), readonly=False, + default=0.2, persistent='auto') low_pos = PersistentParam('max. position for positive forces', FloatRange(), readonly=False, default=0) high_pos = PersistentParam('min. position for negative forces', FloatRange(), readonly=False, default=0) pollinterval = 0.1 @@ -57,98 +65,122 @@ class Uniax(PersistentMixin, Drivable): _cnt_rderr = 0 _cnt_wrerr = 0 - def pos_range(self, target_sign, value=None): - attr = 'high_pos' if target_sign > 0 else 'low_pos' - if value is not None: - value -= self.release_step * target_sign - setattr(self, attr, value) - return getattr(self, attr) - - def drive_to(self, pos): + def rel_drive(self, pos): + """drive relative to high_pos / low_pos, with current_sign""" mot = self._motor - self.current_step = pos - mot.value - for itry in range(3): + step = (pos - self.rel_pos()) * self.current_sign + self.current_step = step + for _ in range(3): try: print('drive by %.2f' % self.current_step) - self._mot_target = self._motor.write_target(pos) + self._mot_target = self._motor.write_target(mot.value + step) self._cnt_wrerr = max(0, self._cnt_wrerr - 1) break except Exception as e: - print('driver_to', e) + print('drive error %s ' % e) self._cnt_wrerr += 1 if self._cnt_wrerr > 5: raise - print('RESET') + print('motor reset') self._motor.reset() - def drive_generator(self, target): - self._mot_target = self._motor.target - mot = self._motor - target_sign = -1 if target < 0 else 1 - force = self.value - # TODO: do this only when abs(target) > hysteresis? - if target_sign != self.current_sign and force * target_sign < self.hysteresis: - self.write_adjusting(False) - pos_lim = None - while target_sign * (target_sign * self.hysteresis - force) > self.tolerance: - self.status = 'BUSY', 'find active motor range' - # we have to increase abs(force) - if not mot.isBusy() or target_sign * (mot.target - mot.value) < 0: - self.drive_to(mot.value + 360 * target_sign) - force = yield - if target_sign * force < self.hysteresis: - pos_lim = mot.value - print('found active range') - while mot.isBusy(): - mot.stop() - force = yield - print('stopped') - if target_sign * (force - self.hysteresis) > 0: - self.current_sign = target_sign - elif pos_lim is not None: - self.pos_range(target_sign, pos_lim) - if abs(force) < self.hysteresis and self.low_pos < mot.value < self.high_pos: - self.current_sign = 0 - if target_sign * (self.value - target) > self.tolerance: - self.status = 'BUSY', 'release force' - self.write_adjusting(True) - # abs(force) too high - while target_sign * (target - self.value) < self.hysteresis and self.value * target_sign > 0: - self.drive_to(mot.value - target_sign * min(self.adjusting_step, self.release_step)) - while mot.isBusy(): - self.reset_filter() - yield - if target_sign * (force + self.hysteresis) < 0: - print('force', force) - self.terminate('ERROR', 'force too high when moving back') - return - print('released', self.value, target) - self.write_adjusting(True) - force = yield - while target_sign * (target - self.value) > self.tolerance: - self.status = 'BUSY', 'adjusting force' - # slowly increasing abs(force) - step = self.pid_p * (target - self.value) - if abs(step) > self.adjusting_step: - step = target_sign * self.adjusting_step - self.drive_to(mot.value + step) - while mot.isBusy(): - self.reset_filter() - yield - self.write_adjusting(False) - self.status = 'IDLE', 'reached target' - return + def rel_pos(self): + if self.current_sign < 0: + return self.low_pos - self._motor.value + return self._motor.value - self.high_pos - def reset_filter(self, now=0): + def reset_filter(self, now=0.0): self._sum = self._cnt = 0 self._filter_start = now or time.time() - def read_value(self): - if self._target is not None: - self._driver = self.drive_generator(self._target) - self._driver.send(None) # start - self._target = None + def motor_busy(self): mot = self._motor + if mot.isBusy(): + if mot.target != self._mot_target: + self.action = None + return True + return False + + def next_action(self, action, do_now=True): + """call next action + + :param action: function to be called next time + :param do_now: do next action in the same cycle + """ + self._action = action + self._init_action = True + if do_now: + self._next_cycle = False + + def init_action(self): + """return true when called the first time after next_action""" + if self._init_action: + self._init_action = False + return True + return False + + def execute_action(self): + for _ in range(5): # limit number of subsequent actions in one cycle + self._next_cycle = True + self._action(self.value * self.current_sign, self.target * self.current_sign) + if self._next_cycle: + break + + def find(self, force, target): + if force > self.hysteresis: + if self.motor_busy(): + self.stop() + return + if self.current_sign > 0: + self.high_pos += self.rel_pos() - self.slope * force + else: + self.low_pos -= self.rel_pos() - self.slope * force + self._pid_factor = 1 + self.next_action(self.release, True) + return + if self.init_action(): + self.write_adjusting(False) + self.status = 'BUSY', 'find active motor range' + if self.rel_pos() < 0: + self.rel_drive((2 * self.hysteresis) * self.slope) + else: + self.rel_drive(self.rel_pos() + 360) + + def release(self, force, target): + if self.motor_busy(): + return + if force < target + self.tolerance: + self.next_action(self.adjust, True) + return + step = (target - self.release_step - force) * self.slope * self.pid_i + if self.init_action(): + self.write_adjusting(True) + self.status = 'BUSY', 'releasing force' + self.rel_drive(self.rel_pos() + step) + + def adjust(self, force, target): + if self.motor_busy(): + return + if abs(target - force) < self.tolerance: + self.status = 'IDLE', '' + self.next_action(self.idle) + elif force > target: + if self._pid_factor < 0.2: + self.status = 'WARN', 'too may tries' + self.next_action(self.idle) + self._pid_factor *= 0.5 + self.next_action(self.release) + else: + if self.init_action(): + self.write_adjusting(True) + self.status = 'BUSY', 'adjusting force' + step = (target - force) * self.slope * self.pid_i * self._pid_factor + self.rel_drive(self.rel_pos() + step) + + def idle(self): + pass + + def read_value(self): try: value = self._transducer.read_value() self._cnt_rderr = max(0, self._cnt_rderr - 1) @@ -157,35 +189,21 @@ class Uniax(PersistentMixin, Drivable): if self._cnt_rderr > 10: self.terminate('ERROR', 'too many read errors: %s' % e) return Done - + now = time.time() - newvalue = False - if self._cnt > 0 and now > self._filter_start + self.filter_interval: + if self.motor_busy(): + # do not filter while driving + self.value = value + self.reset_filter() + else: + self._sum += value + self._cnt += 1 + if now < self._filter_start + self.filter_interval: + return Done self.value = self._sum / self._cnt self.reset_filter(now) - newvalue = True - self._sum += value - self._cnt += 1 - - if self._driver: - try: - if self._mot_target != mot.target and mot.isBusy(): - print('mottarget', mot.target, self._mot_target) - self.terminate('ERROR', 'stopped due to direct motor manipulation') - return Done - if self.adjusting: - if newvalue: - print(' %.2f' % self.value) - # next step only when a new filtered value is available - self._driver.send(self.value) - else: - print(' %.2f' % value) - self._driver.send(value) - except StopIteration: - self._driver = None - except Exception as e: - self.terminate('ERROR', str(e)) - self._driver = None + if self.current_sign: + self.execute_action() return Done def write_target(self, target): @@ -196,8 +214,10 @@ class Uniax(PersistentMixin, Drivable): return target self._cnt_rderr = 0 self._cnt_wrerr = 0 - self.status = 'BUSY', 'changed target' - self._target = target + if self.target: + self.status = 'BUSY', 'changed target' + self.current_sign = math.copysign(1, self.target) + self.next_action(self.find) return target def terminate(self, *status):