diff --git a/bec_widgets/cli/client_utils.py b/bec_widgets/cli/client_utils.py index ec05ba9f..f32f73a6 100644 --- a/bec_widgets/cli/client_utils.py +++ b/bec_widgets/cli/client_utils.py @@ -37,6 +37,7 @@ IGNORE_WIDGETS = ["LaunchWindow"] PROCESS_TERMINATION_TIMEOUT = 10 PROCESS_OUTPUT_THREAD_JOIN_TIMEOUT = 2 PROCESS_OUTPUT_SELECT_TIMEOUT = 0.2 +GRACEFUL_SERVER_SHUTDOWN_RPC_TIMEOUT = 3 GRACEFUL_SERVER_SHUTDOWN_TIMEOUT = 5 OUTPUT_READER_STOP_EVENT_ATTR = "_bec_output_reader_stop_event" @@ -141,19 +142,18 @@ def _terminate_plot_process(process, logger, timeout: float = PROCESS_TERMINATIO process.wait(timeout=timeout) return except Exception as exc: - logger.warning( - f"Failed to terminate GUI process group: {exc}; terminating process only. " - f"{process_details}" - ) + logger.warning("Failed to terminate GUI process group; terminating process only.") + logger.info(f"GUI process termination failure details: {exc}. {process_details}") process.terminate() try: process.wait(timeout=timeout) return except subprocess.TimeoutExpired: - logger.warning( - f"GUI process did not stop within {timeout}s; killing it. " - f"{process_details}\n{_process_group_snapshot(process)}" + logger.warning(f"GUI process did not stop within {timeout}s; killing it.") + logger.info( + f"GUI process force-kill details: {process_details}\n" + f"{_process_group_snapshot(process)}" ) try: @@ -196,10 +196,8 @@ def _join_process_output_thread(process, thread: threading.Thread | None, logger logger.error(f"Failed to close stream {str(e)}") thread.join(timeout=PROCESS_OUTPUT_THREAD_JOIN_TIMEOUT) if thread.is_alive(): - logger.warning( - "GUI process output reader thread did not stop after process shutdown. " - f"{_process_details(process)}" - ) + logger.warning("GUI process output reader thread did not stop after process shutdown.") + logger.info(f"GUI process output reader thread details: {_process_details(process)}") def _start_plot_process( @@ -630,19 +628,26 @@ class BECGuiClient(RPCBase): logger.info(f"Requesting graceful GUI shutdown {process_details}") try: self.launcher._run_rpc( # pylint: disable=protected-access - "system.shutdown", wait_for_rpc_response=False + "system.shutdown", + wait_for_rpc_response=True, + timeout=GRACEFUL_SERVER_SHUTDOWN_RPC_TIMEOUT, ) except Exception as exc: logger.warning( - f"Could not request graceful GUI shutdown via RPC: {exc}. " f"{process_details}" + "Could not confirm graceful GUI shutdown via RPC; " + "falling back to process termination." ) + logger.info(f"Graceful GUI shutdown RPC failure details: {exc}. {process_details}") return False if _wait_for_process_exit(self._process, GRACEFUL_SERVER_SHUTDOWN_TIMEOUT): logger.info(f"GUI server exited after graceful shutdown {process_details}") return True logger.warning( "GUI server did not exit after graceful shutdown request; " - f"falling back to process termination. {process_details}\n" + "falling back to process termination." + ) + logger.info( + f"Graceful GUI shutdown timeout details: {process_details}\n" f"{_process_group_snapshot(self._process)}" ) return False diff --git a/bec_widgets/utils/rpc_server.py b/bec_widgets/utils/rpc_server.py index d3b9cf4b..a9446c68 100644 --- a/bec_widgets/utils/rpc_server.py +++ b/bec_widgets/utils/rpc_server.py @@ -143,6 +143,34 @@ class RPCServer: f"receive_latency_s={format_elapsed(receive_latency)}" ) logger.debug(f"Received RPC instruction: {msg}, metadata: {metadata}") + + # Shutdown must acknowledge before teardown starts. The generic RPC path + # below publishes successful responses through QTimer.singleShot(0); + # for system.shutdown that would race with the queued app quit and + # dispatcher shutdown scheduled by _shutdown_gui_server(). + if method == "system.shutdown": + execution_start = time.perf_counter() + try: + self.run_system_rpc(method, args, kwargs) + except Exception: + execution_duration = time.perf_counter() - execution_start + content = traceback.format_exc() + logger.error( + "GUI RPC server shutdown request failed " + f"request_id={request_id} method={method} gui_id={self.gui_id} " + f"execution_duration_s={execution_duration:.3f}\n{content}" + ) + self.send_response(request_id, False, {"error": content}) + else: + execution_duration = time.perf_counter() - execution_start + logger.info( + "GUI RPC server acknowledged shutdown request " + f"request_id={request_id} method={method} gui_id={self.gui_id} " + f"execution_duration_s={execution_duration:.3f}" + ) + self.send_response(request_id, True, {"result": None}) + return + execution_start = time.perf_counter() with rpc_exception_hook(functools.partial(self.send_response, request_id, False)): try: diff --git a/tests/unit_tests/test_client_utils.py b/tests/unit_tests/test_client_utils.py index 2afdcf5b..5cf5750f 100644 --- a/tests/unit_tests/test_client_utils.py +++ b/tests/unit_tests/test_client_utils.py @@ -7,6 +7,7 @@ import pytest from bec_widgets.cli.client import BECDockArea from bec_widgets.cli.client_utils import ( + GRACEFUL_SERVER_SHUTDOWN_RPC_TIMEOUT, OUTPUT_READER_STOP_EVENT_ATTR, BECGuiClient, _join_process_output_thread, @@ -318,7 +319,9 @@ def test_client_utils_kill_server_requests_graceful_shutdown_before_signal(): launcher_prop.return_value = launcher gui.kill_server() - launcher._run_rpc.assert_called_once_with("system.shutdown", wait_for_rpc_response=False) + launcher._run_rpc.assert_called_once_with( + "system.shutdown", wait_for_rpc_response=True, timeout=GRACEFUL_SERVER_SHUTDOWN_RPC_TIMEOUT + ) process.wait.assert_called_once_with(timeout=5) killpg.assert_not_called() assert gui._process is None diff --git a/tests/unit_tests/test_rpc_server.py b/tests/unit_tests/test_rpc_server.py index 8b71d8f7..fa8ebf14 100644 --- a/tests/unit_tests/test_rpc_server.py +++ b/tests/unit_tests/test_rpc_server.py @@ -106,6 +106,22 @@ def test_rpc_server_system_shutdown_requests_gui_server_shutdown(rpc_server, qap del qapp.gui_server +def test_on_rpc_update_system_shutdown_sends_response_before_return(rpc_server): + order = [] + rpc_server.run_system_rpc = MagicMock(side_effect=lambda *_args: order.append("shutdown")) + rpc_server.send_response = MagicMock(side_effect=lambda *_args: order.append("response")) + rpc_server.serialize_result_and_send = MagicMock() + + rpc_server.on_rpc_update( + {"action": "system.shutdown", "parameter": {"args": [], "kwargs": {}}}, + {"request_id": "shutdown-request", "sent_at": 1.0, "deadline": 10.0, "timeout": 2}, + ) + + assert order == ["shutdown", "response"] + rpc_server.send_response.assert_called_once_with("shutdown-request", True, {"result": None}) + rpc_server.serialize_result_and_send.assert_not_called() + + def test_singleshot_rpc_repeat_raises_on_repeated_singleshot(rpc_server): """ Test that a singleshot RPC method raises an error when called multiple times.