mirror of
https://github.com/bec-project/bec_widgets.git
synced 2025-07-13 11:11:49 +02:00
fix: mock QTimer, improve timeout message
This commit is contained in:
46
tests/conftest.py
Normal file
46
tests/conftest.py
Normal file
@ -0,0 +1,46 @@
|
|||||||
|
import pytest
|
||||||
|
import qtpy.QtCore
|
||||||
|
from pytestqt.exceptions import TimeoutError as QtBotTimeoutError
|
||||||
|
from qtpy.QtCore import QTimer
|
||||||
|
|
||||||
|
|
||||||
|
class TestableQTimer(QTimer):
|
||||||
|
_instances: list[tuple[QTimer, str]] = []
|
||||||
|
_current_test_name: str = ""
|
||||||
|
|
||||||
|
def __init__(self, *args, **kwargs):
|
||||||
|
super().__init__(*args, **kwargs)
|
||||||
|
TestableQTimer._instances.append((self, TestableQTimer._current_test_name))
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def check_all_stopped(cls, qtbot):
|
||||||
|
def _is_done_or_deleted(t: QTimer):
|
||||||
|
try:
|
||||||
|
return not t.isActive()
|
||||||
|
except RuntimeError as e:
|
||||||
|
return "already deleted" in e.args[0]
|
||||||
|
|
||||||
|
try:
|
||||||
|
qtbot.waitUntil(lambda: all(_is_done_or_deleted(timer) for timer, _ in cls._instances))
|
||||||
|
except QtBotTimeoutError as exc:
|
||||||
|
active_timers = list(filter(lambda t: t[0].isActive(), cls._instances))
|
||||||
|
(t.stop() for t, _ in cls._instances)
|
||||||
|
raise TimeoutError(f"Failed to stop all timers: {active_timers}") from exc
|
||||||
|
cls._instances = []
|
||||||
|
|
||||||
|
|
||||||
|
# To support 'from qtpy.QtCore import QTimer' syntax we just replace this completely for the test session
|
||||||
|
# see: https://docs.python.org/3/library/unittest.mock.html#where-to-patch
|
||||||
|
qtpy.QtCore.QTimer = TestableQTimer
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(autouse=True)
|
||||||
|
def _capture_test_name_in_qtimer(request):
|
||||||
|
TestableQTimer._current_test_name = request.node.name
|
||||||
|
yield
|
||||||
|
TestableQTimer._current_test_name = ""
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def testable_qtimer_class():
|
||||||
|
return TestableQTimer
|
@ -2,7 +2,6 @@ from unittest import mock
|
|||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
from pytestqt.exceptions import TimeoutError as QtBotTimeoutError
|
from pytestqt.exceptions import TimeoutError as QtBotTimeoutError
|
||||||
from qtpy.QtCore import QTimer
|
|
||||||
from qtpy.QtWidgets import QApplication
|
from qtpy.QtWidgets import QApplication
|
||||||
|
|
||||||
from bec_widgets.cli.rpc.rpc_register import RPCRegister
|
from bec_widgets.cli.rpc.rpc_register import RPCRegister
|
||||||
@ -10,22 +9,6 @@ from bec_widgets.qt_utils import error_popups
|
|||||||
from bec_widgets.utils import bec_dispatcher as bec_dispatcher_module
|
from bec_widgets.utils import bec_dispatcher as bec_dispatcher_module
|
||||||
|
|
||||||
|
|
||||||
class TestableQTimer(QTimer):
|
|
||||||
_instances = []
|
|
||||||
|
|
||||||
def __init__(self, *args, **kwargs):
|
|
||||||
super().__init__(*args, **kwargs)
|
|
||||||
TestableQTimer._instances.append(self)
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def check_all_stopped(cls, qtbot):
|
|
||||||
try:
|
|
||||||
qtbot.waitUntil(lambda: all(not timer.isActive() for timer in cls._instances))
|
|
||||||
except QtBotTimeoutError as exc:
|
|
||||||
raise TimeoutError("Failed to stop all timers") from exc
|
|
||||||
cls._instances = []
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.hookimpl(tryfirst=True, hookwrapper=True)
|
@pytest.hookimpl(tryfirst=True, hookwrapper=True)
|
||||||
def pytest_runtest_makereport(item, call):
|
def pytest_runtest_makereport(item, call):
|
||||||
# execute all other hooks to obtain the report object
|
# execute all other hooks to obtain the report object
|
||||||
@ -36,17 +19,16 @@ def pytest_runtest_makereport(item, call):
|
|||||||
|
|
||||||
|
|
||||||
@pytest.fixture(autouse=True)
|
@pytest.fixture(autouse=True)
|
||||||
def qapplication(qtbot, request): # pylint: disable=unused-argument
|
def qapplication(qtbot, request, testable_qtimer_class): # pylint: disable=unused-argument
|
||||||
with mock.patch("qtpy.QtCore.QTimer", new=TestableQTimer):
|
yield
|
||||||
yield
|
|
||||||
|
|
||||||
# if the test failed, we don't want to check for open widgets as
|
# if the test failed, we don't want to check for open widgets as
|
||||||
# it simply pollutes the output
|
# it simply pollutes the output
|
||||||
if request.node.stash._storage.get("failed"):
|
if request.node.stash._storage.get("failed"):
|
||||||
print("Test failed, skipping cleanup checks")
|
print("Test failed, skipping cleanup checks")
|
||||||
return
|
return
|
||||||
|
|
||||||
TestableQTimer.check_all_stopped(qtbot)
|
testable_qtimer_class.check_all_stopped(qtbot)
|
||||||
|
|
||||||
qapp = QApplication.instance()
|
qapp = QApplication.instance()
|
||||||
qapp.processEvents()
|
qapp.processEvents()
|
||||||
|
10
tests/unit_tests/test_testing_utils.py
Normal file
10
tests/unit_tests/test_testing_utils.py
Normal file
@ -0,0 +1,10 @@
|
|||||||
|
from unittest.mock import MagicMock
|
||||||
|
|
||||||
|
from bec_widgets.widgets.services.bec_status_box.bec_status_box import BECServiceStatusMixin
|
||||||
|
|
||||||
|
|
||||||
|
def test_qtimer_uses_testable_qtimer():
|
||||||
|
service_status = BECServiceStatusMixin(None, MagicMock())
|
||||||
|
assert service_status._service_update_timer.__class__.__name__ != "QTimer"
|
||||||
|
assert service_status._service_update_timer.__class__.__name__ == "TestableQTimer"
|
||||||
|
service_status.cleanup()
|
Reference in New Issue
Block a user