fix(scans): fixes for bec v3

This commit is contained in:
2024-11-08 10:28:28 +01:00
parent 42ca7ed9a4
commit 7061aaf450
3 changed files with 461 additions and 588 deletions
+12 -24
View File
@@ -32,7 +32,7 @@ class XASSimpleScan(AsyncFlyScanBase):
):
"""The xas_simple_scan is used to start a simple oscillating scan on the mono bragg motor.
Start and Stop define the energy range for the scan, scan_time is the time for one scan
cycle and scan_duration is the duration of the scan. If scan duration is set to 0, the
cycle and scan_duration is the duration of the scan. If scan duration is set to 0, the
scan will run infinitely.
Args:
@@ -84,26 +84,12 @@ class XASSimpleScan(AsyncFlyScanBase):
Kickoff the oscillation on the Bragg motor and wait for the completion of the motion.
"""
# Start the oscillation on the Bragg motor.
yield from self.stubs.kickoff(device=self.motor, wait_group="kickoff")
yield from self.stubs.wait(wait_type="move", device=self.motor, wait_group="kickoff")
yield from self.stubs.complete(device=self.motor)
yield from self.stubs.kickoff(device=self.motor)
complete_status = yield from self.stubs.complete(device=self.motor, wait=False)
# Get the target DIID (instruction number) for the stubs.complete call
target_diid = self.DIID - 1
while True:
while not complete_status.done:
# Readout monitored devices
yield from self.stubs.read_and_wait(
group="primary",
wait_group="readout_primary",
point_id=self.point_id,
)
# Check if complete call on Mo1 Bragg has been finished
status = self.stubs.get_req_status(
device=self.motor, RID=self.metadata["RID"], DIID=target_diid
)
if status:
break
yield from self.stubs.read(group="primary", point_id=self.point_id)
time.sleep(self.primary_readout_cycle)
self.point_id += 1
@@ -159,7 +145,7 @@ class XASSimpleScanWithXRD(XASSimpleScan):
every nth cycle for high
motor (DeviceBase, optional): Motor device to be used for the scan.
Defaults to "mo1_bragg".
Examples:
>>> scans.xas_simple_scan_with_xrd(start=8000, stop=9000, scan_time=1, scan_duration=10, xrd_enable_low=True, num_trigger_low=5, cycle_low=2, exp_time_low=100, xrd_enable_high=False, num_trigger_high=3, cycle_high=1, exp_time_high=1000)
"""
@@ -180,6 +166,7 @@ class XASSimpleScanWithXRD(XASSimpleScan):
self.exp_time_high = exp_time_high
self.cycle_high = cycle_high
class XASAdvancedScan(XASSimpleScan):
"""Class for the XAS advanced scan"""
@@ -217,10 +204,10 @@ class XASAdvancedScan(XASSimpleScan):
e_kink (float): Energy of the kink.
motor (DeviceBase, optional): Motor device to be used for the scan.
Defaults to "mo1_bragg".
Examples:
>>> scans.xas_advanced_scan(start=10000, stop=12000, scan_time=0.5, scan_duration=10, p_kink=50, e_kink=10500)
"""
"""
super().__init__(
start=start,
stop=stop,
@@ -232,6 +219,7 @@ class XASAdvancedScan(XASSimpleScan):
self.p_kink = p_kink
self.e_kink = e_kink
class XASAdvancedScanWithXRD(XASAdvancedScan):
"""Class for the XAS advanced scan with XRD"""
@@ -289,10 +277,10 @@ class XASAdvancedScanWithXRD(XASAdvancedScan):
every nth cycle for high
motor (DeviceBase, optional): Motor device to be used for the scan.
Defaults to "mo1_bragg".
Examples:
>>> scans.xas_advanced_scan_with_xrd(start=10000, stop=12000, scan_time=0.5, scan_duration=10, p_kink=50, e_kink=10500, xrd_enable_low=True, num_trigger_low=5, cycle_low=2, exp_time_low=100, xrd_enable_high=False, num_trigger_high=3, cycle_high=1, exp_time_high=1000)
"""
"""
super().__init__(
start=start,
stop=stop,
+64
View File
@@ -0,0 +1,64 @@
# pylint: skip-file
from functools import partial
from unittest import mock
import pytest
from bec_lib.tests.utils import ConnectorMock
from bec_server.device_server.tests.utils import DeviceMockType, DMMock
from bec_server.scan_server.instruction_handler import InstructionHandler
@pytest.fixture
def device_manager_mock():
device_manager = DMMock()
device_manager.add_device(
"mo1_bragg", dev_type=DeviceMockType.POSITIONER, readout_priority="monitored"
)
device_manager.add_device("samx")
device_manager.add_device(
"eiger", dev_type=DeviceMockType.SIGNAL, readout_priority="monitored", software_trigger=True
)
device_manager.add_device("bpm4i", dev_type=DeviceMockType.SIGNAL, readout_priority="monitored")
yield device_manager
@pytest.fixture
def connector_mock():
connector = ConnectorMock("")
yield connector
class _ScanStubStatusMock:
def __init__(self, done_func) -> None:
self._done = done_func()
@property
def done(self):
return next(self._done)
def wait(self):
return
@pytest.fixture
def ScanStubStatusMock():
return _ScanStubStatusMock
@pytest.fixture
def instruction_handler_mock(connector_mock):
instruction_handler = InstructionHandler(connector_mock)
with mock.patch("bec_server.scan_server.scan_stubs.ScanStubStatus.wait", return_value=None):
yield instruction_handler
@pytest.fixture
def scan_assembler(instruction_handler_mock, device_manager_mock):
def _assemble_scan(scan_class, *args, **kwargs):
return scan_class(*args, **kwargs)
return partial(
_assemble_scan,
instruction_handler=instruction_handler_mock,
device_manager=device_manager_mock,
)
+385 -564
View File
@@ -4,164 +4,143 @@ from unittest import mock
from bec_lib.messages import DeviceInstructionMessage
from bec_server.device_server.tests.utils import DMMock
from debye_bec.scans import XASSimpleScan, XASSimpleScanWithXRD, XASAdvancedScan, XASAdvancedScanWithXRD
from debye_bec.scans import (
XASAdvancedScan,
XASAdvancedScanWithXRD,
XASSimpleScan,
XASSimpleScanWithXRD,
)
def test_xas_simple_scan():
# create a fake device manager that we can use to add devices
device_manager = DMMock()
device_manager.add_device("mo1_bragg")
request = XASSimpleScan(
start=0, stop=5, scan_time=1, scan_duration=10, device_manager=device_manager
)
def get_instructions(request, ScanStubStatusMock):
request.metadata["RID"] = "my_test_request_id"
def fake_done():
"""
Fake done function for ScanStubStatusMock. Upon each call, it returns the next value from the generator.
This is used to simulate the completion of the scan.
"""
yield False
yield False
yield True
def fake_complete(*args, **kwargs):
yield "fake_complete"
return ScanStubStatusMock(done_func=fake_done)
with (
mock.patch.object(request.stubs, "get_req_status", side_effect=[False, True]),
mock.patch.object(request.stubs, "_get_from_rpc", return_value=True),
mock.patch.object(request.stubs, "complete", side_effect=fake_complete),
mock.patch.object(request.stubs, "_get_result_from_status", return_value=None),
):
reference_commands = list(request.run())
for cmd in reference_commands:
if not cmd:
if not cmd or isinstance(cmd, str):
continue
if "RID" in cmd.metadata:
cmd.metadata["RID"] = "my_test_request_id"
if "DIID" in cmd.metadata:
cmd.metadata.pop("DIID")
if "rpc_id" in cmd.parameter:
cmd.parameter["rpc_id"] = "my_test_rpc_id"
cmd.metadata.pop("device_instr_id", None)
assert reference_commands == [
None,
None,
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
device=None,
action="scan_report_instruction",
parameter={"device_progress": ["mo1_bragg"]},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
device=None,
action="open_scan",
parameter={
"scan_motors": [],
"readout_priority": {
"monitored": [],
"baseline": [],
"on_request": [],
"async": [],
},
"num_points": None,
"positions": [0.0, 5.0],
"scan_name": "xas_simple_scan",
"scan_type": "fly",
return reference_commands
def test_xas_simple_scan(scan_assembler, ScanStubStatusMock):
request = scan_assembler(XASSimpleScan, start=0, stop=5, scan_time=1, scan_duration=10)
reference_commands = get_instructions(request, ScanStubStatusMock)
assert reference_commands == [
None,
None,
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
device=None,
action="scan_report_instruction",
parameter={"device_progress": ["mo1_bragg"]},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
device=None,
action="open_scan",
parameter={
"scan_motors": [],
"readout_priority": {
"monitored": [],
"baseline": [],
"on_request": [],
"async": [],
},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
device=None,
action="stage",
parameter={},
),
DeviceInstructionMessage(
metadata={"readout_priority": "baseline", "RID": "my_test_request_id"},
device=None,
action="baseline_reading",
parameter={},
),
DeviceInstructionMessage(
metadata={
"readout_priority": "monitored",
"RID": "my_test_request_id",
"response": True,
},
device="mo1_bragg",
action="rpc",
parameter={
"device": "mo1_bragg",
"func": "move_type.set",
"rpc_id": "my_test_rpc_id",
"args": ("energy",),
"kwargs": {},
},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
device="mo1_bragg",
action="kickoff",
parameter={"configure": {}, "wait_group": "kickoff"},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
device="mo1_bragg",
action="wait",
parameter={"type": "move", "wait_group": "kickoff"},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
device="mo1_bragg",
action="complete",
parameter={},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id", "point_id": 0},
device=None,
action="read",
parameter={"group": "primary", "wait_group": "readout_primary"},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
device=None,
action="wait",
parameter={"type": "read", "group": "primary", "wait_group": "readout_primary"},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id", "point_id": 1},
device=None,
action="read",
parameter={"group": "primary", "wait_group": "readout_primary"},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
device=None,
action="wait",
parameter={"type": "read", "group": "primary", "wait_group": "readout_primary"},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
device=None,
action="wait",
parameter={"type": "read", "group": "primary", "wait_group": "readout_primary"},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
device=None,
action="complete",
parameter={},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
device=None,
action="unstage",
parameter={},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
device=None,
action="close_scan",
parameter={},
),
]
"num_points": None,
"positions": [0.0, 5.0],
"scan_name": "xas_simple_scan",
"scan_type": "fly",
},
),
DeviceInstructionMessage(
metadata={},
device=["bpm4i", "eiger", "mo1_bragg", "samx"],
action="stage",
parameter={},
),
DeviceInstructionMessage(
metadata={"readout_priority": "baseline", "RID": "my_test_request_id"},
device=["samx"],
action="read",
parameter={},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
device="mo1_bragg",
action="rpc",
parameter={
"device": "mo1_bragg",
"func": "move_type.set",
"rpc_id": "my_test_rpc_id",
"args": ("energy",),
"kwargs": {},
},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
device="mo1_bragg",
action="kickoff",
parameter={"configure": {}},
),
"fake_complete",
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id", "point_id": 0},
device=["bpm4i", "eiger", "mo1_bragg"],
action="read",
parameter={"group": "primary"},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id", "point_id": 1},
device=["bpm4i", "eiger", "mo1_bragg"],
action="read",
parameter={"group": "primary"},
),
"fake_complete",
DeviceInstructionMessage(
metadata={},
device=["bpm4i", "eiger", "mo1_bragg", "samx"],
action="unstage",
parameter={},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
device=None,
action="close_scan",
parameter={},
),
]
def test_xas_simple_scan_with_xrd():
# create a fake device manager that we can use to add devices
device_manager = DMMock()
device_manager.add_device("mo1_bragg")
def test_xas_simple_scan_with_xrd(scan_assembler, ScanStubStatusMock):
request = XASSimpleScanWithXRD(
request = scan_assembler(
XASSimpleScanWithXRD,
start=0,
stop=5,
scan_time=1,
@@ -174,302 +153,198 @@ def test_xas_simple_scan_with_xrd():
num_trigger_high=2,
exp_time_high=3,
cycle_high=4,
device_manager=device_manager,
)
request.metadata["RID"] = "my_test_request_id"
with (
mock.patch.object(request.stubs, "get_req_status", side_effect=[False, True]),
mock.patch.object(request.stubs, "_get_from_rpc", return_value=True),
):
reference_commands = list(request.run())
reference_commands = get_instructions(request, ScanStubStatusMock)
for cmd in reference_commands:
if not cmd:
continue
if "RID" in cmd.metadata:
cmd.metadata["RID"] = "my_test_request_id"
if "DIID" in cmd.metadata:
cmd.metadata.pop("DIID")
if "rpc_id" in cmd.parameter:
cmd.parameter["rpc_id"] = "my_test_rpc_id"
assert reference_commands == [
None,
None,
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
device=None,
action="scan_report_instruction",
parameter={"device_progress": ["mo1_bragg"]},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
device=None,
action="open_scan",
parameter={
"scan_motors": [],
"readout_priority": {
"monitored": [],
"baseline": [],
"on_request": [],
"async": [],
},
"num_points": None,
"positions": [0.0, 5.0],
"scan_name": "xas_simple_scan_with_xrd",
"scan_type": "fly",
assert reference_commands == [
None,
None,
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
device=None,
action="scan_report_instruction",
parameter={"device_progress": ["mo1_bragg"]},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
device=None,
action="open_scan",
parameter={
"scan_motors": [],
"readout_priority": {
"monitored": [],
"baseline": [],
"on_request": [],
"async": [],
},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
device=None,
action="stage",
parameter={},
),
DeviceInstructionMessage(
metadata={"readout_priority": "baseline", "RID": "my_test_request_id"},
device=None,
action="baseline_reading",
parameter={},
),
DeviceInstructionMessage(
metadata={
"readout_priority": "monitored",
"RID": "my_test_request_id",
"response": True,
},
device="mo1_bragg",
action="rpc",
parameter={
"device": "mo1_bragg",
"func": "move_type.set",
"rpc_id": "my_test_rpc_id",
"args": ("energy",),
"kwargs": {},
},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
device="mo1_bragg",
action="kickoff",
parameter={"configure": {}, "wait_group": "kickoff"},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
device="mo1_bragg",
action="wait",
parameter={"type": "move", "wait_group": "kickoff"},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
device="mo1_bragg",
action="complete",
parameter={},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id", "point_id": 0},
device=None,
action="read",
parameter={"group": "primary", "wait_group": "readout_primary"},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
device=None,
action="wait",
parameter={"type": "read", "group": "primary", "wait_group": "readout_primary"},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id", "point_id": 1},
device=None,
action="read",
parameter={"group": "primary", "wait_group": "readout_primary"},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
device=None,
action="wait",
parameter={"type": "read", "group": "primary", "wait_group": "readout_primary"},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
device=None,
action="wait",
parameter={"type": "read", "group": "primary", "wait_group": "readout_primary"},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
device=None,
action="complete",
parameter={},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
device=None,
action="unstage",
parameter={},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
device=None,
action="close_scan",
parameter={},
),
]
"num_points": None,
"positions": [0.0, 5.0],
"scan_name": "xas_simple_scan_with_xrd",
"scan_type": "fly",
},
),
DeviceInstructionMessage(
metadata={},
device=["bpm4i", "eiger", "mo1_bragg", "samx"],
action="stage",
parameter={},
),
DeviceInstructionMessage(
metadata={"readout_priority": "baseline", "RID": "my_test_request_id"},
device=["samx"],
action="read",
parameter={},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
device="mo1_bragg",
action="rpc",
parameter={
"device": "mo1_bragg",
"func": "move_type.set",
"rpc_id": "my_test_rpc_id",
"args": ("energy",),
"kwargs": {},
},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
device="mo1_bragg",
action="kickoff",
parameter={"configure": {}},
),
"fake_complete",
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id", "point_id": 0},
device=["bpm4i", "eiger", "mo1_bragg"],
action="read",
parameter={"group": "primary"},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id", "point_id": 1},
device=["bpm4i", "eiger", "mo1_bragg"],
action="read",
parameter={"group": "primary"},
),
"fake_complete",
DeviceInstructionMessage(
metadata={},
device=["bpm4i", "eiger", "mo1_bragg", "samx"],
action="unstage",
parameter={},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
device=None,
action="close_scan",
parameter={},
),
]
def test_xas_advanced_scan():
# create a fake device manager that we can use to add devices
device_manager = DMMock()
device_manager.add_device("mo1_bragg")
request = XASAdvancedScan(
start=8000, stop=9000, scan_time=1, scan_duration=10, p_kink=50, e_kink=8500, device_manager=device_manager
def test_xas_advanced_scan(scan_assembler, ScanStubStatusMock):
request = scan_assembler(
XASAdvancedScan,
start=8000,
stop=9000,
scan_time=1,
scan_duration=10,
p_kink=50,
e_kink=8500,
)
request.metadata["RID"] = "my_test_request_id"
with (
mock.patch.object(request.stubs, "get_req_status", side_effect=[False, True]),
mock.patch.object(request.stubs, "_get_from_rpc", return_value=True),
):
reference_commands = list(request.run())
reference_commands = get_instructions(request, ScanStubStatusMock)
for cmd in reference_commands:
if not cmd:
continue
if "RID" in cmd.metadata:
cmd.metadata["RID"] = "my_test_request_id"
if "DIID" in cmd.metadata:
cmd.metadata.pop("DIID")
if "rpc_id" in cmd.parameter:
cmd.parameter["rpc_id"] = "my_test_rpc_id"
assert reference_commands == [
None,
None,
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
device=None,
action="scan_report_instruction",
parameter={"device_progress": ["mo1_bragg"]},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
device=None,
action="open_scan",
parameter={
"scan_motors": [],
"readout_priority": {
"monitored": [],
"baseline": [],
"on_request": [],
"async": [],
},
"num_points": None,
"positions": [8000.0, 9000.0],
"scan_name": "xas_advanced_scan",
"scan_type": "fly",
assert reference_commands == [
None,
None,
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
device=None,
action="scan_report_instruction",
parameter={"device_progress": ["mo1_bragg"]},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
device=None,
action="open_scan",
parameter={
"scan_motors": [],
"readout_priority": {
"monitored": [],
"baseline": [],
"on_request": [],
"async": [],
},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
device=None,
action="stage",
parameter={},
),
DeviceInstructionMessage(
metadata={"readout_priority": "baseline", "RID": "my_test_request_id"},
device=None,
action="baseline_reading",
parameter={},
),
DeviceInstructionMessage(
metadata={
"readout_priority": "monitored",
"RID": "my_test_request_id",
"response": True,
},
device="mo1_bragg",
action="rpc",
parameter={
"device": "mo1_bragg",
"func": "move_type.set",
"rpc_id": "my_test_rpc_id",
"args": ("energy",),
"kwargs": {},
},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
device="mo1_bragg",
action="kickoff",
parameter={"configure": {}, "wait_group": "kickoff"},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
device="mo1_bragg",
action="wait",
parameter={"type": "move", "wait_group": "kickoff"},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
device="mo1_bragg",
action="complete",
parameter={},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id", "point_id": 0},
device=None,
action="read",
parameter={"group": "primary", "wait_group": "readout_primary"},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
device=None,
action="wait",
parameter={"type": "read", "group": "primary", "wait_group": "readout_primary"},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id", "point_id": 1},
device=None,
action="read",
parameter={"group": "primary", "wait_group": "readout_primary"},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
device=None,
action="wait",
parameter={"type": "read", "group": "primary", "wait_group": "readout_primary"},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
device=None,
action="wait",
parameter={"type": "read", "group": "primary", "wait_group": "readout_primary"},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
device=None,
action="complete",
parameter={},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
device=None,
action="unstage",
parameter={},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
device=None,
action="close_scan",
parameter={},
),
]
"num_points": None,
"positions": [8000.0, 9000.0],
"scan_name": "xas_advanced_scan",
"scan_type": "fly",
},
),
DeviceInstructionMessage(
metadata={},
device=["bpm4i", "eiger", "mo1_bragg", "samx"],
action="stage",
parameter={},
),
DeviceInstructionMessage(
metadata={"readout_priority": "baseline", "RID": "my_test_request_id"},
device=["samx"],
action="read",
parameter={},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
device="mo1_bragg",
action="rpc",
parameter={
"device": "mo1_bragg",
"func": "move_type.set",
"rpc_id": "my_test_rpc_id",
"args": ("energy",),
"kwargs": {},
},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
device="mo1_bragg",
action="kickoff",
parameter={"configure": {}},
),
"fake_complete",
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id", "point_id": 0},
device=["bpm4i", "eiger", "mo1_bragg"],
action="read",
parameter={"group": "primary"},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id", "point_id": 1},
device=["bpm4i", "eiger", "mo1_bragg"],
action="read",
parameter={"group": "primary"},
),
"fake_complete",
DeviceInstructionMessage(
metadata={},
device=["bpm4i", "eiger", "mo1_bragg", "samx"],
action="unstage",
parameter={},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
device=None,
action="close_scan",
parameter={},
),
]
def test_xas_advanced_scan_with_xrd():
# create a fake device manager that we can use to add devices
device_manager = DMMock()
device_manager.add_device("mo1_bragg")
request = XASAdvancedScanWithXRD(
def test_xas_advanced_scan_with_xrd(scan_assembler, ScanStubStatusMock):
request = scan_assembler(
XASAdvancedScanWithXRD,
start=8000,
stop=9000,
scan_time=1,
@@ -484,144 +359,90 @@ def test_xas_advanced_scan_with_xrd():
num_trigger_high=2,
exp_time_high=3,
cycle_high=4,
device_manager=device_manager,
)
request.metadata["RID"] = "my_test_request_id"
with (
mock.patch.object(request.stubs, "get_req_status", side_effect=[False, True]),
mock.patch.object(request.stubs, "_get_from_rpc", return_value=True),
):
reference_commands = list(request.run())
reference_commands = get_instructions(request, ScanStubStatusMock)
for cmd in reference_commands:
if not cmd:
continue
if "RID" in cmd.metadata:
cmd.metadata["RID"] = "my_test_request_id"
if "DIID" in cmd.metadata:
cmd.metadata.pop("DIID")
if "rpc_id" in cmd.parameter:
cmd.parameter["rpc_id"] = "my_test_rpc_id"
assert reference_commands == [
None,
None,
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
device=None,
action="scan_report_instruction",
parameter={"device_progress": ["mo1_bragg"]},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
device=None,
action="open_scan",
parameter={
"scan_motors": [],
"readout_priority": {
"monitored": [],
"baseline": [],
"on_request": [],
"async": [],
},
"num_points": None,
"positions": [8000.0, 9000.0],
"scan_name": "xas_advanced_scan_with_xrd",
"scan_type": "fly",
assert reference_commands == [
None,
None,
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
device=None,
action="scan_report_instruction",
parameter={"device_progress": ["mo1_bragg"]},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
device=None,
action="open_scan",
parameter={
"scan_motors": [],
"readout_priority": {
"monitored": [],
"baseline": [],
"on_request": [],
"async": [],
},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
device=None,
action="stage",
parameter={},
),
DeviceInstructionMessage(
metadata={"readout_priority": "baseline", "RID": "my_test_request_id"},
device=None,
action="baseline_reading",
parameter={},
),
DeviceInstructionMessage(
metadata={
"readout_priority": "monitored",
"RID": "my_test_request_id",
"response": True,
},
device="mo1_bragg",
action="rpc",
parameter={
"device": "mo1_bragg",
"func": "move_type.set",
"rpc_id": "my_test_rpc_id",
"args": ("energy",),
"kwargs": {},
},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
device="mo1_bragg",
action="kickoff",
parameter={"configure": {}, "wait_group": "kickoff"},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
device="mo1_bragg",
action="wait",
parameter={"type": "move", "wait_group": "kickoff"},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
device="mo1_bragg",
action="complete",
parameter={},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id", "point_id": 0},
device=None,
action="read",
parameter={"group": "primary", "wait_group": "readout_primary"},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
device=None,
action="wait",
parameter={"type": "read", "group": "primary", "wait_group": "readout_primary"},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id", "point_id": 1},
device=None,
action="read",
parameter={"group": "primary", "wait_group": "readout_primary"},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
device=None,
action="wait",
parameter={"type": "read", "group": "primary", "wait_group": "readout_primary"},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
device=None,
action="wait",
parameter={"type": "read", "group": "primary", "wait_group": "readout_primary"},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
device=None,
action="complete",
parameter={},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
device=None,
action="unstage",
parameter={},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
device=None,
action="close_scan",
parameter={},
),
]
"num_points": None,
"positions": [8000.0, 9000.0],
"scan_name": "xas_advanced_scan_with_xrd",
"scan_type": "fly",
},
),
DeviceInstructionMessage(
metadata={},
device=["bpm4i", "eiger", "mo1_bragg", "samx"],
action="stage",
parameter={},
),
DeviceInstructionMessage(
metadata={"readout_priority": "baseline", "RID": "my_test_request_id"},
device=["samx"],
action="read",
parameter={},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
device="mo1_bragg",
action="rpc",
parameter={
"device": "mo1_bragg",
"func": "move_type.set",
"rpc_id": "my_test_rpc_id",
"args": ("energy",),
"kwargs": {},
},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
device="mo1_bragg",
action="kickoff",
parameter={"configure": {}},
),
"fake_complete",
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id", "point_id": 0},
device=["bpm4i", "eiger", "mo1_bragg"],
action="read",
parameter={"group": "primary"},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id", "point_id": 1},
device=["bpm4i", "eiger", "mo1_bragg"],
action="read",
parameter={"group": "primary"},
),
"fake_complete",
DeviceInstructionMessage(
metadata={},
device=["bpm4i", "eiger", "mo1_bragg", "samx"],
action="unstage",
parameter={},
),
DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "my_test_request_id"},
device=None,
action="close_scan",
parameter={},
),
]