diff --git a/tests/end-2-end/test_beamline_state_manager_e2e.py b/tests/end-2-end/test_beamline_state_manager_e2e.py new file mode 100644 index 00000000..e35408e5 --- /dev/null +++ b/tests/end-2-end/test_beamline_state_manager_e2e.py @@ -0,0 +1,73 @@ +from __future__ import annotations + +import uuid + +import pytest +from bec_lib.bl_states import DeviceWithinLimitsStateConfig + +from bec_widgets.widgets.services.beamline_states.beamline_state_pill import BeamlineStateManager + +# pylint: disable=protected-access + + +def _delete_state_if_present(bec, state_name: str) -> None: + if hasattr(bec.beamline_states, state_name): + bec.beamline_states.delete(state_name) + + +@pytest.mark.timeout(100) +def test_beamline_state_manager_adds_updates_and_deletes_state_e2e(qtbot, bec_client_lib): + """ + Verify the real BEC beamline-state flow is reflected by BeamlineStateManager. + + This test requires the e2e BEC servers and is intended to be run with + ``--start-servers``. + """ + bec = bec_client_lib + dev = bec.device_manager.devices + scans = bec.scans + state_name = f"samx_widget_limits_{uuid.uuid4().hex[:8]}" + config = DeviceWithinLimitsStateConfig( + name=state_name, device="samx", signal="samx", low_limit=0.0, high_limit=10.0, tolerance=1.0 + ) + manager = BeamlineStateManager(client=bec) + qtbot.addWidget(manager) + manager.show() + qtbot.waitExposed(manager) + + _delete_state_if_present(bec, state_name) + + try: + bec.beamline_states.add(config) + + qtbot.waitUntil(lambda: hasattr(bec.beamline_states, state_name), timeout=10000) + qtbot.waitUntil(lambda: state_name in manager._state_pills, timeout=10000) + + pill = manager._state_pills[state_name] + assert pill.state_name == state_name + + scans.umv(dev.samx, 5, relative=False).wait() + qtbot.waitUntil( + lambda: getattr(bec.beamline_states, state_name).get()["status"] == "valid", + timeout=10000, + ) + qtbot.waitUntil(lambda: pill._status == "valid", timeout=10000) + assert pill._status_label.text() == "VALID" + assert pill._detail_label.text() == "Device samx within limits" + + scans.umv(dev.samx, 20, relative=False).wait() + qtbot.waitUntil( + lambda: getattr(bec.beamline_states, state_name).get()["status"] == "invalid", + timeout=10000, + ) + qtbot.waitUntil(lambda: pill._status == "invalid", timeout=10000) + assert pill._status_label.text() == "INVALID" + assert pill._detail_label.text() == "Device samx out of limits" + + bec.beamline_states.delete(state_name) + qtbot.waitUntil(lambda: not hasattr(bec.beamline_states, state_name), timeout=10000) + qtbot.waitUntil(lambda: state_name not in manager._state_pills, timeout=10000) + + finally: + _delete_state_if_present(bec, state_name) + scans.umv(dev.samx, 0, relative=False).wait()