# *****************************************************************************
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
#
# Module authors:
#   Markus Zolliker
#   Jael Celia Lorenzana
#   Marek Bartkowiak
#
# *****************************************************************************
"""soft PI control

recipe using the PImixin:

assume you have class Sensor inheriting from Readable, you create a new class:

    class SensorWithLoop(HasConvergence, PImixin, Sensor):
        pass

and this is an example cfg

    Mod('T_sample',
        'frappy_psi.<module>.SensorWithLoop',
        'controlled T',
        meaning=['temperature', 20],
        output_module='htr_sample',
        p=100,
        i=60,
    )

recipe using PI:

example cfg:

    Mod('T_softloop',
        'frappy_psi.picontrol.PI',
        'softloop controlled Temperature mixing chamber',
        input_module = 'ts',
        output_module = 'htr_mix',
        control_active = 1,
        output_max = 80000,
        p = 2E6,
        i = 10000,
        tlim = 1.0,
    )
"""

import time
import math
import numpy as np
from frappy.core import Readable, Writable, Parameter, Attached, IDLE, Property
from frappy.lib import clamp, merge_status
from frappy.datatypes import LimitsType, EnumType, FloatRange
from frappy.errors import SECoPError
from frappy.ctrlby import HasOutputModule, WrapControlledBy
from frappy_psi.convergence import HasConvergence


def ext_poll_value(mobj):
    """poll the module <mobj> once and return its value

    mobj.doPoll() is called first. When the timestamp of the 'value'
    parameter did not advance, mobj.read_value() is called explicitly.
    Afterwards mobj.pollInfo.last_main is set to the start of the current
    poll interval.

    :param mobj: a module offering doPoll, read_value, pollInfo, parameters
    :return: mobj.value after polling
    """
    prev = mobj.parameters['value'].timestamp
    mobj.doPoll()
    if mobj.parameters['value'].timestamp <= prev:
        # value was not updated
        mobj.read_value()
    # disable polling for the next interval
    interval = mobj.pollInfo.interval
    if interval:
        mobj.pollInfo.last_main = (time.time() // interval) * interval
    return mobj.value


class PImixin(HasOutputModule, Writable):
    """mixin adding a software PI loop driving the target of output_module

    On each poll the output is changed by

        (p * (diff - lastdiff) + i * deltat * diff / time_scale) / input_scale

    where diff = target - value. The part of the output falling outside
    (output_min, output_max) is kept in 'overflow' and added again on the
    next step.
    """
    value = Parameter(unit='K', update_unchanged='always')
    p = Parameter('proportional term', FloatRange(0), readonly=False, default=1)
    i = Parameter('integral term', FloatRange(0), readonly=False, default=1)
    status = Parameter(update_unchanged='never')
    itime = Parameter('integration time', FloatRange(0, unit='s'),
                      default=60, readonly=False)
    control_active = Parameter(readonly=False)
    # output_module is inherited
    output_range = Property('legacy output range', LimitsType(FloatRange()),
                            default=(0, 0))
    output_min = Parameter('min output', FloatRange(), default=0, readonly=False)
    output_max = Parameter('max output', FloatRange(), default=0, readonly=False)
    input_scale = Property('input scale', FloatRange(unit='$'), default=100)
    time_scale = Property('time scale', FloatRange(unit='s'), default=60)
    overflow = Parameter('overflow', FloatRange(), default=0, readonly=False)

    _lastdiff = None
    _lasttime = 0
    _get_range = None  # a function get output range from output_module
    _overflow = 0
    _itime_set = None  # True: 'itime' was set, False: 'i' was set
    _history = None  # array of the last 10 (time, value) pairs
    __errcnt = 0  # number of consecutive failed polls while controlling
    __inside_poll = False  # True while doPoll is running
    __cache = None  # values of 'value' / 'status' updated during doPoll

    # with input units K and output units %:
    # units for p: % / K
    # units for i: % / K / min

    def initModule(self):
        """set up the output range function and the value/status callbacks"""
        self.__cache = {}
        super().initModule()
        if self.output_range != (0, 0):  # legacy !
            self.output_min, self.output_max = self.output_range
        self.get_range_func()
        self.addCallback('value', self.__inside, 'value')
        self.addCallback('status', self.__inside, 'status')

    def __inside(self, value, pname):
        """callback: remember <value> of parameter <pname> during a poll

        Values are cached only while doPoll is running, so that read_value
        and read_status called outside of a poll do not return stale data.
        """
        if self.__inside_poll:
            self.__cache[pname] = value

    def doPoll(self):
        """read the value and, when control is active, do one PI step

        After more than 5 consecutive failing polls with control active,
        control is switched off. The exception is re-raised in any case.
        """
        try:
            self.__inside_poll = True
            self.__cache = {}
            now = time.time()
            value = self.read_value()
            if self._history is None:
                # initialize a fixed size array, with fake time axis to
                # avoid errors in np.polyfit
                self._history = np.array(
                    [(now + i, self.value) for i in range(-9, 1)])
            else:
                # shift fixed size array, and change last point
                self._history[:-1] = self._history[1:]
                self._history[-1] = (now, value)
            if not self.control_active:
                self._lastdiff = 0
                return self.read_status()
            out = self.output_module
            # limit the time step to 10 s (also covers the first step,
            # where _lasttime is still 0)
            deltat = clamp(0, now - self._lasttime, 10)
            self._lasttime = now
            diff = self.target - value
            if self._lastdiff is None:
                self._lastdiff = diff
            deltadiff = diff - self._lastdiff
            self._lastdiff = diff
            if diff:
                ref = self.itime / diff
                # linear fit of the history, time axis relative to now
                (slope, _), cov = np.polyfit(
                    self._history[:, 0] - now, self._history[:, 1], 1, cov=True)
                slope_stddev = np.sqrt(max(0, cov[0, 0]))
                # slope * ref > 1 means slope * itime exceeds diff;
                # the right hand side adds a margin of 2 standard deviations
                if slope * ref > 1 + 2 * slope_stddev * abs(ref):
                    # extrapolated value will cross target in less than itime
                    if self._overflow:
                        self._overflow = 0
                        self.log.info('clear overflow')
            output, omin, omax = self.cvt2int(out.target)
            output += self._overflow + (
                self.p * deltadiff +
                self.i * deltat * diff / self.time_scale) / self.input_scale
            if omin <= output <= omax:
                self._overflow = 0
            else:
                # save overflow for next step
                if output < omin:
                    self._overflow = output - omin
                    output = omin
                else:
                    self._overflow = output - omax
                    output = omax
            out.update_target(self.name, self.cvt2ext(output))
            self.__errcnt = 0
        except Exception:
            if self.control_active:
                self.__errcnt += 1
                if self.__errcnt > 5:
                    self.__errcnt = 0
                    self.log.warning('too many errors - switch control off')
                    self.write_control_active(False)
            raise
        finally:
            self.__inside_poll = False
            self.__cache = {}
        self.overflow = self._overflow

    def write_overflow(self, value):
        """take over a new overflow value for the next PI step"""
        self._overflow = value

    def internal_poll(self):
        """call doPoll of the next class in the method resolution order"""
        super().doPoll()

    def internal_read_value(self):
        """read the value from the next class in the method resolution order"""
        return super().read_value()

    def internal_read_status(self):
        """read the status from the next class in the method resolution order"""
        return super().read_status()

    def read_value(self):
        """return the value cached during the current poll or read it"""
        try:
            return self.__cache['value']
        except KeyError:
            return self.internal_read_value()

    def read_status(self):
        """return the status cached during the current poll or determine it

        The own status is (IDLE, 'controlling') or (IDLE, 'inactive'). When
        a super class offers read_status, its result is merged with it.
        """
        try:
            return self.__cache['status']
        except KeyError:
            pass
        status = (IDLE, 'controlling' if self.control_active else 'inactive')
        if hasattr(super(), 'read_status'):
            status = merge_status(self.internal_read_status(), status)
        return status

    def get_range_func(self):
        """choose the function used for getting the allowed output range

        The range is taken from the output module, if it has 'max_target'
        (and optionally 'min_target') or 'limit', else from output_min and
        output_max of this module. When output_min and output_max are both
        0, they are initialized from the range found.
        """
        out = self.output_module
        if hasattr(out, 'max_target'):
            # min_target has to be checked on the output module, as
            # read_min_target is called there
            if hasattr(out, 'min_target'):
                self._get_range = lambda o=out: (o.read_min_target(),
                                                 o.read_max_target())
            else:
                self._get_range = lambda o=out: (0, o.read_max_target())
        elif hasattr(out, 'limit'):  # mercury.HeaterOutput
            self._get_range = lambda o=out: (0, o.read_limit())
        else:
            if self.output_min == self.output_max == 0:
                self.output_max = 1
            self._get_range = lambda o=self: (o.output_min, o.output_max)
        if self.output_min == self.output_max == 0:
            self.output_min, self.output_max = self._get_range()

    def cvt2int(self, output):
        """convert to internal units

        :param output: the current output in external units
        :return: (output, output_min, output_max), all of them clamped
            to the range given by self._get_range()
        """
        # get the range only once, as this might involve reading hardware
        limits = self._get_range()
        return tuple(clamp(x, *limits)
                     for x in (output, self.output_min, self.output_max))

    def cvt2ext(self, output):
        """convert the internal output to external units (here: unchanged)"""
        return output

    def calc_itime(self, prop, integ):
        """return the integration time for the given p (prop) and i (integ)"""
        return prop * self.time_scale / integ

    def write_p(self, value):
        """change p and adjust whichever of i / itime was not set last

        Nothing is adjusted as long as neither 'i' nor 'itime' was written.
        """
        if self._itime_set:
            self.i = value * self.time_scale / self.itime
        elif self._itime_set is False:  # means also not None
            self.itime = value * self.time_scale / self.i

    def write_i(self, value):
        """change i and adjust itime accordingly"""
        self._itime_set = False
        self.itime = self.p * self.time_scale / value

    def write_itime(self, value):
        """change itime and adjust i accordingly"""
        self._itime_set = True
        self.i = self.p * self.time_scale / value

    def set_target(self, value):
        """activate control if needed, set the target and poll immediately"""
        if not self.control_active:
            self.activate_control()
        self.target = value
        self.doPoll()


class PImixinSquare(PImixin):
    """unchecked: use square as output function"""

    def cvt2int(self, output):
        """convert to internal units: sqrt(100 * x), after clamping to range

        :param output: the current output in external units
        :return: converted (output, output_min, output_max)
        """
        # get the range only once, as this might involve reading hardware
        limits = self._get_range()
        return tuple(math.sqrt(max(0, 100 * clamp(x, *limits)))
                     for x in (output, self.output_min, self.output_max))

    def cvt2ext(self, output):
        """convert to external units: output ** 2 / 100"""
        return output ** 2 / 100


class PI(HasConvergence, PImixin):
    """PI control loop getting its value and status from input_module"""
    input_module = Attached(Readable, 'the input module')

    def internal_poll(self):
        """poll the input module and read value and status from it"""
        inp = self.input_module
        inp.doPoll()
        interval = inp.pollInfo.interval
        if interval > 0:
            # disable next internal poll
            inp.pollInfo.last_main = (time.time() // interval) * interval
        self.read_value()
        self.read_status()

    def internal_read_value(self):
        """read the value from the input module"""
        return self.input_module.read_value()

    def internal_read_status(self):
        """read the status from the input module"""
        return self.input_module.read_status()

    def write_target(self, target):
        """change the target and restart the convergence check"""
        super().write_target(target)
        self.convergence_start()


# unchecked!
class PI2(PI):
    """PI loop with the output restricted to target +- maxovershoot percent"""
    maxovershoot = Parameter('max. overshoot', FloatRange(0, 100, unit='%'),
                             readonly=False, default=20)

    def doPoll(self):
        """set the output limits relative to the target, then do a PI step"""
        fraction = 0.01 * self.maxovershoot
        # sorted: keep output_min <= output_max also for negative targets
        lower, upper = sorted((self.target * (1 - fraction),
                               self.target * (1 + fraction)))
        self.output_max = upper
        self.output_min = lower
        super().doPoll()

    def write_target(self, target):
        """change the target

        When control is not active, the target is also written to the
        output module.
        """
        if not self.control_active:
            # the attached output is named output_module
            self.output_module.write_target(target)
        super().write_target(target)


class PIctrl(WrapControlledBy, PI):
    """a pi controller which is controlled by another loop"""