diff --git a/feeder.py b/feeder.py index 95e47ed..928fc5d 100644 --- a/feeder.py +++ b/feeder.py @@ -1,6 +1,7 @@ +import sys from streams import EventStream from nicoscache import NicosStream -from secop import ScanStream, ScanReply +from secop import ScanStream, ScanReply, send_fake_udp from influx import testdb @@ -20,4 +21,7 @@ def main(): db.disconnect() -main() +if len(sys.argv) > 1: + send_fake_udp(*sys.argv[1:]) +else: + main() diff --git a/influx.py b/influx.py index d80f3e6..69c51c6 100644 --- a/influx.py +++ b/influx.py @@ -105,6 +105,30 @@ class NamedTuple(tuple): raise return default + @property + def names(self): + return tuple(self._idx_by_name) + + def tuple(self, *keys): + return tuple(self.get(k) for k in keys) + + +class Table(list): + """a list of tuples with meta info""" + def __init__(self, tags, key_names, column_names): + super().__init__() + self.tags = tags + self.key_names = key_names + self.column_names = column_names + + +class Single(Table): + """a single row of a table, as a list with meta info""" + def __init__(self, table): + super().__init__(table.tags, table.key_names, table.column_names) + single, = table + self[:] = single + class RegExp(str): """indicates, tht this string should be treated as regexp @@ -229,23 +253,28 @@ class InfluxDBWrapper: # query the database - def query(self, start=None, stop=None, interval=None, last=False, columns=None, **tags): + def query(self, start=None, stop=None, interval=None, single=None, columns=None, **tags): """Returns queried data as InfluxDB tables :param start: start time. default is a month ago :param stop: end time, default is tomorrow at the same time :param interval: if set an aggregation filter will be applied. This will return only the latest values per time interval in seconds. - :param last: when True, only the last value within the interval is returned - (for any existing combinations of tags!) + :param single: when True (or 1), only the last value within the interval is returned + (for any existing combinations of tags!) + single=-1: return the first value instead :param columns: if given, return only these columns (in addition to '_time' and '_value') :param tags: selection criteria: =None - return records independent of this tag. the value will be contained in the result dicts key + return records independent of this tag. + the obtained value will be contained in the result dicts key =[, , ...] - return records where tag is one from the list. the value is contained in the result dicts key - >=: - return only records with given tag matching . the value is not part of the results key + return records where tag is one from the list. + the obtained value is contained in the result dicts key + =: + return only records with given tag matching . + the obtained value is contained in the result dicts key only + if the value is an instance of RegExp or when it contains an asterisk ('*') :return: a dict of list of where and are NamedTuple @@ -255,55 +284,61 @@ class InfluxDBWrapper: msg = [f'from(bucket:"{self._bucket}")', f'|> range(start: {start}, stop: {stop})'] - keynames = [] + keys = {} dropcols = ['_start', '_stop'] + fixed_tags = {} for key, crit in tags.items(): if crit is None: - keynames.append(key) + keys[key] = None continue if isinstance(crit, str): if isinstance(crit, RegExp) or '*' in crit: - keynames.append(key) + keys[key] = None append_wildcard_filter(msg, key, [crit]) continue + fixed_tags[key] = crit dropcols.append(key) crit = f'"{crit}"' elif isinstance(crit, (int, float)): + fixed_tags[key] = crit dropcols.append(key) else: try: - keynames.append(key) + keys[key] = None append_wildcard_filter(msg, key, crit) continue except Exception: raise ValueError(f'illegal value for {key}: {crit}') msg.append(f'|> filter(fn:(r) => r.{key} == {crit})') - if last: - msg.append('|> last(column: "_time")') + if single: + if single < 0: + msg.append('|> first(column: "_time")') + else: + msg.append('|> last(column: "_time")') if interval: msg.append(f'|> aggregateWindow(every: {interval}s, fn: last, createEmpty: false)') if columns is None: msg.append(f'''|> drop(columns:["{'","'.join(dropcols)}"])''') else: columns = ['_time', '_value'] + list(columns) - msg.append(f'''|> keep(columns:["{'","'.join(columns + keynames)}"])''') + msg.append(f'''|> keep(columns:["{'","'.join(columns + keys)}"])''') msg = '\n'.join(msg) print(msg) reader = self._client.query_api().query_csv(msg) - print('CSV') + print('CSV', keys, columns) converters = None group = None column_names = None + column_keys = None key = None result = {} - table = None + tableno = None for row in reader: if not row: continue - print(row) if row[0]: if row[0] == '#datatype': converters = {i: CONVERTER.get(d) for i, d in enumerate(row) if i > 2} @@ -312,31 +347,27 @@ class InfluxDBWrapper: group = row continue if column_names is None: - print('COL', row) column_names = row - keys = {} for col, (name, grp) in enumerate(zip(column_names, group)): if grp != 'true': continue - # if name in keynames or (columns is None and name not in dropcols): - if name in keynames or columns is None: - keys[col] = converters.pop(col) - valuecls = NamedTuple([row[i] for i in converters]) - keycls = NamedTuple([row[i] for i in keys]) + if columns is None or name in keys: + keys[name] = col, converters.pop(col) + column_keys = tuple(column_names[i] for i in converters) continue - if row[2] != table: + if row[2] != tableno: # new table, new key - table = row[2] - key = keycls(f(row[i]) for i, f in keys.items()) - print('new table', table, key) + tableno = row[2] + key_dict = {n: f(row[i]) for n, (i, f) in keys.items()} + key = tuple(key_dict.values()) if result.get(key) is None: - result[key] = [] + print('KC', key_dict, column_keys) + result[key] = Table({**fixed_tags, **key_dict}, tuple(keys), column_keys) - result[key].append(valuecls(f(row[i]) for i, f in converters.items())) - if last: - print('LAST') + result[key].append(tuple(f(row[i]) for i, f in converters.items())) + if single: for key, table in result.items(): - result[key], = table + result[key] = Single(table) else: for table in result.values(): table.sort() @@ -354,9 +385,10 @@ class InfluxDBWrapper: :param add_prev: amount of time to look back for the last previous point (default: 1 hr) :param add_end: whether to add endpoint at stop time (default: False) :param tags: further selection criteria - :return: a dict of list of - where and are NamedTuple - is (, ) + :return: a dict of or + + where
is a list of tuples with some meta info (table.tags, table.column_names) + and is a list (a single row of a table) with the same meta info when _field='float' (the default), the returned values are either a floats or None """ @@ -370,7 +402,7 @@ class InfluxDBWrapper: else: result = {} if add_prev: - prev_data = self.query(rstart - add_prev, rstart, last=True, **tags) + prev_data = self.query(rstart - add_prev, rstart, single=1, **tags) for key, first in prev_data.items(): curve = result.get(key) if first[1] is not None: diff --git a/secop.py b/secop.py index fde46ee..4f85ec4 100644 --- a/secop.py +++ b/secop.py @@ -110,9 +110,12 @@ class UdpStream(Base): except socket.error: # pragma: no cover return None msg = json.loads(msg.decode('utf-8')) - if msg['SECoP'] != 'node': + if msg['SECoP'] == 'for_other_node': + uri = msg['uri'] + elif msg['SECoP'] == 'node': + uri = f"{addr[0]}:{msg['port']}" + else: continue - uri = f"{addr[0]}:{msg['port']}" yield SecopStream, uri, msg['equipment_id'] @@ -144,5 +147,14 @@ class ScanStream(UdpStream): self.select_dict[sock.fileno()] = self +def send_fake_udp(uri, equipment_id='fake'): + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) + msg = json.dumps({ + 'SECoP': 'for_other_node', + 'uri': uri, + 'equipment_id': equipment_id, + }, ensure_ascii=False, separators=(',', ':')).encode('utf-8') + sock.sendto(msg, ('255.255.255.255', SECOP_UDP_PORT)) diff --git a/t.py b/t.py index a96d0cf..6b46093 100644 --- a/t.py +++ b/t.py @@ -46,3 +46,31 @@ def crv(*args, **kwds): global result result = db.curves(*args, **kwds) prt() + + +def sry(): + global result + res = db.query(-DAY * 365, interval=DAY, _field='float', + device=None, stream=None, _measurement=None) + result = {} # dict (device, stream) of list of [start, end, set of params] + for key, table in res.items(): + assert table.key_names == ('device', 'stream', '_measurement') + device, stream, param = key + for row in table: + start = row[0] - 3600 + result.setdefault((start, device, stream), set()).add(param) + prev_data = {} + summary = [] + for (start, device, stream), pset in sorted(result.items()): + prev = prev_data.get((device, stream)) + if prev is None or start > prev[1]: + if prev: + print('PREV', device, stream, start - prev[1]) + prev_data[device, stream] = prev = [start, start + 3600, pset] + summary.append([start, device, stream, prev]) + else: + prev[1] = start + 3600 + prev[2].update(pset) + for start, device, stream, (_, end, pset) in sorted(summary): + st = time.strftime('%Y-%m-%d %H:%M', time.localtime(start)) + print(st, (end - start) / 3600., device, stream, len(pset))