diff --git a/bec_widgets/cli/rpc/rpc_base.py b/bec_widgets/cli/rpc/rpc_base.py index 92df5d2a..15091c8b 100644 --- a/bec_widgets/cli/rpc/rpc_base.py +++ b/bec_widgets/cli/rpc/rpc_base.py @@ -13,6 +13,8 @@ from bec_lib.endpoints import MessageEndpoints from bec_lib.logger import bec_logger from bec_lib.utils.import_utils import lazy_import, lazy_import_from +from bec_widgets.utils.rpc_logging import format_rpc_payload + if TYPE_CHECKING: # pragma: no cover from bec_lib import messages from bec_lib.connector import MessageObject @@ -44,16 +46,6 @@ def _transform_args_kwargs(args, kwargs) -> tuple[tuple, dict]: return tuple(_name_arg(arg) for arg in args), {k: _name_arg(v) for k, v in kwargs.items()} -def _format_rpc_payload(value: Any, limit: int = 500) -> str: - try: - text = repr(value) - except Exception as exc: # pragma: no cover - defensive logging helper - text = f"" - if len(text) <= limit: - return text - return f"{text[:limit]}..." - - def rpc_timeout(timeout): """ A decorator to set a timeout for an RPC call. @@ -275,8 +267,8 @@ class RPCBase: ) target_gui_id = gui_id or self._gui_id - args_log = _format_rpc_payload(args) - kwargs_log = _format_rpc_payload(kwargs) + args_log = format_rpc_payload(args) + kwargs_log = format_rpc_payload(kwargs) sent_at = time.time() deadline = sent_at + timeout if timeout is not None else None rpc_msg.metadata.update( diff --git a/bec_widgets/utils/bec_dispatcher.py b/bec_widgets/utils/bec_dispatcher.py index b99ae9ad..28b3e139 100644 --- a/bec_widgets/utils/bec_dispatcher.py +++ b/bec_widgets/utils/bec_dispatcher.py @@ -16,6 +16,7 @@ from bec_lib.service_config import ServiceConfig from qtpy.QtCore import QObject from qtpy.QtCore import Signal as pyqtSignal +from bec_widgets.utils.rpc_logging import elapsed_seconds, format_elapsed, format_rpc_payload from bec_widgets.utils.serialization import register_serializer_extension logger = bec_logger.logger @@ -26,31 +27,6 @@ if TYPE_CHECKING: # pragma: no cover from bec_widgets.utils.rpc_server import RPCServer -def _format_rpc_payload(value: Any, limit: int = 500) -> str: - try: - text = repr(value) - except Exception as exc: # pragma: no cover - defensive logging helper - text = f"" - if len(text) <= limit: - return text - return f"{text[:limit]}..." - - -def _elapsed_seconds(start: float | int | None, stop: float) -> float | None: - if start is None: - return None - try: - return max(0.0, stop - float(start)) - except (TypeError, ValueError): - return None - - -def _format_elapsed(elapsed: float | None) -> str: - if elapsed is None: - return "unknown" - return f"{elapsed:.3f}" - - def _log_rpc_dispatcher_receive(msg_content: Any, metadata: Any) -> None: if not isinstance(msg_content, dict) or not isinstance(metadata, dict): return @@ -64,17 +40,17 @@ def _log_rpc_dispatcher_receive(msg_content: Any, metadata: Any) -> None: sent_at = metadata.get("sent_at") deadline = metadata.get("deadline") timeout = metadata.get("timeout") - dispatch_latency = _elapsed_seconds(sent_at, dispatch_received_at) + dispatch_latency = elapsed_seconds(sent_at, dispatch_received_at) stale_on_dispatch = deadline is not None and dispatch_received_at > deadline target_gui_id = parameter.get("gui_id") or metadata.get("target_gui_id") - args_log = _format_rpc_payload(parameter.get("args", [])) - kwargs_log = _format_rpc_payload(parameter.get("kwargs", {})) + args_log = format_rpc_payload(parameter.get("args", [])) + kwargs_log = format_rpc_payload(parameter.get("kwargs", {})) logger.info( "GUI RPC dispatcher received request before Qt callback emit " f"request_id={request_id} method={method} receiver={metadata.get('receiver')} " f"target_gui_id={target_gui_id} object_name={metadata.get('object_name')} " - f"timeout={timeout} dispatch_latency_s={_format_elapsed(dispatch_latency)} " + f"timeout={timeout} dispatch_latency_s={format_elapsed(dispatch_latency)} " f"stale_on_dispatch={stale_on_dispatch} args={args_log} kwargs={kwargs_log}" ) if stale_on_dispatch: @@ -82,7 +58,7 @@ def _log_rpc_dispatcher_receive(msg_content: Any, metadata: Any) -> None: "GUI RPC dispatcher received request after client timeout deadline " f"request_id={request_id} method={method} receiver={metadata.get('receiver')} " f"target_gui_id={target_gui_id} object_name={metadata.get('object_name')} " - f"timeout={timeout} dispatch_latency_s={_format_elapsed(dispatch_latency)} " + f"timeout={timeout} dispatch_latency_s={format_elapsed(dispatch_latency)} " f"args={args_log} kwargs={kwargs_log}" ) diff --git a/bec_widgets/utils/rpc_logging.py b/bec_widgets/utils/rpc_logging.py new file mode 100644 index 00000000..90a21f4f --- /dev/null +++ b/bec_widgets/utils/rpc_logging.py @@ -0,0 +1,28 @@ +from __future__ import annotations + +from typing import Any + + +def format_rpc_payload(value: Any, limit: int = 500) -> str: + try: + text = repr(value) + except Exception as exc: # pragma: no cover - defensive logging helper + text = f"" + if len(text) <= limit: + return text + return f"{text[:limit]}..." + + +def elapsed_seconds(start: float | int | None, stop: float) -> float | None: + if start is None: + return None + try: + return max(0.0, stop - float(start)) + except (TypeError, ValueError): + return None + + +def format_elapsed(elapsed: float | None) -> str: + if elapsed is None: + return "unknown" + return f"{elapsed:.3f}" diff --git a/bec_widgets/utils/rpc_server.py b/bec_widgets/utils/rpc_server.py index 8a91f404..041e1f82 100644 --- a/bec_widgets/utils/rpc_server.py +++ b/bec_widgets/utils/rpc_server.py @@ -5,7 +5,7 @@ import time import traceback import types from contextlib import contextmanager -from typing import TYPE_CHECKING, Any, Callable, Literal, TypeVar +from typing import TYPE_CHECKING, Callable, Literal, TypeVar from bec_lib.client import BECClient from bec_lib.endpoints import MessageEndpoints @@ -19,6 +19,7 @@ from bec_widgets.utils.bec_connector import BECConnector from bec_widgets.utils.bec_dispatcher import BECDispatcher from bec_widgets.utils.container_utils import WidgetContainerUtils from bec_widgets.utils.error_popups import ErrorPopupUtility +from bec_widgets.utils.rpc_logging import elapsed_seconds, format_elapsed, format_rpc_payload from bec_widgets.utils.rpc_register import RPCRegister from bec_widgets.utils.screen_utils import apply_window_geometry from bec_widgets.widgets.containers.dock_area.dock_area import BECDockArea @@ -120,20 +121,20 @@ class RPCServer: parameter = msg.get("parameter", {}) args = parameter.get("args", []) kwargs = parameter.get("kwargs", {}) - args_log = self._format_rpc_payload(args) - kwargs_log = self._format_rpc_payload(kwargs) + args_log = format_rpc_payload(args) + kwargs_log = format_rpc_payload(kwargs) target_gui_id = parameter.get("gui_id") sent_at = metadata.get("sent_at") deadline = metadata.get("deadline") timeout = metadata.get("timeout") received_at = time.time() - receive_latency = self._elapsed_seconds(sent_at, received_at) + receive_latency = elapsed_seconds(sent_at, received_at) stale_on_receive = deadline is not None and received_at > deadline logger.info( "GUI RPC server received request " f"request_id={request_id} method={method} gui_id={self.gui_id} " f"target_gui_id={target_gui_id} timeout={timeout} " - f"receive_latency_s={self._format_elapsed(receive_latency)} " + f"receive_latency_s={format_elapsed(receive_latency)} " f"stale_on_receive={stale_on_receive} args={args_log} kwargs={kwargs_log}" ) if stale_on_receive: @@ -141,7 +142,7 @@ class RPCServer: "GUI RPC server received request after client timeout deadline " f"request_id={request_id} method={method} gui_id={self.gui_id} " f"target_gui_id={target_gui_id} timeout={timeout} " - f"receive_latency_s={self._format_elapsed(receive_latency)} " + f"receive_latency_s={format_elapsed(receive_latency)} " f"args={args_log} kwargs={kwargs_log}" ) logger.debug(f"Received RPC instruction: {msg}, metadata: {metadata}") @@ -201,31 +202,6 @@ class RPCServer: expire=60, ) - @staticmethod - def _elapsed_seconds(start: float | int | None, stop: float) -> float | None: - if start is None: - return None - try: - return max(0.0, stop - float(start)) - except (TypeError, ValueError): - return None - - @staticmethod - def _format_elapsed(elapsed: float | None) -> str: - if elapsed is None: - return "unknown" - return f"{elapsed:.3f}" - - @staticmethod - def _format_rpc_payload(value: Any, limit: int = 500) -> str: - try: - text = repr(value) - except Exception as exc: # pragma: no cover - defensive logging helper - text = f"" - if len(text) <= limit: - return text - return f"{text[:limit]}..." - def get_object_from_config(self, config: dict): gui_id = config.get("gui_id") obj = self.rpc_register.get_rpc_by_id(gui_id)