diff --git a/bec_widgets/cli/rpc/rpc_base.py b/bec_widgets/cli/rpc/rpc_base.py index 2523789e..92df5d2a 100644 --- a/bec_widgets/cli/rpc/rpc_base.py +++ b/bec_widgets/cli/rpc/rpc_base.py @@ -2,6 +2,7 @@ from __future__ import annotations import inspect import threading +import time import uuid from functools import wraps from typing import TYPE_CHECKING, Any, cast @@ -9,6 +10,7 @@ from typing import TYPE_CHECKING, Any, cast from bec_lib.client import BECClient from bec_lib.device import DeviceBaseWithConfig from bec_lib.endpoints import MessageEndpoints +from bec_lib.logger import bec_logger from bec_lib.utils.import_utils import lazy_import, lazy_import_from if TYPE_CHECKING: # pragma: no cover @@ -25,6 +27,7 @@ else: # pylint: disable=protected-access _DEFAULT_RPC_TIMEOUT = object() +logger = bec_logger.logger def _name_arg(arg): @@ -41,6 +44,16 @@ def _transform_args_kwargs(args, kwargs) -> tuple[tuple, dict]: return tuple(_name_arg(arg) for arg in args), {k: _name_arg(v) for k, v in kwargs.items()} +def _format_rpc_payload(value: Any, limit: int = 500) -> str: + try: + text = repr(value) + except Exception as exc: # pragma: no cover - defensive logging helper + text = f"" + if len(text) <= limit: + return text + return f"{text[:limit]}..." + + def rpc_timeout(timeout): """ A decorator to set a timeout for an RPC call. @@ -261,12 +274,42 @@ class RPCBase: MessageEndpoints.gui_instruction_response(request_id), cb=self._on_rpc_response ) + target_gui_id = gui_id or self._gui_id + args_log = _format_rpc_payload(args) + kwargs_log = _format_rpc_payload(kwargs) + sent_at = time.time() + deadline = sent_at + timeout if timeout is not None else None + rpc_msg.metadata.update( + { + "method": method, + "receiver": receiver, + "target_gui_id": target_gui_id, + "object_name": self.object_name, + "wait_for_response": wait_for_rpc_response, + "timeout": timeout, + "sent_at": sent_at, + "deadline": deadline, + } + ) + logger.info( + "Sending GUI RPC request " + f"request_id={request_id} method={method} receiver={receiver} " + f"target_gui_id={target_gui_id} object_name={self.object_name} " + f"wait_for_response={wait_for_rpc_response} timeout={timeout} " + f"args={args_log} kwargs={kwargs_log}" + ) self._client.connector.set_and_publish(MessageEndpoints.gui_instructions(receiver), rpc_msg) if wait_for_rpc_response: try: finished = self._msg_wait_event.wait(timeout) if not finished: + logger.error( + "GUI RPC response timeout " + f"request_id={request_id} method={method} receiver={receiver} " + f"target_gui_id={target_gui_id} object_name={self.object_name} " + f"timeout={timeout} args={args_log} kwargs={kwargs_log}" + ) raise RPCResponseTimeoutError(request_id, timeout) finally: self._msg_wait_event.clear() @@ -278,6 +321,12 @@ class RPCBase: # the _on_rpc_response method assert isinstance(self._rpc_response, messages.RequestResponseMessage) + logger.info( + "Received GUI RPC response " + f"request_id={request_id} method={method} receiver={receiver} " + f"target_gui_id={target_gui_id} object_name={self.object_name} " + f"accepted={self._rpc_response.accepted} args={args_log} kwargs={kwargs_log}" + ) if not self._rpc_response.accepted: raise ValueError(self._rpc_response.message["error"]) msg_result = self._rpc_response.message.get("result") @@ -286,6 +335,7 @@ class RPCBase: def _on_rpc_response(self, msg_obj: MessageObject) -> None: msg = cast(messages.RequestResponseMessage, msg_obj.value) + logger.debug(f"GUI RPC response callback received: {msg}") self._rpc_response = msg self._msg_wait_event.set() diff --git a/bec_widgets/utils/rpc_server.py b/bec_widgets/utils/rpc_server.py index 25fd0cca..8a91f404 100644 --- a/bec_widgets/utils/rpc_server.py +++ b/bec_widgets/utils/rpc_server.py @@ -1,10 +1,11 @@ from __future__ import annotations import functools +import time import traceback import types from contextlib import contextmanager -from typing import TYPE_CHECKING, Callable, Literal, TypeVar +from typing import TYPE_CHECKING, Any, Callable, Literal, TypeVar from bec_lib.client import BECClient from bec_lib.endpoints import MessageEndpoints @@ -115,33 +116,116 @@ class RPCServer: if request_id is None: logger.error("Received RPC instruction without request_id") return + method = msg.get("action") + parameter = msg.get("parameter", {}) + args = parameter.get("args", []) + kwargs = parameter.get("kwargs", {}) + args_log = self._format_rpc_payload(args) + kwargs_log = self._format_rpc_payload(kwargs) + target_gui_id = parameter.get("gui_id") + sent_at = metadata.get("sent_at") + deadline = metadata.get("deadline") + timeout = metadata.get("timeout") + received_at = time.time() + receive_latency = self._elapsed_seconds(sent_at, received_at) + stale_on_receive = deadline is not None and received_at > deadline + logger.info( + "GUI RPC server received request " + f"request_id={request_id} method={method} gui_id={self.gui_id} " + f"target_gui_id={target_gui_id} timeout={timeout} " + f"receive_latency_s={self._format_elapsed(receive_latency)} " + f"stale_on_receive={stale_on_receive} args={args_log} kwargs={kwargs_log}" + ) + if stale_on_receive: + logger.warning( + "GUI RPC server received request after client timeout deadline " + f"request_id={request_id} method={method} gui_id={self.gui_id} " + f"target_gui_id={target_gui_id} timeout={timeout} " + f"receive_latency_s={self._format_elapsed(receive_latency)} " + f"args={args_log} kwargs={kwargs_log}" + ) logger.debug(f"Received RPC instruction: {msg}, metadata: {metadata}") + execution_start = time.perf_counter() with rpc_exception_hook(functools.partial(self.send_response, request_id, False)): try: - method = msg["action"] - args = msg["parameter"].get("args", []) - kwargs = msg["parameter"].get("kwargs", {}) if method.startswith("system."): res = self.run_system_rpc(method, args, kwargs) else: - obj = self.get_object_from_config(msg["parameter"]) + obj = self.get_object_from_config(parameter) res = self.run_rpc(obj, method, args, kwargs) except Exception: + execution_duration = time.perf_counter() - execution_start content = traceback.format_exc() - logger.error(f"Error while executing RPC instruction: {content}") + logger.error( + "GUI RPC server execution failed " + f"request_id={request_id} method={method} gui_id={self.gui_id} " + f"target_gui_id={target_gui_id} execution_duration_s={execution_duration:.3f} " + f"args={args_log} kwargs={kwargs_log}\n" + f"{content}" + ) self.send_response(request_id, False, {"error": content}) else: + execution_duration = time.perf_counter() - execution_start + response_stale = deadline is not None and time.time() > deadline + logger.info( + "GUI RPC server executed request " + f"request_id={request_id} method={method} gui_id={self.gui_id} " + f"target_gui_id={target_gui_id} execution_duration_s={execution_duration:.3f} " + f"response_after_client_deadline={response_stale} " + f"args={args_log} kwargs={kwargs_log}" + ) + if response_stale: + logger.warning( + "GUI RPC server response is late for client timeout " + f"request_id={request_id} method={method} gui_id={self.gui_id} " + f"target_gui_id={target_gui_id} timeout={timeout} " + f"execution_duration_s={execution_duration:.3f} " + f"args={args_log} kwargs={kwargs_log}" + ) logger.debug(f"RPC instruction executed successfully: {res}") self._rpc_singleshot_repeats[request_id] = SingleshotRPCRepeat() QTimer.singleShot(0, lambda: self.serialize_result_and_send(request_id, res)) def send_response(self, request_id: str, accepted: bool, msg: dict): + log_message = ( + "GUI RPC server publishing response " + f"request_id={request_id} gui_id={self.gui_id} accepted={accepted}" + ) + if accepted: + logger.info(log_message) + else: + logger.error(log_message) self.client.connector.set_and_publish( MessageEndpoints.gui_instruction_response(request_id), messages.RequestResponseMessage(accepted=accepted, message=msg), expire=60, ) + @staticmethod + def _elapsed_seconds(start: float | int | None, stop: float) -> float | None: + if start is None: + return None + try: + return max(0.0, stop - float(start)) + except (TypeError, ValueError): + return None + + @staticmethod + def _format_elapsed(elapsed: float | None) -> str: + if elapsed is None: + return "unknown" + return f"{elapsed:.3f}" + + @staticmethod + def _format_rpc_payload(value: Any, limit: int = 500) -> str: + try: + text = repr(value) + except Exception as exc: # pragma: no cover - defensive logging helper + text = f"" + if len(text) <= limit: + return text + return f"{text[:limit]}..." + def get_object_from_config(self, config: dict): gui_id = config.get("gui_id") obj = self.rpc_register.get_rpc_by_id(gui_id) diff --git a/tests/unit_tests/test_rpc_base.py b/tests/unit_tests/test_rpc_base.py index 3fddb4e7..7fc44cf4 100644 --- a/tests/unit_tests/test_rpc_base.py +++ b/tests/unit_tests/test_rpc_base.py @@ -3,10 +3,12 @@ from unittest.mock import MagicMock import pytest from bec_lib.device import DeviceBaseWithConfig, Signal +from bec_widgets.cli.rpc import rpc_base as rpc_base_module from bec_widgets.cli.rpc.rpc_base import ( DeletedWidgetError, RPCBase, RPCReference, + RPCResponseTimeoutError, _transform_args_kwargs, ) @@ -51,3 +53,37 @@ def test_transform_args_kwargs(): ) assert args == ("full name", "short name", "string_arg", "full name") assert kwargs == {"a": "full name", "b": "short name", "c": "string_arg", "d": "full name"} + + +def test_run_rpc_logs_response_timeout(monkeypatch): + rpc = RPCBase(gui_id="progress_widget", object_name="progressbar") + rpc._rpc_timeout = 0 + rpc._client = MagicMock() + + info_mock = MagicMock() + error_mock = MagicMock() + monkeypatch.setattr(rpc_base_module.logger, "info", info_mock) + monkeypatch.setattr(rpc_base_module.logger, "error", error_mock) + + with pytest.raises(RPCResponseTimeoutError): + rpc._run_rpc("set_value", 42, precision=2, timeout=0) + + publish_msg = rpc._client.connector.set_and_publish.call_args.args[1] + assert publish_msg.metadata["method"] == "set_value" + assert publish_msg.metadata["target_gui_id"] == "progress_widget" + assert publish_msg.metadata["object_name"] == "progressbar" + assert publish_msg.metadata["timeout"] == 0 + assert publish_msg.metadata["deadline"] == publish_msg.metadata["sent_at"] + assert info_mock.call_count == 1 + info_message = info_mock.call_args.args[0] + assert "args=(42,)" in info_message + assert "kwargs={'precision': 2}" in info_message + error_mock.assert_called_once() + error_message = error_mock.call_args.args[0] + assert "GUI RPC response timeout" in error_message + assert "method=set_value" in error_message + assert "target_gui_id=progress_widget" in error_message + assert "object_name=progressbar" in error_message + assert "timeout=0" in error_message + assert "args=(42,)" in error_message + assert "kwargs={'precision': 2}" in error_message diff --git a/tests/unit_tests/test_rpc_server.py b/tests/unit_tests/test_rpc_server.py index 7f83ea13..33f99ba3 100644 --- a/tests/unit_tests/test_rpc_server.py +++ b/tests/unit_tests/test_rpc_server.py @@ -1,11 +1,12 @@ import argparse -from unittest.mock import patch +from unittest.mock import MagicMock, patch import pytest from bec_lib.service_config import ServiceConfig from qtpy.QtWidgets import QWidget from bec_widgets.applications.companion_app import GUIServer +from bec_widgets.utils import rpc_server as rpc_server_module from bec_widgets.utils.bec_connector import BECConnector from bec_widgets.utils.rpc_server import RegistryNotReadyError, RPCServer, SingleshotRPCRepeat @@ -140,6 +141,62 @@ def test_serialize_result_and_send_max_delay_exceeded(rpc_server, qtbot, dummy_w assert "Max delay exceeded" in args[2]["error"] +def test_send_response_logs_publish_status(rpc_server, monkeypatch): + info_mock = MagicMock() + error_mock = MagicMock() + monkeypatch.setattr(rpc_server_module.logger, "info", info_mock) + monkeypatch.setattr(rpc_server_module.logger, "error", error_mock) + + with patch.object(rpc_server.client.connector, "set_and_publish") as publish_mock: + rpc_server.send_response("request-ok", True, {"result": None}) + rpc_server.send_response("request-failed", False, {"error": "bad"}) + + assert publish_mock.call_count == 2 + assert "request_id=request-ok" in info_mock.call_args.args[0] + assert "accepted=True" in info_mock.call_args.args[0] + assert "request_id=request-failed" in error_mock.call_args.args[0] + assert "accepted=False" in error_mock.call_args.args[0] + + +def test_on_rpc_update_logs_late_client_deadline(rpc_server, monkeypatch): + info_mock = MagicMock() + warning_mock = MagicMock() + monkeypatch.setattr(rpc_server_module.logger, "info", info_mock) + monkeypatch.setattr(rpc_server_module.logger, "warning", warning_mock) + + rpc_server.rpc_register.get_rpc_by_id = MagicMock() + rpc_server.run_rpc = MagicMock(return_value=None) + rpc_server.serialize_result_and_send = MagicMock() + + rpc_server.on_rpc_update( + { + "action": "set_value", + "parameter": {"args": [1], "kwargs": {"source": "test"}, "gui_id": "ring"}, + }, + {"request_id": "late-request", "timeout": 0.1, "sent_at": 1.0, "deadline": 1.1}, + ) + + received_log = info_mock.call_args_list[0].args[0] + executed_log = info_mock.call_args_list[1].args[0] + warning_logs = "\n".join(call.args[0] for call in warning_mock.call_args_list) + + assert "GUI RPC server received request" in received_log + assert "request_id=late-request" in received_log + assert "method=set_value" in received_log + assert "target_gui_id=ring" in received_log + assert "timeout=0.1" in received_log + assert "stale_on_receive=True" in received_log + assert "args=[1]" in received_log + assert "kwargs={'source': 'test'}" in received_log + assert "response_after_client_deadline=True" in executed_log + assert "args=[1]" in executed_log + assert "kwargs={'source': 'test'}" in executed_log + assert "received request after client timeout deadline" in warning_logs + assert "response is late for client timeout" in warning_logs + assert "args=[1]" in warning_logs + assert "kwargs={'source': 'test'}" in warning_logs + + def test_run_rpc_delegates_to_rpc_content_class(rpc_server): class Content: USER_ACCESS = ["foo", "mode", "mode.setter"]