From a639e057e48c665010681e6fa26a15ecfe8b4ece Mon Sep 17 00:00:00 2001 From: appel_c Date: Tue, 6 Jan 2026 17:57:31 +0100 Subject: [PATCH] test(falcon): fix test after falcon refactoring --- csaxs_bec/devices/epics/falcon_csaxs.py | 19 +- tests/tests_devices/test_falcon_csaxs.py | 435 ++++++++++------------- 2 files changed, 196 insertions(+), 258 deletions(-) diff --git a/csaxs_bec/devices/epics/falcon_csaxs.py b/csaxs_bec/devices/epics/falcon_csaxs.py index 3c7bec0..da12e6b 100644 --- a/csaxs_bec/devices/epics/falcon_csaxs.py +++ b/csaxs_bec/devices/epics/falcon_csaxs.py @@ -80,6 +80,7 @@ class FalconcSAXS(PSIDeviceBase, FalconControl): """ Setup Falcon Sitoro detector default parameters once signals are connected """ + self.on_stop() self._initialize_detector() self._initialize_detector_backend() self.set_trigger( @@ -109,11 +110,6 @@ class FalconcSAXS(PSIDeviceBase, FalconControl): def _initialize_detector(self) -> None: """Initialize Falcon detector""" - self.stop_detector() - self.stop_detector_backend() - self.set_trigger( - mapping_mode=MappingSource.MAPPING, trigger_source=TriggerSource.GATE, ignore_gate=0 - ) # 1 Realtime self.preset_mode.put(1) @@ -141,6 +137,8 @@ class FalconcSAXS(PSIDeviceBase, FalconControl): # Size of the queue for the number of spectra allowed in the buffer. If too small, data is lost at high throughput self.hdf5.queue_size.put(self._queue_size) + self.hdf5.file_template.put("%s%s") + self.hdf5.file_write_mode.put(2) # Set nd_array mode to 1: This means segmentation into Spectra within EPICS, 1 is activate, 0 is deactivate self.nd_array_mode.put(1) @@ -154,7 +152,7 @@ class FalconcSAXS(PSIDeviceBase, FalconControl): num_points = self.scan_info.msg.num_points frames_per_trigger = self.scan_info.msg.scan_parameters.get("frames_per_trigger", 1) overall_frames = int(num_points * frames_per_trigger) - exp_time = self.scan_info.exp_time + exp_time = self.scan_info.msg.scan_parameters["exp_time"] self._full_path = get_full_path(self.scan_info.msg, self.name) # Check that exposure time is larger than readout time @@ -170,16 +168,15 @@ class FalconcSAXS(PSIDeviceBase, FalconControl): # TODO: Add h5_entries for linking the Falcon NEXUS entries with the master file self.file_event.put(file_path=self._full_path, done=False, successful=False) - self.preset_real.put(exp_time) + self.preset_real_time.put(exp_time) self.pixels_per_run.put(overall_frames) # Prepare detector backend PVs file_path, file_name = os.path.split(self._full_path) self.hdf5.file_path.put(file_path) self.hdf5.file_name.put(file_name) - self.hdf5.file_template.put("%s%s") + self.hdf5.num_capture.put(overall_frames) - self.hdf5.file_write_mode.put(2) # Reset spectrum counter in filewriter, used for indexing & identifying missing triggers self.hdf5.array_counter.put(0) @@ -193,10 +190,10 @@ class FalconcSAXS(PSIDeviceBase, FalconControl): Method for actions just before the scan starts. """ status_camera = CompareStatus( - self.cam.acquire_busy, ACQUIRESTATUS.DONE, timeout=self._pv_timeout + self.acquire_busy, ACQUIRESTATUS.ACQUIRING, timeout=self._pv_timeout ) status_writer = CompareStatus( - self.hdf.capture, ACQUIRESTATUS.ACQUIRING, timeout=self._pv_timeout + self.hdf5.capture, ACQUIRESTATUS.ACQUIRING, timeout=self._pv_timeout ) # Logical combine of statuses status = status_camera & status_writer diff --git a/tests/tests_devices/test_falcon_csaxs.py b/tests/tests_devices/test_falcon_csaxs.py index 18a7b8a..058e7ae 100644 --- a/tests/tests_devices/test_falcon_csaxs.py +++ b/tests/tests_devices/test_falcon_csaxs.py @@ -8,10 +8,17 @@ import ophyd import pytest from bec_lib import messages from bec_lib.endpoints import MessageEndpoints +from bec_lib.file_utils import get_full_path from bec_server.device_server.tests.utils import DMMock +from ophyd_devices.interfaces.base_classes.psi_device_base import DeviceStoppedError from ophyd_devices.tests.utils import patched_device -from csaxs_bec.devices.epics.falcon_csaxs import FalconcSAXS +from csaxs_bec.devices.epics.falcon_csaxs import ( + ACQUIRESTATUS, + FalconcSAXS, + MappingSource, + TriggerSource, +) @pytest.fixture(scope="function") @@ -28,264 +35,198 @@ def mock_det() -> Generator[FalconcSAXS, None, None]: _mock_pv_initial_value=0, ) as dev: try: + for dotted_name, device in dev.walk_subdevices(include_lazy=True): + device.stage_sigs = {} # Remove stage signals + device.trigger_sigs = {} # Remove trigger signals + if hasattr(device, "plugin_type"): + device.plugin_type._read_pv.mock_data = device._plugin_type yield dev finally: dev.destroy() -# @pytest.mark.parametrize( -# "trigger_source, mapping_source, ignore_gate, pixels_per_buffer, detector_state," -# " expected_exception", -# [(1, 1, 0, 20, 0, False), (1, 1, 0, 20, 1, True)], -# ) -# # TODO rewrite this one, write test for init_detector, init_filewriter is tested -# def test_init_detector( -# mock_det, -# trigger_source, -# mapping_source, -# ignore_gate, -# pixels_per_buffer, -# detector_state, -# expected_exception, -# ): -# """Test the _init function: - -# This includes testing the functions: -# - _init_detector -# - _stop_det -# - _set_trigger -# --> Testing the filewriter is done in test_init_filewriter - -# Validation upon setting the correct PVs - -# """ -# mock_det.value_pixel_per_buffer = pixels_per_buffer -# mock_det.state._read_pv.mock_data = detector_state -# if expected_exception: -# with pytest.raises(FalconTimeoutError): -# mock_det.timeout = 0.1 -# mock_det.custom_prepare.initialize_detector() -# else: -# mock_det.custom_prepare.initialize_detector() -# assert mock_det.state.get() == detector_state -# assert mock_det.collect_mode.get() == mapping_source -# assert mock_det.pixel_advance_mode.get() == trigger_source -# assert mock_det.ignore_gate.get() == ignore_gate - -# assert mock_det.preset_mode.get() == 1 -# assert mock_det.erase_all.get() == 1 -# assert mock_det.input_logic_polarity.get() == 0 -# assert mock_det.auto_pixels_per_buffer.get() == 0 -# assert mock_det.pixels_per_buffer.get() == pixels_per_buffer +def test_falcon_init(mock_det: FalconcSAXS): + """Test the initialization of the FalconcSAXS device.""" + assert mock_det._readout_time == mock_det.MIN_READOUT + assert mock_det._value_pixel_per_buffer == 20 + assert mock_det._queue_size == 2000 + assert mock_det._full_path == "" -@pytest.mark.parametrize( - "readout_time, expected_value", [(1e-3, 3e-3), (3e-3, 3e-3), (5e-3, 5e-3), (None, 3e-3)] -) -def test_update_readout_time(mock_det, readout_time, expected_value): - if readout_time is None: - mock_det._update_readout_time() - assert mock_det._readout_time == expected_value - else: - mock_det.scan_info.readout_time = readout_time - mock_det._update_readout_time() - assert mock_det._readout_time == expected_value +def test_falcon_on_connected(mock_det: FalconcSAXS): + """Test the on_connected method of the FalconcSAXS device.""" + falcon = mock_det + + # Set known default values + falcon.preset_mode.put(-1) + falcon.input_logic_polarity.put(-1) + falcon.auto_pixels_per_buffer.put(-1) + falcon.hdf5.enable.put(-1) + + with ( + mock.patch.object(falcon, "on_stop") as mock_on_stop, + mock.patch.object(falcon, "set_trigger") as mock_set_trigger, + ): + + falcon.on_connected() + mock_on_stop.assert_called_once() + mock_set_trigger.assert_called_once_with( + mapping_mode=MappingSource.MAPPING, trigger_source=TriggerSource.GATE, ignore_gate=0 + ) + + # Detector default PV values + assert falcon.preset_mode.get() == "1" # Real Time + assert falcon.input_logic_polarity.get() == 0 + assert falcon.auto_pixels_per_buffer.get() == 0 + assert falcon.pixels_per_buffer.get() == falcon._value_pixel_per_buffer + + # Backend default PV values + assert falcon.hdf5.enable.get() == "1" # Enabled + assert falcon.hdf5.xml_file_name.get() == "layout.xml" + assert falcon.hdf5.lazy_open.get() == "1" # Enabled + assert falcon.hdf5.temp_suffix.get() == "" + assert falcon.hdf5.queue_size.get() == falcon._queue_size + assert falcon.nd_array_mode.get() == 1 + assert falcon.hdf5.file_template.get() == "%s%s" + assert falcon.hdf5.file_write_mode.get() == 2 -def test_initialize_default_parameter(mock_det): - with mock.patch.object( - mock_det.custom_prepare, "update_readout_time" - ) as mock_update_readout_time: - mock_det.custom_prepare.initialize_default_parameter() - assert mock_det.value_pixel_per_buffer == 20 - mock_update_readout_time.assert_called_once() - - -@pytest.mark.parametrize( - "scaninfo", - [ - { - "eacc": "e12345", - "num_points": 500, - "frames_per_trigger": 1, - "exp_time": 0.1, - "filepath": "test.h5", - "scan_id": "123", - "mokev": 12.4, - } - ], -) -def test_stage(mock_det, scaninfo): - """Test the stage function: - - This includes testing _prep_det +def test_falcon_on_stage(mock_det: FalconcSAXS): """ - with ( - mock.patch.object(mock_det.custom_prepare, "set_trigger") as mock_set_trigger, - mock.patch.object( - mock_det.custom_prepare, "prepare_data_backend" - ) as mock_prep_data_backend, - mock.patch.object( - mock_det.custom_prepare, "publish_file_location" - ) as mock_publish_file_location, - mock.patch.object(mock_det.custom_prepare, "arm_acquisition") as mock_arm_acquisition, - ): - mock_det.scaninfo.exp_time = scaninfo["exp_time"] - mock_det.scaninfo.num_points = scaninfo["num_points"] - mock_det.scaninfo.frames_per_trigger = scaninfo["frames_per_trigger"] - mock_det.stage() - mock_set_trigger.assert_called_once() - assert mock_det.preset_real.get() == scaninfo["exp_time"] - assert mock_det.pixels_per_run.get() == int( - scaninfo["num_points"] * scaninfo["frames_per_trigger"] - ) - mock_prep_data_backend.assert_called_once() - mock_publish_file_location.assert_called_once_with(done=False, successful=False) - mock_arm_acquisition.assert_called_once() + + Test the on_stage method of the FalconcSAXS device. + All relevant information is available in the scan_info attribute and used + to bootstrap the detector for the upcoming acquisition. Two scenarios are tested: + I. Normal case with exposure time larger than readout time + II. Case where exposure time is smaller than readout time, which should raise an exception. + """ + falcon = mock_det + num_points = 10 + exp_time = 0.2 + frames_per_trigger = 5 + falcon.scan_info.msg.num_points = num_points + falcon.scan_info.msg.scan_parameters["frames_per_trigger"] = frames_per_trigger + falcon.scan_info.msg.scan_parameters["exp_time"] = exp_time + falcon.hdf5.array_counter.put(5) # Set to non-zero to check reset + + # I. Normal case + old_file_event = falcon.file_event.get() + with mock.patch.object(falcon.hdf5.array_size, "get", return_value=(1, 1, 1)): + falcon.stage() + + assert falcon.staged is ophyd.Staged.yes + assert falcon._full_path == get_full_path(falcon.scan_info.msg, falcon.name) + file_path = falcon.hdf5.file_path.get() + file_name = falcon.hdf5.file_name.get() + assert os.path.join(file_path, file_name) == falcon._full_path + + assert falcon.preset_real_time.get() == exp_time + assert falcon.pixels_per_run.get() == num_points * frames_per_trigger + assert falcon.hdf5.num_capture.get() == num_points * frames_per_trigger + assert falcon.hdf5.array_counter.get() == 0 + assert falcon.hdf5.capture.get() == 1 + assert falcon.start_all.get() == 1 + + # II. Unstage device first + falcon.unstage() + exp_time = 1e-3 # Smaller than readout time + falcon.scan_info.msg.scan_parameters["exp_time"] = exp_time + with pytest.raises(ValueError): + falcon.stage() + assert falcon.staged is not ophyd.Staged.no -@pytest.mark.parametrize( - "scaninfo", - [ - ( - { - "filepath": "/das/work/p18/p18533/data/S00000-S00999/S00001/data.h5", - "num_points": 500, - "frames_per_trigger": 1, - } - ), - ( - { - "filepath": "/das/work/p18/p18533/data/S00000-S00999/S00001/data1234.h5", - "num_points": 500, - "frames_per_trigger": 1, - } - ), - ], -) -def test_prepare_data_backend(mock_det, scaninfo): - mock_det.filewriter.compile_full_filename.return_value = scaninfo["filepath"] - mock_det.scaninfo.num_points = scaninfo["num_points"] - mock_det.scaninfo.frames_per_trigger = scaninfo["frames_per_trigger"] - mock_det.scaninfo.scan_number = 1 - mock_det.custom_prepare.prepare_data_backend() - file_path, file_name = os.path.split(scaninfo["filepath"]) - assert mock_det.hdf5.file_path.get() == file_path - assert mock_det.hdf5.file_name.get() == file_name - assert mock_det.hdf5.file_template.get() == "%s%s" - assert mock_det.hdf5.num_capture.get() == int( - scaninfo["num_points"] * scaninfo["frames_per_trigger"] +def test_falcon_on_pre_scan(mock_det: FalconcSAXS): + """Test the on_pre_scan method of the FalconcSAXS device.""" + falcon = mock_det + # I. Test normal case with success + falcon.acquire_busy._read_pv.mock_data = ACQUIRESTATUS.DONE + falcon.hdf5.capture._read_pv.mock_data = ACQUIRESTATUS.DONE + falcon = mock_det + st = falcon.on_pre_scan() + assert st.done is False + assert st.success is False + falcon.acquire_busy._read_pv.mock_data = ACQUIRESTATUS.ACQUIRING + assert st.done is False + assert st.success is False + falcon.hdf5.capture._read_pv.mock_data = ACQUIRESTATUS.ACQUIRING + st.wait(3) + assert st.done is True + assert st.success is True + + # II. Test abort case with stop called + falcon.acquire_busy._read_pv.mock_data = ACQUIRESTATUS.DONE + falcon.hdf5.capture._read_pv.mock_data = ACQUIRESTATUS.DONE + st = falcon.on_pre_scan() + assert st.done is False + assert st.success is False + falcon.stop() + with pytest.raises(DeviceStoppedError): + st.wait(3) + assert st.done is True + assert st.success is False + + +def test_falcon_stop(mock_det: FalconcSAXS): + """Test the stop method of the FalconcSAXS device.""" + falcon = mock_det + + falcon.stop_all.put(0) + falcon.hdf5.capture.put(1) + falcon.erase_all.put(0) + falcon.stop() + assert falcon.stop_all.get() == 1 + assert falcon.hdf5.capture.get() == 0 + assert falcon.erase_all.get() == 1 + + +def test_falcon_complete(mock_det: FalconcSAXS): + """Test the complete method of the FalconcSAXS device.""" + falcon = mock_det + + num_points = 10 + frames_per_trigger = 5 + falcon.scan_info.msg.num_points = num_points + falcon.scan_info.msg.scan_parameters["frames_per_trigger"] = frames_per_trigger + + # I. Test normal case with success + falcon.dxp.current_pixel._read_pv.mock_data = num_points * frames_per_trigger - 1 + falcon.hdf5.array_counter._read_pv.mock_data = num_points * frames_per_trigger - 1 + falcon._full_path = "/tmp/fake_path/test.h5" + st = falcon.on_complete() + assert st.done is False + assert st.success is False + falcon.dxp.current_pixel._read_pv.mock_data = num_points * frames_per_trigger + assert st.done is False + assert st.success is False + falcon.hdf5.array_counter._read_pv.mock_data = num_points * frames_per_trigger + st.wait(3) + assert st.done is True + assert st.success is True + assert falcon.file_event.get() == messages.FileMessage( + file_path="/tmp/fake_path/test.h5", + done=True, + successful=True, + device_name=falcon.name, + file_type="h5", + hinted_h5_entries=None, + metadata={}, ) - assert mock_det.hdf5.file_write_mode.get() == 2 - assert mock_det.hdf5.array_counter.get() == 0 - assert mock_det.hdf5.capture.get() == 1 - -@pytest.mark.parametrize( - "scaninfo", - [ - ({"filepath": "test.h5", "successful": True, "done": False, "scan_id": "123"}), - ({"filepath": "test.h5", "successful": False, "done": True, "scan_id": "123"}), - ], -) -def test_publish_file_location(mock_det, scaninfo): - mock_det.scaninfo.scan_id = scaninfo["scan_id"] - mock_det.filepath.set(scaninfo["filepath"]).wait() - mock_det.custom_prepare.publish_file_location( - done=scaninfo["done"], successful=scaninfo["successful"] + # II. Test case where acquisition fails due to interruption + falcon.dxp.current_pixel._read_pv.mock_data = num_points * frames_per_trigger - 1 + st = falcon.on_complete() + assert st.done is False + assert st.success is False + falcon.stop() + with pytest.raises(DeviceStoppedError): + st.wait(3) + assert falcon.file_event.get() == messages.FileMessage( + file_path="/tmp/fake_path/test.h5", + done=True, + successful=False, + device_name=falcon.name, + file_type="h5", + hinted_h5_entries=None, + metadata={}, ) - if scaninfo["successful"] is None: - msg = messages.FileMessage(file_path=scaninfo["filepath"], done=scaninfo["done"]) - else: - msg = messages.FileMessage( - file_path=scaninfo["filepath"], done=scaninfo["done"], successful=scaninfo["successful"] - ) - expected_calls = [ - mock.call( - MessageEndpoints.public_file(scaninfo["scan_id"], mock_det.name), - msg, - pipe=mock_det.connector.pipeline.return_value, - ), - mock.call( - MessageEndpoints.file_event(mock_det.name), - msg, - pipe=mock_det.connector.pipeline.return_value, - ), - ] - assert mock_det.connector.set_and_publish.call_args_list == expected_calls - - -@pytest.mark.parametrize("detector_state, expected_exception", [(1, False), (0, True)]) -def test_arm_acquisition(mock_det, detector_state, expected_exception): - with mock.patch.object(mock_det, "stop") as mock_stop: - mock_det.state._read_pv.mock_data = detector_state - if expected_exception: - with pytest.raises(FalconTimeoutError): - mock_det.timeout = 0.1 - mock_det.custom_prepare.arm_acquisition() - mock_stop.assert_called_once() - else: - mock_det.custom_prepare.arm_acquisition() - assert mock_det.start_all.get() == 1 - - -def test_trigger(mock_det): - with mock.patch.object(mock_det.custom_prepare, "on_trigger") as mock_on_trigger: - mock_det.trigger() - mock_on_trigger.assert_called_once() - - -def test_complete(mock_det): - with ( - mock.patch.object(mock_det.custom_prepare, "finished") as mock_finished, - mock.patch.object( - mock_det.custom_prepare, "publish_file_location" - ) as mock_publish_file_location, - ): - mock_det.stopped = False - mock_det.complete() - assert mock_finished.call_count == 1 - call = mock.call(done=True, successful=True) - assert mock_publish_file_location.call_args == call - - -def test_stop(mock_det): - with ( - mock.patch.object(mock_det.custom_prepare, "stop_detector") as mock_stop_det, - mock.patch.object( - mock_det.custom_prepare, "stop_detector_backend" - ) as mock_stop_detector_backend, - ): - mock_det.stop() - mock_stop_det.assert_called_once() - mock_stop_detector_backend.assert_called_once() - assert mock_det.stopped is True - - -@pytest.mark.parametrize( - "stopped, scaninfo", - [ - (False, {"num_points": 500, "frames_per_trigger": 1}), - (True, {"num_points": 500, "frames_per_trigger": 1}), - ], -) -def test_finished(mock_det, stopped, scaninfo): - with ( - mock.patch.object(mock_det.custom_prepare, "stop_detector") as mock_stop_det, - mock.patch.object( - mock_det.custom_prepare, "stop_detector_backend" - ) as mock_stop_file_writer, - ): - mock_det.stopped = stopped - mock_det.dxp.current_pixel._read_pv.mock_data = int( - scaninfo["num_points"] * scaninfo["frames_per_trigger"] - ) - mock_det.hdf5.array_counter._read_pv.mock_data = int( - scaninfo["num_points"] * scaninfo["frames_per_trigger"] - ) - mock_det.scaninfo.frames_per_trigger = scaninfo["frames_per_trigger"] - mock_det.scaninfo.num_points = scaninfo["num_points"] - mock_det.custom_prepare.finished() - assert mock_det.stopped is stopped - mock_stop_det.assert_called_once() - mock_stop_file_writer.assert_called_once()