0
0
mirror of https://github.com/bec-project/bec_widgets.git synced 2025-07-13 19:21:50 +02:00

tests(user-interaction-e2e): add module scoped e2e tests with user interaction; closes #508

This commit is contained in:
2025-05-01 17:21:53 +02:00
parent a6c479e42e
commit 75a2780fe0
10 changed files with 790 additions and 30 deletions

View File

@ -4,8 +4,7 @@ import random
import pytest
from bec_widgets.cli.client_utils import BECGuiClient, _start_plot_process
from bec_widgets.utils import BECDispatcher
from bec_widgets.cli.client_utils import BECGuiClient
# pylint: disable=unused-argument
# pylint: disable=redefined-outer-name
@ -28,7 +27,7 @@ def gui_id():
return f"figure_{random.randint(0,100)}" # make a new gui id each time, to ensure no 'gui is alive' zombie key can perturbate
@pytest.fixture
@pytest.fixture(scope="function")
def connected_client_gui_obj(qtbot, gui_id, bec_client_lib):
"""
Fixture to create a new BECGuiClient object and start a server in the background.
@ -42,22 +41,3 @@ def connected_client_gui_obj(qtbot, gui_id, bec_client_lib):
yield gui
finally:
gui.kill_server()
@pytest.fixture(scope="session")
def connected_gui_with_scope_session(qtbot, gui_id, bec_client_lib):
"""
Fixture to create a new BECGuiClient object and start a server in the background.
This fixture is scoped to the session, meaning it remains alive for all tests in the session.
We can use this fixture to create a gui object that is used across multiple tests, and
simulate a real-world scenario where the gui is not restarted for each test.
"""
gui = BECGuiClient(gui_id=gui_id)
try:
gui.start(wait=True)
# After the server started, we need to wait until the bec exists in the namespace
qtbot.waitUntil(lambda: hasattr(gui, "bec"), timeout=5000)
yield gui
finally:
gui.kill_server()

View File

@ -136,7 +136,7 @@ def test_async_plotting(qtbot, bec_client_lib, connected_client_gui_obj):
dev.waveform.sim.select_model("GaussianModel")
dev.waveform.sim.params = {"amplitude": 1000, "center": 4000, "sigma": 300}
dev.waveform.async_update.set("add").wait()
dev.waveform.waveform_shape.set(1000).wait()
dev.waveform.waveform_shape.set(10000).wait()
wf = dock.new("wf_dock").new("Waveform")
curve = wf.plot(y_name="waveform")

View File

@ -9,7 +9,7 @@ from bec_widgets.cli.rpc.rpc_base import RPCReference
def test_rpc_reference_objects(connected_client_gui_obj):
gui = connected_client_gui_obj
dock = gui.window_list[0].new("dock")
dock = gui.window_list[0].new()
plt = dock.new(name="fig", widget="Waveform")
plt.plot(x_name="samx", y_name="bpm4i")

View File

@ -0,0 +1,82 @@
"""
End-2-End test fixtures for module scoped testing. The fixtures overwrite the default versions used
for the function scoped tests. The fixtures will only be created once for this entire module, meaning
that any test can be used to test user interaction and potential leakage of threads or other resources across
different widgets.
"""
import random
import pytest
from bec_ipython_client import BECIPythonClient
from bec_lib.redis_connector import RedisConnector
from bec_lib.service_config import ServiceConfig
from bec_lib.tests.utils import wait_for_empty_queue
from pytestqt.plugin import QtBot
from bec_widgets.cli.client_utils import BECGuiClient
# pylint: disable=unused-argument
# pylint: disable=redefined-outer-name
@pytest.fixture(scope="module")
def gui_id():
"""New gui id each time, to ensure no 'gui is alive' zombie key can perturbate"""
return f"figure_{random.randint(0,100)}"
@pytest.fixture(scope="module")
def bec_ipython_client_with_demo_config(
bec_redis_fixture, bec_services_config_file_path, bec_servers
):
"""Fixture to create a BECIPythonClient with a demo config."""
config = ServiceConfig(bec_services_config_file_path)
bec = BECIPythonClient(config, RedisConnector, forced=True)
bec.start()
bec.config.load_demo_config()
try:
yield bec
finally:
bec.shutdown()
bec._client._reset_singleton()
@pytest.fixture(scope="module")
def bec_client_lib(bec_ipython_client_with_demo_config):
"""Fixture to create a BECIPythonClient with a demo config."""
bec = bec_ipython_client_with_demo_config
bec.queue.request_queue_reset()
bec.queue.request_scan_continuation()
wait_for_empty_queue(bec)
yield bec
@pytest.fixture(scope="module")
def qtbot_scope_module(qapp, request):
"""
Fixture used to create a QtBot instance for using during testing.
Make sure to call addWidget for each top-level widget you create to ensure
that they are properly closed after the test ends.
"""
result = QtBot(request)
return result
@pytest.fixture(scope="module")
def connected_client_gui_obj(qtbot_scope_module, gui_id, bec_client_lib):
"""
Fixture to create a new BECGuiClient object and start a server in the background.
This fixture is scoped to the session, meaning it remains alive for all tests in the session.
We can use this fixture to create a gui object that is used across multiple tests, and
simulate a real-world scenario where the gui is not restarted for each test.
"""
gui = BECGuiClient(gui_id=gui_id)
try:
gui.start(wait=True)
qtbot_scope_module.waitUntil(lambda: hasattr(gui, "bec"), timeout=5000)
yield gui
finally:
gui.kill_server()

View File

@ -0,0 +1,667 @@
"""
End-to-end tests single gui instance across the full session.
Each test will use the same gui instance, simulating a real-world scenario where the gui is not
restarted for each test. The interaction is tested through the rpc calls.
Note: wait_for_namespace_created is a utility method that helps to wait for the namespace to be
created in the gui. This is necessary because the rpc calls are asynchronous and the namespace
may not be created immediately after the rpc call is made.
"""
from __future__ import annotations
import random
from typing import TYPE_CHECKING, Any
import numpy as np
import pytest
from bec_widgets.cli.client import BECDockArea
from bec_widgets.cli.rpc.rpc_base import RPCBase, RPCReference
PYTEST_TIMEOUT = 50
if TYPE_CHECKING: # pragma: no cover
from bec_widgets.cli import client
from bec_widgets.cli.client_utils import BECGuiClient
# pylint: disable=redefined-outer-name
# pylint: disable=too-many-arguments
# pylint: disable=protected-access
# pylint: disable=unused-variable
def wait_for_namespace_change(
qtbot,
gui: BECGuiClient,
parent_widget: RPCBase | RPCReference,
object_name: str,
widget_gui_id: str,
timeout: float = 10000,
exists: bool = True,
):
"""
Utility method to wait for the namespace to be created in the widget.
Args:
qtbot: The qtbot fixture.
gui: The client_utils.BECGuiClient 'gui' object from the CLI.
parent_widget: The widget that creates a new widget.
object_name: The name of the widget that was created. Must appear as attribute in namespace of parent.
widget_gui_id: The gui_id of the created widget.
timeout: The timeout in milliseconds for the qtbot to wait for changes to appear.
exists: If True, wait for the object to be created. If False, wait for the object to be removed.
"""
# GUI object is not registered in the registry (yet)
if parent_widget is gui:
def check_reference_registered():
# Check server registry
obj = gui._server_registry.get(widget_gui_id, None)
if obj is None:
if not exists:
return True
return False
# CHeck Ipython registry
obj = gui._ipython_registry.get(widget_gui_id, None)
if obj is None:
if not exists:
return True
return False
else:
def check_reference_registered():
# Check server registry
obj = gui._server_registry.get(widget_gui_id, None)
if obj is None:
if not exists:
return True
return False
# CHeck Ipython registry
obj = gui._ipython_registry.get(widget_gui_id, None)
if obj is None:
if not exists:
return True
return False
# Check reference registry
ref = parent_widget._rpc_references.get(widget_gui_id, None)
if exists:
return ref is not None
return ref is None
try:
qtbot.waitUntil(check_reference_registered, timeout=timeout)
except Exception as e:
raise RuntimeError(
f"Timeout waiting for {parent_widget.object_name}.{object_name} to be created."
) from e
def create_widget(
qtbot, gui: BECGuiClient, widget_cls_name: str
) -> tuple[RPCReference, RPCReference]:
"""Utility method to create a widget and wait for the namespaces to be created."""
if hasattr(gui, "dock_area"):
dock_area: client.BECDockArea = gui.dock_area
else:
dock_area: client.BECDockArea = gui.new(name="dock_area")
wait_for_namespace_change(qtbot, gui, gui, dock_area.object_name, dock_area._gui_id)
dock: client.BECDock = dock_area.new()
wait_for_namespace_change(qtbot, gui, dock_area, dock.object_name, dock._gui_id)
widget = dock.new(widget=widget_cls_name)
wait_for_namespace_change(qtbot, gui, dock, widget.object_name, widget._gui_id)
return dock, widget
@pytest.fixture(scope="module")
def random_generator_from_seed(request):
"""Fixture to get a random seed for the following tests."""
seed = request.config.getoption("--random-order-seed").split(":")[-1]
try:
seed = int(seed)
except ValueError: # Should not be required...
seed = 42
rng = random.Random(seed)
yield rng
def maybe_remove_dock_area(qtbot, gui: BECGuiClient, random_int_gen: random.Random):
"""Utility method to remove all dock_ares from gui object, likelihood 50%."""
random_int = random_int_gen.randint(0, 100)
if random_int >= 50:
# Needed, reference gets deleted in the gui
name = gui.dock_area.object_name
gui_id = gui.dock_area._gui_id
gui.delete("dock_area")
wait_for_namespace_change(
qtbot, gui=gui, parent_widget=gui, object_name=name, widget_gui_id=gui_id, exists=False
)
@pytest.mark.timeout(PYTEST_TIMEOUT)
def test_widgets_e2e_abort_button(qtbot, connected_client_gui_obj, random_generator_from_seed):
"""Test the AbortButton widget."""
gui: BECGuiClient = connected_client_gui_obj
bec = gui._client
# Create dock_area, dock, widget
dock, widget = create_widget(qtbot, gui, gui.available_widgets.AbortButton)
dock: client.BECDock
widget: client.AbortButton
# No rpc calls to check so far
# Try detaching the dock
dock.detach()
# Test removing the widget, or leaving it open for the next test
maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
@pytest.mark.timeout(PYTEST_TIMEOUT)
def test_widgets_e2e_bec_progress_bar(qtbot, connected_client_gui_obj, random_generator_from_seed):
"""Test the BECProgressBar widget."""
gui = connected_client_gui_obj
bec = gui._client
# Create dock_area, dock, widget
dock, widget = create_widget(qtbot, gui, gui.available_widgets.BECProgressBar)
dock: client.BECDock
widget: client.BECProgressBar
# Check rpc calls
assert widget.label_template == "$value / $maximum - $percentage %"
widget.set_maximum(100)
widget.set_minimum(50)
widget.set_value(75)
assert widget._get_label() == "75 / 100 - 50 %"
# Test removing the widget, or leaving it open for the next test
maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
@pytest.mark.timeout(PYTEST_TIMEOUT)
def test_widgets_e2e_bec_queue(qtbot, connected_client_gui_obj, random_generator_from_seed):
"""Test the BECQueue widget."""
gui = connected_client_gui_obj
bec = gui._client
# Create dock_area, dock, widget
dock, widget = create_widget(qtbot, gui, gui.available_widgets.BECQueue)
dock: client.BECDock
widget: client.BECQueue
# No rpc calls to test so far
# maybe we can add an rpc call to check the queue length
# Test removing the widget, or leaving it open for the next test
maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
@pytest.mark.timeout(PYTEST_TIMEOUT)
def test_widgets_e2e_bec_status_box(qtbot, connected_client_gui_obj, random_generator_from_seed):
"""Test the BECStatusBox widget."""
gui = connected_client_gui_obj
bec = gui._client
# Create dock_area, dock, widget
dock, widget = create_widget(qtbot, gui, gui.available_widgets.BECStatusBox)
# Check rpc calls
assert widget.get_server_state() in ["RUNNING", "IDLE", "BUSY", "ERROR"]
# Test removing the widget, or leaving it open for the next test
maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
@pytest.mark.timeout(PYTEST_TIMEOUT)
def test_widgets_e2e_dap_combo_box(qtbot, connected_client_gui_obj, random_generator_from_seed):
"""Test the DAPComboBox widget."""
gui = connected_client_gui_obj
bec = gui._client
# Create dock_area, dock, widget
dock, widget = create_widget(qtbot, gui, gui.available_widgets.DapComboBox)
dock: client.BECDock
widget: client.DAPComboBox
# Check rpc calls
widget.select_fit_model("PseudoVoigtModel")
widget.select_x_axis("samx")
widget.select_y_axis("bpm4i")
# Test removing the widget, or leaving it open for the next test
maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
@pytest.mark.timeout(PYTEST_TIMEOUT)
def test_widgets_e2e_device_browser(qtbot, connected_client_gui_obj, random_generator_from_seed):
"""Test the DeviceBrowser widget."""
gui = connected_client_gui_obj
bec = gui._client
# Create dock_area, dock, widget
dock, widget = create_widget(qtbot, gui, gui.available_widgets.DeviceBrowser)
dock: client.BECDock
widget: client.DeviceBrowser
# No rpc calls yet to check
# Test removing the widget, or leaving it open for the next test
maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
@pytest.mark.timeout(PYTEST_TIMEOUT)
def test_widgets_e2e_device_combo_box(qtbot, connected_client_gui_obj, random_generator_from_seed):
"""Test the DeviceComboBox widget."""
gui = connected_client_gui_obj
bec = gui._client
# Create dock_area, dock, widget
dock, widget = create_widget(qtbot, gui, gui.available_widgets.DeviceComboBox)
dock: client.BECDock
widget: client.DeviceComboBox
# No rpc calls to check so far, maybe set_device should be exposed
# Test removing the widget, or leaving it open for the next test
maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
@pytest.mark.timeout(PYTEST_TIMEOUT)
def test_widgets_e2e_device_line_edit(qtbot, connected_client_gui_obj, random_generator_from_seed):
"""Test the DeviceLineEdit widget."""
gui = connected_client_gui_obj
bec = gui._client
# Create dock_area, dock, widget
dock, widget = create_widget(qtbot, gui, gui.available_widgets.DeviceLineEdit)
dock: client.BECDock
widget: client.DeviceLineEdit
# No rpc calls to check so far
# Should probably have a set_device method
# No rpc calls to check so far, maybe set_device should be exposed
# Test removing the widget, or leaving it open for the next test
maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
@pytest.mark.timeout(PYTEST_TIMEOUT)
def test_widgets_e2e_image(qtbot, connected_client_gui_obj, random_generator_from_seed):
"""Test the Image widget."""
gui = connected_client_gui_obj
bec = gui._client
# Create dock_area, dock, widget
dock, widget = create_widget(qtbot, gui, gui.available_widgets.Image)
dock: client.BECDock
widget: client.Image
scans = bec.scans
dev = bec.device_manager.devices
# Test rpc calls
img = widget.image(dev.eiger)
assert img.get_data() is None
# Run a scan and plot the image
s = scans.line_scan(dev.samx, -3, 3, steps=50, exp_time=0.01, relative=False)
s.wait()
def _wait_for_scan_in_history():
# Get scan item from history
scan_item = bec.history.get_by_scan_id(s.scan.scan_id)
return scan_item is not None
qtbot.waitUntil(_wait_for_scan_in_history, timeout=7000)
# Check that last image is equivalent to data in Redis
last_img = bec.device_monitor.get_data(
dev.eiger, count=1
) # Get last image from Redis monitor 2D endpoint
assert np.allclose(img.get_data(), last_img)
# Test removing the widget, or leaving it open for the next test
maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
# TODO re-enable when issue is resolved #560
# @pytest.mark.timeout(PYTEST_TIMEOUT)
# def test_widgets_e2e_log_panel(qtbot, connected_client_gui_obj, random_generator_from_seed):
# """Test the LogPanel widget."""
# gui = connected_client_gui_obj
# bec = gui._client
# # Create dock_area, dock, widget
# dock, widget = create_widget(qtbot, gui, gui.available_widgets.LogPanel)
# dock: client.BECDock
# widget: client.LogPanel
# # No rpc calls to check so far
# # Test removing the widget, or leaving it open for the next test
# maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
@pytest.mark.timeout(PYTEST_TIMEOUT)
def test_widgets_e2e_minesweeper(qtbot, connected_client_gui_obj, random_generator_from_seed):
"""Test the MineSweeper widget."""
gui = connected_client_gui_obj
bec = gui._client
# Create dock_area, dock, widget
dock, widget = create_widget(qtbot, gui, gui.available_widgets.Minesweeper)
dock: client.BECDock
widget: client.MineSweeper
# No rpc calls to check so far
# Test removing the widget, or leaving it open for the next test
maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
@pytest.mark.timeout(PYTEST_TIMEOUT)
def test_widgets_e2e_motor_map(qtbot, connected_client_gui_obj, random_generator_from_seed):
"""Test the MotorMap widget."""
gui = connected_client_gui_obj
bec = gui._client
# Create dock_area, dock, widget
dock, widget = create_widget(qtbot, gui, gui.available_widgets.MotorMap)
dock: client.BECDock
widget: client.MotorMap
# Test RPC calls
dev = bec.device_manager.devices
scans = bec.scans
# Set motor map to names
widget.map(dev.samx, dev.samy)
# Move motor samx to pos
pos = dev.samx.limits[1] - 1 # -1 from higher limit
scans.mv(dev.samx, pos, relative=False).wait()
# Check that data is up to date
assert np.isclose(widget.get_data()["x"][-1], pos, dev.samx.precision)
# Move motor samy to pos
pos = dev.samy.limits[0] + 1 # +1 from lower limit
scans.mv(dev.samy, pos, relative=False).wait()
# Check that data is up to date
assert np.isclose(widget.get_data()["y"][-1], pos, dev.samy.precision)
# Test removing the widget, or leaving it open for the next test
maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
@pytest.mark.timeout(PYTEST_TIMEOUT)
def test_widgets_e2e_multi_waveform(qtbot, connected_client_gui_obj, random_generator_from_seed):
"""Test MultiWaveform widget."""
gui = connected_client_gui_obj
bec = gui._client
# Create dock_area, dock, widget
dock, widget = create_widget(qtbot, gui, gui.available_widgets.MultiWaveform)
dock: client.BECDock
widget: client.MultiWaveform
# Test RPC calls
dev = bec.device_manager.devices
scans = bec.scans
# test plotting
cm = "cividis"
widget.plot(dev.waveform, color_palette=cm)
assert widget.monitor == dev.waveform.name
assert widget.color_palette == cm
# Scan with BEC
s = scans.line_scan(dev.samx, -3, 3, steps=5, exp_time=0.01, relative=False)
s.wait()
def _wait_for_scan_in_history():
# Get scan item from history
scan_item = bec.history.get_by_scan_id(s.scan.scan_id)
return scan_item is not None
qtbot.waitUntil(_wait_for_scan_in_history, timeout=7000)
# Wait for data in history (should be plotted?)
# TODO how can we check that the data was plotted, implement get_data()
# Test removing the widget, or leaving it open for the next test
maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
@pytest.mark.timeout(PYTEST_TIMEOUT)
def test_widgets_e2e_positioner_indicator(
qtbot, connected_client_gui_obj, random_generator_from_seed
):
"""Test the PositionIndicator widget."""
gui = connected_client_gui_obj
bec = gui._client
# Create dock_area, dock, widget
dock, widget = create_widget(qtbot, gui, gui.available_widgets.PositionIndicator)
dock: client.BECDock
widget: client.PositionIndicator
# TODO check what these rpc calls are supposed to do! Issue created #461
widget.set_value(5)
# Test removing the widget, or leaving it open for the next test
maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
@pytest.mark.timeout(PYTEST_TIMEOUT)
def test_widgets_e2e_positioner_box(qtbot, connected_client_gui_obj, random_generator_from_seed):
"""Test the PositionerBox widget."""
gui = connected_client_gui_obj
bec = gui._client
# Create dock_area, dock, widget
dock, widget = create_widget(qtbot, gui, gui.available_widgets.PositionerBox)
dock: client.BECDock
widget: client.PositionerBox
# Test rpc calls
dev = bec.device_manager.devices
scans = bec.scans
# No rpc calls to check so far
widget.set_positioner(dev.samx)
widget.set_positioner(dev.samy.name)
scans.mv(dev.samy, -3, relative=False).wait()
# Test removing the widget, or leaving it open for the next test
maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
@pytest.mark.timeout(PYTEST_TIMEOUT)
def test_widgets_e2e_positioner_box_2d(qtbot, connected_client_gui_obj, random_generator_from_seed):
"""Test the PositionerBox2D widget."""
gui = connected_client_gui_obj
bec = gui._client
# Create dock_area, dock, widget
dock, widget = create_widget(qtbot, gui, gui.available_widgets.PositionerBox2D)
dock: client.BECDock
widget: client.PositionerBox2D
# Test rpc calls
dev = bec.device_manager.devices
scans = bec.scans
# No rpc calls to check so far
widget.set_positioner_hor(dev.samx)
widget.set_positioner_ver(dev.samy)
# Try moving the motors
scans.mv(dev.samx, 3, relative=False).wait()
scans.mv(dev.samy, -3, relative=False).wait()
# Test removing the widget, or leaving it open for the next test
maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
@pytest.mark.timeout(PYTEST_TIMEOUT)
def test_widgets_e2e_positioner_control_line(
qtbot, connected_client_gui_obj, random_generator_from_seed
):
"""Test the positioner control line widget"""
gui = connected_client_gui_obj
bec = gui._client
# Create dock_area, dock, widget
dock, widget = create_widget(qtbot, gui, gui.available_widgets.PositionerControlLine)
dock: client.BECDock
widget: client.PositionerControlLine
# Test rpc calls
dev = bec.device_manager.devices
scans = bec.scans
# Set positioner
widget.set_positioner(dev.samx)
scans.mv(dev.samx, 3, relative=False).wait()
widget.set_positioner(dev.samy.name)
scans.mv(dev.samy, -3, relative=False).wait()
# Test removing the widget, or leaving it open for the next test
maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
@pytest.mark.timeout(PYTEST_TIMEOUT)
def test_widgets_e2e_ring_progress_bar(qtbot, connected_client_gui_obj, random_generator_from_seed):
"""Test the RingProgressBar widget"""
gui = connected_client_gui_obj
bec = gui._client
# Create dock_area, dock, widget
dock, widget = create_widget(qtbot, gui, gui.available_widgets.RingProgressBar)
dock: client.BECDock
widget: client.RingProgressBar
# Test rpc calls
dev = bec.device_manager.devices
scans = bec.scans
# Do a scan
scans.line_scan(dev.samx, -3, 3, steps=50, exp_time=0.01, relative=False).wait()
# Test removing the widget, or leaving it open for the next test
maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
@pytest.mark.timeout(PYTEST_TIMEOUT)
def test_widgets_e2e_scan_control(qtbot, connected_client_gui_obj, random_generator_from_seed):
"""Test the ScanControl widget"""
gui = connected_client_gui_obj
bec = gui._client
# Create dock_area, dock, widget
dock, widget = create_widget(qtbot, gui, gui.available_widgets.ScanControl)
dock: client.BECDock
widget: client.ScanControl
# No rpc calls to check so far
# Test removing the widget, or leaving it open for the next test
maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
@pytest.mark.timeout(PYTEST_TIMEOUT)
def test_widgets_e2e_scatter_waveform(qtbot, connected_client_gui_obj, random_generator_from_seed):
"""Test the ScatterWaveform widget"""
gui = connected_client_gui_obj
bec = gui._client
# Create dock_area, dock, widget
dock, widget = create_widget(qtbot, gui, gui.available_widgets.ScatterWaveform)
dock: client.BECDock
widget: client.ScatterWaveform
# Test rpc calls
dev = bec.device_manager.devices
scans = bec.scans
widget.plot(dev.samx, dev.samy, dev.bpm4i)
scans.grid_scan(dev.samx, -5, 5, 5, dev.samy, -5, 5, 5, exp_time=0.01, relative=False).wait()
# Test removing the widget, or leaving it open for the next test
maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
@pytest.mark.timeout(PYTEST_TIMEOUT)
def test_widgets_e2e_stop_button(qtbot, connected_client_gui_obj, random_generator_from_seed):
"""Test the StopButton widget"""
gui = connected_client_gui_obj
bec = gui._client
# Create dock_area, dock, widget
dock, widget = create_widget(qtbot, gui, gui.available_widgets.StopButton)
dock: client.BECDock
widget: client.StopButton
# No rpc calls to check so far
# Test removing the widget, or leaving it open for the next test
maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
@pytest.mark.timeout(PYTEST_TIMEOUT)
def test_widgets_e2e_resume_button(qtbot, connected_client_gui_obj, random_generator_from_seed):
"""Test the StopButton widget"""
gui = connected_client_gui_obj
bec = gui._client
# Create dock_area, dock, widget
dock, widget = create_widget(qtbot, gui, gui.available_widgets.ResumeButton)
dock: client.BECDock
widget: client.ResumeButton
# No rpc calls to check so far
# Test removing the widget, or leaving it open for the next test
maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
@pytest.mark.timeout(PYTEST_TIMEOUT)
def test_widgets_e2e_reset_button(qtbot, connected_client_gui_obj, random_generator_from_seed):
"""Test the StopButton widget"""
gui = connected_client_gui_obj
bec = gui._client
# Create dock_area, dock, widget
dock, widget = create_widget(qtbot, gui, gui.available_widgets.ResetButton)
dock: client.BECDock
widget: client.ResetButton
# No rpc calls to check so far
# Test removing the widget, or leaving it open for the next test
maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
@pytest.mark.timeout(PYTEST_TIMEOUT)
def test_widgets_e2e_text_box(qtbot, connected_client_gui_obj, random_generator_from_seed):
"""Test the TextBox widget"""
gui = connected_client_gui_obj
bec = gui._client
# Create dock_area, dock, widget
dock, widget = create_widget(qtbot, gui, gui.available_widgets.TextBox)
dock: client.BECDock
widget: client.TextBox
# RPC calls
widget.set_plain_text("Hello World")
widget.set_html_text("<b> Hello World HTML </b>")
# Test removing the widget, or leaving it open for the next test
maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)
@pytest.mark.timeout(PYTEST_TIMEOUT)
def test_widgets_e2e_waveform(qtbot, connected_client_gui_obj, random_generator_from_seed):
"""Test the Waveform widget"""
gui = connected_client_gui_obj
bec = gui._client
# Create dock_area, dock, widget
dock, widget = create_widget(qtbot, gui, gui.available_widgets.Waveform)
dock: client.BECDock
widget: client.Waveform
# Test rpc calls
dev = bec.device_manager.devices
scans = bec.scans
widget.plot(dev.bpm4i)
s = scans.line_scan(dev.samx, -3, 3, steps=50, exp_time=0.01, relative=False)
s.wait()
def _wait_for_scan_in_history():
# Get scan item from history
scan_item = bec.history.get_by_scan_id(s.scan.scan_id)
return scan_item is not None
qtbot.waitUntil(_wait_for_scan_in_history, timeout=7000)
scan_item = bec.history.get_by_scan_id(s.scan.scan_id)
samx_data = scan_item.devices.samx.samx.read()["value"]
bpm4i_data = scan_item.devices.bpm4i.bpm4i.read()["value"]
curve = widget.curves[0]
assert np.allclose(curve.get_data()[0], samx_data)
assert np.allclose(curve.get_data()[1], bpm4i_data)
# Test removing the widget, or leaving it open for the next test
maybe_remove_dock_area(qtbot, gui=gui, random_int_gen=random_generator_from_seed)

View File

@ -1,4 +1,6 @@
import json
from types import SimpleNamespace
from unittest import mock
from unittest.mock import MagicMock
import numpy as np
@ -19,6 +21,8 @@ from tests.unit_tests.client_mocks import (
from .conftest import create_widget
# pylint: disable=unexpected-keyword-arg
##################################################
# Waveform widget base functionality tests
##################################################
@ -541,7 +545,14 @@ def test_on_async_readback_add_update(qtbot, mocked_client):
msg = {"signals": {"async_device": {"value": [100, 200], "timestamp": [1001, 1002]}}}
metadata = {"async_update": {"max_shape": [None], "type": "add"}}
wf.on_async_readback(msg, metadata)
cb_info_ret = {"scan_id": wf.scan_id}
def ret_sender():
return SimpleNamespace(cb_info={"scan_id": wf.scan_id})
with mock.patch.object(wf, "sender", side_effect=ret_sender):
wf.on_async_readback(msg, metadata, _override_slot_params={"verify_sender": False})
x_data, y_data = c.get_data()
assert len(x_data) == 5
@ -553,7 +564,8 @@ def test_on_async_readback_add_update(qtbot, mocked_client):
# instruction='replace'
msg2 = {"signals": {"async_device": {"value": [999], "timestamp": [555]}}}
metadata2 = {"async_update": {"max_shape": [None], "type": "replace"}}
wf.on_async_readback(msg2, metadata2)
with mock.patch.object(wf, "sender", side_effect=ret_sender):
wf.on_async_readback(msg2, metadata2, _override_slot_params={"verify_sender": False})
x_data2, y_data2 = c.get_data()
np.testing.assert_array_equal(x_data2, [0])
@ -568,7 +580,8 @@ def test_on_async_readback_add_update(qtbot, mocked_client):
metadata = {
"async_update": {"max_shape": [None, waveform_shape], "index": 0, "type": "add_slice"}
}
wf.on_async_readback(msg, metadata)
with mock.patch.object(wf, "sender", side_effect=ret_sender):
wf.on_async_readback(msg, metadata, _override_slot_params={"verify_sender": False})
# Old data should be deleted since the slice_index did not match
x_data, y_data = c.get_data()
@ -595,7 +608,8 @@ def test_on_async_readback_add_update(qtbot, mocked_client):
metadata = {
"async_update": {"max_shape": [None, waveform_shape], "index": 0, "type": "add_slice"}
}
wf.on_async_readback(msg, metadata)
with mock.patch.object(wf, "sender", side_effect=ret_sender):
wf.on_async_readback(msg, metadata, _override_slot_params={"verify_sender": False})
x_data, y_data = c.get_data()
assert len(y_data) == waveform_shape
assert len(x_data) == waveform_shape
@ -616,7 +630,8 @@ def test_on_async_readback_add_update(qtbot, mocked_client):
}
}
metadata = {"async_update": {"type": "replace"}}
wf.on_async_readback(msg, metadata)
with mock.patch.object(wf, "sender", side_effect=ret_sender):
wf.on_async_readback(msg, metadata, _override_slot_params={"verify_sender": False})
x_data, y_data = c.get_data()
assert np.array_equal(y_data, np.array(range(waveform_shape)))