Initial commit
335
influx.py
Normal file
@@ -0,0 +1,335 @@
# *****************************************************************************
# Copyright (c) 2024 ff by the module authors
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Module authors:
#   Konstantin Kholostov <k.kholostov@fz-juelich.de>
#
# *****************************************************************************
import re
import time
from math import floor, ceil, copysign
from datetime import datetime
import pandas as pd

from influxdb_client import InfluxDBClient, BucketRetentionRules, Point
from influxdb_client.client.write_api import SYNCHRONOUS as write_option

OFFSET, SCALE = pd.to_datetime(['1970-01-01 00:00:00', '1970-01-01 00:00:01']).astype(int)
DAY = 24 * 3600
# write_precision from digits after decimal point
TIME_PRECISION = ['s'] + ['ms'] * 3 + ['us'] * 3 + ['ns'] * 3


def identity(v):
    return v


def negnull2none(v):
    """convert -0.0 to None, return the argument unchanged for everything else, including strings"""
    return None if v == 0 and copysign(1, v) == -1 else v


def wildcard_filter(key, names):
    """build a Flux filter line matching any of the given names, '*' acting as a wildcard"""
    patterns = []
    for name in names:
        patterns.append('[^.]*'.join(re.escape(v) for v in name.split('*')))
    pattern = '|'.join(patterns)
    return f'|> filter(fn:(r) => r.{key} =~ /^({pattern})$/)'
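
# Illustration only: wildcard_filter() turns shell-style '*' patterns into a
# Flux regex filter, where '*' matches any characters except '.', e.g.
#
#     wildcard_filter('stream', ['se_*'])
#     # -> '|> filter(fn:(r) => r.stream =~ /^(se_[^.]*)$/)'
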
class CurveDict(dict):
    def __missing__(self, key):
        return []


class NamedTuple(tuple):
    """a version of collections.namedtuple improved for our purpose

    - names may be any string, but attribute access works only for identifiers
    - dict-like access with [<key>]
    - customized converter (or validator) function for initialization

    Usage:

        MyNamedTuple1 = NamedTuple('a', 'b')
        x = MyNamedTuple1(1, 'y')
        assert x == (1, 'y') == (x.a, x.b)
        MyNamedTuple2 = NamedTuple(a=str, b=float)
        y = MyNamedTuple2(10, b='2')
        assert y == ('10', 2) == (y['a'], y['b'])
    """
    _indices = None
    _converters = None

    def __new__(cls, *args, **kwds):
        if cls is NamedTuple:
            # called on the base class: create a customized subclass, not an instance
            return cls.getcls(dict({a: identity for a in args}, **kwds))
        values = dict(zip(cls._converters, args), **kwds)
        elements = []
        for key, cvt in cls._converters.items():
            try:
                elements.append(cvt(values[key]))
            except KeyError:
                elements.append(None)
        return super().__new__(cls, elements)

    def __getitem__(self, key):
        try:
            return tuple(self)[key]
        except Exception:
            return tuple(self)[self._indices[key]]

    @classmethod
    def getcls(cls, converters):
        attributes = {'_converters': converters,
                      '_indices': {k: i for i, k in enumerate(converters)}}
        for idx, name in enumerate(converters):
            if name.isidentifier():
                attributes[name] = property(lambda s, i=idx: s[i])
        return type('NamedTuple', (cls,), attributes)


def abs_range(start=None, stop=None, interval=None):
    """convert start/stop to absolute unix timestamps

    values smaller than a year are treated as relative to now;
    start defaults to a month ago, stop to tomorrow
    """
    now = time.time()
    if start is None:
        start = int(now - 32 * DAY)
    elif start < 366 * DAY:
        start = int(now + start)
    if stop is None:
        stop = int(now + DAY)
    elif stop < 366 * DAY:
        stop = ceil(now + stop)
    return start, stop


def round_range(start, stop, interval=None):
    """round start down and stop up to multiples of interval (seconds)"""
    interval = max(1, interval or 0)
    start = floor(start) // interval * interval
    stop = ceil(stop / interval) * interval
    return start, stop
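
# Illustration only: relative times (magnitude below one year) are taken
# relative to now, and round_range() snaps the result to interval boundaries:
#
#     start, stop = abs_range(-3600, 0)           # the last hour
#     start, stop = round_range(start, stop, 300)
#     # start is a multiple of 300 s <= now - 3600 s,
#     # stop a multiple of 300 s >= now
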
class InfluxDBWrapper:
    """Wrapper for InfluxDB API 2.0.

    based on nicos.services.cache.database.influxdb
    (work of Konstantin Kholostov <k.kholostov@fz-juelich.de>)
    """

    def __init__(self, url, token, org, bucket, create=False, watch_interval=300):
        self._watch_interval = watch_interval
        self._update_queue = []
        self._url = url
        self._token = token
        self._org = org
        self._bucket = bucket
        self._client = InfluxDBClient(url=self._url, token=self._token,
                                      org=self._org)
        self._write_api_write = self._client.write_api(write_options=write_option).write
        self._deadline = 0
        self._active_streams = {}
        self.set_time_precision(3)
        self.add_new_bucket(self._bucket, create)

    def set_time_precision(self, digits):
        self.timedig = max(0, min(digits, 9))
        self._write_precision = TIME_PRECISION[self.timedig]

    def to_timestamp(self, timevalue):
        return round(timevalue.timestamp(), self.timedig)

    def disconnect(self):
        # mark all active streams as finished (interval 0)
        for stream, last in self._active_streams.items():
            self._update_queue.append(Point('_streams_')
                                      .time(datetime.utcfromtimestamp(last),
                                            write_precision=self._write_precision)
                                      .field('interval', 0).tag('stream', stream))
        self.flush()
        self._client.close()

    def get_bucket_names(self):
        bucket_names = []
        buckets = self._client.buckets_api().find_buckets().buckets
        for bucket in buckets:
            bucket_names.append(bucket.name)
        return bucket_names

    def add_new_bucket(self, bucket_name, create):
        if bucket_name not in self.get_bucket_names():
            if not create:
                raise ValueError(f'unknown bucket {bucket_name}')
            retention_rules = BucketRetentionRules(type='expire',
                                                   every_seconds=0)
            self._client.buckets_api().create_bucket(
                bucket_name=bucket_name,
                retention_rules=retention_rules, org=self._org)

    def get_measurements(self):
        return [r['_value'] for t in self._client.query_api().query(f"""
            import "influxdata/influxdb/schema"
            schema.measurements(bucket: "{self._bucket}")""") for r in t]

    def delete_measurement(self, measurement):
        delete_api = self._client.delete_api()
        delete_api.delete('1970-01-01T00:00:00Z', '2038-01-01T00:00:00Z',
                          f'_measurement="{measurement}"',
                          bucket=self._bucket, org=self._org)

    def delete_all_measurements(self):
        measurements = self.get_measurements()
        for meas in measurements:
            self.delete_measurement(meas)
        print('deleted', measurements)

    def flush(self):
        points = self._update_queue
        if points:
            self._update_queue = []
            self._write_api_write(bucket=self._bucket, record=points)

    def query(self, start=None, stop=None, interval=None, last=False, **tags):
        """return queried data as InfluxDB tables

        :param start: start time, default is a month ago
        :param stop: stop time, default is tomorrow at the same time
        :param interval: if set, an aggregation filter is applied, returning
            only the latest value per time interval (given in seconds)
        :param last: when True, only the last value within the range is
            returned (for any existing combination of tags!)
        :param tags: selection criteria:

            <tag>=None
                return records independent of this tag;
                its value will be part of the result key
            <tag>=[<value1>, <value2>, ...]
                return records where the tag is one of the given values;
                its value will be part of the result key
            <tag>=<value>
                return only records where the tag matches <value>;
                its value is not part of the result key
            <tag>=<func>
                where <func> is a callable: return the tag as a column of the
                result rows, with <func> applied for value conversion

        :return: a dict mapping <tuple of key values> to a list of <row>,
            where both the key and <row> are NamedTuple
        """
        self.flush()
        start, stop = round_range(*abs_range(start, stop), interval)
        msg = [f'from(bucket:"{self._bucket}")',
               f'|> range(start: {start}, stop: {stop})']

        columns = {'_time': self.to_timestamp, '_value': negnull2none}
        keynames = []
        for key, crit in tags.items():
            if crit is None:
                keynames.append(key)
                continue
            if callable(crit):
                columns[key] = crit
                continue
            if isinstance(crit, str):
                if '*' in crit:
                    keynames.append(key)
                    msg.append(wildcard_filter(key, [crit]))
                    continue
                crit = f'"{crit}"'
            elif not isinstance(crit, (int, float)):
                try:
                    keynames.append(key)
                    msg.append(wildcard_filter(key, crit))
                    continue
                except Exception:
                    raise ValueError(f'illegal value for {key}: {crit}')
            msg.append(f'|> filter(fn:(r) => r.{key} == {crit})')

        if last:
            msg.append('|> last(column: "_time")')
        if interval:
            msg.append(f'|> aggregateWindow(every: {interval}s, fn: last, createEmpty: false)')
        msg.append(f'''|> keep(columns:["{'","'.join(list(columns) + keynames)}"])''')
        msg = '\n'.join(msg)
        print(msg)

        tables = self._client.query_api().query(msg)
        result = CurveDict()
        keycls = NamedTuple(*keynames)
        colcls = NamedTuple(**columns)
        for table in tables:
            key = None
            for rec in table:
                print(rec.values)
                if key is None:
                    key = keycls(**rec.values)
                    result.setdefault(key, [])
                data = colcls(**rec.values)
                result[key].append(data)
            result[key].sort(key=lambda v: v[0])
        print('---')
        return result
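
    # Usage sketch (hypothetical names, assuming a measurement 'T' with a
    # 'stream' tag and a reachable InfluxDB server):
    #
    #     db = InfluxDBWrapper('http://localhost:8086', token, 'org', 'bucket')
    #     for key, rows in db.query(-3600, _measurement='T', _field='float',
    #                               stream=None).items():
    #         # key is a NamedTuple with key.stream; rows are (time, value) tuples
    #         print(key.stream, len(rows))
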
    def curves(self, start=None, stop=None, measurement=None, field='float',
               interval=None, add_prev=True, **tags):
        """return curves for the given range, optionally prepending the last
        point before the start of the range to each curve"""
        for key, val in zip(('_measurement', '_field'), (measurement, field)):
            tags.setdefault(key, val)
        start, stop = abs_range(start, stop)
        rstart, rstop = round_range(start, stop, interval)
        if rstart < rstop:
            result = self.query(rstart, rstop, interval, **tags)
        else:
            result = {}
        if add_prev:
            prev_data = self.query(rstart - DAY, rstart, last=True, **tags)
            for key, rows in prev_data.items():
                curve = result.get(key)
                first = rows[-1]
                if first[1] is not None:
                    if curve:
                        if first[0] < curve[0][0]:
                            curve.insert(0, first)
                    else:
                        result[key] = [first]
        return result

    def write(self, measurement, field, value, ts, **tags):
        """add a point to the update queue, maintaining the '_streams_'
        bookkeeping measurement"""
        self._active_streams[tags.get('stream')] = ts
        if ts > self._deadline:
            dl = ts // self._watch_interval * self._watch_interval
            for stream, last in self._active_streams.items():
                self._update_queue.append(
                    Point('_streams_')
                    .time(datetime.utcfromtimestamp(last), write_precision=self._write_precision)
                    .field('interval', self._watch_interval).tag('stream', stream))
            self._active_streams.clear()
            self._deadline = dl + self._watch_interval
        point = Point(measurement).field(f'{field}', value)
        if ts:
            point.time(datetime.utcfromtimestamp(ts), write_precision=self._write_precision)
        for key, val in tags.items():
            point.tag(key, val)
        self._update_queue.append(point)
        if len(self._update_queue) > 0:  # TODO: flush only above a threshold (e.g. 100)
            self.flush()

    def write_float(self, measurement, field, value, ts, **tags):
        # make sure value is a float; None is encoded as -0.0
        value = -0.0 if value is None else float(value)
        self.write(measurement, field, value, ts, **tags)

    def write_string(self, measurement, field, value, ts, **tags):
        # make sure value is a string; None is encoded as ''
        value = '' if value is None else str(value)
        self.write(measurement, field, value, ts, **tags)
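
# Round-trip note: write_float() encodes None as -0.0, and query() passes
# '_value' through negnull2none(), which decodes -0.0 back to None, e.g.
#
#     db.write_float('T', 'float', None, time.time(), stream='demo')
#     # a later query()/curves() returns this point with value None
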
def testdb():
    token = "zqDbTcMv9UizfdTj15Fx_6vBetkM5mXN56EE9CiDaFsh7O2FFWZ2X4VwAAmdyqZr3HbpIr5ixRju07-oQmxpXw=="
    return InfluxDBWrapper('http://pc16392:8086', token, 'linse', 'curve-test')
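
# Example session (illustration only; needs the server from testdb() or a
# similar instance):
#
#     db = testdb()
#     db.write_float('T_sample', 'float', 1.23, time.time(), stream='demo')
#     print(db.curves(-300, measurement='T_sample', stream='demo'))
#     db.disconnect()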