#!/usr/bin/env python # vim: ts=8 sts=4 sw=4 expandtab # Author: Douglas Clowes (dcl@ansto.gov.au) 2014-04-24 # # Looks like test cases are run alphabetically within classes # and classes are run alphabetically too. # So, Able is the base class derived from Testcase # from sics_proxy import SICSClientFactory, SicsProtocol from sics_hdb import HipadabaTree from twisted.trial import unittest from twisted.internet import reactor, protocol, defer import ConfigParser, StringIO from datetime import datetime as dt class Able(unittest.TestCase): def setUp(self): debug = False if debug: print "Connecting ...", self.factory = SICSClientFactory() self.sics = None self.deferred = defer.Deferred() def cb_login(client, *args, **kw): if debug: print "cb_login:", client, args, kw if 'test_suite' not in globals(): d = self.load_test_suite() d.addCallback(cb_login, "Test_Suite") elif 'sicsconfig' not in globals(): d = self.load_sicsconfig() d.addCallback(cb_login, "SICS_Config") elif 'hipadaba' not in globals(): d = self.load_hipadaba() d.addCallback(cb_login, "HiPaDaBa") else: self.deferred.callback(True) def cb_connect(client, *args, **kw): if debug: print "cb_connect:", client, client.login, args, kw self.sics = client self.sics.factory = self.factory self.sics.login_deferred.addCallback(cb_login, "Login", Fred="Fred") if debug: print "Connected" creator = protocol.ClientCreator(reactor, SicsProtocol) d = creator.connectTCP("localhost", 60003) d = d.addCallback(cb_connect) return self.deferred def send_command(self, command): d = self.sics.sendMessage(command) #print "Command:", command, d return d def load_test_suite(self): debug = False def cb1(result, *args, **kw): global test_suite if debug: print "fileeval:", result, args, kw if result[0] != "OK": raise Exception("fileeval returned " + repr(result)) test_suite = True d = self.send_command("fileeval TEST_SICS/unit_tests/test_suite.tcl") d.addCallback(cb1) return d def load_sicsconfig(self): debug = False def cb2(result, *args, **kw): global sicsconfig with open("sicsconfig.ini", "w") as outfile: for line in result: outfile.write(line + '\n') if debug: print "config:", result, args, kw ini = StringIO.StringIO("\n".join(result)) if debug: print "ini:", ini.readlines() ini.seek(0) print "ini:", ini.read() ini.seek(0) print "ini:", dir(ini) sicsconfig = ConfigParser.SafeConfigParser() sicsconfig.readfp(ini) d = self.send_command("tcl:test_suite::show_config ::config_dict") d.addCallback(cb2) return d def load_hipadaba(self): def cb0(result, *args, **kw): global hipadaba with open("hipadaba.xml", "w") as outfile: for line in result: outfile.write(line + '\n') if result[0][0] != '<': raise Exception("Tree does not start with left angle:" + result[0]) if result[-1] != '': raise Exception("Tree does not end with :" + result[-1]) self.hipadaba = HipadabaTree('\n'.join(result)) hipadaba = self.hipadaba d = self.send_command("tcl:test_suite::getsicsxml /") d.addCallback(cb0) return d def tearDown(self): #print "tearDownModule" if self.sics is not None: self.sics.login_deferred = None self.sics.command_deferred = None self.sics.transport.loseConnection() self.sics = None self.deferred = None class Baker(Able): timeout = 5 def test_000_000_login(self): debug = False def cb0(result, *args, **kw): if debug: print "Login:", result, args, kw if self.sics.login != 2: raise Exception("Bad login:" + repr(self.sics.login)) d = self.sics.login_deferred d.addCallback(cb0) return d test_000_000_login.timeout = 10 def test_000_001_fileeval(self): self.assertTrue('test_suite' in globals()) self.assertTrue(test_suite) def test_000_002_config(self): debug = False self.assertTrue('sicsconfig' in globals()) if debug: print "config:", repr(sicsconfig) print "config:", dir(sicsconfig) for section in sorted(sicsconfig.sections()): print ("[%s]\n" % section), for option in sorted(sicsconfig.options(section)): print ("%s = %s\n" % (option, sicsconfig.get(section, option))), print def test_000_003_hipadaba(self): debug = False self.assertTrue('hipadaba' in globals()) def test_001_001_clock(self): debug = False self.deferred = defer.Deferred() def cb2(result, *args, **kw): if debug: print "Clock:", result formatted = dt.fromtimestamp(1399866027).strftime("%Y-%b-%d %H:%M:%S") self.assertEqual(result[0], formatted) self.deferred.callback(None) def cb3(result, *args, **kw): if debug: print "Clock:", result if not result[0].isdigit(): raise Exception("Bad response:" + repr(result)) if not 1399865694 <= int(result[0]) < 1499865694: raise Exception("Bad value:" + repr(result)) d = self.send_command("tcl:clock format 1399866027 -format \"%Y-%h-%d %T\"") d.addCallback(cb2) d = self.send_command("tcl:clock seconds") d.addCallback(cb3) return self.deferred def test_001_002_hipadaba(self): debug = False self.assertTrue('hipadaba' in globals()) def test_001_003_status(self): d = self.send_command("status") def cb3(result, *args, **kw): #print "Status:", result if not result[0].startswith("status = "): raise Exception("Bad status:" + repr(result)) d.addCallback(cb3) return d def Test_002_000_dir(self): raise unittest.SkipTest("Not doing this test now") self.deferred = defer.Deferred() d = self.send_command("dir types") self.result = 0 def cb2(result, *args, **kw): self.result += 1 result = sorted([i.strip() for i in result]) #print "Types:", result for i in range(len(result)): result[i] = " ".join(sorted(result[i].split())) #print "Types:", result if len(self.types) > 0: type = self.types[0] del self.types[0] d = self.send_command("sicslist type %s" % type) d.addCallback(cb2) else: #print "Dir:", self.result self.deferred.callback(None) def cb3(result, *args, **kw): self.types = sorted([i.strip().replace(" ", "_") for i in result]) #print "Types:", self.types if len(self.types) > 0: type = self.types[0] del self.types[0] d = self.send_command("sicslist type %s" % type) d.addCallback(cb2) else: print "Dir: None" self.deferred.callback(None) d.addCallback(cb3) return self.deferred def test_003_000_motors(self): global motors motors = [] d = self.send_command("sicslist type motor") def cb3(result, *args, **kw): global motors motors = sorted(result[0].lower().split()) #print "Motors:", len(motors) missing = [] for m in ["m1", "m2", "s1", "s2", "a1", "a2"]: if m not in motors: missing.append(m) if len(missing) > 0: raise Exception("Missing motors:" + repr(missing)) d.addCallback(cb3) return d def test_004_000_gumtreexml(self): #raise unittest.SkipTest("Not doing this test now") d = self.send_command("getgumtreexml /") def cb3(result, *args, **kw): global gumtreexml #print "XML:", len(result) with open("gumtree.xml", "w") as outfile: for line in result: outfile.write(line + '\n') if "" != result[-1]: raise Exception("XML is not complete, ends:" + repr(result[-1])) txt = [] for line in result: txt.append(''.join(c for c in line if ord(c) >= 32)) gumtreexml = HipadabaTree('\n'.join(txt)).root d.addCallback(cb3) return d def test_004_001_gumtreexml(self): self.assertTrue(gumtreexml.attrib == {}) id_list = [] for child in gumtreexml.findall('component'): id_list.append(child.attrib['id']) #print child, child.tag, child.attrib #for grandchild in child.findall('component'): # print " ", grandchild.attrib self.assertTrue('commands' in id_list) self.assertTrue('instrument' in id_list) self.assertTrue('sample' in id_list) self.assertTrue('monitor' in id_list) self.assertTrue('user' in id_list) self.assertTrue('experiment' in id_list) self.assertTrue('control' in id_list) def test_004_002_gumtreexml(self): global sicsconfig if not ('pfeiffer_hg' in sicsconfig.sections() and sicsconfig.get("pfeiffer_hg", "enabled") == "True"): raise unittest.SkipTest("Pfeiffer PS9 is not configured") child = gumtreexml for node in "/sample/ps9/".strip('/').split('/'): children = [ch for ch in child.findall('component') if ch.attrib['id'].lower() == node.lower()] self.assertTrue(len(children) > 0) child = children[0] self.assertTrue(len(child.attrib) > 0) self.assertTrue(len([ch.attrib['id'] for ch in child.findall('component')]) > 0) self.assertTrue(len([(ch.attrib['id'], [v.text for v in ch.findall('value')]) for ch in child.findall('property')]) > 0) def test_004_003_gumtreexmlvalues(self): #raise unittest.SkipTest("Not doing this test now") d = self.send_command("getgumtreexmlvalues /") def cb3(result, *args, **kw): global gumtreexml #print "XML:", len(result) if "" != result[-1]: raise Exception("XML is not complete, ends:" + repr(result[-1])) txt = [] for line in result: txt.append(''.join(c for c in line if ord(c) >= 32)) gumtreexml = HipadabaTree('\n'.join(txt)).root d.addCallback(cb3) return d def test_004_004_gumtreexmlvalues(self): self.assertTrue(gumtreexml.attrib == {}) id_list = [] for child in gumtreexml.findall('component'): id_list.append(child.attrib['id']) #print child, child.tag, child.attrib #for grandchild in child.findall('component'): # print " ", grandchild.attrib self.assertTrue('commands' in id_list) self.assertTrue('instrument' in id_list) self.assertTrue('sample' in id_list) self.assertTrue('monitor' in id_list) self.assertTrue('user' in id_list) self.assertTrue('experiment' in id_list) self.assertTrue('control' in id_list) def Test_005_000_drive_skip(self): """Skipping example Skips on the basis that the equipment to be tested is not configured """ global motors if not "ds9" in motors: raise unittest.SkipTest("Cannot drive motor ds9: does not exist") d = self.send_command("drive ds9 0") def cb3(result, *args, **kw): #print "Drive:", result if "Driving finished successfully" != result[-1]: raise Exception("Drive is not complete, ends:" + repr(result[-1])) d.addCallback(cb3) return d def test_006_000_drive_zero(self): """Drive to where the motors already are Should finish quickly """ self.deferred = defer.Deferred() def cb3(result, *args, **kw): #print "Drive:", result if "Driving finished successfully" != result[-1]: raise Exception("Drive is not complete, ends:" + repr(result[-1])) for item in result: if item.startswith("New m1 position:"): self.assertTrue(round(float(item.split(":")[1]) - self.m1, 3) == 0) if item.startswith("New m2 position:"): self.assertTrue(round(float(item.split(":")[1]) - self.m2, 3) == 0) self.deferred.callback(True) def cb2(result, *args, **kw): #print "m2:", result if not result[0].startswith("m2 = "): raise Exception("Unexpected m2 response: " + repr(result)) self.m2 = float(result[0].split("=")[1]) d = self.send_command("drive m1 %.3f m2 %.3f" % (self.m1, self.m2)) d.addCallback(cb3) def cb1(result, *args, **kw): #print "m1:", result if not result[0].startswith("m1 = "): raise Exception("Unexpected m1 response: " + repr(result)) self.m1 = float(result[0].split("=")[1]) d = self.send_command("m2") d.addCallback(cb2) d = self.send_command("m1") d.addCallback(cb1) return self.deferred test_006_000_drive_zero.timeout = 5 def test_006_001_drive_one(self): """Drive to where the motors already are Should finish quickly """ self.deferred = defer.Deferred() def cb3(result, *args, **kw): #print "Drive:", result if "Driving finished successfully" != result[-1]: raise Exception("Drive is not complete, ends:" + repr(result[-1])) for item in result: if item.startswith("New m1 position:"): self.assertTrue(round(float(item.split(":")[1]) - self.m1, 2) == 0, "Motor m1 position %f not equal target %f" % (float(item.split(":")[1]), self.m1)) if item.startswith("New m2 position:"): self.assertTrue(round(float(item.split(":")[1]) - self.m2, 2) == 0, "Motor m2 position %f not equal target %f" % (float(item.split(":")[1]), self.m2)) self.deferred.callback(True) def cb2(result, *args, **kw): #print "m2:", result if not result[0].startswith("m2 = "): raise Exception("Unexpected m2 response: " + repr(result)) self.m2 = float(result[0].split("=")[1]) if self.m1 > 20.1: self.m1 = round(self.m1 - 1, 0) else: self.m1 = round(self.m1 + 1, 0) if self.m2 > 20.1: self.m2 = round(self.m2 - 1, 0) else: self.m2 = round(self.m2 + 1, 0) d = self.send_command("drive m1 %.3f m2 %.3f" % (self.m1, self.m2)) d.addCallback(cb3) def cb1(result, *args, **kw): #print "m1:", result if not result[0].startswith("m1 = "): raise Exception("Unexpected m1 response: " + repr(result)) self.m1 = float(result[0].split("=")[1]) d = self.send_command("m2") d.addCallback(cb2) d = self.send_command("m1") d.addCallback(cb1) return self.deferred test_006_001_drive_one.timeout = 30 def Test_007_000_mercury(self): raise unittest.SkipTest("Not doing this test now (borken)") global sicsconfig debug = False def cb0(result, *args, **kw): if debug: print "\nMercury:", result.short_tree() self.deferred.callback(result) if not ("mercury_scpi" in sicsconfig.sections() and sicsconfig.get("mercury_scpi", "enabled") == "True"): raise unittest.SkipTest("mercury_scpi is not configured") self.deferred = defer.Deferred() d = self.load_hipadaba("/sample/%s" % sicsconfig.get("mercury_scpi", "name")) d.addCallback(cb0) return self.deferred Test_007_000_mercury.timeout = 2 def test_008_000_mercury_hipadaba(self): """Test the hipadaba tree and mechanism """ global hipadaba debug = False self.assertTrue('hipadaba' in globals()) self.assertFalse(hipadaba is None, "hipadaba is missing") if not ('mercury_scpi' in sicsconfig.sections() and sicsconfig.get("mercury_scpi", "enabled") == "True"): raise unittest.SkipTest("Mercury TC9 is not configured") if False: hipadaba.print_tree() root_children = hipadaba.getChildren('/') if debug: print "Children:", root_children root_children = [i.lower() for i in root_children] self.assertTrue('sample' in root_children) sample_children = hipadaba.getChildren('/sample') if debug: print "Children:", sample_children sample_children = [i.lower() for i in sample_children] self.assertTrue('tc9' in sample_children) level_children = hipadaba.getChildren('/sample/tc9/level/') if debug: print "Children:", level_children level_children = [i.lower() for i in level_children] self.assertTrue('helium' in level_children) self.assertTrue('nitrogen' in level_children) he_props = hipadaba.getProperties('/sample/tc9/level/helium') if debug: print "Properties:", he_props he_props = [i.lower() for i in he_props] self.assertTrue('data' in he_props) self.assertTrue('control' in he_props) self.assertTrue('nxsave' in he_props) self.assertTrue('nxalias' in he_props) nxalias = hipadaba.getProperty('/sample/tc9/level/helium', 'NXalias') if debug: print "Prop:", nxalias self.assertTrue(nxalias.lower() == 'tc9_level_helium'.lower()) y = hipadaba.getValue('/sample/tc9/Level/helium') if debug: print "Value:", y y = hipadaba.getNode('/sample/tc9/level/helium') if debug: print "Found:", hipadaba.short_tree(y) if debug: hipadaba.print_tree(y) text = hipadaba.list_tree(y, props=True, indent=6) if debug: print "Tree (/sample/tc9/level/helium):" for line in text: print line class Zulu(Able): def test_performance(self): d = self.send_command("performance") def cb3(result, *args, **kw): #print "Performance:", result junk, performance = result[0].split("=") if not junk.startswith("Performance"): raise Exception("Can't find Performance") if float(performance) < 50: raise Exception("Performance too low: %s" % performance) if float(performance) > 100: raise Exception("Performance too high %s" % performance) d.addCallback(cb3) return d #class SicsTestCase(unittest.TestCase): # callbacks = [] # # def _test(command): # d = defer.Deferred() # self.d = d # self.client.sendMessage(command) # return d # # def test_status1(self): # d = _test("status") # return d # # def test_0000(self): # d = defer.Deferred() # self.d = d # def cb2(result, *args, **kw): # print "Login Callback", result, args # print "Add cb2", d # d = d.addCallback(cb2, "Fred2") # #reactor.callLater(5, d.callback, False) # return d # # def test_status(self): # d = defer.Deferred() # self.d = d # self.client.sendMessage("status") # #reactor.callLater(5, d.callback, False) # return d # # def add_callback(self, cb): # if not cb in self.callbacks: # self.callbacks.append(cb) # # def del_callback(self, cb): # if cb in self.callbacks: # self.callbacks.remove(cb) # # def call_callbacks(self, data): # d = self.d # self.d = None # print "Callback:", d, data # for cb in self.callbacks: # cb(self, data) # if d: # def cb3(result, *args, **kw): # print "Xqt Callback", result, args # print "Add cb3", d # d.addCallback(cb3) # print "Call Callback", d # d.callback(data) #