From da2fd6bbbb6a3a24db5c7c076d32c75b466d2c4d Mon Sep 17 00:00:00 2001 From: Douglas Clowes Date: Thu, 17 Apr 2014 13:12:42 +1000 Subject: [PATCH] Make a fake Pfeiffer device for testing the script context driver This driver controls the needle valve on the Oxford Mercury --- .../fakePfeiffer/tpg261/PfeifferDevice.py | 110 +++++++++++++++ .../fakePfeiffer/tpg261/PfeifferFactory.py | 47 +++++++ .../fakePfeiffer/tpg261/PfeifferProtocol.py | 87 ++++++++++++ .../fakePfeiffer/tpg261/Pfeiffer_tpg261.py | 76 +++++++++++ .../fakePfeiffer/tpg261/SIM_PFEIFFER.py | 126 ++++++++++++++++++ 5 files changed, 446 insertions(+) create mode 100755 site_ansto/instrument/TEST_SICS/fakePfeiffer/tpg261/PfeifferDevice.py create mode 100644 site_ansto/instrument/TEST_SICS/fakePfeiffer/tpg261/PfeifferFactory.py create mode 100755 site_ansto/instrument/TEST_SICS/fakePfeiffer/tpg261/PfeifferProtocol.py create mode 100644 site_ansto/instrument/TEST_SICS/fakePfeiffer/tpg261/Pfeiffer_tpg261.py create mode 100755 site_ansto/instrument/TEST_SICS/fakePfeiffer/tpg261/SIM_PFEIFFER.py diff --git a/site_ansto/instrument/TEST_SICS/fakePfeiffer/tpg261/PfeifferDevice.py b/site_ansto/instrument/TEST_SICS/fakePfeiffer/tpg261/PfeifferDevice.py new file mode 100755 index 00000000..3f532bd2 --- /dev/null +++ b/site_ansto/instrument/TEST_SICS/fakePfeiffer/tpg261/PfeifferDevice.py @@ -0,0 +1,110 @@ +# vim: ft=python ts=8 sts=4 sw=4 expandtab autoindent smartindent nocindent +# +# Generic Pfeiffer Temperature Controller Device +# +# Author: Douglas Clowes (2014) +# +import inspect +import traceback + +class PfeifferDevice(object): + + def __init__(self): + print PfeifferDevice.__name__, "ctor" + #print "Methods:", inspect.getmembers(self, inspect.ismethod) + methods = inspect.getmembers(self, inspect.ismethod) + self.myMethods = {} + for method in methods: + self.myMethods[method[0]] = method[1:] + #for method in sorted(self.myMethods): + # print "Method:", method, self.myMethods[method], type(method), type(self.myMethods[method]) + + def reset_powerup(self): + print PfeifferDevice.__name__, "reset_powerup" + + def write(self, response): + print "Device Response: %s" % response + self.protocol.write(response) + + def doCommand(self, command, params): + print PfeifferDevice.__name__, "Command:", command, params + method = "doCommand%s" % command + if method in self.myMethods: + action = "response = self.%s(command, params)" % method + print "Action:", action + exec action + if response: + return response + else: + print "Unimplemented Command:", command, params + return False + + def doQuery(self, command, params): + print PfeifferDevice.__name__, "Query:", command, params + method = "doQuery%s" % command + if method in self.myMethods: + action = "response = self.%s(command, params)" % method + print "Action:", action + exec action + if response: + return response + else: + print "Unimplemented Query:", command, params + self.write("Unimplemented Query: %s" % command) + return False + + def doQueryRANDOM(self, command, params): + self.write("%f" % self.RANDOM) + + def doCommandRANDOM(self, command, params): + self.RANDOM = float(params[0]) + + def mergeParams(self, count, theOld, theNew): + oldParams = theOld.split(",") + if len(oldParams) != count: + raise IndexError + newParams = theNew.split(",") + if len(newParams) > count: + raise IndexError + mergedParams = [] + for idx in range(count): + if oldParams[idx] == '': + raise ValueError + if idx >= len(newParams) or newParams[idx] == '': + mergedParams.append(oldParams[idx]) + else: + mergedParams.append(newParams[idx]) + if len(mergedParams) != count: + raise IndexError + return ",".join(mergedParams) + + def dataReceived(self, data): + print PfeifferDevice.__name__, "PDU: \"" + data + "\"" + command = data.split() + if len(command) == 0: + return + command = command[0] + params = data[len(command)+1:].strip().split() + try: + if command == "SET": + self.doCommand(command, params) + else: + self.doQuery(command, params) + except: + traceback.print_exc() + +if __name__ == '__main__': + class TestProtocol: + def __init__(self): + print self.__class__.__name__, "ctor" + self.numProtocols = 0 + def write(self, data): + print "test write:", data + def loseConnection(self): + print "test lose connection" + test_protocol = TestProtocol() + test_device = PfeifferDevice() + test_device.protocol = test_protocol + test_device.dataReceived("PR1") + test_device.dataReceived("SET 3") + test_device.dataReceived("PR1") diff --git a/site_ansto/instrument/TEST_SICS/fakePfeiffer/tpg261/PfeifferFactory.py b/site_ansto/instrument/TEST_SICS/fakePfeiffer/tpg261/PfeifferFactory.py new file mode 100644 index 00000000..0385590e --- /dev/null +++ b/site_ansto/instrument/TEST_SICS/fakePfeiffer/tpg261/PfeifferFactory.py @@ -0,0 +1,47 @@ +# vim: ft=python ts=8 sts=4 sw=4 expandtab autoindent smartindent nocindent +# Fake Pfeiffer Temperature Controller Factory +# +# Author: Douglas Clowes 2014 +# +from twisted.internet.protocol import ServerFactory + +class PfeifferFactory(ServerFactory): + """Factory object used by the Twisted Infrastructure to create a Protocol + object for incomming connections""" + + protocol = None + + def __init__(self, theProtocol, theDevice, theTerminator = "\r\n"): + print PfeifferFactory.__name__, "ctor" + self.protocol = theProtocol + self.device = theDevice + self.term = theTerminator + self.numProtocols = 0 + + def buildProtocol(self, addr): + p = self.protocol(self.device, self.term) + p.factory = self + return p + +if __name__ == '__main__': + class TestProtocol: + def __init__(self, theDevice, theTerm = "\r\n"): + self.device = theDevice + self.response = "" + self.term = theTerm + + class TestDevice: + def __init__(self): + pass + + new_factory = PfeifferFactory(TestProtocol, TestDevice, "\r\n"); + new_protocol = new_factory.buildProtocol("address") + print "Factory: ", new_factory + print " .protocol", new_factory.protocol + print " .device ", new_factory.device + print " .num_prot", new_factory.numProtocols + print " .term ", new_factory.term + print "Protocol: ", new_protocol + print " .device ", new_protocol.device + print " .response", new_protocol.response + print " .term ", new_protocol.term diff --git a/site_ansto/instrument/TEST_SICS/fakePfeiffer/tpg261/PfeifferProtocol.py b/site_ansto/instrument/TEST_SICS/fakePfeiffer/tpg261/PfeifferProtocol.py new file mode 100755 index 00000000..0ca05def --- /dev/null +++ b/site_ansto/instrument/TEST_SICS/fakePfeiffer/tpg261/PfeifferProtocol.py @@ -0,0 +1,87 @@ +# vim: ft=python ts=8 sts=4 sw=4 expandtab autoindent smartindent nocindent +# Fake Pfeiffer Temperature Controller Protocol +# +# Author: Douglas Clowes 2014 +# +from twisted.internet.protocol import Protocol + +class PfeifferProtocol(Protocol): + """Protocol object used by the Twisted Infrastructure to handle connections""" + + def __init__(self, theDevice, theTerminator = "\r\n"): + print PfeifferProtocol.__name__, "ctor" + self.device = theDevice + self.response = "" + self.term = theTerminator + + def write(self, response): + self.response = self.response + response + + def connectionMade(self): + self.pdu = "" + self.response = "" + self.device.protocol = self + self.factory.numProtocols = self.factory.numProtocols + 1 + print "connectionMade:", self.factory.numProtocols + if self.factory.numProtocols > 2: + print "Too many connections - rejecting" + self.transport.write("Too many connections, try later" + self.term) + self.transport.loseConnection() + else: + self.transport.write(("Welcome connection %d" % self.factory.numProtocols) + self.term) + + def connectionLost(self, reason): + print "connectionLost:", self.factory.numProtocols, reason + self.factory.numProtocols = self.factory.numProtocols - 1 + + def lineReceived(self, data): + print "lineReceived - len:", len(data), data + self.device.protocol = self + self.device.dataReceived(data) + + def dataReceived(self, data): + print "dataReceived - len:", len(data), data + for c in data: + if c == "\r" or c == "\n": + if len(self.pdu) > 0: + self.lineReceived(self.pdu) + self.pdu = "" + self.transport.write("\6\r\n") + elif c == "\5": + if len(self.response) > 0: + print "Protocol Response: %s" % self.response + self.transport.write(self.response + self.term) + self.response = "" + else: + self.pdu = self.pdu + c + +if __name__ == '__main__': + class TestDevice: + def __init__(self): + print self.__class__.__name__, "ctor" + def dataReceived(self, pdu): + print "test device data received:", pdu + self.protocol.write("test device response\r\n") + class TestFactory: + def __init__(self): + self.numProtocols = 0 + def write(self, data): + print "test write:", data, + def loseConnection(self): + print "test lose connection" + self.protocol.connectionLost("Factory") + myTerm = "\r\n" + test_device = TestDevice() + test_factory = TestFactory() + test_protocol = PfeifferProtocol(test_device, myTerm) + test_factory.protocol = test_protocol + test_protocol.factory = test_factory + test_protocol.transport = test_factory + test_protocol.connectionMade() + test_protocol.connectionMade() + test_protocol.connectionMade() + test_protocol.connectionLost("Dunno") + test_protocol.dataReceived("P") + test_protocol.dataReceived("R1\r\n") + test_protocol.dataReceived("\5") + test_protocol.connectionLost("Dunno") diff --git a/site_ansto/instrument/TEST_SICS/fakePfeiffer/tpg261/Pfeiffer_tpg261.py b/site_ansto/instrument/TEST_SICS/fakePfeiffer/tpg261/Pfeiffer_tpg261.py new file mode 100644 index 00000000..67b02c45 --- /dev/null +++ b/site_ansto/instrument/TEST_SICS/fakePfeiffer/tpg261/Pfeiffer_tpg261.py @@ -0,0 +1,76 @@ +# vim: ft=python ts=8 sts=4 sw=4 expandtab autoindent smartindent nocindent +# Fake Pfeiffer Temperature Controller +# +# Author: Douglas Clowes 2014 +# +from PfeifferDevice import PfeifferDevice +import random +import re +import os +import sys +import time + +class Pfeiffer_tpg261(PfeifferDevice): + """Pfeiffer TPG261 temperature controller object - simulates the device""" + + def __init__(self): + PfeifferDevice.__init__(self) + print Pfeiffer_tpg261.__name__, "ctor" + self.RANDOM = 0.0 + self.IDN = "Simulated Pfeiffer TPG261" + self.reset_powerup() + self.pressure = 3.14156E-02 + + def doCommand(self, command, params): + print Pfeiffer_tpg261.__name__, "Command:", command, params + return PfeifferDevice.doCommand(self, command, params) + + def doQuery(self, command, params): + print Pfeiffer_tpg261.__name__, "Query:", command, params + return PfeifferDevice.doQuery(self, command, params) + + def reset_powerup(self): + print Pfeiffer_tpg261.__name__, "reset_powerup" + self.LAST_ITERATION = 0 + + def doIteration(self): + delta_time = time.time() - self.LAST_ITERATION + if delta_time < 1: + return + #print "DoIteration:", delta_time + self.LAST_ITERATION = time.time() + + def doCommandSET(self, cmd, args): + if len(args) > 0: + print "SET", args, float(args[0]) + self.pressure = float(args[0]) + print "TODO implement Command: \"SET\" in \"" + cmd + ":" + ":".join(args) + "\"" + + def doQueryPR1(self, cmd, args): + self.write("0,%11.4E" % self.pressure) + #print "TODO implement Query: \"PR1\" in \"" + cmd + ":" + ":".join(args) + "\"" + +if __name__ == '__main__': + from PfeifferProtocol import PfeifferProtocol + + class TestFactory: + def __init__(self): + print self.__class__.__name__, "ctor" + self.numProtocols = 0 + def write(self, data): + print "test write:", data, + def loseConnection(self): + print "test lose connection" + test_factory = TestFactory() + test_device = Pfeiffer_tpg261() + test_protocol = PfeifferProtocol(test_device, "\r\n") + test_protocol.factory = test_factory + test_protocol.transport = test_factory + test_device.protocol = test_protocol + test_device.protocol.connectionMade() + commands = ["PR1", "SET 3.1415926E-03", "PR1"] + for cmd in commands: + print "Command:", cmd + test_device.protocol.dataReceived(cmd) + test_device.protocol.dataReceived("\r\n\5") + test_device.protocol.connectionLost("Dunno") diff --git a/site_ansto/instrument/TEST_SICS/fakePfeiffer/tpg261/SIM_PFEIFFER.py b/site_ansto/instrument/TEST_SICS/fakePfeiffer/tpg261/SIM_PFEIFFER.py new file mode 100755 index 00000000..7e638110 --- /dev/null +++ b/site_ansto/instrument/TEST_SICS/fakePfeiffer/tpg261/SIM_PFEIFFER.py @@ -0,0 +1,126 @@ +#!/usr/bin/env python +# vim: ft=python ts=8 sts=4 sw=4 expandtab autoindent smartindent nocindent +# Author: Douglas Clowes (dcl@ansto.gov.au) 2014 + +from twisted.internet.task import LoopingCall +from twisted.internet import reactor +from twisted.python import log, usage + +from Pfeiffer_tpg261 import Pfeiffer_tpg261 as MYBASE +from PfeifferFactory import PfeifferFactory +from PfeifferProtocol import PfeifferProtocol +import os +import sys + +sys.path.insert(0, os.path.realpath(os.path.abspath(os.path.join(os.path.dirname(sys.argv[0]),"../../../util")))) +from displayscreen import Screen + +class MyOptions(usage.Options): + optFlags = [ + ["window", "w", "Create a display window"], + ] + optParameters = [ + ["logfile", "l", None, "output logfile name"], + ["port", "p", None, "port number to listen on"], + ] + def __init__(self): + usage.Options.__init__(self) + self['files'] = [] + + def parseArgs(self, *args): + for arg in args: + self['files'].append(arg) + +class MyScreen(Screen): + def __init__(self, stdscr): + Screen.__init__(self, stdscr) + + def sendLine(self, txt): + global myDev + myDev.protocol = self + myDev.dataReceived(txt) + + def write(self, txt): + try: + newLine = self.lines[-1] + " => " + txt + del self.lines[-1] + self.addLine(newLine) + except: + pass + +class MYDEV(MYBASE): + def __init__(self): + MYBASE.__init__(self) + print MYDEV.__name__, "ctor" + +def device_display(): + global screen, myDev, myOpts, myPort, myFactory + try: + myDev.doIteration(); + except: + raise + + if not myOpts["window"]: + return + + try: + rows, cols = screen.stdscr.getmaxyx() + screen.stdscr.addstr(0, 0, "Lnks:%2d" % myFactory.numProtocols) + screen.stdscr.addstr(0, 10, "Rnd:%6.3f" % myDev.RANDOM) + screen.stdscr.addstr(0, 22, "Identity : %s (%d)" % (myDev.IDN, myPort)) + screen.stdscr.addstr(1, 0, "Pressure :") + screen.stdscr.addstr(1, 12, "%11.4E" % myDev.pressure) + except: + pass + finally: + try: + screen.stdscr.refresh() + except: + pass + +if __name__ == "__main__": + global screen, myDev, myOpts, myPort, myFactory + + myOpts = MyOptions() + try: + myOpts.parseOptions() + except usage.UsageError, errortext: + print '%s: %s' % (sys.argv[0], errortext) + print '%s: Try --help for usage details.' % (sys.argv[0]) + raise SystemExit, 1 + + myDev = MYDEV() + default_port = 4261 + myPort = default_port + logfile = None + + if myOpts["port"]: + myPort = int(myOpts["port"]) + if myPort < 1025 or myPort > 65535: + myPort = default_port + + if myOpts["window"]: + logfile = "/tmp/Fake_Pfeiffer_%d.log" % (myPort) + + if myOpts["logfile"]: + logfile = myOpts["logfile"] + + if logfile: + log.startLogging(open(logfile, "w")) + else: + log.startLogging(sys.stdout) + #log.startLogging(sys.stderr) + + if myOpts["window"]: + import curses + + stdscr = curses.initscr() + screen = MyScreen(stdscr) + # add screen object as a reader to the reactor + reactor.addReader(screen) + + myFactory = PfeifferFactory(PfeifferProtocol, myDev, "\r") + lc = LoopingCall(device_display) + lc.start(0.250) + reactor.listenTCP(myPort, myFactory) # server + reactor.run()