fix: do not display error popup if command is executed via RPC

This commit is contained in:
2024-12-20 13:13:05 +01:00
committed by wakonig_k
parent c405421db9
commit 52c5286d64
3 changed files with 51 additions and 15 deletions
+38 -13
View File
@@ -1,9 +1,11 @@
from __future__ import annotations
import functools
import json
import signal
import sys
from contextlib import redirect_stderr, redirect_stdout
import types
from contextlib import contextmanager, redirect_stderr, redirect_stdout
from typing import Union
from bec_lib.endpoints import MessageEndpoints
@@ -13,6 +15,7 @@ from bec_lib.utils.import_utils import lazy_import
from qtpy.QtCore import Qt, QTimer
from bec_widgets.cli.rpc.rpc_register import RPCRegister
from bec_widgets.qt_utils.error_popups import ErrorPopupUtility
from bec_widgets.utils import BECDispatcher
from bec_widgets.utils.bec_connector import BECConnector
from bec_widgets.widgets.containers.dock import BECDockArea
@@ -23,6 +26,27 @@ messages = lazy_import("bec_lib.messages")
logger = bec_logger.logger
@contextmanager
def rpc_exception_hook(err_func):
"""This context replaces the popup message box for error display with a specific hook"""
# get error popup utility singleton
popup = ErrorPopupUtility()
# save current setting
old_exception_hook = popup.custom_exception_hook
# install err_func, if it is a callable
def custom_exception_hook(self, exc_type, value, tb, **kwargs):
err_func({"error": popup.get_error_message(exc_type, value, tb)})
popup.custom_exception_hook = types.MethodType(custom_exception_hook, popup)
try:
yield popup
finally:
# restore state of error popup utility singleton
popup.custom_exception_hook = old_exception_hook
class BECWidgetsCLIServer:
def __init__(
@@ -57,18 +81,19 @@ class BECWidgetsCLIServer:
def on_rpc_update(self, msg: dict, metadata: dict):
request_id = metadata.get("request_id")
logger.debug(f"Received RPC instruction: {msg}, metadata: {metadata}")
try:
obj = self.get_object_from_config(msg["parameter"])
method = msg["action"]
args = msg["parameter"].get("args", [])
kwargs = msg["parameter"].get("kwargs", {})
res = self.run_rpc(obj, method, args, kwargs)
except Exception as e:
logger.error(f"Error while executing RPC instruction: {e}")
self.send_response(request_id, False, {"error": str(e)})
else:
logger.debug(f"RPC instruction executed successfully: {res}")
self.send_response(request_id, True, {"result": res})
with rpc_exception_hook(functools.partial(self.send_response, request_id, False)):
try:
obj = self.get_object_from_config(msg["parameter"])
method = msg["action"]
args = msg["parameter"].get("args", [])
kwargs = msg["parameter"].get("kwargs", {})
res = self.run_rpc(obj, method, args, kwargs)
except Exception as e:
logger.error(f"Error while executing RPC instruction: {e}")
self.send_response(request_id, False, {"error": str(e)})
else:
logger.debug(f"RPC instruction executed successfully: {res}")
self.send_response(request_id, True, {"result": res})
def send_response(self, request_id: str, accepted: bool, msg: dict):
self.client.connector.set_and_publish(