removes rpyc

This commit is contained in:
Mose Müller 2024-03-26 13:26:23 +01:00
parent 57e7deb552
commit 1ad917a423
3 changed files with 2 additions and 51 deletions

View File

@ -13,7 +13,6 @@ class OperationMode(BaseConfig): # type: ignore[misc]
class ServiceConfig(BaseConfig): # type: ignore[misc] class ServiceConfig(BaseConfig): # type: ignore[misc]
config_dir: Path = Path("config") config_dir: Path = Path("config")
web_port: int = 8001 web_port: int = 8001
rpc_port: int = 18871
CONFIG_SOURCES = EnvSource(allow_all=True, prefix="SERVICE_", file=".env") CONFIG_SOURCES = EnvSource(allow_all=True, prefix="SERVICE_", file=".env")

View File

@ -3,8 +3,6 @@ import logging
from enum import Enum from enum import Enum
from typing import Any, get_type_hints from typing import Any, get_type_hints
import rpyc # type: ignore[import-untyped]
import pydase.units as u import pydase.units as u
from pydase.data_service.abstract_data_service import AbstractDataService from pydase.data_service.abstract_data_service import AbstractDataService
from pydase.data_service.task_manager import TaskManager from pydase.data_service.task_manager import TaskManager
@ -35,7 +33,7 @@ def process_callable_attribute(attr: Any, args: dict[str, Any]) -> Any:
) )
class DataService(rpyc.Service, AbstractDataService): class DataService(AbstractDataService):
def __init__(self, **kwargs: Any) -> None: def __init__(self, **kwargs: Any) -> None:
super().__init__() super().__init__()
self._task_manager = TaskManager(self) self._task_manager = TaskManager(self)
@ -106,26 +104,6 @@ class DataService(rpyc.Service, AbstractDataService):
): ):
self.__warn_if_not_observable(attr_value) self.__warn_if_not_observable(attr_value)
def _rpyc_getattr(self, name: str) -> Any:
if name.startswith("_"):
# disallow special and private attributes
raise AttributeError("cannot access private/special names")
# allow all other attributes
return getattr(self, name)
def _rpyc_setattr(self, name: str, value: Any) -> None:
if name.startswith("_"):
# disallow special and private attributes
raise AttributeError("cannot access private/special names")
# check if the attribute has a setter method
attr = getattr(self, name, None)
if isinstance(attr, property) and attr.fset is None:
raise AttributeError(f"{name} attribute does not have a setter method")
# allow all other attributes
setattr(self, name, value)
def serialize(self) -> SerializedObject: def serialize(self) -> SerializedObject:
""" """
Serializes the instance into a dictionary, preserving the structure of the Serializes the instance into a dictionary, preserving the structure of the

View File

@ -3,12 +3,10 @@ import logging
import os import os
import signal import signal
import threading import threading
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path from pathlib import Path
from types import FrameType from types import FrameType
from typing import Any, Protocol, TypedDict from typing import Any, Protocol, TypedDict
from rpyc import ThreadedServer # type: ignore[import-untyped]
from uvicorn.server import HANDLED_SIGNALS from uvicorn.server import HANDLED_SIGNALS
from pydase import DataService from pydase import DataService
@ -51,8 +49,7 @@ class AdditionalServerProtocol(Protocol):
host: str, host: str,
port: int, port: int,
**kwargs: Any, **kwargs: Any,
) -> None: ) -> None: ...
...
async def serve(self) -> Any: async def serve(self) -> Any:
"""Starts the server. This method should be implemented as an asynchronous """Starts the server. This method should be implemented as an asynchronous
@ -154,9 +151,7 @@ class Server:
self, self,
service: DataService, service: DataService,
host: str = "0.0.0.0", host: str = "0.0.0.0",
rpc_port: int = ServiceConfig().rpc_port,
web_port: int = ServiceConfig().web_port, web_port: int = ServiceConfig().web_port,
enable_rpc: bool = True,
enable_web: bool = True, enable_web: bool = True,
filename: str | Path | None = None, filename: str | Path | None = None,
additional_servers: list[AdditionalServer] | None = None, additional_servers: list[AdditionalServer] | None = None,
@ -166,16 +161,13 @@ class Server:
additional_servers = [] additional_servers = []
self._service = service self._service = service
self._host = host self._host = host
self._rpc_port = rpc_port
self._web_port = web_port self._web_port = web_port
self._enable_rpc = enable_rpc
self._enable_web = enable_web self._enable_web = enable_web
self._kwargs = kwargs self._kwargs = kwargs
self._loop: asyncio.AbstractEventLoop self._loop: asyncio.AbstractEventLoop
self._additional_servers = additional_servers self._additional_servers = additional_servers
self.should_exit = False self.should_exit = False
self.servers: dict[str, asyncio.Future[Any]] = {} self.servers: dict[str, asyncio.Future[Any]] = {}
self.executor: ThreadPoolExecutor | None = None
self._state_manager = StateManager(self._service, filename) self._state_manager = StateManager(self._service, filename)
self._observer = DataServiceObserver(self._state_manager) self._observer = DataServiceObserver(self._state_manager)
self._state_manager.load_state() self._state_manager.load_state()
@ -207,20 +199,6 @@ class Server:
self.install_signal_handlers() self.install_signal_handlers()
self._service._task_manager.start_autostart_tasks() self._service._task_manager.start_autostart_tasks()
if self._enable_rpc:
self.executor = ThreadPoolExecutor()
self._rpc_server = ThreadedServer(
self._service,
port=self._rpc_port,
protocol_config={
"allow_all_attrs": True,
"allow_setattr": True,
},
)
future_or_task = self._loop.run_in_executor(
executor=self.executor, func=self._rpc_server.start
)
self.servers["rpyc"] = future_or_task
for server in self._additional_servers: for server in self._additional_servers:
addin_server = server["server"]( addin_server = server["server"](
data_service_observer=self._observer, data_service_observer=self._observer,
@ -258,10 +236,6 @@ class Server:
await self.__cancel_servers() await self.__cancel_servers()
await self.__cancel_tasks() await self.__cancel_tasks()
if hasattr(self, "_rpc_server") and self._enable_rpc:
logger.debug("Closing rpyc server.")
self._rpc_server.close()
async def __cancel_servers(self) -> None: async def __cancel_servers(self) -> None:
for server_name, task in self.servers.items(): for server_name, task in self.servers.items():
task.cancel() task.cancel()