Merge pull request #76 from tiqi-group/72-support-for-dynamic-attribute-handling-and-collection-management

72 support for dynamic attribute handling and collection management
This commit is contained in:
Mose Müller 2023-12-06 09:05:54 +01:00 committed by GitHub
commit 0e47f6c4d3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
47 changed files with 1576 additions and 1107 deletions

View File

@ -0,0 +1,27 @@
# Observer Pattern Implementation in Pydase
## Overview
The Observer Pattern is a fundamental design pattern in the `pydase` package, serving as the central communication mechanism for state updates to clients connected to a service.
## How it Works
### The Observable Class
The `Observable` class is at the core of the pattern. It maintains a list of observers and is responsible for notifying them about state changes. It does so by overriding the following methods:
- `__setattr__`: This function emits a notification before and after a new value is set. These two notifications are important to track which attributes are being set to avoid endless recursion (e.g. when accessing a property within another property). Moreover, when setting an attribute to another observable, the former class will add itself as an observer to the latter class, ensuring that nested classes are properly observed.
- `__getattribute__`: This function notifies the observers when a property getter is called, allowing for monitoring state changes in remote devices, as opposed to local instance attributes.
### Custom Collection Classes
To handle collections (like lists and dictionaries), the `Observable` class converts them into custom collection classes `_ObservableList` and `_ObservableDict` that notify observers of any changes in their state. For this, they have to override the methods changing the state, e.g., `__setitem__` or `append` for lists.
### The Observer Class
The `Observer` is the final element in the chain of observers. The notifications of attribute changes it receives include the full access path (in dot-notation) and the new value. It implements logic to handle state changes, like caching, error logging for type changes, etc. This can be extended by custom notification callbacks (implemented using `add_notification_callback` in `DataServiceObserver`). This enables the user to perform specific actions in response to changes. In `pydase`, the web server adds an additional notification callback that emits the websocket events (`sio_callback`).
Furthermore, the `DataServiceObserver` implements logic to reload the values of properties when an attribute change occurs that a property depends on.
- **Dynamic Inspection**: The observer dynamically inspects the observable object (recursively) to create a mapping of properties and their dependencies. This mapping is constructed based on the class or instance attributes used within the source code of the property getters.
- **Dependency Management**: When a change in an attribute occurs, `DataServiceObserver` updates any properties that depend on this attribute. This ensures that the overall state remains consistent and up-to-date, especially in complex scenarios where properties depend on other instance attribute or properties.

View File

@ -18,12 +18,11 @@ type Action =
| { type: 'SET_DATA'; data: State }
| {
type: 'UPDATE_ATTRIBUTE';
parentPath: string;
name: string;
value: SerializedValue;
fullAccessPath: string;
newValue: SerializedValue;
};
type UpdateMessage = {
data: { parent_path: string; name: string; value: SerializedValue };
data: { full_access_path: string; value: SerializedValue };
};
type LogMessage = {
levelname: LevelName;
@ -35,10 +34,7 @@ const reducer = (state: State, action: Action): State => {
case 'SET_DATA':
return action.data;
case 'UPDATE_ATTRIBUTE': {
const pathList = action.parentPath.split('.').slice(1).concat(action.name);
const joinedPath = pathList.join('.');
return setNestedValueByPath(state, joinedPath, action.value);
return setNestedValueByPath(state, action.fullAccessPath, action.newValue);
}
default:
throw new Error();
@ -129,14 +125,13 @@ const App = () => {
function onNotify(value: UpdateMessage) {
// Extracting data from the notification
const { parent_path: parentPath, name, value: newValue } = value.data;
const { full_access_path: fullAccessPath, value: newValue } = value.data;
// Dispatching the update to the reducer
dispatch({
type: 'UPDATE_ATTRIBUTE',
parentPath,
name,
value: newValue
fullAccessPath,
newValue
});
}

View File

@ -123,7 +123,7 @@ export const NumberComponent = React.memo((props: NumberComponentProps) => {
// Whether to show the name infront of the component (false if used with a slider)
const showName = props.showName !== undefined ? props.showName : true;
// If emitUpdate is passed, use this instead of the emit_update from the socket
// If emitUpdate is passed, use this instead of the setAttribute from the socket
// Also used when used with a slider
const emitUpdate =
props.customEmitUpdate !== undefined ? props.customEmitUpdate : setAttribute;

View File

@ -10,6 +10,7 @@ nav:
- Developer Guide: dev-guide/README.md
- API Reference: dev-guide/api.md
- Adding Components: dev-guide/Adding_Components.md
- Observer Pattern Implementation: dev-guide/Observer_Pattern_Implementation.md # <-- New section
- About:
- Release Notes: about/release-notes.md
- Contributing: about/contributing.md

View File

@ -46,6 +46,7 @@ class NumberSlider(DataService):
step_size: float = 1.0,
type_: Literal["int", "float"] = "float",
) -> None:
super().__init__()
if type_ not in {"float", "int"}:
logger.error("Unknown type '%s'. Using 'float'.", type_)
type_ = "float"
@ -56,8 +57,6 @@ class NumberSlider(DataService):
self.min = min_
self.max = max_
super().__init__()
def __setattr__(self, name: str, value: Any) -> None:
if name in ["value", "step_size"]:
value = int(value) if self._type == "int" else float(value)

View File

@ -1,16 +1,15 @@
from __future__ import annotations
from abc import ABC
from typing import TYPE_CHECKING, Any
from pydase.observer_pattern.observable.observable import Observable
if TYPE_CHECKING:
from pydase.data_service.callback_manager import CallbackManager
from pydase.data_service.data_service import DataService
from pydase.data_service.task_manager import TaskManager
class AbstractDataService(ABC):
class AbstractDataService(Observable):
__root__: DataService
_task_manager: TaskManager
_callback_manager: CallbackManager
_autostart_tasks: dict[str, tuple[Any]]

View File

@ -1,442 +0,0 @@
from __future__ import annotations
import inspect
import logging
from typing import TYPE_CHECKING, Any, ClassVar
from pydase.data_service.abstract_data_service import AbstractDataService
from pydase.utils.helpers import get_class_and_instance_attributes
from .data_service_list import DataServiceList
if TYPE_CHECKING:
from collections.abc import Callable
from .data_service import DataService
logger = logging.getLogger(__name__)
class CallbackManager:
_notification_callbacks: ClassVar[list[Callable[[str, str, Any], Any]]] = []
"""
A list of callback functions that are executed when a change occurs in the
DataService instance. These functions are intended to handle or respond to these
changes in some way, such as emitting a socket.io message to the frontend.
Each function in this list should be a callable that accepts three parameters:
- parent_path (str): The path to the parent of the attribute that was changed.
- name (str): The name of the attribute that was changed.
- value (Any): The new value of the attribute.
A callback function can be added to this list using the add_notification_callback
method. Whenever a change in the DataService instance occurs (or in its nested
DataService or DataServiceList instances), the emit_notification method is invoked,
which in turn calls all the callback functions in _notification_callbacks with the
appropriate arguments.
This implementation follows the observer pattern, with the DataService instance as
the "subject" and the callback functions as the "observers".
"""
_list_mapping: ClassVar[dict[int, DataServiceList]] = {}
"""
A dictionary mapping the id of the original lists to the corresponding
DataServiceList instances.
This is used to ensure that all references to the same list within the DataService
object point to the same DataServiceList, so that any modifications to that list can
be tracked consistently. The keys of the dictionary are the ids of the original
lists, and the values are the DataServiceList instances that wrap these lists.
"""
def __init__(self, service: DataService) -> None:
self.callbacks: set[Callable[[str, Any], None]] = set()
self.service = service
def _register_list_change_callbacks( # noqa: C901
self, obj: AbstractDataService, parent_path: str
) -> None:
"""
This method ensures that notifications are emitted whenever a public list
attribute of a DataService instance changes. These notifications pertain solely
to the list item changes, not to changes in attributes of objects within the
list.
The method works by converting all list attributes (both at the class and
instance levels) into DataServiceList objects. Each DataServiceList is then
assigned a callback that is triggered whenever an item in the list is updated.
The callback emits a notification, but only if the DataService instance was the
root instance when the callback was registered.
This method operates recursively, processing the input object and all nested
attributes that are instances of DataService. While navigating the structure,
it constructs a path for each attribute that traces back to the root. This path
is included in any emitted notifications to facilitate identification of the
source of a change.
Parameters:
-----------
obj: DataService
The target object to be processed. All list attributes (and those of its
nested DataService attributes) will be converted into DataServiceList
objects.
parent_path: str
The access path for the parent object. Used to construct the full access
path for the notifications.
"""
# Convert all list attributes (both class and instance) to DataServiceList
attrs = get_class_and_instance_attributes(obj)
for attr_name, attr_value in attrs.items():
if isinstance(attr_value, AbstractDataService):
new_path = f"{parent_path}.{attr_name}"
self._register_list_change_callbacks(attr_value, new_path)
elif isinstance(attr_value, list):
# Create callback for current attr_name
# Default arguments solve the late binding problem by capturing the
# value at the time the lambda is defined, not when it is called. This
# prevents attr_name from being overwritten in the next loop iteration.
def callback(
index: int, value: Any, attr_name: str = attr_name
) -> None:
"""Emits a notification through the service's callback manager."""
# Skip private and protected lists
if (
self.service == self.service.__root__
and not attr_name.startswith("_")
):
self.service._callback_manager.emit_notification(
parent_path=parent_path,
name=f"{attr_name}[{index}]",
value=value,
)
# Check if attr_value is already a DataServiceList or in the mapping
if isinstance(attr_value, DataServiceList):
attr_value.add_callback(callback)
continue
if id(attr_value) in self._list_mapping:
# If the list `attr_value` was already referenced somewhere else
notifying_list = self._list_mapping[id(attr_value)]
notifying_list.add_callback(callback)
else:
# convert the builtin list into a DataServiceList and add the
# callback
notifying_list = DataServiceList(
attr_value, callback_list=[callback]
)
self._list_mapping[id(attr_value)] = notifying_list
setattr(obj, attr_name, notifying_list)
# recursively add callbacks to list attributes of DataService instances
for i, item in enumerate(attr_value):
if isinstance(item, AbstractDataService):
new_path = f"{parent_path}.{attr_name}[{i}]"
self._register_list_change_callbacks(item, new_path)
def _register_data_service_instance_callbacks(
self, obj: AbstractDataService, parent_path: str
) -> None:
"""
This function is a key part of the observer pattern implemented by the
DataService class.
Its purpose is to allow the system to automatically send out notifications
whenever an attribute of a DataService instance is updated, which is especially
useful when the DataService instance is part of a nested structure.
It works by recursively registering callbacks for a given DataService instance
and all of its nested attributes. Each callback is responsible for emitting a
notification when the attribute it is attached to is modified.
This function ensures that only the root DataService instance (the one directly
exposed to the user or another system via rpyc) emits notifications.
Each notification contains a 'parent_path' that traces the attribute's location
within the nested DataService structure, starting from the root. This makes it
easier for observers to determine exactly where a change has occurred.
Parameters:
-----------
obj: DataService
The target object on which callbacks are to be registered.
parent_path: str
The access path for the parent object. This is used to construct the full
access path for the notifications.
"""
# Create and register a callback for the object
# only emit the notification when the call was registered by the root object
def callback(attr_name: str, value: Any) -> None:
"""Emits a notification through the service's callback manager."""
# Skip private and protected attrs
# exlude proerty notifications -> those are handled in separate callbacks
if (
self.service == self.service.__root__
and not attr_name.startswith("_")
and not isinstance(getattr(type(obj), attr_name, None), property)
):
self.service._callback_manager.emit_notification(
parent_path=parent_path,
name=attr_name,
value=value,
)
obj._callback_manager.callbacks.add(callback)
# Recursively register callbacks for all nested attributes of the object
attrs = get_class_and_instance_attributes(obj)
for nested_attr_name, nested_attr in attrs.items():
if isinstance(nested_attr, DataServiceList):
self._register_list_callbacks(
nested_attr, parent_path, nested_attr_name
)
elif isinstance(nested_attr, AbstractDataService):
self._register_service_callbacks(
nested_attr, parent_path, nested_attr_name
)
def _register_list_callbacks(
self, nested_attr: list[Any], parent_path: str, attr_name: str
) -> None:
"""Handles registration of callbacks for list attributes"""
for i, list_item in enumerate(nested_attr):
if isinstance(list_item, AbstractDataService):
self._register_service_callbacks(
list_item, parent_path, f"{attr_name}[{i}]"
)
def _register_service_callbacks(
self, nested_attr: AbstractDataService, parent_path: str, attr_name: str
) -> None:
"""Handles registration of callbacks for DataService attributes"""
# as the DataService is an attribute of self, change the root object
# use the dictionary to not trigger callbacks on initialised objects
nested_attr.__dict__["__root__"] = self.service.__root__
new_path = f"{parent_path}.{attr_name}"
self._register_data_service_instance_callbacks(nested_attr, new_path)
def __register_recursive_parameter_callback(
self,
obj: AbstractDataService | DataServiceList,
callback: Callable[[str | int, Any], None],
) -> None:
"""
Register callback to a DataService or DataServiceList instance and its nested
instances.
For a DataService, this method traverses its attributes and recursively adds the
callback for nested DataService or DataServiceList instances. For a
DataServiceList,
the callback is also triggered when an item gets reassigned.
"""
if isinstance(obj, DataServiceList):
# emits callback when item in list gets reassigned
obj.add_callback(callback=callback)
obj_list: DataServiceList | list[AbstractDataService] = obj
else:
obj_list = [obj]
# this enables notifications when a class instance was changed (-> item is
# changed, not reassigned)
for item in obj_list:
if isinstance(item, AbstractDataService):
item._callback_manager.callbacks.add(callback)
for attr_name in set(dir(item)) - set(dir(object)) - {"__root__"}:
attr_value = getattr(item, attr_name)
if isinstance(attr_value, AbstractDataService | DataServiceList):
self.__register_recursive_parameter_callback(
attr_value, callback
)
def _register_property_callbacks( # noqa: C901
self,
obj: AbstractDataService,
parent_path: str,
) -> None:
"""
Register callbacks to notify when properties or their dependencies change.
This method cycles through all attributes (both class and instance level) of the
input `obj`. For each attribute that is a property, it identifies dependencies
used in the getter method and creates a callback for each one.
The method is recursive for attributes that are of type DataService or
DataServiceList. It attaches the callback directly to DataServiceList items or
propagates it through nested DataService instances.
"""
attrs = get_class_and_instance_attributes(obj)
for attr_name, attr_value in attrs.items():
if isinstance(attr_value, AbstractDataService):
self._register_property_callbacks(
attr_value, parent_path=f"{parent_path}.{attr_name}"
)
elif isinstance(attr_value, DataServiceList):
for i, item in enumerate(attr_value):
if isinstance(item, AbstractDataService):
self._register_property_callbacks(
item, parent_path=f"{parent_path}.{attr_name}[{i}]"
)
if isinstance(attr_value, property):
dependencies = attr_value.fget.__code__.co_names # type: ignore[union-attr]
source_code_string = inspect.getsource(attr_value.fget) # type: ignore[arg-type]
for dependency in dependencies:
# check if the dependencies are attributes of obj
# This doesn't have to be the case like, for example, here:
# >>> @property
# >>> def power(self) -> float:
# >>> return self.class_attr.voltage * self.current
#
# The dependencies for this property are:
# > ('class_attr', 'voltage', 'current')
if f"self.{dependency}" not in source_code_string:
continue
# use `obj` instead of `type(obj)` to get DataServiceList
# instead of list
dependency_value = getattr(obj, dependency)
if isinstance(
dependency_value, DataServiceList | AbstractDataService
):
def list_or_data_service_callback(
name: Any,
value: Any,
dependent_attr: str = attr_name,
) -> None:
"""Emits a notification through the service's callback
manager.
"""
if self.service == obj.__root__:
obj._callback_manager.emit_notification(
parent_path=parent_path,
name=dependent_attr,
value=getattr(obj, dependent_attr),
)
self.__register_recursive_parameter_callback(
dependency_value,
callback=list_or_data_service_callback,
)
else:
def callback(
name: str,
value: Any,
dependent_attr: str = attr_name,
dep: str = dependency,
) -> None:
"""Emits a notification through the service's callback
manager.
"""
if name == dep and self.service == obj.__root__:
obj._callback_manager.emit_notification(
parent_path=parent_path,
name=dependent_attr,
value=getattr(obj, dependent_attr),
)
# Add to callbacks
obj._callback_manager.callbacks.add(callback)
def _register_start_stop_task_callbacks( # noqa: C901
self, obj: AbstractDataService, parent_path: str
) -> None:
"""
This function registers callbacks for start and stop methods of async functions.
These callbacks are stored in the '_task_status_change_callbacks' attribute and
are called when the status of a task changes.
Parameters:
-----------
obj: AbstractDataService
The target object on which callbacks are to be registered.
parent_path: str
The access path for the parent object. This is used to construct the full
access path for the notifications.
"""
# Create and register a callback for the object
# only emit the notification when the call was registered by the root object
def task_status_change_callback(
name: str, task_status: dict[str, Any] | None
) -> None:
"""Emits a notification through the service's callback
manager.
"""
if self.service == obj.__root__ and not name.startswith("_"):
obj._callback_manager.emit_notification(
parent_path=parent_path,
name=name,
value=task_status,
)
obj._task_manager.task_status_change_callbacks.append(
task_status_change_callback
)
# Recursively register callbacks for all nested attributes of the object
attrs: dict[str, Any] = get_class_and_instance_attributes(obj)
for nested_attr_name, nested_attr in attrs.items():
if isinstance(nested_attr, DataServiceList):
for i, item in enumerate(nested_attr):
if isinstance(item, AbstractDataService):
self._register_start_stop_task_callbacks(
item, parent_path=f"{parent_path}.{nested_attr_name}[{i}]"
)
if isinstance(nested_attr, AbstractDataService):
self._register_start_stop_task_callbacks(
nested_attr, parent_path=f"{parent_path}.{nested_attr_name}"
)
def register_callbacks(self) -> None:
self._register_list_change_callbacks(
self.service, f"{self.service.__class__.__name__}"
)
self._register_data_service_instance_callbacks(
self.service, f"{self.service.__class__.__name__}"
)
self._register_property_callbacks(
self.service, f"{self.service.__class__.__name__}"
)
self._register_start_stop_task_callbacks(
self.service, f"{self.service.__class__.__name__}"
)
def emit_notification(self, parent_path: str, name: str, value: Any) -> None:
logger.debug("%s.%s changed to %s!", parent_path, name, value)
for callback in self._notification_callbacks:
try:
callback(parent_path, name, value)
except Exception as e:
logger.error(e)
def add_notification_callback(
self, callback: Callable[[str, str, Any], None]
) -> None:
"""
Adds a new notification callback function to the list of callbacks.
This function is intended to be used for registering a function that will be
called whenever a the value of an attribute changes.
Args:
callback (Callable[[str, str, Any], None]): The callback function to
register.
It should accept three parameters:
- parent_path (str): The parent path of the parameter.
- name (str): The name of the changed parameter.
- value (Any): The value of the parameter.
"""
self._notification_callbacks.append(callback)

View File

@ -7,7 +7,6 @@ import rpyc # type: ignore[import-untyped]
import pydase.units as u
from pydase.data_service.abstract_data_service import AbstractDataService
from pydase.data_service.callback_manager import CallbackManager
from pydase.data_service.task_manager import TaskManager
from pydase.utils.helpers import (
convert_arguments_to_hinted_types,
@ -45,16 +44,12 @@ def process_callable_attribute(attr: Any, args: dict[str, Any]) -> Any:
class DataService(rpyc.Service, AbstractDataService):
def __init__(self, **kwargs: Any) -> None:
self._callback_manager: CallbackManager = CallbackManager(self)
super().__init__()
self._task_manager = TaskManager(self)
if not hasattr(self, "_autostart_tasks"):
self._autostart_tasks = {}
self.__root__: "DataService" = self
"""Keep track of the root object. This helps to filter the emission of
notifications."""
filename = kwargs.pop("filename", None)
if filename is not None:
warnings.warn(
@ -65,32 +60,56 @@ class DataService(rpyc.Service, AbstractDataService):
)
self._filename: str | Path = filename
self._callback_manager.register_callbacks()
self.__check_instance_classes()
self._initialised = True
def __setattr__(self, __name: str, __value: Any) -> None:
# converting attributes that are not properties
if not isinstance(getattr(type(self), __name, None), property):
current_value = getattr(self, __name, None)
# parse ints into floats if current value is a float
if isinstance(current_value, float) and isinstance(__value, int):
__value = float(__value)
# Check and warn for unexpected type changes in attributes
self._warn_on_type_change(__name, __value)
if isinstance(current_value, u.Quantity):
__value = u.convert_to_quantity(__value, str(current_value.u))
# Warn if setting private attributes
self._warn_on_private_attr_set(__name)
# every class defined by the user should inherit from DataService if it is
# assigned to a public attribute
if not __name.startswith("_"):
warn_if_instance_class_does_not_inherit_from_data_service(__value)
# Set the attribute
super().__setattr__(__name, __value)
if self.__dict__.get("_initialised") and __name != "_initialised":
for callback in self._callback_manager.callbacks:
callback(__name, __value)
elif __name.startswith(f"_{self.__class__.__name__}__"):
def _warn_on_type_change(self, attr_name: str, new_value: Any) -> None:
if is_property_attribute(self, attr_name):
return
current_value = getattr(self, attr_name, None)
if self._is_unexpected_type_change(current_value, new_value):
logger.warning(
"Type of '%s' changed from '%s' to '%s'. This may have unwanted "
"side effects! Consider setting it to '%s' directly.",
attr_name,
type(current_value).__name__,
type(new_value).__name__,
type(current_value).__name__,
)
def _is_unexpected_type_change(self, current_value: Any, new_value: Any) -> bool:
return (
isinstance(current_value, float)
and not isinstance(new_value, float)
or (
isinstance(current_value, u.Quantity)
and not isinstance(new_value, u.Quantity)
)
)
def _warn_on_private_attr_set(self, attr_name: str) -> None:
if attr_name.startswith(f"_{self.__class__.__name__}__"):
logger.warning(
"Warning: You should not set private but rather protected attributes! "
"Use %s instead of %s.",
__name.replace(f"_{self.__class__.__name__}__", "_"),
__name.replace(f"_{self.__class__.__name__}__", "__"),
attr_name.replace(f"_{self.__class__.__name__}__", "_"),
attr_name.replace(f"_{self.__class__.__name__}__", "__"),
)
def __check_instance_classes(self) -> None:
@ -157,7 +176,7 @@ class DataService(rpyc.Service, AbstractDataService):
)
if hasattr(self, "_state_manager"):
self._state_manager.save_state() # type: ignore[reportGeneralTypeIssue]
self._state_manager.save_state()
def load_DataService_from_JSON( # noqa: N802
self, json_dict: dict[str, Any]

View File

@ -1,7 +1,12 @@
import logging
from typing import TYPE_CHECKING, Any
from pydase.utils.serializer import set_nested_value_by_path
from pydase.utils.serializer import (
SerializationPathError,
SerializationValueError,
get_nested_dict_by_path,
set_nested_value_by_path,
)
if TYPE_CHECKING:
from pydase import DataService
@ -23,13 +28,12 @@ class DataServiceCache:
"""Initializes the cache and sets up the callback."""
logger.debug("Initializing cache.")
self._cache = self.service.serialize()
self.service._callback_manager.add_notification_callback(self.update_cache)
def update_cache(self, parent_path: str, name: str, value: Any) -> None:
# Remove the part before the first "." in the parent_path
parent_path = ".".join(parent_path.split(".")[1:])
def update_cache(self, full_access_path: str, value: Any) -> None:
set_nested_value_by_path(self._cache, full_access_path, value)
# Construct the full path
full_path = f"{parent_path}.{name}" if parent_path else name
set_nested_value_by_path(self._cache, full_path, value)
def get_value_dict_from_cache(self, full_access_path: str) -> dict[str, Any]:
try:
return get_nested_dict_by_path(self._cache, full_access_path)
except (SerializationPathError, SerializationValueError, KeyError):
return {}

View File

@ -1,68 +0,0 @@
from collections.abc import Callable
from typing import Any
import pydase.units as u
from pydase.utils.warnings import (
warn_if_instance_class_does_not_inherit_from_data_service,
)
class DataServiceList(list[Any]):
"""
DataServiceList is a list with additional functionality to trigger callbacks
whenever an item is set. This can be used to track changes in the list items.
The class takes the same arguments as the list superclass during initialization,
with an additional optional 'callback' argument that is a list of functions.
These callbacks are stored and executed whenever an item in the DataServiceList
is set via the __setitem__ method. The callbacks receive the index of the changed
item and its new value as arguments.
The original list that is passed during initialization is kept as a private
attribute to prevent it from being garbage collected.
Additional callbacks can be added after initialization using the `add_callback`
method.
"""
def __init__(
self,
*args: list[Any],
callback_list: list[Callable[[int, Any], None]] | None = None,
**kwargs: Any,
) -> None:
self._callbacks: list[Callable[[int, Any], None]] = []
if isinstance(callback_list, list):
self._callbacks = callback_list
for item in args[0]:
warn_if_instance_class_does_not_inherit_from_data_service(item)
# prevent gc to delete the passed list by keeping a reference
self._original_list = args[0]
super().__init__(*args, **kwargs)
def __setitem__(self, key: int, value: Any) -> None: # type: ignore[override]
current_value = self.__getitem__(key)
# parse ints into floats if current value is a float
if isinstance(current_value, float) and isinstance(value, int):
value = float(value)
if isinstance(current_value, u.Quantity):
value = u.convert_to_quantity(value, str(current_value.u))
super().__setitem__(key, value)
for callback in self._callbacks:
callback(key, value)
def add_callback(self, callback: Callable[[int, Any], None]) -> None:
"""
Add a new callback function to be executed on item set.
Args:
callback (Callable[[int, Any], None]): Callback function that takes two
arguments - index of the changed item and its new value.
"""
self._callbacks.append(callback)

View File

@ -0,0 +1,101 @@
import logging
from collections.abc import Callable
from copy import deepcopy
from typing import Any
from pydase.data_service.state_manager import StateManager
from pydase.observer_pattern.observer.property_observer import (
PropertyObserver,
)
from pydase.utils.helpers import get_object_attr_from_path_list
from pydase.utils.serializer import dump
logger = logging.getLogger(__name__)
class DataServiceObserver(PropertyObserver):
def __init__(self, state_manager: StateManager) -> None:
self.state_manager = state_manager
self._notification_callbacks: list[
Callable[[str, Any, dict[str, Any]], None]
] = []
super().__init__(state_manager.service)
def on_change(self, full_access_path: str, value: Any) -> None:
cached_value_dict = deepcopy(
self.state_manager._data_service_cache.get_value_dict_from_cache(
full_access_path
)
)
cached_value = cached_value_dict.get("value")
if cached_value != dump(value)["value"]:
logger.debug("'%s' changed to '%s'", full_access_path, value)
self._update_cache_value(full_access_path, value, cached_value_dict)
for callback in self._notification_callbacks:
callback(full_access_path, value, cached_value_dict)
self._notify_dependent_property_changes(full_access_path)
def _update_cache_value(
self, full_access_path: str, value: Any, cached_value_dict: dict[str, Any]
) -> None:
value_dict = dump(value)
if cached_value_dict != {}:
if (
cached_value_dict["type"] != "method"
and cached_value_dict["type"] != value_dict["type"]
):
logger.warning(
"Type of '%s' changed from '%s' to '%s'. This could have unwanted "
"side effects! Consider setting it to '%s' directly.",
full_access_path,
cached_value_dict["type"],
value_dict["type"],
cached_value_dict["type"],
)
self.state_manager._data_service_cache.update_cache(
full_access_path,
value,
)
def _notify_dependent_property_changes(self, changed_attr_path: str) -> None:
changed_props = self.property_deps_dict.get(changed_attr_path, [])
for prop in changed_props:
# only notify about changing attribute if it is not currently being
# "changed" e.g. when calling the getter of a property within another
# property
if prop not in self.changing_attributes:
self._notify_changed(
prop,
get_object_attr_from_path_list(self.observable, prop.split(".")),
)
def add_notification_callback(
self, callback: Callable[[str, Any, dict[str, Any]], None]
) -> None:
"""
Registers a callback function to be invoked upon attribute changes in the
observed object.
This method allows for the addition of custom callback functions that will be
executed whenever there is a change in the value of an observed attribute. The
callback function is called with detailed information about the change, enabling
external logic to respond to specific state changes within the observable
object.
Args:
callback (Callable[[str, Any, dict[str, Any]]): The callback function to be
registered. The function should have the following signature:
- full_access_path (str): The full dot-notation access path of the
changed attribute. This path indicates the location of the changed
attribute within the observable object's structure.
- value (Any): The new value of the changed attribute.
- cached_value_dict (dict[str, Any]): A dictionary representing the
cached state of the attribute prior to the change. This can be useful
for understanding the nature of the change and for historical
comparison.
"""
self._notification_callbacks.append(callback)

View File

@ -145,7 +145,7 @@ class StateManager:
for path in generate_serialized_data_paths(json_dict):
nested_json_dict = get_nested_dict_by_path(json_dict, path)
nested_class_dict = get_nested_dict_by_path(self.cache, path)
nested_class_dict = self._data_service_cache.get_value_dict_from_cache(path)
value, value_type = nested_json_dict["value"], nested_json_dict["type"]
class_attr_value_type = nested_class_dict.get("type", None)
@ -220,6 +220,8 @@ class StateManager:
) -> Any:
if current_value_dict["type"] == "Quantity":
return u.convert_to_quantity(value, current_value_dict["value"]["unit"])
if current_value_dict["type"] == "float" and not isinstance(value, float):
return float(value)
return value
def __update_attribute_by_path(self, path: str, value: Any) -> None:

View File

@ -7,7 +7,6 @@ from functools import wraps
from typing import TYPE_CHECKING, Any, TypedDict
from pydase.data_service.abstract_data_service import AbstractDataService
from pydase.data_service.data_service_list import DataServiceList
from pydase.utils.helpers import get_class_and_instance_attributes
if TYPE_CHECKING:
@ -87,12 +86,6 @@ class TaskManager:
its kwargs.
"""
self.task_status_change_callbacks: list[
Callable[[str, dict[str, Any] | None], Any]
] = []
"""A list of callback functions to be invoked when the status of a task (start
or stop) changes."""
self._set_start_and_stop_for_async_methods()
def _set_start_and_stop_for_async_methods(self) -> None:
@ -122,7 +115,7 @@ class TaskManager:
for attr_value in attrs.values():
if isinstance(attr_value, AbstractDataService):
attr_value._task_manager.start_autostart_tasks()
elif isinstance(attr_value, DataServiceList):
elif isinstance(attr_value, list):
for item in attr_value:
if isinstance(item, AbstractDataService):
item._task_manager.start_autostart_tasks()
@ -146,7 +139,7 @@ class TaskManager:
return stop_task
def _make_start_task( # noqa: C901
def _make_start_task(
self, name: str, method: Callable[..., Any]
) -> Callable[..., Any]:
"""
@ -162,7 +155,7 @@ class TaskManager:
"""
@wraps(method)
def start_task(*args: Any, **kwargs: Any) -> None: # noqa: C901
def start_task(*args: Any, **kwargs: Any) -> None:
def task_done_callback(task: asyncio.Task[None], name: str) -> None:
"""Handles tasks that have finished.
@ -173,8 +166,7 @@ class TaskManager:
self.tasks.pop(name, None)
# emit the notification that the task was stopped
for callback in self.task_status_change_callbacks:
callback(name, None)
self.service._notify_changed(name, None)
exception = task.exception()
if exception is not None:
@ -230,8 +222,7 @@ class TaskManager:
}
# emit the notification that the task was started
for callback in self.task_status_change_callbacks:
callback(name, kwargs_updated)
self.service._notify_changed(name, kwargs_updated)
else:
logger.error("Task '%s' is already running!", name)

View File

@ -1,13 +1,13 @@
{
"files": {
"main.css": "/static/css/main.2d8458eb.css",
"main.js": "/static/js/main.08fc7255.js",
"main.js": "/static/js/main.7f907b0f.js",
"index.html": "/index.html",
"main.2d8458eb.css.map": "/static/css/main.2d8458eb.css.map",
"main.08fc7255.js.map": "/static/js/main.08fc7255.js.map"
"main.7f907b0f.js.map": "/static/js/main.7f907b0f.js.map"
},
"entrypoints": [
"static/css/main.2d8458eb.css",
"static/js/main.08fc7255.js"
"static/js/main.7f907b0f.js"
]
}

View File

@ -1 +1 @@
<!doctype html><html lang="en"><head><meta charset="utf-8"/><link rel="icon" href="/favicon.ico"/><meta name="viewport" content="width=device-width,initial-scale=1"/><meta name="theme-color" content="#000000"/><meta name="description" content="Web site displaying a pydase UI."/><link rel="apple-touch-icon" href="/logo192.png"/><link rel="manifest" href="/manifest.json"/><title>pydase App</title><script defer="defer" src="/static/js/main.08fc7255.js"></script><link href="/static/css/main.2d8458eb.css" rel="stylesheet"></head><body><noscript>You need to enable JavaScript to run this app.</noscript><div id="root"></div></body></html>
<!doctype html><html lang="en"><head><meta charset="utf-8"/><link rel="icon" href="/favicon.ico"/><meta name="viewport" content="width=device-width,initial-scale=1"/><meta name="theme-color" content="#000000"/><meta name="description" content="Web site displaying a pydase UI."/><link rel="apple-touch-icon" href="/logo192.png"/><link rel="manifest" href="/manifest.json"/><title>pydase App</title><script defer="defer" src="/static/js/main.7f907b0f.js"></script><link href="/static/css/main.2d8458eb.css" rel="stylesheet"></head><body><noscript>You need to enable JavaScript to run this app.</noscript><div id="root"></div></body></html>

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

View File

@ -0,0 +1,3 @@
from pydase.observer_pattern.observable.observable import Observable
__all__ = ["Observable"]

View File

@ -0,0 +1,71 @@
import logging
from typing import Any
from pydase.observer_pattern.observable.observable_object import ObservableObject
from pydase.utils.helpers import is_property_attribute
logger = logging.getLogger(__name__)
class Observable(ObservableObject):
def __init__(self) -> None:
super().__init__()
class_attrs = {
k: type(self).__dict__[k]
for k in set(type(self).__dict__)
- set(Observable.__dict__)
- set(self.__dict__)
}
for name, value in class_attrs.items():
if isinstance(value, property) or callable(value):
continue
self.__dict__[name] = self._initialise_new_objects(name, value)
def __setattr__(self, name: str, value: Any) -> None:
if not hasattr(self, "_observers") and name != "_observers":
logger.warning(
"Ensure that super().__init__() is called at the start of the '%s' "
"constructor! Failing to do so may lead to unexpected behavior.",
type(self).__name__,
)
self._observers = {}
value = self._handle_observable_setattr(name, value)
super().__setattr__(name, value)
self._notify_changed(name, value)
def __getattribute__(self, name: str) -> Any:
if is_property_attribute(self, name):
self._notify_change_start(name)
value = super().__getattribute__(name)
if is_property_attribute(self, name):
self._notify_changed(name, value)
return value
def _handle_observable_setattr(self, name: str, value: Any) -> Any:
if name == "_observers":
return value
self._remove_observer_if_observable(name)
value = self._initialise_new_objects(name, value)
self._notify_change_start(name)
return value
def _remove_observer_if_observable(self, name: str) -> None:
if not is_property_attribute(self, name):
current_value = getattr(self, name, None)
if isinstance(current_value, ObservableObject):
current_value._remove_observer(self, name)
def _construct_extended_attr_path(
self, observer_attr_name: str, instance_attr_name: str
) -> str:
if observer_attr_name != "":
return f"{observer_attr_name}.{instance_attr_name}"
return instance_attr_name

View File

@ -0,0 +1,201 @@
import logging
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING, Any, ClassVar
if TYPE_CHECKING:
from pydase.observer_pattern.observer.observer import Observer
logger = logging.getLogger(__name__)
class ObservableObject(ABC):
_list_mapping: ClassVar[dict[int, "_ObservableList"]] = {}
_dict_mapping: ClassVar[dict[int, "_ObservableDict"]] = {}
def __init__(self) -> None:
if not hasattr(self, "_observers"):
self._observers: dict[str, list["ObservableObject | Observer"]] = {}
def add_observer(
self, observer: "ObservableObject | Observer", attr_name: str = ""
) -> None:
if attr_name not in self._observers:
self._observers[attr_name] = []
if observer not in self._observers[attr_name]:
self._observers[attr_name].append(observer)
def _remove_observer(self, observer: "ObservableObject", attribute: str) -> None:
if attribute in self._observers:
self._observers[attribute].remove(observer)
@abstractmethod
def _remove_observer_if_observable(self, name: str) -> None:
"""Removes the current object as an observer from an observable attribute.
This method is called before an attribute of the observable object is
changed. If the current value of the attribute is an instance of
`ObservableObject`, this method removes the current object from its list
of observers. This is a crucial step to avoid unwanted notifications from
the old value of the attribute.
"""
def _notify_changed(self, changed_attribute: str, value: Any) -> None:
"""Notifies all observers about changes to an attribute.
This method iterates through all observers registered for the object and
invokes their notification method. It is called whenever an attribute of
the observable object is changed.
Args:
changed_attribute (str): The name of the changed attribute.
value (Any): The value that the attribute was set to.
"""
for attr_name, observer_list in self._observers.items():
for observer in observer_list:
extendend_attr_path = self._construct_extended_attr_path(
attr_name, changed_attribute
)
observer._notify_changed(extendend_attr_path, value)
def _notify_change_start(self, changing_attribute: str) -> None:
"""Notify observers that an attribute or item change process has started.
This method is called at the start of the process of modifying an attribute in
the observed `Observable` object. It registers the attribute as currently
undergoing a change. This registration helps in managing and tracking changes as
they occur, especially in scenarios where the order of changes or their state
during the transition is significant.
Args:
changing_attribute (str): The name of the attribute that is starting to
change. This is typically the full access path of the attribute in the
`Observable`.
value (Any): The value that the attribute is being set to.
"""
for attr_name, observer_list in self._observers.items():
for observer in observer_list:
extended_attr_path = self._construct_extended_attr_path(
attr_name, changing_attribute
)
observer._notify_change_start(extended_attr_path)
def _initialise_new_objects(self, attr_name_or_key: Any, value: Any) -> Any:
new_value = value
if isinstance(value, list):
if id(value) in self._list_mapping:
# If the list `value` was already referenced somewhere else
new_value = self._list_mapping[id(value)]
else:
# convert the builtin list into a ObservableList
new_value = _ObservableList(original_list=value)
self._list_mapping[id(value)] = new_value
elif isinstance(value, dict):
if id(value) in self._dict_mapping:
# If the list `value` was already referenced somewhere else
new_value = self._dict_mapping[id(value)]
else:
# convert the builtin list into a ObservableList
new_value = _ObservableDict(original_dict=value)
self._dict_mapping[id(value)] = new_value
if isinstance(new_value, ObservableObject):
new_value.add_observer(self, str(attr_name_or_key))
return new_value
@abstractmethod
def _construct_extended_attr_path(
self, observer_attr_name: str, instance_attr_name: str
) -> str:
"""
Constructs the extended attribute path for notification purposes, which is used
in the observer pattern to specify the full path of an observed attribute.
This abstract method is implemented by the classes inheriting from
`ObservableObject`.
Args:
observer_attr_name (str): The name of the attribute in the observer that
holds a reference to the instance. Equals `""` if observer itself is of type
`Observer`.
instance_attr_name (str): The name of the attribute within the instance that
has changed.
Returns:
str: The constructed extended attribute path.
"""
class _ObservableList(ObservableObject, list[Any]):
def __init__(
self,
original_list: list[Any],
) -> None:
self._original_list = original_list
ObservableObject.__init__(self)
list.__init__(self, self._original_list)
for i, item in enumerate(self._original_list):
super().__setitem__(i, self._initialise_new_objects(f"[{i}]", item))
def __setitem__(self, key: int, value: Any) -> None: # type: ignore[override]
if hasattr(self, "_observers"):
self._remove_observer_if_observable(f"[{key}]")
value = self._initialise_new_objects(f"[{key}]", value)
self._notify_change_start(f"[{key}]")
super().__setitem__(key, value)
self._notify_changed(f"[{key}]", value)
def _remove_observer_if_observable(self, name: str) -> None:
key = int(name[1:-1])
current_value = self.__getitem__(key)
if isinstance(current_value, ObservableObject):
current_value._remove_observer(self, name)
def _construct_extended_attr_path(
self, observer_attr_name: str, instance_attr_name: str
) -> str:
if observer_attr_name != "":
return f"{observer_attr_name}{instance_attr_name}"
return instance_attr_name
class _ObservableDict(dict[str, Any], ObservableObject):
def __init__(
self,
original_dict: dict[str, Any],
) -> None:
self._original_dict = original_dict
ObservableObject.__init__(self)
dict.__init__(self)
for key, value in self._original_dict.items():
super().__setitem__(key, self._initialise_new_objects(f"['{key}']", value))
def __setitem__(self, key: str, value: Any) -> None:
if not isinstance(key, str):
logger.warning("Converting non-string dictionary key %s to string.", key)
key = str(key)
if hasattr(self, "_observers"):
self._remove_observer_if_observable(f"['{key}']")
value = self._initialise_new_objects(key, value)
self._notify_change_start(f"['{key}']")
super().__setitem__(key, value)
self._notify_changed(f"['{key}']", value)
def _remove_observer_if_observable(self, name: str) -> None:
key = name[2:-2]
current_value = self.get(key, None)
if isinstance(current_value, ObservableObject):
current_value._remove_observer(self, name)
def _construct_extended_attr_path(
self, observer_attr_name: str, instance_attr_name: str
) -> str:
if observer_attr_name != "":
return f"{observer_attr_name}{instance_attr_name}"
return instance_attr_name

View File

@ -0,0 +1,7 @@
from pydase.observer_pattern.observer.observer import Observer
from pydase.observer_pattern.observer.property_observer import PropertyObserver
__all__ = [
"Observer",
"PropertyObserver",
]

View File

@ -0,0 +1,31 @@
import logging
from abc import ABC, abstractmethod
from typing import Any
from pydase.observer_pattern.observable import Observable
logger = logging.getLogger(__name__)
class Observer(ABC):
def __init__(self, observable: Observable) -> None:
self.observable = observable
self.observable.add_observer(self)
self.changing_attributes: list[str] = []
def _notify_changed(self, changed_attribute: str, value: Any) -> None:
if changed_attribute in self.changing_attributes:
self.changing_attributes.remove(changed_attribute)
self.on_change(full_access_path=changed_attribute, value=value)
def _notify_change_start(self, changing_attribute: str) -> None:
self.changing_attributes.append(changing_attribute)
self.on_change_start(changing_attribute)
@abstractmethod
def on_change(self, full_access_path: str, value: Any) -> None:
...
def on_change_start(self, full_access_path: str) -> None:
return

View File

@ -0,0 +1,92 @@
import inspect
import logging
import re
from typing import Any
from pydase.observer_pattern.observable.observable import Observable
from pydase.observer_pattern.observer.observer import Observer
logger = logging.getLogger(__name__)
def reverse_dict(original_dict: dict[str, list[str]]) -> dict[str, list[str]]:
reversed_dict: dict[str, list[str]] = {
value: [] for values in original_dict.values() for value in values
}
for key, values in original_dict.items():
for value in values:
reversed_dict[value].append(key)
return reversed_dict
def get_property_dependencies(prop: property, prefix: str = "") -> list[str]:
source_code_string = inspect.getsource(prop.fget) # type: ignore[arg-type]
pattern = r"self\.([^\s\{\}]+)"
matches = re.findall(pattern, source_code_string)
return [prefix + match for match in matches if "(" not in match]
class PropertyObserver(Observer):
def __init__(self, observable: Observable) -> None:
super().__init__(observable)
self.property_deps_dict = reverse_dict(
self._get_properties_and_their_dependencies(self.observable)
)
def _get_properties_and_their_dependencies(
self, obj: Observable, prefix: str = ""
) -> dict[str, list[str]]:
deps: dict[str, Any] = {}
self._process_observable_properties(obj, deps, prefix)
self._process_nested_observables_properties(obj, deps, prefix)
return deps
def _process_observable_properties(
self, obj: Observable, deps: dict[str, Any], prefix: str
) -> None:
for k, value in vars(type(obj)).items():
prefix = (
f"{prefix}." if prefix != "" and not prefix.endswith(".") else prefix
)
key = f"{prefix}{k}"
if isinstance(value, property):
deps[key] = get_property_dependencies(value, prefix)
def _process_nested_observables_properties(
self, obj: Observable, deps: dict[str, Any], prefix: str
) -> None:
for k, value in vars(obj).items():
prefix = (
f"{prefix}." if prefix != "" and not prefix.endswith(".") else prefix
)
parent_path = f"{prefix}{k}"
if isinstance(value, Observable):
new_prefix = f"{parent_path}."
deps.update(
self._get_properties_and_their_dependencies(value, new_prefix)
)
elif isinstance(value, list | dict):
self._process_collection_item_properties(value, deps, parent_path)
def _process_collection_item_properties(
self,
collection: list[Any] | dict[str, Any],
deps: dict[str, Any],
parent_path: str,
) -> None:
if isinstance(collection, list):
for i, item in enumerate(collection):
if isinstance(item, Observable):
new_prefix = f"{parent_path}[{i}]"
deps.update(
self._get_properties_and_their_dependencies(item, new_prefix)
)
elif isinstance(collection, dict):
for key, val in collection.items():
if isinstance(val, Observable):
new_prefix = f"{parent_path}['{key}']"
deps.update(
self._get_properties_and_their_dependencies(val, new_prefix)
)

View File

@ -4,7 +4,6 @@ import os
import signal
import threading
from concurrent.futures import ThreadPoolExecutor
from copy import deepcopy
from pathlib import Path
from types import FrameType
from typing import Any, Protocol, TypedDict
@ -14,8 +13,9 @@ from rpyc import ForkingServer, ThreadedServer # type: ignore[import-untyped]
from uvicorn.server import HANDLED_SIGNALS
from pydase import DataService
from pydase.data_service.data_service_observer import DataServiceObserver
from pydase.data_service.state_manager import StateManager
from pydase.utils.serializer import dump, get_nested_dict_by_path
from pydase.utils.serializer import dump
from .web_server import WebAPI
@ -68,7 +68,6 @@ class AdditionalServerProtocol(Protocol):
"""Starts the server. This method should be implemented as an asynchronous
method, which means that it should be able to run concurrently with other tasks.
"""
...
class AdditionalServer(TypedDict):
@ -193,6 +192,7 @@ class Server:
if getattr(self._service, "_filename", None) is not None:
self._service._state_manager = self._state_manager
self._state_manager.load_state()
self._observer = DataServiceObserver(self._state_manager)
def run(self) -> None:
"""
@ -266,7 +266,7 @@ class Server:
future_or_task = self._loop.create_task(addin_server.serve())
self.servers[server_name] = future_or_task
if self._enable_web:
self._wapi: WebAPI = WebAPI(
self._wapi = WebAPI(
service=self._service,
state_manager=self._state_manager,
**self._kwargs,
@ -277,13 +277,11 @@ class Server:
)
)
def sio_callback(parent_path: str, name: str, value: Any) -> None:
full_access_path = ".".join([*parent_path.split(".")[1:], name])
cached_value_dict = deepcopy(
get_nested_dict_by_path(self._state_manager.cache, full_access_path)
)
def sio_callback(
full_access_path: str, value: Any, cached_value_dict: dict[str, Any]
) -> None:
if cached_value_dict != {}:
serialized_value = dump(value)
if cached_value_dict["type"] != "method":
cached_value_dict["type"] = serialized_value["type"]
@ -295,8 +293,7 @@ class Server:
"notify",
{
"data": {
"parent_path": parent_path,
"name": name,
"full_access_path": full_access_path,
"value": cached_value_dict,
}
},
@ -306,7 +303,7 @@ class Server:
self._loop.create_task(notify())
self._service._callback_manager.add_notification_callback(sio_callback)
self._observer.add_notification_callback(sio_callback)
# overwrite uvicorn's signal handlers, otherwise it will bogart SIGINT and
# SIGTERM, which makes it impossible to escape out of

View File

@ -26,9 +26,7 @@ def get_class_and_instance_attributes(obj: object) -> dict[str, Any]:
loops.
"""
attrs = dict(chain(type(obj).__dict__.items(), obj.__dict__.items()))
attrs.pop("__root__")
return attrs
return dict(chain(type(obj).__dict__.items(), obj.__dict__.items()))
def get_object_attr_from_path_list(target_obj: Any, path: list[str]) -> Any:

View File

@ -271,21 +271,12 @@ def get_nested_dict_by_path(
parent_path_parts, attr_name = path.split(".")[:-1], path.split(".")[-1]
current_dict: dict[str, Any] = serialization_dict
try:
for path_part in parent_path_parts:
current_dict = get_next_level_dict_by_key(
current_dict, path_part, allow_append=False
)
current_dict = current_dict["value"]
current_dict = get_next_level_dict_by_key(
current_dict, attr_name, allow_append=False
)
except (SerializationPathError, SerializationValueError, KeyError) as e:
logger.error(e)
return {}
return current_dict
return get_next_level_dict_by_key(current_dict, attr_name, allow_append=False)
def get_next_level_dict_by_key(

View File

@ -22,6 +22,7 @@ def warn_if_instance_class_does_not_inherit_from_data_service(__value: object) -
and type(__value).__name__ not in ["CallbackManager", "TaskManager", "Quantity"]
):
logger.warning(
"Warning: Class '%s' does not inherit from DataService.",
"Class '%s' does not inherit from DataService. This may lead to unexpected "
"behaviour!",
type(__value).__name__,
)

View File

@ -1,7 +1,8 @@
from pytest import LogCaptureFixture
from pydase.components.coloured_enum import ColouredEnum
from pydase.data_service.data_service import DataService
from pydase.data_service.data_service_observer import DataServiceObserver
from pydase.data_service.state_manager import StateManager
from pytest import LogCaptureFixture
def test_ColouredEnum(caplog: LogCaptureFixture) -> None:
@ -21,14 +22,16 @@ def test_ColouredEnum(caplog: LogCaptureFixture) -> None:
# do something ...
self._status = value
service = ServiceClass()
service_instance = ServiceClass()
state_manager = StateManager(service_instance)
DataServiceObserver(state_manager)
service.status = MyStatus.FAILING
service_instance.status = MyStatus.FAILING
assert "ServiceClass.status changed to MyStatus.FAILING" in caplog.text
assert "'status' changed to 'MyStatus.FAILING'" in caplog.text
def test_warning(caplog: LogCaptureFixture) -> None: # noqa
def test_warning(caplog: LogCaptureFixture) -> None:
class MyStatus(ColouredEnum):
RUNNING = "#00FF00"
FAILING = "#FF0000"

View File

@ -1,7 +1,8 @@
from pytest import LogCaptureFixture
from pydase.components.number_slider import NumberSlider
from pydase.data_service.data_service import DataService
from pydase.data_service.data_service_observer import DataServiceObserver
from pydase.data_service.state_manager import StateManager
from pytest import LogCaptureFixture
def test_NumberSlider(caplog: LogCaptureFixture) -> None:
@ -9,35 +10,37 @@ def test_NumberSlider(caplog: LogCaptureFixture) -> None:
number_slider = NumberSlider(1, 0, 10, 1)
int_number_slider = NumberSlider(1, 0, 10, 1, "int")
service = ServiceClass()
service_instance = ServiceClass()
state_manager = StateManager(service_instance)
DataServiceObserver(state_manager)
assert service.number_slider.value == 1
assert isinstance(service.number_slider.value, float)
assert service.number_slider.min == 0
assert isinstance(service.number_slider.min, float)
assert service.number_slider.max == 10
assert isinstance(service.number_slider.max, float)
assert service.number_slider.step_size == 1
assert isinstance(service.number_slider.step_size, float)
assert service_instance.number_slider.value == 1
assert isinstance(service_instance.number_slider.value, float)
assert service_instance.number_slider.min == 0
assert isinstance(service_instance.number_slider.min, float)
assert service_instance.number_slider.max == 10
assert isinstance(service_instance.number_slider.max, float)
assert service_instance.number_slider.step_size == 1
assert isinstance(service_instance.number_slider.step_size, float)
assert service.int_number_slider.value == 1
assert isinstance(service.int_number_slider.value, int)
assert service.int_number_slider.step_size == 1
assert isinstance(service.int_number_slider.step_size, int)
assert service_instance.int_number_slider.value == 1
assert isinstance(service_instance.int_number_slider.value, int)
assert service_instance.int_number_slider.step_size == 1
assert isinstance(service_instance.int_number_slider.step_size, int)
service.number_slider.value = 10.0
service.int_number_slider.value = 10.1
service_instance.number_slider.value = 10.0
service_instance.int_number_slider.value = 10.1
assert "ServiceClass.number_slider.value changed to 10.0" in caplog.text
assert "ServiceClass.int_number_slider.value changed to 10" in caplog.text
assert "'number_slider.value' changed to '10.0'" in caplog.text
assert "'int_number_slider.value' changed to '10'" in caplog.text
caplog.clear()
service.number_slider.min = 1.1
service_instance.number_slider.min = 1.1
assert "ServiceClass.number_slider.min changed to 1.1" in caplog.text
assert "'number_slider.min' changed to '1.1'" in caplog.text
def test_init_error(caplog: LogCaptureFixture) -> None: # noqa
def test_init_error(caplog: LogCaptureFixture) -> None:
number_slider = NumberSlider(type_="str") # type: ignore # noqa
assert "Unknown type 'str'. Using 'float'" in caplog.text

View File

@ -1,42 +0,0 @@
import logging
from pytest import LogCaptureFixture
import pydase
logger = logging.getLogger()
def test_DataService_task_callback(caplog: LogCaptureFixture) -> None:
class MyService(pydase.DataService):
async def my_task(self) -> None:
logger.info("Triggered task.")
async def my_other_task(self) -> None:
logger.info("Triggered other task.")
service = MyService()
service.start_my_task() # type: ignore
service.start_my_other_task() # type: ignore
assert "MyService.my_task changed to {}" in caplog.text
assert "MyService.my_other_task changed to {}" in caplog.text
def test_DataServiceList_task_callback(caplog: LogCaptureFixture) -> None:
class MySubService(pydase.DataService):
async def my_task(self) -> None:
logger.info("Triggered task.")
async def my_other_task(self) -> None:
logger.info("Triggered other task.")
class MyService(pydase.DataService):
sub_services_list = [MySubService() for i in range(2)]
service = MyService()
service.sub_services_list[0].start_my_task() # type: ignore
service.sub_services_list[1].start_my_other_task() # type: ignore
assert "MyService.sub_services_list[0].my_task changed to {}" in caplog.text
assert "MyService.sub_services_list[1].my_other_task changed to {}" in caplog.text

View File

@ -0,0 +1,29 @@
import pydase.units as u
from pydase import DataService
from pydase.data_service.data_service_observer import DataServiceObserver
from pydase.data_service.state_manager import StateManager
from pytest import LogCaptureFixture
def test_unexpected_type_change_warning(caplog: LogCaptureFixture) -> None:
class ServiceClass(DataService):
attr_1 = 1.0
current = 1.0 * u.units.A
service_instance = ServiceClass()
state_manager = StateManager(service_instance)
DataServiceObserver(state_manager)
service_instance.attr_1 = 2
assert "'attr_1' changed to '2'" in caplog.text
assert (
"Type of 'attr_1' changed from 'float' to 'int'. This may have unwanted "
"side effects! Consider setting it to 'float' directly." in caplog.text
)
service_instance.current = 2
assert "'current' changed to '2'" in caplog.text
assert (
"Type of 'current' changed from 'Quantity' to 'int'. This may have unwanted "
"side effects! Consider setting it to 'Quantity' directly." in caplog.text
)

View File

@ -1,8 +1,8 @@
import logging
import pydase
from pydase.data_service.data_service_cache import DataServiceCache
from pydase.utils.serializer import get_nested_dict_by_path
from pydase.data_service.data_service_observer import DataServiceObserver
from pydase.data_service.state_manager import StateManager
logger = logging.getLogger()
@ -15,14 +15,23 @@ def test_nested_attributes_cache_callback() -> None:
class_attr = SubClass()
name = "World"
test_service = ServiceClass()
cache = DataServiceCache(test_service)
service_instance = ServiceClass()
state_manager = StateManager(service_instance)
DataServiceObserver(state_manager)
test_service.name = "Peepz"
assert get_nested_dict_by_path(cache.cache, "name")["value"] == "Peepz"
service_instance.name = "Peepz"
assert (
state_manager._data_service_cache.get_value_dict_from_cache("name")["value"]
== "Peepz"
)
test_service.class_attr.name = "Ciao"
assert get_nested_dict_by_path(cache.cache, "class_attr.name")["value"] == "Ciao"
service_instance.class_attr.name = "Ciao"
assert (
state_manager._data_service_cache.get_value_dict_from_cache("class_attr.name")[
"value"
]
== "Ciao"
)
def test_task_status_update() -> None:
@ -32,11 +41,29 @@ def test_task_status_update() -> None:
async def my_method(self) -> None:
pass
test_service = ServiceClass()
cache = DataServiceCache(test_service)
assert get_nested_dict_by_path(cache.cache, "my_method")["type"] == "method"
assert get_nested_dict_by_path(cache.cache, "my_method")["value"] is None
service_instance = ServiceClass()
state_manager = StateManager(service_instance)
DataServiceObserver(state_manager)
test_service.start_my_method() # type: ignore
assert get_nested_dict_by_path(cache.cache, "my_method")["type"] == "method"
assert get_nested_dict_by_path(cache.cache, "my_method")["value"] == {}
assert (
state_manager._data_service_cache.get_value_dict_from_cache("my_method")["type"]
== "method"
)
assert (
state_manager._data_service_cache.get_value_dict_from_cache("my_method")[
"value"
]
is None
)
service_instance.start_my_method() # type: ignore
assert (
state_manager._data_service_cache.get_value_dict_from_cache("my_method")["type"]
== "method"
)
assert (
state_manager._data_service_cache.get_value_dict_from_cache("my_method")[
"value"
]
== {}
)

View File

@ -1,129 +0,0 @@
from typing import Any
from pytest import LogCaptureFixture
import pydase.units as u
from pydase import DataService
def test_class_list_attribute(caplog: LogCaptureFixture) -> None:
class ServiceClass(DataService):
attr = [0, 1]
service_instance = ServiceClass()
service_instance.attr[0] = 1337
assert "ServiceClass.attr[0] changed to 1337" in caplog.text
caplog.clear()
def test_instance_list_attribute(caplog: LogCaptureFixture) -> None:
class SubClass(DataService):
name = "SubClass"
class ServiceClass(DataService):
def __init__(self) -> None:
self.attr: list[Any] = [0, SubClass()]
super().__init__()
service_instance = ServiceClass()
service_instance.attr[0] = "Hello"
assert "ServiceClass.attr[0] changed to Hello" in caplog.text
caplog.clear()
service_instance.attr[1] = SubClass()
assert f"ServiceClass.attr[1] changed to {service_instance.attr[1]}" in caplog.text
caplog.clear()
def test_reused_instance_list_attribute(caplog: LogCaptureFixture) -> None:
some_list = [0, 1, 2]
class ServiceClass(DataService):
def __init__(self) -> None:
self.attr = some_list
self.attr_2 = some_list
self.attr_3 = [0, 1, 2]
super().__init__()
service_instance = ServiceClass()
service_instance.attr[0] = 20
assert service_instance.attr == service_instance.attr_2
assert service_instance.attr != service_instance.attr_3
assert "ServiceClass.attr[0] changed to 20" in caplog.text
assert "ServiceClass.attr_2[0] changed to 20" in caplog.text
def test_nested_reused_instance_list_attribute(caplog: LogCaptureFixture) -> None:
some_list = [0, 1, 2]
class SubClass(DataService):
attr_list = some_list
def __init__(self) -> None:
self.attr_list_2 = some_list
super().__init__()
class ServiceClass(DataService):
def __init__(self) -> None:
self.attr = some_list
self.subclass = SubClass()
super().__init__()
service_instance = ServiceClass()
service_instance.attr[0] = 20
assert service_instance.attr == service_instance.subclass.attr_list
assert "ServiceClass.attr[0] changed to 20" in caplog.text
assert "ServiceClass.subclass.attr_list[0] changed to 20" in caplog.text
assert "ServiceClass.subclass.attr_list_2[0] changed to 20" in caplog.text
def test_protected_list_attribute(caplog: LogCaptureFixture) -> None:
"""Changing protected lists should not emit notifications for the lists themselves,
but still for all properties depending on them.
"""
class ServiceClass(DataService):
_attr = [0, 1]
@property
def list_dependend_property(self) -> int:
return self._attr[0]
service_instance = ServiceClass()
service_instance._attr[0] = 1337
assert "ServiceClass.list_dependend_property changed to 1337" in caplog.text
def test_converting_int_to_float_entries(caplog: LogCaptureFixture) -> None:
class ServiceClass(DataService):
float_list = [0.0]
service_instance = ServiceClass()
service_instance.float_list[0] = 1
assert isinstance(service_instance.float_list[0], float)
assert "ServiceClass.float_list[0] changed to 1.0" in caplog.text
def test_converting_number_to_quantity_entries(caplog: LogCaptureFixture) -> None:
class ServiceClass(DataService):
quantity_list: list[u.Quantity] = [1 * u.units.A]
service_instance = ServiceClass()
service_instance.quantity_list[0] = 4 # type: ignore
assert isinstance(service_instance.quantity_list[0], u.Quantity)
assert "ServiceClass.quantity_list[0] changed to 4.0 A" in caplog.text
caplog.clear()
service_instance.quantity_list[0] = 3.1 * u.units.mA
assert isinstance(service_instance.quantity_list[0], u.Quantity)
assert "ServiceClass.quantity_list[0] changed to 3.1 mA" in caplog.text

View File

@ -2,16 +2,17 @@ import json
from pathlib import Path
from typing import Any
from pytest import LogCaptureFixture
import pydase
import pydase.units as u
import pytest
from pydase.components.coloured_enum import ColouredEnum
from pydase.data_service.data_service_observer import DataServiceObserver
from pydase.data_service.state_manager import (
StateManager,
has_load_state_decorator,
load_state,
)
from pytest import LogCaptureFixture
class SubService(pydase.DataService):
@ -26,6 +27,7 @@ class State(ColouredEnum):
class Service(pydase.DataService):
def __init__(self, **kwargs: Any) -> None:
super().__init__(**kwargs)
self.subservice = SubService()
self.some_unit: u.Quantity = 1.2 * u.units.A
self.some_float = 1.0
@ -33,7 +35,6 @@ class Service(pydase.DataService):
self._property_attr = 1337.0
self._name = "Service"
self.state = State.RUNNING
super().__init__(**kwargs)
@property
def name(self) -> str:
@ -117,7 +118,7 @@ LOAD_STATE = {
}
def test_save_state(tmp_path: Path):
def test_save_state(tmp_path: Path) -> None:
# Create a StateManager instance with a temporary file
file = tmp_path / "test_state.json"
manager = StateManager(service=Service(), filename=str(file))
@ -129,7 +130,7 @@ def test_save_state(tmp_path: Path):
assert file.read_text() == json.dumps(CURRENT_STATE, indent=4)
def test_load_state(tmp_path: Path, caplog: LogCaptureFixture):
def test_load_state(tmp_path: Path, caplog: LogCaptureFixture) -> None:
# Create a StateManager instance with a temporary file
file = tmp_path / "test_state.json"
@ -138,8 +139,9 @@ def test_load_state(tmp_path: Path, caplog: LogCaptureFixture):
json.dump(LOAD_STATE, f, indent=4)
service = Service()
manager = StateManager(service=service, filename=str(file))
manager.load_state()
state_manager = StateManager(service=service, filename=str(file))
DataServiceObserver(state_manager)
state_manager.load_state()
assert service.some_unit == u.Quantity(12, "A") # has changed
assert service.list_attr[0] == 1.4 # has changed
@ -152,7 +154,7 @@ def test_load_state(tmp_path: Path, caplog: LogCaptureFixture):
assert service.some_float == 1.0 # has not changed due to different type
assert service.subservice.name == "SubService" # didn't change
assert "Service.some_unit changed to 12.0 A!" in caplog.text
assert "'some_unit' changed to '12.0 A'" in caplog.text
assert (
"Property 'name' has no '@load_state' decorator. "
"Ignoring value from JSON file..." in caplog.text
@ -168,15 +170,17 @@ def test_load_state(tmp_path: Path, caplog: LogCaptureFixture):
assert "Value of attribute 'subservice.name' has not changed..." in caplog.text
def test_filename_warning(tmp_path: Path, caplog: LogCaptureFixture):
def test_filename_warning(tmp_path: Path, caplog: LogCaptureFixture) -> None:
file = tmp_path / "test_state.json"
with pytest.warns(DeprecationWarning):
service = Service(filename=str(file))
StateManager(service=service, filename=str(file))
assert f"Overwriting filename {str(file)!r} with {str(file)!r}." in caplog.text
def test_filename_error(caplog: LogCaptureFixture):
def test_filename_error(caplog: LogCaptureFixture) -> None:
service = Service()
manager = StateManager(service=service)
@ -187,7 +191,7 @@ def test_filename_error(caplog: LogCaptureFixture):
)
def test_readonly_attribute(tmp_path: Path, caplog: LogCaptureFixture):
def test_readonly_attribute(tmp_path: Path, caplog: LogCaptureFixture) -> None:
# Create a StateManager instance with a temporary file
file = tmp_path / "test_state.json"
@ -205,7 +209,7 @@ def test_readonly_attribute(tmp_path: Path, caplog: LogCaptureFixture):
)
def test_changed_type(tmp_path: Path, caplog: LogCaptureFixture):
def test_changed_type(tmp_path: Path, caplog: LogCaptureFixture) -> None:
# Create a StateManager instance with a temporary file
file = tmp_path / "test_state.json"
@ -222,7 +226,7 @@ def test_changed_type(tmp_path: Path, caplog: LogCaptureFixture):
) in caplog.text
def test_property_load_state(tmp_path: Path):
def test_property_load_state(tmp_path: Path) -> None:
# Create a StateManager instance with a temporary file
file = tmp_path / "test_state.json"

View File

@ -1,8 +1,9 @@
import logging
from pytest import LogCaptureFixture
import pydase
from pydase.data_service.data_service_observer import DataServiceObserver
from pydase.data_service.state_manager import StateManager
from pytest import LogCaptureFixture
logger = logging.getLogger()
@ -10,11 +11,11 @@ logger = logging.getLogger()
def test_autostart_task_callback(caplog: LogCaptureFixture) -> None:
class MyService(pydase.DataService):
def __init__(self) -> None:
super().__init__()
self._autostart_tasks = { # type: ignore
"my_task": (),
"my_other_task": (),
}
super().__init__()
async def my_task(self) -> None:
logger.info("Triggered task.")
@ -22,11 +23,13 @@ def test_autostart_task_callback(caplog: LogCaptureFixture) -> None:
async def my_other_task(self) -> None:
logger.info("Triggered other task.")
service = MyService()
service._task_manager.start_autostart_tasks()
service_instance = MyService()
state_manager = StateManager(service_instance)
DataServiceObserver(state_manager)
service_instance._task_manager.start_autostart_tasks()
assert "MyService.my_task changed to {}" in caplog.text
assert "MyService.my_other_task changed to {}" in caplog.text
assert "'my_task' changed to '{}'" in caplog.text
assert "'my_other_task' changed to '{}'" in caplog.text
def test_DataService_subclass_autostart_task_callback(
@ -34,11 +37,11 @@ def test_DataService_subclass_autostart_task_callback(
) -> None:
class MySubService(pydase.DataService):
def __init__(self) -> None:
super().__init__()
self._autostart_tasks = { # type: ignore
"my_task": (),
"my_other_task": (),
}
super().__init__()
async def my_task(self) -> None:
logger.info("Triggered task.")
@ -49,23 +52,25 @@ def test_DataService_subclass_autostart_task_callback(
class MyService(pydase.DataService):
sub_service = MySubService()
service = MyService()
service._task_manager.start_autostart_tasks()
service_instance = MyService()
state_manager = StateManager(service_instance)
DataServiceObserver(state_manager)
service_instance._task_manager.start_autostart_tasks()
assert "MyService.sub_service.my_task changed to {}" in caplog.text
assert "MyService.sub_service.my_other_task changed to {}" in caplog.text
assert "'sub_service.my_task' changed to '{}'" in caplog.text
assert "'sub_service.my_other_task' changed to '{}'" in caplog.text
def test_DataServiceList_subclass_autostart_task_callback(
def test_DataService_subclass_list_autostart_task_callback(
caplog: LogCaptureFixture,
) -> None:
class MySubService(pydase.DataService):
def __init__(self) -> None:
super().__init__()
self._autostart_tasks = { # type: ignore
"my_task": (),
"my_other_task": (),
}
super().__init__()
async def my_task(self) -> None:
logger.info("Triggered task.")
@ -76,10 +81,12 @@ def test_DataServiceList_subclass_autostart_task_callback(
class MyService(pydase.DataService):
sub_services_list = [MySubService() for i in range(2)]
service = MyService()
service._task_manager.start_autostart_tasks()
service_instance = MyService()
state_manager = StateManager(service_instance)
DataServiceObserver(state_manager)
service_instance._task_manager.start_autostart_tasks()
assert "MyService.sub_services_list[0].my_task changed to {}" in caplog.text
assert "MyService.sub_services_list[0].my_other_task changed to {}" in caplog.text
assert "MyService.sub_services_list[1].my_task changed to {}" in caplog.text
assert "MyService.sub_services_list[1].my_other_task changed to {}" in caplog.text
assert "'sub_services_list[0].my_task' changed to '{}'" in caplog.text
assert "'sub_services_list[0].my_other_task' changed to '{}'" in caplog.text
assert "'sub_services_list[1].my_task' changed to '{}'" in caplog.text
assert "'sub_services_list[1].my_other_task' changed to '{}'" in caplog.text

View File

@ -0,0 +1,173 @@
import logging
from typing import Any
import pytest
from pydase.observer_pattern.observable import Observable
from pydase.observer_pattern.observer import Observer
logger = logging.getLogger(__name__)
class MyObserver(Observer):
def on_change(self, full_access_path: str, value: Any) -> None:
logger.info("'%s' changed to '%s'", full_access_path, value)
def test_constructor_error_message(caplog: pytest.LogCaptureFixture) -> None:
class MyObservable(Observable):
def __init__(self) -> None:
self.attr = 1
super().__init__()
MyObservable()
assert (
"Ensure that super().__init__() is called at the start of the 'MyObservable' "
"constructor! Failing to do so may lead to unexpected behavior." in caplog.text
)
def test_simple_class_attribute(caplog: pytest.LogCaptureFixture) -> None:
class MyObservable(Observable):
int_attribute = 10
instance = MyObservable()
observer = MyObserver(instance)
instance.int_attribute = 12
assert "'int_attribute' changed to '12'" in caplog.text
def test_simple_instance_attribute(caplog: pytest.LogCaptureFixture) -> None:
class MyObservable(Observable):
def __init__(self) -> None:
super().__init__()
self.int_attribute = 10
instance = MyObservable()
observer = MyObserver(instance)
instance.int_attribute = 12
assert "'int_attribute' changed to '12'" in caplog.text
def test_nested_class_attribute(caplog: pytest.LogCaptureFixture) -> None:
class MySubclass(Observable):
name = "My Subclass"
class MyObservable(Observable):
subclass = MySubclass()
instance = MyObservable()
observer = MyObserver(instance)
instance.subclass.name = "Other name"
assert "'subclass.name' changed to 'Other name'" in caplog.text
def test_nested_instance_attribute(caplog: pytest.LogCaptureFixture) -> None:
class MySubclass(Observable):
def __init__(self) -> None:
super().__init__()
self.name = "Subclass name"
class MyObservable(Observable):
def __init__(self) -> None:
super().__init__()
self.subclass = MySubclass()
instance = MyObservable()
observer = MyObserver(instance)
instance.subclass.name = "Other name"
assert "'subclass.name' changed to 'Other name'" in caplog.text
def test_removed_observer_on_class_attr(caplog: pytest.LogCaptureFixture) -> None:
class NestedObservable(Observable):
name = "Hello"
nested_instance = NestedObservable()
class MyObservable(Observable):
nested_attr = nested_instance
changed_attr = nested_instance
instance = MyObservable()
observer = MyObserver(instance)
instance.changed_attr = "Ciao"
assert "'changed_attr' changed to 'Ciao'" in caplog.text
caplog.clear()
instance.nested_attr.name = "Hi"
assert "'nested_attr.name' changed to 'Hi'" in caplog.text
assert "'changed_attr.name' changed to 'Hi'" not in caplog.text
def test_removed_observer_on_instance_attr(caplog: pytest.LogCaptureFixture) -> None:
class NestedObservable(Observable):
def __init__(self) -> None:
super().__init__()
self.name = "Hello"
nested_instance = NestedObservable()
class MyObservable(Observable):
def __init__(self) -> None:
super().__init__()
self.nested_attr = nested_instance
self.changed_attr = nested_instance
instance = MyObservable()
observer = MyObserver(instance)
instance.changed_attr = "Ciao"
assert "'changed_attr' changed to 'Ciao'" in caplog.text
caplog.clear()
instance.nested_attr.name = "Hi"
assert "'nested_attr.name' changed to 'Hi'" in caplog.text
assert "'changed_attr.name' changed to 'Hi'" not in caplog.text
def test_property_getter(caplog: pytest.LogCaptureFixture) -> None:
class MyObservable(Observable):
def __init__(self) -> None:
super().__init__()
self._name = "Hello"
@property
def name(self) -> str:
"""The name property."""
return self._name
instance = MyObservable()
observer = MyObserver(instance)
_ = instance.name
assert "'name' changed to 'Hello'" in caplog.text
def test_property_setter(caplog: pytest.LogCaptureFixture) -> None:
class MyObservable(Observable):
def __init__(self) -> None:
super().__init__()
self._name = "Hello"
@property
def name(self) -> str:
return self._name
@name.setter
def name(self, value: str) -> None:
self._name = value
instance = MyObservable()
observer = MyObserver(instance)
instance.name = "Ciao"
assert "'name' changed to 'Hello'" not in caplog.text
assert "'name' changed to 'Ciao'" in caplog.text

View File

@ -0,0 +1,282 @@
import logging
from typing import Any
import pytest
from pydase.observer_pattern.observable import Observable
from pydase.observer_pattern.observer import Observer
logger = logging.getLogger(__name__)
class MyObserver(Observer):
def on_change(self, full_access_path: str, value: Any) -> None:
logger.info("'%s' changed to '%s'", full_access_path, value)
def test_simple_instance_list_attribute(caplog: pytest.LogCaptureFixture) -> None:
class MyObservable(Observable):
def __init__(self) -> None:
super().__init__()
self.list_attr = [1, 2]
instance = MyObservable()
observer = MyObserver(instance)
instance.list_attr[0] = 12
assert "'list_attr[0]' changed to '12'" in caplog.text
def test_instance_object_list_attribute(caplog: pytest.LogCaptureFixture) -> None:
class NestedObservable(Observable):
def __init__(self) -> None:
super().__init__()
self.name = "Hello"
class MyObservable(Observable):
def __init__(self) -> None:
super().__init__()
self.list_attr = [NestedObservable()]
instance = MyObservable()
observer = MyObserver(instance)
instance.list_attr[0].name = "Ciao"
assert "'list_attr[0].name' changed to 'Ciao'" in caplog.text
def test_simple_class_list_attribute(caplog: pytest.LogCaptureFixture) -> None:
class MyObservable(Observable):
list_attr = [1, 2]
instance = MyObservable()
observer = MyObserver(instance)
instance.list_attr[0] = 12
assert "'list_attr[0]' changed to '12'" in caplog.text
def test_class_object_list_attribute(caplog: pytest.LogCaptureFixture) -> None:
class NestedObservable(Observable):
name = "Hello"
class MyObservable(Observable):
list_attr = [NestedObservable()]
instance = MyObservable()
observer = MyObserver(instance)
instance.list_attr[0].name = "Ciao"
assert "'list_attr[0].name' changed to 'Ciao'" in caplog.text
def test_simple_instance_dict_attribute(caplog: pytest.LogCaptureFixture) -> None:
class MyObservable(Observable):
def __init__(self) -> None:
super().__init__()
self.dict_attr = {"first": "Hello"}
instance = MyObservable()
observer = MyObserver(instance)
instance.dict_attr["first"] = "Ciao"
instance.dict_attr["second"] = "World"
assert "'dict_attr['first']' changed to 'Ciao'" in caplog.text
assert "'dict_attr['second']' changed to 'World'" in caplog.text
def test_simple_class_dict_attribute(caplog: pytest.LogCaptureFixture) -> None:
class MyObservable(Observable):
dict_attr = {"first": "Hello"}
instance = MyObservable()
observer = MyObserver(instance)
instance.dict_attr["first"] = "Ciao"
instance.dict_attr["second"] = "World"
assert "'dict_attr['first']' changed to 'Ciao'" in caplog.text
assert "'dict_attr['second']' changed to 'World'" in caplog.text
def test_instance_dict_attribute(caplog: pytest.LogCaptureFixture) -> None:
class NestedObservable(Observable):
def __init__(self) -> None:
super().__init__()
self.name = "Hello"
class MyObservable(Observable):
def __init__(self) -> None:
super().__init__()
self.dict_attr = {"first": NestedObservable()}
instance = MyObservable()
observer = MyObserver(instance)
instance.dict_attr["first"].name = "Ciao"
assert "'dict_attr['first'].name' changed to 'Ciao'" in caplog.text
def test_class_dict_attribute(caplog: pytest.LogCaptureFixture) -> None:
class NestedObservable(Observable):
name = "Hello"
class MyObservable(Observable):
dict_attr = {"first": NestedObservable()}
instance = MyObservable()
observer = MyObserver(instance)
instance.dict_attr["first"].name = "Ciao"
assert "'dict_attr['first'].name' changed to 'Ciao'" in caplog.text
def test_removed_observer_on_class_list_attr(caplog: pytest.LogCaptureFixture) -> None:
class NestedObservable(Observable):
name = "Hello"
nested_instance = NestedObservable()
class MyObservable(Observable):
nested_attr = nested_instance
changed_list_attr = [nested_instance]
instance = MyObservable()
observer = MyObserver(instance)
instance.changed_list_attr[0] = "Ciao"
assert "'changed_list_attr[0]' changed to 'Ciao'" in caplog.text
caplog.clear()
instance.nested_attr.name = "Hi"
assert "'nested_attr.name' changed to 'Hi'" in caplog.text
assert "'changed_list_attr[0].name' changed to 'Hi'" not in caplog.text
def test_removed_observer_on_instance_dict_attr(
caplog: pytest.LogCaptureFixture,
) -> None:
class NestedObservable(Observable):
def __init__(self) -> None:
super().__init__()
self.name = "Hello"
nested_instance = NestedObservable()
class MyObservable(Observable):
def __init__(self) -> None:
super().__init__()
self.nested_attr = nested_instance
self.changed_dict_attr = {"nested": nested_instance}
instance = MyObservable()
observer = MyObserver(instance)
instance.changed_dict_attr["nested"] = "Ciao"
assert "'changed_dict_attr['nested']' changed to 'Ciao'" in caplog.text
caplog.clear()
instance.nested_attr.name = "Hi"
assert "'nested_attr.name' changed to 'Hi'" in caplog.text
assert "'changed_dict_attr['nested'].name' changed to 'Hi'" not in caplog.text
def test_removed_observer_on_instance_list_attr(
caplog: pytest.LogCaptureFixture,
) -> None:
class NestedObservable(Observable):
def __init__(self) -> None:
super().__init__()
self.name = "Hello"
nested_instance = NestedObservable()
class MyObservable(Observable):
def __init__(self) -> None:
super().__init__()
self.nested_attr = nested_instance
self.changed_list_attr = [nested_instance]
instance = MyObservable()
observer = MyObserver(instance)
instance.changed_list_attr[0] = "Ciao"
assert "'changed_list_attr[0]' changed to 'Ciao'" in caplog.text
caplog.clear()
instance.nested_attr.name = "Hi"
assert "'nested_attr.name' changed to 'Hi'" in caplog.text
assert "'changed_list_attr[0].name' changed to 'Hi'" not in caplog.text
def test_removed_observer_on_class_dict_attr(caplog: pytest.LogCaptureFixture) -> None:
class NestedObservable(Observable):
def __init__(self) -> None:
super().__init__()
self.name = "Hello"
nested_instance = NestedObservable()
class MyObservable(Observable):
def __init__(self) -> None:
super().__init__()
self.nested_attr = nested_instance
self.changed_dict_attr = {"nested": nested_instance}
instance = MyObservable()
observer = MyObserver(instance)
instance.changed_dict_attr["nested"] = "Ciao"
assert "'changed_dict_attr['nested']' changed to 'Ciao'" in caplog.text
caplog.clear()
instance.nested_attr.name = "Hi"
assert "'nested_attr.name' changed to 'Hi'" in caplog.text
assert "'changed_dict_attr['nested'].name' changed to 'Hi'" not in caplog.text
def test_nested_dict_instances(caplog: pytest.LogCaptureFixture) -> None:
dict_instance = {"first": "Hello", "second": "World"}
class MyObservable(Observable):
def __init__(self) -> None:
super().__init__()
self.nested_dict_attr = {"nested": dict_instance}
instance = MyObservable()
observer = MyObserver(instance)
instance.nested_dict_attr["nested"]["first"] = "Ciao"
assert "'nested_dict_attr['nested']['first']' changed to 'Ciao'" in caplog.text
def test_dict_in_list_instance(caplog: pytest.LogCaptureFixture) -> None:
dict_instance = {"first": "Hello", "second": "World"}
class MyObservable(Observable):
def __init__(self) -> None:
super().__init__()
self.dict_in_list = [dict_instance]
instance = MyObservable()
observer = MyObserver(instance)
instance.dict_in_list[0]["first"] = "Ciao"
assert "'dict_in_list[0]['first']' changed to 'Ciao'" in caplog.text
def test_list_in_dict_instance(caplog: pytest.LogCaptureFixture) -> None:
list_instance = [1, 2, 3]
class MyObservable(Observable):
def __init__(self) -> None:
super().__init__()
self.list_in_dict = {"some_list": list_instance}
instance = MyObservable()
observer = MyObserver(instance)
instance.list_in_dict["some_list"][0] = "Ciao"
assert "'list_in_dict['some_list'][0]' changed to 'Ciao'" in caplog.text

View File

@ -0,0 +1,25 @@
from typing import Any
import pytest
from pydase.observer_pattern.observable import Observable
from pydase.observer_pattern.observer import Observer
def test_abstract_method_error() -> None:
class MyObserver(Observer):
pass
class MyObservable(Observable):
pass
with pytest.raises(TypeError):
MyObserver(MyObservable())
def test_constructor_error() -> None:
class MyObserver(Observer):
def on_change(self, full_access_path: str, value: Any) -> None:
pass
with pytest.raises(TypeError):
MyObserver()

View File

@ -1,6 +1,7 @@
from pytest import LogCaptureFixture
from pydase import DataService
from pydase.data_service.data_service_observer import DataServiceObserver
from pydase.data_service.state_manager import StateManager
from pytest import LogCaptureFixture
def test_class_attributes(caplog: LogCaptureFixture) -> None:
@ -11,9 +12,11 @@ def test_class_attributes(caplog: LogCaptureFixture) -> None:
attr_1 = SubClass()
service_instance = ServiceClass()
state_manager = StateManager(service_instance)
DataServiceObserver(state_manager)
service_instance.attr_1.name = "Hi"
assert "ServiceClass.attr_1.name changed to Hi" in caplog.text
assert "'attr_1.name' changed to 'Hi'" in caplog.text
def test_instance_attributes(caplog: LogCaptureFixture) -> None:
@ -22,13 +25,15 @@ def test_instance_attributes(caplog: LogCaptureFixture) -> None:
class ServiceClass(DataService):
def __init__(self) -> None:
self.attr_1 = SubClass()
super().__init__()
self.attr_1 = SubClass()
service_instance = ServiceClass()
state_manager = StateManager(service_instance)
DataServiceObserver(state_manager)
service_instance.attr_1.name = "Hi"
assert "ServiceClass.attr_1.name changed to Hi" in caplog.text
assert "'attr_1.name' changed to 'Hi'" in caplog.text
def test_class_attribute(caplog: LogCaptureFixture) -> None:
@ -36,21 +41,25 @@ def test_class_attribute(caplog: LogCaptureFixture) -> None:
attr = 0
service_instance = ServiceClass()
state_manager = StateManager(service_instance)
DataServiceObserver(state_manager)
service_instance.attr = 1
assert "ServiceClass.attr changed to 1" in caplog.text
assert "'attr' changed to '1'" in caplog.text
def test_instance_attribute(caplog: LogCaptureFixture) -> None:
class ServiceClass(DataService):
def __init__(self) -> None:
self.attr = "Hello World"
super().__init__()
self.attr = "Hello World"
service_instance = ServiceClass()
state_manager = StateManager(service_instance)
DataServiceObserver(state_manager)
service_instance.attr = "Hello"
assert "ServiceClass.attr changed to Hello" in caplog.text
assert "'attr' changed to 'Hello'" in caplog.text
def test_reused_instance_attributes(caplog: LogCaptureFixture) -> None:
@ -61,16 +70,19 @@ def test_reused_instance_attributes(caplog: LogCaptureFixture) -> None:
class ServiceClass(DataService):
def __init__(self) -> None:
super().__init__()
self.attr_1 = subclass_instance
self.attr_2 = subclass_instance
super().__init__()
service_instance = ServiceClass()
state_manager = StateManager(service_instance)
DataServiceObserver(state_manager)
service_instance.attr_1.name = "Hi"
assert service_instance.attr_1 == service_instance.attr_2
assert "ServiceClass.attr_1.name changed to Hi" in caplog.text
assert "ServiceClass.attr_2.name changed to Hi" in caplog.text
assert "'attr_1.name' changed to 'Hi'" in caplog.text
assert "'attr_2.name' changed to 'Hi'" in caplog.text
def test_reused_attributes_mixed(caplog: LogCaptureFixture) -> None:
@ -83,15 +95,18 @@ def test_reused_attributes_mixed(caplog: LogCaptureFixture) -> None:
attr_1 = subclass_instance
def __init__(self) -> None:
self.attr_2 = subclass_instance
super().__init__()
self.attr_2 = subclass_instance
service_instance = ServiceClass()
state_manager = StateManager(service_instance)
DataServiceObserver(state_manager)
service_instance.attr_1.name = "Hi"
assert service_instance.attr_1 == service_instance.attr_2
assert "ServiceClass.attr_1.name changed to Hi" in caplog.text
assert "ServiceClass.attr_2.name changed to Hi" in caplog.text
assert "'attr_1.name' changed to 'Hi'" in caplog.text
assert "'attr_2.name' changed to 'Hi'" in caplog.text
def test_nested_class_attributes(caplog: LogCaptureFixture) -> None:
@ -111,15 +126,18 @@ def test_nested_class_attributes(caplog: LogCaptureFixture) -> None:
attr = SubClass()
service_instance = ServiceClass()
state_manager = StateManager(service_instance)
DataServiceObserver(state_manager)
service_instance.attr.attr.attr.name = "Hi"
service_instance.attr.attr.name = "Hou"
service_instance.attr.name = "foo"
service_instance.name = "bar"
assert "ServiceClass.attr.attr.attr.name changed to Hi" in caplog.text
assert "ServiceClass.attr.attr.name changed to Hou" in caplog.text
assert "ServiceClass.attr.name changed to foo" in caplog.text
assert "ServiceClass.name changed to bar" in caplog.text
assert "'attr.attr.attr.name' changed to 'Hi'" in caplog.text
assert "'attr.attr.name' changed to 'Hou'" in caplog.text
assert "'attr.name' changed to 'foo'" in caplog.text
assert "'name' changed to 'bar'" in caplog.text
def test_nested_instance_attributes(caplog: LogCaptureFixture) -> None:
@ -128,32 +146,35 @@ def test_nested_instance_attributes(caplog: LogCaptureFixture) -> None:
class SubSubClass(DataService):
def __init__(self) -> None:
super().__init__()
self.attr = SubSubSubClass()
self.name = "Hello"
super().__init__()
class SubClass(DataService):
def __init__(self) -> None:
super().__init__()
self.attr = SubSubClass()
self.name = "Hello"
super().__init__()
class ServiceClass(DataService):
def __init__(self) -> None:
super().__init__()
self.attr = SubClass()
self.name = "Hello"
super().__init__()
service_instance = ServiceClass()
state_manager = StateManager(service_instance)
DataServiceObserver(state_manager)
service_instance.attr.attr.attr.name = "Hi"
service_instance.attr.attr.name = "Hou"
service_instance.attr.name = "foo"
service_instance.name = "bar"
assert "ServiceClass.attr.attr.attr.name changed to Hi" in caplog.text
assert "ServiceClass.attr.attr.name changed to Hou" in caplog.text
assert "ServiceClass.attr.name changed to foo" in caplog.text
assert "ServiceClass.name changed to bar" in caplog.text
assert "'attr.attr.attr.name' changed to 'Hi'" in caplog.text
assert "'attr.attr.name' changed to 'Hou'" in caplog.text
assert "'attr.name' changed to 'foo'" in caplog.text
assert "'name' changed to 'bar'" in caplog.text
def test_advanced_nested_class_attributes(caplog: LogCaptureFixture) -> None:
@ -171,14 +192,19 @@ def test_advanced_nested_class_attributes(caplog: LogCaptureFixture) -> None:
subattr = SubSubClass()
service_instance = ServiceClass()
state_manager = StateManager(service_instance)
DataServiceObserver(state_manager)
service_instance.attr.attr.attr.name = "Hi"
assert "ServiceClass.attr.attr.attr.name changed to Hi" in caplog.text
assert "ServiceClass.subattr.attr.name changed to Hi" in caplog.text
assert "'attr.attr.attr.name' changed to 'Hi'" in caplog.text
assert "'subattr.attr.name' changed to 'Hi'" in caplog.text
caplog.clear()
service_instance.subattr.attr.name = "Ho"
assert "ServiceClass.attr.attr.attr.name changed to Ho" in caplog.text
assert "ServiceClass.subattr.attr.name changed to Ho" in caplog.text
assert "'attr.attr.attr.name' changed to 'Ho'" in caplog.text
assert "'subattr.attr.name' changed to 'Ho'" in caplog.text
def test_advanced_nested_instance_attributes(caplog: LogCaptureFixture) -> None:
@ -187,32 +213,34 @@ def test_advanced_nested_instance_attributes(caplog: LogCaptureFixture) -> None:
class SubSubClass(DataService):
def __init__(self) -> None:
self.attr = SubSubSubClass()
super().__init__()
self.attr = SubSubSubClass()
subsubclass_instance = SubSubClass()
class SubClass(DataService):
def __init__(self) -> None:
self.attr = subsubclass_instance
super().__init__()
self.attr = subsubclass_instance
class ServiceClass(DataService):
def __init__(self) -> None:
super().__init__()
self.attr = SubClass()
self.subattr = subsubclass_instance
super().__init__()
service_instance = ServiceClass()
state_manager = StateManager(service_instance)
DataServiceObserver(state_manager)
service_instance.attr.attr.attr.name = "Hi"
assert "ServiceClass.attr.attr.attr.name changed to Hi" in caplog.text
assert "ServiceClass.subattr.attr.name changed to Hi" in caplog.text
assert "'attr.attr.attr.name' changed to 'Hi'" in caplog.text
assert "'subattr.attr.name' changed to 'Hi'" in caplog.text
caplog.clear()
service_instance.subattr.attr.name = "Ho"
assert "ServiceClass.attr.attr.attr.name changed to Ho" in caplog.text
assert "ServiceClass.subattr.attr.name changed to Ho" in caplog.text
assert "'attr.attr.attr.name' changed to 'Ho'" in caplog.text
assert "'subattr.attr.name' changed to 'Ho'" in caplog.text
caplog.clear()
@ -224,17 +252,20 @@ def test_advanced_nested_attributes_mixed(caplog: LogCaptureFixture) -> None:
class_attr = SubSubClass()
def __init__(self) -> None:
self.attr_1 = SubSubClass()
super().__init__()
self.attr_1 = SubSubClass()
class ServiceClass(DataService):
class_attr = SubClass()
def __init__(self) -> None:
self.attr = SubClass()
super().__init__()
self.attr = SubClass()
service_instance = ServiceClass()
state_manager = StateManager(service_instance)
DataServiceObserver(state_manager)
# Subclass.attr is the same for all instances
assert service_instance.attr.class_attr == service_instance.class_attr.class_attr
@ -245,23 +276,23 @@ def test_advanced_nested_attributes_mixed(caplog: LogCaptureFixture) -> None:
assert service_instance.attr.attr_1 != service_instance.class_attr.class_attr
service_instance.class_attr.class_attr.name = "Ho"
assert "ServiceClass.class_attr.class_attr.name changed to Ho" in caplog.text
assert "ServiceClass.attr.class_attr.name changed to Ho" in caplog.text
assert "'class_attr.class_attr.name' changed to 'Ho'" in caplog.text
assert "'attr.class_attr.name' changed to 'Ho'" in caplog.text
caplog.clear()
service_instance.class_attr.attr_1.name = "Ho"
assert "ServiceClass.class_attr.attr_1.name changed to Ho" in caplog.text
assert "ServiceClass.attr.attr_1.name changed to Ho" not in caplog.text
assert "'class_attr.attr_1.name' changed to 'Ho'" in caplog.text
assert "'attr.attr_1.name' changed to 'Ho'" not in caplog.text
caplog.clear()
service_instance.attr.class_attr.name = "Ho"
assert "ServiceClass.class_attr.class_attr.name changed to Ho" in caplog.text
assert "ServiceClass.attr.class_attr.name changed to Ho" in caplog.text
service_instance.attr.class_attr.name = "Hello"
assert "'class_attr.class_attr.name' changed to 'Hello'" in caplog.text
assert "'attr.class_attr.name' changed to 'Hello'" in caplog.text
caplog.clear()
service_instance.attr.attr_1.name = "Ho"
assert "ServiceClass.attr.attr_1.name changed to Ho" in caplog.text
assert "ServiceClass.class_attr.attr_1.name changed to Ho" not in caplog.text
assert "'attr.attr_1.name' changed to 'Ho'" in caplog.text
assert "'class_attr.attr_1.name' changed to 'Ho'" not in caplog.text
caplog.clear()
@ -277,32 +308,34 @@ def test_class_list_attributes(caplog: LogCaptureFixture) -> None:
attr = subclass_instance
service_instance = ServiceClass()
state_manager = StateManager(service_instance)
DataServiceObserver(state_manager)
assert service_instance.attr_list[0] != service_instance.attr_list[1]
service_instance.attr_list[0].name = "Ho"
assert "ServiceClass.attr_list[0].name changed to Ho" in caplog.text
assert "ServiceClass.attr_list[1].name changed to Ho" not in caplog.text
assert "'attr_list[0].name' changed to 'Ho'" in caplog.text
assert "'attr_list[1].name' changed to 'Ho'" not in caplog.text
caplog.clear()
service_instance.attr_list[1].name = "Ho"
assert "ServiceClass.attr_list[0].name changed to Ho" not in caplog.text
assert "ServiceClass.attr_list[1].name changed to Ho" in caplog.text
assert "'attr_list[0].name' changed to 'Ho'" not in caplog.text
assert "'attr_list[1].name' changed to 'Ho'" in caplog.text
caplog.clear()
assert service_instance.attr_list_2[0] == service_instance.attr
assert service_instance.attr_list_2[0] == service_instance.attr_list_2[1]
service_instance.attr_list_2[0].name = "Ho"
assert "ServiceClass.attr_list_2[0].name changed to Ho" in caplog.text
assert "ServiceClass.attr_list_2[1].name changed to Ho" in caplog.text
assert "ServiceClass.attr.name changed to Ho" in caplog.text
service_instance.attr_list_2[0].name = "Ciao"
assert "'attr_list_2[0].name' changed to 'Ciao'" in caplog.text
assert "'attr_list_2[1].name' changed to 'Ciao'" in caplog.text
assert "'attr.name' changed to 'Ciao'" in caplog.text
caplog.clear()
service_instance.attr_list_2[1].name = "Ho"
assert "ServiceClass.attr_list_2[0].name changed to Ho" in caplog.text
assert "ServiceClass.attr_list_2[1].name changed to Ho" in caplog.text
assert "ServiceClass.attr.name changed to Ho" in caplog.text
service_instance.attr_list_2[1].name = "Bye"
assert "'attr_list_2[0].name' changed to 'Bye'" in caplog.text
assert "'attr_list_2[1].name' changed to 'Bye'" in caplog.text
assert "'attr.name' changed to 'Bye'" in caplog.text
caplog.clear()
@ -320,17 +353,19 @@ def test_nested_class_list_attributes(caplog: LogCaptureFixture) -> None:
subattr = subsubclass_instance
service_instance = ServiceClass()
state_manager = StateManager(service_instance)
DataServiceObserver(state_manager)
assert service_instance.attr[0].attr_list[0] == service_instance.subattr
service_instance.attr[0].attr_list[0].name = "Ho"
assert "ServiceClass.attr[0].attr_list[0].name changed to Ho" in caplog.text
assert "ServiceClass.subattr.name changed to Ho" in caplog.text
assert "'attr[0].attr_list[0].name' changed to 'Ho'" in caplog.text
assert "'subattr.name' changed to 'Ho'" in caplog.text
caplog.clear()
service_instance.subattr.name = "Ho"
assert "ServiceClass.attr[0].attr_list[0].name changed to Ho" in caplog.text
assert "ServiceClass.subattr.name changed to Ho" in caplog.text
service_instance.subattr.name = "Hi"
assert "'attr[0].attr_list[0].name' changed to 'Hi'" in caplog.text
assert "'subattr.name' changed to 'Hi'" in caplog.text
caplog.clear()
@ -342,44 +377,46 @@ def test_instance_list_attributes(caplog: LogCaptureFixture) -> None:
class ServiceClass(DataService):
def __init__(self) -> None:
super().__init__()
self.attr_list = [SubClass() for _ in range(2)]
self.attr_list_2 = [subclass_instance, subclass_instance]
self.attr = subclass_instance
super().__init__()
service_instance = ServiceClass()
state_manager = StateManager(service_instance)
DataServiceObserver(state_manager)
assert service_instance.attr_list[0] != service_instance.attr_list[1]
service_instance.attr_list[0].name = "Ho"
assert "ServiceClass.attr_list[0].name changed to Ho" in caplog.text
assert "ServiceClass.attr_list[1].name changed to Ho" not in caplog.text
assert "'attr_list[0].name' changed to 'Ho'" in caplog.text
assert "'attr_list[1].name' changed to 'Ho'" not in caplog.text
caplog.clear()
service_instance.attr_list[1].name = "Ho"
assert "ServiceClass.attr_list[0].name changed to Ho" not in caplog.text
assert "ServiceClass.attr_list[1].name changed to Ho" in caplog.text
service_instance.attr_list[1].name = "Hi"
assert "'attr_list[0].name' changed to 'Hi'" not in caplog.text
assert "'attr_list[1].name' changed to 'Hi'" in caplog.text
caplog.clear()
assert service_instance.attr_list_2[0] == service_instance.attr
assert service_instance.attr_list_2[0] == service_instance.attr_list_2[1]
service_instance.attr_list_2[0].name = "Ho"
assert "ServiceClass.attr.name changed to Ho" in caplog.text
assert "ServiceClass.attr_list_2[0].name changed to Ho" in caplog.text
assert "ServiceClass.attr_list_2[1].name changed to Ho" in caplog.text
service_instance.attr_list_2[0].name = "Ciao"
assert "'attr.name' changed to 'Ciao'" in caplog.text
assert "'attr_list_2[0].name' changed to 'Ciao'" in caplog.text
assert "'attr_list_2[1].name' changed to 'Ciao'" in caplog.text
caplog.clear()
service_instance.attr_list_2[1].name = "Ho"
assert "ServiceClass.attr.name changed to Ho" in caplog.text
assert "ServiceClass.attr_list_2[0].name changed to Ho" in caplog.text
assert "ServiceClass.attr_list_2[1].name changed to Ho" in caplog.text
service_instance.attr_list_2[1].name = "Bye"
assert "'attr.name' changed to 'Bye'" in caplog.text
assert "'attr_list_2[0].name' changed to 'Bye'" in caplog.text
assert "'attr_list_2[1].name' changed to 'Bye'" in caplog.text
caplog.clear()
service_instance.attr.name = "Ho"
assert "ServiceClass.attr.name changed to Ho" in caplog.text
assert "ServiceClass.attr_list_2[0].name changed to Ho" in caplog.text
assert "ServiceClass.attr_list_2[1].name changed to Ho" in caplog.text
assert "'attr.name' changed to 'Ho'" in caplog.text
assert "'attr_list_2[0].name' changed to 'Ho'" in caplog.text
assert "'attr_list_2[1].name' changed to 'Ho'" in caplog.text
caplog.clear()
@ -391,26 +428,28 @@ def test_nested_instance_list_attributes(caplog: LogCaptureFixture) -> None:
class SubClass(DataService):
def __init__(self) -> None:
self.attr_list = [subsubclass_instance]
super().__init__()
self.attr_list = [subsubclass_instance]
class ServiceClass(DataService):
class_attr = subsubclass_instance
def __init__(self) -> None:
self.attr = [SubClass()]
super().__init__()
self.attr = [SubClass()]
service_instance = ServiceClass()
state_manager = StateManager(service_instance)
DataServiceObserver(state_manager)
assert service_instance.attr[0].attr_list[0] == service_instance.class_attr
service_instance.attr[0].attr_list[0].name = "Ho"
assert "ServiceClass.attr[0].attr_list[0].name changed to Ho" in caplog.text
assert "ServiceClass.class_attr.name changed to Ho" in caplog.text
assert "'attr[0].attr_list[0].name' changed to 'Ho'" in caplog.text
assert "'class_attr.name' changed to 'Ho'" in caplog.text
caplog.clear()
service_instance.class_attr.name = "Ho"
assert "ServiceClass.attr[0].attr_list[0].name changed to Ho" in caplog.text
assert "ServiceClass.class_attr.name changed to Ho" in caplog.text
service_instance.class_attr.name = "Hi"
assert "'attr[0].attr_list[0].name' changed to 'Hi'" in caplog.text
assert "'class_attr.name' changed to 'Hi'" in caplog.text
caplog.clear()

View File

@ -1,6 +1,7 @@
from pytest import LogCaptureFixture
from pydase import DataService
from pydase.data_service.data_service_observer import DataServiceObserver
from pydase.data_service.state_manager import StateManager
from pytest import LogCaptureFixture
def test_properties(caplog: LogCaptureFixture) -> None:
@ -28,17 +29,20 @@ def test_properties(caplog: LogCaptureFixture) -> None:
def current(self, value: float) -> None:
self._current = value
test_service = ServiceClass()
test_service.voltage = 1
service_instance = ServiceClass()
state_manager = StateManager(service_instance)
DataServiceObserver(state_manager)
assert "ServiceClass.power changed to 1.0" in caplog.text
assert "ServiceClass.voltage changed to 1.0" in caplog.text
service_instance.voltage = 1.0
assert "'power' changed to '1.0'" in caplog.text
assert "'voltage' changed to '1.0'" in caplog.text
caplog.clear()
test_service.current = 12.0
service_instance.current = 12.0
assert "ServiceClass.power changed to 12.0" in caplog.text
assert "ServiceClass.current changed to 12.0" in caplog.text
assert "'power' changed to '12.0'" in caplog.text
assert "'current' changed to '12.0'" in caplog.text
def test_nested_properties(caplog: LogCaptureFixture) -> None:
@ -61,30 +65,32 @@ def test_nested_properties(caplog: LogCaptureFixture) -> None:
def sub_name(self) -> str:
return f"{self.class_attr.name} {self.name}"
test_service = ServiceClass()
test_service.name = "Peepz"
service_instance = ServiceClass()
state_manager = StateManager(service_instance)
DataServiceObserver(state_manager)
assert "ServiceClass.name changed to Peepz" in caplog.text
assert "ServiceClass.sub_name changed to Hello Peepz" in caplog.text
assert "ServiceClass.subsub_name changed to Hello Peepz" in caplog.text
service_instance.name = "Peepz"
assert "'name' changed to 'Peepz'" in caplog.text
assert "'sub_name' changed to 'Hello Peepz'" in caplog.text
assert "'subsub_name' changed to 'Hello Peepz'" in caplog.text
caplog.clear()
test_service.class_attr.name = "Hi"
service_instance.class_attr.name = "Hi"
assert service_instance.subsub_name == "Hello Peepz"
assert "ServiceClass.sub_name changed to Hi Peepz" in caplog.text
assert (
"ServiceClass.subsub_name changed to Hello Peepz" in caplog.text
) # registers subclass changes
assert "ServiceClass.class_attr.name changed to Hi" in caplog.text
assert "'sub_name' changed to 'Hi Peepz'" in caplog.text
assert "'subsub_name' " not in caplog.text # subsub_name does not depend on change
assert "'class_attr.name' changed to 'Hi'" in caplog.text
caplog.clear()
test_service.class_attr.class_attr.name = "Ciao"
service_instance.class_attr.class_attr.name = "Ciao"
assert (
"ServiceClass.sub_name changed to Hi Peepz" in caplog.text
) # registers subclass changes
assert "ServiceClass.subsub_name changed to Ciao Peepz" in caplog.text
assert "ServiceClass.class_attr.class_attr.name changed to Ciao" in caplog.text
"'sub_name' changed to" not in caplog.text
) # sub_name does not depend on change
assert "'subsub_name' changed to 'Ciao Peepz'" in caplog.text
assert "'class_attr.class_attr.name' changed to 'Ciao'" in caplog.text
caplog.clear()
@ -97,17 +103,20 @@ def test_simple_list_properties(caplog: LogCaptureFixture) -> None:
def total_name(self) -> str:
return f"{self.list[0]} {self.name}"
test_service = ServiceClass()
test_service.name = "Peepz"
service_instance = ServiceClass()
state_manager = StateManager(service_instance)
DataServiceObserver(state_manager)
assert "ServiceClass.name changed to Peepz" in caplog.text
assert "ServiceClass.total_name changed to Hello Peepz" in caplog.text
service_instance.name = "Peepz"
assert "'name' changed to 'Peepz'" in caplog.text
assert "'total_name' changed to 'Hello Peepz'" in caplog.text
caplog.clear()
test_service.list[0] = "Hi"
service_instance.list[0] = "Hi"
assert "ServiceClass.total_name changed to Hi Peepz" in caplog.text
assert "ServiceClass.list[0] changed to Hi" in caplog.text
assert "'total_name' changed to 'Hi Peepz'" in caplog.text
assert "'list[0]' changed to 'Hi'" in caplog.text
def test_class_list_properties(caplog: LogCaptureFixture) -> None:
@ -122,23 +131,26 @@ def test_class_list_properties(caplog: LogCaptureFixture) -> None:
def total_name(self) -> str:
return f"{self.list[0].name} {self.name}"
test_service = ServiceClass()
test_service.name = "Peepz"
service_instance = ServiceClass()
state_manager = StateManager(service_instance)
DataServiceObserver(state_manager)
assert "ServiceClass.name changed to Peepz" in caplog.text
assert "ServiceClass.total_name changed to Hello Peepz" in caplog.text
service_instance.name = "Peepz"
assert "'name' changed to 'Peepz'" in caplog.text
assert "'total_name' changed to 'Hello Peepz'" in caplog.text
caplog.clear()
test_service.list[0].name = "Hi"
service_instance.list[0].name = "Hi"
assert "ServiceClass.total_name changed to Hi Peepz" in caplog.text
assert "ServiceClass.list[0].name changed to Hi" in caplog.text
assert "'total_name' changed to 'Hi Peepz'" in caplog.text
assert "'list[0].name' changed to 'Hi'" in caplog.text
def test_subclass_properties(caplog: LogCaptureFixture) -> None:
class SubClass(DataService):
name = "Hello"
_voltage = 10.0
_voltage = 11.0
_current = 1.0
@property
@ -168,14 +180,15 @@ def test_subclass_properties(caplog: LogCaptureFixture) -> None:
def voltage(self) -> float:
return self.class_attr.voltage
test_service = ServiceClass()
test_service.class_attr.voltage = 10.0
service_instance = ServiceClass()
state_manager = StateManager(service_instance)
DataServiceObserver(state_manager)
# using a set here as "ServiceClass.voltage = 10.0" is emitted twice. Once for
# changing voltage, and once for changing power.
assert "ServiceClass.class_attr.voltage changed to 10.0" in caplog.text
assert "ServiceClass.class_attr.power changed to 10.0" in caplog.text
assert "ServiceClass.voltage changed to 10.0" in caplog.text
service_instance.class_attr.voltage = 10.0
assert "'class_attr.voltage' changed to '10.0'" in caplog.text
assert "'class_attr.power' changed to '10.0'" in caplog.text
assert "'voltage' changed to '10.0'" in caplog.text
caplog.clear()
@ -212,17 +225,20 @@ def test_subclass_properties_2(caplog: LogCaptureFixture) -> None:
def voltage(self) -> float:
return self.class_attr[0].voltage
test_service = ServiceClass()
test_service.class_attr[1].current = 10.0
service_instance = ServiceClass()
state_manager = StateManager(service_instance)
DataServiceObserver(state_manager)
# using a set here as "ServiceClass.voltage = 10.0" is emitted twice. Once for
# changing current, and once for changing power. Note that the voltage property is
# only dependent on class_attr[0] but still emits an update notification. This is
# because every time any item in the list `test_service.class_attr` is changed,
# a notification will be emitted.
assert "ServiceClass.class_attr[1].current changed to 10.0" in caplog.text
assert "ServiceClass.class_attr[1].power changed to 100.0" in caplog.text
assert "ServiceClass.voltage changed to 10.0" in caplog.text
service_instance.class_attr[0].current = 10.0
assert "'class_attr[0].current' changed to '10.0'" in caplog.text
assert "'class_attr[0].power' changed to '100.0'" in caplog.text
caplog.clear()
service_instance.class_attr[0].voltage = 11.0
assert "'class_attr[0].voltage' changed to '11.0'" in caplog.text
assert "'class_attr[0].power' changed to '110.0'" in caplog.text
assert "'voltage' changed to '11.0'" in caplog.text
def test_subsubclass_properties(caplog: LogCaptureFixture) -> None:
@ -252,25 +268,23 @@ def test_subsubclass_properties(caplog: LogCaptureFixture) -> None:
def power(self) -> float:
return self.class_attr[0].power
test_service = ServiceClass()
service_instance = ServiceClass()
state_manager = StateManager(service_instance)
DataServiceObserver(state_manager)
test_service.class_attr[1].class_attr.voltage = 100.0
assert (
"ServiceClass.class_attr[0].class_attr.voltage changed to 100.0" in caplog.text
)
assert (
"ServiceClass.class_attr[1].class_attr.voltage changed to 100.0" in caplog.text
)
assert "ServiceClass.class_attr[0].power changed to 50.0" in caplog.text
assert "ServiceClass.class_attr[1].power changed to 50.0" in caplog.text
assert "ServiceClass.power changed to 50.0" in caplog.text
service_instance.class_attr[1].class_attr.voltage = 100.0
assert "'class_attr[0].class_attr.voltage' changed to '100.0'" in caplog.text
assert "'class_attr[1].class_attr.voltage' changed to '100.0'" in caplog.text
assert "'class_attr[0].power' changed to '50.0'" in caplog.text
assert "'class_attr[1].power' changed to '50.0'" in caplog.text
assert "'power' changed to '50.0'" in caplog.text
def test_subsubclass_instance_properties(caplog: LogCaptureFixture) -> None:
class SubSubClass(DataService):
def __init__(self) -> None:
self._voltage = 10.0
super().__init__()
self._voltage = 10.0
@property
def voltage(self) -> float:
@ -282,9 +296,9 @@ def test_subsubclass_instance_properties(caplog: LogCaptureFixture) -> None:
class SubClass(DataService):
def __init__(self) -> None:
super().__init__()
self.attr = [SubSubClass()]
self.current = 0.5
super().__init__()
@property
def power(self) -> float:
@ -297,12 +311,11 @@ def test_subsubclass_instance_properties(caplog: LogCaptureFixture) -> None:
def power(self) -> float:
return self.class_attr[0].power
test_service = ServiceClass()
service_instance = ServiceClass()
state_manager = StateManager(service_instance)
DataServiceObserver(state_manager)
test_service.class_attr[1].attr[0].voltage = 100.0
# again, changing an item in a list will trigger the callbacks. This is why a
# notification for `ServiceClass.power` is emitted although it did not change its
# value
assert "ServiceClass.class_attr[1].attr[0].voltage changed to 100.0" in caplog.text
assert "ServiceClass.class_attr[1].power changed to 50.0" in caplog.text
assert "ServiceClass.power changed to 5.0" in caplog.text
service_instance.class_attr[0].attr[0].voltage = 100.0
assert "'class_attr[0].attr[0].voltage' changed to '100.0'" in caplog.text
assert "'class_attr[0].power' changed to '50.0'" in caplog.text
assert "'power' changed to '50.0'" in caplog.text

View File

@ -1,9 +1,10 @@
from typing import Any
from pytest import LogCaptureFixture
import pydase.units as u
from pydase.data_service.data_service import DataService
from pydase.data_service.data_service_observer import DataServiceObserver
from pydase.data_service.state_manager import StateManager
from pytest import LogCaptureFixture
def test_DataService_setattr(caplog: LogCaptureFixture) -> None:
@ -19,26 +20,28 @@ def test_DataService_setattr(caplog: LogCaptureFixture) -> None:
def current(self, value: Any) -> None:
self._current = value
service = ServiceClass()
service_instance = ServiceClass()
state_manager = StateManager(service_instance)
DataServiceObserver(state_manager)
# You can just set floats to the Quantity objects. The DataService __setattr__ will
# automatically convert this
service.voltage = 10.0 # type: ignore
service.current = 1.5
service_instance.voltage = 10.0 * u.units.V
service_instance.current = 1.5 * u.units.mA
assert service.voltage == 10.0 * u.units.V # type: ignore
assert service.current == 1.5 * u.units.mA
assert "'voltage' changed to '10.0 V'" in caplog.text
assert "'current' changed to '1.5 mA'" in caplog.text
assert "ServiceClass.voltage changed to 10.0 V" in caplog.text
assert "ServiceClass.current changed to 1.5 mA" in caplog.text
assert service_instance.voltage == 10.0 * u.units.V
assert service_instance.current == 1.5 * u.units.mA
caplog.clear()
service.voltage = 12.0 * u.units.V # type: ignore
service.current = 1.51 * u.units.A
assert service.voltage == 12.0 * u.units.V # type: ignore
assert service.current == 1.51 * u.units.A
service_instance.voltage = 12.0 * u.units.V
service_instance.current = 1.51 * u.units.A
assert "ServiceClass.voltage changed to 12.0 V" in caplog.text
assert "ServiceClass.current changed to 1.51 A" in caplog.text
assert "'voltage' changed to '12.0 V'" in caplog.text
assert "'current' changed to '1.51 A'" in caplog.text
assert service_instance.voltage == 12.0 * u.units.V
assert service_instance.current == 1.51 * u.units.A
def test_convert_to_quantity() -> None:
@ -48,7 +51,7 @@ def test_convert_to_quantity() -> None:
assert u.convert_to_quantity(1.0 * u.units.mV) == 1.0 * u.units.mV
def test_update_DataService_attribute(caplog: LogCaptureFixture) -> None:
def test_set_service_attribute_value_by_path(caplog: LogCaptureFixture) -> None:
class ServiceClass(DataService):
voltage = 1.0 * u.units.V
_current: u.Quantity = 1.0 * u.units.mA
@ -61,23 +64,25 @@ def test_update_DataService_attribute(caplog: LogCaptureFixture) -> None:
def current(self, value: Any) -> None:
self._current = value
service = ServiceClass()
service_instance = ServiceClass()
state_manager = StateManager(service_instance)
DataServiceObserver(state_manager)
service.update_DataService_attribute(
path_list=[], attr_name="voltage", value=1.0 * u.units.mV
state_manager.set_service_attribute_value_by_path(
path="voltage", value=1.0 * u.units.mV
)
assert "'voltage' changed to '1.0 mV'" in caplog.text
caplog.clear()
assert "ServiceClass.voltage changed to 1.0 mV" in caplog.text
state_manager.set_service_attribute_value_by_path(path="voltage", value=2)
service.update_DataService_attribute(path_list=[], attr_name="voltage", value=2)
assert "'voltage' changed to '2.0 mV'" in caplog.text
caplog.clear()
assert "ServiceClass.voltage changed to 2.0 mV" in caplog.text
service.update_DataService_attribute(
path_list=[], attr_name="voltage", value={"magnitude": 123, "unit": "kV"}
state_manager.set_service_attribute_value_by_path(
path="voltage", value={"magnitude": 123, "unit": "kV"}
)
assert "ServiceClass.voltage changed to 123.0 kV" in caplog.text
assert "'voltage' changed to '123.0 kV'" in caplog.text
def test_autoconvert_offset_to_baseunit() -> None:
@ -104,9 +109,9 @@ def test_loading_from_json(caplog: LogCaptureFixture) -> None:
}
class ServiceClass(DataService):
def __init__(self):
self._unit: u.Quantity = 1 * u.units.A
def __init__(self) -> None:
super().__init__()
self._unit: u.Quantity = 1 * u.units.A
@property
def some_unit(self) -> u.Quantity:
@ -117,8 +122,10 @@ def test_loading_from_json(caplog: LogCaptureFixture) -> None:
assert isinstance(value, u.Quantity)
self._unit = value
service = ServiceClass()
service_instance = ServiceClass()
state_manager = StateManager(service_instance)
DataServiceObserver(state_manager)
service.load_DataService_from_JSON(JSON_DICT)
service_instance.load_DataService_from_JSON(JSON_DICT)
assert "ServiceClass.some_unit changed to 10.0 A" in caplog.text
assert "'some_unit' changed to '10.0 A'" in caplog.text

View File

@ -1,6 +1,5 @@
import toml
import pydase.version
import toml
def test_project_version() -> None:

View File

@ -1,10 +1,10 @@
import asyncio
from enum import Enum
import pytest
from typing import Any
import pydase
import pydase.units as u
import pytest
from pydase.components.coloured_enum import ColouredEnum
from pydase.utils.serializer import (
SerializationPathError,
@ -32,7 +32,7 @@ from pydase.utils.serializer import (
),
],
)
def test_dump(test_input, expected):
def test_dump(test_input: Any, expected: dict[str, Any]) -> None:
assert dump(test_input) == expected
@ -43,13 +43,13 @@ def test_enum_serialize() -> None:
class EnumAttribute(pydase.DataService):
def __init__(self) -> None:
self.some_enum = EnumClass.FOO
super().__init__()
self.some_enum = EnumClass.FOO
class EnumPropertyWithoutSetter(pydase.DataService):
def __init__(self) -> None:
self._some_enum = EnumClass.FOO
super().__init__()
self._some_enum = EnumClass.FOO
@property
def some_enum(self) -> EnumClass:
@ -57,8 +57,8 @@ def test_enum_serialize() -> None:
class EnumPropertyWithSetter(pydase.DataService):
def __init__(self) -> None:
self._some_enum = EnumClass.FOO
super().__init__()
self._some_enum = EnumClass.FOO
@property
def some_enum(self) -> EnumClass:
@ -401,17 +401,10 @@ def test_get_class_attribute_inside_list(setup_dict):
def test_get_invalid_list_index(setup_dict, caplog: pytest.LogCaptureFixture):
with pytest.raises(SerializationPathError):
get_nested_dict_by_path(setup_dict, "attr_list[10]")
assert (
"Error occured trying to change 'attr_list[10]': list index "
"out of range" in caplog.text
)
def test_get_invalid_path(setup_dict, caplog: pytest.LogCaptureFixture):
with pytest.raises(SerializationPathError):
get_nested_dict_by_path(setup_dict, "invalid_path")
assert (
"Error occured trying to access the key 'invalid_path': it is either "
"not present in the current dictionary or its value does not contain "
"a 'value' key." in caplog.text
)

View File

@ -1,28 +1,44 @@
from pydase import DataService
from pytest import LogCaptureFixture
from pydase import DataService
def test_class_attr_inheritance_warning(caplog: LogCaptureFixture) -> None:
class SubClass:
name = "Hello"
class ServiceClass(DataService):
attr_1 = SubClass()
ServiceClass()
assert (
"Class 'SubClass' does not inherit from DataService. This may lead to "
"unexpected behaviour!"
) in caplog.text
def test_setattr_warnings(caplog: LogCaptureFixture) -> None: # noqa
# def test_setattr_warnings(capsys: CaptureFixture) -> None:
def test_instance_attr_inheritance_warning(caplog: LogCaptureFixture) -> None:
class SubClass:
name = "Hello"
class ServiceClass(DataService):
def __init__(self) -> None:
self.attr_1 = SubClass()
super().__init__()
self.attr_1 = SubClass()
ServiceClass()
assert "Warning: Class 'SubClass' does not inherit from DataService." in caplog.text
assert (
"Class 'SubClass' does not inherit from DataService. This may lead to "
"unexpected behaviour!"
) in caplog.text
def test_private_attribute_warning(caplog: LogCaptureFixture) -> None: # noqa
def test_private_attribute_warning(caplog: LogCaptureFixture) -> None:
class ServiceClass(DataService):
def __init__(self) -> None:
self.__something = ""
super().__init__()
self.__something = ""
ServiceClass()
@ -32,14 +48,14 @@ def test_private_attribute_warning(caplog: LogCaptureFixture) -> None: # noqa
)
def test_protected_attribute_warning(caplog: LogCaptureFixture) -> None: # noqa
def test_protected_attribute_warning(caplog: LogCaptureFixture) -> None:
class SubClass:
name = "Hello"
class ServiceClass(DataService):
def __init__(self) -> None:
self._subclass = SubClass
super().__init__()
self._subclass = SubClass
ServiceClass()