diff --git a/csaxs_bec/scans/__init__.py b/csaxs_bec/scans/__init__.py index 0497a31..f4aad14 100644 --- a/csaxs_bec/scans/__init__.py +++ b/csaxs_bec/scans/__init__.py @@ -1,5 +1,8 @@ from .flomni_fermat_scan import FlomniFermatScan +from .flomni_fermat_scan_v4 import FlomniFermatScanV4 from .jungfrau_joch_scan import JungfrauJochTestScan +from .lamni_fermat_scan_v4 import LamniFermatScanV4 +from .lamni_move_to_scan_center import LamniMoveToScanCenter from .LamNIFermatScan import LamNIFermatScan, LamNIMoveToScanCenter from .omny_fermat_scan import OMNYFermatScan from .owis_grid import OwisGrid diff --git a/csaxs_bec/scans/flomni_fermat_scan_v4.py b/csaxs_bec/scans/flomni_fermat_scan_v4.py new file mode 100644 index 0000000..de76c66 --- /dev/null +++ b/csaxs_bec/scans/flomni_fermat_scan_v4.py @@ -0,0 +1,425 @@ +""" +flOMNI Fermat Scan + +Scan procedure: + - prepare_scan + - open_scan + - stage + - pre_scan + - scan_core + - at_each_point (optionally called by scan_core) + - post_scan + - unstage + - close_scan + - on_exception (called if any exception is raised during the scan) +""" + +from __future__ import annotations + +import time +from typing import Annotated + +import numpy as np +from bec_lib import messages +from bec_lib.alarm_handler import Alarms +from bec_lib.endpoints import MessageEndpoints +from bec_lib.logger import bec_logger +from bec_lib.scan_args import DefaultArgType, ScanArgument, Units +from bec_server.scan_server.scans import ScanAbortion +from bec_server.scan_server.scans.scan_base import ScanBase, ScanType +from bec_server.scan_server.scans.scan_modifier import scan_hook + +from csaxs_bec.devices.epics.delay_generator_csaxs.delay_generator_csaxs import TRIGGERSOURCE + +logger = bec_logger.logger + + +class FlomniFermatScanV4(ScanBase): + # Scan Type: Hardware triggered or software triggered? + # If the main trigger and readout logic is done within the at_each_point method in scan_core, choose SOFTWARE_TRIGGERED. + # If the main trigger and readout logic is implemented on a device that is simply kicked off in this scan, choose HARDWARE_TRIGGERED. + # This primarily serves as information for devices: The device may need to react differently if a software trigger is expected + # for every point. + scan_type = ScanType.HARDWARE_TRIGGERED + + # Scan name: This is the name of the scan, e.g. "line_scan". This is used for display purposes and to identify the scan type in user interfaces. + # Choose a descriptive name that does not conflict with existing scan names. + # It must be a valid Python identifier, that is, it can only contain letters, numbers, and underscores, and must not start with a number. + scan_name = "flomni_fermat_scan_v4" + + gui_config = { + "Scan Parameters": [ + "fovx", + "fovy", + "cenx", + "ceny", + "step", + "zshift", + "angle", + "corridor_size", + "relative", + ], + "Acquisition Parameters": ["exp_time", "frames_per_trigger", "burst_at_each_point"], + } + + def __init__( + # fmt: off + self, + fovx: Annotated[float, ScanArgument(display_name="Fovx", description="FOV in the piezo plane (i.e. piezo range). Max 200 um.", units=Units.µm, gt=0, lt=200)], + fovy: Annotated[float, ScanArgument(display_name="Fovy", description="FOV in the piezo plane (i.e. piezo range). Max 100 um.", units=Units.µm, gt=0, lt=100)], + cenx: Annotated[float, ScanArgument(display_name="Cenx", description="Center position in x.", units=Units.µm)], + ceny: Annotated[float, ScanArgument(display_name="Ceny", description="Center position in y.", units=Units.µm)], + step: Annotated[float, ScanArgument(display_name="Step", description="Step size.", units=Units.µm)], + zshift: Annotated[float, ScanArgument(display_name="Zshift", description="Shift in z. ", units=Units.µm)], + angle: Annotated[float, ScanArgument(display_name="Angle", description="Rotation angle (will rotate first)", units=Units.deg)], + corridor_size: Annotated[float, ScanArgument(display_name="Corridor Size", description="Corridor size for the corridor optimization.", units=Units.µm)], + exp_time: DefaultArgType.ExposureTime = 0, + frames_per_trigger: DefaultArgType.FramesPerTrigger = 1, + burst_at_each_point: DefaultArgType.BurstAtEachPoint = 1, + **kwargs, + # fmt: on + ): + """ + flOMNI Fermat Scan + + Args: + fovx (float): FOV in the piezo plane (i.e. piezo range). Max 200 um. + fovy (float): FOV in the piezo plane (i.e. piezo range). Max 100 um. + cenx (float): Center position in x. + ceny (float): Center position in y. + step (float): Step size. + zshift (float): Shift in z. + angle (float): Rotation angle (will rotate first) + corridor_size (float): Corridor size for the corridor optimization. + exp_time (float): Exposure time in seconds + frames_per_trigger (int): Number of frames per trigger for devices that support configurable frame counts per trigger. + burst_at_each_point (int): Number of triggers and readouts at each point. + + Returns: + ScanReport + """ + super().__init__(**kwargs) + self._baseline_readout_status = None + self.fovx = fovx + self.fovy = fovy + self.cenx = cenx + self.ceny = ceny + self.step = step + self.zshift = zshift + self.angle = angle + self.corridor_size = corridor_size + self.exp_time = exp_time + self.frames_per_trigger = frames_per_trigger + self.burst_at_each_point = burst_at_each_point + self.flomni_rotation_status = None + + if self.zshift > 100: + logger.warning("The zshift is larger than 100 um. It will be limited to 100 um.") + self.zshift = 100 + + if self.zshift < -100: + logger.warning("The zshift is smaller than -100 um. It will be limited to -100 um.") + self.zshift = -100 + + self.update_scan_info( + exp_time=exp_time, + frames_per_trigger=frames_per_trigger, + burst_at_each_point=burst_at_each_point, + ) + + @scan_hook + def prepare_scan(self): + """ + Prepare the scan. This can include any steps that need to be executed + before the scan is opened, such as preparing the positions (if not done already) + or setting up the devices. + """ + + positions = self.get_flomni_fermat_spiral_pos( + -np.abs(self.fovx / 2), + np.abs(self.fovx / 2), + -np.abs(self.fovy / 2), + np.abs(self.fovy / 2), + step=self.step, + spiral_type=0, + center=False, + ) + + if len(positions) < 20: + raise ScanAbortion(f"The number positions must exceed 20. Currently: {len(positions)}.") + + self.positions = self.components.optimize_trajectory( + positions, optimization_type="corridor", corridor_size=self.corridor_size + ) + flip_axes = self.reverse_trajectory() + if flip_axes: + self.positions = np.flipud(self.positions) + + self.update_scan_info(positions=self.positions, num_points=len(self.positions)) + + self.prepare_setup() + + self.actions.add_scan_report_instruction_device_progress(device="rt_positions") + self._baseline_readout_status = self.actions.read_baseline_devices(wait=False) + + @scan_hook + def open_scan(self): + """ + Open the scan. + This step must call self.actions.open_scan() to ensure that a new scan is + opened. Make sure to prepare the scan metadata before, either in + prepare_scan() or in open_scan() itself and call self.update_scan_info(...) + to update the scan metadata if needed. + """ + self.actions.open_scan() + + @scan_hook + def stage(self): + """ + Stage the devices for the upcoming scan. The stage logic is typically + implemented on the device itself (i.e. by the device's stage method). + However, if there are any additional steps that need to be executed before + staging the devices, they can be implemented here. + """ + self.actions.stage_all_devices() + + @scan_hook + def pre_scan(self): + """ + Pre-scan steps to be executed before the main scan logic. + This is typically the last chance to prepare the devices before the core scan + logic is executed. For example, this is a good place to initialize time-criticial + devices, e.g. devices that have a short timeout. + The pre-scan logic is typically implemented on the device itself. + """ + self.prepare_setup_part_2() + self.actions.pre_scan_all_devices() + + @scan_hook + def scan_core(self): + """ + Core scan logic to be executed during the scan. + This is where the main scan logic should be implemented. + """ + + # send off the flyer + self.dev.rt_positions.kickoff().wait() + + # start the readout loop of the flyer + status = self.actions.complete(device="rt_positions", wait=False) + while not status.done: + self.at_each_point() + + @scan_hook + def at_each_point(self): + """ + Logic to be executed at each acquisition point during the scan. + """ + self.actions.read_monitored_devices() + time.sleep(1) + + @scan_hook + def post_scan(self): + """ + Post-scan steps to be executed after the main scan logic. + """ + # in flomni, we need to move to the start position of the next scan, + # which is the end position of the current scan + move_status = None + if isinstance(self.positions, np.ndarray) and len(self.positions[-1]) == 3: + # in x we move to cenx, then we avoid jumps in centering routine + value = self.positions[-1] + value[0] = self.cenx + move_status = self.actions.set(device=["rtx", "rty", "rtz"], value=value, wait=False) + + self.actions.complete_all_devices() + + if move_status: + move_status.wait() + + self.dev.ddg1.set_trigger(TRIGGERSOURCE.SINGLE_SHOT.value) + + @scan_hook + def unstage(self): + """Unstage the scan by executing post-scan steps.""" + self.actions.unstage_all_devices() + + @scan_hook + def close_scan(self): + """Close the scan.""" + if self._baseline_readout_status is not None: + self._baseline_readout_status.wait() + self.actions.close_scan() + self.actions.check_for_unchecked_statuses() + + @scan_hook + def on_exception(self, exception: Exception): + """ + Handle exceptions that occur during the scan. + This is a good place to implement any cleanup logic that needs to be executed in case of an exception, + such as returning the devices to a safe state or moving the motors back to their starting position. + """ + self.dev.ddg1.set_trigger(TRIGGERSOURCE.SINGLE_SHOT.value) + + ####################################################### + ######### Helper methods for the scan logic ########### + ####################################################### + + def reverse_trajectory(self): + """ + Reverse the trajectory. Every other scan should be reversed to + shorten the movement time. In order to keep the last state, even if the + server is restarted, the state is stored in a global variable in redis. + """ + msg = self.redis_connector.get(MessageEndpoints.global_vars("reverse_flomni_trajectory")) + if msg: + val = msg.content.get("value", False) + else: + val = False + self.redis_connector.set( + MessageEndpoints.global_vars("reverse_flomni_trajectory"), + messages.VariableMessage(value=(not val)), + ) + return val + + def prepare_setup(self): + """ + Prepare the first part of the setup: + - Clear the trajectory of the rt controller + - Rotate flomni to the requested angle + - Move rty to the start position + """ + self.dev.rtx.controller.clear_trajectory_generator() + self.flomni_rotation(self.angle) + self.dev.rty.set(self.positions[0][1]) + + def prepare_setup_part_2(self): + """ + Prepare the second part of the setup: + - Set the delay generator ddg1 to external rising edge + - Wait for flomni rotation to complete (started in prepare_setup) + - Move rtx and rtz to the start position + - Turn on the laser tracker + - Add the positions to the rt controller's scan trajectory + - Check the signal strength of the laser tracker and raise an alarm if it is low + - Move samx to the scan region + """ + dev = self.dev + + # Prepare DDG1 + dev.ddg1.set_trigger(TRIGGERSOURCE.EXT_RISING_EDGE.value) + + if self.flomni_rotation_status: + self.flomni_rotation_status.wait() + + rtx_status = dev.rtx.set(self.cenx, wait=False) + rtz_status = dev.rtz.set(self.positions[0][2], wait=False) + + dev.rtx.controller.laser_tracker_on() + + rtx_status.wait() + rtz_status.wait() + + dev.rtx.controller.add_pos_to_scan(self.positions.tolist()) + + tracker_signal_status = dev.rtx.controller.laser_tracker_check_signalstrength() + dev.rtx.controller.move_samx_to_scan_region(self.cenx) + + if tracker_signal_status == "low": + error_info = messages.ErrorInfo( + error_message="Signal strength of the laser tracker is low, but sufficient to continue. Realignment recommended!", + compact_error_message="Low signal strength of the laser tracker. Realignment recommended!", + exception_type="LaserTrackerSignalStrengthLow", + device="rtx", + ) + self.device_manager.connector.raise_alarm(severity=Alarms.WARNING, info=error_info) + elif tracker_signal_status == "toolow": + raise ScanAbortion( + "Signal strength of the laser tracker is too low for scanning. Realignment required!" + ) + + def flomni_rotation(self, angle: float): + """ + Rotate flomni to the requested angle. + We also emit a scan report instruction to keep users informed about the progress of the + rotation as it may take a few seconds. + + Note that we do not wait for the rotation to complete here, but + instead wait in prepare_setup_part_2. + + Args: + angle (float): The target angle for the flomni rotation. + """ + fsamroy_current_setpoint = self.dev.fsamroy.user_setpoint.get() + if angle == fsamroy_current_setpoint: + logger.info("No rotation required") + return + + logger.info("Rotating to requested angle") + self.actions.add_scan_report_instruction_readback( + devices=["fsamroy"], + start=[fsamroy_current_setpoint], + stop=[angle], + request_id=self.scan_info.metadata["RID"], + ) + self.flomni_rotation_status = self.dev.fsamroy.set(angle, wait=False) + + def get_flomni_fermat_spiral_pos( + self, + m1_start: float, + m1_stop: float, + m2_start: float, + m2_stop: float, + step: float = 1, + spiral_type: int = 0, + center: bool = False, + ): + """ + Calculate positions for a Fermat spiral scan. + + Args: + m1_start(float): start position in m1 + m1_stop(float): stop position in m1 + m2_start(float): start position in m2 + m2_stop(float): stop position in m2 + step(float): stepsize + spiral_type(int): 0 for traditional Fermat spiral + center(bool): whether to include the center position + + Returns: + positions(array): positions + """ + positions = [] + phi = 2 * np.pi * ((1 + np.sqrt(5)) / 2.0) + spiral_type * np.pi + + start = int(not center) + + length_axis1 = np.abs(m1_stop - m1_start) + length_axis2 = np.abs(m2_stop - m2_start) + n_max = int(length_axis1 * length_axis2 * 3.2 / step / step) + + z_pos = self.zshift + + for ii in range(start, n_max): + radius = step * 0.57 * np.sqrt(ii) + # FOV is restructed below at check pos in range + if abs(radius * np.sin(ii * phi)) > length_axis1 / 2: + continue + if abs(radius * np.cos(ii * phi)) > length_axis2 / 2: + continue + x = radius * np.sin(ii * phi) + y = radius * np.cos(ii * phi) + positions.append([x + self.cenx, y + self.ceny, z_pos]) + left_lower_corner = [ + min(m1_start, m1_stop) + self.cenx, + min(m2_start, m2_stop) + self.ceny, + z_pos, + ] + right_upper_corner = [ + max(m1_start, m1_stop) + self.cenx, + max(m2_start, m2_stop) + self.ceny, + z_pos, + ] + positions.append(left_lower_corner) + positions.append(right_upper_corner) + return np.array(positions) diff --git a/csaxs_bec/scans/lamni_components.py b/csaxs_bec/scans/lamni_components.py new file mode 100644 index 0000000..5cbefd1 --- /dev/null +++ b/csaxs_bec/scans/lamni_components.py @@ -0,0 +1,175 @@ +from __future__ import annotations + +import time + +import numpy as np +from bec_lib.logger import bec_logger +from bec_server.scan_server.scans.scan_components import ScanComponents + +logger = bec_logger.logger + +MOVEMENT_SCALE_X = np.sin(np.radians(15)) * np.cos(np.radians(30)) +MOVEMENT_SCALE_Y = np.cos(np.radians(15)) + + +class LamNIComponents(ScanComponents): + + def lamni_compute_scan_center( + self, x: float, y: float, angle_deg: float + ) -> tuple[float, float]: + """ + Compute the scan center in the stage coordinates based on the + provided center in lamni coordinates and the rotation angle. + + Args: + x (float): Center position in x at 0 deg in lamni coordinates (mm) + y (float): Center position in y at 0 deg in lamni coordinates (mm) + angle_deg (float): Rotation angle in degrees + + Returns: + tuple: (shift_x, shift_y) in mm to be applied to the scan center in the stage coordinates + """ + alpha = angle_deg / 180 * np.pi + stage_x, stage_y = self.lamni_to_stage_coordinates(x, y) + stage_x_rot = np.cos(alpha) * stage_x - np.sin(alpha) * stage_y + stage_y_rot = np.sin(alpha) * stage_x + np.cos(alpha) * stage_y + return self.lamni_from_stage_coordinates(stage_x_rot, stage_y_rot) + + def lamni_to_stage_coordinates(self, x: float, y: float) -> tuple[float, float]: + """convert from lamni coordinates to stage coordinates""" + y_stage = y / MOVEMENT_SCALE_Y + x_stage = 2 * (x - y_stage * MOVEMENT_SCALE_X) + return (x_stage, y_stage) + + def lamni_from_stage_coordinates(self, x_stage: float, y_stage: float) -> tuple[float, float]: + """convert to lamni coordinates from stage coordinates""" + x = x_stage * 0.5 + y_stage * MOVEMENT_SCALE_X + y = y_stage * MOVEMENT_SCALE_Y + return (x, y) + + def lamni_new_scan_center_interferometer(self, x: float, y: float): + """ + Move to the new scan center. + + Args: + x (float): Center position in x in mm + y (float): Center position in y in mm + """ + lsamx_user_params = self._dev.lsamx.user_parameter + if lsamx_user_params is None or lsamx_user_params.get("center") is None: + raise RuntimeError("lsamx center is not defined") + lsamy_user_params = self._dev.lsamy.user_parameter + + if lsamy_user_params is None or lsamy_user_params.get("center") is None: + raise RuntimeError("lsamy center is not defined") + + lsamx_center = lsamx_user_params.get("center") + lsamy_center = lsamy_user_params.get("center") + + # disable the feedback + self._dev.rtx.controller.feedback_disable() + + rtx_current = self._dev.rtx.readback.get() + rty_current = self._dev.rty.readback.get() + lsamx_current = self._dev.lsamx.readback.get() + lsamy_current = self._dev.lsamy.readback.get() + + x_stage, y_stage = self.lamni_to_stage_coordinates(x, y) + x_center_expect, y_center_expect = self.lamni_from_stage_coordinates( + lsamx_current - lsamx_center, lsamy_current - lsamy_center + ) + + # in microns + x_drift = x_center_expect * 1000 - rtx_current + y_drift = y_center_expect * 1000 - rty_current + + logger.info(f"Current uncompensated drift of setup is x={x_drift:.3f}, y={y_drift:.3f}") + + move_x = ( + x_stage + lsamx_center + self.lamni_to_stage_coordinates(x_drift, y_drift)[0] / 1000 + ) + move_y = ( + y_stage + lsamy_center + self.lamni_to_stage_coordinates(x_drift, y_drift)[1] / 1000 + ) + + coarse_move_req_x = np.abs(lsamx_current - move_x) + coarse_move_req_y = np.abs(lsamy_current - move_y) + + self._dev.lsamx.read_only = False + self._dev.lsamy.read_only = False + + if ( + np.abs(y_drift) > 150 + or np.abs(x_drift) > 150 + or (coarse_move_req_y < 0.003 and coarse_move_req_x < 0.003) + ): + logger.info("No drift correction.") + else: + logger.info( + f"Compensating {[val/1000 for val in self.lamni_to_stage_coordinates(x_drift,y_drift)]}" + ) + self._dev.lsamx.set(move_x) + self._dev.lsamy.set(move_y) + + time.sleep(0.01) + rtx_current = self._dev.rtx.readback.get() + rty_current = self._dev.rty.readback.get() + + logger.info(f"New scan center interferometer {rtx_current:.3f}, {rty_current:.3f} microns") + + # second iteration + x_center_expect, y_center_expect = self.lamni_from_stage_coordinates(x_stage, y_stage) + + # in microns + x_drift2 = x_center_expect * 1000 - rtx_current + y_drift2 = y_center_expect * 1000 - rty_current + logger.info( + f"Uncompensated drift of setup after first iteration is x={x_drift2:.3f}," + f" y={y_drift2:.3f}" + ) + + if np.abs(x_drift2) > 5 or np.abs(y_drift2) > 5: + logger.info( + "Compensating second iteration" + f" {[val/1000 for val in self.lamni_to_stage_coordinates(x_drift2,y_drift2)]}" + ) + move_x = ( + x_stage + + lsamx_center + + self.lamni_to_stage_coordinates(x_drift, y_drift)[0] / 1000 + + self.lamni_to_stage_coordinates(x_drift2, y_drift2)[0] / 1000 + ) + move_y = ( + y_stage + + lsamy_center + + self.lamni_to_stage_coordinates(x_drift, y_drift)[1] / 1000 + + self.lamni_to_stage_coordinates(x_drift2, y_drift2)[1] / 1000 + ) + self._dev.lsamx.set(move_x) + self._dev.lsamy.set(move_y) + + time.sleep(0.01) + rtx_current = self._dev.rtx.readback.get() + rty_current = self._dev.rty.readback.get() + + logger.info( + f"New scan center interferometer after second iteration {rtx_current:.3f}," + f" {rty_current:.3f} microns" + ) + x_drift2 = x_center_expect * 1000 - rtx_current + y_drift2 = y_center_expect * 1000 - rty_current + logger.info( + f"Uncompensated drift of setup after second iteration is x={x_drift2:.3f}," + f" y={y_drift2:.3f}" + ) + else: + logger.info("No second iteration required") + + self._dev.lsamx.read_only = True + self._dev.lsamy.read_only = True + + # update angle readback before start of the scan + self._dev.lsamrot.readback.get() + + # re-enable the feedback + self._dev.rtx.controller.feedback_enable_without_reset() diff --git a/csaxs_bec/scans/lamni_fermat_scan_v4.py b/csaxs_bec/scans/lamni_fermat_scan_v4.py new file mode 100644 index 0000000..5c9b638 --- /dev/null +++ b/csaxs_bec/scans/lamni_fermat_scan_v4.py @@ -0,0 +1,422 @@ +""" +LamNI Fermat Scan + +Scan procedure: + - prepare_scan + - open_scan + - stage + - pre_scan + - scan_core + - at_each_point (optionally called by scan_core) + - post_scan + - unstage + - close_scan + - on_exception (called if any exception is raised during the scan) +""" + +from __future__ import annotations + +import time +from typing import Annotated + +import numpy as np +from bec_lib.logger import bec_logger +from bec_lib.scan_args import DefaultArgType, ScanArgument, Units +from bec_server.scan_server.errors import ScanAbortion +from bec_server.scan_server.scans.scan_base import ScanBase, ScanType +from bec_server.scan_server.scans.scan_modifier import scan_hook + +from csaxs_bec.scans.lamni_components import LamNIComponents + +logger = bec_logger.logger + + +class LamniFermatScanV4(ScanBase): + # Scan Type: Hardware triggered or software triggered? + # If the main trigger and readout logic is done within the at_each_point method in scan_core, choose SOFTWARE_TRIGGERED. + # If the main trigger and readout logic is implemented on a device that is simply kicked off in this scan, choose HARDWARE_TRIGGERED. + # This primarily serves as information for devices: The device may need to react differently if a software trigger is expected + # for every point. + scan_type = ScanType.HARDWARE_TRIGGERED + + # Scan name: This is the name of the scan, e.g. "line_scan". This is used for display purposes and to identify the scan type in user interfaces. + # Choose a descriptive name that does not conflict with existing scan names. + # It must be a valid Python identifier, that is, it can only contain letters, numbers, and underscores, and must not start with a number. + scan_name = "lamni_fermat_scan_v4" + + gui_config = { + "Scan Parameters": [ + "fov_size", + "step", + "shift_x", + "shift_y", + "center_x", + "center_y", + "angle", + "stitch_x", + "stitch_y", + "fov_circular", + "stitch_overlap", + "relative", + ], + "Acquisition Parameters": ["exp_time", "frames_per_trigger", "readout_time"], + } + + def __init__( + # fmt: off + self, + fovx: Annotated[float, ScanArgument(display_name="FOV x", description="FOV in the piezo plane (i.e. piezo range). Max 80 um", units=Units.µm, gt=0, lt=80)], + fovy: Annotated[float, ScanArgument(display_name="FOV y", description="FOV in the piezo plane (i.e. piezo range). Max 80 um", units=Units.µm, gt=0, lt=80)], + step: Annotated[float, ScanArgument(display_name="Step", description="Step size", units=Units.µm)], + shift_x: Annotated[float, ScanArgument(display_name="Shift X", description="Extra shift in x. The shift is directly applied to the scan. It will not be auto-rotated.", units=Units.mm)] = 0, + shift_y: Annotated[float, ScanArgument(display_name="Shift Y", description="Extra shift in y. The shift is directly applied to the scan. It will not be auto-rotated.", units=Units.mm)] = 0, + center_x: Annotated[float, ScanArgument(display_name="Center X", description="Center position in x at 0 deg. This shift is rotated using the geometry of LamNI. It is determined by the first 'click' in the x-ray eye alignment procedure.", units=Units.mm)] = 0, + center_y: Annotated[float, ScanArgument(display_name="Center Y", description="Center position in y at 0 deg. This shift is rotated using the geometry of LamNI. It is determined by the first 'click' in the x-ray eye alignment procedure.", units=Units.mm)] = 0, + angle: Annotated[float, ScanArgument(display_name="Angle", description="Rotation angle (will rotate first)", units=Units.deg)] = 0, + stitch_x: Annotated[float, ScanArgument(display_name="Stitch X", description="Shift scan to adjacent stitch region", units=Units.mm)] = 0, + stitch_y: Annotated[float, ScanArgument(display_name="Stitch Y", description="Shift scan to adjacent stitch region", units=Units.mm)] = 0, + fov_circular: Annotated[float, ScanArgument(display_name="Fov Circular", description="Generate a circular field of view in the sample plane. This is an additional cropping to fov_size", units=Units.µm)] = 0, + stitch_overlap: Annotated[float, ScanArgument(display_name="Stitch Overlap", description="Overlap of the stitched regions", units=Units.µm)] = 1, + corridor_size: Annotated[float | None, ScanArgument(display_name="Corridor Size", description="Corridor size for the corridor optimization.", units=Units.µm)] = None, + exp_time: DefaultArgType.ExposureTime = 0, + frames_per_trigger: DefaultArgType.FramesPerTrigger = 1, + readout_time: DefaultArgType.ReadoutTime = 0, + **kwargs, + # fmt: on + ): + """ + LamNI Fermat Scan + + Args: + fovx (float): FOV in the piezo plane (i.e. piezo range) along the x-axis. Max 80 um + fovy (float): FOV in the piezo plane (i.e. piezo range) along the y-axis. Max 80 um + step (float): Step size + shift_x (float): Extra shift in x. The shift is directly applied to the scan. It will not be auto-rotated. + shift_y (float): Extra shift in y. The shift is directly applied to the scan. It will not be auto-rotated. + center_x (float): Center position in x at 0 deg. This shift is rotated using the geometry of LamNI. It is determined by the first 'click' in the x-ray eye alignment procedure. + center_y (float): Center position in y at 0 deg. This shift is rotated using the geometry of LamNI. It is determined by the first 'click' in the x-ray eye alignment procedure. + angle (float): Rotation angle (will rotate first) + stitch_x (float): Shift scan to adjacent stitch region + stitch_y (float): Shift scan to adjacent stitch region + fov_circular (float): Generate a circular field of view in the sample plane. This is an additional cropping to fov_size + stitch_overlap (float): Overlap of the stitched regions + exp_time (float): Exposure time in seconds + corridor_size (float | None): Corridor size for the corridor optimization. If None, the corridor size will be estimated. + frames_per_trigger (int): Number of frames per trigger for devices that support configurable frame counts per trigger. + readout_time (float): Configuration for devices that support configurable readout times. + + Returns: + ScanReport + """ + super().__init__(**kwargs) + self.components = LamNIComponents(self) + self._baseline_readout_status = None + self.fovx = fovx + self.fovy = fovy + self.step = step + self.shift_x = shift_x + self.shift_y = shift_y + self.center_x = center_x + self.center_y = center_y + self.angle = angle + self.stitch_x = stitch_x + self.stitch_y = stitch_y + self.fov_circular = fov_circular + self.stitch_overlap = stitch_overlap + self.exp_time = exp_time + self.frames_per_trigger = frames_per_trigger + self.readout_time = readout_time + self.corridor_size = corridor_size + + self.update_scan_info( + exp_time=exp_time, frames_per_trigger=frames_per_trigger, readout_time=readout_time + ) + + @scan_hook + def prepare_scan(self): + """ + Prepare the scan. This can include any steps that need to be executed + before the scan is opened, such as preparing the positions (if not done already) + or setting up the devices. + """ + + positions = self.get_lamni_fermat_spiral_pos( + -np.abs(self.fovx / 2), + np.abs(self.fovx / 2), + -np.abs(self.fovy / 2), + np.abs(self.fovy / 2), + step=self.step, + spiral_type=0, + center=False, + ) + + if len(positions) < 20: + raise ScanAbortion(f"The number positions must exceed 20. Currently: {len(positions)}.") + + self.positions = self.components.optimize_trajectory( + positions, optimization_type="corridor", corridor_size=self.corridor_size + ) + + self.update_scan_info(num_points=len(self.positions), positions=self.positions) + + self.prepare_setup() + + self.actions.add_scan_report_instruction_device_progress(device="rt_positions") + + self._baseline_readout_status = self.actions.read_baseline_devices(wait=False) + + @scan_hook + def open_scan(self): + """ + Open the scan. + This step must call self.actions.open_scan() to ensure that a new scan is + opened. Make sure to prepare the scan metadata before, either in + prepare_scan() or in open_scan() itself and call self.update_scan_info(...) + to update the scan metadata if needed. + """ + self.actions.open_scan() + + @scan_hook + def stage(self): + """ + Stage the devices for the upcoming scan. The stage logic is typically + implemented on the device itself (i.e. by the device's stage method). + However, if there are any additional steps that need to be executed before + staging the devices, they can be implemented here. + """ + self.actions.stage_all_devices() + + @scan_hook + def pre_scan(self): + """ + Pre-scan steps to be executed before the main scan logic. + This is typically the last chance to prepare the devices before the core scan + logic is executed. For example, this is a good place to initialize time-criticial + devices, e.g. devices that have a short timeout. + The pre-scan logic is typically implemented on the device itself. + """ + self.actions.pre_scan_all_devices() + + @scan_hook + def scan_core(self): + """ + Core scan logic to be executed during the scan. + This is where the main scan logic should be implemented. + """ + + self.actions.kickoff(device="rt_positions") + + status = self.actions.complete(device="rt_positions", wait=False) + + while not status.done: + self.at_each_point() + time.sleep(1) + + @scan_hook + def at_each_point(self): + """ + Logic to be executed at each acquisition point during the scan. + """ + self.actions.read_monitored_devices() + + @scan_hook + def post_scan(self): + """ + Post-scan steps to be executed after the main scan logic. + """ + self.actions.complete_all_devices() + + @scan_hook + def unstage(self): + """Unstage the scan by executing post-scan steps.""" + self.actions.unstage_all_devices() + + @scan_hook + def close_scan(self): + """Close the scan.""" + if self._baseline_readout_status is not None: + self._baseline_readout_status.wait() + self.actions.close_scan() + self.actions.check_for_unchecked_statuses() + + @scan_hook + def on_exception(self, exception: Exception): + """ + Handle exceptions that occur during the scan. + This is a good place to implement any cleanup logic that needs to be executed in case of an exception, + such as returning the devices to a safe state or moving the motors back to their starting position. + """ + + ####################################################### + ######### Helper methods for the scan logic ########### + ####################################################### + + def get_lamni_fermat_spiral_pos( + self, + m1_start: float, + m1_stop: float, + m2_start: float, + m2_stop: float, + step: float = 1, + spiral_type: float = 0, + center: bool = False, + ) -> np.ndarray: + """Generate positions for a Fermat spiral scan pattern for LamNI. + + Args: + m1_start (float): start position motor 1 + m1_stop (float): end position motor 1 + m2_start (float): start position motor 2 + m2_stop (float): end position motor 2 + step (float, optional): Step size. Defaults to 1. + spiral_type (float, optional): Angular offset in radians that determines the shape of the spiral. + A spiral with spiral_type=2 is the same as spiral_type=0. Defaults to 0. + center (bool, optional): Add a center point. Defaults to False. + + Returns: + np.ndarray: Array of positions for the Fermat spiral scan. + """ + positions = [] + phi = 2 * np.pi * ((1 + np.sqrt(5)) / 2.0) + spiral_type * np.pi + + start = int(not center) + + length_axis1 = np.abs(m1_stop - m1_start) + length_axis2 = np.abs(m2_stop - m2_start) + n_max = int(length_axis1 * length_axis2 * 3.2 / step / step) + + total_shift_x, total_shift_y = self._compute_total_shift() + + for ii in range(start, n_max): + radius = step * 0.57 * np.sqrt(ii) + x = radius * np.sin(ii * phi) + y = radius * np.cos(ii * phi) + if self._lamni_check_pos_in_fov_range_and_circ_fov(x, y): + positions.extend([(x + total_shift_x * 1000, y + total_shift_y * 1000)]) + # for testing we just shift by center_i and prepare also the setup to center_i + return np.array(positions) + + def _compute_total_shift(self) -> tuple[float, float]: + _shfitx, _shfity = self.components.lamni_compute_scan_center( + self.center_x, self.center_y, self.angle + ) + x_stitch_shift, y_stitch_shift = self._lamni_compute_stitch_center( + self.stitch_x, self.stitch_y, self.angle + ) + logger.info( + f"Total shift [mm] {_shfitx+x_stitch_shift/1000+self.shift_x}," + f" {_shfity+y_stitch_shift/1000+self.shift_y}" + ) + return ( + _shfitx + x_stitch_shift / 1000 + self.shift_x, + _shfity + y_stitch_shift / 1000 + self.shift_y, + ) + + def _lamni_compute_stitch_center( + self, xcount: float, ycount: float, angle_deg: float + ) -> tuple[float, float]: + """ + Compute the stitch center in the stage coordinates based on the provided stitch counts and the rotation angle. + + Args: + xcount (float): Stitch count in x direction + ycount (float): Stitch count in y direction + angle_deg (float): Rotation angle in degrees + + Returns: + tuple: (shift_x, shift_y) in mm to be applied to the scan center in the stage coordinates + """ + alpha = angle_deg / 180 * np.pi + stage_x = xcount * (self.fovx - self.stitch_overlap) + stage_y = ycount * (self.fovy - self.stitch_overlap) + x_rot = np.cos(alpha) * stage_x - np.sin(alpha) * stage_y + y_rot = np.sin(alpha) * stage_x + np.cos(alpha) * stage_y + + return self.components.lamni_from_stage_coordinates(x_rot, y_rot) + + def _lamni_check_pos_in_fov_range_and_circ_fov(self, x: float, y: float) -> bool: + """ + Check if the given position is within the FOV range and circular FOV (if specified). + + Args: + x (float): x position in lamni coordinates (mm) + y (float): y position in lamni coordinates (mm) + + Returns: + bool: True if the position is within the FOV range and circular FOV, False otherwise. + """ + # this function checks if positions are reachable in a scan + # these x y intererometer positions are not shifted to the scan center + # so its purpose is to see if the position is reachable by the + # rotated piezo stage. For a scan these positions have to be shifted to + # the current scan center before starting the scan + stage_x, stage_y = self.components.lamni_to_stage_coordinates(x, y) + stage_x_with_stitch, stage_y_with_stitch = self._lamni_compute_stitch_center( + self.stitch_x, self.stitch_y, self.angle + ) + stage_x_with_stitch, stage_y_with_stitch = self.components.lamni_to_stage_coordinates( + stage_x_with_stitch, stage_y_with_stitch + ) + + # piezo stage is currently rotated to stage_angle_deg in degrees + # rotate positions to the piezo stage system + alpha = (self.angle - 300 + 30.5) / 180 * np.pi + stage_x_rot = np.cos(alpha) * stage_x + np.sin(alpha) * stage_y + stage_y_rot = -np.sin(alpha) * stage_x + np.cos(alpha) * stage_y + + stage_x_rot_with_stitch = ( + np.cos(alpha) * stage_x_with_stitch + np.sin(alpha) * stage_y_with_stitch + ) + stage_y_rot_with_stitch = ( + -np.sin(alpha) * stage_x_with_stitch + np.cos(alpha) * stage_y_with_stitch + ) + + return ( + np.abs(stage_x_rot) <= (self.fovy / 2) + and np.abs(stage_y_rot) <= (self.fovx / 2) + and ( + self.fov_circular == 0 + or ( + np.power((stage_x_rot_with_stitch + stage_x_rot), 2) + + np.power((stage_y_rot_with_stitch + stage_y_rot), 2) + ) + <= pow((self.fov_circular / 2), 2) + ) + ) + + def lamni_rotation(self, angle: float): + """ + Rotate LamNI to the specified angle. The rotation is only performed + if the current setpoint of the rotation stage is different from the requested angle. + + Args: + angle (float): Rotation angle in degrees + """ + # get last setpoint (cannot be based on pos get because they will deviate slightly) + lsamrot_current_setpoint = self.dev.lsamrot.user_setpoint.get() + if angle == lsamrot_current_setpoint: + logger.info("No rotation required") + return + + logger.info("Rotating to requested angle") + self.actions.add_scan_report_instruction_readback( + devices=["lsamrot"], start=[lsamrot_current_setpoint], stop=[angle] + ) + self.dev.lsamrot.set(angle) + + def prepare_setup(self): + """ + Prepare the setup for the scan: + - Clear the trajectory generator of the RT controller to remove any previous positions. + - Rotate LamNI to the requested angle. + - Compute the total shift based on the center, stitch, and user-defined shifts, and + move to the new scan center using the interferometer feedback. + - Transfer the positions to the RT controller by adding them to the trajectory generator. + """ + self.dev.rtx.controller.clear_trajectory_generator() + self.lamni_rotation(self.angle) + total_shift_x, total_shift_y = self._compute_total_shift() + self.components.lamni_new_scan_center_interferometer(total_shift_x, total_shift_y) + + # Transfer the positions to the RT controller + self.dev.rtx.controller.add_pos_to_scan(self.positions.tolist()) diff --git a/csaxs_bec/scans/lamni_move_to_scan_center.py b/csaxs_bec/scans/lamni_move_to_scan_center.py new file mode 100644 index 0000000..36407fc --- /dev/null +++ b/csaxs_bec/scans/lamni_move_to_scan_center.py @@ -0,0 +1,119 @@ +""" +LamNI scan to move the interferometer to the computed scan center based on the provided shift and angle. + +Scan procedure: + - prepare_scan + - open_scan + - stage + - pre_scan + - scan_core + - at_each_point (optionally called by scan_core) + - post_scan + - unstage + - close_scan + - on_exception (called if any exception is raised during the scan) +""" + +from __future__ import annotations + +from typing import Annotated + +from bec_lib.scan_args import ScanArgument, Units +from bec_server.scan_server.scans.scan_base import ScanBase +from bec_server.scan_server.scans.scan_modifier import scan_hook + +from csaxs_bec.scans.lamni_components import LamNIComponents + + +class LamniMoveToScanCenter(ScanBase): + # Scan Type: Hardware triggered or software triggered? + # If the main trigger and readout logic is done within the at_each_point method in scan_core, choose SOFTWARE_TRIGGERED. + # If the main trigger and readout logic is implemented on a device that is simply kicked off in this scan, choose HARDWARE_TRIGGERED. + # This primarily serves as information for devices: The device may need to react differently if a software trigger is expected + # for every point. + scan_type = None + is_scan = False + + # Scan name: This is the name of the scan, e.g. "line_scan". This is used for display purposes and to identify the scan type in user interfaces. + # Choose a descriptive name that does not conflict with existing scan names. + # It must be a valid Python identifier, that is, it can only contain letters, numbers, and underscores, and must not start with a number. + scan_name = "lamni_move_to_scan_center" + + gui_config = {"Scan Parameters": ["shift_x", "shift_y", "angle"]} + + def __init__( + self, + shift_x: Annotated[ + float, ScanArgument(display_name="Shift X", description="Shift x.", units=Units.mm) + ], + shift_y: Annotated[ + float, ScanArgument(display_name="Shift Y", description="Shift y.", units=Units.mm) + ], + angle: Annotated[ + float, ScanArgument(display_name="Angle", description="Angle.", units=Units.deg) + ], + **kwargs, + ): + """ + LamNI scan to move the interferometer to the computed scan center based on the provided shift and angle. + + Args: + shift_x (float): Shift x. + shift_y (float): Shift y. + angle (float): Angle. + + Returns: + ScanReport + """ + super().__init__(**kwargs) + self.components = LamNIComponents(self) + self._baseline_readout_status = None + self.shift_x = shift_x + self.shift_y = shift_y + self.angle = angle + + self.update_scan_info() + + @scan_hook + def prepare_scan(self): ... + + @scan_hook + def open_scan(self): ... + + @scan_hook + def stage(self): ... + + @scan_hook + def pre_scan(self): ... + + @scan_hook + def scan_core(self): + """ + Core scan logic to be executed during the scan. + This is where the main scan logic should be implemented. + """ + center_x, center_y = self.components.lamni_compute_scan_center( + self.shift_x, self.shift_y, self.angle + ) + self.components.lamni_new_scan_center_interferometer(center_x, center_y) + + @scan_hook + def at_each_point(self): ... + + @scan_hook + def post_scan(self): ... + + @scan_hook + def unstage(self): ... + + @scan_hook + def close_scan(self): ... + + @scan_hook + def on_exception(self, exception: Exception): ... + + ####################################################### + ######### Helper methods for the scan logic ########### + ####################################################### + + # Implement scan-specific helper methods below.