430 lines
19 KiB
Python
430 lines
19 KiB
Python
import time
|
|
import logging
|
|
from pathlib import Path
|
|
from configparser import ConfigParser
|
|
from sehistory.influx import InfluxDBWrapper, abs_range, round_range, Table
|
|
|
|
ETERNITY = 1e10
|
|
|
|
|
|
def fmtime(t):
|
|
if t is None:
|
|
return ' ' * 19
|
|
if t >= ETERNITY:
|
|
return '-' * 19
|
|
return time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(int(t)))
|
|
|
|
|
|
def sameday(ts1, ts2):
|
|
return time.localtime(ts1)[:3] == time.localtime(ts2)[:3]
|
|
|
|
|
|
def summarize_tags(curves, remove_multiple=False):
|
|
"""summarize tags
|
|
|
|
:param curves: list of curves (type Table)
|
|
:param remove_multiple: True: remove non-unique values
|
|
:return: dict <key> of comma separated values
|
|
"""
|
|
result = {}
|
|
for curve in curves:
|
|
for k, v in curve.tags.items():
|
|
result.setdefault(k, set()).add(str(v))
|
|
if remove_multiple:
|
|
return {k: ','.join(v) for k, v in result.items() if len(v) == 1}
|
|
return {k: ','.join(v) for k, v in result.items()}
|
|
|
|
|
|
class SEHistory(InfluxDBWrapper):
|
|
def __init__(self, dbname=None, instrument=None, access='readonly'):
|
|
self.instrument = instrument
|
|
parser = ConfigParser()
|
|
parser.optionxform = str
|
|
parser.read([Path('~/.config/sehistory').expanduser()])
|
|
section = parser[dbname] if dbname else parser[parser.sections()[0]]
|
|
self.has_local = parser.has_section('local')
|
|
self.instrument_by_stream = {}
|
|
super().__init__(*(section[k] for k in ('uri', 'token', 'org', 'bucket')), access=access)
|
|
|
|
def curves(self, start=None, stop=None, measurement=('*.value', '*.target'), field='float',
|
|
interval=None, add_prev=3600, add_end=False, merge=None, pivot=False, **kwds):
|
|
"""get curves
|
|
|
|
:param start: start time (default: since ever)
|
|
:param stop: end time (default: eternity = 1 year in the future)
|
|
:param measurement: '<module>.<parameter>' (default: ['*.value', '*.target'])
|
|
:param field: default 'float' (only numeric curves)
|
|
:param interval: if given, the result is binned
|
|
:param add_prev: amount of time to look back for the last previous point (default: 1 hr)
|
|
:param add_end: whether to add endpoint at stop time (default: False)
|
|
:param merge: None: no merge happens, else curves with the same final key are merged. 2 cases:
|
|
one key (str): the name of final key. result will be a dict of <str> of <Table>
|
|
a tuple of keys: the names of the final key elements. result: dict of <tuple> of Table
|
|
:param pivot: sort values in to columns of one big table
|
|
:param kwds: further selection criteria
|
|
:return: a dict <key or tuple of key values> of <Table> or <Single>
|
|
|
|
where <Table> is a list of tuples with some meta info (table.tags, table.column_names)
|
|
and <Single> is a list (a single row of a table) with the same meta info
|
|
|
|
when _field='float' (the default), the returned values are either a floats or None
|
|
"""
|
|
tags = {k: v for k, v in (('_measurement', measurement), ('_field', field)) if v is not None}
|
|
tags.update(kwds)
|
|
start, stop = abs_range(start, stop)
|
|
rstart, rstop = round_range(start, stop, interval)
|
|
if rstart < rstop:
|
|
result = self.query(rstart, rstop, interval, columns=None, **tags)
|
|
# result = self.query(rstart, rstop, interval, columns=['stream', 'device'], **tags)
|
|
else:
|
|
result = {}
|
|
start_row = {}
|
|
if add_prev:
|
|
prev_data = self.query(rstart - add_prev, rstart, single=1, **tags)
|
|
for key, table in prev_data.items():
|
|
curve = result.get(key)
|
|
first = table[0]
|
|
if first[1] is not None:
|
|
if curve:
|
|
if first[0] < curve[0][0]:
|
|
if pivot:
|
|
curve.insert(0, (rstart,) + tuple(first[1:]))
|
|
# start_row.setdefault(key[1:], {})[key[0]] = first[1]
|
|
else:
|
|
curve.insert(0, tuple(first))
|
|
else:
|
|
result[key] = table
|
|
if add_end:
|
|
self.complete(result, stop)
|
|
if merge:
|
|
single_merge = isinstance(merge, str)
|
|
if single_merge:
|
|
merge = [merge]
|
|
rows = []
|
|
common_tags = summarize_tags(result.values(), True)
|
|
col_keys = {}
|
|
col_info = {}
|
|
for key, curve in result.items():
|
|
merge_tags = {k: curve.tags.get(k, '') for k in merge}
|
|
for k, v in zip(curve.key_names, key):
|
|
merge_tags.setdefault(k, v)
|
|
merge_key = tuple(zip(*merge_tags.items())) # (<keys tuple>, <values tuple>)
|
|
info = col_info.get(merge_key)
|
|
if info is None:
|
|
col_idx = len(col_keys) + 1
|
|
col_keys[col_idx] = merge_key
|
|
col_info[merge_key] = info = [col_idx, []]
|
|
else:
|
|
col_idx = info[0]
|
|
info[1].append(curve)
|
|
assert curve.column_names[1] == '_value'
|
|
for row in curve:
|
|
rows.append((row[0], row[1], col_idx))
|
|
# merged.append((merge_key, curve))
|
|
rows.sort(key=lambda x: x[0])
|
|
if pivot:
|
|
header = []
|
|
for merge_key, (col_idx, curves) in col_info.items():
|
|
tags = summarize_tags(curves)
|
|
primary = tags.pop(merge[0], None)
|
|
primary = [primary] if primary else []
|
|
header.append(' '.join(primary + [f"{k}={v}" for k, v in tags.items() if k not in common_tags]))
|
|
result = Table(common_tags, (), ('_time',) + tuple(header))
|
|
values = [0] + [None] * len(col_keys)
|
|
for row in rows:
|
|
col_nr = row[2]
|
|
values[col_nr] = row[1]
|
|
if row[0] > values[0]:
|
|
values[0] = row[0]
|
|
result.append(tuple(values))
|
|
elif row[0] < values[0]:
|
|
raise ValueError(f'{rstart} {values[0]} {row[0]}')
|
|
else:
|
|
result = {}
|
|
by_idx = {}
|
|
for merge_key, (col_idx, curves) in col_info.items():
|
|
tags = summarize_tags(curves)
|
|
primary = tags.get(merge[0], '_value')
|
|
final_key = primary if single_merge else merge_key[1][:len(merge)]
|
|
table = result.get(final_key)
|
|
if table is None:
|
|
result[final_key] = table = Table(tags, merge_key[0], ('_time', primary))
|
|
else:
|
|
table.tags.update(tags) # for previously undefined units
|
|
by_idx[col_idx] = table
|
|
for row in rows:
|
|
by_idx[row[2]].append((row[0], row[1]))
|
|
return result
|
|
|
|
@staticmethod
|
|
def complete(curve_dict, end_time=0, tag=None):
|
|
"""complete to end_time
|
|
|
|
if tag is given, end_time is a dict <tag value> of <end time>
|
|
"""
|
|
if tag is None:
|
|
end_time_dict = {}
|
|
else:
|
|
end_time_dict, end_time = end_time, 0
|
|
for curve in curve_dict.values():
|
|
if len(curve):
|
|
tlast, value = curve[-1]
|
|
etime = end_time_dict.get(curve.tags.get(tag), end_time)
|
|
if value is not None and tlast < etime:
|
|
curve.append((etime, value))
|
|
|
|
def export(self, start, stop, measurement=('*.value', '*.target'), field='float',
|
|
interval=None, add_prev=3600, add_end=False, timeoffset=None, none='', **tags):
|
|
result = self.curves(start, stop, measurement, field, interval, add_prev, add_end,
|
|
merge=('_measurement', 'device', 'stream'), pivot=True, **tags)
|
|
if timeoffset is None:
|
|
timeoffset = int(start)
|
|
result.tags.pop('_field', None)
|
|
rows = [f"# {' '.join(f'{k}={v}' for k, v in result.tags.items())}"]
|
|
rows.extend(f'# col {i} {k}' for i, k in enumerate(result.column_names))
|
|
rows.extend(result.to_csv_rows(timeoffset, none=none))
|
|
rows.append('')
|
|
return '\n'.join(rows)
|
|
|
|
def add_float(self, value, key, tags, ts):
|
|
self._add_point('.'.join(key), 'float', -0.0 if value is None else float(value), ts, tags)
|
|
|
|
def add_error(self, value, key, tags, ts):
|
|
self._add_point('.'.join(key), 'error', '' if value is None else str(value), ts, tags)
|
|
|
|
def get_streams(self, instrument=None, stream=None, device=None,
|
|
start=None, end=None, include_finished=False, **tags):
|
|
"""get streams for one or all instruments
|
|
|
|
:param instrument: None when looking for all instruments
|
|
:param stream: None, a comma separated string or a sequence of streams
|
|
:param device: None or a comma separated string
|
|
:param start: None or start time. None means 'since ever'
|
|
:param end: None or end time. None means now (or more precise 'in a year')
|
|
:param include_finished: whether finished streams are to be included (False by default)
|
|
:return: dict <stream> of tags
|
|
|
|
Remark: an assignment of an instrument to a stream persists, even when the stream gets off
|
|
"""
|
|
if end is not None:
|
|
end = end + 1
|
|
if stream:
|
|
if isinstance(stream, str):
|
|
stream = stream.split(',')
|
|
else:
|
|
stream = list(stream)
|
|
tags['stream'] = stream[0] if len(stream) == 1 else stream
|
|
if device:
|
|
tags['device'] = device
|
|
before = self.query(None, start, _measurement='_stream_', _field='on',
|
|
instrument=instrument, single=1, **tags)
|
|
if start == end:
|
|
during = {}
|
|
else:
|
|
during = self.query(start, end, _measurement='_stream_', _field='on',
|
|
instrument=instrument, **tags)
|
|
all_entries = {}
|
|
# print('during', set(during))
|
|
# for key, table in before.items():
|
|
# print(key, type(table))
|
|
for key in set(before) | set(during):
|
|
for tables in (before, during):
|
|
table = tables.get(key, ())
|
|
if table:
|
|
stream = table.tags.get('stream')
|
|
for row in table:
|
|
all_entries.setdefault(stream, []).append((row, table.tags))
|
|
result = {}
|
|
for stream in sorted(all_entries):
|
|
entries = all_entries[stream]
|
|
current = None
|
|
last = None
|
|
instrument = None
|
|
for entry in sorted(entries, key=lambda e: e[0][0]):
|
|
(ts, flag), tags = entry
|
|
if flag:
|
|
if 'instrument' in tags:
|
|
instrument = tags['instrument']
|
|
elif instrument:
|
|
# keep instrument from previous assignments
|
|
tags['instrument'] = instrument
|
|
current = [ts, tags]
|
|
elif current:
|
|
if ts > current[0] + 60:
|
|
# else its probably not containing real data
|
|
last = current
|
|
current = None
|
|
if include_finished:
|
|
current = last
|
|
if current:
|
|
tags = current[1]
|
|
ins = tags.get('instrument')
|
|
if ins == '0':
|
|
tags.pop('instrument')
|
|
result[stream] = tags
|
|
return result
|
|
|
|
def get_experiments(self, start=None, end=None, stream=None, **tags):
|
|
"""get experiments (periods with the same device/stream/instrument combination)
|
|
|
|
:param start: start of time period
|
|
:param end: end of time period
|
|
:return: dict <key> of dict <device combi> of list of [start, end]
|
|
where <key> is ('instrument', <instrument>) or ('stream', <stream>)
|
|
and <device combi> is a tuple (tuple of streams, tuple of devices)
|
|
"""
|
|
|
|
interval = 1
|
|
gap = 600
|
|
if start is None:
|
|
previous = {}
|
|
else:
|
|
previous = self.query(None, start, _measurement='_stream_', _field='on', interval=interval,
|
|
stream=stream, device=None, instrument=None, single=1, **tags)
|
|
if end is None:
|
|
nextrows = {}
|
|
else:
|
|
nextrows = self.query(end, None, _measurement='_stream_', _field='on', interval=interval,
|
|
stream=stream, device=None, instrument=None, single=-1, **tags)
|
|
start, end = abs_range(start, end)
|
|
inperiod = self.query(start, end, _measurement='_stream_', _field='on', interval=interval,
|
|
stream=stream, device=None, instrument=None, **tags)
|
|
# prepend previous to the items in period or create if not there
|
|
for key, rows in previous.items():
|
|
if key in inperiod:
|
|
inperiod[key].insert(0, rows[0])
|
|
else:
|
|
inperiod[key] = rows
|
|
|
|
# append items after the ones in period, ignoring keys not present in period
|
|
# in the same go, create by_stream dict, joining tables with common stream
|
|
by_stream = {} # dict <stream> of [<ts>, <flag>, <instrument>, <device>]
|
|
for key, table in inperiod.items():
|
|
nextrow = nextrows.get(key)
|
|
if nextrow and not nextrow[0][1]:
|
|
table.extend(nextrow)
|
|
stream, instrument, device = [table.tags.get(k, '') for k in ('stream', 'instrument', 'device')]
|
|
elist = by_stream.setdefault(stream, [])
|
|
for row in table:
|
|
elist.append([row[0], row[1], stream, device, instrument])
|
|
# combine now by either instrument or stream, if instrument is undefined (='0')
|
|
by_key = {}
|
|
for stream, rows in by_stream.items():
|
|
rows.sort()
|
|
instrument = '0'
|
|
for row in rows:
|
|
if not row[-1]:
|
|
row[-1] = instrument
|
|
else:
|
|
instrument = row[-1]
|
|
key = ('stream', stream) if instrument == '0' else ('instrument', instrument)
|
|
by_key.setdefault(key, []).append(row)
|
|
result = {}
|
|
for key, rows in by_key.items():
|
|
devices = {}
|
|
current = {}
|
|
prevcombi = None
|
|
chunk = None
|
|
for ts, flag, stream, device, _ in rows:
|
|
ts = max(start, int(ts))
|
|
if flag:
|
|
devices[stream] = device
|
|
else:
|
|
devices.pop(stream, None)
|
|
devcombi = tuple(zip(*sorted(devices.items())))
|
|
if devcombi != prevcombi:
|
|
if prevcombi: # device(s) removed
|
|
prevend = min(ts, chunk[1])
|
|
if prevend - chunk[0] < gap:
|
|
current.pop(prevcombi)
|
|
else:
|
|
chunk[1] = prevend
|
|
if devcombi: # device(s) added
|
|
chunks = current.setdefault(devcombi, [])
|
|
if chunks:
|
|
prevbeg, prevend = chunks[-1]
|
|
# merge when joining or started at the same day
|
|
merge = prevend + gap > ts or sameday(prevbeg, ts)
|
|
else:
|
|
merge = False
|
|
if merge:
|
|
chunk = chunks[-1]
|
|
chunk[1] = ETERNITY
|
|
else:
|
|
chunk = [ts, ETERNITY]
|
|
chunks.append(chunk)
|
|
else:
|
|
chunk = None
|
|
prevcombi = devcombi
|
|
if current:
|
|
result[key] = current
|
|
return result
|
|
|
|
def get_instrument(self, stream, ts=None, **tags):
|
|
"""get assigned instrument and stream state
|
|
|
|
:param stream: the stream to look up
|
|
:param ts: the time ot None for 'now'
|
|
:param tags: tags to further filter the result
|
|
|
|
:return: <instrument>, <timestamp>
|
|
where <instrument> is the instrument assigned to stream <stream> at ts
|
|
amd <timestamp> is the time when this stream was on last time before ts or None when it is off
|
|
"""
|
|
if ts is None:
|
|
ts = int(time.time())
|
|
reply = self.query(None, int(ts) + 1, _measurement='_stream_', _field='on',
|
|
stream=stream, single=1, **tags)
|
|
instrument, lastts = None, None
|
|
if reply:
|
|
for table in sorted(reply.values(), key=lambda r: r[0][0]):
|
|
ins = table.tags.get('instrument')
|
|
row = table[-1]
|
|
lastts = row[0] if row[1] else None
|
|
if ins:
|
|
instrument = None if ins == '0' else ins
|
|
return instrument, lastts
|
|
|
|
def set_instrument(self, stream, instrument, ts=None, **tags):
|
|
"""set stream and instrument on or off
|
|
|
|
:param stream: the uri of the stream
|
|
:param instrument: or "0" to unassign the instrument or None when switching the stream off
|
|
:param ts: the time or None when now
|
|
"""
|
|
flag = instrument is not None
|
|
if flag and instrument != '0':
|
|
# in case an real instrument is given and no instrument is assigned yet
|
|
# we predate the assignment to the start of the stream
|
|
try:
|
|
previns, prevts = self.get_instrument(stream, ts, **tags)
|
|
if prevts is not None and (previns is None or (ts or ETERNITY) < prevts):
|
|
ts = prevts + 0.001
|
|
except Exception as e:
|
|
logging.warning('Exception in get_instrument: %r', e)
|
|
tags['stream'] = stream
|
|
if flag:
|
|
tags['instrument'] = instrument
|
|
self._add_point('_stream_', 'on', flag, ts, tags)
|
|
|
|
def remove_experiment(self, stream, ts=None, **tags):
|
|
if ts is not None:
|
|
ts += 1
|
|
reply = self.query(None, ts, _measurement='_stream_', _field='on',
|
|
stream=stream, single=1, **tags)
|
|
tagset = set(tags) | {'stream'}
|
|
tagset.discard('instrument')
|
|
for table in reply.values():
|
|
ts, flag = table[-1][:2]
|
|
if flag:
|
|
addtags = {k: v for k, v in table.tags.items()
|
|
if k not in {'instrument', '_measurement', '_field'}}
|
|
self._add_point('_stream_', 'on', False, ts + 0.001,
|
|
addtags)
|
|
self.flush()
|
|
|
|
def add_stream(self, value, tags, key, ts):
|
|
if value == '': # unknown instrument
|
|
value = self.instrument or self.instrument_by_stream.get(key, '0')
|
|
self.set_instrument(key, value, ts, **tags)
|