diff --git a/cfg/uniax.cfg b/cfg/uniax.cfg index 21473fc..3131094 100644 --- a/cfg/uniax.cfg +++ b/cfg/uniax.cfg @@ -5,6 +5,12 @@ description = uniax pressure stick with motor and transducer [INTERFACE] uri = tcp://5000 +[force] +description = force control +class = secop_psi.uniax.Uniax +motor = drv +transducer = transducer + #[drv_iodev] #description = #class = secop.core.BytesIO @@ -34,12 +40,6 @@ uri = tcp://192.168.127.254:3001 digits = 2 scale_factor = 0.0156 -[force] -description = force control -class = secop_psi.uniax.Uniax -motor = drv -transducer = transducer - [res] description = temperature on uniax stick class = secop_psi.ls340res.ResChannel diff --git a/secop_psi/uniax.py b/secop_psi/uniax.py index 21c1fa6..c4ccba5 100644 --- a/secop_psi/uniax.py +++ b/secop_psi/uniax.py @@ -23,71 +23,77 @@ import time import math -from secop.core import Drivable, Parameter, FloatRange, EnumType, Done, \ +from secop.core import Drivable, Parameter, FloatRange, Done, \ Attached, Command, PersistentMixin, PersistentParam, BoolType +from secop.errors import BadValueError class Uniax(PersistentMixin, Drivable): value = Parameter(unit='N') motor = Attached() transducer = Attached() - tolerance = Parameter('force tolerance', FloatRange(), readonly=False, default=0.1) + limit = Parameter('abs limit of force', FloatRange(0, 150, unit='N'), readonly=False, default=150) + tolerance = Parameter('force tolerance', FloatRange(0, 10, unit='N'), readonly=False, default=0.1) slope = PersistentParam('spring constant', FloatRange(unit='deg/N'), readonly=False, default=0.5, persistent='auto') pid_i = PersistentParam('integral', FloatRange(), readonly=False, default=0.5, persistent='auto') filter_interval = Parameter('filter time', FloatRange(0, 60, unit='s'), readonly=False, default=1) - current_sign = Parameter('', EnumType(negative=-1, undefined=0, positive=1), default=0) - current_step = Parameter('', FloatRange(), default=0) - force_offset = PersistentParam('transducer offset', FloatRange(), readonly=False, default=0, + current_step = Parameter('', FloatRange(unit='deg'), default=0) + force_offset = PersistentParam('transducer offset', FloatRange(unit='N'), readonly=False, default=0, initwrite=True, persistent='auto') - hysteresis = PersistentParam('force hysteresis', FloatRange(0), readonly=False, default=5, + hysteresis = PersistentParam('force hysteresis', FloatRange(0, 150, unit='N'), readonly=False, default=5, persistent='auto') - release_step = PersistentParam('step when releasing force', FloatRange(0), readonly=False, default=20, - persistent='auto') - adjusting = Parameter('', BoolType(), readonly=False, default=False) + adjusting = Parameter('', BoolType(), readonly=False, default=False, initwrite=True) adjusting_current = PersistentParam('current when adjusting force', FloatRange(0, 2.8, unit='A'), readonly=False, default=0.5, persistent='auto') - adjusting_step = PersistentParam('max. motor step when adjusting force', FloatRange(0, unit='deg'), readonly=False, - default=5, persistent='auto') + safe_step = PersistentParam('max. motor step when adjusting force', FloatRange(0, unit='deg'), readonly=False, + default=5, persistent='auto') safe_current = PersistentParam('current when moving far', FloatRange(0, 2.8, unit='A'), readonly=False, default=0.2, persistent='auto') - low_pos = PersistentParam('max. position for positive forces', FloatRange(), readonly=False, default=0) - high_pos = PersistentParam('min. position for negative forces', FloatRange(), readonly=False, default=0) + low_pos = Parameter('max. position for positive forces', FloatRange(unit='deg'), readonly=False, needscfg=False) + high_pos = Parameter('min. position for negative forces', FloatRange(unit='deg'), readonly=False, needscfg=False) pollinterval = 0.1 fast_pollfactor = 1 - _driver = None # whe defined a gerenator to be called for driving - _target = None # freshly set target _mot_target = None # for detecting manual motor manipulations _filter_start = 0 _cnt = 0 _sum = 0 _cnt_rderr = 0 _cnt_wrerr = 0 + _action = None + _last_force = 0 + _expected_step = 1 + _fail_cnt = 0 + _in_cnt = 0 + _init_action = False + _zero_pos_tol = None + _find_target = 0 - def rel_drive(self, pos): - """drive relative to high_pos / low_pos, with current_sign""" + def earlyInit(self): + self._zero_pos_tol = {} + self._action = self.idle + + def drive_relative(self, step, ntry=3): + """drive relative, try 3 times""" mot = self._motor - step = (pos - self.rel_pos()) * self.current_sign + mot.read_value() # make sure motor value is fresh + if self.adjusting and abs(step) > self.safe_step: + step = math.copysign(self.safe_step, step) self.current_step = step - for _ in range(3): + for _ in range(ntry): try: - print('drive by %.2f' % self.current_step) self._mot_target = self._motor.write_target(mot.value + step) self._cnt_wrerr = max(0, self._cnt_wrerr - 1) - break + return True except Exception as e: - print('drive error %s ' % e) + self.log.warning('drive error %s', e) self._cnt_wrerr += 1 if self._cnt_wrerr > 5: raise - print('motor reset') + self.log.warning('motor reset') self._motor.reset() - - def rel_pos(self): - if self.current_sign < 0: - return self.low_pos - self._motor.value - return self._motor.value - self.high_pos + return False def reset_filter(self, now=0.0): self._sum = self._cnt = 0 @@ -97,7 +103,7 @@ class Uniax(PersistentMixin, Drivable): mot = self._motor if mot.isBusy(): if mot.target != self._mot_target: - self.action = None + self.action = self.idle return True return False @@ -109,6 +115,7 @@ class Uniax(PersistentMixin, Drivable): """ self._action = action self._init_action = True + self.log.info('action %r', action.__name__) if do_now: self._next_cycle = False @@ -122,120 +129,241 @@ class Uniax(PersistentMixin, Drivable): def execute_action(self): for _ in range(5): # limit number of subsequent actions in one cycle self._next_cycle = True - self._action(self.value * self.current_sign, self.target * self.current_sign) + self._action(self.value, self.target) if self._next_cycle: break - def find(self, force, target): - if force > self.hysteresis: - if self.motor_busy(): - self.stop() - return - if self.current_sign > 0: - self.high_pos += self.rel_pos() - self.slope * force - else: - self.low_pos -= self.rel_pos() - self.slope * force - self._pid_factor = 1 - self.next_action(self.release, True) - return - if self.init_action(): - self.write_adjusting(False) - self.status = 'BUSY', 'find active motor range' - if self.rel_pos() < 0: - self.rel_drive((2 * self.hysteresis) * self.slope) - else: - self.rel_drive(self.rel_pos() + 360) + def zero_pos(self, value,): + """get high_pos or low_pos, depending on sign of value - def release(self, force, target): - if self.motor_busy(): - return - if force < target + self.tolerance: + :param force: when not 0, return an estimate for a good starting position + """ + + name = 'high_pos' if value > 0 else 'low_pos' + if name not in self._zero_pos_tol: + return None + return getattr(self, name) + + def set_zero_pos(self, force, pos): + """set zero position high_pos or low_pos, depending on sign and value of force + + :param force: the force used for calculating zero_pos + :param pos: the position used for calculating zero_pos + """ + name = 'high_pos' if force > 0 else 'low_pos' + if pos is None: + self._zero_pos_tol.pop(name, None) + return None + pos -= force * self.slope + tol = (abs(force) - self.hysteresis) * self.slope * 0.2 + if name in self._zero_pos_tol: + old = self.zero_pos(force) + if abs(old - pos) < self._zero_pos_tol[name] + tol: + return + self._zero_pos_tol[name] = tol + self.log.info('set %s = %.1f +- %.1f (@%g N)' % (name, pos, tol, force)) + setattr(self, name, pos) + return pos + + def find(self, force, target): + """find active (engaged) range""" + sign = math.copysign(1, target) + if force * sign > self.hysteresis or force * sign > target * sign: + if self.motor_busy(): + self.log.info('motor stopped - substantial force detected: %g', force) + self._motor.stop() + elif self.init_action(): + self.next_action(self.adjust, True) + return + if abs(force) > self.hysteresis: + self.set_zero_pos(force, self._motor.read_value()) self.next_action(self.adjust, True) return - step = (target - self.release_step - force) * self.slope * self.pid_i + if force * sign < -self.hysteresis: + self._previous_force = force + self.next_action(self.free) + return + if self.motor_busy(): + if sign * self._find_target < 0: # target sign changed + self._motor.stop() + self.next_action(self.find) # restart find + return + else: + self._find_target = target + zero_pos = self.zero_pos(target) + side_name = 'positive' if target > 0 else 'negative' + if not self.init_action(): + if abs(self._motor.target - self._motor.value) > self._motor.tolerance: + # no success on last find try, try short and strong step + self.write_adjusting(True) + self.log.info('one step to %g', self._motor.value + self.safe_step) + self.drive_relative(sign * self.safe_step) + return + if zero_pos is not None: + self.status = 'BUSY', 'change to %s side' % side_name + zero_pos += sign * (self.hysteresis * self.slope - self._motor.tolerance) + if (self._motor.value - zero_pos) * sign < -self._motor.tolerance: + self.write_adjusting(False) + self.log.info('change side to %g', zero_pos) + self.drive_relative(zero_pos - self._motor.value) + return + # we are already at or beyond zero_pos + self.next_action(self.adjust) + return + self.write_adjusting(False) + self.status = 'BUSY', 'find %s side' % side_name + self.log.info('one turn to %g', self._motor.value + sign * 360) + self.drive_relative(sign * 360) + + def free(self, force, target): + """free from high force at other end""" + if self.motor_busy(): + return + if abs(force) > abs(self._previous_force) + self.tolerance: + self.stop() + self.status = 'ERROR', 'force increase while freeing' + self.log.error(self.status[1]) + return + if abs(force) < self.hysteresis: + self.next_action(self.find) + return if self.init_action(): + self._free_way = 0 + self.log.info('free from high force %g', force) self.write_adjusting(True) - self.status = 'BUSY', 'releasing force' - self.rel_drive(self.rel_pos() + step) + sign = math.copysign(1, target) + if self._free_way > (abs(self._previous_force) + self.hysteresis) * self.slope: + self.stop() + self.status = 'ERROR', 'freeing failed' + self.log.error(self.status[1]) + return + self._free_way += self.safe_step + self.drive_relative(sign * self.safe_step) + + def within_tolerance(self, force, target): + """within tolerance""" + if self.motor_busy(): + return + if abs(target - force) > self.tolerance: + self.next_action(self.adjust) + elif self.init_action(): + self.status = 'IDLE', 'within tolerance' def adjust(self, force, target): + """adjust force""" if self.motor_busy(): return if abs(target - force) < self.tolerance: - self.status = 'IDLE', '' - self.next_action(self.idle) - elif force > target: - if self._pid_factor < 0.2: - self.status = 'WARN', 'too may tries' - self.next_action(self.idle) - self._pid_factor *= 0.5 - self.next_action(self.release) + self._in_cnt += 1 + if self._in_cnt >= 3: + self.next_action(self.within_tolerance) + return else: - if self.init_action(): - self.write_adjusting(True) - self.status = 'BUSY', 'adjusting force' - step = (target - force) * self.slope * self.pid_i * self._pid_factor - self.rel_drive(self.rel_pos() + step) + self._in_cnt = 0 + if self.init_action(): + self._fail_cnt = 0 + self.write_adjusting(True) + self.status = 'BUSY', 'adjusting force' + elif not self._filtered: + return + else: + force_step = force - self._last_force + if self._expected_step: + # compare detected / expected step + q = force_step / self._expected_step + if q < 0.1: + self._fail_cnt += 1 + elif q > 0.5: + self._fail_cnt = max(0, self._fail_cnt - 1) + if self._fail_cnt >= 10: + if force < self.hysteresis: + self.log.warning('adjusting failed - try to find zero pos') + self.set_zero_pos(target, None) + self.next_action(self.find) + elif self._fail_cnt > 20: + self.stop() + self.status = 'ERROR', 'force seems not to change substantially' + self.log.error(self.status[1]) + return + self._last_force = force + force_step = (target - force) * self.pid_i + if abs(target - force) < self.tolerance * 0.5: + self._expected_step = 0 + return + self._expected_step = force_step + step = force_step * self.slope + self.drive_relative(step) - def idle(self): - pass + def idle(self, *args): + if self.init_action(): + self.write_adjusting(False) + if self.status[0] == 'BUSY': + self.status = 'IDLE', 'stopped' def read_value(self): try: - value = self._transducer.read_value() + force = self._transducer.read_value() self._cnt_rderr = max(0, self._cnt_rderr - 1) except Exception as e: self._cnt_rderr += 1 if self._cnt_rderr > 10: - self.terminate('ERROR', 'too many read errors: %s' % e) + self.stop() + self.status = 'ERROR', 'too many read errors: %s' % e + self.log.error(self.status[1]) return Done now = time.time() if self.motor_busy(): # do not filter while driving - self.value = value + self.value = force self.reset_filter() + self._filtered = False else: - self._sum += value + self._sum += force self._cnt += 1 if now < self._filter_start + self.filter_interval: return Done - self.value = self._sum / self._cnt + force = self._sum / self._cnt + self.value = force self.reset_filter(now) - if self.current_sign: - self.execute_action() + self._filtered = True + if abs(force) > self.limit + self.hysteresis: + self.status = 'ERROR', 'above max limit' + self.log.error(self.status[1]) + return Done + if self.zero_pos(force) is None and abs(force) > self.hysteresis and self._filtered: + self.set_zero_pos(force, self._motor.read_value()) + self.execute_action() return Done def write_target(self, target): + if abs(target) > self.limit: + raise BadValueError('force above limit') if abs(target - self.value) <= self.tolerance: if self.isBusy(): self.stop() - self.status = 'IDLE', 'already at target' - return target + self.next_action(self.within_tolerance) + else: + self.status = 'IDLE', 'already at target' + self.next_action(self.within_tolerance) + return target + self.log.info('new target %g', target) self._cnt_rderr = 0 self._cnt_wrerr = 0 - if self.target: - self.status = 'BUSY', 'changed target' - self.current_sign = math.copysign(1, self.target) - self.next_action(self.find) + self.status = 'BUSY', 'changed target' + self.next_action(self.find, False) return target - def terminate(self, *status): - self.stop() - self.status = status - print(status) - @Command() def stop(self): - self._driver = None + self._action = self.idle if self._motor.isBusy(): + self.log.info('stop motor') self._motor.stop() - self.status = 'IDLE', 'stopped' - self._filterd = True + self.next_action(self.idle) def write_force_offset(self, value): self.force_offset = value - # self.saveParameters() self._transducer.write_offset(value) return Done @@ -243,10 +371,11 @@ class Uniax(PersistentMixin, Drivable): mot = self._motor if value: mot_current = self.adjusting_current - mot.write_move_limit(self.adjusting_step) + mot.write_move_limit(self.safe_step) else: mot_current = self.safe_current mot.write_safe_current(mot_current) if abs(mot_current - mot.maxcurrent) > 0.01: # resolution of current: 2.8 / 250 + self.log.info('motor current %g', mot_current) mot.write_maxcurrent(mot_current) return value