0
0
mirror of https://github.com/bec-project/bec_widgets.git synced 2025-07-13 19:21:50 +02:00

feat: implement non-polling, interruptible waiting of gui instruction response with timeout

This commit is contained in:
2024-05-07 17:10:57 +02:00
committed by wyzula-jan
parent 99fb82561b
commit abc6caa2d0
3 changed files with 75 additions and 42 deletions

View File

@ -14,7 +14,7 @@ from typing import TYPE_CHECKING
from bec_lib.endpoints import MessageEndpoints
from bec_lib.utils.import_utils import isinstance_based_on_class_name, lazy_import, lazy_import_from
from qtpy.QtCore import QCoreApplication
from qtpy.QtCore import QEventLoop, QSocketNotifier, QTimer
import bec_widgets.cli.client as client
from bec_widgets.cli.auto_updates import AutoUpdates
@ -24,6 +24,8 @@ if TYPE_CHECKING:
from bec_widgets.cli.client import BECDockArea, BECFigure
from bec_lib.serialization import MsgpackSerialization
messages = lazy_import("bec_lib.messages")
# from bec_lib.connector import MessageObject
MessageObject = lazy_import_from("bec_lib.connector", ("MessageObject",))
@ -205,6 +207,48 @@ class RPCResponseTimeoutError(Exception):
)
class QtRedisMessageWaiter:
def __init__(self, redis_connector, message_to_wait):
self.ev_loop = QEventLoop()
self.response = None
self.connector = redis_connector
self.message_to_wait = message_to_wait
self.pubsub = redis_connector._redis_conn.pubsub()
self.pubsub.subscribe(self.message_to_wait.endpoint)
fd = self.pubsub.connection._sock.fileno()
self.notifier = QSocketNotifier(fd, QSocketNotifier.Read)
self.notifier.activated.connect(self._pubsub_readable)
def _msg_received(self, msg_obj):
self.response = msg_obj.value
self.ev_loop.quit()
def wait(self, timeout=1):
timer = QTimer()
timer.singleShot(timeout * 1000, self.ev_loop.quit)
self.ev_loop.exec_()
timer.stop()
self.notifier.setEnabled(False)
self.pubsub.close()
return self.response
def _pubsub_readable(self, fd):
while True:
msg = self.pubsub.get_message()
if msg:
if msg["type"] == "subscribe":
# get_message buffers, so we may already have the answer
# let's check...
continue
else:
break
else:
return
channel = msg["channel"].decode()
msg = MessageObject(topic=channel, value=MsgpackSerialization.loads(msg["data"]))
self.connector._execute_callback(self._msg_received, msg, {})
class RPCBase:
def __init__(self, gui_id: str = None, config: dict = None, parent=None) -> None:
self._client = BECDispatcher().client
@ -231,7 +275,7 @@ class RPCBase:
parent = parent._parent
return parent
def _run_rpc(self, method, *args, wait_for_rpc_response=True, **kwargs):
def _run_rpc(self, method, *args, wait_for_rpc_response=True, timeout=3, **kwargs):
"""
Run the RPC call.
@ -253,16 +297,24 @@ class RPCBase:
# pylint: disable=protected-access
receiver = self._root._gui_id
if wait_for_rpc_response:
redis_msg = QtRedisMessageWaiter(
self._client.connector, MessageEndpoints.gui_instruction_response(request_id)
)
self._client.connector.set_and_publish(MessageEndpoints.gui_instructions(receiver), rpc_msg)
if not wait_for_rpc_response:
return None
response = self._wait_for_response(request_id)
# get class name
if not response.content["accepted"]:
raise ValueError(response.content["message"]["error"])
msg_result = response.content["message"].get("result")
return self._create_widget_from_msg_result(msg_result)
if wait_for_rpc_response:
response = redis_msg.wait(timeout)
if response is None:
raise RPCResponseTimeoutError(request_id, timeout)
# get class name
if not response.accepted:
raise ValueError(response.message["error"])
msg_result = response.message.get("result")
return self._create_widget_from_msg_result(msg_result)
def _create_widget_from_msg_result(self, msg_result):
if msg_result is None:
@ -285,30 +337,6 @@ class RPCBase:
return cls(parent=self, **msg_result)
return msg_result
def _wait_for_response(self, request_id: str, timeout: int = 5):
"""
Wait for the response from the server.
Args:
request_id(str): The request ID.
timeout(int): The timeout in seconds.
Returns:
The response from the server.
"""
start_time = time.time()
response = None
while response is None and self.gui_is_alive() and (time.time() - start_time) < timeout:
response = self._client.connector.get(
MessageEndpoints.gui_instruction_response(request_id)
)
QCoreApplication.processEvents() # keep UI responsive (and execute signals/slots)
if response is None and (time.time() - start_time) >= timeout:
raise RPCResponseTimeoutError(request_id, timeout)
return response
def gui_is_alive(self):
"""
Check if the GUI is alive.

View File

@ -136,17 +136,18 @@ if __name__ == "__main__": # pragma: no cover
module_path = os.path.dirname(bec_widgets.__file__)
app = QApplication(sys.argv)
app.setApplicationName("Jupyter Console")
app.setApplicationDisplayName("Jupyter Console")
qdarktheme.setup_theme("auto")
icon = QIcon()
icon.addFile(os.path.join(module_path, "assets", "terminal_icon.png"), size=QSize(48, 48))
app.setWindowIcon(icon)
bec_dispatcher = BECDispatcher()
client = bec_dispatcher.client
client.start()
app = QApplication(sys.argv)
app.setApplicationName("Jupyter Console")
app.setApplicationDisplayName("Jupyter Console")
# qdarktheme.setup_theme("auto")
icon = QIcon()
icon.addFile(os.path.join(module_path, "assets", "terminal_icon.png"), size=QSize(48, 48))
app.setWindowIcon(icon)
win = JupyterConsoleWindow()
win.show()

View File

@ -9,7 +9,7 @@ import redis
from bec_lib.client import BECClient
from bec_lib.redis_connector import MessageObject, RedisConnector
from bec_lib.service_config import ServiceConfig
from qtpy.QtCore import QObject
from qtpy.QtCore import QCoreApplication, QObject
from qtpy.QtCore import Signal as pyqtSignal
if TYPE_CHECKING:
@ -71,6 +71,7 @@ class BECDispatcher:
_instance = None
_initialized = False
qapp = None
def __new__(cls, client=None, config: str = None, *args, **kwargs):
if cls._instance is None:
@ -82,6 +83,9 @@ class BECDispatcher:
if self._initialized:
return
if not QCoreApplication.instance():
BECDispatcher.qapp = QCoreApplication([])
self._slots = collections.defaultdict(set)
self.client = client