[WIP] work on curses cfg editor
state as of 2026-01-28 Change-Id: I73d2fa4e6fda8820a95fe4e7256c7a23bf565f67
This commit is contained in:
@@ -22,8 +22,6 @@ import time
|
||||
import math
|
||||
import random
|
||||
import threading
|
||||
import numpy as np
|
||||
from numpy.testing import assert_approx_equal
|
||||
|
||||
from frappy.core import Module, Readable, Parameter, Property, \
|
||||
HasIO, StringIO, Writable, IDLE, ERROR, BUSY, DISABLED, nopoll, Attached
|
||||
@@ -32,11 +30,14 @@ from frappy.datatypes import IntRange, FloatRange, StringType, \
|
||||
from frappy.errors import CommunicationFailedError, ConfigError, \
|
||||
HardwareError, DisabledError, ImpossibleError, secop_error, SECoPError
|
||||
from frappy.lib.units import NumberWithUnit, format_with_unit
|
||||
from frappy.lib import formatStatusBits
|
||||
from frappy.lib import formatStatusBits, LazyImport
|
||||
from frappy_psi.convergence import HasConvergence
|
||||
from frappy.mixins import HasOutputModule, HasControlledBy
|
||||
from frappy.extparams import StructParam
|
||||
from frappy_psi.calcurve import CalCurve
|
||||
|
||||
np = LazyImport('numpy')
|
||||
np_testing = LazyImport('numpy.testing')
|
||||
calcurve_module = LazyImport('frappy_psi.calcurve')
|
||||
|
||||
|
||||
def string_to_num(string):
|
||||
@@ -419,7 +420,7 @@ class Device(HasLscIO, Module):
|
||||
"""check whether a returned calibration point is equal within curve point precision"""
|
||||
for v1, v2, eps in zip(left, right, fixeps):
|
||||
try:
|
||||
assert_approx_equal(v1, v2, significant, verbose=False)
|
||||
np_testing.assert_approx_equal(v1, v2, significant, verbose=False)
|
||||
except AssertionError:
|
||||
return abs(v1 - v2) < eps
|
||||
return True
|
||||
@@ -464,7 +465,7 @@ class CurveRequest:
|
||||
self.action = device.find_curve
|
||||
self.new_sensors = set()
|
||||
self.sensors = {sensor.channel: sensor}
|
||||
calcurve = CalCurve(sensor.calcurve)
|
||||
calcurve = calcurve_module.CalCurve(sensor.calcurve)
|
||||
equipment_id = device.propertyValues.get('original_id') or device.secNode.equipment_id
|
||||
name = f"{equipment_id.split('.')[0]}.{sensor.name}"
|
||||
sn = calcurve.calibname
|
||||
|
||||
@@ -101,9 +101,8 @@ class PImixin(HasOutputModule, Writable):
|
||||
_lastdiff = None
|
||||
_lasttime = 0
|
||||
_get_range = None # a function get output range from output_module
|
||||
_overflow = 0
|
||||
_overflow = None # history of overflow (is not zero when integration overflows output range)
|
||||
_itime_set = None # True: 'itime' was set, False: 'i' was set
|
||||
_history = None
|
||||
__errcnt = 0
|
||||
__inside_poll = False
|
||||
__cache = None
|
||||
@@ -114,6 +113,7 @@ class PImixin(HasOutputModule, Writable):
|
||||
|
||||
def initModule(self):
|
||||
self.__cache = {}
|
||||
self._overflow = np.zeros(10)
|
||||
super().initModule()
|
||||
if self.output_range != (0, 0): # legacy !
|
||||
self.output_min, self.output_max = self.output_range
|
||||
@@ -131,13 +131,6 @@ class PImixin(HasOutputModule, Writable):
|
||||
self.__cache = {}
|
||||
now = time.time()
|
||||
value = self.read_value()
|
||||
if self._history is None:
|
||||
# initialize a fixed size array, with fake time axis to avoid errors in np.polyfit
|
||||
self._history = np.array([(now+i, self.value) for i in range(-9,1)])
|
||||
else:
|
||||
# shift fixed size array, and change last point
|
||||
self._history[:-1] = self._history[1:]
|
||||
self._history[-1] = (now, value)
|
||||
if not self.control_active:
|
||||
self._lastdiff = 0
|
||||
return
|
||||
@@ -150,30 +143,34 @@ class PImixin(HasOutputModule, Writable):
|
||||
self._lastdiff = diff
|
||||
deltadiff = diff - self._lastdiff
|
||||
self._lastdiff = diff
|
||||
if diff:
|
||||
ref = self.itime / diff
|
||||
(slope, _), cov = np.polyfit(self._history[:, 0] - now, self._history[:, 1], 1, cov=True)
|
||||
slope_stddev = np.sqrt(max(0, cov[0, 0]))
|
||||
if slope * ref > 1 + 2 * slope_stddev * abs(ref):
|
||||
# extrapolated value will cross target in less than itime
|
||||
if self._overflow:
|
||||
self._overflow = 0
|
||||
self.log.info('clear overflow')
|
||||
|
||||
output, omin, omax = self.cvt2int(out.target)
|
||||
output += self._overflow + (
|
||||
output += self._overflow[-1] + (
|
||||
self.p * deltadiff +
|
||||
self.i * deltat * diff / self.time_scale) / self.input_scale
|
||||
if omin <= output <= omax:
|
||||
self._overflow = 0
|
||||
overflow = 0
|
||||
else:
|
||||
# save overflow for next step
|
||||
if output < omin:
|
||||
self._overflow = output - omin
|
||||
overflow = output - omin
|
||||
output = omin
|
||||
else:
|
||||
self._overflow = output - omax
|
||||
overflow = output - omax
|
||||
output = omax
|
||||
if overflow:
|
||||
# fit a straight line
|
||||
(slope, beg), cov = np.polyfit(range(self._overflow), self._overflow, 1, cov=True)
|
||||
sign = np.copysign(1, overflow)
|
||||
end = beg + slope * len(self._overflow)
|
||||
# reduce the absolute value of overflow by the minimum distance of the fitted
|
||||
# line to zero, with a margin of 3 * stddev
|
||||
shift = max(0, min(overflow * sign, min(beg * sign, end * sign) - 3 * np.sqrt(cov[1, 1]))) * sign
|
||||
if shift:
|
||||
overflow -= shift
|
||||
self._overflow -= shift
|
||||
self._overflow[:-1] = self._overflow[1:]
|
||||
self._overflow[-1] = overflow
|
||||
out.update_target(self.name, self.cvt2ext(output))
|
||||
self.__errcnt = 0
|
||||
except Exception as e:
|
||||
@@ -187,10 +184,10 @@ class PImixin(HasOutputModule, Writable):
|
||||
finally:
|
||||
self.__inside_poll = False
|
||||
self.__cache = {}
|
||||
self.overflow = self._overflow
|
||||
self.overflow = self._overflow[-1]
|
||||
|
||||
def write_overflow(self, value):
|
||||
self._overflow = value
|
||||
self._overflow.fill(value)
|
||||
|
||||
def internal_poll(self):
|
||||
super().doPoll()
|
||||
|
||||
Reference in New Issue
Block a user