#!/usr/bin/python
# vim: ft=python ts=8 sts=4 sw=4 et autoindent smartindent nocindent
# author: Douglas Clowes (douglas.clowes@ansto.gov.au) 2014
#
"""Fake Mercury ITC: a TCP server that answers a small subset of the
instrument's colon-separated READ/SET command language.

Each client connection gets its own ``Mercury`` protocol instance holding two
motherboard cards (a heater and a temperature sensor) plus one daughter-board
card per character of the ``--config`` string.
"""
from twisted.internet import reactor, protocol
from twisted.protocols.basic import LineReceiver
from twisted.python import log
from twisted.internet.task import LoopingCall
import os
import sys
import curses
# The display helper lives outside this directory; extend the search path
# before importing it.
sys.path.insert(0, os.path.realpath(os.path.abspath(os.path.join(os.path.dirname(sys.argv[0]), "../../util"))))
from displayscreen import Screen
import argparse

# Connected Mercury protocol instances, stepped once a second by
# device_iterator().
devices = []
# Parsed command-line arguments; filled in by main().
args = None
# The curses display wrapper; stays None unless --window is given.
screen = None


def _selects(parts, *keys):
    """Return True when the request fields after the device uid equal *keys*.

    ``parts`` is the command split on ':'; fields 0-2 are the verb, "DEV" and
    the uid, so the card-specific selector starts at index 3.  A request that
    is too short simply compares unequal.
    """
    return parts[3:3 + len(keys)] == list(keys)


class Card(object):
    """An empty slot; also the base class for every simulated card."""

    def __init__(self):
        self.uid = None
        self.type = "NONE"
        self.nick = "None"

    def iterate(self):
        """Advance the simulation by one step (nothing to do for a bare card)."""
        pass

    def doRead(self, parts):
        """Answer a READ request; a bare card has nothing to report."""
        return self.write("STAT:DEV:N/A")

    def doWrite(self, parts):
        """Answer a SET request; a bare card accepts nothing."""
        return self.write("STAT:DEV:N/A")

    def write(self, data):
        """Return the reply text unchanged; the protocol does the sending."""
        return data

    def _reply(self, tail):
        """Build the reply "STAT:DEV:<uid>:<tail>" for this card."""
        return self.write("STAT:DEV:" + self.uid + ":" + tail)


class Heater(Card):
    """Heater card reporting a fixed output power."""

    def __init__(self):
        super(Heater, self).__init__()
        self.type = "HTR"
        self.nick = "Heater"
        self.power = 50

    def doRead(self, parts):
        """Answer HTR:SIG:POWR; anything else is INVALID."""
        if _selects(parts, "HTR", "SIG", "POWR"):
            return self._reply("HTR:SIG:POWR:%.4fW" % self.power)
        return self._reply("HTR:INVALID")


class Level(Card):
    """Level-meter card reporting fixed helium and nitrogen levels."""

    def __init__(self):
        super(Level, self).__init__()
        self.type = "LVL"
        self.nick = "Level"
        self.hlev = 50
        self.nlev = 60

    def doRead(self, parts):
        """Answer LVL:NICK, LVL:SIG:HEL:LEV and LVL:SIG:NIT:LEV."""
        if _selects(parts, "LVL", "NICK"):
            return self._reply("LVL:NICK:%s" % self.nick)
        if _selects(parts, "LVL", "SIG", "HEL", "LEV"):
            return self._reply("LVL:SIG:HEL:LEV:%7.4f%%" % self.hlev)
        if _selects(parts, "LVL", "SIG", "NIT", "LEV"):
            return self._reply("LVL:SIG:NIT:LEV:%7.4f%%" % self.nlev)
        return self._reply("LVL:INVALID")


class Valve(Card):
    """Auxiliary card reporting a fixed valve opening."""

    def __init__(self):
        super(Valve, self).__init__()
        self.type = "AUX"
        self.nick = "Valve"
        self.valve_open = 50

    def doRead(self, parts):
        """Answer AUX:NICK and AUX:SIG:OPEN."""
        if _selects(parts, "AUX", "NICK"):
            return self._reply("AUX:NICK:%s" % self.nick)
        if _selects(parts, "AUX", "SIG", "OPEN"):
            return self._reply("AUX:SIG:OPEN:%7.4f%%" % self.valve_open)
        return self._reply("AUX:INVALID")


class _LoopCard(Card):
    """Shared behaviour of cards with a set-point (sp) and process value (pv)."""

    def __init__(self):
        super(_LoopCard, self).__init__()
        self.sp = 280
        self.pv = 280

    def iterate(self):
        """Move pv a tenth of the way towards sp, snapping when within 0.1."""
        error = self.sp - self.pv
        if abs(error) > 0.1:
            self.pv += 0.1 * error
        else:
            self.pv = self.sp

    def _store_setpoint(self, text):
        """Parse *text* as a number into sp; return False if it is not one.

        The set-point must be numeric: it is formatted with %g on read-back
        and used in arithmetic by iterate().
        """
        try:
            self.sp = float(text)
        except ValueError:
            return False
        return True


class Temperature(_LoopCard):
    """Temperature card whose reading drifts towards its set-point."""

    def __init__(self):
        super(Temperature, self).__init__()
        self.type = "TEMP"
        self.nick = "Temp"

    def doRead(self, parts):
        """Answer TEMP:NICK, TEMP:LOOP:TSET and TEMP:SIG:TEMP."""
        if _selects(parts, "TEMP", "NICK"):
            return self._reply("TEMP:NICK:%s" % self.nick)
        if _selects(parts, "TEMP", "LOOP", "TSET"):
            # NOTE(review): the reply carries an extra "SIG" field that the
            # request does not; kept as-is in case clients rely on it.
            return self._reply("TEMP:SIG:LOOP:TSET:%g" % self.sp)
        if _selects(parts, "TEMP", "SIG", "TEMP"):
            return self._reply("TEMP:SIG:TEMP:%7.4fK" % self.pv)
        return self._reply("TEMP:INVALID")

    def doWrite(self, parts):
        """Accept TEMP:LOOP:TSET:<number>; anything else is INVALID."""
        if (len(parts) > 6 and _selects(parts, "TEMP", "LOOP", "TSET")
                and self._store_setpoint(parts[6])):
            return self._reply("TEMP:SIG:LOOP:TSET:VALID")
        return self._reply("TEMP:INVALID")


class Pressure(_LoopCard):
    """Pressure card whose reading drifts towards its set-point."""

    def __init__(self):
        super(Pressure, self).__init__()
        self.type = "PRES"
        self.nick = "Pres"

    def doRead(self, parts):
        """Answer PRES:NICK, PRES:LOOP:TSET and PRES:SIG:PRES."""
        if _selects(parts, "PRES", "NICK"):
            return self._reply("PRES:NICK:%s" % self.nick)
        if _selects(parts, "PRES", "LOOP", "TSET"):
            return self._reply("PRES:SIG:LOOP:TSET:%g" % self.sp)
        if _selects(parts, "PRES", "SIG", "PRES"):
            return self._reply("PRES:SIG:PRES:%7.4f" % self.pv)
        return self._reply("PRES:INVALID")

    def doWrite(self, parts):
        """Accept PRES:LOOP:TSET:<number>; anything else is INVALID."""
        if (len(parts) > 6 and _selects(parts, "PRES", "LOOP", "TSET")
                and self._store_setpoint(parts[6])):
            return self._reply("PRES:SIG:LOOP:TSET:VALID")
        return self._reply("PRES:INVALID")


class Mercury(LineReceiver):
    """Protocol for one client connection to the fake controller."""

    # config character -> (card class, letter used in the uid, nick suffix).
    # Note that a valve ('V') is given the uid letter 'G'.
    _SLOT_KINDS = {
        'N': (Card, "N", "None"),
        'H': (Heater, "H", "Heater"),
        'L': (Level, "L", "Level"),
        'V': (Valve, "G", "Valve"),
        'P': (Pressure, "P", "Pres"),
        'T': (Temperature, "T", "Temp"),
    }

    def __init__(self):
        """Build the card set described by the global ``args.config``."""
        self.delimiter = '\r\n'
        # Raw mode: line splitting is done by rawDataReceived() below.
        self.setRawMode()
        self.line = ""
        self.config = args.config
        self.cards = []
        self.mb0 = Heater()
        self.mb0.uid = 'MB0.H1'
        self.mb0.nick = 'Mobo0.Heater'
        self.mb1 = Temperature()
        self.mb1.uid = 'MB1.T1'
        self.mb1.nick = 'Mobo1.Temp'
        for char in self.config:
            kind = self._SLOT_KINDS.get(char.upper())
            if kind is None:
                # Unrecognised characters do not occupy a slot.
                continue
            make_card, letter, label = kind
            card = make_card()
            self.cards.append(card)
            slot = len(self.cards)  # slots are numbered from 1
            card.uid = "DB%d.%s1" % (slot, letter)
            card.nick = "Slot%d.%s" % (slot, label)
        for idx, card in enumerate(self._all_cards()):
            print("%s %s %s %s" % (idx, card.type, card.uid, card.nick))

    def _all_cards(self):
        """Return the motherboard cards followed by the daughter-board cards."""
        return [self.mb0, self.mb1] + self.cards

    def _find_card(self, uid):
        """Return the first card whose uid equals *uid*, or None."""
        for card in self._all_cards():
            if card.uid == uid:
                return card
        return None

    def connectionMade(self):
        """Register this connection so device_iterator() steps its cards."""
        print("connectionMade")
        devices.append(self)

    def connectionLost(self, reason):
        """Stop stepping this connection's cards."""
        print("connectionLost")
        if self in devices:
            devices.remove(self)

    def iterate(self):
        """Advance every card's simulation by one step."""
        for card in self._all_cards():
            card.iterate()

    def write(self, data):
        """Log *data* and send it to the client followed by the delimiter."""
        print("transmitted: %r" % (data,))
        self.transport.write(data + self.delimiter)

    def read_sys_cat(self):
        """Return the catalogue reply: mb1 first, then the slots, then mb0."""
        listed = [self.mb1] + self.cards + [self.mb0]
        return "STAT:SYS:CAT" + "".join(
            ":DEV:" + card.uid + ":" + card.type for card in listed)

    def doRead(self, parts):
        """Return the reply text for a READ request split into *parts*.

        Unsupported or truncated requests yield an empty string, so the
        client still receives a (blank) line.
        """
        scope = parts[1] if len(parts) > 2 else None
        if scope == "SYS":
            if parts[2] == "CAT":
                return self.read_sys_cat()
            return ""
        if scope == "DEV":
            card = self._find_card(parts[2])
            if card is not None:
                return card.doRead(parts)
            print("doRead card not found: %s" % (parts,))
            return "STAT:DEV:" + parts[2] + ":N/A"
        print("doRead ERROR: %s" % (parts,))
        return ""

    def doWrite(self, parts):
        """Return the reply text for a SET request split into *parts*.

        SYS settings are accepted but ignored.  Unsupported or truncated
        requests yield an empty string, so the client still receives a
        (blank) line.
        """
        scope = parts[1] if len(parts) > 2 else None
        if scope == "SYS":
            return ""
        if scope == "DEV":
            card = self._find_card(parts[2])
            if card is not None:
                return card.doWrite(parts)
            print("doWrite card not found: %s" % (parts,))
            return "STAT:DEV:" + parts[2] + ":N/A"
        print("doWrite ERROR: %s" % (parts,))
        return ""

    def lineReceived(self, data):
        """Dispatch one complete command line and send the reply."""
        print("lineReceived: %s" % (data,))
        if data == "*IDN?":
            return self.write("IDN:OXFORD INSTRUMENTS:MERCURY ITC:65:2.0.0")
        parts = data.split(":")
        if parts[0] == "READ":
            return self.write(self.doRead(parts))
        if parts[0] == "SET":
            return self.write(self.doWrite(parts))
        # Unknown verbs are logged and get no reply.
        print("ERROR: %s" % (data,))

    def rawDataReceived(self, data):
        """Buffer incoming bytes and dispatch every delimiter-terminated line."""
        self.line += data
        while self.delimiter in self.line:
            command, self.line = self.line.split(self.delimiter, 1)
            self.lineReceived(command)


class MyScreen(Screen):
    """Curses display wrapper used when --window is given."""

    def __init__(self, stdscr):
        Screen.__init__(self, stdscr)

    def sendLine(self, txt):
        """Ignore text entered on the screen; nothing is forwarded."""
        pass

    def write(self, txt):
        """Append " => txt" to the last displayed line, if there is one."""
        try:
            newLine = self.lines[-1] + " => " + txt
            del self.lines[-1]
            self.addLine(newLine)
        except Exception:
            # Best effort: a display hiccup must not take the server down.
            pass

    # NOTE(review): these two look as if they were meant for the Mercury
    # protocol (which now has its own); kept here for compatibility.
    def connectionMade(self):
        print("connectionMade")
        devices.append(self)

    def connectionLost(self, reason):
        print("connectionLost")
        if self in devices:
            devices.remove(self)


def device_iterator():
    """Step the simulation of every connected device."""
    for dev in devices:
        dev.iterate()


def display_iterator():
    """Clear and refresh the curses window, if one was created."""
    if screen is None:
        return
    stdscr = screen.stdscr
    try:
        stdscr.clear()
    finally:
        try:
            stdscr.refresh()
        except curses.error:
            pass


def main():
    """Parse the command line, start the periodic tasks and run the server."""
    global args, screen
    parser = argparse.ArgumentParser(description="Fake Mercury ITC")
    parser.add_argument('instrument', help='The instrument name', nargs='*')
    parser.add_argument('-c', '--config',
                        help='configuration string [HHHVLTTP]',
                        type=str, default="HHHVLTTP")
    parser.add_argument('-p', '--port',
                        help='Port on which to listen',
                        type=int, default=7020)
    parser.add_argument('-v', '--verbose',
                        help='Print lots of stuff',
                        action='store_true', default=False)
    parser.add_argument('-w', '--window',
                        help='Create a display window',
                        action='store_true', default=False)
    args = parser.parse_args()
    log.startLogging(open(("/tmp/Fake_Mercury_%04d.log" % args.port), "w"))
    if args.verbose:
        print("Args: %s" % (args,))
    if args.window:
        stdscr = curses.initscr()
        # Must be the module-level name: display_iterator() reads it.
        screen = MyScreen(stdscr)
        reactor.addReader(screen)
        disp_iter = LoopingCall(display_iterator)
        disp_iter.start(0.5)
    dev_iter = LoopingCall(device_iterator)
    dev_iter.start(1.0)
    factory = protocol.ServerFactory()
    factory.protocol = Mercury
    reactor.listenTCP(args.port, factory)
    reactor.run()


if __name__ == "__main__":
    main()