Files
cristallina/gp_exp/smaract.py

289 lines
8.6 KiB
Python
Executable File

import time
import subprocess
from types import SimpleNamespace
from enum import IntEnum
from epics import ca
# import slic
from slic.core.adjustable import Adjustable, AdjustableError
from slic.utils import typename
from slic.utils.printing import printable_dict
from slic.utils.hastyepics import get_pv as PV
# from ..basedevice import BaseDevice
from slic.core.device.basedevice import BaseDevice
class Status(IntEnum):
STOPPED = 0
STEPPING = 1
SCANNING = 2
HOLDING = 3
TARGETING = 4
MOVE_DELAY = 5
CALIBRATING = 6
FINDING_REF = 7
LOCKED = 8
class SmarActStage(BaseDevice):
def __init__(self, name=None, **axis_ids):
self.name = name
self.axis_ids = axis_ids
self.axes = {}
for ax_name, ax_id in axis_ids.items():
record_name = f"{name}: {ax_name}"
ax = SmarActAxis(ax_id, name=record_name)
setattr(self, ax_name, ax)
self.axes[ax_name] = ax
def __repr__(self):
tname = typename(self)
name = self.name
head = f'{tname} "{name}"'
to_print = {ax_name: ax.get_current_value() for ax_name, ax in self.axes.items()}
return printable_dict(to_print, head)
class SmarActAxis(Adjustable):
def __init__(self, ID, name=None, units=None, internal=False, tolerance=0.5):
super().__init__(ID, name=name, units=units, internal=internal)
self.wait_time = 0.1
self.timeout = 60
self._move_requested = False
self.config = SimpleNamespace(
tolerance=tolerance,
)
self.pvs = SimpleNamespace(
drive=PV(ID + ":DRIVE"),
readback=PV(ID + ":MOTRBV"),
hlm=PV(ID + ":HLM"),
llm=PV(ID + ":LLM"),
status=PV(ID + ":STATUS"),
set_pos=PV(ID + ":SET_POS"),
stop=PV(ID + ":STOP.PROC"),
hold=PV(ID + ":HOLD"),
twv=PV(ID + ":TWV"),
units=PV(ID + ":DRIVE.EGU"),
)
@property
def units(self):
units = self._units
if units is not None:
return units
return self.pvs.units.get()
@units.setter
def units(self, value):
self._units = value
def get_current_value(self, readback=True):
if readback:
return self.pvs.readback.get()
else:
return self.pvs.drive.get()
def reset_current_value_to(self, value):
return self.pvs.set_pos.put(value)
def set_target_value(self, value):
print(f"moving to {value}")
wait_time = self.wait_time
timeout = self.timeout + time.time()
self._move_requested = True
self.pvs.drive.put(value, wait=True)
# wait for start
while self._move_requested and not self.is_moving():
time.sleep(wait_time)
if time.time() >= timeout:
tname = typename(self)
self.stop()
raise SmarActError(f'starting to move {tname} "{self.name}" to {value} {self.units} timed out')
# wait for move done
while self._move_requested and self.is_moving():
if self.is_holding(): # holding == arrived at target!
break
time.sleep(wait_time)
self._move_requested = False
print("move done")
def is_at_target(self, from_pv=False, tolerance=None):
"""Check whether target was reached.
If from_pv is True TARGET_REACHED_RB is used, else it's calculated as diffrence between the readback and set value within tolerance.
Default is False, because otherwise tolerance can only be set and read out as an integer."""
if tolerance == None:
tolerance = self.config.tolerance
if from_pv == True:
return self.pvs.target_reached.value == 1
else:
return abs(self.pvs.readback.get() - self.pvs.drive.get()) < tolerance
def stop(self):
self._move_requested = False
self.pvs.stop.put(1, wait=True)
def is_moving(self):
return self.status != Status.STOPPED
def is_holding(self):
return self.status == Status.HOLDING
@property
def status(self):
return self.pvs.status.get()
def within_epics_limits(self, val):
low, high = self.get_epics_limits()
return low <= val <= high
def get_epics_limits(self):
low = self.pvs.llm.get()
high = self.pvs.hlm.get()
return low, high
def set_epics_limits(self, low, high, relative_to_current=False):
low = -np.inf if low is None else low
high = +np.inf if high is None else high
if relative_to_current:
val = self.get_current_value()
low += val
high += val
self.pvs.llm.put(low)
self.pvs.hlm.put(high)
def move(self, val, relative=False, wait=True, ignore_limits=False, confirm_move=True, timeout=300.0):
"""
moves SmarAct drive to position (emulating pyepics Motor class)
arguments:
==========
val value to move to (float) [Must be provided]
relative move relative to current position (T/F) [F]
wait whether to wait for move to complete (T/F) [F]
ignore_limits try move without regard to limits (T/F) [F]
confirm_move try to confirm that move has begun (T/F) [F]
timeout max time for move to complete (in seconds) [300]
return values:
==============
-13 : invalid value (cannot convert to float). Move not attempted.
-12 : target value outside soft limits. Move not attempted.
-11 : drive PV is not connected. Move not attempted.
-8 : move started, but timed-out.
# -7 : move started, timed-out, but appears done.
-5 : move started, unexpected return value from PV.put().
# -4 : move-with-wait finished, soft limit violation seen.
# -3 : move-with-wait finished, hard limit violation seen.
0 : move-with-wait finished OK.
0 : move-without-wait executed, move not confirmed.
1 : move-without-wait executed, move confirmed.
# 3 : move-without-wait finished, hard limit violation seen.
# 4 : move-without-wait finished, soft limit violation seen.
"""
INVALID_VALUE = -13
OUTSIDE_LIMITS = -12
NOT_CONNECTED = -11
TIMEOUT = -8
UNKNOWN_ERROR = -5
SUCCESS = 0
EXECUTED = 0
CONFIRMED = 1
PUT_SUCCESS = 1
PUT_TIMEOUT = -1
try:
val = float(val)
except Exception:
return INVALID_VALUE
if relative:
val += self.pvs.drive.get()
if not ignore_limits:
if not self.within_epics_limits(val):
return OUTSIDE_LIMITS
put_stat = self.pvs.drive.put(val, wait=wait, timeout=timeout)
if put_stat is None:
return NOT_CONNECTED
if wait and put_stat == PUT_TIMEOUT:
return TIMEOUT
if put_stat != PUT_SUCCESS:
return UNKNOWN_ERROR
stat = self.status
t0 = time.time()
thold = t0 + self.pvs.hold.get() * 0.001
tstart = t0 + min(timeout, 10)
tout = t0 + timeout
if not wait and not confirm_move:
return EXECUTED
while stat == Status.HOLDING and time.time() <= thold:
ca.poll(evt=1.0e-2)
stat = self.status
while stat == Status.STOPPED and time.time() <= tstart:
ca.poll(evt=1.0e-2)
stat = self.status
if stat != Status.TARGETING:
if time.time() > tout:
return TIMEOUT
else:
return UNKNOWN_ERROR
if not wait:
return CONFIRMED
while stat == Status.TARGETING and time.time() <= tout:
ca.poll(evt=1.0e-2)
stat = self.status
if stat not in (Status.HOLDING, Status.TARGETING):
return UNKNOWN_ERROR
if time.time() > tout:
return TIMEOUT
twv = self.pvs.twv.get()
twv = abs(twv)
ca.poll(evt=1.0e-1)
while stat == Status.HOLDING and time.time() <= tout:
ca.poll(evt=1.0e-2)
stat = self.status
delta = self.pvs.readback.get() - val
delta = abs(delta)
if delta < twv:
return SUCCESS
return UNKNOWN_ERROR
def gui(self):
device, motor = self.ID.split(":")
cmd = f'caqtdm -macro "P={device}:,M={motor}" SARES30-CPCL-SMARACTX1_6axis_old.ui'
return subprocess.Popen(cmd, shell=True)
class SmarActError(AdjustableError):
pass