diff --git a/csaxs_bec/device_configs/endstation.yaml b/csaxs_bec/device_configs/endstation.yaml index 6f7d9ca..962bd71 100644 --- a/csaxs_bec/device_configs/endstation.yaml +++ b/csaxs_bec/device_configs/endstation.yaml @@ -53,18 +53,3 @@ eiger_1_5: enabled: true readoutPriority: async softwareTrigger: False - -samx: - readoutPriority: baseline - deviceClass: ophyd_devices.SimPositioner - deviceConfig: - delay: 1 - limits: - - -50 - - 50 - tolerance: 0.01 - update_frequency: 400 - deviceTags: - - user motors - enabled: true - readOnly: false diff --git a/csaxs_bec/devices/jungfraujoch/eiger.py b/csaxs_bec/devices/jungfraujoch/eiger.py index f172323..70ee01f 100644 --- a/csaxs_bec/devices/jungfraujoch/eiger.py +++ b/csaxs_bec/devices/jungfraujoch/eiger.py @@ -281,7 +281,7 @@ class Eiger(PSIDeviceBase): logger.info(f"Acquisition done callback called for {self.name} for status {status.success}") self.file_event.put( file_path=self._full_path, - done=True, + done=status.done, successful=status.success, hinted_h5_entries={"data": "entry/data/data"}, ) diff --git a/csaxs_bec/devices/jungfraujoch/eiger_9m.py b/csaxs_bec/devices/jungfraujoch/eiger_9m.py index ca47caf..f44ca1d 100644 --- a/csaxs_bec/devices/jungfraujoch/eiger_9m.py +++ b/csaxs_bec/devices/jungfraujoch/eiger_9m.py @@ -25,7 +25,7 @@ DETECTOR_NAME = "EIGER 8.5M (tmp)" # "EIGER 9M"" # pylint:disable=invalid-name -class Eiger1_5M(Eiger): +class Eiger9M(Eiger): """ Eiger 1.5M specific integration for the in-vaccum Eiger. diff --git a/csaxs_bec/devices/jungfraujoch/jungfrau_joch_client.py b/csaxs_bec/devices/jungfraujoch/jungfrau_joch_client.py index 8f08be0..1db6b15 100644 --- a/csaxs_bec/devices/jungfraujoch/jungfrau_joch_client.py +++ b/csaxs_bec/devices/jungfraujoch/jungfrau_joch_client.py @@ -74,6 +74,8 @@ class JungfrauJochClient: """Set the connected status""" self._initialised = value + # TODO this is not correct, as it may be that the state in INACTIVE. Models are not in sync... + # REMOVE all model enums as most of the validation takes place in the Pydantic models, i.e. BrokerStatus here.. @property def detector_state(self) -> DetectorState: """Get the status of JungfrauJoch""" diff --git a/tests/tests_devices/test_eiger.py b/tests/tests_devices/test_eiger.py new file mode 100644 index 0000000..e43c981 --- /dev/null +++ b/tests/tests_devices/test_eiger.py @@ -0,0 +1,318 @@ +# pylint: skip-file +import os +import threading +from time import time +from typing import TYPE_CHECKING, Generator +from unittest import mock + +import pytest +from bec_lib.messages import FileMessage, ScanStatusMessage +from jfjoch_client.models.broker_status import BrokerStatus +from jfjoch_client.models.dataset_settings import DatasetSettings +from jfjoch_client.models.detector_list import DetectorList +from jfjoch_client.models.detector_list_element import DetectorListElement +from jfjoch_client.models.detector_settings import DetectorSettings +from jfjoch_client.models.detector_timing import DetectorTiming +from jfjoch_client.models.file_writer_format import FileWriterFormat +from jfjoch_client.models.file_writer_settings import FileWriterSettings +from jfjoch_client.models.measurement_statistics import MeasurementStatistics +from ophyd import Staged +from ophyd_devices.utils.psi_device_base_utils import DeviceStatus + +from csaxs_bec.devices.jungfraujoch.eiger import Eiger +from csaxs_bec.devices.jungfraujoch.eiger_1_5m import Eiger1_5M +from csaxs_bec.devices.jungfraujoch.eiger_9m import Eiger9M + +if TYPE_CHECKING: # pragma no cover + from bec_lib.messages import FileMessage + +# @pytest.fixture(scope="function") +# def scan_worker_mock(scan_server_mock): +# scan_server_mock.device_manager.connector = mock.MagicMock() +# scan_worker = ScanWorker(parent=scan_server_mock) +# yield scan_worker + + +@pytest.fixture( + scope="function", + params=[(0.1, 1, 1, "line_scan"), (0.2, 2, 2, "time_scan"), (0.5, 5, 5, "acquire")], +) +def mock_scan_info(request, tmpdir): + exp_time, frames_per_trigger, num_points, scan_name = request.param + scan_info = ScanStatusMessage( + scan_id="test_id", + status="open", + scan_number=1, + scan_parameters={ + "exp_time": exp_time, + "frames_per_trigger": frames_per_trigger, + "system_config": {}, + }, + info={"file_components": (f"{tmpdir}/data/S00000/S000001", "h5")}, + num_points=num_points, + scan_name=scan_name, + ) + yield scan_info + + +@pytest.fixture(scope="function", params=[(1,), (2,)]) +def detector_list(request) -> Generator[DetectorList, None, None]: + """Fixture for the detector list.""" + current_id = request.param[0] + detector_list = DetectorList( + detectors=[ + DetectorListElement( + id=1, + description="EIGER 1.5M", + serial_number="123456", + base_ipv4_addr="192.168.0.1", + udp_interface_count=1, + nmodules=1, + width=512, + height=512, + pixel_size_mm=0.1, + readout_time_us=100, + min_frame_time_us=1000, + min_count_time_us=100, + type="EIGER", + ), + DetectorListElement( + id=2, + description="EIGER 8.5M (tmp)", + serial_number="123456", + base_ipv4_addr="192.168.0.1", + udp_interface_count=1, + nmodules=1, + width=512, + height=512, + pixel_size_mm=0.1, + readout_time_us=100, + min_frame_time_us=1000, + min_count_time_us=100, + type="EIGER", + ), + ], + current_id=current_id, + ) + yield detector_list + + +@pytest.fixture(scope="function") +def eiger_1_5m(mock_scan_info) -> Generator[Eiger1_5M, None, None]: + """Fixture for the Eiger 1.5M device.""" + name = "eiger_1_5m" + dev = Eiger1_5M(name=name, beam_center=(256, 256), detector_distance=100.0) + dev.scan_info.msg = mock_scan_info + yield dev + + +@pytest.fixture(scope="function") +def eiger_9m(mock_scan_info) -> Generator[Eiger9M, None, None]: + """Fixture for the Eiger 9M device. + Currently only on_connected is different for both devices, all other methods are the same.""" + name = "eiger_9m" + dev = Eiger9M(name=name) + dev.scan_info.msg = mock_scan_info + yield dev + + +@pytest.mark.parametrize("detector_state", ["Idle", "Inactive"]) +def test_eiger_1_5m_on_connected(eiger_1_5m, detector_list, detector_state): + """Test the on_connected logic of the Eiger detector.""" + eiger = eiger_1_5m + detector_id = 1 + with ( + mock.patch.object(eiger.jfj_client, "stop") as mock_jfj_client_stop, + mock.patch.object( + eiger.jfj_client.api, "config_select_detector_get", return_value=detector_list + ), + mock.patch.object( + eiger.jfj_client.api, "status_get", return_value=BrokerStatus(state=detector_state) + ), + mock.patch.object(eiger.jfj_client, "set_detector_settings") as mock_set_det, + mock.patch.object(eiger.jfj_client.api, "config_file_writer_put") as mock_file_writer, + mock.patch.object(eiger, "jfj_preview_client") as mock_jfj_preview_client, + ): + if detector_state != "Idle" or detector_list.current_id != detector_id: + with pytest.raises(RuntimeError): + eiger.on_connected() + mock_jfj_client_stop.assert_called_once() + assert mock_jfj_preview_client.call_count == 0 + else: + eiger.on_connected() + assert mock_set_det.call_args == mock.call( + DetectorSettings(frame_time_us=500, timing=DetectorTiming.TRIGGER), timeout=10 + ) + assert mock_file_writer.call_args == mock.call( + file_writer_settings=FileWriterSettings( + overwrite=True, format=FileWriterFormat.NXMXVDS + ), + _request_timeout=10, + ) + mock_jfj_client_stop.assert_called_once() + assert mock_jfj_preview_client.connect.call_count == 1 + assert mock_jfj_preview_client.start.call_count == 1 + + +@pytest.mark.parametrize("detector_state", ["Idle", "Inactive"]) +def test_eiger_9m_on_connected(eiger_9m, detector_list, detector_state): + """Test the on_connected logic of the Eiger detector.""" + eiger = eiger_9m + detector_id = 2 + with ( + mock.patch.object(eiger.jfj_client, "stop") as mock_jfj_client_stop, + mock.patch.object( + eiger.jfj_client.api, "config_select_detector_get", return_value=detector_list + ), + mock.patch.object( + eiger.jfj_client.api, "status_get", return_value=BrokerStatus(state=detector_state) + ), + mock.patch.object(eiger.jfj_client, "set_detector_settings") as mock_set_det, + mock.patch.object(eiger.jfj_client.api, "config_file_writer_put") as mock_file_writer, + mock.patch.object(eiger, "jfj_preview_client") as mock_jfj_preview_client, + ): + if detector_state != "Idle" or detector_list.current_id != detector_id: + with pytest.raises(RuntimeError): + eiger.on_connected() + mock_jfj_client_stop.assert_called_once() + assert mock_jfj_preview_client.call_count == 0 + else: + eiger.on_connected() + assert mock_set_det.call_args == mock.call( + DetectorSettings(frame_time_us=500, timing=DetectorTiming.TRIGGER), timeout=10 + ) + assert mock_file_writer.call_args == mock.call( + file_writer_settings=FileWriterSettings( + overwrite=True, format=FileWriterFormat.NXMXVDS + ), + _request_timeout=10, + ) + mock_jfj_client_stop.assert_called_once() + assert mock_jfj_preview_client.connect.call_count == 1 + assert mock_jfj_preview_client.start.call_count == 1 + + +@pytest.mark.timeout(20) +def test_eiger_on_stop(eiger_1_5m): + """Test the on_stop logic of the Eiger detector. This is equivalent for 9M and 1_5M.""" + eiger = eiger_1_5m + start_event = threading.Event() + stop_event = threading.Event() + + def tmp_task(): + start_event.set() + try: + while True: + time.sleep(0.1) + finally: + stop_event.set() + + eiger.task_handler.submit_task(tmp_task, run=True) + start_event.wait(timeout=5) # Wait for thread to start + + with mock.patch.object(eiger.jfj_client, "stop") as mock_jfj_client_stop: + eiger.on_stop() + mock_jfj_client_stop.assert_called_once() + stop_event.wait(timeout=5) # Thread should be killed from task_handler + + +@pytest.mark.timeout(25) +@pytest.mark.parametrize("raise_timeout", [True, False]) +def test_eiger_on_complete(eiger_1_5m, raise_timeout): + """Test the on_complete logic of the Eiger detector. This is equivalent for 9M and 1_5M.""" + eiger = eiger_1_5m + + callback_completed_event = threading.Event() + + def _callback_complete(status: DeviceStatus): + if status.done: + callback_completed_event.set() + + unblock_wait_for_idle = threading.Event() + + def mock_wait_for_idle(timeout: int, request_timeout: float): + if unblock_wait_for_idle.wait(timeout): + if raise_timeout: + return False + return True + return False + + with ( + mock.patch.object(eiger.jfj_client, "wait_for_idle", side_effect=mock_wait_for_idle), + mock.patch.object( + eiger.jfj_client.api, + "statistics_data_collection_get", + return_value=MeasurementStatistics(run_number=1), + ), + ): + status = eiger.complete() + status.add_callback(_callback_complete) + assert status.done == False + assert status.success == False + assert eiger.file_event.get() is None + unblock_wait_for_idle.set() + if raise_timeout: + with pytest.raises(TimeoutError): + status.wait(timeout=10) + else: + status.wait(timeout=10) + assert status.done == True + assert status.success == False if raise_timeout else True + + +def test_eiger_file_event_callback(eiger_1_5m, tmp_path): + """Test the file_event callback of the Eiger detector. This is equivalent for 9M and 1_5M.""" + eiger = eiger_1_5m + test_file = tmp_path / "test_file.h5" + eiger._full_path = str(test_file) + assert eiger.file_event.get() is None + status = DeviceStatus(device=eiger, done=True, success=True) + eiger._file_event_callback(status) + file_msg: FileMessage = eiger.file_event.get() + assert file_msg.device_name == eiger.name + assert file_msg.file_path == str(test_file) + assert file_msg.done is True + assert file_msg.successful is True + assert file_msg.hinted_h5_entries == {"data": "entry/data/data"} + status = DeviceStatus(device=eiger, done=False, success=False) + eiger._file_event_callback(status) + file_msg: FileMessage = eiger.file_event.get() + assert file_msg.device_name == eiger.name + assert file_msg.file_path == str(test_file) + assert file_msg.done is False + assert file_msg.successful is False + assert file_msg.hinted_h5_entries == {"data": "entry/data/data"} + + +def test_eiger_on_sage(eiger_1_5m): + """Test the on_stage and on_unstage logic of the Eiger detector. This is equivalent for 9M and 1_5M.""" + eiger = eiger_1_5m + scan_msg = eiger.scan_info.msg + with ( + mock.patch.object(eiger.jfj_client, "wait_for_idle", return_value=True), + mock.patch.object(eiger.jfj_client, "start") as mock_start, + ): + eiger.stage() + assert ( + eiger._full_path + == f"{scan_msg.info['file_components'][0]}_{eiger.name}_master.{scan_msg.info['file_components'][1]}" + ) + file_msg: FileMessage = eiger.file_event.get() + assert file_msg.file_path == eiger._full_path + assert file_msg.done is False + assert file_msg.successful is False + assert file_msg.hinted_h5_entries == {"data": "entry/data/data"} + + data_settings = DatasetSettings( + image_time_us=int(scan_msg.scan_parameters["exp_time"] * 1e6), + ntrigger=int(scan_msg.num_points * scan_msg.scan_parameters["frames_per_trigger"]), + file_prefix=os.path.relpath(eiger._full_path, start="/sls/x12sa/data").removesuffix( + "_master.h5" + ), + beam_x_pxl=eiger.beam_center[0], + beam_y_pxl=eiger.beam_center[1], + detector_distance_mm=eiger.detector_distance, + incident_energy_ke_v=12.0, # hardcoded at this moment as it is hardcoded in the Eiger implementation + ) + assert mock_start.call_args == mock.call(settings=data_settings) + assert eiger.staged is Staged.yes diff --git a/tests/tests_devices/test_eiger9m_csaxs.py b/tests/tests_devices/test_eiger9m_csaxs.py deleted file mode 100644 index 43e3b86..0000000 --- a/tests/tests_devices/test_eiger9m_csaxs.py +++ /dev/null @@ -1,444 +0,0 @@ -# pylint: skip-file -import threading -from unittest import mock - -import ophyd -import pytest -from bec_lib import messages -from bec_lib.endpoints import MessageEndpoints -from bec_server.device_server.tests.utils import DMMock -from ophyd_devices.tests.utils import MockPV - -from csaxs_bec.devices.epics.eiger9m_csaxs import Eiger9McSAXS -from csaxs_bec.devices.tests_utils.utils import patch_dual_pvs - - -@pytest.fixture(scope="function") -def mock_det(): - name = "eiger" - prefix = "X12SA-ES-EIGER9M:" - dm = DMMock() - with mock.patch.object(dm, "connector"): - with ( - mock.patch("ophyd_devices.interfaces.base_classes.bec_device_base.FileWriter"), - mock.patch( - "ophyd_devices.interfaces.base_classes.psi_detector_base.PSIDetectorBase._update_service_config" - ), - ): - with mock.patch.object(ophyd, "cl") as mock_cl: - mock_cl.get_pv = MockPV - mock_cl.thread_class = threading.Thread - with mock.patch.object(Eiger9McSAXS, "_init"): - det = Eiger9McSAXS(name=name, prefix=prefix, device_manager=dm) - patch_dual_pvs(det) - det.TIMEOUT_FOR_SIGNALS = 0.1 - yield det - - -def test_init(): - """Test the _init function:""" - name = "eiger" - prefix = "X12SA-ES-EIGER9M:" - dm = DMMock() - with mock.patch.object(dm, "connector"): - with ( - mock.patch("ophyd_devices.interfaces.base_classes.bec_device_base.FileWriter"), - mock.patch( - "ophyd_devices.interfaces.base_classes.psi_detector_base.PSIDetectorBase._update_service_config" - ), - ): - with mock.patch.object(ophyd, "cl") as mock_cl: - mock_cl.get_pv = MockPV - with ( - mock.patch( - "csaxs_bec.devices.epics.eiger9m_csaxs.Eiger9MSetup.initialize_default_parameter" - ) as mock_default, - mock.patch( - "csaxs_bec.devices.epics.eiger9m_csaxs.Eiger9MSetup.initialize_detector" - ) as mock_init_det, - mock.patch( - "csaxs_bec.devices.epics.eiger9m_csaxs.Eiger9MSetup.initialize_detector_backend" - ) as mock_init_backend, - ): - Eiger9McSAXS(name=name, prefix=prefix, device_manager=dm) - mock_default.assert_called_once() - mock_init_det.assert_called_once() - mock_init_backend.assert_called_once() - - -@pytest.mark.parametrize( - "trigger_source, detector_state, expected_exception", [(2, 1, True), (2, 0, False)] -) -def test_initialize_detector(mock_det, trigger_source, detector_state, expected_exception): - """Test the _init function: - - This includes testing the functions: - - _init_detector - - _stop_det - - _set_trigger - --> Testing the filewriter is done in test_init_filewriter - - Validation upon setting the correct PVs - - """ - mock_det.cam.detector_state._read_pv.mock_data = detector_state - if expected_exception: - with pytest.raises(Exception): - mock_det.timeout = 0.1 - mock_det.custom_prepare.initialize_detector() - else: - mock_det.custom_prepare.initialize_detector() # call the method you want to test - assert mock_det.cam.acquire.get() == 0 - assert mock_det.cam.detector_state.get() == detector_state - assert mock_det.cam.trigger_mode.get() == trigger_source - - -def test_trigger(mock_det): - """Test the trigger function: - Validate that trigger calls the custom_prepare.on_trigger() function - """ - with mock.patch.object(mock_det.custom_prepare, "on_trigger") as mock_on_trigger: - mock_det.trigger() - mock_on_trigger.assert_called_once() - - -@pytest.mark.parametrize( - "readout_time, expected_value", [(1e-3, 3e-3), (3e-3, 3e-3), (5e-3, 5e-3), (None, 3e-3)] -) -def test_update_readout_time(mock_det, readout_time, expected_value): - if readout_time is None: - mock_det.custom_prepare.update_readout_time() - assert mock_det.readout_time == expected_value - else: - mock_det.scaninfo.readout_time = readout_time - mock_det.custom_prepare.update_readout_time() - assert mock_det.readout_time == expected_value - - -@pytest.mark.parametrize( - "eacc, exp_url, daq_status, daq_cfg, expected_exception", - [ - ("e12345", "http://xbl-daq-29:5000", {"state": "READY"}, {"writer_user_id": 12543}, False), - ("e12345", "http://xbl-daq-29:5000", {"state": "READY"}, {"writer_user_id": 15421}, False), - ("e12345", "http://xbl-daq-29:5000", {"state": "BUSY"}, {"writer_user_id": 15421}, True), - ("e12345", "http://xbl-daq-29:5000", {"state": "READY"}, {"writer_ud": 12345}, True), - ], -) -def test_initialize_detector_backend( - mock_det, eacc, exp_url, daq_status, daq_cfg, expected_exception -): - """Test self.custom_prepare.initialize_detector_backend (std daq in this case) - - This includes testing the functions: - - - _update_service_config - - Validation upon checking set values in mocked std_daq instance - """ - with mock.patch("csaxs_bec.devices.epics.eiger9m_csaxs.StdDaqClient") as mock_std_daq: - instance = mock_std_daq.return_value - instance.stop_writer.return_value = None - instance.get_status.return_value = daq_status - instance.get_config.return_value = daq_cfg - mock_det.scaninfo.username = eacc - # scaninfo.username.return_value = eacc - if expected_exception: - with pytest.raises(Exception): - mock_det.timeout = 0.1 - mock_det.custom_prepare.initialize_detector_backend() - else: - mock_det.custom_prepare.initialize_detector_backend() - - instance.stop_writer.assert_called_once() - instance.get_status.assert_called() - instance.set_config.assert_called_once_with(daq_cfg) - - -@pytest.mark.parametrize( - "scaninfo, daq_status, daq_cfg, detector_state, stopped, expected_exception", - [ - ( - { - "eacc": "e12345", - "num_points": 500, - "frames_per_trigger": 1, - "filepath": "test.h5", - "scan_id": "123", - "mokev": 12.4, - }, - {"state": "READY"}, - {"writer_user_id": 12543}, - 5, - False, - False, - ), - ( - { - "eacc": "e12345", - "num_points": 500, - "frames_per_trigger": 1, - "filepath": "test.h5", - "scan_id": "123", - "mokev": 12.4, - }, - {"state": "BUSY"}, - {"writer_user_id": 15421}, - 5, - False, - False, - ), - ( - { - "eacc": "e12345", - "num_points": 500, - "frames_per_trigger": 1, - "filepath": "test.h5", - "scan_id": "123", - "mokev": 18.4, - }, - {"state": "READY"}, - {"writer_user_id": 12345}, - 4, - False, - True, - ), - ], -) -def test_stage( - mock_det, scaninfo, daq_status, daq_cfg, detector_state, stopped, expected_exception -): - with ( - mock.patch.object(mock_det.custom_prepare, "std_client") as mock_std_daq, - mock.patch.object( - mock_det.custom_prepare, "publish_file_location" - ) as mock_publish_file_location, - ): - mock_std_daq.stop_writer.return_value = None - mock_std_daq.get_status.return_value = daq_status - mock_std_daq.get_config.return_value = daq_cfg - mock_det.scaninfo.num_points = scaninfo["num_points"] - mock_det.scaninfo.frames_per_trigger = scaninfo["frames_per_trigger"] - mock_det.filewriter.compile_full_filename.return_value = scaninfo["filepath"] - # TODO consider putting energy as variable in scaninfo - mock_det.device_manager.add_device("mokev", value=12.4) - mock_det.cam.beam_energy.put(scaninfo["mokev"]) - mock_det.stopped = stopped - mock_det.cam.detector_state._read_pv.mock_data = detector_state - with mock.patch.object(mock_det.custom_prepare, "prepare_data_backend") as mock_prep_fw: - mock_det.filepath.set(scaninfo["filepath"]).wait() - if expected_exception: - with pytest.raises(Exception): - mock_det.timeout = 0.1 - mock_det.stage() - else: - mock_det.stage() - mock_prep_fw.assert_called_once() - # Check _prep_det - assert mock_det.cam.num_images.get() == int( - scaninfo["num_points"] * scaninfo["frames_per_trigger"] - ) - assert mock_det.cam.num_frames.get() == 1 - - mock_publish_file_location.assert_called_with(done=False, successful=False) - assert mock_det.cam.acquire.get() == 1 - - -@pytest.mark.parametrize( - "scaninfo, daq_status, expected_exception", - [ - ( - { - "eacc": "e12345", - "num_points": 500, - "frames_per_trigger": 1, - "filepath": "test.h5", - "scan_id": "123", - }, - {"state": "BUSY", "acquisition": {"state": "WAITING_IMAGES"}}, - False, - ), - ( - { - "eacc": "e12345", - "num_points": 500, - "frames_per_trigger": 1, - "filepath": "test.h5", - "scan_id": "123", - }, - {"state": "BUSY", "acquisition": {"state": "WAITING_IMAGES"}}, - False, - ), - ( - { - "eacc": "e12345", - "num_points": 500, - "frames_per_trigger": 1, - "filepath": "test.h5", - "scan_id": "123", - }, - {"state": "BUSY", "acquisition": {"state": "ERROR"}}, - True, - ), - ], -) -def test_prepare_detector_backend(mock_det, scaninfo, daq_status, expected_exception): - with ( - mock.patch.object(mock_det.custom_prepare, "std_client") as mock_std_daq, - mock.patch.object(mock_det.custom_prepare, "filepath_exists") as mock_file_path_exists, - mock.patch.object(mock_det.custom_prepare, "stop_detector_backend") as mock_stop_backend, - mock.patch.object(mock_det, "scaninfo"), - ): - mock_std_daq.start_writer_async.return_value = None - mock_std_daq.get_status.return_value = daq_status - mock_det.filewriter.compile_full_filename.return_value = scaninfo["filepath"] - mock_det.scaninfo.num_points = scaninfo["num_points"] - mock_det.scaninfo.frames_per_trigger = scaninfo["frames_per_trigger"] - - if expected_exception: - with pytest.raises(Exception): - mock_det.timeout = 0.1 - mock_det.custom_prepare.prepare_data_backend() - mock_file_path_exists.assert_called_once() - assert mock_stop_backend.call_count == 2 - - else: - mock_det.custom_prepare.prepare_data_backend() - mock_file_path_exists.assert_called_once() - mock_stop_backend.assert_called_once() - - daq_writer_call = { - "output_file": scaninfo["filepath"], - "n_images": int(scaninfo["num_points"] * scaninfo["frames_per_trigger"]), - } - mock_std_daq.start_writer_async.assert_called_with(daq_writer_call) - - -@pytest.mark.parametrize("stopped, expected_exception", [(False, False), (True, True)]) -def test_complete(mock_det, stopped, expected_exception): - with ( - mock.patch.object(mock_det.custom_prepare, "finished") as mock_finished, - mock.patch.object( - mock_det.custom_prepare, "publish_file_location" - ) as mock_publish_file_location, - ): - mock_det.stopped = stopped - if expected_exception: - mock_det.complete() - assert mock_det.stopped is True - else: - mock_det.complete() - mock_finished.assert_called_once() - mock_publish_file_location.assert_called_with(done=True, successful=True) - assert mock_det.stopped is False - - -def test_stop_detector_backend(mock_det): - with mock.patch.object(mock_det.custom_prepare, "std_client") as mock_std_daq: - mock_std_daq.stop_writer.return_value = None - mock_det.std_client = mock_std_daq - mock_det.custom_prepare.stop_detector_backend() - mock_std_daq.stop_writer.assert_called_once() - - -@pytest.mark.parametrize( - "scaninfo", - [ - ({"filepath": "test.h5", "successful": True, "done": False, "scan_id": "123"}), - ({"filepath": "test.h5", "successful": False, "done": True, "scan_id": "123"}), - ], -) -def test_publish_file_location(mock_det, scaninfo): - mock_det.scaninfo.scan_id = scaninfo["scan_id"] - mock_det.filepath.set(scaninfo["filepath"]).wait() - mock_det.custom_prepare.publish_file_location( - done=scaninfo["done"], successful=scaninfo["successful"] - ) - if scaninfo["successful"] is None: - msg = messages.FileMessage(file_path=scaninfo["filepath"], done=scaninfo["done"]) - else: - msg = messages.FileMessage( - file_path=scaninfo["filepath"], done=scaninfo["done"], successful=scaninfo["successful"] - ) - expected_calls = [ - mock.call( - MessageEndpoints.public_file(scaninfo["scan_id"], mock_det.name), - msg, - pipe=mock_det.connector.pipeline.return_value, - ), - mock.call( - MessageEndpoints.file_event(mock_det.name), - msg, - pipe=mock_det.connector.pipeline.return_value, - ), - ] - assert mock_det.connector.set_and_publish.call_args_list == expected_calls - - -def test_stop(mock_det): - with ( - mock.patch.object(mock_det.custom_prepare, "stop_detector") as mock_stop_det, - mock.patch.object( - mock_det.custom_prepare, "stop_detector_backend" - ) as mock_stop_detector_backend, - ): - mock_det.stop() - mock_stop_det.assert_called_once() - mock_stop_detector_backend.assert_called_once() - assert mock_det.stopped is True - - -@pytest.mark.parametrize( - "stopped, scaninfo, cam_state, daq_status, expected_exception", - [ - ( - False, - {"num_points": 500, "frames_per_trigger": 4}, - 0, - {"acquisition": {"state": "FINISHED", "stats": {"n_write_completed": 2000}}}, - False, - ), - ( - False, - {"num_points": 500, "frames_per_trigger": 4}, - 0, - {"acquisition": {"state": "FINISHED", "stats": {"n_write_completed": 1999}}}, - True, - ), - ( - False, - {"num_points": 500, "frames_per_trigger": 1}, - 1, - {"acquisition": {"state": "READY", "stats": {"n_write_completed": 500}}}, - True, - ), - ( - False, - {"num_points": 500, "frames_per_trigger": 1}, - 0, - {"acquisition": {"state": "FINISHED", "stats": {"n_write_completed": 500}}}, - False, - ), - ], -) -def test_finished(mock_det, stopped, cam_state, daq_status, scaninfo, expected_exception): - with ( - mock.patch.object(mock_det.custom_prepare, "std_client") as mock_std_daq, - mock.patch.object(mock_det.custom_prepare, "stop_detector_backend") as mock_stop_backend, - mock.patch.object(mock_det.custom_prepare, "stop_detector") as mock_stop_det, - ): - mock_std_daq.get_status.return_value = daq_status - mock_det.cam.acquire._read_pv.mock_state = cam_state - mock_det.scaninfo.num_points = scaninfo["num_points"] - mock_det.scaninfo.frames_per_trigger = scaninfo["frames_per_trigger"] - if expected_exception: - with pytest.raises(Exception): - mock_det.timeout = 0.1 - mock_det.custom_prepare.finished() - assert mock_det.stopped is stopped - else: - mock_det.custom_prepare.finished() - if stopped: - assert mock_det.stopped is stopped - - mock_stop_backend.assert_called() - mock_stop_det.assert_called_once()