38 Commits

Author SHA1 Message Date
Mose Müller
eb32b34b59 Merge pull request #172 from tiqi-group/feat/update_client_reconnection
Feat: update client reconnection
2024-10-02 09:29:39 +02:00
Mose Müller
9eedf03c01 adds reconnection method to proxy class which is called when the sio client does not reconnect 2024-10-02 09:18:32 +02:00
Mose Müller
5ec7a8b530 docs: updates Python Client user guide 2024-10-02 08:24:29 +02:00
Mose Müller
f2f330dbd9 docs: adds python-socketio object inventory 2024-10-02 07:11:35 +02:00
Mose Müller
2e0e056489 adds sio_client_kwargs as pydase.Client keyword argument 2024-10-02 07:10:34 +02:00
Mose Müller
d8685fe9a0 Merge pull request #169 from tiqi-group/fix/proxy_class_representation
Fix: proxy class representation
2024-10-01 11:02:44 +02:00
Mose Müller
e52a019d5e fixes add_prefix_to_full_access_path, updates tests
The prefix does not contain a "." anymore. This will be added by the
function itself (to be able to distinguish empty full access paths).
2024-10-01 11:01:01 +02:00
Mose Müller
0d5cef1537 updates how Client handles (re-)connection with the server
The client will update the proxy class serialization directly on the
ProxyClass instance. this is the only time this get "updated".
Now, the client also notifies the observers directly with the proxy
object as this the serialization of the proxy class is now done through
its `serialize` method (which we have overwritten in a previous commit).
2024-10-01 10:55:29 +02:00
Mose Müller
e8f33eee4d updates ProxyClass serialize method
The ProxyClass will keep a copy of its serialized state s.t. it does not
have to call the remote service. This hangs the event loop if trying to
call asyncio.run_coroutine_threadsafe when already inside the thread.
2024-10-01 10:55:29 +02:00
Mose Müller
a3b71b174c fixes proxy class serialization (needs device connection methods and properties) 2024-10-01 08:25:39 +02:00
Mose Müller
e2ce0e9acb adds ProxyClass serialization support 2024-10-01 07:27:27 +02:00
Mose Müller
f47a183c11 adds add_prefix_to_full_access_path helper function 2024-10-01 07:13:22 +02:00
Mose Müller
a9ea237cf3 overrides serialize method in ProxyClass, getting it from remote service 2024-09-30 16:58:01 +02:00
Mose Müller
6db1652dd3 move ProxyClass into separate file 2024-09-30 16:57:12 +02:00
Mose Müller
e3b95a8076 Merge pull request #168 from tiqi-group/fix/logging_to_stdout
Fix: logging to stdout
2024-09-30 10:57:23 +02:00
Mose Müller
0fe2a8516f updates to version v0.10.6 2024-09-30 10:55:48 +02:00
Mose Müller
51bbaba162 writing to stdout instead of stderr 2024-09-30 10:54:58 +02:00
Mose Müller
77802da417 Merge pull request #166 from tiqi-group/fix/logging_settings
fix: removes basic configuration of logging system in task module
2024-09-25 19:50:10 +02:00
Mose Müller
3e21858cb7 removes basic configuration of logging system in task module 2024-09-25 19:48:32 +02:00
Mose Müller
2003f28fd1 Merge pull request #165 from tiqi-group/fix/client_usage_in_jupyter_notebook
Fix: client usage in jupyter notebook
2024-09-25 19:41:42 +02:00
Mose Müller
172b50bf77 updates to version v0.10.5 2024-09-25 19:38:28 +02:00
Mose Müller
ec5694fedf no need to check for RuntimeError as the loop is always new 2024-09-25 19:38:09 +02:00
Mose Müller
968f774092 always create a new event loop in the client and pass it to a new thread 2024-09-25 19:37:33 +02:00
Mose Müller
757dc9aa3c Merge pull request #163 from tiqi-group/fix/removes_PerInstanceTaskDescriptor_warning
fix: removes inheritance warning for descriptors
2024-09-23 13:35:44 +02:00
Mose Müller
3d938562a6 updates to version v0.10.4 2024-09-23 13:35:33 +02:00
Mose Müller
964a62d4b4 removes inheritance warning for descriptors 2024-09-23 13:33:13 +02:00
Mose Müller
99aa38fcfe chore: removes unused code 2024-09-23 13:28:08 +02:00
Mose Müller
5658514c8a Merge pull request #162 from tiqi-group/fix/normalize_full_access_path
Fix: normalize full access path (for dict keys)
2024-09-23 13:23:11 +02:00
Mose Müller
109ee7d5e1 updates version to v0.10.3 2024-09-23 13:16:50 +02:00
Mose Müller
f4fa02fe11 adds test enuring dict keys can be encoded with both single and double quotes 2024-09-23 13:16:50 +02:00
Mose Müller
487ef504a8 normalizes full access path strings containing dict keys with double quotes
Full access paths containing stringed dictionary keys are sent with
double quotes from the frontend. The quotes have to be changed to single
quotes s.t. the comparison with the property dependency dictionary
works.
2024-09-23 13:16:49 +02:00
Mose Müller
c98e407ed7 Merge pull request #161 from tiqi-group/160-control-of-tasks-in-instances-derived-from-same-class-only-controls-task-of-first-instance
fix: controlling tasks of instances derived from same class
2024-09-23 12:54:38 +02:00
Mose Müller
6b6ce1d43f adds test checking for multiple instances of a class containing a task 2024-09-23 09:44:43 +02:00
Mose Müller
e491ac7458 observable does not have to initialise descriptor objects anymore
The task decorator is not returning a Task object directly anymore, but
rather a descriptor which is returning the task. This is where the task
is initialised and this does not have to be done in the observable base
class, any more.
2024-09-23 09:27:15 +02:00
Mose Müller
e9d8cbafc2 adds PerInstanceTaskDescriptor class managing task objects for service class instances
When defining a task on a DataService class, formerly a task object was
created which replaced the decorated method as a class attribute. This
caused errors when using multiple instances of that class as each
instance was referring to the same task.
This descriptor class now handles the tasks per instance of the service
class.
2024-09-23 09:21:04 +02:00
Mose Müller
aa705592b2 removes code from Task meant to bind the passed function to the containing class instance
The task object will only receive bound methods, so there is no need to
keep the descriptor functionality anymore.
2024-09-23 09:15:42 +02:00
Mose Müller
008e1262bb updates PropertyObserver to support descriptors returning observables
If a class attribute is a descriptor, get its value before checking if
it is an Observable.
2024-09-23 08:57:00 +02:00
Mose Müller
91a71ad004 updates is_descriptor to exclude false positives for methods, functions and builtins 2024-09-23 08:55:44 +02:00
18 changed files with 875 additions and 163 deletions

View File

@@ -1,60 +1,86 @@
# Python RPC Client
You can connect to the service using the `pydase.Client`. Below is an example of how to establish a connection to a service and interact with it:
The [`pydase.Client`][pydase.Client] allows you to connect to a remote `pydase` service using socket.io, facilitating interaction with the service as though it were running locally.
## Basic Usage
```python
import pydase
# Replace the hostname and port with the IP address and the port of the machine where
# the service is running, respectively
# Replace <ip_addr> and <service_port> with the appropriate values for your service
client_proxy = pydase.Client(url="ws://<ip_addr>:<service_port>").proxy
# client_proxy = pydase.Client(url="wss://your-domain.ch").proxy # if your service uses ssl-encryption
# For SSL-encrypted services, use the wss protocol
# client_proxy = pydase.Client(url="wss://your-domain.ch").proxy
# Interact with the service attributes as if they were local
client_proxy.voltage = 5.0
print(client_proxy.voltage) # Expected output: 5.0
```
This example demonstrates setting and retrieving the `voltage` attribute through the client proxy.
The proxy acts as a local representative of the remote service, enabling straightforward interaction.
This example shows how to set and retrieve the `voltage` attribute through the client proxy.
The proxy acts as a local representation of the remote service, enabling intuitive interaction.
The proxy class dynamically synchronizes with the server's exposed attributes. This synchronization allows the proxy to be automatically updated with any attributes or methods that the server exposes, essentially mirroring the server's API. This dynamic updating enables users to interact with the remote service as if they were working with a local object.
The proxy class automatically synchronizes with the server's attributes and methods, keeping itself up-to-date with any changes. This dynamic synchronization essentially mirrors the server's API, making it feel like you're working with a local object.
## Context Manager
## Context Manager Support
You can also use the client as a context manager which automatically opens and closes the connection again:
You can also use the client within a context manager, which automatically handles connection management (i.e., opening and closing the connection):
```python
import pydase
with pydase.Client(url="ws://localhost:8001") as client:
client.proxy.<my_method>()
client.proxy.my_method()
```
Using the context manager ensures that connections are cleanly closed once the block of code finishes executing.
## Tab Completion Support
In interactive environments such as Python interpreters and Jupyter notebooks, the proxy class supports tab completion, which allows users to explore available methods and attributes.
In interactive environments like Python interpreters or Jupyter notebooks, the proxy supports tab completion. This allows users to explore available methods and attributes.
## Integration within Other Services
## Integrating the Client into Another Service
You can also integrate a client proxy within another service. Here's how you can set it up:
You can integrate a `pydase` client proxy within another service. Here's an example of how to set this up:
```python
import pydase
class MyService(pydase.DataService):
# Initialize the client without blocking the constructor
proxy = pydase.Client(url="ws://<ip_addr>:<service_port>", block_until_connected=False).proxy
# proxy = pydase.Client(url="wss://your-domain.ch", block_until_connected=False).proxy # communicating with ssl-encrypted service
proxy = pydase.Client(
url="ws://<ip_addr>:<service_port>",
block_until_connected=False
).proxy
# For SSL-encrypted services, use the wss protocol
# proxy = pydase.Client(
# url="wss://your-domain.ch",
# block_until_connected=False
# ).proxy
if __name__ == "__main__":
service = MyService()
# Create a server that exposes this service; adjust the web_port as needed
server = pydase.Server(service, web_port=8002). run()
# Create a server that exposes this service
server = pydase.Server(service, web_port=8002).run()
```
In this setup, the `MyService` class has a `proxy` attribute that connects to a `pydase` service located at `<ip_addr>:8001`.
The `block_until_connected=False` argument allows the service to start up even if the initial connection attempt fails.
This configuration is particularly useful in distributed systems where services may start in any order.
In this example:
- The `MyService` class has a `proxy` attribute that connects to a `pydase` service at `<ip_addr>:<service_port>`.
- By setting `block_until_connected=False`, the service can start without waiting for the connection to succeed, which is particularly useful in distributed systems where services may initialize in any order.
## Custom `socketio.AsyncClient` Connection Parameters
You can also configure advanced connection options by passing additional arguments to the underlying [`AsyncClient`][socketio.AsyncClient] via `sio_client_kwargs`. This allows you to fine-tune reconnection behaviour, delays, and other settings:
```python
client = pydase.Client(
url="ws://localhost:8001",
sio_client_kwargs={
"reconnection_attempts": 3,
"reconnection_delay": 2,
"reconnection_delay_max": 10,
}
).proxy
```
In this setup, the client will attempt to reconnect three times, with an initial delay of 2 seconds (each successive attempt doubles this delay) and a maximum delay of 10 seconds between attempts.

View File

@@ -54,6 +54,7 @@ plugins:
- https://docs.python.org/3/objects.inv
- https://docs.pydantic.dev/latest/objects.inv
- https://confz.readthedocs.io/en/latest/objects.inv
- https://python-socketio.readthedocs.io/en/stable/objects.inv
options:
show_source: true
inherited_members: true

View File

@@ -1,6 +1,6 @@
[tool.poetry]
name = "pydase"
version = "0.10.2"
version = "0.10.6"
description = "A flexible and robust Python library for creating, managing, and interacting with data services, with built-in support for web and RPC servers, and customizable features for diverse use cases."
authors = ["Mose Mueller <mosmuell@ethz.ch>"]
readme = "README.md"

View File

@@ -2,13 +2,12 @@ import asyncio
import logging
import sys
import threading
from typing import TypedDict, cast
from typing import TYPE_CHECKING, Any, TypedDict, cast
import socketio # type: ignore
import pydase.components
from pydase.client.proxy_loader import ProxyClassMixin, ProxyLoader
from pydase.utils.helpers import current_event_loop_exists
from pydase.client.proxy_class import ProxyClass
from pydase.client.proxy_loader import ProxyLoader
from pydase.utils.serialization.deserializer import loads
from pydase.utils.serialization.types import SerializedDataService, SerializedObject
@@ -32,51 +31,7 @@ class NotifyDict(TypedDict):
def asyncio_loop_thread(loop: asyncio.AbstractEventLoop) -> None:
asyncio.set_event_loop(loop)
try:
loop.run_forever()
except RuntimeError:
logger.debug("Tried starting even loop, but it is running already")
class ProxyClass(ProxyClassMixin, pydase.components.DeviceConnection):
"""
A proxy class that serves as the interface for interacting with device connections
via a socket.io client in an asyncio environment.
Args:
sio_client:
The socket.io client instance used for asynchronous communication with the
pydase service server.
loop:
The event loop in which the client operations are managed and executed.
This class is used to create a proxy object that behaves like a local representation
of a remote pydase service, facilitating direct interaction as if it were local
while actually communicating over network protocols.
It can also be used as an attribute of a pydase service itself, e.g.
```python
import pydase
class MyService(pydase.DataService):
proxy = pydase.Client(
hostname="...", port=8001, block_until_connected=False
).proxy
if __name__ == "__main__":
service = MyService()
server = pydase.Server(service, web_port=8002).run()
```
"""
def __init__(
self, sio_client: socketio.AsyncClient, loop: asyncio.AbstractEventLoop
) -> None:
super().__init__()
pydase.components.DeviceConnection.__init__(self)
self._initialise(sio_client=sio_client, loop=loop)
loop.run_forever()
class Client:
@@ -90,15 +45,35 @@ class Client:
url:
The URL of the pydase Socket.IO server. This should always contain the
protocol and the hostname.
Examples:
- `wss://my-service.example.com` # for secure connections, use wss
- `ws://localhost:8001`
block_until_connected:
If set to True, the constructor will block until the connection to the
service has been established. This is useful for ensuring the client is
ready to use immediately after instantiation. Default is True.
sio_client_kwargs:
Additional keyword arguments passed to the underlying
[`AsyncClient`][socketio.AsyncClient]. This allows fine-tuning of the
client's behaviour (e.g., reconnection attempts or reconnection delay).
Default is an empty dictionary.
Example:
The following example demonstrates a `Client` instance that connects to another
pydase service, while customising some of the connection settings for the
underlying [`AsyncClient`][socketio.AsyncClient].
```python
pydase.Client(url="ws://localhost:8001", sio_client_kwargs={
"reconnection_attempts": 2,
"reconnection_delay": 2,
"reconnection_delay_max": 8,
})
```
When connecting to a server over a secure connection (i.e., the server is using
SSL/TLS encryption), make sure that the `wss` protocol is used instead of `ws`:
```python
pydase.Client(url="wss://my-service.example.com")
```
"""
def __init__(
@@ -106,15 +81,14 @@ class Client:
*,
url: str,
block_until_connected: bool = True,
sio_client_kwargs: dict[str, Any] = {},
):
self._url = url
self._sio = socketio.AsyncClient()
if not current_event_loop_exists():
self._loop = asyncio.new_event_loop()
asyncio.set_event_loop(self._loop)
else:
self._loop = asyncio.get_event_loop()
self.proxy = ProxyClass(sio_client=self._sio, loop=self._loop)
self._sio = socketio.AsyncClient(**sio_client_kwargs)
self._loop = asyncio.new_event_loop()
self.proxy = ProxyClass(
sio_client=self._sio, loop=self._loop, reconnect=self.connect
)
"""A proxy object representing the remote service, facilitating interaction as
if it were local."""
self._thread = threading.Thread(
@@ -170,7 +144,13 @@ class Client:
self.proxy, serialized_object=serialized_object
)
serialized_object["type"] = "DeviceConnection"
self.proxy._notify_changed("", loads(serialized_object))
if self.proxy._service_representation is not None:
# need to use object.__setattr__ to not trigger an observer notification
object.__setattr__(self.proxy, "_service_representation", serialized_object)
if TYPE_CHECKING:
self.proxy._service_representation = serialized_object # type: ignore
self.proxy._notify_changed("", self.proxy)
self.proxy._connected = True
async def _handle_disconnect(self) -> None:

View File

@@ -0,0 +1,112 @@
import asyncio
import logging
from collections.abc import Callable
from copy import deepcopy
from typing import TYPE_CHECKING, cast
import socketio # type: ignore
import pydase.components
from pydase.client.proxy_loader import ProxyClassMixin
from pydase.utils.helpers import get_attribute_doc
from pydase.utils.serialization.types import SerializedDataService, SerializedObject
logger = logging.getLogger(__name__)
class ProxyClass(ProxyClassMixin, pydase.components.DeviceConnection):
"""
A proxy class that serves as the interface for interacting with device connections
via a socket.io client in an asyncio environment.
Args:
sio_client:
The socket.io client instance used for asynchronous communication with the
pydase service server.
loop:
The event loop in which the client operations are managed and executed.
reconnect:
The method that is called periodically when the client is not connected.
This class is used to create a proxy object that behaves like a local representation
of a remote pydase service, facilitating direct interaction as if it were local
while actually communicating over network protocols.
It can also be used as an attribute of a pydase service itself, e.g.
```python
import pydase
class MyService(pydase.DataService):
proxy = pydase.Client(
hostname="...", port=8001, block_until_connected=False
).proxy
if __name__ == "__main__":
service = MyService()
server = pydase.Server(service, web_port=8002).run()
```
"""
def __init__(
self,
sio_client: socketio.AsyncClient,
loop: asyncio.AbstractEventLoop,
reconnect: Callable[..., None],
) -> None:
if TYPE_CHECKING:
self._service_representation: None | SerializedObject = None
super().__init__()
pydase.components.DeviceConnection.__init__(self)
self._initialise(sio_client=sio_client, loop=loop)
object.__setattr__(self, "_service_representation", None)
self.reconnect = reconnect
def serialize(self) -> SerializedObject:
if self._service_representation is None:
serialization_future = cast(
asyncio.Future[SerializedDataService],
asyncio.run_coroutine_threadsafe(
self._sio.call("service_serialization"), self._loop
),
)
# need to use object.__setattr__ to not trigger an observer notification
object.__setattr__(
self, "_service_representation", serialization_future.result()
)
if TYPE_CHECKING:
self._service_representation = serialization_future.result()
device_connection_value = cast(
dict[str, SerializedObject],
pydase.components.DeviceConnection().serialize()["value"],
)
readonly = False
doc = get_attribute_doc(self)
obj_name = self.__class__.__name__
value = {
**cast(
dict[str, SerializedObject],
# need to deepcopy to not overwrite the _service_representation dict
# when adding a prefix with add_prefix_to_full_access_path
deepcopy(self._service_representation["value"]),
),
**device_connection_value,
}
return {
"full_access_path": "",
"name": obj_name,
"type": "DeviceConnection",
"value": value,
"readonly": readonly,
"doc": doc,
}
def connect(self) -> None:
if not self._sio.reconnection or self._sio.reconnection_attempts > 0:
self.reconnect(block_until_connected=False)

View File

@@ -10,6 +10,7 @@ from pydase.observer_pattern.observable.observable import (
)
from pydase.utils.helpers import (
get_class_and_instance_attributes,
is_descriptor,
is_property_attribute,
)
from pydase.utils.serialization.serializer import (
@@ -68,7 +69,7 @@ class DataService(AbstractDataService):
if not issubclass(
value_class,
(int | float | bool | str | list | dict | Enum | u.Quantity | Observable),
):
) and not is_descriptor(__value):
logger.warning(
"Class '%s' does not inherit from DataService. This may lead to"
" unexpected behaviour!",

View File

@@ -8,7 +8,10 @@ from pydase.observer_pattern.observable.observable_object import ObservableObjec
from pydase.observer_pattern.observer.property_observer import (
PropertyObserver,
)
from pydase.utils.helpers import get_object_attr_from_path
from pydase.utils.helpers import (
get_object_attr_from_path,
normalize_full_access_path_string,
)
from pydase.utils.serialization.serializer import (
SerializationPathError,
SerializedObject,
@@ -99,7 +102,8 @@ class DataServiceObserver(PropertyObserver):
)
def _notify_dependent_property_changes(self, changed_attr_path: str) -> None:
changed_props = self.property_deps_dict.get(changed_attr_path, [])
normalized_attr_path = normalize_full_access_path_string(changed_attr_path)
changed_props = self.property_deps_dict.get(normalized_attr_path, [])
for prop in changed_props:
# only notify about changing attribute if it is not currently being
# "changed" e.g. when calling the getter of a property within another

View File

@@ -22,12 +22,9 @@ class Observable(ObservableObject):
- {"__annotations__"}
}
for name, value in class_attrs.items():
if isinstance(value, property) or callable(value):
continue
if is_descriptor(value):
# Descriptors have to be stored as a class variable in another class to
# work properly. So don't make it an instance attribute.
self._initialise_new_objects(name, value)
if isinstance(value, property) or callable(value) or is_descriptor(value):
# Properties, methods and descriptors have to be stored as class
# attributes to work properly. So don't make it an instance attribute.
continue
self.__dict__[name] = self._initialise_new_objects(name, value)

View File

@@ -5,6 +5,7 @@ from typing import Any
from pydase.observer_pattern.observable.observable import Observable
from pydase.observer_pattern.observer.observer import Observer
from pydase.utils.helpers import is_descriptor
logger = logging.getLogger(__name__)
@@ -61,17 +62,27 @@ class PropertyObserver(Observer):
self, obj: Observable, deps: dict[str, Any], prefix: str
) -> None:
for k, value in {**vars(type(obj)), **vars(obj)}.items():
actual_value = value
prefix = (
f"{prefix}." if prefix != "" and not prefix.endswith(".") else prefix
)
parent_path = f"{prefix}{k}"
if isinstance(value, Observable):
# Get value from descriptor
if not isinstance(value, property) and is_descriptor(value):
actual_value = getattr(obj, k)
if isinstance(actual_value, Observable):
new_prefix = f"{parent_path}."
deps.update(
self._get_properties_and_their_dependencies(value, new_prefix)
self._get_properties_and_their_dependencies(
actual_value, new_prefix
)
)
elif isinstance(value, list | dict):
self._process_collection_item_properties(value, deps, parent_path)
self._process_collection_item_properties(
actual_value, deps, parent_path
)
def _process_collection_item_properties(
self,

View File

@@ -1,7 +1,8 @@
import logging
from collections.abc import Callable, Coroutine
from typing import Any, TypeVar
from typing import Any, Generic, TypeVar, overload
from pydase.data_service.data_service import DataService
from pydase.task.task import Task
logger = logging.getLogger(__name__)
@@ -9,6 +10,69 @@ logger = logging.getLogger(__name__)
R = TypeVar("R")
class PerInstanceTaskDescriptor(Generic[R]):
"""
A descriptor class that provides a unique [`Task`][pydase.task.task.Task] object
for each instance of a [`DataService`][pydase.data_service.data_service.DataService]
class.
The `PerInstanceTaskDescriptor` is used to transform an asynchronous function into a
task that is managed independently for each instance of a `DataService` subclass.
This allows tasks to be initialized, started, and stopped on a per-instance basis,
providing better control over task execution within the service.
The `PerInstanceTaskDescriptor` is not intended to be used directly. Instead, it is
used internally by the `@task` decorator to manage task objects for each instance of
the service class.
"""
def __init__(
self,
func: Callable[[Any], Coroutine[None, None, R]]
| Callable[[], Coroutine[None, None, R]],
autostart: bool = False,
) -> None:
self.__func = func
self.__autostart = autostart
self.__task_instances: dict[object, Task[R]] = {}
def __set_name__(self, owner: type[DataService], name: str) -> None:
"""Stores the name of the task within the owning class. This method is called
automatically when the descriptor is assigned to a class attribute.
"""
self.__task_name = name
@overload
def __get__(
self, instance: None, owner: type[DataService]
) -> "PerInstanceTaskDescriptor[R]":
"""Returns the descriptor itself when accessed through the class."""
@overload
def __get__(self, instance: DataService, owner: type[DataService]) -> Task[R]:
"""Returns the `Task` object associated with the specific `DataService`
instance.
If no task exists for the instance, a new `Task` object is created and stored
in the `__task_instances` dictionary.
"""
def __get__(
self, instance: DataService | None, owner: type[DataService]
) -> "Task[R] | PerInstanceTaskDescriptor[R]":
if instance is None:
return self
# Create a new Task object for this instance, using the function's name.
if instance not in self.__task_instances:
self.__task_instances[instance] = instance._initialise_new_objects(
self.__task_name,
Task(self.__func.__get__(instance, owner), autostart=self.__autostart),
)
return self.__task_instances[instance]
def task(
*, autostart: bool = False
) -> Callable[
@@ -16,18 +80,22 @@ def task(
Callable[[Any], Coroutine[None, None, R]]
| Callable[[], Coroutine[None, None, R]]
],
Task[R],
PerInstanceTaskDescriptor[R],
]:
"""
A decorator to define a function as a task within a
A decorator to define an asynchronous function as a per-instance task within a
[`DataService`][pydase.DataService] class.
This decorator transforms an asynchronous function into a
[`Task`][pydase.task.task.Task] object. The `Task` object provides methods like
`start()` and `stop()` to control the execution of the task.
[`Task`][pydase.task.task.Task] object that is unique to each instance of the
`DataService` class. The resulting `Task` object provides methods like `start()`
and `stop()` to control the execution of the task, and manages the task's lifecycle
independently for each instance of the service.
Tasks are typically used to perform periodic or recurring jobs, such as reading
sensor data, updating databases, or other operations that need to be repeated over
The decorator is particularly useful for defining tasks that need to run
periodically or perform asynchronous operations, such as polling data sources,
updating databases, or any recurring job that should be managed within the context
of a `DataService`.
time.
Args:
@@ -36,8 +104,10 @@ def task(
initialized. Defaults to False.
Returns:
A decorator that converts an asynchronous function into a
[`Task`][pydase.task.task.Task] object.
A decorator that wraps an asynchronous function in a
[`PerInstanceTaskDescriptor`][pydase.task.decorator.PerInstanceTaskDescriptor]
object, which, when accessed, provides an instance-specific
[`Task`][pydase.task.task.Task] object.
Example:
```python
@@ -69,7 +139,7 @@ def task(
def decorator(
func: Callable[[Any], Coroutine[None, None, R]]
| Callable[[], Coroutine[None, None, R]],
) -> Task[R]:
return Task(func, autostart=autostart)
) -> PerInstanceTaskDescriptor[R]:
return PerInstanceTaskDescriptor(func, autostart=autostart)
return decorator

View File

@@ -1,43 +1,23 @@
import asyncio
import inspect
import logging
import sys
from collections.abc import Callable, Coroutine
from typing import (
Any,
Generic,
TypeVar,
)
from typing_extensions import TypeIs
from pydase.task.task_status import TaskStatus
if sys.version_info < (3, 11):
from typing_extensions import Self
else:
from typing import Self
import pydase.data_service.data_service
from pydase.task.task_status import TaskStatus
from pydase.utils.helpers import current_event_loop_exists
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)
R = TypeVar("R")
def is_bound_method(
method: Callable[[], Coroutine[None, None, R | None]]
| Callable[[Any], Coroutine[None, None, R | None]],
) -> TypeIs[Callable[[], Coroutine[None, None, R | None]]]:
"""Check if instance method is bound to an object."""
return inspect.ismethod(method)
class Task(pydase.data_service.data_service.DataService, Generic[R]):
"""
A class representing a task within the `pydase` framework.
"""A class representing a task within the `pydase` framework.
The `Task` class wraps an asynchronous function and provides methods to manage its
lifecycle, such as `start()` and `stop()`. It is typically used to perform periodic
@@ -85,25 +65,24 @@ class Task(pydase.data_service.data_service.DataService, Generic[R]):
def __init__(
self,
func: Callable[[Any], Coroutine[None, None, R | None]]
| Callable[[], Coroutine[None, None, R | None]],
func: Callable[[], Coroutine[None, None, R | None]],
*,
autostart: bool = False,
) -> None:
super().__init__()
self._autostart = autostart
self._func_name = func.__name__
self._bound_func: Callable[[], Coroutine[None, None, R | None]] | None = None
self._set_up = False
if is_bound_method(func):
self._func = func
self._bound_func = func
else:
self._func = func
self._func = func
self._task: asyncio.Task[R | None] | None = None
self._status = TaskStatus.NOT_RUNNING
self._result: R | None = None
if not current_event_loop_exists():
self._loop = asyncio.new_event_loop()
asyncio.set_event_loop(self._loop)
else:
self._loop = asyncio.get_event_loop()
@property
def autostart(self) -> bool:
"""Defines if the task should be started automatically when the
@@ -144,10 +123,10 @@ class Task(pydase.data_service.data_service.DataService, Generic[R]):
self._result = task.result()
async def run_task() -> R | None:
if inspect.iscoroutinefunction(self._bound_func):
if inspect.iscoroutinefunction(self._func):
logger.info("Starting task %r", self._func_name)
self._status = TaskStatus.RUNNING
res: Coroutine[None, None, R] = self._bound_func()
res: Coroutine[None, None, R | None] = self._func()
try:
return await res
except asyncio.CancelledError:
@@ -167,24 +146,3 @@ class Task(pydase.data_service.data_service.DataService, Generic[R]):
if self._task:
self._task.cancel()
def __get__(self, instance: Any, owner: Any) -> Self:
"""Descriptor method used to correctly set up the task.
This descriptor method is called by the class instance containing the task.
It binds the task function to that class instance.
Since the `__init__` function is called when a function is decorated with
[`@task`][pydase.task.decorator.task], some setup is delayed until this
descriptor function is called.
"""
if instance and not self._set_up:
if not current_event_loop_exists():
self._loop = asyncio.new_event_loop()
asyncio.set_event_loop(self._loop)
else:
self._loop = asyncio.get_event_loop()
self._bound_func = self._func.__get__(instance, owner)
self._set_up = True
return self

View File

@@ -204,6 +204,17 @@ def function_has_arguments(func: Callable[..., Any]) -> bool:
def is_descriptor(obj: object) -> bool:
"""Check if an object is a descriptor."""
# Exclude functions, methods, builtins and properties
if (
inspect.isfunction(obj)
or inspect.ismethod(obj)
or inspect.isbuiltin(obj)
or isinstance(obj, property)
):
return False
# Check if it has any descriptor methods
return any(hasattr(obj, method) for method in ("__get__", "__set__", "__delete__"))
@@ -212,3 +223,25 @@ def current_event_loop_exists() -> bool:
import asyncio
return asyncio.get_event_loop_policy()._local._loop is not None # type: ignore
def normalize_full_access_path_string(s: str) -> str:
"""Normalizes a string representing a full access path by converting double quotes
to single quotes.
This function is useful for ensuring consistency in strings that represent access
paths containing dictionary keys, by replacing all double quotes (`"`) with single
quotes (`'`).
Args:
s (str): The input string to be normalized.
Returns:
A new string with all double quotes replaced by single quotes.
Example:
>>> normalize_full_access_path_string('dictionary["first"].my_task')
"dictionary['first'].my_task"
"""
return s.replace('"', "'")

View File

@@ -33,7 +33,7 @@ LOGGING_CONFIG = {
"default": {
"formatter": "default",
"class": "logging.StreamHandler",
"stream": "ext://sys.stderr",
"stream": "ext://sys.stdout",
},
},
"loggers": {

View File

@@ -42,6 +42,8 @@ from pydase.utils.serialization.types import (
if TYPE_CHECKING:
from collections.abc import Callable
from pydase.client.proxy_class import ProxyClass
logger = logging.getLogger(__name__)
@@ -74,6 +76,7 @@ class Serializer:
Returns:
Dictionary representation of `obj`.
"""
from pydase.client.client import ProxyClass
result: SerializedObject
@@ -83,6 +86,9 @@ class Serializer:
elif isinstance(obj, datetime):
result = cls._serialize_datetime(obj, access_path=access_path)
elif isinstance(obj, ProxyClass):
result = cls._serialize_proxy_class(obj, access_path=access_path)
elif isinstance(obj, AbstractDataService):
result = cls._serialize_data_service(obj, access_path=access_path)
@@ -322,6 +328,13 @@ class Serializer:
"doc": doc,
}
@classmethod
def _serialize_proxy_class(
cls, obj: ProxyClass, access_path: str = ""
) -> SerializedDataService:
# Get serialization value from the remote service and adapt the full_access_path
return add_prefix_to_full_access_path(obj.serialize(), access_path)
def dump(obj: Any) -> SerializedObject:
"""Serialize `obj` to a
@@ -572,6 +585,62 @@ def generate_serialized_data_paths(
return paths
def add_prefix_to_full_access_path(
serialized_obj: SerializedObject, prefix: str
) -> Any:
"""Recursively adds a specified prefix to all full access paths of the serialized
object.
Args:
data:
The serialized object to process.
prefix:
The prefix string to prepend to each full access path.
Returns:
The modified serialized object with the prefix added to all full access paths.
Example:
```python
>>> data = {
... "full_access_path": "",
... "value": {
... "item": {
... "full_access_path": "some_item_path",
... "value": 1.0
... }
... }
... }
...
... modified_data = add_prefix_to_full_access_path(data, 'prefix')
{"full_access_path": "prefix", "value": {"item": {"full_access_path":
"prefix.some_item_path", "value": 1.0}}}
```
"""
try:
if serialized_obj.get("full_access_path", None) is not None:
serialized_obj["full_access_path"] = (
prefix + "." + serialized_obj["full_access_path"]
if serialized_obj["full_access_path"] != ""
else prefix
)
if isinstance(serialized_obj["value"], list):
for value in serialized_obj["value"]:
add_prefix_to_full_access_path(cast(SerializedObject, value), prefix)
elif isinstance(serialized_obj["value"], dict):
for value in cast(
dict[str, SerializedObject], serialized_obj["value"]
).values():
add_prefix_to_full_access_path(cast(SerializedObject, value), prefix)
except (TypeError, KeyError, AttributeError):
# passed dictionary is not a serialized object
pass
return serialized_obj
def serialized_dict_is_nested_object(serialized_dict: SerializedObject) -> bool:
value = serialized_dict["value"]
# We are excluding Quantity here as the value corresponding to the "value" key is

View File

@@ -0,0 +1,107 @@
import threading
from collections.abc import Callable, Generator
from typing import Any
import pydase
import pytest
import socketio.exceptions
@pytest.fixture(scope="function")
def pydase_restartable_server() -> (
Generator[
tuple[
pydase.Server,
threading.Thread,
pydase.DataService,
Callable[
[pydase.Server, threading.Thread, pydase.DataService],
tuple[pydase.Server, threading.Thread],
],
],
None,
Any,
]
):
class MyService(pydase.DataService):
def __init__(self) -> None:
super().__init__()
self._name = "MyService"
self._my_property = 12.1
@property
def my_property(self) -> float:
return self._my_property
@my_property.setter
def my_property(self, value: float) -> None:
self._my_property = value
@property
def name(self) -> str:
return self._name
service_instance = MyService()
server = pydase.Server(service_instance, web_port=9999)
thread = threading.Thread(target=server.run, daemon=True)
thread.start()
def restart(
server: pydase.Server,
thread: threading.Thread,
service_instance: pydase.DataService,
) -> tuple[pydase.Server, threading.Thread]:
server.handle_exit()
thread.join()
server = pydase.Server(service_instance, web_port=9999)
new_thread = threading.Thread(target=server.run, daemon=True)
new_thread.start()
return server, new_thread
yield server, thread, service_instance, restart
server.handle_exit()
thread.join()
def test_reconnection(
pydase_restartable_server: tuple[
pydase.Server,
threading.Thread,
pydase.DataService,
Callable[
[pydase.Server, threading.Thread, pydase.DataService],
tuple[pydase.Server, threading.Thread],
],
],
) -> None:
client = pydase.Client(
url="ws://localhost:9999",
sio_client_kwargs={
"reconnection": False,
},
)
client_2 = pydase.Client(
url="ws://localhost:9999",
sio_client_kwargs={
"reconnection_attempts": 1,
},
)
server, thread, service_instance, restart = pydase_restartable_server
service_instance._name = "New service name"
server, thread = restart(server, thread, service_instance)
with pytest.raises(socketio.exceptions.BadNamespaceError):
client.proxy.name
client_2.proxy.name
client.proxy.reconnect()
client_2.proxy.reconnect()
# the service proxies successfully reconnect and get the new service name
assert client.proxy.name == "New service name"
assert client_2.proxy.name == "New service name"

View File

@@ -5,7 +5,7 @@ import pydase
import pytest
from pydase.data_service.data_service_observer import DataServiceObserver
from pydase.data_service.state_manager import StateManager
from pydase.utils.serialization.serializer import SerializationError
from pydase.utils.serialization.serializer import SerializationError, dump
logger = logging.getLogger()
@@ -146,3 +146,41 @@ def test_private_attribute_does_not_have_to_be_serializable() -> None:
service_instance.change_publ_attr()
service_instance.change_priv_attr()
def test_normalized_attr_path_in_dependent_property_changes(
caplog: pytest.LogCaptureFixture,
) -> None:
class SubService(pydase.DataService):
_prop = 10.0
@property
def prop(self) -> float:
return self._prop
class MyService(pydase.DataService):
def __init__(self) -> None:
super().__init__()
self.service_dict = {"one": SubService()}
service_instance = MyService()
state_manager = StateManager(service=service_instance)
observer = DataServiceObserver(state_manager=state_manager)
assert observer.property_deps_dict["service_dict['one']._prop"] == [
"service_dict['one'].prop"
]
# We can use dict key path encoded with double quotes
state_manager.set_service_attribute_value_by_path(
'service_dict["one"]._prop', dump(11.0)
)
assert service_instance.service_dict["one"].prop == 11.0
assert "'service_dict[\"one\"].prop' changed to '11.0'" in caplog.text
# We can use dict key path encoded with single quotes
state_manager.set_service_attribute_value_by_path(
"service_dict['one']._prop", dump(12.0)
)
assert service_instance.service_dict["one"].prop == 12.0
assert "'service_dict[\"one\"].prop' changed to '12.0'" in caplog.text

View File

@@ -138,3 +138,154 @@ async def test_nested_dict_autostart_task(
"'sub_services_dict[\"second\"].my_task.status' changed to 'TaskStatus.RUNNING'"
in caplog.text
)
@pytest.mark.asyncio(scope="function")
async def test_manual_start_with_multiple_service_instances(
caplog: LogCaptureFixture,
) -> None:
class MySubService(pydase.DataService):
@task()
async def my_task(self) -> None:
logger.info("Triggered task.")
while True:
await asyncio.sleep(1)
class MyService(pydase.DataService):
sub_services_list = [MySubService() for i in range(2)]
sub_services_dict = {"first": MySubService(), "second": MySubService()}
service_instance = MyService()
state_manager = StateManager(service_instance)
DataServiceObserver(state_manager)
autostart_service_tasks(service_instance)
await asyncio.sleep(0.1)
assert (
service_instance.sub_services_list[0].my_task.status == TaskStatus.NOT_RUNNING
)
assert (
service_instance.sub_services_list[1].my_task.status == TaskStatus.NOT_RUNNING
)
assert (
service_instance.sub_services_dict["first"].my_task.status
== TaskStatus.NOT_RUNNING
)
assert (
service_instance.sub_services_dict["second"].my_task.status
== TaskStatus.NOT_RUNNING
)
service_instance.sub_services_list[0].my_task.start()
await asyncio.sleep(0.01)
assert service_instance.sub_services_list[0].my_task.status == TaskStatus.RUNNING
assert (
"'sub_services_list[0].my_task.status' changed to 'TaskStatus.RUNNING'"
in caplog.text
)
assert (
"'sub_services_list[1].my_task.status' changed to 'TaskStatus.RUNNING'"
not in caplog.text
)
assert (
"'sub_services_dict[\"first\"].my_task.status' changed to 'TaskStatus.RUNNING'"
not in caplog.text
)
assert (
"'sub_services_dict[\"second\"].my_task.status' changed to 'TaskStatus.RUNNING'"
not in caplog.text
)
service_instance.sub_services_list[0].my_task.stop()
await asyncio.sleep(0.01)
assert "Task 'my_task' was cancelled" in caplog.text
caplog.clear()
service_instance.sub_services_list[1].my_task.start()
await asyncio.sleep(0.01)
assert service_instance.sub_services_list[1].my_task.status == TaskStatus.RUNNING
assert (
"'sub_services_list[0].my_task.status' changed to 'TaskStatus.RUNNING'"
not in caplog.text
)
assert (
"'sub_services_list[1].my_task.status' changed to 'TaskStatus.RUNNING'"
in caplog.text
)
assert (
"'sub_services_dict[\"first\"].my_task.status' changed to 'TaskStatus.RUNNING'"
not in caplog.text
)
assert (
"'sub_services_dict[\"second\"].my_task.status' changed to 'TaskStatus.RUNNING'"
not in caplog.text
)
service_instance.sub_services_list[1].my_task.stop()
await asyncio.sleep(0.01)
assert "Task 'my_task' was cancelled" in caplog.text
caplog.clear()
service_instance.sub_services_dict["first"].my_task.start()
await asyncio.sleep(0.01)
assert (
service_instance.sub_services_dict["first"].my_task.status == TaskStatus.RUNNING
)
assert (
"'sub_services_list[0].my_task.status' changed to 'TaskStatus.RUNNING'"
not in caplog.text
)
assert (
"'sub_services_list[1].my_task.status' changed to 'TaskStatus.RUNNING'"
not in caplog.text
)
assert (
"'sub_services_dict[\"first\"].my_task.status' changed to 'TaskStatus.RUNNING'"
in caplog.text
)
assert (
"'sub_services_dict[\"second\"].my_task.status' changed to 'TaskStatus.RUNNING'"
not in caplog.text
)
service_instance.sub_services_dict["first"].my_task.stop()
await asyncio.sleep(0.01)
assert "Task 'my_task' was cancelled" in caplog.text
caplog.clear()
service_instance.sub_services_dict["second"].my_task.start()
await asyncio.sleep(0.01)
assert (
service_instance.sub_services_dict["second"].my_task.status
== TaskStatus.RUNNING
)
assert (
"'sub_services_list[0].my_task.status' changed to 'TaskStatus.RUNNING'"
not in caplog.text
)
assert (
"'sub_services_list[1].my_task.status' changed to 'TaskStatus.RUNNING'"
not in caplog.text
)
assert (
"'sub_services_dict[\"first\"].my_task.status' changed to 'TaskStatus.RUNNING'"
not in caplog.text
)
assert (
"'sub_services_dict[\"second\"].my_task.status' changed to 'TaskStatus.RUNNING'"
in caplog.text
)
service_instance.sub_services_dict["second"].my_task.stop()
await asyncio.sleep(0.01)
assert "Task 'my_task' was cancelled" in caplog.text

View File

@@ -12,6 +12,7 @@ from pydase.utils.decorators import frontend
from pydase.utils.serialization.serializer import (
SerializationPathError,
SerializedObject,
add_prefix_to_full_access_path,
dump,
generate_serialized_data_paths,
get_container_item_by_key,
@@ -1070,3 +1071,156 @@ def test_get_data_paths_from_serialized_object(obj: Any, expected: list[str]) ->
)
def test_generate_serialized_data_paths(obj: Any, expected: list[str]) -> None:
assert generate_serialized_data_paths(dump(obj=obj)["value"]) == expected
@pytest.mark.parametrize(
"serialized_obj, prefix, expected",
[
(
{
"full_access_path": "new_attr",
"value": {
"name": {
"full_access_path": "new_attr.name",
"value": "MyService",
}
},
},
"prefix",
{
"full_access_path": "prefix.new_attr",
"value": {
"name": {
"full_access_path": "prefix.new_attr.name",
"value": "MyService",
}
},
},
),
(
{
"full_access_path": "new_attr",
"value": [
{
"full_access_path": "new_attr[0]",
"value": 1.0,
}
],
},
"prefix",
{
"full_access_path": "prefix.new_attr",
"value": [
{
"full_access_path": "prefix.new_attr[0]",
"value": 1.0,
}
],
},
),
(
{
"full_access_path": "new_attr",
"value": {
"key": {
"full_access_path": 'new_attr["key"]',
"value": 1.0,
}
},
},
"prefix",
{
"full_access_path": "prefix.new_attr",
"value": {
"key": {
"full_access_path": 'prefix.new_attr["key"]',
"value": 1.0,
}
},
},
),
(
{
"full_access_path": "new_attr",
"value": {"magnitude": 10, "unit": "meter"},
},
"prefix",
{
"full_access_path": "prefix.new_attr",
"value": {"magnitude": 10, "unit": "meter"},
},
),
(
{
"full_access_path": "quantity_list",
"value": [
{
"full_access_path": "quantity_list[0]",
"value": {"magnitude": 1.0, "unit": "A"},
}
],
},
"prefix",
{
"full_access_path": "prefix.quantity_list",
"value": [
{
"full_access_path": "prefix.quantity_list[0]",
"value": {"magnitude": 1.0, "unit": "A"},
}
],
},
),
(
{
"full_access_path": "",
"value": {
"dict_attr": {
"type": "dict",
"full_access_path": "dict_attr",
"value": {
"foo": {
"full_access_path": 'dict_attr["foo"]',
"type": "dict",
"value": {
"some_int": {
"full_access_path": 'dict_attr["foo"].some_int',
"type": "int",
"value": 1,
},
},
},
},
}
},
},
"prefix",
{
"full_access_path": "prefix",
"value": {
"dict_attr": {
"type": "dict",
"full_access_path": "prefix.dict_attr",
"value": {
"foo": {
"full_access_path": 'prefix.dict_attr["foo"]',
"type": "dict",
"value": {
"some_int": {
"full_access_path": 'prefix.dict_attr["foo"].some_int',
"type": "int",
"value": 1,
},
},
},
},
}
},
},
),
],
)
def test_add_prefix_to_full_access_path(
serialized_obj: SerializedObject, prefix: str, expected: SerializedObject
) -> None:
assert add_prefix_to_full_access_path(serialized_obj, prefix) == expected