mirror of
https://github.com/tiqi-group/pydase.git
synced 2025-06-03 20:30:40 +02:00
fixes tests for DataServiceCache and TaskManager
This commit is contained in:
parent
26689d8578
commit
a72a551f54
@ -67,5 +67,5 @@ async def test_task_status_update() -> None:
|
||||
state_manager._data_service_cache.get_value_dict_from_cache("my_method")[
|
||||
"value"
|
||||
]
|
||||
== {}
|
||||
== "RUNNING"
|
||||
)
|
||||
|
@ -32,8 +32,8 @@ async def test_autostart_task_callback(caplog: LogCaptureFixture) -> None:
|
||||
DataServiceObserver(state_manager)
|
||||
service_instance._task_manager.start_autostart_tasks()
|
||||
|
||||
assert "'my_task' changed to '{}'" in caplog.text
|
||||
assert "'my_other_task' changed to '{}'" in caplog.text
|
||||
assert "'my_task' changed to 'TaskStatus.RUNNING'" in caplog.text
|
||||
assert "'my_other_task' changed to 'TaskStatus.RUNNING'" in caplog.text
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@ -62,8 +62,8 @@ async def test_DataService_subclass_autostart_task_callback(
|
||||
DataServiceObserver(state_manager)
|
||||
service_instance._task_manager.start_autostart_tasks()
|
||||
|
||||
assert "'sub_service.my_task' changed to '{}'" in caplog.text
|
||||
assert "'sub_service.my_other_task' changed to '{}'" in caplog.text
|
||||
assert "'sub_service.my_task' changed to 'TaskStatus.RUNNING'" in caplog.text
|
||||
assert "'sub_service.my_other_task' changed to 'TaskStatus.RUNNING'" in caplog.text
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@ -92,10 +92,20 @@ async def test_DataService_subclass_list_autostart_task_callback(
|
||||
DataServiceObserver(state_manager)
|
||||
service_instance._task_manager.start_autostart_tasks()
|
||||
|
||||
assert "'sub_services_list[0].my_task' changed to '{}'" in caplog.text
|
||||
assert "'sub_services_list[0].my_other_task' changed to '{}'" in caplog.text
|
||||
assert "'sub_services_list[1].my_task' changed to '{}'" in caplog.text
|
||||
assert "'sub_services_list[1].my_other_task' changed to '{}'" in caplog.text
|
||||
assert (
|
||||
"'sub_services_list[0].my_task' changed to 'TaskStatus.RUNNING'" in caplog.text
|
||||
)
|
||||
assert (
|
||||
"'sub_services_list[0].my_other_task' changed to 'TaskStatus.RUNNING'"
|
||||
in caplog.text
|
||||
)
|
||||
assert (
|
||||
"'sub_services_list[1].my_task' changed to 'TaskStatus.RUNNING'" in caplog.text
|
||||
)
|
||||
assert (
|
||||
"'sub_services_list[1].my_other_task' changed to 'TaskStatus.RUNNING'"
|
||||
in caplog.text
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@ -104,20 +114,20 @@ async def test_start_and_stop_task_methods(caplog: LogCaptureFixture) -> None:
|
||||
def __init__(self) -> None:
|
||||
super().__init__()
|
||||
|
||||
async def my_task(self, param: str) -> None:
|
||||
async def my_task(self) -> None:
|
||||
while True:
|
||||
logger.debug("Logging param: %s", param)
|
||||
logger.debug("Logging message")
|
||||
await asyncio.sleep(0.1)
|
||||
|
||||
# Your test code here
|
||||
service_instance = MyService()
|
||||
state_manager = StateManager(service_instance)
|
||||
DataServiceObserver(state_manager)
|
||||
service_instance.start_my_task("Hello")
|
||||
service_instance.start_my_task()
|
||||
await asyncio.sleep(0.01)
|
||||
|
||||
assert "'my_task' changed to '{'param': 'Hello'}'" in caplog.text
|
||||
assert "Logging param: Hello" in caplog.text
|
||||
assert "'my_task' changed to 'TaskStatus.RUNNING'" in caplog.text
|
||||
assert "Logging message" in caplog.text
|
||||
caplog.clear()
|
||||
|
||||
service_instance.stop_my_task()
|
||||
|
Loading…
x
Reference in New Issue
Block a user