improve set_instrument and get_experiment
- do not predate when setting instrument to '0' - join chunks with the same device
This commit is contained in:
41
seinflux.py
41
seinflux.py
@ -7,11 +7,17 @@ ETERNITY = 1e10
|
||||
|
||||
|
||||
def fmtime(t):
|
||||
if t is None:
|
||||
return ' ' * 19
|
||||
if t >= ETERNITY:
|
||||
return '-' * 19
|
||||
return time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(int(t)))
|
||||
|
||||
|
||||
def sameday(ts1, ts2):
|
||||
return time.localtime(ts1)[:3] == time.localtime(ts2)[:3]
|
||||
|
||||
|
||||
def summarize_tags(curves, remove_multiple=False):
|
||||
"""summarize tags
|
||||
|
||||
@ -315,7 +321,7 @@ class SEHistory(InfluxDBWrapper):
|
||||
for key, rows in by_key.items():
|
||||
devices = {}
|
||||
current = {}
|
||||
combi = None
|
||||
prevcombi = None
|
||||
chunk = None
|
||||
for ts, flag, stream, device, _ in rows:
|
||||
ts = max(start, int(ts))
|
||||
@ -324,17 +330,22 @@ class SEHistory(InfluxDBWrapper):
|
||||
else:
|
||||
devices.pop(stream, None)
|
||||
devcombi = tuple(zip(*sorted(devices.items())))
|
||||
if devcombi != combi:
|
||||
if combi:
|
||||
if devcombi != prevcombi:
|
||||
if prevcombi: # device(s) removed
|
||||
prevend = min(ts, chunk[1])
|
||||
if prevend - chunk[0] < gap:
|
||||
current.pop(combi)
|
||||
current.pop(prevcombi)
|
||||
else:
|
||||
chunk[1] = prevend
|
||||
if devcombi:
|
||||
if devcombi: # device(s) added
|
||||
chunks = current.setdefault(devcombi, [])
|
||||
if chunks and time.localtime(chunks[-1][0])[:3] == time.localtime(ts)[:3]:
|
||||
# merge when started at the same day
|
||||
if chunks:
|
||||
prevbeg, prevend = chunks[-1]
|
||||
# merge when joining or started at the same day
|
||||
merge = prevend + gap < ts or sameday(prevbeg, ts)
|
||||
else:
|
||||
merge = False
|
||||
if merge:
|
||||
chunk = chunks[-1]
|
||||
chunk[1] = ETERNITY
|
||||
else:
|
||||
@ -342,7 +353,7 @@ class SEHistory(InfluxDBWrapper):
|
||||
chunks.append(chunk)
|
||||
else:
|
||||
chunk = None
|
||||
combi = devcombi
|
||||
prevcombi = devcombi
|
||||
if current:
|
||||
result[key] = current
|
||||
return result
|
||||
@ -372,24 +383,26 @@ class SEHistory(InfluxDBWrapper):
|
||||
instrument = None if ins == '0' else ins
|
||||
return instrument, lastts
|
||||
|
||||
def set_instrument(self, stream, value, ts=None, **tags):
|
||||
def set_instrument(self, stream, instrument, ts=None, **tags):
|
||||
"""set stream and instrument on or off
|
||||
|
||||
:param stream: the uri of the stream
|
||||
:param value: instrument, "0" to unassign the instrument or None when switching the stream off
|
||||
:param instrument: or "0" to unassign the instrument or None when switching the stream off
|
||||
:param ts: the time or None when now
|
||||
"""
|
||||
flag = value is not None
|
||||
if flag:
|
||||
flag = instrument is not None
|
||||
if flag and instrument != '0':
|
||||
# in case an real instrument is given and no instrument is assigned yet
|
||||
# we predate the assignment to the start of the stream
|
||||
try:
|
||||
previns, prevts = self.get_instrument(stream, ts, **tags)
|
||||
if prevts is not None and (previns is None or (ts or 1e10) < prevts):
|
||||
if prevts is not None and (previns is None or (ts or ETERNITY) < prevts):
|
||||
ts = prevts + 0.001
|
||||
except Exception as e:
|
||||
print(f'Exception in get_instrument {e!r}')
|
||||
tags['stream'] = stream
|
||||
if flag:
|
||||
tags['instrument'] = value
|
||||
tags['instrument'] = instrument
|
||||
self._add_point('_stream_', 'on', flag, ts, tags)
|
||||
|
||||
def remove_experiment(self, stream, ts=None, **tags):
|
||||
|
Reference in New Issue
Block a user