refactor: migrate devices to new scan_info from scan_server
CI for csaxs_bec / test (push) Successful in 1m30s
CI for csaxs_bec / test (pull_request) Successful in 1m30s

This commit is contained in:
2026-06-15 15:44:17 +02:00
parent 3397befafd
commit 64a285aed3
7 changed files with 51 additions and 27 deletions
+1 -1
View File
@@ -284,7 +284,7 @@ class Eiger(PSIDeviceBase):
)
# Setting up the full path for file writing
self._full_path = get_full_path(self.scan_parameters, name=f"{self.name}_master")
self._full_path = get_full_path(self.scan_info.msg, name=f"{self.name}_master")
self._full_path = os.path.abspath(os.path.expanduser(self._full_path))
# Inform BEC about upcoming file event
@@ -24,6 +24,7 @@ class PandaBoxOMNY(PandaBox):
def on_stage(self):
start_time = time.time()
super().on_stage()
self.scan_parameters = fetch_scan_info(self.scan_info)
# TODO, adjust as seen fit.
# Adjust the acquisition group based on scan parameters if needed
if self.scan_parameters.scan_type == "hardware_triggered":
+1
View File
@@ -13,6 +13,7 @@ def fetch_scan_info(scan_info: ScanInfo) -> ScanServerScanInfo:
info = scan_info.msg.info
if isinstance(info["positions"], list):
info["positions"] = np.array(info["positions"])
info["num_monitored_readouts"] = scan_info.msg.num_monitored_readouts
try:
msg = ScanServerScanInfo.model_validate(info)
except ValidationError: # This means we have an old scan_info object.
@@ -43,6 +43,7 @@ from csaxs_bec.devices.epics.delay_generator_csaxs.delay_generator_csaxs import
DelayGeneratorCSAXS,
)
from csaxs_bec.devices.epics.mcs_card.mcs_card_csaxs import MCSCardCSAXS
from csaxs_bec.devices.utils.utils import fetch_scan_info
############################
### Test Delay Generator ###
@@ -280,8 +281,8 @@ def test_ddg1_stage(mock_ddg1: DDG1):
mock_ddg1.burst_delay.put(5) # Non-default, should be reset on stage
mock_ddg1.burst_count.put(10) # Non-default, should be reset on stage
mock_ddg1.scan_info.msg.scan_parameters["exp_time"] = exp_time
mock_ddg1.scan_info.msg.scan_parameters["frames_per_trigger"] = frames_per_trigger
mock_ddg1.scan_info.msg.info["exp_time"] = exp_time
mock_ddg1.scan_info.msg.info["frames_per_trigger"] = frames_per_trigger
mock_ddg1.fast_shutter_control._read_pv.mock_data = 0 # Simulate shutter control
mock_ddg1.stage()
@@ -315,7 +316,7 @@ def test_ddg1_stage(mock_ddg1: DDG1):
shutter_width, exp_time * frames_per_trigger
) # Shutter to open delay is not added as shutter is kept open
# Simulate fly scan, so no extra trigger for MCS card.
mock_ddg1.scan_info.msg.scan_type = "fly"
mock_ddg1.scan_info.msg.info["scan_type"] = "hardware_triggered"
mock_ddg1.stage()
# Shutter channel cd
assert np.isclose(mock_ddg1.cd.delay.get(), 0)
@@ -339,6 +340,11 @@ def test_ddg1_on_trigger(mock_ddg1: DDG1):
# Make sure DDG is setup in default state through on_connected
ddg.on_connected()
ddg.scan_info.msg.info["scan_type"] = (
"software_triggered" # Simulate fly scan, so no extra trigger for MCS card.
)
ddg.scan_parameters = fetch_scan_info(ddg.scan_info)
# Check that poll thread is running and run event is not set
assert ddg._poll_thread.is_alive()
assert not ddg._poll_thread_run_event.is_set()
@@ -514,8 +520,8 @@ def test_ddg2_on_stage(mock_ddg2: DDG2):
exp_time = 0.1
frames_per_trigger = 10
ddg.on_connected()
ddg.scan_info.msg.scan_parameters["exp_time"] = exp_time
ddg.scan_info.msg.scan_parameters["frames_per_trigger"] = frames_per_trigger
ddg.scan_info.msg.info["exp_time"] = exp_time
ddg.scan_info.msg.info["frames_per_trigger"] = frames_per_trigger
# Set non-default burst mode settings
ddg.burst_mode.put(0)
@@ -538,7 +544,7 @@ def test_ddg2_on_stage(mock_ddg2: DDG2):
ddg.unstage() # Reset staged state for next test
exp_time_short = 2e-4 # too short exposure time
with pytest.raises(ValueError):
ddg.scan_info.msg.scan_parameters["exp_time"] = exp_time_short
ddg.scan_info.msg.info["exp_time"] = exp_time_short
ddg.stage()
+11 -1
View File
@@ -8,6 +8,7 @@ from unittest import mock
import numpy as np
import pytest
from bec_lib.messages import FileMessage, ScanStatusMessage
from bec_server.scan_server.scans.scan_base import ScanInfo as ScanServerScanInfo
from jfjoch_client.models.broker_status import BrokerStatus
from jfjoch_client.models.dataset_settings import DatasetSettings
from jfjoch_client.models.detector_list import DetectorList
@@ -49,10 +50,19 @@ def mock_scan_info(request, tmpdir):
"frames_per_trigger": frames_per_trigger,
"system_config": {},
},
info={"file_components": (f"{tmpdir}/data/S00000/S000001", "h5")},
info=ScanServerScanInfo(
scan_name=scan_name,
scan_id="test_id",
positions=np.array([[0, 0], [1, 1], [2, 2]]),
exp_time=exp_time,
frames_per_trigger=frames_per_trigger,
system_config={},
num_points=num_points,
).model_dump(),
num_points=num_points,
scan_name=scan_name,
)
scan_info.info.update({"file_components": (f"{tmpdir}/data/S00000/S000001", "h5")})
yield scan_info
+6 -3
View File
@@ -25,6 +25,7 @@ from csaxs_bec.devices.epics.mcs_card.mcs_card import (
MCSCard,
)
from csaxs_bec.devices.epics.mcs_card.mcs_card_csaxs import MCSCardCSAXS
from csaxs_bec.devices.utils.utils import fetch_scan_info
@pytest.fixture(scope="function")
@@ -121,8 +122,8 @@ def test_mcs_card_csaxs_stage(mock_mcs_csaxs: MCSCardCSAXS):
mcs = mock_mcs_csaxs
triggers = 5
num_points = 10
mcs.scan_info.msg.scan_parameters["frames_per_trigger"] = triggers
mcs.scan_info.msg.num_points = num_points
mcs.scan_info.msg.info["frames_per_trigger"] = triggers
mcs.scan_info.msg.info["num_points"] = num_points
# Simulate that the MCS card is still acquiring, and that current channel is !=0
mcs.current_channel._read_pv.mock_data = 2 # Simulate that current channel is not zero
@@ -171,6 +172,7 @@ def test_mcs_card_csaxs_complete_and_stop(mock_mcs_csaxs: MCSCardCSAXS):
II. Acquisition completes normally
"""
mcs = mock_mcs_csaxs
mcs.scan_parameters = fetch_scan_info(mcs.scan_info)
mcs.acquiring._read_pv.mock_data = ACQUIRING.ACQUIRING
# Make sure that device on_connected has been called which starts the monitoring thread
mcs.on_connected()
@@ -197,7 +199,8 @@ def test_mcs_card_csaxs_complete_and_stop(mock_mcs_csaxs: MCSCardCSAXS):
#######################
mcs._current_data_index = 0
mcs.scan_info.msg.num_points = 10
mcs.scan_info.msg.info["num_points"] = 10
mcs.scan_parameters = fetch_scan_info(mcs.scan_info)
mcs.acquiring._read_pv.mock_data = ACQUIRING.ACQUIRING
st = mcs.complete()
+19 -16
View File
@@ -10,6 +10,7 @@ from ophyd import Staged
from csaxs_bec.devices.panda_box.panda_box import PandaBoxCSAXS
from csaxs_bec.devices.panda_box.panda_box_omny import PandaBoxOMNY
from csaxs_bec.devices.utils.utils import fetch_scan_info
@pytest.fixture
@@ -65,18 +66,18 @@ def test_panda_omny(panda_omny):
@pytest.mark.parametrize(
"scan_type, frames_per_trigger, expected_acquisition_group",
[
("fly", 1, "fly"),
("fly", 5, "fly"),
("step", 10, "burst"),
("step", 1, "monitored"), # Default case
("hardware_triggered", 1, "fly"),
("hardware_triggered", 5, "fly"),
("software_triggered", 10, "burst"),
("software_triggered", 1, "monitored"), # Default case
],
)
def test_panda_omny_stage(panda_omny, scan_type, frames_per_trigger, expected_acquisition_group):
# Check that the stage signal is present and has the correct PV
assert len(panda_omny._status_callbacks) == 0
panda_omny.scan_info.msg.scan_type = scan_type
panda_omny.scan_info.msg.scan_parameters["frames_per_trigger"] = frames_per_trigger
panda_omny.scan_info.msg.info["scan_type"] = scan_type
panda_omny.scan_info.msg.info["frames_per_trigger"] = frames_per_trigger
panda_omny.stage()
assert panda_omny._acquisition_group == expected_acquisition_group
@@ -85,8 +86,9 @@ def test_panda_omny_stage(panda_omny, scan_type, frames_per_trigger, expected_ac
def test_panda_omny_complete(panda_omny):
"""Test the on_complete method of the PandaBoxCSAXS device."""
panda_omny.scan_info.msg.num_points = 1
panda_omny.scan_info.msg.scan_parameters["frames_per_trigger"] = 1
panda_omny.scan_info.msg.info["num_points"] = 1
panda_omny.scan_info.msg.info["frames_per_trigger"] = 1
panda_omny.scan_parameters = fetch_scan_info(panda_omny.scan_info)
panda_omny._timeout_on_completed = 0.5 # Set a short timeout for testing
@@ -133,18 +135,18 @@ def test_panda_csaxs(panda_csaxs):
@pytest.mark.parametrize(
"scan_type, frames_per_trigger, expected_acquisition_group",
[
("fly", 1, "fly"),
("fly", 5, "fly"),
("step", 10, "burst"),
("step", 1, "monitored"), # Default case
("hardware_triggered", 1, "fly"),
("hardware_triggered", 5, "fly"),
("software_triggered", 10, "burst"),
("software_triggered", 1, "monitored"), # Default case
],
)
def test_panda_csaxs_stage(panda_csaxs, scan_type, frames_per_trigger, expected_acquisition_group):
"""Test the on_stage method of the PandaBoxCSAXS device for different scan types and frames per trigger."""
assert len(panda_csaxs._status_callbacks) == 0
panda_csaxs.scan_info.msg.scan_type = scan_type
panda_csaxs.scan_info.msg.scan_parameters["frames_per_trigger"] = frames_per_trigger
panda_csaxs.scan_info.msg.info["scan_type"] = scan_type
panda_csaxs.scan_info.msg.info["frames_per_trigger"] = frames_per_trigger
panda_csaxs.stage()
assert panda_csaxs._acquisition_group == expected_acquisition_group
@@ -153,8 +155,9 @@ def test_panda_csaxs_stage(panda_csaxs, scan_type, frames_per_trigger, expected_
def test_panda_csaxs_complete(panda_csaxs):
"""Test the on_complete method of the PandaBoxCSAXS device."""
panda_csaxs.scan_info.msg.num_points = 1
panda_csaxs.scan_info.msg.scan_parameters["frames_per_trigger"] = 1
panda_csaxs.scan_info.msg.info["num_points"] = 1
panda_csaxs.scan_info.msg.info["frames_per_trigger"] = 1
panda_csaxs.scan_parameters = fetch_scan_info(panda_csaxs.scan_info)
panda_csaxs._timeout_on_completed = 0.5 # Set a short timeout for testing