Compare commits
38 Commits
Author | SHA1 | Date | |
---|---|---|---|
b2fc2d604a | |||
78096efcef | |||
5b0f97959e | |||
8fb1c5f247 | |||
58641ab94f | |||
a6bcb8ffa1 | |||
7b6e6bf396 | |||
45f295fcf8 | |||
3c58fd2102 | |||
abbaded278 | |||
fbe992c901 | |||
dec282d1b7 | |||
4429823629 | |||
80fddb514a | |||
cfe9832c1e | |||
fd942672df | |||
60cb733ca7 | |||
7c2ecef56d | |||
468f33e606 | |||
dbc643aba9 | |||
0856705024 | |||
ce608f1b49 | |||
3eaf54eda3 | |||
a496267a9d | |||
1a3ebfbcbd | |||
7bcb23c1bd | |||
b28fe39bbb | |||
42c6e6b921 | |||
dba2dc6149 | |||
a0c9b0162b | |||
00b0c2d708 | |||
4ae8890bb8 | |||
430ffc2caa | |||
aee5c82925 | |||
7e16ea0fea | |||
6ff1b2b54f | |||
6099df650b | |||
0347566aeb |
@ -1,10 +1,9 @@
|
||||
import pyzebra.ccl_dict_operation
|
||||
from pyzebra.anatric import *
|
||||
from pyzebra.ccl_findpeaks import ccl_findpeaks
|
||||
from pyzebra.comm_export import export_comm
|
||||
from pyzebra.fit2 import fitccl
|
||||
from pyzebra.h5 import *
|
||||
from pyzebra.load_1D import load_1D, parse_1D
|
||||
from pyzebra.ccl_io import load_1D, parse_1D, export_comm
|
||||
from pyzebra.param_study_moduls import add_dict, auto, merge, scan_dict
|
||||
from pyzebra.xtal import *
|
||||
|
||||
__version__ = "0.1.0"
|
||||
__version__ = "0.1.2"
|
||||
|
@ -2,7 +2,6 @@ import subprocess
|
||||
import xml.etree.ElementTree as ET
|
||||
|
||||
|
||||
ANATRIC_PATH = "/afs/psi.ch/project/sinq/rhel7/bin/anatric"
|
||||
DATA_FACTORY_IMPLEMENTATION = [
|
||||
"trics",
|
||||
"morph",
|
||||
@ -24,8 +23,8 @@ REFLECTION_PRINTER_FORMATS = [
|
||||
ALGORITHMS = ["adaptivemaxcog", "adaptivedynamic"]
|
||||
|
||||
|
||||
def anatric(config_file):
|
||||
subprocess.run([ANATRIC_PATH, config_file], check=True)
|
||||
def anatric(config_file, anatric_path="/afs/psi.ch/project/sinq/rhel7/bin/anatric"):
|
||||
subprocess.run([anatric_path, config_file], check=True)
|
||||
|
||||
|
||||
class AnatricConfig:
|
||||
|
@ -1,4 +1,3 @@
|
||||
import argparse
|
||||
import logging
|
||||
import sys
|
||||
from io import StringIO
|
||||
@ -11,14 +10,8 @@ import panel_ccl_integrate
|
||||
import panel_hdf_anatric
|
||||
import panel_hdf_viewer
|
||||
|
||||
parser = argparse.ArgumentParser(
|
||||
prog="pyzebra", formatter_class=argparse.ArgumentDefaultsHelpFormatter
|
||||
)
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
doc = curdoc()
|
||||
doc.title = "pyzebra"
|
||||
|
||||
sys.stdout = StringIO()
|
||||
stdout_textareainput = TextAreaInput(title="print output:", height=150)
|
||||
@ -26,7 +19,7 @@ stdout_textareainput = TextAreaInput(title="print output:", height=150)
|
||||
bokeh_stream = StringIO()
|
||||
bokeh_handler = logging.StreamHandler(bokeh_stream)
|
||||
bokeh_handler.setFormatter(logging.Formatter(logging.BASIC_FORMAT))
|
||||
bokeh_logger = logging.getLogger('bokeh')
|
||||
bokeh_logger = logging.getLogger("bokeh")
|
||||
bokeh_logger.addHandler(bokeh_handler)
|
||||
bokeh_log_textareainput = TextAreaInput(title="server output:", height=150)
|
||||
|
||||
|
30
pyzebra/app/handler.py
Normal file
30
pyzebra/app/handler.py
Normal file
@ -0,0 +1,30 @@
|
||||
from bokeh.application.handlers import Handler
|
||||
|
||||
|
||||
class PyzebraHandler(Handler):
|
||||
"""Provides a mechanism for generic bokeh applications to build up new streamvis documents.
|
||||
"""
|
||||
|
||||
def __init__(self, anatric_path):
|
||||
"""Initialize a pyzebra handler for bokeh applications.
|
||||
|
||||
Args:
|
||||
args (Namespace): Command line parsed arguments.
|
||||
"""
|
||||
super().__init__() # no-op
|
||||
|
||||
self.anatric_path = anatric_path
|
||||
|
||||
def modify_document(self, doc):
|
||||
"""Modify an application document with pyzebra specific features.
|
||||
|
||||
Args:
|
||||
doc (Document) : A bokeh Document to update in-place
|
||||
|
||||
Returns:
|
||||
Document
|
||||
"""
|
||||
doc.title = "pyzebra"
|
||||
doc.anatric_path = self.anatric_path
|
||||
|
||||
return doc
|
@ -2,6 +2,7 @@ import base64
|
||||
import io
|
||||
import os
|
||||
import tempfile
|
||||
from copy import deepcopy
|
||||
|
||||
import numpy as np
|
||||
from bokeh.layouts import column, row
|
||||
@ -9,6 +10,7 @@ from bokeh.models import (
|
||||
Asterisk,
|
||||
BasicTicker,
|
||||
Button,
|
||||
CheckboxEditor,
|
||||
ColumnDataSource,
|
||||
CustomJS,
|
||||
DataRange1d,
|
||||
@ -19,8 +21,10 @@ from bokeh.models import (
|
||||
Line,
|
||||
LinearAxis,
|
||||
Panel,
|
||||
PanTool,
|
||||
Plot,
|
||||
RadioButtonGroup,
|
||||
ResetTool,
|
||||
Scatter,
|
||||
Select,
|
||||
Spacer,
|
||||
@ -30,6 +34,7 @@ from bokeh.models import (
|
||||
TextAreaInput,
|
||||
TextInput,
|
||||
Toggle,
|
||||
WheelZoomTool,
|
||||
Whisker,
|
||||
)
|
||||
|
||||
@ -60,7 +65,7 @@ def create():
|
||||
js_data = ColumnDataSource(data=dict(cont=[], ext=[]))
|
||||
|
||||
def proposal_textinput_callback(_attr, _old, new):
|
||||
ccl_path = os.path.join(PROPOSAL_PATH, new)
|
||||
ccl_path = os.path.join(PROPOSAL_PATH, new.strip())
|
||||
ccl_file_list = []
|
||||
for file in os.listdir(ccl_path):
|
||||
if file.endswith(".ccl"):
|
||||
@ -71,23 +76,30 @@ def create():
|
||||
proposal_textinput = TextInput(title="Enter proposal number:", default_size=145)
|
||||
proposal_textinput.on_change("value", proposal_textinput_callback)
|
||||
|
||||
def ccl_file_select_callback(_attr, _old, new):
|
||||
nonlocal det_data
|
||||
with open(new) as file:
|
||||
_, ext = os.path.splitext(new)
|
||||
det_data = pyzebra.parse_1D(file, ext)
|
||||
|
||||
def _init_datatable():
|
||||
scan_list = list(det_data["scan"].keys())
|
||||
hkl = [
|
||||
f'{int(m["h_index"])} {int(m["k_index"])} {int(m["l_index"])}'
|
||||
for m in det_data["scan"].values()
|
||||
]
|
||||
scan_table_source.data.update(
|
||||
scan=scan_list, hkl=hkl, peaks=[0] * len(scan_list), fit=[0] * len(scan_list)
|
||||
scan=scan_list,
|
||||
hkl=hkl,
|
||||
peaks=[0] * len(scan_list),
|
||||
fit=[0] * len(scan_list),
|
||||
export=[True] * len(scan_list),
|
||||
)
|
||||
scan_table_source.selected.indices = []
|
||||
scan_table_source.selected.indices = [0]
|
||||
|
||||
def ccl_file_select_callback(_attr, _old, new):
|
||||
nonlocal det_data
|
||||
with open(new) as file:
|
||||
_, ext = os.path.splitext(new)
|
||||
det_data = pyzebra.parse_1D(file, ext)
|
||||
|
||||
_init_datatable()
|
||||
|
||||
ccl_file_select = Select(title="Available .ccl files")
|
||||
ccl_file_select.on_change("value", ccl_file_select_callback)
|
||||
|
||||
@ -97,30 +109,35 @@ def create():
|
||||
_, ext = os.path.splitext(upload_button.filename)
|
||||
det_data = pyzebra.parse_1D(file, ext)
|
||||
|
||||
scan_list = list(det_data["scan"].keys())
|
||||
hkl = [
|
||||
f'{int(m["h_index"])} {int(m["k_index"])} {int(m["l_index"])}'
|
||||
for m in det_data["scan"].values()
|
||||
]
|
||||
scan_table_source.data.update(
|
||||
scan=scan_list, hkl=hkl, peaks=[0] * len(scan_list), fit=[0] * len(scan_list)
|
||||
)
|
||||
scan_table_source.selected.indices = []
|
||||
scan_table_source.selected.indices = [0]
|
||||
_init_datatable()
|
||||
|
||||
upload_button = FileInput(accept=".ccl")
|
||||
upload_button.on_change("value", upload_button_callback)
|
||||
|
||||
def append_upload_button_callback(_attr, _old, new):
|
||||
nonlocal det_data
|
||||
with io.StringIO(base64.b64decode(new).decode()) as file:
|
||||
_, ext = os.path.splitext(append_upload_button.filename)
|
||||
append_data = pyzebra.parse_1D(file, ext)
|
||||
|
||||
added = pyzebra.add_dict(det_data, append_data)
|
||||
scan_result = pyzebra.auto(pyzebra.scan_dict(added))
|
||||
det_data = pyzebra.merge(added, added, scan_result)
|
||||
|
||||
_init_datatable()
|
||||
|
||||
append_upload_button = FileInput(accept=".ccl,.dat")
|
||||
append_upload_button.on_change("value", append_upload_button_callback)
|
||||
|
||||
def _update_table():
|
||||
num_of_peaks = [scan.get("num_of_peaks", 0) for scan in det_data["scan"].values()]
|
||||
fit_ok = [(1 if "fit" in scan else 0) for scan in det_data["scan"].values()]
|
||||
scan_table_source.data.update(peaks=num_of_peaks, fit=fit_ok)
|
||||
|
||||
def _update_plot(ind):
|
||||
def _update_plot(scan):
|
||||
nonlocal peak_pos_textinput_lock
|
||||
peak_pos_textinput_lock = True
|
||||
|
||||
scan = det_data["scan"][ind]
|
||||
y = scan["Counts"]
|
||||
x = scan["om"]
|
||||
|
||||
@ -145,18 +162,17 @@ def create():
|
||||
|
||||
fit = scan.get("fit")
|
||||
if fit is not None:
|
||||
x = scan["fit"]["x_fit"]
|
||||
plot_gauss_source.data.update(x=x, y=scan["fit"]["comps"]["gaussian"])
|
||||
plot_bkg_source.data.update(x=x, y=scan["fit"]["comps"]["background"])
|
||||
params = fit["result"].params
|
||||
fit_output_textinput.value = (
|
||||
"%s \n"
|
||||
"Gaussian: centre = %9.4f, sigma = %9.4f, area = %9.4f \n"
|
||||
"background: slope = %9.4f, intercept = %9.4f \n"
|
||||
"Int. area = %9.4f +/- %9.4f \n"
|
||||
"fit area = %9.4f +/- %9.4f \n"
|
||||
"ratio((fit-int)/fit) = %9.4f"
|
||||
% (
|
||||
ind,
|
||||
params["g_cen"].value,
|
||||
params["g_width"].value,
|
||||
params["g_amp"].value,
|
||||
@ -188,13 +204,7 @@ def create():
|
||||
numfit_max_span.location = None
|
||||
|
||||
# Main plot
|
||||
plot = Plot(
|
||||
x_range=DataRange1d(),
|
||||
y_range=DataRange1d(),
|
||||
plot_height=400,
|
||||
plot_width=700,
|
||||
toolbar_location=None,
|
||||
)
|
||||
plot = Plot(x_range=DataRange1d(), y_range=DataRange1d(), plot_height=400, plot_width=700)
|
||||
|
||||
plot.add_layout(LinearAxis(axis_label="Counts"), place="left")
|
||||
plot.add_layout(LinearAxis(axis_label="Omega"), place="below")
|
||||
@ -226,12 +236,28 @@ def create():
|
||||
numfit_max_span = Span(location=None, dimension="height", line_dash="dashed")
|
||||
plot.add_layout(numfit_max_span)
|
||||
|
||||
# Scan select
|
||||
def scan_table_callback(_attr, _old, new):
|
||||
if new:
|
||||
_update_plot(scan_table_source.data["scan"][new[-1]])
|
||||
plot.add_tools(PanTool(), WheelZoomTool(), ResetTool())
|
||||
plot.toolbar.logo = None
|
||||
|
||||
scan_table_source = ColumnDataSource(dict(scan=[], hkl=[], peaks=[], fit=[]))
|
||||
# Scan select
|
||||
def scan_table_select_callback(_attr, old, new):
|
||||
if not new:
|
||||
# skip empty selections
|
||||
return
|
||||
|
||||
# Avoid selection of multiple indicies (via Shift+Click or Ctrl+Click)
|
||||
if len(new) > 1:
|
||||
# drop selection to the previous one
|
||||
scan_table_source.selected.indices = old
|
||||
return
|
||||
|
||||
if len(old) > 1:
|
||||
# skip unnecessary update caused by selection drop
|
||||
return
|
||||
|
||||
_update_plot(det_data["scan"][scan_table_source.data["scan"][new[0]]])
|
||||
|
||||
scan_table_source = ColumnDataSource(dict(scan=[], hkl=[], peaks=[], fit=[], export=[]))
|
||||
scan_table = DataTable(
|
||||
source=scan_table_source,
|
||||
columns=[
|
||||
@ -239,25 +265,30 @@ def create():
|
||||
TableColumn(field="hkl", title="hkl"),
|
||||
TableColumn(field="peaks", title="Peaks"),
|
||||
TableColumn(field="fit", title="Fit"),
|
||||
TableColumn(field="export", title="Export", editor=CheckboxEditor()),
|
||||
],
|
||||
width=200,
|
||||
width=250,
|
||||
index_position=None,
|
||||
editable=True,
|
||||
)
|
||||
|
||||
scan_table_source.selected.on_change("indices", scan_table_callback)
|
||||
scan_table_source.selected.on_change("indices", scan_table_select_callback)
|
||||
|
||||
def _get_selected_scan():
|
||||
selected_index = scan_table_source.selected.indices[0]
|
||||
selected_scan_id = scan_table_source.data["scan"][selected_index]
|
||||
return det_data["scan"][selected_scan_id]
|
||||
|
||||
def peak_pos_textinput_callback(_attr, _old, new):
|
||||
if new is not None and not peak_pos_textinput_lock:
|
||||
sel_ind = scan_table_source.selected.indices[-1]
|
||||
scan_name = scan_table_source.data["scan"][sel_ind]
|
||||
scan = det_data["scan"][scan_name]
|
||||
scan = _get_selected_scan()
|
||||
|
||||
scan["num_of_peaks"] = 1
|
||||
peak_ind = (np.abs(scan["om"] - float(new))).argmin()
|
||||
scan["peak_indexes"] = np.array([peak_ind], dtype=np.int64)
|
||||
scan["peak_heights"] = np.array([scan["smooth_peaks"][peak_ind]])
|
||||
_update_table()
|
||||
_update_plot(scan_name)
|
||||
_update_plot(scan)
|
||||
|
||||
peak_pos_textinput = TextInput(title="Peak position:", default_size=145)
|
||||
peak_pos_textinput.on_change("value", peak_pos_textinput_callback)
|
||||
@ -322,30 +353,8 @@ def create():
|
||||
|
||||
fit_output_textinput = TextAreaInput(title="Fit results:", width=450, height=400)
|
||||
|
||||
def peakfind_all_button_callback():
|
||||
for scan in det_data["scan"].values():
|
||||
pyzebra.ccl_findpeaks(
|
||||
scan,
|
||||
int_threshold=peak_int_ratio_spinner.value,
|
||||
prominence=peak_prominence_spinner.value,
|
||||
smooth=smooth_toggle.active,
|
||||
window_size=window_size_spinner.value,
|
||||
poly_order=poly_order_spinner.value,
|
||||
)
|
||||
|
||||
_update_table()
|
||||
|
||||
sel_ind = scan_table_source.selected.indices[-1]
|
||||
_update_plot(scan_table_source.data["scan"][sel_ind])
|
||||
|
||||
peakfind_all_button = Button(label="Peak Find All", button_type="primary", default_size=145)
|
||||
peakfind_all_button.on_click(peakfind_all_button_callback)
|
||||
|
||||
def peakfind_button_callback():
|
||||
sel_ind = scan_table_source.selected.indices[-1]
|
||||
scan = scan_table_source.data["scan"][sel_ind]
|
||||
pyzebra.ccl_findpeaks(
|
||||
det_data["scan"][scan],
|
||||
def _get_peakfind_params():
|
||||
return dict(
|
||||
int_threshold=peak_int_ratio_spinner.value,
|
||||
prominence=peak_prominence_spinner.value,
|
||||
smooth=smooth_toggle.active,
|
||||
@ -353,61 +362,29 @@ def create():
|
||||
poly_order=poly_order_spinner.value,
|
||||
)
|
||||
|
||||
def peakfind_all_button_callback():
|
||||
peakfind_params = _get_peakfind_params()
|
||||
for scan in det_data["scan"].values():
|
||||
pyzebra.ccl_findpeaks(scan, **peakfind_params)
|
||||
|
||||
_update_table()
|
||||
_update_plot(_get_selected_scan())
|
||||
|
||||
peakfind_all_button = Button(label="Peak Find All", button_type="primary", default_size=145)
|
||||
peakfind_all_button.on_click(peakfind_all_button_callback)
|
||||
|
||||
def peakfind_button_callback():
|
||||
scan = _get_selected_scan()
|
||||
pyzebra.ccl_findpeaks(scan, **_get_peakfind_params())
|
||||
|
||||
_update_table()
|
||||
_update_plot(scan)
|
||||
|
||||
peakfind_button = Button(label="Peak Find Current", default_size=145)
|
||||
peakfind_button.on_click(peakfind_button_callback)
|
||||
|
||||
def fit_all_button_callback():
|
||||
for scan in det_data["scan"].values():
|
||||
pyzebra.fitccl(
|
||||
scan,
|
||||
guess=[
|
||||
centre_guess.value,
|
||||
sigma_guess.value,
|
||||
ampl_guess.value,
|
||||
slope_guess.value,
|
||||
offset_guess.value,
|
||||
],
|
||||
vary=[
|
||||
centre_vary.active,
|
||||
sigma_vary.active,
|
||||
ampl_vary.active,
|
||||
slope_vary.active,
|
||||
offset_vary.active,
|
||||
],
|
||||
constraints_min=[
|
||||
centre_min.value,
|
||||
sigma_min.value,
|
||||
ampl_min.value,
|
||||
slope_min.value,
|
||||
offset_min.value,
|
||||
],
|
||||
constraints_max=[
|
||||
centre_max.value,
|
||||
sigma_max.value,
|
||||
ampl_max.value,
|
||||
slope_max.value,
|
||||
offset_max.value,
|
||||
],
|
||||
numfit_min=integ_from.value,
|
||||
numfit_max=integ_to.value,
|
||||
)
|
||||
|
||||
sel_ind = scan_table_source.selected.indices[-1]
|
||||
_update_plot(scan_table_source.data["scan"][sel_ind])
|
||||
_update_table()
|
||||
|
||||
fit_all_button = Button(label="Fit All", button_type="primary", default_size=145)
|
||||
fit_all_button.on_click(fit_all_button_callback)
|
||||
|
||||
def fit_button_callback():
|
||||
sel_ind = scan_table_source.selected.indices[-1]
|
||||
scan = scan_table_source.data["scan"][sel_ind]
|
||||
|
||||
pyzebra.fitccl(
|
||||
det_data["scan"][scan],
|
||||
def _get_fit_params():
|
||||
return dict(
|
||||
guess=[
|
||||
centre_guess.value,
|
||||
sigma_guess.value,
|
||||
@ -438,8 +415,25 @@ def create():
|
||||
],
|
||||
numfit_min=integ_from.value,
|
||||
numfit_max=integ_to.value,
|
||||
binning=bin_size_spinner.value,
|
||||
)
|
||||
|
||||
def fit_all_button_callback():
|
||||
fit_params = _get_fit_params()
|
||||
for scan in det_data["scan"].values():
|
||||
# fit_params are updated inplace within `fitccl`
|
||||
pyzebra.fitccl(scan, **deepcopy(fit_params))
|
||||
|
||||
_update_plot(_get_selected_scan())
|
||||
_update_table()
|
||||
|
||||
fit_all_button = Button(label="Fit All", button_type="primary", default_size=145)
|
||||
fit_all_button.on_click(fit_all_button_callback)
|
||||
|
||||
def fit_button_callback():
|
||||
scan = _get_selected_scan()
|
||||
pyzebra.fitccl(scan, **_get_fit_params())
|
||||
|
||||
_update_plot(scan)
|
||||
_update_table()
|
||||
|
||||
@ -454,6 +448,10 @@ def create():
|
||||
)
|
||||
area_method_radiobutton.on_change("active", area_method_radiobutton_callback)
|
||||
|
||||
bin_size_spinner = Spinner(title="Bin size:", value=1, low=1, step=1, default_size=145)
|
||||
|
||||
lorentz_toggle = Toggle(label="Lorentz Correction", default_size=145)
|
||||
|
||||
preview_output_textinput = TextAreaInput(title="Export file preview:", width=450, height=400)
|
||||
|
||||
def preview_output_button_callback():
|
||||
@ -464,7 +462,11 @@ def create():
|
||||
|
||||
with tempfile.TemporaryDirectory() as temp_dir:
|
||||
temp_file = temp_dir + "/temp"
|
||||
pyzebra.export_comm(det_data, temp_file)
|
||||
export_data = deepcopy(det_data)
|
||||
for s, export in zip(scan_table_source.data["scan"], scan_table_source.data["export"]):
|
||||
if not export:
|
||||
del export_data["scan"][s]
|
||||
pyzebra.export_comm(export_data, temp_file, lorentz=lorentz_toggle.active)
|
||||
|
||||
with open(f"{temp_file}{ext}") as f:
|
||||
preview_output_textinput.value = f.read()
|
||||
@ -480,7 +482,11 @@ def create():
|
||||
|
||||
with tempfile.TemporaryDirectory() as temp_dir:
|
||||
temp_file = temp_dir + "/temp"
|
||||
pyzebra.export_comm(det_data, temp_file)
|
||||
export_data = deepcopy(det_data)
|
||||
for s, export in zip(scan_table_source.data["scan"], scan_table_source.data["export"]):
|
||||
if not export:
|
||||
del export_data["scan"][s]
|
||||
pyzebra.export_comm(export_data, temp_file, lorentz=lorentz_toggle.active)
|
||||
|
||||
with open(f"{temp_file}{ext}") as f:
|
||||
output_content = f.read()
|
||||
@ -530,6 +536,7 @@ def create():
|
||||
Spacer(width=20),
|
||||
column(
|
||||
row(integ_from, integ_to),
|
||||
row(bin_size_spinner, column(Spacer(height=19), lorentz_toggle)),
|
||||
row(fitparam_reset_button, area_method_radiobutton),
|
||||
row(fit_button, fit_all_button),
|
||||
),
|
||||
@ -538,9 +545,15 @@ def create():
|
||||
export_layout = column(preview_output_textinput, row(preview_output_button, save_button))
|
||||
|
||||
upload_div = Div(text="Or upload .ccl file:")
|
||||
append_upload_div = Div(text="append extra .ccl/.dat files:")
|
||||
tab_layout = column(
|
||||
row(proposal_textinput, ccl_file_select),
|
||||
row(column(Spacer(height=5), upload_div), upload_button),
|
||||
row(
|
||||
column(Spacer(height=5), upload_div),
|
||||
upload_button,
|
||||
column(Spacer(height=5), append_upload_div),
|
||||
append_upload_button,
|
||||
),
|
||||
row(scan_table, plot, Spacer(width=30), fit_output_textinput, export_layout),
|
||||
row(findpeak_controls, Spacer(width=30), fitpeak_controls),
|
||||
)
|
||||
|
@ -21,6 +21,7 @@ from pyzebra.anatric import DATA_FACTORY_IMPLEMENTATION, REFLECTION_PRINTER_FORM
|
||||
|
||||
|
||||
def create():
|
||||
doc = curdoc()
|
||||
config = pyzebra.AnatricConfig()
|
||||
|
||||
def _load_config_file(file):
|
||||
@ -345,7 +346,7 @@ def create():
|
||||
with tempfile.TemporaryDirectory() as temp_dir:
|
||||
temp_file = temp_dir + "/temp.xml"
|
||||
config.save_as(temp_file)
|
||||
pyzebra.anatric(temp_file)
|
||||
pyzebra.anatric(temp_file, anatric_path=doc.anatric_path)
|
||||
|
||||
with open(config.logfile) as f_log:
|
||||
output_log.value = f_log.read()
|
||||
@ -404,6 +405,6 @@ def create():
|
||||
with open("debug.xml") as f_config:
|
||||
output_config.value = f_config.read()
|
||||
|
||||
curdoc().add_periodic_callback(update_config, 1000)
|
||||
doc.add_periodic_callback(update_config, 1000)
|
||||
|
||||
return Panel(child=tab_layout, title="hdf anatric")
|
||||
|
@ -74,8 +74,8 @@ def create():
|
||||
image_source.data.update(image=[current_image])
|
||||
|
||||
if auto_toggle.active:
|
||||
im_max = int(np.max(current_image))
|
||||
im_min = int(np.min(current_image))
|
||||
im_min = np.min(current_image)
|
||||
im_max = np.max(current_image)
|
||||
|
||||
display_min_spinner.value = im_min
|
||||
display_max_spinner.value = im_max
|
||||
@ -83,8 +83,15 @@ def create():
|
||||
image_glyph.color_mapper.low = im_min
|
||||
image_glyph.color_mapper.high = im_max
|
||||
|
||||
magnetic_field_spinner.value = det_data["magnetic_field"][index]
|
||||
temperature_spinner.value = det_data["temperature"][index]
|
||||
if "magnetic_field" in det_data:
|
||||
magnetic_field_spinner.value = det_data["magnetic_field"][index]
|
||||
else:
|
||||
magnetic_field_spinner.value = None
|
||||
|
||||
if "temperature" in det_data:
|
||||
temperature_spinner.value = det_data["temperature"][index]
|
||||
else:
|
||||
temperature_spinner.value = None
|
||||
|
||||
gamma, nu = calculate_pol(det_data, index)
|
||||
omega = np.ones((IMAGE_H, IMAGE_W)) * det_data["rot_angle"][index]
|
||||
@ -99,6 +106,18 @@ def create():
|
||||
overview_plot_x_image_source.data.update(image=[overview_x], dw=[n_x])
|
||||
overview_plot_y_image_source.data.update(image=[overview_y], dw=[n_y])
|
||||
|
||||
if proj_auto_toggle.active:
|
||||
im_min = min(np.min(overview_x), np.min(overview_y))
|
||||
im_max = max(np.max(overview_x), np.max(overview_y))
|
||||
|
||||
proj_display_min_spinner.value = im_min
|
||||
proj_display_max_spinner.value = im_max
|
||||
|
||||
overview_plot_x_image_glyph.color_mapper.low = im_min
|
||||
overview_plot_y_image_glyph.color_mapper.low = im_min
|
||||
overview_plot_x_image_glyph.color_mapper.high = im_max
|
||||
overview_plot_y_image_glyph.color_mapper.high = im_max
|
||||
|
||||
if frame_button_group.active == 0: # Frame
|
||||
overview_plot_x.axis[1].axis_label = "Frame"
|
||||
overview_plot_y.axis[1].axis_label = "Frame"
|
||||
@ -388,7 +407,6 @@ def create():
|
||||
radio_button_group = RadioButtonGroup(labels=["nb", "nb_bi"], active=0)
|
||||
|
||||
STEP = 1
|
||||
|
||||
# ---- colormap auto toggle button
|
||||
def auto_toggle_callback(state):
|
||||
if state:
|
||||
@ -400,7 +418,9 @@ def create():
|
||||
|
||||
update_image()
|
||||
|
||||
auto_toggle = Toggle(label="Auto Range", active=True, button_type="default", default_size=145)
|
||||
auto_toggle = Toggle(
|
||||
label="Main Auto Range", active=True, button_type="default", default_size=125
|
||||
)
|
||||
auto_toggle.on_click(auto_toggle_callback)
|
||||
|
||||
# ---- colormap display max value
|
||||
@ -409,12 +429,12 @@ def create():
|
||||
image_glyph.color_mapper.high = new_value
|
||||
|
||||
display_max_spinner = Spinner(
|
||||
title="Maximal Display Value:",
|
||||
title="Max Value:",
|
||||
low=0 + STEP,
|
||||
value=1,
|
||||
step=STEP,
|
||||
disabled=auto_toggle.active,
|
||||
default_size=145,
|
||||
default_size=80,
|
||||
)
|
||||
display_max_spinner.on_change("value", display_max_spinner_callback)
|
||||
|
||||
@ -424,15 +444,66 @@ def create():
|
||||
image_glyph.color_mapper.low = new_value
|
||||
|
||||
display_min_spinner = Spinner(
|
||||
title="Minimal Display Value:",
|
||||
title="Min Value:",
|
||||
low=0,
|
||||
high=1 - STEP,
|
||||
value=0,
|
||||
step=STEP,
|
||||
disabled=auto_toggle.active,
|
||||
default_size=145,
|
||||
default_size=80,
|
||||
)
|
||||
display_min_spinner.on_change("value", display_min_spinner_callback)
|
||||
|
||||
PROJ_STEP = 0.1
|
||||
# ---- proj colormap auto toggle button
|
||||
def proj_auto_toggle_callback(state):
|
||||
if state:
|
||||
proj_display_min_spinner.disabled = True
|
||||
proj_display_max_spinner.disabled = True
|
||||
else:
|
||||
proj_display_min_spinner.disabled = False
|
||||
proj_display_max_spinner.disabled = False
|
||||
|
||||
update_overview_plot()
|
||||
|
||||
proj_auto_toggle = Toggle(
|
||||
label="Proj Auto Range", active=True, button_type="default", default_size=125
|
||||
)
|
||||
proj_auto_toggle.on_click(proj_auto_toggle_callback)
|
||||
|
||||
# ---- proj colormap display max value
|
||||
def proj_display_max_spinner_callback(_attr, _old_value, new_value):
|
||||
proj_display_min_spinner.high = new_value - PROJ_STEP
|
||||
overview_plot_x_image_glyph.color_mapper.high = new_value
|
||||
overview_plot_y_image_glyph.color_mapper.high = new_value
|
||||
|
||||
proj_display_max_spinner = Spinner(
|
||||
title="Max Value:",
|
||||
low=0 + PROJ_STEP,
|
||||
value=1,
|
||||
step=PROJ_STEP,
|
||||
disabled=proj_auto_toggle.active,
|
||||
default_size=80,
|
||||
)
|
||||
proj_display_max_spinner.on_change("value", proj_display_max_spinner_callback)
|
||||
|
||||
# ---- proj colormap display min value
|
||||
def proj_display_min_spinner_callback(_attr, _old_value, new_value):
|
||||
proj_display_max_spinner.low = new_value + PROJ_STEP
|
||||
overview_plot_x_image_glyph.color_mapper.low = new_value
|
||||
overview_plot_y_image_glyph.color_mapper.low = new_value
|
||||
|
||||
proj_display_min_spinner = Spinner(
|
||||
title="Min Value:",
|
||||
low=0,
|
||||
high=1 - PROJ_STEP,
|
||||
value=0,
|
||||
step=PROJ_STEP,
|
||||
disabled=proj_auto_toggle.active,
|
||||
default_size=80,
|
||||
)
|
||||
proj_display_min_spinner.on_change("value", proj_display_min_spinner_callback)
|
||||
|
||||
def hkl_button_callback():
|
||||
index = index_spinner.value
|
||||
setup_type = "nb_bi" if radio_button_group.active else "nb"
|
||||
@ -474,8 +545,13 @@ def create():
|
||||
# Final layout
|
||||
layout_image = column(gridplot([[proj_v, None], [plot, proj_h]], merge_tools=False))
|
||||
colormap_layout = column(
|
||||
row(colormap, column(Spacer(height=19), auto_toggle)),
|
||||
row(display_max_spinner, display_min_spinner),
|
||||
row(colormap),
|
||||
row(column(Spacer(height=19), auto_toggle), display_max_spinner, display_min_spinner),
|
||||
row(
|
||||
column(Spacer(height=19), proj_auto_toggle),
|
||||
proj_display_max_spinner,
|
||||
proj_display_min_spinner,
|
||||
),
|
||||
)
|
||||
hkl_layout = column(radio_button_group, hkl_button)
|
||||
params_layout = row(magnetic_field_spinner, temperature_spinner)
|
||||
|
@ -1,513 +0,0 @@
|
||||
import numpy as np
|
||||
import uncertainties as u
|
||||
|
||||
from .fit2 import create_uncertanities
|
||||
|
||||
|
||||
def add_dict(dict1, dict2):
|
||||
"""adds two dictionaries, meta of the new is saved as meata+original_filename and
|
||||
measurements are shifted to continue with numbering of first dict
|
||||
:arg dict1 : dictionarry to add to
|
||||
:arg dict2 : dictionarry from which to take the measurements
|
||||
:return dict1 : combined dictionary
|
||||
Note: dict1 must be made from ccl, otherwise we would have to change the structure of loaded
|
||||
dat file"""
|
||||
max_measurement_dict1 = max([int(str(keys)[1:]) for keys in dict1["scan"]])
|
||||
if dict2["meta"]["data_type"] == ".ccl":
|
||||
new_filenames = [
|
||||
"M" + str(x + max_measurement_dict1)
|
||||
for x in [int(str(keys)[1:]) for keys in dict2["scan"]]
|
||||
]
|
||||
new_meta_name = "meta" + str(dict2["meta"]["original_filename"])
|
||||
if new_meta_name not in dict1:
|
||||
for keys, name in zip(dict2["scan"], new_filenames):
|
||||
dict2["scan"][keys]["file_of_origin"] = str(dict2["meta"]["original_filename"])
|
||||
dict1["scan"][name] = dict2["scan"][keys]
|
||||
|
||||
dict1[new_meta_name] = dict2["meta"]
|
||||
|
||||
else:
|
||||
raise KeyError(
|
||||
str(
|
||||
"The file %s has alredy been added to %s"
|
||||
% (dict2["meta"]["original_filename"], dict1["meta"]["original_filename"])
|
||||
)
|
||||
)
|
||||
elif dict2["meta"]["data_type"] == ".dat":
|
||||
d = {}
|
||||
new_name = "M" + str(max_measurement_dict1 + 1)
|
||||
hkl = dict2["meta"]["title"]
|
||||
d["h_index"] = float(hkl.split()[-3])
|
||||
d["k_index"] = float(hkl.split()[-2])
|
||||
d["l_index"] = float(hkl.split()[-1])
|
||||
d["number_of_measurements"] = len(dict2["scan"]["NP"])
|
||||
d["om"] = dict2["scan"]["om"]
|
||||
d["Counts"] = dict2["scan"]["Counts"]
|
||||
d["monitor"] = dict2["scan"]["Monitor1"][0]
|
||||
d["temperature"] = dict2["meta"]["temp"]
|
||||
d["mag_field"] = dict2["meta"]["mf"]
|
||||
d["omega_angle"] = dict2["meta"]["omega"]
|
||||
dict1["scan"][new_name] = d
|
||||
print(hkl.split())
|
||||
for keys in d:
|
||||
print(keys)
|
||||
|
||||
print("s")
|
||||
|
||||
return dict1
|
||||
|
||||
|
||||
def auto(dict):
|
||||
"""takes just unique tuples from all tuples in dictionary returend by scan_dict
|
||||
intendet for automatic merge if you doesent want to specify what scans to merge together
|
||||
args: dict - dictionary from scan_dict function
|
||||
:return dict - dict without repetitions"""
|
||||
for keys in dict:
|
||||
tuple_list = dict[keys]
|
||||
new = list()
|
||||
for i in range(len(tuple_list)):
|
||||
if tuple_list[0][0] == tuple_list[i][0]:
|
||||
new.append(tuple_list[i])
|
||||
dict[keys] = new
|
||||
return dict
|
||||
|
||||
|
||||
def scan_dict(dict):
|
||||
"""scans dictionary for duplicate hkl indexes
|
||||
:arg dict : dictionary to scan
|
||||
:return dictionary with matching scans, if there are none, the dict is empty
|
||||
note: can be checked by "not d", true if empty
|
||||
"""
|
||||
|
||||
d = {}
|
||||
for i in dict["scan"]:
|
||||
for j in dict["scan"]:
|
||||
if dict["scan"][str(i)] != dict["scan"][str(j)]:
|
||||
itup = (
|
||||
dict["scan"][str(i)]["h_index"],
|
||||
dict["scan"][str(i)]["k_index"],
|
||||
dict["scan"][str(i)]["l_index"],
|
||||
)
|
||||
jtup = (
|
||||
dict["scan"][str(j)]["h_index"],
|
||||
dict["scan"][str(j)]["k_index"],
|
||||
dict["scan"][str(j)]["l_index"],
|
||||
)
|
||||
if itup != jtup:
|
||||
pass
|
||||
else:
|
||||
|
||||
if str(itup) not in d:
|
||||
d[str(itup)] = list()
|
||||
d[str(itup)].append((i, j))
|
||||
else:
|
||||
d[str(itup)].append((i, j))
|
||||
else:
|
||||
continue
|
||||
return d
|
||||
|
||||
|
||||
def compare_hkl(dict1, dict2):
|
||||
"""Compares two dictionaries based on hkl indexes and return dictionary with str(h k l) as
|
||||
key and tuple with keys to same scan in dict1 and dict2
|
||||
:arg dict1 : first dictionary
|
||||
:arg dict2 : second dictionary
|
||||
:return d : dict with matches
|
||||
example of one key: '0.0 0.0 -1.0 : ('M1', 'M9')' meaning that 001 hkl scan is M1 in
|
||||
first dict and M9 in second"""
|
||||
d = {}
|
||||
dupl = 0
|
||||
for keys in dict1["scan"]:
|
||||
for key in dict2["scan"]:
|
||||
if (
|
||||
dict1["scan"][str(keys)]["h_index"] == dict2["scan"][str(key)]["h_index"]
|
||||
and dict1["scan"][str(keys)]["k_index"] == dict2["scan"][str(key)]["k_index"]
|
||||
and dict1["scan"][str(keys)]["l_index"] == dict2["scan"][str(key)]["l_index"]
|
||||
):
|
||||
|
||||
if (
|
||||
str(
|
||||
(
|
||||
str(dict1["scan"][str(keys)]["h_index"])
|
||||
+ " "
|
||||
+ str(dict1["scan"][str(keys)]["k_index"])
|
||||
+ " "
|
||||
+ str(dict1["scan"][str(keys)]["l_index"])
|
||||
)
|
||||
)
|
||||
not in d
|
||||
):
|
||||
d[
|
||||
str(
|
||||
str(dict1["scan"][str(keys)]["h_index"])
|
||||
+ " "
|
||||
+ str(dict1["scan"][str(keys)]["k_index"])
|
||||
+ " "
|
||||
+ str(dict1["scan"][str(keys)]["l_index"])
|
||||
)
|
||||
] = (str(keys), str(key))
|
||||
else:
|
||||
dupl = dupl + 1
|
||||
d[
|
||||
str(
|
||||
str(dict1["scan"][str(keys)]["h_index"])
|
||||
+ " "
|
||||
+ str(dict1["scan"][str(keys)]["k_index"])
|
||||
+ " "
|
||||
+ str(dict1["scan"][str(keys)]["l_index"])
|
||||
+ "_dupl"
|
||||
+ str(dupl)
|
||||
)
|
||||
] = (str(keys), str(key))
|
||||
else:
|
||||
continue
|
||||
|
||||
return d
|
||||
|
||||
|
||||
def create_tuples(x, y, y_err):
|
||||
"""creates tuples for sorting and merginng of the data
|
||||
Counts need to be normalized to monitor before"""
|
||||
t = list()
|
||||
for i in range(len(x)):
|
||||
tup = (x[i], y[i], y_err[i])
|
||||
t.append(tup)
|
||||
return t
|
||||
|
||||
|
||||
def normalize(dict, key, monitor):
|
||||
"""Normalizes the scan to monitor, checks if sigma exists, otherwise creates it
|
||||
:arg dict : dictionary to from which to tkae the scan
|
||||
:arg key : which scan to normalize from dict1
|
||||
:arg monitor : final monitor
|
||||
:return counts - normalized counts
|
||||
:return sigma - normalized sigma"""
|
||||
|
||||
counts = np.array(dict["scan"][key]["Counts"])
|
||||
sigma = np.sqrt(counts) if "sigma" not in dict["scan"][key] else dict["scan"][key]["sigma"]
|
||||
monitor_ratio = monitor / dict["scan"][key]["monitor"]
|
||||
scaled_counts = counts * monitor_ratio
|
||||
scaled_sigma = np.array(sigma) * monitor_ratio
|
||||
|
||||
return scaled_counts, scaled_sigma
|
||||
|
||||
|
||||
def merge(dict1, dict2, keys, auto=True, monitor=100000):
|
||||
"""merges the two tuples and sorts them, if om value is same, Counts value is average
|
||||
averaging is propagated into sigma if dict1 == dict2, key[1] is deleted after merging
|
||||
:arg dict1 : dictionary to which scan will be merged
|
||||
:arg dict2 : dictionary from which scan will be merged
|
||||
:arg keys : tuple with key to dict1 and dict2
|
||||
:arg auto : if true, when monitors are same, does not change it, if flase, takes monitor always
|
||||
:arg monitor : final monitor after merging
|
||||
note: dict1 and dict2 can be same dict
|
||||
:return dict1 with merged scan"""
|
||||
if auto:
|
||||
if dict1["scan"][keys[0]]["monitor"] == dict2["scan"][keys[1]]["monitor"]:
|
||||
monitor = dict1["scan"][keys[0]]["monitor"]
|
||||
|
||||
# load om and Counts
|
||||
x1, x2 = dict1["scan"][keys[0]]["om"], dict2["scan"][keys[1]]["om"]
|
||||
cor_y1, y_err1 = normalize(dict1, keys[0], monitor=monitor)
|
||||
cor_y2, y_err2 = normalize(dict2, keys[1], monitor=monitor)
|
||||
# creates touples (om, Counts, sigma) for sorting and further processing
|
||||
tuple_list = create_tuples(x1, cor_y1, y_err1) + create_tuples(x2, cor_y2, y_err2)
|
||||
# Sort the list on om and add 0 0 0 tuple to the last position
|
||||
sorted_t = sorted(tuple_list, key=lambda tup: tup[0])
|
||||
sorted_t.append((0, 0, 0))
|
||||
om, Counts, sigma = [], [], []
|
||||
seen = list()
|
||||
for i in range(len(sorted_t) - 1):
|
||||
if sorted_t[i][0] not in seen:
|
||||
if sorted_t[i][0] != sorted_t[i + 1][0]:
|
||||
om = np.append(om, sorted_t[i][0])
|
||||
Counts = np.append(Counts, sorted_t[i][1])
|
||||
sigma = np.append(sigma, sorted_t[i][2])
|
||||
else:
|
||||
om = np.append(om, sorted_t[i][0])
|
||||
counts1, counts2 = sorted_t[i][1], sorted_t[i + 1][1]
|
||||
sigma1, sigma2 = sorted_t[i][2], sorted_t[i + 1][2]
|
||||
count_err1 = u.ufloat(counts1, sigma1)
|
||||
count_err2 = u.ufloat(counts2, sigma2)
|
||||
avg = (count_err1 + count_err2) / 2
|
||||
Counts = np.append(Counts, avg.n)
|
||||
sigma = np.append(sigma, avg.s)
|
||||
seen.append(sorted_t[i][0])
|
||||
else:
|
||||
continue
|
||||
|
||||
if dict1 == dict2:
|
||||
del dict1["scan"][keys[1]]
|
||||
|
||||
note = (
|
||||
f"This scan was merged with scan {keys[1]} from "
|
||||
f'file {dict2["meta"]["original_filename"]} \n'
|
||||
)
|
||||
if "notes" not in dict1["scan"][str(keys[0])]:
|
||||
dict1["scan"][str(keys[0])]["notes"] = note
|
||||
else:
|
||||
dict1["scan"][str(keys[0])]["notes"] += note
|
||||
|
||||
dict1["scan"][keys[0]]["om"] = om
|
||||
dict1["scan"][keys[0]]["Counts"] = Counts
|
||||
dict1["scan"][keys[0]]["sigma"] = sigma
|
||||
dict1["scan"][keys[0]]["monitor"] = monitor
|
||||
print("merging done")
|
||||
return dict1
|
||||
|
||||
|
||||
def substract_measurement(dict1, dict2, keys, auto=True, monitor=100000):
|
||||
"""Substracts two scan (scan key2 from dict2 from measurent key1 in dict1), expects om to be same
|
||||
:arg dict1 : dictionary to which scan will be merged
|
||||
:arg dict2 : dictionary from which scan will be merged
|
||||
:arg keys : tuple with key to dict1 and dict2
|
||||
:arg auto : if true, when monitors are same, does not change it, if flase, takes monitor always
|
||||
:arg monitor : final monitor after merging
|
||||
:returns d : dict1 with substracted Counts from dict2 and sigma that comes from the substraction"""
|
||||
|
||||
if len(dict1["scan"][keys[0]]["om"]) != len(dict2["scan"][keys[1]]["om"]):
|
||||
raise ValueError("Omegas have different lengths, cannot be substracted")
|
||||
|
||||
if auto:
|
||||
if dict1["scan"][keys[0]]["monitor"] == dict2["scan"][keys[1]]["monitor"]:
|
||||
monitor = dict1["scan"][keys[0]]["monitor"]
|
||||
|
||||
cor_y1, y_err1 = normalize(dict1, keys[0], monitor=monitor)
|
||||
cor_y2, y_err2 = normalize(dict2, keys[1], monitor=monitor)
|
||||
|
||||
dict1_count_err = create_uncertanities(cor_y1, y_err1)
|
||||
dict2_count_err = create_uncertanities(cor_y2, y_err2)
|
||||
|
||||
res = np.subtract(dict1_count_err, dict2_count_err)
|
||||
|
||||
res_nom = []
|
||||
res_err = []
|
||||
for k in range(len(res)):
|
||||
res_nom = np.append(res_nom, res[k].n)
|
||||
res_err = np.append(res_err, res[k].s)
|
||||
|
||||
if len([num for num in res_nom if num < 0]) >= 0.3 * len(res_nom):
|
||||
print(
|
||||
f"Warning! percentage of negative numbers in scan subsracted {keys[0]} is "
|
||||
f"{len([num for num in res_nom if num < 0]) / len(res_nom)}"
|
||||
)
|
||||
|
||||
dict1["scan"][str(keys[0])]["Counts"] = res_nom
|
||||
dict1["scan"][str(keys[0])]["sigma"] = res_err
|
||||
dict1["scan"][str(keys[0])]["monitor"] = monitor
|
||||
note = (
|
||||
f'Scan {keys[1]} from file {dict2["meta"]["original_filename"]} '
|
||||
f"was substracted from this scan \n"
|
||||
)
|
||||
if "notes" not in dict1["scan"][str(keys[0])]:
|
||||
dict1["scan"][str(keys[0])]["notes"] = note
|
||||
else:
|
||||
dict1["scan"][str(keys[0])]["notes"] += note
|
||||
return dict1
|
||||
|
||||
|
||||
def compare_dict(dict1, dict2):
|
||||
"""takes two ccl dictionaries and compare different values for each key
|
||||
:arg dict1 : dictionary 1 (ccl)
|
||||
:arg dict2 : dictionary 2 (ccl)
|
||||
:returns warning : dictionary with keys from primary files (if they differ) with
|
||||
information of how many scan differ and which ones differ
|
||||
:returns report_string string comparing all different values respecively of measurements"""
|
||||
|
||||
if dict1["meta"]["data_type"] != dict2["meta"]["data_type"]:
|
||||
print("select two dicts")
|
||||
return
|
||||
S = []
|
||||
conflicts = {}
|
||||
warnings = {}
|
||||
|
||||
comp = compare_hkl(dict1, dict2)
|
||||
d1 = scan_dict(dict1)
|
||||
d2 = scan_dict(dict2)
|
||||
if not d1:
|
||||
S.append("There are no duplicates in %s (dict1) \n" % dict1["meta"]["original_filename"])
|
||||
else:
|
||||
S.append(
|
||||
"There are %d duplicates in %s (dict1) \n"
|
||||
% (len(d1), dict1["meta"]["original_filename"])
|
||||
)
|
||||
warnings["Duplicates in dict1"] = list()
|
||||
for keys in d1:
|
||||
S.append("Measurements %s with hkl %s \n" % (d1[keys], keys))
|
||||
warnings["Duplicates in dict1"].append(d1[keys])
|
||||
if not d2:
|
||||
S.append("There are no duplicates in %s (dict2) \n" % dict2["meta"]["original_filename"])
|
||||
else:
|
||||
S.append(
|
||||
"There are %d duplicates in %s (dict2) \n"
|
||||
% (len(d2), dict2["meta"]["original_filename"])
|
||||
)
|
||||
warnings["Duplicates in dict2"] = list()
|
||||
for keys in d2:
|
||||
S.append("Measurements %s with hkl %s \n" % (d2[keys], keys))
|
||||
warnings["Duplicates in dict2"].append(d2[keys])
|
||||
|
||||
# compare meta
|
||||
S.append("Different values in meta: \n")
|
||||
different_meta = {
|
||||
k: dict1["meta"][k]
|
||||
for k in dict1["meta"]
|
||||
if k in dict2["meta"] and dict1["meta"][k] != dict2["meta"][k]
|
||||
}
|
||||
exlude_meta_set = ["original_filename", "date", "title"]
|
||||
for keys in different_meta:
|
||||
if keys in exlude_meta_set:
|
||||
continue
|
||||
else:
|
||||
if keys not in conflicts:
|
||||
conflicts[keys] = 1
|
||||
else:
|
||||
conflicts[keys] = conflicts[keys] + 1
|
||||
|
||||
S.append(" Different values in %s \n" % str(keys))
|
||||
S.append(" dict1: %s \n" % str(dict1["meta"][str(keys)]))
|
||||
S.append(" dict2: %s \n" % str(dict2["meta"][str(keys)]))
|
||||
|
||||
# compare Measurements
|
||||
S.append(
|
||||
"Number of measurements in %s = %s \n"
|
||||
% (dict1["meta"]["original_filename"], len(dict1["scan"]))
|
||||
)
|
||||
S.append(
|
||||
"Number of measurements in %s = %s \n"
|
||||
% (dict2["meta"]["original_filename"], len(dict2["scan"]))
|
||||
)
|
||||
S.append("Different values in Measurements:\n")
|
||||
select_set = ["om", "Counts", "sigma"]
|
||||
exlude_set = ["time", "Counts", "date", "notes"]
|
||||
for keys1 in comp:
|
||||
for key2 in dict1["scan"][str(comp[str(keys1)][0])]:
|
||||
if key2 in exlude_set:
|
||||
continue
|
||||
if key2 not in select_set:
|
||||
try:
|
||||
if (
|
||||
dict1["scan"][comp[str(keys1)][0]][str(key2)]
|
||||
!= dict2["scan"][str(comp[str(keys1)][1])][str(key2)]
|
||||
):
|
||||
S.append(
|
||||
"Scan value "
|
||||
"%s"
|
||||
", with hkl %s differs in meausrements %s and %s \n"
|
||||
% (key2, keys1, comp[str(keys1)][0], comp[str(keys1)][1])
|
||||
)
|
||||
S.append(
|
||||
" dict1: %s \n"
|
||||
% str(dict1["scan"][comp[str(keys1)][0]][str(key2)])
|
||||
)
|
||||
S.append(
|
||||
" dict2: %s \n"
|
||||
% str(dict2["scan"][comp[str(keys1)][1]][str(key2)])
|
||||
)
|
||||
if key2 not in conflicts:
|
||||
conflicts[key2] = {}
|
||||
conflicts[key2]["amount"] = 1
|
||||
conflicts[key2]["scan"] = str(comp[str(keys1)])
|
||||
else:
|
||||
|
||||
conflicts[key2]["amount"] = conflicts[key2]["amount"] + 1
|
||||
conflicts[key2]["scan"] = (
|
||||
conflicts[key2]["scan"] + " " + (str(comp[str(keys1)]))
|
||||
)
|
||||
except KeyError as e:
|
||||
print("Missing keys, some files were probably merged or substracted")
|
||||
print(e.args)
|
||||
|
||||
else:
|
||||
try:
|
||||
comparison = list(dict1["scan"][comp[str(keys1)][0]][str(key2)]) == list(
|
||||
dict2["scan"][comp[str(keys1)][1]][str(key2)]
|
||||
)
|
||||
if len(list(dict1["scan"][comp[str(keys1)][0]][str(key2)])) != len(
|
||||
list(dict2["scan"][comp[str(keys1)][1]][str(key2)])
|
||||
):
|
||||
if str("different length of %s" % key2) not in warnings:
|
||||
warnings[str("different length of %s" % key2)] = list()
|
||||
warnings[str("different length of %s" % key2)].append(
|
||||
(str(comp[keys1][0]), str(comp[keys1][1]))
|
||||
)
|
||||
else:
|
||||
warnings[str("different length of %s" % key2)].append(
|
||||
(str(comp[keys1][0]), str(comp[keys1][1]))
|
||||
)
|
||||
if not comparison:
|
||||
S.append(
|
||||
"Scan value "
|
||||
"%s"
|
||||
" differs in scan %s and %s \n"
|
||||
% (key2, comp[str(keys1)][0], comp[str(keys1)][1])
|
||||
)
|
||||
S.append(
|
||||
" dict1: %s \n"
|
||||
% str(list(dict1["scan"][comp[str(keys1)][0]][str(key2)]))
|
||||
)
|
||||
S.append(
|
||||
" dict2: %s \n"
|
||||
% str(list(dict2["scan"][comp[str(keys1)][1]][str(key2)]))
|
||||
)
|
||||
if key2 not in conflicts:
|
||||
conflicts[key2] = {}
|
||||
conflicts[key2]["amount"] = 1
|
||||
conflicts[key2]["scan"] = str(comp[str(keys1)])
|
||||
else:
|
||||
conflicts[key2]["amount"] = conflicts[key2]["amount"] + 1
|
||||
conflicts[key2]["scan"] = (
|
||||
conflicts[key2]["scan"] + " " + (str(comp[str(keys1)]))
|
||||
)
|
||||
except KeyError as e:
|
||||
print("Missing keys, some files were probably merged or substracted")
|
||||
print(e.args)
|
||||
|
||||
for keys in conflicts:
|
||||
try:
|
||||
conflicts[str(keys)]["scan"] = conflicts[str(keys)]["scan"].split(" ")
|
||||
except:
|
||||
continue
|
||||
report_string = "".join(S)
|
||||
return warnings, conflicts, report_string
|
||||
|
||||
|
||||
def guess_next(dict1, dict2, comp):
|
||||
"""iterates thorough the scans and tries to decide if the scans should be
|
||||
substracted or merged"""
|
||||
threshold = 0.05
|
||||
for keys in comp:
|
||||
if (
|
||||
abs(
|
||||
(
|
||||
dict1["scan"][str(comp[keys][0])]["temperature"]
|
||||
- dict2["scan"][str(comp[keys][1])]["temperature"]
|
||||
)
|
||||
/ dict2["scan"][str(comp[keys][1])]["temperature"]
|
||||
)
|
||||
< threshold
|
||||
and abs(
|
||||
(
|
||||
dict1["scan"][str(comp[keys][0])]["mag_field"]
|
||||
- dict2["scan"][str(comp[keys][1])]["mag_field"]
|
||||
)
|
||||
/ dict2["scan"][str(comp[keys][1])]["mag_field"]
|
||||
)
|
||||
< threshold
|
||||
):
|
||||
comp[keys] = comp[keys] + tuple("m")
|
||||
else:
|
||||
comp[keys] = comp[keys] + tuple("s")
|
||||
|
||||
return comp
|
||||
|
||||
|
||||
def process_dict(dict1, dict2, comp):
|
||||
"""substracts or merges scans, guess_next function must run first """
|
||||
for keys in comp:
|
||||
if comp[keys][2] == "s":
|
||||
substract_measurement(dict1, dict2, comp[keys])
|
||||
elif comp[keys][2] == "m":
|
||||
merge(dict1, dict2, comp[keys])
|
||||
|
||||
return dict1
|
@ -5,7 +5,13 @@ from scipy.signal import savgol_filter
|
||||
|
||||
|
||||
def ccl_findpeaks(
|
||||
scan, int_threshold=0.8, prominence=50, smooth=False, window_size=7, poly_order=3
|
||||
scan,
|
||||
int_threshold=0.8,
|
||||
prominence=50,
|
||||
smooth=False,
|
||||
window_size=7,
|
||||
poly_order=3,
|
||||
variable="om",
|
||||
):
|
||||
|
||||
"""function iterates through the dictionary created by load_cclv2 and locates peaks for each scan
|
||||
@ -54,7 +60,7 @@ def ccl_findpeaks(
|
||||
prominence = 50
|
||||
print("Invalid value for prominence, select positive number, new value set to:", prominence)
|
||||
|
||||
omega = scan["om"]
|
||||
omega = scan[variable]
|
||||
counts = np.array(scan["Counts"])
|
||||
if smooth:
|
||||
itp = interp1d(omega, counts, kind="linear")
|
||||
|
@ -1,7 +1,6 @@
|
||||
import os
|
||||
import re
|
||||
from collections import defaultdict
|
||||
from decimal import Decimal
|
||||
|
||||
import numpy as np
|
||||
|
||||
@ -20,6 +19,7 @@ META_VARS_STR = (
|
||||
"proposal_email",
|
||||
"detectorDistance",
|
||||
)
|
||||
|
||||
META_VARS_FLOAT = (
|
||||
"omega",
|
||||
"mf",
|
||||
@ -58,30 +58,29 @@ META_VARS_FLOAT = (
|
||||
META_UB_MATRIX = ("ub1j", "ub2j", "ub3j")
|
||||
|
||||
CCL_FIRST_LINE = (
|
||||
# the first element is `scan_number`, which we don't save to metadata
|
||||
("scan_number", int),
|
||||
("h_index", float),
|
||||
("k_index", float),
|
||||
("l_index", float),
|
||||
)
|
||||
|
||||
CCL_FIRST_LINE_BI = (
|
||||
*CCL_FIRST_LINE,
|
||||
("twotheta_angle", float),
|
||||
("omega_angle", float),
|
||||
("chi_angle", float),
|
||||
("phi_angle", float),
|
||||
)
|
||||
|
||||
CCL_FIRST_LINE_NB = (
|
||||
*CCL_FIRST_LINE,
|
||||
("gamma_angle", float),
|
||||
("omega_angle", float),
|
||||
("nu_angle", float),
|
||||
("unkwn_angle", float),
|
||||
)
|
||||
CCL_ANGLES = {
|
||||
"bi": (
|
||||
("twotheta_angle", float),
|
||||
("omega_angle", float),
|
||||
("chi_angle", float),
|
||||
("phi_angle", float),
|
||||
),
|
||||
"nb": (
|
||||
("gamma_angle", float),
|
||||
("omega_angle", float),
|
||||
("nu_angle", float),
|
||||
("unkwn_angle", float),
|
||||
),
|
||||
}
|
||||
|
||||
CCL_SECOND_LINE = (
|
||||
("number_of_measurements", int),
|
||||
("n_points", int),
|
||||
("angle_step", float),
|
||||
("monitor", float),
|
||||
("temperature", float),
|
||||
@ -132,50 +131,34 @@ def parse_1D(fileobj, data_type):
|
||||
# read data
|
||||
scan = {}
|
||||
if data_type == ".ccl":
|
||||
decimal = list()
|
||||
|
||||
if metadata["zebra_mode"] == "bi":
|
||||
ccl_first_line = CCL_FIRST_LINE_BI
|
||||
elif metadata["zebra_mode"] == "nb":
|
||||
ccl_first_line = CCL_FIRST_LINE_NB
|
||||
ccl_first_line = (*CCL_FIRST_LINE, *CCL_ANGLES[metadata["zebra_mode"]])
|
||||
ccl_second_line = CCL_SECOND_LINE
|
||||
|
||||
for line in fileobj:
|
||||
d = {}
|
||||
|
||||
# first line
|
||||
scan_number, *params = line.split()
|
||||
for param, (param_name, param_type) in zip(params, ccl_first_line):
|
||||
for param, (param_name, param_type) in zip(line.split(), ccl_first_line):
|
||||
d[param_name] = param_type(param)
|
||||
|
||||
decimal.append(bool(Decimal(d["h_index"]) % 1 == 0))
|
||||
decimal.append(bool(Decimal(d["k_index"]) % 1 == 0))
|
||||
decimal.append(bool(Decimal(d["l_index"]) % 1 == 0))
|
||||
|
||||
# second line
|
||||
next_line = next(fileobj)
|
||||
params = next_line.split()
|
||||
for param, (param_name, param_type) in zip(params, ccl_second_line):
|
||||
for param, (param_name, param_type) in zip(next_line.split(), ccl_second_line):
|
||||
d[param_name] = param_type(param)
|
||||
|
||||
d["om"] = np.linspace(
|
||||
d["omega_angle"] - (d["number_of_measurements"] / 2) * d["angle_step"],
|
||||
d["omega_angle"] + (d["number_of_measurements"] / 2) * d["angle_step"],
|
||||
d["number_of_measurements"],
|
||||
d["omega_angle"] - (d["n_points"] / 2) * d["angle_step"],
|
||||
d["omega_angle"] + (d["n_points"] / 2) * d["angle_step"],
|
||||
d["n_points"],
|
||||
)
|
||||
|
||||
# subsequent lines with counts
|
||||
counts = []
|
||||
while len(counts) < d["number_of_measurements"]:
|
||||
while len(counts) < d["n_points"]:
|
||||
counts.extend(map(int, next(fileobj).split()))
|
||||
d["Counts"] = counts
|
||||
|
||||
scan[int(scan_number)] = d
|
||||
|
||||
if all(decimal):
|
||||
metadata["indices"] = "hkl"
|
||||
else:
|
||||
metadata["indices"] = "real"
|
||||
scan[d["scan_number"]] = d
|
||||
|
||||
elif data_type == ".dat":
|
||||
# skip the first 2 rows, the third row contans the column names
|
||||
@ -200,9 +183,13 @@ def parse_1D(fileobj, data_type):
|
||||
print("seems hkl is not in title")
|
||||
|
||||
data_cols["temperature"] = metadata["temp"]
|
||||
data_cols["mag_field"] = metadata["mf"]
|
||||
try:
|
||||
data_cols["mag_field"] = metadata["mf"]
|
||||
except KeyError:
|
||||
print("Mag_field not present in dat file")
|
||||
|
||||
data_cols["omega_angle"] = metadata["omega"]
|
||||
data_cols["number_of_measurements"] = len(data_cols["om"])
|
||||
data_cols["n_points"] = len(data_cols["om"])
|
||||
data_cols["monitor"] = data_cols["Monitor1"][0]
|
||||
data_cols["twotheta_angle"] = metadata["2-theta"]
|
||||
data_cols["chi_angle"] = metadata["chi"]
|
||||
@ -215,7 +202,69 @@ def parse_1D(fileobj, data_type):
|
||||
print("Unknown file extention")
|
||||
|
||||
# utility information
|
||||
if all(
|
||||
s["h_index"].is_integer() and s["k_index"].is_integer() and s["l_index"].is_integer()
|
||||
for s in scan.values()
|
||||
):
|
||||
metadata["indices"] = "hkl"
|
||||
else:
|
||||
metadata["indices"] = "real"
|
||||
|
||||
metadata["data_type"] = data_type
|
||||
metadata["area_method"] = "fit"
|
||||
|
||||
return {"meta": metadata, "scan": scan}
|
||||
|
||||
|
||||
def export_comm(data, path, lorentz=False):
|
||||
"""exports data in the *.comm format
|
||||
:param lorentz: perform Lorentz correction
|
||||
:param path: path to file + name
|
||||
:arg data - data to export, is dict after peak fitting
|
||||
|
||||
"""
|
||||
zebra_mode = data["meta"]["zebra_mode"]
|
||||
if data["meta"]["indices"] == "hkl":
|
||||
extension = ".comm"
|
||||
padding = [6, 4]
|
||||
elif data["meta"]["indices"] == "real":
|
||||
extension = ".incomm"
|
||||
padding = [4, 6]
|
||||
|
||||
with open(str(path + extension), "w") as out_file:
|
||||
for key, scan in data["scan"].items():
|
||||
if "fit" not in scan:
|
||||
print("Scan skipped - no fit value for:", key)
|
||||
continue
|
||||
|
||||
scan_str = f"{key:>{padding[0]}}"
|
||||
h_str = f'{int(scan["h_index"]):{padding[1]}}'
|
||||
k_str = f'{int(scan["k_index"]):{padding[1]}}'
|
||||
l_str = f'{int(scan["l_index"]):{padding[1]}}'
|
||||
|
||||
if data["meta"]["area_method"] == "fit":
|
||||
area = scan["fit"]["fit_area"].n
|
||||
sigma_str = f'{scan["fit"]["fit_area"].s:>10.2f}'
|
||||
elif data["meta"]["area_method"] == "integ":
|
||||
area = scan["fit"]["int_area"].n
|
||||
sigma_str = f'{scan["fit"]["int_area"].s:>10.2f}'
|
||||
|
||||
# apply lorentz correction to area
|
||||
if lorentz:
|
||||
if zebra_mode == "bi":
|
||||
twotheta_angle = np.deg2rad(scan["twotheta_angle"])
|
||||
corr_factor = np.sin(twotheta_angle)
|
||||
elif zebra_mode == "nb":
|
||||
gamma_angle = np.deg2rad(scan["gamma_angle"])
|
||||
nu_angle = np.deg2rad(scan["nu_angle"])
|
||||
corr_factor = np.sin(gamma_angle) * np.cos(nu_angle)
|
||||
|
||||
area = np.abs(area * corr_factor)
|
||||
|
||||
area_str = f"{area:>10.2f}"
|
||||
|
||||
ang_str = ""
|
||||
for angle, _ in CCL_ANGLES[zebra_mode]:
|
||||
ang_str = ang_str + f"{scan[angle]:8}"
|
||||
|
||||
out_file.write(scan_str + h_str + k_str + l_str + area_str + sigma_str + ang_str + "\n")
|
@ -6,6 +6,8 @@ from bokeh.application.application import Application
|
||||
from bokeh.application.handlers import ScriptHandler
|
||||
from bokeh.server.server import Server
|
||||
|
||||
from pyzebra.app.handler import PyzebraHandler
|
||||
|
||||
logging.basicConfig(format="%(asctime)s %(message)s", level=logging.INFO)
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@ -35,6 +37,13 @@ def main():
|
||||
help="hostname that can connect to the server websocket",
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
"--anatric-path",
|
||||
type=str,
|
||||
default="/afs/psi.ch/project/sinq/rhel7/bin/anatric",
|
||||
help="path to anatric executable",
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
"--args",
|
||||
nargs=argparse.REMAINDER,
|
||||
@ -46,9 +55,10 @@ def main():
|
||||
|
||||
logger.info(app_path)
|
||||
|
||||
pyzebra_handler = PyzebraHandler(args.anatric_path)
|
||||
handler = ScriptHandler(filename=app_path, argv=args.args)
|
||||
server = Server(
|
||||
{"/": Application(handler)},
|
||||
{"/": Application(pyzebra_handler, handler)},
|
||||
port=args.port,
|
||||
allow_websocket_origin=args.allow_websocket_origin,
|
||||
)
|
||||
|
@ -1,80 +0,0 @@
|
||||
import numpy as np
|
||||
|
||||
|
||||
def correction(value, lorentz=True, zebra_mode="--", ang1=0, ang2=0):
|
||||
if lorentz is False:
|
||||
return value
|
||||
else:
|
||||
if zebra_mode == "bi":
|
||||
corr_value = np.abs(value * np.sin(ang1))
|
||||
return corr_value
|
||||
elif zebra_mode == "nb":
|
||||
corr_value = np.abs(value * np.sin(ang1) * np.cos(ang2))
|
||||
return corr_value
|
||||
|
||||
|
||||
def export_comm(data, path, lorentz=False):
|
||||
"""exports data in the *.comm format
|
||||
:param lorentz: perform Lorentz correction
|
||||
:param path: path to file + name
|
||||
:arg data - data to export, is dict after peak fitting
|
||||
|
||||
"""
|
||||
zebra_mode = data["meta"]["zebra_mode"]
|
||||
align = ">"
|
||||
if data["meta"]["indices"] == "hkl":
|
||||
extension = ".comm"
|
||||
padding = [6, 4, 10, 8]
|
||||
elif data["meta"]["indices"] == "real":
|
||||
extension = ".incomm"
|
||||
padding = [4, 6, 10, 8]
|
||||
|
||||
with open(str(path + extension), "w") as out_file:
|
||||
for key, scan in data["scan"].items():
|
||||
if "fit" not in scan:
|
||||
print("Scan skipped - no fit value for:", key)
|
||||
continue
|
||||
scan_number_str = f"{key:{align}{padding[0]}}"
|
||||
h_str = f'{int(scan["h_index"]):{padding[1]}}'
|
||||
k_str = f'{int(scan["k_index"]):{padding[1]}}'
|
||||
l_str = f'{int(scan["l_index"]):{padding[1]}}'
|
||||
if data["meta"]["area_method"] == "fit":
|
||||
area = float(scan["fit"]["fit_area"].n)
|
||||
sigma_str = (
|
||||
f'{"{:8.2f}".format(float(scan["fit"]["fit_area"].s)):{align}{padding[2]}}'
|
||||
)
|
||||
elif data["meta"]["area_method"] == "integ":
|
||||
area = float(scan["fit"]["int_area"].n)
|
||||
sigma_str = (
|
||||
f'{"{:8.2f}".format(float(scan["fit"]["int_area"].s)):{align}{padding[2]}}'
|
||||
)
|
||||
|
||||
if zebra_mode == "bi":
|
||||
area = correction(area, lorentz, zebra_mode, scan["twotheta_angle"])
|
||||
int_str = f'{"{:8.2f}".format(area):{align}{padding[2]}}'
|
||||
angle_str1 = f'{scan["twotheta_angle"]:{padding[3]}}'
|
||||
angle_str2 = f'{scan["omega_angle"]:{padding[3]}}'
|
||||
angle_str3 = f'{scan["chi_angle"]:{padding[3]}}'
|
||||
angle_str4 = f'{scan["phi_angle"]:{padding[3]}}'
|
||||
elif zebra_mode == "nb":
|
||||
area = correction(area, lorentz, zebra_mode, scan["gamma_angle"], scan["nu_angle"])
|
||||
int_str = f'{"{:8.2f}".format(area):{align}{padding[2]}}'
|
||||
angle_str1 = f'{scan["gamma_angle"]:{padding[3]}}'
|
||||
angle_str2 = f'{scan["omega_angle"]:{padding[3]}}'
|
||||
angle_str3 = f'{scan["nu_angle"]:{padding[3]}}'
|
||||
angle_str4 = f'{scan["unkwn_angle"]:{padding[3]}}'
|
||||
|
||||
line = (
|
||||
scan_number_str
|
||||
+ h_str
|
||||
+ l_str
|
||||
+ k_str
|
||||
+ int_str
|
||||
+ sigma_str
|
||||
+ angle_str1
|
||||
+ angle_str2
|
||||
+ angle_str3
|
||||
+ angle_str4
|
||||
+ "\n"
|
||||
)
|
||||
out_file.write(line)
|
@ -59,15 +59,17 @@ def fitccl(
|
||||
constraints_min = [23, None, 50, 0, 0]
|
||||
constraints_min = [80, None, 1000, 0, 100]
|
||||
"""
|
||||
if "peak_indexes" not in scan:
|
||||
scan["peak_indexes"] = []
|
||||
|
||||
if len(scan["peak_indexes"]) > 1:
|
||||
# return in case of more than 1 peaks
|
||||
print("More than 1 peak, scan skipped")
|
||||
return
|
||||
|
||||
if binning is None or binning == 0 or binning == 1:
|
||||
x = list(scan["om"])
|
||||
y = list(scan["Counts"])
|
||||
y_err = list(np.sqrt(y)) if scan.get("sigma", None) is None else list(scan["sigma"])
|
||||
print(scan["peak_indexes"])
|
||||
if not scan["peak_indexes"]:
|
||||
centre = np.mean(x)
|
||||
else:
|
||||
@ -87,7 +89,6 @@ def fitccl(
|
||||
|
||||
if len(scan["peak_indexes"]) == 0:
|
||||
# Case for no peak, gaussian in centre, sigma as 20% of range
|
||||
print("No peak")
|
||||
peak_index = find_nearest(x, np.mean(x))
|
||||
guess[0] = centre if guess[0] is None else guess[0]
|
||||
guess[1] = (x[-1] - x[0]) / 5 if guess[1] is None else guess[1]
|
||||
@ -98,7 +99,6 @@ def fitccl(
|
||||
|
||||
elif len(scan["peak_indexes"]) == 1:
|
||||
# case for one peak, takse into account users guesses
|
||||
print("one peak")
|
||||
peak_height = scan["peak_heights"]
|
||||
guess[0] = centre if guess[0] is None else guess[0]
|
||||
guess[1] = 0.1 if guess[1] is None else guess[1]
|
||||
@ -128,11 +128,11 @@ def fitccl(
|
||||
("intercept", guess[4], bool(vary[4]), constraints_min[4], constraints_max[4], None, None),
|
||||
)
|
||||
# the weighted fit
|
||||
weights = [np.abs(1 / val) if val != 0 else 1 for val in y_err]
|
||||
try:
|
||||
result = mod.fit(
|
||||
y, params, weights=[np.abs(1 / val) for val in y_err], x=x, calc_covar=True,
|
||||
)
|
||||
result = mod.fit(y, params, weights=weights, x=x, calc_covar=True)
|
||||
except ValueError:
|
||||
print(f"Couldn't fit scan {scan['scan_number']}")
|
||||
return
|
||||
|
||||
if result.params["g_amp"].stderr is None:
|
||||
@ -213,9 +213,9 @@ def fitccl(
|
||||
d = {}
|
||||
for pars in result.params:
|
||||
d[str(pars)] = (result.params[str(pars)].value, result.params[str(pars)].vary)
|
||||
print(result.fit_report())
|
||||
|
||||
print((result.params["g_amp"].value - int_area.n) / result.params["g_amp"].value)
|
||||
print("Scan", scan["scan_number"])
|
||||
print(result.fit_report())
|
||||
|
||||
d["ratio"] = (result.params["g_amp"].value - int_area.n) / result.params["g_amp"].value
|
||||
d["int_area"] = int_area
|
||||
@ -224,4 +224,5 @@ def fitccl(
|
||||
d["result"] = result
|
||||
d["comps"] = comps
|
||||
d["numfit"] = [numfit_min, numfit_max]
|
||||
d["x_fit"] = x
|
||||
scan["fit"] = d
|
||||
|
167
pyzebra/fitvol3.py
Normal file
167
pyzebra/fitvol3.py
Normal file
@ -0,0 +1,167 @@
|
||||
import numpy as np
|
||||
from lmfit import Model, Parameters
|
||||
from scipy.integrate import simps
|
||||
import matplotlib.pyplot as plt
|
||||
import uncertainties as u
|
||||
from lmfit.models import GaussianModel
|
||||
from lmfit.models import VoigtModel
|
||||
from lmfit.models import PseudoVoigtModel
|
||||
|
||||
|
||||
def bin_data(array, binsize):
|
||||
if isinstance(binsize, int) and 0 < binsize < len(array):
|
||||
return [
|
||||
np.mean(array[binsize * i : binsize * i + binsize])
|
||||
for i in range(int(np.ceil(len(array) / binsize)))
|
||||
]
|
||||
else:
|
||||
print("Binsize need to be positive integer smaller than lenght of array")
|
||||
return array
|
||||
|
||||
|
||||
def create_uncertanities(y, y_err):
|
||||
# create array with uncertanities for error propagation
|
||||
combined = np.array([])
|
||||
for i in range(len(y)):
|
||||
part = u.ufloat(y[i], y_err[i])
|
||||
combined = np.append(combined, part)
|
||||
return combined
|
||||
|
||||
|
||||
def find_nearest(array, value):
|
||||
# find nearest value and return index
|
||||
array = np.asarray(array)
|
||||
idx = (np.abs(array - value)).argmin()
|
||||
return idx
|
||||
|
||||
|
||||
# predefined peak positions
|
||||
# peaks = [6.2, 8.1, 9.9, 11.5]
|
||||
peaks = [23.5, 24.5]
|
||||
# peaks = [24]
|
||||
def fitccl(scan, variable="om", peak_type="gauss", binning=None):
|
||||
|
||||
x = list(scan[variable])
|
||||
y = list(scan["Counts"])
|
||||
peak_centre = np.mean(x)
|
||||
if binning is None or binning == 0 or binning == 1:
|
||||
x = list(scan["om"])
|
||||
y = list(scan["Counts"])
|
||||
y_err = list(np.sqrt(y)) if scan.get("sigma", None) is None else list(scan["sigma"])
|
||||
print(scan["peak_indexes"])
|
||||
if not scan["peak_indexes"]:
|
||||
peak_centre = np.mean(x)
|
||||
else:
|
||||
centre = x[int(scan["peak_indexes"])]
|
||||
else:
|
||||
x = list(scan["om"])
|
||||
if not scan["peak_indexes"]:
|
||||
peak_centre = np.mean(x)
|
||||
else:
|
||||
peak_centre = x[int(scan["peak_indexes"])]
|
||||
x = bin_data(x, binning)
|
||||
y = list(scan["Counts"])
|
||||
y_err = list(np.sqrt(y)) if scan.get("sigma", None) is None else list(scan["sigma"])
|
||||
combined = bin_data(create_uncertanities(y, y_err), binning)
|
||||
y = [combined[i].n for i in range(len(combined))]
|
||||
y_err = [combined[i].s for i in range(len(combined))]
|
||||
|
||||
def background(x, slope, intercept):
|
||||
"""background"""
|
||||
return slope * (x - peak_centre) + intercept
|
||||
|
||||
def gaussian(x, center, g_sigma, amplitude):
|
||||
"""1-d gaussian: gaussian(x, amp, cen, wid)"""
|
||||
return (amplitude / (np.sqrt(2.0 * np.pi) * g_sigma)) * np.exp(
|
||||
-((x - center) ** 2) / (2 * g_sigma ** 2)
|
||||
)
|
||||
|
||||
def lorentzian(x, center, l_sigma, amplitude):
|
||||
"""1d lorentzian"""
|
||||
return (amplitude / (1 + ((1 * x - center) / l_sigma) ** 2)) / (np.pi * l_sigma)
|
||||
|
||||
def pseudoVoigt1(x, center, g_sigma, amplitude, l_sigma, fraction):
|
||||
"""PseudoVoight peak with different widths of lorenzian and gaussian"""
|
||||
return (1 - fraction) * gaussian(x, center, g_sigma, amplitude) + fraction * (
|
||||
lorentzian(x, center, l_sigma, amplitude)
|
||||
)
|
||||
|
||||
mod = Model(background)
|
||||
params = Parameters()
|
||||
params.add_many(
|
||||
("slope", 0, True, None, None, None, None), ("intercept", 0, False, None, None, None, None)
|
||||
)
|
||||
for i in range(len(peaks)):
|
||||
if peak_type == "gauss":
|
||||
mod = mod + GaussianModel(prefix="p%d_" % (i + 1))
|
||||
params.add(str("p%d_" % (i + 1) + "amplitude"), 20, True, 0, None, None)
|
||||
params.add(str("p%d_" % (i + 1) + "center"), peaks[i], True, None, None, None)
|
||||
params.add(str("p%d_" % (i + 1) + "sigma"), 0.2, True, 0, 5, None)
|
||||
elif peak_type == "voigt":
|
||||
mod = mod + VoigtModel(prefix="p%d_" % (i + 1))
|
||||
params.add(str("p%d_" % (i + 1) + "amplitude"), 20, True, 0, None, None)
|
||||
params.add(str("p%d_" % (i + 1) + "center"), peaks[i], True, None, None, None)
|
||||
params.add(str("p%d_" % (i + 1) + "sigma"), 0.2, True, 0, 3, None)
|
||||
params.add(str("p%d_" % (i + 1) + "gamma"), 0.2, True, 0, 5, None)
|
||||
elif peak_type == "pseudovoigt":
|
||||
mod = mod + PseudoVoigtModel(prefix="p%d_" % (i + 1))
|
||||
params.add(str("p%d_" % (i + 1) + "amplitude"), 20, True, 0, None, None)
|
||||
params.add(str("p%d_" % (i + 1) + "center"), peaks[i], True, None, None, None)
|
||||
params.add(str("p%d_" % (i + 1) + "sigma"), 0.2, True, 0, 5, None)
|
||||
params.add(str("p%d_" % (i + 1) + "fraction"), 0.5, True, -5, 5, None)
|
||||
elif peak_type == "pseudovoigt1":
|
||||
mod = mod + Model(pseudoVoigt1, prefix="p%d_" % (i + 1))
|
||||
params.add(str("p%d_" % (i + 1) + "amplitude"), 20, True, 0, None, None)
|
||||
params.add(str("p%d_" % (i + 1) + "center"), peaks[i], True, None, None, None)
|
||||
params.add(str("p%d_" % (i + 1) + "g_sigma"), 0.2, True, 0, 5, None)
|
||||
params.add(str("p%d_" % (i + 1) + "l_sigma"), 0.2, True, 0, 5, None)
|
||||
params.add(str("p%d_" % (i + 1) + "fraction"), 0.5, True, 0, 1, None)
|
||||
# add parameters
|
||||
|
||||
result = mod.fit(
|
||||
y, params, weights=[np.abs(1 / y_err[i]) for i in range(len(y_err))], x=x, calc_covar=True
|
||||
)
|
||||
|
||||
comps = result.eval_components()
|
||||
|
||||
reportstring = list()
|
||||
for keys in result.params:
|
||||
if result.params[keys].value is not None:
|
||||
str2 = np.around(result.params[keys].value, 3)
|
||||
else:
|
||||
str2 = 0
|
||||
if result.params[keys].stderr is not None:
|
||||
str3 = np.around(result.params[keys].stderr, 3)
|
||||
else:
|
||||
str3 = 0
|
||||
reportstring.append("%s = %2.3f +/- %2.3f" % (keys, str2, str3))
|
||||
|
||||
reportstring = "\n".join(reportstring)
|
||||
|
||||
plt.figure(figsize=(20, 10))
|
||||
plt.plot(x, result.best_fit, "k-", label="Best fit")
|
||||
|
||||
plt.plot(x, y, "b-", label="Original data")
|
||||
plt.plot(x, comps["background"], "g--", label="Line component")
|
||||
for i in range(len(peaks)):
|
||||
plt.plot(
|
||||
x,
|
||||
comps[str("p%d_" % (i + 1))],
|
||||
"r--",
|
||||
)
|
||||
plt.fill_between(x, comps[str("p%d_" % (i + 1))], alpha=0.4, label=str("p%d_" % (i + 1)))
|
||||
plt.legend()
|
||||
plt.text(
|
||||
np.min(x),
|
||||
np.max(y),
|
||||
reportstring,
|
||||
fontsize=9,
|
||||
verticalalignment="top",
|
||||
)
|
||||
plt.title(str(peak_type))
|
||||
|
||||
plt.xlabel("Omega [deg]")
|
||||
plt.ylabel("Counts [a.u.]")
|
||||
plt.show()
|
||||
|
||||
print(result.fit_report())
|
@ -60,7 +60,12 @@ def read_detector_data(filepath):
|
||||
det_data["chi_angle"] = h5f["/entry1/sample/chi"][:] # ch
|
||||
det_data["phi_angle"] = h5f["/entry1/sample/phi"][:] # ph
|
||||
det_data["UB"] = h5f["/entry1/sample/UB"][:].reshape(3, 3)
|
||||
det_data["magnetic_field"] = h5f["/entry1/sample/magnetic_field"][:]
|
||||
det_data["temperature"] = h5f["/entry1/sample/temperature"][:]
|
||||
|
||||
# optional parameters
|
||||
if "/entry1/sample/magnetic_field" in h5f:
|
||||
det_data["magnetic_field"] = h5f["/entry1/sample/magnetic_field"][:]
|
||||
|
||||
if "/entry1/sample/temperature" in h5f:
|
||||
det_data["temperature"] = h5f["/entry1/sample/temperature"][:]
|
||||
|
||||
return det_data
|
||||
|
@ -1,12 +1,24 @@
|
||||
from load_1D import load_1D
|
||||
from ccl_dict_operation import add_dict
|
||||
import pandas as pd
|
||||
from mpl_toolkits.mplot3d import Axes3D # dont delete, otherwise waterfall wont work
|
||||
import matplotlib.pyplot as plt
|
||||
import matplotlib as mpl
|
||||
import numpy as np
|
||||
import pickle
|
||||
|
||||
import matplotlib as mpl
|
||||
import matplotlib.pyplot as plt
|
||||
import numpy as np
|
||||
import pandas as pd
|
||||
import scipy.io as sio
|
||||
import uncertainties as u
|
||||
from mpl_toolkits.mplot3d import Axes3D # dont delete, otherwise waterfall wont work
|
||||
import collections
|
||||
|
||||
from .ccl_io import load_1D
|
||||
|
||||
def create_tuples(x, y, y_err):
|
||||
"""creates tuples for sorting and merginng of the data
|
||||
Counts need to be normalized to monitor before"""
|
||||
t = list()
|
||||
for i in range(len(x)):
|
||||
tup = (x[i], y[i], y_err[i])
|
||||
t.append(tup)
|
||||
return t
|
||||
|
||||
|
||||
def load_dats(filepath):
|
||||
@ -37,45 +49,45 @@ def load_dats(filepath):
|
||||
if data_type == "txt":
|
||||
dict1 = add_dict(dict1, load_1D(file_list[i][0]))
|
||||
else:
|
||||
|
||||
dict1 = add_dict(dict1, load_1D(file_list[i]))
|
||||
dict1["scan"][i + 1]["params"] = {}
|
||||
if data_type == "txt":
|
||||
for x in range(len(col_names) - 1):
|
||||
dict1["scan"][i + 1]["params"][col_names[x + 1]] = file_list[i][x + 1]
|
||||
|
||||
dict1["scan"][i + 1]["params"][col_names[x + 1]] = float(file_list[i][x + 1])
|
||||
return dict1
|
||||
|
||||
|
||||
def create_dataframe(dict1):
|
||||
def create_dataframe(dict1, variables):
|
||||
"""Creates pandas dataframe from the dictionary
|
||||
:arg ccl like dictionary
|
||||
:return pandas dataframe"""
|
||||
# create dictionary to which we pull only wanted items before transforming it to pd.dataframe
|
||||
pull_dict = {}
|
||||
pull_dict["filenames"] = list()
|
||||
for key in dict1["scan"][1]["params"]:
|
||||
pull_dict[key] = list()
|
||||
pull_dict["temperature"] = list()
|
||||
pull_dict["mag_field"] = list()
|
||||
for keys in variables:
|
||||
for item in variables[keys]:
|
||||
pull_dict[item] = list()
|
||||
pull_dict["fit_area"] = list()
|
||||
pull_dict["int_area"] = list()
|
||||
pull_dict["om"] = list()
|
||||
pull_dict["Counts"] = list()
|
||||
|
||||
for keys in pull_dict:
|
||||
print(keys)
|
||||
|
||||
# populate the dict
|
||||
for keys in dict1["scan"]:
|
||||
if "file_of_origin" in dict1["scan"][keys]:
|
||||
pull_dict["filenames"].append(dict1["scan"][keys]["file_of_origin"].split("/")[-1])
|
||||
else:
|
||||
pull_dict["filenames"].append(dict1["meta"]["original_filename"].split("/")[-1])
|
||||
for key in dict1["scan"][keys]["params"]:
|
||||
pull_dict[str(key)].append(float(dict1["scan"][keys]["params"][key]))
|
||||
pull_dict["temperature"].append(dict1["scan"][keys]["temperature"])
|
||||
pull_dict["mag_field"].append(dict1["scan"][keys]["mag_field"])
|
||||
|
||||
pull_dict["fit_area"].append(dict1["scan"][keys]["fit"]["fit_area"])
|
||||
pull_dict["int_area"].append(dict1["scan"][keys]["fit"]["int_area"])
|
||||
pull_dict["om"].append(dict1["scan"][keys]["om"])
|
||||
pull_dict["Counts"].append(dict1["scan"][keys]["Counts"])
|
||||
for key in variables:
|
||||
for i in variables[key]:
|
||||
pull_dict[i].append(_finditem(dict1["scan"][keys], i))
|
||||
|
||||
return pd.DataFrame(data=pull_dict)
|
||||
|
||||
@ -144,7 +156,7 @@ def make_graph(data, sorting_parameter, style):
|
||||
|
||||
|
||||
def save_dict(obj, name):
|
||||
""" saves dictionary as pickle file in binary format
|
||||
"""saves dictionary as pickle file in binary format
|
||||
:arg obj - object to save
|
||||
:arg name - name of the file
|
||||
NOTE: path should be added later"""
|
||||
@ -200,3 +212,277 @@ def save_table(data, filetype, name, path=None):
|
||||
hdf.close()
|
||||
if filetype == "json":
|
||||
data.to_json((path + name + ".json"))
|
||||
|
||||
|
||||
def normalize(scan, monitor):
|
||||
"""Normalizes the measurement to monitor, checks if sigma exists, otherwise creates it
|
||||
:arg dict : dictionary to from which to tkae the scan
|
||||
:arg key : which scan to normalize from dict1
|
||||
:arg monitor : final monitor
|
||||
:return counts - normalized counts
|
||||
:return sigma - normalized sigma"""
|
||||
|
||||
counts = np.array(scan["Counts"])
|
||||
sigma = np.sqrt(counts) if "sigma" not in scan else scan["sigma"]
|
||||
monitor_ratio = monitor / scan["monitor"]
|
||||
scaled_counts = counts * monitor_ratio
|
||||
scaled_sigma = np.array(sigma) * monitor_ratio
|
||||
|
||||
return scaled_counts, scaled_sigma
|
||||
|
||||
|
||||
def merge(scan1, scan2, keep=True, monitor=100000):
|
||||
"""merges the two tuples and sorts them, if om value is same, Counts value is average
|
||||
averaging is propagated into sigma if dict1 == dict2, key[1] is deleted after merging
|
||||
:arg dict1 : dictionary to which measurement will be merged
|
||||
:arg dict2 : dictionary from which measurement will be merged
|
||||
:arg scand_dict_result : result of scan_dict after auto function
|
||||
:arg keep : if true, when monitors are same, does not change it, if flase, takes monitor
|
||||
always
|
||||
:arg monitor : final monitor after merging
|
||||
note: dict1 and dict2 can be same dict
|
||||
:return dict1 with merged scan"""
|
||||
|
||||
if keep:
|
||||
if scan1["monitor"] == scan2["monitor"]:
|
||||
monitor = scan1["monitor"]
|
||||
|
||||
# load om and Counts
|
||||
x1, x2 = scan1["om"], scan2["om"]
|
||||
cor_y1, y_err1 = normalize(scan1, monitor=monitor)
|
||||
cor_y2, y_err2 = normalize(scan2, monitor=monitor)
|
||||
# creates touples (om, Counts, sigma) for sorting and further processing
|
||||
tuple_list = create_tuples(x1, cor_y1, y_err1) + create_tuples(x2, cor_y2, y_err2)
|
||||
# Sort the list on om and add 0 0 0 tuple to the last position
|
||||
sorted_t = sorted(tuple_list, key=lambda tup: tup[0])
|
||||
sorted_t.append((0, 0, 0))
|
||||
om, Counts, sigma = [], [], []
|
||||
seen = list()
|
||||
for i in range(len(sorted_t) - 1):
|
||||
if sorted_t[i][0] not in seen:
|
||||
if sorted_t[i][0] != sorted_t[i + 1][0]:
|
||||
om = np.append(om, sorted_t[i][0])
|
||||
Counts = np.append(Counts, sorted_t[i][1])
|
||||
sigma = np.append(sigma, sorted_t[i][2])
|
||||
else:
|
||||
om = np.append(om, sorted_t[i][0])
|
||||
counts1, counts2 = sorted_t[i][1], sorted_t[i + 1][1]
|
||||
sigma1, sigma2 = sorted_t[i][2], sorted_t[i + 1][2]
|
||||
count_err1 = u.ufloat(counts1, sigma1)
|
||||
count_err2 = u.ufloat(counts2, sigma2)
|
||||
avg = (count_err1 + count_err2) / 2
|
||||
Counts = np.append(Counts, avg.n)
|
||||
sigma = np.append(sigma, avg.s)
|
||||
seen.append(sorted_t[i][0])
|
||||
else:
|
||||
continue
|
||||
scan1["om"] = om
|
||||
scan1["Counts"] = Counts
|
||||
scan1["sigma"] = sigma
|
||||
scan1["monitor"] = monitor
|
||||
print("merging done")
|
||||
|
||||
|
||||
def add_dict(dict1, dict2):
|
||||
"""adds two dictionaries, meta of the new is saved as meata+original_filename and
|
||||
measurements are shifted to continue with numbering of first dict
|
||||
:arg dict1 : dictionarry to add to
|
||||
:arg dict2 : dictionarry from which to take the measurements
|
||||
:return dict1 : combined dictionary
|
||||
Note: dict1 must be made from ccl, otherwise we would have to change the structure of loaded
|
||||
dat file"""
|
||||
try:
|
||||
if dict1["meta"]["zebra_mode"] != dict2["meta"]["zebra_mode"]:
|
||||
print("You are trying to add scans measured with different zebra modes")
|
||||
return
|
||||
# this is for the qscan case
|
||||
except KeyError:
|
||||
print("Zebra mode not specified")
|
||||
max_measurement_dict1 = max([keys for keys in dict1["scan"]])
|
||||
new_filenames = np.arange(
|
||||
max_measurement_dict1 + 1, max_measurement_dict1 + 1 + len(dict2["scan"])
|
||||
)
|
||||
new_meta_name = "meta" + str(dict2["meta"]["original_filename"])
|
||||
if new_meta_name not in dict1:
|
||||
for keys, name in zip(dict2["scan"], new_filenames):
|
||||
dict2["scan"][keys]["file_of_origin"] = str(dict2["meta"]["original_filename"])
|
||||
dict1["scan"][name] = dict2["scan"][keys]
|
||||
|
||||
dict1[new_meta_name] = dict2["meta"]
|
||||
else:
|
||||
raise KeyError(
|
||||
str(
|
||||
"The file %s has alredy been added to %s"
|
||||
% (dict2["meta"]["original_filename"], dict1["meta"]["original_filename"])
|
||||
)
|
||||
)
|
||||
return dict1
|
||||
|
||||
|
||||
def auto(dict):
|
||||
"""takes just unique tuples from all tuples in dictionary returend by scan_dict
|
||||
intendet for automatic merge if you doesent want to specify what scans to merge together
|
||||
args: dict - dictionary from scan_dict function
|
||||
:return dict - dict without repetitions"""
|
||||
for keys in dict:
|
||||
tuple_list = dict[keys]
|
||||
new = list()
|
||||
for i in range(len(tuple_list)):
|
||||
if tuple_list[0][0] == tuple_list[i][0]:
|
||||
new.append(tuple_list[i])
|
||||
dict[keys] = new
|
||||
return dict
|
||||
|
||||
|
||||
def scan_dict(dict, precision=0.5):
|
||||
"""scans dictionary for duplicate angles indexes
|
||||
:arg dict : dictionary to scan
|
||||
:arg precision : in deg, sometimes angles are zero so its easier this way, instead of
|
||||
checking zero division
|
||||
:return dictionary with matching scans, if there are none, the dict is empty
|
||||
note: can be checked by "not d", true if empty
|
||||
"""
|
||||
|
||||
if dict["meta"]["zebra_mode"] == "bi":
|
||||
angles = ["twotheta_angle", "omega_angle", "chi_angle", "phi_angle"]
|
||||
elif dict["meta"]["zebra_mode"] == "nb":
|
||||
angles = ["gamma_angle", "omega_angle", "nu_angle"]
|
||||
else:
|
||||
print("Unknown zebra mode")
|
||||
return
|
||||
|
||||
d = {}
|
||||
for i in dict["scan"]:
|
||||
for j in dict["scan"]:
|
||||
if dict["scan"][i] != dict["scan"][j]:
|
||||
itup = list()
|
||||
for k in angles:
|
||||
itup.append(abs(abs(dict["scan"][i][k]) - abs(dict["scan"][j][k])))
|
||||
|
||||
if all(i <= precision for i in itup):
|
||||
print(itup)
|
||||
print([dict["scan"][i][k] for k in angles])
|
||||
print([dict["scan"][j][k] for k in angles])
|
||||
if str([np.around(dict["scan"][i][k], 0) for k in angles]) not in d:
|
||||
d[str([np.around(dict["scan"][i][k], 0) for k in angles])] = list()
|
||||
d[str([np.around(dict["scan"][i][k], 0) for k in angles])].append((i, j))
|
||||
else:
|
||||
d[str([np.around(dict["scan"][i][k], 0) for k in angles])].append((i, j))
|
||||
|
||||
else:
|
||||
pass
|
||||
|
||||
else:
|
||||
continue
|
||||
|
||||
return d
|
||||
|
||||
|
||||
def _finditem(obj, key):
|
||||
if key in obj:
|
||||
return obj[key]
|
||||
for k, v in obj.items():
|
||||
if isinstance(v, dict):
|
||||
item = _finditem(v, key)
|
||||
if item is not None:
|
||||
return item
|
||||
|
||||
|
||||
def most_common(lst):
|
||||
return max(set(lst), key=lst.count)
|
||||
|
||||
|
||||
def variables(dictionary):
|
||||
"""Funcrion to guess what variables will be used in the param study
|
||||
i call pripary variable the one the array like variable, usually omega
|
||||
and secondary the slicing variable, different for each scan,for example temperature"""
|
||||
# find all variables that are in all scans
|
||||
stdev_precision = 0.05
|
||||
all_vars = list()
|
||||
for keys in dictionary["scan"]:
|
||||
all_vars.append([key for key in dictionary["scan"][keys] if key != "params"])
|
||||
if dictionary["scan"][keys]["params"]:
|
||||
all_vars.append(key for key in dictionary["scan"][keys]["params"])
|
||||
|
||||
all_vars = [i for sublist in all_vars for i in sublist]
|
||||
# get the ones that are in all scans
|
||||
b = collections.Counter(all_vars)
|
||||
inall = [key for key in b if b[key] == len(dictionary["scan"])]
|
||||
# delete those that are obviously wrong
|
||||
wrong = [
|
||||
"NP",
|
||||
"Counts",
|
||||
"Monitor1",
|
||||
"Monitor2",
|
||||
"Monitor3",
|
||||
"h_index",
|
||||
"l_index",
|
||||
"k_index",
|
||||
"n_points",
|
||||
"monitor",
|
||||
"Time",
|
||||
"omega_angle",
|
||||
"twotheta_angle",
|
||||
"chi_angle",
|
||||
"phi_angle",
|
||||
"nu_angle",
|
||||
]
|
||||
inall_red = [i for i in inall if i not in wrong]
|
||||
|
||||
# check for primary variable, needs to be list, we dont suspect the
|
||||
# primary variable be as a parameter (be in scan[params])
|
||||
primary_candidates = list()
|
||||
for key in dictionary["scan"]:
|
||||
for i in inall_red:
|
||||
if isinstance(_finditem(dictionary["scan"][key], i), list):
|
||||
if np.std(_finditem(dictionary["scan"][key], i)) > stdev_precision:
|
||||
primary_candidates.append(i)
|
||||
# check which of the primary are in every scan
|
||||
primary_candidates = collections.Counter(primary_candidates)
|
||||
second_round_primary_candidates = [
|
||||
key for key in primary_candidates if primary_candidates[key] == len(dictionary["scan"])
|
||||
]
|
||||
|
||||
if len(second_round_primary_candidates) == 1:
|
||||
print("We've got a primary winner!", second_round_primary_candidates)
|
||||
else:
|
||||
print("Still not sure with primary:(", second_round_primary_candidates)
|
||||
|
||||
# check for secondary variable, we suspect a float\int or not changing array
|
||||
# we dont need to check for primary ones
|
||||
secondary_candidates = [i for i in inall_red if i not in second_round_primary_candidates]
|
||||
# print("secondary candidates", secondary_candidates)
|
||||
# select arrays and floats and ints
|
||||
second_round_secondary_candidates = list()
|
||||
for key in dictionary["scan"]:
|
||||
for i in secondary_candidates:
|
||||
if isinstance(_finditem(dictionary["scan"][key], i), float):
|
||||
second_round_secondary_candidates.append(i)
|
||||
elif isinstance(_finditem(dictionary["scan"][key], i), int):
|
||||
second_round_secondary_candidates.append(i)
|
||||
elif isinstance(_finditem(dictionary["scan"][key], i), list):
|
||||
if np.std(_finditem(dictionary["scan"][key], i)) < stdev_precision:
|
||||
second_round_secondary_candidates.append(i)
|
||||
|
||||
second_round_secondary_candidates = collections.Counter(second_round_secondary_candidates)
|
||||
second_round_secondary_candidates = [
|
||||
key
|
||||
for key in second_round_secondary_candidates
|
||||
if second_round_secondary_candidates[key] == len(dictionary["scan"])
|
||||
]
|
||||
# print("secondary candidates after second round", second_round_secondary_candidates)
|
||||
# now we check if they vary between the scans
|
||||
third_round_sec_candidates = list()
|
||||
for i in second_round_secondary_candidates:
|
||||
check_array = list()
|
||||
for keys in dictionary["scan"]:
|
||||
check_array.append(np.average(_finditem(dictionary["scan"][keys], i)))
|
||||
# print(i, check_array, np.std(check_array))
|
||||
if np.std(check_array) > stdev_precision:
|
||||
third_round_sec_candidates.append(i)
|
||||
if len(third_round_sec_candidates) == 1:
|
||||
print("We've got a secondary winner!", third_round_sec_candidates)
|
||||
else:
|
||||
print("Still not sure with secondary :(", third_round_sec_candidates)
|
||||
|
||||
return {"primary": second_round_primary_candidates, "secondary": third_round_sec_candidates}
|
||||
|
Reference in New Issue
Block a user