- implement positioner as an io class - proper shut down behaviour Change-Id: If04176f779809fd5b08f586556cac668cf188479
707 lines
25 KiB
Python
707 lines
25 KiB
Python
# *****************************************************************************
|
|
# This program is free software; you can redistribute it and/or modify it under
|
|
# the terms of the GNU General Public License as published by the Free Software
|
|
# Foundation; either version 2 of the License, or (at your option) any later
|
|
# version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful, but WITHOUT
|
|
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
|
|
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
|
|
# details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License along with
|
|
# this program; if not, write to the Free Software Foundation, Inc.,
|
|
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
|
|
#
|
|
# Module authors:
|
|
# Markus Zolliker <markus.zolliker@psi.ch>
|
|
# *****************************************************************************
|
|
|
|
import time
|
|
import threading
|
|
from frappy.core import Drivable, Parameter, Command, Property, Module, HasIO, \
|
|
ERROR, WARN, BUSY, IDLE, nopoll, Limit
|
|
from frappy.datatypes import IntRange, FloatRange, StringType, BoolType
|
|
from frappy.errors import BadValueError, HardwareError, ConfigError
|
|
from PyANC350v4 import Positioner
|
|
|
|
|
|
class IO(Module):
|
|
"""'communication' module for attocube controller
|
|
|
|
why an extra class:
|
|
- HasIO assures that a single common communicator is used
|
|
- access must be thread safe
|
|
"""
|
|
uri = Property('dummy uri, only one controller may exists',
|
|
StringType())
|
|
export = False
|
|
_hw = None
|
|
_lock = None
|
|
used_axes = set()
|
|
|
|
def initModule(self):
|
|
if self._hw is None:
|
|
IO._lock = threading.Lock()
|
|
IO._hw = Positioner()
|
|
super().initModule()
|
|
|
|
def shutdownModule(self):
|
|
if IO._hw:
|
|
IO._hw.disconnect()
|
|
IO._hw = None
|
|
|
|
def configureAQuadBIn(self, axisNo, enable, resolution):
|
|
"""Enables and configures the A-Quad-B (quadrature) input for the target position.
|
|
Parameters
|
|
axisNo Axis number (0 ... 2)
|
|
enable Enable (1) or disable (0) A-Quad-B input
|
|
resolution A-Quad-B step width in m. Internal resolution is 1 nm.
|
|
Returns
|
|
None
|
|
"""
|
|
with self._lock:
|
|
return self._hw.configureAQuadBIn(axisNo, enable, resolution)
|
|
|
|
def configureAQuadBOut(self, axisNo, enable, resolution, clock):
|
|
"""Enables and configures the A-Quad-B output of the current position.
|
|
|
|
Parameters
|
|
axisNo Axis number (0 ... 2)
|
|
enable Enable (1) or disable (0) A-Quad-B output
|
|
resolution A-Quad-B step width in m; internal resolution is 1 nm
|
|
clock Clock of the A-Quad-B output [s]. Allowed range is 40ns ... 1.3ms; internal resulution is 20ns.
|
|
Returns
|
|
None
|
|
"""
|
|
with self._lock:
|
|
return self._hw.configureAQuadBOut(axisNo, enable, resolution, clock)
|
|
|
|
def configureExtTrigger(self, axisNo, mode):
|
|
"""Enables the input trigger for steps.
|
|
|
|
Parameters
|
|
axisNo Axis number (0 ... 2)
|
|
mode Disable (0), Quadratur (1), Trigger(2) for external triggering
|
|
Returns
|
|
None
|
|
"""
|
|
with self._lock:
|
|
return self._hw.configureExtTrigger(axisNo, mode)
|
|
|
|
def configureNslTriggerAxis(self, axisNo):
|
|
"""Selects Axis for NSL Trigger.
|
|
|
|
Parameters
|
|
axisNo Axis number (0 ... 2)
|
|
Returns
|
|
None
|
|
"""
|
|
with self._lock:
|
|
return self._hw.configureNslTriggerAxis(axisNo)
|
|
|
|
def configureRngTrigger(self, axisNo, lower, upper):
|
|
"""Configure lower position for range Trigger.
|
|
|
|
Parameters
|
|
axisNo Axis number (0 ... 2)
|
|
lower Lower position for range trigger (nm)
|
|
upper Upper position for range trigger (nm)
|
|
Returns
|
|
None
|
|
"""
|
|
with self._lock:
|
|
return self._hw.configureRngTrigger(axisNo, lower, upper)
|
|
|
|
def configureRngTriggerEps(self, axisNo, epsilon):
|
|
"""Configure hysteresis for range Trigger.
|
|
|
|
Parameters
|
|
axisNo Axis number (0 ... 2)
|
|
epsilon hysteresis in nm / mdeg
|
|
Returns
|
|
None
|
|
"""
|
|
with self._lock:
|
|
return self._hw.configureRngTriggerEps(axisNo, epsilon)
|
|
|
|
def configureRngTriggerPol(self, axisNo, polarity):
|
|
"""Configure lower position for range Trigger.
|
|
|
|
Parameters
|
|
axisNo Axis number (0 ... 2)
|
|
polarity Polarity of trigger signal when position is between lower and upper Low(0) and High(1)
|
|
Returns
|
|
None
|
|
"""
|
|
with self._lock:
|
|
return self._hw.configureRngTriggerPol(axisNo, polarity)
|
|
|
|
def getActuatorName(self, axisNo):
|
|
"""Get the name of the currently selected actuator
|
|
|
|
Parameters
|
|
axisNo Axis number (0 ... 2)
|
|
Returns
|
|
name Name of the actuator
|
|
"""
|
|
with self._lock:
|
|
return self._hw.getActuatorName(axisNo)
|
|
|
|
def getActuatorType(self, axisNo):
|
|
"""Get the type of the currently selected actuator
|
|
|
|
Parameters
|
|
axisNo Axis number (0 ... 2)
|
|
Returns
|
|
type_ Type of the actuator {0: linear, 1: goniometer, 2: rotator}
|
|
"""
|
|
with self._lock:
|
|
return self._hw.getActuatorType(axisNo)
|
|
|
|
def getAmplitude(self, axisNo):
|
|
"""Reads back the amplitude parameter of an axis.
|
|
|
|
Parameters
|
|
axisNo Axis number (0 ... 2)
|
|
Returns
|
|
amplitude Amplitude V
|
|
"""
|
|
with self._lock:
|
|
return self._hw.getAmplitude(axisNo)
|
|
|
|
def getAxisStatus(self, axisNo):
|
|
"""Reads status information about an axis of the device.
|
|
|
|
Parameters
|
|
axisNo Axis number (0 ... 2)
|
|
Returns
|
|
connected Output: If the axis is connected to a sensor.
|
|
enabled Output: If the axis voltage output is enabled.
|
|
moving Output: If the axis is moving.
|
|
target Output: If the target is reached in automatic positioning
|
|
eotFwd Output: If end of travel detected in forward direction.
|
|
eotBwd Output: If end of travel detected in backward direction.
|
|
error Output: If the axis' sensor is in error state.
|
|
"""
|
|
with self._lock:
|
|
return self._hw.getAxisStatus(axisNo)
|
|
|
|
def getFrequency(self, axisNo):
|
|
"""Reads back the frequency parameter of an axis.
|
|
|
|
Parameters
|
|
axisNo Axis number (0 ... 2)
|
|
Returns
|
|
frequency Output: Frequency in Hz
|
|
"""
|
|
with self._lock:
|
|
return self._hw.getFrequency(axisNo)
|
|
|
|
def getPosition(self, axisNo):
|
|
"""Retrieves the current actuator position. For linear type actuators the position unit is m; for goniometers and rotators it is degree.
|
|
|
|
Parameters
|
|
axisNo Axis number (0 ... 2)
|
|
Returns
|
|
position Output: Current position [m] or [°]
|
|
"""
|
|
with self._lock:
|
|
return self._hw.getPosition(axisNo)
|
|
|
|
def measureCapacitance(self, axisNo):
|
|
"""Performs a measurement of the capacitance of the piezo motor and returns the result. If no motor is connected, the result will be 0. The function doesn't return before the measurement is complete; this will take a few seconds of time.
|
|
|
|
Parameters
|
|
axisNo Axis number (0 ... 2)
|
|
Returns
|
|
cap Output: Capacitance [F]
|
|
"""
|
|
with self._lock:
|
|
return self._hw.measureCapacitance(axisNo)
|
|
|
|
def selectActuator(self, axisNo, actuator):
|
|
"""Selects the actuator to be used for the axis from actuator presets.
|
|
|
|
Parameters
|
|
axisNo Axis number (0 ... 2)
|
|
actuator Actuator selection (0 ... 255)
|
|
0: ANPg101res
|
|
1: ANGt101res
|
|
2: ANPx51res
|
|
3: ANPx101res
|
|
4: ANPx121res
|
|
5: ANPx122res
|
|
6: ANPz51res
|
|
7: ANPz101res
|
|
8: ANR50res
|
|
9: ANR51res
|
|
10: ANR101res
|
|
11: Test
|
|
Returns
|
|
None
|
|
"""
|
|
with self._lock:
|
|
return self._hw.selectActuator(axisNo, actuator)
|
|
|
|
def setAmplitude(self, axisNo, amplitude):
|
|
"""Sets the amplitude parameter for an axis
|
|
|
|
Parameters
|
|
axisNo Axis number (0 ... 2)
|
|
amplitude Amplitude in V, internal resolution is 1 mV
|
|
Returns
|
|
None
|
|
"""
|
|
with self._lock:
|
|
return self._hw.setAmplitude(axisNo, amplitude)
|
|
|
|
def setAxisOutput(self, axisNo, enable, autoDisable):
|
|
"""Enables or disables the voltage output of an axis.
|
|
|
|
Parameters
|
|
axisNo Axis number (0 ... 2)
|
|
enable Enables (1) or disables (0) the voltage output.
|
|
autoDisable If the voltage output is to be deactivated automatically when end of travel is detected.
|
|
Returns
|
|
None
|
|
"""
|
|
with self._lock:
|
|
return self._hw.setAxisOutput(axisNo, enable, autoDisable)
|
|
|
|
def setDcVoltage(self, axisNo, voltage):
|
|
"""Sets the DC level on the voltage output when no sawtooth based motion is active.
|
|
|
|
Parameters
|
|
axisNo Axis number (0 ... 2)
|
|
voltage DC output voltage [V], internal resolution is 1 mV
|
|
Returns
|
|
None
|
|
"""
|
|
with self._lock:
|
|
return self._hw.setDcVoltage(axisNo, voltage)
|
|
|
|
def setFrequency(self, axisNo, frequency):
|
|
"""Sets the frequency parameter for an axis
|
|
|
|
Parameters
|
|
axisNo Axis number (0 ... 2)
|
|
frequency Frequency in Hz, internal resolution is 1 Hz
|
|
Returns
|
|
None
|
|
"""
|
|
with self._lock:
|
|
return self._hw.setFrequency(axisNo, frequency)
|
|
|
|
def setTargetPosition(self, axisNo, target):
|
|
"""Sets the target position for automatic motion, see ANC_startAutoMove. For linear type actuators the position unit is m, for goniometers and rotators it is degree.
|
|
|
|
Parameters
|
|
axisNo Axis number (0 ... 2)
|
|
target Target position [m] or [°]. Internal resulution is 1 nm or 1 µ°.
|
|
Returns
|
|
None
|
|
"""
|
|
with self._lock:
|
|
return self._hw.setTargetPosition(axisNo, target)
|
|
|
|
def setTargetRange(self, axisNo, targetRg):
|
|
"""Defines the range around the target position where the target is considered to be reached.
|
|
|
|
Parameters
|
|
axisNo Axis number (0 ... 2)
|
|
targetRg Target range [m] or [°]. Internal resulution is 1 nm or 1 µ°.
|
|
Returns
|
|
None
|
|
"""
|
|
with self._lock:
|
|
return self._hw.setTargetRange(axisNo, targetRg)
|
|
|
|
def startAutoMove(self, axisNo, enable, relative):
|
|
"""Switches automatic moving (i.e. following the target position) on or off
|
|
|
|
Parameters
|
|
axisNo Axis number (0 ... 2)
|
|
enable Enables (1) or disables (0) automatic motion
|
|
relative If the target position is to be interpreted absolute (0) or relative to the current position (1)
|
|
Returns
|
|
None
|
|
"""
|
|
with self._lock:
|
|
return self._hw.startAutoMove(axisNo, enable, relative)
|
|
|
|
def startContinuousMove(self, axisNo, start, backward):
|
|
"""Starts or stops continous motion in forward direction. Other kinds of motions are stopped.
|
|
|
|
Parameters
|
|
axisNo Axis number (0 ... 2)
|
|
start Starts (1) or stops (0) the motion
|
|
backward If the move direction is forward (0) or backward (1)
|
|
Returns
|
|
None
|
|
"""
|
|
with self._lock:
|
|
return self._hw.startContinuousMove(axisNo, start, backward)
|
|
|
|
def startSingleStep(self, axisNo, backward):
|
|
"""Triggers a single step in desired direction.
|
|
|
|
Parameters
|
|
axisNo Axis number (0 ... 2)
|
|
backward If the step direction is forward (0) or backward (1)
|
|
Returns
|
|
None
|
|
"""
|
|
with self._lock:
|
|
return self._hw.startSingleStep(axisNo, backward)
|
|
|
|
|
|
class Stopped(RuntimeError):
|
|
"""thread was stopped"""
|
|
|
|
|
|
class Axis(HasIO, Drivable):
|
|
axis = Property('axis number', IntRange(0, 2), 0)
|
|
value = Parameter('axis position', FloatRange(unit='deg'))
|
|
frequency = Parameter('frequency', FloatRange(1, unit='Hz'), readonly=False)
|
|
amplitude = Parameter('amplitude', FloatRange(0, unit='V'), readonly=False)
|
|
gear = Parameter('gear factor', FloatRange(), readonly=False, value=1)
|
|
tolerance = Parameter('positioning tolerance', FloatRange(0, unit='$'),
|
|
readonly=False, default=0.01)
|
|
sensor_connected = Parameter('a sensor is connected', BoolType())
|
|
info = Parameter('axis info', StringType())
|
|
statusbits = Parameter('status bits', StringType())
|
|
step_mode = Parameter('step mode (soft closed loop)', BoolType(),
|
|
default=False, readonly=False, group='step_mode')
|
|
timeout = Parameter('timeout after no progress detected', FloatRange(0),
|
|
default=1, readonly=False, group='step_mode')
|
|
steps_fwd = Parameter('forward steps / main unit', FloatRange(0), unit='$/s',
|
|
default=0, readonly=False, group='step_mode')
|
|
steps_bwd = Parameter('backward steps / main unit', FloatRange(0, unit='$/s'),
|
|
default=0, readonly=False, group='step_mode')
|
|
delay = Parameter('delay between tries within loop', FloatRange(0, unit='s'),
|
|
readonly=False, default=0.05, group='step_mode')
|
|
maxstep = Parameter('max. step duration', FloatRange(0, unit='s'),
|
|
default=0.25, readonly=False, group='step_mode')
|
|
prop = Parameter('factor for control loop', FloatRange(0, 1),
|
|
readonly=False, default=0.8, group='step_mode')
|
|
uri = 'ANC'
|
|
ioClass = IO
|
|
target_min = Limit()
|
|
target_max = Limit()
|
|
|
|
fast_interval = 0.25
|
|
|
|
_hw = None
|
|
_scale = 1 # scale for custom units
|
|
SCALES = {'deg': 1, 'm': 1, 'mm': 1000, 'um': 1000000, 'µm': 1000000}
|
|
_thread = None
|
|
_moving_since = 0
|
|
_status = IDLE, ''
|
|
_calib_range = None
|
|
_try_cnt = 0
|
|
_at_target = False
|
|
|
|
def initModule(self):
|
|
super().initModule()
|
|
self._stopped = threading.Event()
|
|
if self.axis in IO.used_axes:
|
|
raise ConfigError(f'a module with axisNo={self.axis} already exists')
|
|
IO.used_axes.add(self.axis)
|
|
|
|
def initialReads(self):
|
|
self.read_info()
|
|
super().initialReads()
|
|
|
|
def shutdownModule(self):
|
|
IO.used_axes.discard(self.axis)
|
|
|
|
def read_value(self):
|
|
if self._thread:
|
|
return self.value
|
|
try:
|
|
return self._read_pos()
|
|
except Stopped:
|
|
return self.value
|
|
|
|
def write_gear(self, value):
|
|
self._scale = self.SCALES[self.parameters['value'].datatype.unit] * self.gear
|
|
return value
|
|
|
|
def read_frequency(self):
|
|
return self.io.getFrequency(self.axis)
|
|
|
|
def write_frequency(self, value):
|
|
self.io.setFrequency(self.axis, value)
|
|
return self.io.getFrequency(self.axis)
|
|
|
|
def read_amplitude(self):
|
|
return self.io.getAmplitude(self.axis)
|
|
|
|
def write_amplitude(self, value):
|
|
self.io.setAmplitude(self.axis, value)
|
|
return self.io.getAmplitude(self.axis)
|
|
|
|
def read_statusbits(self):
|
|
self._get_status()
|
|
return self.statusbits
|
|
|
|
def _get_status(self):
|
|
"""get axis status
|
|
|
|
- update self.sensor_connected and self.statusbits
|
|
- return <moving flag>, <error flag>, <reason>
|
|
|
|
<moving flag> is True whn moving
|
|
<in_error> is True when in error
|
|
<reason> is an error text, when in error, 'at target' or '' otherwise
|
|
"""
|
|
statusbits = self.io.getAxisStatus(self.axis)
|
|
self.sensor_connected, self._output, moving, at_target, fwd_stuck, bwd_stuck, error = statusbits
|
|
self.statusbits = ''.join(k for k, v in zip('OTFBE', (self._output,) + statusbits[3:]) if v)
|
|
if error:
|
|
return ERROR, 'other error'
|
|
if bwd_stuck:
|
|
return ERROR, 'end of travel backward'
|
|
if fwd_stuck:
|
|
return ERROR, 'end of travel forward'
|
|
target_reached = at_target > self._at_target
|
|
self._at_target = at_target
|
|
if self._moving_since:
|
|
if target_reached:
|
|
return IDLE, 'at target'
|
|
if time.time() < self._moving_since + 0.25:
|
|
return BUSY, 'started'
|
|
if at_target:
|
|
return IDLE, 'at target'
|
|
if moving and self._output:
|
|
return BUSY, 'moving'
|
|
return WARN, 'stopped by unknown reason'
|
|
if self._moving_since is False:
|
|
return IDLE, 'stopped'
|
|
if not self.step_mode and at_target:
|
|
return IDLE, 'at target'
|
|
return IDLE, ''
|
|
|
|
def read_status(self):
|
|
status = self._get_status()
|
|
if self.step_mode:
|
|
return self._status
|
|
if self._moving_since:
|
|
if status[0] != BUSY:
|
|
self._moving_since = 0
|
|
self.setFastPoll(False)
|
|
return status
|
|
|
|
def _wait(self, delay):
|
|
if self._stopped.wait(delay):
|
|
raise Stopped()
|
|
|
|
def _read_pos(self):
|
|
if not self.sensor_connected:
|
|
return 0
|
|
poslist = []
|
|
for i in range(9):
|
|
if i:
|
|
self._wait(0.001)
|
|
poslist.append(self.io.getPosition(self.axis) * self._scale)
|
|
self._poslist = sorted(poslist)
|
|
return self._poslist[len(poslist) // 2] # median
|
|
|
|
def _run_drive(self, target):
|
|
self.value = self._read_pos()
|
|
self.status = self._status = BUSY, 'drive by steps'
|
|
deadline = time.time() + self.timeout
|
|
max_steps = self.maxstep * self.frequency
|
|
while True:
|
|
for _ in range(2):
|
|
dif = target - self.value
|
|
steps_per_unit = self.steps_bwd if dif < 0 else self.steps_fwd
|
|
tol = max(self.tolerance, 0.6 / steps_per_unit) # avoid a tolerance less than 60% of a step
|
|
if abs(dif) > tol * 3:
|
|
break
|
|
# extra wait time when already close
|
|
self._wait(2 * self.delay)
|
|
self.read_value()
|
|
status = None
|
|
if abs(dif) < tol:
|
|
status = IDLE, 'in tolerance'
|
|
elif self._poslist[2] <= target <= self._poslist[-3]: # target within noise
|
|
status = IDLE, 'within noise'
|
|
elif dif > 0:
|
|
steps = min(max_steps, min(dif, (dif + tol) * self.prop) * steps_per_unit)
|
|
else:
|
|
steps = max(-max_steps, max(dif, (dif - tol) * self.prop) * steps_per_unit)
|
|
if status or steps == 0:
|
|
self._status = status
|
|
break
|
|
if round(steps) == 0: # this should not happen
|
|
self._status = WARN, 'steps=0'
|
|
break
|
|
self._move_steps(steps)
|
|
if self._step_size > self.prop * 0.25 / steps_per_unit:
|
|
# some progress happened
|
|
deadline = time.time() + self.timeout
|
|
elif time.time() > deadline:
|
|
self._status = WARN, 'timeout - no progress'
|
|
break
|
|
self.read_status()
|
|
|
|
def _thread_wrapper(self, func, *args):
|
|
try:
|
|
func(*args)
|
|
except Stopped as e:
|
|
self._status = IDLE, str(e)
|
|
except Exception as e:
|
|
self._status = ERROR, f'{type(e).__name__} - {e}'
|
|
finally:
|
|
self.io.setAxisOutput(self.axis, enable=0, autoDisable=0)
|
|
self.setFastPoll(False)
|
|
self._stopped.clear()
|
|
self._thread = None
|
|
|
|
def _stop_thread(self):
|
|
if self._thread:
|
|
self._stopped.set()
|
|
self._thread.join()
|
|
|
|
def _start_thread(self, *args):
|
|
self._stop_thread()
|
|
thread = threading.Thread(target=self._thread_wrapper, args=args)
|
|
self._thread = thread
|
|
thread.start()
|
|
|
|
def write_target(self, target):
|
|
if not self.sensor_connected:
|
|
raise HardwareError('no sensor connected')
|
|
self._stop_thread()
|
|
self.io.setTargetRange(self.axis, self.tolerance / self._scale)
|
|
if self.step_mode:
|
|
self.status = BUSY, 'changed target'
|
|
self._start_thread(self._run_drive, target)
|
|
else:
|
|
self._try_cnt = 0
|
|
self.setFastPoll(True, self.fast_interval)
|
|
self.io.setTargetPosition(self.axis, target / self._scale)
|
|
self.io.setAxisOutput(self.axis, enable=1, autoDisable=0)
|
|
self.io.startAutoMove(self.axis, enable=1, relative=0)
|
|
self._moving_since = time.time()
|
|
self.status = self._get_status()
|
|
return target
|
|
|
|
@Command()
|
|
def stop(self):
|
|
if self.step_mode:
|
|
self._stop_thread()
|
|
self._status = IDLE, 'stopped'
|
|
elif self._moving_since:
|
|
self._moving_since = False # indicate stop
|
|
self.read_status()
|
|
|
|
@Command(IntRange())
|
|
def move(self, steps):
|
|
"""relative move by number of steps"""
|
|
self._stop_thread()
|
|
self.read_value()
|
|
if steps > 0:
|
|
if self.value > self.target_max:
|
|
raise BadValueError('above upper limit')
|
|
elif self.value < self.target_min:
|
|
raise BadValueError('below lower limit')
|
|
self.status = self._status = BUSY, 'moving relative'
|
|
self._start_thread(self._run_move, steps)
|
|
|
|
def _run_move(self, steps):
|
|
self.setFastPoll(True, self.fast_interval)
|
|
self._move_steps(steps)
|
|
self.status = self._status = IDLE, ''
|
|
|
|
def _move_steps(self, steps):
|
|
steps = round(steps)
|
|
if not steps:
|
|
return
|
|
previous = self._read_pos()
|
|
self.io.setAxisOutput(self.axis, enable=1, autoDisable=0)
|
|
# wait for output is really on
|
|
for i in range(100):
|
|
self._wait(0.001)
|
|
self._get_status()
|
|
if self._output:
|
|
break
|
|
else:
|
|
raise ValueError('can not switch on output')
|
|
for cnt in range(abs(steps)):
|
|
self.io.setAxisOutput(self.axis, enable=1, autoDisable=0)
|
|
if not self._thread:
|
|
raise Stopped('stopped')
|
|
self.io.startSingleStep(self.axis, steps < 0)
|
|
self._wait(1 / self.frequency)
|
|
self._get_status()
|
|
if cnt and not self._output:
|
|
steps = cnt
|
|
break
|
|
self._wait(self.delay)
|
|
self.value = self._read_pos()
|
|
self._step_size = (self.value - previous) / steps
|
|
|
|
@Command(IntRange(0))
|
|
def calib_steps(self, delta):
|
|
"""calibrate steps_fwd and steps_bwd using <delta> steps forwards and backwards"""
|
|
if not self.sensor_connected:
|
|
raise HardwareError('no sensor connected')
|
|
self._stop_thread()
|
|
self._status = BUSY, 'calibrate step size'
|
|
self.read_status()
|
|
self._start_thread(self._run_calib, delta)
|
|
|
|
def _run_calib(self, steps):
|
|
self.value = self._read_pos()
|
|
if self._calib_range is None or abs(self.target - self.value) > self._calib_range:
|
|
self.target = self.value
|
|
maxfwd = 0
|
|
maxbwd = 0
|
|
cntfwd = 0
|
|
cntbwd = 0
|
|
self._calib_range = 0
|
|
for i in range(10):
|
|
if self.value <= self.target:
|
|
self._status = BUSY, 'move forwards'
|
|
self.read_status()
|
|
self._move_steps(steps)
|
|
while True:
|
|
self._move_steps(steps)
|
|
if self._step_size and self._output:
|
|
maxfwd = max(maxfwd, self._step_size)
|
|
cntfwd += 1
|
|
if self.value > self.target:
|
|
break
|
|
else:
|
|
self._status = BUSY, 'move backwards'
|
|
self.read_status()
|
|
self._move_steps(-steps)
|
|
while True:
|
|
self._move_steps(-steps)
|
|
if self._step_size:
|
|
maxbwd = max(maxbwd, self._step_size)
|
|
cntbwd += 1
|
|
if self.value < self.target:
|
|
break
|
|
# keep track how far we had to go for calibration
|
|
self._calib_range = max(self._calib_range, abs(self.value - self.target))
|
|
if cntfwd >= 3 and cntbwd >= 3:
|
|
self.steps_fwd = 1 / maxfwd
|
|
self.steps_bwd = 1 / maxbwd
|
|
self._status = IDLE, 'calib step size done'
|
|
break
|
|
else:
|
|
self._status = WARN, 'calib step size failed'
|
|
self.read_status()
|
|
|
|
@nopoll
|
|
def read_info(self):
|
|
"""read info from controller"""
|
|
axistype = ['linear', 'gonio', 'rotator'][self.io.getActuatorType(self.axis)]
|
|
name = self.io.getActuatorName(self.axis)
|
|
cap = self.io.measureCapacitance(self.axis) * 1e9
|
|
return f'{name} {axistype} {cap:.3g}nF'
|