refactor(ids): refactor ids camera
This commit is contained in:
@@ -6,4 +6,6 @@ _commit: v1.1.2
|
||||
_src_path: https://github.com/bec-project/plugin_copier_template.git
|
||||
make_commit: false
|
||||
project_name: csaxs_bec
|
||||
widget_plugins_input: []
|
||||
widget_plugins_input:
|
||||
- name: csaxs_test
|
||||
use_ui: true
|
||||
|
||||
@@ -28,4 +28,17 @@ mcs:
|
||||
onFailure: raise
|
||||
enabled: true
|
||||
readoutPriority: monitored
|
||||
softwareTrigger: false
|
||||
softwareTrigger: false
|
||||
|
||||
ids_cam:
|
||||
description: IDS camera for live image acquisition
|
||||
deviceClass: csaxs_bec.devices.ids_cameras.ids_camera_new.IDSCamera
|
||||
deviceConfig:
|
||||
camera_id: 201
|
||||
bits_per_pixel: 24
|
||||
m_n_colormode: 1
|
||||
live_mode: True
|
||||
onFailure: raise
|
||||
enabled: true
|
||||
readoutPriority: async
|
||||
softwareTrigger: True
|
||||
|
||||
@@ -165,20 +165,20 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS):
|
||||
)
|
||||
self.cancel_on_stop(status_ready_read)
|
||||
self.cancel_on_stop(status_acquiring)
|
||||
status_ready_read.wait(2)
|
||||
status_ready_read.wait(10)
|
||||
|
||||
mcs.ready_to_read.put(READYTOREAD.PROCESSING)
|
||||
mcs.erase_start.put(1)
|
||||
status_acquiring.wait(
|
||||
timeout=2
|
||||
timeout=10
|
||||
) # 2 s wait for mcs card to start should be more than enough..
|
||||
|
||||
st = StatusBase()
|
||||
st = DeviceStatus(self)
|
||||
self.cancel_on_stop(st)
|
||||
self.trigger_shot.put(1, use_complete=True)
|
||||
time.sleep(self.scan_info.msg.scan_parameters["exp_time"])
|
||||
self.cancel_on_stop(st)
|
||||
status = self.wait_for_status(status=st, bit_event=STATUSBITS.END_OF_DELAY, timeout=2)
|
||||
status = self.wait_for_status(status=st, bit_event=STATUSBITS.END_OF_DELAY, timeout=10)
|
||||
return status
|
||||
|
||||
def wait_for_status(
|
||||
@@ -198,7 +198,11 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS):
|
||||
if (STATUSBITS(event_status) & bit_event) == bit_event:
|
||||
status.set_finished()
|
||||
if time.time() - current_time > timeout:
|
||||
status.set_exception(TimeoutError(f"Timeout waiting for status {status}"))
|
||||
status.set_exception(
|
||||
TimeoutError(
|
||||
f"Timeout waiting for status of device {self.name} for event_status {bit_event}"
|
||||
)
|
||||
)
|
||||
break
|
||||
time.sleep(0.1)
|
||||
time.sleep(0.05) # Give time for the IOC to be ready again
|
||||
|
||||
@@ -118,7 +118,7 @@ class DDG2(PSIDeviceBase, DelayGeneratorCSAXS):
|
||||
"""
|
||||
|
||||
def wait_for_status(
|
||||
self, status: StatusBase, bit_event: STATUSBITS, timeout: float = 2
|
||||
self, status: DeviceStatus, bit_event: STATUSBITS, timeout: float = 5
|
||||
) -> None:
|
||||
"""Wait for a event status bit to be set.
|
||||
|
||||
@@ -134,7 +134,11 @@ class DDG2(PSIDeviceBase, DelayGeneratorCSAXS):
|
||||
if (STATUSBITS(event_status) & bit_event) == bit_event:
|
||||
status.set_finished()
|
||||
if time.time() - current_time > timeout:
|
||||
status.set_exception(TimeoutError(f"Timeout waiting for status {status}"))
|
||||
status.set_exception(
|
||||
TimeoutError(
|
||||
f"Timeout waiting for status of device {self.name} for event_status {bit_event}"
|
||||
)
|
||||
)
|
||||
break
|
||||
time.sleep(0.1)
|
||||
time.sleep(0.05) # Give time for the IOC to be ready again
|
||||
|
||||
@@ -403,7 +403,7 @@ class DelayGeneratorCSAXS(Device):
|
||||
In addition, the io layer allows setting amplitude, offset and polarity for each pair.
|
||||
"""
|
||||
|
||||
_pv_timeout: float = 1.5 # Default timeout for PV operations in seconds
|
||||
_pv_timeout: float = 5 # Default timeout for PV operations in seconds
|
||||
|
||||
# Front Panel
|
||||
t0 = Cpt(StaticPair, "T0", name="t0", doc="T0 static pair")
|
||||
|
||||
@@ -118,7 +118,7 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
|
||||
name=name, prefix=prefix, scan_info=scan_info, device_manager=device_manager, **kwargs
|
||||
)
|
||||
self._mcs_clock = 1e7 # 10MHz clock -> 1e7 Hz
|
||||
self._pv_timeout = 2
|
||||
self._pv_timeout = 3 # TODO remove timeout once #129 in ophyd_devices is solved
|
||||
self._rlock = RLock()
|
||||
self.counter_mapping = {
|
||||
f"{self.counters.name}_mca1": "current1",
|
||||
|
||||
251
csaxs_bec/devices/ids_cameras/base_integration/camera.py
Normal file
251
csaxs_bec/devices/ids_cameras/base_integration/camera.py
Normal file
@@ -0,0 +1,251 @@
|
||||
"""This module provides a Camera class for handling IDS cameras using the pyueye SDK library."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import atexit
|
||||
from typing import Literal
|
||||
|
||||
import numpy as np
|
||||
from bec_lib.logger import bec_logger
|
||||
|
||||
from csaxs_bec.devices.ids_cameras.base_integration.utils import check_error
|
||||
|
||||
logger = bec_logger.logger
|
||||
|
||||
try:
|
||||
from pyueye import ueye
|
||||
except ImportError as exc:
|
||||
logger.warning(f"The pyueye library is not properly installed : {exc}")
|
||||
ueye = None # type: ignore[assignment]
|
||||
|
||||
|
||||
class IDSCameraObject:
|
||||
"""Base class for IDS Camera object.
|
||||
|
||||
Args:
|
||||
device_id (int): The ID of the camera device. # e.g. 201; check idscamera tool
|
||||
m_n_colormode (int): Color mode for the camera. # 1 for cSAXS color cameras
|
||||
bits_per_pixel (int): Number of bits per pixel for the camera. # 24 for color cameras, 8 for monochrome cameras
|
||||
"""
|
||||
|
||||
def __init__(self, device_id: int, m_n_colormode, bits_per_pixel):
|
||||
self.ueye = ueye
|
||||
self._device_id = device_id
|
||||
self.h_cam = ueye.HIDS(device_id)
|
||||
self.s_info = ueye.SENSORINFO()
|
||||
self.c_info = ueye.CAMINFO()
|
||||
self.rect_roi = ueye.IS_RECT()
|
||||
self.pc_image_mem = ueye.c_mem_p()
|
||||
self.mem_id = ueye.int()
|
||||
self.pitch = ueye.INT()
|
||||
self.m_n_colormode = ueye.INT(m_n_colormode)
|
||||
self.n_bits_per_pixel = ueye.INT(bits_per_pixel)
|
||||
self.bytes_per_pixel = int(self.n_bits_per_pixel / 8)
|
||||
|
||||
# Sequence to initialize the camera
|
||||
check_error(ueye.is_InitCamera(self.h_cam, None), "IDSCameraObject")
|
||||
check_error(ueye.is_GetSensorInfo(self.h_cam, self.s_info), "IDSCameraObject")
|
||||
check_error(ueye.is_GetCameraInfo(self.h_cam, self.c_info), "IDSCameraObject")
|
||||
check_error(ueye.is_ResetToDefault(self.h_cam), "IDSCameraObject")
|
||||
check_error(ueye.is_SetDisplayMode(self.h_cam, ueye.IS_SET_DM_DIB), "IDSCameraObject")
|
||||
|
||||
if (
|
||||
int.from_bytes(self.s_info.nColorMode.value, byteorder="big")
|
||||
== self.ueye.IS_COLORMODE_BAYER
|
||||
):
|
||||
print("Bayer color mode detected.")
|
||||
# setup the color depth to the current windows setting
|
||||
self.ueye.is_GetColorDepth(
|
||||
self.h_cam, self.n_bits_per_pixel, self.m_n_colormode
|
||||
) # TODO This raises an error - maybe check the m_n_colormode value
|
||||
self.bytes_per_pixel = int(self.n_bits_per_pixel / 8)
|
||||
elif (
|
||||
int.from_bytes(self.s_info.nColorMode.value, byteorder="big")
|
||||
== self.ueye.IS_COLORMODE_CBYCRY
|
||||
):
|
||||
# for color camera models use RGB32 mode
|
||||
self.m_n_colormode = self.ueye.IS_CM_BGRA8_PACKED
|
||||
self.n_bits_per_pixel = self.ueye.INT(32)
|
||||
self.bytes_per_pixel = int(self.n_bits_per_pixel / 8)
|
||||
elif (
|
||||
int.from_bytes(self.s_info.nColorMode.value, byteorder="big")
|
||||
== self.ueye.IS_COLORMODE_MONOCHROME
|
||||
):
|
||||
# for color camera models use RGB32 mode
|
||||
self.m_n_colormode = self.ueye.IS_CM_MONO8
|
||||
self.n_bits_per_pixel = self.ueye.INT(8)
|
||||
self.bytes_per_pixel = int(self.n_bits_per_pixel / 8)
|
||||
else:
|
||||
# for monochrome camera models use Y8 mode
|
||||
self.m_n_colormode = self.ueye.IS_CM_MONO8
|
||||
self.n_bits_per_pixel = self.ueye.INT(8)
|
||||
self.bytes_per_pixel = int(self.n_bits_per_pixel / 8)
|
||||
print("else")
|
||||
|
||||
# Can be used to set the size and position of an "area of interest"(AOI) within an image
|
||||
check_error(
|
||||
self.ueye.is_AOI(
|
||||
self.h_cam,
|
||||
self.ueye.IS_AOI_IMAGE_GET_AOI,
|
||||
self.rect_roi,
|
||||
self.ueye.sizeof(self.rect_roi),
|
||||
),
|
||||
"IDSCameraObject",
|
||||
)
|
||||
self.width = self.rect_roi.s32Width
|
||||
self.height = self.rect_roi.s32Height
|
||||
|
||||
check_error(
|
||||
self.ueye.is_AllocImageMem(
|
||||
self.h_cam,
|
||||
self.width,
|
||||
self.height,
|
||||
self.n_bits_per_pixel,
|
||||
self.pc_image_mem,
|
||||
self.mem_id,
|
||||
),
|
||||
"IDSCameraObject",
|
||||
)
|
||||
|
||||
check_error(
|
||||
self.ueye.is_SetImageMem(self.h_cam, self.pc_image_mem, self.mem_id), "IDSCameraObject"
|
||||
)
|
||||
check_error(self.ueye.is_SetColorMode(self.h_cam, self.m_n_colormode), "IDSCameraObject")
|
||||
|
||||
check_error(
|
||||
self.ueye.is_CaptureVideo(self.h_cam, self.ueye.IS_DONT_WAIT), "IDSCameraObject"
|
||||
)
|
||||
check_error(
|
||||
self.ueye.is_InquireImageMem(
|
||||
self.h_cam,
|
||||
self.pc_image_mem,
|
||||
self.mem_id,
|
||||
self.width,
|
||||
self.height,
|
||||
self.n_bits_per_pixel,
|
||||
self.pitch,
|
||||
),
|
||||
"IDSCameraObject",
|
||||
)
|
||||
|
||||
def __repr__(self):
|
||||
return f"IDSCameraObject\n\ndevice_id={self._device_id},\ns_info={self.s_info},\nc_info={self.c_info},\nrect_roi={self.rect_roi},\npc_image_mem={self.pc_image_mem},\nmem_id={self.mem_id},\npitch={self.pitch},\nm_n_colormode={self.m_n_colormode},\nn_bits_per_pixel={self.n_bits_per_pixel},\nbytes_per_pixel={self.bytes_per_pixel}"
|
||||
|
||||
|
||||
class Camera:
|
||||
"""Camera base class for IDS cameras."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
camera_id: int,
|
||||
m_n_colormode: Literal[0, 1, 2, 3] = 1,
|
||||
bits_per_pixel: int = 24,
|
||||
connect: bool = True,
|
||||
):
|
||||
self.ueye = ueye
|
||||
self.camera_id = camera_id
|
||||
self._inputs = {"m_n_colormode": m_n_colormode, "bits_per_pixel": bits_per_pixel}
|
||||
self._connected = False
|
||||
self.cam = None
|
||||
atexit.register(self.on_disconnect)
|
||||
|
||||
if connect:
|
||||
self.on_connect()
|
||||
|
||||
def set_roi(self, x: int, y: int, width: int, height: int):
|
||||
"""Set the region of interest (ROI) for the camera."""
|
||||
rect_roi = ueye.IS_RECT()
|
||||
rect_roi.s32X = x
|
||||
rect_roi.s32Y = y
|
||||
rect_roi.s32Width = width
|
||||
rect_roi.s32Height = height
|
||||
|
||||
ret = self.ueye.is_AOI(
|
||||
self.cam.h_cam, self.ueye.IS_AOI_IMAGE_SET_AOI, rect_roi, self.ueye.sizeof(rect_roi)
|
||||
)
|
||||
check_error(ret, "IDSCameraObject")
|
||||
logger.info(f"ROI set to: {rect_roi}")
|
||||
|
||||
def on_connect(self):
|
||||
"""Connect to the camera and initialize it."""
|
||||
if self._connected:
|
||||
logger.warning("Camera is already connected.")
|
||||
return
|
||||
self.cam = IDSCameraObject(self.camera_id, **self._inputs)
|
||||
self._connected = True
|
||||
|
||||
def on_disconnect(self):
|
||||
"""Disconnect from the camera."""
|
||||
try:
|
||||
if self.cam and self.cam.h_cam:
|
||||
check_error(self.ueye.is_ExitCamera(self.cam.h_cam), "IDSCameraObject")
|
||||
self._connected = False
|
||||
self.cam = None
|
||||
logger.info("Camera disconnected.")
|
||||
except Exception as e:
|
||||
logger.info(f"Error during camera disconnection: {e}")
|
||||
|
||||
@property
|
||||
def exposure_time(self) -> float:
|
||||
"""Get the exposure time of the camera."""
|
||||
exposure = ueye.c_double()
|
||||
ret = self.ueye.is_Exposure(self.cam.h_cam, ueye.IS_EXPOSURE_CMD_GET_EXPOSURE, exposure, 8)
|
||||
check_error(ret, "IDSCameraObject")
|
||||
return exposure.value
|
||||
|
||||
@exposure_time.setter
|
||||
def exposure_time(self, value: float):
|
||||
"""Set the exposure time of the camera."""
|
||||
exposure = ueye.c_double(value)
|
||||
check_error(
|
||||
self.ueye.is_Exposure(self.cam.h_cam, ueye.IS_EXPOSURE_CMD_SET_EXPOSURE, exposure, 8),
|
||||
"IDSCameraObject",
|
||||
)
|
||||
|
||||
def set_auto_gain(self, enable: bool):
|
||||
"""Enable or disable auto gain."""
|
||||
enable = ueye.c_int(1) if enable else ueye.c_int(0)
|
||||
value_to_return = ueye.c_double()
|
||||
check_error(
|
||||
self.ueye.is_SetAutoParameter(
|
||||
self.cam.h_cam, ueye.IS_SET_ENABLE_AUTO_GAIN, enable, value_to_return
|
||||
),
|
||||
"IDSCameraObject",
|
||||
)
|
||||
|
||||
def set_auto_shutter(self, enable: bool):
|
||||
"""Enable or disable auto exposure."""
|
||||
enable = ueye.c_int(1) if enable else ueye.c_int(0)
|
||||
value_to_return = ueye.c_double()
|
||||
check_error(
|
||||
self.ueye.is_SetAutoParameter(
|
||||
self.cam.h_cam, ueye.IS_SET_ENABLE_AUTO_SHUTTER, enable, value_to_return
|
||||
),
|
||||
"IDSCameraObject",
|
||||
)
|
||||
|
||||
def get_image_data(self) -> np.ndarray | None:
|
||||
"""Get the image data from the camera."""
|
||||
if not self._connected:
|
||||
logger.warning("Camera is not connected.")
|
||||
return None
|
||||
array = self.ueye.get_data(
|
||||
self.cam.pc_image_mem,
|
||||
self.cam.width,
|
||||
self.cam.height,
|
||||
self.cam.n_bits_per_pixel,
|
||||
self.cam.pitch,
|
||||
copy=False,
|
||||
)
|
||||
if array is None:
|
||||
logger.error("Failed to get image data from the camera.")
|
||||
return None
|
||||
return np.reshape(
|
||||
array, (self.cam.height.value, self.cam.width.value, self.cam.bytes_per_pixel)
|
||||
)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
# Example usage
|
||||
camera = Camera(camera_id=201)
|
||||
camera.on_connect()
|
||||
281
csaxs_bec/devices/ids_cameras/base_integration/utils.py
Normal file
281
csaxs_bec/devices/ids_cameras/base_integration/utils.py
Normal file
@@ -0,0 +1,281 @@
|
||||
"""Utility functions and classes for IDS cameras using the pyueye library."""
|
||||
|
||||
from unittest import mock
|
||||
|
||||
from bec_lib.logger import bec_logger
|
||||
|
||||
logger = bec_logger.logger
|
||||
|
||||
try:
|
||||
from pyueye import ueye
|
||||
except ImportError as exc:
|
||||
logger.warning(f"The pyueye library is not properly installed : {exc}")
|
||||
ueye = None
|
||||
|
||||
if ueye is not None:
|
||||
error_codes = {
|
||||
ueye.IS_NO_SUCCESS: "No success",
|
||||
ueye.IS_SUCCESS: "Success",
|
||||
ueye.IS_INVALID_CAMERA_HANDLE: "Invalid camera handle",
|
||||
ueye.IS_INVALID_HANDLE: "Invalid handle",
|
||||
ueye.IS_IO_REQUEST_FAILED: "IO request failed",
|
||||
ueye.IS_CANT_OPEN_DEVICE: "Cannot open device",
|
||||
ueye.IS_CANT_CLOSE_DEVICE: "Cannot close device",
|
||||
ueye.IS_CANT_SETUP_MEMORY: "Cannot setup memory",
|
||||
ueye.IS_NO_HWND_FOR_ERROR_REPORT: "No HWND for error report",
|
||||
ueye.IS_ERROR_MESSAGE_NOT_CREATED: "Error message not created",
|
||||
ueye.IS_ERROR_STRING_NOT_FOUND: "Error string not found",
|
||||
ueye.IS_HOOK_NOT_CREATED: "Hook not created",
|
||||
ueye.IS_TIMER_NOT_CREATED: "Timer not created",
|
||||
ueye.IS_CANT_OPEN_REGISTRY: "Cannot open registry",
|
||||
ueye.IS_CANT_READ_REGISTRY: "Cannot read registry",
|
||||
ueye.IS_CANT_VALIDATE_BOARD: "Cannot validate board",
|
||||
ueye.IS_CANT_GIVE_BOARD_ACCESS: "Cannot give board access",
|
||||
ueye.IS_NO_IMAGE_MEM_ALLOCATED: "No image memory allocated",
|
||||
ueye.IS_CANT_CLEANUP_MEMORY: "Cannot clean up memory",
|
||||
ueye.IS_CANT_COMMUNICATE_WITH_DRIVER: "Cannot communicate with driver",
|
||||
ueye.IS_FUNCTION_NOT_SUPPORTED_YET: "Function not supported yet",
|
||||
ueye.IS_OPERATING_SYSTEM_NOT_SUPPORTED: "Operating system not supported",
|
||||
ueye.IS_INVALID_VIDEO_IN: "Invalid video input",
|
||||
ueye.IS_INVALID_IMG_SIZE: "Invalid image size",
|
||||
ueye.IS_INVALID_ADDRESS: "Invalid address",
|
||||
ueye.IS_INVALID_VIDEO_MODE: "Invalid video mode",
|
||||
ueye.IS_INVALID_AGC_MODE: "Invalid AGC mode",
|
||||
ueye.IS_INVALID_GAMMA_MODE: "Invalid gamma mode",
|
||||
ueye.IS_INVALID_SYNC_LEVEL: "Invalid sync level",
|
||||
ueye.IS_INVALID_CBARS_MODE: "Invalid color bars mode",
|
||||
ueye.IS_INVALID_COLOR_MODE: "Invalid color mode",
|
||||
ueye.IS_INVALID_SCALE_FACTOR: "Invalid scale factor",
|
||||
ueye.IS_INVALID_IMAGE_SIZE: "Invalid image size",
|
||||
ueye.IS_INVALID_IMAGE_POS: "Invalid image position",
|
||||
ueye.IS_INVALID_CAPTURE_MODE: "Invalid capture mode",
|
||||
ueye.IS_INVALID_RISC_PROGRAM: "Invalid RISC program",
|
||||
ueye.IS_INVALID_BRIGHTNESS: "Invalid brightness",
|
||||
ueye.IS_INVALID_CONTRAST: "Invalid contrast",
|
||||
ueye.IS_INVALID_SATURATION_U: "Invalid saturation U",
|
||||
ueye.IS_INVALID_SATURATION_V: "Invalid saturation V",
|
||||
ueye.IS_INVALID_HUE: "Invalid hue",
|
||||
ueye.IS_INVALID_HOR_FILTER_STEP: "Invalid horizontal filter step",
|
||||
ueye.IS_INVALID_VERT_FILTER_STEP: "Invalid vertical filter step",
|
||||
ueye.IS_INVALID_EEPROM_READ_ADDRESS: "Invalid EEPROM read address",
|
||||
ueye.IS_INVALID_EEPROM_WRITE_ADDRESS: "Invalid EEPROM write address",
|
||||
ueye.IS_INVALID_EEPROM_READ_LENGTH: "Invalid EEPROM read length",
|
||||
ueye.IS_INVALID_EEPROM_WRITE_LENGTH: "Invalid EEPROM write length",
|
||||
ueye.IS_INVALID_BOARD_INFO_POINTER: "Invalid board info pointer",
|
||||
ueye.IS_INVALID_DISPLAY_MODE: "Invalid display mode",
|
||||
ueye.IS_INVALID_ERR_REP_MODE: "Invalid error report mode",
|
||||
ueye.IS_INVALID_BITS_PIXEL: "Invalid bits per pixel",
|
||||
ueye.IS_INVALID_MEMORY_POINTER: "Invalid memory pointer",
|
||||
ueye.IS_FILE_WRITE_OPEN_ERROR: "File write open error",
|
||||
ueye.IS_FILE_READ_OPEN_ERROR: "File read open error",
|
||||
ueye.IS_FILE_READ_INVALID_BMP_ID: "File read invalid BMP ID",
|
||||
ueye.IS_FILE_READ_INVALID_BMP_SIZE: "File read invalid BMP size",
|
||||
ueye.IS_FILE_READ_INVALID_BIT_COUNT: "File read invalid bit count",
|
||||
ueye.IS_WRONG_KERNEL_VERSION: "Wrong kernel version",
|
||||
ueye.IS_RISC_INVALID_XLENGTH: "RISC invalid X length",
|
||||
ueye.IS_RISC_INVALID_YLENGTH: "RISC invalid Y length",
|
||||
ueye.IS_RISC_EXCEED_IMG_SIZE: "RISC exceed image size",
|
||||
ueye.IS_DD_MAIN_FAILED: "DirectDraw main surface failed",
|
||||
ueye.IS_DD_PRIMSURFACE_FAILED: "DirectDraw primary surface failed",
|
||||
ueye.IS_DD_SCRN_SIZE_NOT_SUPPORTED: "Screen size not supported",
|
||||
ueye.IS_DD_CLIPPER_FAILED: "Clipper failed",
|
||||
ueye.IS_DD_CLIPPER_HWND_FAILED: "Clipper HWND failed",
|
||||
ueye.IS_DD_CLIPPER_CONNECT_FAILED: "Clipper connect failed",
|
||||
ueye.IS_DD_BACKSURFACE_FAILED: "Backsurface failed",
|
||||
ueye.IS_DD_BACKSURFACE_IN_SYSMEM: "Backsurface in system memory",
|
||||
ueye.IS_DD_MDL_MALLOC_ERR: "Memory malloc error",
|
||||
ueye.IS_DD_MDL_SIZE_ERR: "Memory size error",
|
||||
ueye.IS_DD_CLIP_NO_CHANGE: "Clip no change",
|
||||
ueye.IS_DD_PRIMMEM_NULL: "Primary memory null",
|
||||
ueye.IS_DD_BACKMEM_NULL: "Back memory null",
|
||||
ueye.IS_DD_BACKOVLMEM_NULL: "Back overlay memory null",
|
||||
ueye.IS_DD_OVERLAYSURFACE_FAILED: "Overlay surface failed",
|
||||
ueye.IS_DD_OVERLAYSURFACE_IN_SYSMEM: "Overlay surface in system memory",
|
||||
ueye.IS_DD_OVERLAY_NOT_ALLOWED: "Overlay not allowed",
|
||||
ueye.IS_DD_OVERLAY_COLKEY_ERR: "Overlay color key error",
|
||||
ueye.IS_DD_OVERLAY_NOT_ENABLED: "Overlay not enabled",
|
||||
ueye.IS_DD_GET_DC_ERROR: "Get DC error",
|
||||
ueye.IS_DD_DDRAW_DLL_NOT_LOADED: "DirectDraw DLL not loaded",
|
||||
ueye.IS_DD_THREAD_NOT_CREATED: "DirectDraw thread not created",
|
||||
ueye.IS_DD_CANT_GET_CAPS: "Cannot get capabilities",
|
||||
ueye.IS_DD_NO_OVERLAYSURFACE: "No overlay surface",
|
||||
ueye.IS_DD_NO_OVERLAYSTRETCH: "No overlay stretch",
|
||||
ueye.IS_DD_CANT_CREATE_OVERLAYSURFACE: "Cannot create overlay surface",
|
||||
ueye.IS_DD_CANT_UPDATE_OVERLAYSURFACE: "Cannot update overlay surface",
|
||||
ueye.IS_DD_INVALID_STRETCH: "Invalid stretch",
|
||||
ueye.IS_EV_INVALID_EVENT_NUMBER: "Invalid event number",
|
||||
ueye.IS_INVALID_MODE: "Invalid mode",
|
||||
ueye.IS_CANT_FIND_HOOK: "Cannot find hook",
|
||||
ueye.IS_CANT_GET_HOOK_PROC_ADDR: "Cannot get hook procedure address",
|
||||
ueye.IS_CANT_CHAIN_HOOK_PROC: "Cannot chain hook procedure",
|
||||
ueye.IS_CANT_SETUP_WND_PROC: "Cannot setup window procedure",
|
||||
ueye.IS_HWND_NULL: "HWND is null",
|
||||
ueye.IS_INVALID_UPDATE_MODE: "Invalid update mode",
|
||||
ueye.IS_NO_ACTIVE_IMG_MEM: "No active image memory",
|
||||
ueye.IS_CANT_INIT_EVENT: "Cannot initialize event",
|
||||
ueye.IS_FUNC_NOT_AVAIL_IN_OS: "Function not available in OS",
|
||||
ueye.IS_CAMERA_NOT_CONNECTED: "Camera not connected",
|
||||
ueye.IS_SEQUENCE_LIST_EMPTY: "Sequence list empty",
|
||||
ueye.IS_CANT_ADD_TO_SEQUENCE: "Cannot add to sequence",
|
||||
ueye.IS_LOW_OF_SEQUENCE_RISC_MEM: "Low sequence RISC memory",
|
||||
ueye.IS_IMGMEM2FREE_USED_IN_SEQ: "Image memory to free used in sequence",
|
||||
ueye.IS_IMGMEM_NOT_IN_SEQUENCE_LIST: "Image memory not in sequence list",
|
||||
ueye.IS_SEQUENCE_BUF_ALREADY_LOCKED: "Sequence buffer already locked",
|
||||
ueye.IS_INVALID_DEVICE_ID: "Invalid device ID",
|
||||
ueye.IS_INVALID_BOARD_ID: "Invalid board ID",
|
||||
ueye.IS_ALL_DEVICES_BUSY: "All devices busy",
|
||||
ueye.IS_HOOK_BUSY: "Hook busy",
|
||||
ueye.IS_TIMED_OUT: "Timed out",
|
||||
ueye.IS_NULL_POINTER: "Null pointer",
|
||||
ueye.IS_WRONG_HOOK_VERSION: "Wrong hook version",
|
||||
ueye.IS_INVALID_PARAMETER: "Invalid parameter",
|
||||
ueye.IS_NOT_ALLOWED: "Not allowed",
|
||||
ueye.IS_OUT_OF_MEMORY: "Out of memory",
|
||||
ueye.IS_INVALID_WHILE_LIVE: "Invalid while live",
|
||||
ueye.IS_ACCESS_VIOLATION: "Access violation",
|
||||
ueye.IS_UNKNOWN_ROP_EFFECT: "Unknown ROP effect",
|
||||
ueye.IS_INVALID_RENDER_MODE: "Invalid render mode",
|
||||
ueye.IS_INVALID_THREAD_CONTEXT: "Invalid thread context",
|
||||
ueye.IS_NO_HARDWARE_INSTALLED: "No hardware installed",
|
||||
ueye.IS_INVALID_WATCHDOG_TIME: "Invalid watchdog time",
|
||||
ueye.IS_INVALID_WATCHDOG_MODE: "Invalid watchdog mode",
|
||||
ueye.IS_INVALID_PASSTHROUGH_IN: "Invalid passthrough input",
|
||||
ueye.IS_ERROR_SETTING_PASSTHROUGH_IN: "Error setting passthrough input",
|
||||
ueye.IS_FAILURE_ON_SETTING_WATCHDOG: "Failure setting watchdog",
|
||||
ueye.IS_NO_USB20: "No USB 2.0",
|
||||
ueye.IS_CAPTURE_RUNNING: "Capture running",
|
||||
ueye.IS_MEMORY_BOARD_ACTIVATED: "Memory board activated",
|
||||
ueye.IS_MEMORY_BOARD_DEACTIVATED: "Memory board deactivated",
|
||||
ueye.IS_NO_MEMORY_BOARD_CONNECTED: "No memory board connected",
|
||||
ueye.IS_TOO_LESS_MEMORY: "Too little memory",
|
||||
ueye.IS_IMAGE_NOT_PRESENT: "Image not present",
|
||||
ueye.IS_MEMORY_MODE_RUNNING: "Memory mode running",
|
||||
ueye.IS_MEMORYBOARD_DISABLED: "Memoryboard disabled",
|
||||
ueye.IS_TRIGGER_ACTIVATED: "Trigger activated",
|
||||
ueye.IS_WRONG_KEY: "Wrong key",
|
||||
ueye.IS_CRC_ERROR: "CRC error",
|
||||
ueye.IS_NOT_YET_RELEASED: "Not yet released",
|
||||
ueye.IS_NOT_CALIBRATED: "Not calibrated", # already present
|
||||
ueye.IS_WAITING_FOR_KERNEL: "Waiting for kernel",
|
||||
ueye.IS_NOT_SUPPORTED: "Not supported", # already present
|
||||
ueye.IS_TRIGGER_NOT_ACTIVATED: "Trigger not activated",
|
||||
ueye.IS_OPERATION_ABORTED: "Operation aborted",
|
||||
ueye.IS_BAD_STRUCTURE_SIZE: "Bad structure size",
|
||||
ueye.IS_INVALID_BUFFER_SIZE: "Invalid buffer size",
|
||||
ueye.IS_INVALID_PIXEL_CLOCK: "Invalid pixel clock",
|
||||
ueye.IS_INVALID_EXPOSURE_TIME: "Invalid exposure time",
|
||||
ueye.IS_AUTO_EXPOSURE_RUNNING: "Auto exposure running",
|
||||
ueye.IS_CANNOT_CREATE_BB_SURF: "Cannot create BB surface",
|
||||
ueye.IS_CANNOT_CREATE_BB_MIX: "Cannot create BB mix",
|
||||
ueye.IS_BB_OVLMEM_NULL: "BB overlay memory null",
|
||||
ueye.IS_CANNOT_CREATE_BB_OVL: "Cannot create BB overlay",
|
||||
ueye.IS_NOT_SUPP_IN_OVL_SURF_MODE: "Not supported in overlay surface mode",
|
||||
ueye.IS_INVALID_SURFACE: "Invalid surface",
|
||||
ueye.IS_SURFACE_LOST: "Surface lost",
|
||||
ueye.IS_RELEASE_BB_OVL_DC: "Release BB overlay DC",
|
||||
ueye.IS_BB_TIMER_NOT_CREATED: "BB timer not created",
|
||||
ueye.IS_BB_OVL_NOT_EN: "BB overlay not enabled",
|
||||
ueye.IS_ONLY_IN_BB_MODE: "Only in BB mode",
|
||||
ueye.IS_INVALID_COLOR_FORMAT: "Invalid color format",
|
||||
ueye.IS_INVALID_WB_BINNING_MODE: "Invalid WB binning mode",
|
||||
ueye.IS_INVALID_I2C_DEVICE_ADDRESS: "Invalid I²C device address",
|
||||
ueye.IS_COULD_NOT_CONVERT: "Could not convert",
|
||||
ueye.IS_TRANSFER_ERROR: "Transfer error", # already present
|
||||
ueye.IS_PARAMETER_SET_NOT_PRESENT: "Parameter set not present",
|
||||
ueye.IS_INVALID_CAMERA_TYPE: "Invalid camera type",
|
||||
ueye.IS_INVALID_HOST_IP_HIBYTE: "Invalid host IP high byte",
|
||||
ueye.IS_CM_NOT_SUPP_IN_CURR_DISPLAYMODE: "Color matrix not supported in current display mode",
|
||||
ueye.IS_NO_IR_FILTER: "No IR filter",
|
||||
ueye.IS_STARTER_FW_UPLOAD_NEEDED: "Starter firmware upload needed",
|
||||
ueye.IS_DR_LIBRARY_NOT_FOUND: "Driver library not found",
|
||||
ueye.IS_DR_DEVICE_OUT_OF_MEMORY: "Driver device out of memory",
|
||||
ueye.IS_DR_CANNOT_CREATE_SURFACE: "Driver cannot create surface",
|
||||
ueye.IS_DR_CANNOT_CREATE_VERTEX_BUFFER: "Driver cannot create vertex buffer",
|
||||
ueye.IS_DR_CANNOT_CREATE_TEXTURE: "Driver cannot create texture",
|
||||
ueye.IS_DR_CANNOT_LOCK_OVERLAY_SURFACE: "Driver cannot lock overlay surface",
|
||||
ueye.IS_DR_CANNOT_UNLOCK_OVERLAY_SURFACE: "Driver cannot unlock overlay surface",
|
||||
ueye.IS_DR_CANNOT_GET_OVERLAY_DC: "Driver cannot get overlay DC",
|
||||
ueye.IS_DR_CANNOT_RELEASE_OVERLAY_DC: "Driver cannot release overlay DC",
|
||||
ueye.IS_DR_DEVICE_CAPS_INSUFFICIENT: "Driver device capabilities insufficient",
|
||||
ueye.IS_INCOMPATIBLE_SETTING: "Incompatible setting",
|
||||
ueye.IS_DR_NOT_ALLOWED_WHILE_DC_IS_ACTIVE: "Driver not allowed while DC is active",
|
||||
ueye.IS_DEVICE_ALREADY_PAIRED: "Device already paired",
|
||||
ueye.IS_SUBNETMASK_MISMATCH: "Subnet mask mismatch",
|
||||
ueye.IS_SUBNET_MISMATCH: "Subnet mismatch",
|
||||
ueye.IS_INVALID_IP_CONFIGURATION: "Invalid IP configuration",
|
||||
ueye.IS_DEVICE_NOT_COMPATIBLE: "Device not compatible",
|
||||
ueye.IS_NETWORK_FRAME_SIZE_INCOMPATIBLE: "Network frame size incompatible",
|
||||
ueye.IS_NETWORK_CONFIGURATION_INVALID: "Network configuration invalid",
|
||||
ueye.IS_ERROR_CPU_IDLE_STATES_CONFIGURATION: "CPU idle states configuration error",
|
||||
ueye.IS_DEVICE_BUSY: "Device busy",
|
||||
ueye.IS_SENSOR_INITIALIZATION_FAILED: "Sensor initialization failed",
|
||||
ueye.IS_IMAGE_BUFFER_NOT_DWORD_ALIGNED: "Image buffer not DWORD aligned",
|
||||
ueye.IS_SEQ_BUFFER_IS_LOCKED: "Sequence buffer is locked",
|
||||
ueye.IS_FILE_PATH_DOES_NOT_EXIST: "File path does not exist",
|
||||
ueye.IS_INVALID_WINDOW_HANDLE: "Invalid window handle",
|
||||
ueye.IS_INVALID_IMAGE_PARAMETER: "Invalid image parameter",
|
||||
ueye.IS_NO_SUCH_DEVICE: "No such device",
|
||||
ueye.IS_DEVICE_IN_USE: "Device in use",
|
||||
}
|
||||
|
||||
bits_per_pixel = {
|
||||
ueye.IS_CM_SENSOR_RAW8: 8,
|
||||
ueye.IS_CM_SENSOR_RAW10: 16,
|
||||
ueye.IS_CM_SENSOR_RAW12: 16,
|
||||
ueye.IS_CM_SENSOR_RAW16: 16,
|
||||
ueye.IS_CM_MONO8: 8,
|
||||
ueye.IS_CM_RGB8_PACKED: 24,
|
||||
ueye.IS_CM_BGR8_PACKED: 24,
|
||||
ueye.IS_CM_RGBA8_PACKED: 32,
|
||||
ueye.IS_CM_BGRA8_PACKED: 32,
|
||||
ueye.IS_CM_BGR10_PACKED: 32,
|
||||
ueye.IS_CM_RGB10_PACKED: 32,
|
||||
ueye.IS_CM_BGRA12_UNPACKED: 64,
|
||||
ueye.IS_CM_BGR12_UNPACKED: 48,
|
||||
ueye.IS_CM_BGRY8_PACKED: 32,
|
||||
ueye.IS_CM_BGR565_PACKED: 16,
|
||||
ueye.IS_CM_BGR5_PACKED: 16,
|
||||
ueye.IS_CM_UYVY_PACKED: 16,
|
||||
ueye.IS_CM_UYVY_MONO_PACKED: 16,
|
||||
ueye.IS_CM_UYVY_BAYER_PACKED: 16,
|
||||
ueye.IS_CM_CBYCRY_PACKED: 16,
|
||||
}
|
||||
|
||||
|
||||
def get_bits_per_pixel(color_mode):
|
||||
"""
|
||||
Returns the number of bits per pixel for the given color mode.
|
||||
"""
|
||||
if color_mode not in bits_per_pixel:
|
||||
raise UEyeException(f"Unknown color mode: {color_mode}")
|
||||
return bits_per_pixel[color_mode]
|
||||
|
||||
|
||||
class UEyeException(Exception):
|
||||
"""Custom exception for uEye errors."""
|
||||
|
||||
def __init__(self, error_code, called_from: str | None = None):
|
||||
self.error_code = error_code
|
||||
self.called_from = called_from if called_from is not None else ""
|
||||
|
||||
def __str__(self):
|
||||
if self.error_code in error_codes:
|
||||
return f"Exception: {error_codes[self.error_code]} raised in {self.called_from}."
|
||||
else:
|
||||
for att, val in ueye.__dict__.items():
|
||||
if (
|
||||
att[0:2] == "IS"
|
||||
and val == self.error_code
|
||||
and ("FAILED" in att or "INVALID" in att or "ERROR" in att or "NOT" in att)
|
||||
):
|
||||
return f"Exception: {str(self.error_code)} ({att} ? <value> {val}) raised in {self.called_from}."
|
||||
return f"Exception: {str(self.error_code)} raised in {self.called_from}."
|
||||
|
||||
|
||||
def check_error(error_code, called_from: str | None = None):
|
||||
"""
|
||||
Check an error code, and raise an error if adequate.
|
||||
"""
|
||||
if error_code != ueye.IS_SUCCESS:
|
||||
called_from = called_from if called_from is not None else ""
|
||||
raise UEyeException(error_code, called_from)
|
||||
@@ -3,226 +3,95 @@ import time
|
||||
|
||||
import numpy as np
|
||||
from ophyd import Component as Cpt
|
||||
from ophyd import Device, Kind
|
||||
from ophyd_devices.interfaces.base_classes.psi_detector_base import (
|
||||
CustomDetectorMixin,
|
||||
PSIDetectorBase,
|
||||
)
|
||||
from ophyd_devices.sim.sim_signals import SetableSignal
|
||||
|
||||
try:
|
||||
from pyueye import ueye
|
||||
except ImportError:
|
||||
# The pyueye library is not installed or doesn't provide the necessary c libs
|
||||
ueye = None
|
||||
from ophyd import DeviceStatus, Kind, Signal, StatusBase
|
||||
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
|
||||
from ophyd_devices.utils.bec_signals import PreviewSignal
|
||||
|
||||
|
||||
class IDSCustomPrepare(CustomDetectorMixin):
|
||||
class ROISignal(Signal):
|
||||
"""
|
||||
Signal to handle the Region of Interest (ROI) for the IDS camera.
|
||||
It is a tuple of (x, y, width, height).
|
||||
"""
|
||||
|
||||
USER_ACCESS = ["pyueye"]
|
||||
pyueye = ueye
|
||||
|
||||
def __init__(self, *_args, parent: Device = None, **_kwargs) -> None:
|
||||
super().__init__(*_args, parent=parent, **_kwargs)
|
||||
self.ueye = ueye
|
||||
self.h_cam = None
|
||||
self.s_info = None
|
||||
self.data_thread = None
|
||||
self.thread_event = None
|
||||
|
||||
def on_connection_established(self):
|
||||
self.hCam = self.ueye.HIDS(
|
||||
self.parent.camera_ID
|
||||
) # 0: first available camera; 1-254: The camera with the specified camera ID
|
||||
self.sInfo = self.ueye.SENSORINFO()
|
||||
self.cInfo = self.ueye.CAMINFO()
|
||||
self.pcImageMemory = self.ueye.c_mem_p()
|
||||
self.MemID = self.ueye.int()
|
||||
self.rectAOI = self.ueye.IS_RECT()
|
||||
self.pitch = self.ueye.INT()
|
||||
self.nBitsPerPixel = self.ueye.INT(
|
||||
self.parent.bits_per_pixel
|
||||
) # 24: bits per pixel for color mode; take 8 bits per pixel for monochrome
|
||||
self.channels = (
|
||||
self.parent.channels
|
||||
) # 3: channels for color mode(RGB); take 1 channel for monochrome
|
||||
self.m_nColorMode = self.ueye.INT(
|
||||
self.parent.m_n_colormode
|
||||
) # Y8/RGB16/RGB24/REG32 (1 for our color cameras)
|
||||
self.bytes_per_pixel = int(self.nBitsPerPixel / 8)
|
||||
|
||||
# Starts the driver and establishes the connection to the camera
|
||||
nRet = self.ueye.is_InitCamera(self.hCam, None)
|
||||
if nRet != self.ueye.IS_SUCCESS:
|
||||
print("is_InitCamera ERROR")
|
||||
|
||||
# Reads out the data hard-coded in the non-volatile camera memory and writes it to the data structure that cInfo points to
|
||||
nRet = self.ueye.is_GetCameraInfo(self.hCam, self.cInfo)
|
||||
if nRet != self.ueye.IS_SUCCESS:
|
||||
print("is_GetCameraInfo ERROR")
|
||||
|
||||
# You can query additional information about the sensor type used in the camera
|
||||
nRet = self.ueye.is_GetSensorInfo(self.hCam, self.sInfo)
|
||||
if nRet != self.ueye.IS_SUCCESS:
|
||||
print("is_GetSensorInfo ERROR")
|
||||
|
||||
nRet = self.ueye.is_ResetToDefault(self.hCam)
|
||||
if nRet != self.ueye.IS_SUCCESS:
|
||||
print("is_ResetToDefault ERROR")
|
||||
|
||||
# Set display mode to DIB
|
||||
nRet = self.ueye.is_SetDisplayMode(self.hCam, self.ueye.IS_SET_DM_DIB)
|
||||
|
||||
# Set the right color mode
|
||||
if (
|
||||
int.from_bytes(self.sInfo.nColorMode.value, byteorder="big")
|
||||
== self.ueye.IS_COLORMODE_BAYER
|
||||
):
|
||||
# setup the color depth to the current windows setting
|
||||
self.ueye.is_GetColorDepth(self.hCam, self.nBitsPerPixel, self.m_nColorMode)
|
||||
bytes_per_pixel = int(self.nBitsPerPixel / 8)
|
||||
print("IS_COLORMODE_BAYER: ")
|
||||
print("\tm_nColorMode: \t\t", self.m_nColorMode)
|
||||
print("\tnBitsPerPixel: \t\t", self.nBitsPerPixel)
|
||||
print("\tbytes_per_pixel: \t\t", bytes_per_pixel)
|
||||
print()
|
||||
|
||||
elif (
|
||||
int.from_bytes(self.sInfo.nColorMode.value, byteorder="big")
|
||||
== self.ueye.IS_COLORMODE_CBYCRY
|
||||
):
|
||||
# for color camera models use RGB32 mode
|
||||
m_nColorMode = ueye.IS_CM_BGRA8_PACKED
|
||||
nBitsPerPixel = ueye.INT(32)
|
||||
bytes_per_pixel = int(self.nBitsPerPixel / 8)
|
||||
print("IS_COLORMODE_CBYCRY: ")
|
||||
print("\tm_nColorMode: \t\t", m_nColorMode)
|
||||
print("\tnBitsPerPixel: \t\t", nBitsPerPixel)
|
||||
print("\tbytes_per_pixel: \t\t", bytes_per_pixel)
|
||||
print()
|
||||
|
||||
elif (
|
||||
int.from_bytes(self.sInfo.nColorMode.value, byteorder="big")
|
||||
== self.ueye.IS_COLORMODE_MONOCHROME
|
||||
):
|
||||
# for color camera models use RGB32 mode
|
||||
m_nColorMode = self.ueye.IS_CM_MONO8
|
||||
nBitsPerPixel = self.ueye.INT(8)
|
||||
bytes_per_pixel = int(nBitsPerPixel / 8)
|
||||
print("IS_COLORMODE_MONOCHROME: ")
|
||||
print("\tm_nColorMode: \t\t", m_nColorMode)
|
||||
print("\tnBitsPerPixel: \t\t", nBitsPerPixel)
|
||||
print("\tbytes_per_pixel: \t\t", bytes_per_pixel)
|
||||
print()
|
||||
|
||||
else:
|
||||
# for monochrome camera models use Y8 mode
|
||||
m_nColorMode = self.ueye.IS_CM_MONO8
|
||||
nBitsPerPixel = self.ueye.INT(8)
|
||||
bytes_per_pixel = int(nBitsPerPixel / 8)
|
||||
print("else")
|
||||
|
||||
# Can be used to set the size and position of an "area of interest"(AOI) within an image
|
||||
nRet = self.ueye.is_AOI(
|
||||
self.hCam, ueye.IS_AOI_IMAGE_GET_AOI, self.rectAOI, self.ueye.sizeof(self.rectAOI)
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
name,
|
||||
roi: tuple | None = None,
|
||||
value=0,
|
||||
dtype=None,
|
||||
shape=None,
|
||||
timestamp=None,
|
||||
parent=None,
|
||||
labels=None,
|
||||
kind=Kind.hinted,
|
||||
tolerance=None,
|
||||
rtolerance=None,
|
||||
metadata=None,
|
||||
cl=None,
|
||||
attr_name="",
|
||||
):
|
||||
super().__init__(
|
||||
name=name,
|
||||
value=value,
|
||||
dtype=dtype,
|
||||
shape=shape,
|
||||
timestamp=timestamp,
|
||||
parent=parent,
|
||||
labels=labels,
|
||||
kind=kind,
|
||||
tolerance=tolerance,
|
||||
rtolerance=rtolerance,
|
||||
metadata=metadata,
|
||||
cl=cl,
|
||||
attr_name=attr_name,
|
||||
)
|
||||
if nRet != self.ueye.IS_SUCCESS:
|
||||
print("is_AOI ERROR")
|
||||
self.roi = roi
|
||||
|
||||
self.width = self.rectAOI.s32Width
|
||||
self.height = self.rectAOI.s32Height
|
||||
def get(self, **kwargs):
|
||||
image = self.parent.image_data.get().data
|
||||
if not isinstance(image, np.ndarray):
|
||||
return -1 # -1 if no valid image is available
|
||||
|
||||
# Prints out some information about the camera and the sensor
|
||||
print("Camera model:\t\t", self.sInfo.strSensorName.decode("utf-8"))
|
||||
print("Camera serial no.:\t", self.cInfo.SerNo.decode("utf-8"))
|
||||
print("Maximum image width:\t", self.width)
|
||||
print("Maximum image height:\t", self.height)
|
||||
print()
|
||||
|
||||
# ---------------------------------------------------------------------------------------------------------------------------------------
|
||||
|
||||
# Allocates an image memory for an image having its dimensions defined by width and height and its color depth defined by nBitsPerPixel
|
||||
nRet = self.ueye.is_AllocImageMem(
|
||||
self.hCam, self.width, self.height, self.nBitsPerPixel, self.pcImageMemory, self.MemID
|
||||
)
|
||||
if nRet != self.ueye.IS_SUCCESS:
|
||||
print("is_AllocImageMem ERROR")
|
||||
if self.roi is None:
|
||||
roi = (0, 0, image.shape[1], image.shape[0])
|
||||
else:
|
||||
# Makes the specified image memory the active memory
|
||||
nRet = self.ueye.is_SetImageMem(self.hCam, self.pcImageMemory, self.MemID)
|
||||
if nRet != self.ueye.IS_SUCCESS:
|
||||
print("is_SetImageMem ERROR")
|
||||
else:
|
||||
# Set the desired color mode
|
||||
nRet = self.ueye.is_SetColorMode(self.hCam, self.m_nColorMode)
|
||||
|
||||
# Activates the camera's live video mode (free run mode)
|
||||
nRet = self.ueye.is_CaptureVideo(self.hCam, self.ueye.IS_DONT_WAIT)
|
||||
if nRet != self.ueye.IS_SUCCESS:
|
||||
print("is_CaptureVideo ERROR")
|
||||
|
||||
# Enables the queue mode for existing image memory sequences
|
||||
nRet = self.ueye.is_InquireImageMem(
|
||||
self.hCam,
|
||||
self.pcImageMemory,
|
||||
self.MemID,
|
||||
self.width,
|
||||
self.height,
|
||||
self.nBitsPerPixel,
|
||||
self.pitch,
|
||||
)
|
||||
if nRet != self.ueye.IS_SUCCESS:
|
||||
print("is_InquireImageMem ERROR")
|
||||
else:
|
||||
print("Press q to leave the programm")
|
||||
startmeasureframerate = True
|
||||
Gain = False
|
||||
|
||||
# Start live mode of camera immediately
|
||||
self.parent.start_live_mode()
|
||||
|
||||
def _start_data_thread(self):
|
||||
self.thread_event = threading.Event()
|
||||
self.data_thread = threading.Thread(target=self._receive_data_from_camera, daemon=True)
|
||||
self.data_thread.start()
|
||||
|
||||
def _receive_data_from_camera(self):
|
||||
while not self.thread_event.is_set():
|
||||
|
||||
# In order to display the image in an OpenCV window we need to...
|
||||
# ...extract the data of our image memory
|
||||
array = ueye.get_data(
|
||||
self.pcImageMemory,
|
||||
self.width,
|
||||
self.height,
|
||||
self.nBitsPerPixel,
|
||||
self.pitch,
|
||||
copy=False,
|
||||
)
|
||||
|
||||
# bytes_per_pixel = int(nBitsPerPixel / 8)
|
||||
|
||||
# ...reshape it in an numpy array...
|
||||
frame = np.reshape(array, (self.height.value, self.width.value, self.bytes_per_pixel))
|
||||
self.parent.image_data.put(frame)
|
||||
self.parent._run_subs(sub_type=self.parent.SUB_MONITOR, value=frame)
|
||||
|
||||
time.sleep(0.1)
|
||||
|
||||
def on_trigger(self):
|
||||
pass
|
||||
# self.parent._run_subs(sub_type=self.parent.SUB_MONITOR, value=self.parent.image_data.get())
|
||||
roi = self.roi
|
||||
if len(image.shape) > 2:
|
||||
image = np.sum(image, axis=2) # Convert to grayscale if it's a color image
|
||||
return np.sum(image[roi[1] : roi[1] + roi[3], roi[0] : roi[0] + roi[2]], (0, 1))
|
||||
|
||||
|
||||
class IDSCamera(PSIDetectorBase):
|
||||
USER_ACCESS = ["start_live_mode", "stop_live_mode"]
|
||||
class IDSCamera(PSIDeviceBase):
|
||||
""" "
|
||||
#---------------------------------------------------------------------------------------------------------------------------------------
|
||||
|
||||
custom_prepare_cls = IDSCustomPrepare
|
||||
#Variables
|
||||
hCam = ueye.HIDS(202) #0: first available camera; 1-254: The camera with the specified camera ID
|
||||
sInfo = ueye.SENSORINFO()
|
||||
cInfo = ueye.CAMINFO()
|
||||
pcImageMemory = ueye.c_mem_p()
|
||||
MemID = ueye.int()
|
||||
rectAOI = ueye.IS_RECT()
|
||||
pitch = ueye.INT()
|
||||
nBitsPerPixel = ueye.INT(24) #24: bits per pixel for color mode; take 8 bits per pixel for monochrome
|
||||
channels = 3 #3: channels for color mode(RGB); take 1 channel for monochrome
|
||||
m_nColorMode = ueye.INT(1) # Y8/RGB16/RGB24/REG32 (1 for our color cameras)
|
||||
bytes_per_pixel = int(nBitsPerPixel / 8)
|
||||
|
||||
image_data = Cpt(SetableSignal, value=np.empty((100, 100)), kind=Kind.omitted)
|
||||
ids_cam
|
||||
...
|
||||
"""
|
||||
|
||||
SUB_MONITOR = "device_monitor_2d"
|
||||
_default_sub = SUB_MONITOR
|
||||
USER_ACCESS = ["start_live_mode", "stop_live_mode", "set_roi", "width", "height"]
|
||||
|
||||
image_data = Cpt(PreviewSignal, ndim=2, kind=Kind.omitted)
|
||||
# roi_bot_left = Cpt(ROISignal, roi=(400, 525, 118, 105), kind=Kind.normal)
|
||||
# roi_bot_right = Cpt(ROISignal, roi=(518, 525, 118, 105), kind=Kind.normal)
|
||||
# roi_top_left = Cpt(ROISignal, roi=(400, 630, 118, 105), kind=Kind.normal)
|
||||
# roi_top_right = Cpt(ROISignal, roi=(518, 630, 118, 105), kind=Kind.normal)
|
||||
# roi_signal = Cpt(ROISignal, kind=Kind.normal, doc="Region of Interest signal")
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
@@ -234,19 +103,221 @@ class IDSCamera(PSIDetectorBase):
|
||||
channels: int,
|
||||
m_n_colormode: int,
|
||||
kind=None,
|
||||
parent=None,
|
||||
device_manager=None,
|
||||
**kwargs,
|
||||
):
|
||||
|
||||
super().__init__(
|
||||
prefix=prefix, name=name, kind=kind, parent=parent, device_manager=device_manager, **kwargs
|
||||
prefix=prefix, name=name, kind=kind, device_manager=device_manager, **kwargs
|
||||
)
|
||||
self.camera_ID = camera_ID
|
||||
self.bits_per_pixel = bits_per_pixel
|
||||
self.bytes_per_pixel = None
|
||||
self.channels = channels
|
||||
self.m_n_colormode = m_n_colormode
|
||||
#TODO fix connected and wait_for_connection
|
||||
self.custom_prepare.on_connection_established()
|
||||
self._m_n_colormode_input = m_n_colormode
|
||||
self.m_n_colormode = None
|
||||
self.ueye = ueye
|
||||
self.h_cam = None
|
||||
self.s_info = None
|
||||
self.data_thread = None
|
||||
self.c_info = None
|
||||
self.pc_image_memory = None
|
||||
self.mem_id = None
|
||||
self.rect_aoi = None
|
||||
self.pitch = None
|
||||
self.n_bits_per_pixel = None
|
||||
self.width = None
|
||||
self.height = None
|
||||
self.thread_event = threading.Event()
|
||||
self.data_thread = None
|
||||
self._roi: tuple | None = None # x, y, width, height
|
||||
|
||||
def set_roi(self, x: int, y: int, width: int, height: int):
|
||||
self._roi = (x, y, width, height)
|
||||
|
||||
def start_backend(self):
|
||||
if self.ueye is None:
|
||||
raise ImportError("The pyueye library is not installed.")
|
||||
self.h_cam = self.ueye.HIDS(
|
||||
self.camera_ID
|
||||
) # 0: first available camera; 1-254: The camera with the specified camera ID
|
||||
self.s_info = self.ueye.SENSORINFO()
|
||||
self.c_info = self.ueye.CAMINFO()
|
||||
self.pc_image_memory = self.ueye.c_mem_p()
|
||||
self.mem_id = self.ueye.int()
|
||||
self.rect_aoi = self.ueye.IS_RECT()
|
||||
self.pitch = self.ueye.INT()
|
||||
self.n_bits_per_pixel = self.ueye.INT(
|
||||
self.bits_per_pixel
|
||||
) # 24: bits per pixel for color mode; take 8 bits per pixel for monochrome
|
||||
self.m_n_colormode = self.ueye.INT(
|
||||
self._m_n_colormode_input
|
||||
) # Y8/RGB16/RGB24/REG32 (1 for our color cameras)
|
||||
self.bytes_per_pixel = int(self.n_bits_per_pixel / 8)
|
||||
|
||||
# Starts the driver and establishes the connection to the camera
|
||||
ret = self.ueye.is_InitCamera(self.h_cam, None)
|
||||
if ret != self.ueye.IS_SUCCESS:
|
||||
print("is_InitCamera ERROR")
|
||||
|
||||
# Reads out the data hard-coded in the non-volatile camera memory and writes it to the data structure that c_info points to
|
||||
ret = self.ueye.is_GetCameraInfo(self.h_cam, self.c_info)
|
||||
if ret != self.ueye.IS_SUCCESS:
|
||||
print("is_GetCameraInfo ERROR")
|
||||
|
||||
# You can query additional information about the sensor type used in the camera
|
||||
ret = self.ueye.is_GetSensorInfo(self.h_cam, self.s_info)
|
||||
if ret != self.ueye.IS_SUCCESS:
|
||||
print("is_GetSensorInfo ERROR")
|
||||
|
||||
ret = self.ueye.is_ResetToDefault(self.h_cam)
|
||||
if ret != self.ueye.IS_SUCCESS:
|
||||
print("is_ResetToDefault ERROR")
|
||||
|
||||
# Set display mode to DIB
|
||||
ret = self.ueye.is_SetDisplayMode(self.h_cam, self.ueye.IS_SET_DM_DIB)
|
||||
|
||||
# Set the right color mode
|
||||
if (
|
||||
int.from_bytes(self.s_info.nColorMode.value, byteorder="big")
|
||||
== self.ueye.IS_COLORMODE_BAYER
|
||||
):
|
||||
# setup the color depth to the current windows setting
|
||||
self.ueye.is_GetColorDepth(self.h_cam, self.n_bits_per_pixel, self.m_n_colormode)
|
||||
bytes_per_pixel = int(self.n_bits_per_pixel / 8)
|
||||
print("IS_COLORMODE_BAYER: ")
|
||||
print("\tm_n_colormode: \t\t", self.m_n_colormode)
|
||||
print("\tn_bits_per_pixel: \t\t", self.n_bits_per_pixel)
|
||||
print("\tbytes_per_pixel: \t\t", bytes_per_pixel)
|
||||
print()
|
||||
|
||||
elif (
|
||||
int.from_bytes(self.s_info.nColorMode.value, byteorder="big")
|
||||
== self.ueye.IS_COLORMODE_CBYCRY
|
||||
):
|
||||
# for color camera models use RGB32 mode
|
||||
m_n_colormode = self.ueye.IS_CM_BGRA8_PACKED
|
||||
n_bits_per_pixel = self.ueye.INT(32)
|
||||
bytes_per_pixel = int(self.n_bits_per_pixel / 8)
|
||||
print("IS_COLORMODE_CBYCRY: ")
|
||||
print("\tm_n_colormode: \t\t", m_n_colormode)
|
||||
print("\tn_bits_per_pixel: \t\t", n_bits_per_pixel)
|
||||
print("\tbytes_per_pixel: \t\t", bytes_per_pixel)
|
||||
print()
|
||||
|
||||
elif (
|
||||
int.from_bytes(self.s_info.nColorMode.value, byteorder="big")
|
||||
== self.ueye.IS_COLORMODE_MONOCHROME
|
||||
):
|
||||
# for color camera models use RGB32 mode
|
||||
m_n_colormode = self.ueye.IS_CM_MONO8
|
||||
n_bits_per_pixel = self.ueye.INT(8)
|
||||
bytes_per_pixel = int(n_bits_per_pixel / 8)
|
||||
print("IS_COLORMODE_MONOCHROME: ")
|
||||
print("\tm_n_colormode: \t\t", m_n_colormode)
|
||||
print("\tn_bits_per_pixel: \t\t", n_bits_per_pixel)
|
||||
print("\tbytes_per_pixel: \t\t", bytes_per_pixel)
|
||||
print()
|
||||
|
||||
else:
|
||||
# for monochrome camera models use Y8 mode
|
||||
m_n_colormode = self.ueye.IS_CM_MONO8
|
||||
n_bits_per_pixel = self.ueye.INT(8)
|
||||
bytes_per_pixel = int(n_bits_per_pixel / 8)
|
||||
print("else")
|
||||
|
||||
# Can be used to set the size and position of an "area of interest"(AOI) within an image
|
||||
ret = self.ueye.is_AOI(
|
||||
self.h_cam,
|
||||
self.ueye.IS_AOI_IMAGE_GET_AOI,
|
||||
self.rect_aoi,
|
||||
self.ueye.sizeof(self.rect_aoi),
|
||||
)
|
||||
if ret != self.ueye.IS_SUCCESS:
|
||||
print("is_AOI ERROR")
|
||||
|
||||
self.width = self.rect_aoi.s32Width
|
||||
self.height = self.rect_aoi.s32Height
|
||||
|
||||
# Prints out some information about the camera and the sensor
|
||||
print("Camera model:\t\t", self.s_info.strSensorName.decode("utf-8"))
|
||||
print("Camera serial no.:\t", self.c_info.SerNo.decode("utf-8"))
|
||||
print("Maximum image width:\t", self.width)
|
||||
print("Maximum image height:\t", self.height)
|
||||
print()
|
||||
|
||||
# ---------------------------------------------------------------------------------------------------------------------------------------
|
||||
|
||||
# Allocates an image memory for an image having its dimensions defined by width and height and its color depth defined by n_bits_per_pixel
|
||||
ret = self.ueye.is_AllocImageMem(
|
||||
self.h_cam,
|
||||
self.width,
|
||||
self.height,
|
||||
self.n_bits_per_pixel,
|
||||
self.pc_image_memory,
|
||||
self.mem_id,
|
||||
)
|
||||
if ret != self.ueye.IS_SUCCESS:
|
||||
print("is_AllocImageMem ERROR")
|
||||
else:
|
||||
# Makes the specified image memory the active memory
|
||||
ret = self.ueye.is_SetImageMem(self.h_cam, self.pc_image_memory, self.mem_id)
|
||||
if ret != self.ueye.IS_SUCCESS:
|
||||
print("is_SetImageMem ERROR")
|
||||
else:
|
||||
# Set the desired color mode
|
||||
ret = self.ueye.is_SetColorMode(self.h_cam, self.m_n_colormode)
|
||||
|
||||
# Activates the camera's live video mode (free run mode)
|
||||
ret = self.ueye.is_CaptureVideo(self.h_cam, self.ueye.IS_DONT_WAIT)
|
||||
if ret != self.ueye.IS_SUCCESS:
|
||||
print("is_CaptureVideo ERROR")
|
||||
|
||||
# Enables the queue mode for existing image memory sequences
|
||||
ret = self.ueye.is_InquireImageMem(
|
||||
self.h_cam,
|
||||
self.pc_image_memory,
|
||||
self.mem_id,
|
||||
self.width,
|
||||
self.height,
|
||||
self.n_bits_per_pixel,
|
||||
self.pitch,
|
||||
)
|
||||
if ret != self.ueye.IS_SUCCESS:
|
||||
print("is_InquireImageMem ERROR")
|
||||
else:
|
||||
print("Press q to leave the programm")
|
||||
# startmeasureframerate = True
|
||||
# Gain = False
|
||||
|
||||
# Start live mode of camera immediately
|
||||
self.start_live_mode()
|
||||
|
||||
def _start_data_thread(self):
|
||||
self.data_thread = threading.Thread(target=self._receive_data_from_camera, daemon=True)
|
||||
self.data_thread.start()
|
||||
|
||||
def _receive_data_from_camera(self):
|
||||
while not self.thread_event.is_set():
|
||||
if self.ueye is None:
|
||||
print("pyueye library not available.")
|
||||
return
|
||||
# In order to display the image in an OpenCV window we need to...
|
||||
# ...extract the data of our image memory
|
||||
array = self.ueye.get_data(
|
||||
self.pc_image_memory,
|
||||
self.width,
|
||||
self.height,
|
||||
self.n_bits_per_pixel,
|
||||
self.pitch,
|
||||
copy=False,
|
||||
)
|
||||
|
||||
# ...reshape it in an numpy array...
|
||||
frame = np.reshape(array, (self.height.value, self.width.value, self.bytes_per_pixel))
|
||||
self.image_data.put(frame)
|
||||
|
||||
time.sleep(0.1)
|
||||
|
||||
def wait_for_connection(self, all_signals=False, timeout=10):
|
||||
if ueye is None:
|
||||
@@ -254,226 +325,73 @@ class IDSCamera(PSIDetectorBase):
|
||||
"The pyueye library is not installed or doesn't provide the necessary c libs"
|
||||
)
|
||||
super().wait_for_connection(all_signals, timeout)
|
||||
#self.custom_prepare.on_connection_established()
|
||||
|
||||
def destroy(self):
|
||||
"""Extend Ophyds destroy function to kill the data thread"""
|
||||
self.stop_live_mode()
|
||||
super().destroy()
|
||||
|
||||
def start_live_mode(self):
|
||||
if self.custom_prepare.data_thread is not None:
|
||||
if self.data_thread is not None:
|
||||
self.stop_live_mode()
|
||||
self.custom_prepare._start_data_thread()
|
||||
self._start_data_thread()
|
||||
|
||||
def stop_live_mode(self):
|
||||
"""Stopping the camera live mode."""
|
||||
if self.custom_prepare.thread_event is not None:
|
||||
self.custom_prepare.thread_event.set()
|
||||
if self.custom_prepare.data_thread is not None:
|
||||
self.custom_prepare.data_thread.join()
|
||||
self.custom_prepare.thread_event = None
|
||||
self.custom_prepare.data_thread = None
|
||||
self.thread_event.set()
|
||||
if self.data_thread is not None:
|
||||
self.data_thread.join()
|
||||
self.thread_event.clear()
|
||||
self.data_thread = None
|
||||
|
||||
########################################
|
||||
# Beamline Specific Implementations #
|
||||
########################################
|
||||
|
||||
def on_init(self) -> None:
|
||||
"""
|
||||
Called when the device is initialized.
|
||||
|
||||
No signals are connected at this point. If you like to
|
||||
set default values on signals, please use on_connected instead.
|
||||
"""
|
||||
|
||||
def on_connected(self) -> None:
|
||||
"""
|
||||
Called after the device is connected and its signals are connected.
|
||||
Default values for signals should be set here.
|
||||
"""
|
||||
self.start_backend()
|
||||
self.start_live_mode()
|
||||
|
||||
def on_stage(self) -> DeviceStatus | StatusBase | None:
|
||||
"""
|
||||
Called while staging the device.
|
||||
|
||||
Information about the upcoming scan can be accessed from the scan_info (self.scan_info.msg) object.
|
||||
"""
|
||||
|
||||
def on_unstage(self) -> DeviceStatus | StatusBase | None:
|
||||
"""Called while unstaging the device."""
|
||||
|
||||
def on_pre_scan(self) -> DeviceStatus | StatusBase | None:
|
||||
"""Called right before the scan starts on all devices automatically."""
|
||||
|
||||
def on_trigger(self) -> DeviceStatus | StatusBase | None:
|
||||
"""Called when the device is triggered."""
|
||||
|
||||
def on_complete(self) -> DeviceStatus | StatusBase | None:
|
||||
"""Called to inquire if a device has completed a scans."""
|
||||
|
||||
def on_kickoff(self) -> DeviceStatus | StatusBase | None:
|
||||
"""Called to kickoff a device for a fly scan. Has to be called explicitly."""
|
||||
|
||||
def on_stop(self) -> None:
|
||||
"""Called when the device is stopped."""
|
||||
|
||||
def on_destroy(self) -> None:
|
||||
"""Called when the device is destroyed. Cleanup resources here."""
|
||||
self.stop_live_mode()
|
||||
|
||||
|
||||
"""from pyueye import ueye
|
||||
import numpy as np
|
||||
import cv2
|
||||
import sys
|
||||
import time
|
||||
if __name__ == "__main__":
|
||||
# Example usage
|
||||
camera = IDSCamera(name="camera", camera_ID=201, bits_per_pixel=24, channels=3, m_n_colormode=1)
|
||||
camera.wait_for_connection()
|
||||
|
||||
#---------------------------------------------------------------------------------------------------------------------------------------
|
||||
|
||||
#Variables
|
||||
hCam = ueye.HIDS(202) #0: first available camera; 1-254: The camera with the specified camera ID
|
||||
sInfo = ueye.SENSORINFO()
|
||||
cInfo = ueye.CAMINFO()
|
||||
pcImageMemory = ueye.c_mem_p()
|
||||
MemID = ueye.int()
|
||||
rectAOI = ueye.IS_RECT()
|
||||
pitch = ueye.INT()
|
||||
nBitsPerPixel = ueye.INT(24) #24: bits per pixel for color mode; take 8 bits per pixel for monochrome
|
||||
channels = 3 #3: channels for color mode(RGB); take 1 channel for monochrome
|
||||
m_nColorMode = ueye.INT(1) # Y8/RGB16/RGB24/REG32 (1 for our color cameras)
|
||||
bytes_per_pixel = int(nBitsPerPixel / 8)
|
||||
|
||||
ids_cam
|
||||
...
|
||||
deviceConfig:
|
||||
camera_ID: 202
|
||||
bits_per_pixel: 24
|
||||
channels: 3
|
||||
m_n_colormode: 1
|
||||
|
||||
#---------------------------------------------------------------------------------------------------------------------------------------
|
||||
print("START")
|
||||
print()
|
||||
|
||||
# Starts the driver and establishes the connection to the camera
|
||||
nRet = ueye.is_InitCamera(hCam, None)
|
||||
if nRet != ueye.IS_SUCCESS:
|
||||
print("is_InitCamera ERROR")
|
||||
|
||||
# Reads out the data hard-coded in the non-volatile camera memory and writes it to the data structure that cInfo points to
|
||||
nRet = ueye.is_GetCameraInfo(hCam, cInfo)
|
||||
if nRet != ueye.IS_SUCCESS:
|
||||
print("is_GetCameraInfo ERROR")
|
||||
|
||||
# You can query additional information about the sensor type used in the camera
|
||||
nRet = ueye.is_GetSensorInfo(hCam, sInfo)
|
||||
if nRet != ueye.IS_SUCCESS:
|
||||
print("is_GetSensorInfo ERROR")
|
||||
|
||||
nRet = ueye.is_ResetToDefault( hCam)
|
||||
if nRet != ueye.IS_SUCCESS:
|
||||
print("is_ResetToDefault ERROR")
|
||||
|
||||
# Set display mode to DIB
|
||||
nRet = ueye.is_SetDisplayMode(hCam, ueye.IS_SET_DM_DIB)
|
||||
|
||||
|
||||
|
||||
# Set the right color mode
|
||||
if int.from_bytes(sInfo.nColorMode.value, byteorder='big') == ueye.IS_COLORMODE_BAYER:
|
||||
# setup the color depth to the current windows setting
|
||||
ueye.is_GetColorDepth(hCam, nBitsPerPixel, m_nColorMode)
|
||||
bytes_per_pixel = int(nBitsPerPixel / 8)
|
||||
print("IS_COLORMODE_BAYER: ", )
|
||||
print("\tm_nColorMode: \t\t", m_nColorMode)
|
||||
print("\tnBitsPerPixel: \t\t", nBitsPerPixel)
|
||||
print("\tbytes_per_pixel: \t\t", bytes_per_pixel)
|
||||
print()
|
||||
|
||||
elif int.from_bytes(sInfo.nColorMode.value, byteorder='big') == ueye.IS_COLORMODE_CBYCRY:
|
||||
# for color camera models use RGB32 mode
|
||||
m_nColorMode = ueye.IS_CM_BGRA8_PACKED
|
||||
nBitsPerPixel = ueye.INT(32)
|
||||
bytes_per_pixel = int(nBitsPerPixel / 8)
|
||||
print("IS_COLORMODE_CBYCRY: ", )
|
||||
print("\tm_nColorMode: \t\t", m_nColorMode)
|
||||
print("\tnBitsPerPixel: \t\t", nBitsPerPixel)
|
||||
print("\tbytes_per_pixel: \t\t", bytes_per_pixel)
|
||||
print()
|
||||
|
||||
elif int.from_bytes(sInfo.nColorMode.value, byteorder='big') == ueye.IS_COLORMODE_MONOCHROME:
|
||||
# for color camera models use RGB32 mode
|
||||
m_nColorMode = ueye.IS_CM_MONO8
|
||||
nBitsPerPixel = ueye.INT(8)
|
||||
bytes_per_pixel = int(nBitsPerPixel / 8)
|
||||
print("IS_COLORMODE_MONOCHROME: ", )
|
||||
print("\tm_nColorMode: \t\t", m_nColorMode)
|
||||
print("\tnBitsPerPixel: \t\t", nBitsPerPixel)
|
||||
print("\tbytes_per_pixel: \t\t", bytes_per_pixel)
|
||||
print()
|
||||
|
||||
else:
|
||||
# for monochrome camera models use Y8 mode
|
||||
m_nColorMode = ueye.IS_CM_MONO8
|
||||
nBitsPerPixel = ueye.INT(8)
|
||||
bytes_per_pixel = int(nBitsPerPixel / 8)
|
||||
print("else")
|
||||
|
||||
# Can be used to set the size and position of an "area of interest"(AOI) within an image
|
||||
nRet = ueye.is_AOI(hCam, ueye.IS_AOI_IMAGE_GET_AOI, rectAOI, ueye.sizeof(rectAOI))
|
||||
if nRet != ueye.IS_SUCCESS:
|
||||
print("is_AOI ERROR")
|
||||
|
||||
width = rectAOI.s32Width
|
||||
height = rectAOI.s32Height
|
||||
|
||||
# Prints out some information about the camera and the sensor
|
||||
print("Camera model:\t\t", sInfo.strSensorName.decode('utf-8'))
|
||||
print("Camera serial no.:\t", cInfo.SerNo.decode('utf-8'))
|
||||
print("Maximum image width:\t", width)
|
||||
print("Maximum image height:\t", height)
|
||||
print()
|
||||
|
||||
#---------------------------------------------------------------------------------------------------------------------------------------
|
||||
|
||||
# Allocates an image memory for an image having its dimensions defined by width and height and its color depth defined by nBitsPerPixel
|
||||
nRet = ueye.is_AllocImageMem(hCam, width, height, nBitsPerPixel, pcImageMemory, MemID)
|
||||
if nRet != ueye.IS_SUCCESS:
|
||||
print("is_AllocImageMem ERROR")
|
||||
else:
|
||||
# Makes the specified image memory the active memory
|
||||
nRet = ueye.is_SetImageMem(hCam, pcImageMemory, MemID)
|
||||
if nRet != ueye.IS_SUCCESS:
|
||||
print("is_SetImageMem ERROR")
|
||||
else:
|
||||
# Set the desired color mode
|
||||
nRet = ueye.is_SetColorMode(hCam, m_nColorMode)
|
||||
|
||||
|
||||
|
||||
# Activates the camera's live video mode (free run mode)
|
||||
nRet = ueye.is_CaptureVideo(hCam, ueye.IS_DONT_WAIT)
|
||||
if nRet != ueye.IS_SUCCESS:
|
||||
print("is_CaptureVideo ERROR")
|
||||
|
||||
# Enables the queue mode for existing image memory sequences
|
||||
nRet = ueye.is_InquireImageMem(hCam, pcImageMemory, MemID, width, height, nBitsPerPixel, pitch)
|
||||
if nRet != ueye.IS_SUCCESS:
|
||||
print("is_InquireImageMem ERROR")
|
||||
else:
|
||||
print("Press q to leave the programm")
|
||||
startmeasureframerate=True
|
||||
Gain = False
|
||||
#---------------------------------------------------------------------------------------------------------------------------------------
|
||||
|
||||
# Continuous image display
|
||||
while(nRet == ueye.IS_SUCCESS):
|
||||
|
||||
# In order to display the image in an OpenCV window we need to...
|
||||
# ...extract the data of our image memory
|
||||
array = ueye.get_data(pcImageMemory, width, height, nBitsPerPixel, pitch, copy=False)
|
||||
|
||||
# bytes_per_pixel = int(nBitsPerPixel / 8)
|
||||
|
||||
# ...reshape it in an numpy array...
|
||||
frame = np.reshape(array,(height.value, width.value, bytes_per_pixel))
|
||||
|
||||
# ...resize the image by a half
|
||||
frame = cv2.resize(frame,(0,0),fx=0.5, fy=0.5)
|
||||
|
||||
#---------------------------------------------------------------------------------------------------------------------------------------
|
||||
#Include image data processing here
|
||||
|
||||
#---------------------------------------------------------------------------------------------------------------------------------------
|
||||
|
||||
#...and finally display it
|
||||
cv2.imshow("SimpleLive_Python_uEye_OpenCV", frame)
|
||||
if startmeasureframerate:
|
||||
starttime = time.time()
|
||||
startmeasureframerate=False
|
||||
framenumber=0
|
||||
if time.time() > starttime+5:
|
||||
print(f"Caught {framenumber/5} frames per second")
|
||||
startmeasureframerate=True
|
||||
Gain = ~Gain
|
||||
if Gain:
|
||||
nRet = ueye.is_SetGainBoost(hCam, 1)
|
||||
else:
|
||||
nRet = ueye.is_SetGainBoost(hCam, 0)
|
||||
print(f"Gain setting status {nRet}")
|
||||
#...and finally display it
|
||||
cv2.imshow("SimpleLive_Python_uEye_OpenCV", frame)
|
||||
framenumber+=1
|
||||
time.sleep(0.1)
|
||||
|
||||
# Press q if you want to end the loop
|
||||
if (cv2.waitKey(1) & 0xFF) == ord('q'):
|
||||
break
|
||||
#---------------------------------------------------------------------------------------------------------------------------------------
|
||||
|
||||
# Releases an image memory that was allocated using is_AllocImageMem() and removes it from the driver management
|
||||
ueye.is_FreeImageMem(hCam, pcImageMemory, MemID)
|
||||
|
||||
# Disables the hCam camera handle and releases the data structures and memory areas taken up by the uEye camera
|
||||
ueye.is_ExitCamera(hCam)
|
||||
|
||||
# Destroys the OpenCv windows
|
||||
cv2.destroyAllWindows()
|
||||
|
||||
print()
|
||||
print("END")
|
||||
"""
|
||||
camera.on_destroy()
|
||||
|
||||
285
csaxs_bec/devices/ids_cameras/ids_camera_new.py
Normal file
285
csaxs_bec/devices/ids_cameras/ids_camera_new.py
Normal file
@@ -0,0 +1,285 @@
|
||||
"""IDS Camera class for cSAXS IDS cameras."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import threading
|
||||
import time
|
||||
from typing import TYPE_CHECKING, Literal, Tuple, TypedDict
|
||||
|
||||
import numpy as np
|
||||
from bec_lib import messages
|
||||
from bec_lib.logger import bec_logger
|
||||
from ophyd import Component as Cpt
|
||||
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
|
||||
from ophyd_devices.utils.bec_signals import AsyncSignal, PreviewSignal
|
||||
from pydantic import BaseModel, field_validator
|
||||
|
||||
from csaxs_bec.devices.ids_cameras.base_integration.camera import Camera
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from bec_lib.devicemanager import ScanInfo
|
||||
from pydantic import ValidationInfo
|
||||
|
||||
|
||||
logger = bec_logger.logger
|
||||
|
||||
|
||||
AnyDimShape = Tuple[int, ...]
|
||||
|
||||
|
||||
class ROISpec(TypedDict):
|
||||
"""
|
||||
Typed dictionary representing the specification for a Region of Interest (ROI).
|
||||
"""
|
||||
|
||||
x: int
|
||||
y: int
|
||||
width: int
|
||||
height: int
|
||||
img_shape: AnyDimShape
|
||||
mask: np.ndarray
|
||||
|
||||
|
||||
def validate_roi(roi: ROISpec) -> ROISpec:
|
||||
"""
|
||||
Validate the ROI specification to ensure it matches the image shape.
|
||||
|
||||
Args:
|
||||
roi (ROISpec): The ROI specification to validate.
|
||||
|
||||
Returns:
|
||||
ROISpec: The validated ROI specification.
|
||||
"""
|
||||
if roi["mask"].shape != roi["img_shape"]:
|
||||
raise ValueError(
|
||||
f"Mask shape {roi['mask'].shape} does not match image shape {roi['img_shape']}."
|
||||
)
|
||||
return roi
|
||||
|
||||
|
||||
class IDSCamera(PSIDeviceBase):
|
||||
"""IDS Camera class for cSAXS.
|
||||
|
||||
This class inherits from PSIDeviceBase and implements the necessary methods
|
||||
to interact with the IDS camera using the pyueye library.
|
||||
"""
|
||||
|
||||
image = Cpt(PreviewSignal, name="image", ndim=2, doc="Preview signal for the camera.")
|
||||
roi_signal = Cpt(
|
||||
AsyncSignal,
|
||||
name="roi_signal",
|
||||
ndim=0,
|
||||
max_size=1000,
|
||||
doc="Signal for the region of interest (ROI).",
|
||||
async_update={"type": "add", "max_shape": [None]},
|
||||
)
|
||||
|
||||
USER_ACCESS = ["live_mode", "roi", "get_last_image"]
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
name: str,
|
||||
camera_id: int,
|
||||
prefix: str = "",
|
||||
scan_info: ScanInfo | None = None,
|
||||
m_n_colormode: Literal[0, 1, 2, 3] = 1,
|
||||
bits_per_pixel: Literal[8, 24] = 24,
|
||||
live_mode: bool = False,
|
||||
roi: tuple[int, int, int, int] | None = None,
|
||||
**kwargs,
|
||||
):
|
||||
"""Initialize the IDS Camera.
|
||||
|
||||
Args:
|
||||
name (str): Name of the device.
|
||||
camera_id (int): The ID of the camera device.
|
||||
prefix (str): Prefix for the device.
|
||||
scan_info (ScanInfo | None): Scan information for the device.
|
||||
m_n_colormode (Literal[0, 1, 2, 3]): Color mode for the camera.
|
||||
bits_per_pixel (Literal[8, 24]): Number of bits per pixel for the camera.
|
||||
live_mode (bool): Whether to enable live mode for the camera.
|
||||
"""
|
||||
super().__init__(name=name, prefix=prefix, scan_info=scan_info, **kwargs)
|
||||
self._live_mode_thread: threading.Thread | None = None
|
||||
self._stop_live_mode_event: threading.Event = threading.Event()
|
||||
self.cam = Camera(
|
||||
camera_id=camera_id,
|
||||
m_n_colormode=m_n_colormode,
|
||||
bits_per_pixel=bits_per_pixel,
|
||||
connect=False,
|
||||
)
|
||||
self._live_mode = False
|
||||
self._inputs = {"roi": roi if roi else (0, 0, 1, 1), "live_mode": live_mode}
|
||||
self._roi: ROISpec = validate_roi(
|
||||
ROISpec(
|
||||
{
|
||||
"x": 0,
|
||||
"y": 0,
|
||||
"width": 1,
|
||||
"height": 1,
|
||||
"img_shape": (1, 1),
|
||||
"mask": np.zeros((1, 1), dtype=np.uint8),
|
||||
}
|
||||
)
|
||||
)
|
||||
|
||||
############## Live Mode Methods ##############
|
||||
|
||||
@property
|
||||
def roi(self) -> ROISpec:
|
||||
"""Return the current region of interest (ROI) for the camera."""
|
||||
return self._roi
|
||||
|
||||
@roi.setter
|
||||
def roi(self, value: ROISpec | tuple[int, int, int, int] | list[int, int, int, int]):
|
||||
"""
|
||||
Set the region of interest (ROI) for the camera.
|
||||
|
||||
Args:
|
||||
value (ROI | tuple[int, int, int, int] | list[int, int, int, int]): Either an ROI object, or a tuple or list with x, y, width, and height.
|
||||
"""
|
||||
if isinstance(value, (tuple, list)) and len(value) == 4:
|
||||
x = value[0]
|
||||
y = value[1]
|
||||
width = value[2]
|
||||
height = value[3]
|
||||
if x + width > self.cam.cam.width.value or y + height > self.cam.cam.height.value:
|
||||
raise ValueError("ROI exceeds camera dimensions.")
|
||||
img_shape = (self.cam.cam.height.value, self.cam.cam.width.value)
|
||||
mask = np.zeros(img_shape, dtype=np.uint8)
|
||||
mask[y : y + height, x : x + width] = 1
|
||||
value = validate_roi(
|
||||
ROISpec(
|
||||
{
|
||||
"x": x,
|
||||
"y": y,
|
||||
"width": width,
|
||||
"height": height,
|
||||
"img_shape": img_shape,
|
||||
"mask": mask,
|
||||
}
|
||||
)
|
||||
)
|
||||
if not isinstance(value, dict) or not all(
|
||||
key in value for key in ["x", "y", "width", "height", "img_shape", "mask"]
|
||||
):
|
||||
raise TypeError(f"ROI must be an instance of ROISpec {value}.")
|
||||
self._roi = value
|
||||
|
||||
@property
|
||||
def live_mode(self) -> bool:
|
||||
"""Return whether the camera is in live mode."""
|
||||
return self._live_mode
|
||||
|
||||
@live_mode.setter
|
||||
def live_mode(self, value: bool):
|
||||
"""Set the live mode for the camera."""
|
||||
if value != self._live_mode:
|
||||
if self.cam._connected is False: # $ pylint: disable=protected-access
|
||||
self.cam.on_connect()
|
||||
self._live_mode = value
|
||||
if value:
|
||||
self._start_live()
|
||||
else:
|
||||
self._stop_live()
|
||||
|
||||
def _start_live(self):
|
||||
"""Start the live mode for the camera."""
|
||||
if self._live_mode_thread is not None:
|
||||
logger.info("Live mode thread is already running.")
|
||||
return
|
||||
self._stop_live_mode_event.clear()
|
||||
self._live_mode_thread = threading.Thread(
|
||||
target=self._live_mode_loop, args=(self._stop_live_mode_event,)
|
||||
)
|
||||
self._live_mode_thread.start()
|
||||
|
||||
def _stop_live(self):
|
||||
"""Stop the live mode for the camera."""
|
||||
if self._live_mode_thread is None:
|
||||
logger.info("Live mode thread is not running.")
|
||||
return
|
||||
self._stop_live_mode_event.set()
|
||||
self._live_mode_thread.join(timeout=5)
|
||||
if self._live_mode_thread.is_alive():
|
||||
logger.warning("Live mode thread did not stop gracefully.")
|
||||
else:
|
||||
self._live_mode_thread = None
|
||||
logger.info("Live mode stopped.")
|
||||
|
||||
def _live_mode_loop(self, stop_event: threading.Event):
|
||||
"""Loop to capture images in live mode."""
|
||||
while not stop_event.is_set():
|
||||
try:
|
||||
self.process_data(self.cam.get_image_data())
|
||||
except Exception as e:
|
||||
logger.error(f"Error in live mode loop: {e}")
|
||||
break
|
||||
stop_event.wait(0.2) # 5 Hz
|
||||
|
||||
def process_data(self, image: np.ndarray | None):
|
||||
"""Process the image data before sending it to the preview signal."""
|
||||
if image is None:
|
||||
return
|
||||
self.image.put(image)
|
||||
|
||||
def get_last_image(self) -> np.ndarray:
|
||||
image = self.image.get()
|
||||
if image:
|
||||
return image.data
|
||||
|
||||
############## User Interface Methods ##############
|
||||
|
||||
def on_connected(self):
|
||||
"""Connect to the camera."""
|
||||
self.cam.on_connect()
|
||||
self.live_mode = self._inputs.get("live_mode", None)
|
||||
roi = self._inputs.get("roi", None)
|
||||
if roi is None or not isinstance(roi, (tuple, list)) or not len(roi) == 4:
|
||||
# If ROI is not set, use the full camera resolution
|
||||
roi = (0, 0, self.cam.cam.width.value, self.cam.cam.height.value)
|
||||
self.roi = roi
|
||||
|
||||
def on_destroy(self):
|
||||
"""Clean up resources when the device is destroyed."""
|
||||
self.cam.on_disconnect()
|
||||
super().on_destroy()
|
||||
|
||||
def on_trigger(self):
|
||||
"""Handle the trigger event."""
|
||||
if not self.live_mode:
|
||||
return
|
||||
image = self.image.get()
|
||||
if image is not None:
|
||||
image: messages.DevicePreviewMessage
|
||||
if self.roi["img_shape"][0:2] != image.data.shape[0:2]:
|
||||
logger.info(
|
||||
f"ROI shape does not match image shape, skipping ROI application for device {self.name}."
|
||||
)
|
||||
return
|
||||
|
||||
if len(image.data.shape) == 3:
|
||||
# If the image has multiple channels, apply the mask to each channel
|
||||
data = (
|
||||
image.data * self.roi["mask"][:, :, np.newaxis]
|
||||
) # Apply mask to the image data
|
||||
n_channels = 3
|
||||
else:
|
||||
data = image.data * self.roi["mask"]
|
||||
n_channels = 1
|
||||
self.roi_signal.put(
|
||||
{
|
||||
self.roi_signal.name: {
|
||||
"value": np.sum(data)
|
||||
/ (np.sum(self.roi["mask"]) * n_channels), # TODO can be optimized
|
||||
"timestamp": time.time(),
|
||||
}
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
# Example usage of the IDSCamera class
|
||||
camera = IDSCamera(name="TestCamera", camera_id=201, live_mode=False)
|
||||
print(f"Camera {camera.name} initialized with ID {camera.cam.camera_id}.")
|
||||
@@ -1,83 +0,0 @@
|
||||
import time
|
||||
|
||||
import numpy as np
|
||||
from bec_lib import bec_logger
|
||||
from ophyd import Kind, Signal
|
||||
from ophyd.utils import ReadOnlyError
|
||||
|
||||
from ophyd_devices.utils.bec_device_base import BECDeviceBase
|
||||
|
||||
logger = bec_logger.logger
|
||||
|
||||
# Readout precision for Setable/ReadOnlySignal signals
|
||||
PRECISION = 3
|
||||
|
||||
|
||||
class ReadOnlySignal(Signal):
|
||||
"""Setable signal for simulated devices.
|
||||
|
||||
The signal will store the value in sim_state of the SimulatedData class of the parent device.
|
||||
It will also return the value from sim_state when get is called. Compared to the ReadOnlySignal,
|
||||
this signal can be written to.
|
||||
The setable signal inherits from the Signal class of ophyd, thus the class attribute needs to be
|
||||
initiated as a Component (class from ophyd).
|
||||
|
||||
>>> signal = SetableSignal(name="signal", parent=parent, value=0)
|
||||
|
||||
Parameters
|
||||
----------
|
||||
|
||||
name (string) : Name of the signal
|
||||
parent (object) : Parent object of the signal, default none.
|
||||
value (any) : Initial value of the signal, default 0.
|
||||
kind (int) : Kind of the signal, default Kind.normal.
|
||||
precision (float) : Precision of the signal, default PRECISION.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
name: str,
|
||||
*args,
|
||||
fcn: callable,
|
||||
kind: int = Kind.normal,
|
||||
precision: float = PRECISION,
|
||||
**kwargs,
|
||||
):
|
||||
super().__init__(*args, name=name, value=value, kind=kind, **kwargs)
|
||||
self._metadata.update(connected=True, write_access=False)
|
||||
self._value = None
|
||||
self.precision = precision
|
||||
self.fcn = fcn
|
||||
|
||||
# pylint: disable=arguments-differ
|
||||
def get(self):
|
||||
"""Get the current position of the simulated device.
|
||||
|
||||
Core function for signal.
|
||||
"""
|
||||
self._value = self.fcn()
|
||||
return self._value
|
||||
|
||||
# pylint: disable=arguments-differ
|
||||
def put(self, value):
|
||||
"""Put the value to the simulated device.
|
||||
|
||||
Core function for signal.
|
||||
"""
|
||||
self._update_sim_state(value)
|
||||
self._value = value
|
||||
|
||||
def describe(self):
|
||||
"""Describe the readback signal.
|
||||
|
||||
Core function for signal.
|
||||
"""
|
||||
res = super().describe()
|
||||
if self.precision is not None:
|
||||
res[self.name]["precision"] = self.precision
|
||||
return res
|
||||
|
||||
@property
|
||||
def timestamp(self):
|
||||
"""Timestamp of the readback value"""
|
||||
return self._get_timestamp()
|
||||
Reference in New Issue
Block a user