Add code for 1D detector #69
13
.vscode/launch.json
vendored
Normal file
13
.vscode/launch.json
vendored
Normal file
@ -0,0 +1,13 @@
|
|||||||
|
{
|
||||||
|
"version": "0.2.0",
|
||||||
|
"configurations": [
|
||||||
|
{
|
||||||
|
"name": "pyzebra",
|
||||||
|
"type": "python",
|
||||||
|
"request": "launch",
|
||||||
|
"program": "${workspaceFolder}/pyzebra/cli.py",
|
||||||
|
"console": "internalConsole",
|
||||||
|
"env": {},
|
||||||
|
},
|
||||||
|
]
|
||||||
|
}
|
@ -1,3 +1,8 @@
|
|||||||
|
import pyzebra.ccl_dict_operation
|
||||||
from pyzebra.anatric import *
|
from pyzebra.anatric import *
|
||||||
|
from pyzebra.ccl_findpeaks import ccl_findpeaks
|
||||||
|
from pyzebra.comm_export import export_comm
|
||||||
|
from pyzebra.fit2 import fitccl
|
||||||
from pyzebra.h5 import *
|
from pyzebra.h5 import *
|
||||||
|
from pyzebra.load_1D import load_1D, parse_1D
|
||||||
from pyzebra.xtal import *
|
from pyzebra.xtal import *
|
||||||
|
@ -1,10 +1,15 @@
|
|||||||
import argparse
|
import argparse
|
||||||
|
import logging
|
||||||
|
import sys
|
||||||
|
from io import StringIO
|
||||||
|
|
||||||
from bokeh.io import curdoc
|
from bokeh.io import curdoc
|
||||||
from bokeh.models import Tabs
|
from bokeh.layouts import column, row
|
||||||
|
from bokeh.models import Tabs, TextAreaInput
|
||||||
|
|
||||||
import panel_anatric
|
import panel_ccl_integrate
|
||||||
import panel_data_viewer
|
import panel_hdf_anatric
|
||||||
|
import panel_hdf_viewer
|
||||||
|
|
||||||
parser = argparse.ArgumentParser(
|
parser = argparse.ArgumentParser(
|
||||||
prog="pyzebra", formatter_class=argparse.ArgumentDefaultsHelpFormatter
|
prog="pyzebra", formatter_class=argparse.ArgumentDefaultsHelpFormatter
|
||||||
@ -15,8 +20,32 @@ args = parser.parse_args()
|
|||||||
doc = curdoc()
|
doc = curdoc()
|
||||||
doc.title = "pyzebra"
|
doc.title = "pyzebra"
|
||||||
|
|
||||||
# Final layout
|
sys.stdout = StringIO()
|
||||||
tab_data_viewer = panel_data_viewer.create()
|
stdout_textareainput = TextAreaInput(title="print output:", height=150)
|
||||||
tab_anatric = panel_anatric.create()
|
|
||||||
|
|
||||||
doc.add_root(Tabs(tabs=[tab_data_viewer, tab_anatric]))
|
bokeh_stream = StringIO()
|
||||||
|
bokeh_handler = logging.StreamHandler(bokeh_stream)
|
||||||
|
bokeh_handler.setFormatter(logging.Formatter(logging.BASIC_FORMAT))
|
||||||
|
bokeh_logger = logging.getLogger('bokeh')
|
||||||
|
bokeh_logger.addHandler(bokeh_handler)
|
||||||
|
bokeh_log_textareainput = TextAreaInput(title="server output:", height=150)
|
||||||
|
|
||||||
|
# Final layout
|
||||||
|
tab_hdf_viewer = panel_hdf_viewer.create()
|
||||||
|
tab_hdf_anatric = panel_hdf_anatric.create()
|
||||||
|
tab_ccl_integrate = panel_ccl_integrate.create()
|
||||||
|
|
||||||
|
doc.add_root(
|
||||||
|
column(
|
||||||
|
Tabs(tabs=[tab_hdf_viewer, tab_hdf_anatric, tab_ccl_integrate]),
|
||||||
|
row(stdout_textareainput, bokeh_log_textareainput, sizing_mode="scale_both"),
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def update_stdout():
|
||||||
|
stdout_textareainput.value = sys.stdout.getvalue()
|
||||||
|
bokeh_log_textareainput.value = bokeh_stream.getvalue()
|
||||||
|
|
||||||
|
|
||||||
|
doc.add_periodic_callback(update_stdout, 1000)
|
||||||
|
548
pyzebra/app/panel_ccl_integrate.py
Normal file
548
pyzebra/app/panel_ccl_integrate.py
Normal file
@ -0,0 +1,548 @@
|
|||||||
|
import base64
|
||||||
|
import io
|
||||||
|
import os
|
||||||
|
import tempfile
|
||||||
|
|
||||||
|
import numpy as np
|
||||||
|
from bokeh.layouts import column, row
|
||||||
|
from bokeh.models import (
|
||||||
|
Asterisk,
|
||||||
|
BasicTicker,
|
||||||
|
Button,
|
||||||
|
ColumnDataSource,
|
||||||
|
CustomJS,
|
||||||
|
DataRange1d,
|
||||||
|
DataTable,
|
||||||
|
Div,
|
||||||
|
FileInput,
|
||||||
|
Grid,
|
||||||
|
Line,
|
||||||
|
LinearAxis,
|
||||||
|
Panel,
|
||||||
|
Plot,
|
||||||
|
RadioButtonGroup,
|
||||||
|
Scatter,
|
||||||
|
Select,
|
||||||
|
Spacer,
|
||||||
|
Span,
|
||||||
|
Spinner,
|
||||||
|
TableColumn,
|
||||||
|
TextAreaInput,
|
||||||
|
TextInput,
|
||||||
|
Toggle,
|
||||||
|
Whisker,
|
||||||
|
)
|
||||||
|
|
||||||
|
import pyzebra
|
||||||
|
|
||||||
|
|
||||||
|
javaScript = """
|
||||||
|
setTimeout(function() {
|
||||||
|
const filename = 'output' + js_data.data['ext']
|
||||||
|
const blob = new Blob([js_data.data['cont']], {type: 'text/plain'})
|
||||||
|
const link = document.createElement('a');
|
||||||
|
document.body.appendChild(link);
|
||||||
|
const url = window.URL.createObjectURL(blob);
|
||||||
|
link.href = url;
|
||||||
|
link.download = filename;
|
||||||
|
link.click();
|
||||||
|
window.URL.revokeObjectURL(url);
|
||||||
|
document.body.removeChild(link);
|
||||||
|
}, 500);
|
||||||
|
"""
|
||||||
|
|
||||||
|
PROPOSAL_PATH = "/afs/psi.ch/project/sinqdata/2020/zebra/"
|
||||||
|
|
||||||
|
|
||||||
|
def create():
|
||||||
|
det_data = {}
|
||||||
|
peak_pos_textinput_lock = False
|
||||||
|
js_data = ColumnDataSource(data=dict(cont=[], ext=[]))
|
||||||
|
|
||||||
|
def proposal_textinput_callback(_attr, _old, new):
|
||||||
|
ccl_path = os.path.join(PROPOSAL_PATH, new)
|
||||||
|
ccl_file_list = []
|
||||||
|
for file in os.listdir(ccl_path):
|
||||||
|
if file.endswith(".ccl"):
|
||||||
|
ccl_file_list.append((os.path.join(ccl_path, file), file))
|
||||||
|
ccl_file_select.options = ccl_file_list
|
||||||
|
ccl_file_select.value = ccl_file_list[0][0]
|
||||||
|
|
||||||
|
proposal_textinput = TextInput(title="Enter proposal number:", default_size=145)
|
||||||
|
proposal_textinput.on_change("value", proposal_textinput_callback)
|
||||||
|
|
||||||
|
def ccl_file_select_callback(_attr, _old, new):
|
||||||
|
nonlocal det_data
|
||||||
|
with open(new) as file:
|
||||||
|
_, ext = os.path.splitext(new)
|
||||||
|
det_data = pyzebra.parse_1D(file, ext)
|
||||||
|
|
||||||
|
scan_list = list(det_data["scan"].keys())
|
||||||
|
hkl = [
|
||||||
|
f'{int(m["h_index"])} {int(m["k_index"])} {int(m["l_index"])}'
|
||||||
|
for m in det_data["scan"].values()
|
||||||
|
]
|
||||||
|
scan_table_source.data.update(
|
||||||
|
scan=scan_list, hkl=hkl, peaks=[0] * len(scan_list), fit=[0] * len(scan_list)
|
||||||
|
)
|
||||||
|
scan_table_source.selected.indices = []
|
||||||
|
scan_table_source.selected.indices = [0]
|
||||||
|
|
||||||
|
ccl_file_select = Select(title="Available .ccl files")
|
||||||
|
ccl_file_select.on_change("value", ccl_file_select_callback)
|
||||||
|
|
||||||
|
def upload_button_callback(_attr, _old, new):
|
||||||
|
nonlocal det_data
|
||||||
|
with io.StringIO(base64.b64decode(new).decode()) as file:
|
||||||
|
_, ext = os.path.splitext(upload_button.filename)
|
||||||
|
det_data = pyzebra.parse_1D(file, ext)
|
||||||
|
|
||||||
|
scan_list = list(det_data["scan"].keys())
|
||||||
|
hkl = [
|
||||||
|
f'{int(m["h_index"])} {int(m["k_index"])} {int(m["l_index"])}'
|
||||||
|
for m in det_data["scan"].values()
|
||||||
|
]
|
||||||
|
scan_table_source.data.update(
|
||||||
|
scan=scan_list, hkl=hkl, peaks=[0] * len(scan_list), fit=[0] * len(scan_list)
|
||||||
|
)
|
||||||
|
scan_table_source.selected.indices = []
|
||||||
|
scan_table_source.selected.indices = [0]
|
||||||
|
|
||||||
|
upload_button = FileInput(accept=".ccl")
|
||||||
|
upload_button.on_change("value", upload_button_callback)
|
||||||
|
|
||||||
|
def _update_table():
|
||||||
|
num_of_peaks = [scan.get("num_of_peaks", 0) for scan in det_data["scan"].values()]
|
||||||
|
fit_ok = [(1 if "fit" in scan else 0) for scan in det_data["scan"].values()]
|
||||||
|
scan_table_source.data.update(peaks=num_of_peaks, fit=fit_ok)
|
||||||
|
|
||||||
|
def _update_plot(ind):
|
||||||
|
nonlocal peak_pos_textinput_lock
|
||||||
|
peak_pos_textinput_lock = True
|
||||||
|
|
||||||
|
scan = det_data["scan"][ind]
|
||||||
|
y = scan["Counts"]
|
||||||
|
x = scan["om"]
|
||||||
|
|
||||||
|
plot_scatter_source.data.update(x=x, y=y, y_upper=y + np.sqrt(y), y_lower=y - np.sqrt(y))
|
||||||
|
|
||||||
|
num_of_peaks = scan.get("num_of_peaks")
|
||||||
|
if num_of_peaks is not None and num_of_peaks > 0:
|
||||||
|
peak_indexes = scan["peak_indexes"]
|
||||||
|
if len(peak_indexes) == 1:
|
||||||
|
peak_pos_textinput.value = str(scan["om"][peak_indexes[0]])
|
||||||
|
else:
|
||||||
|
peak_pos_textinput.value = str([scan["om"][ind] for ind in peak_indexes])
|
||||||
|
|
||||||
|
plot_peak_source.data.update(x=scan["om"][peak_indexes], y=scan["peak_heights"])
|
||||||
|
plot_line_smooth_source.data.update(x=x, y=scan["smooth_peaks"])
|
||||||
|
else:
|
||||||
|
peak_pos_textinput.value = None
|
||||||
|
plot_peak_source.data.update(x=[], y=[])
|
||||||
|
plot_line_smooth_source.data.update(x=[], y=[])
|
||||||
|
|
||||||
|
peak_pos_textinput_lock = False
|
||||||
|
|
||||||
|
fit = scan.get("fit")
|
||||||
|
if fit is not None:
|
||||||
|
plot_gauss_source.data.update(x=x, y=scan["fit"]["comps"]["gaussian"])
|
||||||
|
plot_bkg_source.data.update(x=x, y=scan["fit"]["comps"]["background"])
|
||||||
|
params = fit["result"].params
|
||||||
|
fit_output_textinput.value = (
|
||||||
|
"%s \n"
|
||||||
|
"Gaussian: centre = %9.4f, sigma = %9.4f, area = %9.4f \n"
|
||||||
|
"background: slope = %9.4f, intercept = %9.4f \n"
|
||||||
|
"Int. area = %9.4f +/- %9.4f \n"
|
||||||
|
"fit area = %9.4f +/- %9.4f \n"
|
||||||
|
"ratio((fit-int)/fit) = %9.4f"
|
||||||
|
% (
|
||||||
|
ind,
|
||||||
|
params["g_cen"].value,
|
||||||
|
params["g_width"].value,
|
||||||
|
params["g_amp"].value,
|
||||||
|
params["slope"].value,
|
||||||
|
params["intercept"].value,
|
||||||
|
fit["int_area"].n,
|
||||||
|
fit["int_area"].s,
|
||||||
|
params["g_amp"].value,
|
||||||
|
params["g_amp"].stderr,
|
||||||
|
(params["g_amp"].value - fit["int_area"].n) / params["g_amp"].value,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
numfit_min, numfit_max = fit["numfit"]
|
||||||
|
if numfit_min is None:
|
||||||
|
numfit_min_span.location = None
|
||||||
|
else:
|
||||||
|
numfit_min_span.location = x[numfit_min]
|
||||||
|
|
||||||
|
if numfit_max is None:
|
||||||
|
numfit_max_span.location = None
|
||||||
|
else:
|
||||||
|
numfit_max_span.location = x[numfit_max]
|
||||||
|
|
||||||
|
else:
|
||||||
|
plot_gauss_source.data.update(x=[], y=[])
|
||||||
|
plot_bkg_source.data.update(x=[], y=[])
|
||||||
|
fit_output_textinput.value = ""
|
||||||
|
numfit_min_span.location = None
|
||||||
|
numfit_max_span.location = None
|
||||||
|
|
||||||
|
# Main plot
|
||||||
|
plot = Plot(
|
||||||
|
x_range=DataRange1d(),
|
||||||
|
y_range=DataRange1d(),
|
||||||
|
plot_height=400,
|
||||||
|
plot_width=700,
|
||||||
|
toolbar_location=None,
|
||||||
|
)
|
||||||
|
|
||||||
|
plot.add_layout(LinearAxis(axis_label="Counts"), place="left")
|
||||||
|
plot.add_layout(LinearAxis(axis_label="Omega"), place="below")
|
||||||
|
|
||||||
|
plot.add_layout(Grid(dimension=0, ticker=BasicTicker()))
|
||||||
|
plot.add_layout(Grid(dimension=1, ticker=BasicTicker()))
|
||||||
|
|
||||||
|
plot_scatter_source = ColumnDataSource(dict(x=[0], y=[0], y_upper=[0], y_lower=[0]))
|
||||||
|
plot.add_glyph(plot_scatter_source, Scatter(x="x", y="y", line_color="steelblue"))
|
||||||
|
plot.add_layout(Whisker(source=plot_scatter_source, base="x", upper="y_upper", lower="y_lower"))
|
||||||
|
|
||||||
|
plot_line_smooth_source = ColumnDataSource(dict(x=[0], y=[0]))
|
||||||
|
plot.add_glyph(
|
||||||
|
plot_line_smooth_source, Line(x="x", y="y", line_color="steelblue", line_dash="dashed")
|
||||||
|
)
|
||||||
|
|
||||||
|
plot_gauss_source = ColumnDataSource(dict(x=[0], y=[0]))
|
||||||
|
plot.add_glyph(plot_gauss_source, Line(x="x", y="y", line_color="red", line_dash="dashed"))
|
||||||
|
|
||||||
|
plot_bkg_source = ColumnDataSource(dict(x=[0], y=[0]))
|
||||||
|
plot.add_glyph(plot_bkg_source, Line(x="x", y="y", line_color="green", line_dash="dashed"))
|
||||||
|
|
||||||
|
plot_peak_source = ColumnDataSource(dict(x=[], y=[]))
|
||||||
|
plot.add_glyph(plot_peak_source, Asterisk(x="x", y="y", size=10, line_color="red"))
|
||||||
|
|
||||||
|
numfit_min_span = Span(location=None, dimension="height", line_dash="dashed")
|
||||||
|
plot.add_layout(numfit_min_span)
|
||||||
|
|
||||||
|
numfit_max_span = Span(location=None, dimension="height", line_dash="dashed")
|
||||||
|
plot.add_layout(numfit_max_span)
|
||||||
|
|
||||||
|
# Scan select
|
||||||
|
def scan_table_callback(_attr, _old, new):
|
||||||
|
if new:
|
||||||
|
_update_plot(scan_table_source.data["scan"][new[-1]])
|
||||||
|
|
||||||
|
scan_table_source = ColumnDataSource(dict(scan=[], hkl=[], peaks=[], fit=[]))
|
||||||
|
scan_table = DataTable(
|
||||||
|
source=scan_table_source,
|
||||||
|
columns=[
|
||||||
|
TableColumn(field="scan", title="scan"),
|
||||||
|
TableColumn(field="hkl", title="hkl"),
|
||||||
|
TableColumn(field="peaks", title="Peaks"),
|
||||||
|
TableColumn(field="fit", title="Fit"),
|
||||||
|
],
|
||||||
|
width=200,
|
||||||
|
index_position=None,
|
||||||
|
)
|
||||||
|
|
||||||
|
scan_table_source.selected.on_change("indices", scan_table_callback)
|
||||||
|
|
||||||
|
def peak_pos_textinput_callback(_attr, _old, new):
|
||||||
|
if new is not None and not peak_pos_textinput_lock:
|
||||||
|
sel_ind = scan_table_source.selected.indices[-1]
|
||||||
|
scan_name = scan_table_source.data["scan"][sel_ind]
|
||||||
|
scan = det_data["scan"][scan_name]
|
||||||
|
|
||||||
|
scan["num_of_peaks"] = 1
|
||||||
|
peak_ind = (np.abs(scan["om"] - float(new))).argmin()
|
||||||
|
scan["peak_indexes"] = np.array([peak_ind], dtype=np.int64)
|
||||||
|
scan["peak_heights"] = np.array([scan["smooth_peaks"][peak_ind]])
|
||||||
|
_update_table()
|
||||||
|
_update_plot(scan_name)
|
||||||
|
|
||||||
|
peak_pos_textinput = TextInput(title="Peak position:", default_size=145)
|
||||||
|
peak_pos_textinput.on_change("value", peak_pos_textinput_callback)
|
||||||
|
|
||||||
|
peak_int_ratio_spinner = Spinner(
|
||||||
|
title="Peak intensity ratio:", value=0.8, step=0.01, low=0, high=1, default_size=145
|
||||||
|
)
|
||||||
|
peak_prominence_spinner = Spinner(title="Peak prominence:", value=50, low=0, default_size=145)
|
||||||
|
smooth_toggle = Toggle(label="Smooth curve", default_size=145)
|
||||||
|
window_size_spinner = Spinner(title="Window size:", value=7, step=2, low=1, default_size=145)
|
||||||
|
poly_order_spinner = Spinner(title="Poly order:", value=3, low=0, default_size=145)
|
||||||
|
|
||||||
|
centre_guess = Spinner(default_size=100)
|
||||||
|
centre_vary = Toggle(default_size=100, active=True)
|
||||||
|
centre_min = Spinner(default_size=100)
|
||||||
|
centre_max = Spinner(default_size=100)
|
||||||
|
sigma_guess = Spinner(default_size=100)
|
||||||
|
sigma_vary = Toggle(default_size=100, active=True)
|
||||||
|
sigma_min = Spinner(default_size=100)
|
||||||
|
sigma_max = Spinner(default_size=100)
|
||||||
|
ampl_guess = Spinner(default_size=100)
|
||||||
|
ampl_vary = Toggle(default_size=100, active=True)
|
||||||
|
ampl_min = Spinner(default_size=100)
|
||||||
|
ampl_max = Spinner(default_size=100)
|
||||||
|
slope_guess = Spinner(default_size=100)
|
||||||
|
slope_vary = Toggle(default_size=100, active=True)
|
||||||
|
slope_min = Spinner(default_size=100)
|
||||||
|
slope_max = Spinner(default_size=100)
|
||||||
|
offset_guess = Spinner(default_size=100)
|
||||||
|
offset_vary = Toggle(default_size=100, active=True)
|
||||||
|
offset_min = Spinner(default_size=100)
|
||||||
|
offset_max = Spinner(default_size=100)
|
||||||
|
integ_from = Spinner(title="Integrate from:", default_size=145)
|
||||||
|
integ_to = Spinner(title="to:", default_size=145)
|
||||||
|
|
||||||
|
def fitparam_reset_button_callback():
|
||||||
|
centre_guess.value = None
|
||||||
|
centre_vary.active = True
|
||||||
|
centre_min.value = None
|
||||||
|
centre_max.value = None
|
||||||
|
sigma_guess.value = None
|
||||||
|
sigma_vary.active = True
|
||||||
|
sigma_min.value = None
|
||||||
|
sigma_max.value = None
|
||||||
|
ampl_guess.value = None
|
||||||
|
ampl_vary.active = True
|
||||||
|
ampl_min.value = None
|
||||||
|
ampl_max.value = None
|
||||||
|
slope_guess.value = None
|
||||||
|
slope_vary.active = True
|
||||||
|
slope_min.value = None
|
||||||
|
slope_max.value = None
|
||||||
|
offset_guess.value = None
|
||||||
|
offset_vary.active = True
|
||||||
|
offset_min.value = None
|
||||||
|
offset_max.value = None
|
||||||
|
integ_from.value = None
|
||||||
|
integ_to.value = None
|
||||||
|
|
||||||
|
fitparam_reset_button = Button(label="Reset to defaults", default_size=145)
|
||||||
|
fitparam_reset_button.on_click(fitparam_reset_button_callback)
|
||||||
|
|
||||||
|
fit_output_textinput = TextAreaInput(title="Fit results:", width=450, height=400)
|
||||||
|
|
||||||
|
def peakfind_all_button_callback():
|
||||||
|
for scan in det_data["scan"].values():
|
||||||
|
pyzebra.ccl_findpeaks(
|
||||||
|
scan,
|
||||||
|
int_threshold=peak_int_ratio_spinner.value,
|
||||||
|
prominence=peak_prominence_spinner.value,
|
||||||
|
smooth=smooth_toggle.active,
|
||||||
|
window_size=window_size_spinner.value,
|
||||||
|
poly_order=poly_order_spinner.value,
|
||||||
|
)
|
||||||
|
|
||||||
|
_update_table()
|
||||||
|
|
||||||
|
sel_ind = scan_table_source.selected.indices[-1]
|
||||||
|
_update_plot(scan_table_source.data["scan"][sel_ind])
|
||||||
|
|
||||||
|
peakfind_all_button = Button(label="Peak Find All", button_type="primary", default_size=145)
|
||||||
|
peakfind_all_button.on_click(peakfind_all_button_callback)
|
||||||
|
|
||||||
|
def peakfind_button_callback():
|
||||||
|
sel_ind = scan_table_source.selected.indices[-1]
|
||||||
|
scan = scan_table_source.data["scan"][sel_ind]
|
||||||
|
pyzebra.ccl_findpeaks(
|
||||||
|
det_data["scan"][scan],
|
||||||
|
int_threshold=peak_int_ratio_spinner.value,
|
||||||
|
prominence=peak_prominence_spinner.value,
|
||||||
|
smooth=smooth_toggle.active,
|
||||||
|
window_size=window_size_spinner.value,
|
||||||
|
poly_order=poly_order_spinner.value,
|
||||||
|
)
|
||||||
|
|
||||||
|
_update_table()
|
||||||
|
_update_plot(scan)
|
||||||
|
|
||||||
|
peakfind_button = Button(label="Peak Find Current", default_size=145)
|
||||||
|
peakfind_button.on_click(peakfind_button_callback)
|
||||||
|
|
||||||
|
def fit_all_button_callback():
|
||||||
|
for scan in det_data["scan"].values():
|
||||||
|
pyzebra.fitccl(
|
||||||
|
scan,
|
||||||
|
guess=[
|
||||||
|
centre_guess.value,
|
||||||
|
sigma_guess.value,
|
||||||
|
ampl_guess.value,
|
||||||
|
slope_guess.value,
|
||||||
|
offset_guess.value,
|
||||||
|
],
|
||||||
|
vary=[
|
||||||
|
centre_vary.active,
|
||||||
|
sigma_vary.active,
|
||||||
|
ampl_vary.active,
|
||||||
|
slope_vary.active,
|
||||||
|
offset_vary.active,
|
||||||
|
],
|
||||||
|
constraints_min=[
|
||||||
|
centre_min.value,
|
||||||
|
sigma_min.value,
|
||||||
|
ampl_min.value,
|
||||||
|
slope_min.value,
|
||||||
|
offset_min.value,
|
||||||
|
],
|
||||||
|
constraints_max=[
|
||||||
|
centre_max.value,
|
||||||
|
sigma_max.value,
|
||||||
|
ampl_max.value,
|
||||||
|
slope_max.value,
|
||||||
|
offset_max.value,
|
||||||
|
],
|
||||||
|
numfit_min=integ_from.value,
|
||||||
|
numfit_max=integ_to.value,
|
||||||
|
)
|
||||||
|
|
||||||
|
sel_ind = scan_table_source.selected.indices[-1]
|
||||||
|
_update_plot(scan_table_source.data["scan"][sel_ind])
|
||||||
|
_update_table()
|
||||||
|
|
||||||
|
fit_all_button = Button(label="Fit All", button_type="primary", default_size=145)
|
||||||
|
fit_all_button.on_click(fit_all_button_callback)
|
||||||
|
|
||||||
|
def fit_button_callback():
|
||||||
|
sel_ind = scan_table_source.selected.indices[-1]
|
||||||
|
scan = scan_table_source.data["scan"][sel_ind]
|
||||||
|
|
||||||
|
pyzebra.fitccl(
|
||||||
|
det_data["scan"][scan],
|
||||||
|
guess=[
|
||||||
|
centre_guess.value,
|
||||||
|
sigma_guess.value,
|
||||||
|
ampl_guess.value,
|
||||||
|
slope_guess.value,
|
||||||
|
offset_guess.value,
|
||||||
|
],
|
||||||
|
vary=[
|
||||||
|
centre_vary.active,
|
||||||
|
sigma_vary.active,
|
||||||
|
ampl_vary.active,
|
||||||
|
slope_vary.active,
|
||||||
|
offset_vary.active,
|
||||||
|
],
|
||||||
|
constraints_min=[
|
||||||
|
centre_min.value,
|
||||||
|
sigma_min.value,
|
||||||
|
ampl_min.value,
|
||||||
|
slope_min.value,
|
||||||
|
offset_min.value,
|
||||||
|
],
|
||||||
|
constraints_max=[
|
||||||
|
centre_max.value,
|
||||||
|
sigma_max.value,
|
||||||
|
ampl_max.value,
|
||||||
|
slope_max.value,
|
||||||
|
offset_max.value,
|
||||||
|
],
|
||||||
|
numfit_min=integ_from.value,
|
||||||
|
numfit_max=integ_to.value,
|
||||||
|
)
|
||||||
|
|
||||||
|
_update_plot(scan)
|
||||||
|
_update_table()
|
||||||
|
|
||||||
|
fit_button = Button(label="Fit Current", default_size=145)
|
||||||
|
fit_button.on_click(fit_button_callback)
|
||||||
|
|
||||||
|
def area_method_radiobutton_callback(_attr, _old, new):
|
||||||
|
det_data["meta"]["area_method"] = ("fit", "integ")[new]
|
||||||
|
|
||||||
|
area_method_radiobutton = RadioButtonGroup(
|
||||||
|
labels=["Fit", "Integral"], active=0, default_size=145
|
||||||
|
)
|
||||||
|
area_method_radiobutton.on_change("active", area_method_radiobutton_callback)
|
||||||
|
|
||||||
|
preview_output_textinput = TextAreaInput(title="Export file preview:", width=450, height=400)
|
||||||
|
|
||||||
|
def preview_output_button_callback():
|
||||||
|
if det_data["meta"]["indices"] == "hkl":
|
||||||
|
ext = ".comm"
|
||||||
|
elif det_data["meta"]["indices"] == "real":
|
||||||
|
ext = ".incomm"
|
||||||
|
|
||||||
|
with tempfile.TemporaryDirectory() as temp_dir:
|
||||||
|
temp_file = temp_dir + "/temp"
|
||||||
|
pyzebra.export_comm(det_data, temp_file)
|
||||||
|
|
||||||
|
with open(f"{temp_file}{ext}") as f:
|
||||||
|
preview_output_textinput.value = f.read()
|
||||||
|
|
||||||
|
preview_output_button = Button(label="Preview file", default_size=220)
|
||||||
|
preview_output_button.on_click(preview_output_button_callback)
|
||||||
|
|
||||||
|
def export_results(det_data):
|
||||||
|
if det_data["meta"]["indices"] == "hkl":
|
||||||
|
ext = ".comm"
|
||||||
|
elif det_data["meta"]["indices"] == "real":
|
||||||
|
ext = ".incomm"
|
||||||
|
|
||||||
|
with tempfile.TemporaryDirectory() as temp_dir:
|
||||||
|
temp_file = temp_dir + "/temp"
|
||||||
|
pyzebra.export_comm(det_data, temp_file)
|
||||||
|
|
||||||
|
with open(f"{temp_file}{ext}") as f:
|
||||||
|
output_content = f.read()
|
||||||
|
|
||||||
|
return output_content, ext
|
||||||
|
|
||||||
|
def save_button_callback():
|
||||||
|
cont, ext = export_results(det_data)
|
||||||
|
js_data.data.update(cont=[cont], ext=[ext])
|
||||||
|
|
||||||
|
save_button = Button(label="Download file", button_type="success", default_size=220)
|
||||||
|
save_button.on_click(save_button_callback)
|
||||||
|
save_button.js_on_click(CustomJS(args={"js_data": js_data}, code=javaScript))
|
||||||
|
|
||||||
|
findpeak_controls = column(
|
||||||
|
row(peak_pos_textinput, column(Spacer(height=19), smooth_toggle)),
|
||||||
|
row(peak_int_ratio_spinner, peak_prominence_spinner),
|
||||||
|
row(window_size_spinner, poly_order_spinner),
|
||||||
|
row(peakfind_button, peakfind_all_button),
|
||||||
|
)
|
||||||
|
|
||||||
|
div_1 = Div(text="Guess:")
|
||||||
|
div_2 = Div(text="Vary:")
|
||||||
|
div_3 = Div(text="Min:")
|
||||||
|
div_4 = Div(text="Max:")
|
||||||
|
div_5 = Div(text="Gauss Centre:", margin=[5, 5, -5, 5])
|
||||||
|
div_6 = Div(text="Gauss Sigma:", margin=[5, 5, -5, 5])
|
||||||
|
div_7 = Div(text="Gauss Ampl.:", margin=[5, 5, -5, 5])
|
||||||
|
div_8 = Div(text="Slope:", margin=[5, 5, -5, 5])
|
||||||
|
div_9 = Div(text="Offset:", margin=[5, 5, -5, 5])
|
||||||
|
fitpeak_controls = row(
|
||||||
|
column(
|
||||||
|
Spacer(height=36),
|
||||||
|
div_1,
|
||||||
|
Spacer(height=12),
|
||||||
|
div_2,
|
||||||
|
Spacer(height=12),
|
||||||
|
div_3,
|
||||||
|
Spacer(height=12),
|
||||||
|
div_4,
|
||||||
|
),
|
||||||
|
column(div_5, centre_guess, centre_vary, centre_min, centre_max),
|
||||||
|
column(div_6, sigma_guess, sigma_vary, sigma_min, sigma_max),
|
||||||
|
column(div_7, ampl_guess, ampl_vary, ampl_min, ampl_max),
|
||||||
|
column(div_8, slope_guess, slope_vary, slope_min, slope_max),
|
||||||
|
column(div_9, offset_guess, offset_vary, offset_min, offset_max),
|
||||||
|
Spacer(width=20),
|
||||||
|
column(
|
||||||
|
row(integ_from, integ_to),
|
||||||
|
row(fitparam_reset_button, area_method_radiobutton),
|
||||||
|
row(fit_button, fit_all_button),
|
||||||
|
),
|
||||||
|
)
|
||||||
|
|
||||||
|
export_layout = column(preview_output_textinput, row(preview_output_button, save_button))
|
||||||
|
|
||||||
|
upload_div = Div(text="Or upload .ccl file:")
|
||||||
|
tab_layout = column(
|
||||||
|
row(proposal_textinput, ccl_file_select),
|
||||||
|
row(column(Spacer(height=5), upload_div), upload_button),
|
||||||
|
row(scan_table, plot, Spacer(width=30), fit_output_textinput, export_layout),
|
||||||
|
row(findpeak_controls, Spacer(width=30), fitpeak_controls),
|
||||||
|
)
|
||||||
|
|
||||||
|
return Panel(child=tab_layout, title="ccl integrate")
|
@ -406,4 +406,4 @@ def create():
|
|||||||
|
|
||||||
curdoc().add_periodic_callback(update_config, 1000)
|
curdoc().add_periodic_callback(update_config, 1000)
|
||||||
|
|
||||||
return Panel(child=tab_layout, title="Anatric")
|
return Panel(child=tab_layout, title="hdf anatric")
|
@ -87,7 +87,8 @@ def create():
|
|||||||
temperature_spinner.value = det_data["temperature"][index]
|
temperature_spinner.value = det_data["temperature"][index]
|
||||||
|
|
||||||
gamma, nu = calculate_pol(det_data, index)
|
gamma, nu = calculate_pol(det_data, index)
|
||||||
image_source.data.update(gamma=[gamma], nu=[nu])
|
omega = np.ones((IMAGE_H, IMAGE_W)) * det_data["rot_angle"][index]
|
||||||
|
image_source.data.update(gamma=[gamma], nu=[nu], omega=[omega])
|
||||||
|
|
||||||
def update_overview_plot():
|
def update_overview_plot():
|
||||||
h5_data = det_data["data"]
|
h5_data = det_data["data"]
|
||||||
@ -161,6 +162,7 @@ def create():
|
|||||||
l=[np.zeros((1, 1))],
|
l=[np.zeros((1, 1))],
|
||||||
gamma=[np.zeros((1, 1))],
|
gamma=[np.zeros((1, 1))],
|
||||||
nu=[np.zeros((1, 1))],
|
nu=[np.zeros((1, 1))],
|
||||||
|
omega=[np.zeros((1, 1))],
|
||||||
x=[0],
|
x=[0],
|
||||||
y=[0],
|
y=[0],
|
||||||
dw=[IMAGE_W],
|
dw=[IMAGE_W],
|
||||||
@ -173,12 +175,14 @@ def create():
|
|||||||
l_glyph = Image(image="l", x="x", y="y", dw="dw", dh="dh", global_alpha=0)
|
l_glyph = Image(image="l", x="x", y="y", dw="dw", dh="dh", global_alpha=0)
|
||||||
gamma_glyph = Image(image="gamma", x="x", y="y", dw="dw", dh="dh", global_alpha=0)
|
gamma_glyph = Image(image="gamma", x="x", y="y", dw="dw", dh="dh", global_alpha=0)
|
||||||
nu_glyph = Image(image="nu", x="x", y="y", dw="dw", dh="dh", global_alpha=0)
|
nu_glyph = Image(image="nu", x="x", y="y", dw="dw", dh="dh", global_alpha=0)
|
||||||
|
omega_glyph = Image(image="omega", x="x", y="y", dw="dw", dh="dh", global_alpha=0)
|
||||||
|
|
||||||
plot.add_glyph(image_source, h_glyph)
|
plot.add_glyph(image_source, h_glyph)
|
||||||
plot.add_glyph(image_source, k_glyph)
|
plot.add_glyph(image_source, k_glyph)
|
||||||
plot.add_glyph(image_source, l_glyph)
|
plot.add_glyph(image_source, l_glyph)
|
||||||
plot.add_glyph(image_source, gamma_glyph)
|
plot.add_glyph(image_source, gamma_glyph)
|
||||||
plot.add_glyph(image_source, nu_glyph)
|
plot.add_glyph(image_source, nu_glyph)
|
||||||
|
plot.add_glyph(image_source, omega_glyph)
|
||||||
|
|
||||||
image_glyph = Image(image="image", x="x", y="y", dw="dw", dh="dh")
|
image_glyph = Image(image="image", x="x", y="y", dw="dw", dh="dh")
|
||||||
plot.add_glyph(image_source, image_glyph, name="image_glyph")
|
plot.add_glyph(image_source, image_glyph, name="image_glyph")
|
||||||
@ -224,6 +228,7 @@ def create():
|
|||||||
("intensity", "@image"),
|
("intensity", "@image"),
|
||||||
("gamma", "@gamma"),
|
("gamma", "@gamma"),
|
||||||
("nu", "@nu"),
|
("nu", "@nu"),
|
||||||
|
("omega", "@omega"),
|
||||||
("h", "@h"),
|
("h", "@h"),
|
||||||
("k", "@k"),
|
("k", "@k"),
|
||||||
("l", "@l"),
|
("l", "@l"),
|
||||||
@ -376,7 +381,7 @@ def create():
|
|||||||
overview_plot_x_image_glyph.color_mapper = LinearColorMapper(palette=cmap_dict[new])
|
overview_plot_x_image_glyph.color_mapper = LinearColorMapper(palette=cmap_dict[new])
|
||||||
overview_plot_y_image_glyph.color_mapper = LinearColorMapper(palette=cmap_dict[new])
|
overview_plot_y_image_glyph.color_mapper = LinearColorMapper(palette=cmap_dict[new])
|
||||||
|
|
||||||
colormap = Select(title="Colormap:", options=list(cmap_dict.keys()))
|
colormap = Select(title="Colormap:", options=list(cmap_dict.keys()), default_size=145)
|
||||||
colormap.on_change("value", colormap_callback)
|
colormap.on_change("value", colormap_callback)
|
||||||
colormap.value = "plasma"
|
colormap.value = "plasma"
|
||||||
|
|
||||||
@ -395,7 +400,7 @@ def create():
|
|||||||
|
|
||||||
update_image()
|
update_image()
|
||||||
|
|
||||||
auto_toggle = Toggle(label="Auto Range", active=True, button_type="default")
|
auto_toggle = Toggle(label="Auto Range", active=True, button_type="default", default_size=145)
|
||||||
auto_toggle.on_click(auto_toggle_callback)
|
auto_toggle.on_click(auto_toggle_callback)
|
||||||
|
|
||||||
# ---- colormap display max value
|
# ---- colormap display max value
|
||||||
@ -409,6 +414,7 @@ def create():
|
|||||||
value=1,
|
value=1,
|
||||||
step=STEP,
|
step=STEP,
|
||||||
disabled=auto_toggle.active,
|
disabled=auto_toggle.active,
|
||||||
|
default_size=145,
|
||||||
)
|
)
|
||||||
display_max_spinner.on_change("value", display_max_spinner_callback)
|
display_max_spinner.on_change("value", display_max_spinner_callback)
|
||||||
|
|
||||||
@ -423,6 +429,7 @@ def create():
|
|||||||
value=0,
|
value=0,
|
||||||
step=STEP,
|
step=STEP,
|
||||||
disabled=auto_toggle.active,
|
disabled=auto_toggle.active,
|
||||||
|
default_size=145,
|
||||||
)
|
)
|
||||||
display_min_spinner.on_change("value", display_min_spinner_callback)
|
display_min_spinner.on_change("value", display_min_spinner_callback)
|
||||||
|
|
||||||
@ -465,13 +472,22 @@ def create():
|
|||||||
temperature_spinner = Spinner(title="Temperature:", format="0.00", width=145, disabled=True)
|
temperature_spinner = Spinner(title="Temperature:", format="0.00", width=145, disabled=True)
|
||||||
|
|
||||||
# Final layout
|
# Final layout
|
||||||
layout_image = column(
|
layout_image = column(gridplot([[proj_v, None], [plot, proj_h]], merge_tools=False))
|
||||||
gridplot([[proj_v, None], [plot, proj_h]], merge_tools=False), row(index_spinner)
|
colormap_layout = column(
|
||||||
|
row(colormap, column(Spacer(height=19), auto_toggle)),
|
||||||
|
row(display_max_spinner, display_min_spinner),
|
||||||
)
|
)
|
||||||
colormap_layout = column(colormap, auto_toggle, display_max_spinner, display_min_spinner)
|
|
||||||
hkl_layout = column(radio_button_group, hkl_button)
|
hkl_layout = column(radio_button_group, hkl_button)
|
||||||
params_layout = row(magnetic_field_spinner, temperature_spinner)
|
params_layout = row(magnetic_field_spinner, temperature_spinner)
|
||||||
|
|
||||||
|
layout_controls = row(
|
||||||
|
column(selection_button, selection_list),
|
||||||
|
Spacer(width=20),
|
||||||
|
column(frame_button_group, colormap_layout),
|
||||||
|
Spacer(width=20),
|
||||||
|
column(index_spinner, params_layout, hkl_layout),
|
||||||
|
)
|
||||||
|
|
||||||
layout_overview = column(
|
layout_overview = column(
|
||||||
gridplot(
|
gridplot(
|
||||||
[[overview_plot_x, overview_plot_y]],
|
[[overview_plot_x, overview_plot_y]],
|
||||||
@ -486,12 +502,12 @@ def create():
|
|||||||
column(
|
column(
|
||||||
row(column(Spacer(height=5), upload_div), upload_button, filelist),
|
row(column(Spacer(height=5), upload_div), upload_button, filelist),
|
||||||
layout_overview,
|
layout_overview,
|
||||||
row(frame_button_group, selection_button, selection_list),
|
layout_controls,
|
||||||
),
|
),
|
||||||
column(roi_avg_plot, layout_image, row(colormap_layout, column(params_layout, hkl_layout))),
|
column(roi_avg_plot, layout_image),
|
||||||
)
|
)
|
||||||
|
|
||||||
return Panel(child=tab_layout, title="Data Viewer")
|
return Panel(child=tab_layout, title="hdf viewer")
|
||||||
|
|
||||||
|
|
||||||
def calculate_hkl(det_data, index, setup_type="nb_bi"):
|
def calculate_hkl(det_data, index, setup_type="nb_bi"):
|
@ -1,19 +0,0 @@
|
|||||||
import pickle
|
|
||||||
|
|
||||||
|
|
||||||
def save_dict(obj, name):
|
|
||||||
""" saves dictionary as pickle file in binary format
|
|
||||||
:arg obj - object to save
|
|
||||||
:arg name - name of the file
|
|
||||||
NOTE: path should be added later"""
|
|
||||||
with open(name + '.pkl', 'wb') as f:
|
|
||||||
pickle.dump(obj, f, pickle.HIGHEST_PROTOCOL)
|
|
||||||
|
|
||||||
|
|
||||||
def load_dict(name):
|
|
||||||
"""load dictionary from picle file
|
|
||||||
:arg name - name of the file to load
|
|
||||||
NOTE: expect the file in the same folder, path should be added later
|
|
||||||
:return dictionary"""
|
|
||||||
with open(name + '.pkl', 'rb') as f:
|
|
||||||
return pickle.load(f)
|
|
513
pyzebra/ccl_dict_operation.py
Normal file
513
pyzebra/ccl_dict_operation.py
Normal file
@ -0,0 +1,513 @@
|
|||||||
|
import numpy as np
|
||||||
|
import uncertainties as u
|
||||||
|
|
||||||
|
from .fit2 import create_uncertanities
|
||||||
|
|
||||||
|
|
||||||
|
def add_dict(dict1, dict2):
|
||||||
|
"""adds two dictionaries, meta of the new is saved as meata+original_filename and
|
||||||
|
measurements are shifted to continue with numbering of first dict
|
||||||
|
:arg dict1 : dictionarry to add to
|
||||||
|
:arg dict2 : dictionarry from which to take the measurements
|
||||||
|
:return dict1 : combined dictionary
|
||||||
|
Note: dict1 must be made from ccl, otherwise we would have to change the structure of loaded
|
||||||
|
dat file"""
|
||||||
|
max_measurement_dict1 = max([int(str(keys)[1:]) for keys in dict1["scan"]])
|
||||||
|
if dict2["meta"]["data_type"] == ".ccl":
|
||||||
|
new_filenames = [
|
||||||
|
"M" + str(x + max_measurement_dict1)
|
||||||
|
for x in [int(str(keys)[1:]) for keys in dict2["scan"]]
|
||||||
|
]
|
||||||
|
new_meta_name = "meta" + str(dict2["meta"]["original_filename"])
|
||||||
|
if new_meta_name not in dict1:
|
||||||
|
for keys, name in zip(dict2["scan"], new_filenames):
|
||||||
|
dict2["scan"][keys]["file_of_origin"] = str(dict2["meta"]["original_filename"])
|
||||||
|
dict1["scan"][name] = dict2["scan"][keys]
|
||||||
|
|
||||||
|
dict1[new_meta_name] = dict2["meta"]
|
||||||
|
|
||||||
|
else:
|
||||||
|
raise KeyError(
|
||||||
|
str(
|
||||||
|
"The file %s has alredy been added to %s"
|
||||||
|
% (dict2["meta"]["original_filename"], dict1["meta"]["original_filename"])
|
||||||
|
)
|
||||||
|
)
|
||||||
|
elif dict2["meta"]["data_type"] == ".dat":
|
||||||
|
d = {}
|
||||||
|
new_name = "M" + str(max_measurement_dict1 + 1)
|
||||||
|
hkl = dict2["meta"]["title"]
|
||||||
|
d["h_index"] = float(hkl.split()[-3])
|
||||||
|
d["k_index"] = float(hkl.split()[-2])
|
||||||
|
d["l_index"] = float(hkl.split()[-1])
|
||||||
|
d["number_of_measurements"] = len(dict2["scan"]["NP"])
|
||||||
|
d["om"] = dict2["scan"]["om"]
|
||||||
|
d["Counts"] = dict2["scan"]["Counts"]
|
||||||
|
d["monitor"] = dict2["scan"]["Monitor1"][0]
|
||||||
|
d["temperature"] = dict2["meta"]["temp"]
|
||||||
|
d["mag_field"] = dict2["meta"]["mf"]
|
||||||
|
d["omega_angle"] = dict2["meta"]["omega"]
|
||||||
|
dict1["scan"][new_name] = d
|
||||||
|
print(hkl.split())
|
||||||
|
for keys in d:
|
||||||
|
print(keys)
|
||||||
|
|
||||||
|
print("s")
|
||||||
|
|
||||||
|
return dict1
|
||||||
|
|
||||||
|
|
||||||
|
def auto(dict):
|
||||||
|
"""takes just unique tuples from all tuples in dictionary returend by scan_dict
|
||||||
|
intendet for automatic merge if you doesent want to specify what scans to merge together
|
||||||
|
args: dict - dictionary from scan_dict function
|
||||||
|
:return dict - dict without repetitions"""
|
||||||
|
for keys in dict:
|
||||||
|
tuple_list = dict[keys]
|
||||||
|
new = list()
|
||||||
|
for i in range(len(tuple_list)):
|
||||||
|
if tuple_list[0][0] == tuple_list[i][0]:
|
||||||
|
new.append(tuple_list[i])
|
||||||
|
dict[keys] = new
|
||||||
|
return dict
|
||||||
|
|
||||||
|
|
||||||
|
def scan_dict(dict):
|
||||||
|
"""scans dictionary for duplicate hkl indexes
|
||||||
|
:arg dict : dictionary to scan
|
||||||
|
:return dictionary with matching scans, if there are none, the dict is empty
|
||||||
|
note: can be checked by "not d", true if empty
|
||||||
|
"""
|
||||||
|
|
||||||
|
d = {}
|
||||||
|
for i in dict["scan"]:
|
||||||
|
for j in dict["scan"]:
|
||||||
|
if dict["scan"][str(i)] != dict["scan"][str(j)]:
|
||||||
|
itup = (
|
||||||
|
dict["scan"][str(i)]["h_index"],
|
||||||
|
dict["scan"][str(i)]["k_index"],
|
||||||
|
dict["scan"][str(i)]["l_index"],
|
||||||
|
)
|
||||||
|
jtup = (
|
||||||
|
dict["scan"][str(j)]["h_index"],
|
||||||
|
dict["scan"][str(j)]["k_index"],
|
||||||
|
dict["scan"][str(j)]["l_index"],
|
||||||
|
)
|
||||||
|
if itup != jtup:
|
||||||
|
pass
|
||||||
|
else:
|
||||||
|
|
||||||
|
if str(itup) not in d:
|
||||||
|
d[str(itup)] = list()
|
||||||
|
d[str(itup)].append((i, j))
|
||||||
|
else:
|
||||||
|
d[str(itup)].append((i, j))
|
||||||
|
else:
|
||||||
|
continue
|
||||||
|
return d
|
||||||
|
|
||||||
|
|
||||||
|
def compare_hkl(dict1, dict2):
|
||||||
|
"""Compares two dictionaries based on hkl indexes and return dictionary with str(h k l) as
|
||||||
|
key and tuple with keys to same scan in dict1 and dict2
|
||||||
|
:arg dict1 : first dictionary
|
||||||
|
:arg dict2 : second dictionary
|
||||||
|
:return d : dict with matches
|
||||||
|
example of one key: '0.0 0.0 -1.0 : ('M1', 'M9')' meaning that 001 hkl scan is M1 in
|
||||||
|
first dict and M9 in second"""
|
||||||
|
d = {}
|
||||||
|
dupl = 0
|
||||||
|
for keys in dict1["scan"]:
|
||||||
|
for key in dict2["scan"]:
|
||||||
|
if (
|
||||||
|
dict1["scan"][str(keys)]["h_index"] == dict2["scan"][str(key)]["h_index"]
|
||||||
|
and dict1["scan"][str(keys)]["k_index"] == dict2["scan"][str(key)]["k_index"]
|
||||||
|
and dict1["scan"][str(keys)]["l_index"] == dict2["scan"][str(key)]["l_index"]
|
||||||
|
):
|
||||||
|
|
||||||
|
if (
|
||||||
|
str(
|
||||||
|
(
|
||||||
|
str(dict1["scan"][str(keys)]["h_index"])
|
||||||
|
+ " "
|
||||||
|
+ str(dict1["scan"][str(keys)]["k_index"])
|
||||||
|
+ " "
|
||||||
|
+ str(dict1["scan"][str(keys)]["l_index"])
|
||||||
|
)
|
||||||
|
)
|
||||||
|
not in d
|
||||||
|
):
|
||||||
|
d[
|
||||||
|
str(
|
||||||
|
str(dict1["scan"][str(keys)]["h_index"])
|
||||||
|
+ " "
|
||||||
|
+ str(dict1["scan"][str(keys)]["k_index"])
|
||||||
|
+ " "
|
||||||
|
+ str(dict1["scan"][str(keys)]["l_index"])
|
||||||
|
)
|
||||||
|
] = (str(keys), str(key))
|
||||||
|
else:
|
||||||
|
dupl = dupl + 1
|
||||||
|
d[
|
||||||
|
str(
|
||||||
|
str(dict1["scan"][str(keys)]["h_index"])
|
||||||
|
+ " "
|
||||||
|
+ str(dict1["scan"][str(keys)]["k_index"])
|
||||||
|
+ " "
|
||||||
|
+ str(dict1["scan"][str(keys)]["l_index"])
|
||||||
|
+ "_dupl"
|
||||||
|
+ str(dupl)
|
||||||
|
)
|
||||||
|
] = (str(keys), str(key))
|
||||||
|
else:
|
||||||
|
continue
|
||||||
|
|
||||||
|
return d
|
||||||
|
|
||||||
|
|
||||||
|
def create_tuples(x, y, y_err):
|
||||||
|
"""creates tuples for sorting and merginng of the data
|
||||||
|
Counts need to be normalized to monitor before"""
|
||||||
|
t = list()
|
||||||
|
for i in range(len(x)):
|
||||||
|
tup = (x[i], y[i], y_err[i])
|
||||||
|
t.append(tup)
|
||||||
|
return t
|
||||||
|
|
||||||
|
|
||||||
|
def normalize(dict, key, monitor):
|
||||||
|
"""Normalizes the scan to monitor, checks if sigma exists, otherwise creates it
|
||||||
|
:arg dict : dictionary to from which to tkae the scan
|
||||||
|
:arg key : which scan to normalize from dict1
|
||||||
|
:arg monitor : final monitor
|
||||||
|
:return counts - normalized counts
|
||||||
|
:return sigma - normalized sigma"""
|
||||||
|
|
||||||
|
counts = np.array(dict["scan"][key]["Counts"])
|
||||||
|
sigma = np.sqrt(counts) if "sigma" not in dict["scan"][key] else dict["scan"][key]["sigma"]
|
||||||
|
monitor_ratio = monitor / dict["scan"][key]["monitor"]
|
||||||
|
scaled_counts = counts * monitor_ratio
|
||||||
|
scaled_sigma = np.array(sigma) * monitor_ratio
|
||||||
|
|
||||||
|
return scaled_counts, scaled_sigma
|
||||||
|
|
||||||
|
|
||||||
|
def merge(dict1, dict2, keys, auto=True, monitor=100000):
|
||||||
|
"""merges the two tuples and sorts them, if om value is same, Counts value is average
|
||||||
|
averaging is propagated into sigma if dict1 == dict2, key[1] is deleted after merging
|
||||||
|
:arg dict1 : dictionary to which scan will be merged
|
||||||
|
:arg dict2 : dictionary from which scan will be merged
|
||||||
|
:arg keys : tuple with key to dict1 and dict2
|
||||||
|
:arg auto : if true, when monitors are same, does not change it, if flase, takes monitor always
|
||||||
|
:arg monitor : final monitor after merging
|
||||||
|
note: dict1 and dict2 can be same dict
|
||||||
|
:return dict1 with merged scan"""
|
||||||
|
if auto:
|
||||||
|
if dict1["scan"][keys[0]]["monitor"] == dict2["scan"][keys[1]]["monitor"]:
|
||||||
|
monitor = dict1["scan"][keys[0]]["monitor"]
|
||||||
|
|
||||||
|
# load om and Counts
|
||||||
|
x1, x2 = dict1["scan"][keys[0]]["om"], dict2["scan"][keys[1]]["om"]
|
||||||
|
cor_y1, y_err1 = normalize(dict1, keys[0], monitor=monitor)
|
||||||
|
cor_y2, y_err2 = normalize(dict2, keys[1], monitor=monitor)
|
||||||
|
# creates touples (om, Counts, sigma) for sorting and further processing
|
||||||
|
tuple_list = create_tuples(x1, cor_y1, y_err1) + create_tuples(x2, cor_y2, y_err2)
|
||||||
|
# Sort the list on om and add 0 0 0 tuple to the last position
|
||||||
|
sorted_t = sorted(tuple_list, key=lambda tup: tup[0])
|
||||||
|
sorted_t.append((0, 0, 0))
|
||||||
|
om, Counts, sigma = [], [], []
|
||||||
|
seen = list()
|
||||||
|
for i in range(len(sorted_t) - 1):
|
||||||
|
if sorted_t[i][0] not in seen:
|
||||||
|
if sorted_t[i][0] != sorted_t[i + 1][0]:
|
||||||
|
om = np.append(om, sorted_t[i][0])
|
||||||
|
Counts = np.append(Counts, sorted_t[i][1])
|
||||||
|
sigma = np.append(sigma, sorted_t[i][2])
|
||||||
|
else:
|
||||||
|
om = np.append(om, sorted_t[i][0])
|
||||||
|
counts1, counts2 = sorted_t[i][1], sorted_t[i + 1][1]
|
||||||
|
sigma1, sigma2 = sorted_t[i][2], sorted_t[i + 1][2]
|
||||||
|
count_err1 = u.ufloat(counts1, sigma1)
|
||||||
|
count_err2 = u.ufloat(counts2, sigma2)
|
||||||
|
avg = (count_err1 + count_err2) / 2
|
||||||
|
Counts = np.append(Counts, avg.n)
|
||||||
|
sigma = np.append(sigma, avg.s)
|
||||||
|
seen.append(sorted_t[i][0])
|
||||||
|
else:
|
||||||
|
continue
|
||||||
|
|
||||||
|
if dict1 == dict2:
|
||||||
|
del dict1["scan"][keys[1]]
|
||||||
|
|
||||||
|
note = (
|
||||||
|
f"This scan was merged with scan {keys[1]} from "
|
||||||
|
f'file {dict2["meta"]["original_filename"]} \n'
|
||||||
|
)
|
||||||
|
if "notes" not in dict1["scan"][str(keys[0])]:
|
||||||
|
dict1["scan"][str(keys[0])]["notes"] = note
|
||||||
|
else:
|
||||||
|
dict1["scan"][str(keys[0])]["notes"] += note
|
||||||
|
|
||||||
|
dict1["scan"][keys[0]]["om"] = om
|
||||||
|
dict1["scan"][keys[0]]["Counts"] = Counts
|
||||||
|
dict1["scan"][keys[0]]["sigma"] = sigma
|
||||||
|
dict1["scan"][keys[0]]["monitor"] = monitor
|
||||||
|
print("merging done")
|
||||||
|
return dict1
|
||||||
|
|
||||||
|
|
||||||
|
def substract_measurement(dict1, dict2, keys, auto=True, monitor=100000):
|
||||||
|
"""Substracts two scan (scan key2 from dict2 from measurent key1 in dict1), expects om to be same
|
||||||
|
:arg dict1 : dictionary to which scan will be merged
|
||||||
|
:arg dict2 : dictionary from which scan will be merged
|
||||||
|
:arg keys : tuple with key to dict1 and dict2
|
||||||
|
:arg auto : if true, when monitors are same, does not change it, if flase, takes monitor always
|
||||||
|
:arg monitor : final monitor after merging
|
||||||
|
:returns d : dict1 with substracted Counts from dict2 and sigma that comes from the substraction"""
|
||||||
|
|
||||||
|
if len(dict1["scan"][keys[0]]["om"]) != len(dict2["scan"][keys[1]]["om"]):
|
||||||
|
raise ValueError("Omegas have different lengths, cannot be substracted")
|
||||||
|
|
||||||
|
if auto:
|
||||||
|
if dict1["scan"][keys[0]]["monitor"] == dict2["scan"][keys[1]]["monitor"]:
|
||||||
|
monitor = dict1["scan"][keys[0]]["monitor"]
|
||||||
|
|
||||||
|
cor_y1, y_err1 = normalize(dict1, keys[0], monitor=monitor)
|
||||||
|
cor_y2, y_err2 = normalize(dict2, keys[1], monitor=monitor)
|
||||||
|
|
||||||
|
dict1_count_err = create_uncertanities(cor_y1, y_err1)
|
||||||
|
dict2_count_err = create_uncertanities(cor_y2, y_err2)
|
||||||
|
|
||||||
|
res = np.subtract(dict1_count_err, dict2_count_err)
|
||||||
|
|
||||||
|
res_nom = []
|
||||||
|
res_err = []
|
||||||
|
for k in range(len(res)):
|
||||||
|
res_nom = np.append(res_nom, res[k].n)
|
||||||
|
res_err = np.append(res_err, res[k].s)
|
||||||
|
|
||||||
|
if len([num for num in res_nom if num < 0]) >= 0.3 * len(res_nom):
|
||||||
|
print(
|
||||||
|
f"Warning! percentage of negative numbers in scan subsracted {keys[0]} is "
|
||||||
|
f"{len([num for num in res_nom if num < 0]) / len(res_nom)}"
|
||||||
|
)
|
||||||
|
|
||||||
|
dict1["scan"][str(keys[0])]["Counts"] = res_nom
|
||||||
|
dict1["scan"][str(keys[0])]["sigma"] = res_err
|
||||||
|
dict1["scan"][str(keys[0])]["monitor"] = monitor
|
||||||
|
note = (
|
||||||
|
f'Scan {keys[1]} from file {dict2["meta"]["original_filename"]} '
|
||||||
|
f"was substracted from this scan \n"
|
||||||
|
)
|
||||||
|
if "notes" not in dict1["scan"][str(keys[0])]:
|
||||||
|
dict1["scan"][str(keys[0])]["notes"] = note
|
||||||
|
else:
|
||||||
|
dict1["scan"][str(keys[0])]["notes"] += note
|
||||||
|
return dict1
|
||||||
|
|
||||||
|
|
||||||
|
def compare_dict(dict1, dict2):
|
||||||
|
"""takes two ccl dictionaries and compare different values for each key
|
||||||
|
:arg dict1 : dictionary 1 (ccl)
|
||||||
|
:arg dict2 : dictionary 2 (ccl)
|
||||||
|
:returns warning : dictionary with keys from primary files (if they differ) with
|
||||||
|
information of how many scan differ and which ones differ
|
||||||
|
:returns report_string string comparing all different values respecively of measurements"""
|
||||||
|
|
||||||
|
if dict1["meta"]["data_type"] != dict2["meta"]["data_type"]:
|
||||||
|
print("select two dicts")
|
||||||
|
return
|
||||||
|
S = []
|
||||||
|
conflicts = {}
|
||||||
|
warnings = {}
|
||||||
|
|
||||||
|
comp = compare_hkl(dict1, dict2)
|
||||||
|
d1 = scan_dict(dict1)
|
||||||
|
d2 = scan_dict(dict2)
|
||||||
|
if not d1:
|
||||||
|
S.append("There are no duplicates in %s (dict1) \n" % dict1["meta"]["original_filename"])
|
||||||
|
else:
|
||||||
|
S.append(
|
||||||
|
"There are %d duplicates in %s (dict1) \n"
|
||||||
|
% (len(d1), dict1["meta"]["original_filename"])
|
||||||
|
)
|
||||||
|
warnings["Duplicates in dict1"] = list()
|
||||||
|
for keys in d1:
|
||||||
|
S.append("Measurements %s with hkl %s \n" % (d1[keys], keys))
|
||||||
|
warnings["Duplicates in dict1"].append(d1[keys])
|
||||||
|
if not d2:
|
||||||
|
S.append("There are no duplicates in %s (dict2) \n" % dict2["meta"]["original_filename"])
|
||||||
|
else:
|
||||||
|
S.append(
|
||||||
|
"There are %d duplicates in %s (dict2) \n"
|
||||||
|
% (len(d2), dict2["meta"]["original_filename"])
|
||||||
|
)
|
||||||
|
warnings["Duplicates in dict2"] = list()
|
||||||
|
for keys in d2:
|
||||||
|
S.append("Measurements %s with hkl %s \n" % (d2[keys], keys))
|
||||||
|
warnings["Duplicates in dict2"].append(d2[keys])
|
||||||
|
|
||||||
|
# compare meta
|
||||||
|
S.append("Different values in meta: \n")
|
||||||
|
different_meta = {
|
||||||
|
k: dict1["meta"][k]
|
||||||
|
for k in dict1["meta"]
|
||||||
|
if k in dict2["meta"] and dict1["meta"][k] != dict2["meta"][k]
|
||||||
|
}
|
||||||
|
exlude_meta_set = ["original_filename", "date", "title"]
|
||||||
|
for keys in different_meta:
|
||||||
|
if keys in exlude_meta_set:
|
||||||
|
continue
|
||||||
|
else:
|
||||||
|
if keys not in conflicts:
|
||||||
|
conflicts[keys] = 1
|
||||||
|
else:
|
||||||
|
conflicts[keys] = conflicts[keys] + 1
|
||||||
|
|
||||||
|
S.append(" Different values in %s \n" % str(keys))
|
||||||
|
S.append(" dict1: %s \n" % str(dict1["meta"][str(keys)]))
|
||||||
|
S.append(" dict2: %s \n" % str(dict2["meta"][str(keys)]))
|
||||||
|
|
||||||
|
# compare Measurements
|
||||||
|
S.append(
|
||||||
|
"Number of measurements in %s = %s \n"
|
||||||
|
% (dict1["meta"]["original_filename"], len(dict1["scan"]))
|
||||||
|
)
|
||||||
|
S.append(
|
||||||
|
"Number of measurements in %s = %s \n"
|
||||||
|
% (dict2["meta"]["original_filename"], len(dict2["scan"]))
|
||||||
|
)
|
||||||
|
S.append("Different values in Measurements:\n")
|
||||||
|
select_set = ["om", "Counts", "sigma"]
|
||||||
|
exlude_set = ["time", "Counts", "date", "notes"]
|
||||||
|
for keys1 in comp:
|
||||||
|
for key2 in dict1["scan"][str(comp[str(keys1)][0])]:
|
||||||
|
if key2 in exlude_set:
|
||||||
|
continue
|
||||||
|
if key2 not in select_set:
|
||||||
|
try:
|
||||||
|
if (
|
||||||
|
dict1["scan"][comp[str(keys1)][0]][str(key2)]
|
||||||
|
!= dict2["scan"][str(comp[str(keys1)][1])][str(key2)]
|
||||||
|
):
|
||||||
|
S.append(
|
||||||
|
"Scan value "
|
||||||
|
"%s"
|
||||||
|
", with hkl %s differs in meausrements %s and %s \n"
|
||||||
|
% (key2, keys1, comp[str(keys1)][0], comp[str(keys1)][1])
|
||||||
|
)
|
||||||
|
S.append(
|
||||||
|
" dict1: %s \n"
|
||||||
|
% str(dict1["scan"][comp[str(keys1)][0]][str(key2)])
|
||||||
|
)
|
||||||
|
S.append(
|
||||||
|
" dict2: %s \n"
|
||||||
|
% str(dict2["scan"][comp[str(keys1)][1]][str(key2)])
|
||||||
|
)
|
||||||
|
if key2 not in conflicts:
|
||||||
|
conflicts[key2] = {}
|
||||||
|
conflicts[key2]["amount"] = 1
|
||||||
|
conflicts[key2]["scan"] = str(comp[str(keys1)])
|
||||||
|
else:
|
||||||
|
|
||||||
|
conflicts[key2]["amount"] = conflicts[key2]["amount"] + 1
|
||||||
|
conflicts[key2]["scan"] = (
|
||||||
|
conflicts[key2]["scan"] + " " + (str(comp[str(keys1)]))
|
||||||
|
)
|
||||||
|
except KeyError as e:
|
||||||
|
print("Missing keys, some files were probably merged or substracted")
|
||||||
|
print(e.args)
|
||||||
|
|
||||||
|
else:
|
||||||
|
try:
|
||||||
|
comparison = list(dict1["scan"][comp[str(keys1)][0]][str(key2)]) == list(
|
||||||
|
dict2["scan"][comp[str(keys1)][1]][str(key2)]
|
||||||
|
)
|
||||||
|
if len(list(dict1["scan"][comp[str(keys1)][0]][str(key2)])) != len(
|
||||||
|
list(dict2["scan"][comp[str(keys1)][1]][str(key2)])
|
||||||
|
):
|
||||||
|
if str("different length of %s" % key2) not in warnings:
|
||||||
|
warnings[str("different length of %s" % key2)] = list()
|
||||||
|
warnings[str("different length of %s" % key2)].append(
|
||||||
|
(str(comp[keys1][0]), str(comp[keys1][1]))
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
warnings[str("different length of %s" % key2)].append(
|
||||||
|
(str(comp[keys1][0]), str(comp[keys1][1]))
|
||||||
|
)
|
||||||
|
if not comparison:
|
||||||
|
S.append(
|
||||||
|
"Scan value "
|
||||||
|
"%s"
|
||||||
|
" differs in scan %s and %s \n"
|
||||||
|
% (key2, comp[str(keys1)][0], comp[str(keys1)][1])
|
||||||
|
)
|
||||||
|
S.append(
|
||||||
|
" dict1: %s \n"
|
||||||
|
% str(list(dict1["scan"][comp[str(keys1)][0]][str(key2)]))
|
||||||
|
)
|
||||||
|
S.append(
|
||||||
|
" dict2: %s \n"
|
||||||
|
% str(list(dict2["scan"][comp[str(keys1)][1]][str(key2)]))
|
||||||
|
)
|
||||||
|
if key2 not in conflicts:
|
||||||
|
conflicts[key2] = {}
|
||||||
|
conflicts[key2]["amount"] = 1
|
||||||
|
conflicts[key2]["scan"] = str(comp[str(keys1)])
|
||||||
|
else:
|
||||||
|
conflicts[key2]["amount"] = conflicts[key2]["amount"] + 1
|
||||||
|
conflicts[key2]["scan"] = (
|
||||||
|
conflicts[key2]["scan"] + " " + (str(comp[str(keys1)]))
|
||||||
|
)
|
||||||
|
except KeyError as e:
|
||||||
|
print("Missing keys, some files were probably merged or substracted")
|
||||||
|
print(e.args)
|
||||||
|
|
||||||
|
for keys in conflicts:
|
||||||
|
try:
|
||||||
|
conflicts[str(keys)]["scan"] = conflicts[str(keys)]["scan"].split(" ")
|
||||||
|
except:
|
||||||
|
continue
|
||||||
|
report_string = "".join(S)
|
||||||
|
return warnings, conflicts, report_string
|
||||||
|
|
||||||
|
|
||||||
|
def guess_next(dict1, dict2, comp):
|
||||||
|
"""iterates thorough the scans and tries to decide if the scans should be
|
||||||
|
substracted or merged"""
|
||||||
|
threshold = 0.05
|
||||||
|
for keys in comp:
|
||||||
|
if (
|
||||||
|
abs(
|
||||||
|
(
|
||||||
|
dict1["scan"][str(comp[keys][0])]["temperature"]
|
||||||
|
- dict2["scan"][str(comp[keys][1])]["temperature"]
|
||||||
|
)
|
||||||
|
/ dict2["scan"][str(comp[keys][1])]["temperature"]
|
||||||
|
)
|
||||||
|
< threshold
|
||||||
|
and abs(
|
||||||
|
(
|
||||||
|
dict1["scan"][str(comp[keys][0])]["mag_field"]
|
||||||
|
- dict2["scan"][str(comp[keys][1])]["mag_field"]
|
||||||
|
)
|
||||||
|
/ dict2["scan"][str(comp[keys][1])]["mag_field"]
|
||||||
|
)
|
||||||
|
< threshold
|
||||||
|
):
|
||||||
|
comp[keys] = comp[keys] + tuple("m")
|
||||||
|
else:
|
||||||
|
comp[keys] = comp[keys] + tuple("s")
|
||||||
|
|
||||||
|
return comp
|
||||||
|
|
||||||
|
|
||||||
|
def process_dict(dict1, dict2, comp):
|
||||||
|
"""substracts or merges scans, guess_next function must run first """
|
||||||
|
for keys in comp:
|
||||||
|
if comp[keys][2] == "s":
|
||||||
|
substract_measurement(dict1, dict2, comp[keys])
|
||||||
|
elif comp[keys][2] == "m":
|
||||||
|
merge(dict1, dict2, comp[keys])
|
||||||
|
|
||||||
|
return dict1
|
75
pyzebra/ccl_findpeaks.py
Normal file
75
pyzebra/ccl_findpeaks.py
Normal file
@ -0,0 +1,75 @@
|
|||||||
|
import numpy as np
|
||||||
|
import scipy as sc
|
||||||
|
from scipy.interpolate import interp1d
|
||||||
|
from scipy.signal import savgol_filter
|
||||||
|
|
||||||
|
|
||||||
|
def ccl_findpeaks(
|
||||||
|
scan, int_threshold=0.8, prominence=50, smooth=False, window_size=7, poly_order=3
|
||||||
|
):
|
||||||
|
|
||||||
|
"""function iterates through the dictionary created by load_cclv2 and locates peaks for each scan
|
||||||
|
args: scan - a single scan,
|
||||||
|
|
||||||
|
int_threshold - fraction of threshold_intensity/max_intensity, must be positive num between 0 and 1
|
||||||
|
i.e. will only detect peaks above 75% of max intensity
|
||||||
|
|
||||||
|
prominence - defines a drop of values that must be between two peaks, must be positive number
|
||||||
|
i.e. if promimence is 20, it will detect two neigbouring peaks of 300 and 310 intesities,
|
||||||
|
if none of the itermediate values are lower that 290
|
||||||
|
|
||||||
|
smooth - if true, smooths data by savitzky golay filter, if false - no smoothing
|
||||||
|
|
||||||
|
window_size - window size for savgol filter, must be odd positive integer
|
||||||
|
|
||||||
|
poly_order = order of the polynomial used in savgol filter, must be positive integer smaller than
|
||||||
|
window_size returns: dictionary with following structure:
|
||||||
|
D{M34{ 'num_of_peaks': 1, #num of peaks
|
||||||
|
'peak_indexes': [20], # index of peaks in omega array
|
||||||
|
'peak_heights': [90.], # height of the peaks (if data vere smoothed
|
||||||
|
its the heigh of the peaks in smoothed data)
|
||||||
|
"""
|
||||||
|
if not 0 <= int_threshold <= 1:
|
||||||
|
int_threshold = 0.8
|
||||||
|
print(
|
||||||
|
"Invalid value for int_threshold, select value between 0 and 1, new value set to:",
|
||||||
|
int_threshold,
|
||||||
|
)
|
||||||
|
|
||||||
|
if not isinstance(window_size, int) or (window_size % 2) == 0 or window_size <= 1:
|
||||||
|
window_size = 7
|
||||||
|
print(
|
||||||
|
"Invalid value for window_size, select positive odd integer, new value set to!:",
|
||||||
|
window_size,
|
||||||
|
)
|
||||||
|
|
||||||
|
if not isinstance(poly_order, int) or window_size < poly_order:
|
||||||
|
poly_order = 3
|
||||||
|
print(
|
||||||
|
"Invalid value for poly_order, select positive integer smaller than window_size, new value set to:",
|
||||||
|
poly_order,
|
||||||
|
)
|
||||||
|
|
||||||
|
if not isinstance(prominence, (int, float)) and prominence < 0:
|
||||||
|
prominence = 50
|
||||||
|
print("Invalid value for prominence, select positive number, new value set to:", prominence)
|
||||||
|
|
||||||
|
omega = scan["om"]
|
||||||
|
counts = np.array(scan["Counts"])
|
||||||
|
if smooth:
|
||||||
|
itp = interp1d(omega, counts, kind="linear")
|
||||||
|
absintensity = [abs(number) for number in counts]
|
||||||
|
lowest_intensity = min(absintensity)
|
||||||
|
counts[counts < 0] = lowest_intensity
|
||||||
|
smooth_peaks = savgol_filter(itp(omega), window_size, poly_order)
|
||||||
|
|
||||||
|
else:
|
||||||
|
smooth_peaks = counts
|
||||||
|
|
||||||
|
peaks, properties = sc.signal.find_peaks(
|
||||||
|
smooth_peaks, height=int_threshold * max(smooth_peaks), prominence=prominence
|
||||||
|
)
|
||||||
|
scan["num_of_peaks"] = len(peaks)
|
||||||
|
scan["peak_indexes"] = peaks
|
||||||
|
scan["peak_heights"] = properties["peak_heights"]
|
||||||
|
scan["smooth_peaks"] = smooth_peaks # smoothed curve
|
80
pyzebra/comm_export.py
Normal file
80
pyzebra/comm_export.py
Normal file
@ -0,0 +1,80 @@
|
|||||||
|
import numpy as np
|
||||||
|
|
||||||
|
|
||||||
|
def correction(value, lorentz=True, zebra_mode="--", ang1=0, ang2=0):
|
||||||
|
if lorentz is False:
|
||||||
|
return value
|
||||||
|
else:
|
||||||
|
if zebra_mode == "bi":
|
||||||
|
corr_value = np.abs(value * np.sin(ang1))
|
||||||
|
return corr_value
|
||||||
|
elif zebra_mode == "nb":
|
||||||
|
corr_value = np.abs(value * np.sin(ang1) * np.cos(ang2))
|
||||||
|
return corr_value
|
||||||
|
|
||||||
|
|
||||||
|
def export_comm(data, path, lorentz=False):
|
||||||
|
"""exports data in the *.comm format
|
||||||
|
:param lorentz: perform Lorentz correction
|
||||||
|
:param path: path to file + name
|
||||||
|
:arg data - data to export, is dict after peak fitting
|
||||||
|
|
||||||
|
"""
|
||||||
|
zebra_mode = data["meta"]["zebra_mode"]
|
||||||
|
align = ">"
|
||||||
|
if data["meta"]["indices"] == "hkl":
|
||||||
|
extension = ".comm"
|
||||||
|
padding = [6, 4, 10, 8]
|
||||||
|
elif data["meta"]["indices"] == "real":
|
||||||
|
extension = ".incomm"
|
||||||
|
padding = [4, 6, 10, 8]
|
||||||
|
|
||||||
|
with open(str(path + extension), "w") as out_file:
|
||||||
|
for key, scan in data["scan"].items():
|
||||||
|
if "fit" not in scan:
|
||||||
|
print("Scan skipped - no fit value for:", key)
|
||||||
|
continue
|
||||||
|
scan_number_str = f"{key:{align}{padding[0]}}"
|
||||||
|
h_str = f'{int(scan["h_index"]):{padding[1]}}'
|
||||||
|
k_str = f'{int(scan["k_index"]):{padding[1]}}'
|
||||||
|
l_str = f'{int(scan["l_index"]):{padding[1]}}'
|
||||||
|
if data["meta"]["area_method"] == "fit":
|
||||||
|
area = float(scan["fit"]["fit_area"].n)
|
||||||
|
sigma_str = (
|
||||||
|
f'{"{:8.2f}".format(float(scan["fit"]["fit_area"].s)):{align}{padding[2]}}'
|
||||||
|
)
|
||||||
|
elif data["meta"]["area_method"] == "integ":
|
||||||
|
area = float(scan["fit"]["int_area"].n)
|
||||||
|
sigma_str = (
|
||||||
|
f'{"{:8.2f}".format(float(scan["fit"]["int_area"].s)):{align}{padding[2]}}'
|
||||||
|
)
|
||||||
|
|
||||||
|
if zebra_mode == "bi":
|
||||||
|
area = correction(area, lorentz, zebra_mode, scan["twotheta_angle"])
|
||||||
|
int_str = f'{"{:8.2f}".format(area):{align}{padding[2]}}'
|
||||||
|
angle_str1 = f'{scan["twotheta_angle"]:{padding[3]}}'
|
||||||
|
angle_str2 = f'{scan["omega_angle"]:{padding[3]}}'
|
||||||
|
angle_str3 = f'{scan["chi_angle"]:{padding[3]}}'
|
||||||
|
angle_str4 = f'{scan["phi_angle"]:{padding[3]}}'
|
||||||
|
elif zebra_mode == "nb":
|
||||||
|
area = correction(area, lorentz, zebra_mode, scan["gamma_angle"], scan["nu_angle"])
|
||||||
|
int_str = f'{"{:8.2f}".format(area):{align}{padding[2]}}'
|
||||||
|
angle_str1 = f'{scan["gamma_angle"]:{padding[3]}}'
|
||||||
|
angle_str2 = f'{scan["omega_angle"]:{padding[3]}}'
|
||||||
|
angle_str3 = f'{scan["nu_angle"]:{padding[3]}}'
|
||||||
|
angle_str4 = f'{scan["unkwn_angle"]:{padding[3]}}'
|
||||||
|
|
||||||
|
line = (
|
||||||
|
scan_number_str
|
||||||
|
+ h_str
|
||||||
|
+ l_str
|
||||||
|
+ k_str
|
||||||
|
+ int_str
|
||||||
|
+ sigma_str
|
||||||
|
+ angle_str1
|
||||||
|
+ angle_str2
|
||||||
|
+ angle_str3
|
||||||
|
+ angle_str4
|
||||||
|
+ "\n"
|
||||||
|
)
|
||||||
|
out_file.write(line)
|
227
pyzebra/fit2.py
Normal file
227
pyzebra/fit2.py
Normal file
@ -0,0 +1,227 @@
|
|||||||
|
import numpy as np
|
||||||
|
import uncertainties as u
|
||||||
|
from lmfit import Model, Parameters
|
||||||
|
from scipy.integrate import simps
|
||||||
|
|
||||||
|
|
||||||
|
def bin_data(array, binsize):
|
||||||
|
if isinstance(binsize, int) and 0 < binsize < len(array):
|
||||||
|
return [
|
||||||
|
np.mean(array[binsize * i : binsize * i + binsize])
|
||||||
|
for i in range(int(np.ceil(len(array) / binsize)))
|
||||||
|
]
|
||||||
|
else:
|
||||||
|
print("Binsize need to be positive integer smaller than lenght of array")
|
||||||
|
return array
|
||||||
|
|
||||||
|
|
||||||
|
def find_nearest(array, value):
|
||||||
|
# find nearest value and return index
|
||||||
|
array = np.asarray(array)
|
||||||
|
idx = (np.abs(array - value)).argmin()
|
||||||
|
return idx
|
||||||
|
|
||||||
|
|
||||||
|
def create_uncertanities(y, y_err):
|
||||||
|
# create array with uncertanities for error propagation
|
||||||
|
combined = np.array([])
|
||||||
|
for i in range(len(y)):
|
||||||
|
part = u.ufloat(y[i], y_err[i])
|
||||||
|
combined = np.append(combined, part)
|
||||||
|
return combined
|
||||||
|
|
||||||
|
|
||||||
|
def fitccl(
|
||||||
|
scan,
|
||||||
|
guess,
|
||||||
|
vary,
|
||||||
|
constraints_min,
|
||||||
|
constraints_max,
|
||||||
|
numfit_min=None,
|
||||||
|
numfit_max=None,
|
||||||
|
binning=None,
|
||||||
|
):
|
||||||
|
"""Made for fitting of ccl date where 1 peak is expected. Allows for combination of gaussian and linear model combination
|
||||||
|
:param scan: scan in the data dict (i.e. M123)
|
||||||
|
:param guess: initial guess for the fitting, if none, some values are added automatically in order (see below)
|
||||||
|
:param vary: True if parameter can vary during fitting, False if it to be fixed
|
||||||
|
:param numfit_min: minimal value on x axis for numerical integration - if none is centre of gaussian minus 3 sigma
|
||||||
|
:param numfit_max: maximal value on x axis for numerical integration - if none is centre of gaussian plus 3 sigma
|
||||||
|
:param constraints_min: min constranits value for fit
|
||||||
|
:param constraints_max: max constranits value for fit
|
||||||
|
:param binning : binning of the data
|
||||||
|
:return data dict with additional values
|
||||||
|
order for guess, vary, constraints_min, constraints_max:
|
||||||
|
[Gaussian centre, Gaussian sigma, Gaussian amplitude, background slope, background intercept]
|
||||||
|
examples:
|
||||||
|
guess = [None, None, 100, 0, None]
|
||||||
|
vary = [True, True, True, True, True]
|
||||||
|
constraints_min = [23, None, 50, 0, 0]
|
||||||
|
constraints_min = [80, None, 1000, 0, 100]
|
||||||
|
"""
|
||||||
|
if len(scan["peak_indexes"]) > 1:
|
||||||
|
# return in case of more than 1 peaks
|
||||||
|
print("More than 1 peak, scan skipped")
|
||||||
|
return
|
||||||
|
if binning is None or binning == 0 or binning == 1:
|
||||||
|
x = list(scan["om"])
|
||||||
|
y = list(scan["Counts"])
|
||||||
|
y_err = list(np.sqrt(y)) if scan.get("sigma", None) is None else list(scan["sigma"])
|
||||||
|
print(scan["peak_indexes"])
|
||||||
|
if not scan["peak_indexes"]:
|
||||||
|
centre = np.mean(x)
|
||||||
|
else:
|
||||||
|
centre = x[int(scan["peak_indexes"])]
|
||||||
|
else:
|
||||||
|
x = list(scan["om"])
|
||||||
|
if not scan["peak_indexes"]:
|
||||||
|
centre = np.mean(x)
|
||||||
|
else:
|
||||||
|
centre = x[int(scan["peak_indexes"])]
|
||||||
|
x = bin_data(x, binning)
|
||||||
|
y = list(scan["Counts"])
|
||||||
|
y_err = list(np.sqrt(y)) if scan.get("sigma", None) is None else list(scan["sigma"])
|
||||||
|
combined = bin_data(create_uncertanities(y, y_err), binning)
|
||||||
|
y = [combined[i].n for i in range(len(combined))]
|
||||||
|
y_err = [combined[i].s for i in range(len(combined))]
|
||||||
|
|
||||||
|
if len(scan["peak_indexes"]) == 0:
|
||||||
|
# Case for no peak, gaussian in centre, sigma as 20% of range
|
||||||
|
print("No peak")
|
||||||
|
peak_index = find_nearest(x, np.mean(x))
|
||||||
|
guess[0] = centre if guess[0] is None else guess[0]
|
||||||
|
guess[1] = (x[-1] - x[0]) / 5 if guess[1] is None else guess[1]
|
||||||
|
guess[2] = 50 if guess[2] is None else guess[2]
|
||||||
|
guess[3] = 0 if guess[3] is None else guess[3]
|
||||||
|
guess[4] = np.mean(y) if guess[4] is None else guess[4]
|
||||||
|
constraints_min[2] = 0
|
||||||
|
|
||||||
|
elif len(scan["peak_indexes"]) == 1:
|
||||||
|
# case for one peak, takse into account users guesses
|
||||||
|
print("one peak")
|
||||||
|
peak_height = scan["peak_heights"]
|
||||||
|
guess[0] = centre if guess[0] is None else guess[0]
|
||||||
|
guess[1] = 0.1 if guess[1] is None else guess[1]
|
||||||
|
guess[2] = float(peak_height / 10) if guess[2] is None else float(guess[2])
|
||||||
|
guess[3] = 0 if guess[3] is None else guess[3]
|
||||||
|
guess[4] = np.median(x) if guess[4] is None else guess[4]
|
||||||
|
constraints_min[0] = np.min(x) if constraints_min[0] is None else constraints_min[0]
|
||||||
|
constraints_max[0] = np.max(x) if constraints_max[0] is None else constraints_max[0]
|
||||||
|
|
||||||
|
def gaussian(x, g_cen, g_width, g_amp):
|
||||||
|
"""1-d gaussian: gaussian(x, amp, cen, wid)"""
|
||||||
|
return (g_amp / (np.sqrt(2 * np.pi) * g_width)) * np.exp(
|
||||||
|
-((x - g_cen) ** 2) / (2 * g_width ** 2)
|
||||||
|
)
|
||||||
|
|
||||||
|
def background(x, slope, intercept):
|
||||||
|
"""background"""
|
||||||
|
return slope * (x - centre) + intercept
|
||||||
|
|
||||||
|
mod = Model(gaussian) + Model(background)
|
||||||
|
params = Parameters()
|
||||||
|
params.add_many(
|
||||||
|
("g_cen", guess[0], bool(vary[0]), np.min(x), np.max(x), None, None),
|
||||||
|
("g_width", guess[1], bool(vary[1]), constraints_min[1], constraints_max[1], None, None),
|
||||||
|
("g_amp", guess[2], bool(vary[2]), constraints_min[2], constraints_max[2], None, None),
|
||||||
|
("slope", guess[3], bool(vary[3]), constraints_min[3], constraints_max[3], None, None),
|
||||||
|
("intercept", guess[4], bool(vary[4]), constraints_min[4], constraints_max[4], None, None),
|
||||||
|
)
|
||||||
|
# the weighted fit
|
||||||
|
try:
|
||||||
|
result = mod.fit(
|
||||||
|
y, params, weights=[np.abs(1 / val) for val in y_err], x=x, calc_covar=True,
|
||||||
|
)
|
||||||
|
except ValueError:
|
||||||
|
return
|
||||||
|
|
||||||
|
if result.params["g_amp"].stderr is None:
|
||||||
|
result.params["g_amp"].stderr = result.params["g_amp"].value
|
||||||
|
elif result.params["g_amp"].stderr > result.params["g_amp"].value:
|
||||||
|
result.params["g_amp"].stderr = result.params["g_amp"].value
|
||||||
|
|
||||||
|
# u.ufloat to work with uncertanities
|
||||||
|
fit_area = u.ufloat(result.params["g_amp"].value, result.params["g_amp"].stderr)
|
||||||
|
comps = result.eval_components()
|
||||||
|
|
||||||
|
if len(scan["peak_indexes"]) == 0:
|
||||||
|
# for case of no peak, there is no reason to integrate, therefore fit and int are equal
|
||||||
|
int_area = fit_area
|
||||||
|
|
||||||
|
elif len(scan["peak_indexes"]) == 1:
|
||||||
|
gauss_3sigmamin = find_nearest(
|
||||||
|
x, result.params["g_cen"].value - 3 * result.params["g_width"].value
|
||||||
|
)
|
||||||
|
gauss_3sigmamax = find_nearest(
|
||||||
|
x, result.params["g_cen"].value + 3 * result.params["g_width"].value
|
||||||
|
)
|
||||||
|
numfit_min = gauss_3sigmamin if numfit_min is None else find_nearest(x, numfit_min)
|
||||||
|
numfit_max = gauss_3sigmamax if numfit_max is None else find_nearest(x, numfit_max)
|
||||||
|
|
||||||
|
it = -1
|
||||||
|
while abs(numfit_max - numfit_min) < 3:
|
||||||
|
# in the case the peak is very thin and numerical integration would be on zero omega
|
||||||
|
# difference, finds closes values
|
||||||
|
it = it + 1
|
||||||
|
numfit_min = find_nearest(
|
||||||
|
x,
|
||||||
|
result.params["g_cen"].value - 3 * (1 + it / 10) * result.params["g_width"].value,
|
||||||
|
)
|
||||||
|
numfit_max = find_nearest(
|
||||||
|
x,
|
||||||
|
result.params["g_cen"].value + 3 * (1 + it / 10) * result.params["g_width"].value,
|
||||||
|
)
|
||||||
|
|
||||||
|
if x[numfit_min] < np.min(x):
|
||||||
|
# makes sure that the values supplied by user lay in the omega range
|
||||||
|
# can be ommited for users who know what they're doing
|
||||||
|
numfit_min = gauss_3sigmamin
|
||||||
|
print("Minimal integration value outside of x range")
|
||||||
|
elif x[numfit_min] >= x[numfit_max]:
|
||||||
|
numfit_min = gauss_3sigmamin
|
||||||
|
print("Minimal integration value higher than maximal")
|
||||||
|
else:
|
||||||
|
pass
|
||||||
|
if x[numfit_max] > np.max(x):
|
||||||
|
numfit_max = gauss_3sigmamax
|
||||||
|
print("Maximal integration value outside of x range")
|
||||||
|
elif x[numfit_max] <= x[numfit_min]:
|
||||||
|
numfit_max = gauss_3sigmamax
|
||||||
|
print("Maximal integration value lower than minimal")
|
||||||
|
else:
|
||||||
|
pass
|
||||||
|
|
||||||
|
count_errors = create_uncertanities(y, y_err)
|
||||||
|
# create error vector for numerical integration propagation
|
||||||
|
num_int_area = simps(count_errors[numfit_min:numfit_max], x[numfit_min:numfit_max])
|
||||||
|
slope_err = u.ufloat(result.params["slope"].value, result.params["slope"].stderr)
|
||||||
|
# pulls the nominal and error values from fit (slope)
|
||||||
|
intercept_err = u.ufloat(
|
||||||
|
result.params["intercept"].value, result.params["intercept"].stderr
|
||||||
|
)
|
||||||
|
# pulls the nominal and error values from fit (intercept)
|
||||||
|
|
||||||
|
background_errors = np.array([])
|
||||||
|
for j in range(len(x[numfit_min:numfit_max])):
|
||||||
|
# creates nominal and error vector for numerical integration of background
|
||||||
|
bg = slope_err * (x[j] - centre) + intercept_err
|
||||||
|
background_errors = np.append(background_errors, bg)
|
||||||
|
|
||||||
|
num_int_background = simps(background_errors, x[numfit_min:numfit_max])
|
||||||
|
int_area = num_int_area - num_int_background
|
||||||
|
|
||||||
|
d = {}
|
||||||
|
for pars in result.params:
|
||||||
|
d[str(pars)] = (result.params[str(pars)].value, result.params[str(pars)].vary)
|
||||||
|
print(result.fit_report())
|
||||||
|
|
||||||
|
print((result.params["g_amp"].value - int_area.n) / result.params["g_amp"].value)
|
||||||
|
|
||||||
|
d["ratio"] = (result.params["g_amp"].value - int_area.n) / result.params["g_amp"].value
|
||||||
|
d["int_area"] = int_area
|
||||||
|
d["fit_area"] = u.ufloat(result.params["g_amp"].value, result.params["g_amp"].stderr)
|
||||||
|
d["full_report"] = result.fit_report()
|
||||||
|
d["result"] = result
|
||||||
|
d["comps"] = comps
|
||||||
|
d["numfit"] = [numfit_min, numfit_max]
|
||||||
|
scan["fit"] = d
|
@ -22,7 +22,7 @@ def parse_h5meta(file):
|
|||||||
for line in file:
|
for line in file:
|
||||||
line = line.strip()
|
line = line.strip()
|
||||||
if line.startswith("#begin "):
|
if line.startswith("#begin "):
|
||||||
section = line[len("#begin "):]
|
section = line[len("#begin ") :]
|
||||||
content[section] = []
|
content[section] = []
|
||||||
|
|
||||||
elif line.startswith("#end"):
|
elif line.startswith("#end"):
|
||||||
@ -64,20 +64,3 @@ def read_detector_data(filepath):
|
|||||||
det_data["temperature"] = h5f["/entry1/sample/temperature"][:]
|
det_data["temperature"] = h5f["/entry1/sample/temperature"][:]
|
||||||
|
|
||||||
return det_data
|
return det_data
|
||||||
|
|
||||||
|
|
||||||
def open_h5meta(filepath):
|
|
||||||
"""Open h5meta file like *.cami
|
|
||||||
|
|
||||||
Args:
|
|
||||||
filepath (str): File path of a h5meta file.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
dict: A dictionary with h5 names and their detector data and angles.
|
|
||||||
"""
|
|
||||||
data = dict()
|
|
||||||
h5meta_content = read_h5meta(filepath)
|
|
||||||
for file in h5meta_content["filelist"]:
|
|
||||||
data[file] = read_detector_data(file)
|
|
||||||
|
|
||||||
return data
|
|
||||||
|
221
pyzebra/load_1D.py
Normal file
221
pyzebra/load_1D.py
Normal file
@ -0,0 +1,221 @@
|
|||||||
|
import os
|
||||||
|
import re
|
||||||
|
from collections import defaultdict
|
||||||
|
from decimal import Decimal
|
||||||
|
|
||||||
|
import numpy as np
|
||||||
|
|
||||||
|
META_VARS_STR = (
|
||||||
|
"instrument",
|
||||||
|
"title",
|
||||||
|
"sample",
|
||||||
|
"user",
|
||||||
|
"ProposalID",
|
||||||
|
"original_filename",
|
||||||
|
"date",
|
||||||
|
"zebra_mode",
|
||||||
|
"proposal",
|
||||||
|
"proposal_user",
|
||||||
|
"proposal_title",
|
||||||
|
"proposal_email",
|
||||||
|
"detectorDistance",
|
||||||
|
)
|
||||||
|
META_VARS_FLOAT = (
|
||||||
|
"omega",
|
||||||
|
"mf",
|
||||||
|
"2-theta",
|
||||||
|
"chi",
|
||||||
|
"phi",
|
||||||
|
"nu",
|
||||||
|
"temp",
|
||||||
|
"wavelenght",
|
||||||
|
"a",
|
||||||
|
"b",
|
||||||
|
"c",
|
||||||
|
"alpha",
|
||||||
|
"beta",
|
||||||
|
"gamma",
|
||||||
|
"cex1",
|
||||||
|
"cex2",
|
||||||
|
"mexz",
|
||||||
|
"moml",
|
||||||
|
"mcvl",
|
||||||
|
"momu",
|
||||||
|
"mcvu",
|
||||||
|
"snv",
|
||||||
|
"snh",
|
||||||
|
"snvm",
|
||||||
|
"snhm",
|
||||||
|
"s1vt",
|
||||||
|
"s1vb",
|
||||||
|
"s1hr",
|
||||||
|
"s1hl",
|
||||||
|
"s2vt",
|
||||||
|
"s2vb",
|
||||||
|
"s2hr",
|
||||||
|
"s2hl",
|
||||||
|
)
|
||||||
|
META_UB_MATRIX = ("ub1j", "ub2j", "ub3j")
|
||||||
|
|
||||||
|
CCL_FIRST_LINE = (
|
||||||
|
# the first element is `scan_number`, which we don't save to metadata
|
||||||
|
("h_index", float),
|
||||||
|
("k_index", float),
|
||||||
|
("l_index", float),
|
||||||
|
)
|
||||||
|
|
||||||
|
CCL_FIRST_LINE_BI = (
|
||||||
|
*CCL_FIRST_LINE,
|
||||||
|
("twotheta_angle", float),
|
||||||
|
("omega_angle", float),
|
||||||
|
("chi_angle", float),
|
||||||
|
("phi_angle", float),
|
||||||
|
)
|
||||||
|
|
||||||
|
CCL_FIRST_LINE_NB = (
|
||||||
|
*CCL_FIRST_LINE,
|
||||||
|
("gamma_angle", float),
|
||||||
|
("omega_angle", float),
|
||||||
|
("nu_angle", float),
|
||||||
|
("unkwn_angle", float),
|
||||||
|
)
|
||||||
|
|
||||||
|
CCL_SECOND_LINE = (
|
||||||
|
("number_of_measurements", int),
|
||||||
|
("angle_step", float),
|
||||||
|
("monitor", float),
|
||||||
|
("temperature", float),
|
||||||
|
("mag_field", float),
|
||||||
|
("date", str),
|
||||||
|
("time", str),
|
||||||
|
("scan_type", str),
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def load_1D(filepath):
|
||||||
|
"""
|
||||||
|
Loads *.ccl or *.dat file (Distinguishes them based on last 3 chars in string of filepath
|
||||||
|
to add more variables to read, extend the elif list
|
||||||
|
the file must include '#data' and number of points in right place to work properly
|
||||||
|
|
||||||
|
:arg filepath
|
||||||
|
:returns det_variables
|
||||||
|
- dictionary of all detector/scan variables and dictinionary for every scan.
|
||||||
|
Names of these dictionaries are M + scan number. They include HKL indeces, angles,
|
||||||
|
monitors, stepsize and array of counts
|
||||||
|
"""
|
||||||
|
with open(filepath, "r") as infile:
|
||||||
|
_, ext = os.path.splitext(filepath)
|
||||||
|
det_variables = parse_1D(infile, data_type=ext)
|
||||||
|
|
||||||
|
return det_variables
|
||||||
|
|
||||||
|
|
||||||
|
def parse_1D(fileobj, data_type):
|
||||||
|
# read metadata
|
||||||
|
metadata = {}
|
||||||
|
for line in fileobj:
|
||||||
|
if "=" in line:
|
||||||
|
variable, value = line.split("=")
|
||||||
|
variable = variable.strip()
|
||||||
|
if variable in META_VARS_FLOAT:
|
||||||
|
metadata[variable] = float(value)
|
||||||
|
elif variable in META_VARS_STR:
|
||||||
|
metadata[variable] = str(value)[:-1].strip()
|
||||||
|
elif variable in META_UB_MATRIX:
|
||||||
|
metadata[variable] = re.findall(r"[-+]?\d*\.\d+|\d+", str(value))
|
||||||
|
|
||||||
|
if "#data" in line:
|
||||||
|
# this is the end of metadata and the start of data section
|
||||||
|
break
|
||||||
|
|
||||||
|
# read data
|
||||||
|
scan = {}
|
||||||
|
if data_type == ".ccl":
|
||||||
|
decimal = list()
|
||||||
|
|
||||||
|
if metadata["zebra_mode"] == "bi":
|
||||||
|
ccl_first_line = CCL_FIRST_LINE_BI
|
||||||
|
elif metadata["zebra_mode"] == "nb":
|
||||||
|
ccl_first_line = CCL_FIRST_LINE_NB
|
||||||
|
ccl_second_line = CCL_SECOND_LINE
|
||||||
|
|
||||||
|
for line in fileobj:
|
||||||
|
d = {}
|
||||||
|
|
||||||
|
# first line
|
||||||
|
scan_number, *params = line.split()
|
||||||
|
for param, (param_name, param_type) in zip(params, ccl_first_line):
|
||||||
|
d[param_name] = param_type(param)
|
||||||
|
|
||||||
|
decimal.append(bool(Decimal(d["h_index"]) % 1 == 0))
|
||||||
|
decimal.append(bool(Decimal(d["k_index"]) % 1 == 0))
|
||||||
|
decimal.append(bool(Decimal(d["l_index"]) % 1 == 0))
|
||||||
|
|
||||||
|
# second line
|
||||||
|
next_line = next(fileobj)
|
||||||
|
params = next_line.split()
|
||||||
|
for param, (param_name, param_type) in zip(params, ccl_second_line):
|
||||||
|
d[param_name] = param_type(param)
|
||||||
|
|
||||||
|
d["om"] = np.linspace(
|
||||||
|
d["omega_angle"] - (d["number_of_measurements"] / 2) * d["angle_step"],
|
||||||
|
d["omega_angle"] + (d["number_of_measurements"] / 2) * d["angle_step"],
|
||||||
|
d["number_of_measurements"],
|
||||||
|
)
|
||||||
|
|
||||||
|
# subsequent lines with counts
|
||||||
|
counts = []
|
||||||
|
while len(counts) < d["number_of_measurements"]:
|
||||||
|
counts.extend(map(int, next(fileobj).split()))
|
||||||
|
d["Counts"] = counts
|
||||||
|
|
||||||
|
scan[int(scan_number)] = d
|
||||||
|
|
||||||
|
if all(decimal):
|
||||||
|
metadata["indices"] = "hkl"
|
||||||
|
else:
|
||||||
|
metadata["indices"] = "real"
|
||||||
|
|
||||||
|
elif data_type == ".dat":
|
||||||
|
# skip the first 2 rows, the third row contans the column names
|
||||||
|
next(fileobj)
|
||||||
|
next(fileobj)
|
||||||
|
col_names = next(fileobj).split()
|
||||||
|
data_cols = defaultdict(list)
|
||||||
|
|
||||||
|
for line in fileobj:
|
||||||
|
if "END-OF-DATA" in line:
|
||||||
|
# this is the end of data
|
||||||
|
break
|
||||||
|
|
||||||
|
for name, val in zip(col_names, line.split()):
|
||||||
|
data_cols[name].append(float(val))
|
||||||
|
|
||||||
|
try:
|
||||||
|
data_cols["h_index"] = float(metadata["title"].split()[-3])
|
||||||
|
data_cols["k_index"] = float(metadata["title"].split()[-2])
|
||||||
|
data_cols["l_index"] = float(metadata["title"].split()[-1])
|
||||||
|
except (ValueError, IndexError):
|
||||||
|
print("seems hkl is not in title")
|
||||||
|
|
||||||
|
data_cols["temperature"] = metadata["temp"]
|
||||||
|
data_cols["mag_field"] = metadata["mf"]
|
||||||
|
data_cols["omega_angle"] = metadata["omega"]
|
||||||
|
data_cols["number_of_measurements"] = len(data_cols["om"])
|
||||||
|
data_cols["monitor"] = data_cols["Monitor1"][0]
|
||||||
|
data_cols["twotheta_angle"] = metadata["2-theta"]
|
||||||
|
data_cols["chi_angle"] = metadata["chi"]
|
||||||
|
data_cols["phi_angle"] = metadata["phi"]
|
||||||
|
data_cols["nu_angle"] = metadata["nu"]
|
||||||
|
|
||||||
|
scan[1] = dict(data_cols)
|
||||||
|
|
||||||
|
else:
|
||||||
|
print("Unknown file extention")
|
||||||
|
|
||||||
|
# utility information
|
||||||
|
metadata["data_type"] = data_type
|
||||||
|
metadata["area_method"] = "fit"
|
||||||
|
|
||||||
|
return {"meta": metadata, "scan": scan}
|
202
pyzebra/param_study_moduls.py
Normal file
202
pyzebra/param_study_moduls.py
Normal file
@ -0,0 +1,202 @@
|
|||||||
|
from load_1D import load_1D
|
||||||
|
from ccl_dict_operation import add_dict
|
||||||
|
import pandas as pd
|
||||||
|
from mpl_toolkits.mplot3d import Axes3D # dont delete, otherwise waterfall wont work
|
||||||
|
import matplotlib.pyplot as plt
|
||||||
|
import matplotlib as mpl
|
||||||
|
import numpy as np
|
||||||
|
import pickle
|
||||||
|
import scipy.io as sio
|
||||||
|
|
||||||
|
|
||||||
|
def load_dats(filepath):
|
||||||
|
"""reads the txt file, get headers and data
|
||||||
|
:arg filepath to txt file or list of filepaths to the files
|
||||||
|
:return ccl like dictionary"""
|
||||||
|
if isinstance(filepath, str):
|
||||||
|
data_type = "txt"
|
||||||
|
file_list = list()
|
||||||
|
with open(filepath, "r") as infile:
|
||||||
|
col_names = next(infile).split(",")
|
||||||
|
col_names = [col_names[i].rstrip() for i in range(len(col_names))]
|
||||||
|
for line in infile:
|
||||||
|
if "END" in line:
|
||||||
|
break
|
||||||
|
file_list.append(tuple(line.split(",")))
|
||||||
|
elif isinstance(filepath, list):
|
||||||
|
data_type = "list"
|
||||||
|
file_list = filepath
|
||||||
|
dict1 = {}
|
||||||
|
for i in range(len(file_list)):
|
||||||
|
if not dict1:
|
||||||
|
if data_type == "txt":
|
||||||
|
dict1 = load_1D(file_list[0][0])
|
||||||
|
else:
|
||||||
|
dict1 = load_1D(file_list[0])
|
||||||
|
else:
|
||||||
|
if data_type == "txt":
|
||||||
|
dict1 = add_dict(dict1, load_1D(file_list[i][0]))
|
||||||
|
else:
|
||||||
|
dict1 = add_dict(dict1, load_1D(file_list[i]))
|
||||||
|
dict1["scan"][i + 1]["params"] = {}
|
||||||
|
if data_type == "txt":
|
||||||
|
for x in range(len(col_names) - 1):
|
||||||
|
dict1["scan"][i + 1]["params"][col_names[x + 1]] = file_list[i][x + 1]
|
||||||
|
|
||||||
|
return dict1
|
||||||
|
|
||||||
|
|
||||||
|
def create_dataframe(dict1):
|
||||||
|
"""Creates pandas dataframe from the dictionary
|
||||||
|
:arg ccl like dictionary
|
||||||
|
:return pandas dataframe"""
|
||||||
|
# create dictionary to which we pull only wanted items before transforming it to pd.dataframe
|
||||||
|
pull_dict = {}
|
||||||
|
pull_dict["filenames"] = list()
|
||||||
|
for key in dict1["scan"][1]["params"]:
|
||||||
|
pull_dict[key] = list()
|
||||||
|
pull_dict["temperature"] = list()
|
||||||
|
pull_dict["mag_field"] = list()
|
||||||
|
pull_dict["fit_area"] = list()
|
||||||
|
pull_dict["int_area"] = list()
|
||||||
|
pull_dict["om"] = list()
|
||||||
|
pull_dict["Counts"] = list()
|
||||||
|
|
||||||
|
# populate the dict
|
||||||
|
for keys in dict1["scan"]:
|
||||||
|
if "file_of_origin" in dict1["scan"][keys]:
|
||||||
|
pull_dict["filenames"].append(dict1["scan"][keys]["file_of_origin"].split("/")[-1])
|
||||||
|
else:
|
||||||
|
pull_dict["filenames"].append(dict1["meta"]["original_filename"].split("/")[-1])
|
||||||
|
for key in dict1["scan"][keys]["params"]:
|
||||||
|
pull_dict[str(key)].append(float(dict1["scan"][keys]["params"][key]))
|
||||||
|
pull_dict["temperature"].append(dict1["scan"][keys]["temperature"])
|
||||||
|
pull_dict["mag_field"].append(dict1["scan"][keys]["mag_field"])
|
||||||
|
pull_dict["fit_area"].append(dict1["scan"][keys]["fit"]["fit_area"])
|
||||||
|
pull_dict["int_area"].append(dict1["scan"][keys]["fit"]["int_area"])
|
||||||
|
pull_dict["om"].append(dict1["scan"][keys]["om"])
|
||||||
|
pull_dict["Counts"].append(dict1["scan"][keys]["Counts"])
|
||||||
|
|
||||||
|
return pd.DataFrame(data=pull_dict)
|
||||||
|
|
||||||
|
|
||||||
|
def sort_dataframe(dataframe, sorting_parameter):
|
||||||
|
"""sorts the data frame and resets index"""
|
||||||
|
data = dataframe.sort_values(by=sorting_parameter)
|
||||||
|
data = data.reset_index(drop=True)
|
||||||
|
return data
|
||||||
|
|
||||||
|
|
||||||
|
def make_graph(data, sorting_parameter, style):
|
||||||
|
"""Makes the graph from the data based on style and sorting parameter
|
||||||
|
:arg data : pandas dataframe with data after sorting
|
||||||
|
:arg sorting_parameter to pull the correct variable and name
|
||||||
|
:arg style of the graph - waterfall, scatter, heatmap
|
||||||
|
:return matplotlib figure"""
|
||||||
|
if style == "waterfall":
|
||||||
|
mpl.rcParams["legend.fontsize"] = 10
|
||||||
|
fig = plt.figure()
|
||||||
|
ax = fig.gca(projection="3d")
|
||||||
|
for i in range(len(data)):
|
||||||
|
x = data["om"][i]
|
||||||
|
z = data["Counts"][i]
|
||||||
|
yy = [data[sorting_parameter][i]] * len(x)
|
||||||
|
ax.plot(x, yy, z, label=str("%s = %f" % (sorting_parameter, yy[i])))
|
||||||
|
|
||||||
|
ax.legend()
|
||||||
|
ax.set_xlabel("Omega")
|
||||||
|
ax.set_ylabel(sorting_parameter)
|
||||||
|
ax.set_zlabel("counts")
|
||||||
|
|
||||||
|
elif style == "scatter":
|
||||||
|
fig = plt.figure()
|
||||||
|
plt.errorbar(
|
||||||
|
data[sorting_parameter],
|
||||||
|
[data["fit_area"][i].n for i in range(len(data["fit_area"]))],
|
||||||
|
[data["fit_area"][i].s for i in range(len(data["fit_area"]))],
|
||||||
|
capsize=5,
|
||||||
|
ecolor="green",
|
||||||
|
)
|
||||||
|
plt.xlabel(str(sorting_parameter))
|
||||||
|
plt.ylabel("Intesity")
|
||||||
|
|
||||||
|
elif style == "heat":
|
||||||
|
new_om = list()
|
||||||
|
for i in range(len(data)):
|
||||||
|
new_om = np.append(new_om, np.around(data["om"][i], 2), axis=0)
|
||||||
|
unique_om = np.unique(new_om)
|
||||||
|
color_matrix = np.zeros(shape=(len(data), len(unique_om)))
|
||||||
|
for i in range(len(data)):
|
||||||
|
for j in range(len(data["om"][i])):
|
||||||
|
if np.around(data["om"][i][j], 2) in np.unique(new_om):
|
||||||
|
color_matrix[i, j] = data["Counts"][i][j]
|
||||||
|
else:
|
||||||
|
continue
|
||||||
|
|
||||||
|
fig = plt.figure()
|
||||||
|
plt.pcolormesh(unique_om, data[sorting_parameter], color_matrix, shading="gouraud")
|
||||||
|
plt.xlabel("omega")
|
||||||
|
plt.ylabel(sorting_parameter)
|
||||||
|
plt.colorbar()
|
||||||
|
plt.clim(color_matrix.mean(), color_matrix.max())
|
||||||
|
|
||||||
|
return fig
|
||||||
|
|
||||||
|
|
||||||
|
def save_dict(obj, name):
|
||||||
|
""" saves dictionary as pickle file in binary format
|
||||||
|
:arg obj - object to save
|
||||||
|
:arg name - name of the file
|
||||||
|
NOTE: path should be added later"""
|
||||||
|
with open(name + ".pkl", "wb") as f:
|
||||||
|
pickle.dump(obj, f, pickle.HIGHEST_PROTOCOL)
|
||||||
|
|
||||||
|
|
||||||
|
def load_dict(name):
|
||||||
|
"""load dictionary from picle file
|
||||||
|
:arg name - name of the file to load
|
||||||
|
NOTE: expect the file in the same folder, path should be added later
|
||||||
|
:return dictionary"""
|
||||||
|
with open(name + ".pkl", "rb") as f:
|
||||||
|
return pickle.load(f)
|
||||||
|
|
||||||
|
|
||||||
|
# pickle, mat, h5, txt, csv, json
|
||||||
|
def save_table(data, filetype, name, path=None):
|
||||||
|
print("Saving: ", filetype)
|
||||||
|
path = "" if path is None else path
|
||||||
|
if filetype == "pickle":
|
||||||
|
# to work with uncertanities, see uncertanity module
|
||||||
|
with open(path + name + ".pkl", "wb") as f:
|
||||||
|
pickle.dump(data, f, pickle.HIGHEST_PROTOCOL)
|
||||||
|
if filetype == "mat":
|
||||||
|
# matlab doesent allow some special character to be in var names, also cant start with
|
||||||
|
# numbers, in need, add some to the romove_character list
|
||||||
|
data["fit_area_nom"] = [data["fit_area"][i].n for i in range(len(data["fit_area"]))]
|
||||||
|
data["fit_area_err"] = [data["fit_area"][i].s for i in range(len(data["fit_area"]))]
|
||||||
|
data["int_area_nom"] = [data["int_area"][i].n for i in range(len(data["int_area"]))]
|
||||||
|
data["int_area_err"] = [data["int_area"][i].s for i in range(len(data["int_area"]))]
|
||||||
|
data = data.drop(columns=["fit_area", "int_area"])
|
||||||
|
remove_characters = [" ", "[", "]", "{", "}", "(", ")"]
|
||||||
|
for character in remove_characters:
|
||||||
|
data.columns = [
|
||||||
|
data.columns[i].replace(character, "") for i in range(len(data.columns))
|
||||||
|
]
|
||||||
|
sio.savemat((path + name + ".mat"), {name: col.values for name, col in data.items()})
|
||||||
|
if filetype == "csv" or "txt":
|
||||||
|
data["fit_area_nom"] = [data["fit_area"][i].n for i in range(len(data["fit_area"]))]
|
||||||
|
data["fit_area_err"] = [data["fit_area"][i].s for i in range(len(data["fit_area"]))]
|
||||||
|
data["int_area_nom"] = [data["int_area"][i].n for i in range(len(data["int_area"]))]
|
||||||
|
data["int_area_err"] = [data["int_area"][i].s for i in range(len(data["int_area"]))]
|
||||||
|
data = data.drop(columns=["fit_area", "int_area", "om", "Counts"])
|
||||||
|
if filetype == "csv":
|
||||||
|
data.to_csv(path + name + ".csv")
|
||||||
|
if filetype == "txt":
|
||||||
|
with open((path + name + ".txt"), "w") as outfile:
|
||||||
|
data.to_string(outfile)
|
||||||
|
if filetype == "h5":
|
||||||
|
hdf = pd.HDFStore((path + name + ".h5"))
|
||||||
|
hdf.put("data", data)
|
||||||
|
hdf.close()
|
||||||
|
if filetype == "json":
|
||||||
|
data.to_json((path + name + ".json"))
|
@ -1,12 +1,16 @@
|
|||||||
import math
|
import math
|
||||||
|
|
||||||
import numpy as np
|
import numpy as np
|
||||||
from matplotlib import pyplot as plt
|
|
||||||
from numba import njit
|
from numba import njit
|
||||||
from scipy.optimize import curve_fit
|
from scipy.optimize import curve_fit
|
||||||
|
|
||||||
import pyzebra
|
import pyzebra
|
||||||
|
|
||||||
|
try:
|
||||||
|
from matplotlib import pyplot as plt
|
||||||
|
except ImportError:
|
||||||
|
print("matplotlib is not available")
|
||||||
|
|
||||||
pi_r = 180 / np.pi
|
pi_r = 180 / np.pi
|
||||||
|
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user