From a1da3a5f40d432560d68c59fad05581217a54b9c Mon Sep 17 00:00:00 2001 From: appel_c Date: Wed, 4 Dec 2024 15:48:18 +0100 Subject: [PATCH] test: update tests --- ophyd_devices/sim/sim_camera.py | 23 ++++++++++++------ tests/test_base_classes.py | 41 ++++++++++++++++++--------------- tests/test_simulation.py | 4 +--- 3 files changed, 40 insertions(+), 28 deletions(-) diff --git a/ophyd_devices/sim/sim_camera.py b/ophyd_devices/sim/sim_camera.py index 31e6068..361ce20 100644 --- a/ophyd_devices/sim/sim_camera.py +++ b/ophyd_devices/sim/sim_camera.py @@ -25,6 +25,7 @@ class SimCameraSetup(CustomDetectorMixin): super().__init__(*args, **kwargs) self._thread_trigger = None self._thread_complete = None + self.file_path = None def on_trigger(self) -> None: """Trigger the camera to acquire images. @@ -69,9 +70,7 @@ class SimCameraSetup(CustomDetectorMixin): FYI: No data is written to disk in the simulation, but upon each trigger it is published to the device_monitor endpoint in REDIS. """ - self.parent.filepath.set( - self.parent.filewriter.compile_full_filename(f"{self.parent.name}") - ).wait() + self.file_path = self.parent.filewriter.compile_full_filename(f"{self.parent.name}") self.parent.frames.set( self.parent.scaninfo.num_points * self.parent.scaninfo.frames_per_trigger @@ -79,10 +78,14 @@ class SimCameraSetup(CustomDetectorMixin): self.parent.exp_time.set(self.parent.scaninfo.exp_time) self.parent.burst.set(self.parent.scaninfo.frames_per_trigger) if self.parent.write_to_disk.get(): - self.parent.h5_writer.on_stage( - file_path=self.parent.filepath.get(), h5_entry="/entry/data/data" + self.parent.h5_writer.on_stage(file_path=self.file_path, h5_entry="/entry/data/data") + self.parent._run_subs( + sub_type=self.parent.SUB_FILE_EVENT, + file_path=self.file_path, + done=False, + successful=False, + hinted_location={"data": "/entry/data/data"}, ) - self.publish_file_location(done=False, successful=False) self.parent.stopped = False def on_complete(self) -> None: @@ -93,7 +96,13 @@ class SimCameraSetup(CustomDetectorMixin): try: if self.parent.write_to_disk.get(): self.parent.h5_writer.on_complete() - self.publish_file_location(done=True, successful=True) + self.parent._run_subs( + sub_type=self.parent.SUB_FILE_EVENT, + file_path=self.file_path, + done=True, + successful=True, + hinted_location={"data": "/entry/data/data"}, + ) if self.parent.stopped: raise DeviceStopError(f"{self.parent.name} was stopped") status.set_finished() diff --git a/tests/test_base_classes.py b/tests/test_base_classes.py index 7e3f9fd..f5172bf 100644 --- a/tests/test_base_classes.py +++ b/tests/test_base_classes.py @@ -6,10 +6,8 @@ import pytest from ophyd import DeviceStatus, Staged from ophyd.utils.errors import RedundantStaging -from ophyd_devices.interfaces.base_classes.psi_detector_base import ( - CustomDetectorMixin, - PSIDetectorBase, -) +from ophyd_devices.interfaces.base_classes.psi_detector_base import PSIDetectorBase +from ophyd_devices.interfaces.base_classes.psi_device_base import CustomPrepare from ophyd_devices.utils.bec_scaninfo_mixin import BecScaninfoMixin from ophyd_devices.utils.errors import DeviceStopError, DeviceTimeoutError @@ -24,7 +22,7 @@ def test_detector_base_init(detector_base): assert detector_base.name == "test_detector" assert "base_path" in detector_base.filewriter.service_config assert isinstance(detector_base.scaninfo, BecScaninfoMixin) - assert issubclass(detector_base.custom_prepare_cls, CustomDetectorMixin) + assert issubclass(detector_base.custom_prepare_cls, CustomPrepare) def test_stage(detector_base): @@ -113,9 +111,14 @@ def test_check_scan_id(detector_base): def test_wait_for_signal(detector_base): - expected_value = "test" + my_value = False + + def my_callback(): + return my_value + + detector_base status = detector_base.custom_prepare.wait_with_status( - [(detector_base.filepath.get, expected_value)], + [(my_callback, True)], check_stopped=True, timeout=5, interval=0.01, @@ -131,14 +134,14 @@ def test_wait_for_signal(detector_base): assert status.exception().args == DeviceStopError(f"{detector_base.name} was stopped").args detector_base.stopped = False status = detector_base.custom_prepare.wait_with_status( - [(detector_base.filepath.get, expected_value)], + [(my_callback, True)], check_stopped=True, timeout=5, interval=0.01, exception_on_timeout=None, ) # Check that thread resolves when expected value is set - detector_base.filepath.set(expected_value) + my_value = True # some delay to allow the stop to take effect time.sleep(0.15) assert status.done is True @@ -147,12 +150,10 @@ def test_wait_for_signal(detector_base): detector_base.stopped = False # Check that wait for status runs into timeout with expectd exception - st = detector_base.filepath.set("wrong_value") - st.wait() - assert detector_base.filepath.get() == "wrong_value" + my_value = "random_value" exception = TimeoutError("Timeout") status = detector_base.custom_prepare.wait_with_status( - [(detector_base.filepath.get, expected_value)], + [(my_callback, True)], check_stopped=True, timeout=0.01, interval=0.01, @@ -165,12 +166,16 @@ def test_wait_for_signal(detector_base): def test_wait_for_signal_returns_exception(detector_base): - expected_value = "test" + my_value = False + + def my_callback(): + return my_value + # Check that wait for status runs into timeout with expectd exception - detector_base.filepath.set("wrong_value") + exception = TimeoutError("Timeout") status = detector_base.custom_prepare.wait_with_status( - [(detector_base.filepath.get, expected_value)], + [(my_callback, True)], check_stopped=True, timeout=0.01, interval=0.01, @@ -184,7 +189,7 @@ def test_wait_for_signal_returns_exception(detector_base): detector_base.stopped = False # Check that standard exception is thrown status = detector_base.custom_prepare.wait_with_status( - [(detector_base.filepath.get, expected_value)], + [(my_callback, True)], check_stopped=True, timeout=0.01, interval=0.01, @@ -195,7 +200,7 @@ def test_wait_for_signal_returns_exception(detector_base): assert ( status.exception().args == DeviceTimeoutError( - f"Timeout error for {detector_base.name} while waiting for signals {[(detector_base.filepath.get, expected_value)]}" + f"Timeout error for {detector_base.name} while waiting for signals {[(my_callback, True)]}" ).args ) assert status.success is False diff --git a/tests/test_simulation.py b/tests/test_simulation.py index a4b443f..f06769d 100644 --- a/tests/test_simulation.py +++ b/tests/test_simulation.py @@ -449,9 +449,7 @@ def test_cam_stage_h5writer(camera): """Test the H5Writer class""" with ( mock.patch.object(camera, "h5_writer") as mock_h5_writer, - mock.patch.object( - camera.custom_prepare, "publish_file_location" - ) as mock_publish_file_location, + mock.patch.object(camera, "_run_subs") as mock_run_subs, ): camera.scaninfo.num_points = 10 camera.scaninfo.frames_per_trigger = 1