Add influxdbv3 and tests

This commit is contained in:
Chi-Huan Nguyen
2025-08-24 16:27:09 +02:00
parent ce96891f9d
commit 4783f7323c
5 changed files with 320 additions and 0 deletions

View File

@@ -42,3 +42,10 @@ class InfluxDBv1Config(BaseConfig): # type: ignore
ssl: bool = True
verify_ssl: bool = True
headers: dict[str, str] = {} # noqa: RUF012
class InfluxDBv3Config(BaseConfig): # type: ignore
url: str
org: str
token: SecretStr
verify_ssl: bool = True
bucket: str

View File

@@ -0,0 +1,112 @@
"""Module for InfluxDB v3 session management."""
from __future__ import annotations
from influxdb_client_3 import InfluxDBClient3, Point, WritePrecision as _WritePrecision
from typing import Any, NamedTuple, Iterable
from types import TracebackType
import logging
from confz import FileSource
from pydase_service_base.database.config import InfluxDBv3Config
from pydase_service_base.database.config import ServiceConfig
from os import PathLike
logger = logging.getLogger(__name__)
__all__ = [
"InfluxDBv3Session",
"WritePrecision",
]
WritePrecision = _WritePrecision
class InfluxDBv3Session:
"""Context manager for InfluxDB v3 session."""
def __init__(self, host: str, org: str, bucket: str, token: str, verify_ssl: bool = True):
"""Initialize InfluxDB v3 session.
Args:
host (str): The InfluxDB host URL.
org (str): The organization name.
bucket (str): The bucket name.
token (str): The authentication token.
verify_ssl (bool): Whether to verify SSL certificates. Defaults to True.
Recommended to set it to True in production environments.
"""
self._bucket = bucket
self._org = org
self._host = host
self._token = token
self._verify_ssl = verify_ssl
self._client = InfluxDBClient3(self._host, self._org, self._bucket, self._token, verify_ssl=self._verify_ssl)
def __enter__(self) -> InfluxDBv3Session:
return self
def __exit__(
self,
exc_type: type[BaseException] | None,
exc_value: BaseException | None,
exc_traceback: TracebackType | None,
) -> None:
self._client.close()
def write(self, bucket: str,
record:str
| Iterable[str]
| Point
| Iterable[Point]
| dict[str, Any]
| Iterable[dict[str, Any]]
| bytes
| Iterable[bytes]
| NamedTuple
| Iterable[NamedTuple],
write_precision: WritePrecision | None = None)->None:
"""Write records to InfluxDB v3.
Args:
bucket (str): The target bucket.
record (various types): The record(s) to write.
write_precision (WritePrecision | None): Precision of the timestamps.
If None, defaults to the NS (nanoseconds) precision.
Only matters if the record contains timestamps in POSIX format.
"""
logger.debug("Writing to InfluxDB v3 bucket %s: %s", bucket, record)
self._client.write(record)
@classmethod
def from_config_file(cls, path: str | PathLike | None = None)->"InfluxDBv3Session":
"""Create InfluxDBv3Session from configuration file.
Args:
path (str | PathLike | None): Path to the configuration file.
If None, defaults to 'influxdbv3_config.yaml' in the database config directory.
The database config directory can be set using environment variables or defaults.
The default path is 'database_config/influxdbv3_config.yaml'.
To change the default path, set the environment variable:
`export SERVICE_DATABASE_CONFIG_DIR=/tmp/myconfigs`
Returns:
InfluxDBv3Session: An instance of InfluxDBv3Session.
"""
if path is not None:
_config = InfluxDBv3Config(
config_sources=FileSource(path)
)
else:
_config = InfluxDBv3Config(
config_sources=FileSource(
ServiceConfig().database_config_dir / "influxdbv3_config.yaml"
)
)
return cls(
host=_config.url,
org=_config.org,
bucket=_config.bucket,
token=_config.token.get_secret_value(),
verify_ssl=_config.verify_ssl
)

0
tests/__init__.py Normal file
View File

View File

@@ -0,0 +1,50 @@
import pytest
from confz import BaseConfig, FileSource
from pydase_service_base.database.create_config import (
NoConfigSourceError,
create_config,
)
class DummyConfig(BaseConfig):
foo: str
CONFIG_SOURCES = None
def test_create_config_with_config_folder_and_file(tmp_path):
config_folder = tmp_path / "my_config"
config_folder.mkdir()
config_file = config_folder / "dummy.yaml"
config_file.write_text("foo: bar\n")
# Should load config from the specified folder and file
config = create_config(
DummyConfig, config_folder=config_folder, config_file="dummy.yaml"
)
assert config.foo == "bar"
def test_create_config_with_config_class_sources(tmp_path):
# DummyConfigWithSource has CONFIG_SOURCES set
class DummyConfigWithSource(BaseConfig):
foo: str
CONFIG_SOURCES = FileSource(tmp_path / "dummy.yaml")
config_file = tmp_path / "dummy.yaml"
config_file.write_text("foo: qux\n")
config = create_config(DummyConfigWithSource, config_file="dummy.yaml")
assert config.foo == "qux"
def test_create_config_raises_when_no_config_source(tmp_path):
# DummyConfigNoSource has CONFIG_SOURCES = None and no config_folder provided
class DummyConfigNoSource(BaseConfig):
foo: str
CONFIG_SOURCES = None
with pytest.raises(NoConfigSourceError) as excinfo:
create_config(DummyConfigNoSource, config_file="dummy.yaml")
assert "No 'database_config' folder found" in str(excinfo.value)

View File

@@ -0,0 +1,151 @@
import re
import time
import pytest
from influxdb_client_3 import Point
from testcontainers.core.container import DockerContainer
from pydase_service_base.database.influxdbv3_session import InfluxDBv3Session
INFLUXDB_IMAGE = "influxdb:3.3-core"
INFLUXDB_PORT = 8181
INFLUXDB_ORG = "test-org"
INFLUXDB_BUCKET = "test-bucket"
INFLUXDB_TOKEN = "test-token"
INFLUXDB_DATABASE = "test-database"
def parse_influxdb3_token(output: str) -> str:
"""
Extract the token from the output of 'influxdb3 create token --admin',
handling ANSI escape codes.
Raises ValueError if not found.
"""
ansi_escape = re.compile(r"\x1B\[[0-?]*[ -/]*[@-~]")
clean_output = ansi_escape.sub("", output)
for line in clean_output.splitlines():
line_strip = line.strip()
if line_strip.startswith("Token:"):
return line_strip.split("Token:", 1)[1].strip()
raise ValueError("Token not found in output")
@pytest.fixture
def influxdbv3_config_yaml(tmp_path, monkeypatch):
"""
Create a temporary influxdbv3_config.yaml and patch ServiceConfig to point to it.
"""
config_dir = tmp_path / "database_config"
config_dir.mkdir()
config_file = config_dir / "influxdbv3_config.yaml"
config_file.write_text(
"""
url: http://localhost:9999
org: test-org
bucket: test-bucket
token: test-token
verify_ssl: false
"""
)
monkeypatch.chdir(tmp_path)
return config_file
@pytest.fixture(scope="session")
def influxdb_container():
"""Spin up an InfluxDB 3.x container for integration testing."""
with (
DockerContainer(INFLUXDB_IMAGE)
.with_exposed_ports(INFLUXDB_PORT)
.with_bind_ports(INFLUXDB_PORT, 8181)
.with_command(
"influxdb3 serve "
"--node-id host01 "
"--object-store memory "
"--data-dir ~/.influxdb3"
)
) as container:
raw_token = (
container.exec(["influxdb3", "create", "token", "--admin"])
.output.decode()
.strip()
)
admin_token = parse_influxdb3_token(raw_token)
host = container.get_container_host_ip()
port = INFLUXDB_PORT
url = f"http://{host}:{port}"
headers = {"Authorization": f"Bearer {admin_token}"}
for _ in range(10):
try:
import requests
print(f"Checking InfluxDB health at {url}/health")
resp = requests.get(f"{url}/health", headers=headers, timeout=2)
if resp.status_code == 200:
break
except Exception:
pass
time.sleep(1)
else:
raise RuntimeError("InfluxDB did not become healthy in time")
create_bucket_cmd = [
"influxdb3",
"create",
"database",
"--token",
admin_token,
INFLUXDB_BUCKET,
]
container.exec(create_bucket_cmd)
yield {
"url": url,
"org": INFLUXDB_ORG,
"bucket": INFLUXDB_BUCKET,
"token": admin_token,
}
@pytest.mark.skip(reason="Requires Docker run in background, enable when needed")
def test_influxdbv3session_write_and_query(influxdb_container):
"""Test writing and querying a point using InfluxDBv3Session."""
url = influxdb_container["url"]
org = influxdb_container["org"]
bucket = influxdb_container["bucket"]
token = influxdb_container["token"]
with InfluxDBv3Session(
host=url,
org=org,
bucket=bucket,
token=token,
verify_ssl=False,
) as session:
point = Point("temperature").tag("location", "office").field("value", 23.5)
session.write(bucket=bucket, record=point)
time.sleep(1)
result = session._client.query(
"SELECT * FROM temperature",
database=bucket,
)
assert result is not None
assert result["value"][0].as_py() == 23.5 # type: ignore
def test_from_config_file_initialization(influxdbv3_config_yaml):
"""
Test that InfluxDBv3Session.from_config_file initializes correctly from config file.
"""
session = InfluxDBv3Session.from_config_file()
assert session._client is not None
# Check that the client has the expected attributes
assert session._host == "http://localhost:9999"
assert session._org == "test-org"
assert session._bucket == "test-bucket"
assert session._token == "test-token"