import time from pathlib import Path from configparser import ConfigParser from influx import InfluxDBWrapper, abs_range, round_range, Table def summarize_tags(curves, remove_multiple=False): """summarize tags :param curves: list of curves (type Table) :param remove_multiple: True: remove non-unique values :return: dict of comma separated values """ result = {} for curve in curves: for k, v in curve.tags.items(): result.setdefault(k, set()).add(str(v)) if remove_multiple: return {k: ','.join(v) for k, v in result.items() if len(v) == 1} return {k: ','.join(v) for k, v in result.items()} class SEHistory(InfluxDBWrapper): def __init__(self, dbname=None, instrument=None, access='readonly'): self.instrument = instrument parser = ConfigParser() parser.optionxform = str parser.read([Path('~/.sehistory').expanduser()]) section = parser[dbname] if dbname else parser[parser.sections()[0]] super().__init__(*(section[k] for k in ('uri', 'token', 'org', 'bucket')), access=access) def curves(self, start=None, stop=None, measurement=('*.value', '*.target'), field='float', interval=None, add_prev=3600, add_end=False, merge=None, pivot=False, **kwds): """get curves :param start: start time (default: since ever) :param stop: end time (default: eternity = 1 year in the future) :param measurement: '.' (default: ['*.value', '*.target']) :param field: default 'float' (only numeric curves) :param interval: if given, the result is binned :param add_prev: amount of time to look back for the last previous point (default: 1 hr) :param add_end: whether to add endpoint at stop time (default: False) :param merge: None: no merge happens, else curves with the same final key are merged. 2 cases: one key (str): the name of final key. result will be a dict of of a tuple of keys: the names of the final key elements. result: dict of of Table :param pivot: sort values in to columns of one big table :param kwds: further selection criteria :return: a dict of
or where
is a list of tuples with some meta info (table.tags, table.column_names) and is a list (a single row of a table) with the same meta info when _field='float' (the default), the returned values are either a floats or None """ tags = {k: v for k, v in (('_measurement', measurement), ('_field', field)) if v is not None} tags.update(kwds) start, stop = abs_range(start, stop) rstart, rstop = round_range(start, stop, interval) if rstart < rstop: result = self.query(rstart, rstop, interval, columns=None, **tags) # result = self.query(rstart, rstop, interval, columns=['stream', 'device'], **tags) else: result = {} start_row = {} if add_prev: prev_data = self.query(rstart - add_prev, rstart, single=1, **tags) for key, first in prev_data.items(): curve = result.get(key) if first[1] is not None: if curve: if first[0] < curve[0][0]: if pivot: curve.insert(0, (rstart,) + tuple(first[1:])) # start_row.setdefault(key[1:], {})[key[0]] = first[1] else: curve.insert(0, tuple(first)) else: result[key] = table = Table(first.tags, first.key_names, first.column_names) table.append(tuple(first)) if add_end: self.complete(result, stop) if merge: single_merge = isinstance(merge, str) if single_merge: merge = [merge] rows = [] common_tags = summarize_tags(result.values(), True) col_keys = {} col_info = {} for key, curve in result.items(): merge_tags = {k: curve.tags.get(k, '') for k in merge} for k, v in zip(curve.key_names, key): merge_tags.setdefault(k, v) merge_key = tuple(zip(*merge_tags.items())) # (, ) info = col_info.get(merge_key) if info is None: col_idx = len(col_keys) + 1 col_keys[col_idx] = merge_key col_info[merge_key] = info = [col_idx, []] else: col_idx = info[0] info[1].append(curve) assert curve.column_names[1] == '_value' for row in curve: rows.append((row[0], row[1], col_idx)) # merged.append((merge_key, curve)) rows.sort(key=lambda x: x[0]) if pivot: header = [] for merge_key, (col_idx, curves) in col_info.items(): tags = summarize_tags(curves) primary = tags.pop(merge[0], None) primary = [primary] if primary else [] header.append(' '.join(primary + [f"{k}={v}" for k, v in tags.items() if k not in common_tags])) result = Table(common_tags, (), ('_time',) + tuple(header)) values = [0] + [None] * len(col_keys) for row in rows: col_nr = row[2] values[col_nr] = row[1] if row[0] > values[0]: values[0] = row[0] result.append(tuple(values)) elif row[0] < values[0]: raise ValueError(f'{rstart} {values[0]} {row[0]}') else: result = {} by_idx = {} for merge_key, (col_idx, curves) in col_info.items(): tags = summarize_tags(curves) print('TAGS', tags) primary = tags.get(merge[0], '_value') table = Table(tags, merge_key[0], ('_time', primary)) result[primary if single_merge else merge_key[1][:len(merge)]] = table by_idx[col_idx] = table for row in rows: by_idx[row[2]].append((row[0], row[1])) return result @staticmethod def complete(curve_dict, end_time=0, tag=None): """complete to end_time if tag is given, end_time is a dict of """ if tag is None: end_time_dict = {} else: end_time_dict, end_time = end_time, 0 for curve in curve_dict.values(): if len(curve): tlast, value = curve[-1] etime = end_time_dict.get(curve.tags.get(tag), end_time) if value is not None and tlast < etime: curve.append((etime, value)) def export(self, start, stop, measurement=('*.value', '*.target'), field='float', interval=None, add_prev=3600, add_end=False, timeoffset=None, none='', **tags): result = self.curves(start, stop, measurement, field, interval, add_prev, add_end, merge=('_measurement', 'device', 'stream'), pivot=True, **tags) if timeoffset is None: timeoffset = int(start) result.tags.pop('_field', None) rows = [f"# {' '.join(f'{k}={v}' for k, v in result.tags.items())}"] rows.extend(f'# col {i} {k}' for i, k in enumerate(result.column_names)) rows.extend(result.to_csv_rows(timeoffset, none=none)) rows.append('') return '\n'.join(rows) def add_float(self, value, key, tags, ts): self._add_point('.'.join(key), 'float', -0.0 if value is None else float(value), ts, tags) def add_error(self, value, key, tags, ts): self._add_point('.'.join(key), 'error', '' if value is None else str(value), ts, tags) def get_instrument(self, stream, ts=None, **tags): if ts is None: ts = int(time.time()) reply = self.query(None, int(ts) + 1, _measurement='_stream_', _field='on', stream=stream, single=1, **tags) if reply: entry = sorted(reply.values(), key=lambda r: r[0])[-1] return entry.tags.get('instrument', self.instrument), entry[0] return None, None def get_streams(self, instrument=None, ts=None, **tags): """get streams for one or all instruments :param instrument: None when looking for all instruments :param ts: the time or None when now :return: dict of or '0' when instrument is not known """ if ts is None: ts = int(time.time()) reply = self.query(None, int(ts) + 1, _measurement='_stream_', _field='on', single=1, instrument=instrument, **tags) all_entries = {} for entry in reply.values(): all_entries.setdefault(entry.tags.get('stream'), []).append(entry) result = {} for stream, entries in all_entries.items(): entry = sorted(entries, key=lambda r: r[0])[-1] if entry[1]: # on=True result[stream] = entry.tags return result def get_experiments(self, start=None, end=None, **tags): """get experiments (periods with the same device/stream/instrument combination) :param start, end: the time period :return: list of tuple(, , ) """ interval = 1 gap = 600 entries = {} end = time.time() + 60 for rows, key, (tags, _, _) in self.query_gen( start, end, _measurement='_stream_', _field='on', interval=interval, stream=None, device=None, instrument=None): stream = tags.get('stream') instrument = tags.get('instrument') device = tags.get('device') elist = entries.setdefault(instrument or stream, []) for row in rows: elist.append(row[:2] + (stream, device)) result = [] for ins, rows in entries.items(): rows.sort() current = {} # dict of [, , ] chunks = [current] for ts, flag, stream, device in rows: if flag: prev = current.get(stream) if prev: if device == prev[0] and prev[1] + gap < ts < prev[2] + gap: prev[2] = ts continue current = {} chunks.append(current) current[stream] = [device or stream, ts, end] else: prev = current.get(stream) if prev: prev[2] = ts prevchange = 0 prevdevices = {} # dict of for chunk in chunks: if chunk: devices = {k: v[0] for k, v in chunk.items() if v[0]} start = min(t[1] for t in chunk.values()) if start > prevchange + gap or any(v != devices.get(k) for k, v in prevdevices.items()): prevchange = start prevdevices = devices result.append((start, max(t[2] for t in chunk.values()), ins, devices)) result.sort() for start, end, ins, devices in result: print(' .. '.join(time.strftime('%Y-%m-%d-%H:%M', time.localtime(t)) for t in (start, end)), ins, devices) return result def set_instrument(self, stream, value, ts=None, guess=True, **tags): """set stream and instrument on or off :param stream: the uri of the stream :param value: instrument, "0" when unknown or None when switching to off :param ts: the time or None when now :param guess: when instrument is undefined, take from previous """ prev, prevts = self.get_instrument(stream, ts, **tags) if prevts is not None: if prev in (None, '0'): ts = prevts + 0.001 else: if value == '0' and guess: value = prev if ts < prevts: ts = prevts + 0.001 tags['stream'] = stream if value: tags['instrument'] = value flag = True else: tags['instrument'] = prev or '0' flag = False self._add_point('_stream_', 'on', flag, ts, tags) def add_stream(self, value, tags, key, ts): self.set_instrument(key, value, ts, **tags)