updates sio events in web server

This commit is contained in:
Mose Müller 2024-03-27 09:29:29 +01:00
parent 27f22d472d
commit 4db15f2fe8

View File

@ -2,14 +2,15 @@ import asyncio
import logging import logging
from typing import Any, TypedDict from typing import Any, TypedDict
import click
import socketio # type: ignore[import-untyped] import socketio # type: ignore[import-untyped]
from pydase.data_service.data_service import process_callable_attribute
from pydase.data_service.data_service_observer import DataServiceObserver from pydase.data_service.data_service_observer import DataServiceObserver
from pydase.data_service.state_manager import StateManager from pydase.data_service.state_manager import StateManager
from pydase.utils.deserializer import Deserializer, loads
from pydase.utils.helpers import get_object_attr_from_path_list from pydase.utils.helpers import get_object_attr_from_path_list
from pydase.utils.logging import SocketIOHandler from pydase.utils.logging import SocketIOHandler
from pydase.utils.serializer import SerializedObject from pydase.utils.serializer import SerializedObject, dump
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -21,26 +22,17 @@ class UpdateDict(TypedDict):
Attributes: Attributes:
---------- ----------
name : str value : SerializedObject
The name of the attribute to be updated in the DataService instance. The serialized new value to be assigned to the attribute.
If the attribute is part of a nested structure, this would be the name of the
attribute in the last nested object. For example, for an attribute access path
'attr1.list_attr[0].attr2', 'attr2' would be the name.
parent_path : str
The access path for the parent object of the attribute to be updated. This is
used to construct the full access path for the attribute. For example, for an
attribute access path 'attr1.list_attr[0].attr2', 'attr1.list_attr[0]' would be
the parent_path.
value : Any
The new value to be assigned to the attribute. The type of this value should
match the type of the attribute to be updated.
""" """
name: str value: SerializedObject
parent_path: str
value: Any
class TriggerMethodDict(TypedDict):
access_path: str
args: SerializedObject
kwargs: SerializedObject
class RunMethodDict(TypedDict): class RunMethodDict(TypedDict):
@ -120,23 +112,46 @@ def setup_sio_server(
def setup_sio_events(sio: socketio.AsyncServer, state_manager: StateManager) -> None: def setup_sio_events(sio: socketio.AsyncServer, state_manager: StateManager) -> None:
@sio.event # type: ignore
async def connect(sid: str, environ: Any) -> None:
logging.debug("Client [%s] connected", click.style(str(sid), fg="cyan"))
await sio.emit("class_structure", state_manager.cache, to=sid)
@sio.event # type: ignore
async def disconnect(sid: str) -> None:
logging.debug("Client [%s] disconnected", click.style(str(sid), fg="cyan"))
@sio.event @sio.event
def set_attribute(sid: str, data: UpdateDict) -> Any: async def update_value(sid: str, data: UpdateDict) -> None:
logger.debug("Received frontend update: %s", data) logger.debug(data)
parent_path = data["parent_path"].split(".") path = data["value"]["full_access_path"]
path_list = [element for element in parent_path if element] + [data["name"]]
path = ".".join(path_list) # this should probably happen within the following function call -> can also
# look at the current type of the attribute at "path"
new_value = loads(data["value"])
return state_manager.set_service_attribute_value_by_path( return state_manager.set_service_attribute_value_by_path(
path=path, value=data["value"] path=path, value=new_value
) )
@sio.event @sio.event
def run_method(sid: str, data: RunMethodDict) -> Any: async def get_value(sid: str, access_path: str) -> SerializedObject:
logger.debug("Running method: %s", data) return state_manager._data_service_cache.get_value_dict_from_cache(access_path)
parent_path = data["parent_path"].split(".")
path_list = [element for element in parent_path if element] + [data["name"]] @sio.event
method = get_object_attr_from_path_list(state_manager.service, path_list) async def trigger_method(sid: str, data: TriggerMethodDict) -> Any:
return process_callable_attribute(method, data["kwargs"]) logger.info(data)
try:
method = get_object_attr_from_path_list(
state_manager.service, data["access_path"].split(".")
)
args = Deserializer.deserialize(data["args"])
kwargs: dict[str, Any] = Deserializer.deserialize(data["kwargs"])
return dump(method(*args, **kwargs))
except Exception as e:
logger.error(e)
return dump(e)
def setup_logging_handler(sio: socketio.AsyncServer) -> None: def setup_logging_handler(sio: socketio.AsyncServer) -> None: