adds StateManager

This commit is contained in:
Mose Müller 2023-11-02 18:21:43 +01:00
parent b08a976d2a
commit b0254daa17
2 changed files with 230 additions and 0 deletions

View File

@ -0,0 +1,146 @@
import json
import logging
import os
from typing import TYPE_CHECKING, Any, cast
import pydase.units as u
from pydase.utils.helpers import (
generate_paths_from_DataService_dict,
get_nested_value_from_DataService_by_path_and_key,
set_nested_value_in_dict,
)
if TYPE_CHECKING:
from pydase import DataService
logger = logging.getLogger(__name__)
class StateManager:
"""
Manages the state of a DataService instance, serving as both a cache and a
persistence layer. It is designed to provide quick access to the latest known state
for newly connecting web clients without the need for expensive property accesses
that may involve complex calculations or I/O operations.
The StateManager listens for state change notifications from the DataService's
callback manager and updates its cache accordingly. This cache does not always
reflect the most current complex property states but rather retains the value from
the last known state, optimizing for performance and reducing the load on the
system.
While the StateManager ensures that the cached state is as up-to-date as possible,
it does not autonomously update complex properties of the DataService. Such
properties must be updated programmatically, for instance, by invoking specific
tasks or methods that trigger the necessary operations to refresh their state.
The cached state maintained by the StateManager is particularly useful for web
clients that connect to the system and need immediate access to the current state of
the DataService. By avoiding direct and potentially costly property accesses, the
StateManager provides a snapshot of the DataService's state that is sufficiently
accurate for initial rendering and interaction.
Attributes:
cache (dict[str, Any]):
A dictionary cache of the DataService's state.
filename (str):
The file name used for storing the DataService's state.
service (DataService):
The DataService instance whose state is being managed.
Note:
The StateManager's cache updates are triggered by notifications and do not
include autonomous updates of complex DataService properties, which must be
managed programmatically. The cache serves the purpose of providing immediate
state information to web clients, reflecting the state after the last property
update.
"""
def __init__(self, service: "DataService"):
self.cache: dict[str, Any] = {} # Initialize an empty cache
self.filename = service._filename
self.service = service
self.service._callback_manager.add_notification_callback(self.update_cache)
def update_cache(self, parent_path: str, name: str, value: Any) -> None:
# Remove the part before the first "." in the parent_path
parent_path = ".".join(parent_path.split(".")[1:])
# Construct the full path
full_path = f"{parent_path}.{name}" if parent_path else name
set_nested_value_in_dict(self.cache, full_path, value)
def save_state(self) -> None:
"""
Serialize the DataService instance and write it to a JSON file.
Args:
filename (str): The name of the file to write to.
"""
if self.filename is not None:
with open(self.filename, "w") as f:
json.dump(self.cache, f, indent=4)
else:
logger.error(
f"Class {self.__class__.__name__} was not initialised with a filename. "
'Skipping "write_to_file"...'
)
def load_state(self) -> None:
# Traverse the serialized representation and set the attributes of the class
if self.cache == {}:
self.cache = self.service.serialize()
json_dict = self._load_state_from_file()
if json_dict == {}:
logger.debug("Could not load the service state.")
return
serialized_class = self.cache
for path in generate_paths_from_DataService_dict(json_dict):
value = get_nested_value_from_DataService_by_path_and_key(
json_dict, path=path
)
value_type = get_nested_value_from_DataService_by_path_and_key(
json_dict, path=path, key="type"
)
class_value_type = get_nested_value_from_DataService_by_path_and_key(
serialized_class, path=path, key="type"
)
if class_value_type == value_type:
class_attr_is_read_only = (
get_nested_value_from_DataService_by_path_and_key(
serialized_class, path=path, key="readonly"
)
)
if class_attr_is_read_only:
logger.debug(
f'Attribute "{path}" is read-only. Ignoring value from JSON '
"file..."
)
continue
# Split the path into parts
parts = path.split(".")
attr_name = parts[-1]
# Convert dictionary into Quantity
if class_value_type == "Quantity":
value = u.convert_to_quantity(value)
self.service.update_DataService_attribute(parts[:-1], attr_name, value)
else:
logger.info(
f'Attribute type of "{path}" changed from "{value_type}" to '
f'"{class_value_type}". Ignoring value from JSON file...'
)
def _load_state_from_file(self) -> dict[str, Any]:
if self.filename is not None:
# Check if the file specified by the filename exists
if os.path.exists(self.filename):
with open(self.filename, "r") as f:
# Load JSON data from file and update class attributes with these
# values
return cast(dict[str, Any], json.load(f))
return {}

View File

@ -272,6 +272,90 @@ def get_nested_value_from_DataService_by_path_and_key(
return current_data.get(key, None) return current_data.get(key, None)
def set_nested_value_in_dict(data_dict: dict[str, Any], path: str, value: Any) -> None:
"""
Set the value associated with a specific key in a dictionary given a path.
This function traverses the dictionary according to the path provided and
sets the value at that path. The path is a string with dots connecting
the levels and brackets indicating list indices.
Args:
cache (dict): The cache dictionary to set the value in.
path (str): The path to where the value should be set in the dictionary.
value (Any): The value to be set at the specified path in the dictionary.
Examples:
Let's consider the following dictionary:
cache = {
"attr1": {"type": "int", "value": 10},
"attr2": {
"type": "MyClass",
"value": {"attr3": {"type": "float", "value": 20.5}}
}
}
The function can be used to set the value of 'attr1' as follows:
set_nested_value_in_cache(cache, "attr1", 15)
It can also be used to set the value of 'attr3', which is nested within 'attr2',
as follows:
set_nested_value_in_cache(cache, "attr2.attr3", 25.0)
"""
parts = path.split(".")
current_dict: dict[str, Any] = data_dict
index: Optional[int] = None
for attr_name in parts:
# Check if the key contains an index part like '[<index>]'
if "[" in attr_name and attr_name.endswith("]"):
attr_name, index_part = attr_name.split("[", 1)
index_part = index_part.rstrip("]") # remove the closing bracket
# Convert the index part to an integer
if index_part.isdigit():
index = int(index_part)
else:
logger.error(f"Invalid index format in key: {attr_name}")
current_dict = cast(dict[str, Any], current_dict.get(attr_name, None))
if not isinstance(current_dict, dict):
# key does not exist in dictionary, e.g. when class does not have this
# attribute
return
if index is not None:
if 0 <= index < len(current_dict["value"]):
try:
current_dict = cast(dict[str, Any], current_dict["value"][index])
except Exception as e:
logger.error(f"Could not change {path}. Exception: {e}")
return
else:
# TODO: appending to a list will probably be done here
logger.error(f"Could not change {path}...")
return
# When the attribute is a class instance, the attributes are nested in the
# "value" key
if (
current_dict["type"] not in STANDARD_TYPES
and current_dict["type"] != "method"
):
current_dict = cast(dict[str, Any], current_dict.get("value", None)) # type: ignore
index = None
# setting the new value
try:
current_dict["value"] = value
except Exception as e:
logger.error(e)
def convert_arguments_to_hinted_types( def convert_arguments_to_hinted_types(
args: dict[str, Any], type_hints: dict[str, Any] args: dict[str, Any], type_hints: dict[str, Any]
) -> dict[str, Any] | str: ) -> dict[str, Any] | str: