moving from _full_access_name to callbacks

This commit is contained in:
Mose Müller
2023-08-02 12:06:19 +02:00
parent b67c0f9da3
commit 460be17ecb
5 changed files with 671 additions and 488 deletions

View File

@ -3,7 +3,7 @@ import inspect
import threading
from collections.abc import Callable
from concurrent.futures import Future
from typing import Any, cast
from typing import Any
import rpyc
from loguru import logger
@ -12,30 +12,6 @@ from .data_service_list import DataServiceList
class DataService(rpyc.Service):
_full_access_path: set[str]
""" TODO: improve this docstring
A set of strings, each representing a unique path to access the attribute from an
exposed class instance. Each path starts with the name of the exposed class. It's
dynamically updated to accurately represent the current attribute structure.
This attribute is used to emit notifications to a web server whenever the attribute
changes, allowing for real-time tracking and updates of class instance
modifications.
Example:
--------
>>> class SubClass(DataService):
>>> pass
>>> class ExposedClass(DataService):
>>> attr = SubClass()
>>> service = ExposedClass()
>>> # ... expose class
>>> print(service.attr._full_access_path) # {"ServiceClass.attr"}
Have a look at tests/test_full_access_path.py to see more examples.
"""
_list_mapping: dict[int, DataServiceList] = {}
"""
A dictionary mapping the id of the original lists to the corresponding
@ -47,19 +23,24 @@ class DataService(rpyc.Service):
"""
def __init__(self) -> None:
# Keep track of the root object. This helps to filter the emission of
# notifications
self._root: "DataService" = self
# dictionary to keep track of running tasks
self.__tasks: dict[str, Future[None]] = {}
self._autostart_tasks: dict[str, tuple[Any]]
if "_autostart_tasks" not in self.__dict__:
self._autostart_tasks = {}
self._callbacks: set[Callable[[str, Any], None]] = set()
self._set_start_and_stop_for_async_methods()
self._start_async_loop_in_thread()
self._start_autostart_tasks()
self._update_full_access_path(self, f"{self.__class__.__name__}")
self._turn_lists_into_notify_lists()
self._register_callbacks(self, f"{self.__class__.__name__}")
self._turn_lists_into_notify_lists(self, f"{self.__class__.__name__}")
self._do_something_with_properties()
self._initialised = True
@ -69,43 +50,47 @@ class DataService(rpyc.Service):
if isinstance(attr_value, property): # If attribute is a property
logger.debug(attr_value.fget.__code__.co_names)
def _turn_lists_into_notify_lists(self) -> None:
def create_callback(attr_name: str) -> Callable:
"""TODO: explain what this is used for...
Create a callback with current attr_name captured in the default argument.
Default arguments solve the late binding problem by capturing the value at
the time the lambda is defined, not when it is called, thus preventing
attr_name from being overwritten in another loop iteratianother
"""
return lambda index, value, attr_name=attr_name: self._emit(
access_path=self._full_access_path,
name=f"{attr_name}[{index}]",
value=value,
)
def _turn_lists_into_notify_lists(
self, obj: "DataService", parent_path: str
) -> None:
# Convert all list attributes (both class and instance) to DataServiceList
for attr_name in set(dir(self)) - set(dir(object)):
attr_value = getattr(self, attr_name)
for attr_name in set(dir(obj)) - set(dir(object)) - {"_root"}:
attr_value = getattr(obj, attr_name)
if isinstance(attr_value, list):
if isinstance(attr_value, DataService):
new_path = f"{parent_path}.{attr_name}"
self._turn_lists_into_notify_lists(attr_value, new_path)
elif isinstance(attr_value, list):
# Create callback for current attr_name
callback = create_callback(attr_name)
# Default arguments solve the late binding problem by capturing the
# value at the time the lambda is defined, not when it is called. This
# prevents attr_name from being overwritten in the next loop iteration.
callback = (
lambda index, value, attr_name=attr_name: self._emit_notification(
parent_path=parent_path,
name=f"{attr_name}[{index}]",
value=value,
)
if self == self._root
else None
)
# Check if attr_value is already a DataServiceList or in the mapping
if isinstance(attr_value, DataServiceList):
attr_value.add_callback(callback)
continue
if id(attr_value) in self._list_mapping:
elif id(attr_value) in self._list_mapping:
notifying_list = self._list_mapping[id(attr_value)]
notifying_list.add_callback(callback)
else:
notifying_list = DataServiceList(attr_value, callback=[callback])
self._list_mapping[id(attr_value)] = notifying_list
setattr(self, attr_name, notifying_list)
setattr(obj, attr_name, notifying_list)
for i, item in enumerate(attr_value):
if isinstance(item, DataService):
new_path = f"{parent_path}.{attr_name}[{i}]"
self._turn_lists_into_notify_lists(item, new_path)
def _start_autostart_tasks(self) -> None:
if self._autostart_tasks is not None:
@ -151,37 +136,57 @@ class DataService(rpyc.Service):
setattr(self, f"start_{name}", start_task)
setattr(self, f"stop_{name}", stop_task)
def _update_full_access_path(self, obj: "DataService", parent_path: str) -> None:
def _register_callbacks(self, obj: "DataService", parent_path: str) -> None:
"""
Recursive helper function to update '_full_access_path' for the object and all
Recursive helper function to register callbacks for the object and all
its nested attributes
"""
parent_class_name = parent_path.split(".")[0] if parent_path else None
# Create and register a callback for the object
# only emit the notification when the call was registered by the root object
callback: Callable[[str, Any], None] = (
lambda name, value: obj._emit_notification(
parent_path=parent_path, name=name, value=value
)
if self == self._root
else None
)
# Remove all access paths that don't start with the parent class name. As the
# exposed class is instantiated last, this ensures that all access paths start
# with the root class
access_path: set[str] = {
p
for p in cast(list[str], getattr(obj, "_full_access_path", set()))
if not parent_class_name or p.startswith(parent_class_name)
}
# add the new access path
access_path.add(parent_path)
setattr(obj, "_full_access_path", access_path)
obj._callbacks.add(callback)
# Recursively update access paths for all nested attributes of the object
for nested_attr_name in set(dir(obj)) - set(dir(object)):
# Recursively register callbacks for all nested attributes of the object
attribute_set = set(dir(obj)) - set(dir(object)) - {"_root"}
for nested_attr_name in attribute_set:
nested_attr = getattr(obj, nested_attr_name)
if isinstance(nested_attr, list):
for i, list_item in enumerate(nested_attr):
if isinstance(list_item, DataService):
new_path = f"{parent_path}.{nested_attr_name}[{i}]"
self._update_full_access_path(list_item, new_path)
self._register_list_callbacks(
nested_attr, parent_path, nested_attr_name
)
elif isinstance(nested_attr, DataService):
new_path = f"{parent_path}.{nested_attr_name}"
self._update_full_access_path(nested_attr, new_path)
self._register_service_callbacks(
nested_attr, parent_path, nested_attr_name
)
def _register_list_callbacks(
self, nested_attr: list[Any], parent_path: str, attr_name: str
) -> None:
"""Handles registration of callbacks for list attributes"""
for i, list_item in enumerate(nested_attr):
if isinstance(list_item, DataService):
self._register_service_callbacks(
list_item, parent_path, f"{attr_name}[{i}]"
)
def _register_service_callbacks(
self, nested_attr: "DataService", parent_path: str, attr_name: str
) -> None:
"""Handles registration of callbacks for DataService attributes"""
# as the DataService is an attribute of self, change the root object
nested_attr._root = self._root
new_path = f"{parent_path}.{attr_name}"
self._register_callbacks(nested_attr, new_path)
def _start_loop(self) -> None:
asyncio.set_event_loop(self.__loop)
@ -195,17 +200,15 @@ class DataService(rpyc.Service):
self.__thread.join()
def __setattr__(self, __name: str, __value: Any) -> None:
if self.__dict__.get("_initialised"):
access_path: set[str] = getattr(self, "_full_access_path", set())
if access_path:
self._emit(access_path, __name, __value)
super().__setattr__(__name, __value)
if self.__dict__.get("_initialised") and not __name == "_initialised":
for callback in self._callbacks:
callback(__name, __value)
# TODO: add emits for properties -> can use co_names, which is a tuple
# containing the names used by the bytecode
super().__setattr__(__name, __value)
def _emit(self, access_path: set[str], name: str, value: Any) -> None:
for path in access_path:
logger.debug(f"{path}.{name} changed to {value}!")
def _emit_notification(self, parent_path: str, name: str, value: Any) -> None:
logger.debug(f"{parent_path}.{name} changed to {value}!")
def _rpyc_getattr(self, name: str) -> Any:
if name.startswith("_"):