mirror of
https://github.com/bec-project/bec_widgets.git
synced 2025-07-13 19:21:50 +02:00
fix: broadcast context manager to emit registry changes just once
This commit is contained in:
@ -192,8 +192,6 @@ class AvailableWidgetsNamespace:
|
||||
class BECGuiClient(RPCBase):
|
||||
"""BEC GUI client class. Container for GUI applications within Python."""
|
||||
|
||||
_top_level: dict[str, client.BECDockArea] = {}
|
||||
|
||||
def __init__(self, **kwargs) -> None:
|
||||
super().__init__(**kwargs)
|
||||
self._lock = Lock()
|
||||
@ -201,6 +199,7 @@ class BECGuiClient(RPCBase):
|
||||
self._auto_updates_enabled = True
|
||||
self._auto_updates = None
|
||||
self._killed = False
|
||||
self._top_level: dict[str, client.BECDockArea] = {}
|
||||
self._startup_timeout = 0
|
||||
self._gui_started_timer = None
|
||||
self._gui_started_event = threading.Event()
|
||||
|
@ -12,7 +12,7 @@ from bec_lib.utils.import_utils import lazy_import, lazy_import_from
|
||||
|
||||
import bec_widgets.cli.client as client
|
||||
|
||||
if TYPE_CHECKING:
|
||||
if TYPE_CHECKING: # pragma: no cover
|
||||
from bec_lib import messages
|
||||
from bec_lib.connector import MessageObject
|
||||
else:
|
||||
|
@ -1,6 +1,5 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from contextlib import contextmanager
|
||||
from functools import wraps
|
||||
from threading import Lock
|
||||
from typing import TYPE_CHECKING, Callable
|
||||
@ -18,26 +17,16 @@ if TYPE_CHECKING: # pragma: no cover
|
||||
logger = bec_logger.logger
|
||||
|
||||
|
||||
@contextmanager
|
||||
def rpc_register_broadcast(rpc_register):
|
||||
"""
|
||||
Context manager to broadcast updates to the RPCRegister whenever a new RPC object is added or removed.
|
||||
"""
|
||||
try:
|
||||
yield rpc_register
|
||||
finally:
|
||||
rpc_register.broadcast()
|
||||
|
||||
|
||||
def broadcast_update(func):
|
||||
"""
|
||||
Decorator to broadcast updates to the RPCRegister whenever a new RPC object is added or removed.
|
||||
If class attribute _skip_broadcast is set to True, the broadcast will be skipped
|
||||
"""
|
||||
|
||||
@wraps(func)
|
||||
def wrapper(self, *args, **kwargs):
|
||||
result = func(self, *args, **kwargs)
|
||||
# self.broadcast()
|
||||
self.broadcast()
|
||||
return result
|
||||
|
||||
return wrapper
|
||||
@ -51,6 +40,7 @@ class RPCRegister:
|
||||
_instance = None
|
||||
_initialized = False
|
||||
_lock = Lock()
|
||||
_skip_broadcast = False
|
||||
|
||||
def __new__(cls, *args, **kwargs):
|
||||
if cls._instance is None:
|
||||
@ -62,9 +52,18 @@ class RPCRegister:
|
||||
if self._initialized:
|
||||
return
|
||||
self._rpc_register = WeakValueDictionary()
|
||||
self._broadcast_on_hold = RPCRegisterBroadcast(self)
|
||||
self._initialized = True
|
||||
self.callbacks = []
|
||||
|
||||
@classmethod
|
||||
def delayed_broadcast(cls):
|
||||
"""
|
||||
Delay the broadcast of the update to all the callbacks.
|
||||
"""
|
||||
register = cls()
|
||||
return register._broadcast_on_hold
|
||||
|
||||
@broadcast_update
|
||||
def add_rpc(self, rpc: QObject):
|
||||
"""
|
||||
@ -130,6 +129,9 @@ class RPCRegister:
|
||||
"""
|
||||
Broadcast the update to all the callbacks.
|
||||
"""
|
||||
|
||||
if self._skip_broadcast:
|
||||
return
|
||||
connections = self.list_all_connections()
|
||||
for callback in self.callbacks:
|
||||
callback(connections)
|
||||
@ -151,3 +153,24 @@ class RPCRegister:
|
||||
"""
|
||||
cls._instance = None
|
||||
cls._initialized = False
|
||||
|
||||
|
||||
class RPCRegisterBroadcast:
|
||||
"""Context manager for RPCRegister broadcast."""
|
||||
|
||||
def __init__(self, rpc_register: RPCRegister) -> None:
|
||||
self.rpc_register = rpc_register
|
||||
self._call_depth = 0
|
||||
|
||||
def __enter__(self):
|
||||
"""Enter the context manager"""
|
||||
self._call_depth += 1 # Needed for nested calls
|
||||
self.rpc_register._skip_broadcast = True
|
||||
return self.rpc_register
|
||||
|
||||
def __exit__(self, *exc):
|
||||
"""Exit the context manager"""
|
||||
self._call_depth -= 1 # Remove nested calls
|
||||
if self._call_depth == 0: # Last one to exit is repsonsible for broadcasting
|
||||
self.rpc_register._skip_broadcast = False
|
||||
self.rpc_register.broadcast()
|
||||
|
@ -15,7 +15,7 @@ from bec_lib.utils.import_utils import lazy_import
|
||||
from qtpy.QtCore import Qt, QTimer
|
||||
from redis.exceptions import RedisError
|
||||
|
||||
from bec_widgets.cli.rpc.rpc_register import RPCRegister, rpc_register_broadcast
|
||||
from bec_widgets.cli.rpc.rpc_register import RPCRegister
|
||||
from bec_widgets.qt_utils.error_popups import ErrorPopupUtility
|
||||
from bec_widgets.utils import BECDispatcher
|
||||
from bec_widgets.utils.bec_connector import BECConnector
|
||||
@ -80,7 +80,7 @@ class BECWidgetsCLIServer:
|
||||
self._heartbeat_timer.start(200)
|
||||
|
||||
self.status = messages.BECStatus.RUNNING
|
||||
with rpc_register_broadcast(self.rpc_register):
|
||||
with RPCRegister.delayed_broadcast():
|
||||
self.gui = gui_class(parent=None, name=gui_class_id, gui_id=gui_class_id)
|
||||
logger.success(f"Server started with gui_id: {self.gui_id}")
|
||||
# Create initial object -> BECFigure or BECDockArea
|
||||
@ -118,7 +118,7 @@ class BECWidgetsCLIServer:
|
||||
|
||||
def run_rpc(self, obj, method, args, kwargs):
|
||||
# Run with rpc registry broadcast, but only once
|
||||
with rpc_register_broadcast(self.rpc_register):
|
||||
with RPCRegister.delayed_broadcast():
|
||||
logger.debug(f"Running RPC instruction: {method} with args: {args}, kwargs: {kwargs}")
|
||||
method_obj = getattr(obj, method)
|
||||
# check if the method accepts args and kwargs
|
||||
@ -186,7 +186,9 @@ class BECWidgetsCLIServer:
|
||||
self.status = messages.BECStatus.IDLE
|
||||
self._heartbeat_timer.stop()
|
||||
self.emit_heartbeat()
|
||||
logger.info(f"Shutting down app with {self.gui.gui_id}")
|
||||
self.gui.close()
|
||||
logger.info("Succeded in shutting down gui")
|
||||
self.client.shutdown()
|
||||
|
||||
|
||||
@ -317,13 +319,14 @@ def main():
|
||||
# display message, for people to let it terminate gracefully
|
||||
print("Caught SIGINT, exiting")
|
||||
# Close all widgets
|
||||
rpc_register = RPCRegister()
|
||||
with rpc_register_broadcast(rpc_register):
|
||||
with RPCRegister.delayed_broadcast():
|
||||
for widget in QApplication.instance().topLevelWidgets():
|
||||
widget.close()
|
||||
|
||||
app.quit()
|
||||
|
||||
# gui.bec.close()
|
||||
# win.shutdown()
|
||||
signal.signal(signal.SIGINT, sigint_handler)
|
||||
signal.signal(signal.SIGTERM, sigint_handler)
|
||||
|
||||
|
@ -7,7 +7,7 @@ from bec_lib.logger import bec_logger
|
||||
from qtpy.QtCore import Slot
|
||||
from qtpy.QtWidgets import QApplication, QWidget
|
||||
|
||||
from bec_widgets.cli.rpc.rpc_register import rpc_register_broadcast
|
||||
from bec_widgets.cli.rpc.rpc_register import RPCRegister
|
||||
from bec_widgets.utils.bec_connector import BECConnector, ConnectionConfig
|
||||
from bec_widgets.utils.colors import set_theme
|
||||
|
||||
@ -105,7 +105,7 @@ class BECWidget(BECConnector):
|
||||
|
||||
def cleanup(self):
|
||||
"""Cleanup the widget."""
|
||||
with rpc_register_broadcast(self.rpc_register):
|
||||
with RPCRegister.delayed_broadcast():
|
||||
# All widgets need to call super().cleanup() in their cleanup method
|
||||
self.rpc_register.remove_rpc(self)
|
||||
|
||||
|
@ -11,7 +11,7 @@ from qtpy.QtCore import QSize, Qt
|
||||
from qtpy.QtGui import QPainter, QPaintEvent
|
||||
from qtpy.QtWidgets import QApplication, QSizePolicy, QVBoxLayout, QWidget
|
||||
|
||||
from bec_widgets.cli.rpc.rpc_register import RPCRegister, rpc_register_broadcast
|
||||
from bec_widgets.cli.rpc.rpc_register import RPCRegister
|
||||
from bec_widgets.qt_utils.error_popups import SafeSlot
|
||||
from bec_widgets.qt_utils.toolbar import (
|
||||
ExpandableMenuAction,
|
||||
@ -225,9 +225,8 @@ class BECDockArea(BECWidget, QWidget):
|
||||
|
||||
@SafeSlot()
|
||||
def _create_widget_from_toolbar(self, widget_name: str) -> None:
|
||||
rpc_register = RPCRegister()
|
||||
# Run with RPC broadcast to namespace of all widgets
|
||||
with rpc_register_broadcast(rpc_register):
|
||||
with RPCRegister.delayed_broadcast():
|
||||
dock_name = WidgetContainerUtils.generate_unique_name(widget_name, self.panels.keys())
|
||||
self.new(name=dock_name, widget=widget_name)
|
||||
|
||||
|
@ -1,7 +1,7 @@
|
||||
from bec_lib.logger import bec_logger
|
||||
from qtpy.QtWidgets import QApplication, QMainWindow
|
||||
|
||||
from bec_widgets.cli.rpc.rpc_register import RPCRegister, rpc_register_broadcast
|
||||
from bec_widgets.cli.rpc.rpc_register import RPCRegister
|
||||
from bec_widgets.utils.bec_widget import BECWidget
|
||||
from bec_widgets.utils.container_utils import WidgetContainerUtils
|
||||
from bec_widgets.widgets.containers.dock.dock_area import BECDockArea
|
||||
@ -49,8 +49,7 @@ class BECMainWindow(BECWidget, QMainWindow):
|
||||
Returns:
|
||||
BECDockArea: The newly created dock area.
|
||||
"""
|
||||
rpc_register = RPCRegister()
|
||||
with rpc_register_broadcast(rpc_register):
|
||||
with RPCRegister.delayed_broadcast() as rpc_register:
|
||||
existing_dock_areas = rpc_register.get_names_of_rpc_by_class_type(BECDockArea)
|
||||
if name is not None:
|
||||
if name in existing_dock_areas:
|
||||
|
@ -20,7 +20,7 @@ def test_gui_rpc_registry(qtbot, connected_client_gui_obj):
|
||||
dock_area = gui.new("cool_dock_area")
|
||||
|
||||
def check_dock_area_registered():
|
||||
return dock_area._gui_id in gui._registry_state
|
||||
return dock_area._gui_id in gui._server_registry
|
||||
|
||||
qtbot.waitUntil(check_dock_area_registered, timeout=5000)
|
||||
assert hasattr(gui, "cool_dock_area")
|
||||
@ -28,7 +28,7 @@ def test_gui_rpc_registry(qtbot, connected_client_gui_obj):
|
||||
dock = dock_area.new("dock_0")
|
||||
|
||||
def check_dock_registered():
|
||||
return dock._gui_id in gui._registry_state
|
||||
return dock._gui_id in gui._server_registry
|
||||
|
||||
qtbot.waitUntil(check_dock_registered, timeout=5000)
|
||||
assert hasattr(gui.cool_dock_area, "dock_0")
|
||||
@ -52,7 +52,7 @@ def test_rpc_add_dock_with_figure_e2e(qtbot, bec_client_lib, connected_client_gu
|
||||
# Check that callback for dock_registry is done
|
||||
def check_docks_registered():
|
||||
return all(
|
||||
[gui_id in gui._registry_state for gui_id in [d0._gui_id, d1._gui_id, d2._gui_id]]
|
||||
[gui_id in gui._server_registry for gui_id in [d0._gui_id, d1._gui_id, d2._gui_id]]
|
||||
)
|
||||
|
||||
# Waii until docks are registered
|
||||
@ -68,7 +68,10 @@ def test_rpc_add_dock_with_figure_e2e(qtbot, bec_client_lib, connected_client_gu
|
||||
|
||||
def check_figs_registered():
|
||||
return all(
|
||||
[gui_id in gui._registry_state for gui_id in [fig0._gui_id, fig1._gui_id, fig2._gui_id]]
|
||||
[
|
||||
gui_id in gui._server_registry
|
||||
for gui_id in [fig0._gui_id, fig1._gui_id, fig2._gui_id]
|
||||
]
|
||||
)
|
||||
|
||||
qtbot.waitUntil(check_figs_registered, timeout=5000)
|
||||
|
Reference in New Issue
Block a user