From 878745b99ac1e22c0fbddecc294e599469a2adfe Mon Sep 17 00:00:00 2001 From: wyzula-jan Date: Fri, 22 May 2026 15:08:07 +0200 Subject: [PATCH] fix(rpc): log dispatcher receipt before qt callback --- bec_widgets/utils/bec_dispatcher.py | 66 ++++++++++++++++++++++++- tests/unit_tests/test_bec_dispatcher.py | 53 ++++++++++++++++++++ 2 files changed, 118 insertions(+), 1 deletion(-) diff --git a/bec_widgets/utils/bec_dispatcher.py b/bec_widgets/utils/bec_dispatcher.py index af739ef2..b99ae9ad 100644 --- a/bec_widgets/utils/bec_dispatcher.py +++ b/bec_widgets/utils/bec_dispatcher.py @@ -3,8 +3,9 @@ from __future__ import annotations import collections import random import string +import time from collections.abc import Callable -from typing import TYPE_CHECKING, DefaultDict, Hashable, Union +from typing import TYPE_CHECKING, Any, DefaultDict, Hashable, Union import louie import redis @@ -25,6 +26,67 @@ if TYPE_CHECKING: # pragma: no cover from bec_widgets.utils.rpc_server import RPCServer +def _format_rpc_payload(value: Any, limit: int = 500) -> str: + try: + text = repr(value) + except Exception as exc: # pragma: no cover - defensive logging helper + text = f"" + if len(text) <= limit: + return text + return f"{text[:limit]}..." + + +def _elapsed_seconds(start: float | int | None, stop: float) -> float | None: + if start is None: + return None + try: + return max(0.0, stop - float(start)) + except (TypeError, ValueError): + return None + + +def _format_elapsed(elapsed: float | None) -> str: + if elapsed is None: + return "unknown" + return f"{elapsed:.3f}" + + +def _log_rpc_dispatcher_receive(msg_content: Any, metadata: Any) -> None: + if not isinstance(msg_content, dict) or not isinstance(metadata, dict): + return + request_id = metadata.get("request_id") + method = msg_content.get("action") + parameter = msg_content.get("parameter") + if request_id is None or method is None or not isinstance(parameter, dict): + return + + dispatch_received_at = time.time() + sent_at = metadata.get("sent_at") + deadline = metadata.get("deadline") + timeout = metadata.get("timeout") + dispatch_latency = _elapsed_seconds(sent_at, dispatch_received_at) + stale_on_dispatch = deadline is not None and dispatch_received_at > deadline + target_gui_id = parameter.get("gui_id") or metadata.get("target_gui_id") + args_log = _format_rpc_payload(parameter.get("args", [])) + kwargs_log = _format_rpc_payload(parameter.get("kwargs", {})) + + logger.info( + "GUI RPC dispatcher received request before Qt callback emit " + f"request_id={request_id} method={method} receiver={metadata.get('receiver')} " + f"target_gui_id={target_gui_id} object_name={metadata.get('object_name')} " + f"timeout={timeout} dispatch_latency_s={_format_elapsed(dispatch_latency)} " + f"stale_on_dispatch={stale_on_dispatch} args={args_log} kwargs={kwargs_log}" + ) + if stale_on_dispatch: + logger.warning( + "GUI RPC dispatcher received request after client timeout deadline " + f"request_id={request_id} method={method} receiver={metadata.get('receiver')} " + f"target_gui_id={target_gui_id} object_name={metadata.get('object_name')} " + f"timeout={timeout} dispatch_latency_s={_format_elapsed(dispatch_latency)} " + f"args={args_log} kwargs={kwargs_log}" + ) + + class QtThreadSafeCallback(QObject): """QtThreadSafeCallback is a wrapper around a callback function to make it thread-safe for Qt.""" @@ -88,10 +150,12 @@ class QtRedisConnector(RedisConnector): # we can notice kwargs are lost when passed to Qt slot metadata = msg.metadata + _log_rpc_dispatcher_receive(msg.content, metadata) cb(msg.content, metadata) else: # from stream msg = msg["data"] + _log_rpc_dispatcher_receive(msg.content, msg.metadata) cb(msg.content, msg.metadata) diff --git a/tests/unit_tests/test_bec_dispatcher.py b/tests/unit_tests/test_bec_dispatcher.py index 12b57c67..48bbd8ec 100644 --- a/tests/unit_tests/test_bec_dispatcher.py +++ b/tests/unit_tests/test_bec_dispatcher.py @@ -1,6 +1,7 @@ # pylint: disable = no-name-in-module,missing-class-docstring, missing-module-docstring import threading import time +from types import SimpleNamespace from unittest import mock import pytest @@ -213,3 +214,55 @@ def test_dispatcher_2_topic_same_cb_with_boundmethod( send_msg_event.set() qtbot.wait(10) + + +def test_qt_redis_connector_logs_rpc_before_qt_callback(monkeypatch): + info_mock = mock.MagicMock() + warning_mock = mock.MagicMock() + monkeypatch.setattr("bec_widgets.utils.bec_dispatcher.logger.info", info_mock) + monkeypatch.setattr("bec_widgets.utils.bec_dispatcher.logger.warning", warning_mock) + + def callback(_msg, _metadata): + pass + + cb = QtThreadSafeCallback(callback) + connector = QtRedisConnector("localhost:1", mock.MagicMock()) + rpc_msg = SimpleNamespace( + content={ + "action": "set_value", + "parameter": {"args": [1], "kwargs": {"source": "test"}, "gui_id": "ring"}, + }, + metadata={ + "request_id": "dispatcher-request", + "receiver": "gui", + "object_name": "progressbar", + "timeout": 0.1, + "sent_at": 1.0, + "deadline": 1.1, + }, + ) + + try: + connector._execute_callback(cb, {"data": rpc_msg}, {}) + + info_mock.assert_called_once() + info_message = info_mock.call_args.args[0] + assert "GUI RPC dispatcher received request before Qt callback emit" in info_message + assert "request_id=dispatcher-request" in info_message + assert "method=set_value" in info_message + assert "receiver=gui" in info_message + assert "target_gui_id=ring" in info_message + assert "object_name=progressbar" in info_message + assert "timeout=0.1" in info_message + assert "stale_on_dispatch=True" in info_message + assert "args=[1]" in info_message + assert "kwargs={'source': 'test'}" in info_message + + warning_mock.assert_called_once() + warning_message = warning_mock.call_args.args[0] + assert "received request after client timeout deadline" in warning_message + assert "request_id=dispatcher-request" in warning_message + assert "args=[1]" in warning_message + assert "kwargs={'source': 'test'}" in warning_message + finally: + connector.shutdown()