From 99dea381a3ce502e4c2b9c03669956d02ac71053 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mose=20M=C3=BCller?= Date: Mon, 27 Nov 2023 16:48:55 +0100 Subject: [PATCH] adds first version of observer_pattern module --- src/pydase/observer_pattern/__init__.py | 0 .../observer_pattern/observable/__init__.py | 3 + .../observer_pattern/observable/observable.py | 53 +++++ .../observable/observable_object.py | 202 ++++++++++++++++++ .../observer_pattern/observer/__init__.py | 0 .../observer_pattern/observer/observer.py | 31 +++ .../observer/property_observer.py | 99 +++++++++ 7 files changed, 388 insertions(+) create mode 100644 src/pydase/observer_pattern/__init__.py create mode 100644 src/pydase/observer_pattern/observable/__init__.py create mode 100644 src/pydase/observer_pattern/observable/observable.py create mode 100644 src/pydase/observer_pattern/observable/observable_object.py create mode 100644 src/pydase/observer_pattern/observer/__init__.py create mode 100644 src/pydase/observer_pattern/observer/observer.py create mode 100644 src/pydase/observer_pattern/observer/property_observer.py diff --git a/src/pydase/observer_pattern/__init__.py b/src/pydase/observer_pattern/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/pydase/observer_pattern/observable/__init__.py b/src/pydase/observer_pattern/observable/__init__.py new file mode 100644 index 0000000..3da11ec --- /dev/null +++ b/src/pydase/observer_pattern/observable/__init__.py @@ -0,0 +1,3 @@ +from pydase.observer_pattern.observable.observable import Observable + +__all__ = ["Observable"] diff --git a/src/pydase/observer_pattern/observable/observable.py b/src/pydase/observer_pattern/observable/observable.py new file mode 100644 index 0000000..c447991 --- /dev/null +++ b/src/pydase/observer_pattern/observable/observable.py @@ -0,0 +1,53 @@ +import logging +from typing import Any + +from pydase.observer_pattern.observable.observable_object import ObservableObject +from pydase.utils.helpers import is_property_attribute + +logger = logging.getLogger(__name__) + + +class Observable(ObservableObject): + def __init__(self) -> None: + super().__init__() + class_attrs = { + k: type(self).__dict__[k] + for k in set(type(self).__dict__) + - set(Observable.__dict__) + - set(self.__dict__) + } + for name, value in class_attrs.items(): + if isinstance(value, property) or callable(value): + continue + self.__dict__[name] = self._initialise_new_objects(name, value) + + def __setattr__(self, name: str, value: Any) -> None: + if hasattr(self, "_observers"): + self._remove_observer_if_observable(name) + value = self._initialise_new_objects(name, value) + self._notify_change_start(name) + + super().__setattr__(name, value) + + self._notify_changed(name, value) + + def __getattribute__(self, name: str) -> Any: + value = super().__getattribute__(name) + if is_property_attribute(self, name): + self._notify_changed(name, value) + + return value + + def _remove_observer_if_observable(self, name: str) -> None: + if not is_property_attribute(self, name): + current_value = getattr(self, name, None) + + if isinstance(current_value, ObservableObject): + current_value._remove_observer(self, name) + + def _construct_extended_attr_path( + self, observer_attr_name: str, instance_attr_name: str + ) -> str: + if observer_attr_name != "": + return f"{observer_attr_name}.{instance_attr_name}" + return instance_attr_name diff --git a/src/pydase/observer_pattern/observable/observable_object.py b/src/pydase/observer_pattern/observable/observable_object.py new file mode 100644 index 0000000..fe6fb32 --- /dev/null +++ b/src/pydase/observer_pattern/observable/observable_object.py @@ -0,0 +1,202 @@ +import logging +from abc import ABC, abstractmethod +from typing import TYPE_CHECKING, Any, ClassVar + +if TYPE_CHECKING: + from pydase.observer_pattern.observer.observer import Observer + +logger = logging.getLogger(__name__) + + +class ObservableObject(ABC): + _list_mapping: ClassVar[dict[int, "_ObservableList"]] = {} + _dict_mapping: ClassVar[dict[int, "_ObservableDict"]] = {} + + def __init__(self) -> None: + self._observers: dict[str, list["ObservableObject | Observer"]] = {} + + def add_observer( + self, observer: "ObservableObject | Observer", attr_name: str = "" + ) -> None: + if attr_name not in self._observers: + self._observers[attr_name] = [] + if observer not in self._observers[attr_name]: + self._observers[attr_name].append(observer) + + def _remove_observer(self, observer: "ObservableObject", attribute: str) -> None: + if attribute in self._observers: + self._observers[attribute].remove(observer) + + @abstractmethod + def _remove_observer_if_observable(self, name: str) -> None: + """Removes the current object as an observer from an observable attribute. + + This method is called before an attribute of the observable object is + changed. If the current value of the attribute is an instance of + `ObservableObject`, this method removes the current object from its list + of observers. This is a crucial step to avoid unwanted notifications from + the old value of the attribute. + """ + ... + + def _notify_changed(self, changed_attribute: str, value: Any) -> None: + """Notifies all observers about changes to an attribute. + + This method iterates through all observers registered for the object and + invokes their notification method. It is called whenever an attribute of + the observable object is changed. + + Args: + changed_attribute (str): The name of the changed attribute. + value (Any): The value that the attribute was set to. + """ + for attr_name, observer_list in self._observers.items(): + for observer in observer_list: + extendend_attr_path = self._construct_extended_attr_path( + attr_name, changed_attribute + ) + observer._notify_changed(extendend_attr_path, value) + + def _notify_change_start(self, changing_attribute: str) -> None: + """Notify observers that an attribute or item change process has started. + + This method is called at the start of the process of modifying an attribute in + the observed `Observable` object. It registers the attribute as currently + undergoing a change. This registration helps in managing and tracking changes as + they occur, especially in scenarios where the order of changes or their state + during the transition is significant. + + Args: + changing_attribute (str): The name of the attribute that is starting to + change. This is typically the full access path of the attribute in the + `Observable`. + value (Any): The value that the attribute is being set to. + """ + + for attr_name, observer_list in self._observers.items(): + for observer in observer_list: + extended_attr_path = self._construct_extended_attr_path( + attr_name, changing_attribute + ) + observer._notify_change_start(extended_attr_path) + + def _initialise_new_objects(self, attr_name_or_key: Any, value: Any) -> Any: + new_value = value + if isinstance(value, list): + if id(value) in self._list_mapping: + # If the list `value` was already referenced somewhere else + new_value = self._list_mapping[id(value)] + else: + # convert the builtin list into a ObservableList + new_value = _ObservableList(original_list=value) + self._list_mapping[id(value)] = new_value + elif isinstance(value, dict): + if id(value) in self._dict_mapping: + # If the list `value` was already referenced somewhere else + new_value = self._dict_mapping[id(value)] + else: + # convert the builtin list into a ObservableList + new_value = _ObservableDict(original_dict=value) + self._dict_mapping[id(value)] = new_value + if isinstance(new_value, ObservableObject): + new_value.add_observer(self, str(attr_name_or_key)) + return new_value + + @abstractmethod + def _construct_extended_attr_path( + self, observer_attr_name: str, instance_attr_name: str + ) -> str: + """ + Constructs the extended attribute path for notification purposes, which is used + in the observer pattern to specify the full path of an observed attribute. + + This abstract method is implemented by the classes inheriting from + `ObservableObject`. + + Args: + observer_attr_name (str): The name of the attribute in the observer that + holds a reference to the instance. Equals `""` if observer itself is of type + `Observer`. + instance_attr_name (str): The name of the attribute within the instance that + has changed. + + Returns: + str: The constructed extended attribute path. + """ + ... + + +class _ObservableList(ObservableObject, list): + def __init__( + self, + original_list: list[Any], + ) -> None: + self._original_list = original_list + ObservableObject.__init__(self) + list.__init__(self, self._original_list) + for i, item in enumerate(self._original_list): + super().__setitem__(i, self._initialise_new_objects(f"[{i}]", item)) + + def __setitem__(self, key: int, value: Any) -> None: # type: ignore[override] + if hasattr(self, "_observers"): + self._remove_observer_if_observable(f"[{key}]") + value = self._initialise_new_objects(f"[{key}]", value) + self._notify_change_start(f"[{key}]") + + super().__setitem__(key, value) + + self._notify_changed(f"[{key}]", value) + + def _remove_observer_if_observable(self, name: str) -> None: + key = int(name[1:-1]) + current_value = self.__getitem__(key) + + if isinstance(current_value, ObservableObject): + current_value._remove_observer(self, name) + + def _construct_extended_attr_path( + self, observer_attr_name: str, instance_attr_name: str + ) -> str: + if observer_attr_name != "": + return f"{observer_attr_name}{instance_attr_name}" + return instance_attr_name + + +class _ObservableDict(dict, ObservableObject): + def __init__( + self, + original_dict: dict[Any, Any], + ) -> None: + self._original_dict = original_dict + ObservableObject.__init__(self) + dict.__init__(self) + for key, value in self._original_dict.items(): + super().__setitem__(key, self._initialise_new_objects(f"['{key}']", value)) + + def __setitem__(self, key: str, value: Any) -> None: # type: ignore[override] + if not isinstance(key, str): + logger.warning("Converting non-string dictionary key %s to string.", key) + key = str(key) + + if hasattr(self, "_observers"): + self._remove_observer_if_observable(f"['{key}']") + value = self._initialise_new_objects(key, value) + self._notify_change_start(f"['{key}']") + + super().__setitem__(key, value) + + self._notify_changed(f"['{key}']", value) + + def _remove_observer_if_observable(self, name: str) -> None: + key = name[2:-2] + current_value = self.get(key, None) + + if isinstance(current_value, ObservableObject): + current_value._remove_observer(self, name) + + def _construct_extended_attr_path( + self, observer_attr_name: str, instance_attr_name: str + ) -> str: + if observer_attr_name != "": + return f"{observer_attr_name}{instance_attr_name}" + return instance_attr_name diff --git a/src/pydase/observer_pattern/observer/__init__.py b/src/pydase/observer_pattern/observer/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/pydase/observer_pattern/observer/observer.py b/src/pydase/observer_pattern/observer/observer.py new file mode 100644 index 0000000..b36dc6b --- /dev/null +++ b/src/pydase/observer_pattern/observer/observer.py @@ -0,0 +1,31 @@ +import logging +from abc import ABC, abstractmethod +from typing import Any + +from pydase.observer_pattern.observable import Observable + +logger = logging.getLogger(__name__) + + +class Observer(ABC): + def __init__(self, observable: Observable) -> None: + self.observable = observable + self.observable.add_observer(self) + self.changing_attributes: list[str] = [] + + def _notify_changed(self, changed_attribute: str, value: Any) -> None: + if changed_attribute in self.changing_attributes: + self.changing_attributes.remove(changed_attribute) + + self.on_change(full_access_path=changed_attribute, value=value) + + def _notify_change_start(self, changing_attribute: str) -> None: + self.changing_attributes.append(changing_attribute) + self.on_change_start(changing_attribute) + + @abstractmethod + def on_change(self, full_access_path: str, value: Any) -> None: + ... + + def on_change_start(self, full_access_path: str) -> None: + return diff --git a/src/pydase/observer_pattern/observer/property_observer.py b/src/pydase/observer_pattern/observer/property_observer.py new file mode 100644 index 0000000..4e34b83 --- /dev/null +++ b/src/pydase/observer_pattern/observer/property_observer.py @@ -0,0 +1,99 @@ +import inspect +import logging +import re +from typing import Any + +from pydase.observer_pattern.observable.observable import Observable +from pydase.observer_pattern.observer.observer import Observer +from pydase.utils.helpers import get_object_attr_from_path_list + +logger = logging.getLogger(__name__) + + +def reverse_dict(original_dict: dict[str, list[str]]) -> dict[str, list[str]]: + reversed_dict: dict[str, list[str]] = { + value: [] for values in original_dict.values() for value in values + } + for key, values in original_dict.items(): + for value in values: + reversed_dict[value].append(key) + return reversed_dict + + +def get_property_dependencies(prop: property, prefix: str = "") -> list[str]: + source_code_string = inspect.getsource(prop.fget) # type: ignore[arg-type, reportGeneralTypeIssues] + pattern = r"self\.([^\s\{\}]+)" + matches = re.findall(pattern, source_code_string) + return [prefix + match for match in matches if "(" not in match] + + +class PropertyObserver(Observer): + def __init__(self, observable: Observable) -> None: + super().__init__(observable) + self.initialised = False + self.changing_attributes: list[str] = [] + self.property_deps_dict = reverse_dict( + self._get_properties_and_their_dependencies(self.observable) + ) + self.property_values = self._get_property_values(self.observable) + self.initialised = True + + def on_change(self, full_access_path: str, value: Any) -> None: + if full_access_path in self.changing_attributes: + self.changing_attributes.remove(full_access_path) + + if ( + not self.initialised + or self.property_values.get(full_access_path, None) == value + ): + return + + logger.info("'%s' changed to '%s'", full_access_path, value) + if full_access_path in self.property_values: + self.property_values[full_access_path] = value + + changed_props = self.property_deps_dict.get(full_access_path, []) + for prop in changed_props: + if prop not in self.changing_attributes: + self._notify_changed( + prop, + get_object_attr_from_path_list(self.observable, prop.split(".")), + ) + + def on_change_start(self, full_access_path: str) -> None: + self.changing_attributes.append(full_access_path) + logger.info("'%s' is being changed", full_access_path) + + def _get_properties_and_their_dependencies( + self, obj: Observable, prefix: str = "" + ) -> dict[str, list[str]]: + deps = {} + for k, value in vars(type(obj)).items(): + key = f"{prefix}{k}" + if isinstance(value, property): + deps[key] = get_property_dependencies(value, prefix) + + for k, value in vars(obj).items(): + key = f"{prefix}{k}" + if isinstance(value, Observable): + new_prefix = f"{key}." if not key.endswith("]") else key + deps.update( + self._get_properties_and_their_dependencies(value, new_prefix) + ) + return deps + + def _get_property_values( + self, obj: Observable, prefix: str = "" + ) -> dict[str, list[str]]: + values = {} + for k, value in vars(type(obj)).items(): + key = f"{prefix}{k}" + if isinstance(value, property): + values[key] = getattr(obj, k) + + for k, value in vars(obj).items(): + key = f"{prefix}{k}" + if isinstance(value, Observable): + new_prefix = f"{key}." if not key.endswith("]") else key + values.update(self._get_property_values(value, new_prefix)) + return values