From a7797cbe6cdc654b556f0fe0094be16476ce0162 Mon Sep 17 00:00:00 2001 From: Markus Zolliker Date: Thu, 27 Feb 2025 10:42:15 +0100 Subject: [PATCH] add get_experiments --- seinflux.py | 85 ++++++++++++++++++++++++++++++----------------------- t.py | 2 +- 2 files changed, 49 insertions(+), 38 deletions(-) diff --git a/seinflux.py b/seinflux.py index 981988f..c99bf35 100644 --- a/seinflux.py +++ b/seinflux.py @@ -1,7 +1,11 @@ import time from pathlib import Path from configparser import ConfigParser -from influx import InfluxDBWrapper, abs_range, round_range, Table +from .influx import InfluxDBWrapper, abs_range, round_range, Table + + +def fmtime(t): + return time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(int(t))) def summarize_tags(curves, remove_multiple=False): @@ -127,7 +131,6 @@ class SEHistory(InfluxDBWrapper): by_idx = {} for merge_key, (col_idx, curves) in col_info.items(): tags = summarize_tags(curves) - print('TAGS', tags) primary = tags.get(merge[0], '_value') table = Table(tags, merge_key[0], ('_time', primary)) result[primary if single_merge else merge_key[1][:len(merge)]] = table @@ -199,16 +202,19 @@ class SEHistory(InfluxDBWrapper): result = {} for stream, entries in all_entries.items(): entry = sorted(entries, key=lambda r: r[0])[-1] + print('E', stream, entry, sorted(entries, key=lambda r: r[0])) if entry[1]: # on=True result[stream] = entry.tags return result - def get_experiments(self, start=None, end=None, **tags): + def get_experiments(self, start=None, end=None, stream=None, **tags): """get experiments (periods with the same device/stream/instrument combination) :param start: start of time period :param end: end of time period - :return: list of tuple((, ), ('instrument' or 'stream', ), dict of ) + :return: dict of dict of list of [start, end] + where is ('instrument', ) or ('stream', ) + and is a tuple (tuple of streams, tuple of devices) """ interval = 1 @@ -219,15 +225,15 @@ class SEHistory(InfluxDBWrapper): previous = {} else: previous = self.query(None, start, _measurement='_stream_', _field='on', interval=interval, - stream=None, device=None, instrument=None, single=1) + stream=stream, device=None, instrument=None, single=1, **tags) if end is None: nextrow = {} else: nextrow = self.query(end, None, _measurement='_stream_', _field='on', interval=interval, - stream=None, device=None, instrument=None, single=-1) + stream=stream, device=None, instrument=None, single=-1, **tags) start, end = abs_range(start, end) inperiod = self.query(start, end, _measurement='_stream_', _field='on', interval=interval, - stream=None, device=None, instrument=None) + stream=stream, device=None, instrument=None, **tags) for key, single in previous.items(): if key in inperiod: inperiod[key].insert(0, tuple(single)) @@ -242,42 +248,47 @@ class SEHistory(InfluxDBWrapper): key = ('instrument', instrument) if instrument else ('stream', stream) elist = entries.setdefault(key, []) for row in table: + print('S', stream, device, key, row[1], fmtime(row[0])) elist.append(row[:2] + (stream, device)) - result = [] + result = {} # dict of dict of list of [start, end] for key, rows in entries.items(): rows.sort() - current = {} # dict of [, , ] - chunks = [current] + devices = {} + current = {} + combi = None + chunk = None for ts, flag, stream, device in rows: + ts = max(start, int(ts)) if flag: - prev = current.get(stream) - if prev: - if device == prev[0] and prev[1] + gap < ts < prev[2] + gap: - prev[2] = ts - continue - current = {} - chunks.append(current) - current[stream] = [device or stream, ts, eternity] + devices[stream] = device else: - prev = current.get(stream) - if prev: - prev[2] = ts - prevexpt = [0, 0] - prevdevices = {} # dict of - for chunk in chunks: - if chunk: - devices = {k: v[0] for k, v in chunk.items() if v[0]} - beg = min(t[1] for t in chunk.values()) - if beg > prevexpt[0] + gap or any(v != devices.get(k) for k, v in prevdevices.items()): - prevdevices = devices - if prevexpt[1] > beg: - prevexpt[1] = beg # shorten previous - prevexpt = [beg, max(t[2] for t in chunk.values()), key, devices] - result.append(prevexpt) - result.sort() - for expt in result: - if expt[-1] == eternity: - expt[-1] = time.time() + device = None + devices.pop(device, None) + devcombi = tuple(zip(*devices.items())) + if devcombi != combi: + print('D', devcombi, fmtime(ts)) + if combi: + prevend = min(ts, chunk[1]) + if prevend - chunk[0] < gap: + current.pop(combi) + else: + chunk[1] = prevend + if devcombi: + chunks = current.setdefault(devcombi, []) + if chunks and time.localtime(chunks[-1][0])[:3] == time.localtime(ts)[:3]: + # merge when started at the same day + chunk = chunks[-1][1] + chunk[1] = eternity + print('EXTEND', devcombi, fmtime(ts)) + else: + print('APPEND', devcombi, fmtime(ts)) + chunk = [ts, eternity] + chunks.append(chunk) + else: + chunk = None + combi = devcombi + if current: + result[key] = current return result def set_instrument(self, stream, value, ts=None, guess=True, **tags): diff --git a/t.py b/t.py index cc6a618..07af459 100644 --- a/t.py +++ b/t.py @@ -1,7 +1,7 @@ import time import math import numpy as np -from seinflux import SEHistory +from sehistory.seinflux import SEHistory from influx import RegExp DAY = 24 * 3600