# ***************************************************************************** # Copyright (c) 2024 ff by the module authors # # This program is free software; you can redistribute it and/or modify it under # the terms of the GNU General Public License as published by the Free Software # Foundation; either version 2 of the License, or (at your option) any later # version. # # This program is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS # FOR A PARTICULAR PURPOSE. See the GNU General Public License for more # details. # # You should have received a copy of the GNU General Public License along with # this program; if not, write to the Free Software Foundation, Inc., # 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA # # Module authors: # Markus Zolliker # # ***************************************************************************** import re import time import logging from datetime import datetime, timezone from math import floor, ceil from influxdb_client import InfluxDBClient, BucketRetentionRules, Point from influxdb_client.client.write_api import SYNCHRONOUS DAY = 24 * 3600 YEAR = 366 * DAY SINCE_EVER = YEAR + DAY # write_precision from digits after decimal point TIME_PRECISION = ['s'] + ['ms'] * 3 + ['us'] * 3 + ['ns'] * 3 UNDEF = '' try: parse_time = datetime.fromisoformat except AttributeError: from dateutil.parser import parse as parse_time def to_time(v): return parse_time(v).timestamp() def to_iso(t): return datetime.fromtimestamp(t, timezone.utc).isoformat().replace('+00:00', 'Z') class PrettyFloat(float): """saves bandwidth when converting to JSON a lot of numbers originally have a fixed (low) number of decimal digits. as the binary representation is not exact, it might happen, that a lot of superfluous digits are transmitted: str(1/10*3) == '0.30000000000000004' str(PrettyFloat(1/10*3)) == '0.3' """ def __new__(cls, value): return None if value == '-0' else super().__new__(cls, value) def __repr__(self): return '%.15g' % self class Converters(dict): def __init__(self, datatypes): super().__init__((i, getattr(self, f"cvt_{d.split(':')[0]}")) for i, d in enumerate(datatypes) if i > 2) def as_tuple(self, row): """get selected columns as tuple""" return tuple(f(row[i]) for i, f in self.items()) cvt_double = staticmethod(PrettyFloat) @staticmethod def cvt_string(value): return value @staticmethod def cvt_long(value): return int(value) @staticmethod def cvt_dateTime(value): return to_time(value) @staticmethod def cvt_boolean(value): return value == 'true' cvt_unsigned_long = cvt_duration = cvt_long class Table(list): """a list of tuples with meta info""" def __init__(self, tags=None, key_names=(), column_names=(), rows=None): super().__init__() self.tags = tags or {} self.key_names = key_names self.column_names = column_names if rows: self[:] = rows def to_csv_rows(self, timeoffset=0, sep='\t', none='none', float_format='%.15g'): for row in self: result = ['%.15g' % (row[0] - timeoffset)] for value in row[1:]: try: result.append(float_format % value) except TypeError: if value is None: result.append(none) else: result.append(str(value).replace(sep, ' ')) yield sep.join(result) class RegExp(str): """indicates, tht this string should be treated as regexp Usage: RegExp() when used in InfluxDBWrapper.query, uses Go regexp syntax! """ def append_wildcard_filter(msg, key, names): patterns = [] for pattern in names: if isinstance(pattern, RegExp): patterns.append(pattern) else: # patterns.append('[^.]*'.join(re.escape(v) for v in pattern.split('*'))) patterns.append('.*'.join(re.escape(v) for v in pattern.split('*'))) if patterns: pattern = '|'.join(patterns) msg.append(f'|> filter(fn:(r) => r.{key} =~ /^({pattern})$/)') class CurveDict(dict): def __missing__(self, key): return [] def abs_range(start=None, stop=None): now = time.time() if start is None: # since ever start = SINCE_EVER elif start < YEAR: start = int(now + start) if stop is None: stop = int(now + YEAR) elif stop < YEAR: stop = ceil(now + stop) return start, stop def round_range(start, stop, interval=None): interval = max(1, interval or 0) start = floor(floor(start) // interval * interval) stop = ceil(ceil(stop // interval) * interval) return start, stop class InfluxDBWrapper: """Wrapper for InfluxDB API 2.0. based on nicos.services.cache.database.influxdb (work of Konstantin Kholostov ) """ _update_queue = None _write_api_write = None def __init__(self, uri=None, token=None, org=None, bucket=None, access='readonly'): """initialize :param uri: the uri for the influx DB or a filepath for a config file :param token: the token or the section in the config file :param org: the organisation :param bucket: the bucket name :param access: 'readonly', 'write' (RW) or 'create' (incl. RW) """ self._url, self._token, self._org, self._bucket = uri, token, org, bucket self._client = InfluxDBClient(url=uri, token=token, org=org) if access != 'readonly': self.enable_write_access() self._deadline = 0 self.set_time_precision(3) self.add_new_bucket(self._bucket, access == 'create') self._write_buffer = [] self._alias = {} logging.info('InfluxDBWrapper %s %s %s', self._url, self._org, self._bucket) def enable_write_access(self): self._write_api_write = self._client.write_api(write_options=SYNCHRONOUS).write def set_time_precision(self, digits): self.timedig = max(0, min(digits, 9)) self._write_precision = TIME_PRECISION[self.timedig] def disconnect(self): self.flush() self._client.close() def get_bucket_names(self): bucket_names = [] buckets = self._client.buckets_api().find_buckets().buckets for bucket in buckets: bucket_names.append(bucket.name) return bucket_names def add_new_bucket(self, bucket_name, create): if bucket_name not in self.get_bucket_names(): if not create: raise ValueError(f'unknown bucket {bucket_name}') retention_rules = BucketRetentionRules(type='expire', every_seconds=0) self._client.buckets_api().create_bucket( bucket_name=bucket_name, retention_rules=retention_rules, org=self._org) def get_measurements(self): return [r['_value'] for t in self._client.query_api().query(f""" import "influxdata/influxdb/schema" schema.measurements(bucket: "{self._bucket}")""") for r in t] def delete_measurement(self, measurement, start=None, stop=None): delete_api = self._client.delete_api() start, stop = abs_range(start, stop) if stop is None: stop = time.time() + DAY delete_api.delete(to_iso(start), to_iso(stop), f'_measurement="{measurement}"', bucket=self._bucket, org=self._org) def delete_all_measurements(self, measurements=None, start=0, stop=None): if measurements is None: measurements = self.get_measurements() for meas in measurements: self.delete_measurement(meas, start, stop) def _get_rows(self, reader, as_tuple, first_row): row = first_row tableno = row[2] try: while 1: if row[0]: first_row[:] = row return if row[2] != tableno: # table id changed: new table, store first row for next call first_row[:] = row return yield as_tuple(row) row = next(reader) if not row: raise ValueError('EMPTY') except StopIteration: first_row.clear() # indicate end of data # query the database def query(self, start=None, stop=None, interval=None, single=None, columns=None, **tags): """Returns queried data as InfluxDB tables :param start: start time (default: since ever) :param stop: end time (default: eternity = 1 year in the future) :param interval: if set an aggregation filter will be applied. This will return only the latest values per time interval in seconds. :param single: when not 0, only the last value within the interval is returned the resulting tables have all exactly one row (for any existing combinations of tags!) single < 0: return the first value instead :param columns: if given, return only these columns (in addition to '_time' and '_value') :param tags: selection criteria: =None return records independent of this tag. the obtained value will be contained in the result dicts key =[, , ...] return records where tag is one from the list. the obtained value is contained in the result dicts key =: return only records with given tag matching . the obtained value is contained in the result dicts key only if the value is an instance of RegExp or when it contains an asterisk ('*') :return: a dict of Table is an extension of list, with some meta info """ result = {} for rows, key, props in self.query_gen(start, stop, interval, single, columns, **tags): table = Table(*props, rows=rows) table.sort() result[key] = table return result def query_gen(self, start=None, stop=None, interval=None, single=None, columns=None, **tags): """Returns queried data as InfluxDB as a generator argument description: see query methods :return: an iterator of (rows, key, (tags, key_names, column_names)) remark: rows not consumed in between iteration steps are lost using this generator version does reduce memory usage """ self.flush() start, stop = round_range(*abs_range(start, stop)) msg = [f'from(bucket:"{self._bucket}")', f'|> range(start: {start}, stop: {stop})'] keylist = [] dropcols = ['_start', '_stop'] fixed_tags = {} for key, crit in tags.items(): if crit is None: keylist.append(key) continue if isinstance(crit, str): if isinstance(crit, RegExp) or '*' in crit: keylist.append(key) append_wildcard_filter(msg, key, [crit]) continue fixed_tags[key] = crit dropcols.append(key) crit = f'"{crit}"' elif isinstance(crit, bool): crit = 'true' if crit else 'false' fixed_tags[key] = crit dropcols.append(key) elif isinstance(crit, (int, float)): fixed_tags[key] = crit dropcols.append(key) else: try: keylist.append(key) append_wildcard_filter(msg, key, crit) continue except Exception: raise ValueError(f'illegal value for {key}: {crit}') msg.append(f'|> filter(fn:(r) => r.{key} == {crit})') if single: if single < 0: msg.append('|> first(column: "_time")') else: msg.append('|> last(column: "_time")') if interval: msg.append(f'|> aggregateWindow(every: {interval:g}s, fn: last, createEmpty: false)') if columns is None: msg.append(f'''|> drop(columns:["{'","'.join(dropcols)}"])''') else: columns = ['_time', '_value'] + list(columns) msg.append(f'''|> keep(columns:["{'","'.join(columns + keylist)}"])''') msg = '\n'.join(msg) logging.debug('MSG %r', msg) self.msg = msg try: reader = self._client.query_api().query_csv(msg) except Exception as e: logging.exception("error in query: %r", msg) raise # if self.debug: # def readdebug(reader): # for row in reader: # print(row) # yield row # reader = readdebug(reader) try: row = next(reader) except StopIteration: return converters = keys = column_names = None # make IDE happy while 1: header = {} if row[0]: # this is a header header[row[0]] = row for row in reader: if row: if not row[0]: break header[row[0]] = row else: return # this should not happen # we are now at the row with the column names column_names = row converters = Converters(header['#datatype']) group = header['#group'] keys = {k: None for k in keylist} for col, (name, grp) in enumerate(zip(column_names, group)): if grp != 'true': continue if columns is None or name in keys: keys[name] = col, converters.pop(col) none_keys = [k for k, v in keys.items() if v is None] if none_keys: for k in none_keys: keys.pop(k) # break row = next(reader) # we are at the first data row key_dict = {n: f(row[i]) for n, (i, f) in keys.items()} column_keys = tuple(column_names[i] for i in converters) table_properties = {**fixed_tags, **key_dict}, tuple(keys), column_keys key = tuple(key_dict.values()) row = list(row) # copy row, as it will be modified rows = self._get_rows(reader, converters.as_tuple, row) yield rows, key, table_properties # consume unused rows consumed = sum(1 for _ in rows) if consumed: logging.info('skip %r rows', consumed) if not row: # reader is at end return # write to the database def _add_point(self, measurement, field, value, ts, tags): point = Point(measurement).field(f'{field}', value) if ts: point.time(datetime.utcfromtimestamp(ts), write_precision=self._write_precision) for key, val in tags.items(): point.tag(key, val) self._write_buffer.append(point) def write(self, measurement, field, value, ts, **tags): """add point and flush""" self._add_point(measurement, field, value, ts, tags) self.flush() def flush(self): """flush write buffer""" points = self._write_buffer self._write_buffer = [] if points: try: self._write_api_write(bucket=self._bucket, record=points) except TypeError as e: if self._write_api_write is None: raise PermissionError('no write access - need access="write"') from None raise