attocube: remove seperate pyanc350 python module

- implement positioner as an io class
- proper shut down behaviour

Change-Id: If04176f779809fd5b08f586556cac668cf188479
This commit is contained in:
zolliker 2024-03-28 10:12:18 +01:00
parent 8ee97ade63
commit 70a31b5cae
2 changed files with 359 additions and 94 deletions

View File

@ -19,18 +19,348 @@
import time
import threading
from frappy.core import Drivable, Parameter, Command, Property, \
from frappy.core import Drivable, Parameter, Command, Property, Module, HasIO, \
ERROR, WARN, BUSY, IDLE, nopoll, Limit
from frappy.datatypes import IntRange, FloatRange, StringType, BoolType
from frappy.errors import BadValueError, HardwareError
from frappy_psi.pyanc350 import HwAxis
from frappy.errors import BadValueError, HardwareError, ConfigError
from PyANC350v4 import Positioner
class IO(Module):
"""'communication' module for attocube controller
why an extra class:
- HasIO assures that a single common communicator is used
- access must be thread safe
"""
uri = Property('dummy uri, only one controller may exists',
StringType())
export = False
_hw = None
_lock = None
used_axes = set()
def initModule(self):
if self._hw is None:
IO._lock = threading.Lock()
IO._hw = Positioner()
super().initModule()
def shutdownModule(self):
if IO._hw:
IO._hw.disconnect()
IO._hw = None
def configureAQuadBIn(self, axisNo, enable, resolution):
"""Enables and configures the A-Quad-B (quadrature) input for the target position.
Parameters
axisNo Axis number (0 ... 2)
enable Enable (1) or disable (0) A-Quad-B input
resolution A-Quad-B step width in m. Internal resolution is 1 nm.
Returns
None
"""
with self._lock:
return self._hw.configureAQuadBIn(axisNo, enable, resolution)
def configureAQuadBOut(self, axisNo, enable, resolution, clock):
"""Enables and configures the A-Quad-B output of the current position.
Parameters
axisNo Axis number (0 ... 2)
enable Enable (1) or disable (0) A-Quad-B output
resolution A-Quad-B step width in m; internal resolution is 1 nm
clock Clock of the A-Quad-B output [s]. Allowed range is 40ns ... 1.3ms; internal resulution is 20ns.
Returns
None
"""
with self._lock:
return self._hw.configureAQuadBOut(axisNo, enable, resolution, clock)
def configureExtTrigger(self, axisNo, mode):
"""Enables the input trigger for steps.
Parameters
axisNo Axis number (0 ... 2)
mode Disable (0), Quadratur (1), Trigger(2) for external triggering
Returns
None
"""
with self._lock:
return self._hw.configureExtTrigger(axisNo, mode)
def configureNslTriggerAxis(self, axisNo):
"""Selects Axis for NSL Trigger.
Parameters
axisNo Axis number (0 ... 2)
Returns
None
"""
with self._lock:
return self._hw.configureNslTriggerAxis(axisNo)
def configureRngTrigger(self, axisNo, lower, upper):
"""Configure lower position for range Trigger.
Parameters
axisNo Axis number (0 ... 2)
lower Lower position for range trigger (nm)
upper Upper position for range trigger (nm)
Returns
None
"""
with self._lock:
return self._hw.configureRngTrigger(axisNo, lower, upper)
def configureRngTriggerEps(self, axisNo, epsilon):
"""Configure hysteresis for range Trigger.
Parameters
axisNo Axis number (0 ... 2)
epsilon hysteresis in nm / mdeg
Returns
None
"""
with self._lock:
return self._hw.configureRngTriggerEps(axisNo, epsilon)
def configureRngTriggerPol(self, axisNo, polarity):
"""Configure lower position for range Trigger.
Parameters
axisNo Axis number (0 ... 2)
polarity Polarity of trigger signal when position is between lower and upper Low(0) and High(1)
Returns
None
"""
with self._lock:
return self._hw.configureRngTriggerPol(axisNo, polarity)
def getActuatorName(self, axisNo):
"""Get the name of the currently selected actuator
Parameters
axisNo Axis number (0 ... 2)
Returns
name Name of the actuator
"""
with self._lock:
return self._hw.getActuatorName(axisNo)
def getActuatorType(self, axisNo):
"""Get the type of the currently selected actuator
Parameters
axisNo Axis number (0 ... 2)
Returns
type_ Type of the actuator {0: linear, 1: goniometer, 2: rotator}
"""
with self._lock:
return self._hw.getActuatorType(axisNo)
def getAmplitude(self, axisNo):
"""Reads back the amplitude parameter of an axis.
Parameters
axisNo Axis number (0 ... 2)
Returns
amplitude Amplitude V
"""
with self._lock:
return self._hw.getAmplitude(axisNo)
def getAxisStatus(self, axisNo):
"""Reads status information about an axis of the device.
Parameters
axisNo Axis number (0 ... 2)
Returns
connected Output: If the axis is connected to a sensor.
enabled Output: If the axis voltage output is enabled.
moving Output: If the axis is moving.
target Output: If the target is reached in automatic positioning
eotFwd Output: If end of travel detected in forward direction.
eotBwd Output: If end of travel detected in backward direction.
error Output: If the axis' sensor is in error state.
"""
with self._lock:
return self._hw.getAxisStatus(axisNo)
def getFrequency(self, axisNo):
"""Reads back the frequency parameter of an axis.
Parameters
axisNo Axis number (0 ... 2)
Returns
frequency Output: Frequency in Hz
"""
with self._lock:
return self._hw.getFrequency(axisNo)
def getPosition(self, axisNo):
"""Retrieves the current actuator position. For linear type actuators the position unit is m; for goniometers and rotators it is degree.
Parameters
axisNo Axis number (0 ... 2)
Returns
position Output: Current position [m] or [°]
"""
with self._lock:
return self._hw.getPosition(axisNo)
def measureCapacitance(self, axisNo):
"""Performs a measurement of the capacitance of the piezo motor and returns the result. If no motor is connected, the result will be 0. The function doesn't return before the measurement is complete; this will take a few seconds of time.
Parameters
axisNo Axis number (0 ... 2)
Returns
cap Output: Capacitance [F]
"""
with self._lock:
return self._hw.measureCapacitance(axisNo)
def selectActuator(self, axisNo, actuator):
"""Selects the actuator to be used for the axis from actuator presets.
Parameters
axisNo Axis number (0 ... 2)
actuator Actuator selection (0 ... 255)
0: ANPg101res
1: ANGt101res
2: ANPx51res
3: ANPx101res
4: ANPx121res
5: ANPx122res
6: ANPz51res
7: ANPz101res
8: ANR50res
9: ANR51res
10: ANR101res
11: Test
Returns
None
"""
with self._lock:
return self._hw.selectActuator(axisNo, actuator)
def setAmplitude(self, axisNo, amplitude):
"""Sets the amplitude parameter for an axis
Parameters
axisNo Axis number (0 ... 2)
amplitude Amplitude in V, internal resolution is 1 mV
Returns
None
"""
with self._lock:
return self._hw.setAmplitude(axisNo, amplitude)
def setAxisOutput(self, axisNo, enable, autoDisable):
"""Enables or disables the voltage output of an axis.
Parameters
axisNo Axis number (0 ... 2)
enable Enables (1) or disables (0) the voltage output.
autoDisable If the voltage output is to be deactivated automatically when end of travel is detected.
Returns
None
"""
with self._lock:
return self._hw.setAxisOutput(axisNo, enable, autoDisable)
def setDcVoltage(self, axisNo, voltage):
"""Sets the DC level on the voltage output when no sawtooth based motion is active.
Parameters
axisNo Axis number (0 ... 2)
voltage DC output voltage [V], internal resolution is 1 mV
Returns
None
"""
with self._lock:
return self._hw.setDcVoltage(axisNo, voltage)
def setFrequency(self, axisNo, frequency):
"""Sets the frequency parameter for an axis
Parameters
axisNo Axis number (0 ... 2)
frequency Frequency in Hz, internal resolution is 1 Hz
Returns
None
"""
with self._lock:
return self._hw.setFrequency(axisNo, frequency)
def setTargetPosition(self, axisNo, target):
"""Sets the target position for automatic motion, see ANC_startAutoMove. For linear type actuators the position unit is m, for goniometers and rotators it is degree.
Parameters
axisNo Axis number (0 ... 2)
target Target position [m] or [°]. Internal resulution is 1 nm or 1 µ°.
Returns
None
"""
with self._lock:
return self._hw.setTargetPosition(axisNo, target)
def setTargetRange(self, axisNo, targetRg):
"""Defines the range around the target position where the target is considered to be reached.
Parameters
axisNo Axis number (0 ... 2)
targetRg Target range [m] or [°]. Internal resulution is 1 nm or 1 µ°.
Returns
None
"""
with self._lock:
return self._hw.setTargetRange(axisNo, targetRg)
def startAutoMove(self, axisNo, enable, relative):
"""Switches automatic moving (i.e. following the target position) on or off
Parameters
axisNo Axis number (0 ... 2)
enable Enables (1) or disables (0) automatic motion
relative If the target position is to be interpreted absolute (0) or relative to the current position (1)
Returns
None
"""
with self._lock:
return self._hw.startAutoMove(axisNo, enable, relative)
def startContinuousMove(self, axisNo, start, backward):
"""Starts or stops continous motion in forward direction. Other kinds of motions are stopped.
Parameters
axisNo Axis number (0 ... 2)
start Starts (1) or stops (0) the motion
backward If the move direction is forward (0) or backward (1)
Returns
None
"""
with self._lock:
return self._hw.startContinuousMove(axisNo, start, backward)
def startSingleStep(self, axisNo, backward):
"""Triggers a single step in desired direction.
Parameters
axisNo Axis number (0 ... 2)
backward If the step direction is forward (0) or backward (1)
Returns
None
"""
with self._lock:
return self._hw.startSingleStep(axisNo, backward)
class Stopped(RuntimeError):
"""thread was stopped"""
class Axis(Drivable):
class Axis(HasIO, Drivable):
axis = Property('axis number', IntRange(0, 2), 0)
value = Parameter('axis position', FloatRange(unit='deg'))
frequency = Parameter('frequency', FloatRange(1, unit='Hz'), readonly=False)
@ -55,6 +385,8 @@ class Axis(Drivable):
default=0.25, readonly=False, group='step_mode')
prop = Parameter('factor for control loop', FloatRange(0, 1),
readonly=False, default=0.8, group='step_mode')
uri = 'ANC'
ioClass = IO
target_min = Limit()
target_max = Limit()
@ -73,14 +405,16 @@ class Axis(Drivable):
def initModule(self):
super().initModule()
self._stopped = threading.Event()
self._hw = HwAxis(self.axis)
if self.axis in IO.used_axes:
raise ConfigError(f'a module with axisNo={self.axis} already exists')
IO.used_axes.add(self.axis)
def initialReads(self):
self.read_info()
super().initialReads()
def shutdownModule(self):
self._hw.close()
IO.used_axes.discard(self.axis)
def read_value(self):
if self._thread:
@ -95,18 +429,18 @@ class Axis(Drivable):
return value
def read_frequency(self):
return self._hw.getFrequency()
return self.io.getFrequency(self.axis)
def write_frequency(self, value):
self._hw.setFrequency(value)
return self._hw.getFrequency()
self.io.setFrequency(self.axis, value)
return self.io.getFrequency(self.axis)
def read_amplitude(self):
return self._hw.getAmplitude()
return self.io.getAmplitude(self.axis)
def write_amplitude(self, value):
self._hw.setAmplitude(value)
return self._hw.getAmplitude()
self.io.setAmplitude(self.axis, value)
return self.io.getAmplitude(self.axis)
def read_statusbits(self):
self._get_status()
@ -122,7 +456,7 @@ class Axis(Drivable):
<in_error> is True when in error
<reason> is an error text, when in error, 'at target' or '' otherwise
"""
statusbits = self._hw.getAxisStatus()
statusbits = self.io.getAxisStatus(self.axis)
self.sensor_connected, self._output, moving, at_target, fwd_stuck, bwd_stuck, error = statusbits
self.statusbits = ''.join(k for k, v in zip('OTFBE', (self._output,) + statusbits[3:]) if v)
if error:
@ -170,7 +504,7 @@ class Axis(Drivable):
for i in range(9):
if i:
self._wait(0.001)
poslist.append(self._hw.getPosition() * self._scale)
poslist.append(self.io.getPosition(self.axis) * self._scale)
self._poslist = sorted(poslist)
return self._poslist[len(poslist) // 2] # median
@ -221,7 +555,7 @@ class Axis(Drivable):
except Exception as e:
self._status = ERROR, f'{type(e).__name__} - {e}'
finally:
self._hw.setAxisOutput(enable=0, autoDisable=0)
self.io.setAxisOutput(self.axis, enable=0, autoDisable=0)
self.setFastPoll(False)
self._stopped.clear()
self._thread = None
@ -241,16 +575,16 @@ class Axis(Drivable):
if not self.sensor_connected:
raise HardwareError('no sensor connected')
self._stop_thread()
self._hw.setTargetRange(self.tolerance / self._scale)
self.io.setTargetRange(self.axis, self.tolerance / self._scale)
if self.step_mode:
self.status = BUSY, 'changed target'
self._start_thread(self._run_drive, target)
else:
self._try_cnt = 0
self.setFastPoll(True, self.fast_interval)
self._hw.setTargetPosition(target / self._scale)
self._hw.setAxisOutput(enable=1, autoDisable=0)
self._hw.startAutoMove(enable=1, relative=0)
self.io.setTargetPosition(self.axis, target / self._scale)
self.io.setAxisOutput(self.axis, enable=1, autoDisable=0)
self.io.startAutoMove(self.axis, enable=1, relative=0)
self._moving_since = time.time()
self.status = self._get_status()
return target
@ -287,7 +621,7 @@ class Axis(Drivable):
if not steps:
return
previous = self._read_pos()
self._hw.setAxisOutput(enable=1, autoDisable=0)
self.io.setAxisOutput(self.axis, enable=1, autoDisable=0)
# wait for output is really on
for i in range(100):
self._wait(0.001)
@ -297,10 +631,10 @@ class Axis(Drivable):
else:
raise ValueError('can not switch on output')
for cnt in range(abs(steps)):
self._hw.setAxisOutput(enable=1, autoDisable=0)
self.io.setAxisOutput(self.axis, enable=1, autoDisable=0)
if not self._thread:
raise Stopped('stopped')
self._hw.startSingleStep(steps < 0)
self.io.startSingleStep(self.axis, steps < 0)
self._wait(1 / self.frequency)
self._get_status()
if cnt and not self._output:
@ -363,34 +697,10 @@ class Axis(Drivable):
self._status = WARN, 'calib step size failed'
self.read_status()
# def _measure_cap(self, event):
# """do cap measurement in a separate thread, as it may time out"""
# axistype = ['linear', 'gonio', 'rotator'][self._hw.getActuatorType()]
# name = self._hw.getActuatorName()
# self._info = f'{name} {axistype} timeout measuring capacitance'
# for _ in range(5):
# try:
# cap = self._hw.measureCapacitance() * 1e9
# break
# except Exception:
# pass
# self.info = f'{name} {axistype} {cap:.3g}nF'
# event.set()
@nopoll
def read_info(self):
"""read info from controller"""
axistype = ['linear', 'gonio', 'rotator'][self._hw.getActuatorType()]
name = self._hw.getActuatorName()
cap = self._hw.measureCapacitance() * 1e9
axistype = ['linear', 'gonio', 'rotator'][self.io.getActuatorType(self.axis)]
name = self.io.getActuatorName(self.axis)
cap = self.io.measureCapacitance(self.axis) * 1e9
return f'{name} {axistype} {cap:.3g}nF'
# self._hw = Positioner()
# event = threading.Event()
# self._start_thread(self._measure_cap, event)
# t = time.time()
# if not event.wait(0.25):
# print('CAP TIMEOUT')
# return self._info
# print('CAP', time.time() - t)
# return self.info

View File

@ -1,45 +0,0 @@
"""wrapper around PyANC350v4
main purpose: make Positioner thread safe
without this, when accessing the device from different threads, timeouts may appear
allows also to omit axis in method arguments
"""
import inspect
import threading
# make sure PYTHONPATH points to the directory of PyANC350v4.py
# e.g. in /home/l_samenv/Documents/anc350/Linux64/userlib/lib
from PyANC350v4 import Positioner
class HwAxis:
hw = None
lock = None
axes = set()
def __init__(self, axisNo):
self.axisNo = axisNo
if not self.axes:
self.axes.add(axisNo)
HwAxis.hw = Positioner()
HwAxis.lock = threading.Lock()
super().__init__()
def close(self):
self.axes.discard(self.axisNo)
if not self.axes:
self.hw.disconnect()
# wrap methods from Positioner and insert to HwAxis
for name, method in Positioner.__dict__.items():
try:
arguments = inspect.getfullargspec(method).args
except TypeError as e:
continue
if arguments[0:2] == ['self', 'axisNo']:
def wrapper(self, *args, _method=method, **kwds):
with self.lock:
return _method(self.hw, self.axisNo, *args, **kwds)
setattr(HwAxis, name, wrapper)