refactor(ddg1): remove threadpool, use dedicated thread for polling

This commit is contained in:
2025-08-06 10:50:39 +02:00
parent c7e11eeabc
commit a58c23845f

View File

@@ -33,12 +33,10 @@ from __future__ import annotations
import threading
import time
from concurrent.futures import Future, ThreadPoolExecutor
from typing import TYPE_CHECKING
from bec_lib.logger import bec_logger
from ophyd import Component as Cpt
from ophyd import DeviceStatus, Signal, StatusBase
from ophyd import DeviceStatus, StatusBase
from ophyd_devices import CompareStatus, TransitionStatus
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
@@ -51,6 +49,7 @@ from csaxs_bec.devices.epics.delay_generator_csaxs.delay_generator_csaxs import
AllChannelNames,
ChannelConfig,
DelayGeneratorCSAXS,
LiteralChannels,
StatusBitsCompareStatus,
)
from csaxs_bec.devices.epics.mcs_card.mcs_card_csaxs import ACQUIRING, READYTOREAD
@@ -79,7 +78,7 @@ DEFAULT_IO_CONFIG: dict[AllChannelNames, ChannelConfig] = {
DEFAULT_TRIGGER_SOURCE: TRIGGERSOURCE = TRIGGERSOURCE.SINGLE_SHOT
DEFAULT_READOUT_TIMES = {"ab": 2e-4, "cd": 2e-4, "ef": 2e-4, "gh": 2e-4} # 0.2 ms 5kHz
DEFAULT_REFERENCES: list[tuple[AllChannelNames, CHANNELREFERENCE]] = [
DEFAULT_REFERENCES: list[tuple[LiteralChannels, CHANNELREFERENCE]] = [
("A", CHANNELREFERENCE.T0), # T0 + 2ms delay
("B", CHANNELREFERENCE.A),
("C", CHANNELREFERENCE.T0), # T0
@@ -93,10 +92,11 @@ DEFAULT_REFERENCES: list[tuple[AllChannelNames, CHANNELREFERENCE]] = [
class DDG1(PSIDeviceBase, DelayGeneratorCSAXS):
"""
Implementation of DelayGeneratorCSAXS for the CSAXS master trigger delay generator at X12SA-CPCL-DDG1.
It will be triggered by a soft trigger from BEC or a hardware trigger from a beamline device (e.g. the Galil stages).
It is operated in standard mode, not burst mode and will trigger the EXT/EN of DDG2 (channel ab).
It is responsible for opening the shutter (channel cd) and sending an extra trigger to an or gate for the MCS card (channel ef).
Implementation of DelayGeneratorCSAXS for master trigger delay generator at X12SA-CPCL-DDG1.
It will be triggered by a soft trigger from BEC or a hardware trigger from a beamline device
(e.g. the Galil stages). It is operated in standard mode, not burst mode and will trigger the
EXT/EN of DDG2 (channel ab). It is responsible for opening the shutter (channel cd) and sending
an extra trigger to an or gate for the MCS card (channel ef).
"""
def __init__(
@@ -114,9 +114,10 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS):
name=name, prefix=prefix, scan_info=scan_info, device_manager=device_manager, **kwargs
)
self.device_manager = device_manager
self._threadpool = ThreadPoolExecutor(max_workers=1)
self._status_thread_event = threading.Event()
self._status_future = None # Future for the status job
self._poll_thread = threading.Thread(target=self._poll_event_status, daemon=True)
self._poll_thread_run_event = threading.Event()
self._poll_thread_kill_event = threading.Event()
self._poll_thread.start()
# pylint: disable=attribute-defined-outside-init
def on_connected(self) -> None:
@@ -175,15 +176,52 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS):
mcs.erase_start.put(1)
status_acquiring.wait(timeout=10) # Allow 10 seconds in case communication is slow
def _reset_status_event_future(self) -> None:
"""Reset the status future and thread event."""
if self._status_future and not self._status_future.done():
self._status_thread_event.set()
self._status_future.result(timeout=10) # Allow 10 seconds for communication to finish
def _poll_event_status(self) -> None:
"""
Poll the event status register in a background thread. Control
the polling with the _poll_thread_run_event and _poll_thread_kill_event.
"""
while not self._poll_thread_kill_event.is_set():
self._poll_thread_run_event.wait()
while (
not self._poll_thread_run_event.is_set() and self._poll_thread_kill_event.is_set()
):
self._poll_loop()
# Reset to known state
self._status_thread_event.clear()
def _poll_loop(self) -> None:
"""
Poll loop to update event status.
The checks ensure that the loop exist after each operation and be stuck in sleep.
"""
self.state.proc_status.put(1, use_complete=True)
if not self._poll_thread_run_event.is_set():
return
time.sleep(0.02) # 20ms delay for processing
if not self._poll_thread_run_event.is_set():
return
self.state.event_status.get(use_monitor=False)
if not self._poll_thread_run_event.is_set():
return
time.sleep(0.02) # 20ms delay for processing
def _start_polling(self) -> None:
"""Start the polling loop in the background thread."""
self._poll_thread_run_event.set()
def _stop_polling(self) -> None:
"""Stop the polling loop in the background thread."""
self._poll_thread_run_event.clear()
def _kill_poll_thread(self) -> None:
"""Kill the polling thread."""
self._poll_thread_kill_event.set()
self._stop_polling()
self._poll_thread.join(timeout=1)
if self._poll_thread.is_alive():
logger.warning("Polling thread did not stop gracefully.")
else:
self._poll_thread = None
logger.info("Polling thread stopped.")
def _prepare_trigger_status_event(self, timeout: float | None = None) -> DeviceStatus:
"""Prepare the trigger status event for the DDG1, and trigger the de"""
@@ -196,25 +234,13 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS):
# Callback to cancel the status if the device is stopped
def cancel_cb(status: CompareStatus) -> None:
"""Callback to cancel the status if the device is stopped."""
self._status_thread_event.set()
self._stop_polling()
status = StatusBitsCompareStatus(
self.state.event_status, STATUSBITS.END_OF_BURST, timeout=timeout, run=False
)
status.add_callback(cancel_cb)
self.cancel_on_stop(status)
# Callback to poll events, this gets executed in a separate thread by the threadpool
def _status_job():
"""Callback to poll event status an update the status_event signal."""
while not self._status_thread_event.is_set():
self.state.proc_status.put(1, use_complete=True)
time.sleep(0.02) # 20ms delay for processing
self.state.event_status.get(use_monitor=False)
time.sleep(0.02) # 20ms delay for processing
# Submit the status job to the threadpool
self._status_future = self._threadpool.submit(_status_job)
return status
def on_trigger(self) -> DeviceStatus | StatusBase | None:
@@ -222,8 +248,9 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS):
If we don't then subsequent triggers may reach the DDG too early, and will be ignored. To
avoid this, we've added the option to specify a delay via add_delay, default here is 50ms.
"""
# Make sure to reset the status future and thread event
self._reset_status_event_future()
# Stop polling, poll once manually to ensure that the register is clean
self._stop_polling()
self._poll_loop()
# Prepare the MCS card for the next software trigger
mcs = self.device_manager.devices.get("mcs", None)
@@ -231,8 +258,10 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS):
logger.info("Did not find mcs card with name 'mcs' in current session")
else:
self._prepare_mcs_on_trigger(mcs)
# Prepare status to wait for the end of burst
# Prepare status with callback to cancel the polling once finished
status = self._prepare_trigger_status_event()
# Start polling
self._start_polling()
# Trigger the DDG1
self.trigger_shot.put(1, use_complete=True)
return status
@@ -243,10 +272,7 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS):
def on_destroy(self):
"""Clean up resources when the device is destroyed."""
self._status_thread_event.set()
if hasattr(self, "_threadpool") and self._threadpool is not None:
self._threadpool.shutdown(wait=False)
self._threadpool = None
self._kill_poll_thread()
super().on_destroy()