updated with slic master branch

This commit is contained in:
2022-09-20 14:45:33 +02:00
parent 8e82ddcc67
commit 92b17647f9
6 changed files with 334 additions and 22 deletions

16
channels_minimal.py Normal file
View File

@ -0,0 +1,16 @@
# Channels at Cristallina endstation
##########################################################################################################
# BS channels
# TODO: JF settings regarding raw conversion, compression, etc.
detectors = [
# "JF16T03V01",
]
channels = []
pvs = []#pvs_slits + pv_channels + smaract_channels

View File

@ -13,12 +13,23 @@ import numpy as np
# from tqdm import trange
from epics import PV
#MODULE_PATH = "/sf/cristallina/applications/slic/slic/slic/__init__.py"
#MODULE_NAME = "slic"
#import importlib
#import sys
#spec = importlib.util.spec_from_file_location(MODULE_NAME, MODULE_PATH)
#module = importlib.util.module_from_spec(spec)
#sys.modules[spec.name] = module
#spec.loader.exec_module(module)
from slic.gui import GUI
from slic.core.adjustable import Adjustable, PVAdjustable, DummyAdjustable
from slic.core.acquisition import SFAcquisition, PVAcquisition
from slic.core.condition import PVCondition
from slic.core.scanner import Scanner
from slic.devices.simpledevice import SimpleDevice
from slic.core.device.simpledevice import SimpleDevice
from slic.devices.general.motor import Motor
from slic.utils import devices, Marker, as_shortcut
from slic.utils import Channels, Config, Elog, Screenshot, PV
@ -38,23 +49,21 @@ cool_motor = MyNewCoolThing("cool_motor")
dummy = DummyAdjustable(units="au")
## Attenuator
from slic.devices.xoptics.attenuator_aramis import AttenuatorAramis
from slic.devices.xoptics.aramis_attenuator import Attenuator
from knife_edge import KnifeEdge
attenuator_ID = "SAROP31-OATA150"
attenuator = AttenuatorAramis(
attenuator = Attenuator(
attenuator_ID, description="Attenuators with absolute encoders"
)
def test_attenuator():
tfundamental, tHG = attenuator.get_transmission(verbose=False)
tfundamental = attenuator.get_transmission()
try:
assert tfundamental > 0
except TypeError:
print("No transmission value reported from {attenuator.ID}")
test_attenuator()
@ -68,11 +77,11 @@ undulators = undulator.Undulators()
from slic.devices.xoptics import slits
# _old for Alvra codepath, recommended here
slits_ADC = slits.SlitPosWidth_old("SAROP31-OAPU149", name="Apertures - ADC")
# slits_ADC = slits.SlitPosWidth_old("SAROP31-OAPU149", name="Apertures - ADC")
## Smaract stage
from smaract import smaract
from smaract_device_def import smaract
# from attocube_assignment import attocube
###########################################
instrument = "cristallina"

View File

@ -1,4 +1,4 @@
from slic.devices.simpledevice import SimpleDevice
from slic.core.device.simpledevice import SimpleDevice
from slic.devices.general.motor import Motor
mot_x = Motor("SAR-EXPMX:MOT_FX")

View File

@ -1,14 +1,287 @@
from slic.devices.general.smaract import SmarActStage
import time
import subprocess
from types import SimpleNamespace
from enum import IntEnum
from epics import ca
# this currently uses a modified SmarActStage module
# otherwise the wait times are not working correctly.
# import slic
from slic.core.adjustable import Adjustable, AdjustableError
from slic.utils import typename
from slic.utils.printing import printable_dict
from slic.utils.hastyepics import get_pv as PV
# from ..basedevice import BaseDevice
from slic.core.device.basedevice import BaseDevice
smaract = SmarActStage("SARES30-XSMA156",
X='SARES30-XSMA156:X',
Y='SARES30-XSMA156:Y',
Z='SARES30-XSMA156:Z',
Ry='SARES30-XSMA156:Ry',
Rx='SARES30-XSMA156:Rx',
Rz='SARES30-XSMA156:Rz',
class Status(IntEnum):
STOPPED = 0
STEPPING = 1
SCANNING = 2
HOLDING = 3
TARGETING = 4
MOVE_DELAY = 5
CALIBRATING = 6
FINDING_REF = 7
LOCKED = 8
class SmarActStage(BaseDevice):
def __init__(self, name=None, **axis_ids):
self.name = name
self.axis_ids = axis_ids
self.axes = {}
for ax_name, ax_id in axis_ids.items():
record_name = f"{name}: {ax_name}"
ax = SmarActAxis(ax_id, name=record_name)
setattr(self, ax_name, ax)
self.axes[ax_name] = ax
def __repr__(self):
tname = typename(self)
name = self.name
head = f"{tname} \"{name}\""
to_print = {ax_name: ax.get_current_value() for ax_name, ax in self.axes.items()}
return printable_dict(to_print, head)
class SmarActAxis(Adjustable):
def __init__(self, ID, name=None, units=None, internal=False):
super().__init__(ID, name=name, units=units, internal=internal)
self.wait_time = 0.1
self.timeout = 60
self._move_requested = False
self.pvs = SimpleNamespace(
drive = PV(ID + ":DRIVE"),
readback = PV(ID + ":MOTRBV"),
hlm = PV(ID + ":HLM"),
llm = PV(ID + ":LLM"),
status = PV(ID + ":STATUS"),
set_pos = PV(ID + ":SET_POS"),
stop = PV(ID + ":STOP.PROC"),
hold = PV(ID + ":HOLD"),
twv = PV(ID + ":TWV"),
units = PV(ID + ":DRIVE.EGU")
)
@property
def units(self):
units = self._units
if units is not None:
return units
return self.pvs.units.get()
@units.setter
def units(self, value):
self._units = value
def get_current_value(self, readback=True):
if readback:
return self.pvs.readback.get()
else:
return self.pvs.drive.get()
def reset_current_value_to(self, value):
return self.pvs.set_pos.put(value)
def set_target_value(self, value):
print(f"moving to {value}")
wait_time = self.wait_time
timeout = self.timeout + time.time()
self._move_requested = True
self.pvs.drive.put(value, wait=True)
# wait for start
while self._move_requested and not self.is_moving():
time.sleep(wait_time)
if time.time() >= timeout:
tname = typename(self)
self.stop()
raise SmarActError(f"starting to move {tname} \"{self.name}\" to {value} {self.units} timed out")
# wait for move done
while self._move_requested and self.is_moving():
if self.is_holding(): # holding == arrived at target!
break
time.sleep(wait_time)
self._move_requested = False
print("move done")
def stop(self):
self._move_requested = False
self.pvs.stop.put(1, wait=True)
def is_moving(self):
return self.status != Status.STOPPED
def is_holding(self):
return self.status == Status.HOLDING
@property
def status(self):
return self.pvs.status.get()
def within_epics_limits(self, val):
low, high = self.get_epics_limits()
return low <= val <= high
def get_epics_limits(self):
low = self.pvs.llm.get()
high = self.pvs.hlm.get()
return low, high
def set_epics_limits(self, low, high, relative_to_current=False):
low = -np.inf if low is None else low
high = +np.inf if high is None else high
if relative_to_current:
val = self.get_current_value()
low += val
high += val
self.pvs.llm.put(low)
self.pvs.hlm.put(high)
def move(self, val, relative=False, wait=True, ignore_limits=False, confirm_move=True, timeout=300.0):
"""
moves SmarAct drive to position (emulating pyepics Motor class)
arguments:
==========
val value to move to (float) [Must be provided]
relative move relative to current position (T/F) [F]
wait whether to wait for move to complete (T/F) [F]
ignore_limits try move without regard to limits (T/F) [F]
confirm_move try to confirm that move has begun (T/F) [F]
timeout max time for move to complete (in seconds) [300]
return values:
==============
-13 : invalid value (cannot convert to float). Move not attempted.
-12 : target value outside soft limits. Move not attempted.
-11 : drive PV is not connected. Move not attempted.
-8 : move started, but timed-out.
# -7 : move started, timed-out, but appears done.
-5 : move started, unexpected return value from PV.put().
# -4 : move-with-wait finished, soft limit violation seen.
# -3 : move-with-wait finished, hard limit violation seen.
0 : move-with-wait finished OK.
0 : move-without-wait executed, move not confirmed.
1 : move-without-wait executed, move confirmed.
# 3 : move-without-wait finished, hard limit violation seen.
# 4 : move-without-wait finished, soft limit violation seen.
"""
INVALID_VALUE = -13
OUTSIDE_LIMITS = -12
NOT_CONNECTED = -11
TIMEOUT = -8
UNKNOWN_ERROR = -5
SUCCESS = 0
EXECUTED = 0
CONFIRMED = 1
PUT_SUCCESS = 1
PUT_TIMEOUT = -1
try:
val = float(val)
except Exception:
return INVALID_VALUE
if relative:
val += self.pvs.drive.get()
if not ignore_limits:
if not self.within_epics_limits(val):
return OUTSIDE_LIMITS
put_stat = self.pvs.drive.put(val, wait=wait, timeout=timeout)
if put_stat is None:
return NOT_CONNECTED
if wait and put_stat == PUT_TIMEOUT:
return TIMEOUT
if put_stat != PUT_SUCCESS:
return UNKNOWN_ERROR
stat = self.status
t0 = time.time()
thold = t0 + self.pvs.hold.get() * 0.001
tstart = t0 + min(timeout, 10)
tout = t0 + timeout
if not wait and not confirm_move:
return EXECUTED
while stat == Status.HOLDING and time.time() <= thold:
ca.poll(evt=1.0e-2)
stat = self.status
while stat == Status.STOPPED and time.time() <= tstart:
ca.poll(evt=1.0e-2)
stat = self.status
if stat != Status.TARGETING:
if time.time() > tout:
return TIMEOUT
else:
return UNKNOWN_ERROR
if not wait:
return CONFIRMED
while stat == Status.TARGETING and time.time() <= tout:
ca.poll(evt=1.0e-2)
stat = self.status
if stat not in (Status.HOLDING, Status.TARGETING):
return UNKNOWN_ERROR
if time.time() > tout:
return TIMEOUT
twv = self.pvs.twv.get()
twv = abs(twv)
while stat == Status.HOLDING and time.time() <= tout:
ca.poll(evt=1.0e-2)
stat = self.status
delta = self.pvs.readback.get() - val
delta = abs(delta)
if delta < twv:
return SUCCESS
return UNKNOWN_ERROR
def gui(self):
device, motor = self.ID.split(":")
cmd = f'caqtdm -macro "P={device}:,M={motor}" ESB_MX_SMARACT_mot_exp.ui'
return subprocess.Popen(cmd, shell=True)
class SmarActError(AdjustableError):
pass

14
smaract_device_def.py Normal file
View File

@ -0,0 +1,14 @@
# from slic.devices.general.smaract import SmarActStage
from smaract import SmarActStage
# this currently uses a modified SmarActStage module
# otherwise the wait times are not working correctly.
smaract = SmarActStage("SARES30-XSMA156",
X='SARES30-XSMA156:X',
Y='SARES30-XSMA156:Y',
Z='SARES30-XSMA156:Z',
Ry='SARES30-XSMA156:Ry',
Rx='SARES30-XSMA156:Rx',
Rz='SARES30-XSMA156:Rz',
)

View File

@ -1,5 +1,5 @@
from slic.core.adjustable import PVAdjustable, PVEnumAdjustable
from slic.devices.simpledevice import SimpleDevice
from slic.core.device.simpledevice import SimpleDevice
from slic.utils import as_shortcut