# ***************************************************************************** # This program is free software; you can redistribute it and/or modify it under # the terms of the GNU General Public License as published by the Free Software # Foundation; either version 2 of the License, or (at your option) any later # version. # # This program is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS # FOR A PARTICULAR PURPOSE. See the GNU General Public License for more # details. # # You should have received a copy of the GNU General Public License along with # this program; if not, write to the Free Software Foundation, Inc., # 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA # # Module authors: # Markus Zolliker # ***************************************************************************** import time import threading from frappy.core import Drivable, Parameter, Command, Property, \ ERROR, WARN, BUSY, IDLE, nopoll, Limit from frappy.datatypes import IntRange, FloatRange, StringType, BoolType from frappy.errors import BadValueError, HardwareError from frappy_psi.pyanc350 import HwAxis class Stopped(RuntimeError): """thread was stopped""" class Axis(Drivable): axis = Property('axis number', IntRange(0, 2), 0) value = Parameter('axis position', FloatRange(unit='deg')) frequency = Parameter('frequency', FloatRange(1, unit='Hz'), readonly=False) amplitude = Parameter('amplitude', FloatRange(0, unit='V'), readonly=False) gear = Parameter('gear factor', FloatRange(), readonly=False, value=1) tolerance = Parameter('positioning tolerance', FloatRange(0, unit='$'), readonly=False, default=0.01) sensor_connected = Parameter('a sensor is connected', BoolType()) info = Parameter('axis info', StringType()) statusbits = Parameter('status bits', StringType()) step_mode = Parameter('step mode (soft closed loop)', BoolType(), default=False, readonly=False, group='step_mode') timeout = Parameter('timeout after no progress detected', FloatRange(0), default=1, readonly=False, group='step_mode') steps_fwd = Parameter('forward steps / main unit', FloatRange(0), unit='$/s', default=0, readonly=False, group='step_mode') steps_bwd = Parameter('backward steps / main unit', FloatRange(0, unit='$/s'), default=0, readonly=False, group='step_mode') delay = Parameter('delay between tries within loop', FloatRange(0, unit='s'), readonly=False, default=0.05, group='step_mode') maxstep = Parameter('max. step duration', FloatRange(0, unit='s'), default=0.25, readonly=False, group='step_mode') prop = Parameter('factor for control loop', FloatRange(0, 1), readonly=False, default=0.8, group='step_mode') target_min = Limit() target_max = Limit() fast_interval = 0.25 _hw = None _scale = 1 # scale for custom units SCALES = {'deg': 1, 'm': 1, 'mm': 1000, 'um': 1000000, 'µm': 1000000} _thread = None _moving_since = 0 _status = IDLE, '' _calib_range = None _try_cnt = 0 _at_target = False def initModule(self): super().initModule() self._stopped = threading.Event() self._hw = HwAxis(self.axis) def initialReads(self): self.read_info() super().initialReads() def shutdownModule(self): self._hw.close() def read_value(self): if self._thread: return self.value try: return self._read_pos() except Stopped: return self.value def write_gear(self, value): self._scale = self.SCALES[self.parameters['value'].datatype.unit] * self.gear return value def read_frequency(self): return self._hw.getFrequency() def write_frequency(self, value): self._hw.setFrequency(value) return self._hw.getFrequency() def read_amplitude(self): return self._hw.getAmplitude() def write_amplitude(self, value): self._hw.setAmplitude(value) return self._hw.getAmplitude() def read_statusbits(self): self._get_status() return self.statusbits def _get_status(self): """get axis status - update self.sensor_connected and self.statusbits - return , , is True whn moving is True when in error is an error text, when in error, 'at target' or '' otherwise """ statusbits = self._hw.getAxisStatus() self.sensor_connected, self._output, moving, at_target, fwd_stuck, bwd_stuck, error = statusbits self.statusbits = ''.join(k for k, v in zip('OTFBE', (self._output,) + statusbits[3:]) if v) if error: return ERROR, 'other error' if bwd_stuck: return ERROR, 'end of travel backward' if fwd_stuck: return ERROR, 'end of travel forward' target_reached = at_target > self._at_target self._at_target = at_target if self._moving_since: if target_reached: return IDLE, 'at target' if time.time() < self._moving_since + 0.25: return BUSY, 'started' if at_target: return IDLE, 'at target' if moving and self._output: return BUSY, 'moving' return WARN, 'stopped by unknown reason' if self._moving_since is False: return IDLE, 'stopped' if not self.step_mode and at_target: return IDLE, 'at target' return IDLE, '' def read_status(self): status = self._get_status() if self.step_mode: return self._status if self._moving_since: if status[0] != BUSY: self._moving_since = 0 self.setFastPoll(False) return status def _wait(self, delay): if self._stopped.wait(delay): raise Stopped() def _read_pos(self): if not self.sensor_connected: return 0 poslist = [] for i in range(9): if i: self._wait(0.001) poslist.append(self._hw.getPosition() * self._scale) self._poslist = sorted(poslist) return self._poslist[len(poslist) // 2] # median def _run_drive(self, target): self.value = self._read_pos() self.status = self._status = BUSY, 'drive by steps' deadline = time.time() + self.timeout max_steps = self.maxstep * self.frequency while True: for _ in range(2): dif = target - self.value steps_per_unit = self.steps_bwd if dif < 0 else self.steps_fwd tol = max(self.tolerance, 0.6 / steps_per_unit) # avoid a tolerance less than 60% of a step if abs(dif) > tol * 3: break # extra wait time when already close self._wait(2 * self.delay) self.read_value() status = None if abs(dif) < tol: status = IDLE, 'in tolerance' elif self._poslist[2] <= target <= self._poslist[-3]: # target within noise status = IDLE, 'within noise' elif dif > 0: steps = min(max_steps, min(dif, (dif + tol) * self.prop) * steps_per_unit) else: steps = max(-max_steps, max(dif, (dif - tol) * self.prop) * steps_per_unit) if status or steps == 0: self._status = status break if round(steps) == 0: # this should not happen self._status = WARN, 'steps=0' break self._move_steps(steps) if self._step_size > self.prop * 0.25 / steps_per_unit: # some progress happened deadline = time.time() + self.timeout elif time.time() > deadline: self._status = WARN, 'timeout - no progress' break self.read_status() def _thread_wrapper(self, func, *args): try: func(*args) except Stopped as e: self._status = IDLE, str(e) except Exception as e: self._status = ERROR, f'{type(e).__name__} - {e}' finally: self._hw.setAxisOutput(enable=0, autoDisable=0) self.setFastPoll(False) self._stopped.clear() self._thread = None def _stop_thread(self): if self._thread: self._stopped.set() self._thread.join() def _start_thread(self, *args): self._stop_thread() thread = threading.Thread(target=self._thread_wrapper, args=args) self._thread = thread thread.start() def write_target(self, target): if not self.sensor_connected: raise HardwareError('no sensor connected') self._stop_thread() self._hw.setTargetRange(self.tolerance / self._scale) if self.step_mode: self.status = BUSY, 'changed target' self._start_thread(self._run_drive, target) else: self._try_cnt = 0 self.setFastPoll(True, self.fast_interval) self._hw.setTargetPosition(target / self._scale) self._hw.setAxisOutput(enable=1, autoDisable=0) self._hw.startAutoMove(enable=1, relative=0) self._moving_since = time.time() self.status = self._get_status() return target @Command() def stop(self): if self.step_mode: self._stop_thread() self._status = IDLE, 'stopped' elif self._moving_since: self._moving_since = False # indicate stop self.read_status() @Command(IntRange()) def move(self, steps): """relative move by number of steps""" self._stop_thread() self.read_value() if steps > 0: if self.value > self.target_max: raise BadValueError('above upper limit') elif self.value < self.target_min: raise BadValueError('below lower limit') self.status = self._status = BUSY, 'moving relative' self._start_thread(self._run_move, steps) def _run_move(self, steps): self.setFastPoll(True, self.fast_interval) self._move_steps(steps) self.status = self._status = IDLE, '' def _move_steps(self, steps): steps = round(steps) if not steps: return previous = self._read_pos() self._hw.setAxisOutput(enable=1, autoDisable=0) # wait for output is really on for i in range(100): self._wait(0.001) self._get_status() if self._output: break else: raise ValueError('can not switch on output') for cnt in range(abs(steps)): self._hw.setAxisOutput(enable=1, autoDisable=0) if not self._thread: raise Stopped('stopped') self._hw.startSingleStep(steps < 0) self._wait(1 / self.frequency) self._get_status() if cnt and not self._output: steps = cnt break self._wait(self.delay) self.value = self._read_pos() self._step_size = (self.value - previous) / steps @Command(IntRange(0)) def calib_steps(self, delta): """calibrate steps_fwd and steps_bwd using steps forwards and backwards""" if not self.sensor_connected: raise HardwareError('no sensor connected') self._stop_thread() self._status = BUSY, 'calibrate step size' self.read_status() self._start_thread(self._run_calib, delta) def _run_calib(self, steps): self.value = self._read_pos() if self._calib_range is None or abs(self.target - self.value) > self._calib_range: self.target = self.value maxfwd = 0 maxbwd = 0 cntfwd = 0 cntbwd = 0 self._calib_range = 0 for i in range(10): if self.value <= self.target: self._status = BUSY, 'move forwards' self.read_status() self._move_steps(steps) while True: self._move_steps(steps) if self._step_size and self._output: maxfwd = max(maxfwd, self._step_size) cntfwd += 1 if self.value > self.target: break else: self._status = BUSY, 'move backwards' self.read_status() self._move_steps(-steps) while True: self._move_steps(-steps) if self._step_size: maxbwd = max(maxbwd, self._step_size) cntbwd += 1 if self.value < self.target: break # keep track how far we had to go for calibration self._calib_range = max(self._calib_range, abs(self.value - self.target)) if cntfwd >= 3 and cntbwd >= 3: self.steps_fwd = 1 / maxfwd self.steps_bwd = 1 / maxbwd self._status = IDLE, 'calib step size done' break else: self._status = WARN, 'calib step size failed' self.read_status() # def _measure_cap(self, event): # """do cap measurement in a separate thread, as it may time out""" # axistype = ['linear', 'gonio', 'rotator'][self._hw.getActuatorType()] # name = self._hw.getActuatorName() # self._info = f'{name} {axistype} timeout measuring capacitance' # for _ in range(5): # try: # cap = self._hw.measureCapacitance() * 1e9 # break # except Exception: # pass # self.info = f'{name} {axistype} {cap:.3g}nF' # event.set() @nopoll def read_info(self): """read info from controller""" axistype = ['linear', 'gonio', 'rotator'][self._hw.getActuatorType()] name = self._hw.getActuatorName() cap = self._hw.measureCapacitance() * 1e9 return f'{name} {axistype} {cap:.3g}nF' # self._hw = Positioner() # event = threading.Event() # self._start_thread(self._measure_cap, event) # t = time.time() # if not event.wait(0.25): # print('CAP TIMEOUT') # return self._info # print('CAP', time.time() - t) # return self.info