From 8564df5adc7030f35ade12f87c95db10c9fcc18e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mose=20M=C3=BCller?= Date: Wed, 25 Oct 2023 11:23:26 +0200 Subject: [PATCH 1/6] fix: adds start_stop_task callbacks to lists --- src/pydase/data_service/callback_manager.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/pydase/data_service/callback_manager.py b/src/pydase/data_service/callback_manager.py index 3715201..84d4760 100644 --- a/src/pydase/data_service/callback_manager.py +++ b/src/pydase/data_service/callback_manager.py @@ -359,6 +359,12 @@ class CallbackManager: attrs: dict[str, Any] = get_class_and_instance_attributes(obj) for nested_attr_name, nested_attr in attrs.items(): + if isinstance(nested_attr, DataServiceList): + for i, item in enumerate(nested_attr): + if isinstance(item, AbstractDataService): + self._register_start_stop_task_callbacks( + item, parent_path=f"{parent_path}.{nested_attr_name}[{i}]" + ) if isinstance(nested_attr, AbstractDataService): self._register_start_stop_task_callbacks( nested_attr, parent_path=f"{parent_path}.{nested_attr_name}" From 0504a50a08d5b8d173305a857d4ca8b80201f7a1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mose=20M=C3=BCller?= Date: Wed, 25 Oct 2023 11:57:50 +0200 Subject: [PATCH 2/6] fix: creates property functions to avoid closure and late binding issue When having multiple tasks, they all pointed to the one defined last. --- src/pydase/data_service/task_manager.py | 193 ++++++++++++++---------- 1 file changed, 110 insertions(+), 83 deletions(-) diff --git a/src/pydase/data_service/task_manager.py b/src/pydase/data_service/task_manager.py index 5908369..1089453 100644 --- a/src/pydase/data_service/task_manager.py +++ b/src/pydase/data_service/task_manager.py @@ -95,90 +95,9 @@ class TaskManager: for name, method in inspect.getmembers( self.service, predicate=inspect.iscoroutinefunction ): - - @wraps(method) - def start_task(*args: Any, **kwargs: Any) -> None: - def task_done_callback(task: asyncio.Task, name: str) -> None: - """Handles tasks that have finished. - - Removes a task from the tasks dictionary, calls the defined - callbacks, and logs and re-raises exceptions.""" - - # removing the finished task from the tasks i - self.tasks.pop(name, None) - - # emit the notification that the task was stopped - for callback in self.task_status_change_callbacks: - callback(name, None) - - exception = task.exception() - if exception is not None: - # Handle the exception, or you can re-raise it. - logger.error( - f"Task '{name}' encountered an exception: " - f"{type(exception).__name__}: {exception}" - ) - raise exception - - async def task(*args: Any, **kwargs: Any) -> None: - try: - await method(*args, **kwargs) - except asyncio.CancelledError: - logger.info(f"Task {name} was cancelled") - - if not self.tasks.get(name): - # Get the signature of the coroutine method to start - sig = inspect.signature(method) - - # Create a list of the parameter names from the method signature. - parameter_names = list(sig.parameters.keys()) - - # Extend the list of positional arguments with None values to match - # the length of the parameter names list. This is done to ensure - # that zip can pair each parameter name with a corresponding value. - args_padded = list(args) + [None] * ( - len(parameter_names) - len(args) - ) - - # Create a dictionary of keyword arguments by pairing the parameter - # names with the values in 'args_padded'. Then merge this dictionary - # with the 'kwargs' dictionary. If a parameter is specified in both - # 'args_padded' and 'kwargs', the value from 'kwargs' is used. - kwargs_updated = { - **dict(zip(parameter_names, args_padded)), - **kwargs, - } - - # creating the task and adding the task_done_callback which checks - # if an exception has occured during the task execution - task_object = self._loop.create_task(task(*args, **kwargs)) - task_object.add_done_callback( - lambda task: task_done_callback(task, name) - ) - - # Store the task and its arguments in the '__tasks' dictionary. The - # key is the name of the method, and the value is a dictionary - # containing the task object and the updated keyword arguments. - self.tasks[name] = { - "task": task_object, - "kwargs": kwargs_updated, - } - - # emit the notification that the task was started - for callback in self.task_status_change_callbacks: - callback(name, kwargs_updated) - else: - logger.error(f"Task `{name}` is already running!") - - def stop_task() -> None: - # cancel the task - task = self.tasks.get(name, None) - if task is not None: - self._loop.call_soon_threadsafe(task["task"].cancel) - # create start and stop methods for each coroutine - setattr(self.service, f"start_{name}", start_task) - setattr(self.service, f"stop_{name}", stop_task) + setattr(self.service, f"start_{name}", self._make_start_task(name, method)) + setattr(self.service, f"stop_{name}", self._make_stop_task(name)) def start_autostart_tasks(self) -> None: if self.service._autostart_tasks is not None: @@ -190,3 +109,111 @@ class TaskManager: logger.warning( f"No start method found for service '{service_name}'" ) + + def _make_stop_task(self, name: str) -> Callable[..., Any]: + """ + Factory function to create a 'stop_task' function for a running task. + + The generated function cancels the associated asyncio task using 'name' for + identification, ensuring proper cleanup. Avoids closure and late binding issues. + + Args: + name (str): The name of the coroutine task, used for its identification. + """ + + def stop_task() -> None: + # cancel the task + task = self.tasks.get(name, None) + if task is not None: + self._loop.call_soon_threadsafe(task["task"].cancel) + + return stop_task + + def _make_start_task( # noqa + self, name: str, method: Callable[..., Any] + ) -> Callable[..., Any]: + """ + Factory function to create a 'start_task' function for a coroutine. + + The generated function starts the coroutine as an asyncio task, handling + registration and monitoring. + It uses 'name' and 'method' to avoid the closure and late binding issue. + + Args: + name (str): The name of the coroutine, used for task management. + method (callable): The coroutine to be turned into an asyncio task. + """ + + @wraps(method) + def start_task(*args: Any, **kwargs: Any) -> None: + def task_done_callback(task: asyncio.Task, name: str) -> None: + """Handles tasks that have finished. + + Removes a task from the tasks dictionary, calls the defined + callbacks, and logs and re-raises exceptions.""" + + # removing the finished task from the tasks i + self.tasks.pop(name, None) + + # emit the notification that the task was stopped + for callback in self.task_status_change_callbacks: + callback(name, None) + + exception = task.exception() + if exception is not None: + # Handle the exception, or you can re-raise it. + logger.error( + f"Task '{name}' encountered an exception: " + f"{type(exception).__name__}: {exception}" + ) + raise exception + + async def task(*args: Any, **kwargs: Any) -> None: + try: + await method(*args, **kwargs) + except asyncio.CancelledError: + logger.info(f"Task {name} was cancelled") + + if not self.tasks.get(name): + # Get the signature of the coroutine method to start + sig = inspect.signature(method) + + # Create a list of the parameter names from the method signature. + parameter_names = list(sig.parameters.keys()) + + # Extend the list of positional arguments with None values to match + # the length of the parameter names list. This is done to ensure + # that zip can pair each parameter name with a corresponding value. + args_padded = list(args) + [None] * (len(parameter_names) - len(args)) + + # Create a dictionary of keyword arguments by pairing the parameter + # names with the values in 'args_padded'. Then merge this dictionary + # with the 'kwargs' dictionary. If a parameter is specified in both + # 'args_padded' and 'kwargs', the value from 'kwargs' is used. + kwargs_updated = { + **dict(zip(parameter_names, args_padded)), + **kwargs, + } + + # creating the task and adding the task_done_callback which checks + # if an exception has occured during the task execution + task_object = self._loop.create_task(task(*args, **kwargs)) + task_object.add_done_callback( + lambda task: task_done_callback(task, name) + ) + + # Store the task and its arguments in the '__tasks' dictionary. The + # key is the name of the method, and the value is a dictionary + # containing the task object and the updated keyword arguments. + self.tasks[name] = { + "task": task_object, + "kwargs": kwargs_updated, + } + + # emit the notification that the task was started + for callback in self.task_status_change_callbacks: + callback(name, kwargs_updated) + else: + logger.error(f"Task `{name}` is already running!") + + return start_task From f7f64bbe924ccf3e6b69e268b0788952f72df8d7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mose=20M=C3=BCller?= Date: Wed, 25 Oct 2023 11:59:48 +0200 Subject: [PATCH 3/6] adding callback_manager tests --- tests/data_service/test_callback_manager.py | 56 +++++++++++++++++++++ 1 file changed, 56 insertions(+) create mode 100644 tests/data_service/test_callback_manager.py diff --git a/tests/data_service/test_callback_manager.py b/tests/data_service/test_callback_manager.py new file mode 100644 index 0000000..05a15a1 --- /dev/null +++ b/tests/data_service/test_callback_manager.py @@ -0,0 +1,56 @@ +import logging + +from pytest import CaptureFixture + +import pydase + +logger = logging.getLogger() + + +def test_DataService_task_callback(capsys: CaptureFixture) -> None: + class MyService(pydase.DataService): + async def my_task(self) -> None: + logger.info("Triggered task.") + + async def my_other_task(self) -> None: + logger.info("Triggered other task.") + + service = MyService() + service.start_my_task() # type: ignore + service.start_my_other_task() # type: ignore + + captured = capsys.readouterr() + expected_output = sorted( + [ + "MyService.my_task = {}", + "MyService.my_other_task = {}", + ] + ) + actual_output = sorted(captured.out.strip().split("\n")) # type: ignore + assert expected_output == actual_output + + +def test_DataServiceList_task_callback(capsys: CaptureFixture) -> None: + class MySubService(pydase.DataService): + async def my_task(self) -> None: + logger.info("Triggered task.") + + async def my_other_task(self) -> None: + logger.info("Triggered other task.") + + class MyService(pydase.DataService): + sub_services_list = [MySubService() for i in range(2)] + + service = MyService() + service.sub_services_list[0].start_my_task() # type: ignore + service.sub_services_list[1].start_my_other_task() # type: ignore + + captured = capsys.readouterr() + expected_output = sorted( + [ + "MyService.sub_services_list[0].my_task = {}", + "MyService.sub_services_list[1].my_other_task = {}", + ] + ) + actual_output = sorted(captured.out.strip().split("\n")) # type: ignore + assert expected_output == actual_output From 81f2281002d08051cf68e5d231d0085b0256ecad Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mose=20M=C3=BCller?= Date: Wed, 25 Oct 2023 16:33:29 +0200 Subject: [PATCH 4/6] fix: autostart_tasks capbability in sub-classes --- src/pydase/data_service/task_manager.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/src/pydase/data_service/task_manager.py b/src/pydase/data_service/task_manager.py index 1089453..7ac7cd7 100644 --- a/src/pydase/data_service/task_manager.py +++ b/src/pydase/data_service/task_manager.py @@ -7,6 +7,10 @@ from collections.abc import Callable from functools import wraps from typing import TYPE_CHECKING, Any, TypedDict +from pydase.data_service.abstract_data_service import AbstractDataService +from pydase.data_service.data_service_list import DataServiceList +from pydase.utils.helpers import get_class_and_instance_attributes + if TYPE_CHECKING: from .data_service import DataService @@ -110,6 +114,16 @@ class TaskManager: f"No start method found for service '{service_name}'" ) + attrs = get_class_and_instance_attributes(self.service) + + for _, attr_value in attrs.items(): + if isinstance(attr_value, AbstractDataService): + attr_value._task_manager.start_autostart_tasks() + elif isinstance(attr_value, DataServiceList): + for i, item in enumerate(attr_value): + if isinstance(item, AbstractDataService): + item._task_manager.start_autostart_tasks() + def _make_stop_task(self, name: str) -> Callable[..., Any]: """ Factory function to create a 'stop_task' function for a running task. From 69cd86b6011f92ffe56e35b7333e6021f7d28c35 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mose=20M=C3=BCller?= Date: Wed, 25 Oct 2023 16:39:11 +0200 Subject: [PATCH 5/6] feat: adds autostart_tasks test --- tests/data_service/test_task_manager.py | 104 ++++++++++++++++++++++++ 1 file changed, 104 insertions(+) create mode 100644 tests/data_service/test_task_manager.py diff --git a/tests/data_service/test_task_manager.py b/tests/data_service/test_task_manager.py new file mode 100644 index 0000000..675d7ae --- /dev/null +++ b/tests/data_service/test_task_manager.py @@ -0,0 +1,104 @@ +import logging + +from pytest import CaptureFixture + +import pydase + +logger = logging.getLogger() + + +def test_autostart_task_callback(capsys: CaptureFixture) -> None: + class MyService(pydase.DataService): + def __init__(self) -> None: + self._autostart_tasks = { # type: ignore + "my_task": (), + "my_other_task": (), + } + super().__init__() + + async def my_task(self) -> None: + logger.info("Triggered task.") + + async def my_other_task(self) -> None: + logger.info("Triggered other task.") + + service = MyService() + service._task_manager.start_autostart_tasks() + + captured = capsys.readouterr() + expected_output = sorted( + [ + "MyService.my_task = {}", + "MyService.my_other_task = {}", + ] + ) + actual_output = sorted(captured.out.strip().split("\n")) # type: ignore + assert expected_output == actual_output + + +def test_DataService_subclass_autostart_task_callback(capsys: CaptureFixture) -> None: + class MySubService(pydase.DataService): + def __init__(self) -> None: + self._autostart_tasks = { # type: ignore + "my_task": (), + "my_other_task": (), + } + super().__init__() + + async def my_task(self) -> None: + logger.info("Triggered task.") + + async def my_other_task(self) -> None: + logger.info("Triggered other task.") + + class MyService(pydase.DataService): + sub_service = MySubService() + + service = MyService() + service._task_manager.start_autostart_tasks() + + captured = capsys.readouterr() + expected_output = sorted( + [ + "MyService.sub_service.my_task = {}", + "MyService.sub_service.my_other_task = {}", + ] + ) + actual_output = sorted(captured.out.strip().split("\n")) # type: ignore + assert expected_output == actual_output + + +def test_DataServiceList_subclass_autostart_task_callback( + capsys: CaptureFixture, +) -> None: + class MySubService(pydase.DataService): + def __init__(self) -> None: + self._autostart_tasks = { # type: ignore + "my_task": (), + "my_other_task": (), + } + super().__init__() + + async def my_task(self) -> None: + logger.info("Triggered task.") + + async def my_other_task(self) -> None: + logger.info("Triggered other task.") + + class MyService(pydase.DataService): + sub_services_list = [MySubService() for i in range(2)] + + service = MyService() + service._task_manager.start_autostart_tasks() + + captured = capsys.readouterr() + expected_output = sorted( + [ + "MyService.sub_services_list[0].my_task = {}", + "MyService.sub_services_list[0].my_other_task = {}", + "MyService.sub_services_list[1].my_task = {}", + "MyService.sub_services_list[1].my_other_task = {}", + ] + ) + actual_output = sorted(captured.out.strip().split("\n")) # type: ignore + assert expected_output == actual_output From eb46a088ee895bbaf2dd12c127d95df94ed59de1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mose=20M=C3=BCller?= Date: Wed, 25 Oct 2023 16:48:33 +0200 Subject: [PATCH 6/6] chore: refactoring method --- src/pydase/data_service/task_manager.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/pydase/data_service/task_manager.py b/src/pydase/data_service/task_manager.py index 7ac7cd7..72519d0 100644 --- a/src/pydase/data_service/task_manager.py +++ b/src/pydase/data_service/task_manager.py @@ -103,7 +103,7 @@ class TaskManager: setattr(self.service, f"start_{name}", self._make_start_task(name, method)) setattr(self.service, f"stop_{name}", self._make_stop_task(name)) - def start_autostart_tasks(self) -> None: + def _initiate_task_startup(self) -> None: if self.service._autostart_tasks is not None: for service_name, args in self.service._autostart_tasks.items(): start_method = getattr(self.service, f"start_{service_name}", None) @@ -114,6 +114,8 @@ class TaskManager: f"No start method found for service '{service_name}'" ) + def start_autostart_tasks(self) -> None: + self._initiate_task_startup() attrs = get_class_and_instance_attributes(self.service) for _, attr_value in attrs.items():