From 8a8ac9d297f04622ee324a261b06a082fb871a03 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mose=20M=C3=BCller?= Date: Fri, 17 Jan 2025 17:16:19 +0100 Subject: [PATCH 1/9] task: adds systemd-like keyword arguments to task decorator --- src/pydase/task/decorator.py | 62 ++++++++++++++++++++--- src/pydase/task/task.py | 97 ++++++++++++++++++++++++++++++------ 2 files changed, 138 insertions(+), 21 deletions(-) diff --git a/src/pydase/task/decorator.py b/src/pydase/task/decorator.py index b6974b2..3caaba7 100644 --- a/src/pydase/task/decorator.py +++ b/src/pydase/task/decorator.py @@ -26,15 +26,25 @@ class PerInstanceTaskDescriptor(Generic[R]): the service class. """ - def __init__( + def __init__( # noqa: PLR0913 self, func: Callable[[Any], Coroutine[None, None, R]] | Callable[[], Coroutine[None, None, R]], autostart: bool = False, + restart_on_failure: bool = False, + restart_sec: float = 1.0, + start_limit_interval_sec: float | None = None, + start_limit_burst: int = 3, + timeout_start_sec: float = 0.0, ) -> None: self.__func = func self.__autostart = autostart self.__task_instances: dict[object, Task[R]] = {} + self.__restart_on_failure = restart_on_failure + self.__restart_sec = restart_sec + self.__start_limit_interval_sec = start_limit_interval_sec + self.__start_limit_burst = start_limit_burst + self.__timeout_start_sec = timeout_start_sec def __set_name__(self, owner: type[DataService], name: str) -> None: """Stores the name of the task within the owning class. This method is called @@ -67,14 +77,28 @@ class PerInstanceTaskDescriptor(Generic[R]): if instance not in self.__task_instances: self.__task_instances[instance] = instance._initialise_new_objects( self.__task_name, - Task(self.__func.__get__(instance, owner), autostart=self.__autostart), + Task( + self.__func.__get__(instance, owner), + autostart=self.__autostart, + restart_on_failure=self.__restart_on_failure, + restart_sec=self.__restart_sec, + start_limit_interval_sec=self.__start_limit_interval_sec, + start_limit_burst=self.__start_limit_burst, + timeout_start_sec=self.__timeout_start_sec, + ), ) return self.__task_instances[instance] -def task( - *, autostart: bool = False +def task( # noqa: PLR0913 + *, + autostart: bool = False, + restart_on_failure: bool = False, + restart_sec: float = 1.0, + start_limit_interval_sec: float | None = None, + start_limit_burst: int = 3, + timeout_start_sec: float = 0.0, ) -> Callable[ [ Callable[[Any], Coroutine[None, None, R]] @@ -96,13 +120,29 @@ def task( periodically or perform asynchronous operations, such as polling data sources, updating databases, or any recurring job that should be managed within the context of a `DataService`. - time. + + The keyword arguments that can be passed to this decorator are inspired by systemd + unit services. Args: autostart: If set to True, the task will automatically start when the service is initialized. Defaults to False. - + restart_on_failure: + Configures whether the task shall be restarted when it exits with an + exception other than [`asyncio.CancelledError`][asyncio.CancelledError]. + restart_sec: + Configures the time to sleep before restarting a task. Defaults to 1.0. + start_limit_interval_sec: + Configures start rate limiting. Tasks which are started more than + `start_limit_burst` times within an `start_limit_interval_sec` time span are + not permitted to start any more. Defaults to None (disabled rate limiting). + start_limit_burst: + Configures unit start rate limiting. 
Tasks which are started more than + `start_limit_burst` times within an `start_limit_interval_sec` time span are + not permitted to start any more. Defaults to 3. + timeout_start_sec: + Configures the time to wait for start-up. Defaults to 0.0. Returns: A decorator that wraps an asynchronous function in a [`PerInstanceTaskDescriptor`][pydase.task.decorator.PerInstanceTaskDescriptor] @@ -140,6 +180,14 @@ def task( func: Callable[[Any], Coroutine[None, None, R]] | Callable[[], Coroutine[None, None, R]], ) -> PerInstanceTaskDescriptor[R]: - return PerInstanceTaskDescriptor(func, autostart=autostart) + return PerInstanceTaskDescriptor( + func, + autostart=autostart, + restart_on_failure=restart_on_failure, + restart_sec=restart_sec, + start_limit_interval_sec=start_limit_interval_sec, + start_limit_burst=start_limit_burst, + timeout_start_sec=timeout_start_sec, + ) return decorator diff --git a/src/pydase/task/task.py b/src/pydase/task/task.py index 6865f0f..3733dd3 100644 --- a/src/pydase/task/task.py +++ b/src/pydase/task/task.py @@ -2,6 +2,8 @@ import asyncio import inspect import logging from collections.abc import Callable, Coroutine +from datetime import datetime +from time import time from typing import ( Generic, TypeVar, @@ -28,6 +30,9 @@ class Task(pydase.data_service.data_service.DataService, Generic[R]): decorator, it is replaced by a `Task` instance that controls the execution of the original function. + The keyword arguments that can be passed to this class are inspired by systemd unit + services. + Args: func: The asynchronous function that this task wraps. It must be a coroutine @@ -35,6 +40,21 @@ class Task(pydase.data_service.data_service.DataService, Generic[R]): autostart: If set to True, the task will automatically start when the service is initialized. Defaults to False. + restart_on_failure: + Configures whether the task shall be restarted when it exits with an + exception other than [`asyncio.CancelledError`][asyncio.CancelledError]. + restart_sec: + Configures the time to sleep before restarting a task. Defaults to 1.0. + start_limit_interval_sec: + Configures start rate limiting. Tasks which are started more than + `start_limit_burst` times within an `start_limit_interval_sec` time span are + not permitted to start any more. Defaults to None (disabled rate limiting). + start_limit_burst: + Configures unit start rate limiting. Tasks which are started more than + `start_limit_burst` times within an `start_limit_interval_sec` time span are + not permitted to start any more. Defaults to 3. + timeout_start_sec: + Configures the time to wait for start-up. Defaults to 0.0. Example: ```python @@ -63,14 +83,24 @@ class Task(pydase.data_service.data_service.DataService, Generic[R]): `service.my_task.start()` and `service.my_task.stop()`, respectively. 
""" - def __init__( + def __init__( # noqa: PLR0913 self, func: Callable[[], Coroutine[None, None, R | None]], *, autostart: bool = False, + restart_on_failure: bool = False, + restart_sec: float = 1.0, + start_limit_interval_sec: float | None = None, + start_limit_burst: int = 3, + timeout_start_sec: float = 0.0, ) -> None: super().__init__() self._autostart = autostart + self._restart_on_failure = restart_on_failure + self._restart_sec = restart_sec + self._start_limit_interval_sec = start_limit_interval_sec + self._start_limit_burst = start_limit_burst + self._timeout_start_sec = timeout_start_sec self._func_name = func.__name__ self._func = func self._task: asyncio.Task[R | None] | None = None @@ -109,28 +139,67 @@ class Task(pydase.data_service.data_service.DataService, Generic[R]): self._task = None self._status = TaskStatus.NOT_RUNNING - exception = task.exception() - if exception is not None: + try: + task.exception() + except asyncio.CancelledError: + pass + except Exception as e: logger.exception( "Task '%s' encountered an exception: %s: %s", self._func_name, - type(exception).__name__, - exception, + type(e).__name__, + e, ) - raise exception - - self._result = task.result() + raise e + else: + self._result = task.result() async def run_task() -> R | None: if inspect.iscoroutinefunction(self._func): logger.info("Starting task %r", self._func_name) self._status = TaskStatus.RUNNING - res: Coroutine[None, None, R | None] = self._func() - try: - return await res - except asyncio.CancelledError: - logger.info("Task '%s' was cancelled", self._func_name) - return None + attempts = 0 + start_time = None + + if self._timeout_start_sec: + # Wait for timeout_start_sec seconds + await asyncio.sleep(self._timeout_start_sec) + + while True: + res: Coroutine[None, None, R | None] = self._func() + + try: + await res + except asyncio.CancelledError: + logger.info("Task '%s' was cancelled", self._func_name) + raise + except Exception as e: + if start_time is None: + start_time = time() + + attempts += 1 + logger.exception( + "Task %r encountered an exception: %r [attempt %s since %s].", + self._func.__name__, + e, + attempts, + datetime.fromtimestamp(start_time), + ) + if not self._restart_on_failure: + break + if self._start_limit_interval_sec is not None: + if (time() - start_time) > self._start_limit_interval_sec: + # reset attempts if start_limit_interval_sec is exceeded + start_time = time() + attempts = 1 + elif attempts > self._start_limit_burst: + logger.error( + "Task %r exceeded restart burst limit. Stopping.", + self._func.__name__, + ) + break + await asyncio.sleep(self._restart_sec) + return None logger.warning( "Cannot start task %r. 
Function has not been bound yet", self._func_name ) From 1b35dba64f4e5ce5f525f3c95af8c7950b52d50e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mose=20M=C3=BCller?= Date: Fri, 17 Jan 2025 17:32:31 +0100 Subject: [PATCH 2/9] task: adds exit_on_failure option --- src/pydase/task/decorator.py | 8 ++++++++ src/pydase/task/task.py | 28 ++++++++++++++++++++-------- 2 files changed, 28 insertions(+), 8 deletions(-) diff --git a/src/pydase/task/decorator.py b/src/pydase/task/decorator.py index 3caaba7..7772465 100644 --- a/src/pydase/task/decorator.py +++ b/src/pydase/task/decorator.py @@ -36,6 +36,7 @@ class PerInstanceTaskDescriptor(Generic[R]): start_limit_interval_sec: float | None = None, start_limit_burst: int = 3, timeout_start_sec: float = 0.0, + exit_on_failure: bool = False, ) -> None: self.__func = func self.__autostart = autostart @@ -45,6 +46,7 @@ class PerInstanceTaskDescriptor(Generic[R]): self.__start_limit_interval_sec = start_limit_interval_sec self.__start_limit_burst = start_limit_burst self.__timeout_start_sec = timeout_start_sec + self.__exit_on_failure = exit_on_failure def __set_name__(self, owner: type[DataService], name: str) -> None: """Stores the name of the task within the owning class. This method is called @@ -85,6 +87,7 @@ class PerInstanceTaskDescriptor(Generic[R]): start_limit_interval_sec=self.__start_limit_interval_sec, start_limit_burst=self.__start_limit_burst, timeout_start_sec=self.__timeout_start_sec, + exit_on_failure=self.__exit_on_failure, ), ) @@ -99,6 +102,7 @@ def task( # noqa: PLR0913 start_limit_interval_sec: float | None = None, start_limit_burst: int = 3, timeout_start_sec: float = 0.0, + exit_on_failure: bool = False, ) -> Callable[ [ Callable[[Any], Coroutine[None, None, R]] @@ -143,6 +147,9 @@ def task( # noqa: PLR0913 not permitted to start any more. Defaults to 3. timeout_start_sec: Configures the time to wait for start-up. Defaults to 0.0. + exit_on_failure: + If True, exit the service if the task fails and restart_on_failure is False + or burst limits are exceeded. Returns: A decorator that wraps an asynchronous function in a [`PerInstanceTaskDescriptor`][pydase.task.decorator.PerInstanceTaskDescriptor] @@ -188,6 +195,7 @@ def task( # noqa: PLR0913 start_limit_interval_sec=start_limit_interval_sec, start_limit_burst=start_limit_burst, timeout_start_sec=timeout_start_sec, + exit_on_failure=exit_on_failure, ) return decorator diff --git a/src/pydase/task/task.py b/src/pydase/task/task.py index 3733dd3..55ed8f8 100644 --- a/src/pydase/task/task.py +++ b/src/pydase/task/task.py @@ -1,6 +1,8 @@ import asyncio import inspect import logging +import os +import signal from collections.abc import Callable, Coroutine from datetime import datetime from time import time @@ -55,6 +57,9 @@ class Task(pydase.data_service.data_service.DataService, Generic[R]): not permitted to start any more. Defaults to 3. timeout_start_sec: Configures the time to wait for start-up. Defaults to 0.0. + exit_on_failure: + If True, exit the service if the task fails and restart_on_failure is False + or burst limits are exceeded. 
Example: ```python @@ -93,6 +98,7 @@ class Task(pydase.data_service.data_service.DataService, Generic[R]): start_limit_interval_sec: float | None = None, start_limit_burst: int = 3, timeout_start_sec: float = 0.0, + exit_on_failure: bool = False, ) -> None: super().__init__() self._autostart = autostart @@ -101,6 +107,7 @@ class Task(pydase.data_service.data_service.DataService, Generic[R]): self._start_limit_interval_sec = start_limit_interval_sec self._start_limit_burst = start_limit_burst self._timeout_start_sec = timeout_start_sec + self._exit_on_failure = exit_on_failure self._func_name = func.__name__ self._func = func self._task: asyncio.Task[R | None] | None = None @@ -139,18 +146,19 @@ class Task(pydase.data_service.data_service.DataService, Generic[R]): self._task = None self._status = TaskStatus.NOT_RUNNING + exception = None try: - task.exception() + exception = task.exception() except asyncio.CancelledError: - pass - except Exception as e: - logger.exception( - "Task '%s' encountered an exception: %s: %s", + return + + if exception is not None: + logger.error( + "Task '%s' encountered an exception: %r", self._func_name, - type(e).__name__, - e, + exception, ) - raise e + os.kill(os.getpid(), signal.SIGTERM) else: self._result = task.result() @@ -186,6 +194,8 @@ class Task(pydase.data_service.data_service.DataService, Generic[R]): datetime.fromtimestamp(start_time), ) if not self._restart_on_failure: + if self._exit_on_failure: + raise e break if self._start_limit_interval_sec is not None: if (time() - start_time) > self._start_limit_interval_sec: @@ -197,6 +207,8 @@ class Task(pydase.data_service.data_service.DataService, Generic[R]): "Task %r exceeded restart burst limit. Stopping.", self._func.__name__, ) + if self._exit_on_failure: + raise e break await asyncio.sleep(self._restart_sec) return None From db559e8ada6335790063a1dea39a78365ad9ae85 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mose=20M=C3=BCller?= Date: Fri, 17 Jan 2025 17:37:39 +0100 Subject: [PATCH 3/9] removes defaults in Task and PerInstanceTaskDescriptor Removes overhead of keeping defaults the same everywhere. 
--- src/pydase/task/decorator.py | 14 +++++++------- src/pydase/task/task.py | 14 +++++++------- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/src/pydase/task/decorator.py b/src/pydase/task/decorator.py index 7772465..1e3037e 100644 --- a/src/pydase/task/decorator.py +++ b/src/pydase/task/decorator.py @@ -30,13 +30,13 @@ class PerInstanceTaskDescriptor(Generic[R]): self, func: Callable[[Any], Coroutine[None, None, R]] | Callable[[], Coroutine[None, None, R]], - autostart: bool = False, - restart_on_failure: bool = False, - restart_sec: float = 1.0, - start_limit_interval_sec: float | None = None, - start_limit_burst: int = 3, - timeout_start_sec: float = 0.0, - exit_on_failure: bool = False, + autostart: bool, + restart_on_failure: bool, + restart_sec: float, + start_limit_interval_sec: float | None, + start_limit_burst: int, + timeout_start_sec: float, + exit_on_failure: bool, ) -> None: self.__func = func self.__autostart = autostart diff --git a/src/pydase/task/task.py b/src/pydase/task/task.py index 55ed8f8..ccc5804 100644 --- a/src/pydase/task/task.py +++ b/src/pydase/task/task.py @@ -92,13 +92,13 @@ class Task(pydase.data_service.data_service.DataService, Generic[R]): self, func: Callable[[], Coroutine[None, None, R | None]], *, - autostart: bool = False, - restart_on_failure: bool = False, - restart_sec: float = 1.0, - start_limit_interval_sec: float | None = None, - start_limit_burst: int = 3, - timeout_start_sec: float = 0.0, - exit_on_failure: bool = False, + autostart: bool, + restart_on_failure: bool, + restart_sec: float, + start_limit_interval_sec: float | None, + start_limit_burst: int, + timeout_start_sec: float, + exit_on_failure: bool, ) -> None: super().__init__() self._autostart = autostart From 303de8231899aad8f0735ecf68a253f4e37f529a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mose=20M=C3=BCller?= Date: Fri, 17 Jan 2025 17:37:52 +0100 Subject: [PATCH 4/9] changes restart_on_failure default to True --- src/pydase/task/decorator.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/pydase/task/decorator.py b/src/pydase/task/decorator.py index 1e3037e..542a18e 100644 --- a/src/pydase/task/decorator.py +++ b/src/pydase/task/decorator.py @@ -97,7 +97,7 @@ class PerInstanceTaskDescriptor(Generic[R]): def task( # noqa: PLR0913 *, autostart: bool = False, - restart_on_failure: bool = False, + restart_on_failure: bool = True, restart_sec: float = 1.0, start_limit_interval_sec: float | None = None, start_limit_burst: int = 3, From e25511768d907bb7037ae101b9c68b6095b2f4e0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mose=20M=C3=BCller?= Date: Fri, 17 Jan 2025 19:59:51 +0100 Subject: [PATCH 5/9] task: removes check if function is bound (not used) --- src/pydase/task/task.py | 86 +++++++++++++++++++---------------------- 1 file changed, 40 insertions(+), 46 deletions(-) diff --git a/src/pydase/task/task.py b/src/pydase/task/task.py index ccc5804..0c4b157 100644 --- a/src/pydase/task/task.py +++ b/src/pydase/task/task.py @@ -1,5 +1,4 @@ import asyncio -import inspect import logging import os import signal @@ -163,58 +162,53 @@ class Task(pydase.data_service.data_service.DataService, Generic[R]): self._result = task.result() async def run_task() -> R | None: - if inspect.iscoroutinefunction(self._func): - logger.info("Starting task %r", self._func_name) - self._status = TaskStatus.RUNNING - attempts = 0 - start_time = None + logger.info("Starting task %r", self._func_name) + self._status = TaskStatus.RUNNING + attempts = 0 + start_time = None - if 
self._timeout_start_sec: - # Wait for timeout_start_sec seconds - await asyncio.sleep(self._timeout_start_sec) + if self._timeout_start_sec: + # Wait for timeout_start_sec seconds + await asyncio.sleep(self._timeout_start_sec) - while True: - res: Coroutine[None, None, R | None] = self._func() + while True: + res: Coroutine[None, None, R | None] = self._func() - try: - await res - except asyncio.CancelledError: - logger.info("Task '%s' was cancelled", self._func_name) - raise - except Exception as e: - if start_time is None: + try: + await res + except asyncio.CancelledError: + logger.info("Task '%s' was cancelled", self._func_name) + raise + except Exception as e: + if start_time is None: + start_time = time() + + attempts += 1 + logger.exception( + "Task %r encountered an exception: %r [attempt %s since %s].", + self._func.__name__, + e, + attempts, + datetime.fromtimestamp(start_time), + ) + if not self._restart_on_failure: + if self._exit_on_failure: + raise e + break + if self._start_limit_interval_sec is not None: + if (time() - start_time) > self._start_limit_interval_sec: + # reset attempts if start_limit_interval_sec is exceeded start_time = time() - - attempts += 1 - logger.exception( - "Task %r encountered an exception: %r [attempt %s since %s].", - self._func.__name__, - e, - attempts, - datetime.fromtimestamp(start_time), - ) - if not self._restart_on_failure: + attempts = 1 + elif attempts > self._start_limit_burst: + logger.error( + "Task %r exceeded restart burst limit. Stopping.", + self._func.__name__, + ) if self._exit_on_failure: raise e break - if self._start_limit_interval_sec is not None: - if (time() - start_time) > self._start_limit_interval_sec: - # reset attempts if start_limit_interval_sec is exceeded - start_time = time() - attempts = 1 - elif attempts > self._start_limit_burst: - logger.error( - "Task %r exceeded restart burst limit. Stopping.", - self._func.__name__, - ) - if self._exit_on_failure: - raise e - break - await asyncio.sleep(self._restart_sec) - return None - logger.warning( - "Cannot start task %r. 
Function has not been bound yet", self._func_name - ) + await asyncio.sleep(self._restart_sec) return None logger.info("Creating task %r", self._func_name) From d0b0803407227448576c6c86ededb8738247bca4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mose=20M=C3=BCller?= Date: Fri, 17 Jan 2025 20:00:04 +0100 Subject: [PATCH 6/9] adds tests for new task options --- tests/task/test_task.py | 166 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 166 insertions(+) diff --git a/tests/task/test_task.py b/tests/task/test_task.py index 9c83d42..230d95c 100644 --- a/tests/task/test_task.py +++ b/tests/task/test_task.py @@ -289,3 +289,169 @@ async def test_manual_start_with_multiple_service_instances( await asyncio.sleep(0.01) assert "Task 'my_task' was cancelled" in caplog.text + + +@pytest.mark.asyncio(scope="function") +async def test_restart_on_failure(caplog: LogCaptureFixture) -> None: + class MyService(pydase.DataService): + @task(restart_on_failure=True, restart_sec=0.1) + async def my_task(self) -> None: + logger.info("Triggered task.") + raise Exception("Task failure") + + service_instance = MyService() + state_manager = StateManager(service_instance) + DataServiceObserver(state_manager) + service_instance.my_task.start() + + await asyncio.sleep(0.01) + assert "Task 'my_task' encountered an exception" in caplog.text + caplog.clear() + await asyncio.sleep(0.1) + assert service_instance.my_task.status == TaskStatus.RUNNING + assert "Task 'my_task' encountered an exception" in caplog.text + assert "Triggered task." in caplog.text + + +@pytest.mark.asyncio(scope="function") +async def test_restart_sec(caplog: LogCaptureFixture) -> None: + class MyService(pydase.DataService): + @task(restart_on_failure=True, restart_sec=0.1) + async def my_task(self) -> None: + logger.info("Triggered task.") + raise Exception("Task failure") + + service_instance = MyService() + state_manager = StateManager(service_instance) + DataServiceObserver(state_manager) + service_instance.my_task.start() + + await asyncio.sleep(0.001) + assert "Triggered task." in caplog.text + caplog.clear() + await asyncio.sleep(0.05) + assert "Triggered task." not in caplog.text + await asyncio.sleep(0.05) + assert "Triggered task." 
in caplog.text # Ensures the task restarted after 0.2s + + +@pytest.mark.asyncio(scope="function") +async def test_exceeding_start_limit_interval_sec_and_burst( + caplog: LogCaptureFixture, +) -> None: + class MyService(pydase.DataService): + @task( + restart_on_failure=True, + restart_sec=0.0, + start_limit_interval_sec=1.0, + start_limit_burst=2, + ) + async def my_task(self) -> None: + raise Exception("Task failure") + + service_instance = MyService() + state_manager = StateManager(service_instance) + DataServiceObserver(state_manager) + service_instance.my_task.start() + + await asyncio.sleep(0.1) + assert "Task 'my_task' exceeded restart burst limit" in caplog.text + assert service_instance.my_task.status == TaskStatus.NOT_RUNNING + + +@pytest.mark.asyncio(scope="function") +async def test_non_exceeding_start_limit_interval_sec_and_burst( + caplog: LogCaptureFixture, +) -> None: + class MyService(pydase.DataService): + @task( + restart_on_failure=True, + restart_sec=0.1, + start_limit_interval_sec=0.1, + start_limit_burst=2, + ) + async def my_task(self) -> None: + raise Exception("Task failure") + + service_instance = MyService() + state_manager = StateManager(service_instance) + DataServiceObserver(state_manager) + service_instance.my_task.start() + + await asyncio.sleep(0.5) + assert "Task 'my_task' exceeded restart burst limit" not in caplog.text + assert service_instance.my_task.status == TaskStatus.RUNNING + + +@pytest.mark.asyncio(scope="function") +async def test_timeout_start_sec(caplog: LogCaptureFixture) -> None: + class MyService(pydase.DataService): + @task(timeout_start_sec=0.2) + async def my_task(self) -> None: + logger.info("Starting task.") + await asyncio.sleep(1) + + service_instance = MyService() + state_manager = StateManager(service_instance) + DataServiceObserver(state_manager) + service_instance.my_task.start() + + await asyncio.sleep(0.1) + assert "Starting task." not in caplog.text + await asyncio.sleep(0.2) + assert "Starting task." 
in caplog.text + + +@pytest.mark.asyncio(scope="function") +async def test_exit_on_failure( + monkeypatch: pytest.MonkeyPatch, caplog: LogCaptureFixture +) -> None: + class MyService(pydase.DataService): + @task(restart_on_failure=False, exit_on_failure=True) + async def my_task(self) -> None: + logger.info("Triggered task.") + raise Exception("Critical failure") + + def mock_os_kill(pid: int, signal: int) -> None: + logger.critical("os.kill called with signal=%s and pid=%s", signal, pid) + + monkeypatch.setattr("os.kill", mock_os_kill) + + service_instance = MyService() + state_manager = StateManager(service_instance) + DataServiceObserver(state_manager) + service_instance.my_task.start() + + await asyncio.sleep(0.1) + assert "os.kill called with signal=15 and pid=" in caplog.text + assert "Task 'my_task' encountered an exception" in caplog.text + + +@pytest.mark.asyncio(scope="function") +async def test_exit_on_failure_exceeding_rate_limit( + monkeypatch: pytest.MonkeyPatch, caplog: LogCaptureFixture +) -> None: + class MyService(pydase.DataService): + @task( + restart_on_failure=True, + restart_sec=0.0, + start_limit_interval_sec=0.1, + start_limit_burst=2, + exit_on_failure=True, + ) + async def my_task(self) -> None: + raise Exception("Critical failure") + + def mock_os_kill(pid: int, signal: int) -> None: + logger.critical("os.kill called with signal=%s and pid=%s", signal, pid) + + monkeypatch.setattr("os.kill", mock_os_kill) + + service_instance = MyService() + state_manager = StateManager(service_instance) + DataServiceObserver(state_manager) + service_instance.my_task.start() + + await asyncio.sleep(0.5) + assert "os.kill called with signal=15 and pid=" in caplog.text + assert "Task 'my_task' encountered an exception" in caplog.text From c66b90c4e59cc19dff4dfd232f69e8ffbd6a3f3b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mose=20M=C3=BCller?= Date: Fri, 17 Jan 2025 20:21:00 +0100 Subject: [PATCH 7/9] chore: refactoring Task --- src/pydase/task/task.py | 128 ++++++++++++++++++++++++---------------- 1 file changed, 77 insertions(+), 51 deletions(-) diff --git a/src/pydase/task/task.py b/src/pydase/task/task.py index 0c4b157..86cd3e2 100644 --- a/src/pydase/task/task.py +++ b/src/pydase/task/task.py @@ -161,60 +161,86 @@ class Task(pydase.data_service.data_service.DataService, Generic[R]): else: self._result = task.result() - async def run_task() -> R | None: - logger.info("Starting task %r", self._func_name) - self._status = TaskStatus.RUNNING - attempts = 0 - start_time = None - - if self._timeout_start_sec: - # Wait for timeout_start_sec seconds - await asyncio.sleep(self._timeout_start_sec) - - while True: - res: Coroutine[None, None, R | None] = self._func() - - try: - await res - except asyncio.CancelledError: - logger.info("Task '%s' was cancelled", self._func_name) - raise - except Exception as e: - if start_time is None: - start_time = time() - - attempts += 1 - logger.exception( - "Task %r encountered an exception: %r [attempt %s since %s].", - self._func.__name__, - e, - attempts, - datetime.fromtimestamp(start_time), - ) - if not self._restart_on_failure: - if self._exit_on_failure: - raise e - break - if self._start_limit_interval_sec is not None: - if (time() - start_time) > self._start_limit_interval_sec: - # reset attempts if start_limit_interval_sec is exceeded - start_time = time() - attempts = 1 - elif attempts > self._start_limit_burst: - logger.error( - "Task %r exceeded restart burst limit. 
Stopping.", - self._func.__name__, - ) - if self._exit_on_failure: - raise e - break - await asyncio.sleep(self._restart_sec) - return None - logger.info("Creating task %r", self._func_name) - self._task = self._loop.create_task(run_task()) + self._task = self._loop.create_task(self.__running_task_loop()) self._task.add_done_callback(task_done_callback) + async def __running_task_loop(self) -> R | None: + logger.info("Starting task %r", self._func_name) + self._status = TaskStatus.RUNNING + attempts = 0 + start_time_of_start_limit_interval = None + + await self._handle_startup_timeout() + + while True: + try: + await self._func() + except asyncio.CancelledError: + logger.info("Task '%s' was cancelled", self._func_name) + raise + except Exception as e: + attempts, start_time_of_start_limit_interval = ( + self._handle_task_exception( + e, attempts, start_time_of_start_limit_interval + ) + ) + if not self._should_restart_task( + attempts, start_time_of_start_limit_interval + ): + if self._exit_on_failure: + raise e + break + await asyncio.sleep(self._restart_sec) + return None + + async def _handle_startup_timeout(self) -> None: + """Wait for the configured startup timeout.""" + if self._timeout_start_sec: + await asyncio.sleep(self._timeout_start_sec) + + def _handle_task_exception( + self, + exception: Exception, + attempts: int, + start_time_of_start_limit_interval: float | None, + ) -> tuple[int, float]: + """Handle an exception raised during task execution.""" + if start_time_of_start_limit_interval is None: + start_time_of_start_limit_interval = time() + + attempts += 1 + logger.exception( + "Task %r encountered an exception: %r [attempt %s since %s].", + self._func.__name__, + exception, + attempts, + datetime.fromtimestamp(start_time_of_start_limit_interval), + ) + return attempts, start_time_of_start_limit_interval + + def _should_restart_task( + self, attempts: int, start_time_of_start_limit_interval: float + ) -> bool: + """Determine if the task should be restarted.""" + if not self._restart_on_failure: + return False + + if self._start_limit_interval_sec is not None: + if ( + time() - start_time_of_start_limit_interval + ) > self._start_limit_interval_sec: + # Reset attempts if interval is exceeded + start_time_of_start_limit_interval = time() + attempts = 1 + elif attempts > self._start_limit_burst: + logger.error( + "Task %r exceeded restart burst limit. 
Stopping.", + self._func.__name__, + ) + return False + return True + def stop(self) -> None: """Stops the running asynchronous task by cancelling it.""" From f83bc0073b0662274f7a520585fce4dbec48bbb6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mose=20M=C3=BCller?= Date: Fri, 17 Jan 2025 20:23:45 +0100 Subject: [PATCH 8/9] fix: tests were expecting linux-type signals --- tests/task/test_task.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/task/test_task.py b/tests/task/test_task.py index 230d95c..8dc5627 100644 --- a/tests/task/test_task.py +++ b/tests/task/test_task.py @@ -423,7 +423,7 @@ async def test_exit_on_failure( service_instance.my_task.start() await asyncio.sleep(0.1) - assert "os.kill called with signal=15 and pid=" in caplog.text + assert "os.kill called with signal=" in caplog.text assert "Task 'my_task' encountered an exception" in caplog.text @@ -453,5 +453,5 @@ async def test_exit_on_failure_exceeding_rate_limit( service_instance.my_task.start() await asyncio.sleep(0.5) - assert "os.kill called with signal=15 and pid=" in caplog.text + assert "os.kill called with signal=" in caplog.text assert "Task 'my_task' encountered an exception" in caplog.text From b9a91e5ee2480b38c36604facc48250240dbb3ca Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mose=20M=C3=BCller?= Date: Fri, 17 Jan 2025 20:32:27 +0100 Subject: [PATCH 9/9] removes timeout_start_sec I misinterpreted this option as the time to wait before starting the task. This is apparently not what it stands for in systemd.service --- src/pydase/task/decorator.py | 7 ------- src/pydase/task/task.py | 11 ----------- tests/task/test_task.py | 19 ------------------- 3 files changed, 37 deletions(-) diff --git a/src/pydase/task/decorator.py b/src/pydase/task/decorator.py index 542a18e..0149bb5 100644 --- a/src/pydase/task/decorator.py +++ b/src/pydase/task/decorator.py @@ -35,7 +35,6 @@ class PerInstanceTaskDescriptor(Generic[R]): restart_sec: float, start_limit_interval_sec: float | None, start_limit_burst: int, - timeout_start_sec: float, exit_on_failure: bool, ) -> None: self.__func = func @@ -45,7 +44,6 @@ class PerInstanceTaskDescriptor(Generic[R]): self.__restart_sec = restart_sec self.__start_limit_interval_sec = start_limit_interval_sec self.__start_limit_burst = start_limit_burst - self.__timeout_start_sec = timeout_start_sec self.__exit_on_failure = exit_on_failure def __set_name__(self, owner: type[DataService], name: str) -> None: @@ -86,7 +84,6 @@ class PerInstanceTaskDescriptor(Generic[R]): restart_sec=self.__restart_sec, start_limit_interval_sec=self.__start_limit_interval_sec, start_limit_burst=self.__start_limit_burst, - timeout_start_sec=self.__timeout_start_sec, exit_on_failure=self.__exit_on_failure, ), ) @@ -101,7 +98,6 @@ def task( # noqa: PLR0913 restart_sec: float = 1.0, start_limit_interval_sec: float | None = None, start_limit_burst: int = 3, - timeout_start_sec: float = 0.0, exit_on_failure: bool = False, ) -> Callable[ [ @@ -145,8 +141,6 @@ def task( # noqa: PLR0913 Configures unit start rate limiting. Tasks which are started more than `start_limit_burst` times within an `start_limit_interval_sec` time span are not permitted to start any more. Defaults to 3. - timeout_start_sec: - Configures the time to wait for start-up. Defaults to 0.0. exit_on_failure: If True, exit the service if the task fails and restart_on_failure is False or burst limits are exceeded. 
@@ -194,7 +188,6 @@ def task( # noqa: PLR0913 restart_sec=restart_sec, start_limit_interval_sec=start_limit_interval_sec, start_limit_burst=start_limit_burst, - timeout_start_sec=timeout_start_sec, exit_on_failure=exit_on_failure, ) diff --git a/src/pydase/task/task.py b/src/pydase/task/task.py index 86cd3e2..321a907 100644 --- a/src/pydase/task/task.py +++ b/src/pydase/task/task.py @@ -54,8 +54,6 @@ class Task(pydase.data_service.data_service.DataService, Generic[R]): Configures unit start rate limiting. Tasks which are started more than `start_limit_burst` times within an `start_limit_interval_sec` time span are not permitted to start any more. Defaults to 3. - timeout_start_sec: - Configures the time to wait for start-up. Defaults to 0.0. exit_on_failure: If True, exit the service if the task fails and restart_on_failure is False or burst limits are exceeded. @@ -96,7 +94,6 @@ class Task(pydase.data_service.data_service.DataService, Generic[R]): restart_sec: float, start_limit_interval_sec: float | None, start_limit_burst: int, - timeout_start_sec: float, exit_on_failure: bool, ) -> None: super().__init__() @@ -105,7 +102,6 @@ class Task(pydase.data_service.data_service.DataService, Generic[R]): self._restart_sec = restart_sec self._start_limit_interval_sec = start_limit_interval_sec self._start_limit_burst = start_limit_burst - self._timeout_start_sec = timeout_start_sec self._exit_on_failure = exit_on_failure self._func_name = func.__name__ self._func = func @@ -171,8 +167,6 @@ class Task(pydase.data_service.data_service.DataService, Generic[R]): attempts = 0 start_time_of_start_limit_interval = None - await self._handle_startup_timeout() - while True: try: await self._func() @@ -194,11 +188,6 @@ class Task(pydase.data_service.data_service.DataService, Generic[R]): await asyncio.sleep(self._restart_sec) return None - async def _handle_startup_timeout(self) -> None: - """Wait for the configured startup timeout.""" - if self._timeout_start_sec: - await asyncio.sleep(self._timeout_start_sec) - def _handle_task_exception( self, exception: Exception, diff --git a/tests/task/test_task.py b/tests/task/test_task.py index 8dc5627..ea267ff 100644 --- a/tests/task/test_task.py +++ b/tests/task/test_task.py @@ -383,25 +383,6 @@ async def test_non_exceeding_start_limit_interval_sec_and_burst( assert service_instance.my_task.status == TaskStatus.RUNNING -@pytest.mark.asyncio(scope="function") -async def test_timeout_start_sec(caplog: LogCaptureFixture) -> None: - class MyService(pydase.DataService): - @task(timeout_start_sec=0.2) - async def my_task(self) -> None: - logger.info("Starting task.") - await asyncio.sleep(1) - - service_instance = MyService() - state_manager = StateManager(service_instance) - DataServiceObserver(state_manager) - service_instance.my_task.start() - - await asyncio.sleep(0.1) - assert "Starting task." not in caplog.text - await asyncio.sleep(0.2) - assert "Starting task." in caplog.text - - @pytest.mark.asyncio(scope="function") async def test_exit_on_failure( monkeypatch: pytest.MonkeyPatch, caplog: LogCaptureFixture