diff --git a/seinflux.py b/seinflux.py index 17b1723..fdabf27 100644 --- a/seinflux.py +++ b/seinflux.py @@ -175,16 +175,6 @@ class SEHistory(InfluxDBWrapper): def add_error(self, value, key, tags, ts): self._add_point('.'.join(key), 'error', '' if value is None else str(value), ts, tags) - def get_instrument(self, stream, ts=None, **tags): - if ts is None: - ts = int(time.time()) - reply = self.query(None, int(ts) + 1, _measurement='_stream_', _field='on', - stream=stream, single=1, **tags) - if reply: - entry = sorted(reply.values(), key=lambda r: r[0][0])[-1][0] - return entry.tags.get('instrument', self.instrument), entry[0] - return None, None - def get_streams(self, instrument=None, stream=None, device=None, start=None, end=None, **tags): """get streams for one or all instruments @@ -193,11 +183,12 @@ class SEHistory(InfluxDBWrapper): :param device: None or a comma separated string :param start: None or start time. None means 'since ever' :param end: None or end time. None means now (or more precise 'in a year') - :return: dict of or '0' when instrument is not known + :return: dict of tags + + Remark: an assignment of an instrument to a stream persists, even when the stream gets off """ if end is not None: end = end + 1 - end = int(time.time()) if stream: if isinstance(stream, str): stream = stream.split(',') @@ -227,18 +218,26 @@ class SEHistory(InfluxDBWrapper): result = {} for stream, entries in all_entries.items(): current = None + instrument = None for entry in sorted(entries, key=lambda e: e[0][0]): - ts, flag = entry[0] + (ts, flag), tags = entry if flag: - current = entry + if 'instrument' in tags: + instrument = tags['instrument'] + elif instrument: + # keep instrument from previous assignments + tags['instrument'] = instrument + current = [ts, tags] elif current: - (lastts, _), tags = current - if ts > lastts + 60: # at least one minute - result[stream] = tags - break - else: # or on at end - if current: - result[stream] = current[1] + if ts < current[0] + 60: + # probably not containing real data + current = None + if current: + tags = current[1] + ins = tags.get('instrument') + if ins == '0': + tags.pop('instrument') + result[stream] = tags return result def get_experiments(self, start=None, end=None, stream=None, **tags): @@ -254,7 +253,6 @@ class SEHistory(InfluxDBWrapper): interval = 1 gap = 600 eternity = 1e10 - entries = {} if start is None: previous = {} else: @@ -274,24 +272,34 @@ class SEHistory(InfluxDBWrapper): else: inperiod[key] = rows + by_stream = {} # dict of [, , , ] for key, table in inperiod.items(): nextrow = nextrows.get(key) if nextrow and not nextrow[1]: table.extend(nextrow) stream, instrument, device = [table.tags.get(k, '') for k in ('stream', 'instrument', 'device')] - key = ('instrument', instrument) if instrument else ('stream', stream) - elist = entries.setdefault(key, []) + elist = by_stream.setdefault(stream, []) for row in table: - print('S', stream, device, key, row[1], fmtime(row[0])) - elist.append(row[:2] + (stream, device)) - result = {} # dict of dict of list of [start, end] - for key, rows in entries.items(): + print('S', stream, instrument, device, row[1], fmtime(row[0])) + elist.append([row[0], row[1], stream, device, instrument]) + by_key = {} + for stream, rows in by_stream.items(): rows.sort() + instrument = '0' + for row in rows: + if row[-1] is None: + row[-1] = instrument + else: + instrument = row[-1] + key = ('stream', stream) if instrument == '0' else ('instrument', instrument) + by_key.setdefault(key, []).append(rows) + result = {} + for key, rows in by_key.items(): devices = {} current = {} combi = None chunk = None - for ts, flag, stream, device in rows: + for ts, flag, stream, device, _ in rows: ts = max(start, int(ts)) if flag: devices[stream] = device @@ -325,34 +333,54 @@ class SEHistory(InfluxDBWrapper): result[key] = current return result - def set_instrument(self, stream, value, ts=None, guess=True, **tags): + def get_instrument(self, stream, ts=None, **tags): + """get assigned instrument and stream state + + :param stream: the stream to look up + :param ts: the time ot None for 'now' + :param tags: tags to further filter the result + + :return: , + where is the instrument assigned to stream at ts + amd is the time when this stream was on last time before ts or None when it is off + """ + if ts is None: + ts = int(time.time()) + reply = self.query(None, int(ts) + 1, _measurement='_stream_', _field='on', + stream=stream, single=1, **tags) + instrument, lastts = None, None + if reply: + for table in sorted(reply.values(), key=lambda r: r[0][0]): + ins = table.tags.get('instrument') + row = table[-1] + lastts = row[0] if row[1] else None + if ins: + instrument = None if ins == '0' else ins + return instrument, lastts + + def set_instrument(self, stream, value, ts=None, **tags): """set stream and instrument on or off :param stream: the uri of the stream - :param value: instrument, "0" when unknown or None when switching to off + :param value: instrument, "0" to unassign the instrument or None when switching the stream off :param ts: the time or None when now - :param guess: when instrument is undefined, take from previous """ + flag = bool(value) try: - prev, prevts = self.get_instrument(stream, ts, **tags) - if prevts is not None: - if prev in (None, '0'): + previns, prevts = self.get_instrument(stream, ts, **tags) + if prevts is None: + if not flag: + return # no change + else: + if previns == value and flag: + return # no change + if previns is None or ts < prevts: ts = prevts + 0.001 - else: - if value == '0' and guess: - value = prev - if ts < prevts: - ts = prevts + 0.001 except Exception as e: print(f'Exception in get_instrument {e!r}') - pass tags['stream'] = stream - if value: + if flag: tags['instrument'] = value - flag = True - else: - tags['instrument'] = prev or '0' - flag = False self._add_point('_stream_', 'on', flag, ts, tags) def add_stream(self, value, tags, key, ts): diff --git a/t.py b/t.py index c84b7cf..f2a2793 100644 --- a/t.py +++ b/t.py @@ -17,11 +17,9 @@ crv([start], [stop], [mod.par], ['float'], [interval=...,] [add_prev=False,] [ad now = int(time.time()) result = {} -maxcurves = 7 -maxpoints = 7 -def prt(): +def prt(maxpoints=7, maxcurves=7): for i, (key, curve) in enumerate(result.items()): if i > maxcurves: print('--- ...') @@ -40,21 +38,20 @@ def prt(): print(fmtime(row[0]), row[1:]) -def qry(*args, **kwds): +def qry(*args, maxpoints=7, maxcurves=7, **kwds): result.clear() result.update(db.query(*args, **kwds)) - print('PRINT') - prt() + prt(maxpoints, maxcurves) -def crv(*args, **kwds): +def crv(*args, maxpoints=7, maxcurves=7, **kwds): result.clear() res = db.curves(*args, **kwds) if isinstance(res, list): result[()] = res else: result.update(res) - prt() + prt(maxpoints, maxcurves) def sry(prectime=False):