Files
sehistory/feeder.py
2025-02-26 10:43:36 +01:00

31 lines
794 B
Python

import sys
from streams import EventStream
from nicoscache import NicosStream
from secop import ScanStream, ScanReply, send_fake_udp
from seinflux import SEHistory
def main():
# egen = EventStream(ScanReply(), ScanStream(), n=NicosStream('localhost:14002'))
egen = EventStream(ScanReply(), ScanStream())
db = SEHistory(access='write')
db.enable_write_access()
event_map = {'value': db.add_float, 'error': db.add_error, 'stream': db.add_stream}
try:
while 1:
for kind, *args in egen.get_events():
event_map[kind](*args)
db.flush()
finally:
for kind, *args in egen.finish():
event_map[kind](*args)
db.disconnect()
if len(sys.argv) > 1:
send_fake_udp(*sys.argv[1:])
else:
main()