# *****************************************************************************
# Copyright (c) 2024 ff by the module authors
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Module authors:
#   Markus Zolliker
#
# *****************************************************************************

import re
import time
from datetime import datetime
from math import floor, ceil

from influxdb_client import InfluxDBClient, BucketRetentionRules, Point
from influxdb_client.client.write_api import SYNCHRONOUS

DAY = 24 * 3600

# write_precision from digits after decimal point
TIME_PRECISION = ['s'] + ['ms'] * 3 + ['us'] * 3 + ['ns'] * 3

try:
    parse_time = datetime.fromisoformat
except AttributeError:  # Python < 3.7 fallback
    from dateutil.parser import parse as parse_time


def to_time(v):
    """convert an ISO formatted time string to a POSIX timestamp"""
    return parse_time(v).timestamp()


def identity(v):
    return v


def double(v):
    """convert to float, treating '-0' as the marker for None

    -0.0 is written by add_point when a float value is None, so '-0'
    coming back from the database is converted back to None here.
    """
    return None if v == '-0' else float(v)


# converters from InfluxDB annotated-CSV datatypes to Python values
CONVERTER = {
    'string': identity,
    'long': int,
    'double': double,
    'unsigned_long': int,
    'duration': int,
    'dateTime:RFC3339': to_time,
    'dateTime:RFC3339Nano': to_time,
    # 'base64Binary': base64.b64decode,
}


class NamedTuple(tuple):
    """for our purpose improved version of collections.namedtuple

    - names may be any string, but when not an identifier, attribute access
      is not possible
    - access by key with get ([ ] is for indexed access)

    Usage:

        MyNamedTuple = NamedTuple(('a', 'b'))
        x = MyNamedTuple(('a', 2.0))
        assert x == ('a', 2.0) == (x.a, x.b)
        assert x == (x.get('a'), x.get('b')) == (x[0], x[1])
    """
    keys = None           # tuple of element names (set on the created class)
    _idx_by_name = None   # maps name -> index (set on the created class)

    def __new__(cls, keys):
        """create a NamedTuple class from keys

        :param keys: a sequence of names for the elements
        :return: a new class; instantiate it with a sequence of values
        """
        idxbyname = {n: i for i, n in enumerate(keys)}
        # add a property for each name that is a valid identifier and
        # does not clash with an existing attribute of the base class
        attributes = {n: property(lambda s, idx=i: s[idx])
                      for i, n in enumerate(keys)
                      if n.isidentifier() and not hasattr(cls, n)}
        attributes.update(_idx_by_name=idxbyname, __new__=tuple.__new__,
                          keys=tuple(keys))
        return type("NamedTuple", (cls,), attributes)

    def get(self, key, default=None, strict=False):
        """get item by key

        :param key: the key
        :param default: value to return when key does not exist
        :param strict: raise KeyError when key does not exist and ignore default
        :return: the value of the requested element or default if the key
            does not exist
        """
        try:
            return self[self._idx_by_name[key]]
        except KeyError:
            if strict:
                raise
            return default

    @property
    def names(self):
        """the element names, as a tuple"""
        return tuple(self._idx_by_name)

    def tuple(self, *keys):
        """return a tuple with the values of the given keys"""
        return tuple(self.get(k) for k in keys)


class Table(list):
    """a list of tuples with meta info"""

    def __init__(self, tags, key_names, column_names):
        super().__init__()
        self.tags = tags                  # fixed tags common to all rows
        self.key_names = key_names        # names of the key columns
        self.column_names = column_names  # names of the data columns


class Single(Table):
    """a single row of a table, as a list with meta info"""

    def __init__(self, table):
        super().__init__(table.tags, table.key_names, table.column_names)
        single, = table  # raises when table has not exactly one row
        self[:] = single


class RegExp(str):
    """indicates, that this string should be treated as regexp

    Usage: RegExp(<pattern>)
    when used in InfluxDBWrapper.query, uses Go regexp syntax!
    """


def append_wildcard_filter(msg, key, names):
    """append a flux filter clause matching any of the given names

    :param msg: the list of flux statements to append to
    :param key: the tag/column to filter on
    :param names: a sequence of match strings; plain strings may contain
        '*' as a wildcard, RegExp instances are used verbatim (Go regexp)
    """
    patterns = []
    for pattern in names:
        if isinstance(pattern, RegExp):
            patterns.append(pattern)
        else:
            # treat '*' as a wildcard matching any characters
            patterns.append('.*'.join(re.escape(v) for v in pattern.split('*')))
    if patterns:
        pattern = '|'.join(patterns)
        msg.append(f'|> filter(fn:(r) => r.{key} =~ /^({pattern})$/)')


class CurveDict(dict):
    """dict returning an empty list for missing keys (without inserting)"""

    def __missing__(self, key):
        return []


def abs_range(start=None, stop=None):
    """convert a possibly relative time range to an absolute one

    :param start: start time, default: one month (32 days) ago
    :param stop: stop time, default: tomorrow
        values below 366 days are treated as relative to now
    :return: absolute (start, stop) in seconds since the epoch
    """
    now = time.time()
    if start is None:  # default: 1 month ago
        start = int(now - 32 * DAY)
    elif start < 366 * DAY:  # relative time
        start = int(now + start)
    if stop is None:  # default: tomorrow
        stop = int(now + DAY)
    elif stop < 366 * DAY:  # relative time
        stop = ceil(now + stop)
    return start, stop


def round_range(start, stop, interval=None):
    """round start down and stop up to multiples of interval

    :param start: start time (seconds)
    :param stop: stop time (seconds)
    :param interval: the raster to round to (default: 1 second)
    :return: the rounded (start, stop)
    """
    interval = max(1, interval or 0)
    start = floor(start) // interval * interval
    # bug fix: the previous expression ceil(ceil(stop // interval) * interval)
    # effectively rounded stop DOWN (the floor division happens first),
    # which could cut off data at the end of the requested range
    stop = ceil(ceil(stop) / interval) * interval
    return start, stop


class InfluxDBWrapper:
    """Wrapper for InfluxDB API 2.0.

    based on nicos.services.cache.database.influxdb
    (work of Konstantin Kholostov)
    """
    _update_queue = None
    _write_api_write = None

    def __init__(self, url, token, org, bucket, access='readonly'):
        """initialize

        :param url: the url for the influx DB
        :param token: the token
        :param org: the organisation
        :param bucket: the bucket name
        :param access: 'readonly', 'write' (RW) or 'create' (incl. RW)
        """
        self._url = url
        self._token = token
        self._org = org
        self._bucket = bucket
        self._client = InfluxDBClient(url=self._url, token=self._token,
                                      org=self._org)
        if access != 'readonly':
            self.enable_write_access()
        self._deadline = 0
        self.set_time_precision(3)
        self.add_new_bucket(self._bucket, access == 'create')
        self._write_buffer = []

    def enable_write_access(self):
        """create the write api; needed before any point can be written"""
        self._write_api_write = self._client.write_api(
            write_options=SYNCHRONOUS).write

    def set_time_precision(self, digits):
        """set the timestamp precision used for writing

        :param digits: number of digits after the decimal point (clamped 0..9)
        """
        self.timedig = max(0, min(digits, 9))
        self._write_precision = TIME_PRECISION[self.timedig]

    def disconnect(self):
        """flush pending points and close the client"""
        self.flush()
        self._client.close()

    def get_bucket_names(self):
        """:return: a list of all available bucket names"""
        return [bucket.name
                for bucket in self._client.buckets_api().find_buckets().buckets]

    def add_new_bucket(self, bucket_name, create):
        """make sure the given bucket exists

        :param bucket_name: the name of the bucket
        :param create: when True, create a missing bucket,
            else raise ValueError
        """
        if bucket_name not in self.get_bucket_names():
            if not create:
                raise ValueError(f'unknown bucket {bucket_name}')
            # every_seconds=0: keep data forever
            retention_rules = BucketRetentionRules(
                type='expire', every_seconds=0)
            self._client.buckets_api().create_bucket(
                bucket_name=bucket_name,
                retention_rules=retention_rules, org=self._org)

    def get_measurements(self):
        """:return: a list of all measurement names in the bucket"""
        return [r['_value'] for t in self._client.query_api().query(f"""
            import "influxdata/influxdb/schema"
            schema.measurements(bucket: "{self._bucket}")""") for r in t]

    def delete_measurement(self, measurement):
        """delete all points of the given measurement"""
        delete_api = self._client.delete_api()
        delete_api.delete('1970-01-01T00:00:00Z', '2038-01-01T00:00:00Z',
                          f'_measurement="{measurement}"',
                          bucket=self._bucket, org=self._org)

    def delete_all_measurements(self):
        """delete all measurements of the bucket"""
        measurements = self.get_measurements()
        for meas in measurements:
            self.delete_measurement(meas)
        print('deleted', measurements)

    # query the database

    def query(self, start=None, stop=None, interval=None, single=None,
              columns=None, **tags):
        """Returns queried data as InfluxDB tables

        :param start: start time, default is a month ago
        :param stop: end time, default is tomorrow at the same time
        :param interval: if set, an aggregation filter will be applied.
            This will return only the latest values per time interval
            in seconds.
        :param single: when True (or 1), only the last value within the
            interval is returned (for any existing combinations of tags!)
            single=-1: return the first value instead
        :param columns: if given, return only these columns (in addition
            to '_time' and '_value')
        :param tags: selection criteria:

            <tag>=None
                return records independent of this tag. the obtained
                value will be contained in the result dicts key
            <tag>=[<value1>, <value2>, ...]
                return records where the tag is one from the list. the
                obtained value is contained in the result dicts key
            <tag>=<value>
                return only records with the given tag matching <value>.
                the obtained value is contained in the result dicts key
                only if the value is an instance of RegExp or when it
                contains an asterisk ('*')

        :return: a dict mapping tuples of key values to Table instances
        """
        self.flush()
        start, stop = round_range(*abs_range(start, stop))
        msg = [f'from(bucket:"{self._bucket}")',
               f'|> range(start: {start}, stop: {stop})']
        keys = {}
        dropcols = ['_start', '_stop']
        fixed_tags = {}
        for key, crit in tags.items():
            if crit is None:
                # no filter: tag becomes part of the result key
                keys[key] = None
                continue
            if isinstance(crit, str):
                if isinstance(crit, RegExp) or '*' in crit:
                    keys[key] = None
                    append_wildcard_filter(msg, key, [crit])
                    continue
                # fixed string criterion: not part of the result key
                fixed_tags[key] = crit
                dropcols.append(key)
                crit = f'"{crit}"'
            elif isinstance(crit, (int, float)):
                fixed_tags[key] = crit
                dropcols.append(key)
            else:
                try:
                    # assume a list of match strings
                    keys[key] = None
                    append_wildcard_filter(msg, key, crit)
                    continue
                except Exception:
                    raise ValueError(f'illegal value for {key}: {crit}')
            msg.append(f'|> filter(fn:(r) => r.{key} == {crit})')
        if single:
            if single < 0:
                msg.append('|> first(column: "_time")')
            else:
                msg.append('|> last(column: "_time")')
        if interval:
            msg.append(f'|> aggregateWindow(every: {interval}s,'
                       ' fn: last, createEmpty: false)')
        if columns is None:
            msg.append(f'''|> drop(columns:["{'","'.join(dropcols)}"])''')
        else:
            columns = ['_time', '_value'] + list(columns)
            # bug fix: keys is a dict - concatenating a list and a dict
            # raised TypeError; the key names have to be kept as well
            msg.append(
                f'''|> keep(columns:["{'","'.join(columns + list(keys))}"])''')
        msg = '\n'.join(msg)
        print(msg)

        reader = self._client.query_api().query_csv(msg)
        print('CSV', keys, columns)
        # parse the annotated CSV stream
        converters = None     # {column index: converter function}
        group = None          # the last seen '#group' annotation row
        column_names = None
        column_keys = None
        key = None
        result = {}
        tableno = None
        for row in reader:
            if not row:
                continue
            if row[0]:  # annotation row
                if row[0] == '#datatype':
                    # columns 0..2 are annotation, result and table number
                    converters = {i: CONVERTER.get(d)
                                  for i, d in enumerate(row) if i > 2}
                    column_names = None
                elif row[0] == '#group':
                    group = row
                continue
            if column_names is None:  # header row
                column_names = row
                for col, (name, grp) in enumerate(zip(column_names, group)):
                    if grp != 'true':
                        continue
                    # grouped columns are treated as keys
                    if columns is None or name in keys:
                        keys[name] = col, converters.pop(col)
                column_keys = tuple(column_names[i] for i in converters)
                continue
            if row[2] != tableno:
                # new table, new key
                tableno = row[2]
                key_dict = {n: f(row[i]) for n, (i, f) in keys.items()}
                key = tuple(key_dict.values())
                if result.get(key) is None:
                    print('KC', key_dict, column_keys)
                    result[key] = Table({**fixed_tags, **key_dict},
                                        tuple(keys), column_keys)
            result[key].append(tuple(f(row[i])
                                     for i, f in converters.items()))
        if single:
            for key, table in result.items():
                result[key] = Single(table)
        else:
            for table in result.values():
                table.sort()
        return result

    def curves(self, start=None, stop=None,
               measurement=('*.value', '*.target'), field='float',
               interval=None, add_prev=3600, add_end=True, **tags):
        """get curves

        :param start: start time (default: one month ago)
        :param stop: end time (default: tomorrow)
        :param measurement: '<module>.<parameter>'
            (default: ['*.value', '*.target'])
        :param field: default 'float' (only numeric curves)
        :param interval: if given, the result is binned
        :param add_prev: amount of time to look back for the last previous
            point (default: 1 hr)
        :param add_end: whether to add an endpoint at stop time
            (default: True)
        :param tags: further selection criteria
        :return: a dict mapping keys to <Table> or <Single>, where <Table>
            is a list of tuples with some meta info (table.tags,
            table.column_names) and <Single> is a list (a single row of a
            table) with the same meta info

        when field='float' (the default), the returned values are either
        floats or None
        """
        tags.setdefault('_measurement', measurement)
        tags.setdefault('_field', field)
        start, stop = abs_range(start, stop)
        rstart, rstop = round_range(start, stop, interval)
        if rstart < rstop:
            result = self.query(rstart, rstop, interval, columns=None, **tags)
        else:
            result = {}
        if add_prev:
            # look for the most recent point before the range, per key
            prev_data = self.query(rstart - add_prev, rstart, single=1, **tags)
            for key, first in prev_data.items():
                curve = result.get(key)
                if first[1] is not None:  # skip 'undefined' values
                    if curve:
                        if first[0] < curve[0][0]:
                            curve.insert(0, first)
                    else:
                        result[key] = [first]
        if add_end:
            # repeat the last point at stop time
            for key, curve in result.items():
                if curve:
                    last = list(curve[-1])
                    if last[0] < stop:
                        last[0] = stop
                        curve.append(type(curve[-1])(last))
        return result

    # write to the database

    def _add_point(self, value, ts, measurement, field, tags):
        """create a point and append it to the write buffer

        :param value: the field value
        :param ts: the timestamp (seconds); falsy: let the server assign it
        :param measurement: the measurement name
        :param field: the field name
        :param tags: a dict of tags
        """
        point = Point(measurement).field(f'{field}', value)
        if ts:
            point.time(datetime.utcfromtimestamp(ts),
                       write_precision=self._write_precision)
        for key, val in tags.items():
            point.tag(key, val)
        self._write_buffer.append(point)

    def write(self, measurement, field, value, ts, **tags):
        """add point and flush

        :param measurement: the measurement name
        :param field: the field name
        :param value: the field value
        :param ts: the timestamp (seconds); falsy: let the server assign it
        :param tags: tags for the point
        """
        # bug fix: the arguments were previously forwarded in the wrong
        # order (_add_point expects value, ts, measurement, field, tags)
        self._add_point(value, ts, measurement, field, tags)
        self.flush()

    def flush(self):
        """flush the write buffer"""
        points = self._write_buffer
        self._write_buffer = []
        if points:
            try:
                self._write_api_write(bucket=self._bucket, record=points)
            except TypeError:
                # calling None raises TypeError when write access is missing
                if self._write_api_write is None:
                    raise PermissionError(
                        'no write access - need access="write"') from None
                raise

    def add_point(self, isfloat, value, *args):
        """add a point to the buffer

        flush must be called in order to write the buffer

        :param isfloat: whether the value is to be stored as float
        :param value: the value (None is mapped to -0.0 / '')
        :param args: forwarded to _add_point (ts, measurement, field, tags)
        """
        if isfloat:
            # make sure value is float; -0.0 encodes None and is mapped
            # back to None by the 'double' converter when reading
            self._add_point(-0.0 if value is None else float(value), *args)
        else:
            self._add_point('' if value is None else str(value), *args)


def testdb():
    """return a wrapper for the test database (hardcoded credentials)"""
    token = "zqDbTcMv9UizfdTj15Fx_6vBetkM5mXN56EE9CiDaFsh7O2FFWZ2X4VwAAmdyqZr3HbpIr5ixRju07-oQmxpXw=="
    return InfluxDBWrapper('http://pc16392:8086', token, 'linse', 'curve-test')