Merge branch 'wip' of gitlab.psi.ch:samenv/frappy into wip

This commit is contained in:
l_samenv
2023-05-05 13:25:10 +02:00
23 changed files with 428 additions and 301 deletions

View File

@@ -218,7 +218,7 @@ max-branches=50
max-statements=150 max-statements=150
# Maximum number of parents for a class (see R0901). # Maximum number of parents for a class (see R0901).
max-parents=25 max-parents=20
# Maximum number of attributes for a class (see R0902). # Maximum number of attributes for a class (see R0902).
max-attributes=50 max-attributes=50

View File

@@ -29,6 +29,9 @@ from os import path
# Add import path for inplace usage # Add import path for inplace usage
sys.path.insert(0, path.abspath(path.join(path.dirname(__file__), '..'))) sys.path.insert(0, path.abspath(path.join(path.dirname(__file__), '..')))
import logging
from mlzlog import ColoredConsoleHandler
from frappy.gui.qt import QApplication from frappy.gui.qt import QApplication
from frappy.gui.cfg_editor.mainwindow import MainWindow from frappy.gui.cfg_editor.mainwindow import MainWindow
@@ -38,7 +41,11 @@ def main(argv=None):
parser.add_argument('-f', '--file', help='Configuration file to open.') parser.add_argument('-f', '--file', help='Configuration file to open.')
args = parser.parse_args() args = parser.parse_args()
app = QApplication(argv) app = QApplication(argv)
window = MainWindow(args.file) logger = logging.getLogger('gui')
console = ColoredConsoleHandler()
console.setLevel(logging.INFO)
logger.addHandler(console)
window = MainWindow(args.file, log=logger)
window.show() window.show()
return app.exec() return app.exec()

View File

@@ -31,11 +31,11 @@ from frappy.datatypes import ArrayOf, BLOBType, BoolType, EnumType, \
from frappy.lib.enum import Enum from frappy.lib.enum import Enum
from frappy.modules import Attached, Communicator, \ from frappy.modules import Attached, Communicator, \
Done, Drivable, Feature, Module, Readable, Writable, HasAccessibles Done, Drivable, Feature, Module, Readable, Writable, HasAccessibles
from frappy.params import Command, Parameter from frappy.params import Command, Parameter, Limit
from frappy.properties import Property from frappy.properties import Property
from frappy.proxy import Proxy, SecNode, proxy_class from frappy.proxy import Proxy, SecNode, proxy_class
from frappy.io import HasIO, StringIO, BytesIO, HasIodev # TODO: remove HasIodev (legacy stuff) from frappy.io import HasIO, StringIO, BytesIO, HasIodev # TODO: remove HasIodev (legacy stuff)
from frappy.persistent import PersistentMixin, PersistentParam from frappy.persistent import PersistentMixin, PersistentParam, PersistentLimit
from frappy.rwhandler import ReadHandler, WriteHandler, CommonReadHandler, \ from frappy.rwhandler import ReadHandler, WriteHandler, CommonReadHandler, \
CommonWriteHandler, nopoll CommonWriteHandler, nopoll

View File

@@ -22,115 +22,14 @@
"""Define Mixin Features for real Modules implemented in the server""" """Define Mixin Features for real Modules implemented in the server"""
from frappy.datatypes import FloatRange, TupleOf from frappy.datatypes import FloatRange
from frappy.core import Drivable, Feature, \ from frappy.core import Feature, PersistentParam
Parameter, PersistentParam, Readable
from frappy.errors import RangeError, ConfigError
from frappy.lib import clamp
class HasSimpleOffset(Feature): class HasOffset(Feature):
"""has a client side offset parameter """has a client side offset parameter
this is just a storage! this is just a storage!
""" """
featureName = 'HasOffset'
offset = PersistentParam('offset (physical value + offset = HW value)', offset = PersistentParam('offset (physical value + offset = HW value)',
FloatRange(unit='deg'), readonly=False, default=0) FloatRange(unit='$'), readonly=False, default=0)
def write_offset(self, value):
self.offset = value
if isinstance(self, HasLimits):
self.read_limits()
if isinstance(self, Readable):
self.read_value()
if isinstance(self, Drivable):
self.read_target()
self.saveParameters()
return value
class HasTargetLimits(Feature):
"""user limits
implementation to be done in the subclass
according to standard
"""
target_limits = PersistentParam('user limits', readonly=False, default=(-9e99, 9e99),
datatype=TupleOf(FloatRange(unit='deg'), FloatRange(unit='deg')))
_limits = None
def apply_offset(self, sign, *values):
if isinstance(self, HasOffset):
return tuple(v + sign * self.offset for v in values)
return values
def earlyInit(self):
super().earlyInit()
# make limits valid
_limits = self.apply_offset(1, *self.limits)
self._limits = tuple(clamp(self.abslimits[0], v, self.abslimits[1]) for v in _limits)
self.read_limits()
def checkProperties(self):
pname = 'target' if isinstance(self, Drivable) else 'value'
dt = self.parameters[pname].datatype
min_, max_ = self.abslimits
t_min, t_max = self.apply_offset(1, dt.min, dt.max)
if t_min > max_ or t_max < min_:
raise ConfigError(f'abslimits not within {pname} range')
self.abslimits = clamp(t_min, min_, t_max), clamp(t_min, max_, t_max)
super().checkProperties()
def read_limits(self):
return self.apply_offset(-1, *self._limits)
def write_limits(self, value):
min_, max_ = self.apply_offset(-1, *self.abslimits)
if not min_ <= value[0] <= value[1] <= max_:
if value[0] > value[1]:
raise RangeError(f'invalid interval: {value!r}')
raise RangeError(f'limits not within abs limits [{min_:g}, {max_:g}]')
self.limits = value
self.saveParameters()
return self.limits
def check_limits(self, value):
"""check if value is valid"""
min_, max_ = self.target_limits
if not min_ <= value <= max_:
raise RangeError(f'limits violation: {value:g} outside [{min_:g}, {max_:g}]')
# --- legacy mixins, not agreed as standard ---
class HasOffset(Feature):
"""has an offset parameter
implementation to be done in the subclass
"""
offset = Parameter('offset (physical value + offset = HW value)',
FloatRange(unit='$'), readonly=False, default=0)
class HasLimits(Feature):
"""user limits
implementation to be done in the subclass
for a drivable, abslimits is roughly the same as the target datatype limits,
except for the offset
"""
target_limits = Parameter('user limits for target', readonly=False, default=(-9e99, 9e99),
datatype=TupleOf(FloatRange(unit='$'), FloatRange(unit='$')))
def earlyInit(self):
super().earlyInit()
dt = self.parameters['target'].datatype
self.target_limits = dt.min, dt.max
def check_limits(self, value):
"""check if value is valid"""
min_, max_ = self.target_limits
if not min_ <= value <= max_:
raise RangeError('limits violation: %g outside [%g, %g]' % (value, min_, max_))

View File

@@ -24,6 +24,8 @@ import configparser
from collections import OrderedDict from collections import OrderedDict
from configparser import NoOptionError from configparser import NoOptionError
from frappy.config import load_config
from frappy.datatypes import StringType, EnumType
from frappy.gui.cfg_editor.tree_widget_item import TreeWidgetItem from frappy.gui.cfg_editor.tree_widget_item import TreeWidgetItem
from frappy.gui.cfg_editor.utils import get_all_children_with_names, \ from frappy.gui.cfg_editor.utils import get_all_children_with_names, \
get_all_items, get_interface_class_from_name, get_module_class_from_name, \ get_all_items, get_interface_class_from_name, get_module_class_from_name, \
@@ -41,7 +43,7 @@ SECTIONS = {NODE: 'description',
MODULE: 'class'} MODULE: 'class'}
def write_config(file_name, tree_widget): def write_legacy_config(file_name, tree_widget):
itms = get_all_items(tree_widget) itms = get_all_items(tree_widget)
itm_lines = OrderedDict() itm_lines = OrderedDict()
value_str = '%s = %s' value_str = '%s = %s'
@@ -79,8 +81,7 @@ def write_config(file_name, tree_widget):
with open(file_name, 'w', encoding='utf-8') as configfile: with open(file_name, 'w', encoding='utf-8') as configfile:
configfile.write('\n'.join(itm_lines.values())) configfile.write('\n'.join(itm_lines.values()))
def read_legacy_config(file_path):
def read_config(file_path):
# TODO datatype of params and properties # TODO datatype of params and properties
node = TreeWidgetItem(NODE) node = TreeWidgetItem(NODE)
ifs = TreeWidgetItem(name='Interfaces') ifs = TreeWidgetItem(name='Interfaces')
@@ -147,6 +148,119 @@ def read_config(file_path):
node = get_comments(node, ifs, mods, file_path) node = get_comments(node, ifs, mods, file_path)
return node, ifs, mods return node, ifs, mods
def fmt_value(param, dt):
if isinstance(dt, StringType):
return repr(param)
if isinstance(dt, EnumType):
try:
return int(param)
except ValueError:
return repr(param)
return param
def fmt_param(param, pnp):
props = pnp[param.name]
if isinstance(props, list):
dt = props[0].datatype
else:
dt = props.datatype
if param.childCount() > 1 or (param.childCount() == 1 and param.child(0).name != 'value'):
values = []
main_value = param.get_value()
if main_value:
values.append(fmt_value(main_value, dt))
for i in range(param.childCount()):
prop = param.child(i)
propdt = props[1][prop.name].datatype
propv = fmt_value(prop.get_value(), propdt)
values.append(f'{prop.name} = {propv}')
values = f'Param({", ".join(values)})'
else:
values = fmt_value(param.get_value(), dt)
return f' {param.name} = {values},'
def write_config(file_name, tree_widget):
"""stopgap python config writing. assumes no comments are in the cfg."""
itms = get_all_items(tree_widget)
for itm in itms:
if itm.kind == 'comment':
print('comments are broken right now. not writing a file. exiting...')
return
lines = []
root = tree_widget.topLevelItem(0)
eq_id = root.name
description = root.get_value()
iface = root.child(0).child(0).name
lines.append(f'Node(\'{eq_id}\',\n {repr(description)},\n \'{iface}\',')
for i in range(2, root.childCount()):
lines.append(fmt_param(root.child(i), {}))
lines.append(')')
mods = root.child(1)
for i in range(mods.childCount()):
lines.append('\n')
mod = mods.child(i)
params_and_props = {}
params_and_props.update(mod.properties)
params_and_props.update(mod.parameters)
descr = None
for i in range(mod.childCount()):
if mod.child(i).name == 'description':
descr = mod.child(i)
break
lines.append(f'Mod(\'{mod.name}\',\n \'{mod.get_value()}\',\n \'{descr.get_value()}\',')
for j in range(mod.childCount()):
if j == i:
continue
lines.append(fmt_param(mod.child(j), params_and_props))
lines.append(')')
with open(file_name, 'w', encoding='utf-8') as configfile:
configfile.write('\n'.join(lines))
def read_config(file_path, log):
config = load_config(file_path, log)
node = TreeWidgetItem(NODE)
ifs = TreeWidgetItem(name='Interfaces')
mods = TreeWidgetItem(name='Modules')
node.addChild(ifs)
node.addChild(mods)
nodecfg = config.pop('node', {})
node.set_name(nodecfg['equipment_id'])
node.set_value(nodecfg['description'])
node.parameters = get_params(NODE)
node.properties = get_props(NODE)
iface = TreeWidgetItem(INTERFACE, nodecfg['interface'])
#act_class = get_interface_class_from_name(section_value)
#act_item.set_class_object(act_class)
ifs.addChild(iface)
for name, modcfg in config.items():
act_item = TreeWidgetItem(MODULE, name)
mods.addChild(act_item)
cls = modcfg.pop('cls')
act_item.set_value(cls)
act_class = get_module_class_from_name(cls)
act_item.set_class_object(act_class)
act_item.parameters = get_params(act_class)
act_item.properties = get_props(act_class)
for param, options in modcfg.items():
if not isinstance(options, dict):
prop = TreeWidgetItem(PROPERTY, param, str(options))
act_item.addChild(prop)
else:
param = TreeWidgetItem(PARAMETER, param)
param.set_value('')
act_item.addChild(param)
for k, v in options.items():
if k == 'value':
param.set_value(str(v))
else:
param.addChild(TreeWidgetItem(PROPERTY, k, str(v)))
#node = get_comments(node, ifs, mods, file_path)
return node, ifs, mods
def get_value(config, section, option): def get_value(config, section, option):
value = config.get(section, option) value = config.get(section, option)

View File

@@ -39,9 +39,10 @@ COMMENT = 'comment'
class MainWindow(QMainWindow): class MainWindow(QMainWindow):
def __init__(self, file_path=None, parent=None): def __init__(self, file_path=None, log=None, parent=None):
super().__init__(parent) super().__init__(parent)
loadUi(self, 'mainwindow.ui') loadUi(self, 'mainwindow.ui')
self.log = log
self.tabWidget.currentChanged.connect(self.tab_relevant_btns_disable) self.tabWidget.currentChanged.connect(self.tab_relevant_btns_disable)
if file_path is None: if file_path is None:
self.tab_relevant_btns_disable(-1) self.tab_relevant_btns_disable(-1)
@@ -179,7 +180,7 @@ class MainWindow(QMainWindow):
QMessageBox.StandardButton.Save) QMessageBox.StandardButton.Save)
def new_node(self, name, file_path=None): def new_node(self, name, file_path=None):
node = NodeDisplay(file_path) node = NodeDisplay(file_path, self.log)
if node.created: if node.created:
node.tree_widget.currentItemChanged.connect(self.disable_btns) node.tree_widget.currentItemChanged.connect(self.disable_btns)
self.tabWidget.setCurrentIndex(self.tabWidget.addTab(node, name)) self.tabWidget.setCurrentIndex(self.tabWidget.addTab(node, name))

View File

@@ -26,10 +26,12 @@ from frappy.gui.cfg_editor.utils import loadUi
class NodeDisplay(QWidget): class NodeDisplay(QWidget):
def __init__(self, file_path=None, parent=None): def __init__(self, file_path=None, log=None, parent=None):
super().__init__(parent) super().__init__(parent)
loadUi(self, 'node_display.ui') loadUi(self, 'node_display.ui')
self.log = log
self.saved = bool(file_path) self.saved = bool(file_path)
self.tree_widget.log = log
self.created = self.tree_widget.set_file(file_path) self.created = self.tree_widget.set_file(file_path)
self.tree_widget.save_status_changed.connect(self.change_save_status) self.tree_widget.save_status_changed.connect(self.change_save_status)
self.tree_widget.currentItemChanged.connect(self.set_scroll_area) self.tree_widget.currentItemChanged.connect(self.set_scroll_area)

View File

@@ -99,8 +99,8 @@ def get_file_paths(widget, open_file=True):
dialog.setAcceptMode(QFileDialog.AcceptMode.AcceptSave) dialog.setAcceptMode(QFileDialog.AcceptMode.AcceptSave)
dialog.setFileMode(QFileDialog.FileMode.AnyFile) dialog.setFileMode(QFileDialog.FileMode.AnyFile)
dialog.setWindowTitle(title) dialog.setWindowTitle(title)
dialog.setNameFilter('*.cfg') dialog.setNameFilter('*.py')
dialog.setDefaultSuffix('.cfg') dialog.setDefaultSuffix('.py')
dialog.exec() dialog.exec()
return dialog.selectedFiles() return dialog.selectedFiles()

View File

@@ -102,7 +102,7 @@ class TreeWidget(QTreeWidget):
self.file_path = file_path self.file_path = file_path
if self.file_path: if self.file_path:
if os.path.isfile(file_path): if os.path.isfile(file_path):
self.set_tree(read_config(self.file_path)) self.set_tree(read_config(self.file_path, self.log))
self.emit_save_status_changed(True) self.emit_save_status_changed(True)
return True return True
self.file_path = None self.file_path = None
@@ -277,7 +277,7 @@ class TreeWidget(QTreeWidget):
file_name = self.file_path file_name = self.file_path
if not self.file_path or save_as: if not self.file_path or save_as:
file_name = get_file_paths(self, False)[-1] file_name = get_file_paths(self, False)[-1]
if file_name[-4:] == '.cfg': if file_name[-3:] == '.py':
self.file_path = file_name self.file_path = file_name
write_config(self.file_path, self) write_config(self.file_path, self)
self.emit_save_status_changed(True) self.emit_save_status_changed(True)

View File

@@ -228,7 +228,7 @@ def get_widget(datatype, readonly=False, parent=None):
TupleOf: TupleWidget, TupleOf: TupleWidget,
StructOf: StructWidget, StructOf: StructWidget,
ArrayOf: ArrayWidget, ArrayOf: ArrayWidget,
}.get(datatype.__class__)(datatype, readonly, parent) }.get(datatype.__class__, TextWidget)(datatype, readonly, parent)
# TODO: handle NoneOr # TODO: handle NoneOr

View File

@@ -21,9 +21,9 @@
# ***************************************************************************** # *****************************************************************************
"""Define helpers""" """Define helpers"""
import re
import importlib import importlib
import linecache import linecache
import re
import socket import socket
import sys import sys
import threading import threading
@@ -295,50 +295,68 @@ def formatException(cut=0, exc_info=None, verbose=False):
return ''.join(res) return ''.join(res)
HOSTNAMEPAT = re.compile(r'[a-z0-9_.-]+$', re.IGNORECASE) # roughly checking for a valid hostname or ip address HOSTNAMEPART = re.compile(r'^(?!-)[a-z0-9-]{1,63}(?<!-)$', re.IGNORECASE)
def validate_hostname(host):
"""checks if the rules for valid hostnames are adhered to"""
if len(host) > 255:
return False
for part in host.split('.'):
if not HOSTNAMEPART.match(part):
return False
return True
def parseHostPort(host, defaultport): def validate_ipv4(addr):
"""Parse host[:port] string and tuples """check if v4 address is valid."""
try:
socket.inet_aton(addr)
except OSError:
return False
return True
Specify 'host[:port]' or a (host, port) tuple for the mandatory argument.
If the port specification is missing, the value of the defaultport is used.
raises TypeError in case host is neither a string nor an iterable def validate_ipv6(addr):
raises ValueError in other cases of invalid arguments """check if v6 address is valid."""
try:
socket.inet_pton(socket.AF_INET6, addr)
except OSError:
return False
return True
def parse_ipv6_host_and_port(addr, defaultport=10767):
""" Parses IPv6 addresses with optional port. See parse_host_port for valid formats"""
if ']' in addr:
host, port = addr.rsplit(':', 1)
return host[1:-1], int(port)
if '.' in addr:
host, port = addr.rsplit('.', 1)
return host, int(port)
return (host, defaultport)
def parse_host_port(host, defaultport=10767):
"""Parses hostnames and IP (4/6) addressses.
The accepted formats are:
- a standard hostname
- base IPv6 or 4 addresses
- 'hostname:port'
- IPv4 addresses in the form of 'IPv4:port'
- IPv6 addresses in the forms '[IPv6]:port' or 'IPv6.port'
""" """
if isinstance(host, str): colons = host.count(':')
host, sep, port = host.partition(':') if colons == 0: # hostname/ipv4 wihtout port
if sep: port = defaultport
port = int(port) elif colons == 1: # hostname or ipv4 with port
else: host, port = host.split(':')
port = defaultport port = int(port)
else: else: # ipv6
host, port = host host, port = parse_ipv6_host_and_port(host, defaultport)
if not HOSTNAMEPAT.match(host): if (validate_ipv4(host) or validate_hostname(host) or validate_ipv6(host)) \
raise ValueError(f'illegal host name {host!r}') and 0 < port < 65536:
if 0 < port < 65536: return (host, port)
return host, port raise ValueError(f'invalid host {host!r} or port {port}')
raise ValueError(f'illegal port number: {port!r}')
def tcpSocket(host, defaultport, timeout=None):
"""Helper for opening a TCP client socket to a remote server.
Specify 'host[:port]' or a (host, port) tuple for the mandatory argument.
If the port specification is missing, the value of the defaultport is used.
If timeout is set to a number, the timout of the connection is set to this
number, else the socket stays in blocking mode.
"""
host, port = parseHostPort(host, defaultport)
# open socket and set options
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
if timeout:
s.settimeout(timeout)
# connect
s.connect((host, int(port)))
return s
# keep a reference to socket to avoid (interpreter) shut-down problems # keep a reference to socket to avoid (interpreter) shut-down problems

View File

@@ -35,7 +35,7 @@ import time
import re import re
from frappy.errors import CommunicationFailedError, ConfigError from frappy.errors import CommunicationFailedError, ConfigError
from frappy.lib import closeSocket, parseHostPort, tcpSocket from frappy.lib import closeSocket, parse_host_port
try: try:
from serial import Serial from serial import Serial
@@ -60,7 +60,7 @@ class AsynConn:
if not iocls: if not iocls:
# try tcp, if scheme not given # try tcp, if scheme not given
try: try:
parseHostPort(uri, 1) # check hostname only parse_host_port(uri, 1) # check hostname only
except ValueError: except ValueError:
if 'COM' in uri: if 'COM' in uri:
raise ValueError("the correct uri for a COM port is: " raise ValueError("the correct uri for a COM port is: "
@@ -175,7 +175,9 @@ class AsynTcp(AsynConn):
if uri.startswith('tcp://'): if uri.startswith('tcp://'):
uri = uri[6:] uri = uri[6:]
try: try:
self.connection = tcpSocket(uri, self.default_settings.get('port'), self.timeout)
host, port = parse_host_port(uri, self.default_settings.get('port'))
self.connection = socket.create_connection((host, port), timeout=self.timeout)
except (ConnectionRefusedError, socket.gaierror, socket.timeout) as e: except (ConnectionRefusedError, socket.gaierror, socket.timeout) as e:
# indicate that retrying might make sense # indicate that retrying might make sense
raise CommunicationFailedError(str(e)) from None raise CommunicationFailedError(str(e)) from None

View File

@@ -34,7 +34,7 @@ from frappy.errors import BadValueError, CommunicationFailedError, ConfigError,
ProgrammingError, SECoPError, secop_error, RangeError ProgrammingError, SECoPError, secop_error, RangeError
from frappy.lib import formatException, mkthread, UniqueObject from frappy.lib import formatException, mkthread, UniqueObject
from frappy.lib.enum import Enum from frappy.lib.enum import Enum
from frappy.params import Accessible, Command, Parameter from frappy.params import Accessible, Command, Parameter, Limit
from frappy.properties import HasProperties, Property from frappy.properties import HasProperties, Property
from frappy.logging import RemoteLogHandler, HasComlog from frappy.logging import RemoteLogHandler, HasComlog
@@ -161,7 +161,7 @@ class HasAccessibles(HasProperties):
# find the base class, where the parameter <limname> is defined first. # find the base class, where the parameter <limname> is defined first.
# we have to check all bases, as they may not be treated yet when # we have to check all bases, as they may not be treated yet when
# not inheriting from HasAccessibles # not inheriting from HasAccessibles
base = next(b for b in reversed(base.__mro__) if limname in b.__dict__) base = next(b for b in reversed(cls.__mro__) if limname in b.__dict__)
if cname not in base.__dict__: if cname not in base.__dict__:
# there is no check method yet at this class # there is no check method yet at this class
# add check function to the class where the limit was defined # add check function to the class where the limit was defined
@@ -431,28 +431,19 @@ class Module(HasAccessibles):
self.valueCallbacks[pname] = [] self.valueCallbacks[pname] = []
self.errorCallbacks[pname] = [] self.errorCallbacks[pname] = []
if not pobj.hasDatatype(): if isinstance(pobj, Limit):
head, _, postfix = pname.rpartition('_') basepname = pname.rpartition('_')[0]
if postfix not in ('min', 'max', 'limits'): baseparam = self.parameters.get(basepname)
errors.append(f'{pname} needs a datatype')
continue
# when datatype is not given, properties are set automagically
pobj.setProperty('readonly', False)
baseparam = self.parameters.get(head)
if not baseparam: if not baseparam:
errors.append(f'parameter {pname!r} is given, but not {head!r}') errors.append(f'limit {pname!r} is given, but not {basepname!r}')
continue continue
dt = baseparam.datatype if baseparam.datatype is None:
if dt is None:
continue # an error will be reported on baseparam continue # an error will be reported on baseparam
if postfix == 'limits': pobj.set_datatype(baseparam.datatype)
pobj.setProperty('datatype', TupleOf(dt, dt))
pobj.setProperty('default', (dt.min, dt.max)) if not pobj.hasDatatype():
else: errors.append(f'{pname} needs a datatype')
pobj.setProperty('datatype', dt) continue
pobj.setProperty('default', getattr(dt, postfix))
if not pobj.description:
pobj.setProperty('description', f'limit for {pname}')
if pobj.value is None: if pobj.value is None:
if pobj.needscfg: if pobj.needscfg:
@@ -805,11 +796,11 @@ class Module(HasAccessibles):
raise ValueError('remote handler not found') raise ValueError('remote handler not found')
self.remoteLogHandler.set_conn_level(self, conn, level) self.remoteLogHandler.set_conn_level(self, conn, level)
def checkLimits(self, value, parametername='target'): def checkLimits(self, value, pname='target'):
"""check for limits """check for limits
:param value: the value to be checked for <parametername>_min <= value <= <parametername>_max :param value: the value to be checked for <pname>_min <= value <= <pname>_max
:param parametername: parameter name, default is 'target' :param pname: parameter name, default is 'target'
raises RangeError in case the value is not valid raises RangeError in case the value is not valid
@@ -818,14 +809,20 @@ class Module(HasAccessibles):
when no automatic super call is desired. when no automatic super call is desired.
""" """
try: try:
min_, max_ = getattr(self, parametername + '_limits') min_, max_ = getattr(self, pname + '_limits')
if not min_ <= value <= max_:
raise RangeError(f'{pname} outside {pname}_limits')
return
except AttributeError: except AttributeError:
min_ = getattr(self, parametername + '_min', float('-inf')) pass
max_ = getattr(self, parametername + '_max', float('inf')) min_ = getattr(self, pname + '_min', float('-inf'))
if not min_ <= value <= max_: max_ = getattr(self, pname + '_max', float('inf'))
if min_ > max_: if min_ > max_:
raise RangeError(f'invalid limits: [{min_:g}, {max_:g}]') raise RangeError(f'invalid limits: {pname}_min > {pname}_max')
raise RangeError(f'limits violation: {value:g} outside [{min_:g}, {max_:g}]') if value < min_:
raise RangeError(f'{pname} below {pname}_min')
if value > max_:
raise RangeError(f'{pname} above {pname}_max')
class Readable(Module): class Readable(Module):

View File

@@ -514,6 +514,35 @@ class Command(Accessible):
return result[:-1] + f', {self.func!r})' if self.func else result return result[:-1] + f', {self.func!r})' if self.func else result
class Limit(Parameter):
"""a special limit parameter"""
POSTFIXES = {'min', 'max', 'limits'} # allowed postfixes
def __set_name__(self, owner, name):
super().__set_name__(owner, name)
head, _, postfix = name.rpartition('_')
if postfix not in self.POSTFIXES:
raise ProgrammingError(f'Limit name must end with one of {self.POSTFIXES}')
if 'readonly' not in self.propertyValues:
self.readonly = False
if not self.description:
self.description = f'limit for {head}'
if self.export.startswith('_') and PREDEFINED_ACCESSIBLES.get(head):
self.export = self.export[1:]
def set_datatype(self, datatype):
if self.hasDatatype():
return # the programmer is responsible that a given datatype is correct
postfix = self.name.rpartition('_')[-1]
postfix = self.name.rpartition('_')[-1]
if postfix == 'limits':
self.datatype = TupleOf(datatype, datatype)
self.default = (datatype.min, datatype.max)
else: # min, max
self.datatype = datatype
self.default = getattr(datatype, postfix)
# list of predefined accessibles with their type # list of predefined accessibles with their type
PREDEFINED_ACCESSIBLES = { PREDEFINED_ACCESSIBLES = {
'value': Parameter, 'value': Parameter,

View File

@@ -57,7 +57,7 @@ import json
from frappy.lib import generalConfig from frappy.lib import generalConfig
from frappy.datatypes import EnumType from frappy.datatypes import EnumType
from frappy.params import Parameter, Property, Command from frappy.params import Parameter, Property, Command, Limit
from frappy.modules import Module from frappy.modules import Module
@@ -67,6 +67,10 @@ class PersistentParam(Parameter):
given = False given = False
class PersistentLimit(Limit, Parameter):
pass
class PersistentMixin(Module): class PersistentMixin(Module):
persistentData = None # dict containing persistent data after startup persistentData = None # dict containing persistent data after startup

View File

@@ -81,6 +81,7 @@ class Dispatcher:
def __init__(self, name, log, opts, srv): def __init__(self, name, log, opts, srv):
self.log = log self.log = log
self._modules = {} self._modules = {}
self.equipment_id = opts.pop('equipment_id', name)
def announce_update(self, modulename, pname, pobj): def announce_update(self, modulename, pname, pobj):
if pobj.readerror: if pobj.readerror:

View File

@@ -21,22 +21,22 @@
# ***************************************************************************** # *****************************************************************************
"""provides tcp interface to the SECoP Server""" """provides tcp interface to the SECoP Server"""
import errno
import os
import socket import socket
import socketserver import socketserver
import sys import sys
import threading import threading
import time import time
import errno
import os
from frappy.datatypes import BoolType, StringType from frappy.datatypes import BoolType, StringType
from frappy.errors import SECoPError from frappy.errors import SECoPError
from frappy.lib import formatException, \ from frappy.lib import formatException, formatExtendedStack, \
formatExtendedStack, formatExtendedTraceback formatExtendedTraceback
from frappy.properties import Property from frappy.properties import Property
from frappy.protocol.interface import decode_msg, encode_msg_frame, get_msg from frappy.protocol.interface import decode_msg, encode_msg_frame, get_msg
from frappy.protocol.messages import ERRORPREFIX, \ from frappy.protocol.messages import ERRORPREFIX, HELPREPLY, HELPREQUEST, \
HELPREPLY, HELPREQUEST, HelpMessage HelpMessage
DEF_PORT = 10767 DEF_PORT = 10767
MESSAGE_READ_SIZE = 1024 MESSAGE_READ_SIZE = 1024
@@ -57,7 +57,7 @@ class TCPRequestHandler(socketserver.BaseRequestHandler):
clientaddr = self.client_address clientaddr = self.client_address
serverobj = self.server serverobj = self.server
self.log.info("handling new connection from %s:%d" % clientaddr) self.log.info("handling new connection from %s", format_address(clientaddr))
data = b'' data = b''
# notify dispatcher of us # notify dispatcher of us
@@ -160,7 +160,7 @@ class TCPRequestHandler(socketserver.BaseRequestHandler):
def finish(self): def finish(self):
"""called when handle() terminates, i.e. the socket closed""" """called when handle() terminates, i.e. the socket closed"""
self.log.info('closing connection from %s:%d' % self.client_address) self.log.info('closing connection from %s', format_address(self.client_address))
# notify dispatcher # notify dispatcher
self.server.dispatcher.remove_connection(self) self.server.dispatcher.remove_connection(self)
# close socket # close socket
@@ -171,8 +171,28 @@ class TCPRequestHandler(socketserver.BaseRequestHandler):
finally: finally:
self.request.close() self.request.close()
class DualStackTCPServer(socketserver.ThreadingTCPServer):
"""Subclassed to provide IPv6 capabilities as socketserver only uses IPv4"""
def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True, enable_ipv6=False):
super().__init__(
server_address, RequestHandlerClass, bind_and_activate=False)
class TCPServer(socketserver.ThreadingTCPServer): # override default socket
if enable_ipv6:
self.address_family = socket.AF_INET6
self.socket = socket.socket(self.address_family,
self.socket_type)
self.socket.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0)
if bind_and_activate:
try:
self.server_bind()
self.server_activate()
except:
self.server_close()
raise
class TCPServer(DualStackTCPServer):
daemon_threads = True daemon_threads = True
# on windows, 'reuse_address' means that several servers might listen on # on windows, 'reuse_address' means that several servers might listen on
# the same port, on the other hand, a port is not blocked after closing # the same port, on the other hand, a port is not blocked after closing
@@ -191,13 +211,16 @@ class TCPServer(socketserver.ThreadingTCPServer):
self.name = name self.name = name
self.log = logger self.log = logger
port = int(options.pop('uri').split('://', 1)[-1]) port = int(options.pop('uri').split('://', 1)[-1])
enable_ipv6 = options.pop('ipv6', False)
self.detailed_errors = options.pop('detailed_errors', False) self.detailed_errors = options.pop('detailed_errors', False)
self.log.info("TCPServer %s binding to port %d", name, port) self.log.info("TCPServer %s binding to port %d", name, port)
for ntry in range(5): for ntry in range(5):
try: try:
socketserver.ThreadingTCPServer.__init__( DualStackTCPServer.__init__(
self, ('0.0.0.0', port), TCPRequestHandler, bind_and_activate=True) self, ('', port), TCPRequestHandler,
bind_and_activate=True, enable_ipv6=enable_ipv6
)
break break
except OSError as e: except OSError as e:
if e.args[0] == errno.EADDRINUSE: # address already in use if e.args[0] == errno.EADDRINUSE: # address already in use
@@ -217,3 +240,11 @@ class TCPServer(socketserver.ThreadingTCPServer):
def __exit__(self, *args): def __exit__(self, *args):
self.server_close() self.server_close()
def format_address(addr):
if len(addr) == 2:
return '%s:%d' % addr
address, port = addr[0:2]
if address.startswith('::ffff'):
return '%s:%d' % (address[7:], port)
return '[%s]:%d' % (address, port)

View File

@@ -125,6 +125,9 @@ class Server:
def unknown_options(self, cls, options): def unknown_options(self, cls, options):
return f"{cls.__name__} class don't know how to handle option(s): {', '.join(options)}" return f"{cls.__name__} class don't know how to handle option(s): {', '.join(options)}"
def restart_hook(self):
pass
def run(self): def run(self):
while self._restart: while self._restart:
self._restart = False self._restart = False

View File

@@ -460,6 +460,12 @@ class AnalogOutput(PyTangoDevice, Drivable):
_history = () _history = ()
_timeout = None _timeout = None
_moving = False _moving = False
__main_unit = None
def applyMainUnit(self, mainunit):
# called from __init__ method
# replacement of '$' by main unit must be done later
self.__main_unit = mainunit
def initModule(self): def initModule(self):
super().initModule() super().initModule()
@@ -469,12 +475,18 @@ class AnalogOutput(PyTangoDevice, Drivable):
def startModule(self, start_events): def startModule(self, start_events):
super().startModule(start_events) super().startModule(start_events)
# query unit from tango and update value property try:
attrInfo = self._dev.attribute_query('value') # query unit from tango and update value property
# prefer configured unit if nothing is set on the Tango device, else attrInfo = self._dev.attribute_query('value')
# update # prefer configured unit if nothing is set on the Tango device, else
if attrInfo.unit != 'No unit': # update
self.accessibles['value'].datatype.setProperty('unit', attrInfo.unit) if attrInfo.unit != 'No unit':
self.accessibles['value'].datatype.setProperty('unit', attrInfo.unit)
self.__main_unit = attrInfo.unit
except Exception as e:
self.log.error(e)
if self.__main_unit:
super().applyMainUnit(self.__main_unit)
def doPoll(self): def doPoll(self):
super().doPoll() super().doPoll()
@@ -543,7 +555,7 @@ class AnalogOutput(PyTangoDevice, Drivable):
return self.abslimits[1] return self.abslimits[1]
def __getusermin(self): def __getusermin(self):
return self.userlimits[0] return max(self.userlimits[0], self.abslimits[0])
def __setusermin(self, value): def __setusermin(self, value):
self.userlimits = (value, self.userlimits[1]) self.userlimits = (value, self.userlimits[1])
@@ -551,7 +563,7 @@ class AnalogOutput(PyTangoDevice, Drivable):
usermin = property(__getusermin, __setusermin) usermin = property(__getusermin, __setusermin)
def __getusermax(self): def __getusermax(self):
return self.userlimits[1] return min(self.userlimits[1], self.abslimits[1])
def __setusermax(self, value): def __setusermax(self, value):
self.userlimits = (self.userlimits[0], value) self.userlimits = (self.userlimits[0], value)

View File

@@ -31,7 +31,7 @@ def make_cvt_list(dt, tail=''):
tail is a postfix to be appended in case of tuples and structs tail is a postfix to be appended in case of tuples and structs
""" """
if isinstance(dt, (EnumType, IntRange, BoolType)): if isinstance(dt, (EnumType, IntRange, BoolType)):
return[(int, tail, {type: 'NUM'})] return[(int, tail, {'type': 'NUM'})]
if isinstance(dt, (FloatRange, ScaledInteger)): if isinstance(dt, (FloatRange, ScaledInteger)):
return [(dt.import_value, tail, return [(dt.import_value, tail,
{'type': 'NUM', 'unit': dt.unit, 'period': 5} if dt.unit else {})] {'type': 'NUM', 'unit': dt.unit, 'period': 5} if dt.unit else {})]

View File

@@ -22,11 +22,12 @@
"""driver for phytron motors""" """driver for phytron motors"""
import time
from frappy.core import Done, Command, EnumType, FloatRange, IntRange, \ from frappy.core import Done, Command, EnumType, FloatRange, IntRange, \
HasIO, Parameter, Property, Drivable, PersistentMixin, PersistentParam, \ HasIO, Parameter, Property, Drivable, PersistentMixin, PersistentParam, \
StringIO, StringType, TupleOf StringIO, StringType, IDLE, BUSY, ERROR, Limit
from frappy.errors import CommunicationFailedError, HardwareError, BadValueError from frappy.errors import CommunicationFailedError, HardwareError
from frappy.lib import clamp from frappy.features import HasOffset
class PhytronIO(StringIO): class PhytronIO(StringIO):
@@ -54,43 +55,35 @@ class PhytronIO(StringIO):
return reply[1:] return reply[1:]
class Motor(PersistentMixin, HasIO, Drivable): class Motor(HasOffset, PersistentMixin, HasIO, Drivable):
axis = Property('motor axis X or Y', StringType(), default='X') axis = Property('motor axis X or Y', StringType(), default='X')
address = Property('address', IntRange(0, 15), default=0) address = Property('address', IntRange(0, 15), default=0)
circumference = Property('cirumference for rotations or zero for linear', FloatRange(0), default=360)
encoder_mode = Parameter('how to treat the encoder', EnumType('encoder', NO=0, READ=1, CHECK=2), encoder_mode = Parameter('how to treat the encoder', EnumType('encoder', NO=0, READ=1, CHECK=2),
default=1, readonly=False) default=1, readonly=False)
value = Parameter('angle', FloatRange(unit='deg')) value = PersistentParam('angle', FloatRange(unit='deg'))
status = PersistentParam()
target = Parameter('target angle', FloatRange(unit='deg'), readonly=False) target = Parameter('target angle', FloatRange(unit='deg'), readonly=False)
speed = Parameter('', FloatRange(0, 20, unit='deg/s'), readonly=False) speed = Parameter('', FloatRange(0, 20, unit='deg/s'), readonly=False)
accel = Parameter('', FloatRange(2, 250, unit='deg/s/s'), readonly=False) accel = Parameter('', FloatRange(2, 250, unit='deg/s/s'), readonly=False)
encoder_tolerance = Parameter('', FloatRange(unit='deg'), readonly=False, default=0.01) encoder_tolerance = Parameter('', FloatRange(unit='deg'), readonly=False, default=0.01)
offset = PersistentParam('', FloatRange(unit='deg'), readonly=False, default=0)
sign = PersistentParam('', IntRange(-1,1), readonly=False, default=1) sign = PersistentParam('', IntRange(-1,1), readonly=False, default=1)
encoder = Parameter('encoder reading', FloatRange(unit='deg')) encoder = Parameter('encoder reading', FloatRange(unit='deg'))
backlash = Parameter("""backlash compensation\n backlash = Parameter("""backlash compensation\n
offset for always approaching from the same side""", offset for always approaching from the same side""",
FloatRange(unit='deg'), readonly=False, default=0) FloatRange(unit='deg'), readonly=False, default=0)
abslimits = Parameter('abs limits (raw values)', default=(0, 0), target_min = Limit()
datatype=TupleOf(FloatRange(unit='deg'), FloatRange(unit='deg'))) target_max = Limit()
userlimits = PersistentParam('user limits', readonly=False, default=(0, 0), initwrite=True, alive_time = PersistentParam('alive time for detecting restarts',
datatype=TupleOf(FloatRange(unit='deg'), FloatRange(unit='deg'))) FloatRange(), default=0) # export=False
ioClass = PhytronIO ioClass = PhytronIO
fast_poll = 0.1 fast_poll = 0.1
_backlash_pending = False _backlash_pending = False
_mismatch_count = 0 _mismatch_count = 0
_rawlimits = None
_step_size = None # degree / step _step_size = None # degree / step
_reset_needed = False
def earlyInit(self):
super().earlyInit()
if self.abslimits == (0, 0):
self.abslimits = -9e99, 9e99
if self.userlimits == (0, 0):
self._rawlimits = self.abslimits
self.read_userlimits()
self.loadParameters()
def get(self, cmd): def get(self, cmd):
return self.communicate('%x%s%s' % (self.address, self.axis, cmd)) return self.communicate('%x%s%s' % (self.address, self.axis, cmd))
@@ -111,9 +104,24 @@ class Motor(PersistentMixin, HasIO, Drivable):
self.set(cmd, value) self.set(cmd, value)
return self.get(query) return self.get(query)
def read_alive_time(self):
now = time.time()
axisbit = 1 << int(self.axis == 'Y')
active_axes = int(self.get('P37R')) # adr 37 is a custom address with no internal meaning
if not (axisbit & active_axes): # power cycle detected and this axis not yet active
self.set('P37S', axisbit | active_axes) # activate axis
if now < self.alive_time + 7 * 24 * 3600: # the device was running within last week
# inform the user about the loss of position by the need of doing reset_error
self._reset_needed = True
else: # do reset silently
self.reset_error()
self.alive_time = now
self.saveParameters()
return now
def read_value(self): def read_value(self):
prev_enc = self.encoder prev_enc = self.encoder
pos = float(self.get('P20R')) * self.sign - self.offset pos = float(self.get('P20R')) * self.sign
if self.encoder_mode != 'NO': if self.encoder_mode != 'NO':
enc = self.read_encoder() enc = self.read_encoder()
else: else:
@@ -122,23 +130,25 @@ class Motor(PersistentMixin, HasIO, Drivable):
status = status[0:4] if self.axis == 'X' else status[4:8] status = status[0:4] if self.axis == 'X' else status[4:8]
self.log.debug('run %s enc %s end %s', status[1], status[2], status[3]) self.log.debug('run %s enc %s end %s', status[1], status[2], status[3])
status = self.get('=H') status = self.get('=H')
if status == 'N': if status == 'N': # not at target
if self.encoder_mode == 'CHECK': if self.encoder_mode == 'CHECK':
e1, e2 = sorted((prev_enc, enc)) e1, e2 = sorted((prev_enc, enc))
if e1 - self.encoder_tolerance <= pos <= e2 + self.encoder_tolerance: if e1 - self.encoder_tolerance <= pos <= e2 + self.encoder_tolerance:
self.status = self.Status.BUSY, 'driving' self.status = BUSY, 'driving'
else: else:
self.log.error('encoder lag: %g not within %g..%g', self.log.error('encoder lag: %g not within %g..%g',
pos, e1, e2) pos, e1, e2)
self.get('S') # stop self.get('S') # stop
self.status = self.Status.ERROR, 'encoder lag error' self.status = ERROR, 'encoder lag error'
self.value = pos
self.saveParameters()
self.setFastPoll(False) self.setFastPoll(False)
else: else:
self.status = self.Status.BUSY, 'driving' self.status = BUSY, 'driving'
else: else:
if self._backlash_pending: if self._backlash_pending:
# drive to real target # drive to real target
self.set('A', self.sign * (self.target + self.offset)) self.set('A', self.sign * self.target)
self._backlash_pending = False self._backlash_pending = False
return pos return pos
if (self.encoder_mode == 'CHECK' and if (self.encoder_mode == 'CHECK' and
@@ -148,17 +158,19 @@ class Motor(PersistentMixin, HasIO, Drivable):
else: else:
self.log.error('encoder mismatch: abs(%g - %g) < %g', self.log.error('encoder mismatch: abs(%g - %g) < %g',
enc, pos, self.encoder_tolerance) enc, pos, self.encoder_tolerance)
self.status = self.Status.ERROR, 'encoder does not match pos' self.status = ERROR, 'encoder does not match pos'
else: else:
self._mismatch_count = 0 self._mismatch_count = 0
self.status = self.Status.IDLE, '' self.status = IDLE, ''
self.value = pos
self.saveParameters()
self.setFastPoll(False) self.setFastPoll(False)
return pos return pos
def read_encoder(self): def read_encoder(self):
if self.encoder_mode == 'NO': if self.encoder_mode == 'NO':
return self.value return self.value
return float(self.get('P22R')) * self.sign - self.offset return float(self.get('P22R')) * self.sign
def write_sign(self, value): def write_sign(self, value):
self.sign = value self.sign = value
@@ -187,68 +199,56 @@ class Motor(PersistentMixin, HasIO, Drivable):
self.get_step_size() self.get_step_size()
return float(self.set_get('P15S', round(value / self._step_size), 'P15R')) * self._step_size return float(self.set_get('P15S', round(value / self._step_size), 'P15R')) * self._step_size
def _check_limits(self, *values): def check_target(self, value):
for name, (mn, mx) in ('user', self._rawlimits), ('abs', self.abslimits): self.checkLimits(value)
mn -= self.offset self.checkLimits(value + self.backlash)
mx -= self.offset
for v in values:
if not mn <= v <= mx:
raise BadValueError('%s limits violation: %g <= %g <= %g' % (name, mn, v, mx))
v += self.offset
def write_target(self, value): def write_target(self, value):
if self.status[0] == self.Status.ERROR: self.read_alive_time()
if self._reset_needed:
self.status = ERROR, 'reset needed after power up (probably position lost)'
raise HardwareError(self.status[1])
if self.status[0] == ERROR:
raise HardwareError('need reset') raise HardwareError('need reset')
self.status = self.Status.BUSY, 'changed target' self.status = BUSY, 'changed target'
self._check_limits(value, value + self.backlash) self.saveParameters()
if self.backlash: if self.backlash:
# drive first to target + backlash # drive first to target + backlash
# we do not optimize when already driving from the right side # we do not optimize when already driving from the right side
self._backlash_pending = True self._backlash_pending = True
self.set('A', self.sign * (value + self.offset + self.backlash)) self.set('A', self.sign * (value + self.backlash))
else: else:
self.set('A', self.sign * (value + self.offset)) self.set('A', self.sign * value)
self.setFastPoll(True, self.fast_poll) self.setFastPoll(True, self.fast_poll)
return value return value
def read_userlimits(self):
return self._rawlimits[0] - self.offset, self._rawlimits[1] - self.offset
def write_userlimits(self, value):
self._rawlimits = [clamp(self.abslimits[0], v + self.offset, self.abslimits[1]) for v in value]
value = self.read_userlimits()
self.saveParameters()
return value
def write_offset(self, value):
self.offset = value
self.read_userlimits()
self.saveParameters()
return Done
def stop(self): def stop(self):
self.get('S') self.get('S')
@Command @Command
def reset(self): def reset_error(self):
"""Reset error, set position to encoder""" """Reset error, set position to encoder"""
self.read_value() self.read_value()
if self.status[0] == self.Status.ERROR: if self.status[0] == ERROR or self._reset_needed:
enc = self.encoder + self.offset newenc = enc = self.encoder
pos = self.value + self.offset pos = self.value
if abs(enc - pos) > self.encoder_tolerance: if abs(enc - pos) > self.encoder_tolerance or self.encoder_mode == 'NO':
if enc < 0: if self.circumference:
# assume we have a rotation (not a linear motor) # bring encoder value either within or as close as possible to the given range
while enc < 0: if enc < self.target_min:
self.offset += 360 mid = self.target_min + 0.5 * min(self.target_max - self.target_min, self.circumference)
enc += 360 elif enc > self.target_max:
self.set('P22S', enc * self.sign) mid = self.target_max - 0.5 * min(self.target_max - self.target_min, self.circumference)
self.saveParameters() else:
self.set('P20S', enc * self.sign) # set pos to encoder mid = enc
newenc += round((mid - enc) / self.circumference) * self.circumference
if newenc != enc:
self.set('P22S', newenc * self.sign)
if newenc != pos:
self.set('P20S', newenc * self.sign) # set pos to encoder
self.read_value() self.read_value()
# self.status = self.Status.IDLE, '' self._reset_needed = False
# TODO: # TODO:
# '=E' electronics status # '=E' electronics status
# '=I+' / '=I-': limit switches # '=I+' / '=I-': limit switches
# use P37 to determine if restarted

View File

@@ -21,21 +21,28 @@
# ***************************************************************************** # *****************************************************************************
import pytest import pytest
from frappy.lib import parseHostPort
from frappy.lib import parse_host_port
@pytest.mark.parametrize('hostport, defaultport, result', [ @pytest.mark.parametrize('hostport, defaultport, result', [
(('box.psi.ch', 9999), 1, ('box.psi.ch', 9999)), ('box.psi.ch:9999', 1, ('box.psi.ch', 9999)),
(('/dev/tty', 9999), 1, None), ('/dev/tty:9999', 1, None),
('localhost:10767', 1, ('localhost', 10767)), ('localhost:10767', 1, ('localhost', 10767)),
('www.psi.ch', 80, ('www.psi.ch', 80)), ('www.psi.ch', 80, ('www.psi.ch', 80)),
('/dev/ttyx:2089', 10767, None), ('/dev/ttyx:2089', 10767, None),
('COM4:', 2089, None), ('COM4:', 2089, None),
('underscore_valid.123.hyphen-valid.com', 80, ('underscore_valid.123.hyphen-valid.com', 80)), ('123.hyphen-valid.com', 80, ('123.hyphen-valid.com', 80)),
('underscore_invalid.123.hyphen-valid.com:10000', 80, None),
('::1.1111', 2, ('::1', 1111)),
('[2e::fe]:1', 50, ('2e::fe', 1)),
('127.0.0.1:50', 1337, ('127.0.0.1', 50)),
('234.40.128.3:13212', 1337, ('234.40.128.3', 13212)),
]) ])
def test_parse_host(hostport, defaultport, result): def test_parse_host(hostport, defaultport, result):
if result is None: if result is None:
with pytest.raises(ValueError): with pytest.raises(ValueError):
parseHostPort(hostport, defaultport) parse_host_port(hostport, defaultport)
else: else:
assert result == parseHostPort(hostport, defaultport) assert result == parse_host_port(hostport, defaultport)

View File

@@ -29,7 +29,7 @@ import pytest
from frappy.datatypes import BoolType, FloatRange, StringType, IntRange, ScaledInteger from frappy.datatypes import BoolType, FloatRange, StringType, IntRange, ScaledInteger
from frappy.errors import ProgrammingError, ConfigError, RangeError from frappy.errors import ProgrammingError, ConfigError, RangeError
from frappy.modules import Communicator, Drivable, Readable, Module from frappy.modules import Communicator, Drivable, Readable, Module
from frappy.params import Command, Parameter from frappy.params import Command, Parameter, Limit
from frappy.rwhandler import ReadHandler, WriteHandler, nopoll from frappy.rwhandler import ReadHandler, WriteHandler, nopoll
from frappy.lib import generalConfig from frappy.lib import generalConfig
@@ -795,17 +795,17 @@ stdlim = {
class Lim(Module): class Lim(Module):
a = Parameter('', FloatRange(-10, 10), readonly=False, default=0) a = Parameter('', FloatRange(-10, 10), readonly=False, default=0)
a_min = Parameter() a_min = Limit()
a_max = Parameter() a_max = Limit()
b = Parameter('', FloatRange(0, None), readonly=False, default=0) b = Parameter('', FloatRange(0, None), readonly=False, default=0)
b_min = Parameter() b_min = Limit()
c = Parameter('', IntRange(None, 100), readonly=False, default=0) c = Parameter('', IntRange(None, 100), readonly=False, default=0)
c_max = Parameter() c_max = Limit()
d = Parameter('', FloatRange(-5, 5), readonly=False, default=0) d = Parameter('', FloatRange(-5, 5), readonly=False, default=0)
d_limits = Parameter() d_limits = Limit()
e = Parameter('', IntRange(0, 8), readonly=False, default=0) e = Parameter('', IntRange(0, 8), readonly=False, default=0)
@@ -872,8 +872,8 @@ def test_limit_inheritance():
raise ValueError('value is not a multiple of 0.25') raise ValueError('value is not a multiple of 0.25')
class Mixin: class Mixin:
a_min = Parameter() a_min = Limit()
a_max = Parameter() a_max = Limit()
class Mod(Mixin, Base): class Mod(Mixin, Base):
def check_a(self, value): def check_a(self, value):