mirror of
https://github.com/bec-project/bec_widgets.git
synced 2026-06-05 04:48:40 +02:00
fix(client_utils): stop output reader thread on shutdown
This commit is contained in:
@@ -36,7 +36,9 @@ logger = bec_logger.logger
|
||||
IGNORE_WIDGETS = ["LaunchWindow"]
|
||||
PROCESS_TERMINATION_TIMEOUT = 10
|
||||
PROCESS_OUTPUT_THREAD_JOIN_TIMEOUT = 2
|
||||
PROCESS_OUTPUT_SELECT_TIMEOUT = 0.2
|
||||
GRACEFUL_SERVER_SHUTDOWN_TIMEOUT = 5
|
||||
OUTPUT_READER_STOP_EVENT_ATTR = "_bec_output_reader_stop_event"
|
||||
|
||||
RegistryState: TypeAlias = dict[
|
||||
Literal["gui_id", "name", "widget_class", "config", "__rpc__", "container_proxy"],
|
||||
@@ -57,14 +59,16 @@ def _filter_output(output: str) -> str:
|
||||
return output
|
||||
|
||||
|
||||
def _get_output(process, logger) -> None:
|
||||
def _get_output(process, logger, stop_event: threading.Event | None = None) -> None:
|
||||
log_func = {process.stdout: logger.debug, process.stderr: logger.info}
|
||||
stream_buffer = {process.stdout: [], process.stderr: []}
|
||||
try:
|
||||
os.set_blocking(process.stdout.fileno(), False)
|
||||
os.set_blocking(process.stderr.fileno(), False)
|
||||
while process.poll() is None:
|
||||
readylist, _, _ = select.select([process.stdout, process.stderr], [], [], 1)
|
||||
while process.poll() is None and not (stop_event and stop_event.is_set()):
|
||||
readylist, _, _ = select.select(
|
||||
[process.stdout, process.stderr], [], [], PROCESS_OUTPUT_SELECT_TIMEOUT
|
||||
)
|
||||
for stream in (process.stdout, process.stderr):
|
||||
buf = stream_buffer[stream]
|
||||
if stream in readylist:
|
||||
@@ -180,6 +184,9 @@ def _join_process_output_thread(process, thread: threading.Thread | None, logger
|
||||
if not thread.is_alive():
|
||||
return
|
||||
|
||||
if stop_event := getattr(thread, OUTPUT_READER_STOP_EVENT_ATTR, None):
|
||||
stop_event.set()
|
||||
|
||||
for stream in (process.stdout, process.stderr):
|
||||
if stream is None:
|
||||
continue
|
||||
@@ -187,7 +194,6 @@ def _join_process_output_thread(process, thread: threading.Thread | None, logger
|
||||
stream.close()
|
||||
except OSError as e:
|
||||
logger.error(f"Failed to close stream {str(e)}")
|
||||
pass
|
||||
thread.join(timeout=PROCESS_OUTPUT_THREAD_JOIN_TIMEOUT)
|
||||
if thread.is_alive():
|
||||
logger.warning(
|
||||
@@ -247,8 +253,14 @@ def _start_plot_process(
|
||||
if logger is None:
|
||||
process_output_processing_thread = None
|
||||
else:
|
||||
process_output_stop_event = threading.Event()
|
||||
process_output_processing_thread = threading.Thread(
|
||||
target=_get_output, args=(process, logger)
|
||||
target=_get_output, args=(process, logger, process_output_stop_event)
|
||||
)
|
||||
setattr(
|
||||
process_output_processing_thread,
|
||||
OUTPUT_READER_STOP_EVENT_ATTR,
|
||||
process_output_stop_event,
|
||||
)
|
||||
process_output_processing_thread.start()
|
||||
return process, process_output_processing_thread
|
||||
|
||||
@@ -69,7 +69,7 @@ def create_widget(
|
||||
return widget
|
||||
|
||||
|
||||
@pytest.mark.timeout(100)
|
||||
@pytest.mark.timeout(20)
|
||||
def test_available_widgets(qtbot, connected_client_gui_obj):
|
||||
"""This test checks that all widgets that are available via gui.available_widgets can be created and removed."""
|
||||
gui = connected_client_gui_obj
|
||||
|
||||
@@ -6,7 +6,12 @@ from unittest import mock
|
||||
import pytest
|
||||
|
||||
from bec_widgets.cli.client import BECDockArea
|
||||
from bec_widgets.cli.client_utils import BECGuiClient, _start_plot_process
|
||||
from bec_widgets.cli.client_utils import (
|
||||
OUTPUT_READER_STOP_EVENT_ATTR,
|
||||
BECGuiClient,
|
||||
_join_process_output_thread,
|
||||
_start_plot_process,
|
||||
)
|
||||
from bec_widgets.cli.rpc.rpc_base import RPCBase, RPCResponseTimeoutError, rpc_timeout
|
||||
|
||||
|
||||
@@ -346,3 +351,21 @@ def test_client_utils_kill_server_kills_process_group_after_timeout():
|
||||
text=True,
|
||||
timeout=2,
|
||||
)
|
||||
|
||||
|
||||
def test_join_process_output_thread_signals_reader_before_closing_streams():
|
||||
process = mock.MagicMock(pid=123, args=["bec-gui-server"])
|
||||
process.stdout = mock.MagicMock()
|
||||
process.stderr = mock.MagicMock()
|
||||
thread = mock.MagicMock()
|
||||
stop_event = mock.MagicMock()
|
||||
setattr(thread, OUTPUT_READER_STOP_EVENT_ATTR, stop_event)
|
||||
thread.is_alive.side_effect = [True, False]
|
||||
logger = mock.MagicMock()
|
||||
|
||||
_join_process_output_thread(process, thread, logger)
|
||||
|
||||
assert thread.join.call_args_list == [mock.call(timeout=2), mock.call(timeout=2)]
|
||||
stop_event.set.assert_called_once_with()
|
||||
process.stdout.close.assert_called_once_with()
|
||||
process.stderr.close.assert_called_once_with()
|
||||
|
||||
Reference in New Issue
Block a user