creates web_server module with WebServer complying with AdditionalServerProtocol

This commit is contained in:
Mose Müller 2023-12-19 10:58:18 +01:00
parent 055acbe591
commit 3186e04cc1
5 changed files with 204 additions and 137 deletions

View File

@ -1,3 +1,7 @@
from pydase.server.server import Server from pydase.server.server import Server
from pydase.server.web_server.web_server import WebServer
__all__ = ["Server"] __all__ = [
"Server",
"WebServer",
]

View File

@ -8,16 +8,13 @@ from pathlib import Path
from types import FrameType from types import FrameType
from typing import Any, Protocol, TypedDict from typing import Any, Protocol, TypedDict
import uvicorn
from rpyc import ForkingServer, ThreadedServer # type: ignore[import-untyped] from rpyc import ForkingServer, ThreadedServer # type: ignore[import-untyped]
from uvicorn.server import HANDLED_SIGNALS from uvicorn.server import HANDLED_SIGNALS
from pydase import DataService from pydase import DataService
from pydase.data_service.data_service_observer import DataServiceObserver from pydase.data_service.data_service_observer import DataServiceObserver
from pydase.data_service.state_manager import StateManager from pydase.data_service.state_manager import StateManager
from pydase.utils.serializer import dump from pydase.server.web_server import WebServer
from .web_server import WebAPI
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -230,7 +227,7 @@ class Server:
logger.info("Finished server process [%s]", process_id) logger.info("Finished server process [%s]", process_id)
async def startup(self) -> None: # noqa: C901 async def startup(self) -> None:
self._loop = asyncio.get_running_loop() self._loop = asyncio.get_running_loop()
self._loop.set_exception_handler(self.custom_exception_handler) self._loop.set_exception_handler(self.custom_exception_handler)
self.install_signal_handlers() self.install_signal_handlers()
@ -252,10 +249,11 @@ class Server:
self.servers["rpyc"] = future_or_task self.servers["rpyc"] = future_or_task
for server in self._additional_servers: for server in self._additional_servers:
addin_server = server["server"]( addin_server = server["server"](
self._service, service=self._service,
port=server["port"],
host=self._host, host=self._host,
port=server["port"],
state_manager=self._state_manager, state_manager=self._state_manager,
data_service_observer=self._observer,
**server["kwargs"], **server["kwargs"],
) )
@ -266,48 +264,14 @@ class Server:
future_or_task = self._loop.create_task(addin_server.serve()) future_or_task = self._loop.create_task(addin_server.serve())
self.servers[server_name] = future_or_task self.servers[server_name] = future_or_task
if self._enable_web: if self._enable_web:
self._wapi = WebAPI( web_server = WebServer(
service=self._service, service=self._service,
host=self._host,
port=self._web_port,
state_manager=self._state_manager, state_manager=self._state_manager,
data_service_observer=self._observer,
**self._kwargs, **self._kwargs,
) )
web_server = uvicorn.Server(
uvicorn.Config(
self._wapi.fastapi_app, host=self._host, port=self._web_port
)
)
def sio_callback(
full_access_path: str, value: Any, cached_value_dict: dict[str, Any]
) -> None:
if cached_value_dict != {}:
serialized_value = dump(value)
if cached_value_dict["type"] != "method":
cached_value_dict["type"] = serialized_value["type"]
cached_value_dict["value"] = serialized_value["value"]
async def notify() -> None:
try:
await self._wapi.sio.emit(
"notify",
{
"data": {
"full_access_path": full_access_path,
"value": cached_value_dict,
}
},
)
except Exception as e:
logger.warning("Failed to send notification: %s", e)
self._loop.create_task(notify())
self._observer.add_notification_callback(sio_callback)
# overwrite uvicorn's signal handlers, otherwise it will bogart SIGINT and
# SIGTERM, which makes it impossible to escape out of
web_server.install_signal_handlers = lambda: None # type: ignore[method-assign]
future_or_task = self._loop.create_task(web_server.serve()) future_or_task = self._loop.create_task(web_server.serve())
self.servers["web"] = future_or_task self.servers["web"] = future_or_task

View File

@ -0,0 +1,3 @@
from pydase.server.web_server.web_server import WebServer
__all__ = ["WebServer"]

View File

@ -1,19 +1,14 @@
import asyncio
import logging import logging
from pathlib import Path
from typing import Any, TypedDict from typing import Any, TypedDict
import socketio # type: ignore[import-untyped] import socketio
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse
from fastapi.staticfiles import StaticFiles
from pydase import DataService
from pydase.data_service.data_service import process_callable_attribute from pydase.data_service.data_service import process_callable_attribute
from pydase.data_service.state_manager import StateManager from pydase.data_service.data_service_observer import DataServiceObserver
from pydase.utils.helpers import get_object_attr_from_path_list from pydase.utils.helpers import get_object_attr_from_path_list
from pydase.utils.logging import SocketIOHandler from pydase.utils.logging import SocketIOHandler
from pydase.version import __version__ from pydase.utils.serializer import dump
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -94,64 +89,53 @@ class UpdateWebSettingsDict(TypedDict):
value: Any value: Any
class WebAPI: class SioServerWrapper:
__sio_app: socketio.ASGIApp def __init__(
__fastapi_app: FastAPI
def __init__( # noqa: PLR0913
self, self,
service: DataService, observer: DataServiceObserver,
state_manager: StateManager, enable_cors: bool,
frontend: str | Path | None = None, loop: asyncio.AbstractEventLoop,
css: str | Path | None = None,
enable_cors: bool = True,
*args: Any,
**kwargs: Any,
) -> None: ) -> None:
self.service = service self._loop = loop
self.state_manager = state_manager self._enable_cors = enable_cors
self.frontend = frontend self._observer = observer
self.css = css self._state_manager = self._observer.state_manager
self.enable_cors = enable_cors self._service = self._state_manager.service
self.args = args
self.kwargs = kwargs
self.setup_socketio() self._setup_sio()
self.setup_fastapi_app() self._setup_logging_handler()
self.setup_logging_handler()
def setup_logging_handler(self) -> None: def _setup_logging_handler(self) -> None:
logger = logging.getLogger() logger = logging.getLogger()
logger.addHandler(SocketIOHandler(self.__sio)) logger.addHandler(SocketIOHandler(self.sio))
def setup_socketio(self) -> None: def _setup_sio(self) -> None:
# the socketio ASGI app, to notify clients when params update if self._enable_cors:
if self.enable_cors: self.sio = socketio.AsyncServer(async_mode="asgi", cors_allowed_origins="*")
sio = socketio.AsyncServer(async_mode="asgi", cors_allowed_origins="*")
else: else:
sio = socketio.AsyncServer(async_mode="asgi") self.sio = socketio.AsyncServer(async_mode="asgi")
@sio.event @self.sio.event
def set_attribute(sid: str, data: UpdateDict) -> Any: def set_attribute(sid: str, data: UpdateDict) -> Any:
logger.debug("Received frontend update: %s", data) logger.debug("Received frontend update: %s", data)
path_list = [*data["parent_path"].split("."), data["name"]] path_list = [*data["parent_path"].split("."), data["name"]]
path_list.remove("DataService") # always at the start, does not do anything path_list.remove("DataService") # always at the start, does not do anything
path = ".".join(path_list) path = ".".join(path_list)
return self.state_manager.set_service_attribute_value_by_path( return self._state_manager.set_service_attribute_value_by_path(
path=path, value=data["value"] path=path, value=data["value"]
) )
@sio.event @self.sio.event
def run_method(sid: str, data: RunMethodDict) -> Any: def run_method(sid: str, data: RunMethodDict) -> Any:
logger.debug("Running method: %s", data) logger.debug("Running method: %s", data)
path_list = [*data["parent_path"].split("."), data["name"]] path_list = [*data["parent_path"].split("."), data["name"]]
path_list.remove("DataService") # always at the start, does not do anything path_list.remove("DataService") # always at the start, does not do anything
method = get_object_attr_from_path_list(self.service, path_list) method = get_object_attr_from_path_list(self._service, path_list)
return process_callable_attribute(method, data["kwargs"]) return process_callable_attribute(method, data["kwargs"])
@sio.event # type: ignore @self.sio.event
def web_settings(sid: str, data: UpdateWebSettingsDict) -> Any: def web_settings(sid: str, data: UpdateWebSettingsDict) -> Any:
logger.debug(f"Received web settings update: {data}") logger.debug("Received web settings update: %s", data)
path_list, config_option, value = ( path_list, config_option, value = (
data["access_path"].split("."), data["access_path"].split("."),
data["config_option"], data["config_option"],
@ -160,55 +144,33 @@ class WebAPI:
path_list.pop(0) # remove first entry (specifies root object, not needed) path_list.pop(0) # remove first entry (specifies root object, not needed)
# write to web-settings.json file # write to web-settings.json file
self.__sio = sio self._add_notification_callback_to_observer()
self.__sio_app = socketio.ASGIApp(self.__sio)
def setup_fastapi_app(self) -> None: def _add_notification_callback_to_observer(self) -> None:
app = FastAPI() def sio_callback(
full_access_path: str, value: Any, cached_value_dict: dict[str, Any]
) -> None:
if cached_value_dict != {}:
serialized_value = dump(value)
if cached_value_dict["type"] != "method":
cached_value_dict["type"] = serialized_value["type"]
if self.enable_cors: cached_value_dict["value"] = serialized_value["value"]
app.add_middleware(
CORSMiddleware, async def notify() -> None:
allow_credentials=True, try:
allow_origins=["*"], await self.sio.emit(
allow_methods=["*"], "notify",
allow_headers=["*"], {
"data": {
"full_access_path": full_access_path,
"value": cached_value_dict,
}
},
) )
app.mount("/ws", self.__sio_app) except Exception as e:
logger.warning("Failed to send notification: %s", e)
@app.get("/version") self._loop.create_task(notify())
def version() -> str:
return __version__
@app.get("/name") self._observer.add_notification_callback(sio_callback)
def name() -> str:
return self.service.get_service_name()
@app.get("/service-properties")
def service_properties() -> dict[str, Any]:
return self.state_manager.cache
# exposing custom.css file provided by user
if self.css is not None:
@app.get("/custom.css")
async def styles() -> FileResponse:
return FileResponse(str(self.css))
app.mount(
"/",
StaticFiles(
directory=Path(__file__).parent.parent / "frontend",
html=True,
),
)
self.__fastapi_app = app
@property
def sio(self) -> socketio.AsyncServer:
return self.__sio
@property
def fastapi_app(self) -> FastAPI:
return self.__fastapi_app

View File

@ -0,0 +1,134 @@
import asyncio
import logging
from pathlib import Path
from typing import Any
import socketio # type: ignore[import-untyped]
import uvicorn
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse
from fastapi.staticfiles import StaticFiles
from pydase import DataService
from pydase.data_service.data_service_observer import DataServiceObserver
from pydase.data_service.state_manager import StateManager
from pydase.server.web_server.sio_server import SioServerWrapper
from pydase.version import __version__
logger = logging.getLogger(__name__)
class WebServer:
"""
A Protocol that defines the interface for additional servers.
This protocol sets the standard for how additional servers should be implemented
to ensure compatibility with the main Server class. The protocol requires that
any server implementing it should have an __init__ method for initialization and a
serve method for starting the server.
Parameters:
-----------
service: DataService
The instance of DataService that the server will use. This could be the main
application or a specific service that the server will provide.
port: int
The port number at which the server will be accessible. This should be a valid
port number, typically in the range 1024-65535.
host: str
The hostname or IP address at which the server will be hosted. This could be a
local address (like '127.0.0.1' for localhost) or a public IP address.
state_manager: StateManager
The state manager managing the state cache and persistence of the exposed
service.
**kwargs: Any
Any additional parameters required for initializing the server. These parameters
are specific to the server's implementation.
"""
def __init__( # noqa: PLR0913
self,
service: DataService,
host: str,
port: int,
state_manager: StateManager,
data_service_observer: DataServiceObserver,
css: str | Path | None = None,
enable_cors: bool = True,
**kwargs: Any,
) -> None:
self.service = service
self.state_manager = state_manager
self.port = port
self.host = host
self.css = css
self.enable_cors = enable_cors
self.observer = data_service_observer
self._loop: asyncio.AbstractEventLoop
self.setup_fastapi_app()
self.web_server = uvicorn.Server(
uvicorn.Config(self.__fastapi_app, host=self.host, port=self.port)
)
# overwrite uvicorn's signal handlers, otherwise it will bogart SIGINT and
# SIGTERM, which makes it impossible to escape out of
self.web_server.install_signal_handlers = lambda: None # type: ignore[method-assign]
async def serve(self) -> Any:
"""Starts the server. This method should be implemented as an asynchronous
method, which means that it should be able to run concurrently with other tasks.
"""
self._loop = asyncio.get_running_loop()
self.setup_socketio()
await self.web_server.serve()
def setup_socketio(self) -> None:
self.__sio = SioServerWrapper(self.observer, self.enable_cors, self._loop).sio
self.__sio_app = socketio.ASGIApp(self.__sio)
def setup_fastapi_app(self) -> None:
app = FastAPI()
if self.enable_cors:
app.add_middleware(
CORSMiddleware,
allow_credentials=True,
allow_origins=["*"],
allow_methods=["*"],
allow_headers=["*"],
)
app.mount("/ws", self.__sio_app)
@app.get("/version")
def version() -> str:
return __version__
@app.get("/name")
def name() -> str:
return self.service.get_service_name()
@app.get("/service-properties")
def service_properties() -> dict[str, Any]:
return self.state_manager.cache
# exposing custom.css file provided by user
if self.css is not None:
@app.get("/custom.css")
async def styles() -> FileResponse:
return FileResponse(str(self.css))
app.mount(
"/",
StaticFiles(
directory=Path(__file__).parent.parent / "frontend",
html=True,
),
)
self.__fastapi_app = app