mirror of
https://github.com/tiqi-group/pydase.git
synced 2025-04-23 01:20:03 +02:00
DataService: removing threading, using asyncio loop
Start tasks through asyncio loop instead of a separate thread. Autmatic tasks are not started in the init function. They can be started elsewhere.
This commit is contained in:
parent
ebeb4c1520
commit
0bfb3a44ae
@ -1,8 +1,6 @@
|
|||||||
import asyncio
|
import asyncio
|
||||||
import inspect
|
import inspect
|
||||||
import threading
|
|
||||||
from collections.abc import Callable
|
from collections.abc import Callable
|
||||||
from concurrent.futures import Future
|
|
||||||
from itertools import chain
|
from itertools import chain
|
||||||
from typing import Any
|
from typing import Any
|
||||||
|
|
||||||
@ -31,9 +29,10 @@ class DataService(rpyc.Service):
|
|||||||
# Keep track of the root object. This helps to filter the emission of
|
# Keep track of the root object. This helps to filter the emission of
|
||||||
# notifications
|
# notifications
|
||||||
self.__root__: "DataService" = self
|
self.__root__: "DataService" = self
|
||||||
|
self.__loop = asyncio.get_event_loop()
|
||||||
|
|
||||||
# dictionary to keep track of running tasks
|
# dictionary to keep track of running tasks
|
||||||
self.__tasks: dict[str, Future[None]] = {}
|
self.__tasks: dict[str, asyncio.Task[None]] = {}
|
||||||
self._autostart_tasks: dict[str, tuple[Any]]
|
self._autostart_tasks: dict[str, tuple[Any]]
|
||||||
if "_autostart_tasks" not in self.__dict__:
|
if "_autostart_tasks" not in self.__dict__:
|
||||||
self._autostart_tasks = {}
|
self._autostart_tasks = {}
|
||||||
@ -41,9 +40,6 @@ class DataService(rpyc.Service):
|
|||||||
self._callbacks: set[Callable[[str, Any], None]] = set()
|
self._callbacks: set[Callable[[str, Any], None]] = set()
|
||||||
self._set_start_and_stop_for_async_methods()
|
self._set_start_and_stop_for_async_methods()
|
||||||
|
|
||||||
self._start_async_loop_in_thread()
|
|
||||||
self._start_autostart_tasks()
|
|
||||||
|
|
||||||
self._register_list_change_callbacks(self, f"{self.__class__.__name__}")
|
self._register_list_change_callbacks(self, f"{self.__class__.__name__}")
|
||||||
self._register_DataService_instance_callbacks(
|
self._register_DataService_instance_callbacks(
|
||||||
self, f"{self.__class__.__name__}"
|
self, f"{self.__class__.__name__}"
|
||||||
@ -95,24 +91,7 @@ class DataService(rpyc.Service):
|
|||||||
f"No start method found for service '{service_name}'"
|
f"No start method found for service '{service_name}'"
|
||||||
)
|
)
|
||||||
|
|
||||||
def _start_loop(self) -> None:
|
def _set_start_and_stop_for_async_methods(self) -> None: # noqa: C901
|
||||||
asyncio.set_event_loop(self.__loop)
|
|
||||||
try:
|
|
||||||
self.__loop.run_forever()
|
|
||||||
finally:
|
|
||||||
# cancel all running tasks
|
|
||||||
for task in self.__tasks.values():
|
|
||||||
self.__loop.call_soon_threadsafe(task.cancel)
|
|
||||||
self.__loop.call_soon_threadsafe(self.__loop.stop)
|
|
||||||
self.__thread.join()
|
|
||||||
|
|
||||||
def _start_async_loop_in_thread(self) -> None:
|
|
||||||
# create a new event loop and run it in a separate thread
|
|
||||||
self.__loop = asyncio.new_event_loop()
|
|
||||||
self.__thread = threading.Thread(target=self._start_loop)
|
|
||||||
self.__thread.start()
|
|
||||||
|
|
||||||
def _set_start_and_stop_for_async_methods(self) -> None:
|
|
||||||
# inspect the methods of the class
|
# inspect the methods of the class
|
||||||
for name, method in inspect.getmembers(
|
for name, method in inspect.getmembers(
|
||||||
self, predicate=inspect.iscoroutinefunction
|
self, predicate=inspect.iscoroutinefunction
|
||||||
@ -121,17 +100,18 @@ class DataService(rpyc.Service):
|
|||||||
def start_task(*args: Any, **kwargs: Any) -> None:
|
def start_task(*args: Any, **kwargs: Any) -> None:
|
||||||
async def task(*args: Any, **kwargs: Any) -> None:
|
async def task(*args: Any, **kwargs: Any) -> None:
|
||||||
try:
|
try:
|
||||||
await getattr(self, name)(*args, **kwargs)
|
await method(*args, **kwargs)
|
||||||
except asyncio.CancelledError:
|
except asyncio.CancelledError:
|
||||||
print(f"Task {name} was cancelled")
|
print(f"Task {name} was cancelled")
|
||||||
|
|
||||||
self.__tasks[name] = asyncio.run_coroutine_threadsafe(
|
if not self.__tasks.get(name):
|
||||||
task(*args, **kwargs), self.__loop
|
self.__tasks[name] = self.__loop.create_task(task(*args, **kwargs))
|
||||||
)
|
else:
|
||||||
|
logger.error(f"Task `{name}` is already running!")
|
||||||
|
|
||||||
def stop_task() -> None:
|
def stop_task() -> None:
|
||||||
# cancel the task
|
# cancel the task
|
||||||
task = self.__tasks.get(name)
|
task = self.__tasks.pop(name)
|
||||||
if task is not None:
|
if task is not None:
|
||||||
self.__loop.call_soon_threadsafe(task.cancel)
|
self.__loop.call_soon_threadsafe(task.cancel)
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user