# vim: ft=python ts=8 sts=4 sw=4 expandtab autoindent smartindent nocindent
# Fake Hiden XCS Humidity Controller
#
# Author: Douglas Clowes 2014
#
from HidenDevice import HidenDevice
import random
import re
import os
import sys
import time
sys.path.insert(0, os.path.realpath(os.path.abspath(os.path.join(os.path.dirname(sys.argv[0]),"../../../util"))))
from fopdt import fopdt, fopdt_sink
from pid import PID


class Loop(fopdt):
    """One simulated control loop, built on the ``fopdt`` process model.

    Attributes set here:
        setpoint -- target value for the loop
        sensor   -- simulated reading; moves 10% of the way towards
                    ``setpoint`` on every ``doIteration`` call
        pid      -- PID controller whose output drives ``sources[0]``
        power    -- initialised to 0 (not otherwise used in this file)
        nick     -- nickname given by the creator
        count    -- number of iterations performed so far
    """

    def __init__(self, the_temp, the_nick):
        """Create a loop whose process value, setpoint and sensor start at
        ``the_temp`` and whose nickname is ``the_nick``."""
        fopdt.__init__(self, the_temp)
        self.setpoint = the_temp
        self.sensor = the_temp
        # P, I, D
        self.pid = PID(0.05, 0.02, 1.2)
        self.pid.setPoint(the_temp)
        # Value, Kp, Tp, Td, Absolute
        self.AddSource(fopdt_sink(0, 2, 13, 10, True))
        self.AddSink(fopdt_sink(the_temp, 1, 30, 1, False))
        self.power = 0
        self.nick = the_nick
        self.count = 0

    def Setpoint(self, the_sp):
        """Set the loop target and pass the same value to the PID."""
        self.setpoint = the_sp
        self.pid.setPoint(the_sp)

    def doIteration(self):
        """Advance the loop by one step.

        The PID output (from the current ``pv``) is limited to the range
        0.0..100.0 and applied to the first source, the process model is
        iterated, and ``sensor`` is moved towards ``setpoint``.
        """
        self.pid_delta = self.pid.update(self.pv)
        drive = self.pid_delta
        # Two independent range checks, exactly as before (not min/max, so
        # the value's type and any NaN pass through unchanged).
        if drive > 100.0:
            drive = 100.0
        if drive < 0.0:
            drive = 0.0
        self.sources[0].value = drive
        self.count += 1
        self.iterate(self.count)
        # The reported sensor follows the setpoint directly; it is not
        # derived from the process value computed by iterate().
        self.sensor = 0.9 * self.sensor + 0.1 * self.setpoint


class HidenXCS(HidenDevice):
    """Hiden XCS humidity controller object - simulates the device"""

    def __init__(self):
        """Set up the channel tables and the three loops, then call
        ``reset_powerup``."""
        HidenDevice.__init__(self)
        print(HidenXCS.__name__ + " ctor")
        self.RANDOM = 0.0
        self.IDN = "Simulated Hiden XCS"
        self.CONFIG_LOOPS = [0, 1, 2]
        # Channel tables: index -> raw integer value.
        self.AnalogIn = {0:0, 1:0, 2:0, 8:0, 9:0, 12:0}
        self.AnalogOut = {0:0, 1:0, 2:0}
        self.DigitalOut = {2:0}
        self.humidity = 0.0
        self.temperature = 49.22
        # Defined up front so the attribute exists before the first
        # iteration; doIteration recomputes it each time it runs.
        self.flow = 0.0
        self.Loops = {}
        self.Loops[0] = Loop(0, "Dry")
        self.Loops[1] = Loop(0, "Wet")
        self.Loops[2] = Loop(0, "Special")
        self.reset_powerup()

    def doCommand(self, command, params):
        """Trace the command, then let ``HidenDevice`` handle it."""
        print("%s Command: %s %s" % (HidenXCS.__name__, command, params))
        return HidenDevice.doCommand(self, command, params)

    def doQuery(self, command, params):
        """Trace the query, then let ``HidenDevice`` handle it."""
        print("%s Query: %s %s" % (HidenXCS.__name__, command, params))
        return HidenDevice.doQuery(self, command, params)

    def reset_powerup(self):
        """Reset the iteration timestamp so the next ``doIteration`` runs."""
        print(HidenXCS.__name__ + " reset_powerup")
        self.LAST_ITERATION = 0

    def doIteration(self):
        """Update the simulation; does nothing if less than one second has
        passed since the previous update.

        For each configured loop the setpoint is taken from the matching
        analog output (scaled 500.0 / 4096.0) while digital output 2 is
        set, otherwise 0.0; the loop is stepped and its sensor value is
        stored in the analog input table (scaled 65536.0 / 500.0).
        ``flow`` becomes the sum of the loop sensors, ``temperature`` is
        set to 50.00 and, while digital output 2 is set and the inputs do
        not sum to zero, ``humidity`` becomes analog input 1 as a
        percentage of analog inputs 0 + 1 + 2.
        """
        now = time.time()
        if now - self.LAST_ITERATION < 1:
            return
        #print "DoIteration:", delta_time
        self.LAST_ITERATION = now
        self.flow = 0.0
        enabled = self.DigitalOut[2]
        for idx in self.CONFIG_LOOPS:
            loop = self.Loops[idx]
            # NOTE(review): the attribute is assigned directly, so
            # Loop.Setpoint() is bypassed and the loop's PID set-point is
            # not updated here - confirm this is intended.
            if enabled:
                loop.setpoint = 500.0 * float(self.AnalogOut[idx]) / 4096.0
            else:
                loop.setpoint = 0.0
            loop.doIteration()
            self.AnalogIn[idx] = int(loop.sensor / 500.0 * 65536.0)
            self.flow += loop.sensor
        self.temperature = 50.00
        total = self.AnalogIn[0] + self.AnalogIn[1] + self.AnalogIn[2]
        # humidity keeps its previous value when this condition is false.
        if enabled and total > 0:
            self.humidity = 100.0 * self.AnalogIn[1] / total

    def _set_channel(self, name, table, cmd, args):
        """Store ``int(args[1])`` in ``table`` at index ``int(args[0])`` if
        that index exists, otherwise report an unknown index; then trace
        the command.

        Raises IndexError / ValueError if the arguments are missing or are
        not integers (the value is only parsed for a known index).
        """
        idx = int(args[0])
        if idx in table:
            table[idx] = int(args[1])
        else:
            print("Unknown index")
        print("Command: \"" + name + "\" in \"" + cmd + ":" + ":".join(args) + "\"")

    def _report_channel(self, name, table, cmd, args):
        """Write ``"<name> = <value>"`` for index ``int(args[0])`` of
        ``table`` (0 for an unknown index), then trace the query.

        Raises IndexError / ValueError if the index argument is missing or
        is not an integer.
        """
        idx = int(args[0])
        self.write("%s = %d" % (name, table.get(idx, 0)))
        print("Query: \"" + name + "\" in \"" + cmd + ":" + ":".join(args) + "\"")

    def doCommandAOUT(self, cmd, args):
        """Handle the AOUT command: set an analog output."""
        self._set_channel("AOUT", self.AnalogOut, cmd, args)

    def doCommandDOUT(self, cmd, args):
        """Handle the DOUT command: set a digital output."""
        self._set_channel("DOUT", self.DigitalOut, cmd, args)

    def doQueryAIN(self, cmd, args):
        """Handle the AIN query: report an analog input."""
        self._report_channel("AIN", self.AnalogIn, cmd, args)

    def doQueryAOUT(self, cmd, args):
        """Handle the AOUT query: report an analog output."""
        self._report_channel("AOUT", self.AnalogOut, cmd, args)

    def doQueryDOUT(self, cmd, args):
        """Handle the DOUT query: report a digital output."""
        self._report_channel("DOUT", self.DigitalOut, cmd, args)

    def doQueryALL(self, cmd, args):
        """Handle the ALL query: write one comma-separated status line
        holding humidity, temperature, a fixed "5051.5" field, analog
        inputs 0..2 scaled by 500.0 / 65536.0, and a fixed tail."""
        fields = ["%4.2f" % self.humidity, "%4.2f" % self.temperature, "5051.5"]
        for idx in (0, 1, 2):
            fields.append("%5.2f" % (500.0 * self.AnalogIn[idx] / 65536.0))
        # NOTE(review): these two trailing fields are sent literally (the
        # "%" is not a format operator here) - confirm against the real
        # device's reply.
        fields.append("%-99.59")
        fields.append("1100")
        line = "A " + ",".join(fields)
        self.write(line)
        print("Query: \"ALL\" in \"" + cmd + ":" + ":".join(args) + "\"")


if __name__ == '__main__':
    from HidenProtocol import HidenProtocol

    class TestFactory:
        """Minimal stand-in used as both factory and transport."""

        def __init__(self):
            print(self.__class__.__name__ + " ctor")
            self.numProtocols = 0

        def write(self, data):
            # Approximates the former trailing-comma print: no newline is
            # added, and a separating space follows unless the text already
            # ends a line.
            text = "test write: %s" % (data,)
            if not text.endswith("\n"):
                text += " "
            sys.stdout.write(text)

        def loseConnection(self):
            print("test lose connection")

    test_factory = TestFactory()
    test_device = HidenXCS()
    test_protocol = HidenProtocol(test_device, "\n")
    test_protocol.factory = test_factory
    test_protocol.transport = test_factory
    test_device.protocol = test_protocol
    test_device.protocol.connectionMade()
    commands = [
        "!DOUT,2,1",
        "?DOUT,2",
        "!AOUT,2,1",
        "?AOUT,2",
        "?AIN,2",
        "?ALL DATA"]
    for cmd in commands:
        test_device.protocol.dataReceived(cmd)
        test_device.protocol.dataReceived(test_protocol.term)
    test_device.protocol.connectionLost("Dunno")