mirror of
https://github.com/bec-project/bec_widgets.git
synced 2026-06-30 00:29:46 +02:00
test(bl-status-browser): full e2e test
This commit is contained in:
@@ -0,0 +1,73 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import uuid
|
||||
|
||||
import pytest
|
||||
from bec_lib.bl_states import DeviceWithinLimitsStateConfig
|
||||
|
||||
from bec_widgets.widgets.services.beamline_states.beamline_state_pill import BeamlineStateManager
|
||||
|
||||
# pylint: disable=protected-access
|
||||
|
||||
|
||||
def _delete_state_if_present(bec, state_name: str) -> None:
|
||||
if hasattr(bec.beamline_states, state_name):
|
||||
bec.beamline_states.delete(state_name)
|
||||
|
||||
|
||||
@pytest.mark.timeout(100)
|
||||
def test_beamline_state_manager_adds_updates_and_deletes_state_e2e(qtbot, bec_client_lib):
|
||||
"""
|
||||
Verify the real BEC beamline-state flow is reflected by BeamlineStateManager.
|
||||
|
||||
This test requires the e2e BEC servers and is intended to be run with
|
||||
``--start-servers``.
|
||||
"""
|
||||
bec = bec_client_lib
|
||||
dev = bec.device_manager.devices
|
||||
scans = bec.scans
|
||||
state_name = f"samx_widget_limits_{uuid.uuid4().hex[:8]}"
|
||||
config = DeviceWithinLimitsStateConfig(
|
||||
name=state_name, device="samx", signal="samx", low_limit=0.0, high_limit=10.0, tolerance=1.0
|
||||
)
|
||||
manager = BeamlineStateManager(client=bec)
|
||||
qtbot.addWidget(manager)
|
||||
manager.show()
|
||||
qtbot.waitExposed(manager)
|
||||
|
||||
_delete_state_if_present(bec, state_name)
|
||||
|
||||
try:
|
||||
bec.beamline_states.add(config)
|
||||
|
||||
qtbot.waitUntil(lambda: hasattr(bec.beamline_states, state_name), timeout=10000)
|
||||
qtbot.waitUntil(lambda: state_name in manager._state_pills, timeout=10000)
|
||||
|
||||
pill = manager._state_pills[state_name]
|
||||
assert pill.state_name == state_name
|
||||
|
||||
scans.umv(dev.samx, 5, relative=False).wait()
|
||||
qtbot.waitUntil(
|
||||
lambda: getattr(bec.beamline_states, state_name).get()["status"] == "valid",
|
||||
timeout=10000,
|
||||
)
|
||||
qtbot.waitUntil(lambda: pill._status == "valid", timeout=10000)
|
||||
assert pill._status_label.text() == "VALID"
|
||||
assert pill._detail_label.text() == "Device samx within limits"
|
||||
|
||||
scans.umv(dev.samx, 20, relative=False).wait()
|
||||
qtbot.waitUntil(
|
||||
lambda: getattr(bec.beamline_states, state_name).get()["status"] == "invalid",
|
||||
timeout=10000,
|
||||
)
|
||||
qtbot.waitUntil(lambda: pill._status == "invalid", timeout=10000)
|
||||
assert pill._status_label.text() == "INVALID"
|
||||
assert pill._detail_label.text() == "Device samx out of limits"
|
||||
|
||||
bec.beamline_states.delete(state_name)
|
||||
qtbot.waitUntil(lambda: not hasattr(bec.beamline_states, state_name), timeout=10000)
|
||||
qtbot.waitUntil(lambda: state_name not in manager._state_pills, timeout=10000)
|
||||
|
||||
finally:
|
||||
_delete_state_if_present(bec, state_name)
|
||||
scans.umv(dev.samx, 0, relative=False).wait()
|
||||
Reference in New Issue
Block a user