From 84fa02a5a53075a0218ddb2cb56639d15f6d5f3b Mon Sep 17 00:00:00 2001 From: Douglas Clowes Date: Fri, 20 Jun 2014 14:52:29 +1000 Subject: [PATCH] Add a fake Huber bath --- .../TEST_SICS/fakeHuber/fakeHuber.py | 219 ++++++++++++++++++ 1 file changed, 219 insertions(+) create mode 100755 site_ansto/instrument/TEST_SICS/fakeHuber/fakeHuber.py diff --git a/site_ansto/instrument/TEST_SICS/fakeHuber/fakeHuber.py b/site_ansto/instrument/TEST_SICS/fakeHuber/fakeHuber.py new file mode 100755 index 00000000..c3b42cc3 --- /dev/null +++ b/site_ansto/instrument/TEST_SICS/fakeHuber/fakeHuber.py @@ -0,0 +1,219 @@ +#!/usr/bin/python +# vim: ft=python ts=8 sts=4 sw=4 et autoindent smartindent nocindent +# author: Douglas Clowes (douglas.clowes@ansto.gov.au) 2014 +# +from twisted.internet import reactor, protocol +from twisted.protocols.basic import LineReceiver +from twisted.python import log +from twisted.internet.task import LoopingCall + +import os +import sys +import curses +sys.path.insert(0, os.path.realpath(os.path.abspath(os.path.join(os.path.dirname(sys.argv[0]),"../../util")))) +from displayscreen import Screen + +devices = [] + +def hexData(data): + str = "%04X" % data + return str + +def hexTemp(temp): + str = "%04X" % int(temp * 100) + if len(str) > 4: + str = str[-4:] + return str + +def binTemp(temp): + x = binData(temp) + x *= 0.01 + return x + +def binData(data): + x = int(data, 16) + if x > 0x8000: + x -= 0x10000 + return x + +class Huber(LineReceiver): + def __init__(self): + self.delimiter = '\r\n' + self.setpoint = 25.00 + self.internal = 25.00 + self.external = 24.00 + self.mode = 0 + self.control = 1 + self.min_set = 5.00 + self.max_set = 35.00 + self.data = {} + self.set_data() + + def set_data(self): + self.data["00"] = hexTemp(self.setpoint) + self.data["01"] = hexTemp(self.internal) + self.data["07"] = hexTemp(self.external) + self.data["13"] = hexData(self.mode) + self.data["14"] = hexData(self.control) + self.data["30"] = hexTemp(self.min_set) + self.data["31"] = hexTemp(self.max_set) + + def iterate(self): + if self.mode == 0: # internal + self.external = self.internal - 1.0 + if self.setpoint != self.internal: + self.internal = self.setpoint * 0.9 + self.internal * 0.1 + else: + self.internal = self.external + 1.0 + if self.setpoint != self.external: + self.external = self.setpoint * 0.9 + self.external * 0.1 + self.set_data() + + def write(self, data): + sent = data + self.delimiter + print "transmitted:", repr(sent) + self.transport.write(sent) + + def lineReceived(self, data): + print "lineReceived:", data + if len(data) >= 8 and data[0] == "{": + m_field = data[1] + t_field = data[2:4] + v_field = data[4:] + if t_field not in self.data: + print "Unknown address in: '%s'" % data + return + if v_field == "****": + self.write("{S" + t_field + self.data[t_field]) + return + else: + if t_field == "00": + newTemp = binTemp(v_field) + if self.min_set <= newTemp <= self.max_set: + self.setpoint = newTemp + if t_field == "13": + self.mode = binData(v_field) + if t_field == "14": + self.control = binData(v_field) + if t_field == "30": + newTemp = binTemp(v_field) + if -150 <= newTemp <= 350: + self.min_set = newTemp + if t_field == "31": + newTemp = binTemp(v_field) + if -150 <= newTemp <= 350: + self.max_set = newTemp + self.set_data() + return + print "Unimplemented command for: '%s'" % data + return + + def rawDataReceived(self, data): + #print "rawDataReceived:", repr(data) + self.transport.write(data) + self.line += data + if self.line.endswith(self.delimiter): + self.line = self.line[:-len(self.delimiter)] + self.lineReceived(self.line) + self.line = "" + + def connectionMade(self): + print "connectionMade" + devices.append(self) + + def connectionLost(self, reason): + print "connectionLost" + devices.remove(self) + +def device_iterator(): + global devices + for dev in devices: + dev.iterate() + +def display_iterator(): + global screen, devices + + try: + rows, cols = screen.stdscr.getmaxyx() + screen.stdscr.clear() + col = 0 + base = 0 + screen.stdscr.addstr(base + 1, 0, "Setpoint: ") + screen.stdscr.addstr(base + 2, 0, "Internal: ") + screen.stdscr.addstr(base + 3, 0, "External: ") + screen.stdscr.addstr(base + 4, 0, "Mode : ") + screen.stdscr.addstr(base + 5, 0, "Control : ") + screen.stdscr.addstr(base + 6, 0, "Min_Set : ") + screen.stdscr.addstr(base + 7, 0, "Max_Set : ") + for dev in devices: + col += 1 + screen.stdscr.addstr(0, 12 * col, "Top TODO") + try: + screen.stdscr.addstr(base + 1, 12 * col, "%8.2f" % dev.setpoint) + screen.stdscr.addstr(base + 2, 12 * col, "%8.2f" % dev.internal) + screen.stdscr.addstr(base + 3, 12 * col, "%8.2f" % dev.external) + screen.stdscr.addstr(base + 4, 12 * col, "%8d" % dev.mode) + screen.stdscr.addstr(base + 5, 12 * col, "%8d" % dev.control) + screen.stdscr.addstr(base + 6, 12 * col, "%8.2f" % dev.min_set) + screen.stdscr.addstr(base + 7, 12 * col, "%8.2f" % dev.max_set) + except: + pass + except: + raise + finally: + try: + screen.stdscr.refresh() + except: + pass + +class MyScreen(Screen): + def __init__(self, stdscr): + Screen.__init__(self, stdscr) + + def sendLine(self, txt): + global devices + + def write(self, txt): + try: + newLine = self.lines[-1] + " => " + txt + del self.lines[-1] + self.addLine(newLine) + except: + pass + +def main(): + global screen + import argparse + parser = argparse.ArgumentParser(description="Fake Huber device") + parser.add_argument('instrument', help='The instrument name', nargs='*') + parser.add_argument('-p', '--port',\ + help='Port on which to listen',\ + type=int, default=61000) + parser.add_argument('-v', '--verbose',\ + help='Print lots of stuff',\ + action='store_true', default=False) + parser.add_argument('-w', '--window',\ + help='Create a display window',\ + action='store_true', default=False) + args = parser.parse_args() + log.startLogging(open(("/tmp/Fake_Huber.log"), "w")) + if args.verbose: + print "Args:", args + Verbose = True + + if (args.window): + stdscr = curses.initscr() + screen = MyScreen(stdscr) + reactor.addReader(screen) + disp_iter = LoopingCall(display_iterator) + disp_iter.start(0.5) + + dev_iter = LoopingCall(device_iterator) + dev_iter.start(1.0) + factory = protocol.ServerFactory() + factory.protocol = Huber + reactor.listenTCP(args.port, factory) + reactor.run() + +if __name__ == "__main__": + main()