Mirror of https://github.com/tiqi-group/pydase.git, synced 2025-12-20 05:01:19 +01:00

Compare commits — 38 commits:

eb32b34b59, 9eedf03c01, 5ec7a8b530, f2f330dbd9, 2e0e056489, d8685fe9a0,
e52a019d5e, 0d5cef1537, e8f33eee4d, a3b71b174c, e2ce0e9acb, f47a183c11,
a9ea237cf3, 6db1652dd3, e3b95a8076, 0fe2a8516f, 51bbaba162, 77802da417,
3e21858cb7, 2003f28fd1, 172b50bf77, ec5694fedf, 968f774092, 757dc9aa3c,
3d938562a6, 964a62d4b4, 99aa38fcfe, 5658514c8a, 109ee7d5e1, f4fa02fe11,
487ef504a8, c98e407ed7, 6b6ce1d43f, e491ac7458, e9d8cbafc2, aa705592b2,
008e1262bb, 91a71ad004
@@ -1,60 +1,86 @@
 # Python RPC Client

-You can connect to the service using the `pydase.Client`. Below is an example of how to establish a connection to a service and interact with it:
+The [`pydase.Client`][pydase.Client] allows you to connect to a remote `pydase` service using socket.io, facilitating interaction with the service as though it were running locally.
+
+## Basic Usage

 ```python
 import pydase

-# Replace the hostname and port with the IP address and the port of the machine where
-# the service is running, respectively
+# Replace <ip_addr> and <service_port> with the appropriate values for your service
 client_proxy = pydase.Client(url="ws://<ip_addr>:<service_port>").proxy
-# client_proxy = pydase.Client(url="wss://your-domain.ch").proxy  # if your service uses ssl-encryption
+# For SSL-encrypted services, use the wss protocol
+# client_proxy = pydase.Client(url="wss://your-domain.ch").proxy

 # Interact with the service attributes as if they were local
 client_proxy.voltage = 5.0
 print(client_proxy.voltage)  # Expected output: 5.0
 ```

-This example demonstrates setting and retrieving the `voltage` attribute through the client proxy.
-The proxy acts as a local representative of the remote service, enabling straightforward interaction.
+This example shows how to set and retrieve the `voltage` attribute through the client proxy.
+The proxy acts as a local representation of the remote service, enabling intuitive interaction.

-The proxy class dynamically synchronizes with the server's exposed attributes. This synchronization allows the proxy to be automatically updated with any attributes or methods that the server exposes, essentially mirroring the server's API. This dynamic updating enables users to interact with the remote service as if they were working with a local object.
+The proxy class automatically synchronizes with the server's attributes and methods, keeping itself up-to-date with any changes. This dynamic synchronization essentially mirrors the server's API, making it feel like you're working with a local object.
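A note on what the mirrored API means in practice: the sketch below pairs a minimal service with a client that writes an attribute and calls a method through the proxy. The `Device` class and its `reset()` method are illustrative, not part of this diff; `pydase.DataService`, `pydase.Server`, and `pydase.Client` are the APIs documented above.

```python
# server.py -- a minimal service whose API the client proxy will mirror
import pydase


class Device(pydase.DataService):
    voltage = 0.0

    def reset(self) -> None:
        self.voltage = 0.0


if __name__ == "__main__":
    # Serve the instance; the RPC client connects to the same port via socket.io.
    pydase.Server(Device(), web_port=8001).run()
```

A client in another process can then drive both the attribute and the method:

```python
# client.py
import pydase

client_proxy = pydase.Client(url="ws://localhost:8001").proxy
client_proxy.voltage = 3.3   # forwarded to the remote Device instance
client_proxy.reset()         # remote methods are mirrored onto the proxy too
print(client_proxy.voltage)  # Expected output: 0.0
```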
-## Context Manager
+## Context Manager Support

-You can also use the client as a context manager which automatically opens and closes the connection again:
+You can also use the client within a context manager, which automatically handles connection management (i.e., opening and closing the connection):

 ```python
 import pydase


 with pydase.Client(url="ws://localhost:8001") as client:
-    client.proxy.<my_method>()
+    client.proxy.my_method()
 ```

+Using the context manager ensures that connections are cleanly closed once the block of code finishes executing.

 ## Tab Completion Support

-In interactive environments such as Python interpreters and Jupyter notebooks, the proxy class supports tab completion, which allows users to explore available methods and attributes.
+In interactive environments like Python interpreters or Jupyter notebooks, the proxy supports tab completion. This allows users to explore available methods and attributes.

-## Integration within Other Services
+## Integrating the Client into Another Service

-You can also integrate a client proxy within another service. Here's how you can set it up:
+You can integrate a `pydase` client proxy within another service. Here's an example of how to set this up:

 ```python
 import pydase


 class MyService(pydase.DataService):
     # Initialize the client without blocking the constructor
-    proxy = pydase.Client(url="ws://<ip_addr>:<service_port>", block_until_connected=False).proxy
-    # proxy = pydase.Client(url="wss://your-domain.ch", block_until_connected=False).proxy  # communicating with ssl-encrypted service
+    proxy = pydase.Client(
+        url="ws://<ip_addr>:<service_port>",
+        block_until_connected=False
+    ).proxy
+    # For SSL-encrypted services, use the wss protocol
+    # proxy = pydase.Client(
+    #     url="wss://your-domain.ch",
+    #     block_until_connected=False
+    # ).proxy


 if __name__ == "__main__":
     service = MyService()
-    # Create a server that exposes this service; adjust the web_port as needed
+    # Create a server that exposes this service
     server = pydase.Server(service, web_port=8002).run()
 ```

-In this setup, the `MyService` class has a `proxy` attribute that connects to a `pydase` service located at `<ip_addr>:8001`.
-The `block_until_connected=False` argument allows the service to start up even if the initial connection attempt fails.
-This configuration is particularly useful in distributed systems where services may start in any order.
+In this example:
+
+- The `MyService` class has a `proxy` attribute that connects to a `pydase` service at `<ip_addr>:<service_port>`.
+- By setting `block_until_connected=False`, the service can start without waiting for the connection to succeed, which is particularly useful in distributed systems where services may initialize in any order.

+## Custom `socketio.AsyncClient` Connection Parameters
+
+You can also configure advanced connection options by passing additional arguments to the underlying [`AsyncClient`][socketio.AsyncClient] via `sio_client_kwargs`. This allows you to fine-tune reconnection behaviour, delays, and other settings:
+
+```python
+client = pydase.Client(
+    url="ws://localhost:8001",
+    sio_client_kwargs={
+        "reconnection_attempts": 3,
+        "reconnection_delay": 2,
+        "reconnection_delay_max": 10,
+    }
+).proxy
+```
+
+In this setup, the client will attempt to reconnect three times, with an initial delay of 2 seconds (each successive attempt doubles this delay) and a maximum delay of 10 seconds between attempts.
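For intuition, this is the nominal backoff schedule those three settings produce (a sketch of the arithmetic only; python-socketio additionally randomizes the actual wait around these values):

```python
delay, delay_max = 2, 10  # reconnection_delay, reconnection_delay_max
for attempt in range(1, 4):  # reconnection_attempts = 3
    print(f"attempt {attempt}: wait ~{delay}s before retrying")
    delay = min(delay * 2, delay_max)
# attempt 1: ~2s, attempt 2: ~4s, attempt 3: ~8s (further growth would cap at 10s)
```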
@@ -54,6 +54,7 @@ plugins:
           - https://docs.python.org/3/objects.inv
           - https://docs.pydantic.dev/latest/objects.inv
           - https://confz.readthedocs.io/en/latest/objects.inv
+          - https://python-socketio.readthedocs.io/en/stable/objects.inv
         options:
           show_source: true
           inherited_members: true
@@ -1,6 +1,6 @@
 [tool.poetry]
 name = "pydase"
-version = "0.10.2"
+version = "0.10.6"
 description = "A flexible and robust Python library for creating, managing, and interacting with data services, with built-in support for web and RPC servers, and customizable features for diverse use cases."
 authors = ["Mose Mueller <mosmuell@ethz.ch>"]
 readme = "README.md"
@@ -2,13 +2,12 @@ import asyncio
 import logging
 import sys
 import threading
-from typing import TypedDict, cast
+from typing import TYPE_CHECKING, Any, TypedDict, cast

 import socketio  # type: ignore

 import pydase.components
-from pydase.client.proxy_loader import ProxyClassMixin, ProxyLoader
-from pydase.utils.helpers import current_event_loop_exists
+from pydase.client.proxy_class import ProxyClass
+from pydase.client.proxy_loader import ProxyLoader
+from pydase.utils.serialization.deserializer import loads
+from pydase.utils.serialization.types import SerializedDataService, SerializedObject

@@ -32,51 +31,7 @@ class NotifyDict(TypedDict):

-def asyncio_loop_thread(loop: asyncio.AbstractEventLoop) -> None:
-    asyncio.set_event_loop(loop)
-    try:
-        loop.run_forever()
-    except RuntimeError:
-        logger.debug("Tried starting even loop, but it is running already")
-
-
-class ProxyClass(ProxyClassMixin, pydase.components.DeviceConnection):
-    """
-    A proxy class that serves as the interface for interacting with device connections
-    via a socket.io client in an asyncio environment.
-
-    Args:
-        sio_client:
-            The socket.io client instance used for asynchronous communication with the
-            pydase service server.
-        loop:
-            The event loop in which the client operations are managed and executed.
-
-    This class is used to create a proxy object that behaves like a local representation
-    of a remote pydase service, facilitating direct interaction as if it were local
-    while actually communicating over network protocols.
-    It can also be used as an attribute of a pydase service itself, e.g.
-
-    ```python
-    import pydase
-
-
-    class MyService(pydase.DataService):
-        proxy = pydase.Client(
-            hostname="...", port=8001, block_until_connected=False
-        ).proxy
-
-
-    if __name__ == "__main__":
-        service = MyService()
-        server = pydase.Server(service, web_port=8002).run()
-    ```
-    """
-
-    def __init__(
-        self, sio_client: socketio.AsyncClient, loop: asyncio.AbstractEventLoop
-    ) -> None:
-        super().__init__()
-        pydase.components.DeviceConnection.__init__(self)
-        self._initialise(sio_client=sio_client, loop=loop)
-        loop.run_forever()


 class Client:
@@ -90,15 +45,35 @@ class Client:
         url:
             The URL of the pydase Socket.IO server. This should always contain the
             protocol and the hostname.
+
+            Examples:
+
+            - `wss://my-service.example.com`  # for secure connections, use wss
+            - `ws://localhost:8001`
         block_until_connected:
            If set to True, the constructor will block until the connection to the
            service has been established. This is useful for ensuring the client is
            ready to use immediately after instantiation. Default is True.
+        sio_client_kwargs:
+            Additional keyword arguments passed to the underlying
+            [`AsyncClient`][socketio.AsyncClient]. This allows fine-tuning of the
+            client's behaviour (e.g., reconnection attempts or reconnection delay).
+            Default is an empty dictionary.
+
+    Example:
+        The following example demonstrates a `Client` instance that connects to another
+        pydase service, while customising some of the connection settings for the
+        underlying [`AsyncClient`][socketio.AsyncClient].
+
+        ```python
+        pydase.Client(url="ws://localhost:8001", sio_client_kwargs={
+            "reconnection_attempts": 2,
+            "reconnection_delay": 2,
+            "reconnection_delay_max": 8,
+        })
+        ```
+
+        When connecting to a server over a secure connection (i.e., the server is using
+        SSL/TLS encryption), make sure that the `wss` protocol is used instead of `ws`:
+
+        ```python
+        pydase.Client(url="wss://my-service.example.com")
+        ```
     """

     def __init__(
@@ -106,15 +81,14 @@ class Client:
         *,
         url: str,
         block_until_connected: bool = True,
+        sio_client_kwargs: dict[str, Any] = {},
     ):
         self._url = url
-        self._sio = socketio.AsyncClient()
-        if not current_event_loop_exists():
-            self._loop = asyncio.new_event_loop()
-            asyncio.set_event_loop(self._loop)
-        else:
-            self._loop = asyncio.get_event_loop()
-        self.proxy = ProxyClass(sio_client=self._sio, loop=self._loop)
+        self._sio = socketio.AsyncClient(**sio_client_kwargs)
+        self._loop = asyncio.new_event_loop()
+        self.proxy = ProxyClass(
+            sio_client=self._sio, loop=self._loop, reconnect=self.connect
+        )
         """A proxy object representing the remote service, facilitating interaction as
         if it were local."""
         self._thread = threading.Thread(
@@ -170,7 +144,13 @@ class Client:
             self.proxy, serialized_object=serialized_object
         )
         serialized_object["type"] = "DeviceConnection"
-        self.proxy._notify_changed("", loads(serialized_object))
+        if self.proxy._service_representation is not None:
+            # need to use object.__setattr__ to not trigger an observer notification
+            object.__setattr__(self.proxy, "_service_representation", serialized_object)
+
+        if TYPE_CHECKING:
+            self.proxy._service_representation = serialized_object  # type: ignore
+        self.proxy._notify_changed("", self.proxy)
         self.proxy._connected = True

     async def _handle_disconnect(self) -> None:
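A side note on the pattern this client uses: it owns a dedicated asyncio event loop, runs it forever on a background thread (`self._thread = threading.Thread(...)` above), and hands coroutines to it from synchronous code. A minimal standalone sketch of that mechanism, using only the standard library (names here are illustrative, not pydase API):

```python
import asyncio
import threading

loop = asyncio.new_event_loop()

# Run the loop forever on a background thread, as the pydase Client does.
thread = threading.Thread(target=loop.run_forever, daemon=True)
thread.start()


async def fetch_value() -> int:
    await asyncio.sleep(0.1)  # stand-in for a socket.io round trip
    return 42


# Synchronous callers schedule coroutines onto the loop and block on the result.
future = asyncio.run_coroutine_threadsafe(fetch_value(), loop)
print(future.result())  # 42

loop.call_soon_threadsafe(loop.stop)
thread.join()
```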
src/pydase/client/proxy_class.py (new file, 112 lines)
@@ -0,0 +1,112 @@
import asyncio
import logging
from collections.abc import Callable
from copy import deepcopy
from typing import TYPE_CHECKING, cast

import socketio  # type: ignore

import pydase.components
from pydase.client.proxy_loader import ProxyClassMixin
from pydase.utils.helpers import get_attribute_doc
from pydase.utils.serialization.types import SerializedDataService, SerializedObject

logger = logging.getLogger(__name__)


class ProxyClass(ProxyClassMixin, pydase.components.DeviceConnection):
    """
    A proxy class that serves as the interface for interacting with device connections
    via a socket.io client in an asyncio environment.

    Args:
        sio_client:
            The socket.io client instance used for asynchronous communication with the
            pydase service server.
        loop:
            The event loop in which the client operations are managed and executed.
        reconnect:
            The method that is called periodically when the client is not connected.

    This class is used to create a proxy object that behaves like a local representation
    of a remote pydase service, facilitating direct interaction as if it were local
    while actually communicating over network protocols.
    It can also be used as an attribute of a pydase service itself, e.g.

    ```python
    import pydase


    class MyService(pydase.DataService):
        proxy = pydase.Client(
            hostname="...", port=8001, block_until_connected=False
        ).proxy


    if __name__ == "__main__":
        service = MyService()
        server = pydase.Server(service, web_port=8002).run()
    ```
    """

    def __init__(
        self,
        sio_client: socketio.AsyncClient,
        loop: asyncio.AbstractEventLoop,
        reconnect: Callable[..., None],
    ) -> None:
        if TYPE_CHECKING:
            self._service_representation: None | SerializedObject = None

        super().__init__()
        pydase.components.DeviceConnection.__init__(self)
        self._initialise(sio_client=sio_client, loop=loop)
        object.__setattr__(self, "_service_representation", None)
        self.reconnect = reconnect

    def serialize(self) -> SerializedObject:
        if self._service_representation is None:
            serialization_future = cast(
                asyncio.Future[SerializedDataService],
                asyncio.run_coroutine_threadsafe(
                    self._sio.call("service_serialization"), self._loop
                ),
            )
            # need to use object.__setattr__ to not trigger an observer notification
            object.__setattr__(
                self, "_service_representation", serialization_future.result()
            )
            if TYPE_CHECKING:
                self._service_representation = serialization_future.result()

        device_connection_value = cast(
            dict[str, SerializedObject],
            pydase.components.DeviceConnection().serialize()["value"],
        )

        readonly = False
        doc = get_attribute_doc(self)
        obj_name = self.__class__.__name__

        value = {
            **cast(
                dict[str, SerializedObject],
                # need to deepcopy to not overwrite the _service_representation dict
                # when adding a prefix with add_prefix_to_full_access_path
                deepcopy(self._service_representation["value"]),
            ),
            **device_connection_value,
        }

        return {
            "full_access_path": "",
            "name": obj_name,
            "type": "DeviceConnection",
            "value": value,
            "readonly": readonly,
            "doc": doc,
        }

    def connect(self) -> None:
        if not self._sio.reconnection or self._sio.reconnection_attempts > 0:
            self.reconnect(block_until_connected=False)
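A side note on the `object.__setattr__` calls above: observable pydase objects intercept normal attribute assignment to emit change notifications, so internal bookkeeping state is written via `object.__setattr__`, which bypasses the class's own `__setattr__` entirely. A minimal standalone sketch of the mechanism (not pydase code):

```python
class Observable:
    def __setattr__(self, name: str, value: object) -> None:
        print(f"notify observers: {name} changed")
        super().__setattr__(name, value)


obs = Observable()
obs.voltage = 5.0                      # prints the notification
object.__setattr__(obs, "_cache", {})  # silent: skips Observable.__setattr__
print(obs._cache)                      # {}
```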
@@ -10,6 +10,7 @@ from pydase.observer_pattern.observable.observable import (
 )
 from pydase.utils.helpers import (
     get_class_and_instance_attributes,
+    is_descriptor,
     is_property_attribute,
 )
 from pydase.utils.serialization.serializer import (
@@ -68,7 +69,7 @@ class DataService(AbstractDataService):
         if not issubclass(
             value_class,
             (int | float | bool | str | list | dict | Enum | u.Quantity | Observable),
-        ):
+        ) and not is_descriptor(__value):
             logger.warning(
                 "Class '%s' does not inherit from DataService. This may lead to"
                 " unexpected behaviour!",
@@ -8,7 +8,10 @@ from pydase.observer_pattern.observable.observable_object import ObservableObject
 from pydase.observer_pattern.observer.property_observer import (
     PropertyObserver,
 )
-from pydase.utils.helpers import get_object_attr_from_path
+from pydase.utils.helpers import (
+    get_object_attr_from_path,
+    normalize_full_access_path_string,
+)
 from pydase.utils.serialization.serializer import (
     SerializationPathError,
     SerializedObject,
@@ -99,7 +102,8 @@ class DataServiceObserver(PropertyObserver):
         )

     def _notify_dependent_property_changes(self, changed_attr_path: str) -> None:
-        changed_props = self.property_deps_dict.get(changed_attr_path, [])
+        normalized_attr_path = normalize_full_access_path_string(changed_attr_path)
+        changed_props = self.property_deps_dict.get(normalized_attr_path, [])
         for prop in changed_props:
             # only notify about changing attribute if it is not currently being
             # "changed" e.g. when calling the getter of a property within another
@@ -22,12 +22,9 @@ class Observable(ObservableObject):
             - {"__annotations__"}
         }
         for name, value in class_attrs.items():
-            if isinstance(value, property) or callable(value):
-                continue
-            if is_descriptor(value):
-                # Descriptors have to be stored as a class variable in another class to
-                # work properly. So don't make it an instance attribute.
-                self._initialise_new_objects(name, value)
+            if isinstance(value, property) or callable(value) or is_descriptor(value):
+                # Properties, methods and descriptors have to be stored as class
+                # attributes to work properly. So don't make it an instance attribute.
+                continue
            self.__dict__[name] = self._initialise_new_objects(name, value)
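The comment above reflects a core Python rule: the descriptor protocol only fires when the descriptor lives on the *class*; the same object stored in an instance `__dict__` is returned as-is. This is also why `PropertyObserver` below resolves descriptor values with `getattr` rather than reading them out of `vars(type(obj))`. A quick standalone demonstration:

```python
class Doubler:
    def __get__(self, obj: object, objtype: type | None = None) -> int:
        return 2 * 21


class OnClass:
    attr = Doubler()           # class attribute: descriptor protocol applies


class OnInstance:
    def __init__(self) -> None:
        self.attr = Doubler()  # instance attribute: protocol does NOT apply


print(OnClass().attr)     # 42 -- Doubler.__get__ was called
print(OnInstance().attr)  # <__main__.Doubler object at 0x...> -- plain lookup
```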
@@ -5,6 +5,7 @@ from typing import Any

 from pydase.observer_pattern.observable.observable import Observable
 from pydase.observer_pattern.observer.observer import Observer
+from pydase.utils.helpers import is_descriptor

 logger = logging.getLogger(__name__)

@@ -61,17 +62,27 @@ class PropertyObserver(Observer):
         self, obj: Observable, deps: dict[str, Any], prefix: str
     ) -> None:
         for k, value in {**vars(type(obj)), **vars(obj)}.items():
+            actual_value = value
             prefix = (
                 f"{prefix}." if prefix != "" and not prefix.endswith(".") else prefix
             )
             parent_path = f"{prefix}{k}"
-            if isinstance(value, Observable):
+
+            # Get value from descriptor
+            if not isinstance(value, property) and is_descriptor(value):
+                actual_value = getattr(obj, k)
+
+            if isinstance(actual_value, Observable):
                 new_prefix = f"{parent_path}."
                 deps.update(
-                    self._get_properties_and_their_dependencies(value, new_prefix)
+                    self._get_properties_and_their_dependencies(
+                        actual_value, new_prefix
+                    )
                 )
             elif isinstance(value, list | dict):
-                self._process_collection_item_properties(value, deps, parent_path)
+                self._process_collection_item_properties(
+                    actual_value, deps, parent_path
+                )

     def _process_collection_item_properties(
         self,
@@ -1,7 +1,8 @@
 import logging
 from collections.abc import Callable, Coroutine
-from typing import Any, TypeVar
+from typing import Any, Generic, TypeVar, overload

+from pydase.data_service.data_service import DataService
 from pydase.task.task import Task

 logger = logging.getLogger(__name__)
@@ -9,6 +10,69 @@ logger = logging.getLogger(__name__)
 R = TypeVar("R")


+class PerInstanceTaskDescriptor(Generic[R]):
+    """
+    A descriptor class that provides a unique [`Task`][pydase.task.task.Task] object
+    for each instance of a [`DataService`][pydase.data_service.data_service.DataService]
+    class.
+
+    The `PerInstanceTaskDescriptor` is used to transform an asynchronous function into a
+    task that is managed independently for each instance of a `DataService` subclass.
+    This allows tasks to be initialized, started, and stopped on a per-instance basis,
+    providing better control over task execution within the service.
+
+    The `PerInstanceTaskDescriptor` is not intended to be used directly. Instead, it is
+    used internally by the `@task` decorator to manage task objects for each instance of
+    the service class.
+    """
+
+    def __init__(
+        self,
+        func: Callable[[Any], Coroutine[None, None, R]]
+        | Callable[[], Coroutine[None, None, R]],
+        autostart: bool = False,
+    ) -> None:
+        self.__func = func
+        self.__autostart = autostart
+        self.__task_instances: dict[object, Task[R]] = {}
+
+    def __set_name__(self, owner: type[DataService], name: str) -> None:
+        """Stores the name of the task within the owning class. This method is called
+        automatically when the descriptor is assigned to a class attribute.
+        """
+
+        self.__task_name = name
+
+    @overload
+    def __get__(
+        self, instance: None, owner: type[DataService]
+    ) -> "PerInstanceTaskDescriptor[R]":
+        """Returns the descriptor itself when accessed through the class."""
+
+    @overload
+    def __get__(self, instance: DataService, owner: type[DataService]) -> Task[R]:
+        """Returns the `Task` object associated with the specific `DataService`
+        instance.
+        If no task exists for the instance, a new `Task` object is created and stored
+        in the `__task_instances` dictionary.
+        """
+
+    def __get__(
+        self, instance: DataService | None, owner: type[DataService]
+    ) -> "Task[R] | PerInstanceTaskDescriptor[R]":
+        if instance is None:
+            return self
+
+        # Create a new Task object for this instance, using the function's name.
+        if instance not in self.__task_instances:
+            self.__task_instances[instance] = instance._initialise_new_objects(
+                self.__task_name,
+                Task(self.__func.__get__(instance, owner), autostart=self.__autostart),
+            )
+
+        return self.__task_instances[instance]
+
+
 def task(
     *, autostart: bool = False
 ) -> Callable[
@@ -16,18 +80,22 @@ def task(
         Callable[[Any], Coroutine[None, None, R]]
         | Callable[[], Coroutine[None, None, R]]
     ],
-    Task[R],
+    PerInstanceTaskDescriptor[R],
 ]:
     """
-    A decorator to define a function as a task within a
+    A decorator to define an asynchronous function as a per-instance task within a
     [`DataService`][pydase.DataService] class.

-    This decorator transforms an asynchronous function into a
-    [`Task`][pydase.task.task.Task] object. The `Task` object provides methods like
-    `start()` and `stop()` to control the execution of the task.
+    This decorator transforms an asynchronous function into a
+    [`Task`][pydase.task.task.Task] object that is unique to each instance of the
+    `DataService` class. The resulting `Task` object provides methods like `start()`
+    and `stop()` to control the execution of the task, and manages the task's lifecycle
+    independently for each instance of the service.

-    Tasks are typically used to perform periodic or recurring jobs, such as reading
-    sensor data, updating databases, or other operations that need to be repeated over
-    time.
+    The decorator is particularly useful for defining tasks that need to run
+    periodically or perform asynchronous operations, such as polling data sources,
+    updating databases, or any recurring job that should be managed within the context
+    of a `DataService`.

     Args:
@@ -36,8 +104,10 @@ def task(
         initialized. Defaults to False.

     Returns:
-        A decorator that converts an asynchronous function into a
-        [`Task`][pydase.task.task.Task] object.
+        A decorator that wraps an asynchronous function in a
+        [`PerInstanceTaskDescriptor`][pydase.task.decorator.PerInstanceTaskDescriptor]
+        object, which, when accessed, provides an instance-specific
+        [`Task`][pydase.task.task.Task] object.

     Example:
         ```python
@@ -69,7 +139,7 @@ def task(
     def decorator(
         func: Callable[[Any], Coroutine[None, None, R]]
         | Callable[[], Coroutine[None, None, R]],
-    ) -> Task[R]:
-        return Task(func, autostart=autostart)
+    ) -> PerInstanceTaskDescriptor[R]:
+        return PerInstanceTaskDescriptor(func, autostart=autostart)

     return decorator
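The core trick in `PerInstanceTaskDescriptor` is a descriptor that lazily caches one object per owning instance. A self-contained sketch of that pattern, stripped of the pydase specifics (names here are illustrative; the real implementation keys the cache by the instance object itself and registers the `Task` under the name captured in `__set_name__`):

```python
from typing import Any, Callable


class PerInstance:
    """Descriptor handing each owner instance its own payload object."""

    def __init__(self, factory: Callable[[], Any]) -> None:
        self._factory = factory
        self._cache: dict[int, Any] = {}

    def __set_name__(self, owner: type, name: str) -> None:
        self._name = name  # called automatically at class creation time

    def __get__(self, instance: object, owner: type) -> Any:
        if instance is None:
            return self  # class-level access returns the descriptor itself
        key = id(instance)
        if key not in self._cache:
            self._cache[key] = self._factory()
        return self._cache[key]


class Service:
    state = PerInstance(dict)  # each Service instance gets its own dict


a, b = Service(), Service()
a.state["x"] = 1
print(b.state)  # {} -- not shared between instances
```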
@@ -1,43 +1,23 @@
 import asyncio
 import inspect
 import logging
-import sys
 from collections.abc import Callable, Coroutine
 from typing import (
     Any,
     Generic,
     TypeVar,
 )

-from typing_extensions import TypeIs
-
-from pydase.task.task_status import TaskStatus
-
-if sys.version_info < (3, 11):
-    from typing_extensions import Self
-else:
-    from typing import Self
-
 import pydase.data_service.data_service
+from pydase.task.task_status import TaskStatus
 from pydase.utils.helpers import current_event_loop_exists

 logging.basicConfig(level=logging.DEBUG)
 logger = logging.getLogger(__name__)

 R = TypeVar("R")


-def is_bound_method(
-    method: Callable[[], Coroutine[None, None, R | None]]
-    | Callable[[Any], Coroutine[None, None, R | None]],
-) -> TypeIs[Callable[[], Coroutine[None, None, R | None]]]:
-    """Check if instance method is bound to an object."""
-    return inspect.ismethod(method)
-
-
 class Task(pydase.data_service.data_service.DataService, Generic[R]):
-    """
-    A class representing a task within the `pydase` framework.
+    """A class representing a task within the `pydase` framework.

     The `Task` class wraps an asynchronous function and provides methods to manage its
     lifecycle, such as `start()` and `stop()`. It is typically used to perform periodic
@@ -85,25 +65,24 @@ class Task(pydase.data_service.data_service.DataService, Generic[R]):

     def __init__(
         self,
-        func: Callable[[Any], Coroutine[None, None, R | None]]
-        | Callable[[], Coroutine[None, None, R | None]],
+        func: Callable[[], Coroutine[None, None, R | None]],
         *,
         autostart: bool = False,
     ) -> None:
         super().__init__()
         self._autostart = autostart
         self._func_name = func.__name__
-        self._bound_func: Callable[[], Coroutine[None, None, R | None]] | None = None
-        self._set_up = False
-        if is_bound_method(func):
-            self._func = func
-            self._bound_func = func
-        else:
-            self._func = func
+        self._func = func
         self._task: asyncio.Task[R | None] | None = None
         self._status = TaskStatus.NOT_RUNNING
         self._result: R | None = None

+        if not current_event_loop_exists():
+            self._loop = asyncio.new_event_loop()
+            asyncio.set_event_loop(self._loop)
+        else:
+            self._loop = asyncio.get_event_loop()
+
     @property
     def autostart(self) -> bool:
         """Defines if the task should be started automatically when the
@@ -144,10 +123,10 @@ class Task(pydase.data_service.data_service.DataService, Generic[R]):
             self._result = task.result()

         async def run_task() -> R | None:
-            if inspect.iscoroutinefunction(self._bound_func):
+            if inspect.iscoroutinefunction(self._func):
                 logger.info("Starting task %r", self._func_name)
                 self._status = TaskStatus.RUNNING
-                res: Coroutine[None, None, R] = self._bound_func()
+                res: Coroutine[None, None, R | None] = self._func()
                 try:
                     return await res
                 except asyncio.CancelledError:
@@ -167,24 +146,3 @@ class Task(pydase.data_service.data_service.DataService, Generic[R]):

         if self._task:
             self._task.cancel()
-
-    def __get__(self, instance: Any, owner: Any) -> Self:
-        """Descriptor method used to correctly set up the task.
-
-        This descriptor method is called by the class instance containing the task.
-        It binds the task function to that class instance.
-
-        Since the `__init__` function is called when a function is decorated with
-        [`@task`][pydase.task.decorator.task], some setup is delayed until this
-        descriptor function is called.
-        """
-
-        if instance and not self._set_up:
-            if not current_event_loop_exists():
-                self._loop = asyncio.new_event_loop()
-                asyncio.set_event_loop(self._loop)
-            else:
-                self._loop = asyncio.get_event_loop()
-            self._bound_func = self._func.__get__(instance, owner)
-            self._set_up = True
-        return self
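With the binding logic moved out of `Task`, `PerInstanceTaskDescriptor` now hands `Task` an already-bound coroutine function by calling `__get__` on the raw function, which is exactly how Python itself creates bound methods. A standalone illustration (names are illustrative):

```python
import asyncio


async def my_task(self) -> str:
    return f"running on {self.name}"


class Service:
    name = "service-1"


svc = Service()
bound = my_task.__get__(svc, Service)  # functions are descriptors; this binds `self`
print(bound)                           # <bound method my_task of <...Service object ...>>
print(asyncio.run(bound()))            # "running on service-1" -- no arguments needed
```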
@@ -204,6 +204,17 @@ def function_has_arguments(func: Callable[..., Any]) -> bool:

 def is_descriptor(obj: object) -> bool:
     """Check if an object is a descriptor."""

+    # Exclude functions, methods, builtins and properties
+    if (
+        inspect.isfunction(obj)
+        or inspect.ismethod(obj)
+        or inspect.isbuiltin(obj)
+        or isinstance(obj, property)
+    ):
+        return False
+
+    # Check if it has any descriptor methods
     return any(hasattr(obj, method) for method in ("__get__", "__set__", "__delete__"))
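What the refined check accepts and rejects, shown on a few representative objects (a sketch; it assumes the helper is importable from `pydase.utils.helpers`, as the observer imports above suggest):

```python
from pydase.utils.helpers import is_descriptor


class Custom:
    def __get__(self, obj, objtype=None):
        return 42


def plain_function() -> None: ...


print(is_descriptor(Custom()))                  # True: defines __get__
print(is_descriptor(property(lambda self: 1)))  # False: properties are excluded
print(is_descriptor(plain_function))            # False: functions are excluded
print(is_descriptor(len))                       # False: builtins are excluded
print(is_descriptor(object()))                  # False: no descriptor methods
```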
@@ -212,3 +223,25 @@ def current_event_loop_exists() -> bool:
     import asyncio

     return asyncio.get_event_loop_policy()._local._loop is not None  # type: ignore
+
+
+def normalize_full_access_path_string(s: str) -> str:
+    """Normalizes a string representing a full access path by converting double quotes
+    to single quotes.
+
+    This function is useful for ensuring consistency in strings that represent access
+    paths containing dictionary keys, by replacing all double quotes (`"`) with single
+    quotes (`'`).
+
+    Args:
+        s (str): The input string to be normalized.
+
+    Returns:
+        A new string with all double quotes replaced by single quotes.
+
+    Example:
+        >>> normalize_full_access_path_string('dictionary["first"].my_task')
+        "dictionary['first'].my_task"
+    """
+
+    return s.replace('"', "'")
@@ -33,7 +33,7 @@ LOGGING_CONFIG = {
         "default": {
             "formatter": "default",
             "class": "logging.StreamHandler",
-            "stream": "ext://sys.stderr",
+            "stream": "ext://sys.stdout",
         },
     },
     "loggers": {
@@ -42,6 +42,8 @@ from pydase.utils.serialization.types import (
 if TYPE_CHECKING:
     from collections.abc import Callable

+    from pydase.client.proxy_class import ProxyClass
+
 logger = logging.getLogger(__name__)

@@ -74,6 +76,7 @@ class Serializer:
         Returns:
             Dictionary representation of `obj`.
         """
+        from pydase.client.client import ProxyClass

         result: SerializedObject

@@ -83,6 +86,9 @@ class Serializer:
         elif isinstance(obj, datetime):
             result = cls._serialize_datetime(obj, access_path=access_path)

+        elif isinstance(obj, ProxyClass):
+            result = cls._serialize_proxy_class(obj, access_path=access_path)
+
         elif isinstance(obj, AbstractDataService):
             result = cls._serialize_data_service(obj, access_path=access_path)

@@ -322,6 +328,13 @@ class Serializer:
             "doc": doc,
         }

+    @classmethod
+    def _serialize_proxy_class(
+        cls, obj: ProxyClass, access_path: str = ""
+    ) -> SerializedDataService:
+        # Get serialization value from the remote service and adapt the full_access_path
+        return add_prefix_to_full_access_path(obj.serialize(), access_path)
+

 def dump(obj: Any) -> SerializedObject:
     """Serialize `obj` to a
@@ -572,6 +585,62 @@ def generate_serialized_data_paths(
     return paths


+def add_prefix_to_full_access_path(
+    serialized_obj: SerializedObject, prefix: str
+) -> Any:
+    """Recursively adds a specified prefix to all full access paths of the serialized
+    object.
+
+    Args:
+        data:
+            The serialized object to process.
+        prefix:
+            The prefix string to prepend to each full access path.
+
+    Returns:
+        The modified serialized object with the prefix added to all full access paths.
+
+    Example:
+        ```python
+        >>> data = {
+        ...     "full_access_path": "",
+        ...     "value": {
+        ...         "item": {
+        ...             "full_access_path": "some_item_path",
+        ...             "value": 1.0
+        ...         }
+        ...     }
+        ... }
+        ...
+        ... modified_data = add_prefix_to_full_access_path(data, 'prefix')
+        {"full_access_path": "prefix", "value": {"item": {"full_access_path":
+        "prefix.some_item_path", "value": 1.0}}}
+        ```
+    """
+
+    try:
+        if serialized_obj.get("full_access_path", None) is not None:
+            serialized_obj["full_access_path"] = (
+                prefix + "." + serialized_obj["full_access_path"]
+                if serialized_obj["full_access_path"] != ""
+                else prefix
+            )
+
+        if isinstance(serialized_obj["value"], list):
+            for value in serialized_obj["value"]:
+                add_prefix_to_full_access_path(cast(SerializedObject, value), prefix)
+
+        elif isinstance(serialized_obj["value"], dict):
+            for value in cast(
+                dict[str, SerializedObject], serialized_obj["value"]
+            ).values():
+                add_prefix_to_full_access_path(cast(SerializedObject, value), prefix)
+    except (TypeError, KeyError, AttributeError):
+        # passed dictionary is not a serialized object
+        pass
+    return serialized_obj


 def serialized_dict_is_nested_object(serialized_dict: SerializedObject) -> bool:
     value = serialized_dict["value"]
     # We are excluding Quantity here as the value corresponding to the "value" key is
tests/client/test_reconnection.py (new file, 107 lines)
@@ -0,0 +1,107 @@
import threading
from collections.abc import Callable, Generator
from typing import Any

import pydase
import pytest
import socketio.exceptions


@pytest.fixture(scope="function")
def pydase_restartable_server() -> (
    Generator[
        tuple[
            pydase.Server,
            threading.Thread,
            pydase.DataService,
            Callable[
                [pydase.Server, threading.Thread, pydase.DataService],
                tuple[pydase.Server, threading.Thread],
            ],
        ],
        None,
        Any,
    ]
):
    class MyService(pydase.DataService):
        def __init__(self) -> None:
            super().__init__()
            self._name = "MyService"
            self._my_property = 12.1

        @property
        def my_property(self) -> float:
            return self._my_property

        @my_property.setter
        def my_property(self, value: float) -> None:
            self._my_property = value

        @property
        def name(self) -> str:
            return self._name

    service_instance = MyService()
    server = pydase.Server(service_instance, web_port=9999)
    thread = threading.Thread(target=server.run, daemon=True)
    thread.start()

    def restart(
        server: pydase.Server,
        thread: threading.Thread,
        service_instance: pydase.DataService,
    ) -> tuple[pydase.Server, threading.Thread]:
        server.handle_exit()
        thread.join()

        server = pydase.Server(service_instance, web_port=9999)
        new_thread = threading.Thread(target=server.run, daemon=True)
        new_thread.start()

        return server, new_thread

    yield server, thread, service_instance, restart

    server.handle_exit()
    thread.join()


def test_reconnection(
    pydase_restartable_server: tuple[
        pydase.Server,
        threading.Thread,
        pydase.DataService,
        Callable[
            [pydase.Server, threading.Thread, pydase.DataService],
            tuple[pydase.Server, threading.Thread],
        ],
    ],
) -> None:
    client = pydase.Client(
        url="ws://localhost:9999",
        sio_client_kwargs={
            "reconnection": False,
        },
    )
    client_2 = pydase.Client(
        url="ws://localhost:9999",
        sio_client_kwargs={
            "reconnection_attempts": 1,
        },
    )

    server, thread, service_instance, restart = pydase_restartable_server
    service_instance._name = "New service name"

    server, thread = restart(server, thread, service_instance)

    with pytest.raises(socketio.exceptions.BadNamespaceError):
        client.proxy.name
        client_2.proxy.name

    client.proxy.reconnect()
    client_2.proxy.reconnect()

    # the service proxies successfully reconnect and get the new service name
    assert client.proxy.name == "New service name"
    assert client_2.proxy.name == "New service name"
@@ -5,7 +5,7 @@ import pydase
 import pytest
 from pydase.data_service.data_service_observer import DataServiceObserver
 from pydase.data_service.state_manager import StateManager
-from pydase.utils.serialization.serializer import SerializationError
+from pydase.utils.serialization.serializer import SerializationError, dump

 logger = logging.getLogger()

@@ -146,3 +146,41 @@ def test_private_attribute_does_not_have_to_be_serializable() -> None:
     service_instance.change_publ_attr()

     service_instance.change_priv_attr()
+
+
+def test_normalized_attr_path_in_dependent_property_changes(
+    caplog: pytest.LogCaptureFixture,
+) -> None:
+    class SubService(pydase.DataService):
+        _prop = 10.0
+
+        @property
+        def prop(self) -> float:
+            return self._prop
+
+    class MyService(pydase.DataService):
+        def __init__(self) -> None:
+            super().__init__()
+            self.service_dict = {"one": SubService()}
+
+    service_instance = MyService()
+    state_manager = StateManager(service=service_instance)
+    observer = DataServiceObserver(state_manager=state_manager)
+
+    assert observer.property_deps_dict["service_dict['one']._prop"] == [
+        "service_dict['one'].prop"
+    ]
+
+    # We can use dict key path encoded with double quotes
+    state_manager.set_service_attribute_value_by_path(
+        'service_dict["one"]._prop', dump(11.0)
+    )
+    assert service_instance.service_dict["one"].prop == 11.0
+    assert "'service_dict[\"one\"].prop' changed to '11.0'" in caplog.text
+
+    # We can use dict key path encoded with single quotes
+    state_manager.set_service_attribute_value_by_path(
+        "service_dict['one']._prop", dump(12.0)
+    )
+    assert service_instance.service_dict["one"].prop == 12.0
+    assert "'service_dict[\"one\"].prop' changed to '12.0'" in caplog.text
@@ -138,3 +138,154 @@ async def test_nested_dict_autostart_task(
         "'sub_services_dict[\"second\"].my_task.status' changed to 'TaskStatus.RUNNING'"
         in caplog.text
     )
+
+
+@pytest.mark.asyncio(scope="function")
+async def test_manual_start_with_multiple_service_instances(
+    caplog: LogCaptureFixture,
+) -> None:
+    class MySubService(pydase.DataService):
+        @task()
+        async def my_task(self) -> None:
+            logger.info("Triggered task.")
+            while True:
+                await asyncio.sleep(1)
+
+    class MyService(pydase.DataService):
+        sub_services_list = [MySubService() for i in range(2)]
+        sub_services_dict = {"first": MySubService(), "second": MySubService()}
+
+    service_instance = MyService()
+    state_manager = StateManager(service_instance)
+    DataServiceObserver(state_manager)
+
+    autostart_service_tasks(service_instance)
+
+    await asyncio.sleep(0.1)
+
+    assert (
+        service_instance.sub_services_list[0].my_task.status == TaskStatus.NOT_RUNNING
+    )
+    assert (
+        service_instance.sub_services_list[1].my_task.status == TaskStatus.NOT_RUNNING
+    )
+    assert (
+        service_instance.sub_services_dict["first"].my_task.status
+        == TaskStatus.NOT_RUNNING
+    )
+    assert (
+        service_instance.sub_services_dict["second"].my_task.status
+        == TaskStatus.NOT_RUNNING
+    )
+
+    service_instance.sub_services_list[0].my_task.start()
+    await asyncio.sleep(0.01)
+
+    assert service_instance.sub_services_list[0].my_task.status == TaskStatus.RUNNING
+    assert (
+        "'sub_services_list[0].my_task.status' changed to 'TaskStatus.RUNNING'"
+        in caplog.text
+    )
+    assert (
+        "'sub_services_list[1].my_task.status' changed to 'TaskStatus.RUNNING'"
+        not in caplog.text
+    )
+    assert (
+        "'sub_services_dict[\"first\"].my_task.status' changed to 'TaskStatus.RUNNING'"
+        not in caplog.text
+    )
+    assert (
+        "'sub_services_dict[\"second\"].my_task.status' changed to 'TaskStatus.RUNNING'"
+        not in caplog.text
+    )
+
+    service_instance.sub_services_list[0].my_task.stop()
+    await asyncio.sleep(0.01)
+
+    assert "Task 'my_task' was cancelled" in caplog.text
+    caplog.clear()
+
+    service_instance.sub_services_list[1].my_task.start()
+    await asyncio.sleep(0.01)
+
+    assert service_instance.sub_services_list[1].my_task.status == TaskStatus.RUNNING
+    assert (
+        "'sub_services_list[0].my_task.status' changed to 'TaskStatus.RUNNING'"
+        not in caplog.text
+    )
+    assert (
+        "'sub_services_list[1].my_task.status' changed to 'TaskStatus.RUNNING'"
+        in caplog.text
+    )
+    assert (
+        "'sub_services_dict[\"first\"].my_task.status' changed to 'TaskStatus.RUNNING'"
+        not in caplog.text
+    )
+    assert (
+        "'sub_services_dict[\"second\"].my_task.status' changed to 'TaskStatus.RUNNING'"
+        not in caplog.text
+    )
+
+    service_instance.sub_services_list[1].my_task.stop()
+    await asyncio.sleep(0.01)
+
+    assert "Task 'my_task' was cancelled" in caplog.text
+    caplog.clear()
+
+    service_instance.sub_services_dict["first"].my_task.start()
+    await asyncio.sleep(0.01)
+
+    assert (
+        service_instance.sub_services_dict["first"].my_task.status == TaskStatus.RUNNING
+    )
+    assert (
+        "'sub_services_list[0].my_task.status' changed to 'TaskStatus.RUNNING'"
+        not in caplog.text
+    )
+    assert (
+        "'sub_services_list[1].my_task.status' changed to 'TaskStatus.RUNNING'"
+        not in caplog.text
+    )
+    assert (
+        "'sub_services_dict[\"first\"].my_task.status' changed to 'TaskStatus.RUNNING'"
+        in caplog.text
+    )
+    assert (
+        "'sub_services_dict[\"second\"].my_task.status' changed to 'TaskStatus.RUNNING'"
+        not in caplog.text
+    )
+
+    service_instance.sub_services_dict["first"].my_task.stop()
+    await asyncio.sleep(0.01)
+
+    assert "Task 'my_task' was cancelled" in caplog.text
+    caplog.clear()
+
+    service_instance.sub_services_dict["second"].my_task.start()
+    await asyncio.sleep(0.01)
+
+    assert (
+        service_instance.sub_services_dict["second"].my_task.status
+        == TaskStatus.RUNNING
+    )
+    assert (
+        "'sub_services_list[0].my_task.status' changed to 'TaskStatus.RUNNING'"
+        not in caplog.text
+    )
+    assert (
+        "'sub_services_list[1].my_task.status' changed to 'TaskStatus.RUNNING'"
+        not in caplog.text
+    )
+    assert (
+        "'sub_services_dict[\"first\"].my_task.status' changed to 'TaskStatus.RUNNING'"
+        not in caplog.text
+    )
+    assert (
+        "'sub_services_dict[\"second\"].my_task.status' changed to 'TaskStatus.RUNNING'"
+        in caplog.text
+    )
+
+    service_instance.sub_services_dict["second"].my_task.stop()
+    await asyncio.sleep(0.01)
+
+    assert "Task 'my_task' was cancelled" in caplog.text
@@ -12,6 +12,7 @@ from pydase.utils.decorators import frontend
 from pydase.utils.serialization.serializer import (
     SerializationPathError,
     SerializedObject,
+    add_prefix_to_full_access_path,
     dump,
     generate_serialized_data_paths,
     get_container_item_by_key,
@@ -1070,3 +1071,156 @@ def test_get_data_paths_from_serialized_object(obj: Any, expected: list[str]) -> None:
     )
 def test_generate_serialized_data_paths(obj: Any, expected: list[str]) -> None:
     assert generate_serialized_data_paths(dump(obj=obj)["value"]) == expected
+
+
+@pytest.mark.parametrize(
+    "serialized_obj, prefix, expected",
+    [
+        (
+            {
+                "full_access_path": "new_attr",
+                "value": {
+                    "name": {
+                        "full_access_path": "new_attr.name",
+                        "value": "MyService",
+                    }
+                },
+            },
+            "prefix",
+            {
+                "full_access_path": "prefix.new_attr",
+                "value": {
+                    "name": {
+                        "full_access_path": "prefix.new_attr.name",
+                        "value": "MyService",
+                    }
+                },
+            },
+        ),
+        (
+            {
+                "full_access_path": "new_attr",
+                "value": [
+                    {
+                        "full_access_path": "new_attr[0]",
+                        "value": 1.0,
+                    }
+                ],
+            },
+            "prefix",
+            {
+                "full_access_path": "prefix.new_attr",
+                "value": [
+                    {
+                        "full_access_path": "prefix.new_attr[0]",
+                        "value": 1.0,
+                    }
+                ],
+            },
+        ),
+        (
+            {
+                "full_access_path": "new_attr",
+                "value": {
+                    "key": {
+                        "full_access_path": 'new_attr["key"]',
+                        "value": 1.0,
+                    }
+                },
+            },
+            "prefix",
+            {
+                "full_access_path": "prefix.new_attr",
+                "value": {
+                    "key": {
+                        "full_access_path": 'prefix.new_attr["key"]',
+                        "value": 1.0,
+                    }
+                },
+            },
+        ),
+        (
+            {
+                "full_access_path": "new_attr",
+                "value": {"magnitude": 10, "unit": "meter"},
+            },
+            "prefix",
+            {
+                "full_access_path": "prefix.new_attr",
+                "value": {"magnitude": 10, "unit": "meter"},
+            },
+        ),
+        (
+            {
+                "full_access_path": "quantity_list",
+                "value": [
+                    {
+                        "full_access_path": "quantity_list[0]",
+                        "value": {"magnitude": 1.0, "unit": "A"},
+                    }
+                ],
+            },
+            "prefix",
+            {
+                "full_access_path": "prefix.quantity_list",
+                "value": [
+                    {
+                        "full_access_path": "prefix.quantity_list[0]",
+                        "value": {"magnitude": 1.0, "unit": "A"},
+                    }
+                ],
+            },
+        ),
+        (
+            {
+                "full_access_path": "",
+                "value": {
+                    "dict_attr": {
+                        "type": "dict",
+                        "full_access_path": "dict_attr",
+                        "value": {
+                            "foo": {
+                                "full_access_path": 'dict_attr["foo"]',
+                                "type": "dict",
+                                "value": {
+                                    "some_int": {
+                                        "full_access_path": 'dict_attr["foo"].some_int',
+                                        "type": "int",
+                                        "value": 1,
+                                    },
+                                },
+                            },
+                        },
+                    }
+                },
+            },
+            "prefix",
+            {
+                "full_access_path": "prefix",
+                "value": {
+                    "dict_attr": {
+                        "type": "dict",
+                        "full_access_path": "prefix.dict_attr",
+                        "value": {
+                            "foo": {
+                                "full_access_path": 'prefix.dict_attr["foo"]',
+                                "type": "dict",
+                                "value": {
+                                    "some_int": {
+                                        "full_access_path": 'prefix.dict_attr["foo"].some_int',
+                                        "type": "int",
+                                        "value": 1,
+                                    },
+                                },
+                            },
+                        },
+                    }
+                },
+            },
+        ),
+    ],
+)
+def test_add_prefix_to_full_access_path(
+    serialized_obj: SerializedObject, prefix: str, expected: SerializedObject
+) -> None:
+    assert add_prefix_to_full_access_path(serialized_obj, prefix) == expected