diff --git a/tests/tests_scans/test_hyst_scan.py b/tests/tests_scans/test_hyst_scan.py new file mode 100644 index 0000000..aa35057 --- /dev/null +++ b/tests/tests_scans/test_hyst_scan.py @@ -0,0 +1,132 @@ +from unittest import mock + +import pytest + +from bec_server.scan_server.tests.fixtures import * # noqa: F401, F403 + +from xtreme_bec.scans.hyst_scan import HystScan + +# pylint: disable=missing-function-docstring +# pylint: disable=protected-access + + +@pytest.fixture +def hyst_scan(scan_assembler, device_manager_mock): + """HystScan instance with samx as field motor (flyer) and samy as mono.""" + return scan_assembler( + HystScan, + device_manager_mock.devices["samx"], # field_motor / flyer + 0.0, # start_field + 0.5, # end_field + device_manager_mock.devices["samy"], # mono / energy_motor + 600.0, # energy1 + 640.0, # energy2 + ramp_rate=3.0, # distinct from default_ramp_rate=2 for clearer assertions + parameter={"args": {}, "kwargs": {}}, + ) + + +@pytest.mark.parametrize( + "in_args, num_steps, reference_positions", + [ + ([600], 5, [600, 600, 600, 600, 600]), + ([600, 640], 6, [600, 640, 600, 640, 600, 640]), + ([600, 620, 640], 9, [600, 620, 640, 620, 600, 620, 640, 620, 600]), + ([1, 2, 3, 4], 10, [1, 2, 3, 4, 3, 2, 1, 2, 3, 4]), + ], +) +def test_get_next_scan_motor_position(in_args, num_steps, reference_positions, hyst_scan): + hyst_scan.energy_positions = in_args + gen = hyst_scan._get_next_scan_motor_position() + assert [next(gen) for _ in range(num_steps)] == reference_positions + + +def test_hyst_scan_init(scan_assembler, device_manager_mock): + field_motor = device_manager_mock.devices["samx"] + mono = device_manager_mock.devices["samy"] + scan = scan_assembler( + HystScan, + field_motor, 0.0, 0.5, + mono, 600.0, 640.0, + parameter={"args": {}, "kwargs": {}}, + ) + assert scan.flyer is field_motor + assert scan.energy_motor is mono + assert scan.flyer_positions == [0.0, 0.5] + assert scan.energy_positions == [600.0, 640.0] + assert scan.ramp_rate == HystScan.default_ramp_rate + assert scan.scan_motors == [mono, field_motor] + + +def test_hyst_scan_prepare_positions(hyst_scan): + with mock.patch.object(hyst_scan, "_check_limits"): + list(hyst_scan.prepare_positions()) + assert hyst_scan.positions == [[600.0], [640.0]] + assert hyst_scan.num_pos == 0 + + +def test_hyst_scan_scan_core(hyst_scan, device_manager_mock, ScanStubStatusMock): + """scan_core ramps the field while stepping the energy motor once per loop iteration.""" + field_motor = device_manager_mock.devices["samx"] + mono = device_manager_mock.devices["samy"] + + def fake_done(): + yield False # first while-check: enter loop + yield True # second while-check: exit loop + + def fake_set(*args, **kwargs): + yield "fake_set" + return ScanStubStatusMock(done_func=fake_done) + + def fake_rpc(*args, **kwargs): + yield "fake_rpc" + + def fake_read(*args, **kwargs): + yield "fake_read" + + # Replace connector with a MagicMock so send_client_info is freely callable + hyst_scan.connector = mock.MagicMock() + + devices_cls = type(device_manager_mock.devices) + with mock.patch.object(hyst_scan.stubs, "set", side_effect=fake_set) as mock_set: + with mock.patch.object( + hyst_scan.stubs, "send_rpc_and_wait", side_effect=fake_rpc + ) as mock_rpc: + with mock.patch.object(hyst_scan.stubs, "read", side_effect=fake_read) as mock_read: + with mock.patch.object(devices_cls, "monitored_devices", return_value=[]): + output = list(hyst_scan.scan_core()) + + assert output == [ + "fake_rpc", # send_rpc_and_wait(flyer, "ramprate.set", default_ramp_rate) + "fake_set", # set(flyer, start_field=0.0) + "fake_rpc", # send_rpc_and_wait(flyer, "ramprate.set", ramp_rate=3.0) + "fake_set", # set(flyer, end_field=0.5) ← status captured here + "fake_set", # set(energy_motor, energy1=600.0) — inside loop + "fake_read", # read(flyer, energy_motor) — inside loop + "fake_rpc", # send_rpc_and_wait(flyer, "ramprate.set", default_ramp_rate) — after loop + ] + assert hyst_scan.num_pos == 1 + assert hyst_scan.point_id == 1 + + # Ramp-rate sequence: brake to default → set scan rate → restore default + assert mock_rpc.call_args_list[0] == mock.call( + field_motor, "ramprate.set", HystScan.default_ramp_rate + ) + assert mock_rpc.call_args_list[1] == mock.call(field_motor, "ramprate.set", 3.0) + assert mock_rpc.call_args_list[2] == mock.call( + field_motor, "ramprate.set", HystScan.default_ramp_rate + ) + + # Field motor: move to start position, then begin flying to end (non-blocking) + assert mock_set.call_args_list[0] == mock.call(device=field_motor, value=0.0) + assert mock_set.call_args_list[1] == mock.call(device=field_motor, value=0.5, wait=False) + # Energy motor steps to first energy position inside the loop + assert mock_set.call_args_list[2] == mock.call(device=mono, value=600.0) + + # Read issued once (one loop iteration), with flyer and energy motor + mock_read.assert_called_once_with(device=[field_motor, mono], point_id=0) + + # Progress message sent for the energy step + hyst_scan.connector.send_client_info.assert_called_once_with( + "Moving mono to 600.0.", source="scan_server", expire=10 + ) diff --git a/xtreme_bec/scans/__init__.py b/xtreme_bec/scans/__init__.py index e69de29..7df5747 100644 --- a/xtreme_bec/scans/__init__.py +++ b/xtreme_bec/scans/__init__.py @@ -0,0 +1,2 @@ +from .hyst_scan import HystScan +from .otf_scan import OTFScan diff --git a/xtreme_bec/scans/hyst_scan.py b/xtreme_bec/scans/hyst_scan.py new file mode 100644 index 0000000..0a2dee7 --- /dev/null +++ b/xtreme_bec/scans/hyst_scan.py @@ -0,0 +1,112 @@ +from __future__ import annotations + +from bec_lib.device import DeviceBase +from bec_lib.logger import bec_logger +from bec_server.scan_server.scans import ScanBase + +logger = bec_logger.logger + + +class HystScan(ScanBase): + scan_name = "hyst_scan" + required_kwargs = [] + scan_type = "step" + default_ramp_rate = 2 + + def __init__( + self, + field_motor: DeviceBase, + start_field: float, + end_field: float, + mono: DeviceBase, + energy1: float, + energy2: float, + ramp_rate: float = default_ramp_rate, + **kwargs, + ): + """ + A hysteresis scan that steps the energy between energy1 and energy2 while + ramping the field from start_field to end_field and back to start_field. + The field is ramped at ramp_rate T/min. + + scans.hyst_scan(field_motor, start_field, end_field, mono, energy1, energy2) + + Examples: + >>> scans.hyst_scan(dev.field_x, 0, 0.5, dev.mono, 600, 640, ramp_rate=2) + + """ + super().__init__(**kwargs) + self.axis = [] + self.flyer = field_motor + self.energy_motor = mono + self.scan_motors = [self.energy_motor, self.flyer] + self.flyer_positions = [start_field, end_field] + self.energy_positions = [energy1, energy2] + self._current_scan_motor_index = 0 + self._scan_motor_direction = 1 + self.ramp_rate = ramp_rate + + def prepare_positions(self): + self.positions = [[pos] for pos in self.energy_positions] + self.num_pos = 0 + yield None + self._check_limits() + + def _get_next_scan_motor_position(self): + positions = self.energy_positions + n = len(positions) + + if n == 1: + # Only one position, so just yield it indefinitely + while True: + yield positions[0] + + # Multiple positions, so yield them in a back-and-forth pattern + # For example, if positions = [600, 620, 640], yield: + # 600, 620, 640, 620, 600, 620, 640, 620, 600, ... + idx = 0 + direction = 1 + + while True: + yield positions[idx] + if idx == n - 1: + direction = -1 + elif idx == 0: + direction = 1 + + idx += direction + + def scan_core(self): + # yield from self._move_scan_motors_and_wait(self.positions[0]) + yield from self.stubs.send_rpc_and_wait(self.flyer, "ramprate.set", self.default_ramp_rate) + yield from self.stubs.set(device=self.flyer, value=self.flyer_positions[0]) + + yield from self.stubs.send_rpc_and_wait(self.flyer, "ramprate.set", self.ramp_rate) + + # send the slow motor on its way + status = yield from self.stubs.set( + device=self.flyer, value=self.flyer_positions[1], wait=False + ) + + pos_generator = self._get_next_scan_motor_position() + + dev = self.device_manager.devices + while not status.done: + val = next(pos_generator) + self.connector.send_client_info( + f"Moving mono to {val}.", source="scan_server", expire=10 + ) + yield from self.stubs.set(device=self.energy_motor, value=val) + + monitored_devices = [_dev.name for _dev in dev.monitored_devices([])] + yield from self.stubs.read( + device=[self.flyer, self.scan_motors[0], *monitored_devices], point_id=self.point_id + ) + # time.sleep(1) + self.point_id += 1 + self.num_pos += 1 + + yield from self.stubs.send_rpc_and_wait(self.flyer, "ramprate.set", self.default_ramp_rate) + + def move_to_start(self): + yield None diff --git a/xtreme_bec/scans/otf_scan.py b/xtreme_bec/scans/otf_scan.py new file mode 100644 index 0000000..eb72f91 --- /dev/null +++ b/xtreme_bec/scans/otf_scan.py @@ -0,0 +1,59 @@ +from __future__ import annotations + +import time + +from bec_lib.logger import bec_logger +from bec_server.scan_server.scans import SyncFlyScanBase + +logger = bec_logger.logger + + +class OTFScan(SyncFlyScanBase): + scan_name = "otf_scan" + required_kwargs = ["e1", "e2", "time"] + arg_input = {} + arg_bundle_size = {"bundle": len(arg_input), "min": None, "max": None} + + def __init__(self, *args, parameter: dict = None, **kwargs): + """Scans the energy from e1 to e2 in