import time from pathlib import Path from configparser import ConfigParser from sehistory.influx import InfluxDBWrapper, abs_range, round_range, Table def fmtime(t): return time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(int(t))) def summarize_tags(curves, remove_multiple=False): """summarize tags :param curves: list of curves (type Table) :param remove_multiple: True: remove non-unique values :return: dict of comma separated values """ result = {} for curve in curves: for k, v in curve.tags.items(): result.setdefault(k, set()).add(str(v)) if remove_multiple: return {k: ','.join(v) for k, v in result.items() if len(v) == 1} return {k: ','.join(v) for k, v in result.items()} class SEHistory(InfluxDBWrapper): def __init__(self, dbname=None, instrument=None, access='readonly'): self.instrument = instrument parser = ConfigParser() parser.optionxform = str parser.read([Path('~/.sehistory').expanduser()]) section = parser[dbname] if dbname else parser[parser.sections()[0]] super().__init__(*(section[k] for k in ('uri', 'token', 'org', 'bucket')), access=access) def curves(self, start=None, stop=None, measurement=('*.value', '*.target'), field='float', interval=None, add_prev=3600, add_end=False, merge=None, pivot=False, **kwds): """get curves :param start: start time (default: since ever) :param stop: end time (default: eternity = 1 year in the future) :param measurement: '.' (default: ['*.value', '*.target']) :param field: default 'float' (only numeric curves) :param interval: if given, the result is binned :param add_prev: amount of time to look back for the last previous point (default: 1 hr) :param add_end: whether to add endpoint at stop time (default: False) :param merge: None: no merge happens, else curves with the same final key are merged. 2 cases: one key (str): the name of final key. result will be a dict of of a tuple of keys: the names of the final key elements. result: dict of of Table :param pivot: sort values in to columns of one big table :param kwds: further selection criteria :return: a dict of
or where
is a list of tuples with some meta info (table.tags, table.column_names) and is a list (a single row of a table) with the same meta info when _field='float' (the default), the returned values are either a floats or None """ tags = {k: v for k, v in (('_measurement', measurement), ('_field', field)) if v is not None} tags.update(kwds) start, stop = abs_range(start, stop) rstart, rstop = round_range(start, stop, interval) if rstart < rstop: result = self.query(rstart, rstop, interval, columns=None, **tags) # result = self.query(rstart, rstop, interval, columns=['stream', 'device'], **tags) else: result = {} start_row = {} if add_prev: prev_data = self.query(rstart - add_prev, rstart, single=1, **tags) for key, table in prev_data.items(): curve = result.get(key) first = table[0] if first[1] is not None: if curve: if first[0] < curve[0][0]: if pivot: curve.insert(0, (rstart,) + tuple(first[1:])) # start_row.setdefault(key[1:], {})[key[0]] = first[1] else: curve.insert(0, tuple(first)) else: result[key] = table if add_end: self.complete(result, stop) if merge: single_merge = isinstance(merge, str) if single_merge: merge = [merge] rows = [] common_tags = summarize_tags(result.values(), True) col_keys = {} col_info = {} for key, curve in result.items(): merge_tags = {k: curve.tags.get(k, '') for k in merge} for k, v in zip(curve.key_names, key): merge_tags.setdefault(k, v) merge_key = tuple(zip(*merge_tags.items())) # (, ) info = col_info.get(merge_key) if info is None: col_idx = len(col_keys) + 1 col_keys[col_idx] = merge_key col_info[merge_key] = info = [col_idx, []] else: col_idx = info[0] info[1].append(curve) assert curve.column_names[1] == '_value' for row in curve: rows.append((row[0], row[1], col_idx)) # merged.append((merge_key, curve)) rows.sort(key=lambda x: x[0]) if pivot: header = [] for merge_key, (col_idx, curves) in col_info.items(): tags = summarize_tags(curves) primary = tags.pop(merge[0], None) primary = [primary] if primary else [] header.append(' '.join(primary + [f"{k}={v}" for k, v in tags.items() if k not in common_tags])) result = Table(common_tags, (), ('_time',) + tuple(header)) values = [0] + [None] * len(col_keys) for row in rows: col_nr = row[2] values[col_nr] = row[1] if row[0] > values[0]: values[0] = row[0] result.append(tuple(values)) elif row[0] < values[0]: raise ValueError(f'{rstart} {values[0]} {row[0]}') else: result = {} by_idx = {} for merge_key, (col_idx, curves) in col_info.items(): tags = summarize_tags(curves) primary = tags.get(merge[0], '_value') table = Table(tags, merge_key[0], ('_time', primary)) result[primary if single_merge else merge_key[1][:len(merge)]] = table by_idx[col_idx] = table for row in rows: by_idx[row[2]].append((row[0], row[1])) return result @staticmethod def complete(curve_dict, end_time=0, tag=None): """complete to end_time if tag is given, end_time is a dict of """ if tag is None: end_time_dict = {} else: end_time_dict, end_time = end_time, 0 for curve in curve_dict.values(): if len(curve): tlast, value = curve[-1] etime = end_time_dict.get(curve.tags.get(tag), end_time) if value is not None and tlast < etime: curve.append((etime, value)) def export(self, start, stop, measurement=('*.value', '*.target'), field='float', interval=None, add_prev=3600, add_end=False, timeoffset=None, none='', **tags): result = self.curves(start, stop, measurement, field, interval, add_prev, add_end, merge=('_measurement', 'device', 'stream'), pivot=True, **tags) if timeoffset is None: timeoffset = int(start) result.tags.pop('_field', None) rows = [f"# {' '.join(f'{k}={v}' for k, v in result.tags.items())}"] rows.extend(f'# col {i} {k}' for i, k in enumerate(result.column_names)) rows.extend(result.to_csv_rows(timeoffset, none=none)) rows.append('') return '\n'.join(rows) def add_float(self, value, key, tags, ts): self._add_point('.'.join(key), 'float', -0.0 if value is None else float(value), ts, tags) def add_error(self, value, key, tags, ts): self._add_point('.'.join(key), 'error', '' if value is None else str(value), ts, tags) def get_streams(self, instrument=None, stream=None, device=None, start=None, end=None, include_finished=False, **tags): """get streams for one or all instruments :param instrument: None when looking for all instruments :param stream: None, a comma separated string or a sequence of streams :param device: None or a comma separated string :param start: None or start time. None means 'since ever' :param end: None or end time. None means now (or more precise 'in a year') :param include_finished: whether finished streams are to be included (False by default) :return: dict of tags Remark: an assignment of an instrument to a stream persists, even when the stream gets off """ if end is not None: end = end + 1 if stream: if isinstance(stream, str): stream = stream.split(',') else: stream = list(stream) tags['stream'] = stream[0] if len(stream) == 1 else stream if device: tags['device'] = device before = self.query(None, start, _measurement='_stream_', _field='on', instrument=instrument, single=1, **tags) if start == end: during = {} else: during = self.query(start, end, _measurement='_stream_', _field='on', instrument=instrument, **tags) all_entries = {} # print('during', set(during)) # for key, table in before.items(): # print(key, type(table)) for key in set(before) | set(during): for tables in (before, during): table = tables.get(key, ()) if table: stream = table.tags.get('stream') for row in table: all_entries.setdefault(stream, []).append((row, table.tags)) result = {} for stream, entries in all_entries.items(): current = None instrument = None for entry in sorted(entries, key=lambda e: e[0][0]): (ts, flag), tags = entry if flag: if 'instrument' in tags: instrument = tags['instrument'] elif instrument: # keep instrument from previous assignments tags['instrument'] = instrument current = [ts, tags] elif current: if ts < current[0] + 60: # probably not containing real data current = None if current or include_finished: tags = current[1] ins = tags.get('instrument') if ins == '0': tags.pop('instrument') result[stream] = tags return result def get_experiments(self, start=None, end=None, stream=None, **tags): """get experiments (periods with the same device/stream/instrument combination) :param start: start of time period :param end: end of time period :return: dict of dict of list of [start, end] where is ('instrument', ) or ('stream', ) and is a tuple (tuple of streams, tuple of devices) """ interval = 1 gap = 600 eternity = 1e10 if start is None: previous = {} else: previous = self.query(None, start, _measurement='_stream_', _field='on', interval=interval, stream=stream, device=None, instrument=None, single=1, **tags) if end is None: nextrows = {} else: nextrows = self.query(end, None, _measurement='_stream_', _field='on', interval=interval, stream=stream, device=None, instrument=None, single=-1, **tags) start, end = abs_range(start, end) inperiod = self.query(start, end, _measurement='_stream_', _field='on', interval=interval, stream=stream, device=None, instrument=None, **tags) for key, rows in previous.items(): if key in inperiod: inperiod[key].insert(0, rows[0]) else: inperiod[key] = rows by_stream = {} # dict of [, , , ] for key, table in inperiod.items(): nextrow = nextrows.get(key) if nextrow and not nextrow[1]: table.extend(nextrow) stream, instrument, device = [table.tags.get(k, '') for k in ('stream', 'instrument', 'device')] elist = by_stream.setdefault(stream, []) for row in table: elist.append([row[0], row[1], stream, device, instrument]) by_key = {} for stream, rows in by_stream.items(): rows.sort() instrument = '0' for row in rows: if row[-1] is None: row[-1] = instrument else: instrument = row[-1] key = ('stream', stream) if instrument == '0' else ('instrument', instrument) by_key.setdefault(key, []).append(row) result = {} for key, rows in by_key.items(): devices = {} current = {} combi = None chunk = None for ts, flag, stream, device, _ in rows: ts = max(start, int(ts)) if flag: devices[stream] = device else: device = None devices.pop(device, None) devcombi = tuple(zip(*devices.items())) if devcombi != combi: print('D', devcombi, fmtime(ts)) if combi: prevend = min(ts, chunk[1]) if prevend - chunk[0] < gap: current.pop(combi) else: chunk[1] = prevend if devcombi: chunks = current.setdefault(devcombi, []) if chunks and time.localtime(chunks[-1][0])[:3] == time.localtime(ts)[:3]: # merge when started at the same day chunk = chunks[-1][1] chunk[1] = eternity print('EXTEND', devcombi, fmtime(ts)) else: print('APPEND', devcombi, fmtime(ts)) chunk = [ts, eternity] chunks.append(chunk) else: chunk = None combi = devcombi if current: result[key] = current return result def get_instrument(self, stream, ts=None, **tags): """get assigned instrument and stream state :param stream: the stream to look up :param ts: the time ot None for 'now' :param tags: tags to further filter the result :return: , where is the instrument assigned to stream at ts amd is the time when this stream was on last time before ts or None when it is off """ if ts is None: ts = int(time.time()) reply = self.query(None, int(ts) + 1, _measurement='_stream_', _field='on', stream=stream, single=1, **tags) instrument, lastts = None, None if reply: for table in sorted(reply.values(), key=lambda r: r[0][0]): ins = table.tags.get('instrument') row = table[-1] lastts = row[0] if row[1] else None if ins: instrument = None if ins == '0' else ins return instrument, lastts def set_instrument(self, stream, value, ts=None, **tags): """set stream and instrument on or off :param stream: the uri of the stream :param value: instrument, "0" to unassign the instrument or None when switching the stream off :param ts: the time or None when now """ flag = bool(value) try: previns, prevts = self.get_instrument(stream, ts, **tags) if prevts is None: if not flag: return # no change else: if previns == value and flag: return # no change if previns is None or ts < prevts: ts = prevts + 0.001 except Exception as e: print(f'Exception in get_instrument {e!r}') tags['stream'] = stream if flag: tags['instrument'] = value self._add_point('_stream_', 'on', flag, ts, tags) def add_stream(self, value, tags, key, ts): self.set_instrument(key, value, ts, **tags)