mirror of
https://github.com/tiqi-group/pydase.git
synced 2025-06-05 21:20:40 +02:00
feat: adding socketio frontend handler to web server
This commit is contained in:
parent
ca4bb85be0
commit
4d929afa79
@ -1,16 +1,22 @@
|
|||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import Any
|
from typing import Any, TypedDict
|
||||||
|
|
||||||
import socketio
|
import socketio
|
||||||
from fastapi import FastAPI
|
from fastapi import FastAPI
|
||||||
from fastapi.middleware.cors import CORSMiddleware
|
from fastapi.middleware.cors import CORSMiddleware
|
||||||
from fastapi.staticfiles import StaticFiles
|
from fastapi.staticfiles import StaticFiles
|
||||||
|
from loguru import logger
|
||||||
|
|
||||||
from pyDataInterface import DataService
|
from pyDataInterface import DataService
|
||||||
from pyDataInterface.config import OperationMode
|
from pyDataInterface.config import OperationMode
|
||||||
from pyDataInterface.version import __version__
|
from pyDataInterface.version import __version__
|
||||||
|
|
||||||
|
|
||||||
|
class FrontendUpdate(TypedDict):
|
||||||
|
name: str
|
||||||
|
value: Any
|
||||||
|
|
||||||
|
|
||||||
class WebAPI:
|
class WebAPI:
|
||||||
__sio_app: socketio.ASGIApp
|
__sio_app: socketio.ASGIApp
|
||||||
__fastapi_app: FastAPI
|
__fastapi_app: FastAPI
|
||||||
@ -39,11 +45,16 @@ class WebAPI:
|
|||||||
def setup_socketio(self) -> None:
|
def setup_socketio(self) -> None:
|
||||||
# the socketio ASGI app, to notify clients when params update
|
# the socketio ASGI app, to notify clients when params update
|
||||||
if self.enable_CORS:
|
if self.enable_CORS:
|
||||||
self.__sio = socketio.AsyncServer(
|
sio = socketio.AsyncServer(async_mode="asgi", cors_allowed_origins="*")
|
||||||
async_mode="asgi", cors_allowed_origins="*"
|
|
||||||
)
|
|
||||||
else:
|
else:
|
||||||
self.__sio = socketio.AsyncServer(async_mode="asgi")
|
sio = socketio.AsyncServer(async_mode="asgi")
|
||||||
|
|
||||||
|
@sio.on("frontend_update") # type: ignore
|
||||||
|
def handle_frontend_update(sid: str, data: FrontendUpdate) -> None:
|
||||||
|
logger.debug(f"Received frontend update: {data}")
|
||||||
|
setattr(self.service, data["name"], data["value"])
|
||||||
|
|
||||||
|
self.__sio = sio
|
||||||
self.__sio_app = socketio.ASGIApp(self.__sio)
|
self.__sio_app = socketio.ASGIApp(self.__sio)
|
||||||
|
|
||||||
def setup_fastapi_app(self) -> None: # noqa: CFQ004
|
def setup_fastapi_app(self) -> None: # noqa: CFQ004
|
||||||
|
Loading…
x
Reference in New Issue
Block a user