import os
import socket
import threading
from glob import glob
from configparser import ConfigParser
from netifaces import interfaces, ifaddresses, gateways, AF_INET, AF_LINK


# keys of the NETWORK config section are renamed according to this mapping
# (see get_config); names not listed here are kept unchanged
ifname_mapping = {
    'eth0': 'enp1s0',
    'eth1': 'enp2s0',
    'eth2': 'enp3s0',
    'eth3': 'enp4s0',
}


# the variant of sudo() is chosen once, at import time
if os.geteuid():
    def sudo(cmd):
        """run a shell command prefixed with 'sudo' (effective uid is not 0)

        :param cmd: the command line, passed to the shell via os.system
        """
        os.system(f'sudo {cmd}')
else:
    def sudo(cmd):
        """run a shell command directly (effective uid is 0, no sudo needed)

        :param cmd: the command line, passed to the shell via os.system
        """
        os.system(cmd)


def get_config(section=None):
    """get content of box configuration

    The configuration is read from the single file matching
    /home/l_samenv/boxtools/cfg/<hostname>_*.cfg. The keys of the NETWORK
    section are renamed according to ifname_mapping.

    :param section: if not None, return only given section
    :return: configuration as dict: a dict of sections (each a dict) or,
        when a section is given, the dict of that section. An empty dict
        is returned when the given section does not exist.
    :raises ValueError: when not exactly one matching cfg file is found
    """
    parser = ConfigParser()
    cfgfiles = glob(f'/home/l_samenv/boxtools/cfg/{socket.gethostname()}_*.cfg')
    if len(cfgfiles) != 1:
        raise ValueError(f'there must be one and only one single cfgfile {cfgfiles!r}')
    parser.read(cfgfiles[0])
    wanted = [section] if section else parser.sections()
    try:
        result = {k: dict(parser[k]) for k in wanted}
    except KeyError:
        # only possible for an explicitly requested, missing section
        return {}
    network = result.get('NETWORK', {})
    # iterate over a copy of the keys, as the dict is modified in the loop
    for name in list(network):
        network[ifname_mapping.get(name, name)] = network.pop(name)
    if section:
        return result[section]
    return result


class MainIf:
    """state of the main network interface

    The interface is selected from the NETWORK config section: the first
    entry whose value starts with 'dhcp' or 'wan', else the alphabetically
    first entry. Call getip() to refresh the attributes below.
    """
    address = None   # AF_LINK address of the interface, as given by netifaces
    ip = None        # AF_INET address, None without carrier or address
    gateway = None   # default AF_INET gateway entry from netifaces.gateways()
    hostname = None  # result of the reverse lookup of ip, None if not (yet) known
    carrier = True   # last seen carrier state
    event = None     # event of the currently valid hostname lookup, if any

    def __init__(self):
        """select the interface name from the configuration and read its state"""
        netcfg = get_config('NETWORK')
        for name, key in netcfg.items():
            if key.startswith(('dhcp', 'wan')):
                self.name = name
                break
        else:  # take first one (alphabetically)
            # NOTE: raises IndexError when the NETWORK section is empty or missing
            self.name = sorted(netcfg)[0]
        self.getip()

    def gethostthread(self, ip, event):
        """thread function: reverse lookup of ip, stored in self.hostname

        The result is stored only when event is still the current
        self.event, i.e. no later getip() call has superseded this lookup.

        :param ip: the ip address to look up
        :param event: the event to be set when the lookup has finished
        """
        try:
            hostname = socket.gethostbyaddr(ip)[0]
            if event == self.event:
                self.hostname = hostname
        except Exception:
            # best effort: a failed lookup just leaves hostname unchanged
            pass
        finally:
            # always release a getip() call waiting for this lookup
            event.set()

    def getip(self):
        """refresh the interface state

        Reads the carrier state from /sys/class/net/<name>/carrier and, when
        it changed, runs 'dhclient <name>' (carrier gained) or
        'dhclient -r <name>' (carrier lost). Then address, ip, gateway and
        hostname are updated. The hostname lookup runs in a daemon thread,
        for which this method waits at most 0.1 s.

        :return: the ip address or None
        """
        with open(f'/sys/class/net/{self.name}/carrier') as f:
            carrier = f.read().startswith('1')
        if carrier != self.carrier:
            if carrier:
                sudo(f'dhclient {self.name}')
            else:
                sudo(f'dhclient -r {self.name}')
            self.carrier = carrier
        addrinfo = ifaddresses(self.name)
        self.address = addrinfo.get(AF_LINK, [{}])[0].get('addr')
        if carrier:
            self.ip = addrinfo.get(AF_INET, [{}])[0].get('addr')
            self.gateway = gateways().get('default', {}).get(AF_INET) if self.ip else None
        else:
            self.ip = None
            self.gateway = None
        self.hostname = None
        if self.carrier and self.ip and self.gateway:
            # a fresh event per lookup lets gethostthread detect outdated lookups
            self.event = event = threading.Event()
            threading.Thread(target=self.gethostthread, args=(self.ip, event), daemon=True).start()
            event.wait(0.1)
        else:
            self.event = None  # disable changing hostname from pending threads
        return self.ip