0
0
mirror of https://github.com/bec-project/bec_widgets.git synced 2025-07-13 11:11:49 +02:00

tests(e2e): wait for the plotting to finish before checking the data

This commit is contained in:
2025-01-09 11:15:43 +01:00
parent 54e64c9f10
commit b4a240e463
3 changed files with 39 additions and 10 deletions

View File

@ -224,3 +224,11 @@ DEVICES = [
Positioner("test", limits=[-10, 10], read_value=2.0),
Device("test_device"),
]
def check_remote_data_size(widget, plot_name, num_elements):
"""
Check if the remote data has the correct number of elements.
Used in the qtbot.waitUntil function.
"""
return len(widget.get_all_data()[plot_name]["x"]) == num_elements

View File

@ -4,7 +4,8 @@ import numpy as np
import pytest
from bec_lib.endpoints import MessageEndpoints
from bec_widgets.cli.client import BECDockArea, BECFigure, BECImageShow, BECMotorMap, BECWaveform
from bec_widgets.cli.client import BECFigure, BECImageShow, BECMotorMap, BECWaveform
from bec_widgets.tests.utils import check_remote_data_size
from bec_widgets.utils import Colors
# pylint: disable=unused-argument
@ -12,7 +13,7 @@ from bec_widgets.utils import Colors
# pylint: disable=too-many-locals
def test_rpc_add_dock_with_figure_e2e(bec_client_lib, connected_client_dock):
def test_rpc_add_dock_with_figure_e2e(qtbot, bec_client_lib, connected_client_dock):
# BEC client shortcuts
dock = connected_client_dock
client = bec_client_lib
@ -88,14 +89,17 @@ def test_rpc_add_dock_with_figure_e2e(bec_client_lib, connected_client_dock):
# Try to make a scan
status = scans.line_scan(dev.samx, -5, 5, steps=10, exp_time=0.05, relative=False)
# wait for scan to finish
while not status.status == "COMPLETED":
time.sleep(0.2)
status.wait()
# plot
item = queue.scan_storage.storage[-1]
plt_last_scan_data = item.live_data if hasattr(item, "live_data") else item.data
num_elements = len(plt_last_scan_data["samx"]["samx"].val)
plot_name = "bpm4i-bpm4i"
qtbot.waitUntil(lambda: check_remote_data_size(plt, plot_name, num_elements))
plt_data = plt.get_all_data()
assert plt_data["bpm4i-bpm4i"]["x"] == plt_last_scan_data["samx"]["samx"].val
assert plt_data["bpm4i-bpm4i"]["y"] == plt_last_scan_data["bpm4i"]["bpm4i"].val
@ -255,11 +259,17 @@ def test_auto_update(bec_client_lib, connected_client_dock_w_auto_updates, qtbot
# get data from curves
widgets = plt.widget_list
qtbot.waitUntil(lambda: len(plt.widget_list) > 0, timeout=5000)
plt_data = widgets[0].get_all_data()
item = queue.scan_storage.storage[-1]
last_scan_data = item.live_data if hasattr(item, "live_data") else item.data
num_elements = len(last_scan_data["samx"]["samx"].val)
plot_name = f"Scan {status.scan.scan_number} - {dock.selected_device}"
qtbot.waitUntil(lambda: check_remote_data_size(widgets[0], plot_name, num_elements))
plt_data = widgets[0].get_all_data()
# check plotted data
assert (
plt_data[f"Scan {status.scan.scan_number} - bpm4i"]["x"]
@ -277,12 +287,18 @@ def test_auto_update(bec_client_lib, connected_client_dock_w_auto_updates, qtbot
plt = auto_updates.get_default_figure()
widgets = plt.widget_list
qtbot.waitUntil(lambda: len(plt.widget_list) > 0, timeout=5000)
plt_data = widgets[0].get_all_data()
item = queue.scan_storage.storage[-1]
last_scan_data = item.live_data if hasattr(item, "live_data") else item.data
plot_name = f"Scan {status.scan.scan_number} - bpm4i"
num_elements_bec = len(last_scan_data["samx"]["samx"].val)
qtbot.waitUntil(lambda: check_remote_data_size(widgets[0], plot_name, num_elements_bec))
plt_data = widgets[0].get_all_data()
# check plotted data
assert (
plt_data[f"Scan {status.scan.scan_number} - {dock.selected_device}"]["x"]

View File

@ -1,10 +1,10 @@
import time
import numpy as np
import pytest
from bec_lib.endpoints import MessageEndpoints
from bec_widgets.cli.client import BECFigure, BECImageShow, BECMotorMap, BECWaveform
from bec_widgets.tests.utils import check_remote_data_size
def test_rpc_waveform1d_custom_curve(connected_client_figure):
@ -78,7 +78,7 @@ def test_rpc_plotting_shortcuts_init_configs(connected_client_figure, qtbot):
}
def test_rpc_waveform_scan(connected_client_figure, bec_client_lib):
def test_rpc_waveform_scan(qtbot, connected_client_figure, bec_client_lib):
fig = BECFigure(connected_client_figure)
# add 3 different curves to track
@ -97,6 +97,11 @@ def test_rpc_waveform_scan(connected_client_figure, bec_client_lib):
item = queue.scan_storage.storage[-1]
last_scan_data = item.live_data if hasattr(item, "live_data") else item.data
num_elements = len(last_scan_data["samx"]["samx"].val)
for plot_name in ["bpm4i-bpm4i", "bpm3a-bpm3a", "bpm4d-bpm4d"]:
qtbot.waitUntil(lambda: check_remote_data_size(plt, plot_name, num_elements))
# get data from curves
plt_data = plt.get_all_data()