clients will now receive updates from socketio server and notify the observer

This commit is contained in:
Mose Müller 2024-03-27 08:37:39 +01:00
parent 11670addc4
commit c1aa678384

View File

@ -1,20 +1,34 @@
import logging
import time
from typing import TYPE_CHECKING, TypedDict
import socketio # type: ignore
from pydase.client.proxy_class_factory import ProxyClassFactory
from pydase.utils.deserializer import loads
from pydase.utils.serializer import SerializedObject
if TYPE_CHECKING:
from pydase.client.proxy_class_factory import ProxyClass
logger = logging.getLogger(__name__)
class NotifyDataDict(TypedDict):
full_access_path: str
value: SerializedObject
class NotifyDict(TypedDict):
data: NotifyDataDict
class Client:
def __init__(self, hostname: str, port: int):
self.sio = socketio.Client()
self.setup_events()
self.proxy_class_factory = ProxyClassFactory(self.sio)
self.proxy = None
self.proxy: ProxyClass | None = None
self.sio.connect(
f"ws://{hostname}:{port}",
socketio_path="/ws/socket.io",
@ -29,5 +43,12 @@ class Client:
def class_structure(data: SerializedObject) -> None:
self.proxy = self.proxy_class_factory.create_proxy(data)
@self.sio.event
def notify(data: NotifyDict) -> None:
if self.proxy is not None:
self.proxy._notify_changed(
data["data"]["full_access_path"], loads(data["data"]["value"])
)
def disconnect(self) -> None:
self.sio.disconnect()