From ae1c71e704a1ac426abb124b758372a7c2fd266e Mon Sep 17 00:00:00 2001 From: appel_c Date: Mon, 24 Mar 2025 13:53:53 +0100 Subject: [PATCH] fix: broadcast context manager to emit registry changes just once --- bec_widgets/cli/client_utils.py | 3 +- bec_widgets/cli/rpc/rpc_base.py | 2 +- bec_widgets/cli/rpc/rpc_register.py | 49 ++++++++++++++----- bec_widgets/cli/server.py | 13 +++-- bec_widgets/utils/bec_widget.py | 4 +- .../widgets/containers/dock/dock_area.py | 5 +- .../containers/main_window/main_window.py | 5 +- tests/end-2-end/test_bec_dock_rpc_e2e.py | 11 +++-- 8 files changed, 59 insertions(+), 33 deletions(-) diff --git a/bec_widgets/cli/client_utils.py b/bec_widgets/cli/client_utils.py index 6c48da8f..732a0669 100644 --- a/bec_widgets/cli/client_utils.py +++ b/bec_widgets/cli/client_utils.py @@ -192,8 +192,6 @@ class AvailableWidgetsNamespace: class BECGuiClient(RPCBase): """BEC GUI client class. Container for GUI applications within Python.""" - _top_level: dict[str, client.BECDockArea] = {} - def __init__(self, **kwargs) -> None: super().__init__(**kwargs) self._lock = Lock() @@ -201,6 +199,7 @@ class BECGuiClient(RPCBase): self._auto_updates_enabled = True self._auto_updates = None self._killed = False + self._top_level: dict[str, client.BECDockArea] = {} self._startup_timeout = 0 self._gui_started_timer = None self._gui_started_event = threading.Event() diff --git a/bec_widgets/cli/rpc/rpc_base.py b/bec_widgets/cli/rpc/rpc_base.py index 652337b1..206395e3 100644 --- a/bec_widgets/cli/rpc/rpc_base.py +++ b/bec_widgets/cli/rpc/rpc_base.py @@ -12,7 +12,7 @@ from bec_lib.utils.import_utils import lazy_import, lazy_import_from import bec_widgets.cli.client as client -if TYPE_CHECKING: +if TYPE_CHECKING: # pragma: no cover from bec_lib import messages from bec_lib.connector import MessageObject else: diff --git a/bec_widgets/cli/rpc/rpc_register.py b/bec_widgets/cli/rpc/rpc_register.py index 5f0d7a2f..69c82667 100644 --- a/bec_widgets/cli/rpc/rpc_register.py +++ b/bec_widgets/cli/rpc/rpc_register.py @@ -1,6 +1,5 @@ from __future__ import annotations -from contextlib import contextmanager from functools import wraps from threading import Lock from typing import TYPE_CHECKING, Callable @@ -18,26 +17,16 @@ if TYPE_CHECKING: # pragma: no cover logger = bec_logger.logger -@contextmanager -def rpc_register_broadcast(rpc_register): - """ - Context manager to broadcast updates to the RPCRegister whenever a new RPC object is added or removed. - """ - try: - yield rpc_register - finally: - rpc_register.broadcast() - - def broadcast_update(func): """ Decorator to broadcast updates to the RPCRegister whenever a new RPC object is added or removed. + If class attribute _skip_broadcast is set to True, the broadcast will be skipped """ @wraps(func) def wrapper(self, *args, **kwargs): result = func(self, *args, **kwargs) - # self.broadcast() + self.broadcast() return result return wrapper @@ -51,6 +40,7 @@ class RPCRegister: _instance = None _initialized = False _lock = Lock() + _skip_broadcast = False def __new__(cls, *args, **kwargs): if cls._instance is None: @@ -62,9 +52,18 @@ class RPCRegister: if self._initialized: return self._rpc_register = WeakValueDictionary() + self._broadcast_on_hold = RPCRegisterBroadcast(self) self._initialized = True self.callbacks = [] + @classmethod + def delayed_broadcast(cls): + """ + Delay the broadcast of the update to all the callbacks. + """ + register = cls() + return register._broadcast_on_hold + @broadcast_update def add_rpc(self, rpc: QObject): """ @@ -130,6 +129,9 @@ class RPCRegister: """ Broadcast the update to all the callbacks. """ + + if self._skip_broadcast: + return connections = self.list_all_connections() for callback in self.callbacks: callback(connections) @@ -151,3 +153,24 @@ class RPCRegister: """ cls._instance = None cls._initialized = False + + +class RPCRegisterBroadcast: + """Context manager for RPCRegister broadcast.""" + + def __init__(self, rpc_register: RPCRegister) -> None: + self.rpc_register = rpc_register + self._call_depth = 0 + + def __enter__(self): + """Enter the context manager""" + self._call_depth += 1 # Needed for nested calls + self.rpc_register._skip_broadcast = True + return self.rpc_register + + def __exit__(self, *exc): + """Exit the context manager""" + self._call_depth -= 1 # Remove nested calls + if self._call_depth == 0: # Last one to exit is repsonsible for broadcasting + self.rpc_register._skip_broadcast = False + self.rpc_register.broadcast() diff --git a/bec_widgets/cli/server.py b/bec_widgets/cli/server.py index c19ce60d..4c7970ef 100644 --- a/bec_widgets/cli/server.py +++ b/bec_widgets/cli/server.py @@ -15,7 +15,7 @@ from bec_lib.utils.import_utils import lazy_import from qtpy.QtCore import Qt, QTimer from redis.exceptions import RedisError -from bec_widgets.cli.rpc.rpc_register import RPCRegister, rpc_register_broadcast +from bec_widgets.cli.rpc.rpc_register import RPCRegister from bec_widgets.qt_utils.error_popups import ErrorPopupUtility from bec_widgets.utils import BECDispatcher from bec_widgets.utils.bec_connector import BECConnector @@ -80,7 +80,7 @@ class BECWidgetsCLIServer: self._heartbeat_timer.start(200) self.status = messages.BECStatus.RUNNING - with rpc_register_broadcast(self.rpc_register): + with RPCRegister.delayed_broadcast(): self.gui = gui_class(parent=None, name=gui_class_id, gui_id=gui_class_id) logger.success(f"Server started with gui_id: {self.gui_id}") # Create initial object -> BECFigure or BECDockArea @@ -118,7 +118,7 @@ class BECWidgetsCLIServer: def run_rpc(self, obj, method, args, kwargs): # Run with rpc registry broadcast, but only once - with rpc_register_broadcast(self.rpc_register): + with RPCRegister.delayed_broadcast(): logger.debug(f"Running RPC instruction: {method} with args: {args}, kwargs: {kwargs}") method_obj = getattr(obj, method) # check if the method accepts args and kwargs @@ -186,7 +186,9 @@ class BECWidgetsCLIServer: self.status = messages.BECStatus.IDLE self._heartbeat_timer.stop() self.emit_heartbeat() + logger.info(f"Shutting down app with {self.gui.gui_id}") self.gui.close() + logger.info("Succeded in shutting down gui") self.client.shutdown() @@ -317,13 +319,14 @@ def main(): # display message, for people to let it terminate gracefully print("Caught SIGINT, exiting") # Close all widgets - rpc_register = RPCRegister() - with rpc_register_broadcast(rpc_register): + with RPCRegister.delayed_broadcast(): for widget in QApplication.instance().topLevelWidgets(): widget.close() app.quit() + # gui.bec.close() + # win.shutdown() signal.signal(signal.SIGINT, sigint_handler) signal.signal(signal.SIGTERM, sigint_handler) diff --git a/bec_widgets/utils/bec_widget.py b/bec_widgets/utils/bec_widget.py index 7a172a9c..b68fdd2d 100644 --- a/bec_widgets/utils/bec_widget.py +++ b/bec_widgets/utils/bec_widget.py @@ -7,7 +7,7 @@ from bec_lib.logger import bec_logger from qtpy.QtCore import Slot from qtpy.QtWidgets import QApplication, QWidget -from bec_widgets.cli.rpc.rpc_register import rpc_register_broadcast +from bec_widgets.cli.rpc.rpc_register import RPCRegister from bec_widgets.utils.bec_connector import BECConnector, ConnectionConfig from bec_widgets.utils.colors import set_theme @@ -105,7 +105,7 @@ class BECWidget(BECConnector): def cleanup(self): """Cleanup the widget.""" - with rpc_register_broadcast(self.rpc_register): + with RPCRegister.delayed_broadcast(): # All widgets need to call super().cleanup() in their cleanup method self.rpc_register.remove_rpc(self) diff --git a/bec_widgets/widgets/containers/dock/dock_area.py b/bec_widgets/widgets/containers/dock/dock_area.py index 488378c7..d561c65e 100644 --- a/bec_widgets/widgets/containers/dock/dock_area.py +++ b/bec_widgets/widgets/containers/dock/dock_area.py @@ -11,7 +11,7 @@ from qtpy.QtCore import QSize, Qt from qtpy.QtGui import QPainter, QPaintEvent from qtpy.QtWidgets import QApplication, QSizePolicy, QVBoxLayout, QWidget -from bec_widgets.cli.rpc.rpc_register import RPCRegister, rpc_register_broadcast +from bec_widgets.cli.rpc.rpc_register import RPCRegister from bec_widgets.qt_utils.error_popups import SafeSlot from bec_widgets.qt_utils.toolbar import ( ExpandableMenuAction, @@ -225,9 +225,8 @@ class BECDockArea(BECWidget, QWidget): @SafeSlot() def _create_widget_from_toolbar(self, widget_name: str) -> None: - rpc_register = RPCRegister() # Run with RPC broadcast to namespace of all widgets - with rpc_register_broadcast(rpc_register): + with RPCRegister.delayed_broadcast(): dock_name = WidgetContainerUtils.generate_unique_name(widget_name, self.panels.keys()) self.new(name=dock_name, widget=widget_name) diff --git a/bec_widgets/widgets/containers/main_window/main_window.py b/bec_widgets/widgets/containers/main_window/main_window.py index 1a821445..bc529939 100644 --- a/bec_widgets/widgets/containers/main_window/main_window.py +++ b/bec_widgets/widgets/containers/main_window/main_window.py @@ -1,7 +1,7 @@ from bec_lib.logger import bec_logger from qtpy.QtWidgets import QApplication, QMainWindow -from bec_widgets.cli.rpc.rpc_register import RPCRegister, rpc_register_broadcast +from bec_widgets.cli.rpc.rpc_register import RPCRegister from bec_widgets.utils.bec_widget import BECWidget from bec_widgets.utils.container_utils import WidgetContainerUtils from bec_widgets.widgets.containers.dock.dock_area import BECDockArea @@ -49,8 +49,7 @@ class BECMainWindow(BECWidget, QMainWindow): Returns: BECDockArea: The newly created dock area. """ - rpc_register = RPCRegister() - with rpc_register_broadcast(rpc_register): + with RPCRegister.delayed_broadcast() as rpc_register: existing_dock_areas = rpc_register.get_names_of_rpc_by_class_type(BECDockArea) if name is not None: if name in existing_dock_areas: diff --git a/tests/end-2-end/test_bec_dock_rpc_e2e.py b/tests/end-2-end/test_bec_dock_rpc_e2e.py index 0d2d5be6..82d5fac9 100644 --- a/tests/end-2-end/test_bec_dock_rpc_e2e.py +++ b/tests/end-2-end/test_bec_dock_rpc_e2e.py @@ -20,7 +20,7 @@ def test_gui_rpc_registry(qtbot, connected_client_gui_obj): dock_area = gui.new("cool_dock_area") def check_dock_area_registered(): - return dock_area._gui_id in gui._registry_state + return dock_area._gui_id in gui._server_registry qtbot.waitUntil(check_dock_area_registered, timeout=5000) assert hasattr(gui, "cool_dock_area") @@ -28,7 +28,7 @@ def test_gui_rpc_registry(qtbot, connected_client_gui_obj): dock = dock_area.new("dock_0") def check_dock_registered(): - return dock._gui_id in gui._registry_state + return dock._gui_id in gui._server_registry qtbot.waitUntil(check_dock_registered, timeout=5000) assert hasattr(gui.cool_dock_area, "dock_0") @@ -52,7 +52,7 @@ def test_rpc_add_dock_with_figure_e2e(qtbot, bec_client_lib, connected_client_gu # Check that callback for dock_registry is done def check_docks_registered(): return all( - [gui_id in gui._registry_state for gui_id in [d0._gui_id, d1._gui_id, d2._gui_id]] + [gui_id in gui._server_registry for gui_id in [d0._gui_id, d1._gui_id, d2._gui_id]] ) # Waii until docks are registered @@ -68,7 +68,10 @@ def test_rpc_add_dock_with_figure_e2e(qtbot, bec_client_lib, connected_client_gu def check_figs_registered(): return all( - [gui_id in gui._registry_state for gui_id in [fig0._gui_id, fig1._gui_id, fig2._gui_id]] + [ + gui_id in gui._server_registry + for gui_id in [fig0._gui_id, fig1._gui_id, fig2._gui_id] + ] ) qtbot.waitUntil(check_figs_registered, timeout=5000)