refactor(ddg): Add fsh signal to ddg, improve trigger logic.
CI for csaxs_bec / test (push) Failing after 1m26s
CI for csaxs_bec / test (pull_request) Failing after 1m27s

This commit is contained in:
2026-02-09 14:11:00 +01:00
parent 8849b9ffea
commit 32b4c39659
3 changed files with 101 additions and 17 deletions
@@ -37,7 +37,9 @@ import traceback
from typing import TYPE_CHECKING
from bec_lib.logger import bec_logger
from ophyd_devices import CompareStatus, DeviceStatus, TransitionStatus
from ophyd import Component as Cpt
from ophyd import EpicsSignalRO, Kind
from ophyd_devices import CompareStatus, DeviceStatus, StatusBase, TransitionStatus
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
from csaxs_bec.devices.epics.delay_generator_csaxs.delay_generator_csaxs import (
@@ -133,6 +135,24 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS):
USER_ACCESS = ["keep_shutter_open_during_scan", "set_trigger"]
fast_shutter_readback = Cpt(
EpicsSignalRO,
read_pv="X12SA-ES1-TTL:IN_01",
add_prefix=("",), # Add this to prevent the prefix to be added to the signal
kind=Kind.omitted,
auto_monitor=True,
)
# The shutter control PV can indicate if the shutter is requested to be kept open. If that
# is the case, we can not use the signal shutter_readback signal to check if the delay cycle
# finishes but have to use the polling of the event status register to check if the burst finished.
fast_shutter_control = Cpt(
EpicsSignalRO,
read_pv="X12SA-ES1-TTL:OUT_01",
add_prefix=("",), # Add this to prevent the prefix to be added to the signal
kind=Kind.omitted,
auto_monitor=True,
)
def __init__(
self,
name: str,
@@ -195,7 +215,16 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS):
self.burst_delay.put(0)
self.burst_count.put(1)
def keep_shutter_open_during_scan(self, open:True) -> None:
def keep_shutter_open_during_scan(self, open: True) -> None:
"""
Method to configure the delay generator for keeping the shutter open during a scans.
This means that the additional delay to open the shutter needs to be removed (2e-3)
from the timing of the signals.
Args:
open (bool): If True, the shutter will be kept open during the scan.
If False, the shutter will be opened and closed for each trigger cycle.
"""
if open is True:
self._shutter_to_open_delay = 0
else:
@@ -239,6 +268,18 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS):
if self.burst_count.get() != 1:
self.burst_count.put(1)
#####################################
## Setup trigger source if needed ###
#####################################
# NOTE Some scans may change the trigger source to an external trigger,
# so we will make sure that the default trigger source is set for the DDG1
# before each scan. If a scan requires a different trigger source, i.e.
# external triggers then the scan should implement this change after the
# on_stage method was called.
if self.trigger_source.get() != DEFAULT_TRIGGER_SOURCE:
self.set_trigger(DEFAULT_TRIGGER_SOURCE)
#########################################
### Setup timing for burst and delays ###
#########################################
@@ -249,7 +290,9 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS):
# Burst Period DDG1
# Set burst_period to shutter width
# c/t0 + self._shutter_to_open_delay + exp_time * burst_count
shutter_width = self._shutter_to_open_delay + exp_time * frames_per_trigger # Shutter starts closing at end of exposure
shutter_width = (
self._shutter_to_open_delay + exp_time * frames_per_trigger
) # Shutter starts closing at end of exposure
if self.burst_period.get() != shutter_width:
self.burst_period.put(shutter_width)
@@ -443,7 +486,13 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS):
def on_trigger(self) -> DeviceStatus:
"""
This method is called from BEC as a software trigger.
This method is called from BEC as a software trigger. Here the logic is as follows:
We check the signal of the "fast_shutter_control". If it is low, we know that the shutter
open/closes for each trigger cycle. In this case, we can use the signal of the
"fast_shutter_readback" to check if the shutter transitioned from open to close, which
indicates the end of the burst. If the "fast_shutter_control" signal is high, we know
that the shutter is kept open during the scan. In this case, we can only rely on polling
the event status register to check if the burst finished.
It follows a specific procedure to ensure that the DDG1 and MCS card are properly handled
on a trigger event. The established logic is as follows:
@@ -464,9 +513,13 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS):
"""
self._stop_polling()
self._poll_thread_poll_loop_done.wait(timeout=1)
# TODO This may move to scan modifiers
if self.trigger_source.get() != TRIGGERSOURCE.SINGLE_SHOT.value:
status = StatusBase(obj=self)
status.set_finished()
return status
# NOTE: This sleep is important to ensure that the HW is ready to process new commands.
# It has been empirically determined after long testing that this improves stability.
time.sleep(0.02)
# NOTE If the MCS card is present in the current session of BEC,
# we prepare the card for the next trigger. The procedure is implemented
@@ -482,12 +535,23 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS):
# be investigated why the EPICS interface is slow to respond.
status_mcs.wait(timeout=3)
# Prepare StatusBitsCompareStatus to resolve once the END_OF_BURST bit was set.
status = self._prepare_trigger_status_event()
# Start polling thread again to monitor event status
self._start_polling()
if self.fast_shutter_control.get() == 0:
# Shutter is not kept open, we can rely on the shutter readback signal
status = TransitionStatus(
self.fast_shutter_readback, [1, 0]
) # Wait for shutter to transition from open (1) to close (0)
else:
# NOTE This sleep is needed for 20ms to make sure that the HW of the DDG is
# again ready to process new commands. It was transferred from just after the
# _stop_polling() call, as it should only be relevant in case of polling the
# event status register, which may only be if the shutter is kept open.
time.sleep(0.02)
# Shutter is kept open, we can only rely on the event status register
status = self._prepare_trigger_status_event()
# Start polling thread again to monitor event status
self._start_polling()
# Trigger the DDG1
self.cancel_on_stop(status)
self.trigger_shot.put(1, use_complete=True)
return status
@@ -488,6 +488,7 @@ class DelayGeneratorCSAXS(Device):
name="trigger_source",
kind=Kind.omitted,
doc="Trigger Source for the DDG, options in TRIGGERSOURCE",
auto_monitor=True,
)
trigger_level = Cpt(
EpicsSignal,
@@ -22,7 +22,13 @@ import numpy as np
from bec_lib.logger import bec_logger
from ophyd import Component as Cpt
from ophyd import EpicsSignalRO, Kind
from ophyd_devices import AsyncMultiSignal, CompareStatus, ProgressSignal, StatusBase, TransitionStatus
from ophyd_devices import (
AsyncMultiSignal,
CompareStatus,
ProgressSignal,
StatusBase,
TransitionStatus,
)
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
from csaxs_bec.devices.epics.mcs_card.mcs_card import (
@@ -369,7 +375,6 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
elif self.scan_info.msg.scan_type == "fly":
self.num_use_all.set(self._num_total_triggers).wait(timeout=self._pv_timeout)
# Clear any previous data, just to be sure
with self._rlock:
self._current_data.clear()
@@ -392,10 +397,19 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
logger.info(f"MCS Card {self.name} on_stage completed in {time.time() - start_time:.3f}s.")
# For a fly scan we need to start the mcs card ourselves
if self.scan_info.msg.scan_type == "fly":
status_acquiring = TransitionStatus(self.acquiring, [ACQUIRING.DONE, ACQUIRING.ACQUIRING])
self.cancel_on_stop(status_acquiring)
self.erase_start.put(1)
def on_prescan(self) -> None | StatusBase:
"""
This method is called after on_stage and before the scan starts. For the MCS card, we need to make sure
that the card is properly started for fly scans. For step scans, this will be handled by the DDG,
so no action is required here.
"""
if self.scan_info.msg.scan_type == "fly":
status_acquiring = CompareStatus(self.acquiring, ACQUIRING.ACQUIRING)
self.cancel_on_stop(status_acquiring)
return status_acquiring
return None
def on_unstage(self) -> None:
"""
@@ -439,7 +453,7 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
callback(exception=None)
else:
logger.info(f"Current data index is {self._current_data_index}")
if self._current_data_index >=1:
if self._current_data_index >= 1:
for callback in self._scan_done_callbacks:
callback(exception=None)
@@ -470,7 +484,6 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
"""Callback for status failure, the monitoring thread should be stopped."""
# NOTE Check for status.done and status.success is important to avoid
if status.done:
self._start_monitor_async_data_emission.clear() # Stop monitoring
def on_complete(self) -> CompareStatus:
@@ -496,7 +509,13 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
monitoring thread is stopped properly.
"""
# NOTE For fly scans with EXT/EN enabled triggering, the MCS card needs to receive an
# additional trigger at the end of the scan to advance the channel. This will ensure
# that the acquisition finishes on the card and that data is emitted to BEC. If the acquisition
# was already finished (i.e. normal step scan sends 1 extra pulse per burst cycle), this will
# not have any effect as the card will already be in DONE state and signal.
self.software_channel_advance.put(1)
# Prepare and register status callback for the async monitoring loop
status_async_data = StatusBase(obj=self)
self._scan_done_callbacks.append(partial(self._status_callback, status_async_data))
@@ -510,7 +529,7 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
# Combine both statuses
ret_status = status & status_async_data
# Handle external stop/cancel, and stop monitoring
# NOTE: Handle external stop/cancel, and stop monitoring
ret_status.add_callback(self._status_failed_callback)
self.cancel_on_stop(ret_status)
return ret_status