292 lines
10 KiB
Python
292 lines
10 KiB
Python
import os
|
|
import socket
|
|
import threading
|
|
import re
|
|
from glob import glob
|
|
from pathlib import Path
|
|
from configparser import ConfigParser
|
|
from netifaces import interfaces, ifaddresses, gateways, AF_INET, AF_LINK
|
|
from subprocess import Popen, PIPE
|
|
|
|
|
|
# nftables rule file: change_firewall() reads it and replaces it with an updated port list
FIREWALL_CONF = '/etc/nftables.conf'
|
|
|
|
|
|
# decided once at import time: a non-zero effective uid means we are not root
_NEEDS_SUDO = bool(os.geteuid())


def sudo(cmd):
    """Run the shell command line *cmd* with root privileges.

    When the process is not running as root, the command is prefixed
    with ``sudo``; when it already is root, it is run unchanged.
    The exit status of the command is not returned.
    """
    if _NEEDS_SUDO:
        cmd = f'sudo {cmd}'
    os.system(cmd)
|
|
|
|
|
|
class UndefinedConfigFile(Exception):
    """config file not found or ambiguous"""


class AmbiguousConfigFile(Exception):
    """more than one config file matches the box

    BoxInfo.read_config raises this exception; without this definition
    that code path would fail with a NameError instead.
    """
|
|
|
|
|
|
def convert_cfg(file, macaddr=None):
    """Convert a legacy ``<hostname>_<boxid>.cfg`` file to ``<hostname>.cfg``.

    The new file starts with a ``[BOX]`` section holding the box type and
    the MAC address, followed by the unchanged content of the old file.
    The old file is kept, renamed with a trailing ``_`` (unless its name
    already ends with ``_``).

    :param file: pathlib.Path of the legacy config file; the part of the
        stem after the last underscore is taken as the box id
    :param macaddr: full MAC address of the box. If omitted, the last three
        bytes are taken from the box id and the first three bytes are
        guessed from the configuration (number of NETWORK entries,
        presence of a DISPLAY section, BOX type)
    :return: path of the newly written config file
    :raises ValueError: if the file stem contains no underscore
    """
    # split at the LAST underscore only: the hostname part may itself
    # contain underscores
    hostname, boxid = file.stem.rsplit('_', 1)
    if macaddr is None:
        # placeholder vendor prefix, replaced below once the type is guessed
        macaddr = f'--:--:--:{boxid[0:2]}:{boxid[2:4]}:{boxid[4:6]}'
    box = BoxInfo(macaddr, True)
    config = box.read_config()
    network = config.get('NETWORK', {})
    if len(network) > 2:
        mac1 = '00:0d:b9'
        typ = 'control-box' if config.get('DISPLAY') else 'bare-apu'
    elif len(network) == 2:
        mac1 = 'd8:3a:dd'
        typ = 'dual-eth-rpi'
    elif config.get('BOX', {}).get('type') == 'ionopi':
        mac1 = 'e4:5f:01'
        typ = 'ionopi'
    else:
        mac1 = 'b8:27:eb'
        typ = 'ionopimax'
    # no effect when a complete MAC address was given by the caller
    macaddr = macaddr.replace('--:--:--', mac1)
    with open(file) as f:
        content = f.read()
    newfile = file.parent / f'{hostname}.cfg'
    with open(newfile, 'w') as f:
        f.write(f'[BOX]\ntype={typ}\nMAC={macaddr}\n\n{content}')
    if not file.name.endswith('_'):
        # keep the old file, but under a name marking it as converted
        file.rename(file.parent / (file.name + '_'))
    return newfile
|
|
|
|
|
|
def convert_all():
    """Run convert_cfg on every file matching ``*_*.cfg*`` in directory ``cfg``.

    The directory is taken relative to the current working directory.
    """
    legacy_files = Path('cfg').glob('*_*.cfg*')
    for legacy in legacy_files:
        convert_cfg(legacy)
|
|
|
|
|
|
class BoxInfo:
    """Identity of a box (MAC address, id, hardware type) and its config file.

    The box is identified by the MAC address of its main network interface.
    The id is the integer value of the last three bytes of that address,
    the hardware type is looked up from the first three bytes in BOX_TYPES.
    """
    TOOLS = Path('/home/l_samenv/boxtools')
    CFGDIR = TOOLS / 'cfg'
    BOX_TYPES = {
        '00:0d:b9': 'apu',  # bare apu or control box
        'b8:27:eb': 'cm3',  # compute module 3 (guess iono pi max)
        'e4:5f:01': 'rpi',  # simple raspberry pi (guess ionopi)
        'd8:3a:dd': 'cm4',  # compute module 4 (guess dual-eth-rpi)
    }

    def __init__(self, macaddr=None, relcfg=None):
        """Collect the box information.

        :param macaddr: MAC address of the box. If None, the network
            interfaces listed in /sys/class/net are scanned and the address
            of eth0 / enp1s0 / end0 is used
        :param relcfg: if true, config files are searched in the directory
            ``cfg`` relative to the working directory instead of CFGDIR
        """
        if relcfg:
            self.CFGDIR = Path('cfg')
        self.id = None
        self.typ = None
        self.hostname = socket.gethostname()
        self.change_if_names = False
        self.cfgfile = None
        self.config = None
        self.network_interfaces = {}
        self.main_if = None
        self.oldcfg = False
        if macaddr is None:
            for ifdev in sorted(glob('/sys/class/net/*/address')):
                ifname = ifdev.split('/')[-2]
                if ifname == 'lo':  # do not consider loopback interface
                    continue
                if ifname.startswith('enp'):
                    # map enp<N>... to eth<N-1>, using the single digit after 'enp'
                    self.change_if_names = True
                    ifname = f'eth{int(ifname[3]) - 1}'
                with open(ifdev) as f:
                    self.network_interfaces[ifname] = addr = f.read().strip().lower()
                if ifname in ('eth0', 'enp1s0', 'end0'):
                    macaddr = addr
                    self.main_if = ifname
        self.macaddr = macaddr
        if macaddr:
            self.id = int(''.join(macaddr.split(':')[-3:]), 16) & 0xffffff
            # keys of BOX_TYPES are lower case: accept upper case input too
            self.typ = self.BOX_TYPES.get(macaddr[:8].lower())
        # this is one of the values in BOX_TYPES (or None) and will not change
        self.hwtype = self.typ

    def read_config(self, section=None):
        """Find and read the config file of this box.

        A file ``*.cfg`` in CFGDIR containing a line starting with
        ``MAC=<macaddr>`` is searched first. If there is none, a legacy file
        named ``*_<id as 6 hex digits>.cfg*`` is looked for and, if found,
        ``self.oldcfg`` is set. The file found is stored in ``self.cfgfile``.

        :param section: name of a single section to be returned
        :return: without section: a dict of dicts ``{section: {key: value}}``,
            empty when no config file exists. With section: the dict of this
            section, or None if the file does not contain it
        :raises AmbiguousConfigFile: when more than one file matches
        :raises UndefinedConfigFile: when a section is requested but
            no config file was found
        """
        macline = f'MAC={self.macaddr}'
        cfgfiles = []
        for file in self.CFGDIR.glob('*.cfg'):
            with open(file) as f:
                if any(line.startswith(macline) for line in f):
                    cfgfiles.append(file)
        if not cfgfiles:
            # without a known MAC address there is no id to build the
            # legacy file name from (formatting None would raise TypeError)
            if self.id is not None:
                cfgfiles = list(self.CFGDIR.glob(f'*_{self.id:06x}.cfg*'))
            self.oldcfg = bool(cfgfiles)
        if len(cfgfiles) > 1:
            raise AmbiguousConfigFile('ambiguous cfgfile: %r' % cfgfiles)
        if not cfgfiles:
            if section:
                raise UndefinedConfigFile('no cfg file found for %s' % self.macaddr)
            return {}
        self.cfgfile = cfgfiles[0]
        parser = ConfigParser()
        parser.read(self.cfgfile)
        if section:
            if section in parser.sections():
                return dict(parser[section])
            return None
        return {k: dict(parser[k]) for k in parser.sections()}
|
|
|
|
|
|
def gethostthread(ip, event, result):
    """Thread target: reverse lookup of *ip*, best effort.

    :param ip: IP address to look up
    :param event: threading.Event, set when the lookup has finished,
        successfully or not
    :param result: a list; on success its first element is replaced by
        the host name, on failure it is left untouched
    """
    try:
        result[0] = socket.gethostbyaddr(ip)[0]
    except (OSError, ValueError):
        # lookup failures (socket.herror and socket.gaierror are OSErrors) and
        # malformed addresses are expected here: keep the previous result
        pass
    finally:
        # never leave a waiter blocked, whatever happened above
        event.set()
|
|
|
|
|
|
class MainIf:
    """State of the main network interface of the box.

    The interface is chosen from the NETWORK section of the box config.
    Each call of poll() refreshes carrier, addresses and gateway, and
    starts a reverse lookup of the host name when the IP address changed.
    """
    address = None
    ip = None
    gateway = None
    carrier = True
    prev_ip = None
    # no 'hostname' class attribute: the method hostname() below would replace it

    def __init__(self):
        """Select the interface and do a first poll.

        :raises UndefinedConfigFile: when the config has no usable
            NETWORK section (or no config file is found at all)
        """
        netcfg = BoxInfo().read_config('NETWORK')
        self.name = self._select_interface(netcfg)
        self.hostnameresult = [None]
        with open(f'/sys/class/net/{self.name}/carrier_changes') as f:
            self.carrier_changes = f.read()
        self.poll()

    @staticmethod
    def _select_interface(netcfg):
        """Return the name of the main interface from the NETWORK section.

        This is the first entry whose value starts with 'dhcp' or 'wan',
        or else the alphabetically first entry.
        """
        if not netcfg:
            # read_config returns None for a missing section
            raise UndefinedConfigFile('no network interface configured in section NETWORK')
        for name, key in netcfg.items():
            if key.startswith(('dhcp', 'wan')):
                return name
        # take first one (alphabetically)
        return sorted(netcfg)[0]

    def hostname(self):
        """Return the host name found by the last lookup, or None."""
        return self.hostnameresult[0]

    def poll(self):
        """Refresh the interface state.

        When the carrier_changes counter changed and the carrier is up,
        the DHCP lease is released and requested again.

        :return: tuple (carrier, ip, hostname, gateway); ip and hostname
            may be None, gateway is an empty sequence when there is none
        """
        with open(f'/sys/class/net/{self.name}/carrier') as f:
            carrier = f.read().startswith('1')
        with open(f'/sys/class/net/{self.name}/carrier_changes') as f:
            carrier_changes = f.read()
        if carrier_changes != self.carrier_changes:
            self.hostnameresult = [None]  # new instance for result
            self.carrier_changes = carrier_changes
            if carrier:
                sudo(f'dhclient -r {self.name}')
                sudo(f'dhclient {self.name}')
        self.carrier = carrier
        addrinfo = ifaddresses(self.name)
        self.address = addrinfo.get(AF_LINK, [{}])[0].get('addr')
        if carrier:
            self.ip = addrinfo.get(AF_INET, [{}])[0].get('addr')
            self.gateway = [g for g, n, _ in gateways().get(AF_INET, ()) if n == self.name]
        else:
            self.ip = None
            self.gateway = ()
        if self.carrier and self.ip and self.gateway:
            if self.ip != self.prev_ip:
                self.prev_ip = self.ip
                self.event = event = threading.Event()
                # new instance for result: a lookup thread still running for
                # the previous address keeps writing into the old list only
                self.hostnameresult = list(self.hostnameresult)
                threading.Thread(target=gethostthread,
                                 args=(self.ip, event, self.hostnameresult),
                                 daemon=True).start()
                event.wait(0.1)
                # in case of timeout, the thread may change self.hostnameresult later,
                # but only until carrier or ip changes
        else:
            self.prev_ip = None
        return self.carrier, self.ip, self.hostnameresult[0], self.gateway
|
|
|
|
|
|
def unix_cmd(cmd, *args, execute=None, stdout=PIPE, sudo=True):
    """Run a command, or only print it.

    :param cmd: command and, optionally, arguments, split at whitespace
    :param args: further arguments, appended as they are (not split)
    :param execute: None: run silently; true value: print the command with
        a leading '$' and run it; False or 0: print the command with a
        leading '>' and do not run it
    :param stdout: passed to Popen; with the default (PIPE) the output
        is captured
    :param sudo: if true, prefix the command with 'sudo'. If false and
        the command is systemctl, '--user' is inserted instead
    :return: the captured output as str ('' if nothing was captured),
        or None if the command was not run
    """
    command = cmd.split() + list(args)
    if command[0] == 'systemctl' and not sudo:
        command.insert(1, '--user')
    if sudo:
        command.insert(0, 'sudo')
    # 0 is treated like False, as in change_firewall and check_service
    if execute is None or execute:
        if execute:
            print('$', *command)
        result = Popen(command, stdout=stdout).communicate()[0]
        return (result or b'').decode()
    print('>', *command)
    return None
|
|
|
|
|
|
def check_service(service, set_on=None, execute=None, as_root=True):
    """Query a systemd service and optionally switch it on or off.

    The state is read with 'systemctl show': the service counts as enabled
    when its WantedBy property is not empty, and as active when its
    ActiveState is 'active'.

    set_on is None or not given: query only
    bool(set_on) is True: start and enable, each only if not yet done
    set_on == False/0: stop and disable, each only if not yet done

    :param service: name of the service
    :param execute: passed on to unix_cmd for the commands changing the
        state (the query itself is always run)
    :param as_root: passed on to unix_cmd as its 'sudo' argument
    :return: tuple (active, enabled) as found BEFORE any change
    """
    status = unix_cmd('systemctl show -p WantedBy -p ActiveState', service, sudo=as_root)
    active = False
    enabled = False
    for raw in status.split('\n'):
        stripped = raw.strip()
        if raw.startswith('WantedBy='):
            if stripped != 'WantedBy=':
                enabled = True
        elif stripped == 'ActiveState=active':
            active = True
    if set_on is None:
        return active, enabled
    if set_on:
        todo = (('start', not active), ('enable', not enabled))
    else:
        todo = (('stop', active), ('disable', enabled))
    for action, needed in todo:
        if needed:
            unix_cmd('systemctl', action, service, execute=execute, sudo=as_root)
    return active, enabled
|
|
|
|
|
|
def change_firewall(set_on, ports, execute=None):
    """Switch the nftables firewall on or off and set the open tcp ports.

    :param set_on: if false, the nftables service is stopped and disabled
        and nothing else is done
    :param ports: the tcp ports to be open (integers). Port 22 (ssh) is
        always added. The argument itself is not modified
    :param execute: None: act silently; true value: act and print what is
        done; False or 0: print only. Also passed on to check_service
        and unix_cmd
    :return: with set_on false: whether the service was active or enabled.
        Otherwise False when no config file with the expected line is found
        or when ports and service state are already as requested, else True
    """
    # work on a copy: do not modify the set of the caller
    ports = set(ports)
    ports.add(22)  # always add ssh
    active, enabled = check_service('nftables')
    if not set_on:
        check_service('nftables', False, execute)
        return active or enabled

    pattern = re.compile(r'(tcp dport ({.*}) ct state new accept)', re.MULTILINE | re.DOTALL)

    for filename in FIREWALL_CONF, BoxInfo.TOOLS / 'nftables.conf':
        try:
            with open(filename) as f:
                content = f.read()
        except FileNotFoundError:
            continue  # try next candidate
        matches = pattern.findall(content)
        if len(matches) == 1:  # exactly one match is expected
            prevline, prevports = matches[0]
            break
    else:
        print(f'{FIREWALL_CONF} does not contain expected pattern for open ports - firewall off?')
        return False

    # parse previous port set (may be empty)
    prevportset = {int(p) for p in prevports[1:-1].split(',') if p.strip()}
    line = prevline.replace(prevports, '{ %s }' % ', '.join(str(p) for p in sorted(ports)))
    if prevportset == ports:
        if active and enabled:
            return False
        check_service('nftables', True, execute)
        return True
    if os.geteuid() == 0:
        if execute is not None:
            print(f'change firewall ports to {ports}')
        if execute != 0:
            # write to a temporary file next to the target and rename it,
            # so that the config file is replaced in one step
            tmpname = f'{FIREWALL_CONF}.tmp'
            with open(tmpname, 'w') as f:
                f.write(content.replace(prevline, line))
            os.rename(tmpname, FIREWALL_CONF)
        unix_cmd('systemctl restart nftables', execute=execute)
        unix_cmd('systemctl enable nftables', execute=execute)
    elif ports - prevportset:
        print('need sudo rights to modify firewall')
    return True
|