adds PerInstanceTaskDescriptor class managing task objects for service class instances

When defining a task on a DataService class, formerly a task object was
created which replaced the decorated method as a class attribute. This
caused errors when using multiple instances of that class as each
instance was referring to the same task.
This descriptor class now handles the tasks per instance of the service
class.
This commit is contained in:
Mose Müller 2024-09-23 09:21:04 +02:00
parent aa705592b2
commit e9d8cbafc2

View File

@ -1,7 +1,8 @@
import logging
from collections.abc import Callable, Coroutine
from typing import Any, TypeVar
from typing import Any, Generic, TypeVar, overload
from pydase.data_service.data_service import DataService
from pydase.task.task import Task
logger = logging.getLogger(__name__)
@ -9,6 +10,69 @@ logger = logging.getLogger(__name__)
R = TypeVar("R")
class PerInstanceTaskDescriptor(Generic[R]):
"""
A descriptor class that provides a unique [`Task`][pydase.task.task.Task] object
for each instance of a [`DataService`][pydase.data_service.data_service.DataService]
class.
The `PerInstanceTaskDescriptor` is used to transform an asynchronous function into a
task that is managed independently for each instance of a `DataService` subclass.
This allows tasks to be initialized, started, and stopped on a per-instance basis,
providing better control over task execution within the service.
The `PerInstanceTaskDescriptor` is not intended to be used directly. Instead, it is
used internally by the `@task` decorator to manage task objects for each instance of
the service class.
"""
def __init__(
self,
func: Callable[[Any], Coroutine[None, None, R]]
| Callable[[], Coroutine[None, None, R]],
autostart: bool = False,
) -> None:
self.__func = func
self.__autostart = autostart
self.__task_instances: dict[object, Task[R]] = {}
def __set_name__(self, owner: type[DataService], name: str) -> None:
"""Stores the name of the task within the owning class. This method is called
automatically when the descriptor is assigned to a class attribute.
"""
self.__task_name = name
@overload
def __get__(
self, instance: None, owner: type[DataService]
) -> "PerInstanceTaskDescriptor[R]":
"""Returns the descriptor itself when accessed through the class."""
@overload
def __get__(self, instance: DataService, owner: type[DataService]) -> Task[R]:
"""Returns the `Task` object associated with the specific `DataService`
instance.
If no task exists for the instance, a new `Task` object is created and stored
in the `__task_instances` dictionary.
"""
def __get__(
self, instance: DataService | None, owner: type[DataService]
) -> "Task[R] | PerInstanceTaskDescriptor[R]":
if instance is None:
return self
# Create a new Task object for this instance, using the function's name.
if instance not in self.__task_instances:
self.__task_instances[instance] = instance._initialise_new_objects(
self.__task_name,
Task(self.__func.__get__(instance, owner), autostart=self.__autostart),
)
return self.__task_instances[instance]
def task(
*, autostart: bool = False
) -> Callable[
@ -16,18 +80,22 @@ def task(
Callable[[Any], Coroutine[None, None, R]]
| Callable[[], Coroutine[None, None, R]]
],
Task[R],
PerInstanceTaskDescriptor[R],
]:
"""
A decorator to define a function as a task within a
A decorator to define an asynchronous function as a per-instance task within a
[`DataService`][pydase.DataService] class.
This decorator transforms an asynchronous function into a
[`Task`][pydase.task.task.Task] object. The `Task` object provides methods like
`start()` and `stop()` to control the execution of the task.
[`Task`][pydase.task.task.Task] object that is unique to each instance of the
`DataService` class. The resulting `Task` object provides methods like `start()`
and `stop()` to control the execution of the task, and manages the task's lifecycle
independently for each instance of the service.
Tasks are typically used to perform periodic or recurring jobs, such as reading
sensor data, updating databases, or other operations that need to be repeated over
The decorator is particularly useful for defining tasks that need to run
periodically or perform asynchronous operations, such as polling data sources,
updating databases, or any recurring job that should be managed within the context
of a `DataService`.
time.
Args:
@ -36,8 +104,10 @@ def task(
initialized. Defaults to False.
Returns:
A decorator that converts an asynchronous function into a
[`Task`][pydase.task.task.Task] object.
A decorator that wraps an asynchronous function in a
[`PerInstanceTaskDescriptor`][pydase.task.decorator.PerInstanceTaskDescriptor]
object, which, when accessed, provides an instance-specific
[`Task`][pydase.task.task.Task] object.
Example:
```python
@ -69,7 +139,7 @@ def task(
def decorator(
func: Callable[[Any], Coroutine[None, None, R]]
| Callable[[], Coroutine[None, None, R]],
) -> Task[R]:
return Task(func, autostart=autostart)
) -> PerInstanceTaskDescriptor[R]:
return PerInstanceTaskDescriptor(func, autostart=autostart)
return decorator