Merge pull request #203 from tiqi-group/feat/add_more_task_config_options

chore: adds task docs, renames restart_on_failure to restart_on_exception
This commit is contained in:
Mose Müller 2025-01-18 07:23:16 +01:00 committed by GitHub
commit de1483bdc5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 68 additions and 24 deletions

View File

@ -1,8 +1,8 @@
# Understanding Tasks # Understanding Tasks
In `pydase`, a task is defined as an asynchronous function without arguments that is decorated with the `@task` decorator and contained in a class that inherits from `pydase.DataService`. These tasks usually contain a while loop and are designed to carry out periodic functions. For example, a task might be used to periodically read sensor data, update a database, or perform any other recurring job. In `pydase`, a task is defined as an asynchronous function without arguments that is decorated with the [`@task`][pydase.task.decorator.task] decorator and contained in a class that inherits from [`pydase.DataService`][pydase.DataService]. These tasks usually contain a while loop and are designed to carry out periodic functions. For example, a task might be used to periodically read sensor data, update a database, or perform any other recurring job.
`pydase` allows you to control task execution via both the frontend and Python clients and can automatically start tasks upon initialization of the service. By using the `@task` decorator with the `autostart=True` argument in your service class, `pydase` will automatically start these tasks when the server is started. Here's an example: `pydase` allows you to control task execution via both the frontend and Python clients and can automatically start tasks upon initialization of the service. By using the [`@task`][pydase.task.decorator.task] decorator with the `autostart=True` argument in your service class, `pydase` will automatically start these tasks when the server is started. Here's an example:
```python ```python
import pydase import pydase
@ -35,4 +35,48 @@ if __name__ == "__main__":
In this example, `read_sensor_data` is a task that continuously reads data from a sensor. By decorating it with `@task(autostart=True)`, it will automatically start running when `pydase.Server(service).run()` is executed. In this example, `read_sensor_data` is a task that continuously reads data from a sensor. By decorating it with `@task(autostart=True)`, it will automatically start running when `pydase.Server(service).run()` is executed.
The `@task` decorator replaces the function with a task object that has `start()` and `stop()` methods. This means you can control the task execution directly using these methods. For instance, you can manually start or stop the task by calling `service.read_sensor_data.start()` and `service.read_sensor_data.stop()`, respectively. ## Task Lifecycle Control
The [`@task`][pydase.task.decorator.task] decorator replaces the function with a task object that has `start()` and `stop()` methods. This means you can control the task execution directly using these methods. For instance, you can manually start or stop the task by calling `service.read_sensor_data.start()` and `service.read_sensor_data.stop()`, respectively.
## Advanced Task Options
The [`@task`][pydase.task.decorator.task] decorator supports several options inspired by systemd unit services, allowing fine-grained control over task behavior:
- **`autostart`**: Automatically starts the task when the service initializes. Defaults to `False`.
- **`restart_on_exception`**: Configures whether the task should restart if it exits due to an exception (other than `asyncio.CancelledError`). Defaults to `True`.
- **`restart_sec`**: Specifies the delay (in seconds) before restarting a failed task. Defaults to `1.0`.
- **`start_limit_interval_sec`**: Configures a time window (in seconds) for rate limiting task restarts. If the task restarts more than `start_limit_burst` times within this interval, it will no longer restart. Defaults to `None` (disabled).
- **`start_limit_burst`**: Defines the maximum number of restarts allowed within the interval specified by `start_limit_interval_sec`. Defaults to `3`.
- **`exit_on_failure`**: If set to `True`, the service will exit if the task fails and either `restart_on_exception` is `False` or the start rate limiting is exceeded. Defaults to `False`.
### Example with Advanced Options
Here is an example showcasing advanced task options:
```python
import pydase
from pydase.task.decorator import task
class AdvancedTaskService(pydase.DataService):
def __init__(self):
super().__init__()
@task(
autostart=True,
restart_on_exception=True,
restart_sec=2.0,
start_limit_interval_sec=10.0,
start_limit_burst=5,
exit_on_failure=True,
)
async def critical_task(self):
while True:
raise Exception("Critical failure")
if __name__ == "__main__":
service = AdvancedTaskService()
pydase.Server(service=service).run()
```

View File

@ -31,7 +31,7 @@ class PerInstanceTaskDescriptor(Generic[R]):
func: Callable[[Any], Coroutine[None, None, R]] func: Callable[[Any], Coroutine[None, None, R]]
| Callable[[], Coroutine[None, None, R]], | Callable[[], Coroutine[None, None, R]],
autostart: bool, autostart: bool,
restart_on_failure: bool, restart_on_exception: bool,
restart_sec: float, restart_sec: float,
start_limit_interval_sec: float | None, start_limit_interval_sec: float | None,
start_limit_burst: int, start_limit_burst: int,
@ -40,7 +40,7 @@ class PerInstanceTaskDescriptor(Generic[R]):
self.__func = func self.__func = func
self.__autostart = autostart self.__autostart = autostart
self.__task_instances: dict[object, Task[R]] = {} self.__task_instances: dict[object, Task[R]] = {}
self.__restart_on_failure = restart_on_failure self.__restart_on_exception = restart_on_exception
self.__restart_sec = restart_sec self.__restart_sec = restart_sec
self.__start_limit_interval_sec = start_limit_interval_sec self.__start_limit_interval_sec = start_limit_interval_sec
self.__start_limit_burst = start_limit_burst self.__start_limit_burst = start_limit_burst
@ -80,7 +80,7 @@ class PerInstanceTaskDescriptor(Generic[R]):
Task( Task(
self.__func.__get__(instance, owner), self.__func.__get__(instance, owner),
autostart=self.__autostart, autostart=self.__autostart,
restart_on_failure=self.__restart_on_failure, restart_on_exception=self.__restart_on_exception,
restart_sec=self.__restart_sec, restart_sec=self.__restart_sec,
start_limit_interval_sec=self.__start_limit_interval_sec, start_limit_interval_sec=self.__start_limit_interval_sec,
start_limit_burst=self.__start_limit_burst, start_limit_burst=self.__start_limit_burst,
@ -94,7 +94,7 @@ class PerInstanceTaskDescriptor(Generic[R]):
def task( # noqa: PLR0913 def task( # noqa: PLR0913
*, *,
autostart: bool = False, autostart: bool = False,
restart_on_failure: bool = True, restart_on_exception: bool = True,
restart_sec: float = 1.0, restart_sec: float = 1.0,
start_limit_interval_sec: float | None = None, start_limit_interval_sec: float | None = None,
start_limit_burst: int = 3, start_limit_burst: int = 3,
@ -128,7 +128,7 @@ def task( # noqa: PLR0913
autostart: autostart:
If set to True, the task will automatically start when the service is If set to True, the task will automatically start when the service is
initialized. Defaults to False. initialized. Defaults to False.
restart_on_failure: restart_on_exception:
Configures whether the task shall be restarted when it exits with an Configures whether the task shall be restarted when it exits with an
exception other than [`asyncio.CancelledError`][asyncio.CancelledError]. exception other than [`asyncio.CancelledError`][asyncio.CancelledError].
restart_sec: restart_sec:
@ -142,8 +142,8 @@ def task( # noqa: PLR0913
`start_limit_burst` times within an `start_limit_interval_sec` time span are `start_limit_burst` times within an `start_limit_interval_sec` time span are
not permitted to start any more. Defaults to 3. not permitted to start any more. Defaults to 3.
exit_on_failure: exit_on_failure:
If True, exit the service if the task fails and restart_on_failure is False If True, exit the service if the task fails and restart_on_exception is
or burst limits are exceeded. False or burst limits are exceeded.
Returns: Returns:
A decorator that wraps an asynchronous function in a A decorator that wraps an asynchronous function in a
[`PerInstanceTaskDescriptor`][pydase.task.decorator.PerInstanceTaskDescriptor] [`PerInstanceTaskDescriptor`][pydase.task.decorator.PerInstanceTaskDescriptor]
@ -184,7 +184,7 @@ def task( # noqa: PLR0913
return PerInstanceTaskDescriptor( return PerInstanceTaskDescriptor(
func, func,
autostart=autostart, autostart=autostart,
restart_on_failure=restart_on_failure, restart_on_exception=restart_on_exception,
restart_sec=restart_sec, restart_sec=restart_sec,
start_limit_interval_sec=start_limit_interval_sec, start_limit_interval_sec=start_limit_interval_sec,
start_limit_burst=start_limit_burst, start_limit_burst=start_limit_burst,

View File

@ -41,7 +41,7 @@ class Task(pydase.data_service.data_service.DataService, Generic[R]):
autostart: autostart:
If set to True, the task will automatically start when the service is If set to True, the task will automatically start when the service is
initialized. Defaults to False. initialized. Defaults to False.
restart_on_failure: restart_on_exception:
Configures whether the task shall be restarted when it exits with an Configures whether the task shall be restarted when it exits with an
exception other than [`asyncio.CancelledError`][asyncio.CancelledError]. exception other than [`asyncio.CancelledError`][asyncio.CancelledError].
restart_sec: restart_sec:
@ -55,8 +55,8 @@ class Task(pydase.data_service.data_service.DataService, Generic[R]):
`start_limit_burst` times within an `start_limit_interval_sec` time span are `start_limit_burst` times within an `start_limit_interval_sec` time span are
not permitted to start any more. Defaults to 3. not permitted to start any more. Defaults to 3.
exit_on_failure: exit_on_failure:
If True, exit the service if the task fails and restart_on_failure is False If True, exit the service if the task fails and restart_on_exception is
or burst limits are exceeded. False or burst limits are exceeded.
Example: Example:
```python ```python
@ -90,7 +90,7 @@ class Task(pydase.data_service.data_service.DataService, Generic[R]):
func: Callable[[], Coroutine[None, None, R | None]], func: Callable[[], Coroutine[None, None, R | None]],
*, *,
autostart: bool, autostart: bool,
restart_on_failure: bool, restart_on_exception: bool,
restart_sec: float, restart_sec: float,
start_limit_interval_sec: float | None, start_limit_interval_sec: float | None,
start_limit_burst: int, start_limit_burst: int,
@ -98,7 +98,7 @@ class Task(pydase.data_service.data_service.DataService, Generic[R]):
) -> None: ) -> None:
super().__init__() super().__init__()
self._autostart = autostart self._autostart = autostart
self._restart_on_failure = restart_on_failure self._restart_on_exception = restart_on_exception
self._restart_sec = restart_sec self._restart_sec = restart_sec
self._start_limit_interval_sec = start_limit_interval_sec self._start_limit_interval_sec = start_limit_interval_sec
self._start_limit_burst = start_limit_burst self._start_limit_burst = start_limit_burst
@ -212,7 +212,7 @@ class Task(pydase.data_service.data_service.DataService, Generic[R]):
self, attempts: int, start_time_of_start_limit_interval: float self, attempts: int, start_time_of_start_limit_interval: float
) -> bool: ) -> bool:
"""Determine if the task should be restarted.""" """Determine if the task should be restarted."""
if not self._restart_on_failure: if not self._restart_on_exception:
return False return False
if self._start_limit_interval_sec is not None: if self._start_limit_interval_sec is not None:

View File

@ -292,9 +292,9 @@ async def test_manual_start_with_multiple_service_instances(
@pytest.mark.asyncio(scope="function") @pytest.mark.asyncio(scope="function")
async def test_restart_on_failure(caplog: LogCaptureFixture) -> None: async def test_restart_on_exception(caplog: LogCaptureFixture) -> None:
class MyService(pydase.DataService): class MyService(pydase.DataService):
@task(restart_on_failure=True, restart_sec=0.1) @task(restart_on_exception=True, restart_sec=0.1)
async def my_task(self) -> None: async def my_task(self) -> None:
logger.info("Triggered task.") logger.info("Triggered task.")
raise Exception("Task failure") raise Exception("Task failure")
@ -316,7 +316,7 @@ async def test_restart_on_failure(caplog: LogCaptureFixture) -> None:
@pytest.mark.asyncio(scope="function") @pytest.mark.asyncio(scope="function")
async def test_restart_sec(caplog: LogCaptureFixture) -> None: async def test_restart_sec(caplog: LogCaptureFixture) -> None:
class MyService(pydase.DataService): class MyService(pydase.DataService):
@task(restart_on_failure=True, restart_sec=0.1) @task(restart_on_exception=True, restart_sec=0.1)
async def my_task(self) -> None: async def my_task(self) -> None:
logger.info("Triggered task.") logger.info("Triggered task.")
raise Exception("Task failure") raise Exception("Task failure")
@ -341,7 +341,7 @@ async def test_exceeding_start_limit_interval_sec_and_burst(
) -> None: ) -> None:
class MyService(pydase.DataService): class MyService(pydase.DataService):
@task( @task(
restart_on_failure=True, restart_on_exception=True,
restart_sec=0.0, restart_sec=0.0,
start_limit_interval_sec=1.0, start_limit_interval_sec=1.0,
start_limit_burst=2, start_limit_burst=2,
@ -365,7 +365,7 @@ async def test_non_exceeding_start_limit_interval_sec_and_burst(
) -> None: ) -> None:
class MyService(pydase.DataService): class MyService(pydase.DataService):
@task( @task(
restart_on_failure=True, restart_on_exception=True,
restart_sec=0.1, restart_sec=0.1,
start_limit_interval_sec=0.1, start_limit_interval_sec=0.1,
start_limit_burst=2, start_limit_burst=2,
@ -388,7 +388,7 @@ async def test_exit_on_failure(
monkeypatch: pytest.MonkeyPatch, caplog: LogCaptureFixture monkeypatch: pytest.MonkeyPatch, caplog: LogCaptureFixture
) -> None: ) -> None:
class MyService(pydase.DataService): class MyService(pydase.DataService):
@task(restart_on_failure=False, exit_on_failure=True) @task(restart_on_exception=False, exit_on_failure=True)
async def my_task(self) -> None: async def my_task(self) -> None:
logger.info("Triggered task.") logger.info("Triggered task.")
raise Exception("Critical failure") raise Exception("Critical failure")
@ -414,7 +414,7 @@ async def test_exit_on_failure_exceeding_rate_limit(
) -> None: ) -> None:
class MyService(pydase.DataService): class MyService(pydase.DataService):
@task( @task(
restart_on_failure=True, restart_on_exception=True,
restart_sec=0.0, restart_sec=0.0,
start_limit_interval_sec=0.1, start_limit_interval_sec=0.1,
start_limit_burst=2, start_limit_burst=2,