mirror of
https://github.com/bec-project/bec_widgets.git
synced 2025-07-14 03:31:50 +02:00
WIP cli server serialize parent id from qt parent hierarchy
This commit is contained in:
@ -11,12 +11,15 @@ from bec_lib.endpoints import MessageEndpoints
|
||||
from bec_lib.logger import bec_logger
|
||||
from bec_lib.utils.import_utils import lazy_import
|
||||
from qtpy.QtCore import QTimer
|
||||
from qtpy.QtWidgets import QApplication
|
||||
from redis.exceptions import RedisError
|
||||
|
||||
from bec_widgets.cli.rpc.rpc_register import RPCRegister
|
||||
from bec_widgets.utils import BECDispatcher
|
||||
from bec_widgets.utils.bec_connector import BECConnector
|
||||
from bec_widgets.utils.error_popups import ErrorPopupUtility
|
||||
from bec_widgets.utils.widget_io import WidgetHierarchy
|
||||
from bec_widgets.widgets.plots.plot_base import PlotBase
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from bec_lib import messages
|
||||
@ -77,6 +80,7 @@ class CLIServer:
|
||||
self._heartbeat_timer = QTimer()
|
||||
self._heartbeat_timer.timeout.connect(self.emit_heartbeat)
|
||||
self._heartbeat_timer.start(200)
|
||||
self._registry_update_callbacks = []
|
||||
|
||||
self.status = messages.BECStatus.RUNNING
|
||||
logger.success(f"Server started with gui_id: {self.gui_id}")
|
||||
@ -141,17 +145,10 @@ class CLIServer:
|
||||
|
||||
def serialize_object(self, obj):
|
||||
if isinstance(obj, BECConnector):
|
||||
config = obj.config.model_dump()
|
||||
config["parent_id"] = obj.parent_id # add parent_id to config
|
||||
return {
|
||||
"gui_id": obj.gui_id,
|
||||
"name": (
|
||||
obj._name if hasattr(obj, "_name") else obj.__class__.__name__
|
||||
), # pylint: disable=protected-access
|
||||
"widget_class": obj.__class__.__name__,
|
||||
"config": config,
|
||||
"__rpc__": True,
|
||||
}
|
||||
# Respect RPC = False
|
||||
if hasattr(obj, "RPC") and obj.RPC is False:
|
||||
return None
|
||||
return self._serialize_bec_connector(obj)
|
||||
return obj
|
||||
|
||||
def emit_heartbeat(self):
|
||||
@ -165,20 +162,85 @@ class CLIServer:
|
||||
except RedisError as exc:
|
||||
logger.error(f"Error while emitting heartbeat: {exc}")
|
||||
|
||||
def broadcast_registry_update(self, connections: dict):
|
||||
"""
|
||||
Broadcast the updated registry to all clients.
|
||||
"""
|
||||
# FIXME signature should be changed on all levels, connection dict is no longer needed
|
||||
def broadcast_registry_update(self, _):
|
||||
# 1) Gather ALL BECConnector-based widgets
|
||||
all_qwidgets = QApplication.allWidgets()
|
||||
bec_widgets = set(w for w in all_qwidgets if isinstance(w, BECConnector))
|
||||
bec_widgets = {
|
||||
c for c in bec_widgets if not (hasattr(c, "RPC") and c.RPC is False)
|
||||
} # FIXME not needed
|
||||
|
||||
# We only need to broadcast the dock areas
|
||||
data = {key: self.serialize_object(val) for key, val in connections.items()}
|
||||
logger.info(f"Broadcasting registry update: {data} for {self.gui_id}")
|
||||
# 2) Also gather BECConnector-based data items from PlotBase
|
||||
# TODO do we need to access plot data items in cli in namespace?
|
||||
for w in all_qwidgets:
|
||||
if isinstance(w, PlotBase) and hasattr(w, "plot_item"):
|
||||
if hasattr(w.plot_item, "listDataItems"):
|
||||
for data_item in w.plot_item.listDataItems():
|
||||
if isinstance(data_item, BECConnector):
|
||||
bec_widgets.add(data_item)
|
||||
|
||||
# 3) Convert each BECConnector to a JSON-like dict
|
||||
registry_data = {}
|
||||
for connector in bec_widgets:
|
||||
if not hasattr(connector, "config"):
|
||||
continue
|
||||
serialized = self._serialize_bec_connector(connector)
|
||||
registry_data[serialized["gui_id"]] = serialized
|
||||
|
||||
# 4) Broadcast the final dictionary
|
||||
for callback in self._registry_update_callbacks:
|
||||
callback(registry_data)
|
||||
|
||||
# FIXME this message is bugged and it was even before mine refactor of parent logic
|
||||
# logger.info(f"Broadcasting registry update: {registry_data} for {self.gui_id}")
|
||||
self.client.connector.xadd(
|
||||
MessageEndpoints.gui_registry_state(self.gui_id),
|
||||
msg_dict={"data": messages.GUIRegistryStateMessage(state=data)},
|
||||
max_size=1, # only single message in stream
|
||||
msg_dict={"data": messages.GUIRegistryStateMessage(state=registry_data)},
|
||||
max_size=1,
|
||||
)
|
||||
|
||||
def _serialize_bec_connector(self, connector: BECConnector) -> dict:
|
||||
"""
|
||||
Create the serialization dict for a single BECConnector,
|
||||
setting 'parent_id' via the real nearest BECConnector parent.
|
||||
"""
|
||||
|
||||
parent_id = getattr(connector, "parent_id", None)
|
||||
if parent_id is None:
|
||||
parent = self._get_becwidget_ancestor(connector)
|
||||
parent_id = parent.gui_id if parent else None
|
||||
|
||||
config_dict = connector.config.model_dump()
|
||||
config_dict["parent_id"] = parent_id
|
||||
|
||||
return {
|
||||
"gui_id": connector.gui_id,
|
||||
"name": connector._name or connector.__class__.__name__,
|
||||
"widget_class": connector.__class__.__name__,
|
||||
"config": config_dict,
|
||||
"__rpc__": True,
|
||||
}
|
||||
|
||||
@staticmethod
|
||||
def _get_becwidget_ancestor(widget):
|
||||
"""
|
||||
Traverse up the parent chain to find the nearest BECConnector.
|
||||
Returns None if none is found.
|
||||
"""
|
||||
from bec_widgets.utils import BECConnector
|
||||
|
||||
parent = widget.parent()
|
||||
while parent is not None:
|
||||
if isinstance(parent, BECConnector):
|
||||
return parent
|
||||
parent = parent.parent()
|
||||
return None
|
||||
|
||||
# Suppose clients register callbacks to receive updates
|
||||
def add_registry_update_callback(self, cb):
|
||||
self._registry_update_callbacks.append(cb)
|
||||
|
||||
def shutdown(self): # TODO not sure if needed when cleanup is done at level of BECConnector
|
||||
self.status = messages.BECStatus.IDLE
|
||||
self._heartbeat_timer.stop()
|
||||
|
Reference in New Issue
Block a user