From 415c601d2a9dbeb308ae378cf1fb1cd18b5859ac Mon Sep 17 00:00:00 2001 From: appel_c Date: Thu, 8 May 2025 16:31:49 +0200 Subject: [PATCH] feat(debye-base-cam): introduce base class for cameras at debye --- debye_bec/devices/cameras/debye_base_cam.py | 120 ++++++++++++++++++++ 1 file changed, 120 insertions(+) create mode 100644 debye_bec/devices/cameras/debye_base_cam.py diff --git a/debye_bec/devices/cameras/debye_base_cam.py b/debye_bec/devices/cameras/debye_base_cam.py new file mode 100644 index 0000000..4ead479 --- /dev/null +++ b/debye_bec/devices/cameras/debye_base_cam.py @@ -0,0 +1,120 @@ +"""Base class for Camera integration at Debye.""" + +from __future__ import annotations + +import threading +from typing import TYPE_CHECKING + +import numpy as np +from ophyd import DeviceStatus, StatusBase +from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase +from typeguard import typechecked + +if TYPE_CHECKING: # pragma: no cover + from bec_lib.devicemanager import ScanInfo + + +class DebyeBaseCamera(PSIDeviceBase): + """Base class for Debye cameras.""" + + USER_ACCESS = ["live_mode"] + + def __init__(self, *, name: str, prefix: str = "", scan_info: ScanInfo | None = None, **kwargs): + super().__init__(name=name, prefix=prefix, scan_info=scan_info, **kwargs) + self._update_frequency = 5 # Hz + self._live_mode = False + self._live_mode_event = None + self._task_status = None + self._n_rot90 = -1 + + @property + def live_mode(self) -> bool: + """Live mode status.""" + + @typechecked + @live_mode.setter + def live_mode(self, value: bool) -> None: + """ + Set the live mode status. + + Args: + value (bool): True to enable live mode, False to disable. + """ + if value == self._live_mode: + return + self._live_mode = value + if value: + self._start_live_mode() + else: + self._stop_live_mode() + + def _start_live_mode(self) -> None: + """Start live mode.""" + if self._live_mode_event is not None: # Kill task if it exists + self._live_mode_event.set() + self._live_mode_event = None + + self._live_mode_event = threading.Event() + self._task_status = self.task_handler.submit_task(task=self.emit_to_bec) + + def _stop_live_mode(self) -> None: + """Stop live mode.""" + if self._live_mode_event is not None: + self._live_mode_event.set() + self._live_mode_event = None + + def emit_to_bec(self): + """Emit the image data to BEC. If _live_mode_event is set, stop the task.""" + while not self._live_mode_event.wait(1 / self._update_frequency): + value = self.image1.array.data.get() + if value is None: + continue + width = self.image1.array_size.width.get() + height = self.image1.array_size.height.get() + # Geometry correction for the image + data = np.rot90(np.reshape(value, (height, width)), k=self._n_rot90, axes=(0, 1)) + self._run_subs(sub_type=self.SUB_DEVICE_MONITOR_2D, value=data) + + ######################################## + # Beamline Specific Implementations # + ######################################## + + def on_init(self) -> None: + """ + Called when the device is initialized. + + No signals are connected at this point. If you like to + set default values on signals, please use on_connected instead. + """ + + def on_connected(self) -> None: + """ + Called after the device is connected and its signals are connected. + Default values for signals should be set here. + """ + self.live_mode = True + + def on_stage(self) -> DeviceStatus | StatusBase | None: + """ + Called while staging the device. + + Information about the upcoming scan can be accessed from the scan_info (self.scan_info.msg) object. + """ + + def on_unstage(self) -> DeviceStatus | StatusBase | None: + """Called while unstaging the device.""" + + def on_pre_scan(self) -> DeviceStatus | StatusBase | None: + """Called right before the scan starts on all devices automatically.""" + + def on_trigger(self) -> DeviceStatus | StatusBase | None: + """Called when the device is triggered.""" + + def on_complete(self) -> DeviceStatus | StatusBase | None: + """Called to inquire if a device has completed a scans.""" + + def on_kickoff(self) -> DeviceStatus | StatusBase | None: + """Called to kickoff a device for a fly scan. Has to be called explicitly.""" + + def on_stop(self) -> None: + """Called when the device is stopped."""