From 80150ab50a4afd5abe97f4d582c0c595d5940b82 Mon Sep 17 00:00:00 2001 From: Mose Mueller Date: Tue, 5 Nov 2024 11:46:49 +0100 Subject: [PATCH] adds support for InfluxDB v1 clients --- README.md | 50 ++++- pydase_service_base/database/config.py | 10 + .../database/influxdbv1_session.py | 201 ++++++++++++++++++ pyproject.toml | 7 +- 4 files changed, 265 insertions(+), 3 deletions(-) create mode 100644 pydase_service_base/database/influxdbv1_session.py diff --git a/README.md b/README.md index cb21717..0f42556 100644 --- a/README.md +++ b/README.md @@ -10,12 +10,16 @@ Ensure you have Python 3.10 or later installed on your system. Dependencies of t poetry add git+https://github.com/tiqi-group/pydase_service_base.git ``` -To utilize specific functionalities such as `IonizerServer`, `InfluxDBSession`, or `PostgresDatabaseSession`, you need to install the relevant optional dependencies: +To utilize specific functionalities such as `IonizerServer`, `InfluxDBv1Session`,`InfluxDBSession`, or `PostgresDatabaseSession`, you need to install the relevant optional dependencies: - For `IonizerServer`, include the `ionizer` extra: ```bash poetry add "git+https://github.com/tiqi-group/pydase_service_base.git#main[ionizer]" ``` +- For `InfluxDBv1Session`, include the `influxdbv1` extra: + ```bash + poetry add "git+https://github.com/tiqi-group/pydase_service_base.git#main[influxdbv1]" + ``` - For `InfluxDBSession`, include the `influxdbv2` extra: ```bash poetry add "git+https://github.com/tiqi-group/pydase_service_base.git#main[influxdbv2]" @@ -39,6 +43,7 @@ Structure of the `database_config` folder: ``` database_config +├── influxdbv1_config.yaml ├── influxdb_config.yaml ├── postgres_development.yaml └── postgres_production.yaml @@ -46,6 +51,17 @@ database_config Example content for the configuration files: +`influxdbv1_config.yaml`: +```yaml +host: https://database-url.ch +port: 8086 +username: root +password: root +database: my_database +ssl: True +verify_ssl: True +``` + `influxdb_config.yaml`: ```yaml url: https://database-url.ch @@ -64,6 +80,38 @@ user: ... ## Usage +### InfluxDBv1Session + +Interact with an InfluxDBv1 server using the `InfluxDBv1Session` class. **Note that this class only supports InfluxDB v1** and **requires the `influxdbv1` optional dependency**. + +```python +from pydase_service_base.database import InfluxDBv1Session + +with InfluxDBv1Session() as influx_client: + # Creating a database + influx_client.create_database( + dbname='my_new_database' + ) + + # Writing data to a database + data = [ + { + "measurement": "your_measurement", # Replace with your measurement + "tags": { + "example_tag": "tag_value", # Replace with your tag and value + }, + "fields": { + "example_field": 123, # Replace with your field and its value + }, + "time": "2023-06-05T00:00:00Z", # Replace with your timestamp + } + ] + influx_client.write_points(data=[data, data], database="other_database") + + # just write one point into the client's current database + influx_client.write(data=data[0]) +``` + ### InfluxDBSession Interact with an InfluxDB server using the `InfluxDBSession` class. **Note that this class only supports InfluxDB v2** and **requires the `influxdbv2` optional dependency**. diff --git a/pydase_service_base/database/config.py b/pydase_service_base/database/config.py index beffa86..7809dc8 100644 --- a/pydase_service_base/database/config.py +++ b/pydase_service_base/database/config.py @@ -29,3 +29,13 @@ class InfluxDBConfig(BaseConfig): # type: ignore url: str org: str token: SecretStr + + +class InfluxDBv1Config(BaseConfig): # type: ignore + host: str + port: int + username: str + password: SecretStr + database: str + ssl: bool = True + verify_ssl: bool = True diff --git a/pydase_service_base/database/influxdbv1_session.py b/pydase_service_base/database/influxdbv1_session.py new file mode 100644 index 0000000..ab83d20 --- /dev/null +++ b/pydase_service_base/database/influxdbv1_session.py @@ -0,0 +1,201 @@ +from __future__ import annotations + +import logging +from typing import TYPE_CHECKING, Any, Literal + +try: + from typing import Self # type: ignore +except ImportError: + from typing_extensions import Self + + +import influxdb +from confz import FileSource + +from pydase_service_base.database.config import InfluxDBv1Config, ServiceConfig + +if TYPE_CHECKING: + from pathlib import Path + from types import TracebackType + + +logger = logging.getLogger(__name__) + + +class InfluxDBv1Session: + """ + The `InfluxDBv1Session` class serves as a context manager for a connection + to an InfluxDB server. This connection is established using credentials provided + through environment variables. + + Example: + ```python + with InfluxDBv1Session() as influx_client: + # Creating a database + influx_client.create_database( + dbname='my_new_database' + ) + + # Writing data to a database + data = [ + { + "measurement": "your_measurement", # Replace with your measurement + "tags": { + "example_tag": "tag_value", # Replace with your tag and value + }, + "fields": { + "example_field": 123, # Replace with your field and its value + }, + "time": "2023-06-05T00:00:00Z", # Replace with your timestamp + } + ] + influx_client.write_points(data=[data, data], database="other_database") + + # just write one point into the client's current database + influx_client.write(data=data[0]) + ``` + """ + + conf_folder: Path | str + + def __init__(self) -> None: + self._config = InfluxDBv1Config( + config_sources=FileSource( + ServiceConfig().database_config_dir / "influxdbv1_config.yaml" + ) + ) + + self._client: influxdb.InfluxDBClient + self._host = self._config.host + self._port = self._config.port + self._username = self._config.username + self._password = self._config.password + self._database = self._config.database + self._ssl = self._config.ssl + self._verify_ssl = self._config.verify_ssl + + def __enter__(self) -> Self: + self._client = influxdb.InfluxDBClient( + host=self._host, + port=self._port, + username=self._username, + password=self._password.get_secret_value(), + database=self._database, + ssl=self._ssl, + verify_ssl=self._verify_ssl, + ) + return self + + def __exit__( + self, + exc_type: type[BaseException] | None, + exc_value: BaseException | None, + exc_traceback: TracebackType | None, + ) -> None: + self._client.close() + + def write( + self, + data: dict[str, Any], + ) -> Any: + """Write data to InfluxDB. + + Args: + data: + The data to be written. + + Example: + ```python + >>> data = { + "measurement": "cpu_load_short", + "tags": { + "host": "server01", + "region": "us-west" + }, + "time": "2009-11-10T23:00:00Z", + "fields": { + "value": 0.64 + } + } + >>> with InfluxDBv1Session() as client: + client.write(data=data) + ``` + """ + self._client.write(data=data) + + def write_points( # noqa: PLR0913 + self, + points: list[dict[str, Any]], + time_precision: Literal["s", "m", "ms", "u"] | None = None, + database: str | None = None, + tags: dict[str, str] | None = None, + batch_size: int | None = None, + consistency: Literal["any", "one", "quorum", "all"] | None = None, + ) -> bool: + """Write to multiple time series names. + + Args: + points: + The list of points to be written in the database. + time_precision: + Either 's', 'm', 'ms' or 'u', defaults to None. + database: + The database to write the points to. Defaults to the client's current + database. + tags: + A set of key-value pairs associated with each point. Both keys and + values must be strings. These are shared tags and will be merged with + point-specific tags. Defaults to None. + batch_size: + Value to write the points in batches instead of all at one time. Useful + for when doing data dumps from one database to another or when doing a + massive write operation. Defaults to None + consistency: + Consistency for the points. One of {'any','one','quorum','all'}. + + Return: + True, if the operation is successful + + Example: + ```python + >>> data = { + "measurement": "cpu_load_short", + "tags": { + "host": "server01", + "region": "us-west" + }, + "time": "2009-11-10T23:00:00Z", + "fields": { + "value": 0.64 + } + } + >>> with InfluxDBv1Session() as client: + client.write(data=data) + ``` + """ + + return self._client.write_points( + points=points, + time_precision=time_precision, + database=database, + tags=tags, + batch_size=batch_size, + consistency=consistency, + ) + + def create_database( + self, + dbname: str, + ) -> None: + """Create a new database in the InfluxDB instance. This function wraps the + create_database from `influxdb` in a try-catch block and logs potential errors. + + Args: + dbname: + The name of the database to create. + """ + + try: + self._client.create_database(dbname=dbname) + except Exception as e: + logger.error(e) diff --git a/pyproject.toml b/pyproject.toml index 30c1030..834b9ec 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -14,18 +14,21 @@ build-backend = "poetry.core.masonry.api" [tool.poetry.dependencies] python = "^3.10" confz = "^2.0.0" -pydase = ">=0.8.2" +# pydase = ">=0.8.2" +pydase = { path = "../../pydase", develop = true } # optional dependencies tiqi-rpc = { git = "ssh://git@gitlab.phys.ethz.ch/tiqi-projects/tiqi-rpc-python.git", optional = true } influxdb-client = { version = "^1.36.1", optional = true} -sqlmodel = { version = "^0.0.14", optional = true} +influxdb = { version = "^5.3.2", optional = true } +sqlmodel = { version = "^0.0.14", optional = true } psycopg2-binary = { version = "^2.9.6", optional = true } python-dateutil = { version = "^2.8.2", optional = true } [tool.poetry.extras] ionizer = ["tiqi-rpc"] influxdbv2 = ["influxdb-client"] +influxdbv1 = ["influxdb"] postgresql = ["sqlmodel", "psycopg2-binary", "python-dateutil"] [tool.poetry.group.dev]