diff --git a/feeder.py b/feeder.py index 33b3857..eae3081 100644 --- a/feeder.py +++ b/feeder.py @@ -1,8 +1,10 @@ import sys +import socket from streams import EventStream from nicoscache import NicosStream -from secop import ScanStream, ScanReply, send_fake_udp +from secop import ScanStream, ScanReply, TrySecopConnect, send_fake_udp from seinflux import SEHistory +from servicemanager import FrappyManager def main(): @@ -11,6 +13,18 @@ def main(): db = SEHistory(access='write') db.enable_write_access() + fm = FrappyManager() + fm.get_info() + host = socket.gethostname().split('.')[0] + for ins, procs in fm.get_procs().items(): + for service in procs: + if service in procs: + port = fm.info.get(ins, {}).get(service, {}) + if port: + uri = f'{host}:{port}' + print('CREATE', uri) + TrySecopConnect(uri) + event_map = {'value': db.add_float, 'error': db.add_error, 'stream': db.add_stream} try: diff --git a/secop.py b/secop.py index f84a474..d4d852f 100644 --- a/secop.py +++ b/secop.py @@ -4,7 +4,7 @@ import json import time import socket from select import select -from streams import Stream, Base +from streams import Stream, Base, StreamDead IDN = re.compile('.*ISSE.*,SEC[oO]P,') DESCRIBING = re.compile(r'describing \S* (.*)$') @@ -50,8 +50,8 @@ class SecopStream(Stream): raise ValueError('missing describing message') self.descr = json.loads(match.group(1)) self.device = device or self.descr['equipment_id'] - if self.device.endswith('psi.ch'): - self.device[-6:] = [] + if self.device.endswith('.psi.ch'): + self.device = self.device[:-7] self.tags['device'] = self.device self.modules = self.descr['modules'] self.convert = {} @@ -144,7 +144,7 @@ class ScanReply(UdpStream): except OSError as e: print('could not send the broadcast:', e) self.socket = sock - self.select_dict[sock.fileno()] = self + self.select_read[sock.fileno()] = self class ScanStream(UdpStream): @@ -158,7 +158,49 @@ class ScanStream(UdpStream): sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) sock.bind(('0.0.0.0', SECOP_UDP_PORT)) self.socket = sock - self.select_dict[sock.fileno()] = self + self.select_read[sock.fileno()] = self + + +class TrySecopConnect(Base): + def __init__(self, uri): + self.uri = uri + host, port = self.uri.split(':') + sock = socket.socket() + sock.setblocking(False) + self.socket = sock + self.fno = sock.fileno() + self.select_write[self.fno] = self + try: + sock.connect((host, int(port))) + except BlockingIOError: + pass + self.idn = b'' + + def events(self): + if self.select_write.pop(self.fno, None): + try: + self.socket.sendall(b'*IDN?\n') + self.idn_sent = True + print('SEND IDN', self.uri) + self.select_read[self.fno] = self + return + except Exception as e: + print('NO CONN TO', self.uri) + print(e) + else: + reply = b'' + try: + chunk = self.socket.recv(99) + if chunk: + self.idn += chunk + if b'SECoP' in self.idn: + print('CONN TO', self.uri) + yield SecopStream, self.uri, {'stream': self.uri} + if b'\n' not in self.idn: + return + except Exception as e: + print(e) + self.select_read.pop(self.fno) def send_fake_udp(uri, device=None, instrument=None): diff --git a/seinflux.py b/seinflux.py index fdabf27..859fdbf 100644 --- a/seinflux.py +++ b/seinflux.py @@ -175,7 +175,8 @@ class SEHistory(InfluxDBWrapper): def add_error(self, value, key, tags, ts): self._add_point('.'.join(key), 'error', '' if value is None else str(value), ts, tags) - def get_streams(self, instrument=None, stream=None, device=None, start=None, end=None, **tags): + def get_streams(self, instrument=None, stream=None, device=None, + start=None, end=None, include_finished=False, **tags): """get streams for one or all instruments :param instrument: None when looking for all instruments @@ -183,6 +184,7 @@ class SEHistory(InfluxDBWrapper): :param device: None or a comma separated string :param start: None or start time. None means 'since ever' :param end: None or end time. None means now (or more precise 'in a year') + :param include_finished: whether finished streams are to be included (False by default) :return: dict of tags Remark: an assignment of an instrument to a stream persists, even when the stream gets off @@ -232,7 +234,7 @@ class SEHistory(InfluxDBWrapper): if ts < current[0] + 60: # probably not containing real data current = None - if current: + if current or include_finished: tags = current[1] ins = tags.get('instrument') if ins == '0': @@ -280,7 +282,6 @@ class SEHistory(InfluxDBWrapper): stream, instrument, device = [table.tags.get(k, '') for k in ('stream', 'instrument', 'device')] elist = by_stream.setdefault(stream, []) for row in table: - print('S', stream, instrument, device, row[1], fmtime(row[0])) elist.append([row[0], row[1], stream, device, instrument]) by_key = {} for stream, rows in by_stream.items(): diff --git a/streams.py b/streams.py index 3a37f88..dd00bd3 100644 --- a/streams.py +++ b/streams.py @@ -23,7 +23,14 @@ INF = float('inf') class Base: - select_dict = {} + select_read = {} + select_write = {} + + def close(self): + print('CLOSE BASE') + + def finish_events(self, *args): + print('FINISH BASE') def short_hostname(host): @@ -67,12 +74,12 @@ class Stream(Base): self.connect() self.init(**kwds) except Exception as e: - print(self.uri, repr(e)) + print('I', self.uri, repr(e)) raise def connect(self): self.socket = socket.create_connection(parse_uri(self.uri)) - self.select_dict[self.socket.fileno()] = self + self.select_read[self.socket.fileno()] = self self.settimeout(self.timeout) host, _, port = self.uri.partition(':') # try to convert uri to host name @@ -96,7 +103,7 @@ class Stream(Base): """Do our best to close a socket.""" if self.socket is None: return - self.select_dict.pop(self.socket.fileno(), None) + self.select_read.pop(self.socket.fileno(), None) print(self.uri, 'close socket') try: self.socket.shutdown(socket.SHUT_RDWR) @@ -253,8 +260,8 @@ class EventStream: self.udp = {v.socket.fileno(): v for v in udp} def wait_ready(self, timeout): - ready = select(Stream.select_dict, [], [], timeout)[0] - return [Stream.select_dict[f] for f in ready] + rd, wr = select(Stream.select_read, Stream.select_write, [], timeout)[0:2] + return [Stream.select_read[f] for f in rd] + [Stream.select_write[f] for f in wr] def get_events(self, maxevents=20): """return events from all streams @@ -278,7 +285,7 @@ class EventStream: self.streams[uri] = stream = streamcls(uri, **kwargs) print('added stream', uri, kwargs) except Exception as e: - print('can not connect to', uri, repr(e)) + print('can not connect to', uri, repr(e), streamcls) continue device = stream.tags.get('device') events.append(('stream', kwargs.get('instrument', '0'),