Twisted unittest based SICS unit test infrastructure
This commit is contained in:
194
site_ansto/instrument/TEST_SICS/unit_tests/sics_hdb.py
Normal file
194
site_ansto/instrument/TEST_SICS/unit_tests/sics_hdb.py
Normal file
@ -0,0 +1,194 @@
|
||||
#!/usr/bin/env python
|
||||
# vim: ts=8 sts=4 sw=4 expandtab
|
||||
# Author: Douglas Clowes (dcl@ansto.gov.au) 2014-04-24
|
||||
#
|
||||
""" This is a SICS hipadaba module.
|
||||
|
||||
It allows for loading the gumtree XML obtained from SICS and
|
||||
accessing the tree by path to obtain the children and properties
|
||||
of a node on a named path.
|
||||
"""
|
||||
|
||||
#
|
||||
# Try to import an ElementTree compatible XML library in the order
|
||||
# of our preference (performance) but fall back to a default version.
|
||||
#
|
||||
try:
|
||||
from lxml import etree
|
||||
print("running with lxml.etree")
|
||||
except ImportError:
|
||||
try:
|
||||
# Python 2.5
|
||||
import xml.etree.cElementTree as etree
|
||||
print("running with cElementTree on Python 2.5+")
|
||||
except ImportError:
|
||||
try:
|
||||
# Python 2.5
|
||||
import xml.etree.ElementTree as etree
|
||||
print("running with ElementTree on Python 2.5+")
|
||||
except ImportError:
|
||||
try:
|
||||
# normal cElementTree install
|
||||
import cElementTree as etree
|
||||
print("running with cElementTree")
|
||||
except ImportError:
|
||||
try:
|
||||
# normal ElementTree install
|
||||
import elementtree.ElementTree as etree
|
||||
print("running with ElementTree")
|
||||
except ImportError:
|
||||
print("Failed to import ElementTree from any known place")
|
||||
|
||||
class HipadabaTree(object):
|
||||
"""Hipadaba Tree Class
|
||||
|
||||
"""
|
||||
def __init__(self, from_this):
|
||||
"""Build a Hipadaba Tree object from an 'ElementTree' object
|
||||
|
||||
Parameters
|
||||
----------
|
||||
from_this : XML-like
|
||||
ElementTree compatible object, or string or array of strings
|
||||
|
||||
"""
|
||||
if isinstance(from_this, list) and isinstance(from_this[0], str):
|
||||
root = etree.fromstringlist(from_this)
|
||||
elif isinstance(from_this, str):
|
||||
if len(from_this) > 0:
|
||||
if from_this[0] == "<":
|
||||
root = etree.fromstring(from_this)
|
||||
elif from_this[0] == "@":
|
||||
root = etree.parse(from_this[1:])
|
||||
elif from_this.lower().endswith(".xml"):
|
||||
root = etree.parse(from_this)
|
||||
else:
|
||||
raise Exception("unknown string")
|
||||
else:
|
||||
raise Exception("short string")
|
||||
else:
|
||||
directory = dir(from_this)
|
||||
if "findall" not in directory:
|
||||
raise Exception("findall not in object")
|
||||
if "find" not in directory:
|
||||
raise Exception("find not in object")
|
||||
root = from_this
|
||||
self.root = root
|
||||
|
||||
def short_tree(self, tree=None):
|
||||
"""
|
||||
"""
|
||||
if tree is None:
|
||||
tree = self.root
|
||||
if isinstance(tree, type(self.root)):
|
||||
tree_part = {"name": "<root>"}
|
||||
else:
|
||||
tree_part = {"name": tree.attrib['id']}
|
||||
tree_part["children"] = [ch.attrib['id'] for ch in tree.findall('component')]
|
||||
tree_part["properties"] = [ch.attrib['id'] for ch in tree.findall('property')]
|
||||
return tree_part
|
||||
|
||||
def find_path(self, path, tree=None):
|
||||
debug = True
|
||||
if tree is None:
|
||||
tree = self.root
|
||||
if debug:
|
||||
print "Looking for %s in %s" % (repr(path), self.short_tree(tree))
|
||||
path_list = [p for p in path.lower().strip('/').split('/') if p is not '']
|
||||
if path_list == ['']:
|
||||
if debug:
|
||||
print "Finding root: %s in %s" % (repr(path_list), self.short_tree(tree))
|
||||
return tree
|
||||
child = tree
|
||||
for node in path_list:
|
||||
children = [ch for ch in child.findall('component') if ch.attrib['id'].lower() == node.lower()]
|
||||
if len(children) == 0:
|
||||
if debug:
|
||||
print "Not found: %s in %s" % (repr(path_list), self.short_tree(tree))
|
||||
return None
|
||||
child = children[0]
|
||||
if debug:
|
||||
print "Finding node: %s in %s" % (repr(path_list), self.short_tree(child))
|
||||
return child
|
||||
|
||||
def list_tree(self, tree=None, props=False, indent=0):
|
||||
if tree is None:
|
||||
tree = self.root
|
||||
text = []
|
||||
if isinstance(tree, type(self.root)):
|
||||
text += [' '*indent + '* ' + '<root>']
|
||||
else:
|
||||
text += [' '*indent + '* ' + tree.attrib['id']]
|
||||
if props:
|
||||
properties = sorted(tree.findall('property'), key=lambda node: node.attrib['id'].lower())
|
||||
for prop in properties:
|
||||
line = ' '*indent + ' - ' + prop.attrib['id'] + '='
|
||||
items = [ch.text for ch in prop.findall('value') if ch.text is not None]
|
||||
line += ' '.join(items)
|
||||
text += [line]
|
||||
children = sorted(tree.findall('component'), key=lambda node: node.attrib['id'].lower())
|
||||
for child in children:
|
||||
text += self.list_tree(child, props, indent+1)
|
||||
return text
|
||||
|
||||
def print_tree(self, tree=None, props=False, indent=0):
|
||||
for line in self.list_tree(tree, props, indent):
|
||||
print line
|
||||
|
||||
def getNode(self, path, tree=None):
|
||||
return self.find_path(path, tree)
|
||||
|
||||
def getProperty(self, path, property, tree=None):
|
||||
node = self.find_path(path, tree)
|
||||
if node is None:
|
||||
return None
|
||||
items = [ch for ch in node.findall('property') if ch.attrib['id'].lower() == property.lower()]
|
||||
if len(items) == 0:
|
||||
return None
|
||||
node = items[0]
|
||||
items = [ch.text for ch in node.findall('value') if ch.text is not None]
|
||||
if len(items) == 0:
|
||||
return ""
|
||||
return ' '.join(items)
|
||||
|
||||
def getProperties(self, path, tree=None):
|
||||
node = self.find_path(path, tree)
|
||||
if node is not None:
|
||||
return sorted([ch.attrib['id'] for ch in node.findall('property')])
|
||||
return None
|
||||
|
||||
def getChildren(self, path, tree=None):
|
||||
node = self.find_path(path, tree)
|
||||
if node:
|
||||
return sorted([ch.attrib['id'] for ch in node.findall('component')])
|
||||
return None
|
||||
|
||||
if __name__ == "__main__":
|
||||
filename = "junk.xml"
|
||||
fd = open(filename, "r")
|
||||
lines = fd.readlines()
|
||||
print "From file:", HipadabaTree(filename).short_tree()
|
||||
print "From list:", HipadabaTree(lines).short_tree()
|
||||
print "From text:", HipadabaTree('\n'.join(lines)).short_tree()
|
||||
root = etree.parse("junk.xml")
|
||||
hipadaba = HipadabaTree(root)
|
||||
print "From tree:", hipadaba.short_tree()
|
||||
print "Components:", [ch.attrib['id'] for ch in hipadaba.root.findall('component')]
|
||||
print "Components:", [ch.attrib['id'] for ch in hipadaba.getNode('/sample/ps9').findall('component')]
|
||||
print "GetNode:", hipadaba.short_tree(hipadaba.getNode('/sample/ps9'))
|
||||
print "PrintTree:"
|
||||
hipadaba.print_tree(tree=hipadaba.getNode('/sample/ps9/'))
|
||||
hipadaba.print_tree(tree=hipadaba.getNode('/sample/ps9/status'), props=True)
|
||||
print "GetChildren:", hipadaba.getChildren('')
|
||||
print "GetChildren:", hipadaba.getChildren('////')
|
||||
print "GetChildren:", hipadaba.getChildren('//sample///ps9/////')
|
||||
print "GetProperties:", hipadaba.getProperties('/sample/ps9')
|
||||
print "Properties from root:"
|
||||
for p in hipadaba.getProperties('/sample/ps9'):
|
||||
prop = hipadaba.getProperty('/sample/ps9', p)
|
||||
print ' ', p, '=', prop
|
||||
n = hipadaba.getNode('/sample/ps9')
|
||||
print "Properties from node:"
|
||||
for p in hipadaba.getProperties('/', n):
|
||||
prop = hipadaba.getProperty('/', p, n)
|
||||
print ' ', p, '=', prop
|
177
site_ansto/instrument/TEST_SICS/unit_tests/sics_proxy.py
Normal file
177
site_ansto/instrument/TEST_SICS/unit_tests/sics_proxy.py
Normal file
@ -0,0 +1,177 @@
|
||||
#!/usr/bin/env python
|
||||
# vim: ts=8 sts=4 sw=4 expandtab
|
||||
# Author: Douglas Clowes (dcl@ansto.gov.au) 2014-04-24
|
||||
from twisted.python import log
|
||||
from twisted.internet import reactor, defer
|
||||
from twisted.internet.task import LoopingCall
|
||||
from twisted.internet.protocol import Protocol
|
||||
from twisted.internet.protocol import ReconnectingClientFactory
|
||||
|
||||
MyConnections = {}
|
||||
Commands = ['status', 'dir types', 'sicslist a2']
|
||||
|
||||
class SicsProtocol(Protocol):
|
||||
def __init__(self):
|
||||
self.factory = None
|
||||
self.buffer = ""
|
||||
self.delimiter = "\n"
|
||||
self.verbose = False
|
||||
self.port = None
|
||||
self.login = 0
|
||||
self.login_deferred = defer.Deferred()
|
||||
self.command_deferred = None
|
||||
|
||||
def lineReceived(self, line):
|
||||
if self.login == 0:
|
||||
self.login = 1
|
||||
self.sendLine("manager ansto")
|
||||
return
|
||||
elif self.login == 1:
|
||||
self.login = 2
|
||||
self.loginComplete(line)
|
||||
else:
|
||||
if line.startswith("TRANSACTIONSTART "):
|
||||
self.command = line[len("TRANSACTIONSTART "):]
|
||||
self.line_list = []
|
||||
elif line.startswith("TRANSACTIONFINISHED"):
|
||||
self.doCallbacks()
|
||||
self.line_list = []
|
||||
else:
|
||||
self.line_list.append(line)
|
||||
|
||||
def dataReceived(self, data):
|
||||
self.buffer += data
|
||||
while self.delimiter in self.buffer:
|
||||
line, self.buffer = self.buffer.split(self.delimiter, 1)
|
||||
line = line.strip("\r\n")
|
||||
self.lineReceived(line)
|
||||
|
||||
def connectionMade(self):
|
||||
if self.verbose:
|
||||
print "connectionMade", self.transport.getHost(), self.transport.getPeer()
|
||||
self.login = 0
|
||||
self.port = self.transport.getHost().port
|
||||
MyConnections[self.port] = self
|
||||
if self.verbose:
|
||||
print "MyConnections:",MyConnections
|
||||
|
||||
def connectionLost(self, reason):
|
||||
if self.verbose:
|
||||
print "connectionLost"
|
||||
del MyConnections[self.port]
|
||||
self.login = 0
|
||||
self.port = None
|
||||
if self.verbose:
|
||||
print "MyConnections:",MyConnections
|
||||
|
||||
def loginComplete(self, line):
|
||||
if self.verbose:
|
||||
print "LoginComplete"
|
||||
self.login_deferred.callback(line)
|
||||
self.factory.call_callbacks(None)
|
||||
pass
|
||||
|
||||
def sendLine(self, m):
|
||||
if self.verbose:
|
||||
print "sendLine:", m
|
||||
self.transport.write(m + self.delimiter)
|
||||
|
||||
def sendMessage(self, m):
|
||||
self.command_deferred = defer.Deferred()
|
||||
self.sendLine('fulltransact ' + m)
|
||||
return self.command_deferred
|
||||
|
||||
def doCallbacks(self):
|
||||
if self.command_deferred:
|
||||
self.command_deferred.callback(self.line_list)
|
||||
if self.factory:
|
||||
self.factory.call_callbacks((self.command, self.line_list))
|
||||
|
||||
def isConnected(self):
|
||||
if self.login == 2:
|
||||
return True
|
||||
return False
|
||||
|
||||
class SICSClientFactory(ReconnectingClientFactory):
|
||||
def __init__(self):
|
||||
self.verbose = False
|
||||
self.device = None
|
||||
self.callbacks = []
|
||||
|
||||
def startedConnecting(self, connector):
|
||||
if self.verbose:
|
||||
print 'Started to connect.'
|
||||
|
||||
def buildProtocol(self, addr):
|
||||
if self.verbose:
|
||||
print 'Connected to:', addr
|
||||
self.resetDelay()
|
||||
p = SicsProtocol()
|
||||
if self.verbose:
|
||||
p.verbose = True
|
||||
p.factory = self
|
||||
p.delimiter = '\n'
|
||||
self.device = p
|
||||
return p
|
||||
|
||||
def clientConnectionLost(self, connector, reason):
|
||||
if self.verbose:
|
||||
print 'Lost connection', connector.getDestination().port,'Reason:', reason
|
||||
self.device = None
|
||||
ReconnectingClientFactory.clientConnectionLost(self, connector, reason)
|
||||
|
||||
def clientConnectionFailed(self, connector, reason):
|
||||
if self.verbose:
|
||||
print 'Connection failed. Reason:', reason
|
||||
ReconnectingClientFactory.clientConnectionFailed(self, connector,reason)
|
||||
|
||||
def sendMessage(self, m):
|
||||
if self.device:
|
||||
self.device.sendMessage(m)
|
||||
|
||||
def isConnected(self):
|
||||
if self.device:
|
||||
if self.device.isConnected():
|
||||
return True
|
||||
return False
|
||||
|
||||
def add_callback(self, cb):
|
||||
if not cb in self.callbacks:
|
||||
self.callbacks.append(cb)
|
||||
|
||||
def del_callback(self, cb):
|
||||
if cb in self.callbacks:
|
||||
self.callbacks.remove(cb)
|
||||
|
||||
def call_callbacks(self, data):
|
||||
for cb in self.callbacks:
|
||||
cb(self, data)
|
||||
|
||||
if __name__ == "__main__":
|
||||
Verbose = False
|
||||
def cbf(server, data):
|
||||
if Verbose:
|
||||
print "CB:", repr(data)
|
||||
if data:
|
||||
command, response = data
|
||||
print command
|
||||
for item in response:
|
||||
print item
|
||||
if len(Commands) > 0:
|
||||
command = Commands[0]
|
||||
del Commands[0]
|
||||
server.sendMessage(command)
|
||||
return
|
||||
reactor.stop()
|
||||
|
||||
factory = SICSClientFactory()
|
||||
if Verbose:
|
||||
factory.verbose = True
|
||||
factory.add_callback(cbf)
|
||||
connector = reactor.connectTCP("localhost", 60003, factory)
|
||||
if True:
|
||||
print "Factory:", dir(factory)
|
||||
print "Destination:", connector.getDestination()
|
||||
print "Connector:", dir(connector)
|
||||
reactor.run()
|
||||
|
Reference in New Issue
Block a user