Files
boxtools/utils.py
l_samenv d6e68aef7d utils.MainIf: rename 'getip' to 'poll'
+ change mechanism for getting hostname
2024-03-06 15:23:36 +01:00

114 lines
3.6 KiB
Python

import os
import socket
import threading
from glob import glob
from configparser import ConfigParser
from netifaces import interfaces, ifaddresses, gateways, AF_INET, AF_LINK
ifname_mapping = {
'eth0': 'enp1s0',
'eth1': 'enp2s0',
'eth2': 'enp3s0',
'eth3': 'enp4s0',
}
if os.geteuid():
def sudo(cmd):
os.system(f'sudo {cmd}')
else:
def sudo(cmd):
os.system(cmd)
def get_config(section=None):
"""get content of box configuration
:param section: if not None, return only given section
:return: configuration as dict
"""
parser = ConfigParser()
cfgfiles = glob('/home/l_samenv/boxtools/cfg/%s_*.cfg' % socket.gethostname())
if len(cfgfiles) != 1:
raise ValueError('there must be one and only one single cfgfile %r' % cfgfiles)
parser.read(cfgfiles[0])
try:
result = {k: dict(parser[k]) for k in ([section] if section else parser.sections())}
except KeyError:
return {}
network = result.get('NETWORK', {})
for name in list(network):
network[ifname_mapping.get(name, name)] = network.pop(name)
if section:
return result[section]
return result
def gethostthread(ip, event, result):
try:
result[0] = socket.gethostbyaddr(ip)[0]
except Exception as e:
pass
event.set()
class MainIf:
address = None
ip = None
gateway = None
hostname = None
carrier = True
prev_ip = None
def __init__(self):
netcfg = get_config('NETWORK')
for name, key in netcfg.items():
if key.startswith(('dhcp', 'wan')):
self.name = name
break
else:
# take first one (alphabetically)
self.name = sorted(netcfg)[0]
self.hostnameresult = [None]
with open(f'/sys/class/net/{self.name}/carrier_changes') as f:
self.carrier_changes = f.read()
self.poll()
def hostname(self):
return self.hostnameresult[0]
def poll(self):
with open(f'/sys/class/net/{self.name}/carrier') as f:
carrier = f.read().startswith('1')
with open(f'/sys/class/net/{self.name}/carrier_changes') as f:
carrier_changes = f.read()
if carrier_changes != self.carrier_changes:
self.hostnameresult = [None] # new instance for result
self.carrier_changes = carrier_changes
if carrier:
sudo(f'dhclient -r {self.name}')
sudo(f'dhclient {self.name}')
self.carrier = carrier
addrinfo = ifaddresses(self.name)
self.address = addrinfo.get(AF_LINK, [{}])[0].get('addr')
if carrier:
self.ip = addrinfo.get(AF_INET, [{}])[0].get('addr')
self.gateway = [g for g, n, _ in gateways().get(AF_INET, ()) if n == self.name]
else:
self.ip = None
self.gateway = ()
if self.carrier and self.ip and self.gateway:
if self.ip != self.prev_ip:
self.prev_ip = self.ip
self.event = event = threading.Event()
self.hostnameresult = list(self.hostnameresult) # new instance for result
threading.Thread(target=gethostthread,
args=(self.ip, event, self.hostnameresult), daemon=True).start()
event.wait(0.1)
# in case of timeout, the thread may change self.hostnameresult later,
# but only until carrier or ip changes
else:
self.prev_ip = None
return self.carrier, self.ip, self.hostnameresult[0], self.gateway