Add the FOPDT and PID modules
The First Order Plus Delay Time module is a model of a controlled system that is useful in modelling and simulating process control systems. The Proportional Integral Differential module is a control system that is useful in controlling process control systems. This is not fully functional yet.
This commit is contained in:
123
site_ansto/instrument/util/fopdt.py
Normal file
123
site_ansto/instrument/util/fopdt.py
Normal file
@@ -0,0 +1,123 @@
|
|||||||
|
#!/usr/bin/env python
|
||||||
|
# vim: ft=python ts=8 sts=4 sw=4 expandtab autoindent smartindent nocindent
|
||||||
|
# Classes for device simulation using: First Order Plus Delay Time (FOPDT)
|
||||||
|
#
|
||||||
|
# Author: Unknown
|
||||||
|
# Finder: Douglas Clowes
|
||||||
|
"""
|
||||||
|
The class fopdt is a container for one or more fopdt_sink objects.
|
||||||
|
|
||||||
|
I am guessing that an fopdt_sink is a FOPDT source such as a heater element
|
||||||
|
or FOPDT sink such as a heat leak to the environment.
|
||||||
|
|
||||||
|
TODO:
|
||||||
|
* look into the isinstance block in getAbsolute
|
||||||
|
because there doesn't seem to be a 'current_value' anywhere
|
||||||
|
"""
|
||||||
|
from math import exp
|
||||||
|
|
||||||
|
class fopdt_sink:
|
||||||
|
def __init__(self, value, Kp, Tp, Td, absolute=False):
|
||||||
|
self.value = value
|
||||||
|
self.absolute = absolute
|
||||||
|
self.Kp = Kp
|
||||||
|
self.Tp = Tp
|
||||||
|
self.Td = Td
|
||||||
|
self.vmap = dict()
|
||||||
|
|
||||||
|
def getAbsolute(self):
|
||||||
|
if isinstance(self.value, fopdt):
|
||||||
|
result = self.value.current_value
|
||||||
|
else:
|
||||||
|
result = self.value
|
||||||
|
return result
|
||||||
|
|
||||||
|
def getValue(self, tm, current):
|
||||||
|
result = self.getAbsolute()
|
||||||
|
|
||||||
|
# Do the timeshifting implied by Td
|
||||||
|
t2 = round(tm)
|
||||||
|
t1 = round(tm - self.Td)
|
||||||
|
if not t2 in self.vmap:
|
||||||
|
self.vmap[t2] = result
|
||||||
|
for key in sorted(self.vmap.keys()):
|
||||||
|
if key < t1:
|
||||||
|
del self.vmap[key]
|
||||||
|
else:
|
||||||
|
break
|
||||||
|
if t1 in self.vmap:
|
||||||
|
result = self.vmap[t1]
|
||||||
|
else:
|
||||||
|
result = self.vmap[sorted(self.vmap.keys())[0]]
|
||||||
|
|
||||||
|
# TODO should this go before timeshifting?
|
||||||
|
if not self.absolute:
|
||||||
|
result = result - current
|
||||||
|
|
||||||
|
return result
|
||||||
|
|
||||||
|
def getDelta(self, tm, current):
|
||||||
|
value = self.getValue(tm, current)
|
||||||
|
return (1 - exp(-1.0/self.Tp)) * self.Kp * value
|
||||||
|
|
||||||
|
class fopdt:
|
||||||
|
|
||||||
|
def __init__(self, pv):
|
||||||
|
self.pv = pv
|
||||||
|
self.sources = []
|
||||||
|
self.sinks = []
|
||||||
|
|
||||||
|
def AddSource(self, source):
|
||||||
|
self.sources.append(source)
|
||||||
|
|
||||||
|
def RemSource(self, source):
|
||||||
|
if source in self.sources:
|
||||||
|
self.sources.remove(source)
|
||||||
|
|
||||||
|
def AddSink(self, sink):
|
||||||
|
self.sinks.append(sink)
|
||||||
|
|
||||||
|
def RemSink(self, sink):
|
||||||
|
if sink in self.sinks:
|
||||||
|
self.sinks.remove(sink)
|
||||||
|
|
||||||
|
def iterate(self, tm):
|
||||||
|
self.source_delta = 0.0
|
||||||
|
self.sink_delta = 0.0
|
||||||
|
for sink in self.sources:
|
||||||
|
self.source_delta = self.source_delta + sink.getDelta(tm, self.pv)
|
||||||
|
for sink in self.sinks:
|
||||||
|
self.sink_delta = self.sink_delta + sink.getDelta(tm, self.pv)
|
||||||
|
self.old_value = self.pv
|
||||||
|
self.new_value = self.pv + self.source_delta + self.sink_delta
|
||||||
|
self.pv = self.new_value
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
dev = fopdt(20)
|
||||||
|
source = fopdt_sink(20, 2, 13, 10, False)
|
||||||
|
dev.AddSource(source)
|
||||||
|
dev.AddSource(source)
|
||||||
|
dev.AddSource(source)
|
||||||
|
dev.AddSource(source)
|
||||||
|
sink = fopdt_sink(20, 1, 30, 1, False)
|
||||||
|
dev.AddSink(sink)
|
||||||
|
dev.AddSink(sink)
|
||||||
|
min = max = dev.pv
|
||||||
|
fd = open("test.csv", "w")
|
||||||
|
fd.write("Time,value,source,source_v,sink,sink_v\n")
|
||||||
|
for i in range(0,300+ 1):
|
||||||
|
if i == 15:
|
||||||
|
source.value = 30
|
||||||
|
elif i == 45:
|
||||||
|
source.value = 10
|
||||||
|
dev.iterate(i)
|
||||||
|
current = dev.pv
|
||||||
|
if current > max:
|
||||||
|
max = current
|
||||||
|
if current < min:
|
||||||
|
min = current
|
||||||
|
#print "%3d: %6.3f = %6.3f + %6.3f + %6.3f" % ( i, current, prev, delta_in, delta_out )
|
||||||
|
line = "%d,%.3f,%.3f,%.3f,%.3f,%.3f" % (i, current, source.value, dev.source_delta, sink.value, dev.sink_delta)
|
||||||
|
fd.write(line + "\n")
|
||||||
|
fd.close()
|
||||||
|
print "Now: %6.3f, Min: %6.3f, Max: %6.3f" % (current, min, max)
|
||||||
119
site_ansto/instrument/util/pid.py
Normal file
119
site_ansto/instrument/util/pid.py
Normal file
@@ -0,0 +1,119 @@
|
|||||||
|
#!/usr/bin/env python
|
||||||
|
# vim: ft=python ts=8 sts=4 sw=4 expandtab autoindent smartindent nocindent
|
||||||
|
#
|
||||||
|
# Class for device simulation using: Proportional-Integral-Derivative (PID)
|
||||||
|
#
|
||||||
|
# This recipe gives simple implementation of a Discrete
|
||||||
|
# Proportional-Integral-Derivative (PID) controller.
|
||||||
|
#
|
||||||
|
# PID controller gives output value for error between desired reference input and
|
||||||
|
# measurement feedback to minimize error value.
|
||||||
|
#
|
||||||
|
# More information: http://en.wikipedia.org/wiki/PID_controller
|
||||||
|
#
|
||||||
|
# cnr437@gmail.com
|
||||||
|
#
|
||||||
|
####### Example #########
|
||||||
|
#
|
||||||
|
# p=PID(3.0,0.4,1.2)
|
||||||
|
# p.setPoint(5.0)
|
||||||
|
# while True:
|
||||||
|
# pid = p.update(measurement_value)
|
||||||
|
#
|
||||||
|
#
|
||||||
|
"""
|
||||||
|
TODO:
|
||||||
|
* Look into making the update time based because the logic seems to assume constant (1 second) time
|
||||||
|
- Derivator should be dE/dT but is just dE
|
||||||
|
- Integrator should be sigma(dT*E) but is just sigma(E)
|
||||||
|
* Look into making the Derivator based on changes in the PV instead of the Error
|
||||||
|
"""
|
||||||
|
|
||||||
|
class PID:
|
||||||
|
"""
|
||||||
|
Discrete PID control
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, P=2.0, I=0.0, D=1.0, Derivator=0, Integrator=0, Integrator_max=500, Integrator_min=-500):
|
||||||
|
|
||||||
|
self.Kp=P
|
||||||
|
self.Ki=I
|
||||||
|
self.Kd=D
|
||||||
|
self.Derivator=Derivator
|
||||||
|
self.Integrator=Integrator
|
||||||
|
self.Integrator_max=Integrator_max
|
||||||
|
self.Integrator_min=Integrator_min
|
||||||
|
|
||||||
|
self.set_point=0.0
|
||||||
|
self.error=0.0
|
||||||
|
|
||||||
|
def update(self,current_value):
|
||||||
|
"""
|
||||||
|
Calculate PID output value for given reference input and feedback
|
||||||
|
"""
|
||||||
|
|
||||||
|
self.error = self.set_point - current_value
|
||||||
|
|
||||||
|
self.P_value = self.Kp * self.error
|
||||||
|
# TODO: check the sign is correct
|
||||||
|
self.D_value = self.Kd * (current_value - self.Derivator)
|
||||||
|
self.Derivator = current_value
|
||||||
|
|
||||||
|
self.Integrator = self.Integrator + self.error
|
||||||
|
|
||||||
|
if self.Integrator > self.Integrator_max:
|
||||||
|
self.Integrator = self.Integrator_max
|
||||||
|
elif self.Integrator < self.Integrator_min:
|
||||||
|
self.Integrator = self.Integrator_min
|
||||||
|
|
||||||
|
self.I_value = self.Integrator * self.Ki
|
||||||
|
|
||||||
|
PID = self.P_value + self.I_value + self.D_value
|
||||||
|
|
||||||
|
return PID
|
||||||
|
|
||||||
|
def setPoint(self,set_point):
|
||||||
|
"""
|
||||||
|
Initilize the setpoint of PID
|
||||||
|
"""
|
||||||
|
self.set_point = set_point
|
||||||
|
self.Integrator=0
|
||||||
|
self.Derivator=0
|
||||||
|
|
||||||
|
def setIntegrator(self, Integrator):
|
||||||
|
self.Integrator = Integrator
|
||||||
|
|
||||||
|
def setDerivator(self, Derivator):
|
||||||
|
self.Derivator = Derivator
|
||||||
|
|
||||||
|
def setKp(self,P):
|
||||||
|
self.Kp=P
|
||||||
|
|
||||||
|
def setKi(self,I):
|
||||||
|
self.Ki=I
|
||||||
|
|
||||||
|
def setKd(self,D):
|
||||||
|
self.Kd=D
|
||||||
|
|
||||||
|
def getPoint(self):
|
||||||
|
return self.set_point
|
||||||
|
|
||||||
|
def getError(self):
|
||||||
|
return self.error
|
||||||
|
|
||||||
|
def getIntegrator(self):
|
||||||
|
return self.Integrator
|
||||||
|
|
||||||
|
def getDerivator(self):
|
||||||
|
return self.Derivator
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
#p=PID(3.0,0.4,1.2)
|
||||||
|
#p.setPoint(5.0)
|
||||||
|
#while True:
|
||||||
|
# pid = p.update(measurement_value)
|
||||||
|
p = PID(3.0, 0.4, 1.2)
|
||||||
|
p.setPoint(20.0)
|
||||||
|
pv = 20
|
||||||
|
for i in range(0,61):
|
||||||
|
print p.update(pv)
|
||||||
Reference in New Issue
Block a user