diff --git a/secop.py b/secop.py index 7f2593d..5321d5e 100644 --- a/secop.py +++ b/secop.py @@ -3,6 +3,7 @@ import os import json import time import socket +from collections import namedtuple from select import select from streams import Stream, Base, StreamDead @@ -24,6 +25,9 @@ class TagsDict(dict): return self.default_value +ParamInfo = namedtuple('ParamInfo', ['cvt', 'key', 'tags']) + + class SecopStream(Stream): ping_time = 0 @@ -57,17 +61,23 @@ class SecopStream(Stream): self.param_info = {} self.tags_dict = TagsDict(self.tags) for mod, moddesc in self.modules.items(): + mod_tags = None for key in ('_original_id', 'original_id'): value = moddesc.get(key) if value: - self.tags_dict[mod] = dict(self.tags, device=value) + mod_tags = dict(self.tags, device=value) break parameters = moddesc['accessibles'] for param, desc in parameters.items(): dt = desc['datainfo'] if dt['type'] in ('double', 'int', 'enum'): stripped = param[1:] if param.startswith('_') else param - self.param_info[mod, param] = float, (mod, param if stripped in parameters else stripped) + unit = dt.get('unit') + tags = self.tags or mod_tags + if unit: + tags = dict(tags, unit=unit) + key = mod, (param if stripped in parameters else stripped) + self.param_info[mod, param] = ParamInfo(float, key, tags) self.send('activate') def ping(self): @@ -83,27 +93,26 @@ class SecopStream(Stream): if match: cmd, ident, data = match.groups() mod, _, param = ident.partition(':') - cvt_key = self.param_info.get((mod, param or 'value')) - if cvt_key: - cvt, key = cvt_key + pinfo = self.param_info.get((mod, param or 'value')) + if pinfo: data = json.loads(data) - tags = self.tags_dict[key[0]] + tags = self.tags_dict[pinfo.key[0]] if cmd == 'error_update': error = ': '.join(data[0:2]) - print(msg, repr(error)) + # print(msg, repr(error)) timestamp = data[2].get('t', time.time()) - yield 'error', error, key, tags, timestamp + yield 'error', error, pinfo.key, pinfo.tags, timestamp else: - value = cvt(data[0]) + value = pinfo.cvt(data[0]) timestamp = data[1].get('t', time.time()) - yield 'value', value, key, tags, timestamp + yield 'value', value, pinfo.key, pinfo.tags, timestamp elif msg == 'active': # from now on, no more waiting self.notimeout() except Exception as e: + # probably other end closed print(self.uri, repr(e)) - raise SECOP_UDP_PORT = 10767 @@ -118,6 +127,7 @@ class UdpStream(Base): msg, addr = self.socket.recvfrom(1024) except socket.error: # pragma: no cover return None + addr = socket.getnameinfo(addr, socket.NI_NOFQDN)[0] msg = json.loads(msg.decode('utf-8')) kind = msg.pop('SECoP', None) if not kind: @@ -128,7 +138,7 @@ class UdpStream(Base): # msg['device'] = uri.split('://', 1)[-1].split(':')[0] kwargs = msg elif kind == 'node': - uri = f"{addr[0]}:{msg['port']}" + uri = f"{addr}:{msg['port']}" kwargs = {'name': msg['equipment_id']} else: continue diff --git a/seinflux.py b/seinflux.py index 455ccaf..2583ef5 100644 --- a/seinflux.py +++ b/seinflux.py @@ -132,8 +132,10 @@ class SEHistory(InfluxDBWrapper): for merge_key, (col_idx, curves) in col_info.items(): tags = summarize_tags(curves) primary = tags.get(merge[0], '_value') - table = Table(tags, merge_key[0], ('_time', primary)) - result[primary if single_merge else merge_key[1][:len(merge)]] = table + final_key = primary if single_merge else merge_key[1][:len(merge)] + table = result.get(final_key) + if table is None: + result[final_key] = table = Table(tags, merge_key[0], ('_time', primary)) by_idx[col_idx] = table for row in rows: by_idx[row[2]].append((row[0], row[1])) @@ -221,6 +223,7 @@ class SEHistory(InfluxDBWrapper): for stream in sorted(all_entries): entries = all_entries[stream] current = None + last = None instrument = None for entry in sorted(entries, key=lambda e: e[0][0]): (ts, flag), tags = entry @@ -232,10 +235,13 @@ class SEHistory(InfluxDBWrapper): tags['instrument'] = instrument current = [ts, tags] elif current: - if ts < current[0] + 60: - # probably not containing real data - current = None - if current or include_finished: + if ts > current[0] + 60: + # else its probably not containing real data + last = current + current = None + if include_finished: + current = last + if current: tags = current[1] ins = tags.get('instrument') if ins == '0': @@ -269,12 +275,15 @@ class SEHistory(InfluxDBWrapper): start, end = abs_range(start, end) inperiod = self.query(start, end, _measurement='_stream_', _field='on', interval=interval, stream=stream, device=None, instrument=None, **tags) + # prepend previous to the items in period or create if not there for key, rows in previous.items(): if key in inperiod: inperiod[key].insert(0, rows[0]) else: inperiod[key] = rows + # append items after the ones in period, ignoring keys not present in period + # in the same go, create by_stream dict, joining tables with common stream by_stream = {} # dict of [, , , ] for key, table in inperiod.items(): nextrow = nextrows.get(key) @@ -284,12 +293,13 @@ class SEHistory(InfluxDBWrapper): elist = by_stream.setdefault(stream, []) for row in table: elist.append([row[0], row[1], stream, device, instrument]) + # combine now by either instrument or stream, if instrument is undefined (='0') by_key = {} for stream, rows in by_stream.items(): rows.sort() instrument = '0' for row in rows: - if row[-1] is None: + if not row[-1]: row[-1] = instrument else: instrument = row[-1] @@ -364,7 +374,7 @@ class SEHistory(InfluxDBWrapper): :param value: instrument, "0" to unassign the instrument or None when switching the stream off :param ts: the time or None when now """ - flag = bool(value) + flag = value is not None try: previns, prevts = self.get_instrument(stream, ts, **tags) if prevts is not None and (previns is None or (ts or 1e10) < prevts): diff --git a/streams.py b/streams.py index 02c4478..e3dc056 100644 --- a/streams.py +++ b/streams.py @@ -290,7 +290,7 @@ class EventStream: print('can not connect to', uri, repr(e), streamcls) continue device = stream.tags.get('device') - events.append(('stream', kwargs.get('instrument'), + events.append(('stream', kwargs.get('instrument', ''), {'device': device}, stream.uri, int(time.time()))) for name, stream in self.streams.items(): try: @@ -298,7 +298,7 @@ class EventStream: return events except StreamDead: # indicate stream is removed - events.append(('stream', None, {}, uri, int(time.time()))) + events.append(('stream', None, {}, stream.uri, int(time.time()))) self.streams.pop(name) if events: return events