method 'complete': move end_time_dict to an argument
get_streams should return all tags
This commit is contained in:
15
influx.py
15
influx.py
@ -559,16 +559,15 @@ class InfluxDBWrapper:
|
||||
return result
|
||||
|
||||
@staticmethod
|
||||
def complete(curve_dict, end_time=0, tag='stream'):
|
||||
def complete(curve_dict, end_time=0, tag=None):
|
||||
"""complete to end_time
|
||||
|
||||
if end_time is not given, is is the max timestamp within the same stream
|
||||
if tag is given, end_time is a dict <tag value> of <end time>
|
||||
"""
|
||||
end_time_dict = {}
|
||||
if not end_time:
|
||||
for curve in curve_dict.values():
|
||||
key = curve.tags.get(tag)
|
||||
end_time_dict[key] = max(end_time_dict.get(key, 0), curve[-1][0])
|
||||
if tag is None:
|
||||
end_time_dict = {}
|
||||
else:
|
||||
end_time_dict, end_time = end_time, 0
|
||||
for curve in curve_dict.values():
|
||||
if len(curve):
|
||||
tlast, value = curve[-1]
|
||||
@ -651,7 +650,7 @@ class InfluxDBWrapper:
|
||||
for stream, entries in all_entries.items():
|
||||
entry = sorted(entries, key=lambda r: r[0])[-1]
|
||||
if entry[1]: # on=True
|
||||
result[stream] = entry.tags.get('instrument', '0')
|
||||
result[stream] = entry.tags
|
||||
return result
|
||||
|
||||
def set_instrument(self, stream, value, ts=None, guess=True, **tags):
|
||||
|
@ -278,8 +278,9 @@ class EventStream:
|
||||
except Exception as e:
|
||||
print('can not connect to', uri, repr(e))
|
||||
continue
|
||||
device = stream.tags.get('device')
|
||||
events.append(('stream', kwargs.get('instrument', '0'),
|
||||
{}, uri, int(time.time())))
|
||||
{'device': device}, uri, int(time.time())))
|
||||
for name, stream in self.streams.items():
|
||||
try:
|
||||
if stream.get_events(events, maxevents):
|
||||
|
Reference in New Issue
Block a user