# ***************************************************************************** # This program is free software; you can redistribute it and/or modify it under # the terms of the GNU General Public License as published by the Free Software # Foundation; either version 2 of the License, or (at your option) any later # version. # # This program is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS # FOR A PARTICULAR PURPOSE. See the GNU General Public License for more # details. # # You should have received a copy of the GNU General Public License along with # this program; if not, write to the Free Software Foundation, Inc., # 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA # # Module authors: # Markus Zolliker # ***************************************************************************** import sys import time import threading from frappy.core import Drivable, Parameter, Command, Property, \ ERROR, WARN, BUSY, IDLE, Done, nopoll, Limit # from frappy.features import HasSimpleOffset from frappy.datatypes import IntRange, FloatRange, StringType, BoolType from frappy.errors import ConfigError, BadValueError sys.path.append('/home/l_samenv/Documents/anc350/Linux64/userlib/lib') from PyANC350v4 import Positioner DIRECTION_NAME = {1: 'forward', -1: 'backward', 0: 'idle'} class DriveInfo: def __init__(self, value, target, status=(BUSY, 'changed target')): self.pos = value self.direction = -1 if target < value else 1 self.target = target self.status = status self.thread = None self.statusbits = '' self.output = False self.sensor_connected = False class Axis(Drivable): axis = Property('axis number', IntRange(0, 2), 0) value = Parameter('axis position', FloatRange(unit='deg')) frequency = Parameter('frequency', FloatRange(1, unit='Hz'), readonly=False) amplitude = Parameter('amplitude', FloatRange(0, unit='V'), readonly=False) gear = Parameter('gear factor', FloatRange(), readonly=False, value=1) tolerance = Parameter('positioning tolerance', FloatRange(0, unit='$'), readonly=False, default=0.01) output = Parameter('enable output', BoolType(), readonly=False) info = Parameter('axis info', StringType()) statusbits = Parameter('status bits', StringType()) maxtry = Parameter('max. number of move tries', IntRange(), default=3, readonly=False) settling_time = Property('time to be spent wihtin tolerance', FloatRange(0), default=0.25) fast_interval = Property('waiting time with in internal drive polls', FloatRange(0), default=0.001) target_min = Limit() target_max = Limit() _bad_status_max = 0.05 _settling_time = 0.1 _hw = None _scale = 1 # scale for custom units _move_steps = 0 # number of steps to move (used by move command) SCALES = {'deg': 1, 'm': 1, 'mm': 1000, 'um': 1000000, 'µm': 1000000} _moving = None # None when not moving, else status text _idle_status = IDLE, '' _error_state = '' # empty string: no error # _history = None _check_sensor = False _drive_info = DriveInfo(0, 0, (IDLE, '')) STATUSBIT_NAMES = ['sensor', 'output', 'moving', 'at_target', 'fwd_stuck', 'bwd_stuck', 'error'] def initModule(self): super().initModule() # TODO: catch timeout self._hw = Positioner() def write_gear(self, value): self._scale = self.SCALES[self.parameters['value'].datatype.unit] * self.gear return value def startModule(self, start_events): # self._history = [0] super().startModule(start_events) start_events.queue(self.read_info) def _get_state(self, info): """get axis status - update _drive_info.output ans _drive_info.sensor_connected - return , , is True whn moving is True when in error is an error text, when in error, 'at target' or '' otherwise """ statusbits = self._hw.getAxisStatus(self.axis) info.sensor_connected, info.output, moving, at_target, fwd_stuck, bwd_stuck, error = statusbits info.statusbits = statusbits if error: return False, True, 'other error' if bwd_stuck: return False, True, 'end of travel backward' if bwd_stuck: return False, True, 'end of travel forward' if at_target: return False, False, 'at target' if moving: #if not info.output: # return True, True, 'output off' return True, False, '' return False, False, '' def _drive_thread(self, info): try: for ntry in range(self.maxtry, 0, -1): self._hw.setAxisOutput(self.axis, enable=True, autoDisable=0) self._hw.setTargetRange(self.axis, 0 / self._scale) self._hw.setTargetPosition(self.axis, info.target / self._scale) self._hw.startAutoMove(self.axis, enable=1, relative=0) inside_since = 0 saved_inside_time = 0 last_inside = 0 start_time = time.time() status = None while info.thread: time.sleep(self.fast_interval) moving, in_error, reason = self._get_state(info) if time.time() > start_time + 0.05: if reason: if in_error: status = ERROR, reason else: status = IDLE, reason break if not moving: status = WARN, 'stopped by unknown reason' break info.pos = self._hw.getPosition(self.axis) * self._scale self.check_value(info.pos) if abs(info.pos - info.target) < self.tolerance: last_inside = time.time() if not inside_since: inside_since = last_inside - saved_inside_time if last_inside > inside_since + self._settling_time: info.status = IDLE, 'in tolerance' self.doPoll() return else: info.status = BUSY, 'driving' if inside_since: saved_inside_time = last_inside - inside_since inside_since = 0 if ntry == 1: info.status = status else: info.status = BUSY, f'retry after {status[1]} ({ntry})' self.doPoll() except Exception as e: info.status = ERROR, repr(e) finally: info.thread = None def check_value(self, value): """check if value allows moving in current direction""" if self._drive_info.direction > 0: if value > self.target_max: raise BadValueError('above upper limit') elif value < self.target_min: raise BadValueError('below lower limit') def read_value(self): drv = self._drive_info if drv.thread: return drv.pos return self._hw.getPosition(self.axis) * self._scale def read_frequency(self): return self._hw.getFrequency(self.axis) def write_frequency(self, value): self._hw.setFrequency(self.axis, value) return self._hw.getFrequency(self.axis) def read_amplitude(self): return self._hw.getAmplitude(self.axis) def write_amplitude(self, value): self._hw.setAmplitude(self.axis, value) return self._hw.getAmplitude(self.axis) #def write_tolerance(self, value): # self._hw.setTargetRange(self.axis, value / self._scale) # return value def write_output(self, value): self._hw.setAxisOutput(self.axis, enable=value, autoDisable=0) return value def read_status(self): drv = self._drive_info if not drv.thread: self._get_state(drv) self.setFastPoll(False) self.statusbits = ''.join(k for k, v in zip('SOMTFBE', drv.statusbits) if v) return drv.status def _start_motor(self, target): self._stop_thread() self._drive_info = DriveInfo(self.value, target) self._drive_info.thread = threading.Thread(target=self._drive_thread, args=(self._drive_info,)) self._drive_info.thread.start() def _stop_thread(self): try: drv = self._drive_info drv.thread.join() except AttributeError: pass def _stop(self): self._move_steps = 0 self.setFastPoll(False) self._stop_thread() self.read_status() def write_target(self, target): # self.check_limits(value) # self._try_count = 0 self._move_steps = 0 self._start_motor(target) # self._history = [self.value] self.setFastPoll(True, 0.25) return target @nopoll def read_info(self): """read info from controller""" cap = self._hw.measureCapacitance(self.axis) * 1e9 axistype = ['linear', 'gonio', 'rotator'][self._hw.getActuatorType(self.axis)] return '%s %s %.3gnF' % (self._hw.getActuatorName(self.axis), axistype, cap) @Command() def stop(self): self._idle_status = IDLE, 'stopped' if self.isBusy() else '' self._stop() self.status = self._idle_status @Command(IntRange()) def move(self, value): """relative move by number of steps""" self._direction = 1 if value > 0 else -1 self.check_value(self.value) if DIRECTION_NAME[self._direction] in self._error_state: raise BadValueError('can not move (%s)' % self._error_state) self._move_steps = value self._idle_status = IDLE, '' self.read_status() self.setFastPoll(True, 1/self.frequency) @Command(IntRange()) def automove(self, flag): """switch automove and output""" self._hw.setAxisOutput(self.axis, enable=flag, autoDisable=0) self._hw.startAutoMove(self.axis, enable=flag, relative=0)