adds task_done_callback to each task

The task_done_callback function handles exceptions and emitting
a notification when the task has finished.
This commit is contained in:
Mose Müller 2023-08-18 16:25:13 +02:00
parent e86fd1ffbe
commit 76fb674fcd

View File

@ -97,6 +97,28 @@ class TaskManager:
@wraps(method) @wraps(method)
def start_task(*args: Any, **kwargs: Any) -> None: def start_task(*args: Any, **kwargs: Any) -> None:
def task_done_callback(task: asyncio.Task, name: str) -> None:
"""Handles tasks that have finished.
Removes a task from the tasks dictionary, calls the defined
callbacks, and logs and re-raises exceptions."""
# removing the finished task from the tasks i
self.tasks.pop(name, None)
# emit the notification that the task was stopped
for callback in self.task_status_change_callbacks:
callback(name, None)
exception = task.exception()
if exception is not None:
# Handle the exception, or you can re-raise it.
logger.error(
f"Task '{name}' encountered an exception: "
f"{type(exception).__name__}: {exception}"
)
raise exception
async def task(*args: Any, **kwargs: Any) -> None: async def task(*args: Any, **kwargs: Any) -> None:
try: try:
await method(*args, **kwargs) await method(*args, **kwargs)
@ -126,11 +148,18 @@ class TaskManager:
**kwargs, **kwargs,
} }
# creating the task and adding the task_done_callback which checks
# if an exception has occured during the task execution
task_object = self._loop.create_task(task(*args, **kwargs))
task_object.add_done_callback(
lambda task: task_done_callback(task, name)
)
# Store the task and its arguments in the '__tasks' dictionary. The # Store the task and its arguments in the '__tasks' dictionary. The
# key is the name of the method, and the value is a dictionary # key is the name of the method, and the value is a dictionary
# containing the task object and the updated keyword arguments. # containing the task object and the updated keyword arguments.
self.tasks[name] = { self.tasks[name] = {
"task": self._loop.create_task(task(*args, **kwargs)), "task": task_object,
"kwargs": kwargs_updated, "kwargs": kwargs_updated,
} }
@ -142,14 +171,10 @@ class TaskManager:
def stop_task() -> None: def stop_task() -> None:
# cancel the task # cancel the task
task = self.tasks.pop(name, None) task = self.tasks.get(name, None)
if task is not None: if task is not None:
self._loop.call_soon_threadsafe(task["task"].cancel) self._loop.call_soon_threadsafe(task["task"].cancel)
# emit the notification that the task was stopped
for callback in self.task_status_change_callbacks:
callback(name, None)
# create start and stop methods for each coroutine # create start and stop methods for each coroutine
setattr(self.service, f"start_{name}", start_task) setattr(self.service, f"start_{name}", start_task)
setattr(self.service, f"stop_{name}", stop_task) setattr(self.service, f"stop_{name}", stop_task)