refactor(ddg): cleanup and improve comments
This commit is contained in:
@@ -36,7 +36,7 @@ import time
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from bec_lib.logger import bec_logger
|
||||
from ophyd import DeviceStatus, StatusBase
|
||||
from ophyd import DeviceStatus
|
||||
from ophyd_devices import CompareStatus, TransitionStatus
|
||||
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
|
||||
|
||||
@@ -116,6 +116,7 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS):
|
||||
self.device_manager = device_manager
|
||||
self._poll_thread = threading.Thread(target=self._poll_event_status, daemon=True)
|
||||
self._poll_thread_run_event = threading.Event()
|
||||
self._poll_thread_poll_loop_done = threading.Event()
|
||||
self._poll_thread_kill_event = threading.Event()
|
||||
self._poll_thread.start()
|
||||
|
||||
@@ -134,7 +135,7 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS):
|
||||
# Set proc status to passively update with 5Hz (0.2s)
|
||||
self.state.proc_status_mode.put(PROC_EVENT_MODE.EVENT)
|
||||
|
||||
def on_stage(self) -> DeviceStatus | StatusBase | None:
|
||||
def on_stage(self) -> None:
|
||||
"""
|
||||
Stage logic for the DDG1 device, being th main trigger delay generator for CSAXS.
|
||||
For standard scans, it will be triggered by a soft trigger from BEC.
|
||||
@@ -183,26 +184,30 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS):
|
||||
"""
|
||||
while not self._poll_thread_kill_event.is_set():
|
||||
self._poll_thread_run_event.wait()
|
||||
self._poll_thread_poll_loop_done.clear()
|
||||
while (
|
||||
not self._poll_thread_run_event.is_set() and self._poll_thread_kill_event.is_set()
|
||||
self._poll_thread_run_event.is_set() and not self._poll_thread_kill_event.is_set()
|
||||
):
|
||||
self._poll_loop()
|
||||
|
||||
self._poll_thread_poll_loop_done.set()
|
||||
|
||||
def _poll_loop(self) -> None:
|
||||
"""
|
||||
Poll loop to update event status.
|
||||
The checks ensure that the loop exist after each operation and be stuck in sleep.
|
||||
The 20ms sleep was added to ensure that the event status is not polled too frequently,
|
||||
and to give the device time to process the previous command. This was found empirically
|
||||
to be necessary to avoid missing events.
|
||||
"""
|
||||
self.state.proc_status.put(1, use_complete=True)
|
||||
if not self._poll_thread_run_event.is_set():
|
||||
return
|
||||
time.sleep(0.02) # 20ms delay for processing
|
||||
if not self._poll_thread_run_event.is_set():
|
||||
time.sleep(0.02) # 20ms delay for processing, important for not missing events
|
||||
if self._poll_thread_run_event.is_set() and not self._poll_thread_kill_event.is_set():
|
||||
return
|
||||
self.state.event_status.get(use_monitor=False)
|
||||
if not self._poll_thread_run_event.is_set():
|
||||
if self._poll_thread_run_event.is_set() and not self._poll_thread_kill_event.is_set():
|
||||
return
|
||||
time.sleep(0.02) # 20ms delay for processing
|
||||
time.sleep(0.02) # 20ms delay for processing, important for not missing events
|
||||
|
||||
def _start_polling(self) -> None:
|
||||
"""Start the polling loop in the background thread."""
|
||||
@@ -220,7 +225,6 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS):
|
||||
if self._poll_thread.is_alive():
|
||||
logger.warning("Polling thread did not stop gracefully.")
|
||||
else:
|
||||
self._poll_thread = None
|
||||
logger.info("Polling thread stopped.")
|
||||
|
||||
def _prepare_trigger_status_event(self, timeout: float | None = None) -> DeviceStatus:
|
||||
@@ -236,6 +240,7 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS):
|
||||
"""Callback to cancel the status if the device is stopped."""
|
||||
self._stop_polling()
|
||||
|
||||
# Run false is important to ensure that the status is only checked on the next event status update
|
||||
status = StatusBitsCompareStatus(
|
||||
self.state.event_status, STATUSBITS.END_OF_BURST, timeout=timeout, run=False
|
||||
)
|
||||
@@ -243,14 +248,14 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS):
|
||||
self.cancel_on_stop(status)
|
||||
return status
|
||||
|
||||
def on_trigger(self) -> DeviceStatus | StatusBase | None:
|
||||
def on_trigger(self) -> DeviceStatus:
|
||||
"""Note, we need to add a delay to the StatusBits callback on the event_status.
|
||||
If we don't then subsequent triggers may reach the DDG too early, and will be ignored. To
|
||||
avoid this, we've added the option to specify a delay via add_delay, default here is 50ms.
|
||||
"""
|
||||
# Stop polling, poll once manually to ensure that the register is clean
|
||||
self._stop_polling()
|
||||
self._poll_loop()
|
||||
self._poll_thread_poll_loop_done.wait(timeout=1)
|
||||
|
||||
# Prepare the MCS card for the next software trigger
|
||||
mcs = self.device_manager.devices.get("mcs", None)
|
||||
@@ -269,11 +274,11 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS):
|
||||
def on_stop(self) -> None:
|
||||
"""Stop the delay generator by setting the burst mode to 0"""
|
||||
self.stop_ddg()
|
||||
self._stop_polling()
|
||||
|
||||
def on_destroy(self):
|
||||
def on_destroy(self) -> None:
|
||||
"""Clean up resources when the device is destroyed."""
|
||||
self._kill_poll_thread()
|
||||
super().on_destroy()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
@@ -36,6 +36,7 @@ from csaxs_bec.devices.epics.delay_generator_csaxs.delay_generator_csaxs import
|
||||
AllChannelNames,
|
||||
ChannelConfig,
|
||||
DelayGeneratorCSAXS,
|
||||
LiteralChannels,
|
||||
)
|
||||
|
||||
logger = bec_logger.logger
|
||||
@@ -57,7 +58,7 @@ DEFAULT_IO_CONFIG: dict[AllChannelNames, ChannelConfig] = {
|
||||
DEFAULT_TRIGGER_SOURCE: TRIGGERSOURCE = TRIGGERSOURCE.EXT_RISING_EDGE
|
||||
DEFAULT_READOUT_TIMES = {"ab": 2e-4, "cd": 2e-4, "ef": 2e-4, "gh": 2e-4} # 0.2 ms 5kHz
|
||||
|
||||
DEFAULT_REFERENCES: list[tuple[AllChannelNames, CHANNELREFERENCE]] = [
|
||||
DEFAULT_REFERENCES: list[tuple[LiteralChannels, CHANNELREFERENCE]] = [
|
||||
("A", CHANNELREFERENCE.T0),
|
||||
("B", CHANNELREFERENCE.A),
|
||||
("C", CHANNELREFERENCE.T0),
|
||||
@@ -100,7 +101,7 @@ class DDG2(PSIDeviceBase, DelayGeneratorCSAXS):
|
||||
frames_per_trigger = self.scan_info.msg.scan_parameters["frames_per_trigger"]
|
||||
# a = t0
|
||||
# a has reference to t0, b has reference to a
|
||||
if any(exp_time < rt for rt in DEFAULT_READOUT_TIMES.values()):
|
||||
if any(exp_time <= rt for rt in DEFAULT_READOUT_TIMES.values()):
|
||||
raise ValueError(
|
||||
f"Exposure time {exp_time} is too short for the readout times {DEFAULT_READOUT_TIMES}"
|
||||
)
|
||||
|
||||
@@ -280,6 +280,11 @@ def test_ddg2_stage(mock_ddg2):
|
||||
assert mock_ddg2.trigger_source.get() == TRIGGERSOURCE.EXT_RISING_EDGE.value
|
||||
|
||||
assert mock_ddg2.staged == ophyd.Staged.yes
|
||||
mock_ddg2.unstage() # Reset staged state for next test
|
||||
|
||||
with pytest.raises(ValueError):
|
||||
mock_ddg2.scan_info.msg.scan_parameters["exp_time"] = 2e-4 # too short exposure time
|
||||
mock_ddg2.stage()
|
||||
|
||||
|
||||
def test_ddg2_trigger(mock_ddg2):
|
||||
|
||||
Reference in New Issue
Block a user