chore: refactoring Task

This commit is contained in:
Mose Müller 2025-01-17 20:21:00 +01:00
parent d0b0803407
commit c66b90c4e5

View File

@ -161,60 +161,86 @@ class Task(pydase.data_service.data_service.DataService, Generic[R]):
else:
self._result = task.result()
async def run_task() -> R | None:
logger.info("Starting task %r", self._func_name)
self._status = TaskStatus.RUNNING
attempts = 0
start_time = None
if self._timeout_start_sec:
# Wait for timeout_start_sec seconds
await asyncio.sleep(self._timeout_start_sec)
while True:
res: Coroutine[None, None, R | None] = self._func()
try:
await res
except asyncio.CancelledError:
logger.info("Task '%s' was cancelled", self._func_name)
raise
except Exception as e:
if start_time is None:
start_time = time()
attempts += 1
logger.exception(
"Task %r encountered an exception: %r [attempt %s since %s].",
self._func.__name__,
e,
attempts,
datetime.fromtimestamp(start_time),
)
if not self._restart_on_failure:
if self._exit_on_failure:
raise e
break
if self._start_limit_interval_sec is not None:
if (time() - start_time) > self._start_limit_interval_sec:
# reset attempts if start_limit_interval_sec is exceeded
start_time = time()
attempts = 1
elif attempts > self._start_limit_burst:
logger.error(
"Task %r exceeded restart burst limit. Stopping.",
self._func.__name__,
)
if self._exit_on_failure:
raise e
break
await asyncio.sleep(self._restart_sec)
return None
logger.info("Creating task %r", self._func_name)
self._task = self._loop.create_task(run_task())
self._task = self._loop.create_task(self.__running_task_loop())
self._task.add_done_callback(task_done_callback)
async def __running_task_loop(self) -> R | None:
logger.info("Starting task %r", self._func_name)
self._status = TaskStatus.RUNNING
attempts = 0
start_time_of_start_limit_interval = None
await self._handle_startup_timeout()
while True:
try:
await self._func()
except asyncio.CancelledError:
logger.info("Task '%s' was cancelled", self._func_name)
raise
except Exception as e:
attempts, start_time_of_start_limit_interval = (
self._handle_task_exception(
e, attempts, start_time_of_start_limit_interval
)
)
if not self._should_restart_task(
attempts, start_time_of_start_limit_interval
):
if self._exit_on_failure:
raise e
break
await asyncio.sleep(self._restart_sec)
return None
async def _handle_startup_timeout(self) -> None:
"""Wait for the configured startup timeout."""
if self._timeout_start_sec:
await asyncio.sleep(self._timeout_start_sec)
def _handle_task_exception(
self,
exception: Exception,
attempts: int,
start_time_of_start_limit_interval: float | None,
) -> tuple[int, float]:
"""Handle an exception raised during task execution."""
if start_time_of_start_limit_interval is None:
start_time_of_start_limit_interval = time()
attempts += 1
logger.exception(
"Task %r encountered an exception: %r [attempt %s since %s].",
self._func.__name__,
exception,
attempts,
datetime.fromtimestamp(start_time_of_start_limit_interval),
)
return attempts, start_time_of_start_limit_interval
def _should_restart_task(
self, attempts: int, start_time_of_start_limit_interval: float
) -> bool:
"""Determine if the task should be restarted."""
if not self._restart_on_failure:
return False
if self._start_limit_interval_sec is not None:
if (
time() - start_time_of_start_limit_interval
) > self._start_limit_interval_sec:
# Reset attempts if interval is exceeded
start_time_of_start_limit_interval = time()
attempts = 1
elif attempts > self._start_limit_burst:
logger.error(
"Task %r exceeded restart burst limit. Stopping.",
self._func.__name__,
)
return False
return True
def stop(self) -> None:
"""Stops the running asynchronous task by cancelling it."""