# pylint: skip-file
import os
import threading
from time import sleep, time
from typing import TYPE_CHECKING, Generator
from unittest import mock

import numpy as np
import pytest
from bec_lib.messages import FileMessage, ScanStatusMessage
from jfjoch_client.models.broker_status import BrokerStatus
from jfjoch_client.models.dataset_settings import DatasetSettings
from jfjoch_client.models.detector_list import DetectorList
from jfjoch_client.models.detector_list_element import DetectorListElement
from jfjoch_client.models.detector_settings import DetectorSettings
from jfjoch_client.models.detector_timing import DetectorTiming
from jfjoch_client.models.file_writer_format import FileWriterFormat
from jfjoch_client.models.file_writer_settings import FileWriterSettings
from jfjoch_client.models.measurement_statistics import MeasurementStatistics
from ophyd import Staged
from ophyd_devices.utils.psi_device_base_utils import DeviceStatus

from csaxs_bec.devices.jungfraujoch.eiger import Eiger
from csaxs_bec.devices.jungfraujoch.eiger_1_5m import Eiger1_5M
from csaxs_bec.devices.jungfraujoch.eiger_9m import Eiger9M

if TYPE_CHECKING:  # pragma no cover
    from bec_lib.messages import FileMessage

# @pytest.fixture(scope="function")
# def scan_worker_mock(scan_server_mock):
#     scan_server_mock.device_manager.connector = mock.MagicMock()
#     scan_worker = ScanWorker(parent=scan_server_mock)
#     yield scan_worker


@pytest.fixture(
    scope="function",
    params=[(0.1, 1, 1, "line_scan"), (0.2, 2, 2, "time_scan"), (0.5, 5, 5, "acquire")],
)
def mock_scan_info(request, tmpdir):
    """Fixture yielding a ScanStatusMessage for each parametrized scan configuration.

    Each parameter tuple is (exp_time, frames_per_trigger, num_points, scan_name).
    The file components of the message point into the pytest ``tmpdir``.
    """
    exp_time, frames_per_trigger, num_points, scan_name = request.param
    scan_info = ScanStatusMessage(
        scan_id="test_id",
        status="open",
        scan_number=1,
        scan_parameters={
            "exp_time": exp_time,
            "frames_per_trigger": frames_per_trigger,
            "system_config": {},
        },
        info={"file_components": (f"{tmpdir}/data/S00000/S000001", "h5")},
        num_points=num_points,
        scan_name=scan_name,
    )
    yield scan_info


@pytest.fixture(scope="function", params=[(1,), (2,)])
def detector_list(request) -> Generator[DetectorList, None, None]:
    """Fixture for the detector list.

    Yields a list with two detectors (id 1: "EIGER 1.5M", id 2: "EIGER 9M");
    the parametrization selects which of them is the currently selected detector.
    """
    current_id = request.param[0]
    detectors = DetectorList(
        detectors=[
            DetectorListElement(
                id=1,
                description="EIGER 1.5M",
                serial_number="123456",
                base_ipv4_addr="192.168.0.1",
                udp_interface_count=1,
                nmodules=1,
                width=512,
                height=512,
                pixel_size_mm=0.1,
                readout_time_us=100,
                min_frame_time_us=1000,
                min_count_time_us=100,
                type="EIGER",
            ),
            DetectorListElement(
                id=2,
                description="EIGER 9M",
                serial_number="123456",
                base_ipv4_addr="192.168.0.1",
                udp_interface_count=1,
                nmodules=1,
                width=512,
                height=512,
                pixel_size_mm=0.1,
                readout_time_us=100,
                min_frame_time_us=1000,
                min_count_time_us=100,
                type="EIGER",
            ),
        ],
        current_id=current_id,
    )
    yield detectors


@pytest.fixture(scope="function")
def eiger_1_5m(mock_scan_info) -> Generator[Eiger1_5M, None, None]:
    """Fixture for the Eiger 1.5M device."""
    name = "eiger_1_5m"
    dev = Eiger1_5M(name=name, beam_center=(256, 256), detector_distance=100.0)
    dev.scan_info.msg = mock_scan_info
    try:
        yield dev
    finally:
        # Only destroy the device if the test has not already done so.
        if dev._destroyed is False:
            dev.destroy()


@pytest.fixture(scope="function")
def eiger_9m(mock_scan_info) -> Generator[Eiger9M, None, None]:
    """Fixture for the Eiger 9M device.

    Currently only on_connected is different for both devices, all other methods are the same.
    """
    name = "eiger_9m"
    dev = Eiger9M(name=name)
    dev.scan_info.msg = mock_scan_info
    try:
        yield dev
    finally:
        # Only destroy the device if the test has not already done so.
        if dev._destroyed is False:
            dev.destroy()


def _make_endless_task():
    """Create a task that runs until it is interrupted from outside.

    Returns:
        tuple: ``(task, start_event, stop_event)``. ``start_event`` is set as soon as the
        task begins to run, ``stop_event`` is set in the task's ``finally`` clause, i.e.
        when the task terminates for any reason.
    """
    start_event = threading.Event()
    stop_event = threading.Event()

    def tmp_task():
        start_event.set()
        try:
            while True:
                # ``time`` in this module is the function imported via
                # ``from time import time``; calling ``time.sleep`` would raise
                # AttributeError and end the task immediately, so use ``sleep`` directly.
                sleep(0.1)
        finally:
            stop_event.set()

    return tmp_task, start_event, stop_event


def _check_on_connected(eiger, detector_list, detector_state, detector_id):
    """Run ``eiger.on_connected()`` against a mocked JFJ client and verify the outcome.

    Args:
        eiger: Device under test.
        detector_list: DetectorList returned by the mocked ``config_select_detector_get``.
        detector_state: Broker state returned by the mocked ``status_get``.
        detector_id: Detector id that ``eiger`` is expected to be selected with.

    A RuntimeError is expected unless the state is "Idle" and the currently selected
    detector matches ``detector_id``.
    """
    with (
        mock.patch.object(eiger.jfj_client, "stop") as mock_jfj_client_stop,
        mock.patch.object(
            eiger.jfj_client.api, "config_select_detector_get", return_value=detector_list
        ),
        mock.patch.object(
            eiger.jfj_client.api, "status_get", return_value=BrokerStatus(state=detector_state)
        ),
        mock.patch.object(eiger.jfj_client, "set_detector_settings") as mock_set_det,
        mock.patch.object(eiger.jfj_client.api, "config_file_writer_put") as mock_file_writer,
        mock.patch.object(eiger, "jfj_preview_client") as mock_jfj_preview_client,
    ):
        expect_failure = detector_state != "Idle" or detector_list.current_id != detector_id
        if expect_failure:
            with pytest.raises(RuntimeError):
                eiger.on_connected()
            mock_jfj_client_stop.assert_called_once()
            # NOTE(review): this only checks calls on the mock object itself, not on
            # connect()/start() - confirm whether those should be asserted as not called.
            assert mock_jfj_preview_client.call_count == 0
            return

        eiger.on_connected()
        assert mock_set_det.call_args == mock.call(
            DetectorSettings(frame_time_us=500, timing=DetectorTiming.TRIGGER), timeout=5
        )
        assert mock_file_writer.call_args == mock.call(
            file_writer_settings=FileWriterSettings(
                overwrite=True, format=FileWriterFormat.NXMXVDS
            ),
            _request_timeout=10,
        )
        mock_jfj_client_stop.assert_called_once()
        assert mock_jfj_preview_client.connect.call_count == 1
        assert mock_jfj_preview_client.start.call_count == 1


def test_eiger_wait_for_connection(eiger_1_5m, eiger_9m):
    """Test the wait_for_connection method is calling status_get on the JFJ API client."""
    for eiger in (eiger_1_5m, eiger_9m):
        with mock.patch.object(eiger.jfj_client.api, "status_get") as mock_status_get:
            eiger.wait_for_connection(timeout=1)
            mock_status_get.assert_called_once_with(_request_timeout=1)


@pytest.mark.parametrize("detector_state", ["Idle", "Inactive"])
def test_eiger_1_5m_on_connected(eiger_1_5m, detector_list, detector_state):
    """Test the on_connected logic of the Eiger 1.5M detector (detector id 1)."""
    _check_on_connected(eiger_1_5m, detector_list, detector_state, detector_id=1)


@pytest.mark.parametrize("detector_state", ["Idle", "Inactive"])
def test_eiger_9m_on_connected(eiger_9m, detector_list, detector_state):
    """Test the on_connected logic of the Eiger 9M detector (detector id 2)."""
    _check_on_connected(eiger_9m, detector_list, detector_state, detector_id=2)


@pytest.mark.timeout(20)
def test_eiger_on_stop(eiger_1_5m):
    """Test the on_stop logic of the Eiger detector. This is equivalent for 9M and 1_5M."""
    eiger = eiger_1_5m
    tmp_task, start_event, stop_event = _make_endless_task()

    eiger.task_handler.submit_task(tmp_task, run=True)
    start_event.wait(timeout=5)  # Wait for thread to start

    with mock.patch.object(eiger.jfj_client, "stop") as mock_jfj_client_stop:
        eiger.on_stop()
        mock_jfj_client_stop.assert_called_once()
        stop_event.wait(timeout=5)  # Thread should be killed from task_handler


def test_eiger_on_destroy(eiger_1_5m):
    """Test the on_destroy logic of the Eiger detector. This is equivalent for 9M and 1_5M."""
    eiger = eiger_1_5m
    tmp_task, start_event, stop_event = _make_endless_task()

    eiger.task_handler.submit_task(tmp_task)
    start_event.wait(timeout=5)  # Wait for thread to start

    with (
        mock.patch.object(eiger.jfj_preview_client, "stop") as mock_jfj_preview_client_stop,
        mock.patch.object(eiger.jfj_client, "stop") as mock_jfj_client_stop,
    ):
        eiger.on_destroy()
        mock_jfj_preview_client_stop.assert_called_once()
        mock_jfj_client_stop.assert_called_once()
        stop_event.wait(timeout=5)


@pytest.mark.timeout(25)
@pytest.mark.parametrize("raise_timeout", [True, False])
def test_eiger_on_complete(eiger_1_5m, raise_timeout):
    """Test the on_complete logic of the Eiger detector. This is equivalent for 9M and 1_5M.

    With ``raise_timeout`` the mocked ``wait_for_idle`` reports failure and the status is
    expected to finish unsuccessfully with a TimeoutError; otherwise it must succeed.
    """
    eiger = eiger_1_5m
    eiger._wait_for_on_complete = 1  # reduce wait time for testing

    callback_completed_event = threading.Event()

    def _callback_complete(status: DeviceStatus):
        if status.done:
            callback_completed_event.set()

    unblock_wait_for_idle = threading.Event()

    def mock_wait_for_idle(timeout: float, raise_on_timeout: bool) -> bool:
        # Block until the test releases the event (or the timeout expires); report
        # "idle" only if released in time and no timeout should be simulated.
        released = unblock_wait_for_idle.wait(timeout)
        return released and not raise_timeout

    with (
        mock.patch.object(
            eiger.jfj_client.api, "status_get", return_value=BrokerStatus(state="Idle")
        ),
        mock.patch.object(eiger.jfj_client, "wait_for_idle", side_effect=mock_wait_for_idle),
        mock.patch.object(
            eiger.jfj_client.api,
            "statistics_data_collection_get",
            return_value=MeasurementStatistics(
                run_number=1,
                images_collected=eiger.scan_info.msg.num_points
                * eiger.scan_info.msg.scan_parameters["frames_per_trigger"],
            ),
        ),
    ):
        status = eiger.complete()
        status.add_callback(_callback_complete)
        assert status.done is False
        assert status.success is False
        assert eiger.file_event.get() is None
        unblock_wait_for_idle.set()
        if raise_timeout:
            with pytest.raises(TimeoutError):
                status.wait(timeout=10)
        else:
            status.wait(timeout=10)
        assert status.done is True
        # Parenthesized on purpose: ``a == False if cond else True`` would parse as
        # ``(a == False) if cond else True`` and assert nothing in the success case.
        assert status.success == (not raise_timeout)


def test_eiger_file_event_callback(eiger_1_5m, tmp_path):
    """Test the file_event callback of the Eiger detector. This is equivalent for 9M and 1_5M."""
    eiger = eiger_1_5m
    test_file = tmp_path / "test_file.h5"
    eiger._full_path = str(test_file)
    assert eiger.file_event.get() is None

    # The emitted file message must mirror the done/success flags of the status.
    for done, success in ((True, True), (False, False)):
        status = DeviceStatus(device=eiger, done=done, success=success)
        eiger._file_event_callback(status)
        file_msg: FileMessage = eiger.file_event.get()
        assert file_msg.device_name == eiger.name
        assert file_msg.file_path == str(test_file)
        assert file_msg.done is done
        assert file_msg.successful is success
        assert file_msg.hinted_h5_entries == {"data": "entry/data/data"}


def test_eiger_on_stage(eiger_1_5m):
    """Test the on_stage and on_unstage logic of the Eiger detector. This is equivalent for 9M and 1_5M."""
    eiger = eiger_1_5m
    scan_msg = eiger.scan_info.msg
    with (
        mock.patch.object(eiger.jfj_client, "wait_for_idle", return_value=True),
        mock.patch.object(eiger.jfj_client, "start") as mock_start,
    ):
        eiger.stage()
        assert (
            eiger._full_path
            == f"{scan_msg.info['file_components'][0]}_{eiger.name}_master.{scan_msg.info['file_components'][1]}"
        )
        file_msg: FileMessage = eiger.file_event.get()
        assert file_msg.file_path == eiger._full_path
        assert file_msg.done is False
        assert file_msg.successful is False
        assert file_msg.hinted_h5_entries == {"data": "entry/data/data"}
        data_settings = DatasetSettings(
            image_time_us=int(scan_msg.scan_parameters["exp_time"] * 1e6),
            ntrigger=int(scan_msg.num_points * scan_msg.scan_parameters["frames_per_trigger"]),
            file_prefix=os.path.relpath(eiger._full_path, start="/sls/x12sa/data").removesuffix(
                "_master.h5"
            ),
            beam_x_pxl=eiger.beam_center[0],
            beam_y_pxl=eiger.beam_center[1],
            detector_distance_mm=eiger.detector_distance,
            incident_energy_ke_v=12.0,  # hardcoded at this moment as it is hardcoded in the Eiger implementation
        )
        assert mock_start.call_args == mock.call(settings=data_settings)
        assert eiger.staged is Staged.yes


def test_eiger_set_det_distance_test_beam_center(eiger_1_5m):
    """Test the set_detector_distance and set_beam_center methods. Equivalent for 9M and 1_5M."""
    eiger = eiger_1_5m
    old_distance = eiger.detector_distance
    new_distance = old_distance + 100
    old_beam_center = eiger.beam_center
    new_beam_center = (old_beam_center[0] + 20, old_beam_center[1] + 50)

    eiger.set_detector_distance(new_distance)
    assert eiger.detector_distance == new_distance

    eiger.set_beam_center(x=new_beam_center[0], y=new_beam_center[1])
    assert eiger.beam_center == new_beam_center

    with pytest.raises(ValueError):
        eiger.set_beam_center(x=-10, y=100)  # Cannot set negative beam center
    with pytest.raises(ValueError):
        eiger.detector_distance = -50  # Cannot set negative detector distance


def test_eiger_preview_callback(eiger_1_5m):
    """Preview callback test for the Eiger detector. This is equivalent for 9M and 1_5M."""
    eiger = eiger_1_5m
    # NOTE: I don't find models for the CBOR messages used by JFJ, currently using a dummy dict.
    # Please adjust once the proper model is found.
    for msg_type in ["start", "end", "image", "calibration", "metadata"]:
        msg = {"type": msg_type, "data": {"default": np.array([[1, 2], [3, 4]])}}
        with mock.patch.object(eiger.preview_image, "put") as mock_preview_put:
            eiger._preview_callback(msg)
            # Only "image" messages are expected to be forwarded to the preview signal.
            if msg_type == "image":
                mock_preview_put.assert_called_once_with(msg["data"]["default"])
            else:
                mock_preview_put.assert_not_called()