diff --git a/router.py b/router.py index b59a6a3..04ed5ae 100644 --- a/router.py +++ b/router.py @@ -1,29 +1,79 @@ import sys import os import socket -import logging from glob import glob from select import select from serial import serial_for_url from subprocess import Popen, PIPE, check_output, call, DEVNULL from configparser import ConfigParser + +class Log: + DEBUG = 0 + INFO = 1 + WARN = 2 + ERROR = 3 + QUIET = 4 + + def __init__(self, console_level): + self.loggers = {lev: {} for lev in range(self.QUIET)} + self.add(console_level, None, print) + + def add(self, level, key, emit): + for lev in range(level, self.QUIET): + self.loggers[lev][key] = emit + + def discard(self, key): + for lev, emitters in self.loggers.items(): + emitters.pop(key, None) + + def debug(self, fmt, *args): + loggers = self.loggers[self.DEBUG] + if loggers: + line = fmt % args + for emit in loggers.values(): + emit(line) + + def info(self, fmt, *args): + loggers = self.loggers[self.INFO] + if loggers: + line = fmt % args + for emit in loggers.values(): + emit(line) + + def warn(self, fmt, *args): + loggers = self.loggers[self.WARN] + if loggers: + line = fmt % args + for emit in loggers.values(): + emit(line) + + def error(self, fmt, *args): + loggers = self.loggers[self.ERROR] + if loggers: + line = fmt % args + for emit in loggers.values(): + emit(line) + + sim = False if sys.argv[0] == 'router.py': # started manually - loglev = logging.INFO + loglev = Log.INFO else: - loglev = logging.WARNING + loglev = Log.WARN for arg in sys.argv[1:]: if arg == '-v': - loglev = logging.DEBUG + loglev = Log.DEBUG elif arg == '-s': sim = True else: raise ValueError(f'do not know {arg!r}') -logging.basicConfig(format='%(levelname)1.1s: %(message)s', level=loglev) +log = Log(loglev) + +print(log.loggers) FILTER = "iptables -i enp4s0 -p tcp -m tcp --dport %d -j ACCEPT" @@ -34,56 +84,66 @@ iptables -P OUTPUT ACCEPT iptables -A INPUT -i lo -j ACCEPT """ + def unix_cmd(command): if sim: - logging.info('> %r' % command) + log.info('> %r' % command) else: - logging.info('$ %r' % command) + log.info('$ %r' % command) return Popen(command.split(), stdout=PIPE).communicate()[0].decode() class IoHandler: client = None handler = None + port = None def __init__(self, client, handler): self.handler = handler self.client = client + self.sentchunks = 0 + self.sentbytes = 0 + self.rcvdchunks = 0 + self.rcvdbytes = 0 def request(self): try: data = self.client.recv(1024) if data: - logging.debug('< %r', data) + log.debug('< %r', data) self.write(data) + self.sentbytes += len(data) + self.sentchunks += 1 return except Exception as e: - logging.error('ERROR in request: %r', e) + log.error('ERROR in request: %r', e) self.close() self.handler.close_client(self) def reply(self): try: data = self.read() - logging.debug('> %r', data) + log.debug('> %r', data) self.client.sendall(data) + self.rcvdbytes += len(data) + self.rcvdchunks += 1 return except ConnectionResetError: pass except Exception as e: - logging.error('ERROR in reply: %r', e) + log.error('ERROR in reply: %r', e) self.close() self.handler.close_client(self) class TcpHandler(IoHandler): def __init__(self, client, handler): - logging.info('create %r', handler.addr) + log.info('create %r', handler.addr) self.socket = socket.create_connection(handler.addr, timeout=5) self.socket.settimeout(1) self.fno = self.socket.fileno() super().__init__(client, handler) - logging.debug('created') + log.debug('created') def read(self): data = self.socket.recv(1024) @@ -98,7 +158,7 @@ class TcpHandler(IoHandler): try: self.socket.close() except Exception as e: - logging.error('ERROR in close: %r', e) + log.error('ERROR in close: %r', e) class SerialHandler(IoHandler): @@ -118,21 +178,55 @@ class SerialHandler(IoHandler): self.serial.close() +class InfoHandler(IoHandler): + def __init__(self, client, handler, loglevel=Log.INFO): + super().__init__(client, handler) + info = [f'{k} -> {v}' for k, v in AcceptHandler.routes.items()] + if AcceptHandler.handlers: + info.append('\nactive routings, statistics bytes/chunks') + info.append('fno port sent received') + for fno, h in AcceptHandler.handlers.items(): + info.append(f'{fno} {h.port} {h.sentbytes:d}/{h.sentchunks:d} {h.rcvdbytes:d}/{h.rcvdchunks:d}') + info.append('') + self.loggerkey = client.fileno() + log.add(loglevel, self.loggerkey, self.emit) + self.fno = None + self.emit('\n'.join(info)) + + def read(self): + return b'' + + def write(self, data): + pass + + def close(self): + log.discard(self.loggerkey) + + def emit(self, line): + try: + self.client.sendall(line.encode('utf-8')) + self.client.sendall(b'\n') + except TimeoutError: + pass + + class AcceptHandler: """handler for routing :param: port offered port for routing :param: addr where to route :param: iocls the io handler class, currently TcpHandler or SerialHandler - :param: maxcount the maximal number of concurrent connections. defauls to 1 + :param: maxcount the maximal number of concurrent connections. defaults to 1 as a side effect, if the destination is a web server, the traffic - are serialized (only one connection at a time), which helps for + is serialized (only one connection at a time), which helps for some moxa device servers. might be a problem, if connections are reused: in this case maxcount has to be increased ... """ readers = {} + handlers = {} + routes = {} - def __init__(self, port, addr, iocls, maxcount=None): + def __init__(self, port, addr, iocls, maxcount=None, handler_args=()): s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) s.bind(('0.0.0.0', port)) @@ -140,22 +234,26 @@ class AcceptHandler: self.socket = s self.addr = addr self.iocls = iocls + self.handler_args = handler_args self.readers[s.fileno()] = self.accept self.port = port if maxcount is None: maxcount = 8 if addr[1] == 80 else 1 self.available = maxcount self.pending = 0 - logging.info('listening at port %d for %s(%r)', port, iocls.__name__, addr) + log.info('listening at port %s for %s %r', port, iocls.__name__, addr) def close_client(self, iohandler): self.readers.pop(iohandler.fno, None) try: client = iohandler.client - self.readers.pop(client.fileno()) + fno = client.fileno() + self.readers.pop(fno) client.close() + log.info('closed connection from port %s fno %s', self.port, fno) except Exception as e: - logging.error('ERROR in close_client: %r', e) + log.error('ERROR in close_client: %r', e) + self.handlers.pop(fno, None) iohandler.client = None iohandler.fno = None self.available += 1 @@ -169,18 +267,22 @@ class AcceptHandler: return try: client, addr = self.socket.accept() - logging.info('accepted %r on %r', addr, self.port) - handler = self.iocls(client, self) + log.info('accepted %r on %r fno %s', addr, self.port, client.fileno()) + handler = self.iocls(client, self, *self.handler_args) except Exception as e: - logging.error('%r creating %s(%r)', e, self.iocls.__name__, self.addr) + log.error('ERROR creating %s(%r): %r', self.iocls.__name__, self.addr, e) client.close() return self.readers[client.fileno()] = handler.request - self.readers[handler.fno] = handler.reply + if handler.fno is not None: + self.readers[handler.fno] = handler.reply + handler.port = self.port + self.handlers[client.fileno()] = handler self.available -= 1 @classmethod def run(cls, routes, restrict=None): + cls.routes = dict(routes) if restrict is not None: lines = BASIC % dict(accept='DROP' if restrict else 'ACCEPT') unix_cmd('iptables -F') @@ -190,6 +292,9 @@ class AcceptHandler: if restrict: unix_cmd(FILTER % 22) + AcceptHandler(1110, None, InfoHandler, 5, handler_args=(log.DEBUG,)) + AcceptHandler(1111, None, InfoHandler, 5, handler_args=(log.INFO,)) + AcceptHandler(1112, None, InfoHandler, 5, handler_args=(log.WARN,)) for port, dest in routes.items(): port=int(port) if restrict: @@ -205,21 +310,20 @@ class AcceptHandler: AcceptHandler(port, (host, remoteport), TcpHandler) while True: try: - # logging.debug('select %r', list(cls.readers)) + # log.debug('select %r', list(cls.readers)) ready, _, _ = select(cls.readers, [], []) - # logging.debug('ready %r', ready) + # log.debug('ready %r', ready) except Exception as e: for r in cls.readers: try: select([r], [], [], 0.1) except Exception as e: - logging.error('%r in select([%d])', e, r) + log.error('%r in select([%d])', e, r) raise for fno in ready: cls.readers[fno]() - if __name__ == '__main__': parser = ConfigParser() cfgfiles = glob('/root/aputools/servercfg/%s_*.cfg' % socket.gethostname())