From e6f6ef094a383d2ae5d821172a031f7ab1126270 Mon Sep 17 00:00:00 2001 From: appel_c Date: Tue, 14 May 2024 16:37:49 +0200 Subject: [PATCH] feat: change grashopper and tests to match psi_detector_base refactoring --- tests/__init__.py | 0 tests/tests_devices/test_grashopper_tomcat.py | 19 +++---- tests/utils.py | 9 ++++ tomcat_bec/devices/grashopper_tomcat.py | 53 ++++++++++--------- 4 files changed, 44 insertions(+), 37 deletions(-) create mode 100644 tests/__init__.py create mode 100644 tests/utils.py diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/tests_devices/test_grashopper_tomcat.py b/tests/tests_devices/test_grashopper_tomcat.py index def052b..4b1745a 100644 --- a/tests/tests_devices/test_grashopper_tomcat.py +++ b/tests/tests_devices/test_grashopper_tomcat.py @@ -3,6 +3,7 @@ from unittest import mock import pytest +from tests.utils import patch_dual_pvs from tomcat_bec.devices.grashopper_tomcat import ( COLORMODE, AutoMode, @@ -19,16 +20,6 @@ from tomcat_bec.devices.grashopper_tomcat import ( ) -def patch_dual_pvs(device): - for walk in device.walk_signals(): - if not hasattr(walk.item, "_read_pv"): - continue - if not hasattr(walk.item, "_write_pv"): - continue - if walk.item._read_pv.pvname.endswith("_RBV"): - walk.item._read_pv = walk.item._write_pv - - @pytest.fixture(scope="function") def mock_GrashopperSetup(): mock_Grashopper = mock.MagicMock() @@ -147,7 +138,9 @@ def test_prepare_detector(mock_GrashopperSetup, scaninfo): mock_GrashopperSetup.parent.cam.acquire_time_auto.put.assert_called_once_with( AutoMode.CONTINUOUS ) - mock_GrashopperSetup.parent.set_trigger.assert_called_once_with(TriggerSource.SOFTWARE) + mock_GrashopperSetup.parent.cam.trigger_source.put.assert_called_once_with( + TriggerSource.SOFTWARE + ) mock_GrashopperSetup.parent.cam.set_image_counter.put.assert_called_once_with(0) mock_set_acquisition_params.assert_called_once() mock_set_exposure_time.assert_called_once_with(scaninfo["exp_time"]) @@ -167,6 +160,8 @@ def test_arm_acquisition(mock_GrashopperSetup, detector_state): """Test the arm_acquisition method.""" # Call the function you want to test mock_GrashopperSetup.parent.cam.detector_state.get.return_value = detector_state + mock_GrashopperSetup.parent.stopped = False + mock_GrashopperSetup.parent.TIMEOUT_FOR_SIGNALS = 0.1 mock_GrashopperSetup.parent.timeout = 0.1 if detector_state != DetectorState.WAITING: @@ -213,6 +208,8 @@ def test_stop_detector(mock_GrashopperSetup, detector_state): """Test the arm_acquisition method.""" # Call the function you want to test mock_GrashopperSetup.parent.cam.detector_state.get.return_value = detector_state + mock_GrashopperSetup.parent.stopped = False + mock_GrashopperSetup.parent.TIMEOUT_FOR_SIGNALS = 0.1 mock_GrashopperSetup.parent.timeout = 0.1 if detector_state != DetectorState.IDLE: diff --git a/tests/utils.py b/tests/utils.py new file mode 100644 index 0000000..e4843cb --- /dev/null +++ b/tests/utils.py @@ -0,0 +1,9 @@ +def patch_dual_pvs(device): + device.wait_for_connection(all_signals=True) + for walk in device.walk_signals(): + if not hasattr(walk.item, "_read_pv"): + continue + if not hasattr(walk.item, "_write_pv"): + continue + if walk.item._read_pv.pvname.endswith("_RBV"): + walk.item._read_pv = walk.item._write_pv diff --git a/tomcat_bec/devices/grashopper_tomcat.py b/tomcat_bec/devices/grashopper_tomcat.py index 0c94801..25b02b5 100644 --- a/tomcat_bec/devices/grashopper_tomcat.py +++ b/tomcat_bec/devices/grashopper_tomcat.py @@ -156,6 +156,7 @@ class GrashopperTOMCATSetup(CustomDetectorMixin): def __init__(self, *_args, parent: Device = None, **_kwargs) -> None: super().__init__(*_args, parent=parent, **_kwargs) + self._rlock = threading.RLock() self.image_shape = (self.parent.cam.image_size_y.get(), self.parent.cam.image_size_x.get()) self.monitor_thread = None @@ -163,6 +164,11 @@ class GrashopperTOMCATSetup(CustomDetectorMixin): self.update_frequency = 1 self.low_frame_rate = 80 + def on_init(self) -> None: + """Run on initialization of the detector.""" + self.initialize_detector() + self.initialize_detector_backend() + def initialize_detector(self) -> None: """Initialize detector.""" self.parent.cam.acquire.put(0) @@ -182,6 +188,12 @@ class GrashopperTOMCATSetup(CustomDetectorMixin): self.parent.image.enable_cb.put(1) self.parent.image.set_array_counter.put(0) + def on_stage(self) -> None: + """Run on stage of the detector.""" + self.prepare_detector() + self.prepare_detector_backend() + self.arm_acquisition() + def set_exposure_time(self, exposure_time: float) -> None: """Set the detector framerate. @@ -201,7 +213,7 @@ class GrashopperTOMCATSetup(CustomDetectorMixin): self.parent.cam.image_mode.put(ImageMode.MULTIPLE) self.parent.cam.acquire_time_auto.put(AutoMode.CONTINUOUS) self.set_exposure_time(self.parent.scaninfo.exp_time) - self.parent.set_trigger(TriggerSource.SOFTWARE) + self.parent.cam.trigger_source.put(TriggerSource.SOFTWARE) self.parent.cam.set_image_counter.put(0) self.set_acquisition_params() @@ -226,7 +238,7 @@ class GrashopperTOMCATSetup(CustomDetectorMixin): signal_conditions = [(self.parent.cam.detector_state.get, DetectorState.WAITING)] if not self.wait_for_signals( signal_conditions=signal_conditions, - timeout=self.parent.timeout, + timeout=self.parent.TIMEOUT_FOR_SIGNALS, check_stopped=True, all_signals=False, ): @@ -267,13 +279,22 @@ class GrashopperTOMCATSetup(CustomDetectorMixin): except Exception as e: logger.debug(f"{e} for image with shape {self.parent.image.array_data.get().shape}") + def on_unstage(self) -> None: + """Run on unstage of the detector.""" + self.stop_monitor = True + + def on_stop(self) -> None: + """Run on stop of the detector.""" + self.stop_detector() + self.stop_detector_backend() + def stop_detector(self) -> None: """Stop detector.""" self.parent.cam.acquire.put(0) signal_conditions = [(self.parent.cam.detector_state.get, DetectorState.IDLE)] if not self.wait_for_signals( signal_conditions=signal_conditions, - timeout=self.parent.timeout - self.parent.timeout // 2, + timeout=self.parent.TIMEOUT_FOR_SIGNALS - self.parent.TIMEOUT_FOR_SIGNALS // 2, check_stopped=True, all_signals=False, ): @@ -281,7 +302,7 @@ class GrashopperTOMCATSetup(CustomDetectorMixin): self.parent.cam.acquire.put(0) if not self.wait_for_signals( signal_conditions=signal_conditions, - timeout=self.parent.timeout - self.parent.timeout // 2, + timeout=self.parent.TIMEOUT_FOR_SIGNALS - self.parent.TIMEOUT_FOR_SIGNALS // 2, check_stopped=True, all_signals=False, ): @@ -394,7 +415,7 @@ class GrashopperTOMCAT(PSIDetectorBase): """ # Specify which functions are revealed to the user in BEC client - USER_ACCESS = ["describe"] + USER_ACCESS = [] SUB_MONITOR = "monitor" SUB_VALUE = "value" @@ -404,31 +425,11 @@ class GrashopperTOMCAT(PSIDetectorBase): custom_prepare_cls = GrashopperTOMCATSetup # specify minimum readout time for detector MIN_READOUT = 0 + TIMEOUT_FOR_SIGNALS = 5 # specify class attributes cam = ADCpt(SLSDetectorCam, "cam1:") image = ADCpt(SLSImagePlugin, "image1:") - def stage(self) -> list[object]: - rtr = super().stage() - self.custom_prepare.arm_acquisition() - return rtr - - def unstage(self) -> list[object]: - rtr = super().unstage() - self.custom_prepare.stop_monitor = True - return rtr - - def set_trigger(self, trigger_source: TriggerSource) -> None: - """Set trigger source for the detector. - Check the TriggerSource enum for possible values - - Args: - trigger_source (TriggerSource): Trigger source for the detector - - """ - value = trigger_source - self.cam.trigger_source.put(value) - if __name__ == "__main__": hopper = GrashopperTOMCAT(name="hopper", prefix="X02DA-PG-USB:", sim_mode=True)