diff --git a/secop_psi/attocube.py b/secop_psi/attocube.py new file mode 100644 index 0000000..2959827 --- /dev/null +++ b/secop_psi/attocube.py @@ -0,0 +1,243 @@ +# ***************************************************************************** +# This program is free software; you can redistribute it and/or modify it under +# the terms of the GNU General Public License as published by the Free Software +# Foundation; either version 2 of the License, or (at your option) any later +# version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more +# details. +# +# You should have received a copy of the GNU General Public License along with +# this program; if not, write to the Free Software Foundation, Inc., +# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +# +# Module authors: +# Markus Zolliker +# ***************************************************************************** + +import sys +import time +from secop.core import Drivable, Parameter, Command, Property, ERROR, BUSY, IDLE, Done, nopoll +from secop.features import HasLimits +from secop.datatypes import IntRange, FloatRange, StringType, BoolType +from secop.errors import ConfigError, BadValueError +sys.path.append('/home/l_samenv/Documents/anc350/Linux64/userlib/lib') +from PyANC350v4 import Positioner + + +DIRECTION_NAME = {1: 'forward', -1: 'backward'} + + +class FreezeStatus: + """freeze status for some time + + hardware quite often does not treat status correctly: on a target change it + may take some time to return the 'busy' status correctly. + + in classes with this mixin, within :meth:`write_target` call + + self.freeze_status(0.5, BUSY, 'changed target') + + a wrapper around read_status will take care that the status will be the given value, + for at least the given delay. This does NOT cover the case when self.status is set + directly from an other method. + """ + __freeze_status_until = 0 + + def __init_subclass__(cls): + def wrapped(self, inner=cls.read_status): + if time.time() < self.__freeze_status_until: + return Done + return inner(self) + + cls.read_status = wrapped + super().__init_subclass__() + + def freeze_status(self, delay, code=BUSY, text='changed target'): + """freezze status to the given value for the given delay""" + self.__freeze_status_until = time.time() + delay + self.status = code, text + + +class Axis(HasLimits, FreezeStatus, Drivable): + axis = Property('axis number', IntRange(0, 2), 0) + value = Parameter('axis position', FloatRange(unit='deg')) + frequency = Parameter('frequency', FloatRange(1, unit='Hz'), readonly=False) + amplitude = Parameter('amplitude', FloatRange(0, unit='V'), readonly=False) + gear = Parameter('gear factor', FloatRange(), readonly=False, default=1, initwrite=True) + tolerance = Parameter('positioning tolerance', FloatRange(0, unit='$'), readonly=False, default=0.01) + output = Parameter('enable output', BoolType(), readonly=False) + info = Parameter('axis info', StringType()) + statusbits = Parameter('status bits', StringType()) + + _hw = Positioner() + _scale = 1 # scale for custom units + _move_steps = 0 # number of steps to move (used by move command) + SCALES = {'deg': 1, 'm': 1, 'mm': 1000, 'um': 1000000, 'µm': 1000000} + _direction = 1 # move direction + _idle_status = IDLE, '' + _error_state = '' # empty string: no error + _history = None + _check_sensor = False + _try_count = 0 + + def __init__(self, name, logger, opts, srv): + unit = opts.pop('unit', 'deg') + opts['value.unit'] = unit + try: + self._scale = self.SCALES[unit] * opts.get('gear', 1) + except KeyError as e: + print(repr(e)) + raise ConfigError('unsupported unit: %s' % unit) + super().__init__(name, logger, opts, srv) + + def write_gear(self, value): + self._scale = self.SCALES[self.parameters['value'].datatype.unit] * self.gear + return value + + def startModule(self, start_events): + super().startModule(start_events) + start_events.queue(self.read_info) + + def check_value(self, value): + """check if value allows moving in current driection""" + if self._direction > 0: + if value > self.target_limits[1]: + raise BadValueError('above upper limit') + elif value < self.target_limits[0]: + raise BadValueError('below lower limit') + + def read_value(self): + pos = self._hw.getPosition(self.axis) * self._scale + if self.isBusy(): + try: + self.check_value(pos) + except BadValueError as e: + self._stop() + self._idle_status = ERROR, str(e) + return pos + + def read_frequency(self): + return self._hw.getFrequency(self.axis) + + def write_frequency(self, value): + self._hw.setFrequency(self.axis, value) + return self._hw.getFrequency(self.axis) + + def read_amplitude(self): + return self._hw.getAmplitude(self.axis) + + def write_amplitude(self, value): + self._hw.setAmplitude(self.axis, value) + return self._hw.getAmplitude(self.axis) + + def write_tolerance(self, value): + self._hw.setTargetRange(self.axis, value / self._scale) + return value + + def write_output(self, value): + self._hw.setAxisOutput(self.axis, enable=value, autoDisable=0) + + def read_status(self): + statusbits = self._hw.getAxisStatus(self.axis) + sensor, self.output, moving, attarget, eot_fwd, eot_bwd, sensor_error = statusbits + self.statusbits = ''.join((k for k, v in zip('SOMTFBE', statusbits) if v)) + if self._move_steps: + if not (eot_fwd or eot_bwd): + return BUSY, 'moving by steps' + if not sensor: + self._error_state = 'no sensor connected' + elif sensor_error: + self._error_state = 'sensor error' + elif eot_fwd: + self._error_state = 'end of travel forward' + elif eot_bwd: + self._error_state = 'end of travel backward' + else: + if self._error_state and not DIRECTION_NAME[self._direction] in self._error_state: + self._error_state = '' + status_text = 'moving' if self._try_count == 0 else 'moving (retry %d)' % self._try_count + if moving and self._history is not None: # history None: moving by steps + self._history.append(self.value) + if len(self._history) < 5: + return BUSY, status_text + beg = self._history.pop(0) + if abs(beg - self.target) < self.tolerance: + # reset normal tolerance + self._hw.setTargetRange(self.axis, self.tolerance / self._scale) + if (self.value - beg) * self._direction > 0: + return BUSY, status_text + self._idle_status = ERROR, 'no progress' + if self._error_state: + self._try_count += 1 + if self._try_count < 10 and self._history is not None: + self.write_target(self.target) + return Done + self._idle_status = ERROR, self._error_state + if self.status[0] != IDLE: + self._stop() + return self._idle_status + + def write_target(self, value): + if value == self.read_value(): + return value + self.check_limits(value) + self._try_count = 0 + self._direction = 1 if value > self.value else -1 + # if self._error_state and DIRECTION_NAME[-self._direction] not in self._error_state: + # raise BadValueError('can not move (%s)' % self._error_state) + self._move_steps = 0 + self.write_output(1) + # try first with 50 % of tolerance + self._hw.setTargetRange(self.axis, self.tolerance * 0.5 / self._scale) + self._hw.setTargetPosition(self.axis, value / self._scale) + self._hw.startAutoMove(self.axis, enable=1, relative=0) + self._history = [self.value] + self._idle_status = IDLE, '' + self.freeze_status(1, BUSY, 'changed target') + self.setFastPoll(True, 1) + return value + + def doPoll(self): + if self._move_steps == 0: + super().doPoll() + return + self._hw.startSingleStep(self.axis, self._direction < 0) + self._move_steps -= self._direction + if self._move_steps % int(self.frequency) == 0: # poll value and status every second + super().doPoll() + + @nopoll + def read_info(self): + """read info from controller""" + cap = self._hw.measureCapacitance(self.axis) * 1e9 + axistype = ['linear', 'gonio', 'rotator'][self._hw.getActuatorType(self.axis)] + return '%s %s %.3gnF' % (self._hw.getActuatorName(self.axis), axistype, cap) + + def _stop(self): + self._move_steps = 0 + self._history = None + self._hw.startAutoMove(self.axis, enable=0, relative=0) + self._hw.setTargetRange(self.axis, self.tolerance / self._scale) + self.setFastPoll(False) + + @Command() + def stop(self): + self._idle_status = IDLE, 'stopped' if self.isBusy() else '' + self._stop() + self.status = self._idle_status + + @Command(IntRange()) + def move(self, value): + """relative move by number of steps""" + self._direction = 1 if value > 0 else -1 + self.check_value(self.value) + self._history = None + if DIRECTION_NAME[self._direction] in self._error_state: + raise BadValueError('can not move (%s)' % self._error_state) + self._move_steps = value + self._idle_status = IDLE, '' + self.read_status() + self.setFastPoll(True, 1/self.frequency)