181 lines
6.1 KiB
Python
181 lines
6.1 KiB
Python
# vim: ft=python ts=8 sts=4 sw=4 expandtab autoindent smartindent nocindent
|
|
# Fake Hiden Temperature Controller
|
|
#
|
|
# Author: Douglas Clowes 2014
|
|
#
|
|
from HidenDevice import HidenDevice
|
|
import random
|
|
import re
|
|
import os
|
|
import sys
|
|
import time
|
|
|
|
sys.path.insert(0, os.path.realpath(os.path.abspath(os.path.join(os.path.dirname(sys.argv[0]),"../../../util"))))
|
|
from fopdt import fopdt, fopdt_sink
|
|
from pid import PID
|
|
|
|
class Loop(fopdt):
|
|
def __init__(self, the_temp, the_nick):
|
|
fopdt.__init__(self, the_temp)
|
|
self.setpoint = the_temp
|
|
self.sensor = the_temp
|
|
# P, I, D
|
|
self.pid = PID(0.05, 0.02, 1.2)
|
|
self.pid.setPoint(the_temp)
|
|
# Value, Kp, Tp, Td, Absolute
|
|
self.AddSource(fopdt_sink(0, 2, 13, 10, True))
|
|
self.AddSink(fopdt_sink(the_temp, 1, 30, 1, False))
|
|
self.power = 0
|
|
self.nick = the_nick
|
|
self.count = 0
|
|
|
|
def Setpoint(self, the_sp):
|
|
self.setpoint = the_sp
|
|
self.pid.setPoint(the_sp)
|
|
|
|
def doIteration(self):
|
|
self.pid_delta = self.pid.update(self.pv)
|
|
self.sources[0].value = self.pid_delta
|
|
if self.sources[0].value > 100.0:
|
|
self.sources[0].value = 100.0
|
|
if self.sources[0].value < 0.0:
|
|
self.sources[0].value = 0.0
|
|
self.count += 1
|
|
self.iterate(self.count)
|
|
self.sensor = 0.9 * self.sensor + 0.1 * self.setpoint
|
|
|
|
class HidenXCS(HidenDevice):
|
|
"""Hiden XCS humidity controller object - simulates the device"""
|
|
|
|
def __init__(self):
|
|
HidenDevice.__init__(self)
|
|
print HidenXCS.__name__, "ctor"
|
|
self.RANDOM = 0.0
|
|
self.IDN = "Simulated Hiden XCS"
|
|
self.CONFIG_LOOPS = [0, 1, 2]
|
|
self.AnalogIn = {0:0, 1:0, 2:0, 8:0, 9:0, 12:0}
|
|
self.AnalogOut = {0:0, 1:0, 2:0}
|
|
self.DigitalOut = {2:0}
|
|
self.humidity = 0.0
|
|
self.temperature = 49.22
|
|
self.Loops = {}
|
|
self.Loops[0] = Loop(0, "Dry")
|
|
self.Loops[1] = Loop(0, "Wet")
|
|
self.Loops[2] = Loop(0, "Special")
|
|
self.reset_powerup()
|
|
|
|
def doCommand(self, command, params):
|
|
print HidenXCS.__name__, "Command:", command, params
|
|
return HidenDevice.doCommand(self, command, params)
|
|
|
|
def doQuery(self, command, params):
|
|
print HidenXCS.__name__, "Query:", command, params
|
|
return HidenDevice.doQuery(self, command, params)
|
|
|
|
def reset_powerup(self):
|
|
print HidenXCS.__name__, "reset_powerup"
|
|
self.LAST_ITERATION = 0
|
|
|
|
def doIteration(self):
|
|
delta_time = time.time() - self.LAST_ITERATION
|
|
if delta_time < 1:
|
|
return
|
|
#print "DoIteration:", delta_time
|
|
self.LAST_ITERATION = time.time()
|
|
self.flow = 0.0
|
|
for idx in self.CONFIG_LOOPS:
|
|
if self.DigitalOut[2]:
|
|
self.Loops[idx].setpoint = 500.0 * float(self.AnalogOut[idx]) / 4096.0
|
|
else:
|
|
self.Loops[idx].setpoint = 0.0
|
|
self.Loops[idx].doIteration()
|
|
self.AnalogIn[idx] = int(self.Loops[idx].sensor / 500.0 * 65536.0)
|
|
self.flow += self.Loops[idx].sensor
|
|
self.temperature = 50.00
|
|
if self.DigitalOut[2] and (self.AnalogIn[0] + self.AnalogIn[1] + self.AnalogIn[2]) > 0:
|
|
self.humidity = 100.0
|
|
self.humidity *= self.AnalogIn[1]
|
|
self.humidity /= (self.AnalogIn[0] + self.AnalogIn[1] + self.AnalogIn[2])
|
|
|
|
def doCommandAOUT(self, cmd, args):
|
|
idx = int(args[0])
|
|
if idx in self.AnalogOut:
|
|
self.AnalogOut[idx] = int(args[1])
|
|
else:
|
|
print "Unknown index"
|
|
print "Command: \"AOUT\" in \"" + cmd + ":" + ":".join(args) + "\""
|
|
|
|
def doCommandDOUT(self, cmd, args):
|
|
idx = int(args[0])
|
|
if idx in self.DigitalOut:
|
|
self.DigitalOut[idx] = int(args[1])
|
|
else:
|
|
print "Unknown index"
|
|
print "Command: \"DOUT\" in \"" + cmd + ":" + ":".join(args) + "\""
|
|
|
|
def doQueryAIN(self, cmd, args):
|
|
idx = int(args[0])
|
|
if idx in self.AnalogIn:
|
|
self.write("AIN = %d" % self.AnalogIn[idx])
|
|
else:
|
|
self.write("AIN = %d" % 0)
|
|
print "Query: \"AIN\" in \"" + cmd + ":" + ":".join(args) + "\""
|
|
|
|
def doQueryAOUT(self, cmd, args):
|
|
idx = int(args[0])
|
|
if idx in self.AnalogOut:
|
|
self.write("AOUT = %d" % self.AnalogOut[idx])
|
|
else:
|
|
self.write("AOUT = %d" % 0)
|
|
print "Query: \"AOUT\" in \"" + cmd + ":" + ":".join(args) + "\""
|
|
|
|
def doQueryDOUT(self, cmd, args):
|
|
idx = int(args[0])
|
|
if idx in self.DigitalOut:
|
|
self.write("DOUT = %d" % self.DigitalOut[idx])
|
|
else:
|
|
self.write("DOUT = %d" % 0)
|
|
print "Query: \"DOUT\" in \"" + cmd + ":" + ":".join(args) + "\""
|
|
|
|
def doQueryALL(self, cmd, args):
|
|
line = "A "
|
|
line += "%4.2f" % self.humidity
|
|
line += ",%4.2f" % self.temperature
|
|
line += ",5051.5"
|
|
line += ",%5.2f" % (500.0 * self.AnalogIn[0] / 65536.0)
|
|
line += ",%5.2f" % (500.0 * self.AnalogIn[1] / 65536.0)
|
|
line += ",%5.2f" % (500.0 * self.AnalogIn[2] / 65536.0)
|
|
line += ",%-99.59,1100"
|
|
self.write(line)
|
|
print "Query: \"ALL\" in \"" + cmd + ":" + ":".join(args) + "\""
|
|
|
|
if __name__ == '__main__':
|
|
from HidenProtocol import HidenProtocol
|
|
|
|
class TestFactory:
|
|
def __init__(self):
|
|
print self.__class__.__name__, "ctor"
|
|
self.numProtocols = 0
|
|
def write(self, data):
|
|
print "test write:", data,
|
|
def loseConnection(self):
|
|
print "test lose connection"
|
|
test_factory = TestFactory()
|
|
test_device = HidenXCS()
|
|
test_protocol = HidenProtocol(test_device, "\n")
|
|
test_protocol.factory = test_factory
|
|
test_protocol.transport = test_factory
|
|
test_device.protocol = test_protocol
|
|
test_device.protocol.connectionMade()
|
|
commands = [
|
|
"!DOUT,2,1",
|
|
"?DOUT,2",
|
|
"!AOUT,2,1",
|
|
"?AOUT,2",
|
|
"?AIN,2",
|
|
"?ALL DATA"]
|
|
for cmd in commands:
|
|
test_device.protocol.dataReceived(cmd)
|
|
test_device.protocol.dataReceived(test_protocol.term)
|
|
test_device.protocol.connectionLost("Dunno")
|