From 3186e04cc199b0240ad6d0ed225f3e2b5e67d4ee Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mose=20M=C3=BCller?= Date: Tue, 19 Dec 2023 10:58:18 +0100 Subject: [PATCH] creates web_server module with WebServer complying with AdditionalServerProtocol --- src/pydase/server/__init__.py | 6 +- src/pydase/server/server.py | 54 ++----- src/pydase/server/web_server/__init__.py | 3 + .../sio_server_wrapper.py} | 144 +++++++----------- src/pydase/server/web_server/web_server.py | 134 ++++++++++++++++ 5 files changed, 204 insertions(+), 137 deletions(-) create mode 100644 src/pydase/server/web_server/__init__.py rename src/pydase/server/{web_server.py => web_server/sio_server_wrapper.py} (56%) create mode 100644 src/pydase/server/web_server/web_server.py diff --git a/src/pydase/server/__init__.py b/src/pydase/server/__init__.py index b79f06d..2cc803d 100644 --- a/src/pydase/server/__init__.py +++ b/src/pydase/server/__init__.py @@ -1,3 +1,7 @@ from pydase.server.server import Server +from pydase.server.web_server.web_server import WebServer -__all__ = ["Server"] +__all__ = [ + "Server", + "WebServer", +] diff --git a/src/pydase/server/server.py b/src/pydase/server/server.py index b4f6588..cb17037 100644 --- a/src/pydase/server/server.py +++ b/src/pydase/server/server.py @@ -8,16 +8,13 @@ from pathlib import Path from types import FrameType from typing import Any, Protocol, TypedDict -import uvicorn from rpyc import ForkingServer, ThreadedServer # type: ignore[import-untyped] from uvicorn.server import HANDLED_SIGNALS from pydase import DataService from pydase.data_service.data_service_observer import DataServiceObserver from pydase.data_service.state_manager import StateManager -from pydase.utils.serializer import dump - -from .web_server import WebAPI +from pydase.server.web_server import WebServer logger = logging.getLogger(__name__) @@ -230,7 +227,7 @@ class Server: logger.info("Finished server process [%s]", process_id) - async def startup(self) -> None: # noqa: C901 + async def startup(self) -> None: self._loop = asyncio.get_running_loop() self._loop.set_exception_handler(self.custom_exception_handler) self.install_signal_handlers() @@ -252,10 +249,11 @@ class Server: self.servers["rpyc"] = future_or_task for server in self._additional_servers: addin_server = server["server"]( - self._service, - port=server["port"], + service=self._service, host=self._host, + port=server["port"], state_manager=self._state_manager, + data_service_observer=self._observer, **server["kwargs"], ) @@ -266,48 +264,14 @@ class Server: future_or_task = self._loop.create_task(addin_server.serve()) self.servers[server_name] = future_or_task if self._enable_web: - self._wapi = WebAPI( + web_server = WebServer( service=self._service, + host=self._host, + port=self._web_port, state_manager=self._state_manager, + data_service_observer=self._observer, **self._kwargs, ) - web_server = uvicorn.Server( - uvicorn.Config( - self._wapi.fastapi_app, host=self._host, port=self._web_port - ) - ) - - def sio_callback( - full_access_path: str, value: Any, cached_value_dict: dict[str, Any] - ) -> None: - if cached_value_dict != {}: - serialized_value = dump(value) - if cached_value_dict["type"] != "method": - cached_value_dict["type"] = serialized_value["type"] - - cached_value_dict["value"] = serialized_value["value"] - - async def notify() -> None: - try: - await self._wapi.sio.emit( - "notify", - { - "data": { - "full_access_path": full_access_path, - "value": cached_value_dict, - } - }, - ) - except Exception as e: - logger.warning("Failed to send notification: %s", e) - - self._loop.create_task(notify()) - - self._observer.add_notification_callback(sio_callback) - - # overwrite uvicorn's signal handlers, otherwise it will bogart SIGINT and - # SIGTERM, which makes it impossible to escape out of - web_server.install_signal_handlers = lambda: None # type: ignore[method-assign] future_or_task = self._loop.create_task(web_server.serve()) self.servers["web"] = future_or_task diff --git a/src/pydase/server/web_server/__init__.py b/src/pydase/server/web_server/__init__.py new file mode 100644 index 0000000..993b87d --- /dev/null +++ b/src/pydase/server/web_server/__init__.py @@ -0,0 +1,3 @@ +from pydase.server.web_server.web_server import WebServer + +__all__ = ["WebServer"] diff --git a/src/pydase/server/web_server.py b/src/pydase/server/web_server/sio_server_wrapper.py similarity index 56% rename from src/pydase/server/web_server.py rename to src/pydase/server/web_server/sio_server_wrapper.py index 1787bc6..f7a8900 100644 --- a/src/pydase/server/web_server.py +++ b/src/pydase/server/web_server/sio_server_wrapper.py @@ -1,19 +1,14 @@ +import asyncio import logging -from pathlib import Path from typing import Any, TypedDict -import socketio # type: ignore[import-untyped] -from fastapi import FastAPI -from fastapi.middleware.cors import CORSMiddleware -from fastapi.responses import FileResponse -from fastapi.staticfiles import StaticFiles +import socketio -from pydase import DataService from pydase.data_service.data_service import process_callable_attribute -from pydase.data_service.state_manager import StateManager +from pydase.data_service.data_service_observer import DataServiceObserver from pydase.utils.helpers import get_object_attr_from_path_list from pydase.utils.logging import SocketIOHandler -from pydase.version import __version__ +from pydase.utils.serializer import dump logger = logging.getLogger(__name__) @@ -94,64 +89,53 @@ class UpdateWebSettingsDict(TypedDict): value: Any -class WebAPI: - __sio_app: socketio.ASGIApp - __fastapi_app: FastAPI - - def __init__( # noqa: PLR0913 +class SioServerWrapper: + def __init__( self, - service: DataService, - state_manager: StateManager, - frontend: str | Path | None = None, - css: str | Path | None = None, - enable_cors: bool = True, - *args: Any, - **kwargs: Any, + observer: DataServiceObserver, + enable_cors: bool, + loop: asyncio.AbstractEventLoop, ) -> None: - self.service = service - self.state_manager = state_manager - self.frontend = frontend - self.css = css - self.enable_cors = enable_cors - self.args = args - self.kwargs = kwargs + self._loop = loop + self._enable_cors = enable_cors + self._observer = observer + self._state_manager = self._observer.state_manager + self._service = self._state_manager.service - self.setup_socketio() - self.setup_fastapi_app() - self.setup_logging_handler() + self._setup_sio() + self._setup_logging_handler() - def setup_logging_handler(self) -> None: + def _setup_logging_handler(self) -> None: logger = logging.getLogger() - logger.addHandler(SocketIOHandler(self.__sio)) + logger.addHandler(SocketIOHandler(self.sio)) - def setup_socketio(self) -> None: - # the socketio ASGI app, to notify clients when params update - if self.enable_cors: - sio = socketio.AsyncServer(async_mode="asgi", cors_allowed_origins="*") + def _setup_sio(self) -> None: + if self._enable_cors: + self.sio = socketio.AsyncServer(async_mode="asgi", cors_allowed_origins="*") else: - sio = socketio.AsyncServer(async_mode="asgi") + self.sio = socketio.AsyncServer(async_mode="asgi") - @sio.event + @self.sio.event def set_attribute(sid: str, data: UpdateDict) -> Any: logger.debug("Received frontend update: %s", data) path_list = [*data["parent_path"].split("."), data["name"]] path_list.remove("DataService") # always at the start, does not do anything path = ".".join(path_list) - return self.state_manager.set_service_attribute_value_by_path( + return self._state_manager.set_service_attribute_value_by_path( path=path, value=data["value"] ) - @sio.event + @self.sio.event def run_method(sid: str, data: RunMethodDict) -> Any: logger.debug("Running method: %s", data) path_list = [*data["parent_path"].split("."), data["name"]] path_list.remove("DataService") # always at the start, does not do anything - method = get_object_attr_from_path_list(self.service, path_list) + method = get_object_attr_from_path_list(self._service, path_list) return process_callable_attribute(method, data["kwargs"]) - @sio.event # type: ignore + @self.sio.event def web_settings(sid: str, data: UpdateWebSettingsDict) -> Any: - logger.debug(f"Received web settings update: {data}") + logger.debug("Received web settings update: %s", data) path_list, config_option, value = ( data["access_path"].split("."), data["config_option"], @@ -160,55 +144,33 @@ class WebAPI: path_list.pop(0) # remove first entry (specifies root object, not needed) # write to web-settings.json file - self.__sio = sio - self.__sio_app = socketio.ASGIApp(self.__sio) + self._add_notification_callback_to_observer() - def setup_fastapi_app(self) -> None: - app = FastAPI() + def _add_notification_callback_to_observer(self) -> None: + def sio_callback( + full_access_path: str, value: Any, cached_value_dict: dict[str, Any] + ) -> None: + if cached_value_dict != {}: + serialized_value = dump(value) + if cached_value_dict["type"] != "method": + cached_value_dict["type"] = serialized_value["type"] - if self.enable_cors: - app.add_middleware( - CORSMiddleware, - allow_credentials=True, - allow_origins=["*"], - allow_methods=["*"], - allow_headers=["*"], - ) - app.mount("/ws", self.__sio_app) + cached_value_dict["value"] = serialized_value["value"] - @app.get("/version") - def version() -> str: - return __version__ + async def notify() -> None: + try: + await self.sio.emit( + "notify", + { + "data": { + "full_access_path": full_access_path, + "value": cached_value_dict, + } + }, + ) + except Exception as e: + logger.warning("Failed to send notification: %s", e) - @app.get("/name") - def name() -> str: - return self.service.get_service_name() + self._loop.create_task(notify()) - @app.get("/service-properties") - def service_properties() -> dict[str, Any]: - return self.state_manager.cache - - # exposing custom.css file provided by user - if self.css is not None: - - @app.get("/custom.css") - async def styles() -> FileResponse: - return FileResponse(str(self.css)) - - app.mount( - "/", - StaticFiles( - directory=Path(__file__).parent.parent / "frontend", - html=True, - ), - ) - - self.__fastapi_app = app - - @property - def sio(self) -> socketio.AsyncServer: - return self.__sio - - @property - def fastapi_app(self) -> FastAPI: - return self.__fastapi_app + self._observer.add_notification_callback(sio_callback) diff --git a/src/pydase/server/web_server/web_server.py b/src/pydase/server/web_server/web_server.py new file mode 100644 index 0000000..91df4e3 --- /dev/null +++ b/src/pydase/server/web_server/web_server.py @@ -0,0 +1,134 @@ +import asyncio +import logging +from pathlib import Path +from typing import Any + +import socketio # type: ignore[import-untyped] +import uvicorn +from fastapi import FastAPI +from fastapi.middleware.cors import CORSMiddleware +from fastapi.responses import FileResponse +from fastapi.staticfiles import StaticFiles + +from pydase import DataService +from pydase.data_service.data_service_observer import DataServiceObserver +from pydase.data_service.state_manager import StateManager +from pydase.server.web_server.sio_server import SioServerWrapper +from pydase.version import __version__ + +logger = logging.getLogger(__name__) + + +class WebServer: + """ + A Protocol that defines the interface for additional servers. + + This protocol sets the standard for how additional servers should be implemented + to ensure compatibility with the main Server class. The protocol requires that + any server implementing it should have an __init__ method for initialization and a + serve method for starting the server. + + Parameters: + ----------- + service: DataService + The instance of DataService that the server will use. This could be the main + application or a specific service that the server will provide. + + port: int + The port number at which the server will be accessible. This should be a valid + port number, typically in the range 1024-65535. + + host: str + The hostname or IP address at which the server will be hosted. This could be a + local address (like '127.0.0.1' for localhost) or a public IP address. + + state_manager: StateManager + The state manager managing the state cache and persistence of the exposed + service. + + **kwargs: Any + Any additional parameters required for initializing the server. These parameters + are specific to the server's implementation. + """ + + def __init__( # noqa: PLR0913 + self, + service: DataService, + host: str, + port: int, + state_manager: StateManager, + data_service_observer: DataServiceObserver, + css: str | Path | None = None, + enable_cors: bool = True, + **kwargs: Any, + ) -> None: + self.service = service + self.state_manager = state_manager + self.port = port + self.host = host + self.css = css + self.enable_cors = enable_cors + self.observer = data_service_observer + self._loop: asyncio.AbstractEventLoop + + self.setup_fastapi_app() + self.web_server = uvicorn.Server( + uvicorn.Config(self.__fastapi_app, host=self.host, port=self.port) + ) + # overwrite uvicorn's signal handlers, otherwise it will bogart SIGINT and + # SIGTERM, which makes it impossible to escape out of + self.web_server.install_signal_handlers = lambda: None # type: ignore[method-assign] + + async def serve(self) -> Any: + """Starts the server. This method should be implemented as an asynchronous + method, which means that it should be able to run concurrently with other tasks. + """ + self._loop = asyncio.get_running_loop() + self.setup_socketio() + await self.web_server.serve() + + def setup_socketio(self) -> None: + self.__sio = SioServerWrapper(self.observer, self.enable_cors, self._loop).sio + self.__sio_app = socketio.ASGIApp(self.__sio) + + def setup_fastapi_app(self) -> None: + app = FastAPI() + + if self.enable_cors: + app.add_middleware( + CORSMiddleware, + allow_credentials=True, + allow_origins=["*"], + allow_methods=["*"], + allow_headers=["*"], + ) + app.mount("/ws", self.__sio_app) + + @app.get("/version") + def version() -> str: + return __version__ + + @app.get("/name") + def name() -> str: + return self.service.get_service_name() + + @app.get("/service-properties") + def service_properties() -> dict[str, Any]: + return self.state_manager.cache + + # exposing custom.css file provided by user + if self.css is not None: + + @app.get("/custom.css") + async def styles() -> FileResponse: + return FileResponse(str(self.css)) + + app.mount( + "/", + StaticFiles( + directory=Path(__file__).parent.parent / "frontend", + html=True, + ), + ) + + self.__fastapi_app = app