import time import subprocess from types import SimpleNamespace from enum import IntEnum from epics import ca # import slic from slic.core.adjustable import Adjustable, AdjustableError from slic.utils import typename from slic.utils.printing import printable_dict from slic.utils.hastyepics import get_pv as PV # from ..basedevice import BaseDevice from slic.core.device.basedevice import BaseDevice class Status(IntEnum): STOPPED = 0 STEPPING = 1 SCANNING = 2 HOLDING = 3 TARGETING = 4 MOVE_DELAY = 5 CALIBRATING = 6 FINDING_REF = 7 LOCKED = 8 class SmarActStage(BaseDevice): def __init__(self, name=None, **axis_ids): self.name = name self.axis_ids = axis_ids self.axes = {} for ax_name, ax_id in axis_ids.items(): record_name = f"{name}: {ax_name}" ax = SmarActAxis(ax_id, name=record_name) setattr(self, ax_name, ax) self.axes[ax_name] = ax def __repr__(self): tname = typename(self) name = self.name head = f"{tname} \"{name}\"" to_print = {ax_name: ax.get_current_value() for ax_name, ax in self.axes.items()} return printable_dict(to_print, head) class SmarActAxis(Adjustable): def __init__(self, ID, name=None, units=None, internal=False): super().__init__(ID, name=name, units=units, internal=internal) self.wait_time = 0.1 self.timeout = 60 self._move_requested = False self.pvs = SimpleNamespace( drive = PV(ID + ":DRIVE"), readback = PV(ID + ":MOTRBV"), hlm = PV(ID + ":HLM"), llm = PV(ID + ":LLM"), status = PV(ID + ":STATUS"), set_pos = PV(ID + ":SET_POS"), stop = PV(ID + ":STOP.PROC"), hold = PV(ID + ":HOLD"), twv = PV(ID + ":TWV"), units = PV(ID + ":DRIVE.EGU") ) @property def units(self): units = self._units if units is not None: return units return self.pvs.units.get() @units.setter def units(self, value): self._units = value def get_current_value(self, readback=True): if readback: return self.pvs.readback.get() else: return self.pvs.drive.get() def reset_current_value_to(self, value): return self.pvs.set_pos.put(value) def set_target_value(self, value): print(f"moving to {value}") wait_time = self.wait_time timeout = self.timeout + time.time() self._move_requested = True self.pvs.drive.put(value, wait=True) # wait for start while self._move_requested and not self.is_moving(): time.sleep(wait_time) if time.time() >= timeout: tname = typename(self) self.stop() raise SmarActError(f"starting to move {tname} \"{self.name}\" to {value} {self.units} timed out") # wait for move done while self._move_requested and self.is_moving(): if self.is_holding(): # holding == arrived at target! break time.sleep(wait_time) self._move_requested = False print("move done") def stop(self): self._move_requested = False self.pvs.stop.put(1, wait=True) def is_moving(self): return self.status != Status.STOPPED def is_holding(self): return self.status == Status.HOLDING @property def status(self): return self.pvs.status.get() def within_epics_limits(self, val): low, high = self.get_epics_limits() return low <= val <= high def get_epics_limits(self): low = self.pvs.llm.get() high = self.pvs.hlm.get() return low, high def set_epics_limits(self, low, high, relative_to_current=False): low = -np.inf if low is None else low high = +np.inf if high is None else high if relative_to_current: val = self.get_current_value() low += val high += val self.pvs.llm.put(low) self.pvs.hlm.put(high) def move(self, val, relative=False, wait=True, ignore_limits=False, confirm_move=True, timeout=300.0): """ moves SmarAct drive to position (emulating pyepics Motor class) arguments: ========== val value to move to (float) [Must be provided] relative move relative to current position (T/F) [F] wait whether to wait for move to complete (T/F) [F] ignore_limits try move without regard to limits (T/F) [F] confirm_move try to confirm that move has begun (T/F) [F] timeout max time for move to complete (in seconds) [300] return values: ============== -13 : invalid value (cannot convert to float). Move not attempted. -12 : target value outside soft limits. Move not attempted. -11 : drive PV is not connected. Move not attempted. -8 : move started, but timed-out. # -7 : move started, timed-out, but appears done. -5 : move started, unexpected return value from PV.put(). # -4 : move-with-wait finished, soft limit violation seen. # -3 : move-with-wait finished, hard limit violation seen. 0 : move-with-wait finished OK. 0 : move-without-wait executed, move not confirmed. 1 : move-without-wait executed, move confirmed. # 3 : move-without-wait finished, hard limit violation seen. # 4 : move-without-wait finished, soft limit violation seen. """ INVALID_VALUE = -13 OUTSIDE_LIMITS = -12 NOT_CONNECTED = -11 TIMEOUT = -8 UNKNOWN_ERROR = -5 SUCCESS = 0 EXECUTED = 0 CONFIRMED = 1 PUT_SUCCESS = 1 PUT_TIMEOUT = -1 try: val = float(val) except Exception: return INVALID_VALUE if relative: val += self.pvs.drive.get() if not ignore_limits: if not self.within_epics_limits(val): return OUTSIDE_LIMITS put_stat = self.pvs.drive.put(val, wait=wait, timeout=timeout) if put_stat is None: return NOT_CONNECTED if wait and put_stat == PUT_TIMEOUT: return TIMEOUT if put_stat != PUT_SUCCESS: return UNKNOWN_ERROR stat = self.status t0 = time.time() thold = t0 + self.pvs.hold.get() * 0.001 tstart = t0 + min(timeout, 10) tout = t0 + timeout if not wait and not confirm_move: return EXECUTED while stat == Status.HOLDING and time.time() <= thold: ca.poll(evt=1.0e-2) stat = self.status while stat == Status.STOPPED and time.time() <= tstart: ca.poll(evt=1.0e-2) stat = self.status if stat != Status.TARGETING: if time.time() > tout: return TIMEOUT else: return UNKNOWN_ERROR if not wait: return CONFIRMED while stat == Status.TARGETING and time.time() <= tout: ca.poll(evt=1.0e-2) stat = self.status if stat not in (Status.HOLDING, Status.TARGETING): return UNKNOWN_ERROR if time.time() > tout: return TIMEOUT twv = self.pvs.twv.get() twv = abs(twv) while stat == Status.HOLDING and time.time() <= tout: ca.poll(evt=1.0e-2) stat = self.status delta = self.pvs.readback.get() - val delta = abs(delta) if delta < twv: return SUCCESS return UNKNOWN_ERROR def gui(self): device, motor = self.ID.split(":") cmd = f'caqtdm -macro "P={device}:,M={motor}" ESB_MX_SMARACT_mot_exp.ui' return subprocess.Popen(cmd, shell=True) class SmarActError(AdjustableError): pass