Add code for 1D detector #69

Merged
usov_i merged 118 commits from det1d into master 2020-10-22 12:07:50 +02:00
15 changed files with 1952 additions and 55 deletions

.vscode/launch.json (vendored, new file)

@@ -0,0 +1,13 @@
{
    "version": "0.2.0",
    "configurations": [
        {
            "name": "pyzebra",
            "type": "python",
            "request": "launch",
            "program": "${workspaceFolder}/pyzebra/cli.py",
            "console": "internalConsole",
            "env": {},
        },
    ]
}

pyzebra/__init__.py

@@ -1,3 +1,8 @@
+import pyzebra.ccl_dict_operation
 from pyzebra.anatric import *
+from pyzebra.ccl_findpeaks import ccl_findpeaks
+from pyzebra.comm_export import export_comm
+from pyzebra.fit2 import fitccl
 from pyzebra.h5 import *
+from pyzebra.load_1D import load_1D, parse_1D
 from pyzebra.xtal import *

pyzebra/app/app.py

@@ -1,10 +1,15 @@
 import argparse
+import logging
+import sys
+from io import StringIO

 from bokeh.io import curdoc
-from bokeh.models import Tabs
+from bokeh.layouts import column, row
+from bokeh.models import Tabs, TextAreaInput

-import panel_anatric
-import panel_data_viewer
+import panel_ccl_integrate
+import panel_hdf_anatric
+import panel_hdf_viewer

 parser = argparse.ArgumentParser(
     prog="pyzebra", formatter_class=argparse.ArgumentDefaultsHelpFormatter
@@ -15,8 +20,32 @@ args = parser.parse_args()
 doc = curdoc()
 doc.title = "pyzebra"

-# Final layout
-tab_data_viewer = panel_data_viewer.create()
-tab_anatric = panel_anatric.create()
-doc.add_root(Tabs(tabs=[tab_data_viewer, tab_anatric]))
+sys.stdout = StringIO()
+stdout_textareainput = TextAreaInput(title="print output:", height=150)
+
+bokeh_stream = StringIO()
+bokeh_handler = logging.StreamHandler(bokeh_stream)
+bokeh_handler.setFormatter(logging.Formatter(logging.BASIC_FORMAT))
+bokeh_logger = logging.getLogger('bokeh')
+bokeh_logger.addHandler(bokeh_handler)
+bokeh_log_textareainput = TextAreaInput(title="server output:", height=150)
+
+# Final layout
+tab_hdf_viewer = panel_hdf_viewer.create()
+tab_hdf_anatric = panel_hdf_anatric.create()
+tab_ccl_integrate = panel_ccl_integrate.create()
+
+doc.add_root(
+    column(
+        Tabs(tabs=[tab_hdf_viewer, tab_hdf_anatric, tab_ccl_integrate]),
+        row(stdout_textareainput, bokeh_log_textareainput, sizing_mode="scale_both"),
+    )
+)
+
+
+def update_stdout():
+    stdout_textareainput.value = sys.stdout.getvalue()
+    bokeh_log_textareainput.value = bokeh_stream.getvalue()
+
+
+doc.add_periodic_callback(update_stdout, 1000)
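A note on the logging plumbing added above: sys.stdout is swapped for an in-memory StringIO, the "bokeh" logger gets a StreamHandler writing into a second buffer, and update_stdout() copies both buffers into the two TextAreaInput widgets once per second. A minimal sketch of the same capture pattern outside of bokeh (the names here are illustrative, not part of the diff):

import logging
import sys
from io import StringIO

stdout_capture = StringIO()
sys.stdout = stdout_capture  # print() now writes into the buffer

log_capture = StringIO()
handler = logging.StreamHandler(log_capture)
handler.setFormatter(logging.Formatter(logging.BASIC_FORMAT))
logging.getLogger("demo").addHandler(handler)

print("hello from a callback")
logging.getLogger("demo").warning("something to report")

sys.stdout = sys.__stdout__       # restore before inspecting
print(stdout_capture.getvalue())  # -> hello from a callback
print(log_capture.getvalue())     # -> WARNING:demo:something to report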

pyzebra/app/panel_ccl_integrate.py (new file)

@@ -0,0 +1,548 @@
import base64
import io
import os
import tempfile
import numpy as np
from bokeh.layouts import column, row
from bokeh.models import (
Asterisk,
BasicTicker,
Button,
ColumnDataSource,
CustomJS,
DataRange1d,
DataTable,
Div,
FileInput,
Grid,
Line,
LinearAxis,
Panel,
Plot,
RadioButtonGroup,
Scatter,
Select,
Spacer,
Span,
Spinner,
TableColumn,
TextAreaInput,
TextInput,
Toggle,
Whisker,
)
import pyzebra
javaScript = """
setTimeout(function() {
const filename = 'output' + js_data.data['ext']
const blob = new Blob([js_data.data['cont']], {type: 'text/plain'})
const link = document.createElement('a');
document.body.appendChild(link);
const url = window.URL.createObjectURL(blob);
link.href = url;
link.download = filename;
link.click();
window.URL.revokeObjectURL(url);
document.body.removeChild(link);
}, 500);
"""
PROPOSAL_PATH = "/afs/psi.ch/project/sinqdata/2020/zebra/"
def create():
det_data = {}
peak_pos_textinput_lock = False
js_data = ColumnDataSource(data=dict(cont=[], ext=[]))
def proposal_textinput_callback(_attr, _old, new):
ccl_path = os.path.join(PROPOSAL_PATH, new)
ccl_file_list = []
for file in os.listdir(ccl_path):
if file.endswith(".ccl"):
ccl_file_list.append((os.path.join(ccl_path, file), file))
ccl_file_select.options = ccl_file_list
ccl_file_select.value = ccl_file_list[0][0]
proposal_textinput = TextInput(title="Enter proposal number:", default_size=145)
proposal_textinput.on_change("value", proposal_textinput_callback)
def ccl_file_select_callback(_attr, _old, new):
nonlocal det_data
with open(new) as file:
_, ext = os.path.splitext(new)
det_data = pyzebra.parse_1D(file, ext)
scan_list = list(det_data["scan"].keys())
hkl = [
f'{int(m["h_index"])} {int(m["k_index"])} {int(m["l_index"])}'
for m in det_data["scan"].values()
]
scan_table_source.data.update(
scan=scan_list, hkl=hkl, peaks=[0] * len(scan_list), fit=[0] * len(scan_list)
)
scan_table_source.selected.indices = []
scan_table_source.selected.indices = [0]
ccl_file_select = Select(title="Available .ccl files")
ccl_file_select.on_change("value", ccl_file_select_callback)
def upload_button_callback(_attr, _old, new):
nonlocal det_data
with io.StringIO(base64.b64decode(new).decode()) as file:
_, ext = os.path.splitext(upload_button.filename)
det_data = pyzebra.parse_1D(file, ext)
scan_list = list(det_data["scan"].keys())
hkl = [
f'{int(m["h_index"])} {int(m["k_index"])} {int(m["l_index"])}'
for m in det_data["scan"].values()
]
scan_table_source.data.update(
scan=scan_list, hkl=hkl, peaks=[0] * len(scan_list), fit=[0] * len(scan_list)
)
scan_table_source.selected.indices = []
scan_table_source.selected.indices = [0]
upload_button = FileInput(accept=".ccl")
upload_button.on_change("value", upload_button_callback)
def _update_table():
num_of_peaks = [scan.get("num_of_peaks", 0) for scan in det_data["scan"].values()]
fit_ok = [(1 if "fit" in scan else 0) for scan in det_data["scan"].values()]
scan_table_source.data.update(peaks=num_of_peaks, fit=fit_ok)
def _update_plot(ind):
nonlocal peak_pos_textinput_lock
peak_pos_textinput_lock = True
scan = det_data["scan"][ind]
y = scan["Counts"]
x = scan["om"]
plot_scatter_source.data.update(x=x, y=y, y_upper=y + np.sqrt(y), y_lower=y - np.sqrt(y))
num_of_peaks = scan.get("num_of_peaks")
if num_of_peaks is not None and num_of_peaks > 0:
peak_indexes = scan["peak_indexes"]
if len(peak_indexes) == 1:
peak_pos_textinput.value = str(scan["om"][peak_indexes[0]])
else:
peak_pos_textinput.value = str([scan["om"][ind] for ind in peak_indexes])
plot_peak_source.data.update(x=scan["om"][peak_indexes], y=scan["peak_heights"])
plot_line_smooth_source.data.update(x=x, y=scan["smooth_peaks"])
else:
peak_pos_textinput.value = None
plot_peak_source.data.update(x=[], y=[])
plot_line_smooth_source.data.update(x=[], y=[])
peak_pos_textinput_lock = False
fit = scan.get("fit")
if fit is not None:
plot_gauss_source.data.update(x=x, y=scan["fit"]["comps"]["gaussian"])
plot_bkg_source.data.update(x=x, y=scan["fit"]["comps"]["background"])
params = fit["result"].params
fit_output_textinput.value = (
"%s \n"
"Gaussian: centre = %9.4f, sigma = %9.4f, area = %9.4f \n"
"background: slope = %9.4f, intercept = %9.4f \n"
"Int. area = %9.4f +/- %9.4f \n"
"fit area = %9.4f +/- %9.4f \n"
"ratio((fit-int)/fit) = %9.4f"
% (
ind,
params["g_cen"].value,
params["g_width"].value,
params["g_amp"].value,
params["slope"].value,
params["intercept"].value,
fit["int_area"].n,
fit["int_area"].s,
params["g_amp"].value,
params["g_amp"].stderr,
(params["g_amp"].value - fit["int_area"].n) / params["g_amp"].value,
)
)
numfit_min, numfit_max = fit["numfit"]
if numfit_min is None:
numfit_min_span.location = None
else:
numfit_min_span.location = x[numfit_min]
if numfit_max is None:
numfit_max_span.location = None
else:
numfit_max_span.location = x[numfit_max]
else:
plot_gauss_source.data.update(x=[], y=[])
plot_bkg_source.data.update(x=[], y=[])
fit_output_textinput.value = ""
numfit_min_span.location = None
numfit_max_span.location = None
# Main plot
plot = Plot(
x_range=DataRange1d(),
y_range=DataRange1d(),
plot_height=400,
plot_width=700,
toolbar_location=None,
)
plot.add_layout(LinearAxis(axis_label="Counts"), place="left")
plot.add_layout(LinearAxis(axis_label="Omega"), place="below")
plot.add_layout(Grid(dimension=0, ticker=BasicTicker()))
plot.add_layout(Grid(dimension=1, ticker=BasicTicker()))
plot_scatter_source = ColumnDataSource(dict(x=[0], y=[0], y_upper=[0], y_lower=[0]))
plot.add_glyph(plot_scatter_source, Scatter(x="x", y="y", line_color="steelblue"))
plot.add_layout(Whisker(source=plot_scatter_source, base="x", upper="y_upper", lower="y_lower"))
plot_line_smooth_source = ColumnDataSource(dict(x=[0], y=[0]))
plot.add_glyph(
plot_line_smooth_source, Line(x="x", y="y", line_color="steelblue", line_dash="dashed")
)
plot_gauss_source = ColumnDataSource(dict(x=[0], y=[0]))
plot.add_glyph(plot_gauss_source, Line(x="x", y="y", line_color="red", line_dash="dashed"))
plot_bkg_source = ColumnDataSource(dict(x=[0], y=[0]))
plot.add_glyph(plot_bkg_source, Line(x="x", y="y", line_color="green", line_dash="dashed"))
plot_peak_source = ColumnDataSource(dict(x=[], y=[]))
plot.add_glyph(plot_peak_source, Asterisk(x="x", y="y", size=10, line_color="red"))
numfit_min_span = Span(location=None, dimension="height", line_dash="dashed")
plot.add_layout(numfit_min_span)
numfit_max_span = Span(location=None, dimension="height", line_dash="dashed")
plot.add_layout(numfit_max_span)
# Scan select
def scan_table_callback(_attr, _old, new):
if new:
_update_plot(scan_table_source.data["scan"][new[-1]])
scan_table_source = ColumnDataSource(dict(scan=[], hkl=[], peaks=[], fit=[]))
scan_table = DataTable(
source=scan_table_source,
columns=[
TableColumn(field="scan", title="scan"),
TableColumn(field="hkl", title="hkl"),
TableColumn(field="peaks", title="Peaks"),
TableColumn(field="fit", title="Fit"),
],
width=200,
index_position=None,
)
scan_table_source.selected.on_change("indices", scan_table_callback)
def peak_pos_textinput_callback(_attr, _old, new):
if new is not None and not peak_pos_textinput_lock:
sel_ind = scan_table_source.selected.indices[-1]
scan_name = scan_table_source.data["scan"][sel_ind]
scan = det_data["scan"][scan_name]
scan["num_of_peaks"] = 1
peak_ind = (np.abs(scan["om"] - float(new))).argmin()
scan["peak_indexes"] = np.array([peak_ind], dtype=np.int64)
scan["peak_heights"] = np.array([scan["smooth_peaks"][peak_ind]])
_update_table()
_update_plot(scan_name)
peak_pos_textinput = TextInput(title="Peak position:", default_size=145)
peak_pos_textinput.on_change("value", peak_pos_textinput_callback)
peak_int_ratio_spinner = Spinner(
title="Peak intensity ratio:", value=0.8, step=0.01, low=0, high=1, default_size=145
)
peak_prominence_spinner = Spinner(title="Peak prominence:", value=50, low=0, default_size=145)
smooth_toggle = Toggle(label="Smooth curve", default_size=145)
window_size_spinner = Spinner(title="Window size:", value=7, step=2, low=1, default_size=145)
poly_order_spinner = Spinner(title="Poly order:", value=3, low=0, default_size=145)
centre_guess = Spinner(default_size=100)
centre_vary = Toggle(default_size=100, active=True)
centre_min = Spinner(default_size=100)
centre_max = Spinner(default_size=100)
sigma_guess = Spinner(default_size=100)
sigma_vary = Toggle(default_size=100, active=True)
sigma_min = Spinner(default_size=100)
sigma_max = Spinner(default_size=100)
ampl_guess = Spinner(default_size=100)
ampl_vary = Toggle(default_size=100, active=True)
ampl_min = Spinner(default_size=100)
ampl_max = Spinner(default_size=100)
slope_guess = Spinner(default_size=100)
slope_vary = Toggle(default_size=100, active=True)
slope_min = Spinner(default_size=100)
slope_max = Spinner(default_size=100)
offset_guess = Spinner(default_size=100)
offset_vary = Toggle(default_size=100, active=True)
offset_min = Spinner(default_size=100)
offset_max = Spinner(default_size=100)
integ_from = Spinner(title="Integrate from:", default_size=145)
integ_to = Spinner(title="to:", default_size=145)
def fitparam_reset_button_callback():
centre_guess.value = None
centre_vary.active = True
centre_min.value = None
centre_max.value = None
sigma_guess.value = None
sigma_vary.active = True
sigma_min.value = None
sigma_max.value = None
ampl_guess.value = None
ampl_vary.active = True
ampl_min.value = None
ampl_max.value = None
slope_guess.value = None
slope_vary.active = True
slope_min.value = None
slope_max.value = None
offset_guess.value = None
offset_vary.active = True
offset_min.value = None
offset_max.value = None
integ_from.value = None
integ_to.value = None
fitparam_reset_button = Button(label="Reset to defaults", default_size=145)
fitparam_reset_button.on_click(fitparam_reset_button_callback)
fit_output_textinput = TextAreaInput(title="Fit results:", width=450, height=400)
def peakfind_all_button_callback():
for scan in det_data["scan"].values():
pyzebra.ccl_findpeaks(
scan,
int_threshold=peak_int_ratio_spinner.value,
prominence=peak_prominence_spinner.value,
smooth=smooth_toggle.active,
window_size=window_size_spinner.value,
poly_order=poly_order_spinner.value,
)
_update_table()
sel_ind = scan_table_source.selected.indices[-1]
_update_plot(scan_table_source.data["scan"][sel_ind])
peakfind_all_button = Button(label="Peak Find All", button_type="primary", default_size=145)
peakfind_all_button.on_click(peakfind_all_button_callback)
def peakfind_button_callback():
sel_ind = scan_table_source.selected.indices[-1]
scan = scan_table_source.data["scan"][sel_ind]
pyzebra.ccl_findpeaks(
det_data["scan"][scan],
int_threshold=peak_int_ratio_spinner.value,
prominence=peak_prominence_spinner.value,
smooth=smooth_toggle.active,
window_size=window_size_spinner.value,
poly_order=poly_order_spinner.value,
)
_update_table()
_update_plot(scan)
peakfind_button = Button(label="Peak Find Current", default_size=145)
peakfind_button.on_click(peakfind_button_callback)
def fit_all_button_callback():
for scan in det_data["scan"].values():
pyzebra.fitccl(
scan,
guess=[
centre_guess.value,
sigma_guess.value,
ampl_guess.value,
slope_guess.value,
offset_guess.value,
],
vary=[
centre_vary.active,
sigma_vary.active,
ampl_vary.active,
slope_vary.active,
offset_vary.active,
],
constraints_min=[
centre_min.value,
sigma_min.value,
ampl_min.value,
slope_min.value,
offset_min.value,
],
constraints_max=[
centre_max.value,
sigma_max.value,
ampl_max.value,
slope_max.value,
offset_max.value,
],
numfit_min=integ_from.value,
numfit_max=integ_to.value,
)
sel_ind = scan_table_source.selected.indices[-1]
_update_plot(scan_table_source.data["scan"][sel_ind])
_update_table()
fit_all_button = Button(label="Fit All", button_type="primary", default_size=145)
fit_all_button.on_click(fit_all_button_callback)
def fit_button_callback():
sel_ind = scan_table_source.selected.indices[-1]
scan = scan_table_source.data["scan"][sel_ind]
pyzebra.fitccl(
det_data["scan"][scan],
guess=[
centre_guess.value,
sigma_guess.value,
ampl_guess.value,
slope_guess.value,
offset_guess.value,
],
vary=[
centre_vary.active,
sigma_vary.active,
ampl_vary.active,
slope_vary.active,
offset_vary.active,
],
constraints_min=[
centre_min.value,
sigma_min.value,
ampl_min.value,
slope_min.value,
offset_min.value,
],
constraints_max=[
centre_max.value,
sigma_max.value,
ampl_max.value,
slope_max.value,
offset_max.value,
],
numfit_min=integ_from.value,
numfit_max=integ_to.value,
)
_update_plot(scan)
_update_table()
fit_button = Button(label="Fit Current", default_size=145)
fit_button.on_click(fit_button_callback)
def area_method_radiobutton_callback(_attr, _old, new):
det_data["meta"]["area_method"] = ("fit", "integ")[new]
area_method_radiobutton = RadioButtonGroup(
labels=["Fit", "Integral"], active=0, default_size=145
)
area_method_radiobutton.on_change("active", area_method_radiobutton_callback)
preview_output_textinput = TextAreaInput(title="Export file preview:", width=450, height=400)
def preview_output_button_callback():
if det_data["meta"]["indices"] == "hkl":
ext = ".comm"
elif det_data["meta"]["indices"] == "real":
ext = ".incomm"
with tempfile.TemporaryDirectory() as temp_dir:
temp_file = temp_dir + "/temp"
pyzebra.export_comm(det_data, temp_file)
with open(f"{temp_file}{ext}") as f:
preview_output_textinput.value = f.read()
preview_output_button = Button(label="Preview file", default_size=220)
preview_output_button.on_click(preview_output_button_callback)
def export_results(det_data):
if det_data["meta"]["indices"] == "hkl":
ext = ".comm"
elif det_data["meta"]["indices"] == "real":
ext = ".incomm"
with tempfile.TemporaryDirectory() as temp_dir:
temp_file = temp_dir + "/temp"
pyzebra.export_comm(det_data, temp_file)
with open(f"{temp_file}{ext}") as f:
output_content = f.read()
return output_content, ext
def save_button_callback():
cont, ext = export_results(det_data)
js_data.data.update(cont=[cont], ext=[ext])
save_button = Button(label="Download file", button_type="success", default_size=220)
save_button.on_click(save_button_callback)
save_button.js_on_click(CustomJS(args={"js_data": js_data}, code=javaScript))
findpeak_controls = column(
row(peak_pos_textinput, column(Spacer(height=19), smooth_toggle)),
row(peak_int_ratio_spinner, peak_prominence_spinner),
row(window_size_spinner, poly_order_spinner),
row(peakfind_button, peakfind_all_button),
)
div_1 = Div(text="Guess:")
div_2 = Div(text="Vary:")
div_3 = Div(text="Min:")
div_4 = Div(text="Max:")
div_5 = Div(text="Gauss Centre:", margin=[5, 5, -5, 5])
div_6 = Div(text="Gauss Sigma:", margin=[5, 5, -5, 5])
div_7 = Div(text="Gauss Ampl.:", margin=[5, 5, -5, 5])
div_8 = Div(text="Slope:", margin=[5, 5, -5, 5])
div_9 = Div(text="Offset:", margin=[5, 5, -5, 5])
fitpeak_controls = row(
column(
Spacer(height=36),
div_1,
Spacer(height=12),
div_2,
Spacer(height=12),
div_3,
Spacer(height=12),
div_4,
),
column(div_5, centre_guess, centre_vary, centre_min, centre_max),
column(div_6, sigma_guess, sigma_vary, sigma_min, sigma_max),
column(div_7, ampl_guess, ampl_vary, ampl_min, ampl_max),
column(div_8, slope_guess, slope_vary, slope_min, slope_max),
column(div_9, offset_guess, offset_vary, offset_min, offset_max),
Spacer(width=20),
column(
row(integ_from, integ_to),
row(fitparam_reset_button, area_method_radiobutton),
row(fit_button, fit_all_button),
),
)
export_layout = column(preview_output_textinput, row(preview_output_button, save_button))
upload_div = Div(text="Or upload .ccl file:")
tab_layout = column(
row(proposal_textinput, ccl_file_select),
row(column(Spacer(height=5), upload_div), upload_button),
row(scan_table, plot, Spacer(width=30), fit_output_textinput, export_layout),
row(findpeak_controls, Spacer(width=30), fitpeak_controls),
)
return Panel(child=tab_layout, title="ccl integrate")

pyzebra/app/panel_hdf_anatric.py

@@ -406,4 +406,4 @@ def create():
     curdoc().add_periodic_callback(update_config, 1000)

-    return Panel(child=tab_layout, title="Anatric")
+    return Panel(child=tab_layout, title="hdf anatric")

pyzebra/app/panel_hdf_viewer.py

@@ -87,7 +87,8 @@ def create():
         temperature_spinner.value = det_data["temperature"][index]

         gamma, nu = calculate_pol(det_data, index)
-        image_source.data.update(gamma=[gamma], nu=[nu])
+        omega = np.ones((IMAGE_H, IMAGE_W)) * det_data["rot_angle"][index]
+        image_source.data.update(gamma=[gamma], nu=[nu], omega=[omega])

     def update_overview_plot():
         h5_data = det_data["data"]
@@ -161,6 +162,7 @@ def create():
             l=[np.zeros((1, 1))],
             gamma=[np.zeros((1, 1))],
             nu=[np.zeros((1, 1))],
+            omega=[np.zeros((1, 1))],
             x=[0],
             y=[0],
             dw=[IMAGE_W],
@@ -173,12 +175,14 @@ def create():
     l_glyph = Image(image="l", x="x", y="y", dw="dw", dh="dh", global_alpha=0)
     gamma_glyph = Image(image="gamma", x="x", y="y", dw="dw", dh="dh", global_alpha=0)
     nu_glyph = Image(image="nu", x="x", y="y", dw="dw", dh="dh", global_alpha=0)
+    omega_glyph = Image(image="omega", x="x", y="y", dw="dw", dh="dh", global_alpha=0)

     plot.add_glyph(image_source, h_glyph)
     plot.add_glyph(image_source, k_glyph)
     plot.add_glyph(image_source, l_glyph)
     plot.add_glyph(image_source, gamma_glyph)
     plot.add_glyph(image_source, nu_glyph)
+    plot.add_glyph(image_source, omega_glyph)

     image_glyph = Image(image="image", x="x", y="y", dw="dw", dh="dh")
     plot.add_glyph(image_source, image_glyph, name="image_glyph")
@@ -224,6 +228,7 @@ def create():
             ("intensity", "@image"),
             ("gamma", "@gamma"),
             ("nu", "@nu"),
+            ("omega", "@omega"),
             ("h", "@h"),
             ("k", "@k"),
             ("l", "@l"),
@@ -376,7 +381,7 @@ def create():
         overview_plot_x_image_glyph.color_mapper = LinearColorMapper(palette=cmap_dict[new])
         overview_plot_y_image_glyph.color_mapper = LinearColorMapper(palette=cmap_dict[new])

-    colormap = Select(title="Colormap:", options=list(cmap_dict.keys()))
+    colormap = Select(title="Colormap:", options=list(cmap_dict.keys()), default_size=145)
     colormap.on_change("value", colormap_callback)
     colormap.value = "plasma"
@@ -395,7 +400,7 @@ def create():
             update_image()

-    auto_toggle = Toggle(label="Auto Range", active=True, button_type="default")
+    auto_toggle = Toggle(label="Auto Range", active=True, button_type="default", default_size=145)
     auto_toggle.on_click(auto_toggle_callback)

     # ---- colormap display max value
@@ -409,6 +414,7 @@ def create():
         value=1,
         step=STEP,
         disabled=auto_toggle.active,
+        default_size=145,
     )
     display_max_spinner.on_change("value", display_max_spinner_callback)
@@ -423,6 +429,7 @@ def create():
         value=0,
         step=STEP,
         disabled=auto_toggle.active,
+        default_size=145,
     )
     display_min_spinner.on_change("value", display_min_spinner_callback)
@@ -465,13 +472,22 @@ def create():
     temperature_spinner = Spinner(title="Temperature:", format="0.00", width=145, disabled=True)

     # Final layout
-    layout_image = column(
-        gridplot([[proj_v, None], [plot, proj_h]], merge_tools=False), row(index_spinner)
-    )
-    colormap_layout = column(colormap, auto_toggle, display_max_spinner, display_min_spinner)
+    layout_image = column(gridplot([[proj_v, None], [plot, proj_h]], merge_tools=False))
+    colormap_layout = column(
+        row(colormap, column(Spacer(height=19), auto_toggle)),
+        row(display_max_spinner, display_min_spinner),
+    )
     hkl_layout = column(radio_button_group, hkl_button)
     params_layout = row(magnetic_field_spinner, temperature_spinner)

+    layout_controls = row(
+        column(selection_button, selection_list),
+        Spacer(width=20),
+        column(frame_button_group, colormap_layout),
+        Spacer(width=20),
+        column(index_spinner, params_layout, hkl_layout),
+    )
+
     layout_overview = column(
         gridplot(
             [[overview_plot_x, overview_plot_y]],
@@ -486,12 +502,12 @@ def create():
         column(
             row(column(Spacer(height=5), upload_div), upload_button, filelist),
             layout_overview,
-            row(frame_button_group, selection_button, selection_list),
+            layout_controls,
         ),
-        column(roi_avg_plot, layout_image, row(colormap_layout, column(params_layout, hkl_layout))),
+        column(roi_avg_plot, layout_image),
     )

-    return Panel(child=tab_layout, title="Data Viewer")
+    return Panel(child=tab_layout, title="hdf viewer")


 def calculate_hkl(det_data, index, setup_type="nb_bi"):

(deleted file)

@@ -1,19 +0,0 @@
import pickle


def save_dict(obj, name):
    """ saves dictionary as pickle file in binary format
    :arg obj - object to save
    :arg name - name of the file
    NOTE: path should be added later"""
    with open(name + '.pkl', 'wb') as f:
        pickle.dump(obj, f, pickle.HIGHEST_PROTOCOL)


def load_dict(name):
    """load dictionary from pickle file
    :arg name - name of the file to load
    NOTE: expects the file in the same folder, path should be added later
    :return dictionary"""
    with open(name + '.pkl', 'rb') as f:
        return pickle.load(f)

pyzebra/ccl_dict_operation.py (new file)

@@ -0,0 +1,513 @@
import numpy as np
import uncertainties as u
from .fit2 import create_uncertanities
def add_dict(dict1, dict2):
"""adds two dictionaries, meta of the new is saved as meata+original_filename and
measurements are shifted to continue with numbering of first dict
:arg dict1 : dictionarry to add to
:arg dict2 : dictionarry from which to take the measurements
:return dict1 : combined dictionary
Note: dict1 must be made from ccl, otherwise we would have to change the structure of loaded
dat file"""
max_measurement_dict1 = max([int(str(keys)[1:]) for keys in dict1["scan"]])
if dict2["meta"]["data_type"] == ".ccl":
new_filenames = [
"M" + str(x + max_measurement_dict1)
for x in [int(str(keys)[1:]) for keys in dict2["scan"]]
]
new_meta_name = "meta" + str(dict2["meta"]["original_filename"])
if new_meta_name not in dict1:
for keys, name in zip(dict2["scan"], new_filenames):
dict2["scan"][keys]["file_of_origin"] = str(dict2["meta"]["original_filename"])
dict1["scan"][name] = dict2["scan"][keys]
dict1[new_meta_name] = dict2["meta"]
else:
raise KeyError(
str(
"The file %s has alredy been added to %s"
% (dict2["meta"]["original_filename"], dict1["meta"]["original_filename"])
)
)
elif dict2["meta"]["data_type"] == ".dat":
d = {}
new_name = "M" + str(max_measurement_dict1 + 1)
hkl = dict2["meta"]["title"]
d["h_index"] = float(hkl.split()[-3])
d["k_index"] = float(hkl.split()[-2])
d["l_index"] = float(hkl.split()[-1])
d["number_of_measurements"] = len(dict2["scan"]["NP"])
d["om"] = dict2["scan"]["om"]
d["Counts"] = dict2["scan"]["Counts"]
d["monitor"] = dict2["scan"]["Monitor1"][0]
d["temperature"] = dict2["meta"]["temp"]
d["mag_field"] = dict2["meta"]["mf"]
d["omega_angle"] = dict2["meta"]["omega"]
dict1["scan"][new_name] = d
print(hkl.split())
for keys in d:
print(keys)
print("s")
return dict1
def auto(dict):
"""takes just unique tuples from all tuples in dictionary returend by scan_dict
intendet for automatic merge if you doesent want to specify what scans to merge together
args: dict - dictionary from scan_dict function
:return dict - dict without repetitions"""
for keys in dict:
tuple_list = dict[keys]
new = list()
for i in range(len(tuple_list)):
if tuple_list[0][0] == tuple_list[i][0]:
new.append(tuple_list[i])
dict[keys] = new
return dict
def scan_dict(dict):
"""scans dictionary for duplicate hkl indexes
:arg dict : dictionary to scan
:return dictionary with matching scans, if there are none, the dict is empty
note: can be checked by "not d", true if empty
"""
d = {}
for i in dict["scan"]:
for j in dict["scan"]:
if dict["scan"][str(i)] != dict["scan"][str(j)]:
itup = (
dict["scan"][str(i)]["h_index"],
dict["scan"][str(i)]["k_index"],
dict["scan"][str(i)]["l_index"],
)
jtup = (
dict["scan"][str(j)]["h_index"],
dict["scan"][str(j)]["k_index"],
dict["scan"][str(j)]["l_index"],
)
if itup != jtup:
pass
else:
if str(itup) not in d:
d[str(itup)] = list()
d[str(itup)].append((i, j))
else:
d[str(itup)].append((i, j))
else:
continue
return d
def compare_hkl(dict1, dict2):
"""Compares two dictionaries based on hkl indexes and return dictionary with str(h k l) as
key and tuple with keys to same scan in dict1 and dict2
:arg dict1 : first dictionary
:arg dict2 : second dictionary
:return d : dict with matches
example of one key: '0.0 0.0 -1.0 : ('M1', 'M9')' meaning that 001 hkl scan is M1 in
first dict and M9 in second"""
d = {}
dupl = 0
for keys in dict1["scan"]:
for key in dict2["scan"]:
if (
dict1["scan"][str(keys)]["h_index"] == dict2["scan"][str(key)]["h_index"]
and dict1["scan"][str(keys)]["k_index"] == dict2["scan"][str(key)]["k_index"]
and dict1["scan"][str(keys)]["l_index"] == dict2["scan"][str(key)]["l_index"]
):
if (
str(
(
str(dict1["scan"][str(keys)]["h_index"])
+ " "
+ str(dict1["scan"][str(keys)]["k_index"])
+ " "
+ str(dict1["scan"][str(keys)]["l_index"])
)
)
not in d
):
d[
str(
str(dict1["scan"][str(keys)]["h_index"])
+ " "
+ str(dict1["scan"][str(keys)]["k_index"])
+ " "
+ str(dict1["scan"][str(keys)]["l_index"])
)
] = (str(keys), str(key))
else:
dupl = dupl + 1
d[
str(
str(dict1["scan"][str(keys)]["h_index"])
+ " "
+ str(dict1["scan"][str(keys)]["k_index"])
+ " "
+ str(dict1["scan"][str(keys)]["l_index"])
+ "_dupl"
+ str(dupl)
)
] = (str(keys), str(key))
else:
continue
return d
def create_tuples(x, y, y_err):
"""creates tuples for sorting and merginng of the data
Counts need to be normalized to monitor before"""
t = list()
for i in range(len(x)):
tup = (x[i], y[i], y_err[i])
t.append(tup)
return t
def normalize(dict, key, monitor):
"""Normalizes the scan to monitor, checks if sigma exists, otherwise creates it
:arg dict : dictionary from which to take the scan
:arg key : which scan to normalize from dict1
:arg monitor : final monitor
:return counts - normalized counts
:return sigma - normalized sigma"""
counts = np.array(dict["scan"][key]["Counts"])
sigma = np.sqrt(counts) if "sigma" not in dict["scan"][key] else dict["scan"][key]["sigma"]
monitor_ratio = monitor / dict["scan"][key]["monitor"]
scaled_counts = counts * monitor_ratio
scaled_sigma = np.array(sigma) * monitor_ratio
return scaled_counts, scaled_sigma
def merge(dict1, dict2, keys, auto=True, monitor=100000):
"""merges the two tuples and sorts them, if om value is same, Counts value is average
averaging is propagated into sigma if dict1 == dict2, key[1] is deleted after merging
:arg dict1 : dictionary to which scan will be merged
:arg dict2 : dictionary from which scan will be merged
:arg keys : tuple with key to dict1 and dict2
:arg auto : if true, when monitors are same, does not change it, if flase, takes monitor always
:arg monitor : final monitor after merging
note: dict1 and dict2 can be same dict
:return dict1 with merged scan"""
if auto:
if dict1["scan"][keys[0]]["monitor"] == dict2["scan"][keys[1]]["monitor"]:
monitor = dict1["scan"][keys[0]]["monitor"]
# load om and Counts
x1, x2 = dict1["scan"][keys[0]]["om"], dict2["scan"][keys[1]]["om"]
cor_y1, y_err1 = normalize(dict1, keys[0], monitor=monitor)
cor_y2, y_err2 = normalize(dict2, keys[1], monitor=monitor)
# creates touples (om, Counts, sigma) for sorting and further processing
tuple_list = create_tuples(x1, cor_y1, y_err1) + create_tuples(x2, cor_y2, y_err2)
# Sort the list on om and add 0 0 0 tuple to the last position
sorted_t = sorted(tuple_list, key=lambda tup: tup[0])
sorted_t.append((0, 0, 0))
om, Counts, sigma = [], [], []
seen = list()
for i in range(len(sorted_t) - 1):
if sorted_t[i][0] not in seen:
if sorted_t[i][0] != sorted_t[i + 1][0]:
om = np.append(om, sorted_t[i][0])
Counts = np.append(Counts, sorted_t[i][1])
sigma = np.append(sigma, sorted_t[i][2])
else:
om = np.append(om, sorted_t[i][0])
counts1, counts2 = sorted_t[i][1], sorted_t[i + 1][1]
sigma1, sigma2 = sorted_t[i][2], sorted_t[i + 1][2]
count_err1 = u.ufloat(counts1, sigma1)
count_err2 = u.ufloat(counts2, sigma2)
avg = (count_err1 + count_err2) / 2
Counts = np.append(Counts, avg.n)
sigma = np.append(sigma, avg.s)
seen.append(sorted_t[i][0])
else:
continue
if dict1 == dict2:
del dict1["scan"][keys[1]]
note = (
f"This scan was merged with scan {keys[1]} from "
f'file {dict2["meta"]["original_filename"]} \n'
)
if "notes" not in dict1["scan"][str(keys[0])]:
dict1["scan"][str(keys[0])]["notes"] = note
else:
dict1["scan"][str(keys[0])]["notes"] += note
dict1["scan"][keys[0]]["om"] = om
dict1["scan"][keys[0]]["Counts"] = Counts
dict1["scan"][keys[0]]["sigma"] = sigma
dict1["scan"][keys[0]]["monitor"] = monitor
print("merging done")
return dict1
def substract_measurement(dict1, dict2, keys, auto=True, monitor=100000):
"""Substracts two scan (scan key2 from dict2 from measurent key1 in dict1), expects om to be same
:arg dict1 : dictionary to which scan will be merged
:arg dict2 : dictionary from which scan will be merged
:arg keys : tuple with key to dict1 and dict2
:arg auto : if true, when monitors are same, does not change it, if flase, takes monitor always
:arg monitor : final monitor after merging
:returns d : dict1 with substracted Counts from dict2 and sigma that comes from the substraction"""
if len(dict1["scan"][keys[0]]["om"]) != len(dict2["scan"][keys[1]]["om"]):
raise ValueError("Omegas have different lengths, cannot be substracted")
if auto:
if dict1["scan"][keys[0]]["monitor"] == dict2["scan"][keys[1]]["monitor"]:
monitor = dict1["scan"][keys[0]]["monitor"]
cor_y1, y_err1 = normalize(dict1, keys[0], monitor=monitor)
cor_y2, y_err2 = normalize(dict2, keys[1], monitor=monitor)
dict1_count_err = create_uncertanities(cor_y1, y_err1)
dict2_count_err = create_uncertanities(cor_y2, y_err2)
res = np.subtract(dict1_count_err, dict2_count_err)
res_nom = []
res_err = []
for k in range(len(res)):
res_nom = np.append(res_nom, res[k].n)
res_err = np.append(res_err, res[k].s)
if len([num for num in res_nom if num < 0]) >= 0.3 * len(res_nom):
print(
f"Warning! percentage of negative numbers in scan subsracted {keys[0]} is "
f"{len([num for num in res_nom if num < 0]) / len(res_nom)}"
)
dict1["scan"][str(keys[0])]["Counts"] = res_nom
dict1["scan"][str(keys[0])]["sigma"] = res_err
dict1["scan"][str(keys[0])]["monitor"] = monitor
note = (
f'Scan {keys[1]} from file {dict2["meta"]["original_filename"]} '
f"was substracted from this scan \n"
)
if "notes" not in dict1["scan"][str(keys[0])]:
dict1["scan"][str(keys[0])]["notes"] = note
else:
dict1["scan"][str(keys[0])]["notes"] += note
return dict1
def compare_dict(dict1, dict2):
"""takes two ccl dictionaries and compare different values for each key
:arg dict1 : dictionary 1 (ccl)
:arg dict2 : dictionary 2 (ccl)
:returns warning : dictionary with keys from primary files (if they differ) with
information of how many scan differ and which ones differ
:returns report_string string comparing all different values respecively of measurements"""
if dict1["meta"]["data_type"] != dict2["meta"]["data_type"]:
print("select two dicts")
return
S = []
conflicts = {}
warnings = {}
comp = compare_hkl(dict1, dict2)
d1 = scan_dict(dict1)
d2 = scan_dict(dict2)
if not d1:
S.append("There are no duplicates in %s (dict1) \n" % dict1["meta"]["original_filename"])
else:
S.append(
"There are %d duplicates in %s (dict1) \n"
% (len(d1), dict1["meta"]["original_filename"])
)
warnings["Duplicates in dict1"] = list()
for keys in d1:
S.append("Measurements %s with hkl %s \n" % (d1[keys], keys))
warnings["Duplicates in dict1"].append(d1[keys])
if not d2:
S.append("There are no duplicates in %s (dict2) \n" % dict2["meta"]["original_filename"])
else:
S.append(
"There are %d duplicates in %s (dict2) \n"
% (len(d2), dict2["meta"]["original_filename"])
)
warnings["Duplicates in dict2"] = list()
for keys in d2:
S.append("Measurements %s with hkl %s \n" % (d2[keys], keys))
warnings["Duplicates in dict2"].append(d2[keys])
# compare meta
S.append("Different values in meta: \n")
different_meta = {
k: dict1["meta"][k]
for k in dict1["meta"]
if k in dict2["meta"] and dict1["meta"][k] != dict2["meta"][k]
}
exlude_meta_set = ["original_filename", "date", "title"]
for keys in different_meta:
if keys in exlude_meta_set:
continue
else:
if keys not in conflicts:
conflicts[keys] = 1
else:
conflicts[keys] = conflicts[keys] + 1
S.append(" Different values in %s \n" % str(keys))
S.append(" dict1: %s \n" % str(dict1["meta"][str(keys)]))
S.append(" dict2: %s \n" % str(dict2["meta"][str(keys)]))
# compare Measurements
S.append(
"Number of measurements in %s = %s \n"
% (dict1["meta"]["original_filename"], len(dict1["scan"]))
)
S.append(
"Number of measurements in %s = %s \n"
% (dict2["meta"]["original_filename"], len(dict2["scan"]))
)
S.append("Different values in Measurements:\n")
select_set = ["om", "Counts", "sigma"]
exlude_set = ["time", "Counts", "date", "notes"]
for keys1 in comp:
for key2 in dict1["scan"][str(comp[str(keys1)][0])]:
if key2 in exlude_set:
continue
if key2 not in select_set:
try:
if (
dict1["scan"][comp[str(keys1)][0]][str(key2)]
!= dict2["scan"][str(comp[str(keys1)][1])][str(key2)]
):
S.append(
"Scan value "
"%s"
", with hkl %s differs in meausrements %s and %s \n"
% (key2, keys1, comp[str(keys1)][0], comp[str(keys1)][1])
)
S.append(
" dict1: %s \n"
% str(dict1["scan"][comp[str(keys1)][0]][str(key2)])
)
S.append(
" dict2: %s \n"
% str(dict2["scan"][comp[str(keys1)][1]][str(key2)])
)
if key2 not in conflicts:
conflicts[key2] = {}
conflicts[key2]["amount"] = 1
conflicts[key2]["scan"] = str(comp[str(keys1)])
else:
conflicts[key2]["amount"] = conflicts[key2]["amount"] + 1
conflicts[key2]["scan"] = (
conflicts[key2]["scan"] + " " + (str(comp[str(keys1)]))
)
except KeyError as e:
print("Missing keys, some files were probably merged or substracted")
print(e.args)
else:
try:
comparison = list(dict1["scan"][comp[str(keys1)][0]][str(key2)]) == list(
dict2["scan"][comp[str(keys1)][1]][str(key2)]
)
if len(list(dict1["scan"][comp[str(keys1)][0]][str(key2)])) != len(
list(dict2["scan"][comp[str(keys1)][1]][str(key2)])
):
if str("different length of %s" % key2) not in warnings:
warnings[str("different length of %s" % key2)] = list()
warnings[str("different length of %s" % key2)].append(
(str(comp[keys1][0]), str(comp[keys1][1]))
)
else:
warnings[str("different length of %s" % key2)].append(
(str(comp[keys1][0]), str(comp[keys1][1]))
)
if not comparison:
S.append(
"Scan value "
"%s"
" differs in scan %s and %s \n"
% (key2, comp[str(keys1)][0], comp[str(keys1)][1])
)
S.append(
" dict1: %s \n"
% str(list(dict1["scan"][comp[str(keys1)][0]][str(key2)]))
)
S.append(
" dict2: %s \n"
% str(list(dict2["scan"][comp[str(keys1)][1]][str(key2)]))
)
if key2 not in conflicts:
conflicts[key2] = {}
conflicts[key2]["amount"] = 1
conflicts[key2]["scan"] = str(comp[str(keys1)])
else:
conflicts[key2]["amount"] = conflicts[key2]["amount"] + 1
conflicts[key2]["scan"] = (
conflicts[key2]["scan"] + " " + (str(comp[str(keys1)]))
)
except KeyError as e:
print("Missing keys, some files were probably merged or substracted")
print(e.args)
for keys in conflicts:
try:
conflicts[str(keys)]["scan"] = conflicts[str(keys)]["scan"].split(" ")
except:
continue
report_string = "".join(S)
return warnings, conflicts, report_string
def guess_next(dict1, dict2, comp):
"""iterates thorough the scans and tries to decide if the scans should be
substracted or merged"""
threshold = 0.05
for keys in comp:
if (
abs(
(
dict1["scan"][str(comp[keys][0])]["temperature"]
- dict2["scan"][str(comp[keys][1])]["temperature"]
)
/ dict2["scan"][str(comp[keys][1])]["temperature"]
)
< threshold
and abs(
(
dict1["scan"][str(comp[keys][0])]["mag_field"]
- dict2["scan"][str(comp[keys][1])]["mag_field"]
)
/ dict2["scan"][str(comp[keys][1])]["mag_field"]
)
< threshold
):
comp[keys] = comp[keys] + tuple("m")
else:
comp[keys] = comp[keys] + tuple("s")
return comp
def process_dict(dict1, dict2, comp):
"""substracts or merges scans, guess_next function must run first """
for keys in comp:
if comp[keys][2] == "s":
substract_measurement(dict1, dict2, comp[keys])
elif comp[keys][2] == "m":
merge(dict1, dict2, comp[keys])
return dict1
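Reading the pieces together, the intended workflow of this module seems to be: find scans that share hkl with compare_hkl, let guess_next tag each pair 'm' or 's' depending on whether temperature and magnetic field agree within 5%, then run process_dict to merge or subtract. A runnable sketch with hand-built stand-in dicts (the values and file names below are illustrative only, not from the PR):

import numpy as np
from pyzebra.ccl_dict_operation import compare_hkl, guess_next, process_dict

def make_dict(fname, counts, temp):
    # minimal stand-in for a parsed ccl file
    scan = {
        "h_index": 0.0, "k_index": 0.0, "l_index": 1.0,
        "temperature": temp, "mag_field": 6.0,
        "om": np.linspace(11.0, 13.0, 5),
        "Counts": np.array(counts, dtype=float),
        "monitor": 100000,
    }
    return {"meta": {"original_filename": fname, "data_type": ".ccl"},
            "scan": {"M1": scan}}

dict1 = make_dict("a.ccl", [10, 20, 90, 20, 10], temp=1.5)
dict2 = make_dict("b.ccl", [12, 18, 85, 22, 11], temp=1.5)

comp = compare_hkl(dict1, dict2)       # {'0.0 0.0 1.0': ('M1', 'M1')}
comp = guess_next(dict1, dict2, comp)  # same T and field -> tagged 'm'
dict1 = process_dict(dict1, dict2, comp)
print(dict1["scan"]["M1"]["Counts"])   # counts averaged point by point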

pyzebra/ccl_findpeaks.py (new file)

@@ -0,0 +1,75 @@
import numpy as np
import scipy as sc
from scipy.interpolate import interp1d
from scipy.signal import savgol_filter


def ccl_findpeaks(
    scan, int_threshold=0.8, prominence=50, smooth=False, window_size=7, poly_order=3
):
    """function iterates through the dictionary created by load_cclv2 and locates peaks for each scan

    args:   scan - a single scan

            int_threshold - fraction threshold_intensity/max_intensity, must be a positive number between 0 and 1
            i.e. will only detect peaks above 75% of max intensity

            prominence - defines a drop in counts that must exist between two peaks, must be a positive number
            i.e. if prominence is 20, it will detect two neighbouring peaks of 300 and 310 intensities
            only if none of the intermediate values drop below 290

            smooth - if true, smooths the data with a Savitzky-Golay filter; if false, no smoothing

            window_size - window size for the savgol filter, must be an odd positive integer

            poly_order - order of the polynomial used in the savgol filter, must be a positive integer
            smaller than window_size

    the scan is updated in place with the following keys:
            'num_of_peaks': 1,      # number of peaks
            'peak_indexes': [20],   # indices of the peaks in the omega array
            'peak_heights': [90.],  # heights of the peaks (if data were smoothed,
                                      it is the height of the peaks in the smoothed data)
    """
    if not 0 <= int_threshold <= 1:
        int_threshold = 0.8
        print(
            "Invalid value for int_threshold, select value between 0 and 1, new value set to:",
            int_threshold,
        )

    if not isinstance(window_size, int) or (window_size % 2) == 0 or window_size <= 1:
        window_size = 7
        print(
            "Invalid value for window_size, select positive odd integer, new value set to:",
            window_size,
        )

    if not isinstance(poly_order, int) or window_size < poly_order:
        poly_order = 3
        print(
            "Invalid value for poly_order, select positive integer smaller than window_size, new value set to:",
            poly_order,
        )

    if not isinstance(prominence, (int, float)) or prominence < 0:
        prominence = 50
        print("Invalid value for prominence, select positive number, new value set to:", prominence)

    omega = scan["om"]
    counts = np.array(scan["Counts"])
    if smooth:
        itp = interp1d(omega, counts, kind="linear")
        absintensity = [abs(number) for number in counts]
        lowest_intensity = min(absintensity)
        counts[counts < 0] = lowest_intensity
        smooth_peaks = savgol_filter(itp(omega), window_size, poly_order)
    else:
        smooth_peaks = counts

    peaks, properties = sc.signal.find_peaks(
        smooth_peaks, height=int_threshold * max(smooth_peaks), prominence=prominence
    )
    scan["num_of_peaks"] = len(peaks)
    scan["peak_indexes"] = peaks
    scan["peak_heights"] = properties["peak_heights"]
    scan["smooth_peaks"] = smooth_peaks  # smoothed curve

pyzebra/comm_export.py (new file)

@@ -0,0 +1,80 @@
import numpy as np


def correction(value, lorentz=True, zebra_mode="--", ang1=0, ang2=0):
    if lorentz is False:
        return value
    else:
        if zebra_mode == "bi":
            corr_value = np.abs(value * np.sin(ang1))
            return corr_value
        elif zebra_mode == "nb":
            corr_value = np.abs(value * np.sin(ang1) * np.cos(ang2))
            return corr_value


def export_comm(data, path, lorentz=False):
    """exports data in the *.comm format
    :param lorentz: perform Lorentz correction
    :param path: path to file + name
    :arg data - data to export, is dict after peak fitting
    """
    zebra_mode = data["meta"]["zebra_mode"]
    align = ">"
    if data["meta"]["indices"] == "hkl":
        extension = ".comm"
        padding = [6, 4, 10, 8]
    elif data["meta"]["indices"] == "real":
        extension = ".incomm"
        padding = [4, 6, 10, 8]

    with open(str(path + extension), "w") as out_file:
        for key, scan in data["scan"].items():
            if "fit" not in scan:
                print("Scan skipped - no fit value for:", key)
                continue

            scan_number_str = f"{key:{align}{padding[0]}}"
            h_str = f'{int(scan["h_index"]):{padding[1]}}'
            k_str = f'{int(scan["k_index"]):{padding[1]}}'
            l_str = f'{int(scan["l_index"]):{padding[1]}}'
            if data["meta"]["area_method"] == "fit":
                area = float(scan["fit"]["fit_area"].n)
                sigma_str = (
                    f'{"{:8.2f}".format(float(scan["fit"]["fit_area"].s)):{align}{padding[2]}}'
                )
            elif data["meta"]["area_method"] == "integ":
                area = float(scan["fit"]["int_area"].n)
                sigma_str = (
                    f'{"{:8.2f}".format(float(scan["fit"]["int_area"].s)):{align}{padding[2]}}'
                )

            if zebra_mode == "bi":
                area = correction(area, lorentz, zebra_mode, scan["twotheta_angle"])
                int_str = f'{"{:8.2f}".format(area):{align}{padding[2]}}'
                angle_str1 = f'{scan["twotheta_angle"]:{padding[3]}}'
                angle_str2 = f'{scan["omega_angle"]:{padding[3]}}'
                angle_str3 = f'{scan["chi_angle"]:{padding[3]}}'
                angle_str4 = f'{scan["phi_angle"]:{padding[3]}}'
            elif zebra_mode == "nb":
                area = correction(area, lorentz, zebra_mode, scan["gamma_angle"], scan["nu_angle"])
                int_str = f'{"{:8.2f}".format(area):{align}{padding[2]}}'
                angle_str1 = f'{scan["gamma_angle"]:{padding[3]}}'
                angle_str2 = f'{scan["omega_angle"]:{padding[3]}}'
                angle_str3 = f'{scan["nu_angle"]:{padding[3]}}'
                angle_str4 = f'{scan["unkwn_angle"]:{padding[3]}}'

            line = (
                scan_number_str
                + h_str
                + k_str
                + l_str
                + int_str
                + sigma_str
                + angle_str1
                + angle_str2
                + angle_str3
                + angle_str4
                + "\n"
            )
            out_file.write(line)
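For reference, the Lorentz factor applied above is |I sin(ang1)| in bi mode (ang1 = two-theta) and |I sin(ang1) cos(ang2)| in nb mode (ang1 = gamma, ang2 = nu). One thing to double-check: export_comm passes the angles straight from the scan, where they are stored in degrees, while np.sin/np.cos expect radians. A small sketch that makes the conversion explicit (the deg2rad step is an assumption, not part of the diff):

import numpy as np
from pyzebra.comm_export import correction

area = 1234.5
# bi mode: |area * sin(two-theta)|, angle converted to radians by hand here
print(correction(area, lorentz=True, zebra_mode="bi", ang1=np.deg2rad(35.2)))
# lorentz=False (the export_comm default) returns the value unchanged
print(correction(area, lorentz=False))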

pyzebra/fit2.py (new file)

@@ -0,0 +1,227 @@
import numpy as np
import uncertainties as u
from lmfit import Model, Parameters
from scipy.integrate import simps


def bin_data(array, binsize):
    if isinstance(binsize, int) and 0 < binsize < len(array):
        return [
            np.mean(array[binsize * i : binsize * i + binsize])
            for i in range(int(np.ceil(len(array) / binsize)))
        ]
    else:
        print("Binsize needs to be a positive integer smaller than the length of the array")
        return array


def find_nearest(array, value):
    # find the nearest value and return its index
    array = np.asarray(array)
    idx = (np.abs(array - value)).argmin()
    return idx


def create_uncertanities(y, y_err):
    # create array with uncertainties for error propagation
    combined = np.array([])
    for i in range(len(y)):
        part = u.ufloat(y[i], y_err[i])
        combined = np.append(combined, part)
    return combined


def fitccl(
    scan,
    guess,
    vary,
    constraints_min,
    constraints_max,
    numfit_min=None,
    numfit_max=None,
    binning=None,
):
    """Made for fitting of ccl data where 1 peak is expected. Allows for a combination of a gaussian and a linear model
    :param scan: scan in the data dict (i.e. M123)
    :param guess: initial guess for the fitting; if None, some values are filled in automatically in order (see below)
    :param vary: True if a parameter can vary during fitting, False if it is to be fixed
    :param numfit_min: minimal value on x axis for numerical integration - if None, centre of gaussian minus 3 sigma
    :param numfit_max: maximal value on x axis for numerical integration - if None, centre of gaussian plus 3 sigma
    :param constraints_min: min constraints value for fit
    :param constraints_max: max constraints value for fit
    :param binning : binning of the data
    :return data dict with additional values
    order for guess, vary, constraints_min, constraints_max:
    [Gaussian centre, Gaussian sigma, Gaussian amplitude, background slope, background intercept]
    examples:
    guess = [None, None, 100, 0, None]
    vary = [True, True, True, True, True]
    constraints_min = [23, None, 50, 0, 0]
    constraints_max = [80, None, 1000, 0, 100]
    """
    if len(scan["peak_indexes"]) > 1:
        # return in case of more than 1 peak
        print("More than 1 peak, scan skipped")
        return

    if binning is None or binning == 0 or binning == 1:
        x = list(scan["om"])
        y = list(scan["Counts"])
        y_err = list(np.sqrt(y)) if scan.get("sigma", None) is None else list(scan["sigma"])
        print(scan["peak_indexes"])
        if not scan["peak_indexes"]:
            centre = np.mean(x)
        else:
            centre = x[int(scan["peak_indexes"])]
    else:
        x = list(scan["om"])
        if not scan["peak_indexes"]:
            centre = np.mean(x)
        else:
            centre = x[int(scan["peak_indexes"])]
        x = bin_data(x, binning)
        y = list(scan["Counts"])
        y_err = list(np.sqrt(y)) if scan.get("sigma", None) is None else list(scan["sigma"])
        combined = bin_data(create_uncertanities(y, y_err), binning)
        y = [combined[i].n for i in range(len(combined))]
        y_err = [combined[i].s for i in range(len(combined))]

    if len(scan["peak_indexes"]) == 0:
        # case for no peak: gaussian in the centre, sigma as 20% of the range
        print("No peak")
        peak_index = find_nearest(x, np.mean(x))
        guess[0] = centre if guess[0] is None else guess[0]
        guess[1] = (x[-1] - x[0]) / 5 if guess[1] is None else guess[1]
        guess[2] = 50 if guess[2] is None else guess[2]
        guess[3] = 0 if guess[3] is None else guess[3]
        guess[4] = np.mean(y) if guess[4] is None else guess[4]
        constraints_min[2] = 0

    elif len(scan["peak_indexes"]) == 1:
        # case for one peak, takes into account the user's guesses
        print("one peak")
        peak_height = scan["peak_heights"]
        guess[0] = centre if guess[0] is None else guess[0]
        guess[1] = 0.1 if guess[1] is None else guess[1]
        guess[2] = float(peak_height / 10) if guess[2] is None else float(guess[2])
        guess[3] = 0 if guess[3] is None else guess[3]
        guess[4] = np.median(x) if guess[4] is None else guess[4]
        constraints_min[0] = np.min(x) if constraints_min[0] is None else constraints_min[0]
        constraints_max[0] = np.max(x) if constraints_max[0] is None else constraints_max[0]

    def gaussian(x, g_cen, g_width, g_amp):
        """1-d gaussian: gaussian(x, amp, cen, wid)"""
        return (g_amp / (np.sqrt(2 * np.pi) * g_width)) * np.exp(
            -((x - g_cen) ** 2) / (2 * g_width ** 2)
        )

    def background(x, slope, intercept):
        """background"""
        return slope * (x - centre) + intercept

    mod = Model(gaussian) + Model(background)
    params = Parameters()
    params.add_many(
        ("g_cen", guess[0], bool(vary[0]), np.min(x), np.max(x), None, None),
        ("g_width", guess[1], bool(vary[1]), constraints_min[1], constraints_max[1], None, None),
        ("g_amp", guess[2], bool(vary[2]), constraints_min[2], constraints_max[2], None, None),
        ("slope", guess[3], bool(vary[3]), constraints_min[3], constraints_max[3], None, None),
        ("intercept", guess[4], bool(vary[4]), constraints_min[4], constraints_max[4], None, None),
    )
    # the weighted fit
    try:
        result = mod.fit(
            y, params, weights=[np.abs(1 / val) for val in y_err], x=x, calc_covar=True,
        )
    except ValueError:
        return

    if result.params["g_amp"].stderr is None:
        result.params["g_amp"].stderr = result.params["g_amp"].value
    elif result.params["g_amp"].stderr > result.params["g_amp"].value:
        result.params["g_amp"].stderr = result.params["g_amp"].value
    # u.ufloat to work with uncertainties
    fit_area = u.ufloat(result.params["g_amp"].value, result.params["g_amp"].stderr)
    comps = result.eval_components()

    if len(scan["peak_indexes"]) == 0:
        # in case of no peak, there is no reason to integrate, therefore fit and int are equal
        int_area = fit_area

    elif len(scan["peak_indexes"]) == 1:
        gauss_3sigmamin = find_nearest(
            x, result.params["g_cen"].value - 3 * result.params["g_width"].value
        )
        gauss_3sigmamax = find_nearest(
            x, result.params["g_cen"].value + 3 * result.params["g_width"].value
        )
        numfit_min = gauss_3sigmamin if numfit_min is None else find_nearest(x, numfit_min)
        numfit_max = gauss_3sigmamax if numfit_max is None else find_nearest(x, numfit_max)

        it = -1
        while abs(numfit_max - numfit_min) < 3:
            # in case the peak is very thin and numerical integration would be on zero omega
            # difference, finds the closest values
            it = it + 1
            numfit_min = find_nearest(
                x,
                result.params["g_cen"].value - 3 * (1 + it / 10) * result.params["g_width"].value,
            )
            numfit_max = find_nearest(
                x,
                result.params["g_cen"].value + 3 * (1 + it / 10) * result.params["g_width"].value,
            )

        if x[numfit_min] < np.min(x):
            # makes sure that the values supplied by the user lie in the omega range
            # can be omitted for users who know what they're doing
            numfit_min = gauss_3sigmamin
            print("Minimal integration value outside of x range")
        elif x[numfit_min] >= x[numfit_max]:
            numfit_min = gauss_3sigmamin
            print("Minimal integration value higher than maximal")
        else:
            pass
        if x[numfit_max] > np.max(x):
            numfit_max = gauss_3sigmamax
            print("Maximal integration value outside of x range")
        elif x[numfit_max] <= x[numfit_min]:
            numfit_max = gauss_3sigmamax
            print("Maximal integration value lower than minimal")
        else:
            pass

        count_errors = create_uncertanities(y, y_err)
        # create error vector for numerical integration propagation
        num_int_area = simps(count_errors[numfit_min:numfit_max], x[numfit_min:numfit_max])
        slope_err = u.ufloat(result.params["slope"].value, result.params["slope"].stderr)
        # pulls the nominal and error values from fit (slope)
        intercept_err = u.ufloat(
            result.params["intercept"].value, result.params["intercept"].stderr
        )
        # pulls the nominal and error values from fit (intercept)

        background_errors = np.array([])
        for j in range(len(x[numfit_min:numfit_max])):
            # creates nominal and error vector for numerical integration of the background,
            # evaluated at the same x positions as the integration window
            bg = slope_err * (x[numfit_min + j] - centre) + intercept_err
            background_errors = np.append(background_errors, bg)

        num_int_background = simps(background_errors, x[numfit_min:numfit_max])
        int_area = num_int_area - num_int_background

    d = {}
    for pars in result.params:
        d[str(pars)] = (result.params[str(pars)].value, result.params[str(pars)].vary)
    print(result.fit_report())
    print((result.params["g_amp"].value - int_area.n) / result.params["g_amp"].value)

    d["ratio"] = (result.params["g_amp"].value - int_area.n) / result.params["g_amp"].value
    d["int_area"] = int_area
    d["fit_area"] = u.ufloat(result.params["g_amp"].value, result.params["g_amp"].stderr)
    d["full_report"] = result.fit_report()
    d["result"] = result
    d["comps"] = comps
    d["numfit"] = [numfit_min, numfit_max]
    scan["fit"] = d

pyzebra/h5.py

@@ -22,7 +22,7 @@ def parse_h5meta(file):
     for line in file:
         line = line.strip()
         if line.startswith("#begin "):
-            section = line[len("#begin "):]
+            section = line[len("#begin ") :]
             content[section] = []

         elif line.startswith("#end"):
@@ -64,20 +64,3 @@ def read_detector_data(filepath):
         det_data["temperature"] = h5f["/entry1/sample/temperature"][:]

     return det_data
-
-
-def open_h5meta(filepath):
-    """Open h5meta file like *.cami
-
-    Args:
-        filepath (str): File path of a h5meta file.
-
-    Returns:
-        dict: A dictionary with h5 names and their detector data and angles.
-    """
-    data = dict()
-    h5meta_content = read_h5meta(filepath)
-    for file in h5meta_content["filelist"]:
-        data[file] = read_detector_data(file)
-
-    return data

pyzebra/load_1D.py (new file)

@@ -0,0 +1,221 @@
import os
import re
from collections import defaultdict
from decimal import Decimal
import numpy as np
META_VARS_STR = (
"instrument",
"title",
"sample",
"user",
"ProposalID",
"original_filename",
"date",
"zebra_mode",
"proposal",
"proposal_user",
"proposal_title",
"proposal_email",
"detectorDistance",
)
META_VARS_FLOAT = (
"omega",
"mf",
"2-theta",
"chi",
"phi",
"nu",
"temp",
"wavelenght",
"a",
"b",
"c",
"alpha",
"beta",
"gamma",
"cex1",
"cex2",
"mexz",
"moml",
"mcvl",
"momu",
"mcvu",
"snv",
"snh",
"snvm",
"snhm",
"s1vt",
"s1vb",
"s1hr",
"s1hl",
"s2vt",
"s2vb",
"s2hr",
"s2hl",
)
META_UB_MATRIX = ("ub1j", "ub2j", "ub3j")
CCL_FIRST_LINE = (
# the first element is `scan_number`, which we don't save to metadata
("h_index", float),
("k_index", float),
("l_index", float),
)
CCL_FIRST_LINE_BI = (
*CCL_FIRST_LINE,
("twotheta_angle", float),
("omega_angle", float),
("chi_angle", float),
("phi_angle", float),
)
CCL_FIRST_LINE_NB = (
*CCL_FIRST_LINE,
("gamma_angle", float),
("omega_angle", float),
("nu_angle", float),
("unkwn_angle", float),
)
CCL_SECOND_LINE = (
("number_of_measurements", int),
("angle_step", float),
("monitor", float),
("temperature", float),
("mag_field", float),
("date", str),
("time", str),
("scan_type", str),
)
def load_1D(filepath):
"""
Loads *.ccl or *.dat file (Distinguishes them based on last 3 chars in string of filepath
to add more variables to read, extend the elif list
the file must include '#data' and number of points in right place to work properly
:arg filepath
:returns det_variables
- dictionary of all detector/scan variables and dictinionary for every scan.
Names of these dictionaries are M + scan number. They include HKL indeces, angles,
monitors, stepsize and array of counts
"""
with open(filepath, "r") as infile:
_, ext = os.path.splitext(filepath)
det_variables = parse_1D(infile, data_type=ext)
return det_variables
def parse_1D(fileobj, data_type):
# read metadata
metadata = {}
for line in fileobj:
if "=" in line:
variable, value = line.split("=")
variable = variable.strip()
if variable in META_VARS_FLOAT:
metadata[variable] = float(value)
elif variable in META_VARS_STR:
metadata[variable] = str(value)[:-1].strip()
elif variable in META_UB_MATRIX:
metadata[variable] = re.findall(r"[-+]?\d*\.\d+|\d+", str(value))
if "#data" in line:
# this is the end of metadata and the start of data section
break
# read data
scan = {}
if data_type == ".ccl":
decimal = list()
if metadata["zebra_mode"] == "bi":
ccl_first_line = CCL_FIRST_LINE_BI
elif metadata["zebra_mode"] == "nb":
ccl_first_line = CCL_FIRST_LINE_NB
ccl_second_line = CCL_SECOND_LINE
for line in fileobj:
d = {}
# first line
scan_number, *params = line.split()
for param, (param_name, param_type) in zip(params, ccl_first_line):
d[param_name] = param_type(param)
decimal.append(bool(Decimal(d["h_index"]) % 1 == 0))
decimal.append(bool(Decimal(d["k_index"]) % 1 == 0))
decimal.append(bool(Decimal(d["l_index"]) % 1 == 0))
# second line
next_line = next(fileobj)
params = next_line.split()
for param, (param_name, param_type) in zip(params, ccl_second_line):
d[param_name] = param_type(param)
d["om"] = np.linspace(
d["omega_angle"] - (d["number_of_measurements"] / 2) * d["angle_step"],
d["omega_angle"] + (d["number_of_measurements"] / 2) * d["angle_step"],
d["number_of_measurements"],
)
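# e.g. omega_angle=20.0, angle_step=0.1, number_of_measurements=5 gives
# om = [19.75, 19.875, 20.0, 20.125, 20.25]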
# subsequent lines with counts
counts = []
while len(counts) < d["number_of_measurements"]:
counts.extend(map(int, next(fileobj).split()))
d["Counts"] = counts
scan[int(scan_number)] = d
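# if every h, k, l over all scans is integral, the file is labelled "hkl",
# otherwise it falls back to "real" (fractional) indices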
if all(decimal):
metadata["indices"] = "hkl"
else:
metadata["indices"] = "real"
elif data_type == ".dat":
# skip the first 2 rows, the third row contains the column names
next(fileobj)
next(fileobj)
col_names = next(fileobj).split()
data_cols = defaultdict(list)
for line in fileobj:
if "END-OF-DATA" in line:
# this is the end of data
break
for name, val in zip(col_names, line.split()):
data_cols[name].append(float(val))
try:
data_cols["h_index"] = float(metadata["title"].split()[-3])
data_cols["k_index"] = float(metadata["title"].split()[-2])
data_cols["l_index"] = float(metadata["title"].split()[-1])
except (ValueError, IndexError):
print("seems hkl is not in title")
data_cols["temperature"] = metadata["temp"]
data_cols["mag_field"] = metadata["mf"]
data_cols["omega_angle"] = metadata["omega"]
data_cols["number_of_measurements"] = len(data_cols["om"])
data_cols["monitor"] = data_cols["Monitor1"][0]
data_cols["twotheta_angle"] = metadata["2-theta"]
data_cols["chi_angle"] = metadata["chi"]
data_cols["phi_angle"] = metadata["phi"]
data_cols["nu_angle"] = metadata["nu"]
scan[1] = dict(data_cols)
else:
print("Unknown file extention")
# utility information
metadata["data_type"] = data_type
metadata["area_method"] = "fit"
return {"meta": metadata, "scan": scan}

View File

@ -0,0 +1,202 @@
from load_1D import load_1D
from ccl_dict_operation import add_dict
import pandas as pd
from mpl_toolkits.mplot3d import Axes3D  # don't delete, otherwise the waterfall plot won't work
import matplotlib.pyplot as plt
import matplotlib as mpl
import numpy as np
import pickle
import scipy.io as sio
def load_dats(filepath):
"""reads the txt file, get headers and data
:arg filepath to txt file or list of filepaths to the files
:return ccl like dictionary"""
if isinstance(filepath, str):
data_type = "txt"
file_list = list()
with open(filepath, "r") as infile:
col_names = next(infile).split(",")
col_names = [col_names[i].rstrip() for i in range(len(col_names))]
for line in infile:
if "END" in line:
break
file_list.append(tuple(line.split(",")))
elif isinstance(filepath, list):
data_type = "list"
file_list = filepath
dict1 = {}
for i in range(len(file_list)):
if not dict1:
if data_type == "txt":
dict1 = load_1D(file_list[0][0])
else:
dict1 = load_1D(file_list[0])
else:
if data_type == "txt":
dict1 = add_dict(dict1, load_1D(file_list[i][0]))
else:
dict1 = add_dict(dict1, load_1D(file_list[i]))
dict1["scan"][i + 1]["params"] = {}
if data_type == "txt":
for x in range(len(col_names) - 1):
dict1["scan"][i + 1]["params"][col_names[x + 1]] = file_list[i][x + 1]
return dict1
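# A minimal usage sketch (the list-file path and its columns are hypothetical):
#   combined = load_dats("/data/zebra/dat_list.txt")
#   combined["scan"][2]["params"]   # per-scan parameters from the txt columns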
def create_dataframe(dict1):
"""Creates pandas dataframe from the dictionary
:arg ccl like dictionary
:return pandas dataframe"""
# create dictionary to which we pull only wanted items before transforming it to pd.dataframe
pull_dict = {}
pull_dict["filenames"] = list()
for key in dict1["scan"][1]["params"]:
pull_dict[key] = list()
pull_dict["temperature"] = list()
pull_dict["mag_field"] = list()
pull_dict["fit_area"] = list()
pull_dict["int_area"] = list()
pull_dict["om"] = list()
pull_dict["Counts"] = list()
# populate the dict
for keys in dict1["scan"]:
if "file_of_origin" in dict1["scan"][keys]:
pull_dict["filenames"].append(dict1["scan"][keys]["file_of_origin"].split("/")[-1])
else:
pull_dict["filenames"].append(dict1["meta"]["original_filename"].split("/")[-1])
for key in dict1["scan"][keys]["params"]:
pull_dict[str(key)].append(float(dict1["scan"][keys]["params"][key]))
pull_dict["temperature"].append(dict1["scan"][keys]["temperature"])
pull_dict["mag_field"].append(dict1["scan"][keys]["mag_field"])
pull_dict["fit_area"].append(dict1["scan"][keys]["fit"]["fit_area"])
pull_dict["int_area"].append(dict1["scan"][keys]["fit"]["int_area"])
pull_dict["om"].append(dict1["scan"][keys]["om"])
pull_dict["Counts"].append(dict1["scan"][keys]["Counts"])
return pd.DataFrame(data=pull_dict)
def sort_dataframe(dataframe, sorting_parameter):
"""sorts the data frame and resets index"""
data = dataframe.sort_values(by=sorting_parameter)
data = data.reset_index(drop=True)
return data
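# Typical chaining (assumes a fit was attached to every scan beforehand, since
# create_dataframe reads scan["fit"]["fit_area"] and scan["fit"]["int_area"]):
#   df = sort_dataframe(create_dataframe(dict1), "temperature")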
def make_graph(data, sorting_parameter, style):
"""Makes the graph from the data based on style and sorting parameter
:arg data : pandas dataframe with data after sorting
:arg sorting_parameter to pull the correct variable and name
:arg style of the graph - waterfall, scatter, heat
:return matplotlib figure"""
if style == "waterfall":
mpl.rcParams["legend.fontsize"] = 10
fig = plt.figure()
ax = fig.gca(projection="3d")
for i in range(len(data)):
x = data["om"][i]
z = data["Counts"][i]
yy = [data[sorting_parameter][i]] * len(x)
ax.plot(x, yy, z, label="%s = %f" % (sorting_parameter, yy[0]))
ax.legend()
ax.set_xlabel("Omega")
ax.set_ylabel(sorting_parameter)
ax.set_zlabel("counts")
elif style == "scatter":
fig = plt.figure()
plt.errorbar(
data[sorting_parameter],
[data["fit_area"][i].n for i in range(len(data["fit_area"]))],
[data["fit_area"][i].s for i in range(len(data["fit_area"]))],
capsize=5,
ecolor="green",
)
plt.xlabel(str(sorting_parameter))
plt.ylabel("Intesity")
elif style == "heat":
new_om = list()
for i in range(len(data)):
new_om = np.append(new_om, np.around(data["om"][i], 2), axis=0)
unique_om = np.unique(new_om)
color_matrix = np.zeros(shape=(len(data), len(unique_om)))
for i in range(len(data)):
for j in range(len(data["om"][i])):
if np.around(data["om"][i][j], 2) in np.unique(new_om):
color_matrix[i, j] = data["Counts"][i][j]
else:
continue
fig = plt.figure()
plt.pcolormesh(unique_om, data[sorting_parameter], color_matrix, shading="gouraud")
plt.xlabel("omega")
plt.ylabel(sorting_parameter)
plt.colorbar()
plt.clim(color_matrix.mean(), color_matrix.max())
return fig
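# A usage sketch (the output filename is arbitrary):
#   fig = make_graph(df, "temperature", "waterfall")
#   fig.savefig("waterfall.png")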
def save_dict(obj, name):
""" saves dictionary as pickle file in binary format
:arg obj - object to save
:arg name - name of the file
NOTE: path should be added later"""
with open(name + ".pkl", "wb") as f:
pickle.dump(obj, f, pickle.HIGHEST_PROTOCOL)
def load_dict(name):
"""load dictionary from picle file
:arg name - name of the file to load
NOTE: expect the file in the same folder, path should be added later
:return dictionary"""
with open(name + ".pkl", "rb") as f:
return pickle.load(f)
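# Round-trip sketch (writes experiment.pkl to the working directory):
#   save_dict(dict1, "experiment")
#   dict1 = load_dict("experiment")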
# pickle, mat, h5, txt, csv, json
def save_table(data, filetype, name, path=None):
print("Saving: ", filetype)
path = "" if path is None else path
if filetype == "pickle":
# to work with uncertainties, see the uncertainties module
with open(path + name + ".pkl", "wb") as f:
pickle.dump(data, f, pickle.HIGHEST_PROTOCOL)
if filetype == "mat":
# MATLAB doesn't allow some special characters in variable names, and names can't
# start with numbers; if needed, add more to the remove_characters list
data["fit_area_nom"] = [data["fit_area"][i].n for i in range(len(data["fit_area"]))]
data["fit_area_err"] = [data["fit_area"][i].s for i in range(len(data["fit_area"]))]
data["int_area_nom"] = [data["int_area"][i].n for i in range(len(data["int_area"]))]
data["int_area_err"] = [data["int_area"][i].s for i in range(len(data["int_area"]))]
data = data.drop(columns=["fit_area", "int_area"])
remove_characters = [" ", "[", "]", "{", "}", "(", ")"]
for character in remove_characters:
data.columns = [
data.columns[i].replace(character, "") for i in range(len(data.columns))
]
sio.savemat((path + name + ".mat"), {name: col.values for name, col in data.items()})
if filetype == "csv" or "txt":
data["fit_area_nom"] = [data["fit_area"][i].n for i in range(len(data["fit_area"]))]
data["fit_area_err"] = [data["fit_area"][i].s for i in range(len(data["fit_area"]))]
data["int_area_nom"] = [data["int_area"][i].n for i in range(len(data["int_area"]))]
data["int_area_err"] = [data["int_area"][i].s for i in range(len(data["int_area"]))]
data = data.drop(columns=["fit_area", "int_area", "om", "Counts"])
if filetype == "csv":
data.to_csv(path + name + ".csv")
if filetype == "txt":
with open((path + name + ".txt"), "w") as outfile:
data.to_string(outfile)
if filetype == "h5":
hdf = pd.HDFStore((path + name + ".h5"))
hdf.put("data", data)
hdf.close()
if filetype == "json":
data.to_json((path + name + ".json"))
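# A usage sketch (name and path are arbitrary; keep the trailing slash, since
# path and name are joined by plain string concatenation):
#   save_table(df, "csv", "results", path="/tmp/")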

View File

@ -1,12 +1,16 @@
import math
import numpy as np
from matplotlib import pyplot as plt
from numba import njit
from scipy.optimize import curve_fit
import pyzebra
try:
from matplotlib import pyplot as plt
except ImportError:
print("matplotlib is not available")
pi_r = 180 / np.pi
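# pi_r converts radians to degrees, e.g. np.arcsin(0.5) * pi_r is approximately 30.0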