1
0
mirror of https://github.com/bec-project/bec_widgets.git synced 2026-03-08 01:37:52 +01:00

wip - feat(rpc): added rpc broadcast

This commit is contained in:
2025-02-19 13:03:44 +01:00
committed by wyzula-jan
parent 705f157c04
commit 1f4e740dca
4 changed files with 95 additions and 4 deletions

View File

@@ -179,6 +179,7 @@ class BECGuiClient(RPCBase):
self._gui_started_event = threading.Event()
self._process = None
self._process_output_processing_thread = None
self._exposed_widgets = []
@property
def windows(self):
@@ -329,19 +330,31 @@ class BECGuiClient(RPCBase):
with wait_for_server(self):
return self._top_level["main"].widget
def new(self, title):
def new(self, title: str = None) -> BECDockArea:
"""Ask main window to create a new top-level dock area"""
with wait_for_server(self):
rpc_client = RPCBase(gui_id=f"{self._gui_id}:window", parent=self)
widget = rpc_client._run_rpc("new_dock_area", title)
self._top_level[widget._gui_id] = WidgetDesc(title=title, widget=widget)
self._top_level[widget._gui_id] = widget
setattr(self, widget._gui_id, widget)
self._exposed_widgets.append(widget._gui_id)
return widget
def _update_top_level_widgets(self):
for widget_id in self._exposed_widgets:
delattr(self, widget_id)
self._exposed_widgets.clear()
for widget_id, widget in self._top_level.items():
setattr(self, widget_id, widget)
self._exposed_widgets.append(widget_id)
def close(self) -> None:
"""
Close the gui window.
"""
self._top_level.clear()
self._update_top_level_widgets()
if self._gui_started_timer is not None:
self._gui_started_timer.cancel()

View File

@@ -1,11 +1,27 @@
from __future__ import annotations
from functools import wraps
from threading import Lock
from typing import Callable
from weakref import WeakValueDictionary
from qtpy.QtCore import QObject
def broadcast_update(func):
"""
Decorator to broadcast updates to the RPCRegister whenever a new RPC object is added or removed.
"""
@wraps(func)
def wrapper(self, *args, **kwargs):
result = func(self, *args, **kwargs)
self.broadcast()
return result
return wrapper
class RPCRegister:
"""
A singleton class that keeps track of all the RPC objects registered in the system for CLI usage.
@@ -26,7 +42,9 @@ class RPCRegister:
return
self._rpc_register = WeakValueDictionary()
self._initialized = True
self.callbacks = []
@broadcast_update
def add_rpc(self, rpc: QObject):
"""
Add an RPC object to the register.
@@ -38,6 +56,7 @@ class RPCRegister:
raise ValueError("RPC object must have a 'gui_id' attribute.")
self._rpc_register[rpc.gui_id] = rpc
@broadcast_update
def remove_rpc(self, rpc: str):
"""
Remove an RPC object from the register.
@@ -73,6 +92,38 @@ class RPCRegister:
connections = dict(self._rpc_register)
return connections
def get_rpc_by_type(self, type_name) -> list[str]:
"""
Get all RPC objects of a certain type.
Args:
type_name(str): The type of the RPC object to be retrieved.
Returns:
list: A list of RPC objects of the given type.
"""
rpc_objects = [rpc for rpc in self._rpc_register if rpc.startswith(type_name)]
return rpc_objects
def broadcast(self):
"""
Broadcast the update to all the callbacks.
"""
print("Broadcasting")
connections = self.list_all_connections()
for callback in self.callbacks:
callback(connections)
def add_callback(self, callback: Callable[[dict], None]):
"""
Add a callback that will be called whenever the registry is updated.
Args:
callback(Callable[[dict], None]): The callback to be added. It should accept a dictionary of all the
registered RPC objects as an argument.
"""
self.callbacks.append(callback)
@classmethod
def reset_singleton(cls):
"""

View File

@@ -152,6 +152,24 @@ class BECWidgetsCLIServer:
except RedisError as exc:
logger.error(f"Error while emitting heartbeat: {exc}")
def broadcast_registry_update(self, connections: dict):
"""
Broadcast the updated registry to all clients.
"""
# We only need to broadcast the dock areas
data = {
key: self.serialize_object(val)
for key, val in connections.items()
if val.__class__.__name__ == "BECDockArea"
}
logger.info(f"Broadcasting registry update: {data}")
# self.client.connector.set(
# MessageEndpoints.gui_registry_update(self.gui_id),
# messages.RegistryUpdateMessage(connections=connections),
# expire=10,
# )
def shutdown(self): # TODO not sure if needed when cleanup is done at level of BECConnector
logger.info(f"Shutting down server with gui_id: {self.gui_id}")
self.status = messages.BECStatus.IDLE
@@ -263,6 +281,7 @@ def main():
win.setWindowTitle("BEC Widgets")
RPCRegister().add_rpc(win)
RPCRegister().add_callback(server.broadcast_registry_update)
gui = server.gui
win.setCentralWidget(gui)

View File

@@ -1,5 +1,6 @@
from qtpy.QtWidgets import QApplication, QMainWindow
from bec_widgets.cli.rpc.rpc_register import RPCRegister
from bec_widgets.utils import BECConnector
from bec_widgets.widgets.containers.dock.dock_area import BECDockArea
@@ -33,8 +34,15 @@ class BECMainWindow(QMainWindow, BECConnector):
}
return info
def new_dock_area(self, name):
dock_area = BECDockArea()
def new_dock_area(self, name=None):
name = name or "BEC Widgets"
self.rpc_register = RPCRegister()
gui_id = name.replace(" ", "_")
existing_widgets = self.rpc_register.get_rpc_by_type(gui_id)
if existing_widgets:
name = f"{name} {len(existing_widgets) + 1}"
dock_area = BECDockArea(gui_id=name.replace(" ", "_"))
dock_area.resize(dock_area.minimumSizeHint())
dock_area.window().setWindowTitle(name)
dock_area.show()