fix: fixes related to BEC v3

This commit is contained in:
2024-11-08 13:22:24 +01:00
parent 79f63b84fc
commit 4fa81ddc5c
8 changed files with 275 additions and 285 deletions
+7 -8
View File
@@ -110,9 +110,8 @@ class LamNIMixin:
logger.info(
f"Compensating {[val/1000 for val in lamni_to_stage_coordinates(x_drift,y_drift)]}"
)
yield from self.stubs.set_and_wait(
device=["lsamx", "lsamy"], positions=[move_x, move_y]
)
yield from self.stubs.set(device="lsamx", value=move_x)
yield from self.stubs.set(device="lsamy", value=move_y)
time.sleep(0.01)
rtx_current = yield from self.stubs.send_rpc_and_wait("rtx", "readback.get")
@@ -148,9 +147,9 @@ class LamNIMixin:
+ lamni_to_stage_coordinates(x_drift, y_drift)[1] / 1000
+ lamni_to_stage_coordinates(x_drift2, y_drift2)[1] / 1000
)
yield from self.stubs.set_and_wait(
device=["lsamx", "lsamy"], positions=[move_x, move_y]
)
yield from self.stubs.set(device="lsamx", value=move_x)
yield from self.stubs.set(device="lsamy", value=move_y)
time.sleep(0.01)
rtx_current = yield from self.stubs.send_rpc_and_wait("rtx", "readback.get")
rty_current = yield from self.stubs.send_rpc_and_wait("rty", "readback.get")
@@ -444,7 +443,7 @@ class LamNIFermatScan(ScanBase, LamNIMixin):
}
}
)
yield from self.stubs.set_and_wait(device=["lsamrot"], positions=[angle])
yield from self.stubs.set(device="lsamrot", value=angle)
def scan_core(self):
if self.scan_type == "step":
@@ -457,7 +456,7 @@ class LamNIFermatScan(ScanBase, LamNIMixin):
# scan ID before sending the message to the device server
yield from self.stubs.kickoff(device="rtx")
while True:
yield from self.stubs.read_and_wait(group="primary", wait_group="readout_primary")
yield from self.stubs.read(group="monitored")
msg = self.device_manager.connector.get(MessageEndpoints.device_status("rt_scan"))
if msg:
status = msg
+16 -25
View File
@@ -95,6 +95,7 @@ class FlomniFermatScan(SyncFlyScanBase):
if self.zshift < -100:
logger.warning("The zshift is smaller than -100 um. It will be limited to -100 um.")
self.zshift = -100
self.flomni_rotation_status = None
def initialize(self):
self.scan_motors = []
@@ -150,17 +151,17 @@ class FlomniFermatScan(SyncFlyScanBase):
yield from self.stubs.send_rpc_and_wait("rty", "set", self.positions[0][1])
def _prepare_setup_part2(self):
yield from self.stubs.wait(wait_type="move", device="fsamroy", wait_group="flomni_rotation")
yield from self.stubs.set(
device="rtx", value=self.positions[0][0], wait_group="prepare_setup_part2"
)
yield from self.stubs.set(
device="rtz", value=self.positions[0][2], wait_group="prepare_setup_part2"
)
if self.flomni_rotation_status:
self.flomni_rotation_status.wait()
rtx_status = yield from self.stubs.set(device="rtx", value=self.positions[0][0], wait=False)
rtz_status = yield from self.stubs.set(device="rtz", value=self.positions[0][2], wait=False)
yield from self.stubs.send_rpc_and_wait("rtx", "controller.laser_tracker_on")
yield from self.stubs.wait(
wait_type="move", device=["rtx", "rtz"], wait_group="prepare_setup_part2"
)
rtx_status.wait()
rtz_status.wait()
yield from self._transfer_positions_to_flomni()
yield from self.stubs.send_rpc_and_wait(
"rtx", "controller.move_samx_to_scan_region", self.fovx, self.cenx
@@ -200,7 +201,9 @@ class FlomniFermatScan(SyncFlyScanBase):
}
}
)
yield from self.stubs.set(device="fsamroy", value=angle, wait_group="flomni_rotation")
self.flomni_rotation_status = yield from self.stubs.set(
device="fsamroy", value=angle, wait=False
)
def _transfer_positions_to_flomni(self):
yield from self.stubs.send_rpc_and_wait(
@@ -276,7 +279,7 @@ class FlomniFermatScan(SyncFlyScanBase):
# scan ID before sending the message to the device server
yield from self.stubs.kickoff(device="rtx")
while True:
yield from self.stubs.read_and_wait(group="primary", wait_group="readout_primary")
yield from self.stubs.read(group="monitored")
status = self.device_manager.producer.get(MessageEndpoints.device_status("rt_scan"))
if status:
status_id = status.content.get("status", 1)
@@ -297,19 +300,7 @@ class FlomniFermatScan(SyncFlyScanBase):
"""return to the start position"""
# in flomni, we need to move to the start position of the next scan
if isinstance(self.positions, np.ndarray) and len(self.positions[-1]) == 3:
yield from self.stubs.set(
device="rtx", value=self.positions[-1][0], wait_group="scan_motor"
)
yield from self.stubs.set(
device="rty", value=self.positions[-1][1], wait_group="scan_motor"
)
yield from self.stubs.set(
device="rtz", value=self.positions[-1][2], wait_group="scan_motor"
)
yield from self.stubs.wait(
wait_type="move", device=["rtx", "rty", "rtz"], wait_group="scan_motor"
)
yield from self.stubs.set(device=["rtx", "rty", "rtz"], value=self.positions[-1])
return
logger.warning("No positions found to return to start")
+20 -27
View File
@@ -95,6 +95,7 @@ class OMNYFermatScan(SyncFlyScanBase):
if self.zshift < -100:
logger.warning("The zshift is smaller than -100 um. It will be limited to -100 um.")
self.zshift = -100
self.omny_rotation_status = None
def initialize(self):
self.scan_motors = []
@@ -150,20 +151,22 @@ class OMNYFermatScan(SyncFlyScanBase):
yield from self.stubs.send_rpc_and_wait("rty", "set", self.positions[0][1])
def _prepare_setup_part2(self):
yield from self.stubs.wait(wait_type="move", device="osamroy", wait_group="omny_rotation")
yield from self.stubs.set(
device="rtx", value=self.positions[0][0], wait_group="prepare_setup_part2"
)
yield from self.stubs.set(
device="rtz", value=self.positions[0][2], wait_group="prepare_setup_part2"
)
yield from self.stubs.send_rpc_and_wait("rtx", "controller.laser_tracker_check_and_wait_for_signalstrength")
yield from self.stubs.wait(
wait_type="move", device=["rtx", "rtz"], wait_group="prepare_setup_part2"
if self.omny_rotation_status:
self.omny_rotation_status.wait()
rtx_status = yield from self.stubs.set(device="rtx", value=self.positions[0][0], wait=False)
rtz_status = yield from self.stubs.set(device="rtz", value=self.positions[0][2], wait=False)
yield from self.stubs.send_rpc_and_wait(
"rtx", "controller.laser_tracker_check_and_wait_for_signalstrength"
)
rtx_status.wait()
rtz_status.wait()
yield from self._transfer_positions_to_omny()
yield from self.stubs.send_rpc_and_wait("osamx","omny_osamx_to_scan_center",self.cenx)
yield from self.stubs.send_rpc_and_wait("osamx", "omny_osamx_to_scan_center", self.cenx)
def omny_rotation(self, angle):
# get last setpoint (cannot be based on pos get because they will deviate slightly)
osamroy_current_setpoint = yield from self.stubs.send_rpc_and_wait(
@@ -183,7 +186,9 @@ class OMNYFermatScan(SyncFlyScanBase):
}
}
)
yield from self.stubs.set(device="osamroy", value=angle, wait_group="omny_rotation")
self.omny_rotation_status = yield from self.stubs.set(
device="osamroy", value=angle, wait=False
)
def _transfer_positions_to_omny(self):
yield from self.stubs.send_rpc_and_wait(
@@ -259,7 +264,7 @@ class OMNYFermatScan(SyncFlyScanBase):
# scan ID before sending the message to the device server
yield from self.stubs.kickoff(device="rtx")
while True:
yield from self.stubs.read_and_wait(group="primary", wait_group="readout_primary")
yield from self.stubs.read(group="monitored")
status = self.device_manager.producer.get(MessageEndpoints.device_status("rt_scan"))
if status:
status_id = status.content.get("status", 1)
@@ -280,19 +285,7 @@ class OMNYFermatScan(SyncFlyScanBase):
"""return to the start position"""
# in omny, we need to move to the start position of the next scan
if isinstance(self.positions, np.ndarray) and len(self.positions[-1]) == 3:
yield from self.stubs.set(
device="rtx", value=self.positions[-1][0], wait_group="scan_motor"
)
yield from self.stubs.set(
device="rty", value=self.positions[-1][1], wait_group="scan_motor"
)
yield from self.stubs.set(
device="rtz", value=self.positions[-1][2], wait_group="scan_motor"
)
yield from self.stubs.wait(
wait_type="move", device=["rtx", "rty", "rtz"], wait_group="scan_motor"
)
yield from self.stubs.set(device=["rtx", "rty", "rtz"], value=self.positions[-1])
return
logger.warning("No positions found to return to start")
+10 -16
View File
@@ -262,10 +262,8 @@ class OwisGrid(AsyncFlyScanBase):
yield from self.stubs.send_rpc_and_wait("samy", "acceleration.put", self.acc_time)
# Start motion and send triggers
yield from self.stubs.set(
device="samy",
value=(self.end_y + (self.sign * self.premove_distance)),
wait_group="flyer",
flyer_status = yield from self.stubs.set(
device="samy", value=(self.end_y + (self.sign * self.premove_distance)), wait=False
)
# Trigger fast shutter, open them right away
yield from self.stubs.send_rpc_and_wait("ddg_fsh", "trigger")
@@ -275,32 +273,28 @@ class OwisGrid(AsyncFlyScanBase):
# Trigger detectors
yield from self.stubs.send_rpc_and_wait("ddg_detectors", "trigger")
# Readout primary devices, this waits and could lead to additional overheads
# if devices are slow to response. For optimizing performance, primary devices
# Readout monitored devices, this waits and could lead to additional overheads
# if devices are slow to response. For optimizing performance, monitored devices
# could be read out only once at beginning and end
yield from self.stubs.read_and_wait(
group="primary", wait_group="readout_primary", point_id=self.point_id
)
yield from self.stubs.read(group="monitored", point_id=self.point_id)
self.point_id += 1
# Wait for motion to finish
yield from self.stubs.wait(device="samy", wait_group="flyer", wait_type="move")
flyer_status.wait()
# Move second axis by a step
yield from self.stubs.set(
device="samx", value=(self.start_x - ii * self.stepping_x), wait_group="motion"
status_x = yield from self.stubs.set(
device="samx", value=(self.start_x - ii * self.stepping_x), wait=False
)
# Set acceleration and velocity to max
yield from self.stubs.send_rpc_and_wait("samy", "velocity.put", self.high_velocity)
yield from self.stubs.send_rpc_and_wait("samy", "acceleration.put", self.high_acc_time)
# Move back to start
status_prepos = yield from self.stubs.send_rpc_and_wait(
"samy", "move", (self.start_y - self.premove_distance)
)
yield from self.stubs.set(device="samy", value=(self.start_y - self.premove_distance))
# Wait for motion to finish
status_prepos.wait()
status_x.wait()
# Set speed and acceleration to initial values
def finalize(self):
+13 -20
View File
@@ -126,7 +126,7 @@ class SgalilGrid(AsyncFlyScanBase):
"""
# set up the delay generators
status_ddg_detectors_burst = yield from self.stubs.send_rpc_and_wait(
yield from self.stubs.send_rpc_and_wait(
"ddg_detectors",
"burst_enable",
count=self.interval_y,
@@ -134,7 +134,7 @@ class SgalilGrid(AsyncFlyScanBase):
period=(self.exp_time + self.readout_time),
config="first",
)
status_ddg_mcs_burst = yield from self.stubs.send_rpc_and_wait(
yield from self.stubs.send_rpc_and_wait(
"ddg_mcs",
"burst_enable",
count=self.interval_y,
@@ -143,9 +143,9 @@ class SgalilGrid(AsyncFlyScanBase):
config="first",
)
# Disable burst mod on DDF for fsh and EN of MCS card
status_ddg_fsh_burst = yield from self.stubs.send_rpc_and_wait("ddg_fsh", "burst_disable")
yield from self.stubs.send_rpc_and_wait("ddg_fsh", "burst_disable")
# Set width of FSH opening to 0
status_ddg_fsh_ttlwidth = yield from self.stubs.send_rpc_and_wait(
yield from self.stubs.send_rpc_and_wait(
"ddg_fsh", "set_channels", "width", 0, channels=["channelCD"]
)
@@ -166,19 +166,17 @@ class SgalilGrid(AsyncFlyScanBase):
# status_ddg_mcs_ttlwidth = yield from self.stubs.send_rpc_and_wait(
# "ddg_mcs", "set_channels", "width", 3e-3
# )
status_ddg_mcs_ttldelay = yield from self.stubs.send_rpc_and_wait(
"ddg_mcs", "set_channels", "delay", 0
)
yield from self.stubs.send_rpc_and_wait("ddg_mcs", "set_channels", "delay", 0)
# wait for the delay generators to finish setting up
status_ddg_detectors_source.wait()
status_ddg_mcs_source.wait()
trigger_ddg_fsh = yield from self.stubs.send_rpc_and_wait("ddg_fsh", "trigger")
yield from self.stubs.send_rpc_and_wait("ddg_fsh", "trigger")
# trigger_ddg_fsh.wait()
# status_mcs_points_per_line.wait()
# status_mcs_lines.wait()
yield from self.stubs.kickoff(
kickoff_status = yield from self.stubs.kickoff(
device="samx",
parameter={
"start_y": self.start_y,
@@ -190,19 +188,14 @@ class SgalilGrid(AsyncFlyScanBase):
"exp_time": self.exp_time,
"readout_time": self.readout_time,
},
wait=False,
)
target_diid = self.DIID - 1
while True:
# readout the primary device and wait for the fly scan to finish
yield from self.stubs.read_and_wait(
group="primary", wait_group="readout_primary", point_id=self.point_id
)
while not kickoff_status.done:
# readout the monitored device and wait for the fly scan to finish
yield from self.stubs.read(group="monitored", point_id=self.point_id)
self.point_id += 1
status = self.stubs.get_req_status(
device="samx", RID=self.metadata["RID"], DIID=target_diid
)
if status:
break
time.sleep(self.sleep_time)
if self.scan_progress() > int(self.timeout_scan_abortion / self.sleep_time):
logger.info("would have raised a scan abortion here")
+7 -11
View File
@@ -2,15 +2,15 @@ from unittest import mock
import pytest
from bec_server.device_server.tests.utils import DMMock
from bec_server.scan_server.tests.fixtures import *
from csaxs_bec.scans.flomni_fermat_scan import FlomniFermatScan
@pytest.fixture
def scan_request():
device_manager = DMMock()
device_manager.producer = mock.MagicMock()
flomni_request = FlomniFermatScan(
def scan_request(scan_assembler):
flomni_request = scan_assembler(
FlomniFermatScan,
fovx=5,
fovy=5,
cenx=0.0,
@@ -19,7 +19,6 @@ def scan_request():
step=1,
zshift=0.0,
angle=0.0,
device_manager=device_manager,
metadata={"RID": "1234"},
)
yield flomni_request
@@ -31,7 +30,7 @@ def test_flomni_fermat_scan(scan_request):
def test_flomni_rotation_no_rotation_required(scan_request):
with mock.patch.object(scan_request.stubs, "_get_from_rpc") as get_from_rpc_mock:
with mock.patch.object(scan_request.stubs, "_get_result_from_status") as get_from_rpc_mock:
get_from_rpc_mock.return_value = 90
with mock.patch.object(scan_request.stubs, "scan_report_instruction") as scan_report_mock:
with mock.patch.object(scan_request.stubs, "set") as set_mock:
@@ -41,8 +40,7 @@ def test_flomni_rotation_no_rotation_required(scan_request):
def test_flomni_rotation_rotation_required(scan_request):
with mock.patch.object(scan_request.stubs, "_get_from_rpc") as get_from_rpc_mock:
get_from_rpc_mock.return_value = 0
with mock.patch.object(scan_request.stubs, "_get_result_from_status", return_value=0):
with mock.patch.object(scan_request.stubs, "scan_report_instruction") as scan_report_mock:
with mock.patch.object(scan_request.stubs, "set") as set_mock:
list(scan_request.flomni_rotation(90))
@@ -56,6 +54,4 @@ def test_flomni_rotation_rotation_required(scan_request):
}
}
)
set_mock.assert_called_once_with(
device="fsamroy", value=90, wait_group="flomni_rotation"
)
set_mock.assert_called_once_with(device="fsamroy", value=90, wait=False)
+187 -172
View File
@@ -5,10 +5,33 @@ import pytest
from bec_lib import messages
from bec_server.device_server.tests.utils import DMMock
from bec_server.scan_server.errors import ScanAbortion
from bec_server.scan_server.tests.fixtures import (
DeviceMockType,
DMMock,
ScanStubStatusMock,
connector_mock,
instruction_handler_mock,
scan_assembler,
)
from csaxs_bec.scans.LamNIFermatScan import LamNIFermatScan
@pytest.fixture
def device_manager_mock():
device_manager = DMMock()
device_manager.add_device("lsamx")
device_manager.devices["lsamx"]._config["userParameter"] = {"center": 8.1}
device_manager.add_device("lsamy")
device_manager.devices["lsamy"]._config["userParameter"] = {"center": 10}
device_manager.add_device("samx")
device_manager.devices["samx"].read_buffer = {"value": 0}
device_manager.add_device("samy")
device_manager.devices["samy"].read_buffer = {"value": 0}
device_manager.add_device("bpm4i", dev_type=DeviceMockType.SIGNAL, readout_priority="monitored")
yield device_manager
@pytest.mark.parametrize(
"scan_msg,reference_scan_list",
[
@@ -30,42 +53,49 @@ from csaxs_bec.scans.LamNIFermatScan import LamNIFermatScan
),
[
messages.DeviceInstructionMessage(
metadata={
"readout_priority": "monitored",
"RID": "1234",
"device_instr_id": "diid",
},
device=["rtx", "rty"],
action="read",
parameter={"wait_group": "scan_motor"},
metadata={"readout_priority": "monitored", "DIID": 0},
),
messages.DeviceInstructionMessage(
device=["rtx", "rty"],
action="wait",
parameter={"type": "read", "wait_group": "scan_motor"},
metadata={"readout_priority": "monitored", "DIID": 1},
parameter={},
),
messages.DeviceInstructionMessage(
metadata={
"readout_priority": "monitored",
"RID": "1234",
"device_instr_id": "diid",
},
device="rtx",
action="rpc",
parameter={
"device": "rtx",
"func": "controller.clear_trajectory_generator",
"rpc_id": "e4897d7b-f8d9-4792-ac27-375d72d02aef",
"rpc_id": "rpc_id",
"args": (),
"kwargs": {},
},
metadata={"readout_priority": "monitored", "DIID": 2, "response": True},
),
messages.DeviceInstructionMessage(
metadata={
"readout_priority": "monitored",
"RID": "1234",
"device_instr_id": "diid",
},
device="lsamrot",
action="rpc",
parameter={
"device": "lsamrot",
"func": "user_setpoint.get",
"rpc_id": "7feb8d9e-b536-4958-9965-708a27c5e5f9",
"rpc_id": "rpc_id",
"args": (),
"kwargs": {},
},
metadata={"readout_priority": "monitored", "DIID": 2, "response": True},
),
messages.DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "1234"},
device=None,
action="scan_report_instruction",
parameter={
@@ -76,117 +106,147 @@ from csaxs_bec.scans.LamNIFermatScan import LamNIFermatScan
"end": [10],
}
},
metadata={"readout_priority": "monitored", "DIID": 0},
),
messages.DeviceInstructionMessage(
metadata={
"readout_priority": "monitored",
"RID": "1234",
"device_instr_id": "diid",
},
device="lsamrot",
action="set",
parameter={"value": 10, "wait_group": "scan_motor"},
metadata={"readout_priority": "monitored", "DIID": 3},
),
messages.DeviceInstructionMessage(
device=["lsamrot"],
action="wait",
parameter={"type": "move", "wait_group": "scan_motor"},
metadata={"readout_priority": "monitored", "DIID": 4},
parameter={"value": 10},
),
messages.DeviceInstructionMessage(
metadata={
"readout_priority": "monitored",
"RID": "1234",
"device_instr_id": "diid",
},
device="rtx",
action="rpc",
parameter={
"device": "rtx",
"func": "controller.feedback_disable",
"rpc_id": "a5f5167b-61f2-4c24-8a08-698c0b52a971",
"rpc_id": "rpc_id",
"args": (),
"kwargs": {},
},
metadata={"readout_priority": "monitored", "DIID": 5, "response": True},
),
messages.DeviceInstructionMessage(
metadata={
"readout_priority": "monitored",
"RID": "1234",
"device_instr_id": "diid",
},
device="rtx",
action="rpc",
parameter={
"device": "rtx",
"func": "readback.get",
"rpc_id": "409d1afc-39a5-442b-87e5-18145e59f367",
"rpc_id": "rpc_id",
"args": (),
"kwargs": {},
},
metadata={"readout_priority": "monitored", "DIID": 6, "response": True},
),
messages.DeviceInstructionMessage(
metadata={
"readout_priority": "monitored",
"RID": "1234",
"device_instr_id": "diid",
},
device="rty",
action="rpc",
parameter={
"device": "rty",
"func": "readback.get",
"rpc_id": "80e560c8-c11a-4b6c-87e3-11addea3e80d",
"rpc_id": "rpc_id",
"args": (),
"kwargs": {},
},
metadata={"readout_priority": "monitored", "DIID": 7, "response": True},
),
messages.DeviceInstructionMessage(
metadata={
"readout_priority": "monitored",
"RID": "1234",
"device_instr_id": "diid",
},
device="lsamx",
action="rpc",
parameter={
"device": "lsamx",
"func": "readback.get",
"rpc_id": "5cef7087-3537-40fc-b558-8a2256019783",
"rpc_id": "rpc_id",
"args": (),
"kwargs": {},
},
metadata={"readout_priority": "monitored", "DIID": 8, "response": True},
),
messages.DeviceInstructionMessage(
metadata={
"readout_priority": "monitored",
"RID": "1234",
"device_instr_id": "diid",
},
device="lsamy",
action="rpc",
parameter={
"device": "lsamy",
"func": "readback.get",
"rpc_id": "61a7376c-36cf-41af-94b1-76c1ba821d47",
"rpc_id": "rpc_id",
"args": (),
"kwargs": {},
},
metadata={"readout_priority": "monitored", "DIID": 9, "response": True},
),
messages.DeviceInstructionMessage(
metadata={
"readout_priority": "monitored",
"RID": "1234",
"device_instr_id": "diid",
},
device="rtx",
action="rpc",
parameter={
"device": "rtx",
"func": "readback.get",
"rpc_id": "a1d3c021-12fb-483e-a5b9-95a59d3c1304",
"rpc_id": "rpc_id",
"args": (),
"kwargs": {},
},
metadata={"readout_priority": "monitored", "DIID": 10, "response": True},
),
messages.DeviceInstructionMessage(
metadata={
"readout_priority": "monitored",
"RID": "1234",
"device_instr_id": "diid",
},
device="rty",
action="rpc",
parameter={
"device": "rty",
"func": "readback.get",
"rpc_id": "bde7e130-b7b7-41d0-a56a-c83d740450df",
"rpc_id": "rpc_id",
"args": (),
"kwargs": {},
},
metadata={"readout_priority": "monitored", "DIID": 11, "response": True},
),
messages.DeviceInstructionMessage(
metadata={
"readout_priority": "monitored",
"RID": "1234",
"device_instr_id": "diid",
},
device="rtx",
action="rpc",
parameter={
"device": "rtx",
"func": "controller.feedback_enable_without_reset",
"rpc_id": "aa2117b4-ef44-4c0d-8537-6b6ccea86d1e",
"rpc_id": "rpc_id",
"args": (),
"kwargs": {},
},
metadata={"readout_priority": "monitored", "DIID": 12, "response": True},
),
messages.DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "1234"},
device=None,
action="open_scan",
parameter={
@@ -199,201 +259,163 @@ from csaxs_bec.scans.LamNIFermatScan import LamNIFermatScan
},
"num_points": 2,
"positions": [
[1.3681828686580249, 2.1508313829565298],
[1.3681828686580249, 2.1508313829565293],
[-0.7700589354581364, -0.8406005210092851],
],
"scan_name": "lamni_fermat_scan",
"scan_type": "step",
},
metadata={"readout_priority": "monitored", "DIID": 13},
),
messages.DeviceInstructionMessage(
device=None,
metadata={"device_instr_id": "diid"},
device=["bpm4i", "lsamx", "lsamy", "samx", "samy"],
action="stage",
parameter={},
metadata={"readout_priority": "monitored", "DIID": 14},
),
messages.DeviceInstructionMessage(
device=None,
action="baseline_reading",
metadata={
"readout_priority": "baseline",
"RID": "1234",
"device_instr_id": "diid",
},
device=["lsamx", "lsamy", "samx", "samy"],
action="read",
parameter={},
metadata={"readout_priority": "baseline", "DIID": 15},
),
messages.DeviceInstructionMessage(
metadata={
"readout_priority": "monitored",
"RID": "1234",
"device_instr_id": "diid",
},
device="rtx",
action="set",
parameter={"value": 1.3681828686580249, "wait_group": "scan_motor"},
metadata={"readout_priority": "monitored", "DIID": 17},
parameter={"value": 1.3681828686580249},
),
messages.DeviceInstructionMessage(
metadata={
"readout_priority": "monitored",
"RID": "1234",
"device_instr_id": "diid",
},
device="rty",
action="set",
parameter={"value": 2.1508313829565298, "wait_group": "scan_motor"},
metadata={"readout_priority": "monitored", "DIID": 18},
parameter={"value": 2.1508313829565293},
),
None,
messages.DeviceInstructionMessage(
device=None,
action="wait",
parameter={"type": "move", "group": "scan_motor", "wait_group": "scan_motor"},
metadata={"readout_priority": "monitored", "DIID": 19},
),
messages.DeviceInstructionMessage(
device=None,
action="trigger",
parameter={"group": "trigger"},
metadata={"readout_priority": "monitored", "DIID": 20, "point_id": 0},
),
messages.DeviceInstructionMessage(
device=None,
action="wait",
parameter={"type": "trigger", "group": "trigger", "time": 0.1},
metadata={"readout_priority": "monitored", "DIID": 21},
),
messages.DeviceInstructionMessage(
device=None,
action="read",
parameter={"group": "primary", "wait_group": "readout_primary"},
metadata={"readout_priority": "monitored", "DIID": 22, "point_id": 0},
),
messages.DeviceInstructionMessage(
device=None,
action="wait",
parameter={
"type": "read",
"group": "scan_motor",
"wait_group": "readout_primary",
metadata={
"readout_priority": "monitored",
"RID": "1234",
"point_id": 0,
"device_instr_id": "diid",
},
metadata={"readout_priority": "monitored", "DIID": 23},
device=["bpm4i"],
action="read",
parameter={"group": "monitored"},
),
messages.DeviceInstructionMessage(
metadata={
"readout_priority": "monitored",
"RID": "1234",
"device_instr_id": "diid",
},
device="rtx",
action="set",
parameter={"value": -0.7700589354581364, "wait_group": "scan_motor"},
metadata={"readout_priority": "monitored", "DIID": 24},
parameter={"value": -0.7700589354581364},
),
messages.DeviceInstructionMessage(
metadata={
"readout_priority": "monitored",
"RID": "1234",
"device_instr_id": "diid",
},
device="rty",
action="set",
parameter={"value": -0.8406005210092851, "wait_group": "scan_motor"},
metadata={"readout_priority": "monitored", "DIID": 25},
parameter={"value": -0.8406005210092851},
),
None,
messages.DeviceInstructionMessage(
device=None,
action="wait",
parameter={"type": "move", "group": "scan_motor", "wait_group": "scan_motor"},
metadata={"readout_priority": "monitored", "DIID": 26},
),
messages.DeviceInstructionMessage(
device=None,
action="wait",
parameter={"type": "read", "group": "primary", "wait_group": "readout_primary"},
metadata={"readout_priority": "monitored", "DIID": 27},
),
messages.DeviceInstructionMessage(
device=None,
action="trigger",
parameter={"group": "trigger"},
metadata={"readout_priority": "monitored", "DIID": 28, "point_id": 1},
),
messages.DeviceInstructionMessage(
device=None,
action="wait",
parameter={"type": "trigger", "group": "trigger", "time": 0.1},
metadata={"readout_priority": "monitored", "DIID": 29},
),
messages.DeviceInstructionMessage(
device=None,
action="read",
parameter={"group": "primary", "wait_group": "readout_primary"},
metadata={"readout_priority": "monitored", "DIID": 30, "point_id": 1},
),
messages.DeviceInstructionMessage(
device=None,
action="wait",
parameter={
"type": "read",
"group": "scan_motor",
"wait_group": "readout_primary",
metadata={
"readout_priority": "monitored",
"RID": "1234",
"point_id": 1,
"device_instr_id": "diid",
},
metadata={"readout_priority": "monitored", "DIID": 31},
device=["bpm4i"],
action="read",
parameter={"group": "monitored"},
),
messages.DeviceInstructionMessage(
device=None,
action="wait",
parameter={"type": "read", "group": "primary", "wait_group": "readout_primary"},
metadata={"readout_priority": "monitored", "DIID": 16},
metadata={
"readout_priority": "monitored",
"RID": "1234",
"device_instr_id": "diid",
},
device=["bpm4i", "lsamx", "lsamy", "samx", "samy"],
action="complete",
parameter={},
),
messages.DeviceInstructionMessage(
**{"device": None, "action": "complete", "parameter": {}},
metadata={"readout_priority": "monitored", "DIID": 31},
),
messages.DeviceInstructionMessage(
device=None,
metadata={"device_instr_id": "diid"},
device=["bpm4i", "lsamx", "lsamy", "samx", "samy"],
action="unstage",
parameter={},
metadata={"readout_priority": "monitored", "DIID": 17},
),
messages.DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "1234"},
device=None,
action="close_scan",
parameter={},
metadata={"readout_priority": "monitored", "DIID": 18},
),
],
)
],
)
def test_LamNIFermatScan(scan_msg, reference_scan_list):
device_manager = DMMock()
device_manager.add_device("lsamx")
device_manager.devices["lsamx"]._config["userParameter"] = {"center": 8.1}
device_manager.add_device("lsamy")
device_manager.devices["lsamy"]._config["userParameter"] = {"center": 10}
device_manager.add_device("samx")
device_manager.devices["samx"].read_buffer = {"value": 0}
device_manager.add_device("samy")
device_manager.devices["samy"].read_buffer = {"value": 0}
scan = LamNIFermatScan(
def test_LamNIFermatScan(scan_msg, reference_scan_list, scan_assembler):
scan = scan_assembler(
LamNIFermatScan,
parameter=scan_msg.content.get("parameter"),
device_manager=device_manager,
metadata=scan_msg.metadata,
**scan_msg.content["parameter"]["kwargs"],
)
scan.stubs._get_from_rpc = lambda x: 0
with mock.patch.object(scan, "_check_min_positions") as check_min_pos:
scan_instructions = list(scan.run())
check_min_pos.assert_called_once()
scan_uid = scan_instructions[0].metadata.get("scan_id")
for ii, instr in enumerate(reference_scan_list):
with mock.patch.object(scan.stubs, "_get_result_from_status", return_value=0):
with mock.patch.object(scan, "_check_min_positions") as check_min_pos:
scan_instructions = list(scan.run())
check_min_pos.assert_called_once()
for ii, instr in enumerate(scan_instructions):
if instr is None:
continue
if instr.metadata.get("scan_id") is not None:
instr.metadata["scan_id"] = scan_uid
instr.metadata["DIID"] = ii
instr.metadata["RID"] = scan.metadata.get("RID")
instr.metadata["scan_id"] = "scan_id"
if instr.metadata.get("RID") is not None:
instr.metadata["RID"] = scan.metadata.get("RID")
if instr.metadata.get("device_instr_id") is not None:
instr.metadata["device_instr_id"] = "diid"
if instr.content["action"] == "rpc":
reference_scan_list[ii].content["parameter"]["rpc_id"] = scan_instructions[
ii
].content["parameter"]["rpc_id"]
reference_scan_list[ii].metadata["RID"] = scan_instructions[ii].metadata.get("RID")
instr.content["parameter"]["rpc_id"] = "rpc_id"
if instr.content["parameter"].get("value"):
assert np.isclose(
instr.content["parameter"].get("value"),
scan_instructions[ii].content["parameter"].get("value"),
reference_scan_list[ii].content["parameter"].get("value"),
)
instr.content["parameter"]["value"] = scan_instructions[ii].content["parameter"][
instr.content["parameter"]["value"] = reference_scan_list[ii].content["parameter"][
"value"
]
if instr.content["parameter"].get("positions"):
assert np.isclose(
instr.content["parameter"].get("positions"),
scan_instructions[ii].content["parameter"].get("positions"),
reference_scan_list[ii].content["parameter"].get("positions"),
).all()
instr.content["parameter"]["positions"] = scan_instructions[ii].content[
instr.content["parameter"]["positions"] = reference_scan_list[ii].content[
"parameter"
]["positions"]
assert scan_instructions == reference_scan_list
def test_LamNIFermatScan_min_positions():
def test_LamNIFermatScan_min_positions(scan_assembler):
scan_msg = messages.ScanQueueMessage(
scan_type="lamni_fermat_scan",
parameter={
@@ -409,19 +431,12 @@ def test_LamNIFermatScan_min_positions():
queue="primary",
metadata={"RID": "1234"},
)
device_manager = DMMock()
device_manager.add_device("lsamx")
device_manager.devices["lsamx"]._config["userParameter"] = {"center": 8.1}
device_manager.add_device("lsamy")
device_manager.devices["lsamy"]._config["userParameter"] = {"center": 10}
device_manager.add_device("samx")
device_manager.devices["samx"].read_buffer = {"value": 0}
device_manager.add_device("samy")
device_manager.devices["samy"].read_buffer = {"value": 0}
scan = LamNIFermatScan(
scan = scan_assembler(
LamNIFermatScan,
parameter=scan_msg.content.get("parameter"),
device_manager=device_manager,
metadata=scan_msg.metadata,
**scan_msg.content["parameter"]["kwargs"],
)
with pytest.raises(ScanAbortion):
instructions = list(scan.run())
+15 -6
View File
@@ -3,6 +3,7 @@ from unittest import mock
import numpy as np
import pytest
from bec_lib import messages
from bec_server.scan_server.tests.fixtures import *
from csaxs_bec.scans.owis_grid import OwisGrid
@@ -28,21 +29,29 @@ from csaxs_bec.scans.owis_grid import OwisGrid
)
],
)
def test_owis_grid(scan_msg):
dm = mock.MagicMock()
request = OwisGrid(*scan_msg.content["parameter"]["args"].values(), device_manager=dm)
def test_owis_grid(scan_msg, scan_assembler, ScanStubStatusMock):
request = scan_assembler(OwisGrid, *scan_msg.content["parameter"]["args"].values())
request.high_velocity = 10
request.high_acc_time = 0.2
request.base_velocity = 0.0625
# pylint: disable=protected-access
request.stubs._get_from_rpc = lambda x: mock.MagicMock()
def fake_done():
yield False
yield True
def fake_set(*args, **kwargs):
yield "fake_set"
return ScanStubStatusMock(done_func=fake_done)
with (
mock.patch.object(request.stubs, "get_req_status", return_value=1),
mock.patch.object(request.stubs, "set", side_effect=fake_set),
mock.patch.object(request.stubs, "_get_result_from_status"),
mock.patch.object(
request, "get_initial_motor_properties"
) as mock_get_init_motor_properties,
):
scan_instructions = list(request.run())
mock_get_init_motor_properties.assert_called_once()
assert request.point_id == scan_msg.content["parameter"]["args"]["interval_x"]
assert np.isclose(