implement fake lakeshore 336, 340 and 370 in Python with curses display

This commit is contained in:
Douglas Clowes
2013-06-21 14:27:37 +10:00
parent 3dce7feb8e
commit 2b1e34202d
10 changed files with 2285 additions and 0 deletions

View File

@@ -0,0 +1,124 @@
#!/usr/bin/env python
# vim: ts=8 sts=4 sw=4 expandtab
# Author: Douglas Clowes (dcl@ansto.gov.au) 2013-06-03
from twisted.internet.task import LoopingCall
from twisted.internet import reactor
from twisted.python import log, usage
from Lakeshore336 import Lakeshore336 as MYBASE
from LakeshoreFactory import LakeshoreFactory
from LakeshoreProtocol import LakeshoreProtocol
import sys
from displayscreen import Screen
class MyOptions(usage.Options):
optFlags = [
["window", "w", "Create a display window"],
]
optParameters = [
["logfile", "l", None, "output logfile name"],
["port", "p", None, "port number to listen on"],
]
def __init__(self):
usage.Options.__init__(self)
self['files'] = []
def parseArgs(self, *args):
for arg in args:
self['files'].append(arg)
class MyScreen(Screen):
def __init__(self, stdscr):
Screen.__init__(self, stdscr)
def sendLine(self, txt):
global myDev
myDev.protocol = self
myDev.dataReceived(txt)
def write(self, txt):
try:
newLine = self.lines[-1] + " => " + txt
del self.lines[-1]
self.addLine(newLine)
except:
pass
class MYDEV(MYBASE):
def __init__(self):
MYBASE.__init__(self)
print MYDEV.__name__, "ctor"
def device_display():
global stdscr, myDev, myOpts, myPort
try:
myDev.doIteration();
except:
raise
if not myOpts["window"]:
return
try:
stdscr.addstr(0, 25, "Identity : %s (%d)" % (myDev.IDN, myPort))
stdscr.addstr(1, 25, "Temperature: %8.3f" % myDev.KRDG[1])
stdscr.addstr(2, 25, "Setpoint : %8.3f" % myDev.SETP[1])
stdscr.addstr(3, 25, "Difference : %8.3f" % (myDev.KRDG[1] - myDev.SETP[1]))
stdscr.addstr(5, 25, "Target : %8.3f" % myDev.TARGET[1])
stdscr.addstr(6, 25, "Ramp Rate : %8.3f" % myDev.RAMP_RATE[1])
stdscr.addstr(7, 25, "Ramp On : %8s" % ["No", "Yes"][myDev.RAMP_ON[1]])
stdscr.addstr(8, 25, "Ramping : %8s" % ["No", "Yes"][myDev.RAMP_ST[1]])
stdscr.addstr(0, 0, "Random : %8.3f" % myDev.RANDOM)
stdscr.refresh()
except:
raise
if __name__ == "__main__":
global stdscr, myDev, myOpts, myPort
myOpts = MyOptions()
try:
myOpts.parseOptions()
except usage.UsageError, errortext:
print '%s: %s' % (sys.argv[0], errortext)
print '%s: Try --help for usage details.' % (sys.argv[0])
raise SystemExit, 1
myDev = MYDEV()
digits = myDev.IDN[10:13]
default_port = 7000 + int(digits)
myPort = default_port
logfile = None
if myOpts["port"]:
myPort = int(myOpts["port"])
if myPort < 1025 or myPort > 65535:
myPort = default_port
if myOpts["window"]:
logfile = "/tmp/Fake_LS%s_%d.log" % (digits, myPort)
if myOpts["logfile"]:
logfile = myOpts["logfile"]
if logfile:
log.startLogging(open(logfile, "w"))
else:
log.startLogging(sys.stdout)
#log.startLogging(sys.stderr)
if myOpts["window"]:
import curses
stdscr = curses.initscr()
screen = MyScreen(stdscr)
# add screen object as a reader to the reactor
reactor.addReader(screen)
myFactory = LakeshoreFactory(LakeshoreProtocol, myDev, "\r")
lc = LoopingCall(device_display)
lc.start(0.250)
reactor.listenTCP(myPort, myFactory) # server
reactor.run()

View File

@@ -0,0 +1,124 @@
#!/usr/bin/env python
# vim: ts=8 sts=4 sw=4 expandtab
# Author: Douglas Clowes (dcl@ansto.gov.au) 2013-06-03
from twisted.internet.task import LoopingCall
from twisted.internet import reactor
from twisted.python import log, usage
from Lakeshore340 import Lakeshore340 as MYBASE
from LakeshoreFactory import LakeshoreFactory
from LakeshoreProtocol import LakeshoreProtocol
import sys
from displayscreen import Screen
class MyOptions(usage.Options):
optFlags = [
["window", "w", "Create a display window"],
]
optParameters = [
["logfile", "l", None, "output logfile name"],
["port", "p", None, "port number to listen on"],
]
def __init__(self):
usage.Options.__init__(self)
self['files'] = []
def parseArgs(self, *args):
for arg in args:
self['files'].append(arg)
class MyScreen(Screen):
def __init__(self, stdscr):
Screen.__init__(self, stdscr)
def sendLine(self, txt):
global myDev
myDev.protocol = self
myDev.dataReceived(txt)
def write(self, txt):
try:
newLine = self.lines[-1] + " => " + txt
del self.lines[-1]
self.addLine(newLine)
except:
pass
class MYDEV(MYBASE):
def __init__(self):
MYBASE.__init__(self)
print MYDEV.__name__, "ctor"
def device_display():
global stdscr, myDev, myOpts, myPort
try:
myDev.doIteration();
except:
raise
if not myOpts["window"]:
return
try:
stdscr.addstr(0, 25, "Identity : %s (%d)" % (myDev.IDN, myPort))
stdscr.addstr(1, 25, "Temperature: %8.3f" % myDev.KRDG[1])
stdscr.addstr(2, 25, "Setpoint : %8.3f" % myDev.SETP[1])
stdscr.addstr(3, 25, "Difference : %8.3f" % (myDev.KRDG[1] - myDev.SETP[1]))
stdscr.addstr(5, 25, "Target : %8.3f" % myDev.TARGET[1])
stdscr.addstr(6, 25, "Ramp Rate : %8.3f" % myDev.RAMP_RATE[1])
stdscr.addstr(7, 25, "Ramp On : %8s" % ["No", "Yes"][myDev.RAMP_ON[1]])
stdscr.addstr(8, 25, "Ramping : %8s" % ["No", "Yes"][myDev.RAMP_ST[1]])
stdscr.addstr(0, 0, "Random : %8.3f" % myDev.RANDOM)
stdscr.refresh()
except:
raise
if __name__ == "__main__":
global stdscr, myDev, myOpts, myPort
myOpts = MyOptions()
try:
myOpts.parseOptions()
except usage.UsageError, errortext:
print '%s: %s' % (sys.argv[0], errortext)
print '%s: Try --help for usage details.' % (sys.argv[0])
raise SystemExit, 1
myDev = MYDEV()
digits = myDev.IDN[10:13]
default_port = 7000 + int(digits)
myPort = default_port
logfile = None
if myOpts["port"]:
myPort = int(myOpts["port"])
if myPort < 1025 or myPort > 65535:
myPort = default_port
if myOpts["window"]:
logfile = "/tmp/Fake_LS%s_%d.log" % (digits, myPort)
if myOpts["logfile"]:
logfile = myOpts["logfile"]
if logfile:
log.startLogging(open(logfile, "w"))
else:
log.startLogging(sys.stdout)
#log.startLogging(sys.stderr)
if myOpts["window"]:
import curses
stdscr = curses.initscr()
screen = MyScreen(stdscr)
# add screen object as a reader to the reactor
reactor.addReader(screen)
myFactory = LakeshoreFactory(LakeshoreProtocol, myDev, "\r")
lc = LoopingCall(device_display)
lc.start(0.250)
reactor.listenTCP(myPort, myFactory) # server
reactor.run()

View File

@@ -0,0 +1,124 @@
#!/usr/bin/env python
# vim: ts=8 sts=4 sw=4 expandtab
# Author: Douglas Clowes (dcl@ansto.gov.au) 2013-06-03
from twisted.internet.task import LoopingCall
from twisted.internet import reactor
from twisted.python import log, usage
from Lakeshore370 import Lakeshore370 as MYBASE
from LakeshoreFactory import LakeshoreFactory
from LakeshoreProtocol import LakeshoreProtocol
import sys
from displayscreen import Screen
class MyOptions(usage.Options):
optFlags = [
["window", "w", "Create a display window"],
]
optParameters = [
["logfile", "l", None, "output logfile name"],
["port", "p", None, "port number to listen on"],
]
def __init__(self):
usage.Options.__init__(self)
self['files'] = []
def parseArgs(self, *args):
for arg in args:
self['files'].append(arg)
class MyScreen(Screen):
def __init__(self, stdscr):
Screen.__init__(self, stdscr)
def sendLine(self, txt):
global myDev
myDev.protocol = self
myDev.dataReceived(txt)
def write(self, txt):
try:
newLine = self.lines[-1] + " => " + txt
del self.lines[-1]
self.addLine(newLine)
except:
pass
class MYDEV(MYBASE):
def __init__(self):
MYBASE.__init__(self)
print MYDEV.__name__, "ctor"
def device_display():
global stdscr, myDev, myOpts, myPort
try:
myDev.doIteration();
except:
raise
if not myOpts["window"]:
return
try:
stdscr.addstr(0, 25, "Identity : %s (%d)" % (myDev.IDN, myPort))
stdscr.addstr(1, 25, "Temperature: %8.3f" % myDev.KRDG[1])
stdscr.addstr(2, 25, "Setpoint : %8.3f" % myDev.SETP[1])
stdscr.addstr(3, 25, "Difference : %8.3f" % (myDev.KRDG[1] - myDev.SETP[1]))
stdscr.addstr(5, 25, "Target : %8.3f" % myDev.TARGET[1])
stdscr.addstr(6, 25, "Ramp Rate : %8.3f" % myDev.RAMP_RATE[1])
stdscr.addstr(7, 25, "Ramp On : %8s" % ["No", "Yes"][myDev.RAMP_ON[1]])
stdscr.addstr(8, 25, "Ramping : %8s" % ["No", "Yes"][myDev.RAMP_ST[1]])
stdscr.addstr(0, 0, "Random : %8.3f" % myDev.RANDOM)
stdscr.refresh()
except:
raise
if __name__ == "__main__":
global stdscr, myDev, myOpts, myPort
myOpts = MyOptions()
try:
myOpts.parseOptions()
except usage.UsageError, errortext:
print '%s: %s' % (sys.argv[0], errortext)
print '%s: Try --help for usage details.' % (sys.argv[0])
raise SystemExit, 1
myDev = MYDEV()
digits = myDev.IDN[10:13]
default_port = 7000 + int(digits)
myPort = default_port
logfile = None
if myOpts["port"]:
myPort = int(myOpts["port"])
if myPort < 1025 or myPort > 65535:
myPort = default_port
if myOpts["window"]:
logfile = "/tmp/Fake_LS%s_%d.log" % (digits, myPort)
if myOpts["logfile"]:
logfile = myOpts["logfile"]
if logfile:
log.startLogging(open(logfile, "w"))
else:
log.startLogging(sys.stdout)
#log.startLogging(sys.stderr)
if myOpts["window"]:
import curses
stdscr = curses.initscr()
screen = MyScreen(stdscr)
# add screen object as a reader to the reactor
reactor.addReader(screen)
myFactory = LakeshoreFactory(LakeshoreProtocol, myDev, "\r")
lc = LoopingCall(device_display)
lc.start(0.250)
reactor.listenTCP(myPort, myFactory) # server
reactor.run()

View File

@@ -0,0 +1,527 @@
# vim: ts=8 sts=4 sw=4 expandtab
# Fake Lakeshore Model 336 Temperature Controller
#
# Author: Douglas Clowes 2012, 2013
#
from LakeshoreDevice import LakeshoreDevice
import random
import re
import sys
import time
class Lakeshore336(LakeshoreDevice):
"""Lakeshore 336 temperature controller object - simulates the LS336"""
def __init__(self):
LakeshoreDevice.__init__(self)
print Lakeshore336.__name__, "ctor"
self.CONFIG_LOOPS = [1, 2, 3, 4]
self.CONFIG_SNSRS = [1, 2, 3, 4]
self.reset_powerup()
def doCommand(self, command, params):
print Lakeshore336.__name__, "Command:", command, params
return LakeshoreDevice.doCommand(self, command, params)
def doQuery(self, command, params):
print Lakeshore336.__name__, "Query:", command, params
return LakeshoreDevice.doQuery(self, command, params)
def reset_powerup(self):
print Lakeshore336.__name__, "reset_powerup"
self.LAST_ITERATION = 0
self.ALARM = {}
self.ALARMST = {}
for idx in self.CONFIG_SNSRS:
self.ALARM[idx] = "0"
self.ALARMST[idx] = "0"
self.ANALOG = {1: "0,1,1,1,400.0,0.0,0.0", 2: "0,1,1,1,400.0,0.0,0.0"}
self.AOUT = { 1: 0.0, 2: 0.0 }
self.CFILT = {1: 1, 2: 1}
self.CLIMI = 0.0
self.CLIMIT = {1: "400.0,10,0,0", 2: "400.0,10,0,0"}
self.CMODE = {}
for idx in self.CONFIG_LOOPS:
self.CMODE[idx] = 1
self.CRVHDR = {}
self.CRVHDR[1] = "DT-336-1 ,STANDARD ,1,+500.000,1"
self.CRVHDR[2] = "DT-336-2 ,STANDARD ,2,+0.500,1"
self.CRVHDR[3] = "DT-336-3 ,STANDARD ,3,+2.000,2"
self.CRVHDR[4] = "DT-336-4 ,STANDARD ,4,+0.301,2"
self.CRVPT = {}
for i in range(1,21):
self.CRVPT[i] = [(0.0, 0.0)]
self.CSET = {1: "A,1,1,0", 2: "B,1,1,0"}
self.DOUT = 0
self.FILTER = {1: "1,10,2"}
self.FREQ = 2
self.GUARD = 0
self.HTR = 0.0
self.HTRRNG = 0
self.HTRST = 0
self.IDN = "LSCI,MODEL336,123456/123456,1.0"
self.IEEE = "0,0,4"
self.INCRV = {}
self.INTYPE = {}
for idx in self.CONFIG_SNSRS:
self.INCRV[idx] = "1"
self.INTYPE[idx] = "1,0,1,0,1"
self.KEYST = 1
self.KRDG = {}
for idx in self.CONFIG_SNSRS:
self.KRDG[idx] = 300.0
self.LOCK = "0,000"
self.MDAT = {1: "0,0,1"} # Min,Max,Reset
self.MOUT = {1: "0.0", 2: "0.0", 3: "0.0", 4: "0.0"}
self.OUTMODE = {1: "1,1,0", 2: "1,2,0", 3: "1,3,0", 4: "1,4,0"}
self.PID = {1: "+0150.0,+0005.0,+000.0", 2: "+0150.0,+0005.0,+000.0"}
self.RAMP_ON = {1: 0, 2: 0}
self.RAMP_RATE = {1: 0.000, 2: 0.000}
self.RAMP_ST = {1: 0, 2:0}
self.RAMP_TIME = 0.0
self.RANGE = {}
for idx in range(1,5):
self.RANGE[idx] = "1"
self.RDGST = {"A": 0, "B": 0, "C": 0, "D": 0}
self.RELAY = {1: "1,A,0", 2: "2,A,0"}
self.RELAYST = {1: "0", 2: "0"}
self.SETP = {}
self.TARGET = {}
self.RAMP_START_TEMP = {}
self.RAMP_START_TIME = {}
for idx in self.CONFIG_LOOPS:
self.SETP[idx] = 300.0
self.TARGET[idx] = 300.0
self.RAMP_START_TEMP[idx] = 300.0
self.RAMP_START_TIME[idx] = 0.0
self.STB = 0
self.ESE = 0
self.ESR = 0
self.OPC = 0
self.SRE = 0
self.TST = 0
self.RANDOM = 0.5
def doIteration(self):
delta_time = time.time() - self.LAST_ITERATION
if delta_time < 1:
return
#print "DoIteration:", delta_time
self.LAST_ITERATION = time.time()
for idx in self.CONFIG_LOOPS:
# TODO - progress ramping setpoints (SP)
if idx in self.RAMP_ON and self.RAMP_ON[idx] and self.TARGET[idx] != self.SETP[idx]:
delta_time = time.time() - self.RAMP_START_TIME[idx];
delta_temp = self.RAMP_RATE[idx] * (delta_time / 60.0)
if self.TARGET[idx] > self.RAMP_START_TEMP[idx]:
self.SETP[idx] = self.RAMP_START_TEMP[idx] + delta_temp
if self.SETP[idx] >= self.TARGET[idx]:
self.SETP[idx] = self.TARGET[idx]
self.RAMP_ST[idx] = 0
else:
self.SETP[idx] = self.RAMP_START_TEMP[idx] - delta_temp
if self.SETP[idx] <= self.TARGET[idx]:
self.SETP[idx] = self.TARGET[idx]
self.RAMP_ST[idx] = 0
# TODO - iterate Power Level
if self.KRDG[idx] <> self.SETP[idx]:
self.HTR = self.SETP[idx] - self.KRDG[idx]
if self.HTR > 100.0:
self.HTR = 100.0
elif self.HTR < -100.0:
self.HTR = -100.0
# TODO - iterate Process Values (PV)
self.KRDG[idx] = (0.9 * self.KRDG[idx] + 0.1 * self.SETP[idx])
def doCommandCLS(self, cmd, args):
print "Unimplemented Command: \"*CLS\" in \"" + cmd + " " + ",".join(args) + "\""
def doCommandESE(self, cmd, args):
self.ESE = int(args[0]) & 0xFF
def doQueryESE(self, cmd, args):
self.write("%d" % self.ESE)
def doQueryESR(self, cmd, args):
self.write("%d" % self.ESR)
def doQueryIDN(self, cmd, args):
self.write(self.IDN)
def doCommandOPC(self, cmd, args):
self.OPC = 1
def doQueryOPC(self, cmd, args):
self.write("1")
def doCommandRST(self, cmd, args):
print "Unimplemented Command: \"*RST\" in \"" + cmd + " " + ",".join(args) + "\""
def doCommandSRE(self, cmd, args):
self.SRE = int(args[0]) & 0xFF
def doQuerySRE(self, cmd, args):
self.write("%d" % self.SRE)
def doQuerySTB(self, cmd, args):
self.write("%d" % self.STB)
def doQueryTST(self, cmd, args):
self.write("%d" % self.TST)
def doCommandWAI(self, cmd, args):
print "Unimplemented Command: \"*WAI\" in \"" + cmd + " " + ",".join(args) + "\""
def doCommandALARM(self, cmd, args):
if len(args) > 0:
idx = ord(args[0]) - 64
params = args[1:].join(",")
if idx in self.ALARM:
self.ALARM[idx] = params
def doQueryALARM(self, cmd, args):
if len(args) > 0:
idx = ord(args[0]) - 64
if idx in self.ALARM:
self.write(self.ALARM[idx])
else:
self.write("0,1,500.0,0.0,0")
def doQueryALARMST(self, cmd, args):
if len(args) > 0:
idx = ord(args[0]) - 64
if idx in self.ALARMST:
self.write(self.ALARMST[idx])
else:
self.write("0,0")
def doCommandALMRST(self, cmd, args):
for key in self.ALARMST.keys():
self.ALARMST[key] = "0,0"
def doCommandANALOG(self, cmd, args):
key = args[0]
if key < 1 or key > 2:
key = 1
self.ANALOG[key] = ",".join(args[1:])
print "TODO implement Command: \"ANALOG\" in \"" + cmd + " " + ",".join(args) + "\""
def doQueryANALOG(self, cmd, args):
key = args[0]
if key < 1 or key > 2:
key = 1
self.write(self.ANALOG[key]) # TODO
print "TODO implement Query: \"ANALOG?\" in \"" + cmd + " " + ",".join(args) + "\""
def doQueryAOUT(self, cmd, args):
key = args[0]
if key < 1 or key > 2:
key = 1
self.write("%6.3f" % self.AOUT[key]) # TODO
print "TODO implement Query: \"AOUT?\" in \"" + cmd + " " + ",".join(args) + "\""
def doCommandBEEP(self, cmd, args):
print "Unimplemented Command: \"BEEP\" in \"" + cmd + " " + ",".join(args) + "\""
def doQueryBEEP(self, cmd, args):
print "Unimplemented Query: \"BEEP?\" in \"" + cmd + " " + ",".join(args) + "\""
def doQueryBUSY(self, cmd, args):
self.write("0")
def doCommandCDISP(self, cmd, args):
print "Unimplemented Command: \"CDISP\" in \"" + cmd + " " + ",".join(args) + "\""
def doQueryCDISP(self, cmd, args):
print "Unimplemented Query: \"CDISP?\" in \"" + cmd + " " + ",".join(args) + "\""
def doCommandCFILT(self, cmd, args):
loop = int(args[0])
if loop < 1:
loop = 1
if loop > 2:
loop = 2
self.CFILT[loop] = int(args[1])
def doQueryCFILT(self, cmd, args):
loop = int(args[0])
if loop < 1:
loop = 1
if loop > 2:
loop = 2
self.write("%d" % self.CFILT[loop])
def doCommandCLIMI(self, cmd, args):
self.CLIMI = double(args[0])
def doQueryCLIMI(self, cmd, args):
self.write("%f" % self.CLIMI)
def doCommandCLIMIT(self, cmd, args):
loop = int(args[0])
if loop < 1:
loop = 1
if loop > 2:
loop = 2
self.CLIMIT[loop] = ",".join(args[1:])
def doQueryCLIMIT(self, cmd, args):
loop = int(args[0])
if loop < 1:
loop = 1
if loop > 2:
loop = 2
self.write("%s" % self.CLIMIT[loop])
def doCommandCMODE(self, cmd, args):
loop = int(args[0])
if loop in self.CMODE:
self.CMODE[loop] = int(args[1])
def doQueryCMODE(self, cmd, args):
loop = int(args[0])
if loop in self.CMODE:
self.write("%f" % self.CMODE[loop])
def doCommandCOMM(self, cmd, args):
self.COMM = ",".join(args)
def doQueryCOMM(self, cmd, args):
self.write("%f" % self.COMM)
def doQueryCRDG(self, cmd, args):
loop = ord(args[0]) - 64
if loop < 1:
loop = 1
if loop > 4:
loop = 4
self.write("%f" % (self.KRDG[loop] - 273.15 + self.RANDOM))
def doCommandCRVDEL(self, cmd, args):
print "Unimplemented Command: \"CRVDEL\" in \"" + cmd + " " + ",".join(args) + "\""
def doCommandCRVHDR(self, cmd, args):
print "Unimplemented Command: \"CRVHDR\" in \"" + cmd + " " + ",".join(args) + "\""
def doQueryCRVHDR(self, cmd, args):
key = int(args[0])
if key in self.CRVHDR:
self.write(self.CRVHDR[key])
else:
self.write("DT-336-1 ,STANDARD ,1,+500.000,1") # TODO
print "TODO implement Query: \"CRVHDR?\" in \"" + cmd + " " + ",".join(args) + "\""
def doCommandCRVPT(self, cmd, args):
key = int(args[0])
if key < 1 or key > 20:
key = 1
idx = int(args[1])
if idx < 1 or idx > 200:
idx = 1
# TODO set the Curve Point
print "TODO implement Command: \"CRVPT\" in \"" + cmd + " " + ",".join(args) + "\""
def doQueryCRVPT(self, cmd, args):
key = int(args[0])
if key < 1 or key > 20:
key = 1
idx = int(args[1])
if idx < 1 or idx > 200:
idx = 1
self.write("1.0E+01,1.0+E02") # TODO
print "TODO implement Query: \"CRVPT?\" in \"" + cmd + " " + ",".join(args) + "\""
def doCommandCRVSAV(self, cmd, args):
pass
def doCommandCSET(self, cmd, args):
loop = int(args[0])
if loop < 1:
loop = 1
if loop > 2:
loop = 2
self.CSET[loop] = ",".join(args[1:])
def doQueryCSET(self, cmd, args):
loop = int(args[0])
if loop < 1:
loop = 1
if loop > 2:
loop = 2
self.write("%s" % self.CSET[loop])
def doCommandDFLT(self, cmd, args):
if args[0] == "99":
print "Unimplemented Command: \"DFLT 99\" in \"" + cmd + " " + ",".join(args) + "\""
else:
print "Invalid Command: \"DFLT " + args[0] + "\" in \"" + cmd + " " + ",".join(args) + "\""
def doCommandDISPLAY(self, cmd, args):
print "Unimplemented Command: \"DISPLAY\" in \"" + cmd + " " + ",".join(args) + "\""
def doQueryDISPLAY(self, cmd, args):
print "Unimplemented Query: \"DISPLAY?\" in \"" + cmd + " " + ",".join(args) + "\""
def doCommandDISPLOC(self, cmd, args):
print "Unimplemented Command: \"DISPLOC\" in \"" + cmd + " " + ",".join(args) + "\""
def doQueryDISPLOC(self, cmd, args):
print "Unimplemented Query: \"DISPLOC?\" in \"" + cmd + " " + ",".join(args) + "\""
def doCommandDOUT(self, cmd, args):
self.DOUT = int(args[0])
def doQueryDOUT(self, cmd, args):
self.write("%d" % self.DOUT)
def doCommandFILTER(self, cmd, args):
if len(args) > 0:
if int(args[0]) in self.FILTER:
keys = [int(args)]
else:
keys = self.FILTER.keys()
params = args[1:].join(",")
for key in keys:
self.FILTER[key] = params
def doQueryFILTER(self, cmd, args):
idx = int(args[0])
if idx in self.FILTER:
self.write(self.FILTER[idx])
else:
self.write("0")
def doQueryHTR(self, cmd, args):
self.write("%f" % self.HTR)
def doQueryHTRST(self, cmd, args):
self.write("%d" % self.HTRST)
def doQueryIEEE(self, cmd, args):
self.write("%s" % self.IEEE)
def doCommandIEEE(self, cmd, args):
self.IEEE = args[0]
def doQueryINCRV(self, cmd, args):
idx = ord(args[0]) - 64
if idx in self.INCRV:
self.write(self.INCRV[idx])
else:
self.write("0")
def doCommandINCRV(self, cmd, args):
if len(args) > 1:
idx = ord(args[0]) - 64
if idx in self.INCRV:
for key in keys:
self.INCRV[key] = args[1]
def doQueryINTYPE(self, cmd, args):
idx = ord(args[0]) - 64
if idx in self.INTYPE:
self.write(self.INTYPE[idx])
else:
self.write("0")
def doCommandINTYPE(self, cmd, args):
if len(args) > 1:
idx = ord(args[0]) - 64
if idx in self.INTYPE:
for key in keys:
self.INTYPE[key] = ",".join(args[1:])
def doQueryKEYST(self, cmd, args):
self.write("%d" % self.KEYST)
self.KEYST = 0
def doQueryKRDG(self, cmd, args):
loop = ord(args[0]) - 64
if loop < 1:
loop = 1
if loop > 4:
loop = 4
self.write("%f" % (self.KRDG[loop] + self.RANDOM))
def doQueryLDAT(self, cmd, args):
self.write("3.000E+02") # TODO
print "TODO implement Query: \"LDAT?\" in \"" + cmd + " " + ",".join(args) + "\""
def doQueryLINEAR(self, cmd, args):
self.write("1,1.0,1,3") # TODO
print "TODO implement Query: \"LINEAR?\" in \"" + cmd + " " + ",".join(args) + "\""
def doCommandLINEAR(self, cmd, args):
print "Unimplemented Command: \"LINEAR\" in \"" + cmd + " " + ",".join(args) + "\""
def doQueryLOCK(self, cmd, args):
self.write("%s" % self.LOCK)
def doCommandLOCK(self, cmd, args):
self.LOCK = args[0]
def doQueryMDAT(self, cmd, args):
response = "0"
if len(args[0]) > 0:
idx = int(args[0])
if idx in self.MDAT:
(minv, maxv, reset) = self.MDAT[idx].split(",")
response = "%f,%f" % (float(minv), float(maxv))
self.write(response)
def doQueryMNMX(self, cmd, args):
self.write("1") # TODO
print "TODO implement Query: \"MNMX?\" in \"" + cmd + " " + ",".join(args) + "\""
def doCommandMNMX(self, cmd, args):
print "Unimplemented Command: \"MNMX\" in \"" + cmd + " " + ",".join(args) + "\""
def doCommandMNMXRST(self, cmd, args):
print "Unimplemented Command: \"MNMXRST\" in \"" + cmd + " " + ",".join(args) + "\""
def doQueryMODE(self, cmd, args):
print "Unimplemented Query: \"MODE?\" in \"" + cmd + " " + ",".join(args) + "\""
def doCommandMODE(self, cmd, args):
print "Unimplemented Command: \"MODE\" in \"" + cmd + " " + ",".join(args) + "\""
def doQueryMONITOR(self, cmd, args):
print "Unimplemented Query: \"MONITOR?\" in \"" + cmd + " " + ",".join(args) + "\""
def doCommandMONITOR(self, cmd, args):
print "Unimplemented Command: \"MONITOR\" in \"" + cmd + " " + ",".join(args) + "\""
def doQueryMOUT(self, cmd, args):
idx = int(args[0])
self.write(self.MOUT[idx])
def doCommandMOUT(self, cmd, args):
print "Unimplemented Command: \"MOUT\" in \"" + cmd + " " + ",".join(args) + "\""
def doQueryOUTMODE(self, cmd, args):
idx = int(args[0])
if idx in self.OUTMODE:
self.write(self.OUTMODE[idx])
else:
self.write(self.OUTMODE[1])
def doCommandOUTMODE(self, cmd, args):
print "Unimplemented Command: \"OUTMODE\" in \"" + cmd + " " + ",".join(args) + "\""
def doQueryPID(self, cmd, args):
idx = int(args[0])
self.write(self.PID[idx])
def doCommandPID(self, cmd, args):
print "Unimplemented Command: \"PID\" in \"" + cmd + " " + ",".join(args) + "\""
def doQueryRANGE(self, cmd, args):
idx = int(args[0])
self.write(self.RANGE[idx])
def doCommandRANGE(self, cmd, args):
idx = int(args[0])
val = int(args[1])
self.RANGE[idx] = val
def doQueryRAMP(self, cmd, args):
idx = int(args[0])
response = "%d,%f" % (self.RAMP_ON[idx], self.RAMP_RATE[idx])
self.write(response)
def doCommandRAMP(self, cmd, args):
idx = int(args[0])
ramp_on = int(args[1])
if ramp_on == 0 or ramp_on == 1:
self.RAMP_ON[idx] = ramp_on
if ramp_on == 1:
ramp_rate = float(args[2])
if ramp_rate >= 0.001 and ramp_rate <= 100.0:
self.RAMP_RATE[idx] = ramp_rate
def doQueryRAMPST(self, cmd, args):
idx = int(args[0])
response = "%d" % self.RAMP_ST[idx]
self.write(response)
def doQueryRDGPWR(self, cmd, args):
self.write("1.000E-15") # TODO
print "TODO implement Query: \"RDGPWR?\" in \"" + cmd + " " + ",".join(args) + "\""
def doQueryRDGR(self, cmd, args):
self.write("1.000E+03") # TODO
print "TODO implement Query: \"RDGR?\" in \"" + cmd + " " + ",".join(args) + "\""
def doQueryRDGRNG(self, cmd, args):
self.write("1,1,19,0,0") # TODO
print "TODO implement Query: \"RDGRNG?\" in \"" + cmd + " " + ",".join(args) + "\""
def doCommandRDGRNG(self, cmd, args):
print "Unimplemented Command: \"RDGRNG\" in \"" + cmd + " " + ",".join(args) + "\""
def doQueryRDGST(self, cmd, args):
self.write("000")
print "TODO implement Query: \"RDGST?\" in \"" + cmd + " " + ",".join(args) + "\""
def doQueryRELAY(self, cmd, args):
idx = int(args[0])
self.write(self.RELAY[idx])
def doCommandRELAY(self, cmd, args):
idx = int(args[0])
if idx in self.RELAY:
self.relay[idx] = ",".join(args[1:])
def doQueryRELAYST(self, cmd, args):
idx = int(args[0])
self.write(self.RELAYST[idx])
def doQuerySETP(self, cmd, args):
idx = int(args[0])
val = self.SETP[idx]
self.write("%f" % val)
def doCommandSETP(self, cmd, args):
idx = int(args[0])
val = float(args[1])
if (val >= 0.0 and val <= 500.0):
self.TARGET[idx] = val
if self.RAMP_ON[idx] == 0:
self.SETP[idx] = val
self.RAMP_ST[idx] = 0
else:
self.RAMP_START_TEMP[idx] = self.SETP[idx]
self.RAMP_START_TIME[idx] = time.time()
self.RAMP_ST[idx] = 1
def doQueryZONE(self, cmd, args):
print "Unimplemented Query: \"ZONE?\" in \"" + cmd + " " + ",".join(args) + "\""
def doCommandZONE(self, cmd, args):
print "Unimplemented Command: \"ZONE\" in \"" + cmd + " " + ",".join(args) + "\""
if __name__ == '__main__':
from LakeshoreProtocol import LakeshoreProtocol
class TestFactory:
def __init__(self):
print self.__class__.__name__, "ctor"
self.numProtocols = 0
def write(self, data):
print "test write:", data,
def loseConnection(self):
print "test lose connection"
test_factory = TestFactory()
test_device = Lakeshore336()
test_protocol = LakeshoreProtocol(test_device, "\r\n")
test_protocol.factory = test_factory
test_protocol.transport = test_factory
test_device.protocol = test_protocol
test_device.protocol.connectionMade()
test_device.protocol.dataReceived("*IDN?")
test_device.protocol.dataReceived("*TST?")
test_device.protocol.connectionLost("Dunno")

View File

@@ -0,0 +1,541 @@
# vim: ts=8 sts=4 sw=4 expandtab
# Fake Lakeshore Model 340 Temperature Controller
#
# Author: Douglas Clowes 2012, 2013
#
from LakeshoreDevice import LakeshoreDevice
import random
import re
import sys
import time
class Lakeshore340(LakeshoreDevice):
"""Lakeshore 340 temperature controller object - simulates the LS340"""
def __init__(self):
LakeshoreDevice.__init__(self)
print Lakeshore340.__name__, "ctor"
self.CONFIG_LOOPS = [1, 2]
self.CONFIG_SNSRS = [1, 2, 3, 4]
self.reset_powerup()
def doCommand(self, command, params):
print Lakeshore340.__name__, "Command:", command, params
return LakeshoreDevice.doCommand(self, command, params)
def doQuery(self, command, params):
print Lakeshore340.__name__, "Query:", command, params
return LakeshoreDevice.doQuery(self, command, params)
def reset_powerup(self):
print Lakeshore340.__name__, "reset_powerup"
self.LAST_ITERATION = 0
self.ALARM = {}
self.ALARMST = {}
for idx in self.CONFIG_SNSRS:
self.ALARM[idx] = "0"
self.ALARMST[idx] = "0"
self.ANALOG = {1: "0,1,1,1,400.0,0.0,0.0", 2: "0,1,1,1,400.0,0.0,0.0"}
self.AOUT = { 1: 0.0, 2: 0.0 }
self.CFILT = {1: 1, 2: 1}
self.CLIMI = 0.0
self.CLIMIT = {1: "400.0,10,0,0", 2: "400.0,10,0,0"}
self.CMODE = {}
for idx in self.CONFIG_LOOPS:
self.CMODE[idx] = 1
self.CRVHDR = {}
self.CRVHDR[1] = "DT-336-1 ,STANDARD ,1,+500.000,1"
self.CRVHDR[2] = "DT-336-2 ,STANDARD ,2,+0.500,1"
self.CRVHDR[3] = "DT-336-3 ,STANDARD ,3,+2.000,2"
self.CRVHDR[4] = "DT-336-4 ,STANDARD ,4,+0.301,2"
self.CRVPT = {}
for i in range(1,21):
self.CRVPT[i] = [(0.0, 0.0)]
self.CSET = {1: "A,1,1,0", 2: "B,1,1,0"}
self.DOUT = 0
self.FILTER = {1: "1,10,2"}
self.FREQ = 2
self.GUARD = 0
self.HTR = 0.0
self.HTRRNG = 0
self.HTRST = 0
self.IDN = "LSCI,MODEL340,123456,01022003"
self.IEEE = "0,0,4"
self.INCRV = {}
self.INSET = {}
self.INTYPE = {}
for idx in self.CONFIG_SNSRS:
self.INCRV[idx] = "1"
self.INSET[idx] = "1,0"
self.INTYPE[idx] = "1,0,1,0,1"
self.KEYST = 1
self.KRDG = {}
for idx in self.CONFIG_SNSRS:
self.KRDG[idx] = 300.0
self.LOCK = "0,000"
self.MDAT = {1: "0,0,1"} # Min,Max,Reset
self.MOUT = {1: "0.0", 2: "0.0", 3: "0.0", 4: "0.0"}
self.OUTMODE = {1: "1,1,0", 2: "1,2,0", 3: "1,3,0", 4: "1,4,0"}
self.PID = {1: "+0150.0,+0005.0,+000.0", 2: "+0150.0,+0005.0,+000.0"}
self.RAMP_ON = {1: 0, 2: 0}
self.RAMP_RATE = {1: 0.000, 2: 0.000}
self.RAMP_ST = {1: 0, 2:0}
self.RAMP_TIME = 0.0
self.RANGE = {}
for idx in range(1,5):
self.RANGE[idx] = "1"
self.RDGST = {"A": 0, "B": 0, "C": 0, "D": 0}
self.RELAY = {1: "1,A,0", 2: "2,A,0"}
self.RELAYST = {1: "0", 2: "0"}
self.SETTLE = "10.0,10"
self.SETP = {}
self.TARGET = {}
self.RAMP_START_TEMP = {}
self.RAMP_START_TIME = {}
for idx in self.CONFIG_LOOPS:
self.SETP[idx] = 300.0
self.TARGET[idx] = 300.0
self.RAMP_START_TEMP[idx] = 300.0
self.RAMP_START_TIME[idx] = 0.0
self.STB = 0
self.ESE = 0
self.ESR = 0
self.OPC = 0
self.SRE = 0
self.TST = 0
self.RANDOM = 0.5
def doIteration(self):
delta_time = time.time() - self.LAST_ITERATION
if delta_time < 1:
return
#print "DoIteration:", delta_time
self.LAST_ITERATION = time.time()
for idx in self.CONFIG_LOOPS:
# TODO - progress ramping setpoints (SP)
if idx in self.RAMP_ON and self.RAMP_ON[idx] and self.TARGET[idx] != self.SETP[idx]:
delta_time = time.time() - self.RAMP_START_TIME[idx];
delta_temp = self.RAMP_RATE[idx] * (delta_time / 60.0)
if self.TARGET[idx] > self.RAMP_START_TEMP[idx]:
self.SETP[idx] = self.RAMP_START_TEMP[idx] + delta_temp
if self.SETP[idx] >= self.TARGET[idx]:
self.SETP[idx] = self.TARGET[idx]
self.RAMP_ST[idx] = 0
else:
self.SETP[idx] = self.RAMP_START_TEMP[idx] - delta_temp
if self.SETP[idx] <= self.TARGET[idx]:
self.SETP[idx] = self.TARGET[idx]
self.RAMP_ST[idx] = 0
# TODO - iterate Power Level
if self.KRDG[idx] <> self.SETP[idx]:
self.HTR = self.SETP[idx] - self.KRDG[idx]
if self.HTR > 100.0:
self.HTR = 100.0
elif self.HTR < -100.0:
self.HTR = -100.0
# TODO - iterate Process Values (PV)
self.KRDG[idx] = (0.9 * self.KRDG[idx] + 0.1 * self.SETP[idx])
def doCommandCLS(self, cmd, args):
print "Unimplemented Command: \"*CLS\" in \"" + cmd + " " + ",".join(args) + "\""
def doCommandESE(self, cmd, args):
self.ESE = int(args[0]) & 0xFF
def doQueryESE(self, cmd, args):
self.write("%d" % self.ESE)
def doQueryESR(self, cmd, args):
self.write("%d" % self.ESR)
def doQueryIDN(self, cmd, args):
self.write(self.IDN)
def doCommandOPC(self, cmd, args):
self.OPC = 1
def doQueryOPC(self, cmd, args):
self.write("1")
def doCommandRST(self, cmd, args):
print "Unimplemented Command: \"*RST\" in \"" + cmd + " " + ",".join(args) + "\""
def doCommandSRE(self, cmd, args):
self.SRE = int(args[0]) & 0xFF
def doQuerySRE(self, cmd, args):
self.write("%d" % self.SRE)
def doQuerySTB(self, cmd, args):
self.write("%d" % self.STB)
def doQueryTST(self, cmd, args):
self.write("%d" % self.TST)
def doCommandWAI(self, cmd, args):
print "Unimplemented Command: \"*WAI\" in \"" + cmd + " " + ",".join(args) + "\""
def doCommandALARM(self, cmd, args):
if len(args) > 0:
idx = ord(args[0]) - 64
params = args[1:].join(",")
if idx in self.ALARM:
self.ALARM[idx] = params
def doQueryALARM(self, cmd, args):
if len(args) > 0:
idx = ord(args[0]) - 64
if idx in self.ALARM:
self.write(self.ALARM[idx])
else:
self.write("0,1,500.0,0.0,0")
def doQueryALARMST(self, cmd, args):
if len(args) > 0:
idx = ord(args[0]) - 64
if idx in self.ALARMST:
self.write(self.ALARMST[idx])
else:
self.write("0,0")
def doCommandALMRST(self, cmd, args):
for key in self.ALARMST.keys():
self.ALARMST[key] = "0,0"
def doCommandANALOG(self, cmd, args):
key = args[0]
if key < 1 or key > 2:
key = 1
self.ANALOG[key] = ",".join(args[1:])
print "TODO implement Command: \"ANALOG\" in \"" + cmd + " " + ",".join(args) + "\""
def doQueryANALOG(self, cmd, args):
key = args[0]
if key < 1 or key > 2:
key = 1
self.write(self.ANALOG[key]) # TODO
print "TODO implement Query: \"ANALOG?\" in \"" + cmd + " " + ",".join(args) + "\""
def doQueryAOUT(self, cmd, args):
key = args[0]
if key < 1 or key > 2:
key = 1
self.write("%6.3f" % self.AOUT[key]) # TODO
print "TODO implement Query: \"AOUT?\" in \"" + cmd + " " + ",".join(args) + "\""
def doCommandBEEP(self, cmd, args):
print "Unimplemented Command: \"BEEP\" in \"" + cmd + " " + ",".join(args) + "\""
def doQueryBEEP(self, cmd, args):
print "Unimplemented Query: \"BEEP?\" in \"" + cmd + " " + ",".join(args) + "\""
def doQueryBUSY(self, cmd, args):
self.write("0")
def doCommandCDISP(self, cmd, args):
print "Unimplemented Command: \"CDISP\" in \"" + cmd + " " + ",".join(args) + "\""
def doQueryCDISP(self, cmd, args):
print "Unimplemented Query: \"CDISP?\" in \"" + cmd + " " + ",".join(args) + "\""
def doCommandCFILT(self, cmd, args):
loop = int(args[0])
if loop < 1:
loop = 1
if loop > 2:
loop = 2
self.CFILT[loop] = int(args[1])
def doQueryCFILT(self, cmd, args):
loop = int(args[0])
if loop < 1:
loop = 1
if loop > 2:
loop = 2
self.write("%d" % self.CFILT[loop])
def doCommandCLIMI(self, cmd, args):
self.CLIMI = double(args[0])
def doQueryCLIMI(self, cmd, args):
self.write("%f" % self.CLIMI)
def doCommandCLIMIT(self, cmd, args):
loop = int(args[0])
if loop < 1:
loop = 1
if loop > 2:
loop = 2
self.CLIMIT[loop] = ",".join(args[1:])
def doQueryCLIMIT(self, cmd, args):
loop = int(args[0])
if loop < 1:
loop = 1
if loop > 2:
loop = 2
self.write("%s" % self.CLIMIT[loop])
def doCommandCMODE(self, cmd, args):
loop = int(args[0])
if loop in self.CMODE:
self.CMODE[loop] = int(args[1])
def doQueryCMODE(self, cmd, args):
loop = int(args[0])
if loop in self.CMODE:
self.write("%f" % self.CMODE[loop])
def doCommandCOMM(self, cmd, args):
self.COMM = ",".join(args)
def doQueryCOMM(self, cmd, args):
self.write("%f" % self.COMM)
def doQueryCRDG(self, cmd, args):
loop = ord(args[0]) - 64
if loop < 1:
loop = 1
if loop > 4:
loop = 4
self.write("%f" % (self.KRDG[loop] - 273.15 + self.RANDOM))
def doCommandCRVDEL(self, cmd, args):
print "Unimplemented Command: \"CRVDEL\" in \"" + cmd + " " + ",".join(args) + "\""
def doCommandCRVHDR(self, cmd, args):
print "Unimplemented Command: \"CRVHDR\" in \"" + cmd + " " + ",".join(args) + "\""
def doQueryCRVHDR(self, cmd, args):
key = int(args[0])
if key in self.CRVHDR:
self.write(self.CRVHDR[key])
else:
self.write("DT-336-1 ,STANDARD ,1,+500.000,1") # TODO
print "TODO implement Query: \"CRVHDR?\" in \"" + cmd + " " + ",".join(args) + "\""
def doCommandCRVPT(self, cmd, args):
key = int(args[0])
if key < 1 or key > 20:
key = 1
idx = int(args[1])
if idx < 1 or idx > 200:
idx = 1
# TODO set the Curve Point
print "TODO implement Command: \"CRVPT\" in \"" + cmd + " " + ",".join(args) + "\""
def doQueryCRVPT(self, cmd, args):
key = int(args[0])
if key < 1 or key > 20:
key = 1
idx = int(args[1])
if idx < 1 or idx > 200:
idx = 1
self.write("1.0E+01,1.0+E02") # TODO
print "TODO implement Query: \"CRVPT?\" in \"" + cmd + " " + ",".join(args) + "\""
def doCommandCRVSAV(self, cmd, args):
pass
def doCommandCSET(self, cmd, args):
loop = int(args[0])
if loop < 1:
loop = 1
if loop > 2:
loop = 2
self.CSET[loop] = ",".join(args[1:])
def doQueryCSET(self, cmd, args):
loop = int(args[0])
if loop < 1:
loop = 1
if loop > 2:
loop = 2
self.write("%s" % self.CSET[loop])
def doCommandDATETIME(self, cmd, args):
print "Unimplemented Command: \"DATETIME\" in \"" + cmd + " " + ",".join(args) + "\""
def doQueryDATETIME(self, cmd, args):
self.write("2,3,1996,15,30,0,0") # TODO
def doCommandDFLT(self, cmd, args):
if args[0] == "99":
print "Unimplemented Command: \"DFLT 99\" in \"" + cmd + " " + ",".join(args) + "\""
else:
print "Invalid Command: \"DFLT " + args[0] + "\" in \"" + cmd + " " + ",".join(args) + "\""
def doCommandDISPLAY(self, cmd, args):
print "Unimplemented Command: \"DISPLAY\" in \"" + cmd + " " + ",".join(args) + "\""
def doQueryDISPLAY(self, cmd, args):
print "Unimplemented Query: \"DISPLAY?\" in \"" + cmd + " " + ",".join(args) + "\""
def doCommandDISPLOC(self, cmd, args):
print "Unimplemented Command: \"DISPLOC\" in \"" + cmd + " " + ",".join(args) + "\""
def doQueryDISPLOC(self, cmd, args):
print "Unimplemented Query: \"DISPLOC?\" in \"" + cmd + " " + ",".join(args) + "\""
def doCommandDOUT(self, cmd, args):
self.DOUT = int(args[0])
def doQueryDOUT(self, cmd, args):
self.write("%d" % self.DOUT)
def doCommandFILTER(self, cmd, args):
if len(args) > 0:
if int(args[0]) in self.FILTER:
keys = [int(args)]
else:
keys = self.FILTER.keys()
params = args[1:].join(",")
for key in keys:
self.FILTER[key] = params
def doQueryFILTER(self, cmd, args):
idx = int(args[0])
if idx in self.FILTER:
self.write(self.FILTER[idx])
else:
self.write("0")
def doQueryHTR(self, cmd, args):
self.write("%f" % self.HTR)
def doQueryHTRST(self, cmd, args):
self.write("%d" % self.HTRST)
def doQueryIEEE(self, cmd, args):
self.write("%s" % self.IEEE)
def doCommandIEEE(self, cmd, args):
self.IEEE = args[0]
def doQueryINCRV(self, cmd, args):
idx = ord(args[0]) - 64
if idx in self.INCRV:
self.write(self.INCRV[idx])
else:
self.write("0")
def doCommandINCRV(self, cmd, args):
if len(args) > 1:
idx = ord(args[0]) - 64
if idx in self.INCRV:
for key in keys:
self.INCRV[key] = args[1]
def doQueryINTYPE(self, cmd, args):
idx = ord(args[0]) - 64
if idx in self.INTYPE:
self.write(self.INTYPE[idx])
else:
self.write("0")
def doQueryINSET(self, cmd, args):
idx = ord(args[0])
if idx in self.INSET:
self.write(self.INSET[idx])
else:
self.write("0,0")
def doCommandINSET(self, cmd, args):
if len(args) > 0:
idx = ord(args[0])
if idx in self.INSET:
self.INSET[idx] = ",".join(args[1:])
def doCommandINTYPE(self, cmd, args):
if len(args) > 1:
idx = ord(args[0]) - 64
if idx in self.INTYPE:
for key in keys:
self.INTYPE[key] = ",".join(args[1:])
def doQueryKEYST(self, cmd, args):
self.write("%d" % self.KEYST)
self.KEYST = 0
def doQueryKRDG(self, cmd, args):
loop = ord(args[0]) - 64
if loop < 1:
loop = 1
if loop > 4:
loop = 4
self.write("%f" % (self.KRDG[loop] + self.RANDOM))
def doQueryLDAT(self, cmd, args):
self.write("3.000E+02") # TODO
print "TODO implement Query: \"LDAT?\" in \"" + cmd + " " + ",".join(args) + "\""
def doQueryLINEAR(self, cmd, args):
self.write("1,1.0,1,3") # TODO
print "TODO implement Query: \"LINEAR?\" in \"" + cmd + " " + ",".join(args) + "\""
def doCommandLINEAR(self, cmd, args):
print "Unimplemented Command: \"LINEAR\" in \"" + cmd + " " + ",".join(args) + "\""
def doQueryLOCK(self, cmd, args):
self.write("%s" % self.LOCK)
def doCommandLOCK(self, cmd, args):
self.LOCK = args[0]
def doQueryMDAT(self, cmd, args):
response = "0"
if len(args[0]) > 0:
idx = int(args[0])
if idx in self.MDAT:
(minv, maxv, reset) = self.MDAT[idx].split(",")
response = "%f,%f" % (float(minv), float(maxv))
self.write(response)
def doQueryMNMX(self, cmd, args):
self.write("1") # TODO
print "TODO implement Query: \"MNMX?\" in \"" + cmd + " " + ",".join(args) + "\""
def doCommandMNMX(self, cmd, args):
print "Unimplemented Command: \"MNMX\" in \"" + cmd + " " + ",".join(args) + "\""
def doCommandMNMXRST(self, cmd, args):
print "Unimplemented Command: \"MNMXRST\" in \"" + cmd + " " + ",".join(args) + "\""
def doQueryMODE(self, cmd, args):
print "Unimplemented Query: \"MODE?\" in \"" + cmd + " " + ",".join(args) + "\""
def doCommandMODE(self, cmd, args):
print "Unimplemented Command: \"MODE\" in \"" + cmd + " " + ",".join(args) + "\""
def doQueryMONITOR(self, cmd, args):
print "Unimplemented Query: \"MONITOR?\" in \"" + cmd + " " + ",".join(args) + "\""
def doCommandMONITOR(self, cmd, args):
print "Unimplemented Command: \"MONITOR\" in \"" + cmd + " " + ",".join(args) + "\""
def doQueryMOUT(self, cmd, args):
idx = int(args[0])
self.write(self.MOUT[idx])
def doCommandMOUT(self, cmd, args):
print "Unimplemented Command: \"MOUT\" in \"" + cmd + " " + ",".join(args) + "\""
def doQueryPID(self, cmd, args):
idx = int(args[0])
self.write(self.PID[idx])
def doCommandPID(self, cmd, args):
print "Unimplemented Command: \"PID\" in \"" + cmd + " " + ",".join(args) + "\""
def doQueryRANGE(self, cmd, args):
idx = 1
self.write(self.RANGE[idx])
def doCommandRANGE(self, cmd, args):
idx = 1
val = int(args[0])
self.RANGE[idx] = val
def doQueryRAMP(self, cmd, args):
idx = int(args[0])
response = "%d,%f" % (self.RAMP_ON[idx], self.RAMP_RATE[idx])
self.write(response)
def doCommandRAMP(self, cmd, args):
idx = int(args[0])
ramp_on = int(args[1])
if ramp_on == 0 or ramp_on == 1:
self.RAMP_ON[idx] = ramp_on
if ramp_on == 1:
ramp_rate = float(args[2])
if ramp_rate >= 0.001 and ramp_rate <= 100.0:
self.RAMP_RATE[idx] = ramp_rate
def doQueryRAMPST(self, cmd, args):
idx = int(args[0])
response = "%d" % self.RAMP_ST[idx]
self.write(response)
def doQueryRDGPWR(self, cmd, args):
self.write("1.000E-15") # TODO
print "TODO implement Query: \"RDGPWR?\" in \"" + cmd + " " + ",".join(args) + "\""
def doQueryRDGR(self, cmd, args):
self.write("1.000E+03") # TODO
print "TODO implement Query: \"RDGR?\" in \"" + cmd + " " + ",".join(args) + "\""
def doQueryRDGRNG(self, cmd, args):
self.write("1,1,19,0,0") # TODO
print "TODO implement Query: \"RDGRNG?\" in \"" + cmd + " " + ",".join(args) + "\""
def doCommandRDGRNG(self, cmd, args):
print "Unimplemented Command: \"RDGRNG\" in \"" + cmd + " " + ",".join(args) + "\""
def doQueryRDGST(self, cmd, args):
self.write("000")
print "TODO implement Query: \"RDGST?\" in \"" + cmd + " " + ",".join(args) + "\""
def doQueryRELAY(self, cmd, args):
idx = int(args[0])
self.write(self.RELAY[idx])
def doCommandRELAY(self, cmd, args):
idx = int(args[0])
if idx in self.RELAY:
self.relay[idx] = ",".join(args[1:])
def doQueryRELAYST(self, cmd, args):
idx = int(args[0])
self.write(self.RELAYST[idx])
def doQuerySETP(self, cmd, args):
idx = int(args[0])
val = self.SETP[idx]
self.write("%f" % val)
def doCommandSETP(self, cmd, args):
idx = int(args[0])
val = float(args[1])
if (val >= 0.0 and val <= 500.0):
self.TARGET[idx] = val
if self.RAMP_ON[idx] == 0:
self.SETP[idx] = val
self.RAMP_ST[idx] = 0
else:
self.RAMP_START_TEMP[idx] = self.SETP[idx]
self.RAMP_START_TIME[idx] = time.time()
self.RAMP_ST[idx] = 1
def doQuerySETTLE(self, cmd, args):
self.write(self.SETTLE)
def doCommandSETTLE(self, cmd, args):
self.SETTLE = ",".join(args)
def doQueryZONE(self, cmd, args):
print "Unimplemented Query: \"ZONE?\" in \"" + cmd + " " + ",".join(args) + "\""
def doCommandZONE(self, cmd, args):
print "Unimplemented Command: \"ZONE\" in \"" + cmd + " " + ",".join(args) + "\""
if __name__ == '__main__':
from LakeshoreProtocol import LakeshoreProtocol
class TestFactory:
def __init__(self):
print self.__class__.__name__, "ctor"
self.numProtocols = 0
def write(self, data):
print "test write:", data,
def loseConnection(self):
print "test lose connection"
test_factory = TestFactory()
test_device = Lakeshore340()
test_protocol = LakeshoreProtocol(test_device, "\r\n")
test_protocol.factory = test_factory
test_protocol.transport = test_factory
test_device.protocol = test_protocol
test_device.protocol.connectionMade()
test_device.protocol.dataReceived("*IDN?")
test_device.protocol.dataReceived("*TST?")
test_device.protocol.connectionLost("Dunno")

View File

@@ -0,0 +1,472 @@
# vim: ts=8 sts=4 sw=4 expandtab
# Fake Lakeshore Model 370 Temperature Controller
#
# Author: Douglas Clowes 2012, 2013
#
from LakeshoreDevice import LakeshoreDevice
import random
import re
import sys
import time
class Lakeshore370(LakeshoreDevice):
"""Lakeshore 370 temperature controller object - simulates the LS370"""
def __init__(self):
LakeshoreDevice.__init__(self)
print Lakeshore370.__name__, "ctor"
self.CONFIG_LOOPS = [1]
self.CONFIG_SNSRS = [i for i in range(1,17)]
self.reset_powerup()
def doCommand(self, command, params):
print Lakeshore370.__name__, "Command:", command, params
return LakeshoreDevice.doCommand(self, command, params)
def doQuery(self, command, params):
print Lakeshore370.__name__, "Query:", command, params
return LakeshoreDevice.doQuery(self, command, params)
def reset_powerup(self):
print Lakeshore370.__name__, "reset_powerup"
self.LAST_ITERATION = 0
self.ALARM = {1: "0,1,500.0,0.0,0,0"}
self.ALARMST = {1: "0,0"}
self.ANALOG = {1: "0,1,1,1,400.0,0.0,0.0", 2: "0,1,1,1,400.0,0.0,0.0"}
self.AOUT = { 1: 0.0, 2: 0.0 }
self.CMODE = {}
for idx in self.CONFIG_LOOPS:
self.CMODE[idx] = 1
self.CMR = 0
self.CPOL = 0
self.CRVHDR = {}
self.CRVHDR[1] = "DT-336-1 ,STANDARD ,1,+500.000,1"
self.CRVHDR[2] = "DT-336-2 ,STANDARD ,2,+0.500,1"
self.CRVHDR[3] = "DT-336-3 ,STANDARD ,3,+2.000,2"
self.CRVHDR[4] = "DT-336-4 ,STANDARD ,4,+0.301,2"
self.CRVPT = {}
for i in range(1,21):
self.CRVPT[i] = [(0.0, 0.0)]
self.CSET = "1,0,1,30,1,8,100"
self.DOUT = 0
self.FILTER = {1: "1,10,2"}
self.FREQ = 2
self.GUARD = 0
self.HTR = 0.0
self.HTRRNG = 0
self.HTRST = 0
self.IDN = "LSCI,MODEL370,123456,00000000,1.5"
self.IEEE = "0,0,4"
self.INCRV = {}
self.INSET = {}
for idx in self.CONFIG_SNSRS:
self.INCRV[idx] = "1"
self.INSET[idx] = "1,10,3,0,2"
self.INTYPE = {"A": "1,0,1,0,1", "B": "1,0,1,0,1", "C": "1,0,1,0,1", "D": "1,0,1,0,1" }
self.KEYST = 1
self.KRDG = {}
for idx in self.CONFIG_SNSRS:
self.KRDG[idx] = 300.0
self.LOCK = "0,000"
self.MDAT = {1: "0,0,1"} # Min,Max,Reset
self.MOUT = {1: "0.0", 2: "0.0", 3: "0.0", 4: "0.0"}
self.OUTMODE = {1: "1,1,0", 2: "1,2,0", 3: "1,3,0", 4: "1,4,0"}
self.PID = {1: "+0150.0,+0005.0,+000.0", 2: "+0150.0,+0005.0,+000.0"}
self.RAMP_ON = {1: 0}
self.RAMP_RATE = {1: 0.000}
self.RAMP_ST = {1: 0}
self.RAMP_TIME = 0.0
self.RDGST = {"A": 0, "B": 0, "C": 0, "D": 0}
self.RELAY = {1: "1,A,0", 2: "2,A,0"}
self.RELAYST = {1: "0", 2: "0"}
self.SETP = {}
self.TARGET = {}
self.RAMP_START_TEMP = {}
self.RAMP_START_TIME = {}
for idx in self.CONFIG_LOOPS:
self.SETP[idx] = 300.0
self.TARGET[idx] = 300.0
self.RAMP_START_TEMP[idx] = 300.0
self.RAMP_START_TIME[idx] = 0.0
self.STB = 0
self.ESE = 0
self.ESR = 0
self.OPC = 0
self.SRE = 0
self.TST = 0
self.RANDOM = 0.5
def doIteration(self):
delta_time = time.time() - self.LAST_ITERATION
if delta_time < 1:
return
#print "DoIteration:", delta_time
self.LAST_ITERATION = time.time()
for idx in self.CONFIG_LOOPS:
# TODO - progress ramping setpoints (SP)
if idx in self.RAMP_ON and self.RAMP_ON[idx] and self.TARGET[idx] != self.SETP[idx]:
delta_time = time.time() - self.RAMP_START_TIME[idx];
delta_temp = self.RAMP_RATE[idx] * (delta_time / 60.0)
if self.TARGET[idx] > self.RAMP_START_TEMP[idx]:
self.SETP[idx] = self.RAMP_START_TEMP[idx] + delta_temp
if self.SETP[idx] >= self.TARGET[idx]:
self.SETP[idx] = self.TARGET[idx]
self.RAMP_ST[idx] = 0
else:
self.SETP[idx] = self.RAMP_START_TEMP[idx] - delta_temp
if self.SETP[idx] <= self.TARGET[idx]:
self.SETP[idx] = self.TARGET[idx]
self.RAMP_ST[idx] = 0
# TODO - iterate Power Level
if self.KRDG[idx] <> self.SETP[idx]:
self.HTR = self.SETP[idx] - self.KRDG[idx]
if self.HTR > 100.0:
self.HTR = 100.0
elif self.HTR < -100.0:
self.HTR = -100.0
# TODO - iterate Process Values (PV)
self.KRDG[idx] = (0.9 * self.KRDG[idx] + 0.1 * self.SETP[idx])
def doCommandCLS(self, cmd, args):
print "Unimplemented Command: \"*CLS\" in \"" + cmd + " " + ",".join(args) + "\""
def doCommandESE(self, cmd, args):
self.ESE = int(args[0]) & 0xFF
def doQueryESE(self, cmd, args):
self.write("%d" % self.ESE)
def doQueryESR(self, cmd, args):
self.write("%d" % self.ESR)
def doQueryIDN(self, cmd, args):
self.write(self.IDN)
def doCommandOPC(self, cmd, args):
self.OPC = 1
def doQueryOPC(self, cmd, args):
self.write("1")
def doCommandRST(self, cmd, args):
print "Unimplemented Command: \"*RST\" in \"" + cmd + " " + ",".join(args) + "\""
def doCommandSRE(self, cmd, args):
self.SRE = int(args[0]) & 0xFF
def doQuerySRE(self, cmd, args):
self.write("%d" % self.SRE)
def doQuerySTB(self, cmd, args):
self.write("%d" % self.STB)
def doQueryTST(self, cmd, args):
self.write("%d" % self.TST)
def doCommandWAI(self, cmd, args):
print "Unimplemented Command: \"*WAI\" in \"" + cmd + " " + ",".join(args) + "\""
def doCommandALARM(self, cmd, args):
if len(args) > 0:
if int(args[0]) in self.ALARM:
keys = [int(args)]
else:
keys = self.ALARM.keys()
params = args[1:].join(",")
for key in keys:
self.ALARM[key] = params
def doQueryALARM(self, cmd, args):
idx = int(args[0])
if idx in self.ALARM:
self.write(self.ALARM[idx])
else:
self.write("0,1,500.0,0.0,0")
def doQueryALARMST(self, cmd, args):
idx = int(args[0])
if idx in self.ALARMST:
self.write(self.ALARMST[idx])
else:
self.write("0,0")
def doCommandALMRST(self, cmd, args):
for key in self.ALARMST.keys():
self.ALARMST[key] = "0,0"
def doCommandANALOG(self, cmd, args):
key = args[0]
if key < 1 or key > 2:
key = 1
self.ANALOG[key] = ",".join(args[1:])
print "TODO implement Command: \"ANALOG\" in \"" + cmd + " " + ",".join(args) + "\""
def doQueryANALOG(self, cmd, args):
key = args[0]
if key < 1 or key > 2:
key = 1
self.write(self.ANALOG[key]) # TODO
print "TODO implement Query: \"ANALOG?\" in \"" + cmd + " " + ",".join(args) + "\""
def doQueryAOUT(self, cmd, args):
key = args[0]
if key < 1 or key > 2:
key = 1
self.write("%6.3f" % self.AOUT[key]) # TODO
print "TODO implement Query: \"AOUT?\" in \"" + cmd + " " + ",".join(args) + "\""
def doCommandBAUD(self, cmd, args):
print "Unimplemented Command: \"BAUD\" in \"" + cmd + " " + ",".join(args) + "\""
def doQueryBAUD(self, cmd, args):
print "Unimplemented Query: \"BAUD?\" in \"" + cmd + " " + ",".join(args) + "\""
def doCommandBEEP(self, cmd, args):
print "Unimplemented Command: \"BEEP\" in \"" + cmd + " " + ",".join(args) + "\""
def doQueryBEEP(self, cmd, args):
print "Unimplemented Query: \"BEEP?\" in \"" + cmd + " " + ",".join(args) + "\""
def doCommandBRIGT(self, cmd, args):
print "Unimplemented Command: \"BRIGT\" in \"" + cmd + " " + ",".join(args) + "\""
def doQueryBRIGT(self, cmd, args):
print "Unimplemented Query: \"BRIGT?\" in \"" + cmd + " " + ",".join(args) + "\""
def doCommandCHGALL(self, cmd, args):
print "Unimplemented Command: \"CHGALL\" in \"" + cmd + " " + ",".join(args) + "\""
def doQueryCHGALL(self, cmd, args):
print "Unimplemented Query: \"CHGALL?\" in \"" + cmd + " " + ",".join(args) + "\""
def doCommandCMODE(self, cmd, args):
loop = 1
if loop in self.CMODE:
self.CMODE[loop] = int(args[0])
def doQueryCMODE(self, cmd, args):
loop = 1
if loop in self.CMODE:
self.write("%f" % self.CMODE[loop])
def doCommandCMR(self, cmd, args):
self.CMR = int(args[0])
def doQueryCMR(self, cmd, args):
self.write("%d" % self.CMR)
def doCommandCPOL(self, cmd, args):
self.CPOL = int(args[0])
def doQueryCPOL(self, cmd, args):
self.write("%d" % self.CPOL)
def doCommandCRVDEL(self, cmd, args):
print "Unimplemented Command: \"CRVDEL\" in \"" + cmd + " " + ",".join(args) + "\""
def doCommandCRVHDR(self, cmd, args):
print "Unimplemented Command: \"CRVHDR\" in \"" + cmd + " " + ",".join(args) + "\""
def doQueryCRVHDR(self, cmd, args):
key = int(args[0])
if key in self.CRVHDR:
self.write(self.CRVHDR[key])
else:
self.write("DT-336-1 ,STANDARD ,1,+500.000,1") # TODO
print "TODO implement Query: \"CRVHDR?\" in \"" + cmd + " " + ",".join(args) + "\""
def doCommandCRVPT(self, cmd, args):
key = int(args[0])
if key < 1 or key > 20:
key = 1
idx = int(args[1])
if idx < 1 or idx > 200:
idx = 1
# TODO set the Curve Point
print "TODO implement Command: \"CRVPT\" in \"" + cmd + " " + ",".join(args) + "\""
def doQueryCRVPT(self, cmd, args):
key = int(args[0])
if key < 1 or key > 20:
key = 1
idx = int(args[1])
if idx < 1 or idx > 200:
idx = 1
self.write("1.0E+01,1.0+E02") # TODO
print "TODO implement Query: \"CRVPT?\" in \"" + cmd + " " + ",".join(args) + "\""
def doCommandCSET(self, cmd, args):
self.CSET = args[0]
def doQueryCSET(self, cmd, args):
self.write("%s" % self.CSET)
def doCommandDFLT(self, cmd, args):
if args[0] == "99":
print "Unimplemented Command: \"DFLT 99\" in \"" + cmd + " " + ",".join(args) + "\""
else:
print "Invalid Command: \"DFLT " + args[0] + "\" in \"" + cmd + " " + ",".join(args) + "\""
def doCommandDISPLAY(self, cmd, args):
print "Unimplemented Command: \"DISPLAY\" in \"" + cmd + " " + ",".join(args) + "\""
def doQueryDISPLAY(self, cmd, args):
print "Unimplemented Query: \"DISPLAY?\" in \"" + cmd + " " + ",".join(args) + "\""
def doCommandDISPLOC(self, cmd, args):
print "Unimplemented Command: \"DISPLOC\" in \"" + cmd + " " + ",".join(args) + "\""
def doQueryDISPLOC(self, cmd, args):
print "Unimplemented Query: \"DISPLOC?\" in \"" + cmd + " " + ",".join(args) + "\""
def doCommandDOUT(self, cmd, args):
self.DOUT = int(args[0])
def doQueryDOUT(self, cmd, args):
self.write("%d" % self.DOUT)
def doCommandFILTER(self, cmd, args):
if len(args) > 0:
if int(args[0]) in self.FILTER:
keys = [int(args)]
else:
keys = self.FILTER.keys()
params = args[1:].join(",")
for key in keys:
self.FILTER[key] = params
def doQueryFILTER(self, cmd, args):
idx = int(args[0])
if idx in self.FILTER:
self.write(self.FILTER[idx])
else:
self.write("0")
def doCommandFREQ(self, cmd, args):
self.FREQ = int(args[0])
def doQueryFREQ(self, cmd, args):
self.write("%d" % self.FREQ)
def doCommandGUARD(self, cmd, args):
self.GUARD = int(args[0])
def doQueryGUARD(self, cmd, args):
self.write("%d" % self.GUARD)
def doQueryHTR(self, cmd, args):
self.write("%f" % self.HTR)
def doCommandHTRRNG(self, cmd, args):
self.HTRRNG = int(args[0])
def doQueryHTRRNG(self, cmd, args):
self.write("%d" % self.HTRRNG)
def doQueryHTRST(self, cmd, args):
self.write("%d" % self.HTRST)
def doQueryIEEE(self, cmd, args):
self.write("%s" % self.IEEE)
def doCommandIEEE(self, cmd, args):
self.IEEE = args[0]
def doQueryINSET(self, cmd, args):
idx = int(args[0])
if idx in self.INSET:
self.write(self.INSET[idx])
else:
self.write("0")
def doCommandINSET(self, cmd, args):
if len(args) > 0:
idx = int(args[0])
if idx in self.INSET:
params = ",".join(args[1:])
self.INSET[idx] = params
def doQueryKEYST(self, cmd, args):
self.write("%d" % self.KEYST)
self.KEYST = 0
def doQueryLDAT(self, cmd, args):
self.write("3.000E+02") # TODO
print "TODO implement Query: \"LDAT?\" in \"" + cmd + " " + ",".join(args) + "\""
def doQueryLINEAR(self, cmd, args):
self.write("1,1.0,1,3") # TODO
print "TODO implement Query: \"LINEAR?\" in \"" + cmd + " " + ",".join(args) + "\""
def doCommandLINEAR(self, cmd, args):
print "Unimplemented Command: \"LINEAR\" in \"" + cmd + " " + ",".join(args) + "\""
def doQueryLOCK(self, cmd, args):
self.write("%s" % self.LOCK)
def doCommandLOCK(self, cmd, args):
self.LOCK = args[0]
def doQueryMDAT(self, cmd, args):
response = "0"
if len(args[0]) > 0:
idx = int(args[0])
if idx in self.MDAT:
(minv, maxv, reset) = self.MDAT[idx].split(",")
response = "%f,%f" % (float(minv), float(maxv))
self.write(response)
def doQueryMNMX(self, cmd, args):
self.write("1") # TODO
print "TODO implement Query: \"MNMX?\" in \"" + cmd + " " + ",".join(args) + "\""
def doCommandMNMX(self, cmd, args):
print "Unimplemented Command: \"MNMX\" in \"" + cmd + " " + ",".join(args) + "\""
def doCommandMNMXRST(self, cmd, args):
print "Unimplemented Command: \"MNMXRST\" in \"" + cmd + " " + ",".join(args) + "\""
def doQueryMODE(self, cmd, args):
print "Unimplemented Query: \"MODE?\" in \"" + cmd + " " + ",".join(args) + "\""
def doCommandMODE(self, cmd, args):
print "Unimplemented Command: \"MODE\" in \"" + cmd + " " + ",".join(args) + "\""
def doQueryMONITOR(self, cmd, args):
print "Unimplemented Query: \"MONITOR?\" in \"" + cmd + " " + ",".join(args) + "\""
def doCommandMONITOR(self, cmd, args):
print "Unimplemented Command: \"MONITOR\" in \"" + cmd + " " + ",".join(args) + "\""
def doQueryMOUT(self, cmd, args):
print "Unimplemented Query: \"MOUT?\" in \"" + cmd + " " + ",".join(args) + "\""
def doCommandMOUT(self, cmd, args):
print "Unimplemented Command: \"MOUT\" in \"" + cmd + " " + ",".join(args) + "\""
def doQueryPID(self, cmd, args):
idx = int(args[0])
self.write(self.PID[idx])
def doCommandPID(self, cmd, args):
print "Unimplemented Command: \"PID\" in \"" + cmd + " " + ",".join(args) + "\""
def doQueryRAMP(self, cmd, args):
idx = 1
response = "%d,%f" % (self.RAMP_ON[idx], self.RAMP_RATE[idx])
self.write(response)
def doCommandRAMP(self, cmd, args):
idx = 1
ramp_on = int(args[0])
if ramp_on == 0 or ramp_on == 1:
self.RAMP_ON[idx] = ramp_on
if ramp_on == 1:
ramp_rate = float(args[1])
if ramp_rate >= 0.001 and ramp_rate <= 10.0:
self.RAMP_RATE[idx] = ramp_rate
def doQueryRAMPST(self, cmd, args):
response = "%d" % self.RAMP_ST
self.write(response)
def doQueryRDGK(self, cmd, args):
if len(args) == 0 or len(args[0]) == 0:
idx = 1
else:
idx = int(args[0])
if idx in self.KRDG:
self.RANDOM = random.uniform(-0.5, 0.5)
self.RANDOM = 0.5
self.write("%f" % (self.KRDG[idx] + self.RANDOM))
else:
self.write("+000.0")
def doQueryRDGPWR(self, cmd, args):
self.write("1.000E-15") # TODO
print "TODO implement Query: \"RDGPWR?\" in \"" + cmd + " " + ",".join(args) + "\""
def doQueryRDGR(self, cmd, args):
self.write("1.000E+03") # TODO
print "TODO implement Query: \"RDGR?\" in \"" + cmd + " " + ",".join(args) + "\""
def doQueryRDGRNG(self, cmd, args):
self.write("1,1,19,0,0") # TODO
print "TODO implement Query: \"RDGRNG?\" in \"" + cmd + " " + ",".join(args) + "\""
def doCommandRDGRNG(self, cmd, args):
print "Unimplemented Command: \"RDGRNG\" in \"" + cmd + " " + ",".join(args) + "\""
def doQueryRDGST(self, cmd, args):
self.write("000")
print "TODO implement Query: \"RDGST?\" in \"" + cmd + " " + ",".join(args) + "\""
def doQueryRELAY(self, cmd, args):
print "Unimplemented Query: \"RELAY?\" in \"" + cmd + " " + ",".join(args) + "\""
def doCommandRELAY(self, cmd, args):
print "Unimplemented Command: \"RELAY\" in \"" + cmd + " " + ",".join(args) + "\""
def doQueryRELAYST(self, cmd, args):
print "Unimplemented Query: \"RELAYST?\" in \"" + cmd + " " + ",".join(args) + "\""
def doQuerySCAN(self, cmd, args):
print "Unimplemented Query: \"SCAN?\" in \"" + cmd + " " + ",".join(args) + "\""
def doCommandSCAN(self, cmd, args):
print "Unimplemented Command: \"SCAN\" in \"" + cmd + " " + ",".join(args) + "\""
def doQuerySETP(self, cmd, args):
idx = 1
val = self.SETP[idx]
self.write("%f" % val)
def doCommandSETP(self, cmd, args):
idx = 1
val = float(args[0])
if (val >= 0.0 and val <= 500.0):
self.TARGET[idx] = val
if self.RAMP_ON[idx] == 0:
self.SETP[idx] = val
self.RAMP_ST[idx] = 0
else:
self.RAMP_START_TEMP[idx] = self.SETP[idx]
self.RAMP_START_TIME[idx] = time.time()
self.RAMP_ST[idx] = 1
def doQuerySTILL(self, cmd, args):
print "Unimplemented Query: \"STILL?\" in \"" + cmd + " " + ",".join(args) + "\""
def doCommandSTILL(self, cmd, args):
print "Unimplemented Command: \"STILL\" in \"" + cmd + " " + ",".join(args) + "\""
def doQueryZONE(self, cmd, args):
print "Unimplemented Query: \"ZONE?\" in \"" + cmd + " " + ",".join(args) + "\""
def doCommandZONE(self, cmd, args):
print "Unimplemented Command: \"ZONE\" in \"" + cmd + " " + ",".join(args) + "\""
if __name__ == '__main__':
from LakeshoreProtocol import LakeshoreProtocol
class TestFactory:
def __init__(self):
print self.__class__.__name__, "ctor"
self.numProtocols = 0
def write(self, data):
print "test write:", data,
def loseConnection(self):
print "test lose connection"
test_factory = TestFactory()
test_device = Lakeshore370()
test_protocol = LakeshoreProtocol(test_device, "\r\n")
test_protocol.factory = test_factory
test_protocol.transport = test_factory
test_device.protocol = test_protocol
test_device.protocol.connectionMade()
test_device.protocol.dataReceived("*IDN?")
test_device.protocol.dataReceived("*TST?")
test_device.protocol.connectionLost("Dunno")

View File

@@ -0,0 +1,92 @@
# vim: ts=8 sts=4 sw=4 expandtab
#
# Generic Lakeshore Temperature Controller Device
#
# Author: Douglas Clowes (2013)
#
import inspect
import traceback
class LakeshoreDevice(object):
def __init__(self):
print LakeshoreDevice.__name__, "ctor"
#print "Methods:", inspect.getmembers(self, inspect.ismethod)
methods = inspect.getmembers(self, inspect.ismethod)
self.myMethods = {}
for method in methods:
self.myMethods[method[0]] = method[1:]
#for method in sorted(self.myMethods):
# print "Method:", method, self.myMethods[method], type(method), type(self.myMethods[method])
def reset_powerup(self):
print LakeshoreDevice.__name__, "reset_powerup"
def write(self, response):
print "Device Response: %s" % response
self.protocol.write(response)
def doCommand(self, command, params):
print LakeshoreDevice.__name__, "Command:", command, params
method = "doCommand%s" % command
if method in self.myMethods:
action = "response = self.%s(command, params)" % method
print "Action:", action
exec action
if response:
return response
else:
print "Unimplemented Command:", command, params
return False
def doQuery(self, command, params):
print LakeshoreDevice.__name__, "Query:", command, params
method = "doQuery%s" % command
if method in self.myMethods:
action = "response = self.%s(command, params)" % method
print "Action:", action
exec action
if response:
return response
else:
print "Unimplemented Query:", command, params
self.write("Unimplemented Query: %s" % command)
return False
def doQueryTST(self, command, params):
self.write("0")
return True
def doQueryIDN(self, command, params):
self.write(self.IDN)
return True
def dataReceived(self, data):
print LakeshoreDevice.__name__, "PDU: \"" + data + "\""
command = data.split()[0]
params = data[len(command):].strip().split(",")
if command[0] == "*":
command = command[1:]
try:
if command[-1] == '?':
self.doQuery(command[:-1], params)
else:
self.doCommand(command, params)
except:
traceback.print_exc()
if __name__ == '__main__':
class TestProtocol:
def __init__(self):
print self.__class__.__name__, "ctor"
self.numProtocols = 0
def write(self, data):
print "test write:", data
def loseConnection(self):
print "test lose connection"
test_protocol = TestProtocol()
test_device = LakeshoreDevice()
test_device.protocol = test_protocol
test_device.dataReceived("*IDN?")
test_device.dataReceived("*TST?")
test_device.dataReceived("RDGK?")

View File

@@ -0,0 +1,43 @@
# vim: ts=8 sts=4 sw=4 expandtab
from twisted.internet.protocol import ServerFactory
class LakeshoreFactory(ServerFactory):
"""Factory object used by the Twisted Infrastructure to create a Protocol
object for incomming connections"""
protocol = None
def __init__(self, theProtocol, theDevice, theTerminator = "\r\n"):
print LakeshoreFactory.__name__, "ctor"
self.protocol = theProtocol
self.device = theDevice
self.term = theTerminator
self.numProtocols = 0
def buildProtocol(self, addr):
p = self.protocol(self.device, self.term)
p.factory = self
return p
if __name__ == '__main__':
class TestProtocol:
def __init__(self, theDevice, theTerm = "\r\n"):
self.device = theDevice
self.response = ""
self.term = theTerm
class TestDevice:
def __init__(self):
pass
new_factory = LakeshoreFactory(TestProtocol, TestDevice, "\r\n");
new_protocol = new_factory.buildProtocol("address")
print "Factory: ", new_factory
print " .protocol", new_factory.protocol
print " .device ", new_factory.device
print " .num_prot", new_factory.numProtocols
print " .term ", new_factory.term
print "Protocol: ", new_protocol
print " .device ", new_protocol.device
print " .response", new_protocol.response
print " .term ", new_protocol.term

View File

@@ -0,0 +1,88 @@
# vim: ts=8 sts=4 sw=4 expandtab
from twisted.internet.protocol import Protocol
class LakeshoreProtocol(Protocol):
"""Protocol object used by the Twisted Infrastructure to handle connections"""
def __init__(self, theDevice, theTerminator = "\r\n"):
print LakeshoreProtocol.__name__, "ctor"
self.device = theDevice
self.response = ""
self.term = theTerminator
def write(self, response):
self.response = self.response + response
def connectionMade(self):
self.pdu = ""
self.response = ""
self.device.protocol = self
self.factory.numProtocols = self.factory.numProtocols + 1
print "connectionMade:", self.factory.numProtocols
if self.factory.numProtocols > 2:
print "Too many connections - rejecting"
self.transport.write("Too many connections, try later" + self.term)
self.transport.loseConnection()
else:
self.transport.write(("Welcome connection %d" % self.factory.numProtocols) + self.term)
def connectionLost(self, reason):
print "connectionLost:", self.factory.numProtocols, reason
self.factory.numProtocols = self.factory.numProtocols - 1
def lineReceived(self, data):
print "lineReceived - len:", len(data), data
self.device.protocol = self
self.device.dataReceived(data)
def dataReceived(self, data):
print "dataReceived - len:", len(data), data
for c in data:
if c == "\r" or c == ";" or c == "\n":
if len(self.pdu) > 0:
self.lineReceived(self.pdu)
self.pdu = ""
if c == ";":
if len(self.response) > 0 and self.response[-1] != ";":
self.response = self.response + ";"
else:
if len(self.response) > 0:
if self.response[-1] == ";":
self.response = self.response[:-1]
if len(self.response) > 0:
print "Protocol Response: %s" % self.response
self.transport.write(self.response + self.term)
self.response = ""
else:
self.pdu = self.pdu + c
if __name__ == '__main__':
class TestDevice:
def __init__(self):
print self.__class__.__name__, "ctor"
def dataReceived(self, pdu):
print "test device data received:", pdu
self.protocol.write("test device response")
class TestFactory:
def __init__(self):
self.numProtocols = 0
def write(self, data):
print "test write:", data,
def loseConnection(self):
print "test lose connection"
self.protocol.connectionLost("Factory")
myTerm = "\r\n"
test_device = TestDevice()
test_factory = TestFactory()
test_protocol = LakeshoreProtocol(test_device, myTerm)
test_factory.protocol = test_protocol
test_protocol.factory = test_factory
test_protocol.transport = test_factory
test_protocol.connectionMade()
test_protocol.connectionMade()
test_protocol.connectionMade()
test_protocol.connectionLost("Dunno")
test_protocol.dataReceived("*IDN?" + myTerm + "*IDN?;*I")
test_protocol.dataReceived("DN")
test_protocol.dataReceived("?" + myTerm)
test_protocol.connectionLost("Dunno")

View File

@@ -0,0 +1,150 @@
# vim: ts=8 sts=4 sw=4 expandtab
import curses
class CursesStdIO:
"""fake fd to be registered as a reader with the twisted reactor.
Curses classes needing input should extend this"""
def fileno(self):
""" We want to select on FD 0 """
return 0
def doRead(self):
"""called when input is ready"""
def logPrefix(self): return 'CursesClient'
class Screen(CursesStdIO):
def __init__(self, stdscr):
self.timer = 0
self.statusText = "TEST CURSES APP -"
self.searchText = ''
self.stdscr = stdscr
# set screen attributes
self.stdscr.nodelay(1) # this is used to make input calls non-blocking
curses.cbreak()
self.stdscr.keypad(1)
curses.curs_set(0) # no annoying mouse cursor
self.rows, self.cols = self.stdscr.getmaxyx()
self.lines = []
curses.start_color()
# create color pair's 1 and 2
curses.init_pair(1, curses.COLOR_BLACK, curses.COLOR_WHITE)
curses.init_pair(2, curses.COLOR_CYAN, curses.COLOR_BLACK)
self.paintStatus(self.statusText)
self.stdscr.refresh()
def connectionLost(self, reason):
self.close()
def addLine(self, text):
""" add a line to the internal list of lines"""
self.lines.append(text)
while len(self.lines) > 10:
del self.lines[0]
self.redisplayLines()
def redisplayLines(self):
""" method for redisplaying lines
based on internal list of lines """
self.stdscr.clear()
self.paintStatus(self.statusText)
i = 0
index = len(self.lines) - 1
while i < (self.rows - 3) and index >= 0:
self.stdscr.addstr(self.rows - 3 - i, 0, self.lines[index],
curses.color_pair(2))
i = i + 1
index = index - 1
self.stdscr.refresh()
def paintStatus(self, text):
if len(text) > self.cols: raise TextTooLongError
self.stdscr.addstr(self.rows-2,0,text + ' ' * (self.cols-len(text)),
curses.color_pair(1))
# move cursor to input line
self.stdscr.move(self.rows-1, self.cols-1)
def doRead(self):
""" Input is ready! """
curses.noecho()
self.timer = self.timer + 1
c = self.stdscr.getch() # read a character
if c == curses.KEY_BACKSPACE:
print "curses KEY_BACKSPACE"
self.searchText = self.searchText[:-1]
elif c == curses.KEY_ENTER or c == 10:
self.addLine(self.searchText)
# for testing too
try: self.sendLine(self.searchText)
except: pass
self.stdscr.refresh()
self.searchText = ''
elif c == curses.KEY_RESIZE:
print "curses KEY_RESIZE"
self.rows, self.cols = self.stdscr.getmaxyx()
self.redisplayLines()
self.paintStatus(self.statusText)
self.stdscr.refresh()
elif c >= curses.KEY_MIN and c <= curses.KEY_MAX:
return
else:
if len(self.searchText) >= self.cols-2:
return
self.searchText = self.searchText + chr(c)
self.stdscr.addstr(self.rows-1, 0,
self.searchText + (' ' * (
self.cols-len(self.searchText)-2)))
self.stdscr.move(self.rows-1, len(self.searchText))
self.paintStatus(self.statusText + ' %d' % len(self.searchText))
self.stdscr.refresh()
def sendLine(self, txt):
pass
def close(self):
""" clean up """
curses.nocbreak()
self.stdscr.keypad(0)
curses.echo()
curses.endwin()
def device_display():
global stdscr
try:
import datetime
now = datetime.datetime.now()
str = now.date().isoformat() + " " + now.time().isoformat()
stdscr.addstr(12, 25, "Time: %s" % str)
except:
pass
stdscr.refresh()
if __name__ == '__main__':
global stdscr
from twisted.internet.task import LoopingCall
from twisted.internet import reactor
from twisted.python import log
log.startLogging(open("/tmp/displayscreen.log", "w"))
stdscr = curses.initscr()
screen = Screen(stdscr)
lc = LoopingCall(device_display)
lc.start(0.250)
reactor.addReader(screen) # add screen object as a reader to the reactor
reactor.run()