Changes to sics_test.py for Wombat testing

This commit is contained in:
Douglas Clowes
2014-07-02 13:45:22 +10:00
parent 5454dfb009
commit 1134aa815b

View File

@@ -9,6 +9,7 @@
from sics_proxy import SICSClientFactory, SicsProtocol
from sics_hdb import HipadabaTree
from twisted.python import log
from twisted.trial import unittest
from twisted.internet import reactor, protocol, defer
@@ -19,7 +20,7 @@ class Able(unittest.TestCase):
def setUp(self):
debug = False
if debug:
print "Connecting ...",
log.msg("Connecting ...")
self.factory = SICSClientFactory()
self.sics = None
self.deferred = defer.Deferred()
@@ -29,6 +30,9 @@ class Able(unittest.TestCase):
if 'instrument_name' not in globals():
d = self.get_instrument_name()
d.addCallback(cb_login, "MyName")
elif 'simulation_type' not in globals():
d = self.get_simulation_type()
d.addCallback(cb_login, "MySIM")
elif 'test_suite' not in globals():
d = self.load_test_suite()
d.addCallback(cb_login, "Test_Suite")
@@ -40,6 +44,7 @@ class Able(unittest.TestCase):
d.addCallback(cb_login, "HiPaDaBa")
else:
self.deferred.callback(True)
return client
def cb_connect(client, *args, **kw):
if debug:
print "cb_connect:", client, client.login, args, kw
@@ -47,15 +52,22 @@ class Able(unittest.TestCase):
self.sics.factory = self.factory
self.sics.login_deferred.addCallback(cb_login, "Login", Fred="Fred")
if debug:
print "Connected"
log.msg("Connected")
return client
creator = protocol.ClientCreator(reactor, SicsProtocol)
d = creator.connectTCP("localhost", 60003)
d = d.addCallback(cb_connect)
return self.deferred
def send_command(self, command):
def cb(result, *args, **kw):
log.msg("Result: " + repr(result))
log.msg("Args: " + repr(args))
log.msg("Kw: " + repr(kw))
return result
log.msg("Command: " + repr(command))
d = self.sics.sendMessage(command)
#print "Command:", command, d
d.addCallback(cb)
return d
def get_instrument_name(self):
@@ -72,10 +84,28 @@ class Able(unittest.TestCase):
Quokka.skip = False
if instrument_name == "taipan":
Taipan.skip = False
if instrument_name == "wombat":
Wombat.skip = False
return result
d = self.send_command("instrument")
d.addCallback(cb1)
return d
def get_simulation_type(self):
debug = False
def cb1(result, *args, **kw):
global simulation_type
if debug:
print "SICS_SIMULATION:", result, args, kw
self.assertTrue(result[0].lower().startswith("sics_simulation = "))
simulation_type = result[0].split("=")[1].strip().lower()
if debug:
print "SICS_SIMULATION:", simulation_type
return result
d = self.send_command("sics_simulation")
d.addCallback(cb1)
return d
def load_test_suite(self):
debug = False
def cb1(result, *args, **kw):
@@ -85,6 +115,7 @@ class Able(unittest.TestCase):
if result[0] != "OK":
raise Exception("fileeval returned " + repr(result))
test_suite = True
return result
d = self.send_command("fileeval TEST_SICS/unit_tests/test_suite.tcl")
d.addCallback(cb1)
return d
@@ -109,6 +140,7 @@ class Able(unittest.TestCase):
print "ini:", dir(ini)
sicsconfig = ConfigParser.SafeConfigParser()
sicsconfig.readfp(ini)
return result
d = self.send_command("tcl:test_suite::show_config ::config_dict")
d.addCallback(cb2)
return d
@@ -125,6 +157,7 @@ class Able(unittest.TestCase):
raise Exception("Tree does not end with </hipadaba:SICS>:" + result[-1])
self.hipadaba = HipadabaTree('\n'.join(result))
hipadaba = self.hipadaba
return result
d = self.send_command("tcl:test_suite::getsicsxml /")
d.addCallback(cb0)
return d
@@ -147,10 +180,11 @@ class Baker(Able):
print "Login:", result, args, kw
if self.sics.login != 2:
raise Exception("Bad login:" + repr(self.sics.login))
return result
d = self.sics.login_deferred
d.addCallback(cb0)
return d
test_000_000_login.timeout = 10
test_000_000_login.timeout = 30
def test_000_001_fileeval(self):
self.assertTrue('test_suite' in globals())
@@ -181,6 +215,7 @@ class Baker(Able):
formatted = dt.fromtimestamp(1399866027).strftime("%Y-%b-%d %H:%M:%S")
self.assertEqual(result[0], formatted)
self.deferred.callback(None)
return result
def cb3(result, *args, **kw):
if debug:
print "Clock:", result
@@ -190,6 +225,7 @@ class Baker(Able):
raise Exception("Bad value:" + repr(result))
d = self.send_command("tcl:clock format 1399866027 -format \"%Y-%h-%d %T\"")
d.addCallback(cb2)
return result
d = self.send_command("tcl:clock seconds")
d.addCallback(cb3)
return self.deferred
@@ -204,6 +240,7 @@ class Baker(Able):
#print "Status:", result
if not result[0].startswith("status = "):
raise Exception("Bad status:" + repr(result))
return result
d.addCallback(cb3)
return d
@@ -227,6 +264,7 @@ class Baker(Able):
else:
#print "Dir:", self.result
self.deferred.callback(None)
return result
def cb3(result, *args, **kw):
self.types = sorted([i.strip().replace(" ", "_") for i in result])
#print "Types:", self.types
@@ -238,6 +276,7 @@ class Baker(Able):
else:
print "Dir: None"
self.deferred.callback(None)
return result
d.addCallback(cb3)
return self.deferred
@@ -255,6 +294,7 @@ class Baker(Able):
missing.append(m)
if instrument_name == "taipan" and len(missing) > 0:
raise Exception("Missing motors:" + repr(missing))
return result
d.addCallback(cb3)
return d
@@ -274,7 +314,7 @@ class Baker(Able):
d.addCallback(cb, phase + 1)
else:
raise unittest.SkipTest("Motor is not fake")
return
return result
target += 1
if phase == target:
@@ -283,13 +323,14 @@ class Baker(Able):
d.addCallback(cb, phase + 1)
else:
raise unittest.SkipTest("Motor is not fake")
return
return result
target += 1
if phase == target:
self.assertTrue(result[0].startswith("SIMULATE"))
self.deferred.callback(True)
return result
self.deferred = defer.Deferred();
d = self.send_command("m1 send lv")
d.addCallback(cb, 1)
@@ -310,6 +351,7 @@ class Baker(Able):
for line in result:
txt.append(''.join(c for c in line if ord(c) >= 32))
gumtreexml = HipadabaTree('\n'.join(txt)).root
return result
d.addCallback(cb3)
return d
@@ -331,7 +373,10 @@ class Baker(Able):
def test_004_002_gumtreexml(self):
global sicsconfig
if not ('pfeiffer_hg' in sicsconfig.sections() and sicsconfig.get("pfeiffer_hg", "enabled") == "True"):
if not (\
'pfeiffer_hg' in sicsconfig.sections() and \
"enabled" in sicsconfig.options("pfeiffer_hg") and \
sicsconfig.get("pfeiffer_hg", "enabled") == "True"):
raise unittest.SkipTest("Pfeiffer PS9 is not configured")
child = gumtreexml
for node in "/sample/ps9/".strip('/').split('/'):
@@ -354,6 +399,7 @@ class Baker(Able):
for line in result:
txt.append(''.join(c for c in line if ord(c) >= 32))
gumtreexml = HipadabaTree('\n'.join(txt)).root
return result
d.addCallback(cb3)
return d
@@ -386,6 +432,7 @@ class Baker(Able):
#print "Drive:", result
if "Driving finished successfully" != result[-1]:
raise Exception("Drive is not complete, ends:" + repr(result[-1]))
return result
d.addCallback(cb3)
return d
@@ -408,6 +455,7 @@ class Baker(Able):
if item.startswith("New m2 position:"):
self.assertTrue(round(float(item.split(":")[1]) - self.m2, 3) == 0)
self.deferred.callback(True)
return result
def cb2(result, *args, **kw):
#print "m2:", result
if not result[0].startswith("m2 = "):
@@ -415,6 +463,7 @@ class Baker(Able):
self.m2 = float(result[0].split("=")[1])
d = self.send_command("drive m1 %.3f m2 %.3f" % (self.m1, self.m2))
d.addCallback(cb3)
return result
def cb1(result, *args, **kw):
#print "m1:", result
if not result[0].startswith("m1 = "):
@@ -422,6 +471,7 @@ class Baker(Able):
self.m1 = float(result[0].split("=")[1])
d = self.send_command("m2")
d.addCallback(cb2)
return result
d = self.send_command("m1")
d.addCallback(cb1)
return self.deferred
@@ -448,6 +498,7 @@ class Baker(Able):
self.assertTrue(round(float(item.split(":")[1]) - self.m2, 2) == 0,
"Motor m2 position %f not equal target %f" % (float(item.split(":")[1]), self.m2))
self.deferred.callback(True)
return result
def cb2(result, *args, **kw):
#print "m2:", result
if not result[0].startswith("m2 = "):
@@ -463,6 +514,7 @@ class Baker(Able):
self.m2 = round(self.m2 + 1, 0)
d = self.send_command("drive m1 %.3f m2 %.3f" % (self.m1, self.m2))
d.addCallback(cb3)
return result
def cb1(result, *args, **kw):
#print "m1:", result
if not result[0].startswith("m1 = "):
@@ -470,6 +522,7 @@ class Baker(Able):
self.m1 = float(result[0].split("=")[1])
d = self.send_command("m2")
d.addCallback(cb2)
return result
d = self.send_command("m1")
d.addCallback(cb1)
return self.deferred
@@ -483,7 +536,10 @@ class Baker(Able):
if debug:
print "\nMercury:", result.short_tree()
self.deferred.callback(result)
if not ("mercury_scpi" in sicsconfig.sections() and sicsconfig.get("mercury_scpi", "enabled") == "True"):
return result
if not ("mercury_scpi" in sicsconfig.sections() and \
"enabled" in sicsconfig.options("mercury_scpi") and \
sicsconfig.get("mercury_scpi", "enabled") == "True"):
raise unittest.SkipTest("mercury_scpi is not configured")
self.deferred = defer.Deferred()
d = self.load_hipadaba("/sample/%s" % sicsconfig.get("mercury_scpi", "name"))
@@ -498,7 +554,9 @@ class Baker(Able):
debug = False
self.assertTrue('hipadaba' in globals())
self.assertFalse(hipadaba is None, "hipadaba is missing")
if not ('mercury_scpi' in sicsconfig.sections() and sicsconfig.get("mercury_scpi", "enabled") == "True"):
if not ('mercury_scpi' in sicsconfig.sections() and \
"enabled" in sicsconfig.options("mercury_scpi") and \
sicsconfig.get("mercury_scpi", "enabled") == "True"):
raise unittest.SkipTest("Mercury TC9 is not configured")
if False:
@@ -560,44 +618,45 @@ class Posit(Able):
if phase == target:
d = self.send_command("m1 positions 10 20 30")
d.addCallback(cb, phase + 1)
return
return result
target += 1
if phase == target:
d = self.send_command("m1 position_names One Two Three")
d.addCallback(cb, phase + 1)
return
return result
target += 1
if phase == target:
d = self.send_command("m1 position_names")
d.addCallback(cb, phase + 1)
return
return result
target += 1
if phase == target:
d = self.send_command("m1 softzero 1.0")
d.addCallback(cb, phase + 1)
return
return result
target += 1
if phase == target:
d = self.send_command("m1 posit2hard three")
d.addCallback(cb, phase + 1)
return
return result
target += 1
if phase == target:
d = self.send_command("m1 posit2soft three")
d.addCallback(cb, phase + 1)
return
return result
target += 1
if phase == target:
d = self.send_command("m1 positions erase")
d.addCallback(cb, phase + 1)
return
return result
target += 1
if phase == target:
d = self.send_command("m1 softzero 0.0")
d.addCallback(cb, phase + 1)
return
return result
self.deferred.callback(True)
return result
self.deferred = defer.Deferred();
d = self.send_command("m1 positions erase")
d.addCallback(cb, 1)
@@ -621,6 +680,7 @@ class Python(Able):
python_installed = False
self.deferred.callback(True)
return result
self.deferred = defer.Deferred();
d = self.send_command("sicslist python")
d.addCallback(cb, 1)
@@ -642,13 +702,14 @@ class Python(Able):
if result[0].startswith("Hello Sailor"):
d = self.send_command("python sics.puts(sics.sics(\"clientput Hello World\"))")
d.addCallback(cb, phase + 1)
return
return result
target += 1
if phase == target:
self.assertTrue(result[0].startswith("Hello World"))
self.deferred.callback(True)
return result
self.deferred = defer.Deferred();
d = self.send_command("python sics.puts(\"Hello Sailor\")")
d.addCallback(cb, 1)
@@ -680,6 +741,35 @@ class Taipan(Able):
if instrument_name == "taipan" and len(missing) > 0:
raise Exception("Missing motors:" + repr(missing))
class Wombat(Able):
skip = True
def test_000_000_motors(self):
global motors
if instrument_name != "wombat":
raise unittest.SkipTest("Instrument is not Wombat")
missing = []
for m in ["mphi", "mchi", "mom", "mtth", "som", "stth"]:
if m not in motors:
missing.append(m)
if instrument_name == "wombat" and len(missing) > 0:
raise Exception("Missing motors:" + repr(missing))
def test_001_000_newfile(self):
def cb(result, *args, **kw):
self.assertTrue(result[0] == "OK")
return result
d = self.send_command("newfile HISTOGRAM_XY scratch")
d.addCallback(cb)
return d
def test_001_001_save(self):
def cb(result, *args, **kw):
self.assertTrue("OK" in result)
return result
d = self.send_command("save")
d.addCallback(cb)
return d
class Zulu(Able):
timeout = 5
def test_performance(self):