0
0
mirror of https://github.com/bec-project/bec_widgets.git synced 2025-07-14 03:31:50 +02:00

feat(rpc/rpc_register): singleton rpc register for all rpc connections for session

This commit is contained in:
2024-04-24 18:32:38 +02:00
parent 6d13a3283b
commit a898e7e4f1
6 changed files with 74 additions and 46 deletions

View File

@ -0,0 +1,41 @@
from weakref import WeakValueDictionary
class RPCRegister:
_instance = None
_initialized = False
def __new__(cls, *args, **kwargs):
if cls._instance is None:
cls._instance = super(RPCRegister, cls).__new__(cls)
cls._initialized = False
return cls._instance
def __init__(self):
if self._initialized:
return
self.rpc_register = WeakValueDictionary()
self._initialized = True
def add_rpc(self, rpc):
if not hasattr(rpc, "gui_id"):
raise ValueError("RPC object must have a 'gui_id' attribute.")
self.rpc_register[rpc.gui_id] = rpc
def remove_rpc(self, rpc):
if not hasattr(rpc, "gui_id"):
raise ValueError(f"RPC object {rpc} must have a 'gui_id' attribute.")
self.rpc_register.pop(rpc.gui_id, None)
def get_rpc_by_id(self, gui_id):
rpc_object = self.rpc_register.get(gui_id, None)
print(f"get rpc by id: {rpc_object}")
return rpc_object
def list_all_connections(self):
return self.rpc_register
@classmethod
def reset_singleton(cls):
cls._instance = None
cls._initialized = False

View File

@ -5,6 +5,7 @@ import time
from bec_lib import MessageEndpoints, messages
from qtpy.QtCore import QTimer
from bec_widgets.cli.rpc_register import RPCRegister
from bec_widgets.utils import BECDispatcher
from bec_widgets.utils.bec_connector import BECConnector
from bec_widgets.widgets.figure import BECFigure
@ -20,6 +21,8 @@ class BECWidgetsCLIServer:
self.client.start()
self.gui_id = gui_id
self.fig = BECFigure(gui_id=self.gui_id)
self.rpc_register = RPCRegister()
self.rpc_register.add_rpc(self.fig)
self.dispatcher.connect_slot(
self.on_rpc_update, MessageEndpoints.gui_instructions(self.gui_id)
@ -54,20 +57,10 @@ class BECWidgetsCLIServer:
def get_object_from_config(self, config: dict):
gui_id = config.get("gui_id")
# check if the object is the figure
if gui_id == self.fig.gui_id:
return self.fig
# check if the object is a widget
if gui_id in self.fig._widgets:
obj = self.fig._widgets[config["gui_id"]]
return obj
if self.fig._widgets:
for widget in self.fig._widgets.values():
item = widget.find_widget_by_id(gui_id)
if item:
return item
raise ValueError(f"Object with gui_id {gui_id} not found")
obj = self.rpc_register.get_rpc_by_id(gui_id)
if obj is None:
raise ValueError(f"Object with gui_id {gui_id} not found")
return obj
def run_rpc(self, obj, method, args, kwargs):
method_obj = getattr(obj, method)
@ -106,7 +99,6 @@ class BECWidgetsCLIServer:
messages.StatusMessage(name=self.gui_id, status=1, info={}),
expire=10,
)
print("Heartbeat emitted")
def shutdown(self):
self._shutdown_event = True

View File

@ -7,6 +7,7 @@ from qtconsole.inprocess import QtInProcessKernelManager
from qtconsole.rich_jupyter_widget import RichJupyterWidget
from qtpy.QtWidgets import QApplication, QVBoxLayout, QWidget
from bec_widgets.cli.rpc_register import RPCRegister
from bec_widgets.utils import BECDispatcher
from bec_widgets.widgets import BECFigure
@ -43,10 +44,14 @@ class JupyterConsoleWindow(QWidget): # pragma: no cover:
self.safe_close = False
# self.figure.clean_signal.connect(self.confirm_close)
self.register = RPCRegister()
self.register.add_rpc(self.figure)
print("Registered objects:", dict(self.register.list_all_connections()))
# console push
self.console.kernel_manager.kernel.shell.push(
{
"fig": self.figure,
"register": self.register,
"w1": self.w1,
"w2": self.w2,
"w3": self.w3,

View File

@ -7,6 +7,7 @@ from typing import Optional, Type
from pydantic import BaseModel, Field, field_validator
from qtpy.QtCore import Slot as pyqtSlot
from bec_widgets.cli.rpc_register import RPCRegister
from bec_widgets.utils.bec_dispatcher import BECDispatcher
@ -31,7 +32,7 @@ class ConnectionConfig(BaseModel):
class BECConnector:
"""Connection mixin class for all BEC widgets, to handle BEC client and device manager"""
USER_ACCESS = ["config_dict"]
USER_ACCESS = ["config_dict", "get_all_rpc"]
def __init__(self, client=None, config: ConnectionConfig = None, gui_id: str = None):
# BEC related connections
@ -54,6 +55,25 @@ class BECConnector:
else:
self.gui_id = self.config.gui_id
# register widget to rpc register
self.rpc_register = RPCRegister()
self.rpc_register.add_rpc(self)
def get_all_rpc(self) -> dict:
"""Get all registered RPC objects."""
all_connections = self.rpc_register.list_all_connections()
return dict(all_connections)
@property
def rpc_id(self) -> str:
"""Get the RPC ID of the widget."""
return self.gui_id
@rpc_id.setter
def rpc_id(self, rpc_id: str) -> None:
"""Set the RPC ID of the widget."""
self.gui_id = rpc_id
@property
def config_dict(self) -> dict:
"""

View File

@ -358,20 +358,6 @@ class BECImageShow(BECPlotBase):
thread.start()
def find_widget_by_id(self, item_id: str) -> BECImageItem:
"""
Find the widget by its gui_id.
Args:
item_id(str): The gui_id of the widget.
Returns:
BECImageItem: The widget with the given gui_id.
"""
for source, images in self._images.items():
for monitor, image_item in images.items():
if image_item.gui_id == item_id:
return image_item
def find_image_by_monitor(self, item_id: str) -> BECImageItem:
"""
Find the widget by its gui_id.
@ -719,10 +705,8 @@ class BECImageShow(BECPlotBase):
processing_config = image_to_update.config.processing
self.processor.set_config(processing_config)
if self.use_threading:
print("using threaded version")
self._create_thread_worker(device, data)
else:
print("using NON-threaded version")
data = self.processor.process_image(data)
self.update_image(device, data)
@ -809,7 +793,6 @@ class BECImageShow(BECPlotBase):
"""
Clean up the widget.
"""
print(f"Cleaning up {self.gui_id}")
# for monitor in self._images["device_monitor"]:
# self.bec_dispatcher.disconnect_slot(
# self.on_image_update, MessageEndpoints.device_monitor(monitor)

View File

@ -286,19 +286,6 @@ class BECWaveform(BECPlotBase):
self.add_legend()
self.apply_config(self.config)
def find_widget_by_id(self, item_id: str) -> BECCurve:
"""
Find the curve by its ID.
Args:
item_id(str): ID of the curve.
Returns:
BECCurve: The curve object.
"""
for curve in self.plot_item.curves:
if curve.gui_id == item_id:
return curve
def apply_config(self, config: dict | WidgetConfig, replot_last_scan: bool = False):
"""
Apply the configuration to the 1D waveform widget.