Files
boxtools/utils.py
Markus Zolliker ab195cc90f allow predictable network interfaces
on dual-eth-raspi:
- eth0 -> end0
- eth1 -> enx...
2025-06-27 16:02:38 +02:00

292 lines
10 KiB
Python

import os
import socket
import threading
import re
from glob import glob
from pathlib import Path
from configparser import ConfigParser
from netifaces import interfaces, ifaddresses, gateways, AF_INET, AF_LINK
from subprocess import Popen, PIPE
# location of the active nftables rule set
FIREWALL_CONF = '/etc/nftables.conf'

# chosen once at import time: an empty prefix when the effective uid is 0,
# otherwise the command line is passed through sudo
_SUDO_PREFIX = 'sudo ' if os.geteuid() else ''


def sudo(cmd):
    """Run the shell command line *cmd* with os.system.

    The command is prefixed with 'sudo' unless the process already had an
    effective uid of 0 when this module was imported.
    The exit status of the command is not returned.
    """
    os.system(f'{_SUDO_PREFIX}{cmd}')
class UndefinedConfigFile(Exception):
    """Signal that the config file of a box was not found or is ambiguous."""
def convert_cfg(file, macaddr=None):
    """Convert an old style '<hostname>_<boxid>.cfg' file to '<hostname>.cfg'.

    The new file gets a leading [BOX] section holding the box type and the
    MAC address, followed by the unchanged content of the old file.
    The old file is kept, renamed with a trailing underscore.

    :param file: pathlib.Path of the old config file
    :param macaddr: MAC address of the box; when omitted it is built from the
        box id in the file name, with a vendor prefix chosen by the box type
    :return: path of the newly written config file
    """
    hostname, boxid = file.stem.split('_')
    if macaddr is None:
        # vendor part unknown for now: placeholder is replaced further down
        macaddr = '--:--:--:' + ':'.join(boxid[i:i + 2] for i in (0, 2, 4))
    config = BoxInfo(macaddr, True).read_config()

    # guess the box type from the number of configured network interfaces
    n_interfaces = len(config.get('NETWORK', {}))
    if n_interfaces > 2:
        vendor = '00:0d:b9'
        typ = 'control-box' if config.get('DISPLAY') else 'bare-apu'
    elif n_interfaces == 2:
        vendor, typ = 'd8:3a:dd', 'dual-eth-rpi'
    elif config.get('BOX', {}).get('type') == 'ionopi':
        vendor, typ = 'e4:5f:01', 'ionopi'
    else:
        vendor, typ = 'b8:27:eb', 'ionopimax'
    macaddr = macaddr.replace('--:--:--', vendor)

    content = file.read_text()
    newfile = file.parent / f'{hostname}.cfg'
    newfile.write_text(f'[BOX]\ntype={typ}\nMAC={macaddr}\n\n{content}')
    if not file.name.endswith('_'):
        # mark the old file as converted
        file.rename(file.with_name(file.name + '_'))
    return newfile
def convert_all():
    """Run convert_cfg on every file matching '*_*.cfg*' in the relative directory 'cfg'."""
    cfgdir = Path('cfg')
    for oldcfg in cfgdir.glob('*_*.cfg*'):
        convert_cfg(oldcfg)
class BoxInfo:
    """Identity of a box (MAC address, id, hardware type) and access to its config file.

    Attributes set by the constructor:

    - macaddr: MAC address given by the caller or read from the main interface
    - id: the lowest 24 bits of the MAC address (None when it is unknown)
    - typ / hwtype: hardware type looked up in BOX_TYPES (None when unknown)
    - network_interfaces: dict <interface name> -> <MAC address>, filled only
      when the MAC address is detected from /sys/class/net
    - main_if: name of the interface the MAC address was taken from
    - change_if_names: True when an 'enp<N>...' interface was mapped to 'eth<N-1>'
    - cfgfile / oldcfg: set by read_config
    """
    TOOLS = Path('/home/l_samenv/boxtools')
    CFGDIR = TOOLS / 'cfg'
    BOX_TYPES = {
        '00:0d:b9': 'apu',  # bare apu or control box
        'b8:27:eb': 'cm3',  # compute module 3 (guess iono pi max)
        'e4:5f:01': 'rpi',  # simple raspberry pi (guess ionopi)
        'd8:3a:dd': 'cm4',  # compute module 4 (guess dual-eth-rpi)
    }

    def __init__(self, macaddr=None, relcfg=None):
        """Collect the box identity.

        :param macaddr: MAC address to use; when None it is detected from the
            interfaces listed in /sys/class/net
        :param relcfg: when true, config files are searched in the relative
            directory 'cfg' instead of the class level CFGDIR
        """
        if relcfg:
            self.CFGDIR = Path('cfg')
        self.id = None
        self.typ = None
        # defined in any case, so that reading it never raises AttributeError
        self.hwtype = None
        self.hostname = socket.gethostname()
        self.change_if_names = False
        self.cfgfile = None
        self.config = None
        self.network_interfaces = {}
        self.main_if = None
        self.oldcfg = False
        if macaddr is None:
            for ifdev in sorted(glob('/sys/class/net/*/address')):
                ifname = ifdev.split('/')[-2]
                if ifname == 'lo':  # do not consider loopback interface
                    continue
                # map enp<N>... to eth<N-1>; the whole number is used, not
                # only its first digit
                match = re.match(r'enp(\d+)', ifname)
                if match:
                    self.change_if_names = True
                    ifname = f'eth{int(match.group(1)) - 1}'
                with open(ifdev) as f:
                    self.network_interfaces[ifname] = addr = f.read().strip().lower()
                # 'enp1s0' needs no entry here: it was mapped to 'eth0' above
                if ifname in ('eth0', 'end0'):
                    macaddr = addr
                    self.main_if = ifname
        self.macaddr = macaddr
        if macaddr:
            self.id = int(''.join(macaddr.split(':')[-3:]), 16) & 0xffffff
            self.typ = self.BOX_TYPES.get(macaddr[:8].lower())
            self.hwtype = self.typ  # this is one of the values in BOX_TYPE and will not change

    def read_config(self, section=None):
        """Find and read the config file of this box.

        A file '*.cfg' in CFGDIR is selected when one of its lines starts with
        'MAC=<macaddr>'. If there is none, old style files named
        '*_<id as 6 hex digits>.cfg*' are searched and self.oldcfg tells
        whether one was found. The selected file is stored in self.cfgfile.

        :param section: name of a single section to return
        :return: without section: a dict <section name> -> dict of its items,
            or {} when no config file exists;
            with section: the dict of its items, or None when the file has no
            such section
        :raises UndefinedConfigFile: when more than one file matches, or when
            a section is requested and no file matches
        """
        cfgfiles = []
        if self.macaddr:
            marker = f'MAC={self.macaddr}'
            for file in self.CFGDIR.glob('*.cfg'):
                with open(file) as f:
                    if any(line.startswith(marker) for line in f):
                        cfgfiles.append(file)
        if not cfgfiles and self.id is not None:
            # fall back to the old naming scheme (the id must be known for this)
            cfgfiles = list(self.CFGDIR.glob(f'*_{self.id:06x}.cfg*'))
            self.oldcfg = bool(cfgfiles)
        if len(cfgfiles) > 1:
            raise UndefinedConfigFile('ambiguous cfgfile: %r' % cfgfiles)
        if not cfgfiles:
            if section:
                raise UndefinedConfigFile('no cfg file found for %s' % self.macaddr)
            return {}
        self.cfgfile = cfgfiles[0]
        parser = ConfigParser()
        parser.read(self.cfgfile)
        if section:
            if section in parser.sections():
                return dict(parser[section])
            return None
        return {k: dict(parser[k]) for k in parser.sections()}
def gethostthread(ip, event, result):
    """Thread target: reverse lookup of *ip*, storing the host name in result[0].

    Any failure is ignored on purpose and leaves *result* untouched.
    *event* is set afterwards in both cases, so a waiting caller can go on.
    """
    try:
        hostinfo = socket.gethostbyaddr(ip)
        result[0] = hostinfo[0]
    except Exception:
        pass
    event.set()
class MainIf:
    """State of the main network interface of the box.

    The interface is the first entry of the [NETWORK] config section whose
    value starts with 'dhcp' or 'wan'. When there is none, the alphabetically
    first interface name of that section is used.

    poll() refreshes the attributes address, ip, gateway and carrier.
    The host name belonging to the ip is available from hostname().
    """
    address = None
    ip = None
    gateway = None
    carrier = True
    prev_ip = None

    def __init__(self):
        """Select the interface from the box config and poll it once.

        :raises ValueError: when the config has no or an empty [NETWORK] section
        """
        netcfg = BoxInfo().read_config('NETWORK')
        if not netcfg:
            raise ValueError('no network interface configured in section [NETWORK]')
        for name, mode in netcfg.items():
            if mode.startswith(('dhcp', 'wan')):
                self.name = name
                break
        else:
            # take first one (alphabetically)
            self.name = sorted(netcfg)[0]
        self.hostnameresult = [None]
        with open(f'/sys/class/net/{self.name}/carrier_changes') as f:
            self.carrier_changes = f.read()
        self.poll()

    def hostname(self):
        """Return the host name found by the last reverse lookup, or None."""
        return self.hostnameresult[0]

    def poll(self):
        """Refresh the interface state.

        When the carrier change counter differs from the previous poll, the
        host name is forgotten and, if the carrier is up, the DHCP lease is
        released and requested again. A reverse lookup of the ip address is
        started in a daemon thread whenever the address changed.

        :return: tuple (carrier, ip, hostname, gateway)
        """
        with open(f'/sys/class/net/{self.name}/carrier') as f:
            carrier = f.read().startswith('1')
        with open(f'/sys/class/net/{self.name}/carrier_changes') as f:
            carrier_changes = f.read()
        if carrier_changes != self.carrier_changes:
            self.hostnameresult = [None]  # new instance for result
            self.carrier_changes = carrier_changes
            if carrier:
                sudo(f'dhclient -r {self.name}')
                sudo(f'dhclient {self.name}')
        self.carrier = carrier
        addrinfo = ifaddresses(self.name)
        self.address = addrinfo.get(AF_LINK, [{}])[0].get('addr')
        if carrier:
            self.ip = addrinfo.get(AF_INET, [{}])[0].get('addr')
            self.gateway = [g for g, n, _ in gateways().get(AF_INET, ()) if n == self.name]
        else:
            self.ip = None
            self.gateway = ()
        if self.carrier and self.ip and self.gateway:
            if self.ip != self.prev_ip:
                self.prev_ip = self.ip
                self.event = event = threading.Event()
                # new instance for result: a lookup thread still running for
                # a previous address keeps writing to its own, old list
                self.hostnameresult = list(self.hostnameresult)
                threading.Thread(target=gethostthread,
                                 args=(self.ip, event, self.hostnameresult), daemon=True).start()
                event.wait(0.1)
                # in case of timeout, the thread may change self.hostnameresult later,
                # but only until carrier or ip changes
        else:
            # make sure a lookup is started again as soon as we are connected
            self.prev_ip = None
        return self.carrier, self.ip, self.hostnameresult[0], self.gateway
def unix_cmd(cmd, *args, execute=None, stdout=PIPE, sudo=True):
    """Run a command, or only print it.

    :param cmd: the command, optionally followed by arguments; it is split
        on whitespace
    :param args: further arguments, each one taken as is (not split)
    :param execute: False: print the command prefixed with '>' and do not run it;
        True: print the command prefixed with '$' and run it;
        None (or any other value): run it without printing
    :param stdout: passed to Popen
    :param sudo: run the command as root. When false, a 'systemctl' command
        gets the option '--user' instead.
    :return: the decoded standard output of the command ('' when nothing was
        captured), or None when the command was only printed
    """
    command = cmd.split() + list(args)
    if command[0] == 'systemctl' and not sudo:
        command.insert(1, '--user')
    # no need to go through sudo when we are root already
    if sudo and os.geteuid():
        command.insert(0, 'sudo')
    if execute is False:
        print('>', *command)
        return None
    if execute:
        print('$', *command)
    result = Popen(command, stdout=stdout).communicate()[0]
    return (result or b'').decode()
def check_service(service, set_on=None, execute=None, as_root=True):
    """check or set state of systemd service

    set_on is None or not given: query only
    bool(set_on) is True: start and enable if not yet done
    set_on == False/0: stop and disable if not yet done
    execute: passed on to unix_cmd for the commands changing the state
       (False: print out command instead of executing)
    as_root: passed on to unix_cmd as its sudo argument

    returns the tuple (active, enabled) as found before any change
    """
    status = unix_cmd('systemctl show -p WantedBy -p ActiveState', service, sudo=as_root)
    lines = status.split('\n')
    # enabled means: the WantedBy property is not empty
    enabled = any(ln.startswith('WantedBy=') and ln.strip() != 'WantedBy=' for ln in lines)
    active = any(ln.strip() == 'ActiveState=active' for ln in lines)
    if set_on is None:
        return active, enabled
    if set_on:
        actions = (('start', not active), ('enable', not enabled))
    else:
        actions = (('stop', active), ('disable', enabled))
    for action, needed in actions:
        if needed:
            unix_cmd('systemctl', action, service, execute=execute, sudo=as_root)
    return active, enabled
def change_firewall(set_on, ports, execute=None):
    """Switch the nftables firewall on or off and set its open tcp ports.

    :param set_on: false: stop and disable the nftables service;
        true: make sure it runs with exactly the given ports open
    :param ports: the tcp ports to be opened (iterable of int); port 22 (ssh)
        is always added. The argument itself is not modified.
    :param execute: passed on to check_service / unix_cmd. With False the
        config file is not written; with any value other than None the
        change of ports is announced on stdout.
    :return: True when something had (or would have) to be changed.
        False is also returned when no config file with the expected
        pattern for the open ports was found.
    """
    ports = set(ports) | {22}  # always add ssh; work on a copy of the argument
    active, enabled = check_service('nftables')
    if not set_on:
        check_service('nftables', False, execute)
        return active or enabled

    pattern = re.compile('(tcp dport ({.*}) ct state new accept)', re.MULTILINE | re.DOTALL)
    # use the system config if it has exactly one matching rule, else the template
    for filename in FIREWALL_CONF, BoxInfo.TOOLS / 'nftables.conf':
        try:
            with open(filename) as f:
                content = f.read()
        except OSError:
            continue  # missing or unreadable: try the next candidate
        matches = pattern.findall(content)
        if len(matches) == 1:
            prevline, prevports = matches[0]
            break
    else:
        print(f'{FIREWALL_CONF} does not contain expected pattern for open ports - firewall off?')
        return False

    # parse previous port set (an empty set '{ }' yields no ports)
    prevportset = {int(p) for p in prevports[1:-1].split(',') if p.strip()}
    line = prevline.replace(prevports, '{ %s }' % ', '.join(str(p) for p in sorted(ports)))
    # NOTE(review): when the ports were taken from the template file and are
    # equal to the requested ones, FIREWALL_CONF is not rewritten - confirm
    # that this is intended
    if prevportset == ports:
        if active and enabled:
            return False
        check_service('nftables', True, execute)
        return True

    if os.geteuid() == 0:
        if execute is not None:
            print(f'change firewall ports to {ports}')
        if execute != 0:
            # write next to the target, so that the rename stays on one file system
            tmpfile = f'{FIREWALL_CONF}.tmp'
            with open(tmpfile, 'w') as f:
                f.write(content.replace(prevline, line))
            os.rename(tmpfile, FIREWALL_CONF)
        unix_cmd('systemctl restart nftables', execute=execute)
        unix_cmd('systemctl enable nftables', execute=execute)
    elif ports - prevportset:
        print('need sudo rights to modify firewall')
    return True