improvements when testing leiden

- triple current source
- software loop
This commit is contained in:
2025-10-30 12:24:34 +01:00
parent b45635e4f8
commit 71629c1d3a
6 changed files with 535 additions and 65 deletions

View File

@@ -34,7 +34,7 @@ from frappy.lib import formatStatusBits
from frappy.core import Done, Drivable, Parameter, Property, CommonReadHandler, CommonWriteHandler
from frappy.io import HasIO
from frappy_psi.channelswitcher import Channel, ChannelSwitcher
from frappy_psi.picontrol import HasConvergence
from frappy.ctrlby import WrapControlledBy
Status = Drivable.Status

View File

@@ -36,8 +36,8 @@ and this is an example cfg
'controlled T',
meaning=['temperature', 20],
output_module='htr_sample',
p=1,
i=0.01,
p=100,
i=60,
)
@@ -60,79 +60,164 @@ example cfg:
import time
import math
import numpy as np
from frappy.core import Readable, Writable, Parameter, Attached, IDLE, Property
from frappy.lib import clamp, merge_status
from frappy.datatypes import LimitsType, EnumType, FloatRange
from frappy.errors import SECoPError
from frappy.ctrlby import HasOutputModule, WrapControlledBy
from frappy_psi.convergence import HasConvergence
def ext_poll_value(mobj):
prev = mobj.parameters['value'].timestamp
mobj.doPoll()
if mobj.parameters['value'].timestamp <= prev:
# value was not updated
mobj.read_value()
# disable polling for the next interval
interval = mobj.pollInfo.interval
if interval:
mobj.pollInfo.last_main = (time.time() // interval) * interval
return mobj.value
class PImixin(HasOutputModule, Writable):
p = Parameter('proportional term', FloatRange(0), readonly=False)
i = Parameter('integral term', FloatRange(0), readonly=False)
value = Parameter(unit='K', update_unchanged='always')
p = Parameter('proportional term', FloatRange(0), readonly=False, default=1)
i = Parameter('integral term', FloatRange(0), readonly=False, default=1)
status = Parameter(update_unchanged='never')
itime = Parameter('integration time', FloatRange(0, unit='s'), default=60, readonly=False)
control_active = Parameter(readonly=False)
# output_module is inherited
output_range = Property('legacy output range', LimitsType(FloatRange()), default=(0,0))
output_range = Property('legacy output range', LimitsType(FloatRange()), default=(0, 0))
output_min = Parameter('min output', FloatRange(), default=0, readonly=False)
output_max = Parameter('max output', FloatRange(), default=0, readonly=False)
output_func = Parameter('output function',
EnumType(lin=0, square=1), readonly=False, value=0)
value = Parameter(unit='K')
input_scale = Property('input scale', FloatRange(unit='$'), default=100)
time_scale = Property('time scale', FloatRange(unit='s'), default=60)
overflow = Parameter('overflow', FloatRange(), default=0, readonly=False)
_lastdiff = None
_lasttime = 0
_get_range = None # a function get output range from output_module
_overflow = 0
_cvt2int = None
_cvt2ext = None
_itime_set = None # True: 'itime' was set, False: 'i' was set
_history = None
__errcnt = 0
__inside_poll = False
__cache = None
# with input units K and output units %:
# units for p: % / K
# units for i: % / K / min
def initModule(self):
self.__cache = {}
super().initModule()
if self.output_range != (0, 0): # legacy !
self.output_min, self.output_max = self.output_range
self.get_range_func()
self.addCallback('value', self.__inside, 'value')
self.addCallback('status', self.__inside, 'status')
def __inside(self, value, pname):
if self.__inside_poll is not None:
self.__cache[pname] = value
def doPoll(self):
super().doPoll()
if not self.control_active:
return
out = self.output_module
now = time.time()
deltat = clamp(0, now-self._lasttime, 10)
self._lasttime = now
diff = self.target - self.value
if self._lastdiff is None:
try:
self.__inside_poll = True
self.__cache = {}
now = time.time()
value = self.read_value()
if self._history is None:
# initialize a fixed size array, with fake time axis to avoid errors in np.polyfit
self._history = np.array([(now+i, self.value) for i in range(-9,1)])
else:
# shift fixed size array, and change last point
self._history[:-1] = self._history[1:]
self._history[-1] = (now, value)
if not self.control_active:
self._lastdiff = 0
return
self.read_status()
out = self.output_module
deltat = clamp(0, now-self._lasttime, 10)
self._lasttime = now
diff = self.target - value
if self._lastdiff is None:
self._lastdiff = diff
deltadiff = diff - self._lastdiff
self._lastdiff = diff
deltadiff = diff - self._lastdiff
self._lastdiff = diff
output, omin, omax = self._cvt2int(out.target)
output += self._overflow + self.p * deltadiff + self.i * deltat * diff
if output < omin:
self._overflow = max(omin - omax, output - omin)
output = omin
elif output > omax:
self._overflow = min(omax - omin, output - omax)
output = omax
else:
self._overflow = 0
out.update_target(self.name, self._cvt2ext(output))
if diff:
ref = self.itime / diff
(slope, _), cov = np.polyfit(self._history[:, 0] - now, self._history[:, 1], 1, cov=True)
slope_stddev = np.sqrt(max(0, cov[0, 0]))
if slope * ref > 1 + 2 * slope_stddev * abs(ref):
# extrapolated value will cross target in less than itime
if self._overflow:
self._overflow = 0
self.log.info('clear overflow')
output, omin, omax = self.cvt2int(out.target)
output += self._overflow + (
self.p * deltadiff +
self.i * deltat * diff / self.time_scale) / self.input_scale
if omin <= output <= omax:
self._overflow = 0
else:
# save overflow for next step
if output < omin:
self._overflow = output - omin
output = omin
else:
self._overflow = output - omax
output = omax
out.update_target(self.name, self.cvt2ext(output))
self.__errcnt = 0
except Exception as e:
if self.control_active:
self.__errcnt += 1
if self.__errcnt > 5:
self.__errcnt = 0
self.log.warning('too many errors - switch control off')
self.write_control_active(False)
raise
finally:
self.__inside_poll = False
self.__cache = {}
self.overflow = self._overflow
def write_overflow(self, value):
self._overflow = value
def internal_poll(self):
super().doPoll()
def internal_read_value(self):
return super().read_value()
def internal_read_status(self):
return super().read_status()
def read_value(self):
try:
return self.__cache['value']
except KeyError:
return self.internal_read_value()
def read_status(self):
try:
return self.__cache['status']
except KeyError:
pass
status = IDLE, 'controlling' if self.control_active else 'inactive'
if hasattr(super(), 'read_status'):
status = merge_status(super().read_status(), status)
status = merge_status(self.internal_read_status(), status)
return status
def cvt2int_square(self, output):
return (math.sqrt(max(0, clamp(x, *self._get_range()))) for x in (output, self.output_min, self.output_max))
def cvt2ext_square(self, output):
return output ** 2
def cvt2int_lin(self, output):
return (clamp(x, *self._get_range()) for x in (output, self.output_min, self.output_max))
def cvt2ext_lin(self, output):
return output
def write_output_func(self, value):
def get_range_func(self):
out = self.output_module
if hasattr(out, 'max_target'):
if hasattr(self, 'min_target'):
@@ -147,32 +232,72 @@ class PImixin(HasOutputModule, Writable):
self._get_range = lambda o=self: (o.output_min, o.output_max)
if self.output_min == self.output_max == 0:
self.output_min, self.output_max = self._get_range()
self.output_func = value
self._cvt2int = getattr(self, f'cvt2int_{self.output_func.name}')
self._cvt2ext = getattr(self, f'cvt2ext_{self.output_func.name}')
# not needed, done by HasOutputModule.write_control_active
# def write_control_active(self, value):
# super().write_control_active(value)
# out = self.output_module
# if not value:
# out.write_target(out.parameters['target'].datatype.default)
def cvt2int(self, output):
return (clamp(x, *self._get_range()) for x in (output, self.output_min, self.output_max))
def cvt2ext(self, output):
return output
def calc_itime(self, prop, integ):
return prop * self.time_scale / integ
def write_p(self, value):
if self._itime_set:
self.i = value * self.time_scale / self.itime
elif self._itime_set is False: # means also not None
self.itime = value * self.time_scale / self.i
def write_i(self, value):
self._itime_set = False
self.itime = self.p * self.time_scale / value
def write_itime(self, value):
self._itime_set = True
self.i = self.p * self.time_scale / value
def set_target(self, value):
if not self.control_active:
self.activate_control()
self.target = value
self.doPoll()
class PImixinSquare(PImixin):
"""unchecked: use square as output function"""
def cvt2int(self, output):
return (math.sqrt(max(0, 100 * clamp(x, *self._get_range())))
for x in (output, self.output_min, self.output_max))
def cvt2ext(self, output):
return output ** 2 / 100
class PI(HasConvergence, PImixin):
input_module = Attached(Readable, 'the input module')
def read_value(self):
return self.input_module.value
def internal_poll(self):
inp = self.input_module
inp.doPoll()
interval = inp.pollInfo.interval
if interval > 0:
# disable next internal poll
inp.pollInfo.last_main = (time.time() // interval) * interval
self.read_value()
self.read_status()
def read_status(self):
return self.input_module.status
def internal_read_value(self):
return self.input_module.read_value()
def internal_read_status(self):
return self.input_module.read_status()
def write_target(self, target):
super().write_target(target)
self.convergence_start()
# unchecked!
class PI2(PI):
maxovershoot = Parameter('max. overshoot', FloatRange(0, 100, unit='%'), readonly=False, default=20)

View File

@@ -21,6 +21,7 @@
from frappy.core import StringIO, HasIO, Writable, Parameter, Property, FloatRange, IntRange, BoolType, \
ERROR
from frappy.errors import CommunicationFailedError, HardwareError
from frappy.ctrlby import WrapControlledBy
class IO(StringIO):
@@ -36,7 +37,7 @@ class Heater(HasIO, Writable):
channel = Property('channel (source number)', IntRange(1, 3))
value = Parameter('current reading', FloatRange(0, 0.1, unit='A'))
target = Parameter('current target value', FloatRange(0, 0.1, unit='A'), readonly=False)
on = Parameter('turn current on/off', BoolType(), readonly=False)
on = Parameter('turn current on/off', BoolType(), readonly=False, default=False)
def query_status(self):
reply, txtvalue = self.communicate('STATUS?').split('\t')
@@ -64,13 +65,14 @@ class Heater(HasIO, Writable):
if reply != '0': # not as in manual
raise CommunicationFailedError(f'Bad reply: {reply!r}')
def read_target(self):
def read_value(self):
txtvalue = self.query_status()
current_range = txtvalue[(self.channel - 1) * 4 + 1]
current = txtvalue[(self.channel - 1) * 4 + 1 + 1] # percent of range
multipliers = {'1': 99e-6, '2': 990e-6, '3': 9900e-6, '4': 99e-3}
multipliers = {'1': 1e-4, '2': 1e-3, '3': 1e-2, '4': 1e-1}
value = float(current) / 100 * float(multipliers[current_range])
return value
# no measured value available
read_value = read_target
class WrappedHeater(WrapControlledBy, Heater):
pass