task: adds systemd-like keyword arguments to task decorator

This commit is contained in:
Mose Müller 2025-01-17 17:16:19 +01:00
parent 40a8863ecd
commit 8a8ac9d297
2 changed files with 138 additions and 21 deletions

View File

@ -26,15 +26,25 @@ class PerInstanceTaskDescriptor(Generic[R]):
the service class. the service class.
""" """
def __init__( def __init__( # noqa: PLR0913
self, self,
func: Callable[[Any], Coroutine[None, None, R]] func: Callable[[Any], Coroutine[None, None, R]]
| Callable[[], Coroutine[None, None, R]], | Callable[[], Coroutine[None, None, R]],
autostart: bool = False, autostart: bool = False,
restart_on_failure: bool = False,
restart_sec: float = 1.0,
start_limit_interval_sec: float | None = None,
start_limit_burst: int = 3,
timeout_start_sec: float = 0.0,
) -> None: ) -> None:
self.__func = func self.__func = func
self.__autostart = autostart self.__autostart = autostart
self.__task_instances: dict[object, Task[R]] = {} self.__task_instances: dict[object, Task[R]] = {}
self.__restart_on_failure = restart_on_failure
self.__restart_sec = restart_sec
self.__start_limit_interval_sec = start_limit_interval_sec
self.__start_limit_burst = start_limit_burst
self.__timeout_start_sec = timeout_start_sec
def __set_name__(self, owner: type[DataService], name: str) -> None: def __set_name__(self, owner: type[DataService], name: str) -> None:
"""Stores the name of the task within the owning class. This method is called """Stores the name of the task within the owning class. This method is called
@ -67,14 +77,28 @@ class PerInstanceTaskDescriptor(Generic[R]):
if instance not in self.__task_instances: if instance not in self.__task_instances:
self.__task_instances[instance] = instance._initialise_new_objects( self.__task_instances[instance] = instance._initialise_new_objects(
self.__task_name, self.__task_name,
Task(self.__func.__get__(instance, owner), autostart=self.__autostart), Task(
self.__func.__get__(instance, owner),
autostart=self.__autostart,
restart_on_failure=self.__restart_on_failure,
restart_sec=self.__restart_sec,
start_limit_interval_sec=self.__start_limit_interval_sec,
start_limit_burst=self.__start_limit_burst,
timeout_start_sec=self.__timeout_start_sec,
),
) )
return self.__task_instances[instance] return self.__task_instances[instance]
def task( def task( # noqa: PLR0913
*, autostart: bool = False *,
autostart: bool = False,
restart_on_failure: bool = False,
restart_sec: float = 1.0,
start_limit_interval_sec: float | None = None,
start_limit_burst: int = 3,
timeout_start_sec: float = 0.0,
) -> Callable[ ) -> Callable[
[ [
Callable[[Any], Coroutine[None, None, R]] Callable[[Any], Coroutine[None, None, R]]
@ -96,13 +120,29 @@ def task(
periodically or perform asynchronous operations, such as polling data sources, periodically or perform asynchronous operations, such as polling data sources,
updating databases, or any recurring job that should be managed within the context updating databases, or any recurring job that should be managed within the context
of a `DataService`. of a `DataService`.
time.
The keyword arguments that can be passed to this decorator are inspired by systemd
unit services.
Args: Args:
autostart: autostart:
If set to True, the task will automatically start when the service is If set to True, the task will automatically start when the service is
initialized. Defaults to False. initialized. Defaults to False.
restart_on_failure:
Configures whether the task shall be restarted when it exits with an
exception other than [`asyncio.CancelledError`][asyncio.CancelledError].
restart_sec:
Configures the time to sleep before restarting a task. Defaults to 1.0.
start_limit_interval_sec:
Configures start rate limiting. Tasks which are started more than
`start_limit_burst` times within an `start_limit_interval_sec` time span are
not permitted to start any more. Defaults to None (disabled rate limiting).
start_limit_burst:
Configures unit start rate limiting. Tasks which are started more than
`start_limit_burst` times within an `start_limit_interval_sec` time span are
not permitted to start any more. Defaults to 3.
timeout_start_sec:
Configures the time to wait for start-up. Defaults to 0.0.
Returns: Returns:
A decorator that wraps an asynchronous function in a A decorator that wraps an asynchronous function in a
[`PerInstanceTaskDescriptor`][pydase.task.decorator.PerInstanceTaskDescriptor] [`PerInstanceTaskDescriptor`][pydase.task.decorator.PerInstanceTaskDescriptor]
@ -140,6 +180,14 @@ def task(
func: Callable[[Any], Coroutine[None, None, R]] func: Callable[[Any], Coroutine[None, None, R]]
| Callable[[], Coroutine[None, None, R]], | Callable[[], Coroutine[None, None, R]],
) -> PerInstanceTaskDescriptor[R]: ) -> PerInstanceTaskDescriptor[R]:
return PerInstanceTaskDescriptor(func, autostart=autostart) return PerInstanceTaskDescriptor(
func,
autostart=autostart,
restart_on_failure=restart_on_failure,
restart_sec=restart_sec,
start_limit_interval_sec=start_limit_interval_sec,
start_limit_burst=start_limit_burst,
timeout_start_sec=timeout_start_sec,
)
return decorator return decorator

View File

@ -2,6 +2,8 @@ import asyncio
import inspect import inspect
import logging import logging
from collections.abc import Callable, Coroutine from collections.abc import Callable, Coroutine
from datetime import datetime
from time import time
from typing import ( from typing import (
Generic, Generic,
TypeVar, TypeVar,
@ -28,6 +30,9 @@ class Task(pydase.data_service.data_service.DataService, Generic[R]):
decorator, it is replaced by a `Task` instance that controls the execution of the decorator, it is replaced by a `Task` instance that controls the execution of the
original function. original function.
The keyword arguments that can be passed to this class are inspired by systemd unit
services.
Args: Args:
func: func:
The asynchronous function that this task wraps. It must be a coroutine The asynchronous function that this task wraps. It must be a coroutine
@ -35,6 +40,21 @@ class Task(pydase.data_service.data_service.DataService, Generic[R]):
autostart: autostart:
If set to True, the task will automatically start when the service is If set to True, the task will automatically start when the service is
initialized. Defaults to False. initialized. Defaults to False.
restart_on_failure:
Configures whether the task shall be restarted when it exits with an
exception other than [`asyncio.CancelledError`][asyncio.CancelledError].
restart_sec:
Configures the time to sleep before restarting a task. Defaults to 1.0.
start_limit_interval_sec:
Configures start rate limiting. Tasks which are started more than
`start_limit_burst` times within an `start_limit_interval_sec` time span are
not permitted to start any more. Defaults to None (disabled rate limiting).
start_limit_burst:
Configures unit start rate limiting. Tasks which are started more than
`start_limit_burst` times within an `start_limit_interval_sec` time span are
not permitted to start any more. Defaults to 3.
timeout_start_sec:
Configures the time to wait for start-up. Defaults to 0.0.
Example: Example:
```python ```python
@ -63,14 +83,24 @@ class Task(pydase.data_service.data_service.DataService, Generic[R]):
`service.my_task.start()` and `service.my_task.stop()`, respectively. `service.my_task.start()` and `service.my_task.stop()`, respectively.
""" """
def __init__( def __init__( # noqa: PLR0913
self, self,
func: Callable[[], Coroutine[None, None, R | None]], func: Callable[[], Coroutine[None, None, R | None]],
*, *,
autostart: bool = False, autostart: bool = False,
restart_on_failure: bool = False,
restart_sec: float = 1.0,
start_limit_interval_sec: float | None = None,
start_limit_burst: int = 3,
timeout_start_sec: float = 0.0,
) -> None: ) -> None:
super().__init__() super().__init__()
self._autostart = autostart self._autostart = autostart
self._restart_on_failure = restart_on_failure
self._restart_sec = restart_sec
self._start_limit_interval_sec = start_limit_interval_sec
self._start_limit_burst = start_limit_burst
self._timeout_start_sec = timeout_start_sec
self._func_name = func.__name__ self._func_name = func.__name__
self._func = func self._func = func
self._task: asyncio.Task[R | None] | None = None self._task: asyncio.Task[R | None] | None = None
@ -109,28 +139,67 @@ class Task(pydase.data_service.data_service.DataService, Generic[R]):
self._task = None self._task = None
self._status = TaskStatus.NOT_RUNNING self._status = TaskStatus.NOT_RUNNING
exception = task.exception() try:
if exception is not None: task.exception()
except asyncio.CancelledError:
pass
except Exception as e:
logger.exception( logger.exception(
"Task '%s' encountered an exception: %s: %s", "Task '%s' encountered an exception: %s: %s",
self._func_name, self._func_name,
type(exception).__name__, type(e).__name__,
exception, e,
) )
raise exception raise e
else:
self._result = task.result() self._result = task.result()
async def run_task() -> R | None: async def run_task() -> R | None:
if inspect.iscoroutinefunction(self._func): if inspect.iscoroutinefunction(self._func):
logger.info("Starting task %r", self._func_name) logger.info("Starting task %r", self._func_name)
self._status = TaskStatus.RUNNING self._status = TaskStatus.RUNNING
res: Coroutine[None, None, R | None] = self._func() attempts = 0
try: start_time = None
return await res
except asyncio.CancelledError: if self._timeout_start_sec:
logger.info("Task '%s' was cancelled", self._func_name) # Wait for timeout_start_sec seconds
return None await asyncio.sleep(self._timeout_start_sec)
while True:
res: Coroutine[None, None, R | None] = self._func()
try:
await res
except asyncio.CancelledError:
logger.info("Task '%s' was cancelled", self._func_name)
raise
except Exception as e:
if start_time is None:
start_time = time()
attempts += 1
logger.exception(
"Task %r encountered an exception: %r [attempt %s since %s].",
self._func.__name__,
e,
attempts,
datetime.fromtimestamp(start_time),
)
if not self._restart_on_failure:
break
if self._start_limit_interval_sec is not None:
if (time() - start_time) > self._start_limit_interval_sec:
# reset attempts if start_limit_interval_sec is exceeded
start_time = time()
attempts = 1
elif attempts > self._start_limit_burst:
logger.error(
"Task %r exceeded restart burst limit. Stopping.",
self._func.__name__,
)
break
await asyncio.sleep(self._restart_sec)
return None
logger.warning( logger.warning(
"Cannot start task %r. Function has not been bound yet", self._func_name "Cannot start task %r. Function has not been bound yet", self._func_name
) )