Files
sehistory/seinflux.py
Markus Zolliker e2122fb3a0 import get_experiments
- in case of localhost, remove .psi.ch from normalized host name
+ fix import in t.py
+ allow boolean as selection criterion
2025-02-26 13:43:39 +01:00

313 lines
14 KiB
Python

import time
from pathlib import Path
from configparser import ConfigParser
from influx import InfluxDBWrapper, abs_range, round_range, Table
def summarize_tags(curves, remove_multiple=False):
    """summarize tags

    Collects, per tag key, the set of distinct values (converted to str)
    found in the ``tags`` dict of all given curves.

    :param curves: iterable of curves (type Table); only ``curve.tags`` is read
    :param remove_multiple: True: drop keys which have more than one distinct value
    :return: dict <key> of comma separated values; the values are sorted, so
        the result does not depend on set iteration order
    """
    collected = {}
    for curve in curves:
        for key, value in curve.tags.items():
            collected.setdefault(key, set()).add(str(value))
    if remove_multiple:
        # exactly one value per remaining key: nothing to order
        return {key: ','.join(values) for key, values in collected.items() if len(values) == 1}
    # sorted: joining a set directly gives an order that varies between runs
    return {key: ','.join(sorted(values)) for key, values in collected.items()}
class SEHistory(InfluxDBWrapper):
    """History database access built on top of InfluxDBWrapper.

    Adds: connection setup from the config file ``~/.sehistory``, retrieval of
    curves with optional merging / pivoting, a text export, and bookkeeping of
    streams and instruments (measurement ``_stream_``, field ``on``).
    """

    def __init__(self, dbname=None, instrument=None, access='readonly'):
        """connect using the settings from ~/.sehistory

        :param dbname: name of the section in the config file
            (default: the first section of the file)
        :param instrument: fallback instrument, used by get_instrument when
            the stored entry has no 'instrument' tag
        :param access: passed on to InfluxDBWrapper
        """
        self.instrument = instrument
        parser = ConfigParser()
        parser.optionxform = str  # keep option names case sensitive
        parser.read([Path('~/.sehistory').expanduser()])
        section = parser[dbname] if dbname else parser[parser.sections()[0]]
        super().__init__(*(section[k] for k in ('uri', 'token', 'org', 'bucket')), access=access)

    def curves(self, start=None, stop=None, measurement=('*.value', '*.target'), field='float',
               interval=None, add_prev=3600, add_end=False, merge=None, pivot=False, **kwds):
        """get curves

        :param start: start time (default: since ever)
        :param stop: end time (default: eternity = 1 year in the future)
        :param measurement: '<module>.<parameter>' (default: ['*.value', '*.target'])
        :param field: default 'float' (only numeric curves)
        :param interval: if given, the result is binned
        :param add_prev: amount of time to look back for the last previous point (default: 1 hr)
        :param add_end: whether to add endpoint at stop time (default: False)
        :param merge: None: no merge happens, else curves with the same final key are merged. 2 cases:
            one key (str): the name of final key. result will be a dict of <str> of <Table>
            a tuple of keys: the names of the final key elements. result: dict of <tuple> of Table
        :param pivot: sort values into columns of one big table
            (the single table is only built when merge is given, too)
        :param kwds: further selection criteria
        :return: without merge: a dict <key> of <Table>, as returned by self.query
            with merge, without pivot: a dict <key or tuple of key values> of <Table>
            with merge and pivot: one <Table> with a '_time' column and one column per merged curve
            where <Table> is a list of tuples with some meta info (table.tags, table.column_names)
            when _field='float' (the default), the returned values are either floats or None
        :raises ValueError: when pivoting and the merged rows are not ordered by time
        """
        tags = {k: v for k, v in (('_measurement', measurement), ('_field', field)) if v is not None}
        tags.update(kwds)
        start, stop = abs_range(start, stop)
        rstart, rstop = round_range(start, stop, interval)
        if rstart < rstop:
            result = self.query(rstart, rstop, interval, columns=None, **tags)
        else:
            result = {}
        if add_prev:
            # fetch the last point before the rounded start of each curve
            prev_data = self.query(rstart - add_prev, rstart, single=1, **tags)
            for key, first in prev_data.items():
                curve = result.get(key)
                if first[1] is None:
                    continue
                if curve:
                    if first[0] < curve[0][0]:
                        if pivot:
                            # for a pivot table the previous point is moved to the start time
                            curve.insert(0, (rstart,) + tuple(first[1:]))
                        else:
                            curve.insert(0, tuple(first))
                else:
                    # no points within the period: the curve consists of the previous point only
                    result[key] = table = Table(first.tags, first.key_names, first.column_names)
                    table.append(tuple(first))
        if add_end:
            self.complete(result, stop)
        if merge:
            single_merge = isinstance(merge, str)
            if single_merge:
                merge = [merge]
            rows = []  # all points of all curves as (<time>, <value>, <column index>)
            common_tags = summarize_tags(result.values(), True)
            col_keys = {}  # dict <column index> of <merge key>
            col_info = {}  # dict <merge key> of [<column index>, <list of curves>]
            for key, curve in result.items():
                merge_tags = {k: curve.tags.get(k, '') for k in merge}
                for k, v in zip(curve.key_names, key):
                    merge_tags.setdefault(k, v)
                merge_key = tuple(zip(*merge_tags.items()))  # (<keys tuple>, <values tuple>)
                info = col_info.get(merge_key)
                if info is None:
                    col_idx = len(col_keys) + 1  # column 0 is reserved for the time
                    col_keys[col_idx] = merge_key
                    col_info[merge_key] = info = [col_idx, []]
                else:
                    col_idx = info[0]
                info[1].append(curve)
                assert curve.column_names[1] == '_value'
                for row in curve:
                    rows.append((row[0], row[1], col_idx))
            rows.sort(key=lambda x: x[0])
            if pivot:
                header = []
                for merge_key, (col_idx, curves) in col_info.items():
                    col_tags = summarize_tags(curves)
                    primary = col_tags.pop(merge[0], None)
                    primary = [primary] if primary else []
                    header.append(' '.join(primary + [f"{k}={v}" for k, v in col_tags.items()
                                                      if k not in common_tags]))
                result = Table(common_tags, (), ('_time',) + tuple(header))
                # running row: values of columns not updated are carried over
                values = [0] + [None] * len(col_keys)
                for row in rows:
                    col_nr = row[2]
                    values[col_nr] = row[1]
                    if row[0] > values[0]:
                        # a row is emitted each time the timestamp advances
                        values[0] = row[0]
                        result.append(tuple(values))
                    elif row[0] < values[0]:
                        raise ValueError(f'{rstart} {values[0]} {row[0]}')
            else:
                result = {}
                by_idx = {}  # dict <column index> of <Table>
                for merge_key, (col_idx, curves) in col_info.items():
                    col_tags = summarize_tags(curves)
                    primary = col_tags.get(merge[0], '_value')
                    table = Table(col_tags, merge_key[0], ('_time', primary))
                    result[primary if single_merge else merge_key[1][:len(merge)]] = table
                    by_idx[col_idx] = table
                for row in rows:
                    by_idx[row[2]].append((row[0], row[1]))
        return result

    @staticmethod
    def complete(curve_dict, end_time=0, tag=None):
        """complete to end_time

        For each non empty curve, the last value is repeated at the end time,
        unless that value is None or the curve already reaches the end time.
        The curves are modified in place.

        :param curve_dict: dict <key> of curves (rows must be (<time>, <value>))
        :param end_time: the end time
            if tag is given, end_time is a dict <tag value> of <end time>
        :param tag: None or the name of the tag used for the lookup in end_time;
            curves with a tag value missing in end_time are left untouched
        """
        if tag is None:
            end_time_dict = {}
        else:
            # fallback 0 means: do not extend (assuming timestamps are not negative)
            end_time_dict, end_time = end_time, 0
        for curve in curve_dict.values():
            if len(curve):
                tlast, value = curve[-1]
                etime = end_time_dict.get(curve.tags.get(tag), end_time)
                if value is not None and tlast < etime:
                    curve.append((etime, value))

    def export(self, start, stop, measurement=('*.value', '*.target'), field='float',
               interval=None, add_prev=3600, add_end=False, timeoffset=None, none='', **tags):
        """export curves as text

        The curves are merged by ('_measurement', 'device', 'stream') and
        pivoted into one table. The text starts with comment lines ('# ...')
        holding the common tags and the column names, followed by the rows
        created by Table.to_csv_rows.

        :param start, stop, measurement, field, interval, add_prev, add_end: see curves
        :param timeoffset: passed to Table.to_csv_rows (default: int(start))
        :param none: passed to Table.to_csv_rows as ``none``
        :param tags: further selection criteria
        :return: the exported text, lines separated and terminated by a newline
        """
        result = self.curves(start, stop, measurement, field, interval, add_prev, add_end,
                             merge=('_measurement', 'device', 'stream'), pivot=True, **tags)
        if timeoffset is None:
            timeoffset = int(start)
        result.tags.pop('_field', None)
        rows = [f"# {' '.join(f'{k}={v}' for k, v in result.tags.items())}"]
        rows.extend(f'# col {i} {k}' for i, k in enumerate(result.column_names))
        rows.extend(result.to_csv_rows(timeoffset, none=none))
        rows.append('')  # results in a trailing newline
        return '\n'.join(rows)

    def add_float(self, value, key, tags, ts):
        """add a point with field 'float'

        :param value: the value; None is stored as -0.0
            (NOTE(review): presumably -0.0 is decoded as None when reading - confirm in InfluxDBWrapper)
        :param key: sequence of str, joined with '.' to form the measurement name
        :param tags: dict of tags
        :param ts: the timestamp
        """
        self._add_point('.'.join(key), 'float', -0.0 if value is None else float(value), ts, tags)

    def add_error(self, value, key, tags, ts):
        """add a point with field 'error'

        :param value: the error, stored as str; None is stored as empty string
        :param key: sequence of str, joined with '.' to form the measurement name
        :param tags: dict of tags
        :param ts: the timestamp
        """
        self._add_point('.'.join(key), 'error', '' if value is None else str(value), ts, tags)

    def get_instrument(self, stream, ts=None, **tags):
        """get the instrument of a stream

        :param stream: the uri of the stream
        :param ts: the time or None when now
        :param tags: further selection criteria
        :return: (<instrument>, <timestamp of the last entry>) or (None, None)
            when no entry is found. <instrument> is self.instrument when the
            entry has no 'instrument' tag
        """
        if ts is None:
            ts = int(time.time())
        reply = self.query(None, int(ts) + 1, _measurement='_stream_', _field='on',
                           stream=stream, single=1, **tags)
        if reply:
            entry = sorted(reply.values(), key=lambda r: r[0])[-1]  # the latest entry
            return entry.tags.get('instrument', self.instrument), entry[0]
        return None, None

    def get_streams(self, instrument=None, ts=None, **tags):
        """get streams for one or all instruments

        :param instrument: None when looking for all instruments
        :param ts: the time or None when now
        :param tags: further selection criteria
        :return: dict <stream> of <tags of the latest entry>, for all streams
            where the latest entry is switched on
        """
        if ts is None:
            ts = int(time.time())
        reply = self.query(None, int(ts) + 1, _measurement='_stream_', _field='on',
                           single=1, instrument=instrument, **tags)
        all_entries = {}  # dict <stream> of <list of entries>
        for entry in reply.values():
            all_entries.setdefault(entry.tags.get('stream'), []).append(entry)
        result = {}
        for stream, entries in all_entries.items():
            entry = sorted(entries, key=lambda r: r[0])[-1]  # the latest entry
            if entry[1]:  # on=True
                result[stream] = entry.tags
        return result

    def get_experiments(self, start=None, end=None, **tags):
        """get experiments (periods with the same device/stream/instrument combination)

        :param start: start of time period
        :param end: end of time period
        :param tags: currently not used
        :return: list of [<start>, <end>, ('instrument' or 'stream', <value>), dict <stream> of <device>]
            sorted by start time. For an experiment which is not finished,
            <end> is the current time.
        """
        interval = 1
        gap = 600
        eternity = 1e10  # placeholder for the end time of a stream which is still on
        entries = {}  # dict <key> of list of (<time>, <on flag>, <stream>, <device>)
        if start is None:
            previous = {}
        else:
            # the last entries before the period
            previous = self.query(None, start, _measurement='_stream_', _field='on', interval=interval,
                                  stream=None, device=None, instrument=None, single=1)
        if end is None:
            nextrow = {}
        else:
            # the first entries after the period
            nextrow = self.query(end, None, _measurement='_stream_', _field='on', interval=interval,
                                 stream=None, device=None, instrument=None, single=-1)
        start, end = abs_range(start, end)
        inperiod = self.query(start, end, _measurement='_stream_', _field='on', interval=interval,
                              stream=None, device=None, instrument=None)
        for key, single in previous.items():
            if key in inperiod:
                inperiod[key].insert(0, tuple(single))
            else:
                inperiod[key] = Table(rows=[tuple(single)], **single.__dict__)
        for key, table in inperiod.items():
            nextvalue = nextrow.get(key)
            if nextvalue and not nextvalue[1]:
                # switched off after the period: this delivers the end time
                table.append(tuple(nextvalue))
            stream, instrument, device = [table.tags.get(k, '') for k in ('stream', 'instrument', 'device')]
            key = ('instrument', instrument) if instrument else ('stream', stream)
            elist = entries.setdefault(key, [])
            for row in table:
                elist.append(row[:2] + (stream, device))
        result = []
        for key, rows in entries.items():
            rows.sort()
            current = {}  # dict <stream> of [<device>, <start>, <end>]
            chunks = [current]
            for ts, flag, stream, device in rows:
                if flag:
                    prev = current.get(stream)
                    if prev:
                        if device == prev[0] and prev[1] + gap < ts < prev[2] + gap:
                            prev[2] = ts
                            continue
                        # the stream appears again: start a new chunk
                        current = {}
                        chunks.append(current)
                    current[stream] = [device or stream, ts, eternity]
                else:
                    prev = current.get(stream)
                    if prev:
                        prev[2] = ts
            prevexpt = [0, 0]
            prevdevices = {}  # dict <stream> of <device>
            for chunk in chunks:
                if chunk:
                    devices = {k: v[0] for k, v in chunk.items() if v[0]}
                    beg = min(t[1] for t in chunk.values())
                    if beg > prevexpt[0] + gap or any(v != devices.get(k) for k, v in prevdevices.items()):
                        prevdevices = devices
                        if prevexpt[1] > beg:
                            prevexpt[1] = beg  # shorten previous
                        prevexpt = [beg, max(t[2] for t in chunk.values()), key, devices]
                        result.append(prevexpt)
        result.sort()
        now = time.time()
        for expt in result:
            # the end time is element 1 (the last element is the devices dict)
            if expt[1] == eternity:
                expt[1] = now
        return result

    def set_instrument(self, stream, value, ts=None, guess=True, **tags):
        """set stream and instrument on or off

        :param stream: the uri of the stream
        :param value: instrument, "0" when unknown or None when switching to off
        :param ts: the time or None when now
        :param guess: when instrument is undefined, take from previous
        :param tags: further tags to be stored with the entry
        """
        prev, prevts = self.get_instrument(stream, ts, **tags)
        if prevts is not None:
            if prev in (None, '0'):
                ts = prevts + 0.001
            else:
                if value == '0' and guess:
                    value = prev
                # ts is None means now: nothing to compare, and None < float would raise
                if ts is not None and ts < prevts:
                    ts = prevts + 0.001  # make sure the new entry is after the previous one
        tags['stream'] = stream
        if value:
            tags['instrument'] = value
            flag = True
        else:
            tags['instrument'] = prev or '0'
            flag = False
        self._add_point('_stream_', 'on', flag, ts, tags)

    def add_stream(self, value, tags, key, ts):
        """set the instrument of a stream (same as set_instrument with a different argument order)

        :param value: instrument, "0" when unknown or None when switching to off
        :param tags: dict of further tags
        :param key: the uri of the stream
        :param ts: the time
        """
        self.set_instrument(key, value, ts, **tags)