Files
boxtools/router.py

265 lines
7.9 KiB
Python

import os
import socket
from glob import glob
from select import select
from serial import serial_for_url
from subprocess import Popen, PIPE, check_output, call, DEVNULL
from configparser import ConfigParser
FILTER = "iptables -i enp4s0 -p tcp -m tcp --dport %d -j ACCEPT"
BASIC = """
iptables -P INPUT %(accept)s
iptables -P FORWARD %(accept)s
iptables -P OUTPUT ACCEPT
iptables -A INPUT -i lo -j ACCEPT
"""
sim = False
def unix_cmd(command):
if sim:
print('> %r' % command)
else:
print('$ %r' % command)
return Popen(command.split(), stdout=PIPE).communicate()[0].decode()
class IoHandler:
client = None
handler = None
port = None
def __init__(self, client, handler):
self.handler = handler
self.client = client
self.sentchunks = 0
self.sentbytes = 0
self.rcvdchunks = 0
self.rcvdbytes = 0
def request(self):
try:
data = self.client.recv(1024)
if data:
# print('<', data)
self.write(data)
self.sentbytes += len(data)
self.sentchunks += 1
return
except Exception as e:
print('ERROR in request: %s' % e)
self.close()
self.handler.close_client(self)
def reply(self):
try:
data = self.read()
# print('>', data)
self.client.sendall(data)
self.rcvdbytes += len(data)
self.rcvdchunks += 1
return
except ConnectionResetError:
pass
except Exception as e:
print('ERROR in reply: %s' % e)
self.close()
self.handler.close_client(self)
class TcpHandler(IoHandler):
def __init__(self, client, handler):
self.socket = socket.create_connection(handler.addr)
self.fno = self.socket.fileno()
super().__init__(client, handler)
def read(self):
data = self.socket.recv(1024)
if not data:
raise ConnectionResetError('disconnected')
return data
def write(self, data):
self.socket.sendall(data)
def close(self):
try:
self.socket.close()
except Exception as e:
print('ERROR in close: %s' % e)
class SerialHandler(IoHandler):
def __init__(self, client, handler):
self.serial = serial_for_url(handler.addr, timeout=10)
self.serial.timeout = None
self.fno = self.serial.fileno()
super().__init__(client, handler)
def read(self):
return self.serial.read(self.serial.in_waiting)
def write(self, data):
self.serial.write(data)
def close(self):
self.serial.close()
class InfoHandler(IoHandler):
clients = {}
def __init__(self, client, handler):
super().__init__(client, handler)
info = [f'{k} -> {v}' for k, v in AcceptHandler.routes.items()]
if AcceptHandler.handlers:
info.append('\nactive routings, statistics bytes/chunks')
info.append('fno port sent received')
for fno, h in AcceptHandler.handlers.items():
info.append(f'{fno} {h.port} {h.sentbytes:d}/{h.sentchunks:d} {h.rcvdbytes:d}/{h.rcvdchunks:d}')
info.append('')
self.client.sendall('\n'.join(info).encode('utf-8'))
self.clients[client.fileno()] = self
self.fno = None
def read(self):
return b''
def write(self, data):
pass
def close(self):
self.clients.pop(self.client.fileno())
@classmethod
def log(cls, line):
if cls.clients:
for c in cls.clients.values():
try:
c.client.sendall(line.encode('utf-8'))
c.client.sendall(b'\n')
except TimeoutError:
pass
else:
print(line)
class AcceptHandler:
"""handler for routing
:param: port offered port for routing
:param: addr where to route
:param: iocls the io handler class, currently TcpHandler or SerialHandler
:param: maxcount the maximal number of concurrent connections. defauls to 1
as a side effect, if the destination is a web server, the traffic
are serialized (only one connection at a time), which helps for
some moxa device servers. might be a problem, if connections are
reused: in this case maxcount has to be increased ...
"""
readers = {}
handlers = {}
routes = {}
def __init__(self, port, addr, iocls, maxcount=1):
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind(('0.0.0.0', port))
s.listen()
self.socket = s
self.addr = addr
self.iocls = iocls
self.readers[s.fileno()] = self.accept
self.port = port
self.available = maxcount
self.pending = 0
def close_client(self, iohandler):
self.readers.pop(iohandler.fno, None)
client = iohandler.client
fno = client.fileno()
try:
self.readers.pop(fno)
client.close()
InfoHandler.log(f'closed connection from port {self.port} fno {fno}')
except Exception as e:
InfoHandler.log(f'{e!r} in close_client')
self.handlers.pop(fno, None)
iohandler.client = None
iohandler.fno = None
self.available += 1
if self.pending:
self.pending -= 1
self.accept()
def accept(self):
if not self.available:
self.pending += 1
return
try:
client, addr = self.socket.accept()
InfoHandler.log(f'accepted {addr} on {self.port} fno {client.fileno()}')
handler = self.iocls(client, self)
except Exception as e:
InfoHandler.log(f'{e!r} creating {self.iocls.__name__}({self.addr})')
client.close()
return
self.readers[client.fileno()] = handler.request
if handler.fno is not None:
self.readers[handler.fno] = handler.reply
# statistics: number of chunks sent / received
handler.port = self.port
self.handlers[client.fileno()] = handler
self.available -= 1
@classmethod
def run(cls, routes, restrict=None):
cls.routes = dict(routes)
if restrict is not None:
lines = BASIC % dict(accept='DROP' if restrict else 'ACCEPT')
unix_cmd('iptables -F')
for line in lines.split('\n'):
if line.strip():
unix_cmd(line)
if restrict:
unix_cmd(FILTER % 22)
AcceptHandler(1111, None, InfoHandler, 5)
for port, dest in routes.items():
port=int(port)
if restrict:
unix_cmd(FILTER % port)
if '/' in dest:
AcceptHandler(port, dest, SerialHandler)
else:
host, _, remoteport = dest.partition(':')
if remoteport:
remoteport = int(remoteport)
else:
remoteport = port
AcceptHandler(port, (host, remoteport), TcpHandler)
while True:
try:
ready, _, _ = select(cls.readers, [], [])
except Exception as e:
for r in cls.readers:
try:
select([r], [], [], 0.1)
except Exception as e:
print(r, e)
raise
for fno in ready:
cls.readers[fno]()
if __name__ == '__main__':
parser = ConfigParser()
cfgfiles = glob('/root/aputools/servercfg/%s_*.cfg' % socket.gethostname())
if len(cfgfiles) != 1:
raise ValueError('there must be one and only one single cfgfile %r' % cfgfiles)
parser.read(cfgfiles[0])
if parser.has_section('ROUTER'):
AcceptHandler.run(parser['ROUTER'])