feat: add old xtreme scans #12
132
tests/tests_scans/test_hyst_scan.py
Normal file
132
tests/tests_scans/test_hyst_scan.py
Normal file
@@ -0,0 +1,132 @@
|
||||
from unittest import mock
|
||||
|
||||
import pytest
|
||||
|
||||
from bec_server.scan_server.tests.fixtures import * # noqa: F401, F403
|
||||
|
||||
from xtreme_bec.scans.hyst_scan import HystScan
|
||||
|
||||
# pylint: disable=missing-function-docstring
|
||||
# pylint: disable=protected-access
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def hyst_scan(scan_assembler, device_manager_mock):
|
||||
"""HystScan instance with samx as field motor (flyer) and samy as mono."""
|
||||
return scan_assembler(
|
||||
HystScan,
|
||||
device_manager_mock.devices["samx"], # field_motor / flyer
|
||||
0.0, # start_field
|
||||
0.5, # end_field
|
||||
device_manager_mock.devices["samy"], # mono / energy_motor
|
||||
600.0, # energy1
|
||||
640.0, # energy2
|
||||
ramp_rate=3.0, # distinct from default_ramp_rate=2 for clearer assertions
|
||||
parameter={"args": {}, "kwargs": {}},
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"in_args, num_steps, reference_positions",
|
||||
[
|
||||
([600], 5, [600, 600, 600, 600, 600]),
|
||||
([600, 640], 6, [600, 640, 600, 640, 600, 640]),
|
||||
([600, 620, 640], 9, [600, 620, 640, 620, 600, 620, 640, 620, 600]),
|
||||
([1, 2, 3, 4], 10, [1, 2, 3, 4, 3, 2, 1, 2, 3, 4]),
|
||||
],
|
||||
)
|
||||
def test_get_next_scan_motor_position(in_args, num_steps, reference_positions, hyst_scan):
|
||||
hyst_scan.energy_positions = in_args
|
||||
gen = hyst_scan._get_next_scan_motor_position()
|
||||
assert [next(gen) for _ in range(num_steps)] == reference_positions
|
||||
|
||||
|
||||
def test_hyst_scan_init(scan_assembler, device_manager_mock):
|
||||
field_motor = device_manager_mock.devices["samx"]
|
||||
mono = device_manager_mock.devices["samy"]
|
||||
scan = scan_assembler(
|
||||
HystScan,
|
||||
field_motor, 0.0, 0.5,
|
||||
mono, 600.0, 640.0,
|
||||
parameter={"args": {}, "kwargs": {}},
|
||||
)
|
||||
assert scan.flyer is field_motor
|
||||
assert scan.energy_motor is mono
|
||||
assert scan.flyer_positions == [0.0, 0.5]
|
||||
assert scan.energy_positions == [600.0, 640.0]
|
||||
assert scan.ramp_rate == HystScan.default_ramp_rate
|
||||
assert scan.scan_motors == [mono, field_motor]
|
||||
|
||||
|
||||
def test_hyst_scan_prepare_positions(hyst_scan):
|
||||
with mock.patch.object(hyst_scan, "_check_limits"):
|
||||
list(hyst_scan.prepare_positions())
|
||||
assert hyst_scan.positions == [[600.0], [640.0]]
|
||||
assert hyst_scan.num_pos == 0
|
||||
|
||||
|
||||
def test_hyst_scan_scan_core(hyst_scan, device_manager_mock, ScanStubStatusMock):
|
||||
"""scan_core ramps the field while stepping the energy motor once per loop iteration."""
|
||||
field_motor = device_manager_mock.devices["samx"]
|
||||
mono = device_manager_mock.devices["samy"]
|
||||
|
||||
def fake_done():
|
||||
yield False # first while-check: enter loop
|
||||
yield True # second while-check: exit loop
|
||||
|
||||
def fake_set(*args, **kwargs):
|
||||
yield "fake_set"
|
||||
return ScanStubStatusMock(done_func=fake_done)
|
||||
|
||||
def fake_rpc(*args, **kwargs):
|
||||
yield "fake_rpc"
|
||||
|
||||
def fake_read(*args, **kwargs):
|
||||
yield "fake_read"
|
||||
|
||||
# Replace connector with a MagicMock so send_client_info is freely callable
|
||||
hyst_scan.connector = mock.MagicMock()
|
||||
|
||||
devices_cls = type(device_manager_mock.devices)
|
||||
with mock.patch.object(hyst_scan.stubs, "set", side_effect=fake_set) as mock_set:
|
||||
with mock.patch.object(
|
||||
hyst_scan.stubs, "send_rpc_and_wait", side_effect=fake_rpc
|
||||
) as mock_rpc:
|
||||
with mock.patch.object(hyst_scan.stubs, "read", side_effect=fake_read) as mock_read:
|
||||
with mock.patch.object(devices_cls, "monitored_devices", return_value=[]):
|
||||
output = list(hyst_scan.scan_core())
|
||||
|
||||
assert output == [
|
||||
"fake_rpc", # send_rpc_and_wait(flyer, "ramprate.set", default_ramp_rate)
|
||||
"fake_set", # set(flyer, start_field=0.0)
|
||||
"fake_rpc", # send_rpc_and_wait(flyer, "ramprate.set", ramp_rate=3.0)
|
||||
"fake_set", # set(flyer, end_field=0.5) ← status captured here
|
||||
"fake_set", # set(energy_motor, energy1=600.0) — inside loop
|
||||
"fake_read", # read(flyer, energy_motor) — inside loop
|
||||
"fake_rpc", # send_rpc_and_wait(flyer, "ramprate.set", default_ramp_rate) — after loop
|
||||
]
|
||||
assert hyst_scan.num_pos == 1
|
||||
assert hyst_scan.point_id == 1
|
||||
|
||||
# Ramp-rate sequence: brake to default → set scan rate → restore default
|
||||
assert mock_rpc.call_args_list[0] == mock.call(
|
||||
field_motor, "ramprate.set", HystScan.default_ramp_rate
|
||||
)
|
||||
assert mock_rpc.call_args_list[1] == mock.call(field_motor, "ramprate.set", 3.0)
|
||||
assert mock_rpc.call_args_list[2] == mock.call(
|
||||
field_motor, "ramprate.set", HystScan.default_ramp_rate
|
||||
)
|
||||
|
||||
# Field motor: move to start position, then begin flying to end (non-blocking)
|
||||
assert mock_set.call_args_list[0] == mock.call(device=field_motor, value=0.0)
|
||||
assert mock_set.call_args_list[1] == mock.call(device=field_motor, value=0.5, wait=False)
|
||||
# Energy motor steps to first energy position inside the loop
|
||||
assert mock_set.call_args_list[2] == mock.call(device=mono, value=600.0)
|
||||
|
||||
# Read issued once (one loop iteration), with flyer and energy motor
|
||||
mock_read.assert_called_once_with(device=[field_motor, mono], point_id=0)
|
||||
|
||||
# Progress message sent for the energy step
|
||||
hyst_scan.connector.send_client_info.assert_called_once_with(
|
||||
"Moving mono to 600.0.", source="scan_server", expire=10
|
||||
)
|
||||
@@ -0,0 +1,2 @@
|
||||
from .hyst_scan import HystScan
|
||||
from .otf_scan import OTFScan
|
||||
|
||||
112
xtreme_bec/scans/hyst_scan.py
Normal file
112
xtreme_bec/scans/hyst_scan.py
Normal file
@@ -0,0 +1,112 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from bec_lib.device import DeviceBase
|
||||
from bec_lib.logger import bec_logger
|
||||
from bec_server.scan_server.scans import ScanBase
|
||||
|
||||
logger = bec_logger.logger
|
||||
|
||||
|
||||
class HystScan(ScanBase):
|
||||
scan_name = "hyst_scan"
|
||||
required_kwargs = []
|
||||
scan_type = "step"
|
||||
default_ramp_rate = 2
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
field_motor: DeviceBase,
|
||||
start_field: float,
|
||||
end_field: float,
|
||||
mono: DeviceBase,
|
||||
energy1: float,
|
||||
energy2: float,
|
||||
ramp_rate: float = default_ramp_rate,
|
||||
**kwargs,
|
||||
):
|
||||
"""
|
||||
A hysteresis scan that steps the energy between energy1 and energy2 while
|
||||
ramping the field from start_field to end_field and back to start_field.
|
||||
The field is ramped at ramp_rate T/min.
|
||||
|
||||
scans.hyst_scan(field_motor, start_field, end_field, mono, energy1, energy2)
|
||||
|
||||
Examples:
|
||||
>>> scans.hyst_scan(dev.field_x, 0, 0.5, dev.mono, 600, 640, ramp_rate=2)
|
||||
|
||||
"""
|
||||
super().__init__(**kwargs)
|
||||
self.axis = []
|
||||
self.flyer = field_motor
|
||||
self.energy_motor = mono
|
||||
self.scan_motors = [self.energy_motor, self.flyer]
|
||||
self.flyer_positions = [start_field, end_field]
|
||||
self.energy_positions = [energy1, energy2]
|
||||
self._current_scan_motor_index = 0
|
||||
self._scan_motor_direction = 1
|
||||
self.ramp_rate = ramp_rate
|
||||
|
||||
def prepare_positions(self):
|
||||
self.positions = [[pos] for pos in self.energy_positions]
|
||||
self.num_pos = 0
|
||||
yield None
|
||||
self._check_limits()
|
||||
|
||||
def _get_next_scan_motor_position(self):
|
||||
positions = self.energy_positions
|
||||
n = len(positions)
|
||||
|
||||
if n == 1:
|
||||
# Only one position, so just yield it indefinitely
|
||||
while True:
|
||||
yield positions[0]
|
||||
|
||||
# Multiple positions, so yield them in a back-and-forth pattern
|
||||
# For example, if positions = [600, 620, 640], yield:
|
||||
# 600, 620, 640, 620, 600, 620, 640, 620, 600, ...
|
||||
idx = 0
|
||||
direction = 1
|
||||
|
||||
while True:
|
||||
yield positions[idx]
|
||||
if idx == n - 1:
|
||||
direction = -1
|
||||
elif idx == 0:
|
||||
direction = 1
|
||||
|
||||
idx += direction
|
||||
|
||||
def scan_core(self):
|
||||
# yield from self._move_scan_motors_and_wait(self.positions[0])
|
||||
yield from self.stubs.send_rpc_and_wait(self.flyer, "ramprate.set", self.default_ramp_rate)
|
||||
yield from self.stubs.set(device=self.flyer, value=self.flyer_positions[0])
|
||||
|
||||
yield from self.stubs.send_rpc_and_wait(self.flyer, "ramprate.set", self.ramp_rate)
|
||||
|
||||
# send the slow motor on its way
|
||||
status = yield from self.stubs.set(
|
||||
device=self.flyer, value=self.flyer_positions[1], wait=False
|
||||
)
|
||||
|
||||
pos_generator = self._get_next_scan_motor_position()
|
||||
|
||||
dev = self.device_manager.devices
|
||||
while not status.done:
|
||||
val = next(pos_generator)
|
||||
self.connector.send_client_info(
|
||||
f"Moving mono to {val}.", source="scan_server", expire=10
|
||||
)
|
||||
yield from self.stubs.set(device=self.energy_motor, value=val)
|
||||
|
||||
monitored_devices = [_dev.name for _dev in dev.monitored_devices([])]
|
||||
yield from self.stubs.read(
|
||||
device=[self.flyer, self.scan_motors[0], *monitored_devices], point_id=self.point_id
|
||||
)
|
||||
# time.sleep(1)
|
||||
self.point_id += 1
|
||||
self.num_pos += 1
|
||||
|
||||
yield from self.stubs.send_rpc_and_wait(self.flyer, "ramprate.set", self.default_ramp_rate)
|
||||
|
||||
def move_to_start(self):
|
||||
yield None
|
||||
59
xtreme_bec/scans/otf_scan.py
Normal file
59
xtreme_bec/scans/otf_scan.py
Normal file
@@ -0,0 +1,59 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import time
|
||||
|
||||
from bec_lib.logger import bec_logger
|
||||
from bec_server.scan_server.scans import SyncFlyScanBase
|
||||
|
||||
logger = bec_logger.logger
|
||||
|
||||
|
||||
class OTFScan(SyncFlyScanBase):
|
||||
scan_name = "otf_scan"
|
||||
required_kwargs = ["e1", "e2", "time"]
|
||||
arg_input = {}
|
||||
arg_bundle_size = {"bundle": len(arg_input), "min": None, "max": None}
|
||||
|
||||
def __init__(self, *args, parameter: dict = None, **kwargs):
|
||||
"""Scans the energy from e1 to e2 in <time> minutes.
|
||||
|
||||
Examples:
|
||||
>>> scans.otf_scan(e1=700, e2=740, time=4)
|
||||
|
||||
"""
|
||||
super().__init__(parameter=parameter, **kwargs)
|
||||
self.axis = []
|
||||
self.scan_motors = []
|
||||
self.num_pos = 0
|
||||
self.mono = self.caller_kwargs.get("mono", "mono")
|
||||
self.otf_device = self.caller_kwargs.get("otf", "otf")
|
||||
|
||||
def pre_scan(self):
|
||||
yield None
|
||||
|
||||
def scan_report_instructions(self):
|
||||
yield from self.stubs.scan_report_instruction({"device_progress": [self.otf_device]})
|
||||
|
||||
@property
|
||||
def monitor_sync(self):
|
||||
return self.otf_device
|
||||
|
||||
def scan_core(self):
|
||||
yield from self.stubs.set(device=self.mono, value=self.caller_kwargs["e1"], wait=True)
|
||||
yield from self.stubs.kickoff(
|
||||
device=self.otf_device,
|
||||
parameter={
|
||||
key: val for key, val in self.caller_kwargs.items() if key in ["e1", "e2", "time"]
|
||||
},
|
||||
wait=True,
|
||||
)
|
||||
status = yield from self.stubs.complete(device=self.otf_device)
|
||||
|
||||
while not status.done:
|
||||
yield from self.stubs.read(group="monitored", wait=True)
|
||||
progress = self.stubs.get_device_progress(
|
||||
device=self.otf_device, RID=self.metadata["RID"]
|
||||
)
|
||||
if progress:
|
||||
self.num_pos = progress
|
||||
time.sleep(1)
|
||||
Reference in New Issue
Block a user