fix(camera): add throttled update to cameras

This commit is contained in:
gac-x01da
2025-05-01 11:04:31 +02:00
committed by appel_c
parent c074240444
commit c164414631
2 changed files with 42 additions and 5 deletions
+25 -4
View File
@@ -1,12 +1,21 @@
from __future__ import annotations
import time
from typing import TYPE_CHECKING
# from ophyd_devices.sim.sim_signals import SetableSignal
# from ophyd_devices.utils.psi_component import PSIComponent, SignalType
import numpy as np
from ophyd import ADBase
from ophyd import ADComponent as ADCpt
from ophyd import Component as Cpt
from ophyd import Device
from ophyd_devices.devices.areadetector.cam import ProsilicaDetectorCam
from ophyd import Kind
from ophyd_devices.devices.areadetector.cam import AravisDetectorCam
from ophyd_devices.devices.areadetector.plugins import ImagePlugin_V35
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
if TYPE_CHECKING:
from bec_lib.devicemanager import ScanInfo
class BaslerDetectorCam(ProsilicaDetectorCam):
@@ -20,11 +29,23 @@ class BaslerCamBase(ADBase):
class BaslerCam(PSIDeviceBase, BaslerCamBase):
# preview_2d = PSIComponent(SetableSignal, signal_type=SignalType.PREVIEW, ndim=2, kind=Kind.omitted)
def __init__(self, *, name: str, prefix: str = "", scan_info: ScanInfo | None = None, **kwargs):
super().__init__(name=name, prefix=prefix, scan_info=scan_info, **kwargs)
self.last_emit = time.time()
self.update_frequency = 5 # Hz
def emit_to_bec(self, *args, obj=None, old_value=None, value=None, **kwargs):
if (time.time() - self.last_emit) < (1 / self.update_frequency):
return # Check logic
width = self.image1.array_size.width.get()
height = self.image1.array_size.height.get()
data = np.reshape(value, (height, width))
data = np.rot90(np.reshape(value, (height, width)), k=-1, axes=(0, 1))
# self.preview_2d.put(data)
self._run_subs(sub_type=self.SUB_DEVICE_MONITOR_2D, value=data)
self.last_emit = time.time()
def on_connected(self):
self.image1.array_data.subscribe(self.emit_to_bec, run=False)
+17 -1
View File
@@ -1,3 +1,8 @@
from __future__ import annotations
import time
from typing import TYPE_CHECKING
import numpy as np
from ophyd import ADBase
from ophyd import ADComponent as ADCpt
@@ -7,6 +12,9 @@ from ophyd_devices.devices.areadetector.cam import ProsilicaDetectorCam
from ophyd_devices.devices.areadetector.plugins import ImagePlugin_V35
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
if TYPE_CHECKING: # pragma: no cover
from bec_lib.devicemanager import ScanInfo
class ProsilicaCamBase(ADBase):
cam1 = ADCpt(ProsilicaDetectorCam, "cam1:")
@@ -15,11 +23,19 @@ class ProsilicaCamBase(ADBase):
class ProsilicaCam(PSIDeviceBase, ProsilicaCamBase):
def __init__(self, *, name: str, prefix: str = "", scan_info: ScanInfo | None = None, **kwargs):
super().__init__(name=name, prefix=prefix, scan_info=scan_info, **kwargs)
self.last_emit = time.time()
self.update_frequency = 5 # Hz
def emit_to_bec(self, *args, obj=None, old_value=None, value=None, **kwargs):
if (time.time() - self.last_emit) < (1 / self.update_frequency):
return # Check logic
width = self.image1.array_size.width.get()
height = self.image1.array_size.height.get()
data = np.reshape(value, (height, width))
data = np.rot90(np.reshape(value, (height, width)), k=-1, axes=(0, 1))
self._run_subs(sub_type=self.SUB_DEVICE_MONITOR_2D, value=data)
self.last_emit = time.time()
def on_connected(self):
self.image1.array_data.subscribe(self.emit_to_bec, run=False)