From 225b73b07351f3152084a2324b8ecb58bb816d88 Mon Sep 17 00:00:00 2001 From: appel_c Date: Wed, 19 Mar 2025 16:55:00 +0100 Subject: [PATCH] refactor(rpc_reference): refactor rpc reference tracking --- bec_widgets/cli/client_utils.py | 118 +++++++++++++++++++--------- bec_widgets/cli/rpc/rpc_base.py | 12 ++- bec_widgets/cli/rpc/rpc_register.py | 12 +++ bec_widgets/cli/server.py | 46 +++++------ 4 files changed, 124 insertions(+), 64 deletions(-) diff --git a/bec_widgets/cli/client_utils.py b/bec_widgets/cli/client_utils.py index e4061b12..a96a9efd 100644 --- a/bec_widgets/cli/client_utils.py +++ b/bec_widgets/cli/client_utils.py @@ -11,6 +11,7 @@ import subprocess import threading import time from contextlib import contextmanager +from threading import Lock from typing import TYPE_CHECKING, Any from bec_lib.endpoints import MessageEndpoints @@ -202,6 +203,7 @@ class BECGuiClient(RPCBase): def __init__(self, **kwargs) -> None: super().__init__(**kwargs) + self._lock = Lock() self._default_dock_name = "bec" self._auto_updates_enabled = True self._auto_updates = None @@ -211,7 +213,7 @@ class BECGuiClient(RPCBase): self._gui_started_event = threading.Event() self._process = None self._process_output_processing_thread = None - self._exposed_dock_areas = [] + self._exposed_widgets = [] self._registry_state = {} self._ipython_registry = {} self.available_widgets = AvailableWidgetsNamespace() @@ -312,6 +314,10 @@ class BECGuiClient(RPCBase): def kill_server(self) -> None: """Kill the GUI server.""" self._top_level.clear() + # Unregister the registry state + self._client.connector.unregister( + MessageEndpoints.gui_registry_state(self._gui_id), cb=self._handle_registry_update + ) self._killed = True if self._gui_started_timer is not None: @@ -416,6 +422,7 @@ class BECGuiClient(RPCBase): return self._start_server(wait=wait) def _handle_registry_update(self, msg: StreamMessage) -> None: + # with self._lock: self._registry_state = msg["data"].state self._update_dynamic_namespace() @@ -433,39 +440,51 @@ class BECGuiClient(RPCBase): with wait_for_server(self): rpc_client = RPCBase(gui_id=f"{self._gui_id}:window", parent=self) rpc_client._run_rpc("hide") # pylint: disable=protected-access - # because of the registry callbacks, we may have - # dock areas that are already killed, but not yet - # removed from the registry state if not self._killed: for window in self._top_level.values(): window.hide() def _update_dynamic_namespace(self): """Update the dynamic name space""" - self._clear_exposed_dock_areas() - self._cleanup_ipython_registry() + # First we update the name space based on the new registry state self._add_registry_to_namespace() - - def _clear_exposed_dock_areas(self): - """Clear the exposed dock areas""" - self._top_level.clear() - for widget_id in self._exposed_dock_areas: - delattr(self, widget_id) - self._exposed_dock_areas.clear() + # Then we clear the ipython registry from old objects + self._cleanup_ipython_registry() def _cleanup_ipython_registry(self): """Cleanup the ipython registry""" - remove_ids = [] - for widget_id in self._ipython_registry: - if widget_id not in self._registry_state: - remove_ids.append(widget_id) + names_in_registry = list(self._ipython_registry.keys()) + remove_ids = list(set(names_in_registry) - set(self._exposed_widgets)) for widget_id in remove_ids: self._ipython_registry.pop(widget_id) + self._cleanup_rpc_references_on_rpc_base(remove_ids) + # Clear the exposed widgets + self._exposed_widgets.clear() + + def _cleanup_rpc_references_on_rpc_base(self, remove_ids: list[str]) -> None: + """Cleanup the rpc references on the RPCBase object""" + if not remove_ids: + return + for widget in self._ipython_registry.values(): + to_delete = [] + for attr_name, gui_id in widget._rpc_references.items(): + if gui_id in remove_ids: + to_delete.append(attr_name) + for attr_name in to_delete: + if hasattr(widget, attr_name): + delattr(widget, attr_name) + if attr_name.startswith("elements."): + delattr(widget.elements, attr_name.split(".")[1]) + widget._rpc_references.pop(attr_name) def _set_dynamic_attributes(self, obj: object, name: str, value: Any) -> None: """Add an object to the namespace""" setattr(obj, name, value) + def _update_rpc_references(self, widget: RPCBase, name: str, gui_id: str) -> None: + """Update the RPC references""" + widget._rpc_references[name] = gui_id + def _add_registry_to_namespace(self) -> None: """Add registry to namespace""" # Add dock areas @@ -475,33 +494,51 @@ class BECGuiClient(RPCBase): if state["widget_class"] == "BECDockArea" ] for state in dock_area_states: - # obj is an RPC reference to the RPCBase object - dock_area_obj = self._add_widget(state, self) - self._set_dynamic_attributes(self, dock_area_obj.widget_name, dock_area_obj) + dock_area_ref = self._add_widget(state, self) + dock_area = self._ipython_registry.get(dock_area_ref._gui_id) + if not hasattr(dock_area, "elements"): + self._set_dynamic_attributes(dock_area, "elements", WidgetNameSpace()) + self._set_dynamic_attributes(self, dock_area.widget_name, dock_area_ref) + # Keep track of rpc references on RPCBase object + self._update_rpc_references(self, dock_area.widget_name, dock_area_ref._gui_id) # Add dock_area to the top level - self._top_level[dock_area_obj.widget_name] = dock_area_obj - self._exposed_dock_areas.append(dock_area_obj.widget_name) + self._top_level[dock_area_ref.widget_name] = dock_area_ref + self._exposed_widgets.append(dock_area_ref._gui_id) # Add docks dock_states = [ state for state in self._registry_state.values() - if state["config"].get("parent_id", "") == dock_area_obj._gui_id + if state["config"].get("parent_id", "") == dock_area_ref._gui_id ] for state in dock_states: - dock_obj = self._add_widget(state, dock_area_obj) - self._set_dynamic_attributes(dock_area_obj, dock_obj.widget_name, dock_obj) + dock_ref = self._add_widget(state, dock_area) + dock = self._ipython_registry.get(dock_ref._gui_id) + self._set_dynamic_attributes(dock_area, dock_ref.widget_name, dock_ref) + # Keep track of rpc references on RPCBase object + self._update_rpc_references(dock_area, dock_ref.widget_name, dock_ref._gui_id) + # Keep track of exposed docks + self._exposed_widgets.append(dock_ref._gui_id) # Add widgets widget_states = [ state for state in self._registry_state.values() - if state["config"].get("parent_id", "") == dock_obj._gui_id + if state["config"].get("parent_id", "") == dock_ref._gui_id ] for state in widget_states: - widget = self._add_widget(state, dock_obj) - self._set_dynamic_attributes(dock_obj, widget.widget_name, widget) - self._set_dynamic_attributes(dock_area_obj.elements, widget.widget_name, widget) + widget_ref = self._add_widget(state, dock) + self._set_dynamic_attributes(dock, widget_ref.widget_name, widget_ref) + self._set_dynamic_attributes( + dock_area.elements, widget_ref.widget_name, widget_ref + ) + # Keep track of rpc references on RPCBase object + self._update_rpc_references( + dock_area, f"elements.{widget_ref.widget_name}", widget_ref._gui_id + ) + self._update_rpc_references(dock, widget_ref.widget_name, widget_ref._gui_id) + # Keep track of exposed widgets + self._exposed_widgets.append(widget_ref._gui_id) def _add_widget(self, state: dict, parent: object) -> RPCReference: """Add a widget to the namespace @@ -519,9 +556,6 @@ class BECGuiClient(RPCBase): self._ipython_registry[gui_id] = widget else: widget = obj - if widget_class == client.BECDockArea: - # Add elements to dynamic namespace - self._set_dynamic_attributes(widget, "elements", WidgetNameSpace()) obj = RPCReference(registry=self._ipython_registry, gui_id=gui_id) return obj @@ -599,11 +633,17 @@ if __name__ == "__main__": # pragma: no cover from bec_lib.client import BECClient from bec_lib.service_config import ServiceConfig - config = ServiceConfig() - bec_client = BECClient(config) - bec_client.start() + try: + config = ServiceConfig() + bec_client = BECClient(config) + bec_client.start() - # Test the client_utils.py module - gui = BECGuiClient() - gui.start() - print(gui.window_list) + # Test the client_utils.py module + gui = BECGuiClient() + + gui.start(wait=True) + print(gui.window_list) + gui.new() + time.sleep(10) + finally: + gui.kill_server() diff --git a/bec_widgets/cli/rpc/rpc_base.py b/bec_widgets/cli/rpc/rpc_base.py index 21b4346b..ab8fa51a 100644 --- a/bec_widgets/cli/rpc/rpc_base.py +++ b/bec_widgets/cli/rpc/rpc_base.py @@ -1,5 +1,6 @@ from __future__ import annotations +import inspect import threading import uuid from functools import wraps @@ -35,6 +36,13 @@ def rpc_call(func): def wrapper(self, *args, **kwargs): # we could rely on a strict type check here, but this is more flexible # moreover, it would anyway crash for objects... + caller_frame = inspect.currentframe().f_back + while caller_frame: + if "jedi" in caller_frame.f_globals: + # Jedi module is present, likely tab completion + return None # func(*args, **kwargs) + caller_frame = caller_frame.f_back + out = [] for arg in args: if hasattr(arg, "name"): @@ -120,7 +128,7 @@ class RPCBase: self._msg_wait_event = threading.Event() self._rpc_response = None super().__init__() - # print(f"RPCBase: {self._gui_id}") + self._rpc_references: dict[str, str] = {} def __repr__(self): type_ = type(self) @@ -171,7 +179,7 @@ class RPCBase: parameter={"args": args, "kwargs": kwargs, "gui_id": self._gui_id}, metadata={"request_id": request_id}, ) - + print(f"running and rpc {method}") # pylint: disable=protected-access receiver = self._root._gui_id if wait_for_rpc_response: diff --git a/bec_widgets/cli/rpc/rpc_register.py b/bec_widgets/cli/rpc/rpc_register.py index bfe8a163..719fa399 100644 --- a/bec_widgets/cli/rpc/rpc_register.py +++ b/bec_widgets/cli/rpc/rpc_register.py @@ -1,5 +1,6 @@ from __future__ import annotations +from contextlib import contextmanager from functools import wraps from threading import Lock from typing import TYPE_CHECKING, Callable @@ -17,6 +18,17 @@ if TYPE_CHECKING: # pragma: no cover logger = bec_logger.logger +@contextmanager +def rpc_register_broadcast(rpc_register): + """ + Context manager to broadcast updates to the RPCRegister whenever a new RPC object is added or removed. + """ + try: + yield rpc_register + finally: + rpc_register.broadcast() + + def broadcast_update(func): """ Decorator to broadcast updates to the RPCRegister whenever a new RPC object is added or removed. diff --git a/bec_widgets/cli/server.py b/bec_widgets/cli/server.py index 04a1e0ce..101a3c57 100644 --- a/bec_widgets/cli/server.py +++ b/bec_widgets/cli/server.py @@ -15,8 +15,7 @@ from bec_lib.utils.import_utils import lazy_import from qtpy.QtCore import Qt, QTimer from redis.exceptions import RedisError -from bec_widgets.cli.rpc import rpc_register -from bec_widgets.cli.rpc.rpc_register import RPCRegister +from bec_widgets.cli.rpc.rpc_register import RPCRegister, rpc_register_broadcast from bec_widgets.qt_utils.error_popups import ErrorPopupUtility from bec_widgets.utils import BECDispatcher from bec_widgets.utils.bec_connector import BECConnector @@ -70,8 +69,6 @@ class BECWidgetsCLIServer: # register broadcast callback self.rpc_register = RPCRegister() self.rpc_register.add_callback(self.broadcast_registry_update) - self.gui = gui_class(parent=None, name=gui_class_id, gui_id=gui_class_id) - # self.rpc_register.add_rpc(self.gui) self.dispatcher.connect_slot( self.on_rpc_update, MessageEndpoints.gui_instructions(self.gui_id) @@ -83,6 +80,8 @@ class BECWidgetsCLIServer: self._heartbeat_timer.start(200) self.status = messages.BECStatus.RUNNING + with rpc_register_broadcast(self.rpc_register): + self.gui = gui_class(parent=None, name=gui_class_id, gui_id=gui_class_id) logger.success(f"Server started with gui_id: {self.gui_id}") # Create initial object -> BECFigure or BECDockArea @@ -118,30 +117,31 @@ class BECWidgetsCLIServer: return obj def run_rpc(self, obj, method, args, kwargs): - logger.debug(f"Running RPC instruction: {method} with args: {args}, kwargs: {kwargs}") - method_obj = getattr(obj, method) - # check if the method accepts args and kwargs - if not callable(method_obj): - if not args: - res = method_obj + with rpc_register_broadcast(self.rpc_register): + logger.debug(f"Running RPC instruction: {method} with args: {args}, kwargs: {kwargs}") + method_obj = getattr(obj, method) + # check if the method accepts args and kwargs + if not callable(method_obj): + if not args: + res = method_obj + else: + setattr(obj, method, args[0]) + res = None else: - setattr(obj, method, args[0]) - res = None - else: - res = method_obj(*args, **kwargs) + res = method_obj(*args, **kwargs) - if isinstance(res, list): - res = [self.serialize_object(obj) for obj in res] - elif isinstance(res, dict): - res = {key: self.serialize_object(val) for key, val in res.items()} - else: - res = self.serialize_object(res) - return res + if isinstance(res, list): + res = [self.serialize_object(obj) for obj in res] + elif isinstance(res, dict): + res = {key: self.serialize_object(val) for key, val in res.items()} + else: + res = self.serialize_object(res) + return res def serialize_object(self, obj): if isinstance(obj, BECConnector): - config = {} # obj.config.model_dump() - config["parent_id"] = obj.parent_id + config = obj.config.model_dump() + config["parent_id"] = obj.parent_id # add parent_id to config return { "gui_id": obj.gui_id, "name": (