from logging import getLogger from cam_server.pipeline.data_processing import functions, processor from cam_server.utils import create_thread_pvs, epics_lock import json import numpy as np import numpy import scipy.signal import scipy.optimize import numba numba.set_num_threads(4) _logger = getLogger(__name__) channel_names = None roi = [0, 0] initialized = False sent_pid = -1 nrows = 1 axis = None @numba.njit(parallel=False) def get_spectrum(image, background): y = image.shape[0] x = image.shape[1] profile = np.zeros(x, dtype=np.float64) for i in numba.prange(y): for j in range(x): profile[j] += image[i, j] - background[i, j] return profile @numba.njit() def _gauss_function(x, offset, amplitude, center, standard_deviation): # return offset + amplitude * numpy.exp(-(numpy.power((x - center), 2) / (2 * numpy.power(standard_deviation, 2)))) return offset + amplitude * numpy.exp(-(x - center) ** 2 / (2 * standard_deviation ** 2)) @numba.njit() def _gauss_deriv(x, offset, amplitude, center, standard_deviation): fac = numpy.exp(-(x - center) ** 2 / (2 * standard_deviation ** 2)) result = numpy.empty((4, x.size), dtype=x.dtype) result[0, :] = 1.0 result[1, :] = fac result[2, :] = amplitude * fac * (x - center) / (standard_deviation ** 2) result[3, :] = amplitude * fac * ((x - center) ** 2) / (standard_deviation ** 3) return result @numba.jit(forceobj=True) def fit(profile, axis, offset, amplitude, skip, maxfev, std): center = numpy.dot(axis, profile) / profile.sum() # Center of mass is a good estimation of center (mu) # Consider gaussian integral is amplitude * sigma * sqrt(2*pi) standard_deviation = std # numpy.trapz((profile - offset), x=axis) / (amplitude * numpy.sqrt(2 * numpy.pi)) # If user requests fitting to be skipped, return the estimated parameters. if skip: return offset, amplitude, center, abs(standard_deviation) #try: optimal_parameter, _ = scipy.optimize.curve_fit( _gauss_function, axis, profile.astype("float64"), p0=[offset, amplitude, center, standard_deviation], jac=_gauss_deriv, col_deriv=1, maxfev=maxfev) #except: # # Make sure return always as same type # optimal_parameter = numpy.array([offset, amplitude, center, standard_deviation]).astype("float64") offset, amplitude, center, standard_deviation = optimal_parameter return offset, amplitude, center, abs(standard_deviation) def process_image(image, pulse_id, timestamp, x_axis, y_axis, parameters, bsdata=None, background=None): global roi, initialized, sent_pid, nrows, axis global channel_names parameters["pixel_bkg"] = 1 processed_data = {} #processor.process_image(image, pulse_id, timestamp, x_axis, y_axis, parameters, bsdata) camera_name = parameters["camera_name"] axis = np.array(range(image.shape[1]), dtype="float") processing_image = image.astype(np.float32) - np.float32(parameters["pixel_bkg"]) nrows, ncols = processing_image.shape # validate background data if passive mode (background subtraction handled here) background_image = parameters.pop('background_data', None) if isinstance(background_image, np.ndarray): background_image = background_image.astype(np.float32) if background_image.shape != processing_image.shape: _logger.info("Invalid background shape: %s instead of %s" % ( str(background_image.shape), str(processing_image.shape))) background_image = None else: background_image = None processed_data[camera_name + ":processing_parameters"] = json.dumps( {"roi": roi, "background": None if (background_image is None) else parameters.get('image_background')}) # crop the image in y direction ymin, ymax = int(roi[0]), int(roi[1]) if nrows >= ymax > ymin >= 0: if (nrows != ymax) or (ymin != 0): processing_image = processing_image[ymin: ymax, :] if background_image is not None: background_image = background_image[ymin:ymax, :] # remove the background and collapse in y direction to get the spectrum if background_image is not None: spectrum = get_spectrum(processing_image, background_image) else: spectrum = np.sum(processing_image, axis=0) # smooth the spectrum with savgol filter with 51 window size and 3rd order polynomial smoothed_spectrum = scipy.signal.savgol_filter(spectrum, 51, 3) # check wether spectrum has only noise. the average counts per pixel at the peak # should be larger than 1.5 to be considered as having real signals. minimum, maximum = smoothed_spectrum.min(), smoothed_spectrum.max() amplitude = maximum - minimum skip = True if amplitude > nrows * 1.5: skip = False # gaussian fitting #offset, amplitude, center, sigma = functions.gauss_fit_psss(smoothed_spectrum[::2], axis[::2], # offset=minimum, amplitude=amplitude, skip=skip, # maxfev=20) _axis, _profile = axis[::2], smoothed_spectrum[::2] if _axis.shape[0] != _profile.shape[0]: raise RuntimeError("Invalid axis passed %d %d" % (_axis.shape[0], _profile.shape[0])) std = numpy.trapz((_profile - minimum), x=_axis) / (amplitude * numpy.sqrt(2 * numpy.pi)) offset, amplitude, center, sigma = fit(_profile,_axis, offset=minimum, amplitude=amplitude, skip=skip, maxfev=20, std=std) smoothed_spectrum_normed = smoothed_spectrum / np.sum(smoothed_spectrum) spectrum_com = np.sum(axis * smoothed_spectrum_normed) spectrum_std = np.sqrt(np.sum((axis - spectrum_com) ** 2 * smoothed_spectrum_normed)) processed_data["skip"]= 1 if skip else 0 # outputs processed_data[camera_name + ":SPECTRUM_Y"] = spectrum processed_data[camera_name + ":SPECTRUM_X"] = axis processed_data[camera_name + ":SPECTRUM_CENTER"] = np.float64(center) processed_data[camera_name + ":SPECTRUM_FWHM"] = np.float64(2.355 * sigma) processed_data[camera_name + ":SPECTRUM_COM"] = spectrum_com processed_data[camera_name + ":SPECTRUM_STD"] = spectrum_std return processed_data