Merge pull request #161 from tiqi-group/160-control-of-tasks-in-instances-derived-from-same-class-only-controls-task-of-first-instance

fix: controlling tasks of instances derived from same class
This commit is contained in:
Mose Müller 2024-09-23 12:54:38 +02:00 committed by GitHub
commit c98e407ed7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 273 additions and 67 deletions

View File

@ -22,12 +22,9 @@ class Observable(ObservableObject):
- {"__annotations__"}
}
for name, value in class_attrs.items():
if isinstance(value, property) or callable(value):
continue
if is_descriptor(value):
# Descriptors have to be stored as a class variable in another class to
# work properly. So don't make it an instance attribute.
self._initialise_new_objects(name, value)
if isinstance(value, property) or callable(value) or is_descriptor(value):
# Properties, methods and descriptors have to be stored as class
# attributes to work properly. So don't make it an instance attribute.
continue
self.__dict__[name] = self._initialise_new_objects(name, value)

View File

@ -5,6 +5,7 @@ from typing import Any
from pydase.observer_pattern.observable.observable import Observable
from pydase.observer_pattern.observer.observer import Observer
from pydase.utils.helpers import is_descriptor
logger = logging.getLogger(__name__)
@ -61,17 +62,27 @@ class PropertyObserver(Observer):
self, obj: Observable, deps: dict[str, Any], prefix: str
) -> None:
for k, value in {**vars(type(obj)), **vars(obj)}.items():
actual_value = value
prefix = (
f"{prefix}." if prefix != "" and not prefix.endswith(".") else prefix
)
parent_path = f"{prefix}{k}"
if isinstance(value, Observable):
# Get value from descriptor
if not isinstance(value, property) and is_descriptor(value):
actual_value = getattr(obj, k)
if isinstance(actual_value, Observable):
new_prefix = f"{parent_path}."
deps.update(
self._get_properties_and_their_dependencies(value, new_prefix)
self._get_properties_and_their_dependencies(
actual_value, new_prefix
)
)
elif isinstance(value, list | dict):
self._process_collection_item_properties(value, deps, parent_path)
self._process_collection_item_properties(
actual_value, deps, parent_path
)
def _process_collection_item_properties(
self,

View File

@ -1,7 +1,8 @@
import logging
from collections.abc import Callable, Coroutine
from typing import Any, TypeVar
from typing import Any, Generic, TypeVar, overload
from pydase.data_service.data_service import DataService
from pydase.task.task import Task
logger = logging.getLogger(__name__)
@ -9,6 +10,69 @@ logger = logging.getLogger(__name__)
R = TypeVar("R")
class PerInstanceTaskDescriptor(Generic[R]):
"""
A descriptor class that provides a unique [`Task`][pydase.task.task.Task] object
for each instance of a [`DataService`][pydase.data_service.data_service.DataService]
class.
The `PerInstanceTaskDescriptor` is used to transform an asynchronous function into a
task that is managed independently for each instance of a `DataService` subclass.
This allows tasks to be initialized, started, and stopped on a per-instance basis,
providing better control over task execution within the service.
The `PerInstanceTaskDescriptor` is not intended to be used directly. Instead, it is
used internally by the `@task` decorator to manage task objects for each instance of
the service class.
"""
def __init__(
self,
func: Callable[[Any], Coroutine[None, None, R]]
| Callable[[], Coroutine[None, None, R]],
autostart: bool = False,
) -> None:
self.__func = func
self.__autostart = autostart
self.__task_instances: dict[object, Task[R]] = {}
def __set_name__(self, owner: type[DataService], name: str) -> None:
"""Stores the name of the task within the owning class. This method is called
automatically when the descriptor is assigned to a class attribute.
"""
self.__task_name = name
@overload
def __get__(
self, instance: None, owner: type[DataService]
) -> "PerInstanceTaskDescriptor[R]":
"""Returns the descriptor itself when accessed through the class."""
@overload
def __get__(self, instance: DataService, owner: type[DataService]) -> Task[R]:
"""Returns the `Task` object associated with the specific `DataService`
instance.
If no task exists for the instance, a new `Task` object is created and stored
in the `__task_instances` dictionary.
"""
def __get__(
self, instance: DataService | None, owner: type[DataService]
) -> "Task[R] | PerInstanceTaskDescriptor[R]":
if instance is None:
return self
# Create a new Task object for this instance, using the function's name.
if instance not in self.__task_instances:
self.__task_instances[instance] = instance._initialise_new_objects(
self.__task_name,
Task(self.__func.__get__(instance, owner), autostart=self.__autostart),
)
return self.__task_instances[instance]
def task(
*, autostart: bool = False
) -> Callable[
@ -16,18 +80,22 @@ def task(
Callable[[Any], Coroutine[None, None, R]]
| Callable[[], Coroutine[None, None, R]]
],
Task[R],
PerInstanceTaskDescriptor[R],
]:
"""
A decorator to define a function as a task within a
A decorator to define an asynchronous function as a per-instance task within a
[`DataService`][pydase.DataService] class.
This decorator transforms an asynchronous function into a
[`Task`][pydase.task.task.Task] object. The `Task` object provides methods like
`start()` and `stop()` to control the execution of the task.
[`Task`][pydase.task.task.Task] object that is unique to each instance of the
`DataService` class. The resulting `Task` object provides methods like `start()`
and `stop()` to control the execution of the task, and manages the task's lifecycle
independently for each instance of the service.
Tasks are typically used to perform periodic or recurring jobs, such as reading
sensor data, updating databases, or other operations that need to be repeated over
The decorator is particularly useful for defining tasks that need to run
periodically or perform asynchronous operations, such as polling data sources,
updating databases, or any recurring job that should be managed within the context
of a `DataService`.
time.
Args:
@ -36,8 +104,10 @@ def task(
initialized. Defaults to False.
Returns:
A decorator that converts an asynchronous function into a
[`Task`][pydase.task.task.Task] object.
A decorator that wraps an asynchronous function in a
[`PerInstanceTaskDescriptor`][pydase.task.decorator.PerInstanceTaskDescriptor]
object, which, when accessed, provides an instance-specific
[`Task`][pydase.task.task.Task] object.
Example:
```python
@ -69,7 +139,7 @@ def task(
def decorator(
func: Callable[[Any], Coroutine[None, None, R]]
| Callable[[], Coroutine[None, None, R]],
) -> Task[R]:
return Task(func, autostart=autostart)
) -> PerInstanceTaskDescriptor[R]:
return PerInstanceTaskDescriptor(func, autostart=autostart)
return decorator

View File

@ -4,19 +4,16 @@ import logging
import sys
from collections.abc import Callable, Coroutine
from typing import (
Any,
Generic,
TypeVar,
)
from typing_extensions import TypeIs
from pydase.task.task_status import TaskStatus
if sys.version_info < (3, 11):
from typing_extensions import Self
pass
else:
from typing import Self
pass
import pydase.data_service.data_service
from pydase.utils.helpers import current_event_loop_exists
@ -27,17 +24,8 @@ logger = logging.getLogger(__name__)
R = TypeVar("R")
def is_bound_method(
method: Callable[[], Coroutine[None, None, R | None]]
| Callable[[Any], Coroutine[None, None, R | None]],
) -> TypeIs[Callable[[], Coroutine[None, None, R | None]]]:
"""Check if instance method is bound to an object."""
return inspect.ismethod(method)
class Task(pydase.data_service.data_service.DataService, Generic[R]):
"""
A class representing a task within the `pydase` framework.
"""A class representing a task within the `pydase` framework.
The `Task` class wraps an asynchronous function and provides methods to manage its
lifecycle, such as `start()` and `stop()`. It is typically used to perform periodic
@ -85,25 +73,24 @@ class Task(pydase.data_service.data_service.DataService, Generic[R]):
def __init__(
self,
func: Callable[[Any], Coroutine[None, None, R | None]]
| Callable[[], Coroutine[None, None, R | None]],
func: Callable[[], Coroutine[None, None, R | None]],
*,
autostart: bool = False,
) -> None:
super().__init__()
self._autostart = autostart
self._func_name = func.__name__
self._bound_func: Callable[[], Coroutine[None, None, R | None]] | None = None
self._set_up = False
if is_bound_method(func):
self._func = func
self._bound_func = func
else:
self._func = func
self._func = func
self._task: asyncio.Task[R | None] | None = None
self._status = TaskStatus.NOT_RUNNING
self._result: R | None = None
if not current_event_loop_exists():
self._loop = asyncio.new_event_loop()
asyncio.set_event_loop(self._loop)
else:
self._loop = asyncio.get_event_loop()
@property
def autostart(self) -> bool:
"""Defines if the task should be started automatically when the
@ -144,10 +131,10 @@ class Task(pydase.data_service.data_service.DataService, Generic[R]):
self._result = task.result()
async def run_task() -> R | None:
if inspect.iscoroutinefunction(self._bound_func):
if inspect.iscoroutinefunction(self._func):
logger.info("Starting task %r", self._func_name)
self._status = TaskStatus.RUNNING
res: Coroutine[None, None, R] = self._bound_func()
res: Coroutine[None, None, R | None] = self._func()
try:
return await res
except asyncio.CancelledError:
@ -167,24 +154,3 @@ class Task(pydase.data_service.data_service.DataService, Generic[R]):
if self._task:
self._task.cancel()
def __get__(self, instance: Any, owner: Any) -> Self:
"""Descriptor method used to correctly set up the task.
This descriptor method is called by the class instance containing the task.
It binds the task function to that class instance.
Since the `__init__` function is called when a function is decorated with
[`@task`][pydase.task.decorator.task], some setup is delayed until this
descriptor function is called.
"""
if instance and not self._set_up:
if not current_event_loop_exists():
self._loop = asyncio.new_event_loop()
asyncio.set_event_loop(self._loop)
else:
self._loop = asyncio.get_event_loop()
self._bound_func = self._func.__get__(instance, owner)
self._set_up = True
return self

View File

@ -204,6 +204,17 @@ def function_has_arguments(func: Callable[..., Any]) -> bool:
def is_descriptor(obj: object) -> bool:
"""Check if an object is a descriptor."""
# Exclude functions, methods, builtins and properties
if (
inspect.isfunction(obj)
or inspect.ismethod(obj)
or inspect.isbuiltin(obj)
or isinstance(obj, property)
):
return False
# Check if it has any descriptor methods
return any(hasattr(obj, method) for method in ("__get__", "__set__", "__delete__"))

View File

@ -138,3 +138,154 @@ async def test_nested_dict_autostart_task(
"'sub_services_dict[\"second\"].my_task.status' changed to 'TaskStatus.RUNNING'"
in caplog.text
)
@pytest.mark.asyncio(scope="function")
async def test_manual_start_with_multiple_service_instances(
caplog: LogCaptureFixture,
) -> None:
class MySubService(pydase.DataService):
@task()
async def my_task(self) -> None:
logger.info("Triggered task.")
while True:
await asyncio.sleep(1)
class MyService(pydase.DataService):
sub_services_list = [MySubService() for i in range(2)]
sub_services_dict = {"first": MySubService(), "second": MySubService()}
service_instance = MyService()
state_manager = StateManager(service_instance)
DataServiceObserver(state_manager)
autostart_service_tasks(service_instance)
await asyncio.sleep(0.1)
assert (
service_instance.sub_services_list[0].my_task.status == TaskStatus.NOT_RUNNING
)
assert (
service_instance.sub_services_list[1].my_task.status == TaskStatus.NOT_RUNNING
)
assert (
service_instance.sub_services_dict["first"].my_task.status
== TaskStatus.NOT_RUNNING
)
assert (
service_instance.sub_services_dict["second"].my_task.status
== TaskStatus.NOT_RUNNING
)
service_instance.sub_services_list[0].my_task.start()
await asyncio.sleep(0.01)
assert service_instance.sub_services_list[0].my_task.status == TaskStatus.RUNNING
assert (
"'sub_services_list[0].my_task.status' changed to 'TaskStatus.RUNNING'"
in caplog.text
)
assert (
"'sub_services_list[1].my_task.status' changed to 'TaskStatus.RUNNING'"
not in caplog.text
)
assert (
"'sub_services_dict[\"first\"].my_task.status' changed to 'TaskStatus.RUNNING'"
not in caplog.text
)
assert (
"'sub_services_dict[\"second\"].my_task.status' changed to 'TaskStatus.RUNNING'"
not in caplog.text
)
service_instance.sub_services_list[0].my_task.stop()
await asyncio.sleep(0.01)
assert "Task 'my_task' was cancelled" in caplog.text
caplog.clear()
service_instance.sub_services_list[1].my_task.start()
await asyncio.sleep(0.01)
assert service_instance.sub_services_list[1].my_task.status == TaskStatus.RUNNING
assert (
"'sub_services_list[0].my_task.status' changed to 'TaskStatus.RUNNING'"
not in caplog.text
)
assert (
"'sub_services_list[1].my_task.status' changed to 'TaskStatus.RUNNING'"
in caplog.text
)
assert (
"'sub_services_dict[\"first\"].my_task.status' changed to 'TaskStatus.RUNNING'"
not in caplog.text
)
assert (
"'sub_services_dict[\"second\"].my_task.status' changed to 'TaskStatus.RUNNING'"
not in caplog.text
)
service_instance.sub_services_list[1].my_task.stop()
await asyncio.sleep(0.01)
assert "Task 'my_task' was cancelled" in caplog.text
caplog.clear()
service_instance.sub_services_dict["first"].my_task.start()
await asyncio.sleep(0.01)
assert (
service_instance.sub_services_dict["first"].my_task.status == TaskStatus.RUNNING
)
assert (
"'sub_services_list[0].my_task.status' changed to 'TaskStatus.RUNNING'"
not in caplog.text
)
assert (
"'sub_services_list[1].my_task.status' changed to 'TaskStatus.RUNNING'"
not in caplog.text
)
assert (
"'sub_services_dict[\"first\"].my_task.status' changed to 'TaskStatus.RUNNING'"
in caplog.text
)
assert (
"'sub_services_dict[\"second\"].my_task.status' changed to 'TaskStatus.RUNNING'"
not in caplog.text
)
service_instance.sub_services_dict["first"].my_task.stop()
await asyncio.sleep(0.01)
assert "Task 'my_task' was cancelled" in caplog.text
caplog.clear()
service_instance.sub_services_dict["second"].my_task.start()
await asyncio.sleep(0.01)
assert (
service_instance.sub_services_dict["second"].my_task.status
== TaskStatus.RUNNING
)
assert (
"'sub_services_list[0].my_task.status' changed to 'TaskStatus.RUNNING'"
not in caplog.text
)
assert (
"'sub_services_list[1].my_task.status' changed to 'TaskStatus.RUNNING'"
not in caplog.text
)
assert (
"'sub_services_dict[\"first\"].my_task.status' changed to 'TaskStatus.RUNNING'"
not in caplog.text
)
assert (
"'sub_services_dict[\"second\"].my_task.status' changed to 'TaskStatus.RUNNING'"
in caplog.text
)
service_instance.sub_services_dict["second"].my_task.stop()
await asyncio.sleep(0.01)
assert "Task 'my_task' was cancelled" in caplog.text