"""helpers for box identification, main network interface polling,
systemd services and the nftables firewall"""

import os
import socket
import threading
import re
from glob import glob
from pathlib import Path
from configparser import ConfigParser
from netifaces import interfaces, ifaddresses, gateways, AF_INET, AF_LINK
from subprocess import Popen, PIPE

FIREWALL_CONF = '/etc/nftables.conf'

# sudo(cmd) runs a shell command with root rights: the 'sudo' prefix is
# only added when the current process is not already running as root
if os.geteuid():
    def sudo(cmd):
        """run shell command <cmd> prefixed with 'sudo'"""
        os.system(f'sudo {cmd}')
else:
    def sudo(cmd):
        """run shell command <cmd> directly (already root)"""
        os.system(cmd)


class UndefinedConfigFile(Exception):
    """config file not found or ambiguous"""


class AmbiguousConfigFile(UndefinedConfigFile):
    """more than one config file matches

    subclass of UndefinedConfigFile, so that handlers for the latter
    also cover the ambiguous case
    """


def convert_cfg(file, macaddr=None):
    """convert an old style cfg file to the new style

    :param file: Path of the form <hostname>_<boxid>.cfg[_]
    :param macaddr: full MAC address or None. if None, the last 3 bytes
        are taken from <boxid> and the first 3 bytes are guessed from
        the content of the config
    :return: Path of the newly written file <hostname>.cfg

    the new file starts with a [BOX] section (type and MAC) followed by
    the unchanged old content. the old file is renamed by appending '_'
    (unless its name already ends with '_')
    """
    # split at the last underscore only, so that hostnames containing
    # underscores do not break the conversion
    hostname, boxid = file.stem.rsplit('_', 1)
    if macaddr is None:
        macaddr = f'--:--:--:{boxid[0:2]}:{boxid[2:4]}:{boxid[4:6]}'
    box = BoxInfo(macaddr, True)
    config = box.read_config()
    network = config.get('NETWORK', {})
    # guess the box type from the number of entries in the NETWORK section
    if len(network) > 2:
        mac1 = '00:0d:b9'
        typ = 'control-box' if config.get('DISPLAY') else 'bare-apu'
    elif len(network) == 2:
        mac1 = 'd8:3a:dd'
        typ = 'dual-eth-rpi'
    elif config.get('BOX', {}).get('type') == 'ionopi':
        mac1 = 'e4:5f:01'
        typ = 'ionopi'
    else:
        mac1 = 'b8:27:eb'
        typ = 'ionopimax'
    # has no effect when a full MAC address was given
    macaddr = macaddr.replace('--:--:--', mac1)
    with open(file) as f:
        content = f.read()
    newfile = file.parent / f'{hostname}.cfg'
    with open(newfile, 'w') as f:
        f.write(f'[BOX]\ntype={typ}\nMAC={macaddr}\n\n{content}')
    if not file.name.endswith('_'):
        file.rename(file.parent / (file.name + '_'))
    return newfile


def convert_all():
    """convert all files matching cfg/*_*.cfg* (relative to the cwd)"""
    for file in Path('cfg').glob('*_*.cfg*'):
        convert_cfg(file)


class BoxInfo:
    """identification of a box by its MAC address and access to its cfg file"""
    TOOLS = Path('/home/l_samenv/boxtools')
    CFGDIR = TOOLS / 'cfg'
    BOX_TYPES = {
        '00:0d:b9': 'apu',  # bare apu or control box
        'b8:27:eb': 'cm3',  # compute module 3 (guess iono pi max)
        'e4:5f:01': 'rpi',  # simple raspberry pi (guess ionopi)
        'd8:3a:dd': 'cm4',  # compute module 4 (guess dual-eth-rpi)
    }

    def __init__(self, macaddr=None, relcfg=None):
        """initialize

        :param macaddr: MAC address or None. if None, the address is
            read from /sys/class/net/<ifname>/address of the main interface
        :param relcfg: when true, use the relative directory 'cfg'
            instead of the class default CFGDIR
        """
        if relcfg:
            self.CFGDIR = Path('cfg')
        self.id = None
        self.typ = None
        self.hostname = socket.gethostname()
        self.change_if_names = False
        self.cfgfile = None
        self.config = None
        self.network_interfaces = {}
        self.main_if = None
        self.oldcfg = False
        if macaddr is None:
            for ifdev in sorted(glob('/sys/class/net/*/address')):
                ifname = ifdev.split('/')[-2]
                if ifname == 'lo':  # do not consider loopback interface
                    continue
                if ifname.startswith('enp'):
                    # map enp<N>... to eth<N-1>
                    self.change_if_names = True
                    ifname = f'eth{int(ifname[3]) - 1}'
                with open(ifdev) as f:
                    addr = f.read().strip().lower()
                self.network_interfaces[ifname] = addr
                if ifname in ('eth0', 'enp1s0', 'end0'):
                    macaddr = addr
                    self.main_if = ifname
        self.macaddr = macaddr
        if macaddr:
            # the id is the number built from the last 3 bytes of the MAC address
            self.id = int(''.join(macaddr.split(':')[-3:]), 16) & 0xffffff
            self.typ = self.BOX_TYPES.get(macaddr[:8])
        self.hwtype = self.typ  # this is one of the values in BOX_TYPE and will not change

    def read_config(self, section=None):
        """read the cfg file belonging to this box

        :param section: name of a section or None
        :return: with section given: a dict of the items of this section
            or None when the section does not exist.
            without section: a dict <section name> of dict, which is
            empty when no cfg file was found
        :raises AmbiguousConfigFile: more than one cfg file matches
        :raises UndefinedConfigFile: section given, but no cfg file found

        new style cfg files are recognized by a line starting with
        'MAC=<macaddr>'. if none is found, old style files with names
        *_<id as 6 hex digits>.cfg* are searched and self.oldcfg is set
        accordingly
        """
        cfgfiles = []
        for file in self.CFGDIR.glob('*.cfg'):
            with open(file) as f:
                if any(line.startswith(f'MAC={self.macaddr}') for line in f):
                    cfgfiles.append(file)
        # without a known id (no MAC address) the old style name can not be built
        if not cfgfiles and self.id is not None:
            cfgfiles = list(self.CFGDIR.glob(f'*_{self.id:06x}.cfg*'))
            self.oldcfg = bool(cfgfiles)
        if len(cfgfiles) > 1:
            raise AmbiguousConfigFile('ambiguous cfgfile: %r' % cfgfiles)
        if not cfgfiles:
            if section:
                raise UndefinedConfigFile('no cfg file found for %s' % self.macaddr)
            return {}
        self.cfgfile = cfgfiles[0]
        parser = ConfigParser()
        parser.read(self.cfgfile)
        if section:
            if section in parser.sections():
                return dict(parser[section])
            return None
        return {k: dict(parser[k]) for k in parser.sections()}


def gethostthread(ip, event, result):
    """thread function: reverse lookup of <ip>

    on success the host name is stored in result[0]. event is set in
    any case when the lookup is finished
    """
    try:
        result[0] = socket.gethostbyaddr(ip)[0]
    except Exception:
        # best effort: on failure result[0] is left untouched
        pass
    event.set()


class MainIf:
    """state of the main network interface"""
    address = None
    ip = None
    gateway = None
    carrier = True
    prev_ip = None

    def __init__(self):
        """determine the name of the main interface from the NETWORK section

        the first interface with a value starting with 'dhcp' or 'wan' is
        taken, or, if there is none, the alphabetically first one

        :raises UndefinedConfigFile: cfg file or NETWORK section missing
        """
        netcfg = BoxInfo().read_config('NETWORK')
        if not netcfg:
            raise UndefinedConfigFile('no NETWORK section found in cfg file')
        for name, key in netcfg.items():
            if key.startswith(('dhcp', 'wan')):
                self.name = name
                break
        else:  # take first one (alphabetically)
            self.name = sorted(netcfg)[0]
        self.hostnameresult = [None]
        with open(f'/sys/class/net/{self.name}/carrier_changes') as f:
            self.carrier_changes = f.read()
        self.poll()

    def hostname(self):
        """return the host name from the last reverse lookup (or None)"""
        return self.hostnameresult[0]

    def poll(self):
        """update the state of the interface

        :return: tuple (carrier, ip, hostname, gateway)

        when carrier_changes differs from the previous poll and carrier
        is on, the dhcp lease is released and requested again.
        a reverse lookup is started in a thread whenever the ip has changed
        """
        with open(f'/sys/class/net/{self.name}/carrier') as f:
            carrier = f.read().startswith('1')
        with open(f'/sys/class/net/{self.name}/carrier_changes') as f:
            carrier_changes = f.read()
        if carrier_changes != self.carrier_changes:
            self.hostnameresult = [None]  # new instance for result
            self.carrier_changes = carrier_changes
            if carrier:
                sudo(f'dhclient -r {self.name}')
                sudo(f'dhclient {self.name}')
        self.carrier = carrier
        addrinfo = ifaddresses(self.name)
        self.address = addrinfo.get(AF_LINK, [{}])[0].get('addr')
        if carrier:
            self.ip = addrinfo.get(AF_INET, [{}])[0].get('addr')
            self.gateway = [g for g, n, _ in gateways().get(AF_INET, ()) if n == self.name]
        else:
            self.ip = None
            self.gateway = ()
        if self.carrier and self.ip and self.gateway:
            if self.ip != self.prev_ip:
                self.prev_ip = self.ip
                self.event = event = threading.Event()
                self.hostnameresult = list(self.hostnameresult)  # new instance for result
                threading.Thread(target=gethostthread,
                                 args=(self.ip, event, self.hostnameresult),
                                 daemon=True).start()
                event.wait(0.1)
                # in case of timeout, the thread may change self.hostnameresult later,
                # but only until carrier or ip changes
        else:
            self.prev_ip = None
        return self.carrier, self.ip, self.hostnameresult[0], self.gateway


def unix_cmd(cmd, *args, execute=None, stdout=PIPE, sudo=True):
    """run a command

    :param cmd: command, split at whitespace
    :param args: additional arguments (not split)
    :param execute: None: run silently, True: print the command and run,
        False: print the command only
    :param stdout: passed to Popen
    :param sudo: when true, prefix with 'sudo'. when false and the command
        is systemctl, the option --user is inserted
    :return: the decoded output, or None if not executed
    """
    command = cmd.split() + list(args)
    if command[0] == 'systemctl' and not sudo:
        command.insert(1, '--user')
    if sudo:
        command.insert(0, 'sudo')
    if execute is False:
        print('>', *command)
        return None
    if execute:
        print('$', *command)
    result = Popen(command, stdout=stdout).communicate()[0]
    return (result or b'').decode()


def check_service(service, set_on=None, execute=None, as_root=True):
    """check or set state of systemd service

    set_on is None or not given: query only
    bool(set_on) is True: start and enable if not yet done
    set_on == False/0: stop and disable if not yet done

    execute: passed to unix_cmd for the modifying commands
       (False: print out command instead of executing)
    as_root: False: use 'systemctl --user' without sudo

    returns the tuple (active, enabled) as found before any change
    """
    result = unix_cmd('systemctl show -p WantedBy -p ActiveState', service, sudo=as_root)
    enabled = False
    active = False
    for line in result.split('\n'):
        # a non empty WantedBy list is taken as 'enabled'
        if line.startswith('WantedBy=') and line.strip() != 'WantedBy=':
            enabled = True
        elif line.strip() == 'ActiveState=active':
            active = True
    if set_on:
        if not active:
            unix_cmd('systemctl', 'start', service, execute=execute, sudo=as_root)
        if not enabled:
            unix_cmd('systemctl', 'enable', service, execute=execute, sudo=as_root)
    elif set_on is not None:
        if active:
            unix_cmd('systemctl', 'stop', service, execute=execute, sudo=as_root)
        if enabled:
            unix_cmd('systemctl', 'disable', service, execute=execute, sudo=as_root)
    return active, enabled


def change_firewall(set_on, ports, execute=None):
    """switch the firewall on or off and set the open tcp ports

    :param set_on: false: stop and disable nftables
    :param ports: set of port numbers to be opened.
        remark: 22 (ssh) is added to this set in place
    :param execute: None: execute silently, True: execute and print,
        False: print only
    :return: with set_on false: whether nftables was active or enabled.
        else: False when the expected pattern was not found or when
        nothing had to be changed, True otherwise
    """
    ports.add(22)  # always add ssh
    active, enabled = check_service('nftables')
    if not set_on:
        check_service('nftables', False, execute)
        return active or enabled

    pattern = re.compile('(tcp dport ({.*}) ct state new accept)', re.MULTILINE | re.DOTALL)

    # the installed config is tried first, then the template from the tools directory
    for filename in FIREWALL_CONF, BoxInfo.TOOLS / 'nftables.conf':
        try:
            with open(filename) as f:
                content = f.read()
        except FileNotFoundError:
            continue
        try:
            # exactly one match is expected
            ((prevline, prevports),) = pattern.findall(content)
            break
        except (TypeError, ValueError):
            pass
    else:
        print(f'{FIREWALL_CONF} does not contain expected pattern for open ports - firewall off?')
        return False
    # parse previous port set
    prevportset = {int(p) for p in prevports[1:-1].split(',')}
    portlist = ', '.join(str(p) for p in sorted(ports))
    line = prevline.replace(prevports, '{ %s }' % portlist)
    if prevportset == ports:
        if active and enabled:
            return False
        check_service('nftables', True, execute)
        return True
    if os.geteuid() == 0:
        if execute is not None:
            print(f'change firewall ports to {ports}')
        if execute != 0:
            # write to a temporary file beside the config file and rename
            # it, so that the config file is never left half written
            tmpfile = f'{FIREWALL_CONF}.tmp'
            with open(tmpfile, 'w') as f:
                f.write(content.replace(prevline, line))
            os.rename(tmpfile, FIREWALL_CONF)
        unix_cmd('systemctl restart nftables', execute=execute)
        unix_cmd('systemctl enable nftables', execute=execute)
    elif ports - prevportset:
        print('need sudo rights to modify firewall')
    return True