updates Server (adds Observer, updates sio_callback)

This commit is contained in:
Mose Müller 2023-12-05 10:49:00 +01:00
parent bb415af460
commit 52d571e551

View File

@ -4,7 +4,6 @@ import os
import signal import signal
import threading import threading
from concurrent.futures import ThreadPoolExecutor from concurrent.futures import ThreadPoolExecutor
from copy import deepcopy
from pathlib import Path from pathlib import Path
from types import FrameType from types import FrameType
from typing import Any, Protocol, TypedDict from typing import Any, Protocol, TypedDict
@ -14,8 +13,9 @@ from rpyc import ForkingServer, ThreadedServer # type: ignore[import-untyped]
from uvicorn.server import HANDLED_SIGNALS from uvicorn.server import HANDLED_SIGNALS
from pydase import DataService from pydase import DataService
from pydase.data_service.data_service_observer import DataServiceObserver
from pydase.data_service.state_manager import StateManager from pydase.data_service.state_manager import StateManager
from pydase.utils.serializer import dump, get_nested_dict_by_path from pydase.utils.serializer import dump
from .web_server import WebAPI from .web_server import WebAPI
@ -192,6 +192,7 @@ class Server:
if getattr(self._service, "_filename", None) is not None: if getattr(self._service, "_filename", None) is not None:
self._service._state_manager = self._state_manager self._service._state_manager = self._state_manager
self._state_manager.load_state() self._state_manager.load_state()
self._observer = DataServiceObserver(self._state_manager)
def run(self) -> None: def run(self) -> None:
""" """
@ -276,36 +277,33 @@ class Server:
) )
) )
def sio_callback(parent_path: str, name: str, value: Any) -> None: def sio_callback(
full_access_path = ".".join([*parent_path.split(".")[1:], name]) full_access_path: str, value: Any, cached_value_dict: dict[str, Any]
cached_value_dict = deepcopy( ) -> None:
get_nested_dict_by_path(self._state_manager.cache, full_access_path) if cached_value_dict != {}:
) serialized_value = dump(value)
serialized_value = dump(value) if cached_value_dict["type"] != "method":
cached_value_dict["type"] = serialized_value["type"]
if cached_value_dict["type"] != "method": cached_value_dict["value"] = serialized_value["value"]
cached_value_dict["type"] = serialized_value["type"]
cached_value_dict["value"] = serialized_value["value"] async def notify() -> None:
try:
await self._wapi.sio.emit(
"notify",
{
"data": {
"full_access_path": full_access_path,
"value": cached_value_dict,
}
},
)
except Exception as e:
logger.warning("Failed to send notification: %s", e)
async def notify() -> None: self._loop.create_task(notify())
try:
await self._wapi.sio.emit(
"notify",
{
"data": {
"parent_path": parent_path,
"name": name,
"value": cached_value_dict,
}
},
)
except Exception as e:
logger.warning("Failed to send notification: %s", e)
self._loop.create_task(notify()) self._observer.add_notification_callback(sio_callback)
self._service._callback_manager.add_notification_callback(sio_callback)
# overwrite uvicorn's signal handlers, otherwise it will bogart SIGINT and # overwrite uvicorn's signal handlers, otherwise it will bogart SIGINT and
# SIGTERM, which makes it impossible to escape out of # SIGTERM, which makes it impossible to escape out of