Add a fake Huber bath

This commit is contained in:
Douglas Clowes
2014-06-20 14:52:29 +10:00
parent ff342dc16d
commit 84fa02a5a5

View File

@ -0,0 +1,219 @@
#!/usr/bin/python
# vim: ft=python ts=8 sts=4 sw=4 et autoindent smartindent nocindent
# author: Douglas Clowes (douglas.clowes@ansto.gov.au) 2014
#
from twisted.internet import reactor, protocol
from twisted.protocols.basic import LineReceiver
from twisted.python import log
from twisted.internet.task import LoopingCall
import os
import sys
import curses
sys.path.insert(0, os.path.realpath(os.path.abspath(os.path.join(os.path.dirname(sys.argv[0]),"../../util"))))
from displayscreen import Screen
devices = []
def hexData(data):
str = "%04X" % data
return str
def hexTemp(temp):
str = "%04X" % int(temp * 100)
if len(str) > 4:
str = str[-4:]
return str
def binTemp(temp):
x = binData(temp)
x *= 0.01
return x
def binData(data):
x = int(data, 16)
if x > 0x8000:
x -= 0x10000
return x
class Huber(LineReceiver):
def __init__(self):
self.delimiter = '\r\n'
self.setpoint = 25.00
self.internal = 25.00
self.external = 24.00
self.mode = 0
self.control = 1
self.min_set = 5.00
self.max_set = 35.00
self.data = {}
self.set_data()
def set_data(self):
self.data["00"] = hexTemp(self.setpoint)
self.data["01"] = hexTemp(self.internal)
self.data["07"] = hexTemp(self.external)
self.data["13"] = hexData(self.mode)
self.data["14"] = hexData(self.control)
self.data["30"] = hexTemp(self.min_set)
self.data["31"] = hexTemp(self.max_set)
def iterate(self):
if self.mode == 0: # internal
self.external = self.internal - 1.0
if self.setpoint != self.internal:
self.internal = self.setpoint * 0.9 + self.internal * 0.1
else:
self.internal = self.external + 1.0
if self.setpoint != self.external:
self.external = self.setpoint * 0.9 + self.external * 0.1
self.set_data()
def write(self, data):
sent = data + self.delimiter
print "transmitted:", repr(sent)
self.transport.write(sent)
def lineReceived(self, data):
print "lineReceived:", data
if len(data) >= 8 and data[0] == "{":
m_field = data[1]
t_field = data[2:4]
v_field = data[4:]
if t_field not in self.data:
print "Unknown address in: '%s'" % data
return
if v_field == "****":
self.write("{S" + t_field + self.data[t_field])
return
else:
if t_field == "00":
newTemp = binTemp(v_field)
if self.min_set <= newTemp <= self.max_set:
self.setpoint = newTemp
if t_field == "13":
self.mode = binData(v_field)
if t_field == "14":
self.control = binData(v_field)
if t_field == "30":
newTemp = binTemp(v_field)
if -150 <= newTemp <= 350:
self.min_set = newTemp
if t_field == "31":
newTemp = binTemp(v_field)
if -150 <= newTemp <= 350:
self.max_set = newTemp
self.set_data()
return
print "Unimplemented command for: '%s'" % data
return
def rawDataReceived(self, data):
#print "rawDataReceived:", repr(data)
self.transport.write(data)
self.line += data
if self.line.endswith(self.delimiter):
self.line = self.line[:-len(self.delimiter)]
self.lineReceived(self.line)
self.line = ""
def connectionMade(self):
print "connectionMade"
devices.append(self)
def connectionLost(self, reason):
print "connectionLost"
devices.remove(self)
def device_iterator():
global devices
for dev in devices:
dev.iterate()
def display_iterator():
global screen, devices
try:
rows, cols = screen.stdscr.getmaxyx()
screen.stdscr.clear()
col = 0
base = 0
screen.stdscr.addstr(base + 1, 0, "Setpoint: ")
screen.stdscr.addstr(base + 2, 0, "Internal: ")
screen.stdscr.addstr(base + 3, 0, "External: ")
screen.stdscr.addstr(base + 4, 0, "Mode : ")
screen.stdscr.addstr(base + 5, 0, "Control : ")
screen.stdscr.addstr(base + 6, 0, "Min_Set : ")
screen.stdscr.addstr(base + 7, 0, "Max_Set : ")
for dev in devices:
col += 1
screen.stdscr.addstr(0, 12 * col, "Top TODO")
try:
screen.stdscr.addstr(base + 1, 12 * col, "%8.2f" % dev.setpoint)
screen.stdscr.addstr(base + 2, 12 * col, "%8.2f" % dev.internal)
screen.stdscr.addstr(base + 3, 12 * col, "%8.2f" % dev.external)
screen.stdscr.addstr(base + 4, 12 * col, "%8d" % dev.mode)
screen.stdscr.addstr(base + 5, 12 * col, "%8d" % dev.control)
screen.stdscr.addstr(base + 6, 12 * col, "%8.2f" % dev.min_set)
screen.stdscr.addstr(base + 7, 12 * col, "%8.2f" % dev.max_set)
except:
pass
except:
raise
finally:
try:
screen.stdscr.refresh()
except:
pass
class MyScreen(Screen):
def __init__(self, stdscr):
Screen.__init__(self, stdscr)
def sendLine(self, txt):
global devices
def write(self, txt):
try:
newLine = self.lines[-1] + " => " + txt
del self.lines[-1]
self.addLine(newLine)
except:
pass
def main():
global screen
import argparse
parser = argparse.ArgumentParser(description="Fake Huber device")
parser.add_argument('instrument', help='The instrument name', nargs='*')
parser.add_argument('-p', '--port',\
help='Port on which to listen',\
type=int, default=61000)
parser.add_argument('-v', '--verbose',\
help='Print lots of stuff',\
action='store_true', default=False)
parser.add_argument('-w', '--window',\
help='Create a display window',\
action='store_true', default=False)
args = parser.parse_args()
log.startLogging(open(("/tmp/Fake_Huber.log"), "w"))
if args.verbose:
print "Args:", args
Verbose = True
if (args.window):
stdscr = curses.initscr()
screen = MyScreen(stdscr)
reactor.addReader(screen)
disp_iter = LoopingCall(display_iterator)
disp_iter.start(0.5)
dev_iter = LoopingCall(device_iterator)
dev_iter.start(1.0)
factory = protocol.ServerFactory()
factory.protocol = Huber
reactor.listenTCP(args.port, factory)
reactor.run()
if __name__ == "__main__":
main()