refactor(scans): moved fixtures to scan server module

This commit is contained in:
2024-11-10 12:08:49 +01:00
parent ebf12802e7
commit d2a738b8c2
5 changed files with 130 additions and 163 deletions
+1 -3
View File
@@ -1300,9 +1300,7 @@ class ContLineFlyScan(AsyncFlyScanBase):
yield from self.stubs.read(group="primary", point_id=self.point_id)
self.point_id += 1
def finalize(self):
yield from super().finalize()
self.num_pos = self.point_id + 1
self.num_pos = self.point_id
class RoundScanFlySim(SyncFlyScanBase):
@@ -1,7 +1,13 @@
from functools import partial
from unittest import mock
import pytest
from bec_lib.logger import bec_logger
from bec_lib.tests.fixtures import dm_with_devices
from bec_lib.tests.utils import ConnectorMock
from bec_server.device_server.tests.utils import DeviceMockType, DMMock
from bec_server.scan_server.instruction_handler import InstructionHandler
from bec_server.scan_server.tests.utils import ScanServerMock
@@ -12,3 +18,59 @@ def scan_server_mock(dm_with_devices):
server = ScanServerMock(dm_with_devices)
yield server
bec_logger.logger.remove()
@pytest.fixture
def connector_mock():
connector = ConnectorMock("")
yield connector
@pytest.fixture
def device_manager_mock():
device_manager = DMMock()
device_manager.add_device("rtx")
device_manager.add_device("samx")
device_manager.add_device("samy")
device_manager.add_device("samz")
device_manager.add_device(
"eiger", dev_type=DeviceMockType.SIGNAL, readout_priority="monitored", software_trigger=True
)
device_manager.add_device("bpm4i", dev_type=DeviceMockType.SIGNAL, readout_priority="monitored")
yield device_manager
@pytest.fixture
def instruction_handler_mock(connector_mock):
instruction_handler = InstructionHandler(connector_mock)
with mock.patch("bec_server.scan_server.scan_stubs.ScanStubStatus.wait", return_value=None):
yield instruction_handler
class _ScanStubStatusMock:
def __init__(self, done_func) -> None:
self._done = done_func()
@property
def done(self):
return next(self._done)
def wait(self):
return
@pytest.fixture
def ScanStubStatusMock():
return _ScanStubStatusMock
@pytest.fixture
def scan_assembler(instruction_handler_mock, device_manager_mock):
def _assemble_scan(scan_class, *args, **kwargs):
return scan_class(*args, **kwargs)
return partial(
_assemble_scan,
instruction_handler=instruction_handler_mock,
device_manager=device_manager_mock,
)
@@ -27,20 +27,3 @@ def connected_connector():
yield connector
finally:
connector.shutdown()
class _ScanStubStatusMock:
def __init__(self, done_func) -> None:
self._done = done_func()
@property
def done(self):
return next(self._done)
def wait(self):
return
@pytest.fixture
def ScanStubStatusMock():
return _ScanStubStatusMock
@@ -9,6 +9,7 @@ from bec_server.device_server.tests.utils import DMMock
from bec_server.scan_server.errors import DeviceMessageError
from bec_server.scan_server.instruction_handler import InstructionHandler
from bec_server.scan_server.scan_stubs import ScanStubs
from bec_server.scan_server.tests.fixtures import ScanStubStatusMock
@pytest.fixture
+66 -143
View File
@@ -6,8 +6,7 @@ import numpy as np
import pytest
from bec_lib import messages
from bec_lib.tests.utils import ConnectorMock
from bec_server.device_server.tests.utils import DeviceMockType, DMMock
from bec_server.device_server.tests.utils import DMMock
from bec_server.scan_server.instruction_handler import InstructionHandler
from bec_server.scan_server.scan_plugins.otf_scan import OTFScan
from bec_server.scan_server.scans import (
@@ -39,37 +38,13 @@ from bec_server.scan_server.scans import (
unpack_scan_args,
)
# the following imports are fixtures that are used in the tests
from bec_server.scan_server.tests.fixtures import *
# pylint: disable=missing-function-docstring
# pylint: disable=protected-access
@pytest.fixture
def connector_mock():
connector = ConnectorMock("")
yield connector
@pytest.fixture
def device_manager_mock():
device_manager = DMMock()
device_manager.add_device("rtx")
device_manager.add_device("samx")
device_manager.add_device("samy")
device_manager.add_device("samz")
device_manager.add_device(
"eiger", dev_type=DeviceMockType.SIGNAL, readout_priority="monitored", software_trigger=True
)
device_manager.add_device("bpm4i", dev_type=DeviceMockType.SIGNAL, readout_priority="monitored")
yield device_manager
@pytest.fixture
def instruction_handler_mock(connector_mock):
instruction_handler = InstructionHandler(connector_mock)
with mock.patch("bec_server.scan_server.scan_stubs.ScanStubStatus.wait", return_value=None):
yield instruction_handler
def test_unpack_scan_args_empty_dict():
scan_args = {}
expected_args = []
@@ -153,16 +128,13 @@ def test_unpack_scan_args_valid_input():
),
],
)
def test_scan_move(mv_msg, reference_msg_list, device_manager_mock, instruction_handler_mock):
def test_scan_move(mv_msg, reference_msg_list, scan_assembler):
def offset_mock():
yield None
s = Move(
parameter=mv_msg.content.get("parameter"),
device_manager=device_manager_mock,
instruction_handler=instruction_handler_mock,
)
s = scan_assembler(Move, parameter=mv_msg.content.get("parameter"))
s._set_position_offset = offset_mock
msg_list_reference = []
for msg in list(s.run()):
@@ -312,16 +284,11 @@ def test_scan_move(mv_msg, reference_msg_list, device_manager_mock, instruction_
),
],
)
def test_scan_updated_move(
mv_msg, reference_msg_list, device_manager_mock, instruction_handler_mock, ScanStubStatusMock
):
def test_scan_updated_move(mv_msg, reference_msg_list, scan_assembler, ScanStubStatusMock):
msg_list = []
s = UpdatedMove(
parameter=mv_msg.content.get("parameter"),
device_manager=device_manager_mock,
instruction_handler=instruction_handler_mock,
metadata=mv_msg.metadata,
s = scan_assembler(
UpdatedMove, parameter=mv_msg.content.get("parameter"), metadata=mv_msg.metadata
)
with mock.patch.object(s.stubs, "_get_result_from_status") as mock_get_from_rpc:
@@ -482,7 +449,7 @@ def test_scan_updated_move(
)
],
)
def test_scan_scan(scan_msg, reference_scan_list, device_manager_mock, instruction_handler_mock):
def test_scan_scan(scan_msg, reference_scan_list, scan_assembler):
device_manager = DMMock()
device_manager.add_device("samx")
device_manager.devices["samx"].readback.put(0)
@@ -491,11 +458,8 @@ def test_scan_scan(scan_msg, reference_scan_list, device_manager_mock, instructi
def offset_mock():
yield None
scan = Scan(
parameter=scan_msg.content.get("parameter"),
device_manager=device_manager_mock,
instruction_handler=instruction_handler_mock,
)
scan = scan_assembler(Scan, parameter=scan_msg.content.get("parameter"))
scan._set_position_offset = offset_mock
for step in scan.run():
if step:
@@ -554,16 +518,12 @@ def test_scan_scan(scan_msg, reference_scan_list, device_manager_mock, instructi
),
],
)
def test_fermat_scan(scan_msg, reference_scan_list):
device_manager = DMMock()
device_manager.add_device("samx")
device_manager.devices["samx"].readback.put(0)
device_manager.add_device("samy")
device_manager.devices["samy"].readback.put(0)
def test_fermat_scan(scan_msg, reference_scan_list, scan_assembler):
args = unpack_scan_args(scan_msg.content.get("parameter").get("args"))
kwargs = scan_msg.content.get("parameter").get("kwargs")
scan = FermatSpiralScan(
*args, parameter=scan_msg.content.get("parameter"), device_manager=device_manager, **kwargs
scan = scan_assembler(
FermatSpiralScan, *args, parameter=scan_msg.content.get("parameter"), **kwargs
)
def offset_mock():
@@ -751,18 +711,11 @@ def test_fermat_scan(scan_msg, reference_scan_list):
)
],
)
def test_cont_line_scan(
scan_msg, reference_scan_list, device_manager_mock, instruction_handler_mock
):
def test_cont_line_scan(scan_msg, reference_scan_list, scan_assembler, device_manager_mock):
args = unpack_scan_args(scan_msg.content["parameter"]["args"])
kwargs = scan_msg.content["parameter"]["kwargs"]
request = ContLineScan(
*args,
device_manager=device_manager_mock,
parameter=scan_msg.content["parameter"],
instruction_handler=instruction_handler_mock,
**kwargs,
)
request = scan_assembler(ContLineScan, *args, parameter=scan_msg.content["parameter"], **kwargs)
readback = collections.deque()
readback.extend(
@@ -808,7 +761,7 @@ def test_cont_line_scan(
assert msg_list == reference_scan_list
def test_device_rpc(device_manager_mock, instruction_handler_mock):
def test_device_rpc(scan_assembler):
parameter = {
"device": "samx",
"rpc_id": "baf7c4c0-4948-4046-8fc5-ad1e9d188c10",
@@ -816,12 +769,8 @@ def test_device_rpc(device_manager_mock, instruction_handler_mock):
"args": [],
"kwargs": {},
}
scan = scan_assembler(DeviceRPC, parameter=parameter)
scan = DeviceRPC(
parameter=parameter,
device_manager=device_manager_mock,
instruction_handler=instruction_handler_mock,
)
scan_instructions = list(scan.run())
for ii, _ in enumerate(scan_instructions):
scan_instructions[ii].metadata.pop("device_instr_id", None)
@@ -916,11 +865,10 @@ def test_device_rpc(device_manager_mock, instruction_handler_mock):
)
],
)
def test_acquire(scan_msg, reference_scan_list, device_manager_mock, instruction_handler_mock):
def test_acquire(scan_msg, reference_scan_list, scan_assembler):
scan = scan_assembler(Acquire, parameter=scan_msg.content.get("parameter"))
scan = Acquire(
exp_time=1, device_manager=device_manager_mock, instruction_handler=instruction_handler_mock
)
scan_instructions = list(scan.run())
scan_uid = scan_instructions[0].metadata.get("scan_id")
for ii, _ in enumerate(reference_scan_list):
@@ -1328,24 +1276,22 @@ def test_round_scan_fly_sim_calculate_positions(in_args, reference_positions):
@pytest.mark.parametrize(
"in_args,reference_positions", [((1, 5, 1, 1), [[0, -3], [0, -7], [0, 7]])]
)
def test_round_scan_fly_sim_scan_core(
in_args, reference_positions, device_manager_mock, instruction_handler_mock
):
def test_round_scan_fly_sim_scan_core(in_args, reference_positions, scan_assembler):
scan_msg = messages.ScanQueueMessage(
scan_type="round_scan_fly",
parameter={"args": {"samx": in_args}, "kwargs": {"realtive": True}},
queue="primary",
)
request = RoundScanFlySim(
request = scan_assembler(
RoundScanFlySim,
flyer="samx",
inner_ring=in_args[0],
outer_ring=in_args[1],
number_of_rings=in_args[2],
number_pos=in_args[3],
device_manager=device_manager_mock,
instruction_handler=instruction_handler_mock,
parameter=scan_msg.content["parameter"],
)
request.positions = np.array(reference_positions)
ret = next(request.scan_core())
@@ -1552,13 +1498,12 @@ def test_list_scan_raises_for_different_lengths():
)
],
)
def test_time_scan(scan_msg, reference_scan_list, device_manager_mock, instruction_handler_mock):
request = TimeScan(
device_manager=device_manager_mock,
instruction_handler=instruction_handler_mock,
parameter=scan_msg.content["parameter"],
**scan_msg.content["parameter"]["kwargs"],
def test_time_scan(scan_msg, reference_scan_list, scan_assembler):
request = scan_assembler(
TimeScan, parameter=scan_msg.content["parameter"], **scan_msg.content["parameter"]["kwargs"]
)
scan_instructions = list(request.run())
for msg in scan_instructions:
if msg:
@@ -1647,14 +1592,10 @@ def test_time_scan(scan_msg, reference_scan_list, device_manager_mock, instructi
)
],
)
def test_otf_scan(
scan_msg, reference_scan_list, device_manager_mock, instruction_handler_mock, ScanStubStatusMock
):
request = OTFScan(
device_manager=device_manager_mock,
instruction_handler=instruction_handler_mock,
parameter=scan_msg.content["parameter"],
metadata=scan_msg.metadata,
def test_otf_scan(scan_msg, reference_scan_list, ScanStubStatusMock, scan_assembler):
request = scan_assembler(
OTFScan, parameter=scan_msg.content["parameter"], metadata=scan_msg.metadata
)
def fake_done():
@@ -1692,7 +1633,7 @@ def test_monitor_scan():
assert np.isclose(request.positions, [[-5], [5]]).all()
def test_monitor_scan_run(device_manager_mock, instruction_handler_mock, ScanStubStatusMock):
def test_monitor_scan_run(scan_assembler, ScanStubStatusMock):
scan_msg = messages.ScanQueueMessage(
scan_type="monitor_scan",
parameter={"args": {"samx": [-5, 5]}, "kwargs": {"relative": True, "exp_time": 0.1}},
@@ -1700,13 +1641,8 @@ def test_monitor_scan_run(device_manager_mock, instruction_handler_mock, ScanStu
)
args = unpack_scan_args(scan_msg.content["parameter"]["args"])
kwargs = scan_msg.content["parameter"]["kwargs"]
request = MonitorScan(
*args,
device_manager=device_manager_mock,
instruction_handler=instruction_handler_mock,
parameter=scan_msg.content["parameter"],
**scan_msg.content["parameter"]["kwargs"],
)
request = scan_assembler(MonitorScan, *args, parameter=scan_msg.content["parameter"], **kwargs)
def fake_done():
yield False
@@ -1798,20 +1734,21 @@ def test_monitor_scan_run(device_manager_mock, instruction_handler_mock, ScanStu
]
def test_OpenInteractiveScan(device_manager_mock, instruction_handler_mock):
def test_OpenInteractiveScan(scan_assembler):
scan_msg = messages.ScanQueueMessage(
scan_type="open_interactive_scan",
parameter={"args": {"samx": []}, "kwargs": {"relative": True, "exp_time": 0.1}},
queue="primary",
)
args = unpack_scan_args(scan_msg.content["parameter"]["args"])
request = OpenInteractiveScan(
request = scan_assembler(
OpenInteractiveScan,
*args,
device_manager=device_manager_mock,
instruction_handler=instruction_handler_mock,
parameter=scan_msg.content["parameter"],
**scan_msg.content["parameter"]["kwargs"],
)
ref_list = list(request.run())
for msg in ref_list:
msg.metadata.pop("device_instr_id", None)
@@ -1855,7 +1792,7 @@ def test_OpenInteractiveScan(device_manager_mock, instruction_handler_mock):
]
def test_InteractiveReadMontiored(device_manager_mock, instruction_handler_mock):
def test_InteractiveReadMontiored(scan_assembler):
scan_msg = messages.ScanQueueMessage(
scan_type="_interactive_scan_trigger",
parameter={"args": {"samx": []}, "kwargs": {"relative": True, "exp_time": 0.1}},
@@ -1863,13 +1800,10 @@ def test_InteractiveReadMontiored(device_manager_mock, instruction_handler_mock)
)
args = unpack_scan_args(scan_msg.content["parameter"]["args"])
kwargs = scan_msg.content["parameter"]["kwargs"]
request = InteractiveReadMontiored(
*args,
device_manager=device_manager_mock,
instruction_handler=instruction_handler_mock,
parameter=scan_msg.content["parameter"],
**kwargs,
request = scan_assembler(
InteractiveReadMontiored, *args, parameter=scan_msg.content["parameter"], **kwargs
)
ref_list = list(request.run())
ref_list[0].metadata.pop("device_instr_id", None)
ref_list[0].device = sorted(ref_list[0].device)
@@ -1883,7 +1817,7 @@ def test_InteractiveReadMontiored(device_manager_mock, instruction_handler_mock)
]
def test_InteractiveTrigger(device_manager_mock, instruction_handler_mock):
def test_InteractiveTrigger(scan_assembler):
scan_msg = messages.ScanQueueMessage(
scan_type="_interactive_scan_trigger",
parameter={"args": {"samx": []}, "kwargs": {"relative": True, "exp_time": 0.1}},
@@ -1891,13 +1825,11 @@ def test_InteractiveTrigger(device_manager_mock, instruction_handler_mock):
)
args = unpack_scan_args(scan_msg.content["parameter"]["args"])
kwargs = scan_msg.content["parameter"]["kwargs"]
request = InteractiveTrigger(
*args,
device_manager=device_manager_mock,
instruction_handler=instruction_handler_mock,
parameter=scan_msg.content["parameter"],
**kwargs,
request = scan_assembler(
InteractiveTrigger, *args, parameter=scan_msg.content["parameter"], **kwargs
)
ref_list = list(request.run())
ref_list[0].metadata.pop("device_instr_id", None)
ref_list[0].device = sorted(ref_list[0].device)
@@ -1911,7 +1843,7 @@ def test_InteractiveTrigger(device_manager_mock, instruction_handler_mock):
]
def test_CloseInteractiveScan(device_manager_mock, instruction_handler_mock):
def test_CloseInteractiveScan(scan_assembler):
scan_msg = messages.ScanQueueMessage(
scan_type="close_interactive_scan",
parameter={"args": {"samx": []}, "kwargs": {"relative": True, "exp_time": 0.1}},
@@ -1919,13 +1851,10 @@ def test_CloseInteractiveScan(device_manager_mock, instruction_handler_mock):
)
args = unpack_scan_args(scan_msg.content["parameter"]["args"])
kwargs = scan_msg.content["parameter"]["kwargs"]
request = CloseInteractiveScan(
*args,
device_manager=device_manager_mock,
instruction_handler=instruction_handler_mock,
parameter=scan_msg.content["parameter"],
**scan_msg.content["parameter"]["kwargs"],
request = scan_assembler(
CloseInteractiveScan, *args, parameter=scan_msg.content["parameter"], **kwargs
)
request.start_pos = [0]
ref_list = list(request.run())
for ii, _ in enumerate(ref_list):
@@ -1967,7 +1896,7 @@ def test_CloseInteractiveScan(device_manager_mock, instruction_handler_mock):
]
def test_RoundScan(device_manager_mock, instruction_handler_mock):
def test_RoundScan(scan_assembler):
scan_msg = messages.ScanQueueMessage(
scan_type="round_scan",
parameter={
@@ -1977,10 +1906,10 @@ def test_RoundScan(device_manager_mock, instruction_handler_mock):
queue="primary",
)
args = unpack_scan_args(scan_msg.content["parameter"]["args"])
request = RoundScan(
request = scan_assembler(
RoundScan,
*args,
device_manager=device_manager_mock,
instruction_handler=instruction_handler_mock,
parameter=scan_msg.content["parameter"],
**scan_msg.content["parameter"]["kwargs"],
)
@@ -1991,15 +1920,9 @@ def test_RoundScan(device_manager_mock, instruction_handler_mock):
assert len(ref) == 47
def test_ContLineFlyScan(device_manager_mock, instruction_handler_mock, ScanStubStatusMock):
request = ContLineFlyScan(
motor="samx",
start=0,
stop=5,
relative=False,
device_manager=device_manager_mock,
instruction_handler=instruction_handler_mock,
)
def test_ContLineFlyScan(scan_assembler, ScanStubStatusMock):
request = scan_assembler(ContLineFlyScan, motor="samx", start=0, stop=5, relative=False)
def fake_done():
yield False