# ***************************************************************************** # This program is free software; you can redistribute it and/or modify it under # the terms of the GNU General Public License as published by the Free Software # Foundation; either version 2 of the License, or (at your option) any later # version. # # This program is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS # FOR A PARTICULAR PURPOSE. See the GNU General Public License for more # details. # # You should have received a copy of the GNU General Public License along with # this program; if not, write to the Free Software Foundation, Inc., # 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA # # Module authors: # Markus Zolliker # ***************************************************************************** import sys import time import threading from frappy.core import Drivable, Parameter, Command, Property, \ ERROR, WARN, BUSY, IDLE, nopoll, Limit, TupleOf # from frappy.features import HasSimpleOffset from frappy.datatypes import IntRange, FloatRange, StringType, BoolType from frappy.errors import BadValueError sys.path.append('/home/l_samenv/Documents/anc350/Linux64/userlib/lib') from PyANC350v4 import Positioner class Stopped(RuntimeError): """thread was stopped""" class DriveInfo: def __init__(self, value, target, status=(BUSY, 'changed target')): self.pos = value self.direction = -1 if target < value else 1 self.target = target self.status = status self.thread = None self.statusbits = '' self.output = False self.sensor_connected = False class Axis(Drivable): axis = Property('axis number', IntRange(0, 2), 0) value = Parameter('axis position', FloatRange(unit='deg')) frequency = Parameter('frequency', FloatRange(1, unit='Hz'), readonly=False) amplitude = Parameter('amplitude', FloatRange(0, unit='V'), readonly=False) gear = Parameter('gear factor', FloatRange(), readonly=False, value=1) tolerance = Parameter('positioning tolerance', FloatRange(0, unit='$'), readonly=False, default=0.01) output = Parameter('enable output', BoolType(), readonly=False) info = Parameter('axis info', StringType()) statusbits = Parameter('status bits', StringType()) step_mode = Parameter('step mode (soft closed loop)', BoolType(), default=False, readonly=False, group='step_mode') maxtry = Parameter('max. number of move tries', IntRange(), default=5, readonly=False, group='step_mode') steps_fwd = Parameter('forward steps / main unit', FloatRange(0), default=0, readonly=False, group='step_mode') steps_bwd = Parameter('backward steps / main unit', FloatRange(0), default=0, readonly=False, group='step_mode') delay = Parameter('delay between tries within loop', FloatRange(0), readonly=False, default=0.05, group='step_mode') prop = Parameter('factor for control loop', FloatRange(0, 1), readonly=False, default=0.8, group='step_mode') target_min = Limit() target_max = Limit() fast_interval = 0.25 _hw = None _scale = 1 # scale for custom units SCALES = {'deg': 1, 'm': 1, 'mm': 1000, 'um': 1000000, 'µm': 1000000} _thread = None _moving_since = 0 _output = False _sensor_connected = False _status = IDLE, '' _statusbits = None def initModule(self): super().initModule() self._stopped = threading.Event() # TODO: catch timeout self._hw = Positioner() def write_gear(self, value): self._scale = self.SCALES[self.parameters['value'].datatype.unit] * self.gear return value def initialReads(self): self.read_info() super().initialReads() def _get_status(self): """get axis status - update _output and _sensor_connected - return , , is True whn moving is True when in error is an error text, when in error, 'at target' or '' otherwise """ statusbits = self._hw.getAxisStatus(self.axis) self._sensor_connected, self._output, moving, at_target, fwd_stuck, bwd_stuck, error = statusbits self._statusbits = statusbits if error: return ERROR, 'other error' if bwd_stuck: return ERROR, 'end of travel backward' if bwd_stuck: return ERROR, 'end of travel forward' if self._moving_since: if time.time() < self._moving_since + 1: return BUSY, 'started' if at_target: self.setFastPoll(False) self._moving_since = 0 return IDLE, 'at target' if moving and self._output: return BUSY, 'moving' if not self._output: return WARN, 'switched output off by unknown reason' return WARN, 'switched moving off by unknown reason' return IDLE, '' def check_value(self, value, direction): """check if value allows moving in current direction""" if direction > 0: if value > self.target_max: raise BadValueError('above upper limit') elif value < self.target_min: raise BadValueError('below lower limit') def read_status(self): status = self._get_status() self.statusbits = ''.join(k for k, v in zip('SOMTFBE', self._statusbits) if v) if self.step_mode: return self._status return status def _wait(self, delay): if self._stopped.wait(delay): raise Stopped() def _read_pos(self): poslist = [] for i in range(9): if i: self._wait(0.001) poslist.append(self._hw.getPosition(self.axis) * self._scale) self._poslist = sorted(poslist) return self._poslist[len(poslist) // 2] # median def read_value(self): if self._thread: return self.value return self._read_pos() def read_frequency(self): return self._hw.getFrequency(self.axis) def write_frequency(self, value): self._hw.setFrequency(self.axis, value) return self._hw.getFrequency(self.axis) def read_amplitude(self): return self._hw.getAmplitude(self.axis) def write_amplitude(self, value): self._hw.setAmplitude(self.axis, value) return self._hw.getAmplitude(self.axis) def write_output(self, value): if self._thread: if not value: self.stop() else: self._hw.setAxisOutput(self.axis, enable=value, autoDisable=0) self._output = value return value def read_output(self): return self._output def _stop_thread(self): if self._thread: self._thread.join() def _run_drive(self, target): self.value = self._read_pos() self.status = self._status = BUSY, 'drive by steps' cnt = self.maxtry prev = 0 tol = self.tolerance while True: dif = target - self.value if target > self._poslist[2] + tol: # 78th percentile steps = max(1, min(dif, (dif + tol) * self.prop) * self.steps_fwd) elif target < self._poslist[-3] - tol: # 22th percentile steps = min(-1, max(dif, (dif - tol) * self.prop) * self.steps_bwd) else: self._status = IDLE, 'in tolerance' self.read_status() return if cnt <= 0: self._status = ERROR, 'too many tries' self.read_status() return if abs(steps) > prev * 0.7: cnt -= 1 prev = abs(steps) self._move_steps(steps) def _thread_wrapper(self, func, *args): try: func(*args) except Stopped as e: self._status = IDLE, str(e) except Exception as e: self._status = ERROR, f'{type(e).__name__} - {e}' finally: self._hw.setAxisOutput(self.axis, enable=0, autoDisable=0) self.setFastPoll(False) self._thread = None def _start_thread(self, *args): thread = threading.Thread(target=self._thread_wrapper, args=args) self._thread = thread thread.start() def write_target(self, target): self._stop_thread() self._hw.setTargetRange(self.axis, self.tolerance / self._scale) if self.step_mode: self.status = BUSY, 'changed target' self._start_thread(self._run_drive, target) else: self.setFastPoll(True, 0.25) self._hw.setTargetPosition(self.axis, target / self._scale) self._hw.setAxisOutput(self.axis, enable=0, autoDisable=0) self._hw.startAutoMove(self.axis, enable=1, relative=0) self._moving_since = time.time() self.status = self._get_status() return target @Command() def stop(self): self._stopped.set() self._stop_thread() self._status = IDLE, 'stopped' self.read_status() @Command(IntRange()) def move(self, value): """relative move by number of steps""" self.check_value(self.value, value) self._start_thread(self._run_move, value) def _move_steps(self, steps): steps = round(steps) if not steps: return previous = self._read_pos() self._hw.setAxisOutput(self.axis, enable=1, autoDisable=0) # wait for output is really on for i in range(100): self._wait(0.001) self._get_status() if self._output: break else: raise ValueError('can not switch on output') for i in range(abs(steps)): if not self._thread: raise Stopped('stopped') self._hw.startSingleStep(self.axis, steps < 0) self._wait(1 / self.frequency) self._get_status() if not self._output and i: steps = i self._step_size = 0 break self._wait(self.delay) self.value = pos = self._read_pos() if self._output: self._step_size = (pos - previous) / steps def _run_move(self, steps): self.setFastPoll(True, self.fast_interval) self._move_steps(steps) @Command(IntRange(0)) def calib_steps(self, delta): """relative move by number of steps""" self._stop_thread() self._start_thread(self._run_calib, delta) def _run_calib(self, steps): self.target = self.value = self._read_pos() maxfwd = 0 maxbwd = 0 cntfwd = 0 cntbwd = 0 for i in range(10): if self.value < self.target: self._move_steps(steps) self._move_steps(steps) while self.value < self.target: self._move_steps(steps) if self._step_size: maxfwd = max(maxfwd, self._step_size) cntfwd += 1 else: self._move_steps(-steps) self._move_steps(-steps) while self.value > self.target: self._move_steps(-steps) if self._step_size: maxbwd = max(maxbwd, self._step_size) cntbwd += 1 if cntfwd >= 3 and cntbwd >= 3: self.steps_fwd = 1 / maxfwd self.steps_bwd = 1 / maxbwd self._run_drive() self._status = IDLE, 'calib step size done' break else: self._status = WARN, 'calib step size failed' @nopoll def read_info(self): """read info from controller""" cap = self._hw.measureCapacitance(self.axis) * 1e9 axistype = ['linear', 'gonio', 'rotator'][self._hw.getActuatorType(self.axis)] return '%s %s %.3gnF' % (self._hw.getActuatorName(self.axis), axistype, cap)