add streams.py

was forgotten initially
2025-01-16 12:56:31 +01:00
parent fc679cee21
commit 50f8c349ee
2 changed files with 264 additions and 16 deletions


@@ -21,6 +21,8 @@
 # *****************************************************************************
 import re
 import time
+import threading
+import queue
 from math import floor, ceil, copysign
 from datetime import datetime
 import pandas as pd
@@ -44,7 +46,11 @@ def negnull2none(v):

 class RegExp(str):
-    pass
+    """indicates that this string should be treated as a regexp
+
+    Usage: RegExp(<pattern>)
+    when used in InfluxDBWrapper.query, uses Go regexp syntax!
+    """


 def wildcard_filter(key, names):
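As a usage note (not part of the commit): a RegExp pattern is passed to InfluxDBWrapper.query as a tag filter and forwarded in Go regexp syntax. The `db` instance and the 'device' tag name below are placeholders:

    # match all values whose 'device' tag starts with 'mota' (Go regexp syntax)
    tables = db.query(start, stop, device=RegExp('^mota'))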
@@ -135,10 +141,11 @@ class InfluxDBWrapper:
     based on nicos.services.cache.database.influxdb
     (work of Konstantin Kholostov <k.kholostov@fz-juelich.de>)
     """
-    def __init__(self, url, token, org, bucket, create=False, watch_interval=300):
+    _update_queue = None
+    _write_thread = None
+    _wait_complete = None
+
+    def __init__(self, url, token, org, bucket, threaded=False, create=False, watch_interval=300):
         self._watch_interval = watch_interval
-        self._update_queue = []
         self._url = url
         self._token = token
         self._org = org
@@ -150,6 +157,34 @@
         self._active_streams = {}
         self.set_time_precision(3)
         self.add_new_bucket(self._bucket, create)
+        if threaded:
+            self._update_queue = queue.Queue(100)
+            self._write_thread = threading.Thread(target=self._write_loop, daemon=True)
+            self._write_thread.start()
+
+    def _write(self, point):
+        if self._update_queue:
+            self._update_queue.put(point)
+        else:
+            self._write_api_write(bucket=self._bucket, record=[point])
+
+    def _write_loop(self):
+        # background writer: block until a first point arrives, then drain
+        # whatever else is queued and write the whole batch in one call
+        while 1:
+            points = [self._update_queue.get()]
+            try:
+                while 1:
+                    points.append(self._update_queue.get(False))
+            except queue.Empty:
+                pass
+            points = [p for p in points if p is not None]  # None: wake-up marker from flush()
+            if points:
+                self._write_api_write(bucket=self._bucket, record=points)
+            event = self._wait_complete
+            if event:
+                self._wait_complete = None
+                event.set()
+
+    def flush(self):
+        if self._write_thread:
+            self._wait_complete = event = threading.Event()
+            self._update_queue.put(None)  # wake the writer even when the queue is idle
+            event.wait()

     def set_time_precision(self, digits):
         self.timedig = max(0, min(digits, 9))
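A hedged sketch of how the new threaded mode is meant to be used (URL, token, measurement and tag names are placeholders, not from this commit):

    db = InfluxDBWrapper('http://localhost:8086', token, org, bucket, threaded=True)
    # write_float enqueues points; the background thread batches them
    # into single write_api calls instead of one call per point
    db.write_float('temperature', 'value', 1.23, time.time(), device='sensor1')
    db.flush()       # block until the writer thread has drained the queue
    db.disconnect()  # mark active streams as finished and flush again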
@@ -159,8 +194,10 @@
         return round(timevalue.timestamp(), self.timedig)

     def disconnect(self):
+        if self._write_thread:
+            for _ in range(10):
+                self._write_thread.join(1)
         for stream, last in self._active_streams.items():
-            self._update_queue.append(Point('_streams_')
+            self._write(Point('_streams_')
                 .time(last, write_precision=self._write_precision)
                 .field('interval', 0).tag('stream', stream))
         self.flush()
@@ -199,12 +236,6 @@
             self.get_measurements(meas)
         print('deleted', all)

-    def flush(self):
-        points = self._update_queue
-        if points:
-            self._update_queue = []
-            self._write_api_write(bucket=self._bucket, record=points)

     def query(self, start=None, stop=None, interval=None, last=False, **tags):
         """Returns queried data as InfluxDB tables
@@ -310,7 +341,7 @@
         if ts > self._deadline:
             dl = ts // self._watch_interval * self._watch_interval
             for stream, last in self._active_streams.items():
-                self._update_queue.append(
+                self._write(
                     Point('_streams_')
                     .time(datetime.utcfromtimestamp(last), write_precision=self._write_precision)
                     .field('interval', self._watch_interval).tag('stream', stream))
@@ -321,9 +352,7 @@
         point.time(datetime.utcfromtimestamp(ts), write_precision=self._write_precision)
         for key, val in tags.items():
             point.tag(key, val)
-        self._update_queue.append(point)
-        if len(self._update_queue) > 0:  # 100
-            self.flush()
+        self._write(point)

     def write_float(self, measurement, field, value, ts, **tags):
         # make sure value is float

streams.py (new file)

@@ -0,0 +1,219 @@
import socket
import time
import re
from select import select

def parse_uri(uri):
    scheme, _, hostport = uri.rpartition('://')
    scheme = scheme or 'tcp'
    if scheme != 'tcp':
        raise ValueError(f'scheme {scheme!r} not supported')
    host, _, port = hostport.rpartition(':')
    host = host or 'localhost'
    return host, int(port)
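# editor's note, illustrating the assumed behaviour (examples, not from the commit):
#   parse_uri('tcp://se-pc:5900') -> ('se-pc', 5900)
#   parse_uri('5900')             -> ('localhost', 5900)
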
NETBUFSIZE = 4092
INF = float('inf')

class Base:
    select_dict = {}

class Stream(Base):
    _last_time = None
    _last_live = 0  # time of the last successful communication
    dead = False
    max_offline = 20
    ping_timeout = 3
    _next_ping = INF
    _ping_deadline = 0
    _deadline = INF
    _next_connect = 0

    def __init__(self, uri, name=None, timeout=5, encoding='latin-1'):
        self.name = name or uri
        self.uri = uri
        self.tags = {}
        self.streamname = self.uri
        self.encoding = encoding
        self.timeout = timeout
        self.socket = None
        self.cache = {}
        self.first_values = time.time()
        try:
            self.connect()
            self.init()
            self.first_values = 0
        except Exception as e:
            print(self.uri, repr(e))
            raise

    def connect(self):
        self.socket = socket.create_connection(parse_uri(self.uri))
        self.select_dict[self.socket.fileno()] = self
        self.settimeout(self.timeout)
        host, port = parse_uri(self.uri)
        # try to convert the address to a host name
        host = socket.gethostbyaddr(host)[0]
        # psi special: shorten the name, in case a computer has several network connections
        match = re.match(r'([^.-]+)(?:-129129\d{6}|(-[^.]*|))\.psi\.ch', host)
        if match:
            host = match.group(1) + (match.group(2) or '')
        self.tags['stream'] = f'{host}:{port}'
        print(self.uri, '=', self.tags['stream'], 'connected')
        self._buffer = []
        self._deadline = INF
        self._next_connect = 0
        self._pinged = False

    def init(self):
        pass

    def ping(self):
        raise NotImplementedError

    def pong(self):
        self._alive = True

    def close(self):
        """Do our best to close a socket."""
        if self.socket is None:
            return
        self.select_dict.pop(self.socket.fileno(), None)
        print(self.uri, 'close socket')
        try:
            self.socket.shutdown(socket.SHUT_RDWR)
        except socket.error:
            pass
        try:
            self.socket.close()
        except socket.error:
            pass
        self.socket = None

    def is_offline(self):
        """return True while offline, trying to reconnect when due"""
        if self.socket is None:
            now = time.time()
            if now < self._next_connect:
                return True
            try:
                self.connect()
                self.init()
            except Exception as e:
                if now > self._deadline:
                    self.close()
                    self.dead = min(now, self._last_live + 1)
                    return True
                if self._deadline == INF:
                    print(f'error "{e}" connecting to {self.uri}, retrying for {self.max_offline} sec')
                    self._deadline = now + self.max_offline
                else:
                    print('.', end='', flush=True)
                self._next_connect = now + 0.5
                return True
        return False

    def settimeout(self, timeout):
        self.timeout = timeout
        if self.socket:
            self.socket.settimeout(timeout)

    def notimeout(self):
        """switch to non-blocking mode"""
        self.settimeout(0)

    def send(self, line):
        try:
            self.socket.sendall(line.encode(self.encoding))
            self.socket.sendall(b'\n')
        except Exception as e:
            self.close()
            e.args = (f'send: {e}',)
            raise

    def get_lines(self):
        """generator yielding received lines, decoded"""
        if self.is_offline():
            return
        now = time.time()
        if now > self._next_ping:
            if self._next_ping == self._ping_deadline:
                # a ping was sent, but nothing came back within ping_timeout
                print(self.uri, 'no pong')
                self.close()
                return
            self.ping()
            self._last_live = self._next_ping - self.ping_timeout
            self._next_ping = self._ping_deadline = now + self.ping_timeout
        buffer = self._buffer
        while 1:
            try:
                received = self.socket.recv(NETBUFSIZE)
                if not received:
                    raise ConnectionAbortedError('connection closed by other end')
            except (TimeoutError, BlockingIOError):
                break
            except Exception:
                self._last_live = now
                self.close()
                raise
            splitted = received.split(b'\n')
            if len(splitted) == 1:
                # no newline yet: keep the partial line in the buffer
                buffer.append(received)
            else:
                self._next_ping = now + self.ping_timeout
                buffer.append(splitted[0])
                splitted[0] = b''.join(buffer)
                buffer[:] = [splitted.pop()]
                for line in splitted:
                    yield line.decode(self.encoding)
            if len(received) < NETBUFSIZE and self.timeout == 0:
                break

class EventGenerator:
    return_on_wait = False
    # return_on_wait = True: stop the generator when no more streams have buffered content
    # note: a stream with buffered content might not be ready to emit any event,
    # because of filtering

    def __init__(self, *udp, **streams):
        self.streams = streams
        self.udp = {v.socket.fileno(): v for v in udp}

    def wait_ready(self, timeout):
        ready = select(Stream.select_dict, [], [], timeout)[0]
        return [Stream.select_dict[f] for f in ready]

    def gen(self):
        while 1:
            for stream in self.wait_ready(1):
                if not isinstance(stream, Stream):
                    # a udp listener announcing new streams to be added
                    for streamcls, uri, *args in stream.events():
                        if uri not in self.streams:
                            print('ADD STREAM', uri, *args)
                            self.streams[uri] = streamcls(uri, *args)
            for name, stream in self.streams.items():
                if stream.dead:
                    print('REMOVE STREAM', name)
                    for key in stream.cache:
                        yield key, None, 'END', stream.dead, stream.tags
                    self.streams.pop(name)
                    break  # the dict changed: leave the loop, retry on the next round
                for key, value, error, ts, tags in stream.events():
                    ts = max(stream.first_values, min(ts or INF, time.time()))
                    prev = stream.cache.get(key, None)
                    if (value, error) != prev:
                        yield key, value, error, ts, tags
                        stream.cache[key] = value, error
            if self.return_on_wait and not self.wait_ready(0):
                return

    def __iter__(self):
        return self.gen()
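
To illustrate how these classes fit together, a minimal sketch (editor's example, not part of the commit): Stream is abstract, so a concrete subclass is assumed to implement ping() and an events() generator built on get_lines(). The 'key=value' line format, host and port below are made up:

    class DemoStream(Stream):
        def ping(self):
            self.send('')  # keep-alive; the real payload is protocol specific

        def events(self):
            # turn received lines like 'name=value' into (key, value, error, ts, tags)
            for line in self.get_lines():
                key, _, value = line.partition('=')
                yield key, value, '', time.time(), self.tags

    # assuming a line-based server is listening on localhost:5900
    gen = EventGenerator(demo=DemoStream('localhost:5900'))
    for key, value, error, ts, tags in gen:  # set return_on_wait = True to stop when drained
        print(ts, tags.get('stream'), key, value, error)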