feat: moving CallbackManager to new file, moving emit_notification to CallbackManager

This commit is contained in:
Mose Müller 2023-08-02 12:06:22 +02:00
parent 2632e7ba6e
commit bcc846f70d
6 changed files with 422 additions and 400 deletions

View File

@ -0,0 +1,403 @@
import inspect
from collections.abc import Callable
from typing import Any, cast
from loguru import logger
from pyDataInterface.utils.helpers import get_class_and_instance_attributes
from .abstract_data_service import AbstractDataService
from .data_service_list import DataServiceList
class CallbackManager:
_notification_callbacks: list[Callable[[str, str, Any], Any]] = []
"""
A list of callback functions that are executed when a change occurs in the
DataService instance. These functions are intended to handle or respond to these
changes in some way, such as emitting a socket.io message to the frontend.
Each function in this list should be a callable that accepts three parameters:
- parent_path (str): The path to the parent of the attribute that was changed.
- name (str): The name of the attribute that was changed.
- value (Any): The new value of the attribute.
A callback function can be added to this list using the add_notification_callback
method. Whenever a change in the DataService instance occurs (or in its nested
DataService or DataServiceList instances), the emit_notification method is invoked,
which in turn calls all the callback functions in _notification_callbacks with the
appropriate arguments.
This implementation follows the observer pattern, with the DataService instance as
the "subject" and the callback functions as the "observers".
"""
_list_mapping: dict[int, DataServiceList] = {}
"""
A dictionary mapping the id of the original lists to the corresponding
DataServiceList instances.
This is used to ensure that all references to the same list within the DataService
object point to the same DataServiceList, so that any modifications to that list can
be tracked consistently. The keys of the dictionary are the ids of the original
lists, and the values are the DataServiceList instances that wrap these lists.
"""
def __init__(self, service: "AbstractDataService") -> None:
self.callbacks: set[Callable[[str, Any], None]] = set()
self.service = service
def _register_list_change_callbacks( # noqa: C901
self, obj: "AbstractDataService", parent_path: str
) -> None:
"""
This method ensures that notifications are emitted whenever a list attribute of
a DataService instance changes. These notifications pertain solely to the list
item changes, not to changes in attributes of objects within the list.
The method works by converting all list attributes (both at the class and
instance levels) into DataServiceList objects. Each DataServiceList is then
assigned a callback that is triggered whenever an item in the list is updated.
The callback emits a notification, but only if the DataService instance was the
root instance when the callback was registered.
This method operates recursively, processing the input object and all nested
attributes that are instances of DataService. While navigating the structure,
it constructs a path for each attribute that traces back to the root. This path
is included in any emitted notifications to facilitate identification of the
source of a change.
Parameters:
-----------
obj: DataService
The target object to be processed. All list attributes (and those of its
nested DataService attributes) will be converted into DataServiceList
objects.
parent_path: str
The access path for the parent object. Used to construct the full access
path for the notifications.
"""
# Convert all list attributes (both class and instance) to DataServiceList
attrs = get_class_and_instance_attributes(obj)
for attr_name, attr_value in attrs.items():
if isinstance(attr_value, AbstractDataService):
new_path = f"{parent_path}.{attr_name}"
self._register_list_change_callbacks(attr_value, new_path)
elif isinstance(attr_value, list):
# Create callback for current attr_name
# Default arguments solve the late binding problem by capturing the
# value at the time the lambda is defined, not when it is called. This
# prevents attr_name from being overwritten in the next loop iteration.
callback = (
lambda index, value, attr_name=attr_name: cast(
CallbackManager, self.service._callback_manager
).emit_notification(
parent_path=parent_path,
name=f"{attr_name}[{index}]",
value=value,
)
if self.service == self.service.__root__
else None
)
# Check if attr_value is already a DataServiceList or in the mapping
if isinstance(attr_value, DataServiceList):
attr_value.add_callback(callback)
continue
if id(attr_value) in self._list_mapping:
notifying_list = self._list_mapping[id(attr_value)]
notifying_list.add_callback(callback)
else:
notifying_list = DataServiceList(attr_value, callback=[callback])
self._list_mapping[id(attr_value)] = notifying_list
setattr(obj, attr_name, notifying_list)
# recursively add callbacks to list attributes of DataService instances
for i, item in enumerate(attr_value):
if isinstance(item, AbstractDataService):
new_path = f"{parent_path}.{attr_name}[{i}]"
self._register_list_change_callbacks(item, new_path)
def _register_DataService_instance_callbacks(
self, obj: "AbstractDataService", parent_path: str
) -> None:
"""
This function is a key part of the observer pattern implemented by the
DataService class.
Its purpose is to allow the system to automatically send out notifications
whenever an attribute of a DataService instance is updated, which is especially
useful when the DataService instance is part of a nested structure.
It works by recursively registering callbacks for a given DataService instance
and all of its nested attributes. Each callback is responsible for emitting a
notification when the attribute it is attached to is modified.
This function ensures that only the root DataService instance (the one directly
exposed to the user or another system via rpyc) emits notifications.
Each notification contains a 'parent_path' that traces the attribute's location
within the nested DataService structure, starting from the root. This makes it
easier for observers to determine exactly where a change has occurred.
Parameters:
-----------
obj: DataService
The target object on which callbacks are to be registered.
parent_path: str
The access path for the parent object. This is used to construct the full
access path for the notifications.
"""
# Create and register a callback for the object
# only emit the notification when the call was registered by the root object
callback: Callable[[str, Any], None] = (
lambda name, value: cast(
CallbackManager, obj._callback_manager
).emit_notification(parent_path=parent_path, name=name, value=value)
if self.service == obj.__root__
and not name.startswith("_") # we are only interested in public attributes
and not isinstance(
getattr(type(obj), name, None), property
) # exlude proerty notifications -> those are handled in separate callbacks
else None
)
cast(CallbackManager, obj._callback_manager).callbacks.add(callback)
# Recursively register callbacks for all nested attributes of the object
attrs = get_class_and_instance_attributes(obj)
for nested_attr_name, nested_attr in attrs.items():
if isinstance(nested_attr, DataServiceList):
self._register_list_callbacks(
nested_attr, parent_path, nested_attr_name
)
elif isinstance(nested_attr, AbstractDataService):
self._register_service_callbacks(
nested_attr, parent_path, nested_attr_name
)
def _register_list_callbacks(
self, nested_attr: list[Any], parent_path: str, attr_name: str
) -> None:
"""Handles registration of callbacks for list attributes"""
for i, list_item in enumerate(nested_attr):
if isinstance(list_item, AbstractDataService):
self._register_service_callbacks(
list_item, parent_path, f"{attr_name}[{i}]"
)
def _register_service_callbacks(
self, nested_attr: "AbstractDataService", parent_path: str, attr_name: str
) -> None:
"""Handles registration of callbacks for DataService attributes"""
# as the DataService is an attribute of self, change the root object
# use the dictionary to not trigger callbacks on initialised objects
nested_attr.__dict__["__root__"] = self.service.__root__
new_path = f"{parent_path}.{attr_name}"
self._register_DataService_instance_callbacks(nested_attr, new_path)
def __register_recursive_parameter_callback(
self,
obj: "AbstractDataService | DataServiceList",
callback: Callable[[str | int, Any], None],
) -> None:
"""
Register callback to a DataService or DataServiceList instance and its nested
instances.
For a DataService, this method traverses its attributes and recursively adds the
callback for nested DataService or DataServiceList instances. For a
DataServiceList,
the callback is also triggered when an item gets reassigned.
"""
if isinstance(obj, DataServiceList):
# emits callback when item in list gets reassigned
obj.add_callback(callback=callback)
obj_list: DataServiceList | list[AbstractDataService] = obj
else:
obj_list = [obj]
# this enables notifications when a class instance was changed (-> item is
# changed, not reassigned)
for item in obj_list:
if isinstance(item, AbstractDataService):
cast(CallbackManager, item._callback_manager).callbacks.add(callback)
for attr_name in set(dir(item)) - set(dir(object)) - {"__root__"}:
attr_value = getattr(item, attr_name)
if isinstance(attr_value, (AbstractDataService, DataServiceList)):
self.__register_recursive_parameter_callback(
attr_value, callback
)
def _register_property_callbacks( # noqa: C901
self,
obj: "AbstractDataService",
parent_path: str,
) -> None:
"""
Register callbacks to notify when properties or their dependencies change.
This method cycles through all attributes (both class and instance level) of the
input `obj`. For each attribute that is a property, it identifies dependencies
used in the getter method and creates a callback for each one.
The method is recursive for attributes that are of type DataService or
DataServiceList. It attaches the callback directly to DataServiceList items or
propagates it through nested DataService instances.
"""
attrs = get_class_and_instance_attributes(obj)
for attr_name, attr_value in attrs.items():
if isinstance(attr_value, AbstractDataService):
self._register_property_callbacks(
attr_value, parent_path=f"{parent_path}.{attr_name}"
)
elif isinstance(attr_value, DataServiceList):
for i, item in enumerate(attr_value):
if isinstance(item, AbstractDataService):
self._register_property_callbacks(
item, parent_path=f"{parent_path}.{attr_name}[{i}]"
)
if isinstance(attr_value, property):
dependencies = attr_value.fget.__code__.co_names # type: ignore
source_code_string = inspect.getsource(attr_value.fget) # type: ignore
for dependency in dependencies:
# check if the dependencies are attributes of obj
# This doesn't have to be the case like, for example, here:
# >>> @property
# >>> def power(self) -> float:
# >>> return self.class_attr.voltage * self.current
#
# The dependencies for this property are:
# > ('class_attr', 'voltage', 'current')
if f"self.{dependency}" not in source_code_string:
continue
# use `obj` instead of `type(obj)` to get DataServiceList
# instead of list
dependency_value = getattr(obj, dependency)
if isinstance(
dependency_value, (DataServiceList, AbstractDataService)
):
callback = (
lambda name, value, dependent_attr=attr_name: cast(
CallbackManager, obj._callback_manager
).emit_notification(
parent_path=parent_path,
name=dependent_attr,
value=getattr(obj, dependent_attr),
)
if self.service == obj.__root__
else None
)
self.__register_recursive_parameter_callback(
dependency_value,
callback=callback,
)
else:
callback = (
lambda name, _, dep_attr=attr_name, dep=dependency: cast( # type: ignore
CallbackManager, obj._callback_manager
).emit_notification(
parent_path=parent_path,
name=dep_attr,
value=getattr(obj, dep_attr),
)
if name == dep and self.service == obj.__root__
else None
)
# Add to callbacks
cast(CallbackManager, obj._callback_manager).callbacks.add(
callback
)
def _register_start_stop_task_callbacks(
self, obj: "AbstractDataService", parent_path: str
) -> None:
"""
This function registers callbacks for start and stop methods of async functions.
These callbacks are stored in the '_task_status_change_callbacks' attribute and
are called when the status of a task changes.
Parameters:
-----------
obj: AbstractDataService
The target object on which callbacks are to be registered.
parent_path: str
The access path for the parent object. This is used to construct the full
access path for the notifications.
"""
# Create and register a callback for the object
# only emit the notification when the call was registered by the root object
callback: Callable[[str, dict[str, Any] | None], None] = (
lambda name, status: cast(
CallbackManager, obj._callback_manager
).emit_notification(parent_path=parent_path, name=name, value=status)
if self.service == obj.__root__
and not name.startswith("_") # we are only interested in public attributes
else None
)
obj._task_status_change_callbacks.append(callback)
# Recursively register callbacks for all nested attributes of the object
attrs: dict[str, Any] = get_class_and_instance_attributes(obj)
for nested_attr_name, nested_attr in attrs.items():
if isinstance(nested_attr, AbstractDataService):
self._register_start_stop_task_callbacks(
nested_attr, parent_path=f"{parent_path}.{nested_attr_name}"
)
def register_callbacks(self) -> None:
self._register_list_change_callbacks(
self.service, f"{self.service.__class__.__name__}"
)
self._register_DataService_instance_callbacks(
self.service, f"{self.service.__class__.__name__}"
)
self._register_property_callbacks(
self.service, f"{self.service.__class__.__name__}"
)
self._register_start_stop_task_callbacks(
self.service, f"{self.service.__class__.__name__}"
)
def emit_notification(self, parent_path: str, name: str, value: Any) -> None:
logger.debug(f"{parent_path}.{name} changed to {value}!")
for callback in self._notification_callbacks:
try:
callback(parent_path, name, value)
except Exception as e:
logger.error(e)
def add_notification_callback(
self, callback: Callable[[str, str, Any], None]
) -> None:
"""
Adds a new notification callback function to the list of callbacks.
This function is intended to be used for registering a function that will be
called whenever a the value of an attribute changes.
Args:
callback (Callable[[str, str, Any], None]): The callback function to
register.
It should accept three parameters:
- parent_path (str): The parent path of the parameter.
- name (str): The name of the changed parameter.
- value (Any): The value of the parameter.
"""
self._notification_callbacks.append(callback)

View File

@ -22,7 +22,8 @@ from pyDataInterface.utils.warnings import (
warn_if_instance_class_does_not_inherit_from_DataService,
)
from .data_service_list import DataServiceList
from .abstract_data_service import AbstractDataService
from .callback_manager import CallbackManager
from .task_manager import TaskManager
@ -37,366 +38,10 @@ def process_callable_attribute(attr: Any, args: dict[str, Any]) -> Any:
)
class CallbackManager:
_list_mapping: dict[int, DataServiceList] = {}
"""
A dictionary mapping the id of the original lists to the corresponding
DataServiceList instances.
This is used to ensure that all references to the same list within the DataService
object point to the same DataServiceList, so that any modifications to that list can
be tracked consistently. The keys of the dictionary are the ids of the original
lists, and the values are the DataServiceList instances that wrap these lists.
"""
def __init__(self, service: "DataService") -> None:
self.callbacks: set[Callable[[str, Any], None]] = set()
self.service = service
def _register_list_change_callbacks( # noqa: C901
self, obj: "DataService", parent_path: str
) -> None:
"""
This method ensures that notifications are emitted whenever a list attribute of
a DataService instance changes. These notifications pertain solely to the list
item changes, not to changes in attributes of objects within the list.
The method works by converting all list attributes (both at the class and
instance levels) into DataServiceList objects. Each DataServiceList is then
assigned a callback that is triggered whenever an item in the list is updated.
The callback emits a notification, but only if the DataService instance was the
root instance when the callback was registered.
This method operates recursively, processing the input object and all nested
attributes that are instances of DataService. While navigating the structure,
it constructs a path for each attribute that traces back to the root. This path
is included in any emitted notifications to facilitate identification of the
source of a change.
Parameters:
-----------
obj: DataService
The target object to be processed. All list attributes (and those of its
nested DataService attributes) will be converted into DataServiceList
objects.
parent_path: str
The access path for the parent object. Used to construct the full access
path for the notifications.
"""
# Convert all list attributes (both class and instance) to DataServiceList
attrs = get_class_and_instance_attributes(obj)
for attr_name, attr_value in attrs.items():
if isinstance(attr_value, DataService):
new_path = f"{parent_path}.{attr_name}"
self._register_list_change_callbacks(attr_value, new_path)
elif isinstance(attr_value, list):
# Create callback for current attr_name
# Default arguments solve the late binding problem by capturing the
# value at the time the lambda is defined, not when it is called. This
# prevents attr_name from being overwritten in the next loop iteration.
callback = (
lambda index, value, attr_name=attr_name: self.service._emit_notification(
parent_path=parent_path,
name=f"{attr_name}[{index}]",
value=value,
)
if self.service == self.service.__root__
else None
)
# Check if attr_value is already a DataServiceList or in the mapping
if isinstance(attr_value, DataServiceList):
attr_value.add_callback(callback)
continue
if id(attr_value) in self._list_mapping:
notifying_list = self._list_mapping[id(attr_value)]
notifying_list.add_callback(callback)
else:
notifying_list = DataServiceList(attr_value, callback=[callback])
self._list_mapping[id(attr_value)] = notifying_list
setattr(obj, attr_name, notifying_list)
# recursively add callbacks to list attributes of DataService instances
for i, item in enumerate(attr_value):
if isinstance(item, DataService):
new_path = f"{parent_path}.{attr_name}[{i}]"
self._register_list_change_callbacks(item, new_path)
def _register_DataService_instance_callbacks(
self, obj: "DataService", parent_path: str
) -> None:
"""
This function is a key part of the observer pattern implemented by the
DataService class.
Its purpose is to allow the system to automatically send out notifications
whenever an attribute of a DataService instance is updated, which is especially
useful when the DataService instance is part of a nested structure.
It works by recursively registering callbacks for a given DataService instance
and all of its nested attributes. Each callback is responsible for emitting a
notification when the attribute it is attached to is modified.
This function ensures that only the root DataService instance (the one directly
exposed to the user or another system via rpyc) emits notifications.
Each notification contains a 'parent_path' that traces the attribute's location
within the nested DataService structure, starting from the root. This makes it
easier for observers to determine exactly where a change has occurred.
Parameters:
-----------
obj: DataService
The target object on which callbacks are to be registered.
parent_path: str
The access path for the parent object. This is used to construct the full
access path for the notifications.
"""
# Create and register a callback for the object
# only emit the notification when the call was registered by the root object
callback: Callable[[str, Any], None] = (
lambda name, value: obj._emit_notification(
parent_path=parent_path, name=name, value=value
)
if self.service == obj.__root__
and not name.startswith("_") # we are only interested in public attributes
and not isinstance(
getattr(type(obj), name, None), property
) # exlude proerty notifications -> those are handled in separate callbacks
else None
)
obj._callback_manager.callbacks.add(callback)
# Recursively register callbacks for all nested attributes of the object
attrs = get_class_and_instance_attributes(obj)
for nested_attr_name, nested_attr in attrs.items():
if isinstance(nested_attr, DataServiceList):
self._register_list_callbacks(
nested_attr, parent_path, nested_attr_name
)
elif isinstance(nested_attr, DataService):
self._register_service_callbacks(
nested_attr, parent_path, nested_attr_name
)
def _register_list_callbacks(
self, nested_attr: list[Any], parent_path: str, attr_name: str
) -> None:
"""Handles registration of callbacks for list attributes"""
for i, list_item in enumerate(nested_attr):
if isinstance(list_item, DataService):
self._register_service_callbacks(
list_item, parent_path, f"{attr_name}[{i}]"
)
def _register_service_callbacks(
self, nested_attr: "DataService", parent_path: str, attr_name: str
) -> None:
"""Handles registration of callbacks for DataService attributes"""
# as the DataService is an attribute of self, change the root object
# use the dictionary to not trigger callbacks on initialised objects
nested_attr.__dict__["__root__"] = self.service.__root__
new_path = f"{parent_path}.{attr_name}"
self._register_DataService_instance_callbacks(nested_attr, new_path)
def __register_recursive_parameter_callback(
self,
obj: "DataService | DataServiceList",
callback: Callable[[str | int, Any], None],
) -> None:
"""
Register callback to a DataService or DataServiceList instance and its nested
instances.
For a DataService, this method traverses its attributes and recursively adds the
callback for nested DataService or DataServiceList instances. For a
DataServiceList,
the callback is also triggered when an item gets reassigned.
"""
if isinstance(obj, DataServiceList):
# emits callback when item in list gets reassigned
obj.add_callback(callback=callback)
obj_list: DataServiceList | list[DataService] = obj
else:
obj_list = [obj]
# this enables notifications when a class instance was changed (-> item is
# changed, not reassigned)
for item in obj_list:
if isinstance(item, DataService):
item._callback_manager.callbacks.add(callback)
for attr_name in set(dir(item)) - set(dir(object)) - {"__root__"}:
attr_value = getattr(item, attr_name)
if isinstance(attr_value, (DataService, DataServiceList)):
self.__register_recursive_parameter_callback(
attr_value, callback
)
def _register_property_callbacks( # noqa: C901
self,
obj: "DataService",
parent_path: str,
) -> None:
"""
Register callbacks to notify when properties or their dependencies change.
This method cycles through all attributes (both class and instance level) of the
input `obj`. For each attribute that is a property, it identifies dependencies
used in the getter method and creates a callback for each one.
The method is recursive for attributes that are of type DataService or
DataServiceList. It attaches the callback directly to DataServiceList items or
propagates it through nested DataService instances.
"""
attrs = get_class_and_instance_attributes(obj)
for attr_name, attr_value in attrs.items():
if isinstance(attr_value, DataService):
self._register_property_callbacks(
attr_value, parent_path=f"{parent_path}.{attr_name}"
)
elif isinstance(attr_value, DataServiceList):
for i, item in enumerate(attr_value):
if isinstance(item, DataService):
self._register_property_callbacks(
item, parent_path=f"{parent_path}.{attr_name}[{i}]"
)
if isinstance(attr_value, property):
dependencies = attr_value.fget.__code__.co_names # type: ignore
source_code_string = inspect.getsource(attr_value.fget) # type: ignore
for dependency in dependencies:
# check if the dependencies are attributes of obj
# This doesn't have to be the case like, for example, here:
# >>> @property
# >>> def power(self) -> float:
# >>> return self.class_attr.voltage * self.current
#
# The dependencies for this property are:
# > ('class_attr', 'voltage', 'current')
if f"self.{dependency}" not in source_code_string:
continue
# use `obj` instead of `type(obj)` to get DataServiceList
# instead of list
dependency_value = getattr(obj, dependency)
if isinstance(dependency_value, (DataServiceList, DataService)):
callback = (
lambda name, value, dependent_attr=attr_name: obj._emit_notification(
parent_path=parent_path,
name=dependent_attr,
value=getattr(obj, dependent_attr),
)
if self.service == obj.__root__
else None
)
self.__register_recursive_parameter_callback(
dependency_value,
callback=callback,
)
else:
callback = (
lambda name, _, dependent_attr=attr_name, dep=dependency: obj._emit_notification( # type: ignore
parent_path=parent_path,
name=dependent_attr,
value=getattr(obj, dependent_attr),
)
if name == dep and self.service == obj.__root__
else None
)
# Add to _callbacks
obj._callback_manager.callbacks.add(callback)
def _register_start_stop_task_callbacks(
self, obj: "DataService", parent_path: str
) -> None:
"""
This function registers callbacks for start and stop methods of async functions.
These callbacks are stored in the '_task_status_change_callbacks' attribute and
are called when the status of a task changes.
Parameters:
-----------
obj: DataService
The target object on which callbacks are to be registered.
parent_path: str
The access path for the parent object. This is used to construct the full
access path for the notifications.
"""
# Create and register a callback for the object
# only emit the notification when the call was registered by the root object
callback: Callable[[str, dict[str, Any] | None], None] = (
lambda name, status: obj._emit_notification(
parent_path=parent_path, name=name, value=status
)
if self.service == obj.__root__
and not name.startswith("_") # we are only interested in public attributes
else None
)
obj._task_status_change_callbacks.append(callback)
# Recursively register callbacks for all nested attributes of the object
attrs: dict[str, Any] = get_class_and_instance_attributes(obj)
for nested_attr_name, nested_attr in attrs.items():
if isinstance(nested_attr, DataService):
self._register_start_stop_task_callbacks(
nested_attr, parent_path=f"{parent_path}.{nested_attr_name}"
)
def register_callbacks(self) -> None:
self._register_list_change_callbacks(
self.service, f"{self.service.__class__.__name__}"
)
self._register_DataService_instance_callbacks(
self.service, f"{self.service.__class__.__name__}"
)
self._register_property_callbacks(
self.service, f"{self.service.__class__.__name__}"
)
self._register_start_stop_task_callbacks(
self.service, f"{self.service.__class__.__name__}"
)
class DataService(rpyc.Service, TaskManager):
_notification_callbacks: list[Callable[[str, str, Any], Any]] = []
"""
A list of callback functions that are executed when a change occurs in the
DataService instance. These functions are intended to handle or respond to these
changes in some way, such as emitting a socket.io message to the frontend.
Each function in this list should be a callable that accepts three parameters:
- parent_path (str): The path to the parent of the attribute that was changed.
- name (str): The name of the attribute that was changed.
- value (Any): The new value of the attribute.
A callback function can be added to this list using the add_notification_callback
method. Whenever a change in the DataService instance occurs (or in its nested
DataService or DataServiceList instances), the _emit_notification method is invoked,
which in turn calls all the callback functions in _notification_callbacks with the
appropriate arguments.
This implementation follows the observer pattern, with the DataService instance as
the "subject" and the callback functions as the "observers".
"""
class DataService(rpyc.Service, AbstractDataService, TaskManager):
def __init__(self, filename: Optional[str] = None) -> None:
TaskManager.__init__(self)
self._callback_manager = CallbackManager(self)
self._callback_manager: CallbackManager = CallbackManager(self)
self.__root__: "DataService" = self
"""Keep track of the root object. This helps to filter the emission of
notifications. This overwrite the TaksManager's __root__ attribute."""
@ -500,34 +145,6 @@ class DataService(rpyc.Service, TaskManager):
if not attr_name.startswith("_DataService__"):
warn_if_instance_class_does_not_inherit_from_DataService(attr_value)
def _emit_notification(self, parent_path: str, name: str, value: Any) -> None:
logger.debug(f"{parent_path}.{name} changed to {value}!")
for callback in self._notification_callbacks:
try:
callback(parent_path, name, value)
except Exception as e:
logger.error(e)
def add_notification_callback(
self, callback: Callable[[str, str, Any], None]
) -> None:
"""
Adds a new notification callback function to the list of callbacks.
This function is intended to be used for registering a function that will be
called whenever a the value of an attribute changes.
Args:
callback (Callable[[str, str, Any], None]): The callback function to
register.
It should accept three parameters:
- parent_path (str): The parent path of the parameter.
- name (str): The name of the changed parameter.
- value (Any): The value of the parameter.
"""
self._notification_callbacks.append(callback)
def serialize(self) -> dict[str, dict[str, Any]]: # noqa
"""
Serializes the instance into a dictionary, preserving the structure of the

View File

@ -170,7 +170,3 @@ class TaskManager(ABC):
logger.warning(
f"No start method found for service '{service_name}'"
)
@abstractmethod
def _emit_notification(self, parent_path: str, name: str, value: Any) -> None:
raise NotImplementedError

View File

@ -162,7 +162,7 @@ class Server:
self._loop.create_task(notify())
self._service.add_notification_callback(sio_callback)
self._service._callback_manager.add_notification_callback(sio_callback)
# overwrite uvicorn's signal handlers, otherwise it will bogart SIGINT and
# SIGTERM, which makes it impossible to escape out of

View File

@ -5,12 +5,17 @@ def warn_if_instance_class_does_not_inherit_from_DataService(__value: object) ->
base_class_name = __value.__class__.__base__.__name__
module_name = __value.__class__.__module__
if module_name not in [
"builtins",
"__builtin__",
"asyncio.unix_events",
"_abc",
] and base_class_name not in ["DataService", "list", "Enum"]:
if (
module_name
not in [
"builtins",
"__builtin__",
"asyncio.unix_events",
"_abc",
]
and base_class_name not in ["DataService", "list", "Enum"]
and type(__value).__name__ not in ["CallbackManager"]
):
logger.warning(
f"Warning: Class {type(__value).__name__} does not inherit from DataService."
)

View File

@ -1,11 +1,12 @@
from collections.abc import Generator
from typing import Any
from typing import Any, cast
import pytest
from loguru import logger
from pytest import LogCaptureFixture
from pyDataInterface import DataService
from pyDataInterface.data_service.callback_manager import CallbackManager
@pytest.fixture
@ -22,4 +23,4 @@ def emit(self: Any, parent_path: str, name: str, value: Any) -> None:
print(f"{parent_path}.{name} = {value}")
DataService._emit_notification = emit # type: ignore
cast(CallbackManager, DataService._callback_manager).emit_notification = emit # type: ignore