diff --git a/README.md b/README.md index da02d93..2dcf7d6 100644 --- a/README.md +++ b/README.md @@ -24,6 +24,10 @@ To utilize specific functionalities such as `IonizerServer`, `InfluxDBv1Session` ```bash poetry add "git+https://github.com/tiqi-group/pydase_service_base.git#main[influxdbv2]" ``` +- For InfluxDBv3Session, include the influxdbv3 extra: + ```bash + poetry add "git+https://github.com/tiqi-group/pydase_service_base.git#main[influxdbv3]" + ``` - For `PostgresDatabaseSession`, include the `postgresql` extra: ```bash poetry add "git+https://github.com/tiqi-group/pydase_service_base.git#main[postgresql]" @@ -74,6 +78,14 @@ headers: Host: other-virtual-host.ethz.ch ``` +```yaml +url: https://database-url.ch +org: your-org +bucket: your-bucket +token: +verify_ssl: True +``` + `postgres_development.yaml` / `postgres_production.yaml`: ```yaml database: ... @@ -112,6 +124,41 @@ with InfluxDBv1Session() as influx_client: **Note** that you have to set `ssl` and `verify_ssl` to `False` when you are using a local influxdb instance. +### InfluxDBv3Session + +Interact with an InfluxDB 3.x server using the `InfluxDBv3Session` class. **This requires the `influxdbv3` optional dependency**. + +```python +from pydase_service_base.database.influxdbv3_session import InfluxDBv3Session, WritePrecision +from influxdb_client_3 import Point +import time + +# Option 1: Initialize from config file (recommended) +with InfluxDBv3Session.from_config_file() as session: + point = Point("temperature").tag("location", "office").field("value", 23.5).time(int(time.time() * 1e9), WritePrecision.NS) + session.write(bucket="test-bucket", record=point) + # Query data (returns a pyarrow.Table) + table = session._client.query('SELECT * FROM "test-bucket" WHERE location = \'office\'', language="sql") + print(table.to_pandas()) + +# Option 2: Initialize directly +with InfluxDBv3Session( + host="http://localhost:8181", + org="test-org", + bucket="test-bucket", + token="", + verify_ssl=False, +) as session: + # ... same as above ... + pass +``` + +**Note:** +- You must create the bucket (`test-bucket`) before writing data. +- The token must have sufficient privileges to write and query data. + +--- + ### InfluxDBSession Interact with an InfluxDB server using the `InfluxDBSession` class. **Note that this class only supports InfluxDB v2** and **requires the `influxdbv2` optional dependency**. diff --git a/pydase_service_base/database/__init__.py b/pydase_service_base/database/__init__.py index 8a3b775..e9bbf61 100644 --- a/pydase_service_base/database/__init__.py +++ b/pydase_service_base/database/__init__.py @@ -3,6 +3,7 @@ from typing import TYPE_CHECKING if TYPE_CHECKING: from .influxdb_session import InfluxDBSession # type: ignore from .influxdbv1_session import InfluxDBv1Session # type: ignore + from .influxdbv3_session import InfluxDBv3Session # type: ignore from .postgres_session import PostgresDatabaseSession # type: ignore else: @@ -35,6 +36,19 @@ else: "Please refer to https://gitlab.phys.ethz.ch/tiqi-projects/qchub/icon-services/pydase_service_base." ) + try: + import influxdb_client_3 # type: ignore # noqa + + from .influxdbv3_session import InfluxDBv3Session # type: ignore + except ImportError: + + class InfluxDBv3Session: # type: ignore + def __init__(self) -> None: + raise OptionalDependencyError( + "InfluxDBv3Session requires the 'influxdbv3' extra. " + "Please refer to https://gitlab.phys.ethz.ch/tiqi-projects/qchub/icon-services/pydase_service_base." + ) + try: import sqlmodel # noqa @@ -49,4 +63,9 @@ else: ) -__all__ = ["InfluxDBSession", "InfluxDBv1Session", "PostgresDatabaseSession"] +__all__ = [ + "InfluxDBSession", + "InfluxDBv1Session", + "InfluxDBv3Session", + "PostgresDatabaseSession", +] diff --git a/pydase_service_base/database/config.py b/pydase_service_base/database/config.py index f4c4b25..c19374c 100644 --- a/pydase_service_base/database/config.py +++ b/pydase_service_base/database/config.py @@ -42,3 +42,11 @@ class InfluxDBv1Config(BaseConfig): # type: ignore ssl: bool = True verify_ssl: bool = True headers: dict[str, str] = {} # noqa: RUF012 + + +class InfluxDBv3Config(BaseConfig): # type: ignore + url: str + org: str + token: SecretStr + verify_ssl: bool = True + bucket: str diff --git a/pydase_service_base/database/influxdbv3_session.py b/pydase_service_base/database/influxdbv3_session.py new file mode 100644 index 0000000..5a6c299 --- /dev/null +++ b/pydase_service_base/database/influxdbv3_session.py @@ -0,0 +1,122 @@ +"""Module for InfluxDB v3 session management.""" + +from __future__ import annotations +from influxdb_client_3 import InfluxDBClient3, Point, WritePrecision as WritePrecision +from typing import Any, NamedTuple, Iterable +from types import TracebackType +import logging +from confz import FileSource + +from pydase_service_base.database.config import InfluxDBv3Config +from pydase_service_base.database.config import ServiceConfig +from os import PathLike + +logger = logging.getLogger(__name__) + +__all__ = [ + "InfluxDBv3Session", + "WritePrecision", +] + +class InfluxDBv3Session: + """Context manager for InfluxDB v3 session.""" + + def __init__( + self, host: str, org: str, bucket: str, token: str, verify_ssl: bool = True + ): + """Initialize InfluxDB v3 session. + + Args: + host (str): The InfluxDB host URL. + org (str): The organization name. + bucket (str): The bucket name. + token (str): The authentication token. + verify_ssl (bool): Whether to verify SSL certificates. Defaults to True. + Recommended to set it to True in production environments. + """ + self._bucket = bucket + self._org = org + self._host = host + self._token = token + self._verify_ssl = verify_ssl + self._client = InfluxDBClient3( + self._host, + self._org, + self._bucket, + self._token, + verify_ssl=self._verify_ssl, + ) + + def __enter__(self) -> InfluxDBv3Session: + return self + + def __exit__( + self, + exc_type: type[BaseException] | None, + exc_value: BaseException | None, + exc_traceback: TracebackType | None, + ) -> None: + self._client.close() + + def write( + self, + bucket: str, + record: str + | Iterable[str] + | Point + | Iterable[Point] + | dict[str, Any] + | Iterable[dict[str, Any]] + | bytes + | Iterable[bytes] + | NamedTuple + | Iterable[NamedTuple], + write_precision: WritePrecision | None = None, + ) -> None: + """Write records to InfluxDB v3. + + Args: + bucket (str): The target bucket. + record (various types): The record(s) to write. + write_precision (WritePrecision | None): Precision of the timestamps. + If None, defaults to the NS (nanoseconds) precision. + Only matters if the record contains timestamps in POSIX format. + """ + logger.debug("Writing to InfluxDB v3 bucket %s: %s", bucket, record) + self._client.write(record) + + @classmethod + def from_config_file( + cls, path: str | PathLike | None = None + ) -> "InfluxDBv3Session": + """Create InfluxDBv3Session from configuration file. + + Args: + path (str | PathLike | None): Path to the configuration file. + If None, defaults to 'influxdbv3_config.yaml' in the database config directory. + + The database config directory can be set using environment variables or defaults. + + The default path is 'database_config/influxdbv3_config.yaml'. + + To change the default path, set the environment variable: + + `export SERVICE_DATABASE_CONFIG_DIR=/tmp/myconfigs` + Returns: + InfluxDBv3Session: An instance of InfluxDBv3Session. + """ + if path is not None: + _config = InfluxDBv3Config(config_sources=FileSource(path)) + else: + _config = InfluxDBv3Config( + config_sources=FileSource( + ServiceConfig().database_config_dir / "influxdbv3_config.yaml" + ) + ) + return cls( + host=_config.url, + org=_config.org, + bucket=_config.bucket, + token=_config.token.get_secret_value(), + verify_ssl=_config.verify_ssl, + ) diff --git a/pyproject.toml b/pyproject.toml index daadd0a..93700d5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -20,12 +20,14 @@ pydase = ">=0.10.0" tiqi-rpc = { git = "ssh://git@gitlab.phys.ethz.ch/tiqi-projects/tiqi-rpc-python.git", optional = true } influxdb-client = { version = "^1.36.1", optional = true} influxdb = { version = "^5.3.2", optional = true } +influxdb3-python = { version = "^0.15.0", optional = true } sqlmodel = { version = "^0.0.14", optional = true } psycopg2-binary = { version = "^2.9.6", optional = true } python-dateutil = { version = "^2.8.2", optional = true } [tool.poetry.extras] ionizer = ["tiqi-rpc"] +influxdbv3 = ["influxdb3-python"] influxdbv2 = ["influxdb-client"] influxdbv1 = ["influxdb"] postgresql = ["sqlmodel", "psycopg2-binary", "python-dateutil"] @@ -37,6 +39,9 @@ optional = true mypy = "^1.4.1" pyright = "^1.1.323" ruff = "^0.7.1" +pytest = "^8.0.0" +testcontainers = "^4.4.0" +requests = "^2.31.0" [tool.ruff] target-version = "py310" # Always generate Python 3.10-compatible code diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/database/test_create_config.py b/tests/database/test_create_config.py new file mode 100644 index 0000000..d05a15d --- /dev/null +++ b/tests/database/test_create_config.py @@ -0,0 +1,49 @@ +import pytest +from confz import BaseConfig, FileSource + +from pydase_service_base.database.create_config import ( + NoConfigSourceError, + create_config, +) + + +class DummyConfig(BaseConfig): + foo: str + CONFIG_SOURCES = None + + +def test_create_config_with_config_folder_and_file(tmp_path): + config_folder = tmp_path / "my_config" + config_folder.mkdir() + config_file = config_folder / "dummy.yaml" + config_file.write_text("foo: bar\n") + + # Should load config from the specified folder and file + config = create_config( + DummyConfig, config_folder=config_folder, config_file="dummy.yaml" + ) + assert config.foo == "bar" + + +def test_create_config_with_config_class_sources(tmp_path): + # DummyConfigWithSource has CONFIG_SOURCES set + class DummyConfigWithSource(BaseConfig): + foo: str + CONFIG_SOURCES = FileSource(tmp_path / "dummy.yaml") + + config_file = tmp_path / "dummy.yaml" + config_file.write_text("foo: qux\n") + + config = create_config(DummyConfigWithSource, config_file="dummy.yaml") + assert config.foo == "qux" + + +def test_create_config_raises_when_no_config_source(tmp_path): + # DummyConfigNoSource has CONFIG_SOURCES = None and no config_folder provided + class DummyConfigNoSource(BaseConfig): + foo: str + CONFIG_SOURCES = None + + with pytest.raises(NoConfigSourceError) as excinfo: + create_config(DummyConfigNoSource, config_file="dummy.yaml") + assert "No 'database_config' folder found" in str(excinfo.value) diff --git a/tests/database/test_influxdbv3_session.py b/tests/database/test_influxdbv3_session.py new file mode 100644 index 0000000..e124c7d --- /dev/null +++ b/tests/database/test_influxdbv3_session.py @@ -0,0 +1,149 @@ +import re +import time + +import pytest +from influxdb_client_3 import Point +from testcontainers.core.container import DockerContainer + +from pydase_service_base.database.influxdbv3_session import InfluxDBv3Session + +INFLUXDB_IMAGE = "influxdb:3.3-core" +INFLUXDB_PORT = 8181 +INFLUXDB_ORG = "test-org" +INFLUXDB_BUCKET = "test-bucket" +INFLUXDB_TOKEN = "test-token" +INFLUXDB_DATABASE = "test-database" + + +def parse_influxdb3_token(output: str) -> str: + """ + Extract the token from the output of 'influxdb3 create token --admin', + handling ANSI escape codes. + Raises ValueError if not found. + """ + ansi_escape = re.compile(r"\x1B\[[0-?]*[ -/]*[@-~]") + clean_output = ansi_escape.sub("", output) + + for line in clean_output.splitlines(): + line_strip = line.strip() + if line_strip.startswith("Token:"): + return line_strip.split("Token:", 1)[1].strip() + raise ValueError("Token not found in output") + + +@pytest.fixture +def influxdbv3_config_yaml(tmp_path, monkeypatch): + """ + Create a temporary influxdbv3_config.yaml and patch ServiceConfig to point to it. + """ + config_dir = tmp_path / "database_config" + config_dir.mkdir() + config_file = config_dir / "influxdbv3_config.yaml" + config_file.write_text( + """ +url: http://localhost:9999 +org: test-org +bucket: test-bucket +token: test-token +verify_ssl: false +""" + ) + monkeypatch.chdir(tmp_path) + return config_file + + +@pytest.fixture(scope="session") +def influxdb_container(): + """Spin up an InfluxDB 3.x container for integration testing.""" + with ( + DockerContainer(INFLUXDB_IMAGE) + .with_exposed_ports(INFLUXDB_PORT) + .with_bind_ports(INFLUXDB_PORT, 8181) + .with_command( + "influxdb3 serve " + "--node-id host01 " + "--object-store memory " + "--data-dir ~/.influxdb3" + ) + ) as container: + raw_token = ( + container.exec(["influxdb3", "create", "token", "--admin"]) + .output.decode() + .strip() + ) + admin_token = parse_influxdb3_token(raw_token) + host = container.get_container_host_ip() + port = INFLUXDB_PORT + url = f"http://{host}:{port}" + + headers = {"Authorization": f"Bearer {admin_token}"} + + for _ in range(10): + try: + import requests + + print(f"Checking InfluxDB health at {url}/health") + resp = requests.get(f"{url}/health", headers=headers, timeout=2) + if resp.status_code == 200: + break + except Exception: + pass + time.sleep(1) + else: + raise RuntimeError("InfluxDB did not become healthy in time") + create_bucket_cmd = [ + "influxdb3", + "create", + "database", + "--token", + admin_token, + INFLUXDB_BUCKET, + ] + container.exec(create_bucket_cmd) + yield { + "url": url, + "org": INFLUXDB_ORG, + "bucket": INFLUXDB_BUCKET, + "token": admin_token, + } + + +@pytest.mark.skip(reason="Requires Docker run in background, enable when needed") +def test_influxdbv3session_write(influxdb_container): + """Test writing and querying a point using InfluxDBv3Session.""" + url = influxdb_container["url"] + org = influxdb_container["org"] + bucket = influxdb_container["bucket"] + token = influxdb_container["token"] + + with InfluxDBv3Session( + host=url, + org=org, + bucket=bucket, + token=token, + verify_ssl=False, + ) as session: + point = Point("temperature").tag("location", "office").field("value", 23.5) + session.write(bucket=bucket, record=point) + + time.sleep(1) + + result = session._client.query( + "SELECT * FROM temperature", + database=bucket, + ) + assert result is not None + assert result["value"][0].as_py() == 23.5 # type: ignore + + +def test_from_config_file_initialization(influxdbv3_config_yaml): + """ + Test that InfluxDBv3Session.from_config_file initializes correctly from config file. + """ + session = InfluxDBv3Session.from_config_file() + assert session._client is not None + # Check that the client has the expected attributes + assert session._host == "http://localhost:9999" + assert session._org == "test-org" + assert session._bucket == "test-bucket" + assert session._token == "test-token"