Drop peakfinding step

The initial guesses via lmfit seems to work just fine
This commit is contained in:
usov_i 2021-02-26 09:32:12 +01:00
parent 7ee4b1c86a
commit 11fae5d47a
4 changed files with 9 additions and 272 deletions

View File

@ -1,5 +1,4 @@
from pyzebra.anatric import * from pyzebra.anatric import *
from pyzebra.ccl_findpeaks import ccl_findpeaks
from pyzebra.ccl_io import export_1D, load_1D, parse_1D from pyzebra.ccl_io import export_1D, load_1D, parse_1D
from pyzebra.fit2 import fitccl from pyzebra.fit2 import fitccl
from pyzebra.h5 import * from pyzebra.h5 import *

View File

@ -8,7 +8,6 @@ from copy import deepcopy
import numpy as np import numpy as np
from bokeh.layouts import column, row from bokeh.layouts import column, row
from bokeh.models import ( from bokeh.models import (
Asterisk,
BasicTicker, BasicTicker,
Button, Button,
CheckboxEditor, CheckboxEditor,
@ -68,7 +67,6 @@ PROPOSAL_PATH = "/afs/psi.ch/project/sinqdata/2020/zebra/"
def create(): def create():
det_data = {} det_data = {}
fit_params = {} fit_params = {}
peak_pos_textinput_lock = False
js_data = { js_data = {
".comm": ColumnDataSource(data=dict(cont=[], ext=[])), ".comm": ColumnDataSource(data=dict(cont=[], ext=[])),
".incomm": ColumnDataSource(data=dict(cont=[], ext=[])), ".incomm": ColumnDataSource(data=dict(cont=[], ext=[])),
@ -91,11 +89,7 @@ def create():
hkl = [f'{s["h"]} {s["k"]} {s["l"]}' for s in det_data] hkl = [f'{s["h"]} {s["k"]} {s["l"]}' for s in det_data]
export = [s.get("active", True) for s in det_data] export = [s.get("active", True) for s in det_data]
scan_table_source.data.update( scan_table_source.data.update(
scan=scan_list, scan=scan_list, hkl=hkl, fit=[0] * len(scan_list), export=export,
hkl=hkl,
peaks=[0] * len(scan_list),
fit=[0] * len(scan_list),
export=export,
) )
scan_table_source.selected.indices = [] scan_table_source.selected.indices = []
scan_table_source.selected.indices = [0] scan_table_source.selected.indices = [0]
@ -178,14 +172,10 @@ def create():
monitor_spinner.on_change("value", monitor_spinner_callback) monitor_spinner.on_change("value", monitor_spinner_callback)
def _update_table(): def _update_table():
num_of_peaks = [len(scan.get("peak_indexes", [])) for scan in det_data]
fit_ok = [(1 if "fit" in scan else 0) for scan in det_data] fit_ok = [(1 if "fit" in scan else 0) for scan in det_data]
scan_table_source.data.update(peaks=num_of_peaks, fit=fit_ok) scan_table_source.data.update(fit=fit_ok)
def _update_plot(scan): def _update_plot(scan):
nonlocal peak_pos_textinput_lock
peak_pos_textinput_lock = True
scan_motor = scan["scan_motor"] scan_motor = scan["scan_motor"]
y = scan["Counts"] y = scan["Counts"]
@ -194,23 +184,6 @@ def create():
plot.axis[0].axis_label = scan_motor plot.axis[0].axis_label = scan_motor
plot_scatter_source.data.update(x=x, y=y, y_upper=y + np.sqrt(y), y_lower=y - np.sqrt(y)) plot_scatter_source.data.update(x=x, y=y, y_upper=y + np.sqrt(y), y_lower=y - np.sqrt(y))
num_of_peaks = len(scan.get("peak_indexes", []))
if num_of_peaks is not None and num_of_peaks > 0:
peak_indexes = scan["peak_indexes"]
if len(peak_indexes) == 1:
peak_pos_textinput.value = str(scan[scan_motor][peak_indexes[0]])
else:
peak_pos_textinput.value = str([scan[scan_motor][ind] for ind in peak_indexes])
plot_peak_source.data.update(x=scan[scan_motor][peak_indexes], y=scan["peak_heights"])
plot_line_smooth_source.data.update(x=x, y=scan["smooth_peaks"])
else:
peak_pos_textinput.value = None
plot_peak_source.data.update(x=[], y=[])
plot_line_smooth_source.data.update(x=[], y=[])
peak_pos_textinput_lock = False
fit = scan.get("fit") fit = scan.get("fit")
if fit is not None: if fit is not None:
comps = fit.eval_components() comps = fit.eval_components()
@ -249,20 +222,12 @@ def create():
plot.add_glyph(plot_scatter_source, Scatter(x="x", y="y", line_color="steelblue")) plot.add_glyph(plot_scatter_source, Scatter(x="x", y="y", line_color="steelblue"))
plot.add_layout(Whisker(source=plot_scatter_source, base="x", upper="y_upper", lower="y_lower")) plot.add_layout(Whisker(source=plot_scatter_source, base="x", upper="y_upper", lower="y_lower"))
plot_line_smooth_source = ColumnDataSource(dict(x=[0], y=[0]))
plot.add_glyph(
plot_line_smooth_source, Line(x="x", y="y", line_color="steelblue", line_dash="dashed")
)
plot_gauss_source = ColumnDataSource(dict(x=[0], y=[0])) plot_gauss_source = ColumnDataSource(dict(x=[0], y=[0]))
plot.add_glyph(plot_gauss_source, Line(x="x", y="y", line_color="red", line_dash="dashed")) plot.add_glyph(plot_gauss_source, Line(x="x", y="y", line_color="red", line_dash="dashed"))
plot_bkg_source = ColumnDataSource(dict(x=[0], y=[0])) plot_bkg_source = ColumnDataSource(dict(x=[0], y=[0]))
plot.add_glyph(plot_bkg_source, Line(x="x", y="y", line_color="green", line_dash="dashed")) plot.add_glyph(plot_bkg_source, Line(x="x", y="y", line_color="green", line_dash="dashed"))
plot_peak_source = ColumnDataSource(dict(x=[], y=[]))
plot.add_glyph(plot_peak_source, Asterisk(x="x", y="y", size=10, line_color="red"))
numfit_min_span = Span(location=None, dimension="height", line_dash="dashed") numfit_min_span = Span(location=None, dimension="height", line_dash="dashed")
plot.add_layout(numfit_min_span) plot.add_layout(numfit_min_span)
@ -290,17 +255,16 @@ def create():
_update_plot(det_data[new[0]]) _update_plot(det_data[new[0]])
scan_table_source = ColumnDataSource(dict(scan=[], hkl=[], peaks=[], fit=[], export=[])) scan_table_source = ColumnDataSource(dict(scan=[], hkl=[], fit=[], export=[]))
scan_table = DataTable( scan_table = DataTable(
source=scan_table_source, source=scan_table_source,
columns=[ columns=[
TableColumn(field="scan", title="Scan", width=50), TableColumn(field="scan", title="Scan", width=50),
TableColumn(field="hkl", title="hkl", width=100), TableColumn(field="hkl", title="hkl", width=100),
TableColumn(field="peaks", title="Peaks", width=50),
TableColumn(field="fit", title="Fit", width=50), TableColumn(field="fit", title="Fit", width=50),
TableColumn(field="export", title="Export", editor=CheckboxEditor(), width=50), TableColumn(field="export", title="Export", editor=CheckboxEditor(), width=50),
], ],
width=360, # +60 because of the index column width=310, # +60 because of the index column
fit_columns=False, fit_columns=False,
editable=True, editable=True,
) )
@ -327,27 +291,6 @@ def create():
merge_button = Button(label="Merge scans", width=145) merge_button = Button(label="Merge scans", width=145)
merge_button.on_click(merge_button_callback) merge_button.on_click(merge_button_callback)
def peak_pos_textinput_callback(_attr, _old, new):
if new is not None and not peak_pos_textinput_lock:
scan = _get_selected_scan()
peak_ind = (np.abs(scan["omega"] - float(new))).argmin()
scan["peak_indexes"] = np.array([peak_ind], dtype=np.int64)
scan["peak_heights"] = np.array([scan["smooth_peaks"][peak_ind]])
_update_table()
_update_plot(scan)
peak_pos_textinput = TextInput(title="Peak position:", default_size=145)
peak_pos_textinput.on_change("value", peak_pos_textinput_callback)
peak_int_ratio_spinner = Spinner(
title="Peak intensity ratio:", value=0.8, step=0.01, low=0, high=1, default_size=145
)
peak_prominence_spinner = Spinner(title="Peak prominence:", value=50, low=0, default_size=145)
smooth_toggle = Toggle(label="Smooth curve", default_size=145)
window_size_spinner = Spinner(title="Window size:", value=7, step=2, low=1, default_size=145)
poly_order_spinner = Spinner(title="Poly order:", value=3, low=0, default_size=145)
integ_from = Spinner(title="Integrate from:", default_size=145, disabled=True) integ_from = Spinner(title="Integrate from:", default_size=145, disabled=True)
integ_to = Spinner(title="to:", default_size=145, disabled=True) integ_to = Spinner(title="to:", default_size=145, disabled=True)
@ -456,36 +399,6 @@ def create():
fit_output_textinput = TextAreaInput(title="Fit results:", width=450, height=200) fit_output_textinput = TextAreaInput(title="Fit results:", width=450, height=200)
def _get_peakfind_params():
return dict(
int_threshold=peak_int_ratio_spinner.value,
prominence=peak_prominence_spinner.value,
smooth=smooth_toggle.active,
window_size=window_size_spinner.value,
poly_order=poly_order_spinner.value,
)
def peakfind_all_button_callback():
peakfind_params = _get_peakfind_params()
for scan in det_data:
pyzebra.ccl_findpeaks(scan, **peakfind_params)
_update_table()
_update_plot(_get_selected_scan())
peakfind_all_button = Button(label="Peak Find All", button_type="primary", default_size=145)
peakfind_all_button.on_click(peakfind_all_button_callback)
def peakfind_button_callback():
scan = _get_selected_scan()
pyzebra.ccl_findpeaks(scan, **_get_peakfind_params())
_update_table()
_update_plot(scan)
peakfind_button = Button(label="Peak Find Current", default_size=145)
peakfind_button.on_click(peakfind_button_callback)
def fit_all_button_callback(): def fit_all_button_callback():
for scan in det_data: for scan in det_data:
pyzebra.fit_scan(scan, fit_params) pyzebra.fit_scan(scan, fit_params)
@ -581,13 +494,6 @@ def create():
save_button.js_on_click(CustomJS(args={"js_data": js_data[".comm"]}, code=javaScript)) save_button.js_on_click(CustomJS(args={"js_data": js_data[".comm"]}, code=javaScript))
save_button.js_on_click(CustomJS(args={"js_data": js_data[".incomm"]}, code=javaScript)) save_button.js_on_click(CustomJS(args={"js_data": js_data[".incomm"]}, code=javaScript))
findpeak_controls = column(
row(peak_pos_textinput, column(Spacer(height=19), smooth_toggle)),
row(peak_int_ratio_spinner, peak_prominence_spinner),
row(window_size_spinner, poly_order_spinner),
row(peakfind_button, peakfind_all_button),
)
fitpeak_controls = row( fitpeak_controls = row(
column(fitparams_add_dropdown, fitparams_select, fitparams_remove_button), column(fitparams_add_dropdown, fitparams_select, fitparams_remove_button),
fitparams_table, fitparams_table,
@ -623,7 +529,7 @@ def create():
monitor_spinner, monitor_spinner,
), ),
row(scan_layout, plot, Spacer(width=30), export_layout), row(scan_layout, plot, Spacer(width=30), export_layout),
row(findpeak_controls, Spacer(width=30), fitpeak_controls, fit_output_textinput), row(fitpeak_controls, fit_output_textinput),
) )
return Panel(child=tab_layout, title="ccl integrate") return Panel(child=tab_layout, title="ccl integrate")

View File

@ -9,7 +9,6 @@ from copy import deepcopy
import numpy as np import numpy as np
from bokeh.layouts import column, row from bokeh.layouts import column, row
from bokeh.models import ( from bokeh.models import (
Asterisk,
BasicTicker, BasicTicker,
Button, Button,
CheckboxEditor, CheckboxEditor,
@ -77,7 +76,6 @@ def color_palette(n_colors):
def create(): def create():
det_data = [] det_data = []
fit_params = {} fit_params = {}
peak_pos_textinput_lock = False
js_data = { js_data = {
".comm": ColumnDataSource(data=dict(cont=[], ext=[])), ".comm": ColumnDataSource(data=dict(cont=[], ext=[])),
".incomm": ColumnDataSource(data=dict(cont=[], ext=[])), ".incomm": ColumnDataSource(data=dict(cont=[], ext=[])),
@ -106,7 +104,6 @@ def create():
file=file_list, file=file_list,
scan=scan_list, scan=scan_list,
param=[""] * len(scan_list), param=[""] * len(scan_list),
peaks=[0] * len(scan_list),
fit=[0] * len(scan_list), fit=[0] * len(scan_list),
export=[True] * len(scan_list), export=[True] * len(scan_list),
) )
@ -189,18 +186,14 @@ def create():
monitor_spinner.on_change("value", monitor_spinner_callback) monitor_spinner.on_change("value", monitor_spinner_callback)
def _update_table(): def _update_table():
num_of_peaks = [len(scan.get("peak_indexes", [])) for scan in det_data]
fit_ok = [(1 if "fit" in scan else 0) for scan in det_data] fit_ok = [(1 if "fit" in scan else 0) for scan in det_data]
scan_table_source.data.update(peaks=num_of_peaks, fit=fit_ok) scan_table_source.data.update(fit=fit_ok)
def _update_plot(): def _update_plot():
_update_single_scan_plot(_get_selected_scan()) _update_single_scan_plot(_get_selected_scan())
_update_overview() _update_overview()
def _update_single_scan_plot(scan): def _update_single_scan_plot(scan):
nonlocal peak_pos_textinput_lock
peak_pos_textinput_lock = True
scan_motor = scan["scan_motor"] scan_motor = scan["scan_motor"]
y = scan["Counts"] y = scan["Counts"]
@ -209,23 +202,6 @@ def create():
plot.axis[0].axis_label = scan_motor plot.axis[0].axis_label = scan_motor
plot_scatter_source.data.update(x=x, y=y, y_upper=y + np.sqrt(y), y_lower=y - np.sqrt(y)) plot_scatter_source.data.update(x=x, y=y, y_upper=y + np.sqrt(y), y_lower=y - np.sqrt(y))
num_of_peaks = len(scan.get("peak_indexes", []))
if num_of_peaks is not None and num_of_peaks > 0:
peak_indexes = scan["peak_indexes"]
if len(peak_indexes) == 1:
peak_pos_textinput.value = str(x[peak_indexes[0]])
else:
peak_pos_textinput.value = str([x[ind] for ind in peak_indexes])
plot_peak_source.data.update(x=x[peak_indexes], y=scan["peak_heights"])
plot_line_smooth_source.data.update(x=x, y=scan["smooth_peaks"])
else:
peak_pos_textinput.value = None
plot_peak_source.data.update(x=[], y=[])
plot_line_smooth_source.data.update(x=[], y=[])
peak_pos_textinput_lock = False
fit = scan.get("fit") fit = scan.get("fit")
if fit is not None: if fit is not None:
comps = fit.eval_components() comps = fit.eval_components()
@ -290,11 +266,6 @@ def create():
plot.add_glyph(plot_scatter_source, Scatter(x="x", y="y", line_color="steelblue")) plot.add_glyph(plot_scatter_source, Scatter(x="x", y="y", line_color="steelblue"))
plot.add_layout(Whisker(source=plot_scatter_source, base="x", upper="y_upper", lower="y_lower")) plot.add_layout(Whisker(source=plot_scatter_source, base="x", upper="y_upper", lower="y_lower"))
plot_line_smooth_source = ColumnDataSource(dict(x=[0], y=[0]))
plot.add_glyph(
plot_line_smooth_source, Line(x="x", y="y", line_color="steelblue", line_dash="dashed"),
)
plot_gauss_source = ColumnDataSource(dict(x=[0], y=[0])) plot_gauss_source = ColumnDataSource(dict(x=[0], y=[0]))
plot.add_glyph( plot.add_glyph(
plot_gauss_source, Line(x="x", y="y", line_color="red", line_dash="dashed"), plot_gauss_source, Line(x="x", y="y", line_color="red", line_dash="dashed"),
@ -305,9 +276,6 @@ def create():
plot_bkg_source, Line(x="x", y="y", line_color="green", line_dash="dashed"), plot_bkg_source, Line(x="x", y="y", line_color="green", line_dash="dashed"),
) )
plot_peak_source = ColumnDataSource(dict(x=[], y=[]))
plot.add_glyph(plot_peak_source, Asterisk(x="x", y="y", size=10, line_color="red"))
numfit_min_span = Span(location=None, dimension="height", line_dash="dashed") numfit_min_span = Span(location=None, dimension="height", line_dash="dashed")
plot.add_layout(numfit_min_span) plot.add_layout(numfit_min_span)
@ -380,20 +348,17 @@ def create():
_update_plot() _update_plot()
scan_table_source = ColumnDataSource( scan_table_source = ColumnDataSource(dict(file=[], scan=[], param=[], fit=[], export=[]))
dict(file=[], scan=[], param=[], peaks=[], fit=[], export=[])
)
scan_table = DataTable( scan_table = DataTable(
source=scan_table_source, source=scan_table_source,
columns=[ columns=[
TableColumn(field="file", title="file", width=150), TableColumn(field="file", title="file", width=150),
TableColumn(field="scan", title="scan", width=50), TableColumn(field="scan", title="scan", width=50),
TableColumn(field="param", title="param", width=50), TableColumn(field="param", title="param", width=50),
TableColumn(field="peaks", title="Peaks", width=50),
TableColumn(field="fit", title="Fit", width=50), TableColumn(field="fit", title="Fit", width=50),
TableColumn(field="export", title="Export", editor=CheckboxEditor(), width=50), TableColumn(field="export", title="Export", editor=CheckboxEditor(), width=50),
], ],
width=460, # +60 because of the index column width=410, # +60 because of the index column
editable=True, editable=True,
fit_columns=False, fit_columns=False,
) )
@ -408,27 +373,6 @@ def create():
def _get_selected_scan(): def _get_selected_scan():
return det_data[scan_table_source.selected.indices[0]] return det_data[scan_table_source.selected.indices[0]]
def peak_pos_textinput_callback(_attr, _old, new):
if new is not None and not peak_pos_textinput_lock:
scan = _get_selected_scan()
peak_ind = (np.abs(scan["omega"] - float(new))).argmin()
scan["peak_indexes"] = np.array([peak_ind], dtype=np.int64)
scan["peak_heights"] = np.array([scan["smooth_peaks"][peak_ind]])
_update_table()
_update_plot()
peak_pos_textinput = TextInput(title="Peak position:", default_size=145)
peak_pos_textinput.on_change("value", peak_pos_textinput_callback)
peak_int_ratio_spinner = Spinner(
title="Peak intensity ratio:", value=0.8, step=0.01, low=0, high=1, default_size=145
)
peak_prominence_spinner = Spinner(title="Peak prominence:", value=50, low=0, default_size=145)
smooth_toggle = Toggle(label="Smooth curve", default_size=145)
window_size_spinner = Spinner(title="Window size:", value=7, step=2, low=1, default_size=145)
poly_order_spinner = Spinner(title="Poly order:", value=3, low=0, default_size=145)
integ_from = Spinner(title="Integrate from:", default_size=145, disabled=True) integ_from = Spinner(title="Integrate from:", default_size=145, disabled=True)
integ_to = Spinner(title="to:", default_size=145, disabled=True) integ_to = Spinner(title="to:", default_size=145, disabled=True)
@ -537,36 +481,6 @@ def create():
fit_output_textinput = TextAreaInput(title="Fit results:", width=450, height=200) fit_output_textinput = TextAreaInput(title="Fit results:", width=450, height=200)
def _get_peakfind_params():
return dict(
int_threshold=peak_int_ratio_spinner.value,
prominence=peak_prominence_spinner.value,
smooth=smooth_toggle.active,
window_size=window_size_spinner.value,
poly_order=poly_order_spinner.value,
)
def peakfind_all_button_callback():
peakfind_params = _get_peakfind_params()
for scan in det_data:
pyzebra.ccl_findpeaks(scan, **peakfind_params)
_update_table()
_update_plot()
peakfind_all_button = Button(label="Peak Find All", button_type="primary", default_size=145)
peakfind_all_button.on_click(peakfind_all_button_callback)
def peakfind_button_callback():
scan = _get_selected_scan()
pyzebra.ccl_findpeaks(scan, **_get_peakfind_params())
_update_table()
_update_plot()
peakfind_button = Button(label="Peak Find Current", default_size=145)
peakfind_button.on_click(peakfind_button_callback)
def fit_all_button_callback(): def fit_all_button_callback():
for scan in det_data: for scan in det_data:
pyzebra.fit_scan(scan, fit_params) pyzebra.fit_scan(scan, fit_params)
@ -656,13 +570,6 @@ def create():
save_button.js_on_click(CustomJS(args={"js_data": js_data[".comm"]}, code=javaScript)) save_button.js_on_click(CustomJS(args={"js_data": js_data[".comm"]}, code=javaScript))
save_button.js_on_click(CustomJS(args={"js_data": js_data[".incomm"]}, code=javaScript)) save_button.js_on_click(CustomJS(args={"js_data": js_data[".incomm"]}, code=javaScript))
findpeak_controls = column(
row(peak_pos_textinput, column(Spacer(height=19), smooth_toggle)),
row(peak_int_ratio_spinner, peak_prominence_spinner),
row(window_size_spinner, poly_order_spinner),
row(peakfind_button, peakfind_all_button),
)
fitpeak_controls = row( fitpeak_controls = row(
column(fitparams_add_dropdown, fitparams_select, fitparams_remove_button), column(fitparams_add_dropdown, fitparams_select, fitparams_remove_button),
fitparams_table, fitparams_table,
@ -688,7 +595,7 @@ def create():
monitor_spinner, monitor_spinner,
), ),
row(scan_table, plots, Spacer(width=30), export_layout), row(scan_table, plots, Spacer(width=30), export_layout),
row(findpeak_controls, Spacer(width=30), fitpeak_controls, fit_output_textinput), row(fitpeak_controls, fit_output_textinput),
) )
return Panel(child=tab_layout, title="param study") return Panel(child=tab_layout, title="param study")

View File

@ -1,75 +0,0 @@
import numpy as np
import scipy as sc
from scipy.interpolate import interp1d
from scipy.signal import savgol_filter
def ccl_findpeaks(
scan,
int_threshold=0.8,
prominence=50,
smooth=False,
window_size=7,
poly_order=3,
variable="omega",
):
"""function iterates through the dictionary created by load_cclv2 and locates peaks for each scan
args: scan - a single scan,
int_threshold - fraction of threshold_intensity/max_intensity, must be positive num between 0 and 1
i.e. will only detect peaks above 75% of max intensity
prominence - defines a drop of values that must be between two peaks, must be positive number
i.e. if promimence is 20, it will detect two neigbouring peaks of 300 and 310 intesities,
if none of the itermediate values are lower that 290
smooth - if true, smooths data by savitzky golay filter, if false - no smoothing
window_size - window size for savgol filter, must be odd positive integer
poly_order = order of the polynomial used in savgol filter, must be positive integer smaller than
"""
if not 0 <= int_threshold <= 1:
int_threshold = 0.8
print(
"Invalid value for int_threshold, select value between 0 and 1, new value set to:",
int_threshold,
)
if not isinstance(window_size, int) or (window_size % 2) == 0 or window_size <= 1:
window_size = 7
print(
"Invalid value for window_size, select positive odd integer, new value set to!:",
window_size,
)
if not isinstance(poly_order, int) or window_size < poly_order:
poly_order = 3
print(
"Invalid value for poly_order, select positive integer smaller than window_size, new value set to:",
poly_order,
)
if not isinstance(prominence, (int, float)) and prominence < 0:
prominence = 50
print("Invalid value for prominence, select positive number, new value set to:", prominence)
omega = scan[variable]
counts = np.array(scan["Counts"])
if smooth:
itp = interp1d(omega, counts, kind="linear")
absintensity = [abs(number) for number in counts]
lowest_intensity = min(absintensity)
counts[counts < 0] = lowest_intensity
smooth_peaks = savgol_filter(itp(omega), window_size, poly_order)
else:
smooth_peaks = counts
peaks, properties = sc.signal.find_peaks(
smooth_peaks, height=int_threshold * max(smooth_peaks), prominence=prominence
)
scan["peak_indexes"] = peaks
scan["peak_heights"] = properties["peak_heights"]
scan["smooth_peaks"] = smooth_peaks # smoothed curve