452 lines
17 KiB
Python
452 lines
17 KiB
Python
#!/usr/bin/env python
|
|
# vim: ts=8 sts=4 sw=4 expandtab
|
|
# Author: Douglas Clowes (dcl@ansto.gov.au) 2014-04-24
|
|
#
|
|
# Looks like test cases are run alphabetically within classes
|
|
# and classes are run alphabetically too.
|
|
# So, Able is the base class derived from Testcase
|
|
#
|
|
from sics_proxy import SICSClientFactory, SicsProtocol
|
|
from sics_hdb import HipadabaTree
|
|
|
|
from twisted.trial import unittest
|
|
from twisted.internet import reactor, protocol, defer
|
|
|
|
import ConfigParser, StringIO
|
|
from datetime import datetime as dt
|
|
|
|
|
|
class Able(unittest.TestCase):
|
|
def setUp(self):
|
|
#print "Connecting ...",
|
|
self.factory = SICSClientFactory()
|
|
self.sics = None
|
|
self.deferred = defer.Deferred()
|
|
def cb_login(client, *args, **kw):
|
|
#print "cb_login:", client, args, kw
|
|
self.deferred.callback(True)
|
|
def cb_connect(client, *args, **kw):
|
|
#print "cb_connect:", client, client.login, args, kw
|
|
self.sics = client
|
|
self.sics.factory = self.factory
|
|
self.sics.login_deferred.addCallback(cb_login)
|
|
#print "Connected"
|
|
creator = protocol.ClientCreator(reactor, SicsProtocol)
|
|
d = creator.connectTCP("localhost", 60003)
|
|
d = d.addCallback(cb_connect)
|
|
return self.deferred
|
|
|
|
def send_command(self, command):
|
|
d = self.sics.sendMessage(command)
|
|
#print "Command:", command, d
|
|
return d
|
|
|
|
def load_hipadaba(self):
|
|
def cb0(result, *args, **kw):
|
|
with open("hipadaba.xml", "w") as outfile:
|
|
for line in result:
|
|
outfile.write(line + '\n')
|
|
if result[0][0] != '<':
|
|
raise Exception("Tree does not start with left angle:" + result[0])
|
|
if result[-1] != '</hipadaba:SICS>':
|
|
raise Exception("Tree does not end with </hipadaba:SICS>:" + result[-1])
|
|
self.hipadaba = HipadabaTree('\n'.join(result))
|
|
self.tree_deferred.callback(self.hipadaba)
|
|
self.tree_deferred = defer.Deferred()
|
|
d = self.send_command("tcl:test_suite::getsicsxml /")
|
|
d.addCallback(cb0)
|
|
return self.tree_deferred
|
|
|
|
def tearDown(self):
|
|
#print "tearDownModule"
|
|
if self.sics is not None:
|
|
self.sics.login_deferred = None
|
|
self.sics.command_deferred = None
|
|
self.sics.transport.loseConnection()
|
|
self.sics = None
|
|
self.deferred = None
|
|
|
|
class Baker(Able):
|
|
timeout = 5
|
|
def test_000_000_login(self):
|
|
debug = False
|
|
self.deferred = defer.Deferred()
|
|
def cb0(result, *args, **kw):
|
|
if debug:
|
|
print "Login:", result, args, kw
|
|
if self.sics.login != 2:
|
|
raise Exception("Bad login:" + repr(self.sics.login))
|
|
self.deferred.callback(None)
|
|
d = self.sics.login_deferred
|
|
d.addCallback(cb0)
|
|
return self.deferred
|
|
|
|
def test_000_001_fileeval(self):
|
|
debug = False
|
|
self.deferred = defer.Deferred()
|
|
def cb1(result, *args, **kw):
|
|
if debug:
|
|
print "fileeval:", result, args, kw
|
|
if result[0] != "OK":
|
|
raise Exception("fileeval returned " + repr(result))
|
|
self.deferred.callback(None)
|
|
d = self.send_command("fileeval TEST_SICS/unit_tests/test_suite.tcl")
|
|
d.addCallback(cb1)
|
|
return self.deferred
|
|
|
|
def test_001_000_config(self):
|
|
debug = False
|
|
self.deferred = defer.Deferred()
|
|
def cb2(result, *args, **kw):
|
|
global config
|
|
if debug:
|
|
print "config:", result, args, kw
|
|
ini = StringIO.StringIO("\n".join(result))
|
|
if debug:
|
|
print "ini:", ini.readlines()
|
|
ini.seek(0)
|
|
print "ini:", ini.read()
|
|
ini.seek(0)
|
|
print "ini:", dir(ini)
|
|
config = ConfigParser.SafeConfigParser()
|
|
if debug:
|
|
print "config:", repr(config)
|
|
config.readfp(ini)
|
|
if debug:
|
|
print "config:", repr(config)
|
|
print "config:", dir(config)
|
|
for section in sorted(config.sections()):
|
|
print ("[%s]\n" % section),
|
|
for option in sorted(config.options(section)):
|
|
print ("%s = %s\n" % (option, config.get(section, option))),
|
|
print
|
|
self.deferred.callback(None)
|
|
d = self.send_command("tcl:test_suite::show_config ::config_dict")
|
|
d.addCallback(cb2)
|
|
return self.deferred
|
|
|
|
def test_001_001_clock(self):
|
|
debug = False
|
|
self.deferred = defer.Deferred()
|
|
def cb2(result, *args, **kw):
|
|
if debug:
|
|
print "Clock:", result
|
|
formatted = dt.fromtimestamp(1399866027).strftime("%Y-%b-%d %H:%M:%S")
|
|
self.assertEqual(result[0], formatted)
|
|
self.deferred.callback(None)
|
|
def cb3(result, *args, **kw):
|
|
if debug:
|
|
print "Clock:", result
|
|
if not result[0].isdigit():
|
|
raise Exception("Bad response:" + repr(result))
|
|
if not 1399865694 <= int(result[0]) < 1499865694:
|
|
raise Exception("Bad value:" + repr(result))
|
|
d = self.send_command("tcl:clock format 1399866027 -format \"%Y-%h-%d %T\"")
|
|
d.addCallback(cb2)
|
|
d = self.send_command("tcl:clock seconds")
|
|
d.addCallback(cb3)
|
|
return self.deferred
|
|
|
|
def test_001_002_hipadaba(self):
|
|
self.deferred = defer.Deferred()
|
|
def cb3(result, *args, **kw):
|
|
global hipadaba
|
|
hipadaba = result
|
|
self.deferred.callback(None)
|
|
d = self.load_hipadaba()
|
|
d.addCallback(cb3)
|
|
return self.deferred
|
|
|
|
def test_001_003_status(self):
|
|
d = self.send_command("status")
|
|
def cb3(result, *args, **kw):
|
|
#print "Status:", result
|
|
if not result[0].startswith("status = "):
|
|
raise Exception("Bad status:" + repr(result))
|
|
d.addCallback(cb3)
|
|
return d
|
|
|
|
def Test_002_000_dir(self):
|
|
raise unittest.SkipTest("Not doing this test now")
|
|
self.deferred = defer.Deferred()
|
|
d = self.send_command("dir types")
|
|
self.result = 0
|
|
def cb2(result, *args, **kw):
|
|
self.result += 1
|
|
result = sorted([i.strip() for i in result])
|
|
#print "Types:", result
|
|
for i in range(len(result)):
|
|
result[i] = " ".join(sorted(result[i].split()))
|
|
#print "Types:", result
|
|
if len(self.types) > 0:
|
|
type = self.types[0]
|
|
del self.types[0]
|
|
d = self.send_command("sicslist type %s" % type)
|
|
d.addCallback(cb2)
|
|
else:
|
|
#print "Dir:", self.result
|
|
self.deferred.callback(None)
|
|
def cb3(result, *args, **kw):
|
|
self.types = sorted([i.strip().replace(" ", "_") for i in result])
|
|
#print "Types:", self.types
|
|
if len(self.types) > 0:
|
|
type = self.types[0]
|
|
del self.types[0]
|
|
d = self.send_command("sicslist type %s" % type)
|
|
d.addCallback(cb2)
|
|
else:
|
|
print "Dir: None"
|
|
self.deferred.callback(None)
|
|
d.addCallback(cb3)
|
|
return self.deferred
|
|
|
|
def test_003_000_motors(self):
|
|
global motors
|
|
motors = []
|
|
d = self.send_command("sicslist type motor")
|
|
def cb3(result, *args, **kw):
|
|
global motors
|
|
motors = sorted(result[0].lower().split())
|
|
#print "Motors:", len(motors)
|
|
missing = []
|
|
for m in ["m1", "m2", "s1", "s2", "a1", "a2"]:
|
|
if m not in motors:
|
|
missing.append(m)
|
|
if len(missing) > 0:
|
|
raise Exception("Missing motors:" + repr(missing))
|
|
d.addCallback(cb3)
|
|
return d
|
|
|
|
def test_004_000_gumtreexml(self):
|
|
#raise unittest.SkipTest("Not doing this test now")
|
|
d = self.send_command("getgumtreexml /")
|
|
def cb3(result, *args, **kw):
|
|
global gumtreexml
|
|
#print "XML:", len(result)
|
|
with open("gumtree.xml", "w") as outfile:
|
|
for line in result:
|
|
outfile.write(line + '\n')
|
|
if "</hipadaba:SICS>" != result[-1]:
|
|
raise Exception("XML is not complete, ends:" + repr(result[-1]))
|
|
txt = []
|
|
for line in result:
|
|
txt.append(''.join(c for c in line if ord(c) >= 32))
|
|
gumtreexml = HipadabaTree('\n'.join(txt)).root
|
|
d.addCallback(cb3)
|
|
return d
|
|
|
|
def test_004_001_gumtreexml(self):
|
|
self.assertTrue(gumtreexml.attrib == {})
|
|
id_list = []
|
|
for child in gumtreexml.findall('component'):
|
|
id_list.append(child.attrib['id'])
|
|
#print child, child.tag, child.attrib
|
|
#for grandchild in child.findall('component'):
|
|
# print " ", grandchild.attrib
|
|
self.assertTrue('commands' in id_list)
|
|
self.assertTrue('instrument' in id_list)
|
|
self.assertTrue('sample' in id_list)
|
|
self.assertTrue('monitor' in id_list)
|
|
self.assertTrue('user' in id_list)
|
|
self.assertTrue('experiment' in id_list)
|
|
self.assertTrue('control' in id_list)
|
|
|
|
def test_004_002_gumtreexml(self):
|
|
child = gumtreexml
|
|
for node in "/sample/ps9/".strip('/').split('/'):
|
|
children = [ch for ch in child.findall('component') if ch.attrib['id'].lower() == node.lower()]
|
|
self.assertTrue(len(children) > 0)
|
|
child = children[0]
|
|
self.assertTrue(len(child.attrib) > 0)
|
|
self.assertTrue(len([ch.attrib['id'] for ch in child.findall('component')]) > 0)
|
|
self.assertTrue(len([(ch.attrib['id'], [v.text for v in ch.findall('value')]) for ch in child.findall('property')]) > 0)
|
|
|
|
def Test_005_000_drive(self):
|
|
"""Skipping example
|
|
|
|
Skips on the basis that the equipment to be tested is not configured
|
|
"""
|
|
global motors
|
|
if not "ds9" in motors:
|
|
raise unittest.SkipTest("Cannot drive motor ds9: does not exist")
|
|
d = self.send_command("drive ds9 0")
|
|
def cb3(result, *args, **kw):
|
|
#print "Drive:", result
|
|
if "Driving finished successfully" != result[-1]:
|
|
raise Exception("Drive is not complete, ends:" + repr(result[-1]))
|
|
d.addCallback(cb3)
|
|
return d
|
|
|
|
def test_006_000_drive(self):
|
|
"""Drive to where the motors already are
|
|
|
|
Should finish quickly
|
|
"""
|
|
self.deferred = defer.Deferred()
|
|
def cb3(result, *args, **kw):
|
|
#print "Drive:", result
|
|
if "Driving finished successfully" != result[-1]:
|
|
raise Exception("Drive is not complete, ends:" + repr(result[-1]))
|
|
for item in result:
|
|
if item.startswith("New m1 position:"):
|
|
self.assertTrue(round(float(item.split(":")[1]) - self.m1, 3) == 0)
|
|
if item.startswith("New m2 position:"):
|
|
self.assertTrue(round(float(item.split(":")[1]) - self.m2, 3) == 0)
|
|
self.deferred.callback(True)
|
|
def cb2(result, *args, **kw):
|
|
#print "m2:", result
|
|
if not result[0].startswith("m2 = "):
|
|
raise Exception("Unexpected m2 response: " + repr(result))
|
|
self.m2 = float(result[0].split("=")[1])
|
|
d = self.send_command("drive m1 %.3f m2 %.3f" % (self.m1, self.m2))
|
|
d.addCallback(cb3)
|
|
def cb1(result, *args, **kw):
|
|
#print "m1:", result
|
|
if not result[0].startswith("m1 = "):
|
|
raise Exception("Unexpected m1 response: " + repr(result))
|
|
self.m1 = float(result[0].split("=")[1])
|
|
d = self.send_command("m2")
|
|
d.addCallback(cb2)
|
|
d = self.send_command("m1")
|
|
d.addCallback(cb1)
|
|
return self.deferred
|
|
test_006_000_drive.timeout = 10
|
|
|
|
def Test_007_000_mercury(self):
|
|
raise unittest.SkipTest("Not doing this test now (borken)")
|
|
global config, hipadaba
|
|
debug = False
|
|
def cb0(result, *args, **kw):
|
|
if debug:
|
|
print "\nMercury:", result.short_tree()
|
|
self.deferred.callback(result)
|
|
if "mercury_scpi" in config.sections() and config.get("mercury_scpi", "enabled"):
|
|
self.deferred = defer.Deferred()
|
|
d = self.load_hipadaba("/sample/%s" % config.get("mercury_scpi", "name"))
|
|
d.addCallback(cb0)
|
|
return self.deferred
|
|
else:
|
|
raise unittest.SkipTest("mercury_scpi is not configured")
|
|
Test_007_000_mercury.timeout = 2
|
|
|
|
def test_008_000_mercury_hipadaba(self):
|
|
"""Test the hipadaba tree and mechanism
|
|
"""
|
|
global hipadaba
|
|
debug = False
|
|
if not 'hipadaba' in globals() or hipadaba is None:
|
|
raise unittest.SkipTest("hipadaba is missing")
|
|
else:
|
|
if False:
|
|
hipadaba.print_tree()
|
|
root_children = hipadaba.getChildren('/')
|
|
if debug:
|
|
print "Children:", root_children
|
|
root_children = [i.lower() for i in root_children]
|
|
self.assertTrue('sample' in root_children)
|
|
sample_children = hipadaba.getChildren('/sample')
|
|
if debug:
|
|
print "Children:", sample_children
|
|
sample_children = [i.lower() for i in sample_children]
|
|
self.assertTrue('tc9' in sample_children)
|
|
level_children = hipadaba.getChildren('/sample/tc9/level/')
|
|
if debug:
|
|
print "Children:", level_children
|
|
level_children = [i.lower() for i in level_children]
|
|
self.assertTrue('helium' in level_children)
|
|
self.assertTrue('nitrogen' in level_children)
|
|
he_props = hipadaba.getProperties('/sample/tc9/level/helium')
|
|
if debug:
|
|
print "Properties:", he_props
|
|
he_props = [i.lower() for i in he_props]
|
|
self.assertTrue('data' in he_props)
|
|
self.assertTrue('control' in he_props)
|
|
self.assertTrue('nxsave' in he_props)
|
|
self.assertTrue('nxalias' in he_props)
|
|
nxalias = hipadaba.getProperty('/sample/tc9/level/helium', 'NXalias')
|
|
if debug:
|
|
print "Prop:", nxalias
|
|
self.assertTrue(nxalias.lower() == 'tc9_level_helium'.lower())
|
|
y = hipadaba.getValue('/sample/tc9/Level/helium')
|
|
if debug:
|
|
print "Value:", y
|
|
y = hipadaba.getNode('/sample/tc9/level/helium')
|
|
if debug:
|
|
print "Found:", hipadaba.short_tree(y)
|
|
if debug:
|
|
hipadaba.print_tree(y)
|
|
text = hipadaba.list_tree(y, props=True, indent=6)
|
|
if debug:
|
|
print "Tree (/sample/tc9/level/helium):"
|
|
for line in text:
|
|
print line
|
|
|
|
class Zulu(Able):
|
|
def test_performance(self):
|
|
d = self.send_command("performance")
|
|
def cb3(result, *args, **kw):
|
|
#print "Performance:", result
|
|
junk, performance = result[0].split("=")
|
|
if not junk.startswith("Performance"):
|
|
raise Exception("Can't find Performance")
|
|
if float(performance) < 50:
|
|
raise Exception("Performance too low: %s" % performance)
|
|
if float(performance) > 100:
|
|
raise Exception("Performance too high %s" % performance)
|
|
d.addCallback(cb3)
|
|
return d
|
|
|
|
|
|
#class SicsTestCase(unittest.TestCase):
|
|
# callbacks = []
|
|
#
|
|
# def _test(command):
|
|
# d = defer.Deferred()
|
|
# self.d = d
|
|
# self.client.sendMessage(command)
|
|
# return d
|
|
#
|
|
# def test_status1(self):
|
|
# d = _test("status")
|
|
# return d
|
|
#
|
|
# def test_0000(self):
|
|
# d = defer.Deferred()
|
|
# self.d = d
|
|
# def cb2(result, *args, **kw):
|
|
# print "Login Callback", result, args
|
|
# print "Add cb2", d
|
|
# d = d.addCallback(cb2, "Fred2")
|
|
# #reactor.callLater(5, d.callback, False)
|
|
# return d
|
|
#
|
|
# def test_status(self):
|
|
# d = defer.Deferred()
|
|
# self.d = d
|
|
# self.client.sendMessage("status")
|
|
# #reactor.callLater(5, d.callback, False)
|
|
# return d
|
|
#
|
|
# def add_callback(self, cb):
|
|
# if not cb in self.callbacks:
|
|
# self.callbacks.append(cb)
|
|
#
|
|
# def del_callback(self, cb):
|
|
# if cb in self.callbacks:
|
|
# self.callbacks.remove(cb)
|
|
#
|
|
# def call_callbacks(self, data):
|
|
# d = self.d
|
|
# self.d = None
|
|
# print "Callback:", d, data
|
|
# for cb in self.callbacks:
|
|
# cb(self, data)
|
|
# if d:
|
|
# def cb3(result, *args, **kw):
|
|
# print "Xqt Callback", result, args
|
|
# print "Add cb3", d
|
|
# d.addCallback(cb3)
|
|
# print "Call Callback", d
|
|
# d.callback(data)
|
|
#
|