feat(debye-base-cam): introduce base class for cameras at debye
This commit is contained in:
120
debye_bec/devices/cameras/debye_base_cam.py
Normal file
120
debye_bec/devices/cameras/debye_base_cam.py
Normal file
@@ -0,0 +1,120 @@
|
||||
"""Base class for Camera integration at Debye."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import threading
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
import numpy as np
|
||||
from ophyd import DeviceStatus, StatusBase
|
||||
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
|
||||
from typeguard import typechecked
|
||||
|
||||
if TYPE_CHECKING: # pragma: no cover
|
||||
from bec_lib.devicemanager import ScanInfo
|
||||
|
||||
|
||||
class DebyeBaseCamera(PSIDeviceBase):
|
||||
"""Base class for Debye cameras."""
|
||||
|
||||
USER_ACCESS = ["live_mode"]
|
||||
|
||||
def __init__(self, *, name: str, prefix: str = "", scan_info: ScanInfo | None = None, **kwargs):
|
||||
super().__init__(name=name, prefix=prefix, scan_info=scan_info, **kwargs)
|
||||
self._update_frequency = 5 # Hz
|
||||
self._live_mode = False
|
||||
self._live_mode_event = None
|
||||
self._task_status = None
|
||||
self._n_rot90 = -1
|
||||
|
||||
@property
|
||||
def live_mode(self) -> bool:
|
||||
"""Live mode status."""
|
||||
|
||||
@typechecked
|
||||
@live_mode.setter
|
||||
def live_mode(self, value: bool) -> None:
|
||||
"""
|
||||
Set the live mode status.
|
||||
|
||||
Args:
|
||||
value (bool): True to enable live mode, False to disable.
|
||||
"""
|
||||
if value == self._live_mode:
|
||||
return
|
||||
self._live_mode = value
|
||||
if value:
|
||||
self._start_live_mode()
|
||||
else:
|
||||
self._stop_live_mode()
|
||||
|
||||
def _start_live_mode(self) -> None:
|
||||
"""Start live mode."""
|
||||
if self._live_mode_event is not None: # Kill task if it exists
|
||||
self._live_mode_event.set()
|
||||
self._live_mode_event = None
|
||||
|
||||
self._live_mode_event = threading.Event()
|
||||
self._task_status = self.task_handler.submit_task(task=self.emit_to_bec)
|
||||
|
||||
def _stop_live_mode(self) -> None:
|
||||
"""Stop live mode."""
|
||||
if self._live_mode_event is not None:
|
||||
self._live_mode_event.set()
|
||||
self._live_mode_event = None
|
||||
|
||||
def emit_to_bec(self):
|
||||
"""Emit the image data to BEC. If _live_mode_event is set, stop the task."""
|
||||
while not self._live_mode_event.wait(1 / self._update_frequency):
|
||||
value = self.image1.array.data.get()
|
||||
if value is None:
|
||||
continue
|
||||
width = self.image1.array_size.width.get()
|
||||
height = self.image1.array_size.height.get()
|
||||
# Geometry correction for the image
|
||||
data = np.rot90(np.reshape(value, (height, width)), k=self._n_rot90, axes=(0, 1))
|
||||
self._run_subs(sub_type=self.SUB_DEVICE_MONITOR_2D, value=data)
|
||||
|
||||
########################################
|
||||
# Beamline Specific Implementations #
|
||||
########################################
|
||||
|
||||
def on_init(self) -> None:
|
||||
"""
|
||||
Called when the device is initialized.
|
||||
|
||||
No signals are connected at this point. If you like to
|
||||
set default values on signals, please use on_connected instead.
|
||||
"""
|
||||
|
||||
def on_connected(self) -> None:
|
||||
"""
|
||||
Called after the device is connected and its signals are connected.
|
||||
Default values for signals should be set here.
|
||||
"""
|
||||
self.live_mode = True
|
||||
|
||||
def on_stage(self) -> DeviceStatus | StatusBase | None:
|
||||
"""
|
||||
Called while staging the device.
|
||||
|
||||
Information about the upcoming scan can be accessed from the scan_info (self.scan_info.msg) object.
|
||||
"""
|
||||
|
||||
def on_unstage(self) -> DeviceStatus | StatusBase | None:
|
||||
"""Called while unstaging the device."""
|
||||
|
||||
def on_pre_scan(self) -> DeviceStatus | StatusBase | None:
|
||||
"""Called right before the scan starts on all devices automatically."""
|
||||
|
||||
def on_trigger(self) -> DeviceStatus | StatusBase | None:
|
||||
"""Called when the device is triggered."""
|
||||
|
||||
def on_complete(self) -> DeviceStatus | StatusBase | None:
|
||||
"""Called to inquire if a device has completed a scans."""
|
||||
|
||||
def on_kickoff(self) -> DeviceStatus | StatusBase | None:
|
||||
"""Called to kickoff a device for a fly scan. Has to be called explicitly."""
|
||||
|
||||
def on_stop(self) -> None:
|
||||
"""Called when the device is stopped."""
|
||||
Reference in New Issue
Block a user