refactor(ids): refactor ids camera

This commit is contained in:
2025-07-24 10:19:04 +02:00
parent b6af93806a
commit 8f7ada2f92
12 changed files with 1196 additions and 521 deletions

View File

@@ -6,4 +6,6 @@ _commit: v1.1.2
_src_path: https://github.com/bec-project/plugin_copier_template.git
make_commit: false
project_name: csaxs_bec
widget_plugins_input: []
widget_plugins_input:
- name: csaxs_test
use_ui: true

View File

@@ -28,4 +28,17 @@ mcs:
onFailure: raise
enabled: true
readoutPriority: monitored
softwareTrigger: false
softwareTrigger: false
ids_cam:
description: IDS camera for live image acquisition
deviceClass: csaxs_bec.devices.ids_cameras.ids_camera_new.IDSCamera
deviceConfig:
camera_id: 201
bits_per_pixel: 24
m_n_colormode: 1
live_mode: true
onFailure: raise
enabled: true
readoutPriority: async
softwareTrigger: true

View File

@@ -165,20 +165,20 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS):
)
self.cancel_on_stop(status_ready_read)
self.cancel_on_stop(status_acquiring)
status_ready_read.wait(2)
status_ready_read.wait(10)
mcs.ready_to_read.put(READYTOREAD.PROCESSING)
mcs.erase_start.put(1)
status_acquiring.wait(
timeout=2
timeout=10
) # 10 s wait for mcs card to start should be more than enough..
st = StatusBase()
st = DeviceStatus(self)
self.cancel_on_stop(st)
self.trigger_shot.put(1, use_complete=True)
time.sleep(self.scan_info.msg.scan_parameters["exp_time"])
self.cancel_on_stop(st)
status = self.wait_for_status(status=st, bit_event=STATUSBITS.END_OF_DELAY, timeout=2)
status = self.wait_for_status(status=st, bit_event=STATUSBITS.END_OF_DELAY, timeout=10)
return status
def wait_for_status(
@@ -198,7 +198,11 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS):
if (STATUSBITS(event_status) & bit_event) == bit_event:
status.set_finished()
if time.time() - current_time > timeout:
status.set_exception(TimeoutError(f"Timeout waiting for status {status}"))
status.set_exception(
TimeoutError(
f"Timeout waiting for status of device {self.name} for event_status {bit_event}"
)
)
break
time.sleep(0.1)
time.sleep(0.05) # Give time for the IOC to be ready again

View File

@@ -118,7 +118,7 @@ class DDG2(PSIDeviceBase, DelayGeneratorCSAXS):
"""
def wait_for_status(
self, status: StatusBase, bit_event: STATUSBITS, timeout: float = 2
self, status: DeviceStatus, bit_event: STATUSBITS, timeout: float = 5
) -> None:
"""Wait for a event status bit to be set.
@@ -134,7 +134,11 @@ class DDG2(PSIDeviceBase, DelayGeneratorCSAXS):
if (STATUSBITS(event_status) & bit_event) == bit_event:
status.set_finished()
if time.time() - current_time > timeout:
status.set_exception(TimeoutError(f"Timeout waiting for status {status}"))
status.set_exception(
TimeoutError(
f"Timeout waiting for status of device {self.name} for event_status {bit_event}"
)
)
break
time.sleep(0.1)
time.sleep(0.05) # Give time for the IOC to be ready again

View File

@@ -403,7 +403,7 @@ class DelayGeneratorCSAXS(Device):
In addition, the io layer allows setting amplitude, offset and polarity for each pair.
"""
_pv_timeout: float = 1.5 # Default timeout for PV operations in seconds
_pv_timeout: float = 5 # Default timeout for PV operations in seconds
# Front Panel
t0 = Cpt(StaticPair, "T0", name="t0", doc="T0 static pair")

View File

@@ -118,7 +118,7 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
name=name, prefix=prefix, scan_info=scan_info, device_manager=device_manager, **kwargs
)
self._mcs_clock = 1e7 # 10MHz clock -> 1e7 Hz
self._pv_timeout = 2
self._pv_timeout = 3 # TODO remove timeout once #129 in ophyd_devices is solved
self._rlock = RLock()
self.counter_mapping = {
f"{self.counters.name}_mca1": "current1",

View File

@@ -0,0 +1,251 @@
"""This module provides a Camera class for handling IDS cameras using the pyueye SDK library."""
from __future__ import annotations
import atexit
from typing import Literal
import numpy as np
from bec_lib.logger import bec_logger
from csaxs_bec.devices.ids_cameras.base_integration.utils import check_error
logger = bec_logger.logger
try:
from pyueye import ueye
except ImportError as exc:
logger.warning(f"The pyueye library is not properly installed : {exc}")
ueye = None # type: ignore[assignment]
class IDSCameraObject:
    """Base class for IDS Camera object.

    Runs the full pyueye initialisation sequence for a single camera: open
    the device, query sensor/camera info, reset to defaults, pick a colour
    mode based on the sensor type, allocate and activate image memory, and
    start free-run (live) capture.

    Args:
        device_id (int): The ID of the camera device. # e.g. 201; check idscamera tool
        m_n_colormode (int): Color mode for the camera. # 1 for cSAXS color cameras
        bits_per_pixel (int): Number of bits per pixel for the camera. # 24 for color cameras, 8 for monochrome cameras
    """

    def __init__(self, device_id: int, m_n_colormode, bits_per_pixel):
        self.ueye = ueye
        self._device_id = device_id
        # SDK handle and structs that the init sequence below fills in.
        self.h_cam = ueye.HIDS(device_id)
        self.s_info = ueye.SENSORINFO()
        self.c_info = ueye.CAMINFO()
        self.rect_roi = ueye.IS_RECT()
        self.pc_image_mem = ueye.c_mem_p()
        self.mem_id = ueye.int()
        self.pitch = ueye.INT()
        self.m_n_colormode = ueye.INT(m_n_colormode)
        self.n_bits_per_pixel = ueye.INT(bits_per_pixel)
        self.bytes_per_pixel = int(self.n_bits_per_pixel / 8)
        # Sequence to initialize the camera
        check_error(ueye.is_InitCamera(self.h_cam, None), "IDSCameraObject")
        check_error(ueye.is_GetSensorInfo(self.h_cam, self.s_info), "IDSCameraObject")
        check_error(ueye.is_GetCameraInfo(self.h_cam, self.c_info), "IDSCameraObject")
        check_error(ueye.is_ResetToDefault(self.h_cam), "IDSCameraObject")
        check_error(ueye.is_SetDisplayMode(self.h_cam, ueye.IS_SET_DM_DIB), "IDSCameraObject")
        # nColorMode is a byte string; convert it to an int before comparing
        # against the SDK colour-mode constants.
        if (
            int.from_bytes(self.s_info.nColorMode.value, byteorder="big")
            == self.ueye.IS_COLORMODE_BAYER
        ):
            print("Bayer color mode detected.")
            # setup the color depth to the current windows setting
            self.ueye.is_GetColorDepth(
                self.h_cam, self.n_bits_per_pixel, self.m_n_colormode
            )  # TODO This raises an error - maybe check the m_n_colormode value
            self.bytes_per_pixel = int(self.n_bits_per_pixel / 8)
        elif (
            int.from_bytes(self.s_info.nColorMode.value, byteorder="big")
            == self.ueye.IS_COLORMODE_CBYCRY
        ):
            # for color camera models use RGB32 mode
            self.m_n_colormode = self.ueye.IS_CM_BGRA8_PACKED
            self.n_bits_per_pixel = self.ueye.INT(32)
            self.bytes_per_pixel = int(self.n_bits_per_pixel / 8)
        elif (
            int.from_bytes(self.s_info.nColorMode.value, byteorder="big")
            == self.ueye.IS_COLORMODE_MONOCHROME
        ):
            # monochrome sensor: use 8-bit mono (Y8) mode
            self.m_n_colormode = self.ueye.IS_CM_MONO8
            self.n_bits_per_pixel = self.ueye.INT(8)
            self.bytes_per_pixel = int(self.n_bits_per_pixel / 8)
        else:
            # for monochrome camera models use Y8 mode
            self.m_n_colormode = self.ueye.IS_CM_MONO8
            self.n_bits_per_pixel = self.ueye.INT(8)
            self.bytes_per_pixel = int(self.n_bits_per_pixel / 8)
            print("else")
        # Can be used to set the size and position of an "area of interest"(AOI) within an image
        check_error(
            self.ueye.is_AOI(
                self.h_cam,
                self.ueye.IS_AOI_IMAGE_GET_AOI,
                self.rect_roi,
                self.ueye.sizeof(self.rect_roi),
            ),
            "IDSCameraObject",
        )
        self.width = self.rect_roi.s32Width
        self.height = self.rect_roi.s32Height
        # Allocate image memory matching the AOI and colour depth ...
        check_error(
            self.ueye.is_AllocImageMem(
                self.h_cam,
                self.width,
                self.height,
                self.n_bits_per_pixel,
                self.pc_image_mem,
                self.mem_id,
            ),
            "IDSCameraObject",
        )
        # ... and make it the active image memory.
        check_error(
            self.ueye.is_SetImageMem(self.h_cam, self.pc_image_mem, self.mem_id), "IDSCameraObject"
        )
        check_error(self.ueye.is_SetColorMode(self.h_cam, self.m_n_colormode), "IDSCameraObject")
        # Start free-run live capture without waiting for the first frame.
        check_error(
            self.ueye.is_CaptureVideo(self.h_cam, self.ueye.IS_DONT_WAIT), "IDSCameraObject"
        )
        # Read back the properties (incl. pitch/line stride) of the active memory.
        check_error(
            self.ueye.is_InquireImageMem(
                self.h_cam,
                self.pc_image_mem,
                self.mem_id,
                self.width,
                self.height,
                self.n_bits_per_pixel,
                self.pitch,
            ),
            "IDSCameraObject",
        )

    def __repr__(self):
        return f"IDSCameraObject\n\ndevice_id={self._device_id},\ns_info={self.s_info},\nc_info={self.c_info},\nrect_roi={self.rect_roi},\npc_image_mem={self.pc_image_mem},\nmem_id={self.mem_id},\npitch={self.pitch},\nm_n_colormode={self.m_n_colormode},\nn_bits_per_pixel={self.n_bits_per_pixel},\nbytes_per_pixel={self.bytes_per_pixel}"
class Camera:
    """Camera base class for IDS cameras.

    Thin wrapper around :class:`IDSCameraObject` that manages the connection
    lifecycle (including an ``atexit`` cleanup hook) and exposes common
    operations: ROI, exposure time, auto gain/shutter and frame readout.

    Args:
        camera_id: The camera device ID. # e.g. 201; check idscamera tool
        m_n_colormode: Color mode for the camera. # 1 for cSAXS color cameras
        bits_per_pixel: Bits per pixel. # 24 for color, 8 for monochrome
        connect: If True (default), connect to the camera immediately.
    """

    def __init__(
        self,
        camera_id: int,
        m_n_colormode: Literal[0, 1, 2, 3] = 1,
        bits_per_pixel: int = 24,
        connect: bool = True,
    ):
        self.ueye = ueye
        self.camera_id = camera_id
        self._inputs = {"m_n_colormode": m_n_colormode, "bits_per_pixel": bits_per_pixel}
        self._connected = False
        self.cam = None
        # Release the SDK handle even if the process exits without an
        # explicit on_disconnect() call.
        atexit.register(self.on_disconnect)
        if connect:
            self.on_connect()

    def set_roi(self, x: int, y: int, width: int, height: int):
        """Set the region of interest (ROI) for the camera.

        Args:
            x: Horizontal offset of the ROI in pixels.
            y: Vertical offset of the ROI in pixels.
            width: ROI width in pixels.
            height: ROI height in pixels.
        """
        rect_roi = self.ueye.IS_RECT()
        rect_roi.s32X = x
        rect_roi.s32Y = y
        rect_roi.s32Width = width
        rect_roi.s32Height = height
        ret = self.ueye.is_AOI(
            self.cam.h_cam, self.ueye.IS_AOI_IMAGE_SET_AOI, rect_roi, self.ueye.sizeof(rect_roi)
        )
        check_error(ret, "IDSCameraObject")
        logger.info(f"ROI set to: {rect_roi}")

    def on_connect(self):
        """Connect to the camera and initialize it."""
        if self._connected:
            logger.warning("Camera is already connected.")
            return
        self.cam = IDSCameraObject(self.camera_id, **self._inputs)
        self._connected = True

    def on_disconnect(self):
        """Disconnect from the camera and release the SDK handle."""
        try:
            if self.cam and self.cam.h_cam:
                check_error(self.ueye.is_ExitCamera(self.cam.h_cam), "IDSCameraObject")
            self._connected = False
            self.cam = None
            logger.info("Camera disconnected.")
        except Exception as e:
            # Failing to close the handle is unexpected, but must never raise
            # during interpreter shutdown (atexit) — log at warning level.
            logger.warning(f"Error during camera disconnection: {e}")

    @property
    def exposure_time(self) -> float:
        """Exposure time of the camera.

        Units follow the uEye SDK convention (milliseconds) — TODO confirm
        against the installed SDK version.
        """
        exposure = self.ueye.c_double()
        ret = self.ueye.is_Exposure(
            self.cam.h_cam,
            self.ueye.IS_EXPOSURE_CMD_GET_EXPOSURE,
            exposure,
            self.ueye.sizeof(exposure),  # parameter size in bytes (replaces magic 8)
        )
        check_error(ret, "IDSCameraObject")
        return exposure.value

    @exposure_time.setter
    def exposure_time(self, value: float):
        """Set the exposure time of the camera."""
        exposure = self.ueye.c_double(value)
        check_error(
            self.ueye.is_Exposure(
                self.cam.h_cam,
                self.ueye.IS_EXPOSURE_CMD_SET_EXPOSURE,
                exposure,
                self.ueye.sizeof(exposure),
            ),
            "IDSCameraObject",
        )

    def _set_auto_parameter(self, parameter, enable: bool):
        """Enable/disable one SDK auto parameter.

        NOTE(review): is_SetAutoParameter takes double* for both value
        arguments per the uEye API reference; the previous c_int enable flag
        had the wrong width — confirm on hardware.
        """
        flag = self.ueye.c_double(1 if enable else 0)
        unused = self.ueye.c_double()
        check_error(
            self.ueye.is_SetAutoParameter(self.cam.h_cam, parameter, flag, unused),
            "IDSCameraObject",
        )

    def set_auto_gain(self, enable: bool):
        """Enable or disable auto gain."""
        self._set_auto_parameter(self.ueye.IS_SET_ENABLE_AUTO_GAIN, enable)

    def set_auto_shutter(self, enable: bool):
        """Enable or disable auto exposure."""
        self._set_auto_parameter(self.ueye.IS_SET_ENABLE_AUTO_SHUTTER, enable)

    def get_image_data(self) -> np.ndarray | None:
        """Return the current frame as a (height, width, bytes_per_pixel) array.

        Returns:
            The reshaped frame, or None if the camera is not connected or no
            data could be retrieved.
        """
        if not self._connected:
            logger.warning("Camera is not connected.")
            return None
        array = self.ueye.get_data(
            self.cam.pc_image_mem,
            self.cam.width,
            self.cam.height,
            self.cam.n_bits_per_pixel,
            self.cam.pitch,
            copy=False,  # zero-copy view into the SDK image memory
        )
        if array is None:
            logger.error("Failed to get image data from the camera.")
            return None
        return np.reshape(
            array, (self.cam.height.value, self.cam.width.value, self.cam.bytes_per_pixel)
        )
if __name__ == "__main__":
    # Example usage: the constructor connects immediately (connect=True by
    # default), so an extra on_connect() call would only log an
    # "already connected" warning.
    camera = Camera(camera_id=201)

View File

@@ -0,0 +1,281 @@
"""Utility functions and classes for IDS cameras using the pyueye library."""
from unittest import mock
from bec_lib.logger import bec_logger
logger = bec_logger.logger
try:
from pyueye import ueye
except ImportError as exc:
logger.warning(f"The pyueye library is not properly installed : {exc}")
ueye = None
# Human-readable messages for the uEye SDK return codes, and the
# bits-per-pixel lookup per colour mode. The tables reference constants on
# the `ueye` binding, so they can only be built when pyueye imported.
if ueye is not None:
    error_codes = {
        ueye.IS_NO_SUCCESS: "No success",
        ueye.IS_SUCCESS: "Success",
        ueye.IS_INVALID_CAMERA_HANDLE: "Invalid camera handle",
        ueye.IS_INVALID_HANDLE: "Invalid handle",
        ueye.IS_IO_REQUEST_FAILED: "IO request failed",
        ueye.IS_CANT_OPEN_DEVICE: "Cannot open device",
        ueye.IS_CANT_CLOSE_DEVICE: "Cannot close device",
        ueye.IS_CANT_SETUP_MEMORY: "Cannot setup memory",
        ueye.IS_NO_HWND_FOR_ERROR_REPORT: "No HWND for error report",
        ueye.IS_ERROR_MESSAGE_NOT_CREATED: "Error message not created",
        ueye.IS_ERROR_STRING_NOT_FOUND: "Error string not found",
        ueye.IS_HOOK_NOT_CREATED: "Hook not created",
        ueye.IS_TIMER_NOT_CREATED: "Timer not created",
        ueye.IS_CANT_OPEN_REGISTRY: "Cannot open registry",
        ueye.IS_CANT_READ_REGISTRY: "Cannot read registry",
        ueye.IS_CANT_VALIDATE_BOARD: "Cannot validate board",
        ueye.IS_CANT_GIVE_BOARD_ACCESS: "Cannot give board access",
        ueye.IS_NO_IMAGE_MEM_ALLOCATED: "No image memory allocated",
        ueye.IS_CANT_CLEANUP_MEMORY: "Cannot clean up memory",
        ueye.IS_CANT_COMMUNICATE_WITH_DRIVER: "Cannot communicate with driver",
        ueye.IS_FUNCTION_NOT_SUPPORTED_YET: "Function not supported yet",
        ueye.IS_OPERATING_SYSTEM_NOT_SUPPORTED: "Operating system not supported",
        ueye.IS_INVALID_VIDEO_IN: "Invalid video input",
        ueye.IS_INVALID_IMG_SIZE: "Invalid image size",
        ueye.IS_INVALID_ADDRESS: "Invalid address",
        ueye.IS_INVALID_VIDEO_MODE: "Invalid video mode",
        ueye.IS_INVALID_AGC_MODE: "Invalid AGC mode",
        ueye.IS_INVALID_GAMMA_MODE: "Invalid gamma mode",
        ueye.IS_INVALID_SYNC_LEVEL: "Invalid sync level",
        ueye.IS_INVALID_CBARS_MODE: "Invalid color bars mode",
        ueye.IS_INVALID_COLOR_MODE: "Invalid color mode",
        ueye.IS_INVALID_SCALE_FACTOR: "Invalid scale factor",
        ueye.IS_INVALID_IMAGE_SIZE: "Invalid image size",
        ueye.IS_INVALID_IMAGE_POS: "Invalid image position",
        ueye.IS_INVALID_CAPTURE_MODE: "Invalid capture mode",
        ueye.IS_INVALID_RISC_PROGRAM: "Invalid RISC program",
        ueye.IS_INVALID_BRIGHTNESS: "Invalid brightness",
        ueye.IS_INVALID_CONTRAST: "Invalid contrast",
        ueye.IS_INVALID_SATURATION_U: "Invalid saturation U",
        ueye.IS_INVALID_SATURATION_V: "Invalid saturation V",
        ueye.IS_INVALID_HUE: "Invalid hue",
        ueye.IS_INVALID_HOR_FILTER_STEP: "Invalid horizontal filter step",
        ueye.IS_INVALID_VERT_FILTER_STEP: "Invalid vertical filter step",
        ueye.IS_INVALID_EEPROM_READ_ADDRESS: "Invalid EEPROM read address",
        ueye.IS_INVALID_EEPROM_WRITE_ADDRESS: "Invalid EEPROM write address",
        ueye.IS_INVALID_EEPROM_READ_LENGTH: "Invalid EEPROM read length",
        ueye.IS_INVALID_EEPROM_WRITE_LENGTH: "Invalid EEPROM write length",
        ueye.IS_INVALID_BOARD_INFO_POINTER: "Invalid board info pointer",
        ueye.IS_INVALID_DISPLAY_MODE: "Invalid display mode",
        ueye.IS_INVALID_ERR_REP_MODE: "Invalid error report mode",
        ueye.IS_INVALID_BITS_PIXEL: "Invalid bits per pixel",
        ueye.IS_INVALID_MEMORY_POINTER: "Invalid memory pointer",
        ueye.IS_FILE_WRITE_OPEN_ERROR: "File write open error",
        ueye.IS_FILE_READ_OPEN_ERROR: "File read open error",
        ueye.IS_FILE_READ_INVALID_BMP_ID: "File read invalid BMP ID",
        ueye.IS_FILE_READ_INVALID_BMP_SIZE: "File read invalid BMP size",
        ueye.IS_FILE_READ_INVALID_BIT_COUNT: "File read invalid bit count",
        ueye.IS_WRONG_KERNEL_VERSION: "Wrong kernel version",
        ueye.IS_RISC_INVALID_XLENGTH: "RISC invalid X length",
        ueye.IS_RISC_INVALID_YLENGTH: "RISC invalid Y length",
        ueye.IS_RISC_EXCEED_IMG_SIZE: "RISC exceed image size",
        ueye.IS_DD_MAIN_FAILED: "DirectDraw main surface failed",
        ueye.IS_DD_PRIMSURFACE_FAILED: "DirectDraw primary surface failed",
        ueye.IS_DD_SCRN_SIZE_NOT_SUPPORTED: "Screen size not supported",
        ueye.IS_DD_CLIPPER_FAILED: "Clipper failed",
        ueye.IS_DD_CLIPPER_HWND_FAILED: "Clipper HWND failed",
        ueye.IS_DD_CLIPPER_CONNECT_FAILED: "Clipper connect failed",
        ueye.IS_DD_BACKSURFACE_FAILED: "Backsurface failed",
        ueye.IS_DD_BACKSURFACE_IN_SYSMEM: "Backsurface in system memory",
        ueye.IS_DD_MDL_MALLOC_ERR: "Memory malloc error",
        ueye.IS_DD_MDL_SIZE_ERR: "Memory size error",
        ueye.IS_DD_CLIP_NO_CHANGE: "Clip no change",
        ueye.IS_DD_PRIMMEM_NULL: "Primary memory null",
        ueye.IS_DD_BACKMEM_NULL: "Back memory null",
        ueye.IS_DD_BACKOVLMEM_NULL: "Back overlay memory null",
        ueye.IS_DD_OVERLAYSURFACE_FAILED: "Overlay surface failed",
        ueye.IS_DD_OVERLAYSURFACE_IN_SYSMEM: "Overlay surface in system memory",
        ueye.IS_DD_OVERLAY_NOT_ALLOWED: "Overlay not allowed",
        ueye.IS_DD_OVERLAY_COLKEY_ERR: "Overlay color key error",
        ueye.IS_DD_OVERLAY_NOT_ENABLED: "Overlay not enabled",
        ueye.IS_DD_GET_DC_ERROR: "Get DC error",
        ueye.IS_DD_DDRAW_DLL_NOT_LOADED: "DirectDraw DLL not loaded",
        ueye.IS_DD_THREAD_NOT_CREATED: "DirectDraw thread not created",
        ueye.IS_DD_CANT_GET_CAPS: "Cannot get capabilities",
        ueye.IS_DD_NO_OVERLAYSURFACE: "No overlay surface",
        ueye.IS_DD_NO_OVERLAYSTRETCH: "No overlay stretch",
        ueye.IS_DD_CANT_CREATE_OVERLAYSURFACE: "Cannot create overlay surface",
        ueye.IS_DD_CANT_UPDATE_OVERLAYSURFACE: "Cannot update overlay surface",
        ueye.IS_DD_INVALID_STRETCH: "Invalid stretch",
        ueye.IS_EV_INVALID_EVENT_NUMBER: "Invalid event number",
        ueye.IS_INVALID_MODE: "Invalid mode",
        ueye.IS_CANT_FIND_HOOK: "Cannot find hook",
        ueye.IS_CANT_GET_HOOK_PROC_ADDR: "Cannot get hook procedure address",
        ueye.IS_CANT_CHAIN_HOOK_PROC: "Cannot chain hook procedure",
        ueye.IS_CANT_SETUP_WND_PROC: "Cannot setup window procedure",
        ueye.IS_HWND_NULL: "HWND is null",
        ueye.IS_INVALID_UPDATE_MODE: "Invalid update mode",
        ueye.IS_NO_ACTIVE_IMG_MEM: "No active image memory",
        ueye.IS_CANT_INIT_EVENT: "Cannot initialize event",
        ueye.IS_FUNC_NOT_AVAIL_IN_OS: "Function not available in OS",
        ueye.IS_CAMERA_NOT_CONNECTED: "Camera not connected",
        ueye.IS_SEQUENCE_LIST_EMPTY: "Sequence list empty",
        ueye.IS_CANT_ADD_TO_SEQUENCE: "Cannot add to sequence",
        ueye.IS_LOW_OF_SEQUENCE_RISC_MEM: "Low sequence RISC memory",
        ueye.IS_IMGMEM2FREE_USED_IN_SEQ: "Image memory to free used in sequence",
        ueye.IS_IMGMEM_NOT_IN_SEQUENCE_LIST: "Image memory not in sequence list",
        ueye.IS_SEQUENCE_BUF_ALREADY_LOCKED: "Sequence buffer already locked",
        ueye.IS_INVALID_DEVICE_ID: "Invalid device ID",
        ueye.IS_INVALID_BOARD_ID: "Invalid board ID",
        ueye.IS_ALL_DEVICES_BUSY: "All devices busy",
        ueye.IS_HOOK_BUSY: "Hook busy",
        ueye.IS_TIMED_OUT: "Timed out",
        ueye.IS_NULL_POINTER: "Null pointer",
        ueye.IS_WRONG_HOOK_VERSION: "Wrong hook version",
        ueye.IS_INVALID_PARAMETER: "Invalid parameter",
        ueye.IS_NOT_ALLOWED: "Not allowed",
        ueye.IS_OUT_OF_MEMORY: "Out of memory",
        ueye.IS_INVALID_WHILE_LIVE: "Invalid while live",
        ueye.IS_ACCESS_VIOLATION: "Access violation",
        ueye.IS_UNKNOWN_ROP_EFFECT: "Unknown ROP effect",
        ueye.IS_INVALID_RENDER_MODE: "Invalid render mode",
        ueye.IS_INVALID_THREAD_CONTEXT: "Invalid thread context",
        ueye.IS_NO_HARDWARE_INSTALLED: "No hardware installed",
        ueye.IS_INVALID_WATCHDOG_TIME: "Invalid watchdog time",
        ueye.IS_INVALID_WATCHDOG_MODE: "Invalid watchdog mode",
        ueye.IS_INVALID_PASSTHROUGH_IN: "Invalid passthrough input",
        ueye.IS_ERROR_SETTING_PASSTHROUGH_IN: "Error setting passthrough input",
        ueye.IS_FAILURE_ON_SETTING_WATCHDOG: "Failure setting watchdog",
        ueye.IS_NO_USB20: "No USB 2.0",
        ueye.IS_CAPTURE_RUNNING: "Capture running",
        ueye.IS_MEMORY_BOARD_ACTIVATED: "Memory board activated",
        ueye.IS_MEMORY_BOARD_DEACTIVATED: "Memory board deactivated",
        ueye.IS_NO_MEMORY_BOARD_CONNECTED: "No memory board connected",
        ueye.IS_TOO_LESS_MEMORY: "Too little memory",
        ueye.IS_IMAGE_NOT_PRESENT: "Image not present",
        ueye.IS_MEMORY_MODE_RUNNING: "Memory mode running",
        ueye.IS_MEMORYBOARD_DISABLED: "Memoryboard disabled",
        ueye.IS_TRIGGER_ACTIVATED: "Trigger activated",
        ueye.IS_WRONG_KEY: "Wrong key",
        ueye.IS_CRC_ERROR: "CRC error",
        ueye.IS_NOT_YET_RELEASED: "Not yet released",
        ueye.IS_NOT_CALIBRATED: "Not calibrated",
        ueye.IS_WAITING_FOR_KERNEL: "Waiting for kernel",
        ueye.IS_NOT_SUPPORTED: "Not supported",
        ueye.IS_TRIGGER_NOT_ACTIVATED: "Trigger not activated",
        ueye.IS_OPERATION_ABORTED: "Operation aborted",
        ueye.IS_BAD_STRUCTURE_SIZE: "Bad structure size",
        ueye.IS_INVALID_BUFFER_SIZE: "Invalid buffer size",
        ueye.IS_INVALID_PIXEL_CLOCK: "Invalid pixel clock",
        ueye.IS_INVALID_EXPOSURE_TIME: "Invalid exposure time",
        ueye.IS_AUTO_EXPOSURE_RUNNING: "Auto exposure running",
        ueye.IS_CANNOT_CREATE_BB_SURF: "Cannot create BB surface",
        ueye.IS_CANNOT_CREATE_BB_MIX: "Cannot create BB mix",
        ueye.IS_BB_OVLMEM_NULL: "BB overlay memory null",
        ueye.IS_CANNOT_CREATE_BB_OVL: "Cannot create BB overlay",
        ueye.IS_NOT_SUPP_IN_OVL_SURF_MODE: "Not supported in overlay surface mode",
        ueye.IS_INVALID_SURFACE: "Invalid surface",
        ueye.IS_SURFACE_LOST: "Surface lost",
        ueye.IS_RELEASE_BB_OVL_DC: "Release BB overlay DC",
        ueye.IS_BB_TIMER_NOT_CREATED: "BB timer not created",
        ueye.IS_BB_OVL_NOT_EN: "BB overlay not enabled",
        ueye.IS_ONLY_IN_BB_MODE: "Only in BB mode",
        ueye.IS_INVALID_COLOR_FORMAT: "Invalid color format",
        ueye.IS_INVALID_WB_BINNING_MODE: "Invalid WB binning mode",
        ueye.IS_INVALID_I2C_DEVICE_ADDRESS: "Invalid I²C device address",
        ueye.IS_COULD_NOT_CONVERT: "Could not convert",
        ueye.IS_TRANSFER_ERROR: "Transfer error",
        ueye.IS_PARAMETER_SET_NOT_PRESENT: "Parameter set not present",
        ueye.IS_INVALID_CAMERA_TYPE: "Invalid camera type",
        ueye.IS_INVALID_HOST_IP_HIBYTE: "Invalid host IP high byte",
        ueye.IS_CM_NOT_SUPP_IN_CURR_DISPLAYMODE: "Color matrix not supported in current display mode",
        ueye.IS_NO_IR_FILTER: "No IR filter",
        ueye.IS_STARTER_FW_UPLOAD_NEEDED: "Starter firmware upload needed",
        ueye.IS_DR_LIBRARY_NOT_FOUND: "Driver library not found",
        ueye.IS_DR_DEVICE_OUT_OF_MEMORY: "Driver device out of memory",
        ueye.IS_DR_CANNOT_CREATE_SURFACE: "Driver cannot create surface",
        ueye.IS_DR_CANNOT_CREATE_VERTEX_BUFFER: "Driver cannot create vertex buffer",
        ueye.IS_DR_CANNOT_CREATE_TEXTURE: "Driver cannot create texture",
        ueye.IS_DR_CANNOT_LOCK_OVERLAY_SURFACE: "Driver cannot lock overlay surface",
        ueye.IS_DR_CANNOT_UNLOCK_OVERLAY_SURFACE: "Driver cannot unlock overlay surface",
        ueye.IS_DR_CANNOT_GET_OVERLAY_DC: "Driver cannot get overlay DC",
        ueye.IS_DR_CANNOT_RELEASE_OVERLAY_DC: "Driver cannot release overlay DC",
        ueye.IS_DR_DEVICE_CAPS_INSUFFICIENT: "Driver device capabilities insufficient",
        ueye.IS_INCOMPATIBLE_SETTING: "Incompatible setting",
        ueye.IS_DR_NOT_ALLOWED_WHILE_DC_IS_ACTIVE: "Driver not allowed while DC is active",
        ueye.IS_DEVICE_ALREADY_PAIRED: "Device already paired",
        ueye.IS_SUBNETMASK_MISMATCH: "Subnet mask mismatch",
        ueye.IS_SUBNET_MISMATCH: "Subnet mismatch",
        ueye.IS_INVALID_IP_CONFIGURATION: "Invalid IP configuration",
        ueye.IS_DEVICE_NOT_COMPATIBLE: "Device not compatible",
        ueye.IS_NETWORK_FRAME_SIZE_INCOMPATIBLE: "Network frame size incompatible",
        ueye.IS_NETWORK_CONFIGURATION_INVALID: "Network configuration invalid",
        ueye.IS_ERROR_CPU_IDLE_STATES_CONFIGURATION: "CPU idle states configuration error",
        ueye.IS_DEVICE_BUSY: "Device busy",
        ueye.IS_SENSOR_INITIALIZATION_FAILED: "Sensor initialization failed",
        ueye.IS_IMAGE_BUFFER_NOT_DWORD_ALIGNED: "Image buffer not DWORD aligned",
        ueye.IS_SEQ_BUFFER_IS_LOCKED: "Sequence buffer is locked",
        ueye.IS_FILE_PATH_DOES_NOT_EXIST: "File path does not exist",
        ueye.IS_INVALID_WINDOW_HANDLE: "Invalid window handle",
        ueye.IS_INVALID_IMAGE_PARAMETER: "Invalid image parameter",
        ueye.IS_NO_SUCH_DEVICE: "No such device",
        ueye.IS_DEVICE_IN_USE: "Device in use",
    }

    # Bits per pixel for each supported uEye colour mode.
    bits_per_pixel = {
        ueye.IS_CM_SENSOR_RAW8: 8,
        ueye.IS_CM_SENSOR_RAW10: 16,
        ueye.IS_CM_SENSOR_RAW12: 16,
        ueye.IS_CM_SENSOR_RAW16: 16,
        ueye.IS_CM_MONO8: 8,
        ueye.IS_CM_RGB8_PACKED: 24,
        ueye.IS_CM_BGR8_PACKED: 24,
        ueye.IS_CM_RGBA8_PACKED: 32,
        ueye.IS_CM_BGRA8_PACKED: 32,
        ueye.IS_CM_BGR10_PACKED: 32,
        ueye.IS_CM_RGB10_PACKED: 32,
        ueye.IS_CM_BGRA12_UNPACKED: 64,
        ueye.IS_CM_BGR12_UNPACKED: 48,
        ueye.IS_CM_BGRY8_PACKED: 32,
        ueye.IS_CM_BGR565_PACKED: 16,
        ueye.IS_CM_BGR5_PACKED: 16,
        ueye.IS_CM_UYVY_PACKED: 16,
        ueye.IS_CM_UYVY_MONO_PACKED: 16,
        ueye.IS_CM_UYVY_BAYER_PACKED: 16,
        ueye.IS_CM_CBYCRY_PACKED: 16,
    }
def get_bits_per_pixel(color_mode):
    """Return the number of bits per pixel for the given color mode.

    Raises:
        UEyeException: If *color_mode* is not a known uEye color mode.
    """
    if color_mode in bits_per_pixel:
        return bits_per_pixel[color_mode]
    raise UEyeException(f"Unknown color mode: {color_mode}")
class UEyeException(Exception):
    """Custom exception for uEye errors.

    Args:
        error_code: The SDK return code (or a plain message string).
        called_from: Name of the wrapper that triggered the error, for context.
    """

    def __init__(self, error_code, called_from: str | None = None):
        self.error_code = error_code
        self.called_from = called_from if called_from is not None else ""
        # Initialise the Exception base so args, repr and pickling behave normally.
        super().__init__(error_code)

    def __str__(self):
        if self.error_code in error_codes:
            return f"Exception: {error_codes[self.error_code]} raised in {self.called_from}."
        # Unknown code: best-effort reverse lookup of the SDK constant name.
        for att, val in ueye.__dict__.items():
            if (
                att.startswith("IS")
                and val == self.error_code
                and ("FAILED" in att or "INVALID" in att or "ERROR" in att or "NOT" in att)
            ):
                # "-> value" replaces a mis-encoded "?" in the original message.
                return f"Exception: {self.error_code} ({att} -> value {val}) raised in {self.called_from}."
        return f"Exception: {self.error_code} raised in {self.called_from}."
def check_error(error_code, called_from: str | None = None):
    """Check an error code, and raise an error if adequate.

    Raises:
        UEyeException: When *error_code* differs from ``ueye.IS_SUCCESS``.
    """
    if error_code == ueye.IS_SUCCESS:
        return
    raise UEyeException(error_code, called_from if called_from is not None else "")

View File

@@ -3,226 +3,95 @@ import time
import numpy as np
from ophyd import Component as Cpt
from ophyd import Device, Kind
from ophyd_devices.interfaces.base_classes.psi_detector_base import (
CustomDetectorMixin,
PSIDetectorBase,
)
from ophyd_devices.sim.sim_signals import SetableSignal
try:
from pyueye import ueye
except ImportError:
# The pyueye library is not installed or doesn't provide the necessary c libs
ueye = None
from ophyd import DeviceStatus, Kind, Signal, StatusBase
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
from ophyd_devices.utils.bec_signals import PreviewSignal
class IDSCustomPrepare(CustomDetectorMixin):
class ROISignal(Signal):
"""
Signal to handle the Region of Interest (ROI) for the IDS camera.
It is a tuple of (x, y, width, height).
"""
USER_ACCESS = ["pyueye"]
pyueye = ueye
def __init__(self, *_args, parent: Device = None, **_kwargs) -> None:
super().__init__(*_args, parent=parent, **_kwargs)
self.ueye = ueye
self.h_cam = None
self.s_info = None
self.data_thread = None
self.thread_event = None
def on_connection_established(self):
self.hCam = self.ueye.HIDS(
self.parent.camera_ID
) # 0: first available camera; 1-254: The camera with the specified camera ID
self.sInfo = self.ueye.SENSORINFO()
self.cInfo = self.ueye.CAMINFO()
self.pcImageMemory = self.ueye.c_mem_p()
self.MemID = self.ueye.int()
self.rectAOI = self.ueye.IS_RECT()
self.pitch = self.ueye.INT()
self.nBitsPerPixel = self.ueye.INT(
self.parent.bits_per_pixel
) # 24: bits per pixel for color mode; take 8 bits per pixel for monochrome
self.channels = (
self.parent.channels
) # 3: channels for color mode(RGB); take 1 channel for monochrome
self.m_nColorMode = self.ueye.INT(
self.parent.m_n_colormode
) # Y8/RGB16/RGB24/REG32 (1 for our color cameras)
self.bytes_per_pixel = int(self.nBitsPerPixel / 8)
# Starts the driver and establishes the connection to the camera
nRet = self.ueye.is_InitCamera(self.hCam, None)
if nRet != self.ueye.IS_SUCCESS:
print("is_InitCamera ERROR")
# Reads out the data hard-coded in the non-volatile camera memory and writes it to the data structure that cInfo points to
nRet = self.ueye.is_GetCameraInfo(self.hCam, self.cInfo)
if nRet != self.ueye.IS_SUCCESS:
print("is_GetCameraInfo ERROR")
# You can query additional information about the sensor type used in the camera
nRet = self.ueye.is_GetSensorInfo(self.hCam, self.sInfo)
if nRet != self.ueye.IS_SUCCESS:
print("is_GetSensorInfo ERROR")
nRet = self.ueye.is_ResetToDefault(self.hCam)
if nRet != self.ueye.IS_SUCCESS:
print("is_ResetToDefault ERROR")
# Set display mode to DIB
nRet = self.ueye.is_SetDisplayMode(self.hCam, self.ueye.IS_SET_DM_DIB)
# Set the right color mode
if (
int.from_bytes(self.sInfo.nColorMode.value, byteorder="big")
== self.ueye.IS_COLORMODE_BAYER
):
# setup the color depth to the current windows setting
self.ueye.is_GetColorDepth(self.hCam, self.nBitsPerPixel, self.m_nColorMode)
bytes_per_pixel = int(self.nBitsPerPixel / 8)
print("IS_COLORMODE_BAYER: ")
print("\tm_nColorMode: \t\t", self.m_nColorMode)
print("\tnBitsPerPixel: \t\t", self.nBitsPerPixel)
print("\tbytes_per_pixel: \t\t", bytes_per_pixel)
print()
elif (
int.from_bytes(self.sInfo.nColorMode.value, byteorder="big")
== self.ueye.IS_COLORMODE_CBYCRY
):
# for color camera models use RGB32 mode
m_nColorMode = ueye.IS_CM_BGRA8_PACKED
nBitsPerPixel = ueye.INT(32)
bytes_per_pixel = int(self.nBitsPerPixel / 8)
print("IS_COLORMODE_CBYCRY: ")
print("\tm_nColorMode: \t\t", m_nColorMode)
print("\tnBitsPerPixel: \t\t", nBitsPerPixel)
print("\tbytes_per_pixel: \t\t", bytes_per_pixel)
print()
elif (
int.from_bytes(self.sInfo.nColorMode.value, byteorder="big")
== self.ueye.IS_COLORMODE_MONOCHROME
):
# for color camera models use RGB32 mode
m_nColorMode = self.ueye.IS_CM_MONO8
nBitsPerPixel = self.ueye.INT(8)
bytes_per_pixel = int(nBitsPerPixel / 8)
print("IS_COLORMODE_MONOCHROME: ")
print("\tm_nColorMode: \t\t", m_nColorMode)
print("\tnBitsPerPixel: \t\t", nBitsPerPixel)
print("\tbytes_per_pixel: \t\t", bytes_per_pixel)
print()
else:
# for monochrome camera models use Y8 mode
m_nColorMode = self.ueye.IS_CM_MONO8
nBitsPerPixel = self.ueye.INT(8)
bytes_per_pixel = int(nBitsPerPixel / 8)
print("else")
# Can be used to set the size and position of an "area of interest"(AOI) within an image
nRet = self.ueye.is_AOI(
self.hCam, ueye.IS_AOI_IMAGE_GET_AOI, self.rectAOI, self.ueye.sizeof(self.rectAOI)
def __init__(
self,
*,
name,
roi: tuple | None = None,
value=0,
dtype=None,
shape=None,
timestamp=None,
parent=None,
labels=None,
kind=Kind.hinted,
tolerance=None,
rtolerance=None,
metadata=None,
cl=None,
attr_name="",
):
super().__init__(
name=name,
value=value,
dtype=dtype,
shape=shape,
timestamp=timestamp,
parent=parent,
labels=labels,
kind=kind,
tolerance=tolerance,
rtolerance=rtolerance,
metadata=metadata,
cl=cl,
attr_name=attr_name,
)
if nRet != self.ueye.IS_SUCCESS:
print("is_AOI ERROR")
self.roi = roi
self.width = self.rectAOI.s32Width
self.height = self.rectAOI.s32Height
def get(self, **kwargs):
image = self.parent.image_data.get().data
if not isinstance(image, np.ndarray):
return -1 # -1 if no valid image is available
# Prints out some information about the camera and the sensor
print("Camera model:\t\t", self.sInfo.strSensorName.decode("utf-8"))
print("Camera serial no.:\t", self.cInfo.SerNo.decode("utf-8"))
print("Maximum image width:\t", self.width)
print("Maximum image height:\t", self.height)
print()
# ---------------------------------------------------------------------------------------------------------------------------------------
# Allocates an image memory for an image having its dimensions defined by width and height and its color depth defined by nBitsPerPixel
nRet = self.ueye.is_AllocImageMem(
self.hCam, self.width, self.height, self.nBitsPerPixel, self.pcImageMemory, self.MemID
)
if nRet != self.ueye.IS_SUCCESS:
print("is_AllocImageMem ERROR")
if self.roi is None:
roi = (0, 0, image.shape[1], image.shape[0])
else:
# Makes the specified image memory the active memory
nRet = self.ueye.is_SetImageMem(self.hCam, self.pcImageMemory, self.MemID)
if nRet != self.ueye.IS_SUCCESS:
print("is_SetImageMem ERROR")
else:
# Set the desired color mode
nRet = self.ueye.is_SetColorMode(self.hCam, self.m_nColorMode)
# Activates the camera's live video mode (free run mode)
nRet = self.ueye.is_CaptureVideo(self.hCam, self.ueye.IS_DONT_WAIT)
if nRet != self.ueye.IS_SUCCESS:
print("is_CaptureVideo ERROR")
# Enables the queue mode for existing image memory sequences
nRet = self.ueye.is_InquireImageMem(
self.hCam,
self.pcImageMemory,
self.MemID,
self.width,
self.height,
self.nBitsPerPixel,
self.pitch,
)
if nRet != self.ueye.IS_SUCCESS:
print("is_InquireImageMem ERROR")
else:
print("Press q to leave the programm")
startmeasureframerate = True
Gain = False
# Start live mode of camera immediately
self.parent.start_live_mode()
def _start_data_thread(self):
self.thread_event = threading.Event()
self.data_thread = threading.Thread(target=self._receive_data_from_camera, daemon=True)
self.data_thread.start()
def _receive_data_from_camera(self):
while not self.thread_event.is_set():
# In order to display the image in an OpenCV window we need to...
# ...extract the data of our image memory
array = ueye.get_data(
self.pcImageMemory,
self.width,
self.height,
self.nBitsPerPixel,
self.pitch,
copy=False,
)
# bytes_per_pixel = int(nBitsPerPixel / 8)
# ...reshape it in an numpy array...
frame = np.reshape(array, (self.height.value, self.width.value, self.bytes_per_pixel))
self.parent.image_data.put(frame)
self.parent._run_subs(sub_type=self.parent.SUB_MONITOR, value=frame)
time.sleep(0.1)
def on_trigger(self):
    """Trigger hook — intentionally a no-op for this camera.

    Frames are streamed continuously by the live-mode thread, so nothing
    needs to happen per trigger. The commented line below is the previous
    per-trigger monitor publication, kept for reference.
    """
    pass
    # self.parent._run_subs(sub_type=self.parent.SUB_MONITOR, value=self.parent.image_data.get())
roi = self.roi
if len(image.shape) > 2:
image = np.sum(image, axis=2) # Convert to grayscale if it's a color image
return np.sum(image[roi[1] : roi[1] + roi[3], roi[0] : roi[0] + roi[2]], (0, 1))
class IDSCamera(PSIDetectorBase):
USER_ACCESS = ["start_live_mode", "stop_live_mode"]
class IDSCamera(PSIDeviceBase):
""" "
#---------------------------------------------------------------------------------------------------------------------------------------
custom_prepare_cls = IDSCustomPrepare
#Variables
hCam = ueye.HIDS(202) #0: first available camera; 1-254: The camera with the specified camera ID
sInfo = ueye.SENSORINFO()
cInfo = ueye.CAMINFO()
pcImageMemory = ueye.c_mem_p()
MemID = ueye.int()
rectAOI = ueye.IS_RECT()
pitch = ueye.INT()
nBitsPerPixel = ueye.INT(24) #24: bits per pixel for color mode; take 8 bits per pixel for monochrome
channels = 3 #3: channels for color mode(RGB); take 1 channel for monochrome
m_nColorMode = ueye.INT(1) # Y8/RGB16/RGB24/REG32 (1 for our color cameras)
bytes_per_pixel = int(nBitsPerPixel / 8)
image_data = Cpt(SetableSignal, value=np.empty((100, 100)), kind=Kind.omitted)
ids_cam
...
"""
SUB_MONITOR = "device_monitor_2d"
_default_sub = SUB_MONITOR
USER_ACCESS = ["start_live_mode", "stop_live_mode", "set_roi", "width", "height"]
image_data = Cpt(PreviewSignal, ndim=2, kind=Kind.omitted)
# roi_bot_left = Cpt(ROISignal, roi=(400, 525, 118, 105), kind=Kind.normal)
# roi_bot_right = Cpt(ROISignal, roi=(518, 525, 118, 105), kind=Kind.normal)
# roi_top_left = Cpt(ROISignal, roi=(400, 630, 118, 105), kind=Kind.normal)
# roi_top_right = Cpt(ROISignal, roi=(518, 630, 118, 105), kind=Kind.normal)
# roi_signal = Cpt(ROISignal, kind=Kind.normal, doc="Region of Interest signal")
def __init__(
self,
@@ -234,19 +103,221 @@ class IDSCamera(PSIDetectorBase):
channels: int,
m_n_colormode: int,
kind=None,
parent=None,
device_manager=None,
**kwargs,
):
super().__init__(
prefix=prefix, name=name, kind=kind, parent=parent, device_manager=device_manager, **kwargs
prefix=prefix, name=name, kind=kind, device_manager=device_manager, **kwargs
)
self.camera_ID = camera_ID
self.bits_per_pixel = bits_per_pixel
self.bytes_per_pixel = None
self.channels = channels
self.m_n_colormode = m_n_colormode
#TODO fix connected and wait_for_connection
self.custom_prepare.on_connection_established()
self._m_n_colormode_input = m_n_colormode
self.m_n_colormode = None
self.ueye = ueye
self.h_cam = None
self.s_info = None
self.data_thread = None
self.c_info = None
self.pc_image_memory = None
self.mem_id = None
self.rect_aoi = None
self.pitch = None
self.n_bits_per_pixel = None
self.width = None
self.height = None
self.thread_event = threading.Event()
self.data_thread = None
self._roi: tuple | None = None # x, y, width, height
def set_roi(self, x: int, y: int, width: int, height: int) -> None:
    """Store the region of interest as an ``(x, y, width, height)`` tuple."""
    roi = (x, y, width, height)
    self._roi = roi
def start_backend(self):
    """Initialize the uEye driver, configure the sensor, and start free-run capture.

    Mirrors the vendor's SimpleLive example: init camera, query camera/sensor
    info, reset to defaults, select the color mode matching the sensor, read
    the AOI to obtain the image dimensions, allocate and activate image
    memory, start continuous video capture, then launch the frame thread.

    Raises:
        ImportError: If the pyueye library is not available.
    """
    if self.ueye is None:
        raise ImportError("The pyueye library is not installed.")
    self.h_cam = self.ueye.HIDS(
        self.camera_ID
    )  # 0: first available camera; 1-254: The camera with the specified camera ID
    self.s_info = self.ueye.SENSORINFO()
    self.c_info = self.ueye.CAMINFO()
    self.pc_image_memory = self.ueye.c_mem_p()
    self.mem_id = self.ueye.int()
    self.rect_aoi = self.ueye.IS_RECT()
    self.pitch = self.ueye.INT()
    self.n_bits_per_pixel = self.ueye.INT(
        self.bits_per_pixel
    )  # 24: bits per pixel for color mode; take 8 bits per pixel for monochrome
    self.m_n_colormode = self.ueye.INT(
        self._m_n_colormode_input
    )  # Y8/RGB16/RGB24/REG32 (1 for our color cameras)
    self.bytes_per_pixel = int(self.n_bits_per_pixel / 8)
    # Starts the driver and establishes the connection to the camera
    ret = self.ueye.is_InitCamera(self.h_cam, None)
    if ret != self.ueye.IS_SUCCESS:
        print("is_InitCamera ERROR")
    # Reads the data hard-coded in the non-volatile camera memory into c_info
    ret = self.ueye.is_GetCameraInfo(self.h_cam, self.c_info)
    if ret != self.ueye.IS_SUCCESS:
        print("is_GetCameraInfo ERROR")
    # Query additional information about the sensor type used in the camera
    ret = self.ueye.is_GetSensorInfo(self.h_cam, self.s_info)
    if ret != self.ueye.IS_SUCCESS:
        print("is_GetSensorInfo ERROR")
    ret = self.ueye.is_ResetToDefault(self.h_cam)
    if ret != self.ueye.IS_SUCCESS:
        print("is_ResetToDefault ERROR")
    # Set display mode to DIB (return value unchecked, as in the vendor example)
    ret = self.ueye.is_SetDisplayMode(self.h_cam, self.ueye.IS_SET_DM_DIB)
    # Set the right color mode.
    # BUG FIX: the non-Bayer branches previously assigned local variables only,
    # so self.m_n_colormode / self.n_bits_per_pixel / self.bytes_per_pixel kept
    # their constructor values and the memory allocation below used the wrong
    # color depth. All branches now update the instance attributes.
    if (
        int.from_bytes(self.s_info.nColorMode.value, byteorder="big")
        == self.ueye.IS_COLORMODE_BAYER
    ):
        # setup the color depth to the current windows setting
        self.ueye.is_GetColorDepth(self.h_cam, self.n_bits_per_pixel, self.m_n_colormode)
        self.bytes_per_pixel = int(self.n_bits_per_pixel / 8)
        print("IS_COLORMODE_BAYER: ")
        print("\tm_n_colormode: \t\t", self.m_n_colormode)
        print("\tn_bits_per_pixel: \t\t", self.n_bits_per_pixel)
        print("\tbytes_per_pixel: \t\t", self.bytes_per_pixel)
        print()
    elif (
        int.from_bytes(self.s_info.nColorMode.value, byteorder="big")
        == self.ueye.IS_COLORMODE_CBYCRY
    ):
        # for color camera models use RGB32 mode
        self.m_n_colormode = self.ueye.IS_CM_BGRA8_PACKED
        self.n_bits_per_pixel = self.ueye.INT(32)
        self.bytes_per_pixel = int(self.n_bits_per_pixel / 8)
        print("IS_COLORMODE_CBYCRY: ")
        print("\tm_n_colormode: \t\t", self.m_n_colormode)
        print("\tn_bits_per_pixel: \t\t", self.n_bits_per_pixel)
        print("\tbytes_per_pixel: \t\t", self.bytes_per_pixel)
        print()
    elif (
        int.from_bytes(self.s_info.nColorMode.value, byteorder="big")
        == self.ueye.IS_COLORMODE_MONOCHROME
    ):
        # for monochrome camera models use Y8 mode
        self.m_n_colormode = self.ueye.IS_CM_MONO8
        self.n_bits_per_pixel = self.ueye.INT(8)
        self.bytes_per_pixel = int(self.n_bits_per_pixel / 8)
        print("IS_COLORMODE_MONOCHROME: ")
        print("\tm_n_colormode: \t\t", self.m_n_colormode)
        print("\tn_bits_per_pixel: \t\t", self.n_bits_per_pixel)
        print("\tbytes_per_pixel: \t\t", self.bytes_per_pixel)
        print()
    else:
        # fallback: use Y8 mode
        self.m_n_colormode = self.ueye.IS_CM_MONO8
        self.n_bits_per_pixel = self.ueye.INT(8)
        self.bytes_per_pixel = int(self.n_bits_per_pixel / 8)
        print("else")
    # Read the current "area of interest" (AOI) to obtain the image dimensions
    ret = self.ueye.is_AOI(
        self.h_cam,
        self.ueye.IS_AOI_IMAGE_GET_AOI,
        self.rect_aoi,
        self.ueye.sizeof(self.rect_aoi),
    )
    if ret != self.ueye.IS_SUCCESS:
        print("is_AOI ERROR")
    self.width = self.rect_aoi.s32Width
    self.height = self.rect_aoi.s32Height
    # Prints out some information about the camera and the sensor
    print("Camera model:\t\t", self.s_info.strSensorName.decode("utf-8"))
    print("Camera serial no.:\t", self.c_info.SerNo.decode("utf-8"))
    print("Maximum image width:\t", self.width)
    print("Maximum image height:\t", self.height)
    print()
    # Allocates an image memory matching the AOI dimensions and color depth
    ret = self.ueye.is_AllocImageMem(
        self.h_cam,
        self.width,
        self.height,
        self.n_bits_per_pixel,
        self.pc_image_memory,
        self.mem_id,
    )
    if ret != self.ueye.IS_SUCCESS:
        print("is_AllocImageMem ERROR")
    else:
        # Makes the specified image memory the active memory
        ret = self.ueye.is_SetImageMem(self.h_cam, self.pc_image_memory, self.mem_id)
        if ret != self.ueye.IS_SUCCESS:
            print("is_SetImageMem ERROR")
        else:
            # Set the desired color mode
            ret = self.ueye.is_SetColorMode(self.h_cam, self.m_n_colormode)
            # Activates the camera's live video mode (free run mode)
            ret = self.ueye.is_CaptureVideo(self.h_cam, self.ueye.IS_DONT_WAIT)
            if ret != self.ueye.IS_SUCCESS:
                print("is_CaptureVideo ERROR")
    # Query the memory layout (also fills self.pitch, needed to read frames)
    ret = self.ueye.is_InquireImageMem(
        self.h_cam,
        self.pc_image_memory,
        self.mem_id,
        self.width,
        self.height,
        self.n_bits_per_pixel,
        self.pitch,
    )
    if ret != self.ueye.IS_SUCCESS:
        print("is_InquireImageMem ERROR")
    else:
        print("Press q to leave the programm")
    # Start live mode of camera immediately
    self.start_live_mode()
def _start_data_thread(self):
self.data_thread = threading.Thread(target=self._receive_data_from_camera, daemon=True)
self.data_thread.start()
def _receive_data_from_camera(self):
while not self.thread_event.is_set():
if self.ueye is None:
print("pyueye library not available.")
return
# In order to display the image in an OpenCV window we need to...
# ...extract the data of our image memory
array = self.ueye.get_data(
self.pc_image_memory,
self.width,
self.height,
self.n_bits_per_pixel,
self.pitch,
copy=False,
)
# ...reshape it in an numpy array...
frame = np.reshape(array, (self.height.value, self.width.value, self.bytes_per_pixel))
self.image_data.put(frame)
time.sleep(0.1)
def wait_for_connection(self, all_signals=False, timeout=10):
if ueye is None:
@@ -254,226 +325,73 @@ class IDSCamera(PSIDetectorBase):
"The pyueye library is not installed or doesn't provide the necessary c libs"
)
super().wait_for_connection(all_signals, timeout)
#self.custom_prepare.on_connection_established()
def destroy(self):
    """Extend Ophyd's ``destroy`` to stop the frame-streaming thread first.

    Live mode is stopped (joining the acquisition thread) before the base
    class tears down the device, so no worker is left reading camera memory.
    """
    self.stop_live_mode()
    super().destroy()
def start_live_mode(self):
if self.custom_prepare.data_thread is not None:
if self.data_thread is not None:
self.stop_live_mode()
self.custom_prepare._start_data_thread()
self._start_data_thread()
def stop_live_mode(self):
"""Stopping the camera live mode."""
if self.custom_prepare.thread_event is not None:
self.custom_prepare.thread_event.set()
if self.custom_prepare.data_thread is not None:
self.custom_prepare.data_thread.join()
self.custom_prepare.thread_event = None
self.custom_prepare.data_thread = None
self.thread_event.set()
if self.data_thread is not None:
self.data_thread.join()
self.thread_event.clear()
self.data_thread = None
########################################
# Beamline Specific Implementations #
########################################
def on_init(self) -> None:
    """
    Hook called when the device is initialized.

    No signals are connected at this point, so do not touch them here;
    set default signal values in ``on_connected`` instead.
    """
def on_connected(self) -> None:
    """
    Hook called after the device and its signals are connected.

    Initializes the uEye driver/backend and immediately starts the
    live-streaming thread so the preview image gets populated.
    """
    self.start_backend()
    self.start_live_mode()
def on_stage(self) -> DeviceStatus | StatusBase | None:
    """
    Hook called while staging the device before a scan.

    Details of the upcoming scan are available via ``self.scan_info.msg``.
    Currently a no-op for this camera.
    """
def on_unstage(self) -> DeviceStatus | StatusBase | None:
    """Hook called while unstaging the device after a scan. Currently a no-op."""
def on_pre_scan(self) -> DeviceStatus | StatusBase | None:
    """Hook called automatically on all devices right before the scan starts. No-op here."""
def on_trigger(self) -> DeviceStatus | StatusBase | None:
    """Hook called when the device is triggered. No-op: frames stream continuously."""
def on_complete(self) -> DeviceStatus | StatusBase | None:
    """Hook called to check whether the device has completed a scan. No-op here."""
def on_kickoff(self) -> DeviceStatus | StatusBase | None:
    """Hook to kick off the device for a fly scan; must be called explicitly. No-op here."""
def on_stop(self) -> None:
    """Hook called when the device is stopped. No-op here."""
def on_destroy(self) -> None:
    """Hook called when the device is destroyed; stops the live-streaming thread."""
    self.stop_live_mode()
"""from pyueye import ueye
import numpy as np
import cv2
import sys
import time
if __name__ == "__main__":
# Example usage
camera = IDSCamera(name="camera", camera_ID=201, bits_per_pixel=24, channels=3, m_n_colormode=1)
camera.wait_for_connection()
#---------------------------------------------------------------------------------------------------------------------------------------
#Variables
hCam = ueye.HIDS(202) #0: first available camera; 1-254: The camera with the specified camera ID
sInfo = ueye.SENSORINFO()
cInfo = ueye.CAMINFO()
pcImageMemory = ueye.c_mem_p()
MemID = ueye.int()
rectAOI = ueye.IS_RECT()
pitch = ueye.INT()
nBitsPerPixel = ueye.INT(24) #24: bits per pixel for color mode; take 8 bits per pixel for monochrome
channels = 3 #3: channels for color mode(RGB); take 1 channel for monochrome
m_nColorMode = ueye.INT(1) # Y8/RGB16/RGB24/REG32 (1 for our color cameras)
bytes_per_pixel = int(nBitsPerPixel / 8)
ids_cam
...
deviceConfig:
camera_ID: 202
bits_per_pixel: 24
channels: 3
m_n_colormode: 1
#---------------------------------------------------------------------------------------------------------------------------------------
print("START")
print()
# Starts the driver and establishes the connection to the camera
nRet = ueye.is_InitCamera(hCam, None)
if nRet != ueye.IS_SUCCESS:
print("is_InitCamera ERROR")
# Reads out the data hard-coded in the non-volatile camera memory and writes it to the data structure that cInfo points to
nRet = ueye.is_GetCameraInfo(hCam, cInfo)
if nRet != ueye.IS_SUCCESS:
print("is_GetCameraInfo ERROR")
# You can query additional information about the sensor type used in the camera
nRet = ueye.is_GetSensorInfo(hCam, sInfo)
if nRet != ueye.IS_SUCCESS:
print("is_GetSensorInfo ERROR")
nRet = ueye.is_ResetToDefault( hCam)
if nRet != ueye.IS_SUCCESS:
print("is_ResetToDefault ERROR")
# Set display mode to DIB
nRet = ueye.is_SetDisplayMode(hCam, ueye.IS_SET_DM_DIB)
# Set the right color mode
if int.from_bytes(sInfo.nColorMode.value, byteorder='big') == ueye.IS_COLORMODE_BAYER:
# setup the color depth to the current windows setting
ueye.is_GetColorDepth(hCam, nBitsPerPixel, m_nColorMode)
bytes_per_pixel = int(nBitsPerPixel / 8)
print("IS_COLORMODE_BAYER: ", )
print("\tm_nColorMode: \t\t", m_nColorMode)
print("\tnBitsPerPixel: \t\t", nBitsPerPixel)
print("\tbytes_per_pixel: \t\t", bytes_per_pixel)
print()
elif int.from_bytes(sInfo.nColorMode.value, byteorder='big') == ueye.IS_COLORMODE_CBYCRY:
# for color camera models use RGB32 mode
m_nColorMode = ueye.IS_CM_BGRA8_PACKED
nBitsPerPixel = ueye.INT(32)
bytes_per_pixel = int(nBitsPerPixel / 8)
print("IS_COLORMODE_CBYCRY: ", )
print("\tm_nColorMode: \t\t", m_nColorMode)
print("\tnBitsPerPixel: \t\t", nBitsPerPixel)
print("\tbytes_per_pixel: \t\t", bytes_per_pixel)
print()
elif int.from_bytes(sInfo.nColorMode.value, byteorder='big') == ueye.IS_COLORMODE_MONOCHROME:
# for color camera models use RGB32 mode
m_nColorMode = ueye.IS_CM_MONO8
nBitsPerPixel = ueye.INT(8)
bytes_per_pixel = int(nBitsPerPixel / 8)
print("IS_COLORMODE_MONOCHROME: ", )
print("\tm_nColorMode: \t\t", m_nColorMode)
print("\tnBitsPerPixel: \t\t", nBitsPerPixel)
print("\tbytes_per_pixel: \t\t", bytes_per_pixel)
print()
else:
# for monochrome camera models use Y8 mode
m_nColorMode = ueye.IS_CM_MONO8
nBitsPerPixel = ueye.INT(8)
bytes_per_pixel = int(nBitsPerPixel / 8)
print("else")
# Can be used to set the size and position of an "area of interest"(AOI) within an image
nRet = ueye.is_AOI(hCam, ueye.IS_AOI_IMAGE_GET_AOI, rectAOI, ueye.sizeof(rectAOI))
if nRet != ueye.IS_SUCCESS:
print("is_AOI ERROR")
width = rectAOI.s32Width
height = rectAOI.s32Height
# Prints out some information about the camera and the sensor
print("Camera model:\t\t", sInfo.strSensorName.decode('utf-8'))
print("Camera serial no.:\t", cInfo.SerNo.decode('utf-8'))
print("Maximum image width:\t", width)
print("Maximum image height:\t", height)
print()
#---------------------------------------------------------------------------------------------------------------------------------------
# Allocates an image memory for an image having its dimensions defined by width and height and its color depth defined by nBitsPerPixel
nRet = ueye.is_AllocImageMem(hCam, width, height, nBitsPerPixel, pcImageMemory, MemID)
if nRet != ueye.IS_SUCCESS:
print("is_AllocImageMem ERROR")
else:
# Makes the specified image memory the active memory
nRet = ueye.is_SetImageMem(hCam, pcImageMemory, MemID)
if nRet != ueye.IS_SUCCESS:
print("is_SetImageMem ERROR")
else:
# Set the desired color mode
nRet = ueye.is_SetColorMode(hCam, m_nColorMode)
# Activates the camera's live video mode (free run mode)
nRet = ueye.is_CaptureVideo(hCam, ueye.IS_DONT_WAIT)
if nRet != ueye.IS_SUCCESS:
print("is_CaptureVideo ERROR")
# Enables the queue mode for existing image memory sequences
nRet = ueye.is_InquireImageMem(hCam, pcImageMemory, MemID, width, height, nBitsPerPixel, pitch)
if nRet != ueye.IS_SUCCESS:
print("is_InquireImageMem ERROR")
else:
print("Press q to leave the programm")
startmeasureframerate=True
Gain = False
#---------------------------------------------------------------------------------------------------------------------------------------
# Continuous image display
while(nRet == ueye.IS_SUCCESS):
# In order to display the image in an OpenCV window we need to...
# ...extract the data of our image memory
array = ueye.get_data(pcImageMemory, width, height, nBitsPerPixel, pitch, copy=False)
# bytes_per_pixel = int(nBitsPerPixel / 8)
# ...reshape it in an numpy array...
frame = np.reshape(array,(height.value, width.value, bytes_per_pixel))
# ...resize the image by a half
frame = cv2.resize(frame,(0,0),fx=0.5, fy=0.5)
#---------------------------------------------------------------------------------------------------------------------------------------
#Include image data processing here
#---------------------------------------------------------------------------------------------------------------------------------------
#...and finally display it
cv2.imshow("SimpleLive_Python_uEye_OpenCV", frame)
if startmeasureframerate:
starttime = time.time()
startmeasureframerate=False
framenumber=0
if time.time() > starttime+5:
print(f"Caught {framenumber/5} frames per second")
startmeasureframerate=True
Gain = ~Gain
if Gain:
nRet = ueye.is_SetGainBoost(hCam, 1)
else:
nRet = ueye.is_SetGainBoost(hCam, 0)
print(f"Gain setting status {nRet}")
#...and finally display it
cv2.imshow("SimpleLive_Python_uEye_OpenCV", frame)
framenumber+=1
time.sleep(0.1)
# Press q if you want to end the loop
if (cv2.waitKey(1) & 0xFF) == ord('q'):
break
#---------------------------------------------------------------------------------------------------------------------------------------
# Releases an image memory that was allocated using is_AllocImageMem() and removes it from the driver management
ueye.is_FreeImageMem(hCam, pcImageMemory, MemID)
# Disables the hCam camera handle and releases the data structures and memory areas taken up by the uEye camera
ueye.is_ExitCamera(hCam)
# Destroys the OpenCv windows
cv2.destroyAllWindows()
print()
print("END")
"""
camera.on_destroy()

View File

@@ -0,0 +1,285 @@
"""IDS Camera class for cSAXS IDS cameras."""
from __future__ import annotations
import threading
import time
from typing import TYPE_CHECKING, Literal, Tuple, TypedDict
import numpy as np
from bec_lib import messages
from bec_lib.logger import bec_logger
from ophyd import Component as Cpt
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
from ophyd_devices.utils.bec_signals import AsyncSignal, PreviewSignal
from pydantic import BaseModel, field_validator
from csaxs_bec.devices.ids_cameras.base_integration.camera import Camera
if TYPE_CHECKING:
from bec_lib.devicemanager import ScanInfo
from pydantic import ValidationInfo
logger = bec_logger.logger
AnyDimShape = Tuple[int, ...]
class ROISpec(TypedDict):
    """
    Typed dictionary representing the specification for a Region of Interest (ROI).
    """

    # Top-left corner of the ROI in pixel coordinates.
    x: int
    y: int
    # Extent of the ROI in pixels.
    width: int
    height: int
    # Shape of the full image the ROI applies to.
    img_shape: AnyDimShape
    # Mask with shape ``img_shape``: 1 inside the ROI, 0 outside.
    mask: np.ndarray
def validate_roi(roi: ROISpec) -> ROISpec:
    """
    Validate the ROI specification to ensure it matches the image shape.

    Args:
        roi (ROISpec): The ROI specification to validate.

    Returns:
        ROISpec: The validated ROI specification.

    Raises:
        ValueError: If the mask shape differs from the declared image shape.
    """
    mask_shape = roi["mask"].shape
    image_shape = roi["img_shape"]
    if mask_shape != image_shape:
        raise ValueError(
            f"Mask shape {mask_shape} does not match image shape {image_shape}."
        )
    return roi
class IDSCamera(PSIDeviceBase):
    """IDS Camera class for cSAXS.

    Wraps the low-level :class:`Camera` integration (pyueye) as a BEC/Ophyd
    device: a 2D preview signal with the live image, an async signal carrying
    the mean intensity inside a configurable region of interest (ROI), and a
    background thread that polls frames while live mode is enabled.
    """

    # Live preview image (2D); updated by the live-mode thread.
    image = Cpt(PreviewSignal, name="image", ndim=2, doc="Preview signal for the camera.")
    # Scalar per-trigger ROI readout, appended to an async buffer.
    roi_signal = Cpt(
        AsyncSignal,
        name="roi_signal",
        ndim=0,
        max_size=1000,
        doc="Signal for the region of interest (ROI).",
        async_update={"type": "add", "max_shape": [None]},
    )

    USER_ACCESS = ["live_mode", "roi", "get_last_image"]

    def __init__(
        self,
        *,
        name: str,
        camera_id: int,
        prefix: str = "",
        scan_info: ScanInfo | None = None,
        m_n_colormode: Literal[0, 1, 2, 3] = 1,
        bits_per_pixel: Literal[8, 24] = 24,
        live_mode: bool = False,
        roi: tuple[int, int, int, int] | None = None,
        **kwargs,
    ):
        """Initialize the IDS Camera.

        Args:
            name (str): Name of the device.
            camera_id (int): The ID of the camera device.
            prefix (str): Prefix for the device.
            scan_info (ScanInfo | None): Scan information for the device.
            m_n_colormode (Literal[0, 1, 2, 3]): Color mode for the camera.
            bits_per_pixel (Literal[8, 24]): Number of bits per pixel for the camera.
            live_mode (bool): Whether to enable live mode for the camera.
            roi (tuple | None): Initial (x, y, width, height) ROI; defaults to
                the full sensor once the camera is connected.
        """
        super().__init__(name=name, prefix=prefix, scan_info=scan_info, **kwargs)
        self._live_mode_thread: threading.Thread | None = None
        self._stop_live_mode_event: threading.Event = threading.Event()
        # The camera object is created unconnected; the driver connection is
        # established later in on_connected.
        self.cam = Camera(
            camera_id=camera_id,
            m_n_colormode=m_n_colormode,
            bits_per_pixel=bits_per_pixel,
            connect=False,
        )
        self._live_mode = False
        # Deferred constructor inputs, applied in on_connected once the real
        # sensor dimensions are available.
        self._inputs = {"roi": roi if roi else (0, 0, 1, 1), "live_mode": live_mode}
        # Placeholder 1x1 ROI until the sensor size is known.
        self._roi: ROISpec = validate_roi(
            ROISpec(
                {
                    "x": 0,
                    "y": 0,
                    "width": 1,
                    "height": 1,
                    "img_shape": (1, 1),
                    "mask": np.zeros((1, 1), dtype=np.uint8),
                }
            )
        )

    ############## Live Mode Methods ##############

    @property
    def roi(self) -> ROISpec:
        """Return the current region of interest (ROI) for the camera."""
        return self._roi

    @roi.setter
    def roi(self, value: ROISpec | tuple[int, int, int, int] | list[int]):
        """
        Set the region of interest (ROI) for the camera.

        Args:
            value (ROISpec | tuple | list): Either a full ROISpec dict, or a
                4-element (x, y, width, height) tuple/list that is validated
                against the camera dimensions and expanded into a mask.

        Raises:
            ValueError: If the ROI exceeds the camera dimensions.
            TypeError: If the value is not a valid ROI specification.
        """
        if isinstance(value, (tuple, list)) and len(value) == 4:
            x = value[0]
            y = value[1]
            width = value[2]
            height = value[3]
            if x + width > self.cam.cam.width.value or y + height > self.cam.cam.height.value:
                raise ValueError("ROI exceeds camera dimensions.")
            img_shape = (self.cam.cam.height.value, self.cam.cam.width.value)
            mask = np.zeros(img_shape, dtype=np.uint8)
            mask[y : y + height, x : x + width] = 1
            value = validate_roi(
                ROISpec(
                    {
                        "x": x,
                        "y": y,
                        "width": width,
                        "height": height,
                        "img_shape": img_shape,
                        "mask": mask,
                    }
                )
            )
        if not isinstance(value, dict) or not all(
            key in value for key in ["x", "y", "width", "height", "img_shape", "mask"]
        ):
            raise TypeError(f"ROI must be an instance of ROISpec {value}.")
        self._roi = value

    @property
    def live_mode(self) -> bool:
        """Return whether the camera is in live mode."""
        return self._live_mode

    @live_mode.setter
    def live_mode(self, value: bool):
        """Set the live mode for the camera, connecting the camera on demand."""
        if value != self._live_mode:
            if self.cam._connected is False:  # pylint: disable=protected-access
                self.cam.on_connect()
            self._live_mode = value
            if value:
                self._start_live()
            else:
                self._stop_live()

    def _start_live(self):
        """Start the live mode for the camera."""
        if self._live_mode_thread is not None:
            logger.info("Live mode thread is already running.")
            return
        self._stop_live_mode_event.clear()
        self._live_mode_thread = threading.Thread(
            target=self._live_mode_loop, args=(self._stop_live_mode_event,)
        )
        self._live_mode_thread.start()

    def _stop_live(self):
        """Stop the live mode for the camera."""
        if self._live_mode_thread is None:
            logger.info("Live mode thread is not running.")
            return
        self._stop_live_mode_event.set()
        self._live_mode_thread.join(timeout=5)
        if self._live_mode_thread.is_alive():
            # NOTE(review): the thread handle is kept when the join times out,
            # which also blocks subsequent _start_live calls — confirm whether
            # a stuck thread should be dropped here instead.
            logger.warning("Live mode thread did not stop gracefully.")
        else:
            self._live_mode_thread = None
            logger.info("Live mode stopped.")

    def _live_mode_loop(self, stop_event: threading.Event):
        """Loop to capture images in live mode (runs in a background thread)."""
        while not stop_event.is_set():
            try:
                self.process_data(self.cam.get_image_data())
            except Exception as e:
                logger.error(f"Error in live mode loop: {e}")
                break
            stop_event.wait(0.2)  # 5 Hz

    def process_data(self, image: np.ndarray | None):
        """Process the image data before sending it to the preview signal."""
        if image is None:
            return
        self.image.put(image)

    def get_last_image(self) -> np.ndarray | None:
        """Return the data of the most recent preview image, or None if unavailable."""
        image = self.image.get()
        # BUG FIX: the previous truthiness check (`if image:`) could discard a
        # valid message object and fell through to an implicit None return.
        if image is not None:
            return image.data
        return None

    ############## User Interface Methods ##############

    def on_connected(self):
        """Connect to the camera and apply the deferred live-mode/ROI inputs."""
        self.cam.on_connect()
        # Default to False (not None): assigning None would flip live mode off
        # through the setter while leaving self._live_mode in a non-bool state.
        self.live_mode = self._inputs.get("live_mode", False)
        roi = self._inputs.get("roi", None)
        if roi is None or not isinstance(roi, (tuple, list)) or not len(roi) == 4:
            # If ROI is not set, use the full camera resolution
            roi = (0, 0, self.cam.cam.width.value, self.cam.cam.height.value)
        self.roi = roi

    def on_destroy(self):
        """Clean up resources when the device is destroyed."""
        self.cam.on_disconnect()
        super().on_destroy()

    def on_trigger(self):
        """Handle the trigger event: publish the mean ROI intensity of the last image."""
        if not self.live_mode:
            return
        image = self.image.get()
        if image is not None:
            image: messages.DevicePreviewMessage
            if self.roi["img_shape"][0:2] != image.data.shape[0:2]:
                logger.info(
                    f"ROI shape does not match image shape, skipping ROI application for device {self.name}."
                )
                return
            if len(image.data.shape) == 3:
                # If the image has multiple channels, apply the mask to each channel
                data = (
                    image.data * self.roi["mask"][:, :, np.newaxis]
                )  # Apply mask to the image data
                n_channels = 3
            else:
                data = image.data * self.roi["mask"]
                n_channels = 1
            self.roi_signal.put(
                {
                    self.roi_signal.name: {
                        "value": np.sum(data)
                        / (np.sum(self.roi["mask"]) * n_channels),  # TODO can be optimized
                        "timestamp": time.time(),
                    }
                }
            )
if __name__ == "__main__":
    # Example usage of the IDSCamera class: construct without starting live
    # mode (no driver connection is made until on_connected/live_mode=True)
    # and report the configured camera id.
    camera = IDSCamera(name="TestCamera", camera_id=201, live_mode=False)
    print(f"Camera {camera.name} initialized with ID {camera.cam.camera_id}.")

View File

@@ -1,83 +0,0 @@
import time
import numpy as np
from bec_lib import bec_logger
from ophyd import Kind, Signal
from ophyd.utils import ReadOnlyError
from ophyd_devices.utils.bec_device_base import BECDeviceBase
logger = bec_logger.logger
# Readout precision for Setable/ReadOnlySignal signals
PRECISION = 3
class ReadOnlySignal(Signal):
"""Setable signal for simulated devices.
The signal will store the value in sim_state of the SimulatedData class of the parent device.
It will also return the value from sim_state when get is called. Compared to the ReadOnlySignal,
this signal can be written to.
The setable signal inherits from the Signal class of ophyd, thus the class attribute needs to be
initiated as a Component (class from ophyd).
>>> signal = SetableSignal(name="signal", parent=parent, value=0)
Parameters
----------
name (string) : Name of the signal
parent (object) : Parent object of the signal, default none.
value (any) : Initial value of the signal, default 0.
kind (int) : Kind of the signal, default Kind.normal.
precision (float) : Precision of the signal, default PRECISION.
"""
def __init__(
    self,
    name: str,
    *args,
    fcn: callable,
    kind: int = Kind.normal,
    precision: float = PRECISION,
    **kwargs,
):
    """Initialize the read-only signal.

    Args:
        name (str): Name of the signal.
        fcn (callable): Zero-argument callable producing the signal's value.
        kind (int): Ophyd kind of the signal, default Kind.normal.
        precision (float): Readout precision, default PRECISION.
    """
    # BUG FIX: `value` was referenced without ever being defined, raising a
    # NameError on every instantiation. Take it from kwargs with a default
    # of 0 so the signature stays unchanged for existing callers.
    value = kwargs.pop("value", 0)
    super().__init__(*args, name=name, value=value, kind=kind, **kwargs)
    self._metadata.update(connected=True, write_access=False)
    self._value = None
    self.precision = precision
    self.fcn = fcn
# pylint: disable=arguments-differ
def get(self):
    """Evaluate the wrapped callable and return its result.

    The computed value is cached on ``self._value`` before being returned.
    """
    result = self.fcn()
    self._value = result
    return result
# pylint: disable=arguments-differ
def put(self, value):
    """Reject writes: this signal is read-only.

    Args:
        value: The value a caller attempted to write (ignored).

    Raises:
        ReadOnlyError: Always — the signal's value comes from ``self.fcn``.
    """
    # BUG FIX: the previous body called the undefined ``_update_sim_state``
    # (an AttributeError on every call). A read-only signal (metadata sets
    # write_access=False, and ReadOnlyError is imported at the top of this
    # file) should refuse writes explicitly instead.
    raise ReadOnlyError(f"The signal {self.name} is read-only.")
def describe(self):
    """Return the signal description, augmented with ``precision`` when set."""
    description = super().describe()
    precision = self.precision
    if precision is not None:
        description[self.name]["precision"] = precision
    return description
@property
def timestamp(self):
    """Timestamp of the readback value.

    Delegates to ``self._get_timestamp()`` — presumably provided by the
    Signal base class; not defined in the visible part of this file
    (TODO confirm).
    """
    return self._get_timestamp()