feat: adding write method, making client and api protected

This commit is contained in:
Mose Mueller 2023-08-02 11:08:51 +02:00
parent 711fdabbab
commit b0df2f111b

View File

@ -1,19 +1,25 @@
from __future__ import annotations from __future__ import annotations
from collections.abc import Iterable
from dataclasses import dataclass
from pathlib import Path from pathlib import Path
from types import TracebackType from types import TracebackType
from typing import Optional from typing import Any, NamedTuple, Optional, Union # noqa: UNT001
from influxdb_client import ( from influxdb_client import (
Bucket, Bucket,
BucketRetentionRules, BucketRetentionRules,
BucketsApi, BucketsApi,
InfluxDBClient, InfluxDBClient,
Point,
WriteApi, WriteApi,
WritePrecision,
) )
from influxdb_client.client.write.point import DEFAULT_WRITE_PRECISION
from influxdb_client.client.write_api import SYNCHRONOUS from influxdb_client.client.write_api import SYNCHRONOUS
from influxdb_client.rest import ApiException from influxdb_client.rest import ApiException
from loguru import logger from loguru import logger
from reactivex import Observable
from icon_service_base.database.config import InfluxDBConfig from icon_service_base.database.config import InfluxDBConfig
from icon_service_base.database.create_config import create_config from icon_service_base.database.create_config import create_config
@ -44,7 +50,7 @@ class InfluxDBSession:
}, },
"time": "2023-06-05T00:00:00Z", # Replace with your timestamp "time": "2023-06-05T00:00:00Z", # Replace with your timestamp
} }
influx_client.write_api.write( influx_client.write(
bucket='my_new_bucket', record=record bucket='my_new_bucket', record=record
) )
``` ```
@ -62,13 +68,13 @@ class InfluxDBSession:
self.url = self._config.url self.url = self._config.url
self.token = self._config.token.get_secret_value() self.token = self._config.token.get_secret_value()
self.org = self._config.org self.org = self._config.org
self.client: InfluxDBClient self._client: InfluxDBClient
self.write_api: WriteApi self._write_api: WriteApi
self.buckets_api: BucketsApi | None = None self._buckets_api: BucketsApi
def __enter__(self) -> InfluxDBSession: def __enter__(self) -> InfluxDBSession:
self.client = InfluxDBClient(url=self.url, token=self.token, org=self.org) self._client = InfluxDBClient(url=self.url, token=self.token, org=self.org)
self.write_api = self.client.write_api(write_options=SYNCHRONOUS) # type: ignore self._write_api = self._client.write_api(write_options=SYNCHRONOUS) # type: ignore
return self return self
def __exit__( def __exit__(
@ -77,8 +83,38 @@ class InfluxDBSession:
exc_value: BaseException | None, exc_value: BaseException | None,
exc_traceback: TracebackType | None, exc_traceback: TracebackType | None,
) -> None: ) -> None:
self.write_api.close() self._write_api.close()
self.client.__del__() self._client.__del__()
def write(
self,
bucket: str,
org: Optional[str] = None,
record: Union[ # type: ignore
str,
Iterable["str"],
Point,
Iterable["Point"],
dict,
Iterable["dict"],
bytes,
Iterable["bytes"],
Observable,
NamedTuple,
Iterable["NamedTuple"],
"dataclass", # type: ignore
Iterable["dataclass"], # type: ignore
] = None,
write_precision: WritePrecision = DEFAULT_WRITE_PRECISION, # type: ignore
**kwargs: Any,
) -> Any:
self._write_api.write( # type: ignore
bucket=bucket,
org=org,
record=record,
write_precision=write_precision, # type: ignore
**kwargs,
)
def create_bucket( # noqa: CFQ002 def create_bucket( # noqa: CFQ002
self, self,
@ -107,9 +143,9 @@ class InfluxDBSession:
`InfluxDBClient.org` is used. `InfluxDBClient.org` is used.
""" """
self.buckets_api = self.client.buckets_api() self._buckets_api = self._client.buckets_api()
try: try:
self.buckets_api.create_bucket( self._buckets_api.create_bucket(
bucket=bucket, bucket=bucket,
bucket_name=bucket_name, bucket_name=bucket_name,
org_id=org_id, org_id=org_id,