adds support for InfluxDB v1 clients

This commit is contained in:
Mose Mueller 2024-11-05 11:46:49 +01:00
parent b05a38af77
commit 80150ab50a
4 changed files with 265 additions and 3 deletions

View File

@ -10,12 +10,16 @@ Ensure you have Python 3.10 or later installed on your system. Dependencies of t
poetry add git+https://github.com/tiqi-group/pydase_service_base.git
```
To utilize specific functionalities such as `IonizerServer`, `InfluxDBSession`, or `PostgresDatabaseSession`, you need to install the relevant optional dependencies:
To utilize specific functionalities such as `IonizerServer`, `InfluxDBv1Session`,`InfluxDBSession`, or `PostgresDatabaseSession`, you need to install the relevant optional dependencies:
- For `IonizerServer`, include the `ionizer` extra:
```bash
poetry add "git+https://github.com/tiqi-group/pydase_service_base.git#main[ionizer]"
```
- For `InfluxDBv1Session`, include the `influxdbv1` extra:
```bash
poetry add "git+https://github.com/tiqi-group/pydase_service_base.git#main[influxdbv1]"
```
- For `InfluxDBSession`, include the `influxdbv2` extra:
```bash
poetry add "git+https://github.com/tiqi-group/pydase_service_base.git#main[influxdbv2]"
@ -39,6 +43,7 @@ Structure of the `database_config` folder:
```
database_config
├── influxdbv1_config.yaml
├── influxdb_config.yaml
├── postgres_development.yaml
└── postgres_production.yaml
@ -46,6 +51,17 @@ database_config
Example content for the configuration files:
`influxdbv1_config.yaml`:
```yaml
host: https://database-url.ch
port: 8086
username: root
password: root
database: my_database
ssl: True
verify_ssl: True
```
`influxdb_config.yaml`:
```yaml
url: https://database-url.ch
@ -64,6 +80,38 @@ user: ...
## Usage
### InfluxDBv1Session
Interact with an InfluxDBv1 server using the `InfluxDBv1Session` class. **Note that this class only supports InfluxDB v1** and **requires the `influxdbv1` optional dependency**.
```python
from pydase_service_base.database import InfluxDBv1Session
with InfluxDBv1Session() as influx_client:
# Creating a database
influx_client.create_database(
dbname='my_new_database'
)
# Writing data to a database
data = [
{
"measurement": "your_measurement", # Replace with your measurement
"tags": {
"example_tag": "tag_value", # Replace with your tag and value
},
"fields": {
"example_field": 123, # Replace with your field and its value
},
"time": "2023-06-05T00:00:00Z", # Replace with your timestamp
}
]
influx_client.write_points(data=[data, data], database="other_database")
# just write one point into the client's current database
influx_client.write(data=data[0])
```
### InfluxDBSession
Interact with an InfluxDB server using the `InfluxDBSession` class. **Note that this class only supports InfluxDB v2** and **requires the `influxdbv2` optional dependency**.

View File

@ -29,3 +29,13 @@ class InfluxDBConfig(BaseConfig): # type: ignore
url: str
org: str
token: SecretStr
class InfluxDBv1Config(BaseConfig): # type: ignore
host: str
port: int
username: str
password: SecretStr
database: str
ssl: bool = True
verify_ssl: bool = True

View File

@ -0,0 +1,201 @@
from __future__ import annotations
import logging
from typing import TYPE_CHECKING, Any, Literal
try:
from typing import Self # type: ignore
except ImportError:
from typing_extensions import Self
import influxdb
from confz import FileSource
from pydase_service_base.database.config import InfluxDBv1Config, ServiceConfig
if TYPE_CHECKING:
from pathlib import Path
from types import TracebackType
logger = logging.getLogger(__name__)
class InfluxDBv1Session:
"""
The `InfluxDBv1Session` class serves as a context manager for a connection
to an InfluxDB server. This connection is established using credentials provided
through environment variables.
Example:
```python
with InfluxDBv1Session() as influx_client:
# Creating a database
influx_client.create_database(
dbname='my_new_database'
)
# Writing data to a database
data = [
{
"measurement": "your_measurement", # Replace with your measurement
"tags": {
"example_tag": "tag_value", # Replace with your tag and value
},
"fields": {
"example_field": 123, # Replace with your field and its value
},
"time": "2023-06-05T00:00:00Z", # Replace with your timestamp
}
]
influx_client.write_points(data=[data, data], database="other_database")
# just write one point into the client's current database
influx_client.write(data=data[0])
```
"""
conf_folder: Path | str
def __init__(self) -> None:
self._config = InfluxDBv1Config(
config_sources=FileSource(
ServiceConfig().database_config_dir / "influxdbv1_config.yaml"
)
)
self._client: influxdb.InfluxDBClient
self._host = self._config.host
self._port = self._config.port
self._username = self._config.username
self._password = self._config.password
self._database = self._config.database
self._ssl = self._config.ssl
self._verify_ssl = self._config.verify_ssl
def __enter__(self) -> Self:
self._client = influxdb.InfluxDBClient(
host=self._host,
port=self._port,
username=self._username,
password=self._password.get_secret_value(),
database=self._database,
ssl=self._ssl,
verify_ssl=self._verify_ssl,
)
return self
def __exit__(
self,
exc_type: type[BaseException] | None,
exc_value: BaseException | None,
exc_traceback: TracebackType | None,
) -> None:
self._client.close()
def write(
self,
data: dict[str, Any],
) -> Any:
"""Write data to InfluxDB.
Args:
data:
The data to be written.
Example:
```python
>>> data = {
"measurement": "cpu_load_short",
"tags": {
"host": "server01",
"region": "us-west"
},
"time": "2009-11-10T23:00:00Z",
"fields": {
"value": 0.64
}
}
>>> with InfluxDBv1Session() as client:
client.write(data=data)
```
"""
self._client.write(data=data)
def write_points( # noqa: PLR0913
self,
points: list[dict[str, Any]],
time_precision: Literal["s", "m", "ms", "u"] | None = None,
database: str | None = None,
tags: dict[str, str] | None = None,
batch_size: int | None = None,
consistency: Literal["any", "one", "quorum", "all"] | None = None,
) -> bool:
"""Write to multiple time series names.
Args:
points:
The list of points to be written in the database.
time_precision:
Either 's', 'm', 'ms' or 'u', defaults to None.
database:
The database to write the points to. Defaults to the client's current
database.
tags:
A set of key-value pairs associated with each point. Both keys and
values must be strings. These are shared tags and will be merged with
point-specific tags. Defaults to None.
batch_size:
Value to write the points in batches instead of all at one time. Useful
for when doing data dumps from one database to another or when doing a
massive write operation. Defaults to None
consistency:
Consistency for the points. One of {'any','one','quorum','all'}.
Return:
True, if the operation is successful
Example:
```python
>>> data = {
"measurement": "cpu_load_short",
"tags": {
"host": "server01",
"region": "us-west"
},
"time": "2009-11-10T23:00:00Z",
"fields": {
"value": 0.64
}
}
>>> with InfluxDBv1Session() as client:
client.write(data=data)
```
"""
return self._client.write_points(
points=points,
time_precision=time_precision,
database=database,
tags=tags,
batch_size=batch_size,
consistency=consistency,
)
def create_database(
self,
dbname: str,
) -> None:
"""Create a new database in the InfluxDB instance. This function wraps the
create_database from `influxdb` in a try-catch block and logs potential errors.
Args:
dbname:
The name of the database to create.
"""
try:
self._client.create_database(dbname=dbname)
except Exception as e:
logger.error(e)

View File

@ -14,11 +14,13 @@ build-backend = "poetry.core.masonry.api"
[tool.poetry.dependencies]
python = "^3.10"
confz = "^2.0.0"
pydase = ">=0.8.2"
# pydase = ">=0.8.2"
pydase = { path = "../../pydase", develop = true }
# optional dependencies
tiqi-rpc = { git = "ssh://git@gitlab.phys.ethz.ch/tiqi-projects/tiqi-rpc-python.git", optional = true }
influxdb-client = { version = "^1.36.1", optional = true}
influxdb = { version = "^5.3.2", optional = true }
sqlmodel = { version = "^0.0.14", optional = true }
psycopg2-binary = { version = "^2.9.6", optional = true }
python-dateutil = { version = "^2.8.2", optional = true }
@ -26,6 +28,7 @@ python-dateutil = { version = "^2.8.2", optional = true }
[tool.poetry.extras]
ionizer = ["tiqi-rpc"]
influxdbv2 = ["influxdb-client"]
influxdbv1 = ["influxdb"]
postgresql = ["sqlmodel", "psycopg2-binary", "python-dateutil"]
[tool.poetry.group.dev]