fix pending accepts behaviour
+ check if interface is plugged
This commit is contained in:
302
router.py
302
router.py
@ -1,6 +1,10 @@
|
||||
"""router"""
|
||||
|
||||
import sys
|
||||
import os
|
||||
import socket
|
||||
import time
|
||||
import re
|
||||
from glob import glob
|
||||
from select import select
|
||||
from serial import serial_for_url
|
||||
@ -73,8 +77,6 @@ for arg in sys.argv[1:]:
|
||||
|
||||
log = Log(loglev)
|
||||
|
||||
print(log.loggers)
|
||||
|
||||
FILTER = "iptables -i enp4s0 -p tcp -m tcp --dport %d -j ACCEPT"
|
||||
|
||||
BASIC = """
|
||||
@ -93,14 +95,60 @@ def unix_cmd(command):
|
||||
return Popen(command.split(), stdout=PIPE).communicate()[0].decode()
|
||||
|
||||
|
||||
IP_ROUTE_PAT = re.compile(r'(.* (via) .*|.*) dev ([^ ]+) ')
|
||||
|
||||
|
||||
def check_net(address):
|
||||
"""check to if interface is up where address would be routed
|
||||
|
||||
return ifname: interface is up
|
||||
return False: interface is down
|
||||
return None: rout needs via -> do not consider this
|
||||
"""
|
||||
ret = Popen(['ip', 'route', 'get', address], stdout=PIPE).communicate()[0].decode()
|
||||
match = IP_ROUTE_PAT.match(ret)
|
||||
if match:
|
||||
if match.group(2):
|
||||
return None # via
|
||||
ifname = match.group(3)
|
||||
with open(f'/sys/class/net/{ifname}/carrier') as f:
|
||||
if f.read().startswith('1'):
|
||||
return ifname
|
||||
return False
|
||||
|
||||
|
||||
class IoHandler:
|
||||
client = None
|
||||
handler = None
|
||||
service = None
|
||||
port = None
|
||||
fno = None
|
||||
|
||||
def __init__(self, client, handler):
|
||||
self.handler = handler
|
||||
def __init__(self, client, service):
|
||||
self.service = service
|
||||
self.client = client
|
||||
|
||||
def close(self):
|
||||
service, client = self.service, self.client
|
||||
fno = 'undefined'
|
||||
try:
|
||||
fno = client.fileno()
|
||||
service.readers.pop(fno, None)
|
||||
client.shutdown(socket.SHUT_RDWR)
|
||||
client.close()
|
||||
log.info('closed connection from port %s fno %s', service.port, fno)
|
||||
except Exception as e:
|
||||
log.error('ERROR closing client fno %s: %r', fno, e)
|
||||
service.handlers.pop(self.fno, None)
|
||||
self.client = None
|
||||
if self.fno:
|
||||
self.fno = None
|
||||
if service.pending_accepts and len(service.handlers) < service.maxcount:
|
||||
service.complete_accept(service.pending_accepts.pop(0))
|
||||
|
||||
|
||||
class Router(IoHandler):
|
||||
def __init__(self, client, service):
|
||||
super().__init__(client, service)
|
||||
self.sentchunks = 0
|
||||
self.sentbytes = 0
|
||||
self.rcvdchunks = 0
|
||||
@ -110,20 +158,27 @@ class IoHandler:
|
||||
try:
|
||||
data = self.client.recv(1024)
|
||||
if data:
|
||||
log.debug('< %r', data)
|
||||
if len(data) > 50:
|
||||
log.debug('< %r ... (%d)', data[:40], len(data))
|
||||
else:
|
||||
log.debug('< %r', data)
|
||||
self.write(data)
|
||||
self.sentbytes += len(data)
|
||||
self.sentchunks += 1
|
||||
return
|
||||
except Exception as e:
|
||||
log.error('ERROR in request: %r', e)
|
||||
msg = f'error in request: {e!r}'
|
||||
service.failures[service.port] = msg
|
||||
log.error(msg)
|
||||
self.close()
|
||||
self.handler.close_client(self)
|
||||
|
||||
def reply(self):
|
||||
try:
|
||||
data = self.read()
|
||||
log.debug('> %r', data)
|
||||
if len(data) > 50:
|
||||
log.debug('> %r ... (%d)', data[:40], len(data))
|
||||
else:
|
||||
log.debug('> %r', data)
|
||||
self.client.sendall(data)
|
||||
self.rcvdbytes += len(data)
|
||||
self.rcvdchunks += 1
|
||||
@ -131,19 +186,59 @@ class IoHandler:
|
||||
except ConnectionResetError:
|
||||
pass
|
||||
except Exception as e:
|
||||
log.error('ERROR in reply: %r', e)
|
||||
msg = f'error in reply: {e!r}'
|
||||
service.failures[service.port] = msg
|
||||
self.close()
|
||||
self.handler.close_client(self)
|
||||
|
||||
|
||||
class TcpHandler(IoHandler):
|
||||
def __init__(self, client, handler):
|
||||
log.info('create %r', handler.addr)
|
||||
self.socket = socket.create_connection(handler.addr, timeout=5)
|
||||
self.socket.settimeout(1)
|
||||
self.fno = self.socket.fileno()
|
||||
super().__init__(client, handler)
|
||||
log.debug('created')
|
||||
def close(self):
|
||||
self.service.readers.pop(self.fno, None)
|
||||
super().close()
|
||||
|
||||
|
||||
class TcpHandler(Router):
|
||||
socket = None
|
||||
fno = None
|
||||
|
||||
def __init__(self, client, service):
|
||||
super().__init__(client, service)
|
||||
host, port = service.addr
|
||||
self.addr_todo = socket.getaddrinfo(host, port, 0, socket.SOCK_STREAM)
|
||||
if not self.addr_todo:
|
||||
self.log.error('can not resolve %s', host)
|
||||
return
|
||||
self.ifup = False
|
||||
self.connect_first()
|
||||
|
||||
def connect_first(self):
|
||||
while self.addr_todo:
|
||||
af, socktype, proto, _, sock_adr = self.addr_todo.pop(0)
|
||||
self.ifup = check_net(sock_adr[0])
|
||||
if not self.ifup:
|
||||
continue
|
||||
try:
|
||||
self.socket = sock = socket.socket(af, socktype, proto)
|
||||
self.fno = sock.fileno()
|
||||
log.info('try connecting to %r fno %s', sock_adr, self.fno)
|
||||
sock.setblocking(False)
|
||||
try:
|
||||
sock.connect(sock_adr)
|
||||
except BlockingIOError:
|
||||
pass
|
||||
self.connect_deadline = time.time() + self.service.tmo
|
||||
self.service.pending_connects[self.fno] = self.check_connected
|
||||
return sock
|
||||
except Exception as e:
|
||||
msg = f'error in connect: {e!r}'
|
||||
service.failures[service.port] = msg
|
||||
log.error(msg)
|
||||
self.close()
|
||||
if self.ifup:
|
||||
msg = f'Timeout connecting to {self.service.addr}'
|
||||
else:
|
||||
msg = f'interface for {self.service.addr} is unplugged'
|
||||
log.error(msg)
|
||||
self.service.failures[self.service.port] = msg
|
||||
self.close()
|
||||
|
||||
def read(self):
|
||||
data = self.socket.recv(1024)
|
||||
@ -155,18 +250,36 @@ class TcpHandler(IoHandler):
|
||||
self.socket.sendall(data)
|
||||
|
||||
def close(self):
|
||||
try:
|
||||
self.socket.close()
|
||||
except Exception as e:
|
||||
log.error('ERROR in close: %r', e)
|
||||
if self.socket:
|
||||
self.service.pending_connects.pop(self.fno, None)
|
||||
try:
|
||||
self.socket.shutdown(socket.SHUT_RDWR)
|
||||
self.socket.close()
|
||||
log.info('close fno %s', self.fno)
|
||||
except Exception as e:
|
||||
log.error('ERROR in close fno %s: %r', self.fno, e)
|
||||
super().close()
|
||||
|
||||
def check_connected(self, ready):
|
||||
if ready:
|
||||
log.info('connected fno %s', self.fno)
|
||||
self.socket.setblocking(True)
|
||||
self.service.readers[self.client.fileno()] = self.request
|
||||
self.service.readers[self.fno] = self.reply
|
||||
elif time.time() > self.connect_deadline:
|
||||
if self.addr_todo:
|
||||
self.close()
|
||||
self.connect_first()
|
||||
|
||||
|
||||
class SerialHandler(IoHandler):
|
||||
def __init__(self, client, handler):
|
||||
self.serial = serial_for_url(handler.addr, timeout=10)
|
||||
class SerialHandler(Router):
|
||||
def __init__(self, client, service):
|
||||
self.serial = serial_for_url(service.addr, timeout=10)
|
||||
self.serial.timeout = None
|
||||
self.fno = self.serial.fileno()
|
||||
super().__init__(client, handler)
|
||||
super().__init__(client, service)
|
||||
service.readers[client.fileno()] = self.request
|
||||
service.readers[service.fno] = self.reply
|
||||
|
||||
def read(self):
|
||||
return self.serial.read(self.serial.in_waiting)
|
||||
@ -176,21 +289,26 @@ class SerialHandler(IoHandler):
|
||||
|
||||
def close(self):
|
||||
self.serial.close()
|
||||
super().close()
|
||||
|
||||
|
||||
class InfoHandler(IoHandler):
|
||||
def __init__(self, client, handler, loglevel=Log.INFO):
|
||||
super().__init__(client, handler)
|
||||
info = [f'{k} -> {v}' for k, v in AcceptHandler.routes.items()]
|
||||
if AcceptHandler.handlers:
|
||||
def __init__(self, client, service, loglevel=Log.INFO):
|
||||
super().__init__(client, service)
|
||||
info = [f'{k} -> {s.addr}' for k, s in service.routes.items()]
|
||||
if sum(len(s.handlers) for s in service.routes.values()):
|
||||
info.append('\nactive routings, statistics bytes/chunks')
|
||||
info.append('fno port sent received')
|
||||
for fno, h in AcceptHandler.handlers.items():
|
||||
info.append(f'{fno} {h.port} {h.sentbytes:d}/{h.sentchunks:d} {h.rcvdbytes:d}/{h.rcvdchunks:d}')
|
||||
for s in service.routes.values():
|
||||
for fno, h in s.handlers.items():
|
||||
info.append(f'{fno} {h.port} {h.sentbytes:d}/{h.sentchunks:d} {h.rcvdbytes:d}/{h.rcvdchunks:d}')
|
||||
if service.failures:
|
||||
info.append('\nport last error')
|
||||
for port, msg in service.failures.items():
|
||||
info.append(f'{port:5d} {msg}')
|
||||
info.append('')
|
||||
self.loggerkey = client.fileno()
|
||||
log.add(loglevel, self.loggerkey, self.emit)
|
||||
self.fno = None
|
||||
self.emit('\n'.join(info))
|
||||
|
||||
def read(self):
|
||||
@ -206,16 +324,16 @@ class InfoHandler(IoHandler):
|
||||
try:
|
||||
self.client.sendall(line.encode('utf-8'))
|
||||
self.client.sendall(b'\n')
|
||||
except TimeoutError:
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
class AcceptHandler:
|
||||
"""handler for routing
|
||||
class Service:
|
||||
"""service handler
|
||||
|
||||
:param: port offered port for routing
|
||||
:param: addr where to route
|
||||
:param: iocls the io handler class, currently TcpHandler or SerialHandler
|
||||
:param: addr where to route (or None for InfoHandler)
|
||||
:param: iocls the io handler class, currently TcpHandler, SerialHandler, InfoHandler
|
||||
:param: maxcount the maximal number of concurrent connections. defaults to 1
|
||||
as a side effect, if the destination is a web server, the traffic
|
||||
is serialized (only one connection at a time), which helps for
|
||||
@ -223,10 +341,13 @@ class AcceptHandler:
|
||||
reused: in this case maxcount has to be increased ...
|
||||
"""
|
||||
readers = {}
|
||||
handlers = {}
|
||||
pending_connects = {}
|
||||
routes = {}
|
||||
failures = {}
|
||||
tmo = 5
|
||||
|
||||
def __init__(self, port, addr, iocls, maxcount=None, handler_args=()):
|
||||
self.handlers = {}
|
||||
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||||
s.bind(('0.0.0.0', port))
|
||||
@ -237,52 +358,48 @@ class AcceptHandler:
|
||||
self.handler_args = handler_args
|
||||
self.readers[s.fileno()] = self.accept
|
||||
self.port = port
|
||||
if addr:
|
||||
self.routes[port] = self # not for InfoHandler
|
||||
if maxcount is None:
|
||||
maxcount = 8 if addr[1] == 80 else 1
|
||||
self.available = maxcount
|
||||
self.pending = 0
|
||||
maxcount = 12 if addr[1] == 80 else 1
|
||||
self.maxcount = maxcount
|
||||
self.pending_accepts = []
|
||||
log.info('listening at port %s for %s %r', port, iocls.__name__, addr)
|
||||
|
||||
def close_client(self, iohandler):
|
||||
self.readers.pop(iohandler.fno, None)
|
||||
try:
|
||||
client = iohandler.client
|
||||
fno = client.fileno()
|
||||
self.readers.pop(fno)
|
||||
client.close()
|
||||
log.info('closed connection from port %s fno %s', self.port, fno)
|
||||
except Exception as e:
|
||||
log.error('ERROR in close_client: %r', e)
|
||||
self.handlers.pop(fno, None)
|
||||
iohandler.client = None
|
||||
iohandler.fno = None
|
||||
self.available += 1
|
||||
if self.pending:
|
||||
self.pending -= 1
|
||||
self.accept()
|
||||
|
||||
def accept(self):
|
||||
if not self.available:
|
||||
self.pending += 1
|
||||
return
|
||||
try:
|
||||
client, addr = self.socket.accept()
|
||||
log.info('accepted %r on %r fno %s', addr, self.port, client.fileno())
|
||||
except Exception as e:
|
||||
log.error('error accepting on port %s: %r', self.port, e)
|
||||
return
|
||||
fno = client.fileno()
|
||||
log.info('accepted %r on %r fno %s', addr, self.port, fno)
|
||||
if len(self.handlers) >= self.maxcount:
|
||||
self.pending_accepts.append(client)
|
||||
log.info('%d pending client connections on port %d', len(self.pending_accepts), self.port)
|
||||
else:
|
||||
self.complete_accept(client)
|
||||
|
||||
def complete_accept(self, client):
|
||||
try:
|
||||
handler = self.iocls(client, self, *self.handler_args)
|
||||
except Exception as e:
|
||||
log.error('ERROR creating %s(%r): %r', self.iocls.__name__, self.addr, e)
|
||||
client.close()
|
||||
msg = f'error creating {self.iocls.__name__}({self.addr}): {e!r}'
|
||||
self.failures[self.port] = msg
|
||||
log.error(msg)
|
||||
try:
|
||||
client.shutdown(socket.SHUT_RDWR)
|
||||
client.close()
|
||||
except Exception:
|
||||
pass
|
||||
return
|
||||
self.readers[client.fileno()] = handler.request
|
||||
if handler.fno is not None:
|
||||
self.readers[handler.fno] = handler.reply
|
||||
handler.port = self.port
|
||||
self.handlers[client.fileno()] = handler
|
||||
self.available -= 1
|
||||
if handler.fno:
|
||||
self.failures.pop(self.port, None)
|
||||
self.handlers[handler.fno] = handler
|
||||
|
||||
@classmethod
|
||||
def run(cls, routes, restrict=None):
|
||||
cls.routes = dict(routes)
|
||||
if restrict is not None:
|
||||
lines = BASIC % dict(accept='DROP' if restrict else 'ACCEPT')
|
||||
unix_cmd('iptables -F')
|
||||
@ -292,33 +409,48 @@ class AcceptHandler:
|
||||
if restrict:
|
||||
unix_cmd(FILTER % 22)
|
||||
|
||||
AcceptHandler(1110, None, InfoHandler, 5, handler_args=(log.DEBUG,))
|
||||
AcceptHandler(1111, None, InfoHandler, 5, handler_args=(log.INFO,))
|
||||
AcceptHandler(1112, None, InfoHandler, 5, handler_args=(log.WARN,))
|
||||
Service(1110, None, InfoHandler, 5, handler_args=(log.DEBUG,))
|
||||
Service(1111, None, InfoHandler, 5, handler_args=(log.INFO,))
|
||||
Service(1112, None, InfoHandler, 5, handler_args=(log.WARN,))
|
||||
for port, dest in routes.items():
|
||||
port=int(port)
|
||||
port = int(port)
|
||||
if restrict:
|
||||
unix_cmd(FILTER % port)
|
||||
if '/' in dest:
|
||||
AcceptHandler(port, dest, SerialHandler)
|
||||
Service(port, dest, SerialHandler)
|
||||
else:
|
||||
host, _, remoteport = dest.partition(':')
|
||||
if remoteport:
|
||||
remoteport = int(remoteport)
|
||||
else:
|
||||
remoteport = port
|
||||
AcceptHandler(port, (host, remoteport), TcpHandler)
|
||||
Service(port, (host, remoteport), TcpHandler)
|
||||
while True:
|
||||
try:
|
||||
# log.debug('select %r', list(cls.readers))
|
||||
ready, _, _ = select(cls.readers, [], [])
|
||||
if cls.pending_connects:
|
||||
while True:
|
||||
ready, complete, _ = select(cls.readers, cls.pending_connects, [], 1)
|
||||
for fno in complete:
|
||||
cls.pending_connects.pop(fno)(True)
|
||||
for func in list(cls.pending_connects.values()):
|
||||
func(False) # check for connect timeouts
|
||||
if ready:
|
||||
break
|
||||
else:
|
||||
ready, complete, _ = select(cls.readers, [], [])
|
||||
# log.debug('ready %r', ready)
|
||||
except Exception as e:
|
||||
for r in cls.readers:
|
||||
for fno in cls.readers:
|
||||
try:
|
||||
select([r], [], [], 0.1)
|
||||
select([fno], [], [], 0.1)
|
||||
except Exception as e:
|
||||
log.error('%r in select([%d])', e, r)
|
||||
log.error('%r in select([%d])', e, fno)
|
||||
for fno in cls.pending_connects:
|
||||
try:
|
||||
select([], [fno], [], 0.1)
|
||||
except Exception as e:
|
||||
log.error('%r in select([],[%d])', e, fno)
|
||||
raise
|
||||
for fno in ready:
|
||||
cls.readers[fno]()
|
||||
@ -331,4 +463,4 @@ if __name__ == '__main__':
|
||||
raise ValueError('there must be one and only one single cfgfile %r' % cfgfiles)
|
||||
parser.read(cfgfiles[0])
|
||||
if parser.has_section('ROUTER'):
|
||||
AcceptHandler.run(parser['ROUTER'])
|
||||
Service.run(parser['ROUTER'])
|
||||
|
Reference in New Issue
Block a user