Files
2022-06-01 16:18:43 +02:00

157 lines
6.2 KiB
Python

from logging import getLogger
from cam_server.pipeline.data_processing import functions, processor
from cam_server.utils import create_thread_pvs, epics_lock
import json
import numpy as np
import numpy
import scipy.signal
import scipy.optimize
import numba
numba.set_num_threads(4)
_logger = getLogger(__name__)
channel_names = None
roi = [0, 0]
initialized = False
sent_pid = -1
nrows = 1
axis = None
@numba.njit(parallel=False)
def get_spectrum(image, background):
y = image.shape[0]
x = image.shape[1]
profile = np.zeros(x, dtype=np.float64)
for i in numba.prange(y):
for j in range(x):
profile[j] += image[i, j] - background[i, j]
return profile
@numba.njit()
def _gauss_function(x, offset, amplitude, center, standard_deviation):
# return offset + amplitude * numpy.exp(-(numpy.power((x - center), 2) / (2 * numpy.power(standard_deviation, 2))))
return offset + amplitude * numpy.exp(-(x - center) ** 2 / (2 * standard_deviation ** 2))
@numba.njit()
def _gauss_deriv(x, offset, amplitude, center, standard_deviation):
fac = numpy.exp(-(x - center) ** 2 / (2 * standard_deviation ** 2))
result = numpy.empty((4, x.size), dtype=x.dtype)
result[0, :] = 1.0
result[1, :] = fac
result[2, :] = amplitude * fac * (x - center) / (standard_deviation ** 2)
result[3, :] = amplitude * fac * ((x - center) ** 2) / (standard_deviation ** 3)
return result
@numba.jit(forceobj=True)
def fit(profile, axis, offset, amplitude, skip, maxfev, std):
center = numpy.dot(axis, profile) / profile.sum() # Center of mass is a good estimation of center (mu)
# Consider gaussian integral is amplitude * sigma * sqrt(2*pi)
standard_deviation = std # numpy.trapz((profile - offset), x=axis) / (amplitude * numpy.sqrt(2 * numpy.pi))
# If user requests fitting to be skipped, return the estimated parameters.
if skip:
return offset, amplitude, center, abs(standard_deviation)
#try:
optimal_parameter, _ = scipy.optimize.curve_fit(
_gauss_function, axis, profile.astype("float64"),
p0=[offset, amplitude, center, standard_deviation],
jac=_gauss_deriv,
col_deriv=1,
maxfev=maxfev)
#except:
# # Make sure return always as same type
# optimal_parameter = numpy.array([offset, amplitude, center, standard_deviation]).astype("float64")
offset, amplitude, center, standard_deviation = optimal_parameter
return offset, amplitude, center, abs(standard_deviation)
def process_image(image, pulse_id, timestamp, x_axis, y_axis, parameters, bsdata=None, background=None):
global roi, initialized, sent_pid, nrows, axis
global channel_names
parameters["pixel_bkg"] = 1
processed_data = {} #processor.process_image(image, pulse_id, timestamp, x_axis, y_axis, parameters, bsdata)
camera_name = parameters["camera_name"]
axis = np.array(range(image.shape[1]), dtype="float")
processing_image = image.astype(np.float32) - np.float32(parameters["pixel_bkg"])
nrows, ncols = processing_image.shape
# validate background data if passive mode (background subtraction handled here)
background_image = parameters.pop('background_data', None)
if isinstance(background_image, np.ndarray):
background_image = background_image.astype(np.float32)
if background_image.shape != processing_image.shape:
_logger.info("Invalid background shape: %s instead of %s" % (
str(background_image.shape), str(processing_image.shape)))
background_image = None
else:
background_image = None
processed_data[camera_name + ":processing_parameters"] = json.dumps(
{"roi": roi, "background": None if (background_image is None) else parameters.get('image_background')})
# crop the image in y direction
ymin, ymax = int(roi[0]), int(roi[1])
if nrows >= ymax > ymin >= 0:
if (nrows != ymax) or (ymin != 0):
processing_image = processing_image[ymin: ymax, :]
if background_image is not None:
background_image = background_image[ymin:ymax, :]
# remove the background and collapse in y direction to get the spectrum
if background_image is not None:
spectrum = get_spectrum(processing_image, background_image)
else:
spectrum = np.sum(processing_image, axis=0)
# smooth the spectrum with savgol filter with 51 window size and 3rd order polynomial
smoothed_spectrum = scipy.signal.savgol_filter(spectrum, 51, 3)
# check wether spectrum has only noise. the average counts per pixel at the peak
# should be larger than 1.5 to be considered as having real signals.
minimum, maximum = smoothed_spectrum.min(), smoothed_spectrum.max()
amplitude = maximum - minimum
skip = True
if amplitude > nrows * 1.5:
skip = False
# gaussian fitting
#offset, amplitude, center, sigma = functions.gauss_fit_psss(smoothed_spectrum[::2], axis[::2],
# offset=minimum, amplitude=amplitude, skip=skip,
# maxfev=20)
_axis, _profile = axis[::2], smoothed_spectrum[::2]
if _axis.shape[0] != _profile.shape[0]:
raise RuntimeError("Invalid axis passed %d %d" % (_axis.shape[0], _profile.shape[0]))
std = numpy.trapz((_profile - minimum), x=_axis) / (amplitude * numpy.sqrt(2 * numpy.pi))
offset, amplitude, center, sigma = fit(_profile,_axis, offset=minimum, amplitude=amplitude, skip=skip, maxfev=20, std=std)
smoothed_spectrum_normed = smoothed_spectrum / np.sum(smoothed_spectrum)
spectrum_com = np.sum(axis * smoothed_spectrum_normed)
spectrum_std = np.sqrt(np.sum((axis - spectrum_com) ** 2 * smoothed_spectrum_normed))
processed_data["skip"]= 1 if skip else 0
# outputs
processed_data[camera_name + ":SPECTRUM_Y"] = spectrum
processed_data[camera_name + ":SPECTRUM_X"] = axis
processed_data[camera_name + ":SPECTRUM_CENTER"] = np.float64(center)
processed_data[camera_name + ":SPECTRUM_FWHM"] = np.float64(2.355 * sigma)
processed_data[camera_name + ":SPECTRUM_COM"] = spectrum_com
processed_data[camera_name + ":SPECTRUM_STD"] = spectrum_std
return processed_data