From c0aeca523a4240fe1d024d9c79ff618b4da18711 Mon Sep 17 00:00:00 2001 From: Markus Zolliker Date: Mon, 24 Feb 2025 14:54:54 +0100 Subject: [PATCH] major improvements and rework - add stream / instrument availability data - events contain event kind for dispatching db methods --- feeder.py | 15 +- influx.py | 520 +++++++++++++++++++++++++++++++++++--------------- nicoscache.py | 7 +- secop.py | 42 ++-- streams.py | 59 +++--- t.py | 89 ++++++--- 6 files changed, 501 insertions(+), 231 deletions(-) diff --git a/feeder.py b/feeder.py index 928fc5d..bb690bf 100644 --- a/feeder.py +++ b/feeder.py @@ -2,22 +2,25 @@ import sys from streams import EventStream from nicoscache import NicosStream from secop import ScanStream, ScanReply, send_fake_udp -from influx import testdb +from influx import InfluxDBWrapper def main(): # egen = EventStream(ScanReply(), ScanStream(), n=NicosStream('localhost:14002')) egen = EventStream(ScanReply(), ScanStream()) - db = testdb() + db = InfluxDBWrapper('linse-c') db.enable_write_access() + + event_map = {'value': db.add_float, 'error': db.add_error, 'stream': db.add_stream} + try: while 1: - for event in egen.get_events(): - db.add_point(*event) + for kind, *args in egen.get_events(): + event_map[kind](*args) db.flush() finally: - for event in egen.finish(): - db.add_point(*event) + for kind, *args in egen.finish(): + event_map[kind](*args) db.disconnect() diff --git a/influx.py b/influx.py index 69c51c6..76a49d7 100644 --- a/influx.py +++ b/influx.py @@ -21,16 +21,19 @@ # ***************************************************************************** import re import time -from datetime import datetime +from pathlib import Path +from configparser import ConfigParser +from datetime import datetime, timezone from math import floor, ceil from influxdb_client import InfluxDBClient, BucketRetentionRules, Point from influxdb_client.client.write_api import SYNCHRONOUS DAY = 24 * 3600 +YEAR = 366 * DAY # write_precision from digits after decimal point TIME_PRECISION = ['s'] + ['ms'] * 3 + ['us'] * 3 + ['ns'] * 3 - +UNDEF = '' try: parse_time = datetime.fromisoformat @@ -42,92 +45,104 @@ def to_time(v): return parse_time(v).timestamp() -def identity(v): - return v +def to_iso(t): + return datetime.fromtimestamp(t, timezone.utc).isoformat().replace('+00:00', 'Z') -def double(v): - return None if v == '-0' else float(v) +class PrettyFloat(float): + """saves bandwidth when converting to JSON + a lot of numbers originally have a fixed (low) number of decimal digits. 
+ as the binary representation is not exact, it might happen, that a + lot of superfluous digits are transmitted: -CONVERTER = { - 'string': identity, - 'long': int, - 'double': double, - 'unsigned_long': int, - 'duration': int, - 'dateTime:RFC3339': to_time, - 'dateTime:RFC3339Nano': to_time, - # 'base64Binary': base64.b64decode, -} - - -class NamedTuple(tuple): - """for our purpose improved version of collection.namedtuple - - - names may be any string, but when not an identifer, attribute access is not possible - - access by key with get ([ ] is for indexed access) - - Usage: - - MyNamedTuple = NamedTuple.make_class(('a', 'b')) - x = MyNamedTuple(('a', 2.0)) - assert x == ('a', 2.0) == (x.a, x.b) == (x.get('a'), x.get('b')) == (x[0], x[1]) + str(1/10*3) == '0.30000000000000004' + str(PrettyFloat(1/10*3)) == '0.3' """ - keys = None - _idx_by_name = None + def __new__(cls, value): + return None if value == '-0' else super().__new__(cls, value) - def __new__(cls, keys): - """create NamedTuple class from keys + def __repr__(self): + return '%.15g' % self - :param keys: a sequence of names for the elements - """ - idxbyname = {n: i for i, n in enumerate(keys)} - attributes = {n: property(lambda s, idx=i: s[idx]) - for i, n in enumerate(keys) - if n.isidentifier() and not hasattr(cls, n)} - # clsname = '_'.join(attributes) - attributes.update(_idx_by_name=idxbyname, __new__=tuple.__new__, keys=tuple(keys)) - return type(f"NamedTuple", (cls,), attributes) - def get(self, key, default=None, strict=False): - """get item by key +class Converters(dict): + def __init__(self, datatypes): + super().__init__((i, getattr(self, f"cvt_{d.split(':')[0]}")) + for i, d in enumerate(datatypes) if i > 2) - :param key: the key - :param default: value to return when key does not exist - :param strict: raise KeyError when key does not exist and ignore default - :return: the value of requested element or default if the key does not exist - """ - try: - return self[self._idx_by_name[key]] - except KeyError: - if strict: - raise - return default + def as_tuple(self, row): + """get selected columns as tuple""" + return tuple(f(row[i]) for i, f in self.items()) - @property - def names(self): - return tuple(self._idx_by_name) + cvt_double = staticmethod(PrettyFloat) - def tuple(self, *keys): - return tuple(self.get(k) for k in keys) + @staticmethod + def cvt_string(value): + return value + + @staticmethod + def cvt_long(value): + return int(value) + + @staticmethod + def cvt_dateTime(value): + return to_time(value) + + @staticmethod + def cvt_boolean(value): + return value == 'true' + + cvt_unsigned_long = cvt_duration = cvt_long class Table(list): """a list of tuples with meta info""" - def __init__(self, tags, key_names, column_names): + def __init__(self, tags={}, key_names=(), column_names=(), rows=None): super().__init__() self.tags = tags self.key_names = key_names self.column_names = column_names + if rows: + self[:] = rows + + def to_csv_rows(self, timeoffset=0, sep='\t', none='none', float_format='%.15g'): + for row in self: + result = ['%.15g' % (row[0] - timeoffset)] + for value in row[1:]: + try: + result.append(float_format % value) + except TypeError: + if value is None: + result.append(none) + else: + result.append(str(value).replace(sep, ' ')) + yield sep.join(result) class Single(Table): """a single row of a table, as a list with meta info""" - def __init__(self, table): - super().__init__(table.tags, table.key_names, table.column_names) - single, = table - self[:] = single + def __init__(self, 
tags={}, key_names=(), column_names=(), rows=None): + super().__init__(tags, key_names, column_names) + if rows: + single_row, = rows + self[:] = single_row + + +def summarize_tags(curves, remove_multiple=False): + """summarize tags + + :param curves: list of curves (type Table) + :param remove_multiple: True: remove non-unique values + :return: dict of comma separated values + """ + result = {} + for curve in curves: + for k, v in curve.tags.items(): + result.setdefault(k, set()).add(str(v)) + if remove_multiple: + return {k: ','.join(v) for k, v in result.items() if len(v) == 1} + return {k: ','.join(v) for k, v in result.items()} class RegExp(str): @@ -158,13 +173,13 @@ class CurveDict(dict): def abs_range(start=None, stop=None): now = time.time() - if start is None: - start = int(now - 32 * DAY) - elif start < 366 * DAY: + if start is None: # since ever + start = 0 + elif start < YEAR: start = int(now + start) if stop is None: - stop = int(now + DAY) - elif stop < 366 * DAY: + stop = int(now + YEAR) + elif stop < YEAR: stop = ceil(now + stop) return start, stop @@ -185,19 +200,24 @@ class InfluxDBWrapper: _update_queue = None _write_api_write = None - def __init__(self, url, token, org, bucket, access='readonly'): + def __init__(self, uri=None, token=None, org=None, bucket=None, access='readonly'): """initialize - :param url: the url for the influx DB + :param uri: the uri for the influx DB or a name to look up in ~/.sehistory :param token: the token :param org: the organisation :param bucket: the bucket name :param access: 'readonly', 'write' (RW) or 'create' (incl. RW) """ - self._url = url - self._token = token - self._org = org - self._bucket = bucket + if ':' in uri: + args = uri, token, org, bucket + else: + parser = ConfigParser() + parser.optionxform = str + parser.read([Path('~').expanduser() / '.sehistory']) + section = parser[uri] + args = [section[k] for k in ('uri', 'token', 'org', 'bucket')] + self._url, self._token, self._org, self._bucket =args self._client = InfluxDBClient(url=self._url, token=self._token, org=self._org) if access != 'readonly': @@ -206,6 +226,8 @@ class InfluxDBWrapper: self.set_time_precision(3) self.add_new_bucket(self._bucket, access == 'create') self._write_buffer = [] + self._alias = {} + print('InfluxDBWrapper', self._url, self._org, self._bucket) def enable_write_access(self): self._write_api_write = self._client.write_api(write_options=SYNCHRONOUS).write @@ -240,24 +262,47 @@ class InfluxDBWrapper: import "influxdata/influxdb/schema" schema.measurements(bucket: "{self._bucket}")""") for r in t] - def delete_measurement(self, measurement): + def delete_measurement(self, measurement, start=None, stop=None): delete_api = self._client.delete_api() - delete_api.delete('1970-01-01T00:00:00Z', '2038-01-01T00:00:00Z', f'_measurement="{measurement}"', + start, stop = abs_range(start, stop) + if stop is None: + stop = time.time() + DAY + delete_api.delete(to_iso(start), to_iso(stop), f'_measurement="{measurement}"', bucket=self._bucket, org=self._org) - def delete_all_measurements(self): - measurements = self.get_measurements() + def delete_all_measurements(self, measurements=None, start=0, stop=None): + if measurements is None: + measurements = self.get_measurements() for meas in measurements: - self.delete_measurement(meas) - print('deleted', measurements) + self.delete_measurement(meas, start, stop) + + def _get_rows(self, reader, as_tuple, first_row): + row = first_row + tableno = row[2] + try: + while 1: + if row[0]: + first_row[:] = row + return + 
if row[2] != tableno: + # table id changed: new table, store first row for next call + first_row[:] = row + return + yield as_tuple(row) + row = next(reader) + if not row: + raise ValueError('EMPTY') + except StopIteration: + first_row.clear() # indicate end of data + # query the database def query(self, start=None, stop=None, interval=None, single=None, columns=None, **tags): """Returns queried data as InfluxDB tables - :param start: start time. default is a month ago - :param stop: end time, default is tomorrow at the same time + :param start: start time (default: since ever) + :param stop: end time (default: eternity = 1 year in the future) :param interval: if set an aggregation filter will be applied. This will return only the latest values per time interval in seconds. :param single: when True (or 1), only the last value within the interval is returned @@ -276,24 +321,44 @@ class InfluxDBWrapper: the obtained value is contained in the result dicts key only if the value is an instance of RegExp or when it contains an asterisk ('*') - :return: a dict of list of - where and are NamedTuple + :return: a dict of + Table is an extension of list, with some meta info + """ + result = {} + for rows, key, props in self.query_gen(start, stop, interval, single, columns, **tags): + if single: + result[key] = Single(*props, rows=rows) + else: + table = Table(*props, rows=rows) + table.sort() + result[key] = table + return result + + def query_gen(self, start=None, stop=None, interval=None, single=None, columns=None, **tags): + """Returns queried data as InfluxDB as a generator + + argument description: see query methods + + :return: an iterator of (rows, key, (tags, key_names, column_names)) + + remark: rows not consumed in between iteration steps are lost + using this generator version does reduce memory usage """ self.flush() start, stop = round_range(*abs_range(start, stop)) msg = [f'from(bucket:"{self._bucket}")', f'|> range(start: {start}, stop: {stop})'] - keys = {} + keylist = [] dropcols = ['_start', '_stop'] fixed_tags = {} for key, crit in tags.items(): if crit is None: - keys[key] = None + keylist.append(key) continue if isinstance(crit, str): if isinstance(crit, RegExp) or '*' in crit: - keys[key] = None + keylist.append(key) append_wildcard_filter(msg, key, [crit]) continue fixed_tags[key] = crit @@ -304,7 +369,7 @@ class InfluxDBWrapper: dropcols.append(key) else: try: - keys[key] = None + keylist.append(key) append_wildcard_filter(msg, key, crit) continue except Exception: @@ -316,84 +381,96 @@ class InfluxDBWrapper: else: msg.append('|> last(column: "_time")') if interval: - msg.append(f'|> aggregateWindow(every: {interval}s, fn: last, createEmpty: false)') + msg.append(f'|> aggregateWindow(every: {interval:g}s, fn: last, createEmpty: false)') if columns is None: msg.append(f'''|> drop(columns:["{'","'.join(dropcols)}"])''') else: columns = ['_time', '_value'] + list(columns) - msg.append(f'''|> keep(columns:["{'","'.join(columns + keys)}"])''') + msg.append(f'''|> keep(columns:["{'","'.join(columns + keylist)}"])''') msg = '\n'.join(msg) - print(msg) + # print(msg) + self.msg = msg - reader = self._client.query_api().query_csv(msg) - print('CSV', keys, columns) - converters = None - group = None - column_names = None - column_keys = None - key = None - result = {} - tableno = None + try: + reader = self._client.query_api().query_csv(msg) + except Exception: + print(msg) + raise - for row in reader: - if not row: - continue - if row[0]: - if row[0] == '#datatype': - converters = {i: 
CONVERTER.get(d) for i, d in enumerate(row) if i > 2} - column_names = None - elif row[0] == '#group': - group = row - continue - if column_names is None: + try: + row = next(reader) + except StopIteration: + return + converters = key_dict = table_properties = None # make IDE happy + for i in range(5): + header = {} + if row[0]: # this is a header + header[row[0]] = row + for row in reader: + if row: + if not row[0]: + break + header[row[0]] = row + else: + return # this should not happen + # we are now at the row with the column names column_names = row + converters = Converters(header['#datatype']) + group = header['#group'] + keys = {k: None for k in keylist} + for col, (name, grp) in enumerate(zip(column_names, group)): if grp != 'true': continue if columns is None or name in keys: keys[name] = col, converters.pop(col) - column_keys = tuple(column_names[i] for i in converters) - continue - if row[2] != tableno: - # new table, new key - tableno = row[2] - key_dict = {n: f(row[i]) for n, (i, f) in keys.items()} - key = tuple(key_dict.values()) - if result.get(key) is None: - print('KC', key_dict, column_keys) - result[key] = Table({**fixed_tags, **key_dict}, tuple(keys), column_keys) - - result[key].append(tuple(f(row[i]) for i, f in converters.items())) - if single: - for key, table in result.items(): - result[key] = Single(table) - else: - for table in result.values(): - table.sort() - return result + none_keys = [k for k, v in keys.items() if v is None] + if none_keys: + for k in none_keys: + keys.pop(k) + # break + row = next(reader) + # we are at the first data row + key_dict = {n: f(row[i]) for n, (i, f) in keys.items()} + column_keys = tuple(column_names[i] for i in converters) + table_properties = {**fixed_tags, **key_dict}, tuple(keys), column_keys + key = tuple(key_dict.values()) + row = list(row) # copy row, as it will be modified + rows = self._get_rows(reader, converters.as_tuple, row) + yield rows, key, table_properties + # consume unused rows + consumed = sum(1 for _ in rows) + if consumed: + print('skip', consumed, 'rows') + if not row: # reader is at end + return def curves(self, start=None, stop=None, measurement=('*.value', '*.target'), field='float', - interval=None, add_prev=3600, add_end=True, **tags): + interval=None, add_prev=3600, add_end=False, merge=None, pivot=False, **kwds): """get curves - :param start: start time (default: one month ago) - :param stop: end time (default: tomorrow) + :param start: start time (default: since ever) + :param stop: end time (default: eternity = 1 year in the future) :param measurement: '.' (default: ['*.value', '*.target']) :param field: default 'float' (only numeric curves) :param interval: if given, the result is binned :param add_prev: amount of time to look back for the last previous point (default: 1 hr) :param add_end: whether to add endpoint at stop time (default: False) - :param tags: further selection criteria - :return: a dict of
or
+        :param merge: None: no merge happens, else curves with the same final key are merged. 2 cases:
+           one key (str): the name of the final key. result will be a dict mapping that key value to a Table
+           a tuple of keys: the names of the final key elements. result: a dict mapping key tuples to Table
+        :param pivot: sort values into columns of one big table
+        :param kwds: further selection criteria
+        :return: a dict of Table or Single,
            where Table
is a list of tuples with some meta info (table.tags, table.column_names) and is a list (a single row of a table) with the same meta info when _field='float' (the default), the returned values are either a floats or None """ - tags.setdefault('_measurement', measurement) - tags.setdefault('_field', field) + tags = {k: v for k, v in (('_measurement', measurement), ('_field', field)) if v is not None} + tags.update(kwds) start, stop = abs_range(start, stop) rstart, rstop = round_range(start, stop, interval) if rstart < rstop: @@ -401,6 +478,7 @@ class InfluxDBWrapper: # result = self.query(rstart, rstop, interval, columns=['stream', 'device'], **tags) else: result = {} + start_row = {} if add_prev: prev_data = self.query(rstart - add_prev, rstart, single=1, **tags) for key, first in prev_data.items(): @@ -408,21 +486,105 @@ class InfluxDBWrapper: if first[1] is not None: if curve: if first[0] < curve[0][0]: - curve.insert(0, first) + if pivot: + curve.insert(0, (rstart,) + tuple(first[1:])) + # start_row.setdefault(key[1:], {})[key[0]] = first[1] + else: + curve.insert(0, tuple(first)) else: - result[key] = [first] + result[key] = table = Table(first.tags, first.key_names, first.column_names) + table.append(tuple(first)) if add_end: + self.complete(result, stop) + if merge: + single_merge = isinstance(merge, str) + if single_merge: + merge = [merge] + rows = [] + common_tags = summarize_tags(result.values(), True) + col_keys = {} + col_info = {} for key, curve in result.items(): - if curve: - last = list(curve[-1]) - if last[0] < stop: - last[0] = stop - curve.append(type(curve[-1])(last)) + merge_tags = {k: curve.tags.get(k, '') for k in merge} + for k, v in zip(curve.key_names, key): + merge_tags.setdefault(k, v) + merge_key = tuple(zip(*merge_tags.items())) # (, ) + info = col_info.get(merge_key) + if info is None: + col_idx = len(col_keys) + 1 + col_keys[col_idx] = merge_key + col_info[merge_key] = info = [col_idx, []] + else: + col_idx = info[0] + info[1].append(curve) + assert curve.column_names[1] == '_value' + for row in curve: + rows.append((row[0], row[1], col_idx)) + # merged.append((merge_key, curve)) + rows.sort(key=lambda x: x[0]) + if pivot: + header = [] + for merge_key, (col_idx, curves) in col_info.items(): + tags = summarize_tags(curves) + primary = tags.pop(merge[0]) + header.append(' '.join([primary] + [f"{k}={v}" for k, v in tags.items() if k not in common_tags])) + result = Table(common_tags, (), ('_time',) + tuple(header)) + values = [0] + [None] * len(col_keys) + for row in rows: + col_nr = row[2] + values[col_nr] = row[1] + if row[0] > values[0]: + values[0] = row[0] + result.append(tuple(values)) + elif row[0] < values[0]: + raise ValueError(f'{rstart} {values[0]} {row[0]}') + else: + result = {} + by_idx = {} + for merge_key, (col_idx, curves) in col_info.items(): + tags = summarize_tags(curves) + primary = tags[merge[0]] + table = Table(tags, merge_key[0], ('_time', primary)) + result[primary if single_merge else merge_key[1][:len(merge)]] = table + by_idx[col_idx] = table + for row in rows: + by_idx[row[2]].append((row[0], row[1])) return result + @staticmethod + def complete(curve_dict, end_time=0, tag='stream'): + """complete to end_time + + if end_time is not given, is is the max timestamp within the same stream + """ + end_time_dict = {} + if not end_time: + for curve in curve_dict.values(): + key = curve.tags.get(tag) + end_time_dict[key] = max(end_time_dict.get(key, 0), curve[-1][0]) + for curve in curve_dict.values(): + if len(curve): + tlast, value = 
curve[-1] + etime = end_time_dict.get(curve.tags.get(tag), end_time) + if value is not None and tlast < etime: + curve.append((etime, value)) + + def export(self, start, stop, measurement=('*.value', '*.target'), field='float', + interval=None, add_prev=3600, add_end=False, timeoffset=None, none='', **tags): + result = self.curves(start, stop, measurement, field, interval, add_prev, add_end, + merge=('_measurement', 'device', 'stream'), pivot=True, **tags) + if timeoffset is None: + timeoffset = int(start) + result.tags.pop('_field', None) + rows = [f"# {' '.join(f'{k}={v}' for k, v in result.tags.items())}"] + rows.extend(f'# col {i} {k}' for i, k in enumerate(result.column_names)) + rows.extend(result.to_csv_rows(timeoffset, none=none)) + rows.append('') + return '\n'.join(rows) + # write to the database - def _add_point(self, value, ts, measurement, field, tags): + def _add_point(self, measurement, field, value, ts, tags): point = Point(measurement).field(f'{field}', value) if ts: point.time(datetime.utcfromtimestamp(ts), write_precision=self._write_precision) @@ -447,16 +609,66 @@ class InfluxDBWrapper: raise PermissionError('no write access - need access="write"') from None raise - def add_point(self, isfloat, value, *args): - """add point to the buffer + # TODO: move these sehistory related methods to a subclass + def add_float(self, value, key, tags, ts): + self._add_point('.'.join(key), 'float', -0.0 if value is None else float(value), ts, tags) - flush must be called in order to write the buffer + def add_error(self, value, key, tags, ts): + self._add_point('.'.join(key), 'error', '' if value is None else str(value), ts, tags) + + def get_instrument(self, stream, ts=None, **tags): + if ts is None: + ts = int(time.time()) + reply = self.query(None, int(ts) + 1, _measurement='_stream_', _field='on', + stream=stream, single=1, **tags) + if reply: + entry = sorted(reply.values(), key=lambda r: r[0])[-1] + return entry.tags.get('instrument'), entry[0] + return None, None + + def get_streams(self, instrument=None, ts=None, **tags): + """get streams for one or all instruments + + :param instrument: None when looking for all instruments + :param ts: the time or None when now + :return: dict of or '0' when instrument is not known """ - if isfloat: - # make sure value is float - self._add_point(-0.0 if value is None else float(value), *args) + if ts is None: + ts = int(time.time()) + reply = self.query(None, int(ts) + 1, _measurement='_stream_', _field='on', + single=1, instrument=instrument, **tags) + all_entries = {} + for entry in reply.values(): + all_entries.setdefault(entry.tags.get('stream'), []).append(entry) + result = {} + for stream, entries in all_entries.items(): + entry = sorted(entries, key=lambda r: r[0])[-1] + if entry[1]: # on=True + result[stream] = entry.tags.get('instrument', '0') + return result + + def set_instrument(self, stream, value, ts=None, **tags): + """set stream and instrument on or off + + :param stream: the uri of the stream + :param value: instrument, "0" when unknown or None when switching to off + :param ts: the time or None when now + """ + prev, row = self.get_instrument(stream, ts, **tags) + if row is not None: + if prev in (None, '0') or ts < row[0]: + ts = prevts + 0.001 + tags['stream'] = stream + if value: + tags['instrument'] = value + flag = True else: - self._add_point('' if value is None else str(value), *args) + tags['instrument'] = prev or '0' + flag = False + self._add_point('_stream_', 'on', flag, ts, tags) + + def add_stream(self, value, 
tags, key, ts): + self.set_instrument(key, value, ts, **tags) def testdb(): diff --git a/nicoscache.py b/nicoscache.py index 4d30370..ab5ef59 100644 --- a/nicoscache.py +++ b/nicoscache.py @@ -156,7 +156,6 @@ class NicosStream(Stream): except Exception as e: print(self.uri, repr(e)) return - cnt = 0 for ts, devname, param, op, value in sorted([t, d, p, o, v] for (d, p), (o, v, t) in events.items()): descr = self.descr.get(devname) mod = descr.get('secop_module', devname) if descr else devname @@ -164,11 +163,9 @@ class NicosStream(Stream): if self.devices.get(devname): try: value = self.convert[key](value) + yield 'value', value, key, self.tags, ts error = None except KeyError: # no conversion function continue except TypeError: - value = None - error = 'error' - cnt += 1 - yield key, value, error, ts, self.get_tags(key) + yield 'error', 'error', key, self.tags, ts diff --git a/secop.py b/secop.py index 96ca481..30dcbe8 100644 --- a/secop.py +++ b/secop.py @@ -16,6 +16,14 @@ class EnumConvert(dict): return float(self[value]) +class TagsDict(dict): + def __init__(self, default_value): + self.default_value = default_value + + def __missing__(self, key): + return self.default_value + + class SecopStream(Stream): ping_time = 0 @@ -47,12 +55,12 @@ class SecopStream(Stream): self.tags['device'] = self.device self.modules = self.descr['modules'] self.convert = {} - self.original_id = {} + self.tags_dict = TagsDict(self.tags) for mod, moddesc in self.modules.items(): for key in ('_original_id', 'original_id'): value = moddesc.get(key) if value: - self.original_id[mod] = value + self.tags_dict[mod] = dict(self.tags, device=value) break for param, desc in moddesc['accessibles'].items(): dt = desc['datainfo'] @@ -64,11 +72,10 @@ class SecopStream(Stream): self.send('ping') def get_tags(self, key): - return dict(self.tags, device=self.original_id.get(key[0], self.device)) + return self.tags_dict[key[0]] def event_generator(self): try: - cnt = 0 for msg in self.get_lines(): match = UPDATE.match(msg) if match: @@ -78,23 +85,23 @@ class SecopStream(Stream): cvt = self.convert.get(key) if cvt: data = json.loads(data) + tags = self.tags_dict[key[0]] if cmd == 'error_update': error = ': '.join(data[0:2]) print(msg, repr(error)) - ts = data[2].get('t', time.time()) - value = None + timestamp = data[2].get('t', time.time()) + yield 'error', error, key, tags, timestamp else: - error = None - ts = data[1].get('t', time.time()) value = cvt(data[0]) - cnt += 1 - yield key, value, error, ts, self.get_tags(key) + timestamp = data[1].get('t', time.time()) + yield 'value', value, key, tags, timestamp elif msg == 'active': # from now on, no more waiting self.notimeout() except Exception as e: print(self.uri, repr(e)) + raise SECOP_UDP_PORT = 10767 @@ -115,6 +122,8 @@ class UdpStream(Base): continue if kind == 'for_other_node': uri = msg.pop('uri') + if 'device' not in msg: + msg['device'] = uri.split('://', 1)[-1].split(':')[0] kwargs = msg elif kind == 'node': uri = f"{addr[0]}:{msg['port']}" @@ -152,14 +161,17 @@ class ScanStream(UdpStream): self.select_dict[sock.fileno()] = self -def send_fake_udp(uri, device='fake'): +def send_fake_udp(uri, device=None, instrument=None): sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) - msg = json.dumps({ + msg = { 'SECoP': 'for_other_node', 'uri': uri, - 'device': device, - }, ensure_ascii=False, separators=(',', ':')).encode('utf-8') - sock.sendto(msg, ('255.255.255.255', SECOP_UDP_PORT)) + } + if device: + 
msg['device'] = device + msg['instrument'] = instrument or '0' + sock.sendto(json.dumps(msg, ensure_ascii=False, separators=(',', ':')).encode('utf-8'), + ('255.255.255.255', SECOP_UDP_PORT)) diff --git a/streams.py b/streams.py index 5ebe3a1..d77809d 100644 --- a/streams.py +++ b/streams.py @@ -56,7 +56,7 @@ class Stream(Base): self.encoding = encoding self.timeout = timeout self.socket = None - self.cache = {} + self.cache = {} # dict of event self.errors = {} self.start_time = time.time() self.next_hour = (self.start_time // 3600 + 1) * 3600 @@ -187,6 +187,11 @@ class Stream(Base): break def event_generator(self): + """a generator returning events + + events are (, , , , ) + kind is one of 'error', 'value', 'stream' + """ raise NotImplementedError def get_tags(self, key): @@ -196,9 +201,8 @@ class Stream(Base): def finish_events(self, events, end_time): for key in list(self.cache): self.cache.pop(key) - dbkey = '.'.join(key) - events.append((True, None, end_time, dbkey, 'float', self.tags)) - events.append((False, 'END', end_time, dbkey, 'error', self.tags)) + events.append(('value', None, key, self.tags, end_time)) + events.append(('error', 'END', key, self.tags, end_time)) def get_events(self, events, maxevents): """get available events @@ -208,23 +212,23 @@ class Stream(Base): there might be more after a full hour or when the stream is dying :return: True when maxevents is reached """ - for key, value, error, ts, tags in self.generator: - ts = max(self.start_time, min(ts or INF, time.time())) - if ts >= self.next_hour: - ts_ = (ts // 3600) * 3600 - for key_, value_ in self.cache.items(): - events.append((True, value_, ts_, '.'.join(key_), 'float', self.get_tags(key_))) - for key_, error_ in self.errors.items(): - events.append((False, error_, ts_, '.'.join(key_), 'error', self.get_tags(key_))) - self.next_hour = ts_ + 3600 - if value != self.cache.get(key, None) or error != self.errors.get(key, None): - dbkey = '.'.join(key) - events.append((True, value, ts, dbkey, 'float', tags)) - self.cache[key] = value - if error and self.errors.get(key) != error: - events.append((False, error, ts, dbkey, 'error', tags)) - self.errors[key] = error - elif len(events) >= maxevents: + for event in self.generator: + kind, value, key, tags, ts = event + timestamp = max(self.start_time, min(ts or INF, time.time())) + if timestamp >= self.next_hour: + t = (timestamp // 3600) * 3600 + events.extend(e[:-1] + (t,) for e in self.cache.values()) + self.next_hour = ts + 3600 + prev = self.cache[key][:2] if key in self.cache else (None, None) + if (kind, value) != prev: + if kind == 'error': + if prev[0] == 'value': + events.append(('value', None, key, tags, timestamp)) + self.cache[key] = event + elif kind == 'value': + self.cache[key] = event + events.append(event) + if len(events) >= maxevents: return True else: if self.dead: @@ -262,17 +266,25 @@ class EventStream: for stream in self.wait_ready(1): if not isinstance(stream, Stream): for streamcls, uri, kwargs in stream.events(): - if uri not in self.streams: + stream = self.streams.get(uri) + if stream: + stream.tags.update(kwargs) + else: try: - self.streams[uri] = streamcls(uri, **kwargs) + self.streams[uri] = stream = streamcls(uri, **kwargs) print('added stream', uri, kwargs) except Exception as e: print('can not connect to', uri, repr(e)) + continue + events.append(('stream', kwargs.get('instrument', '0'), + {}, uri, int(time.time()))) for name, stream in self.streams.items(): try: if stream.get_events(events, maxevents): return events except 
StreamDead: + # indicate stream is removed + events.append(('stream', None, {}, uri, int(time.time()))) self.streams.pop(name) if events: return events @@ -285,4 +297,5 @@ class EventStream: for stream in self.streams.values(): stream.close() stream.finish_events(events, end_time) + events.append(('stream', None, {}, stream.uri, end_time)) return events diff --git a/t.py b/t.py index 6b46093..80ec610 100644 --- a/t.py +++ b/t.py @@ -1,30 +1,34 @@ import time +import math import numpy as np -from influx import InfluxDBWrapper, NamedTuple, RegExp +from influx import InfluxDBWrapper, RegExp DAY = 24 * 3600 -token = "zqDbTcMv9UizfdTj15Fx_6vBetkM5mXN56EE9CiDaFsh7O2FFWZ2X4VwAAmdyqZr3HbpIr5ixRju07-oQmxpXw==" +# token = "zqDbTcMv9UizfdTj15Fx_6vBetkM5mXN56EE9CiDaFsh7O2FFWZ2X4VwAAmdyqZr3HbpIr5ixRju07-oQmxpXw==" -db = InfluxDBWrapper('http://pc16392:8086', token, 'linse', 'curve-test') +db = InfluxDBWrapper('linse-c') print(""" qry([start], [stop], [interval=...,] [last=True,] [columns=[...],] [=, ] ...) crv([start], [stop], [mod.par], ['float'], [interval=...,] [add_prev=False,] [add_end=True,] [=, ] ...) """) -offset = (time.time() // 3600) * 3600 +now = int(time.time()) +offset = (now // 3600) * 3600 result = {} +maxcurves = 7 +maxpoints = 7 def prt(): for i, (key, curve) in enumerate(result.items()): - if i > 5: + if i > maxcurves: print('--- ...') break - print('---', key, list(curve[0]._idx_by_name)) + print('---', key, curve.column_names, [f'{k}={v}' for k, v in curve.tags.items() if k not in curve.key_names]) n = len(curve) - if n > 7: + if n > maxpoints: curves = [curve[:3], None, curve[-3:]] else: curves = [curve] @@ -37,40 +41,69 @@ def prt(): def qry(*args, **kwds): - global result - result = db.query(*args, **kwds) + result.clear() + result.update(db.query(*args, **kwds)) + print('PRINT') prt() def crv(*args, **kwds): - global result - result = db.curves(*args, **kwds) + result.clear() + res = db.curves(*args, **kwds) + if isinstance(res, list): + result[()] = res + else: + result.update(res) prt() -def sry(): - global result - res = db.query(-DAY * 365, interval=DAY, _field='float', +def sry(prectime=False): + interval = 3600 + res = db.query(-DAY * 365, interval=interval, _field='float', device=None, stream=None, _measurement=None) - result = {} # dict (device, stream) of list of [start, end, set of params] + by_day = {} # dict (device, stream) of list of [start, end, set of params] for key, table in res.items(): assert table.key_names == ('device', 'stream', '_measurement') device, stream, param = key for row in table: - start = row[0] - 3600 - result.setdefault((start, device, stream), set()).add(param) + tm = time.localtime(row[0] - interval) + day = time.mktime(tm[0:3] + (0, 0, 0, 0, 0, -1)) + key = (day, device, stream) + info = by_day.get(key) + start = row[0] - interval + if info: + info[0] = min(start, info[0]) + info[1] = max(row[0], info[1]) + else: + info = [start, row[0], set()] + by_day[key] = info + info[2].add(param) prev_data = {} + print('---') summary = [] - for (start, device, stream), pset in sorted(result.items()): + for (day, device, stream), (start, end, pset) in sorted(by_day.items()): prev = prev_data.get((device, stream)) - if prev is None or start > prev[1]: - if prev: - print('PREV', device, stream, start - prev[1]) - prev_data[device, stream] = prev = [start, start + 3600, pset] - summary.append([start, device, stream, prev]) + # merge continuous days, considering leap hour + if prev is None or day > prev[2] + 25 * 3600: + experiment = [end, start, day, 
device, stream, pset] + summary.append(experiment) + prev_data[device, stream] = experiment else: - prev[1] = start + 3600 - prev[2].update(pset) - for start, device, stream, (_, end, pset) in sorted(summary): - st = time.strftime('%Y-%m-%d %H:%M', time.localtime(start)) - print(st, (end - start) / 3600., device, stream, len(pset)) + prev[0] = end + prev[2] = day + prev[-1].update(pset) + result.clear() + for end, start, _, device, stream, pset in sorted(summary): + if prectime: + res = db.query(start, end, device=device, stream=stream, single=-1) + first = int(min(t[0] for t in res.values())) + res = db.query(start, end, device=device, stream=stream, single=1) + last = math.ceil(max(t[0] for t in res.values())) + tm1 = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(first)) + tm2 = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(last)) + else: + first, last = start, end - 1 + tm1 = time.strftime('%Y-%m-%d %Hh', time.localtime(first)) + tm2 = time.strftime('%Y-%m-%d %Hh', time.localtime(last)) + result.setdefault(device, []).append([first, last, device, stream, pset]) + print(tm1, tm2, device, stream, len(pset))
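
Usage sketch for the reworked event dispatching (illustrative only, not part of the diff above;
it assumes the 'linse-c' section in ~/.sehistory already used by feeder.py and t.py, and the
device/stream/instrument values below are made up for the example):

    import time
    from influx import InfluxDBWrapper

    db = InfluxDBWrapper('linse-c')   # section name looked up in ~/.sehistory
    db.enable_write_access()

    # every event starts with its kind, used to pick the matching db method:
    #   ('value',  float value or None,      key tuple,  tags dict,  timestamp)
    #   ('error',  error text or None,       key tuple,  tags dict,  timestamp)
    #   ('stream', instrument or None,       tags dict,  stream uri, timestamp)
    event_map = {'value': db.add_float, 'error': db.add_error, 'stream': db.add_stream}

    events = [
        ('stream', 'instr1', {}, 'localhost:5000', int(time.time())),
        ('value', 1.25, ('tt', 'value'), {'device': 'tt', 'stream': 'localhost:5000'}, time.time()),
        ('error', 'END', ('tt', 'value'), {'device': 'tt', 'stream': 'localhost:5000'}, time.time()),
    ]
    for kind, *args in events:
        event_map[kind](*args)
    db.flush()
    db.disconnect()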