removes data service list and callback manager, make DataService an Observable

This commit is contained in:
Mose Müller 2023-11-30 17:09:46 +01:00
parent 1d2ac57ba7
commit 114a1c6fdc
9 changed files with 7 additions and 702 deletions

View File

@ -1,16 +1,15 @@
from __future__ import annotations
from abc import ABC
from typing import TYPE_CHECKING, Any
from pydase.observer_pattern.observable.observable import Observable
if TYPE_CHECKING:
from pydase.data_service.callback_manager import CallbackManager
from pydase.data_service.data_service import DataService
from pydase.data_service.task_manager import TaskManager
class AbstractDataService(ABC):
class AbstractDataService(Observable):
__root__: DataService
_task_manager: TaskManager
_callback_manager: CallbackManager
_autostart_tasks: dict[str, tuple[Any]]

View File

@ -1,442 +0,0 @@
from __future__ import annotations
import inspect
import logging
from typing import TYPE_CHECKING, Any, ClassVar
from pydase.data_service.abstract_data_service import AbstractDataService
from pydase.utils.helpers import get_class_and_instance_attributes
from .data_service_list import DataServiceList
if TYPE_CHECKING:
from collections.abc import Callable
from .data_service import DataService
logger = logging.getLogger(__name__)
class CallbackManager:
_notification_callbacks: ClassVar[list[Callable[[str, str, Any], Any]]] = []
"""
A list of callback functions that are executed when a change occurs in the
DataService instance. These functions are intended to handle or respond to these
changes in some way, such as emitting a socket.io message to the frontend.
Each function in this list should be a callable that accepts three parameters:
- parent_path (str): The path to the parent of the attribute that was changed.
- name (str): The name of the attribute that was changed.
- value (Any): The new value of the attribute.
A callback function can be added to this list using the add_notification_callback
method. Whenever a change in the DataService instance occurs (or in its nested
DataService or DataServiceList instances), the emit_notification method is invoked,
which in turn calls all the callback functions in _notification_callbacks with the
appropriate arguments.
This implementation follows the observer pattern, with the DataService instance as
the "subject" and the callback functions as the "observers".
"""
_list_mapping: ClassVar[dict[int, DataServiceList]] = {}
"""
A dictionary mapping the id of the original lists to the corresponding
DataServiceList instances.
This is used to ensure that all references to the same list within the DataService
object point to the same DataServiceList, so that any modifications to that list can
be tracked consistently. The keys of the dictionary are the ids of the original
lists, and the values are the DataServiceList instances that wrap these lists.
"""
def __init__(self, service: DataService) -> None:
self.callbacks: set[Callable[[str, Any], None]] = set()
self.service = service
def _register_list_change_callbacks( # noqa: C901
self, obj: AbstractDataService, parent_path: str
) -> None:
"""
This method ensures that notifications are emitted whenever a public list
attribute of a DataService instance changes. These notifications pertain solely
to the list item changes, not to changes in attributes of objects within the
list.
The method works by converting all list attributes (both at the class and
instance levels) into DataServiceList objects. Each DataServiceList is then
assigned a callback that is triggered whenever an item in the list is updated.
The callback emits a notification, but only if the DataService instance was the
root instance when the callback was registered.
This method operates recursively, processing the input object and all nested
attributes that are instances of DataService. While navigating the structure,
it constructs a path for each attribute that traces back to the root. This path
is included in any emitted notifications to facilitate identification of the
source of a change.
Parameters:
-----------
obj: DataService
The target object to be processed. All list attributes (and those of its
nested DataService attributes) will be converted into DataServiceList
objects.
parent_path: str
The access path for the parent object. Used to construct the full access
path for the notifications.
"""
# Convert all list attributes (both class and instance) to DataServiceList
attrs = get_class_and_instance_attributes(obj)
for attr_name, attr_value in attrs.items():
if isinstance(attr_value, AbstractDataService):
new_path = f"{parent_path}.{attr_name}"
self._register_list_change_callbacks(attr_value, new_path)
elif isinstance(attr_value, list):
# Create callback for current attr_name
# Default arguments solve the late binding problem by capturing the
# value at the time the lambda is defined, not when it is called. This
# prevents attr_name from being overwritten in the next loop iteration.
def callback(
index: int, value: Any, attr_name: str = attr_name
) -> None:
"""Emits a notification through the service's callback manager."""
# Skip private and protected lists
if (
self.service == self.service.__root__
and not attr_name.startswith("_")
):
self.service._callback_manager.emit_notification(
parent_path=parent_path,
name=f"{attr_name}[{index}]",
value=value,
)
# Check if attr_value is already a DataServiceList or in the mapping
if isinstance(attr_value, DataServiceList):
attr_value.add_callback(callback)
continue
if id(attr_value) in self._list_mapping:
# If the list `attr_value` was already referenced somewhere else
notifying_list = self._list_mapping[id(attr_value)]
notifying_list.add_callback(callback)
else:
# convert the builtin list into a DataServiceList and add the
# callback
notifying_list = DataServiceList(
attr_value, callback_list=[callback]
)
self._list_mapping[id(attr_value)] = notifying_list
setattr(obj, attr_name, notifying_list)
# recursively add callbacks to list attributes of DataService instances
for i, item in enumerate(attr_value):
if isinstance(item, AbstractDataService):
new_path = f"{parent_path}.{attr_name}[{i}]"
self._register_list_change_callbacks(item, new_path)
def _register_data_service_instance_callbacks(
self, obj: AbstractDataService, parent_path: str
) -> None:
"""
This function is a key part of the observer pattern implemented by the
DataService class.
Its purpose is to allow the system to automatically send out notifications
whenever an attribute of a DataService instance is updated, which is especially
useful when the DataService instance is part of a nested structure.
It works by recursively registering callbacks for a given DataService instance
and all of its nested attributes. Each callback is responsible for emitting a
notification when the attribute it is attached to is modified.
This function ensures that only the root DataService instance (the one directly
exposed to the user or another system via rpyc) emits notifications.
Each notification contains a 'parent_path' that traces the attribute's location
within the nested DataService structure, starting from the root. This makes it
easier for observers to determine exactly where a change has occurred.
Parameters:
-----------
obj: DataService
The target object on which callbacks are to be registered.
parent_path: str
The access path for the parent object. This is used to construct the full
access path for the notifications.
"""
# Create and register a callback for the object
# only emit the notification when the call was registered by the root object
def callback(attr_name: str, value: Any) -> None:
"""Emits a notification through the service's callback manager."""
# Skip private and protected attrs
# exlude proerty notifications -> those are handled in separate callbacks
if (
self.service == self.service.__root__
and not attr_name.startswith("_")
and not isinstance(getattr(type(obj), attr_name, None), property)
):
self.service._callback_manager.emit_notification(
parent_path=parent_path,
name=attr_name,
value=value,
)
obj._callback_manager.callbacks.add(callback)
# Recursively register callbacks for all nested attributes of the object
attrs = get_class_and_instance_attributes(obj)
for nested_attr_name, nested_attr in attrs.items():
if isinstance(nested_attr, DataServiceList):
self._register_list_callbacks(
nested_attr, parent_path, nested_attr_name
)
elif isinstance(nested_attr, AbstractDataService):
self._register_service_callbacks(
nested_attr, parent_path, nested_attr_name
)
def _register_list_callbacks(
self, nested_attr: list[Any], parent_path: str, attr_name: str
) -> None:
"""Handles registration of callbacks for list attributes"""
for i, list_item in enumerate(nested_attr):
if isinstance(list_item, AbstractDataService):
self._register_service_callbacks(
list_item, parent_path, f"{attr_name}[{i}]"
)
def _register_service_callbacks(
self, nested_attr: AbstractDataService, parent_path: str, attr_name: str
) -> None:
"""Handles registration of callbacks for DataService attributes"""
# as the DataService is an attribute of self, change the root object
# use the dictionary to not trigger callbacks on initialised objects
nested_attr.__dict__["__root__"] = self.service.__root__
new_path = f"{parent_path}.{attr_name}"
self._register_data_service_instance_callbacks(nested_attr, new_path)
def __register_recursive_parameter_callback(
self,
obj: AbstractDataService | DataServiceList,
callback: Callable[[str | int, Any], None],
) -> None:
"""
Register callback to a DataService or DataServiceList instance and its nested
instances.
For a DataService, this method traverses its attributes and recursively adds the
callback for nested DataService or DataServiceList instances. For a
DataServiceList,
the callback is also triggered when an item gets reassigned.
"""
if isinstance(obj, DataServiceList):
# emits callback when item in list gets reassigned
obj.add_callback(callback=callback)
obj_list: DataServiceList | list[AbstractDataService] = obj
else:
obj_list = [obj]
# this enables notifications when a class instance was changed (-> item is
# changed, not reassigned)
for item in obj_list:
if isinstance(item, AbstractDataService):
item._callback_manager.callbacks.add(callback)
for attr_name in set(dir(item)) - set(dir(object)) - {"__root__"}:
attr_value = getattr(item, attr_name)
if isinstance(attr_value, AbstractDataService | DataServiceList):
self.__register_recursive_parameter_callback(
attr_value, callback
)
def _register_property_callbacks( # noqa: C901
self,
obj: AbstractDataService,
parent_path: str,
) -> None:
"""
Register callbacks to notify when properties or their dependencies change.
This method cycles through all attributes (both class and instance level) of the
input `obj`. For each attribute that is a property, it identifies dependencies
used in the getter method and creates a callback for each one.
The method is recursive for attributes that are of type DataService or
DataServiceList. It attaches the callback directly to DataServiceList items or
propagates it through nested DataService instances.
"""
attrs = get_class_and_instance_attributes(obj)
for attr_name, attr_value in attrs.items():
if isinstance(attr_value, AbstractDataService):
self._register_property_callbacks(
attr_value, parent_path=f"{parent_path}.{attr_name}"
)
elif isinstance(attr_value, DataServiceList):
for i, item in enumerate(attr_value):
if isinstance(item, AbstractDataService):
self._register_property_callbacks(
item, parent_path=f"{parent_path}.{attr_name}[{i}]"
)
if isinstance(attr_value, property):
dependencies = attr_value.fget.__code__.co_names # type: ignore[union-attr]
source_code_string = inspect.getsource(attr_value.fget) # type: ignore[arg-type]
for dependency in dependencies:
# check if the dependencies are attributes of obj
# This doesn't have to be the case like, for example, here:
# >>> @property
# >>> def power(self) -> float:
# >>> return self.class_attr.voltage * self.current
#
# The dependencies for this property are:
# > ('class_attr', 'voltage', 'current')
if f"self.{dependency}" not in source_code_string:
continue
# use `obj` instead of `type(obj)` to get DataServiceList
# instead of list
dependency_value = getattr(obj, dependency)
if isinstance(
dependency_value, DataServiceList | AbstractDataService
):
def list_or_data_service_callback(
name: Any,
value: Any,
dependent_attr: str = attr_name,
) -> None:
"""Emits a notification through the service's callback
manager.
"""
if self.service == obj.__root__:
obj._callback_manager.emit_notification(
parent_path=parent_path,
name=dependent_attr,
value=getattr(obj, dependent_attr),
)
self.__register_recursive_parameter_callback(
dependency_value,
callback=list_or_data_service_callback,
)
else:
def callback(
name: str,
value: Any,
dependent_attr: str = attr_name,
dep: str = dependency,
) -> None:
"""Emits a notification through the service's callback
manager.
"""
if name == dep and self.service == obj.__root__:
obj._callback_manager.emit_notification(
parent_path=parent_path,
name=dependent_attr,
value=getattr(obj, dependent_attr),
)
# Add to callbacks
obj._callback_manager.callbacks.add(callback)
def _register_start_stop_task_callbacks( # noqa: C901
self, obj: AbstractDataService, parent_path: str
) -> None:
"""
This function registers callbacks for start and stop methods of async functions.
These callbacks are stored in the '_task_status_change_callbacks' attribute and
are called when the status of a task changes.
Parameters:
-----------
obj: AbstractDataService
The target object on which callbacks are to be registered.
parent_path: str
The access path for the parent object. This is used to construct the full
access path for the notifications.
"""
# Create and register a callback for the object
# only emit the notification when the call was registered by the root object
def task_status_change_callback(
name: str, task_status: dict[str, Any] | None
) -> None:
"""Emits a notification through the service's callback
manager.
"""
if self.service == obj.__root__ and not name.startswith("_"):
obj._callback_manager.emit_notification(
parent_path=parent_path,
name=name,
value=task_status,
)
obj._task_manager.task_status_change_callbacks.append(
task_status_change_callback
)
# Recursively register callbacks for all nested attributes of the object
attrs: dict[str, Any] = get_class_and_instance_attributes(obj)
for nested_attr_name, nested_attr in attrs.items():
if isinstance(nested_attr, DataServiceList):
for i, item in enumerate(nested_attr):
if isinstance(item, AbstractDataService):
self._register_start_stop_task_callbacks(
item, parent_path=f"{parent_path}.{nested_attr_name}[{i}]"
)
if isinstance(nested_attr, AbstractDataService):
self._register_start_stop_task_callbacks(
nested_attr, parent_path=f"{parent_path}.{nested_attr_name}"
)
def register_callbacks(self) -> None:
self._register_list_change_callbacks(
self.service, f"{self.service.__class__.__name__}"
)
self._register_data_service_instance_callbacks(
self.service, f"{self.service.__class__.__name__}"
)
self._register_property_callbacks(
self.service, f"{self.service.__class__.__name__}"
)
self._register_start_stop_task_callbacks(
self.service, f"{self.service.__class__.__name__}"
)
def emit_notification(self, parent_path: str, name: str, value: Any) -> None:
logger.debug("%s.%s changed to %s!", parent_path, name, value)
for callback in self._notification_callbacks:
try:
callback(parent_path, name, value)
except Exception as e:
logger.error(e)
def add_notification_callback(
self, callback: Callable[[str, str, Any], None]
) -> None:
"""
Adds a new notification callback function to the list of callbacks.
This function is intended to be used for registering a function that will be
called whenever a the value of an attribute changes.
Args:
callback (Callable[[str, str, Any], None]): The callback function to
register.
It should accept three parameters:
- parent_path (str): The parent path of the parameter.
- name (str): The name of the changed parameter.
- value (Any): The value of the parameter.
"""
self._notification_callbacks.append(callback)

View File

@ -7,7 +7,6 @@ import rpyc # type: ignore[import-untyped]
import pydase.units as u
from pydase.data_service.abstract_data_service import AbstractDataService
from pydase.data_service.callback_manager import CallbackManager
from pydase.data_service.task_manager import TaskManager
from pydase.utils.helpers import (
convert_arguments_to_hinted_types,
@ -45,16 +44,12 @@ def process_callable_attribute(attr: Any, args: dict[str, Any]) -> Any:
class DataService(rpyc.Service, AbstractDataService):
def __init__(self, **kwargs: Any) -> None:
self._callback_manager: CallbackManager = CallbackManager(self)
super().__init__()
self._task_manager = TaskManager(self)
if not hasattr(self, "_autostart_tasks"):
self._autostart_tasks = {}
self.__root__: "DataService" = self
"""Keep track of the root object. This helps to filter the emission of
notifications."""
filename = kwargs.pop("filename", None)
if filename is not None:
warnings.warn(
@ -65,7 +60,6 @@ class DataService(rpyc.Service, AbstractDataService):
)
self._filename: str | Path = filename
self._callback_manager.register_callbacks()
self.__check_instance_classes()
self._initialised = True
@ -82,10 +76,7 @@ class DataService(rpyc.Service, AbstractDataService):
super().__setattr__(__name, __value)
if self.__dict__.get("_initialised") and __name != "_initialised":
for callback in self._callback_manager.callbacks:
callback(__name, __value)
elif __name.startswith(f"_{self.__class__.__name__}__"):
if __name.startswith(f"_{self.__class__.__name__}__"):
logger.warning(
"Warning: You should not set private but rather protected attributes! "
"Use %s instead of %s.",

View File

@ -23,7 +23,6 @@ class DataServiceCache:
"""Initializes the cache and sets up the callback."""
logger.debug("Initializing cache.")
self._cache = self.service.serialize()
self.service._callback_manager.add_notification_callback(self.update_cache)
def update_cache(self, parent_path: str, name: str, value: Any) -> None:
# Remove the part before the first "." in the parent_path

View File

@ -1,68 +0,0 @@
from collections.abc import Callable
from typing import Any
import pydase.units as u
from pydase.utils.warnings import (
warn_if_instance_class_does_not_inherit_from_data_service,
)
class DataServiceList(list[Any]):
"""
DataServiceList is a list with additional functionality to trigger callbacks
whenever an item is set. This can be used to track changes in the list items.
The class takes the same arguments as the list superclass during initialization,
with an additional optional 'callback' argument that is a list of functions.
These callbacks are stored and executed whenever an item in the DataServiceList
is set via the __setitem__ method. The callbacks receive the index of the changed
item and its new value as arguments.
The original list that is passed during initialization is kept as a private
attribute to prevent it from being garbage collected.
Additional callbacks can be added after initialization using the `add_callback`
method.
"""
def __init__(
self,
*args: list[Any],
callback_list: list[Callable[[int, Any], None]] | None = None,
**kwargs: Any,
) -> None:
self._callbacks: list[Callable[[int, Any], None]] = []
if isinstance(callback_list, list):
self._callbacks = callback_list
for item in args[0]:
warn_if_instance_class_does_not_inherit_from_data_service(item)
# prevent gc to delete the passed list by keeping a reference
self._original_list = args[0]
super().__init__(*args, **kwargs)
def __setitem__(self, key: int, value: Any) -> None: # type: ignore[override]
current_value = self.__getitem__(key)
# parse ints into floats if current value is a float
if isinstance(current_value, float) and isinstance(value, int):
value = float(value)
if isinstance(current_value, u.Quantity):
value = u.convert_to_quantity(value, str(current_value.u))
super().__setitem__(key, value)
for callback in self._callbacks:
callback(key, value)
def add_callback(self, callback: Callable[[int, Any], None]) -> None:
"""
Add a new callback function to be executed on item set.
Args:
callback (Callable[[int, Any], None]): Callback function that takes two
arguments - index of the changed item and its new value.
"""
self._callbacks.append(callback)

View File

@ -7,7 +7,6 @@ from functools import wraps
from typing import TYPE_CHECKING, Any, TypedDict
from pydase.data_service.abstract_data_service import AbstractDataService
from pydase.data_service.data_service_list import DataServiceList
from pydase.utils.helpers import get_class_and_instance_attributes
if TYPE_CHECKING:
@ -122,7 +121,7 @@ class TaskManager:
for attr_value in attrs.values():
if isinstance(attr_value, AbstractDataService):
attr_value._task_manager.start_autostart_tasks()
elif isinstance(attr_value, DataServiceList):
elif isinstance(attr_value, list):
for item in attr_value:
if isinstance(item, AbstractDataService):
item._task_manager.start_autostart_tasks()

View File

@ -26,9 +26,7 @@ def get_class_and_instance_attributes(obj: object) -> dict[str, Any]:
loops.
"""
attrs = dict(chain(type(obj).__dict__.items(), obj.__dict__.items()))
attrs.pop("__root__")
return attrs
return dict(chain(type(obj).__dict__.items(), obj.__dict__.items()))
def get_object_attr_from_path_list(target_obj: Any, path: list[str]) -> Any:

View File

@ -1,42 +0,0 @@
import logging
from pytest import LogCaptureFixture
import pydase
logger = logging.getLogger()
def test_DataService_task_callback(caplog: LogCaptureFixture) -> None:
class MyService(pydase.DataService):
async def my_task(self) -> None:
logger.info("Triggered task.")
async def my_other_task(self) -> None:
logger.info("Triggered other task.")
service = MyService()
service.start_my_task() # type: ignore
service.start_my_other_task() # type: ignore
assert "MyService.my_task changed to {}" in caplog.text
assert "MyService.my_other_task changed to {}" in caplog.text
def test_DataServiceList_task_callback(caplog: LogCaptureFixture) -> None:
class MySubService(pydase.DataService):
async def my_task(self) -> None:
logger.info("Triggered task.")
async def my_other_task(self) -> None:
logger.info("Triggered other task.")
class MyService(pydase.DataService):
sub_services_list = [MySubService() for i in range(2)]
service = MyService()
service.sub_services_list[0].start_my_task() # type: ignore
service.sub_services_list[1].start_my_other_task() # type: ignore
assert "MyService.sub_services_list[0].my_task changed to {}" in caplog.text
assert "MyService.sub_services_list[1].my_other_task changed to {}" in caplog.text

View File

@ -1,129 +0,0 @@
from typing import Any
from pytest import LogCaptureFixture
import pydase.units as u
from pydase import DataService
def test_class_list_attribute(caplog: LogCaptureFixture) -> None:
class ServiceClass(DataService):
attr = [0, 1]
service_instance = ServiceClass()
service_instance.attr[0] = 1337
assert "ServiceClass.attr[0] changed to 1337" in caplog.text
caplog.clear()
def test_instance_list_attribute(caplog: LogCaptureFixture) -> None:
class SubClass(DataService):
name = "SubClass"
class ServiceClass(DataService):
def __init__(self) -> None:
self.attr: list[Any] = [0, SubClass()]
super().__init__()
service_instance = ServiceClass()
service_instance.attr[0] = "Hello"
assert "ServiceClass.attr[0] changed to Hello" in caplog.text
caplog.clear()
service_instance.attr[1] = SubClass()
assert f"ServiceClass.attr[1] changed to {service_instance.attr[1]}" in caplog.text
caplog.clear()
def test_reused_instance_list_attribute(caplog: LogCaptureFixture) -> None:
some_list = [0, 1, 2]
class ServiceClass(DataService):
def __init__(self) -> None:
self.attr = some_list
self.attr_2 = some_list
self.attr_3 = [0, 1, 2]
super().__init__()
service_instance = ServiceClass()
service_instance.attr[0] = 20
assert service_instance.attr == service_instance.attr_2
assert service_instance.attr != service_instance.attr_3
assert "ServiceClass.attr[0] changed to 20" in caplog.text
assert "ServiceClass.attr_2[0] changed to 20" in caplog.text
def test_nested_reused_instance_list_attribute(caplog: LogCaptureFixture) -> None:
some_list = [0, 1, 2]
class SubClass(DataService):
attr_list = some_list
def __init__(self) -> None:
self.attr_list_2 = some_list
super().__init__()
class ServiceClass(DataService):
def __init__(self) -> None:
self.attr = some_list
self.subclass = SubClass()
super().__init__()
service_instance = ServiceClass()
service_instance.attr[0] = 20
assert service_instance.attr == service_instance.subclass.attr_list
assert "ServiceClass.attr[0] changed to 20" in caplog.text
assert "ServiceClass.subclass.attr_list[0] changed to 20" in caplog.text
assert "ServiceClass.subclass.attr_list_2[0] changed to 20" in caplog.text
def test_protected_list_attribute(caplog: LogCaptureFixture) -> None:
"""Changing protected lists should not emit notifications for the lists themselves,
but still for all properties depending on them.
"""
class ServiceClass(DataService):
_attr = [0, 1]
@property
def list_dependend_property(self) -> int:
return self._attr[0]
service_instance = ServiceClass()
service_instance._attr[0] = 1337
assert "ServiceClass.list_dependend_property changed to 1337" in caplog.text
def test_converting_int_to_float_entries(caplog: LogCaptureFixture) -> None:
class ServiceClass(DataService):
float_list = [0.0]
service_instance = ServiceClass()
service_instance.float_list[0] = 1
assert isinstance(service_instance.float_list[0], float)
assert "ServiceClass.float_list[0] changed to 1.0" in caplog.text
def test_converting_number_to_quantity_entries(caplog: LogCaptureFixture) -> None:
class ServiceClass(DataService):
quantity_list: list[u.Quantity] = [1 * u.units.A]
service_instance = ServiceClass()
service_instance.quantity_list[0] = 4 # type: ignore
assert isinstance(service_instance.quantity_list[0], u.Quantity)
assert "ServiceClass.quantity_list[0] changed to 4.0 A" in caplog.text
caplog.clear()
service_instance.quantity_list[0] = 3.1 * u.units.mA
assert isinstance(service_instance.quantity_list[0], u.Quantity)
assert "ServiceClass.quantity_list[0] changed to 3.1 mA" in caplog.text