refactor(rpc): share logging helpers

This commit is contained in:
2026-05-22 15:27:20 +02:00
committed by Jan Wyzula
parent 878745b99a
commit 8e1e282fac
4 changed files with 45 additions and 73 deletions
+4 -12
View File
@@ -13,6 +13,8 @@ from bec_lib.endpoints import MessageEndpoints
from bec_lib.logger import bec_logger
from bec_lib.utils.import_utils import lazy_import, lazy_import_from
from bec_widgets.utils.rpc_logging import format_rpc_payload
if TYPE_CHECKING: # pragma: no cover
from bec_lib import messages
from bec_lib.connector import MessageObject
@@ -44,16 +46,6 @@ def _transform_args_kwargs(args, kwargs) -> tuple[tuple, dict]:
return tuple(_name_arg(arg) for arg in args), {k: _name_arg(v) for k, v in kwargs.items()}
def _format_rpc_payload(value: Any, limit: int = 500) -> str:
try:
text = repr(value)
except Exception as exc: # pragma: no cover - defensive logging helper
text = f"<unrepresentable {type(value).__name__}: {exc}>"
if len(text) <= limit:
return text
return f"{text[:limit]}...<truncated {len(text) - limit} chars>"
def rpc_timeout(timeout):
"""
A decorator to set a timeout for an RPC call.
@@ -275,8 +267,8 @@ class RPCBase:
)
target_gui_id = gui_id or self._gui_id
args_log = _format_rpc_payload(args)
kwargs_log = _format_rpc_payload(kwargs)
args_log = format_rpc_payload(args)
kwargs_log = format_rpc_payload(kwargs)
sent_at = time.time()
deadline = sent_at + timeout if timeout is not None else None
rpc_msg.metadata.update(
+6 -30
View File
@@ -16,6 +16,7 @@ from bec_lib.service_config import ServiceConfig
from qtpy.QtCore import QObject
from qtpy.QtCore import Signal as pyqtSignal
from bec_widgets.utils.rpc_logging import elapsed_seconds, format_elapsed, format_rpc_payload
from bec_widgets.utils.serialization import register_serializer_extension
logger = bec_logger.logger
@@ -26,31 +27,6 @@ if TYPE_CHECKING: # pragma: no cover
from bec_widgets.utils.rpc_server import RPCServer
def _format_rpc_payload(value: Any, limit: int = 500) -> str:
try:
text = repr(value)
except Exception as exc: # pragma: no cover - defensive logging helper
text = f"<unrepresentable {type(value).__name__}: {exc}>"
if len(text) <= limit:
return text
return f"{text[:limit]}...<truncated {len(text) - limit} chars>"
def _elapsed_seconds(start: float | int | None, stop: float) -> float | None:
if start is None:
return None
try:
return max(0.0, stop - float(start))
except (TypeError, ValueError):
return None
def _format_elapsed(elapsed: float | None) -> str:
if elapsed is None:
return "unknown"
return f"{elapsed:.3f}"
def _log_rpc_dispatcher_receive(msg_content: Any, metadata: Any) -> None:
if not isinstance(msg_content, dict) or not isinstance(metadata, dict):
return
@@ -64,17 +40,17 @@ def _log_rpc_dispatcher_receive(msg_content: Any, metadata: Any) -> None:
sent_at = metadata.get("sent_at")
deadline = metadata.get("deadline")
timeout = metadata.get("timeout")
dispatch_latency = _elapsed_seconds(sent_at, dispatch_received_at)
dispatch_latency = elapsed_seconds(sent_at, dispatch_received_at)
stale_on_dispatch = deadline is not None and dispatch_received_at > deadline
target_gui_id = parameter.get("gui_id") or metadata.get("target_gui_id")
args_log = _format_rpc_payload(parameter.get("args", []))
kwargs_log = _format_rpc_payload(parameter.get("kwargs", {}))
args_log = format_rpc_payload(parameter.get("args", []))
kwargs_log = format_rpc_payload(parameter.get("kwargs", {}))
logger.info(
"GUI RPC dispatcher received request before Qt callback emit "
f"request_id={request_id} method={method} receiver={metadata.get('receiver')} "
f"target_gui_id={target_gui_id} object_name={metadata.get('object_name')} "
f"timeout={timeout} dispatch_latency_s={_format_elapsed(dispatch_latency)} "
f"timeout={timeout} dispatch_latency_s={format_elapsed(dispatch_latency)} "
f"stale_on_dispatch={stale_on_dispatch} args={args_log} kwargs={kwargs_log}"
)
if stale_on_dispatch:
@@ -82,7 +58,7 @@ def _log_rpc_dispatcher_receive(msg_content: Any, metadata: Any) -> None:
"GUI RPC dispatcher received request after client timeout deadline "
f"request_id={request_id} method={method} receiver={metadata.get('receiver')} "
f"target_gui_id={target_gui_id} object_name={metadata.get('object_name')} "
f"timeout={timeout} dispatch_latency_s={_format_elapsed(dispatch_latency)} "
f"timeout={timeout} dispatch_latency_s={format_elapsed(dispatch_latency)} "
f"args={args_log} kwargs={kwargs_log}"
)
+28
View File
@@ -0,0 +1,28 @@
from __future__ import annotations
from typing import Any
def format_rpc_payload(value: Any, limit: int = 500) -> str:
try:
text = repr(value)
except Exception as exc: # pragma: no cover - defensive logging helper
text = f"<unrepresentable {type(value).__name__}: {exc}>"
if len(text) <= limit:
return text
return f"{text[:limit]}...<truncated {len(text) - limit} chars>"
def elapsed_seconds(start: float | int | None, stop: float) -> float | None:
if start is None:
return None
try:
return max(0.0, stop - float(start))
except (TypeError, ValueError):
return None
def format_elapsed(elapsed: float | None) -> str:
if elapsed is None:
return "unknown"
return f"{elapsed:.3f}"
+7 -31
View File
@@ -5,7 +5,7 @@ import time
import traceback
import types
from contextlib import contextmanager
from typing import TYPE_CHECKING, Any, Callable, Literal, TypeVar
from typing import TYPE_CHECKING, Callable, Literal, TypeVar
from bec_lib.client import BECClient
from bec_lib.endpoints import MessageEndpoints
@@ -19,6 +19,7 @@ from bec_widgets.utils.bec_connector import BECConnector
from bec_widgets.utils.bec_dispatcher import BECDispatcher
from bec_widgets.utils.container_utils import WidgetContainerUtils
from bec_widgets.utils.error_popups import ErrorPopupUtility
from bec_widgets.utils.rpc_logging import elapsed_seconds, format_elapsed, format_rpc_payload
from bec_widgets.utils.rpc_register import RPCRegister
from bec_widgets.utils.screen_utils import apply_window_geometry
from bec_widgets.widgets.containers.dock_area.dock_area import BECDockArea
@@ -120,20 +121,20 @@ class RPCServer:
parameter = msg.get("parameter", {})
args = parameter.get("args", [])
kwargs = parameter.get("kwargs", {})
args_log = self._format_rpc_payload(args)
kwargs_log = self._format_rpc_payload(kwargs)
args_log = format_rpc_payload(args)
kwargs_log = format_rpc_payload(kwargs)
target_gui_id = parameter.get("gui_id")
sent_at = metadata.get("sent_at")
deadline = metadata.get("deadline")
timeout = metadata.get("timeout")
received_at = time.time()
receive_latency = self._elapsed_seconds(sent_at, received_at)
receive_latency = elapsed_seconds(sent_at, received_at)
stale_on_receive = deadline is not None and received_at > deadline
logger.info(
"GUI RPC server received request "
f"request_id={request_id} method={method} gui_id={self.gui_id} "
f"target_gui_id={target_gui_id} timeout={timeout} "
f"receive_latency_s={self._format_elapsed(receive_latency)} "
f"receive_latency_s={format_elapsed(receive_latency)} "
f"stale_on_receive={stale_on_receive} args={args_log} kwargs={kwargs_log}"
)
if stale_on_receive:
@@ -141,7 +142,7 @@ class RPCServer:
"GUI RPC server received request after client timeout deadline "
f"request_id={request_id} method={method} gui_id={self.gui_id} "
f"target_gui_id={target_gui_id} timeout={timeout} "
f"receive_latency_s={self._format_elapsed(receive_latency)} "
f"receive_latency_s={format_elapsed(receive_latency)} "
f"args={args_log} kwargs={kwargs_log}"
)
logger.debug(f"Received RPC instruction: {msg}, metadata: {metadata}")
@@ -201,31 +202,6 @@ class RPCServer:
expire=60,
)
@staticmethod
def _elapsed_seconds(start: float | int | None, stop: float) -> float | None:
if start is None:
return None
try:
return max(0.0, stop - float(start))
except (TypeError, ValueError):
return None
@staticmethod
def _format_elapsed(elapsed: float | None) -> str:
if elapsed is None:
return "unknown"
return f"{elapsed:.3f}"
@staticmethod
def _format_rpc_payload(value: Any, limit: int = 500) -> str:
try:
text = repr(value)
except Exception as exc: # pragma: no cover - defensive logging helper
text = f"<unrepresentable {type(value).__name__}: {exc}>"
if len(text) <= limit:
return text
return f"{text[:limit]}...<truncated {len(text) - limit} chars>"
def get_object_from_config(self, config: dict):
gui_id = config.get("gui_id")
obj = self.rpc_register.get_rpc_by_id(gui_id)