443 lines
16 KiB
Python
443 lines
16 KiB
Python
from unittest import mock
|
|
|
|
import numpy as np
|
|
import pytest
|
|
from bec_lib import messages
|
|
from bec_server.device_server.tests.utils import DMMock
|
|
from bec_server.scan_server.errors import ScanAbortion
|
|
from bec_server.scan_server.tests.fixtures import (
|
|
DeviceMockType,
|
|
DMMock,
|
|
ScanStubStatusMock,
|
|
connector_mock,
|
|
instruction_handler_mock,
|
|
scan_assembler,
|
|
)
|
|
|
|
from csaxs_bec.scans.LamNIFermatScan import LamNIFermatScan
|
|
|
|
|
|
@pytest.fixture
def device_manager_mock():
    """Yield a ``DMMock`` pre-populated with the devices these scan tests rely on.

    Registered devices, in order: ``lsamx`` and ``lsamy`` (each with a ``center``
    user parameter), ``samx`` and ``samy`` (each with a zeroed read buffer) and the
    monitored signal ``bpm4i``.
    """
    dm = DMMock()

    # Devices that carry a "center" entry in their userParameter config.
    for name, center in (("lsamx", 8.1), ("lsamy", 10)):
        dm.add_device(name)
        dm.devices[name]._config["userParameter"] = {"center": center}

    # Devices whose buffered readback starts at zero.
    for name in ("samx", "samy"):
        dm.add_device(name)
        dm.devices[name].read_buffer = {"value": 0}

    dm.add_device("bpm4i", dev_type=DeviceMockType.SIGNAL, readout_priority="monitored")
    yield dm
|
|
|
|
|
|
def _expected_metadata(readout_priority="monitored", *, diid=True, **extra):
    """Return the metadata dict of an expected instruction for request "1234".

    ``diid`` controls whether the normalised ``device_instr_id`` placeholder is
    included; ``extra`` adds further entries such as ``point_id``.
    """
    metadata = {"readout_priority": readout_priority, "RID": "1234", **extra}
    if diid:
        metadata["device_instr_id"] = "diid"
    return metadata


def _expected_rpc(device, func):
    """Return the expected argument-less ``rpc`` instruction calling ``func`` on ``device``."""
    return messages.DeviceInstructionMessage(
        metadata=_expected_metadata(),
        device=device,
        action="rpc",
        parameter={"device": device, "func": func, "rpc_id": "rpc_id", "args": (), "kwargs": {}},
    )


def _expected_set(device, value):
    """Return the expected ``set`` instruction moving ``device`` to ``value``."""
    return messages.DeviceInstructionMessage(
        metadata=_expected_metadata(), device=device, action="set", parameter={"value": value}
    )


def _expected_monitored_read(point_id):
    """Return the expected monitored ``read`` of ``bpm4i`` for the given ``point_id``."""
    return messages.DeviceInstructionMessage(
        metadata=_expected_metadata(point_id=point_id),
        device=["bpm4i"],
        action="read",
        parameter={"group": "monitored"},
    )


@pytest.mark.parametrize(
    "scan_msg,reference_scan_list",
    [
        (
            messages.ScanQueueMessage(
                scan_type="lamni_fermat_scan",
                parameter={
                    "args": {},
                    "kwargs": {
                        "fov_size": [5],
                        "exp_time": 0.1,
                        "step": 2,
                        "angle": 10,
                        "scan_type": "step",
                    },
                },
                queue="primary",
                metadata={"RID": "1234"},
            ),
            [
                messages.DeviceInstructionMessage(
                    metadata=_expected_metadata(),
                    device=["rtx", "rty"],
                    action="read",
                    parameter={},
                ),
                _expected_rpc("rtx", "controller.clear_trajectory_generator"),
                _expected_rpc("lsamrot", "user_setpoint.get"),
                messages.DeviceInstructionMessage(
                    metadata=_expected_metadata(diid=False),
                    device=None,
                    action="scan_report_instruction",
                    parameter={
                        "readback": {
                            "RID": "1234",
                            "devices": ["lsamrot"],
                            "start": [0],
                            "end": [10],
                        }
                    },
                ),
                _expected_set("lsamrot", 10),
                _expected_rpc("rtx", "controller.feedback_disable"),
                _expected_rpc("rtx", "readback.get"),
                _expected_rpc("rty", "readback.get"),
                _expected_rpc("lsamx", "readback.get"),
                _expected_rpc("lsamy", "readback.get"),
                _expected_rpc("rtx", "readback.get"),
                _expected_rpc("rty", "readback.get"),
                _expected_rpc("rtx", "controller.feedback_enable_without_reset"),
                messages.DeviceInstructionMessage(
                    metadata=_expected_metadata(diid=False),
                    device=None,
                    action="open_scan",
                    parameter={
                        "scan_motors": ["rtx", "rty"],
                        "readout_priority": {
                            "monitored": [],
                            "baseline": [],
                            "on_request": [],
                            "async": [],
                        },
                        "num_points": 2,
                        "positions": [
                            [1.3681828686580249, 2.1508313829565293],
                            [-0.7700589354581364, -0.8406005210092851],
                        ],
                        "scan_name": "lamni_fermat_scan",
                        "scan_type": "step",
                    },
                ),
                messages.DeviceInstructionMessage(
                    metadata={"device_instr_id": "diid"},
                    device=["bpm4i", "lsamx", "lsamy", "samx", "samy"],
                    action="stage",
                    parameter={},
                ),
                messages.DeviceInstructionMessage(
                    metadata=_expected_metadata("baseline"),
                    device=["lsamx", "lsamy", "samx", "samy"],
                    action="read",
                    parameter={},
                ),
                # First scan point.
                _expected_set("rtx", 1.3681828686580249),
                _expected_set("rty", 2.1508313829565293),
                None,  # the scan yields a bare None here; the test loop skips such entries
                _expected_monitored_read(0),
                # Second scan point.
                _expected_set("rtx", -0.7700589354581364),
                _expected_set("rty", -0.8406005210092851),
                None,
                _expected_monitored_read(1),
                messages.DeviceInstructionMessage(
                    metadata=_expected_metadata(),
                    device=["bpm4i", "lsamx", "lsamy", "samx", "samy"],
                    action="complete",
                    parameter={},
                ),
                messages.DeviceInstructionMessage(
                    metadata={"device_instr_id": "diid"},
                    device=["bpm4i", "lsamx", "lsamy", "samx", "samy"],
                    action="unstage",
                    parameter={},
                ),
                messages.DeviceInstructionMessage(
                    metadata=_expected_metadata(diid=False),
                    device=None,
                    action="close_scan",
                    parameter={},
                ),
            ],
        )
    ],
)
def test_LamNIFermatScan(scan_msg, reference_scan_list, scan_assembler):
    """Run the assembled scan and compare its instructions with the reference list.

    ``_get_result_from_status`` is patched to return 0 and ``_check_min_positions``
    is replaced by a mock that must be called exactly once. Before the final
    equality check, run-specific identifiers (``scan_id``, ``RID``,
    ``device_instr_id``, ``rpc_id``) are replaced by fixed placeholders, and
    floating-point payloads (``value``, ``positions``) are compared with
    ``np.isclose`` and then overwritten with the reference numbers.
    """
    scan = scan_assembler(
        LamNIFermatScan,
        parameter=scan_msg.content.get("parameter"),
        metadata=scan_msg.metadata,
        **scan_msg.content["parameter"]["kwargs"],
    )

    with mock.patch.object(scan.stubs, "_get_result_from_status", return_value=0):
        with mock.patch.object(scan, "_check_min_positions") as check_min_pos:
            scan_instructions = list(scan.run())
            check_min_pos.assert_called_once()

    # Metadata keys whose run-specific values are swapped for stable placeholders.
    placeholders = (
        ("scan_id", "scan_id"),
        ("RID", scan.metadata.get("RID")),
        ("device_instr_id", "diid"),
    )

    for index, instruction in enumerate(scan_instructions):
        if instruction is None:
            continue

        metadata = instruction.metadata
        for key, placeholder in placeholders:
            if metadata.get(key) is not None:
                metadata[key] = placeholder

        if instruction.content["action"] == "rpc":
            instruction.content["parameter"]["rpc_id"] = "rpc_id"

        params = instruction.content["parameter"]

        # The reference entry is looked up only when a numeric payload is present,
        # so instructions without one never index into the reference list here.
        if params.get("value"):
            reference_params = reference_scan_list[index].content["parameter"]
            assert np.isclose(params.get("value"), reference_params.get("value"))
            params["value"] = reference_params["value"]

        if params.get("positions"):
            reference_params = reference_scan_list[index].content["parameter"]
            assert np.isclose(params.get("positions"), reference_params.get("positions")).all()
            params["positions"] = reference_params["positions"]

    assert scan_instructions == reference_scan_list
|
|
|
|
|
|
def test_LamNIFermatScan_min_positions(scan_assembler):
    """Expect ``ScanAbortion`` when the scan runs with ``_check_min_positions`` unpatched.

    The request is the same as in ``test_LamNIFermatScan``; the only difference
    is that nothing is mocked here.
    NOTE(review): the abortion presumably originates from the minimum-positions
    check itself -- confirm against ``LamNIFermatScan``.
    """
    scan_kwargs = {
        "fov_size": [5],
        "exp_time": 0.1,
        "step": 2,
        "angle": 10,
        "scan_type": "step",
    }
    scan_msg = messages.ScanQueueMessage(
        scan_type="lamni_fermat_scan",
        parameter={"args": {}, "kwargs": scan_kwargs},
        queue="primary",
        metadata={"RID": "1234"},
    )

    scan = scan_assembler(
        LamNIFermatScan,
        parameter=scan_msg.content.get("parameter"),
        metadata=scan_msg.metadata,
        **scan_msg.content["parameter"]["kwargs"],
    )

    with pytest.raises(ScanAbortion):
        # Exhaust the generator so that the scan logic actually executes.
        list(scan.run())
|