Compare commits
38 Commits
SHA1
----
b2fc2d604a
78096efcef
5b0f97959e
8fb1c5f247
58641ab94f
a6bcb8ffa1
7b6e6bf396
45f295fcf8
3c58fd2102
abbaded278
fbe992c901
dec282d1b7
4429823629
80fddb514a
cfe9832c1e
fd942672df
60cb733ca7
7c2ecef56d
468f33e606
dbc643aba9
0856705024
ce608f1b49
3eaf54eda3
a496267a9d
1a3ebfbcbd
7bcb23c1bd
b28fe39bbb
42c6e6b921
dba2dc6149
a0c9b0162b
00b0c2d708
4ae8890bb8
430ffc2caa
aee5c82925
7e16ea0fea
6ff1b2b54f
6099df650b
0347566aeb
@@ -1,10 +1,9 @@
-import pyzebra.ccl_dict_operation
 from pyzebra.anatric import *
 from pyzebra.ccl_findpeaks import ccl_findpeaks
-from pyzebra.comm_export import export_comm
 from pyzebra.fit2 import fitccl
 from pyzebra.h5 import *
-from pyzebra.load_1D import load_1D, parse_1D
+from pyzebra.ccl_io import load_1D, parse_1D, export_comm
+from pyzebra.param_study_moduls import add_dict, auto, merge, scan_dict
 from pyzebra.xtal import *
 
-__version__ = "0.1.0"
+__version__ = "0.1.2"
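Note: with this hunk, `load_1D`/`parse_1D`/`export_comm` move behind the new `pyzebra.ccl_io` module and the parameter-study helpers are re-exported at the package root. A minimal sketch of the resulting import surface (the input file name is hypothetical):

import pyzebra

print(pyzebra.__version__)  # "0.1.2"

# these names now resolve through pyzebra/__init__.py:
#   load_1D, parse_1D, export_comm    <- pyzebra.ccl_io
#   add_dict, auto, merge, scan_dict  <- pyzebra.param_study_moduls
with open("example.ccl") as f:  # hypothetical input file
    det_data = pyzebra.parse_1D(f, ".ccl")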
@@ -2,7 +2,6 @@ import subprocess
 import xml.etree.ElementTree as ET
 
-ANATRIC_PATH = "/afs/psi.ch/project/sinq/rhel7/bin/anatric"
 
 DATA_FACTORY_IMPLEMENTATION = [
     "trics",
     "morph",
@@ -24,8 +23,8 @@ REFLECTION_PRINTER_FORMATS = [
 ALGORITHMS = ["adaptivemaxcog", "adaptivedynamic"]
 
 
-def anatric(config_file):
-    subprocess.run([ANATRIC_PATH, config_file], check=True)
+def anatric(config_file, anatric_path="/afs/psi.ch/project/sinq/rhel7/bin/anatric"):
+    subprocess.run([anatric_path, config_file], check=True)
 
 
 class AnatricConfig:
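Note: with the module-level `ANATRIC_PATH` constant gone, the executable location becomes a per-call argument whose default preserves the old PSI path. A sketch of both call styles (the config paths are hypothetical):

import pyzebra

# default: runs /afs/psi.ch/project/sinq/rhel7/bin/anatric
pyzebra.anatric("/tmp/experiment.xml")

# override, e.g. for a locally built anatric binary
pyzebra.anatric("/tmp/experiment.xml", anatric_path="/usr/local/bin/anatric")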
@@ -1,4 +1,3 @@
-import argparse
 import logging
 import sys
 from io import StringIO
@@ -11,14 +10,8 @@ import panel_ccl_integrate
 import panel_hdf_anatric
 import panel_hdf_viewer
 
-parser = argparse.ArgumentParser(
-    prog="pyzebra", formatter_class=argparse.ArgumentDefaultsHelpFormatter
-)
-
-args = parser.parse_args()
 
 doc = curdoc()
-doc.title = "pyzebra"
 
 sys.stdout = StringIO()
 stdout_textareainput = TextAreaInput(title="print output:", height=150)
@@ -26,7 +19,7 @@ stdout_textareainput = TextAreaInput(title="print output:", height=150)
 bokeh_stream = StringIO()
 bokeh_handler = logging.StreamHandler(bokeh_stream)
 bokeh_handler.setFormatter(logging.Formatter(logging.BASIC_FORMAT))
-bokeh_logger = logging.getLogger('bokeh')
+bokeh_logger = logging.getLogger("bokeh")
 bokeh_logger.addHandler(bokeh_handler)
 bokeh_log_textareainput = TextAreaInput(title="server output:", height=150)
 
pyzebra/app/handler.py (new file, 30 lines)
@@ -0,0 +1,30 @@
+from bokeh.application.handlers import Handler
+
+
+class PyzebraHandler(Handler):
+    """Provides a mechanism for generic bokeh applications to build up new streamvis documents.
+    """
+
+    def __init__(self, anatric_path):
+        """Initialize a pyzebra handler for bokeh applications.
+
+        Args:
+            args (Namespace): Command line parsed arguments.
+        """
+        super().__init__()  # no-op
+
+        self.anatric_path = anatric_path
+
+    def modify_document(self, doc):
+        """Modify an application document with pyzebra specific features.
+
+        Args:
+            doc (Document) : A bokeh Document to update in-place
+
+        Returns:
+            Document
+        """
+        doc.title = "pyzebra"
+        doc.anatric_path = self.anatric_path
+
+        return doc
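Note: `PyzebraHandler` stamps `anatric_path` onto each new Bokeh document before the app script runs, which is how `panel_hdf_anatric.py` further down can read `doc.anatric_path`. A minimal sketch of one possible wiring; the `Server` setup and the launcher are not part of this diff, so treat the path and port below as assumptions:

from bokeh.application import Application
from bokeh.application.handlers.script import ScriptHandler
from bokeh.server.server import Server

from pyzebra.app.handler import PyzebraHandler

# modify_document() runs once per session, so every document carries the path
handler = PyzebraHandler(anatric_path="/usr/local/bin/anatric")  # assumed path
app = Application(handler, ScriptHandler(filename="pyzebra/app/app.py"))

server = Server({"/": app}, port=5006)
server.start()
server.io_loop.start()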
@@ -2,6 +2,7 @@ import base64
 import io
 import os
 import tempfile
+from copy import deepcopy
 
 import numpy as np
 from bokeh.layouts import column, row
@@ -9,6 +10,7 @@ from bokeh.models import (
     Asterisk,
     BasicTicker,
     Button,
+    CheckboxEditor,
     ColumnDataSource,
     CustomJS,
     DataRange1d,
@@ -19,8 +21,10 @@ from bokeh.models import (
     Line,
     LinearAxis,
     Panel,
+    PanTool,
     Plot,
     RadioButtonGroup,
+    ResetTool,
     Scatter,
     Select,
     Spacer,
@@ -30,6 +34,7 @@ from bokeh.models import (
     TextAreaInput,
     TextInput,
     Toggle,
+    WheelZoomTool,
     Whisker,
 )
 
@@ -60,7 +65,7 @@ def create():
     js_data = ColumnDataSource(data=dict(cont=[], ext=[]))
 
     def proposal_textinput_callback(_attr, _old, new):
-        ccl_path = os.path.join(PROPOSAL_PATH, new)
+        ccl_path = os.path.join(PROPOSAL_PATH, new.strip())
         ccl_file_list = []
         for file in os.listdir(ccl_path):
            if file.endswith(".ccl"):
@@ -71,23 +76,30 @@ def create():
     proposal_textinput = TextInput(title="Enter proposal number:", default_size=145)
     proposal_textinput.on_change("value", proposal_textinput_callback)
 
-    def ccl_file_select_callback(_attr, _old, new):
-        nonlocal det_data
-        with open(new) as file:
-            _, ext = os.path.splitext(new)
-            det_data = pyzebra.parse_1D(file, ext)
-
+    def _init_datatable():
         scan_list = list(det_data["scan"].keys())
         hkl = [
             f'{int(m["h_index"])} {int(m["k_index"])} {int(m["l_index"])}'
             for m in det_data["scan"].values()
         ]
         scan_table_source.data.update(
-            scan=scan_list, hkl=hkl, peaks=[0] * len(scan_list), fit=[0] * len(scan_list)
+            scan=scan_list,
+            hkl=hkl,
+            peaks=[0] * len(scan_list),
+            fit=[0] * len(scan_list),
+            export=[True] * len(scan_list),
         )
         scan_table_source.selected.indices = []
         scan_table_source.selected.indices = [0]
 
+    def ccl_file_select_callback(_attr, _old, new):
+        nonlocal det_data
+        with open(new) as file:
+            _, ext = os.path.splitext(new)
+            det_data = pyzebra.parse_1D(file, ext)
+
+        _init_datatable()
+
     ccl_file_select = Select(title="Available .ccl files")
     ccl_file_select.on_change("value", ccl_file_select_callback)
 
@@ -97,30 +109,35 @@ def create():
             _, ext = os.path.splitext(upload_button.filename)
             det_data = pyzebra.parse_1D(file, ext)
 
-        scan_list = list(det_data["scan"].keys())
-        hkl = [
-            f'{int(m["h_index"])} {int(m["k_index"])} {int(m["l_index"])}'
-            for m in det_data["scan"].values()
-        ]
-        scan_table_source.data.update(
-            scan=scan_list, hkl=hkl, peaks=[0] * len(scan_list), fit=[0] * len(scan_list)
-        )
-        scan_table_source.selected.indices = []
-        scan_table_source.selected.indices = [0]
+        _init_datatable()
 
     upload_button = FileInput(accept=".ccl")
     upload_button.on_change("value", upload_button_callback)
 
+    def append_upload_button_callback(_attr, _old, new):
+        nonlocal det_data
+        with io.StringIO(base64.b64decode(new).decode()) as file:
+            _, ext = os.path.splitext(append_upload_button.filename)
+            append_data = pyzebra.parse_1D(file, ext)
+
+        added = pyzebra.add_dict(det_data, append_data)
+        scan_result = pyzebra.auto(pyzebra.scan_dict(added))
+        det_data = pyzebra.merge(added, added, scan_result)
+
+        _init_datatable()
+
+    append_upload_button = FileInput(accept=".ccl,.dat")
+    append_upload_button.on_change("value", append_upload_button_callback)
+
     def _update_table():
         num_of_peaks = [scan.get("num_of_peaks", 0) for scan in det_data["scan"].values()]
         fit_ok = [(1 if "fit" in scan else 0) for scan in det_data["scan"].values()]
         scan_table_source.data.update(peaks=num_of_peaks, fit=fit_ok)
 
-    def _update_plot(ind):
+    def _update_plot(scan):
         nonlocal peak_pos_textinput_lock
         peak_pos_textinput_lock = True
 
-        scan = det_data["scan"][ind]
         y = scan["Counts"]
         x = scan["om"]
 
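Note: `append_upload_button_callback` chains the helpers now exported from `pyzebra.param_study_moduls`: `add_dict` joins the second dataset onto the first, `scan_dict` maps each hkl to its duplicate scan pairs, `auto` keeps only the unique pairs, and `merge` folds the duplicates into single scans. The same pipeline outside the GUI might look like this (file names are hypothetical):

import pyzebra

with open("night1.ccl") as f:
    det_data = pyzebra.parse_1D(f, ".ccl")
with open("night2.ccl") as f:
    append_data = pyzebra.parse_1D(f, ".ccl")

added = pyzebra.add_dict(det_data, append_data)  # renumber and join scans
duplicates = pyzebra.scan_dict(added)            # hkl -> [(scan_i, scan_j), ...]
det_data = pyzebra.merge(added, added, pyzebra.auto(duplicates))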
@@ -145,18 +162,17 @@ def create():
 
         fit = scan.get("fit")
         if fit is not None:
+            x = scan["fit"]["x_fit"]
             plot_gauss_source.data.update(x=x, y=scan["fit"]["comps"]["gaussian"])
             plot_bkg_source.data.update(x=x, y=scan["fit"]["comps"]["background"])
             params = fit["result"].params
             fit_output_textinput.value = (
-                "%s \n"
                 "Gaussian: centre = %9.4f, sigma = %9.4f, area = %9.4f \n"
                 "background: slope = %9.4f, intercept = %9.4f \n"
                 "Int. area = %9.4f +/- %9.4f \n"
                 "fit area = %9.4f +/- %9.4f \n"
                 "ratio((fit-int)/fit) = %9.4f"
                 % (
-                    ind,
                     params["g_cen"].value,
                     params["g_width"].value,
                     params["g_amp"].value,
@@ -188,13 +204,7 @@ def create():
             numfit_max_span.location = None
 
     # Main plot
-    plot = Plot(
-        x_range=DataRange1d(),
-        y_range=DataRange1d(),
-        plot_height=400,
-        plot_width=700,
-        toolbar_location=None,
-    )
+    plot = Plot(x_range=DataRange1d(), y_range=DataRange1d(), plot_height=400, plot_width=700)
 
     plot.add_layout(LinearAxis(axis_label="Counts"), place="left")
     plot.add_layout(LinearAxis(axis_label="Omega"), place="below")
@@ -226,12 +236,28 @@ def create():
     numfit_max_span = Span(location=None, dimension="height", line_dash="dashed")
     plot.add_layout(numfit_max_span)
 
-    # Scan select
-    def scan_table_callback(_attr, _old, new):
-        if new:
-            _update_plot(scan_table_source.data["scan"][new[-1]])
+    plot.add_tools(PanTool(), WheelZoomTool(), ResetTool())
+    plot.toolbar.logo = None
 
-    scan_table_source = ColumnDataSource(dict(scan=[], hkl=[], peaks=[], fit=[]))
+    # Scan select
+    def scan_table_select_callback(_attr, old, new):
+        if not new:
+            # skip empty selections
+            return
+
+        # Avoid selection of multiple indicies (via Shift+Click or Ctrl+Click)
+        if len(new) > 1:
+            # drop selection to the previous one
+            scan_table_source.selected.indices = old
+            return
+
+        if len(old) > 1:
+            # skip unnecessary update caused by selection drop
+            return
+
+        _update_plot(det_data["scan"][scan_table_source.data["scan"][new[0]]])
+
+    scan_table_source = ColumnDataSource(dict(scan=[], hkl=[], peaks=[], fit=[], export=[]))
     scan_table = DataTable(
         source=scan_table_source,
         columns=[
@@ -239,25 +265,30 @@ def create():
             TableColumn(field="hkl", title="hkl"),
             TableColumn(field="peaks", title="Peaks"),
             TableColumn(field="fit", title="Fit"),
+            TableColumn(field="export", title="Export", editor=CheckboxEditor()),
         ],
-        width=200,
+        width=250,
         index_position=None,
+        editable=True,
     )
 
-    scan_table_source.selected.on_change("indices", scan_table_callback)
+    scan_table_source.selected.on_change("indices", scan_table_select_callback)
+
+    def _get_selected_scan():
+        selected_index = scan_table_source.selected.indices[0]
+        selected_scan_id = scan_table_source.data["scan"][selected_index]
+        return det_data["scan"][selected_scan_id]
 
     def peak_pos_textinput_callback(_attr, _old, new):
         if new is not None and not peak_pos_textinput_lock:
-            sel_ind = scan_table_source.selected.indices[-1]
-            scan_name = scan_table_source.data["scan"][sel_ind]
-            scan = det_data["scan"][scan_name]
+            scan = _get_selected_scan()
 
             scan["num_of_peaks"] = 1
             peak_ind = (np.abs(scan["om"] - float(new))).argmin()
             scan["peak_indexes"] = np.array([peak_ind], dtype=np.int64)
             scan["peak_heights"] = np.array([scan["smooth_peaks"][peak_ind]])
             _update_table()
-            _update_plot(scan_name)
+            _update_plot(scan)
 
     peak_pos_textinput = TextInput(title="Peak position:", default_size=145)
     peak_pos_textinput.on_change("value", peak_pos_textinput_callback)
@@ -322,30 +353,8 @@ def create():
 
     fit_output_textinput = TextAreaInput(title="Fit results:", width=450, height=400)
 
-    def peakfind_all_button_callback():
-        for scan in det_data["scan"].values():
-            pyzebra.ccl_findpeaks(
-                scan,
-                int_threshold=peak_int_ratio_spinner.value,
-                prominence=peak_prominence_spinner.value,
-                smooth=smooth_toggle.active,
-                window_size=window_size_spinner.value,
-                poly_order=poly_order_spinner.value,
-            )
-
-        _update_table()
-
-        sel_ind = scan_table_source.selected.indices[-1]
-        _update_plot(scan_table_source.data["scan"][sel_ind])
-
-    peakfind_all_button = Button(label="Peak Find All", button_type="primary", default_size=145)
-    peakfind_all_button.on_click(peakfind_all_button_callback)
-
-    def peakfind_button_callback():
-        sel_ind = scan_table_source.selected.indices[-1]
-        scan = scan_table_source.data["scan"][sel_ind]
-        pyzebra.ccl_findpeaks(
-            det_data["scan"][scan],
+    def _get_peakfind_params():
+        return dict(
             int_threshold=peak_int_ratio_spinner.value,
             prominence=peak_prominence_spinner.value,
             smooth=smooth_toggle.active,
@@ -353,61 +362,29 @@ def create():
             poly_order=poly_order_spinner.value,
         )
 
+    def peakfind_all_button_callback():
+        peakfind_params = _get_peakfind_params()
+        for scan in det_data["scan"].values():
+            pyzebra.ccl_findpeaks(scan, **peakfind_params)
+
+        _update_table()
+        _update_plot(_get_selected_scan())
+
+    peakfind_all_button = Button(label="Peak Find All", button_type="primary", default_size=145)
+    peakfind_all_button.on_click(peakfind_all_button_callback)
+
+    def peakfind_button_callback():
+        scan = _get_selected_scan()
+        pyzebra.ccl_findpeaks(scan, **_get_peakfind_params())
+
         _update_table()
         _update_plot(scan)
 
     peakfind_button = Button(label="Peak Find Current", default_size=145)
     peakfind_button.on_click(peakfind_button_callback)
 
-    def fit_all_button_callback():
-        for scan in det_data["scan"].values():
-            pyzebra.fitccl(
-                scan,
-                guess=[
-                    centre_guess.value,
-                    sigma_guess.value,
-                    ampl_guess.value,
-                    slope_guess.value,
-                    offset_guess.value,
-                ],
-                vary=[
-                    centre_vary.active,
-                    sigma_vary.active,
-                    ampl_vary.active,
-                    slope_vary.active,
-                    offset_vary.active,
-                ],
-                constraints_min=[
-                    centre_min.value,
-                    sigma_min.value,
-                    ampl_min.value,
-                    slope_min.value,
-                    offset_min.value,
-                ],
-                constraints_max=[
-                    centre_max.value,
-                    sigma_max.value,
-                    ampl_max.value,
-                    slope_max.value,
-                    offset_max.value,
-                ],
-                numfit_min=integ_from.value,
-                numfit_max=integ_to.value,
-            )
-
-        sel_ind = scan_table_source.selected.indices[-1]
-        _update_plot(scan_table_source.data["scan"][sel_ind])
-        _update_table()
-
-    fit_all_button = Button(label="Fit All", button_type="primary", default_size=145)
-    fit_all_button.on_click(fit_all_button_callback)
-
-    def fit_button_callback():
-        sel_ind = scan_table_source.selected.indices[-1]
-        scan = scan_table_source.data["scan"][sel_ind]
-
-        pyzebra.fitccl(
-            det_data["scan"][scan],
+    def _get_fit_params():
+        return dict(
             guess=[
                 centre_guess.value,
                 sigma_guess.value,
@@ -438,8 +415,25 @@ def create():
             ],
             numfit_min=integ_from.value,
             numfit_max=integ_to.value,
+            binning=bin_size_spinner.value,
         )
 
+    def fit_all_button_callback():
+        fit_params = _get_fit_params()
+        for scan in det_data["scan"].values():
+            # fit_params are updated inplace within `fitccl`
+            pyzebra.fitccl(scan, **deepcopy(fit_params))
+
+        _update_plot(_get_selected_scan())
+        _update_table()
+
+    fit_all_button = Button(label="Fit All", button_type="primary", default_size=145)
+    fit_all_button.on_click(fit_all_button_callback)
+
+    def fit_button_callback():
+        scan = _get_selected_scan()
+        pyzebra.fitccl(scan, **_get_fit_params())
+
         _update_plot(scan)
         _update_table()
 
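Note: the `deepcopy` guard exists because `fitccl` mutates the keyword structures it receives, so without a copy the first scan's fit would leak its adjusted values into every later scan. A self-contained illustration of the hazard with a toy stand-in for `fitccl`:

from copy import deepcopy

def fake_fit(scan, guess):
    guess[0] = scan  # mutates the caller's list in place, like fitccl does

params = dict(guess=[None])
for scan in ("scan1", "scan2"):
    fake_fit(scan, **params)
print(params)  # {'guess': ['scan2']} -- the shared defaults were clobbered

params = dict(guess=[None])
for scan in ("scan1", "scan2"):
    fake_fit(scan, **deepcopy(params))
print(params)  # {'guess': [None]} -- every call saw a fresh copy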
@@ -454,6 +448,10 @@ def create():
     )
     area_method_radiobutton.on_change("active", area_method_radiobutton_callback)
 
+    bin_size_spinner = Spinner(title="Bin size:", value=1, low=1, step=1, default_size=145)
+
+    lorentz_toggle = Toggle(label="Lorentz Correction", default_size=145)
+
     preview_output_textinput = TextAreaInput(title="Export file preview:", width=450, height=400)
 
     def preview_output_button_callback():
@@ -464,7 +462,11 @@ def create():
 
         with tempfile.TemporaryDirectory() as temp_dir:
             temp_file = temp_dir + "/temp"
-            pyzebra.export_comm(det_data, temp_file)
+            export_data = deepcopy(det_data)
+            for s, export in zip(scan_table_source.data["scan"], scan_table_source.data["export"]):
+                if not export:
+                    del export_data["scan"][s]
+            pyzebra.export_comm(export_data, temp_file, lorentz=lorentz_toggle.active)
 
             with open(f"{temp_file}{ext}") as f:
                 preview_output_textinput.value = f.read()
@@ -480,7 +482,11 @@ def create():
 
         with tempfile.TemporaryDirectory() as temp_dir:
             temp_file = temp_dir + "/temp"
-            pyzebra.export_comm(det_data, temp_file)
+            export_data = deepcopy(det_data)
+            for s, export in zip(scan_table_source.data["scan"], scan_table_source.data["export"]):
+                if not export:
+                    del export_data["scan"][s]
+            pyzebra.export_comm(export_data, temp_file, lorentz=lorentz_toggle.active)
 
             with open(f"{temp_file}{ext}") as f:
                 output_content = f.read()
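Note: both export callbacks apply the same filter driven by the new Export checkbox column; the table's `scan` and `export` columns are parallel lists, so zipping them decides which scans reach `export_comm`. The idiom in isolation (toy data):

from copy import deepcopy

det_data = {"scan": {"M1": 1, "M2": 2, "M3": 3}}  # toy payloads
table = {"scan": ["M1", "M2", "M3"], "export": [True, False, True]}

export_data = deepcopy(det_data)
for s, export in zip(table["scan"], table["export"]):
    if not export:
        del export_data["scan"][s]

print(list(export_data["scan"]))  # ['M1', 'M3']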
@@ -530,6 +536,7 @@ def create():
         Spacer(width=20),
         column(
             row(integ_from, integ_to),
+            row(bin_size_spinner, column(Spacer(height=19), lorentz_toggle)),
             row(fitparam_reset_button, area_method_radiobutton),
             row(fit_button, fit_all_button),
         ),
@@ -538,9 +545,15 @@ def create():
     export_layout = column(preview_output_textinput, row(preview_output_button, save_button))
 
     upload_div = Div(text="Or upload .ccl file:")
+    append_upload_div = Div(text="append extra .ccl/.dat files:")
     tab_layout = column(
         row(proposal_textinput, ccl_file_select),
-        row(column(Spacer(height=5), upload_div), upload_button),
+        row(
+            column(Spacer(height=5), upload_div),
+            upload_button,
+            column(Spacer(height=5), append_upload_div),
+            append_upload_button,
+        ),
         row(scan_table, plot, Spacer(width=30), fit_output_textinput, export_layout),
         row(findpeak_controls, Spacer(width=30), fitpeak_controls),
     )
@@ -21,6 +21,7 @@ from pyzebra.anatric import DATA_FACTORY_IMPLEMENTATION, REFLECTION_PRINTER_FORMATS
 
 
 def create():
+    doc = curdoc()
     config = pyzebra.AnatricConfig()
 
     def _load_config_file(file):
@@ -345,7 +346,7 @@ def create():
         with tempfile.TemporaryDirectory() as temp_dir:
             temp_file = temp_dir + "/temp.xml"
             config.save_as(temp_file)
-            pyzebra.anatric(temp_file)
+            pyzebra.anatric(temp_file, anatric_path=doc.anatric_path)
 
             with open(config.logfile) as f_log:
                 output_log.value = f_log.read()
@@ -404,6 +405,6 @@ def create():
         with open("debug.xml") as f_config:
             output_config.value = f_config.read()
 
-    curdoc().add_periodic_callback(update_config, 1000)
+    doc.add_periodic_callback(update_config, 1000)
 
     return Panel(child=tab_layout, title="hdf anatric")
@@ -74,8 +74,8 @@ def create():
         image_source.data.update(image=[current_image])
 
         if auto_toggle.active:
-            im_max = int(np.max(current_image))
-            im_min = int(np.min(current_image))
+            im_min = np.min(current_image)
+            im_max = np.max(current_image)
 
             display_min_spinner.value = im_min
             display_max_spinner.value = im_max
@@ -83,8 +83,15 @@ def create():
             image_glyph.color_mapper.low = im_min
             image_glyph.color_mapper.high = im_max
 
-        magnetic_field_spinner.value = det_data["magnetic_field"][index]
-        temperature_spinner.value = det_data["temperature"][index]
+        if "magnetic_field" in det_data:
+            magnetic_field_spinner.value = det_data["magnetic_field"][index]
+        else:
+            magnetic_field_spinner.value = None
+
+        if "temperature" in det_data:
+            temperature_spinner.value = det_data["temperature"][index]
+        else:
+            temperature_spinner.value = None
 
         gamma, nu = calculate_pol(det_data, index)
         omega = np.ones((IMAGE_H, IMAGE_W)) * det_data["rot_angle"][index]
@@ -99,6 +106,18 @@ def create():
         overview_plot_x_image_source.data.update(image=[overview_x], dw=[n_x])
         overview_plot_y_image_source.data.update(image=[overview_y], dw=[n_y])
 
+        if proj_auto_toggle.active:
+            im_min = min(np.min(overview_x), np.min(overview_y))
+            im_max = max(np.max(overview_x), np.max(overview_y))
+
+            proj_display_min_spinner.value = im_min
+            proj_display_max_spinner.value = im_max
+
+            overview_plot_x_image_glyph.color_mapper.low = im_min
+            overview_plot_y_image_glyph.color_mapper.low = im_min
+            overview_plot_x_image_glyph.color_mapper.high = im_max
+            overview_plot_y_image_glyph.color_mapper.high = im_max
+
         if frame_button_group.active == 0:  # Frame
             overview_plot_x.axis[1].axis_label = "Frame"
             overview_plot_y.axis[1].axis_label = "Frame"
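Note: the projection auto-range deliberately shares one color scale across both overview images, taking the low end from the minimum of either array and the high end from the maximum, so equal counts render identically in the x and y projections. In isolation (random arrays stand in for the detector projections):

import numpy as np

overview_x = np.random.poisson(5, (256, 128))
overview_y = np.random.poisson(50, (256, 128))

im_min = min(np.min(overview_x), np.min(overview_y))
im_max = max(np.max(overview_x), np.max(overview_y))
# both color mappers then get the same [im_min, im_max] window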
@@ -388,7 +407,6 @@ def create():
     radio_button_group = RadioButtonGroup(labels=["nb", "nb_bi"], active=0)
 
     STEP = 1
-
     # ---- colormap auto toggle button
     def auto_toggle_callback(state):
         if state:
@@ -400,7 +418,9 @@ def create():
 
         update_image()
 
-    auto_toggle = Toggle(label="Auto Range", active=True, button_type="default", default_size=145)
+    auto_toggle = Toggle(
+        label="Main Auto Range", active=True, button_type="default", default_size=125
+    )
     auto_toggle.on_click(auto_toggle_callback)
 
     # ---- colormap display max value
@@ -409,12 +429,12 @@ def create():
         image_glyph.color_mapper.high = new_value
 
     display_max_spinner = Spinner(
-        title="Maximal Display Value:",
+        title="Max Value:",
         low=0 + STEP,
         value=1,
         step=STEP,
         disabled=auto_toggle.active,
-        default_size=145,
+        default_size=80,
     )
     display_max_spinner.on_change("value", display_max_spinner_callback)
 
@@ -424,15 +444,66 @@ def create():
         image_glyph.color_mapper.low = new_value
 
     display_min_spinner = Spinner(
-        title="Minimal Display Value:",
+        title="Min Value:",
+        low=0,
         high=1 - STEP,
         value=0,
         step=STEP,
        disabled=auto_toggle.active,
-        default_size=145,
+        default_size=80,
     )
     display_min_spinner.on_change("value", display_min_spinner_callback)
 
+    PROJ_STEP = 0.1
+    # ---- proj colormap auto toggle button
+    def proj_auto_toggle_callback(state):
+        if state:
+            proj_display_min_spinner.disabled = True
+            proj_display_max_spinner.disabled = True
+        else:
+            proj_display_min_spinner.disabled = False
+            proj_display_max_spinner.disabled = False
+
+        update_overview_plot()
+
+    proj_auto_toggle = Toggle(
+        label="Proj Auto Range", active=True, button_type="default", default_size=125
+    )
+    proj_auto_toggle.on_click(proj_auto_toggle_callback)
+
+    # ---- proj colormap display max value
+    def proj_display_max_spinner_callback(_attr, _old_value, new_value):
+        proj_display_min_spinner.high = new_value - PROJ_STEP
+        overview_plot_x_image_glyph.color_mapper.high = new_value
+        overview_plot_y_image_glyph.color_mapper.high = new_value
+
+    proj_display_max_spinner = Spinner(
+        title="Max Value:",
+        low=0 + PROJ_STEP,
+        value=1,
+        step=PROJ_STEP,
+        disabled=proj_auto_toggle.active,
+        default_size=80,
+    )
+    proj_display_max_spinner.on_change("value", proj_display_max_spinner_callback)
+
+    # ---- proj colormap display min value
+    def proj_display_min_spinner_callback(_attr, _old_value, new_value):
+        proj_display_max_spinner.low = new_value + PROJ_STEP
+        overview_plot_x_image_glyph.color_mapper.low = new_value
+        overview_plot_y_image_glyph.color_mapper.low = new_value
+
+    proj_display_min_spinner = Spinner(
+        title="Min Value:",
+        low=0,
+        high=1 - PROJ_STEP,
+        value=0,
+        step=PROJ_STEP,
+        disabled=proj_auto_toggle.active,
+        default_size=80,
+    )
+    proj_display_min_spinner.on_change("value", proj_display_min_spinner_callback)
+
     def hkl_button_callback():
         index = index_spinner.value
         setup_type = "nb_bi" if radio_button_group.active else "nb"
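Note: the min/max spinner pairs keep each other honest: every change to one side tightens the opposite spinner's bound by one `PROJ_STEP`, so the range can never invert. The invariant with the Bokeh widgets stripped away:

PROJ_STEP = 0.1
bounds = {"min_high": 1 - PROJ_STEP, "max_low": 0 + PROJ_STEP}

def on_max_change(new_value):
    # mirrors proj_display_max_spinner_callback
    bounds["min_high"] = new_value - PROJ_STEP

def on_min_change(new_value):
    # mirrors proj_display_min_spinner_callback
    bounds["max_low"] = new_value + PROJ_STEP

on_max_change(0.8)
on_min_change(0.3)
print(bounds)  # min can rise to ~0.7, max can fall to ~0.4, never crossing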
@@ -474,8 +545,13 @@ def create():
     # Final layout
     layout_image = column(gridplot([[proj_v, None], [plot, proj_h]], merge_tools=False))
     colormap_layout = column(
-        row(colormap, column(Spacer(height=19), auto_toggle)),
-        row(display_max_spinner, display_min_spinner),
+        row(colormap),
+        row(column(Spacer(height=19), auto_toggle), display_max_spinner, display_min_spinner),
+        row(
+            column(Spacer(height=19), proj_auto_toggle),
+            proj_display_max_spinner,
+            proj_display_min_spinner,
+        ),
     )
     hkl_layout = column(radio_button_group, hkl_button)
     params_layout = row(magnetic_field_spinner, temperature_spinner)
@@ -1,513 +0,0 @@
-import numpy as np
-import uncertainties as u
-
-from .fit2 import create_uncertanities
-
-
-def add_dict(dict1, dict2):
-    """adds two dictionaries, meta of the new is saved as meata+original_filename and
-    measurements are shifted to continue with numbering of first dict
-    :arg dict1 : dictionarry to add to
-    :arg dict2 : dictionarry from which to take the measurements
-    :return dict1 : combined dictionary
-    Note: dict1 must be made from ccl, otherwise we would have to change the structure of loaded
-    dat file"""
-    max_measurement_dict1 = max([int(str(keys)[1:]) for keys in dict1["scan"]])
-    if dict2["meta"]["data_type"] == ".ccl":
-        new_filenames = [
-            "M" + str(x + max_measurement_dict1)
-            for x in [int(str(keys)[1:]) for keys in dict2["scan"]]
-        ]
-        new_meta_name = "meta" + str(dict2["meta"]["original_filename"])
-        if new_meta_name not in dict1:
-            for keys, name in zip(dict2["scan"], new_filenames):
-                dict2["scan"][keys]["file_of_origin"] = str(dict2["meta"]["original_filename"])
-                dict1["scan"][name] = dict2["scan"][keys]
-
-            dict1[new_meta_name] = dict2["meta"]
-
-        else:
-            raise KeyError(
-                str(
-                    "The file %s has alredy been added to %s"
-                    % (dict2["meta"]["original_filename"], dict1["meta"]["original_filename"])
-                )
-            )
-    elif dict2["meta"]["data_type"] == ".dat":
-        d = {}
-        new_name = "M" + str(max_measurement_dict1 + 1)
-        hkl = dict2["meta"]["title"]
-        d["h_index"] = float(hkl.split()[-3])
-        d["k_index"] = float(hkl.split()[-2])
-        d["l_index"] = float(hkl.split()[-1])
-        d["number_of_measurements"] = len(dict2["scan"]["NP"])
-        d["om"] = dict2["scan"]["om"]
-        d["Counts"] = dict2["scan"]["Counts"]
-        d["monitor"] = dict2["scan"]["Monitor1"][0]
-        d["temperature"] = dict2["meta"]["temp"]
-        d["mag_field"] = dict2["meta"]["mf"]
-        d["omega_angle"] = dict2["meta"]["omega"]
-        dict1["scan"][new_name] = d
-        print(hkl.split())
-        for keys in d:
-            print(keys)
-
-        print("s")
-
-    return dict1
-
-
-def auto(dict):
-    """takes just unique tuples from all tuples in dictionary returend by scan_dict
-    intendet for automatic merge if you doesent want to specify what scans to merge together
-    args: dict - dictionary from scan_dict function
-    :return dict - dict without repetitions"""
-    for keys in dict:
-        tuple_list = dict[keys]
-        new = list()
-        for i in range(len(tuple_list)):
-            if tuple_list[0][0] == tuple_list[i][0]:
-                new.append(tuple_list[i])
-        dict[keys] = new
-    return dict
-
-
-def scan_dict(dict):
-    """scans dictionary for duplicate hkl indexes
-    :arg dict : dictionary to scan
-    :return dictionary with matching scans, if there are none, the dict is empty
-    note: can be checked by "not d", true if empty
-    """
-
-    d = {}
-    for i in dict["scan"]:
-        for j in dict["scan"]:
-            if dict["scan"][str(i)] != dict["scan"][str(j)]:
-                itup = (
-                    dict["scan"][str(i)]["h_index"],
-                    dict["scan"][str(i)]["k_index"],
-                    dict["scan"][str(i)]["l_index"],
-                )
-                jtup = (
-                    dict["scan"][str(j)]["h_index"],
-                    dict["scan"][str(j)]["k_index"],
-                    dict["scan"][str(j)]["l_index"],
-                )
-                if itup != jtup:
-                    pass
-                else:
-
-                    if str(itup) not in d:
-                        d[str(itup)] = list()
-                        d[str(itup)].append((i, j))
-                    else:
-                        d[str(itup)].append((i, j))
-            else:
-                continue
-    return d
-
-
-def compare_hkl(dict1, dict2):
-    """Compares two dictionaries based on hkl indexes and return dictionary with str(h k l) as
-    key and tuple with keys to same scan in dict1 and dict2
-    :arg dict1 : first dictionary
-    :arg dict2 : second dictionary
-    :return d : dict with matches
-    example of one key: '0.0 0.0 -1.0 : ('M1', 'M9')' meaning that 001 hkl scan is M1 in
-    first dict and M9 in second"""
-    d = {}
-    dupl = 0
-    for keys in dict1["scan"]:
-        for key in dict2["scan"]:
-            if (
-                dict1["scan"][str(keys)]["h_index"] == dict2["scan"][str(key)]["h_index"]
-                and dict1["scan"][str(keys)]["k_index"] == dict2["scan"][str(key)]["k_index"]
-                and dict1["scan"][str(keys)]["l_index"] == dict2["scan"][str(key)]["l_index"]
-            ):
-
-                if (
-                    str(
-                        (
-                            str(dict1["scan"][str(keys)]["h_index"])
-                            + " "
-                            + str(dict1["scan"][str(keys)]["k_index"])
-                            + " "
-                            + str(dict1["scan"][str(keys)]["l_index"])
-                        )
-                    )
-                    not in d
-                ):
-                    d[
-                        str(
-                            str(dict1["scan"][str(keys)]["h_index"])
-                            + " "
-                            + str(dict1["scan"][str(keys)]["k_index"])
-                            + " "
-                            + str(dict1["scan"][str(keys)]["l_index"])
-                        )
-                    ] = (str(keys), str(key))
-                else:
-                    dupl = dupl + 1
-                    d[
-                        str(
-                            str(dict1["scan"][str(keys)]["h_index"])
-                            + " "
-                            + str(dict1["scan"][str(keys)]["k_index"])
-                            + " "
-                            + str(dict1["scan"][str(keys)]["l_index"])
-                            + "_dupl"
-                            + str(dupl)
-                        )
-                    ] = (str(keys), str(key))
-            else:
-                continue
-
-    return d
-
-
-def create_tuples(x, y, y_err):
-    """creates tuples for sorting and merginng of the data
-    Counts need to be normalized to monitor before"""
-    t = list()
-    for i in range(len(x)):
-        tup = (x[i], y[i], y_err[i])
-        t.append(tup)
-    return t
-
-
-def normalize(dict, key, monitor):
-    """Normalizes the scan to monitor, checks if sigma exists, otherwise creates it
-    :arg dict : dictionary to from which to tkae the scan
-    :arg key : which scan to normalize from dict1
-    :arg monitor : final monitor
-    :return counts - normalized counts
-    :return sigma - normalized sigma"""
-
-    counts = np.array(dict["scan"][key]["Counts"])
-    sigma = np.sqrt(counts) if "sigma" not in dict["scan"][key] else dict["scan"][key]["sigma"]
-    monitor_ratio = monitor / dict["scan"][key]["monitor"]
-    scaled_counts = counts * monitor_ratio
-    scaled_sigma = np.array(sigma) * monitor_ratio
-
-    return scaled_counts, scaled_sigma
-
-
-def merge(dict1, dict2, keys, auto=True, monitor=100000):
-    """merges the two tuples and sorts them, if om value is same, Counts value is average
-    averaging is propagated into sigma if dict1 == dict2, key[1] is deleted after merging
-    :arg dict1 : dictionary to which scan will be merged
-    :arg dict2 : dictionary from which scan will be merged
-    :arg keys : tuple with key to dict1 and dict2
-    :arg auto : if true, when monitors are same, does not change it, if flase, takes monitor always
-    :arg monitor : final monitor after merging
-    note: dict1 and dict2 can be same dict
-    :return dict1 with merged scan"""
-    if auto:
-        if dict1["scan"][keys[0]]["monitor"] == dict2["scan"][keys[1]]["monitor"]:
-            monitor = dict1["scan"][keys[0]]["monitor"]
-
-    # load om and Counts
-    x1, x2 = dict1["scan"][keys[0]]["om"], dict2["scan"][keys[1]]["om"]
-    cor_y1, y_err1 = normalize(dict1, keys[0], monitor=monitor)
-    cor_y2, y_err2 = normalize(dict2, keys[1], monitor=monitor)
-    # creates touples (om, Counts, sigma) for sorting and further processing
-    tuple_list = create_tuples(x1, cor_y1, y_err1) + create_tuples(x2, cor_y2, y_err2)
-    # Sort the list on om and add 0 0 0 tuple to the last position
-    sorted_t = sorted(tuple_list, key=lambda tup: tup[0])
-    sorted_t.append((0, 0, 0))
-    om, Counts, sigma = [], [], []
-    seen = list()
-    for i in range(len(sorted_t) - 1):
-        if sorted_t[i][0] not in seen:
-            if sorted_t[i][0] != sorted_t[i + 1][0]:
-                om = np.append(om, sorted_t[i][0])
-                Counts = np.append(Counts, sorted_t[i][1])
-                sigma = np.append(sigma, sorted_t[i][2])
-            else:
-                om = np.append(om, sorted_t[i][0])
-                counts1, counts2 = sorted_t[i][1], sorted_t[i + 1][1]
-                sigma1, sigma2 = sorted_t[i][2], sorted_t[i + 1][2]
-                count_err1 = u.ufloat(counts1, sigma1)
-                count_err2 = u.ufloat(counts2, sigma2)
-                avg = (count_err1 + count_err2) / 2
-                Counts = np.append(Counts, avg.n)
-                sigma = np.append(sigma, avg.s)
-                seen.append(sorted_t[i][0])
-        else:
-            continue
-
-    if dict1 == dict2:
-        del dict1["scan"][keys[1]]
-
-    note = (
-        f"This scan was merged with scan {keys[1]} from "
-        f'file {dict2["meta"]["original_filename"]} \n'
-    )
-    if "notes" not in dict1["scan"][str(keys[0])]:
-        dict1["scan"][str(keys[0])]["notes"] = note
-    else:
-        dict1["scan"][str(keys[0])]["notes"] += note
-
-    dict1["scan"][keys[0]]["om"] = om
-    dict1["scan"][keys[0]]["Counts"] = Counts
-    dict1["scan"][keys[0]]["sigma"] = sigma
-    dict1["scan"][keys[0]]["monitor"] = monitor
-    print("merging done")
-    return dict1
-
-
-def substract_measurement(dict1, dict2, keys, auto=True, monitor=100000):
-    """Substracts two scan (scan key2 from dict2 from measurent key1 in dict1), expects om to be same
-    :arg dict1 : dictionary to which scan will be merged
-    :arg dict2 : dictionary from which scan will be merged
-    :arg keys : tuple with key to dict1 and dict2
-    :arg auto : if true, when monitors are same, does not change it, if flase, takes monitor always
-    :arg monitor : final monitor after merging
-    :returns d : dict1 with substracted Counts from dict2 and sigma that comes from the substraction"""
-
-    if len(dict1["scan"][keys[0]]["om"]) != len(dict2["scan"][keys[1]]["om"]):
-        raise ValueError("Omegas have different lengths, cannot be substracted")
-
-    if auto:
-        if dict1["scan"][keys[0]]["monitor"] == dict2["scan"][keys[1]]["monitor"]:
-            monitor = dict1["scan"][keys[0]]["monitor"]
-
-    cor_y1, y_err1 = normalize(dict1, keys[0], monitor=monitor)
-    cor_y2, y_err2 = normalize(dict2, keys[1], monitor=monitor)
-
-    dict1_count_err = create_uncertanities(cor_y1, y_err1)
-    dict2_count_err = create_uncertanities(cor_y2, y_err2)
-
-    res = np.subtract(dict1_count_err, dict2_count_err)
-
-    res_nom = []
-    res_err = []
-    for k in range(len(res)):
-        res_nom = np.append(res_nom, res[k].n)
-        res_err = np.append(res_err, res[k].s)
-
-    if len([num for num in res_nom if num < 0]) >= 0.3 * len(res_nom):
-        print(
-            f"Warning! percentage of negative numbers in scan subsracted {keys[0]} is "
-            f"{len([num for num in res_nom if num < 0]) / len(res_nom)}"
-        )
-
-    dict1["scan"][str(keys[0])]["Counts"] = res_nom
-    dict1["scan"][str(keys[0])]["sigma"] = res_err
-    dict1["scan"][str(keys[0])]["monitor"] = monitor
-    note = (
-        f'Scan {keys[1]} from file {dict2["meta"]["original_filename"]} '
-        f"was substracted from this scan \n"
-    )
-    if "notes" not in dict1["scan"][str(keys[0])]:
-        dict1["scan"][str(keys[0])]["notes"] = note
-    else:
-        dict1["scan"][str(keys[0])]["notes"] += note
-    return dict1
-
-
-def compare_dict(dict1, dict2):
-    """takes two ccl dictionaries and compare different values for each key
-    :arg dict1 : dictionary 1 (ccl)
-    :arg dict2 : dictionary 2 (ccl)
-    :returns warning : dictionary with keys from primary files (if they differ) with
-    information of how many scan differ and which ones differ
-    :returns report_string string comparing all different values respecively of measurements"""
-
-    if dict1["meta"]["data_type"] != dict2["meta"]["data_type"]:
-        print("select two dicts")
-        return
-    S = []
-    conflicts = {}
-    warnings = {}
-
-    comp = compare_hkl(dict1, dict2)
-    d1 = scan_dict(dict1)
-    d2 = scan_dict(dict2)
-    if not d1:
-        S.append("There are no duplicates in %s (dict1) \n" % dict1["meta"]["original_filename"])
-    else:
-        S.append(
-            "There are %d duplicates in %s (dict1) \n"
-            % (len(d1), dict1["meta"]["original_filename"])
-        )
-        warnings["Duplicates in dict1"] = list()
-        for keys in d1:
-            S.append("Measurements %s with hkl %s \n" % (d1[keys], keys))
-            warnings["Duplicates in dict1"].append(d1[keys])
-    if not d2:
-        S.append("There are no duplicates in %s (dict2) \n" % dict2["meta"]["original_filename"])
-    else:
-        S.append(
-            "There are %d duplicates in %s (dict2) \n"
-            % (len(d2), dict2["meta"]["original_filename"])
-        )
-        warnings["Duplicates in dict2"] = list()
-        for keys in d2:
-            S.append("Measurements %s with hkl %s \n" % (d2[keys], keys))
-            warnings["Duplicates in dict2"].append(d2[keys])
-
-    # compare meta
-    S.append("Different values in meta: \n")
-    different_meta = {
-        k: dict1["meta"][k]
-        for k in dict1["meta"]
-        if k in dict2["meta"] and dict1["meta"][k] != dict2["meta"][k]
-    }
-    exlude_meta_set = ["original_filename", "date", "title"]
-    for keys in different_meta:
-        if keys in exlude_meta_set:
-            continue
-        else:
-            if keys not in conflicts:
-                conflicts[keys] = 1
-            else:
-                conflicts[keys] = conflicts[keys] + 1
-
-            S.append("   Different values in %s \n" % str(keys))
-            S.append("      dict1: %s \n" % str(dict1["meta"][str(keys)]))
-            S.append("      dict2: %s \n" % str(dict2["meta"][str(keys)]))
-
-    # compare Measurements
-    S.append(
-        "Number of measurements in %s = %s \n"
-        % (dict1["meta"]["original_filename"], len(dict1["scan"]))
-    )
-    S.append(
-        "Number of measurements in %s = %s \n"
-        % (dict2["meta"]["original_filename"], len(dict2["scan"]))
-    )
-    S.append("Different values in Measurements:\n")
-    select_set = ["om", "Counts", "sigma"]
-    exlude_set = ["time", "Counts", "date", "notes"]
-    for keys1 in comp:
-        for key2 in dict1["scan"][str(comp[str(keys1)][0])]:
-            if key2 in exlude_set:
-                continue
-            if key2 not in select_set:
-                try:
-                    if (
-                        dict1["scan"][comp[str(keys1)][0]][str(key2)]
-                        != dict2["scan"][str(comp[str(keys1)][1])][str(key2)]
-                    ):
-                        S.append(
-                            "Scan value "
-                            "%s"
-                            ", with hkl %s differs in meausrements %s and %s \n"
-                            % (key2, keys1, comp[str(keys1)][0], comp[str(keys1)][1])
-                        )
-                        S.append(
-                            "   dict1: %s \n"
-                            % str(dict1["scan"][comp[str(keys1)][0]][str(key2)])
-                        )
-                        S.append(
-                            "   dict2: %s \n"
-                            % str(dict2["scan"][comp[str(keys1)][1]][str(key2)])
-                        )
-                        if key2 not in conflicts:
-                            conflicts[key2] = {}
-                            conflicts[key2]["amount"] = 1
-                            conflicts[key2]["scan"] = str(comp[str(keys1)])
-                        else:
-
-                            conflicts[key2]["amount"] = conflicts[key2]["amount"] + 1
-                            conflicts[key2]["scan"] = (
-                                conflicts[key2]["scan"] + " " + (str(comp[str(keys1)]))
-                            )
-                except KeyError as e:
-                    print("Missing keys, some files were probably merged or substracted")
-                    print(e.args)
-
-            else:
-                try:
-                    comparison = list(dict1["scan"][comp[str(keys1)][0]][str(key2)]) == list(
-                        dict2["scan"][comp[str(keys1)][1]][str(key2)]
-                    )
-                    if len(list(dict1["scan"][comp[str(keys1)][0]][str(key2)])) != len(
-                        list(dict2["scan"][comp[str(keys1)][1]][str(key2)])
-                    ):
-                        if str("different length of %s" % key2) not in warnings:
-                            warnings[str("different length of %s" % key2)] = list()
-                            warnings[str("different length of %s" % key2)].append(
-                                (str(comp[keys1][0]), str(comp[keys1][1]))
-                            )
-                        else:
-                            warnings[str("different length of %s" % key2)].append(
-                                (str(comp[keys1][0]), str(comp[keys1][1]))
-                            )
-                    if not comparison:
-                        S.append(
-                            "Scan value "
-                            "%s"
-                            " differs in scan %s and %s \n"
-                            % (key2, comp[str(keys1)][0], comp[str(keys1)][1])
-                        )
-                        S.append(
-                            "   dict1: %s \n"
-                            % str(list(dict1["scan"][comp[str(keys1)][0]][str(key2)]))
-                        )
-                        S.append(
-                            "   dict2: %s \n"
-                            % str(list(dict2["scan"][comp[str(keys1)][1]][str(key2)]))
-                        )
-                        if key2 not in conflicts:
-                            conflicts[key2] = {}
-                            conflicts[key2]["amount"] = 1
-                            conflicts[key2]["scan"] = str(comp[str(keys1)])
-                        else:
-                            conflicts[key2]["amount"] = conflicts[key2]["amount"] + 1
-                            conflicts[key2]["scan"] = (
-                                conflicts[key2]["scan"] + " " + (str(comp[str(keys1)]))
-                            )
-                except KeyError as e:
-                    print("Missing keys, some files were probably merged or substracted")
-                    print(e.args)
-
-    for keys in conflicts:
-        try:
-            conflicts[str(keys)]["scan"] = conflicts[str(keys)]["scan"].split(" ")
-        except:
-            continue
-    report_string = "".join(S)
-    return warnings, conflicts, report_string
-
-
-def guess_next(dict1, dict2, comp):
-    """iterates thorough the scans and tries to decide if the scans should be
-    substracted or merged"""
-    threshold = 0.05
-    for keys in comp:
-        if (
-            abs(
-                (
-                    dict1["scan"][str(comp[keys][0])]["temperature"]
-                    - dict2["scan"][str(comp[keys][1])]["temperature"]
-                )
-                / dict2["scan"][str(comp[keys][1])]["temperature"]
-            )
-            < threshold
-            and abs(
-                (
-                    dict1["scan"][str(comp[keys][0])]["mag_field"]
-                    - dict2["scan"][str(comp[keys][1])]["mag_field"]
-                )
-                / dict2["scan"][str(comp[keys][1])]["mag_field"]
-            )
-            < threshold
-        ):
-            comp[keys] = comp[keys] + tuple("m")
-        else:
-            comp[keys] = comp[keys] + tuple("s")
-
-    return comp
-
-
-def process_dict(dict1, dict2, comp):
-    """substracts or merges scans, guess_next function must run first """
-    for keys in comp:
-        if comp[keys][2] == "s":
-            substract_measurement(dict1, dict2, comp[keys])
-        elif comp[keys][2] == "m":
-            merge(dict1, dict2, comp[keys])
-
-    return dict1
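Note: this deletion removes the old add/merge/subtract module wholesale; the callers above now reach equivalent helpers through `pyzebra.param_study_moduls` (see the `__init__.py` hunk at the top). A quick sanity check that the public names survive the move, assuming that module carries them over:

from pyzebra import add_dict, auto, merge, scan_dict  # re-exported entry points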
pyzebra/ccl_findpeaks.py
@@ -5,7 +5,13 @@ from scipy.signal import savgol_filter


 def ccl_findpeaks(
-    scan, int_threshold=0.8, prominence=50, smooth=False, window_size=7, poly_order=3
+    scan,
+    int_threshold=0.8,
+    prominence=50,
+    smooth=False,
+    window_size=7,
+    poly_order=3,
+    variable="om",
 ):
     """function iterates through the dictionary created by load_cclv2 and locates peaks for each scan
@@ -54,7 +60,7 @@ def ccl_findpeaks(
         prominence = 50
         print("Invalid value for prominence, select positive number, new value set to:", prominence)

-    omega = scan["om"]
+    omega = scan[variable]
     counts = np.array(scan["Counts"])
     if smooth:
         itp = interp1d(omega, counts, kind="linear")
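A minimal usage sketch of the widened signature; the scan below is hand-made stand-in data (real scans come from parse_1D), and the two fields it fills are the ones fitccl reads back:

import numpy as np

scan = {
    "om": list(np.linspace(11.0, 13.0, 11)),                  # made-up omega grid
    "Counts": [10, 12, 11, 40, 90, 150, 95, 42, 13, 11, 10],  # made-up counts, one peak
}
ccl_findpeaks(scan, prominence=50, smooth=False, variable="om")
print(scan["peak_indexes"], scan["peak_heights"])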
pyzebra/ccl_io.py
@@ -1,7 +1,6 @@
 import os
 import re
 from collections import defaultdict
-from decimal import Decimal

 import numpy as np
@@ -20,6 +19,7 @@ META_VARS_STR = (
     "proposal_email",
     "detectorDistance",
 )

 META_VARS_FLOAT = (
+    "omega",
     "mf",
@@ -58,30 +58,29 @@ META_VARS_FLOAT = (
 META_UB_MATRIX = ("ub1j", "ub2j", "ub3j")

 CCL_FIRST_LINE = (
-    # the first element is `scan_number`, which we don't save to metadata
+    ("scan_number", int),
     ("h_index", float),
     ("k_index", float),
     ("l_index", float),
 )

-CCL_FIRST_LINE_BI = (
-    *CCL_FIRST_LINE,
-    ("twotheta_angle", float),
-    ("omega_angle", float),
-    ("chi_angle", float),
-    ("phi_angle", float),
-)
-
-CCL_FIRST_LINE_NB = (
-    *CCL_FIRST_LINE,
-    ("gamma_angle", float),
-    ("omega_angle", float),
-    ("nu_angle", float),
-    ("unkwn_angle", float),
-)
+CCL_ANGLES = {
+    "bi": (
+        ("twotheta_angle", float),
+        ("omega_angle", float),
+        ("chi_angle", float),
+        ("phi_angle", float),
+    ),
+    "nb": (
+        ("gamma_angle", float),
+        ("omega_angle", float),
+        ("nu_angle", float),
+        ("unkwn_angle", float),
+    ),
+}

 CCL_SECOND_LINE = (
-    ("number_of_measurements", int),
+    ("n_points", int),
     ("angle_step", float),
     ("monitor", float),
     ("temperature", float),
@@ -132,50 +131,34 @@ def parse_1D(fileobj, data_type):
     # read data
     scan = {}
     if data_type == ".ccl":
-        decimal = list()
-
-        if metadata["zebra_mode"] == "bi":
-            ccl_first_line = CCL_FIRST_LINE_BI
-        elif metadata["zebra_mode"] == "nb":
-            ccl_first_line = CCL_FIRST_LINE_NB
+        ccl_first_line = (*CCL_FIRST_LINE, *CCL_ANGLES[metadata["zebra_mode"]])
         ccl_second_line = CCL_SECOND_LINE

         for line in fileobj:
             d = {}

             # first line
-            scan_number, *params = line.split()
-            for param, (param_name, param_type) in zip(params, ccl_first_line):
+            for param, (param_name, param_type) in zip(line.split(), ccl_first_line):
                 d[param_name] = param_type(param)

-            decimal.append(bool(Decimal(d["h_index"]) % 1 == 0))
-            decimal.append(bool(Decimal(d["k_index"]) % 1 == 0))
-            decimal.append(bool(Decimal(d["l_index"]) % 1 == 0))
-
             # second line
             next_line = next(fileobj)
-            params = next_line.split()
-            for param, (param_name, param_type) in zip(params, ccl_second_line):
+            for param, (param_name, param_type) in zip(next_line.split(), ccl_second_line):
                 d[param_name] = param_type(param)

             d["om"] = np.linspace(
-                d["omega_angle"] - (d["number_of_measurements"] / 2) * d["angle_step"],
-                d["omega_angle"] + (d["number_of_measurements"] / 2) * d["angle_step"],
-                d["number_of_measurements"],
+                d["omega_angle"] - (d["n_points"] / 2) * d["angle_step"],
+                d["omega_angle"] + (d["n_points"] / 2) * d["angle_step"],
+                d["n_points"],
             )

             # subsequent lines with counts
             counts = []
-            while len(counts) < d["number_of_measurements"]:
+            while len(counts) < d["n_points"]:
                 counts.extend(map(int, next(fileobj).split()))
             d["Counts"] = counts

-            scan[int(scan_number)] = d
-
-        if all(decimal):
-            metadata["indices"] = "hkl"
-        else:
-            metadata["indices"] = "real"
+            scan[d["scan_number"]] = d

     elif data_type == ".dat":
         # skip the first 2 rows, the third row contains the column names
@@ -200,9 +183,13 @@ def parse_1D(fileobj, data_type):
             print("seems hkl is not in title")

         data_cols["temperature"] = metadata["temp"]
-        data_cols["mag_field"] = metadata["mf"]
+        try:
+            data_cols["mag_field"] = metadata["mf"]
+        except KeyError:
+            print("Mag_field not present in dat file")
+
         data_cols["omega_angle"] = metadata["omega"]
-        data_cols["number_of_measurements"] = len(data_cols["om"])
+        data_cols["n_points"] = len(data_cols["om"])
         data_cols["monitor"] = data_cols["Monitor1"][0]
         data_cols["twotheta_angle"] = metadata["2-theta"]
         data_cols["chi_angle"] = metadata["chi"]
@@ -215,7 +202,69 @@ def parse_1D(fileobj, data_type):
         print("Unknown file extension")

     # utility information
+    if all(
+        s["h_index"].is_integer() and s["k_index"].is_integer() and s["l_index"].is_integer()
+        for s in scan.values()
+    ):
+        metadata["indices"] = "hkl"
+    else:
+        metadata["indices"] = "real"
+
     metadata["data_type"] = data_type
     metadata["area_method"] = "fit"

     return {"meta": metadata, "scan": scan}
+
+
+def export_comm(data, path, lorentz=False):
+    """exports data in the *.comm format
+    :param lorentz: perform Lorentz correction
+    :param path: path to file + name
+    :arg data - data to export, is dict after peak fitting
+    """
+    zebra_mode = data["meta"]["zebra_mode"]
+    if data["meta"]["indices"] == "hkl":
+        extension = ".comm"
+        padding = [6, 4]
+    elif data["meta"]["indices"] == "real":
+        extension = ".incomm"
+        padding = [4, 6]
+
+    with open(str(path + extension), "w") as out_file:
+        for key, scan in data["scan"].items():
+            if "fit" not in scan:
+                print("Scan skipped - no fit value for:", key)
+                continue
+
+            scan_str = f"{key:>{padding[0]}}"
+            h_str = f'{int(scan["h_index"]):{padding[1]}}'
+            k_str = f'{int(scan["k_index"]):{padding[1]}}'
+            l_str = f'{int(scan["l_index"]):{padding[1]}}'
+
+            if data["meta"]["area_method"] == "fit":
+                area = scan["fit"]["fit_area"].n
+                sigma_str = f'{scan["fit"]["fit_area"].s:>10.2f}'
+            elif data["meta"]["area_method"] == "integ":
+                area = scan["fit"]["int_area"].n
+                sigma_str = f'{scan["fit"]["int_area"].s:>10.2f}'
+
+            # apply lorentz correction to area
+            if lorentz:
+                if zebra_mode == "bi":
+                    twotheta_angle = np.deg2rad(scan["twotheta_angle"])
+                    corr_factor = np.sin(twotheta_angle)
+                elif zebra_mode == "nb":
+                    gamma_angle = np.deg2rad(scan["gamma_angle"])
+                    nu_angle = np.deg2rad(scan["nu_angle"])
+                    corr_factor = np.sin(gamma_angle) * np.cos(nu_angle)
+
+                area = np.abs(area * corr_factor)
+
+            area_str = f"{area:>10.2f}"
+
+            ang_str = ""
+            for angle, _ in CCL_ANGLES[zebra_mode]:
+                ang_str = ang_str + f"{scan[angle]:8}"
+
+            out_file.write(scan_str + h_str + k_str + l_str + area_str + sigma_str + ang_str + "\n")
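Putting this module's two halves together, a plausible round trip (the file name is hypothetical, and each scan must already carry a "fit" entry with a ufloat "fit_area" from the fitting step before export):

with open("example.ccl") as fileobj:             # hypothetical input file
    data = parse_1D(fileobj, data_type=".ccl")

# ... peak finding / fitting fills data["scan"][n]["fit"] ...

export_comm(data, "/tmp/example", lorentz=True)  # writes example.comm (hkl) or example.incomm (real)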
pyzebra/cli.py
@@ -6,6 +6,8 @@ from bokeh.application.application import Application
 from bokeh.application.handlers import ScriptHandler
 from bokeh.server.server import Server

+from pyzebra.app.handler import PyzebraHandler
+
 logging.basicConfig(format="%(asctime)s %(message)s", level=logging.INFO)
 logger = logging.getLogger(__name__)

@@ -35,6 +37,13 @@ def main():
         help="hostname that can connect to the server websocket",
     )

+    parser.add_argument(
+        "--anatric-path",
+        type=str,
+        default="/afs/psi.ch/project/sinq/rhel7/bin/anatric",
+        help="path to anatric executable",
+    )
+
     parser.add_argument(
         "--args",
         nargs=argparse.REMAINDER,
@@ -46,9 +55,10 @@ def main():

     logger.info(app_path)

+    pyzebra_handler = PyzebraHandler(args.anatric_path)
     handler = ScriptHandler(filename=app_path, argv=args.args)
     server = Server(
-        {"/": Application(handler)},
+        {"/": Application(pyzebra_handler, handler)},
         port=args.port,
         allow_websocket_origin=args.allow_websocket_origin,
     )
pyzebra/comm_export.py (deleted file)
@@ -1,80 +0,0 @@
-import numpy as np
-
-
-def correction(value, lorentz=True, zebra_mode="--", ang1=0, ang2=0):
-    if lorentz is False:
-        return value
-    else:
-        if zebra_mode == "bi":
-            corr_value = np.abs(value * np.sin(ang1))
-            return corr_value
-        elif zebra_mode == "nb":
-            corr_value = np.abs(value * np.sin(ang1) * np.cos(ang2))
-            return corr_value
-
-
-def export_comm(data, path, lorentz=False):
-    """exports data in the *.comm format
-    :param lorentz: perform Lorentz correction
-    :param path: path to file + name
-    :arg data - data to export, is dict after peak fitting
-    """
-    zebra_mode = data["meta"]["zebra_mode"]
-    align = ">"
-    if data["meta"]["indices"] == "hkl":
-        extension = ".comm"
-        padding = [6, 4, 10, 8]
-    elif data["meta"]["indices"] == "real":
-        extension = ".incomm"
-        padding = [4, 6, 10, 8]
-
-    with open(str(path + extension), "w") as out_file:
-        for key, scan in data["scan"].items():
-            if "fit" not in scan:
-                print("Scan skipped - no fit value for:", key)
-                continue
-            scan_number_str = f"{key:{align}{padding[0]}}"
-            h_str = f'{int(scan["h_index"]):{padding[1]}}'
-            k_str = f'{int(scan["k_index"]):{padding[1]}}'
-            l_str = f'{int(scan["l_index"]):{padding[1]}}'
-            if data["meta"]["area_method"] == "fit":
-                area = float(scan["fit"]["fit_area"].n)
-                sigma_str = (
-                    f'{"{:8.2f}".format(float(scan["fit"]["fit_area"].s)):{align}{padding[2]}}'
-                )
-            elif data["meta"]["area_method"] == "integ":
-                area = float(scan["fit"]["int_area"].n)
-                sigma_str = (
-                    f'{"{:8.2f}".format(float(scan["fit"]["int_area"].s)):{align}{padding[2]}}'
-                )
-
-            if zebra_mode == "bi":
-                area = correction(area, lorentz, zebra_mode, scan["twotheta_angle"])
-                int_str = f'{"{:8.2f}".format(area):{align}{padding[2]}}'
-                angle_str1 = f'{scan["twotheta_angle"]:{padding[3]}}'
-                angle_str2 = f'{scan["omega_angle"]:{padding[3]}}'
-                angle_str3 = f'{scan["chi_angle"]:{padding[3]}}'
-                angle_str4 = f'{scan["phi_angle"]:{padding[3]}}'
-            elif zebra_mode == "nb":
-                area = correction(area, lorentz, zebra_mode, scan["gamma_angle"], scan["nu_angle"])
-                int_str = f'{"{:8.2f}".format(area):{align}{padding[2]}}'
-                angle_str1 = f'{scan["gamma_angle"]:{padding[3]}}'
-                angle_str2 = f'{scan["omega_angle"]:{padding[3]}}'
-                angle_str3 = f'{scan["nu_angle"]:{padding[3]}}'
-                angle_str4 = f'{scan["unkwn_angle"]:{padding[3]}}'
-
-            line = (
-                scan_number_str
-                + h_str
-                + l_str
-                + k_str
-                + int_str
-                + sigma_str
-                + angle_str1
-                + angle_str2
-                + angle_str3
-                + angle_str4
-                + "\n"
-            )
-            out_file.write(line)
pyzebra/fit2.py
@@ -59,15 +59,17 @@ def fitccl(
     constraints_min = [23, None, 50, 0, 0]
     constraints_min = [80, None, 1000, 0, 100]
     """
+    if "peak_indexes" not in scan:
+        scan["peak_indexes"] = []
+
     if len(scan["peak_indexes"]) > 1:
         # return in case of more than 1 peaks
-        print("More than 1 peak, scan skipped")
         return

     if binning is None or binning == 0 or binning == 1:
         x = list(scan["om"])
         y = list(scan["Counts"])
         y_err = list(np.sqrt(y)) if scan.get("sigma", None) is None else list(scan["sigma"])
-        print(scan["peak_indexes"])
         if not scan["peak_indexes"]:
             centre = np.mean(x)
         else:
@@ -87,7 +89,6 @@ def fitccl(

     if len(scan["peak_indexes"]) == 0:
         # Case for no peak, gaussian in centre, sigma as 20% of range
-        print("No peak")
         peak_index = find_nearest(x, np.mean(x))
         guess[0] = centre if guess[0] is None else guess[0]
         guess[1] = (x[-1] - x[0]) / 5 if guess[1] is None else guess[1]
@@ -98,7 +99,6 @@ def fitccl(

     elif len(scan["peak_indexes"]) == 1:
         # case for one peak, takes into account users guesses
-        print("one peak")
         peak_height = scan["peak_heights"]
         guess[0] = centre if guess[0] is None else guess[0]
         guess[1] = 0.1 if guess[1] is None else guess[1]
@@ -128,11 +128,11 @@ def fitccl(
         ("intercept", guess[4], bool(vary[4]), constraints_min[4], constraints_max[4], None, None),
     )
     # the weighted fit
+    weights = [np.abs(1 / val) if val != 0 else 1 for val in y_err]
     try:
-        result = mod.fit(
-            y, params, weights=[np.abs(1 / val) for val in y_err], x=x, calc_covar=True,
-        )
+        result = mod.fit(y, params, weights=weights, x=x, calc_covar=True)
     except ValueError:
+        print(f"Couldn't fit scan {scan['scan_number']}")
         return

     if result.params["g_amp"].stderr is None:
@@ -213,9 +213,9 @@ def fitccl(
     d = {}
     for pars in result.params:
         d[str(pars)] = (result.params[str(pars)].value, result.params[str(pars)].vary)
-    print(result.fit_report())

-    print((result.params["g_amp"].value - int_area.n) / result.params["g_amp"].value)
+    print("Scan", scan["scan_number"])
+    print(result.fit_report())

     d["ratio"] = (result.params["g_amp"].value - int_area.n) / result.params["g_amp"].value
     d["int_area"] = int_area
@@ -224,4 +224,5 @@ def fitccl(
     d["result"] = result
     d["comps"] = comps
     d["numfit"] = [numfit_min, numfit_max]
+    d["x_fit"] = x
     scan["fit"] = d
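The new weights line guards the weighted fit against zero-sigma points, for which 1 / val would raise ZeroDivisionError; a small worked example of the guard:

import numpy as np

y_err = [3.0, 0.0, 1.5]
weights = [np.abs(1 / val) if val != 0 else 1 for val in y_err]
# -> [0.333..., 1, 0.666...]; the zero-sigma point gets unit weight instead of crashing the fit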
pyzebra/fitvol3.py (new file, 167 lines)
@@ -0,0 +1,167 @@
+import numpy as np
+from lmfit import Model, Parameters
+from scipy.integrate import simps
+import matplotlib.pyplot as plt
+import uncertainties as u
+from lmfit.models import GaussianModel
+from lmfit.models import VoigtModel
+from lmfit.models import PseudoVoigtModel
+
+
+def bin_data(array, binsize):
+    if isinstance(binsize, int) and 0 < binsize < len(array):
+        return [
+            np.mean(array[binsize * i : binsize * i + binsize])
+            for i in range(int(np.ceil(len(array) / binsize)))
+        ]
+    else:
+        print("Binsize needs to be a positive integer smaller than the length of the array")
+        return array
+
+
+def create_uncertanities(y, y_err):
+    # create array with uncertainties for error propagation
+    combined = np.array([])
+    for i in range(len(y)):
+        part = u.ufloat(y[i], y_err[i])
+        combined = np.append(combined, part)
+    return combined
+
+
+def find_nearest(array, value):
+    # find nearest value and return index
+    array = np.asarray(array)
+    idx = (np.abs(array - value)).argmin()
+    return idx
+
+
+# predefined peak positions
+# peaks = [6.2, 8.1, 9.9, 11.5]
+peaks = [23.5, 24.5]
+# peaks = [24]
+def fitccl(scan, variable="om", peak_type="gauss", binning=None):
+
+    x = list(scan[variable])
+    y = list(scan["Counts"])
+    peak_centre = np.mean(x)
+    if binning is None or binning == 0 or binning == 1:
+        x = list(scan["om"])
+        y = list(scan["Counts"])
+        y_err = list(np.sqrt(y)) if scan.get("sigma", None) is None else list(scan["sigma"])
+        print(scan["peak_indexes"])
+        if not scan["peak_indexes"]:
+            peak_centre = np.mean(x)
+        else:
+            peak_centre = x[int(scan["peak_indexes"])]
+    else:
+        x = list(scan["om"])
+        if not scan["peak_indexes"]:
+            peak_centre = np.mean(x)
+        else:
+            peak_centre = x[int(scan["peak_indexes"])]
+        x = bin_data(x, binning)
+        y = list(scan["Counts"])
+        y_err = list(np.sqrt(y)) if scan.get("sigma", None) is None else list(scan["sigma"])
+        combined = bin_data(create_uncertanities(y, y_err), binning)
+        y = [combined[i].n for i in range(len(combined))]
+        y_err = [combined[i].s for i in range(len(combined))]
+
+    def background(x, slope, intercept):
+        """background"""
+        return slope * (x - peak_centre) + intercept
+
+    def gaussian(x, center, g_sigma, amplitude):
+        """1-d gaussian: gaussian(x, amp, cen, wid)"""
+        return (amplitude / (np.sqrt(2.0 * np.pi) * g_sigma)) * np.exp(
+            -((x - center) ** 2) / (2 * g_sigma ** 2)
+        )
+
+    def lorentzian(x, center, l_sigma, amplitude):
+        """1d lorentzian"""
+        return (amplitude / (1 + ((1 * x - center) / l_sigma) ** 2)) / (np.pi * l_sigma)
+
+    def pseudoVoigt1(x, center, g_sigma, amplitude, l_sigma, fraction):
+        """PseudoVoigt peak with different widths of the Lorentzian and Gaussian parts"""
+        return (1 - fraction) * gaussian(x, center, g_sigma, amplitude) + fraction * (
+            lorentzian(x, center, l_sigma, amplitude)
+        )
+
+    mod = Model(background)
+    params = Parameters()
+    params.add_many(
+        ("slope", 0, True, None, None, None, None), ("intercept", 0, False, None, None, None, None)
+    )
+    for i in range(len(peaks)):
+        if peak_type == "gauss":
+            mod = mod + GaussianModel(prefix="p%d_" % (i + 1))
+            params.add(str("p%d_" % (i + 1) + "amplitude"), 20, True, 0, None, None)
+            params.add(str("p%d_" % (i + 1) + "center"), peaks[i], True, None, None, None)
+            params.add(str("p%d_" % (i + 1) + "sigma"), 0.2, True, 0, 5, None)
+        elif peak_type == "voigt":
+            mod = mod + VoigtModel(prefix="p%d_" % (i + 1))
+            params.add(str("p%d_" % (i + 1) + "amplitude"), 20, True, 0, None, None)
+            params.add(str("p%d_" % (i + 1) + "center"), peaks[i], True, None, None, None)
+            params.add(str("p%d_" % (i + 1) + "sigma"), 0.2, True, 0, 3, None)
+            params.add(str("p%d_" % (i + 1) + "gamma"), 0.2, True, 0, 5, None)
+        elif peak_type == "pseudovoigt":
+            mod = mod + PseudoVoigtModel(prefix="p%d_" % (i + 1))
+            params.add(str("p%d_" % (i + 1) + "amplitude"), 20, True, 0, None, None)
+            params.add(str("p%d_" % (i + 1) + "center"), peaks[i], True, None, None, None)
+            params.add(str("p%d_" % (i + 1) + "sigma"), 0.2, True, 0, 5, None)
+            params.add(str("p%d_" % (i + 1) + "fraction"), 0.5, True, -5, 5, None)
+        elif peak_type == "pseudovoigt1":
+            mod = mod + Model(pseudoVoigt1, prefix="p%d_" % (i + 1))
+            params.add(str("p%d_" % (i + 1) + "amplitude"), 20, True, 0, None, None)
+            params.add(str("p%d_" % (i + 1) + "center"), peaks[i], True, None, None, None)
+            params.add(str("p%d_" % (i + 1) + "g_sigma"), 0.2, True, 0, 5, None)
+            params.add(str("p%d_" % (i + 1) + "l_sigma"), 0.2, True, 0, 5, None)
+            params.add(str("p%d_" % (i + 1) + "fraction"), 0.5, True, 0, 1, None)
+    # add parameters
+
+    result = mod.fit(
+        y, params, weights=[np.abs(1 / y_err[i]) for i in range(len(y_err))], x=x, calc_covar=True
+    )
+
+    comps = result.eval_components()
+
+    reportstring = list()
+    for keys in result.params:
+        if result.params[keys].value is not None:
+            str2 = np.around(result.params[keys].value, 3)
+        else:
+            str2 = 0
+        if result.params[keys].stderr is not None:
+            str3 = np.around(result.params[keys].stderr, 3)
+        else:
+            str3 = 0
+        reportstring.append("%s = %2.3f +/- %2.3f" % (keys, str2, str3))
+
+    reportstring = "\n".join(reportstring)
+
+    plt.figure(figsize=(20, 10))
+    plt.plot(x, result.best_fit, "k-", label="Best fit")
+
+    plt.plot(x, y, "b-", label="Original data")
+    plt.plot(x, comps["background"], "g--", label="Line component")
+    for i in range(len(peaks)):
+        plt.plot(
+            x,
+            comps[str("p%d_" % (i + 1))],
+            "r--",
+        )
+        plt.fill_between(x, comps[str("p%d_" % (i + 1))], alpha=0.4, label=str("p%d_" % (i + 1)))
+    plt.legend()
+    plt.text(
+        np.min(x),
+        np.max(y),
+        reportstring,
+        fontsize=9,
+        verticalalignment="top",
+    )
+    plt.title(str(peak_type))
+
+    plt.xlabel("Omega [deg]")
+    plt.ylabel("Counts [a.u.]")
+    plt.show()
+
+    print(result.fit_report())
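A usage sketch for the new module (all numbers made up; fitccl reads the module-level peaks list, so the synthetic peaks below sit at 23.5 and 24.5 to match it):

import numpy as np

omega = np.linspace(22.5, 25.5, 61)
counts = 5 + 100 * np.exp(-((omega - 23.5) ** 2) / 0.02) + 80 * np.exp(-((omega - 24.5) ** 2) / 0.02)
scan = {"om": list(omega), "Counts": list(np.rint(counts)), "peak_indexes": []}

fitccl(scan, variable="om", peak_type="pseudovoigt1")  # fits both peaks, plots, prints the report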
pyzebra/h5.py
@@ -60,7 +60,12 @@ def read_detector_data(filepath):
     det_data["chi_angle"] = h5f["/entry1/sample/chi"][:]  # ch
     det_data["phi_angle"] = h5f["/entry1/sample/phi"][:]  # ph
     det_data["UB"] = h5f["/entry1/sample/UB"][:].reshape(3, 3)
-    det_data["magnetic_field"] = h5f["/entry1/sample/magnetic_field"][:]
-    det_data["temperature"] = h5f["/entry1/sample/temperature"][:]
+
+    # optional parameters
+    if "/entry1/sample/magnetic_field" in h5f:
+        det_data["magnetic_field"] = h5f["/entry1/sample/magnetic_field"][:]
+
+    if "/entry1/sample/temperature" in h5f:
+        det_data["temperature"] = h5f["/entry1/sample/temperature"][:]

     return det_data
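The containment check is the usual h5py idiom for optional datasets and works anywhere a file handle is open, for example:

import h5py

with h5py.File("scan.hdf", "r") as h5f:            # hypothetical file
    if "/entry1/sample/magnetic_field" in h5f:     # True only if the dataset exists
        mag_field = h5f["/entry1/sample/magnetic_field"][:]
    else:
        mag_field = None                           # older files simply lack the entry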
pyzebra/param_study_moduls.py
@@ -1,12 +1,24 @@
-from load_1D import load_1D
-from ccl_dict_operation import add_dict
-import pandas as pd
-from mpl_toolkits.mplot3d import Axes3D  # don't delete, otherwise waterfall won't work
-import matplotlib.pyplot as plt
-import matplotlib as mpl
-import numpy as np
 import pickle

+import matplotlib as mpl
+import matplotlib.pyplot as plt
+import numpy as np
+import pandas as pd
 import scipy.io as sio
+import uncertainties as u
+from mpl_toolkits.mplot3d import Axes3D  # don't delete, otherwise waterfall won't work
+import collections
+
+from .ccl_io import load_1D
+
+
+def create_tuples(x, y, y_err):
+    """creates tuples for sorting and merging of the data
+    Counts need to be normalized to monitor before"""
+    t = list()
+    for i in range(len(x)):
+        tup = (x[i], y[i], y_err[i])
+        t.append(tup)
+    return t


 def load_dats(filepath):
@@ -37,45 +49,45 @@ def load_dats(filepath):
         if data_type == "txt":
             dict1 = add_dict(dict1, load_1D(file_list[i][0]))
         else:

            dict1 = add_dict(dict1, load_1D(file_list[i]))
         dict1["scan"][i + 1]["params"] = {}
         if data_type == "txt":
             for x in range(len(col_names) - 1):
-                dict1["scan"][i + 1]["params"][col_names[x + 1]] = file_list[i][x + 1]
+                dict1["scan"][i + 1]["params"][col_names[x + 1]] = float(file_list[i][x + 1])

     return dict1


-def create_dataframe(dict1):
+def create_dataframe(dict1, variables):
     """Creates pandas dataframe from the dictionary
     :arg ccl like dictionary
     :return pandas dataframe"""
     # create dictionary to which we pull only wanted items before transforming it to pd.dataframe
     pull_dict = {}
     pull_dict["filenames"] = list()
-    for key in dict1["scan"][1]["params"]:
-        pull_dict[key] = list()
-    pull_dict["temperature"] = list()
-    pull_dict["mag_field"] = list()
+    for keys in variables:
+        for item in variables[keys]:
+            pull_dict[item] = list()
     pull_dict["fit_area"] = list()
     pull_dict["int_area"] = list()
-    pull_dict["om"] = list()
     pull_dict["Counts"] = list()

+    for keys in pull_dict:
+        print(keys)
+
     # populate the dict
     for keys in dict1["scan"]:
         if "file_of_origin" in dict1["scan"][keys]:
             pull_dict["filenames"].append(dict1["scan"][keys]["file_of_origin"].split("/")[-1])
         else:
             pull_dict["filenames"].append(dict1["meta"]["original_filename"].split("/")[-1])
-        for key in dict1["scan"][keys]["params"]:
-            pull_dict[str(key)].append(float(dict1["scan"][keys]["params"][key]))
-        pull_dict["temperature"].append(dict1["scan"][keys]["temperature"])
-        pull_dict["mag_field"].append(dict1["scan"][keys]["mag_field"])
         pull_dict["fit_area"].append(dict1["scan"][keys]["fit"]["fit_area"])
         pull_dict["int_area"].append(dict1["scan"][keys]["fit"]["int_area"])
-        pull_dict["om"].append(dict1["scan"][keys]["om"])
         pull_dict["Counts"].append(dict1["scan"][keys]["Counts"])
+        for key in variables:
+            for i in variables[key]:
+                pull_dict[i].append(_finditem(dict1["scan"][keys], i))

     return pd.DataFrame(data=pull_dict)
@@ -144,7 +156,7 @@ def make_graph(data, sorting_parameter, style):


 def save_dict(obj, name):
-    """ saves dictionary as pickle file in binary format
+    """saves dictionary as pickle file in binary format
     :arg obj - object to save
     :arg name - name of the file
     NOTE: path should be added later"""
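With the new signature the dataframe columns come from a mapping of variable groups to key lists; a sketch wiring it to the variables() helper added further below (any dict of the same {group: [keys]} shape would do, and the scans must already carry fit results):

var_map = variables(dict1)        # e.g. {"primary": ["om"], "secondary": ["temperature"]}
df = create_dataframe(dict1, var_map)
print(df.columns)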
@@ -200,3 +212,277 @@ def save_table(data, filetype, name, path=None):
         hdf.close()
     if filetype == "json":
         data.to_json((path + name + ".json"))
+
+
+def normalize(scan, monitor):
+    """Normalizes the measurement to monitor, checks if sigma exists, otherwise creates it
+    :arg scan : scan to normalize
+    :arg monitor : final monitor
+    :return counts - normalized counts
+    :return sigma - normalized sigma"""
+
+    counts = np.array(scan["Counts"])
+    sigma = np.sqrt(counts) if "sigma" not in scan else scan["sigma"]
+    monitor_ratio = monitor / scan["monitor"]
+    scaled_counts = counts * monitor_ratio
+    scaled_sigma = np.array(sigma) * monitor_ratio
+
+    return scaled_counts, scaled_sigma
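A quick worked example of the monitor scaling in normalize() (values made up):

scan = {"Counts": [100, 200, 400], "monitor": 50000}  # hypothetical scan
counts, sigma = normalize(scan, monitor=100000)       # monitor_ratio = 2
# counts -> [200. 400. 800.]; sigma -> sqrt([100, 200, 400]) * 2, since no "sigma" was stored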
+
+
+def merge(scan1, scan2, keep=True, monitor=100000):
+    """merges the two scans and sorts them; if an om value appears in both, the Counts are
+    averaged and the averaging is propagated into sigma
+    :arg scan1 : scan into which the data will be merged
+    :arg scan2 : scan from which the data will be merged
+    :arg keep : if true and both monitors agree, the common monitor is kept; if false, the
+    monitor argument is used unconditionally
+    :arg monitor : final monitor after merging
+    note: scan1 and scan2 can come from the same dict
+    :return scan1 with the merged data"""
+
+    if keep:
+        if scan1["monitor"] == scan2["monitor"]:
+            monitor = scan1["monitor"]
+
+    # load om and Counts
+    x1, x2 = scan1["om"], scan2["om"]
+    cor_y1, y_err1 = normalize(scan1, monitor=monitor)
+    cor_y2, y_err2 = normalize(scan2, monitor=monitor)
+    # creates tuples (om, Counts, sigma) for sorting and further processing
+    tuple_list = create_tuples(x1, cor_y1, y_err1) + create_tuples(x2, cor_y2, y_err2)
+    # Sort the list on om and add 0 0 0 tuple to the last position
+    sorted_t = sorted(tuple_list, key=lambda tup: tup[0])
+    sorted_t.append((0, 0, 0))
+    om, Counts, sigma = [], [], []
+    seen = list()
+    for i in range(len(sorted_t) - 1):
+        if sorted_t[i][0] not in seen:
+            if sorted_t[i][0] != sorted_t[i + 1][0]:
+                om = np.append(om, sorted_t[i][0])
+                Counts = np.append(Counts, sorted_t[i][1])
+                sigma = np.append(sigma, sorted_t[i][2])
+            else:
+                om = np.append(om, sorted_t[i][0])
+                counts1, counts2 = sorted_t[i][1], sorted_t[i + 1][1]
+                sigma1, sigma2 = sorted_t[i][2], sorted_t[i + 1][2]
+                count_err1 = u.ufloat(counts1, sigma1)
+                count_err2 = u.ufloat(counts2, sigma2)
+                avg = (count_err1 + count_err2) / 2
+                Counts = np.append(Counts, avg.n)
+                sigma = np.append(sigma, avg.s)
+                seen.append(sorted_t[i][0])
+        else:
+            continue
+    scan1["om"] = om
+    scan1["Counts"] = Counts
+    scan1["sigma"] = sigma
+    scan1["monitor"] = monitor
+    print("merging done")
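merge() works in place on its first argument; a sketch with one overlapping point (values made up):

scan1 = {"om": [12.0, 12.1], "Counts": [100, 110], "monitor": 100000}
scan2 = {"om": [12.1, 12.2], "Counts": [120, 105], "monitor": 100000}
merge(scan1, scan2, keep=True)
# scan1["om"] -> [12.0, 12.1, 12.2]; the shared 12.1 point becomes the plain average of the
# two normalized Counts values, with sigma propagated through uncertainties.ufloat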
+
+
+def add_dict(dict1, dict2):
+    """adds two dictionaries, meta of the added file is saved as meta+original_filename and
+    measurements are shifted to continue with the numbering of the first dict
+    :arg dict1 : dictionary to add to
+    :arg dict2 : dictionary from which to take the measurements
+    :return dict1 : combined dictionary
+    Note: dict1 must be made from ccl, otherwise we would have to change the structure of loaded
+    dat file"""
+    try:
+        if dict1["meta"]["zebra_mode"] != dict2["meta"]["zebra_mode"]:
+            print("You are trying to add scans measured with different zebra modes")
+            return
+    # this is for the qscan case
+    except KeyError:
+        print("Zebra mode not specified")
+    max_measurement_dict1 = max([keys for keys in dict1["scan"]])
+    new_filenames = np.arange(
+        max_measurement_dict1 + 1, max_measurement_dict1 + 1 + len(dict2["scan"])
+    )
+    new_meta_name = "meta" + str(dict2["meta"]["original_filename"])
+    if new_meta_name not in dict1:
+        for keys, name in zip(dict2["scan"], new_filenames):
+            dict2["scan"][keys]["file_of_origin"] = str(dict2["meta"]["original_filename"])
+            dict1["scan"][name] = dict2["scan"][keys]
+
+        dict1[new_meta_name] = dict2["meta"]
+    else:
+        raise KeyError(
+            str(
+                "The file %s has already been added to %s"
+                % (dict2["meta"]["original_filename"], dict1["meta"]["original_filename"])
+            )
+        )
+    return dict1
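add_dict() renumbers the incoming scans so they continue after the highest scan number already present; a sketch (file names hypothetical):

dict1 = load_1D("file1.ccl")
dict2 = load_1D("file2.ccl")
combined = add_dict(dict1, dict2)
# scans from file2 are appended under new numbers, each tagged with "file_of_origin",
# and file2's metadata is stored under the key "meta" + its original_filename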
+
+
+def auto(dict):
+    """takes just unique tuples from all tuples in the dictionary returned by scan_dict
+    intended for automatic merge if you don't want to specify which scans to merge together
+    args: dict - dictionary from scan_dict function
+    :return dict - dict without repetitions"""
+    for keys in dict:
+        tuple_list = dict[keys]
+        new = list()
+        for i in range(len(tuple_list)):
+            if tuple_list[0][0] == tuple_list[i][0]:
+                new.append(tuple_list[i])
+        dict[keys] = new
+    return dict
+
+
+def scan_dict(dict, precision=0.5):
+    """scans the dictionary for scans with duplicate angles
+    :arg dict : dictionary to scan
+    :arg precision : in deg, sometimes angles are zero so its easier this way, instead of
+    checking zero division
+    :return dictionary with matching scans; if there are none, the dict is empty
+    note: can be checked by "not d", true if empty
+    """
+
+    if dict["meta"]["zebra_mode"] == "bi":
+        angles = ["twotheta_angle", "omega_angle", "chi_angle", "phi_angle"]
+    elif dict["meta"]["zebra_mode"] == "nb":
+        angles = ["gamma_angle", "omega_angle", "nu_angle"]
+    else:
+        print("Unknown zebra mode")
+        return
+
+    d = {}
+    for i in dict["scan"]:
+        for j in dict["scan"]:
+            if dict["scan"][i] != dict["scan"][j]:
+                itup = list()
+                for k in angles:
+                    itup.append(abs(abs(dict["scan"][i][k]) - abs(dict["scan"][j][k])))
+
+                if all(i <= precision for i in itup):
+                    print(itup)
+                    print([dict["scan"][i][k] for k in angles])
+                    print([dict["scan"][j][k] for k in angles])
+                    if str([np.around(dict["scan"][i][k], 0) for k in angles]) not in d:
+                        d[str([np.around(dict["scan"][i][k], 0) for k in angles])] = list()
+                        d[str([np.around(dict["scan"][i][k], 0) for k in angles])].append((i, j))
+                    else:
+                        d[str([np.around(dict["scan"][i][k], 0) for k in angles])].append((i, j))
+
+                else:
+                    pass
+
+            else:
+                continue
+
+    return d
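Pieced together, the duplicate-handling pipeline built from the helpers above (a sketch; dict1 is a loaded ccl dictionary):

d = scan_dict(dict1, precision=0.5)  # group scans whose angles agree within 0.5 deg
d = auto(d)                          # keep only the unique pairs per angle group
for pairs in d.values():
    for i, j in pairs:
        merge(dict1["scan"][i], dict1["scan"][j])  # average duplicates in place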
+
+
+def _finditem(obj, key):
+    if key in obj:
+        return obj[key]
+    for k, v in obj.items():
+        if isinstance(v, dict):
+            item = _finditem(v, key)
+            if item is not None:
+                return item
+
+
+def most_common(lst):
+    return max(set(lst), key=lst.count)
+
+
+def variables(dictionary):
+    """Function to guess what variables will be used in the param study
+    I call the primary variable the array-like one, usually omega,
+    and the secondary the slicing variable, different for each scan, for example temperature"""
+    # find all variables that are in all scans
+    stdev_precision = 0.05
+    all_vars = list()
+    for keys in dictionary["scan"]:
+        all_vars.append([key for key in dictionary["scan"][keys] if key != "params"])
+        if dictionary["scan"][keys]["params"]:
+            all_vars.append(key for key in dictionary["scan"][keys]["params"])
+
+    all_vars = [i for sublist in all_vars for i in sublist]
+    # get the ones that are in all scans
+    b = collections.Counter(all_vars)
+    inall = [key for key in b if b[key] == len(dictionary["scan"])]
+    # delete those that are obviously wrong
+    wrong = [
+        "NP",
+        "Counts",
+        "Monitor1",
+        "Monitor2",
+        "Monitor3",
+        "h_index",
+        "l_index",
+        "k_index",
+        "n_points",
+        "monitor",
+        "Time",
+        "omega_angle",
+        "twotheta_angle",
+        "chi_angle",
+        "phi_angle",
+        "nu_angle",
+    ]
+    inall_red = [i for i in inall if i not in wrong]
+
+    # check for primary variable, needs to be a list; we don't suspect the
+    # primary variable to be a parameter (be in scan["params"])
+    primary_candidates = list()
+    for key in dictionary["scan"]:
+        for i in inall_red:
+            if isinstance(_finditem(dictionary["scan"][key], i), list):
+                if np.std(_finditem(dictionary["scan"][key], i)) > stdev_precision:
+                    primary_candidates.append(i)
+    # check which of the primary are in every scan
+    primary_candidates = collections.Counter(primary_candidates)
+    second_round_primary_candidates = [
+        key for key in primary_candidates if primary_candidates[key] == len(dictionary["scan"])
+    ]
+
+    if len(second_round_primary_candidates) == 1:
+        print("We've got a primary winner!", second_round_primary_candidates)
+    else:
+        print("Still not sure with primary :(", second_round_primary_candidates)
+
+    # check for secondary variable, we suspect a float/int or a non-changing array
+    # we don't need to check for primary ones
+    secondary_candidates = [i for i in inall_red if i not in second_round_primary_candidates]
+    # print("secondary candidates", secondary_candidates)
+    # select arrays and floats and ints
+    second_round_secondary_candidates = list()
+    for key in dictionary["scan"]:
+        for i in secondary_candidates:
+            if isinstance(_finditem(dictionary["scan"][key], i), float):
+                second_round_secondary_candidates.append(i)
+            elif isinstance(_finditem(dictionary["scan"][key], i), int):
+                second_round_secondary_candidates.append(i)
+            elif isinstance(_finditem(dictionary["scan"][key], i), list):
+                if np.std(_finditem(dictionary["scan"][key], i)) < stdev_precision:
+                    second_round_secondary_candidates.append(i)
+
+    second_round_secondary_candidates = collections.Counter(second_round_secondary_candidates)
+    second_round_secondary_candidates = [
+        key
+        for key in second_round_secondary_candidates
+        if second_round_secondary_candidates[key] == len(dictionary["scan"])
+    ]
+    # print("secondary candidates after second round", second_round_secondary_candidates)
+    # now we check if they vary between the scans
+    third_round_sec_candidates = list()
+    for i in second_round_secondary_candidates:
+        check_array = list()
+        for keys in dictionary["scan"]:
+            check_array.append(np.average(_finditem(dictionary["scan"][keys], i)))
+        # print(i, check_array, np.std(check_array))
+        if np.std(check_array) > stdev_precision:
+            third_round_sec_candidates.append(i)
+    if len(third_round_sec_candidates) == 1:
+        print("We've got a secondary winner!", third_round_sec_candidates)
+    else:
+        print("Still not sure with secondary :(", third_round_sec_candidates)
+
+    return {"primary": second_round_primary_candidates, "secondary": third_round_sec_candidates}