0
0
mirror of https://github.com/bec-project/bec_widgets.git synced 2025-07-13 19:21:50 +02:00

test(e2e): e2e tests adjusted for new plotting framework

This commit is contained in:
2025-03-23 13:20:05 +01:00
parent 6ade934356
commit 378398a29b
6 changed files with 280 additions and 53 deletions

View File

@ -1332,6 +1332,14 @@ class ImageItem(RPCBase):
Get or set whether the image is transposed.
"""
@rpc_call
def get_data(self) -> "np.ndarray":
"""
Get the data of the image.
Returns:
np.ndarray: The data of the image.
"""
class LMFitDialog(RPCBase):
"""Dialog for displaying the fit summary and params for LMFit DAP processes"""
@ -3431,6 +3439,35 @@ class Waveform(RPCBase):
dict[str, dict]: DAP summary of all DAP curves.
"""
@rpc_call
def get_all_data(self, output: "Literal['dict', 'pandas']" = "dict") -> "dict":
"""
Extract all curve data into a dictionary or a pandas DataFrame.
Args:
output (Literal["dict", "pandas"]): Format of the output data.
Returns:
dict | pd.DataFrame: Data of all curves in the specified format.
"""
@rpc_call
def get_curve(self, curve: "int | str") -> "Curve | None":
"""
Get a curve from the plot widget.
Args:
curve(int|str): The curve to get. It Can be the order of the curve or the name of the curve.
Return(Curve|None): The curve object if found, None otherwise.
"""
@rpc_call
def select_roi(self, region: "tuple[float, float]"):
"""
Public method if you want the old `select_roi` style.
"""
class WebsiteWidget(RPCBase):
"""A simple widget to display a website"""

View File

@ -65,6 +65,7 @@ class ImageItem(BECConnector, pg.ImageItem):
"rotation.setter",
"transpose",
"transpose.setter",
"get_data",
]
vRangeChangedManually = Signal(tuple)
@ -251,7 +252,14 @@ class ImageItem(BECConnector, pg.ImageItem):
self._process_image()
################################################################################
# Data Update Logic
# Export
def get_data(self) -> np.ndarray:
"""
Get the data of the image.
Returns:
np.ndarray: The data of the image.
"""
return self.image
def clear(self):
super().clear()

View File

@ -96,6 +96,9 @@ class Waveform(PlotBase):
"update_with_scan_history",
"get_dap_params",
"get_dap_summary",
"get_all_data",
"get_curve",
"select_roi",
]
sync_signal_update = Signal()

View File

@ -4,7 +4,6 @@ import random
import pytest
from bec_widgets.cli.client_utils import BECGuiClient
from bec_widgets.cli.client_utils import BECGuiClient, _start_plot_process
from bec_widgets.utils import BECDispatcher
@ -25,28 +24,8 @@ def threads_check_fixture(threads_check):
@pytest.fixture
def gui_id():
return f"figure_{random.randint(0,100)}" # make a new gui id each time, to ensure no 'gui is alive' zombie key can perturbate
@contextmanager
def plot_server(gui_id, klass, client_lib):
dispatcher = BECDispatcher(client=client_lib) # Has to init singleton with fixture client
process, _ = _start_plot_process(
gui_id, klass, gui_class_id="bec", config=client_lib._client._service_config.config_path
)
try:
while client_lib._client.connector.get(MessageEndpoints.gui_heartbeat(gui_id)) is None:
time.sleep(0.3)
yield gui_id
finally:
process.terminate()
process.wait()
dispatcher.disconnect_all()
dispatcher.reset_singleton()
"""New gui id each time, to ensure no 'gui is alive' zombie key can perturbate"""
return f"figure_{random.randint(0,100)}"
return f"figure_{random.randint(0,100)}" # make a new gui id each time, to ensure no 'gui is alive' zombie key can perturbate
@pytest.fixture

View File

@ -1,13 +1,7 @@
import time
import numpy as np
import pytest
from bec_lib.endpoints import MessageEndpoints
from bec_widgets.cli.client import BECFigure, BECImageShow, BECMotorMap, BECWaveform
from bec_widgets.cli import Image, MotorMap, Waveform
from bec_widgets.cli.rpc.rpc_base import RPCReference
from bec_widgets.tests.utils import check_remote_data_size
from bec_widgets.utils import Colors
# pylint: disable=unused-argument
# pylint: disable=redefined-outer-name
@ -34,7 +28,7 @@ def test_gui_rpc_registry(qtbot, connected_client_gui_obj):
assert hasattr(gui.cool_dock_area, "dock_0")
def test_rpc_add_dock_with_figure_e2e(qtbot, bec_client_lib, connected_client_gui_obj):
def test_rpc_add_dock_with_plots_e2e(qtbot, bec_client_lib, connected_client_gui_obj):
gui = connected_client_gui_obj
# BEC client shortcuts
@ -62,16 +56,13 @@ def test_rpc_add_dock_with_figure_e2e(qtbot, bec_client_lib, connected_client_gu
assert hasattr(gui.bec, "dock_0")
# Add 3 figures with some widgets
fig0 = d0.new("BECFigure")
fig1 = d1.new("BECFigure")
fig2 = d2.new("BECFigure")
wf = d0.new("Waveform")
im = d1.new("Image")
mm = d2.new("MotorMap")
def check_figs_registered():
return all(
[
gui_id in gui._server_registry
for gui_id in [fig0._gui_id, fig1._gui_id, fig2._gui_id]
]
[gui_id in gui._server_registry for gui_id in [wf._gui_id, im._gui_id, mm._gui_id]]
)
qtbot.waitUntil(check_figs_registered, timeout=5000)
@ -80,23 +71,24 @@ def test_rpc_add_dock_with_figure_e2e(qtbot, bec_client_lib, connected_client_gu
assert len(d1.element_list) == 1
assert len(d2.element_list) == 1
assert fig1.__class__.__name__ == "RPCReference"
assert fig1.__class__ == RPCReference
assert gui._ipython_registry[fig1._gui_id].__class__ == BECFigure
assert fig2.__class__.__name__ == "RPCReference"
assert fig2.__class__ == RPCReference
assert gui._ipython_registry[fig2._gui_id].__class__ == BECFigure
mm = fig0.motor_map("samx", "samy")
plt = fig1.plot(x_name="samx", y_name="bpm4i")
im = fig2.image("eiger")
assert mm.__class__.__name__ == "RPCReference"
assert mm.__class__ == RPCReference
assert plt.__class__.__name__ == "RPCReference"
assert plt.__class__ == RPCReference
assert wf.__class__.__name__ == "RPCReference"
assert wf.__class__ == RPCReference
assert gui._ipython_registry[wf._gui_id].__class__ == Waveform
assert im.__class__.__name__ == "RPCReference"
assert im.__class__ == RPCReference
assert gui._ipython_registry[im._gui_id].__class__ == Image
assert mm.__class__.__name__ == "RPCReference"
assert mm.__class__ == RPCReference
assert gui._ipython_registry[mm._gui_id].__class__ == MotorMap
mm.map("samx", "samy")
curve = wf.plot(x_name="samx", y_name="bpm4i")
im_item = im.image("eiger")
assert curve.__class__.__name__ == "RPCReference"
assert curve.__class__ == RPCReference
assert im_item.__class__.__name__ == "RPCReference"
assert im_item.__class__ == RPCReference
def test_dock_manipulations_e2e(qtbot, connected_client_gui_obj):

View File

@ -0,0 +1,208 @@
import time
import numpy as np
import pytest
from bec_lib.endpoints import MessageEndpoints
from bec_widgets.cli.client import Image, MotorMap, MultiWaveform, ScatterWaveform, Waveform
from bec_widgets.cli.rpc.rpc_base import RPCReference
from bec_widgets.tests.utils import check_remote_data_size
def test_rpc_waveform1d_custom_curve(qtbot, connected_client_gui_obj):
gui = connected_client_gui_obj
dock = gui.bec
wf = dock.new("wf_dock").new("Waveform")
c1 = wf.plot(x=[1, 2, 3], y=[1, 2, 3])
c1.set_color("red")
assert c1._config_dict["color"] == "red"
c1.set_color("blue")
assert c1._config_dict["color"] == "blue"
assert len(wf.curves) == 1
def test_rpc_plotting_shortcuts_init_configs(qtbot, connected_client_gui_obj):
gui = connected_client_gui_obj
dock = gui.bec
wf = dock.new("wf_dock").new("Waveform")
im = dock.new("im_dock").new("Image")
mm = dock.new("mm_dock").new("MotorMap")
sw = dock.new("sw_dock").new("ScatterWaveform")
mw = dock.new("mw_dock").new("MultiWaveform")
c1 = wf.plot(x_name="samx", y_name="bpm4i")
im_item = im.image(monitor="eiger")
mm.map(x_name="samx", y_name="samy")
sw.plot(x_name="samx", y_name="samy", z_name="bpm4i")
mw.plot(monitor="waveform")
# Checking if classes are correctly initialised
assert len(dock.panel_list) == 5
assert wf.__class__.__name__ == "RPCReference"
assert wf.__class__ == RPCReference
assert gui._ipython_registry[wf._gui_id].__class__ == Waveform
assert im.__class__.__name__ == "RPCReference"
assert im.__class__ == RPCReference
assert gui._ipython_registry[im._gui_id].__class__ == Image
assert mm.__class__.__name__ == "RPCReference"
assert mm.__class__ == RPCReference
assert gui._ipython_registry[mm._gui_id].__class__ == MotorMap
assert sw.__class__.__name__ == "RPCReference"
assert sw.__class__ == RPCReference
assert gui._ipython_registry[sw._gui_id].__class__ == ScatterWaveform
assert mw.__class__.__name__ == "RPCReference"
assert mw.__class__ == RPCReference
assert gui._ipython_registry[mw._gui_id].__class__ == MultiWaveform
# check if the correct devices are set
# Curve
assert c1._config["signal"] == {
"dap": None,
"name": "bpm4i",
"entry": "bpm4i",
"dap_oversample": 1,
}
assert c1._config["source"] == "device"
assert c1._config["label"] == "bpm4i-bpm4i"
# Image Item
assert im_item._config["monitor"] == "eiger"
assert im_item._config["source"] == "auto"
def test_rpc_waveform_scan(qtbot, bec_client_lib, connected_client_gui_obj):
gui = connected_client_gui_obj
dock = gui.bec
client = bec_client_lib
dev = client.device_manager.devices
scans = client.scans
queue = client.queue
wf = dock.new("wf_dock").new("Waveform")
# add 3 different curves to track
wf.plot(x_name="samx", y_name="bpm4i")
wf.plot(x_name="samx", y_name="bpm3a")
wf.plot(x_name="samx", y_name="bpm4d")
status = scans.line_scan(dev.samx, -5, 5, steps=10, exp_time=0.05, relative=False)
status.wait()
item = queue.scan_storage.storage[-1]
last_scan_data = item.live_data if hasattr(item, "live_data") else item.data
num_elements = 10
for plot_name in ["bpm4i-bpm4i", "bpm3a-bpm3a", "bpm4d-bpm4d"]:
qtbot.waitUntil(lambda: check_remote_data_size(wf, plot_name, num_elements))
# get data from curves
plt_data = wf.get_all_data()
# check plotted data
assert plt_data["bpm4i-bpm4i"]["x"] == last_scan_data["samx"]["samx"].val
assert plt_data["bpm4i-bpm4i"]["y"] == last_scan_data["bpm4i"]["bpm4i"].val
assert plt_data["bpm3a-bpm3a"]["x"] == last_scan_data["samx"]["samx"].val
assert plt_data["bpm3a-bpm3a"]["y"] == last_scan_data["bpm3a"]["bpm3a"].val
assert plt_data["bpm4d-bpm4d"]["x"] == last_scan_data["samx"]["samx"].val
assert plt_data["bpm4d-bpm4d"]["y"] == last_scan_data["bpm4d"]["bpm4d"].val
def test_rpc_image(qtbot, bec_client_lib, connected_client_gui_obj):
gui = connected_client_gui_obj
dock = gui.bec
client = bec_client_lib
dev = client.device_manager.devices
scans = client.scans
queue = client.queue
im = dock.new("im_dock").new("Image")
im.image(monitor="eiger")
status = scans.line_scan(dev.samx, -5, 5, steps=10, exp_time=0.05, relative=False)
status.wait()
last_image_device = client.connector.get_last(MessageEndpoints.device_monitor_2d("eiger"))[
"data"
].data
last_image_plot = im.main_image.get_data()
# check plotted data
np.testing.assert_equal(last_image_device, last_image_plot)
def test_rpc_motor_map(qtbot, bec_client_lib, connected_client_gui_obj):
gui = connected_client_gui_obj
client = bec_client_lib
dev = client.device_manager.devices
scans = client.scans
dock = gui.bec
motor_map = dock.new("mm_dock").new("MotorMap")
motor_map.map(x_name="samx", y_name="samy")
initial_pos_x = dev.samx.read()["samx"]["value"]
initial_pos_y = dev.samy.read()["samy"]["value"]
status = scans.mv(dev.samx, 1, dev.samy, 2, relative=True)
status.wait()
final_pos_x = dev.samx.read()["samx"]["value"]
final_pos_y = dev.samy.read()["samy"]["value"]
# check plotted data
motor_map_data = motor_map.get_data()
np.testing.assert_equal(
[motor_map_data["x"][0], motor_map_data["y"][0]], [initial_pos_x, initial_pos_y]
)
np.testing.assert_equal(
[motor_map_data["x"][-1], motor_map_data["y"][-1]], [final_pos_x, final_pos_y]
)
def test_dap_rpc(qtbot, bec_client_lib, connected_client_gui_obj):
gui = connected_client_gui_obj
client = bec_client_lib
dev = client.device_manager.devices
scans = client.scans
dock = gui.bec
wf = dock.new("wf_dock").new("Waveform")
wf.plot(x_name="samx", y_name="bpm4i", dap="GaussianModel")
dev.bpm4i.sim.select_model("GaussianModel")
params = dev.bpm4i.sim.params
params.update(
{"noise": "uniform", "noise_multiplier": 10, "center": 5, "sigma": 1, "amplitude": 200}
)
dev.bpm4i.sim.params = params
time.sleep(1)
res = scans.line_scan(dev.samx, 0, 8, steps=50, relative=False)
res.wait()
# especially on slow machines, the fit might not be done yet
# so we wait until the fit reaches the expected value
def wait_for_fit():
dap_curve = wf.get_curve("bpm4i-bpm4i-GaussianModel")
fit_params = dap_curve.dap_params
if fit_params is None:
return False
print(fit_params)
return np.isclose(fit_params["center"], 5, atol=0.5)
qtbot.waitUntil(wait_for_fit, timeout=10000)
# Repeat fit after adding a region of interest
wf.select_roi(region=(3, 7))
res = scans.line_scan(dev.samx, 0, 8, steps=50, relative=False)
res.wait()
qtbot.waitUntil(wait_for_fit, timeout=10000)