#!/usr/bin/env python # vim: ts=8 sts=4 sw=4 expandtab # Author: Douglas Clowes (dcl@ansto.gov.au) 2014-04-24 # # Looks like test cases are run alphabetically within classes # and classes are run alphabetically too. # So, Able is the base class derived from Testcase # from sics_proxy import SICSClientFactory, SicsProtocol from sics_hdb import HipadabaTree from twisted.python import log from twisted.trial import unittest from twisted.internet import reactor, protocol, defer import ConfigParser, StringIO from datetime import datetime as dt class Able(unittest.TestCase): def setUp(self): debug = False if debug: log.msg("Connecting ...") self.factory = SICSClientFactory() self.sics = None self.deferred = defer.Deferred() def cb_login(client, *args, **kw): if debug: print "cb_login:", client, args, kw if 'instrument_name' not in globals(): d = self.get_instrument_name() d.addCallback(cb_login, "MyName") elif 'simulation_type' not in globals(): d = self.get_simulation_type() d.addCallback(cb_login, "MySIM") elif 'test_suite' not in globals(): d = self.load_test_suite() d.addCallback(cb_login, "Test_Suite") elif 'sicsconfig' not in globals(): d = self.load_sicsconfig() d.addCallback(cb_login, "SICS_Config") elif 'hipadaba' not in globals(): d = self.load_hipadaba() d.addCallback(cb_login, "HiPaDaBa") else: self.deferred.callback(True) return client def cb_connect(client, *args, **kw): if debug: print "cb_connect:", client, client.login, args, kw self.sics = client self.sics.factory = self.factory self.sics.login_deferred.addCallback(cb_login, "Login", Fred="Fred") if debug: log.msg("Connected") return client creator = protocol.ClientCreator(reactor, SicsProtocol) d = creator.connectTCP("localhost", 60003) d = d.addCallback(cb_connect) return self.deferred def send_command(self, command): def cb(result, *args, **kw): log.msg("Result: " + repr(result)) log.msg("Args: " + repr(args)) log.msg("Kw: " + repr(kw)) return result log.msg("Command: " + repr(command)) d = self.sics.sendMessage(command) d.addCallback(cb) return d def get_instrument_name(self): debug = False def cb1(result, *args, **kw): global instrument_name if debug: print "instrument:", result, args, kw self.assertTrue(result[0].lower().startswith("instrument = ")) instrument_name = result[0].split("=")[1].strip().lower() if debug: print "Instrument:", instrument_name if instrument_name == "quokka": Quokka.skip = False if instrument_name == "taipan": Taipan.skip = False if instrument_name == "wombat": Wombat.skip = False return result d = self.send_command("instrument") d.addCallback(cb1) return d def get_simulation_type(self): debug = False def cb1(result, *args, **kw): global simulation_type if debug: print "SICS_SIMULATION:", result, args, kw self.assertTrue(result[0].lower().startswith("sics_simulation = ")) simulation_type = result[0].split("=")[1].strip().lower() if debug: print "SICS_SIMULATION:", simulation_type return result d = self.send_command("sics_simulation") d.addCallback(cb1) return d def load_test_suite(self): debug = False def cb1(result, *args, **kw): global test_suite if debug: print "fileeval:", result, args, kw if result[0] != "OK": raise Exception("fileeval returned " + repr(result)) test_suite = True return result d = self.send_command("fileeval TEST_SICS/unit_tests/test_suite.tcl") d.addCallback(cb1) return d def load_sicsconfig(self): debug = False def cb2(result, *args, **kw): global sicsconfig with open("sicsconfig.ini", "w") as outfile: for line in result: outfile.write(line + '\n') if debug: print "config:", result, args, kw self.assertFalse(result[0].startswith("can't read")) self.assertFalse(result[0].startswith("... No dictionary ...")) ini = StringIO.StringIO("\n".join(result)) if debug: print "ini:", ini.readlines() ini.seek(0) print "ini:", ini.read() ini.seek(0) print "ini:", dir(ini) sicsconfig = ConfigParser.SafeConfigParser() sicsconfig.readfp(ini) return result d = self.send_command("tcl:test_suite::show_config ::config_dict") d.addCallback(cb2) return d def load_hipadaba(self): def cb0(result, *args, **kw): global hipadaba with open("hipadaba.xml", "w") as outfile: for line in result: outfile.write(line + '\n') if result[0][0] != '<': raise Exception("Tree does not start with left angle:" + result[0]) if result[-1] != '': raise Exception("Tree does not end with :" + result[-1]) self.hipadaba = HipadabaTree('\n'.join(result)) hipadaba = self.hipadaba return result d = self.send_command("tcl:test_suite::getsicsxml /") d.addCallback(cb0) return d def tearDown(self): #print "tearDownModule" if self.sics is not None: self.sics.login_deferred = None self.sics.command_deferred = None self.sics.transport.loseConnection() self.sics = None self.deferred = None class Baker(Able): timeout = 5 def test_000_000_login(self): debug = False def cb0(result, *args, **kw): if debug: print "Login:", result, args, kw if self.sics.login != 2: raise Exception("Bad login:" + repr(self.sics.login)) return result d = self.sics.login_deferred d.addCallback(cb0) return d test_000_000_login.timeout = 30 def test_000_001_fileeval(self): self.assertTrue('test_suite' in globals()) self.assertTrue(test_suite) def test_000_002_config(self): debug = False self.assertTrue('sicsconfig' in globals()) if debug: print "config:", repr(sicsconfig) print "config:", dir(sicsconfig) for section in sorted(sicsconfig.sections()): print ("[%s]\n" % section), for option in sorted(sicsconfig.options(section)): print ("%s = %s\n" % (option, sicsconfig.get(section, option))), print def test_000_003_hipadaba(self): debug = False self.assertTrue('hipadaba' in globals()) def test_001_001_clock(self): debug = False self.deferred = defer.Deferred() def cb2(result, *args, **kw): if debug: print "Clock:", result formatted = dt.fromtimestamp(1399866027).strftime("%Y-%b-%d %H:%M:%S") self.assertEqual(result[0], formatted) self.deferred.callback(None) return result def cb3(result, *args, **kw): if debug: print "Clock:", result if not result[0].isdigit(): raise Exception("Bad response:" + repr(result)) if not 1399865694 <= int(result[0]) < 1499865694: raise Exception("Bad value:" + repr(result)) d = self.send_command("tcl:clock format 1399866027 -format \"%Y-%h-%d %T\"") d.addCallback(cb2) return result d = self.send_command("tcl:clock seconds") d.addCallback(cb3) return self.deferred def test_001_002_hipadaba(self): debug = False self.assertTrue('hipadaba' in globals()) def test_001_003_status(self): d = self.send_command("status") def cb3(result, *args, **kw): #print "Status:", result if not result[0].startswith("status = "): raise Exception("Bad status:" + repr(result)) return result d.addCallback(cb3) return d def Test_002_000_dir(self): raise unittest.SkipTest("Not doing this test now") self.deferred = defer.Deferred() d = self.send_command("dir types") self.result = 0 def cb2(result, *args, **kw): self.result += 1 result = sorted([i.strip() for i in result]) #print "Types:", result for i in range(len(result)): result[i] = " ".join(sorted(result[i].split())) #print "Types:", result if len(self.types) > 0: type = self.types[0] del self.types[0] d = self.send_command("sicslist type %s" % type) d.addCallback(cb2) else: #print "Dir:", self.result self.deferred.callback(None) return result def cb3(result, *args, **kw): self.types = sorted([i.strip().replace(" ", "_") for i in result]) #print "Types:", self.types if len(self.types) > 0: type = self.types[0] del self.types[0] d = self.send_command("sicslist type %s" % type) d.addCallback(cb2) else: print "Dir: None" self.deferred.callback(None) return result d.addCallback(cb3) return self.deferred def test_003_000_motors(self): global motors motors = [] d = self.send_command("sicslist type motor") def cb3(result, *args, **kw): global motors motors = sorted(result[0].lower().split()) #print "Motors:", len(motors) missing = [] for m in ["m1", "m2", "s1", "s2", "a1", "a2"]: if m not in motors: missing.append(m) if instrument_name == "taipan" and len(missing) > 0: raise Exception("Missing motors:" + repr(missing)) return result d.addCallback(cb3) return d def test_003_001_motors(self): debug = False if instrument_name != "taipan": raise unittest.SkipTest("Cannot test motor m1, m2: not taipan") def cb(result, *args, **kw): if debug: print "\nLV:", result, args, kw phase = args[0] target = 0 target += 1 if phase == target: if result[0].startswith("SIMULATE"): d = self.send_command("m2 send lv") d.addCallback(cb, phase + 1) else: raise unittest.SkipTest("Motor is not fake") return result target += 1 if phase == target: if result[0].startswith("SIMULATE"): d = self.send_command("s1 send lv") d.addCallback(cb, phase + 1) else: raise unittest.SkipTest("Motor is not fake") return result target += 1 if phase == target: self.assertTrue(result[0].startswith("SIMULATE")) self.deferred.callback(True) return result self.deferred = defer.Deferred(); d = self.send_command("m1 send lv") d.addCallback(cb, 1) return self.deferred def test_004_000_gumtreexml(self): #raise unittest.SkipTest("Not doing this test now") d = self.send_command("getgumtreexml /") def cb3(result, *args, **kw): global gumtreexml #print "XML:", len(result) with open("gumtree.xml", "w") as outfile: for line in result: outfile.write(line + '\n') if "" != result[-1]: raise Exception("XML is not complete, ends:" + repr(result[-1])) txt = [] for line in result: txt.append(''.join(c for c in line if ord(c) >= 32)) gumtreexml = HipadabaTree('\n'.join(txt)).root return result d.addCallback(cb3) return d def test_004_001_gumtreexml(self): self.assertTrue(gumtreexml.attrib == {}) id_list = [] for child in gumtreexml.findall('component'): id_list.append(child.attrib['id']) #print child, child.tag, child.attrib #for grandchild in child.findall('component'): # print " ", grandchild.attrib self.assertTrue('commands' in id_list) self.assertTrue('instrument' in id_list) self.assertTrue('sample' in id_list) self.assertTrue('monitor' in id_list) self.assertTrue('user' in id_list) self.assertTrue('experiment' in id_list) self.assertTrue('control' in id_list) def test_004_002_gumtreexml(self): global sicsconfig if not (\ 'pfeiffer_hg' in sicsconfig.sections() and \ "enabled" in sicsconfig.options("pfeiffer_hg") and \ sicsconfig.get("pfeiffer_hg", "enabled") == "True"): raise unittest.SkipTest("Pfeiffer PS9 is not configured") child = gumtreexml for node in "/sample/ps9/".strip('/').split('/'): children = [ch for ch in child.findall('component') if ch.attrib['id'].lower() == node.lower()] self.assertTrue(len(children) > 0) child = children[0] self.assertTrue(len(child.attrib) > 0) self.assertTrue(len([ch.attrib['id'] for ch in child.findall('component')]) > 0) self.assertTrue(len([(ch.attrib['id'], [v.text for v in ch.findall('value')]) for ch in child.findall('property')]) > 0) def test_004_003_gumtreexmlvalues(self): #raise unittest.SkipTest("Not doing this test now") d = self.send_command("getgumtreexmlvalues /") def cb3(result, *args, **kw): global gumtreexml #print "XML:", len(result) if "" != result[-1]: raise Exception("XML is not complete, ends:" + repr(result[-1])) txt = [] for line in result: txt.append(''.join(c for c in line if ord(c) >= 32)) gumtreexml = HipadabaTree('\n'.join(txt)).root return result d.addCallback(cb3) return d def test_004_004_gumtreexmlvalues(self): self.assertTrue(gumtreexml.attrib == {}) id_list = [] for child in gumtreexml.findall('component'): id_list.append(child.attrib['id']) #print child, child.tag, child.attrib #for grandchild in child.findall('component'): # print " ", grandchild.attrib self.assertTrue('commands' in id_list) self.assertTrue('instrument' in id_list) self.assertTrue('sample' in id_list) self.assertTrue('monitor' in id_list) self.assertTrue('user' in id_list) self.assertTrue('experiment' in id_list) self.assertTrue('control' in id_list) def Test_005_000_drive_skip(self): """Skipping example Skips on the basis that the equipment to be tested is not configured """ global motors if not "ds9" in motors: raise unittest.SkipTest("Cannot drive motor ds9: does not exist") d = self.send_command("drive ds9 0") def cb3(result, *args, **kw): #print "Drive:", result if "Driving finished successfully" != result[-1]: raise Exception("Drive is not complete, ends:" + repr(result[-1])) return result d.addCallback(cb3) return d def test_006_000_drive_zero(self): """Drive to where the motors already are Should finish quickly """ if instrument_name != "taipan": raise unittest.SkipTest("Cannot drive motor m1, m2: not taipan") self.deferred = defer.Deferred() def cb3(result, *args, **kw): #print "Drive:", result if "Driving finished successfully" != result[-1]: raise Exception("Drive is not complete, ends:" + repr(result[-1])) for item in result: if item.startswith("New m1 position:"): self.assertTrue(round(float(item.split(":")[1]) - self.m1, 3) == 0) if item.startswith("New m2 position:"): self.assertTrue(round(float(item.split(":")[1]) - self.m2, 3) == 0) self.deferred.callback(True) return result def cb2(result, *args, **kw): #print "m2:", result if not result[0].startswith("m2 = "): raise Exception("Unexpected m2 response: " + repr(result)) self.m2 = float(result[0].split("=")[1]) d = self.send_command("drive m1 %.3f m2 %.3f" % (self.m1, self.m2)) d.addCallback(cb3) return result def cb1(result, *args, **kw): #print "m1:", result if not result[0].startswith("m1 = "): raise Exception("Unexpected m1 response: " + repr(result)) self.m1 = float(result[0].split("=")[1]) d = self.send_command("m2") d.addCallback(cb2) return result d = self.send_command("m1") d.addCallback(cb1) return self.deferred test_006_000_drive_zero.timeout = 5 def test_006_001_drive_one(self): """Drive to where the motors already are Should finish quickly """ if instrument_name != "taipan": raise unittest.SkipTest("Cannot drive motor m1, m2: not taipan") self.deferred = defer.Deferred() def cb3(result, *args, **kw): #print "Drive:", result if "Driving finished successfully" != result[-1]: raise Exception("Drive is not complete, ends:" + repr(result[-1])) for item in result: if item.startswith("New m1 position:"): self.assertTrue(round(float(item.split(":")[1]) - self.m1, 2) == 0, "Motor m1 position %f not equal target %f" % (float(item.split(":")[1]), self.m1)) if item.startswith("New m2 position:"): self.assertTrue(round(float(item.split(":")[1]) - self.m2, 2) == 0, "Motor m2 position %f not equal target %f" % (float(item.split(":")[1]), self.m2)) self.deferred.callback(True) return result def cb2(result, *args, **kw): #print "m2:", result if not result[0].startswith("m2 = "): raise Exception("Unexpected m2 response: " + repr(result)) self.m2 = float(result[0].split("=")[1]) if self.m1 > 20.1: self.m1 = round(self.m1 - 1, 0) else: self.m1 = round(self.m1 + 1, 0) if self.m2 > 20.1: self.m2 = round(self.m2 - 1, 0) else: self.m2 = round(self.m2 + 1, 0) d = self.send_command("drive m1 %.3f m2 %.3f" % (self.m1, self.m2)) d.addCallback(cb3) return result def cb1(result, *args, **kw): #print "m1:", result if not result[0].startswith("m1 = "): raise Exception("Unexpected m1 response: " + repr(result)) self.m1 = float(result[0].split("=")[1]) d = self.send_command("m2") d.addCallback(cb2) return result d = self.send_command("m1") d.addCallback(cb1) return self.deferred test_006_001_drive_one.timeout = 30 def Test_007_000_mercury(self): raise unittest.SkipTest("Not doing this test now (borken)") global sicsconfig debug = False def cb0(result, *args, **kw): if debug: print "\nMercury:", result.short_tree() self.deferred.callback(result) return result if not ("mercury_scpi" in sicsconfig.sections() and \ "enabled" in sicsconfig.options("mercury_scpi") and \ sicsconfig.get("mercury_scpi", "enabled") == "True"): raise unittest.SkipTest("mercury_scpi is not configured") self.deferred = defer.Deferred() d = self.load_hipadaba("/sample/%s" % sicsconfig.get("mercury_scpi", "name")) d.addCallback(cb0) return self.deferred Test_007_000_mercury.timeout = 2 def test_008_000_mercury_hipadaba(self): """Test the hipadaba tree and mechanism """ global hipadaba debug = False self.assertTrue('hipadaba' in globals()) self.assertFalse(hipadaba is None, "hipadaba is missing") if not ('mercury_scpi' in sicsconfig.sections() and \ "enabled" in sicsconfig.options("mercury_scpi") and \ sicsconfig.get("mercury_scpi", "enabled") == "True"): raise unittest.SkipTest("Mercury TC9 is not configured") if False: hipadaba.print_tree() root_children = hipadaba.getChildren('/') if debug: print "Children:", root_children root_children = [i.lower() for i in root_children] self.assertTrue('sample' in root_children) sample_children = hipadaba.getChildren('/sample') if debug: print "Children:", sample_children sample_children = [i.lower() for i in sample_children] self.assertTrue('tc9' in sample_children) level_children = hipadaba.getChildren('/sample/tc9/level/') if debug: print "Children:", level_children level_children = [i.lower() for i in level_children] self.assertTrue('helium' in level_children) self.assertTrue('nitrogen' in level_children) he_props = hipadaba.getProperties('/sample/tc9/level/helium') if debug: print "Properties:", he_props he_props = [i.lower() for i in he_props] self.assertTrue('data' in he_props) self.assertTrue('control' in he_props) self.assertTrue('nxsave' in he_props) self.assertTrue('nxalias' in he_props) nxalias = hipadaba.getProperty('/sample/tc9/level/helium', 'NXalias') if debug: print "Prop:", nxalias self.assertTrue(nxalias.lower() == 'tc9_level_helium'.lower()) y = hipadaba.getValue('/sample/tc9/Level/helium') if debug: print "Value:", y y = hipadaba.getNode('/sample/tc9/level/helium') if debug: print "Found:", hipadaba.short_tree(y) if debug: hipadaba.print_tree(y) text = hipadaba.list_tree(y, props=True, indent=6) if debug: print "Tree (/sample/tc9/level/helium):" for line in text: print line class Posit(Able): timeout = 5 def test_000_000_posit(self): debug = False if instrument_name != "taipan": raise unittest.SkipTest("Cannot posit motor m1, m2: not taipan") def cb(result, *args, **kw): if debug: print "\nposit:", result, args, kw phase = args[0] target = 1 if phase == target: d = self.send_command("m1 positions 10 20 30") d.addCallback(cb, phase + 1) return result target += 1 if phase == target: d = self.send_command("m1 position_names One Two Three") d.addCallback(cb, phase + 1) return result target += 1 if phase == target: d = self.send_command("m1 position_names") d.addCallback(cb, phase + 1) return result target += 1 if phase == target: d = self.send_command("m1 softzero 1.0") d.addCallback(cb, phase + 1) return result target += 1 if phase == target: d = self.send_command("m1 posit2hard three") d.addCallback(cb, phase + 1) return result target += 1 if phase == target: d = self.send_command("m1 posit2soft three") d.addCallback(cb, phase + 1) return result target += 1 if phase == target: d = self.send_command("m1 positions erase") d.addCallback(cb, phase + 1) return result target += 1 if phase == target: d = self.send_command("m1 softzero 0.0") d.addCallback(cb, phase + 1) return result self.deferred.callback(True) return result self.deferred = defer.Deferred(); d = self.send_command("m1 positions erase") d.addCallback(cb, 1) return self.deferred class Python(Able): timeout = 5 def test_001_000_python(self): debug = False def cb(result, *args, **kw): global python_installed if debug: print "\npython:", result, args, kw phase = args[0] target = 0 target += 1 if phase == target: if result[0].startswith("python=obj, type=python,"): python_installed = True else: python_installed = False self.deferred.callback(True) return result self.deferred = defer.Deferred(); d = self.send_command("sicslist python") d.addCallback(cb, 1) return self.deferred def test_002_000_python(self): debug = False if debug: print "Python_Installed:", python_installed if not python_installed: raise unittest.SkipTest("Python is not installed") def cb(result, *args, **kw): if debug: print "\npython:", result, args, kw phase = args[0] target = 0 target += 1 if phase == target: if result[0].startswith("Hello Sailor"): d = self.send_command("python sics.puts(sics.sics(\"clientput Hello World\"))") d.addCallback(cb, phase + 1) return result target += 1 if phase == target: self.assertTrue(result[0].startswith("Hello World")) self.deferred.callback(True) return result self.deferred = defer.Deferred(); d = self.send_command("python sics.puts(\"Hello Sailor\")") d.addCallback(cb, 1) return self.deferred class Quokka(Able): skip = True def test_000_000_motors(self): global motors if instrument_name != "quokka": raise unittest.SkipTest("Instrument is not Quokka") missing = [] for m in ["samx", "samy", "samz", "det", "detoff", "bsx"]: if m not in motors: missing.append(m) if instrument_name == "taipan" and len(missing) > 0: raise Exception("Missing motors:" + repr(missing)) class Taipan(Able): skip = True def test_000_000_motors(self): global motors if instrument_name != "taipan": raise unittest.SkipTest("Instrument is not Taipan") missing = [] for m in ["m1", "m2", "s1", "s2", "a1", "a2"]: if m not in motors: missing.append(m) if instrument_name == "taipan" and len(missing) > 0: raise Exception("Missing motors:" + repr(missing)) class Wombat(Able): skip = True def test_000_000_motors(self): global motors if instrument_name != "wombat": raise unittest.SkipTest("Instrument is not Wombat") missing = [] for m in ["mphi", "mchi", "mom", "mtth", "som", "stth"]: if m not in motors: missing.append(m) if instrument_name == "wombat" and len(missing) > 0: raise Exception("Missing motors:" + repr(missing)) def test_001_000_newfile(self): def cb(result, *args, **kw): self.assertTrue(result[0] == "OK") return result d = self.send_command("newfile HISTOGRAM_XY scratch") d.addCallback(cb) return d def test_001_001_save(self): def cb(result, *args, **kw): self.assertTrue("OK" in result) return result d = self.send_command("save") d.addCallback(cb) return d class Zulu(Able): timeout = 5 def test_performance(self): d = self.send_command("performance") def cb3(result, *args, **kw): #print "Performance:", result junk, performance = result[0].split("=") if not junk.startswith("Performance"): raise Exception("Can't find Performance") if float(performance) < 50: raise Exception("Performance too low: %s" % performance) if float(performance) > 100: raise Exception("Performance too high %s" % performance) d.addCallback(cb3) return d #class SicsTestCase(unittest.TestCase): # callbacks = [] # # def _test(command): # d = defer.Deferred() # self.d = d # self.client.sendMessage(command) # return d # # def test_status1(self): # d = _test("status") # return d # # def test_0000(self): # d = defer.Deferred() # self.d = d # def cb2(result, *args, **kw): # print "Login Callback", result, args # print "Add cb2", d # d = d.addCallback(cb2, "Fred2") # #reactor.callLater(5, d.callback, False) # return d # # def test_status(self): # d = defer.Deferred() # self.d = d # self.client.sendMessage("status") # #reactor.callLater(5, d.callback, False) # return d # # def add_callback(self, cb): # if not cb in self.callbacks: # self.callbacks.append(cb) # # def del_callback(self, cb): # if cb in self.callbacks: # self.callbacks.remove(cb) # # def call_callbacks(self, data): # d = self.d # self.d = None # print "Callback:", d, data # for cb in self.callbacks: # cb(self, data) # if d: # def cb3(result, *args, **kw): # print "Xqt Callback", result, args # print "Add cb3", d # d.addCallback(cb3) # print "Call Callback", d # d.callback(data) #