diff --git a/influx.py b/influx.py index 56bd3c6..05dfe77 100644 --- a/influx.py +++ b/influx.py @@ -228,6 +228,7 @@ class InfluxDBWrapper: self._write_buffer = [] self._alias = {} print('InfluxDBWrapper', self._url, self._org, self._bucket) + self.debug_reply = False def enable_write_access(self): self._write_api_write = self._client.write_api(write_options=SYNCHRONOUS).write @@ -398,6 +399,12 @@ class InfluxDBWrapper: print(msg) raise + if self.debug_reply: + def readdebug(reader): + for row in reader: + print(row) + yield row + reader = readdebug(reader) try: row = next(reader) except StopIteration: diff --git a/streams.py b/streams.py index d77809d..cec74f3 100644 --- a/streams.py +++ b/streams.py @@ -215,10 +215,12 @@ class Stream(Base): for event in self.generator: kind, value, key, tags, ts = event timestamp = max(self.start_time, min(ts or INF, time.time())) + if timestamp != ts: + event = event[:-1] + (timestamp,) if timestamp >= self.next_hour: t = (timestamp // 3600) * 3600 events.extend(e[:-1] + (t,) for e in self.cache.values()) - self.next_hour = ts + 3600 + self.next_hour = t + 3600 prev = self.cache[key][:2] if key in self.cache else (None, None) if (kind, value) != prev: if kind == 'error':