get map stream -> instrument from servicemanager for feeder
This commit is contained in:
@ -25,9 +25,14 @@ def main(dbname=None):
|
|||||||
db = SEHistory(dbname, access='write')
|
db = SEHistory(dbname, access='write')
|
||||||
db.enable_write_access()
|
db.enable_write_access()
|
||||||
|
|
||||||
|
host = socket.gethostname().split('.')[0]
|
||||||
fm = FrappyManager()
|
fm = FrappyManager()
|
||||||
fm.get_info()
|
fm.get_info()
|
||||||
host = socket.gethostname().split('.')[0]
|
# create map to get instrument from internal stream uri
|
||||||
|
insmap = db.instrument_by_stream
|
||||||
|
for ins, ports in fm.info.items():
|
||||||
|
for p in ports.values():
|
||||||
|
insmap[f'{host}:{p}'] = ins
|
||||||
cfginfo = {}
|
cfginfo = {}
|
||||||
for ins, procs in fm.get_procs(cfginfo=cfginfo).items():
|
for ins, procs in fm.get_procs(cfginfo=cfginfo).items():
|
||||||
for service in procs:
|
for service in procs:
|
||||||
|
@ -35,6 +35,7 @@ class SEHistory(InfluxDBWrapper):
|
|||||||
parser.optionxform = str
|
parser.optionxform = str
|
||||||
parser.read([Path('~/.config/sehistory').expanduser()])
|
parser.read([Path('~/.config/sehistory').expanduser()])
|
||||||
section = parser[dbname] if dbname else parser[parser.sections()[0]]
|
section = parser[dbname] if dbname else parser[parser.sections()[0]]
|
||||||
|
self.instrument_by_stream = {}
|
||||||
super().__init__(*(section[k] for k in ('uri', 'token', 'org', 'bucket')), access=access)
|
super().__init__(*(section[k] for k in ('uri', 'token', 'org', 'bucket')), access=access)
|
||||||
|
|
||||||
def curves(self, start=None, stop=None, measurement=('*.value', '*.target'), field='float',
|
def curves(self, start=None, stop=None, measurement=('*.value', '*.target'), field='float',
|
||||||
@ -407,4 +408,6 @@ class SEHistory(InfluxDBWrapper):
|
|||||||
self.flush()
|
self.flush()
|
||||||
|
|
||||||
def add_stream(self, value, tags, key, ts):
|
def add_stream(self, value, tags, key, ts):
|
||||||
|
if value == '': # unknown instrument
|
||||||
|
value = self.instrument or self.instrument_by_stream.get(key, '0')
|
||||||
self.set_instrument(key, value, ts, **tags)
|
self.set_instrument(key, value, ts, **tags)
|
||||||
|
@ -254,9 +254,8 @@ class EventStream:
|
|||||||
# note: a stream with buffered content might not be ready to emit any event, because
|
# note: a stream with buffered content might not be ready to emit any event, because
|
||||||
# of filtering
|
# of filtering
|
||||||
|
|
||||||
def __init__(self, *udp, instrument=None, **streams):
|
def __init__(self, *udp, **streams):
|
||||||
self.streams = streams
|
self.streams = streams
|
||||||
self.instrument = instrument
|
|
||||||
self.udp = {v.socket.fileno(): v for v in udp}
|
self.udp = {v.socket.fileno(): v for v in udp}
|
||||||
|
|
||||||
def wait_ready(self, timeout):
|
def wait_ready(self, timeout):
|
||||||
|
Reference in New Issue
Block a user