mirror of
https://github.com/tiqi-group/pydase_service_base.git
synced 2025-07-06 01:04:49 +02:00
Compare commits
5 Commits
Author | SHA1 | Date | |
---|---|---|---|
48e60de506 | |||
7651e1c983 | |||
447132a2bc | |||
b36a4e3d22 | |||
9dc0b23a3f |
@ -60,6 +60,8 @@ password: root
|
||||
database: my_database
|
||||
ssl: True # defaults to True
|
||||
verify_ssl: True # defaults to True
|
||||
headers:
|
||||
Host: other-virtual-host.ethz.ch
|
||||
```
|
||||
|
||||
`influxdb_config.yaml`:
|
||||
@ -67,6 +69,9 @@ verify_ssl: True # defaults to True
|
||||
url: https://database-url.ch
|
||||
org: your-org
|
||||
token: <influxdb-token>
|
||||
verify_ssl: True # defaults to True
|
||||
headers:
|
||||
Host: other-virtual-host.ethz.ch
|
||||
```
|
||||
|
||||
`postgres_development.yaml` / `postgres_production.yaml`:
|
||||
|
@ -29,6 +29,8 @@ class InfluxDBConfig(BaseConfig): # type: ignore
|
||||
url: str
|
||||
org: str
|
||||
token: SecretStr
|
||||
verify_ssl: bool = True
|
||||
headers: dict[str, str] = {} # noqa: RUF012
|
||||
|
||||
|
||||
class InfluxDBv1Config(BaseConfig): # type: ignore
|
||||
@ -39,3 +41,4 @@ class InfluxDBv1Config(BaseConfig): # type: ignore
|
||||
database: str
|
||||
ssl: bool = True
|
||||
verify_ssl: bool = True
|
||||
headers: dict[str, str] = {} # noqa: RUF012
|
||||
|
@ -80,12 +80,20 @@ class InfluxDBSession:
|
||||
self.url = self._config.url
|
||||
self.token = self._config.token.get_secret_value()
|
||||
self.org = self._config.org
|
||||
self.headers = self._config.headers
|
||||
self.verify_ssl = self._config.verify_ssl
|
||||
self._client: InfluxDBClient
|
||||
self._write_api: WriteApi
|
||||
self._buckets_api: BucketsApi
|
||||
|
||||
def __enter__(self) -> Self:
|
||||
self._client = InfluxDBClient(url=self.url, token=self.token, org=self.org)
|
||||
self._client = InfluxDBClient(
|
||||
url=self.url, token=self.token, org=self.org, verify_ssl=self.verify_ssl
|
||||
)
|
||||
|
||||
for header_name, header_value in self.headers.items():
|
||||
self._client.api_client.set_default_header(header_name, header_value)
|
||||
|
||||
self._write_api = self._client.write_api(write_options=SYNCHRONOUS) # type: ignore
|
||||
return self
|
||||
|
||||
|
@ -65,6 +65,7 @@ class InfluxDBv1Session:
|
||||
self._database = self._config.database
|
||||
self._ssl = self._config.ssl
|
||||
self._verify_ssl = self._config.verify_ssl
|
||||
self._headers = self._config.headers
|
||||
|
||||
def __enter__(self) -> Self:
|
||||
self._client = influxdb.InfluxDBClient(
|
||||
@ -75,6 +76,7 @@ class InfluxDBv1Session:
|
||||
database=self._database,
|
||||
ssl=self._ssl,
|
||||
verify_ssl=self._verify_ssl,
|
||||
headers=self._headers,
|
||||
)
|
||||
return self
|
||||
|
||||
|
@ -62,6 +62,60 @@ def update_method_serialization(
|
||||
return serialized_object
|
||||
|
||||
|
||||
def flatten_obj(
|
||||
obj: SerializedObject,
|
||||
) -> SerializedObject:
|
||||
"""Flattens container fields in the serialized representation of a service to avoid
|
||||
nested lists and dictionaries, which are not supported by Ionizer.
|
||||
|
||||
Ionizer requires a flat structure where each interactive or displayable element is
|
||||
individually addressable. This function removes intermediate container objects like
|
||||
lists and dicts and promotes their elements to top-level entries using fully
|
||||
qualified access paths (e.g., "my_list[0]" or "my_dict[\"key\"]").
|
||||
"""
|
||||
|
||||
obj_copy = copy.deepcopy(obj)
|
||||
if obj["type"] in (
|
||||
"DataService",
|
||||
"Image",
|
||||
"NumberSlider",
|
||||
"DeviceConnection",
|
||||
"Task",
|
||||
"list",
|
||||
"dict",
|
||||
):
|
||||
obj_copy["value"] = flatten_obj_value(obj["value"]) # type: ignore
|
||||
|
||||
return obj_copy
|
||||
|
||||
|
||||
def flatten_obj_value(
|
||||
obj_value: dict[str, SerializedObject],
|
||||
) -> dict[str, SerializedObject]:
|
||||
"""Recursively flattens the 'value' field of any serialized object if it contains
|
||||
lists or dicts, making each element directly accessible by its full access path.
|
||||
|
||||
This flattening is necessary because Ionizer does not support nested data
|
||||
structures. By converting structures like {"my_list": [...]}, into
|
||||
{"my_list[0]": ..., "my_list[1]": ...}, we make the representation flat and
|
||||
Ionizer-compatible.
|
||||
"""
|
||||
|
||||
flattened_obj_value: dict[str, SerializedObject] = {}
|
||||
|
||||
for key, value in obj_value.items():
|
||||
if value["type"] == "list" and isinstance(value["value"], list):
|
||||
for index, item in enumerate(value["value"]):
|
||||
flattened_obj_value[f"{key}[{index}]"] = flatten_obj(item)
|
||||
elif value["type"] == "dict" and isinstance(value["value"], dict):
|
||||
for k, v in value["value"].items():
|
||||
flattened_obj_value[f'{key}["{k}"]'] = flatten_obj(v)
|
||||
else:
|
||||
flattened_obj_value[key] = flatten_obj(value)
|
||||
|
||||
return flattened_obj_value
|
||||
|
||||
|
||||
class RPCInterface:
|
||||
"""RPC interface to be passed to tiqi_rpc.Server to interface with Ionizer."""
|
||||
|
||||
@ -79,8 +133,10 @@ class RPCInterface:
|
||||
return self._service.__class__.__name__
|
||||
|
||||
async def get_props(self) -> SerializedObject:
|
||||
return update_method_serialization(
|
||||
copy.deepcopy(self._service.serialize()["value"]) # type: ignore
|
||||
return flatten_obj_value(
|
||||
update_method_serialization(
|
||||
copy.deepcopy(self._service.serialize()["value"]) # type: ignore
|
||||
)
|
||||
)
|
||||
|
||||
async def get_param(self, full_access_path: str) -> Any:
|
||||
|
@ -1,6 +1,6 @@
|
||||
[tool.poetry]
|
||||
name = "pydase-service-base"
|
||||
version = "0.3.2"
|
||||
version = "0.3.4"
|
||||
description = "Repository storing the code that is common to all pydase services."
|
||||
authors = ["Mose Mueller <mosmuell@ethz.ch>"]
|
||||
license = "MIT"
|
||||
|
Reference in New Issue
Block a user