From 2b1e34202d7a8c0d6f891e09bd2791fcbdd38d32 Mon Sep 17 00:00:00 2001 From: Douglas Clowes Date: Fri, 21 Jun 2013 14:27:37 +1000 Subject: [PATCH] implement fake lakeshore 336, 340 and 370 in Python with curses display --- .../fakeTempControl/lakeshore/LS336.py | 124 ++++ .../fakeTempControl/lakeshore/LS340.py | 124 ++++ .../fakeTempControl/lakeshore/LS370.py | 124 ++++ .../fakeTempControl/lakeshore/Lakeshore336.py | 527 +++++++++++++++++ .../fakeTempControl/lakeshore/Lakeshore340.py | 541 ++++++++++++++++++ .../fakeTempControl/lakeshore/Lakeshore370.py | 472 +++++++++++++++ .../lakeshore/LakeshoreDevice.py | 92 +++ .../lakeshore/LakeshoreFactory.py | 43 ++ .../lakeshore/LakeshoreProtocol.py | 88 +++ .../lakeshore/displayscreen.py | 150 +++++ 10 files changed, 2285 insertions(+) create mode 100755 site_ansto/instrument/TEST_SICS/fakeTempControl/lakeshore/LS336.py create mode 100755 site_ansto/instrument/TEST_SICS/fakeTempControl/lakeshore/LS340.py create mode 100755 site_ansto/instrument/TEST_SICS/fakeTempControl/lakeshore/LS370.py create mode 100755 site_ansto/instrument/TEST_SICS/fakeTempControl/lakeshore/Lakeshore336.py create mode 100755 site_ansto/instrument/TEST_SICS/fakeTempControl/lakeshore/Lakeshore340.py create mode 100755 site_ansto/instrument/TEST_SICS/fakeTempControl/lakeshore/Lakeshore370.py create mode 100755 site_ansto/instrument/TEST_SICS/fakeTempControl/lakeshore/LakeshoreDevice.py create mode 100644 site_ansto/instrument/TEST_SICS/fakeTempControl/lakeshore/LakeshoreFactory.py create mode 100755 site_ansto/instrument/TEST_SICS/fakeTempControl/lakeshore/LakeshoreProtocol.py create mode 100644 site_ansto/instrument/TEST_SICS/fakeTempControl/lakeshore/displayscreen.py diff --git a/site_ansto/instrument/TEST_SICS/fakeTempControl/lakeshore/LS336.py b/site_ansto/instrument/TEST_SICS/fakeTempControl/lakeshore/LS336.py new file mode 100755 index 00000000..86023cb4 --- /dev/null +++ b/site_ansto/instrument/TEST_SICS/fakeTempControl/lakeshore/LS336.py @@ -0,0 +1,124 @@ +#!/usr/bin/env python +# vim: ts=8 sts=4 sw=4 expandtab +# Author: Douglas Clowes (dcl@ansto.gov.au) 2013-06-03 + +from twisted.internet.task import LoopingCall +from twisted.internet import reactor +from twisted.python import log, usage + +from Lakeshore336 import Lakeshore336 as MYBASE +from LakeshoreFactory import LakeshoreFactory +from LakeshoreProtocol import LakeshoreProtocol +import sys + +from displayscreen import Screen + +class MyOptions(usage.Options): + optFlags = [ + ["window", "w", "Create a display window"], + ] + optParameters = [ + ["logfile", "l", None, "output logfile name"], + ["port", "p", None, "port number to listen on"], + ] + def __init__(self): + usage.Options.__init__(self) + self['files'] = [] + + def parseArgs(self, *args): + for arg in args: + self['files'].append(arg) + +class MyScreen(Screen): + def __init__(self, stdscr): + Screen.__init__(self, stdscr) + + def sendLine(self, txt): + global myDev + myDev.protocol = self + myDev.dataReceived(txt) + + def write(self, txt): + try: + newLine = self.lines[-1] + " => " + txt + del self.lines[-1] + self.addLine(newLine) + except: + pass + +class MYDEV(MYBASE): + def __init__(self): + MYBASE.__init__(self) + print MYDEV.__name__, "ctor" + +def device_display(): + global stdscr, myDev, myOpts, myPort + try: + myDev.doIteration(); + except: + raise + + if not myOpts["window"]: + return + + try: + stdscr.addstr(0, 25, "Identity : %s (%d)" % (myDev.IDN, myPort)) + stdscr.addstr(1, 25, "Temperature: %8.3f" % myDev.KRDG[1]) + stdscr.addstr(2, 25, "Setpoint : %8.3f" % myDev.SETP[1]) + stdscr.addstr(3, 25, "Difference : %8.3f" % (myDev.KRDG[1] - myDev.SETP[1])) + stdscr.addstr(5, 25, "Target : %8.3f" % myDev.TARGET[1]) + stdscr.addstr(6, 25, "Ramp Rate : %8.3f" % myDev.RAMP_RATE[1]) + stdscr.addstr(7, 25, "Ramp On : %8s" % ["No", "Yes"][myDev.RAMP_ON[1]]) + stdscr.addstr(8, 25, "Ramping : %8s" % ["No", "Yes"][myDev.RAMP_ST[1]]) + stdscr.addstr(0, 0, "Random : %8.3f" % myDev.RANDOM) + stdscr.refresh() + except: + raise + +if __name__ == "__main__": + global stdscr, myDev, myOpts, myPort + + myOpts = MyOptions() + try: + myOpts.parseOptions() + except usage.UsageError, errortext: + print '%s: %s' % (sys.argv[0], errortext) + print '%s: Try --help for usage details.' % (sys.argv[0]) + raise SystemExit, 1 + + myDev = MYDEV() + digits = myDev.IDN[10:13] + default_port = 7000 + int(digits) + myPort = default_port + logfile = None + + if myOpts["port"]: + myPort = int(myOpts["port"]) + if myPort < 1025 or myPort > 65535: + myPort = default_port + + if myOpts["window"]: + logfile = "/tmp/Fake_LS%s_%d.log" % (digits, myPort) + + if myOpts["logfile"]: + logfile = myOpts["logfile"] + + if logfile: + log.startLogging(open(logfile, "w")) + else: + log.startLogging(sys.stdout) + #log.startLogging(sys.stderr) + + if myOpts["window"]: + import curses + + stdscr = curses.initscr() + screen = MyScreen(stdscr) + # add screen object as a reader to the reactor + reactor.addReader(screen) + + myFactory = LakeshoreFactory(LakeshoreProtocol, myDev, "\r") + lc = LoopingCall(device_display) + lc.start(0.250) + reactor.listenTCP(myPort, myFactory) # server + reactor.run() diff --git a/site_ansto/instrument/TEST_SICS/fakeTempControl/lakeshore/LS340.py b/site_ansto/instrument/TEST_SICS/fakeTempControl/lakeshore/LS340.py new file mode 100755 index 00000000..9c052f52 --- /dev/null +++ b/site_ansto/instrument/TEST_SICS/fakeTempControl/lakeshore/LS340.py @@ -0,0 +1,124 @@ +#!/usr/bin/env python +# vim: ts=8 sts=4 sw=4 expandtab +# Author: Douglas Clowes (dcl@ansto.gov.au) 2013-06-03 + +from twisted.internet.task import LoopingCall +from twisted.internet import reactor +from twisted.python import log, usage + +from Lakeshore340 import Lakeshore340 as MYBASE +from LakeshoreFactory import LakeshoreFactory +from LakeshoreProtocol import LakeshoreProtocol +import sys + +from displayscreen import Screen + +class MyOptions(usage.Options): + optFlags = [ + ["window", "w", "Create a display window"], + ] + optParameters = [ + ["logfile", "l", None, "output logfile name"], + ["port", "p", None, "port number to listen on"], + ] + def __init__(self): + usage.Options.__init__(self) + self['files'] = [] + + def parseArgs(self, *args): + for arg in args: + self['files'].append(arg) + +class MyScreen(Screen): + def __init__(self, stdscr): + Screen.__init__(self, stdscr) + + def sendLine(self, txt): + global myDev + myDev.protocol = self + myDev.dataReceived(txt) + + def write(self, txt): + try: + newLine = self.lines[-1] + " => " + txt + del self.lines[-1] + self.addLine(newLine) + except: + pass + +class MYDEV(MYBASE): + def __init__(self): + MYBASE.__init__(self) + print MYDEV.__name__, "ctor" + +def device_display(): + global stdscr, myDev, myOpts, myPort + try: + myDev.doIteration(); + except: + raise + + if not myOpts["window"]: + return + + try: + stdscr.addstr(0, 25, "Identity : %s (%d)" % (myDev.IDN, myPort)) + stdscr.addstr(1, 25, "Temperature: %8.3f" % myDev.KRDG[1]) + stdscr.addstr(2, 25, "Setpoint : %8.3f" % myDev.SETP[1]) + stdscr.addstr(3, 25, "Difference : %8.3f" % (myDev.KRDG[1] - myDev.SETP[1])) + stdscr.addstr(5, 25, "Target : %8.3f" % myDev.TARGET[1]) + stdscr.addstr(6, 25, "Ramp Rate : %8.3f" % myDev.RAMP_RATE[1]) + stdscr.addstr(7, 25, "Ramp On : %8s" % ["No", "Yes"][myDev.RAMP_ON[1]]) + stdscr.addstr(8, 25, "Ramping : %8s" % ["No", "Yes"][myDev.RAMP_ST[1]]) + stdscr.addstr(0, 0, "Random : %8.3f" % myDev.RANDOM) + stdscr.refresh() + except: + raise + +if __name__ == "__main__": + global stdscr, myDev, myOpts, myPort + + myOpts = MyOptions() + try: + myOpts.parseOptions() + except usage.UsageError, errortext: + print '%s: %s' % (sys.argv[0], errortext) + print '%s: Try --help for usage details.' % (sys.argv[0]) + raise SystemExit, 1 + + myDev = MYDEV() + digits = myDev.IDN[10:13] + default_port = 7000 + int(digits) + myPort = default_port + logfile = None + + if myOpts["port"]: + myPort = int(myOpts["port"]) + if myPort < 1025 or myPort > 65535: + myPort = default_port + + if myOpts["window"]: + logfile = "/tmp/Fake_LS%s_%d.log" % (digits, myPort) + + if myOpts["logfile"]: + logfile = myOpts["logfile"] + + if logfile: + log.startLogging(open(logfile, "w")) + else: + log.startLogging(sys.stdout) + #log.startLogging(sys.stderr) + + if myOpts["window"]: + import curses + + stdscr = curses.initscr() + screen = MyScreen(stdscr) + # add screen object as a reader to the reactor + reactor.addReader(screen) + + myFactory = LakeshoreFactory(LakeshoreProtocol, myDev, "\r") + lc = LoopingCall(device_display) + lc.start(0.250) + reactor.listenTCP(myPort, myFactory) # server + reactor.run() diff --git a/site_ansto/instrument/TEST_SICS/fakeTempControl/lakeshore/LS370.py b/site_ansto/instrument/TEST_SICS/fakeTempControl/lakeshore/LS370.py new file mode 100755 index 00000000..f74cc779 --- /dev/null +++ b/site_ansto/instrument/TEST_SICS/fakeTempControl/lakeshore/LS370.py @@ -0,0 +1,124 @@ +#!/usr/bin/env python +# vim: ts=8 sts=4 sw=4 expandtab +# Author: Douglas Clowes (dcl@ansto.gov.au) 2013-06-03 + +from twisted.internet.task import LoopingCall +from twisted.internet import reactor +from twisted.python import log, usage + +from Lakeshore370 import Lakeshore370 as MYBASE +from LakeshoreFactory import LakeshoreFactory +from LakeshoreProtocol import LakeshoreProtocol +import sys + +from displayscreen import Screen + +class MyOptions(usage.Options): + optFlags = [ + ["window", "w", "Create a display window"], + ] + optParameters = [ + ["logfile", "l", None, "output logfile name"], + ["port", "p", None, "port number to listen on"], + ] + def __init__(self): + usage.Options.__init__(self) + self['files'] = [] + + def parseArgs(self, *args): + for arg in args: + self['files'].append(arg) + +class MyScreen(Screen): + def __init__(self, stdscr): + Screen.__init__(self, stdscr) + + def sendLine(self, txt): + global myDev + myDev.protocol = self + myDev.dataReceived(txt) + + def write(self, txt): + try: + newLine = self.lines[-1] + " => " + txt + del self.lines[-1] + self.addLine(newLine) + except: + pass + +class MYDEV(MYBASE): + def __init__(self): + MYBASE.__init__(self) + print MYDEV.__name__, "ctor" + +def device_display(): + global stdscr, myDev, myOpts, myPort + try: + myDev.doIteration(); + except: + raise + + if not myOpts["window"]: + return + + try: + stdscr.addstr(0, 25, "Identity : %s (%d)" % (myDev.IDN, myPort)) + stdscr.addstr(1, 25, "Temperature: %8.3f" % myDev.KRDG[1]) + stdscr.addstr(2, 25, "Setpoint : %8.3f" % myDev.SETP[1]) + stdscr.addstr(3, 25, "Difference : %8.3f" % (myDev.KRDG[1] - myDev.SETP[1])) + stdscr.addstr(5, 25, "Target : %8.3f" % myDev.TARGET[1]) + stdscr.addstr(6, 25, "Ramp Rate : %8.3f" % myDev.RAMP_RATE[1]) + stdscr.addstr(7, 25, "Ramp On : %8s" % ["No", "Yes"][myDev.RAMP_ON[1]]) + stdscr.addstr(8, 25, "Ramping : %8s" % ["No", "Yes"][myDev.RAMP_ST[1]]) + stdscr.addstr(0, 0, "Random : %8.3f" % myDev.RANDOM) + stdscr.refresh() + except: + raise + +if __name__ == "__main__": + global stdscr, myDev, myOpts, myPort + + myOpts = MyOptions() + try: + myOpts.parseOptions() + except usage.UsageError, errortext: + print '%s: %s' % (sys.argv[0], errortext) + print '%s: Try --help for usage details.' % (sys.argv[0]) + raise SystemExit, 1 + + myDev = MYDEV() + digits = myDev.IDN[10:13] + default_port = 7000 + int(digits) + myPort = default_port + logfile = None + + if myOpts["port"]: + myPort = int(myOpts["port"]) + if myPort < 1025 or myPort > 65535: + myPort = default_port + + if myOpts["window"]: + logfile = "/tmp/Fake_LS%s_%d.log" % (digits, myPort) + + if myOpts["logfile"]: + logfile = myOpts["logfile"] + + if logfile: + log.startLogging(open(logfile, "w")) + else: + log.startLogging(sys.stdout) + #log.startLogging(sys.stderr) + + if myOpts["window"]: + import curses + + stdscr = curses.initscr() + screen = MyScreen(stdscr) + # add screen object as a reader to the reactor + reactor.addReader(screen) + + myFactory = LakeshoreFactory(LakeshoreProtocol, myDev, "\r") + lc = LoopingCall(device_display) + lc.start(0.250) + reactor.listenTCP(myPort, myFactory) # server + reactor.run() diff --git a/site_ansto/instrument/TEST_SICS/fakeTempControl/lakeshore/Lakeshore336.py b/site_ansto/instrument/TEST_SICS/fakeTempControl/lakeshore/Lakeshore336.py new file mode 100755 index 00000000..7ebdba3d --- /dev/null +++ b/site_ansto/instrument/TEST_SICS/fakeTempControl/lakeshore/Lakeshore336.py @@ -0,0 +1,527 @@ +# vim: ts=8 sts=4 sw=4 expandtab +# Fake Lakeshore Model 336 Temperature Controller +# +# Author: Douglas Clowes 2012, 2013 +# +from LakeshoreDevice import LakeshoreDevice +import random +import re +import sys +import time + +class Lakeshore336(LakeshoreDevice): + """Lakeshore 336 temperature controller object - simulates the LS336""" + + def __init__(self): + LakeshoreDevice.__init__(self) + print Lakeshore336.__name__, "ctor" + self.CONFIG_LOOPS = [1, 2, 3, 4] + self.CONFIG_SNSRS = [1, 2, 3, 4] + self.reset_powerup() + + def doCommand(self, command, params): + print Lakeshore336.__name__, "Command:", command, params + return LakeshoreDevice.doCommand(self, command, params) + + def doQuery(self, command, params): + print Lakeshore336.__name__, "Query:", command, params + return LakeshoreDevice.doQuery(self, command, params) + + def reset_powerup(self): + print Lakeshore336.__name__, "reset_powerup" + self.LAST_ITERATION = 0 + self.ALARM = {} + self.ALARMST = {} + for idx in self.CONFIG_SNSRS: + self.ALARM[idx] = "0" + self.ALARMST[idx] = "0" + self.ANALOG = {1: "0,1,1,1,400.0,0.0,0.0", 2: "0,1,1,1,400.0,0.0,0.0"} + self.AOUT = { 1: 0.0, 2: 0.0 } + self.CFILT = {1: 1, 2: 1} + self.CLIMI = 0.0 + self.CLIMIT = {1: "400.0,10,0,0", 2: "400.0,10,0,0"} + self.CMODE = {} + for idx in self.CONFIG_LOOPS: + self.CMODE[idx] = 1 + self.CRVHDR = {} + self.CRVHDR[1] = "DT-336-1 ,STANDARD ,1,+500.000,1" + self.CRVHDR[2] = "DT-336-2 ,STANDARD ,2,+0.500,1" + self.CRVHDR[3] = "DT-336-3 ,STANDARD ,3,+2.000,2" + self.CRVHDR[4] = "DT-336-4 ,STANDARD ,4,+0.301,2" + self.CRVPT = {} + for i in range(1,21): + self.CRVPT[i] = [(0.0, 0.0)] + self.CSET = {1: "A,1,1,0", 2: "B,1,1,0"} + self.DOUT = 0 + self.FILTER = {1: "1,10,2"} + self.FREQ = 2 + self.GUARD = 0 + self.HTR = 0.0 + self.HTRRNG = 0 + self.HTRST = 0 + self.IDN = "LSCI,MODEL336,123456/123456,1.0" + self.IEEE = "0,0,4" + self.INCRV = {} + self.INTYPE = {} + for idx in self.CONFIG_SNSRS: + self.INCRV[idx] = "1" + self.INTYPE[idx] = "1,0,1,0,1" + self.KEYST = 1 + self.KRDG = {} + for idx in self.CONFIG_SNSRS: + self.KRDG[idx] = 300.0 + self.LOCK = "0,000" + self.MDAT = {1: "0,0,1"} # Min,Max,Reset + self.MOUT = {1: "0.0", 2: "0.0", 3: "0.0", 4: "0.0"} + self.OUTMODE = {1: "1,1,0", 2: "1,2,0", 3: "1,3,0", 4: "1,4,0"} + self.PID = {1: "+0150.0,+0005.0,+000.0", 2: "+0150.0,+0005.0,+000.0"} + self.RAMP_ON = {1: 0, 2: 0} + self.RAMP_RATE = {1: 0.000, 2: 0.000} + self.RAMP_ST = {1: 0, 2:0} + self.RAMP_TIME = 0.0 + self.RANGE = {} + for idx in range(1,5): + self.RANGE[idx] = "1" + self.RDGST = {"A": 0, "B": 0, "C": 0, "D": 0} + self.RELAY = {1: "1,A,0", 2: "2,A,0"} + self.RELAYST = {1: "0", 2: "0"} + self.SETP = {} + self.TARGET = {} + self.RAMP_START_TEMP = {} + self.RAMP_START_TIME = {} + for idx in self.CONFIG_LOOPS: + self.SETP[idx] = 300.0 + self.TARGET[idx] = 300.0 + self.RAMP_START_TEMP[idx] = 300.0 + self.RAMP_START_TIME[idx] = 0.0 + self.STB = 0 + self.ESE = 0 + self.ESR = 0 + self.OPC = 0 + self.SRE = 0 + self.TST = 0 + self.RANDOM = 0.5 + + def doIteration(self): + delta_time = time.time() - self.LAST_ITERATION + if delta_time < 1: + return + #print "DoIteration:", delta_time + self.LAST_ITERATION = time.time() + for idx in self.CONFIG_LOOPS: + # TODO - progress ramping setpoints (SP) + if idx in self.RAMP_ON and self.RAMP_ON[idx] and self.TARGET[idx] != self.SETP[idx]: + delta_time = time.time() - self.RAMP_START_TIME[idx]; + delta_temp = self.RAMP_RATE[idx] * (delta_time / 60.0) + if self.TARGET[idx] > self.RAMP_START_TEMP[idx]: + self.SETP[idx] = self.RAMP_START_TEMP[idx] + delta_temp + if self.SETP[idx] >= self.TARGET[idx]: + self.SETP[idx] = self.TARGET[idx] + self.RAMP_ST[idx] = 0 + else: + self.SETP[idx] = self.RAMP_START_TEMP[idx] - delta_temp + if self.SETP[idx] <= self.TARGET[idx]: + self.SETP[idx] = self.TARGET[idx] + self.RAMP_ST[idx] = 0 + + # TODO - iterate Power Level + if self.KRDG[idx] <> self.SETP[idx]: + self.HTR = self.SETP[idx] - self.KRDG[idx] + if self.HTR > 100.0: + self.HTR = 100.0 + elif self.HTR < -100.0: + self.HTR = -100.0 + + # TODO - iterate Process Values (PV) + self.KRDG[idx] = (0.9 * self.KRDG[idx] + 0.1 * self.SETP[idx]) + + def doCommandCLS(self, cmd, args): + print "Unimplemented Command: \"*CLS\" in \"" + cmd + " " + ",".join(args) + "\"" + def doCommandESE(self, cmd, args): + self.ESE = int(args[0]) & 0xFF + def doQueryESE(self, cmd, args): + self.write("%d" % self.ESE) + def doQueryESR(self, cmd, args): + self.write("%d" % self.ESR) + def doQueryIDN(self, cmd, args): + self.write(self.IDN) + def doCommandOPC(self, cmd, args): + self.OPC = 1 + def doQueryOPC(self, cmd, args): + self.write("1") + def doCommandRST(self, cmd, args): + print "Unimplemented Command: \"*RST\" in \"" + cmd + " " + ",".join(args) + "\"" + def doCommandSRE(self, cmd, args): + self.SRE = int(args[0]) & 0xFF + def doQuerySRE(self, cmd, args): + self.write("%d" % self.SRE) + def doQuerySTB(self, cmd, args): + self.write("%d" % self.STB) + def doQueryTST(self, cmd, args): + self.write("%d" % self.TST) + def doCommandWAI(self, cmd, args): + print "Unimplemented Command: \"*WAI\" in \"" + cmd + " " + ",".join(args) + "\"" + def doCommandALARM(self, cmd, args): + if len(args) > 0: + idx = ord(args[0]) - 64 + params = args[1:].join(",") + if idx in self.ALARM: + self.ALARM[idx] = params + def doQueryALARM(self, cmd, args): + if len(args) > 0: + idx = ord(args[0]) - 64 + if idx in self.ALARM: + self.write(self.ALARM[idx]) + else: + self.write("0,1,500.0,0.0,0") + def doQueryALARMST(self, cmd, args): + if len(args) > 0: + idx = ord(args[0]) - 64 + if idx in self.ALARMST: + self.write(self.ALARMST[idx]) + else: + self.write("0,0") + def doCommandALMRST(self, cmd, args): + for key in self.ALARMST.keys(): + self.ALARMST[key] = "0,0" + def doCommandANALOG(self, cmd, args): + key = args[0] + if key < 1 or key > 2: + key = 1 + self.ANALOG[key] = ",".join(args[1:]) + print "TODO implement Command: \"ANALOG\" in \"" + cmd + " " + ",".join(args) + "\"" + def doQueryANALOG(self, cmd, args): + key = args[0] + if key < 1 or key > 2: + key = 1 + self.write(self.ANALOG[key]) # TODO + print "TODO implement Query: \"ANALOG?\" in \"" + cmd + " " + ",".join(args) + "\"" + def doQueryAOUT(self, cmd, args): + key = args[0] + if key < 1 or key > 2: + key = 1 + self.write("%6.3f" % self.AOUT[key]) # TODO + print "TODO implement Query: \"AOUT?\" in \"" + cmd + " " + ",".join(args) + "\"" + def doCommandBEEP(self, cmd, args): + print "Unimplemented Command: \"BEEP\" in \"" + cmd + " " + ",".join(args) + "\"" + def doQueryBEEP(self, cmd, args): + print "Unimplemented Query: \"BEEP?\" in \"" + cmd + " " + ",".join(args) + "\"" + def doQueryBUSY(self, cmd, args): + self.write("0") + def doCommandCDISP(self, cmd, args): + print "Unimplemented Command: \"CDISP\" in \"" + cmd + " " + ",".join(args) + "\"" + def doQueryCDISP(self, cmd, args): + print "Unimplemented Query: \"CDISP?\" in \"" + cmd + " " + ",".join(args) + "\"" + def doCommandCFILT(self, cmd, args): + loop = int(args[0]) + if loop < 1: + loop = 1 + if loop > 2: + loop = 2 + self.CFILT[loop] = int(args[1]) + def doQueryCFILT(self, cmd, args): + loop = int(args[0]) + if loop < 1: + loop = 1 + if loop > 2: + loop = 2 + self.write("%d" % self.CFILT[loop]) + def doCommandCLIMI(self, cmd, args): + self.CLIMI = double(args[0]) + def doQueryCLIMI(self, cmd, args): + self.write("%f" % self.CLIMI) + def doCommandCLIMIT(self, cmd, args): + loop = int(args[0]) + if loop < 1: + loop = 1 + if loop > 2: + loop = 2 + self.CLIMIT[loop] = ",".join(args[1:]) + def doQueryCLIMIT(self, cmd, args): + loop = int(args[0]) + if loop < 1: + loop = 1 + if loop > 2: + loop = 2 + self.write("%s" % self.CLIMIT[loop]) + def doCommandCMODE(self, cmd, args): + loop = int(args[0]) + if loop in self.CMODE: + self.CMODE[loop] = int(args[1]) + def doQueryCMODE(self, cmd, args): + loop = int(args[0]) + if loop in self.CMODE: + self.write("%f" % self.CMODE[loop]) + def doCommandCOMM(self, cmd, args): + self.COMM = ",".join(args) + def doQueryCOMM(self, cmd, args): + self.write("%f" % self.COMM) + def doQueryCRDG(self, cmd, args): + loop = ord(args[0]) - 64 + if loop < 1: + loop = 1 + if loop > 4: + loop = 4 + self.write("%f" % (self.KRDG[loop] - 273.15 + self.RANDOM)) + def doCommandCRVDEL(self, cmd, args): + print "Unimplemented Command: \"CRVDEL\" in \"" + cmd + " " + ",".join(args) + "\"" + def doCommandCRVHDR(self, cmd, args): + print "Unimplemented Command: \"CRVHDR\" in \"" + cmd + " " + ",".join(args) + "\"" + def doQueryCRVHDR(self, cmd, args): + key = int(args[0]) + if key in self.CRVHDR: + self.write(self.CRVHDR[key]) + else: + self.write("DT-336-1 ,STANDARD ,1,+500.000,1") # TODO + print "TODO implement Query: \"CRVHDR?\" in \"" + cmd + " " + ",".join(args) + "\"" + def doCommandCRVPT(self, cmd, args): + key = int(args[0]) + if key < 1 or key > 20: + key = 1 + idx = int(args[1]) + if idx < 1 or idx > 200: + idx = 1 + # TODO set the Curve Point + print "TODO implement Command: \"CRVPT\" in \"" + cmd + " " + ",".join(args) + "\"" + def doQueryCRVPT(self, cmd, args): + key = int(args[0]) + if key < 1 or key > 20: + key = 1 + idx = int(args[1]) + if idx < 1 or idx > 200: + idx = 1 + self.write("1.0E+01,1.0+E02") # TODO + print "TODO implement Query: \"CRVPT?\" in \"" + cmd + " " + ",".join(args) + "\"" + def doCommandCRVSAV(self, cmd, args): + pass + def doCommandCSET(self, cmd, args): + loop = int(args[0]) + if loop < 1: + loop = 1 + if loop > 2: + loop = 2 + self.CSET[loop] = ",".join(args[1:]) + def doQueryCSET(self, cmd, args): + loop = int(args[0]) + if loop < 1: + loop = 1 + if loop > 2: + loop = 2 + self.write("%s" % self.CSET[loop]) + def doCommandDFLT(self, cmd, args): + if args[0] == "99": + print "Unimplemented Command: \"DFLT 99\" in \"" + cmd + " " + ",".join(args) + "\"" + else: + print "Invalid Command: \"DFLT " + args[0] + "\" in \"" + cmd + " " + ",".join(args) + "\"" + def doCommandDISPLAY(self, cmd, args): + print "Unimplemented Command: \"DISPLAY\" in \"" + cmd + " " + ",".join(args) + "\"" + def doQueryDISPLAY(self, cmd, args): + print "Unimplemented Query: \"DISPLAY?\" in \"" + cmd + " " + ",".join(args) + "\"" + def doCommandDISPLOC(self, cmd, args): + print "Unimplemented Command: \"DISPLOC\" in \"" + cmd + " " + ",".join(args) + "\"" + def doQueryDISPLOC(self, cmd, args): + print "Unimplemented Query: \"DISPLOC?\" in \"" + cmd + " " + ",".join(args) + "\"" + def doCommandDOUT(self, cmd, args): + self.DOUT = int(args[0]) + def doQueryDOUT(self, cmd, args): + self.write("%d" % self.DOUT) + def doCommandFILTER(self, cmd, args): + if len(args) > 0: + if int(args[0]) in self.FILTER: + keys = [int(args)] + else: + keys = self.FILTER.keys() + params = args[1:].join(",") + for key in keys: + self.FILTER[key] = params + def doQueryFILTER(self, cmd, args): + idx = int(args[0]) + if idx in self.FILTER: + self.write(self.FILTER[idx]) + else: + self.write("0") + def doQueryHTR(self, cmd, args): + self.write("%f" % self.HTR) + def doQueryHTRST(self, cmd, args): + self.write("%d" % self.HTRST) + def doQueryIEEE(self, cmd, args): + self.write("%s" % self.IEEE) + def doCommandIEEE(self, cmd, args): + self.IEEE = args[0] + def doQueryINCRV(self, cmd, args): + idx = ord(args[0]) - 64 + if idx in self.INCRV: + self.write(self.INCRV[idx]) + else: + self.write("0") + def doCommandINCRV(self, cmd, args): + if len(args) > 1: + idx = ord(args[0]) - 64 + if idx in self.INCRV: + for key in keys: + self.INCRV[key] = args[1] + def doQueryINTYPE(self, cmd, args): + idx = ord(args[0]) - 64 + if idx in self.INTYPE: + self.write(self.INTYPE[idx]) + else: + self.write("0") + def doCommandINTYPE(self, cmd, args): + if len(args) > 1: + idx = ord(args[0]) - 64 + if idx in self.INTYPE: + for key in keys: + self.INTYPE[key] = ",".join(args[1:]) + def doQueryKEYST(self, cmd, args): + self.write("%d" % self.KEYST) + self.KEYST = 0 + def doQueryKRDG(self, cmd, args): + loop = ord(args[0]) - 64 + if loop < 1: + loop = 1 + if loop > 4: + loop = 4 + self.write("%f" % (self.KRDG[loop] + self.RANDOM)) + def doQueryLDAT(self, cmd, args): + self.write("3.000E+02") # TODO + print "TODO implement Query: \"LDAT?\" in \"" + cmd + " " + ",".join(args) + "\"" + def doQueryLINEAR(self, cmd, args): + self.write("1,1.0,1,3") # TODO + print "TODO implement Query: \"LINEAR?\" in \"" + cmd + " " + ",".join(args) + "\"" + def doCommandLINEAR(self, cmd, args): + print "Unimplemented Command: \"LINEAR\" in \"" + cmd + " " + ",".join(args) + "\"" + def doQueryLOCK(self, cmd, args): + self.write("%s" % self.LOCK) + def doCommandLOCK(self, cmd, args): + self.LOCK = args[0] + def doQueryMDAT(self, cmd, args): + response = "0" + if len(args[0]) > 0: + idx = int(args[0]) + if idx in self.MDAT: + (minv, maxv, reset) = self.MDAT[idx].split(",") + response = "%f,%f" % (float(minv), float(maxv)) + self.write(response) + def doQueryMNMX(self, cmd, args): + self.write("1") # TODO + print "TODO implement Query: \"MNMX?\" in \"" + cmd + " " + ",".join(args) + "\"" + def doCommandMNMX(self, cmd, args): + print "Unimplemented Command: \"MNMX\" in \"" + cmd + " " + ",".join(args) + "\"" + def doCommandMNMXRST(self, cmd, args): + print "Unimplemented Command: \"MNMXRST\" in \"" + cmd + " " + ",".join(args) + "\"" + def doQueryMODE(self, cmd, args): + print "Unimplemented Query: \"MODE?\" in \"" + cmd + " " + ",".join(args) + "\"" + def doCommandMODE(self, cmd, args): + print "Unimplemented Command: \"MODE\" in \"" + cmd + " " + ",".join(args) + "\"" + def doQueryMONITOR(self, cmd, args): + print "Unimplemented Query: \"MONITOR?\" in \"" + cmd + " " + ",".join(args) + "\"" + def doCommandMONITOR(self, cmd, args): + print "Unimplemented Command: \"MONITOR\" in \"" + cmd + " " + ",".join(args) + "\"" + def doQueryMOUT(self, cmd, args): + idx = int(args[0]) + self.write(self.MOUT[idx]) + def doCommandMOUT(self, cmd, args): + print "Unimplemented Command: \"MOUT\" in \"" + cmd + " " + ",".join(args) + "\"" + def doQueryOUTMODE(self, cmd, args): + idx = int(args[0]) + if idx in self.OUTMODE: + self.write(self.OUTMODE[idx]) + else: + self.write(self.OUTMODE[1]) + def doCommandOUTMODE(self, cmd, args): + print "Unimplemented Command: \"OUTMODE\" in \"" + cmd + " " + ",".join(args) + "\"" + def doQueryPID(self, cmd, args): + idx = int(args[0]) + self.write(self.PID[idx]) + def doCommandPID(self, cmd, args): + print "Unimplemented Command: \"PID\" in \"" + cmd + " " + ",".join(args) + "\"" + def doQueryRANGE(self, cmd, args): + idx = int(args[0]) + self.write(self.RANGE[idx]) + def doCommandRANGE(self, cmd, args): + idx = int(args[0]) + val = int(args[1]) + self.RANGE[idx] = val + def doQueryRAMP(self, cmd, args): + idx = int(args[0]) + response = "%d,%f" % (self.RAMP_ON[idx], self.RAMP_RATE[idx]) + self.write(response) + def doCommandRAMP(self, cmd, args): + idx = int(args[0]) + ramp_on = int(args[1]) + if ramp_on == 0 or ramp_on == 1: + self.RAMP_ON[idx] = ramp_on + if ramp_on == 1: + ramp_rate = float(args[2]) + if ramp_rate >= 0.001 and ramp_rate <= 100.0: + self.RAMP_RATE[idx] = ramp_rate + def doQueryRAMPST(self, cmd, args): + idx = int(args[0]) + response = "%d" % self.RAMP_ST[idx] + self.write(response) + def doQueryRDGPWR(self, cmd, args): + self.write("1.000E-15") # TODO + print "TODO implement Query: \"RDGPWR?\" in \"" + cmd + " " + ",".join(args) + "\"" + def doQueryRDGR(self, cmd, args): + self.write("1.000E+03") # TODO + print "TODO implement Query: \"RDGR?\" in \"" + cmd + " " + ",".join(args) + "\"" + def doQueryRDGRNG(self, cmd, args): + self.write("1,1,19,0,0") # TODO + print "TODO implement Query: \"RDGRNG?\" in \"" + cmd + " " + ",".join(args) + "\"" + def doCommandRDGRNG(self, cmd, args): + print "Unimplemented Command: \"RDGRNG\" in \"" + cmd + " " + ",".join(args) + "\"" + def doQueryRDGST(self, cmd, args): + self.write("000") + print "TODO implement Query: \"RDGST?\" in \"" + cmd + " " + ",".join(args) + "\"" + def doQueryRELAY(self, cmd, args): + idx = int(args[0]) + self.write(self.RELAY[idx]) + def doCommandRELAY(self, cmd, args): + idx = int(args[0]) + if idx in self.RELAY: + self.relay[idx] = ",".join(args[1:]) + def doQueryRELAYST(self, cmd, args): + idx = int(args[0]) + self.write(self.RELAYST[idx]) + def doQuerySETP(self, cmd, args): + idx = int(args[0]) + val = self.SETP[idx] + self.write("%f" % val) + def doCommandSETP(self, cmd, args): + idx = int(args[0]) + val = float(args[1]) + if (val >= 0.0 and val <= 500.0): + self.TARGET[idx] = val + if self.RAMP_ON[idx] == 0: + self.SETP[idx] = val + self.RAMP_ST[idx] = 0 + else: + self.RAMP_START_TEMP[idx] = self.SETP[idx] + self.RAMP_START_TIME[idx] = time.time() + self.RAMP_ST[idx] = 1 + def doQueryZONE(self, cmd, args): + print "Unimplemented Query: \"ZONE?\" in \"" + cmd + " " + ",".join(args) + "\"" + def doCommandZONE(self, cmd, args): + print "Unimplemented Command: \"ZONE\" in \"" + cmd + " " + ",".join(args) + "\"" + +if __name__ == '__main__': + from LakeshoreProtocol import LakeshoreProtocol + + class TestFactory: + def __init__(self): + print self.__class__.__name__, "ctor" + self.numProtocols = 0 + def write(self, data): + print "test write:", data, + def loseConnection(self): + print "test lose connection" + test_factory = TestFactory() + test_device = Lakeshore336() + test_protocol = LakeshoreProtocol(test_device, "\r\n") + test_protocol.factory = test_factory + test_protocol.transport = test_factory + test_device.protocol = test_protocol + test_device.protocol.connectionMade() + test_device.protocol.dataReceived("*IDN?") + test_device.protocol.dataReceived("*TST?") + test_device.protocol.connectionLost("Dunno") diff --git a/site_ansto/instrument/TEST_SICS/fakeTempControl/lakeshore/Lakeshore340.py b/site_ansto/instrument/TEST_SICS/fakeTempControl/lakeshore/Lakeshore340.py new file mode 100755 index 00000000..8b97c5bf --- /dev/null +++ b/site_ansto/instrument/TEST_SICS/fakeTempControl/lakeshore/Lakeshore340.py @@ -0,0 +1,541 @@ +# vim: ts=8 sts=4 sw=4 expandtab +# Fake Lakeshore Model 340 Temperature Controller +# +# Author: Douglas Clowes 2012, 2013 +# +from LakeshoreDevice import LakeshoreDevice +import random +import re +import sys +import time + +class Lakeshore340(LakeshoreDevice): + """Lakeshore 340 temperature controller object - simulates the LS340""" + + def __init__(self): + LakeshoreDevice.__init__(self) + print Lakeshore340.__name__, "ctor" + self.CONFIG_LOOPS = [1, 2] + self.CONFIG_SNSRS = [1, 2, 3, 4] + self.reset_powerup() + + def doCommand(self, command, params): + print Lakeshore340.__name__, "Command:", command, params + return LakeshoreDevice.doCommand(self, command, params) + + def doQuery(self, command, params): + print Lakeshore340.__name__, "Query:", command, params + return LakeshoreDevice.doQuery(self, command, params) + + def reset_powerup(self): + print Lakeshore340.__name__, "reset_powerup" + self.LAST_ITERATION = 0 + self.ALARM = {} + self.ALARMST = {} + for idx in self.CONFIG_SNSRS: + self.ALARM[idx] = "0" + self.ALARMST[idx] = "0" + self.ANALOG = {1: "0,1,1,1,400.0,0.0,0.0", 2: "0,1,1,1,400.0,0.0,0.0"} + self.AOUT = { 1: 0.0, 2: 0.0 } + self.CFILT = {1: 1, 2: 1} + self.CLIMI = 0.0 + self.CLIMIT = {1: "400.0,10,0,0", 2: "400.0,10,0,0"} + self.CMODE = {} + for idx in self.CONFIG_LOOPS: + self.CMODE[idx] = 1 + self.CRVHDR = {} + self.CRVHDR[1] = "DT-336-1 ,STANDARD ,1,+500.000,1" + self.CRVHDR[2] = "DT-336-2 ,STANDARD ,2,+0.500,1" + self.CRVHDR[3] = "DT-336-3 ,STANDARD ,3,+2.000,2" + self.CRVHDR[4] = "DT-336-4 ,STANDARD ,4,+0.301,2" + self.CRVPT = {} + for i in range(1,21): + self.CRVPT[i] = [(0.0, 0.0)] + self.CSET = {1: "A,1,1,0", 2: "B,1,1,0"} + self.DOUT = 0 + self.FILTER = {1: "1,10,2"} + self.FREQ = 2 + self.GUARD = 0 + self.HTR = 0.0 + self.HTRRNG = 0 + self.HTRST = 0 + self.IDN = "LSCI,MODEL340,123456,01022003" + self.IEEE = "0,0,4" + self.INCRV = {} + self.INSET = {} + self.INTYPE = {} + for idx in self.CONFIG_SNSRS: + self.INCRV[idx] = "1" + self.INSET[idx] = "1,0" + self.INTYPE[idx] = "1,0,1,0,1" + self.KEYST = 1 + self.KRDG = {} + for idx in self.CONFIG_SNSRS: + self.KRDG[idx] = 300.0 + self.LOCK = "0,000" + self.MDAT = {1: "0,0,1"} # Min,Max,Reset + self.MOUT = {1: "0.0", 2: "0.0", 3: "0.0", 4: "0.0"} + self.OUTMODE = {1: "1,1,0", 2: "1,2,0", 3: "1,3,0", 4: "1,4,0"} + self.PID = {1: "+0150.0,+0005.0,+000.0", 2: "+0150.0,+0005.0,+000.0"} + self.RAMP_ON = {1: 0, 2: 0} + self.RAMP_RATE = {1: 0.000, 2: 0.000} + self.RAMP_ST = {1: 0, 2:0} + self.RAMP_TIME = 0.0 + self.RANGE = {} + for idx in range(1,5): + self.RANGE[idx] = "1" + self.RDGST = {"A": 0, "B": 0, "C": 0, "D": 0} + self.RELAY = {1: "1,A,0", 2: "2,A,0"} + self.RELAYST = {1: "0", 2: "0"} + self.SETTLE = "10.0,10" + self.SETP = {} + self.TARGET = {} + self.RAMP_START_TEMP = {} + self.RAMP_START_TIME = {} + for idx in self.CONFIG_LOOPS: + self.SETP[idx] = 300.0 + self.TARGET[idx] = 300.0 + self.RAMP_START_TEMP[idx] = 300.0 + self.RAMP_START_TIME[idx] = 0.0 + self.STB = 0 + self.ESE = 0 + self.ESR = 0 + self.OPC = 0 + self.SRE = 0 + self.TST = 0 + self.RANDOM = 0.5 + + def doIteration(self): + delta_time = time.time() - self.LAST_ITERATION + if delta_time < 1: + return + #print "DoIteration:", delta_time + self.LAST_ITERATION = time.time() + for idx in self.CONFIG_LOOPS: + # TODO - progress ramping setpoints (SP) + if idx in self.RAMP_ON and self.RAMP_ON[idx] and self.TARGET[idx] != self.SETP[idx]: + delta_time = time.time() - self.RAMP_START_TIME[idx]; + delta_temp = self.RAMP_RATE[idx] * (delta_time / 60.0) + if self.TARGET[idx] > self.RAMP_START_TEMP[idx]: + self.SETP[idx] = self.RAMP_START_TEMP[idx] + delta_temp + if self.SETP[idx] >= self.TARGET[idx]: + self.SETP[idx] = self.TARGET[idx] + self.RAMP_ST[idx] = 0 + else: + self.SETP[idx] = self.RAMP_START_TEMP[idx] - delta_temp + if self.SETP[idx] <= self.TARGET[idx]: + self.SETP[idx] = self.TARGET[idx] + self.RAMP_ST[idx] = 0 + + # TODO - iterate Power Level + if self.KRDG[idx] <> self.SETP[idx]: + self.HTR = self.SETP[idx] - self.KRDG[idx] + if self.HTR > 100.0: + self.HTR = 100.0 + elif self.HTR < -100.0: + self.HTR = -100.0 + + # TODO - iterate Process Values (PV) + self.KRDG[idx] = (0.9 * self.KRDG[idx] + 0.1 * self.SETP[idx]) + + def doCommandCLS(self, cmd, args): + print "Unimplemented Command: \"*CLS\" in \"" + cmd + " " + ",".join(args) + "\"" + def doCommandESE(self, cmd, args): + self.ESE = int(args[0]) & 0xFF + def doQueryESE(self, cmd, args): + self.write("%d" % self.ESE) + def doQueryESR(self, cmd, args): + self.write("%d" % self.ESR) + def doQueryIDN(self, cmd, args): + self.write(self.IDN) + def doCommandOPC(self, cmd, args): + self.OPC = 1 + def doQueryOPC(self, cmd, args): + self.write("1") + def doCommandRST(self, cmd, args): + print "Unimplemented Command: \"*RST\" in \"" + cmd + " " + ",".join(args) + "\"" + def doCommandSRE(self, cmd, args): + self.SRE = int(args[0]) & 0xFF + def doQuerySRE(self, cmd, args): + self.write("%d" % self.SRE) + def doQuerySTB(self, cmd, args): + self.write("%d" % self.STB) + def doQueryTST(self, cmd, args): + self.write("%d" % self.TST) + def doCommandWAI(self, cmd, args): + print "Unimplemented Command: \"*WAI\" in \"" + cmd + " " + ",".join(args) + "\"" + def doCommandALARM(self, cmd, args): + if len(args) > 0: + idx = ord(args[0]) - 64 + params = args[1:].join(",") + if idx in self.ALARM: + self.ALARM[idx] = params + def doQueryALARM(self, cmd, args): + if len(args) > 0: + idx = ord(args[0]) - 64 + if idx in self.ALARM: + self.write(self.ALARM[idx]) + else: + self.write("0,1,500.0,0.0,0") + def doQueryALARMST(self, cmd, args): + if len(args) > 0: + idx = ord(args[0]) - 64 + if idx in self.ALARMST: + self.write(self.ALARMST[idx]) + else: + self.write("0,0") + def doCommandALMRST(self, cmd, args): + for key in self.ALARMST.keys(): + self.ALARMST[key] = "0,0" + def doCommandANALOG(self, cmd, args): + key = args[0] + if key < 1 or key > 2: + key = 1 + self.ANALOG[key] = ",".join(args[1:]) + print "TODO implement Command: \"ANALOG\" in \"" + cmd + " " + ",".join(args) + "\"" + def doQueryANALOG(self, cmd, args): + key = args[0] + if key < 1 or key > 2: + key = 1 + self.write(self.ANALOG[key]) # TODO + print "TODO implement Query: \"ANALOG?\" in \"" + cmd + " " + ",".join(args) + "\"" + def doQueryAOUT(self, cmd, args): + key = args[0] + if key < 1 or key > 2: + key = 1 + self.write("%6.3f" % self.AOUT[key]) # TODO + print "TODO implement Query: \"AOUT?\" in \"" + cmd + " " + ",".join(args) + "\"" + def doCommandBEEP(self, cmd, args): + print "Unimplemented Command: \"BEEP\" in \"" + cmd + " " + ",".join(args) + "\"" + def doQueryBEEP(self, cmd, args): + print "Unimplemented Query: \"BEEP?\" in \"" + cmd + " " + ",".join(args) + "\"" + def doQueryBUSY(self, cmd, args): + self.write("0") + def doCommandCDISP(self, cmd, args): + print "Unimplemented Command: \"CDISP\" in \"" + cmd + " " + ",".join(args) + "\"" + def doQueryCDISP(self, cmd, args): + print "Unimplemented Query: \"CDISP?\" in \"" + cmd + " " + ",".join(args) + "\"" + def doCommandCFILT(self, cmd, args): + loop = int(args[0]) + if loop < 1: + loop = 1 + if loop > 2: + loop = 2 + self.CFILT[loop] = int(args[1]) + def doQueryCFILT(self, cmd, args): + loop = int(args[0]) + if loop < 1: + loop = 1 + if loop > 2: + loop = 2 + self.write("%d" % self.CFILT[loop]) + def doCommandCLIMI(self, cmd, args): + self.CLIMI = double(args[0]) + def doQueryCLIMI(self, cmd, args): + self.write("%f" % self.CLIMI) + def doCommandCLIMIT(self, cmd, args): + loop = int(args[0]) + if loop < 1: + loop = 1 + if loop > 2: + loop = 2 + self.CLIMIT[loop] = ",".join(args[1:]) + def doQueryCLIMIT(self, cmd, args): + loop = int(args[0]) + if loop < 1: + loop = 1 + if loop > 2: + loop = 2 + self.write("%s" % self.CLIMIT[loop]) + def doCommandCMODE(self, cmd, args): + loop = int(args[0]) + if loop in self.CMODE: + self.CMODE[loop] = int(args[1]) + def doQueryCMODE(self, cmd, args): + loop = int(args[0]) + if loop in self.CMODE: + self.write("%f" % self.CMODE[loop]) + def doCommandCOMM(self, cmd, args): + self.COMM = ",".join(args) + def doQueryCOMM(self, cmd, args): + self.write("%f" % self.COMM) + def doQueryCRDG(self, cmd, args): + loop = ord(args[0]) - 64 + if loop < 1: + loop = 1 + if loop > 4: + loop = 4 + self.write("%f" % (self.KRDG[loop] - 273.15 + self.RANDOM)) + def doCommandCRVDEL(self, cmd, args): + print "Unimplemented Command: \"CRVDEL\" in \"" + cmd + " " + ",".join(args) + "\"" + def doCommandCRVHDR(self, cmd, args): + print "Unimplemented Command: \"CRVHDR\" in \"" + cmd + " " + ",".join(args) + "\"" + def doQueryCRVHDR(self, cmd, args): + key = int(args[0]) + if key in self.CRVHDR: + self.write(self.CRVHDR[key]) + else: + self.write("DT-336-1 ,STANDARD ,1,+500.000,1") # TODO + print "TODO implement Query: \"CRVHDR?\" in \"" + cmd + " " + ",".join(args) + "\"" + def doCommandCRVPT(self, cmd, args): + key = int(args[0]) + if key < 1 or key > 20: + key = 1 + idx = int(args[1]) + if idx < 1 or idx > 200: + idx = 1 + # TODO set the Curve Point + print "TODO implement Command: \"CRVPT\" in \"" + cmd + " " + ",".join(args) + "\"" + def doQueryCRVPT(self, cmd, args): + key = int(args[0]) + if key < 1 or key > 20: + key = 1 + idx = int(args[1]) + if idx < 1 or idx > 200: + idx = 1 + self.write("1.0E+01,1.0+E02") # TODO + print "TODO implement Query: \"CRVPT?\" in \"" + cmd + " " + ",".join(args) + "\"" + def doCommandCRVSAV(self, cmd, args): + pass + def doCommandCSET(self, cmd, args): + loop = int(args[0]) + if loop < 1: + loop = 1 + if loop > 2: + loop = 2 + self.CSET[loop] = ",".join(args[1:]) + def doQueryCSET(self, cmd, args): + loop = int(args[0]) + if loop < 1: + loop = 1 + if loop > 2: + loop = 2 + self.write("%s" % self.CSET[loop]) + def doCommandDATETIME(self, cmd, args): + print "Unimplemented Command: \"DATETIME\" in \"" + cmd + " " + ",".join(args) + "\"" + def doQueryDATETIME(self, cmd, args): + self.write("2,3,1996,15,30,0,0") # TODO + def doCommandDFLT(self, cmd, args): + if args[0] == "99": + print "Unimplemented Command: \"DFLT 99\" in \"" + cmd + " " + ",".join(args) + "\"" + else: + print "Invalid Command: \"DFLT " + args[0] + "\" in \"" + cmd + " " + ",".join(args) + "\"" + def doCommandDISPLAY(self, cmd, args): + print "Unimplemented Command: \"DISPLAY\" in \"" + cmd + " " + ",".join(args) + "\"" + def doQueryDISPLAY(self, cmd, args): + print "Unimplemented Query: \"DISPLAY?\" in \"" + cmd + " " + ",".join(args) + "\"" + def doCommandDISPLOC(self, cmd, args): + print "Unimplemented Command: \"DISPLOC\" in \"" + cmd + " " + ",".join(args) + "\"" + def doQueryDISPLOC(self, cmd, args): + print "Unimplemented Query: \"DISPLOC?\" in \"" + cmd + " " + ",".join(args) + "\"" + def doCommandDOUT(self, cmd, args): + self.DOUT = int(args[0]) + def doQueryDOUT(self, cmd, args): + self.write("%d" % self.DOUT) + def doCommandFILTER(self, cmd, args): + if len(args) > 0: + if int(args[0]) in self.FILTER: + keys = [int(args)] + else: + keys = self.FILTER.keys() + params = args[1:].join(",") + for key in keys: + self.FILTER[key] = params + def doQueryFILTER(self, cmd, args): + idx = int(args[0]) + if idx in self.FILTER: + self.write(self.FILTER[idx]) + else: + self.write("0") + def doQueryHTR(self, cmd, args): + self.write("%f" % self.HTR) + def doQueryHTRST(self, cmd, args): + self.write("%d" % self.HTRST) + def doQueryIEEE(self, cmd, args): + self.write("%s" % self.IEEE) + def doCommandIEEE(self, cmd, args): + self.IEEE = args[0] + def doQueryINCRV(self, cmd, args): + idx = ord(args[0]) - 64 + if idx in self.INCRV: + self.write(self.INCRV[idx]) + else: + self.write("0") + def doCommandINCRV(self, cmd, args): + if len(args) > 1: + idx = ord(args[0]) - 64 + if idx in self.INCRV: + for key in keys: + self.INCRV[key] = args[1] + def doQueryINTYPE(self, cmd, args): + idx = ord(args[0]) - 64 + if idx in self.INTYPE: + self.write(self.INTYPE[idx]) + else: + self.write("0") + def doQueryINSET(self, cmd, args): + idx = ord(args[0]) + if idx in self.INSET: + self.write(self.INSET[idx]) + else: + self.write("0,0") + def doCommandINSET(self, cmd, args): + if len(args) > 0: + idx = ord(args[0]) + if idx in self.INSET: + self.INSET[idx] = ",".join(args[1:]) + def doCommandINTYPE(self, cmd, args): + if len(args) > 1: + idx = ord(args[0]) - 64 + if idx in self.INTYPE: + for key in keys: + self.INTYPE[key] = ",".join(args[1:]) + def doQueryKEYST(self, cmd, args): + self.write("%d" % self.KEYST) + self.KEYST = 0 + def doQueryKRDG(self, cmd, args): + loop = ord(args[0]) - 64 + if loop < 1: + loop = 1 + if loop > 4: + loop = 4 + self.write("%f" % (self.KRDG[loop] + self.RANDOM)) + def doQueryLDAT(self, cmd, args): + self.write("3.000E+02") # TODO + print "TODO implement Query: \"LDAT?\" in \"" + cmd + " " + ",".join(args) + "\"" + def doQueryLINEAR(self, cmd, args): + self.write("1,1.0,1,3") # TODO + print "TODO implement Query: \"LINEAR?\" in \"" + cmd + " " + ",".join(args) + "\"" + def doCommandLINEAR(self, cmd, args): + print "Unimplemented Command: \"LINEAR\" in \"" + cmd + " " + ",".join(args) + "\"" + def doQueryLOCK(self, cmd, args): + self.write("%s" % self.LOCK) + def doCommandLOCK(self, cmd, args): + self.LOCK = args[0] + def doQueryMDAT(self, cmd, args): + response = "0" + if len(args[0]) > 0: + idx = int(args[0]) + if idx in self.MDAT: + (minv, maxv, reset) = self.MDAT[idx].split(",") + response = "%f,%f" % (float(minv), float(maxv)) + self.write(response) + def doQueryMNMX(self, cmd, args): + self.write("1") # TODO + print "TODO implement Query: \"MNMX?\" in \"" + cmd + " " + ",".join(args) + "\"" + def doCommandMNMX(self, cmd, args): + print "Unimplemented Command: \"MNMX\" in \"" + cmd + " " + ",".join(args) + "\"" + def doCommandMNMXRST(self, cmd, args): + print "Unimplemented Command: \"MNMXRST\" in \"" + cmd + " " + ",".join(args) + "\"" + def doQueryMODE(self, cmd, args): + print "Unimplemented Query: \"MODE?\" in \"" + cmd + " " + ",".join(args) + "\"" + def doCommandMODE(self, cmd, args): + print "Unimplemented Command: \"MODE\" in \"" + cmd + " " + ",".join(args) + "\"" + def doQueryMONITOR(self, cmd, args): + print "Unimplemented Query: \"MONITOR?\" in \"" + cmd + " " + ",".join(args) + "\"" + def doCommandMONITOR(self, cmd, args): + print "Unimplemented Command: \"MONITOR\" in \"" + cmd + " " + ",".join(args) + "\"" + def doQueryMOUT(self, cmd, args): + idx = int(args[0]) + self.write(self.MOUT[idx]) + def doCommandMOUT(self, cmd, args): + print "Unimplemented Command: \"MOUT\" in \"" + cmd + " " + ",".join(args) + "\"" + def doQueryPID(self, cmd, args): + idx = int(args[0]) + self.write(self.PID[idx]) + def doCommandPID(self, cmd, args): + print "Unimplemented Command: \"PID\" in \"" + cmd + " " + ",".join(args) + "\"" + def doQueryRANGE(self, cmd, args): + idx = 1 + self.write(self.RANGE[idx]) + def doCommandRANGE(self, cmd, args): + idx = 1 + val = int(args[0]) + self.RANGE[idx] = val + def doQueryRAMP(self, cmd, args): + idx = int(args[0]) + response = "%d,%f" % (self.RAMP_ON[idx], self.RAMP_RATE[idx]) + self.write(response) + def doCommandRAMP(self, cmd, args): + idx = int(args[0]) + ramp_on = int(args[1]) + if ramp_on == 0 or ramp_on == 1: + self.RAMP_ON[idx] = ramp_on + if ramp_on == 1: + ramp_rate = float(args[2]) + if ramp_rate >= 0.001 and ramp_rate <= 100.0: + self.RAMP_RATE[idx] = ramp_rate + def doQueryRAMPST(self, cmd, args): + idx = int(args[0]) + response = "%d" % self.RAMP_ST[idx] + self.write(response) + def doQueryRDGPWR(self, cmd, args): + self.write("1.000E-15") # TODO + print "TODO implement Query: \"RDGPWR?\" in \"" + cmd + " " + ",".join(args) + "\"" + def doQueryRDGR(self, cmd, args): + self.write("1.000E+03") # TODO + print "TODO implement Query: \"RDGR?\" in \"" + cmd + " " + ",".join(args) + "\"" + def doQueryRDGRNG(self, cmd, args): + self.write("1,1,19,0,0") # TODO + print "TODO implement Query: \"RDGRNG?\" in \"" + cmd + " " + ",".join(args) + "\"" + def doCommandRDGRNG(self, cmd, args): + print "Unimplemented Command: \"RDGRNG\" in \"" + cmd + " " + ",".join(args) + "\"" + def doQueryRDGST(self, cmd, args): + self.write("000") + print "TODO implement Query: \"RDGST?\" in \"" + cmd + " " + ",".join(args) + "\"" + def doQueryRELAY(self, cmd, args): + idx = int(args[0]) + self.write(self.RELAY[idx]) + def doCommandRELAY(self, cmd, args): + idx = int(args[0]) + if idx in self.RELAY: + self.relay[idx] = ",".join(args[1:]) + def doQueryRELAYST(self, cmd, args): + idx = int(args[0]) + self.write(self.RELAYST[idx]) + def doQuerySETP(self, cmd, args): + idx = int(args[0]) + val = self.SETP[idx] + self.write("%f" % val) + def doCommandSETP(self, cmd, args): + idx = int(args[0]) + val = float(args[1]) + if (val >= 0.0 and val <= 500.0): + self.TARGET[idx] = val + if self.RAMP_ON[idx] == 0: + self.SETP[idx] = val + self.RAMP_ST[idx] = 0 + else: + self.RAMP_START_TEMP[idx] = self.SETP[idx] + self.RAMP_START_TIME[idx] = time.time() + self.RAMP_ST[idx] = 1 + def doQuerySETTLE(self, cmd, args): + self.write(self.SETTLE) + def doCommandSETTLE(self, cmd, args): + self.SETTLE = ",".join(args) + def doQueryZONE(self, cmd, args): + print "Unimplemented Query: \"ZONE?\" in \"" + cmd + " " + ",".join(args) + "\"" + def doCommandZONE(self, cmd, args): + print "Unimplemented Command: \"ZONE\" in \"" + cmd + " " + ",".join(args) + "\"" + +if __name__ == '__main__': + from LakeshoreProtocol import LakeshoreProtocol + + class TestFactory: + def __init__(self): + print self.__class__.__name__, "ctor" + self.numProtocols = 0 + def write(self, data): + print "test write:", data, + def loseConnection(self): + print "test lose connection" + test_factory = TestFactory() + test_device = Lakeshore340() + test_protocol = LakeshoreProtocol(test_device, "\r\n") + test_protocol.factory = test_factory + test_protocol.transport = test_factory + test_device.protocol = test_protocol + test_device.protocol.connectionMade() + test_device.protocol.dataReceived("*IDN?") + test_device.protocol.dataReceived("*TST?") + test_device.protocol.connectionLost("Dunno") diff --git a/site_ansto/instrument/TEST_SICS/fakeTempControl/lakeshore/Lakeshore370.py b/site_ansto/instrument/TEST_SICS/fakeTempControl/lakeshore/Lakeshore370.py new file mode 100755 index 00000000..61d0dd0c --- /dev/null +++ b/site_ansto/instrument/TEST_SICS/fakeTempControl/lakeshore/Lakeshore370.py @@ -0,0 +1,472 @@ +# vim: ts=8 sts=4 sw=4 expandtab +# Fake Lakeshore Model 370 Temperature Controller +# +# Author: Douglas Clowes 2012, 2013 +# +from LakeshoreDevice import LakeshoreDevice +import random +import re +import sys +import time + +class Lakeshore370(LakeshoreDevice): + """Lakeshore 370 temperature controller object - simulates the LS370""" + + def __init__(self): + LakeshoreDevice.__init__(self) + print Lakeshore370.__name__, "ctor" + self.CONFIG_LOOPS = [1] + self.CONFIG_SNSRS = [i for i in range(1,17)] + self.reset_powerup() + + def doCommand(self, command, params): + print Lakeshore370.__name__, "Command:", command, params + return LakeshoreDevice.doCommand(self, command, params) + + def doQuery(self, command, params): + print Lakeshore370.__name__, "Query:", command, params + return LakeshoreDevice.doQuery(self, command, params) + + def reset_powerup(self): + print Lakeshore370.__name__, "reset_powerup" + self.LAST_ITERATION = 0 + self.ALARM = {1: "0,1,500.0,0.0,0,0"} + self.ALARMST = {1: "0,0"} + self.ANALOG = {1: "0,1,1,1,400.0,0.0,0.0", 2: "0,1,1,1,400.0,0.0,0.0"} + self.AOUT = { 1: 0.0, 2: 0.0 } + self.CMODE = {} + for idx in self.CONFIG_LOOPS: + self.CMODE[idx] = 1 + self.CMR = 0 + self.CPOL = 0 + self.CRVHDR = {} + self.CRVHDR[1] = "DT-336-1 ,STANDARD ,1,+500.000,1" + self.CRVHDR[2] = "DT-336-2 ,STANDARD ,2,+0.500,1" + self.CRVHDR[3] = "DT-336-3 ,STANDARD ,3,+2.000,2" + self.CRVHDR[4] = "DT-336-4 ,STANDARD ,4,+0.301,2" + self.CRVPT = {} + for i in range(1,21): + self.CRVPT[i] = [(0.0, 0.0)] + self.CSET = "1,0,1,30,1,8,100" + self.DOUT = 0 + self.FILTER = {1: "1,10,2"} + self.FREQ = 2 + self.GUARD = 0 + self.HTR = 0.0 + self.HTRRNG = 0 + self.HTRST = 0 + self.IDN = "LSCI,MODEL370,123456,00000000,1.5" + self.IEEE = "0,0,4" + self.INCRV = {} + self.INSET = {} + for idx in self.CONFIG_SNSRS: + self.INCRV[idx] = "1" + self.INSET[idx] = "1,10,3,0,2" + self.INTYPE = {"A": "1,0,1,0,1", "B": "1,0,1,0,1", "C": "1,0,1,0,1", "D": "1,0,1,0,1" } + self.KEYST = 1 + self.KRDG = {} + for idx in self.CONFIG_SNSRS: + self.KRDG[idx] = 300.0 + self.LOCK = "0,000" + self.MDAT = {1: "0,0,1"} # Min,Max,Reset + self.MOUT = {1: "0.0", 2: "0.0", 3: "0.0", 4: "0.0"} + self.OUTMODE = {1: "1,1,0", 2: "1,2,0", 3: "1,3,0", 4: "1,4,0"} + self.PID = {1: "+0150.0,+0005.0,+000.0", 2: "+0150.0,+0005.0,+000.0"} + self.RAMP_ON = {1: 0} + self.RAMP_RATE = {1: 0.000} + self.RAMP_ST = {1: 0} + self.RAMP_TIME = 0.0 + self.RDGST = {"A": 0, "B": 0, "C": 0, "D": 0} + self.RELAY = {1: "1,A,0", 2: "2,A,0"} + self.RELAYST = {1: "0", 2: "0"} + self.SETP = {} + self.TARGET = {} + self.RAMP_START_TEMP = {} + self.RAMP_START_TIME = {} + for idx in self.CONFIG_LOOPS: + self.SETP[idx] = 300.0 + self.TARGET[idx] = 300.0 + self.RAMP_START_TEMP[idx] = 300.0 + self.RAMP_START_TIME[idx] = 0.0 + self.STB = 0 + self.ESE = 0 + self.ESR = 0 + self.OPC = 0 + self.SRE = 0 + self.TST = 0 + self.RANDOM = 0.5 + + def doIteration(self): + delta_time = time.time() - self.LAST_ITERATION + if delta_time < 1: + return + #print "DoIteration:", delta_time + self.LAST_ITERATION = time.time() + for idx in self.CONFIG_LOOPS: + # TODO - progress ramping setpoints (SP) + if idx in self.RAMP_ON and self.RAMP_ON[idx] and self.TARGET[idx] != self.SETP[idx]: + delta_time = time.time() - self.RAMP_START_TIME[idx]; + delta_temp = self.RAMP_RATE[idx] * (delta_time / 60.0) + if self.TARGET[idx] > self.RAMP_START_TEMP[idx]: + self.SETP[idx] = self.RAMP_START_TEMP[idx] + delta_temp + if self.SETP[idx] >= self.TARGET[idx]: + self.SETP[idx] = self.TARGET[idx] + self.RAMP_ST[idx] = 0 + else: + self.SETP[idx] = self.RAMP_START_TEMP[idx] - delta_temp + if self.SETP[idx] <= self.TARGET[idx]: + self.SETP[idx] = self.TARGET[idx] + self.RAMP_ST[idx] = 0 + + # TODO - iterate Power Level + if self.KRDG[idx] <> self.SETP[idx]: + self.HTR = self.SETP[idx] - self.KRDG[idx] + if self.HTR > 100.0: + self.HTR = 100.0 + elif self.HTR < -100.0: + self.HTR = -100.0 + + # TODO - iterate Process Values (PV) + self.KRDG[idx] = (0.9 * self.KRDG[idx] + 0.1 * self.SETP[idx]) + + def doCommandCLS(self, cmd, args): + print "Unimplemented Command: \"*CLS\" in \"" + cmd + " " + ",".join(args) + "\"" + def doCommandESE(self, cmd, args): + self.ESE = int(args[0]) & 0xFF + def doQueryESE(self, cmd, args): + self.write("%d" % self.ESE) + def doQueryESR(self, cmd, args): + self.write("%d" % self.ESR) + def doQueryIDN(self, cmd, args): + self.write(self.IDN) + def doCommandOPC(self, cmd, args): + self.OPC = 1 + def doQueryOPC(self, cmd, args): + self.write("1") + def doCommandRST(self, cmd, args): + print "Unimplemented Command: \"*RST\" in \"" + cmd + " " + ",".join(args) + "\"" + def doCommandSRE(self, cmd, args): + self.SRE = int(args[0]) & 0xFF + def doQuerySRE(self, cmd, args): + self.write("%d" % self.SRE) + def doQuerySTB(self, cmd, args): + self.write("%d" % self.STB) + def doQueryTST(self, cmd, args): + self.write("%d" % self.TST) + def doCommandWAI(self, cmd, args): + print "Unimplemented Command: \"*WAI\" in \"" + cmd + " " + ",".join(args) + "\"" + def doCommandALARM(self, cmd, args): + if len(args) > 0: + if int(args[0]) in self.ALARM: + keys = [int(args)] + else: + keys = self.ALARM.keys() + params = args[1:].join(",") + for key in keys: + self.ALARM[key] = params + def doQueryALARM(self, cmd, args): + idx = int(args[0]) + if idx in self.ALARM: + self.write(self.ALARM[idx]) + else: + self.write("0,1,500.0,0.0,0") + def doQueryALARMST(self, cmd, args): + idx = int(args[0]) + if idx in self.ALARMST: + self.write(self.ALARMST[idx]) + else: + self.write("0,0") + def doCommandALMRST(self, cmd, args): + for key in self.ALARMST.keys(): + self.ALARMST[key] = "0,0" + def doCommandANALOG(self, cmd, args): + key = args[0] + if key < 1 or key > 2: + key = 1 + self.ANALOG[key] = ",".join(args[1:]) + print "TODO implement Command: \"ANALOG\" in \"" + cmd + " " + ",".join(args) + "\"" + def doQueryANALOG(self, cmd, args): + key = args[0] + if key < 1 or key > 2: + key = 1 + self.write(self.ANALOG[key]) # TODO + print "TODO implement Query: \"ANALOG?\" in \"" + cmd + " " + ",".join(args) + "\"" + def doQueryAOUT(self, cmd, args): + key = args[0] + if key < 1 or key > 2: + key = 1 + self.write("%6.3f" % self.AOUT[key]) # TODO + print "TODO implement Query: \"AOUT?\" in \"" + cmd + " " + ",".join(args) + "\"" + def doCommandBAUD(self, cmd, args): + print "Unimplemented Command: \"BAUD\" in \"" + cmd + " " + ",".join(args) + "\"" + def doQueryBAUD(self, cmd, args): + print "Unimplemented Query: \"BAUD?\" in \"" + cmd + " " + ",".join(args) + "\"" + def doCommandBEEP(self, cmd, args): + print "Unimplemented Command: \"BEEP\" in \"" + cmd + " " + ",".join(args) + "\"" + def doQueryBEEP(self, cmd, args): + print "Unimplemented Query: \"BEEP?\" in \"" + cmd + " " + ",".join(args) + "\"" + def doCommandBRIGT(self, cmd, args): + print "Unimplemented Command: \"BRIGT\" in \"" + cmd + " " + ",".join(args) + "\"" + def doQueryBRIGT(self, cmd, args): + print "Unimplemented Query: \"BRIGT?\" in \"" + cmd + " " + ",".join(args) + "\"" + def doCommandCHGALL(self, cmd, args): + print "Unimplemented Command: \"CHGALL\" in \"" + cmd + " " + ",".join(args) + "\"" + def doQueryCHGALL(self, cmd, args): + print "Unimplemented Query: \"CHGALL?\" in \"" + cmd + " " + ",".join(args) + "\"" + def doCommandCMODE(self, cmd, args): + loop = 1 + if loop in self.CMODE: + self.CMODE[loop] = int(args[0]) + def doQueryCMODE(self, cmd, args): + loop = 1 + if loop in self.CMODE: + self.write("%f" % self.CMODE[loop]) + def doCommandCMR(self, cmd, args): + self.CMR = int(args[0]) + def doQueryCMR(self, cmd, args): + self.write("%d" % self.CMR) + def doCommandCPOL(self, cmd, args): + self.CPOL = int(args[0]) + def doQueryCPOL(self, cmd, args): + self.write("%d" % self.CPOL) + def doCommandCRVDEL(self, cmd, args): + print "Unimplemented Command: \"CRVDEL\" in \"" + cmd + " " + ",".join(args) + "\"" + def doCommandCRVHDR(self, cmd, args): + print "Unimplemented Command: \"CRVHDR\" in \"" + cmd + " " + ",".join(args) + "\"" + def doQueryCRVHDR(self, cmd, args): + key = int(args[0]) + if key in self.CRVHDR: + self.write(self.CRVHDR[key]) + else: + self.write("DT-336-1 ,STANDARD ,1,+500.000,1") # TODO + print "TODO implement Query: \"CRVHDR?\" in \"" + cmd + " " + ",".join(args) + "\"" + def doCommandCRVPT(self, cmd, args): + key = int(args[0]) + if key < 1 or key > 20: + key = 1 + idx = int(args[1]) + if idx < 1 or idx > 200: + idx = 1 + # TODO set the Curve Point + print "TODO implement Command: \"CRVPT\" in \"" + cmd + " " + ",".join(args) + "\"" + def doQueryCRVPT(self, cmd, args): + key = int(args[0]) + if key < 1 or key > 20: + key = 1 + idx = int(args[1]) + if idx < 1 or idx > 200: + idx = 1 + self.write("1.0E+01,1.0+E02") # TODO + print "TODO implement Query: \"CRVPT?\" in \"" + cmd + " " + ",".join(args) + "\"" + def doCommandCSET(self, cmd, args): + self.CSET = args[0] + def doQueryCSET(self, cmd, args): + self.write("%s" % self.CSET) + def doCommandDFLT(self, cmd, args): + if args[0] == "99": + print "Unimplemented Command: \"DFLT 99\" in \"" + cmd + " " + ",".join(args) + "\"" + else: + print "Invalid Command: \"DFLT " + args[0] + "\" in \"" + cmd + " " + ",".join(args) + "\"" + def doCommandDISPLAY(self, cmd, args): + print "Unimplemented Command: \"DISPLAY\" in \"" + cmd + " " + ",".join(args) + "\"" + def doQueryDISPLAY(self, cmd, args): + print "Unimplemented Query: \"DISPLAY?\" in \"" + cmd + " " + ",".join(args) + "\"" + def doCommandDISPLOC(self, cmd, args): + print "Unimplemented Command: \"DISPLOC\" in \"" + cmd + " " + ",".join(args) + "\"" + def doQueryDISPLOC(self, cmd, args): + print "Unimplemented Query: \"DISPLOC?\" in \"" + cmd + " " + ",".join(args) + "\"" + def doCommandDOUT(self, cmd, args): + self.DOUT = int(args[0]) + def doQueryDOUT(self, cmd, args): + self.write("%d" % self.DOUT) + def doCommandFILTER(self, cmd, args): + if len(args) > 0: + if int(args[0]) in self.FILTER: + keys = [int(args)] + else: + keys = self.FILTER.keys() + params = args[1:].join(",") + for key in keys: + self.FILTER[key] = params + def doQueryFILTER(self, cmd, args): + idx = int(args[0]) + if idx in self.FILTER: + self.write(self.FILTER[idx]) + else: + self.write("0") + def doCommandFREQ(self, cmd, args): + self.FREQ = int(args[0]) + def doQueryFREQ(self, cmd, args): + self.write("%d" % self.FREQ) + def doCommandGUARD(self, cmd, args): + self.GUARD = int(args[0]) + def doQueryGUARD(self, cmd, args): + self.write("%d" % self.GUARD) + def doQueryHTR(self, cmd, args): + self.write("%f" % self.HTR) + def doCommandHTRRNG(self, cmd, args): + self.HTRRNG = int(args[0]) + def doQueryHTRRNG(self, cmd, args): + self.write("%d" % self.HTRRNG) + def doQueryHTRST(self, cmd, args): + self.write("%d" % self.HTRST) + def doQueryIEEE(self, cmd, args): + self.write("%s" % self.IEEE) + def doCommandIEEE(self, cmd, args): + self.IEEE = args[0] + def doQueryINSET(self, cmd, args): + idx = int(args[0]) + if idx in self.INSET: + self.write(self.INSET[idx]) + else: + self.write("0") + def doCommandINSET(self, cmd, args): + if len(args) > 0: + idx = int(args[0]) + if idx in self.INSET: + params = ",".join(args[1:]) + self.INSET[idx] = params + def doQueryKEYST(self, cmd, args): + self.write("%d" % self.KEYST) + self.KEYST = 0 + def doQueryLDAT(self, cmd, args): + self.write("3.000E+02") # TODO + print "TODO implement Query: \"LDAT?\" in \"" + cmd + " " + ",".join(args) + "\"" + def doQueryLINEAR(self, cmd, args): + self.write("1,1.0,1,3") # TODO + print "TODO implement Query: \"LINEAR?\" in \"" + cmd + " " + ",".join(args) + "\"" + def doCommandLINEAR(self, cmd, args): + print "Unimplemented Command: \"LINEAR\" in \"" + cmd + " " + ",".join(args) + "\"" + def doQueryLOCK(self, cmd, args): + self.write("%s" % self.LOCK) + def doCommandLOCK(self, cmd, args): + self.LOCK = args[0] + def doQueryMDAT(self, cmd, args): + response = "0" + if len(args[0]) > 0: + idx = int(args[0]) + if idx in self.MDAT: + (minv, maxv, reset) = self.MDAT[idx].split(",") + response = "%f,%f" % (float(minv), float(maxv)) + self.write(response) + def doQueryMNMX(self, cmd, args): + self.write("1") # TODO + print "TODO implement Query: \"MNMX?\" in \"" + cmd + " " + ",".join(args) + "\"" + def doCommandMNMX(self, cmd, args): + print "Unimplemented Command: \"MNMX\" in \"" + cmd + " " + ",".join(args) + "\"" + def doCommandMNMXRST(self, cmd, args): + print "Unimplemented Command: \"MNMXRST\" in \"" + cmd + " " + ",".join(args) + "\"" + def doQueryMODE(self, cmd, args): + print "Unimplemented Query: \"MODE?\" in \"" + cmd + " " + ",".join(args) + "\"" + def doCommandMODE(self, cmd, args): + print "Unimplemented Command: \"MODE\" in \"" + cmd + " " + ",".join(args) + "\"" + def doQueryMONITOR(self, cmd, args): + print "Unimplemented Query: \"MONITOR?\" in \"" + cmd + " " + ",".join(args) + "\"" + def doCommandMONITOR(self, cmd, args): + print "Unimplemented Command: \"MONITOR\" in \"" + cmd + " " + ",".join(args) + "\"" + def doQueryMOUT(self, cmd, args): + print "Unimplemented Query: \"MOUT?\" in \"" + cmd + " " + ",".join(args) + "\"" + def doCommandMOUT(self, cmd, args): + print "Unimplemented Command: \"MOUT\" in \"" + cmd + " " + ",".join(args) + "\"" + def doQueryPID(self, cmd, args): + idx = int(args[0]) + self.write(self.PID[idx]) + def doCommandPID(self, cmd, args): + print "Unimplemented Command: \"PID\" in \"" + cmd + " " + ",".join(args) + "\"" + def doQueryRAMP(self, cmd, args): + idx = 1 + response = "%d,%f" % (self.RAMP_ON[idx], self.RAMP_RATE[idx]) + self.write(response) + def doCommandRAMP(self, cmd, args): + idx = 1 + ramp_on = int(args[0]) + if ramp_on == 0 or ramp_on == 1: + self.RAMP_ON[idx] = ramp_on + if ramp_on == 1: + ramp_rate = float(args[1]) + if ramp_rate >= 0.001 and ramp_rate <= 10.0: + self.RAMP_RATE[idx] = ramp_rate + def doQueryRAMPST(self, cmd, args): + response = "%d" % self.RAMP_ST + self.write(response) + def doQueryRDGK(self, cmd, args): + if len(args) == 0 or len(args[0]) == 0: + idx = 1 + else: + idx = int(args[0]) + if idx in self.KRDG: + self.RANDOM = random.uniform(-0.5, 0.5) + self.RANDOM = 0.5 + self.write("%f" % (self.KRDG[idx] + self.RANDOM)) + else: + self.write("+000.0") + def doQueryRDGPWR(self, cmd, args): + self.write("1.000E-15") # TODO + print "TODO implement Query: \"RDGPWR?\" in \"" + cmd + " " + ",".join(args) + "\"" + def doQueryRDGR(self, cmd, args): + self.write("1.000E+03") # TODO + print "TODO implement Query: \"RDGR?\" in \"" + cmd + " " + ",".join(args) + "\"" + def doQueryRDGRNG(self, cmd, args): + self.write("1,1,19,0,0") # TODO + print "TODO implement Query: \"RDGRNG?\" in \"" + cmd + " " + ",".join(args) + "\"" + def doCommandRDGRNG(self, cmd, args): + print "Unimplemented Command: \"RDGRNG\" in \"" + cmd + " " + ",".join(args) + "\"" + def doQueryRDGST(self, cmd, args): + self.write("000") + print "TODO implement Query: \"RDGST?\" in \"" + cmd + " " + ",".join(args) + "\"" + def doQueryRELAY(self, cmd, args): + print "Unimplemented Query: \"RELAY?\" in \"" + cmd + " " + ",".join(args) + "\"" + def doCommandRELAY(self, cmd, args): + print "Unimplemented Command: \"RELAY\" in \"" + cmd + " " + ",".join(args) + "\"" + def doQueryRELAYST(self, cmd, args): + print "Unimplemented Query: \"RELAYST?\" in \"" + cmd + " " + ",".join(args) + "\"" + def doQuerySCAN(self, cmd, args): + print "Unimplemented Query: \"SCAN?\" in \"" + cmd + " " + ",".join(args) + "\"" + def doCommandSCAN(self, cmd, args): + print "Unimplemented Command: \"SCAN\" in \"" + cmd + " " + ",".join(args) + "\"" + def doQuerySETP(self, cmd, args): + idx = 1 + val = self.SETP[idx] + self.write("%f" % val) + def doCommandSETP(self, cmd, args): + idx = 1 + val = float(args[0]) + if (val >= 0.0 and val <= 500.0): + self.TARGET[idx] = val + if self.RAMP_ON[idx] == 0: + self.SETP[idx] = val + self.RAMP_ST[idx] = 0 + else: + self.RAMP_START_TEMP[idx] = self.SETP[idx] + self.RAMP_START_TIME[idx] = time.time() + self.RAMP_ST[idx] = 1 + def doQuerySTILL(self, cmd, args): + print "Unimplemented Query: \"STILL?\" in \"" + cmd + " " + ",".join(args) + "\"" + def doCommandSTILL(self, cmd, args): + print "Unimplemented Command: \"STILL\" in \"" + cmd + " " + ",".join(args) + "\"" + def doQueryZONE(self, cmd, args): + print "Unimplemented Query: \"ZONE?\" in \"" + cmd + " " + ",".join(args) + "\"" + def doCommandZONE(self, cmd, args): + print "Unimplemented Command: \"ZONE\" in \"" + cmd + " " + ",".join(args) + "\"" + +if __name__ == '__main__': + from LakeshoreProtocol import LakeshoreProtocol + + class TestFactory: + def __init__(self): + print self.__class__.__name__, "ctor" + self.numProtocols = 0 + def write(self, data): + print "test write:", data, + def loseConnection(self): + print "test lose connection" + test_factory = TestFactory() + test_device = Lakeshore370() + test_protocol = LakeshoreProtocol(test_device, "\r\n") + test_protocol.factory = test_factory + test_protocol.transport = test_factory + test_device.protocol = test_protocol + test_device.protocol.connectionMade() + test_device.protocol.dataReceived("*IDN?") + test_device.protocol.dataReceived("*TST?") + test_device.protocol.connectionLost("Dunno") diff --git a/site_ansto/instrument/TEST_SICS/fakeTempControl/lakeshore/LakeshoreDevice.py b/site_ansto/instrument/TEST_SICS/fakeTempControl/lakeshore/LakeshoreDevice.py new file mode 100755 index 00000000..4e739c8d --- /dev/null +++ b/site_ansto/instrument/TEST_SICS/fakeTempControl/lakeshore/LakeshoreDevice.py @@ -0,0 +1,92 @@ +# vim: ts=8 sts=4 sw=4 expandtab +# +# Generic Lakeshore Temperature Controller Device +# +# Author: Douglas Clowes (2013) +# +import inspect +import traceback + +class LakeshoreDevice(object): + + def __init__(self): + print LakeshoreDevice.__name__, "ctor" + #print "Methods:", inspect.getmembers(self, inspect.ismethod) + methods = inspect.getmembers(self, inspect.ismethod) + self.myMethods = {} + for method in methods: + self.myMethods[method[0]] = method[1:] + #for method in sorted(self.myMethods): + # print "Method:", method, self.myMethods[method], type(method), type(self.myMethods[method]) + + def reset_powerup(self): + print LakeshoreDevice.__name__, "reset_powerup" + + def write(self, response): + print "Device Response: %s" % response + self.protocol.write(response) + + def doCommand(self, command, params): + print LakeshoreDevice.__name__, "Command:", command, params + method = "doCommand%s" % command + if method in self.myMethods: + action = "response = self.%s(command, params)" % method + print "Action:", action + exec action + if response: + return response + else: + print "Unimplemented Command:", command, params + return False + + def doQuery(self, command, params): + print LakeshoreDevice.__name__, "Query:", command, params + method = "doQuery%s" % command + if method in self.myMethods: + action = "response = self.%s(command, params)" % method + print "Action:", action + exec action + if response: + return response + else: + print "Unimplemented Query:", command, params + self.write("Unimplemented Query: %s" % command) + return False + + def doQueryTST(self, command, params): + self.write("0") + return True + + def doQueryIDN(self, command, params): + self.write(self.IDN) + return True + + def dataReceived(self, data): + print LakeshoreDevice.__name__, "PDU: \"" + data + "\"" + command = data.split()[0] + params = data[len(command):].strip().split(",") + if command[0] == "*": + command = command[1:] + try: + if command[-1] == '?': + self.doQuery(command[:-1], params) + else: + self.doCommand(command, params) + except: + traceback.print_exc() + +if __name__ == '__main__': + class TestProtocol: + def __init__(self): + print self.__class__.__name__, "ctor" + self.numProtocols = 0 + def write(self, data): + print "test write:", data + def loseConnection(self): + print "test lose connection" + test_protocol = TestProtocol() + test_device = LakeshoreDevice() + test_device.protocol = test_protocol + test_device.dataReceived("*IDN?") + test_device.dataReceived("*TST?") + test_device.dataReceived("RDGK?") diff --git a/site_ansto/instrument/TEST_SICS/fakeTempControl/lakeshore/LakeshoreFactory.py b/site_ansto/instrument/TEST_SICS/fakeTempControl/lakeshore/LakeshoreFactory.py new file mode 100644 index 00000000..89e60352 --- /dev/null +++ b/site_ansto/instrument/TEST_SICS/fakeTempControl/lakeshore/LakeshoreFactory.py @@ -0,0 +1,43 @@ +# vim: ts=8 sts=4 sw=4 expandtab +from twisted.internet.protocol import ServerFactory + +class LakeshoreFactory(ServerFactory): + """Factory object used by the Twisted Infrastructure to create a Protocol + object for incomming connections""" + + protocol = None + + def __init__(self, theProtocol, theDevice, theTerminator = "\r\n"): + print LakeshoreFactory.__name__, "ctor" + self.protocol = theProtocol + self.device = theDevice + self.term = theTerminator + self.numProtocols = 0 + + def buildProtocol(self, addr): + p = self.protocol(self.device, self.term) + p.factory = self + return p + +if __name__ == '__main__': + class TestProtocol: + def __init__(self, theDevice, theTerm = "\r\n"): + self.device = theDevice + self.response = "" + self.term = theTerm + + class TestDevice: + def __init__(self): + pass + + new_factory = LakeshoreFactory(TestProtocol, TestDevice, "\r\n"); + new_protocol = new_factory.buildProtocol("address") + print "Factory: ", new_factory + print " .protocol", new_factory.protocol + print " .device ", new_factory.device + print " .num_prot", new_factory.numProtocols + print " .term ", new_factory.term + print "Protocol: ", new_protocol + print " .device ", new_protocol.device + print " .response", new_protocol.response + print " .term ", new_protocol.term diff --git a/site_ansto/instrument/TEST_SICS/fakeTempControl/lakeshore/LakeshoreProtocol.py b/site_ansto/instrument/TEST_SICS/fakeTempControl/lakeshore/LakeshoreProtocol.py new file mode 100755 index 00000000..23ea9a44 --- /dev/null +++ b/site_ansto/instrument/TEST_SICS/fakeTempControl/lakeshore/LakeshoreProtocol.py @@ -0,0 +1,88 @@ +# vim: ts=8 sts=4 sw=4 expandtab +from twisted.internet.protocol import Protocol + +class LakeshoreProtocol(Protocol): + """Protocol object used by the Twisted Infrastructure to handle connections""" + + def __init__(self, theDevice, theTerminator = "\r\n"): + print LakeshoreProtocol.__name__, "ctor" + self.device = theDevice + self.response = "" + self.term = theTerminator + + def write(self, response): + self.response = self.response + response + + def connectionMade(self): + self.pdu = "" + self.response = "" + self.device.protocol = self + self.factory.numProtocols = self.factory.numProtocols + 1 + print "connectionMade:", self.factory.numProtocols + if self.factory.numProtocols > 2: + print "Too many connections - rejecting" + self.transport.write("Too many connections, try later" + self.term) + self.transport.loseConnection() + else: + self.transport.write(("Welcome connection %d" % self.factory.numProtocols) + self.term) + + def connectionLost(self, reason): + print "connectionLost:", self.factory.numProtocols, reason + self.factory.numProtocols = self.factory.numProtocols - 1 + + def lineReceived(self, data): + print "lineReceived - len:", len(data), data + self.device.protocol = self + self.device.dataReceived(data) + + def dataReceived(self, data): + print "dataReceived - len:", len(data), data + for c in data: + if c == "\r" or c == ";" or c == "\n": + if len(self.pdu) > 0: + self.lineReceived(self.pdu) + self.pdu = "" + if c == ";": + if len(self.response) > 0 and self.response[-1] != ";": + self.response = self.response + ";" + else: + if len(self.response) > 0: + if self.response[-1] == ";": + self.response = self.response[:-1] + if len(self.response) > 0: + print "Protocol Response: %s" % self.response + self.transport.write(self.response + self.term) + self.response = "" + else: + self.pdu = self.pdu + c + +if __name__ == '__main__': + class TestDevice: + def __init__(self): + print self.__class__.__name__, "ctor" + def dataReceived(self, pdu): + print "test device data received:", pdu + self.protocol.write("test device response") + class TestFactory: + def __init__(self): + self.numProtocols = 0 + def write(self, data): + print "test write:", data, + def loseConnection(self): + print "test lose connection" + self.protocol.connectionLost("Factory") + myTerm = "\r\n" + test_device = TestDevice() + test_factory = TestFactory() + test_protocol = LakeshoreProtocol(test_device, myTerm) + test_factory.protocol = test_protocol + test_protocol.factory = test_factory + test_protocol.transport = test_factory + test_protocol.connectionMade() + test_protocol.connectionMade() + test_protocol.connectionMade() + test_protocol.connectionLost("Dunno") + test_protocol.dataReceived("*IDN?" + myTerm + "*IDN?;*I") + test_protocol.dataReceived("DN") + test_protocol.dataReceived("?" + myTerm) + test_protocol.connectionLost("Dunno") diff --git a/site_ansto/instrument/TEST_SICS/fakeTempControl/lakeshore/displayscreen.py b/site_ansto/instrument/TEST_SICS/fakeTempControl/lakeshore/displayscreen.py new file mode 100644 index 00000000..137f93de --- /dev/null +++ b/site_ansto/instrument/TEST_SICS/fakeTempControl/lakeshore/displayscreen.py @@ -0,0 +1,150 @@ +# vim: ts=8 sts=4 sw=4 expandtab + +import curses + +class CursesStdIO: + """fake fd to be registered as a reader with the twisted reactor. + Curses classes needing input should extend this""" + + def fileno(self): + """ We want to select on FD 0 """ + return 0 + + def doRead(self): + """called when input is ready""" + + def logPrefix(self): return 'CursesClient' + +class Screen(CursesStdIO): + def __init__(self, stdscr): + self.timer = 0 + self.statusText = "TEST CURSES APP -" + self.searchText = '' + self.stdscr = stdscr + + # set screen attributes + self.stdscr.nodelay(1) # this is used to make input calls non-blocking + curses.cbreak() + self.stdscr.keypad(1) + curses.curs_set(0) # no annoying mouse cursor + + self.rows, self.cols = self.stdscr.getmaxyx() + self.lines = [] + + curses.start_color() + + # create color pair's 1 and 2 + curses.init_pair(1, curses.COLOR_BLACK, curses.COLOR_WHITE) + curses.init_pair(2, curses.COLOR_CYAN, curses.COLOR_BLACK) + + self.paintStatus(self.statusText) + self.stdscr.refresh() + + def connectionLost(self, reason): + self.close() + + def addLine(self, text): + """ add a line to the internal list of lines""" + + self.lines.append(text) + while len(self.lines) > 10: + del self.lines[0] + self.redisplayLines() + + def redisplayLines(self): + """ method for redisplaying lines + based on internal list of lines """ + + self.stdscr.clear() + self.paintStatus(self.statusText) + i = 0 + index = len(self.lines) - 1 + while i < (self.rows - 3) and index >= 0: + self.stdscr.addstr(self.rows - 3 - i, 0, self.lines[index], + curses.color_pair(2)) + i = i + 1 + index = index - 1 + self.stdscr.refresh() + + def paintStatus(self, text): + if len(text) > self.cols: raise TextTooLongError + self.stdscr.addstr(self.rows-2,0,text + ' ' * (self.cols-len(text)), + curses.color_pair(1)) + # move cursor to input line + self.stdscr.move(self.rows-1, self.cols-1) + + def doRead(self): + """ Input is ready! """ + curses.noecho() + self.timer = self.timer + 1 + c = self.stdscr.getch() # read a character + + if c == curses.KEY_BACKSPACE: + print "curses KEY_BACKSPACE" + self.searchText = self.searchText[:-1] + + elif c == curses.KEY_ENTER or c == 10: + self.addLine(self.searchText) + # for testing too + try: self.sendLine(self.searchText) + except: pass + self.stdscr.refresh() + self.searchText = '' + + elif c == curses.KEY_RESIZE: + print "curses KEY_RESIZE" + self.rows, self.cols = self.stdscr.getmaxyx() + self.redisplayLines() + self.paintStatus(self.statusText) + self.stdscr.refresh() + + elif c >= curses.KEY_MIN and c <= curses.KEY_MAX: + return + + else: + if len(self.searchText) >= self.cols-2: + return + self.searchText = self.searchText + chr(c) + + self.stdscr.addstr(self.rows-1, 0, + self.searchText + (' ' * ( + self.cols-len(self.searchText)-2))) + self.stdscr.move(self.rows-1, len(self.searchText)) + self.paintStatus(self.statusText + ' %d' % len(self.searchText)) + self.stdscr.refresh() + + def sendLine(self, txt): + pass + + def close(self): + """ clean up """ + + curses.nocbreak() + self.stdscr.keypad(0) + curses.echo() + curses.endwin() + +def device_display(): + global stdscr + try: + import datetime + now = datetime.datetime.now() + str = now.date().isoformat() + " " + now.time().isoformat() + stdscr.addstr(12, 25, "Time: %s" % str) + except: + pass + stdscr.refresh() + +if __name__ == '__main__': + global stdscr + from twisted.internet.task import LoopingCall + from twisted.internet import reactor + from twisted.python import log + + log.startLogging(open("/tmp/displayscreen.log", "w")) + stdscr = curses.initscr() + screen = Screen(stdscr) + lc = LoopingCall(device_display) + lc.start(0.250) + reactor.addReader(screen) # add screen object as a reader to the reactor + reactor.run()