from distutils.command.install_egg_info import to_filename from re import S import time import subprocess from types import SimpleNamespace from enum import IntEnum from epics import ca # import slic from slic.core.adjustable import Adjustable, AdjustableError from slic.utils import typename from slic.utils.printing import printable_dict from slic.utils.hastyepics import get_pv as PV # from ..basedevice import BaseDevice from slic.core.device.basedevice import BaseDevice class AttocubeStage(BaseDevice): def __init__(self, name=None, **axis_ids): self.name = name self.axis_ids = axis_ids self.axes = {} for ax_name, ax_id in axis_ids.items(): record_name = f"{name}: {ax_name}" ax = AttocubeAxis(ax_id, name=record_name) setattr(self, ax_name, ax) self.axes[ax_name] = ax def __repr__(self): tname = typename(self) name = self.name head = f"{tname} \"{name}\"" to_print = {ax_name: ax.get_current_value() for ax_name, ax in self.axes.items()} return printable_dict(to_print, head) class AttocubeAxis(Adjustable): def __init__(self, ID, name=None, units=None, internal=False): super().__init__(ID, name=name, units=units, internal=internal) self.wait_time = 0.1 self.timeout = 60 self._move_requested = False self.pvs = SimpleNamespace( drive = PV(ID + "-SET_TARGET"), readback = PV(ID + "-POS"), amplitude = PV(ID + "-AMPL_SET"), amplitude_rb = PV(ID + "-AMPL_RB"), frequency = PV(ID + "-FREQ_SET"), frequency_rb = PV(ID + "-FREQ_RB"), moving_status = PV(ID + "-MOVING"), hlm = PV(ID + ":HLM"), llm = PV(ID + ":LLM"), set_pos = PV(ID + ":SET_POS"), stop = PV(ID + "-STOP_AUTO_CMD"), move = PV(ID + "-MV_ABS_SET"), stop_auto_cmd = PV(ID + "-STOP_AUTO_CMD"), target_tol_rb = PV(ID + "-TARGET_RANGE"), target_tol_set = PV(ID + "-SET_TARGET_RANGE"), target_reached = PV(ID + "-TARGET_REACHED_RB"), target_ground = PV(ID + "-TARGET_GND_SET"), output_enabled = PV(ID + "-ENABLE_SET"), twv = PV(ID + ":TWV"), units = PV(ID + "-UNIT"), output_RB = PV(ID + "-ENABLE_RB"), ) @property def units(self): units = self._units if units is not None: return units return self.pvs.units.get() @units.setter def units(self, value): self._units = value def disable_output(self,verbose=True): old_value = self.pvs.output_RB.value self.pvs.output_enabled.put(0) if verbose: if old_value == 1: print("Output has been disabled") else: print("Output has been off and stays that way") def enable_output(self,verbose=True): old_value = self.pvs.output_RB.value self.pvs.output_enabled.put(1) if verbose: if old_value == 0: print("Output has been enabled") else: print("Output has been on and stays that way") def get_ground_on_targert(self): return self.pvs.target_ground.value def get_amplitude(self,verbose=False): amplitude = self.pvs.amplitude_rb.value if verbose: print(f'Drive amplitude is {amplitude/1000} V') return amplitude/1000 def set_amplitude(self,value,verbose=False,check=False): """Set drive amplitude for the axis in Volts. Add check=True for checking if it happened (takes 2s).""" self.pvs.amplitude.put(value*1000) if check: time.sleep(2) assert self.pvs.amplitude_rb.value == value*1000, 'drive amplitude readback does not match set value' if verbose: print(f'Drive amplitude is set to {value} V') def get_frequency(self,verbose=False): frequency = self.pvs.frequency_rb.value if verbose: print(f'Drive frequency is {frequency} V') return frequency def set_frequency(self,value,verbose=False,check=False): """Set drive frequency for the axis in Hz. Add check=True for checking if it happened (takes 2s).""" self.pvs.frequency.put(value) if check: time.sleep(2) assert self.pvs.frequency_rb.value == value, 'drive frequency readback does not match set value' if verbose: print(f'Drive frequency is set to {value} Hz') def set_ground_on_target(self,value,verbose=True): if value == True or value == 1: self.pvs.target_ground.put(1) if verbose: print("grounding upon target arrival is now active") elif value == False or value == 0: self.pvs.target_ground.put(0) if verbose: print("grounding upon target arrival has been deactivated") else: raise AttocubeError(f"only true/false and 0/1 values are allowed, you entered value {value}") def get_target_tolerance(self): return self.pvs.target_tol_rb.value def set_target_tolerance(self,value): self.pvs.target_tol_set.put(value) print(f"target tolerance set to {value} {self.pvs.units.char_value}") def get_current_value(self, readback=True): if readback: return self.pvs.readback.get() else: return self.pvs.drive.get() def set_target_value(self, value): print(f"moving to {value}") wait_time = self.wait_time timeout = self.timeout + time.time() self._move_requested = True self.pvs.drive.put(value, wait=True) # Move button must be pressed to make the move self.set_move_allowed(True) # wait for start while self._move_requested and not self.is_moving(): time.sleep(wait_time) if time.time() >= timeout: tname = typename(self) self.stop() raise AttocubeError(f"starting to move {tname} \"{self.name}\" to {value} {self.pvs.units.char_value} timed out") # wait for move done while self._move_requested and self.is_moving(): if self.is_at_target(): # holding == arrived at target! break time.sleep(wait_time) self._move_requested = False print("move done") def stop(self): self._move_requested = False self.pvs.stop.put(1, wait=True) def is_output_on(self): return self.pvs.output_RB.value == 1 def is_moving(self): return self.pvs.moving_status.value == 1 def is_at_target(self,from_pv=False,tolerance = 0.3): """Check whether target was reached. If from_pv is True TARGET_REACHED_RB is used, else it's calculated as diffrence between the readback and set value within tolerance. Default is False, because otherwise tolerance can only be set and read out as an integer.""" if from_pv == True: return self.pvs.target_reached.value == 1 else: return abs( self.pvs.readback.get() - self.pvs.drive.get() ) < tolerance def allow_move(self): self.pvs.move.put(1, wait=True) def forbid_move(self): self.pvs.move.put(0, wait=True) def set_move_allowed(self,value): if value == True or value == 1: self.pvs.move.put(1) elif value == False or value == 0: self.pvs.move.put(0) else: raise AttocubeError(f"only true/false and 0/1 values are allowed, you entered value {value}") def within_epics_limits(self, val): low, high = self.get_epics_limits() return low <= val <= high def get_epics_limits(self): low = self.pvs.llm.get() high = self.pvs.hlm.get() return low, high def set_epics_limits(self, low, high, relative_to_current=False): low = -np.inf if low is None else low high = +np.inf if high is None else high if relative_to_current: val = self.get_current_value() low += val high += val self.pvs.llm.put(low) self.pvs.hlm.put(high) ### # Things to be desired from the epics module: # 1. Set SET_TARGET_RANGE as a float or somehow alow decimals to be entered # 2. Soft epics limits def move(self, val, relative=False, wait=True, ignore_limits=True, confirm_move=True, timeout=300.0, pos_check_delay=0.15, target_gnd=None, move_attempts = 5, move_attempt_sleep=4, verbose = False, tolerance=0.3): """ moves Attocube drive to position (emulating pyepics Motor class) arguments: ========== val value to move to (float) [Must be provided] relative move relative to current position (T/F) [F] wait whether to wait for move to complete (T/F) [F] ignore_limits try move without regard to limits (no epics limits implemented yet) (T/F) [T] confirm_move try to confirm that move has begun (T/F) [F] timeout max time for move to complete (in seconds) [300] pos_check_delay delay before checkig motor in position (in seconds) [0.15] move_attempts how many times should the move be attempted if not in limits (attocube often needs to be poked a few times for the last 1um) [5] move_attempt_sleep how long to wait before another move poke attept is done (in seconds) [4] tolerance when to consider target destination reached (in microns) [0.3] return values: ============== -13 : invalid value (cannot convert to float). Move not attempted. -12 : target value outside soft limits. Move not attempted. -11 : drive PV is not connected. Move not attempted. -8 : move started, but timed-out. # -7 : move started, timed-out, but appears done. -5 : move started, unexpected return value from PV.put(). # -4 : move-with-wait finished, soft limit violation seen. # -3 : move-with-wait finished, hard limit violation seen. 0 : move-with-wait finished OK. 0 : move-without-wait executed, move not confirmed. 1 : move-without-wait executed, move confirmed. # 3 : move-without-wait finished, hard limit violation seen. # 4 : move-without-wait finished, soft limit violation seen. """ INVALID_VALUE = -13 OUTSIDE_LIMITS = -12 NOT_CONNECTED = -11 TIMEOUT = -8 UNKNOWN_ERROR = -5 SUCCESS = 0 EXECUTED = 0 CONFIRMED = 1 PUT_SUCCESS = 1 PUT_TIMEOUT = -1 try: val = float(val) except Exception: return INVALID_VALUE if relative: val += self.pvs.drive.get() if not ignore_limits: if not self.within_epics_limits(val): return OUTSIDE_LIMITS put_stat = self.pvs.drive.put(val, wait=wait, timeout=timeout) if not self.is_output_on(): self.enable_output(verbose=False) if target_gnd == True or target_gnd == 1: self.set_ground_on_target(True,verbose=False) elif target_gnd == False or target_gnd == 0: self.set_ground_on_target(False,verbose=False) self.allow_move() if put_stat is None: return NOT_CONNECTED if wait and put_stat == PUT_TIMEOUT: return TIMEOUT if put_stat != PUT_SUCCESS: return UNKNOWN_ERROR t0 = time.time() tstart = t0 + min(timeout, 10) tout = t0 + timeout if not wait and not confirm_move: return EXECUTED if not wait: return CONFIRMED if time.time() > tout: return TIMEOUT # Wait before checking if target value reached. It's necessary as sometimes this PV takes a while to set for the new target value. time.sleep(pos_check_delay) while self.is_at_target(tolerance=tolerance) == False and time.time() <= tout: # If the value is near the last 2um. Move the cube to setpoint a few more times to get to the right position. if self.is_at_target( tolerance = 2): for attempt in range(move_attempts): if verbose: print(f'move attempt: {attempt+1}') self.pvs.move.put(1, wait=True) time.sleep(move_attempt_sleep) if self.is_at_target(tolerance=tolerance): if verbose: print(f'Move finished. Had to poke the cube {attempt+1} times to get there though.') return SUCCESS print('Position reached within 2um, but not better.') return TIMEOUT if self.is_at_target(tolerance=tolerance): return SUCCESS return UNKNOWN_ERROR def gui(self): device, motor = self.ID.split(":") cmd = f'caqtdm -macro "DEVICE={device}" S_ANC350.ui' return subprocess.Popen(cmd, shell=True) class AttocubeError(AdjustableError): pass