130 lines
4.3 KiB
Python
130 lines
4.3 KiB
Python
import os
|
|
import socket
|
|
import threading
|
|
from glob import glob
|
|
from configparser import ConfigParser
|
|
from netifaces import interfaces, ifaddresses, gateways, AF_INET, AF_LINK
|
|
|
|
|
|
if os.geteuid():
|
|
def sudo(cmd):
|
|
os.system(f'sudo {cmd}')
|
|
else:
|
|
def sudo(cmd):
|
|
os.system(cmd)
|
|
|
|
|
|
class BoxInfo:
|
|
TOOLS = '/home/l_samenv/boxtools'
|
|
CFGPATH = f'{TOOLS}/cfg/%s_%06x.cfg'
|
|
BOX_TYPES = {
|
|
'00:0d:b9': 'apu', # bare apu or control box
|
|
'b8:27:eb': 'cm3', # iono pi
|
|
'd8:3a:dd': 'cm4', # dual-eth-rpi
|
|
}
|
|
|
|
def __init__(self):
|
|
self.id = None
|
|
self.typ = None
|
|
self.hostname = socket.gethostname()
|
|
self.change_if_names = False
|
|
self.cfgfile = None
|
|
self.config = None
|
|
self.macaddr = {}
|
|
self.main_if = None
|
|
for ifdev in glob('/sys/class/net/*/address'):
|
|
ifname = ifdev.split('/')[-2]
|
|
if ifname == 'lo': # do not consider loopback interface
|
|
continue
|
|
if ifname.startswith('enp'):
|
|
self.change_if_names = True
|
|
ifname = f'eth{int(ifname[3]) - 1}'
|
|
print(ifname)
|
|
with open(ifdev) as f:
|
|
self.macaddr[ifname] = addr = f.read().strip().lower()
|
|
if ifname in ('eth0', 'enp1s0'):
|
|
self.id = int(''.join(addr.split(':')[-3:]), 16) & 0xffffff
|
|
self.typ = self.BOX_TYPES.get(addr[:8])
|
|
self.main_if = ifname
|
|
|
|
def get_macaddr(self):
|
|
return self.macaddr.get(self.main_if)
|
|
|
|
def read_config(self):
|
|
cfgfiles = glob(self.CFGPATH % ('*', self.id))
|
|
if len(cfgfiles) != 1:
|
|
raise ValueError('there must be one and only one single cfgfile %r' % cfgfiles)
|
|
if cfgfiles:
|
|
self.cfgfile = cfgfiles[0]
|
|
parser = ConfigParser()
|
|
parser.read(self.cfgfile)
|
|
return {k: dict(parser[k]) for k in parser.sections()}
|
|
|
|
|
|
def gethostthread(ip, event, result):
|
|
try:
|
|
result[0] = socket.gethostbyaddr(ip)[0]
|
|
except Exception as e:
|
|
pass
|
|
event.set()
|
|
|
|
|
|
class MainIf:
|
|
address = None
|
|
ip = None
|
|
gateway = None
|
|
hostname = None
|
|
carrier = True
|
|
prev_ip = None
|
|
|
|
def __init__(self):
|
|
netcfg = get_config('NETWORK')
|
|
for name, key in netcfg.items():
|
|
if key.startswith(('dhcp', 'wan')):
|
|
self.name = name
|
|
break
|
|
else:
|
|
# take first one (alphabetically)
|
|
self.name = sorted(netcfg)[0]
|
|
self.hostnameresult = [None]
|
|
with open(f'/sys/class/net/{self.name}/carrier_changes') as f:
|
|
self.carrier_changes = f.read()
|
|
self.poll()
|
|
|
|
def hostname(self):
|
|
return self.hostnameresult[0]
|
|
|
|
def poll(self):
|
|
with open(f'/sys/class/net/{self.name}/carrier') as f:
|
|
carrier = f.read().startswith('1')
|
|
with open(f'/sys/class/net/{self.name}/carrier_changes') as f:
|
|
carrier_changes = f.read()
|
|
if carrier_changes != self.carrier_changes:
|
|
self.hostnameresult = [None] # new instance for result
|
|
self.carrier_changes = carrier_changes
|
|
if carrier:
|
|
sudo(f'dhclient -r {self.name}')
|
|
sudo(f'dhclient {self.name}')
|
|
self.carrier = carrier
|
|
addrinfo = ifaddresses(self.name)
|
|
self.address = addrinfo.get(AF_LINK, [{}])[0].get('addr')
|
|
if carrier:
|
|
self.ip = addrinfo.get(AF_INET, [{}])[0].get('addr')
|
|
self.gateway = [g for g, n, _ in gateways().get(AF_INET, ()) if n == self.name]
|
|
else:
|
|
self.ip = None
|
|
self.gateway = ()
|
|
if self.carrier and self.ip and self.gateway:
|
|
if self.ip != self.prev_ip:
|
|
self.prev_ip = self.ip
|
|
self.event = event = threading.Event()
|
|
self.hostnameresult = list(self.hostnameresult) # new instance for result
|
|
threading.Thread(target=gethostthread,
|
|
args=(self.ip, event, self.hostnameresult), daemon=True).start()
|
|
event.wait(0.1)
|
|
# in case of timeout, the thread may change self.hostnameresult later,
|
|
# but only until carrier or ip changes
|
|
else:
|
|
self.prev_ip = None
|
|
return self.carrier, self.ip, self.hostnameresult[0], self.gateway
|