split out config file reading

+ improve ip and no 'network' on display
This commit is contained in:
2024-03-06 11:48:41 +01:00
parent c3a36879cb
commit ddedf67bf3
6 changed files with 152 additions and 77 deletions

View File

@ -5,6 +5,9 @@ eth1=192.168.1.1
eth2=192.168.2.2 eth2=192.168.2.2
eth3=192.168.3.3 eth3=192.168.3.3
[ROUTER]
8080:192.168.127.254:80
[DISPLAY] [DISPLAY]
; please refer to README.md for help ; please refer to README.md for help
startup_text=startup...|HOST|ADDR startup_text=startup...|HOST|ADDR

View File

@ -4,10 +4,16 @@ import time
import serial import serial
import socket import socket
import threading import threading
import re
import queue
from utils import MainIf
from subprocess import Popen, PIPE
from configparser import ConfigParser
# display tty device # display tty device
tty = '/dev/ttyS1' tty = '/dev/ttyS1'
STARTUP_TEXT = '/home/l_samenv/boxtools/startup_display.txt' TOOLS = '/home/l_samenv/boxtools'
STARTUP_TEXT = f'{TOOLS}/startup_display.txt'
ESC = 0x1b ESC = 0x1b
ESCESC = bytes((ESC, ESC)) ESCESC = bytes((ESC, ESC))
@ -46,6 +52,9 @@ WIDTH = 480
MAGIC = b'\xcb\xef\x20\x18' MAGIC = b'\xcb\xef\x20\x18'
mainif = MainIf()
def xy(x, y): def xy(x, y):
x = min(480, int(x)) x = min(480, int(x))
return bytes([(min(127, y + TOPGAP) << 1) + (x >> 8), x % 256]) return bytes([(min(127, y + TOPGAP) << 1) + (x >> 8), x % 256])
@ -61,9 +70,10 @@ class Display:
storage = None storage = None
def __init__(self, dev, timeout=0.5, daemon=False): def __init__(self, dev, timeout=0.5, daemon=False):
self.event = threading.Event() # self.event = threading.Event()
self.term = serial.Serial(dev, baudrate=115200, timeout=timeout) self.term = serial.Serial(dev, baudrate=115200, timeout=timeout)
self.storage = bytearray() self.storage = bytearray()
self.hostname = socket.gethostname()
if daemon: if daemon:
todo_file = STARTUP_TEXT + '.todo' todo_file = STARTUP_TEXT + '.todo'
try: try:
@ -82,11 +92,8 @@ class Display:
self.storage = None self.storage = None
else: else:
os.system('systemctl stop display') os.system('systemctl stop display')
self.gethost(False)
self.reset() self.reset()
if daemon: if daemon:
threading.Thread(target=self.gethostthread, daemon=True).start()
self.event.wait(1) # wait for thread to be started
self.net_display(None) self.net_display(None)
def reset(self): def reset(self):
@ -159,34 +166,10 @@ class Display:
if row < 3: if row < 3:
self.text(line, row) self.text(line, row)
def gethost(self, truehostname):
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
s.connect(('8.8.8.8', 80)) # 8.8.8.8: google DNS
self.hostip = s.getsockname()[0]
except Exception as e:
self.hostip = ''
hostname = None
if self.hostip and truehostname:
try:
hostname = socket.gethostbyaddr(self.hostip)[0]
except Exception:
pass
self.hostname = hostname or socket.gethostname()
def gethostthread(self):
self.gethost(True)
self.event.set()
time.sleep(1)
while True:
self.event.clear()
self.gethost(True)
self.event.wait(1)
def set_touch(self, on): def set_touch(self, on):
self.send(TOUCH_MODE, on) self.send(TOUCH_MODE, on)
self.touch = on self.touch = on
def gettouch(self): def gettouch(self):
if not self.touch: if not self.touch:
self.set_touch(1) self.set_touch(1)
@ -207,44 +190,43 @@ class Display:
return x return x
print('skipped', data) print('skipped', data)
def menu_reboot(self, x): def menu_reboot(self, xtouch):
if x is None: if xtouch is None:
return return
if x < 120: if xtouch < 120:
self.show('reboot ...') self.show('reboot ...')
os.system('reboot now') os.system('reboot now')
else: else:
self.std_display() self.std_display()
def menu_shutdown(self, x): def menu_shutdown(self, xtouch):
if x is None: if xtouch is None:
return return
if x < 120: if xtouch < 120:
self.show('shutdown ...') self.show('shutdown ...')
os.system('shutdown now') os.system('shutdown now')
else: else:
self.std_display() self.std_display()
def menu_main(self, x): def menu_main(self, xtouch):
if x is None: if xtouch is None:
return return
print(x) if xtouch < 160:
if x < 160:
self.std_display() self.std_display()
elif x < 320: elif xtouch < 320:
self.menu = self.menu_reboot self.menu = self.menu_reboot
self.show('reboot?', '( OK ) (cancel)') self.show('reboot?', '( OK ) (cancel)')
else: else:
self.menu = self.menu_shutdown self.menu = self.menu_shutdown
self.show('shutdown?', '( OK ) (cancel)') self.show('shutdown?', '( OK ) (cancel)')
def std_display(self): def std_display(self):
self.menu = self.net_display self.menu = self.net_display
self.net_display(None) self.net_display(None)
def net_display(self, x): def net_display(self, xtouch):
if x is None: if xtouch is None:
hostip = self.hostip or 'no network' hostip = mainif.ip or 'no network'
dotpos = hostip.find('.') dotpos = hostip.find('.')
if dotpos < 0: if dotpos < 0:
dotpos = len(hostip) dotpos = len(hostip)
@ -252,14 +234,15 @@ class Display:
self.send(SET_COLOR, 0, 0, 0, 11 + self.blink) # yellow / blue self.send(SET_COLOR, 0, 0, 0, 11 + self.blink) # yellow / blue
self.text('.', 0, dotpos, dotpos+1) self.text('.', 0, dotpos, dotpos+1)
self.color() self.color()
self.text(self.hostname, 1) self.text(mainif.hostname or self.hostname, 1)
self.event.set() # self.event.set()
self.blink = not self.blink self.blink = not self.blink
else: else:
self.menu = self.menu_main self.menu = self.menu_main
self.show('(cancel)(reboot)(shdown)') self.show('(cancel)(reboot)(shdown)')
def refresh(self): def refresh(self):
mainif.getip()
func = self.menu or self.net_display func = self.menu or self.net_display
func(self.gettouch()) func(self.gettouch())

View File

@ -20,6 +20,7 @@ from glob import glob
from ipaddress import IPv4Interface from ipaddress import IPv4Interface
from configparser import ConfigParser from configparser import ConfigParser
from os.path import join, getmtime, exists, basename from os.path import join, getmtime, exists, basename
from utils import get_config
if os.geteuid() != 0: if os.geteuid() != 0:
exit("You need to have root privileges to run this script.\nPlease try again, this time using 'sudo'. Exiting.") exit("You need to have root privileges to run this script.\nPlease try again, this time using 'sudo'. Exiting.")
@ -102,12 +103,6 @@ ExecStart = /usr/bin/python3 {TOOLS}/display.py -d
WantedBy = multi-user.target WantedBy = multi-user.target
""" """
ifname_mapping = {
'eth0': 'enp1s0',
'eth1': 'enp2s0',
'eth2': 'enp3s0',
'eth3': 'enp4s0',
}
pip_requirements = { pip_requirements = {
'root': {}, 'root': {},
@ -300,7 +295,8 @@ def create_if(name, cfg):
addr = str(cfgip.ip) addr = str(cfgip.ip)
# result['NETMASK'] = network.netmask # result['NETMASK'] = network.netmask
result = f"allow-hotplug {name}\niface {name} inet static\n address {addr}/{network.prefixlen}" result = f"allow-hotplug {name}\niface {name} inet static\n address {addr}/{network.prefixlen}"
return result + '\n' if result:
return result + '\n'
def walk(action): def walk(action):
@ -445,12 +441,11 @@ def handle_config():
return False return False
to_start = {} to_start = {}
ifname = '' ifname = ''
parser = ConfigParser() config = get_config()
try: try:
parser.read(cfgfile) netcfg = config['NETWORK']
network = {ifname_mapping.get(k, k): v for k, v in parser['NETWORK'].items()}
for ifname in net_addr: for ifname in net_addr:
content = create_if(ifname, network.pop(ifname, 'off')) content = create_if(ifname, netcfg.get(ifname, 'off'))
# content = '\n'.join('%s=%s' % kv for kv in content.items()) # content = '\n'.join('%s=%s' % kv for kv in content.items())
todo = write_when_new(f'/etc/network/interfaces.d/{ifname}', content) todo = write_when_new(f'/etc/network/interfaces.d/{ifname}', content)
if todo and not more_info: if todo and not more_info:
@ -481,12 +476,11 @@ def handle_config():
unix_cmd('systemctl disable isc-dhcp-server') unix_cmd('systemctl disable isc-dhcp-server')
content = [] content = []
dirty = False dirty = False
for section in parser.sections(): for section, section_dict in config.items():
if COMMENT not in parser[section]: if COMMENT not in section_dict:
dirty = True dirty = True
content.append('[%s]' % section) content.append('[%s]' % section)
content.append(COMMENT) content.append(COMMENT)
section_dict = dict(parser[section].items())
if section == 'DISPLAY': if section == 'DISPLAY':
if display_update(section_dict): if display_update(section_dict):
to_start['display'] = 'restart' to_start['display'] = 'restart'
@ -494,8 +488,10 @@ def handle_config():
content.append('%s=%s' % (key, value)) content.append('%s=%s' % (key, value))
content.append('') content.append('')
if dirty: if dirty:
with open(cfgfile, 'w') as f: print(cfgfile)
f.write('\n'.join(content)) print('\n'.join(content))
#with open(cfgfile, 'w') as f:
# f.write('\n'.join(content))
except Exception as e: except Exception as e:
print('ERROR: can not handle %s %s: %r' % (main_info['hostname'], ifname, e)) print('ERROR: can not handle %s %s: %r' % (main_info['hostname'], ifname, e))
raise raise
@ -504,10 +500,8 @@ def handle_config():
reload_systemd = False reload_systemd = False
for service, service_func in SERVICES.items(): for service, service_func in SERVICES.items():
section = service.upper() section = service.upper()
if parser.has_section(section): section_dict = config.get(section, {})
servicecfg = service_func(**dict(parser[section])) servicecfg = service_func(**section_dict)
else:
servicecfg = service_func() # allow to handle missing service
result = unix_cmd('systemctl show -p WantedBy -p ActiveState %s' % service, True) result = unix_cmd('systemctl show -p WantedBy -p ActiveState %s' % service, True)
active = False active = False
enabled = False enabled = False
@ -555,9 +549,9 @@ def handle_config():
unix_cmd('systemctl restart %s' % service) unix_cmd('systemctl restart %s' % service)
unix_cmd('systemctl enable %s' % service) unix_cmd('systemctl enable %s' % service)
result = [f'config file:\n {cfgfile}'] result = [f'config file:\n {cfgfile}']
for section in parser.sections(): for section, section_dict in config.items():
result.append(section) result.append(section)
for key, value in parser.items(section): for key, value in section_dict.items():
result.append(f' {key} = {value}') result.append(f' {key} = {value}')
result.append('') result.append('')
return '\n'.join(result) return '\n'.join(result)

View File

@ -9,7 +9,7 @@ from glob import glob
from select import select from select import select
from serial import serial_for_url from serial import serial_for_url
from subprocess import Popen, PIPE, check_output, call, DEVNULL from subprocess import Popen, PIPE, check_output, call, DEVNULL
from configparser import ConfigParser from utils import get_config
FIREWALL_CONF = '/etc/nftables.conf' FIREWALL_CONF = '/etc/nftables.conf'
@ -477,10 +477,6 @@ class Service:
if __name__ == '__main__': if __name__ == '__main__':
parser = ConfigParser() routercfg = get_config('ROUTER')
cfgfiles = glob('/home/l_samenv/boxtools/cfg/%s_*.cfg' % socket.gethostname()) if routercfg:
if len(cfgfiles) != 1: Service.run(routercfg)
raise ValueError('there must be one and only one single cfgfile %r' % cfgfiles)
parser.read(cfgfiles[0])
if parser.has_section('ROUTER'):
Service.run(parser['ROUTER'])

View File

@ -13,3 +13,4 @@ else
echo "hostname $HOSTNAME" echo "hostname $HOSTNAME"
fi fi
echo $HOSTNAME > /etc/hostname echo $HOSTNAME > /etc/hostname
echo "127.0.0.1 localhost $HOSTNAME" > /etc/hosts

98
utils.py Normal file
View File

@ -0,0 +1,98 @@
import os
import socket
import threading
from glob import glob
from configparser import ConfigParser
from netifaces import interfaces, ifaddresses, gateways, AF_INET, AF_LINK
ifname_mapping = {
'eth0': 'enp1s0',
'eth1': 'enp2s0',
'eth2': 'enp3s0',
'eth3': 'enp4s0',
}
if os.geteuid():
def sudo(cmd):
os.system(f'sudo {cmd}')
else:
def sudo(cmd):
os.system(cmd)
def get_config(section=None):
"""get content of box configuration
:param section: if not None, return only given section
:return: configuration as dict
"""
parser = ConfigParser()
cfgfiles = glob('/home/l_samenv/boxtools/cfg/%s_*.cfg' % socket.gethostname())
if len(cfgfiles) != 1:
raise ValueError('there must be one and only one single cfgfile %r' % cfgfiles)
parser.read(cfgfiles[0])
try:
result = {k: dict(parser[k]) for k in ([section] if section else parser.sections())}
except KeyError:
return {}
network = result.get('NETWORK', {})
for name in list(network):
network[ifname_mapping.get(name, name)] = network.pop(name)
if section:
return result[section]
return result
class MainIf:
address = None
ip = None
gateway = None
hostname = None
carrier = True
def __init__(self):
netcfg = get_config('NETWORK')
for name, key in netcfg.items():
if key.startswith(('dhcp', 'wan')):
self.name = name
break
else:
# take first one (alphabetically)
self.name = sorted(netcfg)[0]
self.getip()
def gethostthread(self, ip, event):
try:
hostname = socket.gethostbyaddr(ip)[0]
if event == self.event:
self.hostname = hostname
except Exception as e:
pass
event.set()
def getip(self):
with open(f'/sys/class/net/{self.name}/carrier') as f:
carrier = f.read().startswith('1')
if carrier > self.carrier:
sudo(f'dhclient -r {self.name}')
sudo(f'dhclient {self.name}')
self.carrier = carrier
addrinfo = ifaddresses(self.name)
self.address = addrinfo.get(AF_LINK, [{}])[0].get('addr')
if carrier:
self.ip = addrinfo.get(AF_INET, [{}])[0].get('addr')
self.gateway = gateways().get('default', {}).get(AF_INET) if self.ip else None
else:
self.ip = None
self.gateway = None
self.hostname = None
if self.carrier and self.ip and self.gateway:
self.event = event = threading.Event()
result = []
threading.Thread(target=self.gethostthread, args=(self.ip, event), daemon=True).start()
event.wait(0.1)
else:
self.event = None # disable changing hostname from pending threads
return self.ip