feat: adding database module

This commit is contained in:
Mose Mueller
2023-06-05 15:55:19 +02:00
parent fc4b351248
commit 58814204e9
6 changed files with 751 additions and 117 deletions

View File

@ -0,0 +1,4 @@
from .influxdb_connection import InfluxDBConnection
from .postgres_connection import PostgresDatabaseSession
__all__ = ["InfluxDBConnection", "PostgresDatabaseSession"]

View File

@ -0,0 +1,121 @@
from __future__ import annotations
from types import TracebackType
from dotenv import dotenv_values
from influxdb_client import ( # type: ignore
Bucket,
BucketRetentionRules,
BucketsApi,
InfluxDBClient,
WriteApi,
)
from influxdb_client.client.write_api import SYNCHRONOUS # type: ignore
from influxdb_client.rest import ApiException # type: ignore
from loguru import logger
# Load the environment variables from the .env file
env_vars = dotenv_values()
# Access the individual environment variables
INFLUXDB_V2_URL = str(env_vars.get("INFLUXDB_V2_URL"))
INFLUXDB_V2_ORG = env_vars.get("INFLUXDB_V2_ORG")
INFLUXDB_V2_TOKEN = env_vars.get("INFLUXDB_V2_TOKEN")
class InfluxDBConnection:
"""
The `InfluxDBConnection` class serves as a context manager for a connection
to an InfluxDB server. This connection is established using credentials
provided through environment variables.
Example usage:
```
with InfluxDBConnection() as influx_client:
# Creating a bucket
influx_client.create_bucket(
bucket_name='my_new_bucket', description='This is a new bucket'
)
# Writing data to a bucket
record = {
"measurement": "your_measurement", # Replace with your measurement
"tags": {
"example_tag": "tag_value", # Replace with your tag and its value
},
"fields": {
"example_field": 123, # Replace with your field and its value
},
"time": "2023-06-05T00:00:00Z", # Replace with your timestamp
}
influx_client.write_api.write(
bucket='my_new_bucket', record=record
)
```
"""
def __init__(self) -> None:
self.url = INFLUXDB_V2_URL
self.token = INFLUXDB_V2_TOKEN
self.org = INFLUXDB_V2_ORG
self.client: InfluxDBClient
self.write_api: WriteApi
self.buckets_api: BucketsApi | None = None
def __enter__(self) -> InfluxDBConnection:
self.client = InfluxDBClient(url=self.url, token=self.token, org=self.org)
self.write_api = self.client.write_api(write_options=SYNCHRONOUS) # type: ignore
return self
def __exit__(
self,
exc_type: type[BaseException] | None,
exc_value: BaseException | None,
exc_traceback: TracebackType | None,
) -> None:
self.write_api.close()
self.client.__del__()
def create_bucket( # noqa: CFQ002
self,
bucket: Bucket | None = None,
bucket_name: str | None = None,
org_id: int | None = None,
retention_rules: BucketRetentionRules | None = None,
description: str | None = None,
org: str | None = None,
) -> None:
"""
Create a bucket in the InfluxDB instance. This function wraps the create_bucket
from `influxdb_client` in a try-catch block and logs potential errors with
loguru.
Args:
bucket (Bucket | PostBucketRequest, optional): Bucket instance to be
created.
bucket_name (str, optional): Name of the bucket to be created.
org_id (int, optional): The organization id for the bucket.
retention_rules (BucketRetentionRules, optional): Retention rules for the
bucket.
description (str, optional): Description of the bucket.
org (str, optional): The name of the organization for the bucket. Takes
the ID, Name, or Organization. If not specified, the default value from
`InfluxDBClient.org` is used.
"""
self.buckets_api = self.client.buckets_api()
try:
self.buckets_api.create_bucket(
bucket=bucket,
bucket_name=bucket_name,
org_id=org_id,
retention_rules=retention_rules,
description=description,
org=org,
)
except ApiException as e:
if e.status == 422:
logger.debug(e.message)
return
logger.error(e)
return

View File

@ -0,0 +1,171 @@
from __future__ import annotations
import datetime
import json
import re
from types import TracebackType
from typing import Any
from dateutil.parser import ParserError, parse # type: ignore
from dotenv import dotenv_values
from sqlmodel import Session, SQLModel, create_engine
# Load the environment variables from the .env file
env_vars = dotenv_values()
# Access the individual environment variables
POSTGRESQL_USER = env_vars.get("POSTGRESQL_USER")
POSTGRESQL_PASSWORD = env_vars.get("POSTGRESQL_PASSWORD")
POSTGRESQL_HOST = env_vars.get("POSTGRESQL_HOST")
POSTGRESQL_PORT = env_vars.get("POSTGRESQL_PORT")
POSTGRESQL_DATABASE = env_vars.get("POSTGRESQL_DATABASE")
def json_loads_or_return_input(input_string: str) -> dict[str, Any] | Any:
"""
Try to parse a string as JSON, if it fails return the original string.
"""
try:
return json.loads(input_string)
except (TypeError, json.JSONDecodeError):
return input_string
def parse_datetime_or_return_str(input_string: str) -> datetime.datetime | str:
try:
# Attempts to parse the string as a datetime object
return parse(input_string)
except ParserError:
# If parsing fails, return the original input string
return input_string
def is_datetime_format(input_string: str) -> bool:
"""
Check if a string is in datetime format.
"""
try:
parse(input_string)
return True
except ParserError:
return False
def json_dumps(data: Any) -> str | list:
"""
Serialize a Python object into a JSON-formatted string, with custom handling for
datetime and list objects.
"""
# 'Infinity' is an unallowed token in JSON, thus make it a string
# https://stackoverflow.com/questions/48356938/store-infinity-in-postgres-json-via-django
pattern = r"(-?Infinity)"
result: str | list
if isinstance(data, str):
if is_datetime_format(data):
result = json.dumps(data)
else:
result = data
elif isinstance(data, datetime.datetime):
result = json.dumps(str(data))
elif isinstance(data, list):
result = [json_dumps(element) for element in data]
else:
if isinstance(data, SQLModel):
serialized_data = data.json()
else:
serialized_data = json.dumps(data)
result = re.sub(pattern, r'"\1"', serialized_data)
return result
def deserialize_json_dict(json_string: str) -> Any:
"""
Deserialize a JSON string into a Python dictionary.
"""
# 'Infinity' is an unallowed token in JSON, thus we made it a string. Now, convert
# it back
pattern = r'"(-?Infinity)"'
json_string = re.sub(pattern, r"\1", json_string)
result: Any
val = json.loads(json_string)
json_dict_or_val = json_loads_or_return_input(val)
if isinstance(json_dict_or_val, str):
result = parse_datetime_or_return_str(json_dict_or_val)
else:
result = json_dict_or_val
return result
# the docs state that
# "You should normally have a single engine object for your whole application
# and re-use it everywhere."
database_engine = create_engine(
f"postgresql://{POSTGRESQL_USER}:{POSTGRESQL_PASSWORD}@{POSTGRESQL_HOST}:"
f"{POSTGRESQL_PORT}/{POSTGRESQL_DATABASE}",
echo=False,
json_serializer=json_dumps,
json_deserializer=deserialize_json_dict,
)
class PostgresDatabaseSession(Session):
"""A class to represent a session with the PostgreSQL database.
This class inherits from SQLModel's Session class and implements Python's context
manager protocol. This class helps to ensure that sessions are properly opened
and closed, even in cases of error.
The main goal of this class is to provide a way to manage persistence operations
for ORM-mapped objects.
Attributes:
bind: Represents the database engine to which this session is bound.
Example:
This class is designed to be used with a context manager (the 'with' keyword).
Here's how you can use it to query data from a table represented by a SQLModel
class named 'YourModel':
```python
from your_module.models import YourModel # replace with your model
from sqlmodel import select
with PostgresDatabaseSession() as session:
row = session.exec(select(YourModel).limit(1)).all()
```
You can also use it to add new data to the table:
```python
with PostgresDatabaseSession() as session:
new_row = YourModel(...) # replace ... with your data
session.add(new_row)
session.commit()
```
"""
def __init__(self) -> None:
"""Initializes a new session bound to the database engine."""
super().__init__(bind=database_engine)
def __enter__(self) -> PostgresDatabaseSession:
"""Begins the runtime context related to the database session."""
super().__enter__()
return self
def __exit__(
self,
exc_type: type[BaseException] | None,
exc_value: BaseException | None,
exc_traceback: TracebackType | None,
) -> None:
"""Ends the runtime context related to the database session.
Ensures that the session is properly closed, even in the case of an error.
"""
super().__exit__(exc_type, exc_value, exc_traceback)