Improve formatting and documentation (pylint)

This commit is contained in:
Douglas Clowes
2014-05-12 15:19:10 +10:00
parent 5699ae2747
commit bd9c79da30
2 changed files with 179 additions and 83 deletions

View File

@ -3,7 +3,7 @@
# Author: Douglas Clowes (dcl@ansto.gov.au) 2014-04-24
#
""" This is a SICS hipadaba module.
It allows for loading the gumtree XML obtained from SICS and
accessing the tree by path to obtain the children and properties
of a node on a named path.
@ -14,34 +14,36 @@
# of our preference (performance) but fall back to a default version.
#
try:
from lxml import etree
print("running with lxml.etree")
from lxml import etree
print "running with lxml.etree"
except ImportError:
try:
# Python 2.5
import xml.etree.cElementTree as etree
print("running with cElementTree on Python 2.5+")
except ImportError:
try:
# Python 2.5
import xml.etree.ElementTree as etree
print("running with ElementTree on Python 2.5+")
# Python 2.5
import xml.etree.cElementTree as etree
print "running with cElementTree on Python 2.5+"
except ImportError:
try:
# normal cElementTree install
import cElementTree as etree
print("running with cElementTree")
except ImportError:
try:
# normal ElementTree install
import elementtree.ElementTree as etree
print("running with ElementTree")
# Python 2.5
import xml.etree.ElementTree as etree
print "running with ElementTree on Python 2.5+"
except ImportError:
print("Failed to import ElementTree from any known place")
try:
# normal cElementTree install
import cElementTree as etree
print "running with cElementTree"
except ImportError:
try:
# normal ElementTree install
import elementtree.ElementTree as etree
print "running with ElementTree"
except ImportError:
print "Failed to import ElementTree from any known place"
class HipadabaTree(object):
"""Hipadaba Tree Class
This can be built from a variety of XML sources and contains
methods for accessing and navigating the SICS hipadaba tree
"""
def __init__(self, from_this):
"""Build a Hipadaba Tree object from an 'ElementTree' object
@ -49,19 +51,19 @@ class HipadabaTree(object):
Parameters
----------
from_this : XML-like
ElementTree compatible object, or string or array of strings
ElementTree compatible object, or string or list of strings
"""
if isinstance(from_this, list) and isinstance(from_this[0], str):
root = etree.fromstringlist(from_this)
my_root = etree.fromstringlist(from_this)
elif isinstance(from_this, str):
if len(from_this) > 0:
if from_this[0] == "<":
root = etree.fromstring(from_this)
my_root = etree.fromstring(from_this)
elif from_this[0] == "@":
root = etree.parse(from_this[1:])
my_root = etree.parse(from_this[1:])
elif from_this.lower().endswith(".xml"):
root = etree.parse(from_this)
my_root = etree.parse(from_this)
else:
raise Exception("unknown string")
else:
@ -72,11 +74,21 @@ class HipadabaTree(object):
raise Exception("findall not in object")
if "find" not in directory:
raise Exception("find not in object")
root = from_this
self.root = root
my_root = from_this
self.root = my_root
def short_tree(self, tree=None):
"""Creates a map from the tree/node that can be used or printed
Parameters
----------
tree : optional Element or ElementTree
Returns
-------
map containing name(str),
children(list of str),
properties(list of str)
"""
if tree is None:
tree = self.root
@ -84,36 +96,69 @@ class HipadabaTree(object):
tree_part = {"name": "<root>"}
else:
tree_part = {"name": tree.attrib['id']}
tree_part["children"] = sorted([ch.attrib['id'] for ch in tree.findall('component')])
tree_part["properties"] = sorted([ch.attrib['id'] for ch in tree.findall('property')])
tree_part["children"] = sorted(
[comp.attrib['id'] for comp in tree.findall('component')])
tree_part["properties"] = sorted(
[propy.attrib['id'] for propy in tree.findall('property')])
return tree_part
def find_path(self, path, tree=None):
"""Finds a node from the given tree and path
Parameters
----------
path : str
tree : optional Element or ElementTree
Returns
-------
requested Element or None
"""
debug = False
if tree is None:
tree = self.root
if debug:
print "Looking for %s in %s" % (repr(path), self.short_tree(tree))
path_list = [p for p in path.lower().strip('/').split('/') if p is not '']
path_list = [propy
for propy in path.lower().strip('/').split('/')
if propy is not '']
if path_list == ['']:
if debug:
print "Finding root: %s in %s" % (repr(path_list), self.short_tree(tree))
print "Finding root: %s in %s" % (
repr(path_list),
self.short_tree(tree))
return tree
child = tree
for node in path_list:
children = [ch for ch in child.findall('component') if ch.attrib['id'].lower() == node.lower()]
children = [comp
for comp in child.findall('component')
if comp.attrib['id'].lower() == node.lower()]
if len(children) == 0:
if debug:
print "Not found: %s in %s" % (repr(path_list), self.short_tree(tree))
print "Not found: %s in %s" % (
repr(path_list),
self.short_tree(tree))
return None
child = children[0]
if debug:
print "Finding node: %s in %s" % (repr(path_list), self.short_tree(child))
print "Finding node: %s in %s" % (
repr(path_list),
self.short_tree(child))
return child
def list_tree(self, tree=None, props=False, indent=0):
"""Returns a list of indented strings for a tree
Parameters
----------
tree : optional Element or ElementTree
props : optional boolean to include node properties
indent : optional nesting/indent depth/level
Returns
-------
list of str ready to print or write to file
"""
if tree is None:
tree = self.root
text = []
@ -122,20 +167,40 @@ class HipadabaTree(object):
else:
text += [' '*indent + '* ' + tree.attrib['id']]
if props:
properties = sorted(tree.findall('property'), key=lambda node: node.attrib['id'].lower())
for prop in properties:
line = ' '*indent + ' - ' + prop.attrib['id'] + '='
items = [ch.text for ch in prop.findall('value') if ch.text is not None]
properties = sorted(
tree.findall('property'),
key=lambda node: node.attrib['id'].lower())
for propy in properties:
line = ' '*indent + ' - ' + propy.attrib['id'] + '='
items = [valu.text
for valu in propy.findall('value')
if valu.text is not None]
line += ' '.join(items)
text += [line]
children = sorted(tree.findall('component'), key=lambda node: node.attrib['id'].lower())
children = sorted(
tree.findall('component'),
key=lambda node: node.attrib['id'].lower())
for child in children:
text += self.list_tree(child, props, indent+1)
return text
def print_tree(self, tree=None, props=False, indent=0):
for line in self.list_tree(tree, props, indent):
"""Print the indented tree to stdout
Parameters
----------
tree : optional Element or ElementTree
props : optional boolean to include node properties
indent : optional nesting/indent depth/level
Returns
-------
list of str ready to print or write to file
"""
line_list = self.list_tree(tree, props, indent)
for line in line_list:
print line
return line_list
def getNode(self, path, tree=None):
return self.find_path(path, tree)
@ -144,20 +209,26 @@ class HipadabaTree(object):
node = self.find_path(path, tree)
if node is None:
return None
items = [ch.text for ch in node.findall('value') if ch.text is not None]
items = [valu.text
for valu in node.findall('value')
if valu.text is not None]
if len(items) == 0:
return ""
return ','.join(items)
def getProperty(self, path, property, tree=None):
def getProperty(self, path, the_property, tree=None):
node = self.find_path(path, tree)
if node is None:
return None
items = [ch for ch in node.findall('property') if ch.attrib['id'].lower() == property.lower()]
items = [propy
for propy in node.findall('property')
if propy.attrib['id'].lower() == the_property.lower()]
if len(items) == 0:
return None
node = items[0]
items = [ch.text for ch in node.findall('value') if ch.text is not None]
items = [valu.text
for valu in node.findall('value')
if valu.text is not None]
if len(items) == 0:
return ""
return ','.join(items)
@ -165,43 +236,52 @@ class HipadabaTree(object):
def getProperties(self, path, tree=None):
node = self.find_path(path, tree)
if node is not None:
return sorted([ch.attrib['id'] for ch in node.findall('property')])
return sorted(
[propy.attrib['id'] for propy in node.findall('property')])
return None
def getChildren(self, path, tree=None):
node = self.find_path(path, tree)
if node is not None:
return sorted([ch.attrib['id'] for ch in node.findall('component')])
return sorted(
[comp.attrib['id'] for comp in node.findall('component')])
return None
if __name__ == "__main__":
"""this test uses a "junk.xml" file extracted for SICS
"""
filename = "junk.xml"
fd = open(filename, "r")
lines = fd.readlines()
print "From file:", HipadabaTree(filename).short_tree()
print "From list:", HipadabaTree(lines).short_tree()
print "From text:", HipadabaTree('\n'.join(lines)).short_tree()
def main_program():
###this test uses a "junk.xml" file extracted for SICS
###
test_file = "junk.xml"
with open(test_file, "r") as input_file:
line_list = input_file.readlines()
print "From file:", HipadabaTree(test_file).short_tree()
print "From list:", HipadabaTree(line_list).short_tree()
print "From text:", HipadabaTree('\n'.join(line_list)).short_tree()
root = etree.parse("junk.xml")
hipadaba = HipadabaTree(root)
print "From tree:", hipadaba.short_tree()
print "Components:", [ch.attrib['id'] for ch in hipadaba.root.findall('component')]
print "Components:", [ch.attrib['id'] for ch in hipadaba.getNode('/sample/ps9').findall('component')]
print "Components:", [ch.attrib['id']
for ch in hipadaba.root.findall('component')]
print "Components:", [ch.attrib['id']
for ch in hipadaba.getNode('/sample/ps9').findall('component')]
print "GetNode:", hipadaba.short_tree(hipadaba.getNode('/sample/ps9'))
print "PrintTree:"
hipadaba.print_tree(tree=hipadaba.getNode('/sample/ps9/'))
hipadaba.print_tree(tree=hipadaba.getNode('/sample/ps9/status'), props=True)
hipadaba.print_tree(
tree=hipadaba.getNode('/sample/ps9/'))
hipadaba.print_tree(
tree=hipadaba.getNode('/sample/ps9/status'), props=True)
print "GetChildren:", hipadaba.getChildren('')
print "GetChildren:", hipadaba.getChildren('////')
print "GetChildren:", hipadaba.getChildren('//sample///ps9/////')
print "GetProperties:", hipadaba.getProperties('/sample/ps9')
print "Properties from root:"
for p in hipadaba.getProperties('/sample/ps9'):
prop = hipadaba.getProperty('/sample/ps9', p)
print ' ', p, '=', prop
n = hipadaba.getNode('/sample/ps9')
for property_name in hipadaba.getProperties('/sample/ps9'):
property_value = hipadaba.getProperty('/sample/ps9', property_name)
print ' ', property_name, '=', property_value
node = hipadaba.getNode('/sample/ps9')
print "Properties from node:"
for p in hipadaba.getProperties('/', n):
prop = hipadaba.getProperty('/', p, n)
print ' ', p, '=', prop
for property_name in hipadaba.getProperties('/', node):
property_value = hipadaba.getProperty('/', property_name, node)
print ' ', property_name, '=', property_value
if __name__ == "__main__":
main_program()

View File

@ -1,6 +1,13 @@
#!/usr/bin/env python
# vim: ts=8 sts=4 sw=4 expandtab
# Author: Douglas Clowes (dcl@ansto.gov.au) 2014-04-24
""" SICS proxy client for use by unit testing modules
This class provides for connection to a running SICS instance,
including login.
It also provides wrapping of transactions and accumulation
of a full response with deferred handling and callback
"""
from twisted.python import log
from twisted.internet import reactor, defer
from twisted.internet.task import LoopingCall
@ -20,6 +27,8 @@ class SicsProtocol(Protocol):
self.login = 0
self.login_deferred = defer.Deferred()
self.command_deferred = None
self.command = None
self.line_list = []
def lineReceived(self, line):
if self.login == 0:
@ -48,12 +57,14 @@ class SicsProtocol(Protocol):
def connectionMade(self):
if self.verbose:
print "connectionMade", self.transport.getHost(), self.transport.getPeer()
print "connectionMade",
print self.transport.getHost(),
print self.transport.getPeer()
self.login = 0
self.port = self.transport.getHost().port
MyConnections[self.port] = self
if self.verbose:
print "MyConnections:",MyConnections
print "MyConnections:", MyConnections
def connectionLost(self, reason):
if self.verbose:
@ -62,14 +73,13 @@ class SicsProtocol(Protocol):
self.login = 0
self.port = None
if self.verbose:
print "MyConnections:",MyConnections
print "MyConnections:", MyConnections
def loginComplete(self, line):
if self.verbose:
print "LoginComplete"
self.login_deferred.callback(line)
self.factory.call_callbacks(None)
pass
def sendLine(self, m):
if self.verbose:
@ -106,24 +116,28 @@ class SICSClientFactory(ReconnectingClientFactory):
if self.verbose:
print 'Connected to:', addr
self.resetDelay()
p = SicsProtocol()
my_proto = SicsProtocol()
if self.verbose:
p.verbose = True
p.factory = self
p.delimiter = '\n'
self.device = p
return p
my_proto.verbose = True
my_proto.factory = self
my_proto.delimiter = '\n'
self.device = my_proto
return my_proto
def clientConnectionLost(self, connector, reason):
if self.verbose:
print 'Lost connection', connector.getDestination().port,'Reason:', reason
print 'Lost connection',
print connector.getDestination().port,
print 'Reason:', reason
self.device = None
ReconnectingClientFactory.clientConnectionLost(self, connector, reason)
ReconnectingClientFactory.clientConnectionLost(self,
connector, reason)
def clientConnectionFailed(self, connector, reason):
if self.verbose:
print 'Connection failed. Reason:', reason
ReconnectingClientFactory.clientConnectionFailed(self, connector,reason)
ReconnectingClientFactory.clientConnectionFailed(self,
connector, reason)
def sendMessage(self, m):
if self.device:
@ -147,7 +161,7 @@ class SICSClientFactory(ReconnectingClientFactory):
for cb in self.callbacks:
cb(self, data)
if __name__ == "__main__":
def main_program():
Verbose = False
def cbf(server, data):
if Verbose:
@ -168,10 +182,12 @@ if __name__ == "__main__":
if Verbose:
factory.verbose = True
factory.add_callback(cbf)
connector = reactor.connectTCP("localhost", 60003, factory)
my_connector = reactor.connectTCP("localhost", 60003, factory)
if True:
print "Factory:", dir(factory)
print "Destination:", connector.getDestination()
print "Connector:", dir(connector)
print "Destination:", my_connector.getDestination()
print "Connector:", dir(my_connector)
reactor.run()
if __name__ == "__main__":
main_program()