mirror of
https://github.com/bec-project/bec_widgets.git
synced 2026-06-05 12:58:40 +02:00
fix(rpc): additional logs
This commit is contained in:
@@ -2,6 +2,7 @@ from __future__ import annotations
|
||||
|
||||
import inspect
|
||||
import threading
|
||||
import time
|
||||
import uuid
|
||||
from functools import wraps
|
||||
from typing import TYPE_CHECKING, Any, cast
|
||||
@@ -9,6 +10,7 @@ from typing import TYPE_CHECKING, Any, cast
|
||||
from bec_lib.client import BECClient
|
||||
from bec_lib.device import DeviceBaseWithConfig
|
||||
from bec_lib.endpoints import MessageEndpoints
|
||||
from bec_lib.logger import bec_logger
|
||||
from bec_lib.utils.import_utils import lazy_import, lazy_import_from
|
||||
|
||||
if TYPE_CHECKING: # pragma: no cover
|
||||
@@ -25,6 +27,7 @@ else:
|
||||
# pylint: disable=protected-access
|
||||
|
||||
_DEFAULT_RPC_TIMEOUT = object()
|
||||
logger = bec_logger.logger
|
||||
|
||||
|
||||
def _name_arg(arg):
|
||||
@@ -41,6 +44,16 @@ def _transform_args_kwargs(args, kwargs) -> tuple[tuple, dict]:
|
||||
return tuple(_name_arg(arg) for arg in args), {k: _name_arg(v) for k, v in kwargs.items()}
|
||||
|
||||
|
||||
def _format_rpc_payload(value: Any, limit: int = 500) -> str:
|
||||
try:
|
||||
text = repr(value)
|
||||
except Exception as exc: # pragma: no cover - defensive logging helper
|
||||
text = f"<unrepresentable {type(value).__name__}: {exc}>"
|
||||
if len(text) <= limit:
|
||||
return text
|
||||
return f"{text[:limit]}...<truncated {len(text) - limit} chars>"
|
||||
|
||||
|
||||
def rpc_timeout(timeout):
|
||||
"""
|
||||
A decorator to set a timeout for an RPC call.
|
||||
@@ -261,12 +274,42 @@ class RPCBase:
|
||||
MessageEndpoints.gui_instruction_response(request_id), cb=self._on_rpc_response
|
||||
)
|
||||
|
||||
target_gui_id = gui_id or self._gui_id
|
||||
args_log = _format_rpc_payload(args)
|
||||
kwargs_log = _format_rpc_payload(kwargs)
|
||||
sent_at = time.time()
|
||||
deadline = sent_at + timeout if timeout is not None else None
|
||||
rpc_msg.metadata.update(
|
||||
{
|
||||
"method": method,
|
||||
"receiver": receiver,
|
||||
"target_gui_id": target_gui_id,
|
||||
"object_name": self.object_name,
|
||||
"wait_for_response": wait_for_rpc_response,
|
||||
"timeout": timeout,
|
||||
"sent_at": sent_at,
|
||||
"deadline": deadline,
|
||||
}
|
||||
)
|
||||
logger.info(
|
||||
"Sending GUI RPC request "
|
||||
f"request_id={request_id} method={method} receiver={receiver} "
|
||||
f"target_gui_id={target_gui_id} object_name={self.object_name} "
|
||||
f"wait_for_response={wait_for_rpc_response} timeout={timeout} "
|
||||
f"args={args_log} kwargs={kwargs_log}"
|
||||
)
|
||||
self._client.connector.set_and_publish(MessageEndpoints.gui_instructions(receiver), rpc_msg)
|
||||
|
||||
if wait_for_rpc_response:
|
||||
try:
|
||||
finished = self._msg_wait_event.wait(timeout)
|
||||
if not finished:
|
||||
logger.error(
|
||||
"GUI RPC response timeout "
|
||||
f"request_id={request_id} method={method} receiver={receiver} "
|
||||
f"target_gui_id={target_gui_id} object_name={self.object_name} "
|
||||
f"timeout={timeout} args={args_log} kwargs={kwargs_log}"
|
||||
)
|
||||
raise RPCResponseTimeoutError(request_id, timeout)
|
||||
finally:
|
||||
self._msg_wait_event.clear()
|
||||
@@ -278,6 +321,12 @@ class RPCBase:
|
||||
# the _on_rpc_response method
|
||||
assert isinstance(self._rpc_response, messages.RequestResponseMessage)
|
||||
|
||||
logger.info(
|
||||
"Received GUI RPC response "
|
||||
f"request_id={request_id} method={method} receiver={receiver} "
|
||||
f"target_gui_id={target_gui_id} object_name={self.object_name} "
|
||||
f"accepted={self._rpc_response.accepted} args={args_log} kwargs={kwargs_log}"
|
||||
)
|
||||
if not self._rpc_response.accepted:
|
||||
raise ValueError(self._rpc_response.message["error"])
|
||||
msg_result = self._rpc_response.message.get("result")
|
||||
@@ -286,6 +335,7 @@ class RPCBase:
|
||||
|
||||
def _on_rpc_response(self, msg_obj: MessageObject) -> None:
|
||||
msg = cast(messages.RequestResponseMessage, msg_obj.value)
|
||||
logger.debug(f"GUI RPC response callback received: {msg}")
|
||||
self._rpc_response = msg
|
||||
self._msg_wait_event.set()
|
||||
|
||||
|
||||
@@ -1,10 +1,11 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import functools
|
||||
import time
|
||||
import traceback
|
||||
import types
|
||||
from contextlib import contextmanager
|
||||
from typing import TYPE_CHECKING, Callable, Literal, TypeVar
|
||||
from typing import TYPE_CHECKING, Any, Callable, Literal, TypeVar
|
||||
|
||||
from bec_lib.client import BECClient
|
||||
from bec_lib.endpoints import MessageEndpoints
|
||||
@@ -115,33 +116,116 @@ class RPCServer:
|
||||
if request_id is None:
|
||||
logger.error("Received RPC instruction without request_id")
|
||||
return
|
||||
method = msg.get("action")
|
||||
parameter = msg.get("parameter", {})
|
||||
args = parameter.get("args", [])
|
||||
kwargs = parameter.get("kwargs", {})
|
||||
args_log = self._format_rpc_payload(args)
|
||||
kwargs_log = self._format_rpc_payload(kwargs)
|
||||
target_gui_id = parameter.get("gui_id")
|
||||
sent_at = metadata.get("sent_at")
|
||||
deadline = metadata.get("deadline")
|
||||
timeout = metadata.get("timeout")
|
||||
received_at = time.time()
|
||||
receive_latency = self._elapsed_seconds(sent_at, received_at)
|
||||
stale_on_receive = deadline is not None and received_at > deadline
|
||||
logger.info(
|
||||
"GUI RPC server received request "
|
||||
f"request_id={request_id} method={method} gui_id={self.gui_id} "
|
||||
f"target_gui_id={target_gui_id} timeout={timeout} "
|
||||
f"receive_latency_s={self._format_elapsed(receive_latency)} "
|
||||
f"stale_on_receive={stale_on_receive} args={args_log} kwargs={kwargs_log}"
|
||||
)
|
||||
if stale_on_receive:
|
||||
logger.warning(
|
||||
"GUI RPC server received request after client timeout deadline "
|
||||
f"request_id={request_id} method={method} gui_id={self.gui_id} "
|
||||
f"target_gui_id={target_gui_id} timeout={timeout} "
|
||||
f"receive_latency_s={self._format_elapsed(receive_latency)} "
|
||||
f"args={args_log} kwargs={kwargs_log}"
|
||||
)
|
||||
logger.debug(f"Received RPC instruction: {msg}, metadata: {metadata}")
|
||||
execution_start = time.perf_counter()
|
||||
with rpc_exception_hook(functools.partial(self.send_response, request_id, False)):
|
||||
try:
|
||||
method = msg["action"]
|
||||
args = msg["parameter"].get("args", [])
|
||||
kwargs = msg["parameter"].get("kwargs", {})
|
||||
if method.startswith("system."):
|
||||
res = self.run_system_rpc(method, args, kwargs)
|
||||
else:
|
||||
obj = self.get_object_from_config(msg["parameter"])
|
||||
obj = self.get_object_from_config(parameter)
|
||||
res = self.run_rpc(obj, method, args, kwargs)
|
||||
except Exception:
|
||||
execution_duration = time.perf_counter() - execution_start
|
||||
content = traceback.format_exc()
|
||||
logger.error(f"Error while executing RPC instruction: {content}")
|
||||
logger.error(
|
||||
"GUI RPC server execution failed "
|
||||
f"request_id={request_id} method={method} gui_id={self.gui_id} "
|
||||
f"target_gui_id={target_gui_id} execution_duration_s={execution_duration:.3f} "
|
||||
f"args={args_log} kwargs={kwargs_log}\n"
|
||||
f"{content}"
|
||||
)
|
||||
self.send_response(request_id, False, {"error": content})
|
||||
else:
|
||||
execution_duration = time.perf_counter() - execution_start
|
||||
response_stale = deadline is not None and time.time() > deadline
|
||||
logger.info(
|
||||
"GUI RPC server executed request "
|
||||
f"request_id={request_id} method={method} gui_id={self.gui_id} "
|
||||
f"target_gui_id={target_gui_id} execution_duration_s={execution_duration:.3f} "
|
||||
f"response_after_client_deadline={response_stale} "
|
||||
f"args={args_log} kwargs={kwargs_log}"
|
||||
)
|
||||
if response_stale:
|
||||
logger.warning(
|
||||
"GUI RPC server response is late for client timeout "
|
||||
f"request_id={request_id} method={method} gui_id={self.gui_id} "
|
||||
f"target_gui_id={target_gui_id} timeout={timeout} "
|
||||
f"execution_duration_s={execution_duration:.3f} "
|
||||
f"args={args_log} kwargs={kwargs_log}"
|
||||
)
|
||||
logger.debug(f"RPC instruction executed successfully: {res}")
|
||||
self._rpc_singleshot_repeats[request_id] = SingleshotRPCRepeat()
|
||||
QTimer.singleShot(0, lambda: self.serialize_result_and_send(request_id, res))
|
||||
|
||||
def send_response(self, request_id: str, accepted: bool, msg: dict):
|
||||
log_message = (
|
||||
"GUI RPC server publishing response "
|
||||
f"request_id={request_id} gui_id={self.gui_id} accepted={accepted}"
|
||||
)
|
||||
if accepted:
|
||||
logger.info(log_message)
|
||||
else:
|
||||
logger.error(log_message)
|
||||
self.client.connector.set_and_publish(
|
||||
MessageEndpoints.gui_instruction_response(request_id),
|
||||
messages.RequestResponseMessage(accepted=accepted, message=msg),
|
||||
expire=60,
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def _elapsed_seconds(start: float | int | None, stop: float) -> float | None:
|
||||
if start is None:
|
||||
return None
|
||||
try:
|
||||
return max(0.0, stop - float(start))
|
||||
except (TypeError, ValueError):
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def _format_elapsed(elapsed: float | None) -> str:
|
||||
if elapsed is None:
|
||||
return "unknown"
|
||||
return f"{elapsed:.3f}"
|
||||
|
||||
@staticmethod
|
||||
def _format_rpc_payload(value: Any, limit: int = 500) -> str:
|
||||
try:
|
||||
text = repr(value)
|
||||
except Exception as exc: # pragma: no cover - defensive logging helper
|
||||
text = f"<unrepresentable {type(value).__name__}: {exc}>"
|
||||
if len(text) <= limit:
|
||||
return text
|
||||
return f"{text[:limit]}...<truncated {len(text) - limit} chars>"
|
||||
|
||||
def get_object_from_config(self, config: dict):
|
||||
gui_id = config.get("gui_id")
|
||||
obj = self.rpc_register.get_rpc_by_id(gui_id)
|
||||
|
||||
@@ -3,10 +3,12 @@ from unittest.mock import MagicMock
|
||||
import pytest
|
||||
from bec_lib.device import DeviceBaseWithConfig, Signal
|
||||
|
||||
from bec_widgets.cli.rpc import rpc_base as rpc_base_module
|
||||
from bec_widgets.cli.rpc.rpc_base import (
|
||||
DeletedWidgetError,
|
||||
RPCBase,
|
||||
RPCReference,
|
||||
RPCResponseTimeoutError,
|
||||
_transform_args_kwargs,
|
||||
)
|
||||
|
||||
@@ -51,3 +53,37 @@ def test_transform_args_kwargs():
|
||||
)
|
||||
assert args == ("full name", "short name", "string_arg", "full name")
|
||||
assert kwargs == {"a": "full name", "b": "short name", "c": "string_arg", "d": "full name"}
|
||||
|
||||
|
||||
def test_run_rpc_logs_response_timeout(monkeypatch):
|
||||
rpc = RPCBase(gui_id="progress_widget", object_name="progressbar")
|
||||
rpc._rpc_timeout = 0
|
||||
rpc._client = MagicMock()
|
||||
|
||||
info_mock = MagicMock()
|
||||
error_mock = MagicMock()
|
||||
monkeypatch.setattr(rpc_base_module.logger, "info", info_mock)
|
||||
monkeypatch.setattr(rpc_base_module.logger, "error", error_mock)
|
||||
|
||||
with pytest.raises(RPCResponseTimeoutError):
|
||||
rpc._run_rpc("set_value", 42, precision=2, timeout=0)
|
||||
|
||||
publish_msg = rpc._client.connector.set_and_publish.call_args.args[1]
|
||||
assert publish_msg.metadata["method"] == "set_value"
|
||||
assert publish_msg.metadata["target_gui_id"] == "progress_widget"
|
||||
assert publish_msg.metadata["object_name"] == "progressbar"
|
||||
assert publish_msg.metadata["timeout"] == 0
|
||||
assert publish_msg.metadata["deadline"] == publish_msg.metadata["sent_at"]
|
||||
assert info_mock.call_count == 1
|
||||
info_message = info_mock.call_args.args[0]
|
||||
assert "args=(42,)" in info_message
|
||||
assert "kwargs={'precision': 2}" in info_message
|
||||
error_mock.assert_called_once()
|
||||
error_message = error_mock.call_args.args[0]
|
||||
assert "GUI RPC response timeout" in error_message
|
||||
assert "method=set_value" in error_message
|
||||
assert "target_gui_id=progress_widget" in error_message
|
||||
assert "object_name=progressbar" in error_message
|
||||
assert "timeout=0" in error_message
|
||||
assert "args=(42,)" in error_message
|
||||
assert "kwargs={'precision': 2}" in error_message
|
||||
|
||||
@@ -1,11 +1,12 @@
|
||||
import argparse
|
||||
from unittest.mock import patch
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
from bec_lib.service_config import ServiceConfig
|
||||
from qtpy.QtWidgets import QWidget
|
||||
|
||||
from bec_widgets.applications.companion_app import GUIServer
|
||||
from bec_widgets.utils import rpc_server as rpc_server_module
|
||||
from bec_widgets.utils.bec_connector import BECConnector
|
||||
from bec_widgets.utils.rpc_server import RegistryNotReadyError, RPCServer, SingleshotRPCRepeat
|
||||
|
||||
@@ -140,6 +141,62 @@ def test_serialize_result_and_send_max_delay_exceeded(rpc_server, qtbot, dummy_w
|
||||
assert "Max delay exceeded" in args[2]["error"]
|
||||
|
||||
|
||||
def test_send_response_logs_publish_status(rpc_server, monkeypatch):
|
||||
info_mock = MagicMock()
|
||||
error_mock = MagicMock()
|
||||
monkeypatch.setattr(rpc_server_module.logger, "info", info_mock)
|
||||
monkeypatch.setattr(rpc_server_module.logger, "error", error_mock)
|
||||
|
||||
with patch.object(rpc_server.client.connector, "set_and_publish") as publish_mock:
|
||||
rpc_server.send_response("request-ok", True, {"result": None})
|
||||
rpc_server.send_response("request-failed", False, {"error": "bad"})
|
||||
|
||||
assert publish_mock.call_count == 2
|
||||
assert "request_id=request-ok" in info_mock.call_args.args[0]
|
||||
assert "accepted=True" in info_mock.call_args.args[0]
|
||||
assert "request_id=request-failed" in error_mock.call_args.args[0]
|
||||
assert "accepted=False" in error_mock.call_args.args[0]
|
||||
|
||||
|
||||
def test_on_rpc_update_logs_late_client_deadline(rpc_server, monkeypatch):
|
||||
info_mock = MagicMock()
|
||||
warning_mock = MagicMock()
|
||||
monkeypatch.setattr(rpc_server_module.logger, "info", info_mock)
|
||||
monkeypatch.setattr(rpc_server_module.logger, "warning", warning_mock)
|
||||
|
||||
rpc_server.rpc_register.get_rpc_by_id = MagicMock()
|
||||
rpc_server.run_rpc = MagicMock(return_value=None)
|
||||
rpc_server.serialize_result_and_send = MagicMock()
|
||||
|
||||
rpc_server.on_rpc_update(
|
||||
{
|
||||
"action": "set_value",
|
||||
"parameter": {"args": [1], "kwargs": {"source": "test"}, "gui_id": "ring"},
|
||||
},
|
||||
{"request_id": "late-request", "timeout": 0.1, "sent_at": 1.0, "deadline": 1.1},
|
||||
)
|
||||
|
||||
received_log = info_mock.call_args_list[0].args[0]
|
||||
executed_log = info_mock.call_args_list[1].args[0]
|
||||
warning_logs = "\n".join(call.args[0] for call in warning_mock.call_args_list)
|
||||
|
||||
assert "GUI RPC server received request" in received_log
|
||||
assert "request_id=late-request" in received_log
|
||||
assert "method=set_value" in received_log
|
||||
assert "target_gui_id=ring" in received_log
|
||||
assert "timeout=0.1" in received_log
|
||||
assert "stale_on_receive=True" in received_log
|
||||
assert "args=[1]" in received_log
|
||||
assert "kwargs={'source': 'test'}" in received_log
|
||||
assert "response_after_client_deadline=True" in executed_log
|
||||
assert "args=[1]" in executed_log
|
||||
assert "kwargs={'source': 'test'}" in executed_log
|
||||
assert "received request after client timeout deadline" in warning_logs
|
||||
assert "response is late for client timeout" in warning_logs
|
||||
assert "args=[1]" in warning_logs
|
||||
assert "kwargs={'source': 'test'}" in warning_logs
|
||||
|
||||
|
||||
def test_run_rpc_delegates_to_rpc_content_class(rpc_server):
|
||||
class Content:
|
||||
USER_ACCESS = ["foo", "mode", "mode.setter"]
|
||||
|
||||
Reference in New Issue
Block a user