Files
boxtools/utils.py

229 lines
7.7 KiB
Python

import os
import socket
import threading
import re
from glob import glob
from configparser import ConfigParser
from netifaces import interfaces, ifaddresses, gateways, AF_INET, AF_LINK
from subprocess import Popen, PIPE
FIREWALL_CONF = '/etc/nftables.conf'
if os.geteuid():
def sudo(cmd):
os.system(f'sudo {cmd}')
else:
def sudo(cmd):
os.system(cmd)
class BoxInfo:
TOOLS = '/home/l_samenv/boxtools'
CFGPATH = f'{TOOLS}/cfg/%s_%06x.cfg'
BOX_TYPES = {
'00:0d:b9': 'apu', # bare apu or control box
'b8:27:eb': 'cm3', # iono pi
'd8:3a:dd': 'cm4', # dual-eth-rpi
}
def __init__(self):
self.id = None
self.typ = None
self.hostname = socket.gethostname()
self.change_if_names = False
self.cfgfile = None
self.config = None
self.macaddr = {}
self.main_if = None
for ifdev in sorted(glob('/sys/class/net/*/address')):
ifname = ifdev.split('/')[-2]
if ifname == 'lo': # do not consider loopback interface
continue
if ifname.startswith('enp'):
self.change_if_names = True
ifname = f'eth{int(ifname[3]) - 1}'
with open(ifdev) as f:
self.macaddr[ifname] = addr = f.read().strip().lower()
if ifname in ('eth0', 'enp1s0'):
self.id = int(''.join(addr.split(':')[-3:]), 16) & 0xffffff
self.typ = self.BOX_TYPES.get(addr[:8])
self.main_if = ifname
def get_macaddr(self):
return self.macaddr.get(self.main_if)
def read_config(self, section=None):
cfgfiles = glob(self.CFGPATH % ('*', self.id))
if len(cfgfiles) > 1:
raise ValueError('ambiguous cfgfile: %r' % cfgfiles)
if section and not cfgfiles:
raise ValueError('no cfg file found for %s' % self.id)
if cfgfiles:
self.cfgfile = cfgfiles[0]
else:
return {}
parser = ConfigParser()
parser.read(self.cfgfile)
if section:
if section in parser.sections():
return dict(parser[section])
return None
return {k: dict(parser[k]) for k in parser.sections()}
def gethostthread(ip, event, result):
try:
result[0] = socket.gethostbyaddr(ip)[0]
except Exception as e:
pass
event.set()
class MainIf:
address = None
ip = None
gateway = None
hostname = None
carrier = True
prev_ip = None
def __init__(self):
netcfg = BoxInfo().read_config('NETWORK')
for name, key in netcfg.items():
if key.startswith(('dhcp', 'wan')):
self.name = name
break
else:
# take first one (alphabetically)
self.name = sorted(netcfg)[0]
self.hostnameresult = [None]
with open(f'/sys/class/net/{self.name}/carrier_changes') as f:
self.carrier_changes = f.read()
self.poll()
def hostname(self):
return self.hostnameresult[0]
def poll(self):
with open(f'/sys/class/net/{self.name}/carrier') as f:
carrier = f.read().startswith('1')
with open(f'/sys/class/net/{self.name}/carrier_changes') as f:
carrier_changes = f.read()
if carrier_changes != self.carrier_changes:
self.hostnameresult = [None] # new instance for result
self.carrier_changes = carrier_changes
if carrier:
sudo(f'dhclient -r {self.name}')
sudo(f'dhclient {self.name}')
self.carrier = carrier
addrinfo = ifaddresses(self.name)
self.address = addrinfo.get(AF_LINK, [{}])[0].get('addr')
if carrier:
self.ip = addrinfo.get(AF_INET, [{}])[0].get('addr')
self.gateway = [g for g, n, _ in gateways().get(AF_INET, ()) if n == self.name]
else:
self.ip = None
self.gateway = ()
if self.carrier and self.ip and self.gateway:
if self.ip != self.prev_ip:
self.prev_ip = self.ip
self.event = event = threading.Event()
self.hostnameresult = list(self.hostnameresult) # new instance for result
threading.Thread(target=gethostthread,
args=(self.ip, event, self.hostnameresult), daemon=True).start()
event.wait(0.1)
# in case of timeout, the thread may change self.hostnameresult later,
# but only until carrier or ip changes
else:
self.prev_ip = None
return self.carrier, self.ip, self.hostnameresult[0], self.gateway
def unix_cmd(command, execute=None, stdout=PIPE):
if execute != False: # None or True
if execute:
print('$ %s' % command)
result = Popen(command.split(), stdout=stdout).communicate()[0]
return (result or b'').decode()
else:
print('> %s' % command)
def check_service(service, set_on=None, execute=None):
"""check or set state of systemd service
set_on is None or not given: query only
bool(set_on) is True: start and enable if not yet done
set_on == False/0: stop and disable if not yet done
sim: print out command instead of executing
"""
result = unix_cmd(f'systemctl show -p WantedBy -p ActiveState {service}')
enabled = False
active = False
for line in result.split('\n'):
if line.startswith('WantedBy=') and line.strip() != 'WantedBy=':
enabled = True
elif line.strip() == 'ActiveState=active':
active = True
if set_on:
if not active:
unix_cmd(f'systemctl start {service}', execute)
if not enabled:
unix_cmd(f'systemctl enable {service}', execute)
elif set_on is not None:
if active:
unix_cmd(f'systemctl stop {service}', execute)
if enabled:
unix_cmd(f'systemctl disable {service}', execute)
return active, enabled
def change_firewall(set_on, ports, execute=None):
ports.add(22) # always add ssh
active, enabled = check_service('nftables')
if not set_on:
if os.geteuid() == 0:
check_service('nftables', False, execute)
else:
print('need sudo rights to modify firewall')
return active or enabled
pattern = re.compile('(tcp dport ({.*}) ct state new accept)', re.MULTILINE | re.DOTALL)
for filename in FIREWALL_CONF, os.path.join(BoxInfo.TOOLS, 'nftables.conf'):
with open(filename) as f:
content = f.read()
try:
((prevline, prevports),) = pattern.findall(content)
break
except (TypeError, ValueError):
pass
else:
print(f'{FIREWALL_CONF} does not contain expected pattern for open ports - firewall off?')
return False
# parse previous port set
prevportset = {int(p) for p in prevports[1:-1].split(',')}
line = prevline.replace(prevports, '{ %s }' % ', '.join((str(p) for p in sorted(ports))))
if prevportset == ports:
if active and enabled:
return False
check_service('nftables', True, execute)
return True
if os.geteuid() == 0:
if execute is not None:
print(f'change firewall ports to {ports}')
if execute != 0:
with open('f{FIREWALL_CONF}.tmp', 'w') as f:
f.write(content.replace(prevline, line))
os.rename('f{FIREWALL_CONF}.tmp', FIREWALL_CONF)
unix_cmd('systemctl restart nftables', execute)
unix_cmd('systemctl enable nftables', execute)
elif ports - prevportset:
print('need sudo rights to modify firewall')
return True