mirror of
https://github.com/tiqi-group/pydase_service_base.git
synced 2026-02-18 17:18:45 +01:00
Move test files
This commit is contained in:
49
tests/database/test_create_config.py
Normal file
49
tests/database/test_create_config.py
Normal file
@@ -0,0 +1,49 @@
|
||||
import pytest
|
||||
from confz import BaseConfig, FileSource
|
||||
|
||||
from pydase_service_base.database.create_config import (
|
||||
NoConfigSourceError,
|
||||
create_config,
|
||||
)
|
||||
|
||||
|
||||
class DummyConfig(BaseConfig):
|
||||
foo: str
|
||||
CONFIG_SOURCES = None
|
||||
|
||||
|
||||
def test_create_config_with_config_folder_and_file(tmp_path):
|
||||
config_folder = tmp_path / "my_config"
|
||||
config_folder.mkdir()
|
||||
config_file = config_folder / "dummy.yaml"
|
||||
config_file.write_text("foo: bar\n")
|
||||
|
||||
# Should load config from the specified folder and file
|
||||
config = create_config(
|
||||
DummyConfig, config_folder=config_folder, config_file="dummy.yaml"
|
||||
)
|
||||
assert config.foo == "bar"
|
||||
|
||||
|
||||
def test_create_config_with_config_class_sources(tmp_path):
|
||||
# DummyConfigWithSource has CONFIG_SOURCES set
|
||||
class DummyConfigWithSource(BaseConfig):
|
||||
foo: str
|
||||
CONFIG_SOURCES = FileSource(tmp_path / "dummy.yaml")
|
||||
|
||||
config_file = tmp_path / "dummy.yaml"
|
||||
config_file.write_text("foo: qux\n")
|
||||
|
||||
config = create_config(DummyConfigWithSource, config_file="dummy.yaml")
|
||||
assert config.foo == "qux"
|
||||
|
||||
|
||||
def test_create_config_raises_when_no_config_source(tmp_path):
|
||||
# DummyConfigNoSource has CONFIG_SOURCES = None and no config_folder provided
|
||||
class DummyConfigNoSource(BaseConfig):
|
||||
foo: str
|
||||
CONFIG_SOURCES = None
|
||||
|
||||
with pytest.raises(NoConfigSourceError) as excinfo:
|
||||
create_config(DummyConfigNoSource, config_file="dummy.yaml")
|
||||
assert "No 'database_config' folder found" in str(excinfo.value)
|
||||
149
tests/database/test_influxdbv3_session.py
Normal file
149
tests/database/test_influxdbv3_session.py
Normal file
@@ -0,0 +1,149 @@
|
||||
import re
|
||||
import time
|
||||
|
||||
import pytest
|
||||
from influxdb_client_3 import Point
|
||||
from testcontainers.core.container import DockerContainer
|
||||
|
||||
from pydase_service_base.database.influxdbv3_session import InfluxDBv3Session
|
||||
|
||||
INFLUXDB_IMAGE = "influxdb:3.3-core"
|
||||
INFLUXDB_PORT = 8181
|
||||
INFLUXDB_ORG = "test-org"
|
||||
INFLUXDB_BUCKET = "test-bucket"
|
||||
INFLUXDB_TOKEN = "test-token"
|
||||
INFLUXDB_DATABASE = "test-database"
|
||||
|
||||
|
||||
def parse_influxdb3_token(output: str) -> str:
|
||||
"""
|
||||
Extract the token from the output of 'influxdb3 create token --admin',
|
||||
handling ANSI escape codes.
|
||||
Raises ValueError if not found.
|
||||
"""
|
||||
ansi_escape = re.compile(r"\x1B\[[0-?]*[ -/]*[@-~]")
|
||||
clean_output = ansi_escape.sub("", output)
|
||||
|
||||
for line in clean_output.splitlines():
|
||||
line_strip = line.strip()
|
||||
if line_strip.startswith("Token:"):
|
||||
return line_strip.split("Token:", 1)[1].strip()
|
||||
raise ValueError("Token not found in output")
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def influxdbv3_config_yaml(tmp_path, monkeypatch):
|
||||
"""
|
||||
Create a temporary influxdbv3_config.yaml and patch ServiceConfig to point to it.
|
||||
"""
|
||||
config_dir = tmp_path / "database_config"
|
||||
config_dir.mkdir()
|
||||
config_file = config_dir / "influxdbv3_config.yaml"
|
||||
config_file.write_text(
|
||||
"""
|
||||
url: http://localhost:9999
|
||||
org: test-org
|
||||
bucket: test-bucket
|
||||
token: test-token
|
||||
verify_ssl: false
|
||||
"""
|
||||
)
|
||||
monkeypatch.chdir(tmp_path)
|
||||
return config_file
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def influxdb_container():
|
||||
"""Spin up an InfluxDB 3.x container for integration testing."""
|
||||
with (
|
||||
DockerContainer(INFLUXDB_IMAGE)
|
||||
.with_exposed_ports(INFLUXDB_PORT)
|
||||
.with_bind_ports(INFLUXDB_PORT, 8181)
|
||||
.with_command(
|
||||
"influxdb3 serve "
|
||||
"--node-id host01 "
|
||||
"--object-store memory "
|
||||
"--data-dir ~/.influxdb3"
|
||||
)
|
||||
) as container:
|
||||
raw_token = (
|
||||
container.exec(["influxdb3", "create", "token", "--admin"])
|
||||
.output.decode()
|
||||
.strip()
|
||||
)
|
||||
admin_token = parse_influxdb3_token(raw_token)
|
||||
host = container.get_container_host_ip()
|
||||
port = INFLUXDB_PORT
|
||||
url = f"http://{host}:{port}"
|
||||
|
||||
headers = {"Authorization": f"Bearer {admin_token}"}
|
||||
|
||||
for _ in range(10):
|
||||
try:
|
||||
import requests
|
||||
|
||||
print(f"Checking InfluxDB health at {url}/health")
|
||||
resp = requests.get(f"{url}/health", headers=headers, timeout=2)
|
||||
if resp.status_code == 200:
|
||||
break
|
||||
except Exception:
|
||||
pass
|
||||
time.sleep(1)
|
||||
else:
|
||||
raise RuntimeError("InfluxDB did not become healthy in time")
|
||||
create_bucket_cmd = [
|
||||
"influxdb3",
|
||||
"create",
|
||||
"database",
|
||||
"--token",
|
||||
admin_token,
|
||||
INFLUXDB_BUCKET,
|
||||
]
|
||||
container.exec(create_bucket_cmd)
|
||||
yield {
|
||||
"url": url,
|
||||
"org": INFLUXDB_ORG,
|
||||
"bucket": INFLUXDB_BUCKET,
|
||||
"token": admin_token,
|
||||
}
|
||||
|
||||
|
||||
@pytest.mark.skip(reason="Requires Docker run in background, enable when needed")
|
||||
def test_influxdbv3session_write(influxdb_container):
|
||||
"""Test writing and querying a point using InfluxDBv3Session."""
|
||||
url = influxdb_container["url"]
|
||||
org = influxdb_container["org"]
|
||||
bucket = influxdb_container["bucket"]
|
||||
token = influxdb_container["token"]
|
||||
|
||||
with InfluxDBv3Session(
|
||||
host=url,
|
||||
org=org,
|
||||
bucket=bucket,
|
||||
token=token,
|
||||
verify_ssl=False,
|
||||
) as session:
|
||||
point = Point("temperature").tag("location", "office").field("value", 23.5)
|
||||
session.write(bucket=bucket, record=point)
|
||||
|
||||
time.sleep(1)
|
||||
|
||||
result = session._client.query(
|
||||
"SELECT * FROM temperature",
|
||||
database=bucket,
|
||||
)
|
||||
assert result is not None
|
||||
assert result["value"][0].as_py() == 23.5 # type: ignore
|
||||
|
||||
|
||||
def test_from_config_file_initialization(influxdbv3_config_yaml):
|
||||
"""
|
||||
Test that InfluxDBv3Session.from_config_file initializes correctly from config file.
|
||||
"""
|
||||
session = InfluxDBv3Session.from_config_file()
|
||||
assert session._client is not None
|
||||
# Check that the client has the expected attributes
|
||||
assert session._host == "http://localhost:9999"
|
||||
assert session._org == "test-org"
|
||||
assert session._bucket == "test-bucket"
|
||||
assert session._token == "test-token"
|
||||
Reference in New Issue
Block a user