# ***************************************************************************** # # This program is free software; you can redistribute it and/or modify it under # the terms of the GNU General Public License as published by the Free Software # Foundation; either version 2 of the License, or (at your option) any later # version. # # This program is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS # FOR A PARTICULAR PURPOSE. See the GNU General Public License for more # details. # # You should have received a copy of the GNU General Public License along with # this program; if not, write to the Free Software Foundation, Inc., # 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA # # Module authors: # M. Zolliker # # ***************************************************************************** """use transducer and motor to adjust force""" import time import math from frappy.core import Drivable, Parameter, FloatRange, Done, \ Attached, Command, PersistentMixin, PersistentParam, BoolType from frappy.errors import BadValueError, SECoPError from frappy.lib.statemachine import Retry, StateMachine, Stop # TODO: to be tested, or better migrated to new state machine! class Error(SECoPError): pass class Uniax(PersistentMixin, Drivable): value = Parameter(unit='N') motor = Attached() transducer = Attached() limit = Parameter('abs limit of force', FloatRange(0, 190, unit='N'), readonly=False, default=150) tolerance = Parameter('force tolerance', FloatRange(0, 10, unit='N'), readonly=False, default=0.2) slope = PersistentParam('spring constant', FloatRange(unit='deg/N'), readonly=False, default=0.5, persistent='auto') pid_i = PersistentParam('integral', FloatRange(), readonly=False, default=0.5, persistent='auto') filter_interval = Parameter('filter time', FloatRange(0, 60, unit='s'), readonly=False, default=5) current_step = Parameter('', FloatRange(unit='deg'), default=0) force_offset = PersistentParam('transducer offset', FloatRange(unit='N'), readonly=False, value=0, persistent='auto') hysteresis = PersistentParam('force hysteresis', FloatRange(0, 190, unit='N'), readonly=False, default=5, persistent='auto') adjusting = Parameter('', BoolType(), readonly=False, value=False) adjusting_current = PersistentParam('current when adjusting force', FloatRange(0, 2.8, unit='A'), readonly=False, default=0.5, persistent='auto') safe_step = PersistentParam('max. motor step when adjusting force', FloatRange(0, unit='deg'), readonly=False, default=5, persistent='auto') safe_current = PersistentParam('current when moving far', FloatRange(0, 2.8, unit='A'), readonly=False, default=0.2, persistent='auto') low_pos = Parameter('max. position for positive forces', FloatRange(unit='deg'), readonly=False, needscfg=False) high_pos = Parameter('min. position for negative forces', FloatRange(unit='deg'), readonly=False, needscfg=False) substantial_force = Parameter('min. force change expected within motor play', FloatRange(), default=1) motor_play = Parameter('acceptable motor play within hysteresis', FloatRange(), readonly=False, default=10) motor_max_play = Parameter('acceptable motor play outside hysteresis', FloatRange(), readonly=False, default=90) timeout = Parameter('driving finishes when no progress within this delay', FloatRange(), readonly=False, default=300) pollinterval = 0.1 _mot_target = None # for detecting manual motor manipulations _filter_start = 0 _cnt = 0 _sum = 0 _cnt_rderr = 0 _cnt_wrerr = 0 _zero_pos_tol = None _state = None _force = None # raw force def earlyInit(self): super().earlyInit() self._zero_pos_tol = {} def initModule(self): super().initModule() self._state = StateMachine(logger=self.log, threaded=False, cleanup=self.cleanup) def doPoll(self): self.read_value() self._state.cycle() def drive_relative(self, step, ntry=3): """drive relative, try 3 times""" mot = self.motor mot.read_value() # make sure motor value is fresh if self.adjusting and abs(step) > self.safe_step: step = math.copysign(self.safe_step, step) self.current_step = step for _ in range(ntry): try: if abs(mot.value - mot.target) < mot.tolerance: # make sure rounding erros do not suppress small steps newpos = mot.target + step else: newpos = mot.value + step self._mot_target = self.motor.write_target(newpos) self._cnt_wrerr = max(0, self._cnt_wrerr - 1) return True except Exception as e: self.log.warning('drive error %s', e) self._cnt_wrerr += 1 if self._cnt_wrerr > 5: raise self.log.warning('motor reset') self.motor.reset() return False def motor_busy(self): mot = self.motor if mot.isBusy(): if self._mot_target is not None and mot.target != self._mot_target: raise Error('control stopped - motor moved directly') return True return False def motor_stop(self): self.motor.stop() self._mot_target = None def read_value(self): try: self._force = force = self.transducer.read_value() self._cnt_rderr = max(0, self._cnt_rderr - 1) except Exception as e: self._cnt_rderr += 1 if self._cnt_rderr > 10: self.stop() self.status = 'ERROR', 'too many read errors: %s' % e self.log.error(self.status[1]) self.read_target() return Done now = time.time() self._sum += force self._cnt += 1 if now < self._filter_start + self.filter_interval: return Done force = self._sum / self._cnt self.reset_filter(now) if abs(force) > self.limit + self.hysteresis: self.motor_stop() self.status = 'ERROR', 'above max limit' self.log.error(self.status[1]) self.read_target() return Done if self.zero_pos(force) is None and abs(force) > self.hysteresis: self.set_zero_pos(force, self.motor.read_value()) return force def reset_filter(self, now=0.0): self._sum = self._cnt = 0 self._filter_start = now or time.time() def zero_pos(self, value): """get high_pos or low_pos, depending on sign of value :param value: return an estimate for a good starting position """ name = 'high_pos' if value > 0 else 'low_pos' if name not in self._zero_pos_tol: return None return getattr(self, name) def set_zero_pos(self, force, pos): """set zero position high_pos or low_pos, depending on sign and value of force :param force: the force used for calculating zero_pos :param pos: the position used for calculating zero_pos """ name = 'high_pos' if force > 0 else 'low_pos' if pos is None: self._zero_pos_tol.pop(name, None) return None pos -= force * self.slope tol = (abs(force) - self.hysteresis) * self.slope * 0.2 if name in self._zero_pos_tol: old = self.zero_pos(force) if abs(old - pos) < self._zero_pos_tol[name] + tol: return self._zero_pos_tol[name] = tol self.log.info('set %s = %.1f +- %.1f (@%g N)' % (name, pos, tol, force)) setattr(self, name, pos) def cleanup(self, state): """in case of error, set error status""" if state.next_task: # stop or restart if not isinstance(state.next_task, Stop): return self.status = 'IDLE', 'stopped' self.log.warning('stopped') else: self.status = 'ERROR', str(state.last_error) if isinstance(state.last_error, Error): self.log.error('%s', state.last_error) else: self.log.error('%r raised in state %r', str(state.last_error), state.status_string) self.read_target() # make target invalid self.motor_stop() self.write_adjusting(False) def reset_progress(self, state): state.prev_force = self.value state.prev_pos = self.motor.value state.prev_time = time.time() def check_progress(self, state): force_step = self.target - self.value direction = math.copysign(1, force_step) try: force_progress = direction * (self.value - state.prev_force) except AttributeError: # prev_force undefined? self.reset_progress(state) return True if force_progress >= self.substantial_force: self.reset_progress(state) else: motor_dif = abs(self.motor.value - state.prev_pos) if motor_dif > self.motor_play: if motor_dif > self.motor_max_play: raise Error('force seems not to change substantially %g %g (%g %g)' % (self.value, self.motor.value, state.prev_force, state.prev_pos)) return False return True def adjust(self, state): """adjust force""" if state.init: state.phase = 0 # just initialized state.in_since = 0 state.direction = math.copysign(1, self.target - self.value) state.pid_fact = 1 if self.motor_busy(): return Retry self.value = self._force force_step = self.target - self.value if abs(force_step) < self.tolerance: if state.in_since == 0: state.in_since = state.now if state.now > state.in_since + 10: return self.within_tolerance else: if force_step * state.direction < 0: if state.pid_fact == 1: self.log.info('overshoot -> adjust with reduced pid_i') state.pid_fact = 0.1 state.in_since = 0 if state.phase == 0: state.phase = 1 self.reset_progress(state) self.write_adjusting(True) self.status = 'BUSY', 'adjusting force' elif not self.check_progress(state): if abs(self.value) < self.hysteresis: motor_dif = abs(self.motor.value - state.prev_pos) if motor_dif > self.motor_play: self.log.warning('adjusting failed - try to find zero pos') self.set_zero_pos(self.target, None) return self.find elif time.time() > state.prev_time + self.timeout: if state.phase == 1: state.phase = 2 self.log.warning('no substantial progress since %d sec', self.timeout) self.status = 'IDLE', 'adjusting timeout' self.drive_relative(force_step * self.slope * self.pid_i * min(1, state.delta()) * state.pid_fact) return Retry def within_tolerance(self, state): """within tolerance""" if state.init: self.status = 'IDLE', 'within tolerance' return Retry if self.motor_busy(): return Retry force_step = self.target - self.value if abs(force_step) < self.tolerance * 0.5: self.current_step = 0 else: self.check_progress(state) self.drive_relative(force_step * self.slope * self.pid_i * min(1, state.delta()) * 0.1) if abs(force_step) > self.tolerance: return self.out_of_tolerance return Retry def out_of_tolerance(self, state): """out of tolerance""" if state.init: self.status = 'WARN', 'out of tolerance' state.in_since = 0 return Retry if self.motor_busy(): return Retry force_step = self.target - self._force if abs(force_step) < self.tolerance: if state.in_since == 0: state.in_since = state.now if state.now > state.in_since + 10: return self.within_tolerance if abs(force_step) < self.tolerance * 0.5: return Retry self.check_progress(state) self.drive_relative(force_step * self.slope * self.pid_i * min(1, state.delta()) * 0.1) return Retry def find(self, state): """find active (engaged) range""" if state.init: state.prev_direction = 0 # find not yet started self.reset_progress(state) direction = math.copysign(1, self.target) self.value = self._force abs_force = self.value * direction if abs_force > self.hysteresis or abs_force > self.target * direction: if self.motor_busy(): self.log.info('motor stopped - substantial force detected: %g', self.value) self.motor_stop() elif state.prev_direction == 0: return self.adjust if abs_force > self.hysteresis: self.set_zero_pos(self.value, self.motor.read_value()) return self.adjust if abs_force < -self.hysteresis: state.force_before_free = self.value return self.free if self.motor_busy(): if direction == -state.prev_direction: # target direction changed self.motor_stop() state.init_find = True # restart find return Retry zero_pos = self.zero_pos(self.target) if state.prev_direction: # find already started if abs(self.motor.target - self.motor.value) > self.motor.tolerance: # no success on last find try, try short and strong step self.write_adjusting(True) self.log.info('one step to %g', self.motor.value + self.safe_step) self.drive_relative(direction * self.safe_step) return Retry else: state.prev_direction = math.copysign(1, self.target) side_name = 'negative' if direction == -1 else 'positive' if zero_pos is not None: self.status = 'BUSY', 'change to %s side' % side_name zero_pos += direction * (self.hysteresis * self.slope - self.motor.tolerance) if (self.motor.value - zero_pos) * direction < -self.motor.tolerance: self.write_adjusting(False) self.log.info('change side to %g', zero_pos) self.drive_relative(zero_pos - self.motor.value) return Retry # we are already at or beyond zero_pos return self.adjust self.write_adjusting(False) self.status = 'BUSY', 'find %s side' % side_name self.log.info('one turn to %g', self.motor.value + direction * 360) self.drive_relative(direction * 360) return Retry def free(self, state): """free from high force at other end""" if state.init: state.free_way = None self.reset_progress(state) if self.motor_busy(): return Retry self.value = self._force if abs(self.value) > abs(state.force_before_free) + self.hysteresis: raise Error('force increase while freeing') if abs(self.value) < self.hysteresis: return self.find if state.free_way is None: state.free_way = 0 self.log.info('free from high force %g', self.value) self.write_adjusting(True) direction = math.copysign(1, self.target) if state.free_way > abs(state.force_before_free + self.hysteresis) * self.slope + self.motor_max_play: raise Error('freeing failed') state.free_way += self.safe_step self.drive_relative(direction * self.safe_step) return Retry def write_target(self, target): if abs(target) > self.limit: raise BadValueError('force above limit') if abs(target - self.value) <= self.tolerance: if not self.isBusy(): self.status = 'IDLE', 'already at target' self._state.start(self.within_tolerance) return target self.log.info('new target %g', target) self._cnt_rderr = 0 self._cnt_wrerr = 0 self.status = 'BUSY', 'changed target' self.target = target if self.value * math.copysign(1, target) > self.hysteresis: self._state.start(self.adjust) else: self._state.start(self.find) return Done def read_target(self): if self._state.statefunc is None: if self.status[1]: raise Error(self.status[1]) raise Error('inactive') return self.target @Command() def stop(self): """stop motor and control""" if self.motor.isBusy(): self.log.info('stop motor') self.motor_stop() self.status = 'IDLE', 'stopped' self._state.stop() def write_force_offset(self, value): self.force_offset = value self.transducer.write_offset(value) return Done def write_adjusting(self, value): mot = self.motor if value: mot_current = self.adjusting_current mot.write_move_limit(self.safe_step) else: mot_current = self.safe_current mot.write_safe_current(mot_current) if abs(mot_current - mot.maxcurrent) > 0.01: # resolution of current: 2.8 / 250 self.log.info('motor current %g', mot_current) mot.write_maxcurrent(mot_current) return value