diff --git a/site_ansto/instrument/TEST_SICS/fakeHiden/xcs/HidenDevice.py b/site_ansto/instrument/TEST_SICS/fakeHiden/xcs/HidenDevice.py new file mode 100755 index 00000000..3e82795e --- /dev/null +++ b/site_ansto/instrument/TEST_SICS/fakeHiden/xcs/HidenDevice.py @@ -0,0 +1,112 @@ +# vim: ft=python ts=8 sts=4 sw=4 expandtab autoindent smartindent nocindent +# +# Generic Hiden Humidity Controller Device +# +# Author: Douglas Clowes (2014) +# +import inspect +import traceback + +class HidenDevice(object): + + def __init__(self): + print HidenDevice.__name__, "ctor" + #print "Methods:", inspect.getmembers(self, inspect.ismethod) + methods = inspect.getmembers(self, inspect.ismethod) + self.myMethods = {} + for method in methods: + self.myMethods[method[0]] = method[1:] + #for method in sorted(self.myMethods): + # print "Method:", method, self.myMethods[method], type(method), type(self.myMethods[method]) + + def reset_powerup(self): + print HidenDevice.__name__, "reset_powerup" + + def write(self, response): + print "Device Response: %s" % response + self.protocol.write(response) + + def doCommand(self, command, params): + print HidenDevice.__name__, "Command:", command, params + method = "doCommand%s" % command + if method in self.myMethods: + action = "response = self.%s(command, params)" % method + print "Action:", action + exec action + if response: + return response + else: + print "Unimplemented Command:", command, params + return False + + def doQuery(self, command, params): + print HidenDevice.__name__, "Query:", command, params + method = "doQuery%s" % command + if method in self.myMethods: + action = "response = self.%s(command, params)" % method + print "Action:", action + exec action + if response: + return response + else: + print "Unimplemented Query:", command, params + self.write("Unimplemented Query: %s" % command) + return False + + def doQueryRANDOM(self, command, params): + self.write("%f" % self.RANDOM) + + def doCommandRANDOM(self, command, params): + self.RANDOM = float(params[0]) + + def mergeParams(self, count, theOld, theNew): + oldParams = theOld.split(",") + if len(oldParams) != count: + raise IndexError + newParams = theNew.split(",") + if len(newParams) > count: + raise IndexError + mergedParams = [] + for idx in range(count): + if oldParams[idx] == '': + raise ValueError + if idx >= len(newParams) or newParams[idx] == '': + mergedParams.append(oldParams[idx]) + else: + mergedParams.append(newParams[idx]) + if len(mergedParams) != count: + raise IndexError + return ",".join(mergedParams) + + def dataReceived(self, data): + print HidenDevice.__name__, "PDU: \"" + data + "\"" + command = data.split(",")[0].split()[0] + params = data[len(command)+1:].strip().split(",") + try: + if command[0] == "?": + command = command[1:] + self.doQuery(command, params) + else: + command = command[1:] + self.doCommand(command, params) + except: + traceback.print_exc() + +if __name__ == '__main__': + class TestProtocol: + def __init__(self): + print self.__class__.__name__, "ctor" + self.numProtocols = 0 + def write(self, data): + print "test write:", data + def loseConnection(self): + print "test lose connection" + test_protocol = TestProtocol() + test_device = HidenDevice() + test_device.protocol = test_protocol + test_device.dataReceived("?DOUT,2") + test_device.dataReceived("!DOUT,2,10") + test_device.dataReceived("?AIN,0") + test_device.dataReceived("?AOUT,0") + test_device.dataReceived("!AOUT,0") + test_device.dataReceived("?ALL DATA") diff --git a/site_ansto/instrument/TEST_SICS/fakeHiden/xcs/HidenFactory.py b/site_ansto/instrument/TEST_SICS/fakeHiden/xcs/HidenFactory.py new file mode 100644 index 00000000..d3897fab --- /dev/null +++ b/site_ansto/instrument/TEST_SICS/fakeHiden/xcs/HidenFactory.py @@ -0,0 +1,47 @@ +# vim: ft=python ts=8 sts=4 sw=4 expandtab autoindent smartindent nocindent +# Fake Hiden Humidity Controller Factory +# +# Author: Douglas Clowes 2014 +# +from twisted.internet.protocol import ServerFactory + +class HidenFactory(ServerFactory): + """Factory object used by the Twisted Infrastructure to create a Protocol + object for incomming connections""" + + protocol = None + + def __init__(self, theProtocol, theDevice, theTerminator = "\n"): + print HidenFactory.__name__, "ctor" + self.protocol = theProtocol + self.device = theDevice + self.term = theTerminator + self.numProtocols = 0 + + def buildProtocol(self, addr): + p = self.protocol(self.device, self.term) + p.factory = self + return p + +if __name__ == '__main__': + class TestProtocol: + def __init__(self, theDevice, theTerm = "\n"): + self.device = theDevice + self.response = "" + self.term = theTerm + + class TestDevice: + def __init__(self): + pass + + new_factory = HidenFactory(TestProtocol, TestDevice, "\n"); + new_protocol = new_factory.buildProtocol("address") + print "Factory: ", new_factory + print " .protocol", new_factory.protocol + print " .device ", new_factory.device + print " .num_prot", new_factory.numProtocols + print " .term ", new_factory.term + print "Protocol: ", new_protocol + print " .device ", new_protocol.device + print " .response", new_protocol.response + print " .term ", new_protocol.term diff --git a/site_ansto/instrument/TEST_SICS/fakeHiden/xcs/HidenProtocol.py b/site_ansto/instrument/TEST_SICS/fakeHiden/xcs/HidenProtocol.py new file mode 100755 index 00000000..7b340ec0 --- /dev/null +++ b/site_ansto/instrument/TEST_SICS/fakeHiden/xcs/HidenProtocol.py @@ -0,0 +1,89 @@ +# vim: ft=python ts=8 sts=4 sw=4 expandtab autoindent smartindent nocindent +# Fake Hiden Temperature Controller Protocol +# +# Author: Douglas Clowes 2014 +# +from twisted.internet.protocol import Protocol + +class HidenProtocol(Protocol): + """Protocol object used by the Twisted Infrastructure to handle connections""" + + def __init__(self, theDevice, theTerminator = "\n"): + print HidenProtocol.__name__, "ctor" + self.device = theDevice + self.response = "" + self.term = theTerminator + + def write(self, response): + self.response = self.response + response + + def connectionMade(self): + self.pdu = "" + self.response = "" + self.device.protocol = self + self.factory.numProtocols = self.factory.numProtocols + 1 + print "connectionMade:", self.factory.numProtocols + if self.factory.numProtocols > 2: + print "Too many connections - rejecting" + self.transport.write("Too many connections, try later" + self.term) + self.transport.loseConnection() + else: + self.transport.write(("Welcome connection %d" % self.factory.numProtocols) + self.term) + + def connectionLost(self, reason): + print "connectionLost:", self.factory.numProtocols, reason + self.factory.numProtocols = self.factory.numProtocols - 1 + + def lineReceived(self, data): + print "lineReceived - len:", len(data), data + self.device.protocol = self + self.device.dataReceived(data) + + def dataReceived(self, data): + print "dataReceived - len:", len(data), data + for c in data: + if c == "\r" or c == ";" or c == "\n": + if len(self.pdu) > 0: + self.lineReceived(self.pdu) + self.pdu = "" + if len(self.response) > 0: + print "Protocol Response: %s" % self.response + self.transport.write(self.response + self.term) + self.response = "" + elif c == "A" and len(self.pdu) == 0: + self.lineReceived("?ALL DATA") + if len(self.response) > 0: + print "Protocol Response: %s" % self.response + self.transport.write(self.response + self.term) + self.response = "" + else: + self.pdu = self.pdu + c + +if __name__ == '__main__': + class TestDevice: + def __init__(self): + print self.__class__.__name__, "ctor" + def dataReceived(self, pdu): + print "test device data received:", pdu + self.protocol.write("test device response") + class TestFactory: + def __init__(self): + self.numProtocols = 0 + def write(self, data): + print "test write:", data, + def loseConnection(self): + print "test lose connection" + self.protocol.connectionLost("Factory") + myTerm = "\n" + test_device = TestDevice() + test_factory = TestFactory() + test_protocol = HidenProtocol(test_device, myTerm) + test_factory.protocol = test_protocol + test_protocol.factory = test_factory + test_protocol.transport = test_factory + test_protocol.connectionMade() + test_protocol.connectionMade() + test_protocol.connectionMade() + test_protocol.connectionLost("Dunno") + test_protocol.dataReceived("!AOUT,0,30" + myTerm + "?AOUT,0") + test_protocol.connectionLost("Dunno") diff --git a/site_ansto/instrument/TEST_SICS/fakeHiden/xcs/HidenXCS.py b/site_ansto/instrument/TEST_SICS/fakeHiden/xcs/HidenXCS.py new file mode 100644 index 00000000..29a02d30 --- /dev/null +++ b/site_ansto/instrument/TEST_SICS/fakeHiden/xcs/HidenXCS.py @@ -0,0 +1,180 @@ +# vim: ft=python ts=8 sts=4 sw=4 expandtab autoindent smartindent nocindent +# Fake Hiden Temperature Controller +# +# Author: Douglas Clowes 2014 +# +from HidenDevice import HidenDevice +import random +import re +import os +import sys +import time + +sys.path.insert(0, os.path.realpath(os.path.abspath(os.path.join(os.path.dirname(sys.argv[0]),"../../../util")))) +from fopdt import fopdt, fopdt_sink +from pid import PID + +class Loop(fopdt): + def __init__(self, the_temp, the_nick): + fopdt.__init__(self, the_temp) + self.setpoint = the_temp + self.sensor = the_temp + # P, I, D + self.pid = PID(0.05, 0.02, 1.2) + self.pid.setPoint(the_temp) + # Value, Kp, Tp, Td, Absolute + self.AddSource(fopdt_sink(0, 2, 13, 10, True)) + self.AddSink(fopdt_sink(the_temp, 1, 30, 1, False)) + self.power = 0 + self.nick = the_nick + self.count = 0 + + def Setpoint(self, the_sp): + self.setpoint = the_sp + self.pid.setPoint(the_sp) + + def doIteration(self): + self.pid_delta = self.pid.update(self.pv) + self.sources[0].value = self.pid_delta + if self.sources[0].value > 100.0: + self.sources[0].value = 100.0 + if self.sources[0].value < 0.0: + self.sources[0].value = 0.0 + self.count += 1 + self.iterate(self.count) + self.sensor = 0.9 * self.sensor + 0.1 * self.setpoint + +class HidenXCS(HidenDevice): + """Hiden XCS humidity controller object - simulates the device""" + + def __init__(self): + HidenDevice.__init__(self) + print HidenXCS.__name__, "ctor" + self.RANDOM = 0.0 + self.IDN = "Simulated Hiden XCS" + self.CONFIG_LOOPS = [0, 1, 2] + self.AnalogIn = {0:0, 1:0, 2:0, 8:0, 9:0, 12:0} + self.AnalogOut = {0:0, 1:0, 2:0} + self.DigitalOut = {2:0} + self.humidity = 0.0 + self.temperature = 49.22 + self.Loops = {} + self.Loops[0] = Loop(0, "Dry") + self.Loops[1] = Loop(0, "Wet") + self.Loops[2] = Loop(0, "Special") + self.reset_powerup() + + def doCommand(self, command, params): + print HidenXCS.__name__, "Command:", command, params + return HidenDevice.doCommand(self, command, params) + + def doQuery(self, command, params): + print HidenXCS.__name__, "Query:", command, params + return HidenDevice.doQuery(self, command, params) + + def reset_powerup(self): + print HidenXCS.__name__, "reset_powerup" + self.LAST_ITERATION = 0 + + def doIteration(self): + delta_time = time.time() - self.LAST_ITERATION + if delta_time < 1: + return + #print "DoIteration:", delta_time + self.LAST_ITERATION = time.time() + self.flow = 0.0 + for idx in self.CONFIG_LOOPS: + if self.DigitalOut[2]: + self.Loops[idx].setpoint = 500.0 * float(self.AnalogOut[idx]) / 4096.0 + else: + self.Loops[idx].setpoint = 0.0 + self.Loops[idx].doIteration() + self.AnalogIn[idx] = int(self.Loops[idx].sensor / 500.0 * 65536.0) + self.flow += self.Loops[idx].sensor + self.temperature = 50.00 + if self.DigitalOut[2] and (self.AnalogIn[0] + self.AnalogIn[1] + self.AnalogIn[2]) > 0: + self.humidity = 100.0 + self.humidity *= self.AnalogIn[1] + self.humidity /= (self.AnalogIn[0] + self.AnalogIn[1] + self.AnalogIn[2]) + + def doCommandAOUT(self, cmd, args): + idx = int(args[0]) + if idx in self.AnalogOut: + self.AnalogOut[idx] = int(args[1]) + else: + print "Unknown index" + print "Command: \"AOUT\" in \"" + cmd + ":" + ":".join(args) + "\"" + + def doCommandDOUT(self, cmd, args): + idx = int(args[0]) + if idx in self.DigitalOut: + self.DigitalOut[idx] = int(args[1]) + else: + print "Unknown index" + print "Command: \"DOUT\" in \"" + cmd + ":" + ":".join(args) + "\"" + + def doQueryAIN(self, cmd, args): + idx = int(args[0]) + if idx in self.AnalogIn: + self.write("AIN = %d" % self.AnalogIn[idx]) + else: + self.write("AIN = %d" % 0) + print "Query: \"AIN\" in \"" + cmd + ":" + ":".join(args) + "\"" + + def doQueryAOUT(self, cmd, args): + idx = int(args[0]) + if idx in self.AnalogOut: + self.write("AOUT = %d" % self.AnalogOut[idx]) + else: + self.write("AOUT = %d" % 0) + print "Query: \"AOUT\" in \"" + cmd + ":" + ":".join(args) + "\"" + + def doQueryDOUT(self, cmd, args): + idx = int(args[0]) + if idx in self.DigitalOut: + self.write("DOUT = %d" % self.DigitalOut[idx]) + else: + self.write("DOUT = %d" % 0) + print "Query: \"DOUT\" in \"" + cmd + ":" + ":".join(args) + "\"" + + def doQueryALL(self, cmd, args): + line = "A " + line += "%4.2f" % self.humidity + line += ",%4.2f" % self.temperature + line += ",5051.5" + line += ",%5.2f" % (500.0 * self.AnalogIn[0] / 65536.0) + line += ",%5.2f" % (500.0 * self.AnalogIn[1] / 65536.0) + line += ",%5.2f" % (500.0 * self.AnalogIn[2] / 65536.0) + line += ",%-99.59,1100" + self.write(line) + print "Query: \"ALL\" in \"" + cmd + ":" + ":".join(args) + "\"" + +if __name__ == '__main__': + from HidenProtocol import HidenProtocol + + class TestFactory: + def __init__(self): + print self.__class__.__name__, "ctor" + self.numProtocols = 0 + def write(self, data): + print "test write:", data, + def loseConnection(self): + print "test lose connection" + test_factory = TestFactory() + test_device = HidenXCS() + test_protocol = HidenProtocol(test_device, "\n") + test_protocol.factory = test_factory + test_protocol.transport = test_factory + test_device.protocol = test_protocol + test_device.protocol.connectionMade() + commands = [ + "!DOUT,2,1", + "?DOUT,2", + "!AOUT,2,1", + "?AOUT,2", + "?AIN,2", + "?ALL DATA"] + for cmd in commands: + test_device.protocol.dataReceived(cmd) + test_device.protocol.dataReceived(test_protocol.term) + test_device.protocol.connectionLost("Dunno") diff --git a/site_ansto/instrument/TEST_SICS/fakeHiden/xcs/SIM_HIDEN.py b/site_ansto/instrument/TEST_SICS/fakeHiden/xcs/SIM_HIDEN.py new file mode 100755 index 00000000..48a55859 --- /dev/null +++ b/site_ansto/instrument/TEST_SICS/fakeHiden/xcs/SIM_HIDEN.py @@ -0,0 +1,141 @@ +#!/usr/bin/env python +# vim: ft=python ts=8 sts=4 sw=4 expandtab autoindent smartindent nocindent +# Author: Douglas Clowes (dcl@ansto.gov.au) 2013-06-03 + +from twisted.internet.task import LoopingCall +from twisted.internet import reactor +from twisted.python import log, usage + +from HidenXCS import HidenXCS as MYBASE +from HidenFactory import HidenFactory +from HidenProtocol import HidenProtocol +import os +import sys + +sys.path.insert(0, os.path.realpath(os.path.abspath(os.path.join(os.path.dirname(sys.argv[0]),"../../../util")))) +from displayscreen import Screen + +class MyOptions(usage.Options): + optFlags = [ + ["window", "w", "Create a display window"], + ] + optParameters = [ + ["logfile", "l", None, "output logfile name"], + ["port", "p", None, "port number to listen on"], + ] + def __init__(self): + usage.Options.__init__(self) + self['files'] = [] + + def parseArgs(self, *args): + for arg in args: + self['files'].append(arg) + +class MyScreen(Screen): + def __init__(self, stdscr): + Screen.__init__(self, stdscr) + + def sendLine(self, txt): + global myDev + myDev.protocol = self + myDev.dataReceived(txt) + + def write(self, txt): + try: + newLine = self.lines[-1] + " => " + txt + del self.lines[-1] + self.addLine(newLine) + except: + pass + +class MYDEV(MYBASE): + def __init__(self): + MYBASE.__init__(self) + print MYDEV.__name__, "ctor" + +def device_display(): + global screen, myDev, myOpts, myPort, myFactory + try: + myDev.doIteration(); + except: + raise + + if not myOpts["window"]: + return + + try: + rows, cols = screen.stdscr.getmaxyx() + screen.stdscr.addstr(0, 0, "Lnks:%2d" % myFactory.numProtocols) + screen.stdscr.addstr(0, 10, "Rnd:%6.3f" % myDev.RANDOM) + screen.stdscr.addstr(0, 22, "Identity : %s (%d)" % (myDev.IDN, myPort)) + screen.stdscr.addstr(1, 10, "Flow : %6.3f" % myDev.flow) + screen.stdscr.addstr(1, 30, "Humidity : %6.3f" % myDev.humidity) + screen.stdscr.addstr(1, 50, "Temperature : %6.3f" % myDev.temperature) + base = 1 + screen.stdscr.addstr(base + 1, 0, "Sensor :") + screen.stdscr.addstr(base + 2, 0, "PV :") + screen.stdscr.addstr(base + 3, 0, "Setpoint :") + screen.stdscr.addstr(base + 4, 0, "T Delta :") + screen.stdscr.addstr(base + 5, 0, "PV Delta :") + for idx in myDev.CONFIG_LOOPS: + if 12 + (idx) * 12 > cols - 1: + break + screen.stdscr.addstr(base + 1, 12 + (idx) * 12, "%8.3f" % myDev.Loops[idx].sensor) + screen.stdscr.addstr(base + 2, 12 + (idx) * 12, "%8.3f" % myDev.Loops[idx].pv) + screen.stdscr.addstr(base + 3, 12 + (idx) * 12, "%8.3f" % myDev.Loops[idx].setpoint) + screen.stdscr.addstr(base + 4, 12 + (idx) * 12, "%8.3f" % (myDev.Loops[idx].setpoint - myDev.Loops[idx].sensor)) + screen.stdscr.addstr(base + 5, 12 + (idx) * 12, "%8.3f" % (myDev.Loops[idx].setpoint - myDev.Loops[idx].pid_delta)) + except: + pass + finally: + try: + screen.stdscr.refresh() + except: + pass + +if __name__ == "__main__": + global screen, myDev, myOpts, myPort, myFactory + + myOpts = MyOptions() + try: + myOpts.parseOptions() + except usage.UsageError, errortext: + print '%s: %s' % (sys.argv[0], errortext) + print '%s: Try --help for usage details.' % (sys.argv[0]) + raise SystemExit, 1 + + myDev = MYDEV() + default_port = 4020 + myPort = default_port + logfile = None + + if myOpts["port"]: + myPort = int(myOpts["port"]) + if myPort < 1025 or myPort > 65535: + myPort = default_port + + if myOpts["window"]: + logfile = "/tmp/Fake_Hiden_%d.log" % (myPort) + + if myOpts["logfile"]: + logfile = myOpts["logfile"] + + if logfile: + log.startLogging(open(logfile, "w")) + else: + log.startLogging(sys.stdout) + #log.startLogging(sys.stderr) + + if myOpts["window"]: + import curses + + stdscr = curses.initscr() + screen = MyScreen(stdscr) + # add screen object as a reader to the reactor + reactor.addReader(screen) + + myFactory = HidenFactory(HidenProtocol, myDev, "\n") + lc = LoopingCall(device_display) + lc.start(0.250) + reactor.listenTCP(myPort, myFactory) # server + reactor.run()