frappy/frappy_psi/attocube.py
Markus Zolliker 70a31b5cae attocube: remove seperate pyanc350 python module
- implement positioner as an io class
- proper shut down behaviour

Change-Id: If04176f779809fd5b08f586556cac668cf188479
2024-03-28 10:12:18 +01:00

707 lines
25 KiB
Python

# *****************************************************************************
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Module authors:
# Markus Zolliker <markus.zolliker@psi.ch>
# *****************************************************************************
import time
import threading
from frappy.core import Drivable, Parameter, Command, Property, Module, HasIO, \
ERROR, WARN, BUSY, IDLE, nopoll, Limit
from frappy.datatypes import IntRange, FloatRange, StringType, BoolType
from frappy.errors import BadValueError, HardwareError, ConfigError
from PyANC350v4 import Positioner
class IO(Module):
"""'communication' module for attocube controller
why an extra class:
- HasIO assures that a single common communicator is used
- access must be thread safe
"""
uri = Property('dummy uri, only one controller may exists',
StringType())
export = False
_hw = None
_lock = None
used_axes = set()
def initModule(self):
if self._hw is None:
IO._lock = threading.Lock()
IO._hw = Positioner()
super().initModule()
def shutdownModule(self):
if IO._hw:
IO._hw.disconnect()
IO._hw = None
def configureAQuadBIn(self, axisNo, enable, resolution):
"""Enables and configures the A-Quad-B (quadrature) input for the target position.
Parameters
axisNo Axis number (0 ... 2)
enable Enable (1) or disable (0) A-Quad-B input
resolution A-Quad-B step width in m. Internal resolution is 1 nm.
Returns
None
"""
with self._lock:
return self._hw.configureAQuadBIn(axisNo, enable, resolution)
def configureAQuadBOut(self, axisNo, enable, resolution, clock):
"""Enables and configures the A-Quad-B output of the current position.
Parameters
axisNo Axis number (0 ... 2)
enable Enable (1) or disable (0) A-Quad-B output
resolution A-Quad-B step width in m; internal resolution is 1 nm
clock Clock of the A-Quad-B output [s]. Allowed range is 40ns ... 1.3ms; internal resulution is 20ns.
Returns
None
"""
with self._lock:
return self._hw.configureAQuadBOut(axisNo, enable, resolution, clock)
def configureExtTrigger(self, axisNo, mode):
"""Enables the input trigger for steps.
Parameters
axisNo Axis number (0 ... 2)
mode Disable (0), Quadratur (1), Trigger(2) for external triggering
Returns
None
"""
with self._lock:
return self._hw.configureExtTrigger(axisNo, mode)
def configureNslTriggerAxis(self, axisNo):
"""Selects Axis for NSL Trigger.
Parameters
axisNo Axis number (0 ... 2)
Returns
None
"""
with self._lock:
return self._hw.configureNslTriggerAxis(axisNo)
def configureRngTrigger(self, axisNo, lower, upper):
"""Configure lower position for range Trigger.
Parameters
axisNo Axis number (0 ... 2)
lower Lower position for range trigger (nm)
upper Upper position for range trigger (nm)
Returns
None
"""
with self._lock:
return self._hw.configureRngTrigger(axisNo, lower, upper)
def configureRngTriggerEps(self, axisNo, epsilon):
"""Configure hysteresis for range Trigger.
Parameters
axisNo Axis number (0 ... 2)
epsilon hysteresis in nm / mdeg
Returns
None
"""
with self._lock:
return self._hw.configureRngTriggerEps(axisNo, epsilon)
def configureRngTriggerPol(self, axisNo, polarity):
"""Configure lower position for range Trigger.
Parameters
axisNo Axis number (0 ... 2)
polarity Polarity of trigger signal when position is between lower and upper Low(0) and High(1)
Returns
None
"""
with self._lock:
return self._hw.configureRngTriggerPol(axisNo, polarity)
def getActuatorName(self, axisNo):
"""Get the name of the currently selected actuator
Parameters
axisNo Axis number (0 ... 2)
Returns
name Name of the actuator
"""
with self._lock:
return self._hw.getActuatorName(axisNo)
def getActuatorType(self, axisNo):
"""Get the type of the currently selected actuator
Parameters
axisNo Axis number (0 ... 2)
Returns
type_ Type of the actuator {0: linear, 1: goniometer, 2: rotator}
"""
with self._lock:
return self._hw.getActuatorType(axisNo)
def getAmplitude(self, axisNo):
"""Reads back the amplitude parameter of an axis.
Parameters
axisNo Axis number (0 ... 2)
Returns
amplitude Amplitude V
"""
with self._lock:
return self._hw.getAmplitude(axisNo)
def getAxisStatus(self, axisNo):
"""Reads status information about an axis of the device.
Parameters
axisNo Axis number (0 ... 2)
Returns
connected Output: If the axis is connected to a sensor.
enabled Output: If the axis voltage output is enabled.
moving Output: If the axis is moving.
target Output: If the target is reached in automatic positioning
eotFwd Output: If end of travel detected in forward direction.
eotBwd Output: If end of travel detected in backward direction.
error Output: If the axis' sensor is in error state.
"""
with self._lock:
return self._hw.getAxisStatus(axisNo)
def getFrequency(self, axisNo):
"""Reads back the frequency parameter of an axis.
Parameters
axisNo Axis number (0 ... 2)
Returns
frequency Output: Frequency in Hz
"""
with self._lock:
return self._hw.getFrequency(axisNo)
def getPosition(self, axisNo):
"""Retrieves the current actuator position. For linear type actuators the position unit is m; for goniometers and rotators it is degree.
Parameters
axisNo Axis number (0 ... 2)
Returns
position Output: Current position [m] or [°]
"""
with self._lock:
return self._hw.getPosition(axisNo)
def measureCapacitance(self, axisNo):
"""Performs a measurement of the capacitance of the piezo motor and returns the result. If no motor is connected, the result will be 0. The function doesn't return before the measurement is complete; this will take a few seconds of time.
Parameters
axisNo Axis number (0 ... 2)
Returns
cap Output: Capacitance [F]
"""
with self._lock:
return self._hw.measureCapacitance(axisNo)
def selectActuator(self, axisNo, actuator):
"""Selects the actuator to be used for the axis from actuator presets.
Parameters
axisNo Axis number (0 ... 2)
actuator Actuator selection (0 ... 255)
0: ANPg101res
1: ANGt101res
2: ANPx51res
3: ANPx101res
4: ANPx121res
5: ANPx122res
6: ANPz51res
7: ANPz101res
8: ANR50res
9: ANR51res
10: ANR101res
11: Test
Returns
None
"""
with self._lock:
return self._hw.selectActuator(axisNo, actuator)
def setAmplitude(self, axisNo, amplitude):
"""Sets the amplitude parameter for an axis
Parameters
axisNo Axis number (0 ... 2)
amplitude Amplitude in V, internal resolution is 1 mV
Returns
None
"""
with self._lock:
return self._hw.setAmplitude(axisNo, amplitude)
def setAxisOutput(self, axisNo, enable, autoDisable):
"""Enables or disables the voltage output of an axis.
Parameters
axisNo Axis number (0 ... 2)
enable Enables (1) or disables (0) the voltage output.
autoDisable If the voltage output is to be deactivated automatically when end of travel is detected.
Returns
None
"""
with self._lock:
return self._hw.setAxisOutput(axisNo, enable, autoDisable)
def setDcVoltage(self, axisNo, voltage):
"""Sets the DC level on the voltage output when no sawtooth based motion is active.
Parameters
axisNo Axis number (0 ... 2)
voltage DC output voltage [V], internal resolution is 1 mV
Returns
None
"""
with self._lock:
return self._hw.setDcVoltage(axisNo, voltage)
def setFrequency(self, axisNo, frequency):
"""Sets the frequency parameter for an axis
Parameters
axisNo Axis number (0 ... 2)
frequency Frequency in Hz, internal resolution is 1 Hz
Returns
None
"""
with self._lock:
return self._hw.setFrequency(axisNo, frequency)
def setTargetPosition(self, axisNo, target):
"""Sets the target position for automatic motion, see ANC_startAutoMove. For linear type actuators the position unit is m, for goniometers and rotators it is degree.
Parameters
axisNo Axis number (0 ... 2)
target Target position [m] or [°]. Internal resulution is 1 nm or 1 µ°.
Returns
None
"""
with self._lock:
return self._hw.setTargetPosition(axisNo, target)
def setTargetRange(self, axisNo, targetRg):
"""Defines the range around the target position where the target is considered to be reached.
Parameters
axisNo Axis number (0 ... 2)
targetRg Target range [m] or [°]. Internal resulution is 1 nm or 1 µ°.
Returns
None
"""
with self._lock:
return self._hw.setTargetRange(axisNo, targetRg)
def startAutoMove(self, axisNo, enable, relative):
"""Switches automatic moving (i.e. following the target position) on or off
Parameters
axisNo Axis number (0 ... 2)
enable Enables (1) or disables (0) automatic motion
relative If the target position is to be interpreted absolute (0) or relative to the current position (1)
Returns
None
"""
with self._lock:
return self._hw.startAutoMove(axisNo, enable, relative)
def startContinuousMove(self, axisNo, start, backward):
"""Starts or stops continous motion in forward direction. Other kinds of motions are stopped.
Parameters
axisNo Axis number (0 ... 2)
start Starts (1) or stops (0) the motion
backward If the move direction is forward (0) or backward (1)
Returns
None
"""
with self._lock:
return self._hw.startContinuousMove(axisNo, start, backward)
def startSingleStep(self, axisNo, backward):
"""Triggers a single step in desired direction.
Parameters
axisNo Axis number (0 ... 2)
backward If the step direction is forward (0) or backward (1)
Returns
None
"""
with self._lock:
return self._hw.startSingleStep(axisNo, backward)
class Stopped(RuntimeError):
"""thread was stopped"""
class Axis(HasIO, Drivable):
axis = Property('axis number', IntRange(0, 2), 0)
value = Parameter('axis position', FloatRange(unit='deg'))
frequency = Parameter('frequency', FloatRange(1, unit='Hz'), readonly=False)
amplitude = Parameter('amplitude', FloatRange(0, unit='V'), readonly=False)
gear = Parameter('gear factor', FloatRange(), readonly=False, value=1)
tolerance = Parameter('positioning tolerance', FloatRange(0, unit='$'),
readonly=False, default=0.01)
sensor_connected = Parameter('a sensor is connected', BoolType())
info = Parameter('axis info', StringType())
statusbits = Parameter('status bits', StringType())
step_mode = Parameter('step mode (soft closed loop)', BoolType(),
default=False, readonly=False, group='step_mode')
timeout = Parameter('timeout after no progress detected', FloatRange(0),
default=1, readonly=False, group='step_mode')
steps_fwd = Parameter('forward steps / main unit', FloatRange(0), unit='$/s',
default=0, readonly=False, group='step_mode')
steps_bwd = Parameter('backward steps / main unit', FloatRange(0, unit='$/s'),
default=0, readonly=False, group='step_mode')
delay = Parameter('delay between tries within loop', FloatRange(0, unit='s'),
readonly=False, default=0.05, group='step_mode')
maxstep = Parameter('max. step duration', FloatRange(0, unit='s'),
default=0.25, readonly=False, group='step_mode')
prop = Parameter('factor for control loop', FloatRange(0, 1),
readonly=False, default=0.8, group='step_mode')
uri = 'ANC'
ioClass = IO
target_min = Limit()
target_max = Limit()
fast_interval = 0.25
_hw = None
_scale = 1 # scale for custom units
SCALES = {'deg': 1, 'm': 1, 'mm': 1000, 'um': 1000000, 'µm': 1000000}
_thread = None
_moving_since = 0
_status = IDLE, ''
_calib_range = None
_try_cnt = 0
_at_target = False
def initModule(self):
super().initModule()
self._stopped = threading.Event()
if self.axis in IO.used_axes:
raise ConfigError(f'a module with axisNo={self.axis} already exists')
IO.used_axes.add(self.axis)
def initialReads(self):
self.read_info()
super().initialReads()
def shutdownModule(self):
IO.used_axes.discard(self.axis)
def read_value(self):
if self._thread:
return self.value
try:
return self._read_pos()
except Stopped:
return self.value
def write_gear(self, value):
self._scale = self.SCALES[self.parameters['value'].datatype.unit] * self.gear
return value
def read_frequency(self):
return self.io.getFrequency(self.axis)
def write_frequency(self, value):
self.io.setFrequency(self.axis, value)
return self.io.getFrequency(self.axis)
def read_amplitude(self):
return self.io.getAmplitude(self.axis)
def write_amplitude(self, value):
self.io.setAmplitude(self.axis, value)
return self.io.getAmplitude(self.axis)
def read_statusbits(self):
self._get_status()
return self.statusbits
def _get_status(self):
"""get axis status
- update self.sensor_connected and self.statusbits
- return <moving flag>, <error flag>, <reason>
<moving flag> is True whn moving
<in_error> is True when in error
<reason> is an error text, when in error, 'at target' or '' otherwise
"""
statusbits = self.io.getAxisStatus(self.axis)
self.sensor_connected, self._output, moving, at_target, fwd_stuck, bwd_stuck, error = statusbits
self.statusbits = ''.join(k for k, v in zip('OTFBE', (self._output,) + statusbits[3:]) if v)
if error:
return ERROR, 'other error'
if bwd_stuck:
return ERROR, 'end of travel backward'
if fwd_stuck:
return ERROR, 'end of travel forward'
target_reached = at_target > self._at_target
self._at_target = at_target
if self._moving_since:
if target_reached:
return IDLE, 'at target'
if time.time() < self._moving_since + 0.25:
return BUSY, 'started'
if at_target:
return IDLE, 'at target'
if moving and self._output:
return BUSY, 'moving'
return WARN, 'stopped by unknown reason'
if self._moving_since is False:
return IDLE, 'stopped'
if not self.step_mode and at_target:
return IDLE, 'at target'
return IDLE, ''
def read_status(self):
status = self._get_status()
if self.step_mode:
return self._status
if self._moving_since:
if status[0] != BUSY:
self._moving_since = 0
self.setFastPoll(False)
return status
def _wait(self, delay):
if self._stopped.wait(delay):
raise Stopped()
def _read_pos(self):
if not self.sensor_connected:
return 0
poslist = []
for i in range(9):
if i:
self._wait(0.001)
poslist.append(self.io.getPosition(self.axis) * self._scale)
self._poslist = sorted(poslist)
return self._poslist[len(poslist) // 2] # median
def _run_drive(self, target):
self.value = self._read_pos()
self.status = self._status = BUSY, 'drive by steps'
deadline = time.time() + self.timeout
max_steps = self.maxstep * self.frequency
while True:
for _ in range(2):
dif = target - self.value
steps_per_unit = self.steps_bwd if dif < 0 else self.steps_fwd
tol = max(self.tolerance, 0.6 / steps_per_unit) # avoid a tolerance less than 60% of a step
if abs(dif) > tol * 3:
break
# extra wait time when already close
self._wait(2 * self.delay)
self.read_value()
status = None
if abs(dif) < tol:
status = IDLE, 'in tolerance'
elif self._poslist[2] <= target <= self._poslist[-3]: # target within noise
status = IDLE, 'within noise'
elif dif > 0:
steps = min(max_steps, min(dif, (dif + tol) * self.prop) * steps_per_unit)
else:
steps = max(-max_steps, max(dif, (dif - tol) * self.prop) * steps_per_unit)
if status or steps == 0:
self._status = status
break
if round(steps) == 0: # this should not happen
self._status = WARN, 'steps=0'
break
self._move_steps(steps)
if self._step_size > self.prop * 0.25 / steps_per_unit:
# some progress happened
deadline = time.time() + self.timeout
elif time.time() > deadline:
self._status = WARN, 'timeout - no progress'
break
self.read_status()
def _thread_wrapper(self, func, *args):
try:
func(*args)
except Stopped as e:
self._status = IDLE, str(e)
except Exception as e:
self._status = ERROR, f'{type(e).__name__} - {e}'
finally:
self.io.setAxisOutput(self.axis, enable=0, autoDisable=0)
self.setFastPoll(False)
self._stopped.clear()
self._thread = None
def _stop_thread(self):
if self._thread:
self._stopped.set()
self._thread.join()
def _start_thread(self, *args):
self._stop_thread()
thread = threading.Thread(target=self._thread_wrapper, args=args)
self._thread = thread
thread.start()
def write_target(self, target):
if not self.sensor_connected:
raise HardwareError('no sensor connected')
self._stop_thread()
self.io.setTargetRange(self.axis, self.tolerance / self._scale)
if self.step_mode:
self.status = BUSY, 'changed target'
self._start_thread(self._run_drive, target)
else:
self._try_cnt = 0
self.setFastPoll(True, self.fast_interval)
self.io.setTargetPosition(self.axis, target / self._scale)
self.io.setAxisOutput(self.axis, enable=1, autoDisable=0)
self.io.startAutoMove(self.axis, enable=1, relative=0)
self._moving_since = time.time()
self.status = self._get_status()
return target
@Command()
def stop(self):
if self.step_mode:
self._stop_thread()
self._status = IDLE, 'stopped'
elif self._moving_since:
self._moving_since = False # indicate stop
self.read_status()
@Command(IntRange())
def move(self, steps):
"""relative move by number of steps"""
self._stop_thread()
self.read_value()
if steps > 0:
if self.value > self.target_max:
raise BadValueError('above upper limit')
elif self.value < self.target_min:
raise BadValueError('below lower limit')
self.status = self._status = BUSY, 'moving relative'
self._start_thread(self._run_move, steps)
def _run_move(self, steps):
self.setFastPoll(True, self.fast_interval)
self._move_steps(steps)
self.status = self._status = IDLE, ''
def _move_steps(self, steps):
steps = round(steps)
if not steps:
return
previous = self._read_pos()
self.io.setAxisOutput(self.axis, enable=1, autoDisable=0)
# wait for output is really on
for i in range(100):
self._wait(0.001)
self._get_status()
if self._output:
break
else:
raise ValueError('can not switch on output')
for cnt in range(abs(steps)):
self.io.setAxisOutput(self.axis, enable=1, autoDisable=0)
if not self._thread:
raise Stopped('stopped')
self.io.startSingleStep(self.axis, steps < 0)
self._wait(1 / self.frequency)
self._get_status()
if cnt and not self._output:
steps = cnt
break
self._wait(self.delay)
self.value = self._read_pos()
self._step_size = (self.value - previous) / steps
@Command(IntRange(0))
def calib_steps(self, delta):
"""calibrate steps_fwd and steps_bwd using <delta> steps forwards and backwards"""
if not self.sensor_connected:
raise HardwareError('no sensor connected')
self._stop_thread()
self._status = BUSY, 'calibrate step size'
self.read_status()
self._start_thread(self._run_calib, delta)
def _run_calib(self, steps):
self.value = self._read_pos()
if self._calib_range is None or abs(self.target - self.value) > self._calib_range:
self.target = self.value
maxfwd = 0
maxbwd = 0
cntfwd = 0
cntbwd = 0
self._calib_range = 0
for i in range(10):
if self.value <= self.target:
self._status = BUSY, 'move forwards'
self.read_status()
self._move_steps(steps)
while True:
self._move_steps(steps)
if self._step_size and self._output:
maxfwd = max(maxfwd, self._step_size)
cntfwd += 1
if self.value > self.target:
break
else:
self._status = BUSY, 'move backwards'
self.read_status()
self._move_steps(-steps)
while True:
self._move_steps(-steps)
if self._step_size:
maxbwd = max(maxbwd, self._step_size)
cntbwd += 1
if self.value < self.target:
break
# keep track how far we had to go for calibration
self._calib_range = max(self._calib_range, abs(self.value - self.target))
if cntfwd >= 3 and cntbwd >= 3:
self.steps_fwd = 1 / maxfwd
self.steps_bwd = 1 / maxbwd
self._status = IDLE, 'calib step size done'
break
else:
self._status = WARN, 'calib step size failed'
self.read_status()
@nopoll
def read_info(self):
"""read info from controller"""
axistype = ['linear', 'gonio', 'rotator'][self.io.getActuatorType(self.axis)]
name = self.io.getActuatorName(self.axis)
cap = self.io.measureCapacitance(self.axis) * 1e9
return f'{name} {axistype} {cap:.3g}nF'