0
0
mirror of https://github.com/bec-project/bec_widgets.git synced 2025-07-14 03:31:50 +02:00

feat: add rpc broadcast

This commit is contained in:
2025-03-13 10:06:34 +01:00
committed by wyzula-jan
parent 6282210698
commit 1c322cc20a
14 changed files with 780 additions and 635 deletions

View File

@ -1,81 +1,59 @@
"""This module contains fixtures that are used in the end-2-end tests."""
import random
import time
from contextlib import contextmanager
import pytest
from bec_lib.endpoints import MessageEndpoints
from bec_widgets.cli.client_utils import BECGuiClient, _start_plot_process
from bec_widgets.utils import BECDispatcher
from bec_widgets.widgets.containers.figure import BECFigure
from bec_widgets.cli.client_utils import BECGuiClient
# pylint: disable=unused-argument
# pylint: disable=redefined-outer-name
# make threads check in autouse, **will be executed at the end**; better than
# having it in fixtures for each test, since it prevents from needing to
# 'manually' shutdown bec_client_lib (for example) to make it happy, whereas
# whereas in fact bec_client_lib makes its on cleanup
@pytest.fixture(autouse=True)
def threads_check_fixture(threads_check):
"""
Fixture to check if threads are still alive at the end of the test.
This should always run to avoid leaked threads within our application.
The fixture is set to autouse, meaning it will run for every test.
"""
return
@pytest.fixture
def gui_id():
return f"figure_{random.randint(0,100)}" # make a new gui id each time, to ensure no 'gui is alive' zombie key can perturbate
@contextmanager
def plot_server(gui_id, klass, client_lib):
dispatcher = BECDispatcher(client=client_lib) # Has to init singleton with fixture client
process, _ = _start_plot_process(
gui_id, klass, gui_class_id="bec", config=client_lib._client._service_config.config_path
)
try:
while client_lib._client.connector.get(MessageEndpoints.gui_heartbeat(gui_id)) is None:
time.sleep(0.3)
yield gui_id
finally:
process.terminate()
process.wait()
dispatcher.disconnect_all()
dispatcher.reset_singleton()
@pytest.fixture
def connected_client_figure(gui_id, bec_client_lib):
with plot_server(gui_id, BECFigure, bec_client_lib) as server:
yield server
"""New gui id each time, to ensure no 'gui is alive' zombie key can perturbate"""
return f"figure_{random.randint(0,100)}"
@pytest.fixture
def connected_client_gui_obj(gui_id, bec_client_lib):
"""
Fixture to create a new BECGuiClient object and start a server in the background.
This fixture should be used if a new gui instance is needed for each test.
"""
gui = BECGuiClient(gui_id=gui_id)
try:
gui.start(wait=True)
# gui._start_server(wait=True)
yield gui
finally:
gui.kill_server()
@pytest.fixture
def connected_client_dock(gui_id, bec_client_lib):
@pytest.fixture(scope="session")
def connected_gui_with_scope_session(gui_id, bec_client_lib):
"""
Fixture to create a new BECGuiClient object and start a server in the background.
This fixture is scoped to the session, meaning it remains alive for all tests in the session.
We can use this fixture to create a gui object that is used across multiple tests, and
simulate a real-world scenario where the gui is not restarted for each test.
"""
gui = BECGuiClient(gui_id=gui_id)
gui._auto_updates_enabled = False
try:
gui.start(wait=True)
gui.window_list[0]
yield gui.window_list[0]
finally:
gui.kill_server()
@pytest.fixture
def connected_client_dock_w_auto_updates(gui_id, bec_client_lib):
gui = BECGuiClient(gui_id=gui_id)
try:
gui._start_server(wait=True)
yield gui, gui.window_list[0]
yield gui
finally:
gui.kill_server()

View File

@ -1,378 +1,284 @@
# import time
# import numpy as np
# import pytest
# from bec_lib.endpoints import MessageEndpoints
# from bec_widgets.cli.client import BECFigure, BECImageShow, BECMotorMap, BECWaveform
# from bec_widgets.tests.utils import check_remote_data_size
# from bec_widgets.utils import Colors
# # pylint: disable=unused-argument
# # pylint: disable=redefined-outer-name
# # pylint: disable=too-many-locals
# def test_rpc_add_dock_with_figure_e2e(qtbot, bec_client_lib, connected_client_dock):
# # BEC client shortcuts
# dock = connected_client_dock
# client = bec_client_lib
# dev = client.device_manager.devices
# scans = client.scans
# queue = client.queue
# # Create 3 docks
# d0 = dock.add_dock("dock_0")
# d1 = dock.add_dock("dock_1")
# d2 = dock.add_dock("dock_2")
# dock_config = dock._config_dict
# assert len(dock_config["docks"]) == 3
# # Add 3 figures with some widgets
# fig0 = d0.add_widget("BECFigure")
# fig1 = d1.add_widget("BECFigure")
# fig2 = d2.add_widget("BECFigure")
# dock_config = dock._config_dict
# assert len(dock_config["docks"]) == 3
# assert len(dock_config["docks"]["dock_0"]["widgets"]) == 1
# assert len(dock_config["docks"]["dock_1"]["widgets"]) == 1
# assert len(dock_config["docks"]["dock_2"]["widgets"]) == 1
# assert fig1.__class__.__name__ == "BECFigure"
# assert fig1.__class__ == BECFigure
# assert fig2.__class__.__name__ == "BECFigure"
# assert fig2.__class__ == BECFigure
# mm = fig0.motor_map("samx", "samy")
# plt = fig1.plot(x_name="samx", y_name="bpm4i")
# im = fig2.image("eiger")
# assert mm.__class__.__name__ == "BECMotorMap"
# assert mm.__class__ == BECMotorMap
# assert plt.__class__.__name__ == "BECWaveform"
# assert plt.__class__ == BECWaveform
# assert im.__class__.__name__ == "BECImageShow"
# assert im.__class__ == BECImageShow
# assert mm._config_dict["signals"] == {
# "dap": None,
# "source": "device_readback",
# "x": {
# "name": "samx",
# "entry": "samx",
# "unit": None,
# "modifier": None,
# "limits": [-50.0, 50.0],
# },
# "y": {
# "name": "samy",
# "entry": "samy",
# "unit": None,
# "modifier": None,
# "limits": [-50.0, 50.0],
# },
# "z": None,
# }
# assert plt._config_dict["curves"]["bpm4i-bpm4i"]["signals"] == {
# "dap": None,
# "source": "scan_segment",
# "x": {"name": "samx", "entry": "samx", "unit": None, "modifier": None, "limits": None},
# "y": {"name": "bpm4i", "entry": "bpm4i", "unit": None, "modifier": None, "limits": None},
# "z": None,
# }
# assert im._config_dict["images"]["eiger"]["monitor"] == "eiger"
# # check initial position of motor map
# initial_pos_x = dev.samx.read()["samx"]["value"]
# initial_pos_y = dev.samy.read()["samy"]["value"]
# # Try to make a scan
# status = scans.line_scan(dev.samx, -5, 5, steps=10, exp_time=0.05, relative=False)
# status.wait()
# # plot
# item = queue.scan_storage.storage[-1]
# plt_last_scan_data = item.live_data if hasattr(item, "live_data") else item.data
# num_elements = 10
# plot_name = "bpm4i-bpm4i"
# qtbot.waitUntil(lambda: check_remote_data_size(plt, plot_name, num_elements))
# plt_data = plt.get_all_data()
# assert plt_data["bpm4i-bpm4i"]["x"] == plt_last_scan_data["samx"]["samx"].val
# assert plt_data["bpm4i-bpm4i"]["y"] == plt_last_scan_data["bpm4i"]["bpm4i"].val
# # image
# last_image_device = client.connector.get_last(MessageEndpoints.device_monitor_2d("eiger"))[
# "data"
# ].data
# time.sleep(0.5)
# last_image_plot = im.images[0].get_data()
# np.testing.assert_equal(last_image_device, last_image_plot)
# # motor map
# final_pos_x = dev.samx.read()["samx"]["value"]
# final_pos_y = dev.samy.read()["samy"]["value"]
# # check final coordinates of motor map
# motor_map_data = mm.get_data()
# np.testing.assert_equal(
# [motor_map_data["x"][0], motor_map_data["y"][0]], [initial_pos_x, initial_pos_y]
# )
# np.testing.assert_equal(
# [motor_map_data["x"][-1], motor_map_data["y"][-1]], [final_pos_x, final_pos_y]
# )
# def test_dock_manipulations_e2e(connected_client_dock):
# dock = connected_client_dock
# d0 = dock.add_dock("dock_0")
# d1 = dock.add_dock("dock_1")
# d2 = dock.add_dock("dock_2")
# dock_config = dock._config_dict
# assert len(dock_config["docks"]) == 3
# d0.detach()
# dock.detach_dock("dock_2")
# dock_config = dock._config_dict
# assert len(dock_config["docks"]) == 3
# assert len(dock.temp_areas) == 2
# d0.attach()
# dock_config = dock._config_dict
# assert len(dock_config["docks"]) == 3
# assert len(dock.temp_areas) == 1
# d2.remove()
# dock_config = dock._config_dict
# assert ["dock_0", "dock_1"] == list(dock_config["docks"])
# dock.clear_all()
# dock_config = dock._config_dict
# assert len(dock_config["docks"]) == 0
# assert len(dock.temp_areas) == 0
# def test_ring_bar(connected_client_dock):
# dock = connected_client_dock
# d0 = dock.add_dock(name="dock_0")
# bar = d0.add_widget("RingProgressBar")
# assert bar.__class__.__name__ == "RingProgressBar"
import time
import numpy as np
import pytest
from bec_lib.endpoints import MessageEndpoints
from bec_widgets.cli.client import BECFigure, BECImageShow, BECMotorMap, BECWaveform
from bec_widgets.cli.rpc.rpc_base import RPCReference
from bec_widgets.tests.utils import check_remote_data_size
from bec_widgets.utils import Colors
# pylint: disable=unused-argument
# pylint: disable=redefined-outer-name
# pylint: disable=too-many-locals
# pylint: disable=protected-access
def test_gui_rpc_registry(qtbot, connected_client_gui_obj):
gui = connected_client_gui_obj
dock_area = gui.new("cool_dock_area")
def check_dock_area_registered():
return dock_area._gui_id in gui._registry_state
qtbot.waitUntil(check_dock_area_registered, timeout=5000)
assert hasattr(gui, "cool_dock_area")
dock = dock_area.new("dock_0")
def check_dock_registered():
dock_dict = (
gui._registry_state.get(dock_area._gui_id, {}).get("config", {}).get("docks", {})
)
return len(dock_dict) == 1
qtbot.waitUntil(check_dock_registered, timeout=5000)
assert hasattr(gui.cool_dock_area, "dock_0")
# assert hasattr(dock_area, "dock_0")
def test_rpc_add_dock_with_figure_e2e(qtbot, bec_client_lib, connected_client_gui_obj):
gui = connected_client_gui_obj
# BEC client shortcuts
dock = gui.bec
client = bec_client_lib
dev = client.device_manager.devices
scans = client.scans
queue = client.queue
# Create 3 docks
d0 = dock.new("dock_0")
d1 = dock.new("dock_1")
d2 = dock.new("dock_2")
# Check that callback for dock_registry is done
def check_docks_registered():
dock_register = dock._parent._registry_state.get(dock._gui_id, None)
if dock_register is not None:
n_docks = dock_register.get("config", {}).get("docks", {})
return len(n_docks) == 3
return False
# raise AssertionError("Docks not registered yet")
# bar.set_number_of_bars(5)
# bar.set_colors_from_map("viridis")
# bar.set_value([10, 20, 30, 40, 50])
# Waii until docks are registered
qtbot.waitUntil(check_docks_registered, timeout=5000)
qtbot.wait(500)
assert len(dock.panels) == 3
assert hasattr(gui.bec, "dock_0")
# bar_config = bar._config_dict
# Add 3 figures with some widgets
fig0 = d0.new("BECFigure")
fig1 = d1.new("BECFigure")
fig2 = d2.new("BECFigure")
# expected_colors_light = [
# list(color) for color in Colors.golden_angle_color("viridis", 5, "RGB", theme="light")
# ]
# expected_colors_dark = [
# list(color) for color in Colors.golden_angle_color("viridis", 5, "RGB", theme="dark")
# ]
# bar_colors = [ring._config_dict["color"] for ring in bar.rings]
# bar_values = [ring._config_dict["value"] for ring in bar.rings]
# assert bar_config["num_bars"] == 5
# assert bar_values == [10, 20, 30, 40, 50]
# assert bar_colors == expected_colors_light or bar_colors == expected_colors_dark
def check_fig2_registered():
# return hasattr(d2, "BECFigure_2")
dock_config = dock._parent._registry_state[dock._gui_id]["config"]["docks"].get(
d2.widget_name, {}
)
if dock_config:
n_widgets = dock_config.get("widgets", {})
if any(widget_name.startswith("BECFigure") for widget_name in n_widgets.keys()):
return True
raise AssertionError("Figure not registered yet")
qtbot.waitUntil(check_fig2_registered, timeout=5000)
# def test_ring_bar_scan_update(bec_client_lib, connected_client_dock):
# dock = connected_client_dock
assert len(d0.element_list) == 1
assert len(d1.element_list) == 1
assert len(d2.element_list) == 1
# d0 = dock.add_dock("dock_0")
assert fig1.__class__.__name__ == "RPCReference"
assert fig1.__class__ == RPCReference
assert gui._ipython_registry[fig1._gui_id].__class__ == BECFigure
assert fig2.__class__.__name__ == "RPCReference"
assert fig2.__class__ == RPCReference
assert gui._ipython_registry[fig2._gui_id].__class__ == BECFigure
# bar = d0.add_widget("RingProgressBar")
mm = fig0.motor_map("samx", "samy")
plt = fig1.plot(x_name="samx", y_name="bpm4i")
im = fig2.image("eiger")
# client = bec_client_lib
# dev = client.device_manager.devices
# dev.samx.tolerance.set(0)
# dev.samy.tolerance.set(0)
# scans = client.scans
assert mm.__class__.__name__ == "RPCReference"
assert mm.__class__ == RPCReference
assert plt.__class__.__name__ == "RPCReference"
assert plt.__class__ == RPCReference
assert im.__class__.__name__ == "RPCReference"
assert im.__class__ == RPCReference
# status = scans.line_scan(dev.samx, -5, 5, steps=10, exp_time=0.05, relative=False)
# status.wait()
# bar_config = bar._config_dict
# assert bar_config["num_bars"] == 1
# assert bar_config["rings"][0]["value"] == 10
# assert bar_config["rings"][0]["min_value"] == 0
# assert bar_config["rings"][0]["max_value"] == 10
def test_dock_manipulations_e2e(qtbot, connected_client_gui_obj):
gui = connected_client_gui_obj
dock = gui.bec
# status = scans.grid_scan(dev.samx, -5, 5, 4, dev.samy, -10, 10, 4, relative=True, exp_time=0.1)
# status.wait()
d0 = dock.new("dock_0")
d1 = dock.new("dock_1")
d2 = dock.new("dock_2")
# bar_config = bar._config_dict
# assert bar_config["num_bars"] == 1
# assert bar_config["rings"][0]["value"] == 16
# assert bar_config["rings"][0]["min_value"] == 0
# assert bar_config["rings"][0]["max_value"] == 16
assert hasattr(gui.bec, "dock_0")
assert hasattr(gui.bec, "dock_1")
assert hasattr(gui.bec, "dock_2")
assert len(gui.bec.panels) == 3
# init_samx = dev.samx.read()["samx"]["value"]
# init_samy = dev.samy.read()["samy"]["value"]
# final_samx = init_samx + 5
# final_samy = init_samy + 10
d0.detach()
dock.detach_dock("dock_2")
# How can we properly check that the dock is detached?
assert len(gui.bec.panels) == 3
d0.attach()
assert len(gui.bec.panels) == 3
# dev.samx.velocity.put(5)
# dev.samy.velocity.put(5)
# status = scans.umv(dev.samx, 5, dev.samy, 10, relative=True)
# status.wait()
# bar_config = bar._config_dict
# assert bar_config["num_bars"] == 2
# assert bar_config["rings"][0]["value"] == final_samx
# assert bar_config["rings"][1]["value"] == final_samy
# assert bar_config["rings"][0]["min_value"] == init_samx
# assert bar_config["rings"][0]["max_value"] == final_samx
# assert bar_config["rings"][1]["min_value"] == init_samy
# assert bar_config["rings"][1]["max_value"] == final_samy
# def test_auto_update(bec_client_lib, connected_client_dock_w_auto_updates, qtbot):
# client = bec_client_lib
# dev = client.device_manager.devices
# scans = client.scans
# queue = client.queue
# gui, dock = connected_client_dock_w_auto_updates
# auto_updates = gui.auto_updates
# def get_default_figure():
# return auto_updates.get_default_figure()
# plt = get_default_figure()
# gui.selected_device = "bpm4i"
# status = scans.line_scan(dev.samx, -5, 5, steps=10, exp_time=0.05, relative=False)
# status.wait()
# # get data from curves
# widgets = plt.widget_list
# qtbot.waitUntil(lambda: len(plt.widget_list) > 0, timeout=5000)
# item = queue.scan_storage.storage[-1]
# last_scan_data = item.live_data if hasattr(item, "live_data") else item.data
# num_elements = 10
# plot_name = f"Scan {status.scan.scan_number} - {dock.selected_device}"
# qtbot.waitUntil(lambda: check_remote_data_size(widgets[0], plot_name, num_elements))
# plt_data = widgets[0].get_all_data()
# # check plotted data
# assert (
# plt_data[f"Scan {status.scan.scan_number} - bpm4i"]["x"]
# == last_scan_data["samx"]["samx"].val
# )
# assert (
# plt_data[f"Scan {status.scan.scan_number} - bpm4i"]["y"]
# == last_scan_data["bpm4i"]["bpm4i"].val
# )
# status = scans.grid_scan(
# dev.samx, -10, 10, 5, dev.samy, -5, 5, 5, exp_time=0.05, relative=False
# )
# status.wait()
# plt = auto_updates.get_default_figure()
# widgets = plt.widget_list
# qtbot.waitUntil(lambda: len(plt.widget_list) > 0, timeout=5000)
# item = queue.scan_storage.storage[-1]
# last_scan_data = item.live_data if hasattr(item, "live_data") else item.data
# plot_name = f"Scan {status.scan.scan_number} - bpm4i"
# num_elements_bec = 25
# qtbot.waitUntil(lambda: check_remote_data_size(widgets[0], plot_name, num_elements_bec))
# plt_data = widgets[0].get_all_data()
# # check plotted data
# assert (
# plt_data[f"Scan {status.scan.scan_number} - {dock.selected_device}"]["x"]
# == last_scan_data["samx"]["samx"].val
# )
# assert (
# plt_data[f"Scan {status.scan.scan_number} - {dock.selected_device}"]["y"]
# == last_scan_data["samy"]["samy"].val
# )
# def test_rpc_gui_obj(connected_client_gui_obj, qtbot):
# gui = connected_client_gui_obj
# assert gui.selected_device is None
# assert len(gui.windows) == 1
# assert gui.windows["main"].widget is gui.main
# assert gui.windows["main"].title == "BEC Widgets"
# mw = gui.main
# assert mw.__class__.__name__ == "BECDockArea"
# xw = gui.new("X")
# assert xw.__class__.__name__ == "BECDockArea"
# assert len(gui.windows) == 2
# gui_info = gui._dump()
# mw_info = gui_info[mw._gui_id]
# assert mw_info["title"] == "BEC Widgets"
# assert mw_info["visible"]
# xw_info = gui_info[xw._gui_id]
# assert xw_info["title"] == "X"
# assert xw_info["visible"]
# gui.hide()
# gui_info = gui._dump()
# assert not any(windows["visible"] for windows in gui_info.values())
# gui.show()
# gui_info = gui._dump()
# assert all(windows["visible"] for windows in gui_info.values())
# assert gui.gui_is_alive()
# gui.close()
# assert not gui.gui_is_alive()
# gui.start_server(wait=True)
# assert gui.gui_is_alive()
# # calling start multiple times should not change anything
# gui.start_server(wait=True)
# gui.start()
# # gui.windows should have main, and main dock area should have same gui_id as before
# assert len(gui.windows) == 1
# assert gui.windows["main"].widget._gui_id == mw._gui_id
# # communication should work, main dock area should have same id and be visible
# gui_info = gui._dump()
# assert gui_info[mw._gui_id]["visible"]
# with pytest.raises(RuntimeError):
# gui.main.delete()
# yw = gui.new("Y")
# assert len(gui.windows) == 2
# yw.delete()
# assert len(gui.windows) == 1
# # check it is really deleted on server
# gui_info = gui._dump()
# assert yw._gui_id not in gui_info
# def test_rpc_call_with_exception_in_safeslot_error_popup(connected_client_gui_obj, qtbot):
# gui = connected_client_gui_obj
# gui.main.add_dock("test")
# qtbot.waitUntil(lambda: len(gui.main.panels) == 2) # default_figure + test
# qtbot.wait(500)
# with pytest.raises(ValueError):
# gui.main.add_dock("test")
# # time.sleep(0.1)
def wait_for_dock_removed():
dock_config = gui._registry_state[gui.bec._gui_id]["config"]["docks"]
return len(dock_config.keys()) == 2
d2.remove()
qtbot.waitUntil(wait_for_dock_removed, timeout=5000)
assert len(gui.bec.panels) == 2
def wait_for_docks_removed():
dock_config = gui._registry_state[gui.bec._gui_id]["config"]["docks"]
return len(dock_config.keys()) == 0
dock.clear_all()
qtbot.waitUntil(wait_for_docks_removed, timeout=5000)
assert len(gui.bec.panels) == 0
def test_ring_bar(connected_client_dock):
dock = connected_client_dock
d0 = dock.add_dock(name="dock_0")
bar = d0.add_widget("RingProgressBar")
assert bar.__class__.__name__ == "RingProgressBar"
plt = get_default_figure()
gui.selected_device = "bpm4i"
status = scans.line_scan(dev.samx, -5, 5, steps=10, exp_time=0.05, relative=False)
status.wait()
# get data from curves
widgets = plt.widget_list
qtbot.waitUntil(lambda: len(plt.widget_list) > 0, timeout=5000)
item = queue.scan_storage.storage[-1]
last_scan_data = item.live_data if hasattr(item, "live_data") else item.data
num_elements = 10
plot_name = f"Scan {status.scan.scan_number} - {dock.selected_device}"
qtbot.waitUntil(lambda: check_remote_data_size(widgets[0], plot_name, num_elements))
plt_data = widgets[0].get_all_data()
# check plotted data
assert (
plt_data[f"Scan {status.scan.scan_number} - bpm4i"]["x"]
== last_scan_data["samx"]["samx"].val
)
assert (
plt_data[f"Scan {status.scan.scan_number} - bpm4i"]["y"]
== last_scan_data["bpm4i"]["bpm4i"].val
)
status = scans.grid_scan(
dev.samx, -10, 10, 5, dev.samy, -5, 5, 5, exp_time=0.05, relative=False
)
status.wait()
plt = auto_updates.get_default_figure()
widgets = plt.widget_list
qtbot.waitUntil(lambda: len(plt.widget_list) > 0, timeout=5000)
item = queue.scan_storage.storage[-1]
last_scan_data = item.live_data if hasattr(item, "live_data") else item.data
plot_name = f"Scan {status.scan.scan_number} - bpm4i"
num_elements_bec = 25
qtbot.waitUntil(lambda: check_remote_data_size(widgets[0], plot_name, num_elements_bec))
plt_data = widgets[0].get_all_data()
# check plotted data
assert (
plt_data[f"Scan {status.scan.scan_number} - {dock.selected_device}"]["x"]
== last_scan_data["samx"]["samx"].val
)
assert (
plt_data[f"Scan {status.scan.scan_number} - {dock.selected_device}"]["y"]
== last_scan_data["samy"]["samy"].val
)
def test_rpc_gui_obj(connected_client_gui_obj, qtbot):
gui = connected_client_gui_obj
assert gui.selected_device is None
assert len(gui.windows) == 1
assert gui.windows["bec"] is gui.bec
mw = gui.bec
assert mw.__class__.__name__ == "BECDockArea"
xw = gui.new("X")
assert xw.__class__.__name__ == "BECDockArea"
assert len(gui.windows) == 2
gui_info = gui._dump()
mw_info = gui_info[mw._gui_id]
assert mw_info["title"] == "BEC"
assert mw_info["visible"]
xw_info = gui_info[xw._gui_id]
assert xw_info["title"] == "BEC - X"
assert xw_info["visible"]
gui.hide()
gui_info = gui._dump()
assert not any(windows["visible"] for windows in gui_info.values())
gui.show()
gui_info = gui._dump()
assert all(windows["visible"] for windows in gui_info.values())
assert gui._gui_is_alive()
gui._close()
assert not gui._gui_is_alive()
gui._start_server(wait=True)
assert gui._gui_is_alive()
# calling start multiple times should not change anything
gui._start_server(wait=True)
gui._start()
# gui.windows should have bec with gui_id 'bec'
assert len(gui.windows) == 1
assert gui.windows["bec"]._gui_id == mw._gui_id
# communication should work, main dock area should have same id and be visible
gui_info = gui._dump()
assert gui_info[mw._gui_id]["visible"]
with pytest.raises(RuntimeError):
gui.bec.delete()
yw = gui.new("Y")
assert len(gui.windows) == 2
yw.delete()
assert len(gui.windows) == 1
# check it is really deleted on server
gui_info = gui._dump()
assert yw._gui_id not in gui_info
def test_rpc_call_with_exception_in_safeslot_error_popup(connected_client_gui_obj, qtbot):
gui = connected_client_gui_obj
gui.main.add_dock("test")
qtbot.waitUntil(lambda: len(gui.main.panels) == 2) # default_figure + test
qtbot.wait(500)
with pytest.raises(ValueError):
gui.bec.add_dock("test")
# time.sleep(0.1)

View File

@ -5,8 +5,11 @@ import pytest
from bec_lib.endpoints import MessageEndpoints
from bec_widgets.cli.client import BECFigure, BECImageShow, BECMotorMap, BECWaveform
from bec_widgets.cli.rpc.rpc_base import RPCReference
from bec_widgets.tests.utils import check_remote_data_size
# pylint: disable=protected-access
@pytest.fixture
def connected_figure(connected_client_gui_obj):
@ -39,12 +42,15 @@ def test_rpc_plotting_shortcuts_init_configs(connected_figure, qtbot):
# Checking if classes are correctly initialised
assert len(fig.widgets) == 4
assert plt.__class__.__name__ == "BECWaveform"
assert plt.__class__ == BECWaveform
assert im.__class__.__name__ == "BECImageShow"
assert im.__class__ == BECImageShow
assert motor_map.__class__.__name__ == "BECMotorMap"
assert motor_map.__class__ == BECMotorMap
assert plt.__class__.__name__ == "RPCReference"
assert plt.__class__ == RPCReference
assert plt._root._ipython_registry[plt._gui_id].__class__ == BECWaveform
assert im.__class__.__name__ == "RPCReference"
assert im.__class__ == RPCReference
assert im._root._ipython_registry[im._gui_id].__class__ == BECImageShow
assert motor_map.__class__.__name__ == "RPCReference"
assert motor_map.__class__ == RPCReference
assert motor_map._root._ipython_registry[motor_map._gui_id].__class__ == BECMotorMap
# check if the correct devices are set
# plot
@ -215,10 +221,11 @@ def test_dap_rpc(connected_figure, bec_client_lib, qtbot):
def test_removing_subplots(connected_figure, bec_client_lib):
fig = connected_figure
plt = fig.plot(x_name="samx", y_name="bpm4i", dap="GaussianModel")
im = fig.image(monitor="eiger")
mm = fig.motor_map(motor_x="samx", motor_y="samy")
# Registry can't handle multiple subplots on one widget, BECFigure will be deprecated though
# im = fig.image(monitor="eiger")
# mm = fig.motor_map(motor_x="samx", motor_y="samy")
assert len(fig.widget_list) == 3
assert len(fig.widget_list) == 1
# removing curves
assert len(plt.curves) == 2
@ -229,7 +236,7 @@ def test_removing_subplots(connected_figure, bec_client_lib):
# removing all subplots from figure
plt.remove()
im.remove()
mm.remove()
# im.remove()
# mm.remove()
assert len(fig.widget_list) == 0