uniax, version 7.10.2021

- driving with a generator
- 3 phases
  1) find active range (low current, far movement until force over hysteresis)
  2) release force until well below target
  3) adjusting using pid_p. (this is in fact an integral factor)
This commit is contained in:
2021-10-08 08:53:22 +02:00
parent 2c6b42f2aa
commit 57082e276f
5 changed files with 211 additions and 84 deletions

View File

@ -19,7 +19,8 @@ uri = tcp://192.168.127.254:3002
standby_current=0.1 standby_current=0.1
maxcurrent=0.2 maxcurrent=0.2
acceleration=150. acceleration=150.
movelimit=360 move_limit=5
safe_current=0.2
speed=40 speed=40
encoder_tolerance=3.6 encoder_tolerance=3.6
free_wheeling=0.1 free_wheeling=0.1
@ -43,7 +44,7 @@ transducer = transducer
description = temperature on uniax stick description = temperature on uniax stick
class = secop_psi.ls340res.ResChannel class = secop_psi.ls340res.ResChannel
uri = tcp://192.168.127.254:3003 uri = tcp://192.168.127.254:3003
channel = C channel = A
[T] [T]
class = secop_psi.softcal.Sensor class = secop_psi.softcal.Sensor

View File

@ -495,15 +495,15 @@ class Module(HasAccessibles):
for pname in self.parameters: for pname in self.parameters:
errfunc = getattr(modobj, 'error_update_' + pname, None) errfunc = getattr(modobj, 'error_update_' + pname, None)
if errfunc: if errfunc:
def errcb(err, p=pname, efunc=errfunc): def errcb(err, p=pname, m=modobj, efunc=errfunc):
try: try:
efunc(err) efunc(err)
except Exception as e: except Exception as e:
modobj.announceUpdate(p, err=e) m.announceUpdate(p, err=e)
self.errorCallbacks[pname].append(errcb) self.errorCallbacks[pname].append(errcb)
else: else:
def errcb(err, p=pname): def errcb(err, p=pname, m=modobj):
modobj.announceUpdate(p, err=err) m.announceUpdate(p, err=err)
if pname in autoupdate: if pname in autoupdate:
self.errorCallbacks[pname].append(errcb) self.errorCallbacks[pname].append(errcb)
@ -516,8 +516,8 @@ class Module(HasAccessibles):
efunc(e) efunc(e)
self.valueCallbacks[pname].append(cb) self.valueCallbacks[pname].append(cb)
elif pname in autoupdate: elif pname in autoupdate:
def cb(value, p=pname): def cb(value, p=pname, m=modobj):
modobj.announceUpdate(p, value) m.announceUpdate(p, value)
self.valueCallbacks[pname].append(cb) self.valueCallbacks[pname].append(cb)
def isBusy(self, status=None): def isBusy(self, status=None):
@ -642,7 +642,7 @@ class Writable(Readable):
"""basic writable module""" """basic writable module"""
target = Parameter('target value of the module', target = Parameter('target value of the module',
default=0, readonly=False, datatype=FloatRange()) default=0, readonly=False, datatype=FloatRange(unit='$'))
class Drivable(Writable): class Drivable(Writable):

View File

@ -56,12 +56,14 @@ import os
import json import json
from secop.lib import getGeneralConfig from secop.lib import getGeneralConfig
from secop.datatypes import EnumType
from secop.params import Parameter, Property, BoolType, Command from secop.params import Parameter, Property, BoolType, Command
from secop.modules import HasAccessibles from secop.modules import HasAccessibles
class PersistentParam(Parameter): class PersistentParam(Parameter):
persistent = Property('persistence flag', BoolType(), default=True) persistent = Property('persistence flag (auto means: save automatically on any change)',
EnumType(off=0, on=1, auto=2), default=1)
class PersistentMixin(HasAccessibles): class PersistentMixin(HasAccessibles):
@ -73,8 +75,12 @@ class PersistentMixin(HasAccessibles):
self.initData = {} self.initData = {}
for pname in self.parameters: for pname in self.parameters:
pobj = self.parameters[pname] pobj = self.parameters[pname]
if not pobj.readonly and getattr(pobj, 'persistent', False): if not pobj.readonly and getattr(pobj, 'persistent', 0):
self.initData[pname] = pobj.value self.initData[pname] = pobj.value
if pobj.persistent == 'auto':
def cb(value, m=self):
m.saveParameters()
self.valueCallbacks[pname].append(cb)
self.writeDict.update(self.loadParameters(write=False)) self.writeDict.update(self.loadParameters(write=False))
def loadParameters(self, write=True): def loadParameters(self, write=True):

View File

@ -85,8 +85,10 @@ class Motor(PersistentMixin, HasIodev, Drivable):
1, ANGLE_SCALE, readonly=True, initwrite=False) 1, ANGLE_SCALE, readonly=True, initwrite=False)
target = Parameter('_', FloatRange(unit='$'), default=0) target = Parameter('_', FloatRange(unit='$'), default=0)
movelimit = Parameter('max. angle to drive in one go', FloatRange(unit='$'), move_limit = Parameter('max. angle to drive in one go when current above safe_current', FloatRange(unit='$'),
readonly=False, default=360, group='more') readonly=False, default=5, group='more')
safe_current = Parameter('motor current allowed for big steps', FloatRange(unit='A'),
readonly=False, default=0.2, group='more')
tolerance = Parameter('positioning tolerance', FloatRange(unit='$'), tolerance = Parameter('positioning tolerance', FloatRange(unit='$'),
readonly=False, default=0.9) readonly=False, default=0.9)
encoder_tolerance = HwParam('the allowed deviation between steppos and encoder\n\nmust be > tolerance', encoder_tolerance = HwParam('the allowed deviation between steppos and encoder\n\nmust be > tolerance',
@ -262,8 +264,9 @@ class Motor(PersistentMixin, HasIodev, Drivable):
def write_target(self, target): def write_target(self, target):
self.read_value() # make sure encoder and steppos are fresh self.read_value() # make sure encoder and steppos are fresh
if abs(target - self.encoder) > self.movelimit: if self.maxcurrent >= self.safe_current + CURRENT_SCALE and (
raise BadValueError('can not move more than %s deg' % self.movelimit) abs(target - self.encoder) > self.move_limit + self.tolerance):
raise BadValueError('can not move more than %s deg %g %g' % (self.move_limit, self.encoder, target))
diff = self.encoder - self.steppos diff = self.encoder - self.steppos
if self._need_reset: if self._need_reset:
raise HardwareError('need reset (%s)' % self.status[1]) raise HardwareError('need reset (%s)' % self.status[1])
@ -274,7 +277,7 @@ class Motor(PersistentMixin, HasIodev, Drivable):
raise HardwareError('need reset (encoder does not match internal pos)') raise HardwareError('need reset (encoder does not match internal pos)')
self.set('steppos', self.encoder - self.zero) self.set('steppos', self.encoder - self.zero)
self._started = time.time() self._started = time.time()
self.log.info('move to %.1f', target) self.log.debug('move to %.1f', target)
self.comm(MOVE, 0, (target - self.zero) / ANGLE_SCALE) self.comm(MOVE, 0, (target - self.zero) / ANGLE_SCALE)
self.status = self.Status.BUSY, 'changed target' self.status = self.Status.BUSY, 'changed target'
return target return target

View File

@ -22,94 +22,211 @@
"""use transducer and motor to adjust force""" """use transducer and motor to adjust force"""
import time import time
from secop.core import Drivable, Parameter, FloatRange, Done, Attached, Command from secop.lib import clamp
from secop.core import Drivable, Parameter, FloatRange, EnumType, Done, \
Attached, Command, PersistentMixin, PersistentParam, BoolType
class Uniax(Drivable): class Uniax(PersistentMixin, Drivable):
value = Parameter(unit='N')
motor = Attached() motor = Attached()
transducer = Attached() transducer = Attached()
step = Parameter('maximum motor step', FloatRange(unit='deg'), default=5, readonly=False) tolerance = Parameter('force tolerance', FloatRange(), readonly=False, default=0.1)
fine_step = Parameter('motor step for fine adjustment', FloatRange(unit='deg'), default=1, readonly=False) pid_p = PersistentParam('proportial term', FloatRange(unit='deg/N'), readonly=False, default=0.25, persistent='auto')
tolerance = Parameter('force tolerance', FloatRange(), readonly=False, default=1)
filter_interval = Parameter('filter time', FloatRange(0, 60, unit='s'), readonly=False, default=1) filter_interval = Parameter('filter time', FloatRange(0, 60, unit='s'), readonly=False, default=1)
span = Parameter('difference between max and min', FloatRange(), needscfg=False) current_sign = Parameter('', EnumType(negative=-1, undefined=0, positive=1), default=0)
current_step = Parameter('', FloatRange(), default=0)
force_offset = PersistentParam('transducer offset', FloatRange(), readonly=False, default=0, initwrite=True, persistent='auto')
hysteresis = PersistentParam('force hysteresis', FloatRange(0), readonly=False, default=5, persistent='auto')
release_step = PersistentParam('step when releasing force', FloatRange(0), readonly=False, default=20, persistent='auto')
adjusting = Parameter('', BoolType(), readonly=False, default=False)
adjusting_current = PersistentParam('current when adjusting force', FloatRange(0, 2.8, unit='A'), readonly=False, default=0.5, persistent='auto')
adjusting_step = PersistentParam('max. motor step when adjusting force', FloatRange(0, unit='deg'), readonly=False, default=5, persistent='auto')
safe_current = PersistentParam('current when moving far', FloatRange(0, 2.8, unit='A'), readonly=False, default=0.2, persistent='auto')
low_pos = PersistentParam('max. position for positive forces', FloatRange(), readonly=False, default=0)
high_pos = PersistentParam('min. position for negative forces', FloatRange(), readonly=False, default=0)
pollinterval = 0.1 pollinterval = 0.1
fast_pollfactor = 1 fast_pollfactor = 1
_target = None _driver = None # whe defined a gerenator to be called for driving
_mot_target = None _target = None # freshly set target
_direction = 0 _mot_target = None # for detecting manual motor manipulations
_last_era = 0 _filter_start = 0
_cnt = 0 _cnt = 0
_sum = 0 _sum = 0
_min = None _cnt_rderr = 0
_max = None _cnt_wrerr = 0
_last_min = None
_last_max = None def pos_range(self, target_sign, value=None):
attr = 'high_pos' if target_sign > 0 else 'low_pos'
if value is not None:
value -= self.release_step * target_sign
setattr(self, attr, value)
return getattr(self, attr)
def drive_to(self, pos):
mot = self._motor
self.current_step = pos - mot.value
for itry in range(3):
try:
print('drive by %.2f' % self.current_step)
self._mot_target = self._motor.write_target(pos)
self._cnt_wrerr = max(0, self._cnt_wrerr - 1)
break
except Exception as e:
print('driver_to', e)
self._cnt_wrerr += 1
if self._cnt_wrerr > 5:
raise
print('RESET')
self._motor.reset()
def drive_generator(self, target):
self._mot_target = self._motor.target
mot = self._motor
target_sign = -1 if target < 0 else 1
force = self.value
# TODO: do this only when abs(target) > hysteresis?
if target_sign != self.current_sign and force * target_sign < self.hysteresis:
self.write_adjusting(False)
pos_lim = None
while target_sign * (target_sign * self.hysteresis - force) > self.tolerance:
self.status = 'BUSY', 'find active motor range'
# we have to increase abs(force)
if not mot.isBusy() or target_sign * (mot.target - mot.value) < 0:
self.drive_to(mot.value + 360 * target_sign)
force = yield
if target_sign * force < self.hysteresis:
pos_lim = mot.value
print('found active range')
while mot.isBusy():
mot.stop()
force = yield
print('stopped')
if target_sign * (force - self.hysteresis) > 0:
self.current_sign = target_sign
elif pos_lim is not None:
self.pos_range(target_sign, pos_lim)
if abs(force) < self.hysteresis and self.low_pos < mot.value < self.high_pos:
self.current_sign = 0
if target_sign * (self.value - target) > self.tolerance:
self.status = 'BUSY', 'release force'
self.write_adjusting(True)
# abs(force) too high
while target_sign * (target - self.value) < self.hysteresis and self.value * target_sign > 0:
self.drive_to(mot.value - target_sign * min(self.adjusting_step, self.release_step))
while mot.isBusy():
self.reset_filter()
yield
if target_sign * (force + self.hysteresis) < 0:
print('force', force)
self.terminate('ERROR', 'force too high when moving back')
return
print('released', self.value, target)
self.write_adjusting(True)
force = yield
while target_sign * (target - self.value) > self.tolerance:
self.status = 'BUSY', 'adjusting force'
# slowly increasing abs(force)
step = self.pid_p * (target - self.value)
if abs(step) > self.adjusting_step:
step = target_sign * self.adjusting_step
self.drive_to(mot.value + step)
while mot.isBusy():
self.reset_filter()
yield
self.write_adjusting(False)
self.status = 'IDLE', 'reached target'
return
def reset_filter(self, now=0):
self._sum = self._cnt = 0
self._filter_start = now or time.time()
def read_value(self): def read_value(self):
force = self._transducer if self._target is not None:
self._driver = self.drive_generator(self._target)
self._driver.send(None) # start
self._target = None
mot = self._motor mot = self._motor
era = int(time.time() / self.filter_interval) try:
newera = era != self._last_era value = self._transducer.read_value()
value = force.read_value() self._cnt_rderr = max(0, self._cnt_rderr - 1)
if newera: except Exception as e:
self._last_era = era self._cnt_rderr += 1
if self._min is None: if self._cnt_rderr > 10:
self._min = self._max = self._last_max = self._last_min = value self.terminate('ERROR', 'too many read errors: %s' % e)
if self._cnt > 0: return Done
self.value = self._sum / self._cnt
self._sum = self._cnt = 0 now = time.time()
self.span = self._max - self._min newvalue = False
self._last_min = self._min if self._cnt > 0 and now > self._filter_start + self.filter_interval:
self._last_max = self._max self.value = self._sum / self._cnt
self._min = self._max = value self.reset_filter(now)
self._min = min(self._min, value) newvalue = True
self._max = max(self._max, value)
self._sum += value self._sum += value
self._cnt += 1 self._cnt += 1
high_value = max(self._max, self._last_max)
low_value = min(self._min, self._last_min) if self._driver:
if self._target is None: try:
if mot.isBusy(): if self._mot_target != mot.target and mot.isBusy():
self.status = self.Status.IDLE, 'stopping' print('mottarget', mot.target, self._mot_target)
else: self.terminate('ERROR', 'stopped due to direct motor manipulation')
self.status = self.Status.IDLE, '' return Done
return Done if self.adjusting:
step = 0 if newvalue:
if self._direction > 0: print(' %.2f' % self.value)
if high_value < self._target - self.tolerance: # next step only when a new filtered value is available
step = self.step self._driver.send(self.value)
else: else:
if low_value > self._target + self.tolerance: print(' %.2f' % value)
step = -self.step self._driver.send(value)
if not step: except StopIteration:
if self._direction * (self._target - value) < self.tolerance: self._driver = None
self.stop() except Exception as e:
self.status = self.Status.IDLE, 'target reached' self.terminate('ERROR', str(e))
return Done self._driver = None
if newera:
step = self.fine_step * self._direction
if step and not mot.isBusy():
mot_target = mot.value + step
self._mot_target = mot.write_target(mot_target)
return Done return Done
def write_target(self, target): def write_target(self, target):
if abs(target - self.value) <= self.tolerance:
if self.isBusy():
self.stop()
self.status = 'IDLE', 'already at target'
return target
self._cnt_rderr = 0
self._cnt_wrerr = 0
self.status = 'BUSY', 'changed target'
self._target = target self._target = target
if target - self.value > 0:
self._direction = 1
else:
self._direction = -1
if self._motor.status[0] == self.Status.ERROR:
self._motor.reset()
self.status = self.Status.BUSY, 'changed target'
self._mot_target = self._motor.target
self.read_value()
return target return target
def terminate(self, *status):
self.stop()
self.status = status
print(status)
@Command() @Command()
def stop(self): def stop(self):
self._target = None self._driver = None
self._motor.stop() if self._motor.isBusy():
self.status = self.Status.IDLE, 'stopped' self._motor.stop()
self.status = 'IDLE', 'stopped'
self._filterd = True
def write_force_offset(self, value):
self.force_offset = value
# self.saveParameters()
self._transducer.write_offset(value)
return Done
def write_adjusting(self, value):
mot = self._motor
if value:
mot_current = self.adjusting_current
mot.write_move_limit(self.adjusting_step)
else:
mot_current = self.safe_current
mot.write_safe_current(mot_current)
if abs(mot_current - mot.maxcurrent) > 0.01: # resolution of current: 2.8 / 250
mot.write_maxcurrent(mot_current)
return value