major improvements and rework

- add stream / instrument availability data
- events contain an event kind for dispatching DB methods
2025-02-24 14:54:54 +01:00
parent 832252bbbb
commit c0aeca523a
6 changed files with 501 additions and 231 deletions


@@ -2,22 +2,25 @@ import sys
 from streams import EventStream
 from nicoscache import NicosStream
 from secop import ScanStream, ScanReply, send_fake_udp
-from influx import testdb
+from influx import InfluxDBWrapper


 def main():
     # egen = EventStream(ScanReply(), ScanStream(), n=NicosStream('localhost:14002'))
     egen = EventStream(ScanReply(), ScanStream())
-    db = testdb()
+    db = InfluxDBWrapper('linse-c')
     db.enable_write_access()
+    event_map = {'value': db.add_float, 'error': db.add_error, 'stream': db.add_stream}
     try:
         while 1:
-            for event in egen.get_events():
-                db.add_point(*event)
+            for kind, *args in egen.get_events():
+                event_map[kind](*args)
             db.flush()
     finally:
-        for event in egen.finish():
-            db.add_point(*event)
+        for kind, *args in egen.finish():
+            event_map[kind](*args)
         db.disconnect()
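The dispatch table above relies on every event carrying its kind as the first tuple element. A minimal sketch of the convention, with the event shapes and handler signatures taken from this commit's diffs and all concrete values invented:

# events are (kind, *args) tuples, kind in ('value', 'error', 'stream')
def add_float(value, key, tags, ts):
    print('float', '.'.join(key), value, tags, ts)

def add_error(value, key, tags, ts):
    print('error', '.'.join(key), value, tags, ts)

def add_stream(value, tags, key, ts):  # note: tags/key order differs for 'stream'
    print('stream', key, value, tags, ts)

event_map = {'value': add_float, 'error': add_error, 'stream': add_stream}
for kind, *args in [
        ('value', 1.5, ('tt', 'value'), {'device': 'cryo'}, 1740400000.0),
        ('error', 'timeout', ('tt', 'value'), {'device': 'cryo'}, 1740400001.0),
        ('stream', 'linse-c', {}, 'localhost:5000', 1740400002)]:
    event_map[kind](*args)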

influx.py

@@ -21,16 +21,19 @@
 # *****************************************************************************

 import re
 import time
-from datetime import datetime
+from pathlib import Path
+from configparser import ConfigParser
+from datetime import datetime, timezone
 from math import floor, ceil
 from influxdb_client import InfluxDBClient, BucketRetentionRules, Point
 from influxdb_client.client.write_api import SYNCHRONOUS

 DAY = 24 * 3600
+YEAR = 366 * DAY
 # write_precision from digits after decimal point
 TIME_PRECISION = ['s'] + ['ms'] * 3 + ['us'] * 3 + ['ns'] * 3
+UNDEF = '<undef>'

 try:
     parse_time = datetime.fromisoformat
@@ -42,92 +45,104 @@ def to_time(v):
     return parse_time(v).timestamp()


-def identity(v):
-    return v
+def to_iso(t):
+    return datetime.fromtimestamp(t, timezone.utc).isoformat().replace('+00:00', 'Z')


-def double(v):
-    return None if v == '-0' else float(v)
-
-
-CONVERTER = {
-    'string': identity,
-    'long': int,
-    'double': double,
-    'unsigned_long': int,
-    'duration': int,
-    'dateTime:RFC3339': to_time,
-    'dateTime:RFC3339Nano': to_time,
-    # 'base64Binary': base64.b64decode,
-}
-
-
-class NamedTuple(tuple):
-    """for our purpose improved version of collection.namedtuple
-
-    - names may be any string, but when not an identifier, attribute access is not possible
-    - access by key with get ([ ] is for indexed access)
-    Usage:
-
-    MyNamedTuple = NamedTuple.make_class(('a', 'b'))
-    x = MyNamedTuple(('a', 2.0))
-    assert x == ('a', 2.0) == (x.a, x.b) == (x.get('a'), x.get('b')) == (x[0], x[1])
-    """
-    keys = None
-    _idx_by_name = None
-
-    def __new__(cls, keys):
-        """create NamedTuple class from keys
-
-        :param keys: a sequence of names for the elements
-        """
-        idxbyname = {n: i for i, n in enumerate(keys)}
-        attributes = {n: property(lambda s, idx=i: s[idx])
-                      for i, n in enumerate(keys)
-                      if n.isidentifier() and not hasattr(cls, n)}
-        # clsname = '_'.join(attributes)
-        attributes.update(_idx_by_name=idxbyname, __new__=tuple.__new__, keys=tuple(keys))
-        return type(f"NamedTuple", (cls,), attributes)
-
-    def get(self, key, default=None, strict=False):
-        """get item by key
-
-        :param key: the key
-        :param default: value to return when key does not exist
-        :param strict: raise KeyError when key does not exist and ignore default
-        :return: the value of requested element or default if the key does not exist
-        """
-        try:
-            return self[self._idx_by_name[key]]
-        except KeyError:
-            if strict:
-                raise
-            return default
-
-    @property
-    def names(self):
-        return tuple(self._idx_by_name)
-
-    def tuple(self, *keys):
-        return tuple(self.get(k) for k in keys)
+class PrettyFloat(float):
+    """saves bandwidth when converting to JSON
+
+    a lot of numbers originally have a fixed (low) number of decimal digits.
+    as the binary representation is not exact, it might happen that a
+    lot of superfluous digits are transmitted:
+
+    str(1/10*3) == '0.30000000000000004'
+    str(PrettyFloat(1/10*3)) == '0.3'
+    """
+    def __new__(cls, value):
+        return None if value == '-0' else super().__new__(cls, value)
+
+    def __repr__(self):
+        return '%.15g' % self
+
+
+class Converters(dict):
+    def __init__(self, datatypes):
+        super().__init__((i, getattr(self, f"cvt_{d.split(':')[0]}"))
+                         for i, d in enumerate(datatypes) if i > 2)
+
+    def as_tuple(self, row):
+        """get selected columns as tuple"""
+        return tuple(f(row[i]) for i, f in self.items())
+
+    cvt_double = staticmethod(PrettyFloat)
+
+    @staticmethod
+    def cvt_string(value):
+        return value
+
+    @staticmethod
+    def cvt_long(value):
+        return int(value)
+
+    @staticmethod
+    def cvt_dateTime(value):
+        return to_time(value)
+
+    @staticmethod
+    def cvt_boolean(value):
+        return value == 'true'
+
+    cvt_unsigned_long = cvt_duration = cvt_long
 class Table(list):
     """a list of tuples with meta info"""
-    def __init__(self, tags, key_names, column_names):
+    def __init__(self, tags={}, key_names=(), column_names=(), rows=None):
         super().__init__()
         self.tags = tags
         self.key_names = key_names
         self.column_names = column_names
+        if rows:
+            self[:] = rows
+
+    def to_csv_rows(self, timeoffset=0, sep='\t', none='none', float_format='%.15g'):
+        for row in self:
+            result = ['%.15g' % (row[0] - timeoffset)]
+            for value in row[1:]:
+                try:
+                    result.append(float_format % value)
+                except TypeError:
+                    if value is None:
+                        result.append(none)
+                    else:
+                        result.append(str(value).replace(sep, ' '))
+            yield sep.join(result)


 class Single(Table):
     """a single row of a table, as a list with meta info"""
-    def __init__(self, table):
-        super().__init__(table.tags, table.key_names, table.column_names)
-        single, = table
-        self[:] = single
+    def __init__(self, tags={}, key_names=(), column_names=(), rows=None):
+        super().__init__(tags, key_names, column_names)
+        if rows:
+            single_row, = rows
+            self[:] = single_row
+
+
+def summarize_tags(curves, remove_multiple=False):
+    """summarize tags
+
+    :param curves: list of curves (type Table)
+    :param remove_multiple: True: remove non-unique values
+    :return: dict <key> of comma separated values
+    """
+    result = {}
+    for curve in curves:
+        for k, v in curve.tags.items():
+            result.setdefault(k, set()).add(str(v))
+    if remove_multiple:
+        return {k: ','.join(v) for k, v in result.items() if len(v) == 1}
+    return {k: ','.join(v) for k, v in result.items()}


 class RegExp(str):
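Two of the new helpers above are easy to misread, so here is a usage sketch (classes from this diff; the sample annotation row is invented). PrettyFloat('-0') yields None because '-0' marks an undefined float, and Converters skips the first three CSV columns (annotation marker, result name, table number):

from influx import PrettyFloat, Converters

print(str(1 / 10 * 3))               # 0.30000000000000004
print(str(PrettyFloat(1 / 10 * 3)))  # 0.3
print(PrettyFloat('-0'))             # None (undefined value marker)

# a '#datatype' annotation row as delivered by query_csv; columns 0..2 are skipped
conv = Converters(['#datatype', 'string', 'long',
                   'dateTime:RFC3339', 'double', 'string'])
row = ['', '_result', '0', '2025-02-24T13:54:54+00:00', '1.5', 'cryo']
print(conv.as_tuple(row))            # (<epoch seconds>, 1.5, 'cryo')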
@@ -158,13 +173,13 @@ class CurveDict(dict):

 def abs_range(start=None, stop=None):
     now = time.time()
-    if start is None:
-        start = int(now - 32 * DAY)
-    elif start < 366 * DAY:
+    if start is None:  # since ever
+        start = 0
+    elif start < YEAR:
         start = int(now + start)
     if stop is None:
-        stop = int(now + DAY)
-    elif stop < 366 * DAY:
+        stop = int(now + YEAR)
+    elif stop < YEAR:
         stop = ceil(now + stop)
     return start, stop
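The relative-time convention above is worth spelling out: any bound smaller than YEAR (366 days) is treated as an offset from now, so call sites usually pass negative offsets. A few illustrative results, assuming now = time.time():

# abs_range()                       -> (0, int(now + YEAR))   since ever .. "eternity"
# abs_range(-2 * DAY)               -> (int(now - 2 * DAY), int(now + YEAR))
# abs_range(-3600, 0)               -> the last hour, stop rounded up to now
# abs_range(1700000000, 1700003600) -> absolute epoch bounds pass through unchanged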
@@ -185,19 +200,24 @@ class InfluxDBWrapper:
     _update_queue = None
     _write_api_write = None

-    def __init__(self, url, token, org, bucket, access='readonly'):
+    def __init__(self, uri=None, token=None, org=None, bucket=None, access='readonly'):
         """initialize

-        :param url: the url for the influx DB
+        :param uri: the uri for the influx DB or a name to look up in ~/.sehistory
         :param token: the token
         :param org: the organisation
         :param bucket: the bucket name
         :param access: 'readonly', 'write' (RW) or 'create' (incl. RW)
         """
-        self._url = url
-        self._token = token
-        self._org = org
-        self._bucket = bucket
+        if ':' in uri:
+            args = uri, token, org, bucket
+        else:
+            parser = ConfigParser()
+            parser.optionxform = str
+            parser.read([Path('~').expanduser() / '.sehistory'])
+            section = parser[uri]
+            args = [section[k] for k in ('uri', 'token', 'org', 'bucket')]
+        self._url, self._token, self._org, self._bucket = args
         self._client = InfluxDBClient(url=self._url, token=self._token,
                                       org=self._org)
         if access != 'readonly':
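With the branch above, a bare name such as 'linse-c' is resolved through ~/.sehistory, read with ConfigParser, so an INI-style file. A hypothetical section with invented values (the keys uri, token, org, bucket are the ones the code reads):

[linse-c]
uri = http://localhost:8086
token = <influxdb-api-token>
org = linse
bucket = curve-test

The ':' test works because a full URI always contains a colon, while a lookup name must not.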
@@ -206,6 +226,8 @@ class InfluxDBWrapper:
             self.set_time_precision(3)
         self.add_new_bucket(self._bucket, access == 'create')
         self._write_buffer = []
+        self._alias = {}
+        print('InfluxDBWrapper', self._url, self._org, self._bucket)

     def enable_write_access(self):
         self._write_api_write = self._client.write_api(write_options=SYNCHRONOUS).write
@@ -240,24 +262,47 @@ class InfluxDBWrapper:
             import "influxdata/influxdb/schema"
             schema.measurements(bucket: "{self._bucket}")""") for r in t]

-    def delete_measurement(self, measurement):
+    def delete_measurement(self, measurement, start=None, stop=None):
         delete_api = self._client.delete_api()
-        delete_api.delete('1970-01-01T00:00:00Z', '2038-01-01T00:00:00Z', f'_measurement="{measurement}"',
+        start, stop = abs_range(start, stop)
+        if stop is None:
+            stop = time.time() + DAY
+        delete_api.delete(to_iso(start), to_iso(stop), f'_measurement="{measurement}"',
                           bucket=self._bucket, org=self._org)

-    def delete_all_measurements(self):
-        measurements = self.get_measurements()
+    def delete_all_measurements(self, measurements=None, start=0, stop=None):
+        if measurements is None:
+            measurements = self.get_measurements()
         for meas in measurements:
-            self.delete_measurement(meas)
+            self.delete_measurement(meas, start, stop)
+        print('deleted', measurements)
+
+    def _get_rows(self, reader, as_tuple, first_row):
+        row = first_row
+        tableno = row[2]
+        try:
+            while 1:
+                if row[0]:
+                    first_row[:] = row
+                    return
+                if row[2] != tableno:
+                    # table id changed: new table, store first row for next call
+                    first_row[:] = row
+                    return
+                yield as_tuple(row)
+                row = next(reader)
+                if not row:
+                    raise ValueError('EMPTY')
+        except StopIteration:
+            first_row.clear()  # indicate end of data

     # query the database

     def query(self, start=None, stop=None, interval=None, single=None, columns=None, **tags):
         """Returns queried data as InfluxDB tables

-        :param start: start time. default is a month ago
-        :param stop: end time, default is tomorrow at the same time
+        :param start: start time (default: since ever)
+        :param stop: end time (default: eternity = 1 year in the future)
         :param interval: if set an aggregation filter will be applied. This will
             return only the latest values per time interval in seconds.
         :param single: when True (or 1), only the last value within the interval is returned
@@ -276,24 +321,44 @@ class InfluxDBWrapper:
             the obtained value is contained in the result dicts key only
             if the value is an instance of RegExp or when it contains an asterisk ('*')

-        :return: a dict <tuple of key values> of list of <row>
-            where <tuple of keys> and <row> are NamedTuple
+        :return: a dict <tuple of key values> of <Table instance>
+            Table is an extension of list, with some meta info
+        """
+        result = {}
+        for rows, key, props in self.query_gen(start, stop, interval, single, columns, **tags):
+            if single:
+                result[key] = Single(*props, rows=rows)
+            else:
+                table = Table(*props, rows=rows)
+                table.sort()
+                result[key] = table
+        return result
+
+    def query_gen(self, start=None, stop=None, interval=None, single=None, columns=None, **tags):
+        """Returns queried data as a generator
+
+        argument description: see query method
+
+        :return: an iterator of (rows, key, (tags, key_names, column_names))
+
+        remark: rows not consumed in between iteration steps are lost
+        using this generator version reduces memory usage
         """
         self.flush()
         start, stop = round_range(*abs_range(start, stop))
         msg = [f'from(bucket:"{self._bucket}")',
                f'|> range(start: {start}, stop: {stop})']
-        keys = {}
+        keylist = []
         dropcols = ['_start', '_stop']
         fixed_tags = {}
         for key, crit in tags.items():
             if crit is None:
-                keys[key] = None
+                keylist.append(key)
                 continue
             if isinstance(crit, str):
                 if isinstance(crit, RegExp) or '*' in crit:
-                    keys[key] = None
+                    keylist.append(key)
                     append_wildcard_filter(msg, key, [crit])
                     continue
                 fixed_tags[key] = crit
@@ -304,7 +369,7 @@ class InfluxDBWrapper:
                 dropcols.append(key)
             else:
                 try:
-                    keys[key] = None
+                    keylist.append(key)
                     append_wildcard_filter(msg, key, crit)
                     continue
                 except Exception:
@@ -316,84 +381,96 @@ class InfluxDBWrapper:
         else:
             msg.append('|> last(column: "_time")')
         if interval:
-            msg.append(f'|> aggregateWindow(every: {interval}s, fn: last, createEmpty: false)')
+            msg.append(f'|> aggregateWindow(every: {interval:g}s, fn: last, createEmpty: false)')
         if columns is None:
             msg.append(f'''|> drop(columns:["{'","'.join(dropcols)}"])''')
         else:
             columns = ['_time', '_value'] + list(columns)
-            msg.append(f'''|> keep(columns:["{'","'.join(columns + keys)}"])''')
+            msg.append(f'''|> keep(columns:["{'","'.join(columns + keylist)}"])''')
         msg = '\n'.join(msg)
-        print(msg)
+        # print(msg)
+        self.msg = msg

-        reader = self._client.query_api().query_csv(msg)
-        print('CSV', keys, columns)
-        converters = None
-        group = None
-        column_names = None
-        column_keys = None
-        key = None
-        result = {}
-        tableno = None
-        for row in reader:
-            if not row:
-                continue
-            if row[0]:
-                if row[0] == '#datatype':
-                    converters = {i: CONVERTER.get(d) for i, d in enumerate(row) if i > 2}
-                    column_names = None
-                elif row[0] == '#group':
-                    group = row
-                continue
-            if column_names is None:
-                column_names = row
-                for col, (name, grp) in enumerate(zip(column_names, group)):
-                    if grp != 'true':
-                        continue
-                    if columns is None or name in keys:
-                        keys[name] = col, converters.pop(col)
-                column_keys = tuple(column_names[i] for i in converters)
-                continue
-            if row[2] != tableno:
-                # new table, new key
-                tableno = row[2]
-                key_dict = {n: f(row[i]) for n, (i, f) in keys.items()}
-                key = tuple(key_dict.values())
-                if result.get(key) is None:
-                    print('KC', key_dict, column_keys)
-                    result[key] = Table({**fixed_tags, **key_dict}, tuple(keys), column_keys)
-            result[key].append(tuple(f(row[i]) for i, f in converters.items()))
-        if single:
-            for key, table in result.items():
-                result[key] = Single(table)
-        else:
-            for table in result.values():
-                table.sort()
-        return result
+        try:
+            reader = self._client.query_api().query_csv(msg)
+        except Exception:
+            print(msg)
+            raise
+        try:
+            row = next(reader)
+        except StopIteration:
+            return
+        converters = key_dict = table_properties = None  # make IDE happy
+        for i in range(5):
+            header = {}
+            if row[0]:  # this is a header
+                header[row[0]] = row
+                for row in reader:
+                    if row:
+                        if not row[0]:
+                            break
+                        header[row[0]] = row
+                else:
+                    return  # this should not happen
+                # we are now at the row with the column names
+                column_names = row
+                converters = Converters(header['#datatype'])
+                group = header['#group']
+                keys = {k: None for k in keylist}
+                for col, (name, grp) in enumerate(zip(column_names, group)):
+                    if grp != 'true':
+                        continue
+                    if columns is None or name in keys:
+                        keys[name] = col, converters.pop(col)
+                none_keys = [k for k, v in keys.items() if v is None]
+                if none_keys:
+                    for k in none_keys:
+                        keys.pop(k)
+                    # break
+                row = next(reader)
+            # we are at the first data row
+            key_dict = {n: f(row[i]) for n, (i, f) in keys.items()}
+            column_keys = tuple(column_names[i] for i in converters)
+            table_properties = {**fixed_tags, **key_dict}, tuple(keys), column_keys
+            key = tuple(key_dict.values())
+            row = list(row)  # copy row, as it will be modified
+            rows = self._get_rows(reader, converters.as_tuple, row)
+            yield rows, key, table_properties
+            # consume unused rows
+            consumed = sum(1 for _ in rows)
+            if consumed:
+                print('skip', consumed, 'rows')
+            if not row:  # reader is at end
+                return
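Since rows left unconsumed between iteration steps are skipped (see the print('skip', ...) above), query_gen is meant to be drained table by table. A hedged usage sketch, with the measurement name invented:

from influx import InfluxDBWrapper

DAY = 24 * 3600
db = InfluxDBWrapper('linse-c')  # name resolved via ~/.sehistory
for rows, key, (tags, key_names, column_names) in db.query_gen(
        -DAY, None, _measurement='tt.value', _field='float'):
    print(key_names, '=', key)
    for row in rows:        # consume immediately; leftover rows are lost
        print(row)          # one tuple per column_names entry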
     def curves(self, start=None, stop=None, measurement=('*.value', '*.target'), field='float',
-               interval=None, add_prev=3600, add_end=True, **tags):
+               interval=None, add_prev=3600, add_end=False, merge=None, pivot=False, **kwds):
         """get curves

-        :param start: start time (default: one month ago)
-        :param stop: end time (default: tomorrow)
+        :param start: start time (default: since ever)
+        :param stop: end time (default: eternity = 1 year in the future)
         :param measurement: '<module>.<parameter>' (default: ['*.value', '*.target'])
         :param field: default 'float' (only numeric curves)
         :param interval: if given, the result is binned
         :param add_prev: amount of time to look back for the last previous point (default: 1 hr)
         :param add_end: whether to add endpoint at stop time (default: False)
-        :param tags: further selection criteria
-        :return: a dict <tuple of key values> of <Table> or <Single>
+        :param merge: None: no merge happens, else curves with the same final key are merged. 2 cases:
+            one key (str): the name of the final key. result will be a dict of <str> of <Table>
+            a tuple of keys: the names of the final key elements. result: dict of <tuple> of Table
+        :param pivot: sort values into columns of one big table
+        :param kwds: further selection criteria
+        :return: a dict <key or tuple of key values> of <Table> or <Single>
             where <Table> is a list of tuples with some meta info (table.tags, table.column_names)
             and <Single> is a list (a single row of a table) with the same meta info
             when _field='float' (the default), the returned values are either floats or None
         """
-        tags.setdefault('_measurement', measurement)
-        tags.setdefault('_field', field)
+        tags = {k: v for k, v in (('_measurement', measurement), ('_field', field)) if v is not None}
+        tags.update(kwds)
         start, stop = abs_range(start, stop)
         rstart, rstop = round_range(start, stop, interval)
         if rstart < rstop:
@@ -401,6 +478,7 @@ class InfluxDBWrapper:
             # result = self.query(rstart, rstop, interval, columns=['stream', 'device'], **tags)
         else:
             result = {}
+        start_row = {}
         if add_prev:
             prev_data = self.query(rstart - add_prev, rstart, single=1, **tags)
             for key, first in prev_data.items():
@@ -408,21 +486,105 @@ class InfluxDBWrapper:
                 if first[1] is not None:
                     if curve:
                         if first[0] < curve[0][0]:
-                            curve.insert(0, first)
+                            if pivot:
+                                curve.insert(0, (rstart,) + tuple(first[1:]))
+                                # start_row.setdefault(key[1:], {})[key[0]] = first[1]
+                            else:
+                                curve.insert(0, tuple(first))
                     else:
-                        result[key] = [first]
+                        result[key] = table = Table(first.tags, first.key_names, first.column_names)
+                        table.append(tuple(first))
         if add_end:
-            for key, curve in result.items():
-                if curve:
-                    last = list(curve[-1])
-                    if last[0] < stop:
-                        last[0] = stop
-                        curve.append(type(curve[-1])(last))
+            self.complete(result, stop)
+        if merge:
+            single_merge = isinstance(merge, str)
+            if single_merge:
+                merge = [merge]
+            rows = []
+            common_tags = summarize_tags(result.values(), True)
+            col_keys = {}
+            col_info = {}
+            for key, curve in result.items():
+                merge_tags = {k: curve.tags.get(k, '') for k in merge}
+                for k, v in zip(curve.key_names, key):
+                    merge_tags.setdefault(k, v)
+                merge_key = tuple(zip(*merge_tags.items()))  # (<keys tuple>, <values tuple>)
+                info = col_info.get(merge_key)
+                if info is None:
+                    col_idx = len(col_keys) + 1
+                    col_keys[col_idx] = merge_key
+                    col_info[merge_key] = info = [col_idx, []]
+                else:
+                    col_idx = info[0]
+                info[1].append(curve)
+                assert curve.column_names[1] == '_value'
+                for row in curve:
+                    rows.append((row[0], row[1], col_idx))
+                # merged.append((merge_key, curve))
+            rows.sort(key=lambda x: x[0])
+            if pivot:
+                header = []
+                for merge_key, (col_idx, curves) in col_info.items():
+                    tags = summarize_tags(curves)
+                    primary = tags.pop(merge[0])
+                    header.append(' '.join([primary] + [f"{k}={v}" for k, v in tags.items() if k not in common_tags]))
+                result = Table(common_tags, (), ('_time',) + tuple(header))
+                values = [0] + [None] * len(col_keys)
+                for row in rows:
+                    col_nr = row[2]
+                    values[col_nr] = row[1]
+                    if row[0] > values[0]:
+                        values[0] = row[0]
+                        result.append(tuple(values))
+                    elif row[0] < values[0]:
+                        raise ValueError(f'{rstart} {values[0]} {row[0]}')
+            else:
+                result = {}
+                by_idx = {}
+                for merge_key, (col_idx, curves) in col_info.items():
+                    tags = summarize_tags(curves)
+                    primary = tags[merge[0]]
+                    table = Table(tags, merge_key[0], ('_time', primary))
+                    result[primary if single_merge else merge_key[1][:len(merge)]] = table
+                    by_idx[col_idx] = table
+                for row in rows:
+                    by_idx[row[2]].append((row[0], row[1]))
         return result

+    @staticmethod
+    def complete(curve_dict, end_time=0, tag='stream'):
+        """complete to end_time
+
+        if end_time is not given, it is the max timestamp within the same stream
+        """
+        end_time_dict = {}
+        if not end_time:
+            for curve in curve_dict.values():
+                key = curve.tags.get(tag)
+                end_time_dict[key] = max(end_time_dict.get(key, 0), curve[-1][0])
+        for curve in curve_dict.values():
+            if len(curve):
+                tlast, value = curve[-1]
+                etime = end_time_dict.get(curve.tags.get(tag), end_time)
+                if value is not None and tlast < etime:
+                    curve.append((etime, value))
+
+    def export(self, start, stop, measurement=('*.value', '*.target'), field='float',
+               interval=None, add_prev=3600, add_end=False, timeoffset=None, none='', **tags):
+        result = self.curves(start, stop, measurement, field, interval, add_prev, add_end,
+                             merge=('_measurement', 'device', 'stream'), pivot=True, **tags)
+        if timeoffset is None:
+            timeoffset = int(start)
+        result.tags.pop('_field', None)
+        rows = [f"# {' '.join(f'{k}={v}' for k, v in result.tags.items())}"]
+        rows.extend(f'# col {i} {k}' for i, k in enumerate(result.column_names))
+        rows.extend(result.to_csv_rows(timeoffset, none=none))
+        rows.append('')
+        return '\n'.join(rows)
+
     # write to the database

-    def _add_point(self, value, ts, measurement, field, tags):
+    def _add_point(self, measurement, field, value, ts, tags):
         point = Point(measurement).field(f'{field}', value)
         if ts:
             point.time(datetime.utcfromtimestamp(ts), write_precision=self._write_precision)
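A hedged sketch of how the new merge / pivot / export features compose (tag values invented): merge folds curves that agree on the given tags into shared result keys, pivot additionally packs everything into one wide time-indexed Table, and export renders that pivoted table as commented TSV via Table.to_csv_rows:

from influx import InfluxDBWrapper

DAY = 24 * 3600
db = InfluxDBWrapper('linse-c')
# dict keyed by device name, one merged Table per device:
per_device = db.curves(-DAY, None, merge='device')
# one wide Table: column 0 is _time, one column per merged curve:
wide = db.curves(-DAY, None, merge=('_measurement', 'device', 'stream'), pivot=True)
# commented header lines ('# key=value', '# col <i> <name>'), then TSV rows:
print(db.export(-2 * 3600, 0))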
@@ -447,16 +609,66 @@ class InfluxDBWrapper:
                 raise PermissionError('no write access - need access="write"') from None
             raise

-    def add_point(self, isfloat, value, *args):
-        """add point to the buffer
-
-        flush must be called in order to write the buffer
-        """
-        if isfloat:
-            # make sure value is float
-            self._add_point(-0.0 if value is None else float(value), *args)
-        else:
-            self._add_point('' if value is None else str(value), *args)
+    # TODO: move these sehistory related methods to a subclass
+    def add_float(self, value, key, tags, ts):
+        self._add_point('.'.join(key), 'float', -0.0 if value is None else float(value), ts, tags)
+
+    def add_error(self, value, key, tags, ts):
+        self._add_point('.'.join(key), 'error', '' if value is None else str(value), ts, tags)
+
+    def get_instrument(self, stream, ts=None, **tags):
+        if ts is None:
+            ts = int(time.time())
+        reply = self.query(None, int(ts) + 1, _measurement='_stream_', _field='on',
+                           stream=stream, single=1, **tags)
+        if reply:
+            entry = sorted(reply.values(), key=lambda r: r[0])[-1]
+            return entry.tags.get('instrument'), entry[0]
+        return None, None
+
+    def get_streams(self, instrument=None, ts=None, **tags):
+        """get streams for one or all instruments
+
+        :param instrument: None when looking for all instruments
+        :param ts: the time or None when now
+        :return: dict <stream> of <instrument> or '0' when instrument is not known
+        """
+        if ts is None:
+            ts = int(time.time())
+        reply = self.query(None, int(ts) + 1, _measurement='_stream_', _field='on',
+                           single=1, instrument=instrument, **tags)
+        all_entries = {}
+        for entry in reply.values():
+            all_entries.setdefault(entry.tags.get('stream'), []).append(entry)
+        result = {}
+        for stream, entries in all_entries.items():
+            entry = sorted(entries, key=lambda r: r[0])[-1]
+            if entry[1]:  # on=True
+                result[stream] = entry.tags.get('instrument', '0')
+        return result
+
+    def set_instrument(self, stream, value, ts=None, **tags):
+        """set stream and instrument on or off
+
+        :param stream: the uri of the stream
+        :param value: instrument, "0" when unknown or None when switching to off
+        :param ts: the time or None when now
+        """
+        if ts is None:
+            ts = time.time()
+        prev, prevts = self.get_instrument(stream, ts, **tags)
+        if prevts is not None:
+            if prev in (None, '0') or ts < prevts:
+                # place the new entry just after the previous one
+                ts = prevts + 0.001
+        tags['stream'] = stream
+        if value:
+            tags['instrument'] = value
+            flag = True
+        else:
+            tags['instrument'] = prev or '0'
+            flag = False
+        self._add_point('_stream_', 'on', flag, ts, tags)
+
+    def add_stream(self, value, tags, key, ts):
+        self.set_instrument(key, value, ts, **tags)


 def testdb():
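The `_stream_` bookkeeping above can be exercised directly; a hedged round trip (stream URI and instrument name invented, flush mirrors the pattern in main()):

from influx import InfluxDBWrapper

db = InfluxDBWrapper('linse-c')
db.enable_write_access()
db.set_instrument('localhost:5000', 'linse-c')  # stream on, bound to instrument
db.flush()
print(db.get_streams())                     # e.g. {'localhost:5000': 'linse-c'}
print(db.get_instrument('localhost:5000'))  # ('linse-c', <timestamp>)
db.set_instrument('localhost:5000', None)   # switch the stream off
db.flush()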


@@ -156,7 +156,6 @@ class NicosStream(Stream):
         except Exception as e:
             print(self.uri, repr(e))
             return
-        cnt = 0
         for ts, devname, param, op, value in sorted([t, d, p, o, v] for (d, p), (o, v, t) in events.items()):
             descr = self.descr.get(devname)
             mod = descr.get('secop_module', devname) if descr else devname
@@ -164,11 +163,9 @@ class NicosStream(Stream):
             if self.devices.get(devname):
                 try:
                     value = self.convert[key](value)
+                    yield 'value', value, key, self.tags, ts
                     error = None
                 except KeyError:  # no conversion function
                     continue
                 except TypeError:
-                    value = None
-                    error = 'error'
-                cnt += 1
-                yield key, value, error, ts, self.get_tags(key)
+                    yield 'error', 'error', key, self.tags, ts


@@ -16,6 +16,14 @@ class EnumConvert(dict):
         return float(self[value])


+class TagsDict(dict):
+    def __init__(self, default_value):
+        self.default_value = default_value
+
+    def __missing__(self, key):
+        return self.default_value
+
+
 class SecopStream(Stream):
     ping_time = 0
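The point of TagsDict is that lookups of unknown modules fall back to the stream's default tags without copying; only modules carrying an original_id get their own entry. A hedged illustration (tag values invented):

from secop import TagsDict

base = {'stream': 'localhost:5000', 'device': 'cryo'}
tags = TagsDict(base)
tags['tt'] = dict(base, device='original_cryo')  # module with original_id
print(tags['tt'])  # {'stream': 'localhost:5000', 'device': 'original_cryo'}
print(tags['mf'])  # unknown module -> the default tags (the base dict itself)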
@@ -47,12 +55,12 @@ class SecopStream(Stream):
         self.tags['device'] = self.device
         self.modules = self.descr['modules']
         self.convert = {}
-        self.original_id = {}
+        self.tags_dict = TagsDict(self.tags)
         for mod, moddesc in self.modules.items():
             for key in ('_original_id', 'original_id'):
                 value = moddesc.get(key)
                 if value:
-                    self.original_id[mod] = value
+                    self.tags_dict[mod] = dict(self.tags, device=value)
                     break
             for param, desc in moddesc['accessibles'].items():
                 dt = desc['datainfo']
@@ -64,11 +72,10 @@ class SecopStream(Stream):
         self.send('ping')

     def get_tags(self, key):
-        return dict(self.tags, device=self.original_id.get(key[0], self.device))
+        return self.tags_dict[key[0]]

     def event_generator(self):
         try:
-            cnt = 0
             for msg in self.get_lines():
                 match = UPDATE.match(msg)
                 if match:
@@ -78,23 +85,23 @@ class SecopStream(Stream):
                     cvt = self.convert.get(key)
                     if cvt:
                         data = json.loads(data)
+                        tags = self.tags_dict[key[0]]
                         if cmd == 'error_update':
                             error = ': '.join(data[0:2])
                             print(msg, repr(error))
-                            ts = data[2].get('t', time.time())
-                            value = None
+                            timestamp = data[2].get('t', time.time())
+                            yield 'error', error, key, tags, timestamp
                         else:
-                            error = None
-                            ts = data[1].get('t', time.time())
                             value = cvt(data[0])
-                            cnt += 1
-                        yield key, value, error, ts, self.get_tags(key)
+                            timestamp = data[1].get('t', time.time())
+                            yield 'value', value, key, tags, timestamp
                 elif msg == 'active':
                     # from now on, no more waiting
                     self.notimeout()
         except Exception as e:
             print(self.uri, repr(e))
+            raise
SECOP_UDP_PORT = 10767 SECOP_UDP_PORT = 10767
@@ -115,6 +122,8 @@ class UdpStream(Base):
                 continue
             if kind == 'for_other_node':
                 uri = msg.pop('uri')
+                if 'device' not in msg:
+                    msg['device'] = uri.split('://', 1)[-1].split(':')[0]
                 kwargs = msg
             elif kind == 'node':
                 uri = f"{addr[0]}:{msg['port']}"
@@ -152,14 +161,17 @@ class ScanStream(UdpStream):
         self.select_dict[sock.fileno()] = self


-def send_fake_udp(uri, device='fake'):
+def send_fake_udp(uri, device=None, instrument=None):
     sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
     sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
-    msg = json.dumps({
+    msg = {
         'SECoP': 'for_other_node',
         'uri': uri,
-        'device': device,
-    }, ensure_ascii=False, separators=(',', ':')).encode('utf-8')
-    sock.sendto(msg, ('255.255.255.255', SECOP_UDP_PORT))
+    }
+    if device:
+        msg['device'] = device
+        msg['instrument'] = instrument or '0'
+    sock.sendto(json.dumps(msg, ensure_ascii=False, separators=(',', ':')).encode('utf-8'),
+                ('255.255.255.255', SECOP_UDP_PORT))
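For reference, the broadcast payloads the rewritten send_fake_udp produces (keys from this diff, values invented); when device is omitted, the receiving UdpStream above now derives it from the host part of the URI:

send_fake_udp('localhost:5000')
# {"SECoP":"for_other_node","uri":"localhost:5000"}
send_fake_udp('localhost:5000', device='cryo', instrument='linse-c')
# {"SECoP":"for_other_node","uri":"localhost:5000","device":"cryo","instrument":"linse-c"}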


@@ -56,7 +56,7 @@ class Stream(Base):
         self.encoding = encoding
         self.timeout = timeout
         self.socket = None
-        self.cache = {}
+        self.cache = {}  # dict <key> of event
         self.errors = {}
         self.start_time = time.time()
         self.next_hour = (self.start_time // 3600 + 1) * 3600
@@ -187,6 +187,11 @@ class Stream(Base):
                 break

     def event_generator(self):
+        """a generator returning events
+
+        events are (<kind>, <value>, <key>, <tags>, <timestamp>)
+        kind is one of 'error', 'value', 'stream'
+        """
         raise NotImplementedError

     def get_tags(self, key):
@@ -196,9 +201,8 @@ class Stream(Base):
     def finish_events(self, events, end_time):
         for key in list(self.cache):
             self.cache.pop(key)
-            dbkey = '.'.join(key)
-            events.append((True, None, end_time, dbkey, 'float', self.tags))
-            events.append((False, 'END', end_time, dbkey, 'error', self.tags))
+            events.append(('value', None, key, self.tags, end_time))
+            events.append(('error', 'END', key, self.tags, end_time))

     def get_events(self, events, maxevents):
         """get available events
@@ -208,23 +212,23 @@ class Stream(Base):
        there might be more after a full hour or when the stream is dying
        :return: True when maxevents is reached
        """
-        for key, value, error, ts, tags in self.generator:
-            ts = max(self.start_time, min(ts or INF, time.time()))
-            if ts >= self.next_hour:
-                ts_ = (ts // 3600) * 3600
-                for key_, value_ in self.cache.items():
-                    events.append((True, value_, ts_, '.'.join(key_), 'float', self.get_tags(key_)))
-                for key_, error_ in self.errors.items():
-                    events.append((False, error_, ts_, '.'.join(key_), 'error', self.get_tags(key_)))
-                self.next_hour = ts_ + 3600
-            if value != self.cache.get(key, None) or error != self.errors.get(key, None):
-                dbkey = '.'.join(key)
-                events.append((True, value, ts, dbkey, 'float', tags))
-                self.cache[key] = value
-                if error and self.errors.get(key) != error:
-                    events.append((False, error, ts, dbkey, 'error', tags))
-                    self.errors[key] = error
-            elif len(events) >= maxevents:
+        for event in self.generator:
+            kind, value, key, tags, ts = event
+            timestamp = max(self.start_time, min(ts or INF, time.time()))
+            if timestamp >= self.next_hour:
+                t = (timestamp // 3600) * 3600
+                events.extend(e[:-1] + (t,) for e in self.cache.values())
+                self.next_hour = t + 3600
+            prev = self.cache[key][:2] if key in self.cache else (None, None)
+            if (kind, value) != prev:
+                if kind == 'error':
+                    if prev[0] == 'value':
+                        events.append(('value', None, key, tags, timestamp))
+                    self.cache[key] = event
+                elif kind == 'value':
+                    self.cache[key] = event
+                events.append(event)
+            if len(events) >= maxevents:
                 return True
         else:
             if self.dead:
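A hedged, simplified simulation of the dedup rules above (the real method also handles the hourly re-emission; cache maps key to the last accepted event):

cache = {}

def accept(event):
    kind, value, key, tags, ts = event
    prev = cache[key][:2] if key in cache else (None, None)
    out = []
    if (kind, value) != prev:
        if kind == 'error' and prev[0] == 'value':
            out.append(('value', None, key, tags, ts))  # terminate the curve first
        cache[key] = event
        out.append(event)
    return out

print(accept(('value', 1.5, ('tt', 'value'), {}, 1.0)))  # emitted
print(accept(('value', 1.5, ('tt', 'value'), {}, 2.0)))  # [] duplicate dropped
print(accept(('error', 'x', ('tt', 'value'), {}, 3.0)))  # value-None, then the error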
@@ -262,17 +266,25 @@ class EventStream:
             for stream in self.wait_ready(1):
                 if not isinstance(stream, Stream):
                     for streamcls, uri, kwargs in stream.events():
-                        if uri not in self.streams:
+                        stream = self.streams.get(uri)
+                        if stream:
+                            stream.tags.update(kwargs)
+                        else:
                             try:
-                                self.streams[uri] = streamcls(uri, **kwargs)
+                                self.streams[uri] = stream = streamcls(uri, **kwargs)
                                 print('added stream', uri, kwargs)
                             except Exception as e:
                                 print('can not connect to', uri, repr(e))
+                                continue
+                        events.append(('stream', kwargs.get('instrument', '0'),
+                                       {}, uri, int(time.time())))
             for name, stream in self.streams.items():
                 try:
                     if stream.get_events(events, maxevents):
                         return events
                 except StreamDead:
+                    # indicate stream is removed
+                    events.append(('stream', None, {}, name, int(time.time())))
                     self.streams.pop(name)
             if events:
                 return events
@@ -285,4 +297,5 @@ class EventStream:
         for stream in self.streams.values():
             stream.close()
             stream.finish_events(events, end_time)
+            events.append(('stream', None, {}, stream.uri, end_time))
         return events

t.py

@@ -1,30 +1,34 @@
 import time
+import math
 import numpy as np

-from influx import InfluxDBWrapper, NamedTuple, RegExp
+from influx import InfluxDBWrapper, RegExp

 DAY = 24 * 3600

-token = "zqDbTcMv9UizfdTj15Fx_6vBetkM5mXN56EE9CiDaFsh7O2FFWZ2X4VwAAmdyqZr3HbpIr5ixRju07-oQmxpXw=="
-db = InfluxDBWrapper('http://pc16392:8086', token, 'linse', 'curve-test')
+# token = "zqDbTcMv9UizfdTj15Fx_6vBetkM5mXN56EE9CiDaFsh7O2FFWZ2X4VwAAmdyqZr3HbpIr5ixRju07-oQmxpXw=="
+db = InfluxDBWrapper('linse-c')

 print("""
 qry([start], [stop], [interval=...,] [last=True,] [columns=[...],] [<tag>=<value>, ] ...)
 crv([start], [stop], [mod.par], ['float'], [interval=...,] [add_prev=False,] [add_end=True,] [<tag>=<value>, ] ...)
 """)

-offset = (time.time() // 3600) * 3600
+now = int(time.time())
+offset = (now // 3600) * 3600
 result = {}
+maxcurves = 7
+maxpoints = 7


 def prt():
     for i, (key, curve) in enumerate(result.items()):
-        if i > 5:
+        if i > maxcurves:
             print('--- ...')
             break
-        print('---', key, list(curve[0]._idx_by_name))
+        print('---', key, curve.column_names, [f'{k}={v}' for k, v in curve.tags.items() if k not in curve.key_names])
         n = len(curve)
-        if n > 7:
+        if n > maxpoints:
             curves = [curve[:3], None, curve[-3:]]
         else:
             curves = [curve]
@@ -37,40 +41,69 @@ def prt():

 def qry(*args, **kwds):
-    global result
-    result = db.query(*args, **kwds)
-    print('PRINT')
+    result.clear()
+    result.update(db.query(*args, **kwds))
     prt()


 def crv(*args, **kwds):
-    global result
-    result = db.curves(*args, **kwds)
+    result.clear()
+    res = db.curves(*args, **kwds)
+    if isinstance(res, list):
+        result[()] = res
+    else:
+        result.update(res)
     prt()


-def sry():
-    global result
-    res = db.query(-DAY * 365, interval=DAY, _field='float',
+def sry(prectime=False):
+    interval = 3600
+    res = db.query(-DAY * 365, interval=interval, _field='float',
                    device=None, stream=None, _measurement=None)
-    result = {}  # dict (device, stream) of list of [start, end, set of params]
+    by_day = {}  # dict (device, stream) of list of [start, end, set of params]
     for key, table in res.items():
         assert table.key_names == ('device', 'stream', '_measurement')
         device, stream, param = key
         for row in table:
-            start = row[0] - 3600
-            result.setdefault((start, device, stream), set()).add(param)
+            tm = time.localtime(row[0] - interval)
+            day = time.mktime(tm[0:3] + (0, 0, 0, 0, 0, -1))
+            key = (day, device, stream)
+            info = by_day.get(key)
+            start = row[0] - interval
+            if info:
+                info[0] = min(start, info[0])
+                info[1] = max(row[0], info[1])
+            else:
+                info = [start, row[0], set()]
+                by_day[key] = info
+            info[2].add(param)
     prev_data = {}
+    print('---')
     summary = []
-    for (start, device, stream), pset in sorted(result.items()):
+    for (day, device, stream), (start, end, pset) in sorted(by_day.items()):
         prev = prev_data.get((device, stream))
-        if prev is None or start > prev[1]:
-            if prev:
-                print('PREV', device, stream, start - prev[1])
-            prev_data[device, stream] = prev = [start, start + 3600, pset]
-            summary.append([start, device, stream, prev])
+        # merge continuous days, considering leap hour
+        if prev is None or day > prev[2] + 25 * 3600:
+            experiment = [end, start, day, device, stream, pset]
+            summary.append(experiment)
+            prev_data[device, stream] = experiment
         else:
-            prev[1] = start + 3600
-            prev[2].update(pset)
-    for start, device, stream, (_, end, pset) in sorted(summary):
-        st = time.strftime('%Y-%m-%d %H:%M', time.localtime(start))
-        print(st, (end - start) / 3600., device, stream, len(pset))
+            prev[0] = end
+            prev[2] = day
+            prev[-1].update(pset)
+    result.clear()
+    for end, start, _, device, stream, pset in sorted(summary):
+        if prectime:
+            res = db.query(start, end, device=device, stream=stream, single=-1)
+            first = int(min(t[0] for t in res.values()))
+            res = db.query(start, end, device=device, stream=stream, single=1)
+            last = math.ceil(max(t[0] for t in res.values()))
+            tm1 = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(first))
+            tm2 = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(last))
+        else:
+            first, last = start, end - 1
+            tm1 = time.strftime('%Y-%m-%d %Hh', time.localtime(first))
+            tm2 = time.strftime('%Y-%m-%d %Hh', time.localtime(last))
+        result.setdefault(device, []).append([first, last, device, stream, pset])
+        print(tm1, tm2, device, stream, len(pset))
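sry() now prints one line per merged experiment (start, end, device, stream, parameter count); with prectime=True the exact first and last timestamps are queried instead of day granularity. Hypothetical output, all values invented:

2025-02-20 10h 2025-02-21 09h cryo localhost:5000 12
2025-02-23 08h 2025-02-24 14h magnet localhost:5001 7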