diff --git a/site_ansto/instrument/util/fopdt.py b/site_ansto/instrument/util/fopdt.py new file mode 100644 index 00000000..5764e8f9 --- /dev/null +++ b/site_ansto/instrument/util/fopdt.py @@ -0,0 +1,123 @@ +#!/usr/bin/env python +# vim: ft=python ts=8 sts=4 sw=4 expandtab autoindent smartindent nocindent +# Classes for device simulation using: First Order Plus Delay Time (FOPDT) +# +# Author: Unknown +# Finder: Douglas Clowes +""" +The class fopdt is a container for one or more fopdt_sink objects. + +I am guessing that an fopdt_sink is a FOPDT source such as a heater element +or FOPDT sink such as a heat leak to the environment. + +TODO: +* look into the isinstance block in getAbsolute + because there doesn't seem to be a 'current_value' anywhere +""" +from math import exp + +class fopdt_sink: + def __init__(self, value, Kp, Tp, Td, absolute=False): + self.value = value + self.absolute = absolute + self.Kp = Kp + self.Tp = Tp + self.Td = Td + self.vmap = dict() + + def getAbsolute(self): + if isinstance(self.value, fopdt): + result = self.value.current_value + else: + result = self.value + return result + + def getValue(self, tm, current): + result = self.getAbsolute() + + # Do the timeshifting implied by Td + t2 = round(tm) + t1 = round(tm - self.Td) + if not t2 in self.vmap: + self.vmap[t2] = result + for key in sorted(self.vmap.keys()): + if key < t1: + del self.vmap[key] + else: + break + if t1 in self.vmap: + result = self.vmap[t1] + else: + result = self.vmap[sorted(self.vmap.keys())[0]] + + # TODO should this go before timeshifting? + if not self.absolute: + result = result - current + + return result + + def getDelta(self, tm, current): + value = self.getValue(tm, current) + return (1 - exp(-1.0/self.Tp)) * self.Kp * value + +class fopdt: + + def __init__(self, pv): + self.pv = pv + self.sources = [] + self.sinks = [] + + def AddSource(self, source): + self.sources.append(source) + + def RemSource(self, source): + if source in self.sources: + self.sources.remove(source) + + def AddSink(self, sink): + self.sinks.append(sink) + + def RemSink(self, sink): + if sink in self.sinks: + self.sinks.remove(sink) + + def iterate(self, tm): + self.source_delta = 0.0 + self.sink_delta = 0.0 + for sink in self.sources: + self.source_delta = self.source_delta + sink.getDelta(tm, self.pv) + for sink in self.sinks: + self.sink_delta = self.sink_delta + sink.getDelta(tm, self.pv) + self.old_value = self.pv + self.new_value = self.pv + self.source_delta + self.sink_delta + self.pv = self.new_value + +if __name__ == "__main__": + dev = fopdt(20) + source = fopdt_sink(20, 2, 13, 10, False) + dev.AddSource(source) + dev.AddSource(source) + dev.AddSource(source) + dev.AddSource(source) + sink = fopdt_sink(20, 1, 30, 1, False) + dev.AddSink(sink) + dev.AddSink(sink) + min = max = dev.pv + fd = open("test.csv", "w") + fd.write("Time,value,source,source_v,sink,sink_v\n") + for i in range(0,300+ 1): + if i == 15: + source.value = 30 + elif i == 45: + source.value = 10 + dev.iterate(i) + current = dev.pv + if current > max: + max = current + if current < min: + min = current + #print "%3d: %6.3f = %6.3f + %6.3f + %6.3f" % ( i, current, prev, delta_in, delta_out ) + line = "%d,%.3f,%.3f,%.3f,%.3f,%.3f" % (i, current, source.value, dev.source_delta, sink.value, dev.sink_delta) + fd.write(line + "\n") + fd.close() + print "Now: %6.3f, Min: %6.3f, Max: %6.3f" % (current, min, max) diff --git a/site_ansto/instrument/util/pid.py b/site_ansto/instrument/util/pid.py new file mode 100644 index 00000000..c1fbd006 --- /dev/null +++ b/site_ansto/instrument/util/pid.py @@ -0,0 +1,119 @@ +#!/usr/bin/env python +# vim: ft=python ts=8 sts=4 sw=4 expandtab autoindent smartindent nocindent +# +# Class for device simulation using: Proportional-Integral-Derivative (PID) +# +# This recipe gives simple implementation of a Discrete +# Proportional-Integral-Derivative (PID) controller. +# +# PID controller gives output value for error between desired reference input and +# measurement feedback to minimize error value. +# +# More information: http://en.wikipedia.org/wiki/PID_controller +# +# cnr437@gmail.com +# +####### Example ######### +# +# p=PID(3.0,0.4,1.2) +# p.setPoint(5.0) +# while True: +# pid = p.update(measurement_value) +# +# +""" +TODO: +* Look into making the update time based because the logic seems to assume constant (1 second) time + - Derivator should be dE/dT but is just dE + - Integrator should be sigma(dT*E) but is just sigma(E) +* Look into making the Derivator based on changes in the PV instead of the Error +""" + +class PID: + """ + Discrete PID control + """ + + def __init__(self, P=2.0, I=0.0, D=1.0, Derivator=0, Integrator=0, Integrator_max=500, Integrator_min=-500): + + self.Kp=P + self.Ki=I + self.Kd=D + self.Derivator=Derivator + self.Integrator=Integrator + self.Integrator_max=Integrator_max + self.Integrator_min=Integrator_min + + self.set_point=0.0 + self.error=0.0 + + def update(self,current_value): + """ + Calculate PID output value for given reference input and feedback + """ + + self.error = self.set_point - current_value + + self.P_value = self.Kp * self.error + # TODO: check the sign is correct + self.D_value = self.Kd * (current_value - self.Derivator) + self.Derivator = current_value + + self.Integrator = self.Integrator + self.error + + if self.Integrator > self.Integrator_max: + self.Integrator = self.Integrator_max + elif self.Integrator < self.Integrator_min: + self.Integrator = self.Integrator_min + + self.I_value = self.Integrator * self.Ki + + PID = self.P_value + self.I_value + self.D_value + + return PID + + def setPoint(self,set_point): + """ + Initilize the setpoint of PID + """ + self.set_point = set_point + self.Integrator=0 + self.Derivator=0 + + def setIntegrator(self, Integrator): + self.Integrator = Integrator + + def setDerivator(self, Derivator): + self.Derivator = Derivator + + def setKp(self,P): + self.Kp=P + + def setKi(self,I): + self.Ki=I + + def setKd(self,D): + self.Kd=D + + def getPoint(self): + return self.set_point + + def getError(self): + return self.error + + def getIntegrator(self): + return self.Integrator + + def getDerivator(self): + return self.Derivator + +if __name__ == "__main__": +#p=PID(3.0,0.4,1.2) +#p.setPoint(5.0) +#while True: +# pid = p.update(measurement_value) + p = PID(3.0, 0.4, 1.2) + p.setPoint(20.0) + pv = 20 + for i in range(0,61): + print p.update(pv)