From c66b90c4e59cc19dff4dfd232f69e8ffbd6a3f3b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mose=20M=C3=BCller?= Date: Fri, 17 Jan 2025 20:21:00 +0100 Subject: [PATCH] chore: refactoring Task --- src/pydase/task/task.py | 128 ++++++++++++++++++++++++---------------- 1 file changed, 77 insertions(+), 51 deletions(-) diff --git a/src/pydase/task/task.py b/src/pydase/task/task.py index 0c4b157..86cd3e2 100644 --- a/src/pydase/task/task.py +++ b/src/pydase/task/task.py @@ -161,60 +161,86 @@ class Task(pydase.data_service.data_service.DataService, Generic[R]): else: self._result = task.result() - async def run_task() -> R | None: - logger.info("Starting task %r", self._func_name) - self._status = TaskStatus.RUNNING - attempts = 0 - start_time = None - - if self._timeout_start_sec: - # Wait for timeout_start_sec seconds - await asyncio.sleep(self._timeout_start_sec) - - while True: - res: Coroutine[None, None, R | None] = self._func() - - try: - await res - except asyncio.CancelledError: - logger.info("Task '%s' was cancelled", self._func_name) - raise - except Exception as e: - if start_time is None: - start_time = time() - - attempts += 1 - logger.exception( - "Task %r encountered an exception: %r [attempt %s since %s].", - self._func.__name__, - e, - attempts, - datetime.fromtimestamp(start_time), - ) - if not self._restart_on_failure: - if self._exit_on_failure: - raise e - break - if self._start_limit_interval_sec is not None: - if (time() - start_time) > self._start_limit_interval_sec: - # reset attempts if start_limit_interval_sec is exceeded - start_time = time() - attempts = 1 - elif attempts > self._start_limit_burst: - logger.error( - "Task %r exceeded restart burst limit. Stopping.", - self._func.__name__, - ) - if self._exit_on_failure: - raise e - break - await asyncio.sleep(self._restart_sec) - return None - logger.info("Creating task %r", self._func_name) - self._task = self._loop.create_task(run_task()) + self._task = self._loop.create_task(self.__running_task_loop()) self._task.add_done_callback(task_done_callback) + async def __running_task_loop(self) -> R | None: + logger.info("Starting task %r", self._func_name) + self._status = TaskStatus.RUNNING + attempts = 0 + start_time_of_start_limit_interval = None + + await self._handle_startup_timeout() + + while True: + try: + await self._func() + except asyncio.CancelledError: + logger.info("Task '%s' was cancelled", self._func_name) + raise + except Exception as e: + attempts, start_time_of_start_limit_interval = ( + self._handle_task_exception( + e, attempts, start_time_of_start_limit_interval + ) + ) + if not self._should_restart_task( + attempts, start_time_of_start_limit_interval + ): + if self._exit_on_failure: + raise e + break + await asyncio.sleep(self._restart_sec) + return None + + async def _handle_startup_timeout(self) -> None: + """Wait for the configured startup timeout.""" + if self._timeout_start_sec: + await asyncio.sleep(self._timeout_start_sec) + + def _handle_task_exception( + self, + exception: Exception, + attempts: int, + start_time_of_start_limit_interval: float | None, + ) -> tuple[int, float]: + """Handle an exception raised during task execution.""" + if start_time_of_start_limit_interval is None: + start_time_of_start_limit_interval = time() + + attempts += 1 + logger.exception( + "Task %r encountered an exception: %r [attempt %s since %s].", + self._func.__name__, + exception, + attempts, + datetime.fromtimestamp(start_time_of_start_limit_interval), + ) + return attempts, start_time_of_start_limit_interval + + def _should_restart_task( + self, attempts: int, start_time_of_start_limit_interval: float + ) -> bool: + """Determine if the task should be restarted.""" + if not self._restart_on_failure: + return False + + if self._start_limit_interval_sec is not None: + if ( + time() - start_time_of_start_limit_interval + ) > self._start_limit_interval_sec: + # Reset attempts if interval is exceeded + start_time_of_start_limit_interval = time() + attempts = 1 + elif attempts > self._start_limit_burst: + logger.error( + "Task %r exceeded restart burst limit. Stopping.", + self._func.__name__, + ) + return False + return True + def stop(self) -> None: """Stops the running asynchronous task by cancelling it."""