diff --git a/src/pydase/task/decorator.py b/src/pydase/task/decorator.py index b6974b2..3caaba7 100644 --- a/src/pydase/task/decorator.py +++ b/src/pydase/task/decorator.py @@ -26,15 +26,25 @@ class PerInstanceTaskDescriptor(Generic[R]): the service class. """ - def __init__( + def __init__( # noqa: PLR0913 self, func: Callable[[Any], Coroutine[None, None, R]] | Callable[[], Coroutine[None, None, R]], autostart: bool = False, + restart_on_failure: bool = False, + restart_sec: float = 1.0, + start_limit_interval_sec: float | None = None, + start_limit_burst: int = 3, + timeout_start_sec: float = 0.0, ) -> None: self.__func = func self.__autostart = autostart self.__task_instances: dict[object, Task[R]] = {} + self.__restart_on_failure = restart_on_failure + self.__restart_sec = restart_sec + self.__start_limit_interval_sec = start_limit_interval_sec + self.__start_limit_burst = start_limit_burst + self.__timeout_start_sec = timeout_start_sec def __set_name__(self, owner: type[DataService], name: str) -> None: """Stores the name of the task within the owning class. This method is called @@ -67,14 +77,28 @@ class PerInstanceTaskDescriptor(Generic[R]): if instance not in self.__task_instances: self.__task_instances[instance] = instance._initialise_new_objects( self.__task_name, - Task(self.__func.__get__(instance, owner), autostart=self.__autostart), + Task( + self.__func.__get__(instance, owner), + autostart=self.__autostart, + restart_on_failure=self.__restart_on_failure, + restart_sec=self.__restart_sec, + start_limit_interval_sec=self.__start_limit_interval_sec, + start_limit_burst=self.__start_limit_burst, + timeout_start_sec=self.__timeout_start_sec, + ), ) return self.__task_instances[instance] -def task( - *, autostart: bool = False +def task( # noqa: PLR0913 + *, + autostart: bool = False, + restart_on_failure: bool = False, + restart_sec: float = 1.0, + start_limit_interval_sec: float | None = None, + start_limit_burst: int = 3, + timeout_start_sec: float = 0.0, ) -> Callable[ [ Callable[[Any], Coroutine[None, None, R]] @@ -96,13 +120,29 @@ def task( periodically or perform asynchronous operations, such as polling data sources, updating databases, or any recurring job that should be managed within the context of a `DataService`. - time. + + The keyword arguments that can be passed to this decorator are inspired by systemd + unit services. Args: autostart: If set to True, the task will automatically start when the service is initialized. Defaults to False. - + restart_on_failure: + Configures whether the task shall be restarted when it exits with an + exception other than [`asyncio.CancelledError`][asyncio.CancelledError]. + restart_sec: + Configures the time to sleep before restarting a task. Defaults to 1.0. + start_limit_interval_sec: + Configures start rate limiting. Tasks which are started more than + `start_limit_burst` times within an `start_limit_interval_sec` time span are + not permitted to start any more. Defaults to None (disabled rate limiting). + start_limit_burst: + Configures unit start rate limiting. Tasks which are started more than + `start_limit_burst` times within an `start_limit_interval_sec` time span are + not permitted to start any more. Defaults to 3. + timeout_start_sec: + Configures the time to wait for start-up. Defaults to 0.0. Returns: A decorator that wraps an asynchronous function in a [`PerInstanceTaskDescriptor`][pydase.task.decorator.PerInstanceTaskDescriptor] @@ -140,6 +180,14 @@ def task( func: Callable[[Any], Coroutine[None, None, R]] | Callable[[], Coroutine[None, None, R]], ) -> PerInstanceTaskDescriptor[R]: - return PerInstanceTaskDescriptor(func, autostart=autostart) + return PerInstanceTaskDescriptor( + func, + autostart=autostart, + restart_on_failure=restart_on_failure, + restart_sec=restart_sec, + start_limit_interval_sec=start_limit_interval_sec, + start_limit_burst=start_limit_burst, + timeout_start_sec=timeout_start_sec, + ) return decorator diff --git a/src/pydase/task/task.py b/src/pydase/task/task.py index 6865f0f..3733dd3 100644 --- a/src/pydase/task/task.py +++ b/src/pydase/task/task.py @@ -2,6 +2,8 @@ import asyncio import inspect import logging from collections.abc import Callable, Coroutine +from datetime import datetime +from time import time from typing import ( Generic, TypeVar, @@ -28,6 +30,9 @@ class Task(pydase.data_service.data_service.DataService, Generic[R]): decorator, it is replaced by a `Task` instance that controls the execution of the original function. + The keyword arguments that can be passed to this class are inspired by systemd unit + services. + Args: func: The asynchronous function that this task wraps. It must be a coroutine @@ -35,6 +40,21 @@ class Task(pydase.data_service.data_service.DataService, Generic[R]): autostart: If set to True, the task will automatically start when the service is initialized. Defaults to False. + restart_on_failure: + Configures whether the task shall be restarted when it exits with an + exception other than [`asyncio.CancelledError`][asyncio.CancelledError]. + restart_sec: + Configures the time to sleep before restarting a task. Defaults to 1.0. + start_limit_interval_sec: + Configures start rate limiting. Tasks which are started more than + `start_limit_burst` times within an `start_limit_interval_sec` time span are + not permitted to start any more. Defaults to None (disabled rate limiting). + start_limit_burst: + Configures unit start rate limiting. Tasks which are started more than + `start_limit_burst` times within an `start_limit_interval_sec` time span are + not permitted to start any more. Defaults to 3. + timeout_start_sec: + Configures the time to wait for start-up. Defaults to 0.0. Example: ```python @@ -63,14 +83,24 @@ class Task(pydase.data_service.data_service.DataService, Generic[R]): `service.my_task.start()` and `service.my_task.stop()`, respectively. """ - def __init__( + def __init__( # noqa: PLR0913 self, func: Callable[[], Coroutine[None, None, R | None]], *, autostart: bool = False, + restart_on_failure: bool = False, + restart_sec: float = 1.0, + start_limit_interval_sec: float | None = None, + start_limit_burst: int = 3, + timeout_start_sec: float = 0.0, ) -> None: super().__init__() self._autostart = autostart + self._restart_on_failure = restart_on_failure + self._restart_sec = restart_sec + self._start_limit_interval_sec = start_limit_interval_sec + self._start_limit_burst = start_limit_burst + self._timeout_start_sec = timeout_start_sec self._func_name = func.__name__ self._func = func self._task: asyncio.Task[R | None] | None = None @@ -109,28 +139,67 @@ class Task(pydase.data_service.data_service.DataService, Generic[R]): self._task = None self._status = TaskStatus.NOT_RUNNING - exception = task.exception() - if exception is not None: + try: + task.exception() + except asyncio.CancelledError: + pass + except Exception as e: logger.exception( "Task '%s' encountered an exception: %s: %s", self._func_name, - type(exception).__name__, - exception, + type(e).__name__, + e, ) - raise exception - - self._result = task.result() + raise e + else: + self._result = task.result() async def run_task() -> R | None: if inspect.iscoroutinefunction(self._func): logger.info("Starting task %r", self._func_name) self._status = TaskStatus.RUNNING - res: Coroutine[None, None, R | None] = self._func() - try: - return await res - except asyncio.CancelledError: - logger.info("Task '%s' was cancelled", self._func_name) - return None + attempts = 0 + start_time = None + + if self._timeout_start_sec: + # Wait for timeout_start_sec seconds + await asyncio.sleep(self._timeout_start_sec) + + while True: + res: Coroutine[None, None, R | None] = self._func() + + try: + await res + except asyncio.CancelledError: + logger.info("Task '%s' was cancelled", self._func_name) + raise + except Exception as e: + if start_time is None: + start_time = time() + + attempts += 1 + logger.exception( + "Task %r encountered an exception: %r [attempt %s since %s].", + self._func.__name__, + e, + attempts, + datetime.fromtimestamp(start_time), + ) + if not self._restart_on_failure: + break + if self._start_limit_interval_sec is not None: + if (time() - start_time) > self._start_limit_interval_sec: + # reset attempts if start_limit_interval_sec is exceeded + start_time = time() + attempts = 1 + elif attempts > self._start_limit_burst: + logger.error( + "Task %r exceeded restart burst limit. Stopping.", + self._func.__name__, + ) + break + await asyncio.sleep(self._restart_sec) + return None logger.warning( "Cannot start task %r. Function has not been bound yet", self._func_name )