"""router""" import sys import os import socket import time import re from glob import glob from select import select from serial import serial_for_url from subprocess import Popen, PIPE, check_output, call, DEVNULL from utils import BoxInfo, change_firewall class Log: DEBUG = 0 INFO = 1 WARN = 2 ERROR = 3 QUIET = 4 def __init__(self, console_level): self.loggers = {lev: {} for lev in range(self.QUIET)} self.add(console_level, None, print) def add(self, level, key, emit): for lev in range(level, self.QUIET): self.loggers[lev][key] = emit def discard(self, key): for lev, emitters in self.loggers.items(): emitters.pop(key, None) def debug(self, fmt, *args): loggers = self.loggers[self.DEBUG] if loggers: line = fmt % args for emit in loggers.values(): emit(line) def info(self, fmt, *args): loggers = self.loggers[self.INFO] if loggers: line = fmt % args for emit in loggers.values(): emit(line) def warn(self, fmt, *args): loggers = self.loggers[self.WARN] if loggers: line = fmt % args for emit in loggers.values(): emit(line) def error(self, fmt, *args): loggers = self.loggers[self.ERROR] if loggers: line = fmt % args for emit in loggers.values(): emit(line) sim = False if sys.argv[0] == 'router.py': # started manually loglev = Log.INFO else: loglev = Log.WARN for arg in sys.argv[1:]: if arg == '-v': loglev = Log.DEBUG elif arg == '-s': sim = True else: raise ValueError(f'do not know {arg!r}') log = Log(loglev) FILTER = "iptables -i eth0 -p tcp -m tcp --dport %d -j ACCEPT" BASIC = """ iptables -P INPUT %(accept)s iptables -P FORWARD %(accept)s iptables -P OUTPUT ACCEPT iptables -A INPUT -i lo -j ACCEPT """ def unix_cmd(command): if sim: log.info('> %r' % command) else: log.info('$ %r' % command) return Popen(command.split(), stdout=PIPE).communicate()[0].decode() IP_ROUTE_PAT = re.compile(r'(.* (via) .*|.*) dev ([^ ]+) ') def check_net(address): """check to if interface is up where address would be routed return ifname: interface is up return False: interface is down return None: rout needs via -> do not consider this """ ret = Popen(['ip', 'route', 'get', address], stdout=PIPE).communicate()[0].decode() match = IP_ROUTE_PAT.match(ret) if match: if match.group(2): return None # via ifname = match.group(3) with open(f'/sys/class/net/{ifname}/carrier') as f: if f.read().startswith('1'): return ifname return False class IoHandler: client = None service = None port = None fno = None def __init__(self, client, service): self.service = service self.client = client def close(self): service, client = self.service, self.client fno = 'undefined' try: fno = client.fileno() service.readers.pop(fno, None) client.shutdown(socket.SHUT_RDWR) client.close() log.info('closed connection from port %s fno %s', service.port, fno) except Exception as e: log.error('ERROR closing client fno %s: %r', fno, e) service.handlers.pop(self.fno, None) self.client = None if self.fno: self.fno = None if service.pending_accepts and len(service.handlers) < service.maxcount: service.complete_accept(service.pending_accepts.pop(0)) class Router(IoHandler): def __init__(self, client, service): super().__init__(client, service) self.sentchunks = 0 self.sentbytes = 0 self.rcvdchunks = 0 self.rcvdbytes = 0 def request(self): try: data = self.client.recv(1024) if data: if len(data) > 50: log.debug('< %r ... (%d)', data[:40], len(data)) else: log.debug('< %r', data) self.write(data) self.sentbytes += len(data) self.sentchunks += 1 return except Exception as e: msg = f'error in request: {e!r}' self.service.failures[self.service.port] = msg Service.deadline = time.time() + 10 Service.reopen.append(self.service) self.service.close() log.error(msg) self.close() def reply(self): try: data = self.read() if len(data) > 50: log.debug('> %r ... (%d)', data[:40], len(data)) else: log.debug('> %r', data) self.client.sendall(data) self.rcvdbytes += len(data) self.rcvdchunks += 1 return except ConnectionResetError: pass except Exception as e: msg = f'error in reply: {e!r}' self.service.failures[self.service.port] = msg self.close() def close(self): self.service.readers.pop(self.fno, None) super().close() class TcpHandler(Router): socket = None fno = None def __init__(self, client, service): super().__init__(client, service) host, port = service.addr self.addr_todo = socket.getaddrinfo(host, port, 0, socket.SOCK_STREAM) if not self.addr_todo: self.log.error('can not resolve %s', host) return self.ifup = False self.connect_first() def connect_first(self): while self.addr_todo: af, socktype, proto, _, sock_adr = self.addr_todo.pop(0) self.ifup = check_net(sock_adr[0]) if not self.ifup: continue try: self.socket = sock = socket.socket(af, socktype, proto) self.fno = sock.fileno() log.info('try connecting to %r fno %s', sock_adr, self.fno) sock.setblocking(False) try: sock.connect(sock_adr) except BlockingIOError: pass self.connect_deadline = time.time() + self.service.tmo self.service.pending_connects[self.fno] = self.check_connected return sock except Exception as e: msg = f'error in connect: {e!r}' service.failures[service.port] = msg log.error(msg) self.close() if self.ifup: msg = f'Timeout connecting to {self.service.addr}' else: msg = f'interface for {self.service.addr} is unplugged' log.error(msg) self.service.failures[self.service.port] = msg self.close() def read(self): data = self.socket.recv(1024) if not data: raise ConnectionResetError('disconnected') return data def write(self, data): self.socket.sendall(data) def close(self): if self.socket: self.service.pending_connects.pop(self.fno, None) try: self.socket.shutdown(socket.SHUT_RDWR) self.socket.close() log.info('close fno %s', self.fno) except Exception as e: log.error('ERROR in close fno %s: %r', self.fno, e) super().close() def check_connected(self, ready): if ready: log.info('connected fno %s', self.fno) self.socket.setblocking(True) self.service.readers[self.client.fileno()] = self.request self.service.readers[self.fno] = self.reply elif time.time() > self.connect_deadline: if self.addr_todo: self.close() self.connect_first() class SerialHandler(Router): def __init__(self, client, service): self.serial = serial_for_url(service.addr, timeout=10) self.serial.timeout = None self.fno = self.serial.fileno() super().__init__(client, service) service.readers[client.fileno()] = self.request service.readers[self.fno] = self.reply def read(self): return self.serial.read(self.serial.in_waiting) def write(self, data): self.serial.write(data) def close(self): self.serial.close() super().close() class InfoHandler(IoHandler): def __init__(self, client, service, loglevel=Log.INFO): super().__init__(client, service) info = [f'{k} -> {s.addr}' for k, s in service.routes.items()] if sum(len(s.handlers) for s in service.routes.values()): info.append('\nactive routings, statistics bytes/chunks') info.append('fno port sent received') for s in service.routes.values(): for fno, h in s.handlers.items(): info.append(f'{fno} {h.port} {h.sentbytes:d}/{h.sentchunks:d} {h.rcvdbytes:d}/{h.rcvdchunks:d}') if service.failures: info.append('\nport last error') for port, msg in service.failures.items(): info.append(f'{port:5d} {msg}') info.append('') self.loggerkey = client.fileno() log.add(loglevel, self.loggerkey, self.emit) self.emit('\n'.join(info)) def read(self): return b'' def write(self, data): pass def close(self): log.discard(self.loggerkey) def emit(self, line): try: self.client.sendall(line.encode('utf-8')) self.client.sendall(b'\n') except Exception: pass class Service: """service handler :param: port offered port for routing :param: addr where to route (or None for InfoHandler) :param: iocls the io handler class, currently TcpHandler, SerialHandler, InfoHandler :param: maxcount the maximal number of concurrent connections. defaults to 1 as a side effect, if the destination is a web server, the traffic is serialized (only one connection at a time), which helps for some moxa device servers. might be a problem, if connections are reused: in this case maxcount has to be increased ... """ readers = {} pending_connects = {} routes = {} failures = {} firewall_ports = set() tmo = 5 reopen = [] deadline = None def __init__(self, port, addr, iocls, maxcount=None, handler_args=()): self.handlers = {} s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) s.bind(('0.0.0.0', port)) s.listen() self.socket = s self.addr = addr self.iocls = iocls self.handler_args = handler_args self.readers[s.fileno()] = self.accept self.port = port self.firewall_ports.add(port) if addr: self.routes[port] = self # not for InfoHandler if maxcount is None: maxcount = 12 if addr[1] == 80 else 1 self.maxcount = maxcount self.pending_accepts = [] log.info('listening at port %s for %s %r', port, iocls.__name__, addr) def accept(self): try: client, addr = self.socket.accept() except Exception as e: log.error('error accepting on port %s: %r', self.port, e) return fno = client.fileno() log.info('accepted %r on %r fno %s', addr, self.port, fno) if len(self.handlers) >= self.maxcount: self.pending_accepts.append(client) log.info('%d pending client connections on port %d', len(self.pending_accepts), self.port) else: self.complete_accept(client) def complete_accept(self, client): try: handler = self.iocls(client, self, *self.handler_args) except Exception as e: msg = f'error creating {self.iocls.__name__}({self.addr}): {e!r}' self.failures[self.port] = msg log.error(msg) try: client.shutdown(socket.SHUT_RDWR) client.close() except Exception: pass return handler.port = self.port if handler.fno: self.failures.pop(self.port, None) self.handlers[handler.fno] = handler def close(self): s = self.socket self.readers.pop(s.fileno(), None) s.close() def clone(self): return type(self)(self.port, self.addr, self.iocls) @classmethod def check_deadline(cls): if cls.deadline and time.time() > cls.deadline: cls.deadline = None while cls.reopen: cls.reopen.pop().clone() @classmethod def run(cls, routes): firewall = routes.pop('firewall', None) firewall_on = firewall is not None if firewall_on: cls.firewall_ports = set(int(r) for r in firewall.split(',') if r.strip()) Service(1110, None, InfoHandler, 5, handler_args=(log.DEBUG,)) Service(1111, None, InfoHandler, 5, handler_args=(log.INFO,)) Service(1112, None, InfoHandler, 5, handler_args=(log.WARN,)) for port, dest in routes.items(): port = int(port) if '/' in dest: Service(port, dest, SerialHandler) else: host, _, remoteport = dest.partition(':') if remoteport: remoteport = int(remoteport) else: remoteport = port Service(port, (host, remoteport), TcpHandler) change_firewall(firewall_on, cls.firewall_ports) while True: try: # log.debug('select %r', list(cls.readers)) if cls.pending_connects: while True: ready, complete, _ = select(cls.readers, cls.pending_connects, [], 1) for fno in complete: cls.pending_connects.pop(fno)(True) for func in list(cls.pending_connects.values()): func(False) # check for connect timeouts if ready: break else: ready, complete, _ = select(cls.readers, [], [], 1) cls.check_deadline() # log.debug('ready %r', ready) except Exception as e: for fno in cls.readers: try: select([fno], [], [], 0.1) except Exception as e: log.error('%r in select([%d])', e, fno) for fno in cls.pending_connects: try: select([], [fno], [], 0.1) except Exception as e: log.error('%r in select([],[%d])', e, fno) raise for fno in ready: cls.readers.get(fno, int)() # int() -> dummy function if __name__ == '__main__': routercfg = BoxInfo().read_config('ROUTER') if routercfg: Service.run(routercfg)