226 lines
8.0 KiB
Python
Executable File
226 lines
8.0 KiB
Python
Executable File
"""Get data from an h5 file or BEC history and perform fitting."""
|
|
|
|
import numpy as np
|
|
from lmfit.models import GaussianModel, LorentzianModel, VoigtModel, ConstantModel, LinearModel
|
|
from scipy.ndimage import gaussian_filter1d
|
|
import h5py
|
|
import matplotlib.pyplot as plt
|
|
|
|
|
|
def create_fit_parameters(
|
|
deriv: bool = False, model: str = "Voigt", baseline: str = "Linear", smoothing: None = None
|
|
):
|
|
"""Store the fit parameters in a dictionary."""
|
|
# map input model to lmfit model name
|
|
model_mappings = {
|
|
"Gaussian": GaussianModel,
|
|
"Lorentzian": LorentzianModel,
|
|
"Voigt": VoigtModel,
|
|
"Constant": ConstantModel,
|
|
"Linear": LinearModel,
|
|
}
|
|
return {
|
|
"deriv": deriv,
|
|
"model": model_mappings[model],
|
|
"baseline": model_mappings[baseline],
|
|
"smoothing": smoothing,
|
|
}
|
|
|
|
|
|
def get_data_from_h5(signal_name: str = "lu_bpmsum"):
|
|
"""Get data from an h5 file."""
|
|
with h5py.File("scan_676.h5", "r") as f:
|
|
entry = f["entry"]["collection"]
|
|
y_data = entry["devices"][signal_name][signal_name]["value"][:]
|
|
motor_data = entry["metadata"]["bec"]
|
|
motor_name = motor_data["scan_motors"][0].decode()
|
|
scan_number = motor_data["scan_number"][()]
|
|
x_data = entry["devices"][motor_name][motor_name]["value"][:]
|
|
return {
|
|
"x_data": x_data,
|
|
"y_data": y_data,
|
|
"signal_name": signal_name,
|
|
"motor_name": motor_name,
|
|
"scan_number": str(scan_number),
|
|
}
|
|
|
|
|
|
def get_data_from_history(history_index: int, signal_name: str = "lu_bpmsum"):
|
|
"""Read data from the BEC history and return the X and Y data as arrays."""
|
|
scan = bec.history[history_index]
|
|
md = scan.metadata["bec"]
|
|
motor_name = md["scan_motors"][0].decode()
|
|
scan_number = md["scan_number"]
|
|
x_data = scan.devices[motor_name][motor_name].read()["value"]
|
|
y_data = scan.devices[signal_name][signal_name].read()["value"]
|
|
return {
|
|
"signal_name": signal_name,
|
|
"x_data": x_data,
|
|
"y_data": y_data,
|
|
"motor_name": motor_name,
|
|
"scan_number": scan_number,
|
|
}
|
|
|
|
|
|
def process_data(data, fit_params):
|
|
"""
|
|
Process the signal data for fitting based on derivative or smoothing.
|
|
"""
|
|
smoothing, deriv = fit_params["smoothing"], fit_params["deriv"]
|
|
signal_name = data["signal_name"]
|
|
y_data = data["y_data"]
|
|
|
|
if deriv:
|
|
if smoothing:
|
|
y_smooth = gaussian_filter1d(y_data, smoothing)
|
|
fitting_data = np.gradient(y_smooth)
|
|
signal_name = f"Derivative of smoothed {signal_name}"
|
|
else:
|
|
fitting_data = np.gradient(y_data)
|
|
signal_name = f"Derivative of {signal_name}"
|
|
elif smoothing and smoothing > 0.01:
|
|
fitting_data = gaussian_filter1d(y_data, smoothing)
|
|
signal_name = f"Smoothed {signal_name}"
|
|
else:
|
|
fitting_data = y_data
|
|
|
|
updated_data = {"y_to_fit": fitting_data, "signal_name": signal_name}
|
|
data.update(updated_data)
|
|
return data
|
|
|
|
|
|
def fit(data, fit_params):
|
|
"""Fit a signal to a model and return the fitting results."""
|
|
# Create the model
|
|
peak_model = fit_params["model"](prefix="peak_")
|
|
baseline_model = fit_params["baseline"](prefix="base_")
|
|
full_model = peak_model + baseline_model
|
|
|
|
# Prepare data
|
|
processed_data = process_data(data, fit_params)
|
|
params = full_model.make_params()
|
|
y_min = np.min(processed_data["y_to_fit"])
|
|
|
|
# Configure baseline parameters
|
|
if fit_params["baseline"] == ConstantModel:
|
|
params["base_c"].set(value=y_min)
|
|
elif fit_params["baseline"] == LinearModel:
|
|
params["base_intercept"].set(value=y_min)
|
|
params["base_slope"].set(value=0)
|
|
|
|
# Add peak-specific parameters
|
|
params.update(peak_model.guess(processed_data["y_to_fit"], x=processed_data["x_data"]))
|
|
|
|
# Perform the fitting
|
|
lmfit_result = full_model.fit(processed_data["y_to_fit"], params, x=processed_data["x_data"])
|
|
|
|
# Find the X that gives the max Y
|
|
max_index = np.argmax(processed_data["y_to_fit"])
|
|
x_max = processed_data["x_data"][max_index]
|
|
|
|
# Collect results
|
|
return {
|
|
"model": fit_params["model"].__name__,
|
|
"fwhm": lmfit_result.params["peak_fwhm"].value,
|
|
"centre": lmfit_result.best_values["peak_center"],
|
|
"height": lmfit_result.params["peak_height"].value,
|
|
"chi_sq": lmfit_result.chisqr,
|
|
"lmfit_result": lmfit_result,
|
|
"x_max": x_max,
|
|
}
|
|
|
|
|
|
def plot_fitted_data(data, fit_result):
|
|
"""Plot the original data and the fitted model."""
|
|
plt.plot(data["x_data"], data["y_to_fit"], label="Data")
|
|
plt.plot(
|
|
data["x_data"],
|
|
fit_result["lmfit_result"].best_fit,
|
|
"-",
|
|
label=f"FWHM = {fit_result['fwhm']:.3f},"
|
|
f"Centre = {fit_result['centre']:.3f}, "
|
|
f"Height = {fit_result['height']:.3f}",
|
|
)
|
|
plt.xlabel(data["motor_name"])
|
|
plt.ylabel(data["signal_name"])
|
|
plt.title(f"Scan {data['scan_number']}, fitted with {fit_result['model']}")
|
|
plt.grid(True)
|
|
plt.legend()
|
|
plt.show()
|
|
|
|
|
|
def select_bec_window(dock_area_name="Fitting"):
|
|
"""Check to see if the fitting results dock is already open and re-create it if not"""
|
|
open_docks = bec.gui.windows
|
|
if open_docks.get(dock_area_name) is None:
|
|
dock_area = bec.gui.new(dock_area_name)
|
|
wf = dock_area.new("Plot").new(bec.gui.available_widgets.Waveform)
|
|
text_box = dock_area.new("Results", position="bottom").new(
|
|
widget=bec.gui.available_widgets.TextBox
|
|
)
|
|
else:
|
|
wf = bec.gui.Fitting.Plot.Waveform
|
|
text_box = bec.gui.Fitting.Results.TextBox
|
|
return wf, text_box
|
|
|
|
|
|
def plot_live_data_bec(motor_name, signal_name, window_name="Fitting"):
|
|
"""
|
|
Plotting live data for motor and signal using BEC.
|
|
|
|
This function plots live data from a specified motor and signal.
|
|
It clears the current plot window, sets its title, labels the axes
|
|
with the provided motor and signal names, and initializes live plotting
|
|
on the given signal against the motor.
|
|
|
|
Args:
|
|
motor_name (str): The name of the motor to be used as the x-axis.
|
|
signal_name (str): The name of the signal to be used as the y-axis.
|
|
|
|
Returns:
|
|
None
|
|
"""
|
|
wf, text_box = select_bec_window(window_name)
|
|
text_box.set_plain_text("Plotting live data")
|
|
wf.clear_all()
|
|
wf.title = "Scan: Live scan"
|
|
wf.x_label = motor_name
|
|
wf.y_label = signal_name
|
|
wf.plot(x_name=motor_name, y_name=signal_name)
|
|
|
|
|
|
def plot_fitted_data_bec(data, fit_result):
|
|
"""
|
|
Plot fitted data and display fitting parameters in the specified window.
|
|
|
|
This function selects a BEC window and plots the original data along with the
|
|
fitted function. Additionally, it displays the fitting results in a text
|
|
box within the same window for better visualization of the fit results.
|
|
|
|
Parameters:
|
|
data : dict
|
|
Dictionary containing the original dataset, where 'x_data' and 'y_to_fit'
|
|
hold the independent variable and the dependent variable, respectively,
|
|
'scan_number' represents the scan number, 'motor_name' and 'signal_name'
|
|
provide axis labels.
|
|
fit_result : dict
|
|
Dictionary containing the results of the fit, including parameters such
|
|
as 'centre', 'fwhm', 'height', and the fitted model stored under
|
|
'lmfit_result', with its 'best_fit' attribute representing the fitted data.
|
|
"""
|
|
wf, text_box = select_bec_window()
|
|
fit_text = (
|
|
f"Fit parameters: Centre = {fit_result['centre']:.4f}, "
|
|
f"FWHM = {fit_result['fwhm']:.3f}, "
|
|
f"Height = {fit_result['height']:.4f}\n"
|
|
f"Model = {fit_result['model']}\n"
|
|
f"Chi sq = {fit_result['chi_sq']:.3g}"
|
|
)
|
|
text_box.set_plain_text(fit_text)
|
|
wf.clear_all()
|
|
wf.title = f"Scan: {data['scan_number']}"
|
|
wf.x_label = data["motor_name"]
|
|
wf.y_label = data["signal_name"]
|
|
wf.plot(x=data["x_data"], y=data["y_to_fit"], label="data")
|
|
wf.plot(x=data["x_data"], y=fit_result["lmfit_result"].best_fit, label="Fit to data")
|