Merge pull request #223 from tiqi-group/refactor/client-lifecycle

client: allow reconnecting by moving loop and thread initialization out of constructor
This commit is contained in:
Mose Müller 2025-05-20 15:11:50 +02:00 committed by GitHub
commit c5eb5f80b4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -101,17 +101,12 @@ class Client:
self._path_prefix = parsed_url.path.rstrip("/") # Remove trailing slash if any self._path_prefix = parsed_url.path.rstrip("/") # Remove trailing slash if any
self._url = url self._url = url
self._sio = socketio.AsyncClient(**sio_client_kwargs) self._sio = socketio.AsyncClient(**sio_client_kwargs)
self._loop = asyncio.new_event_loop()
self._client_id = client_id self._client_id = client_id
self.proxy = ProxyClass( self._loop: asyncio.AbstractEventLoop | None = None
sio_client=self._sio, loop=self._loop, reconnect=self.connect self._thread: threading.Thread | None = None
) self.proxy: ProxyClass
"""A proxy object representing the remote service, facilitating interaction as """A proxy object representing the remote service, facilitating interaction as
if it were local.""" if it were local."""
self._thread = threading.Thread(
target=asyncio_loop_thread, args=(self._loop,), daemon=True
)
self._thread.start()
self.connect(block_until_connected=block_until_connected) self.connect(block_until_connected=block_until_connected)
def __enter__(self) -> Self: def __enter__(self) -> Self:
@ -126,17 +121,46 @@ class Client:
self.disconnect() self.disconnect()
def connect(self, block_until_connected: bool = True) -> None: def connect(self, block_until_connected: bool = True) -> None:
if self._thread is None or self._loop is None:
self._loop = self._initialize_loop_and_thread()
connection_future = asyncio.run_coroutine_threadsafe( connection_future = asyncio.run_coroutine_threadsafe(
self._connect(), self._loop self._connect(), self._loop
) )
if block_until_connected: if block_until_connected:
connection_future.result() connection_future.result()
def disconnect(self) -> None: def _initialize_loop_and_thread(self) -> asyncio.AbstractEventLoop:
connection_future = asyncio.run_coroutine_threadsafe( """Initialize a new asyncio event loop, start it in a background thread,
self._disconnect(), self._loop and create the ProxyClass instance bound to that loop.
"""
loop = asyncio.new_event_loop()
self.proxy = ProxyClass(
sio_client=self._sio,
loop=loop,
reconnect=self.connect,
) )
connection_future.result() self._thread = threading.Thread(
target=asyncio_loop_thread,
args=(loop,),
daemon=True,
)
self._thread.start()
return loop
def disconnect(self) -> None:
if self._loop is not None and self._thread is not None:
connection_future = asyncio.run_coroutine_threadsafe(
self._disconnect(), self._loop
)
connection_future.result()
# Stop the event loop and thread
self._loop.call_soon_threadsafe(self._loop.stop)
self._thread.join()
self._thread = None
async def _connect(self) -> None: async def _connect(self) -> None:
logger.debug("Connecting to server '%s' ...", self._url) logger.debug("Connecting to server '%s' ...", self._url)