From acf2112fc282114d5a2e7304f99a833292bca3b9 Mon Sep 17 00:00:00 2001 From: Douglas Clowes Date: Thu, 17 Apr 2014 13:13:26 +1000 Subject: [PATCH] Make a fake Oxford Mercury device for testing the script context driver This driver allows PID type control of the needle valve --- .../fakeTempControl/oxford/MercuryDevice.py | 110 ++++++++++++ .../fakeTempControl/oxford/MercuryFactory.py | 47 +++++ .../fakeTempControl/oxford/MercuryProtocol.py | 92 ++++++++++ .../fakeTempControl/oxford/MercurySCPI.py | 163 ++++++++++++++++++ .../fakeTempControl/oxford/SIM_MERCURY.py | 144 ++++++++++++++++ 5 files changed, 556 insertions(+) create mode 100755 site_ansto/instrument/TEST_SICS/fakeTempControl/oxford/MercuryDevice.py create mode 100644 site_ansto/instrument/TEST_SICS/fakeTempControl/oxford/MercuryFactory.py create mode 100755 site_ansto/instrument/TEST_SICS/fakeTempControl/oxford/MercuryProtocol.py create mode 100644 site_ansto/instrument/TEST_SICS/fakeTempControl/oxford/MercurySCPI.py create mode 100755 site_ansto/instrument/TEST_SICS/fakeTempControl/oxford/SIM_MERCURY.py diff --git a/site_ansto/instrument/TEST_SICS/fakeTempControl/oxford/MercuryDevice.py b/site_ansto/instrument/TEST_SICS/fakeTempControl/oxford/MercuryDevice.py new file mode 100755 index 00000000..1a2d9740 --- /dev/null +++ b/site_ansto/instrument/TEST_SICS/fakeTempControl/oxford/MercuryDevice.py @@ -0,0 +1,110 @@ +# vim: ft=python ts=8 sts=4 sw=4 expandtab autoindent smartindent nocindent +# +# Generic Mercury Temperature Controller Device +# +# Author: Douglas Clowes (2014) +# +import inspect +import traceback + +class MercuryDevice(object): + + def __init__(self): + print MercuryDevice.__name__, "ctor" + #print "Methods:", inspect.getmembers(self, inspect.ismethod) + methods = inspect.getmembers(self, inspect.ismethod) + self.myMethods = {} + for method in methods: + self.myMethods[method[0]] = method[1:] + #for method in sorted(self.myMethods): + # print "Method:", method, self.myMethods[method], type(method), type(self.myMethods[method]) + + def reset_powerup(self): + print MercuryDevice.__name__, "reset_powerup" + + def write(self, response): + print "Device Response: %s" % response + self.protocol.write(response) + + def doCommand(self, command, params): + print MercuryDevice.__name__, "Command:", command, params + method = "doCommand%s" % command + if method in self.myMethods: + action = "response = self.%s(command, params)" % method + print "Action:", action + exec action + if response: + return response + else: + print "Unimplemented Command:", command, params + return False + + def doQuery(self, command, params): + print MercuryDevice.__name__, "Query:", command, params + method = "doQuery%s" % command + if method in self.myMethods: + action = "response = self.%s(command, params)" % method + print "Action:", action + exec action + if response: + return response + else: + print "Unimplemented Query:", command, params + self.write("Unimplemented Query: %s" % command) + return False + + def doQueryRANDOM(self, command, params): + self.write("%f" % self.RANDOM) + + def doCommandRANDOM(self, command, params): + self.RANDOM = float(params[0]) + + def mergeParams(self, count, theOld, theNew): + oldParams = theOld.split(",") + if len(oldParams) != count: + raise IndexError + newParams = theNew.split(",") + if len(newParams) > count: + raise IndexError + mergedParams = [] + for idx in range(count): + if oldParams[idx] == '': + raise ValueError + if idx >= len(newParams) or newParams[idx] == '': + mergedParams.append(oldParams[idx]) + else: + mergedParams.append(newParams[idx]) + if len(mergedParams) != count: + raise IndexError + return ",".join(mergedParams) + + def dataReceived(self, data): + print MercuryDevice.__name__, "PDU: \"" + data + "\"" + command = data.split(":")[0] + params = data[len(command)+1:].strip().split(":") + if command[0] == "*": + command = command[1:] + try: + if command in ('READ'): + self.doQuery(command, params) + else: + self.doCommand(command, params) + except: + traceback.print_exc() + +if __name__ == '__main__': + class TestProtocol: + def __init__(self): + print self.__class__.__name__, "ctor" + self.numProtocols = 0 + def write(self, data): + print "test write:", data + def loseConnection(self): + print "test lose connection" + test_protocol = TestProtocol() + test_device = MercuryDevice() + test_device.protocol = test_protocol + test_device.dataReceived("READ:DEV:DB7.T1:TEMP:SIG:TEMP") + test_device.dataReceived("READ:DEV:DB7.T1:TEMP:NICK") + test_device.dataReceived("SET:DEV:DB8.T1:TEMP:LOOP:TSET:273") + test_device.dataReceived("READ:DEV:DB8.T1:TEMP:LOOP:TSET") diff --git a/site_ansto/instrument/TEST_SICS/fakeTempControl/oxford/MercuryFactory.py b/site_ansto/instrument/TEST_SICS/fakeTempControl/oxford/MercuryFactory.py new file mode 100644 index 00000000..3de3257d --- /dev/null +++ b/site_ansto/instrument/TEST_SICS/fakeTempControl/oxford/MercuryFactory.py @@ -0,0 +1,47 @@ +# vim: ft=python ts=8 sts=4 sw=4 expandtab autoindent smartindent nocindent +# Fake Mercury Temperature Controller Factory +# +# Author: Douglas Clowes 2014 +# +from twisted.internet.protocol import ServerFactory + +class MercuryFactory(ServerFactory): + """Factory object used by the Twisted Infrastructure to create a Protocol + object for incomming connections""" + + protocol = None + + def __init__(self, theProtocol, theDevice, theTerminator = "\r\n"): + print MercuryFactory.__name__, "ctor" + self.protocol = theProtocol + self.device = theDevice + self.term = theTerminator + self.numProtocols = 0 + + def buildProtocol(self, addr): + p = self.protocol(self.device, self.term) + p.factory = self + return p + +if __name__ == '__main__': + class TestProtocol: + def __init__(self, theDevice, theTerm = "\r\n"): + self.device = theDevice + self.response = "" + self.term = theTerm + + class TestDevice: + def __init__(self): + pass + + new_factory = MercuryFactory(TestProtocol, TestDevice, "\r\n"); + new_protocol = new_factory.buildProtocol("address") + print "Factory: ", new_factory + print " .protocol", new_factory.protocol + print " .device ", new_factory.device + print " .num_prot", new_factory.numProtocols + print " .term ", new_factory.term + print "Protocol: ", new_protocol + print " .device ", new_protocol.device + print " .response", new_protocol.response + print " .term ", new_protocol.term diff --git a/site_ansto/instrument/TEST_SICS/fakeTempControl/oxford/MercuryProtocol.py b/site_ansto/instrument/TEST_SICS/fakeTempControl/oxford/MercuryProtocol.py new file mode 100755 index 00000000..a4b26a6c --- /dev/null +++ b/site_ansto/instrument/TEST_SICS/fakeTempControl/oxford/MercuryProtocol.py @@ -0,0 +1,92 @@ +# vim: ft=python ts=8 sts=4 sw=4 expandtab autoindent smartindent nocindent +# Fake Mercury Temperature Controller Protocol +# +# Author: Douglas Clowes 2014 +# +from twisted.internet.protocol import Protocol + +class MercuryProtocol(Protocol): + """Protocol object used by the Twisted Infrastructure to handle connections""" + + def __init__(self, theDevice, theTerminator = "\r\n"): + print MercuryProtocol.__name__, "ctor" + self.device = theDevice + self.response = "" + self.term = theTerminator + + def write(self, response): + self.response = self.response + response + + def connectionMade(self): + self.pdu = "" + self.response = "" + self.device.protocol = self + self.factory.numProtocols = self.factory.numProtocols + 1 + print "connectionMade:", self.factory.numProtocols + if self.factory.numProtocols > 2: + print "Too many connections - rejecting" + self.transport.write("Too many connections, try later" + self.term) + self.transport.loseConnection() + else: + self.transport.write(("Welcome connection %d" % self.factory.numProtocols) + self.term) + + def connectionLost(self, reason): + print "connectionLost:", self.factory.numProtocols, reason + self.factory.numProtocols = self.factory.numProtocols - 1 + + def lineReceived(self, data): + print "lineReceived - len:", len(data), data + self.device.protocol = self + self.device.dataReceived(data) + + def dataReceived(self, data): + print "dataReceived - len:", len(data), data + for c in data: + if c == "\r" or c == ";" or c == "\n": + if len(self.pdu) > 0: + self.lineReceived(self.pdu) + self.pdu = "" + if c == ";": + if len(self.response) > 0 and self.response[-1] != ";": + self.response = self.response + ";" + else: + if len(self.response) > 0: + if self.response[-1] == ";": + self.response = self.response[:-1] + if len(self.response) > 0: + print "Protocol Response: %s" % self.response + self.transport.write(self.response + self.term) + self.response = "" + else: + self.pdu = self.pdu + c + +if __name__ == '__main__': + class TestDevice: + def __init__(self): + print self.__class__.__name__, "ctor" + def dataReceived(self, pdu): + print "test device data received:", pdu + self.protocol.write("test device response") + class TestFactory: + def __init__(self): + self.numProtocols = 0 + def write(self, data): + print "test write:", data, + def loseConnection(self): + print "test lose connection" + self.protocol.connectionLost("Factory") + myTerm = "\r\n" + test_device = TestDevice() + test_factory = TestFactory() + test_protocol = MercuryProtocol(test_device, myTerm) + test_factory.protocol = test_protocol + test_protocol.factory = test_factory + test_protocol.transport = test_factory + test_protocol.connectionMade() + test_protocol.connectionMade() + test_protocol.connectionMade() + test_protocol.connectionLost("Dunno") + test_protocol.dataReceived("READ:DEV:DB7.T1:TEMP:SIG:TEMP" + myTerm + "READ:DEV:DB7.T1:TEMP:NICK;READ") + test_protocol.dataReceived(":DEV:DB7.T1:TEMP") + test_protocol.dataReceived(":NICK" + myTerm) + test_protocol.connectionLost("Dunno") diff --git a/site_ansto/instrument/TEST_SICS/fakeTempControl/oxford/MercurySCPI.py b/site_ansto/instrument/TEST_SICS/fakeTempControl/oxford/MercurySCPI.py new file mode 100644 index 00000000..02b4002d --- /dev/null +++ b/site_ansto/instrument/TEST_SICS/fakeTempControl/oxford/MercurySCPI.py @@ -0,0 +1,163 @@ +# vim: ft=python ts=8 sts=4 sw=4 expandtab autoindent smartindent nocindent +# Fake Mercury Temperature Controller +# +# Author: Douglas Clowes 2014 +# +from MercuryDevice import MercuryDevice +import random +import re +import os +import sys +import time + +sys.path.insert(0, os.path.realpath(os.path.abspath(os.path.join(os.path.dirname(sys.argv[0]),"../../../util")))) +from fopdt import fopdt, fopdt_sink +from pid import PID + +class Loop(fopdt): + def __init__(self, the_temp, the_nick): + fopdt.__init__(self, the_temp) + self.setpoint = the_temp + self.sensor = the_temp + # P, I, D + self.pid = PID(0.05, 0.02, 1.2) + self.pid.setPoint(the_temp) + # Value, Kp, Tp, Td, Absolute + self.AddSource(fopdt_sink(0, 2, 13, 10, True)) + self.AddSink(fopdt_sink(the_temp, 1, 30, 1, False)) + self.power = 0 + self.nick = the_nick + self.count = 0 + + def Setpoint(self, the_sp): + self.setpoint = the_sp + self.pid.setPoint(the_sp) + + def doIteration(self): + self.pid_delta = self.pid.update(self.pv) + self.sources[0].value = self.pid_delta + if self.sources[0].value > 100.0: + self.sources[0].value = 100.0 + if self.sources[0].value < 0.0: + self.sources[0].value = 0.0 + self.count += 1 + self.iterate(self.count) + self.sensor = 0.9 * self.sensor + 0.1 * self.setpoint + +class MercurySCPI(MercuryDevice): + """Mercury SCPI temperature controller object - simulates the device""" + + def __init__(self): + MercuryDevice.__init__(self) + print MercurySCPI.__name__, "ctor" + self.RANDOM = 0.0 + self.IDN = "Simulated Mercury SCPI" + self.CONFIG_LOOPS = [1, 2, 3, 4] + self.CONFIG_SNSRS = [1, 2, 3, 4] + self.Loops = {} + self.Loops[1] = self.Loops['MB0'] = self.Loops['MB1'] = Loop(270, "VTI_STD") + self.Loops[2] = self.Loops['DB1'] = self.Loops['DB6'] = Loop(270, "Sample_1") + self.Loops[3] = self.Loops['DB2'] = self.Loops['DB7'] = Loop(270, "Sample_2") + self.Loops[4] = self.Loops['DB3'] = self.Loops['DB8'] = Loop(270, "VTI") + self.valve_open = 0.0 + self.hlev = 92.0 + self.nlev = 87.6 + self.reset_powerup() + + def doCommand(self, command, params): + print MercurySCPI.__name__, "Command:", command, params + return MercuryDevice.doCommand(self, command, params) + + def doQuery(self, command, params): + print MercurySCPI.__name__, "Query:", command, params + return MercuryDevice.doQuery(self, command, params) + + def reset_powerup(self): + print MercurySCPI.__name__, "reset_powerup" + self.LAST_ITERATION = 0 + + def doIteration(self): + delta_time = time.time() - self.LAST_ITERATION + if delta_time < 1: + return + #print "DoIteration:", delta_time + self.LAST_ITERATION = time.time() + for idx in self.CONFIG_LOOPS: + self.Loops[idx].doIteration() + + def doCommandSET(self, cmd, args): + if args[0] != "DEV": + return + key = args[1].split(".")[0] + if key == "DB4": + # Valve + self.valve_open = float(args[5]) + self.write("STAT:SET:" + ":".join(args) + ":VALID") + return + if key in self.Loops: + if args[4] == "TSET": + self.Loops[key].Setpoint(float(args[5])) + self.write("STAT:SET:" + ":".join(args) + ":VALID") + return + self.write("STAT:SET:" + ":".join(args) + ":INVALID") + + def doQueryREAD(self, cmd, args): + if args[0] != "DEV": + return + key = args[1].split(".")[0] + if key == "DB4": + # Valve + self.write("STAT:DEV:DB4.G1:AUX:SIG:OPEN:%7.4f%%" % self.valve_open) + return + if key == "DB5": + # Level + if args[4] == "HEL": + self.write("STAT:DEV:DB5.L1:LVL:SIG:HEL:LEV:%7.4f%%" % self.hlev) + return + if args[4] == "NIT": + self.write("STAT:DEV:DB5.L1:LVL:SIG:NIT:LEV:%7.4f%%" % self.nlev) + return + return + if key in self.Loops: + if args[3] == "NICK": + self.write("STAT:DEV:"+args[1]+":TEMP:NICK:%s" % self.Loops[key].nick) + return + if args[4] == "TSET": + self.write("STAT:DEV:"+args[1]+":TEMP:LOOP:TSET:%g" % self.Loops[key].setpoint) + return + if args[4] == "TEMP": + self.write("STAT:DEV:"+args[1]+":TEMP:SIG:TEMP:%7.4fK" % self.Loops[key].sensor) + return + if args[4] == "POWR": + self.write("STAT:DEV:"+args[1]+":HTR:SIG:POWR:%.4fW" % self.Loops[key].power) + return + self.write("STAT:" + ":".join(args) + ":INVALID") + print "TODO implement Query: \"READ\" in \"" + cmd + ":" + ":".join(args) + "\"" + +if __name__ == '__main__': + from MercuryProtocol import MercuryProtocol + + class TestFactory: + def __init__(self): + print self.__class__.__name__, "ctor" + self.numProtocols = 0 + def write(self, data): + print "test write:", data, + def loseConnection(self): + print "test lose connection" + test_factory = TestFactory() + test_device = MercurySCPI() + test_protocol = MercuryProtocol(test_device, "\r\n") + test_protocol.factory = test_factory + test_protocol.transport = test_factory + test_device.protocol = test_protocol + test_device.protocol.connectionMade() + commands = ["READ:DEV:MB1.T1:TEMP:SIG:TEMP", + "READ:DEV:MB1.T1:TEMP:NICK", + "SET:DEV:MB1.T1:TEMP:LOOP:TSET:274", + "READ:DEV:MB1.T1:TEMP:LOOP:TSET", + "READ:DEV:MB0.H1:HTR:SIG:POWR"] + for cmd in commands: + test_device.protocol.dataReceived(cmd) + test_device.protocol.dataReceived(test_protocol.term) + test_device.protocol.connectionLost("Dunno") diff --git a/site_ansto/instrument/TEST_SICS/fakeTempControl/oxford/SIM_MERCURY.py b/site_ansto/instrument/TEST_SICS/fakeTempControl/oxford/SIM_MERCURY.py new file mode 100755 index 00000000..9b86c5d3 --- /dev/null +++ b/site_ansto/instrument/TEST_SICS/fakeTempControl/oxford/SIM_MERCURY.py @@ -0,0 +1,144 @@ +#!/usr/bin/env python +# vim: ft=python ts=8 sts=4 sw=4 expandtab autoindent smartindent nocindent +# Author: Douglas Clowes (dcl@ansto.gov.au) 2013-06-03 + +from twisted.internet.task import LoopingCall +from twisted.internet import reactor +from twisted.python import log, usage + +from MercurySCPI import MercurySCPI as MYBASE +from MercuryFactory import MercuryFactory +from MercuryProtocol import MercuryProtocol +import os +import sys + +sys.path.insert(0, os.path.realpath(os.path.abspath(os.path.join(os.path.dirname(sys.argv[0]),"../../../util")))) +from displayscreen import Screen + +class MyOptions(usage.Options): + optFlags = [ + ["window", "w", "Create a display window"], + ] + optParameters = [ + ["logfile", "l", None, "output logfile name"], + ["port", "p", None, "port number to listen on"], + ] + def __init__(self): + usage.Options.__init__(self) + self['files'] = [] + + def parseArgs(self, *args): + for arg in args: + self['files'].append(arg) + +class MyScreen(Screen): + def __init__(self, stdscr): + Screen.__init__(self, stdscr) + + def sendLine(self, txt): + global myDev + myDev.protocol = self + myDev.dataReceived(txt) + + def write(self, txt): + try: + newLine = self.lines[-1] + " => " + txt + del self.lines[-1] + self.addLine(newLine) + except: + pass + +class MYDEV(MYBASE): + def __init__(self): + MYBASE.__init__(self) + print MYDEV.__name__, "ctor" + +def device_display(): + global screen, myDev, myOpts, myPort, myFactory + try: + myDev.doIteration(); + except: + raise + + if not myOpts["window"]: + return + + try: + rows, cols = screen.stdscr.getmaxyx() + screen.stdscr.addstr(0, 0, "Lnks:%2d" % myFactory.numProtocols) + screen.stdscr.addstr(0, 10, "Rnd:%6.3f" % myDev.RANDOM) + screen.stdscr.addstr(0, 22, "Identity : %s (%d)" % (myDev.IDN, myPort)) + screen.stdscr.addstr(1, 0, "Valve: %8.4f%%" % myDev.valve_open) + screen.stdscr.addstr(1, 20, "Helium: %8.4f%%" % myDev.hlev) + screen.stdscr.addstr(1, 40, "Nitrogen: %8.4f%%" % myDev.nlev) + base = 1 + screen.stdscr.addstr(base + 1, 0, "Sensor :") + screen.stdscr.addstr(base + 2, 0, "PV :") + screen.stdscr.addstr(base + 3, 0, "Setpoint :") + screen.stdscr.addstr(base + 4, 0, "T Delta :") + screen.stdscr.addstr(base + 5, 0, "PV Delta :") + for idx in myDev.CONFIG_SNSRS: + if 12 + (idx - 1) * 12 > cols - 1: + break + screen.stdscr.addstr(base + 1, 12 + (idx - 1) * 12, "%8.3f" % myDev.Loops[idx].sensor) + for idx in myDev.CONFIG_LOOPS: + if 12 + (idx - 1) * 12 > cols - 1: + break + screen.stdscr.addstr(base + 2, 12 + (idx - 1) * 12, "%8.3f" % myDev.Loops[idx].pv) + screen.stdscr.addstr(base + 3, 12 + (idx - 1) * 12, "%8.3f" % myDev.Loops[idx].setpoint) + screen.stdscr.addstr(base + 4, 12 + (idx - 1) * 12, "%8.3f" % (myDev.Loops[idx].setpoint - myDev.Loops[idx].sensor)) + screen.stdscr.addstr(base + 5, 12 + (idx - 1) * 12, "%8.3f" % (myDev.Loops[idx].setpoint - myDev.Loops[idx].pid_delta)) + except: + pass + finally: + try: + screen.stdscr.refresh() + except: + pass + +if __name__ == "__main__": + global screen, myDev, myOpts, myPort, myFactory + + myOpts = MyOptions() + try: + myOpts.parseOptions() + except usage.UsageError, errortext: + print '%s: %s' % (sys.argv[0], errortext) + print '%s: Try --help for usage details.' % (sys.argv[0]) + raise SystemExit, 1 + + myDev = MYDEV() + default_port = 7020 + myPort = default_port + logfile = None + + if myOpts["port"]: + myPort = int(myOpts["port"]) + if myPort < 1025 or myPort > 65535: + myPort = default_port + + if myOpts["window"]: + logfile = "/tmp/Fake_Mercury_%d.log" % (myPort) + + if myOpts["logfile"]: + logfile = myOpts["logfile"] + + if logfile: + log.startLogging(open(logfile, "w")) + else: + log.startLogging(sys.stdout) + #log.startLogging(sys.stderr) + + if myOpts["window"]: + import curses + + stdscr = curses.initscr() + screen = MyScreen(stdscr) + # add screen object as a reader to the reactor + reactor.addReader(screen) + + myFactory = MercuryFactory(MercuryProtocol, myDev, "\r") + lc = LoopingCall(device_display) + lc.start(0.250) + reactor.listenTCP(myPort, myFactory) # server + reactor.run()