remove Single type, rework get stream

This commit is contained in:
l_samenv
2025-03-03 08:34:13 +01:00
parent a7797cbe6c
commit 31377dc933
3 changed files with 82 additions and 56 deletions

View File

@ -119,15 +119,6 @@ class Table(list):
yield sep.join(result) yield sep.join(result)
class Single(Table):
"""a single row of a table, as a list with meta info"""
def __init__(self, tags=None, key_names=(), column_names=(), rows=None):
super().__init__(tags, key_names, column_names)
if rows:
single_row, = rows
self[:] = single_row
class RegExp(str): class RegExp(str):
"""indicates, tht this string should be treated as regexp """indicates, tht this string should be treated as regexp
@ -279,9 +270,10 @@ class InfluxDBWrapper:
:param stop: end time (default: eternity = 1 year in the future) :param stop: end time (default: eternity = 1 year in the future)
:param interval: if set an aggregation filter will be applied. This will :param interval: if set an aggregation filter will be applied. This will
return only the latest values per time interval in seconds. return only the latest values per time interval in seconds.
:param single: when True (or 1), only the last value within the interval is returned :param single: when not 0, only the last value within the interval is returned
the resulting tables have all exactly one row
(for any existing combinations of tags!) (for any existing combinations of tags!)
single=-1: return the first value instead single < 0: return the first value instead
:param columns: if given, return only these columns (in addition to '_time' and '_value') :param columns: if given, return only these columns (in addition to '_time' and '_value')
:param tags: selection criteria: :param tags: selection criteria:
<tag>=None <tag>=None
@ -300,12 +292,9 @@ class InfluxDBWrapper:
""" """
result = {} result = {}
for rows, key, props in self.query_gen(start, stop, interval, single, columns, **tags): for rows, key, props in self.query_gen(start, stop, interval, single, columns, **tags):
if single: table = Table(*props, rows=rows)
result[key] = Single(*props, rows=rows) table.sort()
else: result[key] = table
table = Table(*props, rows=rows)
table.sort()
result[key] = table
return result return result
def query_gen(self, start=None, stop=None, interval=None, single=None, columns=None, **tags): def query_gen(self, start=None, stop=None, interval=None, single=None, columns=None, **tags):

View File

@ -1,7 +1,7 @@
import time import time
from pathlib import Path from pathlib import Path
from configparser import ConfigParser from configparser import ConfigParser
from .influx import InfluxDBWrapper, abs_range, round_range, Table from sehistory.influx import InfluxDBWrapper, abs_range, round_range, Table
def fmtime(t): def fmtime(t):
@ -68,8 +68,9 @@ class SEHistory(InfluxDBWrapper):
start_row = {} start_row = {}
if add_prev: if add_prev:
prev_data = self.query(rstart - add_prev, rstart, single=1, **tags) prev_data = self.query(rstart - add_prev, rstart, single=1, **tags)
for key, first in prev_data.items(): for key, table in prev_data.items():
curve = result.get(key) curve = result.get(key)
first = table[0]
if first[1] is not None: if first[1] is not None:
if curve: if curve:
if first[0] < curve[0][0]: if first[0] < curve[0][0]:
@ -79,8 +80,7 @@ class SEHistory(InfluxDBWrapper):
else: else:
curve.insert(0, tuple(first)) curve.insert(0, tuple(first))
else: else:
result[key] = table = Table(first.tags, first.key_names, first.column_names) result[key] = table
table.append(tuple(first))
if add_end: if add_end:
self.complete(result, stop) self.complete(result, stop)
if merge: if merge:
@ -181,30 +181,64 @@ class SEHistory(InfluxDBWrapper):
reply = self.query(None, int(ts) + 1, _measurement='_stream_', _field='on', reply = self.query(None, int(ts) + 1, _measurement='_stream_', _field='on',
stream=stream, single=1, **tags) stream=stream, single=1, **tags)
if reply: if reply:
entry = sorted(reply.values(), key=lambda r: r[0])[-1] entry = sorted(reply.values(), key=lambda r: r[0][0])[-1][0]
return entry.tags.get('instrument', self.instrument), entry[0] return entry.tags.get('instrument', self.instrument), entry[0]
return None, None return None, None
def get_streams(self, instrument=None, ts=None, **tags): def get_streams(self, instrument=None, stream=None, device=None, start=None, end=None, **tags):
"""get streams for one or all instruments """get streams for one or all instruments
:param instrument: None when looking for all instruments :param instrument: None when looking for all instruments
:param ts: the time or None when now :param stream: None, a comma separated string or a sequence of streams
:param device: None or a comma separated string
:param start: None or start time. None means 'since ever'
:param end: None or end time. None means now (or more precise 'in a year')
:return: dict <stream> of <instrument> or '0' when instrument is not known :return: dict <stream> of <instrument> or '0' when instrument is not known
""" """
if ts is None: if end is not None:
ts = int(time.time()) end = end + 1
reply = self.query(None, int(ts) + 1, _measurement='_stream_', _field='on', end = int(time.time())
single=1, instrument=instrument, **tags) if stream:
if isinstance(stream, str):
stream = stream.split(',')
else:
stream = list(stream)
tags['stream'] = stream[0] if len(stream) == 1 else stream
if device:
tags['device'] = device
before = self.query(None, start, _measurement='_stream_', _field='on',
instrument=instrument, single=1, **tags)
if start == end:
during = {}
else:
during = self.query(start, end, _measurement='_stream_', _field='on',
instrument=instrument, **tags)
all_entries = {} all_entries = {}
for entry in reply.values(): # print('during', set(during))
all_entries.setdefault(entry.tags.get('stream'), []).append(entry) # for key, table in before.items():
# print(key, type(table))
for key in set(before) | set(during):
for tables in (before, during):
table = tables.get(key, ())
if table:
stream = table.tags.get('stream')
for row in table:
all_entries.setdefault(stream, []).append((row, table.tags))
result = {} result = {}
for stream, entries in all_entries.items(): for stream, entries in all_entries.items():
entry = sorted(entries, key=lambda r: r[0])[-1] current = None
print('E', stream, entry, sorted(entries, key=lambda r: r[0])) for entry in sorted(entries, key=lambda e: e[0][0]):
if entry[1]: # on=True ts, flag = entry[0]
result[stream] = entry.tags if flag:
current = entry
elif current:
(lastts, _), tags = current
if ts > lastts + 60: # at least one minute
result[stream] = tags
break
else: # or on at end
if current:
result[stream] = current[1]
return result return result
def get_experiments(self, start=None, end=None, stream=None, **tags): def get_experiments(self, start=None, end=None, stream=None, **tags):
@ -227,23 +261,23 @@ class SEHistory(InfluxDBWrapper):
previous = self.query(None, start, _measurement='_stream_', _field='on', interval=interval, previous = self.query(None, start, _measurement='_stream_', _field='on', interval=interval,
stream=stream, device=None, instrument=None, single=1, **tags) stream=stream, device=None, instrument=None, single=1, **tags)
if end is None: if end is None:
nextrow = {} nextrows = {}
else: else:
nextrow = self.query(end, None, _measurement='_stream_', _field='on', interval=interval, nextrows = self.query(end, None, _measurement='_stream_', _field='on', interval=interval,
stream=stream, device=None, instrument=None, single=-1, **tags) stream=stream, device=None, instrument=None, single=-1, **tags)
start, end = abs_range(start, end) start, end = abs_range(start, end)
inperiod = self.query(start, end, _measurement='_stream_', _field='on', interval=interval, inperiod = self.query(start, end, _measurement='_stream_', _field='on', interval=interval,
stream=stream, device=None, instrument=None, **tags) stream=stream, device=None, instrument=None, **tags)
for key, single in previous.items(): for key, rows in previous.items():
if key in inperiod: if key in inperiod:
inperiod[key].insert(0, tuple(single)) inperiod[key].insert(0, rows[0])
else: else:
inperiod[key] = Table(rows=[tuple(single)], **single.__dict__) inperiod[key] = rows
for key, table in inperiod.items(): for key, table in inperiod.items():
nextvalue = nextrow.get(key) nextrow = nextrows.get(key)
if nextvalue and not nextvalue[1]: if nextrow and not nextrow[1]:
table.append(tuple(nextvalue)) table.extend(nextrow)
stream, instrument, device = [table.tags.get(k, '') for k in ('stream', 'instrument', 'device')] stream, instrument, device = [table.tags.get(k, '') for k in ('stream', 'instrument', 'device')]
key = ('instrument', instrument) if instrument else ('stream', stream) key = ('instrument', instrument) if instrument else ('stream', stream)
elist = entries.setdefault(key, []) elist = entries.setdefault(key, [])
@ -299,15 +333,19 @@ class SEHistory(InfluxDBWrapper):
:param ts: the time or None when now :param ts: the time or None when now
:param guess: when instrument is undefined, take from previous :param guess: when instrument is undefined, take from previous
""" """
prev, prevts = self.get_instrument(stream, ts, **tags) try:
if prevts is not None: prev, prevts = self.get_instrument(stream, ts, **tags)
if prev in (None, '0'): if prevts is not None:
ts = prevts + 0.001 if prev in (None, '0'):
else:
if value == '0' and guess:
value = prev
if ts < prevts:
ts = prevts + 0.001 ts = prevts + 0.001
else:
if value == '0' and guess:
value = prev
if ts < prevts:
ts = prevts + 0.001
except Exception as e:
print(f'Exception in get_instrument {e!r}')
pass
tags['stream'] = stream tags['stream'] = stream
if value: if value:
tags['instrument'] = value tags['instrument'] = value

9
t.py
View File

@ -1,7 +1,7 @@
import time import time
import math import math
import numpy as np import numpy as np
from sehistory.seinflux import SEHistory from sehistory.seinflux import SEHistory, fmtime
from influx import RegExp from influx import RegExp
DAY = 24 * 3600 DAY = 24 * 3600
@ -16,7 +16,6 @@ crv([start], [stop], [mod.par], ['float'], [interval=...,] [add_prev=False,] [ad
""") """)
now = int(time.time()) now = int(time.time())
offset = (now // 3600) * 3600
result = {} result = {}
maxcurves = 7 maxcurves = 7
maxpoints = 7 maxpoints = 7
@ -38,7 +37,7 @@ def prt():
print('...') print('...')
else: else:
for row in crv: for row in crv:
print(round(row[0] - offset, db.timedig), row[1:]) print(fmtime(row[0]), row[1:])
def qry(*args, **kwds): def qry(*args, **kwds):
@ -97,9 +96,9 @@ def sry(prectime=False):
for end, start, _, device, stream, pset in sorted(summary): for end, start, _, device, stream, pset in sorted(summary):
if prectime: if prectime:
res = db.query(start, end, device=device, stream=stream, single=-1) res = db.query(start, end, device=device, stream=stream, single=-1)
first = int(min(t[0] for t in res.values())) first = int(min(t[0][0] for t in res.values()))
res = db.query(start, end, device=device, stream=stream, single=1) res = db.query(start, end, device=device, stream=stream, single=1)
last = math.ceil(max(t[0] for t in res.values())) last = math.ceil(max(t[0][0] for t in res.values()))
tm1 = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(first)) tm1 = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(first))
tm2 = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(last)) tm2 = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(last))
else: else: