feat: add old xtreme scans #12

Merged
wakonig_k merged 1 commits from feature/xtreme_scans into main 2026-03-17 10:16:25 +01:00
4 changed files with 305 additions and 0 deletions

View File

@@ -0,0 +1,132 @@
from unittest import mock
import pytest
from bec_server.scan_server.tests.fixtures import * # noqa: F401, F403
from xtreme_bec.scans.hyst_scan import HystScan
# pylint: disable=missing-function-docstring
# pylint: disable=protected-access
@pytest.fixture
def hyst_scan(scan_assembler, device_manager_mock):
"""HystScan instance with samx as field motor (flyer) and samy as mono."""
return scan_assembler(
HystScan,
device_manager_mock.devices["samx"], # field_motor / flyer
0.0, # start_field
0.5, # end_field
device_manager_mock.devices["samy"], # mono / energy_motor
600.0, # energy1
640.0, # energy2
ramp_rate=3.0, # distinct from default_ramp_rate=2 for clearer assertions
parameter={"args": {}, "kwargs": {}},
)
@pytest.mark.parametrize(
"in_args, num_steps, reference_positions",
[
([600], 5, [600, 600, 600, 600, 600]),
([600, 640], 6, [600, 640, 600, 640, 600, 640]),
([600, 620, 640], 9, [600, 620, 640, 620, 600, 620, 640, 620, 600]),
([1, 2, 3, 4], 10, [1, 2, 3, 4, 3, 2, 1, 2, 3, 4]),
],
)
def test_get_next_scan_motor_position(in_args, num_steps, reference_positions, hyst_scan):
hyst_scan.energy_positions = in_args
gen = hyst_scan._get_next_scan_motor_position()
assert [next(gen) for _ in range(num_steps)] == reference_positions
def test_hyst_scan_init(scan_assembler, device_manager_mock):
field_motor = device_manager_mock.devices["samx"]
mono = device_manager_mock.devices["samy"]
scan = scan_assembler(
HystScan,
field_motor, 0.0, 0.5,
mono, 600.0, 640.0,
parameter={"args": {}, "kwargs": {}},
)
assert scan.flyer is field_motor
assert scan.energy_motor is mono
assert scan.flyer_positions == [0.0, 0.5]
assert scan.energy_positions == [600.0, 640.0]
assert scan.ramp_rate == HystScan.default_ramp_rate
assert scan.scan_motors == [mono, field_motor]
def test_hyst_scan_prepare_positions(hyst_scan):
with mock.patch.object(hyst_scan, "_check_limits"):
list(hyst_scan.prepare_positions())
assert hyst_scan.positions == [[600.0], [640.0]]
assert hyst_scan.num_pos == 0
def test_hyst_scan_scan_core(hyst_scan, device_manager_mock, ScanStubStatusMock):
"""scan_core ramps the field while stepping the energy motor once per loop iteration."""
field_motor = device_manager_mock.devices["samx"]
mono = device_manager_mock.devices["samy"]
def fake_done():
yield False # first while-check: enter loop
yield True # second while-check: exit loop
def fake_set(*args, **kwargs):
yield "fake_set"
return ScanStubStatusMock(done_func=fake_done)
def fake_rpc(*args, **kwargs):
yield "fake_rpc"
def fake_read(*args, **kwargs):
yield "fake_read"
# Replace connector with a MagicMock so send_client_info is freely callable
hyst_scan.connector = mock.MagicMock()
devices_cls = type(device_manager_mock.devices)
with mock.patch.object(hyst_scan.stubs, "set", side_effect=fake_set) as mock_set:
with mock.patch.object(
hyst_scan.stubs, "send_rpc_and_wait", side_effect=fake_rpc
) as mock_rpc:
with mock.patch.object(hyst_scan.stubs, "read", side_effect=fake_read) as mock_read:
with mock.patch.object(devices_cls, "monitored_devices", return_value=[]):
output = list(hyst_scan.scan_core())
assert output == [
"fake_rpc", # send_rpc_and_wait(flyer, "ramprate.set", default_ramp_rate)
"fake_set", # set(flyer, start_field=0.0)
"fake_rpc", # send_rpc_and_wait(flyer, "ramprate.set", ramp_rate=3.0)
"fake_set", # set(flyer, end_field=0.5) ← status captured here
"fake_set", # set(energy_motor, energy1=600.0) — inside loop
"fake_read", # read(flyer, energy_motor) — inside loop
"fake_rpc", # send_rpc_and_wait(flyer, "ramprate.set", default_ramp_rate) — after loop
]
assert hyst_scan.num_pos == 1
assert hyst_scan.point_id == 1
# Ramp-rate sequence: brake to default → set scan rate → restore default
assert mock_rpc.call_args_list[0] == mock.call(
field_motor, "ramprate.set", HystScan.default_ramp_rate
)
assert mock_rpc.call_args_list[1] == mock.call(field_motor, "ramprate.set", 3.0)
assert mock_rpc.call_args_list[2] == mock.call(
field_motor, "ramprate.set", HystScan.default_ramp_rate
)
# Field motor: move to start position, then begin flying to end (non-blocking)
assert mock_set.call_args_list[0] == mock.call(device=field_motor, value=0.0)
assert mock_set.call_args_list[1] == mock.call(device=field_motor, value=0.5, wait=False)
# Energy motor steps to first energy position inside the loop
assert mock_set.call_args_list[2] == mock.call(device=mono, value=600.0)
# Read issued once (one loop iteration), with flyer and energy motor
mock_read.assert_called_once_with(device=[field_motor, mono], point_id=0)
# Progress message sent for the energy step
hyst_scan.connector.send_client_info.assert_called_once_with(
"Moving mono to 600.0.", source="scan_server", expire=10
)

View File

@@ -0,0 +1,2 @@
from .hyst_scan import HystScan
from .otf_scan import OTFScan

View File

@@ -0,0 +1,112 @@
from __future__ import annotations
from bec_lib.device import DeviceBase
from bec_lib.logger import bec_logger
from bec_server.scan_server.scans import ScanBase
logger = bec_logger.logger
class HystScan(ScanBase):
scan_name = "hyst_scan"
required_kwargs = []
scan_type = "step"
default_ramp_rate = 2
def __init__(
self,
field_motor: DeviceBase,
start_field: float,
end_field: float,
mono: DeviceBase,
energy1: float,
energy2: float,
ramp_rate: float = default_ramp_rate,
**kwargs,
):
"""
A hysteresis scan that steps the energy between energy1 and energy2 while
ramping the field from start_field to end_field and back to start_field.
The field is ramped at ramp_rate T/min.
scans.hyst_scan(field_motor, start_field, end_field, mono, energy1, energy2)
Examples:
>>> scans.hyst_scan(dev.field_x, 0, 0.5, dev.mono, 600, 640, ramp_rate=2)
"""
super().__init__(**kwargs)
self.axis = []
self.flyer = field_motor
self.energy_motor = mono
self.scan_motors = [self.energy_motor, self.flyer]
self.flyer_positions = [start_field, end_field]
self.energy_positions = [energy1, energy2]
self._current_scan_motor_index = 0
self._scan_motor_direction = 1
self.ramp_rate = ramp_rate
def prepare_positions(self):
self.positions = [[pos] for pos in self.energy_positions]
self.num_pos = 0
yield None
self._check_limits()
def _get_next_scan_motor_position(self):
positions = self.energy_positions
n = len(positions)
if n == 1:
# Only one position, so just yield it indefinitely
while True:
yield positions[0]
# Multiple positions, so yield them in a back-and-forth pattern
# For example, if positions = [600, 620, 640], yield:
# 600, 620, 640, 620, 600, 620, 640, 620, 600, ...
idx = 0
direction = 1
while True:
yield positions[idx]
if idx == n - 1:
direction = -1
elif idx == 0:
direction = 1
idx += direction
def scan_core(self):
# yield from self._move_scan_motors_and_wait(self.positions[0])
yield from self.stubs.send_rpc_and_wait(self.flyer, "ramprate.set", self.default_ramp_rate)
yield from self.stubs.set(device=self.flyer, value=self.flyer_positions[0])
yield from self.stubs.send_rpc_and_wait(self.flyer, "ramprate.set", self.ramp_rate)
# send the slow motor on its way
status = yield from self.stubs.set(
device=self.flyer, value=self.flyer_positions[1], wait=False
)
pos_generator = self._get_next_scan_motor_position()
dev = self.device_manager.devices
while not status.done:
val = next(pos_generator)
self.connector.send_client_info(
f"Moving mono to {val}.", source="scan_server", expire=10
)
yield from self.stubs.set(device=self.energy_motor, value=val)
monitored_devices = [_dev.name for _dev in dev.monitored_devices([])]
yield from self.stubs.read(
device=[self.flyer, self.scan_motors[0], *monitored_devices], point_id=self.point_id
)
# time.sleep(1)
self.point_id += 1
self.num_pos += 1
yield from self.stubs.send_rpc_and_wait(self.flyer, "ramprate.set", self.default_ramp_rate)
def move_to_start(self):
yield None

View File

@@ -0,0 +1,59 @@
from __future__ import annotations
import time
from bec_lib.logger import bec_logger
from bec_server.scan_server.scans import SyncFlyScanBase
logger = bec_logger.logger
class OTFScan(SyncFlyScanBase):
scan_name = "otf_scan"
required_kwargs = ["e1", "e2", "time"]
arg_input = {}
arg_bundle_size = {"bundle": len(arg_input), "min": None, "max": None}
def __init__(self, *args, parameter: dict = None, **kwargs):
"""Scans the energy from e1 to e2 in <time> minutes.
Examples:
>>> scans.otf_scan(e1=700, e2=740, time=4)
"""
super().__init__(parameter=parameter, **kwargs)
self.axis = []
self.scan_motors = []
self.num_pos = 0
self.mono = self.caller_kwargs.get("mono", "mono")
self.otf_device = self.caller_kwargs.get("otf", "otf")
def pre_scan(self):
yield None
def scan_report_instructions(self):
yield from self.stubs.scan_report_instruction({"device_progress": [self.otf_device]})
@property
def monitor_sync(self):
return self.otf_device
def scan_core(self):
yield from self.stubs.set(device=self.mono, value=self.caller_kwargs["e1"], wait=True)
yield from self.stubs.kickoff(
device=self.otf_device,
parameter={
key: val for key, val in self.caller_kwargs.items() if key in ["e1", "e2", "time"]
},
wait=True,
)
status = yield from self.stubs.complete(device=self.otf_device)
while not status.done:
yield from self.stubs.read(group="monitored", wait=True)
progress = self.stubs.get_device_progress(
device=self.otf_device, RID=self.metadata["RID"]
)
if progress:
self.num_pos = progress
time.sleep(1)