diff --git a/src/pyDataInterface/data_service/data_service.py b/src/pyDataInterface/data_service/data_service.py index c81a0e4..8de3eba 100644 --- a/src/pyDataInterface/data_service/data_service.py +++ b/src/pyDataInterface/data_service/data_service.py @@ -49,14 +49,248 @@ class DataService(rpyc.Service): self, f"{self.__class__.__name__}" ) self._register_property_callbacks(self, f"{self.__class__.__name__}") - self._check_instance_classes() + self.__check_instance_classes() self._initialised = True - def _check_instance_classes(self) -> None: - for attr_name, attr_value in self.__get_class_and_instance_attributes().items(): - # every class defined by the user should inherit from DataService - if not attr_name.startswith("_DataService__"): - warn_if_instance_class_does_not_inherit_from_DataService(attr_value) + def __setattr__(self, __name: str, __value: Any) -> None: + super().__setattr__(__name, __value) + if self.__dict__.get("_initialised") and not __name == "_initialised": + for callback in self._callbacks: + callback(__name, __value) + elif __name.startswith(f"_{self.__class__.__name__}__"): + logger.warning( + f"Warning: You should not set private but rather protected attributes! " + f"Use {__name.replace(f'_{self.__class__.__name__}__', '_')} instead " + f"of {__name.replace(f'_{self.__class__.__name__}__', '__')}." + ) + + def _rpyc_getattr(self, name: str) -> Any: + if name.startswith("_"): + # disallow special and private attributes + raise AttributeError("cannot access private/special names") + # allow all other attributes + return getattr(self, name) + + def _rpyc_setattr(self, name: str, value: Any) -> None: + if name.startswith("_"): + # disallow special and private attributes + raise AttributeError("cannot access private/special names") + + # check if the attribute has a setter method + attr = getattr(self, name, None) + if isinstance(attr, property) and attr.fset is None: + raise AttributeError(f"{name} attribute does not have a setter method") + + # allow all other attributes + setattr(self, name, value) + + def _start_autostart_tasks(self) -> None: + if self._autostart_tasks is not None: + for service_name, args in self._autostart_tasks.items(): + start_method = getattr(self, f"start_{service_name}", None) + if start_method is not None and callable(start_method): + start_method(*args) + else: + logger.warning( + f"No start method found for service '{service_name}'" + ) + + def _start_loop(self) -> None: + asyncio.set_event_loop(self.__loop) + try: + self.__loop.run_forever() + finally: + # cancel all running tasks + for task in self.__tasks.values(): + self.__loop.call_soon_threadsafe(task.cancel) + self.__loop.call_soon_threadsafe(self.__loop.stop) + self.__thread.join() + + def _start_async_loop_in_thread(self) -> None: + # create a new event loop and run it in a separate thread + self.__loop = asyncio.new_event_loop() + self.__thread = threading.Thread(target=self._start_loop) + self.__thread.start() + + def _set_start_and_stop_for_async_methods(self) -> None: + # inspect the methods of the class + for name, method in inspect.getmembers( + self, predicate=inspect.iscoroutinefunction + ): + + def start_task(*args: Any, **kwargs: Any) -> None: + async def task(*args: Any, **kwargs: Any) -> None: + try: + await getattr(self, name)(*args, **kwargs) + except asyncio.CancelledError: + print(f"Task {name} was cancelled") + + self.__tasks[name] = asyncio.run_coroutine_threadsafe( + task(*args, **kwargs), self.__loop + ) + + def stop_task() -> None: + # cancel the task + task = self.__tasks.get(name) + if task is not None: + self.__loop.call_soon_threadsafe(task.cancel) + + # create start and stop methods for each coroutine + setattr(self, f"start_{name}", start_task) + setattr(self, f"stop_{name}", stop_task) + + def _register_list_change_callbacks( + self, obj: "DataService", parent_path: str + ) -> None: + """ + This method ensures that notifications are emitted whenever a list attribute of + a DataService instance changes. These notifications pertain solely to the list + item changes, not to changes in attributes of objects within the list. + + The method works by converting all list attributes (both at the class and + instance levels) into DataServiceList objects. Each DataServiceList is then + assigned a callback that is triggered whenever an item in the list is updated. + The callback emits a notification, but only if the DataService instance was the + root instance when the callback was registered. + + This method operates recursively, processing the input object and all nested + attributes that are instances of DataService. While navigating the structure, + it constructs a path for each attribute that traces back to the root. This path + is included in any emitted notifications to facilitate identification of the + source of a change. + + Parameters: + ----------- + obj: DataService + The target object to be processed. All list attributes (and those of its + nested DataService attributes) will be converted into DataServiceList + objects. + parent_path: str + The access path for the parent object. Used to construct the full access + path for the notifications. + """ + + # Convert all list attributes (both class and instance) to DataServiceList + attrs = obj.__get_class_and_instance_attributes() + + for attr_name, attr_value in attrs.items(): + if isinstance(attr_value, DataService): + new_path = f"{parent_path}.{attr_name}" + self._register_list_change_callbacks(attr_value, new_path) + elif isinstance(attr_value, list): + # Create callback for current attr_name + # Default arguments solve the late binding problem by capturing the + # value at the time the lambda is defined, not when it is called. This + # prevents attr_name from being overwritten in the next loop iteration. + callback = ( + lambda index, value, attr_name=attr_name: self._emit_notification( + parent_path=parent_path, + name=f"{attr_name}[{index}]", + value=value, + ) + if self == self.__root__ + else None + ) + + # Check if attr_value is already a DataServiceList or in the mapping + if isinstance(attr_value, DataServiceList): + attr_value.add_callback(callback) + continue + if id(attr_value) in self._list_mapping: + notifying_list = self._list_mapping[id(attr_value)] + notifying_list.add_callback(callback) + else: + notifying_list = DataServiceList(attr_value, callback=[callback]) + self._list_mapping[id(attr_value)] = notifying_list + + setattr(obj, attr_name, notifying_list) + + # recursively add callbacks to list attributes of DataService instances + for i, item in enumerate(attr_value): + if isinstance(item, DataService): + new_path = f"{parent_path}.{attr_name}[{i}]" + self._register_list_change_callbacks(item, new_path) + + def _register_DataService_instance_callbacks( + self, obj: "DataService", parent_path: str + ) -> None: + """ + This function is a key part of the observer pattern implemented by the + DataService class. + Its purpose is to allow the system to automatically send out notifications + whenever an attribute of a DataService instance is updated, which is especially + useful when the DataService instance is part of a nested structure. + + It works by recursively registering callbacks for a given DataService instance + and all of its nested attributes. Each callback is responsible for emitting a + notification when the attribute it is attached to is modified. + + This function ensures that only the root DataService instance (the one directly + exposed to the user or another system via rpyc) emits notifications. + + Each notification contains a 'parent_path' that traces the attribute's location + within the nested DataService structure, starting from the root. This makes it + easier for observers to determine exactly where a change has occurred. + + Parameters: + ----------- + obj: DataService + The target object on which callbacks are to be registered. + parent_path: str + The access path for the parent object. This is used to construct the full + access path for the notifications. + """ + + # Create and register a callback for the object + # only emit the notification when the call was registered by the root object + callback: Callable[[str, Any], None] = ( + lambda name, value: obj._emit_notification( + parent_path=parent_path, name=name, value=value + ) + if self == obj.__root__ + and not name.startswith("_") # we are only interested in public attributes + and not isinstance( + getattr(type(obj), name, None), property + ) # exlude proerty notifications -> those are handled in separate callbacks + else None + ) + + obj._callbacks.add(callback) + + # Recursively register callbacks for all nested attributes of the object + attrs = obj.__get_class_and_instance_attributes() + + for nested_attr_name, nested_attr in attrs.items(): + if isinstance(nested_attr, DataServiceList): + self._register_list_callbacks( + nested_attr, parent_path, nested_attr_name + ) + elif isinstance(nested_attr, DataService): + self._register_service_callbacks( + nested_attr, parent_path, nested_attr_name + ) + + def _register_list_callbacks( + self, nested_attr: list[Any], parent_path: str, attr_name: str + ) -> None: + """Handles registration of callbacks for list attributes""" + for i, list_item in enumerate(nested_attr): + if isinstance(list_item, DataService): + self._register_service_callbacks( + list_item, parent_path, f"{attr_name}[{i}]" + ) + + def _register_service_callbacks( + self, nested_attr: "DataService", parent_path: str, attr_name: str + ) -> None: + """Handles registration of callbacks for DataService attributes""" + + # as the DataService is an attribute of self, change the root object + # use the dictionary to not trigger callbacks on initialised objects + nested_attr.__dict__["__root__"] = self.__root__ + + new_path = f"{parent_path}.{attr_name}" + self._register_DataService_instance_callbacks(nested_attr, new_path) def __register_recursive_parameter_callback( self, @@ -170,249 +404,6 @@ class DataService(rpyc.Service): # Add to _callbacks obj._callbacks.add(callback) - def _register_list_change_callbacks( - self, obj: "DataService", parent_path: str - ) -> None: - """ - This method ensures that notifications are emitted whenever a list attribute of - a DataService instance changes. These notifications pertain solely to the list - item changes, not to changes in attributes of objects within the list. - - The method works by converting all list attributes (both at the class and - instance levels) into DataServiceList objects. Each DataServiceList is then - assigned a callback that is triggered whenever an item in the list is updated. - The callback emits a notification, but only if the DataService instance was the - root instance when the callback was registered. - - This method operates recursively, processing the input object and all nested - attributes that are instances of DataService. While navigating the structure, - it constructs a path for each attribute that traces back to the root. This path - is included in any emitted notifications to facilitate identification of the - source of a change. - - Parameters: - ----------- - obj: DataService - The target object to be processed. All list attributes (and those of its - nested DataService attributes) will be converted into DataServiceList - objects. - parent_path: str - The access path for the parent object. Used to construct the full access - path for the notifications. - """ - - # Convert all list attributes (both class and instance) to DataServiceList - attrs = obj.__get_class_and_instance_attributes() - - for attr_name, attr_value in attrs.items(): - if isinstance(attr_value, DataService): - new_path = f"{parent_path}.{attr_name}" - self._register_list_change_callbacks(attr_value, new_path) - elif isinstance(attr_value, list): - # Create callback for current attr_name - # Default arguments solve the late binding problem by capturing the - # value at the time the lambda is defined, not when it is called. This - # prevents attr_name from being overwritten in the next loop iteration. - callback = ( - lambda index, value, attr_name=attr_name: self._emit_notification( - parent_path=parent_path, - name=f"{attr_name}[{index}]", - value=value, - ) - if self == self.__root__ - else None - ) - - # Check if attr_value is already a DataServiceList or in the mapping - if isinstance(attr_value, DataServiceList): - attr_value.add_callback(callback) - continue - if id(attr_value) in self._list_mapping: - notifying_list = self._list_mapping[id(attr_value)] - notifying_list.add_callback(callback) - else: - notifying_list = DataServiceList(attr_value, callback=[callback]) - self._list_mapping[id(attr_value)] = notifying_list - - setattr(obj, attr_name, notifying_list) - - # recursively add callbacks to list attributes of DataService instances - for i, item in enumerate(attr_value): - if isinstance(item, DataService): - new_path = f"{parent_path}.{attr_name}[{i}]" - self._register_list_change_callbacks(item, new_path) - - def _start_autostart_tasks(self) -> None: - if self._autostart_tasks is not None: - for service_name, args in self._autostart_tasks.items(): - start_method = getattr(self, f"start_{service_name}", None) - if start_method is not None and callable(start_method): - start_method(*args) - else: - logger.warning( - f"No start method found for service '{service_name}'" - ) - - def _start_async_loop_in_thread(self) -> None: - # create a new event loop and run it in a separate thread - self.__loop = asyncio.new_event_loop() - self.__thread = threading.Thread(target=self._start_loop) - self.__thread.start() - - def _set_start_and_stop_for_async_methods(self) -> None: - # inspect the methods of the class - for name, method in inspect.getmembers( - self, predicate=inspect.iscoroutinefunction - ): - - def start_task(*args: Any, **kwargs: Any) -> None: - async def task(*args: Any, **kwargs: Any) -> None: - try: - await getattr(self, name)(*args, **kwargs) - except asyncio.CancelledError: - print(f"Task {name} was cancelled") - - self.__tasks[name] = asyncio.run_coroutine_threadsafe( - task(*args, **kwargs), self.__loop - ) - - def stop_task() -> None: - # cancel the task - task = self.__tasks.get(name) - if task is not None: - self.__loop.call_soon_threadsafe(task.cancel) - - # create start and stop methods for each coroutine - setattr(self, f"start_{name}", start_task) - setattr(self, f"stop_{name}", stop_task) - - def _register_DataService_instance_callbacks( - self, obj: "DataService", parent_path: str - ) -> None: - """ - This function is a key part of the observer pattern implemented by the - DataService class. - Its purpose is to allow the system to automatically send out notifications - whenever an attribute of a DataService instance is updated, which is especially - useful when the DataService instance is part of a nested structure. - - It works by recursively registering callbacks for a given DataService instance - and all of its nested attributes. Each callback is responsible for emitting a - notification when the attribute it is attached to is modified. - - This function ensures that only the root DataService instance (the one directly - exposed to the user or another system via rpyc) emits notifications. - - Each notification contains a 'parent_path' that traces the attribute's location - within the nested DataService structure, starting from the root. This makes it - easier for observers to determine exactly where a change has occurred. - - Parameters: - ----------- - obj: DataService - The target object on which callbacks are to be registered. - parent_path: str - The access path for the parent object. This is used to construct the full - access path for the notifications. - """ - - # Create and register a callback for the object - # only emit the notification when the call was registered by the root object - callback: Callable[[str, Any], None] = ( - lambda name, value: obj._emit_notification( - parent_path=parent_path, name=name, value=value - ) - if self == obj.__root__ - and not name.startswith("_") # we are only interested in public attributes - and not isinstance( - getattr(type(obj), name, None), property - ) # exlude proerty notifications -> those are handled in separate callbacks - else None - ) - - obj._callbacks.add(callback) - - # Recursively register callbacks for all nested attributes of the object - attrs = obj.__get_class_and_instance_attributes() - - for nested_attr_name, nested_attr in attrs.items(): - if isinstance(nested_attr, DataServiceList): - self._register_list_callbacks( - nested_attr, parent_path, nested_attr_name - ) - elif isinstance(nested_attr, DataService): - self._register_service_callbacks( - nested_attr, parent_path, nested_attr_name - ) - - def _register_list_callbacks( - self, nested_attr: list[Any], parent_path: str, attr_name: str - ) -> None: - """Handles registration of callbacks for list attributes""" - for i, list_item in enumerate(nested_attr): - if isinstance(list_item, DataService): - self._register_service_callbacks( - list_item, parent_path, f"{attr_name}[{i}]" - ) - - def _register_service_callbacks( - self, nested_attr: "DataService", parent_path: str, attr_name: str - ) -> None: - """Handles registration of callbacks for DataService attributes""" - - # as the DataService is an attribute of self, change the root object - # use the dictionary to not trigger callbacks on initialised objects - nested_attr.__dict__["__root__"] = self.__root__ - - new_path = f"{parent_path}.{attr_name}" - self._register_DataService_instance_callbacks(nested_attr, new_path) - - def _start_loop(self) -> None: - asyncio.set_event_loop(self.__loop) - try: - self.__loop.run_forever() - finally: - # cancel all running tasks - for task in self.__tasks.values(): - self.__loop.call_soon_threadsafe(task.cancel) - self.__loop.call_soon_threadsafe(self.__loop.stop) - self.__thread.join() - - def __setattr__(self, __name: str, __value: Any) -> None: - super().__setattr__(__name, __value) - if self.__dict__.get("_initialised") and not __name == "_initialised": - for callback in self._callbacks: - callback(__name, __value) - elif __name.startswith(f"_{self.__class__.__name__}__"): - logger.warning( - f"Warning: You should not set private but rather protected attributes! " - f"Use {__name.replace(f'_{self.__class__.__name__}__', '_')} instead " - f"of {__name.replace(f'_{self.__class__.__name__}__', '__')}." - ) - - def _emit_notification(self, parent_path: str, name: str, value: Any) -> None: - logger.debug(f"{parent_path}.{name} changed to {value}!") - - def _rpyc_getattr(self, name: str) -> Any: - if name.startswith("_"): - # disallow special and private attributes - raise AttributeError("cannot access private/special names") - # allow all other attributes - return getattr(self, name) - - def _rpyc_setattr(self, name: str, value: Any) -> None: - if name.startswith("_"): - # disallow special and private attributes - raise AttributeError("cannot access private/special names") - - # check if the attribute has a setter method - attr = getattr(self, name, None) - if isinstance(attr, property) and attr.fset is None: - raise AttributeError(f"{name} attribute does not have a setter method") - - # allow all other attributes - setattr(self, name, value) - def __get_class_and_instance_attributes(self) -> dict[str, Any]: """Dictionary containing all attributes (both instance and class level) of a given object. @@ -427,6 +418,15 @@ class DataService(rpyc.Service): attrs.pop("__root__") return attrs + def __check_instance_classes(self) -> None: + for attr_name, attr_value in self.__get_class_and_instance_attributes().items(): + # every class defined by the user should inherit from DataService + if not attr_name.startswith("_DataService__"): + warn_if_instance_class_does_not_inherit_from_DataService(attr_value) + + def _emit_notification(self, parent_path: str, name: str, value: Any) -> None: + logger.debug(f"{parent_path}.{name} changed to {value}!") + def serialize(self, prefix: str = "") -> dict[str, dict[str, Any]]: """ Serializes the instance into a dictionary, preserving the structure of the