remove obsolete code
- basic_validators is not needed any more since the implementation of datatypes.Stub - client/baseclient.y is replaced by client/__init__.py both for the gui client and NICOS SECoP client - lib/parsing.py used by baseclient only Change-Id: I15b6ac880017000e155b8f6b7e2456e1bbf56dab Reviewed-on: https://forge.frm2.tum.de/review/c/sine2020/secop/playground/+/25058 Tested-by: Jenkins Automated Tests <pedersen+jenkins@frm2.tum.de> Reviewed-by: Enrico Faulhaber <enrico.faulhaber@frm2.tum.de> Reviewed-by: Markus Zolliker <markus.zolliker@psi.ch>
This commit is contained in:
parent
d6cf1f7629
commit
0c23ee46a1
@ -1,76 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
# pylint: disable=invalid-name
|
||||
# -*- coding: utf-8 -*-
|
||||
# *****************************************************************************
|
||||
#
|
||||
# This program is free software; you can redistribute it and/or modify it under
|
||||
# the terms of the GNU General Public License as published by the Free Software
|
||||
# Foundation; either version 2 of the License, or (at your option) any later
|
||||
# version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
|
||||
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
|
||||
# details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License along with
|
||||
# this program; if not, write to the Free Software Foundation, Inc.,
|
||||
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
|
||||
#
|
||||
# Module authors:
|
||||
# Enrico Faulhaber <enrico.faulhaber@frm2.tum.de>
|
||||
#
|
||||
# *****************************************************************************
|
||||
|
||||
import sys
|
||||
import argparse
|
||||
from os import path
|
||||
|
||||
# Path magic to make python find our stuff.
|
||||
# also remember our basepath (for etc, pid lookup, etc)
|
||||
basepath = path.abspath(path.join(sys.path[0], '..'))
|
||||
etc_path = path.join(basepath, 'etc')
|
||||
pid_path = path.join(basepath, 'pid')
|
||||
log_path = path.join(basepath, 'log')
|
||||
# sys.path[0] = path.join(basepath, 'src')
|
||||
sys.path[0] = basepath
|
||||
|
||||
# do not move above!
|
||||
import mlzlog
|
||||
from secop.client.console import ClientConsole
|
||||
|
||||
|
||||
def parseArgv(argv):
|
||||
parser = argparse.ArgumentParser(description="Connect to a SECoP server")
|
||||
loggroup = parser.add_mutually_exclusive_group()
|
||||
loggroup.add_argument("-v", "--verbose",
|
||||
help="Output lots of diagnostic information",
|
||||
action='store_true', default=False)
|
||||
loggroup.add_argument("-q", "--quiet", help="suppress non-error messages",
|
||||
action='store_true', default=False)
|
||||
parser.add_argument("name",
|
||||
type=str,
|
||||
help="Name of the instance.\n"
|
||||
" Uses etc/name.cfg for configuration\n",)
|
||||
return parser.parse_args()
|
||||
|
||||
|
||||
def main(argv=None):
|
||||
if argv is None:
|
||||
argv = sys.argv
|
||||
|
||||
args = parseArgv(argv[1:])
|
||||
|
||||
loglevel = 'debug' if args.verbose else ('error' if args.quiet else 'info')
|
||||
mlzlog.initLogging('console', loglevel, log_path)
|
||||
|
||||
console = ClientConsole(args.name, basepath)
|
||||
|
||||
try:
|
||||
console.run()
|
||||
except KeyboardInterrupt:
|
||||
console.close()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
sys.exit(main(sys.argv))
|
@ -1,406 +0,0 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# *****************************************************************************
|
||||
#
|
||||
# This program is free software; you can redistribute it and/or modify it under
|
||||
# the terms of the GNU General Public License as published by the Free Software
|
||||
# Foundation; either version 2 of the License, or (at your option) any later
|
||||
# version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
|
||||
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
|
||||
# details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License along with
|
||||
# this program; if not, write to the Free Software Foundation, Inc.,
|
||||
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
|
||||
#
|
||||
# Module authors:
|
||||
# Enrico Faulhaber <enrico.faulhaber@frm2.tum.de>
|
||||
#
|
||||
# *****************************************************************************
|
||||
"""Define parsing helpers"""
|
||||
|
||||
# TODO: remove, as currently not used
|
||||
|
||||
import re
|
||||
import time
|
||||
from datetime import datetime, timedelta, tzinfo
|
||||
|
||||
# format_time and parse_time could be simplified with external dateutil lib
|
||||
# http://stackoverflow.com/a/15228038
|
||||
|
||||
# based on http://stackoverflow.com/a/39418771
|
||||
|
||||
|
||||
class LocalTimezone(tzinfo):
|
||||
ZERO = timedelta(0)
|
||||
STDOFFSET = timedelta(seconds=-time.timezone)
|
||||
if time.daylight:
|
||||
DSTOFFSET = timedelta(seconds=-time.altzone)
|
||||
else:
|
||||
DSTOFFSET = STDOFFSET
|
||||
|
||||
DSTDIFF = DSTOFFSET - STDOFFSET
|
||||
|
||||
def utcoffset(self, dt):
|
||||
if self._isdst(dt):
|
||||
return self.DSTOFFSET
|
||||
return self.STDOFFSET
|
||||
|
||||
def dst(self, dt):
|
||||
if self._isdst(dt):
|
||||
return self.DSTDIFF
|
||||
return self.ZERO
|
||||
|
||||
def tzname(self, dt):
|
||||
return time.tzname[self._isdst(dt)]
|
||||
|
||||
def _isdst(self, dt):
|
||||
tt = (dt.year, dt.month, dt.day, dt.hour, dt.minute, dt.second,
|
||||
dt.weekday(), 0, 0)
|
||||
stamp = time.mktime(tt)
|
||||
tt = time.localtime(stamp)
|
||||
return tt.tm_isdst > 0
|
||||
|
||||
|
||||
LocalTimezone = LocalTimezone()
|
||||
|
||||
|
||||
def format_time(timestamp=None):
|
||||
# get time in UTC
|
||||
if timestamp is None:
|
||||
d = datetime.now(LocalTimezone)
|
||||
else:
|
||||
d = datetime.fromtimestamp(timestamp, LocalTimezone)
|
||||
return d.isoformat("T")
|
||||
|
||||
# Solution based on
|
||||
# https://bugs.python.org/review/15873/diff/16581/Lib/datetime.py#newcode1418Lib/datetime.py:1418
|
||||
|
||||
|
||||
class Timezone(tzinfo):
|
||||
|
||||
def __init__(self, offset, name='unknown timezone'): # pylint: disable=W0231
|
||||
self.offset = offset
|
||||
self.name = name
|
||||
|
||||
def tzname(self, dt):
|
||||
return self.name
|
||||
|
||||
def utcoffset(self, dt):
|
||||
return self.offset
|
||||
|
||||
def dst(self, dt):
|
||||
return timedelta(0)
|
||||
|
||||
|
||||
datetime_re = re.compile(
|
||||
r'(?P<year>\d{4})-(?P<month>\d{1,2})-(?P<day>\d{1,2})'
|
||||
r'[T ](?P<hour>\d{1,2}):(?P<minute>\d{1,2})'
|
||||
r'(?::(?P<second>\d{1,2})(?:\.(?P<microsecond>\d{1,6})\d*)?)?'
|
||||
r'(?P<tzinfo>Z|[+-]\d{2}(?::?\d{2})?)?$')
|
||||
|
||||
|
||||
def _parse_isostring(isostring):
|
||||
"""Parses a string and return a datetime.datetime.
|
||||
This function supports time zone offsets. When the input contains one,
|
||||
the output uses a timezone with a fixed offset from UTC.
|
||||
"""
|
||||
match = datetime_re.match(isostring)
|
||||
if match:
|
||||
kw = match.groupdict()
|
||||
if kw['microsecond']:
|
||||
kw['microsecond'] = kw['microsecond'].ljust(6, '0')
|
||||
_tzinfo = kw.pop('tzinfo')
|
||||
if _tzinfo == 'Z':
|
||||
_tzinfo = timezone.utc # pylint: disable=E0602
|
||||
elif _tzinfo is not None:
|
||||
offset_mins = int(_tzinfo[-2:]) if len(_tzinfo) > 3 else 0
|
||||
offset_hours = int(_tzinfo[1:3])
|
||||
offset = timedelta(hours=offset_hours, minutes=offset_mins)
|
||||
if _tzinfo[0] == '-':
|
||||
offset = -offset
|
||||
_tzinfo = Timezone(offset)
|
||||
kw = {k: int(v) for k, v in kw.items() if v is not None}
|
||||
kw['tzinfo'] = _tzinfo
|
||||
return datetime(**kw)
|
||||
raise ValueError("%s is not a valid ISO8601 string I can parse!" %
|
||||
isostring)
|
||||
|
||||
|
||||
def parse_time(isostring):
|
||||
try:
|
||||
return float(isostring)
|
||||
except ValueError:
|
||||
dt = _parse_isostring(isostring)
|
||||
return time.mktime(dt.timetuple()) + dt.microsecond * 1e-6
|
||||
|
||||
# possibly unusable stuff below!
|
||||
|
||||
|
||||
def format_args(args):
|
||||
if isinstance(args, list):
|
||||
return ','.join(format_args(arg) for arg in args).join('[]')
|
||||
if isinstance(args, tuple):
|
||||
return ','.join(format_args(arg) for arg in args).join('()')
|
||||
if isinstance(args, str):
|
||||
# XXX: check for 'easy' strings only and omit the ''
|
||||
return repr(args)
|
||||
return repr(args) # for floats/ints/...
|
||||
|
||||
|
||||
class ArgsParser:
|
||||
"""returns a pythonic object from the input expression
|
||||
|
||||
grammar:
|
||||
expr = number | string | array_expr | record_expr
|
||||
number = int | float
|
||||
string = '"' (chars - '"')* '"' | "'" (chars - "'")* "'"
|
||||
array_expr = '[' (expr ',')* expr ']'
|
||||
record_expr = '(' (name '=' expr ',')* ')'
|
||||
int = '-' pos_int | pos_int
|
||||
pos_int = [0..9]+
|
||||
float = int '.' pos_int ( [eE] int )?
|
||||
name = [A-Za-z_] [A-Za-z0-9_]*
|
||||
"""
|
||||
|
||||
DIGITS_CHARS = '0123456789'
|
||||
NAME_CHARS = '_ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz'
|
||||
NAME_CHARS2 = NAME_CHARS + DIGITS_CHARS
|
||||
|
||||
def __init__(self, string=''):
|
||||
self.string = string
|
||||
self.idx = 0
|
||||
self.length = len(string)
|
||||
|
||||
def setstring(self, string):
|
||||
self.string = string
|
||||
self.idx = 0
|
||||
self.length = len(string)
|
||||
self.skip()
|
||||
|
||||
def peek(self):
|
||||
if self.idx >= self.length:
|
||||
return None
|
||||
return self.string[self.idx]
|
||||
|
||||
def get(self):
|
||||
res = self.peek()
|
||||
self.idx += 1
|
||||
return res
|
||||
|
||||
def skip(self):
|
||||
"""skips whitespace"""
|
||||
while self.peek() in ('\t', ' '):
|
||||
self.get()
|
||||
|
||||
def match(self, what):
|
||||
if self.peek() != what:
|
||||
return False
|
||||
self.get()
|
||||
self.skip()
|
||||
return True
|
||||
|
||||
def parse(self, arg=None):
|
||||
"""parses given or constructed_with string"""
|
||||
self.setstring(arg or self.string)
|
||||
res = []
|
||||
while self.idx < self.length:
|
||||
res.append(self.parse_exp())
|
||||
self.match(',')
|
||||
if len(res) > 1:
|
||||
return tuple(*res)
|
||||
return res[0]
|
||||
|
||||
def parse_exp(self):
|
||||
"""expr = array_expr | record_expr | string | number"""
|
||||
idx = self.idx
|
||||
res = self.parse_array()
|
||||
if res:
|
||||
return res
|
||||
self.idx = idx
|
||||
res = self.parse_record()
|
||||
if res:
|
||||
return res
|
||||
self.idx = idx
|
||||
res = self.parse_string()
|
||||
if res:
|
||||
return res
|
||||
self.idx = idx
|
||||
return self.parse_number()
|
||||
|
||||
def parse_number(self):
|
||||
"""number = float | int """
|
||||
idx = self.idx
|
||||
number = self.parse_float()
|
||||
if number is not None:
|
||||
return number
|
||||
self.idx = idx # rewind
|
||||
return self.parse_int()
|
||||
|
||||
def parse_string(self):
|
||||
"""string = '"' (chars - '"')* '"' | "'" (chars - "'")* "'" """
|
||||
delim = self.peek()
|
||||
if delim in ('"', "'"):
|
||||
lastchar = self.get()
|
||||
string = []
|
||||
while self.peek() != delim or lastchar == '\\':
|
||||
lastchar = self.peek()
|
||||
string.append(self.get())
|
||||
self.get()
|
||||
self.skip()
|
||||
return ''.join(string)
|
||||
return self.parse_name()
|
||||
|
||||
def parse_array(self):
|
||||
"""array_expr = '[' (expr ',')* expr ']' """
|
||||
if self.get() != '[':
|
||||
return None
|
||||
self.skip()
|
||||
res = []
|
||||
while self.peek() != ']':
|
||||
el = self.parse_exp()
|
||||
if el is None:
|
||||
return el
|
||||
res.append(el)
|
||||
if self.match(']'):
|
||||
return res
|
||||
if self.get() != ',':
|
||||
return None
|
||||
self.skip()
|
||||
self.get()
|
||||
self.skip()
|
||||
return res
|
||||
|
||||
def parse_record(self):
|
||||
"""record_expr = '(' (name '=' expr ',')* ')' """
|
||||
if self.get() != '(':
|
||||
return None
|
||||
self.skip()
|
||||
res = {}
|
||||
while self.peek() != ')':
|
||||
name = self.parse_name()
|
||||
if self.get() != '=':
|
||||
return None
|
||||
self.skip()
|
||||
value = self.parse_exp()
|
||||
res[name] = value
|
||||
if self.peek() == ')':
|
||||
self.get()
|
||||
self.skip()
|
||||
return res
|
||||
if self.get() != ',':
|
||||
return None
|
||||
self.skip()
|
||||
self.get()
|
||||
self.skip()
|
||||
return res
|
||||
|
||||
def parse_int(self):
|
||||
"""int = '-' pos_int | pos_int"""
|
||||
if self.peek() == '-':
|
||||
self.get()
|
||||
number = self.parse_pos_int()
|
||||
if number is not None:
|
||||
return -number # pylint: disable=invalid-unary-operand-type
|
||||
return None
|
||||
return self.parse_pos_int()
|
||||
|
||||
def parse_pos_int(self):
|
||||
"""pos_int = [0..9]+"""
|
||||
number = 0
|
||||
if self.peek() not in self.DIGITS_CHARS:
|
||||
return None
|
||||
while self.peek() in self.DIGITS_CHARS:
|
||||
number = number * 10 + int(self.get())
|
||||
self.skip()
|
||||
return number
|
||||
|
||||
def parse_float(self):
|
||||
"""float = int '.' pos_int ( [eE] int )?"""
|
||||
number = self.parse_int()
|
||||
if self.get() != '.':
|
||||
return None
|
||||
idx = self.idx
|
||||
fraction = self.parse_pos_int()
|
||||
while idx < self.idx:
|
||||
fraction /= 10.
|
||||
idx += 1
|
||||
if number >= 0:
|
||||
number = number + fraction
|
||||
else:
|
||||
number = number - fraction
|
||||
exponent = 0
|
||||
if self.peek() in ('e', 'E'):
|
||||
self.get()
|
||||
exponent = self.parse_int()
|
||||
if exponent is None:
|
||||
return exponent
|
||||
while exponent > 0:
|
||||
number *= 10.
|
||||
exponent -= 1
|
||||
while exponent < 0:
|
||||
number /= 10.
|
||||
exponent += 1
|
||||
self.skip()
|
||||
return number
|
||||
|
||||
def parse_name(self):
|
||||
"""name = [A-Za-z_] [A-Za-z0-9_]*"""
|
||||
name = []
|
||||
if self.peek() in self.NAME_CHARS:
|
||||
name.append(self.get())
|
||||
while self.peek() in self.NAME_CHARS2:
|
||||
name.append(self.get())
|
||||
self.skip()
|
||||
return ''.join(name)
|
||||
return None
|
||||
|
||||
|
||||
def parse_args(s):
|
||||
# QnD Hack! try to parse lists/tuples/ints/floats, ignore dicts, specials
|
||||
# XXX: replace by proper parsing. use ast?
|
||||
s = s.strip()
|
||||
if s.startswith('[') and s.endswith(']'):
|
||||
# evaluate inner
|
||||
return [parse_args(part) for part in s[1:-1].split(',')]
|
||||
if s.startswith('(') and s.endswith(')'):
|
||||
# evaluate inner
|
||||
return tuple(parse_args(part) for part in s[1:-1].split(','))
|
||||
if s.startswith('"') and s.endswith('"'):
|
||||
# evaluate inner
|
||||
return s[1:-1]
|
||||
if s.startswith("'") and s.endswith("'"):
|
||||
# evaluate inner
|
||||
return s[1:-1]
|
||||
if '.' in s:
|
||||
return float(s)
|
||||
return int(s)
|
||||
|
||||
|
||||
__ALL__ = ['format_time', 'parse_time', 'parse_args']
|
||||
|
||||
# if __name__ == '__main__':
|
||||
# print "minimal testing: lib/parsing:"
|
||||
# print "time_formatting:",
|
||||
# t = time.time()
|
||||
# s = format_time(t)
|
||||
# assert (abs(t - parse_time(s)) < 1e-6)
|
||||
# print "OK"#
|
||||
#
|
||||
# print "ArgsParser:"
|
||||
# a = ArgsParser()
|
||||
# print a.parse('[ "\'\\\"A" , "<>\'", \'",C\', [1.23e1, 123.0e-001] , ]')
|
||||
|
||||
# #import pdb
|
||||
# #pdb.run('print a.parse()', globals(), locals())
|
||||
|
||||
# print "args_formatting:",
|
||||
# for obj in [1, 2.3, 'X', (1, 2, 3), [1, (3, 4), 'X,y']]:
|
||||
# s = format_args(obj)
|
||||
# p = a.parse(s)
|
||||
# print p,
|
||||
# assert (parse_args(format_args(obj)) == obj)
|
||||
# print "OK"
|
||||
# print "OK"
|
@ -1,84 +0,0 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# *****************************************************************************
|
||||
#
|
||||
# This program is free software; you can redistribute it and/or modify it under
|
||||
# the terms of the GNU General Public License as published by the Free Software
|
||||
# Foundation; either version 2 of the License, or (at your option) any later
|
||||
# version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
|
||||
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
|
||||
# details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License along with
|
||||
# this program; if not, write to the Free Software Foundation, Inc.,
|
||||
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
|
||||
#
|
||||
# Module authors:
|
||||
# Enrico Faulhaber <enrico.faulhaber@frm2.tum.de>
|
||||
#
|
||||
# *****************************************************************************
|
||||
"""test basic validators."""
|
||||
|
||||
# no fixtures needed
|
||||
import pytest
|
||||
|
||||
from secop.basic_validators import BoolProperty, EnumProperty, FloatProperty, \
|
||||
FmtStrProperty, IntProperty, NoneOr, NonNegativeFloatProperty, \
|
||||
NonNegativeIntProperty, OneOfProperty, PositiveFloatProperty, \
|
||||
PositiveIntProperty, StringProperty, TupleProperty, UnitProperty
|
||||
|
||||
|
||||
class unprintable:
|
||||
def __str__(self):
|
||||
raise NotImplementedError
|
||||
|
||||
@pytest.mark.parametrize('validators_args', [
|
||||
[FloatProperty, [None, 'a'], [1, 1.23, '1.23', '9e-12']],
|
||||
[PositiveFloatProperty, ['x', -9, '-9', 0], [1, 1.23, '1.23', '9e-12']],
|
||||
[NonNegativeFloatProperty, ['x', -9, '-9'], [0, 1.23, '1.23', '9e-12']],
|
||||
[IntProperty, [None, 'a', 1.2, '1.2'], [1, '-1']],
|
||||
[PositiveIntProperty, ['x', 1.9, '-9', '1e-4'], [1, '1']],
|
||||
[NonNegativeIntProperty, ['x', 1.9, '-9', '1e-6'], [0, '1']],
|
||||
[BoolProperty, ['x', 3], ['on', 'off', True, False]],
|
||||
[StringProperty, [unprintable()], ['1', 1.2, [{}]]],
|
||||
[UnitProperty, [unprintable(), '3', 9], ['mm', 'Gbarn', 'acre']],
|
||||
[FmtStrProperty, [1, None, 'a', '%f'], ['%.0e', '%.3f','%.1g']],
|
||||
])
|
||||
def test_validators(validators_args):
|
||||
v, fails, oks = validators_args
|
||||
for value in fails:
|
||||
with pytest.raises(Exception):
|
||||
v(value)
|
||||
for value in oks:
|
||||
v(value)
|
||||
|
||||
|
||||
@pytest.mark.parametrize('checker_inits', [
|
||||
[OneOfProperty, lambda: OneOfProperty(a=3),], # pylint: disable=unexpected-keyword-arg
|
||||
[NoneOr, lambda: NoneOr(None),],
|
||||
[EnumProperty, lambda: EnumProperty(1),], # pylint: disable=too-many-function-args
|
||||
[TupleProperty, lambda: TupleProperty(1,2,3),],
|
||||
])
|
||||
def test_checker_fails(checker_inits):
|
||||
empty, badargs = checker_inits
|
||||
with pytest.raises(Exception):
|
||||
empty()
|
||||
with pytest.raises(Exception):
|
||||
badargs()
|
||||
|
||||
|
||||
@pytest.mark.parametrize('checker_args', [
|
||||
[OneOfProperty(1,2,3), ['x', None, 4], [1, 2, 3]],
|
||||
[NoneOr(IntProperty), ['a', 1.2, '1.2'], [None, 1, '-1', '999999999999999']],
|
||||
[EnumProperty(a=1, b=2), ['x', None, 3], ['a', 'b', 1, 2]],
|
||||
[TupleProperty(IntProperty, StringProperty), [1, 'a', ('x', 2)], [(1,'x')]],
|
||||
])
|
||||
def test_checkers(checker_args):
|
||||
v, fails, oks = checker_args
|
||||
for value in fails:
|
||||
with pytest.raises(Exception):
|
||||
v(value)
|
||||
for value in oks:
|
||||
v(value)
|
@ -1,85 +0,0 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# *****************************************************************************
|
||||
#
|
||||
# This program is free software; you can redistribute it and/or modify it under
|
||||
# the terms of the GNU General Public License as published by the Free Software
|
||||
# Foundation; either version 2 of the License, or (at your option) any later
|
||||
# version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
|
||||
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
|
||||
# details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License along with
|
||||
# this program; if not, write to the Free Software Foundation, Inc.,
|
||||
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
|
||||
#
|
||||
# Module authors:
|
||||
# Enrico Faulhaber <enrico.faulhaber@frm2.tum.de>
|
||||
#
|
||||
# *****************************************************************************
|
||||
"""test base client."""
|
||||
|
||||
from collections import OrderedDict
|
||||
|
||||
import pytest
|
||||
|
||||
from secop.client.baseclient import Client
|
||||
|
||||
# define Test-only connection object
|
||||
|
||||
|
||||
class TestConnect:
|
||||
callbacks = []
|
||||
|
||||
def writeline(self, line):
|
||||
pass
|
||||
|
||||
def readline(self):
|
||||
return ''
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def clientobj(request):
|
||||
print (" SETUP ClientObj")
|
||||
testconnect = TestConnect()
|
||||
yield Client(dict(testing=testconnect), autoconnect=False)
|
||||
for cb, arg in testconnect.callbacks:
|
||||
cb(arg)
|
||||
print (" TEARDOWN ClientObj")
|
||||
|
||||
|
||||
# pylint: disable=redefined-outer-name
|
||||
def test_describing_data_decode(clientobj):
|
||||
assert {'modules': OrderedDict(), 'properties': {}
|
||||
} == clientobj._decode_substruct(['modules'], {})
|
||||
describing_data = {'equipment_id': 'eid',
|
||||
'modules': [['LN2', {'commands': [],
|
||||
'interfaces': ['Readable', 'Module'],
|
||||
'parameters': [['value', {'datatype': ['double'],
|
||||
'description': 'current value',
|
||||
'readonly': True,
|
||||
}
|
||||
]]
|
||||
}
|
||||
]]
|
||||
}
|
||||
decoded_data = {'modules': OrderedDict([('LN2', {'commands': OrderedDict(),
|
||||
'parameters': OrderedDict([('value', {'datatype': ['double'],
|
||||
'description': 'current value',
|
||||
'readonly': True,
|
||||
}
|
||||
)]),
|
||||
'properties': {'interfaces': ['Readable', 'Module']}
|
||||
}
|
||||
)]),
|
||||
'properties': {'equipment_id': 'eid',
|
||||
}
|
||||
}
|
||||
|
||||
a = clientobj._decode_substruct(['modules'], describing_data)
|
||||
for modname, module in a['modules'].items():
|
||||
a['modules'][modname] = clientobj._decode_substruct(
|
||||
['parameters', 'commands'], module)
|
||||
assert a == decoded_data
|
Loading…
x
Reference in New Issue
Block a user