fix: rate limit warning log in live mode #120
@@ -15,10 +15,10 @@ CI/CD pipelines can run without the pyueye library or the related DLLs installed
|
||||
from __future__ import annotations
|
||||
|
||||
import atexit
|
||||
import time
|
||||
from typing import Literal
|
||||
|
||||
import numpy as np
|
||||
import time
|
||||
from bec_lib.logger import bec_logger
|
||||
|
||||
from csaxs_bec.devices.ids_cameras.base_integration.utils import check_error
|
||||
@@ -67,8 +67,8 @@ class IDSCameraObject:
|
||||
check_error(ueye.is_SetDisplayMode(self.h_cam, ueye.IS_SET_DM_DIB), "IDSCameraObject")
|
||||
|
||||
if (
|
||||
int.from_bytes(self.s_info.nColorMode.value, byteorder="big")
|
||||
== self.ueye.IS_COLORMODE_BAYER
|
||||
int.from_bytes(self.s_info.nColorMode.value, byteorder="big")
|
||||
== self.ueye.IS_COLORMODE_BAYER
|
||||
):
|
||||
logger.info("Bayer color mode detected.")
|
||||
# setup the color depth to the current windows setting
|
||||
@@ -77,16 +77,16 @@ class IDSCameraObject:
|
||||
) # TODO This raises an error - maybe check the m_n_colormode value
|
||||
self.bytes_per_pixel = int(self.n_bits_per_pixel / 8)
|
||||
elif (
|
||||
int.from_bytes(self.s_info.nColorMode.value, byteorder="big")
|
||||
== self.ueye.IS_COLORMODE_CBYCRY
|
||||
int.from_bytes(self.s_info.nColorMode.value, byteorder="big")
|
||||
== self.ueye.IS_COLORMODE_CBYCRY
|
||||
):
|
||||
# for color camera models use RGB32 mode
|
||||
self.m_n_colormode = self.ueye.IS_CM_BGRA8_PACKED
|
||||
self.n_bits_per_pixel = self.ueye.INT(32)
|
||||
self.bytes_per_pixel = int(self.n_bits_per_pixel / 8)
|
||||
elif (
|
||||
int.from_bytes(self.s_info.nColorMode.value, byteorder="big")
|
||||
== self.ueye.IS_COLORMODE_MONOCHROME
|
||||
int.from_bytes(self.s_info.nColorMode.value, byteorder="big")
|
||||
== self.ueye.IS_COLORMODE_MONOCHROME
|
||||
):
|
||||
# for color camera models use RGB32 mode
|
||||
self.m_n_colormode = self.ueye.IS_CM_MONO8
|
||||
@@ -160,12 +160,12 @@ class Camera:
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
camera_id: int,
|
||||
m_n_colormode: Literal[0, 1, 2, 3] = 1,
|
||||
bits_per_pixel: int = 24,
|
||||
connect: bool = True,
|
||||
force_monochrome: bool = False,
|
||||
self,
|
||||
camera_id: int,
|
||||
m_n_colormode: Literal[0, 1, 2, 3] = 1,
|
||||
bits_per_pixel: int = 24,
|
||||
connect: bool = True,
|
||||
force_monochrome: bool = False,
|
||||
):
|
||||
self.ueye = ueye
|
||||
self.camera_id = camera_id
|
||||
@@ -173,8 +173,13 @@ class Camera:
|
||||
self.force_monochrome = force_monochrome
|
||||
self._connected = False
|
||||
self.cam = None
|
||||
|
||||
atexit.register(self.on_disconnect)
|
||||
|
||||
self._enable_warning_rate_limit: bool = False
|
||||
self._last_rate_limited_log: float = 0
|
||||
self._warning_log_rate_limit_s: float = 10
|
||||
|
||||
if connect:
|
||||
self.on_connect()
|
||||
|
||||
@@ -255,7 +260,7 @@ class Camera:
|
||||
def get_image_data(self) -> np.ndarray | None:
|
||||
"""Get the image data from the camera."""
|
||||
if not self._connected:
|
||||
logger.warning("Camera is not connected.")
|
||||
self._rate_limited_warning_log("Camera is not connected.")
|
||||
return None
|
||||
array = self.ueye.get_data(
|
||||
self.cam.pc_image_mem,
|
||||
@@ -282,6 +287,22 @@ class Camera:
|
||||
|
||||
return img
|
||||
|
||||
def set_camera_rate_limiting(self, enabled: bool, rate_limit_s: float | None = None):
|
||||
if rate_limit_s is not None:
|
||||
if rate_limit_s <= 0:
|
||||
raise ValueError(f"Invalid rate limit: {rate_limit_s}, must be positive nonzero.")
|
||||
self._warning_log_rate_limit_s = rate_limit_s
|
||||
self._enable_warning_rate_limit = enabled
|
||||
|
||||
def _rate_limited_warning_log(self, msg: "str"):
|
||||
if (
|
||||
self._enable_warning_rate_limit
|
||||
and time.monotonic() < self._last_rate_limited_log + self._warning_log_rate_limit_s
|
||||
):
|
||||
return
|
||||
self._last_rate_limited_log = time.monotonic()
|
||||
logger.warning(msg)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
# Example usage
|
||||
|
||||
@@ -29,8 +29,14 @@ class IDSCamera(PSIDeviceBase):
|
||||
to interact with the IDS camera using the pyueye library.
|
||||
"""
|
||||
|
||||
image = Cpt(PreviewSignal, name="image", ndim=2, doc="Preview signal for the camera.", num_rotation_90=0,
|
||||
transpose=False)
|
||||
image = Cpt(
|
||||
PreviewSignal,
|
||||
name="image",
|
||||
ndim=2,
|
||||
doc="Preview signal for the camera.",
|
||||
num_rotation_90=0,
|
||||
transpose=False,
|
||||
)
|
||||
roi_signal = Cpt(
|
||||
AsyncSignal,
|
||||
name="roi_signal",
|
||||
@@ -43,19 +49,19 @@ class IDSCamera(PSIDeviceBase):
|
||||
USER_ACCESS = ["live_mode", "mask", "set_rect_roi", "get_last_image"]
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
name: str,
|
||||
camera_id: int,
|
||||
prefix: str = "",
|
||||
scan_info: ScanInfo | None = None,
|
||||
m_n_colormode: Literal[0, 1, 2, 3] = 1,
|
||||
bits_per_pixel: Literal[8, 24] = 24,
|
||||
live_mode: bool = False,
|
||||
num_rotation_90: int = 0,
|
||||
transpose: bool = False,
|
||||
force_monochrome: bool = False,
|
||||
**kwargs,
|
||||
self,
|
||||
*,
|
||||
name: str,
|
||||
camera_id: int,
|
||||
prefix: str = "",
|
||||
scan_info: ScanInfo | None = None,
|
||||
m_n_colormode: Literal[0, 1, 2, 3] = 1,
|
||||
bits_per_pixel: Literal[8, 24] = 24,
|
||||
live_mode: bool = False,
|
||||
num_rotation_90: int = 0,
|
||||
transpose: bool = False,
|
||||
force_monochrome: bool = False,
|
||||
**kwargs,
|
||||
):
|
||||
"""Initialize the IDS Camera.
|
||||
|
||||
@@ -133,7 +139,7 @@ class IDSCamera(PSIDeviceBase):
|
||||
if x + width > img_shape[1] or y + height > img_shape[0]:
|
||||
raise ValueError("ROI exceeds camera dimensions.")
|
||||
mask = np.zeros(img_shape, dtype=np.uint8)
|
||||
mask[y: y + height, x: x + width] = 1
|
||||
mask[y : y + height, x : x + width] = 1
|
||||
self.mask = mask
|
||||
|
||||
def _start_live(self):
|
||||
@@ -162,6 +168,7 @@ class IDSCamera(PSIDeviceBase):
|
||||
|
||||
def _live_mode_loop(self, stop_event: threading.Event):
|
||||
"""Loop to capture images in live mode."""
|
||||
self.cam.set_camera_rate_limiting(True)
|
||||
while not stop_event.is_set():
|
||||
try:
|
||||
self.process_data(self.cam.get_image_data())
|
||||
@@ -169,6 +176,7 @@ class IDSCamera(PSIDeviceBase):
|
||||
logger.error(f"Error in live mode loop: {e}")
|
||||
break
|
||||
stop_event.wait(0.2) # 5 Hz
|
||||
self.cam.set_camera_rate_limiting(False)
|
||||
|
||||
def process_data(self, image: np.ndarray | None):
|
||||
"""Process the image data before sending it to the preview signal."""
|
||||
|
||||
Reference in New Issue
Block a user