0
0
mirror of https://github.com/bec-project/bec_widgets.git synced 2025-07-14 03:31:50 +02:00

refactor(rpc_server): add type hints and docstrings for heartbeat and registry update methods

This commit is contained in:
2025-04-11 13:37:42 +02:00
parent 125afc8907
commit 08168f28d3

View File

@ -4,7 +4,7 @@ import functools
import traceback import traceback
import types import types
from contextlib import contextmanager from contextlib import contextmanager
from typing import TYPE_CHECKING, TypeVar from typing import TYPE_CHECKING, Callable, TypeVar
from bec_lib.client import BECClient from bec_lib.client import BECClient
from bec_lib.endpoints import MessageEndpoints from bec_lib.endpoints import MessageEndpoints
@ -163,7 +163,11 @@ class RPCServer:
return None return None
return self._serialize_bec_connector(obj) return self._serialize_bec_connector(obj)
def emit_heartbeat(self): def emit_heartbeat(self) -> None:
"""
Emit a heartbeat message to the GUI server.
This method is called periodically to indicate that the server is still running.
"""
logger.trace(f"Emitting heartbeat for {self.gui_id}") logger.trace(f"Emitting heartbeat for {self.gui_id}")
try: try:
self.client.connector.set( self.client.connector.set(
@ -174,7 +178,11 @@ class RPCServer:
except RedisError as exc: except RedisError as exc:
logger.error(f"Error while emitting heartbeat: {exc}") logger.error(f"Error while emitting heartbeat: {exc}")
def broadcast_registry_update(self, connections: dict): def broadcast_registry_update(self, connections: dict) -> None:
"""
Broadcast the registry update to all the callbacks.
This method is called whenever the registry is updated.
"""
data = {} data = {}
for key, val in connections.items(): for key, val in connections.items():
if not isinstance(val, BECConnector): if not isinstance(val, BECConnector):
@ -208,12 +216,11 @@ class RPCServer:
} }
@staticmethod @staticmethod
def _get_becwidget_ancestor(widget): def _get_becwidget_ancestor(widget) -> BECConnector | None:
""" """
Traverse up the parent chain to find the nearest BECConnector. Traverse up the parent chain to find the nearest BECConnector.
Returns None if none is found. Returns None if none is found.
""" """
from bec_widgets.utils import BECConnector
parent = widget.parent() parent = widget.parent()
while parent is not None: while parent is not None:
@ -223,7 +230,15 @@ class RPCServer:
return None return None
# Suppose clients register callbacks to receive updates # Suppose clients register callbacks to receive updates
def add_registry_update_callback(self, cb): def add_registry_update_callback(self, cb: Callable) -> None:
"""
Add a callback to be called whenever the registry is updated.
The specified callback is called whenever the registry is updated.
Args:
cb (Callable): The callback to be added. It should accept a dictionary of all the
registered RPC objects as an argument.
"""
self._registry_update_callbacks.append(cb) self._registry_update_callbacks.append(cb)
def shutdown(self): # TODO not sure if needed when cleanup is done at level of BECConnector def shutdown(self): # TODO not sure if needed when cleanup is done at level of BECConnector