mirror of
https://github.com/tiqi-group/pydase.git
synced 2025-12-19 12:41:19 +01:00
Compare commits
15 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
757dc9aa3c | ||
|
|
3d938562a6 | ||
|
|
964a62d4b4 | ||
|
|
99aa38fcfe | ||
|
|
5658514c8a | ||
|
|
109ee7d5e1 | ||
|
|
f4fa02fe11 | ||
|
|
487ef504a8 | ||
|
|
c98e407ed7 | ||
|
|
6b6ce1d43f | ||
|
|
e491ac7458 | ||
|
|
e9d8cbafc2 | ||
|
|
aa705592b2 | ||
|
|
008e1262bb | ||
|
|
91a71ad004 |
@@ -1,6 +1,6 @@
|
||||
[tool.poetry]
|
||||
name = "pydase"
|
||||
version = "0.10.2"
|
||||
version = "0.10.4"
|
||||
description = "A flexible and robust Python library for creating, managing, and interacting with data services, with built-in support for web and RPC servers, and customizable features for diverse use cases."
|
||||
authors = ["Mose Mueller <mosmuell@ethz.ch>"]
|
||||
readme = "README.md"
|
||||
|
||||
@@ -10,6 +10,7 @@ from pydase.observer_pattern.observable.observable import (
|
||||
)
|
||||
from pydase.utils.helpers import (
|
||||
get_class_and_instance_attributes,
|
||||
is_descriptor,
|
||||
is_property_attribute,
|
||||
)
|
||||
from pydase.utils.serialization.serializer import (
|
||||
@@ -68,7 +69,7 @@ class DataService(AbstractDataService):
|
||||
if not issubclass(
|
||||
value_class,
|
||||
(int | float | bool | str | list | dict | Enum | u.Quantity | Observable),
|
||||
):
|
||||
) and not is_descriptor(__value):
|
||||
logger.warning(
|
||||
"Class '%s' does not inherit from DataService. This may lead to"
|
||||
" unexpected behaviour!",
|
||||
|
||||
@@ -8,7 +8,10 @@ from pydase.observer_pattern.observable.observable_object import ObservableObjec
|
||||
from pydase.observer_pattern.observer.property_observer import (
|
||||
PropertyObserver,
|
||||
)
|
||||
from pydase.utils.helpers import get_object_attr_from_path
|
||||
from pydase.utils.helpers import (
|
||||
get_object_attr_from_path,
|
||||
normalize_full_access_path_string,
|
||||
)
|
||||
from pydase.utils.serialization.serializer import (
|
||||
SerializationPathError,
|
||||
SerializedObject,
|
||||
@@ -99,7 +102,8 @@ class DataServiceObserver(PropertyObserver):
|
||||
)
|
||||
|
||||
def _notify_dependent_property_changes(self, changed_attr_path: str) -> None:
|
||||
changed_props = self.property_deps_dict.get(changed_attr_path, [])
|
||||
normalized_attr_path = normalize_full_access_path_string(changed_attr_path)
|
||||
changed_props = self.property_deps_dict.get(normalized_attr_path, [])
|
||||
for prop in changed_props:
|
||||
# only notify about changing attribute if it is not currently being
|
||||
# "changed" e.g. when calling the getter of a property within another
|
||||
|
||||
@@ -22,12 +22,9 @@ class Observable(ObservableObject):
|
||||
- {"__annotations__"}
|
||||
}
|
||||
for name, value in class_attrs.items():
|
||||
if isinstance(value, property) or callable(value):
|
||||
continue
|
||||
if is_descriptor(value):
|
||||
# Descriptors have to be stored as a class variable in another class to
|
||||
# work properly. So don't make it an instance attribute.
|
||||
self._initialise_new_objects(name, value)
|
||||
if isinstance(value, property) or callable(value) or is_descriptor(value):
|
||||
# Properties, methods and descriptors have to be stored as class
|
||||
# attributes to work properly. So don't make it an instance attribute.
|
||||
continue
|
||||
self.__dict__[name] = self._initialise_new_objects(name, value)
|
||||
|
||||
|
||||
@@ -5,6 +5,7 @@ from typing import Any
|
||||
|
||||
from pydase.observer_pattern.observable.observable import Observable
|
||||
from pydase.observer_pattern.observer.observer import Observer
|
||||
from pydase.utils.helpers import is_descriptor
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -61,17 +62,27 @@ class PropertyObserver(Observer):
|
||||
self, obj: Observable, deps: dict[str, Any], prefix: str
|
||||
) -> None:
|
||||
for k, value in {**vars(type(obj)), **vars(obj)}.items():
|
||||
actual_value = value
|
||||
prefix = (
|
||||
f"{prefix}." if prefix != "" and not prefix.endswith(".") else prefix
|
||||
)
|
||||
parent_path = f"{prefix}{k}"
|
||||
if isinstance(value, Observable):
|
||||
|
||||
# Get value from descriptor
|
||||
if not isinstance(value, property) and is_descriptor(value):
|
||||
actual_value = getattr(obj, k)
|
||||
|
||||
if isinstance(actual_value, Observable):
|
||||
new_prefix = f"{parent_path}."
|
||||
deps.update(
|
||||
self._get_properties_and_their_dependencies(value, new_prefix)
|
||||
self._get_properties_and_their_dependencies(
|
||||
actual_value, new_prefix
|
||||
)
|
||||
)
|
||||
elif isinstance(value, list | dict):
|
||||
self._process_collection_item_properties(value, deps, parent_path)
|
||||
self._process_collection_item_properties(
|
||||
actual_value, deps, parent_path
|
||||
)
|
||||
|
||||
def _process_collection_item_properties(
|
||||
self,
|
||||
|
||||
@@ -1,7 +1,8 @@
|
||||
import logging
|
||||
from collections.abc import Callable, Coroutine
|
||||
from typing import Any, TypeVar
|
||||
from typing import Any, Generic, TypeVar, overload
|
||||
|
||||
from pydase.data_service.data_service import DataService
|
||||
from pydase.task.task import Task
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -9,6 +10,69 @@ logger = logging.getLogger(__name__)
|
||||
R = TypeVar("R")
|
||||
|
||||
|
||||
class PerInstanceTaskDescriptor(Generic[R]):
|
||||
"""
|
||||
A descriptor class that provides a unique [`Task`][pydase.task.task.Task] object
|
||||
for each instance of a [`DataService`][pydase.data_service.data_service.DataService]
|
||||
class.
|
||||
|
||||
The `PerInstanceTaskDescriptor` is used to transform an asynchronous function into a
|
||||
task that is managed independently for each instance of a `DataService` subclass.
|
||||
This allows tasks to be initialized, started, and stopped on a per-instance basis,
|
||||
providing better control over task execution within the service.
|
||||
|
||||
The `PerInstanceTaskDescriptor` is not intended to be used directly. Instead, it is
|
||||
used internally by the `@task` decorator to manage task objects for each instance of
|
||||
the service class.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
func: Callable[[Any], Coroutine[None, None, R]]
|
||||
| Callable[[], Coroutine[None, None, R]],
|
||||
autostart: bool = False,
|
||||
) -> None:
|
||||
self.__func = func
|
||||
self.__autostart = autostart
|
||||
self.__task_instances: dict[object, Task[R]] = {}
|
||||
|
||||
def __set_name__(self, owner: type[DataService], name: str) -> None:
|
||||
"""Stores the name of the task within the owning class. This method is called
|
||||
automatically when the descriptor is assigned to a class attribute.
|
||||
"""
|
||||
|
||||
self.__task_name = name
|
||||
|
||||
@overload
|
||||
def __get__(
|
||||
self, instance: None, owner: type[DataService]
|
||||
) -> "PerInstanceTaskDescriptor[R]":
|
||||
"""Returns the descriptor itself when accessed through the class."""
|
||||
|
||||
@overload
|
||||
def __get__(self, instance: DataService, owner: type[DataService]) -> Task[R]:
|
||||
"""Returns the `Task` object associated with the specific `DataService`
|
||||
instance.
|
||||
If no task exists for the instance, a new `Task` object is created and stored
|
||||
in the `__task_instances` dictionary.
|
||||
"""
|
||||
|
||||
def __get__(
|
||||
self, instance: DataService | None, owner: type[DataService]
|
||||
) -> "Task[R] | PerInstanceTaskDescriptor[R]":
|
||||
if instance is None:
|
||||
return self
|
||||
|
||||
# Create a new Task object for this instance, using the function's name.
|
||||
if instance not in self.__task_instances:
|
||||
self.__task_instances[instance] = instance._initialise_new_objects(
|
||||
self.__task_name,
|
||||
Task(self.__func.__get__(instance, owner), autostart=self.__autostart),
|
||||
)
|
||||
|
||||
return self.__task_instances[instance]
|
||||
|
||||
|
||||
def task(
|
||||
*, autostart: bool = False
|
||||
) -> Callable[
|
||||
@@ -16,18 +80,22 @@ def task(
|
||||
Callable[[Any], Coroutine[None, None, R]]
|
||||
| Callable[[], Coroutine[None, None, R]]
|
||||
],
|
||||
Task[R],
|
||||
PerInstanceTaskDescriptor[R],
|
||||
]:
|
||||
"""
|
||||
A decorator to define a function as a task within a
|
||||
A decorator to define an asynchronous function as a per-instance task within a
|
||||
[`DataService`][pydase.DataService] class.
|
||||
|
||||
This decorator transforms an asynchronous function into a
|
||||
[`Task`][pydase.task.task.Task] object. The `Task` object provides methods like
|
||||
`start()` and `stop()` to control the execution of the task.
|
||||
[`Task`][pydase.task.task.Task] object that is unique to each instance of the
|
||||
`DataService` class. The resulting `Task` object provides methods like `start()`
|
||||
and `stop()` to control the execution of the task, and manages the task's lifecycle
|
||||
independently for each instance of the service.
|
||||
|
||||
Tasks are typically used to perform periodic or recurring jobs, such as reading
|
||||
sensor data, updating databases, or other operations that need to be repeated over
|
||||
The decorator is particularly useful for defining tasks that need to run
|
||||
periodically or perform asynchronous operations, such as polling data sources,
|
||||
updating databases, or any recurring job that should be managed within the context
|
||||
of a `DataService`.
|
||||
time.
|
||||
|
||||
Args:
|
||||
@@ -36,8 +104,10 @@ def task(
|
||||
initialized. Defaults to False.
|
||||
|
||||
Returns:
|
||||
A decorator that converts an asynchronous function into a
|
||||
[`Task`][pydase.task.task.Task] object.
|
||||
A decorator that wraps an asynchronous function in a
|
||||
[`PerInstanceTaskDescriptor`][pydase.task.decorator.PerInstanceTaskDescriptor]
|
||||
object, which, when accessed, provides an instance-specific
|
||||
[`Task`][pydase.task.task.Task] object.
|
||||
|
||||
Example:
|
||||
```python
|
||||
@@ -69,7 +139,7 @@ def task(
|
||||
def decorator(
|
||||
func: Callable[[Any], Coroutine[None, None, R]]
|
||||
| Callable[[], Coroutine[None, None, R]],
|
||||
) -> Task[R]:
|
||||
return Task(func, autostart=autostart)
|
||||
) -> PerInstanceTaskDescriptor[R]:
|
||||
return PerInstanceTaskDescriptor(func, autostart=autostart)
|
||||
|
||||
return decorator
|
||||
|
||||
@@ -1,24 +1,14 @@
|
||||
import asyncio
|
||||
import inspect
|
||||
import logging
|
||||
import sys
|
||||
from collections.abc import Callable, Coroutine
|
||||
from typing import (
|
||||
Any,
|
||||
Generic,
|
||||
TypeVar,
|
||||
)
|
||||
|
||||
from typing_extensions import TypeIs
|
||||
|
||||
from pydase.task.task_status import TaskStatus
|
||||
|
||||
if sys.version_info < (3, 11):
|
||||
from typing_extensions import Self
|
||||
else:
|
||||
from typing import Self
|
||||
|
||||
import pydase.data_service.data_service
|
||||
from pydase.task.task_status import TaskStatus
|
||||
from pydase.utils.helpers import current_event_loop_exists
|
||||
|
||||
logging.basicConfig(level=logging.DEBUG)
|
||||
@@ -27,17 +17,8 @@ logger = logging.getLogger(__name__)
|
||||
R = TypeVar("R")
|
||||
|
||||
|
||||
def is_bound_method(
|
||||
method: Callable[[], Coroutine[None, None, R | None]]
|
||||
| Callable[[Any], Coroutine[None, None, R | None]],
|
||||
) -> TypeIs[Callable[[], Coroutine[None, None, R | None]]]:
|
||||
"""Check if instance method is bound to an object."""
|
||||
return inspect.ismethod(method)
|
||||
|
||||
|
||||
class Task(pydase.data_service.data_service.DataService, Generic[R]):
|
||||
"""
|
||||
A class representing a task within the `pydase` framework.
|
||||
"""A class representing a task within the `pydase` framework.
|
||||
|
||||
The `Task` class wraps an asynchronous function and provides methods to manage its
|
||||
lifecycle, such as `start()` and `stop()`. It is typically used to perform periodic
|
||||
@@ -85,25 +66,24 @@ class Task(pydase.data_service.data_service.DataService, Generic[R]):
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
func: Callable[[Any], Coroutine[None, None, R | None]]
|
||||
| Callable[[], Coroutine[None, None, R | None]],
|
||||
func: Callable[[], Coroutine[None, None, R | None]],
|
||||
*,
|
||||
autostart: bool = False,
|
||||
) -> None:
|
||||
super().__init__()
|
||||
self._autostart = autostart
|
||||
self._func_name = func.__name__
|
||||
self._bound_func: Callable[[], Coroutine[None, None, R | None]] | None = None
|
||||
self._set_up = False
|
||||
if is_bound_method(func):
|
||||
self._func = func
|
||||
self._bound_func = func
|
||||
else:
|
||||
self._func = func
|
||||
self._func = func
|
||||
self._task: asyncio.Task[R | None] | None = None
|
||||
self._status = TaskStatus.NOT_RUNNING
|
||||
self._result: R | None = None
|
||||
|
||||
if not current_event_loop_exists():
|
||||
self._loop = asyncio.new_event_loop()
|
||||
asyncio.set_event_loop(self._loop)
|
||||
else:
|
||||
self._loop = asyncio.get_event_loop()
|
||||
|
||||
@property
|
||||
def autostart(self) -> bool:
|
||||
"""Defines if the task should be started automatically when the
|
||||
@@ -144,10 +124,10 @@ class Task(pydase.data_service.data_service.DataService, Generic[R]):
|
||||
self._result = task.result()
|
||||
|
||||
async def run_task() -> R | None:
|
||||
if inspect.iscoroutinefunction(self._bound_func):
|
||||
if inspect.iscoroutinefunction(self._func):
|
||||
logger.info("Starting task %r", self._func_name)
|
||||
self._status = TaskStatus.RUNNING
|
||||
res: Coroutine[None, None, R] = self._bound_func()
|
||||
res: Coroutine[None, None, R | None] = self._func()
|
||||
try:
|
||||
return await res
|
||||
except asyncio.CancelledError:
|
||||
@@ -167,24 +147,3 @@ class Task(pydase.data_service.data_service.DataService, Generic[R]):
|
||||
|
||||
if self._task:
|
||||
self._task.cancel()
|
||||
|
||||
def __get__(self, instance: Any, owner: Any) -> Self:
|
||||
"""Descriptor method used to correctly set up the task.
|
||||
|
||||
This descriptor method is called by the class instance containing the task.
|
||||
It binds the task function to that class instance.
|
||||
|
||||
Since the `__init__` function is called when a function is decorated with
|
||||
[`@task`][pydase.task.decorator.task], some setup is delayed until this
|
||||
descriptor function is called.
|
||||
"""
|
||||
|
||||
if instance and not self._set_up:
|
||||
if not current_event_loop_exists():
|
||||
self._loop = asyncio.new_event_loop()
|
||||
asyncio.set_event_loop(self._loop)
|
||||
else:
|
||||
self._loop = asyncio.get_event_loop()
|
||||
self._bound_func = self._func.__get__(instance, owner)
|
||||
self._set_up = True
|
||||
return self
|
||||
|
||||
@@ -204,6 +204,17 @@ def function_has_arguments(func: Callable[..., Any]) -> bool:
|
||||
|
||||
def is_descriptor(obj: object) -> bool:
|
||||
"""Check if an object is a descriptor."""
|
||||
|
||||
# Exclude functions, methods, builtins and properties
|
||||
if (
|
||||
inspect.isfunction(obj)
|
||||
or inspect.ismethod(obj)
|
||||
or inspect.isbuiltin(obj)
|
||||
or isinstance(obj, property)
|
||||
):
|
||||
return False
|
||||
|
||||
# Check if it has any descriptor methods
|
||||
return any(hasattr(obj, method) for method in ("__get__", "__set__", "__delete__"))
|
||||
|
||||
|
||||
@@ -212,3 +223,25 @@ def current_event_loop_exists() -> bool:
|
||||
import asyncio
|
||||
|
||||
return asyncio.get_event_loop_policy()._local._loop is not None # type: ignore
|
||||
|
||||
|
||||
def normalize_full_access_path_string(s: str) -> str:
|
||||
"""Normalizes a string representing a full access path by converting double quotes
|
||||
to single quotes.
|
||||
|
||||
This function is useful for ensuring consistency in strings that represent access
|
||||
paths containing dictionary keys, by replacing all double quotes (`"`) with single
|
||||
quotes (`'`).
|
||||
|
||||
Args:
|
||||
s (str): The input string to be normalized.
|
||||
|
||||
Returns:
|
||||
A new string with all double quotes replaced by single quotes.
|
||||
|
||||
Example:
|
||||
>>> normalize_full_access_path_string('dictionary["first"].my_task')
|
||||
"dictionary['first'].my_task"
|
||||
"""
|
||||
|
||||
return s.replace('"', "'")
|
||||
|
||||
@@ -5,7 +5,7 @@ import pydase
|
||||
import pytest
|
||||
from pydase.data_service.data_service_observer import DataServiceObserver
|
||||
from pydase.data_service.state_manager import StateManager
|
||||
from pydase.utils.serialization.serializer import SerializationError
|
||||
from pydase.utils.serialization.serializer import SerializationError, dump
|
||||
|
||||
logger = logging.getLogger()
|
||||
|
||||
@@ -146,3 +146,41 @@ def test_private_attribute_does_not_have_to_be_serializable() -> None:
|
||||
service_instance.change_publ_attr()
|
||||
|
||||
service_instance.change_priv_attr()
|
||||
|
||||
|
||||
def test_normalized_attr_path_in_dependent_property_changes(
|
||||
caplog: pytest.LogCaptureFixture,
|
||||
) -> None:
|
||||
class SubService(pydase.DataService):
|
||||
_prop = 10.0
|
||||
|
||||
@property
|
||||
def prop(self) -> float:
|
||||
return self._prop
|
||||
|
||||
class MyService(pydase.DataService):
|
||||
def __init__(self) -> None:
|
||||
super().__init__()
|
||||
self.service_dict = {"one": SubService()}
|
||||
|
||||
service_instance = MyService()
|
||||
state_manager = StateManager(service=service_instance)
|
||||
observer = DataServiceObserver(state_manager=state_manager)
|
||||
|
||||
assert observer.property_deps_dict["service_dict['one']._prop"] == [
|
||||
"service_dict['one'].prop"
|
||||
]
|
||||
|
||||
# We can use dict key path encoded with double quotes
|
||||
state_manager.set_service_attribute_value_by_path(
|
||||
'service_dict["one"]._prop', dump(11.0)
|
||||
)
|
||||
assert service_instance.service_dict["one"].prop == 11.0
|
||||
assert "'service_dict[\"one\"].prop' changed to '11.0'" in caplog.text
|
||||
|
||||
# We can use dict key path encoded with single quotes
|
||||
state_manager.set_service_attribute_value_by_path(
|
||||
"service_dict['one']._prop", dump(12.0)
|
||||
)
|
||||
assert service_instance.service_dict["one"].prop == 12.0
|
||||
assert "'service_dict[\"one\"].prop' changed to '12.0'" in caplog.text
|
||||
|
||||
@@ -138,3 +138,154 @@ async def test_nested_dict_autostart_task(
|
||||
"'sub_services_dict[\"second\"].my_task.status' changed to 'TaskStatus.RUNNING'"
|
||||
in caplog.text
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.asyncio(scope="function")
|
||||
async def test_manual_start_with_multiple_service_instances(
|
||||
caplog: LogCaptureFixture,
|
||||
) -> None:
|
||||
class MySubService(pydase.DataService):
|
||||
@task()
|
||||
async def my_task(self) -> None:
|
||||
logger.info("Triggered task.")
|
||||
while True:
|
||||
await asyncio.sleep(1)
|
||||
|
||||
class MyService(pydase.DataService):
|
||||
sub_services_list = [MySubService() for i in range(2)]
|
||||
sub_services_dict = {"first": MySubService(), "second": MySubService()}
|
||||
|
||||
service_instance = MyService()
|
||||
state_manager = StateManager(service_instance)
|
||||
DataServiceObserver(state_manager)
|
||||
|
||||
autostart_service_tasks(service_instance)
|
||||
|
||||
await asyncio.sleep(0.1)
|
||||
|
||||
assert (
|
||||
service_instance.sub_services_list[0].my_task.status == TaskStatus.NOT_RUNNING
|
||||
)
|
||||
assert (
|
||||
service_instance.sub_services_list[1].my_task.status == TaskStatus.NOT_RUNNING
|
||||
)
|
||||
assert (
|
||||
service_instance.sub_services_dict["first"].my_task.status
|
||||
== TaskStatus.NOT_RUNNING
|
||||
)
|
||||
assert (
|
||||
service_instance.sub_services_dict["second"].my_task.status
|
||||
== TaskStatus.NOT_RUNNING
|
||||
)
|
||||
|
||||
service_instance.sub_services_list[0].my_task.start()
|
||||
await asyncio.sleep(0.01)
|
||||
|
||||
assert service_instance.sub_services_list[0].my_task.status == TaskStatus.RUNNING
|
||||
assert (
|
||||
"'sub_services_list[0].my_task.status' changed to 'TaskStatus.RUNNING'"
|
||||
in caplog.text
|
||||
)
|
||||
assert (
|
||||
"'sub_services_list[1].my_task.status' changed to 'TaskStatus.RUNNING'"
|
||||
not in caplog.text
|
||||
)
|
||||
assert (
|
||||
"'sub_services_dict[\"first\"].my_task.status' changed to 'TaskStatus.RUNNING'"
|
||||
not in caplog.text
|
||||
)
|
||||
assert (
|
||||
"'sub_services_dict[\"second\"].my_task.status' changed to 'TaskStatus.RUNNING'"
|
||||
not in caplog.text
|
||||
)
|
||||
|
||||
service_instance.sub_services_list[0].my_task.stop()
|
||||
await asyncio.sleep(0.01)
|
||||
|
||||
assert "Task 'my_task' was cancelled" in caplog.text
|
||||
caplog.clear()
|
||||
|
||||
service_instance.sub_services_list[1].my_task.start()
|
||||
await asyncio.sleep(0.01)
|
||||
|
||||
assert service_instance.sub_services_list[1].my_task.status == TaskStatus.RUNNING
|
||||
assert (
|
||||
"'sub_services_list[0].my_task.status' changed to 'TaskStatus.RUNNING'"
|
||||
not in caplog.text
|
||||
)
|
||||
assert (
|
||||
"'sub_services_list[1].my_task.status' changed to 'TaskStatus.RUNNING'"
|
||||
in caplog.text
|
||||
)
|
||||
assert (
|
||||
"'sub_services_dict[\"first\"].my_task.status' changed to 'TaskStatus.RUNNING'"
|
||||
not in caplog.text
|
||||
)
|
||||
assert (
|
||||
"'sub_services_dict[\"second\"].my_task.status' changed to 'TaskStatus.RUNNING'"
|
||||
not in caplog.text
|
||||
)
|
||||
|
||||
service_instance.sub_services_list[1].my_task.stop()
|
||||
await asyncio.sleep(0.01)
|
||||
|
||||
assert "Task 'my_task' was cancelled" in caplog.text
|
||||
caplog.clear()
|
||||
|
||||
service_instance.sub_services_dict["first"].my_task.start()
|
||||
await asyncio.sleep(0.01)
|
||||
|
||||
assert (
|
||||
service_instance.sub_services_dict["first"].my_task.status == TaskStatus.RUNNING
|
||||
)
|
||||
assert (
|
||||
"'sub_services_list[0].my_task.status' changed to 'TaskStatus.RUNNING'"
|
||||
not in caplog.text
|
||||
)
|
||||
assert (
|
||||
"'sub_services_list[1].my_task.status' changed to 'TaskStatus.RUNNING'"
|
||||
not in caplog.text
|
||||
)
|
||||
assert (
|
||||
"'sub_services_dict[\"first\"].my_task.status' changed to 'TaskStatus.RUNNING'"
|
||||
in caplog.text
|
||||
)
|
||||
assert (
|
||||
"'sub_services_dict[\"second\"].my_task.status' changed to 'TaskStatus.RUNNING'"
|
||||
not in caplog.text
|
||||
)
|
||||
|
||||
service_instance.sub_services_dict["first"].my_task.stop()
|
||||
await asyncio.sleep(0.01)
|
||||
|
||||
assert "Task 'my_task' was cancelled" in caplog.text
|
||||
caplog.clear()
|
||||
|
||||
service_instance.sub_services_dict["second"].my_task.start()
|
||||
await asyncio.sleep(0.01)
|
||||
|
||||
assert (
|
||||
service_instance.sub_services_dict["second"].my_task.status
|
||||
== TaskStatus.RUNNING
|
||||
)
|
||||
assert (
|
||||
"'sub_services_list[0].my_task.status' changed to 'TaskStatus.RUNNING'"
|
||||
not in caplog.text
|
||||
)
|
||||
assert (
|
||||
"'sub_services_list[1].my_task.status' changed to 'TaskStatus.RUNNING'"
|
||||
not in caplog.text
|
||||
)
|
||||
assert (
|
||||
"'sub_services_dict[\"first\"].my_task.status' changed to 'TaskStatus.RUNNING'"
|
||||
not in caplog.text
|
||||
)
|
||||
assert (
|
||||
"'sub_services_dict[\"second\"].my_task.status' changed to 'TaskStatus.RUNNING'"
|
||||
in caplog.text
|
||||
)
|
||||
|
||||
service_instance.sub_services_dict["second"].my_task.stop()
|
||||
await asyncio.sleep(0.01)
|
||||
|
||||
assert "Task 'my_task' was cancelled" in caplog.text
|
||||
|
||||
Reference in New Issue
Block a user