import socket
import time
import re
from select import select


class StreamDead(Exception):
    """raised when stream is dead"""


def parse_uri(uri):
    """Split ``[tcp://][host]:port`` into a ``(host, port)`` tuple.

    A missing scheme defaults to ``tcp`` and a missing host to ``localhost``.

    :param uri: the uri string
    :return: ``(host, port)`` with port converted to int
    :raises ValueError: when the scheme is not ``tcp`` or the port is not an integer
    """
    scheme, _, hostport = uri.rpartition('://')
    scheme = scheme or 'tcp'
    if scheme != 'tcp':
        raise ValueError(f'scheme {scheme!r} not supported')
    host, _, port = hostport.rpartition(':')
    host = host or 'localhost'
    return host, int(port)


# maximum number of bytes requested per recv() call
NETBUFSIZE = 4092
INF = float('inf')


class Base:
    # registry shared by all subclasses: socket fileno -> owning object
    # (filled in Stream.connect, used for select() in EventStream.wait_ready)
    select_dict = {}


def short_hostname(host):
    """psi/lin/se special - treat case where -129129xxxx is appended

    :param host: host name or address, resolved with socket.gethostbyaddr
    :return: the local host name when host resolves to 'localhost', a shortened
        name when the resolved name matches the psi.ch pattern, else the
        resolved name unchanged
    """
    host = socket.gethostbyaddr(host)[0]
    if host == 'localhost':
        host = socket.gethostname()
    else:
        # NOTE(review): '[~.]*' looks like a typo for '[^.]*' and the dots in
        # '.psi.ch' are unescaped - left untouched, as a change would alter the
        # produced stream tags; confirm the intended pattern
        match = re.match(r'([^.-]+)(?:-129129\d{6}|(-[~.]*|)).psi.ch', host)
        if match:
            host = match.group(1) + (match.group(2) or '')
    return host


class Stream(Base):
    """line oriented tcp stream producing events

    Subclasses have to implement init, ping, event_generator and get_tags.
    """
    _last_time = None
    _last_live = None  # time the connection was last known to be alive
    dead = False  # falsy while usable, else the time the stream was declared dead
    max_offline = 20  # seconds to keep retrying before the stream is declared dead
    ping_timeout = 3  # seconds of silence before a ping / seconds to wait for the reply
    _next_ping = INF
    _ping_deadline = 0
    _deadline = INF
    _next_connect = 0

    def __init__(self, uri, name=None, timeout=5, encoding='latin-1', **kwds):
        """connect to uri and initialize the stream

        :param uri: ``[tcp://][host]:port``
        :param name: a name for the stream (default: uri)
        :param timeout: socket timeout in seconds
        :param encoding: encoding used for sent and received lines
        :param kwds: passed on to self.init

        any exception raised by connect or init is printed and re-raised
        """
        self.name = name or uri
        self.uri = uri
        self.tags = {}
        self.streamname = self.uri
        self.encoding = encoding
        self.timeout = timeout
        self.socket = None
        self.cache = {}  # dict <key> of event
        self.errors = {}
        self.start_time = time.time()
        self.next_hour = (self.start_time // 3600 + 1) * 3600
        self.generator = self.event_generator()
        try:
            self.connect()
            self.init(**kwds)
        except Exception as e:
            print(self.uri, repr(e))
            raise

    def connect(self):
        """open the socket, register it for select and reset the connection state

        self.uri and self.tags['stream'] are replaced by '<short host name>:<port>'
        """
        uri = self.uri
        # parse once, so that uris with a scheme prefix are treated the same
        # way for connecting and for building the stream tag
        host, port = parse_uri(uri)
        self.socket = socket.create_connection((host, port))
        self.select_dict[self.socket.fileno()] = self
        self.settimeout(self.timeout)
        # try to convert uri to host name
        host = short_hostname(host)
        self.uri = self.tags['stream'] = f'{host}:{port}'
        print(uri, '=', self.uri, 'connected')
        self._buffer = []
        self._deadline = INF
        self._next_connect = 0
        self._pinged = False

    def init(self):
        """protocol specific initialisation, called after connect"""
        raise NotImplementedError

    def ping(self):
        """send a request which makes the other end reply"""
        raise NotImplementedError

    def pong(self):
        """mark the stream as alive"""
        self._alive = True

    def close(self):
        """Do our best to close a socket."""
        if self.socket is None:
            return
        self.select_dict.pop(self.socket.fileno(), None)
        print(self.uri, 'close socket')
        try:
            self.socket.shutdown(socket.SHUT_RDWR)
        except socket.error:
            pass
        try:
            self.socket.close()
        except socket.error:
            pass
        self.socket = None

    def is_offline(self):
        """try restart if needed

        :return: True when the stream is (still) not connected

        Reconnecting is tried at most every 0.5 sec. When it keeps failing for
        more than max_offline seconds, self.dead is set to a timestamp.
        """
        if self.socket is None:
            now = time.time()
            if now < self._next_connect:
                return True
            try:
                self.connect()
                self.init()
            except Exception as e:
                if now > self._deadline:
                    self.close()
                    # _last_live is missing when the socket was closed without
                    # a preceding ping or receive error (e.g. after a failing send)
                    last_live = self._last_live
                    self.dead = now if last_live is None else min(now, last_live + 1)
                    return True
                if self._deadline == INF:
                    print(f'error "{e}" connecting to {self.uri} retrying for {self.max_offline} sec')
                    self._deadline = now + self.max_offline
                else:
                    print('.', end='', flush=True)
                self._next_connect = now + 0.5
                return True
        return False

    def settimeout(self, timeout):
        """remember timeout and apply it to the socket, if any"""
        self.timeout = timeout
        if self.socket:
            self.socket.settimeout(timeout)

    def notimeout(self):
        """make the socket non blocking (self.timeout is left untouched)"""
        if self.socket:
            self.socket.settimeout(0)

    def send(self, line):
        """send a line, followed by a newline

        :param line: the line (str) without line end

        on failure the socket is closed and the exception is re-raised with
        its message prefixed by 'send:'
        """
        try:
            self.socket.sendall(line.encode(self.encoding))
            self.socket.sendall(b'\n')
        except Exception as e:
            self.close()
            # args[0] is not always a string (e.g. the errno of an OSError),
            # and args may be empty: fall back to str(e) in these cases
            if e.args and isinstance(e.args[0], str):
                detail = e.args[0]
            else:
                detail = str(e)
            e.args = ('send:' + detail,)
            raise

    def get_lines(self):
        """generator returning the received lines, decoded to str

        Nothing is returned while the stream is offline. When no complete line
        arrived for ping_timeout seconds, a ping is sent; when there is still
        no line after another ping_timeout seconds, the socket is closed.
        """
        if self.is_offline():
            return
        now = time.time()
        if now > self._next_ping:
            if self._next_ping == self._ping_deadline:
                # a ping was sent, but nothing arrived before the deadline
                print(self.uri, 'no pong')
                self.close()
                return
            self.ping()
            self._last_live = self._next_ping - self.ping_timeout
            self._next_ping = self._ping_deadline = now + self.ping_timeout
        buffer = self._buffer
        while 1:
            try:
                received = self.socket.recv(NETBUFSIZE)
                if not received:
                    raise ConnectionAbortedError('connection closed by other end')
            except (TimeoutError, socket.timeout, BlockingIOError):
                # socket.timeout is an alias of TimeoutError from python 3.10 on only
                break
            except Exception:
                self._last_live = now
                self.close()
                raise
            splitted = received.split(b'\n')
            if len(splitted) == 1:
                # no line end yet: keep the fragment
                buffer.append(received)
            else:
                self._next_ping = now + self.ping_timeout
                # prepend buffered fragments to the first line and
                # keep the (possibly empty) incomplete last line
                buffer.append(splitted[0])
                splitted[0] = b''.join(buffer)
                buffer[:] = [splitted.pop()]
                for line in splitted:
                    yield line.decode(self.encoding)
            if len(received) < NETBUFSIZE and self.timeout == 0:
                break

    def event_generator(self):
        """a generator returning events

        events are (<kind>, <value>, <key>, <tags>, <timestamp>)
        kind is one of 'error', 'value', 'stream'
        """
        raise NotImplementedError

    def get_tags(self, key):
        """get tags for key"""
        raise NotImplementedError

    def finish_events(self, events, end_time):
        """clear the cache and append closing events for all cached keys

        :param events: a list events to be appended to
        :param end_time: the timestamp used for the appended events
        """
        for key in list(self.cache):
            self.cache.pop(key)
            events.append(('value', None, key, self.tags, end_time))
            events.append(('error', 'END', key, self.tags, end_time))

    def get_events(self, events, maxevents):
        """get available events

        :param events: a list events to be appended to
        :param maxevents: hint for max number of events to be joined
            there might be more after a full hour or when the stream is dying
        :return: True when maxevents is reached
        :raises StreamDead: when the generator is exhausted and the stream is dead
        """
        for event in self.generator:
            kind, value, key, tags, ts = event
            # restrict the timestamp to the range from start time to now
            timestamp = max(self.start_time, min(ts or INF, time.time()))
            if timestamp != ts:
                event = event[:-1] + (timestamp,)
            if timestamp >= self.next_hour:
                # repeat all cached events at the full hour
                t = (timestamp // 3600) * 3600
                events.extend(e[:-1] + (t,) for e in self.cache.values())
                self.next_hour = t + 3600
            prev = self.cache[key][:2] if key in self.cache else (None, None)
            if (kind, value) != prev:
                if kind == 'error':
                    if prev[0] == 'value':
                        # invalidate the previous value first
                        events.append(('value', None, key, tags, timestamp))
                    self.cache[key] = event
                elif kind == 'value':
                    self.cache[key] = event
                events.append(event)
                if len(events) >= maxevents:
                    return True
        else:
            if self.dead:
                self.finish_events(events, self.dead)
                raise StreamDead()
            # the generator is exhausted: create a fresh one for the next call
            self.generator = self.event_generator()
        return False


class EventStream:
    """collect the events of several streams"""
    return_on_wait = False
    # return_on_wait = True: stop generator when no more streams have buffered content
    # note: a stream with buffered content might not be ready to emit any event, because
    #       of filtering

    def __init__(self, *udp, instrument=None, **streams):
        """
        :param udp: objects with a socket attribute, stored by socket fileno
        :param instrument: stored as self.instrument
        :param streams: the initial streams by name
        """
        self.streams = streams
        self.instrument = instrument
        self.udp = {v.socket.fileno(): v for v in udp}

    def wait_ready(self, timeout):
        """wait up to timeout sec for registered sockets to become readable

        :return: the list of objects registered for the readable sockets
        """
        ready = select(Stream.select_dict, [], [], timeout)[0]
        return [Stream.select_dict[f] for f in ready]

    def get_events(self, maxevents=20):
        """return events from all streams

        :param maxevents: hint for max number of events to be joined
            there might be more after a full hour or when the stream is dying
        :return: list of events

        wait for at least one event
        """
        events = []
        while 1:
            for source in self.wait_ready(1):
                if isinstance(source, Stream):
                    continue
                # ready objects which are not streams announce streams
                for streamcls, uri, kwargs in source.events():
                    stream = self.streams.get(uri)
                    if stream:
                        stream.tags.update(kwargs)
                    else:
                        try:
                            self.streams[uri] = stream = streamcls(uri, **kwargs)
                            print('added stream', uri, kwargs)
                        except Exception as e:
                            print('can not connect to', uri, repr(e))
                            continue
                    device = stream.tags.get('device')
                    events.append(('stream', kwargs.get('instrument', '0'),
                                   {'device': device}, stream.uri, int(time.time())))
            # iterate over a snapshot, as dead streams are removed within the loop
            for name, stream in list(self.streams.items()):
                try:
                    if stream.get_events(events, maxevents):
                        return events
                except StreamDead:
                    # indicate stream is removed
                    events.append(('stream', None, {}, stream.uri, int(time.time())))
                    self.streams.pop(name)
            if events:
                return events

    def finish(self):
        """close all streams

        :return: the list of closing events of all streams
        """
        events = []
        end_time = time.time()
        for stream in self.streams.values():
            stream.close()
            stream.finish_events(events, end_time)
            events.append(('stream', None, {}, stream.uri, end_time))
        return events