Improve address and connection handling

* improve address checking
+ add ipv6 capabilities

Change-Id: I5369336bec449c27d79d857018f319266dfd4d0e
Reviewed-on: https://forge.frm2.tum.de/review/c/secop/frappy/+/30885
Tested-by: Jenkins Automated Tests <pedersen+jenkins@frm2.tum.de>
Reviewed-by: Alexander Zaft <a.zaft@fz-juelich.de>
This commit is contained in:
Alexander Zaft
2023-04-12 14:15:52 +02:00
parent a49d64953c
commit f491625dd1
4 changed files with 118 additions and 60 deletions

View File

@@ -21,9 +21,9 @@
# *****************************************************************************
"""Define helpers"""
import re
import importlib
import linecache
import re
import socket
import sys
import threading
@@ -295,50 +295,68 @@ def formatException(cut=0, exc_info=None, verbose=False):
return ''.join(res)
HOSTNAMEPAT = re.compile(r'[a-z0-9_.-]+$', re.IGNORECASE) # roughly checking for a valid hostname or ip address
HOSTNAMEPART = re.compile(r'^(?!-)[a-z0-9-]{1,63}(?<!-)$', re.IGNORECASE)
def validate_hostname(host):
"""checks if the rules for valid hostnames are adhered to"""
if len(host) > 255:
return False
for part in host.split('.'):
if not HOSTNAMEPART.match(part):
return False
return True
def parseHostPort(host, defaultport):
"""Parse host[:port] string and tuples
def validate_ipv4(addr):
"""check if v4 address is valid."""
try:
socket.inet_aton(addr)
except OSError:
return False
return True
Specify 'host[:port]' or a (host, port) tuple for the mandatory argument.
If the port specification is missing, the value of the defaultport is used.
raises TypeError in case host is neither a string nor an iterable
raises ValueError in other cases of invalid arguments
def validate_ipv6(addr):
"""check if v6 address is valid."""
try:
socket.inet_pton(socket.AF_INET6, addr)
except OSError:
return False
return True
def parse_ipv6_host_and_port(addr, defaultport=10767):
""" Parses IPv6 addresses with optional port. See parse_host_port for valid formats"""
if ']' in addr:
host, port = addr.rsplit(':', 1)
return host[1:-1], int(port)
if '.' in addr:
host, port = addr.rsplit('.', 1)
return host, int(port)
return (host, defaultport)
def parse_host_port(host, defaultport=10767):
"""Parses hostnames and IP (4/6) addressses.
The accepted formats are:
- a standard hostname
- base IPv6 or 4 addresses
- 'hostname:port'
- IPv4 addresses in the form of 'IPv4:port'
- IPv6 addresses in the forms '[IPv6]:port' or 'IPv6.port'
"""
if isinstance(host, str):
host, sep, port = host.partition(':')
if sep:
port = int(port)
else:
port = defaultport
else:
host, port = host
if not HOSTNAMEPAT.match(host):
raise ValueError(f'illegal host name {host!r}')
if 0 < port < 65536:
return host, port
raise ValueError(f'illegal port number: {port!r}')
def tcpSocket(host, defaultport, timeout=None):
"""Helper for opening a TCP client socket to a remote server.
Specify 'host[:port]' or a (host, port) tuple for the mandatory argument.
If the port specification is missing, the value of the defaultport is used.
If timeout is set to a number, the timout of the connection is set to this
number, else the socket stays in blocking mode.
"""
host, port = parseHostPort(host, defaultport)
# open socket and set options
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
if timeout:
s.settimeout(timeout)
# connect
s.connect((host, int(port)))
return s
colons = host.count(':')
if colons == 0: # hostname/ipv4 wihtout port
port = defaultport
elif colons == 1: # hostname or ipv4 with port
host, port = host.split(':')
port = int(port)
else: # ipv6
host, port = parse_ipv6_host_and_port(host, defaultport)
if (validate_ipv4(host) or validate_hostname(host) or validate_ipv6(host)) \
and 0 < port < 65536:
return (host, port)
raise ValueError(f'invalid host {host!r} or port {port}')
# keep a reference to socket to avoid (interpreter) shut-down problems