import time
import logging
from pathlib import Path
from configparser import ConfigParser
from sehistory.influx import InfluxDBWrapper, abs_range, round_range, Table

# timestamps at or above this value are treated as "no end" (see fmtime)
ETERNITY = 1e10


def fmtime(t):
    """format a timestamp as local time 'YYYY-mm-dd HH:MM:SS'

    :param t: timestamp in seconds, or None
    :return: a string of 19 characters: blanks for None, dashes for
        t >= ETERNITY, else the formatted local time
    """
    if t is None:
        return ' ' * 19
    if t >= ETERNITY:
        return '-' * 19
    return time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(int(t)))


def sameday(ts1, ts2):
    """return True when both timestamps fall on the same local calendar day"""
    return time.localtime(ts1)[:3] == time.localtime(ts2)[:3]


def summarize_tags(curves, remove_multiple=False):
    """summarize tags

    :param curves: list of curves (type Table)
    :param remove_multiple: True: remove non-unique values
    :return: dict <tag key> of comma separated values
    """
    collected = {}
    for curve in curves:
        for k, v in curve.tags.items():
            collected.setdefault(k, set()).add(str(v))
    if remove_multiple:
        # keep only the tags having one single value over all curves
        return {k: ','.join(v) for k, v in collected.items() if len(v) == 1}
    return {k: ','.join(v) for k, v in collected.items()}


class SEHistory(InfluxDBWrapper):
    """InfluxDBWrapper configured from the file ~/.config/sehistory"""

    def __init__(self, dbname=None, instrument=None, access='readonly'):
        """read the connection parameters from the config file

        :param dbname: name of the config section to use
            (default: the first section in the file)
        :param instrument: stored as self.instrument, used by add_stream
        :param access: passed on to InfluxDBWrapper
        """
        self.instrument = instrument
        parser = ConfigParser()
        parser.optionxform = str  # keep option names case sensitive
        parser.read([Path('~/.config/sehistory').expanduser()])
        section = parser[dbname] if dbname else parser[parser.sections()[0]]
        self.has_local = parser.has_section('local')
        self.instrument_by_stream = {}
        super().__init__(*(section[k] for k in ('uri', 'token', 'org', 'bucket')),
                         access=access)

    def curves(self, start=None, stop=None, measurement=('*.value', '*.target'),
               field='float', interval=None, add_prev=3600, add_end=False,
               merge=None, pivot=False, **kwds):
        """get curves

        :param start: start time (default: since ever)
        :param stop: end time (default: eternity = 1 year in the future)
        :param measurement: measurement name(s) or pattern(s)
            (default: ['*.value', '*.target'])
        :param field: default 'float' (only numeric curves)
        :param interval: if given, the result is binned
        :param add_prev: amount of time to look back for the last previous point
            (default: 1 hr)
        :param add_end: whether to add endpoint at stop time (default: False)
        :param merge: None: no merge happens, else curves with the same final key
            are merged. 2 cases:
            one key (str): the name of final key. result will be a dict
            <str> of <Table>
            a tuple of keys: the names of the final key elements. result:
            dict <tuple> of <Table>
        :param pivot: sort values in to columns of one big table
            (only evaluated when merge is given; the result is then one Table)
        :param kwds: further selection criteria
        :return: a dict <key> of <table>, or one single <table> when merge and
            pivot are given, where <table> is a list of tuples with some meta
            info (table.tags, table.column_names)

        when _field='float' (the default), the returned values are either
        floats or None
        """
        tags = {k: v for k, v in (('_measurement', measurement), ('_field', field))
                if v is not None}
        tags.update(kwds)
        start, stop = abs_range(start, stop)
        rstart, rstop = round_range(start, stop, interval)
        if rstart < rstop:
            result = self.query(rstart, rstop, interval, columns=None, **tags)
        else:
            result = {}
        if add_prev:
            # prepend the last point before rstart to each curve
            prev_data = self.query(rstart - add_prev, rstart, single=1, **tags)
            for key, table in prev_data.items():
                curve = result.get(key)
                first = table[0]
                if first[1] is not None:
                    if curve:
                        if first[0] < curve[0][0]:
                            if pivot:
                                # align all start values on one common row
                                curve.insert(0, (rstart,) + tuple(first[1:]))
                            else:
                                curve.insert(0, tuple(first))
                    else:
                        result[key] = table
        if add_end:
            self.complete(result, stop)
        if merge:
            single_merge = isinstance(merge, str)
            if single_merge:
                merge = [merge]
            rows = []  # (time, value, column index) over all curves
            common_tags = summarize_tags(result.values(), True)
            col_info = {}  # dict <merge_key> of [<column index>, <list of curves>]
            for key, curve in result.items():
                merge_tags = {k: curve.tags.get(k, '') for k in merge}
                for k, v in zip(curve.key_names, key):
                    merge_tags.setdefault(k, v)
                merge_key = tuple(zip(*merge_tags.items()))  # (<keys>, <values>)
                info = col_info.get(merge_key)
                if info is None:
                    # column 0 is the time, so data columns start at 1
                    col_info[merge_key] = info = [len(col_info) + 1, []]
                col_idx = info[0]
                info[1].append(curve)
                assert curve.column_names[1] == '_value'
                for row in curve:
                    rows.append((row[0], row[1], col_idx))
            rows.sort(key=lambda x: x[0])
            if pivot:
                header = []
                for col_idx, members in col_info.values():
                    col_tags = summarize_tags(members)
                    primary = col_tags.pop(merge[0], None)
                    primary = [primary] if primary else []
                    header.append(' '.join(
                        primary + [f"{k}={v}" for k, v in col_tags.items()
                                   if k not in common_tags]))
                result = Table(common_tags, (), ('_time',) + tuple(header))
                values = [0] + [None] * len(col_info)
                pending = False  # True when values holds a row not yet emitted
                for ts, value, col_nr in rows:
                    if ts < values[0]:
                        raise ValueError(f'{rstart} {values[0]} {ts}')
                    if ts > values[0]:
                        # time advances: the previous row is complete now.
                        # emitting it only here makes sure all values sharing
                        # a timestamp end up in the same row
                        if pending:
                            result.append(tuple(values))
                        values[0] = ts
                        pending = True
                    values[col_nr] = value
                if pending:
                    result.append(tuple(values))
            else:
                result = {}
                by_idx = {}  # dict <column index> of <Table>
                for merge_key, (col_idx, members) in col_info.items():
                    col_tags = summarize_tags(members)
                    primary = col_tags.get(merge[0], '_value')
                    final_key = primary if single_merge else merge_key[1][:len(merge)]
                    table = result.get(final_key)
                    if table is None:
                        result[final_key] = table = Table(
                            col_tags, merge_key[0], ('_time', primary))
                    else:
                        table.tags.update(col_tags)  # for previously undefined units
                    by_idx[col_idx] = table
                for row in rows:
                    by_idx[row[2]].append((row[0], row[1]))
        return result

    @staticmethod
    def complete(curve_dict, end_time=0, tag=None):
        """complete to end_time

        The last value of each non empty curve is repeated at the end time,
        when it is not None and the curve ends before that time.

        :param curve_dict: dict of curves, modified in place
        :param end_time: the end time, or, if tag is given,
            a dict <tag value> of <end time>
        :param tag: None or the name of the tag used to look up the end time
        """
        if tag is None:
            end_time_dict = {}
        else:
            end_time_dict, end_time = end_time, 0
        for curve in curve_dict.values():
            if len(curve):
                tlast, value = curve[-1]
                etime = end_time_dict.get(curve.tags.get(tag), end_time)
                if value is not None and tlast < etime:
                    curve.append((etime, value))

    def export(self, start, stop, measurement=('*.value', '*.target'), field='float',
               interval=None, add_prev=3600, add_end=False, timeoffset=None,
               none='', **tags):
        """export curves as text

        The curves are merged by ('_measurement', 'device', 'stream') into one
        pivot table. The text starts with a '#' line containing the common
        tags, followed by one '# col <index> <name>' line per column and the
        rows returned by Table.to_csv_rows.

        :param start, stop, measurement, field, interval, add_prev, add_end:
            see method curves
        :param timeoffset: passed to Table.to_csv_rows (default: int(start))
        :param none: passed to Table.to_csv_rows as the keyword argument none
        :param tags: further selection criteria
        :return: the text, ending with a newline
        """
        result = self.curves(start, stop, measurement, field, interval, add_prev,
                             add_end, merge=('_measurement', 'device', 'stream'),
                             pivot=True, **tags)
        if timeoffset is None:
            timeoffset = int(start)
        result.tags.pop('_field', None)
        rows = [f"# {' '.join(f'{k}={v}' for k, v in result.tags.items())}"]
        rows.extend(f'# col {i} {k}' for i, k in enumerate(result.column_names))
        rows.extend(result.to_csv_rows(timeoffset, none=none))
        rows.append('')
        return '\n'.join(rows)

    def add_float(self, value, key, tags, ts):
        """add a point to the field 'float' of measurement '.'.join(key)

        a value of None is stored as -0.0
        """
        self._add_point('.'.join(key), 'float',
                        -0.0 if value is None else float(value), ts, tags)

    def add_error(self, value, key, tags, ts):
        """add a point to the field 'error' of measurement '.'.join(key)

        a value of None is stored as empty string
        """
        self._add_point('.'.join(key), 'error',
                        '' if value is None else str(value), ts, tags)

    def get_streams(self, instrument=None, stream=None, device=None,
                    start=None, end=None, include_finished=False, **tags):
        """get streams for one or all instruments

        :param instrument: None when looking for all instruments
        :param stream: None, a comma separated string or a sequence of streams
        :param device: None or a comma separated string
        :param start: None or start time. None means 'since ever'
        :param end: None or end time. None means now (or more precise 'in a year')
        :param include_finished: whether finished streams are to be included
            (False by default)
        :return: dict <stream> of tags

        Remark: an assignment of an instrument to a stream persists, even when
        the stream gets off
        """
        if end is not None:
            end = end + 1
        if stream:
            if isinstance(stream, str):
                stream = stream.split(',')
            else:
                stream = list(stream)
            tags['stream'] = stream[0] if len(stream) == 1 else stream
        if device:
            tags['device'] = device
        before = self.query(None, start, _measurement='_stream_', _field='on',
                            instrument=instrument, single=1, **tags)
        if start == end:
            during = {}
        else:
            during = self.query(start, end, _measurement='_stream_', _field='on',
                                instrument=instrument, **tags)
        all_entries = {}  # dict <stream> of list of (<row>, <tags>)
        for key in set(before) | set(during):
            for tables in (before, during):
                table = tables.get(key, ())
                if table:
                    name = table.tags.get('stream')
                    for row in table:
                        all_entries.setdefault(name, []).append((row, table.tags))
        result = {}
        for name in sorted(all_entries):
            current = None  # [<time>, <tags>] of the period being on
            last = None  # the last finished period
            assigned = None  # the last instrument seen on this stream
            for (ts, flag), entry_tags in sorted(all_entries[name],
                                                 key=lambda e: e[0][0]):
                if flag:
                    if 'instrument' in entry_tags:
                        assigned = entry_tags['instrument']
                    elif assigned:  # keep instrument from previous assignments
                        entry_tags['instrument'] = assigned
                    current = [ts, entry_tags]
                elif current:
                    if ts > current[0] + 60:  # else its probably not containing real data
                        last = current
                    current = None
            # fall back to the last finished period only when the stream is
            # off, so that a stream which is still on is never replaced
            if include_finished and current is None:
                current = last
            if current:
                entry_tags = current[1]
                if entry_tags.get('instrument') == '0':
                    entry_tags.pop('instrument')
                result[name] = entry_tags
        return result

    def get_experiments(self, start=None, end=None, stream=None, **tags):
        """get experiments (periods with the same device/stream/instrument combination)

        :param start: start of time period
        :param end: end of time period
        :param stream: passed as selection criterion to the queries
        :param tags: further selection criteria
        :return: dict <key> of dict <devcombi> of list of [start, end]
            where <key> is ('instrument', <instrument>) or ('stream', <stream>)
            and <devcombi> is a tuple (tuple of streams, tuple of devices)
        """
        interval = 1
        gap = 600  # periods shorter than this are dropped, smaller gaps are joined
        if start is None:
            previous = {}
        else:
            previous = self.query(None, start, _measurement='_stream_', _field='on',
                                  interval=interval, stream=stream, device=None,
                                  instrument=None, single=1, **tags)
        if end is None:
            nextrows = {}
        else:
            nextrows = self.query(end, None, _measurement='_stream_', _field='on',
                                  interval=interval, stream=stream, device=None,
                                  instrument=None, single=-1, **tags)
        start, end = abs_range(start, end)
        inperiod = self.query(start, end, _measurement='_stream_', _field='on',
                              interval=interval, stream=stream, device=None,
                              instrument=None, **tags)
        # prepend previous to the items in period or create if not there
        for key, rows in previous.items():
            if key in inperiod:
                inperiod[key].insert(0, rows[0])
            else:
                inperiod[key] = rows
        # append items after the ones in period, ignoring keys not present in period
        # in the same go, create by_stream dict, joining tables with common stream
        by_stream = {}  # dict <stream> of [<ts>, <flag>, <stream>, <device>, <instrument>]
        for key, table in inperiod.items():
            nextrow = nextrows.get(key)
            if nextrow and not nextrow[0][1]:
                table.extend(nextrow)
            stream, instrument, device = [table.tags.get(k, '')
                                          for k in ('stream', 'instrument', 'device')]
            elist = by_stream.setdefault(stream, [])
            for row in table:
                elist.append([row[0], row[1], stream, device, instrument])
        # combine now by either instrument or stream, if instrument is undefined (='0')
        # NOTE(review): rows of several streams ending up under the same
        # instrument are appended stream by stream and are not sorted again
        # by time - confirm that this is intended
        by_key = {}
        for stream, rows in by_stream.items():
            rows.sort()
            instrument = '0'
            for row in rows:
                if not row[-1]:
                    row[-1] = instrument
                else:
                    instrument = row[-1]
                key = ('stream', stream) if instrument == '0' else ('instrument', instrument)
                by_key.setdefault(key, []).append(row)
        result = {}
        for key, rows in by_key.items():
            devices = {}  # dict <stream> of <device> for the streams being on
            current = {}  # dict <devcombi> of list of [start, end]
            prevcombi = None
            chunk = None  # the [start, end] item still open
            for ts, flag, stream, device, _ in rows:
                ts = max(start, int(ts))
                if flag:
                    devices[stream] = device
                else:
                    devices.pop(stream, None)
                devcombi = tuple(zip(*sorted(devices.items())))
                if devcombi != prevcombi:
                    if prevcombi:  # device(s) removed
                        prevend = min(ts, chunk[1])
                        if prevend - chunk[0] < gap:
                            current.pop(prevcombi)
                        else:
                            chunk[1] = prevend
                    if devcombi:  # device(s) added
                        chunks = current.setdefault(devcombi, [])
                        if chunks:
                            prevbeg, prevend = chunks[-1]
                            # merge when joining or started at the same day
                            merge = prevend + gap > ts or sameday(prevbeg, ts)
                        else:
                            merge = False
                        if merge:
                            chunk = chunks[-1]
                            chunk[1] = ETERNITY
                        else:
                            chunk = [ts, ETERNITY]
                            chunks.append(chunk)
                    else:
                        chunk = None
                    prevcombi = devcombi
            if current:
                result[key] = current
        return result

    def get_instrument(self, stream, ts=None, **tags):
        """get assigned instrument and stream state

        :param stream: the stream to look up
        :param ts: the time or None for 'now'
        :param tags: tags to further filter the result
        :return: <instrument>, <timestamp>
            where <instrument> is the instrument assigned to stream at ts
            and <timestamp> is the time when this stream was on last time
            before ts or None when it is off
        """
        if ts is None:
            ts = int(time.time())
        reply = self.query(None, int(ts) + 1, _measurement='_stream_', _field='on',
                           stream=stream, single=1, **tags)
        instrument, lastts = None, None
        # go through the tables in the order of time, so the latest one wins
        for table in sorted(reply.values(), key=lambda r: r[0][0]):
            ins = table.tags.get('instrument')
            row = table[-1]
            lastts = row[0] if row[1] else None
            if ins:
                instrument = None if ins == '0' else ins
        return instrument, lastts

    def set_instrument(self, stream, instrument, ts=None, **tags):
        """set stream and instrument on or off

        :param stream: the uri of the stream
        :param instrument: <instrument> or "0" to unassign the instrument
            or None when switching the stream off
        :param ts: the time or None when now
        :param tags: further tags to be stored with the point
        """
        flag = instrument is not None
        if flag and instrument != '0':
            # in case an real instrument is given and no instrument is assigned yet
            # we predate the assignment to the start of the stream
            try:
                previns, prevts = self.get_instrument(stream, ts, **tags)
                if prevts is not None and (previns is None or (ts or ETERNITY) < prevts):
                    ts = prevts + 0.001
            except Exception as e:
                # best effort only: the point is written anyway
                logging.warning('Exception in get_instrument: %r', e)
        tags['stream'] = stream
        if flag:
            tags['instrument'] = instrument
        self._add_point('_stream_', 'on', flag, ts, tags)

    def remove_experiment(self, stream, ts=None, **tags):
        """switch off the given stream where its last state is on

        For each table found, an 'off' point is added 1 ms after its last
        point, when that point is 'on'. The tags are taken from the table,
        without 'instrument', '_measurement' and '_field'.

        :param stream: the stream to switch off
        :param ts: None or the time up to which the last state is looked up
        :param tags: tags to further filter the streams
        """
        if ts is not None:
            ts += 1
        reply = self.query(None, ts, _measurement='_stream_', _field='on',
                           stream=stream, single=1, **tags)
        for table in reply.values():
            lastts, flag = table[-1][:2]
            if flag:
                addtags = {k: v for k, v in table.tags.items()
                           if k not in {'instrument', '_measurement', '_field'}}
                self._add_point('_stream_', 'on', False, lastts + 0.001, addtags)
        self.flush()

    def add_stream(self, value, tags, key, ts):
        """set the instrument of stream <key> to <value>

        an empty string as value is replaced by self.instrument or, when that
        is not set, by the entry of self.instrument_by_stream (default: '0')
        """
        if value == '':  # unknown instrument
            value = self.instrument or self.instrument_by_stream.get(key, '0')
        self.set_instrument(key, value, ts, **tags)