feat(debye-base-cam): introduce base class for cameras at debye
This commit is contained in:
120
debye_bec/devices/cameras/debye_base_cam.py
Normal file
120
debye_bec/devices/cameras/debye_base_cam.py
Normal file
@@ -0,0 +1,120 @@
|
|||||||
|
"""Base class for Camera integration at Debye."""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import threading
|
||||||
|
from typing import TYPE_CHECKING
|
||||||
|
|
||||||
|
import numpy as np
|
||||||
|
from ophyd import DeviceStatus, StatusBase
|
||||||
|
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
|
||||||
|
from typeguard import typechecked
|
||||||
|
|
||||||
|
if TYPE_CHECKING: # pragma: no cover
|
||||||
|
from bec_lib.devicemanager import ScanInfo
|
||||||
|
|
||||||
|
|
||||||
|
class DebyeBaseCamera(PSIDeviceBase):
|
||||||
|
"""Base class for Debye cameras."""
|
||||||
|
|
||||||
|
USER_ACCESS = ["live_mode"]
|
||||||
|
|
||||||
|
def __init__(self, *, name: str, prefix: str = "", scan_info: ScanInfo | None = None, **kwargs):
|
||||||
|
super().__init__(name=name, prefix=prefix, scan_info=scan_info, **kwargs)
|
||||||
|
self._update_frequency = 5 # Hz
|
||||||
|
self._live_mode = False
|
||||||
|
self._live_mode_event = None
|
||||||
|
self._task_status = None
|
||||||
|
self._n_rot90 = -1
|
||||||
|
|
||||||
|
@property
|
||||||
|
def live_mode(self) -> bool:
|
||||||
|
"""Live mode status."""
|
||||||
|
|
||||||
|
@typechecked
|
||||||
|
@live_mode.setter
|
||||||
|
def live_mode(self, value: bool) -> None:
|
||||||
|
"""
|
||||||
|
Set the live mode status.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
value (bool): True to enable live mode, False to disable.
|
||||||
|
"""
|
||||||
|
if value == self._live_mode:
|
||||||
|
return
|
||||||
|
self._live_mode = value
|
||||||
|
if value:
|
||||||
|
self._start_live_mode()
|
||||||
|
else:
|
||||||
|
self._stop_live_mode()
|
||||||
|
|
||||||
|
def _start_live_mode(self) -> None:
|
||||||
|
"""Start live mode."""
|
||||||
|
if self._live_mode_event is not None: # Kill task if it exists
|
||||||
|
self._live_mode_event.set()
|
||||||
|
self._live_mode_event = None
|
||||||
|
|
||||||
|
self._live_mode_event = threading.Event()
|
||||||
|
self._task_status = self.task_handler.submit_task(task=self.emit_to_bec)
|
||||||
|
|
||||||
|
def _stop_live_mode(self) -> None:
|
||||||
|
"""Stop live mode."""
|
||||||
|
if self._live_mode_event is not None:
|
||||||
|
self._live_mode_event.set()
|
||||||
|
self._live_mode_event = None
|
||||||
|
|
||||||
|
def emit_to_bec(self):
|
||||||
|
"""Emit the image data to BEC. If _live_mode_event is set, stop the task."""
|
||||||
|
while not self._live_mode_event.wait(1 / self._update_frequency):
|
||||||
|
value = self.image1.array.data.get()
|
||||||
|
if value is None:
|
||||||
|
continue
|
||||||
|
width = self.image1.array_size.width.get()
|
||||||
|
height = self.image1.array_size.height.get()
|
||||||
|
# Geometry correction for the image
|
||||||
|
data = np.rot90(np.reshape(value, (height, width)), k=self._n_rot90, axes=(0, 1))
|
||||||
|
self._run_subs(sub_type=self.SUB_DEVICE_MONITOR_2D, value=data)
|
||||||
|
|
||||||
|
########################################
|
||||||
|
# Beamline Specific Implementations #
|
||||||
|
########################################
|
||||||
|
|
||||||
|
def on_init(self) -> None:
|
||||||
|
"""
|
||||||
|
Called when the device is initialized.
|
||||||
|
|
||||||
|
No signals are connected at this point. If you like to
|
||||||
|
set default values on signals, please use on_connected instead.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def on_connected(self) -> None:
|
||||||
|
"""
|
||||||
|
Called after the device is connected and its signals are connected.
|
||||||
|
Default values for signals should be set here.
|
||||||
|
"""
|
||||||
|
self.live_mode = True
|
||||||
|
|
||||||
|
def on_stage(self) -> DeviceStatus | StatusBase | None:
|
||||||
|
"""
|
||||||
|
Called while staging the device.
|
||||||
|
|
||||||
|
Information about the upcoming scan can be accessed from the scan_info (self.scan_info.msg) object.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def on_unstage(self) -> DeviceStatus | StatusBase | None:
|
||||||
|
"""Called while unstaging the device."""
|
||||||
|
|
||||||
|
def on_pre_scan(self) -> DeviceStatus | StatusBase | None:
|
||||||
|
"""Called right before the scan starts on all devices automatically."""
|
||||||
|
|
||||||
|
def on_trigger(self) -> DeviceStatus | StatusBase | None:
|
||||||
|
"""Called when the device is triggered."""
|
||||||
|
|
||||||
|
def on_complete(self) -> DeviceStatus | StatusBase | None:
|
||||||
|
"""Called to inquire if a device has completed a scans."""
|
||||||
|
|
||||||
|
def on_kickoff(self) -> DeviceStatus | StatusBase | None:
|
||||||
|
"""Called to kickoff a device for a fly scan. Has to be called explicitly."""
|
||||||
|
|
||||||
|
def on_stop(self) -> None:
|
||||||
|
"""Called when the device is stopped."""
|
||||||
Reference in New Issue
Block a user