diff --git a/csaxs_bec/scans/scans_v4/__init__.py b/csaxs_bec/scans/scans_v4/__init__.py index e69de29..26814a2 100644 --- a/csaxs_bec/scans/scans_v4/__init__.py +++ b/csaxs_bec/scans/scans_v4/__init__.py @@ -0,0 +1,2 @@ +from .cont_grid_scan import ContGridScanCSAXS +from .cont_line_scan import ContLineScanCSAXS diff --git a/csaxs_bec/scans/scans_v4/cont_grid_scan.py b/csaxs_bec/scans/scans_v4/cont_grid_scan.py index e69de29..7a35288 100644 --- a/csaxs_bec/scans/scans_v4/cont_grid_scan.py +++ b/csaxs_bec/scans/scans_v4/cont_grid_scan.py @@ -0,0 +1,344 @@ +""" +Continuous line scan implemented implemented for any motor with velocity and acceleration attributes. +Optionally, it supports a check for a base velocity attribute for user_motors. The scan will move the +motor to a premove position before starting the scan, then move continuously across the line while +detectors are triggered by the delay generator. The motor velocity is set such that it reaches the +end position at the same time as the last acquisition point is finished. The acceleration time is calculated +based on the fraction of target velocity to maximum velocity. +We assume that the motor is configured with its maximum velocity and acceleration time needed to reach this +speed. The scan will then scale down the velocity and acceleration time proportionally to reach the target velocity. + +Having a base_velocity available helps to ensure that the motor is properly configured for the scan. We check for +the signal 'base_velocity' if available and consider this for the calculation. + +After the scan, velocity and acceleration time will be restored. This also happens if the scan is aborted or an exception +occurs. + +Scan procedure: + - prepare_scan + - open_scan + - stage + - pre_scan + - scan_core + - at_each_point (optionally called by scan_core) + - post_scan + - unstage + - close_scan + - on_exception (called if any exception is raised during the scan) +""" + +from __future__ import annotations + +import time +from typing import Annotated + +import numpy as np +from bec_lib.device import DeviceBase +from bec_lib.logger import bec_logger +from bec_lib.scan_args import ScanArgument, Units +from bec_server.scan_server.errors import ScanAbortion +from bec_server.scan_server.scans.scan_modifier import scan_hook +from bec_server.scan_server.scans.scans_v4 import ScanBase, ScanType + +logger = bec_logger.logger + + +class ContGridScanCSAXS(ScanBase): + scan_type = ScanType.HARDWARE_TRIGGERED + scan_name = "cont_grid_scan_csaxs" + required_kwargs = ["steps", "relative"] + gui_config = { + "Devices": ["fast_motor"], + "Movement Parameters": ["start", "stop", "steps", "relative"], + "Acquisition Parameters": [ + "exp_time", + "readout_time", + "frames_per_trigger", + "detector_delay", + ], + } + + def __init__( + self, + fast_motor: DeviceBase, + start: Annotated[ + float, ScanArgument(display_name="Start Position", reference_units="fast_motor") + ], + stop: Annotated[ + float, ScanArgument(display_name="Stop Position", reference_units="fast_motor") + ], + step_size: Annotated[ + float, ScanArgument(display_name="Step Size", reference_units="fast_motor", gt=0) + ], + exp_time: Annotated[float, ScanArgument(display_name="Exposure Time", units=Units.s, gt=0)], + detector_delay: Annotated[ + float, ScanArgument(display_name="Detector Delay", units=Units.s, ge=0) + ] = 0.01, + relative: bool = False, + always_scan_in_positive_direction: bool = False, + **kwargs, + ): + """ + Continuous line scan. + + Args: + start: nominal line start position of the fast axis + stop: nominal line end position of the fast axis + step_size: step size for the continuous line scan + exp_time: exposure time in seconds + readout_time: detector readout time in seconds + frames_per_trigger: number of frames per trigger + fast_motor: OWIS motor used as fast axis + relative: if True, interpret start and stop relative to the current motor position + detector_delay: delay between motor reaching scan speed and detector burst start + shutter_additional_width: extra time the fast shutter stays open per line + add_pre_move_time: additional pre-move time translated into distance + detector_delay_generator: DDG triggering the detector chain + monitor_delay_generator: DDG providing monitor/MCS timing + shutter_delay_generator: DDG controlling the shutter pulse + progress_device: device exposing line progress + """ + super().__init__(**kwargs) + + self.fast_motor = self.dev[fast_motor] if isinstance(fast_motor, str) else fast_motor + self.ddg = self.dev.get("ddg", None) + if self.ddg is None: + raise ScanAbortion( + "Main delay generator named 'ddg1' device is required for the continuous line scan." + ) + self.mcs = self.dev.get("mcs", None) + if self.mcs is None: + logger.warning( + "Did not find MCS device named 'mcs'. Monitoring of acquisition progress might not work properly." + ) + + # Relevant scan parameters + self.motors = [self.fast_motor] + self.start = start + self.stop = stop + # Flip start and stop if configured to always scan in positive direction + if always_scan_in_positive_direction and self.stop < self.start: + self.start, self.stop = self.stop, self.start + self.step_size = step_size + self.exp_time = exp_time + self.relative = relative + self.detector_delay = detector_delay + + # Motor config parameters + self._original_motor_values = {"velocity": None, "acceleration": None} + self.target_velocity = None + self.base_velocity = None + self.acc_time = None + self.premove_distance = None + + # DDG parameter + self.shutter_open_delay = None + + @scan_hook + def prepare_scan(self): + if np.isclose(self.stop, self.start, atol=self.step_size): + raise ScanAbortion( + f"Stop {self.stop} and start {self.start} positions are too close for the given step size {self.step_size}." + ) + frames_per_trigger = int(np.ceil(np.abs(self.stop - self.start) / self.step_size)) + + # Handle positions and limits first to catch potential issues before everything else. + self.positions = self._prepare_positions( + start=self.start, stop=self.stop, steps=frames_per_trigger + ) + if self.relative: + self.start_positions = self.components.get_start_positions(self.motors) + self.positions += self.start_positions + + self.actions.set_device_readout_priority(self.motors, priority="monitored") + + # Compute target velocity, acceleration time and premove distance based on the scan parameters and motor capabilities + # We also need to fetch the delay between the shutter opening and the triggers to start by the delay generator + # As this time is very small (2e-3), we add it to the premove distance. But technically it could also be + # handled by adjusting the sleep after sending of the motor move command. + self.shutter_open_delay = self.ddg.get_shutter_open_delay() + self.premove_distance = self._compute_premove_distance(self.shutter_open_delay) + + # Check +/- premove distance around start and stop + position_range = self._prepare_positions( + start=self.start - self.premove_distance, + stop=self.stop + self.premove_distance, + steps=2, + include_endpoint=True, + ) + self.components.check_limits(self.motors, position_range) + + # Setup progress device if available + if self.mcs is not None: + self.actions.add_scan_report_instruction_device_progress(self.mcs) + else: + logger.warning( + "MCS device not found. Progress reporting will not work for this scan. Please add a device named 'mcs' to enable progress reporting." + ) + + # Set scan info paramters + self.update_scan_info( + positions=self.positions, + num_points=1, + num_monitored_readouts=0, + frames_per_trigger=frames_per_trigger, + exp_time=self.exp_time, + relative=self.relative, + run_on_exception_hook=True, + ) + self._baseline_readout_status = self.actions.read_baseline_devices(wait=False) + self._premove_motor_status = self.actions.set( + self.fast_motor, self.positions[0, 0] - self.premove_distance, wait=False + ) + + @scan_hook + def open_scan(self): + self.actions.open_scan() + + @scan_hook + def stage(self): + self.actions.stage_all_devices() + + @scan_hook + def pre_scan(self): + self._premove_motor_status.wait() + self.actions.pre_scan() + + @scan_hook + def scan_core(self): + self.at_each_point() + + @scan_hook + def at_each_point(self): + # Now we set the motor velocity and acceleration for the scan + st = self.fast_motor.velocity.set(self.target_velocity) + st2 = self.fast_motor.acceleration.set(self.acc_time) + st.wait() + st2.wait() + move_status = self.actions.set( + self.fast_motor, self.stop + self.premove_distance, wait=False + ) + time.sleep(self.acc_time) + trigger_status = self.ddg.trigger() + while not move_status.done: + self.actions.read_monitored_devices(wait=True) + try: # Readout at 2 Hz + move_status.wait(timeout=0.5) # only sleep as long as the move is not finished + except TimeoutError: + continue + + try: + trigger_status.wait(timeout=2) + except TimeoutError: + raise ScanAbortion( + f"Status for delay generator trigger {self.ddg} did not resolve after 2 seconds. " + ) + + @scan_hook + def post_scan(self): + self._restore_motor_properties() + status = self.actions.complete_all_devices(wait=False) + if self.relative: + self.components.move_and_wait(self.motors, self.start_positions) + status.wait() + + @scan_hook + def unstage(self): + self.actions.unstage_all_devices() + + @scan_hook + def close_scan(self): + if self._baseline_readout_status is not None: + self._baseline_readout_status.wait() + self.actions.close_scan() + self.actions.check_for_unchecked_statuses() + + @scan_hook + def on_exception(self, exception: Exception): + self._restore_motor_properties() + if self.relative: + self.components.move_and_wait(self.motors, self.start_positions) + + ########################## + ### Internal helper methods + ########################## + + def _prepare_positions( + self, start: float, stop: float, steps: int, include_endpoint: bool = False + ) -> np.ndarray: + axis_positions = np.linspace(start, stop, steps, endpoint=include_endpoint, dtype=float) + return np.column_stack(axis_positions) + + def _load_motor_properties(self) -> tuple[float, float, float]: + if not hasattr(self.fast_motor, "velocity"): + raise ScanAbortion(f"Motor {self.fast_motor} does not have a velocity attribute.") + if not hasattr(self.fast_motor, "acceleration"): + raise ScanAbortion(f"Motor {self.fast_motor} does not have an acceleration attribute.") + + vel = self.fast_motor.velocity.get() + acc = self.fast_motor.acceleration.get() + self._original_motor_values["velocity"] = vel + self._original_motor_values["acceleration"] = acc + + if self._has_base_velocity(): + base_vel = self.fast_motor.base_velocity.get() + else: + base_vel = 0.0 + + return vel, acc, base_vel + + def _compute_continous_motion_params(self) -> tuple[float, float]: + # Compute target velocity based on step size and exposure time + self.target_velocity = self.step_size / self.exp_time + + # Safeguard high and low velocity limits + if self.target_velocity > self._original_motor_values["velocity"]: + raise ScanAbortion( + f"Requested velocity of {self.target_velocity} exceeds maximum velocity {self._original_motor_values['velocity']} of motor {self.fast_motor.name}." + ) + if self._has_base_velocity() and self.target_velocity < self.base_velocity: + raise ScanAbortion( + f"Requested velocity of {self.target_velocity} is below base velocity {self.base_velocity}." + ) + + # Take base velocity into account. + acc_time = ( + (self.target_velocity - self.base_velocity) + / (self._original_motor_values["velocity"] - self.base_velocity) + * self._original_motor_values["acceleration"] + ) + + return self.target_velocity, acc_time + + def _compute_premove_distance(self, additional_distance: float) -> float: + # Load old motor parameters + vel, acc, base_vel = self._load_motor_properties() + self._original_motor_values["velocity"] = vel + self._original_motor_values["acceleration"] = acc + self.base_velocity = base_vel + + # Compute target velocity and acceleration time for the continuous motion + target_vel, acc_time = self._compute_continous_motion_params() + self.target_velocity = target_vel + self.acc_time = acc_time + + # Compute the premove distance needed to ensure the motor reaches the target velocity + return ( + 0.5 * (self.target_velocity + self.base_velocity) * self.acc_time + + additional_distance * self.target_velocity + ) + + def _set_scan_velocity(self): + self.fast_motor.velocity.put(self.target_velocity) + self.fast_motor.acceleration.put(self.acc_time) + + def _restore_motor_properties(self): + vel = self._original_motor_values["velocity"] + acc = self._original_motor_values["acceleration"] + if vel is not None: + self.fast_motor.velocity.put(vel) + if acc is not None: + self.fast_motor.acceleration.put(acc) + + def _has_base_velocity(self) -> bool: + return hasattr(self.fast_motor, "base_velocity") diff --git a/csaxs_bec/scans/scans_v4/cont_line_scan.py b/csaxs_bec/scans/scans_v4/cont_line_scan.py index e69de29..cf0a3c2 100644 --- a/csaxs_bec/scans/scans_v4/cont_line_scan.py +++ b/csaxs_bec/scans/scans_v4/cont_line_scan.py @@ -0,0 +1,344 @@ +""" +Continuous line scan implemented implemented for any motor with velocity and acceleration attributes. +Optionally, it supports a check for a base velocity attribute for user_motors. The scan will move the +motor to a premove position before starting the scan, then move continuously across the line while +detectors are triggered by the delay generator. The motor velocity is set such that it reaches the +end position at the same time as the last acquisition point is finished. The acceleration time is calculated +based on the fraction of target velocity to maximum velocity. +We assume that the motor is configured with its maximum velocity and acceleration time needed to reach this +speed. The scan will then scale down the velocity and acceleration time proportionally to reach the target velocity. + +Having a base_velocity available helps to ensure that the motor is properly configured for the scan. We check for +the signal 'base_velocity' if available and consider this for the calculation. + +After the scan, velocity and acceleration time will be restored. This also happens if the scan is aborted or an exception +occurs. + +Scan procedure: + - prepare_scan + - open_scan + - stage + - pre_scan + - scan_core + - at_each_point (optionally called by scan_core) + - post_scan + - unstage + - close_scan + - on_exception (called if any exception is raised during the scan) +""" + +from __future__ import annotations + +import time +from typing import Annotated + +import numpy as np +from bec_lib.device import DeviceBase +from bec_lib.logger import bec_logger +from bec_lib.scan_args import ScanArgument, Units +from bec_server.scan_server.errors import ScanAbortion +from bec_server.scan_server.scans.scan_modifier import scan_hook +from bec_server.scan_server.scans.scans_v4 import ScanBase, ScanType + +logger = bec_logger.logger + + +class ContLineScanCSAXS(ScanBase): + scan_type = ScanType.HARDWARE_TRIGGERED + scan_name = "cont_line_scan_csaxs" + required_kwargs = ["steps", "relative"] + gui_config = { + "Devices": ["fast_motor"], + "Movement Parameters": ["start", "stop", "steps", "relative"], + "Acquisition Parameters": [ + "exp_time", + "readout_time", + "frames_per_trigger", + "detector_delay", + ], + } + + def __init__( + self, + fast_motor: DeviceBase, + start: Annotated[ + float, ScanArgument(display_name="Start Position", reference_units="fast_motor") + ], + stop: Annotated[ + float, ScanArgument(display_name="Stop Position", reference_units="fast_motor") + ], + step_size: Annotated[ + float, ScanArgument(display_name="Step Size", reference_units="fast_motor", gt=0) + ], + exp_time: Annotated[float, ScanArgument(display_name="Exposure Time", units=Units.s, gt=0)], + detector_delay: Annotated[ + float, ScanArgument(display_name="Detector Delay", units=Units.s, ge=0) + ] = 0.01, + relative: bool = False, + always_scan_in_positive_direction: bool = False, + **kwargs, + ): + """ + Continuous line scan. + + Args: + start: nominal line start position of the fast axis + stop: nominal line end position of the fast axis + step_size: step size for the continuous line scan + exp_time: exposure time in seconds + readout_time: detector readout time in seconds + frames_per_trigger: number of frames per trigger + fast_motor: OWIS motor used as fast axis + relative: if True, interpret start and stop relative to the current motor position + detector_delay: delay between motor reaching scan speed and detector burst start + shutter_additional_width: extra time the fast shutter stays open per line + add_pre_move_time: additional pre-move time translated into distance + detector_delay_generator: DDG triggering the detector chain + monitor_delay_generator: DDG providing monitor/MCS timing + shutter_delay_generator: DDG controlling the shutter pulse + progress_device: device exposing line progress + """ + super().__init__(**kwargs) + + self.fast_motor = self.dev[fast_motor] if isinstance(fast_motor, str) else fast_motor + self.ddg = self.dev.get("ddg", None) + if self.ddg is None: + raise ScanAbortion( + "Main delay generator named 'ddg1' device is required for the continuous line scan." + ) + self.mcs = self.dev.get("mcs", None) + if self.mcs is None: + logger.warning( + "Did not find MCS device named 'mcs'. Monitoring of acquisition progress might not work properly." + ) + + # Relevant scan parameters + self.motors = [self.fast_motor] + self.start = start + self.stop = stop + # Flip start and stop if configured to always scan in positive direction + if always_scan_in_positive_direction and self.stop < self.start: + self.start, self.stop = self.stop, self.start + self.step_size = step_size + self.exp_time = exp_time + self.relative = relative + self.detector_delay = detector_delay + + # Motor config parameters + self._original_motor_values = {"velocity": None, "acceleration": None} + self.target_velocity = None + self.base_velocity = None + self.acc_time = None + self.premove_distance = None + + # DDG parameter + self.shutter_open_delay = None + + @scan_hook + def prepare_scan(self): + if np.isclose(self.stop, self.start, atol=self.step_size): + raise ScanAbortion( + f"Stop {self.stop} and start {self.start} positions are too close for the given step size {self.step_size}." + ) + frames_per_trigger = int(np.ceil(np.abs(self.stop - self.start) / self.step_size)) + + # Handle positions and limits first to catch potential issues before everything else. + self.positions = self._prepare_positions( + start=self.start, stop=self.stop, steps=frames_per_trigger + ) + if self.relative: + self.start_positions = self.components.get_start_positions(self.motors) + self.positions += self.start_positions + + self.actions.set_device_readout_priority(self.motors, priority="monitored") + + # Compute target velocity, acceleration time and premove distance based on the scan parameters and motor capabilities + # We also need to fetch the delay between the shutter opening and the triggers to start by the delay generator + # As this time is very small (2e-3), we add it to the premove distance. But technically it could also be + # handled by adjusting the sleep after sending of the motor move command. + self.shutter_open_delay = self.ddg.get_shutter_open_delay() + self.premove_distance = self._compute_premove_distance(self.shutter_open_delay) + + # Check +/- premove distance around start and stop + position_range = self._prepare_positions( + start=self.start - self.premove_distance, + stop=self.stop + self.premove_distance, + steps=2, + include_endpoint=True, + ) + self.components.check_limits(self.motors, position_range) + + # Setup progress device if available + if self.mcs is not None: + self.actions.add_scan_report_instruction_device_progress(self.mcs) + else: + logger.warning( + "MCS device not found. Progress reporting will not work for this scan. Please add a device named 'mcs' to enable progress reporting." + ) + + # Set scan info paramters + self.update_scan_info( + positions=self.positions, + num_points=1, + num_monitored_readouts=0, + frames_per_trigger=frames_per_trigger, + exp_time=self.exp_time, + relative=self.relative, + run_on_exception_hook=True, + ) + self._baseline_readout_status = self.actions.read_baseline_devices(wait=False) + self._premove_motor_status = self.actions.set( + self.fast_motor, self.positions[0, 0] - self.premove_distance, wait=False + ) + + @scan_hook + def open_scan(self): + self.actions.open_scan() + + @scan_hook + def stage(self): + self.actions.stage_all_devices() + + @scan_hook + def pre_scan(self): + self._premove_motor_status.wait() + self.actions.pre_scan() + + @scan_hook + def scan_core(self): + self.at_each_point() + + @scan_hook + def at_each_point(self): + # Now we set the motor velocity and acceleration for the scan + st = self.fast_motor.velocity.set(self.target_velocity) + st2 = self.fast_motor.acceleration.set(self.acc_time) + st.wait() + st2.wait() + move_status = self.actions.set( + self.fast_motor, self.stop + self.premove_distance, wait=False + ) + time.sleep(self.acc_time) + trigger_status = self.ddg.trigger() + while not move_status.done: + self.actions.read_monitored_devices(wait=True) + try: # Readout at 2 Hz + move_status.wait(timeout=0.5) # only sleep as long as the move is not finished + except TimeoutError: + continue + + try: + trigger_status.wait(timeout=2) + except TimeoutError: + raise ScanAbortion( + f"Status for delay generator trigger {self.ddg} did not resolve after 2 seconds. " + ) + + @scan_hook + def post_scan(self): + self._restore_motor_properties() + status = self.actions.complete_all_devices(wait=False) + if self.relative: + self.components.move_and_wait(self.motors, self.start_positions) + status.wait() + + @scan_hook + def unstage(self): + self.actions.unstage_all_devices() + + @scan_hook + def close_scan(self): + if self._baseline_readout_status is not None: + self._baseline_readout_status.wait() + self.actions.close_scan() + self.actions.check_for_unchecked_statuses() + + @scan_hook + def on_exception(self, exception: Exception): + self._restore_motor_properties() + if self.relative: + self.components.move_and_wait(self.motors, self.start_positions) + + ########################## + ### Internal helper methods + ########################## + + def _prepare_positions( + self, start: float, stop: float, steps: int, include_endpoint: bool = False + ) -> np.ndarray: + axis_positions = np.linspace(start, stop, steps, endpoint=include_endpoint, dtype=float) + return np.column_stack(axis_positions) + + def _load_motor_properties(self) -> tuple[float, float, float]: + if not hasattr(self.fast_motor, "velocity"): + raise ScanAbortion(f"Motor {self.fast_motor} does not have a velocity attribute.") + if not hasattr(self.fast_motor, "acceleration"): + raise ScanAbortion(f"Motor {self.fast_motor} does not have an acceleration attribute.") + + vel = self.fast_motor.velocity.get() + acc = self.fast_motor.acceleration.get() + self._original_motor_values["velocity"] = vel + self._original_motor_values["acceleration"] = acc + + if self._has_base_velocity(): + base_vel = self.fast_motor.base_velocity.get() + else: + base_vel = 0.0 + + return vel, acc, base_vel + + def _compute_continous_motion_params(self) -> tuple[float, float]: + # Compute target velocity based on step size and exposure time + self.target_velocity = self.step_size / self.exp_time + + # Safeguard high and low velocity limits + if self.target_velocity > self._original_motor_values["velocity"]: + raise ScanAbortion( + f"Requested velocity of {self.target_velocity} exceeds maximum velocity {self._original_motor_values['velocity']} of motor {self.fast_motor.name}." + ) + if self._has_base_velocity() and self.target_velocity < self.base_velocity: + raise ScanAbortion( + f"Requested velocity of {self.target_velocity} is below base velocity {self.base_velocity}." + ) + + # Take base velocity into account. + acc_time = ( + (self.target_velocity - self.base_velocity) + / (self._original_motor_values["velocity"] - self.base_velocity) + * self._original_motor_values["acceleration"] + ) + + return self.target_velocity, acc_time + + def _compute_premove_distance(self, additional_distance: float) -> float: + # Load old motor parameters + vel, acc, base_vel = self._load_motor_properties() + self._original_motor_values["velocity"] = vel + self._original_motor_values["acceleration"] = acc + self.base_velocity = base_vel + + # Compute target velocity and acceleration time for the continuous motion + target_vel, acc_time = self._compute_continous_motion_params() + self.target_velocity = target_vel + self.acc_time = acc_time + + # Compute the premove distance needed to ensure the motor reaches the target velocity + return ( + 0.5 * (self.target_velocity + self.base_velocity) * self.acc_time + + additional_distance * self.target_velocity + ) + + def _set_scan_velocity(self): + self.fast_motor.velocity.put(self.target_velocity) + self.fast_motor.acceleration.put(self.acc_time) + + def _restore_motor_properties(self): + vel = self._original_motor_values["velocity"] + acc = self._original_motor_values["acceleration"] + if vel is not None: + self.fast_motor.velocity.put(vel) + if acc is not None: + self.fast_motor.acceleration.put(acc) + + def _has_base_velocity(self) -> bool: + return hasattr(self.fast_motor, "base_velocity")