fix(falcon): refactor Falcon to use Compare and TransitionStatus

This commit is contained in:
2026-06-01 11:21:31 +02:00
parent be1a78bd64
commit c793c84e64
+15 -25
View File
@@ -8,6 +8,7 @@ from bec_lib.logger import bec_logger
from ophyd import Component as Cpt
from ophyd import DeviceStatus, Kind, Signal, StatusBase
from ophyd.status import SubscriptionStatus
from ophyd_devices import CompareStatus, StatusBase, TransitionStatus
from ophyd_devices.devices.dxp import EpicsDXPFalcon, EpicsMCARecord, Falcon
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
@@ -141,7 +142,7 @@ class FalconSuperXAS(PSIDeviceBase, FalconControl):
Default values for signals should be set here.
"""
def on_stage(self) -> DeviceStatus | StatusBase | None:
def on_stage(self) -> CompareStatus:
"""
Called while staging the device.
@@ -149,26 +150,16 @@ class FalconSuperXAS(PSIDeviceBase, FalconControl):
"""
self.collect_mode.set(0).wait()
self.preset_real_time.set(0).wait()
status = CompareStatus(self.acquiring, FalconAcquiringStatus.DONE, timeout=self._pv_timeout)
self.stop_all.put(1)
if (
self.wait_for_condition(
lambda: self.acquiring.get() == FalconAcquiringStatus.DONE, timeout=self._pv_timeout
)
is False
):
raise TimeoutError("Timeout on Falcon stage")
return status
def on_unstage(self) -> DeviceStatus | StatusBase | None:
def on_unstage(self) -> CompareStatus:
"""Called while unstaging the device."""
self.stop_all.put(1)
self.erase_all.put(1)
if (
self.wait_for_condition(
lambda: self.acquiring.get() == FalconAcquiringStatus.DONE, timeout=self._pv_timeout
)
is False
):
raise TimeoutError("Timeout on Falcon unstage")
status = CompareStatus(self.acquiring, FalconAcquiringStatus.DONE, timeout=self._pv_timeout)
return status
def on_pre_scan(self) -> DeviceStatus | StatusBase | None:
"""Called right before the scan starts on all devices automatically."""
@@ -186,19 +177,18 @@ class FalconSuperXAS(PSIDeviceBase, FalconControl):
"""Called when the device is stopped."""
self.stop_all.put(1)
def _stop_erase_and_wait_for_acquiring(self) -> DeviceStatus:
def _stop_erase_and_wait_for_acquiring(self) -> TransitionStatus:
"""Method called from the Trigger card to reset counts on the Falcon"""
if self.acquiring.get() != FalconAcquiringStatus.DONE:
self.stop_all.put(1)
def _check_acquiriting(*, old_value, value, **kwargs):
if old_value == FalconAcquiringStatus.DONE and value == FalconAcquiringStatus.ACQUIRING:
return True
return False
status = SubscriptionStatus(self.acquiring, _check_acquiriting)
logger.info("Triggering Falcon")
# TODO in case this fails, it could be that DONE is not emitted as it is only visible
# for a short time, below the update frequency of the EPICS acquiring PV (100ms). In this case,
# add here a CompareStatus, waiting for Done to appear before sending the erase command.
status = TransitionStatus(
self.acquiring,
transitions=[FalconAcquiringStatus.DONE, FalconAcquiringStatus.ACQUIRING],
)
self.erase_start.put(1)
return status