diff --git a/router.py b/router.py index 04ed5ae..818b6fd 100644 --- a/router.py +++ b/router.py @@ -1,6 +1,10 @@ +"""router""" + import sys import os import socket +import time +import re from glob import glob from select import select from serial import serial_for_url @@ -73,8 +77,6 @@ for arg in sys.argv[1:]: log = Log(loglev) -print(log.loggers) - FILTER = "iptables -i enp4s0 -p tcp -m tcp --dport %d -j ACCEPT" BASIC = """ @@ -93,14 +95,60 @@ def unix_cmd(command): return Popen(command.split(), stdout=PIPE).communicate()[0].decode() +IP_ROUTE_PAT = re.compile(r'(.* (via) .*|.*) dev ([^ ]+) ') + + +def check_net(address): + """check to if interface is up where address would be routed + + return ifname: interface is up + return False: interface is down + return None: rout needs via -> do not consider this + """ + ret = Popen(['ip', 'route', 'get', address], stdout=PIPE).communicate()[0].decode() + match = IP_ROUTE_PAT.match(ret) + if match: + if match.group(2): + return None # via + ifname = match.group(3) + with open(f'/sys/class/net/{ifname}/carrier') as f: + if f.read().startswith('1'): + return ifname + return False + + class IoHandler: client = None - handler = None + service = None port = None + fno = None - def __init__(self, client, handler): - self.handler = handler + def __init__(self, client, service): + self.service = service self.client = client + + def close(self): + service, client = self.service, self.client + fno = 'undefined' + try: + fno = client.fileno() + service.readers.pop(fno, None) + client.shutdown(socket.SHUT_RDWR) + client.close() + log.info('closed connection from port %s fno %s', service.port, fno) + except Exception as e: + log.error('ERROR closing client fno %s: %r', fno, e) + service.handlers.pop(self.fno, None) + self.client = None + if self.fno: + self.fno = None + if service.pending_accepts and len(service.handlers) < service.maxcount: + service.complete_accept(service.pending_accepts.pop(0)) + + +class Router(IoHandler): + def __init__(self, client, service): + super().__init__(client, service) self.sentchunks = 0 self.sentbytes = 0 self.rcvdchunks = 0 @@ -110,20 +158,27 @@ class IoHandler: try: data = self.client.recv(1024) if data: - log.debug('< %r', data) + if len(data) > 50: + log.debug('< %r ... (%d)', data[:40], len(data)) + else: + log.debug('< %r', data) self.write(data) self.sentbytes += len(data) self.sentchunks += 1 return except Exception as e: - log.error('ERROR in request: %r', e) + msg = f'error in request: {e!r}' + service.failures[service.port] = msg + log.error(msg) self.close() - self.handler.close_client(self) def reply(self): try: data = self.read() - log.debug('> %r', data) + if len(data) > 50: + log.debug('> %r ... (%d)', data[:40], len(data)) + else: + log.debug('> %r', data) self.client.sendall(data) self.rcvdbytes += len(data) self.rcvdchunks += 1 @@ -131,19 +186,59 @@ class IoHandler: except ConnectionResetError: pass except Exception as e: - log.error('ERROR in reply: %r', e) + msg = f'error in reply: {e!r}' + service.failures[service.port] = msg self.close() - self.handler.close_client(self) - -class TcpHandler(IoHandler): - def __init__(self, client, handler): - log.info('create %r', handler.addr) - self.socket = socket.create_connection(handler.addr, timeout=5) - self.socket.settimeout(1) - self.fno = self.socket.fileno() - super().__init__(client, handler) - log.debug('created') + def close(self): + self.service.readers.pop(self.fno, None) + super().close() + + +class TcpHandler(Router): + socket = None + fno = None + + def __init__(self, client, service): + super().__init__(client, service) + host, port = service.addr + self.addr_todo = socket.getaddrinfo(host, port, 0, socket.SOCK_STREAM) + if not self.addr_todo: + self.log.error('can not resolve %s', host) + return + self.ifup = False + self.connect_first() + + def connect_first(self): + while self.addr_todo: + af, socktype, proto, _, sock_adr = self.addr_todo.pop(0) + self.ifup = check_net(sock_adr[0]) + if not self.ifup: + continue + try: + self.socket = sock = socket.socket(af, socktype, proto) + self.fno = sock.fileno() + log.info('try connecting to %r fno %s', sock_adr, self.fno) + sock.setblocking(False) + try: + sock.connect(sock_adr) + except BlockingIOError: + pass + self.connect_deadline = time.time() + self.service.tmo + self.service.pending_connects[self.fno] = self.check_connected + return sock + except Exception as e: + msg = f'error in connect: {e!r}' + service.failures[service.port] = msg + log.error(msg) + self.close() + if self.ifup: + msg = f'Timeout connecting to {self.service.addr}' + else: + msg = f'interface for {self.service.addr} is unplugged' + log.error(msg) + self.service.failures[self.service.port] = msg + self.close() def read(self): data = self.socket.recv(1024) @@ -155,18 +250,36 @@ class TcpHandler(IoHandler): self.socket.sendall(data) def close(self): - try: - self.socket.close() - except Exception as e: - log.error('ERROR in close: %r', e) + if self.socket: + self.service.pending_connects.pop(self.fno, None) + try: + self.socket.shutdown(socket.SHUT_RDWR) + self.socket.close() + log.info('close fno %s', self.fno) + except Exception as e: + log.error('ERROR in close fno %s: %r', self.fno, e) + super().close() + + def check_connected(self, ready): + if ready: + log.info('connected fno %s', self.fno) + self.socket.setblocking(True) + self.service.readers[self.client.fileno()] = self.request + self.service.readers[self.fno] = self.reply + elif time.time() > self.connect_deadline: + if self.addr_todo: + self.close() + self.connect_first() -class SerialHandler(IoHandler): - def __init__(self, client, handler): - self.serial = serial_for_url(handler.addr, timeout=10) +class SerialHandler(Router): + def __init__(self, client, service): + self.serial = serial_for_url(service.addr, timeout=10) self.serial.timeout = None self.fno = self.serial.fileno() - super().__init__(client, handler) + super().__init__(client, service) + service.readers[client.fileno()] = self.request + service.readers[service.fno] = self.reply def read(self): return self.serial.read(self.serial.in_waiting) @@ -176,21 +289,26 @@ class SerialHandler(IoHandler): def close(self): self.serial.close() + super().close() class InfoHandler(IoHandler): - def __init__(self, client, handler, loglevel=Log.INFO): - super().__init__(client, handler) - info = [f'{k} -> {v}' for k, v in AcceptHandler.routes.items()] - if AcceptHandler.handlers: + def __init__(self, client, service, loglevel=Log.INFO): + super().__init__(client, service) + info = [f'{k} -> {s.addr}' for k, s in service.routes.items()] + if sum(len(s.handlers) for s in service.routes.values()): info.append('\nactive routings, statistics bytes/chunks') info.append('fno port sent received') - for fno, h in AcceptHandler.handlers.items(): - info.append(f'{fno} {h.port} {h.sentbytes:d}/{h.sentchunks:d} {h.rcvdbytes:d}/{h.rcvdchunks:d}') + for s in service.routes.values(): + for fno, h in s.handlers.items(): + info.append(f'{fno} {h.port} {h.sentbytes:d}/{h.sentchunks:d} {h.rcvdbytes:d}/{h.rcvdchunks:d}') + if service.failures: + info.append('\nport last error') + for port, msg in service.failures.items(): + info.append(f'{port:5d} {msg}') info.append('') self.loggerkey = client.fileno() log.add(loglevel, self.loggerkey, self.emit) - self.fno = None self.emit('\n'.join(info)) def read(self): @@ -206,16 +324,16 @@ class InfoHandler(IoHandler): try: self.client.sendall(line.encode('utf-8')) self.client.sendall(b'\n') - except TimeoutError: + except Exception: pass -class AcceptHandler: - """handler for routing +class Service: + """service handler :param: port offered port for routing - :param: addr where to route - :param: iocls the io handler class, currently TcpHandler or SerialHandler + :param: addr where to route (or None for InfoHandler) + :param: iocls the io handler class, currently TcpHandler, SerialHandler, InfoHandler :param: maxcount the maximal number of concurrent connections. defaults to 1 as a side effect, if the destination is a web server, the traffic is serialized (only one connection at a time), which helps for @@ -223,10 +341,13 @@ class AcceptHandler: reused: in this case maxcount has to be increased ... """ readers = {} - handlers = {} + pending_connects = {} routes = {} + failures = {} + tmo = 5 def __init__(self, port, addr, iocls, maxcount=None, handler_args=()): + self.handlers = {} s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) s.bind(('0.0.0.0', port)) @@ -237,52 +358,48 @@ class AcceptHandler: self.handler_args = handler_args self.readers[s.fileno()] = self.accept self.port = port + if addr: + self.routes[port] = self # not for InfoHandler if maxcount is None: - maxcount = 8 if addr[1] == 80 else 1 - self.available = maxcount - self.pending = 0 + maxcount = 12 if addr[1] == 80 else 1 + self.maxcount = maxcount + self.pending_accepts = [] log.info('listening at port %s for %s %r', port, iocls.__name__, addr) - def close_client(self, iohandler): - self.readers.pop(iohandler.fno, None) - try: - client = iohandler.client - fno = client.fileno() - self.readers.pop(fno) - client.close() - log.info('closed connection from port %s fno %s', self.port, fno) - except Exception as e: - log.error('ERROR in close_client: %r', e) - self.handlers.pop(fno, None) - iohandler.client = None - iohandler.fno = None - self.available += 1 - if self.pending: - self.pending -= 1 - self.accept() - def accept(self): - if not self.available: - self.pending += 1 - return try: client, addr = self.socket.accept() - log.info('accepted %r on %r fno %s', addr, self.port, client.fileno()) + except Exception as e: + log.error('error accepting on port %s: %r', self.port, e) + return + fno = client.fileno() + log.info('accepted %r on %r fno %s', addr, self.port, fno) + if len(self.handlers) >= self.maxcount: + self.pending_accepts.append(client) + log.info('%d pending client connections on port %d', len(self.pending_accepts), self.port) + else: + self.complete_accept(client) + + def complete_accept(self, client): + try: handler = self.iocls(client, self, *self.handler_args) except Exception as e: - log.error('ERROR creating %s(%r): %r', self.iocls.__name__, self.addr, e) - client.close() + msg = f'error creating {self.iocls.__name__}({self.addr}): {e!r}' + self.failures[self.port] = msg + log.error(msg) + try: + client.shutdown(socket.SHUT_RDWR) + client.close() + except Exception: + pass return - self.readers[client.fileno()] = handler.request - if handler.fno is not None: - self.readers[handler.fno] = handler.reply handler.port = self.port - self.handlers[client.fileno()] = handler - self.available -= 1 + if handler.fno: + self.failures.pop(self.port, None) + self.handlers[handler.fno] = handler @classmethod def run(cls, routes, restrict=None): - cls.routes = dict(routes) if restrict is not None: lines = BASIC % dict(accept='DROP' if restrict else 'ACCEPT') unix_cmd('iptables -F') @@ -292,33 +409,48 @@ class AcceptHandler: if restrict: unix_cmd(FILTER % 22) - AcceptHandler(1110, None, InfoHandler, 5, handler_args=(log.DEBUG,)) - AcceptHandler(1111, None, InfoHandler, 5, handler_args=(log.INFO,)) - AcceptHandler(1112, None, InfoHandler, 5, handler_args=(log.WARN,)) + Service(1110, None, InfoHandler, 5, handler_args=(log.DEBUG,)) + Service(1111, None, InfoHandler, 5, handler_args=(log.INFO,)) + Service(1112, None, InfoHandler, 5, handler_args=(log.WARN,)) for port, dest in routes.items(): - port=int(port) + port = int(port) if restrict: unix_cmd(FILTER % port) if '/' in dest: - AcceptHandler(port, dest, SerialHandler) + Service(port, dest, SerialHandler) else: host, _, remoteport = dest.partition(':') if remoteport: remoteport = int(remoteport) else: remoteport = port - AcceptHandler(port, (host, remoteport), TcpHandler) + Service(port, (host, remoteport), TcpHandler) while True: try: # log.debug('select %r', list(cls.readers)) - ready, _, _ = select(cls.readers, [], []) + if cls.pending_connects: + while True: + ready, complete, _ = select(cls.readers, cls.pending_connects, [], 1) + for fno in complete: + cls.pending_connects.pop(fno)(True) + for func in list(cls.pending_connects.values()): + func(False) # check for connect timeouts + if ready: + break + else: + ready, complete, _ = select(cls.readers, [], []) # log.debug('ready %r', ready) except Exception as e: - for r in cls.readers: + for fno in cls.readers: try: - select([r], [], [], 0.1) + select([fno], [], [], 0.1) except Exception as e: - log.error('%r in select([%d])', e, r) + log.error('%r in select([%d])', e, fno) + for fno in cls.pending_connects: + try: + select([], [fno], [], 0.1) + except Exception as e: + log.error('%r in select([],[%d])', e, fno) raise for fno in ready: cls.readers[fno]() @@ -331,4 +463,4 @@ if __name__ == '__main__': raise ValueError('there must be one and only one single cfgfile %r' % cfgfiles) parser.read(cfgfiles[0]) if parser.has_section('ROUTER'): - AcceptHandler.run(parser['ROUTER']) + Service.run(parser['ROUTER'])