diff --git a/tests/test_utils.py b/tests/test_utils.py index f0023ea..7da000c 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -38,6 +38,7 @@ from ophyd_devices.utils.psi_device_base_utils import ( TaskStatus, TransitionStatus, ) +from ophyd_devices.utils.signal_monitoring import SignalMonitoring # pylint: disable=protected-access # pylint: disable=redefined-outer-name @@ -227,6 +228,94 @@ def test_utils_task_handler_shutdown(task_handler): assert status1.exception().__class__ == TaskKilledError +@pytest.mark.timeout(10) +def test_utils_signal_monitoring_of_script(): + """Verify polling triggers repeatedly and approximately at the configured cadence.""" + monitoring = SignalMonitoring(name="test_signal_monitoring") + monitoring.polling_interval = 0.05 + + callback_timestamps = [] + reached_target_calls = threading.Event() + + def monitored_callback(): + callback_timestamps.append(time.perf_counter()) + if len(callback_timestamps) >= 6: + reached_target_calls.set() + + callback_id = monitoring.register_signal(monitored_callback) + + try: + monitoring.start() + assert reached_target_calls.wait( + timeout=1.5 + ), "SignalMonitoring did not poll the registered callback enough times in time." + monitoring.stop() + + # Use a subset of intervals to reduce startup/shutdown jitter influence. + intervals = [ + callback_timestamps[idx + 1] - callback_timestamps[idx] + for idx in range(len(callback_timestamps) - 1) + ] + stable_intervals = intervals[1:-1] if len(intervals) > 4 else intervals + + assert len(callback_timestamps) >= 6 + assert stable_intervals + + mean_interval = float(np.mean(stable_intervals)) + assert mean_interval == pytest.approx(monitoring.polling_interval, abs=0.03) + + # Guard against pathological bursts or stalls. + assert min(stable_intervals) > 0.02 + assert max(stable_intervals) < 0.15 + finally: + monitoring.remove_signal(callback_id) + monitoring.shutdown() + + +def test_utils_signal_monitoring_of_ophyd_signal(): + """Test that SignalMonitoring can monitor an ophyd Signal and trigger callbacks on value changes.""" + + class MockSignalWithCounter(Signal): + + _target_event = threading.Event() + _get_counter = 0 + + def get(self): + self._get_counter += 1 + if self._get_counter == 20: # After 20 polls, trigger the event to stop the test + self._target_event.set() + return self._readback + + monitoring = SignalMonitoring(name="test_signal_monitoring") + monitoring.polling_interval = 0.05 # 20 times per second + + signal = MockSignalWithCounter(name="test_signal", value=0) + assert signal._get_counter == 0 + assert signal._target_event.is_set() is False + + monitoring.register_signal(signal=signal) + monitoring.start() + assert signal._target_event.wait( + timeout=1.5 + ), "SignalMonitoring did not poll the Signal enough times in time." + monitoring.stop() + signal_counter = signal._get_counter + assert signal_counter >= 20, f"Expected at least 20 polls, got {signal_counter}" + time.sleep(0.2) # Wait to ensure no more polls happen after stopping + assert np.isclose( + signal._get_counter, signal_counter, atol=1 + ), ( # Allow for 1 additional poll due to timing uncertainty + "SignalMonitoring continued polling after stopping" + ) + + monitoring.shutdown() + timer = time.time() + while monitoring._poll_thread.is_alive(): + time.sleep(0.1) + if time.time() - timer > 2: + raise TimeoutError("Polling thread did not shut down within expected time.") + + ########################################## ######### Test PSI cusomt signals ###### ##########################################