import time import subprocess from types import SimpleNamespace from enum import IntEnum from epics import ca # import slic from slic.core.adjustable import Adjustable, AdjustableError from slic.utils import typename from slic.utils.printing import printable_dict from slic.utils.hastyepics import get_pv as PV # from ..basedevice import BaseDevice from slic.core.device.basedevice import BaseDevice class Status(IntEnum): STOPPED = 0 STEPPING = 1 SCANNING = 2 HOLDING = 3 TARGETING = 4 MOVE_DELAY = 5 CALIBRATING = 6 FINDING_REF = 7 LOCKED = 8 class SmarActStage(BaseDevice): def __init__(self, name=None, **axis_ids): self.name = name self.axis_ids = axis_ids self.axes = {} for ax_name, ax_id in axis_ids.items(): record_name = f"{name}: {ax_name}" ax = SmarActAxis(ax_id, name=record_name) setattr(self, ax_name, ax) self.axes[ax_name] = ax def __repr__(self): tname = typename(self) name = self.name head = f'{tname} "{name}"' to_print = {ax_name: ax.get_current_value() for ax_name, ax in self.axes.items()} return printable_dict(to_print, head) class SmarActAxis(Adjustable): def __init__(self, ID, name=None, units=None, internal=False, tolerance=0.5): super().__init__(ID, name=name, units=units, internal=internal) self.wait_time = 0.1 self.timeout = 60 self._move_requested = False self.config = SimpleNamespace( tolerance=tolerance, ) self.pvs = SimpleNamespace( drive=PV(ID + ":DRIVE"), readback=PV(ID + ":MOTRBV"), hlm=PV(ID + ":HLM"), llm=PV(ID + ":LLM"), status=PV(ID + ":STATUS"), set_pos=PV(ID + ":SET_POS"), stop=PV(ID + ":STOP.PROC"), hold=PV(ID + ":HOLD"), twv=PV(ID + ":TWV"), units=PV(ID + ":DRIVE.EGU"), ) @property def units(self): units = self._units if units is not None: return units return self.pvs.units.get() @units.setter def units(self, value): self._units = value def get_current_value(self, readback=True): if readback: return self.pvs.readback.get() else: return self.pvs.drive.get() def reset_current_value_to(self, value): return self.pvs.set_pos.put(value) def set_target_value(self, value): print(f"moving to {value}") wait_time = self.wait_time timeout = self.timeout + time.time() self._move_requested = True self.pvs.drive.put(value, wait=True) # wait for start while self._move_requested and not self.is_moving(): time.sleep(wait_time) if time.time() >= timeout: tname = typename(self) self.stop() raise SmarActError(f'starting to move {tname} "{self.name}" to {value} {self.units} timed out') # wait for move done while self._move_requested and self.is_moving(): if self.is_holding(): # holding == arrived at target! break time.sleep(wait_time) self._move_requested = False print("move done") def is_at_target(self, from_pv=False, tolerance=None): """Check whether target was reached. If from_pv is True TARGET_REACHED_RB is used, else it's calculated as diffrence between the readback and set value within tolerance. Default is False, because otherwise tolerance can only be set and read out as an integer.""" if tolerance == None: tolerance = self.config.tolerance if from_pv == True: return self.pvs.target_reached.value == 1 else: return abs(self.pvs.readback.get() - self.pvs.drive.get()) < tolerance def stop(self): self._move_requested = False self.pvs.stop.put(1, wait=True) def is_moving(self): return self.status != Status.STOPPED def is_holding(self): return self.status == Status.HOLDING @property def status(self): return self.pvs.status.get() def within_epics_limits(self, val): low, high = self.get_epics_limits() return low <= val <= high def get_epics_limits(self): low = self.pvs.llm.get() high = self.pvs.hlm.get() return low, high def set_epics_limits(self, low, high, relative_to_current=False): low = -np.inf if low is None else low high = +np.inf if high is None else high if relative_to_current: val = self.get_current_value() low += val high += val self.pvs.llm.put(low) self.pvs.hlm.put(high) def move(self, val, relative=False, wait=True, ignore_limits=False, confirm_move=True, timeout=300.0): """ moves SmarAct drive to position (emulating pyepics Motor class) arguments: ========== val value to move to (float) [Must be provided] relative move relative to current position (T/F) [F] wait whether to wait for move to complete (T/F) [F] ignore_limits try move without regard to limits (T/F) [F] confirm_move try to confirm that move has begun (T/F) [F] timeout max time for move to complete (in seconds) [300] return values: ============== -13 : invalid value (cannot convert to float). Move not attempted. -12 : target value outside soft limits. Move not attempted. -11 : drive PV is not connected. Move not attempted. -8 : move started, but timed-out. # -7 : move started, timed-out, but appears done. -5 : move started, unexpected return value from PV.put(). # -4 : move-with-wait finished, soft limit violation seen. # -3 : move-with-wait finished, hard limit violation seen. 0 : move-with-wait finished OK. 0 : move-without-wait executed, move not confirmed. 1 : move-without-wait executed, move confirmed. # 3 : move-without-wait finished, hard limit violation seen. # 4 : move-without-wait finished, soft limit violation seen. """ INVALID_VALUE = -13 OUTSIDE_LIMITS = -12 NOT_CONNECTED = -11 TIMEOUT = -8 UNKNOWN_ERROR = -5 SUCCESS = 0 EXECUTED = 0 CONFIRMED = 1 PUT_SUCCESS = 1 PUT_TIMEOUT = -1 try: val = float(val) except Exception: return INVALID_VALUE if relative: val += self.pvs.drive.get() if not ignore_limits: if not self.within_epics_limits(val): return OUTSIDE_LIMITS put_stat = self.pvs.drive.put(val, wait=wait, timeout=timeout) if put_stat is None: return NOT_CONNECTED if wait and put_stat == PUT_TIMEOUT: return TIMEOUT if put_stat != PUT_SUCCESS: return UNKNOWN_ERROR stat = self.status t0 = time.time() thold = t0 + self.pvs.hold.get() * 0.001 tstart = t0 + min(timeout, 10) tout = t0 + timeout if not wait and not confirm_move: return EXECUTED while stat == Status.HOLDING and time.time() <= thold: ca.poll(evt=1.0e-2) stat = self.status while stat == Status.STOPPED and time.time() <= tstart: ca.poll(evt=1.0e-2) stat = self.status if stat != Status.TARGETING: if time.time() > tout: return TIMEOUT else: return UNKNOWN_ERROR if not wait: return CONFIRMED while stat == Status.TARGETING and time.time() <= tout: ca.poll(evt=1.0e-2) stat = self.status if stat not in (Status.HOLDING, Status.TARGETING): return UNKNOWN_ERROR if time.time() > tout: return TIMEOUT twv = self.pvs.twv.get() twv = abs(twv) ca.poll(evt=1.0e-1) while stat == Status.HOLDING and time.time() <= tout: ca.poll(evt=1.0e-2) stat = self.status delta = self.pvs.readback.get() - val delta = abs(delta) if delta < twv: return SUCCESS return UNKNOWN_ERROR def gui(self): device, motor = self.ID.split(":") cmd = f'caqtdm -macro "P={device}:,M={motor}" SARES30-CPCL-SMARACTX1_6axis_old.ui' return subprocess.Popen(cmd, shell=True) class SmarActError(AdjustableError): pass