Starter tests for SICS unit testing

This commit is contained in:
Douglas Clowes
2014-05-08 14:42:47 +10:00
parent 8d6d28bd53
commit f5f5ca33ea

View File

@ -0,0 +1,373 @@
#!/usr/bin/env python
# vim: ts=8 sts=4 sw=4 expandtab
# Author: Douglas Clowes (dcl@ansto.gov.au) 2014-04-24
#
# Looks like test cases are run alphabetically within classes
# and classes are run alphabetically too.
# So, Able is the base class derived from Testcase
#
from sics_proxy import SICSClientFactory, SicsProtocol
from sics_hdb import HipadabaTree
from twisted.trial import unittest
from twisted.internet import reactor, protocol, defer
import ConfigParser, StringIO
class Able(unittest.TestCase):
def setUp(self):
#print "Connecting ...",
self.factory = SICSClientFactory()
self.sics = None
self.deferred = defer.Deferred()
def cb_login(client, *args, **kw):
#print "cb_login:", client, args, kw
self.deferred.callback(True)
def cb_connect(client, *args, **kw):
#print "cb_connect:", client, client.login, args, kw
self.sics = client
self.sics.factory = self.factory
self.sics.login_deferred.addCallback(cb_login)
#print "Connected"
creator = protocol.ClientCreator(reactor, SicsProtocol)
d = creator.connectTCP("localhost", 60003)
d = d.addCallback(cb_connect)
return self.deferred
def send_command(self, command):
d = self.sics.sendMessage(command)
#print "Command:", command, d
return d
def load_hipadaba(self):
def cb0(result, *args, **kw):
with open("junksh.xml", "w") as outfile:
for line in result:
outfile.write(line + '\n')
if result[0][0] != '<':
raise Exception("Tree does not start with left angle:" + result[0])
if result[-1] != '</hipadaba:SICS>':
raise Exception("Tree does not end with </hipadaba:SICS>:" + result[-1])
self.hipadaba = HipadabaTree('\n'.join(result))
self.tree_deferred.callback(self.hipadaba)
self.tree_deferred = defer.Deferred()
d = self.send_command("getgumtreexml /")
d.addCallback(cb0)
return self.tree_deferred
def tearDown(self):
#print "tearDownModule"
if self.sics is not None:
self.sics.login_deferred = None
self.sics.command_deferred = None
self.sics.transport.loseConnection()
self.sics = None
self.deferred = None
class Baker(Able):
timeout = 5
def test_000_login(self):
debug = False
self.deferred = defer.Deferred()
d = self.sics.login_deferred
def cb0(result, *args, **kw):
if debug:
print "Login:", result, args, kw
if self.sics.login != 2:
raise Exception("Bad login:" + repr(self.sics.login))
d = self.send_command("fileeval TEST_SICS/test_suite.tcl")
d.addCallback(cb1)
def cb1(result, *args, **kw):
if debug:
print "fileeval:", result, args, kw
if result[0] != "OK":
raise Exception("fileeval returned " + repr(result))
d = self.send_command("tcl:test_suite::show_config ::config_dict")
d.addCallback(cb2)
def cb2(result, *args, **kw):
global config
if debug:
print "config:", result, args, kw
ini = StringIO.StringIO("\n".join(result))
if debug:
print "ini:", ini.readlines()
ini.seek(0)
print "ini:", ini.read()
ini.seek(0)
print "ini:", dir(ini)
config = ConfigParser.SafeConfigParser()
if debug:
print "config:", repr(config)
config.readfp(ini)
if debug:
print "config:", repr(config)
print "config:", dir(config)
for section in sorted(config.sections()):
print ("[%s]\n" % section),
for option in sorted(config.options(section)):
print ("%s = %s\n" % (option, config.get(section, option))),
print
d = self.load_hipadaba()
d.addCallback(cb3)
def cb3(result, *args, **kw):
global hipadaba
hipadaba = result
self.deferred.callback(None)
d.addCallback(cb0)
return self.deferred
def test_001_status(self):
d = self.send_command("status")
def cb3(result, *args, **kw):
#print "Status:", result
if not result[0].startswith("status = "):
raise Exception("Bad status:" + repr(result))
d.addCallback(cb3)
return d
def test_002_dir(self):
raise unittest.SkipTest("Not doing this test now")
self.deferred = defer.Deferred()
d = self.send_command("dir types")
self.result = 0
def cb2(result, *args, **kw):
self.result += 1
result = sorted([i.strip() for i in result])
#print "Types:", result
for i in range(len(result)):
result[i] = " ".join(sorted(result[i].split()))
#print "Types:", result
if len(self.types) > 0:
type = self.types[0]
del self.types[0]
d = self.send_command("sicslist type %s" % type)
d.addCallback(cb2)
else:
#print "Dir:", self.result
self.deferred.callback(None)
def cb3(result, *args, **kw):
self.types = sorted([i.strip().replace(" ", "_") for i in result])
#print "Types:", self.types
if len(self.types) > 0:
type = self.types[0]
del self.types[0]
d = self.send_command("sicslist type %s" % type)
d.addCallback(cb2)
else:
print "Dir: None"
self.deferred.callback(None)
d.addCallback(cb3)
return self.deferred
def test_003_motor(self):
global motors
motors = []
d = self.send_command("sicslist type motor")
def cb3(result, *args, **kw):
global motors
motors = sorted(result[0].lower().split())
#print "Motors:", len(motors)
missing = []
for m in ["m1", "m2", "s1", "s2", "a1", "a2"]:
if m not in motors:
missing.append(m)
if len(missing) > 0:
raise Exception("Missing motors:" + repr(missing))
d.addCallback(cb3)
return d
def test_004_000_xml(self):
#raise unittest.SkipTest("Not doing this test now")
d = self.send_command("getgumtreexml /")
def cb3(result, *args, **kw):
global root
#print "XML:", len(result)
with open("junk.xml", "w") as outfile:
for line in result:
outfile.write(line + '\n')
if "</hipadaba:SICS>" != result[-1]:
raise Exception("XML is not complete, ends:" + repr(result[-1]))
txt = []
for line in result:
txt.append(''.join(c for c in line if ord(c) >= 32))
root = HipadabaTree('\n'.join(txt)).root
d.addCallback(cb3)
return d
def test_004_001_xml(self):
print
print "Root:", root, root.attrib
for child in root.findall('component'):
print child, child.tag, child.attrib
#for grandchild in child.findall('component'):
# print " ", grandchild.attrib
def test_004_002_xml(self):
print
child = root
for node in "/sample/ps9/".strip('/').split('/'):
children = [ch for ch in child.findall('component') if ch.attrib['id'].lower() == node.lower()]
print children
child = children[0]
print child
print child.attrib
print [ch.attrib['id'] for ch in child.findall('component')]
print [(ch.attrib['id'], [v.text for v in ch.findall('value')]) for ch in child.findall('property')]
def test_005_drive(self):
global motors
if not "ds9" in motors:
raise unittest.SkipTest("Cannot drive motor ds9: does not exist")
d = self.send_command("drive ds9 0")
def cb3(result, *args, **kw):
#print "Drive:", result
if "Driving finished successfully" != result[-1]:
raise Exception("Drive is not complete, ends:" + repr(result[-1]))
d.addCallback(cb3)
return d
def test_006_drive(self):
d = self.send_command("drive m1 20 m2 20 ")
def cb3(result, *args, **kw):
#print "Drive:", result
if "Driving finished successfully" != result[-1]:
raise Exception("Drive is not complete, ends:" + repr(result[-1]))
d.addCallback(cb3)
return d
def test_007_mercury(self):
raise unittest.SkipTest("Not doing this test now (borken)")
global config, hipadaba
debug = False
def cb0(result, *args, **kw):
if debug:
print "\nMercury:", result.short_tree()
self.deferred.callback(result)
if "mercury_scpi" in config.sections() and config.get("mercury_scpi", "enabled"):
self.deferred = defer.Deferred()
d = self.load_hipadaba("/sample/%s" % config.get("mercury_scpi", "name"))
d.addCallback(cb0)
return self.deferred
else:
raise unittest.SkipTest("mercury_scpi is not configured")
test_007_mercury.timeout = 2
def test_008_mercury_hipadaba(self):
raise unittest.SkipTest("Not doing this test now (borken)")
global hipadaba
debug = False
if hipadaba is None:
raise unittest.SkipTest("hipadaba is missing")
else:
if False:
hipadaba.print_tree()
root_children = hipadaba.getChildren('/')
if debug:
print "Children:", root_children
root_children = [i.lower() for i in root_children]
self.assertTrue('sample' in root_children)
sample_children = hipadaba.getChildren('/sample')
if debug:
print "Children:", sample_children
sample_children = [i.lower() for i in sample_children]
self.assertTrue('tc9' in sample_children)
level_children = hipadaba.getChildren('/sample/tc9/level/')
if debug:
print "Children:", level_children
level_children = [i.lower() for i in level_children]
self.assertTrue('helium' in level_children)
self.assertTrue('nitrogen' in level_children)
he_props = hipadaba.getProperties('/sample/tc9/level/helium')
if debug:
print "Properties:", he_props
he_props = [i.lower() for i in he_props]
self.assertTrue('data' in he_props)
self.assertTrue('control' in he_props)
self.assertTrue('nxsave' in he_props)
self.assertTrue('nxalias' in he_props)
nxalias = hipadaba.getProperty('/sample/tc9/level/helium', 'NXalias')
if debug:
print "Prop:", nxalias
self.assertTrue(nxalias.lower() == 'tc9_level_helium'.lower())
y = hipadaba.getValue('/sample/tc9/Level/helium')
if debug:
print "Value:", y
y = hipadaba.getNode('/sample/tc9/level/helium')
if debug:
print "Found:", hipadaba.short_tree(y)
if debug:
hipadaba.print_tree(y)
text = hipadaba.list_tree(y, props=True, indent=6)
print
for line in text:
print line
class Zulu(Able):
def test_performance(self):
d = self.send_command("performance")
def cb3(result, *args, **kw):
#print "Performance:", result
junk, performance = result[0].split("=")
if not junk.startswith("Performance"):
raise Exception("Can't find Performance")
if float(performance) < 50:
raise Exception("Performance too low: %s" % performance)
if float(performance) > 100:
raise Exception("Performance too high %s" % performance)
d.addCallback(cb3)
return d
#class SicsTestCase(unittest.TestCase):
# callbacks = []
#
# def _test(command):
# d = defer.Deferred()
# self.d = d
# self.client.sendMessage(command)
# return d
#
# def test_status1(self):
# d = _test("status")
# return d
#
# def test_0000(self):
# d = defer.Deferred()
# self.d = d
# def cb2(result, *args, **kw):
# print "Login Callback", result, args
# print "Add cb2", d
# d = d.addCallback(cb2, "Fred2")
# #reactor.callLater(5, d.callback, False)
# return d
#
# def test_status(self):
# d = defer.Deferred()
# self.d = d
# self.client.sendMessage("status")
# #reactor.callLater(5, d.callback, False)
# return d
#
# def add_callback(self, cb):
# if not cb in self.callbacks:
# self.callbacks.append(cb)
#
# def del_callback(self, cb):
# if cb in self.callbacks:
# self.callbacks.remove(cb)
#
# def call_callbacks(self, data):
# d = self.d
# self.d = None
# print "Callback:", d, data
# for cb in self.callbacks:
# cb(self, data)
# if d:
# def cb3(result, *args, **kw):
# print "Xqt Callback", result, args
# print "Add cb3", d
# d.addCallback(cb3)
# print "Call Callback", d
# d.callback(data)
#