diff --git a/site_ansto/instrument/TEST_SICS/setup/dingo b/site_ansto/instrument/TEST_SICS/setup/dingo new file mode 100755 index 00000000..b66772af --- /dev/null +++ b/site_ansto/instrument/TEST_SICS/setup/dingo @@ -0,0 +1,22 @@ +#!/bin/bash +echo me:${0} +. $(dirname ${0})/common +status=${?} +echo root:${status}:${root} +if [ ! ${status} ]; then + exit 1 +fi + +gnome-terminal \ + --tab --title="Motors" \ + --working-directory=${root}/fakeGalil \ + --command="${root}/fakeGalil/SIM_GALIL.py -w dingo" \ + --tab --title="Mercury" \ + --working-directory=${root}/fakeTempControl/oxford \ + --command="${root}/fakeTempControl/oxford/SIM_MERCURY.py -w" \ + --tab --title="Pfeiffer" \ + --working-directory=${root}/fakePfeiffer/tpg261 \ + --command="${root}/fakePfeiffer/tpg261/SIM_PFEIFFER.py -w" \ + --tab --title="Bash" \ + --working-directory=${root} \ + --command="bash" diff --git a/site_ansto/instrument/TEST_SICS/setup/echidna b/site_ansto/instrument/TEST_SICS/setup/echidna new file mode 100755 index 00000000..9e36fa4e --- /dev/null +++ b/site_ansto/instrument/TEST_SICS/setup/echidna @@ -0,0 +1,22 @@ +#!/bin/bash +echo me:${0} +. $(dirname ${0})/common +status=${?} +echo root:${status}:${root} +if [ ! ${status} ]; then + exit 1 +fi + +gnome-terminal \ + --tab --title="Motors" \ + --working-directory=${root}/fakeGalil \ + --command="${root}/fakeGalil/SIM_GALIL.py -w echidna" \ + --tab --title="Mercury" \ + --working-directory=${root}/fakeTempControl/oxford \ + --command="${root}/fakeTempControl/oxford/SIM_MERCURY.py -w" \ + --tab --title="Pfeiffer" \ + --working-directory=${root}/fakePfeiffer/tpg261 \ + --command="${root}/fakePfeiffer/tpg261/SIM_PFEIFFER.py -w" \ + --tab --title="Bash" \ + --working-directory=${root} \ + --command="bash" diff --git a/site_ansto/instrument/TEST_SICS/unit_tests/test_sics.py b/site_ansto/instrument/TEST_SICS/unit_tests/test_sics.py new file mode 100644 index 00000000..44e98bee --- /dev/null +++ b/site_ansto/instrument/TEST_SICS/unit_tests/test_sics.py @@ -0,0 +1,841 @@ +#!/usr/bin/env python +# vim: ts=8 sts=4 sw=4 expandtab +# Author: Douglas Clowes (dcl@ansto.gov.au) 2014-04-24 +# +# Looks like test cases are run alphabetically within classes +# and classes are run alphabetically too. +# So, Able is the base class derived from Testcase +# +from sics_proxy import SICSClientFactory, SicsProtocol +from sics_hdb import HipadabaTree + +from twisted.python import log +from twisted.trial import unittest +from twisted.internet import reactor, protocol, defer + +import ConfigParser, StringIO +from datetime import datetime as dt + +class Able(unittest.TestCase): + def setUp(self): + debug = False + if debug: + log.msg("Connecting ...") + self.factory = SICSClientFactory() + self.sics = None + self.deferred = defer.Deferred() + def cb_login(client, *args, **kw): + if debug: + print "cb_login:", client, args, kw + if 'instrument_name' not in globals(): + d = self.get_instrument_name() + d.addCallback(cb_login, "MyName") + elif 'simulation_type' not in globals(): + d = self.get_simulation_type() + d.addCallback(cb_login, "MySIM") + elif 'test_suite' not in globals(): + d = self.load_test_suite() + d.addCallback(cb_login, "Test_Suite") + elif 'sicsconfig' not in globals(): + d = self.load_sicsconfig() + d.addCallback(cb_login, "SICS_Config") + elif 'hipadaba' not in globals(): + d = self.load_hipadaba() + d.addCallback(cb_login, "HiPaDaBa") + else: + self.deferred.callback(True) + return client + def cb_connect(client, *args, **kw): + if debug: + print "cb_connect:", client, client.login, args, kw + self.sics = client + self.sics.factory = self.factory + self.sics.login_deferred.addCallback(cb_login, "Login", Fred="Fred") + if debug: + log.msg("Connected") + return client + creator = protocol.ClientCreator(reactor, SicsProtocol) + d = creator.connectTCP("localhost", 60003) + d = d.addCallback(cb_connect) + return self.deferred + + def send_command(self, command): + def cb(result, *args, **kw): + log.msg("Result: " + repr(result)) + log.msg("Args: " + repr(args)) + log.msg("Kw: " + repr(kw)) + return result + log.msg("Command: " + repr(command)) + d = self.sics.sendMessage(command) + d.addCallback(cb) + return d + + def get_instrument_name(self): + debug = False + def cb1(result, *args, **kw): + global instrument_name + if debug: + print "instrument:", result, args, kw + self.assertTrue(result[0].lower().startswith("instrument = ")) + instrument_name = result[0].split("=")[1].strip().lower() + if debug: + print "Instrument:", instrument_name + if instrument_name == "quokka": + Quokka.skip = False + if instrument_name == "taipan": + Taipan.skip = False + if instrument_name == "wombat": + Wombat.skip = False + return result + d = self.send_command("instrument") + d.addCallback(cb1) + return d + + def get_simulation_type(self): + debug = False + def cb1(result, *args, **kw): + global simulation_type + if debug: + print "SICS_SIMULATION:", result, args, kw + self.assertTrue(result[0].lower().startswith("sics_simulation = ")) + simulation_type = result[0].split("=")[1].strip().lower() + if debug: + print "SICS_SIMULATION:", simulation_type + return result + d = self.send_command("sics_simulation") + d.addCallback(cb1) + return d + + def load_test_suite(self): + debug = False + def cb1(result, *args, **kw): + global test_suite + if debug: + print "fileeval:", result, args, kw + if result[0] != "OK": + raise Exception("fileeval returned " + repr(result)) + test_suite = True + return result + d = self.send_command("fileeval TEST_SICS/unit_tests/test_suite.tcl") + d.addCallback(cb1) + return d + + def load_sicsconfig(self): + debug = False + def cb2(result, *args, **kw): + global sicsconfig + with open("sicsconfig.ini", "w") as outfile: + for line in result: + outfile.write(line + '\n') + if debug: + print "config:", result, args, kw + self.assertFalse(result[0].startswith("can't read")) + self.assertFalse(result[0].startswith("... No dictionary ...")) + ini = StringIO.StringIO("\n".join(result)) + if debug: + print "ini:", ini.readlines() + ini.seek(0) + print "ini:", ini.read() + ini.seek(0) + print "ini:", dir(ini) + sicsconfig = ConfigParser.SafeConfigParser() + sicsconfig.readfp(ini) + return result + d = self.send_command("tcl:test_suite::show_config ::config_dict") + d.addCallback(cb2) + return d + + def load_hipadaba(self): + def cb0(result, *args, **kw): + global hipadaba + with open("hipadaba.xml", "w") as outfile: + for line in result: + outfile.write(line + '\n') + if result[0][0] != '<': + raise Exception("Tree does not start with left angle:" + result[0]) + if result[-1] != '': + raise Exception("Tree does not end with :" + result[-1]) + self.hipadaba = HipadabaTree('\n'.join(result)) + hipadaba = self.hipadaba + return result + d = self.send_command("tcl:test_suite::getsicsxml /") + d.addCallback(cb0) + return d + + def tearDown(self): + #print "tearDownModule" + if self.sics is not None: + self.sics.login_deferred = None + self.sics.command_deferred = None + self.sics.transport.loseConnection() + self.sics = None + self.deferred = None + +class Baker(Able): + timeout = 5 + def test_000_000_login(self): + debug = False + def cb0(result, *args, **kw): + if debug: + print "Login:", result, args, kw + if self.sics.login != 2: + raise Exception("Bad login:" + repr(self.sics.login)) + return result + d = self.sics.login_deferred + d.addCallback(cb0) + return d + test_000_000_login.timeout = 30 + + def test_000_001_fileeval(self): + self.assertTrue('test_suite' in globals()) + self.assertTrue(test_suite) + + def test_000_002_config(self): + debug = False + self.assertTrue('sicsconfig' in globals()) + if debug: + print "config:", repr(sicsconfig) + print "config:", dir(sicsconfig) + for section in sorted(sicsconfig.sections()): + print ("[%s]\n" % section), + for option in sorted(sicsconfig.options(section)): + print ("%s = %s\n" % (option, sicsconfig.get(section, option))), + print + + def test_000_003_hipadaba(self): + debug = False + self.assertTrue('hipadaba' in globals()) + + def test_001_001_clock(self): + debug = False + self.deferred = defer.Deferred() + def cb2(result, *args, **kw): + if debug: + print "Clock:", result + formatted = dt.fromtimestamp(1399866027).strftime("%Y-%b-%d %H:%M:%S") + self.assertEqual(result[0], formatted) + self.deferred.callback(None) + return result + def cb3(result, *args, **kw): + if debug: + print "Clock:", result + if not result[0].isdigit(): + raise Exception("Bad response:" + repr(result)) + if not 1399865694 <= int(result[0]) < 1499865694: + raise Exception("Bad value:" + repr(result)) + d = self.send_command("tcl:clock format 1399866027 -format \"%Y-%h-%d %T\"") + d.addCallback(cb2) + return result + d = self.send_command("tcl:clock seconds") + d.addCallback(cb3) + return self.deferred + + def test_001_002_hipadaba(self): + debug = False + self.assertTrue('hipadaba' in globals()) + + def test_001_003_status(self): + d = self.send_command("status") + def cb3(result, *args, **kw): + #print "Status:", result + if not result[0].startswith("status = "): + raise Exception("Bad status:" + repr(result)) + return result + d.addCallback(cb3) + return d + + def Test_002_000_dir(self): + raise unittest.SkipTest("Not doing this test now") + self.deferred = defer.Deferred() + d = self.send_command("dir types") + self.result = 0 + def cb2(result, *args, **kw): + self.result += 1 + result = sorted([i.strip() for i in result]) + #print "Types:", result + for i in range(len(result)): + result[i] = " ".join(sorted(result[i].split())) + #print "Types:", result + if len(self.types) > 0: + type = self.types[0] + del self.types[0] + d = self.send_command("sicslist type %s" % type) + d.addCallback(cb2) + else: + #print "Dir:", self.result + self.deferred.callback(None) + return result + def cb3(result, *args, **kw): + self.types = sorted([i.strip().replace(" ", "_") for i in result]) + #print "Types:", self.types + if len(self.types) > 0: + type = self.types[0] + del self.types[0] + d = self.send_command("sicslist type %s" % type) + d.addCallback(cb2) + else: + print "Dir: None" + self.deferred.callback(None) + return result + d.addCallback(cb3) + return self.deferred + + def test_003_000_motors(self): + global motors + motors = [] + d = self.send_command("sicslist type motor") + def cb3(result, *args, **kw): + global motors + motors = sorted(result[0].lower().split()) + #print "Motors:", len(motors) + missing = [] + for m in ["m1", "m2", "s1", "s2", "a1", "a2"]: + if m not in motors: + missing.append(m) + if instrument_name == "taipan" and len(missing) > 0: + raise Exception("Missing motors:" + repr(missing)) + return result + d.addCallback(cb3) + return d + + def test_003_001_motors(self): + debug = False + if instrument_name != "taipan": + raise unittest.SkipTest("Cannot test motor m1, m2: not taipan") + def cb(result, *args, **kw): + if debug: + print "\nLV:", result, args, kw + phase = args[0] + target = 0 + target += 1 + if phase == target: + if result[0].startswith("SIMULATE"): + d = self.send_command("m2 send lv") + d.addCallback(cb, phase + 1) + else: + raise unittest.SkipTest("Motor is not fake") + return result + + target += 1 + if phase == target: + if result[0].startswith("SIMULATE"): + d = self.send_command("s1 send lv") + d.addCallback(cb, phase + 1) + else: + raise unittest.SkipTest("Motor is not fake") + return result + + target += 1 + if phase == target: + self.assertTrue(result[0].startswith("SIMULATE")) + + self.deferred.callback(True) + return result + self.deferred = defer.Deferred(); + d = self.send_command("m1 send lv") + d.addCallback(cb, 1) + return self.deferred + + def test_004_000_gumtreexml(self): + #raise unittest.SkipTest("Not doing this test now") + d = self.send_command("getgumtreexml /") + def cb3(result, *args, **kw): + global gumtreexml + #print "XML:", len(result) + with open("gumtree.xml", "w") as outfile: + for line in result: + outfile.write(line + '\n') + if "" != result[-1]: + raise Exception("XML is not complete, ends:" + repr(result[-1])) + txt = [] + for line in result: + txt.append(''.join(c for c in line if ord(c) >= 32)) + gumtreexml = HipadabaTree('\n'.join(txt)).root + return result + d.addCallback(cb3) + return d + + def test_004_001_gumtreexml(self): + self.assertTrue(gumtreexml.attrib == {}) + id_list = [] + for child in gumtreexml.findall('component'): + id_list.append(child.attrib['id']) + #print child, child.tag, child.attrib + #for grandchild in child.findall('component'): + # print " ", grandchild.attrib + self.assertTrue('commands' in id_list) + self.assertTrue('instrument' in id_list) + self.assertTrue('sample' in id_list) + self.assertTrue('monitor' in id_list) + self.assertTrue('user' in id_list) + self.assertTrue('experiment' in id_list) + self.assertTrue('control' in id_list) + + def test_004_002_gumtreexml(self): + global sicsconfig + if not (\ + 'pfeiffer_hg' in sicsconfig.sections() and \ + "enabled" in sicsconfig.options("pfeiffer_hg") and \ + sicsconfig.get("pfeiffer_hg", "enabled") == "True"): + raise unittest.SkipTest("Pfeiffer PS9 is not configured") + child = gumtreexml + for node in "/sample/ps9/".strip('/').split('/'): + children = [ch for ch in child.findall('component') if ch.attrib['id'].lower() == node.lower()] + self.assertTrue(len(children) > 0) + child = children[0] + self.assertTrue(len(child.attrib) > 0) + self.assertTrue(len([ch.attrib['id'] for ch in child.findall('component')]) > 0) + self.assertTrue(len([(ch.attrib['id'], [v.text for v in ch.findall('value')]) for ch in child.findall('property')]) > 0) + + def test_004_003_gumtreexmlvalues(self): + #raise unittest.SkipTest("Not doing this test now") + d = self.send_command("getgumtreexmlvalues /") + def cb3(result, *args, **kw): + global gumtreexml + #print "XML:", len(result) + if "" != result[-1]: + raise Exception("XML is not complete, ends:" + repr(result[-1])) + txt = [] + for line in result: + txt.append(''.join(c for c in line if ord(c) >= 32)) + gumtreexml = HipadabaTree('\n'.join(txt)).root + return result + d.addCallback(cb3) + return d + + def test_004_004_gumtreexmlvalues(self): + self.assertTrue(gumtreexml.attrib == {}) + id_list = [] + for child in gumtreexml.findall('component'): + id_list.append(child.attrib['id']) + #print child, child.tag, child.attrib + #for grandchild in child.findall('component'): + # print " ", grandchild.attrib + self.assertTrue('commands' in id_list) + self.assertTrue('instrument' in id_list) + self.assertTrue('sample' in id_list) + self.assertTrue('monitor' in id_list) + self.assertTrue('user' in id_list) + self.assertTrue('experiment' in id_list) + self.assertTrue('control' in id_list) + + def Test_005_000_drive_skip(self): + """Skipping example + + Skips on the basis that the equipment to be tested is not configured + """ + global motors + if not "ds9" in motors: + raise unittest.SkipTest("Cannot drive motor ds9: does not exist") + d = self.send_command("drive ds9 0") + def cb3(result, *args, **kw): + #print "Drive:", result + if "Driving finished successfully" != result[-1]: + raise Exception("Drive is not complete, ends:" + repr(result[-1])) + return result + d.addCallback(cb3) + return d + + def test_006_000_drive_zero(self): + """Drive to where the motors already are + + Should finish quickly + """ + if instrument_name != "taipan": + raise unittest.SkipTest("Cannot drive motor m1, m2: not taipan") + + self.deferred = defer.Deferred() + def cb3(result, *args, **kw): + #print "Drive:", result + if "Driving finished successfully" != result[-1]: + raise Exception("Drive is not complete, ends:" + repr(result[-1])) + for item in result: + if item.startswith("New m1 position:"): + self.assertTrue(round(float(item.split(":")[1]) - self.m1, 3) == 0) + if item.startswith("New m2 position:"): + self.assertTrue(round(float(item.split(":")[1]) - self.m2, 3) == 0) + self.deferred.callback(True) + return result + def cb2(result, *args, **kw): + #print "m2:", result + if not result[0].startswith("m2 = "): + raise Exception("Unexpected m2 response: " + repr(result)) + self.m2 = float(result[0].split("=")[1]) + d = self.send_command("drive m1 %.3f m2 %.3f" % (self.m1, self.m2)) + d.addCallback(cb3) + return result + def cb1(result, *args, **kw): + #print "m1:", result + if not result[0].startswith("m1 = "): + raise Exception("Unexpected m1 response: " + repr(result)) + self.m1 = float(result[0].split("=")[1]) + d = self.send_command("m2") + d.addCallback(cb2) + return result + d = self.send_command("m1") + d.addCallback(cb1) + return self.deferred + test_006_000_drive_zero.timeout = 5 + + def test_006_001_drive_one(self): + """Drive to where the motors already are + + Should finish quickly + """ + if instrument_name != "taipan": + raise unittest.SkipTest("Cannot drive motor m1, m2: not taipan") + + self.deferred = defer.Deferred() + def cb3(result, *args, **kw): + #print "Drive:", result + if "Driving finished successfully" != result[-1]: + raise Exception("Drive is not complete, ends:" + repr(result[-1])) + for item in result: + if item.startswith("New m1 position:"): + self.assertTrue(round(float(item.split(":")[1]) - self.m1, 2) == 0, + "Motor m1 position %f not equal target %f" % (float(item.split(":")[1]), self.m1)) + if item.startswith("New m2 position:"): + self.assertTrue(round(float(item.split(":")[1]) - self.m2, 2) == 0, + "Motor m2 position %f not equal target %f" % (float(item.split(":")[1]), self.m2)) + self.deferred.callback(True) + return result + def cb2(result, *args, **kw): + #print "m2:", result + if not result[0].startswith("m2 = "): + raise Exception("Unexpected m2 response: " + repr(result)) + self.m2 = float(result[0].split("=")[1]) + if self.m1 > 20.1: + self.m1 = round(self.m1 - 1, 0) + else: + self.m1 = round(self.m1 + 1, 0) + if self.m2 > 20.1: + self.m2 = round(self.m2 - 1, 0) + else: + self.m2 = round(self.m2 + 1, 0) + d = self.send_command("drive m1 %.3f m2 %.3f" % (self.m1, self.m2)) + d.addCallback(cb3) + return result + def cb1(result, *args, **kw): + #print "m1:", result + if not result[0].startswith("m1 = "): + raise Exception("Unexpected m1 response: " + repr(result)) + self.m1 = float(result[0].split("=")[1]) + d = self.send_command("m2") + d.addCallback(cb2) + return result + d = self.send_command("m1") + d.addCallback(cb1) + return self.deferred + test_006_001_drive_one.timeout = 30 + + def Test_007_000_mercury(self): + raise unittest.SkipTest("Not doing this test now (borken)") + global sicsconfig + debug = False + def cb0(result, *args, **kw): + if debug: + print "\nMercury:", result.short_tree() + self.deferred.callback(result) + return result + if not ("mercury_scpi" in sicsconfig.sections() and \ + "enabled" in sicsconfig.options("mercury_scpi") and \ + sicsconfig.get("mercury_scpi", "enabled") == "True"): + raise unittest.SkipTest("mercury_scpi is not configured") + self.deferred = defer.Deferred() + d = self.load_hipadaba("/sample/%s" % sicsconfig.get("mercury_scpi", "name")) + d.addCallback(cb0) + return self.deferred + Test_007_000_mercury.timeout = 2 + + def test_008_000_mercury_hipadaba(self): + """Test the hipadaba tree and mechanism + """ + global hipadaba + debug = False + self.assertTrue('hipadaba' in globals()) + self.assertFalse(hipadaba is None, "hipadaba is missing") + if not ('mercury_scpi' in sicsconfig.sections() and \ + "enabled" in sicsconfig.options("mercury_scpi") and \ + sicsconfig.get("mercury_scpi", "enabled") == "True"): + raise unittest.SkipTest("Mercury TC9 is not configured") + + if False: + hipadaba.print_tree() + root_children = hipadaba.getChildren('/') + if debug: + print "Children:", root_children + root_children = [i.lower() for i in root_children] + self.assertTrue('sample' in root_children) + sample_children = hipadaba.getChildren('/sample') + if debug: + print "Children:", sample_children + sample_children = [i.lower() for i in sample_children] + self.assertTrue('tc9' in sample_children) + level_children = hipadaba.getChildren('/sample/tc9/level/') + if debug: + print "Children:", level_children + level_children = [i.lower() for i in level_children] + self.assertTrue('helium' in level_children) + self.assertTrue('nitrogen' in level_children) + he_props = hipadaba.getProperties('/sample/tc9/level/helium') + if debug: + print "Properties:", he_props + he_props = [i.lower() for i in he_props] + self.assertTrue('data' in he_props) + self.assertTrue('control' in he_props) + self.assertTrue('nxsave' in he_props) + self.assertTrue('nxalias' in he_props) + nxalias = hipadaba.getProperty('/sample/tc9/level/helium', 'NXalias') + if debug: + print "Prop:", nxalias + self.assertTrue(nxalias.lower() == 'tc9_level_helium'.lower()) + y = hipadaba.getValue('/sample/tc9/Level/helium') + if debug: + print "Value:", y + y = hipadaba.getNode('/sample/tc9/level/helium') + if debug: + print "Found:", hipadaba.short_tree(y) + if debug: + hipadaba.print_tree(y) + text = hipadaba.list_tree(y, props=True, indent=6) + if debug: + print "Tree (/sample/tc9/level/helium):" + for line in text: + print line + +class Posit(Able): + timeout = 5 + def test_000_000_posit(self): + debug = False + if instrument_name != "taipan": + raise unittest.SkipTest("Cannot posit motor m1, m2: not taipan") + + def cb(result, *args, **kw): + if debug: + print "\nposit:", result, args, kw + phase = args[0] + target = 1 + if phase == target: + d = self.send_command("m1 positions 10 20 30") + d.addCallback(cb, phase + 1) + return result + target += 1 + if phase == target: + d = self.send_command("m1 position_names One Two Three") + d.addCallback(cb, phase + 1) + return result + target += 1 + if phase == target: + d = self.send_command("m1 position_names") + d.addCallback(cb, phase + 1) + return result + target += 1 + if phase == target: + d = self.send_command("m1 softzero 1.0") + d.addCallback(cb, phase + 1) + return result + target += 1 + if phase == target: + d = self.send_command("m1 posit2hard three") + d.addCallback(cb, phase + 1) + return result + target += 1 + if phase == target: + d = self.send_command("m1 posit2soft three") + d.addCallback(cb, phase + 1) + return result + target += 1 + if phase == target: + d = self.send_command("m1 positions erase") + d.addCallback(cb, phase + 1) + return result + target += 1 + if phase == target: + d = self.send_command("m1 softzero 0.0") + d.addCallback(cb, phase + 1) + return result + + self.deferred.callback(True) + return result + self.deferred = defer.Deferred(); + d = self.send_command("m1 positions erase") + d.addCallback(cb, 1) + return self.deferred + +class Python(Able): + timeout = 5 + def test_001_000_python(self): + debug = False + def cb(result, *args, **kw): + global python_installed + if debug: + print "\npython:", result, args, kw + phase = args[0] + target = 0 + target += 1 + if phase == target: + if result[0].startswith("python=obj, type=python,"): + python_installed = True + else: + python_installed = False + + self.deferred.callback(True) + return result + self.deferred = defer.Deferred(); + d = self.send_command("sicslist python") + d.addCallback(cb, 1) + return self.deferred + + def test_002_000_python(self): + debug = False + if debug: + print "Python_Installed:", python_installed + if not python_installed: + raise unittest.SkipTest("Python is not installed") + def cb(result, *args, **kw): + if debug: + print "\npython:", result, args, kw + phase = args[0] + target = 0 + target += 1 + if phase == target: + if result[0].startswith("Hello Sailor"): + d = self.send_command("python sics.puts(sics.sics(\"clientput Hello World\"))") + d.addCallback(cb, phase + 1) + return result + + target += 1 + if phase == target: + self.assertTrue(result[0].startswith("Hello World")) + + self.deferred.callback(True) + return result + self.deferred = defer.Deferred(); + d = self.send_command("python sics.puts(\"Hello Sailor\")") + d.addCallback(cb, 1) + return self.deferred + +class Quokka(Able): + skip = True + def test_000_000_motors(self): + global motors + if instrument_name != "quokka": + raise unittest.SkipTest("Instrument is not Quokka") + missing = [] + for m in ["samx", "samy", "samz", "det", "detoff", "bsx"]: + if m not in motors: + missing.append(m) + if instrument_name == "taipan" and len(missing) > 0: + raise Exception("Missing motors:" + repr(missing)) + +class Taipan(Able): + skip = True + def test_000_000_motors(self): + global motors + if instrument_name != "taipan": + raise unittest.SkipTest("Instrument is not Taipan") + missing = [] + for m in ["m1", "m2", "s1", "s2", "a1", "a2"]: + if m not in motors: + missing.append(m) + if instrument_name == "taipan" and len(missing) > 0: + raise Exception("Missing motors:" + repr(missing)) + +class Wombat(Able): + skip = True + def test_000_000_motors(self): + global motors + if instrument_name != "wombat": + raise unittest.SkipTest("Instrument is not Wombat") + missing = [] + for m in ["mphi", "mchi", "mom", "mtth", "som", "stth"]: + if m not in motors: + missing.append(m) + if instrument_name == "wombat" and len(missing) > 0: + raise Exception("Missing motors:" + repr(missing)) + + def test_001_000_newfile(self): + def cb(result, *args, **kw): + self.assertTrue(result[0] == "OK") + return result + d = self.send_command("newfile HISTOGRAM_XY scratch") + d.addCallback(cb) + return d + + def test_001_001_save(self): + def cb(result, *args, **kw): + self.assertTrue("OK" in result) + return result + d = self.send_command("save") + d.addCallback(cb) + return d + +class Zulu(Able): + timeout = 5 + def test_performance(self): + d = self.send_command("performance") + def cb3(result, *args, **kw): + #print "Performance:", result + junk, performance = result[0].split("=") + if not junk.startswith("Performance"): + raise Exception("Can't find Performance") + if float(performance) < 50: + raise Exception("Performance too low: %s" % performance) + if float(performance) > 100: + raise Exception("Performance too high %s" % performance) + d.addCallback(cb3) + return d + + +#class SicsTestCase(unittest.TestCase): +# callbacks = [] +# +# def _test(command): +# d = defer.Deferred() +# self.d = d +# self.client.sendMessage(command) +# return d +# +# def test_status1(self): +# d = _test("status") +# return d +# +# def test_0000(self): +# d = defer.Deferred() +# self.d = d +# def cb2(result, *args, **kw): +# print "Login Callback", result, args +# print "Add cb2", d +# d = d.addCallback(cb2, "Fred2") +# #reactor.callLater(5, d.callback, False) +# return d +# +# def test_status(self): +# d = defer.Deferred() +# self.d = d +# self.client.sendMessage("status") +# #reactor.callLater(5, d.callback, False) +# return d +# +# def add_callback(self, cb): +# if not cb in self.callbacks: +# self.callbacks.append(cb) +# +# def del_callback(self, cb): +# if cb in self.callbacks: +# self.callbacks.remove(cb) +# +# def call_callbacks(self, data): +# d = self.d +# self.d = None +# print "Callback:", d, data +# for cb in self.callbacks: +# cb(self, data) +# if d: +# def cb3(result, *args, **kw): +# print "Xqt Callback", result, args +# print "Add cb3", d +# d.addCallback(cb3) +# print "Call Callback", d +# d.callback(data) +# diff --git a/site_ansto/instrument/TEST_SICS/unit_tests/test_wombat.py b/site_ansto/instrument/TEST_SICS/unit_tests/test_wombat.py new file mode 100644 index 00000000..bf76f0b0 --- /dev/null +++ b/site_ansto/instrument/TEST_SICS/unit_tests/test_wombat.py @@ -0,0 +1,33 @@ +#!/usr/bin/env python +# vim: ts=8 sts=4 sw=4 expandtab +# Author: Douglas Clowes (dcl@ansto.gov.au) 2014-06-24 +# + +import test_sics + +class Wombat(test_sics.Baker): + def test_wombat_000_000_motors(self): + if test_sics.instrument_name != "wombat": + raise unittest.SkipTest("Instrument is not Wombat") + missing = [] + for m in ["mphi", "mchi", "mom", "mtth", "som", "stth"]: + if m not in test_sics.motors: + missing.append(m) + if test_sics.instrument_name == "wombat" and len(missing) > 0: + raise Exception("Missing motors:" + repr(missing)) + + def test_wombat_001_000_newfile(self): + def cb(result, *args, **kw): + self.assertTrue(result[0] == "OK") + return result + d = self.send_command("newfile HISTOGRAM_XY scratch") + d.addCallback(cb) + return d + + def test_wombat_001_001_save(self): + def cb(result, *args, **kw): + self.assertTrue("OK" in result) + return result + d = self.send_command("save") + d.addCallback(cb) + return d