proper logging, add signal handler for graceful termination
feeder.py | 17
@@ -1,4 +1,6 @@
 import sys
+import signal
+import logging
 from os.path import expanduser
 sys.path.append(expanduser('~'))
 import socket
@@ -20,6 +22,9 @@ Usage:
     python feeder.py uri [device] [instrument]
 """
 
+logging.basicConfig(filename='logfile.log', filemode='w', level=logging.INFO,
+                    format='%(asctime)s %(levelname)s %(message)s')
+
 
 def main(dbname=None, access='write'):
     # egen = EventStream(ScanReply(), ScanStream(), n=NicosStream('localhost:14002'))
@@ -42,14 +47,21 @@ def main(dbname=None, access='write'):
             port = fm.info.get(ins, {}).get(service, {})
             if port:
                 uri = f'{host}:{port}'
-                print('CREATE', uri, ins, cfginfo.get((ins, service)))
+                logging.info('CREATE %s %s %s', uri, ins, cfginfo.get((ins, service)))
                 TrySecopConnect(uri)
     db.set_instrument(uri, ins)
 
     event_map = {'value': db.add_float, 'error': db.add_error, 'stream': db.add_stream}
 
+    running = [True]
+
+    def handler(self, num, frame):
+        running.clear()
+
+    signal.signal(signal.SIGTERM, handler)
+
     try:
-        while 1:
+        while running:
             for kind, *args in egen.get_events():
                 event_map[kind](*args)
             db.flush()
@@ -57,6 +69,7 @@ def main(dbname=None, access='write'):
     for kind, *args in egen.finish():
         event_map[kind](*args)
     db.disconnect()
+    logging.info('gracefully finished')
 
 
 if len(sys.argv) >= 3:
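
The hunks above register a SIGTERM handler that empties the `running` list, so the `while running:` test turns false and the cleanup after the loop gets a chance to run. Below is a minimal, self-contained sketch of that pattern, not the project's code: the `main` wrapper and the sleeping loop body are placeholders. Note that Python invokes a signal handler with just `(signum, frame)`.

    # Sketch of the graceful-termination pattern above (not the project's code).
    # A mutable sentinel list is emptied by the SIGTERM handler, so the
    # `while running:` loop falls through and cleanup can run.
    import logging
    import signal
    import time

    logging.basicConfig(filename='logfile.log', filemode='w', level=logging.INFO,
                        format='%(asctime)s %(levelname)s %(message)s')


    def main():
        running = [True]  # truthy while the loop should keep going

        def handler(signum, frame):
            # Python calls the handler with (signum, frame); clearing the list
            # makes `while running:` false on the next iteration.
            running.clear()

        signal.signal(signal.SIGTERM, handler)

        while running:
            time.sleep(0.1)  # stand-in for the real event loop body

        logging.info('gracefully finished')


    if __name__ == '__main__':
        main()

Sending SIGTERM (for example with `kill <pid>`) empties the sentinel, the loop exits, and the final log line is written before the interpreter shuts down.
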
influx.py | 25
@@ -21,6 +21,7 @@
 # *****************************************************************************
 import re
 import time
+import logging
 from datetime import datetime, timezone
 from math import floor, ceil
 
@@ -192,8 +193,7 @@ class InfluxDBWrapper:
         self.add_new_bucket(self._bucket, access == 'create')
         self._write_buffer = []
         self._alias = {}
-        print('InfluxDBWrapper', self._url, self._org, self._bucket)
-        self.debug = False
+        logging.info('InfluxDBWrapper %s %s %s', self._url, self._org, self._bucket)
 
     def enable_write_access(self):
         self._write_api_write = self._client.write_api(write_options=SYNCHRONOUS).write
@@ -356,22 +356,21 @@ class InfluxDBWrapper:
             msg.append(f'''|> keep(columns:["{'","'.join(columns + keylist)}"])''')
 
         msg = '\n'.join(msg)
-        if self.debug:
-            print(msg)
+        logging.debug('MSG %r', msg)
         self.msg = msg
 
         try:
             reader = self._client.query_api().query_csv(msg)
-        except Exception:
-            print(msg)
+        except Exception as e:
+            logging.exception("error in query: %r", msg)
             raise
 
-        if self.debug:
-            def readdebug(reader):
-                for row in reader:
-                    print(row)
-                    yield row
-            reader = readdebug(reader)
+        # if self.debug:
+        #     def readdebug(reader):
+        #         for row in reader:
+        #             print(row)
+        #             yield row
+        #     reader = readdebug(reader)
         try:
             row = next(reader)
         except StopIteration:
@@ -416,7 +415,7 @@ class InfluxDBWrapper:
             # consume unused rows
            consumed = sum(1 for _ in rows)
             if consumed:
-                print('skip', consumed, 'rows')
+                logging.info('skip %r rows', consumed)
             if not row:  # reader is at end
                 return
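
The query hunk above swaps the bare `print(msg)` in the exception path for `logging.exception(...)` followed by `raise`. A short sketch of that idiom follows; it is not the project's code, and `run_query` with its generic `query_api` argument is a hypothetical stand-in for the wrapper's `self._client.query_api()`.

    # Sketch (not the project's code): logging.exception records the message
    # plus the active traceback, and re-raising keeps the caller's behaviour
    # unchanged.
    import logging

    def run_query(query_api, msg):
        logging.debug('MSG %r', msg)          # replaces the debug-only print(msg)
        try:
            return query_api.query_csv(msg)   # assumed to raise on a bad query
        except Exception:
            logging.exception('error in query: %r', msg)
            raise
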
@@ -1,4 +1,5 @@
 import re
+import logging
 from ast import literal_eval
 from streams import Stream
 from secop import EnumConvert
@@ -154,7 +155,7 @@ class NicosStream(Stream):
             if self._init:
                 raise TimeoutError('timeout receiving initial values')
         except Exception as e:
-            print(self.uri, repr(e))
+            logging.exception('nicos %r', self.uri)
             return
         for ts, devname, param, op, value in sorted([t, d, p, o, v] for (d, p), (o, v, t) in events.items()):
             descr = self.descr.get(devname)
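
In the NicosStream hunk the `print(self.uri, repr(e))` inside the except block becomes `logging.exception(...)`, which logs at ERROR level and appends the current traceback, so the information the old `repr(e)` showed is still recoverable from the log file. A tiny illustrative sketch, with invented `poll_once` and `fetch` names:

    # Sketch (not the project's code): inside an `except` block,
    # logging.exception logs at ERROR level and appends the traceback.
    import logging

    def poll_once(fetch):
        try:
            return fetch()
        except Exception:
            logging.exception('nicos poll failed')
            return None
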
secop.py | 16
@@ -3,6 +3,7 @@ import os
 import json
 import time
 import socket
+import logging
 from collections import namedtuple
 from select import select
 from streams import Stream, Base, StreamDead
@@ -112,7 +113,7 @@ class SecopStream(Stream):
 
         except Exception as e:
             # probably other end closed
-            print(self.uri, repr(e))
+            logging.info('%r on %s', e, self.uri)
 
 
 SECOP_UDP_PORT = 10767
@@ -154,7 +155,7 @@ class ScanReply(UdpStream):
             sock.sendto(json.dumps(dict(SECoP='discover')).encode('utf-8'),
                         ('255.255.255.255', SECOP_UDP_PORT))
         except OSError as e:
-            print('could not send the broadcast:', e)
+            logging.info('could not send the broadcast %r:', e)
         self.socket = sock
         self.select_read[sock.fileno()] = self
@@ -193,12 +194,11 @@ class TrySecopConnect(Base):
             try:
                 self.socket.sendall(b'*IDN?\n')
                 self.idn_sent = True
-                print('SEND IDN', self.uri)
+                logging.debug('SEND IDN %s', self.uri)
                 self.select_read[self.fno] = self
                 return
             except Exception as e:
-                print('NO CONN TO', self.uri)
-                print(e)
+                logging.info('NO CONN TO %s %r', self.uri, e)
         else:
             reply = b''
             try:
@@ -206,12 +206,12 @@ class TrySecopConnect(Base):
                 if chunk:
                     self.idn += chunk
                     if b'SECoP' in self.idn:
-                        print('CONN TO', self.uri)
+                        logging.info('connected to %s', self.uri)
                         yield SecopStream, self.uri, {'stream': self.uri}
                     if b'\n' not in self.idn:
                         return
-            except Exception as e:
-                print(e)
+            except Exception:
+                logging.exception('receiving')
             self.select_read.pop(self.fno)
 
 
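
The secop.py changes consistently use logging's lazy `%`-style formatting, passing the format string and its arguments separately so the message is only rendered when the record passes the configured level. That matters for `logging.debug('SEND IDN %s', ...)`, which the `basicConfig(level=logging.INFO)` call in feeder.py filters out. A hedged sketch with a hypothetical helper and example values:

    # Sketch (not the project's code): the format string is rendered by the
    # logging module only if the record is actually emitted, so a filtered-out
    # DEBUG call stays cheap. report_connect_failure mirrors the
    # TrySecopConnect messages but is invented for this example.
    import logging

    def report_connect_failure(uri, exc):
        logging.info('NO CONN TO %s %r', uri, exc)   # %s/%r filled in lazily

    report_connect_failure('localhost:5000', OSError('connection refused'))
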
@@ -1,4 +1,5 @@
 import time
+import logging
 from pathlib import Path
 from configparser import ConfigParser
 from sehistory.influx import InfluxDBWrapper, abs_range, round_range, Table
@@ -400,7 +401,7 @@ class SEHistory(InfluxDBWrapper):
             if prevts is not None and (previns is None or (ts or ETERNITY) < prevts):
                 ts = prevts + 0.001
         except Exception as e:
-            print(f'Exception in get_instrument {e!r}')
+            logging.warning('Exception in get_instrument: %r', e)
         tags['stream'] = stream
         if flag:
             tags['instrument'] = instrument
@@ -417,8 +418,7 @@ class SEHistory(InfluxDBWrapper):
             ts, flag = table[-1][:2]
             if flag:
                 addtags = {k: v for k, v in table.tags.items()
                            if k not in {'instrument', '_measurement', '_field'}}
-                print(ts, addtags)
                 self._add_point('_stream_', 'on', False, ts + 0.001,
                                 addtags)
         self.flush()
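
In `get_instrument` the caught exception is now reported with `logging.warning(..., e)` and execution continues, so the lookup failure is treated as non-fatal. A small sketch of that catch-log-continue pattern; `lookup_instrument` and `db` are invented names, not the project's API:

    # Sketch (not the project's code): a failed lookup is downgraded to a
    # warning and the caller carries on with a default value.
    import logging

    def lookup_instrument(db, stream):
        try:
            return db[stream]
        except Exception as e:
            logging.warning('Exception in get_instrument: %r', e)
            return None
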
streams.py | 24
@@ -1,6 +1,7 @@
 import socket
 import time
 import re
+import logging
 from select import select
 
 
@@ -27,10 +28,10 @@ class Base:
     select_write = {}
 
     def close(self):
-        print('CLOSE BASE')
+        logging.info('CLOSE BASE')
 
     def finish_events(self, *args):
-        print('FINISH BASE')
+        logging.info('FINISH BASE')
 
 
 def short_hostname(host):
@@ -74,7 +75,7 @@ class Stream(Base):
             self.connect()
             self.init(**kwds)
         except Exception as e:
-            print('FAIL', self.uri, repr(e))
+            logging.info('failled connecting %s %r', self.uri, e)
             raise
 
     def connect(self):
@@ -84,7 +85,7 @@ class Stream(Base):
         host, _, port = self.uri.partition(':')
         # try to convert uri to host name
         self.uri = self.tags['stream'] = f'{short_hostname(host)}:{port}'
-        print(f'{host}:{port}', '=', self.uri, 'connected')
+        logging.info('connected %s:%s = %s', host, port, self.uri)
         self._buffer = []
         self._deadline = INF
         self._next_connect = 0
@@ -104,7 +105,7 @@ class Stream(Base):
         if self.socket is None:
            return
         self.select_read.pop(self.socket.fileno(), None)
-        print(self.uri, 'close socket')
+        logging.info('close socket %s', self.uri)
         try:
             self.socket.shutdown(socket.SHUT_RDWR)
         except socket.error:
@@ -130,10 +131,9 @@ class Stream(Base):
                 self.dead = min(now, self._last_live + 1)
                 return True
             if self._deadline == INF:
-                print(f'error "{e}" connecting to {self.uri} retrying for {self.max_offline} sec')
+                logging.info(f'error %r connecting to %s retrying for %s sec',
+                             e, self.uri, self.max_offline)
                 self._deadline = now + self.max_offline
-            else:
-                print('.', end='', flush=True)
             self._next_connect = now + 0.5
             return True
         return False
@@ -163,7 +163,7 @@ class Stream(Base):
         now = time.time()
         if now > self._next_ping:
             if self._next_ping == self._ping_deadline:
-                print(self.uri, 'no pong')
+                self.log.info('no pong from %s', self.uri)
                 self.close()
                 return
             self.ping()
@@ -280,13 +280,13 @@ class EventStream:
             stream = self.streams.get(uri)
             if stream:
                 stream.tags.update(kwargs)
-                print('update stream', uri, kwargs)
+                logging.info('update stream %s %r', uri, kwargs)
             else:
                 try:
                     self.streams[uri] = stream = streamcls(uri, **kwargs)
-                    print('added stream', uri, kwargs)
+                    logging.info('added stream %s %r', uri, kwargs)
                 except Exception as e:
                     print('can not connect to', uri, repr(e), streamcls)
+                    logging.warning('can not connect to %s %r %r', uri, e, streamcls)
                     continue
             device = stream.tags.get('device')
             events.append(('stream', kwargs.get('instrument', '',
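
Across streams.py the commit logs through the module-level functions of the root logger (`logging.info(...)`), apart from one `self.log.info(...)` call in the ping handling. An equivalent, commonly used variant is a named per-module logger; its records still propagate to the root handler that `basicConfig` in feeder.py installs, so they land in the same logfile.log. This is only an illustration, not something the commit itself does:

    # Illustration only, not part of the commit: a named per-module logger.
    # Records propagate to the root handler configured by logging.basicConfig.
    import logging

    log = logging.getLogger(__name__)

    def close_socket(uri):
        log.info('close socket %s', uri)   # same record, tagged with the module name
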