30 Commits
0.1.1 ... 0.1.2

Author SHA1 Message Date
b2fc2d604a Updating for version 0.1.2 2020-11-05 17:27:42 +01:00
78096efcef Simplify ccl_io functions 2020-11-05 17:23:07 +01:00
5b0f97959e Simplify lorentz correction calculation 2020-11-05 15:48:02 +01:00
8fb1c5f247 Fix lorentz correction 2020-11-05 15:00:07 +01:00
58641ab94f Fix #17 2020-11-05 14:55:25 +01:00
a6bcb8ffa1 Strip proposal number string 2020-11-02 16:28:17 +01:00
7b6e6bf396 Update prints in fitting functions 2020-11-02 16:20:02 +01:00
45f295fcf8 Add pyzebra handler
* allow user to specify anatric path
2020-11-02 15:41:15 +01:00
3c58fd2102 Add Lorentz Correction toggle 2020-11-02 13:54:38 +01:00
abbaded278 Allow direct edit of export flag in scan_table 2020-11-02 12:07:24 +01:00
fbe992c901 Deduplicate database initialization code 2020-11-02 10:50:07 +01:00
dec282d1b7 Simplify peakfind and fit params handling 2020-11-02 10:31:28 +01:00
4429823629 Simplify scan selection updates 2020-11-02 10:15:47 +01:00
80fddb514a Avoid selection of multiple indicies 2020-11-02 09:40:48 +01:00
cfe9832c1e Allow projection colormap values to be floats 2020-10-30 16:23:00 +01:00
fd942672df Rename number_of_measurements -> n_points 2020-10-30 15:58:47 +01:00
60cb733ca7 Remove ccl_dict_operations.py 2020-10-30 15:58:47 +01:00
7c2ecef56d Fix #16
Added a line that in case of not running the peak finder prior to the fit creates empty list of peak positions, ergo it behaves like no peak scenario
2020-10-30 15:53:25 +01:00
468f33e606 Update ccl_io.py
Added try except for mag_field, since some of the data dont have this value and script fails.
2020-10-30 14:10:01 +01:00
dbc643aba9 Update param_study_moduls.py
Updated the create_dataframe and added function called variables, which tries to decide which variables to plot in parametric study and q scans. Works good for primary variable (usually om), and reduces the secondary (slice variable, temperature, mag.field,...) variables to a few candidates from which one has to be picked. In one set for param study, it identified all parameters correctly, in q scan, the temperature varied as well as H index, so technically both could be used, but only one makes sense and that will have to be picked by user.
2020-10-30 11:45:24 +01:00
0856705024 Update ccl_findpeaks.py
Added one more parameter "variable", so we can use the function also for other scans than omega, will be necessary in param study and should not influence the ccl integration hopefully.
2020-10-30 11:30:37 +01:00
ce608f1b49 Allow manual data removal from export 2020-10-27 14:48:58 +01:00
3eaf54eda3 Combine comm_export and load_1D modules 2020-10-27 13:24:31 +01:00
a496267a9d Simplify check for hkl/real indices 2020-10-27 11:55:56 +01:00
1a3ebfbcbd Keep scan_number as a part of scan dict 2020-10-27 11:22:06 +01:00
7bcb23c1bd Temporary bug fix
Rest in the email.
2020-10-26 21:05:27 +01:00
b28fe39bbb Introduce experimental merge of 2 datasets 2020-10-26 16:47:28 +01:00
42c6e6b921 Fix imports and indentation 2020-10-26 16:37:03 +01:00
dba2dc6149 Add bin size widget 2020-10-26 15:54:49 +01:00
a0c9b0162b Add pan/zoom tools 2020-10-26 15:22:52 +01:00
14 changed files with 510 additions and 895 deletions

View File

@ -1,10 +1,9 @@
import pyzebra.ccl_dict_operation
from pyzebra.anatric import *
from pyzebra.ccl_findpeaks import ccl_findpeaks
from pyzebra.comm_export import export_comm
from pyzebra.fit2 import fitccl
from pyzebra.h5 import *
from pyzebra.load_1D import load_1D, parse_1D
from pyzebra.ccl_io import load_1D, parse_1D, export_comm
from pyzebra.param_study_moduls import add_dict, auto, merge, scan_dict
from pyzebra.xtal import *
__version__ = "0.1.1"
__version__ = "0.1.2"

View File

@ -2,7 +2,6 @@ import subprocess
import xml.etree.ElementTree as ET
ANATRIC_PATH = "/afs/psi.ch/project/sinq/rhel7/bin/anatric"
DATA_FACTORY_IMPLEMENTATION = [
"trics",
"morph",
@ -24,8 +23,8 @@ REFLECTION_PRINTER_FORMATS = [
ALGORITHMS = ["adaptivemaxcog", "adaptivedynamic"]
def anatric(config_file):
subprocess.run([ANATRIC_PATH, config_file], check=True)
def anatric(config_file, anatric_path="/afs/psi.ch/project/sinq/rhel7/bin/anatric"):
subprocess.run([anatric_path, config_file], check=True)
class AnatricConfig:

View File

@ -1,4 +1,3 @@
import argparse
import logging
import sys
from io import StringIO
@ -11,14 +10,8 @@ import panel_ccl_integrate
import panel_hdf_anatric
import panel_hdf_viewer
parser = argparse.ArgumentParser(
prog="pyzebra", formatter_class=argparse.ArgumentDefaultsHelpFormatter
)
args = parser.parse_args()
doc = curdoc()
doc.title = "pyzebra"
sys.stdout = StringIO()
stdout_textareainput = TextAreaInput(title="print output:", height=150)
@ -26,7 +19,7 @@ stdout_textareainput = TextAreaInput(title="print output:", height=150)
bokeh_stream = StringIO()
bokeh_handler = logging.StreamHandler(bokeh_stream)
bokeh_handler.setFormatter(logging.Formatter(logging.BASIC_FORMAT))
bokeh_logger = logging.getLogger('bokeh')
bokeh_logger = logging.getLogger("bokeh")
bokeh_logger.addHandler(bokeh_handler)
bokeh_log_textareainput = TextAreaInput(title="server output:", height=150)

30
pyzebra/app/handler.py Normal file
View File

@ -0,0 +1,30 @@
from bokeh.application.handlers import Handler
class PyzebraHandler(Handler):
"""Provides a mechanism for generic bokeh applications to build up new streamvis documents.
"""
def __init__(self, anatric_path):
"""Initialize a pyzebra handler for bokeh applications.
Args:
args (Namespace): Command line parsed arguments.
"""
super().__init__() # no-op
self.anatric_path = anatric_path
def modify_document(self, doc):
"""Modify an application document with pyzebra specific features.
Args:
doc (Document) : A bokeh Document to update in-place
Returns:
Document
"""
doc.title = "pyzebra"
doc.anatric_path = self.anatric_path
return doc

View File

@ -2,6 +2,7 @@ import base64
import io
import os
import tempfile
from copy import deepcopy
import numpy as np
from bokeh.layouts import column, row
@ -9,6 +10,7 @@ from bokeh.models import (
Asterisk,
BasicTicker,
Button,
CheckboxEditor,
ColumnDataSource,
CustomJS,
DataRange1d,
@ -19,8 +21,10 @@ from bokeh.models import (
Line,
LinearAxis,
Panel,
PanTool,
Plot,
RadioButtonGroup,
ResetTool,
Scatter,
Select,
Spacer,
@ -30,6 +34,7 @@ from bokeh.models import (
TextAreaInput,
TextInput,
Toggle,
WheelZoomTool,
Whisker,
)
@ -60,7 +65,7 @@ def create():
js_data = ColumnDataSource(data=dict(cont=[], ext=[]))
def proposal_textinput_callback(_attr, _old, new):
ccl_path = os.path.join(PROPOSAL_PATH, new)
ccl_path = os.path.join(PROPOSAL_PATH, new.strip())
ccl_file_list = []
for file in os.listdir(ccl_path):
if file.endswith(".ccl"):
@ -71,23 +76,30 @@ def create():
proposal_textinput = TextInput(title="Enter proposal number:", default_size=145)
proposal_textinput.on_change("value", proposal_textinput_callback)
def ccl_file_select_callback(_attr, _old, new):
nonlocal det_data
with open(new) as file:
_, ext = os.path.splitext(new)
det_data = pyzebra.parse_1D(file, ext)
def _init_datatable():
scan_list = list(det_data["scan"].keys())
hkl = [
f'{int(m["h_index"])} {int(m["k_index"])} {int(m["l_index"])}'
for m in det_data["scan"].values()
]
scan_table_source.data.update(
scan=scan_list, hkl=hkl, peaks=[0] * len(scan_list), fit=[0] * len(scan_list)
scan=scan_list,
hkl=hkl,
peaks=[0] * len(scan_list),
fit=[0] * len(scan_list),
export=[True] * len(scan_list),
)
scan_table_source.selected.indices = []
scan_table_source.selected.indices = [0]
def ccl_file_select_callback(_attr, _old, new):
nonlocal det_data
with open(new) as file:
_, ext = os.path.splitext(new)
det_data = pyzebra.parse_1D(file, ext)
_init_datatable()
ccl_file_select = Select(title="Available .ccl files")
ccl_file_select.on_change("value", ccl_file_select_callback)
@ -97,30 +109,35 @@ def create():
_, ext = os.path.splitext(upload_button.filename)
det_data = pyzebra.parse_1D(file, ext)
scan_list = list(det_data["scan"].keys())
hkl = [
f'{int(m["h_index"])} {int(m["k_index"])} {int(m["l_index"])}'
for m in det_data["scan"].values()
]
scan_table_source.data.update(
scan=scan_list, hkl=hkl, peaks=[0] * len(scan_list), fit=[0] * len(scan_list)
)
scan_table_source.selected.indices = []
scan_table_source.selected.indices = [0]
_init_datatable()
upload_button = FileInput(accept=".ccl")
upload_button.on_change("value", upload_button_callback)
def append_upload_button_callback(_attr, _old, new):
nonlocal det_data
with io.StringIO(base64.b64decode(new).decode()) as file:
_, ext = os.path.splitext(append_upload_button.filename)
append_data = pyzebra.parse_1D(file, ext)
added = pyzebra.add_dict(det_data, append_data)
scan_result = pyzebra.auto(pyzebra.scan_dict(added))
det_data = pyzebra.merge(added, added, scan_result)
_init_datatable()
append_upload_button = FileInput(accept=".ccl,.dat")
append_upload_button.on_change("value", append_upload_button_callback)
def _update_table():
num_of_peaks = [scan.get("num_of_peaks", 0) for scan in det_data["scan"].values()]
fit_ok = [(1 if "fit" in scan else 0) for scan in det_data["scan"].values()]
scan_table_source.data.update(peaks=num_of_peaks, fit=fit_ok)
def _update_plot(ind):
def _update_plot(scan):
nonlocal peak_pos_textinput_lock
peak_pos_textinput_lock = True
scan = det_data["scan"][ind]
y = scan["Counts"]
x = scan["om"]
@ -145,18 +162,17 @@ def create():
fit = scan.get("fit")
if fit is not None:
x = scan["fit"]["x_fit"]
plot_gauss_source.data.update(x=x, y=scan["fit"]["comps"]["gaussian"])
plot_bkg_source.data.update(x=x, y=scan["fit"]["comps"]["background"])
params = fit["result"].params
fit_output_textinput.value = (
"%s \n"
"Gaussian: centre = %9.4f, sigma = %9.4f, area = %9.4f \n"
"background: slope = %9.4f, intercept = %9.4f \n"
"Int. area = %9.4f +/- %9.4f \n"
"fit area = %9.4f +/- %9.4f \n"
"ratio((fit-int)/fit) = %9.4f"
% (
ind,
params["g_cen"].value,
params["g_width"].value,
params["g_amp"].value,
@ -188,13 +204,7 @@ def create():
numfit_max_span.location = None
# Main plot
plot = Plot(
x_range=DataRange1d(),
y_range=DataRange1d(),
plot_height=400,
plot_width=700,
toolbar_location=None,
)
plot = Plot(x_range=DataRange1d(), y_range=DataRange1d(), plot_height=400, plot_width=700)
plot.add_layout(LinearAxis(axis_label="Counts"), place="left")
plot.add_layout(LinearAxis(axis_label="Omega"), place="below")
@ -226,12 +236,28 @@ def create():
numfit_max_span = Span(location=None, dimension="height", line_dash="dashed")
plot.add_layout(numfit_max_span)
# Scan select
def scan_table_callback(_attr, _old, new):
if new:
_update_plot(scan_table_source.data["scan"][new[-1]])
plot.add_tools(PanTool(), WheelZoomTool(), ResetTool())
plot.toolbar.logo = None
scan_table_source = ColumnDataSource(dict(scan=[], hkl=[], peaks=[], fit=[]))
# Scan select
def scan_table_select_callback(_attr, old, new):
if not new:
# skip empty selections
return
# Avoid selection of multiple indicies (via Shift+Click or Ctrl+Click)
if len(new) > 1:
# drop selection to the previous one
scan_table_source.selected.indices = old
return
if len(old) > 1:
# skip unnecessary update caused by selection drop
return
_update_plot(det_data["scan"][scan_table_source.data["scan"][new[0]]])
scan_table_source = ColumnDataSource(dict(scan=[], hkl=[], peaks=[], fit=[], export=[]))
scan_table = DataTable(
source=scan_table_source,
columns=[
@ -239,25 +265,30 @@ def create():
TableColumn(field="hkl", title="hkl"),
TableColumn(field="peaks", title="Peaks"),
TableColumn(field="fit", title="Fit"),
TableColumn(field="export", title="Export", editor=CheckboxEditor()),
],
width=200,
width=250,
index_position=None,
editable=True,
)
scan_table_source.selected.on_change("indices", scan_table_callback)
scan_table_source.selected.on_change("indices", scan_table_select_callback)
def _get_selected_scan():
selected_index = scan_table_source.selected.indices[0]
selected_scan_id = scan_table_source.data["scan"][selected_index]
return det_data["scan"][selected_scan_id]
def peak_pos_textinput_callback(_attr, _old, new):
if new is not None and not peak_pos_textinput_lock:
sel_ind = scan_table_source.selected.indices[-1]
scan_name = scan_table_source.data["scan"][sel_ind]
scan = det_data["scan"][scan_name]
scan = _get_selected_scan()
scan["num_of_peaks"] = 1
peak_ind = (np.abs(scan["om"] - float(new))).argmin()
scan["peak_indexes"] = np.array([peak_ind], dtype=np.int64)
scan["peak_heights"] = np.array([scan["smooth_peaks"][peak_ind]])
_update_table()
_update_plot(scan_name)
_update_plot(scan)
peak_pos_textinput = TextInput(title="Peak position:", default_size=145)
peak_pos_textinput.on_change("value", peak_pos_textinput_callback)
@ -322,30 +353,8 @@ def create():
fit_output_textinput = TextAreaInput(title="Fit results:", width=450, height=400)
def peakfind_all_button_callback():
for scan in det_data["scan"].values():
pyzebra.ccl_findpeaks(
scan,
int_threshold=peak_int_ratio_spinner.value,
prominence=peak_prominence_spinner.value,
smooth=smooth_toggle.active,
window_size=window_size_spinner.value,
poly_order=poly_order_spinner.value,
)
_update_table()
sel_ind = scan_table_source.selected.indices[-1]
_update_plot(scan_table_source.data["scan"][sel_ind])
peakfind_all_button = Button(label="Peak Find All", button_type="primary", default_size=145)
peakfind_all_button.on_click(peakfind_all_button_callback)
def peakfind_button_callback():
sel_ind = scan_table_source.selected.indices[-1]
scan = scan_table_source.data["scan"][sel_ind]
pyzebra.ccl_findpeaks(
det_data["scan"][scan],
def _get_peakfind_params():
return dict(
int_threshold=peak_int_ratio_spinner.value,
prominence=peak_prominence_spinner.value,
smooth=smooth_toggle.active,
@ -353,61 +362,29 @@ def create():
poly_order=poly_order_spinner.value,
)
def peakfind_all_button_callback():
peakfind_params = _get_peakfind_params()
for scan in det_data["scan"].values():
pyzebra.ccl_findpeaks(scan, **peakfind_params)
_update_table()
_update_plot(_get_selected_scan())
peakfind_all_button = Button(label="Peak Find All", button_type="primary", default_size=145)
peakfind_all_button.on_click(peakfind_all_button_callback)
def peakfind_button_callback():
scan = _get_selected_scan()
pyzebra.ccl_findpeaks(scan, **_get_peakfind_params())
_update_table()
_update_plot(scan)
peakfind_button = Button(label="Peak Find Current", default_size=145)
peakfind_button.on_click(peakfind_button_callback)
def fit_all_button_callback():
for scan in det_data["scan"].values():
pyzebra.fitccl(
scan,
guess=[
centre_guess.value,
sigma_guess.value,
ampl_guess.value,
slope_guess.value,
offset_guess.value,
],
vary=[
centre_vary.active,
sigma_vary.active,
ampl_vary.active,
slope_vary.active,
offset_vary.active,
],
constraints_min=[
centre_min.value,
sigma_min.value,
ampl_min.value,
slope_min.value,
offset_min.value,
],
constraints_max=[
centre_max.value,
sigma_max.value,
ampl_max.value,
slope_max.value,
offset_max.value,
],
numfit_min=integ_from.value,
numfit_max=integ_to.value,
)
sel_ind = scan_table_source.selected.indices[-1]
_update_plot(scan_table_source.data["scan"][sel_ind])
_update_table()
fit_all_button = Button(label="Fit All", button_type="primary", default_size=145)
fit_all_button.on_click(fit_all_button_callback)
def fit_button_callback():
sel_ind = scan_table_source.selected.indices[-1]
scan = scan_table_source.data["scan"][sel_ind]
pyzebra.fitccl(
det_data["scan"][scan],
def _get_fit_params():
return dict(
guess=[
centre_guess.value,
sigma_guess.value,
@ -438,8 +415,25 @@ def create():
],
numfit_min=integ_from.value,
numfit_max=integ_to.value,
binning=bin_size_spinner.value,
)
def fit_all_button_callback():
fit_params = _get_fit_params()
for scan in det_data["scan"].values():
# fit_params are updated inplace within `fitccl`
pyzebra.fitccl(scan, **deepcopy(fit_params))
_update_plot(_get_selected_scan())
_update_table()
fit_all_button = Button(label="Fit All", button_type="primary", default_size=145)
fit_all_button.on_click(fit_all_button_callback)
def fit_button_callback():
scan = _get_selected_scan()
pyzebra.fitccl(scan, **_get_fit_params())
_update_plot(scan)
_update_table()
@ -454,6 +448,10 @@ def create():
)
area_method_radiobutton.on_change("active", area_method_radiobutton_callback)
bin_size_spinner = Spinner(title="Bin size:", value=1, low=1, step=1, default_size=145)
lorentz_toggle = Toggle(label="Lorentz Correction", default_size=145)
preview_output_textinput = TextAreaInput(title="Export file preview:", width=450, height=400)
def preview_output_button_callback():
@ -464,7 +462,11 @@ def create():
with tempfile.TemporaryDirectory() as temp_dir:
temp_file = temp_dir + "/temp"
pyzebra.export_comm(det_data, temp_file)
export_data = deepcopy(det_data)
for s, export in zip(scan_table_source.data["scan"], scan_table_source.data["export"]):
if not export:
del export_data["scan"][s]
pyzebra.export_comm(export_data, temp_file, lorentz=lorentz_toggle.active)
with open(f"{temp_file}{ext}") as f:
preview_output_textinput.value = f.read()
@ -480,7 +482,11 @@ def create():
with tempfile.TemporaryDirectory() as temp_dir:
temp_file = temp_dir + "/temp"
pyzebra.export_comm(det_data, temp_file)
export_data = deepcopy(det_data)
for s, export in zip(scan_table_source.data["scan"], scan_table_source.data["export"]):
if not export:
del export_data["scan"][s]
pyzebra.export_comm(export_data, temp_file, lorentz=lorentz_toggle.active)
with open(f"{temp_file}{ext}") as f:
output_content = f.read()
@ -530,6 +536,7 @@ def create():
Spacer(width=20),
column(
row(integ_from, integ_to),
row(bin_size_spinner, column(Spacer(height=19), lorentz_toggle)),
row(fitparam_reset_button, area_method_radiobutton),
row(fit_button, fit_all_button),
),
@ -538,9 +545,15 @@ def create():
export_layout = column(preview_output_textinput, row(preview_output_button, save_button))
upload_div = Div(text="Or upload .ccl file:")
append_upload_div = Div(text="append extra .ccl/.dat files:")
tab_layout = column(
row(proposal_textinput, ccl_file_select),
row(column(Spacer(height=5), upload_div), upload_button),
row(
column(Spacer(height=5), upload_div),
upload_button,
column(Spacer(height=5), append_upload_div),
append_upload_button,
),
row(scan_table, plot, Spacer(width=30), fit_output_textinput, export_layout),
row(findpeak_controls, Spacer(width=30), fitpeak_controls),
)

View File

@ -21,6 +21,7 @@ from pyzebra.anatric import DATA_FACTORY_IMPLEMENTATION, REFLECTION_PRINTER_FORM
def create():
doc = curdoc()
config = pyzebra.AnatricConfig()
def _load_config_file(file):
@ -345,7 +346,7 @@ def create():
with tempfile.TemporaryDirectory() as temp_dir:
temp_file = temp_dir + "/temp.xml"
config.save_as(temp_file)
pyzebra.anatric(temp_file)
pyzebra.anatric(temp_file, anatric_path=doc.anatric_path)
with open(config.logfile) as f_log:
output_log.value = f_log.read()
@ -404,6 +405,6 @@ def create():
with open("debug.xml") as f_config:
output_config.value = f_config.read()
curdoc().add_periodic_callback(update_config, 1000)
doc.add_periodic_callback(update_config, 1000)
return Panel(child=tab_layout, title="hdf anatric")

View File

@ -74,8 +74,8 @@ def create():
image_source.data.update(image=[current_image])
if auto_toggle.active:
im_max = int(np.max(current_image))
im_min = int(np.min(current_image))
im_min = np.min(current_image)
im_max = np.max(current_image)
display_min_spinner.value = im_min
display_max_spinner.value = im_max
@ -107,8 +107,8 @@ def create():
overview_plot_y_image_source.data.update(image=[overview_y], dw=[n_y])
if proj_auto_toggle.active:
im_max = int(max(np.max(overview_x), np.max(overview_y)))
im_min = int(min(np.min(overview_x), np.min(overview_y)))
im_min = min(np.min(overview_x), np.min(overview_y))
im_max = max(np.max(overview_x), np.max(overview_y))
proj_display_min_spinner.value = im_min
proj_display_max_spinner.value = im_max
@ -407,7 +407,6 @@ def create():
radio_button_group = RadioButtonGroup(labels=["nb", "nb_bi"], active=0)
STEP = 1
# ---- colormap auto toggle button
def auto_toggle_callback(state):
if state:
@ -446,6 +445,7 @@ def create():
display_min_spinner = Spinner(
title="Min Value:",
low=0,
high=1 - STEP,
value=0,
step=STEP,
@ -454,6 +454,7 @@ def create():
)
display_min_spinner.on_change("value", display_min_spinner_callback)
PROJ_STEP = 0.1
# ---- proj colormap auto toggle button
def proj_auto_toggle_callback(state):
if state:
@ -472,15 +473,15 @@ def create():
# ---- proj colormap display max value
def proj_display_max_spinner_callback(_attr, _old_value, new_value):
proj_display_min_spinner.high = new_value - STEP
proj_display_min_spinner.high = new_value - PROJ_STEP
overview_plot_x_image_glyph.color_mapper.high = new_value
overview_plot_y_image_glyph.color_mapper.high = new_value
proj_display_max_spinner = Spinner(
title="Max Value:",
low=0 + STEP,
low=0 + PROJ_STEP,
value=1,
step=STEP,
step=PROJ_STEP,
disabled=proj_auto_toggle.active,
default_size=80,
)
@ -488,15 +489,16 @@ def create():
# ---- proj colormap display min value
def proj_display_min_spinner_callback(_attr, _old_value, new_value):
proj_display_max_spinner.low = new_value + STEP
proj_display_max_spinner.low = new_value + PROJ_STEP
overview_plot_x_image_glyph.color_mapper.low = new_value
overview_plot_y_image_glyph.color_mapper.low = new_value
proj_display_min_spinner = Spinner(
title="Min Value:",
high=1 - STEP,
low=0,
high=1 - PROJ_STEP,
value=0,
step=STEP,
step=PROJ_STEP,
disabled=proj_auto_toggle.active,
default_size=80,
)

View File

@ -1,513 +0,0 @@
import numpy as np
import uncertainties as u
from .fit2 import create_uncertanities
def add_dict(dict1, dict2):
"""adds two dictionaries, meta of the new is saved as meata+original_filename and
measurements are shifted to continue with numbering of first dict
:arg dict1 : dictionarry to add to
:arg dict2 : dictionarry from which to take the measurements
:return dict1 : combined dictionary
Note: dict1 must be made from ccl, otherwise we would have to change the structure of loaded
dat file"""
max_measurement_dict1 = max([int(str(keys)[1:]) for keys in dict1["scan"]])
if dict2["meta"]["data_type"] == ".ccl":
new_filenames = [
"M" + str(x + max_measurement_dict1)
for x in [int(str(keys)[1:]) for keys in dict2["scan"]]
]
new_meta_name = "meta" + str(dict2["meta"]["original_filename"])
if new_meta_name not in dict1:
for keys, name in zip(dict2["scan"], new_filenames):
dict2["scan"][keys]["file_of_origin"] = str(dict2["meta"]["original_filename"])
dict1["scan"][name] = dict2["scan"][keys]
dict1[new_meta_name] = dict2["meta"]
else:
raise KeyError(
str(
"The file %s has alredy been added to %s"
% (dict2["meta"]["original_filename"], dict1["meta"]["original_filename"])
)
)
elif dict2["meta"]["data_type"] == ".dat":
d = {}
new_name = "M" + str(max_measurement_dict1 + 1)
hkl = dict2["meta"]["title"]
d["h_index"] = float(hkl.split()[-3])
d["k_index"] = float(hkl.split()[-2])
d["l_index"] = float(hkl.split()[-1])
d["number_of_measurements"] = len(dict2["scan"]["NP"])
d["om"] = dict2["scan"]["om"]
d["Counts"] = dict2["scan"]["Counts"]
d["monitor"] = dict2["scan"]["Monitor1"][0]
d["temperature"] = dict2["meta"]["temp"]
d["mag_field"] = dict2["meta"]["mf"]
d["omega_angle"] = dict2["meta"]["omega"]
dict1["scan"][new_name] = d
print(hkl.split())
for keys in d:
print(keys)
print("s")
return dict1
def auto(dict):
"""takes just unique tuples from all tuples in dictionary returend by scan_dict
intendet for automatic merge if you doesent want to specify what scans to merge together
args: dict - dictionary from scan_dict function
:return dict - dict without repetitions"""
for keys in dict:
tuple_list = dict[keys]
new = list()
for i in range(len(tuple_list)):
if tuple_list[0][0] == tuple_list[i][0]:
new.append(tuple_list[i])
dict[keys] = new
return dict
def scan_dict(dict):
"""scans dictionary for duplicate hkl indexes
:arg dict : dictionary to scan
:return dictionary with matching scans, if there are none, the dict is empty
note: can be checked by "not d", true if empty
"""
d = {}
for i in dict["scan"]:
for j in dict["scan"]:
if dict["scan"][str(i)] != dict["scan"][str(j)]:
itup = (
dict["scan"][str(i)]["h_index"],
dict["scan"][str(i)]["k_index"],
dict["scan"][str(i)]["l_index"],
)
jtup = (
dict["scan"][str(j)]["h_index"],
dict["scan"][str(j)]["k_index"],
dict["scan"][str(j)]["l_index"],
)
if itup != jtup:
pass
else:
if str(itup) not in d:
d[str(itup)] = list()
d[str(itup)].append((i, j))
else:
d[str(itup)].append((i, j))
else:
continue
return d
def compare_hkl(dict1, dict2):
"""Compares two dictionaries based on hkl indexes and return dictionary with str(h k l) as
key and tuple with keys to same scan in dict1 and dict2
:arg dict1 : first dictionary
:arg dict2 : second dictionary
:return d : dict with matches
example of one key: '0.0 0.0 -1.0 : ('M1', 'M9')' meaning that 001 hkl scan is M1 in
first dict and M9 in second"""
d = {}
dupl = 0
for keys in dict1["scan"]:
for key in dict2["scan"]:
if (
dict1["scan"][str(keys)]["h_index"] == dict2["scan"][str(key)]["h_index"]
and dict1["scan"][str(keys)]["k_index"] == dict2["scan"][str(key)]["k_index"]
and dict1["scan"][str(keys)]["l_index"] == dict2["scan"][str(key)]["l_index"]
):
if (
str(
(
str(dict1["scan"][str(keys)]["h_index"])
+ " "
+ str(dict1["scan"][str(keys)]["k_index"])
+ " "
+ str(dict1["scan"][str(keys)]["l_index"])
)
)
not in d
):
d[
str(
str(dict1["scan"][str(keys)]["h_index"])
+ " "
+ str(dict1["scan"][str(keys)]["k_index"])
+ " "
+ str(dict1["scan"][str(keys)]["l_index"])
)
] = (str(keys), str(key))
else:
dupl = dupl + 1
d[
str(
str(dict1["scan"][str(keys)]["h_index"])
+ " "
+ str(dict1["scan"][str(keys)]["k_index"])
+ " "
+ str(dict1["scan"][str(keys)]["l_index"])
+ "_dupl"
+ str(dupl)
)
] = (str(keys), str(key))
else:
continue
return d
def create_tuples(x, y, y_err):
"""creates tuples for sorting and merginng of the data
Counts need to be normalized to monitor before"""
t = list()
for i in range(len(x)):
tup = (x[i], y[i], y_err[i])
t.append(tup)
return t
def normalize(dict, key, monitor):
"""Normalizes the scan to monitor, checks if sigma exists, otherwise creates it
:arg dict : dictionary to from which to tkae the scan
:arg key : which scan to normalize from dict1
:arg monitor : final monitor
:return counts - normalized counts
:return sigma - normalized sigma"""
counts = np.array(dict["scan"][key]["Counts"])
sigma = np.sqrt(counts) if "sigma" not in dict["scan"][key] else dict["scan"][key]["sigma"]
monitor_ratio = monitor / dict["scan"][key]["monitor"]
scaled_counts = counts * monitor_ratio
scaled_sigma = np.array(sigma) * monitor_ratio
return scaled_counts, scaled_sigma
def merge(dict1, dict2, keys, auto=True, monitor=100000):
"""merges the two tuples and sorts them, if om value is same, Counts value is average
averaging is propagated into sigma if dict1 == dict2, key[1] is deleted after merging
:arg dict1 : dictionary to which scan will be merged
:arg dict2 : dictionary from which scan will be merged
:arg keys : tuple with key to dict1 and dict2
:arg auto : if true, when monitors are same, does not change it, if flase, takes monitor always
:arg monitor : final monitor after merging
note: dict1 and dict2 can be same dict
:return dict1 with merged scan"""
if auto:
if dict1["scan"][keys[0]]["monitor"] == dict2["scan"][keys[1]]["monitor"]:
monitor = dict1["scan"][keys[0]]["monitor"]
# load om and Counts
x1, x2 = dict1["scan"][keys[0]]["om"], dict2["scan"][keys[1]]["om"]
cor_y1, y_err1 = normalize(dict1, keys[0], monitor=monitor)
cor_y2, y_err2 = normalize(dict2, keys[1], monitor=monitor)
# creates touples (om, Counts, sigma) for sorting and further processing
tuple_list = create_tuples(x1, cor_y1, y_err1) + create_tuples(x2, cor_y2, y_err2)
# Sort the list on om and add 0 0 0 tuple to the last position
sorted_t = sorted(tuple_list, key=lambda tup: tup[0])
sorted_t.append((0, 0, 0))
om, Counts, sigma = [], [], []
seen = list()
for i in range(len(sorted_t) - 1):
if sorted_t[i][0] not in seen:
if sorted_t[i][0] != sorted_t[i + 1][0]:
om = np.append(om, sorted_t[i][0])
Counts = np.append(Counts, sorted_t[i][1])
sigma = np.append(sigma, sorted_t[i][2])
else:
om = np.append(om, sorted_t[i][0])
counts1, counts2 = sorted_t[i][1], sorted_t[i + 1][1]
sigma1, sigma2 = sorted_t[i][2], sorted_t[i + 1][2]
count_err1 = u.ufloat(counts1, sigma1)
count_err2 = u.ufloat(counts2, sigma2)
avg = (count_err1 + count_err2) / 2
Counts = np.append(Counts, avg.n)
sigma = np.append(sigma, avg.s)
seen.append(sorted_t[i][0])
else:
continue
if dict1 == dict2:
del dict1["scan"][keys[1]]
note = (
f"This scan was merged with scan {keys[1]} from "
f'file {dict2["meta"]["original_filename"]} \n'
)
if "notes" not in dict1["scan"][str(keys[0])]:
dict1["scan"][str(keys[0])]["notes"] = note
else:
dict1["scan"][str(keys[0])]["notes"] += note
dict1["scan"][keys[0]]["om"] = om
dict1["scan"][keys[0]]["Counts"] = Counts
dict1["scan"][keys[0]]["sigma"] = sigma
dict1["scan"][keys[0]]["monitor"] = monitor
print("merging done")
return dict1
def substract_measurement(dict1, dict2, keys, auto=True, monitor=100000):
"""Substracts two scan (scan key2 from dict2 from measurent key1 in dict1), expects om to be same
:arg dict1 : dictionary to which scan will be merged
:arg dict2 : dictionary from which scan will be merged
:arg keys : tuple with key to dict1 and dict2
:arg auto : if true, when monitors are same, does not change it, if flase, takes monitor always
:arg monitor : final monitor after merging
:returns d : dict1 with substracted Counts from dict2 and sigma that comes from the substraction"""
if len(dict1["scan"][keys[0]]["om"]) != len(dict2["scan"][keys[1]]["om"]):
raise ValueError("Omegas have different lengths, cannot be substracted")
if auto:
if dict1["scan"][keys[0]]["monitor"] == dict2["scan"][keys[1]]["monitor"]:
monitor = dict1["scan"][keys[0]]["monitor"]
cor_y1, y_err1 = normalize(dict1, keys[0], monitor=monitor)
cor_y2, y_err2 = normalize(dict2, keys[1], monitor=monitor)
dict1_count_err = create_uncertanities(cor_y1, y_err1)
dict2_count_err = create_uncertanities(cor_y2, y_err2)
res = np.subtract(dict1_count_err, dict2_count_err)
res_nom = []
res_err = []
for k in range(len(res)):
res_nom = np.append(res_nom, res[k].n)
res_err = np.append(res_err, res[k].s)
if len([num for num in res_nom if num < 0]) >= 0.3 * len(res_nom):
print(
f"Warning! percentage of negative numbers in scan subsracted {keys[0]} is "
f"{len([num for num in res_nom if num < 0]) / len(res_nom)}"
)
dict1["scan"][str(keys[0])]["Counts"] = res_nom
dict1["scan"][str(keys[0])]["sigma"] = res_err
dict1["scan"][str(keys[0])]["monitor"] = monitor
note = (
f'Scan {keys[1]} from file {dict2["meta"]["original_filename"]} '
f"was substracted from this scan \n"
)
if "notes" not in dict1["scan"][str(keys[0])]:
dict1["scan"][str(keys[0])]["notes"] = note
else:
dict1["scan"][str(keys[0])]["notes"] += note
return dict1
def compare_dict(dict1, dict2):
"""takes two ccl dictionaries and compare different values for each key
:arg dict1 : dictionary 1 (ccl)
:arg dict2 : dictionary 2 (ccl)
:returns warning : dictionary with keys from primary files (if they differ) with
information of how many scan differ and which ones differ
:returns report_string string comparing all different values respecively of measurements"""
if dict1["meta"]["data_type"] != dict2["meta"]["data_type"]:
print("select two dicts")
return
S = []
conflicts = {}
warnings = {}
comp = compare_hkl(dict1, dict2)
d1 = scan_dict(dict1)
d2 = scan_dict(dict2)
if not d1:
S.append("There are no duplicates in %s (dict1) \n" % dict1["meta"]["original_filename"])
else:
S.append(
"There are %d duplicates in %s (dict1) \n"
% (len(d1), dict1["meta"]["original_filename"])
)
warnings["Duplicates in dict1"] = list()
for keys in d1:
S.append("Measurements %s with hkl %s \n" % (d1[keys], keys))
warnings["Duplicates in dict1"].append(d1[keys])
if not d2:
S.append("There are no duplicates in %s (dict2) \n" % dict2["meta"]["original_filename"])
else:
S.append(
"There are %d duplicates in %s (dict2) \n"
% (len(d2), dict2["meta"]["original_filename"])
)
warnings["Duplicates in dict2"] = list()
for keys in d2:
S.append("Measurements %s with hkl %s \n" % (d2[keys], keys))
warnings["Duplicates in dict2"].append(d2[keys])
# compare meta
S.append("Different values in meta: \n")
different_meta = {
k: dict1["meta"][k]
for k in dict1["meta"]
if k in dict2["meta"] and dict1["meta"][k] != dict2["meta"][k]
}
exlude_meta_set = ["original_filename", "date", "title"]
for keys in different_meta:
if keys in exlude_meta_set:
continue
else:
if keys not in conflicts:
conflicts[keys] = 1
else:
conflicts[keys] = conflicts[keys] + 1
S.append(" Different values in %s \n" % str(keys))
S.append(" dict1: %s \n" % str(dict1["meta"][str(keys)]))
S.append(" dict2: %s \n" % str(dict2["meta"][str(keys)]))
# compare Measurements
S.append(
"Number of measurements in %s = %s \n"
% (dict1["meta"]["original_filename"], len(dict1["scan"]))
)
S.append(
"Number of measurements in %s = %s \n"
% (dict2["meta"]["original_filename"], len(dict2["scan"]))
)
S.append("Different values in Measurements:\n")
select_set = ["om", "Counts", "sigma"]
exlude_set = ["time", "Counts", "date", "notes"]
for keys1 in comp:
for key2 in dict1["scan"][str(comp[str(keys1)][0])]:
if key2 in exlude_set:
continue
if key2 not in select_set:
try:
if (
dict1["scan"][comp[str(keys1)][0]][str(key2)]
!= dict2["scan"][str(comp[str(keys1)][1])][str(key2)]
):
S.append(
"Scan value "
"%s"
", with hkl %s differs in meausrements %s and %s \n"
% (key2, keys1, comp[str(keys1)][0], comp[str(keys1)][1])
)
S.append(
" dict1: %s \n"
% str(dict1["scan"][comp[str(keys1)][0]][str(key2)])
)
S.append(
" dict2: %s \n"
% str(dict2["scan"][comp[str(keys1)][1]][str(key2)])
)
if key2 not in conflicts:
conflicts[key2] = {}
conflicts[key2]["amount"] = 1
conflicts[key2]["scan"] = str(comp[str(keys1)])
else:
conflicts[key2]["amount"] = conflicts[key2]["amount"] + 1
conflicts[key2]["scan"] = (
conflicts[key2]["scan"] + " " + (str(comp[str(keys1)]))
)
except KeyError as e:
print("Missing keys, some files were probably merged or substracted")
print(e.args)
else:
try:
comparison = list(dict1["scan"][comp[str(keys1)][0]][str(key2)]) == list(
dict2["scan"][comp[str(keys1)][1]][str(key2)]
)
if len(list(dict1["scan"][comp[str(keys1)][0]][str(key2)])) != len(
list(dict2["scan"][comp[str(keys1)][1]][str(key2)])
):
if str("different length of %s" % key2) not in warnings:
warnings[str("different length of %s" % key2)] = list()
warnings[str("different length of %s" % key2)].append(
(str(comp[keys1][0]), str(comp[keys1][1]))
)
else:
warnings[str("different length of %s" % key2)].append(
(str(comp[keys1][0]), str(comp[keys1][1]))
)
if not comparison:
S.append(
"Scan value "
"%s"
" differs in scan %s and %s \n"
% (key2, comp[str(keys1)][0], comp[str(keys1)][1])
)
S.append(
" dict1: %s \n"
% str(list(dict1["scan"][comp[str(keys1)][0]][str(key2)]))
)
S.append(
" dict2: %s \n"
% str(list(dict2["scan"][comp[str(keys1)][1]][str(key2)]))
)
if key2 not in conflicts:
conflicts[key2] = {}
conflicts[key2]["amount"] = 1
conflicts[key2]["scan"] = str(comp[str(keys1)])
else:
conflicts[key2]["amount"] = conflicts[key2]["amount"] + 1
conflicts[key2]["scan"] = (
conflicts[key2]["scan"] + " " + (str(comp[str(keys1)]))
)
except KeyError as e:
print("Missing keys, some files were probably merged or substracted")
print(e.args)
for keys in conflicts:
try:
conflicts[str(keys)]["scan"] = conflicts[str(keys)]["scan"].split(" ")
except:
continue
report_string = "".join(S)
return warnings, conflicts, report_string
def guess_next(dict1, dict2, comp):
"""iterates thorough the scans and tries to decide if the scans should be
substracted or merged"""
threshold = 0.05
for keys in comp:
if (
abs(
(
dict1["scan"][str(comp[keys][0])]["temperature"]
- dict2["scan"][str(comp[keys][1])]["temperature"]
)
/ dict2["scan"][str(comp[keys][1])]["temperature"]
)
< threshold
and abs(
(
dict1["scan"][str(comp[keys][0])]["mag_field"]
- dict2["scan"][str(comp[keys][1])]["mag_field"]
)
/ dict2["scan"][str(comp[keys][1])]["mag_field"]
)
< threshold
):
comp[keys] = comp[keys] + tuple("m")
else:
comp[keys] = comp[keys] + tuple("s")
return comp
def process_dict(dict1, dict2, comp):
"""substracts or merges scans, guess_next function must run first """
for keys in comp:
if comp[keys][2] == "s":
substract_measurement(dict1, dict2, comp[keys])
elif comp[keys][2] == "m":
merge(dict1, dict2, comp[keys])
return dict1

View File

@ -5,7 +5,13 @@ from scipy.signal import savgol_filter
def ccl_findpeaks(
scan, int_threshold=0.8, prominence=50, smooth=False, window_size=7, poly_order=3
scan,
int_threshold=0.8,
prominence=50,
smooth=False,
window_size=7,
poly_order=3,
variable="om",
):
"""function iterates through the dictionary created by load_cclv2 and locates peaks for each scan
@ -54,7 +60,7 @@ def ccl_findpeaks(
prominence = 50
print("Invalid value for prominence, select positive number, new value set to:", prominence)
omega = scan["om"]
omega = scan[variable]
counts = np.array(scan["Counts"])
if smooth:
itp = interp1d(omega, counts, kind="linear")

View File

@ -1,7 +1,6 @@
import os
import re
from collections import defaultdict
from decimal import Decimal
import numpy as np
@ -20,6 +19,7 @@ META_VARS_STR = (
"proposal_email",
"detectorDistance",
)
META_VARS_FLOAT = (
"omega",
"mf",
@ -58,30 +58,29 @@ META_VARS_FLOAT = (
META_UB_MATRIX = ("ub1j", "ub2j", "ub3j")
CCL_FIRST_LINE = (
# the first element is `scan_number`, which we don't save to metadata
("scan_number", int),
("h_index", float),
("k_index", float),
("l_index", float),
)
CCL_FIRST_LINE_BI = (
*CCL_FIRST_LINE,
("twotheta_angle", float),
("omega_angle", float),
("chi_angle", float),
("phi_angle", float),
)
CCL_FIRST_LINE_NB = (
*CCL_FIRST_LINE,
("gamma_angle", float),
("omega_angle", float),
("nu_angle", float),
("unkwn_angle", float),
)
CCL_ANGLES = {
"bi": (
("twotheta_angle", float),
("omega_angle", float),
("chi_angle", float),
("phi_angle", float),
),
"nb": (
("gamma_angle", float),
("omega_angle", float),
("nu_angle", float),
("unkwn_angle", float),
),
}
CCL_SECOND_LINE = (
("number_of_measurements", int),
("n_points", int),
("angle_step", float),
("monitor", float),
("temperature", float),
@ -132,50 +131,34 @@ def parse_1D(fileobj, data_type):
# read data
scan = {}
if data_type == ".ccl":
decimal = list()
if metadata["zebra_mode"] == "bi":
ccl_first_line = CCL_FIRST_LINE_BI
elif metadata["zebra_mode"] == "nb":
ccl_first_line = CCL_FIRST_LINE_NB
ccl_first_line = (*CCL_FIRST_LINE, *CCL_ANGLES[metadata["zebra_mode"]])
ccl_second_line = CCL_SECOND_LINE
for line in fileobj:
d = {}
# first line
scan_number, *params = line.split()
for param, (param_name, param_type) in zip(params, ccl_first_line):
for param, (param_name, param_type) in zip(line.split(), ccl_first_line):
d[param_name] = param_type(param)
decimal.append(bool(Decimal(d["h_index"]) % 1 == 0))
decimal.append(bool(Decimal(d["k_index"]) % 1 == 0))
decimal.append(bool(Decimal(d["l_index"]) % 1 == 0))
# second line
next_line = next(fileobj)
params = next_line.split()
for param, (param_name, param_type) in zip(params, ccl_second_line):
for param, (param_name, param_type) in zip(next_line.split(), ccl_second_line):
d[param_name] = param_type(param)
d["om"] = np.linspace(
d["omega_angle"] - (d["number_of_measurements"] / 2) * d["angle_step"],
d["omega_angle"] + (d["number_of_measurements"] / 2) * d["angle_step"],
d["number_of_measurements"],
d["omega_angle"] - (d["n_points"] / 2) * d["angle_step"],
d["omega_angle"] + (d["n_points"] / 2) * d["angle_step"],
d["n_points"],
)
# subsequent lines with counts
counts = []
while len(counts) < d["number_of_measurements"]:
while len(counts) < d["n_points"]:
counts.extend(map(int, next(fileobj).split()))
d["Counts"] = counts
scan[int(scan_number)] = d
if all(decimal):
metadata["indices"] = "hkl"
else:
metadata["indices"] = "real"
scan[d["scan_number"]] = d
elif data_type == ".dat":
# skip the first 2 rows, the third row contans the column names
@ -200,9 +183,13 @@ def parse_1D(fileobj, data_type):
print("seems hkl is not in title")
data_cols["temperature"] = metadata["temp"]
data_cols["mag_field"] = metadata["mf"]
try:
data_cols["mag_field"] = metadata["mf"]
except KeyError:
print("Mag_field not present in dat file")
data_cols["omega_angle"] = metadata["omega"]
data_cols["number_of_measurements"] = len(data_cols["om"])
data_cols["n_points"] = len(data_cols["om"])
data_cols["monitor"] = data_cols["Monitor1"][0]
data_cols["twotheta_angle"] = metadata["2-theta"]
data_cols["chi_angle"] = metadata["chi"]
@ -215,7 +202,69 @@ def parse_1D(fileobj, data_type):
print("Unknown file extention")
# utility information
if all(
s["h_index"].is_integer() and s["k_index"].is_integer() and s["l_index"].is_integer()
for s in scan.values()
):
metadata["indices"] = "hkl"
else:
metadata["indices"] = "real"
metadata["data_type"] = data_type
metadata["area_method"] = "fit"
return {"meta": metadata, "scan": scan}
def export_comm(data, path, lorentz=False):
"""exports data in the *.comm format
:param lorentz: perform Lorentz correction
:param path: path to file + name
:arg data - data to export, is dict after peak fitting
"""
zebra_mode = data["meta"]["zebra_mode"]
if data["meta"]["indices"] == "hkl":
extension = ".comm"
padding = [6, 4]
elif data["meta"]["indices"] == "real":
extension = ".incomm"
padding = [4, 6]
with open(str(path + extension), "w") as out_file:
for key, scan in data["scan"].items():
if "fit" not in scan:
print("Scan skipped - no fit value for:", key)
continue
scan_str = f"{key:>{padding[0]}}"
h_str = f'{int(scan["h_index"]):{padding[1]}}'
k_str = f'{int(scan["k_index"]):{padding[1]}}'
l_str = f'{int(scan["l_index"]):{padding[1]}}'
if data["meta"]["area_method"] == "fit":
area = scan["fit"]["fit_area"].n
sigma_str = f'{scan["fit"]["fit_area"].s:>10.2f}'
elif data["meta"]["area_method"] == "integ":
area = scan["fit"]["int_area"].n
sigma_str = f'{scan["fit"]["int_area"].s:>10.2f}'
# apply lorentz correction to area
if lorentz:
if zebra_mode == "bi":
twotheta_angle = np.deg2rad(scan["twotheta_angle"])
corr_factor = np.sin(twotheta_angle)
elif zebra_mode == "nb":
gamma_angle = np.deg2rad(scan["gamma_angle"])
nu_angle = np.deg2rad(scan["nu_angle"])
corr_factor = np.sin(gamma_angle) * np.cos(nu_angle)
area = np.abs(area * corr_factor)
area_str = f"{area:>10.2f}"
ang_str = ""
for angle, _ in CCL_ANGLES[zebra_mode]:
ang_str = ang_str + f"{scan[angle]:8}"
out_file.write(scan_str + h_str + k_str + l_str + area_str + sigma_str + ang_str + "\n")

View File

@ -6,6 +6,8 @@ from bokeh.application.application import Application
from bokeh.application.handlers import ScriptHandler
from bokeh.server.server import Server
from pyzebra.app.handler import PyzebraHandler
logging.basicConfig(format="%(asctime)s %(message)s", level=logging.INFO)
logger = logging.getLogger(__name__)
@ -35,6 +37,13 @@ def main():
help="hostname that can connect to the server websocket",
)
parser.add_argument(
"--anatric-path",
type=str,
default="/afs/psi.ch/project/sinq/rhel7/bin/anatric",
help="path to anatric executable",
)
parser.add_argument(
"--args",
nargs=argparse.REMAINDER,
@ -46,9 +55,10 @@ def main():
logger.info(app_path)
pyzebra_handler = PyzebraHandler(args.anatric_path)
handler = ScriptHandler(filename=app_path, argv=args.args)
server = Server(
{"/": Application(handler)},
{"/": Application(pyzebra_handler, handler)},
port=args.port,
allow_websocket_origin=args.allow_websocket_origin,
)

View File

@ -1,80 +0,0 @@
import numpy as np
def correction(value, lorentz=True, zebra_mode="--", ang1=0, ang2=0):
if lorentz is False:
return value
else:
if zebra_mode == "bi":
corr_value = np.abs(value * np.sin(ang1))
return corr_value
elif zebra_mode == "nb":
corr_value = np.abs(value * np.sin(ang1) * np.cos(ang2))
return corr_value
def export_comm(data, path, lorentz=False):
"""exports data in the *.comm format
:param lorentz: perform Lorentz correction
:param path: path to file + name
:arg data - data to export, is dict after peak fitting
"""
zebra_mode = data["meta"]["zebra_mode"]
align = ">"
if data["meta"]["indices"] == "hkl":
extension = ".comm"
padding = [6, 4, 10, 8]
elif data["meta"]["indices"] == "real":
extension = ".incomm"
padding = [4, 6, 10, 8]
with open(str(path + extension), "w") as out_file:
for key, scan in data["scan"].items():
if "fit" not in scan:
print("Scan skipped - no fit value for:", key)
continue
scan_number_str = f"{key:{align}{padding[0]}}"
h_str = f'{int(scan["h_index"]):{padding[1]}}'
k_str = f'{int(scan["k_index"]):{padding[1]}}'
l_str = f'{int(scan["l_index"]):{padding[1]}}'
if data["meta"]["area_method"] == "fit":
area = float(scan["fit"]["fit_area"].n)
sigma_str = (
f'{"{:8.2f}".format(float(scan["fit"]["fit_area"].s)):{align}{padding[2]}}'
)
elif data["meta"]["area_method"] == "integ":
area = float(scan["fit"]["int_area"].n)
sigma_str = (
f'{"{:8.2f}".format(float(scan["fit"]["int_area"].s)):{align}{padding[2]}}'
)
if zebra_mode == "bi":
area = correction(area, lorentz, zebra_mode, scan["twotheta_angle"])
int_str = f'{"{:8.2f}".format(area):{align}{padding[2]}}'
angle_str1 = f'{scan["twotheta_angle"]:{padding[3]}}'
angle_str2 = f'{scan["omega_angle"]:{padding[3]}}'
angle_str3 = f'{scan["chi_angle"]:{padding[3]}}'
angle_str4 = f'{scan["phi_angle"]:{padding[3]}}'
elif zebra_mode == "nb":
area = correction(area, lorentz, zebra_mode, scan["gamma_angle"], scan["nu_angle"])
int_str = f'{"{:8.2f}".format(area):{align}{padding[2]}}'
angle_str1 = f'{scan["gamma_angle"]:{padding[3]}}'
angle_str2 = f'{scan["omega_angle"]:{padding[3]}}'
angle_str3 = f'{scan["nu_angle"]:{padding[3]}}'
angle_str4 = f'{scan["unkwn_angle"]:{padding[3]}}'
line = (
scan_number_str
+ h_str
+ k_str
+ l_str
+ int_str
+ sigma_str
+ angle_str1
+ angle_str2
+ angle_str3
+ angle_str4
+ "\n"
)
out_file.write(line)

View File

@ -59,15 +59,17 @@ def fitccl(
constraints_min = [23, None, 50, 0, 0]
constraints_min = [80, None, 1000, 0, 100]
"""
if "peak_indexes" not in scan:
scan["peak_indexes"] = []
if len(scan["peak_indexes"]) > 1:
# return in case of more than 1 peaks
print("More than 1 peak, scan skipped")
return
if binning is None or binning == 0 or binning == 1:
x = list(scan["om"])
y = list(scan["Counts"])
y_err = list(np.sqrt(y)) if scan.get("sigma", None) is None else list(scan["sigma"])
print(scan["peak_indexes"])
if not scan["peak_indexes"]:
centre = np.mean(x)
else:
@ -87,7 +89,6 @@ def fitccl(
if len(scan["peak_indexes"]) == 0:
# Case for no peak, gaussian in centre, sigma as 20% of range
print("No peak")
peak_index = find_nearest(x, np.mean(x))
guess[0] = centre if guess[0] is None else guess[0]
guess[1] = (x[-1] - x[0]) / 5 if guess[1] is None else guess[1]
@ -98,7 +99,6 @@ def fitccl(
elif len(scan["peak_indexes"]) == 1:
# case for one peak, takse into account users guesses
print("one peak")
peak_height = scan["peak_heights"]
guess[0] = centre if guess[0] is None else guess[0]
guess[1] = 0.1 if guess[1] is None else guess[1]
@ -128,11 +128,11 @@ def fitccl(
("intercept", guess[4], bool(vary[4]), constraints_min[4], constraints_max[4], None, None),
)
# the weighted fit
weights = [np.abs(1 / val) if val != 0 else 1 for val in y_err]
try:
result = mod.fit(
y, params, weights=[np.abs(1 / val) for val in y_err], x=x, calc_covar=True,
)
result = mod.fit(y, params, weights=weights, x=x, calc_covar=True)
except ValueError:
print(f"Couldn't fit scan {scan['scan_number']}")
return
if result.params["g_amp"].stderr is None:
@ -213,9 +213,9 @@ def fitccl(
d = {}
for pars in result.params:
d[str(pars)] = (result.params[str(pars)].value, result.params[str(pars)].vary)
print(result.fit_report())
print((result.params["g_amp"].value - int_area.n) / result.params["g_amp"].value)
print("Scan", scan["scan_number"])
print(result.fit_report())
d["ratio"] = (result.params["g_amp"].value - int_area.n) / result.params["g_amp"].value
d["int_area"] = int_area
@ -224,4 +224,5 @@ def fitccl(
d["result"] = result
d["comps"] = comps
d["numfit"] = [numfit_min, numfit_max]
d["x_fit"] = x
scan["fit"] = d

View File

@ -1,13 +1,15 @@
from load_1D import load_1D
import pandas as pd
from mpl_toolkits.mplot3d import Axes3D # dont delete, otherwise waterfall wont work
import matplotlib.pyplot as plt
import matplotlib as mpl
import numpy as np
import pickle
import matplotlib as mpl
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import scipy.io as sio
import uncertainties as u
from mpl_toolkits.mplot3d import Axes3D # dont delete, otherwise waterfall wont work
import collections
from .ccl_io import load_1D
def create_tuples(x, y, y_err):
"""creates tuples for sorting and merginng of the data
@ -47,45 +49,45 @@ def load_dats(filepath):
if data_type == "txt":
dict1 = add_dict(dict1, load_1D(file_list[i][0]))
else:
dict1 = add_dict(dict1, load_1D(file_list[i]))
dict1["scan"][i + 1]["params"] = {}
if data_type == "txt":
for x in range(len(col_names) - 1):
dict1["scan"][i + 1]["params"][col_names[x + 1]] = file_list[i][x + 1]
dict1["scan"][i + 1]["params"][col_names[x + 1]] = float(file_list[i][x + 1])
return dict1
def create_dataframe(dict1):
def create_dataframe(dict1, variables):
"""Creates pandas dataframe from the dictionary
:arg ccl like dictionary
:return pandas dataframe"""
# create dictionary to which we pull only wanted items before transforming it to pd.dataframe
pull_dict = {}
pull_dict["filenames"] = list()
for key in dict1["scan"][1]["params"]:
pull_dict[key] = list()
pull_dict["temperature"] = list()
pull_dict["mag_field"] = list()
for keys in variables:
for item in variables[keys]:
pull_dict[item] = list()
pull_dict["fit_area"] = list()
pull_dict["int_area"] = list()
pull_dict["om"] = list()
pull_dict["Counts"] = list()
for keys in pull_dict:
print(keys)
# populate the dict
for keys in dict1["scan"]:
if "file_of_origin" in dict1["scan"][keys]:
pull_dict["filenames"].append(dict1["scan"][keys]["file_of_origin"].split("/")[-1])
else:
pull_dict["filenames"].append(dict1["meta"]["original_filename"].split("/")[-1])
for key in dict1["scan"][keys]["params"]:
pull_dict[str(key)].append(float(dict1["scan"][keys]["params"][key]))
pull_dict["temperature"].append(dict1["scan"][keys]["temperature"])
pull_dict["mag_field"].append(dict1["scan"][keys]["mag_field"])
pull_dict["fit_area"].append(dict1["scan"][keys]["fit"]["fit_area"])
pull_dict["int_area"].append(dict1["scan"][keys]["fit"]["int_area"])
pull_dict["om"].append(dict1["scan"][keys]["om"])
pull_dict["Counts"].append(dict1["scan"][keys]["Counts"])
for key in variables:
for i in variables[key]:
pull_dict[i].append(_finditem(dict1["scan"][keys], i))
return pd.DataFrame(data=pull_dict)
@ -211,89 +213,74 @@ def save_table(data, filetype, name, path=None):
if filetype == "json":
data.to_json((path + name + ".json"))
def normalize(dict, key, monitor):
"""Normalizes the measurement to monitor, checks if sigma exists, otherwise creates it
:arg dict : dictionary to from which to tkae the scan
:arg key : which scan to normalize from dict1
:arg monitor : final monitor
:return counts - normalized counts
:return sigma - normalized sigma"""
counts = np.array(dict["scan"][key]["Counts"])
sigma = np.sqrt(counts) if "sigma" not in dict["scan"][key] else dict["scan"][key]["sigma"]
monitor_ratio = monitor / dict["scan"][key]["monitor"]
scaled_counts = counts * monitor_ratio
scaled_sigma = np.array(sigma) * monitor_ratio
def normalize(scan, monitor):
"""Normalizes the measurement to monitor, checks if sigma exists, otherwise creates it
:arg dict : dictionary to from which to tkae the scan
:arg key : which scan to normalize from dict1
:arg monitor : final monitor
:return counts - normalized counts
:return sigma - normalized sigma"""
return scaled_counts, scaled_sigma
counts = np.array(scan["Counts"])
sigma = np.sqrt(counts) if "sigma" not in scan else scan["sigma"]
monitor_ratio = monitor / scan["monitor"]
scaled_counts = counts * monitor_ratio
scaled_sigma = np.array(sigma) * monitor_ratio
def merge(dict1, dict2, scand_dict_result, keep=True, monitor=100000):
"""merges the two tuples and sorts them, if om value is same, Counts value is average
averaging is propagated into sigma if dict1 == dict2, key[1] is deleted after merging
:arg dict1 : dictionary to which measurement will be merged
:arg dict2 : dictionary from which measurement will be merged
:arg scand_dict_result : result of scan_dict after auto function
:arg keep : if true, when monitors are same, does not change it, if flase, takes monitor
always
:arg monitor : final monitor after merging
note: dict1 and dict2 can be same dict
:return dict1 with merged scan"""
for keys in scand_dict_result:
for j in range(len(scand_dict_result[keys])):
first, second = scand_dict_result[keys][j][0], scand_dict_result[keys][j][1]
print(first, second)
if keep:
if dict1["scan"][first]["monitor"] == dict2["scan"][second]["monitor"]:
monitor = dict1["scan"][first]["monitor"]
return scaled_counts, scaled_sigma
# load om and Counts
x1, x2 = dict1["scan"][first]["om"], dict2["scan"][second]["om"]
cor_y1, y_err1 = normalize(dict1, first, monitor=monitor)
cor_y2, y_err2 = normalize(dict2, second, monitor=monitor)
# creates touples (om, Counts, sigma) for sorting and further processing
tuple_list = create_tuples(x1, cor_y1, y_err1) + create_tuples(x2, cor_y2, y_err2)
# Sort the list on om and add 0 0 0 tuple to the last position
sorted_t = sorted(tuple_list, key=lambda tup: tup[0])
sorted_t.append((0, 0, 0))
om, Counts, sigma = [], [], []
seen = list()
for i in range(len(sorted_t) - 1):
if sorted_t[i][0] not in seen:
if sorted_t[i][0] != sorted_t[i + 1][0]:
om = np.append(om, sorted_t[i][0])
Counts = np.append(Counts, sorted_t[i][1])
sigma = np.append(sigma, sorted_t[i][2])
else:
om = np.append(om, sorted_t[i][0])
counts1, counts2 = sorted_t[i][1], sorted_t[i + 1][1]
sigma1, sigma2 = sorted_t[i][2], sorted_t[i + 1][2]
count_err1 = u.ufloat(counts1, sigma1)
count_err2 = u.ufloat(counts2, sigma2)
avg = (count_err1 + count_err2) / 2
Counts = np.append(Counts, avg.n)
sigma = np.append(sigma, avg.s)
seen.append(sorted_t[i][0])
else:
continue
if dict1 == dict2:
del dict1["scan"][second]
def merge(scan1, scan2, keep=True, monitor=100000):
"""merges the two tuples and sorts them, if om value is same, Counts value is average
averaging is propagated into sigma if dict1 == dict2, key[1] is deleted after merging
:arg dict1 : dictionary to which measurement will be merged
:arg dict2 : dictionary from which measurement will be merged
:arg scand_dict_result : result of scan_dict after auto function
:arg keep : if true, when monitors are same, does not change it, if flase, takes monitor
always
:arg monitor : final monitor after merging
note: dict1 and dict2 can be same dict
:return dict1 with merged scan"""
note = (
f"This measurement was merged with measurement {second} from "
f'file {dict2["meta"]["original_filename"]} \n'
)
if "notes" not in dict1["scan"][first]:
dict1["scan"][first]["notes"] = note
else:
dict1["scan"][first]["notes"] += note
if keep:
if scan1["monitor"] == scan2["monitor"]:
monitor = scan1["monitor"]
dict1["scan"][first]["om"] = om
dict1["scan"][first]["Counts"] = Counts
dict1["scan"][first]["sigma"] = sigma
dict1["scan"][first]["monitor"] = monitor
print("merging done")
return dict1
# load om and Counts
x1, x2 = scan1["om"], scan2["om"]
cor_y1, y_err1 = normalize(scan1, monitor=monitor)
cor_y2, y_err2 = normalize(scan2, monitor=monitor)
# creates touples (om, Counts, sigma) for sorting and further processing
tuple_list = create_tuples(x1, cor_y1, y_err1) + create_tuples(x2, cor_y2, y_err2)
# Sort the list on om and add 0 0 0 tuple to the last position
sorted_t = sorted(tuple_list, key=lambda tup: tup[0])
sorted_t.append((0, 0, 0))
om, Counts, sigma = [], [], []
seen = list()
for i in range(len(sorted_t) - 1):
if sorted_t[i][0] not in seen:
if sorted_t[i][0] != sorted_t[i + 1][0]:
om = np.append(om, sorted_t[i][0])
Counts = np.append(Counts, sorted_t[i][1])
sigma = np.append(sigma, sorted_t[i][2])
else:
om = np.append(om, sorted_t[i][0])
counts1, counts2 = sorted_t[i][1], sorted_t[i + 1][1]
sigma1, sigma2 = sorted_t[i][2], sorted_t[i + 1][2]
count_err1 = u.ufloat(counts1, sigma1)
count_err2 = u.ufloat(counts2, sigma2)
avg = (count_err1 + count_err2) / 2
Counts = np.append(Counts, avg.n)
sigma = np.append(sigma, avg.s)
seen.append(sorted_t[i][0])
else:
continue
scan1["om"] = om
scan1["Counts"] = Counts
scan1["sigma"] = sigma
scan1["monitor"] = monitor
print("merging done")
def add_dict(dict1, dict2):
@ -304,9 +291,13 @@ def add_dict(dict1, dict2):
:return dict1 : combined dictionary
Note: dict1 must be made from ccl, otherwise we would have to change the structure of loaded
dat file"""
if dict1["meta"]["zebra_mode"] != dict2["meta"]["zebra_mode"]:
print("You are trying to add scans measured with different zebra modes")
return
try:
if dict1["meta"]["zebra_mode"] != dict2["meta"]["zebra_mode"]:
print("You are trying to add scans measured with different zebra modes")
return
# this is for the qscan case
except KeyError:
print("Zebra mode not specified")
max_measurement_dict1 = max([keys for keys in dict1["scan"]])
new_filenames = np.arange(
max_measurement_dict1 + 1, max_measurement_dict1 + 1 + len(dict2["scan"])
@ -369,15 +360,129 @@ def scan_dict(dict, precision=0.5):
itup.append(abs(abs(dict["scan"][i][k]) - abs(dict["scan"][j][k])))
if all(i <= precision for i in itup):
if str([np.around(dict["scan"][i][k], 1) for k in angles]) not in d:
d[str([np.around(dict["scan"][i][k], 1) for k in angles])] = list()
d[str([np.around(dict["scan"][i][k], 1) for k in angles])].append((i, j))
print(itup)
print([dict["scan"][i][k] for k in angles])
print([dict["scan"][j][k] for k in angles])
if str([np.around(dict["scan"][i][k], 0) for k in angles]) not in d:
d[str([np.around(dict["scan"][i][k], 0) for k in angles])] = list()
d[str([np.around(dict["scan"][i][k], 0) for k in angles])].append((i, j))
else:
d[str([np.around(dict["scan"][i][k], 1) for k in angles])].append((i, j))
d[str([np.around(dict["scan"][i][k], 0) for k in angles])].append((i, j))
else:
pass
else:
continue
return d
def _finditem(obj, key):
if key in obj:
return obj[key]
for k, v in obj.items():
if isinstance(v, dict):
item = _finditem(v, key)
if item is not None:
return item
def most_common(lst):
return max(set(lst), key=lst.count)
def variables(dictionary):
"""Funcrion to guess what variables will be used in the param study
i call pripary variable the one the array like variable, usually omega
and secondary the slicing variable, different for each scan,for example temperature"""
# find all variables that are in all scans
stdev_precision = 0.05
all_vars = list()
for keys in dictionary["scan"]:
all_vars.append([key for key in dictionary["scan"][keys] if key != "params"])
if dictionary["scan"][keys]["params"]:
all_vars.append(key for key in dictionary["scan"][keys]["params"])
all_vars = [i for sublist in all_vars for i in sublist]
# get the ones that are in all scans
b = collections.Counter(all_vars)
inall = [key for key in b if b[key] == len(dictionary["scan"])]
# delete those that are obviously wrong
wrong = [
"NP",
"Counts",
"Monitor1",
"Monitor2",
"Monitor3",
"h_index",
"l_index",
"k_index",
"n_points",
"monitor",
"Time",
"omega_angle",
"twotheta_angle",
"chi_angle",
"phi_angle",
"nu_angle",
]
inall_red = [i for i in inall if i not in wrong]
# check for primary variable, needs to be list, we dont suspect the
# primary variable be as a parameter (be in scan[params])
primary_candidates = list()
for key in dictionary["scan"]:
for i in inall_red:
if isinstance(_finditem(dictionary["scan"][key], i), list):
if np.std(_finditem(dictionary["scan"][key], i)) > stdev_precision:
primary_candidates.append(i)
# check which of the primary are in every scan
primary_candidates = collections.Counter(primary_candidates)
second_round_primary_candidates = [
key for key in primary_candidates if primary_candidates[key] == len(dictionary["scan"])
]
if len(second_round_primary_candidates) == 1:
print("We've got a primary winner!", second_round_primary_candidates)
else:
print("Still not sure with primary:(", second_round_primary_candidates)
# check for secondary variable, we suspect a float\int or not changing array
# we dont need to check for primary ones
secondary_candidates = [i for i in inall_red if i not in second_round_primary_candidates]
# print("secondary candidates", secondary_candidates)
# select arrays and floats and ints
second_round_secondary_candidates = list()
for key in dictionary["scan"]:
for i in secondary_candidates:
if isinstance(_finditem(dictionary["scan"][key], i), float):
second_round_secondary_candidates.append(i)
elif isinstance(_finditem(dictionary["scan"][key], i), int):
second_round_secondary_candidates.append(i)
elif isinstance(_finditem(dictionary["scan"][key], i), list):
if np.std(_finditem(dictionary["scan"][key], i)) < stdev_precision:
second_round_secondary_candidates.append(i)
second_round_secondary_candidates = collections.Counter(second_round_secondary_candidates)
second_round_secondary_candidates = [
key
for key in second_round_secondary_candidates
if second_round_secondary_candidates[key] == len(dictionary["scan"])
]
# print("secondary candidates after second round", second_round_secondary_candidates)
# now we check if they vary between the scans
third_round_sec_candidates = list()
for i in second_round_secondary_candidates:
check_array = list()
for keys in dictionary["scan"]:
check_array.append(np.average(_finditem(dictionary["scan"][keys], i)))
# print(i, check_array, np.std(check_array))
if np.std(check_array) > stdev_precision:
third_round_sec_candidates.append(i)
if len(third_round_sec_candidates) == 1:
print("We've got a secondary winner!", third_round_sec_candidates)
else:
print("Still not sure with secondary :(", third_round_sec_candidates)
return {"primary": second_round_primary_candidates, "secondary": third_round_sec_candidates}