refactor: add first attempt at new scan interface

This commit is contained in:
2026-04-16 14:31:19 +02:00
parent a3a865715f
commit d7cf0b38af
3 changed files with 690 additions and 0 deletions
+2
View File
@@ -0,0 +1,2 @@
from .cont_grid_scan import ContGridScanCSAXS
from .cont_line_scan import ContLineScanCSAXS
+344
View File
@@ -0,0 +1,344 @@
"""
Continuous line scan implemented implemented for any motor with velocity and acceleration attributes.
Optionally, it supports a check for a base velocity attribute for user_motors. The scan will move the
motor to a premove position before starting the scan, then move continuously across the line while
detectors are triggered by the delay generator. The motor velocity is set such that it reaches the
end position at the same time as the last acquisition point is finished. The acceleration time is calculated
based on the fraction of target velocity to maximum velocity.
We assume that the motor is configured with its maximum velocity and acceleration time needed to reach this
speed. The scan will then scale down the velocity and acceleration time proportionally to reach the target velocity.
Having a base_velocity available helps to ensure that the motor is properly configured for the scan. We check for
the signal 'base_velocity' if available and consider this for the calculation.
After the scan, velocity and acceleration time will be restored. This also happens if the scan is aborted or an exception
occurs.
Scan procedure:
- prepare_scan
- open_scan
- stage
- pre_scan
- scan_core
- at_each_point (optionally called by scan_core)
- post_scan
- unstage
- close_scan
- on_exception (called if any exception is raised during the scan)
"""
from __future__ import annotations
import time
from typing import Annotated
import numpy as np
from bec_lib.device import DeviceBase
from bec_lib.logger import bec_logger
from bec_lib.scan_args import ScanArgument, Units
from bec_server.scan_server.errors import ScanAbortion
from bec_server.scan_server.scans.scan_modifier import scan_hook
from bec_server.scan_server.scans.scans_v4 import ScanBase, ScanType
logger = bec_logger.logger
class ContGridScanCSAXS(ScanBase):
scan_type = ScanType.HARDWARE_TRIGGERED
scan_name = "cont_grid_scan_csaxs"
required_kwargs = ["steps", "relative"]
gui_config = {
"Devices": ["fast_motor"],
"Movement Parameters": ["start", "stop", "steps", "relative"],
"Acquisition Parameters": [
"exp_time",
"readout_time",
"frames_per_trigger",
"detector_delay",
],
}
def __init__(
self,
fast_motor: DeviceBase,
start: Annotated[
float, ScanArgument(display_name="Start Position", reference_units="fast_motor")
],
stop: Annotated[
float, ScanArgument(display_name="Stop Position", reference_units="fast_motor")
],
step_size: Annotated[
float, ScanArgument(display_name="Step Size", reference_units="fast_motor", gt=0)
],
exp_time: Annotated[float, ScanArgument(display_name="Exposure Time", units=Units.s, gt=0)],
detector_delay: Annotated[
float, ScanArgument(display_name="Detector Delay", units=Units.s, ge=0)
] = 0.01,
relative: bool = False,
always_scan_in_positive_direction: bool = False,
**kwargs,
):
"""
Continuous line scan.
Args:
start: nominal line start position of the fast axis
stop: nominal line end position of the fast axis
step_size: step size for the continuous line scan
exp_time: exposure time in seconds
readout_time: detector readout time in seconds
frames_per_trigger: number of frames per trigger
fast_motor: OWIS motor used as fast axis
relative: if True, interpret start and stop relative to the current motor position
detector_delay: delay between motor reaching scan speed and detector burst start
shutter_additional_width: extra time the fast shutter stays open per line
add_pre_move_time: additional pre-move time translated into distance
detector_delay_generator: DDG triggering the detector chain
monitor_delay_generator: DDG providing monitor/MCS timing
shutter_delay_generator: DDG controlling the shutter pulse
progress_device: device exposing line progress
"""
super().__init__(**kwargs)
self.fast_motor = self.dev[fast_motor] if isinstance(fast_motor, str) else fast_motor
self.ddg = self.dev.get("ddg", None)
if self.ddg is None:
raise ScanAbortion(
"Main delay generator named 'ddg1' device is required for the continuous line scan."
)
self.mcs = self.dev.get("mcs", None)
if self.mcs is None:
logger.warning(
"Did not find MCS device named 'mcs'. Monitoring of acquisition progress might not work properly."
)
# Relevant scan parameters
self.motors = [self.fast_motor]
self.start = start
self.stop = stop
# Flip start and stop if configured to always scan in positive direction
if always_scan_in_positive_direction and self.stop < self.start:
self.start, self.stop = self.stop, self.start
self.step_size = step_size
self.exp_time = exp_time
self.relative = relative
self.detector_delay = detector_delay
# Motor config parameters
self._original_motor_values = {"velocity": None, "acceleration": None}
self.target_velocity = None
self.base_velocity = None
self.acc_time = None
self.premove_distance = None
# DDG parameter
self.shutter_open_delay = None
@scan_hook
def prepare_scan(self):
if np.isclose(self.stop, self.start, atol=self.step_size):
raise ScanAbortion(
f"Stop {self.stop} and start {self.start} positions are too close for the given step size {self.step_size}."
)
frames_per_trigger = int(np.ceil(np.abs(self.stop - self.start) / self.step_size))
# Handle positions and limits first to catch potential issues before everything else.
self.positions = self._prepare_positions(
start=self.start, stop=self.stop, steps=frames_per_trigger
)
if self.relative:
self.start_positions = self.components.get_start_positions(self.motors)
self.positions += self.start_positions
self.actions.set_device_readout_priority(self.motors, priority="monitored")
# Compute target velocity, acceleration time and premove distance based on the scan parameters and motor capabilities
# We also need to fetch the delay between the shutter opening and the triggers to start by the delay generator
# As this time is very small (2e-3), we add it to the premove distance. But technically it could also be
# handled by adjusting the sleep after sending of the motor move command.
self.shutter_open_delay = self.ddg.get_shutter_open_delay()
self.premove_distance = self._compute_premove_distance(self.shutter_open_delay)
# Check +/- premove distance around start and stop
position_range = self._prepare_positions(
start=self.start - self.premove_distance,
stop=self.stop + self.premove_distance,
steps=2,
include_endpoint=True,
)
self.components.check_limits(self.motors, position_range)
# Setup progress device if available
if self.mcs is not None:
self.actions.add_scan_report_instruction_device_progress(self.mcs)
else:
logger.warning(
"MCS device not found. Progress reporting will not work for this scan. Please add a device named 'mcs' to enable progress reporting."
)
# Set scan info paramters
self.update_scan_info(
positions=self.positions,
num_points=1,
num_monitored_readouts=0,
frames_per_trigger=frames_per_trigger,
exp_time=self.exp_time,
relative=self.relative,
run_on_exception_hook=True,
)
self._baseline_readout_status = self.actions.read_baseline_devices(wait=False)
self._premove_motor_status = self.actions.set(
self.fast_motor, self.positions[0, 0] - self.premove_distance, wait=False
)
@scan_hook
def open_scan(self):
self.actions.open_scan()
@scan_hook
def stage(self):
self.actions.stage_all_devices()
@scan_hook
def pre_scan(self):
self._premove_motor_status.wait()
self.actions.pre_scan()
@scan_hook
def scan_core(self):
self.at_each_point()
@scan_hook
def at_each_point(self):
# Now we set the motor velocity and acceleration for the scan
st = self.fast_motor.velocity.set(self.target_velocity)
st2 = self.fast_motor.acceleration.set(self.acc_time)
st.wait()
st2.wait()
move_status = self.actions.set(
self.fast_motor, self.stop + self.premove_distance, wait=False
)
time.sleep(self.acc_time)
trigger_status = self.ddg.trigger()
while not move_status.done:
self.actions.read_monitored_devices(wait=True)
try: # Readout at 2 Hz
move_status.wait(timeout=0.5) # only sleep as long as the move is not finished
except TimeoutError:
continue
try:
trigger_status.wait(timeout=2)
except TimeoutError:
raise ScanAbortion(
f"Status for delay generator trigger {self.ddg} did not resolve after 2 seconds. "
)
@scan_hook
def post_scan(self):
self._restore_motor_properties()
status = self.actions.complete_all_devices(wait=False)
if self.relative:
self.components.move_and_wait(self.motors, self.start_positions)
status.wait()
@scan_hook
def unstage(self):
self.actions.unstage_all_devices()
@scan_hook
def close_scan(self):
if self._baseline_readout_status is not None:
self._baseline_readout_status.wait()
self.actions.close_scan()
self.actions.check_for_unchecked_statuses()
@scan_hook
def on_exception(self, exception: Exception):
self._restore_motor_properties()
if self.relative:
self.components.move_and_wait(self.motors, self.start_positions)
##########################
### Internal helper methods
##########################
def _prepare_positions(
self, start: float, stop: float, steps: int, include_endpoint: bool = False
) -> np.ndarray:
axis_positions = np.linspace(start, stop, steps, endpoint=include_endpoint, dtype=float)
return np.column_stack(axis_positions)
def _load_motor_properties(self) -> tuple[float, float, float]:
if not hasattr(self.fast_motor, "velocity"):
raise ScanAbortion(f"Motor {self.fast_motor} does not have a velocity attribute.")
if not hasattr(self.fast_motor, "acceleration"):
raise ScanAbortion(f"Motor {self.fast_motor} does not have an acceleration attribute.")
vel = self.fast_motor.velocity.get()
acc = self.fast_motor.acceleration.get()
self._original_motor_values["velocity"] = vel
self._original_motor_values["acceleration"] = acc
if self._has_base_velocity():
base_vel = self.fast_motor.base_velocity.get()
else:
base_vel = 0.0
return vel, acc, base_vel
def _compute_continous_motion_params(self) -> tuple[float, float]:
# Compute target velocity based on step size and exposure time
self.target_velocity = self.step_size / self.exp_time
# Safeguard high and low velocity limits
if self.target_velocity > self._original_motor_values["velocity"]:
raise ScanAbortion(
f"Requested velocity of {self.target_velocity} exceeds maximum velocity {self._original_motor_values['velocity']} of motor {self.fast_motor.name}."
)
if self._has_base_velocity() and self.target_velocity < self.base_velocity:
raise ScanAbortion(
f"Requested velocity of {self.target_velocity} is below base velocity {self.base_velocity}."
)
# Take base velocity into account.
acc_time = (
(self.target_velocity - self.base_velocity)
/ (self._original_motor_values["velocity"] - self.base_velocity)
* self._original_motor_values["acceleration"]
)
return self.target_velocity, acc_time
def _compute_premove_distance(self, additional_distance: float) -> float:
# Load old motor parameters
vel, acc, base_vel = self._load_motor_properties()
self._original_motor_values["velocity"] = vel
self._original_motor_values["acceleration"] = acc
self.base_velocity = base_vel
# Compute target velocity and acceleration time for the continuous motion
target_vel, acc_time = self._compute_continous_motion_params()
self.target_velocity = target_vel
self.acc_time = acc_time
# Compute the premove distance needed to ensure the motor reaches the target velocity
return (
0.5 * (self.target_velocity + self.base_velocity) * self.acc_time
+ additional_distance * self.target_velocity
)
def _set_scan_velocity(self):
self.fast_motor.velocity.put(self.target_velocity)
self.fast_motor.acceleration.put(self.acc_time)
def _restore_motor_properties(self):
vel = self._original_motor_values["velocity"]
acc = self._original_motor_values["acceleration"]
if vel is not None:
self.fast_motor.velocity.put(vel)
if acc is not None:
self.fast_motor.acceleration.put(acc)
def _has_base_velocity(self) -> bool:
return hasattr(self.fast_motor, "base_velocity")
+344
View File
@@ -0,0 +1,344 @@
"""
Continuous line scan implemented implemented for any motor with velocity and acceleration attributes.
Optionally, it supports a check for a base velocity attribute for user_motors. The scan will move the
motor to a premove position before starting the scan, then move continuously across the line while
detectors are triggered by the delay generator. The motor velocity is set such that it reaches the
end position at the same time as the last acquisition point is finished. The acceleration time is calculated
based on the fraction of target velocity to maximum velocity.
We assume that the motor is configured with its maximum velocity and acceleration time needed to reach this
speed. The scan will then scale down the velocity and acceleration time proportionally to reach the target velocity.
Having a base_velocity available helps to ensure that the motor is properly configured for the scan. We check for
the signal 'base_velocity' if available and consider this for the calculation.
After the scan, velocity and acceleration time will be restored. This also happens if the scan is aborted or an exception
occurs.
Scan procedure:
- prepare_scan
- open_scan
- stage
- pre_scan
- scan_core
- at_each_point (optionally called by scan_core)
- post_scan
- unstage
- close_scan
- on_exception (called if any exception is raised during the scan)
"""
from __future__ import annotations
import time
from typing import Annotated
import numpy as np
from bec_lib.device import DeviceBase
from bec_lib.logger import bec_logger
from bec_lib.scan_args import ScanArgument, Units
from bec_server.scan_server.errors import ScanAbortion
from bec_server.scan_server.scans.scan_modifier import scan_hook
from bec_server.scan_server.scans.scans_v4 import ScanBase, ScanType
logger = bec_logger.logger
class ContLineScanCSAXS(ScanBase):
scan_type = ScanType.HARDWARE_TRIGGERED
scan_name = "cont_line_scan_csaxs"
required_kwargs = ["steps", "relative"]
gui_config = {
"Devices": ["fast_motor"],
"Movement Parameters": ["start", "stop", "steps", "relative"],
"Acquisition Parameters": [
"exp_time",
"readout_time",
"frames_per_trigger",
"detector_delay",
],
}
def __init__(
self,
fast_motor: DeviceBase,
start: Annotated[
float, ScanArgument(display_name="Start Position", reference_units="fast_motor")
],
stop: Annotated[
float, ScanArgument(display_name="Stop Position", reference_units="fast_motor")
],
step_size: Annotated[
float, ScanArgument(display_name="Step Size", reference_units="fast_motor", gt=0)
],
exp_time: Annotated[float, ScanArgument(display_name="Exposure Time", units=Units.s, gt=0)],
detector_delay: Annotated[
float, ScanArgument(display_name="Detector Delay", units=Units.s, ge=0)
] = 0.01,
relative: bool = False,
always_scan_in_positive_direction: bool = False,
**kwargs,
):
"""
Continuous line scan.
Args:
start: nominal line start position of the fast axis
stop: nominal line end position of the fast axis
step_size: step size for the continuous line scan
exp_time: exposure time in seconds
readout_time: detector readout time in seconds
frames_per_trigger: number of frames per trigger
fast_motor: OWIS motor used as fast axis
relative: if True, interpret start and stop relative to the current motor position
detector_delay: delay between motor reaching scan speed and detector burst start
shutter_additional_width: extra time the fast shutter stays open per line
add_pre_move_time: additional pre-move time translated into distance
detector_delay_generator: DDG triggering the detector chain
monitor_delay_generator: DDG providing monitor/MCS timing
shutter_delay_generator: DDG controlling the shutter pulse
progress_device: device exposing line progress
"""
super().__init__(**kwargs)
self.fast_motor = self.dev[fast_motor] if isinstance(fast_motor, str) else fast_motor
self.ddg = self.dev.get("ddg", None)
if self.ddg is None:
raise ScanAbortion(
"Main delay generator named 'ddg1' device is required for the continuous line scan."
)
self.mcs = self.dev.get("mcs", None)
if self.mcs is None:
logger.warning(
"Did not find MCS device named 'mcs'. Monitoring of acquisition progress might not work properly."
)
# Relevant scan parameters
self.motors = [self.fast_motor]
self.start = start
self.stop = stop
# Flip start and stop if configured to always scan in positive direction
if always_scan_in_positive_direction and self.stop < self.start:
self.start, self.stop = self.stop, self.start
self.step_size = step_size
self.exp_time = exp_time
self.relative = relative
self.detector_delay = detector_delay
# Motor config parameters
self._original_motor_values = {"velocity": None, "acceleration": None}
self.target_velocity = None
self.base_velocity = None
self.acc_time = None
self.premove_distance = None
# DDG parameter
self.shutter_open_delay = None
@scan_hook
def prepare_scan(self):
if np.isclose(self.stop, self.start, atol=self.step_size):
raise ScanAbortion(
f"Stop {self.stop} and start {self.start} positions are too close for the given step size {self.step_size}."
)
frames_per_trigger = int(np.ceil(np.abs(self.stop - self.start) / self.step_size))
# Handle positions and limits first to catch potential issues before everything else.
self.positions = self._prepare_positions(
start=self.start, stop=self.stop, steps=frames_per_trigger
)
if self.relative:
self.start_positions = self.components.get_start_positions(self.motors)
self.positions += self.start_positions
self.actions.set_device_readout_priority(self.motors, priority="monitored")
# Compute target velocity, acceleration time and premove distance based on the scan parameters and motor capabilities
# We also need to fetch the delay between the shutter opening and the triggers to start by the delay generator
# As this time is very small (2e-3), we add it to the premove distance. But technically it could also be
# handled by adjusting the sleep after sending of the motor move command.
self.shutter_open_delay = self.ddg.get_shutter_open_delay()
self.premove_distance = self._compute_premove_distance(self.shutter_open_delay)
# Check +/- premove distance around start and stop
position_range = self._prepare_positions(
start=self.start - self.premove_distance,
stop=self.stop + self.premove_distance,
steps=2,
include_endpoint=True,
)
self.components.check_limits(self.motors, position_range)
# Setup progress device if available
if self.mcs is not None:
self.actions.add_scan_report_instruction_device_progress(self.mcs)
else:
logger.warning(
"MCS device not found. Progress reporting will not work for this scan. Please add a device named 'mcs' to enable progress reporting."
)
# Set scan info paramters
self.update_scan_info(
positions=self.positions,
num_points=1,
num_monitored_readouts=0,
frames_per_trigger=frames_per_trigger,
exp_time=self.exp_time,
relative=self.relative,
run_on_exception_hook=True,
)
self._baseline_readout_status = self.actions.read_baseline_devices(wait=False)
self._premove_motor_status = self.actions.set(
self.fast_motor, self.positions[0, 0] - self.premove_distance, wait=False
)
@scan_hook
def open_scan(self):
self.actions.open_scan()
@scan_hook
def stage(self):
self.actions.stage_all_devices()
@scan_hook
def pre_scan(self):
self._premove_motor_status.wait()
self.actions.pre_scan()
@scan_hook
def scan_core(self):
self.at_each_point()
@scan_hook
def at_each_point(self):
# Now we set the motor velocity and acceleration for the scan
st = self.fast_motor.velocity.set(self.target_velocity)
st2 = self.fast_motor.acceleration.set(self.acc_time)
st.wait()
st2.wait()
move_status = self.actions.set(
self.fast_motor, self.stop + self.premove_distance, wait=False
)
time.sleep(self.acc_time)
trigger_status = self.ddg.trigger()
while not move_status.done:
self.actions.read_monitored_devices(wait=True)
try: # Readout at 2 Hz
move_status.wait(timeout=0.5) # only sleep as long as the move is not finished
except TimeoutError:
continue
try:
trigger_status.wait(timeout=2)
except TimeoutError:
raise ScanAbortion(
f"Status for delay generator trigger {self.ddg} did not resolve after 2 seconds. "
)
@scan_hook
def post_scan(self):
self._restore_motor_properties()
status = self.actions.complete_all_devices(wait=False)
if self.relative:
self.components.move_and_wait(self.motors, self.start_positions)
status.wait()
@scan_hook
def unstage(self):
self.actions.unstage_all_devices()
@scan_hook
def close_scan(self):
if self._baseline_readout_status is not None:
self._baseline_readout_status.wait()
self.actions.close_scan()
self.actions.check_for_unchecked_statuses()
@scan_hook
def on_exception(self, exception: Exception):
self._restore_motor_properties()
if self.relative:
self.components.move_and_wait(self.motors, self.start_positions)
##########################
### Internal helper methods
##########################
def _prepare_positions(
self, start: float, stop: float, steps: int, include_endpoint: bool = False
) -> np.ndarray:
axis_positions = np.linspace(start, stop, steps, endpoint=include_endpoint, dtype=float)
return np.column_stack(axis_positions)
def _load_motor_properties(self) -> tuple[float, float, float]:
if not hasattr(self.fast_motor, "velocity"):
raise ScanAbortion(f"Motor {self.fast_motor} does not have a velocity attribute.")
if not hasattr(self.fast_motor, "acceleration"):
raise ScanAbortion(f"Motor {self.fast_motor} does not have an acceleration attribute.")
vel = self.fast_motor.velocity.get()
acc = self.fast_motor.acceleration.get()
self._original_motor_values["velocity"] = vel
self._original_motor_values["acceleration"] = acc
if self._has_base_velocity():
base_vel = self.fast_motor.base_velocity.get()
else:
base_vel = 0.0
return vel, acc, base_vel
def _compute_continous_motion_params(self) -> tuple[float, float]:
# Compute target velocity based on step size and exposure time
self.target_velocity = self.step_size / self.exp_time
# Safeguard high and low velocity limits
if self.target_velocity > self._original_motor_values["velocity"]:
raise ScanAbortion(
f"Requested velocity of {self.target_velocity} exceeds maximum velocity {self._original_motor_values['velocity']} of motor {self.fast_motor.name}."
)
if self._has_base_velocity() and self.target_velocity < self.base_velocity:
raise ScanAbortion(
f"Requested velocity of {self.target_velocity} is below base velocity {self.base_velocity}."
)
# Take base velocity into account.
acc_time = (
(self.target_velocity - self.base_velocity)
/ (self._original_motor_values["velocity"] - self.base_velocity)
* self._original_motor_values["acceleration"]
)
return self.target_velocity, acc_time
def _compute_premove_distance(self, additional_distance: float) -> float:
# Load old motor parameters
vel, acc, base_vel = self._load_motor_properties()
self._original_motor_values["velocity"] = vel
self._original_motor_values["acceleration"] = acc
self.base_velocity = base_vel
# Compute target velocity and acceleration time for the continuous motion
target_vel, acc_time = self._compute_continous_motion_params()
self.target_velocity = target_vel
self.acc_time = acc_time
# Compute the premove distance needed to ensure the motor reaches the target velocity
return (
0.5 * (self.target_velocity + self.base_velocity) * self.acc_time
+ additional_distance * self.target_velocity
)
def _set_scan_velocity(self):
self.fast_motor.velocity.put(self.target_velocity)
self.fast_motor.acceleration.put(self.acc_time)
def _restore_motor_properties(self):
vel = self._original_motor_values["velocity"]
acc = self._original_motor_values["acceleration"]
if vel is not None:
self.fast_motor.velocity.put(vel)
if acc is not None:
self.fast_motor.acceleration.put(acc)
def _has_base_velocity(self) -> bool:
return hasattr(self.fast_motor, "base_velocity")