Files
sics/site_ansto/instrument/TEST_SICS/unit_tests/sics_proxy.py
2014-05-12 15:20:18 +10:00

194 lines
5.7 KiB
Python

#!/usr/bin/env python
# vim: ts=8 sts=4 sw=4 expandtab
# Author: Douglas Clowes (dcl@ansto.gov.au) 2014-04-24
""" SICS proxy client for use by unit testing modules
This module provides for connection to a running SICS instance,
including login.
It also wraps each command in a transaction and accumulates the
full response, which is delivered through a deferred and through
registered callbacks.
"""
from twisted.python import log
from twisted.internet import reactor, defer
from twisted.internet.task import LoopingCall
from twisted.internet.protocol import Protocol
from twisted.internet.protocol import ReconnectingClientFactory
# Registry of live SicsProtocol instances, keyed by the port number that
# transport.getHost() reports for their connection.
MyConnections = dict()

# Commands sent one after another by main_program() when this file is run
# as a script.
Commands = [
    'status',
    'dir types',
    'sicslist a2',
]
class SicsProtocol(Protocol):
    """Client side of one connection to a SICS server.

    Incoming data is split into lines on ``delimiter``.  The first line
    received is discarded and answered with ``manager ansto``; the second
    line is taken as the login reply and fires ``login_deferred``.  Every
    later line is treated as part of a ``fulltransact`` reply: a
    ``TRANSACTIONSTART`` line records the command and resets ``line_list``,
    a ``TRANSACTIONFINISHED`` line delivers ``line_list`` via
    ``doCallbacks()``, and any other line is appended to ``line_list``.

    While connected, the instance is registered in the module-level
    ``MyConnections`` dictionary.
    """

    def __init__(self):
        self.factory = None
        self.buffer = ""
        self.delimiter = "\n"
        self.verbose = False
        self.port = None
        # Login progress: 0 = nothing received yet, 1 = login line sent,
        # 2 = login reply received (see isConnected()).
        self.login = 0
        self.login_deferred = defer.Deferred()
        self.command_deferred = None
        self.command = None
        self.line_list = []

    def _trace(self, *parts):
        """Print *parts* separated by single spaces when ``verbose`` is set.

        Written as a single-argument ``print(...)`` so the output is the
        same whether ``print`` is parsed as a statement or as a function.
        """
        if self.verbose:
            print(" ".join("%s" % (part,) for part in parts))

    def lineReceived(self, line):
        """Advance the login sequence or accumulate a transaction reply.

        :param line: one received line with the line ending removed.
        """
        if self.login == 0:
            # The content of the first line is ignored; it only triggers
            # sending the login line.
            self.login = 1
            self.sendLine("manager ansto")
            return
        if self.login == 1:
            self.login = 2
            self.loginComplete(line)
            return
        if line.startswith("TRANSACTIONSTART "):
            self.command = line[len("TRANSACTIONSTART "):]
            self.line_list = []
        elif line.startswith("TRANSACTIONFINISHED"):
            self.doCallbacks()
            # Rebind rather than clear: the callbacks were handed the old
            # list object and may still be holding it.
            self.line_list = []
        else:
            self.line_list.append(line)

    def dataReceived(self, data):
        """Buffer *data* and dispatch every complete line it finishes.

        :param data: the chunk of data delivered by the transport.
        """
        self.buffer += data
        # Split once; the last piece is the (possibly empty) incomplete
        # tail and stays in the buffer for the next call.
        pieces = self.buffer.split(self.delimiter)
        self.buffer = pieces.pop()
        for piece in pieces:
            self.lineReceived(piece.strip("\r\n"))

    def connectionMade(self):
        """Reset the login state and register in ``MyConnections``."""
        host = self.transport.getHost()
        if self.verbose:
            self._trace("connectionMade", host, self.transport.getPeer())
        self.login = 0
        self.port = host.port
        MyConnections[self.port] = self
        self._trace("MyConnections:", MyConnections)

    def connectionLost(self, reason):
        """Remove this instance from ``MyConnections`` and reset its state.

        :param reason: why the connection ended (only passed in; unused).
        """
        self._trace("connectionLost")
        # pop() with a default so a missing entry cannot raise KeyError
        # while the connection is being torn down.
        MyConnections.pop(self.port, None)
        self.login = 0
        self.port = None
        self._trace("MyConnections:", MyConnections)

    def loginComplete(self, line):
        """Fire ``login_deferred`` with *line* and notify the factory.

        The factory's callbacks are called with ``None`` as their data.

        :param line: the line received in reply to the login line.
        """
        self._trace("LoginComplete")
        self.login_deferred.callback(line)
        # Guarded like doCallbacks(): the protocol may be used without a
        # factory attached.
        if self.factory:
            self.factory.call_callbacks(None)

    def sendLine(self, m):
        """Write *m* followed by ``delimiter`` to the transport.

        :param m: the text to send, without a line ending.
        """
        self._trace("sendLine:", m)
        self.transport.write(m + self.delimiter)

    def sendMessage(self, m):
        """Send *m* as a ``fulltransact`` command.

        Only one command is tracked at a time: a deferred from an earlier
        call that has not fired yet is replaced here and will not fire.

        :param m: the command text.
        :returns: a Deferred that fires with the list of reply lines when
            ``TRANSACTIONFINISHED`` is received.
        """
        self.command_deferred = defer.Deferred()
        self.sendLine('fulltransact ' + m)
        return self.command_deferred

    def doCallbacks(self):
        """Deliver the accumulated reply lines for the finished transaction.

        The pending command deferred, if any, fires with ``line_list``;
        then the factory's callbacks, if a factory is attached, are called
        with the tuple ``(command, line_list)``.
        """
        # Detach the deferred before firing it.  A Deferred may only be
        # fired once, so leaving it in place would make a second
        # TRANSACTIONFINISHED fail; detaching first also means a callback
        # that calls sendMessage() keeps the new deferred it just created.
        pending, self.command_deferred = self.command_deferred, None
        if pending is not None:
            pending.callback(self.line_list)
        if self.factory:
            self.factory.call_callbacks((self.command, self.line_list))

    def isConnected(self):
        """Return True once the login reply has been received."""
        return self.login == 2
class SICSClientFactory(ReconnectingClientFactory):
    """Reconnecting client factory that builds ``SicsProtocol`` instances.

    The most recently built protocol is kept in ``device`` and is cleared
    when its connection is lost.  Callables registered with
    ``add_callback()`` are invoked as ``cb(factory, data)``.
    """

    def __init__(self):
        self.verbose = False
        # Protocol built for the current connection, or None.
        self.device = None
        # Registered callables, each called as cb(factory, data).
        self.callbacks = []

    def _trace(self, *parts):
        """Print *parts* separated by single spaces when ``verbose`` is set.

        Written as a single-argument ``print(...)`` so the output is the
        same whether ``print`` is parsed as a statement or as a function.
        """
        if self.verbose:
            print(" ".join("%s" % (part,) for part in parts))

    def startedConnecting(self, connector):
        """Report that a connection attempt has started (verbose only)."""
        self._trace('Started to connect.')

    def buildProtocol(self, addr):
        """Create the protocol for a new connection and remember it.

        The reconnection delay is reset, and the new protocol inherits
        this factory's verbosity.

        :param addr: the address passed in by the caller (only reported).
        :returns: the new ``SicsProtocol`` instance.
        """
        self._trace('Connected to:', addr)
        self.resetDelay()
        proto = SicsProtocol()
        if self.verbose:
            proto.verbose = True
        proto.factory = self
        proto.delimiter = '\n'
        self.device = proto
        return proto

    def clientConnectionLost(self, connector, reason):
        """Forget the current protocol and defer to the base class.

        :param connector: the connector whose connection was lost.
        :param reason: why the connection was lost.
        """
        if self.verbose:
            self._trace('Lost connection',
                        connector.getDestination().port,
                        'Reason:', reason)
        self.device = None
        ReconnectingClientFactory.clientConnectionLost(self,
                                                       connector, reason)

    def clientConnectionFailed(self, connector, reason):
        """Report the failure and defer to the base class.

        :param connector: the connector whose attempt failed.
        :param reason: why the attempt failed.
        """
        self._trace('Connection failed. Reason:', reason)
        ReconnectingClientFactory.clientConnectionFailed(self,
                                                         connector, reason)

    def sendMessage(self, m):
        """Send command *m* through the current protocol, if there is one.

        :param m: the command text.
        :returns: the Deferred returned by the protocol's
            ``sendMessage()``, or None when there is no current protocol.
        """
        if self.device:
            return self.device.sendMessage(m)
        return None

    def isConnected(self):
        """Return True when a protocol exists and reports it is connected."""
        return bool(self.device and self.device.isConnected())

    def add_callback(self, cb):
        """Register *cb* unless it is already registered."""
        if cb not in self.callbacks:
            self.callbacks.append(cb)

    def del_callback(self, cb):
        """Unregister *cb*; do nothing if it is not registered."""
        if cb in self.callbacks:
            self.callbacks.remove(cb)

    def call_callbacks(self, data):
        """Call every registered callback as ``cb(self, data)``.

        :param data: the value handed to each callback.
        """
        # Iterate over a snapshot so a callback may add or remove
        # callbacks without disturbing this dispatch.
        for cb in list(self.callbacks):
            cb(self, data)
def main_program(host="localhost", port=60003, commands=None, verbose=False):
    """Connect to a SICS server, run commands in sequence, then stop.

    One command is sent after login completes and one more after each
    finished transaction; when no commands remain the reactor is stopped.
    Each finished transaction's command and reply lines are printed.
    This call blocks in ``reactor.run()`` until the reactor stops.

    :param host: host name to connect to.
    :param port: TCP port to connect to.
    :param commands: iterable of command strings; when None, the
        module-level ``Commands`` list is used.  The run works on a
        private copy, so the list passed in is not modified.
    :param verbose: when true, enable the factory's verbose output and
        print the raw data passed to each callback.
    """
    pending = list(Commands if commands is None else commands)

    def show(*parts):
        # Single-argument print(...) so the output is the same whether
        # print is parsed as a statement or as a function.
        print(" ".join("%s" % (part,) for part in parts))

    def cbf(server, data):
        # data is None for the login notification and a
        # (command, reply_lines) tuple for a finished transaction.
        if verbose:
            show("CB:", repr(data))
        if data:
            command, response = data
            print(command)
            for item in response:
                print(item)
        if pending:
            server.sendMessage(pending.pop(0))
            return
        reactor.stop()

    factory = SICSClientFactory()
    if verbose:
        factory.verbose = True
    factory.add_callback(cbf)
    my_connector = reactor.connectTCP(host, port, factory)
    # Diagnostic dump of the objects involved; printed unconditionally.
    show("Factory:", dir(factory))
    show("Destination:", my_connector.getDestination())
    show("Connector:", dir(my_connector))
    reactor.run()
# Run the demonstration client only when this file is executed as a
# script, not when it is imported by a test module.
if __name__ == '__main__':
    main_program()