diff --git a/site_ansto/instrument/TEST_SICS/unit_tests/sics_hdb.py b/site_ansto/instrument/TEST_SICS/unit_tests/sics_hdb.py index 96ad23b7..56505dc7 100644 --- a/site_ansto/instrument/TEST_SICS/unit_tests/sics_hdb.py +++ b/site_ansto/instrument/TEST_SICS/unit_tests/sics_hdb.py @@ -3,7 +3,7 @@ # Author: Douglas Clowes (dcl@ansto.gov.au) 2014-04-24 # """ This is a SICS hipadaba module. - + It allows for loading the gumtree XML obtained from SICS and accessing the tree by path to obtain the children and properties of a node on a named path. @@ -14,34 +14,36 @@ # of our preference (performance) but fall back to a default version. # try: - from lxml import etree - print("running with lxml.etree") + from lxml import etree + print "running with lxml.etree" except ImportError: - try: - # Python 2.5 - import xml.etree.cElementTree as etree - print("running with cElementTree on Python 2.5+") - except ImportError: try: - # Python 2.5 - import xml.etree.ElementTree as etree - print("running with ElementTree on Python 2.5+") + # Python 2.5 + import xml.etree.cElementTree as etree + print "running with cElementTree on Python 2.5+" except ImportError: - try: - # normal cElementTree install - import cElementTree as etree - print("running with cElementTree") - except ImportError: try: - # normal ElementTree install - import elementtree.ElementTree as etree - print("running with ElementTree") + # Python 2.5 + import xml.etree.ElementTree as etree + print "running with ElementTree on Python 2.5+" except ImportError: - print("Failed to import ElementTree from any known place") + try: + # normal cElementTree install + import cElementTree as etree + print "running with cElementTree" + except ImportError: + try: + # normal ElementTree install + import elementtree.ElementTree as etree + print "running with ElementTree" + except ImportError: + print "Failed to import ElementTree from any known place" class HipadabaTree(object): """Hipadaba Tree Class + This can be built from a variety of XML sources and contains + methods for accessing and navigating the SICS hipadaba tree """ def __init__(self, from_this): """Build a Hipadaba Tree object from an 'ElementTree' object @@ -49,19 +51,19 @@ class HipadabaTree(object): Parameters ---------- from_this : XML-like - ElementTree compatible object, or string or array of strings + ElementTree compatible object, or string or list of strings """ if isinstance(from_this, list) and isinstance(from_this[0], str): - root = etree.fromstringlist(from_this) + my_root = etree.fromstringlist(from_this) elif isinstance(from_this, str): if len(from_this) > 0: if from_this[0] == "<": - root = etree.fromstring(from_this) + my_root = etree.fromstring(from_this) elif from_this[0] == "@": - root = etree.parse(from_this[1:]) + my_root = etree.parse(from_this[1:]) elif from_this.lower().endswith(".xml"): - root = etree.parse(from_this) + my_root = etree.parse(from_this) else: raise Exception("unknown string") else: @@ -72,11 +74,21 @@ class HipadabaTree(object): raise Exception("findall not in object") if "find" not in directory: raise Exception("find not in object") - root = from_this - self.root = root + my_root = from_this + self.root = my_root def short_tree(self, tree=None): """Creates a map from the tree/node that can be used or printed + + Parameters + ---------- + tree : optional Element or ElementTree + + Returns + ------- + map containing name(str), + children(list of str), + properties(list of str) """ if tree is None: tree = self.root @@ -84,36 +96,69 @@ class HipadabaTree(object): tree_part = {"name": ""} else: tree_part = {"name": tree.attrib['id']} - tree_part["children"] = sorted([ch.attrib['id'] for ch in tree.findall('component')]) - tree_part["properties"] = sorted([ch.attrib['id'] for ch in tree.findall('property')]) + tree_part["children"] = sorted( + [comp.attrib['id'] for comp in tree.findall('component')]) + tree_part["properties"] = sorted( + [propy.attrib['id'] for propy in tree.findall('property')]) return tree_part def find_path(self, path, tree=None): """Finds a node from the given tree and path + + Parameters + ---------- + path : str + tree : optional Element or ElementTree + + Returns + ------- + requested Element or None """ debug = False if tree is None: tree = self.root if debug: print "Looking for %s in %s" % (repr(path), self.short_tree(tree)) - path_list = [p for p in path.lower().strip('/').split('/') if p is not ''] + path_list = [propy + for propy in path.lower().strip('/').split('/') + if propy is not ''] if path_list == ['']: if debug: - print "Finding root: %s in %s" % (repr(path_list), self.short_tree(tree)) + print "Finding root: %s in %s" % ( + repr(path_list), + self.short_tree(tree)) return tree child = tree for node in path_list: - children = [ch for ch in child.findall('component') if ch.attrib['id'].lower() == node.lower()] + children = [comp + for comp in child.findall('component') + if comp.attrib['id'].lower() == node.lower()] if len(children) == 0: if debug: - print "Not found: %s in %s" % (repr(path_list), self.short_tree(tree)) + print "Not found: %s in %s" % ( + repr(path_list), + self.short_tree(tree)) return None child = children[0] if debug: - print "Finding node: %s in %s" % (repr(path_list), self.short_tree(child)) + print "Finding node: %s in %s" % ( + repr(path_list), + self.short_tree(child)) return child def list_tree(self, tree=None, props=False, indent=0): + """Returns a list of indented strings for a tree + + Parameters + ---------- + tree : optional Element or ElementTree + props : optional boolean to include node properties + indent : optional nesting/indent depth/level + + Returns + ------- + list of str ready to print or write to file + """ if tree is None: tree = self.root text = [] @@ -122,20 +167,40 @@ class HipadabaTree(object): else: text += [' '*indent + '* ' + tree.attrib['id']] if props: - properties = sorted(tree.findall('property'), key=lambda node: node.attrib['id'].lower()) - for prop in properties: - line = ' '*indent + ' - ' + prop.attrib['id'] + '=' - items = [ch.text for ch in prop.findall('value') if ch.text is not None] + properties = sorted( + tree.findall('property'), + key=lambda node: node.attrib['id'].lower()) + for propy in properties: + line = ' '*indent + ' - ' + propy.attrib['id'] + '=' + items = [valu.text + for valu in propy.findall('value') + if valu.text is not None] line += ' '.join(items) text += [line] - children = sorted(tree.findall('component'), key=lambda node: node.attrib['id'].lower()) + children = sorted( + tree.findall('component'), + key=lambda node: node.attrib['id'].lower()) for child in children: text += self.list_tree(child, props, indent+1) return text def print_tree(self, tree=None, props=False, indent=0): - for line in self.list_tree(tree, props, indent): + """Print the indented tree to stdout + + Parameters + ---------- + tree : optional Element or ElementTree + props : optional boolean to include node properties + indent : optional nesting/indent depth/level + + Returns + ------- + list of str ready to print or write to file + """ + line_list = self.list_tree(tree, props, indent) + for line in line_list: print line + return line_list def getNode(self, path, tree=None): return self.find_path(path, tree) @@ -144,20 +209,26 @@ class HipadabaTree(object): node = self.find_path(path, tree) if node is None: return None - items = [ch.text for ch in node.findall('value') if ch.text is not None] + items = [valu.text + for valu in node.findall('value') + if valu.text is not None] if len(items) == 0: return "" return ','.join(items) - def getProperty(self, path, property, tree=None): + def getProperty(self, path, the_property, tree=None): node = self.find_path(path, tree) if node is None: return None - items = [ch for ch in node.findall('property') if ch.attrib['id'].lower() == property.lower()] + items = [propy + for propy in node.findall('property') + if propy.attrib['id'].lower() == the_property.lower()] if len(items) == 0: return None node = items[0] - items = [ch.text for ch in node.findall('value') if ch.text is not None] + items = [valu.text + for valu in node.findall('value') + if valu.text is not None] if len(items) == 0: return "" return ','.join(items) @@ -165,43 +236,52 @@ class HipadabaTree(object): def getProperties(self, path, tree=None): node = self.find_path(path, tree) if node is not None: - return sorted([ch.attrib['id'] for ch in node.findall('property')]) + return sorted( + [propy.attrib['id'] for propy in node.findall('property')]) return None def getChildren(self, path, tree=None): node = self.find_path(path, tree) if node is not None: - return sorted([ch.attrib['id'] for ch in node.findall('component')]) + return sorted( + [comp.attrib['id'] for comp in node.findall('component')]) return None -if __name__ == "__main__": - """this test uses a "junk.xml" file extracted for SICS - """ - filename = "junk.xml" - fd = open(filename, "r") - lines = fd.readlines() - print "From file:", HipadabaTree(filename).short_tree() - print "From list:", HipadabaTree(lines).short_tree() - print "From text:", HipadabaTree('\n'.join(lines)).short_tree() +def main_program(): + ###this test uses a "junk.xml" file extracted for SICS + ### + test_file = "junk.xml" + with open(test_file, "r") as input_file: + line_list = input_file.readlines() + print "From file:", HipadabaTree(test_file).short_tree() + print "From list:", HipadabaTree(line_list).short_tree() + print "From text:", HipadabaTree('\n'.join(line_list)).short_tree() root = etree.parse("junk.xml") hipadaba = HipadabaTree(root) print "From tree:", hipadaba.short_tree() - print "Components:", [ch.attrib['id'] for ch in hipadaba.root.findall('component')] - print "Components:", [ch.attrib['id'] for ch in hipadaba.getNode('/sample/ps9').findall('component')] + print "Components:", [ch.attrib['id'] + for ch in hipadaba.root.findall('component')] + print "Components:", [ch.attrib['id'] + for ch in hipadaba.getNode('/sample/ps9').findall('component')] print "GetNode:", hipadaba.short_tree(hipadaba.getNode('/sample/ps9')) print "PrintTree:" - hipadaba.print_tree(tree=hipadaba.getNode('/sample/ps9/')) - hipadaba.print_tree(tree=hipadaba.getNode('/sample/ps9/status'), props=True) + hipadaba.print_tree( + tree=hipadaba.getNode('/sample/ps9/')) + hipadaba.print_tree( + tree=hipadaba.getNode('/sample/ps9/status'), props=True) print "GetChildren:", hipadaba.getChildren('') print "GetChildren:", hipadaba.getChildren('////') print "GetChildren:", hipadaba.getChildren('//sample///ps9/////') print "GetProperties:", hipadaba.getProperties('/sample/ps9') print "Properties from root:" - for p in hipadaba.getProperties('/sample/ps9'): - prop = hipadaba.getProperty('/sample/ps9', p) - print ' ', p, '=', prop - n = hipadaba.getNode('/sample/ps9') + for property_name in hipadaba.getProperties('/sample/ps9'): + property_value = hipadaba.getProperty('/sample/ps9', property_name) + print ' ', property_name, '=', property_value + node = hipadaba.getNode('/sample/ps9') print "Properties from node:" - for p in hipadaba.getProperties('/', n): - prop = hipadaba.getProperty('/', p, n) - print ' ', p, '=', prop + for property_name in hipadaba.getProperties('/', node): + property_value = hipadaba.getProperty('/', property_name, node) + print ' ', property_name, '=', property_value + +if __name__ == "__main__": + main_program() diff --git a/site_ansto/instrument/TEST_SICS/unit_tests/sics_proxy.py b/site_ansto/instrument/TEST_SICS/unit_tests/sics_proxy.py index 09c8cdba..1fd4d81c 100644 --- a/site_ansto/instrument/TEST_SICS/unit_tests/sics_proxy.py +++ b/site_ansto/instrument/TEST_SICS/unit_tests/sics_proxy.py @@ -1,6 +1,13 @@ #!/usr/bin/env python # vim: ts=8 sts=4 sw=4 expandtab # Author: Douglas Clowes (dcl@ansto.gov.au) 2014-04-24 +""" SICS proxy client for use by unit testing modules + + This class provides for connection to a running SICS instance, + including login. + It also provides wrapping of transactions and accumulation + of a full response with deferred handling and callback +""" from twisted.python import log from twisted.internet import reactor, defer from twisted.internet.task import LoopingCall @@ -20,6 +27,8 @@ class SicsProtocol(Protocol): self.login = 0 self.login_deferred = defer.Deferred() self.command_deferred = None + self.command = None + self.line_list = [] def lineReceived(self, line): if self.login == 0: @@ -48,12 +57,14 @@ class SicsProtocol(Protocol): def connectionMade(self): if self.verbose: - print "connectionMade", self.transport.getHost(), self.transport.getPeer() + print "connectionMade", + print self.transport.getHost(), + print self.transport.getPeer() self.login = 0 self.port = self.transport.getHost().port MyConnections[self.port] = self if self.verbose: - print "MyConnections:",MyConnections + print "MyConnections:", MyConnections def connectionLost(self, reason): if self.verbose: @@ -62,14 +73,13 @@ class SicsProtocol(Protocol): self.login = 0 self.port = None if self.verbose: - print "MyConnections:",MyConnections + print "MyConnections:", MyConnections def loginComplete(self, line): if self.verbose: print "LoginComplete" self.login_deferred.callback(line) self.factory.call_callbacks(None) - pass def sendLine(self, m): if self.verbose: @@ -106,24 +116,28 @@ class SICSClientFactory(ReconnectingClientFactory): if self.verbose: print 'Connected to:', addr self.resetDelay() - p = SicsProtocol() + my_proto = SicsProtocol() if self.verbose: - p.verbose = True - p.factory = self - p.delimiter = '\n' - self.device = p - return p + my_proto.verbose = True + my_proto.factory = self + my_proto.delimiter = '\n' + self.device = my_proto + return my_proto def clientConnectionLost(self, connector, reason): if self.verbose: - print 'Lost connection', connector.getDestination().port,'Reason:', reason + print 'Lost connection', + print connector.getDestination().port, + print 'Reason:', reason self.device = None - ReconnectingClientFactory.clientConnectionLost(self, connector, reason) + ReconnectingClientFactory.clientConnectionLost(self, + connector, reason) def clientConnectionFailed(self, connector, reason): if self.verbose: print 'Connection failed. Reason:', reason - ReconnectingClientFactory.clientConnectionFailed(self, connector,reason) + ReconnectingClientFactory.clientConnectionFailed(self, + connector, reason) def sendMessage(self, m): if self.device: @@ -147,7 +161,7 @@ class SICSClientFactory(ReconnectingClientFactory): for cb in self.callbacks: cb(self, data) -if __name__ == "__main__": +def main_program(): Verbose = False def cbf(server, data): if Verbose: @@ -168,10 +182,12 @@ if __name__ == "__main__": if Verbose: factory.verbose = True factory.add_callback(cbf) - connector = reactor.connectTCP("localhost", 60003, factory) + my_connector = reactor.connectTCP("localhost", 60003, factory) if True: print "Factory:", dir(factory) - print "Destination:", connector.getDestination() - print "Connector:", dir(connector) + print "Destination:", my_connector.getDestination() + print "Connector:", dir(my_connector) reactor.run() +if __name__ == "__main__": + main_program()