From b0df2f111be6615e1bf2c2c3c4665225e1b503fd Mon Sep 17 00:00:00 2001 From: Mose Mueller Date: Wed, 2 Aug 2023 11:08:51 +0200 Subject: [PATCH] feat: adding write method, making client and api protected --- .../database/influxdb_session.py | 58 +++++++++++++++---- 1 file changed, 47 insertions(+), 11 deletions(-) diff --git a/icon_service_base/database/influxdb_session.py b/icon_service_base/database/influxdb_session.py index 9aee1a1..906d12e 100644 --- a/icon_service_base/database/influxdb_session.py +++ b/icon_service_base/database/influxdb_session.py @@ -1,19 +1,25 @@ from __future__ import annotations +from collections.abc import Iterable +from dataclasses import dataclass from pathlib import Path from types import TracebackType -from typing import Optional +from typing import Any, NamedTuple, Optional, Union # noqa: UNT001 from influxdb_client import ( Bucket, BucketRetentionRules, BucketsApi, InfluxDBClient, + Point, WriteApi, + WritePrecision, ) +from influxdb_client.client.write.point import DEFAULT_WRITE_PRECISION from influxdb_client.client.write_api import SYNCHRONOUS from influxdb_client.rest import ApiException from loguru import logger +from reactivex import Observable from icon_service_base.database.config import InfluxDBConfig from icon_service_base.database.create_config import create_config @@ -44,7 +50,7 @@ class InfluxDBSession: }, "time": "2023-06-05T00:00:00Z", # Replace with your timestamp } - influx_client.write_api.write( + influx_client.write( bucket='my_new_bucket', record=record ) ``` @@ -62,13 +68,13 @@ class InfluxDBSession: self.url = self._config.url self.token = self._config.token.get_secret_value() self.org = self._config.org - self.client: InfluxDBClient - self.write_api: WriteApi - self.buckets_api: BucketsApi | None = None + self._client: InfluxDBClient + self._write_api: WriteApi + self._buckets_api: BucketsApi def __enter__(self) -> InfluxDBSession: - self.client = InfluxDBClient(url=self.url, token=self.token, org=self.org) - self.write_api = self.client.write_api(write_options=SYNCHRONOUS) # type: ignore + self._client = InfluxDBClient(url=self.url, token=self.token, org=self.org) + self._write_api = self._client.write_api(write_options=SYNCHRONOUS) # type: ignore return self def __exit__( @@ -77,8 +83,38 @@ class InfluxDBSession: exc_value: BaseException | None, exc_traceback: TracebackType | None, ) -> None: - self.write_api.close() - self.client.__del__() + self._write_api.close() + self._client.__del__() + + def write( + self, + bucket: str, + org: Optional[str] = None, + record: Union[ # type: ignore + str, + Iterable["str"], + Point, + Iterable["Point"], + dict, + Iterable["dict"], + bytes, + Iterable["bytes"], + Observable, + NamedTuple, + Iterable["NamedTuple"], + "dataclass", # type: ignore + Iterable["dataclass"], # type: ignore + ] = None, + write_precision: WritePrecision = DEFAULT_WRITE_PRECISION, # type: ignore + **kwargs: Any, + ) -> Any: + self._write_api.write( # type: ignore + bucket=bucket, + org=org, + record=record, + write_precision=write_precision, # type: ignore + **kwargs, + ) def create_bucket( # noqa: CFQ002 self, @@ -107,9 +143,9 @@ class InfluxDBSession: `InfluxDBClient.org` is used. """ - self.buckets_api = self.client.buckets_api() + self._buckets_api = self._client.buckets_api() try: - self.buckets_api.create_bucket( + self._buckets_api.create_bucket( bucket=bucket, bucket_name=bucket_name, org_id=org_id,