From ef35d18f37df73a232b102674283e61dae8bdfa5 Mon Sep 17 00:00:00 2001 From: Markus Zolliker Date: Fri, 23 May 2025 13:52:42 +0200 Subject: [PATCH] proper logging, add signal handler for graceful termination --- feeder.py | 17 +++++++++++++++-- influx.py | 25 ++++++++++++------------- nicoscache.py | 3 ++- secop.py | 16 ++++++++-------- seinflux.py | 6 +++--- streams.py | 24 ++++++++++++------------ 6 files changed, 52 insertions(+), 39 deletions(-) diff --git a/feeder.py b/feeder.py index 8e518eb..a006fd8 100644 --- a/feeder.py +++ b/feeder.py @@ -1,4 +1,6 @@ import sys +import signal +import logging from os.path import expanduser sys.path.append(expanduser('~')) import socket @@ -20,6 +22,9 @@ Usage: python feeder.py uri [device] [instrument] """ +logging.basicConfig(filename='logfile.log', filemode='w', level=logging.INFO, + format='%(asctime)s %(levelname)s %(message)s') + def main(dbname=None, access='write'): # egen = EventStream(ScanReply(), ScanStream(), n=NicosStream('localhost:14002')) @@ -42,14 +47,21 @@ def main(dbname=None, access='write'): port = fm.info.get(ins, {}).get(service, {}) if port: uri = f'{host}:{port}' - print('CREATE', uri, ins, cfginfo.get((ins, service))) + logging.info('CREATE %s %s %s', uri, ins, cfginfo.get((ins, service))) TrySecopConnect(uri) db.set_instrument(uri, ins) event_map = {'value': db.add_float, 'error': db.add_error, 'stream': db.add_stream} + running = [True] + + def handler(self, num, frame): + running.clear() + + signal.signal(signal.SIGTERM, handler) + try: - while 1: + while running: for kind, *args in egen.get_events(): event_map[kind](*args) db.flush() @@ -57,6 +69,7 @@ def main(dbname=None, access='write'): for kind, *args in egen.finish(): event_map[kind](*args) db.disconnect() + logging.info('gracefully finished') if len(sys.argv) >= 3: diff --git a/influx.py b/influx.py index 52bc374..1e3a103 100644 --- a/influx.py +++ b/influx.py @@ -21,6 +21,7 @@ # ***************************************************************************** import re import time +import logging from datetime import datetime, timezone from math import floor, ceil @@ -192,8 +193,7 @@ class InfluxDBWrapper: self.add_new_bucket(self._bucket, access == 'create') self._write_buffer = [] self._alias = {} - print('InfluxDBWrapper', self._url, self._org, self._bucket) - self.debug = False + logging.info('InfluxDBWrapper %s %s %s', self._url, self._org, self._bucket) def enable_write_access(self): self._write_api_write = self._client.write_api(write_options=SYNCHRONOUS).write @@ -356,22 +356,21 @@ class InfluxDBWrapper: msg.append(f'''|> keep(columns:["{'","'.join(columns + keylist)}"])''') msg = '\n'.join(msg) - if self.debug: - print(msg) + logging.debug('MSG %r', msg) self.msg = msg try: reader = self._client.query_api().query_csv(msg) - except Exception: - print(msg) + except Exception as e: + logging.exception("error in query: %r", msg) raise - if self.debug: - def readdebug(reader): - for row in reader: - print(row) - yield row - reader = readdebug(reader) + # if self.debug: + # def readdebug(reader): + # for row in reader: + # print(row) + # yield row + # reader = readdebug(reader) try: row = next(reader) except StopIteration: @@ -416,7 +415,7 @@ class InfluxDBWrapper: # consume unused rows consumed = sum(1 for _ in rows) if consumed: - print('skip', consumed, 'rows') + logging.info('skip %r rows', consumed) if not row: # reader is at end return diff --git a/nicoscache.py b/nicoscache.py index ab5ef59..85d2ba5 100644 --- a/nicoscache.py +++ b/nicoscache.py @@ -1,4 +1,5 @@ import re +import logging from ast import literal_eval from streams import Stream from secop import EnumConvert @@ -154,7 +155,7 @@ class NicosStream(Stream): if self._init: raise TimeoutError('timeout receiving initial values') except Exception as e: - print(self.uri, repr(e)) + logging.exception('nicos %r', self.uri) return for ts, devname, param, op, value in sorted([t, d, p, o, v] for (d, p), (o, v, t) in events.items()): descr = self.descr.get(devname) diff --git a/secop.py b/secop.py index 5321d5e..663c108 100644 --- a/secop.py +++ b/secop.py @@ -3,6 +3,7 @@ import os import json import time import socket +import logging from collections import namedtuple from select import select from streams import Stream, Base, StreamDead @@ -112,7 +113,7 @@ class SecopStream(Stream): except Exception as e: # probably other end closed - print(self.uri, repr(e)) + logging.info('%r on %s', e, self.uri) SECOP_UDP_PORT = 10767 @@ -154,7 +155,7 @@ class ScanReply(UdpStream): sock.sendto(json.dumps(dict(SECoP='discover')).encode('utf-8'), ('255.255.255.255', SECOP_UDP_PORT)) except OSError as e: - print('could not send the broadcast:', e) + logging.info('could not send the broadcast %r:', e) self.socket = sock self.select_read[sock.fileno()] = self @@ -193,12 +194,11 @@ class TrySecopConnect(Base): try: self.socket.sendall(b'*IDN?\n') self.idn_sent = True - print('SEND IDN', self.uri) + logging.debug('SEND IDN %s', self.uri) self.select_read[self.fno] = self return except Exception as e: - print('NO CONN TO', self.uri) - print(e) + logging.info('NO CONN TO %s %r', self.uri, e) else: reply = b'' try: @@ -206,12 +206,12 @@ class TrySecopConnect(Base): if chunk: self.idn += chunk if b'SECoP' in self.idn: - print('CONN TO', self.uri) + logging.info('connected to %s', self.uri) yield SecopStream, self.uri, {'stream': self.uri} if b'\n' not in self.idn: return - except Exception as e: - print(e) + except Exception: + logging.exception('receiving') self.select_read.pop(self.fno) diff --git a/seinflux.py b/seinflux.py index 5c0b5d7..4dc13af 100644 --- a/seinflux.py +++ b/seinflux.py @@ -1,4 +1,5 @@ import time +import logging from pathlib import Path from configparser import ConfigParser from sehistory.influx import InfluxDBWrapper, abs_range, round_range, Table @@ -400,7 +401,7 @@ class SEHistory(InfluxDBWrapper): if prevts is not None and (previns is None or (ts or ETERNITY) < prevts): ts = prevts + 0.001 except Exception as e: - print(f'Exception in get_instrument {e!r}') + logging.warning('Exception in get_instrument: %r', e) tags['stream'] = stream if flag: tags['instrument'] = instrument @@ -417,8 +418,7 @@ class SEHistory(InfluxDBWrapper): ts, flag = table[-1][:2] if flag: addtags = {k: v for k, v in table.tags.items() - if k not in {'instrument', '_measurement', '_field'}} - print(ts, addtags) + if k not in {'instrument', '_measurement', '_field'}} self._add_point('_stream_', 'on', False, ts + 0.001, addtags) self.flush() diff --git a/streams.py b/streams.py index 6af7d10..f431ef0 100644 --- a/streams.py +++ b/streams.py @@ -1,6 +1,7 @@ import socket import time import re +import logging from select import select @@ -27,10 +28,10 @@ class Base: select_write = {} def close(self): - print('CLOSE BASE') + logging.info('CLOSE BASE') def finish_events(self, *args): - print('FINISH BASE') + logging.info('FINISH BASE') def short_hostname(host): @@ -74,7 +75,7 @@ class Stream(Base): self.connect() self.init(**kwds) except Exception as e: - print('FAIL', self.uri, repr(e)) + logging.info('failled connecting %s %r', self.uri, e) raise def connect(self): @@ -84,7 +85,7 @@ class Stream(Base): host, _, port = self.uri.partition(':') # try to convert uri to host name self.uri = self.tags['stream'] = f'{short_hostname(host)}:{port}' - print(f'{host}:{port}', '=', self.uri, 'connected') + logging.info('connected %s:%s = %s', host, port, self.uri) self._buffer = [] self._deadline = INF self._next_connect = 0 @@ -104,7 +105,7 @@ class Stream(Base): if self.socket is None: return self.select_read.pop(self.socket.fileno(), None) - print(self.uri, 'close socket') + logging.info('close socket %s', self.uri) try: self.socket.shutdown(socket.SHUT_RDWR) except socket.error: @@ -130,10 +131,9 @@ class Stream(Base): self.dead = min(now, self._last_live + 1) return True if self._deadline == INF: - print(f'error "{e}" connecting to {self.uri} retrying for {self.max_offline} sec') + logging.info(f'error %r connecting to %s retrying for %s sec', + e, self.uri, self.max_offline) self._deadline = now + self.max_offline - else: - print('.', end='', flush=True) self._next_connect = now + 0.5 return True return False @@ -163,7 +163,7 @@ class Stream(Base): now = time.time() if now > self._next_ping: if self._next_ping == self._ping_deadline: - print(self.uri, 'no pong') + self.log.info('no pong from %s', self.uri) self.close() return self.ping() @@ -280,13 +280,13 @@ class EventStream: stream = self.streams.get(uri) if stream: stream.tags.update(kwargs) - print('update stream', uri, kwargs) + logging.info('update stream %s %r', uri, kwargs) else: try: self.streams[uri] = stream = streamcls(uri, **kwargs) - print('added stream', uri, kwargs) + logging.info('added stream %s %r', uri, kwargs) except Exception as e: - print('can not connect to', uri, repr(e), streamcls) + logging.warning('can not connect to %s %r %r', uri, e, streamcls) continue device = stream.tags.get('device') events.append(('stream', kwargs.get('instrument', ''),