unify versions
logging was with 2 different versions new: InfoHandler and Log class covers logging when stated from commandline and remote logging via ports 1110, 1111, 1112 (DEBUG, INFO, WARN levels)
This commit is contained in:
160
router.py
160
router.py
@ -1,29 +1,79 @@
|
|||||||
import sys
|
import sys
|
||||||
import os
|
import os
|
||||||
import socket
|
import socket
|
||||||
import logging
|
|
||||||
from glob import glob
|
from glob import glob
|
||||||
from select import select
|
from select import select
|
||||||
from serial import serial_for_url
|
from serial import serial_for_url
|
||||||
from subprocess import Popen, PIPE, check_output, call, DEVNULL
|
from subprocess import Popen, PIPE, check_output, call, DEVNULL
|
||||||
from configparser import ConfigParser
|
from configparser import ConfigParser
|
||||||
|
|
||||||
|
|
||||||
|
class Log:
|
||||||
|
DEBUG = 0
|
||||||
|
INFO = 1
|
||||||
|
WARN = 2
|
||||||
|
ERROR = 3
|
||||||
|
QUIET = 4
|
||||||
|
|
||||||
|
def __init__(self, console_level):
|
||||||
|
self.loggers = {lev: {} for lev in range(self.QUIET)}
|
||||||
|
self.add(console_level, None, print)
|
||||||
|
|
||||||
|
def add(self, level, key, emit):
|
||||||
|
for lev in range(level, self.QUIET):
|
||||||
|
self.loggers[lev][key] = emit
|
||||||
|
|
||||||
|
def discard(self, key):
|
||||||
|
for lev, emitters in self.loggers.items():
|
||||||
|
emitters.pop(key, None)
|
||||||
|
|
||||||
|
def debug(self, fmt, *args):
|
||||||
|
loggers = self.loggers[self.DEBUG]
|
||||||
|
if loggers:
|
||||||
|
line = fmt % args
|
||||||
|
for emit in loggers.values():
|
||||||
|
emit(line)
|
||||||
|
|
||||||
|
def info(self, fmt, *args):
|
||||||
|
loggers = self.loggers[self.INFO]
|
||||||
|
if loggers:
|
||||||
|
line = fmt % args
|
||||||
|
for emit in loggers.values():
|
||||||
|
emit(line)
|
||||||
|
|
||||||
|
def warn(self, fmt, *args):
|
||||||
|
loggers = self.loggers[self.WARN]
|
||||||
|
if loggers:
|
||||||
|
line = fmt % args
|
||||||
|
for emit in loggers.values():
|
||||||
|
emit(line)
|
||||||
|
|
||||||
|
def error(self, fmt, *args):
|
||||||
|
loggers = self.loggers[self.ERROR]
|
||||||
|
if loggers:
|
||||||
|
line = fmt % args
|
||||||
|
for emit in loggers.values():
|
||||||
|
emit(line)
|
||||||
|
|
||||||
|
|
||||||
sim = False
|
sim = False
|
||||||
|
|
||||||
if sys.argv[0] == 'router.py': # started manually
|
if sys.argv[0] == 'router.py': # started manually
|
||||||
loglev = logging.INFO
|
loglev = Log.INFO
|
||||||
else:
|
else:
|
||||||
loglev = logging.WARNING
|
loglev = Log.WARN
|
||||||
|
|
||||||
for arg in sys.argv[1:]:
|
for arg in sys.argv[1:]:
|
||||||
if arg == '-v':
|
if arg == '-v':
|
||||||
loglev = logging.DEBUG
|
loglev = Log.DEBUG
|
||||||
elif arg == '-s':
|
elif arg == '-s':
|
||||||
sim = True
|
sim = True
|
||||||
else:
|
else:
|
||||||
raise ValueError(f'do not know {arg!r}')
|
raise ValueError(f'do not know {arg!r}')
|
||||||
|
|
||||||
logging.basicConfig(format='%(levelname)1.1s: %(message)s', level=loglev)
|
log = Log(loglev)
|
||||||
|
|
||||||
|
print(log.loggers)
|
||||||
|
|
||||||
FILTER = "iptables -i enp4s0 -p tcp -m tcp --dport %d -j ACCEPT"
|
FILTER = "iptables -i enp4s0 -p tcp -m tcp --dport %d -j ACCEPT"
|
||||||
|
|
||||||
@ -34,56 +84,66 @@ iptables -P OUTPUT ACCEPT
|
|||||||
iptables -A INPUT -i lo -j ACCEPT
|
iptables -A INPUT -i lo -j ACCEPT
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
|
||||||
def unix_cmd(command):
|
def unix_cmd(command):
|
||||||
if sim:
|
if sim:
|
||||||
logging.info('> %r' % command)
|
log.info('> %r' % command)
|
||||||
else:
|
else:
|
||||||
logging.info('$ %r' % command)
|
log.info('$ %r' % command)
|
||||||
return Popen(command.split(), stdout=PIPE).communicate()[0].decode()
|
return Popen(command.split(), stdout=PIPE).communicate()[0].decode()
|
||||||
|
|
||||||
|
|
||||||
class IoHandler:
|
class IoHandler:
|
||||||
client = None
|
client = None
|
||||||
handler = None
|
handler = None
|
||||||
|
port = None
|
||||||
|
|
||||||
def __init__(self, client, handler):
|
def __init__(self, client, handler):
|
||||||
self.handler = handler
|
self.handler = handler
|
||||||
self.client = client
|
self.client = client
|
||||||
|
self.sentchunks = 0
|
||||||
|
self.sentbytes = 0
|
||||||
|
self.rcvdchunks = 0
|
||||||
|
self.rcvdbytes = 0
|
||||||
|
|
||||||
def request(self):
|
def request(self):
|
||||||
try:
|
try:
|
||||||
data = self.client.recv(1024)
|
data = self.client.recv(1024)
|
||||||
if data:
|
if data:
|
||||||
logging.debug('< %r', data)
|
log.debug('< %r', data)
|
||||||
self.write(data)
|
self.write(data)
|
||||||
|
self.sentbytes += len(data)
|
||||||
|
self.sentchunks += 1
|
||||||
return
|
return
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logging.error('ERROR in request: %r', e)
|
log.error('ERROR in request: %r', e)
|
||||||
self.close()
|
self.close()
|
||||||
self.handler.close_client(self)
|
self.handler.close_client(self)
|
||||||
|
|
||||||
def reply(self):
|
def reply(self):
|
||||||
try:
|
try:
|
||||||
data = self.read()
|
data = self.read()
|
||||||
logging.debug('> %r', data)
|
log.debug('> %r', data)
|
||||||
self.client.sendall(data)
|
self.client.sendall(data)
|
||||||
|
self.rcvdbytes += len(data)
|
||||||
|
self.rcvdchunks += 1
|
||||||
return
|
return
|
||||||
except ConnectionResetError:
|
except ConnectionResetError:
|
||||||
pass
|
pass
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logging.error('ERROR in reply: %r', e)
|
log.error('ERROR in reply: %r', e)
|
||||||
self.close()
|
self.close()
|
||||||
self.handler.close_client(self)
|
self.handler.close_client(self)
|
||||||
|
|
||||||
|
|
||||||
class TcpHandler(IoHandler):
|
class TcpHandler(IoHandler):
|
||||||
def __init__(self, client, handler):
|
def __init__(self, client, handler):
|
||||||
logging.info('create %r', handler.addr)
|
log.info('create %r', handler.addr)
|
||||||
self.socket = socket.create_connection(handler.addr, timeout=5)
|
self.socket = socket.create_connection(handler.addr, timeout=5)
|
||||||
self.socket.settimeout(1)
|
self.socket.settimeout(1)
|
||||||
self.fno = self.socket.fileno()
|
self.fno = self.socket.fileno()
|
||||||
super().__init__(client, handler)
|
super().__init__(client, handler)
|
||||||
logging.debug('created')
|
log.debug('created')
|
||||||
|
|
||||||
def read(self):
|
def read(self):
|
||||||
data = self.socket.recv(1024)
|
data = self.socket.recv(1024)
|
||||||
@ -98,7 +158,7 @@ class TcpHandler(IoHandler):
|
|||||||
try:
|
try:
|
||||||
self.socket.close()
|
self.socket.close()
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logging.error('ERROR in close: %r', e)
|
log.error('ERROR in close: %r', e)
|
||||||
|
|
||||||
|
|
||||||
class SerialHandler(IoHandler):
|
class SerialHandler(IoHandler):
|
||||||
@ -118,21 +178,55 @@ class SerialHandler(IoHandler):
|
|||||||
self.serial.close()
|
self.serial.close()
|
||||||
|
|
||||||
|
|
||||||
|
class InfoHandler(IoHandler):
|
||||||
|
def __init__(self, client, handler, loglevel=Log.INFO):
|
||||||
|
super().__init__(client, handler)
|
||||||
|
info = [f'{k} -> {v}' for k, v in AcceptHandler.routes.items()]
|
||||||
|
if AcceptHandler.handlers:
|
||||||
|
info.append('\nactive routings, statistics bytes/chunks')
|
||||||
|
info.append('fno port sent received')
|
||||||
|
for fno, h in AcceptHandler.handlers.items():
|
||||||
|
info.append(f'{fno} {h.port} {h.sentbytes:d}/{h.sentchunks:d} {h.rcvdbytes:d}/{h.rcvdchunks:d}')
|
||||||
|
info.append('')
|
||||||
|
self.loggerkey = client.fileno()
|
||||||
|
log.add(loglevel, self.loggerkey, self.emit)
|
||||||
|
self.fno = None
|
||||||
|
self.emit('\n'.join(info))
|
||||||
|
|
||||||
|
def read(self):
|
||||||
|
return b''
|
||||||
|
|
||||||
|
def write(self, data):
|
||||||
|
pass
|
||||||
|
|
||||||
|
def close(self):
|
||||||
|
log.discard(self.loggerkey)
|
||||||
|
|
||||||
|
def emit(self, line):
|
||||||
|
try:
|
||||||
|
self.client.sendall(line.encode('utf-8'))
|
||||||
|
self.client.sendall(b'\n')
|
||||||
|
except TimeoutError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
class AcceptHandler:
|
class AcceptHandler:
|
||||||
"""handler for routing
|
"""handler for routing
|
||||||
|
|
||||||
:param: port offered port for routing
|
:param: port offered port for routing
|
||||||
:param: addr where to route
|
:param: addr where to route
|
||||||
:param: iocls the io handler class, currently TcpHandler or SerialHandler
|
:param: iocls the io handler class, currently TcpHandler or SerialHandler
|
||||||
:param: maxcount the maximal number of concurrent connections. defauls to 1
|
:param: maxcount the maximal number of concurrent connections. defaults to 1
|
||||||
as a side effect, if the destination is a web server, the traffic
|
as a side effect, if the destination is a web server, the traffic
|
||||||
are serialized (only one connection at a time), which helps for
|
is serialized (only one connection at a time), which helps for
|
||||||
some moxa device servers. might be a problem, if connections are
|
some moxa device servers. might be a problem, if connections are
|
||||||
reused: in this case maxcount has to be increased ...
|
reused: in this case maxcount has to be increased ...
|
||||||
"""
|
"""
|
||||||
readers = {}
|
readers = {}
|
||||||
|
handlers = {}
|
||||||
|
routes = {}
|
||||||
|
|
||||||
def __init__(self, port, addr, iocls, maxcount=None):
|
def __init__(self, port, addr, iocls, maxcount=None, handler_args=()):
|
||||||
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||||
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||||||
s.bind(('0.0.0.0', port))
|
s.bind(('0.0.0.0', port))
|
||||||
@ -140,22 +234,26 @@ class AcceptHandler:
|
|||||||
self.socket = s
|
self.socket = s
|
||||||
self.addr = addr
|
self.addr = addr
|
||||||
self.iocls = iocls
|
self.iocls = iocls
|
||||||
|
self.handler_args = handler_args
|
||||||
self.readers[s.fileno()] = self.accept
|
self.readers[s.fileno()] = self.accept
|
||||||
self.port = port
|
self.port = port
|
||||||
if maxcount is None:
|
if maxcount is None:
|
||||||
maxcount = 8 if addr[1] == 80 else 1
|
maxcount = 8 if addr[1] == 80 else 1
|
||||||
self.available = maxcount
|
self.available = maxcount
|
||||||
self.pending = 0
|
self.pending = 0
|
||||||
logging.info('listening at port %d for %s(%r)', port, iocls.__name__, addr)
|
log.info('listening at port %s for %s %r', port, iocls.__name__, addr)
|
||||||
|
|
||||||
def close_client(self, iohandler):
|
def close_client(self, iohandler):
|
||||||
self.readers.pop(iohandler.fno, None)
|
self.readers.pop(iohandler.fno, None)
|
||||||
try:
|
try:
|
||||||
client = iohandler.client
|
client = iohandler.client
|
||||||
self.readers.pop(client.fileno())
|
fno = client.fileno()
|
||||||
|
self.readers.pop(fno)
|
||||||
client.close()
|
client.close()
|
||||||
|
log.info('closed connection from port %s fno %s', self.port, fno)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logging.error('ERROR in close_client: %r', e)
|
log.error('ERROR in close_client: %r', e)
|
||||||
|
self.handlers.pop(fno, None)
|
||||||
iohandler.client = None
|
iohandler.client = None
|
||||||
iohandler.fno = None
|
iohandler.fno = None
|
||||||
self.available += 1
|
self.available += 1
|
||||||
@ -169,18 +267,22 @@ class AcceptHandler:
|
|||||||
return
|
return
|
||||||
try:
|
try:
|
||||||
client, addr = self.socket.accept()
|
client, addr = self.socket.accept()
|
||||||
logging.info('accepted %r on %r', addr, self.port)
|
log.info('accepted %r on %r fno %s', addr, self.port, client.fileno())
|
||||||
handler = self.iocls(client, self)
|
handler = self.iocls(client, self, *self.handler_args)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logging.error('%r creating %s(%r)', e, self.iocls.__name__, self.addr)
|
log.error('ERROR creating %s(%r): %r', self.iocls.__name__, self.addr, e)
|
||||||
client.close()
|
client.close()
|
||||||
return
|
return
|
||||||
self.readers[client.fileno()] = handler.request
|
self.readers[client.fileno()] = handler.request
|
||||||
self.readers[handler.fno] = handler.reply
|
if handler.fno is not None:
|
||||||
|
self.readers[handler.fno] = handler.reply
|
||||||
|
handler.port = self.port
|
||||||
|
self.handlers[client.fileno()] = handler
|
||||||
self.available -= 1
|
self.available -= 1
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def run(cls, routes, restrict=None):
|
def run(cls, routes, restrict=None):
|
||||||
|
cls.routes = dict(routes)
|
||||||
if restrict is not None:
|
if restrict is not None:
|
||||||
lines = BASIC % dict(accept='DROP' if restrict else 'ACCEPT')
|
lines = BASIC % dict(accept='DROP' if restrict else 'ACCEPT')
|
||||||
unix_cmd('iptables -F')
|
unix_cmd('iptables -F')
|
||||||
@ -190,6 +292,9 @@ class AcceptHandler:
|
|||||||
if restrict:
|
if restrict:
|
||||||
unix_cmd(FILTER % 22)
|
unix_cmd(FILTER % 22)
|
||||||
|
|
||||||
|
AcceptHandler(1110, None, InfoHandler, 5, handler_args=(log.DEBUG,))
|
||||||
|
AcceptHandler(1111, None, InfoHandler, 5, handler_args=(log.INFO,))
|
||||||
|
AcceptHandler(1112, None, InfoHandler, 5, handler_args=(log.WARN,))
|
||||||
for port, dest in routes.items():
|
for port, dest in routes.items():
|
||||||
port=int(port)
|
port=int(port)
|
||||||
if restrict:
|
if restrict:
|
||||||
@ -205,21 +310,20 @@ class AcceptHandler:
|
|||||||
AcceptHandler(port, (host, remoteport), TcpHandler)
|
AcceptHandler(port, (host, remoteport), TcpHandler)
|
||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
# logging.debug('select %r', list(cls.readers))
|
# log.debug('select %r', list(cls.readers))
|
||||||
ready, _, _ = select(cls.readers, [], [])
|
ready, _, _ = select(cls.readers, [], [])
|
||||||
# logging.debug('ready %r', ready)
|
# log.debug('ready %r', ready)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
for r in cls.readers:
|
for r in cls.readers:
|
||||||
try:
|
try:
|
||||||
select([r], [], [], 0.1)
|
select([r], [], [], 0.1)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logging.error('%r in select([%d])', e, r)
|
log.error('%r in select([%d])', e, r)
|
||||||
raise
|
raise
|
||||||
for fno in ready:
|
for fno in ready:
|
||||||
cls.readers[fno]()
|
cls.readers[fno]()
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
parser = ConfigParser()
|
parser = ConfigParser()
|
||||||
cfgfiles = glob('/root/aputools/servercfg/%s_*.cfg' % socket.gethostname())
|
cfgfiles = glob('/root/aputools/servercfg/%s_*.cfg' % socket.gethostname())
|
||||||
|
Reference in New Issue
Block a user