mirror of
https://github.com/tiqi-group/pydase_service_base.git
synced 2026-02-19 01:28:51 +01:00
updates type hints, implements linting suggestions
This commit is contained in:
@@ -1,11 +1,7 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
from collections.abc import Iterable
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
from types import TracebackType
|
||||
from typing import Any, NamedTuple, Optional, Union # noqa: UNT001
|
||||
from typing import TYPE_CHECKING, Any, NamedTuple
|
||||
|
||||
from influxdb_client import (
|
||||
Bucket,
|
||||
@@ -19,12 +15,19 @@ from influxdb_client import (
|
||||
from influxdb_client.client.write.point import DEFAULT_WRITE_PRECISION
|
||||
from influxdb_client.client.write_api import SYNCHRONOUS
|
||||
from influxdb_client.rest import ApiException
|
||||
from reactivex import Observable
|
||||
|
||||
from icon_service_base.database.config import InfluxDBConfig
|
||||
from icon_service_base.database.create_config import create_config
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from collections.abc import Iterable
|
||||
from pathlib import Path
|
||||
from types import TracebackType
|
||||
|
||||
from reactivex import Observable
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
BUCKET_ALREADY_EXISTS = 422
|
||||
|
||||
|
||||
class InfluxDBSession:
|
||||
@@ -60,7 +63,7 @@ class InfluxDBSession:
|
||||
|
||||
conf_folder: Path | str
|
||||
|
||||
def __init__(self, config_folder: Optional[Path | str] = None) -> None:
|
||||
def __init__(self, config_folder: Path | str | None = None) -> None:
|
||||
self._config = create_config(
|
||||
InfluxDBConfig,
|
||||
config_folder=config_folder,
|
||||
@@ -91,20 +94,18 @@ class InfluxDBSession:
|
||||
def write(
|
||||
self,
|
||||
bucket: str,
|
||||
record: Union[ # type: ignore
|
||||
str,
|
||||
Iterable[str],
|
||||
Point,
|
||||
Iterable[Point],
|
||||
dict[str, Any],
|
||||
Iterable[dict[str, Any]],
|
||||
bytes,
|
||||
Iterable[bytes],
|
||||
Observable[Any],
|
||||
NamedTuple,
|
||||
Iterable[NamedTuple],
|
||||
],
|
||||
org: Optional[str] = None,
|
||||
record: str
|
||||
| Iterable[str]
|
||||
| Point
|
||||
| Iterable[Point]
|
||||
| dict[str, Any]
|
||||
| Iterable[dict[str, Any]]
|
||||
| bytes
|
||||
| Iterable[bytes]
|
||||
| Observable[Any]
|
||||
| NamedTuple
|
||||
| Iterable[NamedTuple],
|
||||
org: str | None = None,
|
||||
write_precision: WritePrecision = DEFAULT_WRITE_PRECISION, # type: ignore
|
||||
**kwargs: Any,
|
||||
) -> Any:
|
||||
@@ -116,7 +117,7 @@ class InfluxDBSession:
|
||||
**kwargs,
|
||||
)
|
||||
|
||||
def create_bucket( # noqa: CFQ002
|
||||
def create_bucket( # noqa: PLR0913
|
||||
self,
|
||||
bucket: Bucket | None = None,
|
||||
bucket_name: str | None = None,
|
||||
@@ -153,7 +154,7 @@ class InfluxDBSession:
|
||||
org=org,
|
||||
)
|
||||
except ApiException as e:
|
||||
if e.status == 422:
|
||||
if e.status == BUCKET_ALREADY_EXISTS:
|
||||
logger.debug(e.message)
|
||||
return
|
||||
logger.error(e)
|
||||
|
||||
Reference in New Issue
Block a user