Fake device for Oxford LabView testing
This commit is contained in:
124
site_ansto/instrument/TEST_SICS/fakeOLV/FakeOLV.py
Executable file
124
site_ansto/instrument/TEST_SICS/fakeOLV/FakeOLV.py
Executable file
@ -0,0 +1,124 @@
|
||||
#!/usr/bin/python
|
||||
# vim: ft=python ts=8 sts=4 sw=4 et autoindent smartindent nocindent
|
||||
# author: Douglas Clowes (douglas.clowes@ansto.gov.au) 2015
|
||||
'''
|
||||
Fake Oxford LabView program for the 10/12 Tesla magnet driver
|
||||
'''
|
||||
|
||||
from twisted.internet import reactor, protocol
|
||||
from twisted.internet.task import LoopingCall
|
||||
|
||||
devices = []
|
||||
|
||||
class FakeOLV(protocol.Protocol):
|
||||
'''Fake Oxford Labview Protocol Object'''
|
||||
def __init__(self):
|
||||
self.setpoint = 0.0
|
||||
self.field = 0.0
|
||||
self.rate = 0.0
|
||||
self.hsw = 0
|
||||
self.t_1 = 120.0
|
||||
self.t_2 = 120.0
|
||||
self.t_3 = 120.0
|
||||
self.dsp = 0.0
|
||||
self.dpv = 0.0
|
||||
self.hsp = 0.0
|
||||
self.hpv = 0.0
|
||||
|
||||
def dataReceived(self, data):
|
||||
print "RECEIVED ", data
|
||||
reply = ""
|
||||
cmd_pars = data.split()
|
||||
if len(cmd_pars) < 1:
|
||||
return
|
||||
cmd = cmd_pars[0].lower()
|
||||
if cmd.startswith("set"):
|
||||
reply = self.set_command(cmd, cmd_pars)
|
||||
elif cmd.startswith("get"):
|
||||
reply = self.get_command(cmd)
|
||||
else:
|
||||
reply = "Error Command!"
|
||||
print "REPLY ", reply
|
||||
self.transport.write(reply + "\r\n")
|
||||
|
||||
def set_command(self, cmd, cmd_pars):
|
||||
'''Execute a setXX command'''
|
||||
reply = ""
|
||||
if len(cmd_pars) < 2:
|
||||
reply = "Error Command!"
|
||||
elif cmd == "setr":
|
||||
self.rate = float(cmd_pars[1])
|
||||
elif cmd == "seths":
|
||||
if cmd_pars[1].lower() == "on":
|
||||
self.hsw = 1
|
||||
if cmd_pars[1].lower() == "off":
|
||||
self.hsw = 0
|
||||
elif cmd == "sethelioxtemp":
|
||||
self.hsp = float(cmd_pars[1])
|
||||
elif cmd == "setdiluttemp":
|
||||
self.dsp = float(cmd_pars[1])
|
||||
elif cmd == "setf":
|
||||
self.setpoint = float(cmd_pars[1])
|
||||
else:
|
||||
reply = "Error Command!"
|
||||
return reply
|
||||
|
||||
def get_command(self, cmd):
|
||||
'''Execute a getXX command'''
|
||||
reply = ""
|
||||
if cmd == "getf":
|
||||
current = self.field * 10.0
|
||||
reply = "Field_Current=%.6f;" % current
|
||||
reply += "Field_Tesla=%.6f" % self.field
|
||||
elif cmd == "gett":
|
||||
reply = "T1=120.000000;T2=120.000000;T3=120.000000"
|
||||
elif cmd == "gethelioxtemp":
|
||||
reply = "Temp=0.000000;SetPoint=0.000000 Cdeg"
|
||||
elif cmd == "getdilutiontemp":
|
||||
reply = "Temp=0.000000;SetPoint=0.000000"
|
||||
else:
|
||||
reply = "Error Command!"
|
||||
return reply
|
||||
|
||||
def iterate(self):
|
||||
'''Iterate the setpoints'''
|
||||
if abs(self.field - self.setpoint) > 0.01:
|
||||
self.field += 0.1 * (self.setpoint - self.field)
|
||||
else:
|
||||
self.field = self.setpoint
|
||||
if abs(self.dsp - self.dpv) > 0.01:
|
||||
self.dpv += 0.1 * (self.dsp - self.dpv)
|
||||
else:
|
||||
self.dpv = self.dsp
|
||||
if abs(self.hsp - self.hpv) > 0.01:
|
||||
self.hpv += 0.1 * (self.hsp - self.hpv)
|
||||
else:
|
||||
self.hpv = self.hsp
|
||||
|
||||
def connectionMade(self):
|
||||
'''Add this new connection to the list of devices'''
|
||||
print "connectionMade"
|
||||
devices.append(self)
|
||||
|
||||
def connectionLost(self, reason):
|
||||
'''Drop this old connection from the list of devices'''
|
||||
print "connectionLost"
|
||||
devices.remove(self)
|
||||
|
||||
def device_iterator():
|
||||
'''Iterate over all connected devices and do them'''
|
||||
#print "device_iterator"
|
||||
for dev in devices:
|
||||
dev.iterate()
|
||||
|
||||
def main():
|
||||
'''Main program'''
|
||||
dev_iter = LoopingCall(device_iterator)
|
||||
dev_iter.start(1.0)
|
||||
factory = protocol.ServerFactory()
|
||||
factory.protocol = FakeOLV
|
||||
reactor.listenTCP(55001, factory)
|
||||
reactor.run()
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
Reference in New Issue
Block a user