refactor: refactored methods, changed PV kinds and added more info to docstrings

This commit is contained in:
2024-07-19 08:54:40 +02:00
parent 8eab8e0c6a
commit 5201bbf480

View File

@@ -1,4 +1,12 @@
""" Module for the Mo1 Bragg positioner of the Debye beamline"""
""" Module for the Mo1 Bragg positioner of the Debye beamline.
The soft IOC is reachable via the EPICS prefix X01DA-OP-MO1:BRAGG: and connected
to a motor controller via web sockets. The Mo1 Bragg positioner is not only a
positioner, but also a scan controller to setup XAS and XRD scans. A few scan modes
are programmed in the controller, e.g. simple and advanced XAS scans + XRD triggering mode.
Note: For some of the Epics PVs, in particular action buttons, the suffix .PROC and
put_complete=True is used to ensure that the action is executed completely. This is believed
to allow for a more stable execution of the action."""
import enum
import threading
@@ -34,12 +42,14 @@ class ScanControlScanStatus(int, enum.Enum):
READY = 2
RUNNING = 3
class ScanControlLoadMessage(int, enum.Enum):
""" Validation of loading the scan parameters """
"""Validation of loading the scan parameters"""
PENDING = 0
STARTED = 1
SUCCESS = 2
#TODO add here specific errors above 3-14
# TODO add here specific errors above 3-14
INVALID_SCAN_MODE = 15
@@ -77,7 +87,8 @@ class MoveTypeSignal(Signal):
class Mo1BraggStatus(Device):
# General status
"""Mo1 Bragg PVs for status monitoring"""
error_status = Cpt(EpicsSignalRO, suffix="error_status_RBV", kind="config", auto_monitor=True)
brake_enabled = Cpt(EpicsSignalRO, suffix="brake_enabled_RBV", kind="config", auto_monitor=True)
mot_commutated = Cpt(
@@ -91,117 +102,112 @@ class Mo1BraggStatus(Device):
class Mo1BraggEncoder(Device):
# Encoder reinitialization
enc_reinit = Cpt(EpicsSignal, suffix="enc_reinit", kind="config")
enc_reinit_done = Cpt(
EpicsSignalRO, suffix="enc_reinit_done_RBV", kind="config", auto_monitor=True
)
"""Mo1 Bragg PVs to communiucate with the encoder"""
enc_reinit = Cpt(EpicsSignal, suffix="enc_reinit.PROC", kind="config")
enc_reinit_done = Cpt(EpicsSignalRO, suffix="enc_reinit_done_RBV", kind="config")
class Mo1BraggCrystal(Device):
"""Class to set the crystal parameters of the Bragg positioner"""
"""Mo1 Bragg PVs to set the crystal parameters"""
offset_si111 = Cpt(EpicsSignalWithRBV, suffix="offset_si111", kind="config", auto_monitor=True)
offset_si311 = Cpt(EpicsSignalWithRBV, suffix="offset_si311", kind="config", auto_monitor=True)
xtal_enum = Cpt(EpicsSignalWithRBV, suffix="xtal_ENUM", kind="config", auto_monitor=True)
d_spacing_si111 = Cpt(
EpicsSignalWithRBV, suffix="d_spacing_si111", kind="config", auto_monitor=True
)
d_spacing_si311 = Cpt(
EpicsSignalWithRBV, suffix="d_spacing_si311", kind="config", auto_monitor=True
)
set_offset = Cpt(EpicsSignal, suffix="set_offset", kind="config")
offset_si111 = Cpt(EpicsSignalWithRBV, suffix="offset_si111", kind="config")
offset_si311 = Cpt(EpicsSignalWithRBV, suffix="offset_si311", kind="config")
xtal_enum = Cpt(EpicsSignalWithRBV, suffix="xtal_ENUM", kind="config")
d_spacing_si111 = Cpt(EpicsSignalWithRBV, suffix="d_spacing_si111", kind="config")
d_spacing_si311 = Cpt(EpicsSignalWithRBV, suffix="d_spacing_si311", kind="config")
set_offset = Cpt(EpicsSignal, suffix="set_offset.PROC", kind="config", put_complete=True)
current_xtal = Cpt(
EpicsSignalRO, suffix="current_xtal_ENUM_RBV", kind="normal", auto_monitor=True
)
# TODO Reevaluate which PVs need and should have auto_monitors + kind = normal/config!
class Mo1BraggScanSettings(Device):
"""Scan Settings for Mo1 Bragg positioner"""
"""Mo1 Bragg PVs to set the scan setttings"""
# XRD measurement settings
xrd_select_ref_enum = Cpt(
EpicsSignalWithRBV, suffix="xrd_select_ref_ENUM", kind="normal", auto_monitor=True
)
# XRD settings
xrd_select_ref_enum = Cpt(EpicsSignalWithRBV, suffix="xrd_select_ref_ENUM", kind="confit")
xrd_enable_hi_enum = Cpt(EpicsSignalWithRBV, suffix="xrd_enable_hi_ENUM", kind="config")
# High
xrd_enable_hi_enum = Cpt(
EpicsSignalWithRBV, suffix="xrd_enable_hi_ENUM", kind="normal", auto_monitor=True
)
xrd_time_hi = Cpt(EpicsSignalWithRBV, suffix="xrd_time_hi", kind="normal", auto_monitor=True)
xrd_n_trigger_hi = Cpt(
EpicsSignalWithRBV, suffix="xrd_n_trigger_hi", kind="normal", auto_monitor=True
)
xrd_every_n_hi = Cpt(
EpicsSignalWithRBV, suffix="xrd_every_n_hi", kind="normal", auto_monitor=True
)
xrd_time_hi = Cpt(EpicsSignalWithRBV, suffix="xrd_time_hi", kind="config")
xrd_n_trigger_hi = Cpt(EpicsSignalWithRBV, suffix="xrd_n_trigger_hi", kind="config")
xrd_every_n_hi = Cpt(EpicsSignalWithRBV, suffix="xrd_every_n_hi", kind="config")
# Low
xrd_enable_lo_enum = Cpt(
EpicsSignalWithRBV, suffix="xrd_enable_lo_ENUM", kind="normal", auto_monitor=True
)
xrd_time_lo = Cpt(EpicsSignalWithRBV, suffix="xrd_time_lo", kind="normal", auto_monitor=True)
xrd_n_trigger_lo = Cpt(
EpicsSignalWithRBV, suffix="xrd_n_trigger_lo", kind="normal", auto_monitor=True
)
xrd_every_n_lo = Cpt(
EpicsSignalWithRBV, suffix="xrd_every_n_lo", kind="normal", auto_monitor=True
)
xrd_enable_lo_enum = Cpt(EpicsSignalWithRBV, suffix="xrd_enable_lo_ENUM", kind="config")
xrd_time_lo = Cpt(EpicsSignalWithRBV, suffix="xrd_time_lo", kind="config")
xrd_n_trigger_lo = Cpt(EpicsSignalWithRBV, suffix="xrd_n_trigger_lo", kind="config")
xrd_every_n_lo = Cpt(EpicsSignalWithRBV, suffix="xrd_every_n_lo", kind="config")
# XAS simple scan settings
s_scan_angle_hi = Cpt(
EpicsSignalWithRBV, suffix="s_scan_angle_hi", kind="normal", auto_monitor=True
)
s_scan_angle_lo = Cpt(
EpicsSignalWithRBV, suffix="s_scan_angle_lo", kind="normal", auto_monitor=True
)
s_scan_angle_hi = Cpt(EpicsSignalWithRBV, suffix="s_scan_angle_hi", kind="config")
s_scan_angle_lo = Cpt(EpicsSignalWithRBV, suffix="s_scan_angle_lo", kind="config")
s_scan_energy_lo = Cpt(
EpicsSignalWithRBV, suffix="s_scan_energy_lo", kind="normal", auto_monitor=True
EpicsSignalWithRBV, suffix="s_scan_energy_lo", kind="config", auto_monitor=True
)
s_scan_energy_hi = Cpt(
EpicsSignalWithRBV, suffix="s_scan_energy_hi", kind="normal", auto_monitor=True
EpicsSignalWithRBV, suffix="s_scan_energy_hi", kind="config", auto_monitor=True
)
s_scan_scantime = Cpt(
EpicsSignalWithRBV, suffix="s_scan_scantime", kind="normal", auto_monitor=True
EpicsSignalWithRBV, suffix="s_scan_scantime", kind="config", auto_monitor=True
)
# XAS advanced scan settings
a_scan_pos = Cpt(EpicsSignalWithRBV, suffix="a_scan_pos", kind="normal", auto_monitor=True)
a_scan_vel = Cpt(EpicsSignalWithRBV, suffix="a_scan_vel", kind="normal", auto_monitor=True)
a_scan_time = Cpt(EpicsSignalWithRBV, suffix="a_scan_time", kind="normal", auto_monitor=True)
a_scan_pos = Cpt(EpicsSignalWithRBV, suffix="a_scan_pos", kind="config", auto_monitor=True)
a_scan_vel = Cpt(EpicsSignalWithRBV, suffix="a_scan_vel", kind="config", auto_monitor=True)
a_scan_time = Cpt(EpicsSignalWithRBV, suffix="a_scan_time", kind="config", auto_monitor=True)
class Mo1BraggScanControl(Device):
# Scan control
scan_mode_enum = Cpt(
EpicsSignalWithRBV, suffix="scan_mode_ENUM", kind="normal", auto_monitor=True
)
"""Mo1 Bragg PVs to control the scan after setting the parameters."""
scan_mode_enum = Cpt(EpicsSignalWithRBV, suffix="scan_mode_ENUM", kind="config")
scan_duration = Cpt(
EpicsSignalWithRBV, suffix="scan_duration", kind="normal", auto_monitor=True
EpicsSignalWithRBV, suffix="scan_duration", kind="config", auto_monitor=True
)
scan_load = Cpt(EpicsSignal, suffix="scan_load.PROC", kind="normal", put_complete=True)
scan_msg = Cpt(EpicsSignalRO, suffix="scan_msg_ENUM_RBV", kind="normal", auto_monitor=True)
scan_start_infinite = Cpt(EpicsSignal, suffix="scan_start_infinite", kind="normal")
scan_start_timer = Cpt(EpicsSignal, suffix="scan_start_timer", kind="normal")
scan_stop = Cpt(EpicsSignal, suffix="scan_stop", kind="normal")
scan_load = Cpt(EpicsSignal, suffix="scan_load.PROC", kind="config", put_complete=True)
scan_msg = Cpt(EpicsSignalRO, suffix="scan_msg_ENUM_RBV", kind="config", auto_monitor=True)
scan_start_infinite = Cpt(
EpicsSignal, suffix="scan_start_infinite.PROC", kind="config", put_complete=True
)
scan_start_timer = Cpt(
EpicsSignal, suffix="scan_start_timer.PROC", kind="config", put_complete=True
)
scan_stop = Cpt(EpicsSignal, suffix="scan_stop.PROC", kind="config", put_complete=True)
scan_status = Cpt(
EpicsSignalRO, suffix="scan_status_ENUM_RBV", kind="normal", auto_monitor=True
EpicsSignalRO, suffix="scan_status_ENUM_RBV", kind="config", auto_monitor=True
)
scan_time_left = Cpt(
EpicsSignalRO, suffix="scan_time_left_RBV", kind="normal", auto_monitor=True
EpicsSignalRO, suffix="scan_time_left_RBV", kind="config", auto_monitor=True
)
scan_done = Cpt(EpicsSignalRO, suffix="scan_done_RBV", kind="config", auto_monitor=True)
scan_val_reset = Cpt(
EpicsSignal, suffix="scan_val_reset.PROC", kind="config", put_complete=True
)
scan_progress = Cpt(EpicsSignalRO, suffix="scan_progress_RBV", kind="config", auto_monitor=True)
scan_spectra_done = Cpt(
EpicsSignalRO, suffix="scan_n_osc_RBV", kind="config", auto_monitor=True
)
scan_spectra_left = Cpt(
EpicsSignalRO, suffix="scan_n_osc_left_RBV", kind="config", auto_monitor=True
)
scan_done = Cpt(EpicsSignalRO, suffix="scan_done_RBV", kind="normal", auto_monitor=True)
scan_val_reset = Cpt(EpicsSignal, suffix="scan_val_reset.PROC", kind="config", put_complete=True)
scan_progress = Cpt(EpicsSignalRO, suffix="scan_progress_RBV", kind="normal", auto_monitor=True)
scan_spectra_done = Cpt(EpicsSignalRO, suffix="scan_n_osc_RBV", kind="normal", auto_monitor=True)
scan_spectra_left = Cpt(EpicsSignalRO, suffix="scan_n_osc_left_RBV", kind="normal", auto_monitor=True)
#TODO Review kind for all PVs
class Mo1Bragg(Device, PositionerBase):
"""Class for the Mo1 Bragg positioner of the Debye beamline"""
"""Class for the Mo1 Bragg positioner of the Debye beamline.
The prefix to connect to the soft IOC is X01DA-OP-MO1:BRAGG: which is connected to
the NI motor controller via web sockets.
USER_ACCESS = ["set_xtal"]
Args:
prefix (str): EPICS prefix to connect to the soft IOC
name (str): Name of the device
kind (Kind): Kind of the device
device_manager (object): Device manager object to connect to the BEC
parent (object): Parent object
kwargs: Additional keyword arguments
"""
USER_ACCESS = ["set_xtal", "stop_scan"]
crystal = Cpt(Mo1BraggCrystal, "")
encoder = Cpt(Mo1BraggEncoder, "")
@@ -210,7 +216,7 @@ class Mo1Bragg(Device, PositionerBase):
status = Cpt(Mo1BraggStatus, "")
# signal to indicate the move type 'energy' or 'angle'
move_type = Cpt(MoveTypeSignal, value=MoveType.ENERGY, kind="normal")
move_type = Cpt(MoveTypeSignal, value=MoveType.ENERGY, kind="config")
################ Motor PVs ################
# Energy PVs
@@ -220,13 +226,22 @@ class Mo1Bragg(Device, PositionerBase):
setpoint = Cpt(
EpicsSignalWithRBV, suffix="set_abs_pos_energy", kind="normal", auto_monitor=True
)
motor_is_moving = Cpt(
EpicsSignalRO, suffix="move_abs_done_RBV", kind="normal", auto_monitor=True
)
low_lim = Cpt(EpicsSignalRO, suffix="lo_lim_pos_energy_RBV", kind="config", auto_monitor=True)
high_lim = Cpt(EpicsSignalRO, suffix="hi_lim_pos_energy_RBV", kind="config", auto_monitor=True)
velocity = Cpt(EpicsSignalWithRBV, suffix="move_velocity", kind="config", auto_monitor=True)
# Angle PVs
# TODO consider making the angle a pseudo axis of the energy
# Or we could even consider making a pseudo positioner for the angle since its less often used and would
# be a more elegant solution
feedback_pos_angle = Cpt(
EpicsSignalRO, suffix="feedback_pos_angle_RBV", kind="hinted", auto_monitor=True
EpicsSignalRO, suffix="feedback_pos_angle_RBV", kind="normal", auto_monitor=True
)
setpoint_abs_angle = Cpt(
EpicsSignalWithRBV, suffix="set_abs_pos_angle", kind="normal", auto_monitor=True
)
low_limit_angle = Cpt(
EpicsSignalRO, suffix="lo_lim_pos_angle_RBV", kind="config", auto_monitor=True
@@ -234,20 +249,14 @@ class Mo1Bragg(Device, PositionerBase):
high_limit_angle = Cpt(
EpicsSignalRO, suffix="hi_lim_pos_angle_RBV", kind="config", auto_monitor=True
)
setpoint_abs_angle = Cpt(
EpicsSignalWithRBV, suffix="set_abs_pos_angle", kind="normal", auto_monitor=True
)
# Execute motion
move_abs = Cpt(EpicsSignal, suffix="move_abs.PROC", kind="config", put_complete=True)
move_stop = Cpt(EpicsSignal, suffix="move_stop", kind="config")
motor_is_moving = Cpt(
EpicsSignalRO, suffix="move_abs_done_RBV", kind="normal", auto_monitor=True
)
move_stop = Cpt(EpicsSignal, suffix="move_stop.PROC", kind="config", put_complete=True)
SUB_READBACK = "readback"
_default_sub = SUB_READBACK
SUB_PROGRESS = 'progress'
SUB_PROGRESS = "progress"
def __init__(
self, prefix="", *, name: str, kind: Kind = None, device_manager=None, parent=None, **kwargs
@@ -256,16 +265,17 @@ class Mo1Bragg(Device, PositionerBase):
self._stopped = False
self.device_manager = device_manager
self._move_thread = None
# TODO We need to investigate whether we could make the angle/energy a pseudo axis and only listen to a single feedback.
# This would follow more the idea of the motor implementation, i.e. problem with renaming the readback PV.
self.readback.name = self.name
self.service_cfg = None
self.scaninfo = None
self.scan_time = None
self.scan_duration = None
self.start = None
self.end = None
self.timeout_for_pvwait = 2.5
self.readback.name = self.name
# Wait for connection on all components, ensure IOC is connected
self.wait_for_connection(all_signals=True, timeout=5)
if device_manager:
self.device_manager = device_manager
@@ -274,20 +284,29 @@ class Mo1Bragg(Device, PositionerBase):
self.connector = self.device_manager.connector
self._update_scaninfo()
self._init()
self._on_init()
def _init(self):
def _on_init(self):
"""Action to be executed on initialization of the device"""
self.scan_control.scan_progress.subscribe(self._progress_update, run=False)
def _progress_update(self, value, **kwargs):
max_value=100
def _progress_update(self, value, **kwargs) -> None:
"""Callback method to update the scan progress, runs a callback to SUB_PROGRESS subscribers, i.e. BEC.
Args:
value (int) : current progress value
"""
max_value = 100
logger.info(f"Progress at {value}")
self._run_subs(sub_type=self.SUB_PROGRESS, value=value, max_value = max_value, done=bool(max_value == value))
self._run_subs(
sub_type=self.SUB_PROGRESS,
value=value,
max_value=max_value,
done=bool(max_value == value),
)
def _update_scaninfo(self) -> None:
"""Update scaninfo from BecScaninfoMixing
This depends on device manager and operation/sim_mode
"""
"""Connect to the ScanInfo mixin"""
self.scaninfo = bec_scaninfo_mixin.BecScaninfoMixin(self.device_manager)
self.scaninfo.load_scan_metadata()
@@ -322,9 +341,14 @@ class Mo1Bragg(Device, PositionerBase):
move_cpt = self.readback if move_type == MoveType.ENERGY else self.feedback_pos_angle
return move_cpt.get()
@property
def stopped(self) -> bool:
"""Return the stopped flag. If True, the motion is stopped."""
return self._stopped
# pylint: disable=arguments-differ
def check_value(self, value: float) -> None:
"""Check if value is within limits
"""Method to check if a value is within limits of the positioner. Called by PositionerBase.move()
Args:
value (float) : value to move axis to.
@@ -335,18 +359,27 @@ class Mo1Bragg(Device, PositionerBase):
raise LimitError(f"position={value} not within limits {self.limits}")
def stop(self, *, success=False) -> None:
"""Stop any motion on the positioner"""
# Stop the motion
#TODO decide appropriate stop procedure
self.stop_scan()
"""Stop any motion on the positioner
Args:
success (bool) : Flag to indicate if the motion was successful
"""
# Stop any motion on the device
self.move_stop.put(1)
# Move thread is stopped too
self._stopped = True
# Stop ongoing scan, check if this is still needed after work on th controller
self.stop_scan()
# Stop the move thread
if self._move_thread is not None:
self._move_thread.join()
self._move_thread = None
super().stop(success=success)
def stop_scan(self) -> None:
"""Stop the currently running scan gracefully, this finishes the running oscillation."""
self.scan_control.scan_stop.put(1)
self._stopped = True
def _move_and_finish(
self,
target_pos: float,
@@ -355,13 +388,16 @@ class Mo1Bragg(Device, PositionerBase):
status: DeviceStatus,
update_frequency: float = 0.1,
) -> None:
"""Move the simulated device and finish the motion.
"""Method to be called in the move thread to move the Bragg positioner to the target position.
Args:
target_pos (float) : target position for the motion
status (DeviceStatus) : status object to set the status of the motion
target_pos (float) : target position for the motion
move_cpt (Cpt) : component to set the target position on the IOC,
either setpoint or setpoint_abs_angle depending on the move type
read_cpt (Cpt) : component to read the current position of the motion, readback or feedback_pos_angle
status (DeviceStatus) : status object to set the status of the motion
update_frequency (float): Optional, frequency to update the current position of the motion, defaults to 0.1s
"""
#TODO if called in sequence, the move resolves directly.
success = True
exception = None
try:
@@ -369,34 +405,34 @@ class Mo1Bragg(Device, PositionerBase):
move_cpt.put(target_pos)
# Start motion
self.move_abs.put(1)
# Sleep needed due to updated frequency to soft IOC
# Currently sleep is needed due to delay in updates on PVs, maybe time can be reduced
time.sleep(0.5)
while self.motor_is_moving.get() == 0:
# TODO check if the _run_subs is needed since we have an auto_monitor on the readback PV
# However, since the move_type can change, it might be necessary to have it here
val = read_cpt.get()
self._run_subs(sub_type=self.SUB_READBACK, value=val)
logger.info(f"Current position of {self.name} is {val}")
if self._stopped:
if self.stopped:
success = False
break
time.sleep(update_frequency)
# pylint: disable=broad-except
except Exception as exc:
content = traceback.format_exc()
logger.error(f"Error in move thread of device {self.name}: {content}")
# status._finished(success=success)
exception = exc
#status.set_exception(exc=exc)
finally:
if exception:
status.set_exception(exc=exception)
else:
# The set_finished method does not allow passing success, so we use this one for the moment.
# pylint: disable=protected-access
status._finished(success=success)
def move(self, value: float, move_type: str | MoveType = None, **kwargs) -> DeviceStatus:
"""Move the Bragg positioner to the specified value.
The motion considers the current move_type, and allows it to be adjusted if specified.
This allows the motion to switch between energy and angle moves.
"""Move the Bragg positioner to the specified value, allows to switch between move types angle and energy.
Args:
value (float) : target value for the motion
@@ -409,12 +445,12 @@ class Mo1Bragg(Device, PositionerBase):
if move_type is not None:
self.move_type.put(move_type)
move_type = self.move_type.get()
# This checks also if the value is within the limits
self.check_value(value)
status = DeviceStatus(device=self)
# Get the right move/read component for the current move_type
move_cpt = self.setpoint if move_type == MoveType.ENERGY else self.setpoint_abs_angle
read_cpt = self.readback if move_type == MoveType.ENERGY else self.feedback_pos_angle
self.check_value(value)
status = DeviceStatus(device=self)
self._move_thread = threading.Thread(
target=self._move_and_finish, args=(value, move_cpt, read_cpt, status, 0.1)
)
@@ -455,10 +491,9 @@ class Mo1Bragg(Device, PositionerBase):
f"Invalid argument for xtal_enum : {xtal_enum}, choose from '111' or '311'"
)
self.crystal.xtal_enum.put(crystal_set)
# Send the new settings from the IOC to the motor controller
self.crystal.set_offset.put(1)
def set_xas_settings(self, low: float, high: float, scan_time: float):
def set_xas_settings(self, low: float, high: float, scan_time: float) -> None:
"""Set XAS parameters for upcoming scan.
Args:
@@ -475,15 +510,21 @@ class Mo1Bragg(Device, PositionerBase):
self.scan_settings.s_scan_angle_hi.put(high)
self.scan_settings.s_scan_scantime.put(scan_time)
def set_xrd_settings(self, enable: bool):
def set_xrd_settings(self, enable: bool) -> None:
"""Set XRD settings for the upcoming scan."""
self.scan_settings.xrd_enable_hi_enum.put(int(enable))
self.scan_settings.xrd_enable_lo_enum.put(int(enable))
# TODO add here the remaining settings that are possible for XRD
def set_scan_control_settings(self, mode: ScanControlMode, scan_duration: float):
"""Set the scan control settings for the upcoming scan."""
val = ScanControlMode(mode)
self.scan_control.scan_mode_enum.put(val) # 0 Simple 1 Advance
def set_scan_control_settings(self, mode: ScanControlMode, scan_duration: float) -> None:
"""Set the scan control settings for the upcoming scan.
Args:
mode (ScanControlMode): Mode for the scan, either simple or advanced
scan_duration (float): Duration of the scan
"""
val = ScanControlMode(mode).value
self.scan_control.scan_mode_enum.put(val)
self.scan_control.scan_duration.put(scan_duration)
def setup_simple_xas_scan(
@@ -502,14 +543,13 @@ class Mo1Bragg(Device, PositionerBase):
high (float): High energy/angle value of the scan
scan_time (float): Time for a half oscillation
mode (Literal["simple", "advanced"]): Pick a mode for the oscillation motion, either simple or advanced (needs to be configured)
scan_duration (float): Duration of the scan
scan_duration (float): Duration of the scan, if <0.1s the scan will be started in infinite motion mode
"""
# TODO check if maybe we want an extra argument for infinite or finite motion
self.set_xas_settings(low=low, high=high, scan_time=scan_time)
self.set_xrd_settings(False)
self.set_scan_control_settings(mode=mode, scan_duration=scan_duration)
self.scan_control.scan_load.put(1)
def kickoff(self):
"""Kickoff the device, called from BEC."""
@@ -549,34 +589,33 @@ class Mo1Bragg(Device, PositionerBase):
def _get_scaninfo_parameters(self):
"""Get the scaninfo parameters for the scan."""
self.start, self.end = self.scaninfo.scan_msg.info["positions"]
# scan_time and scan_duration have to be extracted from scan_msg string for the moment, will be changed in future
scan_time_pattern = r"scan_time': (\d+\.?\d*)"
scan_duration_pattern = r"scan_duration': (\d+\.?\d*)"
scan_dur_pattern = r"scan_duration': (\d+\.?\d*)"
pattern_match = re.search(scan_time_pattern, self.scaninfo.scan_msg.info["scan_msgs"][0])
self.scan_time = float(pattern_match.group(1))
pattern_match = re.search(
scan_duration_pattern, self.scaninfo.scan_msg.info["scan_msgs"][0]
)
pattern_match = re.search(scan_dur_pattern, self.scaninfo.scan_msg.info["scan_msgs"][0])
self.scan_duration = float(pattern_match.group(1))
def on_stage(self) -> None:
"""Actions to be executed when the device is staged."""
if not self.scaninfo.scan_type == "xas":
if not self.scaninfo.scan_type == "fly":
return
# Assume that scan_val_reset was usee in unstage, if required, do it again but then we have to introduce a sleep ~1
if self.scan_control.scan_msg.get() != ScanControlLoadMessage.PENDING:
self.scan_control.scan_val_reset.put(1)#.set()
time.sleep(1)
self.scan_control.scan_val_reset.put(1)
# Wait for reset to be done
# TODO check if this sleep can really be removed
# time.sleep(1)
self._get_scaninfo_parameters()
# Wait for check to go to Pending
if not self.wait_for_signals(
signal_conditions=[
(self.scan_control.scan_msg.get, ScanControlLoadMessage.PENDING)
],
signal_conditions=[(self.scan_control.scan_msg.get, ScanControlLoadMessage.PENDING)],
timeout=self.timeout_for_pvwait,
check_stopped=True,
):
raise TimeoutError(
f"Timeout while waiting for scan status to get ready. State: {self.scan_control.scan_status.get()}"
f"Timeout after {self.timeout_for_pvwait} while waiting for scan status, current state: {self.scan_control.scan_status.get()}"
)
# Add here the logic to setup different type of scans
self.setup_simple_xas_scan(
@@ -586,11 +625,9 @@ class Mo1Bragg(Device, PositionerBase):
mode=ScanControlMode.SIMPLE,
scan_duration=self.scan_duration,
)
#Wait for params to be checked from controller
# Wait for params to be checked from controller
if not self.wait_for_signals(
signal_conditions=[
(self.scan_control.scan_msg.get, ScanControlLoadMessage.SUCCESS)
],
signal_conditions=[(self.scan_control.scan_msg.get, ScanControlLoadMessage.SUCCESS)],
timeout=self.timeout_for_pvwait,
check_stopped=True,
):
@@ -598,13 +635,10 @@ class Mo1Bragg(Device, PositionerBase):
f"Scan parameter validation run into timeout after {self.timeout_for_pvwait} with {self.scan_control.scan_status.get()}"
)
def complete(self) -> None:
"""Complete the acquisition, called from BEC.
def complete(self) -> DeviceStatus:
"""Complete the acquisition.
This function is called after the scan is complete, just before unstage.
We can check here with the data backend and detector if the acquisition successfully finished.
Actions are implemented in custom_prepare.on_complete since they are beamline specific.
The method returns a DeviceStatus object that resolves to set_finished or set_exception once the acquisition is completed.
"""
status = self.on_complete()
if isinstance(status, DeviceStatus):
@@ -614,11 +648,7 @@ class Mo1Bragg(Device, PositionerBase):
return status
def on_complete(self) -> DeviceStatus:
"""
Actions to be executed when the device is completed.
Implement for instance checks if the acquisition was successful.
"""
# Create DeviceStatus object with hook to finishing the scan.
"""Specify actions to be performed for the completion of the acquisition."""
status = self.wait_with_status(
signal_conditions=[(self.scan_control.scan_done.get, 1)],
timeout=None,
@@ -626,12 +656,6 @@ class Mo1Bragg(Device, PositionerBase):
)
return status
def stop_scan(self) -> None:
"""Stop the scan manually."""
self.scan_control.scan_stop.put(1)
#TODO Check if a single variable for stop scan and stop motion is sufficient.
self._stopped = True
def unstage(self) -> list[object]:
"""
Unstage device after a scan. It has to be possible to call this multiple times.
@@ -648,7 +672,7 @@ class Mo1Bragg(Device, PositionerBase):
def on_unstage(self) -> None:
"""Actions to be executed when the device is unstaged."""
# Reset setting of paramters after scan to be ready and clean for the next scan
# Reset scan parameter validation
self.scan_control.scan_val_reset.put(1)
def check_scan_id(self) -> None:
@@ -666,13 +690,13 @@ class Mo1Bragg(Device, PositionerBase):
interval: float = 0.05,
all_signals: bool = False,
) -> bool:
"""
Convenience wrapper to allow waiting for signals to reach a certain condition.
"""Wrapper around a list of conditions that allows waiting for them to become True.
For EPICs PVs, an example usage is pasted at the bottom.
Args:
signal_conditions (list[tuple]): tuple of executable calls for conditions (get_current_state, condition) to check
timeout (float): timeout in seconds
check_stopped (bool): True if stopped flag should be checked. The function relies on the self.stopped property to be set
interval (float): interval in seconds
all_signals (bool): True if all signals should be True, False if any signal should be True
@@ -689,7 +713,7 @@ class Mo1Bragg(Device, PositionerBase):
get_current_state() == condition
for get_current_state, condition in signal_conditions
]
if check_stopped is True and self._stopped is True:
if check_stopped is True and self.stopped is True:
return False
if (all_signals and all(checks)) or (not all_signals and any(checks)):
return True
@@ -707,16 +731,9 @@ class Mo1Bragg(Device, PositionerBase):
all_signals: bool = False,
exception_on_timeout: Exception = TimeoutError("Timeout while waiting for signals"),
) -> DeviceStatus:
"""Utility function to wait for signals in a thread.
Returns a DevicesStatus object that resolves either to set_finished or set_exception.
The DeviceStatus is attached to the parent device, i.e. the detector object inheriting from PSIDetectorBase.
Usage:
This function should be used to wait for signals to reach a certain condition, especially in the context of
on_trigger and on_complete. If it is not used, functions may block and slow down the performance of BEC.
It will return a DeviceStatus object that is to be returned from the function. Once the conditions are met,
the DeviceStatus will be set to set_finished in case of success or set_exception in case of a timeout or exception.
The exception can be specified with the exception_on_timeout argument. The default exception is a TimeoutError.
"""Wrapper around wait_for_signals to be started in thread and attach a DeviceStatus object.
This allows BEC to perform actinos in parallel and not be blocked by method calls on a device.
Typically used for on_trigger, on_complete methods or also the kickoff.
Args:
signal_conditions (list[tuple]): tuple of executable calls for conditions (get_current_state, condition) to check
@@ -732,7 +749,6 @@ class Mo1Bragg(Device, PositionerBase):
status = DeviceStatus(device=self)
# utility function to wrap the wait_for_signals function
def wait_for_signals_wrapper(
status: DeviceStatus,
signal_conditions: list[tuple],
@@ -761,6 +777,7 @@ class Mo1Bragg(Device, PositionerBase):
status.set_finished()
else:
status.set_exception(exception_on_timeout)
# pylint: disable=broad-except
except Exception as exc:
status.set_exception(exc=exc)
@@ -779,8 +796,3 @@ class Mo1Bragg(Device, PositionerBase):
)
thread.start()
return status
if __name__ == "__main__":
# bragg = Mo1Bragg(name='bragg', prefix="X01DA-OP-MO1:BRAGG:")
pass