diff --git a/bec_widgets/utils/rpc_server.py b/bec_widgets/utils/rpc_server.py index 1b831e05..ef458539 100644 --- a/bec_widgets/utils/rpc_server.py +++ b/bec_widgets/utils/rpc_server.py @@ -4,7 +4,7 @@ import functools import traceback import types from contextlib import contextmanager -from typing import TYPE_CHECKING, TypeVar +from typing import TYPE_CHECKING, Callable, TypeVar from bec_lib.client import BECClient from bec_lib.endpoints import MessageEndpoints @@ -163,7 +163,11 @@ class RPCServer: return None return self._serialize_bec_connector(obj) - def emit_heartbeat(self): + def emit_heartbeat(self) -> None: + """ + Emit a heartbeat message to the GUI server. + This method is called periodically to indicate that the server is still running. + """ logger.trace(f"Emitting heartbeat for {self.gui_id}") try: self.client.connector.set( @@ -174,7 +178,11 @@ class RPCServer: except RedisError as exc: logger.error(f"Error while emitting heartbeat: {exc}") - def broadcast_registry_update(self, connections: dict): + def broadcast_registry_update(self, connections: dict) -> None: + """ + Broadcast the registry update to all the callbacks. + This method is called whenever the registry is updated. + """ data = {} for key, val in connections.items(): if not isinstance(val, BECConnector): @@ -208,12 +216,11 @@ class RPCServer: } @staticmethod - def _get_becwidget_ancestor(widget): + def _get_becwidget_ancestor(widget) -> BECConnector | None: """ Traverse up the parent chain to find the nearest BECConnector. Returns None if none is found. """ - from bec_widgets.utils import BECConnector parent = widget.parent() while parent is not None: @@ -223,7 +230,15 @@ class RPCServer: return None # Suppose clients register callbacks to receive updates - def add_registry_update_callback(self, cb): + def add_registry_update_callback(self, cb: Callable) -> None: + """ + Add a callback to be called whenever the registry is updated. + The specified callback is called whenever the registry is updated. + + Args: + cb (Callable): The callback to be added. It should accept a dictionary of all the + registered RPC objects as an argument. + """ self._registry_update_callbacks.append(cb) def shutdown(self): # TODO not sure if needed when cleanup is done at level of BECConnector