refactor(ddg1): remove threadpool, use dedicated thread for polling
This commit is contained in:
@@ -33,12 +33,10 @@ from __future__ import annotations
|
||||
|
||||
import threading
|
||||
import time
|
||||
from concurrent.futures import Future, ThreadPoolExecutor
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from bec_lib.logger import bec_logger
|
||||
from ophyd import Component as Cpt
|
||||
from ophyd import DeviceStatus, Signal, StatusBase
|
||||
from ophyd import DeviceStatus, StatusBase
|
||||
from ophyd_devices import CompareStatus, TransitionStatus
|
||||
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
|
||||
|
||||
@@ -51,6 +49,7 @@ from csaxs_bec.devices.epics.delay_generator_csaxs.delay_generator_csaxs import
|
||||
AllChannelNames,
|
||||
ChannelConfig,
|
||||
DelayGeneratorCSAXS,
|
||||
LiteralChannels,
|
||||
StatusBitsCompareStatus,
|
||||
)
|
||||
from csaxs_bec.devices.epics.mcs_card.mcs_card_csaxs import ACQUIRING, READYTOREAD
|
||||
@@ -79,7 +78,7 @@ DEFAULT_IO_CONFIG: dict[AllChannelNames, ChannelConfig] = {
|
||||
DEFAULT_TRIGGER_SOURCE: TRIGGERSOURCE = TRIGGERSOURCE.SINGLE_SHOT
|
||||
DEFAULT_READOUT_TIMES = {"ab": 2e-4, "cd": 2e-4, "ef": 2e-4, "gh": 2e-4} # 0.2 ms 5kHz
|
||||
|
||||
DEFAULT_REFERENCES: list[tuple[AllChannelNames, CHANNELREFERENCE]] = [
|
||||
DEFAULT_REFERENCES: list[tuple[LiteralChannels, CHANNELREFERENCE]] = [
|
||||
("A", CHANNELREFERENCE.T0), # T0 + 2ms delay
|
||||
("B", CHANNELREFERENCE.A),
|
||||
("C", CHANNELREFERENCE.T0), # T0
|
||||
@@ -93,10 +92,11 @@ DEFAULT_REFERENCES: list[tuple[AllChannelNames, CHANNELREFERENCE]] = [
|
||||
|
||||
class DDG1(PSIDeviceBase, DelayGeneratorCSAXS):
|
||||
"""
|
||||
Implementation of DelayGeneratorCSAXS for the CSAXS master trigger delay generator at X12SA-CPCL-DDG1.
|
||||
It will be triggered by a soft trigger from BEC or a hardware trigger from a beamline device (e.g. the Galil stages).
|
||||
It is operated in standard mode, not burst mode and will trigger the EXT/EN of DDG2 (channel ab).
|
||||
It is responsible for opening the shutter (channel cd) and sending an extra trigger to an or gate for the MCS card (channel ef).
|
||||
Implementation of DelayGeneratorCSAXS for master trigger delay generator at X12SA-CPCL-DDG1.
|
||||
It will be triggered by a soft trigger from BEC or a hardware trigger from a beamline device
|
||||
(e.g. the Galil stages). It is operated in standard mode, not burst mode and will trigger the
|
||||
EXT/EN of DDG2 (channel ab). It is responsible for opening the shutter (channel cd) and sending
|
||||
an extra trigger to an or gate for the MCS card (channel ef).
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
@@ -114,9 +114,10 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS):
|
||||
name=name, prefix=prefix, scan_info=scan_info, device_manager=device_manager, **kwargs
|
||||
)
|
||||
self.device_manager = device_manager
|
||||
self._threadpool = ThreadPoolExecutor(max_workers=1)
|
||||
self._status_thread_event = threading.Event()
|
||||
self._status_future = None # Future for the status job
|
||||
self._poll_thread = threading.Thread(target=self._poll_event_status, daemon=True)
|
||||
self._poll_thread_run_event = threading.Event()
|
||||
self._poll_thread_kill_event = threading.Event()
|
||||
self._poll_thread.start()
|
||||
|
||||
# pylint: disable=attribute-defined-outside-init
|
||||
def on_connected(self) -> None:
|
||||
@@ -175,15 +176,52 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS):
|
||||
mcs.erase_start.put(1)
|
||||
status_acquiring.wait(timeout=10) # Allow 10 seconds in case communication is slow
|
||||
|
||||
def _reset_status_event_future(self) -> None:
|
||||
"""Reset the status future and thread event."""
|
||||
if self._status_future and not self._status_future.done():
|
||||
self._status_thread_event.set()
|
||||
self._status_future.result(timeout=10) # Allow 10 seconds for communication to finish
|
||||
def _poll_event_status(self) -> None:
|
||||
"""
|
||||
Poll the event status register in a background thread. Control
|
||||
the polling with the _poll_thread_run_event and _poll_thread_kill_event.
|
||||
"""
|
||||
while not self._poll_thread_kill_event.is_set():
|
||||
self._poll_thread_run_event.wait()
|
||||
while (
|
||||
not self._poll_thread_run_event.is_set() and self._poll_thread_kill_event.is_set()
|
||||
):
|
||||
self._poll_loop()
|
||||
|
||||
# Reset to known state
|
||||
self._status_thread_event.clear()
|
||||
def _poll_loop(self) -> None:
|
||||
"""
|
||||
Poll loop to update event status.
|
||||
The checks ensure that the loop exist after each operation and be stuck in sleep.
|
||||
"""
|
||||
self.state.proc_status.put(1, use_complete=True)
|
||||
if not self._poll_thread_run_event.is_set():
|
||||
return
|
||||
time.sleep(0.02) # 20ms delay for processing
|
||||
if not self._poll_thread_run_event.is_set():
|
||||
return
|
||||
self.state.event_status.get(use_monitor=False)
|
||||
if not self._poll_thread_run_event.is_set():
|
||||
return
|
||||
time.sleep(0.02) # 20ms delay for processing
|
||||
|
||||
def _start_polling(self) -> None:
|
||||
"""Start the polling loop in the background thread."""
|
||||
self._poll_thread_run_event.set()
|
||||
|
||||
def _stop_polling(self) -> None:
|
||||
"""Stop the polling loop in the background thread."""
|
||||
self._poll_thread_run_event.clear()
|
||||
|
||||
def _kill_poll_thread(self) -> None:
|
||||
"""Kill the polling thread."""
|
||||
self._poll_thread_kill_event.set()
|
||||
self._stop_polling()
|
||||
self._poll_thread.join(timeout=1)
|
||||
if self._poll_thread.is_alive():
|
||||
logger.warning("Polling thread did not stop gracefully.")
|
||||
else:
|
||||
self._poll_thread = None
|
||||
logger.info("Polling thread stopped.")
|
||||
|
||||
def _prepare_trigger_status_event(self, timeout: float | None = None) -> DeviceStatus:
|
||||
"""Prepare the trigger status event for the DDG1, and trigger the de"""
|
||||
@@ -196,25 +234,13 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS):
|
||||
# Callback to cancel the status if the device is stopped
|
||||
def cancel_cb(status: CompareStatus) -> None:
|
||||
"""Callback to cancel the status if the device is stopped."""
|
||||
self._status_thread_event.set()
|
||||
self._stop_polling()
|
||||
|
||||
status = StatusBitsCompareStatus(
|
||||
self.state.event_status, STATUSBITS.END_OF_BURST, timeout=timeout, run=False
|
||||
)
|
||||
status.add_callback(cancel_cb)
|
||||
self.cancel_on_stop(status)
|
||||
|
||||
# Callback to poll events, this gets executed in a separate thread by the threadpool
|
||||
def _status_job():
|
||||
"""Callback to poll event status an update the status_event signal."""
|
||||
while not self._status_thread_event.is_set():
|
||||
self.state.proc_status.put(1, use_complete=True)
|
||||
time.sleep(0.02) # 20ms delay for processing
|
||||
self.state.event_status.get(use_monitor=False)
|
||||
time.sleep(0.02) # 20ms delay for processing
|
||||
|
||||
# Submit the status job to the threadpool
|
||||
self._status_future = self._threadpool.submit(_status_job)
|
||||
return status
|
||||
|
||||
def on_trigger(self) -> DeviceStatus | StatusBase | None:
|
||||
@@ -222,8 +248,9 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS):
|
||||
If we don't then subsequent triggers may reach the DDG too early, and will be ignored. To
|
||||
avoid this, we've added the option to specify a delay via add_delay, default here is 50ms.
|
||||
"""
|
||||
# Make sure to reset the status future and thread event
|
||||
self._reset_status_event_future()
|
||||
# Stop polling, poll once manually to ensure that the register is clean
|
||||
self._stop_polling()
|
||||
self._poll_loop()
|
||||
|
||||
# Prepare the MCS card for the next software trigger
|
||||
mcs = self.device_manager.devices.get("mcs", None)
|
||||
@@ -231,8 +258,10 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS):
|
||||
logger.info("Did not find mcs card with name 'mcs' in current session")
|
||||
else:
|
||||
self._prepare_mcs_on_trigger(mcs)
|
||||
# Prepare status to wait for the end of burst
|
||||
# Prepare status with callback to cancel the polling once finished
|
||||
status = self._prepare_trigger_status_event()
|
||||
# Start polling
|
||||
self._start_polling()
|
||||
# Trigger the DDG1
|
||||
self.trigger_shot.put(1, use_complete=True)
|
||||
return status
|
||||
@@ -243,10 +272,7 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS):
|
||||
|
||||
def on_destroy(self):
|
||||
"""Clean up resources when the device is destroyed."""
|
||||
self._status_thread_event.set()
|
||||
if hasattr(self, "_threadpool") and self._threadpool is not None:
|
||||
self._threadpool.shutdown(wait=False)
|
||||
self._threadpool = None
|
||||
self._kill_poll_thread()
|
||||
super().on_destroy()
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user