fix: creates property functions to avoid closure and late binding issue

When having multiple tasks, they all pointed to the one defined last.
This commit is contained in:
Mose Müller 2023-10-25 11:57:50 +02:00
parent 8564df5adc
commit 0504a50a08

View File

@ -95,90 +95,9 @@ class TaskManager:
for name, method in inspect.getmembers(
self.service, predicate=inspect.iscoroutinefunction
):
@wraps(method)
def start_task(*args: Any, **kwargs: Any) -> None:
def task_done_callback(task: asyncio.Task, name: str) -> None:
"""Handles tasks that have finished.
Removes a task from the tasks dictionary, calls the defined
callbacks, and logs and re-raises exceptions."""
# removing the finished task from the tasks i
self.tasks.pop(name, None)
# emit the notification that the task was stopped
for callback in self.task_status_change_callbacks:
callback(name, None)
exception = task.exception()
if exception is not None:
# Handle the exception, or you can re-raise it.
logger.error(
f"Task '{name}' encountered an exception: "
f"{type(exception).__name__}: {exception}"
)
raise exception
async def task(*args: Any, **kwargs: Any) -> None:
try:
await method(*args, **kwargs)
except asyncio.CancelledError:
logger.info(f"Task {name} was cancelled")
if not self.tasks.get(name):
# Get the signature of the coroutine method to start
sig = inspect.signature(method)
# Create a list of the parameter names from the method signature.
parameter_names = list(sig.parameters.keys())
# Extend the list of positional arguments with None values to match
# the length of the parameter names list. This is done to ensure
# that zip can pair each parameter name with a corresponding value.
args_padded = list(args) + [None] * (
len(parameter_names) - len(args)
)
# Create a dictionary of keyword arguments by pairing the parameter
# names with the values in 'args_padded'. Then merge this dictionary
# with the 'kwargs' dictionary. If a parameter is specified in both
# 'args_padded' and 'kwargs', the value from 'kwargs' is used.
kwargs_updated = {
**dict(zip(parameter_names, args_padded)),
**kwargs,
}
# creating the task and adding the task_done_callback which checks
# if an exception has occured during the task execution
task_object = self._loop.create_task(task(*args, **kwargs))
task_object.add_done_callback(
lambda task: task_done_callback(task, name)
)
# Store the task and its arguments in the '__tasks' dictionary. The
# key is the name of the method, and the value is a dictionary
# containing the task object and the updated keyword arguments.
self.tasks[name] = {
"task": task_object,
"kwargs": kwargs_updated,
}
# emit the notification that the task was started
for callback in self.task_status_change_callbacks:
callback(name, kwargs_updated)
else:
logger.error(f"Task `{name}` is already running!")
def stop_task() -> None:
# cancel the task
task = self.tasks.get(name, None)
if task is not None:
self._loop.call_soon_threadsafe(task["task"].cancel)
# create start and stop methods for each coroutine
setattr(self.service, f"start_{name}", start_task)
setattr(self.service, f"stop_{name}", stop_task)
setattr(self.service, f"start_{name}", self._make_start_task(name, method))
setattr(self.service, f"stop_{name}", self._make_stop_task(name))
def start_autostart_tasks(self) -> None:
if self.service._autostart_tasks is not None:
@ -190,3 +109,111 @@ class TaskManager:
logger.warning(
f"No start method found for service '{service_name}'"
)
def _make_stop_task(self, name: str) -> Callable[..., Any]:
"""
Factory function to create a 'stop_task' function for a running task.
The generated function cancels the associated asyncio task using 'name' for
identification, ensuring proper cleanup. Avoids closure and late binding issues.
Args:
name (str): The name of the coroutine task, used for its identification.
"""
def stop_task() -> None:
# cancel the task
task = self.tasks.get(name, None)
if task is not None:
self._loop.call_soon_threadsafe(task["task"].cancel)
return stop_task
def _make_start_task( # noqa
self, name: str, method: Callable[..., Any]
) -> Callable[..., Any]:
"""
Factory function to create a 'start_task' function for a coroutine.
The generated function starts the coroutine as an asyncio task, handling
registration and monitoring.
It uses 'name' and 'method' to avoid the closure and late binding issue.
Args:
name (str): The name of the coroutine, used for task management.
method (callable): The coroutine to be turned into an asyncio task.
"""
@wraps(method)
def start_task(*args: Any, **kwargs: Any) -> None:
def task_done_callback(task: asyncio.Task, name: str) -> None:
"""Handles tasks that have finished.
Removes a task from the tasks dictionary, calls the defined
callbacks, and logs and re-raises exceptions."""
# removing the finished task from the tasks i
self.tasks.pop(name, None)
# emit the notification that the task was stopped
for callback in self.task_status_change_callbacks:
callback(name, None)
exception = task.exception()
if exception is not None:
# Handle the exception, or you can re-raise it.
logger.error(
f"Task '{name}' encountered an exception: "
f"{type(exception).__name__}: {exception}"
)
raise exception
async def task(*args: Any, **kwargs: Any) -> None:
try:
await method(*args, **kwargs)
except asyncio.CancelledError:
logger.info(f"Task {name} was cancelled")
if not self.tasks.get(name):
# Get the signature of the coroutine method to start
sig = inspect.signature(method)
# Create a list of the parameter names from the method signature.
parameter_names = list(sig.parameters.keys())
# Extend the list of positional arguments with None values to match
# the length of the parameter names list. This is done to ensure
# that zip can pair each parameter name with a corresponding value.
args_padded = list(args) + [None] * (len(parameter_names) - len(args))
# Create a dictionary of keyword arguments by pairing the parameter
# names with the values in 'args_padded'. Then merge this dictionary
# with the 'kwargs' dictionary. If a parameter is specified in both
# 'args_padded' and 'kwargs', the value from 'kwargs' is used.
kwargs_updated = {
**dict(zip(parameter_names, args_padded)),
**kwargs,
}
# creating the task and adding the task_done_callback which checks
# if an exception has occured during the task execution
task_object = self._loop.create_task(task(*args, **kwargs))
task_object.add_done_callback(
lambda task: task_done_callback(task, name)
)
# Store the task and its arguments in the '__tasks' dictionary. The
# key is the name of the method, and the value is a dictionary
# containing the task object and the updated keyword arguments.
self.tasks[name] = {
"task": task_object,
"kwargs": kwargs_updated,
}
# emit the notification that the task was started
for callback in self.task_status_change_callbacks:
callback(name, kwargs_updated)
else:
logger.error(f"Task `{name}` is already running!")
return start_task