From dd571fc3ea1387402a8241aaafd330dd38a6c0c5 Mon Sep 17 00:00:00 2001 From: Markus Zolliker Date: Wed, 21 May 2025 09:49:50 +0200 Subject: [PATCH] improve set_instrument and get_experiment - do not predate when setting instrument to '0' - join chunks with the same device --- seinflux.py | 41 +++++++++++++++++++++++++++-------------- 1 file changed, 27 insertions(+), 14 deletions(-) diff --git a/seinflux.py b/seinflux.py index 2f03360..b900ee2 100644 --- a/seinflux.py +++ b/seinflux.py @@ -7,11 +7,17 @@ ETERNITY = 1e10 def fmtime(t): + if t is None: + return ' ' * 19 if t >= ETERNITY: return '-' * 19 return time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(int(t))) +def sameday(ts1, ts2): + return time.localtime(ts1)[:3] == time.localtime(ts2)[:3] + + def summarize_tags(curves, remove_multiple=False): """summarize tags @@ -315,7 +321,7 @@ class SEHistory(InfluxDBWrapper): for key, rows in by_key.items(): devices = {} current = {} - combi = None + prevcombi = None chunk = None for ts, flag, stream, device, _ in rows: ts = max(start, int(ts)) @@ -324,17 +330,22 @@ class SEHistory(InfluxDBWrapper): else: devices.pop(stream, None) devcombi = tuple(zip(*sorted(devices.items()))) - if devcombi != combi: - if combi: + if devcombi != prevcombi: + if prevcombi: # device(s) removed prevend = min(ts, chunk[1]) if prevend - chunk[0] < gap: - current.pop(combi) + current.pop(prevcombi) else: chunk[1] = prevend - if devcombi: + if devcombi: # device(s) added chunks = current.setdefault(devcombi, []) - if chunks and time.localtime(chunks[-1][0])[:3] == time.localtime(ts)[:3]: - # merge when started at the same day + if chunks: + prevbeg, prevend = chunks[-1] + # merge when joining or started at the same day + merge = prevend + gap < ts or sameday(prevbeg, ts) + else: + merge = False + if merge: chunk = chunks[-1] chunk[1] = ETERNITY else: @@ -342,7 +353,7 @@ class SEHistory(InfluxDBWrapper): chunks.append(chunk) else: chunk = None - combi = devcombi + prevcombi = devcombi if current: result[key] = current return result @@ -372,24 +383,26 @@ class SEHistory(InfluxDBWrapper): instrument = None if ins == '0' else ins return instrument, lastts - def set_instrument(self, stream, value, ts=None, **tags): + def set_instrument(self, stream, instrument, ts=None, **tags): """set stream and instrument on or off :param stream: the uri of the stream - :param value: instrument, "0" to unassign the instrument or None when switching the stream off + :param instrument: or "0" to unassign the instrument or None when switching the stream off :param ts: the time or None when now """ - flag = value is not None - if flag: + flag = instrument is not None + if flag and instrument != '0': + # in case an real instrument is given and no instrument is assigned yet + # we predate the assignment to the start of the stream try: previns, prevts = self.get_instrument(stream, ts, **tags) - if prevts is not None and (previns is None or (ts or 1e10) < prevts): + if prevts is not None and (previns is None or (ts or ETERNITY) < prevts): ts = prevts + 0.001 except Exception as e: print(f'Exception in get_instrument {e!r}') tags['stream'] = stream if flag: - tags['instrument'] = value + tags['instrument'] = instrument self._add_point('_stream_', 'on', flag, ts, tags) def remove_experiment(self, stream, ts=None, **tags):