feat: migrate to v4 scans
CI for csaxs_bec / test (push) Successful in 1m34s
CI for csaxs_bec / test (pull_request) Successful in 1m29s

This commit is contained in:
2026-05-20 10:23:17 +02:00
parent bde111f995
commit 41c4a2693e
5 changed files with 1144 additions and 0 deletions
+3
View File
@@ -1,5 +1,8 @@
from .flomni_fermat_scan import FlomniFermatScan
from .flomni_fermat_scan_v4 import FlomniFermatScanV4
from .jungfrau_joch_scan import JungfrauJochTestScan
from .lamni_fermat_scan_v4 import LamniFermatScanV4
from .lamni_move_to_scan_center import LamniMoveToScanCenter
from .LamNIFermatScan import LamNIFermatScan, LamNIMoveToScanCenter
from .omny_fermat_scan import OMNYFermatScan
from .owis_grid import OwisGrid
+425
View File
@@ -0,0 +1,425 @@
"""
flOMNI Fermat Scan
Scan procedure:
- prepare_scan
- open_scan
- stage
- pre_scan
- scan_core
- at_each_point (optionally called by scan_core)
- post_scan
- unstage
- close_scan
- on_exception (called if any exception is raised during the scan)
"""
from __future__ import annotations
import time
from typing import Annotated
import numpy as np
from bec_lib import messages
from bec_lib.alarm_handler import Alarms
from bec_lib.endpoints import MessageEndpoints
from bec_lib.logger import bec_logger
from bec_lib.scan_args import DefaultArgType, ScanArgument, Units
from bec_server.scan_server.scans import ScanAbortion
from bec_server.scan_server.scans.scan_base import ScanBase, ScanType
from bec_server.scan_server.scans.scan_modifier import scan_hook
from csaxs_bec.devices.epics.delay_generator_csaxs.delay_generator_csaxs import TRIGGERSOURCE
logger = bec_logger.logger
class FlomniFermatScanV4(ScanBase):
# Scan Type: Hardware triggered or software triggered?
# If the main trigger and readout logic is done within the at_each_point method in scan_core, choose SOFTWARE_TRIGGERED.
# If the main trigger and readout logic is implemented on a device that is simply kicked off in this scan, choose HARDWARE_TRIGGERED.
# This primarily serves as information for devices: The device may need to react differently if a software trigger is expected
# for every point.
scan_type = ScanType.HARDWARE_TRIGGERED
# Scan name: This is the name of the scan, e.g. "line_scan". This is used for display purposes and to identify the scan type in user interfaces.
# Choose a descriptive name that does not conflict with existing scan names.
# It must be a valid Python identifier, that is, it can only contain letters, numbers, and underscores, and must not start with a number.
scan_name = "flomni_fermat_scan_v4"
gui_config = {
"Scan Parameters": [
"fovx",
"fovy",
"cenx",
"ceny",
"step",
"zshift",
"angle",
"corridor_size",
"relative",
],
"Acquisition Parameters": ["exp_time", "frames_per_trigger", "burst_at_each_point"],
}
def __init__(
# fmt: off
self,
fovx: Annotated[float, ScanArgument(display_name="Fovx", description="FOV in the piezo plane (i.e. piezo range). Max 200 um.", units=Units.µm, gt=0, lt=200)],
fovy: Annotated[float, ScanArgument(display_name="Fovy", description="FOV in the piezo plane (i.e. piezo range). Max 100 um.", units=Units.µm, gt=0, lt=100)],
cenx: Annotated[float, ScanArgument(display_name="Cenx", description="Center position in x.", units=Units.µm)],
ceny: Annotated[float, ScanArgument(display_name="Ceny", description="Center position in y.", units=Units.µm)],
step: Annotated[float, ScanArgument(display_name="Step", description="Step size.", units=Units.µm)],
zshift: Annotated[float, ScanArgument(display_name="Zshift", description="Shift in z. ", units=Units.µm)],
angle: Annotated[float, ScanArgument(display_name="Angle", description="Rotation angle (will rotate first)", units=Units.deg)],
corridor_size: Annotated[float, ScanArgument(display_name="Corridor Size", description="Corridor size for the corridor optimization.", units=Units.µm)],
exp_time: DefaultArgType.ExposureTime = 0,
frames_per_trigger: DefaultArgType.FramesPerTrigger = 1,
burst_at_each_point: DefaultArgType.BurstAtEachPoint = 1,
**kwargs,
# fmt: on
):
"""
flOMNI Fermat Scan
Args:
fovx (float): FOV in the piezo plane (i.e. piezo range). Max 200 um.
fovy (float): FOV in the piezo plane (i.e. piezo range). Max 100 um.
cenx (float): Center position in x.
ceny (float): Center position in y.
step (float): Step size.
zshift (float): Shift in z.
angle (float): Rotation angle (will rotate first)
corridor_size (float): Corridor size for the corridor optimization.
exp_time (float): Exposure time in seconds
frames_per_trigger (int): Number of frames per trigger for devices that support configurable frame counts per trigger.
burst_at_each_point (int): Number of triggers and readouts at each point.
Returns:
ScanReport
"""
super().__init__(**kwargs)
self._baseline_readout_status = None
self.fovx = fovx
self.fovy = fovy
self.cenx = cenx
self.ceny = ceny
self.step = step
self.zshift = zshift
self.angle = angle
self.corridor_size = corridor_size
self.exp_time = exp_time
self.frames_per_trigger = frames_per_trigger
self.burst_at_each_point = burst_at_each_point
self.flomni_rotation_status = None
if self.zshift > 100:
logger.warning("The zshift is larger than 100 um. It will be limited to 100 um.")
self.zshift = 100
if self.zshift < -100:
logger.warning("The zshift is smaller than -100 um. It will be limited to -100 um.")
self.zshift = -100
self.update_scan_info(
exp_time=exp_time,
frames_per_trigger=frames_per_trigger,
burst_at_each_point=burst_at_each_point,
)
@scan_hook
def prepare_scan(self):
"""
Prepare the scan. This can include any steps that need to be executed
before the scan is opened, such as preparing the positions (if not done already)
or setting up the devices.
"""
positions = self.get_flomni_fermat_spiral_pos(
-np.abs(self.fovx / 2),
np.abs(self.fovx / 2),
-np.abs(self.fovy / 2),
np.abs(self.fovy / 2),
step=self.step,
spiral_type=0,
center=False,
)
if len(positions) < 20:
raise ScanAbortion(f"The number positions must exceed 20. Currently: {len(positions)}.")
self.positions = self.components.optimize_trajectory(
positions, optimization_type="corridor", corridor_size=self.corridor_size
)
flip_axes = self.reverse_trajectory()
if flip_axes:
self.positions = np.flipud(self.positions)
self.update_scan_info(positions=self.positions, num_points=len(self.positions))
self.prepare_setup()
self.actions.add_scan_report_instruction_device_progress(device="rt_positions")
self._baseline_readout_status = self.actions.read_baseline_devices(wait=False)
@scan_hook
def open_scan(self):
"""
Open the scan.
This step must call self.actions.open_scan() to ensure that a new scan is
opened. Make sure to prepare the scan metadata before, either in
prepare_scan() or in open_scan() itself and call self.update_scan_info(...)
to update the scan metadata if needed.
"""
self.actions.open_scan()
@scan_hook
def stage(self):
"""
Stage the devices for the upcoming scan. The stage logic is typically
implemented on the device itself (i.e. by the device's stage method).
However, if there are any additional steps that need to be executed before
staging the devices, they can be implemented here.
"""
self.actions.stage_all_devices()
@scan_hook
def pre_scan(self):
"""
Pre-scan steps to be executed before the main scan logic.
This is typically the last chance to prepare the devices before the core scan
logic is executed. For example, this is a good place to initialize time-criticial
devices, e.g. devices that have a short timeout.
The pre-scan logic is typically implemented on the device itself.
"""
self.prepare_setup_part_2()
self.actions.pre_scan_all_devices()
@scan_hook
def scan_core(self):
"""
Core scan logic to be executed during the scan.
This is where the main scan logic should be implemented.
"""
# send off the flyer
self.dev.rt_positions.kickoff().wait()
# start the readout loop of the flyer
status = self.actions.complete(device="rt_positions", wait=False)
while not status.done:
self.at_each_point()
@scan_hook
def at_each_point(self):
"""
Logic to be executed at each acquisition point during the scan.
"""
self.actions.read_monitored_devices()
time.sleep(1)
@scan_hook
def post_scan(self):
"""
Post-scan steps to be executed after the main scan logic.
"""
# in flomni, we need to move to the start position of the next scan,
# which is the end position of the current scan
move_status = None
if isinstance(self.positions, np.ndarray) and len(self.positions[-1]) == 3:
# in x we move to cenx, then we avoid jumps in centering routine
value = self.positions[-1]
value[0] = self.cenx
move_status = self.actions.set(device=["rtx", "rty", "rtz"], value=value, wait=False)
self.actions.complete_all_devices()
if move_status:
move_status.wait()
self.dev.ddg1.set_trigger(TRIGGERSOURCE.SINGLE_SHOT.value)
@scan_hook
def unstage(self):
"""Unstage the scan by executing post-scan steps."""
self.actions.unstage_all_devices()
@scan_hook
def close_scan(self):
"""Close the scan."""
if self._baseline_readout_status is not None:
self._baseline_readout_status.wait()
self.actions.close_scan()
self.actions.check_for_unchecked_statuses()
@scan_hook
def on_exception(self, exception: Exception):
"""
Handle exceptions that occur during the scan.
This is a good place to implement any cleanup logic that needs to be executed in case of an exception,
such as returning the devices to a safe state or moving the motors back to their starting position.
"""
self.dev.ddg1.set_trigger(TRIGGERSOURCE.SINGLE_SHOT.value)
#######################################################
######### Helper methods for the scan logic ###########
#######################################################
def reverse_trajectory(self):
"""
Reverse the trajectory. Every other scan should be reversed to
shorten the movement time. In order to keep the last state, even if the
server is restarted, the state is stored in a global variable in redis.
"""
msg = self.redis_connector.get(MessageEndpoints.global_vars("reverse_flomni_trajectory"))
if msg:
val = msg.content.get("value", False)
else:
val = False
self.redis_connector.set(
MessageEndpoints.global_vars("reverse_flomni_trajectory"),
messages.VariableMessage(value=(not val)),
)
return val
def prepare_setup(self):
"""
Prepare the first part of the setup:
- Clear the trajectory of the rt controller
- Rotate flomni to the requested angle
- Move rty to the start position
"""
self.dev.rtx.controller.clear_trajectory_generator()
self.flomni_rotation(self.angle)
self.dev.rty.set(self.positions[0][1])
def prepare_setup_part_2(self):
"""
Prepare the second part of the setup:
- Set the delay generator ddg1 to external rising edge
- Wait for flomni rotation to complete (started in prepare_setup)
- Move rtx and rtz to the start position
- Turn on the laser tracker
- Add the positions to the rt controller's scan trajectory
- Check the signal strength of the laser tracker and raise an alarm if it is low
- Move samx to the scan region
"""
dev = self.dev
# Prepare DDG1
dev.ddg1.set_trigger(TRIGGERSOURCE.EXT_RISING_EDGE.value)
if self.flomni_rotation_status:
self.flomni_rotation_status.wait()
rtx_status = dev.rtx.set(self.cenx, wait=False)
rtz_status = dev.rtz.set(self.positions[0][2], wait=False)
dev.rtx.controller.laser_tracker_on()
rtx_status.wait()
rtz_status.wait()
dev.rtx.controller.add_pos_to_scan(self.positions.tolist())
tracker_signal_status = dev.rtx.controller.laser_tracker_check_signalstrength()
dev.rtx.controller.move_samx_to_scan_region(self.cenx)
if tracker_signal_status == "low":
error_info = messages.ErrorInfo(
error_message="Signal strength of the laser tracker is low, but sufficient to continue. Realignment recommended!",
compact_error_message="Low signal strength of the laser tracker. Realignment recommended!",
exception_type="LaserTrackerSignalStrengthLow",
device="rtx",
)
self.device_manager.connector.raise_alarm(severity=Alarms.WARNING, info=error_info)
elif tracker_signal_status == "toolow":
raise ScanAbortion(
"Signal strength of the laser tracker is too low for scanning. Realignment required!"
)
def flomni_rotation(self, angle: float):
"""
Rotate flomni to the requested angle.
We also emit a scan report instruction to keep users informed about the progress of the
rotation as it may take a few seconds.
Note that we do not wait for the rotation to complete here, but
instead wait in prepare_setup_part_2.
Args:
angle (float): The target angle for the flomni rotation.
"""
fsamroy_current_setpoint = self.dev.fsamroy.user_setpoint.get()
if angle == fsamroy_current_setpoint:
logger.info("No rotation required")
return
logger.info("Rotating to requested angle")
self.actions.add_scan_report_instruction_readback(
devices=["fsamroy"],
start=[fsamroy_current_setpoint],
stop=[angle],
request_id=self.scan_info.metadata["RID"],
)
self.flomni_rotation_status = self.dev.fsamroy.set(angle, wait=False)
def get_flomni_fermat_spiral_pos(
self,
m1_start: float,
m1_stop: float,
m2_start: float,
m2_stop: float,
step: float = 1,
spiral_type: int = 0,
center: bool = False,
):
"""
Calculate positions for a Fermat spiral scan.
Args:
m1_start(float): start position in m1
m1_stop(float): stop position in m1
m2_start(float): start position in m2
m2_stop(float): stop position in m2
step(float): stepsize
spiral_type(int): 0 for traditional Fermat spiral
center(bool): whether to include the center position
Returns:
positions(array): positions
"""
positions = []
phi = 2 * np.pi * ((1 + np.sqrt(5)) / 2.0) + spiral_type * np.pi
start = int(not center)
length_axis1 = np.abs(m1_stop - m1_start)
length_axis2 = np.abs(m2_stop - m2_start)
n_max = int(length_axis1 * length_axis2 * 3.2 / step / step)
z_pos = self.zshift
for ii in range(start, n_max):
radius = step * 0.57 * np.sqrt(ii)
# FOV is restructed below at check pos in range
if abs(radius * np.sin(ii * phi)) > length_axis1 / 2:
continue
if abs(radius * np.cos(ii * phi)) > length_axis2 / 2:
continue
x = radius * np.sin(ii * phi)
y = radius * np.cos(ii * phi)
positions.append([x + self.cenx, y + self.ceny, z_pos])
left_lower_corner = [
min(m1_start, m1_stop) + self.cenx,
min(m2_start, m2_stop) + self.ceny,
z_pos,
]
right_upper_corner = [
max(m1_start, m1_stop) + self.cenx,
max(m2_start, m2_stop) + self.ceny,
z_pos,
]
positions.append(left_lower_corner)
positions.append(right_upper_corner)
return np.array(positions)
+175
View File
@@ -0,0 +1,175 @@
from __future__ import annotations
import time
import numpy as np
from bec_lib.logger import bec_logger
from bec_server.scan_server.scans.scan_components import ScanComponents
logger = bec_logger.logger
MOVEMENT_SCALE_X = np.sin(np.radians(15)) * np.cos(np.radians(30))
MOVEMENT_SCALE_Y = np.cos(np.radians(15))
class LamNIComponents(ScanComponents):
def lamni_compute_scan_center(
self, x: float, y: float, angle_deg: float
) -> tuple[float, float]:
"""
Compute the scan center in the stage coordinates based on the
provided center in lamni coordinates and the rotation angle.
Args:
x (float): Center position in x at 0 deg in lamni coordinates (mm)
y (float): Center position in y at 0 deg in lamni coordinates (mm)
angle_deg (float): Rotation angle in degrees
Returns:
tuple: (shift_x, shift_y) in mm to be applied to the scan center in the stage coordinates
"""
alpha = angle_deg / 180 * np.pi
stage_x, stage_y = self.lamni_to_stage_coordinates(x, y)
stage_x_rot = np.cos(alpha) * stage_x - np.sin(alpha) * stage_y
stage_y_rot = np.sin(alpha) * stage_x + np.cos(alpha) * stage_y
return self.lamni_from_stage_coordinates(stage_x_rot, stage_y_rot)
def lamni_to_stage_coordinates(self, x: float, y: float) -> tuple[float, float]:
"""convert from lamni coordinates to stage coordinates"""
y_stage = y / MOVEMENT_SCALE_Y
x_stage = 2 * (x - y_stage * MOVEMENT_SCALE_X)
return (x_stage, y_stage)
def lamni_from_stage_coordinates(self, x_stage: float, y_stage: float) -> tuple[float, float]:
"""convert to lamni coordinates from stage coordinates"""
x = x_stage * 0.5 + y_stage * MOVEMENT_SCALE_X
y = y_stage * MOVEMENT_SCALE_Y
return (x, y)
def lamni_new_scan_center_interferometer(self, x: float, y: float):
"""
Move to the new scan center.
Args:
x (float): Center position in x in mm
y (float): Center position in y in mm
"""
lsamx_user_params = self._dev.lsamx.user_parameter
if lsamx_user_params is None or lsamx_user_params.get("center") is None:
raise RuntimeError("lsamx center is not defined")
lsamy_user_params = self._dev.lsamy.user_parameter
if lsamy_user_params is None or lsamy_user_params.get("center") is None:
raise RuntimeError("lsamy center is not defined")
lsamx_center = lsamx_user_params.get("center")
lsamy_center = lsamy_user_params.get("center")
# disable the feedback
self._dev.rtx.controller.feedback_disable()
rtx_current = self._dev.rtx.readback.get()
rty_current = self._dev.rty.readback.get()
lsamx_current = self._dev.lsamx.readback.get()
lsamy_current = self._dev.lsamy.readback.get()
x_stage, y_stage = self.lamni_to_stage_coordinates(x, y)
x_center_expect, y_center_expect = self.lamni_from_stage_coordinates(
lsamx_current - lsamx_center, lsamy_current - lsamy_center
)
# in microns
x_drift = x_center_expect * 1000 - rtx_current
y_drift = y_center_expect * 1000 - rty_current
logger.info(f"Current uncompensated drift of setup is x={x_drift:.3f}, y={y_drift:.3f}")
move_x = (
x_stage + lsamx_center + self.lamni_to_stage_coordinates(x_drift, y_drift)[0] / 1000
)
move_y = (
y_stage + lsamy_center + self.lamni_to_stage_coordinates(x_drift, y_drift)[1] / 1000
)
coarse_move_req_x = np.abs(lsamx_current - move_x)
coarse_move_req_y = np.abs(lsamy_current - move_y)
self._dev.lsamx.read_only = False
self._dev.lsamy.read_only = False
if (
np.abs(y_drift) > 150
or np.abs(x_drift) > 150
or (coarse_move_req_y < 0.003 and coarse_move_req_x < 0.003)
):
logger.info("No drift correction.")
else:
logger.info(
f"Compensating {[val/1000 for val in self.lamni_to_stage_coordinates(x_drift,y_drift)]}"
)
self._dev.lsamx.set(move_x)
self._dev.lsamy.set(move_y)
time.sleep(0.01)
rtx_current = self._dev.rtx.readback.get()
rty_current = self._dev.rty.readback.get()
logger.info(f"New scan center interferometer {rtx_current:.3f}, {rty_current:.3f} microns")
# second iteration
x_center_expect, y_center_expect = self.lamni_from_stage_coordinates(x_stage, y_stage)
# in microns
x_drift2 = x_center_expect * 1000 - rtx_current
y_drift2 = y_center_expect * 1000 - rty_current
logger.info(
f"Uncompensated drift of setup after first iteration is x={x_drift2:.3f},"
f" y={y_drift2:.3f}"
)
if np.abs(x_drift2) > 5 or np.abs(y_drift2) > 5:
logger.info(
"Compensating second iteration"
f" {[val/1000 for val in self.lamni_to_stage_coordinates(x_drift2,y_drift2)]}"
)
move_x = (
x_stage
+ lsamx_center
+ self.lamni_to_stage_coordinates(x_drift, y_drift)[0] / 1000
+ self.lamni_to_stage_coordinates(x_drift2, y_drift2)[0] / 1000
)
move_y = (
y_stage
+ lsamy_center
+ self.lamni_to_stage_coordinates(x_drift, y_drift)[1] / 1000
+ self.lamni_to_stage_coordinates(x_drift2, y_drift2)[1] / 1000
)
self._dev.lsamx.set(move_x)
self._dev.lsamy.set(move_y)
time.sleep(0.01)
rtx_current = self._dev.rtx.readback.get()
rty_current = self._dev.rty.readback.get()
logger.info(
f"New scan center interferometer after second iteration {rtx_current:.3f},"
f" {rty_current:.3f} microns"
)
x_drift2 = x_center_expect * 1000 - rtx_current
y_drift2 = y_center_expect * 1000 - rty_current
logger.info(
f"Uncompensated drift of setup after second iteration is x={x_drift2:.3f},"
f" y={y_drift2:.3f}"
)
else:
logger.info("No second iteration required")
self._dev.lsamx.read_only = True
self._dev.lsamy.read_only = True
# update angle readback before start of the scan
self._dev.lsamrot.readback.get()
# re-enable the feedback
self._dev.rtx.controller.feedback_enable_without_reset()
+422
View File
@@ -0,0 +1,422 @@
"""
LamNI Fermat Scan
Scan procedure:
- prepare_scan
- open_scan
- stage
- pre_scan
- scan_core
- at_each_point (optionally called by scan_core)
- post_scan
- unstage
- close_scan
- on_exception (called if any exception is raised during the scan)
"""
from __future__ import annotations
import time
from typing import Annotated
import numpy as np
from bec_lib.logger import bec_logger
from bec_lib.scan_args import DefaultArgType, ScanArgument, Units
from bec_server.scan_server.errors import ScanAbortion
from bec_server.scan_server.scans.scan_base import ScanBase, ScanType
from bec_server.scan_server.scans.scan_modifier import scan_hook
from csaxs_bec.scans.lamni_components import LamNIComponents
logger = bec_logger.logger
class LamniFermatScanV4(ScanBase):
# Scan Type: Hardware triggered or software triggered?
# If the main trigger and readout logic is done within the at_each_point method in scan_core, choose SOFTWARE_TRIGGERED.
# If the main trigger and readout logic is implemented on a device that is simply kicked off in this scan, choose HARDWARE_TRIGGERED.
# This primarily serves as information for devices: The device may need to react differently if a software trigger is expected
# for every point.
scan_type = ScanType.HARDWARE_TRIGGERED
# Scan name: This is the name of the scan, e.g. "line_scan". This is used for display purposes and to identify the scan type in user interfaces.
# Choose a descriptive name that does not conflict with existing scan names.
# It must be a valid Python identifier, that is, it can only contain letters, numbers, and underscores, and must not start with a number.
scan_name = "lamni_fermat_scan_v4"
gui_config = {
"Scan Parameters": [
"fov_size",
"step",
"shift_x",
"shift_y",
"center_x",
"center_y",
"angle",
"stitch_x",
"stitch_y",
"fov_circular",
"stitch_overlap",
"relative",
],
"Acquisition Parameters": ["exp_time", "frames_per_trigger", "readout_time"],
}
def __init__(
# fmt: off
self,
fovx: Annotated[float, ScanArgument(display_name="FOV x", description="FOV in the piezo plane (i.e. piezo range). Max 80 um", units=Units.µm, gt=0, lt=80)],
fovy: Annotated[float, ScanArgument(display_name="FOV y", description="FOV in the piezo plane (i.e. piezo range). Max 80 um", units=Units.µm, gt=0, lt=80)],
step: Annotated[float, ScanArgument(display_name="Step", description="Step size", units=Units.µm)],
shift_x: Annotated[float, ScanArgument(display_name="Shift X", description="Extra shift in x. The shift is directly applied to the scan. It will not be auto-rotated.", units=Units.mm)] = 0,
shift_y: Annotated[float, ScanArgument(display_name="Shift Y", description="Extra shift in y. The shift is directly applied to the scan. It will not be auto-rotated.", units=Units.mm)] = 0,
center_x: Annotated[float, ScanArgument(display_name="Center X", description="Center position in x at 0 deg. This shift is rotated using the geometry of LamNI. It is determined by the first 'click' in the x-ray eye alignment procedure.", units=Units.mm)] = 0,
center_y: Annotated[float, ScanArgument(display_name="Center Y", description="Center position in y at 0 deg. This shift is rotated using the geometry of LamNI. It is determined by the first 'click' in the x-ray eye alignment procedure.", units=Units.mm)] = 0,
angle: Annotated[float, ScanArgument(display_name="Angle", description="Rotation angle (will rotate first)", units=Units.deg)] = 0,
stitch_x: Annotated[float, ScanArgument(display_name="Stitch X", description="Shift scan to adjacent stitch region", units=Units.mm)] = 0,
stitch_y: Annotated[float, ScanArgument(display_name="Stitch Y", description="Shift scan to adjacent stitch region", units=Units.mm)] = 0,
fov_circular: Annotated[float, ScanArgument(display_name="Fov Circular", description="Generate a circular field of view in the sample plane. This is an additional cropping to fov_size", units=Units.µm)] = 0,
stitch_overlap: Annotated[float, ScanArgument(display_name="Stitch Overlap", description="Overlap of the stitched regions", units=Units.µm)] = 1,
corridor_size: Annotated[float | None, ScanArgument(display_name="Corridor Size", description="Corridor size for the corridor optimization.", units=Units.µm)] = None,
exp_time: DefaultArgType.ExposureTime = 0,
frames_per_trigger: DefaultArgType.FramesPerTrigger = 1,
readout_time: DefaultArgType.ReadoutTime = 0,
**kwargs,
# fmt: on
):
"""
LamNI Fermat Scan
Args:
fovx (float): FOV in the piezo plane (i.e. piezo range) along the x-axis. Max 80 um
fovy (float): FOV in the piezo plane (i.e. piezo range) along the y-axis. Max 80 um
step (float): Step size
shift_x (float): Extra shift in x. The shift is directly applied to the scan. It will not be auto-rotated.
shift_y (float): Extra shift in y. The shift is directly applied to the scan. It will not be auto-rotated.
center_x (float): Center position in x at 0 deg. This shift is rotated using the geometry of LamNI. It is determined by the first 'click' in the x-ray eye alignment procedure.
center_y (float): Center position in y at 0 deg. This shift is rotated using the geometry of LamNI. It is determined by the first 'click' in the x-ray eye alignment procedure.
angle (float): Rotation angle (will rotate first)
stitch_x (float): Shift scan to adjacent stitch region
stitch_y (float): Shift scan to adjacent stitch region
fov_circular (float): Generate a circular field of view in the sample plane. This is an additional cropping to fov_size
stitch_overlap (float): Overlap of the stitched regions
exp_time (float): Exposure time in seconds
corridor_size (float | None): Corridor size for the corridor optimization. If None, the corridor size will be estimated.
frames_per_trigger (int): Number of frames per trigger for devices that support configurable frame counts per trigger.
readout_time (float): Configuration for devices that support configurable readout times.
Returns:
ScanReport
"""
super().__init__(**kwargs)
self.components = LamNIComponents(self)
self._baseline_readout_status = None
self.fovx = fovx
self.fovy = fovy
self.step = step
self.shift_x = shift_x
self.shift_y = shift_y
self.center_x = center_x
self.center_y = center_y
self.angle = angle
self.stitch_x = stitch_x
self.stitch_y = stitch_y
self.fov_circular = fov_circular
self.stitch_overlap = stitch_overlap
self.exp_time = exp_time
self.frames_per_trigger = frames_per_trigger
self.readout_time = readout_time
self.corridor_size = corridor_size
self.update_scan_info(
exp_time=exp_time, frames_per_trigger=frames_per_trigger, readout_time=readout_time
)
@scan_hook
def prepare_scan(self):
"""
Prepare the scan. This can include any steps that need to be executed
before the scan is opened, such as preparing the positions (if not done already)
or setting up the devices.
"""
positions = self.get_lamni_fermat_spiral_pos(
-np.abs(self.fovx / 2),
np.abs(self.fovx / 2),
-np.abs(self.fovy / 2),
np.abs(self.fovy / 2),
step=self.step,
spiral_type=0,
center=False,
)
if len(positions) < 20:
raise ScanAbortion(f"The number positions must exceed 20. Currently: {len(positions)}.")
self.positions = self.components.optimize_trajectory(
positions, optimization_type="corridor", corridor_size=self.corridor_size
)
self.update_scan_info(num_points=len(self.positions), positions=self.positions)
self.prepare_setup()
self.actions.add_scan_report_instruction_device_progress(device="rt_positions")
self._baseline_readout_status = self.actions.read_baseline_devices(wait=False)
@scan_hook
def open_scan(self):
"""
Open the scan.
This step must call self.actions.open_scan() to ensure that a new scan is
opened. Make sure to prepare the scan metadata before, either in
prepare_scan() or in open_scan() itself and call self.update_scan_info(...)
to update the scan metadata if needed.
"""
self.actions.open_scan()
@scan_hook
def stage(self):
"""
Stage the devices for the upcoming scan. The stage logic is typically
implemented on the device itself (i.e. by the device's stage method).
However, if there are any additional steps that need to be executed before
staging the devices, they can be implemented here.
"""
self.actions.stage_all_devices()
@scan_hook
def pre_scan(self):
"""
Pre-scan steps to be executed before the main scan logic.
This is typically the last chance to prepare the devices before the core scan
logic is executed. For example, this is a good place to initialize time-criticial
devices, e.g. devices that have a short timeout.
The pre-scan logic is typically implemented on the device itself.
"""
self.actions.pre_scan_all_devices()
@scan_hook
def scan_core(self):
"""
Core scan logic to be executed during the scan.
This is where the main scan logic should be implemented.
"""
self.actions.kickoff(device="rt_positions")
status = self.actions.complete(device="rt_positions", wait=False)
while not status.done:
self.at_each_point()
time.sleep(1)
@scan_hook
def at_each_point(self):
"""
Logic to be executed at each acquisition point during the scan.
"""
self.actions.read_monitored_devices()
@scan_hook
def post_scan(self):
"""
Post-scan steps to be executed after the main scan logic.
"""
self.actions.complete_all_devices()
@scan_hook
def unstage(self):
"""Unstage the scan by executing post-scan steps."""
self.actions.unstage_all_devices()
@scan_hook
def close_scan(self):
"""Close the scan."""
if self._baseline_readout_status is not None:
self._baseline_readout_status.wait()
self.actions.close_scan()
self.actions.check_for_unchecked_statuses()
@scan_hook
def on_exception(self, exception: Exception):
"""
Handle exceptions that occur during the scan.
This is a good place to implement any cleanup logic that needs to be executed in case of an exception,
such as returning the devices to a safe state or moving the motors back to their starting position.
"""
#######################################################
######### Helper methods for the scan logic ###########
#######################################################
def get_lamni_fermat_spiral_pos(
self,
m1_start: float,
m1_stop: float,
m2_start: float,
m2_stop: float,
step: float = 1,
spiral_type: float = 0,
center: bool = False,
) -> np.ndarray:
"""Generate positions for a Fermat spiral scan pattern for LamNI.
Args:
m1_start (float): start position motor 1
m1_stop (float): end position motor 1
m2_start (float): start position motor 2
m2_stop (float): end position motor 2
step (float, optional): Step size. Defaults to 1.
spiral_type (float, optional): Angular offset in radians that determines the shape of the spiral.
A spiral with spiral_type=2 is the same as spiral_type=0. Defaults to 0.
center (bool, optional): Add a center point. Defaults to False.
Returns:
np.ndarray: Array of positions for the Fermat spiral scan.
"""
positions = []
phi = 2 * np.pi * ((1 + np.sqrt(5)) / 2.0) + spiral_type * np.pi
start = int(not center)
length_axis1 = np.abs(m1_stop - m1_start)
length_axis2 = np.abs(m2_stop - m2_start)
n_max = int(length_axis1 * length_axis2 * 3.2 / step / step)
total_shift_x, total_shift_y = self._compute_total_shift()
for ii in range(start, n_max):
radius = step * 0.57 * np.sqrt(ii)
x = radius * np.sin(ii * phi)
y = radius * np.cos(ii * phi)
if self._lamni_check_pos_in_fov_range_and_circ_fov(x, y):
positions.extend([(x + total_shift_x * 1000, y + total_shift_y * 1000)])
# for testing we just shift by center_i and prepare also the setup to center_i
return np.array(positions)
def _compute_total_shift(self) -> tuple[float, float]:
_shfitx, _shfity = self.components.lamni_compute_scan_center(
self.center_x, self.center_y, self.angle
)
x_stitch_shift, y_stitch_shift = self._lamni_compute_stitch_center(
self.stitch_x, self.stitch_y, self.angle
)
logger.info(
f"Total shift [mm] {_shfitx+x_stitch_shift/1000+self.shift_x},"
f" {_shfity+y_stitch_shift/1000+self.shift_y}"
)
return (
_shfitx + x_stitch_shift / 1000 + self.shift_x,
_shfity + y_stitch_shift / 1000 + self.shift_y,
)
def _lamni_compute_stitch_center(
self, xcount: float, ycount: float, angle_deg: float
) -> tuple[float, float]:
"""
Compute the stitch center in the stage coordinates based on the provided stitch counts and the rotation angle.
Args:
xcount (float): Stitch count in x direction
ycount (float): Stitch count in y direction
angle_deg (float): Rotation angle in degrees
Returns:
tuple: (shift_x, shift_y) in mm to be applied to the scan center in the stage coordinates
"""
alpha = angle_deg / 180 * np.pi
stage_x = xcount * (self.fovx - self.stitch_overlap)
stage_y = ycount * (self.fovy - self.stitch_overlap)
x_rot = np.cos(alpha) * stage_x - np.sin(alpha) * stage_y
y_rot = np.sin(alpha) * stage_x + np.cos(alpha) * stage_y
return self.components.lamni_from_stage_coordinates(x_rot, y_rot)
def _lamni_check_pos_in_fov_range_and_circ_fov(self, x: float, y: float) -> bool:
"""
Check if the given position is within the FOV range and circular FOV (if specified).
Args:
x (float): x position in lamni coordinates (mm)
y (float): y position in lamni coordinates (mm)
Returns:
bool: True if the position is within the FOV range and circular FOV, False otherwise.
"""
# this function checks if positions are reachable in a scan
# these x y intererometer positions are not shifted to the scan center
# so its purpose is to see if the position is reachable by the
# rotated piezo stage. For a scan these positions have to be shifted to
# the current scan center before starting the scan
stage_x, stage_y = self.components.lamni_to_stage_coordinates(x, y)
stage_x_with_stitch, stage_y_with_stitch = self._lamni_compute_stitch_center(
self.stitch_x, self.stitch_y, self.angle
)
stage_x_with_stitch, stage_y_with_stitch = self.components.lamni_to_stage_coordinates(
stage_x_with_stitch, stage_y_with_stitch
)
# piezo stage is currently rotated to stage_angle_deg in degrees
# rotate positions to the piezo stage system
alpha = (self.angle - 300 + 30.5) / 180 * np.pi
stage_x_rot = np.cos(alpha) * stage_x + np.sin(alpha) * stage_y
stage_y_rot = -np.sin(alpha) * stage_x + np.cos(alpha) * stage_y
stage_x_rot_with_stitch = (
np.cos(alpha) * stage_x_with_stitch + np.sin(alpha) * stage_y_with_stitch
)
stage_y_rot_with_stitch = (
-np.sin(alpha) * stage_x_with_stitch + np.cos(alpha) * stage_y_with_stitch
)
return (
np.abs(stage_x_rot) <= (self.fovy / 2)
and np.abs(stage_y_rot) <= (self.fovx / 2)
and (
self.fov_circular == 0
or (
np.power((stage_x_rot_with_stitch + stage_x_rot), 2)
+ np.power((stage_y_rot_with_stitch + stage_y_rot), 2)
)
<= pow((self.fov_circular / 2), 2)
)
)
def lamni_rotation(self, angle: float):
"""
Rotate LamNI to the specified angle. The rotation is only performed
if the current setpoint of the rotation stage is different from the requested angle.
Args:
angle (float): Rotation angle in degrees
"""
# get last setpoint (cannot be based on pos get because they will deviate slightly)
lsamrot_current_setpoint = self.dev.lsamrot.user_setpoint.get()
if angle == lsamrot_current_setpoint:
logger.info("No rotation required")
return
logger.info("Rotating to requested angle")
self.actions.add_scan_report_instruction_readback(
devices=["lsamrot"], start=[lsamrot_current_setpoint], stop=[angle]
)
self.dev.lsamrot.set(angle)
def prepare_setup(self):
"""
Prepare the setup for the scan:
- Clear the trajectory generator of the RT controller to remove any previous positions.
- Rotate LamNI to the requested angle.
- Compute the total shift based on the center, stitch, and user-defined shifts, and
move to the new scan center using the interferometer feedback.
- Transfer the positions to the RT controller by adding them to the trajectory generator.
"""
self.dev.rtx.controller.clear_trajectory_generator()
self.lamni_rotation(self.angle)
total_shift_x, total_shift_y = self._compute_total_shift()
self.components.lamni_new_scan_center_interferometer(total_shift_x, total_shift_y)
# Transfer the positions to the RT controller
self.dev.rtx.controller.add_pos_to_scan(self.positions.tolist())
@@ -0,0 +1,119 @@
"""
LamNI scan to move the interferometer to the computed scan center based on the provided shift and angle.
Scan procedure:
- prepare_scan
- open_scan
- stage
- pre_scan
- scan_core
- at_each_point (optionally called by scan_core)
- post_scan
- unstage
- close_scan
- on_exception (called if any exception is raised during the scan)
"""
from __future__ import annotations
from typing import Annotated
from bec_lib.scan_args import ScanArgument, Units
from bec_server.scan_server.scans.scan_base import ScanBase
from bec_server.scan_server.scans.scan_modifier import scan_hook
from csaxs_bec.scans.lamni_components import LamNIComponents
class LamniMoveToScanCenter(ScanBase):
# Scan Type: Hardware triggered or software triggered?
# If the main trigger and readout logic is done within the at_each_point method in scan_core, choose SOFTWARE_TRIGGERED.
# If the main trigger and readout logic is implemented on a device that is simply kicked off in this scan, choose HARDWARE_TRIGGERED.
# This primarily serves as information for devices: The device may need to react differently if a software trigger is expected
# for every point.
scan_type = None
is_scan = False
# Scan name: This is the name of the scan, e.g. "line_scan". This is used for display purposes and to identify the scan type in user interfaces.
# Choose a descriptive name that does not conflict with existing scan names.
# It must be a valid Python identifier, that is, it can only contain letters, numbers, and underscores, and must not start with a number.
scan_name = "lamni_move_to_scan_center"
gui_config = {"Scan Parameters": ["shift_x", "shift_y", "angle"]}
def __init__(
self,
shift_x: Annotated[
float, ScanArgument(display_name="Shift X", description="Shift x.", units=Units.mm)
],
shift_y: Annotated[
float, ScanArgument(display_name="Shift Y", description="Shift y.", units=Units.mm)
],
angle: Annotated[
float, ScanArgument(display_name="Angle", description="Angle.", units=Units.deg)
],
**kwargs,
):
"""
LamNI scan to move the interferometer to the computed scan center based on the provided shift and angle.
Args:
shift_x (float): Shift x.
shift_y (float): Shift y.
angle (float): Angle.
Returns:
ScanReport
"""
super().__init__(**kwargs)
self.components = LamNIComponents(self)
self._baseline_readout_status = None
self.shift_x = shift_x
self.shift_y = shift_y
self.angle = angle
self.update_scan_info()
@scan_hook
def prepare_scan(self): ...
@scan_hook
def open_scan(self): ...
@scan_hook
def stage(self): ...
@scan_hook
def pre_scan(self): ...
@scan_hook
def scan_core(self):
"""
Core scan logic to be executed during the scan.
This is where the main scan logic should be implemented.
"""
center_x, center_y = self.components.lamni_compute_scan_center(
self.shift_x, self.shift_y, self.angle
)
self.components.lamni_new_scan_center_interferometer(center_x, center_y)
@scan_hook
def at_each_point(self): ...
@scan_hook
def post_scan(self): ...
@scan_hook
def unstage(self): ...
@scan_hook
def close_scan(self): ...
@scan_hook
def on_exception(self, exception: Exception): ...
#######################################################
######### Helper methods for the scan logic ###########
#######################################################
# Implement scan-specific helper methods below.