mirror of
https://github.com/bec-project/bec_widgets.git
synced 2025-07-14 11:41:49 +02:00
fix: broadcast context manager to emit registry changes just once
This commit is contained in:
@ -192,8 +192,6 @@ class AvailableWidgetsNamespace:
|
|||||||
class BECGuiClient(RPCBase):
|
class BECGuiClient(RPCBase):
|
||||||
"""BEC GUI client class. Container for GUI applications within Python."""
|
"""BEC GUI client class. Container for GUI applications within Python."""
|
||||||
|
|
||||||
_top_level: dict[str, client.BECDockArea] = {}
|
|
||||||
|
|
||||||
def __init__(self, **kwargs) -> None:
|
def __init__(self, **kwargs) -> None:
|
||||||
super().__init__(**kwargs)
|
super().__init__(**kwargs)
|
||||||
self._lock = Lock()
|
self._lock = Lock()
|
||||||
@ -201,6 +199,7 @@ class BECGuiClient(RPCBase):
|
|||||||
self._auto_updates_enabled = True
|
self._auto_updates_enabled = True
|
||||||
self._auto_updates = None
|
self._auto_updates = None
|
||||||
self._killed = False
|
self._killed = False
|
||||||
|
self._top_level: dict[str, client.BECDockArea] = {}
|
||||||
self._startup_timeout = 0
|
self._startup_timeout = 0
|
||||||
self._gui_started_timer = None
|
self._gui_started_timer = None
|
||||||
self._gui_started_event = threading.Event()
|
self._gui_started_event = threading.Event()
|
||||||
|
@ -12,7 +12,7 @@ from bec_lib.utils.import_utils import lazy_import, lazy_import_from
|
|||||||
|
|
||||||
import bec_widgets.cli.client as client
|
import bec_widgets.cli.client as client
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING: # pragma: no cover
|
||||||
from bec_lib import messages
|
from bec_lib import messages
|
||||||
from bec_lib.connector import MessageObject
|
from bec_lib.connector import MessageObject
|
||||||
else:
|
else:
|
||||||
|
@ -1,6 +1,5 @@
|
|||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
from contextlib import contextmanager
|
|
||||||
from functools import wraps
|
from functools import wraps
|
||||||
from threading import Lock
|
from threading import Lock
|
||||||
from typing import TYPE_CHECKING, Callable
|
from typing import TYPE_CHECKING, Callable
|
||||||
@ -18,26 +17,16 @@ if TYPE_CHECKING: # pragma: no cover
|
|||||||
logger = bec_logger.logger
|
logger = bec_logger.logger
|
||||||
|
|
||||||
|
|
||||||
@contextmanager
|
|
||||||
def rpc_register_broadcast(rpc_register):
|
|
||||||
"""
|
|
||||||
Context manager to broadcast updates to the RPCRegister whenever a new RPC object is added or removed.
|
|
||||||
"""
|
|
||||||
try:
|
|
||||||
yield rpc_register
|
|
||||||
finally:
|
|
||||||
rpc_register.broadcast()
|
|
||||||
|
|
||||||
|
|
||||||
def broadcast_update(func):
|
def broadcast_update(func):
|
||||||
"""
|
"""
|
||||||
Decorator to broadcast updates to the RPCRegister whenever a new RPC object is added or removed.
|
Decorator to broadcast updates to the RPCRegister whenever a new RPC object is added or removed.
|
||||||
|
If class attribute _skip_broadcast is set to True, the broadcast will be skipped
|
||||||
"""
|
"""
|
||||||
|
|
||||||
@wraps(func)
|
@wraps(func)
|
||||||
def wrapper(self, *args, **kwargs):
|
def wrapper(self, *args, **kwargs):
|
||||||
result = func(self, *args, **kwargs)
|
result = func(self, *args, **kwargs)
|
||||||
# self.broadcast()
|
self.broadcast()
|
||||||
return result
|
return result
|
||||||
|
|
||||||
return wrapper
|
return wrapper
|
||||||
@ -51,6 +40,7 @@ class RPCRegister:
|
|||||||
_instance = None
|
_instance = None
|
||||||
_initialized = False
|
_initialized = False
|
||||||
_lock = Lock()
|
_lock = Lock()
|
||||||
|
_skip_broadcast = False
|
||||||
|
|
||||||
def __new__(cls, *args, **kwargs):
|
def __new__(cls, *args, **kwargs):
|
||||||
if cls._instance is None:
|
if cls._instance is None:
|
||||||
@ -62,9 +52,18 @@ class RPCRegister:
|
|||||||
if self._initialized:
|
if self._initialized:
|
||||||
return
|
return
|
||||||
self._rpc_register = WeakValueDictionary()
|
self._rpc_register = WeakValueDictionary()
|
||||||
|
self._broadcast_on_hold = RPCRegisterBroadcast(self)
|
||||||
self._initialized = True
|
self._initialized = True
|
||||||
self.callbacks = []
|
self.callbacks = []
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def delayed_broadcast(cls):
|
||||||
|
"""
|
||||||
|
Delay the broadcast of the update to all the callbacks.
|
||||||
|
"""
|
||||||
|
register = cls()
|
||||||
|
return register._broadcast_on_hold
|
||||||
|
|
||||||
@broadcast_update
|
@broadcast_update
|
||||||
def add_rpc(self, rpc: QObject):
|
def add_rpc(self, rpc: QObject):
|
||||||
"""
|
"""
|
||||||
@ -130,6 +129,9 @@ class RPCRegister:
|
|||||||
"""
|
"""
|
||||||
Broadcast the update to all the callbacks.
|
Broadcast the update to all the callbacks.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
if self._skip_broadcast:
|
||||||
|
return
|
||||||
connections = self.list_all_connections()
|
connections = self.list_all_connections()
|
||||||
for callback in self.callbacks:
|
for callback in self.callbacks:
|
||||||
callback(connections)
|
callback(connections)
|
||||||
@ -151,3 +153,24 @@ class RPCRegister:
|
|||||||
"""
|
"""
|
||||||
cls._instance = None
|
cls._instance = None
|
||||||
cls._initialized = False
|
cls._initialized = False
|
||||||
|
|
||||||
|
|
||||||
|
class RPCRegisterBroadcast:
|
||||||
|
"""Context manager for RPCRegister broadcast."""
|
||||||
|
|
||||||
|
def __init__(self, rpc_register: RPCRegister) -> None:
|
||||||
|
self.rpc_register = rpc_register
|
||||||
|
self._call_depth = 0
|
||||||
|
|
||||||
|
def __enter__(self):
|
||||||
|
"""Enter the context manager"""
|
||||||
|
self._call_depth += 1 # Needed for nested calls
|
||||||
|
self.rpc_register._skip_broadcast = True
|
||||||
|
return self.rpc_register
|
||||||
|
|
||||||
|
def __exit__(self, *exc):
|
||||||
|
"""Exit the context manager"""
|
||||||
|
self._call_depth -= 1 # Remove nested calls
|
||||||
|
if self._call_depth == 0: # Last one to exit is repsonsible for broadcasting
|
||||||
|
self.rpc_register._skip_broadcast = False
|
||||||
|
self.rpc_register.broadcast()
|
||||||
|
@ -15,7 +15,7 @@ from bec_lib.utils.import_utils import lazy_import
|
|||||||
from qtpy.QtCore import Qt, QTimer
|
from qtpy.QtCore import Qt, QTimer
|
||||||
from redis.exceptions import RedisError
|
from redis.exceptions import RedisError
|
||||||
|
|
||||||
from bec_widgets.cli.rpc.rpc_register import RPCRegister, rpc_register_broadcast
|
from bec_widgets.cli.rpc.rpc_register import RPCRegister
|
||||||
from bec_widgets.qt_utils.error_popups import ErrorPopupUtility
|
from bec_widgets.qt_utils.error_popups import ErrorPopupUtility
|
||||||
from bec_widgets.utils import BECDispatcher
|
from bec_widgets.utils import BECDispatcher
|
||||||
from bec_widgets.utils.bec_connector import BECConnector
|
from bec_widgets.utils.bec_connector import BECConnector
|
||||||
@ -80,7 +80,7 @@ class BECWidgetsCLIServer:
|
|||||||
self._heartbeat_timer.start(200)
|
self._heartbeat_timer.start(200)
|
||||||
|
|
||||||
self.status = messages.BECStatus.RUNNING
|
self.status = messages.BECStatus.RUNNING
|
||||||
with rpc_register_broadcast(self.rpc_register):
|
with RPCRegister.delayed_broadcast():
|
||||||
self.gui = gui_class(parent=None, name=gui_class_id, gui_id=gui_class_id)
|
self.gui = gui_class(parent=None, name=gui_class_id, gui_id=gui_class_id)
|
||||||
logger.success(f"Server started with gui_id: {self.gui_id}")
|
logger.success(f"Server started with gui_id: {self.gui_id}")
|
||||||
# Create initial object -> BECFigure or BECDockArea
|
# Create initial object -> BECFigure or BECDockArea
|
||||||
@ -118,7 +118,7 @@ class BECWidgetsCLIServer:
|
|||||||
|
|
||||||
def run_rpc(self, obj, method, args, kwargs):
|
def run_rpc(self, obj, method, args, kwargs):
|
||||||
# Run with rpc registry broadcast, but only once
|
# Run with rpc registry broadcast, but only once
|
||||||
with rpc_register_broadcast(self.rpc_register):
|
with RPCRegister.delayed_broadcast():
|
||||||
logger.debug(f"Running RPC instruction: {method} with args: {args}, kwargs: {kwargs}")
|
logger.debug(f"Running RPC instruction: {method} with args: {args}, kwargs: {kwargs}")
|
||||||
method_obj = getattr(obj, method)
|
method_obj = getattr(obj, method)
|
||||||
# check if the method accepts args and kwargs
|
# check if the method accepts args and kwargs
|
||||||
@ -186,7 +186,9 @@ class BECWidgetsCLIServer:
|
|||||||
self.status = messages.BECStatus.IDLE
|
self.status = messages.BECStatus.IDLE
|
||||||
self._heartbeat_timer.stop()
|
self._heartbeat_timer.stop()
|
||||||
self.emit_heartbeat()
|
self.emit_heartbeat()
|
||||||
|
logger.info(f"Shutting down app with {self.gui.gui_id}")
|
||||||
self.gui.close()
|
self.gui.close()
|
||||||
|
logger.info("Succeded in shutting down gui")
|
||||||
self.client.shutdown()
|
self.client.shutdown()
|
||||||
|
|
||||||
|
|
||||||
@ -317,13 +319,14 @@ def main():
|
|||||||
# display message, for people to let it terminate gracefully
|
# display message, for people to let it terminate gracefully
|
||||||
print("Caught SIGINT, exiting")
|
print("Caught SIGINT, exiting")
|
||||||
# Close all widgets
|
# Close all widgets
|
||||||
rpc_register = RPCRegister()
|
with RPCRegister.delayed_broadcast():
|
||||||
with rpc_register_broadcast(rpc_register):
|
|
||||||
for widget in QApplication.instance().topLevelWidgets():
|
for widget in QApplication.instance().topLevelWidgets():
|
||||||
widget.close()
|
widget.close()
|
||||||
|
|
||||||
app.quit()
|
app.quit()
|
||||||
|
|
||||||
|
# gui.bec.close()
|
||||||
|
# win.shutdown()
|
||||||
signal.signal(signal.SIGINT, sigint_handler)
|
signal.signal(signal.SIGINT, sigint_handler)
|
||||||
signal.signal(signal.SIGTERM, sigint_handler)
|
signal.signal(signal.SIGTERM, sigint_handler)
|
||||||
|
|
||||||
|
@ -7,7 +7,7 @@ from bec_lib.logger import bec_logger
|
|||||||
from qtpy.QtCore import Slot
|
from qtpy.QtCore import Slot
|
||||||
from qtpy.QtWidgets import QApplication, QWidget
|
from qtpy.QtWidgets import QApplication, QWidget
|
||||||
|
|
||||||
from bec_widgets.cli.rpc.rpc_register import rpc_register_broadcast
|
from bec_widgets.cli.rpc.rpc_register import RPCRegister
|
||||||
from bec_widgets.utils.bec_connector import BECConnector, ConnectionConfig
|
from bec_widgets.utils.bec_connector import BECConnector, ConnectionConfig
|
||||||
from bec_widgets.utils.colors import set_theme
|
from bec_widgets.utils.colors import set_theme
|
||||||
|
|
||||||
@ -105,7 +105,7 @@ class BECWidget(BECConnector):
|
|||||||
|
|
||||||
def cleanup(self):
|
def cleanup(self):
|
||||||
"""Cleanup the widget."""
|
"""Cleanup the widget."""
|
||||||
with rpc_register_broadcast(self.rpc_register):
|
with RPCRegister.delayed_broadcast():
|
||||||
# All widgets need to call super().cleanup() in their cleanup method
|
# All widgets need to call super().cleanup() in their cleanup method
|
||||||
self.rpc_register.remove_rpc(self)
|
self.rpc_register.remove_rpc(self)
|
||||||
|
|
||||||
|
@ -11,7 +11,7 @@ from qtpy.QtCore import QSize, Qt
|
|||||||
from qtpy.QtGui import QPainter, QPaintEvent
|
from qtpy.QtGui import QPainter, QPaintEvent
|
||||||
from qtpy.QtWidgets import QApplication, QSizePolicy, QVBoxLayout, QWidget
|
from qtpy.QtWidgets import QApplication, QSizePolicy, QVBoxLayout, QWidget
|
||||||
|
|
||||||
from bec_widgets.cli.rpc.rpc_register import RPCRegister, rpc_register_broadcast
|
from bec_widgets.cli.rpc.rpc_register import RPCRegister
|
||||||
from bec_widgets.qt_utils.error_popups import SafeSlot
|
from bec_widgets.qt_utils.error_popups import SafeSlot
|
||||||
from bec_widgets.qt_utils.toolbar import (
|
from bec_widgets.qt_utils.toolbar import (
|
||||||
ExpandableMenuAction,
|
ExpandableMenuAction,
|
||||||
@ -225,9 +225,8 @@ class BECDockArea(BECWidget, QWidget):
|
|||||||
|
|
||||||
@SafeSlot()
|
@SafeSlot()
|
||||||
def _create_widget_from_toolbar(self, widget_name: str) -> None:
|
def _create_widget_from_toolbar(self, widget_name: str) -> None:
|
||||||
rpc_register = RPCRegister()
|
|
||||||
# Run with RPC broadcast to namespace of all widgets
|
# Run with RPC broadcast to namespace of all widgets
|
||||||
with rpc_register_broadcast(rpc_register):
|
with RPCRegister.delayed_broadcast():
|
||||||
dock_name = WidgetContainerUtils.generate_unique_name(widget_name, self.panels.keys())
|
dock_name = WidgetContainerUtils.generate_unique_name(widget_name, self.panels.keys())
|
||||||
self.new(name=dock_name, widget=widget_name)
|
self.new(name=dock_name, widget=widget_name)
|
||||||
|
|
||||||
|
@ -1,7 +1,7 @@
|
|||||||
from bec_lib.logger import bec_logger
|
from bec_lib.logger import bec_logger
|
||||||
from qtpy.QtWidgets import QApplication, QMainWindow
|
from qtpy.QtWidgets import QApplication, QMainWindow
|
||||||
|
|
||||||
from bec_widgets.cli.rpc.rpc_register import RPCRegister, rpc_register_broadcast
|
from bec_widgets.cli.rpc.rpc_register import RPCRegister
|
||||||
from bec_widgets.utils.bec_widget import BECWidget
|
from bec_widgets.utils.bec_widget import BECWidget
|
||||||
from bec_widgets.utils.container_utils import WidgetContainerUtils
|
from bec_widgets.utils.container_utils import WidgetContainerUtils
|
||||||
from bec_widgets.widgets.containers.dock.dock_area import BECDockArea
|
from bec_widgets.widgets.containers.dock.dock_area import BECDockArea
|
||||||
@ -49,8 +49,7 @@ class BECMainWindow(BECWidget, QMainWindow):
|
|||||||
Returns:
|
Returns:
|
||||||
BECDockArea: The newly created dock area.
|
BECDockArea: The newly created dock area.
|
||||||
"""
|
"""
|
||||||
rpc_register = RPCRegister()
|
with RPCRegister.delayed_broadcast() as rpc_register:
|
||||||
with rpc_register_broadcast(rpc_register):
|
|
||||||
existing_dock_areas = rpc_register.get_names_of_rpc_by_class_type(BECDockArea)
|
existing_dock_areas = rpc_register.get_names_of_rpc_by_class_type(BECDockArea)
|
||||||
if name is not None:
|
if name is not None:
|
||||||
if name in existing_dock_areas:
|
if name in existing_dock_areas:
|
||||||
|
@ -20,7 +20,7 @@ def test_gui_rpc_registry(qtbot, connected_client_gui_obj):
|
|||||||
dock_area = gui.new("cool_dock_area")
|
dock_area = gui.new("cool_dock_area")
|
||||||
|
|
||||||
def check_dock_area_registered():
|
def check_dock_area_registered():
|
||||||
return dock_area._gui_id in gui._registry_state
|
return dock_area._gui_id in gui._server_registry
|
||||||
|
|
||||||
qtbot.waitUntil(check_dock_area_registered, timeout=5000)
|
qtbot.waitUntil(check_dock_area_registered, timeout=5000)
|
||||||
assert hasattr(gui, "cool_dock_area")
|
assert hasattr(gui, "cool_dock_area")
|
||||||
@ -28,7 +28,7 @@ def test_gui_rpc_registry(qtbot, connected_client_gui_obj):
|
|||||||
dock = dock_area.new("dock_0")
|
dock = dock_area.new("dock_0")
|
||||||
|
|
||||||
def check_dock_registered():
|
def check_dock_registered():
|
||||||
return dock._gui_id in gui._registry_state
|
return dock._gui_id in gui._server_registry
|
||||||
|
|
||||||
qtbot.waitUntil(check_dock_registered, timeout=5000)
|
qtbot.waitUntil(check_dock_registered, timeout=5000)
|
||||||
assert hasattr(gui.cool_dock_area, "dock_0")
|
assert hasattr(gui.cool_dock_area, "dock_0")
|
||||||
@ -52,7 +52,7 @@ def test_rpc_add_dock_with_figure_e2e(qtbot, bec_client_lib, connected_client_gu
|
|||||||
# Check that callback for dock_registry is done
|
# Check that callback for dock_registry is done
|
||||||
def check_docks_registered():
|
def check_docks_registered():
|
||||||
return all(
|
return all(
|
||||||
[gui_id in gui._registry_state for gui_id in [d0._gui_id, d1._gui_id, d2._gui_id]]
|
[gui_id in gui._server_registry for gui_id in [d0._gui_id, d1._gui_id, d2._gui_id]]
|
||||||
)
|
)
|
||||||
|
|
||||||
# Waii until docks are registered
|
# Waii until docks are registered
|
||||||
@ -68,7 +68,10 @@ def test_rpc_add_dock_with_figure_e2e(qtbot, bec_client_lib, connected_client_gu
|
|||||||
|
|
||||||
def check_figs_registered():
|
def check_figs_registered():
|
||||||
return all(
|
return all(
|
||||||
[gui_id in gui._registry_state for gui_id in [fig0._gui_id, fig1._gui_id, fig2._gui_id]]
|
[
|
||||||
|
gui_id in gui._server_registry
|
||||||
|
for gui_id in [fig0._gui_id, fig1._gui_id, fig2._gui_id]
|
||||||
|
]
|
||||||
)
|
)
|
||||||
|
|
||||||
qtbot.waitUntil(check_figs_registered, timeout=5000)
|
qtbot.waitUntil(check_figs_registered, timeout=5000)
|
||||||
|
Reference in New Issue
Block a user