# Source listing: sehistory/feeder.py (82 lines, 2.3 KiB, Python)

import sys
import signal
import logging
from os.path import expanduser
sys.path.append(expanduser('~'))
import socket
from streams import EventStream
from nicoscache import NicosStream
from secop import ScanStream, ScanReply, TrySecopConnect, send_fake_udp
from seinflux import SEHistory
from servicemanager import FrappyManager
# Help text printed by the command-line dispatch at the bottom of this file
# when the arguments do not select an action.
USAGE = (
    "\n"
    "Usage:\n"
    "start server\n"
    "python feeder.py -d [<db instance>]\n"
    "add a SECoP connection:\n"
    "python feeder.py uri [device] [instrument]\n"
)

# Root logger setup: INFO and above go to 'logfile.log'; filemode 'w'
# truncates the file each time this module is loaded.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s %(levelname)s %(message)s',
    filename='logfile.log',
    filemode='w',
)
def main(dbname=None, access='write'):
    """Run the feeder: forward stream events to the history database.

    Steps, in order:

    1. Open the history database and enable write access.
    2. Record ``'<host>:<port>' -> instrument`` in the database's
       ``instrument_by_stream`` map for every port listed in
       ``FrappyManager.info``.
    3. For every (instrument, service) pair returned by
       ``FrappyManager.get_procs`` that has a port in ``FrappyManager.info``,
       call ``TrySecopConnect`` on its uri and register the uri's instrument
       with the database.
    4. Dispatch events from the event stream to the database, flushing after
       each batch, until SIGTERM is received.
    5. On the way out (also after an exception), dispatch the events returned
       by ``EventStream.finish()`` and disconnect from the database.

    :param dbname: database instance, passed unchanged to ``SEHistory``
    :param access: access mode, passed unchanged to ``SEHistory``
    """
    # alternative event source including a NICOS cache stream, kept for reference:
    # egen = EventStream(ScanReply(), ScanStream(), n=NicosStream('localhost:14002'))
    egen = EventStream(ScanReply(), ScanStream())
    db = SEHistory(dbname, access=access)
    db.enable_write_access()

    # short host name (without domain), used to build '<host>:<port>' uris
    host = socket.gethostname().split('.')[0]
    fm = FrappyManager()
    fm.get_info()

    # create map to get instrument from internal stream uri
    insmap = db.instrument_by_stream
    for ins, ports in fm.info.items():
        for port in ports.values():
            insmap[f'{host}:{port}'] = ins

    # NOTE(review): cfginfo is presumably filled in by get_procs with
    # (instrument, service) keys -- it is only read for the log line below.
    cfginfo = {}
    for ins, procs in fm.get_procs(cfginfo=cfginfo).items():
        known_ports = fm.info.get(ins, {})
        for service in procs:
            port = known_ports.get(service)
            if not port:
                # no port known for this service: nothing to connect to
                continue
            uri = f'{host}:{port}'
            logging.info('CREATE %s %s %s', uri, ins, cfginfo.get((ins, service)))
            TrySecopConnect(uri)
            db.set_instrument(uri, ins)

    # event kind -> database method receiving the remaining event fields
    event_map = {'value': db.add_float, 'error': db.add_error, 'stream': db.add_stream}

    # A non-empty list is truthy; the handler empties it to stop the loop.
    running = [True]

    def handler(num, frame):
        # Python invokes signal handlers with exactly two arguments
        # (signal number, current stack frame); any other arity would raise
        # TypeError on delivery instead of stopping the loop.
        running.clear()

    signal.signal(signal.SIGTERM, handler)

    try:
        while running:
            for kind, *args in egen.get_events():
                event_map[kind](*args)
            db.flush()
    finally:
        # store whatever the event stream still delivers on shutdown
        for kind, *args in egen.finish():
            event_map[kind](*args)
        db.disconnect()
        logging.info('gracefully finished')
# Command-line dispatch (see USAGE).
# Both documented forms have only optional trailing arguments, so a single
# argument is enough to select an action:
#   feeder.py -d [<db instance>]            -> run the feeder
#   feeder.py uri [device] [instrument]     -> announce a SECoP connection
# Without any argument the usage text is printed.
if len(sys.argv) < 2:
    print(USAGE)
elif sys.argv[1] == '-d':
    main(*sys.argv[2:])
else:
    send_fake_udp(*sys.argv[1:])