more merges from gerrit

Change-Id: I13441cd8889dd39f74a2dd1a85e75a1b76bb93c8
This commit is contained in:
2022-03-08 10:52:14 +01:00
parent 10018b8cad
commit 34b93adef0
20 changed files with 1423 additions and 340 deletions

78
test/test_attach.py Normal file
View File

@ -0,0 +1,78 @@
# -*- coding: utf-8 -*-
# *****************************************************************************
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Module authors:
# Markus Zolliker <markus.zolliker@psi.ch>
#
# *****************************************************************************
from secop.modules import Module, Attached
from secop.protocol.dispatcher import Dispatcher
# class DispatcherStub:
# # omit_unchanged_within = 0
#
# # def __init__(self, updates):
# # self.updates = updates
# #
# # def announce_update(self, modulename, pname, pobj):
# # self.updates.setdefault(modulename, {})
# # if pobj.readerror:
# # self.updates[modulename]['error', pname] = str(pobj.readerror)
# # else:
# # self.updates[modulename][pname] = pobj.value
#
# def __init__(self):
# self.modules = {}
#
# def get_module(self, name):
# return self.modules[name]
#
# def register_module(self, name, module):
# self.modules[name] = module
class LoggerStub:
def debug(self, fmt, *args):
print(fmt % args)
info = warning = exception = debug
handlers = []
logger = LoggerStub()
class ServerStub:
restart = None
shutdown = None
def __init__(self):
self.dispatcher = Dispatcher('dispatcher', logger, {}, self)
def test_attach():
class Mod(Module):
att = Attached()
srv = ServerStub()
a = Module('a', logger, {'description': ''}, srv)
m = Mod('m', logger, {'description': '', 'att': 'a'}, srv)
assert m.propertyValues['att'] == 'a'
srv.dispatcher.register_module(a, 'a')
srv.dispatcher.register_module(m, 'm')
assert m.att == a

View File

@ -28,7 +28,9 @@ import pytest
from secop.datatypes import ArrayOf, BLOBType, BoolType, \
CommandType, ConfigError, DataType, Enum, EnumType, FloatRange, \
IntRange, ProgrammingError, ScaledInteger, StatusType, \
StringType, StructOf, TextType, TupleOf, get_datatype
StringType, StructOf, TextType, TupleOf, get_datatype, \
DiscouragedConversion
from secop.lib import generalConfig
def copytest(dt):
@ -36,6 +38,7 @@ def copytest(dt):
assert dt.export_datatype() == dt.copy().export_datatype()
assert dt != dt.copy()
def test_DataType():
dt = DataType()
with pytest.raises(NotImplementedError):
@ -116,7 +119,6 @@ def test_IntRange():
dt('1.3')
dt(1)
dt(0)
dt('1')
with pytest.raises(ProgrammingError):
IntRange('xc', 'Yx')
@ -132,6 +134,7 @@ def test_IntRange():
with pytest.raises(ConfigError):
dt.checkProperties()
def test_ScaledInteger():
dt = ScaledInteger(0.01, -3, 3)
copytest(dt)
@ -407,6 +410,7 @@ def test_ArrayOf():
dt = ArrayOf(EnumType('myenum', single=0), 5)
copytest(dt)
def test_TupleOf():
# test constructor catching illegal arguments
with pytest.raises(ValueError):
@ -641,6 +645,7 @@ def test_oneway_compatible(dt, contained_in):
with pytest.raises(ValueError):
contained_in.compatible(dt)
@pytest.mark.parametrize('dt1, dt2', [
(FloatRange(-5.5, 5.5), ScaledInteger(10, -5.5, 5.5)),
(IntRange(0,1), BoolType()),
@ -650,6 +655,7 @@ def test_twoway_compatible(dt1, dt2):
dt1.compatible(dt1)
dt2.compatible(dt2)
@pytest.mark.parametrize('dt1, dt2', [
(StringType(), FloatRange()),
(IntRange(-10, 10), StringType()),
@ -665,3 +671,12 @@ def test_incompatible(dt1, dt2):
dt1.compatible(dt2)
with pytest.raises(ValueError):
dt2.compatible(dt1)
@pytest.mark.parametrize('dt', [FloatRange(), IntRange(), ScaledInteger(1)])
def test_lazy_validation(dt):
generalConfig.defaults['lazy_number_validation'] = True
dt('0')
generalConfig.defaults['lazy_number_validation'] = False
with pytest.raises(DiscouragedConversion):
dt('0')

234
test/test_handler.py Normal file
View File

@ -0,0 +1,234 @@
# -*- coding: utf-8 -*-
# *****************************************************************************
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Module authors:
# Markus Zolliker <markus.zolliker@psi.ch>
#
# *****************************************************************************
from secop.rwhandler import ReadHandler, WriteHandler, \
CommonReadHandler, CommonWriteHandler, nopoll
from secop.core import Module, Parameter, FloatRange, Done
class DispatcherStub:
# the first update from the poller comes a very short time after the
# initial value from the timestamp. However, in the test below
# the second update happens after the updates dict is cleared
# -> we have to inhibit the 'omit unchanged update' feature
omit_unchanged_within = 0
def __init__(self, updates):
self.updates = updates
def announce_update(self, modulename, pname, pobj):
self.updates.setdefault(modulename, {})
if pobj.readerror:
self.updates[modulename]['error', pname] = str(pobj.readerror)
else:
self.updates[modulename][pname] = pobj.value
class LoggerStub:
def debug(self, fmt, *args):
print(fmt % args)
info = warning = exception = error = debug
handlers = []
logger = LoggerStub()
class ServerStub:
def __init__(self, updates):
self.dispatcher = DispatcherStub(updates)
class ModuleTest(Module):
def __init__(self, updates=None, **opts):
opts['description'] = ''
super().__init__('mod', logger, opts, ServerStub(updates or {}))
def test_handler():
data = []
class Mod(ModuleTest):
a = Parameter('', FloatRange(), readonly=False)
b = Parameter('', FloatRange(), readonly=False)
@ReadHandler(['a', 'b'])
def read_hdl(self, pname):
value = data.pop()
data.append(pname)
return value
@WriteHandler(['a', 'b'])
def write_hdl(self, pname, value):
data.append(pname)
return value
assert Mod.read_a.poll is True
assert Mod.read_b.poll is True
m = Mod()
data.append(1.2)
assert m.read_a() == 1.2
assert data.pop() == 'a'
data.append(1.3)
assert m.read_b() == 1.3
assert data.pop() == 'b'
assert m.write_a(1.5) == 1.5
assert m.a == 1.5
assert data.pop() == 'a'
assert m.write_b(7) == 7
assert m.b == 7
assert data.pop() == 'b'
data.append(Done)
assert m.read_b() == 7
assert data.pop() == 'b'
assert data == []
def test_common_handler():
data = []
class Mod(ModuleTest):
a = Parameter('', FloatRange(), readonly=False)
b = Parameter('', FloatRange(), readonly=False)
@CommonReadHandler(['a', 'b'])
def read_hdl(self):
self.a, self.b = data.pop()
data.append('read_hdl')
@CommonWriteHandler(['a', 'b'])
def write_hdl(self, values):
self.a = values['a']
self.b = values['b']
data.append('write_hdl')
assert set([Mod.read_a.poll, Mod.read_b.poll]) == {True, False}
m = Mod(a=1, b=2)
assert m.writeDict == {'a': 1, 'b': 2}
m.write_a(3)
assert m.a == 3
assert m.b == 2
assert data.pop() == 'write_hdl'
assert m.writeDict == {}
m.write_b(4)
assert m.a == 3
assert m.b == 4
assert data.pop() == 'write_hdl'
data.append((3, 4))
assert m.read_a() == 3
assert m.a == 3
assert m.b == 4
assert data.pop() == 'read_hdl'
data.append((5, 6))
assert m.read_b() == 6
assert data.pop() == 'read_hdl'
data.append((1.1, 2.2))
assert m.read_b() == 2.2
assert m.a == 1.1
assert m.b == 2.2
assert data.pop() == 'read_hdl'
assert data == []
def test_nopoll():
class Mod1(ModuleTest):
a = Parameter('', FloatRange(), readonly=False)
b = Parameter('', FloatRange(), readonly=False)
@ReadHandler(['a', 'b'])
def read_hdl(self):
pass
assert Mod1.read_a.poll is True
assert Mod1.read_b.poll is True
class Mod2(ModuleTest):
a = Parameter('', FloatRange(), readonly=False)
b = Parameter('', FloatRange(), readonly=False)
@CommonReadHandler(['a', 'b'])
def read_hdl(self):
pass
assert Mod2.read_a.poll is True
assert Mod2.read_b.poll is False
class Mod3(ModuleTest):
a = Parameter('', FloatRange(), readonly=False)
b = Parameter('', FloatRange(), readonly=False)
@ReadHandler(['a', 'b'])
@nopoll
def read_hdl(self):
pass
assert Mod3.read_a.poll is False
assert Mod3.read_b.poll is False
class Mod4(ModuleTest):
a = Parameter('', FloatRange(), readonly=False)
b = Parameter('', FloatRange(), readonly=False)
@nopoll
@ReadHandler(['a', 'b'])
def read_hdl(self):
pass
assert Mod4.read_a.poll is False
assert Mod4.read_b.poll is False
class Mod5(ModuleTest):
a = Parameter('', FloatRange(), readonly=False)
b = Parameter('', FloatRange(), readonly=False)
@CommonReadHandler(['a', 'b'])
@nopoll
def read_hdl(self):
pass
assert Mod5.read_a.poll is False
assert Mod5.read_b.poll is False
class Mod6(ModuleTest):
a = Parameter('', FloatRange(), readonly=False)
b = Parameter('', FloatRange(), readonly=False)
@nopoll
@CommonReadHandler(['a', 'b'])
def read_hdl(self):
pass
assert Mod6.read_a.poll is False
assert Mod6.read_b.poll is False

View File

@ -120,7 +120,7 @@ def test_IOHandler():
real = Parameter('a float value', FloatRange(), default=12.3, handler=group2, readonly=False)
text = Parameter('a string value', StringType(), default='x', handler=group2, readonly=False)
def sendRecv(self, command):
def communicate(self, command):
assert data.pop('command') == command
return data.pop('reply')
@ -146,7 +146,7 @@ def test_IOHandler():
print(updates)
updates.clear() # get rid of updates from initialisation
# for sendRecv
# for communicate
data.push('command', 'SIMPLE?')
data.push('reply', '4.51')
# for analyze_group1
@ -159,7 +159,7 @@ def test_IOHandler():
assert updates.pop('simple') == 45.1
assert not updates
# for sendRecv
# for communicate
data.push('command', 'CMD?3')
data.push('reply', '1.23,text,5')
# for analyze_group2
@ -172,7 +172,7 @@ def test_IOHandler():
assert data.empty()
assert not updates
# for sendRecv
# for communicate
data.push('command', 'CMD?3')
data.push('reply', '1.23,text,5')
# for analyze_group2
@ -183,7 +183,7 @@ def test_IOHandler():
data.push('self', 12.3, 'string')
data.push('new', 12.3, 'FOO')
data.push('changed', 1.23, 'foo', 9)
# for sendRecv
# for communicate
data.push('command', 'CMD 3,1.23,foo,9|CMD?3')
data.push('reply', '1.23,foo,9')
# for analyze_group2

274
test/test_logging.py Normal file
View File

@ -0,0 +1,274 @@
# -*- coding: utf-8 -*-
# *****************************************************************************
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Module authors:
# Markus Zolliker <markus.zolliker@psi.ch>
#
# *****************************************************************************
import pytest
import mlzlog
from secop.modules import Module
from secop.protocol.dispatcher import Dispatcher
from secop.protocol.interface import encode_msg_frame, decode_msg
import secop.logging
from secop.logging import logger, generalConfig, HasComlog
class ServerStub:
restart = None
shutdown = None
def __init__(self):
self.dispatcher = Dispatcher('', logger.log.getChild('dispatcher'), {}, self)
class Connection:
def __init__(self, name, dispatcher, result):
self.result = result
self.dispatcher = dispatcher
self.name = name
dispatcher.add_connection(self)
def send_reply(self, msg):
self.result.append(encode_msg_frame(*msg).strip().decode())
def send(self, msg):
request = decode_msg(msg.encode())
assert self.dispatcher.handle_request(self, request) == request
@pytest.fixture(name='init')
def init_(monkeypatch):
logger.__init__()
class Playground:
def __init__(self, console_level='debug', comlog=True, com_module=True):
self.result_dict = result_dict = dict(
console=[], comlog=[], conn1=[], conn2=[])
class ConsoleHandler(mlzlog.Handler):
def __init__(self, *args, **kwds):
super().__init__()
self.result = result_dict['console']
def emit(self, record):
if record.name != 'frappy.dispatcher':
self.result.append('%s %s %s' % (record.name, record.levelname, record.getMessage()))
class ComLogHandler(mlzlog.Handler):
def __init__(self, *args, **kwds):
super().__init__()
self.result = result_dict['comlog']
def emit(self, record):
self.result.append('%s %s' % (record.name.split('.')[1], record.getMessage()))
class LogfileHandler(mlzlog.Handler):
def __init__(self, *args, **kwds):
super().__init__()
def noop(self, *args):
pass
close = flush = emit = noop
monkeypatch.setattr(mlzlog, 'ColoredConsoleHandler', ConsoleHandler)
monkeypatch.setattr(secop.logging, 'ComLogfileHandler', ComLogHandler)
monkeypatch.setattr(secop.logging, 'LogfileHandler', LogfileHandler)
class Mod(Module):
result = []
def __init__(self, name, srv, **kwds):
kwds['description'] = ''
super().__init__(name or 'mod', logger.log.getChild(name), kwds, srv)
srv.dispatcher.register_module(self, name, name)
self.result[:] = []
def earlyInit(self):
pass
class Com(HasComlog, Mod):
def __init__(self, name, srv, **kwds):
super().__init__(name, srv, **kwds)
self.earlyInit()
self.log.handlers[-1].result = result_dict['comlog']
def communicate(self, request):
self.comLog('> %s', request)
generalConfig.testinit(logger_root='frappy', comlog=comlog)
logger.init(console_level)
self.srv = ServerStub()
self.conn1 = Connection('conn1', self.srv.dispatcher, self.result_dict['conn1'])
self.conn2 = Connection('conn2', self.srv.dispatcher, self.result_dict['conn2'])
self.mod = Mod('mod', self.srv)
self.com = Com('com', self.srv, comlog=com_module)
for item in self.result_dict.values():
assert item == []
def check(self, both=None, **expected):
if both:
expected['conn1'] = expected['conn2'] = both
assert self.result_dict['console'] == expected.get('console', [])
assert self.result_dict['comlog'] == expected.get('comlog', [])
assert self.result_dict['conn1'] == expected.get('conn1', [])
assert self.result_dict['conn2'] == expected.get('conn2', [])
for item in self.result_dict.values():
item[:] = []
def comlog(self, flag):
logger.comlog = flag
yield Playground
# revert settings
generalConfig.testinit()
logger.__init__()
def test_mod_info(init):
p = init()
p.mod.log.info('i')
p.check(console=['frappy.mod INFO i'])
p.conn1.send('logging mod "debug"')
p.conn2.send('logging mod "info"')
p.mod.log.info('i')
p.check(console=['frappy.mod INFO i'], both=['log mod:info "i"'])
def test_mod_debug(init):
p = init()
p.mod.log.debug('d')
p.check(console=['frappy.mod DEBUG d'])
p.conn1.send('logging mod "debug"')
p.conn2.send('logging mod "info"')
p.mod.log.debug('d')
p.check(console=['frappy.mod DEBUG d'], conn1=['log mod:debug "d"'])
def test_com_info(init):
p = init()
p.com.log.info('i')
p.check(console=['frappy.com INFO i'])
p.conn1.send('logging com "info"')
p.conn2.send('logging com "debug"')
p.com.log.info('i')
p.check(console=['frappy.com INFO i'], both=['log com:info "i"'])
def test_com_debug(init):
p = init()
p.com.log.debug('d')
p.check(console=['frappy.com DEBUG d'])
p.conn2.send('logging com "debug"')
p.com.log.debug('d')
p.check(console=['frappy.com DEBUG d'], conn2=['log com:debug "d"'])
def test_com_com(init):
p = init()
p.com.communicate('x')
p.check(console=['frappy.com COMLOG > x'], comlog=['com > x'])
p.conn1.send('logging mod "debug"')
p.conn2.send('logging mod "info"')
p.conn2.send('logging com "debug"')
p.com.communicate('x')
p.check(console=['frappy.com COMLOG > x'], comlog=['com > x'], conn2=['log com:comlog "> x"'])
def test_main_info(init):
p = init(console_level='info')
p.mod.log.debug('d')
p.com.communicate('x')
p.check(comlog=['com > x'])
p.conn1.send('logging mod "debug"')
p.conn2.send('logging mod "info"')
p.conn2.send('logging com "debug"')
p.com.communicate('x')
p.check(comlog=['com > x'], conn2=['log com:comlog "> x"'])
def test_comlog_off(init):
p = init(console_level='info', comlog=False)
p.mod.log.debug('d')
p.com.communicate('x')
p.check()
def test_comlog_module_off(init):
p = init(console_level='info', com_module=False)
p.mod.log.debug('d')
p.com.communicate('x')
p.check()
def test_remote_all_off(init):
p = init()
p.conn1.send('logging mod "debug"')
p.conn2.send('logging mod "info"')
p.conn2.send('logging com "debug"')
p.mod.log.debug('d')
p.com.communicate('x')
p.mod.log.info('i')
checks = dict(
console=['frappy.mod DEBUG d', 'frappy.com COMLOG > x', 'frappy.mod INFO i'],
comlog=['com > x'],
conn1=['log mod:debug "d"', 'log mod:info "i"'],
conn2=['log com:comlog "> x"', 'log mod:info "i"'])
p.check(**checks)
p.conn1.send('logging "off"')
p.mod.log.debug('d')
p.com.communicate('x')
p.mod.log.info('i')
checks.pop('conn1')
p.check(**checks)
p.conn2.send('logging . "off"')
p.mod.log.debug('d')
p.com.communicate('x')
p.mod.log.info('i')
checks.pop('conn2')
p.check(**checks)
def test_remote_single_off(init):
p = init()
p.conn1.send('logging mod "debug"')
p.conn2.send('logging mod "info"')
p.conn2.send('logging com "debug"')
p.mod.log.debug('d')
p.com.communicate('x')
p.mod.log.info('i')
checks = dict(
console=['frappy.mod DEBUG d', 'frappy.com COMLOG > x', 'frappy.mod INFO i'],
comlog=['com > x'],
conn1=['log mod:debug "d"', 'log mod:info "i"'],
conn2=['log com:comlog "> x"', 'log mod:info "i"'])
p.check(**checks)
p.conn2.send('logging com "off"')
p.mod.log.debug('d')
p.com.communicate('x')
p.mod.log.info('i')
checks['conn2'] = ['log mod:info "i"']
p.check(**checks)
p.conn2.send('logging mod "off"')
p.mod.log.debug('d')
p.com.communicate('x')
p.mod.log.info('i')
checks['conn2'] = []
p.check(**checks)

View File

@ -22,15 +22,16 @@
# *****************************************************************************
"""test data types."""
import sys
import threading
import pytest
from secop.datatypes import BoolType, FloatRange, StringType, IntRange
from secop.datatypes import BoolType, FloatRange, StringType, IntRange, ScaledInteger
from secop.errors import ProgrammingError, ConfigError
from secop.modules import Communicator, Drivable, Readable, Module
from secop.params import Command, Parameter
from secop.poller import BasicPoller
from secop.rwhandler import ReadHandler, WriteHandler, nopoll
from secop.lib import generalConfig
class DispatcherStub:
@ -52,9 +53,10 @@ class DispatcherStub:
class LoggerStub:
def debug(self, *args):
print(*args)
info = warning = exception = debug
def debug(self, fmt, *args):
print(fmt % args)
info = warning = exception = error = debug
handlers = []
logger = LoggerStub()
@ -65,13 +67,21 @@ class ServerStub:
self.dispatcher = DispatcherStub(updates)
class DummyMultiEvent(threading.Event):
def get_trigger(self):
def trigger(event=self):
event.set()
sys.exit()
return trigger
def test_Communicator():
o = Communicator('communicator', LoggerStub(), {'.description':''}, ServerStub({}))
o = Communicator('communicator', LoggerStub(), {'.description': ''}, ServerStub({}))
o.earlyInit()
o.initModule()
event = threading.Event()
o.startModule(event.set)
assert event.is_set() # event should be set immediately
event = DummyMultiEvent()
o.startModule(event)
assert event.is_set() # event should be set immediately
def test_ModuleMagic():
@ -87,14 +97,13 @@ def test_ModuleMagic():
a1 = Parameter('a1', datatype=BoolType(), default=False)
a2 = Parameter('a2', datatype=BoolType(), default=True)
value = Parameter(datatype=StringType(), default='first')
target = Parameter(datatype=StringType(), default='')
@Command(argument=BoolType(), result=BoolType())
def cmd2(self, arg):
"""another stuff"""
return not arg
pollerClass = BasicPoller
def read_param1(self):
return True
@ -104,12 +113,16 @@ def test_ModuleMagic():
def read_a1(self):
return True
@nopoll
def read_a2(self):
return True
def read_value(self):
return 'second'
def read_status(self):
return 'IDLE', 'ok'
with pytest.raises(ProgrammingError):
class Mod1(Module): # pylint: disable=unused-variable
def do_this(self): # old style command
@ -132,9 +145,12 @@ def test_ModuleMagic():
return arg
value = Parameter(datatype=FloatRange(unit='deg'))
target = Parameter(datatype=FloatRange(), default=0)
a1 = Parameter(datatype=FloatRange(unit='$/s'), readonly=False)
# remark: it might be a programming error to override the datatype
# and not overriding the read_* method. This is not checked!
b2 = Parameter('<b2>', datatype=BoolType(), default=True,
poll=True, readonly=False, initwrite=True)
readonly=False, initwrite=True)
def write_a1(self, value):
self._a1_written = value
@ -170,30 +186,33 @@ def test_ModuleMagic():
# check for inital updates working properly
o1 = Newclass1('o1', logger, {'.description':''}, srv)
expectedBeforeStart = {'target': 0.0, 'status': (Drivable.Status.IDLE, ''),
expectedBeforeStart = {'target': '', 'status': (Drivable.Status.IDLE, ''),
'param1': False, 'param2': 1.0, 'a1': 0.0, 'a2': True, 'pollinterval': 5.0,
'value': 'first'}
assert updates.pop('o1') == expectedBeforeStart
o1.earlyInit()
event = threading.Event()
o1.startModule(event.set)
event = DummyMultiEvent()
o1.startModule(event)
event.wait()
# should contain polled values
expectedAfterStart = {'status': (Drivable.Status.IDLE, ''),
'value': 'second'}
expectedAfterStart = {
'status': (Drivable.Status.IDLE, 'ok'), 'value': 'second',
'param1': True, 'param2': 0.0, 'a1': True}
assert updates.pop('o1') == expectedAfterStart
# check in addition if parameters are written
o2 = Newclass2('o2', logger, {'.description':'', 'a1': 2.7}, srv)
# no update for b2, as this has to be written
expectedBeforeStart['a1'] = 2.7
expectedBeforeStart['target'] = 0.0
assert updates.pop('o2') == expectedBeforeStart
o2.earlyInit()
event = threading.Event()
o2.startModule(event.set)
event = DummyMultiEvent()
o2.startModule(event)
event.wait()
# value has changed type, b2 and a1 are written
expectedAfterStart.update(value=0, b2=True, a1=2.7)
expectedAfterStart.update(value=0, b2=True, a1=True)
# ramerk: a1=True: this behaviour is a Porgamming error
assert updates.pop('o2') == expectedAfterStart
assert o2._a1_written == 2.7
assert o2._b2_written is True
@ -210,13 +229,15 @@ def test_ModuleMagic():
# check '$' in unit works properly
assert o2.parameters['a1'].datatype.unit == 'mm/s'
cfg = Newclass2.configurables
assert set(cfg.keys()) == {'export', 'group', 'description',
assert set(cfg.keys()) == {
'export', 'group', 'description', 'disable_value_range_check',
'meaning', 'visibility', 'implementation', 'interface_classes', 'target', 'stop',
'status', 'param1', 'param2', 'cmd', 'a2', 'pollinterval', 'b2', 'cmd2', 'value',
'a1'}
assert set(cfg['value'].keys()) == {'group', 'export', 'relative_resolution',
'status', 'param1', 'param2', 'cmd', 'a2', 'pollinterval', 'slowinterval', 'b2',
'cmd2', 'value', 'a1'}
assert set(cfg['value'].keys()) == {
'group', 'export', 'relative_resolution',
'visibility', 'unit', 'default', 'datatype', 'fmtstr',
'absolute_resolution', 'poll', 'max', 'min', 'readonly', 'constant',
'absolute_resolution', 'max', 'min', 'readonly', 'constant',
'description', 'needscfg'}
# check on the level of classes
@ -459,3 +480,177 @@ def test_command_none():
assert 'stop' in Mod('o', logger, {'description': ''}, srv).accessibles
assert 'stop' not in Mod2('o', logger, {'description': ''}, srv).accessibles
def test_bad_method():
class Mod0(Drivable): # pylint: disable=unused-variable
def write_target(self, value):
pass
with pytest.raises(ProgrammingError):
class Mod1(Drivable): # pylint: disable=unused-variable
def write_taget(self, value):
pass
class Mod2(Drivable): # pylint: disable=unused-variable
def read_value(self, value):
pass
with pytest.raises(ProgrammingError):
class Mod3(Drivable): # pylint: disable=unused-variable
def read_valu(self, value):
pass
def test_generic_access():
class Mod(Module):
param = Parameter('handled param', StringType(), readonly=False)
unhandled = Parameter('unhandled param', StringType(), default='', readonly=False)
data = {'param': ''}
@ReadHandler(['param'])
def read_handler(self, pname):
value = self.data[pname]
setattr(self, pname, value)
return value
@WriteHandler(['param'])
def write_handler(self, pname, value):
value = value.lower()
self.data[pname] = value
setattr(self, pname, value)
return value
updates = {}
srv = ServerStub(updates)
obj = Mod('obj', logger, {'description': '', 'param': 'initial value'}, srv)
assert obj.param == 'initial value'
assert obj.write_param('Cheese') == 'cheese'
assert obj.write_unhandled('Cheese') == 'Cheese'
assert updates == {'obj': {'param': 'cheese', 'unhandled': 'Cheese'}}
updates.clear()
assert obj.write_param('Potato') == 'potato'
assert updates == {'obj': {'param': 'potato'}}
updates.clear()
assert obj.read_param() == 'potato'
assert obj.read_unhandled()
assert updates == {'obj': {'param': 'potato'}}
updates.clear()
assert updates == {}
def test_duplicate_handler_name():
with pytest.raises(ProgrammingError):
class Mod(Module): # pylint: disable=unused-variable
param = Parameter('handled param', StringType(), readonly=False)
@ReadHandler(['param'])
def handler(self, pname):
pass
@WriteHandler(['param'])
def handler(self, pname, value): # pylint: disable=function-redefined
pass
def test_handler_overwrites_method():
with pytest.raises(RuntimeError):
class Mod1(Module): # pylint: disable=unused-variable
param = Parameter('handled param', StringType(), readonly=False)
@ReadHandler(['param'])
def read_handler(self, pname):
pass
def read_param(self):
pass
with pytest.raises(RuntimeError):
class Mod2(Module): # pylint: disable=unused-variable
param = Parameter('handled param', StringType(), readonly=False)
@WriteHandler(['param'])
def write_handler(self, pname, value):
pass
def write_param(self, value):
pass
def test_no_read_write():
class Mod(Module):
param = Parameter('test param', StringType(), readonly=False)
updates = {}
srv = ServerStub(updates)
obj = Mod('obj', logger, {'description': '', 'param': 'cheese'}, srv)
assert obj.param == 'cheese'
assert obj.read_param() == 'cheese'
assert updates == {'obj': {'param': 'cheese'}}
assert obj.write_param('egg') == 'egg'
assert obj.param == 'egg'
assert updates == {'obj': {'param': 'egg'}}
def test_incompatible_value_target():
class Mod1(Drivable):
value = Parameter('', FloatRange(0, 10), default=0)
target = Parameter('', FloatRange(0, 11), default=0)
class Mod2(Drivable):
value = Parameter('', FloatRange(), default=0)
target = Parameter('', StringType(), default='')
class Mod3(Drivable):
value = Parameter('', FloatRange(), default=0)
target = Parameter('', ScaledInteger(1, 0, 10), default=0)
srv = ServerStub({})
with pytest.raises(ConfigError):
obj = Mod1('obj', logger, {'description': ''}, srv) # pylint: disable=unused-variable
with pytest.raises(ProgrammingError):
obj = Mod2('obj', logger, {'description': ''}, srv)
obj = Mod3('obj', logger, {'description': ''}, srv)
def test_problematic_value_range():
class Mod(Drivable):
value = Parameter('', FloatRange(0, 10), default=0)
target = Parameter('', FloatRange(0, 10), default=0)
srv = ServerStub({})
obj = Mod('obj', logger, {'description': '', 'value.max': 10.1}, srv) # pylint: disable=unused-variable
with pytest.raises(ConfigError):
obj = Mod('obj', logger, {'description': ''}, srv)
class Mod2(Drivable):
value = Parameter('', FloatRange(), default=0)
target = Parameter('', FloatRange(), default=0)
obj = Mod2('obj', logger, {'description': ''}, srv)
obj = Mod2('obj', logger, {'description': '', 'target.min': 0, 'target.max': 10}, srv)
with pytest.raises(ConfigError):
obj = Mod('obj', logger, {
'value.min': 0, 'value.max': 10,
'target.min': 0, 'target.max': 10, 'description': ''}, srv)
obj = Mod('obj', logger, {'disable_value_range_check': True,
'value.min': 0, 'value.max': 10,
'target.min': 0, 'target.max': 10, 'description': ''}, srv)
generalConfig.defaults['disable_value_range_check'] = True
class Mod4(Drivable):
value = Parameter('', FloatRange(0, 10), default=0)
target = Parameter('', FloatRange(0, 10), default=0)
obj = Mod4('obj', logger, {
'value.min': 0, 'value.max': 10,
'target.min': 0, 'target.max': 10, 'description': ''}, srv)

60
test/test_multievent.py Normal file
View File

@ -0,0 +1,60 @@
# -*- coding: utf-8 -*-
# *****************************************************************************
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Module authors:
# Markus Zolliker <markus.zolliker@psi.ch>
#
# *****************************************************************************
import time
from secop.lib.multievent import MultiEvent
def test_without_timeout():
m = MultiEvent()
s1 = m.get_trigger(name='s1')
s2 = m.get_trigger(name='s2')
assert not m.wait(0)
assert m.deadline() is None
assert m.waiting_for() == {'s1', 's2'}
s2()
assert m.waiting_for() == {'s1'}
assert not m.wait(0)
s1()
assert not m.waiting_for()
assert m.wait(0)
def test_with_timeout(monkeypatch):
current_time = 1000
monkeypatch.setattr(time, 'monotonic', lambda: current_time)
m = MultiEvent()
assert m.deadline() == 0
m.name = 's1'
s1 = m.get_trigger(10)
assert m.deadline() == 1010
m.name = 's2'
s2 = m.get_trigger(20)
assert m.deadline() == 1020
current_time += 21
assert not m.wait(0)
assert m.waiting_for() == {'s1', 's2'}
s1()
assert m.waiting_for() == {'s2'}
s2()
assert not m.waiting_for()
assert m.wait(0)

View File

@ -21,18 +21,20 @@
# *****************************************************************************
"""test poller."""
import sys
import threading
import time
from collections import OrderedDict
import logging
import pytest
from secop.modules import Drivable
from secop.poller import DYNAMIC, REGULAR, SLOW, Poller
from secop.core import Module, Parameter, FloatRange, Readable, ReadHandler, nopoll
from secop.lib.multievent import MultiEvent
Status = Drivable.Status
class Time:
STARTTIME = 1000 # artificial time zero
STARTTIME = 1000 # artificial time zero
def __init__(self):
self.reset()
self.finish = float('inf')
@ -61,187 +63,103 @@ class Time:
self.seconds += seconds
self.busytime += seconds
artime = Time() # artificial test time
@pytest.fixture(autouse=True)
def patch_time(monkeypatch):
monkeypatch.setattr(time, 'time', artime.time)
class Event(threading.Event):
def wait(self, timeout=None):
artime.sleep(max(0, timeout))
class Event:
def __init__(self):
self.flag = False
class DispatcherStub:
maxcycles = 10
def wait(self, timeout):
artime.sleep(max(0,timeout))
def set(self):
self.flag = True
def clear(self):
self.flag = False
def is_set(self):
return self.flag
class Parameter:
def __init__(self, name, readonly, poll, polltype, interval):
self.poll = poll
self.polltype = polltype # used for check only
self.export = name
self.readonly = readonly
self.interval = interval
self.timestamp = 0
self.handler = None
self.reset()
def reset(self):
self.cnt = 0
self.span = 0
self.maxspan = 0
def rfunc(self):
artime.busy(artime.commtime)
def announce_update(self, modulename, pname, pobj):
now = artime.time()
self.span = now - self.timestamp
self.maxspan = max(self.maxspan, self.span)
self.timestamp = now
self.cnt += 1
return True
def __repr__(self):
return 'Parameter(%s)' % ", ".join("%s=%r" % item for item in self.__dict__.items())
class Module:
properties = {}
pollerClass = Poller
iodev = 'common_iodev'
def __init__(self, name, pollinterval=5, fastfactor=0.25, slowfactor=4, busy=False,
counts=(), auto=None):
'''create a dummy module
nauto, ndynamic, nregular, nslow are the number of parameters of each polltype
'''
self.pollinterval = pollinterval
self.fast_pollfactor = fastfactor
self.slow_pollfactor = slowfactor
self.parameters = OrderedDict()
self.name = name
self.is_busy = busy
if auto is not None:
self.pvalue = self.addPar('value', True, auto or DYNAMIC, DYNAMIC)
# readonly = False should not matter:
self.pstatus = self.addPar('status', False, auto or DYNAMIC, DYNAMIC)
self.pregular = self.addPar('regular', True, auto or REGULAR, REGULAR)
self.pslow = self.addPar('slow', False, auto or SLOW, SLOW)
self.addPar('notpolled', True, False, 0)
self.counts = 'auto'
if hasattr(pobj, 'stat'):
pobj.stat.append(now)
else:
ndynamic, nregular, nslow = counts
for i in range(ndynamic):
self.addPar('%s:d%d' % (name, i), True, DYNAMIC, DYNAMIC)
for i in range(nregular):
self.addPar('%s:r%d' % (name, i), True, REGULAR, REGULAR)
for i in range(nslow):
self.addPar('%s:s%d' % (name, i), False, SLOW, SLOW)
self.counts = counts
pobj.stat = [now]
self.maxcycles -= 1
if self.maxcycles <= 0:
self.finish_event.set()
sys.exit() # stop thread
def addPar(self, name, readonly, poll, expected_polltype):
# self.count[polltype] += 1
expected_interval = self.pollinterval
if expected_polltype == SLOW:
expected_interval *= self.slow_pollfactor
elif expected_polltype == DYNAMIC and self.is_busy:
expected_interval *= self.fast_pollfactor
pobj = Parameter(name, readonly, poll, expected_polltype, expected_interval)
setattr(self, 'read_' + pobj.export, pobj.rfunc)
self.parameters[pobj.export] = pobj
return pobj
def isBusy(self):
return self.is_busy
class ServerStub:
def __init__(self):
self.dispatcher = DispatcherStub()
def pollOneParam(self, pname):
getattr(self, 'read_' + pname)()
def writeInitParams(self):
pass
class Base(Module):
def __init__(self):
srv = ServerStub()
super().__init__('mod', logging.getLogger('dummy'), dict(description=''), srv)
self.dispatcher = srv.dispatcher
self.nextPollEvent = Event()
def __repr__(self):
rdict = self.__dict__.copy()
rdict.pop('parameters')
return 'Module(%r, counts=%r, f=%r, pollinterval=%g, is_busy=%r)' % (self.name,
self.counts, (self.fast_pollfactor, self.slow_pollfactor, 1),
self.pollinterval, self.is_busy)
def run(self, maxcycles):
self.dispatcher.maxcycles = maxcycles
self.dispatcher.finish_event = threading.Event()
self.startModule(MultiEvent())
self.dispatcher.finish_event.wait(1)
module_list = [
[Module('x', 3.0, 0.125, 10, False, auto=True),
Module('y', 3.0, 0.125, 10, False, auto=False)],
[Module('a', 1.0, 0.25, 4, True, (5, 5, 10)),
Module('b', 2.0, 0.25, 4, True, (5, 5, 50))],
[Module('c', 1.0, 0.25, 4, False, (5, 0, 0))],
[Module('d', 1.0, 0.25, 4, True, (0, 9, 0))],
[Module('e', 1.0, 0.25, 4, True, (0, 0, 9))],
[Module('f', 1.0, 0.25, 4, True, (0, 0, 0))],
]
@pytest.mark.parametrize('modules', module_list)
def test_Poller(modules):
# check for proper timing
for overloaded in False, True:
artime.reset()
count = {DYNAMIC: 0, REGULAR: 0, SLOW: 0}
maxspan = {DYNAMIC: 0, REGULAR: 0, SLOW: 0}
pollTable = dict()
for module in modules:
Poller.add_to_table(pollTable, module)
for pobj in module.parameters.values():
if pobj.poll:
maxspan[pobj.polltype] = max(maxspan[pobj.polltype], pobj.interval)
count[pobj.polltype] += 1
pobj.reset()
assert len(pollTable) == 1
poller = pollTable[(Poller, 'common_iodev')]
artime.stop = poller.stop
poller._event = Event() # patch Event.wait
class Mod1(Base, Readable):
param1 = Parameter('', FloatRange())
param2 = Parameter('', FloatRange())
param3 = Parameter('', FloatRange())
param4 = Parameter('', FloatRange())
assert (sum(count.values()) > 0) == bool(poller)
@ReadHandler(('param1', 'param2', 'param3'))
def read_param(self, name):
artime.sleep(1.0)
return 0
def started_callback(modules=modules):
for module in modules:
for pobj in module.parameters.values():
assert pobj.cnt == bool(pobj.poll) # all parameters have to be polled once
pobj.reset() # set maxspan and cnt to 0
@nopoll
def read_param4(self):
return 0
if overloaded:
# overloaded scenario
artime.commtime = 1.0
ncycles = 10
if count[SLOW] > 0:
cycletime = (count[REGULAR] + 1) * count[SLOW] * 2
else:
cycletime = max(count[REGULAR], count[DYNAMIC]) * 2
artime.reset(cycletime * ncycles * 1.01) # poller will quit given time
poller.run(started_callback)
total = artime.time() - artime.STARTTIME
for module in modules:
for pobj in module.parameters.values():
if pobj.poll:
# average_span = total / (pobj.cnt + 1)
assert total / (pobj.cnt + 1) <= max(cycletime, pobj.interval * 1.1)
else:
# normal scenario
artime.commtime = 0.001
artime.reset(max(maxspan.values()) * 5) # poller will quit given time
poller.run(started_callback)
total = artime.time() - artime.STARTTIME
for module in modules:
for pobj in module.parameters.values():
if pobj.poll:
assert pobj.cnt > 0
assert pobj.maxspan <= maxspan[pobj.polltype] * 1.1
assert (pobj.cnt + 1) * pobj.interval >= total * 0.99
assert abs(pobj.span - pobj.interval) < 0.01
pobj.reset()
def read_status(self):
artime.sleep(1.0)
return 0
def read_value(self):
artime.sleep(1.0)
return 0
@pytest.mark.parametrize(
'ncycles, pollinterval, slowinterval, mspan, pspan',
[ # normal case: 5+-1 15+-1
( 60, 5, 15, (4, 6), (14, 16)),
# pollinterval faster then reading: mspan max 3 s (polls of value, status and ONE other parameter)
( 60, 1, 5, (1, 3), (5, 16)),
])
def test_poll(ncycles, pollinterval, slowinterval, mspan, pspan, monkeypatch):
monkeypatch.setattr(time, 'time', artime.time)
artime.reset()
m = Mod1()
m.pollinterval = pollinterval
m.slowInterval = slowinterval
m.run(ncycles)
assert not hasattr(m.parameters['param4'], 'stat')
for pname in ['value', 'status']:
pobj = m.parameters[pname]
lowcnt = 0
for t1, t2 in zip(pobj.stat[1:], pobj.stat[2:-1]):
if t2 - t1 < mspan[0]:
print(t2 - t1)
lowcnt += 1
assert t2 - t1 <= mspan[1]
assert lowcnt <= 1
for pname in ['param1', 'param2', 'param3']:
pobj = m.parameters[pname]
lowcnt = 0
for t1, t2 in zip(pobj.stat[1:], pobj.stat[2:-1]):
if t2 - t1 < pspan[0]:
print(pname, t2 - t1)
lowcnt += 1
assert t2 - t1 <= pspan[1]
assert lowcnt <= 1

View File

@ -26,6 +26,7 @@ import pytest
from secop.datatypes import FloatRange, IntRange, StringType, ValueType
from secop.errors import BadValueError, ConfigError, ProgrammingError
from secop.properties import HasProperties, Property
from secop.core import Parameter
def Prop(*args, name=None, **kwds):
@ -38,10 +39,10 @@ V_test_Property = [
[Prop(StringType(), 'default', extname='extname', mandatory=False),
dict(default='default', extname='extname', export=True, mandatory=False)
],
[Prop(IntRange(), '42', export=True, name='custom', mandatory=True),
[Prop(IntRange(), 42, export=True, name='custom', mandatory=True),
dict(default=42, extname='_custom', export=True, mandatory=True),
],
[Prop(IntRange(), '42', export=True, name='name'),
[Prop(IntRange(), 42, export=True, name='name'),
dict(default=42, extname='_name', export=True, mandatory=False)
],
[Prop(IntRange(), 42, '_extname', mandatory=True),
@ -85,12 +86,12 @@ def test_Property_basic():
Property('')
with pytest.raises(ValueError):
Property('', 1)
Property('', IntRange(), '42', 'extname', False, False)
Property('', IntRange(), 42, 'extname', False, False)
def test_Properties():
class Cls(HasProperties):
aa = Property('', IntRange(0, 99), '42', export=True)
aa = Property('', IntRange(0, 99), 42, export=True)
bb = Property('', IntRange(), 0, export=False)
assert Cls.aa.default == 42
@ -149,17 +150,25 @@ def test_Property_override():
assert o2.a == 3
with pytest.raises(ProgrammingError) as e:
class cx(c): # pylint: disable=unused-variable
class cx(c): # pylint: disable=unused-variable
def a(self):
pass
assert 'collides with' in str(e.value)
with pytest.raises(ProgrammingError) as e:
class cz(c): # pylint: disable=unused-variable
class cy(c): # pylint: disable=unused-variable
a = 's'
assert 'can not set' in str(e.value)
with pytest.raises(ProgrammingError) as e:
class cz(c): # pylint: disable=unused-variable
a = 's'
class cp(c): # pylint: disable=unused-variable
# overriding a Property with a Parameter is allowed
a = Parameter('x', IntRange())
def test_Properties_mro():
class Base(HasProperties):

148
test/test_statemachine.py Normal file
View File

@ -0,0 +1,148 @@
# -*- coding: utf-8 -*-
# *****************************************************************************
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Module authors:
# Markus Zolliker <markus.zolliker@psi.ch>
#
# *****************************************************************************
from secop.lib.statemachine import StateMachine, Stop, Retry
def rise(state):
state.step += 1
print('rise', state.step)
if state.init:
state.status = 'rise'
state.level += 1
if state.level > 3:
return turn
return Retry()
def turn(state):
state.step += 1
if state.init:
state.status = 'turn'
state.direction += 1
if state.direction > 3:
return fall
return Retry()
def fall(state):
state.step += 1
if state.init:
state.status = 'fall'
state.level -= 1
if state.level < 0:
raise ValueError('crash')
return Retry(0) # retry until crash!
def error_handler(state):
state.last_error_name = type(state.last_error).__name__
class LoggerStub:
def debug(self, fmt, *args):
print(fmt % args)
info = warning = exception = error = debug
handlers = []
class DummyThread:
def is_alive(self):
return True
def test_fun():
s = StateMachine(step=0, status='', threaded=False, logger=LoggerStub())
assert s.step == 0
assert s.status == ''
s.cycle() # do nothing
assert s.step == 0
s.start(rise, level=0, direction=0)
s.cycle()
for i in range(1, 4):
assert s.status == 'rise'
assert s.step == i
assert s.level == i
assert s.direction == 0
s.cycle()
for i in range(5, 8):
assert s.status == 'turn'
assert s.step == i
assert s.level == 4
assert s.direction == i - 4
s.cycle()
s.cycle() # -> crash
assert isinstance(s.last_error, ValueError)
assert str(s.last_error) == 'crash'
assert s.state is None
def test_max_chain():
s = StateMachine(step=0, status='', threaded=False, logger=LoggerStub())
s.start(fall, level=999+1, direction=0)
s.cycle()
assert isinstance(s.last_error, RuntimeError)
assert s.state is None
def test_stop():
s = StateMachine(step=0, status='', threaded=False, logger=LoggerStub())
s.start(rise, level=0, direction=0)
for _ in range(1, 3):
s.cycle()
s.stop()
s.cycle()
assert isinstance(s.last_error, Stop)
assert s.state is None
def test_std_error_handling():
s = StateMachine(step=0, status='', threaded=False, logger=LoggerStub())
s.start(rise, level=0, direction=0)
s.cycle()
s.level = None # -> TypeError on next step
s.cycle()
assert s.state is None # default error handler: stop machine
assert isinstance(s.last_error, TypeError)
assert not hasattr(s, 'last_error_name')
def test_default_error_handling():
s = StateMachine(step=0, status='', cleanup=error_handler, threaded=False, logger=LoggerStub())
s.start(rise, level=0, direction=0)
s.cycle()
s.level = None
s.cycle()
assert s.state is None
assert s.last_error_name == 'TypeError'
assert isinstance(s.last_error, TypeError)
def test_cleanup_on_restart():
s = StateMachine(step=0, status='', threaded=False, logger=LoggerStub())
s.start(rise, level=0, direction=0)
s.cycle()
s.start(turn)
s.cycle()
assert s.state is turn
assert s.last_error is None