# *****************************************************************************
# Copyright (c) 2024 ff by the module authors
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Module authors:
# Markus Zolliker <markus.zolliker@psi.ch>
#
# *****************************************************************************
import re
import time
from datetime import datetime
from math import floor, ceil
from influxdb_client import InfluxDBClient, BucketRetentionRules, Point
from influxdb_client.client.write_api import SYNCHRONOUS
DAY = 24 * 3600
# write_precision from digits after decimal point
TIME_PRECISION = ['s'] + ['ms'] * 3 + ['us'] * 3 + ['ns'] * 3
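# e.g. timedig 0 -> 's', 1..3 -> 'ms', 4..6 -> 'us', 7..9 -> 'ns'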

try:
    parse_time = datetime.fromisoformat
except AttributeError:  # Python < 3.7: fall back to dateutil
    from dateutil.parser import parse as parse_time


def to_time(v):
    return parse_time(v).timestamp()


def identity(v):
    return v


def double(v):
    # '-0' is the encoding used by add_point for a missing float value
    return None if v == '-0' else float(v)


CONVERTER = {
    'string': identity,
    'long': int,
    'double': double,
    'unsigned_long': int,
    'duration': int,
    'dateTime:RFC3339': to_time,
    'dateTime:RFC3339Nano': to_time,
    # 'base64Binary': base64.b64decode,
}
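
# Illustration (not part of the original module): how these converters act on
# raw CSV cells from the Flux '#datatype' annotation row:
#
#     assert CONVERTER['double']('3.5') == 3.5
#     assert CONVERTER['double']('-0') is None  # '-0' encodes None, see add_point
#     assert CONVERTER['dateTime:RFC3339']('2024-01-01T00:00:00+00:00') == 1704067200.0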


class NamedTuple(tuple):
    """improved version of collections.namedtuple for our purpose

    - names may be any string, but when not an identifier, attribute access is not possible
    - access by key with get() ([ ] is for indexed access)

    Usage:

        MyNamedTuple = NamedTuple(('a', 'b'))
        x = MyNamedTuple(('a', 2.0))
        assert x == ('a', 2.0) == (x.a, x.b) == (x.get('a'), x.get('b')) == (x[0], x[1])
    """
    keys = None
    _idx_by_name = None

    def __new__(cls, keys):
        """create a NamedTuple class from keys

        :param keys: a sequence of names for the elements
        """
        idxbyname = {n: i for i, n in enumerate(keys)}
        attributes = {n: property(lambda s, idx=i: s[idx])
                      for i, n in enumerate(keys)
                      if n.isidentifier() and not hasattr(cls, n)}
        # clsname = '_'.join(attributes)
        attributes.update(_idx_by_name=idxbyname, __new__=tuple.__new__, keys=tuple(keys))
        return type('NamedTuple', (cls,), attributes)

    def get(self, key, default=None, strict=False):
        """get item by key

        :param key: the key
        :param default: value to return when the key does not exist
        :param strict: raise KeyError when the key does not exist, ignoring default
        :return: the value of the requested element, or default if the key does not exist
        """
        try:
            return self[self._idx_by_name[key]]
        except KeyError:
            if strict:
                raise
            return default
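

# A short demonstration of NamedTuple (illustrative only, the names below
# are invented):
def _namedtuple_demo():
    Row = NamedTuple(('_time', '_value', 'odd name'))
    row = Row((1700000000.0, 3.14, 'x'))
    assert row._time == row[0] == row.get('_time')
    # 'odd name' is not an identifier, so there is no attribute, but get() works
    assert row.get('odd name') == 'x'
    assert row.get('missing', 'default') == 'default'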


class RegExp(str):
    """indicates that this string should be treated as a regexp

    Usage: RegExp(<pattern>)

    when used in InfluxDBWrapper.query, uses Go regexp syntax!
    """


def append_wildcard_filter(msg, key, names):
    """append a Flux filter line for tag <key> matching any of <names>

    '*' in a plain string acts as a wildcard; RegExp instances are taken as-is
    """
    patterns = []
    for pattern in names:
        if isinstance(pattern, RegExp):
            patterns.append(pattern)
        else:
            # patterns.append('[^.]*'.join(re.escape(v) for v in pattern.split('*')))
            patterns.append('.*'.join(re.escape(v) for v in pattern.split('*')))
    if patterns:
        pattern = '|'.join(patterns)
        msg.append(f'|> filter(fn:(r) => r.{key} =~ /^({pattern})$/)')
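

# Sketch of the generated filter (the tag values below are made up):
def _wildcard_demo():
    msg = []
    append_wildcard_filter(msg, '_measurement', ['t*.value', RegExp('cryo\\..*')])
    # msg now holds a single Flux line:
    # |> filter(fn:(r) => r._measurement =~ /^(t.*\.value|cryo\..*)$/)
    return msg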


class CurveDict(dict):
    def __missing__(self, key):
        return []


def abs_range(start=None, stop=None):
    """convert start/stop to absolute times

    values within +/- one year are treated as relative to now
    """
    now = time.time()
    if start is None:
        start = int(now - 32 * DAY)
    elif start < 366 * DAY:
        start = int(now + start)
    if stop is None:
        stop = int(now + DAY)
    elif stop < 366 * DAY:
        stop = ceil(now + stop)
    return start, stop


def round_range(start, stop, interval=None):
    """round start down and stop up to multiples of interval"""
    interval = max(1, interval or 0)
    start = floor(start) // interval * interval
    stop = ceil(ceil(stop) / interval) * interval
    return start, stop
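

# Example use of the time helpers (a sketch; the results depend on the current time):
def _range_demo():
    # from one hour ago to tomorrow (the default stop), as absolute epoch seconds
    start, stop = abs_range(-3600)
    # align to 60 s bins: start rounded down, stop rounded up
    return round_range(start, stop, 60)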


class InfluxDBWrapper:
    """Wrapper for InfluxDB API 2.0.

    based on nicos.services.cache.database.influxdb
    (work of Konstantin Kholostov <k.kholostov@fz-juelich.de>)
    """
    _update_queue = None
    _write_api_write = None

    def __init__(self, url, token, org, bucket, access='readonly'):
        """initialize

        :param url: the url for the influx DB
        :param token: the token
        :param org: the organisation
        :param bucket: the bucket name
        :param access: 'readonly', 'write' (RW) or 'create' (incl. RW)
        """
        self._url = url
        self._token = token
        self._org = org
        self._bucket = bucket
        self._client = InfluxDBClient(url=self._url, token=self._token,
                                      org=self._org)
        if access != 'readonly':
            self.enable_write_access()
        self._deadline = 0
        self.set_time_precision(3)
        self.add_new_bucket(self._bucket, access == 'create')
        self._write_buffer = []

    def enable_write_access(self):
        self._write_api_write = self._client.write_api(write_options=SYNCHRONOUS).write

    def set_time_precision(self, digits):
        self.timedig = max(0, min(digits, 9))
        self._write_precision = TIME_PRECISION[self.timedig]

    def disconnect(self):
        self.flush()
        self._client.close()

    def get_bucket_names(self):
        bucket_names = []
        buckets = self._client.buckets_api().find_buckets().buckets
        for bucket in buckets:
            bucket_names.append(bucket.name)
        return bucket_names

    def add_new_bucket(self, bucket_name, create):
        if bucket_name not in self.get_bucket_names():
            if not create:
                raise ValueError(f'unknown bucket {bucket_name}')
            retention_rules = BucketRetentionRules(type='expire',
                                                   every_seconds=0)
            self._client.buckets_api().create_bucket(
                bucket_name=bucket_name,
                retention_rules=retention_rules, org=self._org)

    def get_measurements(self):
        return [r['_value'] for t in self._client.query_api().query(f"""
            import "influxdata/influxdb/schema"
            schema.measurements(bucket: "{self._bucket}")""") for r in t]

    def delete_measurement(self, measurement):
        delete_api = self._client.delete_api()
        delete_api.delete('1970-01-01T00:00:00Z', '2038-01-01T00:00:00Z',
                          f'_measurement="{measurement}"',
                          bucket=self._bucket, org=self._org)

    def delete_all_measurements(self):
        measurements = self.get_measurements()
        for meas in measurements:
            self.delete_measurement(meas)
        print('deleted', measurements)

    # query the database

    def query(self, start=None, stop=None, interval=None, last=False, columns=None, **tags):
        """Returns queried data as InfluxDB tables

        :param start: start time, default is a month ago
        :param stop: end time, default is tomorrow at the same time
        :param interval: if set, an aggregation filter will be applied, returning
            only the latest value per time interval in seconds
        :param last: when True, only the last value within the range is returned
            (for any existing combination of tags!)
        :param columns: if given, return only these columns (in addition to '_time' and '_value')
        :param tags: selection criteria:

            <tag>=None:
                return records independent of this tag. the value is contained
                in the key of the result dict
            <tag>=[<value1>, <value2>, ...]:
                return records where the tag is one from the list. the value is
                contained in the key of the result dict
            <tag>=<value>:
                return only records with the given tag matching <value>. the
                value is not part of the result key

        :return: a dict mapping <tuple of key values> to a list of <row>,
            where <tuple of key values> and <row> are NamedTuple
        """
        self.flush()
        start, stop = round_range(*abs_range(start, stop))
        msg = [f'from(bucket:"{self._bucket}")',
               f'|> range(start: {start}, stop: {stop})']
        keynames = []
        dropcols = ['_start', '_stop']
        for key, crit in tags.items():
            if crit is None:
                keynames.append(key)
                continue
            if isinstance(crit, str):
                if isinstance(crit, RegExp) or '*' in crit:
                    keynames.append(key)
                    append_wildcard_filter(msg, key, [crit])
                    continue
                dropcols.append(key)
                crit = f'"{crit}"'
            elif isinstance(crit, (int, float)):
                dropcols.append(key)
            else:
                try:
                    keynames.append(key)
                    append_wildcard_filter(msg, key, crit)
                    continue
                except Exception:
                    raise ValueError(f'illegal value for {key}: {crit}')
            msg.append(f'|> filter(fn:(r) => r.{key} == {crit})')
        if last:
            msg.append('|> last(column: "_time")')
        if interval:
            msg.append(f'|> aggregateWindow(every: {interval}s, fn: last, createEmpty: false)')
        if columns is None:
            msg.append(f'''|> drop(columns:["{'","'.join(dropcols)}"])''')
        else:
            columns = ['_time', '_value'] + list(columns)
            msg.append(f'''|> keep(columns:["{'","'.join(columns + keynames)}"])''')
        msg = '\n'.join(msg)
        print(msg)
        reader = self._client.query_api().query_csv(msg)
        converters = None
        group = None
        column_names = None
        key = None
        result = {}
        table = None
        for row in reader:
            if not row:
                continue
            if row[0]:
                # annotation row ('#datatype' or '#group')
                if row[0] == '#datatype':
                    # columns 0..2 (annotation, result, table) need no converter
                    converters = {i: CONVERTER.get(d) for i, d in enumerate(row) if i > 2}
                    column_names = None
                elif row[0] == '#group':
                    group = row
                continue
            if column_names is None:
                # header row with the column names
                column_names = row
                keys = {}
                for col, (name, grp) in enumerate(zip(column_names, group)):
                    if grp != 'true':
                        continue
                    # if name in keynames or (columns is None and name not in dropcols):
                    if name in keynames or columns is None:
                        keys[col] = converters.pop(col)
                valuecls = NamedTuple([row[i] for i in converters])
                keycls = NamedTuple([row[i] for i in keys])
                continue
            if row[2] != table:
                # new table, new key
                table = row[2]
                key = keycls(f(row[i]) for i, f in keys.items())
                if result.get(key) is None:
                    result[key] = []
            result[key].append(valuecls(f(row[i]) for i, f in converters.items()))
        if last:
            for key, table in result.items():
                result[key], = table
        else:
            for table in result.values():
                table.sort()
        return result
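
    # Hedged usage sketch (not from the original author; bucket, token and tag
    # values are invented):
    #
    #     db = InfluxDBWrapper('http://localhost:8086', token, 'org', 'bucket')
    #     result = db.query(-3600, device='mydev', _measurement=None)
    #     for key, rows in result.items():
    #         # key holds the grouping tag values, e.g. key.get('_measurement')
    #         for row in rows:
    #             print(row._time, row._value)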

    def curves(self, start=None, stop=None, measurement=('*.value', '*.target'), field='float',
               interval=None, add_prev=3600, add_end=True, **tags):
        """get curves

        :param start: start time (default: one month ago)
        :param stop: end time (default: tomorrow)
        :param measurement: '<module>.<parameter>' (default: ['*.value', '*.target'])
        :param field: default 'float' (only numeric curves)
        :param interval: if given, the result is binned
        :param add_prev: amount of time to look back for the last previous point (default: 1 hr)
        :param add_end: whether to repeat the last point at stop time (default: True)
        :param tags: further selection criteria
        :return: a dict mapping <tuple of key values> to a list of <row>,
            where <tuple of key values> and <row> are NamedTuple
            and <row> is (<timestamp>, <value>)

        when field='float' (the default), the returned values are either floats or None
        """
        tags.setdefault('_measurement', measurement)
        tags.setdefault('_field', field)
        start, stop = abs_range(start, stop)
        rstart, rstop = round_range(start, stop, interval)
        if rstart < rstop:
            result = self.query(rstart, rstop, interval, columns=None, **tags)
            # result = self.query(rstart, rstop, interval, columns=['stream', 'device'], **tags)
        else:
            result = {}
        if add_prev:
            prev_data = self.query(rstart - add_prev, rstart, last=True, **tags)
            for key, first in prev_data.items():
                curve = result.get(key)
                if first[1] is not None:
                    if curve:
                        if first[0] < curve[0][0]:
                            curve.insert(0, first)
                    else:
                        result[key] = [first]
        if add_end:
            for key, curve in result.items():
                if curve:
                    last = list(curve[-1])
                    if last[0] < stop:
                        last[0] = stop
                        curve.append(type(curve[-1])(last))
        return result
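
    # Hedged sketch of a typical call (the measurement name is invented):
    #
    #     curves = db.curves(-86400, interval=60, _measurement='ts.value')
    #     for key, points in curves.items():
    #         # points is a list of (<timestamp>, <value>) rows, in time order
    #         ...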

    # write to the database

    def _add_point(self, value, ts, measurement, field, tags):
        point = Point(measurement).field(f'{field}', value)
        if ts:
            point.time(datetime.utcfromtimestamp(ts), write_precision=self._write_precision)
        for key, val in tags.items():
            point.tag(key, val)
        self._write_buffer.append(point)

    def write(self, measurement, field, value, ts, **tags):
        """add point and flush"""
        self._add_point(value, ts, measurement, field, tags)
        self.flush()

    def flush(self):
        """flush write buffer"""
        points = self._write_buffer
        self._write_buffer = []
        if points:
            if self._write_api_write is None:
                raise PermissionError('no write access - need access="write"')
            self._write_api_write(bucket=self._bucket, record=points)

    def add_point(self, isfloat, value, *args):
        """add point to the buffer

        flush must be called in order to write the buffer
        """
        if isfloat:
            # make sure value is float (None is encoded as -0.0)
            self._add_point(-0.0 if value is None else float(value), *args)
        else:
            self._add_point('' if value is None else str(value), *args)
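
    # Minimal write round-trip (a sketch; assumes access='write', and the
    # measurement and tag names are invented):
    #
    #     db = InfluxDBWrapper(url, token, org, bucket, access='write')
    #     db.add_point(True, 1.5, time.time(), 'ts.value', 'float', {'device': 'mydev'})
    #     db.flush()
    #     # or, adding a single point and flushing in one go:
    #     db.write('ts.value', 'float', 1.5, time.time(), device='mydev')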


def testdb():
    token = "zqDbTcMv9UizfdTj15Fx_6vBetkM5mXN56EE9CiDaFsh7O2FFWZ2X4VwAAmdyqZr3HbpIr5ixRju07-oQmxpXw=="
    return InfluxDBWrapper('http://pc16392:8086', token, 'linse', 'curve-test')