merge manually with mlz repo

as of 2023-05-04

Change-Id: I5926617c454844927799e20a489db20d538db100
This commit is contained in:
zolliker 2023-05-04 16:34:09 +02:00
parent bbe70fb3cb
commit 3fcd72b189
14 changed files with 278 additions and 81 deletions

View File

@ -29,6 +29,9 @@ from os import path
# Add import path for inplace usage # Add import path for inplace usage
sys.path.insert(0, path.abspath(path.join(path.dirname(__file__), '..'))) sys.path.insert(0, path.abspath(path.join(path.dirname(__file__), '..')))
import logging
from mlzlog import ColoredConsoleHandler
from frappy.gui.qt import QApplication from frappy.gui.qt import QApplication
from frappy.gui.cfg_editor.mainwindow import MainWindow from frappy.gui.cfg_editor.mainwindow import MainWindow
@ -38,7 +41,11 @@ def main(argv=None):
parser.add_argument('-f', '--file', help='Configuration file to open.') parser.add_argument('-f', '--file', help='Configuration file to open.')
args = parser.parse_args() args = parser.parse_args()
app = QApplication(argv) app = QApplication(argv)
window = MainWindow(args.file) logger = logging.getLogger('gui')
console = ColoredConsoleHandler()
console.setLevel(logging.INFO)
logger.addHandler(console)
window = MainWindow(args.file, log=logger)
window.show() window.show()
return app.exec() return app.exec()

View File

@ -24,6 +24,8 @@ import configparser
from collections import OrderedDict from collections import OrderedDict
from configparser import NoOptionError from configparser import NoOptionError
from frappy.config import load_config
from frappy.datatypes import StringType, EnumType
from frappy.gui.cfg_editor.tree_widget_item import TreeWidgetItem from frappy.gui.cfg_editor.tree_widget_item import TreeWidgetItem
from frappy.gui.cfg_editor.utils import get_all_children_with_names, \ from frappy.gui.cfg_editor.utils import get_all_children_with_names, \
get_all_items, get_interface_class_from_name, get_module_class_from_name, \ get_all_items, get_interface_class_from_name, get_module_class_from_name, \
@ -41,7 +43,7 @@ SECTIONS = {NODE: 'description',
MODULE: 'class'} MODULE: 'class'}
def write_config(file_name, tree_widget): def write_legacy_config(file_name, tree_widget):
itms = get_all_items(tree_widget) itms = get_all_items(tree_widget)
itm_lines = OrderedDict() itm_lines = OrderedDict()
value_str = '%s = %s' value_str = '%s = %s'
@ -79,8 +81,7 @@ def write_config(file_name, tree_widget):
with open(file_name, 'w', encoding='utf-8') as configfile: with open(file_name, 'w', encoding='utf-8') as configfile:
configfile.write('\n'.join(itm_lines.values())) configfile.write('\n'.join(itm_lines.values()))
def read_legacy_config(file_path):
def read_config(file_path):
# TODO datatype of params and properties # TODO datatype of params and properties
node = TreeWidgetItem(NODE) node = TreeWidgetItem(NODE)
ifs = TreeWidgetItem(name='Interfaces') ifs = TreeWidgetItem(name='Interfaces')
@ -147,6 +148,119 @@ def read_config(file_path):
node = get_comments(node, ifs, mods, file_path) node = get_comments(node, ifs, mods, file_path)
return node, ifs, mods return node, ifs, mods
def fmt_value(param, dt):
if isinstance(dt, StringType):
return repr(param)
if isinstance(dt, EnumType):
try:
return int(param)
except ValueError:
return repr(param)
return param
def fmt_param(param, pnp):
props = pnp[param.name]
if isinstance(props, list):
dt = props[0].datatype
else:
dt = props.datatype
if param.childCount() > 1 or (param.childCount() == 1 and param.child(0).name != 'value'):
values = []
main_value = param.get_value()
if main_value:
values.append(fmt_value(main_value, dt))
for i in range(param.childCount()):
prop = param.child(i)
propdt = props[1][prop.name].datatype
propv = fmt_value(prop.get_value(), propdt)
values.append(f'{prop.name} = {propv}')
values = f'Param({", ".join(values)})'
else:
values = fmt_value(param.get_value(), dt)
return f' {param.name} = {values},'
def write_config(file_name, tree_widget):
"""stopgap python config writing. assumes no comments are in the cfg."""
itms = get_all_items(tree_widget)
for itm in itms:
if itm.kind == 'comment':
print('comments are broken right now. not writing a file. exiting...')
return
lines = []
root = tree_widget.topLevelItem(0)
eq_id = root.name
description = root.get_value()
iface = root.child(0).child(0).name
lines.append(f'Node(\'{eq_id}\',\n {repr(description)},\n \'{iface}\',')
for i in range(2, root.childCount()):
lines.append(fmt_param(root.child(i), {}))
lines.append(')')
mods = root.child(1)
for i in range(mods.childCount()):
lines.append('\n')
mod = mods.child(i)
params_and_props = {}
params_and_props.update(mod.properties)
params_and_props.update(mod.parameters)
descr = None
for i in range(mod.childCount()):
if mod.child(i).name == 'description':
descr = mod.child(i)
break
lines.append(f'Mod(\'{mod.name}\',\n \'{mod.get_value()}\',\n \'{descr.get_value()}\',')
for j in range(mod.childCount()):
if j == i:
continue
lines.append(fmt_param(mod.child(j), params_and_props))
lines.append(')')
with open(file_name, 'w', encoding='utf-8') as configfile:
configfile.write('\n'.join(lines))
def read_config(file_path, log):
config = load_config(file_path, log)
node = TreeWidgetItem(NODE)
ifs = TreeWidgetItem(name='Interfaces')
mods = TreeWidgetItem(name='Modules')
node.addChild(ifs)
node.addChild(mods)
nodecfg = config.pop('node', {})
node.set_name(nodecfg['equipment_id'])
node.set_value(nodecfg['description'])
node.parameters = get_params(NODE)
node.properties = get_props(NODE)
iface = TreeWidgetItem(INTERFACE, nodecfg['interface'])
#act_class = get_interface_class_from_name(section_value)
#act_item.set_class_object(act_class)
ifs.addChild(iface)
for name, modcfg in config.items():
act_item = TreeWidgetItem(MODULE, name)
mods.addChild(act_item)
cls = modcfg.pop('cls')
act_item.set_value(cls)
act_class = get_module_class_from_name(cls)
act_item.set_class_object(act_class)
act_item.parameters = get_params(act_class)
act_item.properties = get_props(act_class)
for param, options in modcfg.items():
if not isinstance(options, dict):
prop = TreeWidgetItem(PROPERTY, param, str(options))
act_item.addChild(prop)
else:
param = TreeWidgetItem(PARAMETER, param)
param.set_value('')
act_item.addChild(param)
for k, v in options.items():
if k == 'value':
param.set_value(str(v))
else:
param.addChild(TreeWidgetItem(PROPERTY, k, str(v)))
#node = get_comments(node, ifs, mods, file_path)
return node, ifs, mods
def get_value(config, section, option): def get_value(config, section, option):
value = config.get(section, option) value = config.get(section, option)

View File

@ -39,9 +39,10 @@ COMMENT = 'comment'
class MainWindow(QMainWindow): class MainWindow(QMainWindow):
def __init__(self, file_path=None, parent=None): def __init__(self, file_path=None, log=None, parent=None):
super().__init__(parent) super().__init__(parent)
loadUi(self, 'mainwindow.ui') loadUi(self, 'mainwindow.ui')
self.log = log
self.tabWidget.currentChanged.connect(self.tab_relevant_btns_disable) self.tabWidget.currentChanged.connect(self.tab_relevant_btns_disable)
if file_path is None: if file_path is None:
self.tab_relevant_btns_disable(-1) self.tab_relevant_btns_disable(-1)
@ -179,7 +180,7 @@ class MainWindow(QMainWindow):
QMessageBox.StandardButton.Save) QMessageBox.StandardButton.Save)
def new_node(self, name, file_path=None): def new_node(self, name, file_path=None):
node = NodeDisplay(file_path) node = NodeDisplay(file_path, self.log)
if node.created: if node.created:
node.tree_widget.currentItemChanged.connect(self.disable_btns) node.tree_widget.currentItemChanged.connect(self.disable_btns)
self.tabWidget.setCurrentIndex(self.tabWidget.addTab(node, name)) self.tabWidget.setCurrentIndex(self.tabWidget.addTab(node, name))

View File

@ -26,10 +26,12 @@ from frappy.gui.cfg_editor.utils import loadUi
class NodeDisplay(QWidget): class NodeDisplay(QWidget):
def __init__(self, file_path=None, parent=None): def __init__(self, file_path=None, log=None, parent=None):
super().__init__(parent) super().__init__(parent)
loadUi(self, 'node_display.ui') loadUi(self, 'node_display.ui')
self.log = log
self.saved = bool(file_path) self.saved = bool(file_path)
self.tree_widget.log = log
self.created = self.tree_widget.set_file(file_path) self.created = self.tree_widget.set_file(file_path)
self.tree_widget.save_status_changed.connect(self.change_save_status) self.tree_widget.save_status_changed.connect(self.change_save_status)
self.tree_widget.currentItemChanged.connect(self.set_scroll_area) self.tree_widget.currentItemChanged.connect(self.set_scroll_area)

View File

@ -99,8 +99,8 @@ def get_file_paths(widget, open_file=True):
dialog.setAcceptMode(QFileDialog.AcceptMode.AcceptSave) dialog.setAcceptMode(QFileDialog.AcceptMode.AcceptSave)
dialog.setFileMode(QFileDialog.FileMode.AnyFile) dialog.setFileMode(QFileDialog.FileMode.AnyFile)
dialog.setWindowTitle(title) dialog.setWindowTitle(title)
dialog.setNameFilter('*.cfg') dialog.setNameFilter('*.py')
dialog.setDefaultSuffix('.cfg') dialog.setDefaultSuffix('.py')
dialog.exec() dialog.exec()
return dialog.selectedFiles() return dialog.selectedFiles()

View File

@ -102,7 +102,7 @@ class TreeWidget(QTreeWidget):
self.file_path = file_path self.file_path = file_path
if self.file_path: if self.file_path:
if os.path.isfile(file_path): if os.path.isfile(file_path):
self.set_tree(read_config(self.file_path)) self.set_tree(read_config(self.file_path, self.log))
self.emit_save_status_changed(True) self.emit_save_status_changed(True)
return True return True
self.file_path = None self.file_path = None
@ -277,7 +277,7 @@ class TreeWidget(QTreeWidget):
file_name = self.file_path file_name = self.file_path
if not self.file_path or save_as: if not self.file_path or save_as:
file_name = get_file_paths(self, False)[-1] file_name = get_file_paths(self, False)[-1]
if file_name[-4:] == '.cfg': if file_name[-3:] == '.py':
self.file_path = file_name self.file_path = file_name
write_config(self.file_path, self) write_config(self.file_path, self)
self.emit_save_status_changed(True) self.emit_save_status_changed(True)

View File

@ -228,7 +228,7 @@ def get_widget(datatype, readonly=False, parent=None):
TupleOf: TupleWidget, TupleOf: TupleWidget,
StructOf: StructWidget, StructOf: StructWidget,
ArrayOf: ArrayWidget, ArrayOf: ArrayWidget,
}.get(datatype.__class__)(datatype, readonly, parent) }.get(datatype.__class__, TextWidget)(datatype, readonly, parent)
# TODO: handle NoneOr # TODO: handle NoneOr

View File

@ -21,9 +21,9 @@
# ***************************************************************************** # *****************************************************************************
"""Define helpers""" """Define helpers"""
import re
import importlib import importlib
import linecache import linecache
import re
import socket import socket
import sys import sys
import threading import threading
@ -295,50 +295,68 @@ def formatException(cut=0, exc_info=None, verbose=False):
return ''.join(res) return ''.join(res)
HOSTNAMEPAT = re.compile(r'[a-z0-9_.-]+$', re.IGNORECASE) # roughly checking for a valid hostname or ip address HOSTNAMEPART = re.compile(r'^(?!-)[a-z0-9-]{1,63}(?<!-)$', re.IGNORECASE)
def validate_hostname(host):
"""checks if the rules for valid hostnames are adhered to"""
if len(host) > 255:
return False
for part in host.split('.'):
if not HOSTNAMEPART.match(part):
return False
return True
def parseHostPort(host, defaultport): def validate_ipv4(addr):
"""Parse host[:port] string and tuples """check if v4 address is valid."""
try:
socket.inet_aton(addr)
except OSError:
return False
return True
Specify 'host[:port]' or a (host, port) tuple for the mandatory argument.
If the port specification is missing, the value of the defaultport is used.
raises TypeError in case host is neither a string nor an iterable def validate_ipv6(addr):
raises ValueError in other cases of invalid arguments """check if v6 address is valid."""
try:
socket.inet_pton(socket.AF_INET6, addr)
except OSError:
return False
return True
def parse_ipv6_host_and_port(addr, defaultport=10767):
""" Parses IPv6 addresses with optional port. See parse_host_port for valid formats"""
if ']' in addr:
host, port = addr.rsplit(':', 1)
return host[1:-1], int(port)
if '.' in addr:
host, port = addr.rsplit('.', 1)
return host, int(port)
return (host, defaultport)
def parse_host_port(host, defaultport=10767):
"""Parses hostnames and IP (4/6) addressses.
The accepted formats are:
- a standard hostname
- base IPv6 or 4 addresses
- 'hostname:port'
- IPv4 addresses in the form of 'IPv4:port'
- IPv6 addresses in the forms '[IPv6]:port' or 'IPv6.port'
""" """
if isinstance(host, str): colons = host.count(':')
host, sep, port = host.partition(':') if colons == 0: # hostname/ipv4 wihtout port
if sep: port = defaultport
port = int(port) elif colons == 1: # hostname or ipv4 with port
else: host, port = host.split(':')
port = defaultport port = int(port)
else: else: # ipv6
host, port = host host, port = parse_ipv6_host_and_port(host, defaultport)
if not HOSTNAMEPAT.match(host): if (validate_ipv4(host) or validate_hostname(host) or validate_ipv6(host)) \
raise ValueError(f'illegal host name {host!r}') and 0 < port < 65536:
if 0 < port < 65536: return (host, port)
return host, port raise ValueError(f'invalid host {host!r} or port {port}')
raise ValueError(f'illegal port number: {port!r}')
def tcpSocket(host, defaultport, timeout=None):
"""Helper for opening a TCP client socket to a remote server.
Specify 'host[:port]' or a (host, port) tuple for the mandatory argument.
If the port specification is missing, the value of the defaultport is used.
If timeout is set to a number, the timout of the connection is set to this
number, else the socket stays in blocking mode.
"""
host, port = parseHostPort(host, defaultport)
# open socket and set options
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
if timeout:
s.settimeout(timeout)
# connect
s.connect((host, int(port)))
return s
# keep a reference to socket to avoid (interpreter) shut-down problems # keep a reference to socket to avoid (interpreter) shut-down problems

View File

@ -35,7 +35,7 @@ import time
import re import re
from frappy.errors import CommunicationFailedError, ConfigError from frappy.errors import CommunicationFailedError, ConfigError
from frappy.lib import closeSocket, parseHostPort, tcpSocket from frappy.lib import closeSocket, parse_host_port
try: try:
from serial import Serial from serial import Serial
@ -60,7 +60,7 @@ class AsynConn:
if not iocls: if not iocls:
# try tcp, if scheme not given # try tcp, if scheme not given
try: try:
parseHostPort(uri, 1) # check hostname only parse_host_port(uri, 1) # check hostname only
except ValueError: except ValueError:
if 'COM' in uri: if 'COM' in uri:
raise ValueError("the correct uri for a COM port is: " raise ValueError("the correct uri for a COM port is: "
@ -175,7 +175,9 @@ class AsynTcp(AsynConn):
if uri.startswith('tcp://'): if uri.startswith('tcp://'):
uri = uri[6:] uri = uri[6:]
try: try:
self.connection = tcpSocket(uri, self.default_settings.get('port'), self.timeout)
host, port = parse_host_port(uri, self.default_settings.get('port'))
self.connection = socket.create_connection((host, port), timeout=self.timeout)
except (ConnectionRefusedError, socket.gaierror, socket.timeout) as e: except (ConnectionRefusedError, socket.gaierror, socket.timeout) as e:
# indicate that retrying might make sense # indicate that retrying might make sense
raise CommunicationFailedError(str(e)) from None raise CommunicationFailedError(str(e)) from None

View File

@ -21,22 +21,22 @@
# ***************************************************************************** # *****************************************************************************
"""provides tcp interface to the SECoP Server""" """provides tcp interface to the SECoP Server"""
import errno
import os
import socket import socket
import socketserver import socketserver
import sys import sys
import threading import threading
import time import time
import errno
import os
from frappy.datatypes import BoolType, StringType from frappy.datatypes import BoolType, StringType
from frappy.errors import SECoPError from frappy.errors import SECoPError
from frappy.lib import formatException, \ from frappy.lib import formatException, formatExtendedStack, \
formatExtendedStack, formatExtendedTraceback formatExtendedTraceback
from frappy.properties import Property from frappy.properties import Property
from frappy.protocol.interface import decode_msg, encode_msg_frame, get_msg from frappy.protocol.interface import decode_msg, encode_msg_frame, get_msg
from frappy.protocol.messages import ERRORPREFIX, \ from frappy.protocol.messages import ERRORPREFIX, HELPREPLY, HELPREQUEST, \
HELPREPLY, HELPREQUEST, HelpMessage HelpMessage
DEF_PORT = 10767 DEF_PORT = 10767
MESSAGE_READ_SIZE = 1024 MESSAGE_READ_SIZE = 1024
@ -57,7 +57,7 @@ class TCPRequestHandler(socketserver.BaseRequestHandler):
clientaddr = self.client_address clientaddr = self.client_address
serverobj = self.server serverobj = self.server
self.log.info("handling new connection from %s:%d" % clientaddr) self.log.info("handling new connection from %s", format_address(clientaddr))
data = b'' data = b''
# notify dispatcher of us # notify dispatcher of us
@ -160,7 +160,7 @@ class TCPRequestHandler(socketserver.BaseRequestHandler):
def finish(self): def finish(self):
"""called when handle() terminates, i.e. the socket closed""" """called when handle() terminates, i.e. the socket closed"""
self.log.info('closing connection from %s:%d' % self.client_address) self.log.info('closing connection from %s', format_address(self.client_address))
# notify dispatcher # notify dispatcher
self.server.dispatcher.remove_connection(self) self.server.dispatcher.remove_connection(self)
# close socket # close socket
@ -171,8 +171,28 @@ class TCPRequestHandler(socketserver.BaseRequestHandler):
finally: finally:
self.request.close() self.request.close()
class DualStackTCPServer(socketserver.ThreadingTCPServer):
"""Subclassed to provide IPv6 capabilities as socketserver only uses IPv4"""
def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True, enable_ipv6=False):
super().__init__(
server_address, RequestHandlerClass, bind_and_activate=False)
class TCPServer(socketserver.ThreadingTCPServer): # override default socket
if enable_ipv6:
self.address_family = socket.AF_INET6
self.socket = socket.socket(self.address_family,
self.socket_type)
self.socket.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0)
if bind_and_activate:
try:
self.server_bind()
self.server_activate()
except:
self.server_close()
raise
class TCPServer(DualStackTCPServer):
daemon_threads = True daemon_threads = True
# on windows, 'reuse_address' means that several servers might listen on # on windows, 'reuse_address' means that several servers might listen on
# the same port, on the other hand, a port is not blocked after closing # the same port, on the other hand, a port is not blocked after closing
@ -191,13 +211,16 @@ class TCPServer(socketserver.ThreadingTCPServer):
self.name = name self.name = name
self.log = logger self.log = logger
port = int(options.pop('uri').split('://', 1)[-1]) port = int(options.pop('uri').split('://', 1)[-1])
enable_ipv6 = options.pop('ipv6', False)
self.detailed_errors = options.pop('detailed_errors', False) self.detailed_errors = options.pop('detailed_errors', False)
self.log.info("TCPServer %s binding to port %d", name, port) self.log.info("TCPServer %s binding to port %d", name, port)
for ntry in range(5): for ntry in range(5):
try: try:
socketserver.ThreadingTCPServer.__init__( DualStackTCPServer.__init__(
self, ('0.0.0.0', port), TCPRequestHandler, bind_and_activate=True) self, ('', port), TCPRequestHandler,
bind_and_activate=True, enable_ipv6=enable_ipv6
)
break break
except OSError as e: except OSError as e:
if e.args[0] == errno.EADDRINUSE: # address already in use if e.args[0] == errno.EADDRINUSE: # address already in use
@ -217,3 +240,11 @@ class TCPServer(socketserver.ThreadingTCPServer):
def __exit__(self, *args): def __exit__(self, *args):
self.server_close() self.server_close()
def format_address(addr):
if len(addr) == 2:
return '%s:%d' % addr
address, port = addr[0:2]
if address.startswith('::ffff'):
return '%s:%d' % (address[7:], port)
return '[%s]:%d' % (address, port)

View File

@ -125,6 +125,9 @@ class Server:
def unknown_options(self, cls, options): def unknown_options(self, cls, options):
return f"{cls.__name__} class don't know how to handle option(s): {', '.join(options)}" return f"{cls.__name__} class don't know how to handle option(s): {', '.join(options)}"
def restart_hook(self):
pass
def run(self): def run(self):
while self._restart: while self._restart:
self._restart = False self._restart = False

View File

@ -460,6 +460,12 @@ class AnalogOutput(PyTangoDevice, Drivable):
_history = () _history = ()
_timeout = None _timeout = None
_moving = False _moving = False
__main_unit = None
def applyMainUnit(self, mainunit):
# called from __init__ method
# replacement of '$' by main unit must be done later
self.__main_unit = mainunit
def initModule(self): def initModule(self):
super().initModule() super().initModule()
@ -469,12 +475,18 @@ class AnalogOutput(PyTangoDevice, Drivable):
def startModule(self, start_events): def startModule(self, start_events):
super().startModule(start_events) super().startModule(start_events)
# query unit from tango and update value property try:
attrInfo = self._dev.attribute_query('value') # query unit from tango and update value property
# prefer configured unit if nothing is set on the Tango device, else attrInfo = self._dev.attribute_query('value')
# update # prefer configured unit if nothing is set on the Tango device, else
if attrInfo.unit != 'No unit': # update
self.accessibles['value'].datatype.setProperty('unit', attrInfo.unit) if attrInfo.unit != 'No unit':
self.accessibles['value'].datatype.setProperty('unit', attrInfo.unit)
self.__main_unit = attrInfo.unit
except Exception as e:
self.log.error(e)
if self.__main_unit:
super().applyMainUnit(self.__main_unit)
def doPoll(self): def doPoll(self):
super().doPoll() super().doPoll()
@ -543,7 +555,7 @@ class AnalogOutput(PyTangoDevice, Drivable):
return self.abslimits[1] return self.abslimits[1]
def __getusermin(self): def __getusermin(self):
return self.userlimits[0] return max(self.userlimits[0], self.abslimits[0])
def __setusermin(self, value): def __setusermin(self, value):
self.userlimits = (value, self.userlimits[1]) self.userlimits = (value, self.userlimits[1])
@ -551,7 +563,7 @@ class AnalogOutput(PyTangoDevice, Drivable):
usermin = property(__getusermin, __setusermin) usermin = property(__getusermin, __setusermin)
def __getusermax(self): def __getusermax(self):
return self.userlimits[1] return min(self.userlimits[1], self.abslimits[1])
def __setusermax(self, value): def __setusermax(self, value):
self.userlimits = (self.userlimits[0], value) self.userlimits = (self.userlimits[0], value)

View File

@ -31,7 +31,7 @@ def make_cvt_list(dt, tail=''):
tail is a postfix to be appended in case of tuples and structs tail is a postfix to be appended in case of tuples and structs
""" """
if isinstance(dt, (EnumType, IntRange, BoolType)): if isinstance(dt, (EnumType, IntRange, BoolType)):
return[(int, tail, {type: 'NUM'})] return[(int, tail, {'type': 'NUM'})]
if isinstance(dt, (FloatRange, ScaledInteger)): if isinstance(dt, (FloatRange, ScaledInteger)):
return [(dt.import_value, tail, return [(dt.import_value, tail,
{'type': 'NUM', 'unit': dt.unit, 'period': 5} if dt.unit else {})] {'type': 'NUM', 'unit': dt.unit, 'period': 5} if dt.unit else {})]

View File

@ -21,21 +21,28 @@
# ***************************************************************************** # *****************************************************************************
import pytest import pytest
from frappy.lib import parseHostPort
from frappy.lib import parse_host_port
@pytest.mark.parametrize('hostport, defaultport, result', [ @pytest.mark.parametrize('hostport, defaultport, result', [
(('box.psi.ch', 9999), 1, ('box.psi.ch', 9999)), ('box.psi.ch:9999', 1, ('box.psi.ch', 9999)),
(('/dev/tty', 9999), 1, None), ('/dev/tty:9999', 1, None),
('localhost:10767', 1, ('localhost', 10767)), ('localhost:10767', 1, ('localhost', 10767)),
('www.psi.ch', 80, ('www.psi.ch', 80)), ('www.psi.ch', 80, ('www.psi.ch', 80)),
('/dev/ttyx:2089', 10767, None), ('/dev/ttyx:2089', 10767, None),
('COM4:', 2089, None), ('COM4:', 2089, None),
('underscore_valid.123.hyphen-valid.com', 80, ('underscore_valid.123.hyphen-valid.com', 80)), ('123.hyphen-valid.com', 80, ('123.hyphen-valid.com', 80)),
('underscore_invalid.123.hyphen-valid.com:10000', 80, None),
('::1.1111', 2, ('::1', 1111)),
('[2e::fe]:1', 50, ('2e::fe', 1)),
('127.0.0.1:50', 1337, ('127.0.0.1', 50)),
('234.40.128.3:13212', 1337, ('234.40.128.3', 13212)),
]) ])
def test_parse_host(hostport, defaultport, result): def test_parse_host(hostport, defaultport, result):
if result is None: if result is None:
with pytest.raises(ValueError): with pytest.raises(ValueError):
parseHostPort(hostport, defaultport) parse_host_port(hostport, defaultport)
else: else:
assert result == parseHostPort(hostport, defaultport) assert result == parse_host_port(hostport, defaultport)