feeder: use FrappyManager to connect to running frappy instances

This commit is contained in:
l_samenv
2025-03-03 14:13:11 +01:00
parent a5b5d1de8d
commit b5c770e7d8
4 changed files with 80 additions and 16 deletions

View File

@ -1,8 +1,10 @@
import sys import sys
import socket
from streams import EventStream from streams import EventStream
from nicoscache import NicosStream from nicoscache import NicosStream
from secop import ScanStream, ScanReply, send_fake_udp from secop import ScanStream, ScanReply, TrySecopConnect, send_fake_udp
from seinflux import SEHistory from seinflux import SEHistory
from servicemanager import FrappyManager
def main(): def main():
@ -11,6 +13,18 @@ def main():
db = SEHistory(access='write') db = SEHistory(access='write')
db.enable_write_access() db.enable_write_access()
fm = FrappyManager()
fm.get_info()
host = socket.gethostname().split('.')[0]
for ins, procs in fm.get_procs().items():
for service in procs:
if service in procs:
port = fm.info.get(ins, {}).get(service, {})
if port:
uri = f'{host}:{port}'
print('CREATE', uri)
TrySecopConnect(uri)
event_map = {'value': db.add_float, 'error': db.add_error, 'stream': db.add_stream} event_map = {'value': db.add_float, 'error': db.add_error, 'stream': db.add_stream}
try: try:

View File

@ -4,7 +4,7 @@ import json
import time import time
import socket import socket
from select import select from select import select
from streams import Stream, Base from streams import Stream, Base, StreamDead
IDN = re.compile('.*ISSE.*,SEC[oO]P,') IDN = re.compile('.*ISSE.*,SEC[oO]P,')
DESCRIBING = re.compile(r'describing \S* (.*)$') DESCRIBING = re.compile(r'describing \S* (.*)$')
@ -50,8 +50,8 @@ class SecopStream(Stream):
raise ValueError('missing describing message') raise ValueError('missing describing message')
self.descr = json.loads(match.group(1)) self.descr = json.loads(match.group(1))
self.device = device or self.descr['equipment_id'] self.device = device or self.descr['equipment_id']
if self.device.endswith('psi.ch'): if self.device.endswith('.psi.ch'):
self.device[-6:] = [] self.device = self.device[:-7]
self.tags['device'] = self.device self.tags['device'] = self.device
self.modules = self.descr['modules'] self.modules = self.descr['modules']
self.convert = {} self.convert = {}
@ -144,7 +144,7 @@ class ScanReply(UdpStream):
except OSError as e: except OSError as e:
print('could not send the broadcast:', e) print('could not send the broadcast:', e)
self.socket = sock self.socket = sock
self.select_dict[sock.fileno()] = self self.select_read[sock.fileno()] = self
class ScanStream(UdpStream): class ScanStream(UdpStream):
@ -158,7 +158,49 @@ class ScanStream(UdpStream):
sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
sock.bind(('0.0.0.0', SECOP_UDP_PORT)) sock.bind(('0.0.0.0', SECOP_UDP_PORT))
self.socket = sock self.socket = sock
self.select_dict[sock.fileno()] = self self.select_read[sock.fileno()] = self
class TrySecopConnect(Base):
def __init__(self, uri):
self.uri = uri
host, port = self.uri.split(':')
sock = socket.socket()
sock.setblocking(False)
self.socket = sock
self.fno = sock.fileno()
self.select_write[self.fno] = self
try:
sock.connect((host, int(port)))
except BlockingIOError:
pass
self.idn = b''
def events(self):
if self.select_write.pop(self.fno, None):
try:
self.socket.sendall(b'*IDN?\n')
self.idn_sent = True
print('SEND IDN', self.uri)
self.select_read[self.fno] = self
return
except Exception as e:
print('NO CONN TO', self.uri)
print(e)
else:
reply = b''
try:
chunk = self.socket.recv(99)
if chunk:
self.idn += chunk
if b'SECoP' in self.idn:
print('CONN TO', self.uri)
yield SecopStream, self.uri, {'stream': self.uri}
if b'\n' not in self.idn:
return
except Exception as e:
print(e)
self.select_read.pop(self.fno)
def send_fake_udp(uri, device=None, instrument=None): def send_fake_udp(uri, device=None, instrument=None):

View File

@ -175,7 +175,8 @@ class SEHistory(InfluxDBWrapper):
def add_error(self, value, key, tags, ts): def add_error(self, value, key, tags, ts):
self._add_point('.'.join(key), 'error', '' if value is None else str(value), ts, tags) self._add_point('.'.join(key), 'error', '' if value is None else str(value), ts, tags)
def get_streams(self, instrument=None, stream=None, device=None, start=None, end=None, **tags): def get_streams(self, instrument=None, stream=None, device=None,
start=None, end=None, include_finished=False, **tags):
"""get streams for one or all instruments """get streams for one or all instruments
:param instrument: None when looking for all instruments :param instrument: None when looking for all instruments
@ -183,6 +184,7 @@ class SEHistory(InfluxDBWrapper):
:param device: None or a comma separated string :param device: None or a comma separated string
:param start: None or start time. None means 'since ever' :param start: None or start time. None means 'since ever'
:param end: None or end time. None means now (or more precise 'in a year') :param end: None or end time. None means now (or more precise 'in a year')
:param include_finished: whether finished streams are to be included (False by default)
:return: dict <stream> of tags :return: dict <stream> of tags
Remark: an assignment of an instrument to a stream persists, even when the stream gets off Remark: an assignment of an instrument to a stream persists, even when the stream gets off
@ -232,7 +234,7 @@ class SEHistory(InfluxDBWrapper):
if ts < current[0] + 60: if ts < current[0] + 60:
# probably not containing real data # probably not containing real data
current = None current = None
if current: if current or include_finished:
tags = current[1] tags = current[1]
ins = tags.get('instrument') ins = tags.get('instrument')
if ins == '0': if ins == '0':
@ -280,7 +282,6 @@ class SEHistory(InfluxDBWrapper):
stream, instrument, device = [table.tags.get(k, '') for k in ('stream', 'instrument', 'device')] stream, instrument, device = [table.tags.get(k, '') for k in ('stream', 'instrument', 'device')]
elist = by_stream.setdefault(stream, []) elist = by_stream.setdefault(stream, [])
for row in table: for row in table:
print('S', stream, instrument, device, row[1], fmtime(row[0]))
elist.append([row[0], row[1], stream, device, instrument]) elist.append([row[0], row[1], stream, device, instrument])
by_key = {} by_key = {}
for stream, rows in by_stream.items(): for stream, rows in by_stream.items():

View File

@ -23,7 +23,14 @@ INF = float('inf')
class Base: class Base:
select_dict = {} select_read = {}
select_write = {}
def close(self):
print('CLOSE BASE')
def finish_events(self, *args):
print('FINISH BASE')
def short_hostname(host): def short_hostname(host):
@ -67,12 +74,12 @@ class Stream(Base):
self.connect() self.connect()
self.init(**kwds) self.init(**kwds)
except Exception as e: except Exception as e:
print(self.uri, repr(e)) print('I', self.uri, repr(e))
raise raise
def connect(self): def connect(self):
self.socket = socket.create_connection(parse_uri(self.uri)) self.socket = socket.create_connection(parse_uri(self.uri))
self.select_dict[self.socket.fileno()] = self self.select_read[self.socket.fileno()] = self
self.settimeout(self.timeout) self.settimeout(self.timeout)
host, _, port = self.uri.partition(':') host, _, port = self.uri.partition(':')
# try to convert uri to host name # try to convert uri to host name
@ -96,7 +103,7 @@ class Stream(Base):
"""Do our best to close a socket.""" """Do our best to close a socket."""
if self.socket is None: if self.socket is None:
return return
self.select_dict.pop(self.socket.fileno(), None) self.select_read.pop(self.socket.fileno(), None)
print(self.uri, 'close socket') print(self.uri, 'close socket')
try: try:
self.socket.shutdown(socket.SHUT_RDWR) self.socket.shutdown(socket.SHUT_RDWR)
@ -253,8 +260,8 @@ class EventStream:
self.udp = {v.socket.fileno(): v for v in udp} self.udp = {v.socket.fileno(): v for v in udp}
def wait_ready(self, timeout): def wait_ready(self, timeout):
ready = select(Stream.select_dict, [], [], timeout)[0] rd, wr = select(Stream.select_read, Stream.select_write, [], timeout)[0:2]
return [Stream.select_dict[f] for f in ready] return [Stream.select_read[f] for f in rd] + [Stream.select_write[f] for f in wr]
def get_events(self, maxevents=20): def get_events(self, maxevents=20):
"""return events from all streams """return events from all streams
@ -278,7 +285,7 @@ class EventStream:
self.streams[uri] = stream = streamcls(uri, **kwargs) self.streams[uri] = stream = streamcls(uri, **kwargs)
print('added stream', uri, kwargs) print('added stream', uri, kwargs)
except Exception as e: except Exception as e:
print('can not connect to', uri, repr(e)) print('can not connect to', uri, repr(e), streamcls)
continue continue
device = stream.tags.get('device') device = stream.tags.get('device')
events.append(('stream', kwargs.get('instrument', '0'), events.append(('stream', kwargs.get('instrument', '0'),