Lamni scan adjustments #195

Merged
appel_c merged 3 commits from fix/lamni_scan_fix into main 2026-04-21 16:31:33 +02:00
5 changed files with 78 additions and 136 deletions
@@ -259,7 +259,7 @@ class Camera:
def get_image_data(self) -> np.ndarray | None:
"""Get the image data from the camera."""
if not self._connected:
self._rate_limited_warning_log("Camera is not connected.")
self._rate_limited_warning_log(f"Camera with id {self.camera_id} is not connected.")
return None
array = self.ueye.get_data(
self.cam.pc_image_mem,
+24 -20
View File
@@ -26,7 +26,7 @@ import numpy as np
from bec_lib import bec_logger
from bec_lib.endpoints import MessageEndpoints
from bec_server.scan_server.errors import ScanAbortion
from bec_server.scan_server.scans import RequestBase, ScanArgType, ScanBase
from bec_server.scan_server.scans import AsyncFlyScanBase, RequestBase, ScanArgType
MOVEMENT_SCALE_X = np.sin(np.radians(15)) * np.cos(np.radians(30))
MOVEMENT_SCALE_Y = np.cos(np.radians(15))
@@ -205,10 +205,9 @@ class LamNIMoveToScanCenter(RequestBase, LamNIMixin):
yield from self.lamni_new_scan_center_interferometer(center_x, center_y)
class LamNIFermatScan(ScanBase, LamNIMixin):
class LamNIFermatScan(AsyncFlyScanBase, LamNIMixin):
scan_name = "lamni_fermat_scan"
scan_report_hint = "table"
scan_type = "step"
scan_type = "fly"
required_kwargs = ["fov_size", "exp_time", "step", "angle"]
arg_input = {}
arg_bundle_size = {"bundle": len(arg_input), "min": None, "max": None}
@@ -258,7 +257,16 @@ class LamNIFermatScan(ScanBase, LamNIMixin):
self.optim_trajectory_corridor = scan_kwargs.get("optim_trajectory_corridor")
def initialize(self):
self.scan_motors = ["rtx", "rty"]
self.scan_motors = []
self.update_readout_priority()
def scan_report_instructions(self):
"""Scan report instructions for the progress bar"""
yield from self.stubs.scan_report_instruction({"device_progress": ["rt_positions"]})
@property
def monitor_sync(self) -> str:
return "rt_positions"
def _optimize_trajectory(self):
self.positions = self.optimize_corridor(
@@ -449,29 +457,25 @@ class LamNIFermatScan(ScanBase, LamNIMixin):
yield from self.stubs.set(device="lsamrot", value=angle)
def scan_core(self):
if self.scan_type == "step":
for ind, pos in self._get_position():
for self.burst_index in range(self.burst_at_each_point):
yield from self._at_each_point(ind, pos)
self.burst_index = 0
elif self.scan_type == "fly":
# fly scan mode
yield from self.stubs.kickoff(device="rt_positions")
# fly scan mode
yield from self.stubs.kickoff(device="rt_positions")
# start the readout loop of the flyer
status = yield from self.stubs.complete(device="rt_positions", wait=False)
# start the readout loop of the flyer
status = yield from self.stubs.complete(device="rt_positions", wait=False)
while not status.done:
yield from self.stubs.read(group="monitored", point_id=self.point_id)
self.point_id += 1
time.sleep(1)
logger.debug("reading monitors")
while not status.done:
yield from self.stubs.read(group="monitored", point_id=self.point_id)
self.point_id += 1
time.sleep(1)
logger.debug("reading monitors")
def run(self):
self.initialize()
yield from self.read_scan_motors()
self.prepare_positions()
yield from self._prepare_setup()
yield from self.scan_report_instructions()
yield from self.open_scan()
yield from self.stage()
yield from self.run_baseline_reading()
+3
View File
@@ -0,0 +1,3 @@
import os
os.environ.setdefault("OPHYD_CONTROL_LAYER", "dummy")
+1 -1
View File
@@ -1,7 +1,7 @@
# pylint: skip-file
import os
import threading
from time import time
import time
from typing import TYPE_CHECKING, Generator
from unittest import mock
+49 -114
View File
@@ -52,16 +52,7 @@ def device_manager_mock():
metadata={"RID": "1234"},
),
[
messages.DeviceInstructionMessage(
metadata={
"readout_priority": "monitored",
"RID": "1234",
"device_instr_id": "diid",
},
device=["rtx", "rty"],
action="read",
parameter={},
),
None,
messages.DeviceInstructionMessage(
metadata={
"readout_priority": "monitored",
@@ -261,6 +252,12 @@ def device_manager_mock():
"kwargs": {},
},
),
messages.DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "1234"},
device=None,
action="scan_report_instruction",
parameter={"device_progress": ["rt_positions"]},
),
messages.DeviceInstructionMessage(
metadata={"readout_priority": "monitored", "RID": "1234"},
device=None,
@@ -297,26 +294,6 @@ def device_manager_mock():
action="read",
parameter={},
),
messages.DeviceInstructionMessage(
metadata={
"readout_priority": "monitored",
"RID": "1234",
"device_instr_id": "diid",
},
device="rtx",
action="set",
parameter={"value": 1.3681828686580249},
),
messages.DeviceInstructionMessage(
metadata={
"readout_priority": "monitored",
"RID": "1234",
"device_instr_id": "diid",
},
device="rty",
action="set",
parameter={"value": 2.1508313829565293},
),
messages.DeviceInstructionMessage(
metadata={
"readout_priority": "monitored",
@@ -333,19 +310,9 @@ def device_manager_mock():
"RID": "1234",
"device_instr_id": "diid",
},
device="rtx",
action="set",
parameter={"value": 1.3681828686580249},
),
messages.DeviceInstructionMessage(
metadata={
"readout_priority": "monitored",
"RID": "1234",
"device_instr_id": "diid",
},
device="rty",
action="set",
parameter={"value": 2.1508313829565293},
device="rt_positions",
action="kickoff",
parameter={"configure": {}},
),
None,
messages.DeviceInstructionMessage(
@@ -359,48 +326,7 @@ def device_manager_mock():
action="read",
parameter={"group": "monitored"},
),
messages.DeviceInstructionMessage(
metadata={
"readout_priority": "monitored",
"RID": "1234",
"device_instr_id": "diid",
},
device="rtx",
action="set",
parameter={"value": -0.7700589354581364},
),
messages.DeviceInstructionMessage(
metadata={
"readout_priority": "monitored",
"RID": "1234",
"device_instr_id": "diid",
},
device="rty",
action="set",
parameter={"value": -0.8406005210092851},
),
None,
messages.DeviceInstructionMessage(
metadata={
"readout_priority": "monitored",
"RID": "1234",
"point_id": 1,
"device_instr_id": "diid",
},
device=["bpm4i"],
action="read",
parameter={"group": "monitored"},
),
messages.DeviceInstructionMessage(
metadata={
"readout_priority": "monitored",
"RID": "1234",
"device_instr_id": "diid",
},
device=["bpm4i", "lsamx", "lsamy", "samx", "samy"],
action="complete",
parameter={},
),
messages.DeviceInstructionMessage(
metadata={"device_instr_id": "diid"},
device=["bpm4i", "lsamx", "lsamy", "samx", "samy"],
@@ -417,7 +343,7 @@ def device_manager_mock():
)
],
)
def test_LamNIFermatScan(scan_msg, reference_scan_list, scan_assembler):
def test_LamNIFermatScan(scan_msg, reference_scan_list, scan_assembler, ScanStubStatusMock):
scan = scan_assembler(
LamNIFermatScan,
parameter=scan_msg.content.get("parameter"),
@@ -425,38 +351,47 @@ def test_LamNIFermatScan(scan_msg, reference_scan_list, scan_assembler):
**scan_msg.content["parameter"]["kwargs"],
)
def fake_done():
yield False
yield True
def fake_complete(*args, **kwargs):
yield None
return ScanStubStatusMock(done_func=fake_done)
with mock.patch.object(scan.stubs, "_get_result_from_status", return_value=0):
with mock.patch.object(scan, "_check_min_positions") as check_min_pos:
scan_instructions = list(scan.run())
check_min_pos.assert_called_once()
with mock.patch.object(scan.stubs, "complete", side_effect=fake_complete):
scan_instructions = list(scan.run())
check_min_pos.assert_called_once()
for ii, instr in enumerate(scan_instructions):
if instr is None:
continue
if instr.metadata.get("scan_id") is not None:
instr.metadata["scan_id"] = "scan_id"
if instr.metadata.get("RID") is not None:
instr.metadata["RID"] = scan.metadata.get("RID")
if instr.metadata.get("device_instr_id") is not None:
instr.metadata["device_instr_id"] = "diid"
if instr.content["action"] == "rpc":
instr.content["parameter"]["rpc_id"] = "rpc_id"
if instr.content["parameter"].get("value"):
assert np.isclose(
instr.content["parameter"].get("value"),
reference_scan_list[ii].content["parameter"].get("value"),
)
instr.content["parameter"]["value"] = reference_scan_list[ii].content["parameter"][
"value"
]
if instr.content["parameter"].get("positions"):
assert np.isclose(
instr.content["parameter"].get("positions"),
reference_scan_list[ii].content["parameter"].get("positions"),
).all()
instr.content["parameter"]["positions"] = reference_scan_list[ii].content[
"parameter"
]["positions"]
for ii, instr in enumerate(scan_instructions):
if instr is None:
continue
if instr.metadata.get("scan_id") is not None:
instr.metadata["scan_id"] = "scan_id"
if instr.metadata.get("RID") is not None:
instr.metadata["RID"] = scan.metadata.get("RID")
if instr.metadata.get("device_instr_id") is not None:
instr.metadata["device_instr_id"] = "diid"
if instr.content["action"] == "rpc":
instr.content["parameter"]["rpc_id"] = "rpc_id"
if instr.content["parameter"].get("value"):
assert np.isclose(
instr.content["parameter"].get("value"),
reference_scan_list[ii].content["parameter"].get("value"),
)
instr.content["parameter"]["value"] = reference_scan_list[ii].content[
"parameter"
]["value"]
if instr.content["parameter"].get("positions"):
assert np.isclose(
instr.content["parameter"].get("positions"),
reference_scan_list[ii].content["parameter"].get("positions"),
).all()
instr.content["parameter"]["positions"] = reference_scan_list[ii].content[
"parameter"
]["positions"]
assert scan_instructions == reference_scan_list