task: adds exit_on_failure option

This commit is contained in:
Mose Müller 2025-01-17 17:32:31 +01:00
parent 8a8ac9d297
commit 1b35dba64f
2 changed files with 28 additions and 8 deletions

View File

@ -36,6 +36,7 @@ class PerInstanceTaskDescriptor(Generic[R]):
start_limit_interval_sec: float | None = None,
start_limit_burst: int = 3,
timeout_start_sec: float = 0.0,
exit_on_failure: bool = False,
) -> None:
self.__func = func
self.__autostart = autostart
@ -45,6 +46,7 @@ class PerInstanceTaskDescriptor(Generic[R]):
self.__start_limit_interval_sec = start_limit_interval_sec
self.__start_limit_burst = start_limit_burst
self.__timeout_start_sec = timeout_start_sec
self.__exit_on_failure = exit_on_failure
def __set_name__(self, owner: type[DataService], name: str) -> None:
"""Stores the name of the task within the owning class. This method is called
@ -85,6 +87,7 @@ class PerInstanceTaskDescriptor(Generic[R]):
start_limit_interval_sec=self.__start_limit_interval_sec,
start_limit_burst=self.__start_limit_burst,
timeout_start_sec=self.__timeout_start_sec,
exit_on_failure=self.__exit_on_failure,
),
)
@ -99,6 +102,7 @@ def task( # noqa: PLR0913
start_limit_interval_sec: float | None = None,
start_limit_burst: int = 3,
timeout_start_sec: float = 0.0,
exit_on_failure: bool = False,
) -> Callable[
[
Callable[[Any], Coroutine[None, None, R]]
@ -143,6 +147,9 @@ def task( # noqa: PLR0913
not permitted to start any more. Defaults to 3.
timeout_start_sec:
Configures the time to wait for start-up. Defaults to 0.0.
exit_on_failure:
If True, exit the service if the task fails and restart_on_failure is False
or burst limits are exceeded.
Returns:
A decorator that wraps an asynchronous function in a
[`PerInstanceTaskDescriptor`][pydase.task.decorator.PerInstanceTaskDescriptor]
@ -188,6 +195,7 @@ def task( # noqa: PLR0913
start_limit_interval_sec=start_limit_interval_sec,
start_limit_burst=start_limit_burst,
timeout_start_sec=timeout_start_sec,
exit_on_failure=exit_on_failure,
)
return decorator

View File

@ -1,6 +1,8 @@
import asyncio
import inspect
import logging
import os
import signal
from collections.abc import Callable, Coroutine
from datetime import datetime
from time import time
@ -55,6 +57,9 @@ class Task(pydase.data_service.data_service.DataService, Generic[R]):
not permitted to start any more. Defaults to 3.
timeout_start_sec:
Configures the time to wait for start-up. Defaults to 0.0.
exit_on_failure:
If True, exit the service if the task fails and restart_on_failure is False
or burst limits are exceeded.
Example:
```python
@ -93,6 +98,7 @@ class Task(pydase.data_service.data_service.DataService, Generic[R]):
start_limit_interval_sec: float | None = None,
start_limit_burst: int = 3,
timeout_start_sec: float = 0.0,
exit_on_failure: bool = False,
) -> None:
super().__init__()
self._autostart = autostart
@ -101,6 +107,7 @@ class Task(pydase.data_service.data_service.DataService, Generic[R]):
self._start_limit_interval_sec = start_limit_interval_sec
self._start_limit_burst = start_limit_burst
self._timeout_start_sec = timeout_start_sec
self._exit_on_failure = exit_on_failure
self._func_name = func.__name__
self._func = func
self._task: asyncio.Task[R | None] | None = None
@ -139,18 +146,19 @@ class Task(pydase.data_service.data_service.DataService, Generic[R]):
self._task = None
self._status = TaskStatus.NOT_RUNNING
exception = None
try:
task.exception()
exception = task.exception()
except asyncio.CancelledError:
pass
except Exception as e:
logger.exception(
"Task '%s' encountered an exception: %s: %s",
return
if exception is not None:
logger.error(
"Task '%s' encountered an exception: %r",
self._func_name,
type(e).__name__,
e,
exception,
)
raise e
os.kill(os.getpid(), signal.SIGTERM)
else:
self._result = task.result()
@ -186,6 +194,8 @@ class Task(pydase.data_service.data_service.DataService, Generic[R]):
datetime.fromtimestamp(start_time),
)
if not self._restart_on_failure:
if self._exit_on_failure:
raise e
break
if self._start_limit_interval_sec is not None:
if (time() - start_time) > self._start_limit_interval_sec:
@ -197,6 +207,8 @@ class Task(pydase.data_service.data_service.DataService, Generic[R]):
"Task %r exceeded restart burst limit. Stopping.",
self._func.__name__,
)
if self._exit_on_failure:
raise e
break
await asyncio.sleep(self._restart_sec)
return None