fix: debug with devices, throttle live_view to 1Hz

This commit is contained in:
gac-x01da
2025-05-08 18:25:35 +02:00
committed by appel_c
parent ca2cf40d6a
commit 10b0608d31

View File

@@ -6,12 +6,18 @@ import threading
from typing import TYPE_CHECKING
import numpy as np
import time
from bec_lib.logger import bec_logger
from ophyd import DeviceStatus, StatusBase
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
from typeguard import typechecked
if TYPE_CHECKING: # pragma: no cover
from bec_lib.devicemanager import ScanInfo
from ophyd_devices.devices.areadetector.plugins import ImagePlugin_V35
logger = bec_logger.logger
class DebyeBaseCamera(PSIDeviceBase):
@@ -21,7 +27,8 @@ class DebyeBaseCamera(PSIDeviceBase):
def __init__(self, *, name: str, prefix: str = "", scan_info: ScanInfo | None = None, **kwargs):
super().__init__(name=name, prefix=prefix, scan_info=scan_info, **kwargs)
self._update_frequency = 5 # Hz
self.image1 : "ImagePlugin_V35"
self._update_frequency = 1 # Hz
self._live_mode = False
self._live_mode_event = None
self._task_status = None
@@ -30,6 +37,7 @@ class DebyeBaseCamera(PSIDeviceBase):
@property
def live_mode(self) -> bool:
"""Live mode status."""
return self._live_mode
@typechecked
@live_mode.setter
@@ -53,6 +61,9 @@ class DebyeBaseCamera(PSIDeviceBase):
if self._live_mode_event is not None: # Kill task if it exists
self._live_mode_event.set()
self._live_mode_event = None
if self._task_status is not None:
self.task_handler.kill_task(task_status=self._task_status)
self._task_status = None
self._live_mode_event = threading.Event()
self._task_status = self.task_handler.submit_task(task=self.emit_to_bec)
@@ -66,7 +77,7 @@ class DebyeBaseCamera(PSIDeviceBase):
def emit_to_bec(self):
"""Emit the image data to BEC. If _live_mode_event is set, stop the task."""
while not self._live_mode_event.wait(1 / self._update_frequency):
value = self.image1.array.data.get()
value = self.image1.array_data.get()
if value is None:
continue
width = self.image1.array_size.width.get()