fix(rpc): client/server rpc handshake for shutdown

This commit is contained in:
2026-05-29 12:09:22 +02:00
committed by Jan Wyzula
parent 4572760b56
commit 8a180eaa7b
4 changed files with 67 additions and 15 deletions
+19 -14
View File
@@ -37,6 +37,7 @@ IGNORE_WIDGETS = ["LaunchWindow"]
PROCESS_TERMINATION_TIMEOUT = 10
PROCESS_OUTPUT_THREAD_JOIN_TIMEOUT = 2
PROCESS_OUTPUT_SELECT_TIMEOUT = 0.2
GRACEFUL_SERVER_SHUTDOWN_RPC_TIMEOUT = 3
GRACEFUL_SERVER_SHUTDOWN_TIMEOUT = 5
OUTPUT_READER_STOP_EVENT_ATTR = "_bec_output_reader_stop_event"
@@ -141,19 +142,18 @@ def _terminate_plot_process(process, logger, timeout: float = PROCESS_TERMINATIO
process.wait(timeout=timeout)
return
except Exception as exc:
logger.warning(
f"Failed to terminate GUI process group: {exc}; terminating process only. "
f"{process_details}"
)
logger.warning("Failed to terminate GUI process group; terminating process only.")
logger.info(f"GUI process termination failure details: {exc}. {process_details}")
process.terminate()
try:
process.wait(timeout=timeout)
return
except subprocess.TimeoutExpired:
logger.warning(
f"GUI process did not stop within {timeout}s; killing it. "
f"{process_details}\n{_process_group_snapshot(process)}"
logger.warning(f"GUI process did not stop within {timeout}s; killing it.")
logger.info(
f"GUI process force-kill details: {process_details}\n"
f"{_process_group_snapshot(process)}"
)
try:
@@ -196,10 +196,8 @@ def _join_process_output_thread(process, thread: threading.Thread | None, logger
logger.error(f"Failed to close stream {str(e)}")
thread.join(timeout=PROCESS_OUTPUT_THREAD_JOIN_TIMEOUT)
if thread.is_alive():
logger.warning(
"GUI process output reader thread did not stop after process shutdown. "
f"{_process_details(process)}"
)
logger.warning("GUI process output reader thread did not stop after process shutdown.")
logger.info(f"GUI process output reader thread details: {_process_details(process)}")
def _start_plot_process(
@@ -630,19 +628,26 @@ class BECGuiClient(RPCBase):
logger.info(f"Requesting graceful GUI shutdown {process_details}")
try:
self.launcher._run_rpc( # pylint: disable=protected-access
"system.shutdown", wait_for_rpc_response=False
"system.shutdown",
wait_for_rpc_response=True,
timeout=GRACEFUL_SERVER_SHUTDOWN_RPC_TIMEOUT,
)
except Exception as exc:
logger.warning(
f"Could not request graceful GUI shutdown via RPC: {exc}. " f"{process_details}"
"Could not confirm graceful GUI shutdown via RPC; "
"falling back to process termination."
)
logger.info(f"Graceful GUI shutdown RPC failure details: {exc}. {process_details}")
return False
if _wait_for_process_exit(self._process, GRACEFUL_SERVER_SHUTDOWN_TIMEOUT):
logger.info(f"GUI server exited after graceful shutdown {process_details}")
return True
logger.warning(
"GUI server did not exit after graceful shutdown request; "
f"falling back to process termination. {process_details}\n"
"falling back to process termination."
)
logger.info(
f"Graceful GUI shutdown timeout details: {process_details}\n"
f"{_process_group_snapshot(self._process)}"
)
return False
+28
View File
@@ -143,6 +143,34 @@ class RPCServer:
f"receive_latency_s={format_elapsed(receive_latency)}"
)
logger.debug(f"Received RPC instruction: {msg}, metadata: {metadata}")
# Shutdown must acknowledge before teardown starts. The generic RPC path
# below publishes successful responses through QTimer.singleShot(0);
# for system.shutdown that would race with the queued app quit and
# dispatcher shutdown scheduled by _shutdown_gui_server().
if method == "system.shutdown":
execution_start = time.perf_counter()
try:
self.run_system_rpc(method, args, kwargs)
except Exception:
execution_duration = time.perf_counter() - execution_start
content = traceback.format_exc()
logger.error(
"GUI RPC server shutdown request failed "
f"request_id={request_id} method={method} gui_id={self.gui_id} "
f"execution_duration_s={execution_duration:.3f}\n{content}"
)
self.send_response(request_id, False, {"error": content})
else:
execution_duration = time.perf_counter() - execution_start
logger.info(
"GUI RPC server acknowledged shutdown request "
f"request_id={request_id} method={method} gui_id={self.gui_id} "
f"execution_duration_s={execution_duration:.3f}"
)
self.send_response(request_id, True, {"result": None})
return
execution_start = time.perf_counter()
with rpc_exception_hook(functools.partial(self.send_response, request_id, False)):
try:
+4 -1
View File
@@ -7,6 +7,7 @@ import pytest
from bec_widgets.cli.client import BECDockArea
from bec_widgets.cli.client_utils import (
GRACEFUL_SERVER_SHUTDOWN_RPC_TIMEOUT,
OUTPUT_READER_STOP_EVENT_ATTR,
BECGuiClient,
_join_process_output_thread,
@@ -318,7 +319,9 @@ def test_client_utils_kill_server_requests_graceful_shutdown_before_signal():
launcher_prop.return_value = launcher
gui.kill_server()
launcher._run_rpc.assert_called_once_with("system.shutdown", wait_for_rpc_response=False)
launcher._run_rpc.assert_called_once_with(
"system.shutdown", wait_for_rpc_response=True, timeout=GRACEFUL_SERVER_SHUTDOWN_RPC_TIMEOUT
)
process.wait.assert_called_once_with(timeout=5)
killpg.assert_not_called()
assert gui._process is None
+16
View File
@@ -106,6 +106,22 @@ def test_rpc_server_system_shutdown_requests_gui_server_shutdown(rpc_server, qap
del qapp.gui_server
def test_on_rpc_update_system_shutdown_sends_response_before_return(rpc_server):
order = []
rpc_server.run_system_rpc = MagicMock(side_effect=lambda *_args: order.append("shutdown"))
rpc_server.send_response = MagicMock(side_effect=lambda *_args: order.append("response"))
rpc_server.serialize_result_and_send = MagicMock()
rpc_server.on_rpc_update(
{"action": "system.shutdown", "parameter": {"args": [], "kwargs": {}}},
{"request_id": "shutdown-request", "sent_at": 1.0, "deadline": 10.0, "timeout": 2},
)
assert order == ["shutdown", "response"]
rpc_server.send_response.assert_called_once_with("shutdown-request", True, {"result": None})
rpc_server.serialize_result_and_send.assert_not_called()
def test_singleshot_rpc_repeat_raises_on_repeated_singleshot(rpc_server):
"""
Test that a singleshot RPC method raises an error when called multiple times.