diff --git a/debye_bec/devices/cameras/basler_cam.py b/debye_bec/devices/cameras/basler_cam.py index 746ee7e..c53fb29 100644 --- a/debye_bec/devices/cameras/basler_cam.py +++ b/debye_bec/devices/cameras/basler_cam.py @@ -1,43 +1,38 @@ +"""Basler camera class for Debye BEC.""" + from __future__ import annotations -import time from typing import TYPE_CHECKING -import numpy as np from ophyd import ADBase from ophyd import ADComponent as ADCpt from ophyd_devices.devices.areadetector.cam import AravisDetectorCam from ophyd_devices.devices.areadetector.plugins import ImagePlugin_V35 -from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase -if TYPE_CHECKING: +from debye_bec.devices.cameras.debye_base_cam import DebyeBaseCamera + +if TYPE_CHECKING: # pragma: no cover from bec_lib.devicemanager import ScanInfo class BaslerCamBase(ADBase): + """BaslerCam Base class.""" + cam1 = ADCpt(AravisDetectorCam, "cam1:") image1 = ADCpt(ImagePlugin_V35, "image1:") -class BaslerCam(PSIDeviceBase, BaslerCamBase): - - # preview_2d = PSIComponent(SetableSignal, signal_type=SignalType.PREVIEW, ndim=2, kind=Kind.omitted) +class BaslerCam(DebyeBaseCamera, BaslerCamBase): + """Basler camera class at Debye. IOC prefix: X01DA-ES-XRAYEYE:""" def __init__(self, *, name: str, prefix: str = "", scan_info: ScanInfo | None = None, **kwargs): + """ + Initialize the Prosilica camera class. + + Args: + name (str): Name of the camera. + prefix (str): IOC prefix. + scan_info (ScanInfo): The scan info to use. + """ super().__init__(name=name, prefix=prefix, scan_info=scan_info, **kwargs) - self.last_emit = time.time() - self.update_frequency = 5 # Hz - - def emit_to_bec(self, *args, obj=None, old_value=None, value=None, **kwargs): - if (time.time() - self.last_emit) < (1 / self.update_frequency): - return # Check logic - width = self.image1.array_size.width.get() - height = self.image1.array_size.height.get() - data = np.rot90(np.reshape(value, (height, width)), k=-1, axes=(0, 1)) - - # self.preview_2d.put(data) - self._run_subs(sub_type=self.SUB_DEVICE_MONITOR_2D, value=data) - self.last_emit = time.time() - - def on_connected(self): - self.image1.array_data.subscribe(self.emit_to_bec, run=False) + self._n_rot90 = -1 # Rotate the image by -90 degrees diff --git a/debye_bec/devices/cameras/prosilica_cam.py b/debye_bec/devices/cameras/prosilica_cam.py index 472ec00..a8cab07 100644 --- a/debye_bec/devices/cameras/prosilica_cam.py +++ b/debye_bec/devices/cameras/prosilica_cam.py @@ -1,41 +1,41 @@ +"""Prosilica camera class for integration of beam_monitor 1/2 cameras.""" + from __future__ import annotations -import time from typing import TYPE_CHECKING -import numpy as np from ophyd import ADBase from ophyd import ADComponent as ADCpt -from ophyd import Component as Cpt -from ophyd import Device from ophyd_devices.devices.areadetector.cam import ProsilicaDetectorCam from ophyd_devices.devices.areadetector.plugins import ImagePlugin_V35 -from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase + +from debye_bec.devices.cameras.debye_base_cam import DebyeBaseCamera if TYPE_CHECKING: # pragma: no cover from bec_lib.devicemanager import ScanInfo class ProsilicaCamBase(ADBase): + """Base class for Prosilica cameras.""" + cam1 = ADCpt(ProsilicaDetectorCam, "cam1:") image1 = ADCpt(ImagePlugin_V35, "image1:") -class ProsilicaCam(PSIDeviceBase, ProsilicaCamBase): +class ProsilicaCam(DebyeBaseCamera, ProsilicaCamBase): + """ + Prosilica camera class, for integration of beam_monitor 1/2 cameras. + Prefixes are: X01DA-OP-GIGE02: and X01DA-OP-GIGE01: + """ def __init__(self, *, name: str, prefix: str = "", scan_info: ScanInfo | None = None, **kwargs): + """ + Initialize the Prosilica camera class. + + Args: + name (str): Name of the camera. + prefix (str): IOC prefix. + scan_info (ScanInfo): The scan info to use. + """ super().__init__(name=name, prefix=prefix, scan_info=scan_info, **kwargs) - self.last_emit = time.time() - self.update_frequency = 5 # Hz - - def emit_to_bec(self, *args, obj=None, old_value=None, value=None, **kwargs): - if (time.time() - self.last_emit) < (1 / self.update_frequency): - return # Check logic - width = self.image1.array_size.width.get() - height = self.image1.array_size.height.get() - data = np.rot90(np.reshape(value, (height, width)), k=-1, axes=(0, 1)) - self._run_subs(sub_type=self.SUB_DEVICE_MONITOR_2D, value=data) - self.last_emit = time.time() - - def on_connected(self): - self.image1.array_data.subscribe(self.emit_to_bec, run=False) + self._n_rot90 = -1 # Rotate the image by -90 degrees