updates client proxy

- will now be changed in place (instead of being overwritten on reconnect, which was the only way
of adding or removing property getters / setters)
- replaces getters/setters and methods of proxy with __setattr__ and __getattribute__ functionality
- replaces ProxyClassFactory with ProxyClass and ProxyLoader. The latter updates the former on
reconnect
- client does not need to be a DataService anymore. It only establishes the connection and holds
the reference to the proxy class.
This commit is contained in:
Mose Müller 2024-04-04 11:31:14 +02:00
parent 439665177d
commit 5511ebc808
3 changed files with 327 additions and 258 deletions

View File

@ -1,16 +1,13 @@
import asyncio
import logging
import threading
from typing import Any, TypedDict, cast
from typing import TypedDict, cast
import socketio # type: ignore
import pydase.data_service
from pydase.client.proxy_class_factory import ProxyClassFactory, ProxyConnection
from pydase.utils.helpers import is_property_attribute
from pydase.client.proxy_loader import ProxyClass, ProxyLoader
from pydase.utils.serialization.deserializer import loads
from pydase.utils.serialization.serializer import SerializedObject, dump
from pydase.utils.serialization.types import SerializedDataService
from pydase.utils.serialization.types import SerializedDataService, SerializedObject
logger = logging.getLogger(__name__)
@ -24,19 +21,22 @@ class NotifyDict(TypedDict):
data: NotifyDataDict
class Client(pydase.data_service.DataService):
def __init__(self, hostname: str, port: int):
super().__init__()
def asyncio_loop_thread(loop: asyncio.AbstractEventLoop) -> None:
asyncio.set_event_loop(loop)
loop.run_forever()
class Client:
def __init__(self, hostname: str = "localhost", port: int = 8001):
self._hostname = hostname
self._port = port
self._sio = socketio.AsyncClient()
self._loop = asyncio.new_event_loop()
self._proxy_class_factory = ProxyClassFactory(self._sio, self._loop)
self.proxy = ProxyClass(sio_client=self._sio, loop=self._loop)
self._thread = threading.Thread(
target=self.__asyncio_loop_thread, args=(self._loop,), daemon=True
target=asyncio_loop_thread, args=(self._loop,), daemon=True
)
self._thread.start()
self.proxy: ProxyConnection
asyncio.run_coroutine_threadsafe(self._connect(), self._loop).result()
async def _connect(self) -> None:
@ -48,60 +48,20 @@ class Client(pydase.data_service.DataService):
transports=["websocket"],
)
def __asyncio_loop_thread(self, loop: asyncio.AbstractEventLoop) -> None:
asyncio.set_event_loop(loop)
loop.run_forever()
async def _setup_events(self) -> None:
@self._sio.event
async def connect() -> None:
logger.debug("Connected to '%s:%s' ...", self._hostname, self._port)
serialized_data = cast(
serialized_object = cast(
SerializedDataService, await self._sio.call("service_serialization")
)
if not hasattr(self, "proxy"):
self.proxy = self._proxy_class_factory.create_proxy(serialized_data)
else:
# need to change to avoid overwriting the proxy class
serialized_data["type"] = "DeviceConnection"
super(pydase.DataService, self.proxy)._notify_changed(
"", loads(serialized_data)
)
self.proxy._connected = True
@self._sio.event
async def disconnect() -> None:
logger.debug("Disconnected")
self.proxy._connected = False
ProxyLoader.update_data_service_proxy(
self.proxy, serialized_object=serialized_object
)
@self._sio.event
async def notify(data: NotifyDict) -> None:
# Notify the DataServiceObserver directly, not going through
# self._notify_changed as this would trigger the "update_value" event
super(pydase.DataService, self.proxy)._notify_changed(
self.proxy._notify_changed(
data["data"]["full_access_path"],
loads(data["data"]["value"]),
)
async def _disconnect(self) -> None:
await self._sio.disconnect()
def _notify_changed(self, changed_attribute: str, value: Any) -> None:
if (
changed_attribute.startswith("proxy.")
# do not emit update event for properties which emit that event themselves
and not is_property_attribute(self, changed_attribute)
and all(part[0] != "_" for part in changed_attribute.split("."))
):
async def update_value() -> None:
await self._sio.call(
"update_value",
{
"access_path": changed_attribute[6:],
"value": dump(value),
},
)
asyncio.run_coroutine_threadsafe(update_value(), loop=self._loop)
return super()._notify_changed(changed_attribute, value)

View File

@ -1,201 +0,0 @@
import asyncio
import logging
from collections.abc import Callable
from copy import copy
from typing import Any, cast
import socketio # type: ignore
import pydase.components
import pydase.data_service
import pydase.observer_pattern.observer
from pydase.utils.helpers import is_property_attribute
from pydase.utils.serialization.deserializer import Deserializer, loads
from pydase.utils.serialization.serializer import (
SerializedMethod,
SerializedObject,
dump,
)
logger = logging.getLogger(__name__)
class ProxyClassMixin:
_sio: socketio.AsyncClient
_loop: asyncio.AbstractEventLoop
def __setattr__(self, key: str, value: Any) -> None:
# prevent overriding of proxy attributes
if (
not is_property_attribute(self, key)
and hasattr(self, key)
and isinstance(getattr(self, key), ProxyBaseClass)
):
raise AttributeError(f"{key} is read-only and cannot be overridden.")
super().__setattr__(key, value)
class ProxyBaseClass(pydase.data_service.DataService, ProxyClassMixin):
pass
class ProxyConnection(pydase.components.DeviceConnection, ProxyClassMixin):
def __init__(self) -> None:
super().__init__()
self._reconnection_wait_time = 1.0
class ProxyClassFactory:
def __init__(
self, sio_client: socketio.AsyncClient, loop: asyncio.AbstractEventLoop
) -> None:
self.sio_client = sio_client
self.loop = loop
def create_proxy(self, data: SerializedObject) -> ProxyConnection:
proxy_class = self._deserialize_component_type(data, ProxyConnection)
proxy_class._sio = self.sio_client
proxy_class._loop = self.loop
proxy_class._initialised = True
return proxy_class # type: ignore
def _deserialize(self, serialized_object: SerializedObject) -> Any:
type_handler: dict[str | None, None | Callable[..., Any]] = {
None: None,
"int": self._create_attr_property,
"float": self._create_attr_property,
"bool": self._create_attr_property,
"str": self._create_attr_property,
"NoneType": self._create_attr_property,
"Quantity": self._create_attr_property,
"Enum": self._create_attr_property,
"ColouredEnum": self._create_attr_property,
"list": loads,
"dict": loads,
"Exception": loads,
}
# First go through handled types (as ColouredEnum is also within the components)
handler = type_handler.get(serialized_object["type"])
if handler:
return handler(serialized_object)
# Custom types like Components or DataService classes
component_class = Deserializer.get_component_class(serialized_object["type"])
if component_class:
proxy_class = self._deserialize_component_type(
serialized_object, component_class
)
proxy_class._sio = self.sio_client
proxy_class._loop = self.loop
proxy_class._initialised = True
return proxy_class
return None
def _deserialize_method(
self, serialized_object: SerializedMethod
) -> Callable[..., Any]:
def method_proxy(self: ProxyBaseClass, *args: Any, **kwargs: Any) -> Any:
async def trigger_method() -> Any:
return await self._sio.call(
"trigger_method",
{
"access_path": serialized_object["full_access_path"],
"args": dump(list(args)),
"kwargs": dump(kwargs),
},
)
result = asyncio.run_coroutine_threadsafe(
trigger_method(),
loop=self._loop,
).result()
return loads(result)
return method_proxy
def _deserialize_component_type(
self, serialized_object: SerializedObject, base_class: type
) -> pydase.data_service.DataService:
def add_prefix_to_last_path_element(s: str, prefix: str) -> str:
parts = s.split(".")
parts[-1] = f"{prefix}_{parts[-1]}"
return ".".join(parts)
def create_proxy_class(serialized_object: SerializedObject) -> type:
class_bases = (
ProxyBaseClass,
base_class,
)
class_attrs: dict[str, Any] = {}
# Process and add properties based on the serialized object
for key, value in cast(
dict[str, SerializedObject], serialized_object["value"]
).items():
if value["type"] == "method":
if value["async"] is True:
start_method = copy(value)
start_method["full_access_path"] = (
add_prefix_to_last_path_element(
start_method["full_access_path"], "start"
)
)
stop_method = copy(value)
stop_method["full_access_path"] = (
add_prefix_to_last_path_element(
stop_method["full_access_path"], "stop"
)
)
class_attrs[f"start_{key}"] = self._deserialize_method(
start_method
)
class_attrs[f"stop_{key}"] = self._deserialize_method(
stop_method
)
else:
class_attrs[key] = self._deserialize_method(value)
else:
class_attrs[key] = self._deserialize(value)
# Create the dynamic class with the given name and attributes
return type(serialized_object["name"], class_bases, class_attrs) # type: ignore
return create_proxy_class(serialized_object)()
def _create_attr_property(self, serialized_attr: SerializedObject) -> property:
def get(self: ProxyBaseClass) -> Any:
async def get_result() -> Any:
return await self._sio.call(
"get_value", serialized_attr["full_access_path"]
)
result = asyncio.run_coroutine_threadsafe(
get_result(),
loop=self._loop,
).result()
return loads(result)
get.__doc__ = serialized_attr["doc"]
def set(self: ProxyBaseClass, value: Any) -> None:
async def set_result() -> Any:
return await self._sio.call(
"update_value",
{
"access_path": serialized_attr["full_access_path"],
"value": dump(value),
},
)
result: SerializedObject | None = asyncio.run_coroutine_threadsafe(
set_result(),
loop=self._loop,
).result()
if result is not None:
loads(result)
if serialized_attr["readonly"]:
return property(get)
return property(get, set)

View File

@ -0,0 +1,310 @@
import asyncio
import logging
from copy import copy
from typing import TYPE_CHECKING, Any, cast
import socketio # type: ignore
import pydase.components
import pydase.data_service
from pydase.utils.serialization.deserializer import loads
from pydase.utils.serialization.serializer import dump
from pydase.utils.serialization.types import SerializedObject
if TYPE_CHECKING:
from collections.abc import Callable
logger = logging.getLogger(__name__)
class ProxyAttributeError(Exception): ...
class ProxyList(list[Any]):
def __init__(
self,
original_list: list[Any],
parent_path: str,
sio_client: socketio.AsyncClient,
loop: asyncio.AbstractEventLoop,
) -> None:
super().__init__(original_list)
self._parent_path = parent_path
self._loop = loop
self._sio = sio_client
def __setitem__(self, key: int, value: Any) -> None: # type: ignore[override]
full_access_path = f"{self._parent_path}[{key}]"
async def set_result() -> Any:
return await self._sio.call(
"update_value",
{
"access_path": full_access_path,
"value": dump(value),
},
)
result: SerializedObject | None = asyncio.run_coroutine_threadsafe(
set_result(),
loop=self._loop,
).result()
if result is not None:
ProxyLoader.loads_proxy(
serialized_object=result, sio_client=self._sio, loop=self._loop
)
class ProxyClassMixin:
def __init__(
self,
sio_client: socketio.AsyncClient,
loop: asyncio.AbstractEventLoop,
) -> None:
self._proxy_getters: dict[str, Callable[..., Any]] = {}
self._proxy_setters: dict[str, Callable[..., Any]] = {}
self._proxy_methods: dict[str, Callable[..., Any]] = {}
self._loop = loop
self._sio = sio_client
def __dir__(self) -> list[str]:
"""Used to provide tab completion on CLI / notebook"""
static_dir = super().__dir__()
return sorted({*static_dir, *self._proxy_getters, *self._proxy_methods.keys()})
def __getattribute__(self, name: str) -> Any:
try:
if name in super().__getattribute__("_proxy_getters"):
return super().__getattribute__("_proxy_getters")[name]()
if name in super().__getattribute__("_proxy_methods"):
return super().__getattribute__("_proxy_methods")[name]
except AttributeError:
pass
return super().__getattribute__(name)
def __setattr__(self, name: str, value: Any) -> None:
try:
if name in super().__getattribute__("_proxy_setters"):
return super().__getattribute__("_proxy_setters")[name](value)
if name in super().__getattribute__("_proxy_getters"):
raise ProxyAttributeError(
f"Proxy attribute {name!r} of {type(self).__name__!r} is readonly!"
)
except AttributeError:
pass
return super().__setattr__(name, value)
def _handle_serialized_method(
self, attr_name: str, serialized_object: SerializedObject
) -> None:
def add_prefix_to_last_path_element(s: str, prefix: str) -> str:
parts = s.split(".")
parts[-1] = f"{prefix}_{parts[-1]}"
return ".".join(parts)
if serialized_object["type"] == "method":
if serialized_object["async"] is True:
start_method = copy(serialized_object)
start_method["full_access_path"] = add_prefix_to_last_path_element(
start_method["full_access_path"], "start"
)
stop_method = copy(serialized_object)
stop_method["full_access_path"] = add_prefix_to_last_path_element(
stop_method["full_access_path"], "stop"
)
self._add_method_proxy(f"start_{attr_name}", start_method)
self._add_method_proxy(f"stop_{attr_name}", stop_method)
else:
self._add_method_proxy(attr_name, serialized_object)
def _add_method_proxy(
self, attr_name: str, serialized_object: SerializedObject
) -> None:
def method_proxy(*args: Any, **kwargs: Any) -> Any:
async def trigger_method() -> Any:
return await self._sio.call(
"trigger_method",
{
"access_path": serialized_object["full_access_path"],
"args": dump(list(args)),
"kwargs": dump(kwargs),
},
)
result = asyncio.run_coroutine_threadsafe(
trigger_method(),
loop=self._loop,
).result()
return loads(result)
self._proxy_methods[attr_name] = method_proxy
def _add_attr_proxy(
self, attr_name: str, serialized_object: SerializedObject
) -> None:
self._add_getattr_proxy(attr_name, serialized_object=serialized_object)
if not serialized_object["readonly"]:
self._add_setattr_proxy(attr_name, serialized_object=serialized_object)
def _add_setattr_proxy(
self, attr_name: str, serialized_object: SerializedObject
) -> None:
self._add_getattr_proxy(attr_name, serialized_object=serialized_object)
if not serialized_object["readonly"]:
def setter_proxy(value: Any) -> None:
async def set_result() -> Any:
return await self._sio.call(
"update_value",
{
"access_path": serialized_object["full_access_path"],
"value": dump(value),
},
)
result: SerializedObject | None = asyncio.run_coroutine_threadsafe(
set_result(),
loop=self._loop,
).result()
if result is not None:
ProxyLoader.loads_proxy(result, self._sio, self._loop)
self._proxy_setters[attr_name] = setter_proxy
def _add_getattr_proxy(
self, attr_name: str, serialized_object: SerializedObject
) -> None:
def getter_proxy() -> Any:
async def get_result() -> Any:
return await self._sio.call(
"get_value", serialized_object["full_access_path"]
)
result = asyncio.run_coroutine_threadsafe(
get_result(),
loop=self._loop,
).result()
return ProxyLoader.loads_proxy(result, self._sio, self._loop)
self._proxy_getters[attr_name] = getter_proxy
class ProxyClass(pydase.data_service.DataService, ProxyClassMixin):
def __init__(
self,
sio_client: socketio.AsyncClient,
loop: asyncio.AbstractEventLoop,
) -> None:
ProxyClassMixin.__init__(self, sio_client=sio_client, loop=loop)
pydase.DataService.__init__(self)
class ProxyLoader:
@staticmethod
def load_list_proxy(
serialized_object: SerializedObject,
sio_client: socketio.AsyncClient,
loop: asyncio.AbstractEventLoop,
) -> Any:
return ProxyList(
[
ProxyLoader.loads_proxy(item, sio_client, loop)
for item in cast(list[SerializedObject], serialized_object["value"])
],
parent_path=serialized_object["full_access_path"],
sio_client=sio_client,
loop=loop,
)
@staticmethod
def load_dict_proxy(
serialized_object: SerializedObject,
sio_client: socketio.AsyncClient,
loop: asyncio.AbstractEventLoop,
) -> Any:
return loads(serialized_object)
@staticmethod
def update_data_service_proxy(
proxy_class: ProxyClassMixin,
serialized_object: SerializedObject,
) -> Any:
proxy_class._proxy_getters.clear()
proxy_class._proxy_setters.clear()
proxy_class._proxy_methods.clear()
for key, value in cast(
dict[str, SerializedObject], serialized_object["value"]
).items():
type_handler: dict[str | None, None | Callable[..., Any]] = {
None: None,
"int": proxy_class._add_attr_proxy,
"float": proxy_class._add_attr_proxy,
"bool": proxy_class._add_attr_proxy,
"str": proxy_class._add_attr_proxy,
"NoneType": proxy_class._add_attr_proxy,
"Quantity": proxy_class._add_attr_proxy,
"Enum": proxy_class._add_attr_proxy,
"ColouredEnum": proxy_class._add_attr_proxy,
"method": proxy_class._handle_serialized_method,
"list": proxy_class._add_getattr_proxy,
"dict": proxy_class._add_getattr_proxy,
}
# First go through handled types (as ColouredEnum is also within the
# components)
handler = type_handler.get(value["type"])
if handler:
handler(key, value)
else:
proxy_class._add_getattr_proxy(key, value)
@staticmethod
def load_data_service_proxy(
serialized_object: SerializedObject,
sio_client: socketio.AsyncClient,
loop: asyncio.AbstractEventLoop,
) -> Any:
proxy_class = ProxyClass(sio_client=sio_client, loop=loop)
ProxyLoader.update_data_service_proxy(
proxy_class=proxy_class, serialized_object=serialized_object
)
return proxy_class
@staticmethod
def load_default(
serialized_object: SerializedObject,
sio_client: socketio.AsyncClient,
loop: asyncio.AbstractEventLoop,
) -> Any:
return loads(serialized_object)
@staticmethod
def loads_proxy(
serialized_object: SerializedObject,
sio_client: socketio.AsyncClient,
loop: asyncio.AbstractEventLoop,
) -> Any:
type_handler: dict[str | None, None | Callable[..., Any]] = {
"int": ProxyLoader.load_default,
"float": ProxyLoader.load_default,
"bool": ProxyLoader.load_default,
"str": ProxyLoader.load_default,
"NoneType": ProxyLoader.load_default,
"Quantity": ProxyLoader.load_default,
"Enum": ProxyLoader.load_default,
"ColouredEnum": ProxyLoader.load_default,
"Exception": ProxyLoader.load_default,
"list": ProxyLoader.load_list_proxy,
"dict": ProxyLoader.load_dict_proxy,
}
# First go through handled types (as ColouredEnum is also within the components)
handler = type_handler.get(serialized_object["type"])
if handler:
return handler(
serialized_object=serialized_object, sio_client=sio_client, loop=loop
)
return ProxyLoader.load_data_service_proxy(
serialized_object=serialized_object, sio_client=sio_client, loop=loop
)