change behaviour of instrument assignment

instrument assignment is persistent
if a stream has to be unassigned from a stream, assign it
to the dummy instrument '0'
This commit is contained in:
l_samenv
2025-03-03 11:24:02 +01:00
parent 31377dc933
commit fc9970b4af
2 changed files with 79 additions and 54 deletions

View File

@ -175,16 +175,6 @@ class SEHistory(InfluxDBWrapper):
def add_error(self, value, key, tags, ts):
self._add_point('.'.join(key), 'error', '' if value is None else str(value), ts, tags)
def get_instrument(self, stream, ts=None, **tags):
if ts is None:
ts = int(time.time())
reply = self.query(None, int(ts) + 1, _measurement='_stream_', _field='on',
stream=stream, single=1, **tags)
if reply:
entry = sorted(reply.values(), key=lambda r: r[0][0])[-1][0]
return entry.tags.get('instrument', self.instrument), entry[0]
return None, None
def get_streams(self, instrument=None, stream=None, device=None, start=None, end=None, **tags):
"""get streams for one or all instruments
@ -193,11 +183,12 @@ class SEHistory(InfluxDBWrapper):
:param device: None or a comma separated string
:param start: None or start time. None means 'since ever'
:param end: None or end time. None means now (or more precise 'in a year')
:return: dict <stream> of <instrument> or '0' when instrument is not known
:return: dict <stream> of tags
Remark: an assignment of an instrument to a stream persists, even when the stream gets off
"""
if end is not None:
end = end + 1
end = int(time.time())
if stream:
if isinstance(stream, str):
stream = stream.split(',')
@ -227,18 +218,26 @@ class SEHistory(InfluxDBWrapper):
result = {}
for stream, entries in all_entries.items():
current = None
instrument = None
for entry in sorted(entries, key=lambda e: e[0][0]):
ts, flag = entry[0]
(ts, flag), tags = entry
if flag:
current = entry
if 'instrument' in tags:
instrument = tags['instrument']
elif instrument:
# keep instrument from previous assignments
tags['instrument'] = instrument
current = [ts, tags]
elif current:
(lastts, _), tags = current
if ts > lastts + 60: # at least one minute
result[stream] = tags
break
else: # or on at end
if ts < current[0] + 60:
# probably not containing real data
current = None
if current:
result[stream] = current[1]
tags = current[1]
ins = tags.get('instrument')
if ins == '0':
tags.pop('instrument')
result[stream] = tags
return result
def get_experiments(self, start=None, end=None, stream=None, **tags):
@ -254,7 +253,6 @@ class SEHistory(InfluxDBWrapper):
interval = 1
gap = 600
eternity = 1e10
entries = {}
if start is None:
previous = {}
else:
@ -274,24 +272,34 @@ class SEHistory(InfluxDBWrapper):
else:
inperiod[key] = rows
by_stream = {} # dict <stream> of [<ts>, <flag>, <instrument>, <device>]
for key, table in inperiod.items():
nextrow = nextrows.get(key)
if nextrow and not nextrow[1]:
table.extend(nextrow)
stream, instrument, device = [table.tags.get(k, '') for k in ('stream', 'instrument', 'device')]
key = ('instrument', instrument) if instrument else ('stream', stream)
elist = entries.setdefault(key, [])
elist = by_stream.setdefault(stream, [])
for row in table:
print('S', stream, device, key, row[1], fmtime(row[0]))
elist.append(row[:2] + (stream, device))
result = {} # dict <instrument> of dict <device combi> of list of [start, end]
for key, rows in entries.items():
print('S', stream, instrument, device, row[1], fmtime(row[0]))
elist.append([row[0], row[1], stream, device, instrument])
by_key = {}
for stream, rows in by_stream.items():
rows.sort()
instrument = '0'
for row in rows:
if row[-1] is None:
row[-1] = instrument
else:
instrument = row[-1]
key = ('stream', stream) if instrument == '0' else ('instrument', instrument)
by_key.setdefault(key, []).append(rows)
result = {}
for key, rows in by_key.items():
devices = {}
current = {}
combi = None
chunk = None
for ts, flag, stream, device in rows:
for ts, flag, stream, device, _ in rows:
ts = max(start, int(ts))
if flag:
devices[stream] = device
@ -325,34 +333,54 @@ class SEHistory(InfluxDBWrapper):
result[key] = current
return result
def set_instrument(self, stream, value, ts=None, guess=True, **tags):
def get_instrument(self, stream, ts=None, **tags):
"""get assigned instrument and stream state
:param stream: the stream to look up
:param ts: the time ot None for 'now'
:param tags: tags to further filter the result
:return: <instrument>, <timestamp>
where <instrument> is the instrument assigned to stream <stream> at ts
amd <timestamp> is the time when this stream was on last time before ts or None when it is off
"""
if ts is None:
ts = int(time.time())
reply = self.query(None, int(ts) + 1, _measurement='_stream_', _field='on',
stream=stream, single=1, **tags)
instrument, lastts = None, None
if reply:
for table in sorted(reply.values(), key=lambda r: r[0][0]):
ins = table.tags.get('instrument')
row = table[-1]
lastts = row[0] if row[1] else None
if ins:
instrument = None if ins == '0' else ins
return instrument, lastts
def set_instrument(self, stream, value, ts=None, **tags):
"""set stream and instrument on or off
:param stream: the uri of the stream
:param value: instrument, "0" when unknown or None when switching to off
:param value: instrument, "0" to unassign the instrument or None when switching the stream off
:param ts: the time or None when now
:param guess: when instrument is undefined, take from previous
"""
flag = bool(value)
try:
prev, prevts = self.get_instrument(stream, ts, **tags)
if prevts is not None:
if prev in (None, '0'):
ts = prevts + 0.001
previns, prevts = self.get_instrument(stream, ts, **tags)
if prevts is None:
if not flag:
return # no change
else:
if value == '0' and guess:
value = prev
if ts < prevts:
if previns == value and flag:
return # no change
if previns is None or ts < prevts:
ts = prevts + 0.001
except Exception as e:
print(f'Exception in get_instrument {e!r}')
pass
tags['stream'] = stream
if value:
if flag:
tags['instrument'] = value
flag = True
else:
tags['instrument'] = prev or '0'
flag = False
self._add_point('_stream_', 'on', flag, ts, tags)
def add_stream(self, value, tags, key, ts):

13
t.py
View File

@ -17,11 +17,9 @@ crv([start], [stop], [mod.par], ['float'], [interval=...,] [add_prev=False,] [ad
now = int(time.time())
result = {}
maxcurves = 7
maxpoints = 7
def prt():
def prt(maxpoints=7, maxcurves=7):
for i, (key, curve) in enumerate(result.items()):
if i > maxcurves:
print('--- ...')
@ -40,21 +38,20 @@ def prt():
print(fmtime(row[0]), row[1:])
def qry(*args, **kwds):
def qry(*args, maxpoints=7, maxcurves=7, **kwds):
result.clear()
result.update(db.query(*args, **kwds))
print('PRINT')
prt()
prt(maxpoints, maxcurves)
def crv(*args, **kwds):
def crv(*args, maxpoints=7, maxcurves=7, **kwds):
result.clear()
res = db.curves(*args, **kwds)
if isinstance(res, list):
result[()] = res
else:
result.update(res)
prt()
prt(maxpoints, maxcurves)
def sry(prectime=False):