further rework

- dump all every full hour
- finish all streams properly on exit
2025-02-11 10:51:37 +01:00
parent 50f8c349ee
commit ce205f47a2
6 changed files with 386 additions and 296 deletions

influx.py

@@ -16,33 +16,94 @@
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Module authors:
# Konstantin Kholostov <k.kholostov@fz-juelich.de>
# Markus Zolliker <markus.zolliker@psi.ch>
#
# *****************************************************************************
import re
import time
import threading
import queue
from math import floor, ceil, copysign
from datetime import datetime
import pandas as pd
from math import floor, ceil
from influxdb_client import InfluxDBClient, BucketRetentionRules, Point
from influxdb_client.client.write_api import SYNCHRONOUS as write_option
from influxdb_client.client.write_api import SYNCHRONOUS
OFFSET, SCALE = pd.to_datetime(['1970-01-01 00:00:00', '1970-01-01 00:00:01']).astype(int)
DAY = 24 * 3600
# write_precision from digits after decimal point
TIME_PRECISION = ['s'] + ['ms'] * 3 + ['us'] * 3 + ['ns'] * 3
try:
parse_time = datetime.fromisoformat
except AttributeError:
from dateutil.parser import parse as parse_time
def to_time(v):
return parse_time(v).timestamp()
def identity(v):
return v
def negnull2none(v):
"""converts -0.0 to None, returns argument for everything else, also strings"""
return None if v == 0 and copysign(1, v) == -1 else v
def double(v):
return None if v == '-0' else float(v)
CONVERTER = {
'string': identity,
'long': int,
'double': double,
'unsigned_long': int,
'duration': int,
'dateTime:RFC3339': to_time,
'dateTime:RFC3339Nano': to_time,
# 'base64Binary': base64.b64decode,
}
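# Illustrative sketch, not part of the original commit: how the CONVERTER functions and
# negnull2none behave on a few sample values (the values below are made up).
assert double('-0') is None            # '-0' marks a missing float value
assert double('3.5') == 3.5
assert negnull2none(-0.0) is None      # -0.0 is the stored placeholder for None
assert negnull2none(0.0) == 0.0        # a genuine zero is kept
assert CONVERTER['long']('42') == 42
assert CONVERTER['dateTime:RFC3339'] is to_time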
class NamedTuple(tuple):
"""for our purpose improved version of collection.namedtuple
- names may be any string, but when not an identifier, attribute access is not possible
- access by key with get ([ ] is for indexed access)
Usage:
MyNamedTuple = NamedTuple(('a', 'b'))
x = MyNamedTuple(('a', 2.0))
assert x == ('a', 2.0) == (x.a, x.b) == (x.get('a'), x.get('b')) == (x[0], x[1])
"""
keys = None
_idx_by_name = None
def __new__(cls, keys):
"""create NamedTuple class from keys
:param keys: a sequence of names for the elements
"""
idxbyname = {n: i for i, n in enumerate(keys)}
attributes = {n: property(lambda s, idx=i: s[idx])
for i, n in enumerate(keys)
if n.isidentifier() and not hasattr(cls, n)}
# clsname = '_'.join(attributes)
attributes.update(_idx_by_name=idxbyname, __new__=tuple.__new__, keys=tuple(keys))
return type(f"NamedTuple", (cls,), attributes)
def get(self, key, default=None, strict=False):
"""get item by key
:param key: the key
:param default: value to return when key does not exist
:param strict: raise KeyError when key does not exist and ignore default
:return: the value of requested element or default if the key does not exist
"""
try:
return self[self._idx_by_name[key]]
except KeyError:
if strict:
raise
return default
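# Illustrative sketch, not part of the original commit: calling NamedTuple(keys) via the
# __new__ above returns a new tuple subclass, which is then instantiated from a sequence
# of values. The key names used here are made up.
Row = NamedTuple(('_time', '_value'))
row = Row((1700000000.0, 3.14))
assert row == (1700000000.0, 3.14)
assert row._time == 1700000000.0          # attribute access for identifier keys
assert row.get('_value') == 3.14          # access by key name
assert row.get('missing', default='n/a') == 'n/a'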
class RegExp(str):
@@ -53,14 +114,17 @@ class RegExp(str):
"""
def wildcard_filter(key, names):
def append_wildcard_filter(msg, key, names):
patterns = []
for pattern in names:
if isinstance(pattern, RegExp):
patterns.append(pattern)
else:
patterns.append('[^.]*'.join(re.escape(v) for v in pattern.split('*')))
return f'|> filter(fn:(r) => r.{key} =~ /^({pattern})$/)'
# patterns.append('[^.]*'.join(re.escape(v) for v in pattern.split('*')))
patterns.append('.*'.join(re.escape(v) for v in pattern.split('*')))
if patterns:
pattern = '|'.join(patterns)
msg.append(f'|> filter(fn:(r) => r.{key} =~ /^({pattern})$/)')
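# Illustrative sketch, not part of the original commit: the Flux filter line generated
# for a simple '*' wildcard. The tag name and pattern are made-up examples.
flux = []
append_wildcard_filter(flux, '_measurement', ['mf*.value'])
# flux now holds a single Flux filter line, roughly:
#   |> filter(fn:(r) => r._measurement =~ /^(mf.*\.value)$/)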
class CurveDict(dict):
@@ -68,54 +132,7 @@ class CurveDict(dict):
return []
class NamedTuple(tuple):
"""for our purpose improved version of collection.namedtuple
- names may be any string, but when not an identifier, no attribute access
- dict like access with [<key>]
- customized converter (or validator) function for initialization
Usage:
MyNamedTuple1 = NamedTuple('a', 'b')
x = MyNamedTuple1(1, 'y')
assert x == (1, 'y') == (x.a, x.b)
MyNamedTuple2 = NamedTuple(a=str, b=float)
y = MyNamedTuple2(10, b='2')
assert y == ('10', 2) == (y['a'], y['b'])
"""
_indices = None
_converters = None
def __new__(cls, *args, **kwds):
if cls is NamedTuple:
return cls.getcls(dict({a: identity for a in args}, **kwds))
values = dict(zip(cls._converters, args), **kwds)
elements = []
for key, cvt in cls._converters.items():
try:
elements.append(cvt(values[key]))
except KeyError:
elements.append(None)
return super().__new__(cls, elements)
def __getitem__(self, key):
try:
return tuple(self)[key]
except Exception:
return tuple(self)[self._indices[key]]
@classmethod
def getcls(cls, converters):
attributes = {'_converters': converters,
'_indices': {k: i for i, k in enumerate(converters)}}
for idx, name in enumerate(converters):
if name.isidentifier():
attributes[name] = property(lambda s, i=idx: s[i])
return type('NamedTuple', (cls,), attributes)
def abs_range(start=None, stop=None, interval=None):
def abs_range(start=None, stop=None):
now = time.time()
if start is None:
start = int(now - 32 * DAY)
@@ -142,64 +159,38 @@ class InfluxDBWrapper:
(work of Konstantin Kholostov <k.kholostov@fz-juelich.de>)
"""
_update_queue = None
_write_thread = None
_write_api_write = None
def __init__(self, url, token, org, bucket, threaded=False, create=False, watch_interval=300):
self._watch_interval = watch_interval
def __init__(self, url, token, org, bucket, access='readonly'):
"""initialize
:param url: the url for the influx DB
:param token: the token
:param org: the organisation
:param bucket: the bucket name
:param access: 'readonly', 'write' (RW) or 'create' (incl. RW)
"""
self._url = url
self._token = token
self._org = org
self._bucket = bucket
self._client = InfluxDBClient(url=self._url, token=self._token,
org=self._org)
self._write_api_write = self._client.write_api(write_options=write_option).write
if access != 'readonly':
self.enable_write_access()
self._deadline = 0
self._active_streams = {}
self.set_time_precision(3)
self.add_new_bucket(self._bucket, create)
if threaded:
self._update_queue = queue.Queue(100)
self._write_thread = threading.Thread(target=self._write_thread)
self.add_new_bucket(self._bucket, access == 'create')
self._write_buffer = []
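# Illustrative sketch, not part of the original commit: typical construction. The url,
# token, org and bucket values are placeholders, not real credentials.
# reader = InfluxDBWrapper('http://localhost:8086', '<token>', 'my-org', 'my-bucket')
# writer = InfluxDBWrapper('http://localhost:8086', '<token>', 'my-org', 'my-bucket',
#                          access='write')   # enables the write API as well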
def _write(self, point):
if self._update_queue:
self._update_queue.put(point)
else:
self._write_api_write(bucket=self._bucket, record=[point])
def _write_thread(self):
while 1:
points = [self.write_queue.get()]
try:
while 1:
points.append(self.write_queue.get(False))
except queue.Empty:
pass
self._write_api_write(bucket=self._bucket, record=points)
event = self._wait_complete
if event:
self._wait_complete = None
event.set()
def flush(self):
if self._write_thread:
self._wait_complete = event = threading.Event()
event.wait()
def enable_write_access(self):
self._write_api_write = self._client.write_api(write_options=SYNCHRONOUS).write
def set_time_precision(self, digits):
self.timedig = max(0, min(digits, 9))
self._write_precision = TIME_PRECISION[self.timedig]
def to_timestamp(self, timevalue):
return round(timevalue.timestamp(), self.timedig)
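# Illustrative sketch, not part of the original commit: the digits -> write_precision
# mapping used above (0 -> 's', 1-3 -> 'ms', 4-6 -> 'us', 7-9 -> 'ns', clamped to 0..9).
# self.set_time_precision(3)    # timedig == 3, _write_precision == 'ms'
# self.set_time_precision(12)   # clamped to 9, _write_precision == 'ns'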
def disconnect(self):
for _ in range(10):
self._write_thread.join(1)
for stream, last in self._active_streams.items():
self._write(Point('_streams_')
.time(last, write_precision=self._write_precision)
.field('interval', 0).tag('stream', stream))
self.flush()
self._client.close()
@@ -231,138 +222,209 @@
bucket=self._bucket, org=self._org)
def delete_all_measurements(self):
all = self.get_measurements()
for meas in all:
self.get_measurements(meas)
print('deleted', all)
measurements = self.get_measurements()
for meas in measurements:
self.delete_measurement(meas)
print('deleted', measurements)
def query(self, start=None, stop=None, interval=None, last=False, **tags):
# query the database
def query(self, start=None, stop=None, interval=None, last=False, columns=None, **tags):
"""Returns queried data as InfluxDB tables
:param start: start time. default is a month ago
:param stop: end time, default is tomorrow at the same time
:param interval: is set an aggregation filter will be applied. This will
return only the latest values for a time interval in seconds.
:param interval: if set an aggregation filter will be applied. This will
return only the latest values per time interval in seconds.
:param last: when True, only the last value within the time range is returned
(for each existing combination of tags!)
:param columns: if given, return only these columns (in addition to '_time' and '_value')
:param tags: selection criteria:
<tag>=None
return records independent of the tag. the value will be contained in the result dicts key
return records independent of this tag. the value will be contained in the result dict's key
<tag>=[<value1>, <value2>, ...]
return records with given tag from the list. the value is contained in the result dicts key
return records where the tag is one from the list. the value is contained in the result dict's key
<tag>=<value>:
return only records with the given tag matching <value>. the value is not part of the result's key
<tag>=<func>
where <func> is a callable. return tag as value in returned rows. <func> is used for value conversion
:return: a dict <tuple of key values> of list of <row>>
:return: a dict <tuple of key values> of list of <row>
where <tuple of keys> and <row> are NamedTuple
"""
self.flush()
start, stop = round_range(*abs_range(start, stop), interval)
start, stop = round_range(*abs_range(start, stop))
msg = [f'from(bucket:"{self._bucket}")',
f'|> range(start: {start}, stop: {stop})']
columns = {'_time': self.to_timestamp, '_value': negnull2none}
keynames = []
dropcols = ['_start', '_stop']
for key, crit in tags.items():
if crit is None:
keynames.append(key)
continue
if callable(crit):
columns[key] = crit
continue
if isinstance(crit, str):
if isinstance(crit, RegExp) or '*' in crit:
keynames.append(key)
msg.append(wildcard_filter(key, [crit]))
append_wildcard_filter(msg, key, [crit])
continue
dropcols.append(key)
crit = f'"{crit}"'
elif not isinstance(crit, (int, float)):
elif isinstance(crit, (int, float)):
dropcols.append(key)
else:
try:
keynames.append(key)
msg.append(wildcard_filter(key, crit))
append_wildcard_filter(msg, key, crit)
continue
except Exception:
raise ValueError(f'illegal value for {key}: {crit}')
msg.append(f'|> filter(fn:(r) => r.{key} == {crit})')
if last:
msg.append('|> last(column: "_time")')
if interval:
msg.append(f'|> aggregateWindow(every: {interval}s, fn: last, createEmpty: false)')
if columns is not None:
msg.append(f'''|> keep(columns:["{'","'.join(list(columns) + keynames)}"])''')
if columns is None:
msg.append(f'''|> drop(columns:["{'","'.join(dropcols)}"])''')
else:
columns = ['_time', '_value'] + list(columns)
msg.append(f'''|> keep(columns:["{'","'.join(columns + keynames)}"])''')
msg = '\n'.join(msg)
print(msg)
tables = self._client.query_api().query(msg)
result = CurveDict()
keycls = NamedTuple(*keynames)
colcls = NamedTuple(**columns)
for table in tables:
key = None
for rec in table:
print(rec.values)
if key is None:
key = keycls(**rec.values)
result.setdefault(key, [])
data = colcls(**rec.values)
result[key].append(data)
result[key].sort(key=lambda v: v[0])
print('---')
reader = self._client.query_api().query_csv(msg)
sort = False
converters = None
group = None
column_names = None
key = None
result = {}
table = None
for row in reader:
if not row:
continue
if row[0]:
if row[0] == '#datatype':
converters = {i: CONVERTER.get(d) for i, d in enumerate(row) if i > 2}
column_names = None
elif row[0] == '#group':
group = row
continue
if column_names is None:
column_names = row
keys = {}
for col, (name, grp) in enumerate(zip(column_names, group)):
if grp != 'true':
continue
# if name in keynames or (columns is None and name not in dropcols):
if name in keynames or columns is None:
keys[col] = converters.pop(col)
else:
sort = True
valuecls = NamedTuple([row[i] for i in converters])
keycls = NamedTuple([row[i] for i in keys])
continue
if row[2] != table:
# new table, new key
table = row[2]
key = keycls(f(row[i]) for i, f in keys.items())
if result.get(key) is None:
result[key] = []
elif not sort:
# this should not happen
sort = True
result[key].append(valuecls(f(row[i]) for i, f in converters.items()))
if last:
for key, table in result.items():
result[key], = table
elif sort:
for table in result.values():
table.sort()
return result
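# Illustrative sketch, not part of the original commit: a query using the tag selection
# rules documented above. The measurement name and the 'stream' tag are made up.
# result = db.query(start=time.time() - 3600, stop=time.time(),
#                   _measurement='mf.value',   # exact match, not part of the result key
#                   _field='float',
#                   stream=None)               # any stream, becomes part of the result key
# for key, rows in result.items():
#     # key and rows are NamedTuple instances; each row starts with (_time, _value)
#     print(key.get('stream'), len(rows))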
def curves(self, start=None, stop=None, measurement=None, field='float', interval=None,
add_prev=True, **tags):
add_prev=3600, add_end=False, **tags):
"""get curves
:param start: start time (default: one month ago)
:param stop: end time (default: tomorrow)
:param measurement: '<module>.<parameter>' (default: ['*.value', '*.target'])
:param field: default 'float' (only numeric curves)
:param interval: if given, the result is binned
:param add_prev: amount of time to look back for the last previous point (default: 1 hr)
:param add_end: whether to add endpoint at stop time (default: False)
:param tags: further selection criteria
:return: a dict <tuple of key values> of list of <row>
where <tuple of keys> and <row> are NamedTuple
<row> is (<timestamp>, <value>)
when field='float' (the default), the returned values are either floats or None
"""
for key, val in zip(('_measurement', '_field'), (measurement, field)):
tags.setdefault(key, val)
start, stop = abs_range(start, stop)
rstart, rstop = round_range(start, stop, interval)
if rstart < rstop:
result = self.query(rstart, rstop, interval, **tags)
result = self.query(rstart, rstop, interval, columns=[], **tags)
else:
result = {}
if add_prev:
prev = self.query(rstart - DAY, rstart, last=True, **tags)
for key, prev in prev.items():
prev_data = self.query(rstart - add_prev, rstart, last=True, **tags)
for key, first in prev_data.items():
curve = result.get(key)
first = prev[-1]
if first[1] is not None:
if curve:
if first[0] < curve[0][0]:
curve.insert(0, first)
else:
result[key] = [first]
if add_end:
for key, curve in result.items():
if curve:
last = list(curve[-1])
if last[0] < stop:
last[0] = stop
curve.append(type(curve[-1])(last))
return result
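# Illustrative sketch, not part of the original commit: fetching binned curves for the
# last day. The measurement name is a made-up example.
# curves = db.curves(start=time.time() - DAY, stop=time.time(),
#                    measurement='tt.value', interval=60,
#                    add_prev=3600, add_end=True)
# for key, points in curves.items():
#     # each point is (<timestamp>, <value>) with value a float or None
#     print(key, len(points))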
def write(self, measurement, field, value, ts, **tags):
self._active_streams[tags.get('stream')] = ts
if ts > self._deadline:
dl = ts // self._watch_interval * self._watch_interval
for stream, last in self._active_streams.items():
self._write(
Point('_streams_')
.time(datetime.utcfromtimestamp(last), write_precision=self._write_precision)
.field('interval', self._watch_interval).tag('stream', stream))
self._active_streams.clear()
self._deadline = dl + self._watch_interval
# write to the database
def _add_point(self, value, ts, measurement, field, tags):
point = Point(measurement).field(f'{field}', value)
if ts:
point.time(datetime.utcfromtimestamp(ts), write_precision=self._write_precision)
for key, val in tags.items():
point.tag(key, val)
self._write(point)
self._write_buffer.append(point)
def write_float(self, measurement, field, value, ts, **tags):
# make sure value is float
value = -0.0 if value is None else float(value)
self.write(measurement, field, value, ts, **tags)
def write(self, measurement, field, value, ts, **tags):
"""add point and flush"""
self._add_point(value, ts, measurement, field, tags)
self.flush()
def write_string(self, measurement, field, value, ts, **tags):
# make sure value is string
value = '' if value is None else str(value)
self.write(measurement, field, value, ts, **tags)
def flush(self):
"""flush write buffer"""
points = self._write_buffer
self._write_buffer = []
if points:
try:
self._write_api_write(bucket=self._bucket, record=points)
except TypeError as e:
if self._write_api_write is None:
raise PermissionError('no write access - need access="write"') from None
raise
def add_point(self, isfloat, value, *args):
"""add point to the buffer
flush must be called in order to write the buffer
"""
if isfloat:
# make sure value is float
self._add_point(-0.0 if value is None else float(value), *args)
else:
self._add_point('' if value is None else str(value), *args)
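# Illustrative sketch, not part of the original commit: buffered writing. Points are
# collected with add_point and sent with a single flush; write() stores one point and
# flushes immediately. Measurement/field/tag names here are placeholders.
# ts = time.time()
# writer.add_point(True, 2.5, ts, 'tt.value', 'float', {'stream': 'demo'})
# writer.add_point(True, None, ts + 1, 'tt.value', 'float', {'stream': 'demo'})  # stored as -0.0
# writer.flush()
# writer.write('tt.value', 'float', 2.5, ts + 2, stream='demo')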
def testdb():