Merge pull request #224 from tiqi-group/fix/loop-cleanup

chore: properly closing event loops in client and server
This commit is contained in:
Mose Müller 2025-05-20 15:37:57 +02:00 committed by GitHub
commit 6be27217cf
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 112 additions and 58 deletions

View File

@ -104,3 +104,10 @@ disallow_incomplete_defs = true
disallow_any_generics = true
check_untyped_defs = true
ignore_missing_imports = false
[tool.pytest.ini_options]
asyncio_default_fixture_loop_scope = "function"
filterwarnings = [
# I don't controll the usage of the timeout
"ignore:parameter 'timeout' of type 'float' is deprecated, please use 'timeout=ClientWSTimeout"
]

View File

@ -33,7 +33,10 @@ class NotifyDict(TypedDict):
def asyncio_loop_thread(loop: asyncio.AbstractEventLoop) -> None:
asyncio.set_event_loop(loop)
loop.run_forever()
try:
loop.run_forever()
finally:
loop.close()
class Client:

View File

@ -182,7 +182,10 @@ class Server:
This method should be called to start the server after it's been instantiated.
"""
self._loop.run_until_complete(self.serve())
try:
self._loop.run_until_complete(self.serve())
finally:
self._loop.close()
async def serve(self) -> None:
process_id = os.getpid()

View File

@ -219,7 +219,18 @@ def is_descriptor(obj: object) -> bool:
def current_event_loop_exists() -> bool:
"""Check if an event loop has been set."""
"""Check if a running and open asyncio event loop exists in the current thread.
This checks if an event loop is set via the current event loop policy and verifies
that the loop has not been closed.
Returns:
True if an event loop exists and is not closed, False otherwise.
"""
import asyncio
return asyncio.get_event_loop_policy()._local._loop is not None # type: ignore
try:
return not asyncio.get_running_loop().is_closed()
except RuntimeError:
return False

View File

@ -2,8 +2,9 @@ import threading
from collections.abc import Generator
from typing import Any
import pydase
import pytest
import pydase
from pydase.client.proxy_loader import ProxyAttributeError
@ -52,6 +53,7 @@ def pydase_client() -> Generator[pydase.Client, None, Any]:
yield client
client.disconnect()
server.handle_exit()
thread.join()

View File

@ -2,27 +2,26 @@ import threading
from collections.abc import Callable, Generator
from typing import Any
import pydase
import pytest
import socketio.exceptions
import pydase
@pytest.fixture(scope="function")
def pydase_restartable_server() -> (
Generator[
tuple[
pydase.Server,
threading.Thread,
pydase.DataService,
Callable[
[pydase.Server, threading.Thread, pydase.DataService],
tuple[pydase.Server, threading.Thread],
],
def pydase_restartable_server() -> Generator[
tuple[
pydase.Server,
threading.Thread,
pydase.DataService,
Callable[
[pydase.Server, threading.Thread, pydase.DataService],
tuple[pydase.Server, threading.Thread],
],
None,
Any,
]
):
],
None,
Any,
]:
class MyService(pydase.DataService):
def __init__(self) -> None:
super().__init__()
@ -62,9 +61,6 @@ def pydase_restartable_server() -> (
yield server, thread, service_instance, restart
server.handle_exit()
thread.join()
def test_reconnection(
pydase_restartable_server: tuple[
@ -105,3 +101,6 @@ def test_reconnection(
# the service proxies successfully reconnect and get the new service name
assert client.proxy.name == "New service name"
assert client_2.proxy.name == "New service name"
server.handle_exit()
thread.join()

View File

@ -7,7 +7,7 @@ from pydase.task.autostart import autostart_service_tasks
from pytest import LogCaptureFixture
@pytest.mark.asyncio(scope="function")
@pytest.mark.asyncio(loop_scope="function")
async def test_reconnection(caplog: LogCaptureFixture) -> None:
class MyService(pydase.components.device_connection.DeviceConnection):
def __init__(

View File

@ -4,12 +4,13 @@ from collections.abc import Generator
from typing import Any
import aiohttp
import pydase
import pytest
import pydase
from pydase.utils.serialization.deserializer import Deserializer
@pytest.fixture()
@pytest.fixture(scope="module")
def pydase_server() -> Generator[None, None, None]:
class SubService(pydase.DataService):
name = "SubService"
@ -52,6 +53,9 @@ def pydase_server() -> Generator[None, None, None]:
yield
server.handle_exit()
thread.join()
@pytest.mark.parametrize(
"access_path, expected",
@ -107,7 +111,7 @@ def pydase_server() -> Generator[None, None, None]:
),
],
)
@pytest.mark.asyncio()
@pytest.mark.asyncio(loop_scope="module")
async def test_get_value(
access_path: str,
expected: dict[str, Any],
@ -179,7 +183,7 @@ async def test_get_value(
),
],
)
@pytest.mark.asyncio()
@pytest.mark.asyncio(loop_scope="module")
async def test_update_value(
access_path: str,
new_value: dict[str, Any],
@ -219,7 +223,7 @@ async def test_update_value(
),
],
)
@pytest.mark.asyncio()
@pytest.mark.asyncio(loop_scope="module")
async def test_trigger_method(
access_path: str,
expected: Any,
@ -278,7 +282,7 @@ async def test_trigger_method(
),
],
)
@pytest.mark.asyncio()
@pytest.mark.asyncio(loop_scope="module")
async def test_client_information_logging(
headers: dict[str, str],
log_id: str,

View File

@ -2,13 +2,14 @@ import threading
from collections.abc import Generator
from typing import Any
import pydase
import pytest
import socketio
import pydase
from pydase.utils.serialization.deserializer import Deserializer
@pytest.fixture()
@pytest.fixture(scope="module")
def pydase_server() -> Generator[None, None, None]:
class SubService(pydase.DataService):
name = "SubService"
@ -51,6 +52,9 @@ def pydase_server() -> Generator[None, None, None]:
yield
server.handle_exit()
thread.join()
@pytest.mark.parametrize(
"access_path, expected",
@ -106,7 +110,7 @@ def pydase_server() -> Generator[None, None, None]:
),
],
)
@pytest.mark.asyncio()
@pytest.mark.asyncio(loop_scope="module")
async def test_get_value(
access_path: str,
expected: dict[str, Any],
@ -181,7 +185,7 @@ async def test_get_value(
),
],
)
@pytest.mark.asyncio()
@pytest.mark.asyncio(loop_scope="module")
async def test_update_value(
access_path: str,
new_value: dict[str, Any],
@ -226,7 +230,7 @@ async def test_update_value(
),
],
)
@pytest.mark.asyncio()
@pytest.mark.asyncio(loop_scope="module")
async def test_trigger_method(
access_path: str,
expected: Any,
@ -291,7 +295,7 @@ async def test_trigger_method(
),
],
)
@pytest.mark.asyncio()
@pytest.mark.asyncio(loop_scope="module")
async def test_client_information_logging(
headers: dict[str, str],
log_id: str,

View File

@ -1,19 +1,20 @@
import asyncio
import logging
import pydase
import pytest
from pytest import LogCaptureFixture
import pydase
from pydase.data_service.data_service_observer import DataServiceObserver
from pydase.data_service.state_manager import StateManager
from pydase.task.autostart import autostart_service_tasks
from pydase.task.decorator import task
from pydase.task.task_status import TaskStatus
from pytest import LogCaptureFixture
logger = logging.getLogger("pydase")
@pytest.mark.asyncio(scope="function")
@pytest.mark.asyncio()
async def test_start_and_stop_task(caplog: LogCaptureFixture) -> None:
class MyService(pydase.DataService):
@task()
@ -28,11 +29,11 @@ async def test_start_and_stop_task(caplog: LogCaptureFixture) -> None:
DataServiceObserver(state_manager)
autostart_service_tasks(service_instance)
await asyncio.sleep(0.1)
await asyncio.sleep(0.01)
assert service_instance.my_task.status == TaskStatus.NOT_RUNNING
service_instance.my_task.start()
await asyncio.sleep(0.1)
await asyncio.sleep(0.01)
assert service_instance.my_task.status == TaskStatus.RUNNING
assert "'my_task.status' changed to 'TaskStatus.RUNNING'" in caplog.text
@ -40,12 +41,12 @@ async def test_start_and_stop_task(caplog: LogCaptureFixture) -> None:
caplog.clear()
service_instance.my_task.stop()
await asyncio.sleep(0.1)
await asyncio.sleep(0.01)
assert service_instance.my_task.status == TaskStatus.NOT_RUNNING
assert "Task 'my_task' was cancelled" in caplog.text
@pytest.mark.asyncio(scope="function")
@pytest.mark.asyncio()
async def test_autostart_task(caplog: LogCaptureFixture) -> None:
class MyService(pydase.DataService):
@task(autostart=True)
@ -61,13 +62,16 @@ async def test_autostart_task(caplog: LogCaptureFixture) -> None:
autostart_service_tasks(service_instance)
await asyncio.sleep(0.1)
await asyncio.sleep(0.01)
assert service_instance.my_task.status == TaskStatus.RUNNING
assert "'my_task.status' changed to 'TaskStatus.RUNNING'" in caplog.text
service_instance.my_task.stop()
await asyncio.sleep(0.01)
@pytest.mark.asyncio(scope="function")
@pytest.mark.asyncio()
async def test_nested_list_autostart_task(
caplog: LogCaptureFixture,
) -> None:
@ -86,7 +90,7 @@ async def test_nested_list_autostart_task(
DataServiceObserver(state_manager)
autostart_service_tasks(service_instance)
await asyncio.sleep(0.1)
await asyncio.sleep(0.01)
assert service_instance.sub_services_list[0].my_task.status == TaskStatus.RUNNING
assert service_instance.sub_services_list[1].my_task.status == TaskStatus.RUNNING
@ -99,8 +103,12 @@ async def test_nested_list_autostart_task(
in caplog.text
)
service_instance.sub_services_list[0].my_task.stop()
service_instance.sub_services_list[1].my_task.stop()
await asyncio.sleep(0.01)
@pytest.mark.asyncio(scope="function")
@pytest.mark.asyncio()
async def test_nested_dict_autostart_task(
caplog: LogCaptureFixture,
) -> None:
@ -120,7 +128,7 @@ async def test_nested_dict_autostart_task(
autostart_service_tasks(service_instance)
await asyncio.sleep(0.1)
await asyncio.sleep(0.01)
assert (
service_instance.sub_services_dict["first"].my_task.status == TaskStatus.RUNNING
@ -139,8 +147,12 @@ async def test_nested_dict_autostart_task(
in caplog.text
)
service_instance.sub_services_dict["first"].my_task.stop()
service_instance.sub_services_dict["second"].my_task.stop()
await asyncio.sleep(0.01)
@pytest.mark.asyncio(scope="function")
@pytest.mark.asyncio()
async def test_manual_start_with_multiple_service_instances(
caplog: LogCaptureFixture,
) -> None:
@ -161,7 +173,7 @@ async def test_manual_start_with_multiple_service_instances(
autostart_service_tasks(service_instance)
await asyncio.sleep(0.1)
await asyncio.sleep(0.01)
assert (
service_instance.sub_services_list[0].my_task.status == TaskStatus.NOT_RUNNING
@ -291,7 +303,7 @@ async def test_manual_start_with_multiple_service_instances(
assert "Task 'my_task' was cancelled" in caplog.text
@pytest.mark.asyncio(scope="function")
@pytest.mark.asyncio()
async def test_restart_on_exception(caplog: LogCaptureFixture) -> None:
class MyService(pydase.DataService):
@task(restart_on_exception=True, restart_sec=0.1)
@ -312,8 +324,11 @@ async def test_restart_on_exception(caplog: LogCaptureFixture) -> None:
assert "Task 'my_task' encountered an exception" in caplog.text
assert "Triggered task." in caplog.text
service_instance.my_task.stop()
await asyncio.sleep(0.01)
@pytest.mark.asyncio(scope="function")
@pytest.mark.asyncio()
async def test_restart_sec(caplog: LogCaptureFixture) -> None:
class MyService(pydase.DataService):
@task(restart_on_exception=True, restart_sec=0.1)
@ -334,8 +349,11 @@ async def test_restart_sec(caplog: LogCaptureFixture) -> None:
await asyncio.sleep(0.05)
assert "Triggered task." in caplog.text # Ensures the task restarted after 0.2s
service_instance.my_task.stop()
await asyncio.sleep(0.01)
@pytest.mark.asyncio(scope="function")
@pytest.mark.asyncio()
async def test_exceeding_start_limit_interval_sec_and_burst(
caplog: LogCaptureFixture,
) -> None:
@ -359,7 +377,7 @@ async def test_exceeding_start_limit_interval_sec_and_burst(
assert service_instance.my_task.status == TaskStatus.NOT_RUNNING
@pytest.mark.asyncio(scope="function")
@pytest.mark.asyncio()
async def test_non_exceeding_start_limit_interval_sec_and_burst(
caplog: LogCaptureFixture,
) -> None:
@ -382,8 +400,11 @@ async def test_non_exceeding_start_limit_interval_sec_and_burst(
assert "Task 'my_task' exceeded restart burst limit" not in caplog.text
assert service_instance.my_task.status == TaskStatus.RUNNING
service_instance.my_task.stop()
await asyncio.sleep(0.01)
@pytest.mark.asyncio(scope="function")
@pytest.mark.asyncio()
async def test_exit_on_failure(
monkeypatch: pytest.MonkeyPatch, caplog: LogCaptureFixture
) -> None:
@ -408,7 +429,7 @@ async def test_exit_on_failure(
assert "Task 'my_task' encountered an exception" in caplog.text
@pytest.mark.asyncio(scope="function")
@pytest.mark.asyncio()
async def test_exit_on_failure_exceeding_rate_limit(
monkeypatch: pytest.MonkeyPatch, caplog: LogCaptureFixture
) -> None:
@ -438,7 +459,7 @@ async def test_exit_on_failure_exceeding_rate_limit(
assert "Task 'my_task' encountered an exception" in caplog.text
@pytest.mark.asyncio(scope="function")
@pytest.mark.asyncio()
async def test_gracefully_finishing_task(
monkeypatch: pytest.MonkeyPatch, caplog: LogCaptureFixture
) -> None:

View File

@ -207,7 +207,7 @@ def test_ColouredEnum_serialize() -> None:
}
@pytest.mark.asyncio(scope="module")
@pytest.mark.asyncio(loop_scope="module")
async def test_method_serialization() -> None:
class ClassWithMethod(pydase.DataService):
def some_method(self) -> str: