diff --git a/src/pyDataInterface/data_service/data_service.py b/src/pyDataInterface/data_service/data_service.py index faadf66..ba7d4cc 100644 --- a/src/pyDataInterface/data_service/data_service.py +++ b/src/pyDataInterface/data_service/data_service.py @@ -37,7 +37,7 @@ def process_callable_attribute(attr: Any, args: dict[str, Any]) -> Any: ) -class DataService(rpyc.Service, TaskManager): +class CallbackManager: _list_mapping: dict[int, DataServiceList] = {} """ A dictionary mapping the id of the original lists to the corresponding @@ -47,135 +47,10 @@ class DataService(rpyc.Service, TaskManager): be tracked consistently. The keys of the dictionary are the ids of the original lists, and the values are the DataServiceList instances that wrap these lists. """ - _notification_callbacks: list[Callable[[str, str, Any], Any]] = [] - """ - A list of callback functions that are executed when a change occurs in the - DataService instance. These functions are intended to handle or respond to these - changes in some way, such as emitting a socket.io message to the frontend. - Each function in this list should be a callable that accepts three parameters: - - - parent_path (str): The path to the parent of the attribute that was changed. - - name (str): The name of the attribute that was changed. - - value (Any): The new value of the attribute. - - A callback function can be added to this list using the add_notification_callback - method. Whenever a change in the DataService instance occurs (or in its nested - DataService or DataServiceList instances), the _emit_notification method is invoked, - which in turn calls all the callback functions in _notification_callbacks with the - appropriate arguments. - - This implementation follows the observer pattern, with the DataService instance as - the "subject" and the callback functions as the "observers". - """ - - def __init__(self, filename: Optional[str] = None) -> None: - TaskManager.__init__(self) - self.__root__: "DataService" = self - """Keep track of the root object. This helps to filter the emission of - notifications. This overwrite the TaksManager's __root__ attribute.""" - - self._callbacks: set[Callable[[str, Any], None]] = set() - self._filename: Optional[str] = filename - - self._register_callbacks() - self.__check_instance_classes() - self._initialised = True - self._load_values_from_json() - - def _load_values_from_json(self) -> None: - if self._filename is not None: - # Check if the file specified by the filename exists - if os.path.exists(self._filename): - with open(self._filename, "r") as f: - # Load JSON data from file and update class attributes with these - # values - self.load_DataService_from_JSON(cast(dict[str, Any], json.load(f))) - - def write_to_file(self) -> None: - """ - Serialize the DataService instance and write it to a JSON file. - - Args: - filename (str): The name of the file to write to. - """ - if self._filename is not None: - with open(self._filename, "w") as f: - json.dump(self.serialize(), f, indent=4) - else: - logger.error( - f"Class {self.__class__.__name__} was not initialised with a filename. " - 'Skipping "write_to_file"...' - ) - - def load_DataService_from_JSON(self, json_dict: dict[str, Any]) -> None: - # Traverse the serialized representation and set the attributes of the class - serialized_class = self.serialize() - for path in generate_paths_from_DataService_dict(json_dict): - value = get_nested_value_by_path_and_key(json_dict, path=path) - value_type = get_nested_value_by_path_and_key( - json_dict, path=path, key="type" - ) - class_value_type = get_nested_value_by_path_and_key( - serialized_class, path=path, key="type" - ) - if class_value_type == value_type: - # Split the path into parts - parts = path.split(".") - attr_name = parts[-1] - - self.update_DataService_attribute(parts[:-1], attr_name, value) - else: - logger.info( - f'Attribute type of "{path}" changed from "{value_type}" to ' - f'"{class_value_type}". Ignoring value from JSON file...' - ) - - def __setattr__(self, __name: str, __value: Any) -> None: - current_value = getattr(self, __name, None) - # parse ints into floats if current value is a float - if isinstance(current_value, float) and isinstance(__value, int): - __value = float(__value) - - super().__setattr__(__name, __value) - - if self.__dict__.get("_initialised") and not __name == "_initialised": - for callback in self._callbacks: - callback(__name, __value) - elif __name.startswith(f"_{self.__class__.__name__}__"): - logger.warning( - f"Warning: You should not set private but rather protected attributes! " - f"Use {__name.replace(f'_{self.__class__.__name__}__', '_')} instead " - f"of {__name.replace(f'_{self.__class__.__name__}__', '__')}." - ) - - def _rpyc_getattr(self, name: str) -> Any: - if name.startswith("_"): - # disallow special and private attributes - raise AttributeError("cannot access private/special names") - # allow all other attributes - return getattr(self, name) - - def _rpyc_setattr(self, name: str, value: Any) -> None: - if name.startswith("_"): - # disallow special and private attributes - raise AttributeError("cannot access private/special names") - - # check if the attribute has a setter method - attr = getattr(self, name, None) - if isinstance(attr, property) and attr.fset is None: - raise AttributeError(f"{name} attribute does not have a setter method") - - # allow all other attributes - setattr(self, name, value) - - def _register_callbacks(self) -> None: - self._register_list_change_callbacks(self, f"{self.__class__.__name__}") - self._register_DataService_instance_callbacks( - self, f"{self.__class__.__name__}" - ) - self._register_property_callbacks(self, f"{self.__class__.__name__}") - self._register_start_stop_task_callbacks(self, f"{self.__class__.__name__}") + def __init__(self, service: "DataService") -> None: + self.callbacks: set[Callable[[str, Any], None]] = set() + self.service = service def _register_list_change_callbacks( # noqa: C901 self, obj: "DataService", parent_path: str @@ -221,12 +96,12 @@ class DataService(rpyc.Service, TaskManager): # value at the time the lambda is defined, not when it is called. This # prevents attr_name from being overwritten in the next loop iteration. callback = ( - lambda index, value, attr_name=attr_name: self._emit_notification( + lambda index, value, attr_name=attr_name: self.service._emit_notification( parent_path=parent_path, name=f"{attr_name}[{index}]", value=value, ) - if self == self.__root__ + if self.service == self.service.__root__ else None ) @@ -285,7 +160,7 @@ class DataService(rpyc.Service, TaskManager): lambda name, value: obj._emit_notification( parent_path=parent_path, name=name, value=value ) - if self == obj.__root__ + if self.service == obj.__root__ and not name.startswith("_") # we are only interested in public attributes and not isinstance( getattr(type(obj), name, None), property @@ -293,7 +168,7 @@ class DataService(rpyc.Service, TaskManager): else None ) - obj._callbacks.add(callback) + obj._callback_manager.callbacks.add(callback) # Recursively register callbacks for all nested attributes of the object attrs = get_class_and_instance_attributes(obj) @@ -325,7 +200,7 @@ class DataService(rpyc.Service, TaskManager): # as the DataService is an attribute of self, change the root object # use the dictionary to not trigger callbacks on initialised objects - nested_attr.__dict__["__root__"] = self.__root__ + nested_attr.__dict__["__root__"] = self.service.__root__ new_path = f"{parent_path}.{attr_name}" self._register_DataService_instance_callbacks(nested_attr, new_path) @@ -356,7 +231,7 @@ class DataService(rpyc.Service, TaskManager): # changed, not reassigned) for item in obj_list: if isinstance(item, DataService): - item._callbacks.add(callback) + item._callback_manager.callbacks.add(callback) for attr_name in set(dir(item)) - set(dir(object)) - {"__root__"}: attr_value = getattr(item, attr_name) if isinstance(attr_value, (DataService, DataServiceList)): @@ -421,7 +296,7 @@ class DataService(rpyc.Service, TaskManager): name=dependent_attr, value=getattr(obj, dependent_attr), ) - if self == obj.__root__ + if self.service == obj.__root__ else None ) @@ -436,11 +311,188 @@ class DataService(rpyc.Service, TaskManager): name=dependent_attr, value=getattr(obj, dependent_attr), ) - if name == dep and self == obj.__root__ + if name == dep and self.service == obj.__root__ else None ) # Add to _callbacks - obj._callbacks.add(callback) + obj._callback_manager.callbacks.add(callback) + + def _register_start_stop_task_callbacks( + self, obj: "DataService", parent_path: str + ) -> None: + """ + This function registers callbacks for start and stop methods of async functions. + These callbacks are stored in the '_task_status_change_callbacks' attribute and + are called when the status of a task changes. + + Parameters: + ----------- + obj: DataService + The target object on which callbacks are to be registered. + parent_path: str + The access path for the parent object. This is used to construct the full + access path for the notifications. + """ + + # Create and register a callback for the object + # only emit the notification when the call was registered by the root object + callback: Callable[[str, dict[str, Any] | None], None] = ( + lambda name, status: obj._emit_notification( + parent_path=parent_path, name=name, value=status + ) + if self.service == obj.__root__ + and not name.startswith("_") # we are only interested in public attributes + else None + ) + + obj._task_status_change_callbacks.append(callback) + + # Recursively register callbacks for all nested attributes of the object + attrs: dict[str, Any] = get_class_and_instance_attributes(obj) + + for nested_attr_name, nested_attr in attrs.items(): + if isinstance(nested_attr, DataService): + self._register_start_stop_task_callbacks( + nested_attr, parent_path=f"{parent_path}.{nested_attr_name}" + ) + + def register_callbacks(self) -> None: + self._register_list_change_callbacks( + self.service, f"{self.service.__class__.__name__}" + ) + self._register_DataService_instance_callbacks( + self.service, f"{self.service.__class__.__name__}" + ) + self._register_property_callbacks( + self.service, f"{self.service.__class__.__name__}" + ) + self._register_start_stop_task_callbacks( + self.service, f"{self.service.__class__.__name__}" + ) + + +class DataService(rpyc.Service, TaskManager): + _notification_callbacks: list[Callable[[str, str, Any], Any]] = [] + """ + A list of callback functions that are executed when a change occurs in the + DataService instance. These functions are intended to handle or respond to these + changes in some way, such as emitting a socket.io message to the frontend. + + Each function in this list should be a callable that accepts three parameters: + + - parent_path (str): The path to the parent of the attribute that was changed. + - name (str): The name of the attribute that was changed. + - value (Any): The new value of the attribute. + + A callback function can be added to this list using the add_notification_callback + method. Whenever a change in the DataService instance occurs (or in its nested + DataService or DataServiceList instances), the _emit_notification method is invoked, + which in turn calls all the callback functions in _notification_callbacks with the + appropriate arguments. + + This implementation follows the observer pattern, with the DataService instance as + the "subject" and the callback functions as the "observers". + """ + + def __init__(self, filename: Optional[str] = None) -> None: + TaskManager.__init__(self) + self._callback_manager = CallbackManager(self) + self.__root__: "DataService" = self + """Keep track of the root object. This helps to filter the emission of + notifications. This overwrite the TaksManager's __root__ attribute.""" + + self._filename: Optional[str] = filename + + self._callback_manager.register_callbacks() + self.__check_instance_classes() + self._initialised = True + self._load_values_from_json() + + def _load_values_from_json(self) -> None: + if self._filename is not None: + # Check if the file specified by the filename exists + if os.path.exists(self._filename): + with open(self._filename, "r") as f: + # Load JSON data from file and update class attributes with these + # values + self.load_DataService_from_JSON(cast(dict[str, Any], json.load(f))) + + def write_to_file(self) -> None: + """ + Serialize the DataService instance and write it to a JSON file. + + Args: + filename (str): The name of the file to write to. + """ + if self._filename is not None: + with open(self._filename, "w") as f: + json.dump(self.serialize(), f, indent=4) + else: + logger.error( + f"Class {self.__class__.__name__} was not initialised with a filename. " + 'Skipping "write_to_file"...' + ) + + def load_DataService_from_JSON(self, json_dict: dict[str, Any]) -> None: + # Traverse the serialized representation and set the attributes of the class + serialized_class = self.serialize() + for path in generate_paths_from_DataService_dict(json_dict): + value = get_nested_value_by_path_and_key(json_dict, path=path) + value_type = get_nested_value_by_path_and_key( + json_dict, path=path, key="type" + ) + class_value_type = get_nested_value_by_path_and_key( + serialized_class, path=path, key="type" + ) + if class_value_type == value_type: + # Split the path into parts + parts = path.split(".") + attr_name = parts[-1] + + self.update_DataService_attribute(parts[:-1], attr_name, value) + else: + logger.info( + f'Attribute type of "{path}" changed from "{value_type}" to ' + f'"{class_value_type}". Ignoring value from JSON file...' + ) + + def __setattr__(self, __name: str, __value: Any) -> None: + current_value = getattr(self, __name, None) + # parse ints into floats if current value is a float + if isinstance(current_value, float) and isinstance(__value, int): + __value = float(__value) + + super().__setattr__(__name, __value) + + if self.__dict__.get("_initialised") and not __name == "_initialised": + for callback in self._callback_manager.callbacks: + callback(__name, __value) + elif __name.startswith(f"_{self.__class__.__name__}__"): + logger.warning( + f"Warning: You should not set private but rather protected attributes! " + f"Use {__name.replace(f'_{self.__class__.__name__}__', '_')} instead " + f"of {__name.replace(f'_{self.__class__.__name__}__', '__')}." + ) + + def _rpyc_getattr(self, name: str) -> Any: + if name.startswith("_"): + # disallow special and private attributes + raise AttributeError("cannot access private/special names") + # allow all other attributes + return getattr(self, name) + + def _rpyc_setattr(self, name: str, value: Any) -> None: + if name.startswith("_"): + # disallow special and private attributes + raise AttributeError("cannot access private/special names") + + # check if the attribute has a setter method + attr = getattr(self, name, None) + if isinstance(attr, property) and attr.fset is None: + raise AttributeError(f"{name} attribute does not have a setter method") + + # allow all other attributes + setattr(self, name, value) def __check_instance_classes(self) -> None: for attr_name, attr_value in get_class_and_instance_attributes(self).items(): diff --git a/src/pyDataInterface/data_service/task_manager.py b/src/pyDataInterface/data_service/task_manager.py index f79b180..5052450 100644 --- a/src/pyDataInterface/data_service/task_manager.py +++ b/src/pyDataInterface/data_service/task_manager.py @@ -95,45 +95,6 @@ class TaskManager(ABC): self._set_start_and_stop_for_async_methods() - def _register_start_stop_task_callbacks( - self, obj: "TaskManager", parent_path: str - ) -> None: - """ - This function registers callbacks for start and stop methods of async functions. - These callbacks are stored in the '_task_status_change_callbacks' attribute and - are called when the status of a task changes. - - Parameters: - ----------- - obj: DataService - The target object on which callbacks are to be registered. - parent_path: str - The access path for the parent object. This is used to construct the full - access path for the notifications. - """ - - # Create and register a callback for the object - # only emit the notification when the call was registered by the root object - callback: Callable[[str, dict[str, Any] | None], None] = ( - lambda name, status: obj._emit_notification( - parent_path=parent_path, name=name, value=status - ) - if self == obj.__root__ - and not name.startswith("_") # we are only interested in public attributes - else None - ) - - obj._task_status_change_callbacks.append(callback) - - # Recursively register callbacks for all nested attributes of the object - attrs: dict[str, Any] = get_class_and_instance_attributes(obj) - - for nested_attr_name, nested_attr in attrs.items(): - if isinstance(nested_attr, TaskManager): - self._register_start_stop_task_callbacks( - nested_attr, parent_path=f"{parent_path}.{nested_attr_name}" - ) - def _set_start_and_stop_for_async_methods(self) -> None: # noqa: C901 # inspect the methods of the class for name, method in inspect.getmembers(