mirror of
https://github.com/tiqi-group/pydase.git
synced 2025-04-21 00:40:01 +02:00
fixing linting issues in server.py
This commit is contained in:
parent
95dd12bf7f
commit
1b11e88d5a
@ -7,7 +7,6 @@ from types import FrameType
|
||||
from typing import Any, Optional
|
||||
|
||||
import uvicorn
|
||||
from fastapi import FastAPI
|
||||
from loguru import logger
|
||||
from rpyc import (
|
||||
ForkingServer, # can be used for multiprocessing, E.g. a database interface server
|
||||
@ -28,7 +27,7 @@ except ImportError:
|
||||
|
||||
|
||||
class Server:
|
||||
def __init__(
|
||||
def __init__( # noqa: CFQ002
|
||||
self,
|
||||
service: DataService,
|
||||
host: str = "0.0.0.0",
|
||||
@ -40,7 +39,6 @@ class Server:
|
||||
enable_web: bool = True,
|
||||
use_forking_server: bool = False,
|
||||
web_settings: dict[str, Any] = {},
|
||||
*args: Any,
|
||||
**kwargs: Any,
|
||||
) -> None:
|
||||
self._service = service
|
||||
@ -52,12 +50,10 @@ class Server:
|
||||
self._enable_tiqi_rpc = enable_tiqi_rpc
|
||||
self._enable_web = enable_web
|
||||
self._web_settings = web_settings
|
||||
self._args = args
|
||||
self._kwargs = kwargs
|
||||
self._loop: asyncio.AbstractEventLoop
|
||||
self._rpc_server_type = ForkingServer if use_forking_server else ThreadedServer
|
||||
self.should_exit = False
|
||||
# self.servers: list[asyncio.Future[Any]] = []
|
||||
self.servers: dict[str, asyncio.Future[Any]] = {}
|
||||
self.executor: ThreadPoolExecutor | None = None
|
||||
self._info: dict[str, Any] = {
|
||||
@ -97,7 +93,7 @@ class Server:
|
||||
def _start_autostart_tasks(self) -> None:
|
||||
self._service._start_autostart_tasks()
|
||||
|
||||
async def startup(self) -> None:
|
||||
async def startup(self) -> None: # noqa: C901
|
||||
self._loop = asyncio.get_running_loop()
|
||||
self._loop.set_exception_handler(self.custom_exception_handler)
|
||||
self.install_signal_handlers()
|
||||
@ -119,9 +115,7 @@ class Server:
|
||||
self.servers["rpyc"] = future_or_task
|
||||
if self._enable_tiqi_rpc and tiqi_rpc is not None:
|
||||
tiqi_rpc_server = tiqi_rpc.Server(
|
||||
RPCInterface(
|
||||
self._data_model, *self._args, info=self._info, **self._kwargs
|
||||
),
|
||||
RPCInterface(self._data_model, info=self._info, **self._kwargs),
|
||||
host=self._host,
|
||||
port=self._rpc_port,
|
||||
)
|
||||
@ -130,7 +124,6 @@ class Server:
|
||||
self.servers["tiqi-rpc"] = future_or_task
|
||||
if self._enable_web:
|
||||
self._wapi: WebAPI = WebAPI(
|
||||
*self._args,
|
||||
service=self._service,
|
||||
info=self._info,
|
||||
**self._kwargs,
|
||||
@ -148,7 +141,7 @@ class Server:
|
||||
# > TypeError: Object of type list is not JSON serializable
|
||||
async def notify() -> None:
|
||||
try:
|
||||
await self._wapi.sio.emit(
|
||||
await self._wapi.sio.emit( # type: ignore
|
||||
"notify",
|
||||
{
|
||||
"data": {
|
||||
@ -239,7 +232,7 @@ class Server:
|
||||
if self._enable_web:
|
||||
|
||||
async def emit_exception() -> None:
|
||||
await self._wapi.sio.emit(
|
||||
await self._wapi.sio.emit( # type: ignore
|
||||
"notify",
|
||||
{
|
||||
"data": {
|
||||
|
Loading…
x
Reference in New Issue
Block a user