diff --git a/debye_bec/devices/cameras/basler_cam.py b/debye_bec/devices/cameras/basler_cam.py index 746ee7e..c53fb29 100644 --- a/debye_bec/devices/cameras/basler_cam.py +++ b/debye_bec/devices/cameras/basler_cam.py @@ -1,43 +1,38 @@ +"""Basler camera class for Debye BEC.""" + from __future__ import annotations -import time from typing import TYPE_CHECKING -import numpy as np from ophyd import ADBase from ophyd import ADComponent as ADCpt from ophyd_devices.devices.areadetector.cam import AravisDetectorCam from ophyd_devices.devices.areadetector.plugins import ImagePlugin_V35 -from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase -if TYPE_CHECKING: +from debye_bec.devices.cameras.debye_base_cam import DebyeBaseCamera + +if TYPE_CHECKING: # pragma: no cover from bec_lib.devicemanager import ScanInfo class BaslerCamBase(ADBase): + """BaslerCam Base class.""" + cam1 = ADCpt(AravisDetectorCam, "cam1:") image1 = ADCpt(ImagePlugin_V35, "image1:") -class BaslerCam(PSIDeviceBase, BaslerCamBase): - - # preview_2d = PSIComponent(SetableSignal, signal_type=SignalType.PREVIEW, ndim=2, kind=Kind.omitted) +class BaslerCam(DebyeBaseCamera, BaslerCamBase): + """Basler camera class at Debye. IOC prefix: X01DA-ES-XRAYEYE:""" def __init__(self, *, name: str, prefix: str = "", scan_info: ScanInfo | None = None, **kwargs): + """ + Initialize the Prosilica camera class. + + Args: + name (str): Name of the camera. + prefix (str): IOC prefix. + scan_info (ScanInfo): The scan info to use. + """ super().__init__(name=name, prefix=prefix, scan_info=scan_info, **kwargs) - self.last_emit = time.time() - self.update_frequency = 5 # Hz - - def emit_to_bec(self, *args, obj=None, old_value=None, value=None, **kwargs): - if (time.time() - self.last_emit) < (1 / self.update_frequency): - return # Check logic - width = self.image1.array_size.width.get() - height = self.image1.array_size.height.get() - data = np.rot90(np.reshape(value, (height, width)), k=-1, axes=(0, 1)) - - # self.preview_2d.put(data) - self._run_subs(sub_type=self.SUB_DEVICE_MONITOR_2D, value=data) - self.last_emit = time.time() - - def on_connected(self): - self.image1.array_data.subscribe(self.emit_to_bec, run=False) + self._n_rot90 = -1 # Rotate the image by -90 degrees diff --git a/debye_bec/devices/cameras/debye_base_cam.py b/debye_bec/devices/cameras/debye_base_cam.py new file mode 100644 index 0000000..ef3011b --- /dev/null +++ b/debye_bec/devices/cameras/debye_base_cam.py @@ -0,0 +1,130 @@ +"""Base class for Camera integration at Debye.""" + +from __future__ import annotations + +import threading +from typing import TYPE_CHECKING + +import numpy as np +from bec_lib.logger import bec_logger +from ophyd import DeviceStatus, StatusBase +from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase +from typeguard import typechecked + +if TYPE_CHECKING: # pragma: no cover + from bec_lib.devicemanager import ScanInfo + from ophyd_devices.devices.areadetector.plugins import ImagePlugin_V35 + + +logger = bec_logger.logger + + +class DebyeBaseCamera(PSIDeviceBase): + """Base class for Debye cameras.""" + + USER_ACCESS = ["live_mode"] + + def __init__(self, *, name: str, prefix: str = "", scan_info: ScanInfo | None = None, **kwargs): + super().__init__(name=name, prefix=prefix, scan_info=scan_info, **kwargs) + self.image1: "ImagePlugin_V35" + self._update_frequency = 1 # Hz + self._live_mode = False + self._live_mode_event = None + self._task_status = None + self._n_rot90 = -1 + + @property + def live_mode(self) -> bool: + """Live mode status.""" + return self._live_mode + + @typechecked + @live_mode.setter + def live_mode(self, value: bool) -> None: + """ + Set the live mode status. + + Args: + value (bool): True to enable live mode, False to disable. + """ + if value == self._live_mode: + return + self._live_mode = value + if value: + self._start_live_mode() + else: + self._stop_live_mode() + + def _start_live_mode(self) -> None: + """Start live mode.""" + if self._live_mode_event is not None: # Kill task if it exists + self._live_mode_event.set() + self._live_mode_event = None + if self._task_status is not None: + self.task_handler.kill_task(task_status=self._task_status) + self._task_status = None + + self._live_mode_event = threading.Event() + self._task_status = self.task_handler.submit_task(task=self.emit_to_bec) + + def _stop_live_mode(self) -> None: + """Stop live mode.""" + if self._live_mode_event is not None: + self._live_mode_event.set() + self._live_mode_event = None + + def emit_to_bec(self): + """Emit the image data to BEC. If _live_mode_event is set, stop the task.""" + while not self._live_mode_event.wait(1 / self._update_frequency): + value = self.image1.array_data.get() + if value is None: + continue + width = self.image1.array_size.width.get() + height = self.image1.array_size.height.get() + # Geometry correction for the image + data = np.rot90(np.reshape(value, (height, width)), k=self._n_rot90, axes=(0, 1)) + self._run_subs(sub_type=self.SUB_DEVICE_MONITOR_2D, value=data) + + ######################################## + # Beamline Specific Implementations # + ######################################## + + def on_init(self) -> None: + """ + Called when the device is initialized. + + No signals are connected at this point. If you like to + set default values on signals, please use on_connected instead. + """ + + def on_connected(self) -> None: + """ + Called after the device is connected and its signals are connected. + Default values for signals should be set here. + """ + self.live_mode = True + + def on_stage(self) -> DeviceStatus | StatusBase | None: + """ + Called while staging the device. + + Information about the upcoming scan can be accessed from the scan_info (self.scan_info.msg) object. + """ + + def on_unstage(self) -> DeviceStatus | StatusBase | None: + """Called while unstaging the device.""" + + def on_pre_scan(self) -> DeviceStatus | StatusBase | None: + """Called right before the scan starts on all devices automatically.""" + + def on_trigger(self) -> DeviceStatus | StatusBase | None: + """Called when the device is triggered.""" + + def on_complete(self) -> DeviceStatus | StatusBase | None: + """Called to inquire if a device has completed a scans.""" + + def on_kickoff(self) -> DeviceStatus | StatusBase | None: + """Called to kickoff a device for a fly scan. Has to be called explicitly.""" + + def on_stop(self) -> None: + """Called when the device is stopped.""" diff --git a/debye_bec/devices/cameras/prosilica_cam.py b/debye_bec/devices/cameras/prosilica_cam.py index 472ec00..a8cab07 100644 --- a/debye_bec/devices/cameras/prosilica_cam.py +++ b/debye_bec/devices/cameras/prosilica_cam.py @@ -1,41 +1,41 @@ +"""Prosilica camera class for integration of beam_monitor 1/2 cameras.""" + from __future__ import annotations -import time from typing import TYPE_CHECKING -import numpy as np from ophyd import ADBase from ophyd import ADComponent as ADCpt -from ophyd import Component as Cpt -from ophyd import Device from ophyd_devices.devices.areadetector.cam import ProsilicaDetectorCam from ophyd_devices.devices.areadetector.plugins import ImagePlugin_V35 -from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase + +from debye_bec.devices.cameras.debye_base_cam import DebyeBaseCamera if TYPE_CHECKING: # pragma: no cover from bec_lib.devicemanager import ScanInfo class ProsilicaCamBase(ADBase): + """Base class for Prosilica cameras.""" + cam1 = ADCpt(ProsilicaDetectorCam, "cam1:") image1 = ADCpt(ImagePlugin_V35, "image1:") -class ProsilicaCam(PSIDeviceBase, ProsilicaCamBase): +class ProsilicaCam(DebyeBaseCamera, ProsilicaCamBase): + """ + Prosilica camera class, for integration of beam_monitor 1/2 cameras. + Prefixes are: X01DA-OP-GIGE02: and X01DA-OP-GIGE01: + """ def __init__(self, *, name: str, prefix: str = "", scan_info: ScanInfo | None = None, **kwargs): + """ + Initialize the Prosilica camera class. + + Args: + name (str): Name of the camera. + prefix (str): IOC prefix. + scan_info (ScanInfo): The scan info to use. + """ super().__init__(name=name, prefix=prefix, scan_info=scan_info, **kwargs) - self.last_emit = time.time() - self.update_frequency = 5 # Hz - - def emit_to_bec(self, *args, obj=None, old_value=None, value=None, **kwargs): - if (time.time() - self.last_emit) < (1 / self.update_frequency): - return # Check logic - width = self.image1.array_size.width.get() - height = self.image1.array_size.height.get() - data = np.rot90(np.reshape(value, (height, width)), k=-1, axes=(0, 1)) - self._run_subs(sub_type=self.SUB_DEVICE_MONITOR_2D, value=data) - self.last_emit = time.time() - - def on_connected(self): - self.image1.array_data.subscribe(self.emit_to_bec, run=False) + self._n_rot90 = -1 # Rotate the image by -90 degrees diff --git a/tests/tests_devices/test_cameras.py b/tests/tests_devices/test_cameras.py new file mode 100644 index 0000000..d74df11 --- /dev/null +++ b/tests/tests_devices/test_cameras.py @@ -0,0 +1,68 @@ +"""Module to test prosilica and Basler cam integrations.""" + +import threading +from unittest import mock + +import ophyd +import pytest +from ophyd_devices.devices.areadetector.cam import AravisDetectorCam, ProsilicaDetectorCam +from ophyd_devices.devices.areadetector.plugins import ImagePlugin_V35 +from ophyd_devices.tests.utils import MockPV, patch_dual_pvs + +from debye_bec.devices.cameras.basler_cam import BaslerCam +from debye_bec.devices.cameras.prosilica_cam import ProsilicaCam + +# pylint: disable=protected-access +# pylint: disable=redefined-outer-name + + +@pytest.fixture(scope="function") +def mock_basler(): + """Fixture to mock the camera device.""" + name = "cam" + prefix = "test:" + with mock.patch.object(ophyd, "cl") as mock_cl: + mock_cl.get_pv = MockPV + mock_cl.thread_class = threading.Thread + dev = BaslerCam(name=name, prefix=prefix) + patch_dual_pvs(dev) + yield dev + + +def test_basler_init(mock_basler): + """Test the initialization of the Basler camera device.""" + assert mock_basler.name == "cam" + assert mock_basler.prefix == "test:" + assert isinstance(mock_basler.cam1, AravisDetectorCam) + assert isinstance(mock_basler.image1, ImagePlugin_V35) + assert mock_basler._update_frequency == 1 + assert mock_basler._live_mode is False + assert mock_basler._live_mode_event is None + assert mock_basler._task_status is None + assert mock_basler._n_rot90 == -1 + + +@pytest.fixture(scope="function") +def mock_prosilica(): + """Fixture to mock the camera device.""" + name = "cam" + prefix = "test:" + with mock.patch.object(ophyd, "cl") as mock_cl: + mock_cl.get_pv = MockPV + mock_cl.thread_class = threading.Thread + dev = ProsilicaCam(name=name, prefix=prefix) + patch_dual_pvs(dev) + yield dev + + +def test_prosilica_init(mock_prosilica): + """Test the initialization of the Prosilica camera device.""" + assert mock_prosilica.name == "cam" + assert mock_prosilica.prefix == "test:" + assert isinstance(mock_prosilica.cam1, ProsilicaDetectorCam) + assert isinstance(mock_prosilica.image1, ImagePlugin_V35) + assert mock_prosilica._update_frequency == 1 + assert mock_prosilica._live_mode is False + assert mock_prosilica._live_mode_event is None + assert mock_prosilica._task_status is None + assert mock_prosilica._n_rot90 == -1 diff --git a/tests/tests_devices/test_debye_base_cam.py b/tests/tests_devices/test_debye_base_cam.py new file mode 100644 index 0000000..b014a02 --- /dev/null +++ b/tests/tests_devices/test_debye_base_cam.py @@ -0,0 +1,85 @@ +"""Module to test camera base integration class for Debye.""" + +import threading +from unittest import mock + +import ophyd +import pytest +from ophyd_devices.tests.utils import MockPV, patch_dual_pvs + +from debye_bec.devices.cameras.debye_base_cam import DebyeBaseCamera + +# pylint: disable=protected-access +# pylint: disable=redefined-outer-name + + +@pytest.fixture(scope="function") +def mock_cam(): + """Fixture to mock the camera device.""" + name = "cam" + prefix = "test:" + with mock.patch.object(ophyd, "cl") as mock_cl: + mock_cl.get_pv = MockPV + mock_cl.thread_class = threading.Thread + dev = DebyeBaseCamera(name=name, prefix=prefix) + patch_dual_pvs(dev) + yield dev + + +def test_init(mock_cam): + """Test the initialization of the camera device.""" + assert mock_cam.name == "cam" + assert mock_cam.prefix == "test:" + assert mock_cam._update_frequency == 1 + assert mock_cam._live_mode is False + assert mock_cam._live_mode_event is None + assert mock_cam._task_status is None + assert mock_cam._n_rot90 == -1 + + +def test_start_live_mode(mock_cam): + """Test starting live mode.""" + + def mock_emit_to_bec(*args, **kwargs): + """Mock emit_to_bec method.""" + while not mock_cam._live_mode_event.wait(1 / mock_cam._update_frequency): + pass + + with mock.patch.object(mock_cam, "emit_to_bec", side_effect=mock_emit_to_bec): + mock_cam._start_live_mode() + assert mock_cam._live_mode_event is not None + assert mock_cam._task_status is not None + assert mock_cam._task_status.state == "running" + mock_cam._live_mode_event.set() + # Wait for the task to resolve + mock_cam._task_status.wait(timeout=5) + assert mock_cam._task_status.done is True + + +def test_stop_live_mode(mock_cam): + """Test stopping live mode.""" + with mock.patch.object(mock_cam, "_live_mode_event") as mock_live_mode_event: + mock_cam._stop_live_mode() + assert mock_live_mode_event.set.called + assert mock_cam._live_mode_event is None + + +def test_live_mode_property(mock_cam): + """Test the live_mode property.""" + assert mock_cam.live_mode is False + with mock.patch.object(mock_cam, "_start_live_mode") as mock_start_live_mode: + with mock.patch.object(mock_cam, "_stop_live_mode") as mock_stop_live_mode: + # Set to true + mock_cam.live_mode = True + assert mock_start_live_mode.called + assert mock_cam._live_mode is True + assert mock_start_live_mode.call_count == 1 + # Second call should call _start_live_mode + mock_cam.live_mode = True + assert mock_start_live_mode.call_count == 1 + + # Set to false + mock_cam.live_mode = False + assert mock_stop_live_mode.called + assert mock_cam._live_mode is False + assert mock_stop_live_mode.call_count == 1