0
0
mirror of https://github.com/bec-project/bec_widgets.git synced 2025-07-13 19:21:50 +02:00

refactor(rpc_reference): refactor rpc reference tracking

This commit is contained in:
2025-03-19 16:55:00 +01:00
committed by wyzula-jan
parent f3d3c9425d
commit bd5e251ee9
4 changed files with 124 additions and 64 deletions

View File

@ -11,6 +11,7 @@ import subprocess
import threading
import time
from contextlib import contextmanager
from threading import Lock
from typing import TYPE_CHECKING, Any
from bec_lib.endpoints import MessageEndpoints
@ -202,6 +203,7 @@ class BECGuiClient(RPCBase):
def __init__(self, **kwargs) -> None:
super().__init__(**kwargs)
self._lock = Lock()
self._default_dock_name = "bec"
self._auto_updates_enabled = True
self._auto_updates = None
@ -211,7 +213,7 @@ class BECGuiClient(RPCBase):
self._gui_started_event = threading.Event()
self._process = None
self._process_output_processing_thread = None
self._exposed_dock_areas = []
self._exposed_widgets = []
self._registry_state = {}
self._ipython_registry = {}
self.available_widgets = AvailableWidgetsNamespace()
@ -312,6 +314,10 @@ class BECGuiClient(RPCBase):
def kill_server(self) -> None:
"""Kill the GUI server."""
self._top_level.clear()
# Unregister the registry state
self._client.connector.unregister(
MessageEndpoints.gui_registry_state(self._gui_id), cb=self._handle_registry_update
)
self._killed = True
if self._gui_started_timer is not None:
@ -416,6 +422,7 @@ class BECGuiClient(RPCBase):
return self._start_server(wait=wait)
def _handle_registry_update(self, msg: StreamMessage) -> None:
# with self._lock:
self._registry_state = msg["data"].state
self._update_dynamic_namespace()
@ -433,39 +440,51 @@ class BECGuiClient(RPCBase):
with wait_for_server(self):
rpc_client = RPCBase(gui_id=f"{self._gui_id}:window", parent=self)
rpc_client._run_rpc("hide") # pylint: disable=protected-access
# because of the registry callbacks, we may have
# dock areas that are already killed, but not yet
# removed from the registry state
if not self._killed:
for window in self._top_level.values():
window.hide()
def _update_dynamic_namespace(self):
"""Update the dynamic name space"""
self._clear_exposed_dock_areas()
self._cleanup_ipython_registry()
# First we update the name space based on the new registry state
self._add_registry_to_namespace()
def _clear_exposed_dock_areas(self):
"""Clear the exposed dock areas"""
self._top_level.clear()
for widget_id in self._exposed_dock_areas:
delattr(self, widget_id)
self._exposed_dock_areas.clear()
# Then we clear the ipython registry from old objects
self._cleanup_ipython_registry()
def _cleanup_ipython_registry(self):
"""Cleanup the ipython registry"""
remove_ids = []
for widget_id in self._ipython_registry:
if widget_id not in self._registry_state:
remove_ids.append(widget_id)
names_in_registry = list(self._ipython_registry.keys())
remove_ids = list(set(names_in_registry) - set(self._exposed_widgets))
for widget_id in remove_ids:
self._ipython_registry.pop(widget_id)
self._cleanup_rpc_references_on_rpc_base(remove_ids)
# Clear the exposed widgets
self._exposed_widgets.clear()
def _cleanup_rpc_references_on_rpc_base(self, remove_ids: list[str]) -> None:
"""Cleanup the rpc references on the RPCBase object"""
if not remove_ids:
return
for widget in self._ipython_registry.values():
to_delete = []
for attr_name, gui_id in widget._rpc_references.items():
if gui_id in remove_ids:
to_delete.append(attr_name)
for attr_name in to_delete:
if hasattr(widget, attr_name):
delattr(widget, attr_name)
if attr_name.startswith("elements."):
delattr(widget.elements, attr_name.split(".")[1])
widget._rpc_references.pop(attr_name)
def _set_dynamic_attributes(self, obj: object, name: str, value: Any) -> None:
"""Add an object to the namespace"""
setattr(obj, name, value)
def _update_rpc_references(self, widget: RPCBase, name: str, gui_id: str) -> None:
"""Update the RPC references"""
widget._rpc_references[name] = gui_id
def _add_registry_to_namespace(self) -> None:
"""Add registry to namespace"""
# Add dock areas
@ -475,33 +494,51 @@ class BECGuiClient(RPCBase):
if state["widget_class"] == "BECDockArea"
]
for state in dock_area_states:
# obj is an RPC reference to the RPCBase object
dock_area_obj = self._add_widget(state, self)
self._set_dynamic_attributes(self, dock_area_obj.widget_name, dock_area_obj)
dock_area_ref = self._add_widget(state, self)
dock_area = self._ipython_registry.get(dock_area_ref._gui_id)
if not hasattr(dock_area, "elements"):
self._set_dynamic_attributes(dock_area, "elements", WidgetNameSpace())
self._set_dynamic_attributes(self, dock_area.widget_name, dock_area_ref)
# Keep track of rpc references on RPCBase object
self._update_rpc_references(self, dock_area.widget_name, dock_area_ref._gui_id)
# Add dock_area to the top level
self._top_level[dock_area_obj.widget_name] = dock_area_obj
self._exposed_dock_areas.append(dock_area_obj.widget_name)
self._top_level[dock_area_ref.widget_name] = dock_area_ref
self._exposed_widgets.append(dock_area_ref._gui_id)
# Add docks
dock_states = [
state
for state in self._registry_state.values()
if state["config"].get("parent_id", "") == dock_area_obj._gui_id
if state["config"].get("parent_id", "") == dock_area_ref._gui_id
]
for state in dock_states:
dock_obj = self._add_widget(state, dock_area_obj)
self._set_dynamic_attributes(dock_area_obj, dock_obj.widget_name, dock_obj)
dock_ref = self._add_widget(state, dock_area)
dock = self._ipython_registry.get(dock_ref._gui_id)
self._set_dynamic_attributes(dock_area, dock_ref.widget_name, dock_ref)
# Keep track of rpc references on RPCBase object
self._update_rpc_references(dock_area, dock_ref.widget_name, dock_ref._gui_id)
# Keep track of exposed docks
self._exposed_widgets.append(dock_ref._gui_id)
# Add widgets
widget_states = [
state
for state in self._registry_state.values()
if state["config"].get("parent_id", "") == dock_obj._gui_id
if state["config"].get("parent_id", "") == dock_ref._gui_id
]
for state in widget_states:
widget = self._add_widget(state, dock_obj)
self._set_dynamic_attributes(dock_obj, widget.widget_name, widget)
self._set_dynamic_attributes(dock_area_obj.elements, widget.widget_name, widget)
widget_ref = self._add_widget(state, dock)
self._set_dynamic_attributes(dock, widget_ref.widget_name, widget_ref)
self._set_dynamic_attributes(
dock_area.elements, widget_ref.widget_name, widget_ref
)
# Keep track of rpc references on RPCBase object
self._update_rpc_references(
dock_area, f"elements.{widget_ref.widget_name}", widget_ref._gui_id
)
self._update_rpc_references(dock, widget_ref.widget_name, widget_ref._gui_id)
# Keep track of exposed widgets
self._exposed_widgets.append(widget_ref._gui_id)
def _add_widget(self, state: dict, parent: object) -> RPCReference:
"""Add a widget to the namespace
@ -519,9 +556,6 @@ class BECGuiClient(RPCBase):
self._ipython_registry[gui_id] = widget
else:
widget = obj
if widget_class == client.BECDockArea:
# Add elements to dynamic namespace
self._set_dynamic_attributes(widget, "elements", WidgetNameSpace())
obj = RPCReference(registry=self._ipython_registry, gui_id=gui_id)
return obj
@ -599,11 +633,17 @@ if __name__ == "__main__": # pragma: no cover
from bec_lib.client import BECClient
from bec_lib.service_config import ServiceConfig
config = ServiceConfig()
bec_client = BECClient(config)
bec_client.start()
try:
config = ServiceConfig()
bec_client = BECClient(config)
bec_client.start()
# Test the client_utils.py module
gui = BECGuiClient()
gui.start()
print(gui.window_list)
# Test the client_utils.py module
gui = BECGuiClient()
gui.start(wait=True)
print(gui.window_list)
gui.new()
time.sleep(10)
finally:
gui.kill_server()

View File

@ -1,5 +1,6 @@
from __future__ import annotations
import inspect
import threading
import uuid
from functools import wraps
@ -35,6 +36,13 @@ def rpc_call(func):
def wrapper(self, *args, **kwargs):
# we could rely on a strict type check here, but this is more flexible
# moreover, it would anyway crash for objects...
caller_frame = inspect.currentframe().f_back
while caller_frame:
if "jedi" in caller_frame.f_globals:
# Jedi module is present, likely tab completion
return None # func(*args, **kwargs)
caller_frame = caller_frame.f_back
out = []
for arg in args:
if hasattr(arg, "name"):
@ -120,7 +128,7 @@ class RPCBase:
self._msg_wait_event = threading.Event()
self._rpc_response = None
super().__init__()
# print(f"RPCBase: {self._gui_id}")
self._rpc_references: dict[str, str] = {}
def __repr__(self):
type_ = type(self)
@ -171,7 +179,7 @@ class RPCBase:
parameter={"args": args, "kwargs": kwargs, "gui_id": self._gui_id},
metadata={"request_id": request_id},
)
print(f"running and rpc {method}")
# pylint: disable=protected-access
receiver = self._root._gui_id
if wait_for_rpc_response:

View File

@ -1,5 +1,6 @@
from __future__ import annotations
from contextlib import contextmanager
from functools import wraps
from threading import Lock
from typing import TYPE_CHECKING, Callable
@ -17,6 +18,17 @@ if TYPE_CHECKING: # pragma: no cover
logger = bec_logger.logger
@contextmanager
def rpc_register_broadcast(rpc_register):
"""
Context manager to broadcast updates to the RPCRegister whenever a new RPC object is added or removed.
"""
try:
yield rpc_register
finally:
rpc_register.broadcast()
def broadcast_update(func):
"""
Decorator to broadcast updates to the RPCRegister whenever a new RPC object is added or removed.

View File

@ -15,8 +15,7 @@ from bec_lib.utils.import_utils import lazy_import
from qtpy.QtCore import Qt, QTimer
from redis.exceptions import RedisError
from bec_widgets.cli.rpc import rpc_register
from bec_widgets.cli.rpc.rpc_register import RPCRegister
from bec_widgets.cli.rpc.rpc_register import RPCRegister, rpc_register_broadcast
from bec_widgets.qt_utils.error_popups import ErrorPopupUtility
from bec_widgets.utils import BECDispatcher
from bec_widgets.utils.bec_connector import BECConnector
@ -70,8 +69,6 @@ class BECWidgetsCLIServer:
# register broadcast callback
self.rpc_register = RPCRegister()
self.rpc_register.add_callback(self.broadcast_registry_update)
self.gui = gui_class(parent=None, name=gui_class_id, gui_id=gui_class_id)
# self.rpc_register.add_rpc(self.gui)
self.dispatcher.connect_slot(
self.on_rpc_update, MessageEndpoints.gui_instructions(self.gui_id)
@ -83,6 +80,8 @@ class BECWidgetsCLIServer:
self._heartbeat_timer.start(200)
self.status = messages.BECStatus.RUNNING
with rpc_register_broadcast(self.rpc_register):
self.gui = gui_class(parent=None, name=gui_class_id, gui_id=gui_class_id)
logger.success(f"Server started with gui_id: {self.gui_id}")
# Create initial object -> BECFigure or BECDockArea
@ -118,30 +117,31 @@ class BECWidgetsCLIServer:
return obj
def run_rpc(self, obj, method, args, kwargs):
logger.debug(f"Running RPC instruction: {method} with args: {args}, kwargs: {kwargs}")
method_obj = getattr(obj, method)
# check if the method accepts args and kwargs
if not callable(method_obj):
if not args:
res = method_obj
with rpc_register_broadcast(self.rpc_register):
logger.debug(f"Running RPC instruction: {method} with args: {args}, kwargs: {kwargs}")
method_obj = getattr(obj, method)
# check if the method accepts args and kwargs
if not callable(method_obj):
if not args:
res = method_obj
else:
setattr(obj, method, args[0])
res = None
else:
setattr(obj, method, args[0])
res = None
else:
res = method_obj(*args, **kwargs)
res = method_obj(*args, **kwargs)
if isinstance(res, list):
res = [self.serialize_object(obj) for obj in res]
elif isinstance(res, dict):
res = {key: self.serialize_object(val) for key, val in res.items()}
else:
res = self.serialize_object(res)
return res
if isinstance(res, list):
res = [self.serialize_object(obj) for obj in res]
elif isinstance(res, dict):
res = {key: self.serialize_object(val) for key, val in res.items()}
else:
res = self.serialize_object(res)
return res
def serialize_object(self, obj):
if isinstance(obj, BECConnector):
config = {} # obj.config.model_dump()
config["parent_id"] = obj.parent_id
config = obj.config.model_dump()
config["parent_id"] = obj.parent_id # add parent_id to config
return {
"gui_id": obj.gui_id,
"name": (