Fix/fix camera live mode handling #53

Merged
appel_c merged 4 commits from fix/fix_camera_live_mode_handling into main 2025-05-16 15:16:44 +02:00
5 changed files with 321 additions and 43 deletions

View File

@@ -1,43 +1,38 @@
"""Basler camera class for Debye BEC."""
from __future__ import annotations
import time
from typing import TYPE_CHECKING
import numpy as np
from ophyd import ADBase
from ophyd import ADComponent as ADCpt
from ophyd_devices.devices.areadetector.cam import AravisDetectorCam
from ophyd_devices.devices.areadetector.plugins import ImagePlugin_V35
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
if TYPE_CHECKING:
from debye_bec.devices.cameras.debye_base_cam import DebyeBaseCamera
if TYPE_CHECKING: # pragma: no cover
from bec_lib.devicemanager import ScanInfo
class BaslerCamBase(ADBase):
"""BaslerCam Base class."""
cam1 = ADCpt(AravisDetectorCam, "cam1:")
image1 = ADCpt(ImagePlugin_V35, "image1:")
class BaslerCam(PSIDeviceBase, BaslerCamBase):
# preview_2d = PSIComponent(SetableSignal, signal_type=SignalType.PREVIEW, ndim=2, kind=Kind.omitted)
class BaslerCam(DebyeBaseCamera, BaslerCamBase):
"""Basler camera class at Debye. IOC prefix: X01DA-ES-XRAYEYE:"""
def __init__(self, *, name: str, prefix: str = "", scan_info: ScanInfo | None = None, **kwargs):
"""
Initialize the Prosilica camera class.
Args:
name (str): Name of the camera.
prefix (str): IOC prefix.
scan_info (ScanInfo): The scan info to use.
"""
super().__init__(name=name, prefix=prefix, scan_info=scan_info, **kwargs)
self.last_emit = time.time()
self.update_frequency = 5 # Hz
def emit_to_bec(self, *args, obj=None, old_value=None, value=None, **kwargs):
if (time.time() - self.last_emit) < (1 / self.update_frequency):
return # Check logic
width = self.image1.array_size.width.get()
height = self.image1.array_size.height.get()
data = np.rot90(np.reshape(value, (height, width)), k=-1, axes=(0, 1))
# self.preview_2d.put(data)
self._run_subs(sub_type=self.SUB_DEVICE_MONITOR_2D, value=data)
self.last_emit = time.time()
def on_connected(self):
self.image1.array_data.subscribe(self.emit_to_bec, run=False)
self._n_rot90 = -1 # Rotate the image by -90 degrees

View File

@@ -0,0 +1,130 @@
"""Base class for Camera integration at Debye."""
from __future__ import annotations
import threading
from typing import TYPE_CHECKING
import numpy as np
from bec_lib.logger import bec_logger
from ophyd import DeviceStatus, StatusBase
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
from typeguard import typechecked
if TYPE_CHECKING: # pragma: no cover
from bec_lib.devicemanager import ScanInfo
from ophyd_devices.devices.areadetector.plugins import ImagePlugin_V35
logger = bec_logger.logger
class DebyeBaseCamera(PSIDeviceBase):
"""Base class for Debye cameras."""
USER_ACCESS = ["live_mode"]
def __init__(self, *, name: str, prefix: str = "", scan_info: ScanInfo | None = None, **kwargs):
super().__init__(name=name, prefix=prefix, scan_info=scan_info, **kwargs)
self.image1: "ImagePlugin_V35"
self._update_frequency = 1 # Hz
self._live_mode = False
self._live_mode_event = None
self._task_status = None
self._n_rot90 = -1
@property
def live_mode(self) -> bool:
"""Live mode status."""
return self._live_mode
@typechecked
@live_mode.setter
def live_mode(self, value: bool) -> None:
"""
Set the live mode status.
Args:
value (bool): True to enable live mode, False to disable.
"""
if value == self._live_mode:
return
self._live_mode = value
if value:
self._start_live_mode()
else:
self._stop_live_mode()
def _start_live_mode(self) -> None:
"""Start live mode."""
if self._live_mode_event is not None: # Kill task if it exists
self._live_mode_event.set()
self._live_mode_event = None
if self._task_status is not None:
self.task_handler.kill_task(task_status=self._task_status)
self._task_status = None
self._live_mode_event = threading.Event()
self._task_status = self.task_handler.submit_task(task=self.emit_to_bec)
def _stop_live_mode(self) -> None:
"""Stop live mode."""
if self._live_mode_event is not None:
self._live_mode_event.set()
self._live_mode_event = None
def emit_to_bec(self):
"""Emit the image data to BEC. If _live_mode_event is set, stop the task."""
while not self._live_mode_event.wait(1 / self._update_frequency):
value = self.image1.array_data.get()
if value is None:
continue
width = self.image1.array_size.width.get()
height = self.image1.array_size.height.get()
# Geometry correction for the image
data = np.rot90(np.reshape(value, (height, width)), k=self._n_rot90, axes=(0, 1))
self._run_subs(sub_type=self.SUB_DEVICE_MONITOR_2D, value=data)
########################################
# Beamline Specific Implementations #
########################################
def on_init(self) -> None:
"""
Called when the device is initialized.
No signals are connected at this point. If you like to
set default values on signals, please use on_connected instead.
"""
def on_connected(self) -> None:
"""
Called after the device is connected and its signals are connected.
Default values for signals should be set here.
"""
self.live_mode = True
def on_stage(self) -> DeviceStatus | StatusBase | None:
"""
Called while staging the device.
Information about the upcoming scan can be accessed from the scan_info (self.scan_info.msg) object.
"""
def on_unstage(self) -> DeviceStatus | StatusBase | None:
"""Called while unstaging the device."""
def on_pre_scan(self) -> DeviceStatus | StatusBase | None:
"""Called right before the scan starts on all devices automatically."""
def on_trigger(self) -> DeviceStatus | StatusBase | None:
"""Called when the device is triggered."""
def on_complete(self) -> DeviceStatus | StatusBase | None:
"""Called to inquire if a device has completed a scans."""
def on_kickoff(self) -> DeviceStatus | StatusBase | None:
"""Called to kickoff a device for a fly scan. Has to be called explicitly."""
def on_stop(self) -> None:
"""Called when the device is stopped."""

View File

@@ -1,41 +1,41 @@
"""Prosilica camera class for integration of beam_monitor 1/2 cameras."""
from __future__ import annotations
import time
from typing import TYPE_CHECKING
import numpy as np
from ophyd import ADBase
from ophyd import ADComponent as ADCpt
from ophyd import Component as Cpt
from ophyd import Device
from ophyd_devices.devices.areadetector.cam import ProsilicaDetectorCam
from ophyd_devices.devices.areadetector.plugins import ImagePlugin_V35
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
from debye_bec.devices.cameras.debye_base_cam import DebyeBaseCamera
if TYPE_CHECKING: # pragma: no cover
from bec_lib.devicemanager import ScanInfo
class ProsilicaCamBase(ADBase):
"""Base class for Prosilica cameras."""
cam1 = ADCpt(ProsilicaDetectorCam, "cam1:")
image1 = ADCpt(ImagePlugin_V35, "image1:")
class ProsilicaCam(PSIDeviceBase, ProsilicaCamBase):
class ProsilicaCam(DebyeBaseCamera, ProsilicaCamBase):
"""
Prosilica camera class, for integration of beam_monitor 1/2 cameras.
Prefixes are: X01DA-OP-GIGE02: and X01DA-OP-GIGE01:
"""
def __init__(self, *, name: str, prefix: str = "", scan_info: ScanInfo | None = None, **kwargs):
"""
Initialize the Prosilica camera class.
Args:
name (str): Name of the camera.
prefix (str): IOC prefix.
scan_info (ScanInfo): The scan info to use.
"""
super().__init__(name=name, prefix=prefix, scan_info=scan_info, **kwargs)
self.last_emit = time.time()
self.update_frequency = 5 # Hz
def emit_to_bec(self, *args, obj=None, old_value=None, value=None, **kwargs):
if (time.time() - self.last_emit) < (1 / self.update_frequency):
return # Check logic
width = self.image1.array_size.width.get()
height = self.image1.array_size.height.get()
data = np.rot90(np.reshape(value, (height, width)), k=-1, axes=(0, 1))
self._run_subs(sub_type=self.SUB_DEVICE_MONITOR_2D, value=data)
self.last_emit = time.time()
def on_connected(self):
self.image1.array_data.subscribe(self.emit_to_bec, run=False)
self._n_rot90 = -1 # Rotate the image by -90 degrees

View File

@@ -0,0 +1,68 @@
"""Module to test prosilica and Basler cam integrations."""
import threading
from unittest import mock
import ophyd
import pytest
from ophyd_devices.devices.areadetector.cam import AravisDetectorCam, ProsilicaDetectorCam
from ophyd_devices.devices.areadetector.plugins import ImagePlugin_V35
from ophyd_devices.tests.utils import MockPV, patch_dual_pvs
from debye_bec.devices.cameras.basler_cam import BaslerCam
from debye_bec.devices.cameras.prosilica_cam import ProsilicaCam
# pylint: disable=protected-access
# pylint: disable=redefined-outer-name
@pytest.fixture(scope="function")
def mock_basler():
"""Fixture to mock the camera device."""
name = "cam"
prefix = "test:"
with mock.patch.object(ophyd, "cl") as mock_cl:
mock_cl.get_pv = MockPV
mock_cl.thread_class = threading.Thread
dev = BaslerCam(name=name, prefix=prefix)
patch_dual_pvs(dev)
yield dev
def test_basler_init(mock_basler):
"""Test the initialization of the Basler camera device."""
assert mock_basler.name == "cam"
assert mock_basler.prefix == "test:"
assert isinstance(mock_basler.cam1, AravisDetectorCam)
assert isinstance(mock_basler.image1, ImagePlugin_V35)
assert mock_basler._update_frequency == 1
assert mock_basler._live_mode is False
assert mock_basler._live_mode_event is None
assert mock_basler._task_status is None
assert mock_basler._n_rot90 == -1
@pytest.fixture(scope="function")
def mock_prosilica():
"""Fixture to mock the camera device."""
name = "cam"
prefix = "test:"
with mock.patch.object(ophyd, "cl") as mock_cl:
mock_cl.get_pv = MockPV
mock_cl.thread_class = threading.Thread
dev = ProsilicaCam(name=name, prefix=prefix)
patch_dual_pvs(dev)
yield dev
def test_prosilica_init(mock_prosilica):
"""Test the initialization of the Prosilica camera device."""
assert mock_prosilica.name == "cam"
assert mock_prosilica.prefix == "test:"
assert isinstance(mock_prosilica.cam1, ProsilicaDetectorCam)
assert isinstance(mock_prosilica.image1, ImagePlugin_V35)
assert mock_prosilica._update_frequency == 1
assert mock_prosilica._live_mode is False
assert mock_prosilica._live_mode_event is None
assert mock_prosilica._task_status is None
assert mock_prosilica._n_rot90 == -1

View File

@@ -0,0 +1,85 @@
"""Module to test camera base integration class for Debye."""
import threading
from unittest import mock
import ophyd
import pytest
from ophyd_devices.tests.utils import MockPV, patch_dual_pvs
from debye_bec.devices.cameras.debye_base_cam import DebyeBaseCamera
# pylint: disable=protected-access
# pylint: disable=redefined-outer-name
@pytest.fixture(scope="function")
def mock_cam():
"""Fixture to mock the camera device."""
name = "cam"
prefix = "test:"
with mock.patch.object(ophyd, "cl") as mock_cl:
mock_cl.get_pv = MockPV
mock_cl.thread_class = threading.Thread
dev = DebyeBaseCamera(name=name, prefix=prefix)
patch_dual_pvs(dev)
yield dev
def test_init(mock_cam):
"""Test the initialization of the camera device."""
assert mock_cam.name == "cam"
assert mock_cam.prefix == "test:"
assert mock_cam._update_frequency == 1
assert mock_cam._live_mode is False
assert mock_cam._live_mode_event is None
assert mock_cam._task_status is None
assert mock_cam._n_rot90 == -1
def test_start_live_mode(mock_cam):
"""Test starting live mode."""
def mock_emit_to_bec(*args, **kwargs):
"""Mock emit_to_bec method."""
while not mock_cam._live_mode_event.wait(1 / mock_cam._update_frequency):
pass
with mock.patch.object(mock_cam, "emit_to_bec", side_effect=mock_emit_to_bec):
mock_cam._start_live_mode()
assert mock_cam._live_mode_event is not None
assert mock_cam._task_status is not None
assert mock_cam._task_status.state == "running"
mock_cam._live_mode_event.set()
# Wait for the task to resolve
mock_cam._task_status.wait(timeout=5)
assert mock_cam._task_status.done is True
def test_stop_live_mode(mock_cam):
"""Test stopping live mode."""
with mock.patch.object(mock_cam, "_live_mode_event") as mock_live_mode_event:
mock_cam._stop_live_mode()
assert mock_live_mode_event.set.called
assert mock_cam._live_mode_event is None
def test_live_mode_property(mock_cam):
"""Test the live_mode property."""
assert mock_cam.live_mode is False
with mock.patch.object(mock_cam, "_start_live_mode") as mock_start_live_mode:
with mock.patch.object(mock_cam, "_stop_live_mode") as mock_stop_live_mode:
# Set to true
mock_cam.live_mode = True
assert mock_start_live_mode.called
assert mock_cam._live_mode is True
assert mock_start_live_mode.call_count == 1
# Second call should call _start_live_mode
mock_cam.live_mode = True
assert mock_start_live_mode.call_count == 1
# Set to false
mock_cam.live_mode = False
assert mock_stop_live_mode.called
assert mock_cam._live_mode is False
assert mock_stop_live_mode.call_count == 1