0
0
mirror of https://github.com/bec-project/bec_widgets.git synced 2025-07-14 03:31:50 +02:00

fix: fix rpc after qapp refactoring; simplified update logic

This commit is contained in:
2025-03-27 20:44:35 +01:00
committed by wyzula-jan
parent accaeed832
commit f423e8463d
2 changed files with 91 additions and 122 deletions

View File

@ -10,11 +10,11 @@ import threading
import time
from contextlib import contextmanager
from threading import Lock
from typing import TYPE_CHECKING, Any
from typing import TYPE_CHECKING, Literal, TypeAlias
from bec_lib.endpoints import MessageEndpoints
from bec_lib.logger import bec_logger
from bec_lib.utils.import_utils import lazy_import, lazy_import_from
from bec_lib.utils.import_utils import lazy_import_from
from rich.console import Console
from rich.table import Table
@ -28,7 +28,11 @@ else:
logger = bec_logger.logger
IGNORE_WIDGETS = ["BECDockArea", "BECDock"]
IGNORE_WIDGETS = ["LaunchWindow"]
RegistryState: TypeAlias = dict[
Literal["gui_id", "name", "widget_class", "config", "__rpc__"], str | bool | dict
]
# pylint: disable=redefined-outer-scope
@ -82,7 +86,7 @@ def _start_plot_process(
"--id",
gui_id,
"--gui_class",
gui_class.__name__,
"launcher",
"--gui_class_id",
gui_class_id,
"--hide",
@ -199,21 +203,25 @@ class BECGuiClient(RPCBase):
self._auto_updates_enabled = True
self._auto_updates = None
self._killed = False
self._top_level: dict[str, RPCBase] = {} # TODO should be more general than just DockArea
self._top_level: dict[str, RPCReference] = {}
self._startup_timeout = 0
self._gui_started_timer = None
self._gui_started_event = threading.Event()
self._process = None
self._process_output_processing_thread = None
self._exposed_widgets = []
self._server_registry = {}
self._ipython_registry = {}
self._server_registry: dict[str, RegistryState] = {}
self._ipython_registry: dict[str, RPCReference] = {}
self.available_widgets = AvailableWidgetsNamespace()
####################
#### Client API ####
####################
@property
def launcher(self) -> RPCBase:
"""The launcher object."""
return RPCBase(gui_id=f"{self._gui_id}:launcher", parent=self, name="launcher")
def connect_to_gui_server(self, gui_id: str) -> None:
"""Connect to a GUI server"""
# Unregister the old callback
@ -221,18 +229,20 @@ class BECGuiClient(RPCBase):
MessageEndpoints.gui_registry_state(self._gui_id), cb=self._handle_registry_update
)
self._gui_id = gui_id
# Get the registry state
msgs = self._client.connector.xread(
MessageEndpoints.gui_registry_state(self._gui_id), count=1
)
if msgs:
self._handle_registry_update(msgs[0])
# reset the namespace
self._update_dynamic_namespace({})
self._server_registry = {}
self._top_level = {}
self._ipython_registry = {}
# Register the new callback
self._client.connector.register(
MessageEndpoints.gui_registry_state(self._gui_id), cb=self._handle_registry_update
MessageEndpoints.gui_registry_state(self._gui_id),
cb=self._handle_registry_update,
parent=self,
from_start=True,
)
self._update_dynamic_namespace() # FIXME don't refresh the namespace
# TODO do a cleanup of the previous server...
@property
def windows(self) -> dict:
@ -277,12 +287,12 @@ class BECGuiClient(RPCBase):
self.start(wait=True)
if wait:
with wait_for_server(self):
rpc_client = RPCBase(gui_id=f"{self._gui_id}:window", parent=self)
rpc_client = RPCBase(gui_id=f"{self._gui_id}:launcher", parent=self)
widget = rpc_client._run_rpc(
"new_dock_area", name, geometry
"launch", "dock_area", name, geometry
) # pylint: disable=protected-access
return widget
rpc_client = RPCBase(gui_id=f"{self._gui_id}:window", parent=self)
rpc_client = RPCBase(gui_id=f"{self._gui_id}:launcher", parent=self)
widget = rpc_client._run_rpc(
"new_dock_area", name, geometry
) # pylint: disable=protected-access
@ -358,7 +368,6 @@ class BECGuiClient(RPCBase):
time.sleep(0.1)
else:
break
self._do_show_all()
self._gui_started_event.set()
def _start_server(self, wait: bool = False) -> None:
@ -382,7 +391,7 @@ class BECGuiClient(RPCBase):
if callable(callback):
callback()
finally:
threading.current_thread().cancel()
threading.current_thread().cancel() # type: ignore
self._gui_started_timer = RepeatTimer(
0.5, lambda: self._gui_is_alive() and gui_started_callback(self._gui_post_startup)
@ -393,24 +402,29 @@ class BECGuiClient(RPCBase):
self._gui_started_event.wait()
def _dump(self):
rpc_client = RPCBase(gui_id=f"{self._gui_id}:window", parent=self)
rpc_client = RPCBase(gui_id=f"{self._gui_id}:launcher", parent=self)
return rpc_client._run_rpc("_dump")
def _start(self, wait: bool = False) -> None:
self._killed = False
self._client.connector.register(
MessageEndpoints.gui_registry_state(self._gui_id), cb=self._handle_registry_update
MessageEndpoints.gui_registry_state(self._gui_id),
cb=self._handle_registry_update,
parent=self,
)
return self._start_server(wait=wait)
def _handle_registry_update(self, msg: StreamMessage) -> None:
@staticmethod
def _handle_registry_update(msg: StreamMessage, parent: BECGuiClient) -> None:
# This was causing a deadlock during shutdown, not sure why.
# with self._lock:
print(msg)
self = parent
self._server_registry = msg["data"].state
self._update_dynamic_namespace()
self._update_dynamic_namespace(self._server_registry)
def _do_show_all(self):
rpc_client = RPCBase(gui_id=f"{self._gui_id}:window", parent=self)
rpc_client = RPCBase(gui_id=f"{self._gui_id}:launcher", parent=self)
rpc_client._run_rpc("show") # pylint: disable=protected-access
for window in self._top_level.values():
window.show()
@ -421,112 +435,44 @@ class BECGuiClient(RPCBase):
def _hide_all(self):
with wait_for_server(self):
rpc_client = RPCBase(gui_id=f"{self._gui_id}:window", parent=self)
rpc_client = RPCBase(gui_id=f"{self._gui_id}:launcher", parent=self)
rpc_client._run_rpc("hide") # pylint: disable=protected-access
if not self._killed:
for window in self._top_level.values():
window.hide()
def _update_dynamic_namespace(self):
def _update_dynamic_namespace(self, server_registry: dict):
"""Update the dynamic name space"""
# Clear the top level
self._top_level.clear()
# First we update the name space based on the new registry state
self._add_registry_to_namespace()
# Then we clear the ipython registry from old objects
self._cleanup_ipython_registry()
print("Updating dynamic namespace")
for state in server_registry.values():
if state["widget_class"] in IGNORE_WIDGETS:
continue
self._add_widget(state, self)
def _cleanup_ipython_registry(self):
"""Cleanup the ipython registry"""
names_in_registry = list(self._ipython_registry.keys())
names_in_server_state = list(self._server_registry.keys())
remove_ids = list(set(names_in_registry) - set(names_in_server_state))
for widget_id in remove_ids:
self._ipython_registry.pop(widget_id)
self._cleanup_rpc_references_on_rpc_base(remove_ids)
# Clear the exposed widgets
self._exposed_widgets.clear() # No longer needed I think
def _cleanup_rpc_references_on_rpc_base(self, remove_ids: list[str]) -> None:
"""Cleanup the rpc references on the RPCBase object"""
if not remove_ids:
return
for widget in self._ipython_registry.values():
to_delete = []
for attr_name, gui_id in widget._rpc_references.items():
if gui_id in remove_ids:
to_delete.append(attr_name)
for attr_name in to_delete:
if hasattr(widget, attr_name):
delattr(widget, attr_name)
if attr_name.startswith("elements."):
delattr(widget.elements, attr_name.split(".")[1])
widget._rpc_references.pop(attr_name)
widget._refresh_references()
def _set_dynamic_attributes(self, obj: object, name: str, value: Any) -> None:
"""Add an object to the namespace"""
setattr(obj, name, value)
def _update_rpc_references(self, widget: RPCBase, name: str, gui_id: str) -> None:
"""Update the RPC references"""
widget._rpc_references[name] = gui_id
def _add_registry_to_namespace(self) -> None:
"""Add registry to namespace"""
# Add dock areas
dock_area_states = [
# get all top-level widgets. These are widgets that have no parent
top_level_widgets = [
state
for state in self._server_registry.values()
if state["widget_class"] == "BECDockArea"
for state in server_registry.values()
if not state["config"].get("parent_id") and state["widget_class"] not in IGNORE_WIDGETS
]
for state in dock_area_states:
dock_area_ref = self._add_widget(state, self)
dock_area = self._ipython_registry.get(dock_area_ref._gui_id)
if not hasattr(dock_area, "elements"):
self._set_dynamic_attributes(dock_area, "elements", WidgetNameSpace())
self._set_dynamic_attributes(self, dock_area.widget_name, dock_area_ref)
# Keep track of rpc references on RPCBase object
self._update_rpc_references(self, dock_area.widget_name, dock_area_ref._gui_id)
# Add dock_area to the top level
self._top_level[dock_area_ref.widget_name] = dock_area_ref
self._exposed_widgets.append(dock_area_ref._gui_id)
removed_widgets = set(self._top_level.keys()) - set(
[state["name"] for state in top_level_widgets]
)
# Add docks
dock_states = [
state
for state in self._server_registry.values()
if state["config"].get("parent_id", "") == dock_area_ref._gui_id
]
for state in dock_states:
dock_ref = self._add_widget(state, dock_area)
dock = self._ipython_registry.get(dock_ref._gui_id)
self._set_dynamic_attributes(dock_area, dock_ref.widget_name, dock_ref)
# Keep track of rpc references on RPCBase object
self._update_rpc_references(dock_area, dock_ref.widget_name, dock_ref._gui_id)
# Keep track of exposed docks
self._exposed_widgets.append(dock_ref._gui_id)
for widget_name in removed_widgets:
delattr(self, widget_name)
# Add widgets
widget_states = [
state
for state in self._server_registry.values()
if state["config"].get("parent_id", "") == dock_ref._gui_id
]
for state in widget_states:
widget_ref = self._add_widget(state, dock)
self._set_dynamic_attributes(dock, widget_ref.widget_name, widget_ref)
self._set_dynamic_attributes(
dock_area.elements, widget_ref.widget_name, widget_ref
)
# Keep track of rpc references on RPCBase object
self._update_rpc_references(
dock_area, f"elements.{widget_ref.widget_name}", widget_ref._gui_id
)
self._update_rpc_references(dock, widget_ref.widget_name, widget_ref._gui_id)
# Keep track of exposed widgets
self._exposed_widgets.append(widget_ref._gui_id)
for state in top_level_widgets:
setattr(self, state["name"], self._ipython_registry[state["gui_id"]])
def _add_widget(self, state: dict, parent: object) -> RPCReference:
self._top_level = {
state["name"]: self._ipython_registry[state["gui_id"]] for state in top_level_widgets
}
def _add_widget(self, state: dict, parent: object) -> RPCReference | None:
"""Add a widget to the namespace
Args:
@ -535,6 +481,8 @@ class BECGuiClient(RPCBase):
"""
name = state["name"]
gui_id = state["gui_id"]
if state["widget_class"] in IGNORE_WIDGETS:
return
widget_class = getattr(client, state["widget_class"])
obj = self._ipython_registry.get(gui_id)
if obj is None:

View File

@ -15,6 +15,9 @@ import bec_widgets.cli.client as client
if TYPE_CHECKING: # pragma: no cover
from bec_lib import messages
from bec_lib.connector import MessageObject
from bec_widgets.cli.client_utils import BECGuiClient
else:
messages = lazy_import("bec_lib.messages")
# from bec_lib.connector import MessageObject
@ -152,7 +155,7 @@ class RPCBase:
return self._name
@property
def _root(self):
def _root(self) -> BECGuiClient:
"""
Get the root widget. This is the BECFigure widget that holds
the anchor gui_id.
@ -163,7 +166,7 @@ class RPCBase:
parent = parent._parent
return parent
def _run_rpc(self, method, *args, wait_for_rpc_response=True, timeout=3, **kwargs) -> Any:
def _run_rpc(self, method, *args, wait_for_rpc_response=True, timeout=300, **kwargs) -> Any:
"""
Run the RPC call.
@ -236,13 +239,14 @@ class RPCBase:
cls = getattr(client, cls)
# The namespace of the object will be updated dynamically on the client side
# Therefor it is important to check if the object is already in the registry
# Therefore it is important to check if the object is already in the registry
# If yes, we return the reference to the object, otherwise we create a new object
# pylint: disable=protected-access
if msg_result["gui_id"] in self._root._ipython_registry:
return RPCReference(self._root._ipython_registry, msg_result["gui_id"])
ret = cls(parent=self, **msg_result)
self._root._ipython_registry[ret._gui_id] = ret
self._refresh_references()
obj = RPCReference(self._root._ipython_registry, ret._gui_id)
return obj
# return ret
@ -258,3 +262,20 @@ class RPCBase:
if heart.status == messages.BECStatus.RUNNING:
return True
return False
def _refresh_references(self):
"""
Refresh the references.
"""
with self._root._lock:
references = {}
for key, val in self._root._server_registry.items():
parent_id = val["config"].get("parent_id")
if parent_id == self._gui_id:
references[key] = val["config"]["gui_id"]
removed_references = set(self._rpc_references.keys()) - set(references.keys())
self._rpc_references = references
for key in removed_references:
delattr(self, key)
for key, val in references.items():
setattr(self, key, RPCReference(self._root._ipython_registry, val))