Reordering DataService methods

This commit is contained in:
Mose Müller 2023-08-02 12:06:19 +02:00
parent a09ec145ca
commit 387c258e95

View File

@ -49,14 +49,248 @@ class DataService(rpyc.Service):
self, f"{self.__class__.__name__}"
)
self._register_property_callbacks(self, f"{self.__class__.__name__}")
self._check_instance_classes()
self.__check_instance_classes()
self._initialised = True
def _check_instance_classes(self) -> None:
for attr_name, attr_value in self.__get_class_and_instance_attributes().items():
# every class defined by the user should inherit from DataService
if not attr_name.startswith("_DataService__"):
warn_if_instance_class_does_not_inherit_from_DataService(attr_value)
def __setattr__(self, __name: str, __value: Any) -> None:
super().__setattr__(__name, __value)
if self.__dict__.get("_initialised") and not __name == "_initialised":
for callback in self._callbacks:
callback(__name, __value)
elif __name.startswith(f"_{self.__class__.__name__}__"):
logger.warning(
f"Warning: You should not set private but rather protected attributes! "
f"Use {__name.replace(f'_{self.__class__.__name__}__', '_')} instead "
f"of {__name.replace(f'_{self.__class__.__name__}__', '__')}."
)
def _rpyc_getattr(self, name: str) -> Any:
if name.startswith("_"):
# disallow special and private attributes
raise AttributeError("cannot access private/special names")
# allow all other attributes
return getattr(self, name)
def _rpyc_setattr(self, name: str, value: Any) -> None:
if name.startswith("_"):
# disallow special and private attributes
raise AttributeError("cannot access private/special names")
# check if the attribute has a setter method
attr = getattr(self, name, None)
if isinstance(attr, property) and attr.fset is None:
raise AttributeError(f"{name} attribute does not have a setter method")
# allow all other attributes
setattr(self, name, value)
def _start_autostart_tasks(self) -> None:
if self._autostart_tasks is not None:
for service_name, args in self._autostart_tasks.items():
start_method = getattr(self, f"start_{service_name}", None)
if start_method is not None and callable(start_method):
start_method(*args)
else:
logger.warning(
f"No start method found for service '{service_name}'"
)
def _start_loop(self) -> None:
asyncio.set_event_loop(self.__loop)
try:
self.__loop.run_forever()
finally:
# cancel all running tasks
for task in self.__tasks.values():
self.__loop.call_soon_threadsafe(task.cancel)
self.__loop.call_soon_threadsafe(self.__loop.stop)
self.__thread.join()
def _start_async_loop_in_thread(self) -> None:
# create a new event loop and run it in a separate thread
self.__loop = asyncio.new_event_loop()
self.__thread = threading.Thread(target=self._start_loop)
self.__thread.start()
def _set_start_and_stop_for_async_methods(self) -> None:
# inspect the methods of the class
for name, method in inspect.getmembers(
self, predicate=inspect.iscoroutinefunction
):
def start_task(*args: Any, **kwargs: Any) -> None:
async def task(*args: Any, **kwargs: Any) -> None:
try:
await getattr(self, name)(*args, **kwargs)
except asyncio.CancelledError:
print(f"Task {name} was cancelled")
self.__tasks[name] = asyncio.run_coroutine_threadsafe(
task(*args, **kwargs), self.__loop
)
def stop_task() -> None:
# cancel the task
task = self.__tasks.get(name)
if task is not None:
self.__loop.call_soon_threadsafe(task.cancel)
# create start and stop methods for each coroutine
setattr(self, f"start_{name}", start_task)
setattr(self, f"stop_{name}", stop_task)
def _register_list_change_callbacks(
self, obj: "DataService", parent_path: str
) -> None:
"""
This method ensures that notifications are emitted whenever a list attribute of
a DataService instance changes. These notifications pertain solely to the list
item changes, not to changes in attributes of objects within the list.
The method works by converting all list attributes (both at the class and
instance levels) into DataServiceList objects. Each DataServiceList is then
assigned a callback that is triggered whenever an item in the list is updated.
The callback emits a notification, but only if the DataService instance was the
root instance when the callback was registered.
This method operates recursively, processing the input object and all nested
attributes that are instances of DataService. While navigating the structure,
it constructs a path for each attribute that traces back to the root. This path
is included in any emitted notifications to facilitate identification of the
source of a change.
Parameters:
-----------
obj: DataService
The target object to be processed. All list attributes (and those of its
nested DataService attributes) will be converted into DataServiceList
objects.
parent_path: str
The access path for the parent object. Used to construct the full access
path for the notifications.
"""
# Convert all list attributes (both class and instance) to DataServiceList
attrs = obj.__get_class_and_instance_attributes()
for attr_name, attr_value in attrs.items():
if isinstance(attr_value, DataService):
new_path = f"{parent_path}.{attr_name}"
self._register_list_change_callbacks(attr_value, new_path)
elif isinstance(attr_value, list):
# Create callback for current attr_name
# Default arguments solve the late binding problem by capturing the
# value at the time the lambda is defined, not when it is called. This
# prevents attr_name from being overwritten in the next loop iteration.
callback = (
lambda index, value, attr_name=attr_name: self._emit_notification(
parent_path=parent_path,
name=f"{attr_name}[{index}]",
value=value,
)
if self == self.__root__
else None
)
# Check if attr_value is already a DataServiceList or in the mapping
if isinstance(attr_value, DataServiceList):
attr_value.add_callback(callback)
continue
if id(attr_value) in self._list_mapping:
notifying_list = self._list_mapping[id(attr_value)]
notifying_list.add_callback(callback)
else:
notifying_list = DataServiceList(attr_value, callback=[callback])
self._list_mapping[id(attr_value)] = notifying_list
setattr(obj, attr_name, notifying_list)
# recursively add callbacks to list attributes of DataService instances
for i, item in enumerate(attr_value):
if isinstance(item, DataService):
new_path = f"{parent_path}.{attr_name}[{i}]"
self._register_list_change_callbacks(item, new_path)
def _register_DataService_instance_callbacks(
self, obj: "DataService", parent_path: str
) -> None:
"""
This function is a key part of the observer pattern implemented by the
DataService class.
Its purpose is to allow the system to automatically send out notifications
whenever an attribute of a DataService instance is updated, which is especially
useful when the DataService instance is part of a nested structure.
It works by recursively registering callbacks for a given DataService instance
and all of its nested attributes. Each callback is responsible for emitting a
notification when the attribute it is attached to is modified.
This function ensures that only the root DataService instance (the one directly
exposed to the user or another system via rpyc) emits notifications.
Each notification contains a 'parent_path' that traces the attribute's location
within the nested DataService structure, starting from the root. This makes it
easier for observers to determine exactly where a change has occurred.
Parameters:
-----------
obj: DataService
The target object on which callbacks are to be registered.
parent_path: str
The access path for the parent object. This is used to construct the full
access path for the notifications.
"""
# Create and register a callback for the object
# only emit the notification when the call was registered by the root object
callback: Callable[[str, Any], None] = (
lambda name, value: obj._emit_notification(
parent_path=parent_path, name=name, value=value
)
if self == obj.__root__
and not name.startswith("_") # we are only interested in public attributes
and not isinstance(
getattr(type(obj), name, None), property
) # exlude proerty notifications -> those are handled in separate callbacks
else None
)
obj._callbacks.add(callback)
# Recursively register callbacks for all nested attributes of the object
attrs = obj.__get_class_and_instance_attributes()
for nested_attr_name, nested_attr in attrs.items():
if isinstance(nested_attr, DataServiceList):
self._register_list_callbacks(
nested_attr, parent_path, nested_attr_name
)
elif isinstance(nested_attr, DataService):
self._register_service_callbacks(
nested_attr, parent_path, nested_attr_name
)
def _register_list_callbacks(
self, nested_attr: list[Any], parent_path: str, attr_name: str
) -> None:
"""Handles registration of callbacks for list attributes"""
for i, list_item in enumerate(nested_attr):
if isinstance(list_item, DataService):
self._register_service_callbacks(
list_item, parent_path, f"{attr_name}[{i}]"
)
def _register_service_callbacks(
self, nested_attr: "DataService", parent_path: str, attr_name: str
) -> None:
"""Handles registration of callbacks for DataService attributes"""
# as the DataService is an attribute of self, change the root object
# use the dictionary to not trigger callbacks on initialised objects
nested_attr.__dict__["__root__"] = self.__root__
new_path = f"{parent_path}.{attr_name}"
self._register_DataService_instance_callbacks(nested_attr, new_path)
def __register_recursive_parameter_callback(
self,
@ -170,249 +404,6 @@ class DataService(rpyc.Service):
# Add to _callbacks
obj._callbacks.add(callback)
def _register_list_change_callbacks(
self, obj: "DataService", parent_path: str
) -> None:
"""
This method ensures that notifications are emitted whenever a list attribute of
a DataService instance changes. These notifications pertain solely to the list
item changes, not to changes in attributes of objects within the list.
The method works by converting all list attributes (both at the class and
instance levels) into DataServiceList objects. Each DataServiceList is then
assigned a callback that is triggered whenever an item in the list is updated.
The callback emits a notification, but only if the DataService instance was the
root instance when the callback was registered.
This method operates recursively, processing the input object and all nested
attributes that are instances of DataService. While navigating the structure,
it constructs a path for each attribute that traces back to the root. This path
is included in any emitted notifications to facilitate identification of the
source of a change.
Parameters:
-----------
obj: DataService
The target object to be processed. All list attributes (and those of its
nested DataService attributes) will be converted into DataServiceList
objects.
parent_path: str
The access path for the parent object. Used to construct the full access
path for the notifications.
"""
# Convert all list attributes (both class and instance) to DataServiceList
attrs = obj.__get_class_and_instance_attributes()
for attr_name, attr_value in attrs.items():
if isinstance(attr_value, DataService):
new_path = f"{parent_path}.{attr_name}"
self._register_list_change_callbacks(attr_value, new_path)
elif isinstance(attr_value, list):
# Create callback for current attr_name
# Default arguments solve the late binding problem by capturing the
# value at the time the lambda is defined, not when it is called. This
# prevents attr_name from being overwritten in the next loop iteration.
callback = (
lambda index, value, attr_name=attr_name: self._emit_notification(
parent_path=parent_path,
name=f"{attr_name}[{index}]",
value=value,
)
if self == self.__root__
else None
)
# Check if attr_value is already a DataServiceList or in the mapping
if isinstance(attr_value, DataServiceList):
attr_value.add_callback(callback)
continue
if id(attr_value) in self._list_mapping:
notifying_list = self._list_mapping[id(attr_value)]
notifying_list.add_callback(callback)
else:
notifying_list = DataServiceList(attr_value, callback=[callback])
self._list_mapping[id(attr_value)] = notifying_list
setattr(obj, attr_name, notifying_list)
# recursively add callbacks to list attributes of DataService instances
for i, item in enumerate(attr_value):
if isinstance(item, DataService):
new_path = f"{parent_path}.{attr_name}[{i}]"
self._register_list_change_callbacks(item, new_path)
def _start_autostart_tasks(self) -> None:
if self._autostart_tasks is not None:
for service_name, args in self._autostart_tasks.items():
start_method = getattr(self, f"start_{service_name}", None)
if start_method is not None and callable(start_method):
start_method(*args)
else:
logger.warning(
f"No start method found for service '{service_name}'"
)
def _start_async_loop_in_thread(self) -> None:
# create a new event loop and run it in a separate thread
self.__loop = asyncio.new_event_loop()
self.__thread = threading.Thread(target=self._start_loop)
self.__thread.start()
def _set_start_and_stop_for_async_methods(self) -> None:
# inspect the methods of the class
for name, method in inspect.getmembers(
self, predicate=inspect.iscoroutinefunction
):
def start_task(*args: Any, **kwargs: Any) -> None:
async def task(*args: Any, **kwargs: Any) -> None:
try:
await getattr(self, name)(*args, **kwargs)
except asyncio.CancelledError:
print(f"Task {name} was cancelled")
self.__tasks[name] = asyncio.run_coroutine_threadsafe(
task(*args, **kwargs), self.__loop
)
def stop_task() -> None:
# cancel the task
task = self.__tasks.get(name)
if task is not None:
self.__loop.call_soon_threadsafe(task.cancel)
# create start and stop methods for each coroutine
setattr(self, f"start_{name}", start_task)
setattr(self, f"stop_{name}", stop_task)
def _register_DataService_instance_callbacks(
self, obj: "DataService", parent_path: str
) -> None:
"""
This function is a key part of the observer pattern implemented by the
DataService class.
Its purpose is to allow the system to automatically send out notifications
whenever an attribute of a DataService instance is updated, which is especially
useful when the DataService instance is part of a nested structure.
It works by recursively registering callbacks for a given DataService instance
and all of its nested attributes. Each callback is responsible for emitting a
notification when the attribute it is attached to is modified.
This function ensures that only the root DataService instance (the one directly
exposed to the user or another system via rpyc) emits notifications.
Each notification contains a 'parent_path' that traces the attribute's location
within the nested DataService structure, starting from the root. This makes it
easier for observers to determine exactly where a change has occurred.
Parameters:
-----------
obj: DataService
The target object on which callbacks are to be registered.
parent_path: str
The access path for the parent object. This is used to construct the full
access path for the notifications.
"""
# Create and register a callback for the object
# only emit the notification when the call was registered by the root object
callback: Callable[[str, Any], None] = (
lambda name, value: obj._emit_notification(
parent_path=parent_path, name=name, value=value
)
if self == obj.__root__
and not name.startswith("_") # we are only interested in public attributes
and not isinstance(
getattr(type(obj), name, None), property
) # exlude proerty notifications -> those are handled in separate callbacks
else None
)
obj._callbacks.add(callback)
# Recursively register callbacks for all nested attributes of the object
attrs = obj.__get_class_and_instance_attributes()
for nested_attr_name, nested_attr in attrs.items():
if isinstance(nested_attr, DataServiceList):
self._register_list_callbacks(
nested_attr, parent_path, nested_attr_name
)
elif isinstance(nested_attr, DataService):
self._register_service_callbacks(
nested_attr, parent_path, nested_attr_name
)
def _register_list_callbacks(
self, nested_attr: list[Any], parent_path: str, attr_name: str
) -> None:
"""Handles registration of callbacks for list attributes"""
for i, list_item in enumerate(nested_attr):
if isinstance(list_item, DataService):
self._register_service_callbacks(
list_item, parent_path, f"{attr_name}[{i}]"
)
def _register_service_callbacks(
self, nested_attr: "DataService", parent_path: str, attr_name: str
) -> None:
"""Handles registration of callbacks for DataService attributes"""
# as the DataService is an attribute of self, change the root object
# use the dictionary to not trigger callbacks on initialised objects
nested_attr.__dict__["__root__"] = self.__root__
new_path = f"{parent_path}.{attr_name}"
self._register_DataService_instance_callbacks(nested_attr, new_path)
def _start_loop(self) -> None:
asyncio.set_event_loop(self.__loop)
try:
self.__loop.run_forever()
finally:
# cancel all running tasks
for task in self.__tasks.values():
self.__loop.call_soon_threadsafe(task.cancel)
self.__loop.call_soon_threadsafe(self.__loop.stop)
self.__thread.join()
def __setattr__(self, __name: str, __value: Any) -> None:
super().__setattr__(__name, __value)
if self.__dict__.get("_initialised") and not __name == "_initialised":
for callback in self._callbacks:
callback(__name, __value)
elif __name.startswith(f"_{self.__class__.__name__}__"):
logger.warning(
f"Warning: You should not set private but rather protected attributes! "
f"Use {__name.replace(f'_{self.__class__.__name__}__', '_')} instead "
f"of {__name.replace(f'_{self.__class__.__name__}__', '__')}."
)
def _emit_notification(self, parent_path: str, name: str, value: Any) -> None:
logger.debug(f"{parent_path}.{name} changed to {value}!")
def _rpyc_getattr(self, name: str) -> Any:
if name.startswith("_"):
# disallow special and private attributes
raise AttributeError("cannot access private/special names")
# allow all other attributes
return getattr(self, name)
def _rpyc_setattr(self, name: str, value: Any) -> None:
if name.startswith("_"):
# disallow special and private attributes
raise AttributeError("cannot access private/special names")
# check if the attribute has a setter method
attr = getattr(self, name, None)
if isinstance(attr, property) and attr.fset is None:
raise AttributeError(f"{name} attribute does not have a setter method")
# allow all other attributes
setattr(self, name, value)
def __get_class_and_instance_attributes(self) -> dict[str, Any]:
"""Dictionary containing all attributes (both instance and class level) of a
given object.
@ -427,6 +418,15 @@ class DataService(rpyc.Service):
attrs.pop("__root__")
return attrs
def __check_instance_classes(self) -> None:
for attr_name, attr_value in self.__get_class_and_instance_attributes().items():
# every class defined by the user should inherit from DataService
if not attr_name.startswith("_DataService__"):
warn_if_instance_class_does_not_inherit_from_DataService(attr_value)
def _emit_notification(self, parent_path: str, name: str, value: Any) -> None:
logger.debug(f"{parent_path}.{name} changed to {value}!")
def serialize(self, prefix: str = "") -> dict[str, dict[str, Any]]:
"""
Serializes the instance into a dictionary, preserving the structure of the