From 5c83702382f5b66a75aec9243d08774a30f88088 Mon Sep 17 00:00:00 2001 From: Mathias Guijarro Date: Wed, 18 Dec 2024 15:26:52 +0100 Subject: [PATCH] refactor: move RPC-related classes and modules to 'rpc' directory This allows to break circular import, too --- bec_widgets/cli/client.py | 2 +- bec_widgets/cli/client_utils.py | 168 +---------------- bec_widgets/cli/generate_cli.py | 2 +- bec_widgets/cli/rpc/__init__.py | 0 bec_widgets/cli/rpc/rpc_base.py | 172 ++++++++++++++++++ bec_widgets/cli/{ => rpc}/rpc_register.py | 0 .../rpc_widget_handler.py} | 0 bec_widgets/cli/server.py | 2 +- bec_widgets/utils/bec_connector.py | 2 +- bec_widgets/widgets/containers/dock/dock.py | 2 +- .../layout_manager/layout_manager.py | 2 +- tests/unit_tests/conftest.py | 2 +- tests/unit_tests/test_generate_cli_client.py | 2 +- tests/unit_tests/test_rpc_register.py | 2 +- tests/unit_tests/test_rpc_widget_handler.py | 2 +- 15 files changed, 183 insertions(+), 177 deletions(-) create mode 100644 bec_widgets/cli/rpc/__init__.py create mode 100644 bec_widgets/cli/rpc/rpc_base.py rename bec_widgets/cli/{ => rpc}/rpc_register.py (100%) rename bec_widgets/cli/{rpc_wigdet_handler.py => rpc/rpc_widget_handler.py} (100%) diff --git a/bec_widgets/cli/client.py b/bec_widgets/cli/client.py index 1d4b620e..f9483ea2 100644 --- a/bec_widgets/cli/client.py +++ b/bec_widgets/cli/client.py @@ -5,7 +5,7 @@ from __future__ import annotations import enum from typing import Literal, Optional, overload -from bec_widgets.cli.client_utils import RPCBase, rpc_call +from bec_widgets.cli.rpc.rpc_base import RPCBase, rpc_call # pylint: skip-file diff --git a/bec_widgets/cli/client_utils.py b/bec_widgets/cli/client_utils.py index ca5755c0..91837b3a 100644 --- a/bec_widgets/cli/client_utils.py +++ b/bec_widgets/cli/client_utils.py @@ -7,20 +7,17 @@ import os import select import subprocess import threading -import time -import uuid from contextlib import contextmanager from dataclasses import dataclass -from functools import wraps from typing import TYPE_CHECKING -from bec_lib.client import BECClient from bec_lib.endpoints import MessageEndpoints from bec_lib.logger import bec_logger from bec_lib.utils.import_utils import isinstance_based_on_class_name, lazy_import, lazy_import_from import bec_widgets.cli.client as client from bec_widgets.cli.auto_updates import AutoUpdates +from bec_widgets.cli.rpc.rpc_base import RPCBase if TYPE_CHECKING: from bec_lib.device import DeviceBase @@ -33,37 +30,6 @@ BECDispatcher = lazy_import_from("bec_widgets.utils.bec_dispatcher", ("BECDispat logger = bec_logger.logger -def rpc_call(func): - """ - A decorator for calling a function on the server. - - Args: - func: The function to call. - - Returns: - The result of the function call. - """ - - @wraps(func) - def wrapper(self, *args, **kwargs): - # we could rely on a strict type check here, but this is more flexible - # moreover, it would anyway crash for objects... - out = [] - for arg in args: - if hasattr(arg, "name"): - arg = arg.name - out.append(arg) - args = tuple(out) - for key, val in kwargs.items(): - if hasattr(val, "name"): - kwargs[key] = val.name - if not self.gui_is_alive(): - raise RuntimeError("GUI is not alive") - return self._run_rpc(func.__name__, *args, **kwargs) - - return wrapper - - def _get_output(process, logger) -> None: log_func = {process.stdout: logger.debug, process.stderr: logger.error} stream_buffer = {process.stdout: [], process.stderr: []} @@ -134,138 +100,6 @@ class RepeatTimer(threading.Timer): self.function(*self.args, **self.kwargs) -class RPCResponseTimeoutError(Exception): - """Exception raised when an RPC response is not received within the expected time.""" - - def __init__(self, request_id, timeout): - super().__init__( - f"RPC response not received within {timeout} seconds for request ID {request_id}" - ) - - -class RPCBase: - def __init__(self, gui_id: str = None, config: dict = None, parent=None) -> None: - self._client = BECClient() # BECClient is a singleton; here, we simply get the instance - self._config = config if config is not None else {} - self._gui_id = gui_id if gui_id is not None else str(uuid.uuid4())[:5] - self._parent = parent - self._msg_wait_event = threading.Event() - self._rpc_response = None - super().__init__() - # print(f"RPCBase: {self._gui_id}") - - def __repr__(self): - type_ = type(self) - qualname = type_.__qualname__ - return f"<{qualname} object at {hex(id(self))}>" - - @property - def _root(self): - """ - Get the root widget. This is the BECFigure widget that holds - the anchor gui_id. - """ - parent = self - # pylint: disable=protected-access - while parent._parent is not None: - parent = parent._parent - return parent - - def _run_rpc(self, method, *args, wait_for_rpc_response=True, timeout=3, **kwargs): - """ - Run the RPC call. - - Args: - method: The method to call. - args: The arguments to pass to the method. - wait_for_rpc_response: Whether to wait for the RPC response. - kwargs: The keyword arguments to pass to the method. - - Returns: - The result of the RPC call. - """ - request_id = str(uuid.uuid4()) - rpc_msg = messages.GUIInstructionMessage( - action=method, - parameter={"args": args, "kwargs": kwargs, "gui_id": self._gui_id}, - metadata={"request_id": request_id}, - ) - - # pylint: disable=protected-access - receiver = self._root._gui_id - if wait_for_rpc_response: - self._rpc_response = None - self._msg_wait_event.clear() - self._client.connector.register( - MessageEndpoints.gui_instruction_response(request_id), - cb=self._on_rpc_response, - parent=self, - ) - - self._client.connector.set_and_publish(MessageEndpoints.gui_instructions(receiver), rpc_msg) - - if wait_for_rpc_response: - try: - finished = self._msg_wait_event.wait(timeout) - if not finished: - raise RPCResponseTimeoutError(request_id, timeout) - finally: - self._msg_wait_event.clear() - self._client.connector.unregister( - MessageEndpoints.gui_instruction_response(request_id), cb=self._on_rpc_response - ) - # get class name - if not self._rpc_response.accepted: - raise ValueError(self._rpc_response.message["error"]) - msg_result = self._rpc_response.message.get("result") - self._rpc_response = None - return self._create_widget_from_msg_result(msg_result) - - @staticmethod - def _on_rpc_response(msg: MessageObject, parent: RPCBase) -> None: - msg = msg.value - parent._msg_wait_event.set() - parent._rpc_response = msg - - def _create_widget_from_msg_result(self, msg_result): - if msg_result is None: - return None - if isinstance(msg_result, list): - return [self._create_widget_from_msg_result(res) for res in msg_result] - if isinstance(msg_result, dict): - if "__rpc__" not in msg_result: - return { - key: self._create_widget_from_msg_result(val) for key, val in msg_result.items() - } - cls = msg_result.pop("widget_class", None) - msg_result.pop("__rpc__", None) - - if not cls: - return msg_result - - cls = getattr(client, cls) - # print(msg_result) - return cls(parent=self, **msg_result) - return msg_result - - def gui_is_alive(self): - """ - Check if the GUI is alive. - """ - heart = self._client.connector.get(MessageEndpoints.gui_heartbeat(self._root._gui_id)) - if heart is None: - return False - if heart.status == messages.BECStatus.RUNNING: - return True - return False - - -class RepeatTimer(threading.Timer): - def run(self): - while not self.finished.wait(self.interval): - self.function(*self.args, **self.kwargs) - - @contextmanager def wait_for_server(client): timeout = client._startup_timeout diff --git a/bec_widgets/cli/generate_cli.py b/bec_widgets/cli/generate_cli.py index 8fed8b67..44dea1f5 100644 --- a/bec_widgets/cli/generate_cli.py +++ b/bec_widgets/cli/generate_cli.py @@ -35,7 +35,7 @@ from __future__ import annotations import enum from typing import Literal, Optional, overload -from bec_widgets.cli.client_utils import RPCBase, rpc_call +from bec_widgets.cli.rpc.rpc_base import RPCBase, rpc_call # pylint: skip-file""" diff --git a/bec_widgets/cli/rpc/__init__.py b/bec_widgets/cli/rpc/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/bec_widgets/cli/rpc/rpc_base.py b/bec_widgets/cli/rpc/rpc_base.py new file mode 100644 index 00000000..89547630 --- /dev/null +++ b/bec_widgets/cli/rpc/rpc_base.py @@ -0,0 +1,172 @@ +from __future__ import annotations + +import threading +import uuid +from functools import wraps + +from bec_lib.client import BECClient +from bec_lib.endpoints import MessageEndpoints +from bec_lib.utils.import_utils import lazy_import, lazy_import_from + +import bec_widgets.cli.client as client + +messages = lazy_import("bec_lib.messages") +# from bec_lib.connector import MessageObject +MessageObject = lazy_import_from("bec_lib.connector", ("MessageObject",)) + + +def rpc_call(func): + """ + A decorator for calling a function on the server. + + Args: + func: The function to call. + + Returns: + The result of the function call. + """ + + @wraps(func) + def wrapper(self, *args, **kwargs): + # we could rely on a strict type check here, but this is more flexible + # moreover, it would anyway crash for objects... + out = [] + for arg in args: + if hasattr(arg, "name"): + arg = arg.name + out.append(arg) + args = tuple(out) + for key, val in kwargs.items(): + if hasattr(val, "name"): + kwargs[key] = val.name + if not self.gui_is_alive(): + raise RuntimeError("GUI is not alive") + return self._run_rpc(func.__name__, *args, **kwargs) + + return wrapper + + +class RPCResponseTimeoutError(Exception): + """Exception raised when an RPC response is not received within the expected time.""" + + def __init__(self, request_id, timeout): + super().__init__( + f"RPC response not received within {timeout} seconds for request ID {request_id}" + ) + + +class RPCBase: + def __init__(self, gui_id: str = None, config: dict = None, parent=None) -> None: + self._client = BECClient() # BECClient is a singleton; here, we simply get the instance + self._config = config if config is not None else {} + self._gui_id = gui_id if gui_id is not None else str(uuid.uuid4())[:5] + self._parent = parent + self._msg_wait_event = threading.Event() + self._rpc_response = None + super().__init__() + # print(f"RPCBase: {self._gui_id}") + + def __repr__(self): + type_ = type(self) + qualname = type_.__qualname__ + return f"<{qualname} object at {hex(id(self))}>" + + @property + def _root(self): + """ + Get the root widget. This is the BECFigure widget that holds + the anchor gui_id. + """ + parent = self + # pylint: disable=protected-access + while parent._parent is not None: + parent = parent._parent + return parent + + def _run_rpc(self, method, *args, wait_for_rpc_response=True, timeout=3, **kwargs): + """ + Run the RPC call. + + Args: + method: The method to call. + args: The arguments to pass to the method. + wait_for_rpc_response: Whether to wait for the RPC response. + kwargs: The keyword arguments to pass to the method. + + Returns: + The result of the RPC call. + """ + request_id = str(uuid.uuid4()) + rpc_msg = messages.GUIInstructionMessage( + action=method, + parameter={"args": args, "kwargs": kwargs, "gui_id": self._gui_id}, + metadata={"request_id": request_id}, + ) + + # pylint: disable=protected-access + receiver = self._root._gui_id + if wait_for_rpc_response: + self._rpc_response = None + self._msg_wait_event.clear() + self._client.connector.register( + MessageEndpoints.gui_instruction_response(request_id), + cb=self._on_rpc_response, + parent=self, + ) + + self._client.connector.set_and_publish(MessageEndpoints.gui_instructions(receiver), rpc_msg) + + if wait_for_rpc_response: + try: + finished = self._msg_wait_event.wait(timeout) + if not finished: + raise RPCResponseTimeoutError(request_id, timeout) + finally: + self._msg_wait_event.clear() + self._client.connector.unregister( + MessageEndpoints.gui_instruction_response(request_id), cb=self._on_rpc_response + ) + # get class name + if not self._rpc_response.accepted: + raise ValueError(self._rpc_response.message["error"]) + msg_result = self._rpc_response.message.get("result") + self._rpc_response = None + return self._create_widget_from_msg_result(msg_result) + + @staticmethod + def _on_rpc_response(msg: MessageObject, parent: RPCBase) -> None: + msg = msg.value + parent._msg_wait_event.set() + parent._rpc_response = msg + + def _create_widget_from_msg_result(self, msg_result): + if msg_result is None: + return None + if isinstance(msg_result, list): + return [self._create_widget_from_msg_result(res) for res in msg_result] + if isinstance(msg_result, dict): + if "__rpc__" not in msg_result: + return { + key: self._create_widget_from_msg_result(val) for key, val in msg_result.items() + } + cls = msg_result.pop("widget_class", None) + msg_result.pop("__rpc__", None) + + if not cls: + return msg_result + + cls = getattr(client, cls) + # print(msg_result) + return cls(parent=self, **msg_result) + return msg_result + + def gui_is_alive(self): + """ + Check if the GUI is alive. + """ + heart = self._client.connector.get(MessageEndpoints.gui_heartbeat(self._root._gui_id)) + if heart is None: + return False + if heart.status == messages.BECStatus.RUNNING: + return True + return False diff --git a/bec_widgets/cli/rpc_register.py b/bec_widgets/cli/rpc/rpc_register.py similarity index 100% rename from bec_widgets/cli/rpc_register.py rename to bec_widgets/cli/rpc/rpc_register.py diff --git a/bec_widgets/cli/rpc_wigdet_handler.py b/bec_widgets/cli/rpc/rpc_widget_handler.py similarity index 100% rename from bec_widgets/cli/rpc_wigdet_handler.py rename to bec_widgets/cli/rpc/rpc_widget_handler.py diff --git a/bec_widgets/cli/server.py b/bec_widgets/cli/server.py index 5a5cbe4b..376f95b4 100644 --- a/bec_widgets/cli/server.py +++ b/bec_widgets/cli/server.py @@ -12,7 +12,7 @@ from bec_lib.service_config import ServiceConfig from bec_lib.utils.import_utils import lazy_import from qtpy.QtCore import Qt, QTimer -from bec_widgets.cli.rpc_register import RPCRegister +from bec_widgets.cli.rpc.rpc_register import RPCRegister from bec_widgets.utils import BECDispatcher from bec_widgets.utils.bec_connector import BECConnector from bec_widgets.widgets.containers.dock import BECDockArea diff --git a/bec_widgets/utils/bec_connector.py b/bec_widgets/utils/bec_connector.py index d739c842..ec248cd6 100644 --- a/bec_widgets/utils/bec_connector.py +++ b/bec_widgets/utils/bec_connector.py @@ -12,7 +12,7 @@ from pydantic import BaseModel, Field, field_validator from qtpy.QtCore import QObject, QRunnable, QThreadPool, Signal from qtpy.QtWidgets import QApplication -from bec_widgets.cli.rpc_register import RPCRegister +from bec_widgets.cli.rpc.rpc_register import RPCRegister from bec_widgets.qt_utils.error_popups import ErrorPopupUtility from bec_widgets.qt_utils.error_popups import SafeSlot as pyqtSlot from bec_widgets.utils.yaml_dialog import load_yaml, load_yaml_gui, save_yaml, save_yaml_gui diff --git a/bec_widgets/widgets/containers/dock/dock.py b/bec_widgets/widgets/containers/dock/dock.py index c7541de1..08235c07 100644 --- a/bec_widgets/widgets/containers/dock/dock.py +++ b/bec_widgets/widgets/containers/dock/dock.py @@ -6,7 +6,7 @@ from pydantic import Field from pyqtgraph.dockarea import Dock, DockLabel from qtpy import QtCore, QtGui -from bec_widgets.cli.rpc_wigdet_handler import widget_handler +from bec_widgets.cli.rpc.rpc_widget_handler import widget_handler from bec_widgets.utils import ConnectionConfig, GridLayoutManager from bec_widgets.utils.bec_widget import BECWidget diff --git a/bec_widgets/widgets/containers/layout_manager/layout_manager.py b/bec_widgets/widgets/containers/layout_manager/layout_manager.py index 07a90845..a5238cca 100644 --- a/bec_widgets/widgets/containers/layout_manager/layout_manager.py +++ b/bec_widgets/widgets/containers/layout_manager/layout_manager.py @@ -20,7 +20,7 @@ from qtpy.QtWidgets import ( ) from typeguard import typechecked -from bec_widgets.cli.rpc_wigdet_handler import widget_handler +from bec_widgets.cli.rpc.rpc_widget_handler import widget_handler class LayoutManagerWidget(QWidget): diff --git a/tests/unit_tests/conftest.py b/tests/unit_tests/conftest.py index 9a17a992..cff2577a 100644 --- a/tests/unit_tests/conftest.py +++ b/tests/unit_tests/conftest.py @@ -5,7 +5,7 @@ from pytestqt.exceptions import TimeoutError as QtBotTimeoutError from qtpy.QtCore import QTimer from qtpy.QtWidgets import QApplication -from bec_widgets.cli.rpc_register import RPCRegister +from bec_widgets.cli.rpc.rpc_register import RPCRegister from bec_widgets.qt_utils import error_popups from bec_widgets.utils import bec_dispatcher as bec_dispatcher_module diff --git a/tests/unit_tests/test_generate_cli_client.py b/tests/unit_tests/test_generate_cli_client.py index 062db211..f6655b39 100644 --- a/tests/unit_tests/test_generate_cli_client.py +++ b/tests/unit_tests/test_generate_cli_client.py @@ -70,7 +70,7 @@ def test_client_generator_with_black_formatting(): import enum from typing import Literal, Optional, overload - from bec_widgets.cli.client_utils import RPCBase, rpc_call + from bec_widgets.cli.rpc.rpc_base import RPCBase, rpc_call # pylint: skip-file diff --git a/tests/unit_tests/test_rpc_register.py b/tests/unit_tests/test_rpc_register.py index e6e37dab..574f11bb 100644 --- a/tests/unit_tests/test_rpc_register.py +++ b/tests/unit_tests/test_rpc_register.py @@ -1,4 +1,4 @@ -from bec_widgets.cli.rpc_register import RPCRegister +from bec_widgets.cli.rpc.rpc_register import RPCRegister class FakeObject: diff --git a/tests/unit_tests/test_rpc_widget_handler.py b/tests/unit_tests/test_rpc_widget_handler.py index ddd8f47d..eacadbd4 100644 --- a/tests/unit_tests/test_rpc_widget_handler.py +++ b/tests/unit_tests/test_rpc_widget_handler.py @@ -1,4 +1,4 @@ -from bec_widgets.cli.rpc_wigdet_handler import RPCWidgetHandler +from bec_widgets.cli.rpc.rpc_widget_handler import RPCWidgetHandler def test_rpc_widget_handler():