From 91a71ad004413799e08c9160fd0aa8f23c716ba0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mose=20M=C3=BCller?= Date: Mon, 23 Sep 2024 08:53:35 +0200 Subject: [PATCH 1/6] updates is_descriptor to exclude false positives for methods, functions and builtins --- src/pydase/utils/helpers.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/src/pydase/utils/helpers.py b/src/pydase/utils/helpers.py index 3f2f545..f47af3e 100644 --- a/src/pydase/utils/helpers.py +++ b/src/pydase/utils/helpers.py @@ -204,6 +204,17 @@ def function_has_arguments(func: Callable[..., Any]) -> bool: def is_descriptor(obj: object) -> bool: """Check if an object is a descriptor.""" + + # Exclude functions, methods, builtins and properties + if ( + inspect.isfunction(obj) + or inspect.ismethod(obj) + or inspect.isbuiltin(obj) + or isinstance(obj, property) + ): + return False + + # Check if it has any descriptor methods return any(hasattr(obj, method) for method in ("__get__", "__set__", "__delete__")) From 008e1262bbe17562b148018d9703670e5f8e44a4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mose=20M=C3=BCller?= Date: Mon, 23 Sep 2024 08:57:00 +0200 Subject: [PATCH 2/6] updates PropertyObserver to support descriptors returning observables If a class attribute is a descriptor, get its value before checking if it is an Observable. --- .../observer/property_observer.py | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/src/pydase/observer_pattern/observer/property_observer.py b/src/pydase/observer_pattern/observer/property_observer.py index f9a125e..0e8ae4c 100644 --- a/src/pydase/observer_pattern/observer/property_observer.py +++ b/src/pydase/observer_pattern/observer/property_observer.py @@ -5,6 +5,7 @@ from typing import Any from pydase.observer_pattern.observable.observable import Observable from pydase.observer_pattern.observer.observer import Observer +from pydase.utils.helpers import is_descriptor logger = logging.getLogger(__name__) @@ -61,17 +62,27 @@ class PropertyObserver(Observer): self, obj: Observable, deps: dict[str, Any], prefix: str ) -> None: for k, value in {**vars(type(obj)), **vars(obj)}.items(): + actual_value = value prefix = ( f"{prefix}." if prefix != "" and not prefix.endswith(".") else prefix ) parent_path = f"{prefix}{k}" - if isinstance(value, Observable): + + # Get value from descriptor + if not isinstance(value, property) and is_descriptor(value): + actual_value = getattr(obj, k) + + if isinstance(actual_value, Observable): new_prefix = f"{parent_path}." deps.update( - self._get_properties_and_their_dependencies(value, new_prefix) + self._get_properties_and_their_dependencies( + actual_value, new_prefix + ) ) elif isinstance(value, list | dict): - self._process_collection_item_properties(value, deps, parent_path) + self._process_collection_item_properties( + actual_value, deps, parent_path + ) def _process_collection_item_properties( self, From aa705592b2bc1f1b8d828056cca4be3e3e5e689e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mose=20M=C3=BCller?= Date: Mon, 23 Sep 2024 09:15:42 +0200 Subject: [PATCH 3/6] removes code from Task meant to bind the passed function to the containing class instance The task object will only receive bound methods, so there is no need to keep the descriptor functionality anymore. --- src/pydase/task/task.py | 60 +++++++++-------------------------------- 1 file changed, 13 insertions(+), 47 deletions(-) diff --git a/src/pydase/task/task.py b/src/pydase/task/task.py index 3765a75..3481164 100644 --- a/src/pydase/task/task.py +++ b/src/pydase/task/task.py @@ -4,19 +4,16 @@ import logging import sys from collections.abc import Callable, Coroutine from typing import ( - Any, Generic, TypeVar, ) -from typing_extensions import TypeIs - from pydase.task.task_status import TaskStatus if sys.version_info < (3, 11): - from typing_extensions import Self + pass else: - from typing import Self + pass import pydase.data_service.data_service from pydase.utils.helpers import current_event_loop_exists @@ -27,17 +24,8 @@ logger = logging.getLogger(__name__) R = TypeVar("R") -def is_bound_method( - method: Callable[[], Coroutine[None, None, R | None]] - | Callable[[Any], Coroutine[None, None, R | None]], -) -> TypeIs[Callable[[], Coroutine[None, None, R | None]]]: - """Check if instance method is bound to an object.""" - return inspect.ismethod(method) - - class Task(pydase.data_service.data_service.DataService, Generic[R]): - """ - A class representing a task within the `pydase` framework. + """A class representing a task within the `pydase` framework. The `Task` class wraps an asynchronous function and provides methods to manage its lifecycle, such as `start()` and `stop()`. It is typically used to perform periodic @@ -85,25 +73,24 @@ class Task(pydase.data_service.data_service.DataService, Generic[R]): def __init__( self, - func: Callable[[Any], Coroutine[None, None, R | None]] - | Callable[[], Coroutine[None, None, R | None]], + func: Callable[[], Coroutine[None, None, R | None]], *, autostart: bool = False, ) -> None: super().__init__() self._autostart = autostart self._func_name = func.__name__ - self._bound_func: Callable[[], Coroutine[None, None, R | None]] | None = None - self._set_up = False - if is_bound_method(func): - self._func = func - self._bound_func = func - else: - self._func = func + self._func = func self._task: asyncio.Task[R | None] | None = None self._status = TaskStatus.NOT_RUNNING self._result: R | None = None + if not current_event_loop_exists(): + self._loop = asyncio.new_event_loop() + asyncio.set_event_loop(self._loop) + else: + self._loop = asyncio.get_event_loop() + @property def autostart(self) -> bool: """Defines if the task should be started automatically when the @@ -144,10 +131,10 @@ class Task(pydase.data_service.data_service.DataService, Generic[R]): self._result = task.result() async def run_task() -> R | None: - if inspect.iscoroutinefunction(self._bound_func): + if inspect.iscoroutinefunction(self._func): logger.info("Starting task %r", self._func_name) self._status = TaskStatus.RUNNING - res: Coroutine[None, None, R] = self._bound_func() + res: Coroutine[None, None, R | None] = self._func() try: return await res except asyncio.CancelledError: @@ -167,24 +154,3 @@ class Task(pydase.data_service.data_service.DataService, Generic[R]): if self._task: self._task.cancel() - - def __get__(self, instance: Any, owner: Any) -> Self: - """Descriptor method used to correctly set up the task. - - This descriptor method is called by the class instance containing the task. - It binds the task function to that class instance. - - Since the `__init__` function is called when a function is decorated with - [`@task`][pydase.task.decorator.task], some setup is delayed until this - descriptor function is called. - """ - - if instance and not self._set_up: - if not current_event_loop_exists(): - self._loop = asyncio.new_event_loop() - asyncio.set_event_loop(self._loop) - else: - self._loop = asyncio.get_event_loop() - self._bound_func = self._func.__get__(instance, owner) - self._set_up = True - return self From e9d8cbafc2bfabee0bc9d2b59e7f21e59cf996ea Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mose=20M=C3=BCller?= Date: Mon, 23 Sep 2024 09:21:04 +0200 Subject: [PATCH 4/6] adds PerInstanceTaskDescriptor class managing task objects for service class instances When defining a task on a DataService class, formerly a task object was created which replaced the decorated method as a class attribute. This caused errors when using multiple instances of that class as each instance was referring to the same task. This descriptor class now handles the tasks per instance of the service class. --- src/pydase/task/decorator.py | 92 +++++++++++++++++++++++++++++++----- 1 file changed, 81 insertions(+), 11 deletions(-) diff --git a/src/pydase/task/decorator.py b/src/pydase/task/decorator.py index 8e3aa15..b6974b2 100644 --- a/src/pydase/task/decorator.py +++ b/src/pydase/task/decorator.py @@ -1,7 +1,8 @@ import logging from collections.abc import Callable, Coroutine -from typing import Any, TypeVar +from typing import Any, Generic, TypeVar, overload +from pydase.data_service.data_service import DataService from pydase.task.task import Task logger = logging.getLogger(__name__) @@ -9,6 +10,69 @@ logger = logging.getLogger(__name__) R = TypeVar("R") +class PerInstanceTaskDescriptor(Generic[R]): + """ + A descriptor class that provides a unique [`Task`][pydase.task.task.Task] object + for each instance of a [`DataService`][pydase.data_service.data_service.DataService] + class. + + The `PerInstanceTaskDescriptor` is used to transform an asynchronous function into a + task that is managed independently for each instance of a `DataService` subclass. + This allows tasks to be initialized, started, and stopped on a per-instance basis, + providing better control over task execution within the service. + + The `PerInstanceTaskDescriptor` is not intended to be used directly. Instead, it is + used internally by the `@task` decorator to manage task objects for each instance of + the service class. + """ + + def __init__( + self, + func: Callable[[Any], Coroutine[None, None, R]] + | Callable[[], Coroutine[None, None, R]], + autostart: bool = False, + ) -> None: + self.__func = func + self.__autostart = autostart + self.__task_instances: dict[object, Task[R]] = {} + + def __set_name__(self, owner: type[DataService], name: str) -> None: + """Stores the name of the task within the owning class. This method is called + automatically when the descriptor is assigned to a class attribute. + """ + + self.__task_name = name + + @overload + def __get__( + self, instance: None, owner: type[DataService] + ) -> "PerInstanceTaskDescriptor[R]": + """Returns the descriptor itself when accessed through the class.""" + + @overload + def __get__(self, instance: DataService, owner: type[DataService]) -> Task[R]: + """Returns the `Task` object associated with the specific `DataService` + instance. + If no task exists for the instance, a new `Task` object is created and stored + in the `__task_instances` dictionary. + """ + + def __get__( + self, instance: DataService | None, owner: type[DataService] + ) -> "Task[R] | PerInstanceTaskDescriptor[R]": + if instance is None: + return self + + # Create a new Task object for this instance, using the function's name. + if instance not in self.__task_instances: + self.__task_instances[instance] = instance._initialise_new_objects( + self.__task_name, + Task(self.__func.__get__(instance, owner), autostart=self.__autostart), + ) + + return self.__task_instances[instance] + + def task( *, autostart: bool = False ) -> Callable[ @@ -16,18 +80,22 @@ def task( Callable[[Any], Coroutine[None, None, R]] | Callable[[], Coroutine[None, None, R]] ], - Task[R], + PerInstanceTaskDescriptor[R], ]: """ - A decorator to define a function as a task within a + A decorator to define an asynchronous function as a per-instance task within a [`DataService`][pydase.DataService] class. This decorator transforms an asynchronous function into a - [`Task`][pydase.task.task.Task] object. The `Task` object provides methods like - `start()` and `stop()` to control the execution of the task. + [`Task`][pydase.task.task.Task] object that is unique to each instance of the + `DataService` class. The resulting `Task` object provides methods like `start()` + and `stop()` to control the execution of the task, and manages the task's lifecycle + independently for each instance of the service. - Tasks are typically used to perform periodic or recurring jobs, such as reading - sensor data, updating databases, or other operations that need to be repeated over + The decorator is particularly useful for defining tasks that need to run + periodically or perform asynchronous operations, such as polling data sources, + updating databases, or any recurring job that should be managed within the context + of a `DataService`. time. Args: @@ -36,8 +104,10 @@ def task( initialized. Defaults to False. Returns: - A decorator that converts an asynchronous function into a - [`Task`][pydase.task.task.Task] object. + A decorator that wraps an asynchronous function in a + [`PerInstanceTaskDescriptor`][pydase.task.decorator.PerInstanceTaskDescriptor] + object, which, when accessed, provides an instance-specific + [`Task`][pydase.task.task.Task] object. Example: ```python @@ -69,7 +139,7 @@ def task( def decorator( func: Callable[[Any], Coroutine[None, None, R]] | Callable[[], Coroutine[None, None, R]], - ) -> Task[R]: - return Task(func, autostart=autostart) + ) -> PerInstanceTaskDescriptor[R]: + return PerInstanceTaskDescriptor(func, autostart=autostart) return decorator From e491ac745814f422dabdcf2a70c65bee389229fe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mose=20M=C3=BCller?= Date: Mon, 23 Sep 2024 09:22:28 +0200 Subject: [PATCH 5/6] observable does not have to initialise descriptor objects anymore The task decorator is not returning a Task object directly anymore, but rather a descriptor which is returning the task. This is where the task is initialised and this does not have to be done in the observable base class, any more. --- src/pydase/observer_pattern/observable/observable.py | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/src/pydase/observer_pattern/observable/observable.py b/src/pydase/observer_pattern/observable/observable.py index 9299fb3..b368179 100644 --- a/src/pydase/observer_pattern/observable/observable.py +++ b/src/pydase/observer_pattern/observable/observable.py @@ -22,12 +22,9 @@ class Observable(ObservableObject): - {"__annotations__"} } for name, value in class_attrs.items(): - if isinstance(value, property) or callable(value): - continue - if is_descriptor(value): - # Descriptors have to be stored as a class variable in another class to - # work properly. So don't make it an instance attribute. - self._initialise_new_objects(name, value) + if isinstance(value, property) or callable(value) or is_descriptor(value): + # Properties, methods and descriptors have to be stored as class + # attributes to work properly. So don't make it an instance attribute. continue self.__dict__[name] = self._initialise_new_objects(name, value) From 6b6ce1d43f837eb9e4d9faea30b70ac21019127a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mose=20M=C3=BCller?= Date: Mon, 23 Sep 2024 09:44:43 +0200 Subject: [PATCH 6/6] adds test checking for multiple instances of a class containing a task --- tests/task/test_task.py | 151 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 151 insertions(+) diff --git a/tests/task/test_task.py b/tests/task/test_task.py index bd53c3d..9c83d42 100644 --- a/tests/task/test_task.py +++ b/tests/task/test_task.py @@ -138,3 +138,154 @@ async def test_nested_dict_autostart_task( "'sub_services_dict[\"second\"].my_task.status' changed to 'TaskStatus.RUNNING'" in caplog.text ) + + +@pytest.mark.asyncio(scope="function") +async def test_manual_start_with_multiple_service_instances( + caplog: LogCaptureFixture, +) -> None: + class MySubService(pydase.DataService): + @task() + async def my_task(self) -> None: + logger.info("Triggered task.") + while True: + await asyncio.sleep(1) + + class MyService(pydase.DataService): + sub_services_list = [MySubService() for i in range(2)] + sub_services_dict = {"first": MySubService(), "second": MySubService()} + + service_instance = MyService() + state_manager = StateManager(service_instance) + DataServiceObserver(state_manager) + + autostart_service_tasks(service_instance) + + await asyncio.sleep(0.1) + + assert ( + service_instance.sub_services_list[0].my_task.status == TaskStatus.NOT_RUNNING + ) + assert ( + service_instance.sub_services_list[1].my_task.status == TaskStatus.NOT_RUNNING + ) + assert ( + service_instance.sub_services_dict["first"].my_task.status + == TaskStatus.NOT_RUNNING + ) + assert ( + service_instance.sub_services_dict["second"].my_task.status + == TaskStatus.NOT_RUNNING + ) + + service_instance.sub_services_list[0].my_task.start() + await asyncio.sleep(0.01) + + assert service_instance.sub_services_list[0].my_task.status == TaskStatus.RUNNING + assert ( + "'sub_services_list[0].my_task.status' changed to 'TaskStatus.RUNNING'" + in caplog.text + ) + assert ( + "'sub_services_list[1].my_task.status' changed to 'TaskStatus.RUNNING'" + not in caplog.text + ) + assert ( + "'sub_services_dict[\"first\"].my_task.status' changed to 'TaskStatus.RUNNING'" + not in caplog.text + ) + assert ( + "'sub_services_dict[\"second\"].my_task.status' changed to 'TaskStatus.RUNNING'" + not in caplog.text + ) + + service_instance.sub_services_list[0].my_task.stop() + await asyncio.sleep(0.01) + + assert "Task 'my_task' was cancelled" in caplog.text + caplog.clear() + + service_instance.sub_services_list[1].my_task.start() + await asyncio.sleep(0.01) + + assert service_instance.sub_services_list[1].my_task.status == TaskStatus.RUNNING + assert ( + "'sub_services_list[0].my_task.status' changed to 'TaskStatus.RUNNING'" + not in caplog.text + ) + assert ( + "'sub_services_list[1].my_task.status' changed to 'TaskStatus.RUNNING'" + in caplog.text + ) + assert ( + "'sub_services_dict[\"first\"].my_task.status' changed to 'TaskStatus.RUNNING'" + not in caplog.text + ) + assert ( + "'sub_services_dict[\"second\"].my_task.status' changed to 'TaskStatus.RUNNING'" + not in caplog.text + ) + + service_instance.sub_services_list[1].my_task.stop() + await asyncio.sleep(0.01) + + assert "Task 'my_task' was cancelled" in caplog.text + caplog.clear() + + service_instance.sub_services_dict["first"].my_task.start() + await asyncio.sleep(0.01) + + assert ( + service_instance.sub_services_dict["first"].my_task.status == TaskStatus.RUNNING + ) + assert ( + "'sub_services_list[0].my_task.status' changed to 'TaskStatus.RUNNING'" + not in caplog.text + ) + assert ( + "'sub_services_list[1].my_task.status' changed to 'TaskStatus.RUNNING'" + not in caplog.text + ) + assert ( + "'sub_services_dict[\"first\"].my_task.status' changed to 'TaskStatus.RUNNING'" + in caplog.text + ) + assert ( + "'sub_services_dict[\"second\"].my_task.status' changed to 'TaskStatus.RUNNING'" + not in caplog.text + ) + + service_instance.sub_services_dict["first"].my_task.stop() + await asyncio.sleep(0.01) + + assert "Task 'my_task' was cancelled" in caplog.text + caplog.clear() + + service_instance.sub_services_dict["second"].my_task.start() + await asyncio.sleep(0.01) + + assert ( + service_instance.sub_services_dict["second"].my_task.status + == TaskStatus.RUNNING + ) + assert ( + "'sub_services_list[0].my_task.status' changed to 'TaskStatus.RUNNING'" + not in caplog.text + ) + assert ( + "'sub_services_list[1].my_task.status' changed to 'TaskStatus.RUNNING'" + not in caplog.text + ) + assert ( + "'sub_services_dict[\"first\"].my_task.status' changed to 'TaskStatus.RUNNING'" + not in caplog.text + ) + assert ( + "'sub_services_dict[\"second\"].my_task.status' changed to 'TaskStatus.RUNNING'" + in caplog.text + ) + + service_instance.sub_services_dict["second"].my_task.stop() + await asyncio.sleep(0.01) + + assert "Task 'my_task' was cancelled" in caplog.text