From d26834c5d49e2f6c6d269643cb471ea04e65f8fc Mon Sep 17 00:00:00 2001 From: appel_c Date: Thu, 25 Jul 2024 18:04:41 +0200 Subject: [PATCH] refactor: mono scans and device refactoring, added dataclass for scan args --- debye_bec/devices/mo1_bragg.py | 313 +++++++++++++--------------- debye_bec/scans/mono_bragg_scans.py | 60 +++--- 2 files changed, 179 insertions(+), 194 deletions(-) diff --git a/debye_bec/devices/mo1_bragg.py b/debye_bec/devices/mo1_bragg.py index 4a62572..74ca262 100644 --- a/debye_bec/devices/mo1_bragg.py +++ b/debye_bec/devices/mo1_bragg.py @@ -13,6 +13,7 @@ import re import threading import time import traceback +from dataclasses import dataclass from typing import Literal from bec_lib.logger import bec_logger @@ -35,7 +36,7 @@ logger = bec_logger.logger class ScanControlScanStatus(int, enum.Enum): - """Enum class for the scan status of the Bragg positioner""" + """Enum class for the scan status of Bragg positioner""" PARAMETER_WRONG = 0 VALIDATION_PENDING = 1 @@ -44,7 +45,7 @@ class ScanControlScanStatus(int, enum.Enum): class ScanControlLoadMessage(int, enum.Enum): - """Validation of loading the scan parameters""" + """Enum for validating messages for load message of Bragg positioner""" PENDING = 0 STARTED = 1 @@ -102,7 +103,7 @@ class Mo1BraggStatus(Device): class Mo1BraggEncoder(Device): - """Mo1 Bragg PVs to communiucate with the encoder""" + """Mo1 Bragg PVs to communicate with the encoder""" enc_reinit = Cpt(EpicsSignal, suffix="enc_reinit.PROC", kind="config") enc_reinit_done = Cpt(EpicsSignalRO, suffix="enc_reinit_done_RBV", kind="config") @@ -192,19 +193,28 @@ class Mo1BraggScanControl(Device): ) +@dataclass +class ScanParameter: + """Dataclass to store the scan parameters for the Mo1 Bragg positioner""" + + scan_time: float = None + scan_duration: float = None + xrd_enable_low: bool = None + xrd_enable_high: bool = None + num_trigger_low: int = None + num_trigger_high: int = None + exp_time_low: float = None + exp_time_high: float = None + cycle_low: int = None + cycle_high: int = None + start: float = None + end: float = None + + class Mo1Bragg(Device, PositionerBase): """Class for the Mo1 Bragg positioner of the Debye beamline. The prefix to connect to the soft IOC is X01DA-OP-MO1:BRAGG: which is connected to the NI motor controller via web sockets. - - Args: - prefix (str): EPICS prefix to connect to the soft IOC - name (str): Name of the device - kind (Kind): Kind of the device - device_manager (object): Device manager object to connect to the BEC - parent (object): Parent object - kwargs: Additional keyword arguments - """ USER_ACCESS = ["set_xtal", "stop_scan"] @@ -234,9 +244,7 @@ class Mo1Bragg(Device, PositionerBase): velocity = Cpt(EpicsSignalWithRBV, suffix="move_velocity", kind="config", auto_monitor=True) # Angle PVs - # TODO consider making the angle a pseudo axis of the energy - # Or we could even consider making a pseudo positioner for the angle since its less often used and would - # be a more elegant solution + # TODO makd angle motion a pseudo motor feedback_pos_angle = Cpt( EpicsSignalRO, suffix="feedback_pos_angle_RBV", kind="normal", auto_monitor=True ) @@ -261,24 +269,24 @@ class Mo1Bragg(Device, PositionerBase): def __init__( self, prefix="", *, name: str, kind: Kind = None, device_manager=None, parent=None, **kwargs ): + """Initialize the Mo1 Bragg positioner. + + Args: + prefix (str): EPICS prefix for the device + name (str): Name of the device + kind (Kind): Kind of the device + device_manager (DeviceManager): Device manager instance + parent (Device): Parent device + kwargs: Additional keyword arguments + """ super().__init__(prefix, name=name, kind=kind, parent=parent, **kwargs) self._stopped = False self.device_manager = device_manager self._move_thread = None self.service_cfg = None self.scaninfo = None - self.scan_time = None - self.scan_duration = None - self.xrd_enable_low = None - self.xrd_enable_high = None - self.num_trigger_low = None - self.num_trigger_high = None - self.exp_time_low = None - self.exp_time_high = None - self.cycle_low = None - self.cycle_high = None - self.start = None - self.end = None + # Init scan parameters + self.scan_parameter = ScanParameter() self.timeout_for_pvwait = 2.5 self.readback.name = self.name @@ -304,7 +312,7 @@ class Mo1Bragg(Device, PositionerBase): Args: value (int) : current progress value """ - max_value=100 + max_value = 100 logger.info(f"Progress at {value}") self._run_subs( sub_type=self.SUB_PROGRESS, @@ -318,6 +326,31 @@ class Mo1Bragg(Device, PositionerBase): self.scaninfo = bec_scaninfo_mixin.BecScaninfoMixin(self.device_manager) self.scaninfo.load_scan_metadata() + @property + def stopped(self) -> bool: + """Return the stopped flag. If True, the motion is stopped.""" + return self._stopped + + def stop(self, *, success=False) -> None: + """Stop any motion on the positioner + + Args: + success (bool) : Flag to indicate if the motion was successful + """ + # Stop any motion on the device + self.move_stop.put(1) + # Stop the move thread + self._stopped = True + if self._move_thread is not None: + self._move_thread.join() + self._move_thread = None + super().stop(success=success) + + def stop_scan(self) -> None: + """Stop the currently running scan gracefully, this finishes the running oscillation.""" + self.scan_control.scan_stop.put(1) + + # -------------- Positioner specific methods -----------------# @property def limits(self) -> tuple: """Return limits of the Bragg positioner""" @@ -349,11 +382,6 @@ class Mo1Bragg(Device, PositionerBase): move_cpt = self.readback if move_type == MoveType.ENERGY else self.feedback_pos_angle return move_cpt.get() - @property - def stopped(self) -> bool: - """Return the stopped flag. If True, the motion is stopped.""" - return self._stopped - # pylint: disable=arguments-differ def check_value(self, value: float) -> None: """Method to check if a value is within limits of the positioner. Called by PositionerBase.move() @@ -366,26 +394,6 @@ class Mo1Bragg(Device, PositionerBase): if low_limit < high_limit and not low_limit <= value <= high_limit: raise LimitError(f"position={value} not within limits {self.limits}") - def stop(self, *, success=False) -> None: - """Stop any motion on the positioner - - Args: - success (bool) : Flag to indicate if the motion was successful - """ - # Stop any motion on the device - self.move_stop.put(1) - # Stop the move thread - self._stopped = True - if self._move_thread is not None: - self._move_thread.join() - self._move_thread = None - super().stop(success=success) - - def stop_scan(self) -> None: - """Stop the currently running scan gracefully, this finishes the running oscillation.""" - self.scan_control.scan_stop.put(1) - self._stopped = True - def _move_and_finish( self, target_pos: float, @@ -414,11 +422,8 @@ class Mo1Bragg(Device, PositionerBase): # Currently sleep is needed due to delay in updates on PVs, maybe time can be reduced time.sleep(0.5) while self.motor_is_moving.get() == 0: - # TODO check if the _run_subs is needed since we have an auto_monitor on the readback PV - # However, since the move_type can change, it might be necessary to have it here val = read_cpt.get() self._run_subs(sub_type=self.SUB_READBACK, value=val) - logger.info(f"Current position of {self.name} is {val}") if self.stopped: success = False break @@ -433,7 +438,6 @@ class Mo1Bragg(Device, PositionerBase): if exception: status.set_exception(exc=exception) else: - # The set_finished method does not allow passing success, so we use this one for the moment. # pylint: disable=protected-access status._finished(success=success) @@ -463,6 +467,10 @@ class Mo1Bragg(Device, PositionerBase): self._move_thread.start() return status + # -------------- End of Positioner specific methods -----------------# + + # -------------- MO1 Bragg specific methods -----------------# + def set_xtal( self, xtal_enum: Literal["111", "311"], @@ -516,18 +524,19 @@ class Mo1Bragg(Device, PositionerBase): self.scan_settings.s_scan_angle_hi.put(high) self.scan_settings.s_scan_scantime.put(scan_time) - def set_xrd_settings(self, - enable_low: bool = False, - enable_high: bool = False, - num_trigger_low:int=0, - num_trigger_high:int=0, - exp_time_low:int=0, - exp_time_high:int=0, - cycle_low:int=0, - cycle_high:int=0, - ) -> None: + def set_xrd_settings( + self, + enable_low: bool, + enable_high: bool, + num_trigger_low: int, + num_trigger_high: int, + exp_time_low: int, + exp_time_high: int, + cycle_low: int, + cycle_high: int, + ) -> None: """Set XRD settings for the upcoming scan. - + Args: enable_low (bool): Enable XRD for low energy/angle enable_high (bool): Enable XRD for high energy/angle @@ -558,44 +567,18 @@ class Mo1Bragg(Device, PositionerBase): self.scan_control.scan_mode_enum.put(val) self.scan_control.scan_duration.put(scan_duration) - def setup_xas_scan( - self, - low: float, - high: float, - mode: Literal["simple", "advanced"], - **kwargs, - ): - """Setup a simple XAS scan for the Bragg positioner. + def _update_scan_parameter(self): + """Get the scaninfo parameters for the scan.""" + for key, value in self.scaninfo.scan_msg.content["info"]["args"].items(): + if hasattr(self.scan_parameter, key): + setattr(self.scan_parameter, key, value) - Args: - low (float): Low energy/angle value of the scan - high (float): High energy/angle value of the scan - scan_time (float): Time for a half oscillation - mode (Literal["simple", "advanced"]): Pick a mode for the oscillation motion, either simple or advanced (needs to be configured) - scan_duration (float): Duration of the scan, if <0.1s the scan will be started in infinite motion mode - """ - # TODO check if maybe we want an extra argument for infinite or finite motion - self.set_xas_settings(low=low, high=high, scan_time=self.scan_time) - self.set_xrd_settings( - enable_low=self.xrd_enable_low, - enable_high=self.xrd_enable_high, - num_trigger_low=self.num_trigger_low, - num_trigger_high=self.num_trigger_high, - exp_time_low=self.exp_time_low, - exp_time_high=self.exp_time_high, - cycle_low=self.cycle_low, - cycle_high=self.cycle_high, - ) - self.set_scan_control_settings(mode=mode, scan_duration=self.scan_duration) - self.scan_control.scan_load.put(1) + # -------------- End MO1 Bragg specific methods -----------------# + + # -------------- Flyer Interface methods -----------------# def kickoff(self): """Kickoff the device, called from BEC.""" - # TODO - tobe discussed and decided with Debye team - # There are 2 possible ways to start the scan, either infinit or with a timer. - # If the scan_duration is less than 0.1s, the scan will be started infinite - # This logic should move in future to the controller, but for now it is implemented here - # It is necessary since the current NIDAQ implementation relies on it scan_duration = self.scan_control.scan_duration.get() start_func = ( self.scan_control.scan_start_infinite.put @@ -624,73 +607,77 @@ class Mo1Bragg(Device, PositionerBase): self.on_stage() return super().stage() - def _get_scaninfo_parameters(self): - """Get the scaninfo parameters for the scan.""" - self.start, self.end = self.scaninfo.scan_msg.info["positions"] - logger.info(self.scaninfo.scan_msg.info["scan_msgs"][0]) - pattern = re.compile(r"'scan_time':\s*([\d\.]+)") - pattern_match = pattern.search(self.scaninfo.scan_msg.info["scan_msgs"][0]) - self.scan_time = float(pattern_match.group(1)) if pattern_match else 0.0 - pattern = re.compile(r"'scan_duration':\s*([\d\.]+)") - pattern_match = pattern.search(self.scaninfo.scan_msg.info["scan_msgs"][0]) - self.scan_duration = float(pattern_match.group(1)) if pattern_match else 0.0 - pattern = re.compile(r"'exp_time_low':\s*([\d\.]+)") - pattern_match = pattern.search(self.scaninfo.scan_msg.info["scan_msgs"][0]) - self.exp_time_low = float(pattern_match.group(1)) if pattern_match else 0.0 - pattern = re.compile(r"'exp_time_high':\s*([\d\.]+)") - pattern_match = pattern.search(self.scaninfo.scan_msg.info["scan_msgs"][0]) - self.exp_time_high = float(pattern_match.group(1)) if pattern_match else 0.0 + def _check_scan_msg(self, target_state: ScanControlLoadMessage) -> None: + """Check if the scan message is gettting available - pattern = re.compile(r"'xrd_enable_low':\s*(True|False)") - pattern_match = pattern.search(self.scaninfo.scan_msg.info["scan_msgs"][0]) - self.xrd_enable_low = pattern_match.group(1) =='True' if pattern_match else False - pattern = re.compile(r"'xrd_enable_high':\s*(True|False)") - pattern_match = pattern.search(self.scaninfo.scan_msg.info["scan_msgs"][0]) - self.xrd_enable_high = pattern_match.group(1) =='True' if pattern_match else False + Args: + target_state (ScanControlLoadMessage): Target state to check for - pattern = re.compile(r"'num_trigger_low':\s*(\d+)") - pattern_match = pattern.search(self.scaninfo.scan_msg.info["scan_msgs"][0]) - self.num_trigger_low = int(pattern_match.group(1)) if pattern_match else 0 - pattern = re.compile(r"'num_trigger_high':\s*(\d+)") - pattern_match = pattern.search(self.scaninfo.scan_msg.info["scan_msgs"][0]) - self.num_trigger_high = int(pattern_match.group(1)) if pattern_match else 0 - pattern = re.compile(r"'cycle_low':\s*(\d+)") - pattern_match = pattern.search(self.scaninfo.scan_msg.info["scan_msgs"][0]) - self.cycle_low = int(pattern_match.group(1)) if pattern_match else 0 - pattern = re.compile(r"'cycle_high':\s*(\d+)") - pattern_match = pattern.search(self.scaninfo.scan_msg.info["scan_msgs"][0]) - self.cycle_high = int(pattern_match.group(1)) if pattern_match else 0 - - - - - def on_stage(self) -> None: - """Actions to be executed when the device is staged.""" - if not self.scaninfo.scan_type == "fly": - return - - if self.scan_control.scan_msg.get() != ScanControlLoadMessage.PENDING: + Raises: + TimeoutError: If the scan message is not available after the timeout + """ + if self.scan_control.scan_msg.get() != target_state: self.scan_control.scan_val_reset.put(1) if not self.wait_for_signals( - signal_conditions=[(self.scan_control.scan_msg.get, ScanControlLoadMessage.PENDING)], + signal_conditions=[(self.scan_control.scan_msg.get, target_state)], timeout=self.timeout_for_pvwait, check_stopped=True, ): raise TimeoutError( f"Timeout after {self.timeout_for_pvwait} while waiting for scan status, current state: {self.scan_control.scan_status.get()}" ) - # Add here the logic to setup different type of scans + + def on_stage(self) -> None: + """Actions to be executed when the device is staged.""" + if not self.scaninfo.scan_type == "fly": + return + self._check_scan_msg(ScanControlLoadMessage.PENDING) + scan_name = self.scaninfo.scan_msg.content["info"].get("scan_name", "") - if scan_name == "xas_simple_scan" or scan_name == "xas_simple_scan_with_xrd": - self._get_scaninfo_parameters() - self.setup_xas_scan( - low=self.start, - high=self.end, - mode=ScanControlMode.SIMPLE + self._update_scan_parameter() + if scan_name == "xas_simple_scan": + self.set_xas_settings( + low=self.scan_parameter.start, + high=self.scan_parameter.end, + scan_time=self.scan_parameter.scan_time, + ) + self.set_xrd_settings( + enable_low=False, + enable_high=False, + num_trigger_low=0, + num_trigger_high=0, + exp_time_low=0, + exp_time_high=0, + cycle_low=0, + cycle_high=0, + ) + self.set_scan_control_settings( + mode=ScanControlMode.SIMPLE, scan_duration=self.scan_parameter.scan_duration + ) + elif scan_name == "xas_simple_scan_with_xrd": + self.set_xas_settings( + low=self.scan_parameter.start, + high=self.scan_parameter.end, + scan_time=self.scan_parameter.scan_time, + ) + self.set_xrd_settings( + enable_low=self.scan_parameter.xrd_enable_low, + enable_high=self.scan_parameter.xrd_enable_high, + num_trigger_low=self.scan_parameter.num_trigger_low, + num_trigger_high=self.scan_parameter.num_trigger_high, + exp_time_low=self.scan_parameter.exp_time_low, + exp_time_high=self.scan_parameter.exp_time_high, + cycle_low=self.scan_parameter.cycle_low, + cycle_high=self.scan_parameter.cycle_high, + ) + self.set_scan_control_settings( + mode=ScanControlMode.SIMPLE, scan_duration=self.scan_parameter.scan_duration ) else: raise Mo1BraggError(f"Scan mode {scan_name} not implemented for device {self.name}") + # Load the scan parameters to the controller + self.scan_control.scan_load.put(1) # Wait for params to be checked from controller if not self.wait_for_signals( signal_conditions=[(self.scan_control.scan_msg.get, ScanControlLoadMessage.SUCCESS)], @@ -731,22 +718,18 @@ class Mo1Bragg(Device, PositionerBase): """ self.check_scan_id() self.on_unstage() - # TODO should this reset on unstage? self._stopped = False return super().unstage() def on_unstage(self) -> None: """Actions to be executed when the device is unstaged.""" - # Reset scan parameter validation - self.scan_control.scan_val_reset.put(1) - if not self.wait_for_signals( - signal_conditions=[(self.scan_control.scan_msg.get, ScanControlLoadMessage.PENDING)], - timeout=self.timeout_for_pvwait, - check_stopped=True, - ): - raise TimeoutError( - f"Timeout after {self.timeout_for_pvwait} while waiting for scan status, current state: {self.scan_control.scan_status.get()}" - ) + # Reset scan parameter validation if needed + if self.scan_control.scan_msg.get() != ScanControlLoadMessage.PENDING: + self.scan_control.scan_val_reset.put(1) + + # -------------- End Flyer Interface methods -----------------# + + # -------------- Utility methods -----------------# def check_scan_id(self) -> None: """Checks if scan_id has changed and set stopped flagged to True if it has.""" diff --git a/debye_bec/scans/mono_bragg_scans.py b/debye_bec/scans/mono_bragg_scans.py index 694824e..0a20634 100644 --- a/debye_bec/scans/mono_bragg_scans.py +++ b/debye_bec/scans/mono_bragg_scans.py @@ -32,10 +32,10 @@ class XASSimpleScan(AsyncFlyScanBase): motor: DeviceBase = "mo1_bragg", **kwargs, ): - """ The xas_simple_scan is used to start a simple oscillating scan on the mono bragg motor. - Start and Stop define the energy range for the scan, scan_time is the time for one scan cycle and scan_duration + """The xas_simple_scan is used to start a simple oscillating scan on the mono bragg motor. + Start and Stop define the energy range for the scan, scan_time is the time for one scan cycle and scan_duration is the duration of the scan. If scan duration is set to 0, the scan will run infinitely. - + Args: start (float): Start energy for the scan. stop (float): Stop energy for the scan. @@ -49,7 +49,7 @@ class XASSimpleScan(AsyncFlyScanBase): self.stop = stop self.scan_time = scan_time self.scan_duration = scan_duration - self.primary_readout_cycle = 1 + self.primary_readout_cycle = 1 def prepare_positions(self): """Prepare the positions for the scan. @@ -61,9 +61,8 @@ class XASSimpleScan(AsyncFlyScanBase): yield None def pre_scan(self): - """Pre Scan action. Happens just before the scan. - - Ensure the motor movetype is set to energy, then check limits for start/end energy. + """Pre Scan action. Ensure the motor movetype is set to energy, then check limits for start/end energy. + #TODO Remove once the motor movetype is removed and ANGLE motion is a pseudo motor. """ yield from self.stubs.send_rpc_and_wait(self.motor, "move_type.set", "energy") @@ -79,8 +78,7 @@ class XASSimpleScan(AsyncFlyScanBase): def scan_core(self): """Run the scan core. - - This is the core of the scan. It is a loop over all positions. + Kickoff the oscillation on the Bragg motor and wait for the completion of the motion. """ # Start the oscillation on the Bragg motor. yield from self.stubs.kickoff(device=self.motor) @@ -101,7 +99,7 @@ class XASSimpleScan(AsyncFlyScanBase): time.sleep(self.primary_readout_cycle) self.point_id += 1 - self.num_pos = self.point_id +1 + self.num_pos = self.point_id + 1 class XASSimpleScanWithXRD(XASSimpleScan): @@ -119,40 +117,44 @@ class XASSimpleScanWithXRD(XASSimpleScan): stop: float, scan_time: float, scan_duration: float, - xrd_enable_low : bool, - num_trigger_low :int, - exp_time_low : float, - cycle_low : int, - xrd_enable_high : bool, - num_trigger_high :int, - exp_time_high : float, - cycle_high : float, + xrd_enable_low: bool, + num_trigger_low: int, + exp_time_low: float, + cycle_low: int, + xrd_enable_high: bool, + num_trigger_high: int, + exp_time_high: float, + cycle_high: float, motor: DeviceBase = "mo1_bragg", **kwargs, ): - """ The xas_simple_scan_with_xrd is used to start a simple oscillating scan on the mono bragg motor with XRD triggering. - Start and Stop define the energy range for the scan, scan_time is the time for one scan cycle and scan_duration - is the duration of the scan. If scan duration is set to 0, the scan will run infinitely. - xrd_enable_low and xrd_enable_high define if XRD triggering is enabled for the low and high energy range. - num_trigger_low and num_trigger_high define the number of triggers for the low and high energy range. - exp_time_low and exp_time_high define the exposure time for the low and high energy range. + """The xas_simple_scan_with_xrd is an oscillation motion on the mono motor + with XRD triggering at low and high energy ranges. + If scan duration is set to 0, the scan will run infinitely. Args: start (float): Start energy for the scan. stop (float): Stop energy for the scan. - scan_time (float): Time for one scan cycle. - scan_duration (float): Duration of the scan. + scan_time (float): Time for one oscillation . + scan_duration (float): Total duration of the scan. xrd_enable_low (bool): Enable XRD triggering for the low energy range. num_trigger_low (int): Number of triggers for the low energy range. exp_time_low (float): Exposure time for the low energy range. - cycle_low (int): Cycle for the low energy range. + cycle_low (int): Specify how often the triggers should be considered, every nth cycle for low xrd_enable_high (bool): Enable XRD triggering for the high energy range. num_trigger_high (int): Number of triggers for the high energy range. exp_time_high (float): Exposure time for the high energy range. - cycle_high (int): Cycle for the high energy range. + cycle_high (int): Specify how often the triggers should be considered, every nth cycle for high motor (DeviceBase, optional): Motor device to be used for the scan. Defaults to "mo1_bragg". """ - super().__init__(start=start, stop=stop, scan_time=scan_time, scan_duration=scan_duration, motor=motor, **kwargs) + super().__init__( + start=start, + stop=stop, + scan_time=scan_time, + scan_duration=scan_duration, + motor=motor, + **kwargs, + ) self.xrd_enable_low = xrd_enable_low self.num_trigger_low = num_trigger_low self.exp_time_low = exp_time_low