test: updates task tests

This commit is contained in:
Mose Müller 2024-09-21 09:04:04 +02:00
parent e48046795e
commit 20028c379d

View File

@ -18,9 +18,9 @@ async def test_start_and_stop_task(caplog: LogCaptureFixture) -> None:
class MyService(pydase.DataService): class MyService(pydase.DataService):
@task() @task()
async def my_task(self) -> None: async def my_task(self) -> None:
logger.info("Triggered task.")
while True: while True:
logger.debug("Logging message") await asyncio.sleep(1)
await asyncio.sleep(0.01)
# Your test code here # Your test code here
service_instance = MyService() service_instance = MyService()
@ -28,16 +28,20 @@ async def test_start_and_stop_task(caplog: LogCaptureFixture) -> None:
DataServiceObserver(state_manager) DataServiceObserver(state_manager)
autostart_service_tasks(service_instance) autostart_service_tasks(service_instance)
await asyncio.sleep(0.1)
assert service_instance.my_task.status == TaskStatus.NOT_RUNNING
service_instance.my_task.start() service_instance.my_task.start()
await asyncio.sleep(0.1) await asyncio.sleep(0.1)
assert service_instance.my_task.status == TaskStatus.RUNNING
assert "'my_task.status' changed to 'TaskStatus.RUNNING'" in caplog.text assert "'my_task.status' changed to 'TaskStatus.RUNNING'" in caplog.text
assert "Logging message" in caplog.text assert "Triggered task." in caplog.text
caplog.clear() caplog.clear()
service_instance.my_task.stop() service_instance.my_task.stop()
await asyncio.sleep(0.1) await asyncio.sleep(0.1)
assert service_instance.my_task.status == TaskStatus.NOT_RUNNING
assert "Task 'my_task' was cancelled" in caplog.text assert "Task 'my_task' was cancelled" in caplog.text
@ -47,6 +51,8 @@ async def test_autostart_task(caplog: LogCaptureFixture) -> None:
@task(autostart=True) @task(autostart=True)
async def my_task(self) -> None: async def my_task(self) -> None:
logger.info("Triggered task.") logger.info("Triggered task.")
while True:
await asyncio.sleep(1)
# Your test code here # Your test code here
service_instance = MyService() service_instance = MyService()
@ -56,6 +62,7 @@ async def test_autostart_task(caplog: LogCaptureFixture) -> None:
autostart_service_tasks(service_instance) autostart_service_tasks(service_instance)
await asyncio.sleep(0.1) await asyncio.sleep(0.1)
assert service_instance.my_task.status == TaskStatus.RUNNING
assert "'my_task.status' changed to 'TaskStatus.RUNNING'" in caplog.text assert "'my_task.status' changed to 'TaskStatus.RUNNING'" in caplog.text
@ -68,6 +75,8 @@ async def test_nested_list_autostart_task(
@task(autostart=True) @task(autostart=True)
async def my_task(self) -> None: async def my_task(self) -> None:
logger.info("Triggered task.") logger.info("Triggered task.")
while True:
await asyncio.sleep(1)
class MyService(pydase.DataService): class MyService(pydase.DataService):
sub_services_list = [MySubService() for i in range(2)] sub_services_list = [MySubService() for i in range(2)]
@ -78,6 +87,8 @@ async def test_nested_list_autostart_task(
autostart_service_tasks(service_instance) autostart_service_tasks(service_instance)
await asyncio.sleep(0.1) await asyncio.sleep(0.1)
assert service_instance.sub_services_list[0].my_task.status == TaskStatus.RUNNING
assert service_instance.sub_services_list[1].my_task.status == TaskStatus.RUNNING
assert ( assert (
"'sub_services_list[0].my_task.status' changed to 'TaskStatus.RUNNING'" "'sub_services_list[0].my_task.status' changed to 'TaskStatus.RUNNING'"
@ -114,6 +125,10 @@ async def test_nested_dict_autostart_task(
assert ( assert (
service_instance.sub_services_dict["first"].my_task.status == TaskStatus.RUNNING service_instance.sub_services_dict["first"].my_task.status == TaskStatus.RUNNING
) )
assert (
service_instance.sub_services_dict["second"].my_task.status
== TaskStatus.RUNNING
)
assert ( assert (
"'sub_services_dict[\"first\"].my_task.status' changed to 'TaskStatus.RUNNING'" "'sub_services_dict[\"first\"].my_task.status' changed to 'TaskStatus.RUNNING'"