import socket
import time
import re
import logging
from select import select
from normalizeuri import normalizeuri


class StreamDead(Exception):
    """Raised when a stream is dead and has been given up."""


def parse_uri(uri):
    """Split a stream URI into a (host, port) tuple.

    Accepted forms are ``tcp://host:port``, ``host:port`` and ``:port``;
    the scheme defaults to 'tcp' and the host to 'localhost'.  A bracketed
    IPv6 literal (``tcp://[::1]:port``) is returned without its brackets.

    :raises ValueError: for an unsupported scheme or a non-integer port
    """
    scheme, _, hostport = uri.rpartition('://')
    scheme = scheme or 'tcp'
    if scheme != 'tcp':
        raise ValueError(f'scheme {scheme!r} not supported')
    host, _, port = hostport.rpartition(':')
    if host.startswith('[') and host.endswith(']'):
        # IPv6 literal: socket.create_connection expects the bare address
        host = host[1:-1]
    host = host or 'localhost'
    return host, int(port)


NETBUFSIZE = 4092
INF = float('inf')


class Base:
    """Common base of the objects registered for select().

    ``select_read`` / ``select_write`` map file descriptors to the owning
    object.  They are class attributes on purpose: the same dicts are shared
    by all subclasses and instances, and EventStream.wait_ready selects on them.
    """
    select_read = {}
    select_write = {}

    def close(self):
        logging.info('CLOSE BASE')

    def finish_events(self, *args):
        logging.info('FINISH BASE')


class Stream(Base):
    """Line-oriented TCP stream with automatic reconnection and ping supervision.

    Subclasses implement init(), ping() and event_generator().
    """
    _last_time = None
    # time of the last sign of life; INF means "not recorded yet", in which
    # case is_offline() uses the current time as the time of death
    _last_live = INF
    dead = False  # False while alive, otherwise the time of death
    max_offline = 20  # seconds of failed reconnects before the stream is declared dead
    ping_timeout = 3  # seconds of silence before a ping, and before giving up after it
    _next_ping = INF  # time of the next ping check; INF: no line received yet
    _ping_deadline = 0
    _deadline = INF
    _next_connect = 0
    _pinged = False  # True while a ping is outstanding (no line received since)

    def __init__(self, uri, name=None, timeout=5, encoding='latin-1', **kwds):
        """Connect to *uri* and run the subclass init(**kwds).

        :param uri: stream URI, see parse_uri()
        :param name: display name, defaults to *uri*
        :param timeout: socket timeout in seconds
        :param encoding: encoding used for sent and received lines
        :raises: whatever connect() or init() raise (the socket is closed first)
        """
        self.name = name or uri
        self.uri = uri
        self.tags = {}
        self.streamname = self.uri
        self.encoding = encoding
        self.timeout = timeout
        self.socket = None
        self.cache = {}  # key -> last 'value' or 'error' event seen for that key
        self.errors = {}
        self.start_time = time.time()
        self.next_hour = (self.start_time // 3600 + 1) * 3600
        self.generator = self.event_generator()
        try:
            self.connect()
            self.init(**kwds)
        except Exception as e:
            logging.info('failed connecting %s %r', self.uri, e)
            # do not leave a half-initialized stream registered for select()
            self.close()
            raise

    def connect(self):
        """Open the TCP connection, register it for select() and reset state."""
        self.socket = socket.create_connection(parse_uri(self.uri))
        self.select_read[self.socket.fileno()] = self
        self.settimeout(self.timeout)
        self.uri = self.tags['stream'] = f'{normalizeuri(self.uri)}'
        logging.info('connected %s', self.uri)
        self._buffer = []
        self._deadline = INF
        self._next_connect = 0
        self._pinged = False
        # a stale ping schedule from a previous connection would make
        # get_lines() drop the fresh connection immediately with 'no pong'
        self._next_ping = INF

    def init(self, **kwds):
        """Initialize the protocol after connecting (subclass hook)."""
        raise NotImplementedError

    def ping(self):
        """Send something that makes the peer answer with a line (subclass hook)."""
        raise NotImplementedError

    def pong(self):
        self._alive = True

    def close(self):
        """Do our best to close a socket."""
        if self.socket is None:
            return
        self.select_read.pop(self.socket.fileno(), None)
        logging.info('close socket %s', self.uri)
        try:
            self.socket.shutdown(socket.SHUT_RDWR)
        except socket.error:
            pass
        try:
            self.socket.close()
        except socket.error:
            pass
        self.socket = None

    def is_offline(self):
        """Try to reconnect when the socket is closed.

        Reconnects are attempted at most every 0.5 s.  When they keep failing
        for more than max_offline seconds, self.dead is set to the time of death.

        :return: True when there is no usable connection, False otherwise
        """
        if self.socket is not None:
            return False
        now = time.time()
        if now < self._next_connect:
            return True
        try:
            self.connect()
            self.init()
        except Exception as e:
            # init() may fail after connect() succeeded: close the socket, so
            # that the next call retries instead of treating it as usable
            self.close()
            if now > self._deadline:
                self.dead = min(now, self._last_live + 1)
                return True
            if self._deadline == INF:
                logging.info('error %r connecting to %s retrying for %s sec',
                             e, self.uri, self.max_offline)
                self._deadline = now + self.max_offline
            self._next_connect = now + 0.5
            return True
        return False

    def settimeout(self, timeout):
        """Set the socket timeout (also remembered for reconnects)."""
        self.timeout = timeout
        if self.socket:
            self.socket.settimeout(timeout)

    def notimeout(self):
        """Make the current socket non-blocking without changing self.timeout."""
        if self.socket:
            self.socket.settimeout(0)

    def send(self, line):
        """Send *line* followed by a newline.

        On failure the connection is closed and the exception is re-raised
        with its first argument prefixed by 'send:'.
        """
        try:
            self.socket.sendall(line.encode(self.encoding))
            self.socket.sendall(b'\n')
        except Exception as e:
            self.close()
            # args[0] is not always a str (OSError: errno int) and args may be empty
            first = e.args[0] if e.args else ''
            e.args = (f'send:{first}',) + e.args[1:]
            raise

    def get_lines(self):
        """Generator returning received lines as decoded str (newline stripped).

        Reconnects if needed.  After ping_timeout seconds without a complete
        line, ping() is called; if still no line arrives within another
        ping_timeout seconds, the connection is closed.  An incomplete trailing
        line is kept in self._buffer for the next call.
        """
        if self.is_offline():
            return
        now = time.time()
        if now > self._next_ping:
            if self._pinged:
                logging.info('no pong from %s', self.uri)
                self.close()
                return
            self.ping()
            self._pinged = True
            self._last_live = self._next_ping - self.ping_timeout
            self._next_ping = self._ping_deadline = now + self.ping_timeout
        buffer = self._buffer
        # the consumer may close the stream while this generator is suspended
        while self.socket is not None:
            try:
                received = self.socket.recv(NETBUFSIZE)
                if not received:
                    raise ConnectionAbortedError('connection closed by other end')
            except (socket.timeout, TimeoutError, BlockingIOError):
                # socket.timeout is listed for Python < 3.10, where it is
                # not yet an alias of TimeoutError
                break
            except Exception:
                self._last_live = now
                self.close()
                raise
            parts = received.split(b'\n')
            if len(parts) == 1:
                buffer.append(received)
            else:
                # at least one complete line: the peer is alive
                self._pinged = False
                self._next_ping = now + self.ping_timeout
                buffer.append(parts[0])
                parts[0] = b''.join(buffer)
                buffer[:] = [parts.pop()]
                for line in parts:
                    yield line.decode(self.encoding)
            if len(received) < NETBUFSIZE and self.timeout == 0:
                break

    def event_generator(self):
        """A generator returning events.

        events are (<kind>, <value>, <key>, <tags>, <timestamp>)
        kind is one of 'error', 'value', 'stream'
        """
        raise NotImplementedError

    def get_tags(self, key):
        """Get tags for key."""
        raise NotImplementedError

    def finish_events(self, events, end_time):
        """Append closing events for every cached key and clear the cache.

        For each key a ('value', None, ...) and an ('error', 'END', ...) event,
        both stamped with *end_time*, are appended to *events*.
        """
        for key in list(self.cache):
            self.cache.pop(key)
            events.append(('value', None, key, self.tags, end_time))
            events.append(('error', 'END', key, self.tags, end_time))

    def get_events(self, events, maxevents):
        """Get available events.

        Timestamps are clamped to [start_time, now].  When a full hour is
        crossed, all cached events are repeated with the hour as timestamp.
        Events whose (kind, value) equals the cached one for the key are dropped.

        :param events: a list the events are appended to
        :param maxevents: hint for max number of events to be joined;
            there might be more after a full hour or when the stream is dying
        :return: True when maxevents is reached, False when the generator
            is exhausted (it is then recreated)
        :raises StreamDead: when the stream is dead (closing events are appended first)
        """
        for event in self.generator:
            kind, value, key, tags, ts = event
            timestamp = max(self.start_time, min(ts or INF, time.time()))
            if timestamp != ts:
                event = event[:-1] + (timestamp,)
            if timestamp >= self.next_hour:
                t = (timestamp // 3600) * 3600
                events.extend(e[:-1] + (t,) for e in self.cache.values())
                self.next_hour = t + 3600
            prev = self.cache[key][:2] if key in self.cache else (None, None)
            if (kind, value) != prev:
                if kind == 'error':
                    if prev[0] == 'value':
                        # mark the last value as invalid before reporting the error
                        events.append(('value', None, key, tags, timestamp))
                    self.cache[key] = event
                elif kind == 'value':
                    self.cache[key] = event
                events.append(event)
                if len(events) >= maxevents:
                    return True
        else:
            if self.dead:
                self.finish_events(events, self.dead)
                raise StreamDead()
            self.generator = self.event_generator()
        return False


class EventStream:
    """Collects events from a set of streams, adding streams announced via UDP."""

    return_on_wait = False
    # return_on_wait = True: stop generator when no more streams have buffered content
    # note: a stream with buffered content might not be ready to emit any event,
    # because of filtering

    def __init__(self, *udp, **streams):
        self.streams = streams
        self.udp = {v.socket.fileno(): v for v in udp}

    def wait_ready(self, timeout):
        """Return the objects whose sockets are ready, waiting at most *timeout* s."""
        rd, wr = select(Stream.select_read, Stream.select_write, [], timeout)[0:2]
        return [Stream.select_read[f] for f in rd] + [Stream.select_write[f] for f in wr]

    def get_events(self, maxevents=20):
        """Return events from all streams.

        Waits until at least one event is available.  Objects that become ready
        and are not a Stream are treated as UDP announcers: announced streams
        are created (or get their tags updated) and a 'stream' event is emitted.
        A dead stream is removed and reported with a ('stream', None, ...) event.

        :param maxevents: hint for max number of events to be joined;
            there might be more after a full hour or when the stream is dying
        :return: list of events
        """
        events = []
        while True:
            for ready in self.wait_ready(1):
                if isinstance(ready, Stream):
                    continue
                # ready is a UdpStream
                for streamcls, uri, kwargs in ready.events():
                    stream = self.streams.get(uri)
                    if stream:
                        stream.tags.update(kwargs)
                        logging.info('update stream %s %r', uri, kwargs)
                    else:
                        try:
                            self.streams[uri] = stream = streamcls(uri, **kwargs)
                            logging.info('added stream %s %r', uri, kwargs)
                        except Exception as e:
                            logging.warning('can not connect to %s %r %r', uri, e, streamcls)
                            continue
                    device = stream.tags.get('device')
                    events.append(('stream', kwargs.get('instrument', ''),
                                   {'device': device}, stream.uri, int(time.time())))
            # iterate over a snapshot: dead streams are removed from the dict
            for name, stream in list(self.streams.items()):
                try:
                    if stream.get_events(events, maxevents):
                        return events
                except StreamDead:
                    # indicate stream is removed
                    events.append(('stream', None, {}, stream.uri, int(time.time())))
                    self.streams.pop(name)
                    return events
            if events:
                return events

    def finish(self):
        """Close all streams and return their closing events."""
        events = []
        end_time = time.time()
        for stream in self.streams.values():
            stream.close()
            stream.finish_events(events, end_time)
            events.append(('stream', None, {}, stream.uri, end_time))
        return events