From 7d3bd609d1eca339c2d20e2606cd7d6821b42bb9 Mon Sep 17 00:00:00 2001 From: "gac-x01da (Resp. Clark Adam Hugh)" Date: Wed, 24 Jul 2024 16:14:30 +0200 Subject: [PATCH] feat: first draft for XRD scan --- debye_bec/devices/mo1_bragg.py | 123 ++++++++++++++++++++++------ debye_bec/scans/__init__.py | 2 +- debye_bec/scans/mono_bragg_scans.py | 98 ++++++++++++++++++---- 3 files changed, 180 insertions(+), 43 deletions(-) diff --git a/debye_bec/devices/mo1_bragg.py b/debye_bec/devices/mo1_bragg.py index e339a28..4a62572 100644 --- a/debye_bec/devices/mo1_bragg.py +++ b/debye_bec/devices/mo1_bragg.py @@ -269,6 +269,14 @@ class Mo1Bragg(Device, PositionerBase): self.scaninfo = None self.scan_time = None self.scan_duration = None + self.xrd_enable_low = None + self.xrd_enable_high = None + self.num_trigger_low = None + self.num_trigger_high = None + self.exp_time_low = None + self.exp_time_high = None + self.cycle_low = None + self.cycle_high = None self.start = None self.end = None @@ -296,7 +304,7 @@ class Mo1Bragg(Device, PositionerBase): Args: value (int) : current progress value """ - max_value = 100 + max_value=100 logger.info(f"Progress at {value}") self._run_subs( sub_type=self.SUB_PROGRESS, @@ -508,11 +516,36 @@ class Mo1Bragg(Device, PositionerBase): self.scan_settings.s_scan_angle_hi.put(high) self.scan_settings.s_scan_scantime.put(scan_time) - def set_xrd_settings(self, enable: bool) -> None: - """Set XRD settings for the upcoming scan.""" - self.scan_settings.xrd_enable_hi_enum.put(int(enable)) - self.scan_settings.xrd_enable_lo_enum.put(int(enable)) - # TODO add here the remaining settings that are possible for XRD + def set_xrd_settings(self, + enable_low: bool = False, + enable_high: bool = False, + num_trigger_low:int=0, + num_trigger_high:int=0, + exp_time_low:int=0, + exp_time_high:int=0, + cycle_low:int=0, + cycle_high:int=0, + ) -> None: + """Set XRD settings for the upcoming scan. + + Args: + enable_low (bool): Enable XRD for low energy/angle + enable_high (bool): Enable XRD for high energy/angle + num_trigger_low (int): Number of triggers for low energy/angle + num_trigger_high (int): Number of triggers for high energy/angle + exp_time_low (int): Exposure time for low energy/angle + exp_time_high (int): Exposure time for high energy/angle + cycle_low (int): Cycle for low energy/angle + cycle_high (int): Cycle for high energy/angle + """ + self.scan_settings.xrd_enable_hi_enum.put(int(enable_high)) + self.scan_settings.xrd_enable_lo_enum.put(int(enable_low)) + self.scan_settings.xrd_n_trigger_hi.put(num_trigger_high) + self.scan_settings.xrd_n_trigger_lo.put(num_trigger_low) + self.scan_settings.xrd_time_hi.put(exp_time_high) + self.scan_settings.xrd_time_lo.put(exp_time_low) + self.scan_settings.xrd_every_n_hi.put(cycle_high) + self.scan_settings.xrd_every_n_lo.put(cycle_low) def set_scan_control_settings(self, mode: ScanControlMode, scan_duration: float) -> None: """Set the scan control settings for the upcoming scan. @@ -525,13 +558,11 @@ class Mo1Bragg(Device, PositionerBase): self.scan_control.scan_mode_enum.put(val) self.scan_control.scan_duration.put(scan_duration) - def setup_simple_xas_scan( + def setup_xas_scan( self, low: float, high: float, - scan_time: float, mode: Literal["simple", "advanced"], - scan_duration: float = 0, **kwargs, ): """Setup a simple XAS scan for the Bragg positioner. @@ -544,9 +575,18 @@ class Mo1Bragg(Device, PositionerBase): scan_duration (float): Duration of the scan, if <0.1s the scan will be started in infinite motion mode """ # TODO check if maybe we want an extra argument for infinite or finite motion - self.set_xas_settings(low=low, high=high, scan_time=scan_time) - self.set_xrd_settings(False) - self.set_scan_control_settings(mode=mode, scan_duration=scan_duration) + self.set_xas_settings(low=low, high=high, scan_time=self.scan_time) + self.set_xrd_settings( + enable_low=self.xrd_enable_low, + enable_high=self.xrd_enable_high, + num_trigger_low=self.num_trigger_low, + num_trigger_high=self.num_trigger_high, + exp_time_low=self.exp_time_low, + exp_time_high=self.exp_time_high, + cycle_low=self.cycle_low, + cycle_high=self.cycle_high, + ) + self.set_scan_control_settings(mode=mode, scan_duration=self.scan_duration) self.scan_control.scan_load.put(1) def kickoff(self): @@ -587,13 +627,42 @@ class Mo1Bragg(Device, PositionerBase): def _get_scaninfo_parameters(self): """Get the scaninfo parameters for the scan.""" self.start, self.end = self.scaninfo.scan_msg.info["positions"] - # scan_time and scan_duration have to be extracted from scan_msg string for the moment, will be changed in future - scan_time_pattern = r"scan_time': (\d+\.?\d*)" - scan_dur_pattern = r"scan_duration': (\d+\.?\d*)" - pattern_match = re.search(scan_time_pattern, self.scaninfo.scan_msg.info["scan_msgs"][0]) - self.scan_time = float(pattern_match.group(1)) - pattern_match = re.search(scan_dur_pattern, self.scaninfo.scan_msg.info["scan_msgs"][0]) - self.scan_duration = float(pattern_match.group(1)) + logger.info(self.scaninfo.scan_msg.info["scan_msgs"][0]) + pattern = re.compile(r"'scan_time':\s*([\d\.]+)") + pattern_match = pattern.search(self.scaninfo.scan_msg.info["scan_msgs"][0]) + self.scan_time = float(pattern_match.group(1)) if pattern_match else 0.0 + pattern = re.compile(r"'scan_duration':\s*([\d\.]+)") + pattern_match = pattern.search(self.scaninfo.scan_msg.info["scan_msgs"][0]) + self.scan_duration = float(pattern_match.group(1)) if pattern_match else 0.0 + pattern = re.compile(r"'exp_time_low':\s*([\d\.]+)") + pattern_match = pattern.search(self.scaninfo.scan_msg.info["scan_msgs"][0]) + self.exp_time_low = float(pattern_match.group(1)) if pattern_match else 0.0 + pattern = re.compile(r"'exp_time_high':\s*([\d\.]+)") + pattern_match = pattern.search(self.scaninfo.scan_msg.info["scan_msgs"][0]) + self.exp_time_high = float(pattern_match.group(1)) if pattern_match else 0.0 + + pattern = re.compile(r"'xrd_enable_low':\s*(True|False)") + pattern_match = pattern.search(self.scaninfo.scan_msg.info["scan_msgs"][0]) + self.xrd_enable_low = pattern_match.group(1) =='True' if pattern_match else False + pattern = re.compile(r"'xrd_enable_high':\s*(True|False)") + pattern_match = pattern.search(self.scaninfo.scan_msg.info["scan_msgs"][0]) + self.xrd_enable_high = pattern_match.group(1) =='True' if pattern_match else False + + pattern = re.compile(r"'num_trigger_low':\s*(\d+)") + pattern_match = pattern.search(self.scaninfo.scan_msg.info["scan_msgs"][0]) + self.num_trigger_low = int(pattern_match.group(1)) if pattern_match else 0 + pattern = re.compile(r"'num_trigger_high':\s*(\d+)") + pattern_match = pattern.search(self.scaninfo.scan_msg.info["scan_msgs"][0]) + self.num_trigger_high = int(pattern_match.group(1)) if pattern_match else 0 + pattern = re.compile(r"'cycle_low':\s*(\d+)") + pattern_match = pattern.search(self.scaninfo.scan_msg.info["scan_msgs"][0]) + self.cycle_low = int(pattern_match.group(1)) if pattern_match else 0 + pattern = re.compile(r"'cycle_high':\s*(\d+)") + pattern_match = pattern.search(self.scaninfo.scan_msg.info["scan_msgs"][0]) + self.cycle_high = int(pattern_match.group(1)) if pattern_match else 0 + + + def on_stage(self) -> None: """Actions to be executed when the device is staged.""" @@ -613,14 +682,12 @@ class Mo1Bragg(Device, PositionerBase): ) # Add here the logic to setup different type of scans scan_name = self.scaninfo.scan_msg.content["info"].get("scan_name", "") - if scan_name == "energy_oscillation_scan": + if scan_name == "xas_simple_scan" or scan_name == "xas_simple_scan_with_xrd": self._get_scaninfo_parameters() - self.setup_simple_xas_scan( + self.setup_xas_scan( low=self.start, high=self.end, - scan_time=self.scan_time, - mode=ScanControlMode.SIMPLE, - scan_duration=self.scan_duration, + mode=ScanControlMode.SIMPLE ) else: raise Mo1BraggError(f"Scan mode {scan_name} not implemented for device {self.name}") @@ -672,6 +739,14 @@ class Mo1Bragg(Device, PositionerBase): """Actions to be executed when the device is unstaged.""" # Reset scan parameter validation self.scan_control.scan_val_reset.put(1) + if not self.wait_for_signals( + signal_conditions=[(self.scan_control.scan_msg.get, ScanControlLoadMessage.PENDING)], + timeout=self.timeout_for_pvwait, + check_stopped=True, + ): + raise TimeoutError( + f"Timeout after {self.timeout_for_pvwait} while waiting for scan status, current state: {self.scan_control.scan_status.get()}" + ) def check_scan_id(self) -> None: """Checks if scan_id has changed and set stopped flagged to True if it has.""" diff --git a/debye_bec/scans/__init__.py b/debye_bec/scans/__init__.py index c99563b..a34521c 100644 --- a/debye_bec/scans/__init__.py +++ b/debye_bec/scans/__init__.py @@ -1 +1 @@ -from .mono_bragg_scans import MonoBraggOscillationScan +from .mono_bragg_scans import XASSimpleScan, XASSimpleScanWithXRD diff --git a/debye_bec/scans/mono_bragg_scans.py b/debye_bec/scans/mono_bragg_scans.py index e3c06e9..694824e 100644 --- a/debye_bec/scans/mono_bragg_scans.py +++ b/debye_bec/scans/mono_bragg_scans.py @@ -10,18 +10,18 @@ from bec_server.scan_server.scans import AsyncFlyScanBase from debye_bec.devices.mo1_bragg import MoveType -# TODO How to report the scan progress? -class MonoBraggOscillationScan(AsyncFlyScanBase): - """ - This class is used to start an oscillating scan motion on the mono bragg motor. - """ +class XASSimpleScan(AsyncFlyScanBase): - scan_name = "energy_oscillation_scan" + scan_name = "xas_simple_scan" scan_type = "fly" scan_report_hint = "device_progress" required_kwargs = [] use_scan_progress_report = False pre_move = False + gui_config = { + "Movement Parameters": ["start", "stop"], + "Scan Parameters": ["scan_time", "scan_duration"], + } def __init__( self, @@ -29,18 +29,27 @@ class MonoBraggOscillationScan(AsyncFlyScanBase): stop: float, scan_time: float, scan_duration: float, - motor: DeviceBase = None, + motor: DeviceBase = "mo1_bragg", **kwargs, ): + """ The xas_simple_scan is used to start a simple oscillating scan on the mono bragg motor. + Start and Stop define the energy range for the scan, scan_time is the time for one scan cycle and scan_duration + is the duration of the scan. If scan duration is set to 0, the scan will run infinitely. + + Args: + start (float): Start energy for the scan. + stop (float): Stop energy for the scan. + scan_time (float): Time for one scan cycle. + scan_duration (float): Duration of the scan. + motor (DeviceBase, optional): Motor device to be used for the scan. Defaults to "mo1_bragg". + """ super().__init__(**kwargs) - # TODO How to we deal with scans written for a specific device/motor? - self.motor = motor if motor is not None else self.device_manager.devices["mo1_bragg"].name + self.motor = motor self.start = start self.stop = stop self.scan_time = scan_time self.scan_duration = scan_duration - self.device_move_request_id = str(uuid.uuid4()) - self.primary_readout_cycle = 1 # Sleep time in seconds between readout cycles + self.primary_readout_cycle = 1 def prepare_positions(self): """Prepare the positions for the scan. @@ -92,10 +101,63 @@ class MonoBraggOscillationScan(AsyncFlyScanBase): time.sleep(self.primary_readout_cycle) self.point_id += 1 - def finalize(self): - """Set the number of points for the scan, base on the point_id which incrementally increases in scan_core.""" - # Call complete on all devices except the motor/Mo1Bragg - yield from self.stubs.complete( - device=[dev for dev in self.device_manager.devices.keys() if dev != self.motor] - ) - self.num_pos = self.point_id + 1 + self.num_pos = self.point_id +1 + + +class XASSimpleScanWithXRD(XASSimpleScan): + scan_name = "xas_simple_scan_with_xrd" + gui_config = { + "Movement Parameters": ["start", "stop"], + "Scan Parameters": ["scan_time", "scan_duration"], + "Low Energy Range": ["xrd_enable_low", "num_trigger_low", "exp_time_low", "cycle_low"], + "High Energy Range": ["xrd_enable_high", "num_trigger_high", "exp_time_high", "cycle_high"], + } + + def __init__( + self, + start: float, + stop: float, + scan_time: float, + scan_duration: float, + xrd_enable_low : bool, + num_trigger_low :int, + exp_time_low : float, + cycle_low : int, + xrd_enable_high : bool, + num_trigger_high :int, + exp_time_high : float, + cycle_high : float, + motor: DeviceBase = "mo1_bragg", + **kwargs, + ): + """ The xas_simple_scan_with_xrd is used to start a simple oscillating scan on the mono bragg motor with XRD triggering. + Start and Stop define the energy range for the scan, scan_time is the time for one scan cycle and scan_duration + is the duration of the scan. If scan duration is set to 0, the scan will run infinitely. + xrd_enable_low and xrd_enable_high define if XRD triggering is enabled for the low and high energy range. + num_trigger_low and num_trigger_high define the number of triggers for the low and high energy range. + exp_time_low and exp_time_high define the exposure time for the low and high energy range. + + Args: + start (float): Start energy for the scan. + stop (float): Stop energy for the scan. + scan_time (float): Time for one scan cycle. + scan_duration (float): Duration of the scan. + xrd_enable_low (bool): Enable XRD triggering for the low energy range. + num_trigger_low (int): Number of triggers for the low energy range. + exp_time_low (float): Exposure time for the low energy range. + cycle_low (int): Cycle for the low energy range. + xrd_enable_high (bool): Enable XRD triggering for the high energy range. + num_trigger_high (int): Number of triggers for the high energy range. + exp_time_high (float): Exposure time for the high energy range. + cycle_high (int): Cycle for the high energy range. + motor (DeviceBase, optional): Motor device to be used for the scan. Defaults to "mo1_bragg". + """ + super().__init__(start=start, stop=stop, scan_time=scan_time, scan_duration=scan_duration, motor=motor, **kwargs) + self.xrd_enable_low = xrd_enable_low + self.num_trigger_low = num_trigger_low + self.exp_time_low = exp_time_low + self.cycle_low = cycle_low + self.xrd_enable_high = xrd_enable_high + self.num_trigger_high = num_trigger_high + self.exp_time_high = exp_time_high + self.cycle_high = cycle_high