fix(rpc): log dispatcher receipt before qt callback

This commit is contained in:
2026-05-22 15:08:07 +02:00
committed by Jan Wyzula
parent e41e60956b
commit 878745b99a
2 changed files with 118 additions and 1 deletions
+65 -1
View File
@@ -3,8 +3,9 @@ from __future__ import annotations
import collections
import random
import string
import time
from collections.abc import Callable
from typing import TYPE_CHECKING, DefaultDict, Hashable, Union
from typing import TYPE_CHECKING, Any, DefaultDict, Hashable, Union
import louie
import redis
@@ -25,6 +26,67 @@ if TYPE_CHECKING: # pragma: no cover
from bec_widgets.utils.rpc_server import RPCServer
def _format_rpc_payload(value: Any, limit: int = 500) -> str:
try:
text = repr(value)
except Exception as exc: # pragma: no cover - defensive logging helper
text = f"<unrepresentable {type(value).__name__}: {exc}>"
if len(text) <= limit:
return text
return f"{text[:limit]}...<truncated {len(text) - limit} chars>"
def _elapsed_seconds(start: float | int | None, stop: float) -> float | None:
if start is None:
return None
try:
return max(0.0, stop - float(start))
except (TypeError, ValueError):
return None
def _format_elapsed(elapsed: float | None) -> str:
if elapsed is None:
return "unknown"
return f"{elapsed:.3f}"
def _log_rpc_dispatcher_receive(msg_content: Any, metadata: Any) -> None:
if not isinstance(msg_content, dict) or not isinstance(metadata, dict):
return
request_id = metadata.get("request_id")
method = msg_content.get("action")
parameter = msg_content.get("parameter")
if request_id is None or method is None or not isinstance(parameter, dict):
return
dispatch_received_at = time.time()
sent_at = metadata.get("sent_at")
deadline = metadata.get("deadline")
timeout = metadata.get("timeout")
dispatch_latency = _elapsed_seconds(sent_at, dispatch_received_at)
stale_on_dispatch = deadline is not None and dispatch_received_at > deadline
target_gui_id = parameter.get("gui_id") or metadata.get("target_gui_id")
args_log = _format_rpc_payload(parameter.get("args", []))
kwargs_log = _format_rpc_payload(parameter.get("kwargs", {}))
logger.info(
"GUI RPC dispatcher received request before Qt callback emit "
f"request_id={request_id} method={method} receiver={metadata.get('receiver')} "
f"target_gui_id={target_gui_id} object_name={metadata.get('object_name')} "
f"timeout={timeout} dispatch_latency_s={_format_elapsed(dispatch_latency)} "
f"stale_on_dispatch={stale_on_dispatch} args={args_log} kwargs={kwargs_log}"
)
if stale_on_dispatch:
logger.warning(
"GUI RPC dispatcher received request after client timeout deadline "
f"request_id={request_id} method={method} receiver={metadata.get('receiver')} "
f"target_gui_id={target_gui_id} object_name={metadata.get('object_name')} "
f"timeout={timeout} dispatch_latency_s={_format_elapsed(dispatch_latency)} "
f"args={args_log} kwargs={kwargs_log}"
)
class QtThreadSafeCallback(QObject):
"""QtThreadSafeCallback is a wrapper around a callback function to make it thread-safe for Qt."""
@@ -88,10 +150,12 @@ class QtRedisConnector(RedisConnector):
# we can notice kwargs are lost when passed to Qt slot
metadata = msg.metadata
_log_rpc_dispatcher_receive(msg.content, metadata)
cb(msg.content, metadata)
else:
# from stream
msg = msg["data"]
_log_rpc_dispatcher_receive(msg.content, msg.metadata)
cb(msg.content, msg.metadata)
+53
View File
@@ -1,6 +1,7 @@
# pylint: disable = no-name-in-module,missing-class-docstring, missing-module-docstring
import threading
import time
from types import SimpleNamespace
from unittest import mock
import pytest
@@ -213,3 +214,55 @@ def test_dispatcher_2_topic_same_cb_with_boundmethod(
send_msg_event.set()
qtbot.wait(10)
def test_qt_redis_connector_logs_rpc_before_qt_callback(monkeypatch):
info_mock = mock.MagicMock()
warning_mock = mock.MagicMock()
monkeypatch.setattr("bec_widgets.utils.bec_dispatcher.logger.info", info_mock)
monkeypatch.setattr("bec_widgets.utils.bec_dispatcher.logger.warning", warning_mock)
def callback(_msg, _metadata):
pass
cb = QtThreadSafeCallback(callback)
connector = QtRedisConnector("localhost:1", mock.MagicMock())
rpc_msg = SimpleNamespace(
content={
"action": "set_value",
"parameter": {"args": [1], "kwargs": {"source": "test"}, "gui_id": "ring"},
},
metadata={
"request_id": "dispatcher-request",
"receiver": "gui",
"object_name": "progressbar",
"timeout": 0.1,
"sent_at": 1.0,
"deadline": 1.1,
},
)
try:
connector._execute_callback(cb, {"data": rpc_msg}, {})
info_mock.assert_called_once()
info_message = info_mock.call_args.args[0]
assert "GUI RPC dispatcher received request before Qt callback emit" in info_message
assert "request_id=dispatcher-request" in info_message
assert "method=set_value" in info_message
assert "receiver=gui" in info_message
assert "target_gui_id=ring" in info_message
assert "object_name=progressbar" in info_message
assert "timeout=0.1" in info_message
assert "stale_on_dispatch=True" in info_message
assert "args=[1]" in info_message
assert "kwargs={'source': 'test'}" in info_message
warning_mock.assert_called_once()
warning_message = warning_mock.call_args.args[0]
assert "received request after client timeout deadline" in warning_message
assert "request_id=dispatcher-request" in warning_message
assert "args=[1]" in warning_message
assert "kwargs={'source': 'test'}" in warning_message
finally:
connector.shutdown()