From 76fb674fcdab85c813ce7c039d88ff2fc0b22992 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mose=20M=C3=BCller?= Date: Fri, 18 Aug 2023 16:25:13 +0200 Subject: [PATCH] adds task_done_callback to each task The task_done_callback function handles exceptions and emitting a notification when the task has finished. --- src/pydase/data_service/task_manager.py | 37 +++++++++++++++++++++---- 1 file changed, 31 insertions(+), 6 deletions(-) diff --git a/src/pydase/data_service/task_manager.py b/src/pydase/data_service/task_manager.py index c0a4b6c..682f067 100644 --- a/src/pydase/data_service/task_manager.py +++ b/src/pydase/data_service/task_manager.py @@ -97,6 +97,28 @@ class TaskManager: @wraps(method) def start_task(*args: Any, **kwargs: Any) -> None: + def task_done_callback(task: asyncio.Task, name: str) -> None: + """Handles tasks that have finished. + + Removes a task from the tasks dictionary, calls the defined + callbacks, and logs and re-raises exceptions.""" + + # removing the finished task from the tasks i + self.tasks.pop(name, None) + + # emit the notification that the task was stopped + for callback in self.task_status_change_callbacks: + callback(name, None) + + exception = task.exception() + if exception is not None: + # Handle the exception, or you can re-raise it. + logger.error( + f"Task '{name}' encountered an exception: " + f"{type(exception).__name__}: {exception}" + ) + raise exception + async def task(*args: Any, **kwargs: Any) -> None: try: await method(*args, **kwargs) @@ -126,11 +148,18 @@ class TaskManager: **kwargs, } + # creating the task and adding the task_done_callback which checks + # if an exception has occured during the task execution + task_object = self._loop.create_task(task(*args, **kwargs)) + task_object.add_done_callback( + lambda task: task_done_callback(task, name) + ) + # Store the task and its arguments in the '__tasks' dictionary. The # key is the name of the method, and the value is a dictionary # containing the task object and the updated keyword arguments. self.tasks[name] = { - "task": self._loop.create_task(task(*args, **kwargs)), + "task": task_object, "kwargs": kwargs_updated, } @@ -142,14 +171,10 @@ class TaskManager: def stop_task() -> None: # cancel the task - task = self.tasks.pop(name, None) + task = self.tasks.get(name, None) if task is not None: self._loop.call_soon_threadsafe(task["task"].cancel) - # emit the notification that the task was stopped - for callback in self.task_status_change_callbacks: - callback(name, None) - # create start and stop methods for each coroutine setattr(self.service, f"start_{name}", start_task) setattr(self.service, f"stop_{name}", stop_task)