diff --git a/bec_widgets/cli/client_utils.py b/bec_widgets/cli/client_utils.py index 57443805..ec05ba9f 100644 --- a/bec_widgets/cli/client_utils.py +++ b/bec_widgets/cli/client_utils.py @@ -36,7 +36,9 @@ logger = bec_logger.logger IGNORE_WIDGETS = ["LaunchWindow"] PROCESS_TERMINATION_TIMEOUT = 10 PROCESS_OUTPUT_THREAD_JOIN_TIMEOUT = 2 +PROCESS_OUTPUT_SELECT_TIMEOUT = 0.2 GRACEFUL_SERVER_SHUTDOWN_TIMEOUT = 5 +OUTPUT_READER_STOP_EVENT_ATTR = "_bec_output_reader_stop_event" RegistryState: TypeAlias = dict[ Literal["gui_id", "name", "widget_class", "config", "__rpc__", "container_proxy"], @@ -57,14 +59,16 @@ def _filter_output(output: str) -> str: return output -def _get_output(process, logger) -> None: +def _get_output(process, logger, stop_event: threading.Event | None = None) -> None: log_func = {process.stdout: logger.debug, process.stderr: logger.info} stream_buffer = {process.stdout: [], process.stderr: []} try: os.set_blocking(process.stdout.fileno(), False) os.set_blocking(process.stderr.fileno(), False) - while process.poll() is None: - readylist, _, _ = select.select([process.stdout, process.stderr], [], [], 1) + while process.poll() is None and not (stop_event and stop_event.is_set()): + readylist, _, _ = select.select( + [process.stdout, process.stderr], [], [], PROCESS_OUTPUT_SELECT_TIMEOUT + ) for stream in (process.stdout, process.stderr): buf = stream_buffer[stream] if stream in readylist: @@ -180,6 +184,9 @@ def _join_process_output_thread(process, thread: threading.Thread | None, logger if not thread.is_alive(): return + if stop_event := getattr(thread, OUTPUT_READER_STOP_EVENT_ATTR, None): + stop_event.set() + for stream in (process.stdout, process.stderr): if stream is None: continue @@ -187,7 +194,6 @@ def _join_process_output_thread(process, thread: threading.Thread | None, logger stream.close() except OSError as e: logger.error(f"Failed to close stream {str(e)}") - pass thread.join(timeout=PROCESS_OUTPUT_THREAD_JOIN_TIMEOUT) if thread.is_alive(): logger.warning( @@ -247,8 +253,14 @@ def _start_plot_process( if logger is None: process_output_processing_thread = None else: + process_output_stop_event = threading.Event() process_output_processing_thread = threading.Thread( - target=_get_output, args=(process, logger) + target=_get_output, args=(process, logger, process_output_stop_event) + ) + setattr( + process_output_processing_thread, + OUTPUT_READER_STOP_EVENT_ATTR, + process_output_stop_event, ) process_output_processing_thread.start() return process, process_output_processing_thread diff --git a/tests/end-2-end/test_rpc_widgets_e2e.py b/tests/end-2-end/test_rpc_widgets_e2e.py index 19e8109f..5652701b 100644 --- a/tests/end-2-end/test_rpc_widgets_e2e.py +++ b/tests/end-2-end/test_rpc_widgets_e2e.py @@ -69,7 +69,7 @@ def create_widget( return widget -@pytest.mark.timeout(100) +@pytest.mark.timeout(20) def test_available_widgets(qtbot, connected_client_gui_obj): """This test checks that all widgets that are available via gui.available_widgets can be created and removed.""" gui = connected_client_gui_obj diff --git a/tests/unit_tests/test_client_utils.py b/tests/unit_tests/test_client_utils.py index efce65bd..2afdcf5b 100644 --- a/tests/unit_tests/test_client_utils.py +++ b/tests/unit_tests/test_client_utils.py @@ -6,7 +6,12 @@ from unittest import mock import pytest from bec_widgets.cli.client import BECDockArea -from bec_widgets.cli.client_utils import BECGuiClient, _start_plot_process +from bec_widgets.cli.client_utils import ( + OUTPUT_READER_STOP_EVENT_ATTR, + BECGuiClient, + _join_process_output_thread, + _start_plot_process, +) from bec_widgets.cli.rpc.rpc_base import RPCBase, RPCResponseTimeoutError, rpc_timeout @@ -346,3 +351,21 @@ def test_client_utils_kill_server_kills_process_group_after_timeout(): text=True, timeout=2, ) + + +def test_join_process_output_thread_signals_reader_before_closing_streams(): + process = mock.MagicMock(pid=123, args=["bec-gui-server"]) + process.stdout = mock.MagicMock() + process.stderr = mock.MagicMock() + thread = mock.MagicMock() + stop_event = mock.MagicMock() + setattr(thread, OUTPUT_READER_STOP_EVENT_ATTR, stop_event) + thread.is_alive.side_effect = [True, False] + logger = mock.MagicMock() + + _join_process_output_thread(process, thread, logger) + + assert thread.join.call_args_list == [mock.call(timeout=2), mock.call(timeout=2)] + stop_event.set.assert_called_once_with() + process.stdout.close.assert_called_once_with() + process.stderr.close.assert_called_once_with()