import os import socket import threading from glob import glob from configparser import ConfigParser from netifaces import interfaces, ifaddresses, gateways, AF_INET, AF_LINK if os.geteuid(): def sudo(cmd): os.system(f'sudo {cmd}') else: def sudo(cmd): os.system(cmd) class BoxInfo: TOOLS = '/home/l_samenv/boxtools' CFGPATH = f'{TOOLS}/cfg/%s_%06x.cfg' BOX_TYPES = { '00:0d:b9': 'apu', # bare apu or control box 'b8:27:eb': 'cm3', # iono pi 'd8:3a:dd': 'cm4', # dual-eth-rpi } def __init__(self): self.id = None self.typ = None self.hostname = socket.gethostname() self.change_if_names = False self.cfgfile = None self.config = None self.macaddr = {} self.main_if = None for ifdev in sorted(glob('/sys/class/net/*/address')): ifname = ifdev.split('/')[-2] if ifname == 'lo': # do not consider loopback interface continue if ifname.startswith('enp'): self.change_if_names = True ifname = f'eth{int(ifname[3]) - 1}' with open(ifdev) as f: self.macaddr[ifname] = addr = f.read().strip().lower() if ifname in ('eth0', 'enp1s0'): self.id = int(''.join(addr.split(':')[-3:]), 16) & 0xffffff self.typ = self.BOX_TYPES.get(addr[:8]) self.main_if = ifname def get_macaddr(self): return self.macaddr.get(self.main_if) def read_config(self, section=None): cfgfiles = glob(self.CFGPATH % ('*', self.id)) if len(cfgfiles) != 1: raise ValueError('there must be one and only one single cfgfile %r' % cfgfiles) if cfgfiles: self.cfgfile = cfgfiles[0] parser = ConfigParser() parser.read(self.cfgfile) if section: if section in parser.sections(): return dict(parser[section]) return None return {k: dict(parser[k]) for k in parser.sections()} def gethostthread(ip, event, result): try: result[0] = socket.gethostbyaddr(ip)[0] except Exception as e: pass event.set() class MainIf: address = None ip = None gateway = None hostname = None carrier = True prev_ip = None def __init__(self): netcfg = BoxInfo().read_config('NETWORK') for name, key in netcfg.items(): if key.startswith(('dhcp', 'wan')): self.name = name break else: # take first one (alphabetically) self.name = sorted(netcfg)[0] self.hostnameresult = [None] with open(f'/sys/class/net/{self.name}/carrier_changes') as f: self.carrier_changes = f.read() self.poll() def hostname(self): return self.hostnameresult[0] def poll(self): with open(f'/sys/class/net/{self.name}/carrier') as f: carrier = f.read().startswith('1') with open(f'/sys/class/net/{self.name}/carrier_changes') as f: carrier_changes = f.read() if carrier_changes != self.carrier_changes: self.hostnameresult = [None] # new instance for result self.carrier_changes = carrier_changes if carrier: sudo(f'dhclient -r {self.name}') sudo(f'dhclient {self.name}') self.carrier = carrier addrinfo = ifaddresses(self.name) self.address = addrinfo.get(AF_LINK, [{}])[0].get('addr') if carrier: self.ip = addrinfo.get(AF_INET, [{}])[0].get('addr') self.gateway = [g for g, n, _ in gateways().get(AF_INET, ()) if n == self.name] else: self.ip = None self.gateway = () if self.carrier and self.ip and self.gateway: if self.ip != self.prev_ip: self.prev_ip = self.ip self.event = event = threading.Event() self.hostnameresult = list(self.hostnameresult) # new instance for result threading.Thread(target=gethostthread, args=(self.ip, event, self.hostnameresult), daemon=True).start() event.wait(0.1) # in case of timeout, the thread may change self.hostnameresult later, # but only until carrier or ip changes else: self.prev_ip = None return self.carrier, self.ip, self.hostnameresult[0], self.gateway