# ***************************************************************************** # This program is free software; you can redistribute it and/or modify it under # the terms of the GNU General Public License as published by the Free Software # Foundation; either version 2 of the License, or (at your option) any later # version. # # This program is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS # FOR A PARTICULAR PURPOSE. See the GNU General Public License for more # details. # # You should have received a copy of the GNU General Public License along with # this program; if not, write to the Free Software Foundation, Inc., # 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA # # Module authors: # Markus Zolliker # ***************************************************************************** import sys import time from frappy.core import Drivable, Parameter, Command, Property, \ ERROR, WARN, BUSY, IDLE, Done, nopoll, Limit # from frappy.features import HasSimpleOffset from frappy.datatypes import IntRange, FloatRange, StringType, BoolType from frappy.errors import ConfigError, BadValueError sys.path.append('/home/l_samenv/Documents/anc350/Linux64/userlib/lib') from PyANC350v4 import Positioner DIRECTION_NAME = {1: 'forward', -1: 'backward'} class Axis(Drivable): axis = Property('axis number', IntRange(0, 2), 0) value = Parameter('axis position', FloatRange(unit='deg')) frequency = Parameter('frequency', FloatRange(1, unit='Hz'), readonly=False) amplitude = Parameter('amplitude', FloatRange(0, unit='V'), readonly=False) gear = Parameter('gear factor', FloatRange(), readonly=False, value=1) tolerance = Parameter('positioning tolerance', FloatRange(0, unit='$'), readonly=False, default=0.01) output = Parameter('enable output', BoolType(), readonly=False) info = Parameter('axis info', StringType()) statusbits = Parameter('status bits', StringType()) target_min = Limit() target_max = Limit() _hw = Positioner() _scale = 1 # scale for custom units _move_steps = 0 # number of steps to move (used by move command) SCALES = {'deg': 1, 'm': 1, 'mm': 1000, 'um': 1000000, 'µm': 1000000} _direction = 1 # move direction _idle_status = IDLE, '' _error_state = '' # empty string: no error _history = None _check_sensor = False _try_count = 0 __freeze_until = 0 def write_gear(self, value): self._scale = self.SCALES[self.parameters['value'].datatype.unit] * self.gear return value def startModule(self, start_events): super().startModule(start_events) start_events.queue(self.read_info) def check_value(self, value): """check if value allows moving in current direction""" if self._direction > 0: if value > self.target_limits[1]: raise BadValueError('above upper limit') elif value < self.target_limits[0]: raise BadValueError('below lower limit') def read_value(self): pos = self._hw.getPosition(self.axis) * self._scale if self.isBusy(): try: self.check_value(pos) except BadValueError as e: self._stop() self._idle_status = ERROR, str(e) return pos def read_frequency(self): return self._hw.getFrequency(self.axis) def write_frequency(self, value): self._hw.setFrequency(self.axis, value) return self._hw.getFrequency(self.axis) def read_amplitude(self): return self._hw.getAmplitude(self.axis) def write_amplitude(self, value): self._hw.setAmplitude(self.axis, value) return self._hw.getAmplitude(self.axis) def write_tolerance(self, value): self._hw.setTargetRange(self.axis, value / self._scale) return value def write_output(self, value): self._hw.setAxisOutput(self.axis, enable=value, autoDisable=0) return value def freeze_status(self, delay, code=BUSY, text='changed target'): """freeze status to the given value for the given delay""" self.__freeze_until = time.time() + delay self.status = code, text def read_status(self): if time.time() < self.__freeze_until: return self.status self.__freeze_until = 0 statusbits = self._hw.getAxisStatus(self.axis) sensor, self.output, moving, attarget, eot_fwd, eot_bwd, sensor_error = statusbits self.statusbits = ''.join((k for k, v in zip('SOMTFBE', statusbits) if v)) if self._move_steps: if not (eot_fwd or eot_bwd): return BUSY, 'moving by steps' if not sensor: self._error_state = 'no sensor connected' elif sensor_error: self._error_state = 'sensor error' elif eot_fwd: self._error_state = 'end of travel forward' elif eot_bwd: self._error_state = 'end of travel backward' else: if self._error_state and not DIRECTION_NAME[self._direction] in self._error_state: self._error_state = '' status_text = 'moving' if self._try_count == 0 else 'moving (retry %d)' % self._try_count if moving and self._history is not None: # history None: moving by steps self._history.append(self.value) if len(self._history) < 5: return BUSY, status_text beg = self._history.pop(0) if abs(beg - self.target) < self.tolerance: # reset normal tolerance self._stop() self._idle_status = IDLE, 'in tolerance' return self._idle_status # self._hw.setTargetRange(self.axis, self.tolerance / self._scale) if (self.value - beg) * self._direction > 0: return BUSY, status_text self._try_count += 1 if self._try_count < 10: self.log.warn('no progress retry %d', self._try_count) return BUSY, status_text self._idle_status = WARN, 'no progress' if self._error_state: self._try_count += 1 if self._try_count < 10 and self._history is not None: self.log.warn('end of travel retry %d', self._try_count) self.write_target(self.target) return Done self._idle_status = WARN, self._error_state if self.status[0] != IDLE: self._stop() return self._idle_status def write_target(self, value): if value == self.read_value(): return value # self.check_limits(value) self._try_count = 0 self._direction = 1 if value > self.value else -1 # if self._error_state and DIRECTION_NAME[-self._direction] not in self._error_state: # raise BadValueError('can not move (%s)' % self._error_state) self._move_steps = 0 self.write_output(1) # try first with 50 % of tolerance self._hw.setTargetRange(self.axis, self.tolerance * 0.5 / self._scale) for itry in range(5): try: self._hw.setTargetPosition(self.axis, value / self._scale) self._hw.startAutoMove(self.axis, enable=1, relative=0) except Exception as e: if itry == 4: raise self.log.warn('%r', e) self._history = [self.value] self._idle_status = IDLE, '' self.freeze_status(1) self.setFastPoll(True, 1) return value def doPoll(self): if self._move_steps == 0: super().doPoll() return self._hw.startSingleStep(self.axis, self._direction < 0) self._move_steps -= self._direction if self._move_steps % int(self.frequency) == 0: # poll value and status every second super().doPoll() @nopoll def read_info(self): """read info from controller""" cap = self._hw.measureCapacitance(self.axis) * 1e9 axistype = ['linear', 'gonio', 'rotator'][self._hw.getActuatorType(self.axis)] return '%s %s %.3gnF' % (self._hw.getActuatorName(self.axis), axistype, cap) def _stop(self): self._move_steps = 0 self._history = None for _ in range(5): try: self._hw.startAutoMove(self.axis, enable=0, relative=0) break except Exception as e: if itry == 4: raise self.log.warn('%r', e) self._hw.setTargetRange(self.axis, self.tolerance / self._scale) self.setFastPoll(False) @Command() def stop(self): self._idle_status = IDLE, 'stopped' if self.isBusy() else '' self._stop() self.status = self._idle_status @Command(IntRange()) def move(self, value): """relative move by number of steps""" self._direction = 1 if value > 0 else -1 self.check_value(self.value) self._history = None if DIRECTION_NAME[self._direction] in self._error_state: raise BadValueError('can not move (%s)' % self._error_state) self._move_steps = value self._idle_status = IDLE, '' self.read_status() self.setFastPoll(True, 1/self.frequency)