import sys import org.apache.commons.math3.util.FastMath as FastMath import org.apache.commons.math3.stat.regression.SimpleRegression as SimpleRegression import org.apache.commons.math3.analysis.polynomials.PolynomialFunction as PolynomialFunction import org.apache.commons.math3.analysis.function.Gaussian as Gaussian import org.apache.commons.math3.analysis.function.HarmonicOscillator as HarmonicOscillator import org.apache.commons.math3.analysis.solvers.LaguerreSolver as LaguerreSolver import org.apache.commons.math3.fitting.GaussianCurveFitter as GaussianCurveFitter import org.apache.commons.math3.fitting.PolynomialCurveFitter as PolynomialCurveFitter import org.apache.commons.math3.fitting.HarmonicCurveFitter as HarmonicCurveFitter import org.apache.commons.math3.fitting.WeightedObservedPoint as WeightedObservedPoint import org.apache.commons.math3.analysis.differentiation.FiniteDifferencesDifferentiator as FiniteDifferencesDifferentiator import org.apache.commons.math3.analysis.differentiation.DerivativeStructure as DerivativeStructure def calculate_peaks(function, start_value = sys.float_info.min, end_value = sys.float_info.max, positive=True): """Calculate peaks of a DifferentiableUnivariateFunction in a given range by finding the roots of the derivative Args: function(DifferentiableUnivariateFunction): The function object. start_value(float, optional): start of range end_value(float, optional): end of range positive (boolean, optional): True for searching positive peaks, False for negative. Returns: List of peaks in the interval """ derivative = function.derivative() derivative2 = derivative.derivative() ret = [] solver = LaguerreSolver() for complex in solver.solveAllComplex(derivative.coefficients, start_value): r = complex.real if start_value < r < end_value: if (positive and (derivative2.value(r) < 0)) or ( (not positive) and (derivative2.value(r) > 0)): ret.append(r) return ret def estimate_peak_indexes(data, xdata = None, threshold = None, min_peak_distance = None, positive = True): """Estimation of peaks in an array by ordering local maxima according to given criteria. Args: data(float array or list) xdata(float array or list, optional): if not None must have the same length as data. threshold(float, optional): if specified filter peaks below this value min_peak_distance(float, optional): if specified defines minimum distance between two peaks. if xdata == None, it represents index counts, otherwise in xdata units. positive (boolean, optional): True for searching positive peaks, False for negative. Returns: List of peaks indexes. """ peaks = [] indexes = sorted(range(len(data)),key=lambda x:data[x]) if positive: indexes = reversed(indexes) for index in indexes: first = (index == 0) last = (index == (len(data)-1)) val=data[index] prev = float('NaN') if first else data[index-1] next = float('NaN') if last else data[index+1] if threshold is not None: if (positive and (valthreshold)): break if ( positive and (first or val>prev ) and (last or val>=next ) ) or ( (not positive) and (first or val peak): data[limit : len(y)] =[minimum] * (len(y)-limit) elif (p < peak): data[0:limit] = [minimum] *limit #Build fit point list values = create_fit_point_list(data, x) maximum = max(data) gaussian_fitter = GaussianCurveFitter.create().withStartPoint([(maximum-minimum)/2,x[peak],1.0]) #Fit return parameters: (normalization, mean, sigma) ret.append(gaussian_fitter.fit(values).tolist()) return ret def create_fit_point_list(y, x): values = [] for i in range(len(x)): values.append(WeightedObservedPoint(1, x[i], y[i])) return values def fit_polynomial(y, x, order, start_point = None): """Fits data into a polynomial. Args: x(float array or list) y(float array or list) order(int) start_point(optional tuple of float): initial parametrs (a0, a1, a2, ...) Returns: Tuples of polynomial parameters: (a0, a1, a2, ...) """ fit_point_list = create_fit_point_list(y, x) if start_point is None: polynomial_fitter = PolynomialCurveFitter.create(order) else: polynomial_fitter = PolynomialCurveFitter.create(0).withStartPoint(start_point) return polynomial_fitter.fit(fit_point_list).tolist() def fit_gaussian(y, x, start_point_on_peak = False): """Fits data into a gaussian. Args: x(float array or list) y(float array or list) start_point_on_peak(optional boolean): If true initialize means on the detected peaks Returns: Tuples of gaussian parameters: (normalization, mean, sigma) """ fit_point_list = create_fit_point_list(y, x) if start_point_on_peak: peaks = estimate_peak_indexes(y, x) minimum = min(y) maximum = max(y) gaussian_fitter = GaussianCurveFitter.create().withStartPoint([(maximum-minimum)/2,x[peaks[0]],1.0]) else: gaussian_fitter = GaussianCurveFitter.create() return gaussian_fitter.fit(fit_point_list).tolist() # (normalization, mean, sigma) def fit_harmonic(y, x): """Fits data into an harmonic. Args: x(float array or list) y(float array or list) Returns: Tuples of harmonic parameters: (amplitude, angular_frequency, phase) """ fit_point_list = create_fit_point_list(y, x) harmonic_fitter = HarmonicCurveFitter.create() return harmonic_fitter.fit(fit_point_list).tolist() # (amplitude, angular_frequency, phase) if False: data = [2,3,4,10,20,5,3,2,1] print estimate_peak_indexes(data) print estimate_peak_indexes(data, threshold =21.0) print estimate_peak_indexes(data, positive=False) print estimate_peak_indexes(data, threshold =1.0, positive=False) print estimate_peak_indexes(data, threshold = None, min_peak_distance = 10.0 , positive=False) xdata = [1,3,7,23,34,45,56,58, 61] print estimate_peak_indexes(data, xdata, threshold = None, min_peak_distance = 10.0 , positive=False)