diff --git a/site_ansto/instrument/TEST_SICS/unit_tests/sics_test.py b/site_ansto/instrument/TEST_SICS/unit_tests/sics_test.py new file mode 100644 index 00000000..500c2907 --- /dev/null +++ b/site_ansto/instrument/TEST_SICS/unit_tests/sics_test.py @@ -0,0 +1,373 @@ +#!/usr/bin/env python +# vim: ts=8 sts=4 sw=4 expandtab +# Author: Douglas Clowes (dcl@ansto.gov.au) 2014-04-24 +# +# Looks like test cases are run alphabetically within classes +# and classes are run alphabetically too. +# So, Able is the base class derived from Testcase +# +from sics_proxy import SICSClientFactory, SicsProtocol +from sics_hdb import HipadabaTree + +from twisted.trial import unittest +from twisted.internet import reactor, protocol, defer + +import ConfigParser, StringIO + + +class Able(unittest.TestCase): + def setUp(self): + #print "Connecting ...", + self.factory = SICSClientFactory() + self.sics = None + self.deferred = defer.Deferred() + def cb_login(client, *args, **kw): + #print "cb_login:", client, args, kw + self.deferred.callback(True) + def cb_connect(client, *args, **kw): + #print "cb_connect:", client, client.login, args, kw + self.sics = client + self.sics.factory = self.factory + self.sics.login_deferred.addCallback(cb_login) + #print "Connected" + creator = protocol.ClientCreator(reactor, SicsProtocol) + d = creator.connectTCP("localhost", 60003) + d = d.addCallback(cb_connect) + return self.deferred + + def send_command(self, command): + d = self.sics.sendMessage(command) + #print "Command:", command, d + return d + + def load_hipadaba(self): + def cb0(result, *args, **kw): + with open("junksh.xml", "w") as outfile: + for line in result: + outfile.write(line + '\n') + if result[0][0] != '<': + raise Exception("Tree does not start with left angle:" + result[0]) + if result[-1] != '': + raise Exception("Tree does not end with :" + result[-1]) + self.hipadaba = HipadabaTree('\n'.join(result)) + self.tree_deferred.callback(self.hipadaba) + self.tree_deferred = defer.Deferred() + d = self.send_command("getgumtreexml /") + d.addCallback(cb0) + return self.tree_deferred + + def tearDown(self): + #print "tearDownModule" + if self.sics is not None: + self.sics.login_deferred = None + self.sics.command_deferred = None + self.sics.transport.loseConnection() + self.sics = None + self.deferred = None + +class Baker(Able): + timeout = 5 + def test_000_login(self): + debug = False + self.deferred = defer.Deferred() + d = self.sics.login_deferred + def cb0(result, *args, **kw): + if debug: + print "Login:", result, args, kw + if self.sics.login != 2: + raise Exception("Bad login:" + repr(self.sics.login)) + d = self.send_command("fileeval TEST_SICS/test_suite.tcl") + d.addCallback(cb1) + def cb1(result, *args, **kw): + if debug: + print "fileeval:", result, args, kw + if result[0] != "OK": + raise Exception("fileeval returned " + repr(result)) + d = self.send_command("tcl:test_suite::show_config ::config_dict") + d.addCallback(cb2) + def cb2(result, *args, **kw): + global config + if debug: + print "config:", result, args, kw + ini = StringIO.StringIO("\n".join(result)) + if debug: + print "ini:", ini.readlines() + ini.seek(0) + print "ini:", ini.read() + ini.seek(0) + print "ini:", dir(ini) + config = ConfigParser.SafeConfigParser() + if debug: + print "config:", repr(config) + config.readfp(ini) + if debug: + print "config:", repr(config) + print "config:", dir(config) + for section in sorted(config.sections()): + print ("[%s]\n" % section), + for option in sorted(config.options(section)): + print ("%s = %s\n" % (option, config.get(section, option))), + print + d = self.load_hipadaba() + d.addCallback(cb3) + def cb3(result, *args, **kw): + global hipadaba + hipadaba = result + self.deferred.callback(None) + d.addCallback(cb0) + return self.deferred + + def test_001_status(self): + d = self.send_command("status") + def cb3(result, *args, **kw): + #print "Status:", result + if not result[0].startswith("status = "): + raise Exception("Bad status:" + repr(result)) + d.addCallback(cb3) + return d + + def test_002_dir(self): + raise unittest.SkipTest("Not doing this test now") + self.deferred = defer.Deferred() + d = self.send_command("dir types") + self.result = 0 + def cb2(result, *args, **kw): + self.result += 1 + result = sorted([i.strip() for i in result]) + #print "Types:", result + for i in range(len(result)): + result[i] = " ".join(sorted(result[i].split())) + #print "Types:", result + if len(self.types) > 0: + type = self.types[0] + del self.types[0] + d = self.send_command("sicslist type %s" % type) + d.addCallback(cb2) + else: + #print "Dir:", self.result + self.deferred.callback(None) + def cb3(result, *args, **kw): + self.types = sorted([i.strip().replace(" ", "_") for i in result]) + #print "Types:", self.types + if len(self.types) > 0: + type = self.types[0] + del self.types[0] + d = self.send_command("sicslist type %s" % type) + d.addCallback(cb2) + else: + print "Dir: None" + self.deferred.callback(None) + d.addCallback(cb3) + return self.deferred + + def test_003_motor(self): + global motors + motors = [] + d = self.send_command("sicslist type motor") + def cb3(result, *args, **kw): + global motors + motors = sorted(result[0].lower().split()) + #print "Motors:", len(motors) + missing = [] + for m in ["m1", "m2", "s1", "s2", "a1", "a2"]: + if m not in motors: + missing.append(m) + if len(missing) > 0: + raise Exception("Missing motors:" + repr(missing)) + d.addCallback(cb3) + return d + + def test_004_000_xml(self): + #raise unittest.SkipTest("Not doing this test now") + d = self.send_command("getgumtreexml /") + def cb3(result, *args, **kw): + global root + #print "XML:", len(result) + with open("junk.xml", "w") as outfile: + for line in result: + outfile.write(line + '\n') + if "" != result[-1]: + raise Exception("XML is not complete, ends:" + repr(result[-1])) + txt = [] + for line in result: + txt.append(''.join(c for c in line if ord(c) >= 32)) + root = HipadabaTree('\n'.join(txt)).root + d.addCallback(cb3) + return d + + def test_004_001_xml(self): + print + print "Root:", root, root.attrib + for child in root.findall('component'): + print child, child.tag, child.attrib + #for grandchild in child.findall('component'): + # print " ", grandchild.attrib + + def test_004_002_xml(self): + print + child = root + for node in "/sample/ps9/".strip('/').split('/'): + children = [ch for ch in child.findall('component') if ch.attrib['id'].lower() == node.lower()] + print children + child = children[0] + print child + print child.attrib + print [ch.attrib['id'] for ch in child.findall('component')] + print [(ch.attrib['id'], [v.text for v in ch.findall('value')]) for ch in child.findall('property')] + + def test_005_drive(self): + global motors + if not "ds9" in motors: + raise unittest.SkipTest("Cannot drive motor ds9: does not exist") + d = self.send_command("drive ds9 0") + def cb3(result, *args, **kw): + #print "Drive:", result + if "Driving finished successfully" != result[-1]: + raise Exception("Drive is not complete, ends:" + repr(result[-1])) + d.addCallback(cb3) + return d + + def test_006_drive(self): + d = self.send_command("drive m1 20 m2 20 ") + def cb3(result, *args, **kw): + #print "Drive:", result + if "Driving finished successfully" != result[-1]: + raise Exception("Drive is not complete, ends:" + repr(result[-1])) + d.addCallback(cb3) + return d + + def test_007_mercury(self): + raise unittest.SkipTest("Not doing this test now (borken)") + global config, hipadaba + debug = False + def cb0(result, *args, **kw): + if debug: + print "\nMercury:", result.short_tree() + self.deferred.callback(result) + if "mercury_scpi" in config.sections() and config.get("mercury_scpi", "enabled"): + self.deferred = defer.Deferred() + d = self.load_hipadaba("/sample/%s" % config.get("mercury_scpi", "name")) + d.addCallback(cb0) + return self.deferred + else: + raise unittest.SkipTest("mercury_scpi is not configured") + test_007_mercury.timeout = 2 + + def test_008_mercury_hipadaba(self): + raise unittest.SkipTest("Not doing this test now (borken)") + global hipadaba + debug = False + if hipadaba is None: + raise unittest.SkipTest("hipadaba is missing") + else: + if False: + hipadaba.print_tree() + root_children = hipadaba.getChildren('/') + if debug: + print "Children:", root_children + root_children = [i.lower() for i in root_children] + self.assertTrue('sample' in root_children) + sample_children = hipadaba.getChildren('/sample') + if debug: + print "Children:", sample_children + sample_children = [i.lower() for i in sample_children] + self.assertTrue('tc9' in sample_children) + level_children = hipadaba.getChildren('/sample/tc9/level/') + if debug: + print "Children:", level_children + level_children = [i.lower() for i in level_children] + self.assertTrue('helium' in level_children) + self.assertTrue('nitrogen' in level_children) + he_props = hipadaba.getProperties('/sample/tc9/level/helium') + if debug: + print "Properties:", he_props + he_props = [i.lower() for i in he_props] + self.assertTrue('data' in he_props) + self.assertTrue('control' in he_props) + self.assertTrue('nxsave' in he_props) + self.assertTrue('nxalias' in he_props) + nxalias = hipadaba.getProperty('/sample/tc9/level/helium', 'NXalias') + if debug: + print "Prop:", nxalias + self.assertTrue(nxalias.lower() == 'tc9_level_helium'.lower()) + y = hipadaba.getValue('/sample/tc9/Level/helium') + if debug: + print "Value:", y + y = hipadaba.getNode('/sample/tc9/level/helium') + if debug: + print "Found:", hipadaba.short_tree(y) + if debug: + hipadaba.print_tree(y) + text = hipadaba.list_tree(y, props=True, indent=6) + print + for line in text: + print line + +class Zulu(Able): + def test_performance(self): + d = self.send_command("performance") + def cb3(result, *args, **kw): + #print "Performance:", result + junk, performance = result[0].split("=") + if not junk.startswith("Performance"): + raise Exception("Can't find Performance") + if float(performance) < 50: + raise Exception("Performance too low: %s" % performance) + if float(performance) > 100: + raise Exception("Performance too high %s" % performance) + d.addCallback(cb3) + return d + + +#class SicsTestCase(unittest.TestCase): +# callbacks = [] +# +# def _test(command): +# d = defer.Deferred() +# self.d = d +# self.client.sendMessage(command) +# return d +# +# def test_status1(self): +# d = _test("status") +# return d +# +# def test_0000(self): +# d = defer.Deferred() +# self.d = d +# def cb2(result, *args, **kw): +# print "Login Callback", result, args +# print "Add cb2", d +# d = d.addCallback(cb2, "Fred2") +# #reactor.callLater(5, d.callback, False) +# return d +# +# def test_status(self): +# d = defer.Deferred() +# self.d = d +# self.client.sendMessage("status") +# #reactor.callLater(5, d.callback, False) +# return d +# +# def add_callback(self, cb): +# if not cb in self.callbacks: +# self.callbacks.append(cb) +# +# def del_callback(self, cb): +# if cb in self.callbacks: +# self.callbacks.remove(cb) +# +# def call_callbacks(self, data): +# d = self.d +# self.d = None +# print "Callback:", d, data +# for cb in self.callbacks: +# cb(self, data) +# if d: +# def cb3(result, *args, **kw): +# print "Xqt Callback", result, args +# print "Add cb3", d +# d.addCallback(cb3) +# print "Call Callback", d +# d.callback(data) +#