From 93b7e25e57d935a488c43c6ff47ccc403876822b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mose=20M=C3=BCller?= Date: Wed, 2 Aug 2023 12:06:22 +0200 Subject: [PATCH] feat: adding support for additional servers, removing tiqi_rpc dependency --- src/pyDataService/server/server.py | 73 +++++++++++++++++++++--------- 1 file changed, 52 insertions(+), 21 deletions(-) diff --git a/src/pyDataService/server/server.py b/src/pyDataService/server/server.py index 4cf75bc..b0ff86f 100644 --- a/src/pyDataService/server/server.py +++ b/src/pyDataService/server/server.py @@ -5,12 +5,12 @@ import threading from concurrent.futures import ThreadPoolExecutor from enum import Enum from types import FrameType -from typing import Any, Optional +from typing import Any, Optional, Protocol, TypedDict import uvicorn from loguru import logger from rpyc import ( - ForkingServer, # can be used for multiprocessing, E.g. a database interface server + ForkingServer, # can be used for multiprocessing, e.g. a database interface server ) from rpyc import ThreadedServer from uvicorn.server import HANDLED_SIGNALS @@ -20,11 +20,24 @@ from pyDataService.version import __version__ from .web_server import WebAPI -try: - import tiqi_rpc -except ImportError: - logger.debug("tiqi_rpc is not installed. tiqi_rpc server will not be exposed.") - tiqi_rpc = None # type: ignore + +class AdditionalServerProtocol(Protocol): + def __init__( + self, service: DataService, port: int, host: str, **kwargs: Any + ) -> None: + ... + + async def serve(self) -> Any: + ... + + def install_signal_handlers(self) -> None: + ... + + +class AdditionalServer(TypedDict): + server: type[AdditionalServerProtocol] + port: int + kwargs: dict[str, Any] class Server: @@ -33,27 +46,25 @@ class Server: service: DataService, host: str = "0.0.0.0", rpc_port: int = 18871, - tiqi_rpc_port: int = 6007, web_port: int = 8001, enable_rpc: bool = True, - enable_tiqi_rpc: bool = True, enable_web: bool = True, use_forking_server: bool = False, web_settings: dict[str, Any] = {}, + additional_servers: list[AdditionalServer] = [], **kwargs: Any, ) -> None: self._service = service self._host = host self._rpc_port = rpc_port - self._tiqi_rpc_port = tiqi_rpc_port self._web_port = web_port self._enable_rpc = enable_rpc - self._enable_tiqi_rpc = enable_tiqi_rpc self._enable_web = enable_web self._web_settings = web_settings self._kwargs = kwargs self._loop: asyncio.AbstractEventLoop self._rpc_server_type = ForkingServer if use_forking_server else ThreadedServer + self._additional_servers = additional_servers self.should_exit = False self.servers: dict[str, asyncio.Future[Any]] = {} self.executor: ThreadPoolExecutor | None = None @@ -61,12 +72,11 @@ class Server: "name": self._service.get_service_name(), "version": __version__, "rpc_port": self._rpc_port, - "tiqi_rpc_port": self._tiqi_rpc_port, "web_port": self._web_port, "enable_rpc": self._enable_rpc, - "enable_tiqi_rpc": self._enable_tiqi_rpc, "enable_web": self._enable_web, "web_settings": self._web_settings, + "additional_servers": [], **kwargs, } @@ -115,15 +125,36 @@ class Server: executor=self.executor, func=self._rpc_server.start ) self.servers["rpyc"] = future_or_task - if self._enable_tiqi_rpc and tiqi_rpc is not None: - tiqi_rpc_server = tiqi_rpc.Server( - RPCInterface(self._service, info=self._info, **self._kwargs), + for server in self._additional_servers: + addin_server = server["server"]( + self._service, + port=server["port"], host=self._host, - port=self._rpc_port, + info=self._info, + **server["kwargs"], ) - tiqi_rpc_server.install_signal_handlers = lambda: None # type: ignore - future_or_task = self._loop.create_task(tiqi_rpc_server.serve()) - self.servers["tiqi-rpc"] = future_or_task + try: + addin_server.install_signal_handlers = lambda: None # type: ignore + except Exception: + logger.debug( + "Additional server does not have a method called " + "'install_signal_handlers'." + ) + + server_name = ( + addin_server.__module__ + "." + addin_server.__class__.__name__ + ) + self._info["additional_servers"].append( + { + "name": server_name, + "port": server["port"], + "host": self._host, + **server["kwargs"], + } + ) + + future_or_task = self._loop.create_task(addin_server.serve()) + self.servers[server_name] = future_or_task if self._enable_web: self._wapi: WebAPI = WebAPI( service=self._service, @@ -216,7 +247,7 @@ class Server: try: for sig in HANDLED_SIGNALS: self._loop.add_signal_handler(sig, self.handle_exit, sig, None) - except NotImplementedError: # pragma: no cover + except NotImplementedError: # Windows for sig in HANDLED_SIGNALS: signal.signal(sig, self.handle_exit)