Files
camserver_sf/configuration/user_scripts/bunch_length_op.py
2025-01-13 16:20:01 +01:00

92 lines
3.1 KiB
Python

import sys
import numpy as np
import scipy.optimize
from logging import getLogger
from cam_server.pipeline.data_processing import functions, processor
_logger = getLogger(__name__)
def gauss_function_op(x, H, A, x0, sigma):
return H + A * np.exp(-(x - x0)**2 / (2 * sigma**2))
def gauss_fit_op(x, y):
H = min(y)
A = max(y) - min(y)
x0 = x[y.argmax()]
surface = np.trapz((y - H), x=x)
sigma = surface / (A * np.sqrt(2 * np.pi)) if A != 0 else 1
try:
opt_par, cov_par = scipy.optimize.curve_fit(gauss_function_op, x, y, p0=[H, A, x0, sigma])
except BaseException as e:
opt_par = np.array([H, A, x0, sigma]).astype("float64")
return opt_par
def gauss_op(x, y):
H, A, x0, sigma = gauss_fit_op(x, y)
sigma = abs(sigma)
gauss = gauss_function_op(x, H, A, x0, sigma)
y = y - H # remove offset
y[y < 0.0] = 0.0 # remove negative amplitude
y = y / sum(y) if sum(y) != 0 else y # normalise to 1
condition = np.abs(x - x0) > 5000 # 5 * sigma -> too jumpy
y[condition] = 0.0 # remove outliers
com = sum(x * y)
x = x - com
rms = np.sqrt(np.abs(sum(x**2 * y)))
return gauss, H, A, x0, sigma, com, rms
def get_fw(x, y, level=0.5):
try:
ymax, ymin = np.amax(y), np.amin(y)
thr = (ymax - ymin) * level
imax, i1, i2 = np.argmax(y), 0, len(x)-1
for i in range(imax - 1, 0, -1):
if (y[i] - ymin) <= thr:
i1 = i
break
for i in range(imax + 1, len(x), 1):
if (y[i] - ymin) <= thr:
i2 = i
break
slope1 = (y[i1 + 1] - y[i1]) / (x[i1 + 1] - x[i1])
slope2 = (y[i2] - y[i2 - 1]) / (x[i2] - x[i2 - 1])
dx1 = (ymin + thr - y[i1]) / slope1
dx2 = (ymin + thr - y[i2]) / slope2
fw = abs((x[i2] + dx2) - (x[i1] + dx1))
return fw
except:
return 0.0
def process_image(image, pulse_id, timestamp, x_axis, y_axis, parameters, bsdata=None):
try:
ret = processor.process_image(image, pulse_id, timestamp, x_axis, y_axis, parameters, bsdata)
replace = parameters.get("replace", True)
level = parameters.get("fw_threshold", 0.5)
sufix = "" if replace else "_op"
x_axis = ret["x_axis"]
y_axis = ret["y_axis"]
x_profile = ret["x_profile"]
y_profile = ret["y_profile"]
gauss, H, A, x0, sigma, com, rms = gauss_op(x_axis, x_profile)
ret["x_fit_offset" + sufix] = H
ret["x_fit_amplitude" + sufix] = A
ret["x_fit_mean" + sufix] = x0
ret["x_fit_standard_deviation" + sufix] = sigma
ret["x_center_of_mass" + sufix] = com
ret["x_rms" + sufix] = rms
fw = get_fw(x_axis, x_profile, level)
ret["x_fw" + sufix] = fw
gauss, H, A, x0, sigma, com, rms = gauss_op(y_axis, y_profile)
ret["y_fit_offset" + sufix] = H
ret["y_fit_amplitude" + sufix] = A
ret["y_fit_mean" + sufix] = x0
ret["y_fit_standard_deviation" + sufix] = sigma
ret["y_center_of_mass" + sufix] = com
ret["y_rms" + sufix] = rms
fw = get_fw(y_axis, y_profile, level)
ret["y_fw" + sufix] = fw
return ret
except Exception as e:
_logger.warning(sys.exc_info())
return None