diff --git a/.copier-answers.yml b/.copier-answers.yml index a4b4022..e233aca 100644 --- a/.copier-answers.yml +++ b/.copier-answers.yml @@ -6,4 +6,6 @@ _commit: v1.1.2 _src_path: https://github.com/bec-project/plugin_copier_template.git make_commit: false project_name: csaxs_bec -widget_plugins_input: [] +widget_plugins_input: +- name: csaxs_test + use_ui: true diff --git a/csaxs_bec/device_configs/endstation.yaml b/csaxs_bec/device_configs/endstation.yaml index 56a3d10..1bd8ef6 100644 --- a/csaxs_bec/device_configs/endstation.yaml +++ b/csaxs_bec/device_configs/endstation.yaml @@ -28,4 +28,17 @@ mcs: onFailure: raise enabled: true readoutPriority: monitored - softwareTrigger: false \ No newline at end of file + softwareTrigger: false + +ids_cam: + description: IDS camera for live image acquisition + deviceClass: csaxs_bec.devices.ids_cameras.ids_camera_new.IDSCamera + deviceConfig: + camera_id: 201 + bits_per_pixel: 24 + m_n_colormode: 1 + live_mode: True + onFailure: raise + enabled: true + readoutPriority: async + softwareTrigger: True diff --git a/csaxs_bec/devices/epics/delay_generator_csaxs/ddg_1.py b/csaxs_bec/devices/epics/delay_generator_csaxs/ddg_1.py index a5c2702..f5dde68 100644 --- a/csaxs_bec/devices/epics/delay_generator_csaxs/ddg_1.py +++ b/csaxs_bec/devices/epics/delay_generator_csaxs/ddg_1.py @@ -165,20 +165,20 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS): ) self.cancel_on_stop(status_ready_read) self.cancel_on_stop(status_acquiring) - status_ready_read.wait(2) + status_ready_read.wait(10) mcs.ready_to_read.put(READYTOREAD.PROCESSING) mcs.erase_start.put(1) status_acquiring.wait( - timeout=2 + timeout=10 ) # 2 s wait for mcs card to start should be more than enough.. - st = StatusBase() + st = DeviceStatus(self) self.cancel_on_stop(st) self.trigger_shot.put(1, use_complete=True) time.sleep(self.scan_info.msg.scan_parameters["exp_time"]) self.cancel_on_stop(st) - status = self.wait_for_status(status=st, bit_event=STATUSBITS.END_OF_DELAY, timeout=2) + status = self.wait_for_status(status=st, bit_event=STATUSBITS.END_OF_DELAY, timeout=10) return status def wait_for_status( @@ -198,7 +198,11 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS): if (STATUSBITS(event_status) & bit_event) == bit_event: status.set_finished() if time.time() - current_time > timeout: - status.set_exception(TimeoutError(f"Timeout waiting for status {status}")) + status.set_exception( + TimeoutError( + f"Timeout waiting for status of device {self.name} for event_status {bit_event}" + ) + ) break time.sleep(0.1) time.sleep(0.05) # Give time for the IOC to be ready again diff --git a/csaxs_bec/devices/epics/delay_generator_csaxs/ddg_2.py b/csaxs_bec/devices/epics/delay_generator_csaxs/ddg_2.py index 260e649..af84506 100644 --- a/csaxs_bec/devices/epics/delay_generator_csaxs/ddg_2.py +++ b/csaxs_bec/devices/epics/delay_generator_csaxs/ddg_2.py @@ -118,7 +118,7 @@ class DDG2(PSIDeviceBase, DelayGeneratorCSAXS): """ def wait_for_status( - self, status: StatusBase, bit_event: STATUSBITS, timeout: float = 2 + self, status: DeviceStatus, bit_event: STATUSBITS, timeout: float = 5 ) -> None: """Wait for a event status bit to be set. @@ -134,7 +134,11 @@ class DDG2(PSIDeviceBase, DelayGeneratorCSAXS): if (STATUSBITS(event_status) & bit_event) == bit_event: status.set_finished() if time.time() - current_time > timeout: - status.set_exception(TimeoutError(f"Timeout waiting for status {status}")) + status.set_exception( + TimeoutError( + f"Timeout waiting for status of device {self.name} for event_status {bit_event}" + ) + ) break time.sleep(0.1) time.sleep(0.05) # Give time for the IOC to be ready again diff --git a/csaxs_bec/devices/epics/delay_generator_csaxs/delay_generator_csaxs.py b/csaxs_bec/devices/epics/delay_generator_csaxs/delay_generator_csaxs.py index 981f09c..676ef48 100644 --- a/csaxs_bec/devices/epics/delay_generator_csaxs/delay_generator_csaxs.py +++ b/csaxs_bec/devices/epics/delay_generator_csaxs/delay_generator_csaxs.py @@ -403,7 +403,7 @@ class DelayGeneratorCSAXS(Device): In addition, the io layer allows setting amplitude, offset and polarity for each pair. """ - _pv_timeout: float = 1.5 # Default timeout for PV operations in seconds + _pv_timeout: float = 5 # Default timeout for PV operations in seconds # Front Panel t0 = Cpt(StaticPair, "T0", name="t0", doc="T0 static pair") diff --git a/csaxs_bec/devices/epics/mcs_card/mcs_card_csaxs.py b/csaxs_bec/devices/epics/mcs_card/mcs_card_csaxs.py index 166a48e..e3d352c 100644 --- a/csaxs_bec/devices/epics/mcs_card/mcs_card_csaxs.py +++ b/csaxs_bec/devices/epics/mcs_card/mcs_card_csaxs.py @@ -118,7 +118,7 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard): name=name, prefix=prefix, scan_info=scan_info, device_manager=device_manager, **kwargs ) self._mcs_clock = 1e7 # 10MHz clock -> 1e7 Hz - self._pv_timeout = 2 + self._pv_timeout = 3 # TODO remove timeout once #129 in ophyd_devices is solved self._rlock = RLock() self.counter_mapping = { f"{self.counters.name}_mca1": "current1", diff --git a/csaxs_bec/devices/ids_cameras/base_integration/__init__.py b/csaxs_bec/devices/ids_cameras/base_integration/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/csaxs_bec/devices/ids_cameras/base_integration/camera.py b/csaxs_bec/devices/ids_cameras/base_integration/camera.py new file mode 100644 index 0000000..a5a7d86 --- /dev/null +++ b/csaxs_bec/devices/ids_cameras/base_integration/camera.py @@ -0,0 +1,251 @@ +"""This module provides a Camera class for handling IDS cameras using the pyueye SDK library.""" + +from __future__ import annotations + +import atexit +from typing import Literal + +import numpy as np +from bec_lib.logger import bec_logger + +from csaxs_bec.devices.ids_cameras.base_integration.utils import check_error + +logger = bec_logger.logger + +try: + from pyueye import ueye +except ImportError as exc: + logger.warning(f"The pyueye library is not properly installed : {exc}") + ueye = None # type: ignore[assignment] + + +class IDSCameraObject: + """Base class for IDS Camera object. + + Args: + device_id (int): The ID of the camera device. # e.g. 201; check idscamera tool + m_n_colormode (int): Color mode for the camera. # 1 for cSAXS color cameras + bits_per_pixel (int): Number of bits per pixel for the camera. # 24 for color cameras, 8 for monochrome cameras + """ + + def __init__(self, device_id: int, m_n_colormode, bits_per_pixel): + self.ueye = ueye + self._device_id = device_id + self.h_cam = ueye.HIDS(device_id) + self.s_info = ueye.SENSORINFO() + self.c_info = ueye.CAMINFO() + self.rect_roi = ueye.IS_RECT() + self.pc_image_mem = ueye.c_mem_p() + self.mem_id = ueye.int() + self.pitch = ueye.INT() + self.m_n_colormode = ueye.INT(m_n_colormode) + self.n_bits_per_pixel = ueye.INT(bits_per_pixel) + self.bytes_per_pixel = int(self.n_bits_per_pixel / 8) + + # Sequence to initialize the camera + check_error(ueye.is_InitCamera(self.h_cam, None), "IDSCameraObject") + check_error(ueye.is_GetSensorInfo(self.h_cam, self.s_info), "IDSCameraObject") + check_error(ueye.is_GetCameraInfo(self.h_cam, self.c_info), "IDSCameraObject") + check_error(ueye.is_ResetToDefault(self.h_cam), "IDSCameraObject") + check_error(ueye.is_SetDisplayMode(self.h_cam, ueye.IS_SET_DM_DIB), "IDSCameraObject") + + if ( + int.from_bytes(self.s_info.nColorMode.value, byteorder="big") + == self.ueye.IS_COLORMODE_BAYER + ): + print("Bayer color mode detected.") + # setup the color depth to the current windows setting + self.ueye.is_GetColorDepth( + self.h_cam, self.n_bits_per_pixel, self.m_n_colormode + ) # TODO This raises an error - maybe check the m_n_colormode value + self.bytes_per_pixel = int(self.n_bits_per_pixel / 8) + elif ( + int.from_bytes(self.s_info.nColorMode.value, byteorder="big") + == self.ueye.IS_COLORMODE_CBYCRY + ): + # for color camera models use RGB32 mode + self.m_n_colormode = self.ueye.IS_CM_BGRA8_PACKED + self.n_bits_per_pixel = self.ueye.INT(32) + self.bytes_per_pixel = int(self.n_bits_per_pixel / 8) + elif ( + int.from_bytes(self.s_info.nColorMode.value, byteorder="big") + == self.ueye.IS_COLORMODE_MONOCHROME + ): + # for color camera models use RGB32 mode + self.m_n_colormode = self.ueye.IS_CM_MONO8 + self.n_bits_per_pixel = self.ueye.INT(8) + self.bytes_per_pixel = int(self.n_bits_per_pixel / 8) + else: + # for monochrome camera models use Y8 mode + self.m_n_colormode = self.ueye.IS_CM_MONO8 + self.n_bits_per_pixel = self.ueye.INT(8) + self.bytes_per_pixel = int(self.n_bits_per_pixel / 8) + print("else") + + # Can be used to set the size and position of an "area of interest"(AOI) within an image + check_error( + self.ueye.is_AOI( + self.h_cam, + self.ueye.IS_AOI_IMAGE_GET_AOI, + self.rect_roi, + self.ueye.sizeof(self.rect_roi), + ), + "IDSCameraObject", + ) + self.width = self.rect_roi.s32Width + self.height = self.rect_roi.s32Height + + check_error( + self.ueye.is_AllocImageMem( + self.h_cam, + self.width, + self.height, + self.n_bits_per_pixel, + self.pc_image_mem, + self.mem_id, + ), + "IDSCameraObject", + ) + + check_error( + self.ueye.is_SetImageMem(self.h_cam, self.pc_image_mem, self.mem_id), "IDSCameraObject" + ) + check_error(self.ueye.is_SetColorMode(self.h_cam, self.m_n_colormode), "IDSCameraObject") + + check_error( + self.ueye.is_CaptureVideo(self.h_cam, self.ueye.IS_DONT_WAIT), "IDSCameraObject" + ) + check_error( + self.ueye.is_InquireImageMem( + self.h_cam, + self.pc_image_mem, + self.mem_id, + self.width, + self.height, + self.n_bits_per_pixel, + self.pitch, + ), + "IDSCameraObject", + ) + + def __repr__(self): + return f"IDSCameraObject\n\ndevice_id={self._device_id},\ns_info={self.s_info},\nc_info={self.c_info},\nrect_roi={self.rect_roi},\npc_image_mem={self.pc_image_mem},\nmem_id={self.mem_id},\npitch={self.pitch},\nm_n_colormode={self.m_n_colormode},\nn_bits_per_pixel={self.n_bits_per_pixel},\nbytes_per_pixel={self.bytes_per_pixel}" + + +class Camera: + """Camera base class for IDS cameras.""" + + def __init__( + self, + camera_id: int, + m_n_colormode: Literal[0, 1, 2, 3] = 1, + bits_per_pixel: int = 24, + connect: bool = True, + ): + self.ueye = ueye + self.camera_id = camera_id + self._inputs = {"m_n_colormode": m_n_colormode, "bits_per_pixel": bits_per_pixel} + self._connected = False + self.cam = None + atexit.register(self.on_disconnect) + + if connect: + self.on_connect() + + def set_roi(self, x: int, y: int, width: int, height: int): + """Set the region of interest (ROI) for the camera.""" + rect_roi = ueye.IS_RECT() + rect_roi.s32X = x + rect_roi.s32Y = y + rect_roi.s32Width = width + rect_roi.s32Height = height + + ret = self.ueye.is_AOI( + self.cam.h_cam, self.ueye.IS_AOI_IMAGE_SET_AOI, rect_roi, self.ueye.sizeof(rect_roi) + ) + check_error(ret, "IDSCameraObject") + logger.info(f"ROI set to: {rect_roi}") + + def on_connect(self): + """Connect to the camera and initialize it.""" + if self._connected: + logger.warning("Camera is already connected.") + return + self.cam = IDSCameraObject(self.camera_id, **self._inputs) + self._connected = True + + def on_disconnect(self): + """Disconnect from the camera.""" + try: + if self.cam and self.cam.h_cam: + check_error(self.ueye.is_ExitCamera(self.cam.h_cam), "IDSCameraObject") + self._connected = False + self.cam = None + logger.info("Camera disconnected.") + except Exception as e: + logger.info(f"Error during camera disconnection: {e}") + + @property + def exposure_time(self) -> float: + """Get the exposure time of the camera.""" + exposure = ueye.c_double() + ret = self.ueye.is_Exposure(self.cam.h_cam, ueye.IS_EXPOSURE_CMD_GET_EXPOSURE, exposure, 8) + check_error(ret, "IDSCameraObject") + return exposure.value + + @exposure_time.setter + def exposure_time(self, value: float): + """Set the exposure time of the camera.""" + exposure = ueye.c_double(value) + check_error( + self.ueye.is_Exposure(self.cam.h_cam, ueye.IS_EXPOSURE_CMD_SET_EXPOSURE, exposure, 8), + "IDSCameraObject", + ) + + def set_auto_gain(self, enable: bool): + """Enable or disable auto gain.""" + enable = ueye.c_int(1) if enable else ueye.c_int(0) + value_to_return = ueye.c_double() + check_error( + self.ueye.is_SetAutoParameter( + self.cam.h_cam, ueye.IS_SET_ENABLE_AUTO_GAIN, enable, value_to_return + ), + "IDSCameraObject", + ) + + def set_auto_shutter(self, enable: bool): + """Enable or disable auto exposure.""" + enable = ueye.c_int(1) if enable else ueye.c_int(0) + value_to_return = ueye.c_double() + check_error( + self.ueye.is_SetAutoParameter( + self.cam.h_cam, ueye.IS_SET_ENABLE_AUTO_SHUTTER, enable, value_to_return + ), + "IDSCameraObject", + ) + + def get_image_data(self) -> np.ndarray | None: + """Get the image data from the camera.""" + if not self._connected: + logger.warning("Camera is not connected.") + return None + array = self.ueye.get_data( + self.cam.pc_image_mem, + self.cam.width, + self.cam.height, + self.cam.n_bits_per_pixel, + self.cam.pitch, + copy=False, + ) + if array is None: + logger.error("Failed to get image data from the camera.") + return None + return np.reshape( + array, (self.cam.height.value, self.cam.width.value, self.cam.bytes_per_pixel) + ) + + +if __name__ == "__main__": + # Example usage + camera = Camera(camera_id=201) + camera.on_connect() diff --git a/csaxs_bec/devices/ids_cameras/base_integration/utils.py b/csaxs_bec/devices/ids_cameras/base_integration/utils.py new file mode 100644 index 0000000..c059cab --- /dev/null +++ b/csaxs_bec/devices/ids_cameras/base_integration/utils.py @@ -0,0 +1,281 @@ +"""Utility functions and classes for IDS cameras using the pyueye library.""" + +from unittest import mock + +from bec_lib.logger import bec_logger + +logger = bec_logger.logger + +try: + from pyueye import ueye +except ImportError as exc: + logger.warning(f"The pyueye library is not properly installed : {exc}") + ueye = None + +if ueye is not None: + error_codes = { + ueye.IS_NO_SUCCESS: "No success", + ueye.IS_SUCCESS: "Success", + ueye.IS_INVALID_CAMERA_HANDLE: "Invalid camera handle", + ueye.IS_INVALID_HANDLE: "Invalid handle", + ueye.IS_IO_REQUEST_FAILED: "IO request failed", + ueye.IS_CANT_OPEN_DEVICE: "Cannot open device", + ueye.IS_CANT_CLOSE_DEVICE: "Cannot close device", + ueye.IS_CANT_SETUP_MEMORY: "Cannot setup memory", + ueye.IS_NO_HWND_FOR_ERROR_REPORT: "No HWND for error report", + ueye.IS_ERROR_MESSAGE_NOT_CREATED: "Error message not created", + ueye.IS_ERROR_STRING_NOT_FOUND: "Error string not found", + ueye.IS_HOOK_NOT_CREATED: "Hook not created", + ueye.IS_TIMER_NOT_CREATED: "Timer not created", + ueye.IS_CANT_OPEN_REGISTRY: "Cannot open registry", + ueye.IS_CANT_READ_REGISTRY: "Cannot read registry", + ueye.IS_CANT_VALIDATE_BOARD: "Cannot validate board", + ueye.IS_CANT_GIVE_BOARD_ACCESS: "Cannot give board access", + ueye.IS_NO_IMAGE_MEM_ALLOCATED: "No image memory allocated", + ueye.IS_CANT_CLEANUP_MEMORY: "Cannot clean up memory", + ueye.IS_CANT_COMMUNICATE_WITH_DRIVER: "Cannot communicate with driver", + ueye.IS_FUNCTION_NOT_SUPPORTED_YET: "Function not supported yet", + ueye.IS_OPERATING_SYSTEM_NOT_SUPPORTED: "Operating system not supported", + ueye.IS_INVALID_VIDEO_IN: "Invalid video input", + ueye.IS_INVALID_IMG_SIZE: "Invalid image size", + ueye.IS_INVALID_ADDRESS: "Invalid address", + ueye.IS_INVALID_VIDEO_MODE: "Invalid video mode", + ueye.IS_INVALID_AGC_MODE: "Invalid AGC mode", + ueye.IS_INVALID_GAMMA_MODE: "Invalid gamma mode", + ueye.IS_INVALID_SYNC_LEVEL: "Invalid sync level", + ueye.IS_INVALID_CBARS_MODE: "Invalid color bars mode", + ueye.IS_INVALID_COLOR_MODE: "Invalid color mode", + ueye.IS_INVALID_SCALE_FACTOR: "Invalid scale factor", + ueye.IS_INVALID_IMAGE_SIZE: "Invalid image size", + ueye.IS_INVALID_IMAGE_POS: "Invalid image position", + ueye.IS_INVALID_CAPTURE_MODE: "Invalid capture mode", + ueye.IS_INVALID_RISC_PROGRAM: "Invalid RISC program", + ueye.IS_INVALID_BRIGHTNESS: "Invalid brightness", + ueye.IS_INVALID_CONTRAST: "Invalid contrast", + ueye.IS_INVALID_SATURATION_U: "Invalid saturation U", + ueye.IS_INVALID_SATURATION_V: "Invalid saturation V", + ueye.IS_INVALID_HUE: "Invalid hue", + ueye.IS_INVALID_HOR_FILTER_STEP: "Invalid horizontal filter step", + ueye.IS_INVALID_VERT_FILTER_STEP: "Invalid vertical filter step", + ueye.IS_INVALID_EEPROM_READ_ADDRESS: "Invalid EEPROM read address", + ueye.IS_INVALID_EEPROM_WRITE_ADDRESS: "Invalid EEPROM write address", + ueye.IS_INVALID_EEPROM_READ_LENGTH: "Invalid EEPROM read length", + ueye.IS_INVALID_EEPROM_WRITE_LENGTH: "Invalid EEPROM write length", + ueye.IS_INVALID_BOARD_INFO_POINTER: "Invalid board info pointer", + ueye.IS_INVALID_DISPLAY_MODE: "Invalid display mode", + ueye.IS_INVALID_ERR_REP_MODE: "Invalid error report mode", + ueye.IS_INVALID_BITS_PIXEL: "Invalid bits per pixel", + ueye.IS_INVALID_MEMORY_POINTER: "Invalid memory pointer", + ueye.IS_FILE_WRITE_OPEN_ERROR: "File write open error", + ueye.IS_FILE_READ_OPEN_ERROR: "File read open error", + ueye.IS_FILE_READ_INVALID_BMP_ID: "File read invalid BMP ID", + ueye.IS_FILE_READ_INVALID_BMP_SIZE: "File read invalid BMP size", + ueye.IS_FILE_READ_INVALID_BIT_COUNT: "File read invalid bit count", + ueye.IS_WRONG_KERNEL_VERSION: "Wrong kernel version", + ueye.IS_RISC_INVALID_XLENGTH: "RISC invalid X length", + ueye.IS_RISC_INVALID_YLENGTH: "RISC invalid Y length", + ueye.IS_RISC_EXCEED_IMG_SIZE: "RISC exceed image size", + ueye.IS_DD_MAIN_FAILED: "DirectDraw main surface failed", + ueye.IS_DD_PRIMSURFACE_FAILED: "DirectDraw primary surface failed", + ueye.IS_DD_SCRN_SIZE_NOT_SUPPORTED: "Screen size not supported", + ueye.IS_DD_CLIPPER_FAILED: "Clipper failed", + ueye.IS_DD_CLIPPER_HWND_FAILED: "Clipper HWND failed", + ueye.IS_DD_CLIPPER_CONNECT_FAILED: "Clipper connect failed", + ueye.IS_DD_BACKSURFACE_FAILED: "Backsurface failed", + ueye.IS_DD_BACKSURFACE_IN_SYSMEM: "Backsurface in system memory", + ueye.IS_DD_MDL_MALLOC_ERR: "Memory malloc error", + ueye.IS_DD_MDL_SIZE_ERR: "Memory size error", + ueye.IS_DD_CLIP_NO_CHANGE: "Clip no change", + ueye.IS_DD_PRIMMEM_NULL: "Primary memory null", + ueye.IS_DD_BACKMEM_NULL: "Back memory null", + ueye.IS_DD_BACKOVLMEM_NULL: "Back overlay memory null", + ueye.IS_DD_OVERLAYSURFACE_FAILED: "Overlay surface failed", + ueye.IS_DD_OVERLAYSURFACE_IN_SYSMEM: "Overlay surface in system memory", + ueye.IS_DD_OVERLAY_NOT_ALLOWED: "Overlay not allowed", + ueye.IS_DD_OVERLAY_COLKEY_ERR: "Overlay color key error", + ueye.IS_DD_OVERLAY_NOT_ENABLED: "Overlay not enabled", + ueye.IS_DD_GET_DC_ERROR: "Get DC error", + ueye.IS_DD_DDRAW_DLL_NOT_LOADED: "DirectDraw DLL not loaded", + ueye.IS_DD_THREAD_NOT_CREATED: "DirectDraw thread not created", + ueye.IS_DD_CANT_GET_CAPS: "Cannot get capabilities", + ueye.IS_DD_NO_OVERLAYSURFACE: "No overlay surface", + ueye.IS_DD_NO_OVERLAYSTRETCH: "No overlay stretch", + ueye.IS_DD_CANT_CREATE_OVERLAYSURFACE: "Cannot create overlay surface", + ueye.IS_DD_CANT_UPDATE_OVERLAYSURFACE: "Cannot update overlay surface", + ueye.IS_DD_INVALID_STRETCH: "Invalid stretch", + ueye.IS_EV_INVALID_EVENT_NUMBER: "Invalid event number", + ueye.IS_INVALID_MODE: "Invalid mode", + ueye.IS_CANT_FIND_HOOK: "Cannot find hook", + ueye.IS_CANT_GET_HOOK_PROC_ADDR: "Cannot get hook procedure address", + ueye.IS_CANT_CHAIN_HOOK_PROC: "Cannot chain hook procedure", + ueye.IS_CANT_SETUP_WND_PROC: "Cannot setup window procedure", + ueye.IS_HWND_NULL: "HWND is null", + ueye.IS_INVALID_UPDATE_MODE: "Invalid update mode", + ueye.IS_NO_ACTIVE_IMG_MEM: "No active image memory", + ueye.IS_CANT_INIT_EVENT: "Cannot initialize event", + ueye.IS_FUNC_NOT_AVAIL_IN_OS: "Function not available in OS", + ueye.IS_CAMERA_NOT_CONNECTED: "Camera not connected", + ueye.IS_SEQUENCE_LIST_EMPTY: "Sequence list empty", + ueye.IS_CANT_ADD_TO_SEQUENCE: "Cannot add to sequence", + ueye.IS_LOW_OF_SEQUENCE_RISC_MEM: "Low sequence RISC memory", + ueye.IS_IMGMEM2FREE_USED_IN_SEQ: "Image memory to free used in sequence", + ueye.IS_IMGMEM_NOT_IN_SEQUENCE_LIST: "Image memory not in sequence list", + ueye.IS_SEQUENCE_BUF_ALREADY_LOCKED: "Sequence buffer already locked", + ueye.IS_INVALID_DEVICE_ID: "Invalid device ID", + ueye.IS_INVALID_BOARD_ID: "Invalid board ID", + ueye.IS_ALL_DEVICES_BUSY: "All devices busy", + ueye.IS_HOOK_BUSY: "Hook busy", + ueye.IS_TIMED_OUT: "Timed out", + ueye.IS_NULL_POINTER: "Null pointer", + ueye.IS_WRONG_HOOK_VERSION: "Wrong hook version", + ueye.IS_INVALID_PARAMETER: "Invalid parameter", + ueye.IS_NOT_ALLOWED: "Not allowed", + ueye.IS_OUT_OF_MEMORY: "Out of memory", + ueye.IS_INVALID_WHILE_LIVE: "Invalid while live", + ueye.IS_ACCESS_VIOLATION: "Access violation", + ueye.IS_UNKNOWN_ROP_EFFECT: "Unknown ROP effect", + ueye.IS_INVALID_RENDER_MODE: "Invalid render mode", + ueye.IS_INVALID_THREAD_CONTEXT: "Invalid thread context", + ueye.IS_NO_HARDWARE_INSTALLED: "No hardware installed", + ueye.IS_INVALID_WATCHDOG_TIME: "Invalid watchdog time", + ueye.IS_INVALID_WATCHDOG_MODE: "Invalid watchdog mode", + ueye.IS_INVALID_PASSTHROUGH_IN: "Invalid passthrough input", + ueye.IS_ERROR_SETTING_PASSTHROUGH_IN: "Error setting passthrough input", + ueye.IS_FAILURE_ON_SETTING_WATCHDOG: "Failure setting watchdog", + ueye.IS_NO_USB20: "No USB 2.0", + ueye.IS_CAPTURE_RUNNING: "Capture running", + ueye.IS_MEMORY_BOARD_ACTIVATED: "Memory board activated", + ueye.IS_MEMORY_BOARD_DEACTIVATED: "Memory board deactivated", + ueye.IS_NO_MEMORY_BOARD_CONNECTED: "No memory board connected", + ueye.IS_TOO_LESS_MEMORY: "Too little memory", + ueye.IS_IMAGE_NOT_PRESENT: "Image not present", + ueye.IS_MEMORY_MODE_RUNNING: "Memory mode running", + ueye.IS_MEMORYBOARD_DISABLED: "Memoryboard disabled", + ueye.IS_TRIGGER_ACTIVATED: "Trigger activated", + ueye.IS_WRONG_KEY: "Wrong key", + ueye.IS_CRC_ERROR: "CRC error", + ueye.IS_NOT_YET_RELEASED: "Not yet released", + ueye.IS_NOT_CALIBRATED: "Not calibrated", # already present + ueye.IS_WAITING_FOR_KERNEL: "Waiting for kernel", + ueye.IS_NOT_SUPPORTED: "Not supported", # already present + ueye.IS_TRIGGER_NOT_ACTIVATED: "Trigger not activated", + ueye.IS_OPERATION_ABORTED: "Operation aborted", + ueye.IS_BAD_STRUCTURE_SIZE: "Bad structure size", + ueye.IS_INVALID_BUFFER_SIZE: "Invalid buffer size", + ueye.IS_INVALID_PIXEL_CLOCK: "Invalid pixel clock", + ueye.IS_INVALID_EXPOSURE_TIME: "Invalid exposure time", + ueye.IS_AUTO_EXPOSURE_RUNNING: "Auto exposure running", + ueye.IS_CANNOT_CREATE_BB_SURF: "Cannot create BB surface", + ueye.IS_CANNOT_CREATE_BB_MIX: "Cannot create BB mix", + ueye.IS_BB_OVLMEM_NULL: "BB overlay memory null", + ueye.IS_CANNOT_CREATE_BB_OVL: "Cannot create BB overlay", + ueye.IS_NOT_SUPP_IN_OVL_SURF_MODE: "Not supported in overlay surface mode", + ueye.IS_INVALID_SURFACE: "Invalid surface", + ueye.IS_SURFACE_LOST: "Surface lost", + ueye.IS_RELEASE_BB_OVL_DC: "Release BB overlay DC", + ueye.IS_BB_TIMER_NOT_CREATED: "BB timer not created", + ueye.IS_BB_OVL_NOT_EN: "BB overlay not enabled", + ueye.IS_ONLY_IN_BB_MODE: "Only in BB mode", + ueye.IS_INVALID_COLOR_FORMAT: "Invalid color format", + ueye.IS_INVALID_WB_BINNING_MODE: "Invalid WB binning mode", + ueye.IS_INVALID_I2C_DEVICE_ADDRESS: "Invalid I²C device address", + ueye.IS_COULD_NOT_CONVERT: "Could not convert", + ueye.IS_TRANSFER_ERROR: "Transfer error", # already present + ueye.IS_PARAMETER_SET_NOT_PRESENT: "Parameter set not present", + ueye.IS_INVALID_CAMERA_TYPE: "Invalid camera type", + ueye.IS_INVALID_HOST_IP_HIBYTE: "Invalid host IP high byte", + ueye.IS_CM_NOT_SUPP_IN_CURR_DISPLAYMODE: "Color matrix not supported in current display mode", + ueye.IS_NO_IR_FILTER: "No IR filter", + ueye.IS_STARTER_FW_UPLOAD_NEEDED: "Starter firmware upload needed", + ueye.IS_DR_LIBRARY_NOT_FOUND: "Driver library not found", + ueye.IS_DR_DEVICE_OUT_OF_MEMORY: "Driver device out of memory", + ueye.IS_DR_CANNOT_CREATE_SURFACE: "Driver cannot create surface", + ueye.IS_DR_CANNOT_CREATE_VERTEX_BUFFER: "Driver cannot create vertex buffer", + ueye.IS_DR_CANNOT_CREATE_TEXTURE: "Driver cannot create texture", + ueye.IS_DR_CANNOT_LOCK_OVERLAY_SURFACE: "Driver cannot lock overlay surface", + ueye.IS_DR_CANNOT_UNLOCK_OVERLAY_SURFACE: "Driver cannot unlock overlay surface", + ueye.IS_DR_CANNOT_GET_OVERLAY_DC: "Driver cannot get overlay DC", + ueye.IS_DR_CANNOT_RELEASE_OVERLAY_DC: "Driver cannot release overlay DC", + ueye.IS_DR_DEVICE_CAPS_INSUFFICIENT: "Driver device capabilities insufficient", + ueye.IS_INCOMPATIBLE_SETTING: "Incompatible setting", + ueye.IS_DR_NOT_ALLOWED_WHILE_DC_IS_ACTIVE: "Driver not allowed while DC is active", + ueye.IS_DEVICE_ALREADY_PAIRED: "Device already paired", + ueye.IS_SUBNETMASK_MISMATCH: "Subnet mask mismatch", + ueye.IS_SUBNET_MISMATCH: "Subnet mismatch", + ueye.IS_INVALID_IP_CONFIGURATION: "Invalid IP configuration", + ueye.IS_DEVICE_NOT_COMPATIBLE: "Device not compatible", + ueye.IS_NETWORK_FRAME_SIZE_INCOMPATIBLE: "Network frame size incompatible", + ueye.IS_NETWORK_CONFIGURATION_INVALID: "Network configuration invalid", + ueye.IS_ERROR_CPU_IDLE_STATES_CONFIGURATION: "CPU idle states configuration error", + ueye.IS_DEVICE_BUSY: "Device busy", + ueye.IS_SENSOR_INITIALIZATION_FAILED: "Sensor initialization failed", + ueye.IS_IMAGE_BUFFER_NOT_DWORD_ALIGNED: "Image buffer not DWORD aligned", + ueye.IS_SEQ_BUFFER_IS_LOCKED: "Sequence buffer is locked", + ueye.IS_FILE_PATH_DOES_NOT_EXIST: "File path does not exist", + ueye.IS_INVALID_WINDOW_HANDLE: "Invalid window handle", + ueye.IS_INVALID_IMAGE_PARAMETER: "Invalid image parameter", + ueye.IS_NO_SUCH_DEVICE: "No such device", + ueye.IS_DEVICE_IN_USE: "Device in use", + } + + bits_per_pixel = { + ueye.IS_CM_SENSOR_RAW8: 8, + ueye.IS_CM_SENSOR_RAW10: 16, + ueye.IS_CM_SENSOR_RAW12: 16, + ueye.IS_CM_SENSOR_RAW16: 16, + ueye.IS_CM_MONO8: 8, + ueye.IS_CM_RGB8_PACKED: 24, + ueye.IS_CM_BGR8_PACKED: 24, + ueye.IS_CM_RGBA8_PACKED: 32, + ueye.IS_CM_BGRA8_PACKED: 32, + ueye.IS_CM_BGR10_PACKED: 32, + ueye.IS_CM_RGB10_PACKED: 32, + ueye.IS_CM_BGRA12_UNPACKED: 64, + ueye.IS_CM_BGR12_UNPACKED: 48, + ueye.IS_CM_BGRY8_PACKED: 32, + ueye.IS_CM_BGR565_PACKED: 16, + ueye.IS_CM_BGR5_PACKED: 16, + ueye.IS_CM_UYVY_PACKED: 16, + ueye.IS_CM_UYVY_MONO_PACKED: 16, + ueye.IS_CM_UYVY_BAYER_PACKED: 16, + ueye.IS_CM_CBYCRY_PACKED: 16, + } + + +def get_bits_per_pixel(color_mode): + """ + Returns the number of bits per pixel for the given color mode. + """ + if color_mode not in bits_per_pixel: + raise UEyeException(f"Unknown color mode: {color_mode}") + return bits_per_pixel[color_mode] + + +class UEyeException(Exception): + """Custom exception for uEye errors.""" + + def __init__(self, error_code, called_from: str | None = None): + self.error_code = error_code + self.called_from = called_from if called_from is not None else "" + + def __str__(self): + if self.error_code in error_codes: + return f"Exception: {error_codes[self.error_code]} raised in {self.called_from}." + else: + for att, val in ueye.__dict__.items(): + if ( + att[0:2] == "IS" + and val == self.error_code + and ("FAILED" in att or "INVALID" in att or "ERROR" in att or "NOT" in att) + ): + return f"Exception: {str(self.error_code)} ({att} ? {val}) raised in {self.called_from}." + return f"Exception: {str(self.error_code)} raised in {self.called_from}." + + +def check_error(error_code, called_from: str | None = None): + """ + Check an error code, and raise an error if adequate. + """ + if error_code != ueye.IS_SUCCESS: + called_from = called_from if called_from is not None else "" + raise UEyeException(error_code, called_from) diff --git a/csaxs_bec/devices/ids_cameras/ids_camera.py b/csaxs_bec/devices/ids_cameras/ids_camera.py index 37edadb..1b7fe7f 100644 --- a/csaxs_bec/devices/ids_cameras/ids_camera.py +++ b/csaxs_bec/devices/ids_cameras/ids_camera.py @@ -3,226 +3,95 @@ import time import numpy as np from ophyd import Component as Cpt -from ophyd import Device, Kind -from ophyd_devices.interfaces.base_classes.psi_detector_base import ( - CustomDetectorMixin, - PSIDetectorBase, -) -from ophyd_devices.sim.sim_signals import SetableSignal - -try: - from pyueye import ueye -except ImportError: - # The pyueye library is not installed or doesn't provide the necessary c libs - ueye = None +from ophyd import DeviceStatus, Kind, Signal, StatusBase +from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase +from ophyd_devices.utils.bec_signals import PreviewSignal -class IDSCustomPrepare(CustomDetectorMixin): +class ROISignal(Signal): + """ + Signal to handle the Region of Interest (ROI) for the IDS camera. + It is a tuple of (x, y, width, height). + """ - USER_ACCESS = ["pyueye"] - pyueye = ueye - - def __init__(self, *_args, parent: Device = None, **_kwargs) -> None: - super().__init__(*_args, parent=parent, **_kwargs) - self.ueye = ueye - self.h_cam = None - self.s_info = None - self.data_thread = None - self.thread_event = None - - def on_connection_established(self): - self.hCam = self.ueye.HIDS( - self.parent.camera_ID - ) # 0: first available camera; 1-254: The camera with the specified camera ID - self.sInfo = self.ueye.SENSORINFO() - self.cInfo = self.ueye.CAMINFO() - self.pcImageMemory = self.ueye.c_mem_p() - self.MemID = self.ueye.int() - self.rectAOI = self.ueye.IS_RECT() - self.pitch = self.ueye.INT() - self.nBitsPerPixel = self.ueye.INT( - self.parent.bits_per_pixel - ) # 24: bits per pixel for color mode; take 8 bits per pixel for monochrome - self.channels = ( - self.parent.channels - ) # 3: channels for color mode(RGB); take 1 channel for monochrome - self.m_nColorMode = self.ueye.INT( - self.parent.m_n_colormode - ) # Y8/RGB16/RGB24/REG32 (1 for our color cameras) - self.bytes_per_pixel = int(self.nBitsPerPixel / 8) - - # Starts the driver and establishes the connection to the camera - nRet = self.ueye.is_InitCamera(self.hCam, None) - if nRet != self.ueye.IS_SUCCESS: - print("is_InitCamera ERROR") - - # Reads out the data hard-coded in the non-volatile camera memory and writes it to the data structure that cInfo points to - nRet = self.ueye.is_GetCameraInfo(self.hCam, self.cInfo) - if nRet != self.ueye.IS_SUCCESS: - print("is_GetCameraInfo ERROR") - - # You can query additional information about the sensor type used in the camera - nRet = self.ueye.is_GetSensorInfo(self.hCam, self.sInfo) - if nRet != self.ueye.IS_SUCCESS: - print("is_GetSensorInfo ERROR") - - nRet = self.ueye.is_ResetToDefault(self.hCam) - if nRet != self.ueye.IS_SUCCESS: - print("is_ResetToDefault ERROR") - - # Set display mode to DIB - nRet = self.ueye.is_SetDisplayMode(self.hCam, self.ueye.IS_SET_DM_DIB) - - # Set the right color mode - if ( - int.from_bytes(self.sInfo.nColorMode.value, byteorder="big") - == self.ueye.IS_COLORMODE_BAYER - ): - # setup the color depth to the current windows setting - self.ueye.is_GetColorDepth(self.hCam, self.nBitsPerPixel, self.m_nColorMode) - bytes_per_pixel = int(self.nBitsPerPixel / 8) - print("IS_COLORMODE_BAYER: ") - print("\tm_nColorMode: \t\t", self.m_nColorMode) - print("\tnBitsPerPixel: \t\t", self.nBitsPerPixel) - print("\tbytes_per_pixel: \t\t", bytes_per_pixel) - print() - - elif ( - int.from_bytes(self.sInfo.nColorMode.value, byteorder="big") - == self.ueye.IS_COLORMODE_CBYCRY - ): - # for color camera models use RGB32 mode - m_nColorMode = ueye.IS_CM_BGRA8_PACKED - nBitsPerPixel = ueye.INT(32) - bytes_per_pixel = int(self.nBitsPerPixel / 8) - print("IS_COLORMODE_CBYCRY: ") - print("\tm_nColorMode: \t\t", m_nColorMode) - print("\tnBitsPerPixel: \t\t", nBitsPerPixel) - print("\tbytes_per_pixel: \t\t", bytes_per_pixel) - print() - - elif ( - int.from_bytes(self.sInfo.nColorMode.value, byteorder="big") - == self.ueye.IS_COLORMODE_MONOCHROME - ): - # for color camera models use RGB32 mode - m_nColorMode = self.ueye.IS_CM_MONO8 - nBitsPerPixel = self.ueye.INT(8) - bytes_per_pixel = int(nBitsPerPixel / 8) - print("IS_COLORMODE_MONOCHROME: ") - print("\tm_nColorMode: \t\t", m_nColorMode) - print("\tnBitsPerPixel: \t\t", nBitsPerPixel) - print("\tbytes_per_pixel: \t\t", bytes_per_pixel) - print() - - else: - # for monochrome camera models use Y8 mode - m_nColorMode = self.ueye.IS_CM_MONO8 - nBitsPerPixel = self.ueye.INT(8) - bytes_per_pixel = int(nBitsPerPixel / 8) - print("else") - - # Can be used to set the size and position of an "area of interest"(AOI) within an image - nRet = self.ueye.is_AOI( - self.hCam, ueye.IS_AOI_IMAGE_GET_AOI, self.rectAOI, self.ueye.sizeof(self.rectAOI) + def __init__( + self, + *, + name, + roi: tuple | None = None, + value=0, + dtype=None, + shape=None, + timestamp=None, + parent=None, + labels=None, + kind=Kind.hinted, + tolerance=None, + rtolerance=None, + metadata=None, + cl=None, + attr_name="", + ): + super().__init__( + name=name, + value=value, + dtype=dtype, + shape=shape, + timestamp=timestamp, + parent=parent, + labels=labels, + kind=kind, + tolerance=tolerance, + rtolerance=rtolerance, + metadata=metadata, + cl=cl, + attr_name=attr_name, ) - if nRet != self.ueye.IS_SUCCESS: - print("is_AOI ERROR") + self.roi = roi - self.width = self.rectAOI.s32Width - self.height = self.rectAOI.s32Height + def get(self, **kwargs): + image = self.parent.image_data.get().data + if not isinstance(image, np.ndarray): + return -1 # -1 if no valid image is available - # Prints out some information about the camera and the sensor - print("Camera model:\t\t", self.sInfo.strSensorName.decode("utf-8")) - print("Camera serial no.:\t", self.cInfo.SerNo.decode("utf-8")) - print("Maximum image width:\t", self.width) - print("Maximum image height:\t", self.height) - print() - - # --------------------------------------------------------------------------------------------------------------------------------------- - - # Allocates an image memory for an image having its dimensions defined by width and height and its color depth defined by nBitsPerPixel - nRet = self.ueye.is_AllocImageMem( - self.hCam, self.width, self.height, self.nBitsPerPixel, self.pcImageMemory, self.MemID - ) - if nRet != self.ueye.IS_SUCCESS: - print("is_AllocImageMem ERROR") + if self.roi is None: + roi = (0, 0, image.shape[1], image.shape[0]) else: - # Makes the specified image memory the active memory - nRet = self.ueye.is_SetImageMem(self.hCam, self.pcImageMemory, self.MemID) - if nRet != self.ueye.IS_SUCCESS: - print("is_SetImageMem ERROR") - else: - # Set the desired color mode - nRet = self.ueye.is_SetColorMode(self.hCam, self.m_nColorMode) - - # Activates the camera's live video mode (free run mode) - nRet = self.ueye.is_CaptureVideo(self.hCam, self.ueye.IS_DONT_WAIT) - if nRet != self.ueye.IS_SUCCESS: - print("is_CaptureVideo ERROR") - - # Enables the queue mode for existing image memory sequences - nRet = self.ueye.is_InquireImageMem( - self.hCam, - self.pcImageMemory, - self.MemID, - self.width, - self.height, - self.nBitsPerPixel, - self.pitch, - ) - if nRet != self.ueye.IS_SUCCESS: - print("is_InquireImageMem ERROR") - else: - print("Press q to leave the programm") - startmeasureframerate = True - Gain = False - - # Start live mode of camera immediately - self.parent.start_live_mode() - - def _start_data_thread(self): - self.thread_event = threading.Event() - self.data_thread = threading.Thread(target=self._receive_data_from_camera, daemon=True) - self.data_thread.start() - - def _receive_data_from_camera(self): - while not self.thread_event.is_set(): - - # In order to display the image in an OpenCV window we need to... - # ...extract the data of our image memory - array = ueye.get_data( - self.pcImageMemory, - self.width, - self.height, - self.nBitsPerPixel, - self.pitch, - copy=False, - ) - - # bytes_per_pixel = int(nBitsPerPixel / 8) - - # ...reshape it in an numpy array... - frame = np.reshape(array, (self.height.value, self.width.value, self.bytes_per_pixel)) - self.parent.image_data.put(frame) - self.parent._run_subs(sub_type=self.parent.SUB_MONITOR, value=frame) - - time.sleep(0.1) - - def on_trigger(self): - pass - # self.parent._run_subs(sub_type=self.parent.SUB_MONITOR, value=self.parent.image_data.get()) + roi = self.roi + if len(image.shape) > 2: + image = np.sum(image, axis=2) # Convert to grayscale if it's a color image + return np.sum(image[roi[1] : roi[1] + roi[3], roi[0] : roi[0] + roi[2]], (0, 1)) -class IDSCamera(PSIDetectorBase): - USER_ACCESS = ["start_live_mode", "stop_live_mode"] +class IDSCamera(PSIDeviceBase): + """ " + #--------------------------------------------------------------------------------------------------------------------------------------- - custom_prepare_cls = IDSCustomPrepare + #Variables + hCam = ueye.HIDS(202) #0: first available camera; 1-254: The camera with the specified camera ID + sInfo = ueye.SENSORINFO() + cInfo = ueye.CAMINFO() + pcImageMemory = ueye.c_mem_p() + MemID = ueye.int() + rectAOI = ueye.IS_RECT() + pitch = ueye.INT() + nBitsPerPixel = ueye.INT(24) #24: bits per pixel for color mode; take 8 bits per pixel for monochrome + channels = 3 #3: channels for color mode(RGB); take 1 channel for monochrome + m_nColorMode = ueye.INT(1) # Y8/RGB16/RGB24/REG32 (1 for our color cameras) + bytes_per_pixel = int(nBitsPerPixel / 8) - image_data = Cpt(SetableSignal, value=np.empty((100, 100)), kind=Kind.omitted) + ids_cam + ... + """ - SUB_MONITOR = "device_monitor_2d" - _default_sub = SUB_MONITOR + USER_ACCESS = ["start_live_mode", "stop_live_mode", "set_roi", "width", "height"] + + image_data = Cpt(PreviewSignal, ndim=2, kind=Kind.omitted) + # roi_bot_left = Cpt(ROISignal, roi=(400, 525, 118, 105), kind=Kind.normal) + # roi_bot_right = Cpt(ROISignal, roi=(518, 525, 118, 105), kind=Kind.normal) + # roi_top_left = Cpt(ROISignal, roi=(400, 630, 118, 105), kind=Kind.normal) + # roi_top_right = Cpt(ROISignal, roi=(518, 630, 118, 105), kind=Kind.normal) + # roi_signal = Cpt(ROISignal, kind=Kind.normal, doc="Region of Interest signal") def __init__( self, @@ -234,19 +103,221 @@ class IDSCamera(PSIDetectorBase): channels: int, m_n_colormode: int, kind=None, - parent=None, device_manager=None, **kwargs, ): + super().__init__( - prefix=prefix, name=name, kind=kind, parent=parent, device_manager=device_manager, **kwargs + prefix=prefix, name=name, kind=kind, device_manager=device_manager, **kwargs ) self.camera_ID = camera_ID self.bits_per_pixel = bits_per_pixel + self.bytes_per_pixel = None self.channels = channels - self.m_n_colormode = m_n_colormode - #TODO fix connected and wait_for_connection - self.custom_prepare.on_connection_established() + self._m_n_colormode_input = m_n_colormode + self.m_n_colormode = None + self.ueye = ueye + self.h_cam = None + self.s_info = None + self.data_thread = None + self.c_info = None + self.pc_image_memory = None + self.mem_id = None + self.rect_aoi = None + self.pitch = None + self.n_bits_per_pixel = None + self.width = None + self.height = None + self.thread_event = threading.Event() + self.data_thread = None + self._roi: tuple | None = None # x, y, width, height + + def set_roi(self, x: int, y: int, width: int, height: int): + self._roi = (x, y, width, height) + + def start_backend(self): + if self.ueye is None: + raise ImportError("The pyueye library is not installed.") + self.h_cam = self.ueye.HIDS( + self.camera_ID + ) # 0: first available camera; 1-254: The camera with the specified camera ID + self.s_info = self.ueye.SENSORINFO() + self.c_info = self.ueye.CAMINFO() + self.pc_image_memory = self.ueye.c_mem_p() + self.mem_id = self.ueye.int() + self.rect_aoi = self.ueye.IS_RECT() + self.pitch = self.ueye.INT() + self.n_bits_per_pixel = self.ueye.INT( + self.bits_per_pixel + ) # 24: bits per pixel for color mode; take 8 bits per pixel for monochrome + self.m_n_colormode = self.ueye.INT( + self._m_n_colormode_input + ) # Y8/RGB16/RGB24/REG32 (1 for our color cameras) + self.bytes_per_pixel = int(self.n_bits_per_pixel / 8) + + # Starts the driver and establishes the connection to the camera + ret = self.ueye.is_InitCamera(self.h_cam, None) + if ret != self.ueye.IS_SUCCESS: + print("is_InitCamera ERROR") + + # Reads out the data hard-coded in the non-volatile camera memory and writes it to the data structure that c_info points to + ret = self.ueye.is_GetCameraInfo(self.h_cam, self.c_info) + if ret != self.ueye.IS_SUCCESS: + print("is_GetCameraInfo ERROR") + + # You can query additional information about the sensor type used in the camera + ret = self.ueye.is_GetSensorInfo(self.h_cam, self.s_info) + if ret != self.ueye.IS_SUCCESS: + print("is_GetSensorInfo ERROR") + + ret = self.ueye.is_ResetToDefault(self.h_cam) + if ret != self.ueye.IS_SUCCESS: + print("is_ResetToDefault ERROR") + + # Set display mode to DIB + ret = self.ueye.is_SetDisplayMode(self.h_cam, self.ueye.IS_SET_DM_DIB) + + # Set the right color mode + if ( + int.from_bytes(self.s_info.nColorMode.value, byteorder="big") + == self.ueye.IS_COLORMODE_BAYER + ): + # setup the color depth to the current windows setting + self.ueye.is_GetColorDepth(self.h_cam, self.n_bits_per_pixel, self.m_n_colormode) + bytes_per_pixel = int(self.n_bits_per_pixel / 8) + print("IS_COLORMODE_BAYER: ") + print("\tm_n_colormode: \t\t", self.m_n_colormode) + print("\tn_bits_per_pixel: \t\t", self.n_bits_per_pixel) + print("\tbytes_per_pixel: \t\t", bytes_per_pixel) + print() + + elif ( + int.from_bytes(self.s_info.nColorMode.value, byteorder="big") + == self.ueye.IS_COLORMODE_CBYCRY + ): + # for color camera models use RGB32 mode + m_n_colormode = self.ueye.IS_CM_BGRA8_PACKED + n_bits_per_pixel = self.ueye.INT(32) + bytes_per_pixel = int(self.n_bits_per_pixel / 8) + print("IS_COLORMODE_CBYCRY: ") + print("\tm_n_colormode: \t\t", m_n_colormode) + print("\tn_bits_per_pixel: \t\t", n_bits_per_pixel) + print("\tbytes_per_pixel: \t\t", bytes_per_pixel) + print() + + elif ( + int.from_bytes(self.s_info.nColorMode.value, byteorder="big") + == self.ueye.IS_COLORMODE_MONOCHROME + ): + # for color camera models use RGB32 mode + m_n_colormode = self.ueye.IS_CM_MONO8 + n_bits_per_pixel = self.ueye.INT(8) + bytes_per_pixel = int(n_bits_per_pixel / 8) + print("IS_COLORMODE_MONOCHROME: ") + print("\tm_n_colormode: \t\t", m_n_colormode) + print("\tn_bits_per_pixel: \t\t", n_bits_per_pixel) + print("\tbytes_per_pixel: \t\t", bytes_per_pixel) + print() + + else: + # for monochrome camera models use Y8 mode + m_n_colormode = self.ueye.IS_CM_MONO8 + n_bits_per_pixel = self.ueye.INT(8) + bytes_per_pixel = int(n_bits_per_pixel / 8) + print("else") + + # Can be used to set the size and position of an "area of interest"(AOI) within an image + ret = self.ueye.is_AOI( + self.h_cam, + self.ueye.IS_AOI_IMAGE_GET_AOI, + self.rect_aoi, + self.ueye.sizeof(self.rect_aoi), + ) + if ret != self.ueye.IS_SUCCESS: + print("is_AOI ERROR") + + self.width = self.rect_aoi.s32Width + self.height = self.rect_aoi.s32Height + + # Prints out some information about the camera and the sensor + print("Camera model:\t\t", self.s_info.strSensorName.decode("utf-8")) + print("Camera serial no.:\t", self.c_info.SerNo.decode("utf-8")) + print("Maximum image width:\t", self.width) + print("Maximum image height:\t", self.height) + print() + + # --------------------------------------------------------------------------------------------------------------------------------------- + + # Allocates an image memory for an image having its dimensions defined by width and height and its color depth defined by n_bits_per_pixel + ret = self.ueye.is_AllocImageMem( + self.h_cam, + self.width, + self.height, + self.n_bits_per_pixel, + self.pc_image_memory, + self.mem_id, + ) + if ret != self.ueye.IS_SUCCESS: + print("is_AllocImageMem ERROR") + else: + # Makes the specified image memory the active memory + ret = self.ueye.is_SetImageMem(self.h_cam, self.pc_image_memory, self.mem_id) + if ret != self.ueye.IS_SUCCESS: + print("is_SetImageMem ERROR") + else: + # Set the desired color mode + ret = self.ueye.is_SetColorMode(self.h_cam, self.m_n_colormode) + + # Activates the camera's live video mode (free run mode) + ret = self.ueye.is_CaptureVideo(self.h_cam, self.ueye.IS_DONT_WAIT) + if ret != self.ueye.IS_SUCCESS: + print("is_CaptureVideo ERROR") + + # Enables the queue mode for existing image memory sequences + ret = self.ueye.is_InquireImageMem( + self.h_cam, + self.pc_image_memory, + self.mem_id, + self.width, + self.height, + self.n_bits_per_pixel, + self.pitch, + ) + if ret != self.ueye.IS_SUCCESS: + print("is_InquireImageMem ERROR") + else: + print("Press q to leave the programm") + # startmeasureframerate = True + # Gain = False + + # Start live mode of camera immediately + self.start_live_mode() + + def _start_data_thread(self): + self.data_thread = threading.Thread(target=self._receive_data_from_camera, daemon=True) + self.data_thread.start() + + def _receive_data_from_camera(self): + while not self.thread_event.is_set(): + if self.ueye is None: + print("pyueye library not available.") + return + # In order to display the image in an OpenCV window we need to... + # ...extract the data of our image memory + array = self.ueye.get_data( + self.pc_image_memory, + self.width, + self.height, + self.n_bits_per_pixel, + self.pitch, + copy=False, + ) + + # ...reshape it in an numpy array... + frame = np.reshape(array, (self.height.value, self.width.value, self.bytes_per_pixel)) + self.image_data.put(frame) + + time.sleep(0.1) def wait_for_connection(self, all_signals=False, timeout=10): if ueye is None: @@ -254,226 +325,73 @@ class IDSCamera(PSIDetectorBase): "The pyueye library is not installed or doesn't provide the necessary c libs" ) super().wait_for_connection(all_signals, timeout) - #self.custom_prepare.on_connection_established() - - def destroy(self): - """Extend Ophyds destroy function to kill the data thread""" - self.stop_live_mode() - super().destroy() def start_live_mode(self): - if self.custom_prepare.data_thread is not None: + if self.data_thread is not None: self.stop_live_mode() - self.custom_prepare._start_data_thread() + self._start_data_thread() def stop_live_mode(self): """Stopping the camera live mode.""" - if self.custom_prepare.thread_event is not None: - self.custom_prepare.thread_event.set() - if self.custom_prepare.data_thread is not None: - self.custom_prepare.data_thread.join() - self.custom_prepare.thread_event = None - self.custom_prepare.data_thread = None + self.thread_event.set() + if self.data_thread is not None: + self.data_thread.join() + self.thread_event.clear() + self.data_thread = None + + ######################################## + # Beamline Specific Implementations # + ######################################## + + def on_init(self) -> None: + """ + Called when the device is initialized. + + No signals are connected at this point. If you like to + set default values on signals, please use on_connected instead. + """ + + def on_connected(self) -> None: + """ + Called after the device is connected and its signals are connected. + Default values for signals should be set here. + """ + self.start_backend() + self.start_live_mode() + + def on_stage(self) -> DeviceStatus | StatusBase | None: + """ + Called while staging the device. + + Information about the upcoming scan can be accessed from the scan_info (self.scan_info.msg) object. + """ + + def on_unstage(self) -> DeviceStatus | StatusBase | None: + """Called while unstaging the device.""" + + def on_pre_scan(self) -> DeviceStatus | StatusBase | None: + """Called right before the scan starts on all devices automatically.""" + + def on_trigger(self) -> DeviceStatus | StatusBase | None: + """Called when the device is triggered.""" + + def on_complete(self) -> DeviceStatus | StatusBase | None: + """Called to inquire if a device has completed a scans.""" + + def on_kickoff(self) -> DeviceStatus | StatusBase | None: + """Called to kickoff a device for a fly scan. Has to be called explicitly.""" + + def on_stop(self) -> None: + """Called when the device is stopped.""" + + def on_destroy(self) -> None: + """Called when the device is destroyed. Cleanup resources here.""" + self.stop_live_mode() -"""from pyueye import ueye -import numpy as np -import cv2 -import sys -import time +if __name__ == "__main__": + # Example usage + camera = IDSCamera(name="camera", camera_ID=201, bits_per_pixel=24, channels=3, m_n_colormode=1) + camera.wait_for_connection() -#--------------------------------------------------------------------------------------------------------------------------------------- - -#Variables -hCam = ueye.HIDS(202) #0: first available camera; 1-254: The camera with the specified camera ID -sInfo = ueye.SENSORINFO() -cInfo = ueye.CAMINFO() -pcImageMemory = ueye.c_mem_p() -MemID = ueye.int() -rectAOI = ueye.IS_RECT() -pitch = ueye.INT() -nBitsPerPixel = ueye.INT(24) #24: bits per pixel for color mode; take 8 bits per pixel for monochrome -channels = 3 #3: channels for color mode(RGB); take 1 channel for monochrome -m_nColorMode = ueye.INT(1) # Y8/RGB16/RGB24/REG32 (1 for our color cameras) -bytes_per_pixel = int(nBitsPerPixel / 8) - -ids_cam - ... - deviceConfig: - camera_ID: 202 - bits_per_pixel: 24 - channels: 3 - m_n_colormode: 1 - -#--------------------------------------------------------------------------------------------------------------------------------------- -print("START") -print() - -# Starts the driver and establishes the connection to the camera -nRet = ueye.is_InitCamera(hCam, None) -if nRet != ueye.IS_SUCCESS: - print("is_InitCamera ERROR") - -# Reads out the data hard-coded in the non-volatile camera memory and writes it to the data structure that cInfo points to -nRet = ueye.is_GetCameraInfo(hCam, cInfo) -if nRet != ueye.IS_SUCCESS: - print("is_GetCameraInfo ERROR") - -# You can query additional information about the sensor type used in the camera -nRet = ueye.is_GetSensorInfo(hCam, sInfo) -if nRet != ueye.IS_SUCCESS: - print("is_GetSensorInfo ERROR") - -nRet = ueye.is_ResetToDefault( hCam) -if nRet != ueye.IS_SUCCESS: - print("is_ResetToDefault ERROR") - -# Set display mode to DIB -nRet = ueye.is_SetDisplayMode(hCam, ueye.IS_SET_DM_DIB) - - - -# Set the right color mode -if int.from_bytes(sInfo.nColorMode.value, byteorder='big') == ueye.IS_COLORMODE_BAYER: - # setup the color depth to the current windows setting - ueye.is_GetColorDepth(hCam, nBitsPerPixel, m_nColorMode) - bytes_per_pixel = int(nBitsPerPixel / 8) - print("IS_COLORMODE_BAYER: ", ) - print("\tm_nColorMode: \t\t", m_nColorMode) - print("\tnBitsPerPixel: \t\t", nBitsPerPixel) - print("\tbytes_per_pixel: \t\t", bytes_per_pixel) - print() - -elif int.from_bytes(sInfo.nColorMode.value, byteorder='big') == ueye.IS_COLORMODE_CBYCRY: - # for color camera models use RGB32 mode - m_nColorMode = ueye.IS_CM_BGRA8_PACKED - nBitsPerPixel = ueye.INT(32) - bytes_per_pixel = int(nBitsPerPixel / 8) - print("IS_COLORMODE_CBYCRY: ", ) - print("\tm_nColorMode: \t\t", m_nColorMode) - print("\tnBitsPerPixel: \t\t", nBitsPerPixel) - print("\tbytes_per_pixel: \t\t", bytes_per_pixel) - print() - -elif int.from_bytes(sInfo.nColorMode.value, byteorder='big') == ueye.IS_COLORMODE_MONOCHROME: - # for color camera models use RGB32 mode - m_nColorMode = ueye.IS_CM_MONO8 - nBitsPerPixel = ueye.INT(8) - bytes_per_pixel = int(nBitsPerPixel / 8) - print("IS_COLORMODE_MONOCHROME: ", ) - print("\tm_nColorMode: \t\t", m_nColorMode) - print("\tnBitsPerPixel: \t\t", nBitsPerPixel) - print("\tbytes_per_pixel: \t\t", bytes_per_pixel) - print() - -else: - # for monochrome camera models use Y8 mode - m_nColorMode = ueye.IS_CM_MONO8 - nBitsPerPixel = ueye.INT(8) - bytes_per_pixel = int(nBitsPerPixel / 8) - print("else") - -# Can be used to set the size and position of an "area of interest"(AOI) within an image -nRet = ueye.is_AOI(hCam, ueye.IS_AOI_IMAGE_GET_AOI, rectAOI, ueye.sizeof(rectAOI)) -if nRet != ueye.IS_SUCCESS: - print("is_AOI ERROR") - -width = rectAOI.s32Width -height = rectAOI.s32Height - -# Prints out some information about the camera and the sensor -print("Camera model:\t\t", sInfo.strSensorName.decode('utf-8')) -print("Camera serial no.:\t", cInfo.SerNo.decode('utf-8')) -print("Maximum image width:\t", width) -print("Maximum image height:\t", height) -print() - -#--------------------------------------------------------------------------------------------------------------------------------------- - -# Allocates an image memory for an image having its dimensions defined by width and height and its color depth defined by nBitsPerPixel -nRet = ueye.is_AllocImageMem(hCam, width, height, nBitsPerPixel, pcImageMemory, MemID) -if nRet != ueye.IS_SUCCESS: - print("is_AllocImageMem ERROR") -else: - # Makes the specified image memory the active memory - nRet = ueye.is_SetImageMem(hCam, pcImageMemory, MemID) - if nRet != ueye.IS_SUCCESS: - print("is_SetImageMem ERROR") - else: - # Set the desired color mode - nRet = ueye.is_SetColorMode(hCam, m_nColorMode) - - - -# Activates the camera's live video mode (free run mode) -nRet = ueye.is_CaptureVideo(hCam, ueye.IS_DONT_WAIT) -if nRet != ueye.IS_SUCCESS: - print("is_CaptureVideo ERROR") - -# Enables the queue mode for existing image memory sequences -nRet = ueye.is_InquireImageMem(hCam, pcImageMemory, MemID, width, height, nBitsPerPixel, pitch) -if nRet != ueye.IS_SUCCESS: - print("is_InquireImageMem ERROR") -else: - print("Press q to leave the programm") -startmeasureframerate=True -Gain = False -#--------------------------------------------------------------------------------------------------------------------------------------- - -# Continuous image display -while(nRet == ueye.IS_SUCCESS): - - # In order to display the image in an OpenCV window we need to... - # ...extract the data of our image memory - array = ueye.get_data(pcImageMemory, width, height, nBitsPerPixel, pitch, copy=False) - - # bytes_per_pixel = int(nBitsPerPixel / 8) - - # ...reshape it in an numpy array... - frame = np.reshape(array,(height.value, width.value, bytes_per_pixel)) - - # ...resize the image by a half - frame = cv2.resize(frame,(0,0),fx=0.5, fy=0.5) - -#--------------------------------------------------------------------------------------------------------------------------------------- - #Include image data processing here - -#--------------------------------------------------------------------------------------------------------------------------------------- - - #...and finally display it - cv2.imshow("SimpleLive_Python_uEye_OpenCV", frame) - if startmeasureframerate: - starttime = time.time() - startmeasureframerate=False - framenumber=0 - if time.time() > starttime+5: - print(f"Caught {framenumber/5} frames per second") - startmeasureframerate=True - Gain = ~Gain - if Gain: - nRet = ueye.is_SetGainBoost(hCam, 1) - else: - nRet = ueye.is_SetGainBoost(hCam, 0) - print(f"Gain setting status {nRet}") - #...and finally display it - cv2.imshow("SimpleLive_Python_uEye_OpenCV", frame) - framenumber+=1 - time.sleep(0.1) - - # Press q if you want to end the loop - if (cv2.waitKey(1) & 0xFF) == ord('q'): - break -#--------------------------------------------------------------------------------------------------------------------------------------- - -# Releases an image memory that was allocated using is_AllocImageMem() and removes it from the driver management -ueye.is_FreeImageMem(hCam, pcImageMemory, MemID) - -# Disables the hCam camera handle and releases the data structures and memory areas taken up by the uEye camera -ueye.is_ExitCamera(hCam) - -# Destroys the OpenCv windows -cv2.destroyAllWindows() - -print() -print("END") -""" + camera.on_destroy() diff --git a/csaxs_bec/devices/ids_cameras/ids_camera_new.py b/csaxs_bec/devices/ids_cameras/ids_camera_new.py new file mode 100644 index 0000000..0fe2e5e --- /dev/null +++ b/csaxs_bec/devices/ids_cameras/ids_camera_new.py @@ -0,0 +1,285 @@ +"""IDS Camera class for cSAXS IDS cameras.""" + +from __future__ import annotations + +import threading +import time +from typing import TYPE_CHECKING, Literal, Tuple, TypedDict + +import numpy as np +from bec_lib import messages +from bec_lib.logger import bec_logger +from ophyd import Component as Cpt +from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase +from ophyd_devices.utils.bec_signals import AsyncSignal, PreviewSignal +from pydantic import BaseModel, field_validator + +from csaxs_bec.devices.ids_cameras.base_integration.camera import Camera + +if TYPE_CHECKING: + from bec_lib.devicemanager import ScanInfo + from pydantic import ValidationInfo + + +logger = bec_logger.logger + + +AnyDimShape = Tuple[int, ...] + + +class ROISpec(TypedDict): + """ + Typed dictionary representing the specification for a Region of Interest (ROI). + """ + + x: int + y: int + width: int + height: int + img_shape: AnyDimShape + mask: np.ndarray + + +def validate_roi(roi: ROISpec) -> ROISpec: + """ + Validate the ROI specification to ensure it matches the image shape. + + Args: + roi (ROISpec): The ROI specification to validate. + + Returns: + ROISpec: The validated ROI specification. + """ + if roi["mask"].shape != roi["img_shape"]: + raise ValueError( + f"Mask shape {roi['mask'].shape} does not match image shape {roi['img_shape']}." + ) + return roi + + +class IDSCamera(PSIDeviceBase): + """IDS Camera class for cSAXS. + + This class inherits from PSIDeviceBase and implements the necessary methods + to interact with the IDS camera using the pyueye library. + """ + + image = Cpt(PreviewSignal, name="image", ndim=2, doc="Preview signal for the camera.") + roi_signal = Cpt( + AsyncSignal, + name="roi_signal", + ndim=0, + max_size=1000, + doc="Signal for the region of interest (ROI).", + async_update={"type": "add", "max_shape": [None]}, + ) + + USER_ACCESS = ["live_mode", "roi", "get_last_image"] + + def __init__( + self, + *, + name: str, + camera_id: int, + prefix: str = "", + scan_info: ScanInfo | None = None, + m_n_colormode: Literal[0, 1, 2, 3] = 1, + bits_per_pixel: Literal[8, 24] = 24, + live_mode: bool = False, + roi: tuple[int, int, int, int] | None = None, + **kwargs, + ): + """Initialize the IDS Camera. + + Args: + name (str): Name of the device. + camera_id (int): The ID of the camera device. + prefix (str): Prefix for the device. + scan_info (ScanInfo | None): Scan information for the device. + m_n_colormode (Literal[0, 1, 2, 3]): Color mode for the camera. + bits_per_pixel (Literal[8, 24]): Number of bits per pixel for the camera. + live_mode (bool): Whether to enable live mode for the camera. + """ + super().__init__(name=name, prefix=prefix, scan_info=scan_info, **kwargs) + self._live_mode_thread: threading.Thread | None = None + self._stop_live_mode_event: threading.Event = threading.Event() + self.cam = Camera( + camera_id=camera_id, + m_n_colormode=m_n_colormode, + bits_per_pixel=bits_per_pixel, + connect=False, + ) + self._live_mode = False + self._inputs = {"roi": roi if roi else (0, 0, 1, 1), "live_mode": live_mode} + self._roi: ROISpec = validate_roi( + ROISpec( + { + "x": 0, + "y": 0, + "width": 1, + "height": 1, + "img_shape": (1, 1), + "mask": np.zeros((1, 1), dtype=np.uint8), + } + ) + ) + + ############## Live Mode Methods ############## + + @property + def roi(self) -> ROISpec: + """Return the current region of interest (ROI) for the camera.""" + return self._roi + + @roi.setter + def roi(self, value: ROISpec | tuple[int, int, int, int] | list[int, int, int, int]): + """ + Set the region of interest (ROI) for the camera. + + Args: + value (ROI | tuple[int, int, int, int] | list[int, int, int, int]): Either an ROI object, or a tuple or list with x, y, width, and height. + """ + if isinstance(value, (tuple, list)) and len(value) == 4: + x = value[0] + y = value[1] + width = value[2] + height = value[3] + if x + width > self.cam.cam.width.value or y + height > self.cam.cam.height.value: + raise ValueError("ROI exceeds camera dimensions.") + img_shape = (self.cam.cam.height.value, self.cam.cam.width.value) + mask = np.zeros(img_shape, dtype=np.uint8) + mask[y : y + height, x : x + width] = 1 + value = validate_roi( + ROISpec( + { + "x": x, + "y": y, + "width": width, + "height": height, + "img_shape": img_shape, + "mask": mask, + } + ) + ) + if not isinstance(value, dict) or not all( + key in value for key in ["x", "y", "width", "height", "img_shape", "mask"] + ): + raise TypeError(f"ROI must be an instance of ROISpec {value}.") + self._roi = value + + @property + def live_mode(self) -> bool: + """Return whether the camera is in live mode.""" + return self._live_mode + + @live_mode.setter + def live_mode(self, value: bool): + """Set the live mode for the camera.""" + if value != self._live_mode: + if self.cam._connected is False: # $ pylint: disable=protected-access + self.cam.on_connect() + self._live_mode = value + if value: + self._start_live() + else: + self._stop_live() + + def _start_live(self): + """Start the live mode for the camera.""" + if self._live_mode_thread is not None: + logger.info("Live mode thread is already running.") + return + self._stop_live_mode_event.clear() + self._live_mode_thread = threading.Thread( + target=self._live_mode_loop, args=(self._stop_live_mode_event,) + ) + self._live_mode_thread.start() + + def _stop_live(self): + """Stop the live mode for the camera.""" + if self._live_mode_thread is None: + logger.info("Live mode thread is not running.") + return + self._stop_live_mode_event.set() + self._live_mode_thread.join(timeout=5) + if self._live_mode_thread.is_alive(): + logger.warning("Live mode thread did not stop gracefully.") + else: + self._live_mode_thread = None + logger.info("Live mode stopped.") + + def _live_mode_loop(self, stop_event: threading.Event): + """Loop to capture images in live mode.""" + while not stop_event.is_set(): + try: + self.process_data(self.cam.get_image_data()) + except Exception as e: + logger.error(f"Error in live mode loop: {e}") + break + stop_event.wait(0.2) # 5 Hz + + def process_data(self, image: np.ndarray | None): + """Process the image data before sending it to the preview signal.""" + if image is None: + return + self.image.put(image) + + def get_last_image(self) -> np.ndarray: + image = self.image.get() + if image: + return image.data + + ############## User Interface Methods ############## + + def on_connected(self): + """Connect to the camera.""" + self.cam.on_connect() + self.live_mode = self._inputs.get("live_mode", None) + roi = self._inputs.get("roi", None) + if roi is None or not isinstance(roi, (tuple, list)) or not len(roi) == 4: + # If ROI is not set, use the full camera resolution + roi = (0, 0, self.cam.cam.width.value, self.cam.cam.height.value) + self.roi = roi + + def on_destroy(self): + """Clean up resources when the device is destroyed.""" + self.cam.on_disconnect() + super().on_destroy() + + def on_trigger(self): + """Handle the trigger event.""" + if not self.live_mode: + return + image = self.image.get() + if image is not None: + image: messages.DevicePreviewMessage + if self.roi["img_shape"][0:2] != image.data.shape[0:2]: + logger.info( + f"ROI shape does not match image shape, skipping ROI application for device {self.name}." + ) + return + + if len(image.data.shape) == 3: + # If the image has multiple channels, apply the mask to each channel + data = ( + image.data * self.roi["mask"][:, :, np.newaxis] + ) # Apply mask to the image data + n_channels = 3 + else: + data = image.data * self.roi["mask"] + n_channels = 1 + self.roi_signal.put( + { + self.roi_signal.name: { + "value": np.sum(data) + / (np.sum(self.roi["mask"]) * n_channels), # TODO can be optimized + "timestamp": time.time(), + } + } + ) + + +if __name__ == "__main__": + # Example usage of the IDSCamera class + camera = IDSCamera(name="TestCamera", camera_id=201, live_mode=False) + print(f"Camera {camera.name} initialized with ID {camera.cam.camera_id}.") diff --git a/csaxs_bec/devices/ids_cameras/ids_ueye_signals.py b/csaxs_bec/devices/ids_cameras/ids_ueye_signals.py deleted file mode 100644 index 9c1c215..0000000 --- a/csaxs_bec/devices/ids_cameras/ids_ueye_signals.py +++ /dev/null @@ -1,83 +0,0 @@ -import time - -import numpy as np -from bec_lib import bec_logger -from ophyd import Kind, Signal -from ophyd.utils import ReadOnlyError - -from ophyd_devices.utils.bec_device_base import BECDeviceBase - -logger = bec_logger.logger - -# Readout precision for Setable/ReadOnlySignal signals -PRECISION = 3 - - -class ReadOnlySignal(Signal): - """Setable signal for simulated devices. - - The signal will store the value in sim_state of the SimulatedData class of the parent device. - It will also return the value from sim_state when get is called. Compared to the ReadOnlySignal, - this signal can be written to. - The setable signal inherits from the Signal class of ophyd, thus the class attribute needs to be - initiated as a Component (class from ophyd). - - >>> signal = SetableSignal(name="signal", parent=parent, value=0) - - Parameters - ---------- - - name (string) : Name of the signal - parent (object) : Parent object of the signal, default none. - value (any) : Initial value of the signal, default 0. - kind (int) : Kind of the signal, default Kind.normal. - precision (float) : Precision of the signal, default PRECISION. - """ - - def __init__( - self, - name: str, - *args, - fcn: callable, - kind: int = Kind.normal, - precision: float = PRECISION, - **kwargs, - ): - super().__init__(*args, name=name, value=value, kind=kind, **kwargs) - self._metadata.update(connected=True, write_access=False) - self._value = None - self.precision = precision - self.fcn = fcn - - # pylint: disable=arguments-differ - def get(self): - """Get the current position of the simulated device. - - Core function for signal. - """ - self._value = self.fcn() - return self._value - - # pylint: disable=arguments-differ - def put(self, value): - """Put the value to the simulated device. - - Core function for signal. - """ - self._update_sim_state(value) - self._value = value - - def describe(self): - """Describe the readback signal. - - Core function for signal. - """ - res = super().describe() - if self.precision is not None: - res[self.name]["precision"] = self.precision - return res - - @property - def timestamp(self): - """Timestamp of the readback value""" - return self._get_timestamp()