187 lines
7.6 KiB
Python
187 lines
7.6 KiB
Python
import sys
|
|
|
|
import org.apache.commons.math3.util.FastMath as FastMath
|
|
|
|
import org.apache.commons.math3.stat.regression.SimpleRegression as SimpleRegression
|
|
|
|
import org.apache.commons.math3.analysis.polynomials.PolynomialFunction as PolynomialFunction
|
|
import org.apache.commons.math3.analysis.function.Gaussian as Gaussian
|
|
import org.apache.commons.math3.analysis.function.HarmonicOscillator as HarmonicOscillator
|
|
import org.apache.commons.math3.analysis.solvers.LaguerreSolver as LaguerreSolver
|
|
|
|
import org.apache.commons.math3.fitting.GaussianCurveFitter as GaussianCurveFitter
|
|
import org.apache.commons.math3.fitting.PolynomialCurveFitter as PolynomialCurveFitter
|
|
import org.apache.commons.math3.fitting.HarmonicCurveFitter as HarmonicCurveFitter
|
|
import org.apache.commons.math3.fitting.WeightedObservedPoint as WeightedObservedPoint
|
|
|
|
import org.apache.commons.math3.analysis.differentiation.FiniteDifferencesDifferentiator as FiniteDifferencesDifferentiator
|
|
import org.apache.commons.math3.analysis.differentiation.DerivativeStructure as DerivativeStructure
|
|
|
|
|
|
|
|
def calculate_peaks(function, start_value = sys.float_info.min, end_value = sys.float_info.max, positive=True):
|
|
"""Calculate peaks of a DifferentiableUnivariateFunction in a given range by finding the roots of the derivative
|
|
|
|
Args:
|
|
function(DifferentiableUnivariateFunction): The function object.
|
|
start_value(float, optional): start of range
|
|
end_value(float, optional): end of range
|
|
positive (boolean, optional): True for searching positive peaks, False for negative.
|
|
Returns:
|
|
List of peaks in the interval
|
|
|
|
"""
|
|
derivative = function.derivative()
|
|
derivative2 = derivative.derivative()
|
|
ret = []
|
|
solver = LaguerreSolver()
|
|
for complex in solver.solveAllComplex(derivative.coefficients, start_value):
|
|
r = complex.real
|
|
if start_value < r < end_value:
|
|
if (positive and (derivative2.value(r) < 0)) or ( (not positive) and (derivative2.value(r) > 0)):
|
|
ret.append(r)
|
|
return ret
|
|
|
|
|
|
def estimate_peak_indexes(data, xdata = None, threshold = None, min_peak_distance = None, positive = True):
|
|
"""Estimation of peaks in an array by ordering local maxima according to given criteria.
|
|
|
|
Args:
|
|
data(float array or list)
|
|
xdata(float array or list, optional): if not None must have the same length as data.
|
|
threshold(float, optional): if specified filter peaks below this value
|
|
min_peak_distance(float, optional): if specified defines minimum distance between two peaks.
|
|
if xdata == None, it represents index counts, otherwise in xdata units.
|
|
positive (boolean, optional): True for searching positive peaks, False for negative.
|
|
Returns:
|
|
List of peaks indexes.
|
|
"""
|
|
peaks = []
|
|
indexes = sorted(range(len(data)),key=lambda x:data[x])
|
|
if positive:
|
|
indexes = reversed(indexes)
|
|
for index in indexes:
|
|
first = (index == 0)
|
|
last = (index == (len(data)-1))
|
|
val=data[index]
|
|
prev = float('NaN') if first else data[index-1]
|
|
next = float('NaN') if last else data[index+1]
|
|
|
|
if threshold is not None:
|
|
if (positive and (val<threshold)) or ((not positive) and (val>threshold)):
|
|
break
|
|
if ( positive and (first or val>prev ) and (last or val>=next ) ) or (
|
|
(not positive) and (first or val<prev ) and (last or val<=next ) ):
|
|
append = True
|
|
if min_peak_distance is not None:
|
|
for peak in peaks:
|
|
if ((xdata is None) and (abs(peak-index) < min_peak_distance)) or (
|
|
(xdata is not None) and (abs(xdata[peak]-xdata[index]) < min_peak_distance)):
|
|
append = False
|
|
break
|
|
if append:
|
|
peaks.append(index)
|
|
return peaks
|
|
|
|
|
|
def fit_gaussians(y, x, peak_indexes):
|
|
"""Fits data on multiple gaussians on the given peak indexes.
|
|
|
|
Args:
|
|
x(float array or list)
|
|
y(float array or list)
|
|
peak_indexes(list of int)
|
|
Returns:
|
|
List of tuples of gaussian parameters: (normalization, mean, sigma)
|
|
"""
|
|
ret = []
|
|
|
|
minimum = min(y)
|
|
for peak in peak_indexes:
|
|
#Copy data
|
|
data = y[:]
|
|
#Remover data from other peaks
|
|
for p in peak_indexes:
|
|
limit = int(round((p+peak)/2))
|
|
if (p > peak):
|
|
data[limit : len(y)] =[minimum] * (len(y)-limit)
|
|
elif (p < peak):
|
|
data[0:limit] = [minimum] *limit
|
|
#Build fit point list
|
|
values = create_fit_point_list(data, x)
|
|
maximum = max(data)
|
|
gaussian_fitter = GaussianCurveFitter.create().withStartPoint([(maximum-minimum)/2,x[peak],1.0])
|
|
#Fit return parameters: (normalization, mean, sigma)
|
|
ret.append(gaussian_fitter.fit(values).tolist())
|
|
return ret
|
|
|
|
|
|
def create_fit_point_list(y, x):
|
|
values = []
|
|
for i in range(len(x)):
|
|
values.append(WeightedObservedPoint(1, x[i], y[i]))
|
|
return values
|
|
|
|
|
|
def fit_polynomial(y, x, order, start_point = None):
|
|
"""Fits data into a polynomial.
|
|
|
|
Args:
|
|
x(float array or list)
|
|
y(float array or list)
|
|
order(int)
|
|
start_point(optional tuple of float): initial parametrs (a0, a1, a2, ...)
|
|
Returns:
|
|
Tuples of polynomial parameters: (a0, a1, a2, ...)
|
|
"""
|
|
fit_point_list = create_fit_point_list(y, x)
|
|
if start_point is None:
|
|
polynomial_fitter = PolynomialCurveFitter.create(order)
|
|
else:
|
|
polynomial_fitter = PolynomialCurveFitter.create(0).withStartPoint(start_point)
|
|
return polynomial_fitter.fit(fit_point_list).tolist()
|
|
|
|
def fit_gaussian(y, x, start_point_on_peak = False):
|
|
"""Fits data into a gaussian.
|
|
|
|
Args:
|
|
x(float array or list)
|
|
y(float array or list)
|
|
start_point_on_peak(optional boolean): If true initialize means on the detected peaks
|
|
Returns:
|
|
Tuples of gaussian parameters: (normalization, mean, sigma)
|
|
"""
|
|
fit_point_list = create_fit_point_list(y, x)
|
|
if start_point_on_peak:
|
|
peaks = estimate_peak_indexes(y, x)
|
|
minimum = min(y)
|
|
maximum = max(y)
|
|
gaussian_fitter = GaussianCurveFitter.create().withStartPoint([(maximum-minimum)/2,x[peaks[0]],1.0])
|
|
else:
|
|
gaussian_fitter = GaussianCurveFitter.create()
|
|
return gaussian_fitter.fit(fit_point_list).tolist() # (normalization, mean, sigma)
|
|
|
|
def fit_harmonic(y, x):
|
|
"""Fits data into an harmonic.
|
|
|
|
Args:
|
|
x(float array or list)
|
|
y(float array or list)
|
|
Returns:
|
|
Tuples of harmonic parameters: (amplitude, angular_frequency, phase)
|
|
"""
|
|
fit_point_list = create_fit_point_list(y, x)
|
|
harmonic_fitter = HarmonicCurveFitter.create()
|
|
return harmonic_fitter.fit(fit_point_list).tolist() # (amplitude, angular_frequency, phase)
|
|
|
|
if False:
|
|
data = [2,3,4,10,20,5,3,2,1]
|
|
print estimate_peak_indexes(data)
|
|
print estimate_peak_indexes(data, threshold =21.0)
|
|
print estimate_peak_indexes(data, positive=False)
|
|
print estimate_peak_indexes(data, threshold =1.0, positive=False)
|
|
print estimate_peak_indexes(data, threshold = None, min_peak_distance = 10.0 , positive=False)
|
|
xdata = [1,3,7,23,34,45,56,58, 61]
|
|
print estimate_peak_indexes(data, xdata, threshold = None, min_peak_distance = 10.0 , positive=False)
|
|
|