diff --git a/src/pydase/data_service/data_service_observer.py b/src/pydase/data_service/data_service_observer.py index 268d6ee..4989f5c 100644 --- a/src/pydase/data_service/data_service_observer.py +++ b/src/pydase/data_service/data_service_observer.py @@ -4,11 +4,8 @@ from copy import deepcopy from typing import Any from pydase.data_service.state_manager import StateManager -from pydase.observer_pattern.observable.observable import Observable -from pydase.observer_pattern.observer.observer import Observer from pydase.observer_pattern.observer.property_observer import ( - get_property_dependencies, - reverse_dict, + PropertyObserver, ) from pydase.utils.helpers import get_object_attr_from_path_list from pydase.utils.serializer import dump @@ -16,23 +13,15 @@ from pydase.utils.serializer import dump logger = logging.getLogger(__name__) -class DataServiceObserver(Observer): +class DataServiceObserver(PropertyObserver): def __init__(self, state_manager: StateManager) -> None: - super().__init__(state_manager.service) - self.initialised = False self.state_manager = state_manager - self.property_deps_dict = reverse_dict( - self._get_properties_and_their_dependencies(self.observable) - ) self._notification_callbacks: list[ Callable[[str, Any, dict[str, Any]], None] ] = [] - self.initialised = True + super().__init__(state_manager.service) def on_change(self, full_access_path: str, value: Any) -> None: - if not self.initialised: - return - cached_value_dict = deepcopy( self.state_manager._data_service_cache.get_value_dict_from_cache( full_access_path @@ -84,64 +73,6 @@ class DataServiceObserver(Observer): get_object_attr_from_path_list(self.observable, prop.split(".")), ) - def _get_properties_and_their_dependencies( - self, obj: Observable, prefix: str = "" - ) -> dict[str, list[str]]: - deps: dict[str, Any] = {} - - self._process_observable_properties(obj, deps, prefix) - self._process_nested_observables_properties(obj, deps, prefix) - - return deps - - def _process_observable_properties( - self, obj: Observable, deps: dict[str, Any], prefix: str - ) -> None: - for k, value in vars(type(obj)).items(): - prefix = ( - f"{prefix}." if prefix != "" and not prefix.endswith(".") else prefix - ) - key = f"{prefix}{k}" - if isinstance(value, property): - deps[key] = get_property_dependencies(value, prefix) - - def _process_nested_observables_properties( - self, obj: Observable, deps: dict[str, Any], prefix: str - ) -> None: - for k, value in vars(obj).items(): - prefix = ( - f"{prefix}." if prefix != "" and not prefix.endswith(".") else prefix - ) - parent_path = f"{prefix}{k}" - if isinstance(value, Observable): - new_prefix = f"{parent_path}." - deps.update( - self._get_properties_and_their_dependencies(value, new_prefix) - ) - elif isinstance(value, list | dict): - self._process_collection_item_properties(value, deps, parent_path) - - def _process_collection_item_properties( - self, - collection: list[Any] | dict[str, Any], - deps: dict[str, Any], - parent_path: str, - ) -> None: - if isinstance(collection, list): - for i, item in enumerate(collection): - if isinstance(item, Observable): - new_prefix = f"{parent_path}[{i}]" - deps.update( - self._get_properties_and_their_dependencies(item, new_prefix) - ) - elif isinstance(collection, dict): - for key, val in collection.items(): - if isinstance(val, Observable): - new_prefix = f"{parent_path}['{key}']" - deps.update( - self._get_properties_and_their_dependencies(val, new_prefix) - ) - def add_notification_callback( self, callback: Callable[[str, Any, dict[str, Any]], None] ) -> None: diff --git a/src/pydase/observer_pattern/observer/property_observer.py b/src/pydase/observer_pattern/observer/property_observer.py index 4e34b83..9298d3e 100644 --- a/src/pydase/observer_pattern/observer/property_observer.py +++ b/src/pydase/observer_pattern/observer/property_observer.py @@ -5,7 +5,6 @@ from typing import Any from pydase.observer_pattern.observable.observable import Observable from pydase.observer_pattern.observer.observer import Observer -from pydase.utils.helpers import get_object_attr_from_path_list logger = logging.getLogger(__name__) @@ -21,7 +20,7 @@ def reverse_dict(original_dict: dict[str, list[str]]) -> dict[str, list[str]]: def get_property_dependencies(prop: property, prefix: str = "") -> list[str]: - source_code_string = inspect.getsource(prop.fget) # type: ignore[arg-type, reportGeneralTypeIssues] + source_code_string = inspect.getsource(prop.fget) # type: ignore[arg-type] pattern = r"self\.([^\s\{\}]+)" matches = re.findall(pattern, source_code_string) return [prefix + match for match in matches if "(" not in match] @@ -30,70 +29,64 @@ def get_property_dependencies(prop: property, prefix: str = "") -> list[str]: class PropertyObserver(Observer): def __init__(self, observable: Observable) -> None: super().__init__(observable) - self.initialised = False - self.changing_attributes: list[str] = [] self.property_deps_dict = reverse_dict( self._get_properties_and_their_dependencies(self.observable) ) - self.property_values = self._get_property_values(self.observable) - self.initialised = True - - def on_change(self, full_access_path: str, value: Any) -> None: - if full_access_path in self.changing_attributes: - self.changing_attributes.remove(full_access_path) - - if ( - not self.initialised - or self.property_values.get(full_access_path, None) == value - ): - return - - logger.info("'%s' changed to '%s'", full_access_path, value) - if full_access_path in self.property_values: - self.property_values[full_access_path] = value - - changed_props = self.property_deps_dict.get(full_access_path, []) - for prop in changed_props: - if prop not in self.changing_attributes: - self._notify_changed( - prop, - get_object_attr_from_path_list(self.observable, prop.split(".")), - ) - - def on_change_start(self, full_access_path: str) -> None: - self.changing_attributes.append(full_access_path) - logger.info("'%s' is being changed", full_access_path) def _get_properties_and_their_dependencies( self, obj: Observable, prefix: str = "" ) -> dict[str, list[str]]: - deps = {} + deps: dict[str, Any] = {} + + self._process_observable_properties(obj, deps, prefix) + self._process_nested_observables_properties(obj, deps, prefix) + + return deps + + def _process_observable_properties( + self, obj: Observable, deps: dict[str, Any], prefix: str + ) -> None: for k, value in vars(type(obj)).items(): + prefix = ( + f"{prefix}." if prefix != "" and not prefix.endswith(".") else prefix + ) key = f"{prefix}{k}" if isinstance(value, property): deps[key] = get_property_dependencies(value, prefix) + def _process_nested_observables_properties( + self, obj: Observable, deps: dict[str, Any], prefix: str + ) -> None: for k, value in vars(obj).items(): - key = f"{prefix}{k}" + prefix = ( + f"{prefix}." if prefix != "" and not prefix.endswith(".") else prefix + ) + parent_path = f"{prefix}{k}" if isinstance(value, Observable): - new_prefix = f"{key}." if not key.endswith("]") else key + new_prefix = f"{parent_path}." deps.update( self._get_properties_and_their_dependencies(value, new_prefix) ) - return deps + elif isinstance(value, list | dict): + self._process_collection_item_properties(value, deps, parent_path) - def _get_property_values( - self, obj: Observable, prefix: str = "" - ) -> dict[str, list[str]]: - values = {} - for k, value in vars(type(obj)).items(): - key = f"{prefix}{k}" - if isinstance(value, property): - values[key] = getattr(obj, k) - - for k, value in vars(obj).items(): - key = f"{prefix}{k}" - if isinstance(value, Observable): - new_prefix = f"{key}." if not key.endswith("]") else key - values.update(self._get_property_values(value, new_prefix)) - return values + def _process_collection_item_properties( + self, + collection: list[Any] | dict[str, Any], + deps: dict[str, Any], + parent_path: str, + ) -> None: + if isinstance(collection, list): + for i, item in enumerate(collection): + if isinstance(item, Observable): + new_prefix = f"{parent_path}[{i}]" + deps.update( + self._get_properties_and_their_dependencies(item, new_prefix) + ) + elif isinstance(collection, dict): + for key, val in collection.items(): + if isinstance(val, Observable): + new_prefix = f"{parent_path}['{key}']" + deps.update( + self._get_properties_and_their_dependencies(val, new_prefix) + )