From 5201bbf48088092a61b21e0fe572a2868adfa751 Mon Sep 17 00:00:00 2001 From: appel_c Date: Fri, 19 Jul 2024 08:54:40 +0200 Subject: [PATCH] refactor: refactored methods, changed PV kinds and added more info to docstrings --- debye_bec/devices/mo1_bragg.py | 384 +++++++++++++++++---------------- 1 file changed, 198 insertions(+), 186 deletions(-) diff --git a/debye_bec/devices/mo1_bragg.py b/debye_bec/devices/mo1_bragg.py index d089723..1c3c29c 100644 --- a/debye_bec/devices/mo1_bragg.py +++ b/debye_bec/devices/mo1_bragg.py @@ -1,4 +1,12 @@ -""" Module for the Mo1 Bragg positioner of the Debye beamline""" +""" Module for the Mo1 Bragg positioner of the Debye beamline. +The soft IOC is reachable via the EPICS prefix X01DA-OP-MO1:BRAGG: and connected +to a motor controller via web sockets. The Mo1 Bragg positioner is not only a +positioner, but also a scan controller to setup XAS and XRD scans. A few scan modes +are programmed in the controller, e.g. simple and advanced XAS scans + XRD triggering mode. + +Note: For some of the Epics PVs, in particular action buttons, the suffix .PROC and +put_complete=True is used to ensure that the action is executed completely. This is believed +to allow for a more stable execution of the action.""" import enum import threading @@ -34,12 +42,14 @@ class ScanControlScanStatus(int, enum.Enum): READY = 2 RUNNING = 3 + class ScanControlLoadMessage(int, enum.Enum): - """ Validation of loading the scan parameters """ + """Validation of loading the scan parameters""" + PENDING = 0 STARTED = 1 SUCCESS = 2 - #TODO add here specific errors above 3-14 + # TODO add here specific errors above 3-14 INVALID_SCAN_MODE = 15 @@ -77,7 +87,8 @@ class MoveTypeSignal(Signal): class Mo1BraggStatus(Device): - # General status + """Mo1 Bragg PVs for status monitoring""" + error_status = Cpt(EpicsSignalRO, suffix="error_status_RBV", kind="config", auto_monitor=True) brake_enabled = Cpt(EpicsSignalRO, suffix="brake_enabled_RBV", kind="config", auto_monitor=True) mot_commutated = Cpt( @@ -91,117 +102,112 @@ class Mo1BraggStatus(Device): class Mo1BraggEncoder(Device): - # Encoder reinitialization - enc_reinit = Cpt(EpicsSignal, suffix="enc_reinit", kind="config") - enc_reinit_done = Cpt( - EpicsSignalRO, suffix="enc_reinit_done_RBV", kind="config", auto_monitor=True - ) + """Mo1 Bragg PVs to communiucate with the encoder""" + + enc_reinit = Cpt(EpicsSignal, suffix="enc_reinit.PROC", kind="config") + enc_reinit_done = Cpt(EpicsSignalRO, suffix="enc_reinit_done_RBV", kind="config") class Mo1BraggCrystal(Device): - """Class to set the crystal parameters of the Bragg positioner""" + """Mo1 Bragg PVs to set the crystal parameters""" - offset_si111 = Cpt(EpicsSignalWithRBV, suffix="offset_si111", kind="config", auto_monitor=True) - offset_si311 = Cpt(EpicsSignalWithRBV, suffix="offset_si311", kind="config", auto_monitor=True) - xtal_enum = Cpt(EpicsSignalWithRBV, suffix="xtal_ENUM", kind="config", auto_monitor=True) - d_spacing_si111 = Cpt( - EpicsSignalWithRBV, suffix="d_spacing_si111", kind="config", auto_monitor=True - ) - d_spacing_si311 = Cpt( - EpicsSignalWithRBV, suffix="d_spacing_si311", kind="config", auto_monitor=True - ) - set_offset = Cpt(EpicsSignal, suffix="set_offset", kind="config") + offset_si111 = Cpt(EpicsSignalWithRBV, suffix="offset_si111", kind="config") + offset_si311 = Cpt(EpicsSignalWithRBV, suffix="offset_si311", kind="config") + xtal_enum = Cpt(EpicsSignalWithRBV, suffix="xtal_ENUM", kind="config") + d_spacing_si111 = Cpt(EpicsSignalWithRBV, suffix="d_spacing_si111", kind="config") + d_spacing_si311 = Cpt(EpicsSignalWithRBV, suffix="d_spacing_si311", kind="config") + set_offset = Cpt(EpicsSignal, suffix="set_offset.PROC", kind="config", put_complete=True) current_xtal = Cpt( EpicsSignalRO, suffix="current_xtal_ENUM_RBV", kind="normal", auto_monitor=True ) -# TODO Reevaluate which PVs need and should have auto_monitors + kind = normal/config! class Mo1BraggScanSettings(Device): - """Scan Settings for Mo1 Bragg positioner""" + """Mo1 Bragg PVs to set the scan setttings""" - # XRD measurement settings - xrd_select_ref_enum = Cpt( - EpicsSignalWithRBV, suffix="xrd_select_ref_ENUM", kind="normal", auto_monitor=True - ) + # XRD settings + xrd_select_ref_enum = Cpt(EpicsSignalWithRBV, suffix="xrd_select_ref_ENUM", kind="confit") + xrd_enable_hi_enum = Cpt(EpicsSignalWithRBV, suffix="xrd_enable_hi_ENUM", kind="config") - # High - xrd_enable_hi_enum = Cpt( - EpicsSignalWithRBV, suffix="xrd_enable_hi_ENUM", kind="normal", auto_monitor=True - ) - xrd_time_hi = Cpt(EpicsSignalWithRBV, suffix="xrd_time_hi", kind="normal", auto_monitor=True) - xrd_n_trigger_hi = Cpt( - EpicsSignalWithRBV, suffix="xrd_n_trigger_hi", kind="normal", auto_monitor=True - ) - xrd_every_n_hi = Cpt( - EpicsSignalWithRBV, suffix="xrd_every_n_hi", kind="normal", auto_monitor=True - ) + xrd_time_hi = Cpt(EpicsSignalWithRBV, suffix="xrd_time_hi", kind="config") + xrd_n_trigger_hi = Cpt(EpicsSignalWithRBV, suffix="xrd_n_trigger_hi", kind="config") + xrd_every_n_hi = Cpt(EpicsSignalWithRBV, suffix="xrd_every_n_hi", kind="config") - # Low - xrd_enable_lo_enum = Cpt( - EpicsSignalWithRBV, suffix="xrd_enable_lo_ENUM", kind="normal", auto_monitor=True - ) - xrd_time_lo = Cpt(EpicsSignalWithRBV, suffix="xrd_time_lo", kind="normal", auto_monitor=True) - xrd_n_trigger_lo = Cpt( - EpicsSignalWithRBV, suffix="xrd_n_trigger_lo", kind="normal", auto_monitor=True - ) - xrd_every_n_lo = Cpt( - EpicsSignalWithRBV, suffix="xrd_every_n_lo", kind="normal", auto_monitor=True - ) + xrd_enable_lo_enum = Cpt(EpicsSignalWithRBV, suffix="xrd_enable_lo_ENUM", kind="config") + xrd_time_lo = Cpt(EpicsSignalWithRBV, suffix="xrd_time_lo", kind="config") + xrd_n_trigger_lo = Cpt(EpicsSignalWithRBV, suffix="xrd_n_trigger_lo", kind="config") + xrd_every_n_lo = Cpt(EpicsSignalWithRBV, suffix="xrd_every_n_lo", kind="config") # XAS simple scan settings - s_scan_angle_hi = Cpt( - EpicsSignalWithRBV, suffix="s_scan_angle_hi", kind="normal", auto_monitor=True - ) - s_scan_angle_lo = Cpt( - EpicsSignalWithRBV, suffix="s_scan_angle_lo", kind="normal", auto_monitor=True - ) + s_scan_angle_hi = Cpt(EpicsSignalWithRBV, suffix="s_scan_angle_hi", kind="config") + s_scan_angle_lo = Cpt(EpicsSignalWithRBV, suffix="s_scan_angle_lo", kind="config") s_scan_energy_lo = Cpt( - EpicsSignalWithRBV, suffix="s_scan_energy_lo", kind="normal", auto_monitor=True + EpicsSignalWithRBV, suffix="s_scan_energy_lo", kind="config", auto_monitor=True ) s_scan_energy_hi = Cpt( - EpicsSignalWithRBV, suffix="s_scan_energy_hi", kind="normal", auto_monitor=True + EpicsSignalWithRBV, suffix="s_scan_energy_hi", kind="config", auto_monitor=True ) s_scan_scantime = Cpt( - EpicsSignalWithRBV, suffix="s_scan_scantime", kind="normal", auto_monitor=True + EpicsSignalWithRBV, suffix="s_scan_scantime", kind="config", auto_monitor=True ) # XAS advanced scan settings - a_scan_pos = Cpt(EpicsSignalWithRBV, suffix="a_scan_pos", kind="normal", auto_monitor=True) - a_scan_vel = Cpt(EpicsSignalWithRBV, suffix="a_scan_vel", kind="normal", auto_monitor=True) - a_scan_time = Cpt(EpicsSignalWithRBV, suffix="a_scan_time", kind="normal", auto_monitor=True) + a_scan_pos = Cpt(EpicsSignalWithRBV, suffix="a_scan_pos", kind="config", auto_monitor=True) + a_scan_vel = Cpt(EpicsSignalWithRBV, suffix="a_scan_vel", kind="config", auto_monitor=True) + a_scan_time = Cpt(EpicsSignalWithRBV, suffix="a_scan_time", kind="config", auto_monitor=True) class Mo1BraggScanControl(Device): - # Scan control - scan_mode_enum = Cpt( - EpicsSignalWithRBV, suffix="scan_mode_ENUM", kind="normal", auto_monitor=True - ) + """Mo1 Bragg PVs to control the scan after setting the parameters.""" + + scan_mode_enum = Cpt(EpicsSignalWithRBV, suffix="scan_mode_ENUM", kind="config") scan_duration = Cpt( - EpicsSignalWithRBV, suffix="scan_duration", kind="normal", auto_monitor=True + EpicsSignalWithRBV, suffix="scan_duration", kind="config", auto_monitor=True ) - scan_load = Cpt(EpicsSignal, suffix="scan_load.PROC", kind="normal", put_complete=True) - scan_msg = Cpt(EpicsSignalRO, suffix="scan_msg_ENUM_RBV", kind="normal", auto_monitor=True) - scan_start_infinite = Cpt(EpicsSignal, suffix="scan_start_infinite", kind="normal") - scan_start_timer = Cpt(EpicsSignal, suffix="scan_start_timer", kind="normal") - scan_stop = Cpt(EpicsSignal, suffix="scan_stop", kind="normal") + scan_load = Cpt(EpicsSignal, suffix="scan_load.PROC", kind="config", put_complete=True) + scan_msg = Cpt(EpicsSignalRO, suffix="scan_msg_ENUM_RBV", kind="config", auto_monitor=True) + scan_start_infinite = Cpt( + EpicsSignal, suffix="scan_start_infinite.PROC", kind="config", put_complete=True + ) + scan_start_timer = Cpt( + EpicsSignal, suffix="scan_start_timer.PROC", kind="config", put_complete=True + ) + scan_stop = Cpt(EpicsSignal, suffix="scan_stop.PROC", kind="config", put_complete=True) scan_status = Cpt( - EpicsSignalRO, suffix="scan_status_ENUM_RBV", kind="normal", auto_monitor=True + EpicsSignalRO, suffix="scan_status_ENUM_RBV", kind="config", auto_monitor=True ) scan_time_left = Cpt( - EpicsSignalRO, suffix="scan_time_left_RBV", kind="normal", auto_monitor=True + EpicsSignalRO, suffix="scan_time_left_RBV", kind="config", auto_monitor=True + ) + scan_done = Cpt(EpicsSignalRO, suffix="scan_done_RBV", kind="config", auto_monitor=True) + scan_val_reset = Cpt( + EpicsSignal, suffix="scan_val_reset.PROC", kind="config", put_complete=True + ) + scan_progress = Cpt(EpicsSignalRO, suffix="scan_progress_RBV", kind="config", auto_monitor=True) + scan_spectra_done = Cpt( + EpicsSignalRO, suffix="scan_n_osc_RBV", kind="config", auto_monitor=True + ) + scan_spectra_left = Cpt( + EpicsSignalRO, suffix="scan_n_osc_left_RBV", kind="config", auto_monitor=True ) - scan_done = Cpt(EpicsSignalRO, suffix="scan_done_RBV", kind="normal", auto_monitor=True) - scan_val_reset = Cpt(EpicsSignal, suffix="scan_val_reset.PROC", kind="config", put_complete=True) - scan_progress = Cpt(EpicsSignalRO, suffix="scan_progress_RBV", kind="normal", auto_monitor=True) - scan_spectra_done = Cpt(EpicsSignalRO, suffix="scan_n_osc_RBV", kind="normal", auto_monitor=True) - scan_spectra_left = Cpt(EpicsSignalRO, suffix="scan_n_osc_left_RBV", kind="normal", auto_monitor=True) -#TODO Review kind for all PVs + class Mo1Bragg(Device, PositionerBase): - """Class for the Mo1 Bragg positioner of the Debye beamline""" + """Class for the Mo1 Bragg positioner of the Debye beamline. + The prefix to connect to the soft IOC is X01DA-OP-MO1:BRAGG: which is connected to + the NI motor controller via web sockets. - USER_ACCESS = ["set_xtal"] + Args: + prefix (str): EPICS prefix to connect to the soft IOC + name (str): Name of the device + kind (Kind): Kind of the device + device_manager (object): Device manager object to connect to the BEC + parent (object): Parent object + kwargs: Additional keyword arguments + + """ + + USER_ACCESS = ["set_xtal", "stop_scan"] crystal = Cpt(Mo1BraggCrystal, "") encoder = Cpt(Mo1BraggEncoder, "") @@ -210,7 +216,7 @@ class Mo1Bragg(Device, PositionerBase): status = Cpt(Mo1BraggStatus, "") # signal to indicate the move type 'energy' or 'angle' - move_type = Cpt(MoveTypeSignal, value=MoveType.ENERGY, kind="normal") + move_type = Cpt(MoveTypeSignal, value=MoveType.ENERGY, kind="config") ################ Motor PVs ################ # Energy PVs @@ -220,13 +226,22 @@ class Mo1Bragg(Device, PositionerBase): setpoint = Cpt( EpicsSignalWithRBV, suffix="set_abs_pos_energy", kind="normal", auto_monitor=True ) + motor_is_moving = Cpt( + EpicsSignalRO, suffix="move_abs_done_RBV", kind="normal", auto_monitor=True + ) low_lim = Cpt(EpicsSignalRO, suffix="lo_lim_pos_energy_RBV", kind="config", auto_monitor=True) high_lim = Cpt(EpicsSignalRO, suffix="hi_lim_pos_energy_RBV", kind="config", auto_monitor=True) velocity = Cpt(EpicsSignalWithRBV, suffix="move_velocity", kind="config", auto_monitor=True) # Angle PVs + # TODO consider making the angle a pseudo axis of the energy + # Or we could even consider making a pseudo positioner for the angle since its less often used and would + # be a more elegant solution feedback_pos_angle = Cpt( - EpicsSignalRO, suffix="feedback_pos_angle_RBV", kind="hinted", auto_monitor=True + EpicsSignalRO, suffix="feedback_pos_angle_RBV", kind="normal", auto_monitor=True + ) + setpoint_abs_angle = Cpt( + EpicsSignalWithRBV, suffix="set_abs_pos_angle", kind="normal", auto_monitor=True ) low_limit_angle = Cpt( EpicsSignalRO, suffix="lo_lim_pos_angle_RBV", kind="config", auto_monitor=True @@ -234,20 +249,14 @@ class Mo1Bragg(Device, PositionerBase): high_limit_angle = Cpt( EpicsSignalRO, suffix="hi_lim_pos_angle_RBV", kind="config", auto_monitor=True ) - setpoint_abs_angle = Cpt( - EpicsSignalWithRBV, suffix="set_abs_pos_angle", kind="normal", auto_monitor=True - ) # Execute motion move_abs = Cpt(EpicsSignal, suffix="move_abs.PROC", kind="config", put_complete=True) - move_stop = Cpt(EpicsSignal, suffix="move_stop", kind="config") - motor_is_moving = Cpt( - EpicsSignalRO, suffix="move_abs_done_RBV", kind="normal", auto_monitor=True - ) + move_stop = Cpt(EpicsSignal, suffix="move_stop.PROC", kind="config", put_complete=True) SUB_READBACK = "readback" _default_sub = SUB_READBACK - SUB_PROGRESS = 'progress' + SUB_PROGRESS = "progress" def __init__( self, prefix="", *, name: str, kind: Kind = None, device_manager=None, parent=None, **kwargs @@ -256,16 +265,17 @@ class Mo1Bragg(Device, PositionerBase): self._stopped = False self.device_manager = device_manager self._move_thread = None - # TODO We need to investigate whether we could make the angle/energy a pseudo axis and only listen to a single feedback. - # This would follow more the idea of the motor implementation, i.e. problem with renaming the readback PV. - self.readback.name = self.name self.service_cfg = None self.scaninfo = None self.scan_time = None self.scan_duration = None self.start = None self.end = None + self.timeout_for_pvwait = 2.5 + self.readback.name = self.name + # Wait for connection on all components, ensure IOC is connected + self.wait_for_connection(all_signals=True, timeout=5) if device_manager: self.device_manager = device_manager @@ -274,20 +284,29 @@ class Mo1Bragg(Device, PositionerBase): self.connector = self.device_manager.connector self._update_scaninfo() - self._init() + self._on_init() - def _init(self): + def _on_init(self): + """Action to be executed on initialization of the device""" self.scan_control.scan_progress.subscribe(self._progress_update, run=False) - def _progress_update(self, value, **kwargs): - max_value=100 + def _progress_update(self, value, **kwargs) -> None: + """Callback method to update the scan progress, runs a callback to SUB_PROGRESS subscribers, i.e. BEC. + + Args: + value (int) : current progress value + """ + max_value = 100 logger.info(f"Progress at {value}") - self._run_subs(sub_type=self.SUB_PROGRESS, value=value, max_value = max_value, done=bool(max_value == value)) + self._run_subs( + sub_type=self.SUB_PROGRESS, + value=value, + max_value=max_value, + done=bool(max_value == value), + ) def _update_scaninfo(self) -> None: - """Update scaninfo from BecScaninfoMixing - This depends on device manager and operation/sim_mode - """ + """Connect to the ScanInfo mixin""" self.scaninfo = bec_scaninfo_mixin.BecScaninfoMixin(self.device_manager) self.scaninfo.load_scan_metadata() @@ -322,9 +341,14 @@ class Mo1Bragg(Device, PositionerBase): move_cpt = self.readback if move_type == MoveType.ENERGY else self.feedback_pos_angle return move_cpt.get() + @property + def stopped(self) -> bool: + """Return the stopped flag. If True, the motion is stopped.""" + return self._stopped + # pylint: disable=arguments-differ def check_value(self, value: float) -> None: - """Check if value is within limits + """Method to check if a value is within limits of the positioner. Called by PositionerBase.move() Args: value (float) : value to move axis to. @@ -335,18 +359,27 @@ class Mo1Bragg(Device, PositionerBase): raise LimitError(f"position={value} not within limits {self.limits}") def stop(self, *, success=False) -> None: - """Stop any motion on the positioner""" - # Stop the motion - #TODO decide appropriate stop procedure - self.stop_scan() + """Stop any motion on the positioner + + Args: + success (bool) : Flag to indicate if the motion was successful + """ + # Stop any motion on the device self.move_stop.put(1) - # Move thread is stopped too self._stopped = True + # Stop ongoing scan, check if this is still needed after work on th controller + self.stop_scan() + # Stop the move thread if self._move_thread is not None: self._move_thread.join() self._move_thread = None super().stop(success=success) + def stop_scan(self) -> None: + """Stop the currently running scan gracefully, this finishes the running oscillation.""" + self.scan_control.scan_stop.put(1) + self._stopped = True + def _move_and_finish( self, target_pos: float, @@ -355,13 +388,16 @@ class Mo1Bragg(Device, PositionerBase): status: DeviceStatus, update_frequency: float = 0.1, ) -> None: - """Move the simulated device and finish the motion. + """Method to be called in the move thread to move the Bragg positioner to the target position. Args: - target_pos (float) : target position for the motion - status (DeviceStatus) : status object to set the status of the motion + target_pos (float) : target position for the motion + move_cpt (Cpt) : component to set the target position on the IOC, + either setpoint or setpoint_abs_angle depending on the move type + read_cpt (Cpt) : component to read the current position of the motion, readback or feedback_pos_angle + status (DeviceStatus) : status object to set the status of the motion + update_frequency (float): Optional, frequency to update the current position of the motion, defaults to 0.1s """ - #TODO if called in sequence, the move resolves directly. success = True exception = None try: @@ -369,34 +405,34 @@ class Mo1Bragg(Device, PositionerBase): move_cpt.put(target_pos) # Start motion self.move_abs.put(1) - # Sleep needed due to updated frequency to soft IOC + # Currently sleep is needed due to delay in updates on PVs, maybe time can be reduced time.sleep(0.5) while self.motor_is_moving.get() == 0: + # TODO check if the _run_subs is needed since we have an auto_monitor on the readback PV + # However, since the move_type can change, it might be necessary to have it here val = read_cpt.get() self._run_subs(sub_type=self.SUB_READBACK, value=val) logger.info(f"Current position of {self.name} is {val}") - if self._stopped: + if self.stopped: success = False break time.sleep(update_frequency) - + # pylint: disable=broad-except except Exception as exc: content = traceback.format_exc() logger.error(f"Error in move thread of device {self.name}: {content}") - # status._finished(success=success) exception = exc - #status.set_exception(exc=exc) finally: if exception: status.set_exception(exc=exception) else: + # The set_finished method does not allow passing success, so we use this one for the moment. + # pylint: disable=protected-access status._finished(success=success) def move(self, value: float, move_type: str | MoveType = None, **kwargs) -> DeviceStatus: - """Move the Bragg positioner to the specified value. - The motion considers the current move_type, and allows it to be adjusted if specified. - This allows the motion to switch between energy and angle moves. + """Move the Bragg positioner to the specified value, allows to switch between move types angle and energy. Args: value (float) : target value for the motion @@ -409,12 +445,12 @@ class Mo1Bragg(Device, PositionerBase): if move_type is not None: self.move_type.put(move_type) move_type = self.move_type.get() - # This checks also if the value is within the limits - self.check_value(value) - status = DeviceStatus(device=self) - # Get the right move/read component for the current move_type move_cpt = self.setpoint if move_type == MoveType.ENERGY else self.setpoint_abs_angle read_cpt = self.readback if move_type == MoveType.ENERGY else self.feedback_pos_angle + + self.check_value(value) + status = DeviceStatus(device=self) + self._move_thread = threading.Thread( target=self._move_and_finish, args=(value, move_cpt, read_cpt, status, 0.1) ) @@ -455,10 +491,9 @@ class Mo1Bragg(Device, PositionerBase): f"Invalid argument for xtal_enum : {xtal_enum}, choose from '111' or '311'" ) self.crystal.xtal_enum.put(crystal_set) - # Send the new settings from the IOC to the motor controller self.crystal.set_offset.put(1) - def set_xas_settings(self, low: float, high: float, scan_time: float): + def set_xas_settings(self, low: float, high: float, scan_time: float) -> None: """Set XAS parameters for upcoming scan. Args: @@ -475,15 +510,21 @@ class Mo1Bragg(Device, PositionerBase): self.scan_settings.s_scan_angle_hi.put(high) self.scan_settings.s_scan_scantime.put(scan_time) - def set_xrd_settings(self, enable: bool): + def set_xrd_settings(self, enable: bool) -> None: """Set XRD settings for the upcoming scan.""" self.scan_settings.xrd_enable_hi_enum.put(int(enable)) self.scan_settings.xrd_enable_lo_enum.put(int(enable)) + # TODO add here the remaining settings that are possible for XRD - def set_scan_control_settings(self, mode: ScanControlMode, scan_duration: float): - """Set the scan control settings for the upcoming scan.""" - val = ScanControlMode(mode) - self.scan_control.scan_mode_enum.put(val) # 0 Simple 1 Advance + def set_scan_control_settings(self, mode: ScanControlMode, scan_duration: float) -> None: + """Set the scan control settings for the upcoming scan. + + Args: + mode (ScanControlMode): Mode for the scan, either simple or advanced + scan_duration (float): Duration of the scan + """ + val = ScanControlMode(mode).value + self.scan_control.scan_mode_enum.put(val) self.scan_control.scan_duration.put(scan_duration) def setup_simple_xas_scan( @@ -502,14 +543,13 @@ class Mo1Bragg(Device, PositionerBase): high (float): High energy/angle value of the scan scan_time (float): Time for a half oscillation mode (Literal["simple", "advanced"]): Pick a mode for the oscillation motion, either simple or advanced (needs to be configured) - scan_duration (float): Duration of the scan + scan_duration (float): Duration of the scan, if <0.1s the scan will be started in infinite motion mode """ # TODO check if maybe we want an extra argument for infinite or finite motion self.set_xas_settings(low=low, high=high, scan_time=scan_time) self.set_xrd_settings(False) self.set_scan_control_settings(mode=mode, scan_duration=scan_duration) self.scan_control.scan_load.put(1) - def kickoff(self): """Kickoff the device, called from BEC.""" @@ -549,34 +589,33 @@ class Mo1Bragg(Device, PositionerBase): def _get_scaninfo_parameters(self): """Get the scaninfo parameters for the scan.""" self.start, self.end = self.scaninfo.scan_msg.info["positions"] + # scan_time and scan_duration have to be extracted from scan_msg string for the moment, will be changed in future scan_time_pattern = r"scan_time': (\d+\.?\d*)" - scan_duration_pattern = r"scan_duration': (\d+\.?\d*)" + scan_dur_pattern = r"scan_duration': (\d+\.?\d*)" pattern_match = re.search(scan_time_pattern, self.scaninfo.scan_msg.info["scan_msgs"][0]) self.scan_time = float(pattern_match.group(1)) - pattern_match = re.search( - scan_duration_pattern, self.scaninfo.scan_msg.info["scan_msgs"][0] - ) + pattern_match = re.search(scan_dur_pattern, self.scaninfo.scan_msg.info["scan_msgs"][0]) self.scan_duration = float(pattern_match.group(1)) def on_stage(self) -> None: """Actions to be executed when the device is staged.""" - if not self.scaninfo.scan_type == "xas": + if not self.scaninfo.scan_type == "fly": return - # Assume that scan_val_reset was usee in unstage, if required, do it again but then we have to introduce a sleep ~1 + if self.scan_control.scan_msg.get() != ScanControlLoadMessage.PENDING: - self.scan_control.scan_val_reset.put(1)#.set() - time.sleep(1) + self.scan_control.scan_val_reset.put(1) + # Wait for reset to be done + # TODO check if this sleep can really be removed + # time.sleep(1) self._get_scaninfo_parameters() - # Wait for check to go to Pending + if not self.wait_for_signals( - signal_conditions=[ - (self.scan_control.scan_msg.get, ScanControlLoadMessage.PENDING) - ], + signal_conditions=[(self.scan_control.scan_msg.get, ScanControlLoadMessage.PENDING)], timeout=self.timeout_for_pvwait, check_stopped=True, ): raise TimeoutError( - f"Timeout while waiting for scan status to get ready. State: {self.scan_control.scan_status.get()}" + f"Timeout after {self.timeout_for_pvwait} while waiting for scan status, current state: {self.scan_control.scan_status.get()}" ) # Add here the logic to setup different type of scans self.setup_simple_xas_scan( @@ -586,11 +625,9 @@ class Mo1Bragg(Device, PositionerBase): mode=ScanControlMode.SIMPLE, scan_duration=self.scan_duration, ) - #Wait for params to be checked from controller + # Wait for params to be checked from controller if not self.wait_for_signals( - signal_conditions=[ - (self.scan_control.scan_msg.get, ScanControlLoadMessage.SUCCESS) - ], + signal_conditions=[(self.scan_control.scan_msg.get, ScanControlLoadMessage.SUCCESS)], timeout=self.timeout_for_pvwait, check_stopped=True, ): @@ -598,13 +635,10 @@ class Mo1Bragg(Device, PositionerBase): f"Scan parameter validation run into timeout after {self.timeout_for_pvwait} with {self.scan_control.scan_status.get()}" ) - def complete(self) -> None: - """Complete the acquisition, called from BEC. + def complete(self) -> DeviceStatus: + """Complete the acquisition. - This function is called after the scan is complete, just before unstage. - We can check here with the data backend and detector if the acquisition successfully finished. - - Actions are implemented in custom_prepare.on_complete since they are beamline specific. + The method returns a DeviceStatus object that resolves to set_finished or set_exception once the acquisition is completed. """ status = self.on_complete() if isinstance(status, DeviceStatus): @@ -614,11 +648,7 @@ class Mo1Bragg(Device, PositionerBase): return status def on_complete(self) -> DeviceStatus: - """ - Actions to be executed when the device is completed. - Implement for instance checks if the acquisition was successful. - """ - # Create DeviceStatus object with hook to finishing the scan. + """Specify actions to be performed for the completion of the acquisition.""" status = self.wait_with_status( signal_conditions=[(self.scan_control.scan_done.get, 1)], timeout=None, @@ -626,12 +656,6 @@ class Mo1Bragg(Device, PositionerBase): ) return status - def stop_scan(self) -> None: - """Stop the scan manually.""" - self.scan_control.scan_stop.put(1) - #TODO Check if a single variable for stop scan and stop motion is sufficient. - self._stopped = True - def unstage(self) -> list[object]: """ Unstage device after a scan. It has to be possible to call this multiple times. @@ -648,7 +672,7 @@ class Mo1Bragg(Device, PositionerBase): def on_unstage(self) -> None: """Actions to be executed when the device is unstaged.""" - # Reset setting of paramters after scan to be ready and clean for the next scan + # Reset scan parameter validation self.scan_control.scan_val_reset.put(1) def check_scan_id(self) -> None: @@ -666,13 +690,13 @@ class Mo1Bragg(Device, PositionerBase): interval: float = 0.05, all_signals: bool = False, ) -> bool: - """ - Convenience wrapper to allow waiting for signals to reach a certain condition. + """Wrapper around a list of conditions that allows waiting for them to become True. For EPICs PVs, an example usage is pasted at the bottom. Args: signal_conditions (list[tuple]): tuple of executable calls for conditions (get_current_state, condition) to check timeout (float): timeout in seconds + check_stopped (bool): True if stopped flag should be checked. The function relies on the self.stopped property to be set interval (float): interval in seconds all_signals (bool): True if all signals should be True, False if any signal should be True @@ -689,7 +713,7 @@ class Mo1Bragg(Device, PositionerBase): get_current_state() == condition for get_current_state, condition in signal_conditions ] - if check_stopped is True and self._stopped is True: + if check_stopped is True and self.stopped is True: return False if (all_signals and all(checks)) or (not all_signals and any(checks)): return True @@ -707,16 +731,9 @@ class Mo1Bragg(Device, PositionerBase): all_signals: bool = False, exception_on_timeout: Exception = TimeoutError("Timeout while waiting for signals"), ) -> DeviceStatus: - """Utility function to wait for signals in a thread. - Returns a DevicesStatus object that resolves either to set_finished or set_exception. - The DeviceStatus is attached to the parent device, i.e. the detector object inheriting from PSIDetectorBase. - - Usage: - This function should be used to wait for signals to reach a certain condition, especially in the context of - on_trigger and on_complete. If it is not used, functions may block and slow down the performance of BEC. - It will return a DeviceStatus object that is to be returned from the function. Once the conditions are met, - the DeviceStatus will be set to set_finished in case of success or set_exception in case of a timeout or exception. - The exception can be specified with the exception_on_timeout argument. The default exception is a TimeoutError. + """Wrapper around wait_for_signals to be started in thread and attach a DeviceStatus object. + This allows BEC to perform actinos in parallel and not be blocked by method calls on a device. + Typically used for on_trigger, on_complete methods or also the kickoff. Args: signal_conditions (list[tuple]): tuple of executable calls for conditions (get_current_state, condition) to check @@ -732,7 +749,6 @@ class Mo1Bragg(Device, PositionerBase): status = DeviceStatus(device=self) - # utility function to wrap the wait_for_signals function def wait_for_signals_wrapper( status: DeviceStatus, signal_conditions: list[tuple], @@ -761,6 +777,7 @@ class Mo1Bragg(Device, PositionerBase): status.set_finished() else: status.set_exception(exception_on_timeout) + # pylint: disable=broad-except except Exception as exc: status.set_exception(exc=exc) @@ -779,8 +796,3 @@ class Mo1Bragg(Device, PositionerBase): ) thread.start() return status - - -if __name__ == "__main__": - # bragg = Mo1Bragg(name='bragg', prefix="X01DA-OP-MO1:BRAGG:") - pass