feat: adding support for additional servers, removing tiqi_rpc dependency

This commit is contained in:
Mose Müller 2023-08-02 12:06:22 +02:00
parent d4c72d7026
commit 93b7e25e57

View File

@ -5,12 +5,12 @@ import threading
from concurrent.futures import ThreadPoolExecutor
from enum import Enum
from types import FrameType
from typing import Any, Optional
from typing import Any, Optional, Protocol, TypedDict
import uvicorn
from loguru import logger
from rpyc import (
ForkingServer, # can be used for multiprocessing, E.g. a database interface server
ForkingServer, # can be used for multiprocessing, e.g. a database interface server
)
from rpyc import ThreadedServer
from uvicorn.server import HANDLED_SIGNALS
@ -20,11 +20,24 @@ from pyDataService.version import __version__
from .web_server import WebAPI
try:
import tiqi_rpc
except ImportError:
logger.debug("tiqi_rpc is not installed. tiqi_rpc server will not be exposed.")
tiqi_rpc = None # type: ignore
class AdditionalServerProtocol(Protocol):
def __init__(
self, service: DataService, port: int, host: str, **kwargs: Any
) -> None:
...
async def serve(self) -> Any:
...
def install_signal_handlers(self) -> None:
...
class AdditionalServer(TypedDict):
server: type[AdditionalServerProtocol]
port: int
kwargs: dict[str, Any]
class Server:
@ -33,27 +46,25 @@ class Server:
service: DataService,
host: str = "0.0.0.0",
rpc_port: int = 18871,
tiqi_rpc_port: int = 6007,
web_port: int = 8001,
enable_rpc: bool = True,
enable_tiqi_rpc: bool = True,
enable_web: bool = True,
use_forking_server: bool = False,
web_settings: dict[str, Any] = {},
additional_servers: list[AdditionalServer] = [],
**kwargs: Any,
) -> None:
self._service = service
self._host = host
self._rpc_port = rpc_port
self._tiqi_rpc_port = tiqi_rpc_port
self._web_port = web_port
self._enable_rpc = enable_rpc
self._enable_tiqi_rpc = enable_tiqi_rpc
self._enable_web = enable_web
self._web_settings = web_settings
self._kwargs = kwargs
self._loop: asyncio.AbstractEventLoop
self._rpc_server_type = ForkingServer if use_forking_server else ThreadedServer
self._additional_servers = additional_servers
self.should_exit = False
self.servers: dict[str, asyncio.Future[Any]] = {}
self.executor: ThreadPoolExecutor | None = None
@ -61,12 +72,11 @@ class Server:
"name": self._service.get_service_name(),
"version": __version__,
"rpc_port": self._rpc_port,
"tiqi_rpc_port": self._tiqi_rpc_port,
"web_port": self._web_port,
"enable_rpc": self._enable_rpc,
"enable_tiqi_rpc": self._enable_tiqi_rpc,
"enable_web": self._enable_web,
"web_settings": self._web_settings,
"additional_servers": [],
**kwargs,
}
@ -115,15 +125,36 @@ class Server:
executor=self.executor, func=self._rpc_server.start
)
self.servers["rpyc"] = future_or_task
if self._enable_tiqi_rpc and tiqi_rpc is not None:
tiqi_rpc_server = tiqi_rpc.Server(
RPCInterface(self._service, info=self._info, **self._kwargs),
for server in self._additional_servers:
addin_server = server["server"](
self._service,
port=server["port"],
host=self._host,
port=self._rpc_port,
info=self._info,
**server["kwargs"],
)
tiqi_rpc_server.install_signal_handlers = lambda: None # type: ignore
future_or_task = self._loop.create_task(tiqi_rpc_server.serve())
self.servers["tiqi-rpc"] = future_or_task
try:
addin_server.install_signal_handlers = lambda: None # type: ignore
except Exception:
logger.debug(
"Additional server does not have a method called "
"'install_signal_handlers'."
)
server_name = (
addin_server.__module__ + "." + addin_server.__class__.__name__
)
self._info["additional_servers"].append(
{
"name": server_name,
"port": server["port"],
"host": self._host,
**server["kwargs"],
}
)
future_or_task = self._loop.create_task(addin_server.serve())
self.servers[server_name] = future_or_task
if self._enable_web:
self._wapi: WebAPI = WebAPI(
service=self._service,
@ -216,7 +247,7 @@ class Server:
try:
for sig in HANDLED_SIGNALS:
self._loop.add_signal_handler(sig, self.handle_exit, sig, None)
except NotImplementedError: # pragma: no cover
except NotImplementedError:
# Windows
for sig in HANDLED_SIGNALS:
signal.signal(sig, self.handle_exit)