diff --git a/csaxs_bec/devices/epics/delay_generator_csaxs/ddg_1.py b/csaxs_bec/devices/epics/delay_generator_csaxs/ddg_1.py index f12a0b7..586c9cc 100644 --- a/csaxs_bec/devices/epics/delay_generator_csaxs/ddg_1.py +++ b/csaxs_bec/devices/epics/delay_generator_csaxs/ddg_1.py @@ -33,12 +33,10 @@ from __future__ import annotations import threading import time -from concurrent.futures import Future, ThreadPoolExecutor from typing import TYPE_CHECKING from bec_lib.logger import bec_logger -from ophyd import Component as Cpt -from ophyd import DeviceStatus, Signal, StatusBase +from ophyd import DeviceStatus, StatusBase from ophyd_devices import CompareStatus, TransitionStatus from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase @@ -51,6 +49,7 @@ from csaxs_bec.devices.epics.delay_generator_csaxs.delay_generator_csaxs import AllChannelNames, ChannelConfig, DelayGeneratorCSAXS, + LiteralChannels, StatusBitsCompareStatus, ) from csaxs_bec.devices.epics.mcs_card.mcs_card_csaxs import ACQUIRING, READYTOREAD @@ -79,7 +78,7 @@ DEFAULT_IO_CONFIG: dict[AllChannelNames, ChannelConfig] = { DEFAULT_TRIGGER_SOURCE: TRIGGERSOURCE = TRIGGERSOURCE.SINGLE_SHOT DEFAULT_READOUT_TIMES = {"ab": 2e-4, "cd": 2e-4, "ef": 2e-4, "gh": 2e-4} # 0.2 ms 5kHz -DEFAULT_REFERENCES: list[tuple[AllChannelNames, CHANNELREFERENCE]] = [ +DEFAULT_REFERENCES: list[tuple[LiteralChannels, CHANNELREFERENCE]] = [ ("A", CHANNELREFERENCE.T0), # T0 + 2ms delay ("B", CHANNELREFERENCE.A), ("C", CHANNELREFERENCE.T0), # T0 @@ -93,10 +92,11 @@ DEFAULT_REFERENCES: list[tuple[AllChannelNames, CHANNELREFERENCE]] = [ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS): """ - Implementation of DelayGeneratorCSAXS for the CSAXS master trigger delay generator at X12SA-CPCL-DDG1. - It will be triggered by a soft trigger from BEC or a hardware trigger from a beamline device (e.g. the Galil stages). - It is operated in standard mode, not burst mode and will trigger the EXT/EN of DDG2 (channel ab). - It is responsible for opening the shutter (channel cd) and sending an extra trigger to an or gate for the MCS card (channel ef). + Implementation of DelayGeneratorCSAXS for master trigger delay generator at X12SA-CPCL-DDG1. + It will be triggered by a soft trigger from BEC or a hardware trigger from a beamline device + (e.g. the Galil stages). It is operated in standard mode, not burst mode and will trigger the + EXT/EN of DDG2 (channel ab). It is responsible for opening the shutter (channel cd) and sending + an extra trigger to an or gate for the MCS card (channel ef). """ def __init__( @@ -114,9 +114,10 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS): name=name, prefix=prefix, scan_info=scan_info, device_manager=device_manager, **kwargs ) self.device_manager = device_manager - self._threadpool = ThreadPoolExecutor(max_workers=1) - self._status_thread_event = threading.Event() - self._status_future = None # Future for the status job + self._poll_thread = threading.Thread(target=self._poll_event_status, daemon=True) + self._poll_thread_run_event = threading.Event() + self._poll_thread_kill_event = threading.Event() + self._poll_thread.start() # pylint: disable=attribute-defined-outside-init def on_connected(self) -> None: @@ -175,15 +176,52 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS): mcs.erase_start.put(1) status_acquiring.wait(timeout=10) # Allow 10 seconds in case communication is slow - def _reset_status_event_future(self) -> None: - """Reset the status future and thread event.""" - if self._status_future and not self._status_future.done(): - self._status_thread_event.set() - self._status_future.result(timeout=10) # Allow 10 seconds for communication to finish + def _poll_event_status(self) -> None: + """ + Poll the event status register in a background thread. Control + the polling with the _poll_thread_run_event and _poll_thread_kill_event. + """ + while not self._poll_thread_kill_event.is_set(): + self._poll_thread_run_event.wait() + while ( + not self._poll_thread_run_event.is_set() and self._poll_thread_kill_event.is_set() + ): + self._poll_loop() - # Reset to known state - self._status_thread_event.clear() + def _poll_loop(self) -> None: + """ + Poll loop to update event status. + The checks ensure that the loop exist after each operation and be stuck in sleep. + """ self.state.proc_status.put(1, use_complete=True) + if not self._poll_thread_run_event.is_set(): + return + time.sleep(0.02) # 20ms delay for processing + if not self._poll_thread_run_event.is_set(): + return + self.state.event_status.get(use_monitor=False) + if not self._poll_thread_run_event.is_set(): + return + time.sleep(0.02) # 20ms delay for processing + + def _start_polling(self) -> None: + """Start the polling loop in the background thread.""" + self._poll_thread_run_event.set() + + def _stop_polling(self) -> None: + """Stop the polling loop in the background thread.""" + self._poll_thread_run_event.clear() + + def _kill_poll_thread(self) -> None: + """Kill the polling thread.""" + self._poll_thread_kill_event.set() + self._stop_polling() + self._poll_thread.join(timeout=1) + if self._poll_thread.is_alive(): + logger.warning("Polling thread did not stop gracefully.") + else: + self._poll_thread = None + logger.info("Polling thread stopped.") def _prepare_trigger_status_event(self, timeout: float | None = None) -> DeviceStatus: """Prepare the trigger status event for the DDG1, and trigger the de""" @@ -196,25 +234,13 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS): # Callback to cancel the status if the device is stopped def cancel_cb(status: CompareStatus) -> None: """Callback to cancel the status if the device is stopped.""" - self._status_thread_event.set() + self._stop_polling() status = StatusBitsCompareStatus( self.state.event_status, STATUSBITS.END_OF_BURST, timeout=timeout, run=False ) status.add_callback(cancel_cb) self.cancel_on_stop(status) - - # Callback to poll events, this gets executed in a separate thread by the threadpool - def _status_job(): - """Callback to poll event status an update the status_event signal.""" - while not self._status_thread_event.is_set(): - self.state.proc_status.put(1, use_complete=True) - time.sleep(0.02) # 20ms delay for processing - self.state.event_status.get(use_monitor=False) - time.sleep(0.02) # 20ms delay for processing - - # Submit the status job to the threadpool - self._status_future = self._threadpool.submit(_status_job) return status def on_trigger(self) -> DeviceStatus | StatusBase | None: @@ -222,8 +248,9 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS): If we don't then subsequent triggers may reach the DDG too early, and will be ignored. To avoid this, we've added the option to specify a delay via add_delay, default here is 50ms. """ - # Make sure to reset the status future and thread event - self._reset_status_event_future() + # Stop polling, poll once manually to ensure that the register is clean + self._stop_polling() + self._poll_loop() # Prepare the MCS card for the next software trigger mcs = self.device_manager.devices.get("mcs", None) @@ -231,8 +258,10 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS): logger.info("Did not find mcs card with name 'mcs' in current session") else: self._prepare_mcs_on_trigger(mcs) - # Prepare status to wait for the end of burst + # Prepare status with callback to cancel the polling once finished status = self._prepare_trigger_status_event() + # Start polling + self._start_polling() # Trigger the DDG1 self.trigger_shot.put(1, use_complete=True) return status @@ -243,10 +272,7 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS): def on_destroy(self): """Clean up resources when the device is destroyed.""" - self._status_thread_event.set() - if hasattr(self, "_threadpool") and self._threadpool is not None: - self._threadpool.shutdown(wait=False) - self._threadpool = None + self._kill_poll_thread() super().on_destroy()