# sehistory/influx.py
# *****************************************************************************
# Copyright (c) 2024 ff by the module authors
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Module authors:
# Markus Zolliker <markus.zolliker@psi.ch>
#
# *****************************************************************************
import re
import time
import logging
from datetime import datetime, timezone
from math import floor, ceil
from influxdb_client import InfluxDBClient, BucketRetentionRules, Point
from influxdb_client.client.write_api import SYNCHRONOUS
DAY = 24 * 3600
YEAR = 366 * DAY
SINCE_EVER = YEAR + DAY
# write_precision from digits after decimal point
TIME_PRECISION = ['s'] + ['ms'] * 3 + ['us'] * 3 + ['ns'] * 3
UNDEF = '<undef>'
try:
parse_time = datetime.fromisoformat
except AttributeError:
from dateutil.parser import parse as parse_time
def to_time(v):
    # a trailing 'Z' is accepted by datetime.fromisoformat only from Python 3.11 on
    return parse_time(v.replace('Z', '+00:00')).timestamp()
def to_iso(t):
return datetime.fromtimestamp(t, timezone.utc).isoformat().replace('+00:00', 'Z')
class PrettyFloat(float):
"""saves bandwidth when converting to JSON
a lot of numbers originally have a fixed (low) number of decimal digits.
as the binary representation is not exact, it might happen, that a
lot of superfluous digits are transmitted:
str(1/10*3) == '0.30000000000000004'
str(PrettyFloat(1/10*3)) == '0.3'
"""
    def __new__(cls, value):
        # a value of '-0' is mapped to None (returning None from __new__ is intentional)
        return None if value == '-0' else super().__new__(cls, value)
def __repr__(self):
return '%.15g' % self
class Converters(dict):
    """maps column index to conversion function, built from the '#datatype'
    header row (skipping the first 3 columns: annotation, result, table)"""
    def __init__(self, datatypes):
        super().__init__((i, getattr(self, f"cvt_{d.split(':')[0]}"))
                         for i, d in enumerate(datatypes) if i > 2)
def as_tuple(self, row):
"""get selected columns as tuple"""
return tuple(f(row[i]) for i, f in self.items())
cvt_double = staticmethod(PrettyFloat)
@staticmethod
def cvt_string(value):
return value
@staticmethod
def cvt_long(value):
return int(value)
@staticmethod
def cvt_dateTime(value):
return to_time(value)
@staticmethod
def cvt_boolean(value):
return value == 'true'
    # note: the annotated-CSV datatype is spelled 'unsignedLong'
    cvt_unsignedLong = cvt_duration = cvt_long
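
# For illustration (a sketch; the exact header depends on the query):
# a '#datatype' header row may look like
#   ['#datatype', 'string', 'long', 'dateTime:RFC3339', 'double']
# Converters then maps column 3 to cvt_dateTime and column 4 to cvt_double,
# and as_tuple applies them to a data row:
#
#   cvt = Converters(['#datatype', 'string', 'long', 'dateTime:RFC3339', 'double'])
#   cvt.as_tuple(['', '_result', '0', '2024-01-01T00:00:00Z', '1.5'])
#   # -> (1704067200.0, 1.5)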
class Table(list):
"""a list of tuples with meta info"""
def __init__(self, tags=None, key_names=(), column_names=(), rows=None):
super().__init__()
self.tags = tags or {}
self.key_names = key_names
self.column_names = column_names
if rows:
self[:] = rows
    def to_csv_rows(self, timeoffset=0, sep='\t', none='none', float_format='%.15g'):
        """yield the rows as lines of a separator-delimited text file"""
for row in self:
result = ['%.15g' % (row[0] - timeoffset)]
for value in row[1:]:
try:
result.append(float_format % value)
except TypeError:
if value is None:
result.append(none)
else:
result.append(str(value).replace(sep, ' '))
yield sep.join(result)
class RegExp(str):
    """indicates that this string should be treated as a regexp

    Usage: RegExp(<pattern>)
    when used in InfluxDBWrapper.query, Go regexp syntax applies!
    """
def append_wildcard_filter(msg, key, names):
    """append a Flux filter for the given tag key, with '*' acting as a wildcard"""
    patterns = []
for pattern in names:
if isinstance(pattern, RegExp):
patterns.append(pattern)
else:
# patterns.append('[^.]*'.join(re.escape(v) for v in pattern.split('*')))
patterns.append('.*'.join(re.escape(v) for v in pattern.split('*')))
if patterns:
pattern = '|'.join(patterns)
msg.append(f'|> filter(fn:(r) => r.{key} =~ /^({pattern})$/)')
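
# For illustration: append_wildcard_filter(msg, 'device', ['tt*']) would append
#   |> filter(fn:(r) => r.device =~ /^(tt.*)$/)
# to msg ('device' is just a hypothetical tag name here)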
class CurveDict(dict):
def __missing__(self, key):
return []
def abs_range(start=None, stop=None):
now = time.time()
if start is None: # since ever
start = SINCE_EVER
elif start < YEAR:
start = int(now + start)
if stop is None:
stop = int(now + YEAR)
elif stop < YEAR:
stop = ceil(now + stop)
return start, stop
def round_range(start, stop, interval=None):
    """round start down and stop up to multiples of interval"""
    interval = max(1, interval or 0)
    start = floor(start / interval) * interval
    stop = ceil(stop / interval) * interval
    return start, stop
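
# for illustration:
#   abs_range(-3600)          -> (<now - 3600>, <now + YEAR>)
#   round_range(121, 479, 60) -> (120, 480)   # start rounded down, stop up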
class InfluxDBWrapper:
"""Wrapper for InfluxDB API 2.0.
based on nicos.services.cache.database.influxdb
(work of Konstantin Kholostov <k.kholostov@fz-juelich.de>)
"""
_update_queue = None
_write_api_write = None
def __init__(self, uri=None, token=None, org=None, bucket=None, access='readonly'):
"""initialize
:param uri: the uri for the influx DB or a filepath for a config file
:param token: the token or the section in the config file
:param org: the organisation
:param bucket: the bucket name
:param access: 'readonly', 'write' (RW) or 'create' (incl. RW)
"""
self._url, self._token, self._org, self._bucket = uri, token, org, bucket
self._client = InfluxDBClient(url=uri, token=token, org=org)
if access != 'readonly':
self.enable_write_access()
self._deadline = 0
self.set_time_precision(3)
self.add_new_bucket(self._bucket, access == 'create')
self._write_buffer = []
self._alias = {}
logging.info('InfluxDBWrapper %s %s %s', self._url, self._org, self._bucket)
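
    # Typical use (a sketch only; url, token, org and bucket are placeholders):
    #
    #   db = InfluxDBWrapper('http://localhost:8086', '<token>',
    #                        'my-org', 'my-bucket')
    #   ... query / write ...
    #   db.disconnect()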
def enable_write_access(self):
self._write_api_write = self._client.write_api(write_options=SYNCHRONOUS).write
def set_time_precision(self, digits):
self.timedig = max(0, min(digits, 9))
self._write_precision = TIME_PRECISION[self.timedig]
def disconnect(self):
self.flush()
self._client.close()
    def get_bucket_names(self):
        return [bucket.name for bucket in self._client.buckets_api().find_buckets().buckets]
def add_new_bucket(self, bucket_name, create):
if bucket_name not in self.get_bucket_names():
if not create:
raise ValueError(f'unknown bucket {bucket_name}')
retention_rules = BucketRetentionRules(type='expire',
every_seconds=0)
self._client.buckets_api().create_bucket(
bucket_name=bucket_name,
retention_rules=retention_rules, org=self._org)
def get_measurements(self):
return [r['_value'] for t in self._client.query_api().query(f"""
import "influxdata/influxdb/schema"
schema.measurements(bucket: "{self._bucket}")""") for r in t]
    def delete_measurement(self, measurement, start=None, stop=None):
        delete_api = self._client.delete_api()
        # abs_range substitutes defaults for omitted start/stop values
        start, stop = abs_range(start, stop)
        delete_api.delete(to_iso(start), to_iso(stop), f'_measurement="{measurement}"',
                          bucket=self._bucket, org=self._org)
def delete_all_measurements(self, measurements=None, start=0, stop=None):
if measurements is None:
measurements = self.get_measurements()
for meas in measurements:
self.delete_measurement(meas, start, stop)
    def _get_rows(self, reader, as_tuple, first_row):
        """yield converted rows of a single table, stopping at a table boundary

        the shared list first_row is used to hand the first row of the next
        table back to the caller; it is cleared when the reader is exhausted
        """
        row = first_row
        tableno = row[2]
        try:
            while True:
                if row[0]:
                    # an annotation row: a new header follows
                    first_row[:] = row
                    return
                if row[2] != tableno:
                    # table id changed: new table, store first row for next call
                    first_row[:] = row
                    return
                yield as_tuple(row)
                row = next(reader)
                if not row:
                    raise ValueError('EMPTY')
        except StopIteration:
            first_row.clear()  # indicate end of data
# query the database
    def query(self, start=None, stop=None, interval=None, single=None, columns=None, **tags):
        """return the queried data as a dict of InfluxDB tables

        :param start: start time (default: since ever)
        :param stop: end time (default: eternity = 1 year in the future)
        :param interval: if set, an aggregation filter is applied, returning
            only the latest value per time interval in seconds
        :param single: when not 0, return only the last value within the range;
            the resulting tables then have exactly one row each
            (for any existing combination of tags!).
            single < 0: return the first value instead
        :param columns: if given, return only these columns (in addition to '_time' and '_value')
        :param tags: selection criteria:

            <tag>=None:
                return records independent of this tag;
                the obtained value is contained in the key of the result dict
            <tag>=[<value1>, <value2>, ...]:
                return records where the tag is one of the listed values;
                the obtained value is contained in the key of the result dict
            <tag>=<value>:
                return only records with the tag matching <value>;
                the obtained value is contained in the key of the result dict
                only if the value is an instance of RegExp or contains
                an asterisk ('*')

        :return: a dict of <tuple of key values> to <Table instance>

            Table is an extension of list, with some meta info
        """
result = {}
for rows, key, props in self.query_gen(start, stop, interval, single, columns, **tags):
table = Table(*props, rows=rows)
table.sort()
result[key] = table
return result
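
    # Example query (a sketch; the tag names '_measurement' and 'device' are
    # assumptions about the stored data, not fixed by this class):
    #
    #   tables = db.query(start=-3600, _measurement='tt', device=None)
    #   for key, table in tables.items():
    #       print(table.tags, table.key_names, table.column_names)
    #       for row in table:
    #           print(row)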
    def query_gen(self, start=None, stop=None, interval=None, single=None, columns=None, **tags):
        """return the queried data as a generator

        for a description of the arguments see the query method

        :return: an iterator of (rows, key, (tags, key_names, column_names))

        remark: rows not consumed between the iteration steps are lost.
        this generator version reduces memory usage
        """
self.flush()
start, stop = round_range(*abs_range(start, stop))
msg = [f'from(bucket:"{self._bucket}")',
f'|> range(start: {start}, stop: {stop})']
keylist = []
dropcols = ['_start', '_stop']
fixed_tags = {}
for key, crit in tags.items():
if crit is None:
keylist.append(key)
continue
if isinstance(crit, str):
if isinstance(crit, RegExp) or '*' in crit:
keylist.append(key)
append_wildcard_filter(msg, key, [crit])
continue
fixed_tags[key] = crit
dropcols.append(key)
crit = f'"{crit}"'
            elif isinstance(crit, bool):
                # keep the original bool in fixed_tags, convert it for the Flux filter
                # (bool must be checked before int, as bool is a subclass of int)
                fixed_tags[key] = crit
                dropcols.append(key)
                crit = 'true' if crit else 'false'
elif isinstance(crit, (int, float)):
fixed_tags[key] = crit
dropcols.append(key)
            else:
                try:
                    append_wildcard_filter(msg, key, crit)
                except Exception:
                    raise ValueError(f'illegal value for {key}: {crit}') from None
                keylist.append(key)
                continue
            msg.append(f'|> filter(fn:(r) => r.{key} == {crit})')
if single:
if single < 0:
msg.append('|> first(column: "_time")')
else:
msg.append('|> last(column: "_time")')
if interval:
msg.append(f'|> aggregateWindow(every: {interval:g}s, fn: last, createEmpty: false)')
if columns is None:
msg.append(f'''|> drop(columns:["{'","'.join(dropcols)}"])''')
else:
columns = ['_time', '_value'] + list(columns)
msg.append(f'''|> keep(columns:["{'","'.join(columns + keylist)}"])''')
msg = '\n'.join(msg)
logging.debug('MSG %r', msg)
self.msg = msg
        try:
            reader = self._client.query_api().query_csv(msg)
        except Exception:
            logging.exception("error in query: %r", msg)
            raise
# if self.debug:
# def readdebug(reader):
# for row in reader:
# print(row)
# yield row
# reader = readdebug(reader)
try:
row = next(reader)
except StopIteration:
return
converters = keys = column_names = None # make IDE happy
        while True:
header = {}
if row[0]: # this is a header
header[row[0]] = row
for row in reader:
if row:
if not row[0]:
break
header[row[0]] = row
else:
return # this should not happen
# we are now at the row with the column names
column_names = row
converters = Converters(header['#datatype'])
group = header['#group']
keys = {k: None for k in keylist}
for col, (name, grp) in enumerate(zip(column_names, group)):
if grp != 'true':
continue
if columns is None or name in keys:
keys[name] = col, converters.pop(col)
                none_keys = [k for k, v in keys.items() if v is None]
                for k in none_keys:
                    keys.pop(k)
row = next(reader)
# we are at the first data row
key_dict = {n: f(row[i]) for n, (i, f) in keys.items()}
column_keys = tuple(column_names[i] for i in converters)
table_properties = {**fixed_tags, **key_dict}, tuple(keys), column_keys
key = tuple(key_dict.values())
row = list(row) # copy row, as it will be modified
rows = self._get_rows(reader, converters.as_tuple, row)
yield rows, key, table_properties
# consume unused rows
            consumed = sum(1 for _ in rows)
            if consumed:
                logging.info('skipped %d unconsumed rows', consumed)
if not row: # reader is at end
return
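
    # Streaming variant (a sketch): the rows of each step must be consumed
    # before the next iteration step
    #
    #   for rows, key, (tags, key_names, col_names) in db.query_gen(start=-3600):
    #       for row in rows:
    #           process(row)  # 'process' is a placeholder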
# write to the database
    def _add_point(self, measurement, field, value, ts, tags):
        point = Point(measurement).field(f'{field}', value)
        if ts:
            # use an aware UTC datetime (datetime.utcfromtimestamp is deprecated)
            point.time(datetime.fromtimestamp(ts, timezone.utc),
                       write_precision=self._write_precision)
        for key, val in tags.items():
            point.tag(key, val)
self._write_buffer.append(point)
def write(self, measurement, field, value, ts, **tags):
"""add point and flush"""
self._add_point(measurement, field, value, ts, tags)
self.flush()
def flush(self):
"""flush write buffer"""
points = self._write_buffer
self._write_buffer = []
if points:
try:
self._write_api_write(bucket=self._bucket, record=points)
            except TypeError:
                if self._write_api_write is None:
                    raise PermissionError('no write access - need access="write"') from None
                raise
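
# Writing (a sketch; needs access='write' or 'create'; the measurement, field
# and tag names below are placeholders):
#
#   db = InfluxDBWrapper('http://localhost:8086', '<token>', 'my-org',
#                        'my-bucket', access='write')
#   db.write('tt', 'float', 295.13, time.time(), device='tt')
#   db.disconnect()  # flushes any buffered points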