From 31377dc93356d7a1cd26333c2c72568b48970162 Mon Sep 17 00:00:00 2001 From: l_samenv Date: Mon, 3 Mar 2025 08:34:13 +0100 Subject: [PATCH] remove Single type, rework get stream --- influx.py | 23 +++--------- seinflux.py | 106 +++++++++++++++++++++++++++++++++++----------------- t.py | 9 ++--- 3 files changed, 82 insertions(+), 56 deletions(-) diff --git a/influx.py b/influx.py index 7d2dbd0..52bc374 100644 --- a/influx.py +++ b/influx.py @@ -119,15 +119,6 @@ class Table(list): yield sep.join(result) -class Single(Table): - """a single row of a table, as a list with meta info""" - def __init__(self, tags=None, key_names=(), column_names=(), rows=None): - super().__init__(tags, key_names, column_names) - if rows: - single_row, = rows - self[:] = single_row - - class RegExp(str): """indicates, tht this string should be treated as regexp @@ -279,9 +270,10 @@ class InfluxDBWrapper: :param stop: end time (default: eternity = 1 year in the future) :param interval: if set an aggregation filter will be applied. This will return only the latest values per time interval in seconds. - :param single: when True (or 1), only the last value within the interval is returned + :param single: when not 0, only the last value within the interval is returned + the resulting tables have all exactly one row (for any existing combinations of tags!) - single=-1: return the first value instead + single < 0: return the first value instead :param columns: if given, return only these columns (in addition to '_time' and '_value') :param tags: selection criteria: =None @@ -300,12 +292,9 @@ class InfluxDBWrapper: """ result = {} for rows, key, props in self.query_gen(start, stop, interval, single, columns, **tags): - if single: - result[key] = Single(*props, rows=rows) - else: - table = Table(*props, rows=rows) - table.sort() - result[key] = table + table = Table(*props, rows=rows) + table.sort() + result[key] = table return result def query_gen(self, start=None, stop=None, interval=None, single=None, columns=None, **tags): diff --git a/seinflux.py b/seinflux.py index c99bf35..17b1723 100644 --- a/seinflux.py +++ b/seinflux.py @@ -1,7 +1,7 @@ import time from pathlib import Path from configparser import ConfigParser -from .influx import InfluxDBWrapper, abs_range, round_range, Table +from sehistory.influx import InfluxDBWrapper, abs_range, round_range, Table def fmtime(t): @@ -68,8 +68,9 @@ class SEHistory(InfluxDBWrapper): start_row = {} if add_prev: prev_data = self.query(rstart - add_prev, rstart, single=1, **tags) - for key, first in prev_data.items(): + for key, table in prev_data.items(): curve = result.get(key) + first = table[0] if first[1] is not None: if curve: if first[0] < curve[0][0]: @@ -79,8 +80,7 @@ class SEHistory(InfluxDBWrapper): else: curve.insert(0, tuple(first)) else: - result[key] = table = Table(first.tags, first.key_names, first.column_names) - table.append(tuple(first)) + result[key] = table if add_end: self.complete(result, stop) if merge: @@ -181,30 +181,64 @@ class SEHistory(InfluxDBWrapper): reply = self.query(None, int(ts) + 1, _measurement='_stream_', _field='on', stream=stream, single=1, **tags) if reply: - entry = sorted(reply.values(), key=lambda r: r[0])[-1] + entry = sorted(reply.values(), key=lambda r: r[0][0])[-1][0] return entry.tags.get('instrument', self.instrument), entry[0] return None, None - def get_streams(self, instrument=None, ts=None, **tags): + def get_streams(self, instrument=None, stream=None, device=None, start=None, end=None, **tags): """get streams for one or all instruments :param instrument: None when looking for all instruments - :param ts: the time or None when now + :param stream: None, a comma separated string or a sequence of streams + :param device: None or a comma separated string + :param start: None or start time. None means 'since ever' + :param end: None or end time. None means now (or more precise 'in a year') :return: dict of or '0' when instrument is not known """ - if ts is None: - ts = int(time.time()) - reply = self.query(None, int(ts) + 1, _measurement='_stream_', _field='on', - single=1, instrument=instrument, **tags) + if end is not None: + end = end + 1 + end = int(time.time()) + if stream: + if isinstance(stream, str): + stream = stream.split(',') + else: + stream = list(stream) + tags['stream'] = stream[0] if len(stream) == 1 else stream + if device: + tags['device'] = device + before = self.query(None, start, _measurement='_stream_', _field='on', + instrument=instrument, single=1, **tags) + if start == end: + during = {} + else: + during = self.query(start, end, _measurement='_stream_', _field='on', + instrument=instrument, **tags) all_entries = {} - for entry in reply.values(): - all_entries.setdefault(entry.tags.get('stream'), []).append(entry) + # print('during', set(during)) + # for key, table in before.items(): + # print(key, type(table)) + for key in set(before) | set(during): + for tables in (before, during): + table = tables.get(key, ()) + if table: + stream = table.tags.get('stream') + for row in table: + all_entries.setdefault(stream, []).append((row, table.tags)) result = {} for stream, entries in all_entries.items(): - entry = sorted(entries, key=lambda r: r[0])[-1] - print('E', stream, entry, sorted(entries, key=lambda r: r[0])) - if entry[1]: # on=True - result[stream] = entry.tags + current = None + for entry in sorted(entries, key=lambda e: e[0][0]): + ts, flag = entry[0] + if flag: + current = entry + elif current: + (lastts, _), tags = current + if ts > lastts + 60: # at least one minute + result[stream] = tags + break + else: # or on at end + if current: + result[stream] = current[1] return result def get_experiments(self, start=None, end=None, stream=None, **tags): @@ -227,23 +261,23 @@ class SEHistory(InfluxDBWrapper): previous = self.query(None, start, _measurement='_stream_', _field='on', interval=interval, stream=stream, device=None, instrument=None, single=1, **tags) if end is None: - nextrow = {} + nextrows = {} else: - nextrow = self.query(end, None, _measurement='_stream_', _field='on', interval=interval, - stream=stream, device=None, instrument=None, single=-1, **tags) + nextrows = self.query(end, None, _measurement='_stream_', _field='on', interval=interval, + stream=stream, device=None, instrument=None, single=-1, **tags) start, end = abs_range(start, end) inperiod = self.query(start, end, _measurement='_stream_', _field='on', interval=interval, stream=stream, device=None, instrument=None, **tags) - for key, single in previous.items(): + for key, rows in previous.items(): if key in inperiod: - inperiod[key].insert(0, tuple(single)) + inperiod[key].insert(0, rows[0]) else: - inperiod[key] = Table(rows=[tuple(single)], **single.__dict__) + inperiod[key] = rows for key, table in inperiod.items(): - nextvalue = nextrow.get(key) - if nextvalue and not nextvalue[1]: - table.append(tuple(nextvalue)) + nextrow = nextrows.get(key) + if nextrow and not nextrow[1]: + table.extend(nextrow) stream, instrument, device = [table.tags.get(k, '') for k in ('stream', 'instrument', 'device')] key = ('instrument', instrument) if instrument else ('stream', stream) elist = entries.setdefault(key, []) @@ -299,15 +333,19 @@ class SEHistory(InfluxDBWrapper): :param ts: the time or None when now :param guess: when instrument is undefined, take from previous """ - prev, prevts = self.get_instrument(stream, ts, **tags) - if prevts is not None: - if prev in (None, '0'): - ts = prevts + 0.001 - else: - if value == '0' and guess: - value = prev - if ts < prevts: + try: + prev, prevts = self.get_instrument(stream, ts, **tags) + if prevts is not None: + if prev in (None, '0'): ts = prevts + 0.001 + else: + if value == '0' and guess: + value = prev + if ts < prevts: + ts = prevts + 0.001 + except Exception as e: + print(f'Exception in get_instrument {e!r}') + pass tags['stream'] = stream if value: tags['instrument'] = value diff --git a/t.py b/t.py index 07af459..c84b7cf 100644 --- a/t.py +++ b/t.py @@ -1,7 +1,7 @@ import time import math import numpy as np -from sehistory.seinflux import SEHistory +from sehistory.seinflux import SEHistory, fmtime from influx import RegExp DAY = 24 * 3600 @@ -16,7 +16,6 @@ crv([start], [stop], [mod.par], ['float'], [interval=...,] [add_prev=False,] [ad """) now = int(time.time()) -offset = (now // 3600) * 3600 result = {} maxcurves = 7 maxpoints = 7 @@ -38,7 +37,7 @@ def prt(): print('...') else: for row in crv: - print(round(row[0] - offset, db.timedig), row[1:]) + print(fmtime(row[0]), row[1:]) def qry(*args, **kwds): @@ -97,9 +96,9 @@ def sry(prectime=False): for end, start, _, device, stream, pset in sorted(summary): if prectime: res = db.query(start, end, device=device, stream=stream, single=-1) - first = int(min(t[0] for t in res.values())) + first = int(min(t[0][0] for t in res.values())) res = db.query(start, end, device=device, stream=stream, single=1) - last = math.ceil(max(t[0] for t in res.values())) + last = math.ceil(max(t[0][0] for t in res.values())) tm1 = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(first)) tm2 = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(last)) else: