add sending a UDP message on behalf of a non-supporting server

2025-02-13 10:03:26 +01:00
parent 1712415f20
commit aa2069d4f4
4 changed files with 117 additions and 41 deletions


@@ -1,6 +1,7 @@
+import sys
 from streams import EventStream
 from nicoscache import NicosStream
-from secop import ScanStream, ScanReply
+from secop import ScanStream, ScanReply, send_fake_udp
 from influx import testdb
@@ -20,4 +21,7 @@ def main():
     db.disconnect()

-main()
+if len(sys.argv) > 1:
+    send_fake_udp(*sys.argv[1:])
+else:
+    main()
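With this change the collector script doubles as a one-shot announcer: any command line argument is forwarded to send_fake_udp instead of starting the main loop. A minimal usage sketch, with an invented URI and equipment id:

    # equivalent to invoking the script with these two arguments
    from secop import send_fake_udp

    send_fake_udp('localhost:10767', 'demo_cryostat')  # uri, equipment_id (placeholders)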

influx.py (102 lines changed)

@@ -105,6 +105,30 @@ class NamedTuple(tuple):
                 raise
             return default

+    @property
+    def names(self):
+        return tuple(self._idx_by_name)
+
+    def tuple(self, *keys):
+        return tuple(self.get(k) for k in keys)
+
+
+class Table(list):
+    """a list of tuples with meta info"""
+    def __init__(self, tags, key_names, column_names):
+        super().__init__()
+        self.tags = tags
+        self.key_names = key_names
+        self.column_names = column_names
+
+
+class Single(Table):
+    """a single row of a table, as a list with meta info"""
+    def __init__(self, table):
+        super().__init__(table.tags, table.key_names, table.column_names)
+        single, = table
+        self[:] = single
+
+
 class RegExp(str):
     """indicates that this string should be treated as a regexp
@@ -229,23 +253,28 @@ class InfluxDBWrapper:

     # query the database

-    def query(self, start=None, stop=None, interval=None, last=False, columns=None, **tags):
+    def query(self, start=None, stop=None, interval=None, single=None, columns=None, **tags):
         """Returns queried data as InfluxDB tables

         :param start: start time. default is a month ago
         :param stop: end time, default is tomorrow at the same time
         :param interval: if set an aggregation filter will be applied. This will
             return only the latest values per time interval in seconds.
-        :param last: when True, only the last value within the interval is returned
+        :param single: when True (or 1), only the last value within the interval is returned
             (for any existing combinations of tags!)
+            single=-1: return the first value instead
         :param columns: if given, return only these columns (in addition to '_time' and '_value')
         :param tags: selection criteria:

             <tag>=None
-                return records independent of this tag. the value will be contained in the result dicts key
+                return records independent of this tag.
+                the obtained value will be contained in the result dicts key
             <tag>=[<value1>, <value2>, ...]
-                return records where tag is one from the list. the value is contained in the result dicts key
-            <tag>>=<value>:
-                return only records with given tag matching <value>. the value is not part of the results key
+                return records where tag is one from the list.
+                the obtained value is contained in the result dicts key
+            <tag>=<value>:
+                return only records with given tag matching <value>.
+                the obtained value is contained in the result dicts key only
+                if the value is an instance of RegExp or when it contains an asterisk ('*')

         :return: a dict <tuple of key values> of list of <row>
             where <tuple of keys> and <row> are NamedTuple
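A hedged usage sketch of the selection criteria described above, assuming db is an InfluxDBWrapper instance; tag names and values are invented:

    # fixed value: filtered server-side, dropped from the result key
    res = db.query(device='cryo1', stream=None)    # stream values end up in the key

    # wildcard or RegExp values keep the tag in the result key
    res = db.query(device='cryo*', single=1)                 # last point per key
    res = db.query(device=RegExp('cryo[0-9]+'), single=-1)   # first point instead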
@@ -255,30 +284,36 @@
         msg = [f'from(bucket:"{self._bucket}")',
                f'|> range(start: {start}, stop: {stop})']

-        keynames = []
+        keys = {}
         dropcols = ['_start', '_stop']
+        fixed_tags = {}
         for key, crit in tags.items():
             if crit is None:
-                keynames.append(key)
+                keys[key] = None
                 continue
             if isinstance(crit, str):
                 if isinstance(crit, RegExp) or '*' in crit:
-                    keynames.append(key)
+                    keys[key] = None
                     append_wildcard_filter(msg, key, [crit])
                     continue
+                fixed_tags[key] = crit
                 dropcols.append(key)
                 crit = f'"{crit}"'
             elif isinstance(crit, (int, float)):
+                fixed_tags[key] = crit
                 dropcols.append(key)
             else:
                 try:
-                    keynames.append(key)
+                    keys[key] = None
                     append_wildcard_filter(msg, key, crit)
                     continue
                 except Exception:
                     raise ValueError(f'illegal value for {key}: {crit}')
             msg.append(f'|> filter(fn:(r) => r.{key} == {crit})')
-        if last:
-            msg.append('|> last(column: "_time")')
+        if single:
+            if single < 0:
+                msg.append('|> first(column: "_time")')
+            else:
+                msg.append('|> last(column: "_time")')
         if interval:
             msg.append(f'|> aggregateWindow(every: {interval}s, fn: last, createEmpty: false)')
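For orientation, the msg pieces assembled above join into a Flux query; with an invented bucket, range, and tag filter, print(msg) below would show roughly:

    from(bucket:"testdb")
    |> range(start: 1707000000, stop: 1739000000)
    |> filter(fn:(r) => r.device == "cryo1")
    |> last(column: "_time")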
@@ -286,24 +321,24 @@
             msg.append(f'''|> drop(columns:["{'","'.join(dropcols)}"])''')
         else:
             columns = ['_time', '_value'] + list(columns)
-            msg.append(f'''|> keep(columns:["{'","'.join(columns + keynames)}"])''')
+            msg.append(f'''|> keep(columns:["{'","'.join(columns + list(keys))}"])''')

         msg = '\n'.join(msg)
         print(msg)

         reader = self._client.query_api().query_csv(msg)

-        print('CSV')
+        print('CSV', keys, columns)
         converters = None
         group = None
         column_names = None
+        column_keys = None
         key = None
         result = {}
-        table = None
+        tableno = None
         for row in reader:
             if not row:
                 continue
+            print(row)
             if row[0]:
                 if row[0] == '#datatype':
                     converters = {i: CONVERTER.get(d) for i, d in enumerate(row) if i > 2}
@@ -312,31 +347,27 @@
                     group = row
                 continue
             if column_names is None:
+                print('COL', row)
                 column_names = row
-                keys = {}
                 for col, (name, grp) in enumerate(zip(column_names, group)):
                     if grp != 'true':
                         continue
-                    # if name in keynames or (columns is None and name not in dropcols):
-                    if name in keynames or columns is None:
-                        keys[col] = converters.pop(col)
-                valuecls = NamedTuple([row[i] for i in converters])
-                keycls = NamedTuple([row[i] for i in keys])
+                    if columns is None or name in keys:
+                        keys[name] = col, converters.pop(col)
+                column_keys = tuple(column_names[i] for i in converters)
                 continue
-            if row[2] != table:
+            if row[2] != tableno:
                 # new table, new key
-                table = row[2]
-                key = keycls(f(row[i]) for i, f in keys.items())
-                print('new table', table, key)
+                tableno = row[2]
+                key_dict = {n: f(row[i]) for n, (i, f) in keys.items()}
+                key = tuple(key_dict.values())
                 if result.get(key) is None:
-                    result[key] = []
-            result[key].append(valuecls(f(row[i]) for i, f in converters.items()))
-        if last:
-            print('LAST')
+                    print('KC', key_dict, column_keys)
+                    result[key] = Table({**fixed_tags, **key_dict}, tuple(keys), column_keys)
+            result[key].append(tuple(f(row[i]) for i, f in converters.items()))
+        if single:
             for key, table in result.items():
-                result[key], = table
+                result[key] = Single(table)
         else:
             for table in result.values():
                 table.sort()
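The loop above consumes InfluxDB's annotated CSV: '#datatype' rows select converters, '#group' marks key columns, the first unannotated row gives column_names, and a change in column 2 (the table number) starts a new result key. A minimal sketch of such input, with invented values:

    #datatype,string,long,dateTime:RFC3339,double,string
    #group,false,false,false,false,true
    ,result,table,_time,_value,device
    ,_result,0,2025-02-13T09:00:00Z,1.5,cryo1
    ,_result,1,2025-02-13T09:00:00Z,4.2,cryo2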
@@ -354,9 +385,10 @@
         :param add_prev: amount of time to look back for the last previous point (default: 1 hr)
         :param add_end: whether to add endpoint at stop time (default: False)
         :param tags: further selection criteria

-        :return: a dict <tuple of key values> of list of <row>
-            where <tuple of keys> and <row> are NamedTuple
-            <row> is (<timestamp>, <value>)
+        :return: a dict <tuple of key values> of <Table> or <Single>
+            where <Table> is a list of tuples with some meta info (table.tags, table.column_names)
+            and <Single> is a list (a single row of a table) with the same meta info

             when _field='float' (the default), the returned values are either floats or None
         """
@@ -370,7 +402,7 @@
         else:
             result = {}
         if add_prev:
-            prev_data = self.query(rstart - add_prev, rstart, last=True, **tags)
+            prev_data = self.query(rstart - add_prev, rstart, single=1, **tags)
             for key, first in prev_data.items():
                 curve = result.get(key)
                 if first[1] is not None:
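Callers of curves() can now read the meta info directly off each returned container; a hedged sketch with invented arguments:

    res = db.curves(start, stop, device=None)   # start/stop/device are placeholders
    for key, curve in res.items():
        print(curve.tags, curve.column_names)   # Table/Single meta info
        for row in curve:                       # row[0] is the timestamp
            print(row)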

secop.py

@@ -110,9 +110,12 @@ class UdpStream(Base):
             except socket.error:  # pragma: no cover
                 return None
             msg = json.loads(msg.decode('utf-8'))
-            if msg['SECoP'] != 'node':
-                continue
-            uri = f"{addr[0]}:{msg['port']}"
+            if msg['SECoP'] == 'for_other_node':
+                uri = msg['uri']
+            elif msg['SECoP'] == 'node':
+                uri = f"{addr[0]}:{msg['port']}"
+            else:
+                continue
             yield SecopStream, uri, msg['equipment_id']
@@ -144,5 +147,14 @@ class ScanStream(UdpStream):
         self.select_dict[sock.fileno()] = self


+def send_fake_udp(uri, equipment_id='fake'):
+    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+    sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
+    msg = json.dumps({
+        'SECoP': 'for_other_node',
+        'uri': uri,
+        'equipment_id': equipment_id,
+    }, ensure_ascii=False, separators=(',', ':')).encode('utf-8')
+    sock.sendto(msg, ('255.255.255.255', SECOP_UDP_PORT))
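The reader above now accepts two datagram shapes on the SECoP UDP port; field values here are invented:

    # regular self-announcement: uri is built from the sender address and 'port'
    {"SECoP": "node", "port": 10767, "equipment_id": "cryo1"}

    # sent by send_fake_udp on behalf of a server that cannot broadcast itself
    {"SECoP": "for_other_node", "uri": "otherhost:10767", "equipment_id": "cryo1"}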

t.py (28 lines changed)

@@ -46,3 +46,31 @@ def crv(*args, **kwds):
     global result
     result = db.curves(*args, **kwds)
     prt()
+
+
+def sry():
+    global result
+    res = db.query(-DAY * 365, interval=DAY, _field='float',
+                   device=None, stream=None, _measurement=None)
+    result = {}  # dict of (start, device, stream) -> set of params
+    for key, table in res.items():
+        assert table.key_names == ('device', 'stream', '_measurement')
+        device, stream, param = key
+        for row in table:
+            start = row[0] - 3600
+            result.setdefault((start, device, stream), set()).add(param)
+    prev_data = {}  # dict of (device, stream) -> [start, end, set of params]
+    summary = []
+    for (start, device, stream), pset in sorted(result.items()):
+        prev = prev_data.get((device, stream))
+        if prev is None or start > prev[1]:
+            if prev:
+                print('PREV', device, stream, start - prev[1])
+            prev_data[device, stream] = prev = [start, start + 3600, pset]
+            summary.append([start, device, stream, prev])
+        else:
+            prev[1] = start + 3600
+            prev[2].update(pset)
+    for start, device, stream, (_, end, pset) in sorted(summary):
+        st = time.strftime('%Y-%m-%d %H:%M', time.localtime(start))
+        print(st, (end - start) / 3600., device, stream, len(pset))
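As a worked example of the merging logic, with invented data: points stamped 09:00, 10:00 and 13:00 for one (device, stream) become hour buckets starting at 08:00, 09:00 and 12:00 (each row time shifted back by 3600 s); the first two buckets are adjacent and merge into a range covering 08:00-10:00, the third stays separate as 12:00-13:00, and the final loop prints each range's start, its length in hours, and the number of distinct parameters seen.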