wip grid scan still needs some work. in particular how to compute the positions that should be included.
CI for csaxs_bec / test (push) Successful in 30m55s

This commit is contained in:
2026-04-16 17:02:52 +02:00
parent e514fb85b0
commit 0c1cd07906
+83 -99
View File
@@ -1,30 +1,10 @@
"""
Continuous line scan implemented implemented for any motor with velocity and acceleration attributes.
Optionally, it supports a check for a base velocity attribute for user_motors. The scan will move the
motor to a premove position before starting the scan, then move continuously across the line while
detectors are triggered by the delay generator. The motor velocity is set such that it reaches the
end position at the same time as the last acquisition point is finished. The acceleration time is calculated
based on the fraction of target velocity to maximum velocity.
We assume that the motor is configured with its maximum velocity and acceleration time needed to reach this
speed. The scan will then scale down the velocity and acceleration time proportionally to reach the target velocity.
Continuous grid scan implemented for motors with velocity and acceleration attributes.
Having a base_velocity available helps to ensure that the motor is properly configured for the scan. We check for
the signal 'base_velocity' if available and consider this for the calculation.
After the scan, velocity and acceleration time will be restored. This also happens if the scan is aborted or an exception
occurs.
Scan procedure:
- prepare_scan
- open_scan
- stage
- pre_scan
- scan_core
- at_each_point (optionally called by scan_core)
- post_scan
- unstage
- close_scan
- on_exception (called if any exception is raised during the scan)
The slow axis is stepped between lines while the fast axis is scanned continuously
in the same direction for each line. Between lines, the fast motor velocity and
acceleration are restored to their original values and the slow-axis step plus
fast-axis return move are executed together.
"""
from __future__ import annotations
@@ -37,6 +17,7 @@ from bec_lib.device import DeviceBase
from bec_lib.logger import bec_logger
from bec_lib.scan_args import ScanArgument, Units
from bec_server.scan_server.errors import ScanAbortion
from bec_server.scan_server.scans import position_generators
from bec_server.scan_server.scans.scan_modifier import scan_hook
from bec_server.scan_server.scans.scans_v4 import ScanBase, ScanType
@@ -46,16 +27,12 @@ logger = bec_logger.logger
class ContGridScanCSAXS(ScanBase):
scan_type = ScanType.HARDWARE_TRIGGERED
scan_name = "cont_grid_scan_csaxs"
required_kwargs = ["steps", "relative"]
required_kwargs = ["relative"]
gui_config = {
"Devices": ["fast_motor"],
"Movement Parameters": ["start", "stop", "steps", "relative"],
"Acquisition Parameters": [
"exp_time",
"readout_time",
"frames_per_trigger",
"detector_delay",
],
"Devices": ["slow_motor", "fast_motor"],
"Fast Axis": ["start", "stop", "step_size"],
"Slow Axis": ["slow_start", "slow_stop", "slow_step_size"],
"Acquisition Parameters": ["exp_time", "detector_delay", "relative"],
}
def __init__(
@@ -70,6 +47,16 @@ class ContGridScanCSAXS(ScanBase):
step_size: Annotated[
float, ScanArgument(display_name="Step Size", reference_units="fast_motor", gt=0)
],
slow_motor: DeviceBase,
slow_start: Annotated[
float, ScanArgument(display_name="Slow Start Position", reference_units="slow_motor")
],
slow_stop: Annotated[
float, ScanArgument(display_name="Slow Stop Position", reference_units="slow_motor")
],
slow_step_size: Annotated[
float, ScanArgument(display_name="Slow Step Size", reference_units="slow_motor", gt=0)
],
exp_time: Annotated[float, ScanArgument(display_name="Exposure Time", units=Units.s, gt=0)],
detector_delay: Annotated[
float, ScanArgument(display_name="Detector Delay", units=Units.s, ge=0)
@@ -78,29 +65,10 @@ class ContGridScanCSAXS(ScanBase):
always_scan_in_positive_direction: bool = False,
**kwargs,
):
"""
Continuous line scan.
Args:
start: nominal line start position of the fast axis
stop: nominal line end position of the fast axis
step_size: step size for the continuous line scan
exp_time: exposure time in seconds
readout_time: detector readout time in seconds
frames_per_trigger: number of frames per trigger
fast_motor: OWIS motor used as fast axis
relative: if True, interpret start and stop relative to the current motor position
detector_delay: delay between motor reaching scan speed and detector burst start
shutter_additional_width: extra time the fast shutter stays open per line
add_pre_move_time: additional pre-move time translated into distance
detector_delay_generator: DDG triggering the detector chain
monitor_delay_generator: DDG providing monitor/MCS timing
shutter_delay_generator: DDG controlling the shutter pulse
progress_device: device exposing line progress
"""
super().__init__(**kwargs)
self.fast_motor = self.dev[fast_motor] if isinstance(fast_motor, str) else fast_motor
self.slow_motor = self.dev[slow_motor] if isinstance(slow_motor, str) else slow_motor
self.ddg = self.dev.get("ddg", None)
if self.ddg is None:
raise ScanAbortion(
@@ -112,27 +80,28 @@ class ContGridScanCSAXS(ScanBase):
"Did not find MCS device named 'mcs'. Monitoring of acquisition progress might not work properly."
)
# Relevant scan parameters
self.motors = [self.fast_motor]
self.motors = [self.slow_motor, self.fast_motor]
self.start = start
self.stop = stop
# Flip start and stop if configured to always scan in positive direction
if always_scan_in_positive_direction and self.stop < self.start:
self.start, self.stop = self.stop, self.start
self.step_size = step_size
self.slow_start = slow_start
self.slow_stop = slow_stop
self.slow_step_size = slow_step_size
self.exp_time = exp_time
self.relative = relative
self.detector_delay = detector_delay
# Motor config parameters
self._original_motor_values = {"velocity": None, "acceleration": None}
self.target_velocity = None
self.base_velocity = None
self.acc_time = None
self.premove_distance = None
# DDG parameter
self.shutter_open_delay = None
self.line_start = None
self.line_stop = None
self.num_lines = None
@scan_hook
def prepare_scan(self):
@@ -140,35 +109,47 @@ class ContGridScanCSAXS(ScanBase):
raise ScanAbortion(
f"Stop {self.stop} and start {self.start} positions are too close for the given step size {self.step_size}."
)
frames_per_trigger = int(np.ceil(np.abs(self.stop - self.start) / self.step_size))
if np.isclose(self.slow_stop, self.slow_start, atol=self.slow_step_size):
raise ScanAbortion(
f"Slow stop {self.slow_stop} and slow start {self.slow_start} positions are too close for the given step size {self.slow_step_size}."
)
# Handle positions and limits first to catch potential issues before everything else.
self.positions = self._prepare_positions(
start=self.start, stop=self.stop, steps=frames_per_trigger
frames_per_trigger = int(np.ceil(np.abs(self.stop - self.start) / self.step_size))
self.num_lines = int(
np.ceil(np.abs(self.slow_stop - self.slow_start) / self.slow_step_size)
)
self.positions = position_generators.nd_grid_positions(
[
(self.slow_start, self.slow_stop, self.num_lines),
(self.start, self.stop, frames_per_trigger),
],
snaked=False,
)
if self.relative:
self.start_positions = self.components.get_start_positions(self.motors)
self.positions += self.start_positions
self.line_start = self.positions[0, 1]
self.line_stop = self.positions[frames_per_trigger - 1, 1]
self.actions.set_device_readout_priority(self.motors, priority="monitored")
# Compute target velocity, acceleration time and premove distance based on the scan parameters and motor capabilities
# We also need to fetch the delay between the shutter opening and the triggers to start by the delay generator
# As this time is very small (2e-3), we add it to the premove distance. But technically it could also be
# handled by adjusting the sleep after sending of the motor move command.
self.shutter_open_delay = self.ddg.get_shutter_open_delay()
self.premove_distance = self._compute_premove_distance(self.shutter_open_delay)
# Check +/- premove distance around start and stop
position_range = self._prepare_positions(
start=self.start - self.premove_distance,
stop=self.stop + self.premove_distance,
fast_position_range = position_generators.line_scan_positions(
axes=[
(self.line_start - self.premove_distance, self.line_stop + self.premove_distance)
],
steps=2,
include_endpoint=True,
)
self.components.check_limits(self.motors, position_range)
# Setup progress device if available
self.components.check_limits([self.fast_motor], fast_position_range)
self.components.check_limits([self.slow_motor], self.positions[:, [0]])
if self.mcs is not None:
self.actions.add_scan_report_instruction_device_progress(self.mcs)
else:
@@ -176,10 +157,9 @@ class ContGridScanCSAXS(ScanBase):
"MCS device not found. Progress reporting will not work for this scan. Please add a device named 'mcs' to enable progress reporting."
)
# Set scan info paramters
self.update_scan_info(
positions=self.positions,
num_points=1,
num_points=len(self.positions),
num_monitored_readouts=0,
frames_per_trigger=frames_per_trigger,
exp_time=self.exp_time,
@@ -187,8 +167,11 @@ class ContGridScanCSAXS(ScanBase):
run_on_exception_hook=True,
)
self._baseline_readout_status = self.actions.read_baseline_devices(wait=False)
self.actions.add_scan_report_instruction_scan_progress(
points=self.num_lines, show_table=False
)
self._premove_motor_status = self.actions.set(
self.fast_motor, self.positions[0, 0] - self.premove_distance, wait=False
self.motors, [self.positions[0, 0], self.line_start - self.premove_distance], wait=False
)
@scan_hook
@@ -206,11 +189,27 @@ class ContGridScanCSAXS(ScanBase):
@scan_hook
def scan_core(self):
self.at_each_point()
line_positions = self.positions.reshape(self.num_lines, -1, 2)
for line_index, _line_positions_for_step in enumerate(line_positions):
self.components.move_and_wait(
self.motors,
[line_positions[line_index][0, 0], self.line_start - self.premove_distance],
)
self.at_each_point()
self._restore_motor_properties()
self.components.move_and_wait(
self.motors,
[line_positions[line_index][0, 0], self.line_start - self.premove_distance],
)
reset_and_step_status = self.actions.set(
self.motors,
[line_positions[line_index + 1][0, 0], self.line_start - self.premove_distance],
wait=False,
)
reset_and_step_status.wait()
@scan_hook
def at_each_point(self):
# Now we set the motor velocity and acceleration for the scan
st = self.fast_motor.velocity.set(self.target_velocity)
st2 = self.fast_motor.acceleration.set(self.acc_time)
st.wait()
@@ -222,17 +221,17 @@ class ContGridScanCSAXS(ScanBase):
trigger_status = self.ddg.trigger()
while not move_status.done:
self.actions.read_monitored_devices(wait=True)
try: # Readout at 2 Hz
move_status.wait(timeout=0.5) # only sleep as long as the move is not finished
try:
move_status.wait(timeout=0.5)
except TimeoutError:
continue
try:
trigger_status.wait(timeout=2)
except TimeoutError:
except TimeoutError as exc:
raise ScanAbortion(
f"Status for delay generator trigger {self.ddg} did not resolve after 2 seconds. "
)
) from exc
@scan_hook
def post_scan(self):
@@ -255,20 +254,11 @@ class ContGridScanCSAXS(ScanBase):
@scan_hook
def on_exception(self, exception: Exception):
del exception
self._restore_motor_properties()
if self.relative:
self.components.move_and_wait(self.motors, self.start_positions)
##########################
### Internal helper methods
##########################
def _prepare_positions(
self, start: float, stop: float, steps: int, include_endpoint: bool = False
) -> np.ndarray:
axis_positions = np.linspace(start, stop, steps, endpoint=include_endpoint, dtype=float)
return np.column_stack(axis_positions)
def _load_motor_properties(self) -> tuple[float, float, float]:
if not hasattr(self.fast_motor, "velocity"):
raise ScanAbortion(f"Motor {self.fast_motor} does not have a velocity attribute.")
@@ -288,10 +278,8 @@ class ContGridScanCSAXS(ScanBase):
return vel, acc, base_vel
def _compute_continous_motion_params(self) -> tuple[float, float]:
# Compute target velocity based on step size and exposure time
self.target_velocity = self.step_size / self.exp_time
# Safeguard high and low velocity limits
if self.target_velocity > self._original_motor_values["velocity"]:
raise ScanAbortion(
f"Requested velocity of {self.target_velocity} exceeds maximum velocity {self._original_motor_values['velocity']} of motor {self.fast_motor.name}."
@@ -301,7 +289,6 @@ class ContGridScanCSAXS(ScanBase):
f"Requested velocity of {self.target_velocity} is below base velocity {self.base_velocity}."
)
# Take base velocity into account.
acc_time = (
(self.target_velocity - self.base_velocity)
/ (self._original_motor_values["velocity"] - self.base_velocity)
@@ -311,18 +298,15 @@ class ContGridScanCSAXS(ScanBase):
return self.target_velocity, acc_time
def _compute_premove_distance(self, additional_distance: float) -> float:
# Load old motor parameters
vel, acc, base_vel = self._load_motor_properties()
self._original_motor_values["velocity"] = vel
self._original_motor_values["acceleration"] = acc
self.base_velocity = base_vel
# Compute target velocity and acceleration time for the continuous motion
target_vel, acc_time = self._compute_continous_motion_params()
self.target_velocity = target_vel
self.acc_time = acc_time
# Compute the premove distance needed to ensure the motor reaches the target velocity
return (
0.5 * (self.target_velocity + self.base_velocity) * self.acc_time
+ additional_distance * self.target_velocity