refactor: refactor basler and prosilica cameras
This commit is contained in:
@@ -1,43 +1,38 @@
|
||||
"""Basler camera class for Debye BEC."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import time
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
import numpy as np
|
||||
from ophyd import ADBase
|
||||
from ophyd import ADComponent as ADCpt
|
||||
from ophyd_devices.devices.areadetector.cam import AravisDetectorCam
|
||||
from ophyd_devices.devices.areadetector.plugins import ImagePlugin_V35
|
||||
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from debye_bec.devices.cameras.debye_base_cam import DebyeBaseCamera
|
||||
|
||||
if TYPE_CHECKING: # pragma: no cover
|
||||
from bec_lib.devicemanager import ScanInfo
|
||||
|
||||
|
||||
class BaslerCamBase(ADBase):
|
||||
"""BaslerCam Base class."""
|
||||
|
||||
cam1 = ADCpt(AravisDetectorCam, "cam1:")
|
||||
image1 = ADCpt(ImagePlugin_V35, "image1:")
|
||||
|
||||
|
||||
class BaslerCam(PSIDeviceBase, BaslerCamBase):
|
||||
|
||||
# preview_2d = PSIComponent(SetableSignal, signal_type=SignalType.PREVIEW, ndim=2, kind=Kind.omitted)
|
||||
class BaslerCam(DebyeBaseCamera, BaslerCamBase):
|
||||
"""Basler camera class at Debye. IOC prefix: X01DA-ES-XRAYEYE:"""
|
||||
|
||||
def __init__(self, *, name: str, prefix: str = "", scan_info: ScanInfo | None = None, **kwargs):
|
||||
"""
|
||||
Initialize the Prosilica camera class.
|
||||
|
||||
Args:
|
||||
name (str): Name of the camera.
|
||||
prefix (str): IOC prefix.
|
||||
scan_info (ScanInfo): The scan info to use.
|
||||
"""
|
||||
super().__init__(name=name, prefix=prefix, scan_info=scan_info, **kwargs)
|
||||
self.last_emit = time.time()
|
||||
self.update_frequency = 5 # Hz
|
||||
|
||||
def emit_to_bec(self, *args, obj=None, old_value=None, value=None, **kwargs):
|
||||
if (time.time() - self.last_emit) < (1 / self.update_frequency):
|
||||
return # Check logic
|
||||
width = self.image1.array_size.width.get()
|
||||
height = self.image1.array_size.height.get()
|
||||
data = np.rot90(np.reshape(value, (height, width)), k=-1, axes=(0, 1))
|
||||
|
||||
# self.preview_2d.put(data)
|
||||
self._run_subs(sub_type=self.SUB_DEVICE_MONITOR_2D, value=data)
|
||||
self.last_emit = time.time()
|
||||
|
||||
def on_connected(self):
|
||||
self.image1.array_data.subscribe(self.emit_to_bec, run=False)
|
||||
self._n_rot90 = -1 # Rotate the image by -90 degrees
|
||||
|
||||
@@ -1,41 +1,41 @@
|
||||
"""Prosilica camera class for integration of beam_monitor 1/2 cameras."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import time
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
import numpy as np
|
||||
from ophyd import ADBase
|
||||
from ophyd import ADComponent as ADCpt
|
||||
from ophyd import Component as Cpt
|
||||
from ophyd import Device
|
||||
from ophyd_devices.devices.areadetector.cam import ProsilicaDetectorCam
|
||||
from ophyd_devices.devices.areadetector.plugins import ImagePlugin_V35
|
||||
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
|
||||
|
||||
from debye_bec.devices.cameras.debye_base_cam import DebyeBaseCamera
|
||||
|
||||
if TYPE_CHECKING: # pragma: no cover
|
||||
from bec_lib.devicemanager import ScanInfo
|
||||
|
||||
|
||||
class ProsilicaCamBase(ADBase):
|
||||
"""Base class for Prosilica cameras."""
|
||||
|
||||
cam1 = ADCpt(ProsilicaDetectorCam, "cam1:")
|
||||
image1 = ADCpt(ImagePlugin_V35, "image1:")
|
||||
|
||||
|
||||
class ProsilicaCam(PSIDeviceBase, ProsilicaCamBase):
|
||||
class ProsilicaCam(DebyeBaseCamera, ProsilicaCamBase):
|
||||
"""
|
||||
Prosilica camera class, for integration of beam_monitor 1/2 cameras.
|
||||
Prefixes are: X01DA-OP-GIGE02: and X01DA-OP-GIGE01:
|
||||
"""
|
||||
|
||||
def __init__(self, *, name: str, prefix: str = "", scan_info: ScanInfo | None = None, **kwargs):
|
||||
"""
|
||||
Initialize the Prosilica camera class.
|
||||
|
||||
Args:
|
||||
name (str): Name of the camera.
|
||||
prefix (str): IOC prefix.
|
||||
scan_info (ScanInfo): The scan info to use.
|
||||
"""
|
||||
super().__init__(name=name, prefix=prefix, scan_info=scan_info, **kwargs)
|
||||
self.last_emit = time.time()
|
||||
self.update_frequency = 5 # Hz
|
||||
|
||||
def emit_to_bec(self, *args, obj=None, old_value=None, value=None, **kwargs):
|
||||
if (time.time() - self.last_emit) < (1 / self.update_frequency):
|
||||
return # Check logic
|
||||
width = self.image1.array_size.width.get()
|
||||
height = self.image1.array_size.height.get()
|
||||
data = np.rot90(np.reshape(value, (height, width)), k=-1, axes=(0, 1))
|
||||
self._run_subs(sub_type=self.SUB_DEVICE_MONITOR_2D, value=data)
|
||||
self.last_emit = time.time()
|
||||
|
||||
def on_connected(self):
|
||||
self.image1.array_data.subscribe(self.emit_to_bec, run=False)
|
||||
self._n_rot90 = -1 # Rotate the image by -90 degrees
|
||||
|
||||
Reference in New Issue
Block a user