diff --git a/csaxs_bec/devices/ids_cameras/base_integration/camera.py b/csaxs_bec/devices/ids_cameras/base_integration/camera.py index bc73210..fe521b5 100644 --- a/csaxs_bec/devices/ids_cameras/base_integration/camera.py +++ b/csaxs_bec/devices/ids_cameras/base_integration/camera.py @@ -15,10 +15,10 @@ CI/CD pipelines can run without the pyueye library or the related DLLs installed from __future__ import annotations import atexit +import time from typing import Literal import numpy as np -import time from bec_lib.logger import bec_logger from csaxs_bec.devices.ids_cameras.base_integration.utils import check_error @@ -67,8 +67,8 @@ class IDSCameraObject: check_error(ueye.is_SetDisplayMode(self.h_cam, ueye.IS_SET_DM_DIB), "IDSCameraObject") if ( - int.from_bytes(self.s_info.nColorMode.value, byteorder="big") - == self.ueye.IS_COLORMODE_BAYER + int.from_bytes(self.s_info.nColorMode.value, byteorder="big") + == self.ueye.IS_COLORMODE_BAYER ): logger.info("Bayer color mode detected.") # setup the color depth to the current windows setting @@ -77,16 +77,16 @@ class IDSCameraObject: ) # TODO This raises an error - maybe check the m_n_colormode value self.bytes_per_pixel = int(self.n_bits_per_pixel / 8) elif ( - int.from_bytes(self.s_info.nColorMode.value, byteorder="big") - == self.ueye.IS_COLORMODE_CBYCRY + int.from_bytes(self.s_info.nColorMode.value, byteorder="big") + == self.ueye.IS_COLORMODE_CBYCRY ): # for color camera models use RGB32 mode self.m_n_colormode = self.ueye.IS_CM_BGRA8_PACKED self.n_bits_per_pixel = self.ueye.INT(32) self.bytes_per_pixel = int(self.n_bits_per_pixel / 8) elif ( - int.from_bytes(self.s_info.nColorMode.value, byteorder="big") - == self.ueye.IS_COLORMODE_MONOCHROME + int.from_bytes(self.s_info.nColorMode.value, byteorder="big") + == self.ueye.IS_COLORMODE_MONOCHROME ): # for color camera models use RGB32 mode self.m_n_colormode = self.ueye.IS_CM_MONO8 @@ -160,12 +160,12 @@ class Camera: """ def __init__( - self, - camera_id: int, - m_n_colormode: Literal[0, 1, 2, 3] = 1, - bits_per_pixel: int = 24, - connect: bool = True, - force_monochrome: bool = False, + self, + camera_id: int, + m_n_colormode: Literal[0, 1, 2, 3] = 1, + bits_per_pixel: int = 24, + connect: bool = True, + force_monochrome: bool = False, ): self.ueye = ueye self.camera_id = camera_id @@ -173,8 +173,13 @@ class Camera: self.force_monochrome = force_monochrome self._connected = False self.cam = None + atexit.register(self.on_disconnect) + self._enable_warning_rate_limit: bool = False + self._last_rate_limited_log: float = 0 + self._warning_log_rate_limit_s: float = 10 + if connect: self.on_connect() @@ -255,7 +260,7 @@ class Camera: def get_image_data(self) -> np.ndarray | None: """Get the image data from the camera.""" if not self._connected: - logger.warning("Camera is not connected.") + self._rate_limited_warning_log("Camera is not connected.") return None array = self.ueye.get_data( self.cam.pc_image_mem, @@ -282,6 +287,22 @@ class Camera: return img + def set_camera_rate_limiting(self, enabled: bool, rate_limit_s: float | None = None): + if rate_limit_s is not None: + if rate_limit_s <= 0: + raise ValueError(f"Invalid rate limit: {rate_limit_s}, must be positive nonzero.") + self._warning_log_rate_limit_s = rate_limit_s + self._enable_warning_rate_limit = enabled + + def _rate_limited_warning_log(self, msg: "str"): + if ( + self._enable_warning_rate_limit + and time.monotonic() < self._last_rate_limited_log + self._warning_log_rate_limit_s + ): + return + self._last_rate_limited_log = time.monotonic() + logger.warning(msg) + if __name__ == "__main__": # Example usage diff --git a/csaxs_bec/devices/ids_cameras/ids_camera.py b/csaxs_bec/devices/ids_cameras/ids_camera.py index 0691769..750e877 100644 --- a/csaxs_bec/devices/ids_cameras/ids_camera.py +++ b/csaxs_bec/devices/ids_cameras/ids_camera.py @@ -29,8 +29,14 @@ class IDSCamera(PSIDeviceBase): to interact with the IDS camera using the pyueye library. """ - image = Cpt(PreviewSignal, name="image", ndim=2, doc="Preview signal for the camera.", num_rotation_90=0, - transpose=False) + image = Cpt( + PreviewSignal, + name="image", + ndim=2, + doc="Preview signal for the camera.", + num_rotation_90=0, + transpose=False, + ) roi_signal = Cpt( AsyncSignal, name="roi_signal", @@ -43,19 +49,19 @@ class IDSCamera(PSIDeviceBase): USER_ACCESS = ["live_mode", "mask", "set_rect_roi", "get_last_image"] def __init__( - self, - *, - name: str, - camera_id: int, - prefix: str = "", - scan_info: ScanInfo | None = None, - m_n_colormode: Literal[0, 1, 2, 3] = 1, - bits_per_pixel: Literal[8, 24] = 24, - live_mode: bool = False, - num_rotation_90: int = 0, - transpose: bool = False, - force_monochrome: bool = False, - **kwargs, + self, + *, + name: str, + camera_id: int, + prefix: str = "", + scan_info: ScanInfo | None = None, + m_n_colormode: Literal[0, 1, 2, 3] = 1, + bits_per_pixel: Literal[8, 24] = 24, + live_mode: bool = False, + num_rotation_90: int = 0, + transpose: bool = False, + force_monochrome: bool = False, + **kwargs, ): """Initialize the IDS Camera. @@ -133,7 +139,7 @@ class IDSCamera(PSIDeviceBase): if x + width > img_shape[1] or y + height > img_shape[0]: raise ValueError("ROI exceeds camera dimensions.") mask = np.zeros(img_shape, dtype=np.uint8) - mask[y: y + height, x: x + width] = 1 + mask[y : y + height, x : x + width] = 1 self.mask = mask def _start_live(self): @@ -162,6 +168,7 @@ class IDSCamera(PSIDeviceBase): def _live_mode_loop(self, stop_event: threading.Event): """Loop to capture images in live mode.""" + self.cam.set_camera_rate_limiting(True) while not stop_event.is_set(): try: self.process_data(self.cam.get_image_data()) @@ -169,6 +176,7 @@ class IDSCamera(PSIDeviceBase): logger.error(f"Error in live mode loop: {e}") break stop_event.wait(0.2) # 5 Hz + self.cam.set_camera_rate_limiting(False) def process_data(self, image: np.ndarray | None): """Process the image data before sending it to the preview signal."""