mirror of
https://github.com/tiqi-group/pydase_service_base.git
synced 2025-07-05 08:44:48 +02:00
Compare commits
8 Commits
Author | SHA1 | Date | |
---|---|---|---|
48e60de506 | |||
7651e1c983 | |||
447132a2bc | |||
b36a4e3d22 | |||
9dc0b23a3f | |||
854fc49c75 | |||
a3b9fc1fb5 | |||
8af6513aaf |
13
README.md
13
README.md
@ -32,12 +32,12 @@ To utilize specific functionalities such as `IonizerServer`, `InfluxDBv1Session`
|
||||
or directly add the following line to the `pyproject.toml` file:
|
||||
|
||||
```toml
|
||||
pydase-service-base = {git = "https://github.com/tiqi-group/pydase_service_base.git", rev = "main", extras = ["ionizer", "postgresql", "ionizer"]}
|
||||
pydase_service_base = {git = "https://github.com/tiqi-group/pydase_service_base.git", rev = "main", extras = ["ionizer", "postgresql", "influxdbv1", "influxdbv2"]}
|
||||
```
|
||||
|
||||
## Configuration
|
||||
|
||||
Database connections rely on configurations provided either through environment variables or a specific configuration file. The package anticipates a `database_config` folder in the root directory of any module using this package. This folder should house the configuration files for the databases. Override the `database_config` folder's location by passing a different path to the database classes' constructor.
|
||||
Database connections rely on configurations provided by environment variables or a specific configuration file. The package anticipates a `database_config` folder in the root directory of any module using this package. This folder should house the database configuration files. You can override the `database_config` folder's location by passing a different path to the database classes' constructor.
|
||||
|
||||
Structure of the `database_config` folder:
|
||||
|
||||
@ -60,6 +60,8 @@ password: root
|
||||
database: my_database
|
||||
ssl: True # defaults to True
|
||||
verify_ssl: True # defaults to True
|
||||
headers:
|
||||
Host: other-virtual-host.ethz.ch
|
||||
```
|
||||
|
||||
`influxdb_config.yaml`:
|
||||
@ -67,6 +69,9 @@ verify_ssl: True # defaults to True
|
||||
url: https://database-url.ch
|
||||
org: your-org
|
||||
token: <influxdb-token>
|
||||
verify_ssl: True # defaults to True
|
||||
headers:
|
||||
Host: other-virtual-host.ethz.ch
|
||||
```
|
||||
|
||||
`postgres_development.yaml` / `postgres_production.yaml`:
|
||||
@ -160,7 +165,7 @@ with PostgresDatabaseSession() as session:
|
||||
|
||||
### Ionizer Interface
|
||||
|
||||
The `IonizerServer` provides an interface to seamlessly integrate `pydase` applications with Ionizer. **This requires the `ionizer` optional dependency**.
|
||||
The `IonizerServer` provides an interface to integrate `pydase` applications with Ionizer seamlessly. **This requires the `ionizer` optional dependency**.
|
||||
|
||||
To deploy `IonizerServer` with your service:
|
||||
|
||||
@ -190,7 +195,7 @@ if __name__ == "__main__":
|
||||
|
||||
This integration ensures that your primary `pydase` server and `YourServiceClass` service are set up. It also incorporates the `IonizerServer` on port `8002`.
|
||||
|
||||
For more details on the `IonizerServer`, refer to the [official documentation](https://pydase.readthedocs.io/en/latest/) or get in touch with the maintainers.
|
||||
For more details on the `IonizerServer`, get in touch with the maintainers.
|
||||
|
||||
## License
|
||||
|
||||
|
@ -29,6 +29,8 @@ class InfluxDBConfig(BaseConfig): # type: ignore
|
||||
url: str
|
||||
org: str
|
||||
token: SecretStr
|
||||
verify_ssl: bool = True
|
||||
headers: dict[str, str] = {} # noqa: RUF012
|
||||
|
||||
|
||||
class InfluxDBv1Config(BaseConfig): # type: ignore
|
||||
@ -39,3 +41,4 @@ class InfluxDBv1Config(BaseConfig): # type: ignore
|
||||
database: str
|
||||
ssl: bool = True
|
||||
verify_ssl: bool = True
|
||||
headers: dict[str, str] = {} # noqa: RUF012
|
||||
|
@ -1,16 +1,17 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
import sys
|
||||
from typing import TYPE_CHECKING, Any, NamedTuple
|
||||
|
||||
try:
|
||||
from typing import Self # type: ignore
|
||||
except ImportError:
|
||||
if sys.version_info < (3, 11):
|
||||
from typing_extensions import Self
|
||||
else:
|
||||
from typing import Self
|
||||
|
||||
|
||||
from confz import FileSource
|
||||
from influxdb_client import (
|
||||
from influxdb_client import ( # type: ignore
|
||||
Bucket,
|
||||
BucketRetentionRules,
|
||||
BucketsApi,
|
||||
@ -19,9 +20,9 @@ from influxdb_client import (
|
||||
WriteApi,
|
||||
WritePrecision,
|
||||
)
|
||||
from influxdb_client.client.write.point import DEFAULT_WRITE_PRECISION
|
||||
from influxdb_client.client.write_api import SYNCHRONOUS
|
||||
from influxdb_client.rest import ApiException
|
||||
from influxdb_client.client.write.point import DEFAULT_WRITE_PRECISION # type: ignore
|
||||
from influxdb_client.client.write_api import SYNCHRONOUS # type: ignore
|
||||
from influxdb_client.rest import ApiException # type: ignore
|
||||
|
||||
from pydase_service_base.database.config import InfluxDBConfig, ServiceConfig
|
||||
|
||||
@ -79,12 +80,20 @@ class InfluxDBSession:
|
||||
self.url = self._config.url
|
||||
self.token = self._config.token.get_secret_value()
|
||||
self.org = self._config.org
|
||||
self.headers = self._config.headers
|
||||
self.verify_ssl = self._config.verify_ssl
|
||||
self._client: InfluxDBClient
|
||||
self._write_api: WriteApi
|
||||
self._buckets_api: BucketsApi
|
||||
|
||||
def __enter__(self) -> Self:
|
||||
self._client = InfluxDBClient(url=self.url, token=self.token, org=self.org)
|
||||
self._client = InfluxDBClient(
|
||||
url=self.url, token=self.token, org=self.org, verify_ssl=self.verify_ssl
|
||||
)
|
||||
|
||||
for header_name, header_value in self.headers.items():
|
||||
self._client.api_client.set_default_header(header_name, header_value)
|
||||
|
||||
self._write_api = self._client.write_api(write_options=SYNCHRONOUS) # type: ignore
|
||||
return self
|
||||
|
||||
|
@ -65,6 +65,7 @@ class InfluxDBv1Session:
|
||||
self._database = self._config.database
|
||||
self._ssl = self._config.ssl
|
||||
self._verify_ssl = self._config.verify_ssl
|
||||
self._headers = self._config.headers
|
||||
|
||||
def __enter__(self) -> Self:
|
||||
self._client = influxdb.InfluxDBClient(
|
||||
@ -75,6 +76,7 @@ class InfluxDBv1Session:
|
||||
database=self._database,
|
||||
ssl=self._ssl,
|
||||
verify_ssl=self._verify_ssl,
|
||||
headers=self._headers,
|
||||
)
|
||||
return self
|
||||
|
||||
|
@ -62,6 +62,60 @@ def update_method_serialization(
|
||||
return serialized_object
|
||||
|
||||
|
||||
def flatten_obj(
|
||||
obj: SerializedObject,
|
||||
) -> SerializedObject:
|
||||
"""Flattens container fields in the serialized representation of a service to avoid
|
||||
nested lists and dictionaries, which are not supported by Ionizer.
|
||||
|
||||
Ionizer requires a flat structure where each interactive or displayable element is
|
||||
individually addressable. This function removes intermediate container objects like
|
||||
lists and dicts and promotes their elements to top-level entries using fully
|
||||
qualified access paths (e.g., "my_list[0]" or "my_dict[\"key\"]").
|
||||
"""
|
||||
|
||||
obj_copy = copy.deepcopy(obj)
|
||||
if obj["type"] in (
|
||||
"DataService",
|
||||
"Image",
|
||||
"NumberSlider",
|
||||
"DeviceConnection",
|
||||
"Task",
|
||||
"list",
|
||||
"dict",
|
||||
):
|
||||
obj_copy["value"] = flatten_obj_value(obj["value"]) # type: ignore
|
||||
|
||||
return obj_copy
|
||||
|
||||
|
||||
def flatten_obj_value(
|
||||
obj_value: dict[str, SerializedObject],
|
||||
) -> dict[str, SerializedObject]:
|
||||
"""Recursively flattens the 'value' field of any serialized object if it contains
|
||||
lists or dicts, making each element directly accessible by its full access path.
|
||||
|
||||
This flattening is necessary because Ionizer does not support nested data
|
||||
structures. By converting structures like {"my_list": [...]}, into
|
||||
{"my_list[0]": ..., "my_list[1]": ...}, we make the representation flat and
|
||||
Ionizer-compatible.
|
||||
"""
|
||||
|
||||
flattened_obj_value: dict[str, SerializedObject] = {}
|
||||
|
||||
for key, value in obj_value.items():
|
||||
if value["type"] == "list" and isinstance(value["value"], list):
|
||||
for index, item in enumerate(value["value"]):
|
||||
flattened_obj_value[f"{key}[{index}]"] = flatten_obj(item)
|
||||
elif value["type"] == "dict" and isinstance(value["value"], dict):
|
||||
for k, v in value["value"].items():
|
||||
flattened_obj_value[f'{key}["{k}"]'] = flatten_obj(v)
|
||||
else:
|
||||
flattened_obj_value[key] = flatten_obj(value)
|
||||
|
||||
return flattened_obj_value
|
||||
|
||||
|
||||
class RPCInterface:
|
||||
"""RPC interface to be passed to tiqi_rpc.Server to interface with Ionizer."""
|
||||
|
||||
@ -79,8 +133,10 @@ class RPCInterface:
|
||||
return self._service.__class__.__name__
|
||||
|
||||
async def get_props(self) -> SerializedObject:
|
||||
return update_method_serialization(
|
||||
copy.deepcopy(self._service.serialize()["value"]) # type: ignore
|
||||
return flatten_obj_value(
|
||||
update_method_serialization(
|
||||
copy.deepcopy(self._service.serialize()["value"]) # type: ignore
|
||||
)
|
||||
)
|
||||
|
||||
async def get_param(self, full_access_path: str) -> Any:
|
||||
|
@ -1,6 +1,6 @@
|
||||
[tool.poetry]
|
||||
name = "pydase-service-base"
|
||||
version = "0.3.1"
|
||||
version = "0.3.4"
|
||||
description = "Repository storing the code that is common to all pydase services."
|
||||
authors = ["Mose Mueller <mosmuell@ethz.ch>"]
|
||||
license = "MIT"
|
||||
|
Reference in New Issue
Block a user