From 22bb6122969899b0b8722c429669af112a69f29f Mon Sep 17 00:00:00 2001 From: Douglas Clowes Date: Mon, 16 Feb 2015 12:36:12 +1100 Subject: [PATCH] Fake device for Oxford LabView testing --- .../instrument/TEST_SICS/fakeOLV/FakeOLV.py | 124 ++++++++++++++++++ 1 file changed, 124 insertions(+) create mode 100755 site_ansto/instrument/TEST_SICS/fakeOLV/FakeOLV.py diff --git a/site_ansto/instrument/TEST_SICS/fakeOLV/FakeOLV.py b/site_ansto/instrument/TEST_SICS/fakeOLV/FakeOLV.py new file mode 100755 index 00000000..52363069 --- /dev/null +++ b/site_ansto/instrument/TEST_SICS/fakeOLV/FakeOLV.py @@ -0,0 +1,124 @@ +#!/usr/bin/python +# vim: ft=python ts=8 sts=4 sw=4 et autoindent smartindent nocindent +# author: Douglas Clowes (douglas.clowes@ansto.gov.au) 2015 +''' + Fake Oxford LabView program for the 10/12 Tesla magnet driver +''' + +from twisted.internet import reactor, protocol +from twisted.internet.task import LoopingCall + +devices = [] + +class FakeOLV(protocol.Protocol): + '''Fake Oxford Labview Protocol Object''' + def __init__(self): + self.setpoint = 0.0 + self.field = 0.0 + self.rate = 0.0 + self.hsw = 0 + self.t_1 = 120.0 + self.t_2 = 120.0 + self.t_3 = 120.0 + self.dsp = 0.0 + self.dpv = 0.0 + self.hsp = 0.0 + self.hpv = 0.0 + + def dataReceived(self, data): + print "RECEIVED ", data + reply = "" + cmd_pars = data.split() + if len(cmd_pars) < 1: + return + cmd = cmd_pars[0].lower() + if cmd.startswith("set"): + reply = self.set_command(cmd, cmd_pars) + elif cmd.startswith("get"): + reply = self.get_command(cmd) + else: + reply = "Error Command!" + print "REPLY ", reply + self.transport.write(reply + "\r\n") + + def set_command(self, cmd, cmd_pars): + '''Execute a setXX command''' + reply = "" + if len(cmd_pars) < 2: + reply = "Error Command!" + elif cmd == "setr": + self.rate = float(cmd_pars[1]) + elif cmd == "seths": + if cmd_pars[1].lower() == "on": + self.hsw = 1 + if cmd_pars[1].lower() == "off": + self.hsw = 0 + elif cmd == "sethelioxtemp": + self.hsp = float(cmd_pars[1]) + elif cmd == "setdiluttemp": + self.dsp = float(cmd_pars[1]) + elif cmd == "setf": + self.setpoint = float(cmd_pars[1]) + else: + reply = "Error Command!" + return reply + + def get_command(self, cmd): + '''Execute a getXX command''' + reply = "" + if cmd == "getf": + current = self.field * 10.0 + reply = "Field_Current=%.6f;" % current + reply += "Field_Tesla=%.6f" % self.field + elif cmd == "gett": + reply = "T1=120.000000;T2=120.000000;T3=120.000000" + elif cmd == "gethelioxtemp": + reply = "Temp=0.000000;SetPoint=0.000000 Cdeg" + elif cmd == "getdilutiontemp": + reply = "Temp=0.000000;SetPoint=0.000000" + else: + reply = "Error Command!" + return reply + + def iterate(self): + '''Iterate the setpoints''' + if abs(self.field - self.setpoint) > 0.01: + self.field += 0.1 * (self.setpoint - self.field) + else: + self.field = self.setpoint + if abs(self.dsp - self.dpv) > 0.01: + self.dpv += 0.1 * (self.dsp - self.dpv) + else: + self.dpv = self.dsp + if abs(self.hsp - self.hpv) > 0.01: + self.hpv += 0.1 * (self.hsp - self.hpv) + else: + self.hpv = self.hsp + + def connectionMade(self): + '''Add this new connection to the list of devices''' + print "connectionMade" + devices.append(self) + + def connectionLost(self, reason): + '''Drop this old connection from the list of devices''' + print "connectionLost" + devices.remove(self) + +def device_iterator(): + '''Iterate over all connected devices and do them''' + #print "device_iterator" + for dev in devices: + dev.iterate() + +def main(): + '''Main program''' + dev_iter = LoopingCall(device_iterator) + dev_iter.start(1.0) + factory = protocol.ServerFactory() + factory.protocol = FakeOLV + reactor.listenTCP(55001, factory) + reactor.run() + +if __name__ == "__main__": + main()