#!/usr/bin/python3
"""install.py

- copy files from to_system into system directories
- set host name / network settings from boxtools/cfg file
- and many more

The script runs in two passes: a first "show" pass (doit = False) which only
reports what would be changed, and - after confirmation by the user - a
second "do" pass (doit = True) which applies the changes.
"""

if bytes == str:
    raise NotImplementedError('python2 not supported')

import sys
import os
import filecmp
import re
import types
import socket
import tempfile
from pathlib import Path
from subprocess import Popen, PIPE
from ipaddress import IPv4Interface
from os.path import getmtime, exists
from gitea import change_to_gitea


def exit():
    """Tell the user to run the installer again and terminate the script."""
    print('please restart ./install.py again')
    sys.exit()


try:
    import serial
except ImportError:
    # an empty answer counts as "yes", as 'yes'.startswith('') is True
    if 'yes'.startswith(input('install pyserial? [y]')):
        os.system('sudo apt update')
        os.system('sudo apt install python3-pip')
        os.system('sudo pip3 install --break-system-packages pyserial')
    serial = None

try:
    from utils import BoxInfo, check_service, unix_cmd, change_firewall, UndefinedConfigFile, convert_cfg
except ImportError:
    if 'yes'.startswith(input('install netifaces? [y]')):
        os.system('sudo pip3 install --break-system-packages netifaces')
    serial = None

# serial is None when any of the imports above failed -> a restart is needed
if serial is None:
    exit()


TOOLS = BoxInfo.TOOLS

BOX_TYPES = {'ionopi', 'ionopimax', 'control-box', 'bare-apu', 'dual-eth-rpi'}

more_info = False

# name of the file listing the files to be removed from the destination
DEL = '__to_delete__'
STARTUP_TEXT = TOOLS / 'startup_display.txt'

COMMENT = "; please refer to README.md for help"

# cfg file templates per box type, filled in with the % operator
TEMPLATES = {}
TEMPLATES['bare-apu'] = f"""{COMMENT}

[BOX]
type=%(typ)s
MAC=%(mac)s

[NETWORK]
eth0=wan
eth1=192.168.2.2
eth2=192.168.3.3
eth3=192.168.127.254

[ROUTER]
3001=192.168.127.254:3001
"""

TEMPLATES['control-box'] = f"""{COMMENT}

[BOX]
type=%(typ)s
MAC=%(mac)s

[NETWORK]
eth0=wan
eth1=192.168.1.1
eth2=192.168.2.2
eth3=192.168.3.3

[DISPLAY]
line0=startup...
line1=HOST
line2=ADDR
"""

TEMPLATES['dual-eth-rpi'] = f"""{COMMENT}

[BOX]
type=%(typ)s
MAC=%(mac)s

[NETWORK]
end0=wan
%(mac2)s=192.168.1.1
"""

# used for box types without a dedicated template
GENERIC_TEMPLATE = f"""{COMMENT}

[BOX]
type=%(typ)s
MAC=%(mac)s
"""

DHCP_HEADER = """
default-lease-time 600;
max-lease-time 7200;
authoritative;
"""

ROUTER_SERVICE = f"""[Unit]
Description = Routing to locally connected hardware
After = network.target

[Service]
ExecStart = /usr/bin/python3 {TOOLS}/router.py

[Install]
WantedBy = multi-user.target
"""

FRAPPY_SERVICE = """[Unit]
Description = Running frappy server
After=waitboot.service

[Service]
Type=simple
Environment=PYTHONPATH=/home/l_samenv/.local/lib/python3.11/site-packages/
ExecStart = /home/l_samenv/boxtools/start_frappy

[Install]
WantedBy = default.target
"""

DISPLAY_SERVICE = f"""[Unit]
Description = status display
After = network.target

[Service]
User = root
ExecStart = /usr/bin/python3 {TOOLS}/display.py -d

[Install]
WantedBy = multi-user.target
"""

# filled in with str.format(page=..., port=...)
BOXWEB_SERVICE = """[Unit]
Description = Web service for local GUI on a box
After = waitboot.target

[Service]
ExecStart = /usr/bin/python /home/l_samenv/boxweb/flaskserver.py {page} {port}

[Install]
WantedBy = default.target
"""

# pip requirements per user: {<user>: {<title>: <requirements text>}}
pip_requirements = {
    'root': {},
    'l_samenv': {},
}

box = BoxInfo()
box.hostname_changed = False
# list of (<subnet>/<netmask>, [(<range start>, <range end>), ...])
dhcp_server_cfg = []

# NOTE(review): box.typ is evaluated here, before handle_config() may change it
TO_SYSTEM = TOOLS / 'to_system', TOOLS / f'{box.typ}_system'


def do_cmd(*command, sudo=True):
    """Pass a command to unix_cmd (executed only when doit is set) and mark
    the state as dirty.
    """
    unix_cmd(*command, execute=doit, sudo=sudo)
    show.dirty = True


def frappy(cfg=None, port=None, requirements='', **kwds):
    """Service function for the FRAPPY section.

    :param cfg: value of the cfg key; when empty, no service is configured
    :param port: not used here
    :param requirements: comma separated additional pip requirements
    :return: (<as_root>, <service file content or None>)
    """
    if not cfg:
        return False, None
    req = pip_requirements['l_samenv']
    if requirements:
        req[cfg] = '\n'.join(requirements.split(','))
    with open('/home/l_samenv/frappy/requirements.txt') as f:
        req['frappy main'] = f.read()
    return False, FRAPPY_SERVICE


def router(firewall=False, **opts):
    """Service function for the ROUTER section.

    :param firewall: when given, the firewall is switched on; it is treated
        as a comma separated list of ports when further options are present
    :param opts: remaining keys of the section; numeric keys are added to the
        set of open ports
    :return: (<as_root>, <service file content or None>)
    """
    if firewall:
        active, enabled = check_service('nftables')
        ports = {22}
        if opts:
            for port in firewall.split(','):
                ports.add(int(port))
            for key in opts:
                try:
                    ports.add(int(key))
                except ValueError:
                    pass
        ports.update((1110, 1111, 1112))
        if change_firewall(True, ports, doit):
            show.dirty = True
    elif change_firewall(False, set(), doit):
        show.dirty = True
    if not opts:
        return True, None
    # NOTE(review): both statements share one try block, so the requirements
    # are not registered when nftables.conf is already gone - confirm intent
    try:
        os.remove(TO_SYSTEM[0] / 'etc/nftables.conf')
        with open(TOOLS / 'requirements.txt') as f:
            pip_requirements['root']['tools'] = f.read()
    except FileNotFoundError:
        pass
    return True, ROUTER_SERVICE


def display_update(cfg):
    """Write the startup text for the display when it has changed.

    :param cfg: the DISPLAY section (a dict)
    :return: True when the startup text was changed and doit is set
    """
    text = '\n'.join(cfg.get('startup_text', '').split('|')[:3])
    text = text.replace('HOST', box.hostname).replace('ADDR', box.macaddr) + '\n'
    if write_when_new(STARTUP_TEXT, text):
        print('change startup text')
        if doit:
            # STARTUP_TEXT is a path object: build the new name as a string
            # instead of adding a str to it, which raises TypeError
            os.rename(STARTUP_TEXT, f'{STARTUP_TEXT}.todo')
            return True
    return False


def display(**opts):
    """Service function for the DISPLAY section.

    :return: (<as_root>, <service file content or None>)
    """
    if not opts:
        return True, None
    return True, DISPLAY_SERVICE


def pip():
    """Install the collected pip requirements for each user.

    The requirements are written to pip_requirements.txt in the home directory
    of the user. pip3 is run only when this file has changed. While pip3 is
    running the file is renamed to pip_requirements.tmp, and it is removed
    when pip3 fails.
    """
    for user, requirements in pip_requirements.items():
        if not requirements:
            continue
        if user == 'root':
            tmpname = '/root/pip_requirements.tmp'
            pipcmd = ['sudo pip3 install -r', tmpname]
        else:
            tmpname = f'/home/{user}/pip_requirements.tmp'
            pipcmd = ['pip3 install --user --break-system-packages -r', tmpname]
        filename = tmpname.replace('.tmp', '.txt')
        content = ''.join('# --- for %s ---\n%s\n' % kv for kv in requirements.items())
        if content:
            if write_when_new(filename, content, user == 'root'):
                if doit:
                    os.rename(filename, tmpname)
                    if os.system(' '.join(pipcmd)) == 0:
                        os.rename(tmpname, filename)
                    else:
                        os.remove(tmpname)
                else:
                    print(' '.join(pipcmd))
                show.dirty = True


def boxweb(port=8080, page=None):
    """Service function for the BOXWEB section.

    :param port: the port of the web server
    :param page: the page to be served; without it, no service is configured
    :return: (<as_root>, <service file content or None>)
    """
    port = int(port)
    servicecfg = None if page is None else BOXWEB_SERVICE.format(port=port, page=page)
    return port <= 1024, servicecfg


# service functions by service name. the matching section name is upper case
SERVICES = dict(router=router, display=display, frappy=frappy, boxweb=boxweb)
AS_ROOT = {'router', 'display'}


def write_when_new(filename, content, as_root=False, ignore_reduction=False):
    """Write content to filename when it differs from the file on disk.

    The file is modified only when doit is set. When doit is not set and
    more_info is set, the differences are printed.

    :param filename: the file to be written
    :param content: the new content or None when the file is to be removed
    :param as_root: passed as sudo argument to unix_cmd
    :param ignore_reduction: when True, a change is recognized only when
        content has lines (not starting with '#') missing in the file
    :return: False when nothing has to be changed, else the content with a
        trailing newline added when missing

    NOTE(review): as the content is returned, the result is None (false) when
    an existing file is to be removed
    """
    if content is None:
        lines = []
    else:
        if not content.endswith('\n'):
            content += '\n'
        lines = content.split('\n')
    try:
        with open(filename) as fil:
            old = fil.read()
    except FileNotFoundError:
        old = None
    #except Exception as e:
    #    print('can not read', filename, '-> do not check')
    #    return False
    if old == content:
        return False
    if ignore_reduction:
        content_set = set(v for v in lines if not v.startswith('#'))
        old_set = set(v for v in (old or '').split('\n') if not v.startswith('#'))
        content_set -= old_set
        if content_set:
            print(f"missing in {filename}: {(', '.join(content_set - old_set))}")
        else:
            return False
    if doit:
        if lines:
            unix_cmd(f'mkdir -p {Path(filename).parent}', sudo=as_root)
            # write to a temporary file first, as the destination might not
            # be writable by the current user
            with tempfile.NamedTemporaryFile('w') as fil:
                fil.write(content)
                fil.flush()
                unix_cmd('cp', fil.name, filename, sudo=as_root)
            unix_cmd('chmod 0644', filename, sudo=as_root)
        else:
            unix_cmd('rm', '-f', filename, sudo=as_root)
    elif more_info:
        print('.' * 80)
        print('changes in', filename)
        old = [] if old is None else old.split('\n')
        # number of equal lines at the top
        top_lines = 0  # in case of empty loop
        for top_lines, (ol, ll) in enumerate(zip(old, lines)):
            if ol != ll:
                break
        # number of equal lines at the bottom
        bottom_lines = 0
        for bottom_lines, (ol, ll) in enumerate(zip(reversed(old[top_lines:]), reversed(lines[top_lines:]))):
            if ol != ll:
                break
        if bottom_lines == 0:
            # avoid the slice [top_lines:-0], which would be empty
            old.append('')
            lines.append('')
            bottom_lines += 1
        print("===")
        print('\n'.join(old[:top_lines]))
        print('<<<')
        print('\n'.join(old[top_lines:-bottom_lines]))
        print('---')
        print('\n'.join(lines[top_lines:-bottom_lines]))
        print('>>>')
        print('\n'.join(old[-bottom_lines:-1]))
        print("===")
        print('.' * 80)
    return content


def replace_in_file(filename, pat, repl):
    """find pattern in content of filename

    find regexp pattern in content of file

    - if it does not match exactly once, an error is raised
    - exchange group 1 by repl
    - return True when result is changed, or False when not

    doit determines, whether the file is really changed
    """
    with open(filename) as f:
        content = f.read()

    success = []

    def fun(match):
        if success:
            raise ValueError(f'{pat} matches multiple times in {filename}')
        part = match.group(0)
        # match again on the matched part in order to get the span of
        # group 1 relative to the start of part
        match = match.re.match(part)
        fr, to = match.span(1)
        result = part[:fr] + repl + part[to:]
        if result != part:
            print(f'in {filename} replace {part} -> {result}')
        success.append(result != part)
        return result

    newcontent = re.sub(pat, fun, content)
    if not success:
        raise ValueError(f'{pat} not in {filename}')
    write_when_new(filename, newcontent, as_root=True)
    return success[0]


def create_if(name, cfg):
    """Create the content of the interfaces file for one network interface.

    As a side effect, dhcp_server_cfg is extended and box.main_if is set
    for a wan / dhcp interface.

    :param name: the name of the network interface
    :param cfg: 'off', 'dhcp', 'wan[,<network>, ...]' or an ip address,
        optionally with a prefix length
    :return: the content of the file or None when the interface is off
    :raises ValueError: when name is not box.main_if on a wan / dhcp interface
    """
    if cfg == 'off':
        result = None
    elif cfg.startswith('wan') or cfg == 'dhcp':
        if box.main_if != name:
            print(box.main_if, name)
            raise ValueError('can not have more than one WAN/DHCP port')
        box.main_if = name
        # default: all <= 192.0.0.0
        # others have to be added explicitly
        dhcp_server_cfg.append(('0.0.0.0/128.0.0.0', []))
        dhcp_server_cfg.append(('128.0.0.0/192.0.0.0', []))
        for nw in cfg.split(',')[1:]:
            nw = IPv4Interface(nw).network
            dhcp_server_cfg.append((nw.with_netmask, []))
        result = f"allow-hotplug {name}\niface {name} inet dhcp"
    else:
        cfgip = IPv4Interface(cfg)
        network = cfgip.network
        if network.prefixlen == 32:  # or no prefix specified
            # the given address is offered by the dhcp server as the only
            # address of a /24 network. the address of the interface itself
            # is the first free address in this network
            otherip = IPv4Interface('%s/24' % cfgip.ip)
            network = otherip.network
            if str(cfgip.ip).endswith('.1'):
                cfgip = network.network_address + 2
            else:
                cfgip = network.network_address + 1
            dhcp_server_cfg.append((network.with_netmask, [(str(otherip.ip), str(otherip.ip))]))
            addr = str(cfgip)
        else:
            # subnet with multiple adresses -> static adresses only.
            # dhcp range not yet implemented
            addr = str(cfgip.ip)
        # result['NETMASK'] = network.netmask
        result = f"allow-hotplug {name}\niface {name} inet static\n address {addr}/{network.prefixlen}"
    if result:
        return result + '\n'


def walk_dir(action, srcpath, dstroot, files):
    """Compare the files of one source directory with their destination.

    Depending on the result, the methods newer, older, delete and missing
    of action are called with the related list of file names.

    :param action: a Walker instance
    :param srcpath: the source directory, relative to the current directory
    :param dstroot: the root of the destination
    :param files: the names of the files in srcpath
    """
    if not files:
        return
    action.srcpath = srcpath = Path(srcpath)
    action.dstpath = dstpath = dstroot / srcpath
    match, mismatch, missing = filecmp.cmpfiles(srcpath, dstpath, files)
    if mismatch:
        newer = [f for f in mismatch if getmtime(dstpath / f) > getmtime(srcpath / f)]
        if newer:
            action.newer(newer)
        if len(newer) < len(mismatch):
            newer = set(newer)
            action.older([f for f in mismatch if f not in newer])
    if missing:
        if DEL in missing:
            # this file is not to be installed: it lists the files to be
            # removed from the destination
            missing.remove(DEL)
            with open(srcpath / DEL) as fil:
                to_delete = []
                for fname in fil:
                    fname = fname.strip()
                    if fname and exists(dstpath / fname):
                        if (srcpath / fname).exists():
                            print('ERROR: %s in %s, but also in repo -> ignored' % (fname, DEL))
                        else:
                            to_delete.append(fname)
            # do not report a change (or call rm without files) when there
            # is nothing to delete
            if to_delete:
                action.delete(to_delete)
        if missing:
            action.missing(missing)


def walk(action):
    """Apply action to all files below the TO_SYSTEM directories (with
    destination /) and below to_home (with destination home directory).

    As a side effect, the current directory is changed.
    """
    action.as_root = True
    for rootpath in TO_SYSTEM:
        if not rootpath.exists():
            continue
        os.chdir(rootpath)
        for srcpath, _, files in os.walk('.'):
            walk_dir(action, srcpath, Path('/'), files)
    os.chdir(TOOLS / 'to_home')
    action.as_root = False
    for srcpath, _, files in os.walk('.'):
        walk_dir(action, srcpath, Path.home(), files)


class Walker:
    """Base class for the actions used in walk / walk_dir."""
    srcpath = None  # set in walk_dir: the current source directory
    dstpath = None  # set in walk_dir: the current destination directory
    as_root = None  # set in walk: whether the destination is a system directory


class Show(Walker):
    """Action reporting the differences without changing anything."""
    dirty = False  # True when something has to be changed

    def diff(self, title, files):
        # NOTE(review): not called within this file. more_info is a bool
        # elsewhere in this file, which is not valid as endswith argument
        self.dirty = True
        if more_info:
            for f in files:
                if f.endswith(more_info):
                    print('diff %s %s' % (self.srcpath / f, self.dstpath / f))
                    print('.' * 80)
        else:
            print('%s %s:\n %s' % (title, self.dstpath, ' '.join(files)))

    def show(self, title, srcpath, files):
        """Mark as dirty and print the given files."""
        self.dirty = True
        if more_info:
            for f in files:
                print('cat %s' % (srcpath / f))
                print('.' * 80)
        else:
            print('%s %s:\n %s' % (title, self.dstpath, ' '.join(files)))

    def newer(self, files):
        self.show('get from', self.srcpath, files)

    def older(self, files):
        self.show('replace in', self.srcpath, files)

    def missing(self, files):
        self.show('install in', self.srcpath, files)

    def delete(self, files):
        self.show('remove from', self.dstpath, files)


class Do(Walker):
    """Action copying files to / removing files from the destination."""

    def newer(self, files):
        """Copy files from the source to the destination directory."""
        unix_cmd('mkdir', '-p', str(self.dstpath), sudo=self.as_root)
        unix_cmd('cp', *(str(self.srcpath / f) for f in files), str(self.dstpath), sudo=self.as_root)

    # files are copied from the source in all of these cases
    older = newer
    missing = newer

    def delete(self, files):
        """Remove files from the destination directory."""
        unix_cmd('rm', '-f', *(str(self.dstpath / f) for f in files))


def handle_config():
    """Handle the cfg file of this box.

    Depending on doit, the needed changes are shown only or really applied:
    host name, network interfaces, dhcp server, startup text of the display,
    services, pip requirements and interface naming. When no cfg file
    exists, the user is asked for a host name and a cfg file is created
    from a template.

    :return: False on failure, else a text with a summary of the cfg file
    """
    dhcp_server_cfg.clear()
    try:
        config = box.read_config()
    except UndefinedConfigFile as e:
        print(f'{box.cfgfile} not found', e)
        if box.oldcfg:
            print(f'convert {box.cfgfile} to {convert_cfg(box.cfgfile, box.macaddr)}')
            config = box.read_config()
    cfgfile = box.cfgfile
    newhostname = box.hostname
    if cfgfile:
        if box.hwtype != 'apu':
            typ = config.get('BOX', {}).get('type')
            if typ:
                if typ not in BOX_TYPES:
                    raise ValueError(f'unknown type={typ} defined in BOX section')
                box.typ = typ
            elif box.typ == 'cm4':
                print('WARNING: missing type in BOX section')
                print('This is a cm4 so guess its a dual-eth-rpi')
                box.typ = 'dual-eth-rpi'
            elif box.typ == 'cm3':
                print('WARNING: missing type in BOX section')
                print('This is a cm3 so guess its a ionopimax')
                box.typ = 'ionopimax'
        print('box type:', box.typ, ' mac addr:', box.macaddr)
        print('---')
    else:
        # no cfg file: guess the box type and create a cfg file from a template
        template_args = {'mac': box.macaddr}
        if box.typ == 'apu':
            # determine if display is present
            disp = serial.Serial('/dev/ttyS1', baudrate=115200, timeout=1)
            try:
                disp.write(b'\x1b\x1b\x01\xf3')
                display_available = disp.read(8)[0:4] == b'\x1b\x1b\x05\xf3'
            finally:
                disp.close()
            if display_available:
                # must match the key in TEMPLATES and BOX_TYPES
                box.typ = 'control-box'
            else:
                box.typ = 'bare-apu'
        elif box.typ == 'cm4':
            box.typ = 'dual-eth-rpi'
            print('This is a cm4, so guess its a dual-eth-rpi - please check type in created cfg file')
            template_args['mac2'] = 'enx?'
            for addr in box.network_interfaces:
                if addr.startswith('enx'):
                    template_args['mac2'] = addr
            print(template_args)
        elif box.typ == 'cm3':
            print('This is a cm3 so guess its a ionopimax - please check type in created cfg file')
            box.typ = 'ionopimax'
        elif box.typ == 'rpi':
            print('This is a simple rpi, so guess its a ionopi - please check type in created cfg file')
            box.typ = 'ionopi'
        template = TEMPLATES.get(box.typ)
        if template is None:
            box.typ = box.typ or 'unknown-box'
            template = GENERIC_TEMPLATE
        template_args['typ'] = box.typ
        template = template % template_args

        print('no cfg file found for this', box.typ,
              f'with mac address {box.macaddr} (hostname={box.hostname})')
        newhostname = input('enter host name: ')
        if not newhostname:
            print('no hostname given')
            return False
        cfgfile = BoxInfo.CFGDIR / f'{newhostname}.cfg'
        with open(cfgfile, 'w') as f:
            f.write(template)
        config = box.read_config()
        cfgfile = box.cfgfile

    if box.hostname_changed or box.hostname != newhostname:
        # remember the change for the second pass
        box.hostname_changed = True
        if cfgfile:
            newhostname = cfgfile.stem.split('_')[0]
        if doit:
            print('bash sethostname.sh')
            unix_cmd('bash', f'{TOOLS}/sethostname.sh')
        else:
            if cfgfile:
                print('replace host name %r by %r' % (box.hostname, newhostname))
            show.dirty = True
        config = box.read_config()
        box.hostname = newhostname
    if cfgfile is None:
        return False
    to_start = {}  # dict of <interface or service name>: (<action>, <as_root>)
    ifname = ''
    try:
        netcfg = config.get('NETWORK', {})
        if netcfg:  # when not network is specified, do not handle network at all
            for name in netcfg:
                if name not in box.network_interfaces:
                    print(f'{name} is currently not a valid network interface - skip')
                    continue
            for ifname in box.network_interfaces:
                # interfaces not mentioned in the NETWORK section are switched off
                content = create_if(ifname, netcfg.get(ifname, 'off'))
                # content = '\n'.join('%s=%s' % kv for kv in content.items())
                todo = write_when_new(f'/etc/network/interfaces.d/{ifname}', content, as_root=True)
                if todo and not more_info:
                    print('change', ifname)
                    show.dirty = True
                    to_start[ifname] = 'if_restart', True
            if dhcp_server_cfg:
                content = [DHCP_HEADER]
                for subnet, rangelist in dhcp_server_cfg:
                    try:
                        adr, mask = subnet.split('/')
                    except Exception as e:
                        print(subnet, repr(e))
                        continue
                    content.append('subnet %s netmask %s {\n' % (adr, mask))
                    #content.append(' option netmask %s;\n' % mask)
                    for rng in rangelist:
                        content.append(' range %s %s;\n' % rng)
                    content.append('}\n')
                content = ''.join(content)
                todo = write_when_new('/etc/dhcp/dhcpd.conf', content, as_root=True)
                if todo:
                    print('change dhcpd.conf')
                    to_start['isc-dhcp-server'] = 'restart', True
                    show.dirty = True
                try:
                    # the dhcp server serves all interfaces except the main one
                    if replace_in_file(
                            '/etc/default/isc-dhcp-server', r'INTERFACESv4="(.*)"',
                            ' '.join([n for n in box.network_interfaces if n != box.main_if])):
                        show.dirty = True
                except FileNotFoundError:
                    if 'yes'.startswith(input('install isc-dhcp-server? [y]')):
                        unix_cmd('apt-get install isc-dhcp-server')
                        exit()
            elif doit:
                unix_cmd('systemctl stop isc-dhcp-server')
                unix_cmd('systemctl disable isc-dhcp-server')
        displaycfg = config.get('DISPLAY')
        if displaycfg and display_update(displaycfg):
            to_start['display'] = 'restart', True
        if change_to_gitea(doit):
            show.dirty = True
    except Exception as e:
        print('ERROR: can not handle %s %s: %r' % (box.hostname, ifname, e))
        raise
        # return False

    reload_systemd = set()
    for service, service_func in SERVICES.items():
        section = service.upper()
        section_dict = config.get(section, {})
        as_root, servicecfg = service_func(**section_dict)
        active, enabled = check_service(service, as_root=as_root)
        if servicecfg is None:
            # service not configured
            if active or enabled:
                check_service(service, False, doit, as_root=as_root)
                show.dirty = True
            continue
        else:
            if not enabled:
                to_start[service] = 'enable', as_root
            elif not active:
                to_start[service] = 'restart', as_root
        if as_root:
            systemdir = Path('/etc/systemd/system')
        else:
            systemdir = Path('~/.config/systemd/user').expanduser()
        if write_when_new(systemdir / f'{service}.service', servicecfg, as_root=as_root):
            show.dirty = True
            reload_systemd.add(as_root)
            # NOTE(review): the key is the literal 'service', not the variable
            # service, so this condition is true for any servicecfg and an
            # 'enable' action is replaced by 'restart' - confirm intent
            if servicecfg and to_start.get('service') is None:
                to_start[service] = 'restart', as_root

    if 'dialout' not in unix_cmd('id l_samenv'):
        do_cmd('usermod -a -G dialout l_samenv')

    pip()
    for as_root in reload_systemd:
        if as_root:
            do_cmd('systemctl daemon-reload', sudo=True)
        else:
            do_cmd('systemctl --user daemon-reload', sudo=False)
    # first all interfaces to be restarted are shut down ...
    for service, (action, _) in to_start.items():
        show.dirty = True
        if action == 'if_restart':
            do_cmd(f'ifconfig {service} down')
    # ... then the interfaces are started again and the services are handled
    for service, (action, as_root) in to_start.items():
        if action == 'if_restart':
            do_cmd(f'ifconfig {service} up')
        else:
            if action == 'restart':  # else 'enable'
                do_cmd('systemctl', 'restart', service, sudo=as_root)
            if not as_root:
                do_cmd('loginctl enable-linger', sudo=True)
            do_cmd('systemctl', 'enable', service, sudo=as_root)

    if box.change_if_names:
        print('interface name system has to be changed from enp*s0 to eth*')
        if replace_in_file('/etc/default/grub',
                           r'GRUB_CMDLINE_LINUX_DEFAULT="quiet(.*)"', ' net.ifnames=0'):
            do_cmd('update-grub')

    result = [f'config file:\n {cfgfile}']
    for section, section_dict in config.items():
        result.append(section)
        for key, value in section_dict.items():
            result.append(f' {key} = {value}')
        result.append('')
    return '\n'.join(result)


# first pass: show only what has to be done
doit = False
print(' ')
show = Show()
walk(show)
result = handle_config()

if not result:
    print('fix first above errors')
else:
    if show.dirty:
        print('---')
        answer = input('enter "y(es)" to do above or "m(ore)" for more info: ')
        if not answer:
            print('cancel')
        elif 'yes'.startswith(answer.lower()):
            # second pass: apply the changes
            doit = True
            handle_config()
            walk(Do())
            unix_cmd(f'rm -f {TOOLS / "current"}')
            with open(TOOLS / 'current', 'w') as f:
                f.write(result)
        elif 'more'.startswith(answer.lower()):
            more_info = True
            walk(show)
            result = handle_config()
    else:
        print('nothing to do')