From 1ed579f1cbb610e2af4c49a7da65928514763fe0 Mon Sep 17 00:00:00 2001 From: Markus Zolliker Date: Mon, 5 May 2025 08:32:34 +0200 Subject: [PATCH] get map stream -> instrument from servicemanager for feeder --- feeder.py | 7 ++++++- seinflux.py | 3 +++ streams.py | 3 +-- 3 files changed, 10 insertions(+), 3 deletions(-) diff --git a/feeder.py b/feeder.py index c5a8935..c10974f 100644 --- a/feeder.py +++ b/feeder.py @@ -25,9 +25,14 @@ def main(dbname=None): db = SEHistory(dbname, access='write') db.enable_write_access() + host = socket.gethostname().split('.')[0] fm = FrappyManager() fm.get_info() - host = socket.gethostname().split('.')[0] + # create map to get instrument from internal stream uri + insmap = db.instrument_by_stream + for ins, ports in fm.info.items(): + for p in ports.values(): + insmap[f'{host}:{p}'] = ins cfginfo = {} for ins, procs in fm.get_procs(cfginfo=cfginfo).items(): for service in procs: diff --git a/seinflux.py b/seinflux.py index bef772a..478f631 100644 --- a/seinflux.py +++ b/seinflux.py @@ -35,6 +35,7 @@ class SEHistory(InfluxDBWrapper): parser.optionxform = str parser.read([Path('~/.config/sehistory').expanduser()]) section = parser[dbname] if dbname else parser[parser.sections()[0]] + self.instrument_by_stream = {} super().__init__(*(section[k] for k in ('uri', 'token', 'org', 'bucket')), access=access) def curves(self, start=None, stop=None, measurement=('*.value', '*.target'), field='float', @@ -407,4 +408,6 @@ class SEHistory(InfluxDBWrapper): self.flush() def add_stream(self, value, tags, key, ts): + if value == '': # unknown instrument + value = self.instrument or self.instrument_by_stream.get(key, '0') self.set_instrument(key, value, ts, **tags) diff --git a/streams.py b/streams.py index e3dc056..6af7d10 100644 --- a/streams.py +++ b/streams.py @@ -254,9 +254,8 @@ class EventStream: # note: a stream with buffered content might not be ready to emit any event, because # of filtering - def __init__(self, *udp, instrument=None, **streams): + def __init__(self, *udp, **streams): self.streams = streams - self.instrument = instrument self.udp = {v.socket.fileno(): v for v in udp} def wait_ready(self, timeout):