From 0bfb3a44ae2d3718fa48cfc78ba0f3de6422e240 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mose=20M=C3=BCller?= Date: Wed, 2 Aug 2023 12:06:19 +0200 Subject: [PATCH] DataService: removing threading, using asyncio loop Start tasks through asyncio loop instead of a separate thread. Autmatic tasks are not started in the init function. They can be started elsewhere. --- .../data_service/data_service.py | 38 +++++-------------- 1 file changed, 9 insertions(+), 29 deletions(-) diff --git a/src/pyDataInterface/data_service/data_service.py b/src/pyDataInterface/data_service/data_service.py index 8de3eba..3b4813c 100644 --- a/src/pyDataInterface/data_service/data_service.py +++ b/src/pyDataInterface/data_service/data_service.py @@ -1,8 +1,6 @@ import asyncio import inspect -import threading from collections.abc import Callable -from concurrent.futures import Future from itertools import chain from typing import Any @@ -31,9 +29,10 @@ class DataService(rpyc.Service): # Keep track of the root object. This helps to filter the emission of # notifications self.__root__: "DataService" = self + self.__loop = asyncio.get_event_loop() # dictionary to keep track of running tasks - self.__tasks: dict[str, Future[None]] = {} + self.__tasks: dict[str, asyncio.Task[None]] = {} self._autostart_tasks: dict[str, tuple[Any]] if "_autostart_tasks" not in self.__dict__: self._autostart_tasks = {} @@ -41,9 +40,6 @@ class DataService(rpyc.Service): self._callbacks: set[Callable[[str, Any], None]] = set() self._set_start_and_stop_for_async_methods() - self._start_async_loop_in_thread() - self._start_autostart_tasks() - self._register_list_change_callbacks(self, f"{self.__class__.__name__}") self._register_DataService_instance_callbacks( self, f"{self.__class__.__name__}" @@ -95,24 +91,7 @@ class DataService(rpyc.Service): f"No start method found for service '{service_name}'" ) - def _start_loop(self) -> None: - asyncio.set_event_loop(self.__loop) - try: - self.__loop.run_forever() - finally: - # cancel all running tasks - for task in self.__tasks.values(): - self.__loop.call_soon_threadsafe(task.cancel) - self.__loop.call_soon_threadsafe(self.__loop.stop) - self.__thread.join() - - def _start_async_loop_in_thread(self) -> None: - # create a new event loop and run it in a separate thread - self.__loop = asyncio.new_event_loop() - self.__thread = threading.Thread(target=self._start_loop) - self.__thread.start() - - def _set_start_and_stop_for_async_methods(self) -> None: + def _set_start_and_stop_for_async_methods(self) -> None: # noqa: C901 # inspect the methods of the class for name, method in inspect.getmembers( self, predicate=inspect.iscoroutinefunction @@ -121,17 +100,18 @@ class DataService(rpyc.Service): def start_task(*args: Any, **kwargs: Any) -> None: async def task(*args: Any, **kwargs: Any) -> None: try: - await getattr(self, name)(*args, **kwargs) + await method(*args, **kwargs) except asyncio.CancelledError: print(f"Task {name} was cancelled") - self.__tasks[name] = asyncio.run_coroutine_threadsafe( - task(*args, **kwargs), self.__loop - ) + if not self.__tasks.get(name): + self.__tasks[name] = self.__loop.create_task(task(*args, **kwargs)) + else: + logger.error(f"Task `{name}` is already running!") def stop_task() -> None: # cancel the task - task = self.__tasks.get(name) + task = self.__tasks.pop(name) if task is not None: self.__loop.call_soon_threadsafe(task.cancel)