AsynConn.uri: better handling for missing scheme

- check roughly for hostname being a valid address
- allow missing 'tcp' scheme even with missing port number

Change-Id: Ia3ce4cb7b8d2a4b339421eafe21f06fba6d938e6
Reviewed-on: https://forge.frm2.tum.de/review/c/secop/frappy/+/30582
Tested-by: Jenkins Automated Tests <pedersen+jenkins@frm2.tum.de>
Reviewed-by: Markus Zolliker <markus.zolliker@psi.ch>
This commit is contained in:
zolliker 2023-03-06 09:53:12 +01:00
parent b0b85a2711
commit 79da402ceb
4 changed files with 72 additions and 19 deletions

View File

@ -1,10 +1,15 @@
from os import environ
# either change the uri or set the environment variable 'LS_URI'
lakeshore_uri = environ.get('LS_URI', 'tcp://<host>:7777')
Node('example_cryo.psi.ch', # a globally unique identification
'this is an example cryostat for the Frappy tutorial', # describes the node
interface='tcp://10767') # you might choose any port number > 1024
Mod('io', # the name of the module
'frappy_demo.lakeshore.LakeshoreIO', # the class used for communication
'communication to main controller', # a description
uri='tcp://129.129.138.78:7777', # the serial connection
uri=lakeshore_uri, # the serial connection
)
Mod('T',
'frappy_demo.lakeshore.TemperatureLoop',

View File

@ -21,7 +21,7 @@
# *****************************************************************************
"""Define helpers"""
import os
import re
import importlib
import linecache
import socket
@ -296,23 +296,31 @@ def formatException(cut=0, exc_info=None, verbose=False):
return ''.join(res)
HOSTNAMEPAT = re.compile(r'[a-z0-9_.-]+$', re.IGNORECASE) # roughly checking for a valid hostname or ip address
def parseHostPort(host, defaultport):
"""Parse host[:port] string and tuples
Specify 'host[:port]' or a (host, port) tuple for the mandatory argument.
If the port specification is missing, the value of the defaultport is used.
"""
if isinstance(host, (tuple, list)):
host, port = host
elif ':' in host:
host, port = host.rsplit(':', 1)
raises TypeError in case host is neither a string nor an iterable
raises ValueError in other cases of invalid arguments
"""
if isinstance(host, str):
host, sep, port = host.partition(':')
if sep:
port = int(port)
else:
port = defaultport
assert 0 < port < 65536
assert ':' not in host
else:
host, port = host
if not HOSTNAMEPAT.match(host):
raise ValueError('illegal host name %r' % host)
if 0 < port < 65536:
return host, port
raise ValueError('illegal port number: %r' % port)
def tcpSocket(host, defaultport, timeout=None):

View File

@ -52,6 +52,7 @@ class AsynConn:
scheme = None
SCHEME_MAP = {}
connection = None # is not None, if connected
HOSTNAMEPAT = re.compile(r'[a-z0-9_.-]+$', re.IGNORECASE) # roughly checking if it is a valid hostname
def __new__(cls, uri, end_of_line=b'\n', default_settings=None):
scheme = uri.split('://')[0]
@ -59,17 +60,16 @@ class AsynConn:
if not iocls:
# try tcp, if scheme not given
try:
host_port = parseHostPort(uri, None)
except (ValueError, TypeError, AssertionError):
parseHostPort(uri, 1) # check hostname only
except ValueError:
if 'COM' in uri:
raise ValueError("the correct uri for a COM port is: "
"'serial://COM<i>[?<option>=<value>[+<option>=value ...]]'") from None
"'serial://COM<i>[?<option>=<value>[&<option>=value ...]]'") from None
if '/dev' in uri:
raise ValueError("the correct uri for a serial port is: "
"'serial:///dev/<tty>[?<option>=<value>[+<option>=value ...]]'") from None
raise ValueError('invalid uri: %s' % uri) from None
"'serial:///dev/<tty>[?<option>=<value>[&<option>=value ...]]'") from None
raise ValueError('invalid hostname %r' % uri) from None
iocls = cls.SCHEME_MAP['tcp']
uri = 'tcp://%s:%d' % host_port
return object.__new__(iocls)
def __init__(self, uri, end_of_line=b'\n', default_settings=None):
@ -170,7 +170,6 @@ class AsynTcp(AsynConn):
super().__init__(uri, *args, **kwargs)
self.uri = uri
if uri.startswith('tcp://'):
# should be the case always
uri = uri[6:]
try:
self.connection = tcpSocket(uri, self.default_settings.get('port'), self.timeout)

41
test/test_lib.py Normal file
View File

@ -0,0 +1,41 @@
# -*- coding: utf-8 -*-
# *****************************************************************************
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Module authors:
# Markus Zolliker <markus.zolliker@psi.ch>
#
# *****************************************************************************
import pytest
from frappy.lib import parseHostPort
@pytest.mark.parametrize('hostport, defaultport, result', [
(('box.psi.ch', 9999), 1, ('box.psi.ch', 9999)),
(('/dev/tty', 9999), 1, None),
('localhost:10767', 1, ('localhost', 10767)),
('www.psi.ch', 80, ('www.psi.ch', 80)),
('/dev/ttyx:2089', 10767, None),
('COM4:', 2089, None),
('underscore_valid.123.hyphen-valid.com', 80, ('underscore_valid.123.hyphen-valid.com', 80)),
])
def test_parse_host(hostport, defaultport, result):
if result is None:
with pytest.raises(ValueError):
parseHostPort(hostport, defaultport)
else:
assert result == parseHostPort(hostport, defaultport)