diff --git a/csaxs_bec/devices/epics/allied_vision_camera.py b/csaxs_bec/devices/epics/allied_vision_camera.py new file mode 100644 index 0000000..bcba5e0 --- /dev/null +++ b/csaxs_bec/devices/epics/allied_vision_camera.py @@ -0,0 +1,129 @@ +"""Module for the EPICS integration of the AlliedVision Camera via Vimba SDK.""" + +import threading +import traceback +from enum import IntEnum + +import numpy as np +from bec_lib.logger import bec_logger +from ophyd import Component as Cpt +from ophyd.areadetector import ADComponent as ADCpt +from ophyd.areadetector import DetectorBase +from ophyd_devices import PreviewSignal +from ophyd_devices.devices.areadetector.cam import VimbaDetectorCam +from ophyd_devices.devices.areadetector.plugins import ImagePlugin_V35 as ImagePlugin +from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase +from typeguard import typechecked + +logger = bec_logger.logger + + +class ACQUIRE_MODES(IntEnum): + """Acquiring enums for Allied Vision Camera""" + + ACQUIRING = 1 + DONE = 0 + + +class AlliedVisionCamera(PSIDeviceBase, DetectorBase): + """ + Epics Area Detector interface for the Allied Vision Alvium G1-507m camera via Vimba SDK. + The IOC runs with under the prefix: 'X12SA-GIGECAM-AV1:'. + """ + + USER_ACCESS = ["start_live_mode", "stop_live_mode"] + + cam = ADCpt(VimbaDetectorCam, "cam1:") + image = ADCpt(ImagePlugin, "image1:") + + preview = Cpt( + PreviewSignal, + name="preview", + ndim=2, + num_rotation_90=0, + doc="Preview signal of the AlliedVision camera.", + ) + + def __init__( + self, + *, + name: str, + prefix: str, + poll_rate: int = 5, + scan_info=None, + device_manager=None, + **kwargs, + ): + super().__init__( + name=name, prefix=prefix, scan_info=scan_info, device_manager=device_manager, **kwargs + ) + self._poll_thread = threading.Thread( + target=self._poll_array_data, daemon=True, name=f"{self.name}_poll_thread" + ) + self._poll_thread_kill_event = threading.Event() + self._poll_start_event = threading.Event() + if poll_rate > 10: + logger.warning(f"Poll rate too high for Camera {self.name}, setting to 10 Hz max.") + poll_rate = 10 + self._poll_rate = poll_rate + self._unique_array_id = 0 + self._pv_timeout = 2.0 + self.r_lock = threading.RLock() + self.image: ImagePlugin + + def start_live_mode(self) -> None: + """Start live mode.""" + if not self._poll_start_event.is_set(): + self._poll_start_event.set() + self.cam.acquire.put(ACQUIRE_MODES.ACQUIRING.value) # Start acquisition + else: + logger.info(f"Live mode already started for {self.name}.") + + def stop_live_mode(self) -> None: + """Stop live mode.""" + if self._poll_start_event.is_set(): + self._poll_start_event.clear() + self.cam.acquire.put(ACQUIRE_MODES.DONE.value) # Stop acquisition + else: + logger.info(f"Live mode already stopped for {self.name}.") + + def on_connected(self): + """Reset the unique array ID on connection.""" + self.cam.array_counter.set(0).wait(timeout=self._pv_timeout) + self.cam.array_callbacks.set(1).wait(timeout=self._pv_timeout) + self._poll_thread.start() + + def _poll_array_data(self): + """Poll the array data for preview updates.""" + while not self._poll_thread_kill_event.wait(1 / self._poll_rate): + while self._poll_start_event.wait(): + try: + # First check if there is a new image + if self.image.unique_id.get() != self._unique_array_id: + self._unique_array_id = self.image.unique_id.get() + else: + continue # No new image, skip update + # Get new image data + value = self.image.array_data.get() + if value is None: + logger.info(f"No image data available for preview of {self.name}") + continue + + array_size = self.image.array_size.get() + if array_size[0] == 0: # 2D image, not color image + array_size = array_size[1:] + # Geometry correction for the image + data = np.reshape(value, array_size) + self.preview.put(data) + except Exception: # pylint: disable=broad-except + content = traceback.format_exc() + logger.error( + f"Error while polling array data for preview of {self.name}: {content}" + ) + + def on_destroy(self): + """Stop the polling thread on destruction.""" + self._poll_thread_kill_event.set() + self._poll_start_event.set() + if self._poll_thread.is_alive(): + self._poll_thread.join(timeout=2)