feat: change grashopper and tests to match psi_detector_base refactoring
This commit is contained in:
@@ -3,6 +3,7 @@ from unittest import mock
|
||||
|
||||
import pytest
|
||||
|
||||
from tests.utils import patch_dual_pvs
|
||||
from tomcat_bec.devices.grashopper_tomcat import (
|
||||
COLORMODE,
|
||||
AutoMode,
|
||||
@@ -19,16 +20,6 @@ from tomcat_bec.devices.grashopper_tomcat import (
|
||||
)
|
||||
|
||||
|
||||
def patch_dual_pvs(device):
|
||||
for walk in device.walk_signals():
|
||||
if not hasattr(walk.item, "_read_pv"):
|
||||
continue
|
||||
if not hasattr(walk.item, "_write_pv"):
|
||||
continue
|
||||
if walk.item._read_pv.pvname.endswith("_RBV"):
|
||||
walk.item._read_pv = walk.item._write_pv
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def mock_GrashopperSetup():
|
||||
mock_Grashopper = mock.MagicMock()
|
||||
@@ -147,7 +138,9 @@ def test_prepare_detector(mock_GrashopperSetup, scaninfo):
|
||||
mock_GrashopperSetup.parent.cam.acquire_time_auto.put.assert_called_once_with(
|
||||
AutoMode.CONTINUOUS
|
||||
)
|
||||
mock_GrashopperSetup.parent.set_trigger.assert_called_once_with(TriggerSource.SOFTWARE)
|
||||
mock_GrashopperSetup.parent.cam.trigger_source.put.assert_called_once_with(
|
||||
TriggerSource.SOFTWARE
|
||||
)
|
||||
mock_GrashopperSetup.parent.cam.set_image_counter.put.assert_called_once_with(0)
|
||||
mock_set_acquisition_params.assert_called_once()
|
||||
mock_set_exposure_time.assert_called_once_with(scaninfo["exp_time"])
|
||||
@@ -167,6 +160,8 @@ def test_arm_acquisition(mock_GrashopperSetup, detector_state):
|
||||
"""Test the arm_acquisition method."""
|
||||
# Call the function you want to test
|
||||
mock_GrashopperSetup.parent.cam.detector_state.get.return_value = detector_state
|
||||
mock_GrashopperSetup.parent.stopped = False
|
||||
mock_GrashopperSetup.parent.TIMEOUT_FOR_SIGNALS = 0.1
|
||||
mock_GrashopperSetup.parent.timeout = 0.1
|
||||
|
||||
if detector_state != DetectorState.WAITING:
|
||||
@@ -213,6 +208,8 @@ def test_stop_detector(mock_GrashopperSetup, detector_state):
|
||||
"""Test the arm_acquisition method."""
|
||||
# Call the function you want to test
|
||||
mock_GrashopperSetup.parent.cam.detector_state.get.return_value = detector_state
|
||||
mock_GrashopperSetup.parent.stopped = False
|
||||
mock_GrashopperSetup.parent.TIMEOUT_FOR_SIGNALS = 0.1
|
||||
mock_GrashopperSetup.parent.timeout = 0.1
|
||||
|
||||
if detector_state != DetectorState.IDLE:
|
||||
|
||||
@@ -0,0 +1,9 @@
|
||||
def patch_dual_pvs(device):
|
||||
device.wait_for_connection(all_signals=True)
|
||||
for walk in device.walk_signals():
|
||||
if not hasattr(walk.item, "_read_pv"):
|
||||
continue
|
||||
if not hasattr(walk.item, "_write_pv"):
|
||||
continue
|
||||
if walk.item._read_pv.pvname.endswith("_RBV"):
|
||||
walk.item._read_pv = walk.item._write_pv
|
||||
@@ -156,6 +156,7 @@ class GrashopperTOMCATSetup(CustomDetectorMixin):
|
||||
|
||||
def __init__(self, *_args, parent: Device = None, **_kwargs) -> None:
|
||||
super().__init__(*_args, parent=parent, **_kwargs)
|
||||
self._rlock = threading.RLock()
|
||||
|
||||
self.image_shape = (self.parent.cam.image_size_y.get(), self.parent.cam.image_size_x.get())
|
||||
self.monitor_thread = None
|
||||
@@ -163,6 +164,11 @@ class GrashopperTOMCATSetup(CustomDetectorMixin):
|
||||
self.update_frequency = 1
|
||||
self.low_frame_rate = 80
|
||||
|
||||
def on_init(self) -> None:
|
||||
"""Run on initialization of the detector."""
|
||||
self.initialize_detector()
|
||||
self.initialize_detector_backend()
|
||||
|
||||
def initialize_detector(self) -> None:
|
||||
"""Initialize detector."""
|
||||
self.parent.cam.acquire.put(0)
|
||||
@@ -182,6 +188,12 @@ class GrashopperTOMCATSetup(CustomDetectorMixin):
|
||||
self.parent.image.enable_cb.put(1)
|
||||
self.parent.image.set_array_counter.put(0)
|
||||
|
||||
def on_stage(self) -> None:
|
||||
"""Run on stage of the detector."""
|
||||
self.prepare_detector()
|
||||
self.prepare_detector_backend()
|
||||
self.arm_acquisition()
|
||||
|
||||
def set_exposure_time(self, exposure_time: float) -> None:
|
||||
"""Set the detector framerate.
|
||||
|
||||
@@ -201,7 +213,7 @@ class GrashopperTOMCATSetup(CustomDetectorMixin):
|
||||
self.parent.cam.image_mode.put(ImageMode.MULTIPLE)
|
||||
self.parent.cam.acquire_time_auto.put(AutoMode.CONTINUOUS)
|
||||
self.set_exposure_time(self.parent.scaninfo.exp_time)
|
||||
self.parent.set_trigger(TriggerSource.SOFTWARE)
|
||||
self.parent.cam.trigger_source.put(TriggerSource.SOFTWARE)
|
||||
self.parent.cam.set_image_counter.put(0)
|
||||
self.set_acquisition_params()
|
||||
|
||||
@@ -226,7 +238,7 @@ class GrashopperTOMCATSetup(CustomDetectorMixin):
|
||||
signal_conditions = [(self.parent.cam.detector_state.get, DetectorState.WAITING)]
|
||||
if not self.wait_for_signals(
|
||||
signal_conditions=signal_conditions,
|
||||
timeout=self.parent.timeout,
|
||||
timeout=self.parent.TIMEOUT_FOR_SIGNALS,
|
||||
check_stopped=True,
|
||||
all_signals=False,
|
||||
):
|
||||
@@ -267,13 +279,22 @@ class GrashopperTOMCATSetup(CustomDetectorMixin):
|
||||
except Exception as e:
|
||||
logger.debug(f"{e} for image with shape {self.parent.image.array_data.get().shape}")
|
||||
|
||||
def on_unstage(self) -> None:
|
||||
"""Run on unstage of the detector."""
|
||||
self.stop_monitor = True
|
||||
|
||||
def on_stop(self) -> None:
|
||||
"""Run on stop of the detector."""
|
||||
self.stop_detector()
|
||||
self.stop_detector_backend()
|
||||
|
||||
def stop_detector(self) -> None:
|
||||
"""Stop detector."""
|
||||
self.parent.cam.acquire.put(0)
|
||||
signal_conditions = [(self.parent.cam.detector_state.get, DetectorState.IDLE)]
|
||||
if not self.wait_for_signals(
|
||||
signal_conditions=signal_conditions,
|
||||
timeout=self.parent.timeout - self.parent.timeout // 2,
|
||||
timeout=self.parent.TIMEOUT_FOR_SIGNALS - self.parent.TIMEOUT_FOR_SIGNALS // 2,
|
||||
check_stopped=True,
|
||||
all_signals=False,
|
||||
):
|
||||
@@ -281,7 +302,7 @@ class GrashopperTOMCATSetup(CustomDetectorMixin):
|
||||
self.parent.cam.acquire.put(0)
|
||||
if not self.wait_for_signals(
|
||||
signal_conditions=signal_conditions,
|
||||
timeout=self.parent.timeout - self.parent.timeout // 2,
|
||||
timeout=self.parent.TIMEOUT_FOR_SIGNALS - self.parent.TIMEOUT_FOR_SIGNALS // 2,
|
||||
check_stopped=True,
|
||||
all_signals=False,
|
||||
):
|
||||
@@ -394,7 +415,7 @@ class GrashopperTOMCAT(PSIDetectorBase):
|
||||
"""
|
||||
|
||||
# Specify which functions are revealed to the user in BEC client
|
||||
USER_ACCESS = ["describe"]
|
||||
USER_ACCESS = []
|
||||
|
||||
SUB_MONITOR = "monitor"
|
||||
SUB_VALUE = "value"
|
||||
@@ -404,31 +425,11 @@ class GrashopperTOMCAT(PSIDetectorBase):
|
||||
custom_prepare_cls = GrashopperTOMCATSetup
|
||||
# specify minimum readout time for detector
|
||||
MIN_READOUT = 0
|
||||
TIMEOUT_FOR_SIGNALS = 5
|
||||
# specify class attributes
|
||||
cam = ADCpt(SLSDetectorCam, "cam1:")
|
||||
image = ADCpt(SLSImagePlugin, "image1:")
|
||||
|
||||
def stage(self) -> list[object]:
|
||||
rtr = super().stage()
|
||||
self.custom_prepare.arm_acquisition()
|
||||
return rtr
|
||||
|
||||
def unstage(self) -> list[object]:
|
||||
rtr = super().unstage()
|
||||
self.custom_prepare.stop_monitor = True
|
||||
return rtr
|
||||
|
||||
def set_trigger(self, trigger_source: TriggerSource) -> None:
|
||||
"""Set trigger source for the detector.
|
||||
Check the TriggerSource enum for possible values
|
||||
|
||||
Args:
|
||||
trigger_source (TriggerSource): Trigger source for the detector
|
||||
|
||||
"""
|
||||
value = trigger_source
|
||||
self.cam.trigger_source.put(value)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
hopper = GrashopperTOMCAT(name="hopper", prefix="X02DA-PG-USB:", sim_mode=True)
|
||||
|
||||
Reference in New Issue
Block a user