adds first version of observer_pattern module

This commit is contained in:
Mose Müller 2023-11-27 16:48:55 +01:00
parent e6e5ac84b4
commit 99dea381a3
7 changed files with 388 additions and 0 deletions

View File

View File

@ -0,0 +1,3 @@
from pydase.observer_pattern.observable.observable import Observable
__all__ = ["Observable"]

View File

@ -0,0 +1,53 @@
import logging
from typing import Any
from pydase.observer_pattern.observable.observable_object import ObservableObject
from pydase.utils.helpers import is_property_attribute
logger = logging.getLogger(__name__)
class Observable(ObservableObject):
def __init__(self) -> None:
super().__init__()
class_attrs = {
k: type(self).__dict__[k]
for k in set(type(self).__dict__)
- set(Observable.__dict__)
- set(self.__dict__)
}
for name, value in class_attrs.items():
if isinstance(value, property) or callable(value):
continue
self.__dict__[name] = self._initialise_new_objects(name, value)
def __setattr__(self, name: str, value: Any) -> None:
if hasattr(self, "_observers"):
self._remove_observer_if_observable(name)
value = self._initialise_new_objects(name, value)
self._notify_change_start(name)
super().__setattr__(name, value)
self._notify_changed(name, value)
def __getattribute__(self, name: str) -> Any:
value = super().__getattribute__(name)
if is_property_attribute(self, name):
self._notify_changed(name, value)
return value
def _remove_observer_if_observable(self, name: str) -> None:
if not is_property_attribute(self, name):
current_value = getattr(self, name, None)
if isinstance(current_value, ObservableObject):
current_value._remove_observer(self, name)
def _construct_extended_attr_path(
self, observer_attr_name: str, instance_attr_name: str
) -> str:
if observer_attr_name != "":
return f"{observer_attr_name}.{instance_attr_name}"
return instance_attr_name

View File

@ -0,0 +1,202 @@
import logging
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING, Any, ClassVar
if TYPE_CHECKING:
from pydase.observer_pattern.observer.observer import Observer
logger = logging.getLogger(__name__)
class ObservableObject(ABC):
_list_mapping: ClassVar[dict[int, "_ObservableList"]] = {}
_dict_mapping: ClassVar[dict[int, "_ObservableDict"]] = {}
def __init__(self) -> None:
self._observers: dict[str, list["ObservableObject | Observer"]] = {}
def add_observer(
self, observer: "ObservableObject | Observer", attr_name: str = ""
) -> None:
if attr_name not in self._observers:
self._observers[attr_name] = []
if observer not in self._observers[attr_name]:
self._observers[attr_name].append(observer)
def _remove_observer(self, observer: "ObservableObject", attribute: str) -> None:
if attribute in self._observers:
self._observers[attribute].remove(observer)
@abstractmethod
def _remove_observer_if_observable(self, name: str) -> None:
"""Removes the current object as an observer from an observable attribute.
This method is called before an attribute of the observable object is
changed. If the current value of the attribute is an instance of
`ObservableObject`, this method removes the current object from its list
of observers. This is a crucial step to avoid unwanted notifications from
the old value of the attribute.
"""
...
def _notify_changed(self, changed_attribute: str, value: Any) -> None:
"""Notifies all observers about changes to an attribute.
This method iterates through all observers registered for the object and
invokes their notification method. It is called whenever an attribute of
the observable object is changed.
Args:
changed_attribute (str): The name of the changed attribute.
value (Any): The value that the attribute was set to.
"""
for attr_name, observer_list in self._observers.items():
for observer in observer_list:
extendend_attr_path = self._construct_extended_attr_path(
attr_name, changed_attribute
)
observer._notify_changed(extendend_attr_path, value)
def _notify_change_start(self, changing_attribute: str) -> None:
"""Notify observers that an attribute or item change process has started.
This method is called at the start of the process of modifying an attribute in
the observed `Observable` object. It registers the attribute as currently
undergoing a change. This registration helps in managing and tracking changes as
they occur, especially in scenarios where the order of changes or their state
during the transition is significant.
Args:
changing_attribute (str): The name of the attribute that is starting to
change. This is typically the full access path of the attribute in the
`Observable`.
value (Any): The value that the attribute is being set to.
"""
for attr_name, observer_list in self._observers.items():
for observer in observer_list:
extended_attr_path = self._construct_extended_attr_path(
attr_name, changing_attribute
)
observer._notify_change_start(extended_attr_path)
def _initialise_new_objects(self, attr_name_or_key: Any, value: Any) -> Any:
new_value = value
if isinstance(value, list):
if id(value) in self._list_mapping:
# If the list `value` was already referenced somewhere else
new_value = self._list_mapping[id(value)]
else:
# convert the builtin list into a ObservableList
new_value = _ObservableList(original_list=value)
self._list_mapping[id(value)] = new_value
elif isinstance(value, dict):
if id(value) in self._dict_mapping:
# If the list `value` was already referenced somewhere else
new_value = self._dict_mapping[id(value)]
else:
# convert the builtin list into a ObservableList
new_value = _ObservableDict(original_dict=value)
self._dict_mapping[id(value)] = new_value
if isinstance(new_value, ObservableObject):
new_value.add_observer(self, str(attr_name_or_key))
return new_value
@abstractmethod
def _construct_extended_attr_path(
self, observer_attr_name: str, instance_attr_name: str
) -> str:
"""
Constructs the extended attribute path for notification purposes, which is used
in the observer pattern to specify the full path of an observed attribute.
This abstract method is implemented by the classes inheriting from
`ObservableObject`.
Args:
observer_attr_name (str): The name of the attribute in the observer that
holds a reference to the instance. Equals `""` if observer itself is of type
`Observer`.
instance_attr_name (str): The name of the attribute within the instance that
has changed.
Returns:
str: The constructed extended attribute path.
"""
...
class _ObservableList(ObservableObject, list):
def __init__(
self,
original_list: list[Any],
) -> None:
self._original_list = original_list
ObservableObject.__init__(self)
list.__init__(self, self._original_list)
for i, item in enumerate(self._original_list):
super().__setitem__(i, self._initialise_new_objects(f"[{i}]", item))
def __setitem__(self, key: int, value: Any) -> None: # type: ignore[override]
if hasattr(self, "_observers"):
self._remove_observer_if_observable(f"[{key}]")
value = self._initialise_new_objects(f"[{key}]", value)
self._notify_change_start(f"[{key}]")
super().__setitem__(key, value)
self._notify_changed(f"[{key}]", value)
def _remove_observer_if_observable(self, name: str) -> None:
key = int(name[1:-1])
current_value = self.__getitem__(key)
if isinstance(current_value, ObservableObject):
current_value._remove_observer(self, name)
def _construct_extended_attr_path(
self, observer_attr_name: str, instance_attr_name: str
) -> str:
if observer_attr_name != "":
return f"{observer_attr_name}{instance_attr_name}"
return instance_attr_name
class _ObservableDict(dict, ObservableObject):
def __init__(
self,
original_dict: dict[Any, Any],
) -> None:
self._original_dict = original_dict
ObservableObject.__init__(self)
dict.__init__(self)
for key, value in self._original_dict.items():
super().__setitem__(key, self._initialise_new_objects(f"['{key}']", value))
def __setitem__(self, key: str, value: Any) -> None: # type: ignore[override]
if not isinstance(key, str):
logger.warning("Converting non-string dictionary key %s to string.", key)
key = str(key)
if hasattr(self, "_observers"):
self._remove_observer_if_observable(f"['{key}']")
value = self._initialise_new_objects(key, value)
self._notify_change_start(f"['{key}']")
super().__setitem__(key, value)
self._notify_changed(f"['{key}']", value)
def _remove_observer_if_observable(self, name: str) -> None:
key = name[2:-2]
current_value = self.get(key, None)
if isinstance(current_value, ObservableObject):
current_value._remove_observer(self, name)
def _construct_extended_attr_path(
self, observer_attr_name: str, instance_attr_name: str
) -> str:
if observer_attr_name != "":
return f"{observer_attr_name}{instance_attr_name}"
return instance_attr_name

View File

@ -0,0 +1,31 @@
import logging
from abc import ABC, abstractmethod
from typing import Any
from pydase.observer_pattern.observable import Observable
logger = logging.getLogger(__name__)
class Observer(ABC):
def __init__(self, observable: Observable) -> None:
self.observable = observable
self.observable.add_observer(self)
self.changing_attributes: list[str] = []
def _notify_changed(self, changed_attribute: str, value: Any) -> None:
if changed_attribute in self.changing_attributes:
self.changing_attributes.remove(changed_attribute)
self.on_change(full_access_path=changed_attribute, value=value)
def _notify_change_start(self, changing_attribute: str) -> None:
self.changing_attributes.append(changing_attribute)
self.on_change_start(changing_attribute)
@abstractmethod
def on_change(self, full_access_path: str, value: Any) -> None:
...
def on_change_start(self, full_access_path: str) -> None:
return

View File

@ -0,0 +1,99 @@
import inspect
import logging
import re
from typing import Any
from pydase.observer_pattern.observable.observable import Observable
from pydase.observer_pattern.observer.observer import Observer
from pydase.utils.helpers import get_object_attr_from_path_list
logger = logging.getLogger(__name__)
def reverse_dict(original_dict: dict[str, list[str]]) -> dict[str, list[str]]:
reversed_dict: dict[str, list[str]] = {
value: [] for values in original_dict.values() for value in values
}
for key, values in original_dict.items():
for value in values:
reversed_dict[value].append(key)
return reversed_dict
def get_property_dependencies(prop: property, prefix: str = "") -> list[str]:
source_code_string = inspect.getsource(prop.fget) # type: ignore[arg-type, reportGeneralTypeIssues]
pattern = r"self\.([^\s\{\}]+)"
matches = re.findall(pattern, source_code_string)
return [prefix + match for match in matches if "(" not in match]
class PropertyObserver(Observer):
def __init__(self, observable: Observable) -> None:
super().__init__(observable)
self.initialised = False
self.changing_attributes: list[str] = []
self.property_deps_dict = reverse_dict(
self._get_properties_and_their_dependencies(self.observable)
)
self.property_values = self._get_property_values(self.observable)
self.initialised = True
def on_change(self, full_access_path: str, value: Any) -> None:
if full_access_path in self.changing_attributes:
self.changing_attributes.remove(full_access_path)
if (
not self.initialised
or self.property_values.get(full_access_path, None) == value
):
return
logger.info("'%s' changed to '%s'", full_access_path, value)
if full_access_path in self.property_values:
self.property_values[full_access_path] = value
changed_props = self.property_deps_dict.get(full_access_path, [])
for prop in changed_props:
if prop not in self.changing_attributes:
self._notify_changed(
prop,
get_object_attr_from_path_list(self.observable, prop.split(".")),
)
def on_change_start(self, full_access_path: str) -> None:
self.changing_attributes.append(full_access_path)
logger.info("'%s' is being changed", full_access_path)
def _get_properties_and_their_dependencies(
self, obj: Observable, prefix: str = ""
) -> dict[str, list[str]]:
deps = {}
for k, value in vars(type(obj)).items():
key = f"{prefix}{k}"
if isinstance(value, property):
deps[key] = get_property_dependencies(value, prefix)
for k, value in vars(obj).items():
key = f"{prefix}{k}"
if isinstance(value, Observable):
new_prefix = f"{key}." if not key.endswith("]") else key
deps.update(
self._get_properties_and_their_dependencies(value, new_prefix)
)
return deps
def _get_property_values(
self, obj: Observable, prefix: str = ""
) -> dict[str, list[str]]:
values = {}
for k, value in vars(type(obj)).items():
key = f"{prefix}{k}"
if isinstance(value, property):
values[key] = getattr(obj, k)
for k, value in vars(obj).items():
key = f"{prefix}{k}"
if isinstance(value, Observable):
new_prefix = f"{key}." if not key.endswith("]") else key
values.update(self._get_property_values(value, new_prefix))
return values