test: Add tests for signal monitoring tool

This commit is contained in:
2026-04-07 13:28:21 +02:00
parent 83a6f5de29
commit d3695cd846
+89
View File
@@ -38,6 +38,7 @@ from ophyd_devices.utils.psi_device_base_utils import (
TaskStatus,
TransitionStatus,
)
from ophyd_devices.utils.signal_monitoring import SignalMonitoring
# pylint: disable=protected-access
# pylint: disable=redefined-outer-name
@@ -227,6 +228,94 @@ def test_utils_task_handler_shutdown(task_handler):
assert status1.exception().__class__ == TaskKilledError
@pytest.mark.timeout(10)
def test_utils_signal_monitoring_of_script():
"""Verify polling triggers repeatedly and approximately at the configured cadence."""
monitoring = SignalMonitoring(name="test_signal_monitoring")
monitoring.polling_interval = 0.05
callback_timestamps = []
reached_target_calls = threading.Event()
def monitored_callback():
callback_timestamps.append(time.perf_counter())
if len(callback_timestamps) >= 6:
reached_target_calls.set()
callback_id = monitoring.register_signal(monitored_callback)
try:
monitoring.start()
assert reached_target_calls.wait(
timeout=1.5
), "SignalMonitoring did not poll the registered callback enough times in time."
monitoring.stop()
# Use a subset of intervals to reduce startup/shutdown jitter influence.
intervals = [
callback_timestamps[idx + 1] - callback_timestamps[idx]
for idx in range(len(callback_timestamps) - 1)
]
stable_intervals = intervals[1:-1] if len(intervals) > 4 else intervals
assert len(callback_timestamps) >= 6
assert stable_intervals
mean_interval = float(np.mean(stable_intervals))
assert mean_interval == pytest.approx(monitoring.polling_interval, abs=0.03)
# Guard against pathological bursts or stalls.
assert min(stable_intervals) > 0.02
assert max(stable_intervals) < 0.15
finally:
monitoring.remove_signal(callback_id)
monitoring.shutdown()
def test_utils_signal_monitoring_of_ophyd_signal():
"""Test that SignalMonitoring can monitor an ophyd Signal and trigger callbacks on value changes."""
class MockSignalWithCounter(Signal):
_target_event = threading.Event()
_get_counter = 0
def get(self):
self._get_counter += 1
if self._get_counter == 20: # After 20 polls, trigger the event to stop the test
self._target_event.set()
return self._readback
monitoring = SignalMonitoring(name="test_signal_monitoring")
monitoring.polling_interval = 0.05 # 20 times per second
signal = MockSignalWithCounter(name="test_signal", value=0)
assert signal._get_counter == 0
assert signal._target_event.is_set() is False
monitoring.register_signal(signal=signal)
monitoring.start()
assert signal._target_event.wait(
timeout=1.5
), "SignalMonitoring did not poll the Signal enough times in time."
monitoring.stop()
signal_counter = signal._get_counter
assert signal_counter >= 20, f"Expected at least 20 polls, got {signal_counter}"
time.sleep(0.2) # Wait to ensure no more polls happen after stopping
assert np.isclose(
signal._get_counter, signal_counter, atol=1
), ( # Allow for 1 additional poll due to timing uncertainty
"SignalMonitoring continued polling after stopping"
)
monitoring.shutdown()
timer = time.time()
while monitoring._poll_thread.is_alive():
time.sleep(0.1)
if time.time() - timer > 2:
raise TimeoutError("Polling thread did not shut down within expected time.")
##########################################
######### Test PSI cusomt signals ######
##########################################