From abc6caa2d0b6141dfbe1f3d025f78ae14deddcb3 Mon Sep 17 00:00:00 2001 From: Mathias Guijarro Date: Tue, 7 May 2024 17:10:57 +0200 Subject: [PATCH] feat: implement non-polling, interruptible waiting of gui instruction response with timeout --- bec_widgets/cli/client_utils.py | 96 ++++++++++++------- .../jupyter_console/jupyter_console_window.py | 15 +-- bec_widgets/utils/bec_dispatcher.py | 6 +- 3 files changed, 75 insertions(+), 42 deletions(-) diff --git a/bec_widgets/cli/client_utils.py b/bec_widgets/cli/client_utils.py index 7c6a4d63..f7ca7ad3 100644 --- a/bec_widgets/cli/client_utils.py +++ b/bec_widgets/cli/client_utils.py @@ -14,7 +14,7 @@ from typing import TYPE_CHECKING from bec_lib.endpoints import MessageEndpoints from bec_lib.utils.import_utils import isinstance_based_on_class_name, lazy_import, lazy_import_from -from qtpy.QtCore import QCoreApplication +from qtpy.QtCore import QEventLoop, QSocketNotifier, QTimer import bec_widgets.cli.client as client from bec_widgets.cli.auto_updates import AutoUpdates @@ -24,6 +24,8 @@ if TYPE_CHECKING: from bec_widgets.cli.client import BECDockArea, BECFigure +from bec_lib.serialization import MsgpackSerialization + messages = lazy_import("bec_lib.messages") # from bec_lib.connector import MessageObject MessageObject = lazy_import_from("bec_lib.connector", ("MessageObject",)) @@ -205,6 +207,48 @@ class RPCResponseTimeoutError(Exception): ) +class QtRedisMessageWaiter: + def __init__(self, redis_connector, message_to_wait): + self.ev_loop = QEventLoop() + self.response = None + self.connector = redis_connector + self.message_to_wait = message_to_wait + self.pubsub = redis_connector._redis_conn.pubsub() + self.pubsub.subscribe(self.message_to_wait.endpoint) + fd = self.pubsub.connection._sock.fileno() + self.notifier = QSocketNotifier(fd, QSocketNotifier.Read) + self.notifier.activated.connect(self._pubsub_readable) + + def _msg_received(self, msg_obj): + self.response = msg_obj.value + self.ev_loop.quit() + + def wait(self, timeout=1): + timer = QTimer() + timer.singleShot(timeout * 1000, self.ev_loop.quit) + self.ev_loop.exec_() + timer.stop() + self.notifier.setEnabled(False) + self.pubsub.close() + return self.response + + def _pubsub_readable(self, fd): + while True: + msg = self.pubsub.get_message() + if msg: + if msg["type"] == "subscribe": + # get_message buffers, so we may already have the answer + # let's check... + continue + else: + break + else: + return + channel = msg["channel"].decode() + msg = MessageObject(topic=channel, value=MsgpackSerialization.loads(msg["data"])) + self.connector._execute_callback(self._msg_received, msg, {}) + + class RPCBase: def __init__(self, gui_id: str = None, config: dict = None, parent=None) -> None: self._client = BECDispatcher().client @@ -231,7 +275,7 @@ class RPCBase: parent = parent._parent return parent - def _run_rpc(self, method, *args, wait_for_rpc_response=True, **kwargs): + def _run_rpc(self, method, *args, wait_for_rpc_response=True, timeout=3, **kwargs): """ Run the RPC call. @@ -253,16 +297,24 @@ class RPCBase: # pylint: disable=protected-access receiver = self._root._gui_id + if wait_for_rpc_response: + redis_msg = QtRedisMessageWaiter( + self._client.connector, MessageEndpoints.gui_instruction_response(request_id) + ) + self._client.connector.set_and_publish(MessageEndpoints.gui_instructions(receiver), rpc_msg) - if not wait_for_rpc_response: - return None - response = self._wait_for_response(request_id) - # get class name - if not response.content["accepted"]: - raise ValueError(response.content["message"]["error"]) - msg_result = response.content["message"].get("result") - return self._create_widget_from_msg_result(msg_result) + if wait_for_rpc_response: + response = redis_msg.wait(timeout) + + if response is None: + raise RPCResponseTimeoutError(request_id, timeout) + + # get class name + if not response.accepted: + raise ValueError(response.message["error"]) + msg_result = response.message.get("result") + return self._create_widget_from_msg_result(msg_result) def _create_widget_from_msg_result(self, msg_result): if msg_result is None: @@ -285,30 +337,6 @@ class RPCBase: return cls(parent=self, **msg_result) return msg_result - def _wait_for_response(self, request_id: str, timeout: int = 5): - """ - Wait for the response from the server. - - Args: - request_id(str): The request ID. - timeout(int): The timeout in seconds. - - Returns: - The response from the server. - """ - start_time = time.time() - response = None - - while response is None and self.gui_is_alive() and (time.time() - start_time) < timeout: - response = self._client.connector.get( - MessageEndpoints.gui_instruction_response(request_id) - ) - QCoreApplication.processEvents() # keep UI responsive (and execute signals/slots) - if response is None and (time.time() - start_time) >= timeout: - raise RPCResponseTimeoutError(request_id, timeout) - - return response - def gui_is_alive(self): """ Check if the GUI is alive. diff --git a/bec_widgets/examples/jupyter_console/jupyter_console_window.py b/bec_widgets/examples/jupyter_console/jupyter_console_window.py index c8736be2..911e9b66 100644 --- a/bec_widgets/examples/jupyter_console/jupyter_console_window.py +++ b/bec_widgets/examples/jupyter_console/jupyter_console_window.py @@ -136,17 +136,18 @@ if __name__ == "__main__": # pragma: no cover module_path = os.path.dirname(bec_widgets.__file__) + app = QApplication(sys.argv) + app.setApplicationName("Jupyter Console") + app.setApplicationDisplayName("Jupyter Console") + qdarktheme.setup_theme("auto") + icon = QIcon() + icon.addFile(os.path.join(module_path, "assets", "terminal_icon.png"), size=QSize(48, 48)) + app.setWindowIcon(icon) + bec_dispatcher = BECDispatcher() client = bec_dispatcher.client client.start() - app = QApplication(sys.argv) - app.setApplicationName("Jupyter Console") - app.setApplicationDisplayName("Jupyter Console") - # qdarktheme.setup_theme("auto") - icon = QIcon() - icon.addFile(os.path.join(module_path, "assets", "terminal_icon.png"), size=QSize(48, 48)) - app.setWindowIcon(icon) win = JupyterConsoleWindow() win.show() diff --git a/bec_widgets/utils/bec_dispatcher.py b/bec_widgets/utils/bec_dispatcher.py index 3a55fa4f..e31607b4 100644 --- a/bec_widgets/utils/bec_dispatcher.py +++ b/bec_widgets/utils/bec_dispatcher.py @@ -9,7 +9,7 @@ import redis from bec_lib.client import BECClient from bec_lib.redis_connector import MessageObject, RedisConnector from bec_lib.service_config import ServiceConfig -from qtpy.QtCore import QObject +from qtpy.QtCore import QCoreApplication, QObject from qtpy.QtCore import Signal as pyqtSignal if TYPE_CHECKING: @@ -71,6 +71,7 @@ class BECDispatcher: _instance = None _initialized = False + qapp = None def __new__(cls, client=None, config: str = None, *args, **kwargs): if cls._instance is None: @@ -82,6 +83,9 @@ class BECDispatcher: if self._initialized: return + if not QCoreApplication.instance(): + BECDispatcher.qapp = QCoreApplication([]) + self._slots = collections.defaultdict(set) self.client = client