introduce command handlers
A command handler can be used to deal with parameters which have a common command for querying and writing to the hardware. Support for parsing and formatting commands is included. Change-Id: I79a1b6c9daf3af3be2e1875875ced41ec1a1829d Reviewed-on: https://forge.frm2.tum.de/review/c/sine2020/secop/playground/+/21946 Tested-by: JenkinsCodeReview <bjoern_pedersen@frm2.tum.de> Reviewed-by: Markus Zolliker <markus.zolliker@psi.ch>
This commit is contained in:
264
secop/commandhandler.py
Executable file
264
secop/commandhandler.py
Executable file
@ -0,0 +1,264 @@
|
|||||||
|
#!/usr/bin/env python
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
# *****************************************************************************
|
||||||
|
# This program is free software; you can redistribute it and/or modify it under
|
||||||
|
# the terms of the GNU General Public License as published by the Free Software
|
||||||
|
# Foundation; either version 2 of the License, or (at your option) any later
|
||||||
|
# version.
|
||||||
|
#
|
||||||
|
# This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
|
||||||
|
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
|
||||||
|
# details.
|
||||||
|
#
|
||||||
|
# You should have received a copy of the GNU General Public License along with
|
||||||
|
# this program; if not, write to the Free Software Foundation, Inc.,
|
||||||
|
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
|
||||||
|
#
|
||||||
|
# Module authors:
|
||||||
|
# Markus Zolliker <markus.zolliker@psi.ch>
|
||||||
|
# *****************************************************************************
|
||||||
|
"""command handler
|
||||||
|
|
||||||
|
utility class for cases, where multiple parameters are treated with a common command.
|
||||||
|
The support for LakeShore and similar protocols is already included.
|
||||||
|
"""
|
||||||
|
import re
|
||||||
|
|
||||||
|
from secop.metaclass import Done
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
class CmdParser:
|
||||||
|
"""helper for parsing replies
|
||||||
|
|
||||||
|
using a subset of old style python formatting
|
||||||
|
the same format can be used or formatting command arguments
|
||||||
|
"""
|
||||||
|
|
||||||
|
# make a map of cast functions
|
||||||
|
CAST_MAP = {letter: cast
|
||||||
|
for letters, cast in (
|
||||||
|
('d', int),
|
||||||
|
('s', str), # 'c' is treated separately
|
||||||
|
('o', lambda x:int(x, 8)),
|
||||||
|
('xX', lambda x:int(x, 16)),
|
||||||
|
('eEfFgG', float),
|
||||||
|
) for letter in letters
|
||||||
|
}
|
||||||
|
# pattern for chacaters to be escaped
|
||||||
|
ESC_PAT = re.compile('([\\%s])' % '\\'.join('|^$-.+*?()[]{}<>'))
|
||||||
|
# format pattern
|
||||||
|
FMT_PAT = re.compile('(%%|%[^diouxXfFgGeEcrsa]*(?:.|$))')
|
||||||
|
|
||||||
|
def __init__(self, argformat):
|
||||||
|
# replace named patterns
|
||||||
|
|
||||||
|
self.fmt = argformat
|
||||||
|
spl = self.FMT_PAT.split(argformat)
|
||||||
|
spl_iter = iter(spl)
|
||||||
|
|
||||||
|
def escaped(text):
|
||||||
|
return self.ESC_PAT.sub(lambda x: '\\' + x.group(1), text)
|
||||||
|
|
||||||
|
casts = []
|
||||||
|
# the first item in spl is just plain text
|
||||||
|
pat = [escaped(next(spl_iter))]
|
||||||
|
todofmt = None # format set aside to be treated in next loop
|
||||||
|
|
||||||
|
# loop over found formats and separators
|
||||||
|
for fmt, sep in zip(spl_iter,spl_iter):
|
||||||
|
if fmt == '%%':
|
||||||
|
if todofmt is None:
|
||||||
|
pat.append('%' + escaped(sep)) # plain text
|
||||||
|
continue
|
||||||
|
fmt = todofmt
|
||||||
|
todofmt = None
|
||||||
|
sep = '%' + sep
|
||||||
|
elif todofmt:
|
||||||
|
raise ValueError("a separator must follow '%s'" % todofmt)
|
||||||
|
cast = self.CAST_MAP.get(fmt[-1], None)
|
||||||
|
if cast is None: # special or unknown case
|
||||||
|
if fmt != '%c':
|
||||||
|
raise ValueError("unsupported format: '%s'" % fmt)
|
||||||
|
# we do not need a separator after %c
|
||||||
|
pat.append('(.)')
|
||||||
|
casts.append(str)
|
||||||
|
pat.append(escaped(sep))
|
||||||
|
continue
|
||||||
|
if sep == '': # missing separator. postpone handling for '%%' case or end of pattern
|
||||||
|
todofmt = fmt
|
||||||
|
continue
|
||||||
|
casts.append(cast)
|
||||||
|
# accepting everything up to a separator
|
||||||
|
pat.append('([^%s]*)' % escaped(sep[0]) + escaped(sep))
|
||||||
|
if todofmt:
|
||||||
|
casts.append(cast)
|
||||||
|
pat.append('(.*)')
|
||||||
|
self.casts = casts
|
||||||
|
self.pat = re.compile(''.join(pat))
|
||||||
|
try:
|
||||||
|
argformat % ((0,) * len(casts)) # validate argformat
|
||||||
|
except ValueError as e:
|
||||||
|
raise ValueError("%s in %r" % (e, argformat))
|
||||||
|
|
||||||
|
def format(self, *values):
|
||||||
|
return self.fmt % values
|
||||||
|
|
||||||
|
def parse(self, reply):
|
||||||
|
match = self.pat.match(reply)
|
||||||
|
if not match:
|
||||||
|
raise ValueError('reply "%s" does not match pattern "%s"' % (reply, self.fmt))
|
||||||
|
return [c(v) for c, v in zip(self.casts, match.groups())]
|
||||||
|
|
||||||
|
|
||||||
|
class ChangeWrapper:
|
||||||
|
"""store parameter changes before they are applied"""
|
||||||
|
|
||||||
|
def __init__(self, module, pname, value):
|
||||||
|
self._module = module
|
||||||
|
setattr(self, pname, value)
|
||||||
|
|
||||||
|
def __getattr__(self, key):
|
||||||
|
"""get values from module for unknown keys"""
|
||||||
|
return getattr(self._module, key)
|
||||||
|
|
||||||
|
def apply(self, module):
|
||||||
|
"""set only changed values"""
|
||||||
|
for k, v in self.__dict__.items():
|
||||||
|
if k != '_module' and v != getattr(module, k):
|
||||||
|
setattr(module, k, v)
|
||||||
|
|
||||||
|
def __repr__(self):
|
||||||
|
return ', '.join('%s=%r' % (k, v) for k, v in self.__dict__.items() if k != '_module')
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
class CmdHandlerBase:
|
||||||
|
"""generic command handler"""
|
||||||
|
|
||||||
|
# def __init__(self, group, ...):
|
||||||
|
# init must at least initialize self.group, which is is relevant for calling the
|
||||||
|
# proper analyze_<group> and change_<group> methods
|
||||||
|
|
||||||
|
def parse_reply(self, reply):
|
||||||
|
"""return values from a raw reply"""
|
||||||
|
raise NotImplementedError
|
||||||
|
|
||||||
|
def make_query(self, module):
|
||||||
|
"""make a query"""
|
||||||
|
raise NotImplementedError
|
||||||
|
|
||||||
|
def make_change(self, module, *values):
|
||||||
|
"""make a change command from values"""
|
||||||
|
raise NotImplementedError
|
||||||
|
|
||||||
|
def send_command(self, module, changecmd=''):
|
||||||
|
"""send a command (query or change+query) and parse the reply into a list
|
||||||
|
|
||||||
|
If changecmd is given, it is prepended before the query.
|
||||||
|
"""
|
||||||
|
querycmd = self.make_query(module)
|
||||||
|
reply = module.sendRecv((changecmd + querycmd))
|
||||||
|
return self.parse_reply(reply)
|
||||||
|
|
||||||
|
def send_change(self, module, *values):
|
||||||
|
"""compose and send a command from values
|
||||||
|
|
||||||
|
and send a query afterwards. This method might be overriden, if the change command
|
||||||
|
can be combined with a query command, or if the change command already includes
|
||||||
|
a reply.
|
||||||
|
"""
|
||||||
|
changecmd = self.make_change(module, *values)
|
||||||
|
module.sendRecv(changecmd) # ignore result
|
||||||
|
return self.send_command(module)
|
||||||
|
|
||||||
|
def get_read_func(self, pname):
|
||||||
|
"""returns the read function passed to the metaclass"""
|
||||||
|
return self.read
|
||||||
|
|
||||||
|
def read(self, module):
|
||||||
|
"""the read function passed to the metaclass
|
||||||
|
|
||||||
|
overwrite with None if not used
|
||||||
|
"""
|
||||||
|
# do a read of the current hw values
|
||||||
|
reply = self.send_command(module)
|
||||||
|
# convert them to parameters
|
||||||
|
getattr(module, 'analyze_' + self.group)(*reply)
|
||||||
|
return Done # parameters should be updated already
|
||||||
|
|
||||||
|
def get_write_func(self, pname):
|
||||||
|
"""returns the write function passed to the metaclass
|
||||||
|
|
||||||
|
return None if not used.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def wfunc(module, value, cmd=self, pname=pname):
|
||||||
|
# do a read of the current hw values
|
||||||
|
values = cmd.send_command(module)
|
||||||
|
# convert them to parameters
|
||||||
|
analyze = getattr(module, 'analyze_' + cmd.group)
|
||||||
|
analyze(*values)
|
||||||
|
# create wrapper object 'new' with changed parameter 'pname'
|
||||||
|
new = ChangeWrapper(module, pname, value)
|
||||||
|
# call change_* for calculation new hw values
|
||||||
|
values = getattr(module, 'change_' + cmd.group)(new, *values)
|
||||||
|
# send the change command and a query command
|
||||||
|
analyze(*cmd.send_change(module, *values))
|
||||||
|
# update only changed values
|
||||||
|
new.apply(module)
|
||||||
|
return Done # parameter 'pname' should be changed already
|
||||||
|
|
||||||
|
return wfunc
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
class CmdHandler(CmdHandlerBase):
|
||||||
|
"""more evolved command handler
|
||||||
|
|
||||||
|
this command handler works for a syntax, where the change command syntax can be
|
||||||
|
build from the query command syntax, with the to be changed items at the second
|
||||||
|
part of the command, using the same format as for the reply.
|
||||||
|
Examples: devices from LakeShore, PPMS
|
||||||
|
|
||||||
|
implementing classes have to define/override the following:
|
||||||
|
"""
|
||||||
|
CMDARGS = [] # list of properties or parameters to be used for building
|
||||||
|
# some of the the query and change commands
|
||||||
|
CMDSEPARATOR = ';' # if given, it is valid to join a command a a query with
|
||||||
|
# the given separator
|
||||||
|
|
||||||
|
|
||||||
|
def __init__(self, group, querycmd, replyfmt):
|
||||||
|
"""initialize the command handler
|
||||||
|
|
||||||
|
group: the handler group (used for analyze_<group> and change_<group>)
|
||||||
|
querycmd: the command for a query, may contain named formats for cmdargs
|
||||||
|
replyfmt: the format for reading the reply with some scanf like behaviour
|
||||||
|
"""
|
||||||
|
self.group = group
|
||||||
|
self.querycmd = querycmd
|
||||||
|
self.replyfmt = CmdParser(replyfmt)
|
||||||
|
|
||||||
|
def parse_reply(self, reply):
|
||||||
|
"""return values from a raw reply"""
|
||||||
|
return self.replyfmt.parse(reply)
|
||||||
|
|
||||||
|
def make_query(self, module):
|
||||||
|
"""make a query"""
|
||||||
|
return self.querycmd % {k: getattr(module, k, None) for k in self.CMDARGS}
|
||||||
|
|
||||||
|
def make_change(self, module, *values):
|
||||||
|
"""make a change command from a query command"""
|
||||||
|
changecmd = self.querycmd.replace('?', ' ')
|
||||||
|
if not self.querycmd.endswith('?'):
|
||||||
|
changecmd += ','
|
||||||
|
changecmd %= {k: getattr(module, k, None) for k in self.CMDARGS}
|
||||||
|
return changecmd + self.replyfmt.format(*values)
|
||||||
|
|
||||||
|
def send_change(self, module, *values):
|
||||||
|
"""join change and query commands"""
|
||||||
|
if self.CMDSEPARATOR is None:
|
||||||
|
return super().send_change(module, *values)
|
||||||
|
return self.send_command(module, self.make_change(module, *values) + self.CMDSEPARATOR)
|
@ -33,6 +33,10 @@ from secop.properties import PropertyMeta
|
|||||||
|
|
||||||
EVENT_ONLY_ON_CHANGED_VALUES = False
|
EVENT_ONLY_ON_CHANGED_VALUES = False
|
||||||
|
|
||||||
|
class Done:
|
||||||
|
"""a special return value for a read/write function
|
||||||
|
|
||||||
|
indicating that the setter is triggered already"""
|
||||||
|
|
||||||
# warning: MAGIC!
|
# warning: MAGIC!
|
||||||
|
|
||||||
@ -108,57 +112,76 @@ class ModuleMeta(PropertyMeta):
|
|||||||
# skip commands for now
|
# skip commands for now
|
||||||
continue
|
continue
|
||||||
rfunc = attrs.get('read_' + pname, None)
|
rfunc = attrs.get('read_' + pname, None)
|
||||||
for base in bases:
|
handler = pobj.handler.get_read_func(pname) if pobj.handler else None
|
||||||
if rfunc is not None:
|
if handler:
|
||||||
break
|
|
||||||
rfunc = getattr(base, 'read_' + pname, None)
|
|
||||||
|
|
||||||
def wrapped_rfunc(self, pname=pname, rfunc=rfunc):
|
|
||||||
if rfunc:
|
if rfunc:
|
||||||
self.log.debug("rfunc(%s): call %r" % (pname, rfunc))
|
raise ProgrammingError("parameter '%s' can not have a handler "
|
||||||
try:
|
"and read_%s" % (pname, pname))
|
||||||
value = rfunc(self)
|
rfunc = handler
|
||||||
except Exception as e:
|
else:
|
||||||
pobj = self.accessibles[pname]
|
rfunc = attrs.get('read_' + pname, None)
|
||||||
self.DISPATCHER.announce_update_error(self, pname, pobj, e)
|
for base in bases:
|
||||||
raise
|
if rfunc is not None:
|
||||||
else:
|
break
|
||||||
# return cached value
|
rfunc = getattr(base, 'read_' + pname, None)
|
||||||
self.log.debug("rfunc(%s): return cached value" % pname)
|
|
||||||
value = self.accessibles[pname].value
|
if rfunc is None or getattr(rfunc, '__wrapped__', False) is False:
|
||||||
setattr(self, pname, value) # important! trigger the setter
|
|
||||||
return value
|
def wrapped_rfunc(self, pname=pname, rfunc=rfunc):
|
||||||
|
if rfunc:
|
||||||
|
self.log.debug("rfunc(%s): call %r" % (pname, rfunc))
|
||||||
|
try:
|
||||||
|
value = rfunc(self)
|
||||||
|
if value is Done: # the setter is already triggered
|
||||||
|
return getattr(self, pname)
|
||||||
|
except Exception as e:
|
||||||
|
pobj = self.accessibles[pname]
|
||||||
|
self.DISPATCHER.announce_update_error(self, pname, pobj, e)
|
||||||
|
raise
|
||||||
|
else:
|
||||||
|
# return cached value
|
||||||
|
self.log.debug("rfunc(%s): return cached value" % pname)
|
||||||
|
value = self.accessibles[pname].value
|
||||||
|
setattr(self, pname, value) # important! trigger the setter
|
||||||
|
return value
|
||||||
|
|
||||||
if rfunc:
|
|
||||||
wrapped_rfunc.__doc__ = rfunc.__doc__
|
wrapped_rfunc.__doc__ = rfunc.__doc__
|
||||||
if getattr(rfunc, '__wrapped__', False) is False:
|
|
||||||
setattr(newtype, 'read_' + pname, wrapped_rfunc)
|
setattr(newtype, 'read_' + pname, wrapped_rfunc)
|
||||||
wrapped_rfunc.__wrapped__ = True
|
wrapped_rfunc.__wrapped__ = True
|
||||||
|
|
||||||
if not pobj.readonly:
|
if not pobj.readonly:
|
||||||
wfunc = attrs.get('write_' + pname, None)
|
wfunc = attrs.get('write_' + pname, None)
|
||||||
for base in bases:
|
handler = pobj.handler.get_write_func(pname) if pobj.handler else None
|
||||||
if wfunc is not None:
|
if handler:
|
||||||
break
|
|
||||||
wfunc = getattr(base, 'write_' + pname, None)
|
|
||||||
|
|
||||||
def wrapped_wfunc(self, value, pname=pname, wfunc=wfunc):
|
|
||||||
self.log.debug("wfunc(%s): set %r" % (pname, value))
|
|
||||||
pobj = self.accessibles[pname]
|
|
||||||
value = pobj.datatype(value)
|
|
||||||
if wfunc:
|
if wfunc:
|
||||||
self.log.debug('calling %r(%r)' % (wfunc, value))
|
raise ProgrammingError("parameter '%s' can not have a handler "
|
||||||
returned_value = wfunc(self, value)
|
"and write_%s" % (pname, pname))
|
||||||
if returned_value is not None:
|
wfunc = handler
|
||||||
value = returned_value
|
else:
|
||||||
setattr(self, pname, value)
|
for base in bases:
|
||||||
return value
|
if wfunc is not None:
|
||||||
|
break
|
||||||
|
wfunc = getattr(base, 'write_' + pname, None)
|
||||||
|
|
||||||
|
if wfunc is None or getattr(wfunc, '__wrapped__', False) is False:
|
||||||
|
|
||||||
|
def wrapped_wfunc(self, value, pname=pname, wfunc=wfunc):
|
||||||
|
self.log.debug("check validity of %s = %r" % (pname, value))
|
||||||
|
pobj = self.accessibles[pname]
|
||||||
|
value = pobj.datatype(value)
|
||||||
|
if wfunc:
|
||||||
|
self.log.debug('calling %r(%r)' % (wfunc, value))
|
||||||
|
returned_value = wfunc(self, value)
|
||||||
|
if returned_value is Done: # the setter is already triggered
|
||||||
|
return getattr(self, pname)
|
||||||
|
if returned_value is not None:
|
||||||
|
value = returned_value
|
||||||
|
setattr(self, pname, value)
|
||||||
|
return value
|
||||||
|
|
||||||
if wfunc:
|
|
||||||
wrapped_wfunc.__doc__ = wfunc.__doc__
|
wrapped_wfunc.__doc__ = wfunc.__doc__
|
||||||
if getattr(wfunc, '__wrapped__', False) is False:
|
|
||||||
setattr(newtype, 'write_' + pname, wrapped_wfunc)
|
setattr(newtype, 'write_' + pname, wrapped_wfunc)
|
||||||
wrapped_wfunc.__wrapped__ = True
|
wrapped_wfunc.__wrapped__ = True
|
||||||
|
|
||||||
def getter(self, pname=pname):
|
def getter(self, pname=pname):
|
||||||
return self.accessibles[pname].value
|
return self.accessibles[pname].value
|
||||||
|
@ -110,11 +110,15 @@ class Parameter(Accessible):
|
|||||||
'export': Property('Is this parameter accessible via SECoP? (vs. internal parameter)',
|
'export': Property('Is this parameter accessible via SECoP? (vs. internal parameter)',
|
||||||
OrType(BoolType(), StringType()), export=False, default=True),
|
OrType(BoolType(), StringType()), export=False, default=True),
|
||||||
'poll': Property('Polling indicator', IntRange(), export=False, default=False),
|
'poll': Property('Polling indicator', IntRange(), export=False, default=False),
|
||||||
'optional': Property('[Internal] is this parameter optional?', BoolType(), export=False, default=False),
|
'optional': Property('[Internal] is this parameter optional?', BoolType(), export=False,
|
||||||
|
settable=False, default=False),
|
||||||
|
'handler': Property('[internal] overload the standard read and write functions',
|
||||||
|
ValueType(), export=False, default=None, mandatory=False, settable=False),
|
||||||
}
|
}
|
||||||
|
|
||||||
value = None
|
value = None
|
||||||
timestamp = None
|
timestamp = None
|
||||||
|
|
||||||
def __init__(self, description, datatype, ctr=None, unit=None, **kwds):
|
def __init__(self, description, datatype, ctr=None, unit=None, **kwds):
|
||||||
|
|
||||||
if ctr is not None:
|
if ctr is not None:
|
||||||
@ -256,7 +260,7 @@ class Command(Accessible):
|
|||||||
extname='visibility', export=True, default=1),
|
extname='visibility', export=True, default=1),
|
||||||
'export': Property('[internal] Flag: is the command accessible via SECoP? (vs. pure internal use)',
|
'export': Property('[internal] Flag: is the command accessible via SECoP? (vs. pure internal use)',
|
||||||
OrType(BoolType(), StringType()), export=False, default=True),
|
OrType(BoolType(), StringType()), export=False, default=True),
|
||||||
'optional': Property('[internal] is The comamnd optional to implement? (vs. mandatory',
|
'optional': Property('[internal] is the command optional to implement? (vs. mandatory)',
|
||||||
BoolType(), export=False, default=False, settable=False),
|
BoolType(), export=False, default=False, settable=False),
|
||||||
'datatype': Property('[internal] datatype of the command, auto generated from \'argument\' and \'result\'',
|
'datatype': Property('[internal] datatype of the command, auto generated from \'argument\' and \'result\'',
|
||||||
DataTypeType(), extname='datainfo', mandatory=True),
|
DataTypeType(), extname='datainfo', mandatory=True),
|
||||||
|
184
test/test_commandhandler.py
Normal file
184
test/test_commandhandler.py
Normal file
@ -0,0 +1,184 @@
|
|||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
# *****************************************************************************
|
||||||
|
#
|
||||||
|
# This program is free software; you can redistribute it and/or modify it under
|
||||||
|
# the terms of the GNU General Public License as published by the Free Software
|
||||||
|
# Foundation; either version 2 of the License, or (at your option) any later
|
||||||
|
# version.
|
||||||
|
#
|
||||||
|
# This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
|
||||||
|
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
|
||||||
|
# details.
|
||||||
|
#
|
||||||
|
# You should have received a copy of the GNU General Public License along with
|
||||||
|
# this program; if not, write to the Free Software Foundation, Inc.,
|
||||||
|
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
|
||||||
|
#
|
||||||
|
# Module authors:
|
||||||
|
# Markus Zolliker <markus.zolliker@psi.ch>
|
||||||
|
#
|
||||||
|
# *****************************************************************************
|
||||||
|
"""test commandhandler."""
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from secop.commandhandler import CmdParser, CmdHandler
|
||||||
|
from secop.modules import Module, Parameter
|
||||||
|
from secop.datatypes import FloatRange, StringType, IntRange, Property
|
||||||
|
|
||||||
|
@pytest.mark.parametrize('fmt, text, values, text2', [
|
||||||
|
('%d,%d', '2,3', [2,3], None),
|
||||||
|
('%c%s', '222', ['2', '22'], None), # %c does not need a spearator
|
||||||
|
('%s%%%d_%%', 'abc%12_%', ['abc', 12], None), # % as separator and within separator
|
||||||
|
('%s.+', 'string,without+period.+', ['string,without+period',], None), # special characters
|
||||||
|
('%d,%.3f,%x,%o', '1,1.2346,fF,17', [1, 1.2346, 255, 15], '1,1.235,ff,17'), # special formats
|
||||||
|
])
|
||||||
|
def test_CmdParser(fmt, text, values, text2):
|
||||||
|
parser = CmdParser(fmt)
|
||||||
|
print(parser.format(*values))
|
||||||
|
assert parser.parse(text) == values
|
||||||
|
if text2 is None:
|
||||||
|
text2 = text
|
||||||
|
assert parser.format(*values) == text2
|
||||||
|
|
||||||
|
def test_CmdParser_ex():
|
||||||
|
with pytest.raises(ValueError):
|
||||||
|
CmdParser('%d%s') # no separator
|
||||||
|
|
||||||
|
|
||||||
|
class Data:
|
||||||
|
"""a cyclic list where we put data to be checked or used during test"""
|
||||||
|
def __init__(self):
|
||||||
|
self.data = []
|
||||||
|
|
||||||
|
def push(self, tag, *args):
|
||||||
|
if len(args) == 1:
|
||||||
|
args = args[0]
|
||||||
|
self.data.append((tag, args))
|
||||||
|
|
||||||
|
def pop(self, expected):
|
||||||
|
tag, data = self.data.pop(0)
|
||||||
|
print('pop(%s) %r' % (tag, data))
|
||||||
|
if tag != expected:
|
||||||
|
raise ValueError('expected tag %s' % expected)
|
||||||
|
return data
|
||||||
|
|
||||||
|
def empty(self):
|
||||||
|
return not self.data
|
||||||
|
|
||||||
|
|
||||||
|
class DispatcherStub:
|
||||||
|
def __init__(self, updates):
|
||||||
|
self.updates = updates
|
||||||
|
|
||||||
|
def announce_update(self, moduleobj, pname, pobj):
|
||||||
|
self.updates[pname] = pobj.value
|
||||||
|
|
||||||
|
def announce_update_error(self, moduleobj, pname, pobj, err):
|
||||||
|
self.updates[('error', pname)] = str(err)
|
||||||
|
|
||||||
|
|
||||||
|
class LoggerStub:
|
||||||
|
def debug(self, *args):
|
||||||
|
pass
|
||||||
|
info = exception = debug
|
||||||
|
|
||||||
|
|
||||||
|
class ServerStub:
|
||||||
|
def __init__(self, updates):
|
||||||
|
self.dispatcher = DispatcherStub(updates)
|
||||||
|
|
||||||
|
|
||||||
|
def test_CmdHandler():
|
||||||
|
class Hdl(CmdHandler):
|
||||||
|
CMDARGS = ['channel', 'loop']
|
||||||
|
CMDSEPARATOR ='|'
|
||||||
|
|
||||||
|
group1 = Hdl('group1', 'SIMPLE?', '%g')
|
||||||
|
group2 = Hdl('group2', 'CMD?%(channel)d', '%g,%s,%d')
|
||||||
|
|
||||||
|
|
||||||
|
class TestModule(Module):
|
||||||
|
properties = {
|
||||||
|
'channel': Property('the channel', IntRange(), default=3),
|
||||||
|
'loop': Property('the loop', IntRange(), default=2),
|
||||||
|
}
|
||||||
|
parameters = {
|
||||||
|
'simple': Parameter('a readonly', FloatRange(), default=0.77, handler=group1),
|
||||||
|
'real': Parameter('a float value', FloatRange(), default=12.3, handler=group2, readonly=False),
|
||||||
|
'text': Parameter('a string value', StringType(), default='x', handler=group2, readonly=False),
|
||||||
|
}
|
||||||
|
|
||||||
|
def sendRecv(self, command):
|
||||||
|
assert data.pop('command') == command
|
||||||
|
return data.pop('reply')
|
||||||
|
|
||||||
|
def analyze_group1(self, val):
|
||||||
|
assert data.pop('val') == val
|
||||||
|
self.simple = data.pop('simple')
|
||||||
|
|
||||||
|
def analyze_group2(self, gval, sval, dval):
|
||||||
|
assert data.pop('gsv') == (gval, sval, dval)
|
||||||
|
self.real, self.text = data.pop('rt')
|
||||||
|
|
||||||
|
def change_group2(self, new, gval, sval, dval):
|
||||||
|
assert data.pop('old') == (gval, sval, dval)
|
||||||
|
assert data.pop('self') == (self.real, self.text)
|
||||||
|
assert data.pop('new') == (new.real, new.text)
|
||||||
|
return data.pop('changed')
|
||||||
|
|
||||||
|
data = Data()
|
||||||
|
updates = {}
|
||||||
|
module = TestModule('mymodule', LoggerStub(), {'.description': ''}, ServerStub(updates))
|
||||||
|
updates.clear() # get rid of updates from initialisation
|
||||||
|
|
||||||
|
# for sendRecv
|
||||||
|
data.push('command', 'SIMPLE?')
|
||||||
|
data.push('reply', '4.51')
|
||||||
|
# for analyze_group1
|
||||||
|
data.push('val', 4.51)
|
||||||
|
data.push('simple', 45.1)
|
||||||
|
value = module.read_simple()
|
||||||
|
assert value == 45.1
|
||||||
|
assert module.simple == value
|
||||||
|
assert data.empty()
|
||||||
|
assert updates.pop('simple') == 45.1
|
||||||
|
assert not updates
|
||||||
|
|
||||||
|
# for sendRecv
|
||||||
|
data.push('command', 'CMD?3')
|
||||||
|
data.push('reply', '1.23,text,5')
|
||||||
|
# for analyze_group2
|
||||||
|
data.push('gsv', 1.23, 'text', 5)
|
||||||
|
data.push('rt', 12.3, 'string')
|
||||||
|
value = module.read_real()
|
||||||
|
assert module.real == value
|
||||||
|
assert module.real == updates.pop('real') == 12.3
|
||||||
|
assert module.text == updates.pop('text') == 'string'
|
||||||
|
assert data.empty()
|
||||||
|
assert not updates
|
||||||
|
|
||||||
|
# for sendRecv
|
||||||
|
data.push('command', 'CMD?3')
|
||||||
|
data.push('reply', '1.23,text,5')
|
||||||
|
# for analyze_group2
|
||||||
|
data.push('gsv', 1.23, 'text', 5)
|
||||||
|
data.push('rt', 12.3, 'string')
|
||||||
|
# for change_group2
|
||||||
|
data.push('old', 1.23, 'text', 5)
|
||||||
|
data.push('self', 12.3, 'string')
|
||||||
|
data.push('new', 12.3, 'FOO')
|
||||||
|
data.push('changed', 1.23, 'foo', 9)
|
||||||
|
# for sendRecv
|
||||||
|
data.push('command', 'CMD 3,1.23,foo,9|CMD?3')
|
||||||
|
data.push('reply', '1.23,foo,9')
|
||||||
|
# for analyze_group2
|
||||||
|
data.push('gsv', 1.23, 'foo', 9)
|
||||||
|
data.push('rt', 12.3, 'FOO')
|
||||||
|
value = module.write_text('FOO')
|
||||||
|
assert module.text == value
|
||||||
|
assert module.text == updates.pop('text') == 'FOO'
|
||||||
|
assert module.real == updates.pop('real') == 12.3
|
||||||
|
assert data.empty()
|
||||||
|
assert not updates
|
@ -101,7 +101,7 @@ def test_ModuleMeta():
|
|||||||
))()
|
))()
|
||||||
|
|
||||||
dispatcher = type('DispatcherStub', (object,), dict(
|
dispatcher = type('DispatcherStub', (object,), dict(
|
||||||
announce_update = lambda self, m, pn, pv: print('%s:%s=%r' % (m.name, pn, pv)),
|
announce_update = lambda self, m, pn, po: print('%s:%s=%r' % (m.name, pn, po.value)),
|
||||||
))()
|
))()
|
||||||
|
|
||||||
srv = type('ServerStub', (object,), dict(
|
srv = type('ServerStub', (object,), dict(
|
||||||
@ -142,7 +142,7 @@ def test_ModuleMeta():
|
|||||||
'status', 'param1', 'param2', 'cmd', 'a2', 'pollinterval', 'b2', 'cmd2', 'value',
|
'status', 'param1', 'param2', 'cmd', 'a2', 'pollinterval', 'b2', 'cmd2', 'value',
|
||||||
'a1'}
|
'a1'}
|
||||||
assert set(cfg['value'].keys()) == {'group', 'export', 'relative_resolution',
|
assert set(cfg['value'].keys()) == {'group', 'export', 'relative_resolution',
|
||||||
'visibility', 'unit', 'default', 'optional', 'datatype', 'fmtstr',
|
'visibility', 'unit', 'default', 'datatype', 'fmtstr',
|
||||||
'absolute_resolution', 'poll', 'max', 'min', 'readonly', 'constant',
|
'absolute_resolution', 'poll', 'max', 'min', 'readonly', 'constant',
|
||||||
'description'}
|
'description'}
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user