feat(falcon): Minimal integration of the Falcon, forwarding array data.
CI for debye_bec / test (push) Successful in 57s
CI for debye_bec / test (pull_request) Successful in 55s

This commit is contained in:
2026-06-01 13:13:34 +02:00
parent 40309491b0
commit a693ae0f05
2 changed files with 124 additions and 0 deletions
+124
View File
@@ -0,0 +1,124 @@
"""FALCON device implementation for SuperXAS"""
from __future__ import annotations
import enum
import traceback
from typing import TYPE_CHECKING
from bec_lib.logger import bec_logger
from ophyd import Component as Cpt
from ophyd_devices import AsyncSignal, CompareStatus, DeviceStatus, StatusBase
from ophyd_devices.devices.areadetector.plugins import ImagePlugin_V35 as ImagePlugin
from ophyd_devices.devices.dxp import EpicsDXPFalcon, EpicsMCARecord, Falcon
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
if TYPE_CHECKING:
from bec_lib.devicemanager import ScanInfo
logger = bec_logger.logger
class FalconAcquiringStatus(int, enum.Enum):
"""Status of Falcon"""
DONE = 0
ACQUIRING = 1
class FalconControl(Falcon):
"""Falcon Control class at SuperXAS. prefix: 'X10DA-SITORO:'"""
# DXP parameters
dxp1 = Cpt(EpicsDXPFalcon, "dxp1:")
# MCA record with spectrum data
mca1 = Cpt(EpicsMCARecord, "mca1")
# Image record
image = Cpt(ImagePlugin, "image1:")
class FalconSuperXAS(PSIDeviceBase, FalconControl):
"""Falcon implementierung at SuperXAS. prefix: 'X10DA-SITORO:'"""
data = Cpt(
AsyncSignal,
name="data",
ndim=1,
max_size=1000,
doc="1D Waveform data from Falcon detector.",
)
########################################
# Beamline Specific Implementations #
########################################
def on_init(self) -> None:
"""
Called when the device is initialized.
No signals are connected at this point. If you like to
set default values on signals, please use on_connected instead.
"""
self._pv_timeout = 1
self._falcon_energy_channels = None
def on_connected(self) -> None:
"""
Called after the device is connected and its signals are connected.
Default values for signals should be set here.
"""
# Reset array counter on connect
self.cam.array_counter.set(0).wait(timeout=self._pv_timeout)
self.image.unique_id.subscribe(self._on_new_data_received, run=False)
def on_stage(self) -> CompareStatus:
"""
Called while staging the device.
Information about the upcoming scan can be accessed from the scan_info (self.scan_info.msg) object.
"""
def on_unstage(self) -> CompareStatus:
"""Called while unstaging the device."""
def on_pre_scan(self) -> DeviceStatus | StatusBase | None:
"""Called right before the scan starts on all devices automatically."""
def on_trigger(self) -> DeviceStatus | StatusBase | None:
"""Called when the device is triggered."""
def on_complete(self) -> DeviceStatus | StatusBase | None:
"""Called to inquire if a device has completed a scans."""
def on_kickoff(self) -> DeviceStatus | StatusBase | None:
"""Called to kickoff a device for a fly scan. Has to be called explicitly."""
def on_stop(self) -> None:
"""Called when the device is stopped."""
########################################
# Custom Methods #
########################################
def _on_new_data_received(self, value: int, old_value: int, **kwargs):
"""Callback for image unique ID updates to trigger preview update."""
if value == old_value:
return # No new image, or counter reset
try:
# Get new image data
array_data = self.image.array_data.get()
if array_data is None:
logger.info(f"No image data available for preview of {self.name}")
return
if self._falcon_energy_channels is None:
# Initialize energy channels based on the first received data
self._falcon_energy_channels = len(array_data)
logger.info(f"Initialized Falcon energy channels to {self._falcon_energy_channels}")
# Geometry correction for the image
self.data.put(
array_data,
async_update={"type": "add", "max_shape": [None, self._falcon_energy_channels]},
)
except Exception: # pylint: disable=broad-except
content = traceback.format_exc()
logger.error(f"Error while updating preview for {self.name} on image update: {content}")