refactor(ddg): use threadpool with active polling for state

This commit is contained in:
2025-08-05 13:16:10 +02:00
parent a03e99d615
commit c7e11eeabc
4 changed files with 91 additions and 58 deletions

View File

@@ -32,7 +32,7 @@ mcs:
ids_cam:
description: IDS camera for live image acquisition
deviceClass: csaxs_bec.devices.ids_cameras.ids_camera_new.IDSCamera
deviceClass: csaxs_bec.devices.ids_cameras.IDSCamera
deviceConfig:
camera_id: 201
bits_per_pixel: 24

View File

@@ -31,11 +31,14 @@ DELAY CHANNELS:
from __future__ import annotations
import threading
import time
from concurrent.futures import Future, ThreadPoolExecutor
from typing import TYPE_CHECKING
from bec_lib.logger import bec_logger
from ophyd import DeviceStatus, StatusBase
from ophyd import Component as Cpt
from ophyd import DeviceStatus, Signal, StatusBase
from ophyd_devices import CompareStatus, TransitionStatus
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
@@ -48,6 +51,7 @@ from csaxs_bec.devices.epics.delay_generator_csaxs.delay_generator_csaxs import
AllChannelNames,
ChannelConfig,
DelayGeneratorCSAXS,
StatusBitsCompareStatus,
)
from csaxs_bec.devices.epics.mcs_card.mcs_card_csaxs import ACQUIRING, READYTOREAD
@@ -110,6 +114,9 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS):
name=name, prefix=prefix, scan_info=scan_info, device_manager=device_manager, **kwargs
)
self.device_manager = device_manager
self._threadpool = ThreadPoolExecutor(max_workers=1)
self._status_thread_event = threading.Event()
self._status_future = None # Future for the status job
# pylint: disable=attribute-defined-outside-init
def on_connected(self) -> None:
@@ -124,7 +131,7 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS):
self.set_trigger(DEFAULT_TRIGGER_SOURCE)
self.set_references_for_channels(DEFAULT_REFERENCES)
# Set proc status to passively update with 5Hz (0.2s)
self.state.proc_status_mode.put(PROC_EVENT_MODE.FREQ_5HZ)
self.state.proc_status_mode.put(PROC_EVENT_MODE.EVENT)
def on_stage(self) -> DeviceStatus | StatusBase | None:
"""
@@ -134,7 +141,8 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS):
This DDG is always not in burst mode.
"""
self.burst_disable()
exp_time = self.scan_info.msg.scan_parameters["exp_time"]
self.burst_enable(1, 0, exp_time)
exp_time = self.scan_info.msg.scan_parameters["exp_time"]
frames_per_trigger = self.scan_info.msg.scan_parameters["frames_per_trigger"]
# Trigger DDG2
@@ -151,73 +159,96 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS):
# e has refernce to d, f has reference to e
self.set_delay_pairs(channel="ef", delay=0, width=1e-6)
def _prepare_mcs_on_trigger(self, mcs: MCSCardCSAXS) -> None:
"""Prepare the MCS card for the next trigger.
This method holds the logic to ensure that the MCS card is ready to read.
It's logic is coupled to the MCS card implementation and the DDG1 trigger logic.
"""
status_ready_read = CompareStatus(mcs.ready_to_read, READYTOREAD.DONE)
mcs.stop_all.put(1)
status_acquiring = TransitionStatus(mcs.acquiring, [ACQUIRING.DONE, ACQUIRING.ACQUIRING])
self.cancel_on_stop(status_ready_read)
self.cancel_on_stop(status_acquiring)
status_ready_read.wait(10)
mcs.ready_to_read.put(READYTOREAD.PROCESSING)
mcs.erase_start.put(1)
status_acquiring.wait(timeout=10) # Allow 10 seconds in case communication is slow
def _reset_status_event_future(self) -> None:
"""Reset the status future and thread event."""
if self._status_future and not self._status_future.done():
self._status_thread_event.set()
self._status_future.result(timeout=10) # Allow 10 seconds for communication to finish
# Reset to known state
self._status_thread_event.clear()
self.state.proc_status.put(1, use_complete=True)
def _prepare_trigger_status_event(self, timeout: float | None = None) -> DeviceStatus:
"""Prepare the trigger status event for the DDG1, and trigger the de"""
if timeout is None:
# Default timeout of 5 seconds + exposure time * frames_per_trigger
timeout = 5 + self.scan_info.msg.scan_parameters.get(
"exp_time", 0.1
) * self.scan_info.msg.scan_parameters.get("frames_per_trigger", 1)
# Callback to cancel the status if the device is stopped
def cancel_cb(status: CompareStatus) -> None:
"""Callback to cancel the status if the device is stopped."""
self._status_thread_event.set()
status = StatusBitsCompareStatus(
self.state.event_status, STATUSBITS.END_OF_BURST, timeout=timeout, run=False
)
status.add_callback(cancel_cb)
self.cancel_on_stop(status)
# Callback to poll events, this gets executed in a separate thread by the threadpool
def _status_job():
"""Callback to poll event status an update the status_event signal."""
while not self._status_thread_event.is_set():
self.state.proc_status.put(1, use_complete=True)
time.sleep(0.02) # 20ms delay for processing
self.state.event_status.get(use_monitor=False)
time.sleep(0.02) # 20ms delay for processing
# Submit the status job to the threadpool
self._status_future = self._threadpool.submit(_status_job)
return status
def on_trigger(self) -> DeviceStatus | StatusBase | None:
"""Note, we need to add a delay to the StatusBits callback on the event_status.
If we don't then subsequent triggers may reach the DDG too early, and will be ignored. To
avoid this, we've added the option to specify a delay via add_delay, default here is 50ms.
"""
status = CompareStatus(self.state.event_status, STATUSBITS.NONE)
self.cancel_on_stop(status)
# Make sure to reset the status future and thread event
self._reset_status_event_future()
# Prepare the MCS card for the next software trigger
mcs = self.device_manager.devices.get("mcs", None)
if mcs is None:
logger.info("Did not find mcs card with name 'mcs' in current session")
else:
mcs: MCSCardCSAXS
status_ready_read = CompareStatus(mcs.ready_to_read, READYTOREAD.DONE)
mcs.stop_all.put(1)
status_acquiring = TransitionStatus(
mcs.acquiring, [ACQUIRING.DONE, ACQUIRING.ACQUIRING]
)
self.cancel_on_stop(status_ready_read)
self.cancel_on_stop(status_acquiring)
status_ready_read.wait(10)
mcs.ready_to_read.put(READYTOREAD.PROCESSING)
mcs.erase_start.put(1)
status_acquiring.wait(
timeout=10
) # 2 s wait for mcs card to start should be more than enough..
status.wait(timeout=10)
# Default timeout of 5 seconds + exposure time * frames_per_trigger
timeout = 5 + self.scan_info.msg.scan_parameters.get(
"exp_time", 0.1
) * self.scan_info.msg.scan_parameters.get("frames_per_trigger", 1)
status = CompareStatus(self.state.event_status, STATUSBITS.END_OF_DELAY, timeout=timeout)
self.cancel_on_stop(status)
self._prepare_mcs_on_trigger(mcs)
# Prepare status to wait for the end of burst
status = self._prepare_trigger_status_event()
# Trigger the DDG1
self.trigger_shot.put(1, use_complete=True)
return status
def wait_for_status(
self, status: DeviceStatus, bit_event: STATUSBITS, timeout: float = 2
) -> None:
"""Wait for a event status bit to be set.
Args:
status (DeviceStatus): The status object to update.
bit_event (STATUSBITS): The event status bit to wait for.
timeout (float): Maximum time to wait for the event status bit to be set.
"""
current_time = time.time()
while not status.done:
self.state.proc_status.put(1, use_complete=True)
event_status = self.state.event_status.get()
if (STATUSBITS(event_status) & bit_event) == bit_event:
status.set_finished()
if time.time() - current_time > timeout:
status.set_exception(
TimeoutError(
f"Timeout waiting for status of device {self.name} for event_status {bit_event}"
)
)
break
time.sleep(0.1)
time.sleep(0.05) # Give time for the IOC to be ready again
return status
def on_stop(self) -> None:
"""Stop the delay generator by setting the burst mode to 0"""
self.stop_ddg()
def on_destroy(self):
"""Clean up resources when the device is destroyed."""
self._status_thread_event.set()
if hasattr(self, "_threadpool") and self._threadpool is not None:
self._threadpool.shutdown(wait=False)
self._threadpool = None
super().on_destroy()
if __name__ == "__main__":
ddg = DDG1(name="ddg1", prefix="X12SA-CPCL-DDG1:")

View File

@@ -402,7 +402,6 @@ class DelayGeneratorEventStatus(Device):
EpicsSignal,
"EventStatusLI.PROC",
name="proc_status",
auto_monitor=True,
kind=Kind.omitted,
doc="Poll and flush the latest event status register entry from the HW to the event_status signal",
)

View File

@@ -191,7 +191,10 @@ def test_ddg1_stage(mock_ddg1):
mock_ddg1.stage()
assert np.isclose(mock_ddg1.burst_mode.get(), 0) # Burst mode is disabled
assert np.isclose(mock_ddg1.burst_mode.get(), 1) # burst mode is enabled
assert np.isclose(mock_ddg1.burst_delay.get(), 0)
assert np.isclose(mock_ddg1.burst_period.get(), exp_time)
# Trigger DDG2 through EXT/EN
assert np.isclose(mock_ddg1.ab.delay.get(), 2e-3)
@@ -218,7 +221,7 @@ def test_ddg1_trigger(mock_ddg1):
assert status.done is False
assert status.success is False
assert mock_ddg1.trigger_shot.get() == 1
mock_ddg1.state.event_status._read_pv.mock_data = STATUSBITS.END_OF_DELAY.value
mock_ddg1.state.event_status._read_pv.mock_data = STATUSBITS.END_OF_BURST.value
status.wait(timeout=1) # Wait for the status to be done
assert status.done is True
assert status.success is True