mirror of
https://github.com/tiqi-group/pydase.git
synced 2025-04-21 16:50:02 +02:00
moves property-related stuff from DataServiceObserver to PropertyObserver
This commit is contained in:
parent
a9c6070ca3
commit
0944a404dc
@ -4,11 +4,8 @@ from copy import deepcopy
|
||||
from typing import Any
|
||||
|
||||
from pydase.data_service.state_manager import StateManager
|
||||
from pydase.observer_pattern.observable.observable import Observable
|
||||
from pydase.observer_pattern.observer.observer import Observer
|
||||
from pydase.observer_pattern.observer.property_observer import (
|
||||
get_property_dependencies,
|
||||
reverse_dict,
|
||||
PropertyObserver,
|
||||
)
|
||||
from pydase.utils.helpers import get_object_attr_from_path_list
|
||||
from pydase.utils.serializer import dump
|
||||
@ -16,23 +13,15 @@ from pydase.utils.serializer import dump
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class DataServiceObserver(Observer):
|
||||
class DataServiceObserver(PropertyObserver):
|
||||
def __init__(self, state_manager: StateManager) -> None:
|
||||
super().__init__(state_manager.service)
|
||||
self.initialised = False
|
||||
self.state_manager = state_manager
|
||||
self.property_deps_dict = reverse_dict(
|
||||
self._get_properties_and_their_dependencies(self.observable)
|
||||
)
|
||||
self._notification_callbacks: list[
|
||||
Callable[[str, Any, dict[str, Any]], None]
|
||||
] = []
|
||||
self.initialised = True
|
||||
super().__init__(state_manager.service)
|
||||
|
||||
def on_change(self, full_access_path: str, value: Any) -> None:
|
||||
if not self.initialised:
|
||||
return
|
||||
|
||||
cached_value_dict = deepcopy(
|
||||
self.state_manager._data_service_cache.get_value_dict_from_cache(
|
||||
full_access_path
|
||||
@ -84,64 +73,6 @@ class DataServiceObserver(Observer):
|
||||
get_object_attr_from_path_list(self.observable, prop.split(".")),
|
||||
)
|
||||
|
||||
def _get_properties_and_their_dependencies(
|
||||
self, obj: Observable, prefix: str = ""
|
||||
) -> dict[str, list[str]]:
|
||||
deps: dict[str, Any] = {}
|
||||
|
||||
self._process_observable_properties(obj, deps, prefix)
|
||||
self._process_nested_observables_properties(obj, deps, prefix)
|
||||
|
||||
return deps
|
||||
|
||||
def _process_observable_properties(
|
||||
self, obj: Observable, deps: dict[str, Any], prefix: str
|
||||
) -> None:
|
||||
for k, value in vars(type(obj)).items():
|
||||
prefix = (
|
||||
f"{prefix}." if prefix != "" and not prefix.endswith(".") else prefix
|
||||
)
|
||||
key = f"{prefix}{k}"
|
||||
if isinstance(value, property):
|
||||
deps[key] = get_property_dependencies(value, prefix)
|
||||
|
||||
def _process_nested_observables_properties(
|
||||
self, obj: Observable, deps: dict[str, Any], prefix: str
|
||||
) -> None:
|
||||
for k, value in vars(obj).items():
|
||||
prefix = (
|
||||
f"{prefix}." if prefix != "" and not prefix.endswith(".") else prefix
|
||||
)
|
||||
parent_path = f"{prefix}{k}"
|
||||
if isinstance(value, Observable):
|
||||
new_prefix = f"{parent_path}."
|
||||
deps.update(
|
||||
self._get_properties_and_their_dependencies(value, new_prefix)
|
||||
)
|
||||
elif isinstance(value, list | dict):
|
||||
self._process_collection_item_properties(value, deps, parent_path)
|
||||
|
||||
def _process_collection_item_properties(
|
||||
self,
|
||||
collection: list[Any] | dict[str, Any],
|
||||
deps: dict[str, Any],
|
||||
parent_path: str,
|
||||
) -> None:
|
||||
if isinstance(collection, list):
|
||||
for i, item in enumerate(collection):
|
||||
if isinstance(item, Observable):
|
||||
new_prefix = f"{parent_path}[{i}]"
|
||||
deps.update(
|
||||
self._get_properties_and_their_dependencies(item, new_prefix)
|
||||
)
|
||||
elif isinstance(collection, dict):
|
||||
for key, val in collection.items():
|
||||
if isinstance(val, Observable):
|
||||
new_prefix = f"{parent_path}['{key}']"
|
||||
deps.update(
|
||||
self._get_properties_and_their_dependencies(val, new_prefix)
|
||||
)
|
||||
|
||||
def add_notification_callback(
|
||||
self, callback: Callable[[str, Any, dict[str, Any]], None]
|
||||
) -> None:
|
||||
|
@ -5,7 +5,6 @@ from typing import Any
|
||||
|
||||
from pydase.observer_pattern.observable.observable import Observable
|
||||
from pydase.observer_pattern.observer.observer import Observer
|
||||
from pydase.utils.helpers import get_object_attr_from_path_list
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@ -21,7 +20,7 @@ def reverse_dict(original_dict: dict[str, list[str]]) -> dict[str, list[str]]:
|
||||
|
||||
|
||||
def get_property_dependencies(prop: property, prefix: str = "") -> list[str]:
|
||||
source_code_string = inspect.getsource(prop.fget) # type: ignore[arg-type, reportGeneralTypeIssues]
|
||||
source_code_string = inspect.getsource(prop.fget) # type: ignore[arg-type]
|
||||
pattern = r"self\.([^\s\{\}]+)"
|
||||
matches = re.findall(pattern, source_code_string)
|
||||
return [prefix + match for match in matches if "(" not in match]
|
||||
@ -30,70 +29,64 @@ def get_property_dependencies(prop: property, prefix: str = "") -> list[str]:
|
||||
class PropertyObserver(Observer):
|
||||
def __init__(self, observable: Observable) -> None:
|
||||
super().__init__(observable)
|
||||
self.initialised = False
|
||||
self.changing_attributes: list[str] = []
|
||||
self.property_deps_dict = reverse_dict(
|
||||
self._get_properties_and_their_dependencies(self.observable)
|
||||
)
|
||||
self.property_values = self._get_property_values(self.observable)
|
||||
self.initialised = True
|
||||
|
||||
def on_change(self, full_access_path: str, value: Any) -> None:
|
||||
if full_access_path in self.changing_attributes:
|
||||
self.changing_attributes.remove(full_access_path)
|
||||
|
||||
if (
|
||||
not self.initialised
|
||||
or self.property_values.get(full_access_path, None) == value
|
||||
):
|
||||
return
|
||||
|
||||
logger.info("'%s' changed to '%s'", full_access_path, value)
|
||||
if full_access_path in self.property_values:
|
||||
self.property_values[full_access_path] = value
|
||||
|
||||
changed_props = self.property_deps_dict.get(full_access_path, [])
|
||||
for prop in changed_props:
|
||||
if prop not in self.changing_attributes:
|
||||
self._notify_changed(
|
||||
prop,
|
||||
get_object_attr_from_path_list(self.observable, prop.split(".")),
|
||||
)
|
||||
|
||||
def on_change_start(self, full_access_path: str) -> None:
|
||||
self.changing_attributes.append(full_access_path)
|
||||
logger.info("'%s' is being changed", full_access_path)
|
||||
|
||||
def _get_properties_and_their_dependencies(
|
||||
self, obj: Observable, prefix: str = ""
|
||||
) -> dict[str, list[str]]:
|
||||
deps = {}
|
||||
deps: dict[str, Any] = {}
|
||||
|
||||
self._process_observable_properties(obj, deps, prefix)
|
||||
self._process_nested_observables_properties(obj, deps, prefix)
|
||||
|
||||
return deps
|
||||
|
||||
def _process_observable_properties(
|
||||
self, obj: Observable, deps: dict[str, Any], prefix: str
|
||||
) -> None:
|
||||
for k, value in vars(type(obj)).items():
|
||||
prefix = (
|
||||
f"{prefix}." if prefix != "" and not prefix.endswith(".") else prefix
|
||||
)
|
||||
key = f"{prefix}{k}"
|
||||
if isinstance(value, property):
|
||||
deps[key] = get_property_dependencies(value, prefix)
|
||||
|
||||
def _process_nested_observables_properties(
|
||||
self, obj: Observable, deps: dict[str, Any], prefix: str
|
||||
) -> None:
|
||||
for k, value in vars(obj).items():
|
||||
key = f"{prefix}{k}"
|
||||
prefix = (
|
||||
f"{prefix}." if prefix != "" and not prefix.endswith(".") else prefix
|
||||
)
|
||||
parent_path = f"{prefix}{k}"
|
||||
if isinstance(value, Observable):
|
||||
new_prefix = f"{key}." if not key.endswith("]") else key
|
||||
new_prefix = f"{parent_path}."
|
||||
deps.update(
|
||||
self._get_properties_and_their_dependencies(value, new_prefix)
|
||||
)
|
||||
return deps
|
||||
elif isinstance(value, list | dict):
|
||||
self._process_collection_item_properties(value, deps, parent_path)
|
||||
|
||||
def _get_property_values(
|
||||
self, obj: Observable, prefix: str = ""
|
||||
) -> dict[str, list[str]]:
|
||||
values = {}
|
||||
for k, value in vars(type(obj)).items():
|
||||
key = f"{prefix}{k}"
|
||||
if isinstance(value, property):
|
||||
values[key] = getattr(obj, k)
|
||||
|
||||
for k, value in vars(obj).items():
|
||||
key = f"{prefix}{k}"
|
||||
if isinstance(value, Observable):
|
||||
new_prefix = f"{key}." if not key.endswith("]") else key
|
||||
values.update(self._get_property_values(value, new_prefix))
|
||||
return values
|
||||
def _process_collection_item_properties(
|
||||
self,
|
||||
collection: list[Any] | dict[str, Any],
|
||||
deps: dict[str, Any],
|
||||
parent_path: str,
|
||||
) -> None:
|
||||
if isinstance(collection, list):
|
||||
for i, item in enumerate(collection):
|
||||
if isinstance(item, Observable):
|
||||
new_prefix = f"{parent_path}[{i}]"
|
||||
deps.update(
|
||||
self._get_properties_and_their_dependencies(item, new_prefix)
|
||||
)
|
||||
elif isinstance(collection, dict):
|
||||
for key, val in collection.items():
|
||||
if isinstance(val, Observable):
|
||||
new_prefix = f"{parent_path}['{key}']"
|
||||
deps.update(
|
||||
self._get_properties_and_their_dependencies(val, new_prefix)
|
||||
)
|
||||
|
Loading…
x
Reference in New Issue
Block a user