1
0
mirror of https://github.com/bec-project/bec_widgets.git synced 2026-04-15 13:10:54 +02:00

Compare commits

..

10 Commits

Author SHA1 Message Date
3653fe9799 wip - fix formatter 2025-03-13 10:32:04 +01:00
750350dc52 wip - removed rpcreference import 2025-03-13 10:30:04 +01:00
9eb1608b01 wip - test namespace 2025-03-13 10:24:35 +01:00
0433b40054 wip - namespace 2025-03-13 10:24:35 +01:00
d1a41752c4 wip - namespace 2025-03-13 10:24:35 +01:00
bf060b3aba wip - namespace 2025-03-13 10:24:35 +01:00
5f0dd62f25 wip - namespace 2025-03-13 10:24:35 +01:00
21b1f0b2de wip - namespace 2025-03-13 10:24:35 +01:00
17f6dbb0d4 wip - namespace 2025-03-13 10:24:35 +01:00
ba347e026a wip - namespace update 2025-03-13 10:06:11 +01:00
8 changed files with 116 additions and 445 deletions

View File

@@ -21,7 +21,7 @@ from rich.table import Table
import bec_widgets.cli.client as client
from bec_widgets.cli.auto_updates import AutoUpdates
from bec_widgets.cli.rpc.rpc_base import RPCBase, RPCReference
from bec_widgets.cli.rpc.rpc_base import RPCBase
if TYPE_CHECKING:
from bec_lib import messages
@@ -220,28 +220,6 @@ class BECGuiClient(RPCBase):
self._gui_started_event = threading.Event()
self._process = None
self._process_output_processing_thread = None
self._exposed_dock_areas = []
self._registry_state = {}
self._ipython_registry = {}
self.available_widgets = AvailableWidgetsNamespace()
def connect_to_gui_server(self, gui_id: str) -> None:
"""Connect to a GUI server"""
# Unregister the old callback
self._client.connector.unregister(
MessageEndpoints.gui_registry_state(self._gui_id), cb=self._handle_registry_update
)
self._gui_id = gui_id
# Get the registry state
msgs = self._client.connector.xread(
MessageEndpoints.gui_registry_state(self._gui_id), count=1
)
if msgs:
self._handle_registry_update(msgs[0])
# Register the new callback
self._client.connector.register(
MessageEndpoints.gui_registry_state(self._gui_id), cb=self._handle_registry_update
)
@property
def windows(self) -> dict:
@@ -318,24 +296,20 @@ class BECGuiClient(RPCBase):
# return self.auto_updates.do_update(msg)
def _gui_post_startup(self):
timeout = 10
while time.time() < time.time() + timeout:
if len(list(self._registry_state.keys())) == 0:
time.sleep(0.1)
else:
break
# FIXME AUTO UPDATES
# if self._auto_updates_enabled:
# if self._auto_updates is None:
# auto_updates = self._get_update_script()
# if auto_updates is None:
# AutoUpdates.create_default_dock = True
# AutoUpdates.enabled = True
# auto_updates = AutoUpdates(self._top_level[name])
# if auto_updates.create_default_dock:
# auto_updates.start_default_dock()
# self._start_update_script()
# self._auto_updates = auto_updates
self._top_level["main"] = WidgetDesc(
title="BEC Widgets", widget=BECDockArea(gui_id=self._gui_id)
)
if self._auto_updates_enabled:
if self._auto_updates is None:
auto_updates = self._get_update_script()
if auto_updates is None:
AutoUpdates.create_default_dock = True
AutoUpdates.enabled = True
auto_updates = AutoUpdates(self._top_level["main"].widget)
if auto_updates.create_default_dock:
auto_updates.start_default_dock()
self._start_update_script()
self._auto_updates = auto_updates
self._do_show_all()
self._gui_started_event.set()
@@ -374,22 +348,8 @@ class BECGuiClient(RPCBase):
rpc_client = RPCBase(gui_id=f"{self._gui_id}:window", parent=self)
return rpc_client._run_rpc("_dump")
def _start(self, wait: bool = False) -> None:
self._killed = False
self._client.connector.register(
MessageEndpoints.gui_registry_state(self._gui_id), cb=self._handle_registry_update
)
return self._start_server(wait=wait)
def start(self, wait: bool = False) -> None:
"""Start the GUI server."""
return self._start(wait=wait)
def _handle_registry_update(self, msg: StreamMessage) -> None:
self._registry_state = msg["data"].state
self._update_dynamic_namespace()
# self._update_dynamic_namespace()
# FIXME logic to update namespace
def start(self):
return self.start_server()
def _do_show_all(self):
rpc_client = RPCBase(gui_id=f"{self._gui_id}:window", parent=self)
@@ -469,94 +429,6 @@ class BECGuiClient(RPCBase):
for widget_name in self.windows.keys():
self.delete(widget_name)
def _clear_top_level_widgets(self):
self._top_level.clear()
for widget_id in self._exposed_dock_areas:
delattr(self, widget_id)
self._exposed_dock_areas.clear()
def _add_dock_areas_from_registry(self):
existing_gui_ids = []
for dock_area_info in self._registry_state.values():
name = dock_area_info["name"]
gui_id = dock_area_info["gui_id"]
obj = self._ipython_registry.get("gui_id")
if obj is None:
dock_area = BECDockArea(gui_id=gui_id, name=name, parent=self)
self._top_level[name] = dock_area
# weak ref to for all widgets
self._ipython_registry[gui_id] = dock_area
else:
# update namespace
dock_area = obj
existing_gui_ids.append(gui_id)
obj = RPCReference(registry=self._ipython_registry, gui_id=gui_id)
self._top_level[name] = obj
self._exposed_dock_areas.append(name)
setattr(self, name, obj)
dock_info = dock_area_info["config"].get("docks", None)
if dock_info:
self._add_docks_from_registry(dock_info, dock_area, gui_ids=existing_gui_ids)
remove_ids = []
for widget_id in self._ipython_registry:
if widget_id not in existing_gui_ids:
remove_ids.append(widget_id)
for widget_id in remove_ids:
self._ipython_registry.pop(widget_id)
def _add_docks_from_registry(
self, dock_info: dict[str, dict], dock_area: BECDockArea, gui_ids: list[str]
):
for dock_name, info in dock_info.items():
# add the gui_id to the list of existing gui_ids
gui_ids.append(info["gui_id"])
obj = self._ipython_registry.get(info["gui_id"])
if obj is None:
# create new rpc object
dock = client.BECDock(gui_id=info["gui_id"], name=dock_name, parent=dock_area)
# add reference to the registry
self._ipython_registry[info["gui_id"]] = dock
else:
dock = obj
# create weak reference for the namespace
obj = RPCReference(registry=self._ipython_registry, gui_id=info["gui_id"])
# add the dock to the dock area
setattr(dock_area, dock_name, obj)
widget_info = info["widgets"]
if widget_info:
self._add_widgets_from_registry(
widget_info=widget_info, dock_area=dock_area, dock=dock, gui_ids=gui_ids
)
def _add_widgets_from_registry(
self,
widget_info: dict[str, dict],
dock_area: client.BECDockArea,
dock: client.BECDock,
gui_ids: list[str],
):
for widget_name, info in widget_info.items():
gui_ids.append(info["gui_id"])
obj = self._ipython_registry.get(info["gui_id"])
if obj is None:
widget_class = getattr(client, info["widget_class"])
widget = widget_class(gui_id=info["gui_id"], name=widget_name, parent=dock)
self._ipython_registry[info["gui_id"]] = widget
else:
widget = obj
dock_area_elements = getattr(dock_area, "elements")
obj = RPCReference(registry=self._ipython_registry, gui_id=info["gui_id"])
setattr(dock_area_elements, widget_name, obj)
setattr(dock, widget_name, obj)
def _update_dynamic_namespace(self):
"""Update the dynamic name space"""
self._clear_top_level_widgets()
self._add_dock_areas_from_registry()
def close(self):
"""Deprecated. Use kill_server() instead."""
# FIXME, deprecated in favor of kill, will be removed in the future
@@ -581,7 +453,3 @@ class BECGuiClient(RPCBase):
self._process_output_processing_thread.join()
self._process.wait()
self._process = None
# Unregister the registry state
self._client.connector.unregister(
MessageEndpoints.gui_registry_state(self._gui_id), cb=self._handle_registry_update
)

View File

@@ -60,50 +60,6 @@ class RPCResponseTimeoutError(Exception):
)
class DeletedWidgetError(Exception): ...
def check_for_deleted_widget(func):
@wraps(func)
def wrapper(self, *args, **kwargs):
if self._gui_id not in self._registry:
raise DeletedWidgetError(f"Widget with gui_id {self._gui_id} has been deleted")
return func(self, *args, **kwargs)
return wrapper
class RPCReference:
def __init__(self, registry: dict, gui_id: str) -> None:
self._registry = registry
self._gui_id = gui_id
@check_for_deleted_widget
def __getattr__(self, name):
if name in ["_registry", "_gui_id"]:
return super().__getattribute__(name)
return self._registry[self._gui_id].__getattribute__(name)
@check_for_deleted_widget
def __getitem__(self, key):
return self._registry[self._gui_id].__getitem__(key)
def __repr__(self):
if self._gui_id not in self._registry:
return f"<Deleted widget with gui_id {self._gui_id}>"
return self._registry[self._gui_id].__repr__()
def __str__(self):
if self._gui_id not in self._registry:
return f"<Deleted widget with gui_id {self._gui_id}>"
return self._registry[self._gui_id].__str__()
def __dir__(self):
if self._gui_id not in self._registry:
return []
return self._registry[self._gui_id].__dir__()
class RPCBase:
def __init__(
self,
@@ -226,11 +182,7 @@ class RPCBase:
cls = getattr(client, cls)
# print(msg_result)
ret = cls(parent=self, **msg_result)
self._root._ipython_registry[ret._gui_id] = ret
obj = RPCReference(self._root._ipython_registry, ret._gui_id)
return obj
# return ret
return cls(parent=self, **msg_result)
return msg_result
def _gui_is_alive(self):

View File

@@ -17,20 +17,6 @@ if TYPE_CHECKING:
logger = bec_logger.logger
def broadcast_update(func):
"""
Decorator to broadcast updates to the RPCRegister whenever a new RPC object is added or removed.
"""
@wraps(func)
def wrapper(self, *args, **kwargs):
result = func(self, *args, **kwargs)
self.broadcast()
return result
return wrapper
class RPCRegister:
"""
A singleton class that keeps track of all the RPC objects registered in the system for CLI usage.
@@ -51,9 +37,7 @@ class RPCRegister:
return
self._rpc_register = WeakValueDictionary()
self._initialized = True
self.callbacks = []
@broadcast_update
def add_rpc(self, rpc: QObject):
"""
Add an RPC object to the register.
@@ -65,7 +49,6 @@ class RPCRegister:
raise ValueError("RPC object must have a 'gui_id' attribute.")
self._rpc_register[rpc.gui_id] = rpc
@broadcast_update
def remove_rpc(self, rpc: str):
"""
Remove an RPC object from the register.
@@ -128,42 +111,6 @@ class RPCRegister:
widgets = [rpc for rpc in self._rpc_register.values() if isinstance(rpc, cls)]
return [widget._name for widget in widgets]
# def get_names_by_class_name(self, class_name: str) -> list[str]:
# """
# Get all RPC objects of a class, i.e. BECDockArea, BECDock.
# Args:
# class_name(str): The type of the RPC object to be retrieved.
# Returns:
# list: A list of names of RPC objects of the given type.
# """
# rpc_objects = [
# rpc._name
# for name, rpc in self._rpc_register.items()
# if rpc.__class__.__name__ == class_name
# ]
# return rpc_objects
def broadcast(self):
"""
Broadcast the update to all the callbacks.
"""
# print("Broadcasting")
connections = self.list_all_connections()
for callback in self.callbacks:
callback(connections)
def add_callback(self, callback: Callable[[dict], None]):
"""
Add a callback that will be called whenever the registry is updated.
Args:
callback(Callable[[dict], None]): The callback to be added. It should accept a dictionary of all the
registered RPC objects as an argument.
"""
self.callbacks.append(callback)
@classmethod
def reset_singleton(cls):
"""

View File

@@ -69,7 +69,8 @@ class BECWidgetsCLIServer:
self.gui_id = gui_id
# register broadcast callback
self.rpc_register = RPCRegister()
self.rpc_register.add_callback(self.broadcast_registry_update)
self.rpc_register.add_rpc(self.gui)
self.dispatcher.connect_slot(
self.on_rpc_update, MessageEndpoints.gui_instructions(self.gui_id)
)
@@ -160,24 +161,6 @@ class BECWidgetsCLIServer:
except RedisError as exc:
logger.error(f"Error while emitting heartbeat: {exc}")
def broadcast_registry_update(self, connections: dict):
"""
Broadcast the updated registry to all clients.
"""
# We only need to broadcast the dock areas
data = {
key: self.serialize_object(val)
for key, val in connections.items()
if val.__class__.__name__ == "BECDockArea"
}
logger.info(f"All registered connections: {list(connections.keys())}")
self.client.connector.xadd(
MessageEndpoints.gui_registry_state(self.gui_id),
msg_dict={"data": messages.GUIRegistryStateMessage(state=data)},
max_size=1, # only single message in stream
)
def shutdown(self): # TODO not sure if needed when cleanup is done at level of BECConnector
logger.info(f"Shutting down server with gui_id: {self.gui_id}")
self.status = messages.BECStatus.IDLE

View File

@@ -330,15 +330,11 @@ class BECDock(BECWidget, Dock):
widget._name = name # pylint: disable=protected-access
self.addWidget(widget, row=row, col=col, rowspan=rowspan, colspan=colspan)
if hasattr(widget, "config"):
widget.config.gui_id = widget.gui_id
self.config.widgets[widget._name] = widget.config # pylint: disable=protected-access
self._broadcast_update()
return widget
def _broadcast_update(self):
rpc_register = RPCRegister()
rpc_register.broadcast()
if hasattr(widget, "config"):
self.config.widgets[widget.gui_id] = widget.config
return widget
def move_widget(self, widget: QWidget, new_row: int, new_col: int):
"""

View File

@@ -374,14 +374,8 @@ class BECDockArea(BECWidget, QWidget):
self.update()
if floating:
dock.detach()
# Run broadcast update
self._broadcast_update()
return dock
def _broadcast_update(self):
rpc_register = RPCRegister()
rpc_register.broadcast()
def detach_dock(self, dock_name: str) -> BECDock:
"""
Undock a dock from the dock area.

View File

@@ -11,85 +11,33 @@ from bec_widgets.utils import Colors
# pylint: disable=unused-argument
# pylint: disable=redefined-outer-name
# pylint: disable=too-many-locals
# pylint: disable=protected-access
def test_gui_rpc_registry(qtbot, connected_client_gui_obj):
gui = connected_client_gui_obj
dock_area = gui.new("cool_dock_area")
def check_dock_area_registered():
return dock_area._gui_id in gui._registry_state
qtbot.waitUntil(check_dock_area_registered, timeout=5000)
assert hasattr(gui, "cool_dock_area")
dock = dock_area.new("dock_0")
def check_dock_registered():
dock_dict = (
gui._registry_state.get(dock_area._gui_id, {}).get("config", {}).get("docks", {})
)
return len(dock_dict) == 1
qtbot.waitUntil(check_dock_registered, timeout=5000)
assert hasattr(gui.cool_dock_area, "dock_0")
# assert hasattr(dock_area, "dock_0")
def test_rpc_add_dock_with_figure_e2e(qtbot, bec_client_lib, connected_client_gui_obj):
gui = connected_client_gui_obj
def test_rpc_add_dock_with_figure_e2e(qtbot, bec_client_lib, connected_client_dock):
# BEC client shortcuts
dock = gui.bec
dock = connected_client_dock
client = bec_client_lib
dev = client.device_manager.devices
scans = client.scans
queue = client.queue
# Create 3 docks
d0 = dock.new("dock_0")
d1 = dock.new("dock_1")
d2 = dock.new("dock_2")
# Check that callback for dock_registry is done
def check_docks_registered():
dock_register = dock._parent._registry_state.get(dock._gui_id, None)
if dock_register is not None:
n_docks = dock_register.get("config", {}).get("docks", {})
return len(n_docks) == 3
return False
# raise AssertionError("Docks not registered yet")
# Waii until docks are registered
qtbot.waitUntil(check_docks_registered, timeout=5000)
qtbot.wait(500)
assert len(dock.panels) == 3
assert hasattr(gui.bec, "dock_0")
d0 = dock.add_dock("dock_0")
d1 = dock.add_dock("dock_1")
d2 = dock.add_dock("dock_2")
dock_config = dock._config_dict
assert len(dock_config["docks"]) == 3
# Add 3 figures with some widgets
fig0 = d0.new("BECFigure")
fig1 = d1.new("BECFigure")
fig2 = d2.new("BECFigure")
fig0 = d0.add_widget("BECFigure")
fig1 = d1.add_widget("BECFigure")
fig2 = d2.add_widget("BECFigure")
def check_fig2_registered():
# return hasattr(d2, "BECFigure_2")
dock_config = dock._parent._registry_state[dock._gui_id]["config"]["docks"].get(
d2.widget_name, {}
)
if dock_config:
n_widgets = dock_config.get("widgets", {})
if any(widget_name.startswith("BECFigure") for widget_name in n_widgets.keys()):
return True
raise AssertionError("Figure not registered yet")
qtbot.waitUntil(check_fig2_registered, timeout=5000)
assert len(d0.element_list) == 1
# assert hasattr(d0, "BECFigure_0")
assert len(d1.element_list) == 1
assert len(d2.element_list) == 1
dock_config = dock._config_dict
assert len(dock_config["docks"]) == 3
assert len(dock_config["docks"]["dock_0"]["widgets"]) == 1
assert len(dock_config["docks"]["dock_1"]["widgets"]) == 1
assert len(dock_config["docks"]["dock_2"]["widgets"]) == 1
assert fig1.__class__.__name__ == "BECFigure"
assert fig1.__class__ == BECFigure
@@ -107,48 +55,76 @@ def test_rpc_add_dock_with_figure_e2e(qtbot, bec_client_lib, connected_client_gu
assert im.__class__.__name__ == "BECImageShow"
assert im.__class__ == BECImageShow
# # check initial position of motor map
# initial_pos_x = dev.samx.read()["samx"]["value"]
# initial_pos_y = dev.samy.read()["samy"]["value"]
assert mm._config_dict["signals"] == {
"dap": None,
"source": "device_readback",
"x": {
"name": "samx",
"entry": "samx",
"unit": None,
"modifier": None,
"limits": [-50.0, 50.0],
},
"y": {
"name": "samy",
"entry": "samy",
"unit": None,
"modifier": None,
"limits": [-50.0, 50.0],
},
"z": None,
}
assert plt._config_dict["curves"]["bpm4i-bpm4i"]["signals"] == {
"dap": None,
"source": "scan_segment",
"x": {"name": "samx", "entry": "samx", "unit": None, "modifier": None, "limits": None},
"y": {"name": "bpm4i", "entry": "bpm4i", "unit": None, "modifier": None, "limits": None},
"z": None,
}
assert im._config_dict["images"]["eiger"]["monitor"] == "eiger"
# # Try to make a scan
# status = scans.line_scan(dev.samx, -5, 5, steps=10, exp_time=0.05, relative=False)
# status.wait()
# check initial position of motor map
initial_pos_x = dev.samx.read()["samx"]["value"]
initial_pos_y = dev.samy.read()["samy"]["value"]
# # plot
# item = queue.scan_storage.storage[-1]
# plt_last_scan_data = item.live_data if hasattr(item, "live_data") else item.data
# num_elements = 10
# Try to make a scan
status = scans.line_scan(dev.samx, -5, 5, steps=10, exp_time=0.05, relative=False)
status.wait()
# plot_name = "bpm4i-bpm4i"
# plot
item = queue.scan_storage.storage[-1]
plt_last_scan_data = item.live_data if hasattr(item, "live_data") else item.data
num_elements = 10
# qtbot.waitUntil(lambda: check_remote_data_size(plt, plot_name, num_elements))
plot_name = "bpm4i-bpm4i"
# plt_data = plt.get_all_data()
# assert plt_data["bpm4i-bpm4i"]["x"] == plt_last_scan_data["samx"]["samx"].val
# assert plt_data["bpm4i-bpm4i"]["y"] == plt_last_scan_data["bpm4i"]["bpm4i"].val
qtbot.waitUntil(lambda: check_remote_data_size(plt, plot_name, num_elements))
# # image
# last_image_device = client.connector.get_last(MessageEndpoints.device_monitor_2d("eiger"))[
# "data"
# ].data
# time.sleep(0.5)
# last_image_plot = im.images[0].get_data()
# np.testing.assert_equal(last_image_device, last_image_plot)
plt_data = plt.get_all_data()
assert plt_data["bpm4i-bpm4i"]["x"] == plt_last_scan_data["samx"]["samx"].val
assert plt_data["bpm4i-bpm4i"]["y"] == plt_last_scan_data["bpm4i"]["bpm4i"].val
# # motor map
# final_pos_x = dev.samx.read()["samx"]["value"]
# final_pos_y = dev.samy.read()["samy"]["value"]
# image
last_image_device = client.connector.get_last(MessageEndpoints.device_monitor_2d("eiger"))[
"data"
].data
time.sleep(0.5)
last_image_plot = im.images[0].get_data()
np.testing.assert_equal(last_image_device, last_image_plot)
# # check final coordinates of motor map
# motor_map_data = mm.get_data()
# motor map
final_pos_x = dev.samx.read()["samx"]["value"]
final_pos_y = dev.samy.read()["samy"]["value"]
# np.testing.assert_equal(
# [motor_map_data["x"][0], motor_map_data["y"][0]], [initial_pos_x, initial_pos_y]
# )
# np.testing.assert_equal(
# [motor_map_data["x"][-1], motor_map_data["y"][-1]], [final_pos_x, final_pos_y]
# )
# check final coordinates of motor map
motor_map_data = mm.get_data()
np.testing.assert_equal(
[motor_map_data["x"][0], motor_map_data["y"][0]], [initial_pos_x, initial_pos_y]
)
np.testing.assert_equal(
[motor_map_data["x"][-1], motor_map_data["y"][-1]], [final_pos_x, final_pos_y]
)
def test_dock_manipulations_e2e(connected_client_dock):
@@ -339,8 +315,9 @@ def test_rpc_gui_obj(connected_client_gui_obj, qtbot):
assert gui.selected_device is None
assert len(gui.windows) == 1
assert gui.windows["bec"] is gui.bec
mw = gui.bec
assert gui.windows["main"].widget is gui.main
assert gui.windows["main"].title == "BEC Widgets"
mw = gui.main
assert mw.__class__.__name__ == "BECDockArea"
xw = gui.new("X")
@@ -349,10 +326,10 @@ def test_rpc_gui_obj(connected_client_gui_obj, qtbot):
gui_info = gui._dump()
mw_info = gui_info[mw._gui_id]
assert mw_info["title"] == "BEC"
assert mw_info["title"] == "BEC Widgets"
assert mw_info["visible"]
xw_info = gui_info[xw._gui_id]
assert xw_info["title"] == "BEC - X"
assert xw_info["title"] == "X"
assert xw_info["visible"]
gui.hide()
@@ -363,23 +340,23 @@ def test_rpc_gui_obj(connected_client_gui_obj, qtbot):
gui_info = gui._dump()
assert all(windows["visible"] for windows in gui_info.values())
assert gui._gui_is_alive()
gui._close()
assert not gui._gui_is_alive()
gui._start_server(wait=True)
assert gui._gui_is_alive()
assert gui.gui_is_alive()
gui.close()
assert not gui.gui_is_alive()
gui.start_server(wait=True)
assert gui.gui_is_alive()
# calling start multiple times should not change anything
gui._start_server(wait=True)
gui._start()
# gui.windows should have bec with gui_id 'bec'
gui.start_server(wait=True)
gui.start()
# gui.windows should have main, and main dock area should have same gui_id as before
assert len(gui.windows) == 1
assert gui.windows["bec"]._gui_id == mw._gui_id
assert gui.windows["main"].widget._gui_id == mw._gui_id
# communication should work, main dock area should have same id and be visible
gui_info = gui._dump()
assert gui_info[mw._gui_id]["visible"]
with pytest.raises(RuntimeError):
gui.bec.delete()
gui.main.delete()
yw = gui.new("Y")
assert len(gui.windows) == 2
@@ -397,5 +374,5 @@ def test_rpc_call_with_exception_in_safeslot_error_popup(connected_client_gui_ob
qtbot.waitUntil(lambda: len(gui.main.panels) == 2) # default_figure + test
qtbot.wait(500)
with pytest.raises(ValueError):
gui.bec.add_dock("test")
gui.main.add_dock("test")
# time.sleep(0.1)

View File

@@ -1,46 +0,0 @@
def test_gui_rpc_registry(qtbot, connected_client_gui_obj):
gui = connected_client_gui_obj
dock_name = "dock"
global name
name = None
def check_dock_created():
dock_dict = gui._registry_state.get(gui.bec._gui_id, {}).get("config", {}).get("docks", {})
return len(dock_dict) > 0
def check_dock_removed():
dock_dict = gui._registry_state.get(gui.bec._gui_id, {}).get("config", {}).get("docks", {})
return len(dock_dict) == 0
def check_widget_created():
widgets_dict = (
(gui._registry_state.get(gui.bec._gui_id, {}).get("config", {}).get("docks", {}))
.get(dock_name, {})
.get("widgets", {})
)
widget_config = widgets_dict.get(name, {})
return len(widget_config) > 0
def check_widget_remove():
widgets_dict = (
(gui._registry_state.get(gui.bec._gui_id, {}).get("config", {}).get("docks", {}))
.get(dock_name, {})
.get("widgets", {})
)
return name not in widgets_dict
for widget_name, _ in gui.available_widgets.__dict__.items():
name = widget_name
gui.bec.new(dock_name, widget=widget_name, widget_name=widget_name)
qtbot.wait_until(check_dock_created)
qtbot.wait_until(check_widget_created)
assert hasattr(gui.bec.elements, f"{widget_name}")
assert hasattr(gui.bec, f"{dock_name}")
gui.bec.delete(dock_name)
qtbot.wait_until(check_dock_removed)
assert not hasattr(gui.bec.elements, f"{widget_name}")
assert not hasattr(gui.bec, f"{dock_name}")