From 52d571e551fe48738249b2b78cf6421cb1651dac Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mose=20M=C3=BCller?= Date: Tue, 5 Dec 2023 10:49:00 +0100 Subject: [PATCH] updates Server (adds Observer, updates sio_callback) --- src/pydase/server/server.py | 54 ++++++++++++++++++------------------- 1 file changed, 26 insertions(+), 28 deletions(-) diff --git a/src/pydase/server/server.py b/src/pydase/server/server.py index f019019..e271189 100644 --- a/src/pydase/server/server.py +++ b/src/pydase/server/server.py @@ -4,7 +4,6 @@ import os import signal import threading from concurrent.futures import ThreadPoolExecutor -from copy import deepcopy from pathlib import Path from types import FrameType from typing import Any, Protocol, TypedDict @@ -14,8 +13,9 @@ from rpyc import ForkingServer, ThreadedServer # type: ignore[import-untyped] from uvicorn.server import HANDLED_SIGNALS from pydase import DataService +from pydase.data_service.data_service_observer import DataServiceObserver from pydase.data_service.state_manager import StateManager -from pydase.utils.serializer import dump, get_nested_dict_by_path +from pydase.utils.serializer import dump from .web_server import WebAPI @@ -192,6 +192,7 @@ class Server: if getattr(self._service, "_filename", None) is not None: self._service._state_manager = self._state_manager self._state_manager.load_state() + self._observer = DataServiceObserver(self._state_manager) def run(self) -> None: """ @@ -276,36 +277,33 @@ class Server: ) ) - def sio_callback(parent_path: str, name: str, value: Any) -> None: - full_access_path = ".".join([*parent_path.split(".")[1:], name]) - cached_value_dict = deepcopy( - get_nested_dict_by_path(self._state_manager.cache, full_access_path) - ) - serialized_value = dump(value) + def sio_callback( + full_access_path: str, value: Any, cached_value_dict: dict[str, Any] + ) -> None: + if cached_value_dict != {}: + serialized_value = dump(value) + if cached_value_dict["type"] != "method": + cached_value_dict["type"] = serialized_value["type"] - if cached_value_dict["type"] != "method": - cached_value_dict["type"] = serialized_value["type"] + cached_value_dict["value"] = serialized_value["value"] - cached_value_dict["value"] = serialized_value["value"] + async def notify() -> None: + try: + await self._wapi.sio.emit( + "notify", + { + "data": { + "full_access_path": full_access_path, + "value": cached_value_dict, + } + }, + ) + except Exception as e: + logger.warning("Failed to send notification: %s", e) - async def notify() -> None: - try: - await self._wapi.sio.emit( - "notify", - { - "data": { - "parent_path": parent_path, - "name": name, - "value": cached_value_dict, - } - }, - ) - except Exception as e: - logger.warning("Failed to send notification: %s", e) + self._loop.create_task(notify()) - self._loop.create_task(notify()) - - self._service._callback_manager.add_notification_callback(sio_callback) + self._observer.add_notification_callback(sio_callback) # overwrite uvicorn's signal handlers, otherwise it will bogart SIGINT and # SIGTERM, which makes it impossible to escape out of