From 18c64db826dcda1adc75122451e9e0566fc5f793 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mose=20M=C3=BCller?= Date: Tue, 20 May 2025 15:06:42 +0200 Subject: [PATCH] client: allow reconnecting by moving loop and thread initialization out of constructor - Refactored Client to delay event loop and thread creation until connect() is called. - Introduced _initialize_loop_and_thread() helper for consistent loop/thread/proxy setup. - Updated disconnect() to stop the loop and join the thread without closing the loop immediately. This allows proper cleanup and supports reconnecting a client instance after disconnection. - Fixes issues with restarting closed event loops and improves lifecycle control in testing and production. --- src/pydase/client/client.py | 48 +++++++++++++++++++++++++++---------- 1 file changed, 36 insertions(+), 12 deletions(-) diff --git a/src/pydase/client/client.py b/src/pydase/client/client.py index c429331..620d2df 100644 --- a/src/pydase/client/client.py +++ b/src/pydase/client/client.py @@ -101,17 +101,12 @@ class Client: self._path_prefix = parsed_url.path.rstrip("/") # Remove trailing slash if any self._url = url self._sio = socketio.AsyncClient(**sio_client_kwargs) - self._loop = asyncio.new_event_loop() self._client_id = client_id - self.proxy = ProxyClass( - sio_client=self._sio, loop=self._loop, reconnect=self.connect - ) + self._loop: asyncio.AbstractEventLoop | None = None + self._thread: threading.Thread | None = None + self.proxy: ProxyClass """A proxy object representing the remote service, facilitating interaction as if it were local.""" - self._thread = threading.Thread( - target=asyncio_loop_thread, args=(self._loop,), daemon=True - ) - self._thread.start() self.connect(block_until_connected=block_until_connected) def __enter__(self) -> Self: @@ -126,17 +121,46 @@ class Client: self.disconnect() def connect(self, block_until_connected: bool = True) -> None: + if self._thread is None or self._loop is None: + self._loop = self._initialize_loop_and_thread() + connection_future = asyncio.run_coroutine_threadsafe( self._connect(), self._loop ) if block_until_connected: connection_future.result() - def disconnect(self) -> None: - connection_future = asyncio.run_coroutine_threadsafe( - self._disconnect(), self._loop + def _initialize_loop_and_thread(self) -> asyncio.AbstractEventLoop: + """Initialize a new asyncio event loop, start it in a background thread, + and create the ProxyClass instance bound to that loop. + """ + + loop = asyncio.new_event_loop() + self.proxy = ProxyClass( + sio_client=self._sio, + loop=loop, + reconnect=self.connect, ) - connection_future.result() + self._thread = threading.Thread( + target=asyncio_loop_thread, + args=(loop,), + daemon=True, + ) + self._thread.start() + + return loop + + def disconnect(self) -> None: + if self._loop is not None and self._thread is not None: + connection_future = asyncio.run_coroutine_threadsafe( + self._disconnect(), self._loop + ) + connection_future.result() + + # Stop the event loop and thread + self._loop.call_soon_threadsafe(self._loop.stop) + self._thread.join() + self._thread = None async def _connect(self) -> None: logger.debug("Connecting to server '%s' ...", self._url)