fixes or removes task-related tests

This commit is contained in:
Mose Müller 2024-09-13 12:59:37 +02:00
parent 71adc8bea2
commit 2e31ebb7d9
4 changed files with 9 additions and 188 deletions

View File

@ -36,8 +36,7 @@ def test_unexpected_type_change_warning(caplog: LogCaptureFixture) -> None:
def test_basic_inheritance_warning(caplog: LogCaptureFixture) -> None:
class SubService(DataService):
...
class SubService(DataService): ...
class SomeEnum(Enum):
HI = 0
@ -57,11 +56,9 @@ def test_basic_inheritance_warning(caplog: LogCaptureFixture) -> None:
def name(self) -> str:
return self._name
def some_method(self) -> None:
...
def some_method(self) -> None: ...
async def some_task(self) -> None:
...
async def some_task(self) -> None: ...
ServiceClass()
@ -129,17 +126,12 @@ def test_exposing_methods(caplog: LogCaptureFixture) -> None:
return "some method"
class ClassWithTask(pydase.DataService):
async def some_task(self, sleep_time: int) -> None:
pass
@frontend
def some_method(self) -> str:
return "some method"
ClassWithTask()
assert (
"Async function 'some_task' is defined with at least one argument. If you want "
"to use it as a task, remove the argument(s) from the function definition."
in caplog.text
)
def test_dynamically_added_attribute(caplog: LogCaptureFixture) -> None:
class MyService(DataService):

View File

@ -1,7 +1,6 @@
import logging
import pydase
import pytest
from pydase.data_service.data_service_observer import DataServiceObserver
from pydase.data_service.state_manager import StateManager
@ -33,35 +32,3 @@ def test_nested_attributes_cache_callback() -> None:
]
== "Ciao"
)
@pytest.mark.asyncio(scope="function")
async def test_task_status_update() -> None:
class ServiceClass(pydase.DataService):
name = "World"
async def my_method(self) -> None:
pass
service_instance = ServiceClass()
state_manager = StateManager(service_instance)
DataServiceObserver(state_manager)
assert (
state_manager.cache_manager.get_value_dict_from_cache("my_method")["type"]
== "method"
)
assert (
state_manager.cache_manager.get_value_dict_from_cache("my_method")["value"]
is None
)
service_instance.start_my_method() # type: ignore
assert (
state_manager.cache_manager.get_value_dict_from_cache("my_method")["type"]
== "method"
)
assert (
state_manager.cache_manager.get_value_dict_from_cache("my_method")["value"]
== "RUNNING"
)

View File

@ -1,135 +0,0 @@
import asyncio
import logging
import pydase
import pytest
from pydase.data_service.data_service_observer import DataServiceObserver
from pydase.data_service.state_manager import StateManager
from pytest import LogCaptureFixture
logger = logging.getLogger("pydase")
@pytest.mark.asyncio(scope="function")
async def test_autostart_task_callback(caplog: LogCaptureFixture) -> None:
class MyService(pydase.DataService):
def __init__(self) -> None:
super().__init__()
self._autostart_tasks = { # type: ignore
"my_task": (), # type: ignore
"my_other_task": (), # type: ignore
}
async def my_task(self) -> None:
logger.info("Triggered task.")
async def my_other_task(self) -> None:
logger.info("Triggered other task.")
# Your test code here
service_instance = MyService()
state_manager = StateManager(service_instance)
DataServiceObserver(state_manager)
service_instance._task_manager.start_autostart_tasks()
assert "'my_task' changed to 'TaskStatus.RUNNING'" in caplog.text
assert "'my_other_task' changed to 'TaskStatus.RUNNING'" in caplog.text
@pytest.mark.asyncio(scope="function")
async def test_DataService_subclass_autostart_task_callback(
caplog: LogCaptureFixture,
) -> None:
class MySubService(pydase.DataService):
def __init__(self) -> None:
super().__init__()
self._autostart_tasks = { # type: ignore
"my_task": (),
"my_other_task": (),
}
async def my_task(self) -> None:
logger.info("Triggered task.")
async def my_other_task(self) -> None:
logger.info("Triggered other task.")
class MyService(pydase.DataService):
sub_service = MySubService()
service_instance = MyService()
state_manager = StateManager(service_instance)
DataServiceObserver(state_manager)
service_instance._task_manager.start_autostart_tasks()
assert "'sub_service.my_task' changed to 'TaskStatus.RUNNING'" in caplog.text
assert "'sub_service.my_other_task' changed to 'TaskStatus.RUNNING'" in caplog.text
@pytest.mark.asyncio(scope="function")
async def test_DataService_subclass_list_autostart_task_callback(
caplog: LogCaptureFixture,
) -> None:
class MySubService(pydase.DataService):
def __init__(self) -> None:
super().__init__()
self._autostart_tasks = { # type: ignore
"my_task": (),
"my_other_task": (),
}
async def my_task(self) -> None:
logger.info("Triggered task.")
async def my_other_task(self) -> None:
logger.info("Triggered other task.")
class MyService(pydase.DataService):
sub_services_list = [MySubService() for i in range(2)]
service_instance = MyService()
state_manager = StateManager(service_instance)
DataServiceObserver(state_manager)
service_instance._task_manager.start_autostart_tasks()
assert (
"'sub_services_list[0].my_task' changed to 'TaskStatus.RUNNING'" in caplog.text
)
assert (
"'sub_services_list[0].my_other_task' changed to 'TaskStatus.RUNNING'"
in caplog.text
)
assert (
"'sub_services_list[1].my_task' changed to 'TaskStatus.RUNNING'" in caplog.text
)
assert (
"'sub_services_list[1].my_other_task' changed to 'TaskStatus.RUNNING'"
in caplog.text
)
@pytest.mark.asyncio(scope="function")
async def test_start_and_stop_task_methods(caplog: LogCaptureFixture) -> None:
class MyService(pydase.DataService):
def __init__(self) -> None:
super().__init__()
async def my_task(self) -> None:
while True:
logger.debug("Logging message")
await asyncio.sleep(0.1)
# Your test code here
service_instance = MyService()
state_manager = StateManager(service_instance)
DataServiceObserver(state_manager)
service_instance.start_my_task()
await asyncio.sleep(0.01)
assert "'my_task' changed to 'TaskStatus.RUNNING'" in caplog.text
assert "Logging message" in caplog.text
caplog.clear()
service_instance.stop_my_task()
await asyncio.sleep(0.01)
assert "Task 'my_task' was cancelled" in caplog.text

View File

@ -1,4 +1,3 @@
import asyncio
import enum
from datetime import datetime
from enum import Enum
@ -8,7 +7,7 @@ import pydase
import pydase.units as u
import pytest
from pydase.components.coloured_enum import ColouredEnum
from pydase.data_service.task_manager import TaskStatus
from pydase.task.task_status import TaskStatus
from pydase.utils.decorators import frontend
from pydase.utils.serialization.serializer import (
SerializationPathError,
@ -214,11 +213,9 @@ async def test_method_serialization() -> None:
return "some method"
async def some_task(self) -> None:
while True:
await asyncio.sleep(10)
pass
instance = ClassWithMethod()
instance.start_some_task() # type: ignore
assert dump(instance)["value"] == {
"some_method": {
@ -234,7 +231,7 @@ async def test_method_serialization() -> None:
"some_task": {
"full_access_path": "some_task",
"type": "method",
"value": TaskStatus.RUNNING.name,
"value": None,
"readonly": True,
"doc": None,
"async": True,