From 4783f7323c85c91a92301fec0ac90e983cfb42c1 Mon Sep 17 00:00:00 2001 From: Chi-Huan Nguyen Date: Sun, 24 Aug 2025 16:27:09 +0200 Subject: [PATCH] Add influxdbv3 and tests --- pydase_service_base/database/config.py | 7 + .../database/influxdbv3_session.py | 112 +++++++++++++ tests/__init__.py | 0 tests/test_create_config.py | 50 ++++++ tests/test_influxdbv3_session.py | 151 ++++++++++++++++++ 5 files changed, 320 insertions(+) create mode 100644 pydase_service_base/database/influxdbv3_session.py create mode 100644 tests/__init__.py create mode 100644 tests/test_create_config.py create mode 100644 tests/test_influxdbv3_session.py diff --git a/pydase_service_base/database/config.py b/pydase_service_base/database/config.py index f4c4b25..072d882 100644 --- a/pydase_service_base/database/config.py +++ b/pydase_service_base/database/config.py @@ -42,3 +42,10 @@ class InfluxDBv1Config(BaseConfig): # type: ignore ssl: bool = True verify_ssl: bool = True headers: dict[str, str] = {} # noqa: RUF012 + +class InfluxDBv3Config(BaseConfig): # type: ignore + url: str + org: str + token: SecretStr + verify_ssl: bool = True + bucket: str diff --git a/pydase_service_base/database/influxdbv3_session.py b/pydase_service_base/database/influxdbv3_session.py new file mode 100644 index 0000000..9aded39 --- /dev/null +++ b/pydase_service_base/database/influxdbv3_session.py @@ -0,0 +1,112 @@ +"""Module for InfluxDB v3 session management.""" +from __future__ import annotations +from influxdb_client_3 import InfluxDBClient3, Point, WritePrecision as _WritePrecision +from typing import Any, NamedTuple, Iterable +from types import TracebackType +import logging +from confz import FileSource + +from pydase_service_base.database.config import InfluxDBv3Config +from pydase_service_base.database.config import ServiceConfig +from os import PathLike + +logger = logging.getLogger(__name__) + +__all__ = [ + "InfluxDBv3Session", + "WritePrecision", +] + +WritePrecision = _WritePrecision + +class InfluxDBv3Session: + """Context manager for InfluxDB v3 session.""" + def __init__(self, host: str, org: str, bucket: str, token: str, verify_ssl: bool = True): + """Initialize InfluxDB v3 session. + + Args: + host (str): The InfluxDB host URL. + org (str): The organization name. + bucket (str): The bucket name. + token (str): The authentication token. + verify_ssl (bool): Whether to verify SSL certificates. Defaults to True. + Recommended to set it to True in production environments. + """ + self._bucket = bucket + self._org = org + self._host = host + self._token = token + self._verify_ssl = verify_ssl + self._client = InfluxDBClient3(self._host, self._org, self._bucket, self._token, verify_ssl=self._verify_ssl) + + def __enter__(self) -> InfluxDBv3Session: + return self + + def __exit__( + self, + exc_type: type[BaseException] | None, + exc_value: BaseException | None, + exc_traceback: TracebackType | None, + ) -> None: + self._client.close() + + + def write(self, bucket: str, + record:str + | Iterable[str] + | Point + | Iterable[Point] + | dict[str, Any] + | Iterable[dict[str, Any]] + | bytes + | Iterable[bytes] + | NamedTuple + | Iterable[NamedTuple], + write_precision: WritePrecision | None = None)->None: + """Write records to InfluxDB v3. + + Args: + bucket (str): The target bucket. + record (various types): The record(s) to write. + write_precision (WritePrecision | None): Precision of the timestamps. + If None, defaults to the NS (nanoseconds) precision. + Only matters if the record contains timestamps in POSIX format. + """ + logger.debug("Writing to InfluxDB v3 bucket %s: %s", bucket, record) + self._client.write(record) + + @classmethod + def from_config_file(cls, path: str | PathLike | None = None)->"InfluxDBv3Session": + """Create InfluxDBv3Session from configuration file. + + Args: + path (str | PathLike | None): Path to the configuration file. + If None, defaults to 'influxdbv3_config.yaml' in the database config directory. + + The database config directory can be set using environment variables or defaults. + + The default path is 'database_config/influxdbv3_config.yaml'. + + To change the default path, set the environment variable: + + `export SERVICE_DATABASE_CONFIG_DIR=/tmp/myconfigs` + Returns: + InfluxDBv3Session: An instance of InfluxDBv3Session. + """ + if path is not None: + _config = InfluxDBv3Config( + config_sources=FileSource(path) + ) + else: + _config = InfluxDBv3Config( + config_sources=FileSource( + ServiceConfig().database_config_dir / "influxdbv3_config.yaml" + ) + ) + return cls( + host=_config.url, + org=_config.org, + bucket=_config.bucket, + token=_config.token.get_secret_value(), + verify_ssl=_config.verify_ssl + ) diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_create_config.py b/tests/test_create_config.py new file mode 100644 index 0000000..c281a71 --- /dev/null +++ b/tests/test_create_config.py @@ -0,0 +1,50 @@ + +import pytest +from confz import BaseConfig, FileSource + +from pydase_service_base.database.create_config import ( + NoConfigSourceError, + create_config, +) + + +class DummyConfig(BaseConfig): + foo: str + CONFIG_SOURCES = None + + +def test_create_config_with_config_folder_and_file(tmp_path): + config_folder = tmp_path / "my_config" + config_folder.mkdir() + config_file = config_folder / "dummy.yaml" + config_file.write_text("foo: bar\n") + + # Should load config from the specified folder and file + config = create_config( + DummyConfig, config_folder=config_folder, config_file="dummy.yaml" + ) + assert config.foo == "bar" + + +def test_create_config_with_config_class_sources(tmp_path): + # DummyConfigWithSource has CONFIG_SOURCES set + class DummyConfigWithSource(BaseConfig): + foo: str + CONFIG_SOURCES = FileSource(tmp_path / "dummy.yaml") + + config_file = tmp_path / "dummy.yaml" + config_file.write_text("foo: qux\n") + + config = create_config(DummyConfigWithSource, config_file="dummy.yaml") + assert config.foo == "qux" + + +def test_create_config_raises_when_no_config_source(tmp_path): + # DummyConfigNoSource has CONFIG_SOURCES = None and no config_folder provided + class DummyConfigNoSource(BaseConfig): + foo: str + CONFIG_SOURCES = None + + with pytest.raises(NoConfigSourceError) as excinfo: + create_config(DummyConfigNoSource, config_file="dummy.yaml") + assert "No 'database_config' folder found" in str(excinfo.value) diff --git a/tests/test_influxdbv3_session.py b/tests/test_influxdbv3_session.py new file mode 100644 index 0000000..e9aff7d --- /dev/null +++ b/tests/test_influxdbv3_session.py @@ -0,0 +1,151 @@ +import re +import time + +import pytest +from influxdb_client_3 import Point +from testcontainers.core.container import DockerContainer + +from pydase_service_base.database.influxdbv3_session import InfluxDBv3Session + +INFLUXDB_IMAGE = "influxdb:3.3-core" +INFLUXDB_PORT = 8181 +INFLUXDB_ORG = "test-org" +INFLUXDB_BUCKET = "test-bucket" +INFLUXDB_TOKEN = "test-token" +INFLUXDB_DATABASE = "test-database" + + +def parse_influxdb3_token(output: str) -> str: + """ + Extract the token from the output of 'influxdb3 create token --admin', + handling ANSI escape codes. + Raises ValueError if not found. + """ + ansi_escape = re.compile(r"\x1B\[[0-?]*[ -/]*[@-~]") + clean_output = ansi_escape.sub("", output) + + for line in clean_output.splitlines(): + line_strip = line.strip() + if line_strip.startswith("Token:"): + return line_strip.split("Token:", 1)[1].strip() + raise ValueError("Token not found in output") + + +@pytest.fixture +def influxdbv3_config_yaml(tmp_path, monkeypatch): + """ + Create a temporary influxdbv3_config.yaml and patch ServiceConfig to point to it. + """ + config_dir = tmp_path / "database_config" + config_dir.mkdir() + config_file = config_dir / "influxdbv3_config.yaml" + config_file.write_text( + """ +url: http://localhost:9999 +org: test-org +bucket: test-bucket +token: test-token +verify_ssl: false +""" + ) + monkeypatch.chdir(tmp_path) + return config_file + + +@pytest.fixture(scope="session") +def influxdb_container(): + """Spin up an InfluxDB 3.x container for integration testing.""" + with ( + DockerContainer(INFLUXDB_IMAGE) + .with_exposed_ports(INFLUXDB_PORT) + .with_bind_ports(INFLUXDB_PORT, 8181) + .with_command( + "influxdb3 serve " + "--node-id host01 " + "--object-store memory " + "--data-dir ~/.influxdb3" + ) + ) as container: + raw_token = ( + container.exec(["influxdb3", "create", "token", "--admin"]) + .output.decode() + .strip() + ) + admin_token = parse_influxdb3_token(raw_token) + host = container.get_container_host_ip() + port = INFLUXDB_PORT + url = f"http://{host}:{port}" + + headers = {"Authorization": f"Bearer {admin_token}"} + + for _ in range(10): + try: + import requests + + print(f"Checking InfluxDB health at {url}/health") + resp = requests.get(f"{url}/health", headers=headers, timeout=2) + if resp.status_code == 200: + break + except Exception: + pass + time.sleep(1) + else: + raise RuntimeError("InfluxDB did not become healthy in time") + create_bucket_cmd = [ + "influxdb3", + "create", + "database", + "--token", + admin_token, + INFLUXDB_BUCKET, + ] + container.exec(create_bucket_cmd) + yield { + "url": url, + "org": INFLUXDB_ORG, + "bucket": INFLUXDB_BUCKET, + "token": admin_token, + } + + +@pytest.mark.skip(reason="Requires Docker run in background, enable when needed") +def test_influxdbv3session_write_and_query(influxdb_container): + """Test writing and querying a point using InfluxDBv3Session.""" + url = influxdb_container["url"] + org = influxdb_container["org"] + bucket = influxdb_container["bucket"] + token = influxdb_container["token"] + + with InfluxDBv3Session( + host=url, + org=org, + bucket=bucket, + token=token, + verify_ssl=False, + ) as session: + point = Point("temperature").tag("location", "office").field("value", 23.5) + session.write(bucket=bucket, record=point) + + time.sleep(1) + + result = session._client.query( + "SELECT * FROM temperature", + database=bucket, + ) + assert result is not None + assert result["value"][0].as_py() == 23.5 # type: ignore + + + + +def test_from_config_file_initialization(influxdbv3_config_yaml): + """ + Test that InfluxDBv3Session.from_config_file initializes correctly from config file. + """ + session = InfluxDBv3Session.from_config_file() + assert session._client is not None + # Check that the client has the expected attributes + assert session._host == "http://localhost:9999" + assert session._org == "test-org" + assert session._bucket == "test-bucket" + assert session._token == "test-token"