- use gethostbyaddr only in case of numeric IP - hard wired reverse CNAME (network alias) for linse-c
300 lines
10 KiB
Python
300 lines
10 KiB
Python
import socket
|
|
import time
|
|
import re
|
|
import logging
|
|
from select import select
|
|
from normalizeuri import normalizeuri
|
|
|
|
|
|
class StreamDead(Exception):
    """Signal that a stream could not be revived and must be dropped.

    Raised by ``Stream.get_events`` once the stream is marked dead, and
    caught by ``EventStream.get_events`` which then removes the stream.
    """
|
|
|
|
|
|
def parse_uri(uri):
    """Split a stream URI into a ``(host, port)`` tuple.

    Accepted forms are ``tcp://host:port``, ``host:port``, ``:port`` and
    ``port``. A missing scheme is treated as ``tcp`` and a missing host
    as ``localhost``.

    :param uri: the URI string
    :return: tuple ``(host, port)`` with port converted to int
    :raises ValueError: when the scheme is not ``tcp`` or when the port is
        missing or not an integer
    """
    scheme, _, hostport = uri.rpartition('://')
    if scheme and scheme != 'tcp':
        raise ValueError(f'scheme {scheme!r} not supported')
    host, _, port = hostport.rpartition(':')
    try:
        portnum = int(port)
    except ValueError:
        # the bare int() message does not tell which URI was wrong
        raise ValueError(f'invalid or missing port in uri {uri!r}') from None
    return host or 'localhost', portnum
|
|
|
|
|
|
# maximum number of bytes requested from the socket in a single recv() call
NETBUFSIZE = 4092
# positive infinity: used as "never" for deadlines and as a stand-in for
# missing timestamps
INF = float('inf')
|
|
|
|
|
|
class Base:
    """Common root of stream objects that can be waited on with select.

    The two dictionaries are class attributes, so every subclass and
    instance shares the very same registries.
    """
    # file descriptor -> object to be watched for readability
    select_read = dict()
    # file descriptor -> object to be watched for writability
    select_write = dict()

    def close(self):
        """Placeholder close: only writes a log message."""
        logging.info('CLOSE BASE')

    def finish_events(self, *args):
        """Placeholder finish: ignores its arguments and writes a log message."""
        logging.info('FINISH BASE')
|
|
|
|
|
|
class Stream(Base):
    """Line based TCP connection producing events, with automatic reconnect.

    Subclasses have to implement :meth:`init`, :meth:`ping`,
    :meth:`event_generator` and :meth:`get_tags`.
    """
    _last_time = None  # not used by the code of this class
    # time of the last sign of life, None as long as none was recorded
    _last_live = None
    # False while usable, else the timestamp used for the final events
    dead = False
    # seconds during which reconnecting is retried before giving up
    max_offline = 20
    # seconds without a complete line before a ping is sent, and also the
    # time granted for any line to arrive after the ping
    ping_timeout = 3
    # time when the next ping is due (INF: no ping scheduled)
    _next_ping = INF
    # time until when a reaction to the last ping is expected
    _ping_deadline = 0
    # time after which a failing reconnect marks the stream as dead
    _deadline = INF
    # earliest time for the next reconnect attempt
    _next_connect = 0

    def __init__(self, uri, name=None, timeout=5, encoding='latin-1', **kwds):
        """Connect to *uri* and initialise the stream.

        :param uri: address to connect to, see ``parse_uri``
        :param name: name of the stream, defaults to *uri*
        :param timeout: socket timeout in seconds
        :param encoding: encoding used for sent and received lines
        :param kwds: passed on to :meth:`init`
        :raises Exception: whatever :meth:`connect` or :meth:`init` raise
        """
        self.name = name or uri
        self.uri = uri
        self.tags = {}
        self.streamname = self.uri
        self.encoding = encoding
        self.timeout = timeout
        self.socket = None
        self.cache = {}  # dict <key> of event
        self.errors = {}
        self.start_time = time.time()
        self.next_hour = (self.start_time // 3600 + 1) * 3600
        self.generator = self.event_generator()
        try:
            self.connect()
            self.init(**kwds)
        except Exception as e:
            logging.info('failled connecting %s %r', self.uri, e)
            # the instance is discarded by the caller: do not leave its
            # socket open and registered in the shared select_read registry
            self.close()
            raise

    def connect(self):
        """Open the socket, register it for select and reset connection state."""
        self.socket = socket.create_connection(parse_uri(self.uri))
        self.select_read[self.socket.fileno()] = self
        self.settimeout(self.timeout)
        self.uri = self.tags['stream'] = f'{normalizeuri(self.uri)}'
        logging.info('connected %s', self.uri)
        self._buffer = []
        self._deadline = INF
        self._next_connect = 0
        self._pinged = False
        # a fresh connection starts without a pending ping, else a stale
        # deadline of the previous connection would close it immediately
        self._next_ping = INF
        self._ping_deadline = 0

    def init(self):
        """Protocol specific initialisation after connecting (to be implemented)."""
        raise NotImplementedError

    def ping(self):
        """Send something provoking a reply from the other end (to be implemented)."""
        raise NotImplementedError

    def pong(self):
        """Record that the other end is alive."""
        self._alive = True

    def close(self):
        """Do our best to close a socket."""
        if self.socket is None:
            return
        self.select_read.pop(self.socket.fileno(), None)
        logging.info('close socket %s', self.uri)
        # errors on shutdown/close are ignored on purpose (best effort)
        try:
            self.socket.shutdown(socket.SHUT_RDWR)
        except socket.error:
            pass
        try:
            self.socket.close()
        except socket.error:
            pass
        self.socket = None

    def is_offline(self):
        """Try to reconnect if needed.

        :return: True when there is no usable connection, False otherwise

        While reconnecting fails, attempts are repeated at most every 0.5
        sec. After ``max_offline`` seconds of failures ``self.dead`` is set.
        """
        if self.socket is None:
            now = time.time()
            if now < self._next_connect:
                return True
            try:
                self.connect()
                # NOTE(review): the keyword arguments given to __init__ are
                # not passed again here - confirm that subclasses cope
                self.init()
            except Exception as e:
                if now > self._deadline:
                    self.close()
                    # _last_live is None when no sign of life was recorded
                    if self._last_live is None:
                        self.dead = now
                    else:
                        self.dead = min(now, self._last_live + 1)
                    return True
                if self._deadline == INF:
                    # first failure in a row: start the offline period
                    logging.info('error %r connecting to %s retrying for %s sec',
                                 e, self.uri, self.max_offline)
                    self._deadline = now + self.max_offline
                self._next_connect = now + 0.5
                return True
        return False

    def settimeout(self, timeout):
        """Remember *timeout* and apply it to the socket, if there is one."""
        self.timeout = timeout
        if self.socket:
            self.socket.settimeout(timeout)

    def notimeout(self):
        """Set the socket timeout to 0 without changing ``self.timeout``."""
        if self.socket:
            self.socket.settimeout(0)

    def send(self, line):
        """Send *line* followed by a newline.

        :param line: the text to send, without line end
        :raises Exception: the original exception, its args replaced by a
            single message prefixed with ``send:``. The socket is closed
            before re-raising.
        """
        try:
            self.socket.sendall(line.encode(self.encoding) + b'\n')
        except Exception as e:
            self.close()
            # args[0] is not a string for all exceptions (e.g. the errno of
            # an OSError) and args may even be empty
            if e.args and isinstance(e.args[0], str):
                detail = e.args[0]
            else:
                detail = str(e)
            e.args = ('send:' + detail,)
            raise

    def get_lines(self):
        """Generator returning the received lines as decoded strings.

        Returns without yielding when the stream is offline. A ping is sent
        when no complete line arrived for ``ping_timeout`` seconds and the
        socket is closed when no line follows within another
        ``ping_timeout`` seconds. Incomplete lines are kept in a buffer.

        :raises Exception: on socket errors or when the other end closed
            the connection; the socket is closed before re-raising
        """
        if self.is_offline():
            return
        now = time.time()
        if now > self._next_ping:
            if self._next_ping == self._ping_deadline:
                # nothing was received since the ping was sent
                logging.info('no pong from %s', self.uri)
                self.close()
                return
            self.ping()
            # _next_ping was set ping_timeout after the last received line
            self._last_live = self._next_ping - self.ping_timeout
            self._next_ping = self._ping_deadline = now + self.ping_timeout
        buffer = self._buffer
        while True:
            try:
                received = self.socket.recv(NETBUFSIZE)
                if not received:
                    raise ConnectionAbortedError('connection closed by other end')
            except (socket.timeout, TimeoutError, BlockingIOError):
                # nothing more to read for now
                # (socket.timeout is TimeoutError itself from Python 3.10 on)
                break
            except Exception:
                self._last_live = now
                self.close()
                raise
            splitted = received.split(b'\n')
            if len(splitted) == 1:
                # no line end yet: keep collecting
                buffer.append(received)
            else:
                # a complete line counts as sign of life: postpone the ping
                self._next_ping = now + self.ping_timeout
                buffer.append(splitted[0])
                splitted[0] = b''.join(buffer)
                # the last item is the (possibly empty) start of the next line
                buffer[:] = [splitted.pop()]
                for line in splitted:
                    yield line.decode(self.encoding)
            if len(received) < NETBUFSIZE and self.timeout == 0:
                break

    def event_generator(self):
        """a generator returning events

        events are (<kind>, <value>, <key>, <tags>, <timestamp>)
        kind is one of 'error', 'value', 'stream'
        """
        raise NotImplementedError

    def get_tags(self, key):
        """get tags for key"""
        raise NotImplementedError

    def finish_events(self, events, end_time):
        """Empty the cache and append closing events for each cached key.

        :param events: a list the events are appended to
        :param end_time: the timestamp used for the appended events
        """
        for key in list(self.cache):
            self.cache.pop(key)
            events.append(('value', None, key, self.tags, end_time))
            events.append(('error', 'END', key, self.tags, end_time))

    def get_events(self, events, maxevents):
        """get available events

        :param events: a list events to be appended to
        :param maxevents: hint for max number of events to be joined
            there might be more after a full hour or when the stream is dying
        :return: True when maxevents is reached
        :raises StreamDead: when the stream is dead, after the closing
            events have been appended

        Only events changing the cached (kind, value) of their key are
        appended.
        """
        for event in self.generator:
            kind, value, key, tags, ts = event
            # keep timestamps between start time and now, use now if missing
            timestamp = max(self.start_time, min(ts or INF, time.time()))
            if timestamp != ts:
                event = event[:-1] + (timestamp,)
            if timestamp >= self.next_hour:
                # passing a full hour: repeat all cached events at the hour
                t = (timestamp // 3600) * 3600
                events.extend(e[:-1] + (t,) for e in self.cache.values())
                self.next_hour = t + 3600
            prev = self.cache[key][:2] if key in self.cache else (None, None)
            if (kind, value) != prev:
                if kind == 'error':
                    if prev[0] == 'value':
                        # terminate the previous value before the error
                        events.append(('value', None, key, tags, timestamp))
                    self.cache[key] = event
                elif kind == 'value':
                    self.cache[key] = event
                events.append(event)
            if len(events) >= maxevents:
                return True
        else:
            # the generator is exhausted
            if self.dead:
                self.finish_events(events, self.dead)
                raise StreamDead()
            self.generator = self.event_generator()
        return False
|
|
|
|
|
|
class EventStream:
    """Collect the events of several streams.

    Streams may be given on creation or get added while running, when
    announced by one of the UDP streams.
    """
    return_on_wait = False
    # return_on_wait = True: stop generator when no more streams have buffered content
    # note: a stream with buffered content might not be ready to emit any event, because
    # of filtering

    def __init__(self, *udp, **streams):
        """
        :param udp: objects with a ``socket`` attribute, stored by file descriptor
        :param streams: the initial streams by name
        """
        self.streams = streams
        self.udp = {v.socket.fileno(): v for v in udp}

    def wait_ready(self, timeout):
        """Wait until a registered object is ready for reading or writing.

        :param timeout: timeout passed to select
        :return: list of the ready objects from ``Stream.select_read`` and
            ``Stream.select_write``
        """
        rd, wr = select(Stream.select_read, Stream.select_write, [], timeout)[0:2]
        return [Stream.select_read[f] for f in rd] + [Stream.select_write[f] for f in wr]

    def get_events(self, maxevents=20):
        """return events from all streams

        :param maxevents: hint for max number of events to be joined
            there might be more after a full hour or when the stream is dying
        :return: list of events

        wait for at least one event
        """
        events = []
        while True:
            for ready in self.wait_ready(1):
                if isinstance(ready, Stream):
                    continue
                # ready is a UdpStream
                for streamcls, uri, kwargs in ready.events():
                    stream = self.streams.get(uri)
                    if stream:
                        stream.tags.update(kwargs)
                        logging.info('update stream %s %r', uri, kwargs)
                    else:
                        try:
                            self.streams[uri] = stream = streamcls(uri, **kwargs)
                        except Exception as e:
                            logging.warning('can not connect to %s %r %r', uri, e, streamcls)
                            continue
                        logging.info('added stream %s %r', uri, kwargs)
                    device = stream.tags.get('device')
                    events.append(('stream', kwargs.get('instrument', ''),
                                   {'device': device}, stream.uri, int(time.time())))
            # iterate over a snapshot, as dead streams are removed on the way
            for name, stream in list(self.streams.items()):
                try:
                    if stream.get_events(events, maxevents):
                        return events
                except StreamDead:
                    # indicate stream is removed
                    events.append(('stream', None, {}, stream.uri, int(time.time())))
                    self.streams.pop(name)
                    # events is not empty here: report the removal right away
                    return events
            if events:
                return events

    def finish(self):
        """Close all streams.

        :return: list of the closing events of all streams, each followed by
            an event indicating the removal of the stream
        """
        events = []
        end_time = time.time()
        for stream in self.streams.values():
            stream.close()
            stream.finish_events(events, end_time)
            events.append(('stream', None, {}, stream.uri, end_time))
        return events
|