From b0254daa1704785606f0c13e9be40c796a66ef5e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mose=20M=C3=BCller?= Date: Thu, 2 Nov 2023 18:21:43 +0100 Subject: [PATCH] adds StateManager --- src/pydase/data_service/state_manager.py | 146 +++++++++++++++++++++++ src/pydase/utils/helpers.py | 84 +++++++++++++ 2 files changed, 230 insertions(+) create mode 100644 src/pydase/data_service/state_manager.py diff --git a/src/pydase/data_service/state_manager.py b/src/pydase/data_service/state_manager.py new file mode 100644 index 0000000..b6c3bc5 --- /dev/null +++ b/src/pydase/data_service/state_manager.py @@ -0,0 +1,146 @@ +import json +import logging +import os +from typing import TYPE_CHECKING, Any, cast + +import pydase.units as u +from pydase.utils.helpers import ( + generate_paths_from_DataService_dict, + get_nested_value_from_DataService_by_path_and_key, + set_nested_value_in_dict, +) + +if TYPE_CHECKING: + from pydase import DataService + +logger = logging.getLogger(__name__) + + +class StateManager: + """ + Manages the state of a DataService instance, serving as both a cache and a + persistence layer. It is designed to provide quick access to the latest known state + for newly connecting web clients without the need for expensive property accesses + that may involve complex calculations or I/O operations. + + The StateManager listens for state change notifications from the DataService's + callback manager and updates its cache accordingly. This cache does not always + reflect the most current complex property states but rather retains the value from + the last known state, optimizing for performance and reducing the load on the + system. + + While the StateManager ensures that the cached state is as up-to-date as possible, + it does not autonomously update complex properties of the DataService. Such + properties must be updated programmatically, for instance, by invoking specific + tasks or methods that trigger the necessary operations to refresh their state. + + The cached state maintained by the StateManager is particularly useful for web + clients that connect to the system and need immediate access to the current state of + the DataService. By avoiding direct and potentially costly property accesses, the + StateManager provides a snapshot of the DataService's state that is sufficiently + accurate for initial rendering and interaction. + + Attributes: + cache (dict[str, Any]): + A dictionary cache of the DataService's state. + filename (str): + The file name used for storing the DataService's state. + service (DataService): + The DataService instance whose state is being managed. + + Note: + The StateManager's cache updates are triggered by notifications and do not + include autonomous updates of complex DataService properties, which must be + managed programmatically. The cache serves the purpose of providing immediate + state information to web clients, reflecting the state after the last property + update. + """ + + def __init__(self, service: "DataService"): + self.cache: dict[str, Any] = {} # Initialize an empty cache + self.filename = service._filename + self.service = service + self.service._callback_manager.add_notification_callback(self.update_cache) + + def update_cache(self, parent_path: str, name: str, value: Any) -> None: + # Remove the part before the first "." in the parent_path + parent_path = ".".join(parent_path.split(".")[1:]) + + # Construct the full path + full_path = f"{parent_path}.{name}" if parent_path else name + + set_nested_value_in_dict(self.cache, full_path, value) + + def save_state(self) -> None: + """ + Serialize the DataService instance and write it to a JSON file. + + Args: + filename (str): The name of the file to write to. + """ + if self.filename is not None: + with open(self.filename, "w") as f: + json.dump(self.cache, f, indent=4) + else: + logger.error( + f"Class {self.__class__.__name__} was not initialised with a filename. " + 'Skipping "write_to_file"...' + ) + + def load_state(self) -> None: + # Traverse the serialized representation and set the attributes of the class + if self.cache == {}: + self.cache = self.service.serialize() + + json_dict = self._load_state_from_file() + if json_dict == {}: + logger.debug("Could not load the service state.") + return + + serialized_class = self.cache + for path in generate_paths_from_DataService_dict(json_dict): + value = get_nested_value_from_DataService_by_path_and_key( + json_dict, path=path + ) + value_type = get_nested_value_from_DataService_by_path_and_key( + json_dict, path=path, key="type" + ) + class_value_type = get_nested_value_from_DataService_by_path_and_key( + serialized_class, path=path, key="type" + ) + if class_value_type == value_type: + class_attr_is_read_only = ( + get_nested_value_from_DataService_by_path_and_key( + serialized_class, path=path, key="readonly" + ) + ) + if class_attr_is_read_only: + logger.debug( + f'Attribute "{path}" is read-only. Ignoring value from JSON ' + "file..." + ) + continue + # Split the path into parts + parts = path.split(".") + attr_name = parts[-1] + + # Convert dictionary into Quantity + if class_value_type == "Quantity": + value = u.convert_to_quantity(value) + + self.service.update_DataService_attribute(parts[:-1], attr_name, value) + else: + logger.info( + f'Attribute type of "{path}" changed from "{value_type}" to ' + f'"{class_value_type}". Ignoring value from JSON file...' + ) + + def _load_state_from_file(self) -> dict[str, Any]: + if self.filename is not None: + # Check if the file specified by the filename exists + if os.path.exists(self.filename): + with open(self.filename, "r") as f: + # Load JSON data from file and update class attributes with these + # values + return cast(dict[str, Any], json.load(f)) + return {} diff --git a/src/pydase/utils/helpers.py b/src/pydase/utils/helpers.py index 8d2d053..14c327f 100644 --- a/src/pydase/utils/helpers.py +++ b/src/pydase/utils/helpers.py @@ -272,6 +272,90 @@ def get_nested_value_from_DataService_by_path_and_key( return current_data.get(key, None) +def set_nested_value_in_dict(data_dict: dict[str, Any], path: str, value: Any) -> None: + """ + Set the value associated with a specific key in a dictionary given a path. + + This function traverses the dictionary according to the path provided and + sets the value at that path. The path is a string with dots connecting + the levels and brackets indicating list indices. + + Args: + cache (dict): The cache dictionary to set the value in. + path (str): The path to where the value should be set in the dictionary. + value (Any): The value to be set at the specified path in the dictionary. + + Examples: + Let's consider the following dictionary: + + cache = { + "attr1": {"type": "int", "value": 10}, + "attr2": { + "type": "MyClass", + "value": {"attr3": {"type": "float", "value": 20.5}} + } + } + + The function can be used to set the value of 'attr1' as follows: + set_nested_value_in_cache(cache, "attr1", 15) + + It can also be used to set the value of 'attr3', which is nested within 'attr2', + as follows: + set_nested_value_in_cache(cache, "attr2.attr3", 25.0) + """ + + parts = path.split(".") + current_dict: dict[str, Any] = data_dict + index: Optional[int] = None + + for attr_name in parts: + # Check if the key contains an index part like '[]' + if "[" in attr_name and attr_name.endswith("]"): + attr_name, index_part = attr_name.split("[", 1) + index_part = index_part.rstrip("]") # remove the closing bracket + + # Convert the index part to an integer + if index_part.isdigit(): + index = int(index_part) + else: + logger.error(f"Invalid index format in key: {attr_name}") + + current_dict = cast(dict[str, Any], current_dict.get(attr_name, None)) + + if not isinstance(current_dict, dict): + # key does not exist in dictionary, e.g. when class does not have this + # attribute + return + + if index is not None: + if 0 <= index < len(current_dict["value"]): + try: + current_dict = cast(dict[str, Any], current_dict["value"][index]) + except Exception as e: + logger.error(f"Could not change {path}. Exception: {e}") + return + else: + # TODO: appending to a list will probably be done here + logger.error(f"Could not change {path}...") + return + + # When the attribute is a class instance, the attributes are nested in the + # "value" key + if ( + current_dict["type"] not in STANDARD_TYPES + and current_dict["type"] != "method" + ): + current_dict = cast(dict[str, Any], current_dict.get("value", None)) # type: ignore + + index = None + + # setting the new value + try: + current_dict["value"] = value + except Exception as e: + logger.error(e) + + def convert_arguments_to_hinted_types( args: dict[str, Any], type_hints: dict[str, Any] ) -> dict[str, Any] | str: