fix mo1_bragg
This commit is contained in:
@@ -70,6 +70,13 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner):
|
||||
super().__init__(name=name, scan_info=scan_info, prefix=prefix, **kwargs)
|
||||
self.scan_parameters: ScanServerScanInfo = None
|
||||
self.timeout_for_pvwait = 7.5
|
||||
self.valid_scan_names = [
|
||||
"xas_simple_scan",
|
||||
"xas_simple_scan_with_xrd",
|
||||
"xas_advanced_scan",
|
||||
"xas_advanced_scan_with_xrd",
|
||||
"nidaq_continuous_scan",
|
||||
]
|
||||
|
||||
########################################
|
||||
# Beamline Specific Implementations #
|
||||
@@ -104,158 +111,172 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner):
|
||||
status.wait(timeout=self.timeout_for_pvwait)
|
||||
|
||||
scan_name = self.scan_parameters.scan_name
|
||||
start, stop = self.scan_parameters.positions or (None, None)
|
||||
scan_time = self.scan_parameters.additional_scan_parameters.get("scan_time", None)
|
||||
scan_duration = self.scan_parameters.additional_scan_parameters.get("scan_duration", None)
|
||||
if scan_name == "xas_simple_scan":
|
||||
if any(param is None for param in [start, stop, scan_time, scan_duration]):
|
||||
raise Mo1BraggError(
|
||||
f"Missing scan parameters for xas_simple_scan. Required parameters: start, stop, scan_time, scan_duration in additional_scan_parameters dict {self.scan_parameters.additional_scan_parameters}"
|
||||
if self._check_if_scan_name_is_valid(self.scan_parameters):
|
||||
start, stop = self.scan_parameters.positions or (None, None)
|
||||
scan_time = self.scan_parameters.additional_scan_parameters.get("scan_time", None)
|
||||
scan_duration = self.scan_parameters.additional_scan_parameters.get(
|
||||
"scan_duration", None
|
||||
)
|
||||
if scan_name == "xas_simple_scan":
|
||||
if any(param is None for param in [start, stop, scan_time, scan_duration]):
|
||||
raise Mo1BraggError(
|
||||
f"Missing scan parameters for xas_simple_scan. Required parameters: start, stop, scan_time, scan_duration in additional_scan_parameters dict {self.scan_parameters.additional_scan_parameters}"
|
||||
)
|
||||
self.set_xas_settings(low=start, high=stop, scan_time=scan_time)
|
||||
self.set_trig_settings(
|
||||
enable_low=False,
|
||||
enable_high=False,
|
||||
break_time_low=0,
|
||||
break_time_high=0,
|
||||
cycle_low=0,
|
||||
cycle_high=0,
|
||||
exp_time=0,
|
||||
n_of_trigger=0,
|
||||
)
|
||||
self.set_xas_settings(low=start, high=stop, scan_time=scan_time)
|
||||
self.set_trig_settings(
|
||||
enable_low=False,
|
||||
enable_high=False,
|
||||
break_time_low=0,
|
||||
break_time_high=0,
|
||||
cycle_low=0,
|
||||
cycle_high=0,
|
||||
exp_time=0,
|
||||
n_of_trigger=0,
|
||||
)
|
||||
self.set_scan_control_settings(mode=ScanControlMode.SIMPLE, scan_duration=scan_duration)
|
||||
elif scan_name == "xas_simple_scan_with_xrd":
|
||||
break_enable_low = self.scan_parameters.additional_scan_parameters.get(
|
||||
"break_enable_low", None
|
||||
)
|
||||
break_enable_high = self.scan_parameters.additional_scan_parameters.get(
|
||||
"break_enable_high", None
|
||||
)
|
||||
break_time_low = self.scan_parameters.additional_scan_parameters.get(
|
||||
"break_time_low", None
|
||||
)
|
||||
break_time_high = self.scan_parameters.additional_scan_parameters.get(
|
||||
"break_time_high", None
|
||||
)
|
||||
cycle_low = self.scan_parameters.additional_scan_parameters.get("cycle_low", None)
|
||||
cycle_high = self.scan_parameters.additional_scan_parameters.get("cycle_high", None)
|
||||
exp_time = self.scan_parameters.exp_time
|
||||
n_of_trigger = self.scan_parameters.additional_scan_parameters.get("n_of_trigger", None)
|
||||
if any(
|
||||
param is None
|
||||
for param in [
|
||||
start,
|
||||
stop,
|
||||
scan_time,
|
||||
scan_duration,
|
||||
break_enable_low,
|
||||
break_enable_high,
|
||||
break_time_low,
|
||||
break_time_high,
|
||||
cycle_low,
|
||||
cycle_high,
|
||||
exp_time,
|
||||
n_of_trigger,
|
||||
]
|
||||
):
|
||||
raise Mo1BraggError(
|
||||
f"Missing scan parameters for xas_simple_scan_with_xrd. Required parameters: start, stop, scan_time, scan_duration, break_enable_low, break_enable_high, break_time_low, break_time_high, cycle_low, cycle_high, exp_time, n_of_trigger in additional_scan_parameters dict {self.scan_parameters.additional_scan_parameters}"
|
||||
self.set_scan_control_settings(
|
||||
mode=ScanControlMode.SIMPLE, scan_duration=scan_duration
|
||||
)
|
||||
self.set_xas_settings(low=start, high=stop, scan_time=scan_time)
|
||||
self.set_trig_settings(
|
||||
enable_low=break_enable_low,
|
||||
enable_high=break_enable_high,
|
||||
break_time_low=break_time_low,
|
||||
break_time_high=break_time_high,
|
||||
cycle_low=cycle_low,
|
||||
cycle_high=cycle_high,
|
||||
exp_time=exp_time,
|
||||
n_of_trigger=n_of_trigger,
|
||||
)
|
||||
self.set_scan_control_settings(mode=ScanControlMode.SIMPLE, scan_duration=scan_duration)
|
||||
elif scan_name == "xas_advanced_scan":
|
||||
p_kink = self.scan_parameters.additional_scan_parameters.get("p_kink", None)
|
||||
e_kink = self.scan_parameters.additional_scan_parameters.get("e_kink", None)
|
||||
if any(
|
||||
param is None for param in [start, stop, scan_time, scan_duration, p_kink, e_kink]
|
||||
):
|
||||
raise Mo1BraggError(
|
||||
f"Missing scan parameters for xas_advanced_scan. Required parameters: start, stop, scan_time, scan_duration, p_kink, e_kink in additional_scan_parameters dict {self.scan_parameters.additional_scan_parameters}"
|
||||
elif scan_name == "xas_simple_scan_with_xrd":
|
||||
break_enable_low = self.scan_parameters.additional_scan_parameters.get(
|
||||
"break_enable_low", None
|
||||
)
|
||||
self.set_advanced_xas_settings(
|
||||
low=start, high=stop, scan_time=scan_time, p_kink=p_kink, e_kink=e_kink
|
||||
)
|
||||
self.set_trig_settings(
|
||||
enable_low=False,
|
||||
enable_high=False,
|
||||
break_time_low=0,
|
||||
break_time_high=0,
|
||||
cycle_low=0,
|
||||
cycle_high=0,
|
||||
exp_time=0,
|
||||
n_of_trigger=0,
|
||||
)
|
||||
self.set_scan_control_settings(
|
||||
mode=ScanControlMode.ADVANCED, scan_duration=scan_duration
|
||||
)
|
||||
elif scan_name == "xas_advanced_scan_with_xrd":
|
||||
p_kink = self.scan_parameters.additional_scan_parameters.get("p_kink", None)
|
||||
e_kink = self.scan_parameters.additional_scan_parameters.get("e_kink", None)
|
||||
break_enable_low = self.scan_parameters.additional_scan_parameters.get(
|
||||
"break_enable_low", None
|
||||
)
|
||||
break_enable_high = self.scan_parameters.additional_scan_parameters.get(
|
||||
"break_enable_high", None
|
||||
)
|
||||
break_time_low = self.scan_parameters.additional_scan_parameters.get(
|
||||
"break_time_low", None
|
||||
)
|
||||
break_time_high = self.scan_parameters.additional_scan_parameters.get(
|
||||
"break_time_high", None
|
||||
)
|
||||
cycle_low = self.scan_parameters.additional_scan_parameters.get("cycle_low", None)
|
||||
cycle_high = self.scan_parameters.additional_scan_parameters.get("cycle_high", None)
|
||||
exp_time = self.scan_parameters.exp_time
|
||||
n_of_trigger = self.scan_parameters.additional_scan_parameters.get("n_of_trigger", None)
|
||||
if any(
|
||||
param is None
|
||||
for param in [
|
||||
start,
|
||||
stop,
|
||||
scan_time,
|
||||
scan_duration,
|
||||
p_kink,
|
||||
e_kink,
|
||||
break_enable_low,
|
||||
break_enable_high,
|
||||
break_time_low,
|
||||
break_time_high,
|
||||
cycle_low,
|
||||
cycle_high,
|
||||
exp_time,
|
||||
n_of_trigger,
|
||||
]
|
||||
):
|
||||
raise Mo1BraggError(
|
||||
f"Missing scan parameters for xas_advanced_scan_with_xrd. Required parameters: start, stop, scan_time, scan_duration, p_kink, e_kink, break_enable_low, break_enable_high, break_time_low, break_time_high, cycle_low, cycle_high, exp_time, n_of_trigger in additional_scan_parameters dict {self.scan_parameters.additional_scan_parameters}"
|
||||
break_enable_high = self.scan_parameters.additional_scan_parameters.get(
|
||||
"break_enable_high", None
|
||||
)
|
||||
break_time_low = self.scan_parameters.additional_scan_parameters.get(
|
||||
"break_time_low", None
|
||||
)
|
||||
break_time_high = self.scan_parameters.additional_scan_parameters.get(
|
||||
"break_time_high", None
|
||||
)
|
||||
cycle_low = self.scan_parameters.additional_scan_parameters.get("cycle_low", None)
|
||||
cycle_high = self.scan_parameters.additional_scan_parameters.get("cycle_high", None)
|
||||
exp_time = self.scan_parameters.exp_time
|
||||
n_of_trigger = self.scan_parameters.additional_scan_parameters.get(
|
||||
"n_of_trigger", None
|
||||
)
|
||||
if any(
|
||||
param is None
|
||||
for param in [
|
||||
start,
|
||||
stop,
|
||||
scan_time,
|
||||
scan_duration,
|
||||
break_enable_low,
|
||||
break_enable_high,
|
||||
break_time_low,
|
||||
break_time_high,
|
||||
cycle_low,
|
||||
cycle_high,
|
||||
exp_time,
|
||||
n_of_trigger,
|
||||
]
|
||||
):
|
||||
raise Mo1BraggError(
|
||||
f"Missing scan parameters for xas_simple_scan_with_xrd. Required parameters: start, stop, scan_time, scan_duration, break_enable_low, break_enable_high, break_time_low, break_time_high, cycle_low, cycle_high, exp_time, n_of_trigger in additional_scan_parameters dict {self.scan_parameters.additional_scan_parameters}"
|
||||
)
|
||||
self.set_xas_settings(low=start, high=stop, scan_time=scan_time)
|
||||
self.set_trig_settings(
|
||||
enable_low=break_enable_low,
|
||||
enable_high=break_enable_high,
|
||||
break_time_low=break_time_low,
|
||||
break_time_high=break_time_high,
|
||||
cycle_low=cycle_low,
|
||||
cycle_high=cycle_high,
|
||||
exp_time=exp_time,
|
||||
n_of_trigger=n_of_trigger,
|
||||
)
|
||||
self.set_scan_control_settings(
|
||||
mode=ScanControlMode.SIMPLE, scan_duration=scan_duration
|
||||
)
|
||||
elif scan_name == "xas_advanced_scan":
|
||||
p_kink = self.scan_parameters.additional_scan_parameters.get("p_kink", None)
|
||||
e_kink = self.scan_parameters.additional_scan_parameters.get("e_kink", None)
|
||||
if any(
|
||||
param is None
|
||||
for param in [start, stop, scan_time, scan_duration, p_kink, e_kink]
|
||||
):
|
||||
raise Mo1BraggError(
|
||||
f"Missing scan parameters for xas_advanced_scan. Required parameters: start, stop, scan_time, scan_duration, p_kink, e_kink in additional_scan_parameters dict {self.scan_parameters.additional_scan_parameters}"
|
||||
)
|
||||
self.set_advanced_xas_settings(
|
||||
low=start, high=stop, scan_time=scan_time, p_kink=p_kink, e_kink=e_kink
|
||||
)
|
||||
self.set_trig_settings(
|
||||
enable_low=False,
|
||||
enable_high=False,
|
||||
break_time_low=0,
|
||||
break_time_high=0,
|
||||
cycle_low=0,
|
||||
cycle_high=0,
|
||||
exp_time=0,
|
||||
n_of_trigger=0,
|
||||
)
|
||||
self.set_scan_control_settings(
|
||||
mode=ScanControlMode.ADVANCED, scan_duration=scan_duration
|
||||
)
|
||||
elif scan_name == "xas_advanced_scan_with_xrd":
|
||||
p_kink = self.scan_parameters.additional_scan_parameters.get("p_kink", None)
|
||||
e_kink = self.scan_parameters.additional_scan_parameters.get("e_kink", None)
|
||||
break_enable_low = self.scan_parameters.additional_scan_parameters.get(
|
||||
"break_enable_low", None
|
||||
)
|
||||
break_enable_high = self.scan_parameters.additional_scan_parameters.get(
|
||||
"break_enable_high", None
|
||||
)
|
||||
break_time_low = self.scan_parameters.additional_scan_parameters.get(
|
||||
"break_time_low", None
|
||||
)
|
||||
break_time_high = self.scan_parameters.additional_scan_parameters.get(
|
||||
"break_time_high", None
|
||||
)
|
||||
cycle_low = self.scan_parameters.additional_scan_parameters.get("cycle_low", None)
|
||||
cycle_high = self.scan_parameters.additional_scan_parameters.get("cycle_high", None)
|
||||
exp_time = self.scan_parameters.exp_time
|
||||
n_of_trigger = self.scan_parameters.additional_scan_parameters.get(
|
||||
"n_of_trigger", None
|
||||
)
|
||||
if any(
|
||||
param is None
|
||||
for param in [
|
||||
start,
|
||||
stop,
|
||||
scan_time,
|
||||
scan_duration,
|
||||
p_kink,
|
||||
e_kink,
|
||||
break_enable_low,
|
||||
break_enable_high,
|
||||
break_time_low,
|
||||
break_time_high,
|
||||
cycle_low,
|
||||
cycle_high,
|
||||
exp_time,
|
||||
n_of_trigger,
|
||||
]
|
||||
):
|
||||
raise Mo1BraggError(
|
||||
f"Missing scan parameters for xas_advanced_scan_with_xrd. Required parameters: start, stop, scan_time, scan_duration, p_kink, e_kink, break_enable_low, break_enable_high, break_time_low, break_time_high, cycle_low, cycle_high, exp_time, n_of_trigger in additional_scan_parameters dict {self.scan_parameters.additional_scan_parameters}"
|
||||
)
|
||||
|
||||
self.set_advanced_xas_settings(
|
||||
low=start, high=stop, scan_time=scan_time, p_kink=p_kink, e_kink=e_kink
|
||||
)
|
||||
self.set_trig_settings(
|
||||
enable_low=break_enable_low,
|
||||
enable_high=break_enable_high,
|
||||
break_time_low=break_time_low,
|
||||
break_time_high=break_time_high,
|
||||
cycle_low=cycle_low,
|
||||
cycle_high=cycle_high,
|
||||
exp_time=exp_time,
|
||||
n_of_trigger=n_of_trigger,
|
||||
)
|
||||
self.set_scan_control_settings(
|
||||
mode=ScanControlMode.ADVANCED, scan_duration=scan_duration
|
||||
)
|
||||
self.set_advanced_xas_settings(
|
||||
low=start, high=stop, scan_time=scan_time, p_kink=p_kink, e_kink=e_kink
|
||||
)
|
||||
self.set_trig_settings(
|
||||
enable_low=break_enable_low,
|
||||
enable_high=break_enable_high,
|
||||
break_time_low=break_time_low,
|
||||
break_time_high=break_time_high,
|
||||
cycle_low=cycle_low,
|
||||
cycle_high=cycle_high,
|
||||
exp_time=exp_time,
|
||||
n_of_trigger=n_of_trigger,
|
||||
)
|
||||
self.set_scan_control_settings(
|
||||
mode=ScanControlMode.ADVANCED, scan_duration=scan_duration
|
||||
)
|
||||
else:
|
||||
return # Should never happen.
|
||||
else:
|
||||
return
|
||||
# Setting scan duration seems to lag behind slightly in the backend, include small sleep
|
||||
@@ -335,6 +356,13 @@ class Mo1Bragg(PSIDeviceBase, Mo1BraggPositioner):
|
||||
self.stopped = True # Needs to be set to stop motion
|
||||
|
||||
######### Utility Methods #########
|
||||
|
||||
def _check_if_scan_name_is_valid(self, scan_parameters: ScanServerScanInfo) -> bool:
|
||||
"""Check if the scan is within the list of scans for which the backend is working"""
|
||||
if scan_parameters.scan_name in self.valid_scan_names:
|
||||
return True
|
||||
return False
|
||||
|
||||
def _progress_update(self, value, **kwargs) -> None:
|
||||
"""Callback method to update the scan progress, runs a callback
|
||||
to SUB_PROGRESS subscribers, i.e. BEC.
|
||||
|
||||
Reference in New Issue
Block a user