introduces check if current event loop is closed

This introduces a check for if the event loop is actually closed
already.
This commit is contained in:
Mose Müller 2025-05-20 14:01:59 +02:00
parent 7c18d86e9c
commit 57cfe45c76

View File

@ -219,7 +219,18 @@ def is_descriptor(obj: object) -> bool:
def current_event_loop_exists() -> bool:
"""Check if an event loop has been set."""
"""Check if a running and open asyncio event loop exists in the current thread.
This checks if an event loop is set via the current event loop policy and verifies
that the loop has not been closed.
Returns:
True if an event loop exists and is not closed, False otherwise.
"""
import asyncio
return asyncio.get_event_loop_policy()._local._loop is not None # type: ignore
try:
return not asyncio.get_running_loop().is_closed()
except RuntimeError:
return False