479 lines
15 KiB
Python
479 lines
15 KiB
Python
"""router"""
|
|
|
|
import sys
|
|
import os
|
|
import socket
|
|
import time
|
|
import re
|
|
from glob import glob
|
|
from select import select
|
|
from serial import serial_for_url
|
|
from subprocess import Popen, PIPE, check_output, call, DEVNULL
|
|
from utils import BoxInfo, change_firewall
|
|
|
|
class Log:
|
|
DEBUG = 0
|
|
INFO = 1
|
|
WARN = 2
|
|
ERROR = 3
|
|
QUIET = 4
|
|
|
|
def __init__(self, console_level):
|
|
self.loggers = {lev: {} for lev in range(self.QUIET)}
|
|
self.add(console_level, None, print)
|
|
|
|
def add(self, level, key, emit):
|
|
for lev in range(level, self.QUIET):
|
|
self.loggers[lev][key] = emit
|
|
|
|
def discard(self, key):
|
|
for lev, emitters in self.loggers.items():
|
|
emitters.pop(key, None)
|
|
|
|
def debug(self, fmt, *args):
|
|
loggers = self.loggers[self.DEBUG]
|
|
if loggers:
|
|
line = fmt % args
|
|
for emit in loggers.values():
|
|
emit(line)
|
|
|
|
def info(self, fmt, *args):
|
|
loggers = self.loggers[self.INFO]
|
|
if loggers:
|
|
line = fmt % args
|
|
for emit in loggers.values():
|
|
emit(line)
|
|
|
|
def warn(self, fmt, *args):
|
|
loggers = self.loggers[self.WARN]
|
|
if loggers:
|
|
line = fmt % args
|
|
for emit in loggers.values():
|
|
emit(line)
|
|
|
|
def error(self, fmt, *args):
|
|
loggers = self.loggers[self.ERROR]
|
|
if loggers:
|
|
line = fmt % args
|
|
for emit in loggers.values():
|
|
emit(line)
|
|
|
|
|
|
sim = False
|
|
|
|
if sys.argv[0] == 'router.py': # started manually
|
|
loglev = Log.INFO
|
|
else:
|
|
loglev = Log.WARN
|
|
|
|
for arg in sys.argv[1:]:
|
|
if arg == '-v':
|
|
loglev = Log.DEBUG
|
|
elif arg == '-s':
|
|
sim = True
|
|
else:
|
|
raise ValueError(f'do not know {arg!r}')
|
|
|
|
log = Log(loglev)
|
|
|
|
FILTER = "iptables -i eth0 -p tcp -m tcp --dport %d -j ACCEPT"
|
|
|
|
BASIC = """
|
|
iptables -P INPUT %(accept)s
|
|
iptables -P FORWARD %(accept)s
|
|
iptables -P OUTPUT ACCEPT
|
|
iptables -A INPUT -i lo -j ACCEPT
|
|
"""
|
|
|
|
|
|
def unix_cmd(command):
|
|
if sim:
|
|
log.info('> %r' % command)
|
|
else:
|
|
log.info('$ %r' % command)
|
|
return Popen(command.split(), stdout=PIPE).communicate()[0].decode()
|
|
|
|
|
|
IP_ROUTE_PAT = re.compile(r'(.* (via) .*|.*) dev ([^ ]+) ')
|
|
|
|
|
|
def check_net(address):
|
|
"""check to if interface is up where address would be routed
|
|
|
|
return ifname: interface is up
|
|
return False: interface is down
|
|
return None: rout needs via -> do not consider this
|
|
"""
|
|
ret = Popen(['ip', 'route', 'get', address], stdout=PIPE).communicate()[0].decode()
|
|
match = IP_ROUTE_PAT.match(ret)
|
|
if match:
|
|
if match.group(2):
|
|
return None # via
|
|
ifname = match.group(3)
|
|
with open(f'/sys/class/net/{ifname}/carrier') as f:
|
|
if f.read().startswith('1'):
|
|
return ifname
|
|
return False
|
|
|
|
|
|
class IoHandler:
|
|
client = None
|
|
service = None
|
|
port = None
|
|
fno = None
|
|
|
|
def __init__(self, client, service):
|
|
self.service = service
|
|
self.client = client
|
|
|
|
def close(self):
|
|
service, client = self.service, self.client
|
|
fno = 'undefined'
|
|
try:
|
|
fno = client.fileno()
|
|
service.readers.pop(fno, None)
|
|
client.shutdown(socket.SHUT_RDWR)
|
|
client.close()
|
|
log.info('closed connection from port %s fno %s', service.port, fno)
|
|
except Exception as e:
|
|
log.error('ERROR closing client fno %s: %r', fno, e)
|
|
service.handlers.pop(self.fno, None)
|
|
self.client = None
|
|
if self.fno:
|
|
self.fno = None
|
|
if service.pending_accepts and len(service.handlers) < service.maxcount:
|
|
service.complete_accept(service.pending_accepts.pop(0))
|
|
|
|
|
|
class Router(IoHandler):
|
|
def __init__(self, client, service):
|
|
super().__init__(client, service)
|
|
self.sentchunks = 0
|
|
self.sentbytes = 0
|
|
self.rcvdchunks = 0
|
|
self.rcvdbytes = 0
|
|
|
|
def request(self):
|
|
try:
|
|
data = self.client.recv(1024)
|
|
if data:
|
|
if len(data) > 50:
|
|
log.debug('< %r ... (%d)', data[:40], len(data))
|
|
else:
|
|
log.debug('< %r', data)
|
|
self.write(data)
|
|
self.sentbytes += len(data)
|
|
self.sentchunks += 1
|
|
return
|
|
except Exception as e:
|
|
msg = f'error in request: {e!r}'
|
|
self.service.failures[self.service.port] = msg
|
|
Service.deadline = time.time() + 10
|
|
Service.reopen.append(self.service)
|
|
self.service.close()
|
|
log.error(msg)
|
|
self.close()
|
|
|
|
def reply(self):
|
|
try:
|
|
data = self.read()
|
|
if len(data) > 50:
|
|
log.debug('> %r ... (%d)', data[:40], len(data))
|
|
else:
|
|
log.debug('> %r', data)
|
|
self.client.sendall(data)
|
|
self.rcvdbytes += len(data)
|
|
self.rcvdchunks += 1
|
|
return
|
|
except ConnectionResetError:
|
|
pass
|
|
except Exception as e:
|
|
msg = f'error in reply: {e!r}'
|
|
self.service.failures[self.service.port] = msg
|
|
self.close()
|
|
|
|
def close(self):
|
|
self.service.readers.pop(self.fno, None)
|
|
super().close()
|
|
|
|
|
|
class TcpHandler(Router):
|
|
socket = None
|
|
fno = None
|
|
|
|
def __init__(self, client, service):
|
|
super().__init__(client, service)
|
|
host, port = service.addr
|
|
self.addr_todo = socket.getaddrinfo(host, port, 0, socket.SOCK_STREAM)
|
|
if not self.addr_todo:
|
|
self.log.error('can not resolve %s', host)
|
|
return
|
|
self.ifup = False
|
|
self.connect_first()
|
|
|
|
def connect_first(self):
|
|
while self.addr_todo:
|
|
af, socktype, proto, _, sock_adr = self.addr_todo.pop(0)
|
|
self.ifup = check_net(sock_adr[0])
|
|
if not self.ifup:
|
|
continue
|
|
try:
|
|
self.socket = sock = socket.socket(af, socktype, proto)
|
|
self.fno = sock.fileno()
|
|
log.info('try connecting to %r fno %s', sock_adr, self.fno)
|
|
sock.setblocking(False)
|
|
try:
|
|
sock.connect(sock_adr)
|
|
except BlockingIOError:
|
|
pass
|
|
self.connect_deadline = time.time() + self.service.tmo
|
|
self.service.pending_connects[self.fno] = self.check_connected
|
|
return sock
|
|
except Exception as e:
|
|
msg = f'error in connect: {e!r}'
|
|
service.failures[service.port] = msg
|
|
log.error(msg)
|
|
self.close()
|
|
if self.ifup:
|
|
msg = f'Timeout connecting to {self.service.addr}'
|
|
else:
|
|
msg = f'interface for {self.service.addr} is unplugged'
|
|
log.error(msg)
|
|
self.service.failures[self.service.port] = msg
|
|
self.close()
|
|
|
|
def read(self):
|
|
data = self.socket.recv(1024)
|
|
if not data:
|
|
raise ConnectionResetError('disconnected')
|
|
return data
|
|
|
|
def write(self, data):
|
|
self.socket.sendall(data)
|
|
|
|
def close(self):
|
|
if self.socket:
|
|
self.service.pending_connects.pop(self.fno, None)
|
|
try:
|
|
self.socket.shutdown(socket.SHUT_RDWR)
|
|
self.socket.close()
|
|
log.info('close fno %s', self.fno)
|
|
except Exception as e:
|
|
log.error('ERROR in close fno %s: %r', self.fno, e)
|
|
super().close()
|
|
|
|
def check_connected(self, ready):
|
|
if ready:
|
|
log.info('connected fno %s', self.fno)
|
|
self.socket.setblocking(True)
|
|
self.service.readers[self.client.fileno()] = self.request
|
|
self.service.readers[self.fno] = self.reply
|
|
elif time.time() > self.connect_deadline:
|
|
if self.addr_todo:
|
|
self.close()
|
|
self.connect_first()
|
|
|
|
|
|
class SerialHandler(Router):
|
|
def __init__(self, client, service):
|
|
self.serial = serial_for_url(service.addr, timeout=10)
|
|
self.serial.timeout = None
|
|
self.fno = self.serial.fileno()
|
|
super().__init__(client, service)
|
|
service.readers[client.fileno()] = self.request
|
|
service.readers[self.fno] = self.reply
|
|
|
|
def read(self):
|
|
return self.serial.read(self.serial.in_waiting)
|
|
|
|
def write(self, data):
|
|
self.serial.write(data)
|
|
|
|
def close(self):
|
|
self.serial.close()
|
|
super().close()
|
|
|
|
|
|
class InfoHandler(IoHandler):
|
|
def __init__(self, client, service, loglevel=Log.INFO):
|
|
super().__init__(client, service)
|
|
info = [f'{k} -> {s.addr}' for k, s in service.routes.items()]
|
|
if sum(len(s.handlers) for s in service.routes.values()):
|
|
info.append('\nactive routings, statistics bytes/chunks')
|
|
info.append('fno port sent received')
|
|
for s in service.routes.values():
|
|
for fno, h in s.handlers.items():
|
|
info.append(f'{fno} {h.port} {h.sentbytes:d}/{h.sentchunks:d} {h.rcvdbytes:d}/{h.rcvdchunks:d}')
|
|
if service.failures:
|
|
info.append('\nport last error')
|
|
for port, msg in service.failures.items():
|
|
info.append(f'{port:5d} {msg}')
|
|
info.append('')
|
|
self.loggerkey = client.fileno()
|
|
log.add(loglevel, self.loggerkey, self.emit)
|
|
self.emit('\n'.join(info))
|
|
|
|
def read(self):
|
|
return b''
|
|
|
|
def write(self, data):
|
|
pass
|
|
|
|
def close(self):
|
|
log.discard(self.loggerkey)
|
|
|
|
def emit(self, line):
|
|
try:
|
|
self.client.sendall(line.encode('utf-8'))
|
|
self.client.sendall(b'\n')
|
|
except Exception:
|
|
pass
|
|
|
|
|
|
class Service:
|
|
"""service handler
|
|
|
|
:param: port offered port for routing
|
|
:param: addr where to route (or None for InfoHandler)
|
|
:param: iocls the io handler class, currently TcpHandler, SerialHandler, InfoHandler
|
|
:param: maxcount the maximal number of concurrent connections. defaults to 1
|
|
as a side effect, if the destination is a web server, the traffic
|
|
is serialized (only one connection at a time), which helps for
|
|
some moxa device servers. might be a problem, if connections are
|
|
reused: in this case maxcount has to be increased ...
|
|
"""
|
|
readers = {}
|
|
pending_connects = {}
|
|
routes = {}
|
|
failures = {}
|
|
firewall_ports = set()
|
|
tmo = 5
|
|
reopen = []
|
|
deadline = None
|
|
|
|
def __init__(self, port, addr, iocls, maxcount=None, handler_args=()):
|
|
self.handlers = {}
|
|
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
|
s.bind(('0.0.0.0', port))
|
|
s.listen()
|
|
self.socket = s
|
|
self.addr = addr
|
|
self.iocls = iocls
|
|
self.handler_args = handler_args
|
|
self.readers[s.fileno()] = self.accept
|
|
self.port = port
|
|
self.firewall_ports.add(port)
|
|
if addr:
|
|
self.routes[port] = self # not for InfoHandler
|
|
if maxcount is None:
|
|
maxcount = 12 if addr[1] == 80 else 1
|
|
self.maxcount = maxcount
|
|
self.pending_accepts = []
|
|
log.info('listening at port %s for %s %r', port, iocls.__name__, addr)
|
|
|
|
def accept(self):
|
|
try:
|
|
client, addr = self.socket.accept()
|
|
except Exception as e:
|
|
log.error('error accepting on port %s: %r', self.port, e)
|
|
return
|
|
fno = client.fileno()
|
|
log.info('accepted %r on %r fno %s', addr, self.port, fno)
|
|
if len(self.handlers) >= self.maxcount:
|
|
self.pending_accepts.append(client)
|
|
log.info('%d pending client connections on port %d', len(self.pending_accepts), self.port)
|
|
else:
|
|
self.complete_accept(client)
|
|
|
|
def complete_accept(self, client):
|
|
try:
|
|
handler = self.iocls(client, self, *self.handler_args)
|
|
except Exception as e:
|
|
msg = f'error creating {self.iocls.__name__}({self.addr}): {e!r}'
|
|
self.failures[self.port] = msg
|
|
log.error(msg)
|
|
try:
|
|
client.shutdown(socket.SHUT_RDWR)
|
|
client.close()
|
|
except Exception:
|
|
pass
|
|
return
|
|
handler.port = self.port
|
|
if handler.fno:
|
|
self.failures.pop(self.port, None)
|
|
self.handlers[handler.fno] = handler
|
|
|
|
def close(self):
|
|
s = self.socket
|
|
self.readers.pop(s.fileno(), None)
|
|
s.close()
|
|
|
|
def clone(self):
|
|
return type(self)(self.port, self.addr, self.iocls)
|
|
|
|
@classmethod
|
|
def check_deadline(cls):
|
|
if cls.deadline and time.time() > cls.deadline:
|
|
cls.deadline = None
|
|
while cls.reopen:
|
|
cls.reopen.pop().clone()
|
|
|
|
@classmethod
|
|
def run(cls, routes):
|
|
firewall = routes.pop('firewall', None)
|
|
firewall_on = firewall is not None
|
|
if firewall_on:
|
|
cls.firewall_ports = set(int(r) for r in firewall.split(',') if r.strip())
|
|
Service(1110, None, InfoHandler, 5, handler_args=(log.DEBUG,))
|
|
Service(1111, None, InfoHandler, 5, handler_args=(log.INFO,))
|
|
Service(1112, None, InfoHandler, 5, handler_args=(log.WARN,))
|
|
for port, dest in routes.items():
|
|
port = int(port)
|
|
if '/' in dest:
|
|
Service(port, dest, SerialHandler)
|
|
else:
|
|
host, _, remoteport = dest.partition(':')
|
|
if remoteport:
|
|
remoteport = int(remoteport)
|
|
else:
|
|
remoteport = port
|
|
Service(port, (host, remoteport), TcpHandler)
|
|
change_firewall(firewall_on, cls.firewall_ports)
|
|
while True:
|
|
try:
|
|
# log.debug('select %r', list(cls.readers))
|
|
if cls.pending_connects:
|
|
while True:
|
|
ready, complete, _ = select(cls.readers, cls.pending_connects, [], 1)
|
|
for fno in complete:
|
|
cls.pending_connects.pop(fno)(True)
|
|
for func in list(cls.pending_connects.values()):
|
|
func(False) # check for connect timeouts
|
|
if ready:
|
|
break
|
|
else:
|
|
ready, complete, _ = select(cls.readers, [], [], 1)
|
|
cls.check_deadline()
|
|
# log.debug('ready %r', ready)
|
|
except Exception as e:
|
|
for fno in cls.readers:
|
|
try:
|
|
select([fno], [], [], 0.1)
|
|
except Exception as e:
|
|
log.error('%r in select([%d])', e, fno)
|
|
for fno in cls.pending_connects:
|
|
try:
|
|
select([], [fno], [], 0.1)
|
|
except Exception as e:
|
|
log.error('%r in select([],[%d])', e, fno)
|
|
raise
|
|
for fno in ready:
|
|
cls.readers.get(fno, int)() # int() -> dummy function
|
|
|
|
|
|
if __name__ == '__main__':
|
|
routercfg = BoxInfo().read_config('ROUTER')
|
|
if routercfg:
|
|
Service.run(routercfg)
|