Mirror of https://github.com/tiqi-group/pydase.git, synced 2025-12-19 04:31:19 +01:00

Compare commits (34 commits)
| SHA1 |
|---|
| 93d38651e8 |
| 72a3c199d9 |
| 7914e2fa7b |
| 0a4f898fde |
| a9aa55fc99 |
| fd5a230fa4 |
| 243b46aadb |
| 0f1ca84df5 |
| 6438a07305 |
| 80bfd209df |
| e065b1fb22 |
| 977cee32b9 |
| 96f695020b |
| 33ce01865a |
| f5374573cd |
| 43c6b5e817 |
| 37380c6d24 |
| ae21656e83 |
| a4b4f179c6 |
| c6beca3961 |
| 2fa8240e54 |
| 369587a50c |
| 25343f6909 |
| c136c9f3de |
| 8897c2fe4c |
| 80c5c4e99d |
| 423441a74c |
| 9ec60e3891 |
| 8bde104322 |
| 9b57b6984e |
| e5b89f2581 |
| ff1654e65c |
| cded80c8e5 |
| 87a33b6293 |
.github/workflows/python-package.yml (vendored, 2 changed lines):

````diff
@@ -16,7 +16,7 @@ jobs:
     strategy:
       fail-fast: false
       matrix:
-        python-version: ["3.10", "3.11", "3.12"]
+        python-version: ["3.10", "3.11", "3.12", "3.13"]

     steps:
       - uses: actions/checkout@v4
````
README.md:

````diff
@@ -5,6 +5,7 @@
 [](https://pypi.org/project/pydase/)
 [](https://pydase.readthedocs.io/en/stable/)
 [][License]
+[](https://doi.org/10.5281/zenodo.15703190)

 `pydase` is a Python library that simplifies the creation of remote control interfaces for Python objects. It exposes the public attributes of a user-defined class via a [Socket.IO](https://python-socketio.readthedocs.io/en/stable/) web server, ensuring they are always in sync with the service state. You can interact with these attributes using an RPC client, a RESTful API, or a web browser. The web browser frontend is auto-generated, displaying components that correspond to each public attribute of the class for direct interaction.
 `pydase` implements an [observer pattern][Observer Pattern] to provide the real-time updates, ensuring that changes to the class attributes are reflected across all clients.
````
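For orientation, the service side that this introduction describes can be as small as the following sketch (illustrative only: the `Device` class, its `voltage` attribute, and the port are made-up names, not taken from the diff):

```python
import pydase


class Device(pydase.DataService):
    """A pydase service: public attributes and methods are exposed to the
    web frontend, the RESTful API, and RPC clients."""

    def __init__(self) -> None:
        super().__init__()
        self.voltage = 0.0

    def reset(self) -> None:
        """Public methods can be triggered remotely as well."""
        self.voltage = 0.0


if __name__ == "__main__":
    # Serve the auto-generated frontend and the APIs on port 8001,
    # matching the client examples later in this changeset.
    pydase.Server(Device(), web_port=8001).run()
```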
Client usage documentation:

````diff
@@ -23,7 +23,39 @@ The proxy acts as a local representation of the remote service, enabling intuiti

 The proxy class automatically synchronizes with the server's attributes and methods, keeping itself up-to-date with any changes. This dynamic synchronization essentially mirrors the server's API, making it feel like you're working with a local object.

-### Accessing Services Behind Firewalls or SSH Gateways
+## Automatic Proxy Updates
+
+By default, the client listens for attribute and structure changes from the server and dynamically updates its internal proxy representation. This ensures that value changes or newly added attributes on the server appear in the client proxy without requiring reconnection or manual refresh.
+
+This is useful, for example, when [integrating the client into another service](#integrating-the-client-into-another-service). However, if you want to avoid this behavior (e.g., to reduce network traffic or avoid frequent re-syncing), you can disable it. When passing `auto_update_proxy=False` to the client, the proxy will not track changes after the initial connection:
+
+```python
+client = pydase.Client(
+    url="ws://localhost:8001",
+    auto_update_proxy=False
+)
+```
+
+## Direct API Access
+
+In addition to using the `proxy` object, users may access the server API directly via the following methods:
+
+```python
+client = pydase.Client(url="ws://localhost:8001")
+
+# Get the current value of an attribute
+value = client.get_value("device.voltage")
+
+# Update an attribute
+client.update_value("device.voltage", 5.0)
+
+# Call a method on the remote service
+result = client.trigger_method("device.reset")
+```
+
+This bypasses the proxy and is useful for lower-level access to individual service endpoints.
+
+## Accessing Services Behind Firewalls or SSH Gateways

 If your service is only reachable through a private network or SSH gateway, you can route your connection through a local SOCKS5 proxy using the `proxy_url` parameter.

````
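What that SOCKS5 routing looks like in practice, as a sketch (the host names and tunnel port are placeholders; the `proxy_url` parameter itself is confirmed by the `Client` docstring further down in this changeset):

```python
import pydase

# A dynamic SSH tunnel opened beforehand, e.g. in another shell:
#   ssh -D 2222 user@gateway.example.com
# exposes a local SOCKS5 proxy on port 2222.
client = pydase.Client(
    url="ws://service-host.internal:8001",
    proxy_url="socks5://localhost:2222",
)
print(client.proxy)  # the proxy mirrors the remote service's attributes and methods
```

Depending on how pydase was installed, SOCKS support may require an optional aiohttp SOCKS connector; treat that as an assumption rather than documented behaviour.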
````diff
@@ -75,6 +107,7 @@ if __name__ == "__main__":
 ```

 In this example:

 - The `MyService` class has a `proxy` attribute that connects to a `pydase` service at `<ip_addr>:<service_port>`.
 - By setting `block_until_connected=False`, the service can start without waiting for the connection to succeed.
+- The `client_id` is optional. If not specified, it defaults to the system hostname, which will be sent in the `X-Client-Id` HTTP header for logging or authentication on the server side.
````
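A compact sketch reconstructing the kind of example those bullets refer to (the service body is illustrative; `block_until_connected`, `client_id`, and the `<ip_addr>:<service_port>` placeholder are taken from the text above):

```python
import pydase


class MyService(pydase.DataService):
    # Proxy to another pydase service; the connection is established in the
    # background because block_until_connected=False.
    proxy = pydase.Client(
        url="ws://<ip_addr>:<service_port>",
        block_until_connected=False,
        client_id="my_service",  # sent as the X-Client-Id header
    ).proxy


if __name__ == "__main__":
    pydase.Server(MyService()).run()
```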
pyproject.toml:

````diff
@@ -1,6 +1,6 @@
 [project]
 name = "pydase"
-version = "0.10.16"
+version = "0.10.18"
 description = "A flexible and robust Python library for creating, managing, and interacting with data services, with built-in support for web and RPC servers, and customizable features for diverse use cases."
 authors = [
     {name = "Mose Müller",email = "mosemueller@gmail.com"}
````
`pydase.Client`:

````diff
@@ -12,7 +12,12 @@ import aiohttp
 import socketio  # type: ignore

 from pydase.client.proxy_class import ProxyClass
-from pydase.client.proxy_loader import ProxyLoader
+from pydase.client.proxy_loader import (
+    ProxyLoader,
+    get_value,
+    trigger_method,
+    update_value,
+)
 from pydase.utils.serialization.deserializer import loads
 from pydase.utils.serialization.types import SerializedDataService, SerializedObject

````
````diff
@@ -65,6 +70,8 @@ class Client:
         proxy_url: An optional proxy URL to route the connection through. This is useful
             if the service is only reachable via an SSH tunnel or behind a firewall
             (e.g., `socks5://localhost:2222`).
+        auto_update_proxy: If False, disables automatic updates from the server. Useful
+            for request-only clients where real-time synchronization is not needed.

     Example:
         Connect to a service directly:
````
````diff
@@ -93,7 +100,7 @@ class Client:
         ```
     """

-    def __init__(
+    def __init__(  # noqa: PLR0913
         self,
         *,
         url: str,
````
````diff
@@ -101,6 +108,7 @@ class Client:
         sio_client_kwargs: dict[str, Any] = {},
         client_id: str | None = None,
         proxy_url: str | None = None,
+        auto_update_proxy: bool = True,  # new argument
     ):
         # Parse the URL to separate base URL and path prefix
         parsed_url = urllib.parse.urlparse(url)
````
````diff
@@ -118,6 +126,7 @@ class Client:
         self._sio_client_kwargs = sio_client_kwargs
         self._loop: asyncio.AbstractEventLoop | None = None
         self._thread: threading.Thread | None = None
+        self._auto_update_proxy = auto_update_proxy
         self.proxy: ProxyClass
         """A proxy object representing the remote service, facilitating interaction as
         if it were local."""
````
````diff
@@ -224,24 +233,25 @@ class Client:
     async def _setup_events(self) -> None:
         self._sio.on("connect", self._handle_connect)
         self._sio.on("disconnect", self._handle_disconnect)
-        self._sio.on("notify", self._handle_update)
+        if self._auto_update_proxy:
+            self._sio.on("notify", self._handle_update)

     async def _handle_connect(self) -> None:
         logger.debug("Connected to '%s' ...", self._url)
-        serialized_object = cast(
-            "SerializedDataService", await self._sio.call("service_serialization")
-        )
-        ProxyLoader.update_data_service_proxy(
-            self.proxy, serialized_object=serialized_object
-        )
-        serialized_object["type"] = "DeviceConnection"
-        if self.proxy._service_representation is not None:
+        if self._auto_update_proxy:
+            serialized_object = cast(
+                "SerializedDataService", await self._sio.call("service_serialization")
+            )
+            ProxyLoader.update_data_service_proxy(
+                self.proxy, serialized_object=serialized_object
+            )
+            serialized_object["type"] = "DeviceConnection"
             # need to use object.__setattr__ to not trigger an observer notification
             object.__setattr__(self.proxy, "_service_representation", serialized_object)
             if TYPE_CHECKING:
                 self.proxy._service_representation = serialized_object  # type: ignore
-        self.proxy._notify_changed("", self.proxy)
+            self.proxy._notify_changed("", self.proxy)
         self.proxy._connected = True

     async def _handle_disconnect(self) -> None:
````
````diff
@@ -253,3 +263,77 @@ class Client:
             data["data"]["full_access_path"],
             loads(data["data"]["value"]),
         )
+
+    def get_value(self, access_path: str) -> Any:
+        """Retrieve the current value of a remote attribute.
+
+        Args:
+            access_path: The dot-separated path to the attribute in the remote service.
+
+        Returns:
+            The deserialized value of the remote attribute, or None if the client is not
+            connected.
+
+        Example:
+            ```python
+            value = client.get_value("my_device.temperature")
+            print(value)
+            ```
+        """
+
+        if self._loop is not None:
+            return get_value(
+                sio_client=self._sio,
+                loop=self._loop,
+                access_path=access_path,
+            )
+        return None
+
+    def update_value(self, access_path: str, new_value: Any) -> Any:
+        """Set a new value for a remote attribute.
+
+        Args:
+            access_path: The dot-separated path to the attribute in the remote service.
+            new_value: The new value to assign to the attribute.
+
+        Example:
+            ```python
+            client.update_value("my_device.power", True)
+            ```
+        """
+
+        if self._loop is not None:
+            update_value(
+                sio_client=self._sio,
+                loop=self._loop,
+                access_path=access_path,
+                value=new_value,
+            )
+
+    def trigger_method(self, access_path: str, *args: Any, **kwargs: Any) -> Any:
+        """Trigger a remote method with optional arguments.
+
+        Args:
+            access_path: The dot-separated path to the method in the remote service.
+            *args: Positional arguments to pass to the method.
+            **kwargs: Keyword arguments to pass to the method.
+
+        Returns:
+            The return value of the method call, if any.
+
+        Example:
+            ```python
+            result = client.trigger_method("my_device.calibrate", timeout=5)
+            print(result)
+            ```
+        """
+
+        if self._loop is not None:
+            return trigger_method(
+                sio_client=self._sio,
+                loop=self._loop,
+                access_path=access_path,
+                args=list(args),
+                kwargs=kwargs,
+            )
+        return None
````
`ProxyClass` (client-side proxy):

````diff
@@ -65,19 +65,31 @@ class ProxyClass(ProxyClassMixin, pydase.components.DeviceConnection):
         self.reconnect = reconnect

     def serialize(self) -> SerializedObject:
-        if self._service_representation is None:
-            serialization_future = cast(
+        current_loop = asyncio.get_event_loop()
+
+        if not self.connected or current_loop == self._loop:
+            logger.debug(
+                "Client not connected, or called from within client event loop - using "
+                "fallback serialization"
+            )
+            if self._service_representation is None:
+                serialized_service = pydase.components.DeviceConnection().serialize()
+            else:
+                serialized_service = self._service_representation
+
+        else:
+            future = cast(
                 "asyncio.Future[SerializedDataService]",
                 asyncio.run_coroutine_threadsafe(
                     self._sio.call("service_serialization"), self._loop
                 ),
             )
+            result = future.result()
             # need to use object.__setattr__ to not trigger an observer notification
-            object.__setattr__(
-                self, "_service_representation", serialization_future.result()
-            )
+            object.__setattr__(self, "_service_representation", result)
             if TYPE_CHECKING:
-                self._service_representation = serialization_future.result()
+                self._service_representation = result
+            serialized_service = result

         device_connection_value = cast(
             "dict[str, SerializedObject]",
@@ -93,7 +105,7 @@ class ProxyClass(ProxyClassMixin, pydase.components.DeviceConnection):
                 "dict[str, SerializedObject]",
                 # need to deepcopy to not overwrite the _service_representation dict
                 # when adding a prefix with add_prefix_to_full_access_path
-                deepcopy(self._service_representation["value"]),
+                deepcopy(serialized_service["value"]),
             ),
             **device_connection_value,
         }
````
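The guard added above matters because `serialize()` can be invoked from the very event loop the client uses for its Socket.IO traffic; blocking on `run_coroutine_threadsafe(...).result()` from inside that loop would deadlock, so the proxy falls back to its cached (or default `DeviceConnection`) representation instead. A standalone sketch of the failure mode being avoided (not pydase code):

```python
import asyncio
import concurrent.futures


async def main() -> None:
    loop = asyncio.get_running_loop()

    async def remote_call() -> str:
        return "serialized state"

    # Scheduling a coroutine on the loop we are currently running on is fine...
    future = asyncio.run_coroutine_threadsafe(remote_call(), loop)

    # ...but blocking on its result from inside that same loop cannot succeed:
    # the loop would need to run remote_call(), yet it is stuck right here.
    try:
        future.result(timeout=0.1)
    except concurrent.futures.TimeoutError:
        print("would deadlock; fall back to a cached/local representation")

    # Yielding control lets the scheduled coroutine actually run and finish.
    await asyncio.sleep(0.2)
    print(future.result())


asyncio.run(main())
```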
Client proxy loader:

````diff
@@ -74,6 +74,21 @@ def update_value(
     )


+def get_value(
+    sio_client: socketio.AsyncClient,
+    loop: asyncio.AbstractEventLoop,
+    access_path: str,
+) -> Any:
+    async def get_result() -> Any:
+        return await sio_client.call("get_value", access_path)
+
+    result = asyncio.run_coroutine_threadsafe(
+        get_result(),
+        loop=loop,
+    ).result()
+    return ProxyLoader.loads_proxy(result, sio_client, loop)
+
+
 class ProxyDict(dict[str, Any]):
     def __init__(
         self,
````
````diff
@@ -242,16 +257,11 @@ class ProxyClassMixin:
         self, attr_name: str, serialized_object: SerializedObject
     ) -> None:
         def getter_proxy() -> Any:
-            async def get_result() -> Any:
-                return await self._sio.call(
-                    "get_value", serialized_object["full_access_path"]
-                )
-
-            result = asyncio.run_coroutine_threadsafe(
-                get_result(),
+            return get_value(
+                sio_client=self._sio,
                 loop=self._loop,
-            ).result()
-            return ProxyLoader.loads_proxy(result, self._sio, self._loop)
+                access_path=serialized_object["full_access_path"],
+            )

         dict.__setitem__(self._proxy_getters, attr_name, getter_proxy)  # type: ignore
````
`pydase.Server`:

````diff
@@ -135,6 +135,14 @@ class Server:
         autosave_interval: Interval in seconds between automatic state save events.
             If set to `None`, automatic saving is disabled. Defaults to 30 seconds.
         **kwargs: Additional keyword arguments.

+    # Advanced
+
+    - [`post_startup`][pydase.Server.post_startup] hook:
+
+      This method is intended to be overridden in subclasses. It runs immediately
+      after all servers (web and additional) are initialized and before entering the
+      main event loop. You can use this hook to register custom logic after the
+      server is fully started.
     """

     def __init__(  # noqa: PLR0913
````
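A sketch of how the new hook can be used (the subclass and its body are illustrative; only `post_startup` itself comes from this changeset):

```python
import logging

import pydase

logger = logging.getLogger(__name__)


class MyService(pydase.DataService):
    pass


class MyServer(pydase.Server):
    async def post_startup(self) -> None:
        # Called after the web server and any additional servers are running,
        # right before the main event loop is entered.
        logger.info("All servers are up; register custom startup logic here.")


if __name__ == "__main__":
    MyServer(MyService()).run()
```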
````diff
@@ -174,6 +182,14 @@ class Server:
         self._state_manager.load_state()
         autostart_service_tasks(self._service)

+        self._web_server = WebServer(
+            data_service_observer=self._observer,
+            host=self._host,
+            port=self._web_port,
+            enable_frontend=self._enable_web,
+            **self._kwargs,
+        )
+
     def run(self) -> None:
         """
         Initializes the asyncio event loop and starts the server.
````
````diff
@@ -191,6 +207,7 @@ class Server:
         logger.info("Started server process [%s]", process_id)

         await self.startup()
+        await self.post_startup()
         if self.should_exit:
             return
         await self.main_loop()
````
````diff
@@ -202,6 +219,10 @@ class Server:
         self._loop.set_exception_handler(self.custom_exception_handler)
         self.install_signal_handlers()

+        server_task = self._loop.create_task(self._web_server.serve())
+        server_task.add_done_callback(self._handle_server_shutdown)
+        self.servers["web"] = server_task
+
         for server in self._additional_servers:
             addin_server = server["server"](
                 data_service_observer=self._observer,
````
````diff
@@ -217,17 +238,6 @@ class Server:
             server_task = self._loop.create_task(addin_server.serve())
             server_task.add_done_callback(self._handle_server_shutdown)
             self.servers[server_name] = server_task
-        if self._enable_web:
-            self._web_server = WebServer(
-                data_service_observer=self._observer,
-                host=self._host,
-                port=self._web_port,
-                **self._kwargs,
-            )
-            server_task = self._loop.create_task(self._web_server.serve())
-
-            server_task.add_done_callback(self._handle_server_shutdown)
-            self.servers["web"] = server_task

         self._loop.create_task(self._state_manager.autosave())

````
````diff
@@ -258,6 +268,9 @@ class Server:
         logger.debug("Cancelling tasks")
         await self.__cancel_tasks()

+    async def post_startup(self) -> None:
+        """Override this in a subclass to register custom logic after startup."""
+
     async def __cancel_servers(self) -> None:
         for server_name, task in self.servers.items():
             task.cancel()
````
````diff
@@ -307,7 +320,7 @@ class Server:
         # here we exclude most kinds of exceptions from triggering this kind of shutdown
         exc = context.get("exception")
         if type(exc) not in [RuntimeError, KeyboardInterrupt, asyncio.CancelledError]:
-            if self._enable_web:
+            if loop.is_running():

                 async def emit_exception() -> None:
                     try:
````
Socket.IO server setup (`setup_sio_server`):

````diff
@@ -115,7 +115,7 @@ def setup_sio_server(
     def sio_callback(
         full_access_path: str, value: Any, cached_value_dict: SerializedObject
     ) -> None:
-        if cached_value_dict != {}:
+        if cached_value_dict != {} and loop.is_running():

             async def notify() -> None:
                 try:
````
`WebServer`:

````diff
@@ -81,6 +81,7 @@ class WebServer:
         host: str,
         port: int,
         *,
+        enable_frontend: bool = True,
         css: str | Path | None = None,
         favicon_path: str | Path | None = None,
         enable_cors: bool = True,
````
````diff
@@ -97,19 +98,18 @@ class WebServer:
         self.enable_cors = enable_cors
         self.frontend_src = frontend_src
         self.favicon_path: Path | str = favicon_path  # type: ignore
+        self.enable_frontend = enable_frontend

         if self.favicon_path is None:
             self.favicon_path = self.frontend_src / "favicon.ico"

         self._service_config_dir = config_dir
         self._generate_web_settings = generate_web_settings
-        self._loop = asyncio.get_event_loop()
-        self._sio = setup_sio_server(self.observer, self.enable_cors, self._loop)
+        self._loop: asyncio.AbstractEventLoop
         self._initialise_configuration()

     async def serve(self) -> None:
+        self._loop = asyncio.get_running_loop()
+        self._sio = setup_sio_server(self.observer, self.enable_cors, self._loop)
+
         async def index(
             request: aiohttp.web.Request,
         ) -> aiohttp.web.Response | aiohttp.web.FileResponse:
````
````diff
@@ -162,15 +162,17 @@ class WebServer:

         # Define routes
         self._sio.attach(app, socketio_path="/ws/socket.io")
-        app.router.add_static("/assets", self.frontend_src / "assets")
-        app.router.add_get("/favicon.ico", self._favicon_route)
-        app.router.add_get("/service-properties", self._service_properties_route)
-        app.router.add_get("/web-settings", self._web_settings_route)
-        app.router.add_get("/custom.css", self._styles_route)
+        if self.enable_frontend:
+            app.router.add_static("/assets", self.frontend_src / "assets")
+            app.router.add_get("/favicon.ico", self._favicon_route)
+            app.router.add_get("/service-properties", self._service_properties_route)
+            app.router.add_get("/web-settings", self._web_settings_route)
+            app.router.add_get("/custom.css", self._styles_route)
         app.add_subapp("/api/", create_api_application(self.state_manager))

-        app.router.add_get(r"/", index)
-        app.router.add_get(r"/{tail:.*}", index)
+        if self.enable_frontend:
+            app.router.add_get(r"/", index)
+            app.router.add_get(r"/{tail:.*}", index)

         await aiohttp.web._run_app(
             app,
````
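Read together with the `Server` changes above, the frontend routes (static assets, favicon, web settings, custom CSS, and the index pages) are now only registered when the frontend is enabled, while the `/api/` sub-application and the Socket.IO endpoint stay mounted either way; `Server` wires this up by passing `enable_frontend=self._enable_web` to the `WebServer`. A sketch of what that means from the service side (my reading of the diff, with an illustrative service class):

```python
import pydase


class MyService(pydase.DataService):
    def __init__(self) -> None:
        super().__init__()
        self.power = False


if __name__ == "__main__":
    # With enable_web=False the auto-generated frontend is not served, but
    # (per the route changes above) the "/api/" endpoints and the Socket.IO
    # interface remain available to RPC clients.
    pydase.Server(MyService(), enable_web=False).run()
```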
Logging (`SocketIOHandler`):

````diff
@@ -165,15 +165,16 @@ class SocketIOHandler(logging.Handler):
         log_entry = self.format(record)

         loop = asyncio.get_event_loop()
-        loop.create_task(
-            self._sio.emit(
-                "log",
-                {
-                    "levelname": record.levelname,
-                    "message": log_entry,
-                },
-            )
-        )
+        if loop.is_running():
+            loop.create_task(
+                self._sio.emit(
+                    "log",
+                    {
+                        "levelname": record.levelname,
+                        "message": log_entry,
+                    },
+                )
+            )


 def setup_logging() -> None:
````
Client tests:

````diff
@@ -177,3 +177,16 @@ def test_client_id(

     pydase.Client(url="ws://localhost:9999", client_id="my_service")
     assert "Client [id=my_service] connected" in caplog.text
+
+
+def test_get_value(
+    pydase_client: pydase.Client, caplog: pytest.LogCaptureFixture
+) -> None:
+    pydase_client.update_value("sub_service.name", "Other name")
+
+    assert pydase_client.get_value("sub_service.name") == "Other name"
+
+    assert (
+        pydase_client.trigger_method("my_async_method", input_str="Hello World")
+        == "Hello World"
+    )
````
tests/client/test_proxy_class.py (new file, 22 lines):

````diff
@@ -0,0 +1,22 @@
+import asyncio
+from unittest.mock import AsyncMock, call, patch
+
+import pytest
+
+from pydase import components
+from pydase.client.proxy_class import ProxyClass
+
+
+@pytest.mark.asyncio
+async def test_serialize_fallback_inside_event_loop() -> None:
+    loop = asyncio.get_running_loop()
+    mock_sio = AsyncMock()
+    proxy = ProxyClass(sio_client=mock_sio, loop=loop, reconnect=lambda: None)
+
+    with patch.object(
+        components.DeviceConnection, "serialize", return_value={"value": {}}
+    ) as mock_fallback:
+        result = proxy.serialize()
+
+    mock_fallback.assert_has_calls(calls=[call(), call()])
+    assert isinstance(result, dict)
````
Image component tests:

````diff
@@ -1,10 +1,17 @@
-from pytest import LogCaptureFixture
+import sys

 import pydase
 import pydase.components
 from pydase.data_service.data_service_observer import DataServiceObserver
 from pydase.data_service.state_manager import StateManager
 from pydase.utils.serialization.serializer import dump
+from pytest import LogCaptureFixture
+
+if sys.version_info < (3, 13):
+    PATHLIB_PATH = "pathlib.Path"
+else:
+    PATHLIB_PATH = "pathlib._local.Path"


 def test_image_functions(caplog: LogCaptureFixture) -> None:
@@ -106,7 +113,7 @@ def test_image_serialization() -> None:
                 "signature": {
                     "parameters": {
                         "path": {
-                            "annotation": "pathlib.Path | str",
+                            "annotation": f"{PATHLIB_PATH} | str",
                             "default": {},
                         }
                     },
````