mirror of
https://github.com/tiqi-group/pydase_service_base.git
synced 2025-07-05 08:44:48 +02:00
Compare commits
13 Commits
Author | SHA1 | Date | |
---|---|---|---|
48e60de506 | |||
7651e1c983 | |||
447132a2bc | |||
b36a4e3d22 | |||
9dc0b23a3f | |||
854fc49c75 | |||
a3b9fc1fb5 | |||
8af6513aaf | |||
496816335f | |||
596962fa4c | |||
0c7dbae845 | |||
597aa035f1 | |||
dd8be147ba |
27
README.md
27
README.md
@ -32,12 +32,12 @@ To utilize specific functionalities such as `IonizerServer`, `InfluxDBv1Session`
|
||||
or directly add the following line to the `pyproject.toml` file:
|
||||
|
||||
```toml
|
||||
pydase-service-base = {git = "https://github.com/tiqi-group/pydase_service_base.git", rev = "main", extras = ["ionizer", "postgresql", "ionizer"]}
|
||||
pydase_service_base = {git = "https://github.com/tiqi-group/pydase_service_base.git", rev = "main", extras = ["ionizer", "postgresql", "influxdbv1", "influxdbv2"]}
|
||||
```
|
||||
|
||||
## Configuration
|
||||
|
||||
Database connections rely on configurations provided either through environment variables or a specific configuration file. The package anticipates a `database_config` folder in the root directory of any module using this package. This folder should house the configuration files for the databases. Override the `database_config` folder's location by passing a different path to the database classes' constructor.
|
||||
Database connections rely on configurations provided by environment variables or a specific configuration file. The package anticipates a `database_config` folder in the root directory of any module using this package. This folder should house the database configuration files. You can override the `database_config` folder's location by passing a different path to the database classes' constructor.
|
||||
|
||||
Structure of the `database_config` folder:
|
||||
|
||||
@ -53,13 +53,15 @@ Example content for the configuration files:
|
||||
|
||||
`influxdbv1_config.yaml`:
|
||||
```yaml
|
||||
host: https://database-url.ch
|
||||
port: 8086
|
||||
host: https://influxdb.phys.ethz.ch
|
||||
port: 443 # defaults to 8086 (default port of influxdb)
|
||||
username: root
|
||||
password: root
|
||||
database: my_database
|
||||
ssl: True
|
||||
verify_ssl: True
|
||||
ssl: True # defaults to True
|
||||
verify_ssl: True # defaults to True
|
||||
headers:
|
||||
Host: other-virtual-host.ethz.ch
|
||||
```
|
||||
|
||||
`influxdb_config.yaml`:
|
||||
@ -67,6 +69,9 @@ verify_ssl: True
|
||||
url: https://database-url.ch
|
||||
org: your-org
|
||||
token: <influxdb-token>
|
||||
verify_ssl: True # defaults to True
|
||||
headers:
|
||||
Host: other-virtual-host.ethz.ch
|
||||
```
|
||||
|
||||
`postgres_development.yaml` / `postgres_production.yaml`:
|
||||
@ -88,8 +93,8 @@ Interact with an InfluxDBv1 server using the `InfluxDBv1Session` class. **Note t
|
||||
from pydase_service_base.database import InfluxDBv1Session
|
||||
|
||||
with InfluxDBv1Session() as influx_client:
|
||||
# Writing data to a database
|
||||
data = [
|
||||
# Writing points to a database
|
||||
points = [
|
||||
{
|
||||
"measurement": "your_measurement", # Replace with your measurement
|
||||
"tags": {
|
||||
@ -101,7 +106,7 @@ with InfluxDBv1Session() as influx_client:
|
||||
"time": "2023-06-05T00:00:00Z", # Replace with your timestamp
|
||||
}
|
||||
]
|
||||
influx_client.write_points(data=data, database="other_database")
|
||||
influx_client.write_points(points=points, database="other_database")
|
||||
```
|
||||
|
||||
**Note** that you have to set `ssl` and `verify_ssl` to `False` when you are using a local influxdb instance.
|
||||
@ -160,7 +165,7 @@ with PostgresDatabaseSession() as session:
|
||||
|
||||
### Ionizer Interface
|
||||
|
||||
The `IonizerServer` provides an interface to seamlessly integrate `pydase` applications with Ionizer. **This requires the `ionizer` optional dependency**.
|
||||
The `IonizerServer` provides an interface to integrate `pydase` applications with Ionizer seamlessly. **This requires the `ionizer` optional dependency**.
|
||||
|
||||
To deploy `IonizerServer` with your service:
|
||||
|
||||
@ -190,7 +195,7 @@ if __name__ == "__main__":
|
||||
|
||||
This integration ensures that your primary `pydase` server and `YourServiceClass` service are set up. It also incorporates the `IonizerServer` on port `8002`.
|
||||
|
||||
For more details on the `IonizerServer`, refer to the [official documentation](https://pydase.readthedocs.io/en/latest/) or get in touch with the maintainers.
|
||||
For more details on the `IonizerServer`, get in touch with the maintainers.
|
||||
|
||||
## License
|
||||
|
||||
|
@ -29,13 +29,16 @@ class InfluxDBConfig(BaseConfig): # type: ignore
|
||||
url: str
|
||||
org: str
|
||||
token: SecretStr
|
||||
verify_ssl: bool = True
|
||||
headers: dict[str, str] = {} # noqa: RUF012
|
||||
|
||||
|
||||
class InfluxDBv1Config(BaseConfig): # type: ignore
|
||||
host: str
|
||||
port: int
|
||||
port: int = 8086
|
||||
username: str
|
||||
password: SecretStr
|
||||
database: str
|
||||
ssl: bool = True
|
||||
verify_ssl: bool = True
|
||||
headers: dict[str, str] = {} # noqa: RUF012
|
||||
|
@ -1,16 +1,17 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
import sys
|
||||
from typing import TYPE_CHECKING, Any, NamedTuple
|
||||
|
||||
try:
|
||||
from typing import Self # type: ignore
|
||||
except ImportError:
|
||||
if sys.version_info < (3, 11):
|
||||
from typing_extensions import Self
|
||||
else:
|
||||
from typing import Self
|
||||
|
||||
|
||||
from confz import FileSource
|
||||
from influxdb_client import (
|
||||
from influxdb_client import ( # type: ignore
|
||||
Bucket,
|
||||
BucketRetentionRules,
|
||||
BucketsApi,
|
||||
@ -19,9 +20,9 @@ from influxdb_client import (
|
||||
WriteApi,
|
||||
WritePrecision,
|
||||
)
|
||||
from influxdb_client.client.write.point import DEFAULT_WRITE_PRECISION
|
||||
from influxdb_client.client.write_api import SYNCHRONOUS
|
||||
from influxdb_client.rest import ApiException
|
||||
from influxdb_client.client.write.point import DEFAULT_WRITE_PRECISION # type: ignore
|
||||
from influxdb_client.client.write_api import SYNCHRONOUS # type: ignore
|
||||
from influxdb_client.rest import ApiException # type: ignore
|
||||
|
||||
from pydase_service_base.database.config import InfluxDBConfig, ServiceConfig
|
||||
|
||||
@ -79,12 +80,20 @@ class InfluxDBSession:
|
||||
self.url = self._config.url
|
||||
self.token = self._config.token.get_secret_value()
|
||||
self.org = self._config.org
|
||||
self.headers = self._config.headers
|
||||
self.verify_ssl = self._config.verify_ssl
|
||||
self._client: InfluxDBClient
|
||||
self._write_api: WriteApi
|
||||
self._buckets_api: BucketsApi
|
||||
|
||||
def __enter__(self) -> Self:
|
||||
self._client = InfluxDBClient(url=self.url, token=self.token, org=self.org)
|
||||
self._client = InfluxDBClient(
|
||||
url=self.url, token=self.token, org=self.org, verify_ssl=self.verify_ssl
|
||||
)
|
||||
|
||||
for header_name, header_value in self.headers.items():
|
||||
self._client.api_client.set_default_header(header_name, header_value)
|
||||
|
||||
self._write_api = self._client.write_api(write_options=SYNCHRONOUS) # type: ignore
|
||||
return self
|
||||
|
||||
|
@ -31,13 +31,8 @@ class InfluxDBv1Session:
|
||||
Example:
|
||||
```python
|
||||
with InfluxDBv1Session() as influx_client:
|
||||
# Creating a database
|
||||
influx_client.create_database(
|
||||
dbname='my_new_database'
|
||||
)
|
||||
|
||||
# Writing data to a database
|
||||
data = [
|
||||
points = [
|
||||
{
|
||||
"measurement": "your_measurement", # Replace with your measurement
|
||||
"tags": {
|
||||
@ -49,10 +44,7 @@ class InfluxDBv1Session:
|
||||
"time": "2023-06-05T00:00:00Z", # Replace with your timestamp
|
||||
}
|
||||
]
|
||||
influx_client.write_points(data=data, database="other_database")
|
||||
|
||||
# just write one point into the client's current database
|
||||
influx_client.write(data=data[0])
|
||||
influx_client.write_points(points=points, database="other_database")
|
||||
```
|
||||
"""
|
||||
|
||||
@ -73,6 +65,7 @@ class InfluxDBv1Session:
|
||||
self._database = self._config.database
|
||||
self._ssl = self._config.ssl
|
||||
self._verify_ssl = self._config.verify_ssl
|
||||
self._headers = self._config.headers
|
||||
|
||||
def __enter__(self) -> Self:
|
||||
self._client = influxdb.InfluxDBClient(
|
||||
@ -83,6 +76,7 @@ class InfluxDBv1Session:
|
||||
database=self._database,
|
||||
ssl=self._ssl,
|
||||
verify_ssl=self._verify_ssl,
|
||||
headers=self._headers,
|
||||
)
|
||||
return self
|
||||
|
||||
|
@ -7,7 +7,7 @@ from typing import Any
|
||||
from pydase import DataService
|
||||
from pydase.components import NumberSlider
|
||||
from pydase.data_service.data_service_observer import DataServiceObserver
|
||||
from pydase.units import Quantity
|
||||
from pydase.units import Quantity, Unit
|
||||
from pydase.utils.helpers import (
|
||||
get_object_attr_from_path,
|
||||
get_object_by_path_parts,
|
||||
@ -62,6 +62,60 @@ def update_method_serialization(
|
||||
return serialized_object
|
||||
|
||||
|
||||
def flatten_obj(
|
||||
obj: SerializedObject,
|
||||
) -> SerializedObject:
|
||||
"""Flattens container fields in the serialized representation of a service to avoid
|
||||
nested lists and dictionaries, which are not supported by Ionizer.
|
||||
|
||||
Ionizer requires a flat structure where each interactive or displayable element is
|
||||
individually addressable. This function removes intermediate container objects like
|
||||
lists and dicts and promotes their elements to top-level entries using fully
|
||||
qualified access paths (e.g., "my_list[0]" or "my_dict[\"key\"]").
|
||||
"""
|
||||
|
||||
obj_copy = copy.deepcopy(obj)
|
||||
if obj["type"] in (
|
||||
"DataService",
|
||||
"Image",
|
||||
"NumberSlider",
|
||||
"DeviceConnection",
|
||||
"Task",
|
||||
"list",
|
||||
"dict",
|
||||
):
|
||||
obj_copy["value"] = flatten_obj_value(obj["value"]) # type: ignore
|
||||
|
||||
return obj_copy
|
||||
|
||||
|
||||
def flatten_obj_value(
|
||||
obj_value: dict[str, SerializedObject],
|
||||
) -> dict[str, SerializedObject]:
|
||||
"""Recursively flattens the 'value' field of any serialized object if it contains
|
||||
lists or dicts, making each element directly accessible by its full access path.
|
||||
|
||||
This flattening is necessary because Ionizer does not support nested data
|
||||
structures. By converting structures like {"my_list": [...]}, into
|
||||
{"my_list[0]": ..., "my_list[1]": ...}, we make the representation flat and
|
||||
Ionizer-compatible.
|
||||
"""
|
||||
|
||||
flattened_obj_value: dict[str, SerializedObject] = {}
|
||||
|
||||
for key, value in obj_value.items():
|
||||
if value["type"] == "list" and isinstance(value["value"], list):
|
||||
for index, item in enumerate(value["value"]):
|
||||
flattened_obj_value[f"{key}[{index}]"] = flatten_obj(item)
|
||||
elif value["type"] == "dict" and isinstance(value["value"], dict):
|
||||
for k, v in value["value"].items():
|
||||
flattened_obj_value[f'{key}["{k}"]'] = flatten_obj(v)
|
||||
else:
|
||||
flattened_obj_value[key] = flatten_obj(value)
|
||||
|
||||
return flattened_obj_value
|
||||
|
||||
|
||||
class RPCInterface:
|
||||
"""RPC interface to be passed to tiqi_rpc.Server to interface with Ionizer."""
|
||||
|
||||
@ -79,8 +133,10 @@ class RPCInterface:
|
||||
return self._service.__class__.__name__
|
||||
|
||||
async def get_props(self) -> SerializedObject:
|
||||
return update_method_serialization(
|
||||
copy.deepcopy(self._service.serialize()["value"]) # type: ignore
|
||||
return flatten_obj_value(
|
||||
update_method_serialization(
|
||||
copy.deepcopy(self._service.serialize()["value"]) # type: ignore
|
||||
)
|
||||
)
|
||||
|
||||
async def get_param(self, full_access_path: str) -> Any:
|
||||
@ -91,6 +147,8 @@ class RPCInterface:
|
||||
"""
|
||||
param = get_object_attr_from_path(self._service, full_access_path)
|
||||
if isinstance(param, NumberSlider):
|
||||
if isinstance(param.value, Quantity):
|
||||
return param.value.m
|
||||
return param.value
|
||||
if isinstance(param, DataService):
|
||||
return param.serialize()
|
||||
@ -123,6 +181,10 @@ class RPCInterface:
|
||||
value = value * current_value.u
|
||||
elif current_value_dict["type"] == "NumberSlider":
|
||||
full_access_path = full_access_path + ".value"
|
||||
if current_value_dict["value"]["value"]["type"] == "Quantity":
|
||||
value = value * Unit(
|
||||
current_value_dict["value"]["value"]["value"]["unit"] # type: ignore
|
||||
)
|
||||
|
||||
self._state_manager.set_service_attribute_value_by_path(
|
||||
full_access_path, dump(value)
|
||||
|
@ -1,6 +1,6 @@
|
||||
[tool.poetry]
|
||||
name = "pydase-service-base"
|
||||
version = "0.3.0"
|
||||
version = "0.3.4"
|
||||
description = "Repository storing the code that is common to all pydase services."
|
||||
authors = ["Mose Mueller <mosmuell@ethz.ch>"]
|
||||
license = "MIT"
|
||||
|
Reference in New Issue
Block a user