Add test setup for dingo/echidna and unit test for wombat

This commit is contained in:
Douglas Clowes
2014-10-15 12:19:27 +11:00
parent 539ea4d9da
commit 2820cc3c68
4 changed files with 918 additions and 0 deletions

View File

@ -0,0 +1,22 @@
#!/bin/bash
echo me:${0}
. $(dirname ${0})/common
status=${?}
echo root:${status}:${root}
if [ ! ${status} ]; then
exit 1
fi
gnome-terminal \
--tab --title="Motors" \
--working-directory=${root}/fakeGalil \
--command="${root}/fakeGalil/SIM_GALIL.py -w dingo" \
--tab --title="Mercury" \
--working-directory=${root}/fakeTempControl/oxford \
--command="${root}/fakeTempControl/oxford/SIM_MERCURY.py -w" \
--tab --title="Pfeiffer" \
--working-directory=${root}/fakePfeiffer/tpg261 \
--command="${root}/fakePfeiffer/tpg261/SIM_PFEIFFER.py -w" \
--tab --title="Bash" \
--working-directory=${root} \
--command="bash"

View File

@ -0,0 +1,22 @@
#!/bin/bash
echo me:${0}
. $(dirname ${0})/common
status=${?}
echo root:${status}:${root}
if [ ! ${status} ]; then
exit 1
fi
gnome-terminal \
--tab --title="Motors" \
--working-directory=${root}/fakeGalil \
--command="${root}/fakeGalil/SIM_GALIL.py -w echidna" \
--tab --title="Mercury" \
--working-directory=${root}/fakeTempControl/oxford \
--command="${root}/fakeTempControl/oxford/SIM_MERCURY.py -w" \
--tab --title="Pfeiffer" \
--working-directory=${root}/fakePfeiffer/tpg261 \
--command="${root}/fakePfeiffer/tpg261/SIM_PFEIFFER.py -w" \
--tab --title="Bash" \
--working-directory=${root} \
--command="bash"

View File

@ -0,0 +1,841 @@
#!/usr/bin/env python
# vim: ts=8 sts=4 sw=4 expandtab
# Author: Douglas Clowes (dcl@ansto.gov.au) 2014-04-24
#
# Looks like test cases are run alphabetically within classes
# and classes are run alphabetically too.
# So, Able is the base class derived from Testcase
#
from sics_proxy import SICSClientFactory, SicsProtocol
from sics_hdb import HipadabaTree
from twisted.python import log
from twisted.trial import unittest
from twisted.internet import reactor, protocol, defer
import ConfigParser, StringIO
from datetime import datetime as dt
class Able(unittest.TestCase):
def setUp(self):
debug = False
if debug:
log.msg("Connecting ...")
self.factory = SICSClientFactory()
self.sics = None
self.deferred = defer.Deferred()
def cb_login(client, *args, **kw):
if debug:
print "cb_login:", client, args, kw
if 'instrument_name' not in globals():
d = self.get_instrument_name()
d.addCallback(cb_login, "MyName")
elif 'simulation_type' not in globals():
d = self.get_simulation_type()
d.addCallback(cb_login, "MySIM")
elif 'test_suite' not in globals():
d = self.load_test_suite()
d.addCallback(cb_login, "Test_Suite")
elif 'sicsconfig' not in globals():
d = self.load_sicsconfig()
d.addCallback(cb_login, "SICS_Config")
elif 'hipadaba' not in globals():
d = self.load_hipadaba()
d.addCallback(cb_login, "HiPaDaBa")
else:
self.deferred.callback(True)
return client
def cb_connect(client, *args, **kw):
if debug:
print "cb_connect:", client, client.login, args, kw
self.sics = client
self.sics.factory = self.factory
self.sics.login_deferred.addCallback(cb_login, "Login", Fred="Fred")
if debug:
log.msg("Connected")
return client
creator = protocol.ClientCreator(reactor, SicsProtocol)
d = creator.connectTCP("localhost", 60003)
d = d.addCallback(cb_connect)
return self.deferred
def send_command(self, command):
def cb(result, *args, **kw):
log.msg("Result: " + repr(result))
log.msg("Args: " + repr(args))
log.msg("Kw: " + repr(kw))
return result
log.msg("Command: " + repr(command))
d = self.sics.sendMessage(command)
d.addCallback(cb)
return d
def get_instrument_name(self):
debug = False
def cb1(result, *args, **kw):
global instrument_name
if debug:
print "instrument:", result, args, kw
self.assertTrue(result[0].lower().startswith("instrument = "))
instrument_name = result[0].split("=")[1].strip().lower()
if debug:
print "Instrument:", instrument_name
if instrument_name == "quokka":
Quokka.skip = False
if instrument_name == "taipan":
Taipan.skip = False
if instrument_name == "wombat":
Wombat.skip = False
return result
d = self.send_command("instrument")
d.addCallback(cb1)
return d
def get_simulation_type(self):
debug = False
def cb1(result, *args, **kw):
global simulation_type
if debug:
print "SICS_SIMULATION:", result, args, kw
self.assertTrue(result[0].lower().startswith("sics_simulation = "))
simulation_type = result[0].split("=")[1].strip().lower()
if debug:
print "SICS_SIMULATION:", simulation_type
return result
d = self.send_command("sics_simulation")
d.addCallback(cb1)
return d
def load_test_suite(self):
debug = False
def cb1(result, *args, **kw):
global test_suite
if debug:
print "fileeval:", result, args, kw
if result[0] != "OK":
raise Exception("fileeval returned " + repr(result))
test_suite = True
return result
d = self.send_command("fileeval TEST_SICS/unit_tests/test_suite.tcl")
d.addCallback(cb1)
return d
def load_sicsconfig(self):
debug = False
def cb2(result, *args, **kw):
global sicsconfig
with open("sicsconfig.ini", "w") as outfile:
for line in result:
outfile.write(line + '\n')
if debug:
print "config:", result, args, kw
self.assertFalse(result[0].startswith("can't read"))
self.assertFalse(result[0].startswith("... No dictionary ..."))
ini = StringIO.StringIO("\n".join(result))
if debug:
print "ini:", ini.readlines()
ini.seek(0)
print "ini:", ini.read()
ini.seek(0)
print "ini:", dir(ini)
sicsconfig = ConfigParser.SafeConfigParser()
sicsconfig.readfp(ini)
return result
d = self.send_command("tcl:test_suite::show_config ::config_dict")
d.addCallback(cb2)
return d
def load_hipadaba(self):
def cb0(result, *args, **kw):
global hipadaba
with open("hipadaba.xml", "w") as outfile:
for line in result:
outfile.write(line + '\n')
if result[0][0] != '<':
raise Exception("Tree does not start with left angle:" + result[0])
if result[-1] != '</hipadaba:SICS>':
raise Exception("Tree does not end with </hipadaba:SICS>:" + result[-1])
self.hipadaba = HipadabaTree('\n'.join(result))
hipadaba = self.hipadaba
return result
d = self.send_command("tcl:test_suite::getsicsxml /")
d.addCallback(cb0)
return d
def tearDown(self):
#print "tearDownModule"
if self.sics is not None:
self.sics.login_deferred = None
self.sics.command_deferred = None
self.sics.transport.loseConnection()
self.sics = None
self.deferred = None
class Baker(Able):
timeout = 5
def test_000_000_login(self):
debug = False
def cb0(result, *args, **kw):
if debug:
print "Login:", result, args, kw
if self.sics.login != 2:
raise Exception("Bad login:" + repr(self.sics.login))
return result
d = self.sics.login_deferred
d.addCallback(cb0)
return d
test_000_000_login.timeout = 30
def test_000_001_fileeval(self):
self.assertTrue('test_suite' in globals())
self.assertTrue(test_suite)
def test_000_002_config(self):
debug = False
self.assertTrue('sicsconfig' in globals())
if debug:
print "config:", repr(sicsconfig)
print "config:", dir(sicsconfig)
for section in sorted(sicsconfig.sections()):
print ("[%s]\n" % section),
for option in sorted(sicsconfig.options(section)):
print ("%s = %s\n" % (option, sicsconfig.get(section, option))),
print
def test_000_003_hipadaba(self):
debug = False
self.assertTrue('hipadaba' in globals())
def test_001_001_clock(self):
debug = False
self.deferred = defer.Deferred()
def cb2(result, *args, **kw):
if debug:
print "Clock:", result
formatted = dt.fromtimestamp(1399866027).strftime("%Y-%b-%d %H:%M:%S")
self.assertEqual(result[0], formatted)
self.deferred.callback(None)
return result
def cb3(result, *args, **kw):
if debug:
print "Clock:", result
if not result[0].isdigit():
raise Exception("Bad response:" + repr(result))
if not 1399865694 <= int(result[0]) < 1499865694:
raise Exception("Bad value:" + repr(result))
d = self.send_command("tcl:clock format 1399866027 -format \"%Y-%h-%d %T\"")
d.addCallback(cb2)
return result
d = self.send_command("tcl:clock seconds")
d.addCallback(cb3)
return self.deferred
def test_001_002_hipadaba(self):
debug = False
self.assertTrue('hipadaba' in globals())
def test_001_003_status(self):
d = self.send_command("status")
def cb3(result, *args, **kw):
#print "Status:", result
if not result[0].startswith("status = "):
raise Exception("Bad status:" + repr(result))
return result
d.addCallback(cb3)
return d
def Test_002_000_dir(self):
raise unittest.SkipTest("Not doing this test now")
self.deferred = defer.Deferred()
d = self.send_command("dir types")
self.result = 0
def cb2(result, *args, **kw):
self.result += 1
result = sorted([i.strip() for i in result])
#print "Types:", result
for i in range(len(result)):
result[i] = " ".join(sorted(result[i].split()))
#print "Types:", result
if len(self.types) > 0:
type = self.types[0]
del self.types[0]
d = self.send_command("sicslist type %s" % type)
d.addCallback(cb2)
else:
#print "Dir:", self.result
self.deferred.callback(None)
return result
def cb3(result, *args, **kw):
self.types = sorted([i.strip().replace(" ", "_") for i in result])
#print "Types:", self.types
if len(self.types) > 0:
type = self.types[0]
del self.types[0]
d = self.send_command("sicslist type %s" % type)
d.addCallback(cb2)
else:
print "Dir: None"
self.deferred.callback(None)
return result
d.addCallback(cb3)
return self.deferred
def test_003_000_motors(self):
global motors
motors = []
d = self.send_command("sicslist type motor")
def cb3(result, *args, **kw):
global motors
motors = sorted(result[0].lower().split())
#print "Motors:", len(motors)
missing = []
for m in ["m1", "m2", "s1", "s2", "a1", "a2"]:
if m not in motors:
missing.append(m)
if instrument_name == "taipan" and len(missing) > 0:
raise Exception("Missing motors:" + repr(missing))
return result
d.addCallback(cb3)
return d
def test_003_001_motors(self):
debug = False
if instrument_name != "taipan":
raise unittest.SkipTest("Cannot test motor m1, m2: not taipan")
def cb(result, *args, **kw):
if debug:
print "\nLV:", result, args, kw
phase = args[0]
target = 0
target += 1
if phase == target:
if result[0].startswith("SIMULATE"):
d = self.send_command("m2 send lv")
d.addCallback(cb, phase + 1)
else:
raise unittest.SkipTest("Motor is not fake")
return result
target += 1
if phase == target:
if result[0].startswith("SIMULATE"):
d = self.send_command("s1 send lv")
d.addCallback(cb, phase + 1)
else:
raise unittest.SkipTest("Motor is not fake")
return result
target += 1
if phase == target:
self.assertTrue(result[0].startswith("SIMULATE"))
self.deferred.callback(True)
return result
self.deferred = defer.Deferred();
d = self.send_command("m1 send lv")
d.addCallback(cb, 1)
return self.deferred
def test_004_000_gumtreexml(self):
#raise unittest.SkipTest("Not doing this test now")
d = self.send_command("getgumtreexml /")
def cb3(result, *args, **kw):
global gumtreexml
#print "XML:", len(result)
with open("gumtree.xml", "w") as outfile:
for line in result:
outfile.write(line + '\n')
if "</hipadaba:SICS>" != result[-1]:
raise Exception("XML is not complete, ends:" + repr(result[-1]))
txt = []
for line in result:
txt.append(''.join(c for c in line if ord(c) >= 32))
gumtreexml = HipadabaTree('\n'.join(txt)).root
return result
d.addCallback(cb3)
return d
def test_004_001_gumtreexml(self):
self.assertTrue(gumtreexml.attrib == {})
id_list = []
for child in gumtreexml.findall('component'):
id_list.append(child.attrib['id'])
#print child, child.tag, child.attrib
#for grandchild in child.findall('component'):
# print " ", grandchild.attrib
self.assertTrue('commands' in id_list)
self.assertTrue('instrument' in id_list)
self.assertTrue('sample' in id_list)
self.assertTrue('monitor' in id_list)
self.assertTrue('user' in id_list)
self.assertTrue('experiment' in id_list)
self.assertTrue('control' in id_list)
def test_004_002_gumtreexml(self):
global sicsconfig
if not (\
'pfeiffer_hg' in sicsconfig.sections() and \
"enabled" in sicsconfig.options("pfeiffer_hg") and \
sicsconfig.get("pfeiffer_hg", "enabled") == "True"):
raise unittest.SkipTest("Pfeiffer PS9 is not configured")
child = gumtreexml
for node in "/sample/ps9/".strip('/').split('/'):
children = [ch for ch in child.findall('component') if ch.attrib['id'].lower() == node.lower()]
self.assertTrue(len(children) > 0)
child = children[0]
self.assertTrue(len(child.attrib) > 0)
self.assertTrue(len([ch.attrib['id'] for ch in child.findall('component')]) > 0)
self.assertTrue(len([(ch.attrib['id'], [v.text for v in ch.findall('value')]) for ch in child.findall('property')]) > 0)
def test_004_003_gumtreexmlvalues(self):
#raise unittest.SkipTest("Not doing this test now")
d = self.send_command("getgumtreexmlvalues /")
def cb3(result, *args, **kw):
global gumtreexml
#print "XML:", len(result)
if "</hipadaba:SICS>" != result[-1]:
raise Exception("XML is not complete, ends:" + repr(result[-1]))
txt = []
for line in result:
txt.append(''.join(c for c in line if ord(c) >= 32))
gumtreexml = HipadabaTree('\n'.join(txt)).root
return result
d.addCallback(cb3)
return d
def test_004_004_gumtreexmlvalues(self):
self.assertTrue(gumtreexml.attrib == {})
id_list = []
for child in gumtreexml.findall('component'):
id_list.append(child.attrib['id'])
#print child, child.tag, child.attrib
#for grandchild in child.findall('component'):
# print " ", grandchild.attrib
self.assertTrue('commands' in id_list)
self.assertTrue('instrument' in id_list)
self.assertTrue('sample' in id_list)
self.assertTrue('monitor' in id_list)
self.assertTrue('user' in id_list)
self.assertTrue('experiment' in id_list)
self.assertTrue('control' in id_list)
def Test_005_000_drive_skip(self):
"""Skipping example
Skips on the basis that the equipment to be tested is not configured
"""
global motors
if not "ds9" in motors:
raise unittest.SkipTest("Cannot drive motor ds9: does not exist")
d = self.send_command("drive ds9 0")
def cb3(result, *args, **kw):
#print "Drive:", result
if "Driving finished successfully" != result[-1]:
raise Exception("Drive is not complete, ends:" + repr(result[-1]))
return result
d.addCallback(cb3)
return d
def test_006_000_drive_zero(self):
"""Drive to where the motors already are
Should finish quickly
"""
if instrument_name != "taipan":
raise unittest.SkipTest("Cannot drive motor m1, m2: not taipan")
self.deferred = defer.Deferred()
def cb3(result, *args, **kw):
#print "Drive:", result
if "Driving finished successfully" != result[-1]:
raise Exception("Drive is not complete, ends:" + repr(result[-1]))
for item in result:
if item.startswith("New m1 position:"):
self.assertTrue(round(float(item.split(":")[1]) - self.m1, 3) == 0)
if item.startswith("New m2 position:"):
self.assertTrue(round(float(item.split(":")[1]) - self.m2, 3) == 0)
self.deferred.callback(True)
return result
def cb2(result, *args, **kw):
#print "m2:", result
if not result[0].startswith("m2 = "):
raise Exception("Unexpected m2 response: " + repr(result))
self.m2 = float(result[0].split("=")[1])
d = self.send_command("drive m1 %.3f m2 %.3f" % (self.m1, self.m2))
d.addCallback(cb3)
return result
def cb1(result, *args, **kw):
#print "m1:", result
if not result[0].startswith("m1 = "):
raise Exception("Unexpected m1 response: " + repr(result))
self.m1 = float(result[0].split("=")[1])
d = self.send_command("m2")
d.addCallback(cb2)
return result
d = self.send_command("m1")
d.addCallback(cb1)
return self.deferred
test_006_000_drive_zero.timeout = 5
def test_006_001_drive_one(self):
"""Drive to where the motors already are
Should finish quickly
"""
if instrument_name != "taipan":
raise unittest.SkipTest("Cannot drive motor m1, m2: not taipan")
self.deferred = defer.Deferred()
def cb3(result, *args, **kw):
#print "Drive:", result
if "Driving finished successfully" != result[-1]:
raise Exception("Drive is not complete, ends:" + repr(result[-1]))
for item in result:
if item.startswith("New m1 position:"):
self.assertTrue(round(float(item.split(":")[1]) - self.m1, 2) == 0,
"Motor m1 position %f not equal target %f" % (float(item.split(":")[1]), self.m1))
if item.startswith("New m2 position:"):
self.assertTrue(round(float(item.split(":")[1]) - self.m2, 2) == 0,
"Motor m2 position %f not equal target %f" % (float(item.split(":")[1]), self.m2))
self.deferred.callback(True)
return result
def cb2(result, *args, **kw):
#print "m2:", result
if not result[0].startswith("m2 = "):
raise Exception("Unexpected m2 response: " + repr(result))
self.m2 = float(result[0].split("=")[1])
if self.m1 > 20.1:
self.m1 = round(self.m1 - 1, 0)
else:
self.m1 = round(self.m1 + 1, 0)
if self.m2 > 20.1:
self.m2 = round(self.m2 - 1, 0)
else:
self.m2 = round(self.m2 + 1, 0)
d = self.send_command("drive m1 %.3f m2 %.3f" % (self.m1, self.m2))
d.addCallback(cb3)
return result
def cb1(result, *args, **kw):
#print "m1:", result
if not result[0].startswith("m1 = "):
raise Exception("Unexpected m1 response: " + repr(result))
self.m1 = float(result[0].split("=")[1])
d = self.send_command("m2")
d.addCallback(cb2)
return result
d = self.send_command("m1")
d.addCallback(cb1)
return self.deferred
test_006_001_drive_one.timeout = 30
def Test_007_000_mercury(self):
raise unittest.SkipTest("Not doing this test now (borken)")
global sicsconfig
debug = False
def cb0(result, *args, **kw):
if debug:
print "\nMercury:", result.short_tree()
self.deferred.callback(result)
return result
if not ("mercury_scpi" in sicsconfig.sections() and \
"enabled" in sicsconfig.options("mercury_scpi") and \
sicsconfig.get("mercury_scpi", "enabled") == "True"):
raise unittest.SkipTest("mercury_scpi is not configured")
self.deferred = defer.Deferred()
d = self.load_hipadaba("/sample/%s" % sicsconfig.get("mercury_scpi", "name"))
d.addCallback(cb0)
return self.deferred
Test_007_000_mercury.timeout = 2
def test_008_000_mercury_hipadaba(self):
"""Test the hipadaba tree and mechanism
"""
global hipadaba
debug = False
self.assertTrue('hipadaba' in globals())
self.assertFalse(hipadaba is None, "hipadaba is missing")
if not ('mercury_scpi' in sicsconfig.sections() and \
"enabled" in sicsconfig.options("mercury_scpi") and \
sicsconfig.get("mercury_scpi", "enabled") == "True"):
raise unittest.SkipTest("Mercury TC9 is not configured")
if False:
hipadaba.print_tree()
root_children = hipadaba.getChildren('/')
if debug:
print "Children:", root_children
root_children = [i.lower() for i in root_children]
self.assertTrue('sample' in root_children)
sample_children = hipadaba.getChildren('/sample')
if debug:
print "Children:", sample_children
sample_children = [i.lower() for i in sample_children]
self.assertTrue('tc9' in sample_children)
level_children = hipadaba.getChildren('/sample/tc9/level/')
if debug:
print "Children:", level_children
level_children = [i.lower() for i in level_children]
self.assertTrue('helium' in level_children)
self.assertTrue('nitrogen' in level_children)
he_props = hipadaba.getProperties('/sample/tc9/level/helium')
if debug:
print "Properties:", he_props
he_props = [i.lower() for i in he_props]
self.assertTrue('data' in he_props)
self.assertTrue('control' in he_props)
self.assertTrue('nxsave' in he_props)
self.assertTrue('nxalias' in he_props)
nxalias = hipadaba.getProperty('/sample/tc9/level/helium', 'NXalias')
if debug:
print "Prop:", nxalias
self.assertTrue(nxalias.lower() == 'tc9_level_helium'.lower())
y = hipadaba.getValue('/sample/tc9/Level/helium')
if debug:
print "Value:", y
y = hipadaba.getNode('/sample/tc9/level/helium')
if debug:
print "Found:", hipadaba.short_tree(y)
if debug:
hipadaba.print_tree(y)
text = hipadaba.list_tree(y, props=True, indent=6)
if debug:
print "Tree (/sample/tc9/level/helium):"
for line in text:
print line
class Posit(Able):
timeout = 5
def test_000_000_posit(self):
debug = False
if instrument_name != "taipan":
raise unittest.SkipTest("Cannot posit motor m1, m2: not taipan")
def cb(result, *args, **kw):
if debug:
print "\nposit:", result, args, kw
phase = args[0]
target = 1
if phase == target:
d = self.send_command("m1 positions 10 20 30")
d.addCallback(cb, phase + 1)
return result
target += 1
if phase == target:
d = self.send_command("m1 position_names One Two Three")
d.addCallback(cb, phase + 1)
return result
target += 1
if phase == target:
d = self.send_command("m1 position_names")
d.addCallback(cb, phase + 1)
return result
target += 1
if phase == target:
d = self.send_command("m1 softzero 1.0")
d.addCallback(cb, phase + 1)
return result
target += 1
if phase == target:
d = self.send_command("m1 posit2hard three")
d.addCallback(cb, phase + 1)
return result
target += 1
if phase == target:
d = self.send_command("m1 posit2soft three")
d.addCallback(cb, phase + 1)
return result
target += 1
if phase == target:
d = self.send_command("m1 positions erase")
d.addCallback(cb, phase + 1)
return result
target += 1
if phase == target:
d = self.send_command("m1 softzero 0.0")
d.addCallback(cb, phase + 1)
return result
self.deferred.callback(True)
return result
self.deferred = defer.Deferred();
d = self.send_command("m1 positions erase")
d.addCallback(cb, 1)
return self.deferred
class Python(Able):
timeout = 5
def test_001_000_python(self):
debug = False
def cb(result, *args, **kw):
global python_installed
if debug:
print "\npython:", result, args, kw
phase = args[0]
target = 0
target += 1
if phase == target:
if result[0].startswith("python=obj, type=python,"):
python_installed = True
else:
python_installed = False
self.deferred.callback(True)
return result
self.deferred = defer.Deferred();
d = self.send_command("sicslist python")
d.addCallback(cb, 1)
return self.deferred
def test_002_000_python(self):
debug = False
if debug:
print "Python_Installed:", python_installed
if not python_installed:
raise unittest.SkipTest("Python is not installed")
def cb(result, *args, **kw):
if debug:
print "\npython:", result, args, kw
phase = args[0]
target = 0
target += 1
if phase == target:
if result[0].startswith("Hello Sailor"):
d = self.send_command("python sics.puts(sics.sics(\"clientput Hello World\"))")
d.addCallback(cb, phase + 1)
return result
target += 1
if phase == target:
self.assertTrue(result[0].startswith("Hello World"))
self.deferred.callback(True)
return result
self.deferred = defer.Deferred();
d = self.send_command("python sics.puts(\"Hello Sailor\")")
d.addCallback(cb, 1)
return self.deferred
class Quokka(Able):
skip = True
def test_000_000_motors(self):
global motors
if instrument_name != "quokka":
raise unittest.SkipTest("Instrument is not Quokka")
missing = []
for m in ["samx", "samy", "samz", "det", "detoff", "bsx"]:
if m not in motors:
missing.append(m)
if instrument_name == "taipan" and len(missing) > 0:
raise Exception("Missing motors:" + repr(missing))
class Taipan(Able):
skip = True
def test_000_000_motors(self):
global motors
if instrument_name != "taipan":
raise unittest.SkipTest("Instrument is not Taipan")
missing = []
for m in ["m1", "m2", "s1", "s2", "a1", "a2"]:
if m not in motors:
missing.append(m)
if instrument_name == "taipan" and len(missing) > 0:
raise Exception("Missing motors:" + repr(missing))
class Wombat(Able):
skip = True
def test_000_000_motors(self):
global motors
if instrument_name != "wombat":
raise unittest.SkipTest("Instrument is not Wombat")
missing = []
for m in ["mphi", "mchi", "mom", "mtth", "som", "stth"]:
if m not in motors:
missing.append(m)
if instrument_name == "wombat" and len(missing) > 0:
raise Exception("Missing motors:" + repr(missing))
def test_001_000_newfile(self):
def cb(result, *args, **kw):
self.assertTrue(result[0] == "OK")
return result
d = self.send_command("newfile HISTOGRAM_XY scratch")
d.addCallback(cb)
return d
def test_001_001_save(self):
def cb(result, *args, **kw):
self.assertTrue("OK" in result)
return result
d = self.send_command("save")
d.addCallback(cb)
return d
class Zulu(Able):
timeout = 5
def test_performance(self):
d = self.send_command("performance")
def cb3(result, *args, **kw):
#print "Performance:", result
junk, performance = result[0].split("=")
if not junk.startswith("Performance"):
raise Exception("Can't find Performance")
if float(performance) < 50:
raise Exception("Performance too low: %s" % performance)
if float(performance) > 100:
raise Exception("Performance too high %s" % performance)
d.addCallback(cb3)
return d
#class SicsTestCase(unittest.TestCase):
# callbacks = []
#
# def _test(command):
# d = defer.Deferred()
# self.d = d
# self.client.sendMessage(command)
# return d
#
# def test_status1(self):
# d = _test("status")
# return d
#
# def test_0000(self):
# d = defer.Deferred()
# self.d = d
# def cb2(result, *args, **kw):
# print "Login Callback", result, args
# print "Add cb2", d
# d = d.addCallback(cb2, "Fred2")
# #reactor.callLater(5, d.callback, False)
# return d
#
# def test_status(self):
# d = defer.Deferred()
# self.d = d
# self.client.sendMessage("status")
# #reactor.callLater(5, d.callback, False)
# return d
#
# def add_callback(self, cb):
# if not cb in self.callbacks:
# self.callbacks.append(cb)
#
# def del_callback(self, cb):
# if cb in self.callbacks:
# self.callbacks.remove(cb)
#
# def call_callbacks(self, data):
# d = self.d
# self.d = None
# print "Callback:", d, data
# for cb in self.callbacks:
# cb(self, data)
# if d:
# def cb3(result, *args, **kw):
# print "Xqt Callback", result, args
# print "Add cb3", d
# d.addCallback(cb3)
# print "Call Callback", d
# d.callback(data)
#

View File

@ -0,0 +1,33 @@
#!/usr/bin/env python
# vim: ts=8 sts=4 sw=4 expandtab
# Author: Douglas Clowes (dcl@ansto.gov.au) 2014-06-24
#
import test_sics
class Wombat(test_sics.Baker):
def test_wombat_000_000_motors(self):
if test_sics.instrument_name != "wombat":
raise unittest.SkipTest("Instrument is not Wombat")
missing = []
for m in ["mphi", "mchi", "mom", "mtth", "som", "stth"]:
if m not in test_sics.motors:
missing.append(m)
if test_sics.instrument_name == "wombat" and len(missing) > 0:
raise Exception("Missing motors:" + repr(missing))
def test_wombat_001_000_newfile(self):
def cb(result, *args, **kw):
self.assertTrue(result[0] == "OK")
return result
d = self.send_command("newfile HISTOGRAM_XY scratch")
d.addCallback(cb)
return d
def test_wombat_001_001_save(self):
def cb(result, *args, **kw):
self.assertTrue("OK" in result)
return result
d = self.send_command("save")
d.addCallback(cb)
return d