boxtools on raspberry pi
- rename to_system to apu_system / cm3_system /cm4_system - fixes in router - firewall may be switched off - firewall is a parameter of router, also used when no routing is configured.
This commit is contained in:
92
utils.py
92
utils.py
@ -1,9 +1,14 @@
|
||||
import os
|
||||
import socket
|
||||
import threading
|
||||
import re
|
||||
from glob import glob
|
||||
from configparser import ConfigParser
|
||||
from netifaces import interfaces, ifaddresses, gateways, AF_INET, AF_LINK
|
||||
from subprocess import Popen, PIPE
|
||||
|
||||
|
||||
FIREWALL_CONF = '/etc/nftables.conf'
|
||||
|
||||
|
||||
if os.geteuid():
|
||||
@ -130,3 +135,90 @@ class MainIf:
|
||||
else:
|
||||
self.prev_ip = None
|
||||
return self.carrier, self.ip, self.hostnameresult[0], self.gateway
|
||||
|
||||
|
||||
def unix_cmd(command, execute=None, stdout=PIPE):
|
||||
if execute != False: # None or True
|
||||
if execute:
|
||||
print('$ %s' % command)
|
||||
result = Popen(command.split(), stdout=stdout).communicate()[0]
|
||||
return (result or b'').decode()
|
||||
else:
|
||||
print('> %s' % command)
|
||||
|
||||
|
||||
def check_service(service, set_on=None, execute=None):
|
||||
"""check or set state of systemd service
|
||||
|
||||
set_on is None or not given: query only
|
||||
bool(set_on) is True: start and enable if not yet done
|
||||
set_on == False/0: stop and disable if not yet done
|
||||
|
||||
sim: print out command instead of executing
|
||||
"""
|
||||
result = unix_cmd(f'systemctl show -p WantedBy -p ActiveState {service}')
|
||||
enabled = False
|
||||
active = False
|
||||
for line in result.split('\n'):
|
||||
if line.startswith('WantedBy=') and line.strip() != 'WantedBy=':
|
||||
enabled = True
|
||||
elif line.strip() == 'ActiveState=active':
|
||||
active = True
|
||||
if set_on:
|
||||
if not active:
|
||||
unix_cmd(f'systemctl start {service}', execute)
|
||||
if not enabled:
|
||||
unix_cmd(f'systemctl enable {service}', execute)
|
||||
elif set_on is not None:
|
||||
if active:
|
||||
unix_cmd(f'systemctl stop {service}', execute)
|
||||
if enabled:
|
||||
unix_cmd(f'systemctl disable {service}', execute)
|
||||
return active, enabled
|
||||
|
||||
|
||||
def change_firewall(set_on, ports, execute=None):
|
||||
ports.add(22) # always add ssh
|
||||
active, enabled = check_service('nftables')
|
||||
if not set_on:
|
||||
if os.geteuid() == 0:
|
||||
check_service('nftables', False, execute)
|
||||
else:
|
||||
print('need sudo rights to modify firewall')
|
||||
return active or enabled
|
||||
|
||||
pattern = re.compile('(tcp dport ({.*}) ct state new accept)', re.MULTILINE | re.DOTALL)
|
||||
|
||||
for filename in FIREWALL_CONF, os.path.join(BoxInfo.TOOLS, 'nftables.conf'):
|
||||
with open(filename) as f:
|
||||
content = f.read()
|
||||
|
||||
try:
|
||||
((prevline, prevports),) = pattern.findall(content)
|
||||
break
|
||||
except (TypeError, ValueError):
|
||||
pass
|
||||
else:
|
||||
print(f'{FIREWALL_CONF} does not contain expected pattern for open ports - firewall off?')
|
||||
return False
|
||||
|
||||
# parse previous port set
|
||||
prevportset = {int(p) for p in prevports[1:-1].split(',')}
|
||||
line = prevline.replace(prevports, '{ %s }' % ', '.join((str(p) for p in sorted(ports))))
|
||||
if prevportset == ports:
|
||||
if active and enabled:
|
||||
return False
|
||||
check_service('nftables', True, execute)
|
||||
return True
|
||||
if os.geteuid() == 0:
|
||||
if execute is not None:
|
||||
print(f'change firewall ports to {ports}')
|
||||
if execute != 0:
|
||||
with open('f{FIREWALL_CONF}.tmp', 'w') as f:
|
||||
f.write(content.replace(prevline, line))
|
||||
os.rename('f{FIREWALL_CONF}.tmp', FIREWALL_CONF)
|
||||
unix_cmd('systemctl restart nftables', execute)
|
||||
unix_cmd('systemctl enable nftables', execute)
|
||||
elif ports - prevportset:
|
||||
print('need sudo rights to modify firewall')
|
||||
return True
|
||||
|
Reference in New Issue
Block a user