Startup
This commit is contained in:
186
script/Lib_old/mathutils.py
Normal file
186
script/Lib_old/mathutils.py
Normal file
@@ -0,0 +1,186 @@
|
||||
import sys
|
||||
|
||||
import org.apache.commons.math3.util.FastMath as FastMath
|
||||
|
||||
import org.apache.commons.math3.stat.regression.SimpleRegression as SimpleRegression
|
||||
|
||||
import org.apache.commons.math3.analysis.polynomials.PolynomialFunction as PolynomialFunction
|
||||
import org.apache.commons.math3.analysis.function.Gaussian as Gaussian
|
||||
import org.apache.commons.math3.analysis.function.HarmonicOscillator as HarmonicOscillator
|
||||
import org.apache.commons.math3.analysis.solvers.LaguerreSolver as LaguerreSolver
|
||||
|
||||
import org.apache.commons.math3.fitting.GaussianCurveFitter as GaussianCurveFitter
|
||||
import org.apache.commons.math3.fitting.PolynomialCurveFitter as PolynomialCurveFitter
|
||||
import org.apache.commons.math3.fitting.HarmonicCurveFitter as HarmonicCurveFitter
|
||||
import org.apache.commons.math3.fitting.WeightedObservedPoint as WeightedObservedPoint
|
||||
|
||||
import org.apache.commons.math3.analysis.differentiation.FiniteDifferencesDifferentiator as FiniteDifferencesDifferentiator
|
||||
import org.apache.commons.math3.analysis.differentiation.DerivativeStructure as DerivativeStructure
|
||||
|
||||
|
||||
|
||||
def calculate_peaks(function, start_value = sys.float_info.min, end_value = sys.float_info.max, positive=True):
|
||||
"""Calculate peaks of a DifferentiableUnivariateFunction in a given range by finding the roots of the derivative
|
||||
|
||||
Args:
|
||||
function(DifferentiableUnivariateFunction): The function object.
|
||||
start_value(float, optional): start of range
|
||||
end_value(float, optional): end of range
|
||||
positive (boolean, optional): True for searching positive peaks, False for negative.
|
||||
Returns:
|
||||
List of peaks in the interval
|
||||
|
||||
"""
|
||||
derivative = function.derivative()
|
||||
derivative2 = derivative.derivative()
|
||||
ret = []
|
||||
solver = LaguerreSolver()
|
||||
for complex in solver.solveAllComplex(derivative.coefficients, start_value):
|
||||
r = complex.real
|
||||
if start_value < r < end_value:
|
||||
if (positive and (derivative2.value(r) < 0)) or ( (not positive) and (derivative2.value(r) > 0)):
|
||||
ret.append(r)
|
||||
return ret
|
||||
|
||||
|
||||
def estimate_peak_indexes(data, xdata = None, threshold = None, min_peak_distance = None, positive = True):
|
||||
"""Estimation of peaks in an array by ordering local maxima according to given criteria.
|
||||
|
||||
Args:
|
||||
data(float array or list)
|
||||
xdata(float array or list, optional): if not None must have the same length as data.
|
||||
threshold(float, optional): if specified filter peaks below this value
|
||||
min_peak_distance(float, optional): if specified defines minimum distance between two peaks.
|
||||
if xdata == None, it represents index counts, otherwise in xdata units.
|
||||
positive (boolean, optional): True for searching positive peaks, False for negative.
|
||||
Returns:
|
||||
List of peaks indexes.
|
||||
"""
|
||||
peaks = []
|
||||
indexes = sorted(range(len(data)),key=lambda x:data[x])
|
||||
if positive:
|
||||
indexes = reversed(indexes)
|
||||
for index in indexes:
|
||||
first = (index == 0)
|
||||
last = (index == (len(data)-1))
|
||||
val=data[index]
|
||||
prev = float('NaN') if first else data[index-1]
|
||||
next = float('NaN') if last else data[index+1]
|
||||
|
||||
if threshold is not None:
|
||||
if (positive and (val<threshold)) or ((not positive) and (val>threshold)):
|
||||
break
|
||||
if ( positive and (first or val>prev ) and (last or val>=next ) ) or (
|
||||
(not positive) and (first or val<prev ) and (last or val<=next ) ):
|
||||
append = True
|
||||
if min_peak_distance is not None:
|
||||
for peak in peaks:
|
||||
if ((xdata is None) and (abs(peak-index) < min_peak_distance)) or (
|
||||
(xdata is not None) and (abs(xdata[peak]-xdata[index]) < min_peak_distance)):
|
||||
append = False
|
||||
break
|
||||
if append:
|
||||
peaks.append(index)
|
||||
return peaks
|
||||
|
||||
|
||||
def fit_gaussians(y, x, peak_indexes):
|
||||
"""Fits data on multiple gaussians on the given peak indexes.
|
||||
|
||||
Args:
|
||||
x(float array or list)
|
||||
y(float array or list)
|
||||
peak_indexes(list of int)
|
||||
Returns:
|
||||
List of tuples of gaussian parameters: (normalization, mean, sigma)
|
||||
"""
|
||||
ret = []
|
||||
|
||||
minimum = min(y)
|
||||
for peak in peak_indexes:
|
||||
#Copy data
|
||||
data = y[:]
|
||||
#Remover data from other peaks
|
||||
for p in peak_indexes:
|
||||
limit = int(round((p+peak)/2))
|
||||
if (p > peak):
|
||||
data[limit : len(y)] =[minimum] * (len(y)-limit)
|
||||
elif (p < peak):
|
||||
data[0:limit] = [minimum] *limit
|
||||
#Build fit point list
|
||||
values = create_fit_point_list(data, x)
|
||||
maximum = max(data)
|
||||
gaussian_fitter = GaussianCurveFitter.create().withStartPoint([(maximum-minimum)/2,x[peak],1.0])
|
||||
#Fit return parameters: (normalization, mean, sigma)
|
||||
ret.append(gaussian_fitter.fit(values).tolist())
|
||||
return ret
|
||||
|
||||
|
||||
def create_fit_point_list(y, x):
|
||||
values = []
|
||||
for i in range(len(x)):
|
||||
values.append(WeightedObservedPoint(1, x[i], y[i]))
|
||||
return values
|
||||
|
||||
|
||||
def fit_polynomial(y, x, order, start_point = None):
|
||||
"""Fits data into a polynomial.
|
||||
|
||||
Args:
|
||||
x(float array or list)
|
||||
y(float array or list)
|
||||
order(int)
|
||||
start_point(optional tuple of float): initial parametrs (a0, a1, a2, ...)
|
||||
Returns:
|
||||
Tuples of polynomial parameters: (a0, a1, a2, ...)
|
||||
"""
|
||||
fit_point_list = create_fit_point_list(y, x)
|
||||
if start_point is None:
|
||||
polynomial_fitter = PolynomialCurveFitter.create(order)
|
||||
else:
|
||||
polynomial_fitter = PolynomialCurveFitter.create(0).withStartPoint(start_point)
|
||||
return polynomial_fitter.fit(fit_point_list).tolist()
|
||||
|
||||
def fit_gaussian(y, x, start_point_on_peak = False):
|
||||
"""Fits data into a gaussian.
|
||||
|
||||
Args:
|
||||
x(float array or list)
|
||||
y(float array or list)
|
||||
start_point_on_peak(optional boolean): If true initialize means on the detected peaks
|
||||
Returns:
|
||||
Tuples of gaussian parameters: (normalization, mean, sigma)
|
||||
"""
|
||||
fit_point_list = create_fit_point_list(y, x)
|
||||
if start_point_on_peak:
|
||||
peaks = estimate_peak_indexes(y, x)
|
||||
minimum = min(y)
|
||||
maximum = max(y)
|
||||
gaussian_fitter = GaussianCurveFitter.create().withStartPoint([(maximum-minimum)/2,x[peaks[0]],1.0])
|
||||
else:
|
||||
gaussian_fitter = GaussianCurveFitter.create()
|
||||
return gaussian_fitter.fit(fit_point_list).tolist() # (normalization, mean, sigma)
|
||||
|
||||
def fit_harmonic(y, x):
|
||||
"""Fits data into an harmonic.
|
||||
|
||||
Args:
|
||||
x(float array or list)
|
||||
y(float array or list)
|
||||
Returns:
|
||||
Tuples of harmonic parameters: (amplitude, angular_frequency, phase)
|
||||
"""
|
||||
fit_point_list = create_fit_point_list(y, x)
|
||||
harmonic_fitter = HarmonicCurveFitter.create()
|
||||
return harmonic_fitter.fit(fit_point_list).tolist() # (amplitude, angular_frequency, phase)
|
||||
|
||||
if False:
|
||||
data = [2,3,4,10,20,5,3,2,1]
|
||||
print estimate_peak_indexes(data)
|
||||
print estimate_peak_indexes(data, threshold =21.0)
|
||||
print estimate_peak_indexes(data, positive=False)
|
||||
print estimate_peak_indexes(data, threshold =1.0, positive=False)
|
||||
print estimate_peak_indexes(data, threshold = None, min_peak_distance = 10.0 , positive=False)
|
||||
xdata = [1,3,7,23,34,45,56,58, 61]
|
||||
print estimate_peak_indexes(data, xdata, threshold = None, min_peak_distance = 10.0 , positive=False)
|
||||
|
||||
Reference in New Issue
Block a user