From c7d63f51397c715779c2b0a66dd560fc1287419c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mose=20M=C3=BCller?= Date: Tue, 19 Dec 2023 12:57:30 +0100 Subject: [PATCH] replaces SioServerWrapper with setup function --- src/pydase/server/web_server/sio_server.py | 157 +++++++++++---------- src/pydase/server/web_server/web_server.py | 6 +- 2 files changed, 88 insertions(+), 75 deletions(-) diff --git a/src/pydase/server/web_server/sio_server.py b/src/pydase/server/web_server/sio_server.py index 00f88fe..7759b9c 100644 --- a/src/pydase/server/web_server/sio_server.py +++ b/src/pydase/server/web_server/sio_server.py @@ -6,6 +6,7 @@ import socketio # type: ignore[import-untyped] from pydase.data_service.data_service import process_callable_attribute from pydase.data_service.data_service_observer import DataServiceObserver +from pydase.data_service.state_manager import StateManager from pydase.utils.helpers import get_object_attr_from_path_list from pydase.utils.logging import SocketIOHandler from pydase.utils.serializer import dump @@ -89,88 +90,98 @@ class UpdateWebSettingsDict(TypedDict): value: Any -class SioServerWrapper: - def __init__( - self, - observer: DataServiceObserver, - enable_cors: bool, - loop: asyncio.AbstractEventLoop, +def setup_sio_server( + observer: DataServiceObserver, + enable_cors: bool, + loop: asyncio.AbstractEventLoop, +) -> socketio.AsyncServer: + """ + Sets up and configures a Socket.IO asynchronous server. + + Args: + observer (DataServiceObserver): + The observer managing state updates and communication. + enable_cors (bool): + Flag indicating whether CORS should be enabled for the server. + loop (asyncio.AbstractEventLoop): + The event loop in which the server will run. + + Returns: + socketio.AsyncServer: The configured Socket.IO asynchronous server. + """ + + state_manager = observer.state_manager + + if enable_cors: + sio = socketio.AsyncServer(async_mode="asgi", cors_allowed_origins="*") + else: + sio = socketio.AsyncServer(async_mode="asgi") + + setup_sio_events(sio, state_manager) + setup_logging_handler(sio) + + # Add notification callback to observer + def sio_callback( + full_access_path: str, value: Any, cached_value_dict: dict[str, Any] ) -> None: - self._loop = loop - self._enable_cors = enable_cors - self._observer = observer - self._state_manager = self._observer.state_manager - self._service = self._state_manager.service + if cached_value_dict != {}: + serialized_value = dump(value) + if cached_value_dict["type"] != "method": + cached_value_dict["type"] = serialized_value["type"] - self._setup_sio() - self._setup_logging_handler() + cached_value_dict["value"] = serialized_value["value"] - def _setup_logging_handler(self) -> None: - logger = logging.getLogger() - logger.addHandler(SocketIOHandler(self.sio)) + async def notify() -> None: + try: + await sio.emit( + "notify", + { + "data": { + "full_access_path": full_access_path, + "value": cached_value_dict, + } + }, + ) + except Exception as e: + logger.warning("Failed to send notification: %s", e) - def _setup_sio(self) -> None: - if self._enable_cors: - self.sio = socketio.AsyncServer(async_mode="asgi", cors_allowed_origins="*") - else: - self.sio = socketio.AsyncServer(async_mode="asgi") + loop.create_task(notify()) - @self.sio.event - def set_attribute(sid: str, data: UpdateDict) -> Any: - logger.debug("Received frontend update: %s", data) - path_list = [*data["parent_path"].split("."), data["name"]] - path_list.remove("DataService") # always at the start, does not do anything - path = ".".join(path_list) - return self._state_manager.set_service_attribute_value_by_path( - path=path, value=data["value"] - ) + observer.add_notification_callback(sio_callback) - @self.sio.event - def run_method(sid: str, data: RunMethodDict) -> Any: - logger.debug("Running method: %s", data) - path_list = [*data["parent_path"].split("."), data["name"]] - path_list.remove("DataService") # always at the start, does not do anything - method = get_object_attr_from_path_list(self._service, path_list) - return process_callable_attribute(method, data["kwargs"]) + return sio - @self.sio.event - def web_settings(sid: str, data: UpdateWebSettingsDict) -> Any: - logger.debug("Received web settings update: %s", data) - path_list, config_option, value = ( - data["access_path"].split("."), - data["config_option"], - data["value"], - ) - path_list.pop(0) # remove first entry (specifies root object, not needed) - # write to web-settings.json file - self._add_notification_callback_to_observer() +def setup_sio_events(sio: socketio.AsyncServer, state_manager: StateManager) -> None: + @sio.event + def set_attribute(sid: str, data: UpdateDict) -> Any: + logger.debug("Received frontend update: %s", data) + path_list = [*data["parent_path"].split("."), data["name"]] + path_list.remove("DataService") # always at the start, does not do anything + path = ".".join(path_list) + return state_manager.set_service_attribute_value_by_path( + path=path, value=data["value"] + ) - def _add_notification_callback_to_observer(self) -> None: - def sio_callback( - full_access_path: str, value: Any, cached_value_dict: dict[str, Any] - ) -> None: - if cached_value_dict != {}: - serialized_value = dump(value) - if cached_value_dict["type"] != "method": - cached_value_dict["type"] = serialized_value["type"] + @sio.event + def run_method(sid: str, data: RunMethodDict) -> Any: + logger.debug("Running method: %s", data) + path_list = [*data["parent_path"].split("."), data["name"]] + path_list.remove("DataService") # always at the start, does not do anything + method = get_object_attr_from_path_list(state_manager.service, path_list) + return process_callable_attribute(method, data["kwargs"]) - cached_value_dict["value"] = serialized_value["value"] + @sio.event + def web_settings(sid: str, data: UpdateWebSettingsDict) -> Any: + logger.debug("Received web settings update: %s", data) + path_list, config_option, value = ( + data["access_path"].split("."), + data["config_option"], + data["value"], + ) + path_list.pop(0) # remove first entry (specifies root object, not needed) - async def notify() -> None: - try: - await self.sio.emit( - "notify", - { - "data": { - "full_access_path": full_access_path, - "value": cached_value_dict, - } - }, - ) - except Exception as e: - logger.warning("Failed to send notification: %s", e) - self._loop.create_task(notify()) - - self._observer.add_notification_callback(sio_callback) +def setup_logging_handler(sio: socketio.AsyncServer) -> None: + logger = logging.getLogger() + logger.addHandler(SocketIOHandler(sio)) diff --git a/src/pydase/server/web_server/web_server.py b/src/pydase/server/web_server/web_server.py index be0c2cd..cae60b8 100644 --- a/src/pydase/server/web_server/web_server.py +++ b/src/pydase/server/web_server/web_server.py @@ -11,7 +11,9 @@ from fastapi.responses import FileResponse from fastapi.staticfiles import StaticFiles from pydase.data_service.data_service_observer import DataServiceObserver -from pydase.server.web_server.sio_server_wrapper import SioServerWrapper +from pydase.server.web_server.sio_server import ( + setup_sio_server, +) from pydase.version import __version__ logger = logging.getLogger(__name__) @@ -77,7 +79,7 @@ class WebServer: await self.web_server.serve() def _setup_socketio(self) -> None: - self._sio = SioServerWrapper(self.observer, self.enable_cors, self._loop).sio + self._sio = setup_sio_server(self.observer, self.enable_cors, self._loop) self.__sio_app = socketio.ASGIApp(self._sio) def _setup_fastapi_app(self) -> None: