improve set_instrument
- this still needs some checks
This commit is contained in:
34
secop.py
34
secop.py
@ -3,6 +3,7 @@ import os
|
|||||||
import json
|
import json
|
||||||
import time
|
import time
|
||||||
import socket
|
import socket
|
||||||
|
from collections import namedtuple
|
||||||
from select import select
|
from select import select
|
||||||
from streams import Stream, Base, StreamDead
|
from streams import Stream, Base, StreamDead
|
||||||
|
|
||||||
@ -24,6 +25,9 @@ class TagsDict(dict):
|
|||||||
return self.default_value
|
return self.default_value
|
||||||
|
|
||||||
|
|
||||||
|
ParamInfo = namedtuple('ParamInfo', ['cvt', 'key', 'tags'])
|
||||||
|
|
||||||
|
|
||||||
class SecopStream(Stream):
|
class SecopStream(Stream):
|
||||||
ping_time = 0
|
ping_time = 0
|
||||||
|
|
||||||
@ -57,17 +61,23 @@ class SecopStream(Stream):
|
|||||||
self.param_info = {}
|
self.param_info = {}
|
||||||
self.tags_dict = TagsDict(self.tags)
|
self.tags_dict = TagsDict(self.tags)
|
||||||
for mod, moddesc in self.modules.items():
|
for mod, moddesc in self.modules.items():
|
||||||
|
mod_tags = None
|
||||||
for key in ('_original_id', 'original_id'):
|
for key in ('_original_id', 'original_id'):
|
||||||
value = moddesc.get(key)
|
value = moddesc.get(key)
|
||||||
if value:
|
if value:
|
||||||
self.tags_dict[mod] = dict(self.tags, device=value)
|
mod_tags = dict(self.tags, device=value)
|
||||||
break
|
break
|
||||||
parameters = moddesc['accessibles']
|
parameters = moddesc['accessibles']
|
||||||
for param, desc in parameters.items():
|
for param, desc in parameters.items():
|
||||||
dt = desc['datainfo']
|
dt = desc['datainfo']
|
||||||
if dt['type'] in ('double', 'int', 'enum'):
|
if dt['type'] in ('double', 'int', 'enum'):
|
||||||
stripped = param[1:] if param.startswith('_') else param
|
stripped = param[1:] if param.startswith('_') else param
|
||||||
self.param_info[mod, param] = float, (mod, param if stripped in parameters else stripped)
|
unit = dt.get('unit')
|
||||||
|
tags = self.tags or mod_tags
|
||||||
|
if unit:
|
||||||
|
tags = dict(tags, unit=unit)
|
||||||
|
key = mod, (param if stripped in parameters else stripped)
|
||||||
|
self.param_info[mod, param] = ParamInfo(float, key, tags)
|
||||||
self.send('activate')
|
self.send('activate')
|
||||||
|
|
||||||
def ping(self):
|
def ping(self):
|
||||||
@ -83,27 +93,26 @@ class SecopStream(Stream):
|
|||||||
if match:
|
if match:
|
||||||
cmd, ident, data = match.groups()
|
cmd, ident, data = match.groups()
|
||||||
mod, _, param = ident.partition(':')
|
mod, _, param = ident.partition(':')
|
||||||
cvt_key = self.param_info.get((mod, param or 'value'))
|
pinfo = self.param_info.get((mod, param or 'value'))
|
||||||
if cvt_key:
|
if pinfo:
|
||||||
cvt, key = cvt_key
|
|
||||||
data = json.loads(data)
|
data = json.loads(data)
|
||||||
tags = self.tags_dict[key[0]]
|
tags = self.tags_dict[pinfo.key[0]]
|
||||||
if cmd == 'error_update':
|
if cmd == 'error_update':
|
||||||
error = ': '.join(data[0:2])
|
error = ': '.join(data[0:2])
|
||||||
print(msg, repr(error))
|
# print(msg, repr(error))
|
||||||
timestamp = data[2].get('t', time.time())
|
timestamp = data[2].get('t', time.time())
|
||||||
yield 'error', error, key, tags, timestamp
|
yield 'error', error, pinfo.key, pinfo.tags, timestamp
|
||||||
else:
|
else:
|
||||||
value = cvt(data[0])
|
value = pinfo.cvt(data[0])
|
||||||
timestamp = data[1].get('t', time.time())
|
timestamp = data[1].get('t', time.time())
|
||||||
yield 'value', value, key, tags, timestamp
|
yield 'value', value, pinfo.key, pinfo.tags, timestamp
|
||||||
elif msg == 'active':
|
elif msg == 'active':
|
||||||
# from now on, no more waiting
|
# from now on, no more waiting
|
||||||
self.notimeout()
|
self.notimeout()
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
# probably other end closed
|
||||||
print(self.uri, repr(e))
|
print(self.uri, repr(e))
|
||||||
raise
|
|
||||||
|
|
||||||
|
|
||||||
SECOP_UDP_PORT = 10767
|
SECOP_UDP_PORT = 10767
|
||||||
@ -118,6 +127,7 @@ class UdpStream(Base):
|
|||||||
msg, addr = self.socket.recvfrom(1024)
|
msg, addr = self.socket.recvfrom(1024)
|
||||||
except socket.error: # pragma: no cover
|
except socket.error: # pragma: no cover
|
||||||
return None
|
return None
|
||||||
|
addr = socket.getnameinfo(addr, socket.NI_NOFQDN)[0]
|
||||||
msg = json.loads(msg.decode('utf-8'))
|
msg = json.loads(msg.decode('utf-8'))
|
||||||
kind = msg.pop('SECoP', None)
|
kind = msg.pop('SECoP', None)
|
||||||
if not kind:
|
if not kind:
|
||||||
@ -128,7 +138,7 @@ class UdpStream(Base):
|
|||||||
# msg['device'] = uri.split('://', 1)[-1].split(':')[0]
|
# msg['device'] = uri.split('://', 1)[-1].split(':')[0]
|
||||||
kwargs = msg
|
kwargs = msg
|
||||||
elif kind == 'node':
|
elif kind == 'node':
|
||||||
uri = f"{addr[0]}:{msg['port']}"
|
uri = f"{addr}:{msg['port']}"
|
||||||
kwargs = {'name': msg['equipment_id']}
|
kwargs = {'name': msg['equipment_id']}
|
||||||
else:
|
else:
|
||||||
continue
|
continue
|
||||||
|
24
seinflux.py
24
seinflux.py
@ -132,8 +132,10 @@ class SEHistory(InfluxDBWrapper):
|
|||||||
for merge_key, (col_idx, curves) in col_info.items():
|
for merge_key, (col_idx, curves) in col_info.items():
|
||||||
tags = summarize_tags(curves)
|
tags = summarize_tags(curves)
|
||||||
primary = tags.get(merge[0], '_value')
|
primary = tags.get(merge[0], '_value')
|
||||||
table = Table(tags, merge_key[0], ('_time', primary))
|
final_key = primary if single_merge else merge_key[1][:len(merge)]
|
||||||
result[primary if single_merge else merge_key[1][:len(merge)]] = table
|
table = result.get(final_key)
|
||||||
|
if table is None:
|
||||||
|
result[final_key] = table = Table(tags, merge_key[0], ('_time', primary))
|
||||||
by_idx[col_idx] = table
|
by_idx[col_idx] = table
|
||||||
for row in rows:
|
for row in rows:
|
||||||
by_idx[row[2]].append((row[0], row[1]))
|
by_idx[row[2]].append((row[0], row[1]))
|
||||||
@ -221,6 +223,7 @@ class SEHistory(InfluxDBWrapper):
|
|||||||
for stream in sorted(all_entries):
|
for stream in sorted(all_entries):
|
||||||
entries = all_entries[stream]
|
entries = all_entries[stream]
|
||||||
current = None
|
current = None
|
||||||
|
last = None
|
||||||
instrument = None
|
instrument = None
|
||||||
for entry in sorted(entries, key=lambda e: e[0][0]):
|
for entry in sorted(entries, key=lambda e: e[0][0]):
|
||||||
(ts, flag), tags = entry
|
(ts, flag), tags = entry
|
||||||
@ -232,10 +235,13 @@ class SEHistory(InfluxDBWrapper):
|
|||||||
tags['instrument'] = instrument
|
tags['instrument'] = instrument
|
||||||
current = [ts, tags]
|
current = [ts, tags]
|
||||||
elif current:
|
elif current:
|
||||||
if ts < current[0] + 60:
|
if ts > current[0] + 60:
|
||||||
# probably not containing real data
|
# else its probably not containing real data
|
||||||
|
last = current
|
||||||
current = None
|
current = None
|
||||||
if current or include_finished:
|
if include_finished:
|
||||||
|
current = last
|
||||||
|
if current:
|
||||||
tags = current[1]
|
tags = current[1]
|
||||||
ins = tags.get('instrument')
|
ins = tags.get('instrument')
|
||||||
if ins == '0':
|
if ins == '0':
|
||||||
@ -269,12 +275,15 @@ class SEHistory(InfluxDBWrapper):
|
|||||||
start, end = abs_range(start, end)
|
start, end = abs_range(start, end)
|
||||||
inperiod = self.query(start, end, _measurement='_stream_', _field='on', interval=interval,
|
inperiod = self.query(start, end, _measurement='_stream_', _field='on', interval=interval,
|
||||||
stream=stream, device=None, instrument=None, **tags)
|
stream=stream, device=None, instrument=None, **tags)
|
||||||
|
# prepend previous to the items in period or create if not there
|
||||||
for key, rows in previous.items():
|
for key, rows in previous.items():
|
||||||
if key in inperiod:
|
if key in inperiod:
|
||||||
inperiod[key].insert(0, rows[0])
|
inperiod[key].insert(0, rows[0])
|
||||||
else:
|
else:
|
||||||
inperiod[key] = rows
|
inperiod[key] = rows
|
||||||
|
|
||||||
|
# append items after the ones in period, ignoring keys not present in period
|
||||||
|
# in the same go, create by_stream dict, joining tables with common stream
|
||||||
by_stream = {} # dict <stream> of [<ts>, <flag>, <instrument>, <device>]
|
by_stream = {} # dict <stream> of [<ts>, <flag>, <instrument>, <device>]
|
||||||
for key, table in inperiod.items():
|
for key, table in inperiod.items():
|
||||||
nextrow = nextrows.get(key)
|
nextrow = nextrows.get(key)
|
||||||
@ -284,12 +293,13 @@ class SEHistory(InfluxDBWrapper):
|
|||||||
elist = by_stream.setdefault(stream, [])
|
elist = by_stream.setdefault(stream, [])
|
||||||
for row in table:
|
for row in table:
|
||||||
elist.append([row[0], row[1], stream, device, instrument])
|
elist.append([row[0], row[1], stream, device, instrument])
|
||||||
|
# combine now by either instrument or stream, if instrument is undefined (='0')
|
||||||
by_key = {}
|
by_key = {}
|
||||||
for stream, rows in by_stream.items():
|
for stream, rows in by_stream.items():
|
||||||
rows.sort()
|
rows.sort()
|
||||||
instrument = '0'
|
instrument = '0'
|
||||||
for row in rows:
|
for row in rows:
|
||||||
if row[-1] is None:
|
if not row[-1]:
|
||||||
row[-1] = instrument
|
row[-1] = instrument
|
||||||
else:
|
else:
|
||||||
instrument = row[-1]
|
instrument = row[-1]
|
||||||
@ -364,7 +374,7 @@ class SEHistory(InfluxDBWrapper):
|
|||||||
:param value: instrument, "0" to unassign the instrument or None when switching the stream off
|
:param value: instrument, "0" to unassign the instrument or None when switching the stream off
|
||||||
:param ts: the time or None when now
|
:param ts: the time or None when now
|
||||||
"""
|
"""
|
||||||
flag = bool(value)
|
flag = value is not None
|
||||||
try:
|
try:
|
||||||
previns, prevts = self.get_instrument(stream, ts, **tags)
|
previns, prevts = self.get_instrument(stream, ts, **tags)
|
||||||
if prevts is not None and (previns is None or (ts or 1e10) < prevts):
|
if prevts is not None and (previns is None or (ts or 1e10) < prevts):
|
||||||
|
@ -290,7 +290,7 @@ class EventStream:
|
|||||||
print('can not connect to', uri, repr(e), streamcls)
|
print('can not connect to', uri, repr(e), streamcls)
|
||||||
continue
|
continue
|
||||||
device = stream.tags.get('device')
|
device = stream.tags.get('device')
|
||||||
events.append(('stream', kwargs.get('instrument'),
|
events.append(('stream', kwargs.get('instrument', ''),
|
||||||
{'device': device}, stream.uri, int(time.time())))
|
{'device': device}, stream.uri, int(time.time())))
|
||||||
for name, stream in self.streams.items():
|
for name, stream in self.streams.items():
|
||||||
try:
|
try:
|
||||||
@ -298,7 +298,7 @@ class EventStream:
|
|||||||
return events
|
return events
|
||||||
except StreamDead:
|
except StreamDead:
|
||||||
# indicate stream is removed
|
# indicate stream is removed
|
||||||
events.append(('stream', None, {}, uri, int(time.time())))
|
events.append(('stream', None, {}, stream.uri, int(time.time())))
|
||||||
self.streams.pop(name)
|
self.streams.pop(name)
|
||||||
if events:
|
if events:
|
||||||
return events
|
return events
|
||||||
|
Reference in New Issue
Block a user