diff --git a/bec_widgets/utils/bec_dispatcher.py b/bec_widgets/utils/bec_dispatcher.py index 6f145315..66092d9b 100644 --- a/bec_widgets/utils/bec_dispatcher.py +++ b/bec_widgets/utils/bec_dispatcher.py @@ -12,6 +12,7 @@ import redis from bec_lib.client import BECClient from bec_lib.logger import bec_logger from bec_lib.redis_connector import MessageObject, RedisConnector +from bec_lib.redis_connector.buffered_redis_connector import BufferedRedisConnector from bec_lib.service_config import ServiceConfig from qtpy.QtCore import QObject from qtpy.QtCore import Signal as pyqtSignal @@ -99,22 +100,11 @@ class QtThreadSafeCallback(QObject): self.cb_signal.emit(msg_content, metadata) -class QtRedisConnector(RedisConnector): - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - +class QtBufferedRedisConnector(BufferedRedisConnector): def _execute_callback(self, cb, msg, kwargs): if not isinstance(cb, QtThreadSafeCallback): return super()._execute_callback(cb, msg, kwargs) - # if msg.msg_type == "bundle_message": - # # big warning: how to handle bundle messages? - # # message with messages inside ; which slot to call? - # # bundle_msg = msg - # # for msg in bundle_msg: - # # ... - # # for now, only consider the 1st message - # msg = msg[0] - # raise RuntimeError(f" + if isinstance(msg, MessageObject): if isinstance(msg.value, list): msg = msg.value[0] @@ -132,6 +122,10 @@ class QtRedisConnector(RedisConnector): cb(msg.content, msg.metadata) +class QtRedisConnector(RedisConnector): + connector_cls = QtBufferedRedisConnector + + class BECDispatcher: """Utility class to keep track of slots connected to a particular redis connector""" diff --git a/tests/unit_tests/test_bec_dispatcher.py b/tests/unit_tests/test_bec_dispatcher.py index b433e885..3d6df87f 100644 --- a/tests/unit_tests/test_bec_dispatcher.py +++ b/tests/unit_tests/test_bec_dispatcher.py @@ -240,7 +240,7 @@ def test_qt_redis_connector_logs_rpc_before_qt_callback(monkeypatch): ) try: - connector._execute_callback(cb, {"data": rpc_msg}, {}) + connector._buffered_connection._execute_callback(cb, {"data": rpc_msg}, {}) info_mock.assert_called_once() info_message = info_mock.call_args.args[0]