From 78ceecbb53d6c44f00d314174bcefa2c3db44aab Mon Sep 17 00:00:00 2001 From: Markus Zolliker Date: Tue, 25 Feb 2025 14:13:35 +0100 Subject: [PATCH] method 'complete': move end_time_dict to an argument get_streams should return all tags --- influx.py | 15 +++++++-------- streams.py | 3 ++- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/influx.py b/influx.py index 05dfe77..04ed9cb 100644 --- a/influx.py +++ b/influx.py @@ -559,16 +559,15 @@ class InfluxDBWrapper: return result @staticmethod - def complete(curve_dict, end_time=0, tag='stream'): + def complete(curve_dict, end_time=0, tag=None): """complete to end_time - if end_time is not given, is is the max timestamp within the same stream + if tag is given, end_time is a dict of """ - end_time_dict = {} - if not end_time: - for curve in curve_dict.values(): - key = curve.tags.get(tag) - end_time_dict[key] = max(end_time_dict.get(key, 0), curve[-1][0]) + if tag is None: + end_time_dict = {} + else: + end_time_dict, end_time = end_time, 0 for curve in curve_dict.values(): if len(curve): tlast, value = curve[-1] @@ -651,7 +650,7 @@ class InfluxDBWrapper: for stream, entries in all_entries.items(): entry = sorted(entries, key=lambda r: r[0])[-1] if entry[1]: # on=True - result[stream] = entry.tags.get('instrument', '0') + result[stream] = entry.tags return result def set_instrument(self, stream, value, ts=None, guess=True, **tags): diff --git a/streams.py b/streams.py index cec74f3..05cb8ff 100644 --- a/streams.py +++ b/streams.py @@ -278,8 +278,9 @@ class EventStream: except Exception as e: print('can not connect to', uri, repr(e)) continue + device = stream.tags.get('device') events.append(('stream', kwargs.get('instrument', '0'), - {}, uri, int(time.time()))) + {'device': device}, uri, int(time.time()))) for name, stream in self.streams.items(): try: if stream.get_events(events, maxevents):