Files
boxtools/install.py

751 lines
23 KiB
Python
Executable File

#!/usr/bin/python3
"""install.py
- copy files from to_system into system directories
- set host name / network settings from boxtools/cfg file
- and many more
"""
# Refuse to run under Python 2, where ``bytes`` and ``str`` are the same type.
if str is bytes:
    raise NotImplementedError('python2 not supported')
import sys
import os
import filecmp
import re
import types
import socket
import tempfile
from pathlib import Path
from subprocess import Popen, PIPE
from ipaddress import IPv4Interface
from os.path import getmtime, exists
from gitea import change_to_gitea
def exit():
    """Tell the user to run the installer again, then end the process.

    Note that this module-level function shadows the builtin ``exit``.
    """
    print('please restart ./install.py again')
    raise SystemExit
# Import the modules which need extra packages. When an import fails, offer
# to install the missing package (an empty answer counts as "yes") and mark
# the failure by setting ``serial`` to None, so that the script stops below
# and has to be started again.
try:
    import serial
except ImportError:
    if 'yes'.startswith(input('install pyserial? [y]')):
        os.system('sudo pip3 install --break-system-packages pyserial')
    serial = None

try:
    from utils import (
        BoxInfo, check_service, unix_cmd, change_firewall,
        UndefinedConfigFile, convert_cfg,
    )
except ImportError:
    if 'yes'.startswith(input('install netifaces? [y]')):
        os.system('sudo pip3 install --break-system-packages netifaces')
    # ``serial`` doubles as the "an import failed" flag
    serial = None

if serial is None:
    exit()
# base path of the tool files used below (router.py, display.py, to_system, ...)
TOOLS = BoxInfo.TOOLS
# values accepted for "type" in the BOX section of a cfg file
BOX_TYPES = {'ionopi', 'ionopimax', 'control-box', 'bare-apu', 'dual-eth-rpi'}
# switched to True when the user answers "m(ore)" at the prompt at the end of this file
more_info = False
# name of the list file looked up by walk_dir for files to be removed
DEL = '__to_delete__'
# file receiving the startup text of the display (see display_update)
STARTUP_TEXT = TOOLS / 'startup_display.txt'
# first line of every generated cfg file
COMMENT = "; please refer to README.md for help"
# cfg file templates by box type; the two %s are filled with (type, mac address)
TEMPLATES = {}
# All templates are completed in handle_config with
# ``template % (box.typ, box.macaddr)`` when no cfg file exists yet.

# template for box type 'bare-apu': four network ports and one routed port
TEMPLATES['bare-apu'] = f"""{COMMENT}
[BOX]
type=%s
MAC=%s
[NETWORK]
eth0=wan
eth1=192.168.2.2
eth2=192.168.3.3
eth3=192.168.127.254
[ROUTER]
3001=192.168.127.254:3001
"""
# template for box type 'control-box': four network ports and a DISPLAY section
TEMPLATES['control-box'] = f"""{COMMENT}
[BOX]
type=%s
MAC=%s
[NETWORK]
eth0=wan
eth1=192.168.1.1
eth2=192.168.2.2
eth3=192.168.3.3
[DISPLAY]
line0=startup...
line1=HOST
line2=ADDR
"""
# template for box type 'dual-eth-rpi': two network ports
TEMPLATES['dual-eth-rpi'] = f"""{COMMENT}
[BOX]
type=%s
MAC=%s
[NETWORK]
eth0=wan
eth1=192.168.1.1
"""
# fallback used for every box type without an entry in TEMPLATES
GENERIC_TEMPLATE = f"""{COMMENT}
[BOX]
type=%s
MAC=%s
"""
# first part of the generated /etc/dhcp/dhcpd.conf (subnet blocks are appended in handle_config)
DHCP_HEADER = """
default-lease-time 600;
max-lease-time 7200;
authoritative;
"""
# systemd unit returned by router(); it starts router.py from the TOOLS directory
ROUTER_SERVICE = f"""[Unit]
Description = Routing to locally connected hardware
After = network.target
[Service]
ExecStart = /usr/bin/python3 {TOOLS}/router.py
[Install]
WantedBy = multi-user.target
"""
# systemd unit returned by frappy(); %s is replaced by the frappy-server arguments
# NOTE(review): home directory and python3.11 site-packages path are hard coded
FRAPPY_SERVICE = """[Unit]
Description = Running frappy server
[Service]
Environment=PYTHONPATH=/home/l_samenv/.local/lib/python3.11/site-packages/
ExecStart = /usr/bin/python3 /home/l_samenv/frappy/bin/frappy-server %s
[Install]
WantedBy = default.target
"""
# systemd unit returned by display(); it runs display.py -d from the TOOLS
# directory as root (this constant was defined twice with identical text)
DISPLAY_SERVICE = f"""[Unit]
Description = status display
After = network.target
[Service]
User = root
ExecStart = /usr/bin/python3 {TOOLS}/display.py -d
[Install]
WantedBy = multi-user.target
"""
# systemd unit returned by boxweb(); {page} and {port} are filled in with str.format
# NOTE(review): home directory and python3.11 site-packages path are hard coded
BOXWEB_SERVICE = """[Unit]
Description = Web service for local GUI on a box
After = network.target
[Service]
Environment=PYTHONPATH=/home/l_samenv/.local/lib/python3.11/site-packages/
ExecStart = /usr/bin/python3 /home/l_samenv/boxweb/flaskserver.py {page} {port}
[Install]
WantedBy = multi-user.target
"""
# pip requirements collected by the service functions, per user:
# {<user>: {<title>: <requirements text>}}; processed by pip()
pip_requirements = {
'root': {},
'l_samenv': {},
}
box = BoxInfo()
# stays True once handle_config has detected a host name change
box.hostname_changed = False
# list of (<subnet>/<netmask>, [(<first address>, <last address>), ...]) filled by create_if
dhcp_server_cfg = []
# directories with files to be installed below '/': the common one and a box type specific one
# NOTE(review): box.typ is evaluated here, before handle_config may change it
TO_SYSTEM = TOOLS / 'to_system', TOOLS / f'{box.typ}_system'
def do_cmd(*command, sudo=True):
    """Pass a command to unix_cmd and mark the run as having pending changes.

    The module level flag ``doit`` is handed over as ``execute`` argument.

    :param command: the command, as accepted by unix_cmd
    :param sudo: forwarded to unix_cmd
    """
    unix_cmd(*command, sudo=sudo, execute=doit)
    show.dirty = True
def frappy(cfg=None, port=None, requirements='', **kwds):
    """Service function for the FRAPPY section.

    :param cfg: argument for frappy-server; nothing is configured when empty
    :param port: optional port, passed to frappy-server with -p
    :param requirements: comma separated pip requirements needed for cfg
    :param kwds: further keys of the section (ignored)
    :return: (as_root, servicecfg); as_root is always False and servicecfg
        is None when no cfg is given
    :raises FileNotFoundError: when the frappy requirements.txt is missing
    """
    if not cfg:
        return False, None
    user_requirements = pip_requirements['l_samenv']
    if requirements:
        # one requirement per line, stored under the plain cfg name
        user_requirements[cfg] = requirements.replace(',', '\n')
    server_args = '-p %s %s' % (port, cfg) if port else cfg
    frappy_requirements = Path('/home/l_samenv/frappy/requirements.txt')
    user_requirements['frappy main'] = frappy_requirements.read_text()
    return False, FRAPPY_SERVICE % server_args
def router(firewall=False, **opts):
    """Service function for the ROUTER section, also handling the firewall.

    :param firewall: falsy to switch the firewall off, else a string with
        comma separated port numbers to be opened
    :param opts: the remaining keys of the section; keys which are integers
        are treated as ports to be opened
    :return: (as_root, servicecfg); as_root is always True and servicecfg is
        None when the section contains no keys besides ``firewall``
    :raises ValueError: when ``firewall`` contains a non-integer port
    """
    if firewall:
        # result is not used; call kept as in the original code
        active, enabled = check_service('nftables')
        ports = {22}
        if opts:
            for port in firewall.split(','):
                ports.add(int(port))
            for key in opts:
                try:
                    ports.add(int(key))
                except ValueError:
                    pass  # not a port number
            # NOTE(review): the indentation of the source was lost; this line
            # might as well belong one level up (independent of opts) - confirm
            ports.update((1110, 1111, 1112))
        if change_firewall(True, ports, doit):
            show.dirty = True
    elif change_firewall(False, set(), doit):
        show.dirty = True
    if not opts:
        return True, None
    try:
        os.remove(TO_SYSTEM[0] / 'etc/nftables.conf')
    except FileNotFoundError:
        pass
    # kept apart from the removal above: a missing nftables.conf must not
    # prevent the requirements from being registered
    try:
        with open(TOOLS / 'requirements.txt') as f:
            pip_requirements['root']['tools'] = f.read()
    except FileNotFoundError:
        pass
    return True, ROUTER_SERVICE
def display_update(cfg):
    """Update the startup text file of the display.

    The text is taken from the key ``startup_text``: up to three lines
    separated by '|', where HOST and ADDR are replaced by the host name and
    the mac address of the box.

    :param cfg: dict with the content of the DISPLAY section
    :return: True when the startup text has changed, else False
    """
    text = '\n'.join(cfg.get('startup_text', '').split('|')[:3])
    text = text.replace('HOST', box.hostname).replace('ADDR', box.macaddr) + '\n'
    if not write_when_new(STARTUP_TEXT, text):
        return False
    print('change startup text')
    if doit:
        # STARTUP_TEXT is a path object: build the new name by formatting,
        # as the '+' operator is not available for path + str
        os.rename(STARTUP_TEXT, f'{STARTUP_TEXT}.todo')
    return True
def display(**opts):
    """Service function for the DISPLAY section.

    :param opts: the keys of the section
    :return: (as_root, servicecfg); as_root is always True and servicecfg is
        None when the section is empty or missing
    """
    servicecfg = DISPLAY_SERVICE if opts else None
    return True, servicecfg
def pip():
    """Install the pip requirements collected in pip_requirements.

    Per user, the requirements are written to pip_requirements.txt in the
    home directory of that user. pip3 is called only when this file has
    changed; when pip3 fails, the file is removed again.
    """
    for user, requirements in pip_requirements.items():
        if not requirements:
            continue
        for_root = user == 'root'
        if for_root:
            tmpname = '/root/pip_requirements.tmp'
            pipcmd = ['sudo pip3 install -r', tmpname]
        else:
            tmpname = f'/home/{user}/pip_requirements.tmp'
            pipcmd = ['pip3 install --user --break-system-packages -r', tmpname]
        command = ' '.join(pipcmd)
        filename = tmpname.replace('.tmp', '.txt')
        content = ''.join(
            '# --- for %s ---\n%s\n' % item for item in requirements.items())
        if not content:
            continue
        if not write_when_new(filename, content, for_root):
            continue
        if doit:
            # install from the .tmp name; only a successful run gets the
            # .txt file back
            os.rename(filename, tmpname)
            if os.system(command) == 0:
                os.rename(tmpname, filename)
            else:
                os.remove(tmpname)
        else:
            print(command)
        # NOTE(review): nesting reconstructed - assumed to be set only when
        # the requirements file has changed
        show.dirty = True
def boxweb(port=80, page=None):
    """Service function for the BOXWEB section.

    :param port: the port of the web server (int or numeric string)
    :param page: the page argument for flaskserver.py; no service is
        configured when it is missing
    :return: (as_root, servicecfg); as_root is True for privileged ports
        (below 1024) and servicecfg is None when no page is given
    :raises ValueError: when port is not an integer
    """
    port = int(port)
    if page is None:
        servicecfg = None
    else:
        servicecfg = BOXWEB_SERVICE.format(port=port, page=page)
    # privileged ports are 0..1023, port 1024 does not need root
    return port < 1024, servicecfg
# service name -> service function; handle_config calls each function with the keys of the
# upper case named cfg section as keyword arguments and expects (as_root, servicecfg) back
SERVICES = dict(router=router, display=display, frappy=frappy, boxweb=boxweb)
# NOTE(review): not referenced anywhere in the code shown here
AS_ROOT = {'router', 'display'}
def _print_changes(filename, old, lines):
    """Print the differing middle part of old and new content of a file.

    :param filename: name used in the title
    :param old: previous content of the file or None
    :param lines: list of lines of the new content
    """
    lines = list(lines)  # do not modify the list of the caller
    print('.' * 80)
    print('changes in', filename)
    old = [] if old is None else old.split('\n')
    top_lines = 0  # in case of empty loop
    for top_lines, (ol, ll) in enumerate(zip(old, lines)):
        if ol != ll:
            break
    bottom_lines = 0
    for bottom_lines, (ol, ll) in enumerate(
            zip(reversed(old[top_lines:]), reversed(lines[top_lines:]))):
        if ol != ll:
            break
    if bottom_lines == 0:
        # avoid the slice [x:-0], which would be empty
        old.append('')
        lines.append('')
        bottom_lines += 1
    print("===")
    print('\n'.join(old[:top_lines]))
    print('<<<')
    print('\n'.join(old[top_lines:-bottom_lines]))
    print('---')
    print('\n'.join(lines[top_lines:-bottom_lines]))
    print('>>>')
    print('\n'.join(old[-bottom_lines:-1]))
    print("===")
    print('.' * 80)


def write_when_new(filename, content, as_root=False, ignore_reduction=False):
    """Write content to a file, but only when it differs from the file.

    The file is touched only when the module level flag ``doit`` is set;
    otherwise the change is just detected (and shown when ``more_info`` is
    set).

    :param filename: the file to be written
    :param content: the new content (a trailing newline is added when
        missing) or None when the file has to be removed
    :param as_root: passed as ``sudo`` to unix_cmd
    :param ignore_reduction: when True, the file counts as unchanged when no
        new non-comment line is added
    :return: False when nothing has to be changed, else a true value: the
        content, or True when the file has to be removed
    """
    if content is None:
        lines = []
    else:
        if not content.endswith('\n'):
            content += '\n'
        lines = content.split('\n')
    try:
        with open(filename) as fil:
            old = fil.read()
    except FileNotFoundError:
        old = None
    if old == content:
        return False
    if ignore_reduction:
        added = {v for v in lines if not v.startswith('#')}
        added -= {v for v in (old or '').split('\n') if not v.startswith('#')}
        if not added:
            return False
        print(f"missing in {filename}: {', '.join(added)}")
    if doit:
        if lines:
            with tempfile.NamedTemporaryFile('w') as fil:
                fil.write(content)
                fil.flush()
                # copy while the temporary file still exists
                unix_cmd('cp', fil.name, filename, sudo=as_root)
                unix_cmd('chmod 0644', filename, sudo=as_root)
        else:
            unix_cmd('rm', '-f', filename, sudo=as_root)
    elif more_info:
        _print_changes(filename, old, lines)
    # returning content (None) for a removal would look like "no change"
    return True if content is None else content
def replace_in_file(filename, pat, repl):
    """Replace group 1 of a regexp pattern in the content of a file.

    - the regexp pattern <pat> has to match exactly once in the file,
      else an error is raised
    - group 1 of the match is exchanged by <repl>
    - the result is handed to write_when_new (as root), so the module level
      flag doit determines whether the file is really changed

    :param filename: the file to be modified
    :param pat: regexp pattern containing at least one group
    :param repl: the replacement text for group 1
    :return: True when the result is changed, or False when not
    :raises ValueError: when pat does not match or matches several times
    """
    with open(filename) as f:
        content = f.read()
    matches = list(re.finditer(pat, content))
    if not matches:
        raise ValueError(f'{pat} not in {filename}')
    if len(matches) > 1:
        raise ValueError(f'{pat} matches multiple times in {filename}')
    match = matches[0]
    part = match.group(0)
    # take the position of group 1 directly from the match instead of
    # matching the matched text a second time
    fr, to = match.span(1)
    result = content[match.start():fr] + repl + content[to:match.end()]
    changed = result != part
    if changed:
        print(f'in {filename} replace {part} -> {result}')
    newcontent = content[:match.start()] + result + content[match.end():]
    write_when_new(filename, newcontent, as_root=True)
    return changed
def create_if(name, cfg):
    """Create the content of the interfaces.d file for one network interface.

    As a side effect, the subnets to be served are appended to
    dhcp_server_cfg.

    :param name: the name of the interface
    :param cfg: 'off', 'dhcp', 'wan[,<network>...]' or an ip address with
        optional prefix length
    :return: the file content or None when the interface is switched off
    :raises ValueError: when a WAN/DHCP port is given which is not the main
        interface of the box, or when cfg is not a valid address
    """
    if cfg == 'off':
        return None
    if cfg.startswith('wan') or cfg == 'dhcp':
        if box.main_if != name:
            print(box.main_if, name)
            raise ValueError('can not have more than one WAN/DHCP port')
        box.main_if = name
        # default: all <= 192.0.0.0
        # others have to be added explicitly
        dhcp_server_cfg.append(('0.0.0.0/128.0.0.0', []))
        dhcp_server_cfg.append(('128.0.0.0/192.0.0.0', []))
        for extra in cfg.split(',')[1:]:
            extra_net = IPv4Interface(extra).network
            dhcp_server_cfg.append((extra_net.with_netmask, []))
        stanza = f"allow-hotplug {name}\niface {name} inet dhcp"
    else:
        given = IPv4Interface(cfg)
        network = given.network
        if network.prefixlen == 32:  # or no prefix specified
            # a single address: it is offered by dhcp to the connected
            # device, the box itself gets the first free address (.1 or .2)
            # of the /24 network around it
            device = IPv4Interface('%s/24' % given.ip)
            network = device.network
            offset = 2 if str(given.ip).endswith('.1') else 1
            device_ip = str(device.ip)
            dhcp_server_cfg.append((network.with_netmask, [(device_ip, device_ip)]))
            addr = str(network.network_address + offset)
        else:
            # subnet with multiple addresses -> static addresses only,
            # dhcp range not yet implemented
            addr = str(given.ip)
        stanza = f"allow-hotplug {name}\niface {name} inet static\n address {addr}/{network.prefixlen}"
    return stanza + '\n'
def walk_dir(action, srcpath, dstroot, files):
    """Compare the files of one repo directory with the destination directory.

    The result is reported through the methods of action:

    - newer: files differing, with a more recent file at the destination
    - older: all other differing files
    - delete: files listed in the DEL file which exist at the destination
      (but not in the repo)
    - missing: files which could not be compared, in general because they
      do not exist at the destination

    :param action: a Walker; its srcpath and dstpath attributes are set here
    :param srcpath: the repo directory, relative to the current directory
    :param dstroot: the root directory of the destination
    :param files: the names of the files in srcpath
    """
    if not files:
        return
    action.srcpath = srcpath = Path(srcpath)
    action.dstpath = dstpath = dstroot / srcpath
    match, mismatch, missing = filecmp.cmpfiles(srcpath, dstpath, files)
    if mismatch:
        newer = [f for f in mismatch if getmtime(dstpath / f) > getmtime(srcpath / f)]
        if newer:
            action.newer(newer)
        if len(newer) < len(mismatch):
            newer = set(newer)
            action.older([f for f in mismatch if f not in newer])
    if not missing:
        return
    if DEL in missing:
        # the list file itself is never installed
        missing.remove(DEL)
        to_delete = []
        with open(srcpath / DEL) as fil:
            for fname in fil:
                fname = fname.strip()
                if fname and exists(dstpath / fname):
                    if (srcpath / fname).exists():
                        print('ERROR: %s in %s, but also in repo -> ignored' % (fname, DEL))
                    else:
                        to_delete.append(fname)
        # report only when there is really something to remove, otherwise
        # every directory with a DEL file would count as changed
        if to_delete:
            action.delete(to_delete)
    if missing:
        action.missing(missing)
def walk(action):
    """Run walk_dir over all directories to be installed.

    First the existing directories of TO_SYSTEM are compared with '/'
    (action.as_root is True), then TOOLS/to_home is compared with the home
    directory of the user (action.as_root is False). The current directory
    is changed on the way.

    :param action: a Walker, passed on to walk_dir
    """
    action.as_root = True
    system_root = Path('/')
    for rootpath in TO_SYSTEM:
        if rootpath.exists():
            os.chdir(rootpath)
            for srcpath, _subdirs, files in os.walk('.'):
                walk_dir(action, srcpath, system_root, files)
    os.chdir(TOOLS / 'to_home')
    action.as_root = False
    home = Path.home()
    for srcpath, _subdirs, files in os.walk('.'):
        walk_dir(action, srcpath, home, files)
class Walker:
    """Base class of the actions used by walk and walk_dir.

    The attributes are set by walk (as_root) and walk_dir (srcpath, dstpath)
    before the methods of a subclass are called.
    """
    # repo directory / destination directory currently handled, and whether
    # the destination needs root permission
    srcpath = dstpath = as_root = None
class Show(Walker):
    """Walker which only prints what has to be done.

    ``dirty`` is set as soon as any pending change was reported.
    """
    dirty = False

    def _summary(self, title, files):
        """Print the short form: title, destination directory and files."""
        print('%s %s:\n %s' % (title, self.dstpath, ' '.join(files)))

    def diff(self, title, files):
        """Report differing files.

        NOTE(review): not called anywhere in the code shown here; with
        more_info being True, ``f.endswith(more_info)`` would raise a
        TypeError.
        """
        self.dirty = True
        if not more_info:
            self._summary(title, files)
            return
        for f in files:
            if f.endswith(more_info):
                print('diff %s %s' % (self.srcpath / f, self.dstpath / f))
                print('.' * 80)

    def show(self, title, srcpath, files):
        """Report files: a 'cat' line per file with more_info, else a summary."""
        self.dirty = True
        if not more_info:
            self._summary(title, files)
            return
        for f in files:
            print('cat %s' % (srcpath / f))
            print('.' * 80)

    def newer(self, files):
        """Report files which are more recent at the destination."""
        self.show('get from', self.srcpath, files)

    def older(self, files):
        """Report files which are outdated at the destination."""
        self.show('replace in', self.srcpath, files)

    def missing(self, files):
        """Report files which are missing at the destination."""
        self.show('install in', self.srcpath, files)

    def delete(self, files):
        """Report files which have to be removed from the destination."""
        self.show('remove from', self.dstpath, files)
class Do(Walker):
    """Walker which really installs and removes files with unix_cmd."""

    def newer(self, files):
        """Copy files from the repo directory to the destination directory.

        The destination directory is created when needed.
        """
        unix_cmd('mkdir', '-p', str(self.dstpath), sudo=self.as_root)
        unix_cmd('cp', *(str(self.srcpath / f) for f in files), str(self.dstpath),
                 sudo=self.as_root)

    # whatever the reason for the difference: the repo version is installed
    older = newer
    missing = newer

    def delete(self, files):
        """Remove files from the destination directory.

        Uses the same permission (as_root) as the copy actions and does
        nothing for an empty list.
        """
        if not files:
            return
        unix_cmd('rm', '-f', *(str(self.dstpath / f) for f in files),
                 sudo=self.as_root)
def handle_config():
    """Compare the cfg file of this box with the system and apply changes.

    Depending on the module level flag ``doit`` the changes are either
    applied or only shown; in the latter case ``show.dirty`` tells whether
    there is anything to do. The steps are:

    - read the cfg file, or create one from a template after asking for the
      host name
    - detect a host name change
    - network interfaces, dhcp server, startup text of the display
    - systemd services as defined in SERVICES
    - pip requirements, (re)start of interfaces and services

    NOTE(review): the indentation of the source was lost, the nesting of
    some statements is reconstructed (see the notes below).

    :return: False when no host name is given or no cfg file is available,
        else a text with the cfg file name and the content of all sections
    :raises ValueError: on an unknown box type or invalid network settings
    """
    dhcp_server_cfg.clear()
    try:
        config = box.read_config()
    except UndefinedConfigFile as e:
        print(f'{box.cfgfile} not found', e)
        if box.oldcfg:
            print(f'convert {box.cfgfile} to {convert_cfg(box.cfgfile, box.macaddr)}')
            config = box.read_config()
    cfgfile = box.cfgfile
    newhostname = box.hostname

    # --- box type, cfg file ---
    if cfgfile:
        if box.hwtype != 'apu':
            typ = config.get('BOX', {}).get('type')
            if typ:
                if typ not in BOX_TYPES:
                    raise ValueError(f'unknown type={typ} defined in BOX section')
                box.typ = typ
            elif box.typ == 'cm4':
                print('WARNING: missing type in BOX section')
                print('This is a cm4 so guess its a dual-eth-rpi')
                box.typ = 'dual-eth-rpi'
            elif box.typ == 'cm3':
                print('WARNING: missing type in BOX section')
                print('This is a cm3 so guess its a ionopimax')
                box.typ = 'ionopimax'
        print('box type:', box.typ, ' mac addr:', box.macaddr)
        print('---')
    else:
        if box.typ == 'apu':
            # determine if display is present
            disp = serial.Serial('/dev/ttyS1', baudrate=115200, timeout=1)
            try:
                disp.write(b'\x1b\x1b\x01\xf3')
                display_available = disp.read(8)[0:4] == b'\x1b\x1b\x05\xf3'
            finally:
                disp.close()
            if display_available:
                # was 'control=box', which matches neither TEMPLATES nor
                # BOX_TYPES
                box.typ = 'control-box'
            else:
                box.typ = 'bare-apu'
        elif box.typ == 'cm4':
            box.typ = 'dual-eth-rpi'
            print('This is a cm4, so guess its a dual-eth-rpi - please check type in created cfg file')
        elif box.typ == 'cm3':
            print('This is a cm3 so guess its a ionopimax - please check type in created cfg file')
            box.typ = 'ionopimax'
        elif box.typ == 'rpi':
            print('This is a simple rpi, so guess its a ionopi - please check type in created cfg file')
            box.typ = 'ionopi'
        template = TEMPLATES.get(box.typ)
        if template is None:
            box.typ = box.typ or 'unknown-box'
            template = GENERIC_TEMPLATE
        template = template % (box.typ, box.macaddr)
        print('no cfg file found for this', box.typ,
              f'with mac address {box.macaddr} (hostname={box.hostname})')
        newhostname = input('enter host name: ')
        if not newhostname:
            print('no hostname given')
            return False
        cfgfile = BoxInfo.CFGDIR / f'{newhostname}.cfg'
        with open(cfgfile, 'w') as f:
            f.write(template)
        config = box.read_config()
        cfgfile = box.cfgfile

    # --- host name ---
    if box.hostname_changed or box.hostname != newhostname:
        # remember the change: box.hostname is already updated in the
        # preview run, but the host name is set in the doit run only
        box.hostname_changed = True
        if cfgfile:
            newhostname = cfgfile.stem.split('_')[0]
        if doit:
            print('bash sethostname.sh')
            unix_cmd('bash', f'{TOOLS}/sethostname.sh')
        else:
            if cfgfile:
                print('replace host name %r by %r' % (box.hostname, newhostname))
            show.dirty = True
        config = box.read_config()
        box.hostname = newhostname
    if cfgfile is None:
        return False

    to_start = {}  # dict <service> of <action>, <as_root>
    ifname = ''
    try:
        # --- network interfaces and dhcp server ---
        netcfg = config.get('NETWORK', {})
        if netcfg:  # when no network is specified, do not handle network at all
            for name in netcfg:
                if name not in box.network_interfaces:
                    print(f'{name} is currently not a valid network interface - skip')
                    continue
            for ifname in box.network_interfaces:
                content = create_if(ifname, netcfg.get(ifname, 'off'))
                todo = write_when_new(f'/etc/network/interfaces.d/{ifname}', content, as_root=True)
                if todo and not more_info:
                    print('change', ifname)
                    show.dirty = True
                    to_start[ifname] = 'if_restart', True
            # NOTE(review): assumed to be part of the "if netcfg" block
            if dhcp_server_cfg:
                content = [DHCP_HEADER]
                for subnet, rangelist in dhcp_server_cfg:
                    try:
                        adr, mask = subnet.split('/')
                    except Exception as e:
                        print(subnet, repr(e))
                        continue
                    content.append('subnet %s netmask %s {\n' % (adr, mask))
                    for rng in rangelist:
                        content.append(' range %s %s;\n' % rng)
                    content.append('}\n')
                content = ''.join(content)
                todo = write_when_new('/etc/dhcp/dhcpd.conf', content, as_root=True)
                if todo:
                    print('change dhcpd.conf')
                    to_start['isc-dhcp-server'] = 'restart', True
                    show.dirty = True
                try:
                    # the dhcp server has to serve all but the main interface
                    if replace_in_file(
                            '/etc/default/isc-dhcp-server', r'INTERFACESv4="(.*)"', ' '.join(
                                [n for n in box.network_interfaces if n != box.main_if])):
                        show.dirty = True
                except FileNotFoundError:
                    if 'yes'.startswith(input('install isc-dhcp-server? [y]')):
                        unix_cmd('apt-get install isc-dhcp-server')
                    exit()
            elif doit:
                unix_cmd('systemctl stop isc-dhcp-server')
                unix_cmd('systemctl disable isc-dhcp-server')

        # --- display ---
        displaycfg = config.get('DISPLAY')
        if displaycfg and display_update(displaycfg):
            to_start['display'] = 'restart', True
        if change_to_gitea(doit):
            show.dirty = True
    except Exception as e:
        print('ERROR: can not handle %s %s: %r' % (box.hostname, ifname, e))
        raise

    # --- systemd services ---
    reload_systemd = set()
    for service, service_func in SERVICES.items():
        section = service.upper()
        section_dict = config.get(section, {})
        as_root, servicecfg = service_func(**section_dict)
        active, enabled = check_service(service, as_root=as_root)
        if servicecfg is None:
            # service not configured
            if active or enabled:
                check_service(service, False, doit, as_root=as_root)
                show.dirty = True
            continue
        if not enabled:
            to_start[service] = 'enable', as_root
        elif not active:
            to_start[service] = 'restart', as_root
        if as_root:
            systemdir = Path('/etc/systemd/system')
        else:
            systemdir = Path('~/.config/systemd/user').expanduser()
        if write_when_new(systemdir / f'{service}.service', servicecfg, as_root=as_root):
            show.dirty = True
            reload_systemd.add(as_root)
            # NOTE(review): the key 'service' is a literal, not the variable
            # service, so this test is (almost) always true and 'enable' is
            # replaced by 'restart'. Left unchanged on purpose: with the
            # variable, a not yet enabled service would be enabled, but no
            # longer started - confirm the intention
            if servicecfg and to_start.get('service') is None:
                to_start[service] = 'restart', as_root

    if 'dialout' not in unix_cmd('id l_samenv'):
        do_cmd('usermod -a -G dialout l_samenv')
    pip()

    # --- reload, restart ---
    for as_root in reload_systemd:
        if as_root:
            do_cmd('systemctl daemon-reload', sudo=True)
        else:
            do_cmd('systemctl --user daemon-reload', sudo=False)
    # first all interfaces down, then all up again
    for service, (action, _) in to_start.items():
        show.dirty = True
        if action == 'if_restart':
            do_cmd(f'ifdown {service}')
    for service, (action, as_root) in to_start.items():
        if action == 'if_restart':
            do_cmd(f'ifup {service}')
        else:
            if action == 'restart':  # else 'enable'
                do_cmd('systemctl', 'restart', service, sudo=as_root)
            if not as_root:
                do_cmd('loginctl enable-linger l_samenv')
            do_cmd('systemctl', 'enable', service, sudo=as_root)

    if box.change_if_names:
        print('interface name system has to be changed from enp*s0 to eth*')
        if replace_in_file('/etc/default/grub',
                           r'GRUB_CMDLINE_LINUX_DEFAULT="quiet(.*)"',
                           ' net.ifnames=0'):
            do_cmd('update-grub')

    # --- summary of the configuration ---
    result = [f'config file:\n {cfgfile}']
    for section, section_dict in config.items():
        result.append(section)
        for key, value in section_dict.items():
            result.append(f' {key} = {value}')
    result.append('')
    return '\n'.join(result)
# --- main script ---
# 1) preview run: collect and print what has to be changed
# 2) on "y(es)": do the changes and store the configuration summary in
#    TOOLS/current; on "m(ore)": repeat the preview with more details
doit = False
print(' ')
show = Show()
walk(show)
result = handle_config()
if not result:
    print('fix first above errors')
elif not show.dirty:
    print('nothing to do')
else:
    print('---')
    answer = input('enter "y(es)" to do above or "m(ore)" for more info: ')
    choice = answer.lower()
    if not answer:
        print('cancel')
    elif 'yes'.startswith(choice):
        doit = True
        handle_config()
        walk(Do())
        with open(TOOLS / 'current', 'w') as f:
            f.write(result)
    elif 'more'.startswith(choice):
        more_info = True
        walk(show)
        result = handle_config()