0
0
mirror of https://github.com/bec-project/bec_widgets.git synced 2025-07-13 11:11:49 +02:00

refactor: move RPC-related classes and modules to 'rpc' directory

This allows to break circular import, too
This commit is contained in:
2024-12-18 15:26:52 +01:00
committed by wakonig_k
parent 1b0382524f
commit 5c83702382
15 changed files with 183 additions and 177 deletions

View File

@ -5,7 +5,7 @@ from __future__ import annotations
import enum
from typing import Literal, Optional, overload
from bec_widgets.cli.client_utils import RPCBase, rpc_call
from bec_widgets.cli.rpc.rpc_base import RPCBase, rpc_call
# pylint: skip-file

View File

@ -7,20 +7,17 @@ import os
import select
import subprocess
import threading
import time
import uuid
from contextlib import contextmanager
from dataclasses import dataclass
from functools import wraps
from typing import TYPE_CHECKING
from bec_lib.client import BECClient
from bec_lib.endpoints import MessageEndpoints
from bec_lib.logger import bec_logger
from bec_lib.utils.import_utils import isinstance_based_on_class_name, lazy_import, lazy_import_from
import bec_widgets.cli.client as client
from bec_widgets.cli.auto_updates import AutoUpdates
from bec_widgets.cli.rpc.rpc_base import RPCBase
if TYPE_CHECKING:
from bec_lib.device import DeviceBase
@ -33,37 +30,6 @@ BECDispatcher = lazy_import_from("bec_widgets.utils.bec_dispatcher", ("BECDispat
logger = bec_logger.logger
def rpc_call(func):
"""
A decorator for calling a function on the server.
Args:
func: The function to call.
Returns:
The result of the function call.
"""
@wraps(func)
def wrapper(self, *args, **kwargs):
# we could rely on a strict type check here, but this is more flexible
# moreover, it would anyway crash for objects...
out = []
for arg in args:
if hasattr(arg, "name"):
arg = arg.name
out.append(arg)
args = tuple(out)
for key, val in kwargs.items():
if hasattr(val, "name"):
kwargs[key] = val.name
if not self.gui_is_alive():
raise RuntimeError("GUI is not alive")
return self._run_rpc(func.__name__, *args, **kwargs)
return wrapper
def _get_output(process, logger) -> None:
log_func = {process.stdout: logger.debug, process.stderr: logger.error}
stream_buffer = {process.stdout: [], process.stderr: []}
@ -134,138 +100,6 @@ class RepeatTimer(threading.Timer):
self.function(*self.args, **self.kwargs)
class RPCResponseTimeoutError(Exception):
"""Exception raised when an RPC response is not received within the expected time."""
def __init__(self, request_id, timeout):
super().__init__(
f"RPC response not received within {timeout} seconds for request ID {request_id}"
)
class RPCBase:
def __init__(self, gui_id: str = None, config: dict = None, parent=None) -> None:
self._client = BECClient() # BECClient is a singleton; here, we simply get the instance
self._config = config if config is not None else {}
self._gui_id = gui_id if gui_id is not None else str(uuid.uuid4())[:5]
self._parent = parent
self._msg_wait_event = threading.Event()
self._rpc_response = None
super().__init__()
# print(f"RPCBase: {self._gui_id}")
def __repr__(self):
type_ = type(self)
qualname = type_.__qualname__
return f"<{qualname} object at {hex(id(self))}>"
@property
def _root(self):
"""
Get the root widget. This is the BECFigure widget that holds
the anchor gui_id.
"""
parent = self
# pylint: disable=protected-access
while parent._parent is not None:
parent = parent._parent
return parent
def _run_rpc(self, method, *args, wait_for_rpc_response=True, timeout=3, **kwargs):
"""
Run the RPC call.
Args:
method: The method to call.
args: The arguments to pass to the method.
wait_for_rpc_response: Whether to wait for the RPC response.
kwargs: The keyword arguments to pass to the method.
Returns:
The result of the RPC call.
"""
request_id = str(uuid.uuid4())
rpc_msg = messages.GUIInstructionMessage(
action=method,
parameter={"args": args, "kwargs": kwargs, "gui_id": self._gui_id},
metadata={"request_id": request_id},
)
# pylint: disable=protected-access
receiver = self._root._gui_id
if wait_for_rpc_response:
self._rpc_response = None
self._msg_wait_event.clear()
self._client.connector.register(
MessageEndpoints.gui_instruction_response(request_id),
cb=self._on_rpc_response,
parent=self,
)
self._client.connector.set_and_publish(MessageEndpoints.gui_instructions(receiver), rpc_msg)
if wait_for_rpc_response:
try:
finished = self._msg_wait_event.wait(timeout)
if not finished:
raise RPCResponseTimeoutError(request_id, timeout)
finally:
self._msg_wait_event.clear()
self._client.connector.unregister(
MessageEndpoints.gui_instruction_response(request_id), cb=self._on_rpc_response
)
# get class name
if not self._rpc_response.accepted:
raise ValueError(self._rpc_response.message["error"])
msg_result = self._rpc_response.message.get("result")
self._rpc_response = None
return self._create_widget_from_msg_result(msg_result)
@staticmethod
def _on_rpc_response(msg: MessageObject, parent: RPCBase) -> None:
msg = msg.value
parent._msg_wait_event.set()
parent._rpc_response = msg
def _create_widget_from_msg_result(self, msg_result):
if msg_result is None:
return None
if isinstance(msg_result, list):
return [self._create_widget_from_msg_result(res) for res in msg_result]
if isinstance(msg_result, dict):
if "__rpc__" not in msg_result:
return {
key: self._create_widget_from_msg_result(val) for key, val in msg_result.items()
}
cls = msg_result.pop("widget_class", None)
msg_result.pop("__rpc__", None)
if not cls:
return msg_result
cls = getattr(client, cls)
# print(msg_result)
return cls(parent=self, **msg_result)
return msg_result
def gui_is_alive(self):
"""
Check if the GUI is alive.
"""
heart = self._client.connector.get(MessageEndpoints.gui_heartbeat(self._root._gui_id))
if heart is None:
return False
if heart.status == messages.BECStatus.RUNNING:
return True
return False
class RepeatTimer(threading.Timer):
def run(self):
while not self.finished.wait(self.interval):
self.function(*self.args, **self.kwargs)
@contextmanager
def wait_for_server(client):
timeout = client._startup_timeout

View File

@ -35,7 +35,7 @@ from __future__ import annotations
import enum
from typing import Literal, Optional, overload
from bec_widgets.cli.client_utils import RPCBase, rpc_call
from bec_widgets.cli.rpc.rpc_base import RPCBase, rpc_call
# pylint: skip-file"""

View File

View File

@ -0,0 +1,172 @@
from __future__ import annotations
import threading
import uuid
from functools import wraps
from bec_lib.client import BECClient
from bec_lib.endpoints import MessageEndpoints
from bec_lib.utils.import_utils import lazy_import, lazy_import_from
import bec_widgets.cli.client as client
messages = lazy_import("bec_lib.messages")
# from bec_lib.connector import MessageObject
MessageObject = lazy_import_from("bec_lib.connector", ("MessageObject",))
def rpc_call(func):
"""
A decorator for calling a function on the server.
Args:
func: The function to call.
Returns:
The result of the function call.
"""
@wraps(func)
def wrapper(self, *args, **kwargs):
# we could rely on a strict type check here, but this is more flexible
# moreover, it would anyway crash for objects...
out = []
for arg in args:
if hasattr(arg, "name"):
arg = arg.name
out.append(arg)
args = tuple(out)
for key, val in kwargs.items():
if hasattr(val, "name"):
kwargs[key] = val.name
if not self.gui_is_alive():
raise RuntimeError("GUI is not alive")
return self._run_rpc(func.__name__, *args, **kwargs)
return wrapper
class RPCResponseTimeoutError(Exception):
"""Exception raised when an RPC response is not received within the expected time."""
def __init__(self, request_id, timeout):
super().__init__(
f"RPC response not received within {timeout} seconds for request ID {request_id}"
)
class RPCBase:
def __init__(self, gui_id: str = None, config: dict = None, parent=None) -> None:
self._client = BECClient() # BECClient is a singleton; here, we simply get the instance
self._config = config if config is not None else {}
self._gui_id = gui_id if gui_id is not None else str(uuid.uuid4())[:5]
self._parent = parent
self._msg_wait_event = threading.Event()
self._rpc_response = None
super().__init__()
# print(f"RPCBase: {self._gui_id}")
def __repr__(self):
type_ = type(self)
qualname = type_.__qualname__
return f"<{qualname} object at {hex(id(self))}>"
@property
def _root(self):
"""
Get the root widget. This is the BECFigure widget that holds
the anchor gui_id.
"""
parent = self
# pylint: disable=protected-access
while parent._parent is not None:
parent = parent._parent
return parent
def _run_rpc(self, method, *args, wait_for_rpc_response=True, timeout=3, **kwargs):
"""
Run the RPC call.
Args:
method: The method to call.
args: The arguments to pass to the method.
wait_for_rpc_response: Whether to wait for the RPC response.
kwargs: The keyword arguments to pass to the method.
Returns:
The result of the RPC call.
"""
request_id = str(uuid.uuid4())
rpc_msg = messages.GUIInstructionMessage(
action=method,
parameter={"args": args, "kwargs": kwargs, "gui_id": self._gui_id},
metadata={"request_id": request_id},
)
# pylint: disable=protected-access
receiver = self._root._gui_id
if wait_for_rpc_response:
self._rpc_response = None
self._msg_wait_event.clear()
self._client.connector.register(
MessageEndpoints.gui_instruction_response(request_id),
cb=self._on_rpc_response,
parent=self,
)
self._client.connector.set_and_publish(MessageEndpoints.gui_instructions(receiver), rpc_msg)
if wait_for_rpc_response:
try:
finished = self._msg_wait_event.wait(timeout)
if not finished:
raise RPCResponseTimeoutError(request_id, timeout)
finally:
self._msg_wait_event.clear()
self._client.connector.unregister(
MessageEndpoints.gui_instruction_response(request_id), cb=self._on_rpc_response
)
# get class name
if not self._rpc_response.accepted:
raise ValueError(self._rpc_response.message["error"])
msg_result = self._rpc_response.message.get("result")
self._rpc_response = None
return self._create_widget_from_msg_result(msg_result)
@staticmethod
def _on_rpc_response(msg: MessageObject, parent: RPCBase) -> None:
msg = msg.value
parent._msg_wait_event.set()
parent._rpc_response = msg
def _create_widget_from_msg_result(self, msg_result):
if msg_result is None:
return None
if isinstance(msg_result, list):
return [self._create_widget_from_msg_result(res) for res in msg_result]
if isinstance(msg_result, dict):
if "__rpc__" not in msg_result:
return {
key: self._create_widget_from_msg_result(val) for key, val in msg_result.items()
}
cls = msg_result.pop("widget_class", None)
msg_result.pop("__rpc__", None)
if not cls:
return msg_result
cls = getattr(client, cls)
# print(msg_result)
return cls(parent=self, **msg_result)
return msg_result
def gui_is_alive(self):
"""
Check if the GUI is alive.
"""
heart = self._client.connector.get(MessageEndpoints.gui_heartbeat(self._root._gui_id))
if heart is None:
return False
if heart.status == messages.BECStatus.RUNNING:
return True
return False

View File

@ -12,7 +12,7 @@ from bec_lib.service_config import ServiceConfig
from bec_lib.utils.import_utils import lazy_import
from qtpy.QtCore import Qt, QTimer
from bec_widgets.cli.rpc_register import RPCRegister
from bec_widgets.cli.rpc.rpc_register import RPCRegister
from bec_widgets.utils import BECDispatcher
from bec_widgets.utils.bec_connector import BECConnector
from bec_widgets.widgets.containers.dock import BECDockArea

View File

@ -12,7 +12,7 @@ from pydantic import BaseModel, Field, field_validator
from qtpy.QtCore import QObject, QRunnable, QThreadPool, Signal
from qtpy.QtWidgets import QApplication
from bec_widgets.cli.rpc_register import RPCRegister
from bec_widgets.cli.rpc.rpc_register import RPCRegister
from bec_widgets.qt_utils.error_popups import ErrorPopupUtility
from bec_widgets.qt_utils.error_popups import SafeSlot as pyqtSlot
from bec_widgets.utils.yaml_dialog import load_yaml, load_yaml_gui, save_yaml, save_yaml_gui

View File

@ -6,7 +6,7 @@ from pydantic import Field
from pyqtgraph.dockarea import Dock, DockLabel
from qtpy import QtCore, QtGui
from bec_widgets.cli.rpc_wigdet_handler import widget_handler
from bec_widgets.cli.rpc.rpc_widget_handler import widget_handler
from bec_widgets.utils import ConnectionConfig, GridLayoutManager
from bec_widgets.utils.bec_widget import BECWidget

View File

@ -20,7 +20,7 @@ from qtpy.QtWidgets import (
)
from typeguard import typechecked
from bec_widgets.cli.rpc_wigdet_handler import widget_handler
from bec_widgets.cli.rpc.rpc_widget_handler import widget_handler
class LayoutManagerWidget(QWidget):

View File

@ -5,7 +5,7 @@ from pytestqt.exceptions import TimeoutError as QtBotTimeoutError
from qtpy.QtCore import QTimer
from qtpy.QtWidgets import QApplication
from bec_widgets.cli.rpc_register import RPCRegister
from bec_widgets.cli.rpc.rpc_register import RPCRegister
from bec_widgets.qt_utils import error_popups
from bec_widgets.utils import bec_dispatcher as bec_dispatcher_module

View File

@ -70,7 +70,7 @@ def test_client_generator_with_black_formatting():
import enum
from typing import Literal, Optional, overload
from bec_widgets.cli.client_utils import RPCBase, rpc_call
from bec_widgets.cli.rpc.rpc_base import RPCBase, rpc_call
# pylint: skip-file

View File

@ -1,4 +1,4 @@
from bec_widgets.cli.rpc_register import RPCRegister
from bec_widgets.cli.rpc.rpc_register import RPCRegister
class FakeObject:

View File

@ -1,4 +1,4 @@
from bec_widgets.cli.rpc_wigdet_handler import RPCWidgetHandler
from bec_widgets.cli.rpc.rpc_widget_handler import RPCWidgetHandler
def test_rpc_widget_handler():