mirror of
https://github.com/bec-project/bec_widgets.git
synced 2025-07-13 19:21:50 +02:00
test(e2e): e2e tests adjusted for new plotting framework
This commit is contained in:
@ -1332,6 +1332,14 @@ class ImageItem(RPCBase):
|
||||
Get or set whether the image is transposed.
|
||||
"""
|
||||
|
||||
@rpc_call
|
||||
def get_data(self) -> "np.ndarray":
|
||||
"""
|
||||
Get the data of the image.
|
||||
Returns:
|
||||
np.ndarray: The data of the image.
|
||||
"""
|
||||
|
||||
|
||||
class LMFitDialog(RPCBase):
|
||||
"""Dialog for displaying the fit summary and params for LMFit DAP processes"""
|
||||
@ -3431,6 +3439,35 @@ class Waveform(RPCBase):
|
||||
dict[str, dict]: DAP summary of all DAP curves.
|
||||
"""
|
||||
|
||||
@rpc_call
|
||||
def get_all_data(self, output: "Literal['dict', 'pandas']" = "dict") -> "dict":
|
||||
"""
|
||||
Extract all curve data into a dictionary or a pandas DataFrame.
|
||||
|
||||
Args:
|
||||
output (Literal["dict", "pandas"]): Format of the output data.
|
||||
|
||||
Returns:
|
||||
dict | pd.DataFrame: Data of all curves in the specified format.
|
||||
"""
|
||||
|
||||
@rpc_call
|
||||
def get_curve(self, curve: "int | str") -> "Curve | None":
|
||||
"""
|
||||
Get a curve from the plot widget.
|
||||
|
||||
Args:
|
||||
curve(int|str): The curve to get. It Can be the order of the curve or the name of the curve.
|
||||
|
||||
Return(Curve|None): The curve object if found, None otherwise.
|
||||
"""
|
||||
|
||||
@rpc_call
|
||||
def select_roi(self, region: "tuple[float, float]"):
|
||||
"""
|
||||
Public method if you want the old `select_roi` style.
|
||||
"""
|
||||
|
||||
|
||||
class WebsiteWidget(RPCBase):
|
||||
"""A simple widget to display a website"""
|
||||
|
@ -65,6 +65,7 @@ class ImageItem(BECConnector, pg.ImageItem):
|
||||
"rotation.setter",
|
||||
"transpose",
|
||||
"transpose.setter",
|
||||
"get_data",
|
||||
]
|
||||
|
||||
vRangeChangedManually = Signal(tuple)
|
||||
@ -251,7 +252,14 @@ class ImageItem(BECConnector, pg.ImageItem):
|
||||
self._process_image()
|
||||
|
||||
################################################################################
|
||||
# Data Update Logic
|
||||
# Export
|
||||
def get_data(self) -> np.ndarray:
|
||||
"""
|
||||
Get the data of the image.
|
||||
Returns:
|
||||
np.ndarray: The data of the image.
|
||||
"""
|
||||
return self.image
|
||||
|
||||
def clear(self):
|
||||
super().clear()
|
||||
|
@ -96,6 +96,9 @@ class Waveform(PlotBase):
|
||||
"update_with_scan_history",
|
||||
"get_dap_params",
|
||||
"get_dap_summary",
|
||||
"get_all_data",
|
||||
"get_curve",
|
||||
"select_roi",
|
||||
]
|
||||
|
||||
sync_signal_update = Signal()
|
||||
|
@ -4,7 +4,6 @@ import random
|
||||
|
||||
import pytest
|
||||
|
||||
from bec_widgets.cli.client_utils import BECGuiClient
|
||||
from bec_widgets.cli.client_utils import BECGuiClient, _start_plot_process
|
||||
from bec_widgets.utils import BECDispatcher
|
||||
|
||||
@ -25,28 +24,8 @@ def threads_check_fixture(threads_check):
|
||||
|
||||
@pytest.fixture
|
||||
def gui_id():
|
||||
return f"figure_{random.randint(0,100)}" # make a new gui id each time, to ensure no 'gui is alive' zombie key can perturbate
|
||||
|
||||
|
||||
@contextmanager
|
||||
def plot_server(gui_id, klass, client_lib):
|
||||
dispatcher = BECDispatcher(client=client_lib) # Has to init singleton with fixture client
|
||||
process, _ = _start_plot_process(
|
||||
gui_id, klass, gui_class_id="bec", config=client_lib._client._service_config.config_path
|
||||
)
|
||||
try:
|
||||
while client_lib._client.connector.get(MessageEndpoints.gui_heartbeat(gui_id)) is None:
|
||||
time.sleep(0.3)
|
||||
yield gui_id
|
||||
finally:
|
||||
process.terminate()
|
||||
process.wait()
|
||||
dispatcher.disconnect_all()
|
||||
dispatcher.reset_singleton()
|
||||
|
||||
|
||||
"""New gui id each time, to ensure no 'gui is alive' zombie key can perturbate"""
|
||||
return f"figure_{random.randint(0,100)}"
|
||||
return f"figure_{random.randint(0,100)}" # make a new gui id each time, to ensure no 'gui is alive' zombie key can perturbate
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
|
@ -1,13 +1,7 @@
|
||||
import time
|
||||
|
||||
import numpy as np
|
||||
import pytest
|
||||
from bec_lib.endpoints import MessageEndpoints
|
||||
|
||||
from bec_widgets.cli.client import BECFigure, BECImageShow, BECMotorMap, BECWaveform
|
||||
from bec_widgets.cli import Image, MotorMap, Waveform
|
||||
from bec_widgets.cli.rpc.rpc_base import RPCReference
|
||||
from bec_widgets.tests.utils import check_remote_data_size
|
||||
from bec_widgets.utils import Colors
|
||||
|
||||
# pylint: disable=unused-argument
|
||||
# pylint: disable=redefined-outer-name
|
||||
@ -34,7 +28,7 @@ def test_gui_rpc_registry(qtbot, connected_client_gui_obj):
|
||||
assert hasattr(gui.cool_dock_area, "dock_0")
|
||||
|
||||
|
||||
def test_rpc_add_dock_with_figure_e2e(qtbot, bec_client_lib, connected_client_gui_obj):
|
||||
def test_rpc_add_dock_with_plots_e2e(qtbot, bec_client_lib, connected_client_gui_obj):
|
||||
|
||||
gui = connected_client_gui_obj
|
||||
# BEC client shortcuts
|
||||
@ -62,16 +56,13 @@ def test_rpc_add_dock_with_figure_e2e(qtbot, bec_client_lib, connected_client_gu
|
||||
assert hasattr(gui.bec, "dock_0")
|
||||
|
||||
# Add 3 figures with some widgets
|
||||
fig0 = d0.new("BECFigure")
|
||||
fig1 = d1.new("BECFigure")
|
||||
fig2 = d2.new("BECFigure")
|
||||
wf = d0.new("Waveform")
|
||||
im = d1.new("Image")
|
||||
mm = d2.new("MotorMap")
|
||||
|
||||
def check_figs_registered():
|
||||
return all(
|
||||
[
|
||||
gui_id in gui._server_registry
|
||||
for gui_id in [fig0._gui_id, fig1._gui_id, fig2._gui_id]
|
||||
]
|
||||
[gui_id in gui._server_registry for gui_id in [wf._gui_id, im._gui_id, mm._gui_id]]
|
||||
)
|
||||
|
||||
qtbot.waitUntil(check_figs_registered, timeout=5000)
|
||||
@ -80,23 +71,24 @@ def test_rpc_add_dock_with_figure_e2e(qtbot, bec_client_lib, connected_client_gu
|
||||
assert len(d1.element_list) == 1
|
||||
assert len(d2.element_list) == 1
|
||||
|
||||
assert fig1.__class__.__name__ == "RPCReference"
|
||||
assert fig1.__class__ == RPCReference
|
||||
assert gui._ipython_registry[fig1._gui_id].__class__ == BECFigure
|
||||
assert fig2.__class__.__name__ == "RPCReference"
|
||||
assert fig2.__class__ == RPCReference
|
||||
assert gui._ipython_registry[fig2._gui_id].__class__ == BECFigure
|
||||
|
||||
mm = fig0.motor_map("samx", "samy")
|
||||
plt = fig1.plot(x_name="samx", y_name="bpm4i")
|
||||
im = fig2.image("eiger")
|
||||
|
||||
assert mm.__class__.__name__ == "RPCReference"
|
||||
assert mm.__class__ == RPCReference
|
||||
assert plt.__class__.__name__ == "RPCReference"
|
||||
assert plt.__class__ == RPCReference
|
||||
assert wf.__class__.__name__ == "RPCReference"
|
||||
assert wf.__class__ == RPCReference
|
||||
assert gui._ipython_registry[wf._gui_id].__class__ == Waveform
|
||||
assert im.__class__.__name__ == "RPCReference"
|
||||
assert im.__class__ == RPCReference
|
||||
assert gui._ipython_registry[im._gui_id].__class__ == Image
|
||||
assert mm.__class__.__name__ == "RPCReference"
|
||||
assert mm.__class__ == RPCReference
|
||||
assert gui._ipython_registry[mm._gui_id].__class__ == MotorMap
|
||||
|
||||
mm.map("samx", "samy")
|
||||
curve = wf.plot(x_name="samx", y_name="bpm4i")
|
||||
im_item = im.image("eiger")
|
||||
|
||||
assert curve.__class__.__name__ == "RPCReference"
|
||||
assert curve.__class__ == RPCReference
|
||||
assert im_item.__class__.__name__ == "RPCReference"
|
||||
assert im_item.__class__ == RPCReference
|
||||
|
||||
|
||||
def test_dock_manipulations_e2e(qtbot, connected_client_gui_obj):
|
||||
|
208
tests/end-2-end/test_plotting_framework_e2e.py
Normal file
208
tests/end-2-end/test_plotting_framework_e2e.py
Normal file
@ -0,0 +1,208 @@
|
||||
import time
|
||||
|
||||
import numpy as np
|
||||
import pytest
|
||||
from bec_lib.endpoints import MessageEndpoints
|
||||
|
||||
from bec_widgets.cli.client import Image, MotorMap, MultiWaveform, ScatterWaveform, Waveform
|
||||
from bec_widgets.cli.rpc.rpc_base import RPCReference
|
||||
from bec_widgets.tests.utils import check_remote_data_size
|
||||
|
||||
|
||||
def test_rpc_waveform1d_custom_curve(qtbot, connected_client_gui_obj):
|
||||
gui = connected_client_gui_obj
|
||||
dock = gui.bec
|
||||
|
||||
wf = dock.new("wf_dock").new("Waveform")
|
||||
|
||||
c1 = wf.plot(x=[1, 2, 3], y=[1, 2, 3])
|
||||
c1.set_color("red")
|
||||
assert c1._config_dict["color"] == "red"
|
||||
c1.set_color("blue")
|
||||
assert c1._config_dict["color"] == "blue"
|
||||
|
||||
assert len(wf.curves) == 1
|
||||
|
||||
|
||||
def test_rpc_plotting_shortcuts_init_configs(qtbot, connected_client_gui_obj):
|
||||
gui = connected_client_gui_obj
|
||||
dock = gui.bec
|
||||
|
||||
wf = dock.new("wf_dock").new("Waveform")
|
||||
im = dock.new("im_dock").new("Image")
|
||||
mm = dock.new("mm_dock").new("MotorMap")
|
||||
sw = dock.new("sw_dock").new("ScatterWaveform")
|
||||
mw = dock.new("mw_dock").new("MultiWaveform")
|
||||
|
||||
c1 = wf.plot(x_name="samx", y_name="bpm4i")
|
||||
im_item = im.image(monitor="eiger")
|
||||
mm.map(x_name="samx", y_name="samy")
|
||||
sw.plot(x_name="samx", y_name="samy", z_name="bpm4i")
|
||||
mw.plot(monitor="waveform")
|
||||
|
||||
# Checking if classes are correctly initialised
|
||||
assert len(dock.panel_list) == 5
|
||||
assert wf.__class__.__name__ == "RPCReference"
|
||||
assert wf.__class__ == RPCReference
|
||||
assert gui._ipython_registry[wf._gui_id].__class__ == Waveform
|
||||
assert im.__class__.__name__ == "RPCReference"
|
||||
assert im.__class__ == RPCReference
|
||||
assert gui._ipython_registry[im._gui_id].__class__ == Image
|
||||
assert mm.__class__.__name__ == "RPCReference"
|
||||
assert mm.__class__ == RPCReference
|
||||
assert gui._ipython_registry[mm._gui_id].__class__ == MotorMap
|
||||
assert sw.__class__.__name__ == "RPCReference"
|
||||
assert sw.__class__ == RPCReference
|
||||
assert gui._ipython_registry[sw._gui_id].__class__ == ScatterWaveform
|
||||
assert mw.__class__.__name__ == "RPCReference"
|
||||
assert mw.__class__ == RPCReference
|
||||
assert gui._ipython_registry[mw._gui_id].__class__ == MultiWaveform
|
||||
|
||||
# check if the correct devices are set
|
||||
# Curve
|
||||
assert c1._config["signal"] == {
|
||||
"dap": None,
|
||||
"name": "bpm4i",
|
||||
"entry": "bpm4i",
|
||||
"dap_oversample": 1,
|
||||
}
|
||||
assert c1._config["source"] == "device"
|
||||
assert c1._config["label"] == "bpm4i-bpm4i"
|
||||
|
||||
# Image Item
|
||||
assert im_item._config["monitor"] == "eiger"
|
||||
assert im_item._config["source"] == "auto"
|
||||
|
||||
|
||||
def test_rpc_waveform_scan(qtbot, bec_client_lib, connected_client_gui_obj):
|
||||
gui = connected_client_gui_obj
|
||||
dock = gui.bec
|
||||
|
||||
client = bec_client_lib
|
||||
dev = client.device_manager.devices
|
||||
scans = client.scans
|
||||
queue = client.queue
|
||||
|
||||
wf = dock.new("wf_dock").new("Waveform")
|
||||
|
||||
# add 3 different curves to track
|
||||
wf.plot(x_name="samx", y_name="bpm4i")
|
||||
wf.plot(x_name="samx", y_name="bpm3a")
|
||||
wf.plot(x_name="samx", y_name="bpm4d")
|
||||
|
||||
status = scans.line_scan(dev.samx, -5, 5, steps=10, exp_time=0.05, relative=False)
|
||||
status.wait()
|
||||
|
||||
item = queue.scan_storage.storage[-1]
|
||||
last_scan_data = item.live_data if hasattr(item, "live_data") else item.data
|
||||
|
||||
num_elements = 10
|
||||
|
||||
for plot_name in ["bpm4i-bpm4i", "bpm3a-bpm3a", "bpm4d-bpm4d"]:
|
||||
qtbot.waitUntil(lambda: check_remote_data_size(wf, plot_name, num_elements))
|
||||
|
||||
# get data from curves
|
||||
plt_data = wf.get_all_data()
|
||||
|
||||
# check plotted data
|
||||
assert plt_data["bpm4i-bpm4i"]["x"] == last_scan_data["samx"]["samx"].val
|
||||
assert plt_data["bpm4i-bpm4i"]["y"] == last_scan_data["bpm4i"]["bpm4i"].val
|
||||
assert plt_data["bpm3a-bpm3a"]["x"] == last_scan_data["samx"]["samx"].val
|
||||
assert plt_data["bpm3a-bpm3a"]["y"] == last_scan_data["bpm3a"]["bpm3a"].val
|
||||
assert plt_data["bpm4d-bpm4d"]["x"] == last_scan_data["samx"]["samx"].val
|
||||
assert plt_data["bpm4d-bpm4d"]["y"] == last_scan_data["bpm4d"]["bpm4d"].val
|
||||
|
||||
|
||||
def test_rpc_image(qtbot, bec_client_lib, connected_client_gui_obj):
|
||||
gui = connected_client_gui_obj
|
||||
dock = gui.bec
|
||||
|
||||
client = bec_client_lib
|
||||
dev = client.device_manager.devices
|
||||
scans = client.scans
|
||||
queue = client.queue
|
||||
|
||||
im = dock.new("im_dock").new("Image")
|
||||
im.image(monitor="eiger")
|
||||
|
||||
status = scans.line_scan(dev.samx, -5, 5, steps=10, exp_time=0.05, relative=False)
|
||||
status.wait()
|
||||
|
||||
last_image_device = client.connector.get_last(MessageEndpoints.device_monitor_2d("eiger"))[
|
||||
"data"
|
||||
].data
|
||||
last_image_plot = im.main_image.get_data()
|
||||
|
||||
# check plotted data
|
||||
np.testing.assert_equal(last_image_device, last_image_plot)
|
||||
|
||||
|
||||
def test_rpc_motor_map(qtbot, bec_client_lib, connected_client_gui_obj):
|
||||
gui = connected_client_gui_obj
|
||||
client = bec_client_lib
|
||||
dev = client.device_manager.devices
|
||||
scans = client.scans
|
||||
|
||||
dock = gui.bec
|
||||
motor_map = dock.new("mm_dock").new("MotorMap")
|
||||
motor_map.map(x_name="samx", y_name="samy")
|
||||
|
||||
initial_pos_x = dev.samx.read()["samx"]["value"]
|
||||
initial_pos_y = dev.samy.read()["samy"]["value"]
|
||||
|
||||
status = scans.mv(dev.samx, 1, dev.samy, 2, relative=True)
|
||||
status.wait()
|
||||
|
||||
final_pos_x = dev.samx.read()["samx"]["value"]
|
||||
final_pos_y = dev.samy.read()["samy"]["value"]
|
||||
|
||||
# check plotted data
|
||||
motor_map_data = motor_map.get_data()
|
||||
|
||||
np.testing.assert_equal(
|
||||
[motor_map_data["x"][0], motor_map_data["y"][0]], [initial_pos_x, initial_pos_y]
|
||||
)
|
||||
np.testing.assert_equal(
|
||||
[motor_map_data["x"][-1], motor_map_data["y"][-1]], [final_pos_x, final_pos_y]
|
||||
)
|
||||
|
||||
|
||||
def test_dap_rpc(qtbot, bec_client_lib, connected_client_gui_obj):
|
||||
gui = connected_client_gui_obj
|
||||
client = bec_client_lib
|
||||
dev = client.device_manager.devices
|
||||
scans = client.scans
|
||||
|
||||
dock = gui.bec
|
||||
wf = dock.new("wf_dock").new("Waveform")
|
||||
wf.plot(x_name="samx", y_name="bpm4i", dap="GaussianModel")
|
||||
|
||||
dev.bpm4i.sim.select_model("GaussianModel")
|
||||
params = dev.bpm4i.sim.params
|
||||
params.update(
|
||||
{"noise": "uniform", "noise_multiplier": 10, "center": 5, "sigma": 1, "amplitude": 200}
|
||||
)
|
||||
dev.bpm4i.sim.params = params
|
||||
time.sleep(1)
|
||||
|
||||
res = scans.line_scan(dev.samx, 0, 8, steps=50, relative=False)
|
||||
res.wait()
|
||||
|
||||
# especially on slow machines, the fit might not be done yet
|
||||
# so we wait until the fit reaches the expected value
|
||||
def wait_for_fit():
|
||||
dap_curve = wf.get_curve("bpm4i-bpm4i-GaussianModel")
|
||||
fit_params = dap_curve.dap_params
|
||||
if fit_params is None:
|
||||
return False
|
||||
print(fit_params)
|
||||
return np.isclose(fit_params["center"], 5, atol=0.5)
|
||||
|
||||
qtbot.waitUntil(wait_for_fit, timeout=10000)
|
||||
|
||||
# Repeat fit after adding a region of interest
|
||||
wf.select_roi(region=(3, 7))
|
||||
res = scans.line_scan(dev.samx, 0, 8, steps=50, relative=False)
|
||||
res.wait()
|
||||
|
||||
qtbot.waitUntil(wait_for_fit, timeout=10000)
|
Reference in New Issue
Block a user