# -*- coding: utf-8 -*- # ***************************************************************************** # # This program is free software; you can redistribute it and/or modify it under # the terms of the GNU General Public License as published by the Free Software # Foundation; either version 2 of the License, or (at your option) any later # version. # # This program is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS # FOR A PARTICULAR PURPOSE. See the GNU General Public License for more # details. # # You should have received a copy of the GNU General Public License along with # this program; if not, write to the Free Software Foundation, Inc., # 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA # # Module authors: # M. Zolliker # # ***************************************************************************** """use transducer and motor to adjust force""" import time import math from secop.core import Drivable, Parameter, FloatRange, EnumType, Done, \ Attached, Command, PersistentMixin, PersistentParam, BoolType class Uniax(PersistentMixin, Drivable): value = Parameter(unit='N') motor = Attached() transducer = Attached() tolerance = Parameter('force tolerance', FloatRange(), readonly=False, default=0.1) slope = PersistentParam('spring constant', FloatRange(unit='deg/N'), readonly=False, default=0.5, persistent='auto') pid_i = PersistentParam('integral', FloatRange(), readonly=False, default=0.5, persistent='auto') filter_interval = Parameter('filter time', FloatRange(0, 60, unit='s'), readonly=False, default=1) current_sign = Parameter('', EnumType(negative=-1, undefined=0, positive=1), default=0) current_step = Parameter('', FloatRange(), default=0) force_offset = PersistentParam('transducer offset', FloatRange(), readonly=False, default=0, initwrite=True, persistent='auto') hysteresis = PersistentParam('force hysteresis', FloatRange(0), readonly=False, default=5, persistent='auto') release_step = PersistentParam('step when releasing force', FloatRange(0), readonly=False, default=20, persistent='auto') adjusting = Parameter('', BoolType(), readonly=False, default=False) adjusting_current = PersistentParam('current when adjusting force', FloatRange(0, 2.8, unit='A'), readonly=False, default=0.5, persistent='auto') adjusting_step = PersistentParam('max. motor step when adjusting force', FloatRange(0, unit='deg'), readonly=False, default=5, persistent='auto') safe_current = PersistentParam('current when moving far', FloatRange(0, 2.8, unit='A'), readonly=False, default=0.2, persistent='auto') low_pos = PersistentParam('max. position for positive forces', FloatRange(), readonly=False, default=0) high_pos = PersistentParam('min. position for negative forces', FloatRange(), readonly=False, default=0) pollinterval = 0.1 fast_pollfactor = 1 _driver = None # whe defined a gerenator to be called for driving _target = None # freshly set target _mot_target = None # for detecting manual motor manipulations _filter_start = 0 _cnt = 0 _sum = 0 _cnt_rderr = 0 _cnt_wrerr = 0 def rel_drive(self, pos): """drive relative to high_pos / low_pos, with current_sign""" mot = self._motor step = (pos - self.rel_pos()) * self.current_sign self.current_step = step for _ in range(3): try: print('drive by %.2f' % self.current_step) self._mot_target = self._motor.write_target(mot.value + step) self._cnt_wrerr = max(0, self._cnt_wrerr - 1) break except Exception as e: print('drive error %s ' % e) self._cnt_wrerr += 1 if self._cnt_wrerr > 5: raise print('motor reset') self._motor.reset() def rel_pos(self): if self.current_sign < 0: return self.low_pos - self._motor.value return self._motor.value - self.high_pos def reset_filter(self, now=0.0): self._sum = self._cnt = 0 self._filter_start = now or time.time() def motor_busy(self): mot = self._motor if mot.isBusy(): if mot.target != self._mot_target: self.action = None return True return False def next_action(self, action, do_now=True): """call next action :param action: function to be called next time :param do_now: do next action in the same cycle """ self._action = action self._init_action = True if do_now: self._next_cycle = False def init_action(self): """return true when called the first time after next_action""" if self._init_action: self._init_action = False return True return False def execute_action(self): for _ in range(5): # limit number of subsequent actions in one cycle self._next_cycle = True self._action(self.value * self.current_sign, self.target * self.current_sign) if self._next_cycle: break def find(self, force, target): if force > self.hysteresis: if self.motor_busy(): self.stop() return if self.current_sign > 0: self.high_pos += self.rel_pos() - self.slope * force else: self.low_pos -= self.rel_pos() - self.slope * force self._pid_factor = 1 self.next_action(self.release, True) return if self.init_action(): self.write_adjusting(False) self.status = 'BUSY', 'find active motor range' if self.rel_pos() < 0: self.rel_drive((2 * self.hysteresis) * self.slope) else: self.rel_drive(self.rel_pos() + 360) def release(self, force, target): if self.motor_busy(): return if force < target + self.tolerance: self.next_action(self.adjust, True) return step = (target - self.release_step - force) * self.slope * self.pid_i if self.init_action(): self.write_adjusting(True) self.status = 'BUSY', 'releasing force' self.rel_drive(self.rel_pos() + step) def adjust(self, force, target): if self.motor_busy(): return if abs(target - force) < self.tolerance: self.status = 'IDLE', '' self.next_action(self.idle) elif force > target: if self._pid_factor < 0.2: self.status = 'WARN', 'too may tries' self.next_action(self.idle) self._pid_factor *= 0.5 self.next_action(self.release) else: if self.init_action(): self.write_adjusting(True) self.status = 'BUSY', 'adjusting force' step = (target - force) * self.slope * self.pid_i * self._pid_factor self.rel_drive(self.rel_pos() + step) def idle(self): pass def read_value(self): try: value = self._transducer.read_value() self._cnt_rderr = max(0, self._cnt_rderr - 1) except Exception as e: self._cnt_rderr += 1 if self._cnt_rderr > 10: self.terminate('ERROR', 'too many read errors: %s' % e) return Done now = time.time() if self.motor_busy(): # do not filter while driving self.value = value self.reset_filter() else: self._sum += value self._cnt += 1 if now < self._filter_start + self.filter_interval: return Done self.value = self._sum / self._cnt self.reset_filter(now) if self.current_sign: self.execute_action() return Done def write_target(self, target): if abs(target - self.value) <= self.tolerance: if self.isBusy(): self.stop() self.status = 'IDLE', 'already at target' return target self._cnt_rderr = 0 self._cnt_wrerr = 0 if self.target: self.status = 'BUSY', 'changed target' self.current_sign = math.copysign(1, self.target) self.next_action(self.find) return target def terminate(self, *status): self.stop() self.status = status print(status) @Command() def stop(self): self._driver = None if self._motor.isBusy(): self._motor.stop() self.status = 'IDLE', 'stopped' self._filterd = True def write_force_offset(self, value): self.force_offset = value # self.saveParameters() self._transducer.write_offset(value) return Done def write_adjusting(self, value): mot = self._motor if value: mot_current = self.adjusting_current mot.write_move_limit(self.adjusting_step) else: mot_current = self.safe_current mot.write_safe_current(mot_current) if abs(mot_current - mot.maxcurrent) > 0.01: # resolution of current: 2.8 / 250 mot.write_maxcurrent(mot_current) return value