13 Commits
v0.3.0 ... main

6 changed files with 107 additions and 34 deletions

View File

@ -32,12 +32,12 @@ To utilize specific functionalities such as `IonizerServer`, `InfluxDBv1Session`
or directly add the following line to the `pyproject.toml` file:
```toml
pydase-service-base = {git = "https://github.com/tiqi-group/pydase_service_base.git", rev = "main", extras = ["ionizer", "postgresql", "ionizer"]}
pydase_service_base = {git = "https://github.com/tiqi-group/pydase_service_base.git", rev = "main", extras = ["ionizer", "postgresql", "influxdbv1", "influxdbv2"]}
```
## Configuration
Database connections rely on configurations provided either through environment variables or a specific configuration file. The package anticipates a `database_config` folder in the root directory of any module using this package. This folder should house the configuration files for the databases. Override the `database_config` folder's location by passing a different path to the database classes' constructor.
Database connections rely on configurations provided by environment variables or a specific configuration file. The package anticipates a `database_config` folder in the root directory of any module using this package. This folder should house the database configuration files. You can override the `database_config` folder's location by passing a different path to the database classes' constructor.
Structure of the `database_config` folder:
@ -53,13 +53,15 @@ Example content for the configuration files:
`influxdbv1_config.yaml`:
```yaml
host: https://database-url.ch
port: 8086
host: https://influxdb.phys.ethz.ch
port: 443 # defaults to 8086 (default port of influxdb)
username: root
password: root
database: my_database
ssl: True
verify_ssl: True
ssl: True # defaults to True
verify_ssl: True # defaults to True
headers:
Host: other-virtual-host.ethz.ch
```
`influxdb_config.yaml`:
@ -67,6 +69,9 @@ verify_ssl: True
url: https://database-url.ch
org: your-org
token: <influxdb-token>
verify_ssl: True # defaults to True
headers:
Host: other-virtual-host.ethz.ch
```
`postgres_development.yaml` / `postgres_production.yaml`:
@ -88,8 +93,8 @@ Interact with an InfluxDBv1 server using the `InfluxDBv1Session` class. **Note t
from pydase_service_base.database import InfluxDBv1Session
with InfluxDBv1Session() as influx_client:
# Writing data to a database
data = [
# Writing points to a database
points = [
{
"measurement": "your_measurement", # Replace with your measurement
"tags": {
@ -101,7 +106,7 @@ with InfluxDBv1Session() as influx_client:
"time": "2023-06-05T00:00:00Z", # Replace with your timestamp
}
]
influx_client.write_points(data=data, database="other_database")
influx_client.write_points(points=points, database="other_database")
```
**Note** that you have to set `ssl` and `verify_ssl` to `False` when you are using a local influxdb instance.
@ -160,7 +165,7 @@ with PostgresDatabaseSession() as session:
### Ionizer Interface
The `IonizerServer` provides an interface to seamlessly integrate `pydase` applications with Ionizer. **This requires the `ionizer` optional dependency**.
The `IonizerServer` provides an interface to integrate `pydase` applications with Ionizer seamlessly. **This requires the `ionizer` optional dependency**.
To deploy `IonizerServer` with your service:
@ -190,7 +195,7 @@ if __name__ == "__main__":
This integration ensures that your primary `pydase` server and `YourServiceClass` service are set up. It also incorporates the `IonizerServer` on port `8002`.
For more details on the `IonizerServer`, refer to the [official documentation](https://pydase.readthedocs.io/en/latest/) or get in touch with the maintainers.
For more details on the `IonizerServer`, get in touch with the maintainers.
## License

View File

@ -29,13 +29,16 @@ class InfluxDBConfig(BaseConfig): # type: ignore
url: str
org: str
token: SecretStr
verify_ssl: bool = True
headers: dict[str, str] = {} # noqa: RUF012
class InfluxDBv1Config(BaseConfig): # type: ignore
host: str
port: int
port: int = 8086
username: str
password: SecretStr
database: str
ssl: bool = True
verify_ssl: bool = True
headers: dict[str, str] = {} # noqa: RUF012

View File

@ -1,16 +1,17 @@
from __future__ import annotations
import logging
import sys
from typing import TYPE_CHECKING, Any, NamedTuple
try:
from typing import Self # type: ignore
except ImportError:
if sys.version_info < (3, 11):
from typing_extensions import Self
else:
from typing import Self
from confz import FileSource
from influxdb_client import (
from influxdb_client import ( # type: ignore
Bucket,
BucketRetentionRules,
BucketsApi,
@ -19,9 +20,9 @@ from influxdb_client import (
WriteApi,
WritePrecision,
)
from influxdb_client.client.write.point import DEFAULT_WRITE_PRECISION
from influxdb_client.client.write_api import SYNCHRONOUS
from influxdb_client.rest import ApiException
from influxdb_client.client.write.point import DEFAULT_WRITE_PRECISION # type: ignore
from influxdb_client.client.write_api import SYNCHRONOUS # type: ignore
from influxdb_client.rest import ApiException # type: ignore
from pydase_service_base.database.config import InfluxDBConfig, ServiceConfig
@ -79,12 +80,20 @@ class InfluxDBSession:
self.url = self._config.url
self.token = self._config.token.get_secret_value()
self.org = self._config.org
self.headers = self._config.headers
self.verify_ssl = self._config.verify_ssl
self._client: InfluxDBClient
self._write_api: WriteApi
self._buckets_api: BucketsApi
def __enter__(self) -> Self:
self._client = InfluxDBClient(url=self.url, token=self.token, org=self.org)
self._client = InfluxDBClient(
url=self.url, token=self.token, org=self.org, verify_ssl=self.verify_ssl
)
for header_name, header_value in self.headers.items():
self._client.api_client.set_default_header(header_name, header_value)
self._write_api = self._client.write_api(write_options=SYNCHRONOUS) # type: ignore
return self

View File

@ -31,13 +31,8 @@ class InfluxDBv1Session:
Example:
```python
with InfluxDBv1Session() as influx_client:
# Creating a database
influx_client.create_database(
dbname='my_new_database'
)
# Writing data to a database
data = [
points = [
{
"measurement": "your_measurement", # Replace with your measurement
"tags": {
@ -49,10 +44,7 @@ class InfluxDBv1Session:
"time": "2023-06-05T00:00:00Z", # Replace with your timestamp
}
]
influx_client.write_points(data=data, database="other_database")
# just write one point into the client's current database
influx_client.write(data=data[0])
influx_client.write_points(points=points, database="other_database")
```
"""
@ -73,6 +65,7 @@ class InfluxDBv1Session:
self._database = self._config.database
self._ssl = self._config.ssl
self._verify_ssl = self._config.verify_ssl
self._headers = self._config.headers
def __enter__(self) -> Self:
self._client = influxdb.InfluxDBClient(
@ -83,6 +76,7 @@ class InfluxDBv1Session:
database=self._database,
ssl=self._ssl,
verify_ssl=self._verify_ssl,
headers=self._headers,
)
return self

View File

@ -7,7 +7,7 @@ from typing import Any
from pydase import DataService
from pydase.components import NumberSlider
from pydase.data_service.data_service_observer import DataServiceObserver
from pydase.units import Quantity
from pydase.units import Quantity, Unit
from pydase.utils.helpers import (
get_object_attr_from_path,
get_object_by_path_parts,
@ -62,6 +62,60 @@ def update_method_serialization(
return serialized_object
def flatten_obj(
obj: SerializedObject,
) -> SerializedObject:
"""Flattens container fields in the serialized representation of a service to avoid
nested lists and dictionaries, which are not supported by Ionizer.
Ionizer requires a flat structure where each interactive or displayable element is
individually addressable. This function removes intermediate container objects like
lists and dicts and promotes their elements to top-level entries using fully
qualified access paths (e.g., "my_list[0]" or "my_dict[\"key\"]").
"""
obj_copy = copy.deepcopy(obj)
if obj["type"] in (
"DataService",
"Image",
"NumberSlider",
"DeviceConnection",
"Task",
"list",
"dict",
):
obj_copy["value"] = flatten_obj_value(obj["value"]) # type: ignore
return obj_copy
def flatten_obj_value(
obj_value: dict[str, SerializedObject],
) -> dict[str, SerializedObject]:
"""Recursively flattens the 'value' field of any serialized object if it contains
lists or dicts, making each element directly accessible by its full access path.
This flattening is necessary because Ionizer does not support nested data
structures. By converting structures like {"my_list": [...]}, into
{"my_list[0]": ..., "my_list[1]": ...}, we make the representation flat and
Ionizer-compatible.
"""
flattened_obj_value: dict[str, SerializedObject] = {}
for key, value in obj_value.items():
if value["type"] == "list" and isinstance(value["value"], list):
for index, item in enumerate(value["value"]):
flattened_obj_value[f"{key}[{index}]"] = flatten_obj(item)
elif value["type"] == "dict" and isinstance(value["value"], dict):
for k, v in value["value"].items():
flattened_obj_value[f'{key}["{k}"]'] = flatten_obj(v)
else:
flattened_obj_value[key] = flatten_obj(value)
return flattened_obj_value
class RPCInterface:
"""RPC interface to be passed to tiqi_rpc.Server to interface with Ionizer."""
@ -79,8 +133,10 @@ class RPCInterface:
return self._service.__class__.__name__
async def get_props(self) -> SerializedObject:
return update_method_serialization(
copy.deepcopy(self._service.serialize()["value"]) # type: ignore
return flatten_obj_value(
update_method_serialization(
copy.deepcopy(self._service.serialize()["value"]) # type: ignore
)
)
async def get_param(self, full_access_path: str) -> Any:
@ -91,6 +147,8 @@ class RPCInterface:
"""
param = get_object_attr_from_path(self._service, full_access_path)
if isinstance(param, NumberSlider):
if isinstance(param.value, Quantity):
return param.value.m
return param.value
if isinstance(param, DataService):
return param.serialize()
@ -123,6 +181,10 @@ class RPCInterface:
value = value * current_value.u
elif current_value_dict["type"] == "NumberSlider":
full_access_path = full_access_path + ".value"
if current_value_dict["value"]["value"]["type"] == "Quantity":
value = value * Unit(
current_value_dict["value"]["value"]["value"]["unit"] # type: ignore
)
self._state_manager.set_service_attribute_value_by_path(
full_access_path, dump(value)

View File

@ -1,6 +1,6 @@
[tool.poetry]
name = "pydase-service-base"
version = "0.3.0"
version = "0.3.4"
description = "Repository storing the code that is common to all pydase services."
authors = ["Mose Mueller <mosmuell@ethz.ch>"]
license = "MIT"