Files
csaxs_bec/tests/tests_devices/test_eiger.py
appel_c 2bb6667f30
Some checks failed
CI for csaxs_bec / test (push) Failing after 34s
test(eiger): cleanup and add tests
2025-09-17 17:40:43 +02:00

319 lines
12 KiB
Python

# pylint: skip-file
import os
import threading
from time import time
from typing import TYPE_CHECKING, Generator
from unittest import mock
import pytest
from bec_lib.messages import FileMessage, ScanStatusMessage
from jfjoch_client.models.broker_status import BrokerStatus
from jfjoch_client.models.dataset_settings import DatasetSettings
from jfjoch_client.models.detector_list import DetectorList
from jfjoch_client.models.detector_list_element import DetectorListElement
from jfjoch_client.models.detector_settings import DetectorSettings
from jfjoch_client.models.detector_timing import DetectorTiming
from jfjoch_client.models.file_writer_format import FileWriterFormat
from jfjoch_client.models.file_writer_settings import FileWriterSettings
from jfjoch_client.models.measurement_statistics import MeasurementStatistics
from ophyd import Staged
from ophyd_devices.utils.psi_device_base_utils import DeviceStatus
from csaxs_bec.devices.jungfraujoch.eiger import Eiger
from csaxs_bec.devices.jungfraujoch.eiger_1_5m import Eiger1_5M
from csaxs_bec.devices.jungfraujoch.eiger_9m import Eiger9M
if TYPE_CHECKING: # pragma no cover
from bec_lib.messages import FileMessage
# @pytest.fixture(scope="function")
# def scan_worker_mock(scan_server_mock):
# scan_server_mock.device_manager.connector = mock.MagicMock()
# scan_worker = ScanWorker(parent=scan_server_mock)
# yield scan_worker
@pytest.fixture(
    scope="function",
    params=[(0.1, 1, 1, "line_scan"), (0.2, 2, 2, "time_scan"), (0.5, 5, 5, "acquire")],
)
def mock_scan_info(request, tmpdir):
    """Provide an open ScanStatusMessage for each parametrized scan configuration.

    Each parameter tuple is (exp_time, frames_per_trigger, num_points, scan_name).
    The file components point into the per-test temporary directory.
    """
    exp_time, frames_per_trigger, num_points, scan_name = request.param
    scan_parameters = {
        "exp_time": exp_time,
        "frames_per_trigger": frames_per_trigger,
        "system_config": {},
    }
    # (base path without suffix, file extension)
    file_components = (f"{tmpdir}/data/S00000/S000001", "h5")
    yield ScanStatusMessage(
        scan_id="test_id",
        status="open",
        scan_number=1,
        scan_parameters=scan_parameters,
        info={"file_components": file_components},
        num_points=num_points,
        scan_name=scan_name,
    )
@pytest.fixture(scope="function", params=[(1,), (2,)])
def detector_list(request) -> Generator[DetectorList, None, None]:
    """Provide a DetectorList with two EIGER entries (ids 1 and 2).

    The fixture is parametrized over which of the two ids is reported as
    ``current_id``.
    """
    (selected_id,) = request.param
    # Both entries are identical except for their id and description.
    shared_fields = {
        "serial_number": "123456",
        "base_ipv4_addr": "192.168.0.1",
        "udp_interface_count": 1,
        "nmodules": 1,
        "width": 512,
        "height": 512,
        "pixel_size_mm": 0.1,
        "readout_time_us": 100,
        "min_frame_time_us": 1000,
        "min_count_time_us": 100,
        "type": "EIGER",
    }
    elements = [
        DetectorListElement(id=det_id, description=label, **shared_fields)
        for det_id, label in ((1, "EIGER 1.5M"), (2, "EIGER 8.5M (tmp)"))
    ]
    yield DetectorList(detectors=elements, current_id=selected_id)
@pytest.fixture(scope="function")
def eiger_1_5m(mock_scan_info) -> Generator[Eiger1_5M, None, None]:
    """Provide an Eiger1_5M device whose scan info carries the mocked scan message."""
    device = Eiger1_5M(name="eiger_1_5m", beam_center=(256, 256), detector_distance=100.0)
    device.scan_info.msg = mock_scan_info
    yield device
@pytest.fixture(scope="function")
def eiger_9m(mock_scan_info) -> Generator[Eiger9M, None, None]:
    """Provide an Eiger9M device whose scan info carries the mocked scan message.

    Currently only on_connected differs between the 9M and the 1.5M device;
    all other methods are the same.
    """
    device = Eiger9M(name="eiger_9m")
    device.scan_info.msg = mock_scan_info
    yield device
@pytest.mark.parametrize("detector_state", ["Idle", "Inactive"])
def test_eiger_1_5m_on_connected(eiger_1_5m, detector_list, detector_state):
    """Test the on_connected logic of the Eiger 1.5M detector.

    on_connected is expected to succeed only if the broker reports the state
    "Idle" and the currently selected detector id is 1; every other combination
    must raise a RuntimeError without connecting or starting the preview client.
    """
    eiger = eiger_1_5m
    detector_id = 1
    expect_failure = detector_state != "Idle" or detector_list.current_id != detector_id
    with (
        mock.patch.object(eiger.jfj_client, "stop") as mock_jfj_client_stop,
        mock.patch.object(
            eiger.jfj_client.api, "config_select_detector_get", return_value=detector_list
        ),
        mock.patch.object(
            eiger.jfj_client.api, "status_get", return_value=BrokerStatus(state=detector_state)
        ),
        mock.patch.object(eiger.jfj_client, "set_detector_settings") as mock_set_det,
        mock.patch.object(eiger.jfj_client.api, "config_file_writer_put") as mock_file_writer,
        mock.patch.object(eiger, "jfj_preview_client") as mock_jfj_preview_client,
    ):
        if expect_failure:
            with pytest.raises(RuntimeError):
                eiger.on_connected()
            mock_jfj_client_stop.assert_called_once()
            # Check the methods of the preview client: the call_count of the
            # patched attribute itself only counts calls to the mock object,
            # which never happen, so it cannot detect an unwanted connect/start.
            mock_jfj_preview_client.connect.assert_not_called()
            mock_jfj_preview_client.start.assert_not_called()
        else:
            eiger.on_connected()
            assert mock_set_det.call_args == mock.call(
                DetectorSettings(frame_time_us=500, timing=DetectorTiming.TRIGGER), timeout=10
            )
            assert mock_file_writer.call_args == mock.call(
                file_writer_settings=FileWriterSettings(
                    overwrite=True, format=FileWriterFormat.NXMXVDS
                ),
                _request_timeout=10,
            )
            mock_jfj_client_stop.assert_called_once()
            assert mock_jfj_preview_client.connect.call_count == 1
            assert mock_jfj_preview_client.start.call_count == 1
@pytest.mark.parametrize("detector_state", ["Idle", "Inactive"])
def test_eiger_9m_on_connected(eiger_9m, detector_list, detector_state):
    """Test the on_connected logic of the Eiger 9M detector.

    on_connected is expected to succeed only if the broker reports the state
    "Idle" and the currently selected detector id is 2; every other combination
    must raise a RuntimeError without connecting or starting the preview client.
    """
    eiger = eiger_9m
    detector_id = 2
    expect_failure = detector_state != "Idle" or detector_list.current_id != detector_id
    with (
        mock.patch.object(eiger.jfj_client, "stop") as mock_jfj_client_stop,
        mock.patch.object(
            eiger.jfj_client.api, "config_select_detector_get", return_value=detector_list
        ),
        mock.patch.object(
            eiger.jfj_client.api, "status_get", return_value=BrokerStatus(state=detector_state)
        ),
        mock.patch.object(eiger.jfj_client, "set_detector_settings") as mock_set_det,
        mock.patch.object(eiger.jfj_client.api, "config_file_writer_put") as mock_file_writer,
        mock.patch.object(eiger, "jfj_preview_client") as mock_jfj_preview_client,
    ):
        if expect_failure:
            with pytest.raises(RuntimeError):
                eiger.on_connected()
            mock_jfj_client_stop.assert_called_once()
            # Check the methods of the preview client: the call_count of the
            # patched attribute itself only counts calls to the mock object,
            # which never happen, so it cannot detect an unwanted connect/start.
            mock_jfj_preview_client.connect.assert_not_called()
            mock_jfj_preview_client.start.assert_not_called()
        else:
            eiger.on_connected()
            assert mock_set_det.call_args == mock.call(
                DetectorSettings(frame_time_us=500, timing=DetectorTiming.TRIGGER), timeout=10
            )
            assert mock_file_writer.call_args == mock.call(
                file_writer_settings=FileWriterSettings(
                    overwrite=True, format=FileWriterFormat.NXMXVDS
                ),
                _request_timeout=10,
            )
            mock_jfj_client_stop.assert_called_once()
            assert mock_jfj_preview_client.connect.call_count == 1
            assert mock_jfj_preview_client.start.call_count == 1
@pytest.mark.timeout(20)
def test_eiger_on_stop(eiger_1_5m):
    """Test the on_stop logic of the Eiger detector. This is equivalent for 9M and 1_5M.

    A never-ending task is submitted to the task handler. on_stop must stop the
    jfj_client and the task must terminate afterwards, which is observed through
    the ``finally`` clause of the task setting ``stop_event``.
    """
    # The module imports the *function* ``time`` (``from time import time``), so
    # ``time.sleep`` would raise AttributeError inside the task and end it
    # immediately, letting the test pass without on_stop stopping anything.
    from time import sleep

    eiger = eiger_1_5m
    start_event = threading.Event()
    stop_event = threading.Event()

    def tmp_task():
        start_event.set()
        try:
            while True:
                sleep(0.1)
        finally:
            stop_event.set()

    eiger.task_handler.submit_task(tmp_task, run=True)
    assert start_event.wait(timeout=5), "Task thread did not start"
    with mock.patch.object(eiger.jfj_client, "stop") as mock_jfj_client_stop:
        eiger.on_stop()
        mock_jfj_client_stop.assert_called_once()
        # Thread should be killed from task_handler
        assert stop_event.wait(timeout=5), "Task thread was not stopped by on_stop"
@pytest.mark.timeout(25)
@pytest.mark.parametrize("raise_timeout", [True, False])
def test_eiger_on_complete(eiger_1_5m, raise_timeout):
    """Test the on_complete logic of the Eiger detector. This is equivalent for 9M and 1_5M.

    ``wait_for_idle`` of the jfj_client is mocked so that it blocks until the
    test releases it. With ``raise_timeout=True`` the mock keeps reporting
    "not idle" and the returned status is expected to fail with a TimeoutError;
    otherwise the status is expected to finish successfully.
    """
    eiger = eiger_1_5m

    callback_completed_event = threading.Event()

    def _callback_complete(status: DeviceStatus):
        if status.done:
            callback_completed_event.set()

    unblock_wait_for_idle = threading.Event()

    def mock_wait_for_idle(timeout: int, request_timeout: float):
        # Report idle only once the test released the event and no timeout
        # should be simulated.
        return unblock_wait_for_idle.wait(timeout) and not raise_timeout

    with (
        mock.patch.object(eiger.jfj_client, "wait_for_idle", side_effect=mock_wait_for_idle),
        mock.patch.object(
            eiger.jfj_client.api,
            "statistics_data_collection_get",
            return_value=MeasurementStatistics(run_number=1),
        ),
    ):
        status = eiger.complete()
        status.add_callback(_callback_complete)
        # wait_for_idle is still blocked, so the status must be pending
        assert not status.done
        assert not status.success
        assert eiger.file_event.get() is None
        unblock_wait_for_idle.set()
        if raise_timeout:
            with pytest.raises(TimeoutError):
                status.wait(timeout=10)
        else:
            status.wait(timeout=10)
        assert status.done
        # Parenthesised on purpose: "a == False if c else True" parses as
        # "(a == False) if c else True" and would assert nothing for the success case.
        assert status.success == (not raise_timeout)
        # The callback registered on the status must have observed the finished status
        assert callback_completed_event.wait(timeout=5)
def test_eiger_file_event_callback(eiger_1_5m, tmp_path):
    """Test the file_event callback of the Eiger detector. This is equivalent for 9M and 1_5M.

    The callback is invoked once with a finished, successful status and once
    with an unfinished, unsuccessful one; the emitted FileMessage must mirror
    the status flags each time.
    """
    eiger = eiger_1_5m
    expected_path = str(tmp_path / "test_file.h5")
    eiger._full_path = expected_path
    assert eiger.file_event.get() is None

    for done, success in ((True, True), (False, False)):
        eiger._file_event_callback(DeviceStatus(device=eiger, done=done, success=success))
        file_msg: FileMessage = eiger.file_event.get()
        assert file_msg.device_name == eiger.name
        assert file_msg.file_path == expected_path
        assert file_msg.done is done
        assert file_msg.successful is success
        assert file_msg.hinted_h5_entries == {"data": "entry/data/data"}
def test_eiger_on_sage(eiger_1_5m):
    """Test the stage logic of the Eiger detector. This is equivalent for 9M and 1_5M.

    Staging must derive the master file path from the scan's file components,
    publish a pending FileMessage, start the jfj_client with the matching
    DatasetSettings and leave the device in the staged state.
    """
    # NOTE(review): the test name looks like a typo for "test_eiger_on_stage".
    eiger = eiger_1_5m
    scan_msg = eiger.scan_info.msg
    with (
        mock.patch.object(eiger.jfj_client, "wait_for_idle", return_value=True),
        mock.patch.object(eiger.jfj_client, "start") as mock_start,
    ):
        eiger.stage()

        components = scan_msg.info["file_components"]
        expected_path = f"{components[0]}_{eiger.name}_master.{components[1]}"
        assert eiger._full_path == expected_path

        file_msg: FileMessage = eiger.file_event.get()
        assert file_msg.file_path == eiger._full_path
        assert file_msg.done is False
        assert file_msg.successful is False
        assert file_msg.hinted_h5_entries == {"data": "entry/data/data"}

        scan_params = scan_msg.scan_parameters
        relative_path = os.path.relpath(eiger._full_path, start="/sls/x12sa/data")
        beam_x, beam_y = eiger.beam_center[0], eiger.beam_center[1]
        data_settings = DatasetSettings(
            image_time_us=int(scan_params["exp_time"] * 1e6),
            ntrigger=int(scan_msg.num_points * scan_params["frames_per_trigger"]),
            file_prefix=relative_path.removesuffix("_master.h5"),
            beam_x_pxl=beam_x,
            beam_y_pxl=beam_y,
            detector_distance_mm=eiger.detector_distance,
            # hardcoded at this moment as it is hardcoded in the Eiger implementation
            incident_energy_ke_v=12.0,
        )
        assert mock_start.call_args == mock.call(settings=data_settings)
        assert eiger.staged is Staged.yes