add get_experiments
This commit is contained in:
87
seinflux.py
87
seinflux.py
@ -1,7 +1,11 @@
|
|||||||
import time
|
import time
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from configparser import ConfigParser
|
from configparser import ConfigParser
|
||||||
from influx import InfluxDBWrapper, abs_range, round_range, Table
|
from .influx import InfluxDBWrapper, abs_range, round_range, Table
|
||||||
|
|
||||||
|
|
||||||
|
def fmtime(t):
|
||||||
|
return time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(int(t)))
|
||||||
|
|
||||||
|
|
||||||
def summarize_tags(curves, remove_multiple=False):
|
def summarize_tags(curves, remove_multiple=False):
|
||||||
@ -127,7 +131,6 @@ class SEHistory(InfluxDBWrapper):
|
|||||||
by_idx = {}
|
by_idx = {}
|
||||||
for merge_key, (col_idx, curves) in col_info.items():
|
for merge_key, (col_idx, curves) in col_info.items():
|
||||||
tags = summarize_tags(curves)
|
tags = summarize_tags(curves)
|
||||||
print('TAGS', tags)
|
|
||||||
primary = tags.get(merge[0], '_value')
|
primary = tags.get(merge[0], '_value')
|
||||||
table = Table(tags, merge_key[0], ('_time', primary))
|
table = Table(tags, merge_key[0], ('_time', primary))
|
||||||
result[primary if single_merge else merge_key[1][:len(merge)]] = table
|
result[primary if single_merge else merge_key[1][:len(merge)]] = table
|
||||||
@ -199,16 +202,19 @@ class SEHistory(InfluxDBWrapper):
|
|||||||
result = {}
|
result = {}
|
||||||
for stream, entries in all_entries.items():
|
for stream, entries in all_entries.items():
|
||||||
entry = sorted(entries, key=lambda r: r[0])[-1]
|
entry = sorted(entries, key=lambda r: r[0])[-1]
|
||||||
|
print('E', stream, entry, sorted(entries, key=lambda r: r[0]))
|
||||||
if entry[1]: # on=True
|
if entry[1]: # on=True
|
||||||
result[stream] = entry.tags
|
result[stream] = entry.tags
|
||||||
return result
|
return result
|
||||||
|
|
||||||
def get_experiments(self, start=None, end=None, **tags):
|
def get_experiments(self, start=None, end=None, stream=None, **tags):
|
||||||
"""get experiments (periods with the same device/stream/instrument combination)
|
"""get experiments (periods with the same device/stream/instrument combination)
|
||||||
|
|
||||||
:param start: start of time period
|
:param start: start of time period
|
||||||
:param end: end of time period
|
:param end: end of time period
|
||||||
:return: list of tuple((<start>, <end>), ('instrument' or 'stream', <value>), dict <stream> of <device>)
|
:return: dict <key> of dict <device combi> of list of [start, end]
|
||||||
|
where <key> is ('instrument', <instrument>) or ('stream', <stream>)
|
||||||
|
and <device combi> is a tuple (tuple of streams, tuple of devices)
|
||||||
"""
|
"""
|
||||||
|
|
||||||
interval = 1
|
interval = 1
|
||||||
@ -219,15 +225,15 @@ class SEHistory(InfluxDBWrapper):
|
|||||||
previous = {}
|
previous = {}
|
||||||
else:
|
else:
|
||||||
previous = self.query(None, start, _measurement='_stream_', _field='on', interval=interval,
|
previous = self.query(None, start, _measurement='_stream_', _field='on', interval=interval,
|
||||||
stream=None, device=None, instrument=None, single=1)
|
stream=stream, device=None, instrument=None, single=1, **tags)
|
||||||
if end is None:
|
if end is None:
|
||||||
nextrow = {}
|
nextrow = {}
|
||||||
else:
|
else:
|
||||||
nextrow = self.query(end, None, _measurement='_stream_', _field='on', interval=interval,
|
nextrow = self.query(end, None, _measurement='_stream_', _field='on', interval=interval,
|
||||||
stream=None, device=None, instrument=None, single=-1)
|
stream=stream, device=None, instrument=None, single=-1, **tags)
|
||||||
start, end = abs_range(start, end)
|
start, end = abs_range(start, end)
|
||||||
inperiod = self.query(start, end, _measurement='_stream_', _field='on', interval=interval,
|
inperiod = self.query(start, end, _measurement='_stream_', _field='on', interval=interval,
|
||||||
stream=None, device=None, instrument=None)
|
stream=stream, device=None, instrument=None, **tags)
|
||||||
for key, single in previous.items():
|
for key, single in previous.items():
|
||||||
if key in inperiod:
|
if key in inperiod:
|
||||||
inperiod[key].insert(0, tuple(single))
|
inperiod[key].insert(0, tuple(single))
|
||||||
@ -242,42 +248,47 @@ class SEHistory(InfluxDBWrapper):
|
|||||||
key = ('instrument', instrument) if instrument else ('stream', stream)
|
key = ('instrument', instrument) if instrument else ('stream', stream)
|
||||||
elist = entries.setdefault(key, [])
|
elist = entries.setdefault(key, [])
|
||||||
for row in table:
|
for row in table:
|
||||||
|
print('S', stream, device, key, row[1], fmtime(row[0]))
|
||||||
elist.append(row[:2] + (stream, device))
|
elist.append(row[:2] + (stream, device))
|
||||||
result = []
|
result = {} # dict <instrument> of dict <device combi> of list of [start, end]
|
||||||
for key, rows in entries.items():
|
for key, rows in entries.items():
|
||||||
rows.sort()
|
rows.sort()
|
||||||
current = {} # dict <stream> of [<device>, <start>, <end>]
|
devices = {}
|
||||||
chunks = [current]
|
|
||||||
for ts, flag, stream, device in rows:
|
|
||||||
if flag:
|
|
||||||
prev = current.get(stream)
|
|
||||||
if prev:
|
|
||||||
if device == prev[0] and prev[1] + gap < ts < prev[2] + gap:
|
|
||||||
prev[2] = ts
|
|
||||||
continue
|
|
||||||
current = {}
|
current = {}
|
||||||
chunks.append(current)
|
combi = None
|
||||||
current[stream] = [device or stream, ts, eternity]
|
chunk = None
|
||||||
|
for ts, flag, stream, device in rows:
|
||||||
|
ts = max(start, int(ts))
|
||||||
|
if flag:
|
||||||
|
devices[stream] = device
|
||||||
else:
|
else:
|
||||||
prev = current.get(stream)
|
device = None
|
||||||
if prev:
|
devices.pop(device, None)
|
||||||
prev[2] = ts
|
devcombi = tuple(zip(*devices.items()))
|
||||||
prevexpt = [0, 0]
|
if devcombi != combi:
|
||||||
prevdevices = {} # dict <stream> of <device>
|
print('D', devcombi, fmtime(ts))
|
||||||
for chunk in chunks:
|
if combi:
|
||||||
if chunk:
|
prevend = min(ts, chunk[1])
|
||||||
devices = {k: v[0] for k, v in chunk.items() if v[0]}
|
if prevend - chunk[0] < gap:
|
||||||
beg = min(t[1] for t in chunk.values())
|
current.pop(combi)
|
||||||
if beg > prevexpt[0] + gap or any(v != devices.get(k) for k, v in prevdevices.items()):
|
else:
|
||||||
prevdevices = devices
|
chunk[1] = prevend
|
||||||
if prevexpt[1] > beg:
|
if devcombi:
|
||||||
prevexpt[1] = beg # shorten previous
|
chunks = current.setdefault(devcombi, [])
|
||||||
prevexpt = [beg, max(t[2] for t in chunk.values()), key, devices]
|
if chunks and time.localtime(chunks[-1][0])[:3] == time.localtime(ts)[:3]:
|
||||||
result.append(prevexpt)
|
# merge when started at the same day
|
||||||
result.sort()
|
chunk = chunks[-1][1]
|
||||||
for expt in result:
|
chunk[1] = eternity
|
||||||
if expt[-1] == eternity:
|
print('EXTEND', devcombi, fmtime(ts))
|
||||||
expt[-1] = time.time()
|
else:
|
||||||
|
print('APPEND', devcombi, fmtime(ts))
|
||||||
|
chunk = [ts, eternity]
|
||||||
|
chunks.append(chunk)
|
||||||
|
else:
|
||||||
|
chunk = None
|
||||||
|
combi = devcombi
|
||||||
|
if current:
|
||||||
|
result[key] = current
|
||||||
return result
|
return result
|
||||||
|
|
||||||
def set_instrument(self, stream, value, ts=None, guess=True, **tags):
|
def set_instrument(self, stream, value, ts=None, guess=True, **tags):
|
||||||
|
Reference in New Issue
Block a user