add attocube

This commit is contained in:
l_samenv 2022-12-02 08:54:26 +01:00
parent 00b0a54f12
commit e4c1f35e4a

243
secop_psi/attocube.py Normal file
View File

@ -0,0 +1,243 @@
# *****************************************************************************
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Module authors:
# Markus Zolliker <markus.zolliker@psi.ch>
# *****************************************************************************
import sys
import time
from secop.core import Drivable, Parameter, Command, Property, ERROR, BUSY, IDLE, Done, nopoll
from secop.features import HasLimits
from secop.datatypes import IntRange, FloatRange, StringType, BoolType
from secop.errors import ConfigError, BadValueError
sys.path.append('/home/l_samenv/Documents/anc350/Linux64/userlib/lib')
from PyANC350v4 import Positioner
DIRECTION_NAME = {1: 'forward', -1: 'backward'}
class FreezeStatus:
"""freeze status for some time
hardware quite often does not treat status correctly: on a target change it
may take some time to return the 'busy' status correctly.
in classes with this mixin, within :meth:`write_target` call
self.freeze_status(0.5, BUSY, 'changed target')
a wrapper around read_status will take care that the status will be the given value,
for at least the given delay. This does NOT cover the case when self.status is set
directly from an other method.
"""
__freeze_status_until = 0
def __init_subclass__(cls):
def wrapped(self, inner=cls.read_status):
if time.time() < self.__freeze_status_until:
return Done
return inner(self)
cls.read_status = wrapped
super().__init_subclass__()
def freeze_status(self, delay, code=BUSY, text='changed target'):
"""freezze status to the given value for the given delay"""
self.__freeze_status_until = time.time() + delay
self.status = code, text
class Axis(HasLimits, FreezeStatus, Drivable):
axis = Property('axis number', IntRange(0, 2), 0)
value = Parameter('axis position', FloatRange(unit='deg'))
frequency = Parameter('frequency', FloatRange(1, unit='Hz'), readonly=False)
amplitude = Parameter('amplitude', FloatRange(0, unit='V'), readonly=False)
gear = Parameter('gear factor', FloatRange(), readonly=False, default=1, initwrite=True)
tolerance = Parameter('positioning tolerance', FloatRange(0, unit='$'), readonly=False, default=0.01)
output = Parameter('enable output', BoolType(), readonly=False)
info = Parameter('axis info', StringType())
statusbits = Parameter('status bits', StringType())
_hw = Positioner()
_scale = 1 # scale for custom units
_move_steps = 0 # number of steps to move (used by move command)
SCALES = {'deg': 1, 'm': 1, 'mm': 1000, 'um': 1000000, 'µm': 1000000}
_direction = 1 # move direction
_idle_status = IDLE, ''
_error_state = '' # empty string: no error
_history = None
_check_sensor = False
_try_count = 0
def __init__(self, name, logger, opts, srv):
unit = opts.pop('unit', 'deg')
opts['value.unit'] = unit
try:
self._scale = self.SCALES[unit] * opts.get('gear', 1)
except KeyError as e:
print(repr(e))
raise ConfigError('unsupported unit: %s' % unit)
super().__init__(name, logger, opts, srv)
def write_gear(self, value):
self._scale = self.SCALES[self.parameters['value'].datatype.unit] * self.gear
return value
def startModule(self, start_events):
super().startModule(start_events)
start_events.queue(self.read_info)
def check_value(self, value):
"""check if value allows moving in current driection"""
if self._direction > 0:
if value > self.target_limits[1]:
raise BadValueError('above upper limit')
elif value < self.target_limits[0]:
raise BadValueError('below lower limit')
def read_value(self):
pos = self._hw.getPosition(self.axis) * self._scale
if self.isBusy():
try:
self.check_value(pos)
except BadValueError as e:
self._stop()
self._idle_status = ERROR, str(e)
return pos
def read_frequency(self):
return self._hw.getFrequency(self.axis)
def write_frequency(self, value):
self._hw.setFrequency(self.axis, value)
return self._hw.getFrequency(self.axis)
def read_amplitude(self):
return self._hw.getAmplitude(self.axis)
def write_amplitude(self, value):
self._hw.setAmplitude(self.axis, value)
return self._hw.getAmplitude(self.axis)
def write_tolerance(self, value):
self._hw.setTargetRange(self.axis, value / self._scale)
return value
def write_output(self, value):
self._hw.setAxisOutput(self.axis, enable=value, autoDisable=0)
def read_status(self):
statusbits = self._hw.getAxisStatus(self.axis)
sensor, self.output, moving, attarget, eot_fwd, eot_bwd, sensor_error = statusbits
self.statusbits = ''.join((k for k, v in zip('SOMTFBE', statusbits) if v))
if self._move_steps:
if not (eot_fwd or eot_bwd):
return BUSY, 'moving by steps'
if not sensor:
self._error_state = 'no sensor connected'
elif sensor_error:
self._error_state = 'sensor error'
elif eot_fwd:
self._error_state = 'end of travel forward'
elif eot_bwd:
self._error_state = 'end of travel backward'
else:
if self._error_state and not DIRECTION_NAME[self._direction] in self._error_state:
self._error_state = ''
status_text = 'moving' if self._try_count == 0 else 'moving (retry %d)' % self._try_count
if moving and self._history is not None: # history None: moving by steps
self._history.append(self.value)
if len(self._history) < 5:
return BUSY, status_text
beg = self._history.pop(0)
if abs(beg - self.target) < self.tolerance:
# reset normal tolerance
self._hw.setTargetRange(self.axis, self.tolerance / self._scale)
if (self.value - beg) * self._direction > 0:
return BUSY, status_text
self._idle_status = ERROR, 'no progress'
if self._error_state:
self._try_count += 1
if self._try_count < 10 and self._history is not None:
self.write_target(self.target)
return Done
self._idle_status = ERROR, self._error_state
if self.status[0] != IDLE:
self._stop()
return self._idle_status
def write_target(self, value):
if value == self.read_value():
return value
self.check_limits(value)
self._try_count = 0
self._direction = 1 if value > self.value else -1
# if self._error_state and DIRECTION_NAME[-self._direction] not in self._error_state:
# raise BadValueError('can not move (%s)' % self._error_state)
self._move_steps = 0
self.write_output(1)
# try first with 50 % of tolerance
self._hw.setTargetRange(self.axis, self.tolerance * 0.5 / self._scale)
self._hw.setTargetPosition(self.axis, value / self._scale)
self._hw.startAutoMove(self.axis, enable=1, relative=0)
self._history = [self.value]
self._idle_status = IDLE, ''
self.freeze_status(1, BUSY, 'changed target')
self.setFastPoll(True, 1)
return value
def doPoll(self):
if self._move_steps == 0:
super().doPoll()
return
self._hw.startSingleStep(self.axis, self._direction < 0)
self._move_steps -= self._direction
if self._move_steps % int(self.frequency) == 0: # poll value and status every second
super().doPoll()
@nopoll
def read_info(self):
"""read info from controller"""
cap = self._hw.measureCapacitance(self.axis) * 1e9
axistype = ['linear', 'gonio', 'rotator'][self._hw.getActuatorType(self.axis)]
return '%s %s %.3gnF' % (self._hw.getActuatorName(self.axis), axistype, cap)
def _stop(self):
self._move_steps = 0
self._history = None
self._hw.startAutoMove(self.axis, enable=0, relative=0)
self._hw.setTargetRange(self.axis, self.tolerance / self._scale)
self.setFastPoll(False)
@Command()
def stop(self):
self._idle_status = IDLE, 'stopped' if self.isBusy() else ''
self._stop()
self.status = self._idle_status
@Command(IntRange())
def move(self, value):
"""relative move by number of steps"""
self._direction = 1 if value > 0 else -1
self.check_value(self.value)
self._history = None
if DIRECTION_NAME[self._direction] in self._error_state:
raise BadValueError('can not move (%s)' % self._error_state)
self._move_steps = value
self._idle_status = IDLE, ''
self.read_status()
self.setFastPoll(True, 1/self.frequency)