import os import socket from glob import glob from select import select from serial import serial_for_url from subprocess import Popen, PIPE, check_output, call, DEVNULL from configparser import ConfigParser FILTER = "iptables -i enp4s0 -p tcp -m tcp --dport %d -j ACCEPT" BASIC = """ iptables -P INPUT %(accept)s iptables -P FORWARD %(accept)s iptables -P OUTPUT ACCEPT iptables -A INPUT -i lo -j ACCEPT """ sim = False def unix_cmd(command): if sim: print('> %r' % command) else: print('$ %r' % command) return Popen(command.split(), stdout=PIPE).communicate()[0].decode() class IoHandler: client = None handler = None port = None def __init__(self, client, handler): self.handler = handler self.client = client self.sentchunks = 0 self.sentbytes = 0 self.rcvdchunks = 0 self.rcvdbytes = 0 def request(self): try: data = self.client.recv(1024) if data: # print('<', data) self.write(data) self.sentbytes += len(data) self.sentchunks += 1 return except Exception as e: print('ERROR in request: %s' % e) self.close() self.handler.close_client(self) def reply(self): try: data = self.read() # print('>', data) self.client.sendall(data) self.rcvdbytes += len(data) self.rcvdchunks += 1 return except ConnectionResetError: pass except Exception as e: print('ERROR in reply: %s' % e) self.close() self.handler.close_client(self) class TcpHandler(IoHandler): def __init__(self, client, handler): self.socket = socket.create_connection(handler.addr) self.fno = self.socket.fileno() super().__init__(client, handler) def read(self): data = self.socket.recv(1024) if not data: raise ConnectionResetError('disconnected') return data def write(self, data): self.socket.sendall(data) def close(self): try: self.socket.close() except Exception as e: print('ERROR in close: %s' % e) class SerialHandler(IoHandler): def __init__(self, client, handler): self.serial = serial_for_url(handler.addr, timeout=10) self.serial.timeout = None self.fno = self.serial.fileno() super().__init__(client, handler) def read(self): return self.serial.read(self.serial.in_waiting) def write(self, data): self.serial.write(data) def close(self): self.serial.close() class InfoHandler(IoHandler): clients = {} def __init__(self, client, handler): super().__init__(client, handler) info = [f'{k} -> {v}' for k, v in AcceptHandler.routes.items()] if AcceptHandler.handlers: info.append('\nactive routings, statistics bytes/chunks') info.append('fno port sent received') for fno, h in AcceptHandler.handlers.items(): info.append(f'{fno} {h.port} {h.sentbytes:d}/{h.sentchunks:d} {h.rcvdbytes:d}/{h.rcvdchunks:d}') info.append('') self.client.sendall('\n'.join(info).encode('utf-8')) self.clients[client.fileno()] = self self.fno = None def read(self): return b'' def write(self, data): pass def close(self): self.clients.pop(self.client.fileno()) @classmethod def log(cls, line): if cls.clients: for c in cls.clients.values(): try: c.client.sendall(line.encode('utf-8')) c.client.sendall(b'\n') except TimeoutError: pass else: print(line) class AcceptHandler: """handler for routing :param: port offered port for routing :param: addr where to route :param: iocls the io handler class, currently TcpHandler or SerialHandler :param: maxcount the maximal number of concurrent connections. defauls to 1 as a side effect, if the destination is a web server, the traffic are serialized (only one connection at a time), which helps for some moxa device servers. might be a problem, if connections are reused: in this case maxcount has to be increased ... """ readers = {} handlers = {} routes = {} def __init__(self, port, addr, iocls, maxcount=1): s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) s.bind(('0.0.0.0', port)) s.listen() self.socket = s self.addr = addr self.iocls = iocls self.readers[s.fileno()] = self.accept self.port = port self.available = maxcount self.pending = 0 def close_client(self, iohandler): self.readers.pop(iohandler.fno, None) client = iohandler.client fno = client.fileno() try: self.readers.pop(fno) client.close() InfoHandler.log(f'closed connection from port {self.port} fno {fno}') except Exception as e: InfoHandler.log(f'{e!r} in close_client') self.handlers.pop(fno, None) iohandler.client = None iohandler.fno = None self.available += 1 if self.pending: self.pending -= 1 self.accept() def accept(self): if not self.available: self.pending += 1 return try: client, addr = self.socket.accept() InfoHandler.log(f'accepted {addr} on {self.port} fno {client.fileno()}') handler = self.iocls(client, self) except Exception as e: InfoHandler.log(f'{e!r} creating {self.iocls.__name__}({self.addr})') client.close() return self.readers[client.fileno()] = handler.request if handler.fno is not None: self.readers[handler.fno] = handler.reply # statistics: number of chunks sent / received handler.port = self.port self.handlers[client.fileno()] = handler self.available -= 1 @classmethod def run(cls, routes, restrict=None): cls.routes = dict(routes) if restrict is not None: lines = BASIC % dict(accept='DROP' if restrict else 'ACCEPT') unix_cmd('iptables -F') for line in lines.split('\n'): if line.strip(): unix_cmd(line) if restrict: unix_cmd(FILTER % 22) AcceptHandler(1111, None, InfoHandler, 5) for port, dest in routes.items(): port=int(port) if restrict: unix_cmd(FILTER % port) if '/' in dest: AcceptHandler(port, dest, SerialHandler) else: host, _, remoteport = dest.partition(':') if remoteport: remoteport = int(remoteport) else: remoteport = port AcceptHandler(port, (host, remoteport), TcpHandler) while True: try: ready, _, _ = select(cls.readers, [], []) except Exception as e: for r in cls.readers: try: select([r], [], [], 0.1) except Exception as e: print(r, e) raise for fno in ready: cls.readers[fno]() if __name__ == '__main__': parser = ConfigParser() cfgfiles = glob('/root/aputools/servercfg/%s_*.cfg' % socket.gethostname()) if len(cfgfiles) != 1: raise ValueError('there must be one and only one single cfgfile %r' % cfgfiles) parser.read(cfgfiles[0]) if parser.has_section('ROUTER'): AcceptHandler.run(parser['ROUTER'])