From 50f8c349eeb8f9854898b99735329c204b901022 Mon Sep 17 00:00:00 2001 From: Markus Zolliker Date: Thu, 16 Jan 2025 12:56:31 +0100 Subject: [PATCH] add streams.py was forgotten initially --- influx.py | 61 +++++++++++---- streams.py | 219 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 264 insertions(+), 16 deletions(-) create mode 100644 streams.py diff --git a/influx.py b/influx.py index 9f49a93..c9760d2 100644 --- a/influx.py +++ b/influx.py @@ -21,6 +21,8 @@ # ***************************************************************************** import re import time +import threading +import queue from math import floor, ceil, copysign from datetime import datetime import pandas as pd @@ -44,7 +46,11 @@ def negnull2none(v): class RegExp(str): - pass + """indicates, tht this string should be treated as regexp + + Usage: RegExp() + when used in InfluxDBWrapper.query, uses Go regexp syntax! + """ def wildcard_filter(key, names): @@ -135,10 +141,11 @@ class InfluxDBWrapper: based on nicos.services.cache.database.influxdb (work of Konstantin Kholostov ) """ + _update_queue = None + _write_thread = None - def __init__(self, url, token, org, bucket, create=False, watch_interval=300): + def __init__(self, url, token, org, bucket, threaded=False, create=False, watch_interval=300): self._watch_interval = watch_interval - self._update_queue = [] self._url = url self._token = token self._org = org @@ -150,6 +157,34 @@ class InfluxDBWrapper: self._active_streams = {} self.set_time_precision(3) self.add_new_bucket(self._bucket, create) + if threaded: + self._update_queue = queue.Queue(100) + self._write_thread = threading.Thread(target=self._write_thread) + + def _write(self, point): + if self._update_queue: + self._update_queue.put(point) + else: + self._write_api_write(bucket=self._bucket, record=[point]) + + def _write_thread(self): + while 1: + points = [self.write_queue.get()] + try: + while 1: + points.append(self.write_queue.get(False)) + except queue.Empty: + pass + self._write_api_write(bucket=self._bucket, record=points) + event = self._wait_complete + if event: + self._wait_complete = None + event.set() + + def flush(self): + if self._write_thread: + self._wait_complete = event = threading.Event() + event.wait() def set_time_precision(self, digits): self.timedig = max(0, min(digits, 9)) @@ -159,10 +194,12 @@ class InfluxDBWrapper: return round(timevalue.timestamp(), self.timedig) def disconnect(self): + for _ in range(10): + self._write_thread.join(1) for stream, last in self._active_streams.items(): - self._update_queue.append(Point('_streams_') - .time(last, write_precision=self._write_precision) - .field('interval', 0).tag('stream', stream)) + self._write(Point('_streams_') + .time(last, write_precision=self._write_precision) + .field('interval', 0).tag('stream', stream)) self.flush() self._client.close() @@ -199,12 +236,6 @@ class InfluxDBWrapper: self.get_measurements(meas) print('deleted', all) - def flush(self): - points = self._update_queue - if points: - self._update_queue = [] - self._write_api_write(bucket=self._bucket, record=points) - def query(self, start=None, stop=None, interval=None, last=False, **tags): """Returns queried data as InfluxDB tables @@ -310,7 +341,7 @@ class InfluxDBWrapper: if ts > self._deadline: dl = ts // self._watch_interval * self._watch_interval for stream, last in self._active_streams.items(): - self._update_queue.append( + self._write( Point('_streams_') .time(datetime.utcfromtimestamp(last), write_precision=self._write_precision) .field('interval', self._watch_interval).tag('stream', stream)) @@ -321,9 +352,7 @@ class InfluxDBWrapper: point.time(datetime.utcfromtimestamp(ts), write_precision=self._write_precision) for key, val in tags.items(): point.tag(key, val) - self._update_queue.append(point) - if len(self._update_queue) > 0: # 100 - self.flush() + self._write(point) def write_float(self, measurement, field, value, ts, **tags): # make sure value is float diff --git a/streams.py b/streams.py new file mode 100644 index 0000000..24a6a5e --- /dev/null +++ b/streams.py @@ -0,0 +1,219 @@ +import socket +import time +import re +from select import select + + +def parse_uri(uri): + scheme, _, hostport = uri.rpartition('://') + scheme = scheme or 'tcp' + if scheme != 'tcp': + raise ValueError(f'scheme {scheme!r} not supported') + host, _, port = hostport.rpartition(':') + host = host or 'localhost' + return host, int(port) + + +NETBUFSIZE = 4092 +INF = float('inf') + + +class Base: + select_dict = {} + + +class Stream(Base): + _last_time = None + dead = False + max_offline = 20 + ping_timeout = 3 + _next_ping = INF + _ping_deadline = 0 + _deadline = INF + _next_connect = 0 + + def __init__(self, uri, name=None, timeout=5, encoding='latin-1'): + self.name = name or uri + self.uri = uri + self.tags = {} + self.streamname = self.uri + self.encoding = encoding + self.timeout = timeout + self.socket = None + self.cache = {} + self.first_values = time.time() + try: + self.connect() + self.init() + self.first_values = 0 + except Exception as e: + print(self.uri, repr(e)) + raise + + def connect(self): + self.socket = socket.create_connection(parse_uri(self.uri)) + self.select_dict[self.socket.fileno()] = self + self.settimeout(self.timeout) + host, _, port = self.uri.partition(':') + # try to convert uri to host name + host = socket.gethostbyaddr(host)[0] + # psi special: shorten name, in case a computer has several network connections + match = re.match(r'([^.-]+)(?:-129129\d{6}|(-[~.]*|)).psi.ch', host) + if match: + host = match.group(1) + (match.group(2) or '') + self.tags['stream'] = f'{host}:{port}' + print(self.uri, '=', self.tags['stream'], 'connected') + self._buffer = [] + self._deadline = INF + self._next_connect = 0 + self._pinged = False + + def init(self): + pass + + def ping(self): + raise NotImplementedError + + def pong(self): + self._alive = True + + def close(self): + """Do our best to close a socket.""" + if self.socket is None: + return + self.select_dict.pop(self.socket.fileno(), None) + print(self.uri, 'close socket') + try: + self.socket.shutdown(socket.SHUT_RDWR) + except socket.error: + pass + try: + self.socket.close() + except socket.error: + pass + self.socket = None + + def is_offline(self): + """try restart if needed""" + if self.socket is None: + now = time.time() + if now < self._next_connect: + return True + try: + self.connect() + self.init() + except Exception as e: + if now > self._deadline: + self.close() + self.dead = min(now, self._last_live + 1) + return True + if self._deadline == INF: + print(f'error "{e}" connecting to {self.uri} retrying for {self.max_offline} sec') + self._deadline = now + self.max_offline + else: + print('.', end='', flush=True) + self._next_connect = now + 0.5 + return True + return False + + def settimeout(self, timeout): + self.timeout = timeout + if self.socket: + self.socket.settimeout(timeout) + + def notimeout(self): + if self.socket: + self.socket.settimeout(0) + + def send(self, line): + try: + self.socket.sendall(line.encode(self.encoding)) + self.socket.sendall(b'\n') + except Exception as e: + self.close() + e.args = ('send:' + e.args[0],) + raise + + def get_lines(self): + """generator returning lines as bytes""" + if self.is_offline(): + return + now = time.time() + if now > self._next_ping: + if self._next_ping == self._ping_deadline: + print(self.uri, 'no pong') + self.close() + return + self.ping() + self._last_live = self._next_ping - self.ping_timeout + self._next_ping = self._ping_deadline = now + self._next_ping + buffer = self._buffer + while 1: + try: + received = self.socket.recv(NETBUFSIZE) + if not received: + raise ConnectionAbortedError('connection closed by other end') + except TimeoutError: + break + except BlockingIOError: + break + except Exception: + self._last_live = now + self.close() + raise + splitted = received.split(b'\n') + if len(splitted) == 1: + buffer.append(received) + else: + self._next_ping = now + self.ping_timeout + buffer.append(splitted[0]) + splitted[0] = b''.join(buffer) + buffer[:] = [splitted.pop()] + for line in splitted: + yield line.decode(self.encoding) + if len(received) < NETBUFSIZE and self.timeout == 0: + break + + +class EventGenerator: + return_on_wait = False + # return_on_wait = True: stop generator when no more streams have buffered content + # note: a stream with buffered content might not be ready to emit any event, because + # of filtering + + def __init__(self, *udp, **streams): + self.streams = streams + self.udp = {v.socket.fileno(): v for v in udp} + + def wait_ready(self, timeout): + ready = select(Stream.select_dict, [], [], timeout)[0] + return [Stream.select_dict[f] for f in ready] + + def gen(self): + while 1: + for stream in self.wait_ready(1): + if not isinstance(stream, Stream): + for streamcls, uri, *args in stream.events(): + if uri not in self.streams: + print('ADD STREAM', uri, *args) + self.streams[uri] = streamcls(uri, *args) + for name, stream in self.streams.items(): + if stream.dead: + print('REMOVE STREAM', name) + for key in stream.cache: + yield key, None, 'END', stream.dead, stream.tags + self.streams.pop(name) + break + for key, value, error, ts, tags in stream.events(): + ts = max(stream.first_values, min(ts or INF, time.time())) + prev = stream.cache.get(key, None) + if (value, error) != prev: + yield key, value, error, ts, tags + stream.cache[key] = value, error + if self.return_on_wait and not self.wait_ready(0): + return + + def __iter__(self): + return self.gen() + +