mirror of
https://github.com/tiqi-group/pydase.git
synced 2025-06-23 04:27:58 +02:00
updates tests
This commit is contained in:
@ -1,8 +1,8 @@
|
||||
import logging
|
||||
|
||||
import pydase
|
||||
from pydase.data_service.data_service_cache import DataServiceCache
|
||||
from pydase.utils.serializer import get_nested_dict_by_path
|
||||
from pydase.data_service.data_service_observer import DataServiceObserver
|
||||
from pydase.data_service.state_manager import StateManager
|
||||
|
||||
logger = logging.getLogger()
|
||||
|
||||
@ -15,14 +15,23 @@ def test_nested_attributes_cache_callback() -> None:
|
||||
class_attr = SubClass()
|
||||
name = "World"
|
||||
|
||||
test_service = ServiceClass()
|
||||
cache = DataServiceCache(test_service)
|
||||
service_instance = ServiceClass()
|
||||
state_manager = StateManager(service_instance)
|
||||
DataServiceObserver(state_manager)
|
||||
|
||||
test_service.name = "Peepz"
|
||||
assert get_nested_dict_by_path(cache.cache, "name")["value"] == "Peepz"
|
||||
service_instance.name = "Peepz"
|
||||
assert (
|
||||
state_manager._data_service_cache.get_value_dict_from_cache("name")["value"]
|
||||
== "Peepz"
|
||||
)
|
||||
|
||||
test_service.class_attr.name = "Ciao"
|
||||
assert get_nested_dict_by_path(cache.cache, "class_attr.name")["value"] == "Ciao"
|
||||
service_instance.class_attr.name = "Ciao"
|
||||
assert (
|
||||
state_manager._data_service_cache.get_value_dict_from_cache("class_attr.name")[
|
||||
"value"
|
||||
]
|
||||
== "Ciao"
|
||||
)
|
||||
|
||||
|
||||
def test_task_status_update() -> None:
|
||||
@ -32,11 +41,29 @@ def test_task_status_update() -> None:
|
||||
async def my_method(self) -> None:
|
||||
pass
|
||||
|
||||
test_service = ServiceClass()
|
||||
cache = DataServiceCache(test_service)
|
||||
assert get_nested_dict_by_path(cache.cache, "my_method")["type"] == "method"
|
||||
assert get_nested_dict_by_path(cache.cache, "my_method")["value"] is None
|
||||
service_instance = ServiceClass()
|
||||
state_manager = StateManager(service_instance)
|
||||
DataServiceObserver(state_manager)
|
||||
|
||||
test_service.start_my_method() # type: ignore
|
||||
assert get_nested_dict_by_path(cache.cache, "my_method")["type"] == "method"
|
||||
assert get_nested_dict_by_path(cache.cache, "my_method")["value"] == {}
|
||||
assert (
|
||||
state_manager._data_service_cache.get_value_dict_from_cache("my_method")["type"]
|
||||
== "method"
|
||||
)
|
||||
assert (
|
||||
state_manager._data_service_cache.get_value_dict_from_cache("my_method")[
|
||||
"value"
|
||||
]
|
||||
is None
|
||||
)
|
||||
|
||||
service_instance.start_my_method() # type: ignore
|
||||
assert (
|
||||
state_manager._data_service_cache.get_value_dict_from_cache("my_method")["type"]
|
||||
== "method"
|
||||
)
|
||||
assert (
|
||||
state_manager._data_service_cache.get_value_dict_from_cache("my_method")[
|
||||
"value"
|
||||
]
|
||||
== {}
|
||||
)
|
||||
|
@ -2,8 +2,6 @@ import json
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
from pytest import LogCaptureFixture
|
||||
|
||||
import pydase
|
||||
import pydase.units as u
|
||||
from pydase.components.coloured_enum import ColouredEnum
|
||||
@ -12,6 +10,7 @@ from pydase.data_service.state_manager import (
|
||||
has_load_state_decorator,
|
||||
load_state,
|
||||
)
|
||||
from pytest import LogCaptureFixture
|
||||
|
||||
|
||||
class SubService(pydase.DataService):
|
||||
@ -26,6 +25,7 @@ class State(ColouredEnum):
|
||||
|
||||
class Service(pydase.DataService):
|
||||
def __init__(self, **kwargs: Any) -> None:
|
||||
super().__init__(**kwargs)
|
||||
self.subservice = SubService()
|
||||
self.some_unit: u.Quantity = 1.2 * u.units.A
|
||||
self.some_float = 1.0
|
||||
@ -33,7 +33,6 @@ class Service(pydase.DataService):
|
||||
self._property_attr = 1337.0
|
||||
self._name = "Service"
|
||||
self.state = State.RUNNING
|
||||
super().__init__(**kwargs)
|
||||
|
||||
@property
|
||||
def name(self) -> str:
|
||||
@ -152,7 +151,7 @@ def test_load_state(tmp_path: Path, caplog: LogCaptureFixture):
|
||||
assert service.some_float == 1.0 # has not changed due to different type
|
||||
assert service.subservice.name == "SubService" # didn't change
|
||||
|
||||
assert "Service.some_unit changed to 12.0 A!" in caplog.text
|
||||
assert "'some_unit' changed to '12.0 A!'" in caplog.text
|
||||
assert (
|
||||
"Property 'name' has no '@load_state' decorator. "
|
||||
"Ignoring value from JSON file..." in caplog.text
|
||||
|
@ -1,8 +1,7 @@
|
||||
import logging
|
||||
|
||||
from pytest import LogCaptureFixture
|
||||
|
||||
import pydase
|
||||
from pytest import LogCaptureFixture
|
||||
|
||||
logger = logging.getLogger()
|
||||
|
||||
@ -10,11 +9,11 @@ logger = logging.getLogger()
|
||||
def test_autostart_task_callback(caplog: LogCaptureFixture) -> None:
|
||||
class MyService(pydase.DataService):
|
||||
def __init__(self) -> None:
|
||||
super().__init__()
|
||||
self._autostart_tasks = { # type: ignore
|
||||
"my_task": (),
|
||||
"my_other_task": (),
|
||||
}
|
||||
super().__init__()
|
||||
|
||||
async def my_task(self) -> None:
|
||||
logger.info("Triggered task.")
|
||||
@ -25,8 +24,8 @@ def test_autostart_task_callback(caplog: LogCaptureFixture) -> None:
|
||||
service = MyService()
|
||||
service._task_manager.start_autostart_tasks()
|
||||
|
||||
assert "MyService.my_task changed to {}" in caplog.text
|
||||
assert "MyService.my_other_task changed to {}" in caplog.text
|
||||
assert "'my_task' changed to '{}'" in caplog.text
|
||||
assert "'my_other_task' changed to '{}'" in caplog.text
|
||||
|
||||
|
||||
def test_DataService_subclass_autostart_task_callback(
|
||||
@ -34,11 +33,11 @@ def test_DataService_subclass_autostart_task_callback(
|
||||
) -> None:
|
||||
class MySubService(pydase.DataService):
|
||||
def __init__(self) -> None:
|
||||
super().__init__()
|
||||
self._autostart_tasks = { # type: ignore
|
||||
"my_task": (),
|
||||
"my_other_task": (),
|
||||
}
|
||||
super().__init__()
|
||||
|
||||
async def my_task(self) -> None:
|
||||
logger.info("Triggered task.")
|
||||
@ -52,8 +51,8 @@ def test_DataService_subclass_autostart_task_callback(
|
||||
service = MyService()
|
||||
service._task_manager.start_autostart_tasks()
|
||||
|
||||
assert "MyService.sub_service.my_task changed to {}" in caplog.text
|
||||
assert "MyService.sub_service.my_other_task changed to {}" in caplog.text
|
||||
assert "'sub_service.my_task' changed to '{}'" in caplog.text
|
||||
assert "'sub_service.my_other_task' changed to '{}'" in caplog.text
|
||||
|
||||
|
||||
def test_DataServiceList_subclass_autostart_task_callback(
|
||||
@ -61,11 +60,11 @@ def test_DataServiceList_subclass_autostart_task_callback(
|
||||
) -> None:
|
||||
class MySubService(pydase.DataService):
|
||||
def __init__(self) -> None:
|
||||
super().__init__()
|
||||
self._autostart_tasks = { # type: ignore
|
||||
"my_task": (),
|
||||
"my_other_task": (),
|
||||
}
|
||||
super().__init__()
|
||||
|
||||
async def my_task(self) -> None:
|
||||
logger.info("Triggered task.")
|
||||
@ -79,7 +78,7 @@ def test_DataServiceList_subclass_autostart_task_callback(
|
||||
service = MyService()
|
||||
service._task_manager.start_autostart_tasks()
|
||||
|
||||
assert "MyService.sub_services_list[0].my_task changed to {}" in caplog.text
|
||||
assert "MyService.sub_services_list[0].my_other_task changed to {}" in caplog.text
|
||||
assert "MyService.sub_services_list[1].my_task changed to {}" in caplog.text
|
||||
assert "MyService.sub_services_list[1].my_other_task changed to {}" in caplog.text
|
||||
assert "'sub_services_list[0].my_task' changed to '{}'" in caplog.text
|
||||
assert "'sub_services_list[0].my_other_task' changed to '{}'" in caplog.text
|
||||
assert "'sub_services_list[1].my_task' changed to '{}'" in caplog.text
|
||||
assert "'sub_services_list[1].my_other_task' changed to '{}'" in caplog.text
|
||||
|
Reference in New Issue
Block a user