diff --git a/site_ansto/instrument/TEST_SICS/unit_tests/sics_hdb.py b/site_ansto/instrument/TEST_SICS/unit_tests/sics_hdb.py new file mode 100644 index 00000000..9c54f308 --- /dev/null +++ b/site_ansto/instrument/TEST_SICS/unit_tests/sics_hdb.py @@ -0,0 +1,194 @@ +#!/usr/bin/env python +# vim: ts=8 sts=4 sw=4 expandtab +# Author: Douglas Clowes (dcl@ansto.gov.au) 2014-04-24 +# +""" This is a SICS hipadaba module. + + It allows for loading the gumtree XML obtained from SICS and + accessing the tree by path to obtain the children and properties + of a node on a named path. +""" + +# +# Try to import an ElementTree compatible XML library in the order +# of our preference (performance) but fall back to a default version. +# +try: + from lxml import etree + print("running with lxml.etree") +except ImportError: + try: + # Python 2.5 + import xml.etree.cElementTree as etree + print("running with cElementTree on Python 2.5+") + except ImportError: + try: + # Python 2.5 + import xml.etree.ElementTree as etree + print("running with ElementTree on Python 2.5+") + except ImportError: + try: + # normal cElementTree install + import cElementTree as etree + print("running with cElementTree") + except ImportError: + try: + # normal ElementTree install + import elementtree.ElementTree as etree + print("running with ElementTree") + except ImportError: + print("Failed to import ElementTree from any known place") + +class HipadabaTree(object): + """Hipadaba Tree Class + + """ + def __init__(self, from_this): + """Build a Hipadaba Tree object from an 'ElementTree' object + + Parameters + ---------- + from_this : XML-like + ElementTree compatible object, or string or array of strings + + """ + if isinstance(from_this, list) and isinstance(from_this[0], str): + root = etree.fromstringlist(from_this) + elif isinstance(from_this, str): + if len(from_this) > 0: + if from_this[0] == "<": + root = etree.fromstring(from_this) + elif from_this[0] == "@": + root = etree.parse(from_this[1:]) + elif from_this.lower().endswith(".xml"): + root = etree.parse(from_this) + else: + raise Exception("unknown string") + else: + raise Exception("short string") + else: + directory = dir(from_this) + if "findall" not in directory: + raise Exception("findall not in object") + if "find" not in directory: + raise Exception("find not in object") + root = from_this + self.root = root + + def short_tree(self, tree=None): + """ + """ + if tree is None: + tree = self.root + if isinstance(tree, type(self.root)): + tree_part = {"name": ""} + else: + tree_part = {"name": tree.attrib['id']} + tree_part["children"] = [ch.attrib['id'] for ch in tree.findall('component')] + tree_part["properties"] = [ch.attrib['id'] for ch in tree.findall('property')] + return tree_part + + def find_path(self, path, tree=None): + debug = True + if tree is None: + tree = self.root + if debug: + print "Looking for %s in %s" % (repr(path), self.short_tree(tree)) + path_list = [p for p in path.lower().strip('/').split('/') if p is not ''] + if path_list == ['']: + if debug: + print "Finding root: %s in %s" % (repr(path_list), self.short_tree(tree)) + return tree + child = tree + for node in path_list: + children = [ch for ch in child.findall('component') if ch.attrib['id'].lower() == node.lower()] + if len(children) == 0: + if debug: + print "Not found: %s in %s" % (repr(path_list), self.short_tree(tree)) + return None + child = children[0] + if debug: + print "Finding node: %s in %s" % (repr(path_list), self.short_tree(child)) + return child + + def list_tree(self, tree=None, props=False, indent=0): + if tree is None: + tree = self.root + text = [] + if isinstance(tree, type(self.root)): + text += [' '*indent + '* ' + ''] + else: + text += [' '*indent + '* ' + tree.attrib['id']] + if props: + properties = sorted(tree.findall('property'), key=lambda node: node.attrib['id'].lower()) + for prop in properties: + line = ' '*indent + ' - ' + prop.attrib['id'] + '=' + items = [ch.text for ch in prop.findall('value') if ch.text is not None] + line += ' '.join(items) + text += [line] + children = sorted(tree.findall('component'), key=lambda node: node.attrib['id'].lower()) + for child in children: + text += self.list_tree(child, props, indent+1) + return text + + def print_tree(self, tree=None, props=False, indent=0): + for line in self.list_tree(tree, props, indent): + print line + + def getNode(self, path, tree=None): + return self.find_path(path, tree) + + def getProperty(self, path, property, tree=None): + node = self.find_path(path, tree) + if node is None: + return None + items = [ch for ch in node.findall('property') if ch.attrib['id'].lower() == property.lower()] + if len(items) == 0: + return None + node = items[0] + items = [ch.text for ch in node.findall('value') if ch.text is not None] + if len(items) == 0: + return "" + return ' '.join(items) + + def getProperties(self, path, tree=None): + node = self.find_path(path, tree) + if node is not None: + return sorted([ch.attrib['id'] for ch in node.findall('property')]) + return None + + def getChildren(self, path, tree=None): + node = self.find_path(path, tree) + if node: + return sorted([ch.attrib['id'] for ch in node.findall('component')]) + return None + +if __name__ == "__main__": + filename = "junk.xml" + fd = open(filename, "r") + lines = fd.readlines() + print "From file:", HipadabaTree(filename).short_tree() + print "From list:", HipadabaTree(lines).short_tree() + print "From text:", HipadabaTree('\n'.join(lines)).short_tree() + root = etree.parse("junk.xml") + hipadaba = HipadabaTree(root) + print "From tree:", hipadaba.short_tree() + print "Components:", [ch.attrib['id'] for ch in hipadaba.root.findall('component')] + print "Components:", [ch.attrib['id'] for ch in hipadaba.getNode('/sample/ps9').findall('component')] + print "GetNode:", hipadaba.short_tree(hipadaba.getNode('/sample/ps9')) + print "PrintTree:" + hipadaba.print_tree(tree=hipadaba.getNode('/sample/ps9/')) + hipadaba.print_tree(tree=hipadaba.getNode('/sample/ps9/status'), props=True) + print "GetChildren:", hipadaba.getChildren('') + print "GetChildren:", hipadaba.getChildren('////') + print "GetChildren:", hipadaba.getChildren('//sample///ps9/////') + print "GetProperties:", hipadaba.getProperties('/sample/ps9') + print "Properties from root:" + for p in hipadaba.getProperties('/sample/ps9'): + prop = hipadaba.getProperty('/sample/ps9', p) + print ' ', p, '=', prop + n = hipadaba.getNode('/sample/ps9') + print "Properties from node:" + for p in hipadaba.getProperties('/', n): + prop = hipadaba.getProperty('/', p, n) + print ' ', p, '=', prop diff --git a/site_ansto/instrument/TEST_SICS/unit_tests/sics_proxy.py b/site_ansto/instrument/TEST_SICS/unit_tests/sics_proxy.py new file mode 100644 index 00000000..09c8cdba --- /dev/null +++ b/site_ansto/instrument/TEST_SICS/unit_tests/sics_proxy.py @@ -0,0 +1,177 @@ +#!/usr/bin/env python +# vim: ts=8 sts=4 sw=4 expandtab +# Author: Douglas Clowes (dcl@ansto.gov.au) 2014-04-24 +from twisted.python import log +from twisted.internet import reactor, defer +from twisted.internet.task import LoopingCall +from twisted.internet.protocol import Protocol +from twisted.internet.protocol import ReconnectingClientFactory + +MyConnections = {} +Commands = ['status', 'dir types', 'sicslist a2'] + +class SicsProtocol(Protocol): + def __init__(self): + self.factory = None + self.buffer = "" + self.delimiter = "\n" + self.verbose = False + self.port = None + self.login = 0 + self.login_deferred = defer.Deferred() + self.command_deferred = None + + def lineReceived(self, line): + if self.login == 0: + self.login = 1 + self.sendLine("manager ansto") + return + elif self.login == 1: + self.login = 2 + self.loginComplete(line) + else: + if line.startswith("TRANSACTIONSTART "): + self.command = line[len("TRANSACTIONSTART "):] + self.line_list = [] + elif line.startswith("TRANSACTIONFINISHED"): + self.doCallbacks() + self.line_list = [] + else: + self.line_list.append(line) + + def dataReceived(self, data): + self.buffer += data + while self.delimiter in self.buffer: + line, self.buffer = self.buffer.split(self.delimiter, 1) + line = line.strip("\r\n") + self.lineReceived(line) + + def connectionMade(self): + if self.verbose: + print "connectionMade", self.transport.getHost(), self.transport.getPeer() + self.login = 0 + self.port = self.transport.getHost().port + MyConnections[self.port] = self + if self.verbose: + print "MyConnections:",MyConnections + + def connectionLost(self, reason): + if self.verbose: + print "connectionLost" + del MyConnections[self.port] + self.login = 0 + self.port = None + if self.verbose: + print "MyConnections:",MyConnections + + def loginComplete(self, line): + if self.verbose: + print "LoginComplete" + self.login_deferred.callback(line) + self.factory.call_callbacks(None) + pass + + def sendLine(self, m): + if self.verbose: + print "sendLine:", m + self.transport.write(m + self.delimiter) + + def sendMessage(self, m): + self.command_deferred = defer.Deferred() + self.sendLine('fulltransact ' + m) + return self.command_deferred + + def doCallbacks(self): + if self.command_deferred: + self.command_deferred.callback(self.line_list) + if self.factory: + self.factory.call_callbacks((self.command, self.line_list)) + + def isConnected(self): + if self.login == 2: + return True + return False + +class SICSClientFactory(ReconnectingClientFactory): + def __init__(self): + self.verbose = False + self.device = None + self.callbacks = [] + + def startedConnecting(self, connector): + if self.verbose: + print 'Started to connect.' + + def buildProtocol(self, addr): + if self.verbose: + print 'Connected to:', addr + self.resetDelay() + p = SicsProtocol() + if self.verbose: + p.verbose = True + p.factory = self + p.delimiter = '\n' + self.device = p + return p + + def clientConnectionLost(self, connector, reason): + if self.verbose: + print 'Lost connection', connector.getDestination().port,'Reason:', reason + self.device = None + ReconnectingClientFactory.clientConnectionLost(self, connector, reason) + + def clientConnectionFailed(self, connector, reason): + if self.verbose: + print 'Connection failed. Reason:', reason + ReconnectingClientFactory.clientConnectionFailed(self, connector,reason) + + def sendMessage(self, m): + if self.device: + self.device.sendMessage(m) + + def isConnected(self): + if self.device: + if self.device.isConnected(): + return True + return False + + def add_callback(self, cb): + if not cb in self.callbacks: + self.callbacks.append(cb) + + def del_callback(self, cb): + if cb in self.callbacks: + self.callbacks.remove(cb) + + def call_callbacks(self, data): + for cb in self.callbacks: + cb(self, data) + +if __name__ == "__main__": + Verbose = False + def cbf(server, data): + if Verbose: + print "CB:", repr(data) + if data: + command, response = data + print command + for item in response: + print item + if len(Commands) > 0: + command = Commands[0] + del Commands[0] + server.sendMessage(command) + return + reactor.stop() + + factory = SICSClientFactory() + if Verbose: + factory.verbose = True + factory.add_callback(cbf) + connector = reactor.connectTCP("localhost", 60003, factory) + if True: + print "Factory:", dir(factory) + print "Destination:", connector.getDestination() + print "Connector:", dir(connector) + reactor.run() +