From 9f5254abe272d149d3d67fe5a313e07603ce5ab1 Mon Sep 17 00:00:00 2001 From: appel_c Date: Wed, 6 Aug 2025 11:00:46 +0200 Subject: [PATCH] refactor(ddg): cleanup and improve comments --- .../epics/delay_generator_csaxs/ddg_1.py | 33 +++++++++++-------- .../epics/delay_generator_csaxs/ddg_2.py | 5 +-- .../test_delay_generator_csaxs.py | 5 +++ 3 files changed, 27 insertions(+), 16 deletions(-) diff --git a/csaxs_bec/devices/epics/delay_generator_csaxs/ddg_1.py b/csaxs_bec/devices/epics/delay_generator_csaxs/ddg_1.py index 586c9cc..314730d 100644 --- a/csaxs_bec/devices/epics/delay_generator_csaxs/ddg_1.py +++ b/csaxs_bec/devices/epics/delay_generator_csaxs/ddg_1.py @@ -36,7 +36,7 @@ import time from typing import TYPE_CHECKING from bec_lib.logger import bec_logger -from ophyd import DeviceStatus, StatusBase +from ophyd import DeviceStatus from ophyd_devices import CompareStatus, TransitionStatus from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase @@ -116,6 +116,7 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS): self.device_manager = device_manager self._poll_thread = threading.Thread(target=self._poll_event_status, daemon=True) self._poll_thread_run_event = threading.Event() + self._poll_thread_poll_loop_done = threading.Event() self._poll_thread_kill_event = threading.Event() self._poll_thread.start() @@ -134,7 +135,7 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS): # Set proc status to passively update with 5Hz (0.2s) self.state.proc_status_mode.put(PROC_EVENT_MODE.EVENT) - def on_stage(self) -> DeviceStatus | StatusBase | None: + def on_stage(self) -> None: """ Stage logic for the DDG1 device, being th main trigger delay generator for CSAXS. For standard scans, it will be triggered by a soft trigger from BEC. @@ -183,26 +184,30 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS): """ while not self._poll_thread_kill_event.is_set(): self._poll_thread_run_event.wait() + self._poll_thread_poll_loop_done.clear() while ( - not self._poll_thread_run_event.is_set() and self._poll_thread_kill_event.is_set() + self._poll_thread_run_event.is_set() and not self._poll_thread_kill_event.is_set() ): self._poll_loop() + self._poll_thread_poll_loop_done.set() + def _poll_loop(self) -> None: """ Poll loop to update event status. The checks ensure that the loop exist after each operation and be stuck in sleep. + The 20ms sleep was added to ensure that the event status is not polled too frequently, + and to give the device time to process the previous command. This was found empirically + to be necessary to avoid missing events. """ self.state.proc_status.put(1, use_complete=True) - if not self._poll_thread_run_event.is_set(): - return - time.sleep(0.02) # 20ms delay for processing - if not self._poll_thread_run_event.is_set(): + time.sleep(0.02) # 20ms delay for processing, important for not missing events + if self._poll_thread_run_event.is_set() and not self._poll_thread_kill_event.is_set(): return self.state.event_status.get(use_monitor=False) - if not self._poll_thread_run_event.is_set(): + if self._poll_thread_run_event.is_set() and not self._poll_thread_kill_event.is_set(): return - time.sleep(0.02) # 20ms delay for processing + time.sleep(0.02) # 20ms delay for processing, important for not missing events def _start_polling(self) -> None: """Start the polling loop in the background thread.""" @@ -220,7 +225,6 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS): if self._poll_thread.is_alive(): logger.warning("Polling thread did not stop gracefully.") else: - self._poll_thread = None logger.info("Polling thread stopped.") def _prepare_trigger_status_event(self, timeout: float | None = None) -> DeviceStatus: @@ -236,6 +240,7 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS): """Callback to cancel the status if the device is stopped.""" self._stop_polling() + # Run false is important to ensure that the status is only checked on the next event status update status = StatusBitsCompareStatus( self.state.event_status, STATUSBITS.END_OF_BURST, timeout=timeout, run=False ) @@ -243,14 +248,14 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS): self.cancel_on_stop(status) return status - def on_trigger(self) -> DeviceStatus | StatusBase | None: + def on_trigger(self) -> DeviceStatus: """Note, we need to add a delay to the StatusBits callback on the event_status. If we don't then subsequent triggers may reach the DDG too early, and will be ignored. To avoid this, we've added the option to specify a delay via add_delay, default here is 50ms. """ # Stop polling, poll once manually to ensure that the register is clean self._stop_polling() - self._poll_loop() + self._poll_thread_poll_loop_done.wait(timeout=1) # Prepare the MCS card for the next software trigger mcs = self.device_manager.devices.get("mcs", None) @@ -269,11 +274,11 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS): def on_stop(self) -> None: """Stop the delay generator by setting the burst mode to 0""" self.stop_ddg() + self._stop_polling() - def on_destroy(self): + def on_destroy(self) -> None: """Clean up resources when the device is destroyed.""" self._kill_poll_thread() - super().on_destroy() if __name__ == "__main__": diff --git a/csaxs_bec/devices/epics/delay_generator_csaxs/ddg_2.py b/csaxs_bec/devices/epics/delay_generator_csaxs/ddg_2.py index 0e78b7f..4d8d0c4 100644 --- a/csaxs_bec/devices/epics/delay_generator_csaxs/ddg_2.py +++ b/csaxs_bec/devices/epics/delay_generator_csaxs/ddg_2.py @@ -36,6 +36,7 @@ from csaxs_bec.devices.epics.delay_generator_csaxs.delay_generator_csaxs import AllChannelNames, ChannelConfig, DelayGeneratorCSAXS, + LiteralChannels, ) logger = bec_logger.logger @@ -57,7 +58,7 @@ DEFAULT_IO_CONFIG: dict[AllChannelNames, ChannelConfig] = { DEFAULT_TRIGGER_SOURCE: TRIGGERSOURCE = TRIGGERSOURCE.EXT_RISING_EDGE DEFAULT_READOUT_TIMES = {"ab": 2e-4, "cd": 2e-4, "ef": 2e-4, "gh": 2e-4} # 0.2 ms 5kHz -DEFAULT_REFERENCES: list[tuple[AllChannelNames, CHANNELREFERENCE]] = [ +DEFAULT_REFERENCES: list[tuple[LiteralChannels, CHANNELREFERENCE]] = [ ("A", CHANNELREFERENCE.T0), ("B", CHANNELREFERENCE.A), ("C", CHANNELREFERENCE.T0), @@ -100,7 +101,7 @@ class DDG2(PSIDeviceBase, DelayGeneratorCSAXS): frames_per_trigger = self.scan_info.msg.scan_parameters["frames_per_trigger"] # a = t0 # a has reference to t0, b has reference to a - if any(exp_time < rt for rt in DEFAULT_READOUT_TIMES.values()): + if any(exp_time <= rt for rt in DEFAULT_READOUT_TIMES.values()): raise ValueError( f"Exposure time {exp_time} is too short for the readout times {DEFAULT_READOUT_TIMES}" ) diff --git a/tests/tests_devices/test_delay_generator_csaxs.py b/tests/tests_devices/test_delay_generator_csaxs.py index 68a1f3c..252babc 100644 --- a/tests/tests_devices/test_delay_generator_csaxs.py +++ b/tests/tests_devices/test_delay_generator_csaxs.py @@ -280,6 +280,11 @@ def test_ddg2_stage(mock_ddg2): assert mock_ddg2.trigger_source.get() == TRIGGERSOURCE.EXT_RISING_EDGE.value assert mock_ddg2.staged == ophyd.Staged.yes + mock_ddg2.unstage() # Reset staged state for next test + + with pytest.raises(ValueError): + mock_ddg2.scan_info.msg.scan_parameters["exp_time"] = 2e-4 # too short exposure time + mock_ddg2.stage() def test_ddg2_trigger(mock_ddg2):