diff --git a/.vscode/launch.json b/.vscode/launch.json new file mode 100644 index 0000000..46b92fb --- /dev/null +++ b/.vscode/launch.json @@ -0,0 +1,13 @@ +{ + "version": "0.2.0", + "configurations": [ + { + "name": "pyzebra", + "type": "python", + "request": "launch", + "program": "${workspaceFolder}/pyzebra/cli.py", + "console": "internalConsole", + "env": {}, + }, + ] +} diff --git a/pyzebra/__init__.py b/pyzebra/__init__.py index abd6e90..db37dcf 100644 --- a/pyzebra/__init__.py +++ b/pyzebra/__init__.py @@ -1,3 +1,8 @@ +import pyzebra.ccl_dict_operation from pyzebra.anatric import * +from pyzebra.ccl_findpeaks import ccl_findpeaks +from pyzebra.comm_export import export_comm +from pyzebra.fit2 import fitccl from pyzebra.h5 import * +from pyzebra.load_1D import load_1D, parse_1D from pyzebra.xtal import * diff --git a/pyzebra/app/app.py b/pyzebra/app/app.py index 078c729..35e5bf0 100644 --- a/pyzebra/app/app.py +++ b/pyzebra/app/app.py @@ -1,10 +1,15 @@ import argparse +import logging +import sys +from io import StringIO from bokeh.io import curdoc -from bokeh.models import Tabs +from bokeh.layouts import column, row +from bokeh.models import Tabs, TextAreaInput -import panel_anatric -import panel_data_viewer +import panel_ccl_integrate +import panel_hdf_anatric +import panel_hdf_viewer parser = argparse.ArgumentParser( prog="pyzebra", formatter_class=argparse.ArgumentDefaultsHelpFormatter @@ -15,8 +20,32 @@ args = parser.parse_args() doc = curdoc() doc.title = "pyzebra" -# Final layout -tab_data_viewer = panel_data_viewer.create() -tab_anatric = panel_anatric.create() +sys.stdout = StringIO() +stdout_textareainput = TextAreaInput(title="print output:", height=150) -doc.add_root(Tabs(tabs=[tab_data_viewer, tab_anatric])) +bokeh_stream = StringIO() +bokeh_handler = logging.StreamHandler(bokeh_stream) +bokeh_handler.setFormatter(logging.Formatter(logging.BASIC_FORMAT)) +bokeh_logger = logging.getLogger('bokeh') +bokeh_logger.addHandler(bokeh_handler) +bokeh_log_textareainput = TextAreaInput(title="server output:", height=150) + +# Final layout +tab_hdf_viewer = panel_hdf_viewer.create() +tab_hdf_anatric = panel_hdf_anatric.create() +tab_ccl_integrate = panel_ccl_integrate.create() + +doc.add_root( + column( + Tabs(tabs=[tab_hdf_viewer, tab_hdf_anatric, tab_ccl_integrate]), + row(stdout_textareainput, bokeh_log_textareainput, sizing_mode="scale_both"), + ) +) + + +def update_stdout(): + stdout_textareainput.value = sys.stdout.getvalue() + bokeh_log_textareainput.value = bokeh_stream.getvalue() + + +doc.add_periodic_callback(update_stdout, 1000) diff --git a/pyzebra/app/panel_ccl_integrate.py b/pyzebra/app/panel_ccl_integrate.py new file mode 100644 index 0000000..16e5be5 --- /dev/null +++ b/pyzebra/app/panel_ccl_integrate.py @@ -0,0 +1,548 @@ +import base64 +import io +import os +import tempfile + +import numpy as np +from bokeh.layouts import column, row +from bokeh.models import ( + Asterisk, + BasicTicker, + Button, + ColumnDataSource, + CustomJS, + DataRange1d, + DataTable, + Div, + FileInput, + Grid, + Line, + LinearAxis, + Panel, + Plot, + RadioButtonGroup, + Scatter, + Select, + Spacer, + Span, + Spinner, + TableColumn, + TextAreaInput, + TextInput, + Toggle, + Whisker, +) + +import pyzebra + + +javaScript = """ +setTimeout(function() { + const filename = 'output' + js_data.data['ext'] + const blob = new Blob([js_data.data['cont']], {type: 'text/plain'}) + const link = document.createElement('a'); + document.body.appendChild(link); + const url = window.URL.createObjectURL(blob); + link.href = url; + link.download = filename; + link.click(); + window.URL.revokeObjectURL(url); + document.body.removeChild(link); +}, 500); +""" + +PROPOSAL_PATH = "/afs/psi.ch/project/sinqdata/2020/zebra/" + + +def create(): + det_data = {} + peak_pos_textinput_lock = False + js_data = ColumnDataSource(data=dict(cont=[], ext=[])) + + def proposal_textinput_callback(_attr, _old, new): + ccl_path = os.path.join(PROPOSAL_PATH, new) + ccl_file_list = [] + for file in os.listdir(ccl_path): + if file.endswith(".ccl"): + ccl_file_list.append((os.path.join(ccl_path, file), file)) + ccl_file_select.options = ccl_file_list + ccl_file_select.value = ccl_file_list[0][0] + + proposal_textinput = TextInput(title="Enter proposal number:", default_size=145) + proposal_textinput.on_change("value", proposal_textinput_callback) + + def ccl_file_select_callback(_attr, _old, new): + nonlocal det_data + with open(new) as file: + _, ext = os.path.splitext(new) + det_data = pyzebra.parse_1D(file, ext) + + scan_list = list(det_data["scan"].keys()) + hkl = [ + f'{int(m["h_index"])} {int(m["k_index"])} {int(m["l_index"])}' + for m in det_data["scan"].values() + ] + scan_table_source.data.update( + scan=scan_list, hkl=hkl, peaks=[0] * len(scan_list), fit=[0] * len(scan_list) + ) + scan_table_source.selected.indices = [] + scan_table_source.selected.indices = [0] + + ccl_file_select = Select(title="Available .ccl files") + ccl_file_select.on_change("value", ccl_file_select_callback) + + def upload_button_callback(_attr, _old, new): + nonlocal det_data + with io.StringIO(base64.b64decode(new).decode()) as file: + _, ext = os.path.splitext(upload_button.filename) + det_data = pyzebra.parse_1D(file, ext) + + scan_list = list(det_data["scan"].keys()) + hkl = [ + f'{int(m["h_index"])} {int(m["k_index"])} {int(m["l_index"])}' + for m in det_data["scan"].values() + ] + scan_table_source.data.update( + scan=scan_list, hkl=hkl, peaks=[0] * len(scan_list), fit=[0] * len(scan_list) + ) + scan_table_source.selected.indices = [] + scan_table_source.selected.indices = [0] + + upload_button = FileInput(accept=".ccl") + upload_button.on_change("value", upload_button_callback) + + def _update_table(): + num_of_peaks = [scan.get("num_of_peaks", 0) for scan in det_data["scan"].values()] + fit_ok = [(1 if "fit" in scan else 0) for scan in det_data["scan"].values()] + scan_table_source.data.update(peaks=num_of_peaks, fit=fit_ok) + + def _update_plot(ind): + nonlocal peak_pos_textinput_lock + peak_pos_textinput_lock = True + + scan = det_data["scan"][ind] + y = scan["Counts"] + x = scan["om"] + + plot_scatter_source.data.update(x=x, y=y, y_upper=y + np.sqrt(y), y_lower=y - np.sqrt(y)) + + num_of_peaks = scan.get("num_of_peaks") + if num_of_peaks is not None and num_of_peaks > 0: + peak_indexes = scan["peak_indexes"] + if len(peak_indexes) == 1: + peak_pos_textinput.value = str(scan["om"][peak_indexes[0]]) + else: + peak_pos_textinput.value = str([scan["om"][ind] for ind in peak_indexes]) + + plot_peak_source.data.update(x=scan["om"][peak_indexes], y=scan["peak_heights"]) + plot_line_smooth_source.data.update(x=x, y=scan["smooth_peaks"]) + else: + peak_pos_textinput.value = None + plot_peak_source.data.update(x=[], y=[]) + plot_line_smooth_source.data.update(x=[], y=[]) + + peak_pos_textinput_lock = False + + fit = scan.get("fit") + if fit is not None: + plot_gauss_source.data.update(x=x, y=scan["fit"]["comps"]["gaussian"]) + plot_bkg_source.data.update(x=x, y=scan["fit"]["comps"]["background"]) + params = fit["result"].params + fit_output_textinput.value = ( + "%s \n" + "Gaussian: centre = %9.4f, sigma = %9.4f, area = %9.4f \n" + "background: slope = %9.4f, intercept = %9.4f \n" + "Int. area = %9.4f +/- %9.4f \n" + "fit area = %9.4f +/- %9.4f \n" + "ratio((fit-int)/fit) = %9.4f" + % ( + ind, + params["g_cen"].value, + params["g_width"].value, + params["g_amp"].value, + params["slope"].value, + params["intercept"].value, + fit["int_area"].n, + fit["int_area"].s, + params["g_amp"].value, + params["g_amp"].stderr, + (params["g_amp"].value - fit["int_area"].n) / params["g_amp"].value, + ) + ) + numfit_min, numfit_max = fit["numfit"] + if numfit_min is None: + numfit_min_span.location = None + else: + numfit_min_span.location = x[numfit_min] + + if numfit_max is None: + numfit_max_span.location = None + else: + numfit_max_span.location = x[numfit_max] + + else: + plot_gauss_source.data.update(x=[], y=[]) + plot_bkg_source.data.update(x=[], y=[]) + fit_output_textinput.value = "" + numfit_min_span.location = None + numfit_max_span.location = None + + # Main plot + plot = Plot( + x_range=DataRange1d(), + y_range=DataRange1d(), + plot_height=400, + plot_width=700, + toolbar_location=None, + ) + + plot.add_layout(LinearAxis(axis_label="Counts"), place="left") + plot.add_layout(LinearAxis(axis_label="Omega"), place="below") + + plot.add_layout(Grid(dimension=0, ticker=BasicTicker())) + plot.add_layout(Grid(dimension=1, ticker=BasicTicker())) + + plot_scatter_source = ColumnDataSource(dict(x=[0], y=[0], y_upper=[0], y_lower=[0])) + plot.add_glyph(plot_scatter_source, Scatter(x="x", y="y", line_color="steelblue")) + plot.add_layout(Whisker(source=plot_scatter_source, base="x", upper="y_upper", lower="y_lower")) + + plot_line_smooth_source = ColumnDataSource(dict(x=[0], y=[0])) + plot.add_glyph( + plot_line_smooth_source, Line(x="x", y="y", line_color="steelblue", line_dash="dashed") + ) + + plot_gauss_source = ColumnDataSource(dict(x=[0], y=[0])) + plot.add_glyph(plot_gauss_source, Line(x="x", y="y", line_color="red", line_dash="dashed")) + + plot_bkg_source = ColumnDataSource(dict(x=[0], y=[0])) + plot.add_glyph(plot_bkg_source, Line(x="x", y="y", line_color="green", line_dash="dashed")) + + plot_peak_source = ColumnDataSource(dict(x=[], y=[])) + plot.add_glyph(plot_peak_source, Asterisk(x="x", y="y", size=10, line_color="red")) + + numfit_min_span = Span(location=None, dimension="height", line_dash="dashed") + plot.add_layout(numfit_min_span) + + numfit_max_span = Span(location=None, dimension="height", line_dash="dashed") + plot.add_layout(numfit_max_span) + + # Scan select + def scan_table_callback(_attr, _old, new): + if new: + _update_plot(scan_table_source.data["scan"][new[-1]]) + + scan_table_source = ColumnDataSource(dict(scan=[], hkl=[], peaks=[], fit=[])) + scan_table = DataTable( + source=scan_table_source, + columns=[ + TableColumn(field="scan", title="scan"), + TableColumn(field="hkl", title="hkl"), + TableColumn(field="peaks", title="Peaks"), + TableColumn(field="fit", title="Fit"), + ], + width=200, + index_position=None, + ) + + scan_table_source.selected.on_change("indices", scan_table_callback) + + def peak_pos_textinput_callback(_attr, _old, new): + if new is not None and not peak_pos_textinput_lock: + sel_ind = scan_table_source.selected.indices[-1] + scan_name = scan_table_source.data["scan"][sel_ind] + scan = det_data["scan"][scan_name] + + scan["num_of_peaks"] = 1 + peak_ind = (np.abs(scan["om"] - float(new))).argmin() + scan["peak_indexes"] = np.array([peak_ind], dtype=np.int64) + scan["peak_heights"] = np.array([scan["smooth_peaks"][peak_ind]]) + _update_table() + _update_plot(scan_name) + + peak_pos_textinput = TextInput(title="Peak position:", default_size=145) + peak_pos_textinput.on_change("value", peak_pos_textinput_callback) + + peak_int_ratio_spinner = Spinner( + title="Peak intensity ratio:", value=0.8, step=0.01, low=0, high=1, default_size=145 + ) + peak_prominence_spinner = Spinner(title="Peak prominence:", value=50, low=0, default_size=145) + smooth_toggle = Toggle(label="Smooth curve", default_size=145) + window_size_spinner = Spinner(title="Window size:", value=7, step=2, low=1, default_size=145) + poly_order_spinner = Spinner(title="Poly order:", value=3, low=0, default_size=145) + + centre_guess = Spinner(default_size=100) + centre_vary = Toggle(default_size=100, active=True) + centre_min = Spinner(default_size=100) + centre_max = Spinner(default_size=100) + sigma_guess = Spinner(default_size=100) + sigma_vary = Toggle(default_size=100, active=True) + sigma_min = Spinner(default_size=100) + sigma_max = Spinner(default_size=100) + ampl_guess = Spinner(default_size=100) + ampl_vary = Toggle(default_size=100, active=True) + ampl_min = Spinner(default_size=100) + ampl_max = Spinner(default_size=100) + slope_guess = Spinner(default_size=100) + slope_vary = Toggle(default_size=100, active=True) + slope_min = Spinner(default_size=100) + slope_max = Spinner(default_size=100) + offset_guess = Spinner(default_size=100) + offset_vary = Toggle(default_size=100, active=True) + offset_min = Spinner(default_size=100) + offset_max = Spinner(default_size=100) + integ_from = Spinner(title="Integrate from:", default_size=145) + integ_to = Spinner(title="to:", default_size=145) + + def fitparam_reset_button_callback(): + centre_guess.value = None + centre_vary.active = True + centre_min.value = None + centre_max.value = None + sigma_guess.value = None + sigma_vary.active = True + sigma_min.value = None + sigma_max.value = None + ampl_guess.value = None + ampl_vary.active = True + ampl_min.value = None + ampl_max.value = None + slope_guess.value = None + slope_vary.active = True + slope_min.value = None + slope_max.value = None + offset_guess.value = None + offset_vary.active = True + offset_min.value = None + offset_max.value = None + integ_from.value = None + integ_to.value = None + + fitparam_reset_button = Button(label="Reset to defaults", default_size=145) + fitparam_reset_button.on_click(fitparam_reset_button_callback) + + fit_output_textinput = TextAreaInput(title="Fit results:", width=450, height=400) + + def peakfind_all_button_callback(): + for scan in det_data["scan"].values(): + pyzebra.ccl_findpeaks( + scan, + int_threshold=peak_int_ratio_spinner.value, + prominence=peak_prominence_spinner.value, + smooth=smooth_toggle.active, + window_size=window_size_spinner.value, + poly_order=poly_order_spinner.value, + ) + + _update_table() + + sel_ind = scan_table_source.selected.indices[-1] + _update_plot(scan_table_source.data["scan"][sel_ind]) + + peakfind_all_button = Button(label="Peak Find All", button_type="primary", default_size=145) + peakfind_all_button.on_click(peakfind_all_button_callback) + + def peakfind_button_callback(): + sel_ind = scan_table_source.selected.indices[-1] + scan = scan_table_source.data["scan"][sel_ind] + pyzebra.ccl_findpeaks( + det_data["scan"][scan], + int_threshold=peak_int_ratio_spinner.value, + prominence=peak_prominence_spinner.value, + smooth=smooth_toggle.active, + window_size=window_size_spinner.value, + poly_order=poly_order_spinner.value, + ) + + _update_table() + _update_plot(scan) + + peakfind_button = Button(label="Peak Find Current", default_size=145) + peakfind_button.on_click(peakfind_button_callback) + + def fit_all_button_callback(): + for scan in det_data["scan"].values(): + pyzebra.fitccl( + scan, + guess=[ + centre_guess.value, + sigma_guess.value, + ampl_guess.value, + slope_guess.value, + offset_guess.value, + ], + vary=[ + centre_vary.active, + sigma_vary.active, + ampl_vary.active, + slope_vary.active, + offset_vary.active, + ], + constraints_min=[ + centre_min.value, + sigma_min.value, + ampl_min.value, + slope_min.value, + offset_min.value, + ], + constraints_max=[ + centre_max.value, + sigma_max.value, + ampl_max.value, + slope_max.value, + offset_max.value, + ], + numfit_min=integ_from.value, + numfit_max=integ_to.value, + ) + + sel_ind = scan_table_source.selected.indices[-1] + _update_plot(scan_table_source.data["scan"][sel_ind]) + _update_table() + + fit_all_button = Button(label="Fit All", button_type="primary", default_size=145) + fit_all_button.on_click(fit_all_button_callback) + + def fit_button_callback(): + sel_ind = scan_table_source.selected.indices[-1] + scan = scan_table_source.data["scan"][sel_ind] + + pyzebra.fitccl( + det_data["scan"][scan], + guess=[ + centre_guess.value, + sigma_guess.value, + ampl_guess.value, + slope_guess.value, + offset_guess.value, + ], + vary=[ + centre_vary.active, + sigma_vary.active, + ampl_vary.active, + slope_vary.active, + offset_vary.active, + ], + constraints_min=[ + centre_min.value, + sigma_min.value, + ampl_min.value, + slope_min.value, + offset_min.value, + ], + constraints_max=[ + centre_max.value, + sigma_max.value, + ampl_max.value, + slope_max.value, + offset_max.value, + ], + numfit_min=integ_from.value, + numfit_max=integ_to.value, + ) + + _update_plot(scan) + _update_table() + + fit_button = Button(label="Fit Current", default_size=145) + fit_button.on_click(fit_button_callback) + + def area_method_radiobutton_callback(_attr, _old, new): + det_data["meta"]["area_method"] = ("fit", "integ")[new] + + area_method_radiobutton = RadioButtonGroup( + labels=["Fit", "Integral"], active=0, default_size=145 + ) + area_method_radiobutton.on_change("active", area_method_radiobutton_callback) + + preview_output_textinput = TextAreaInput(title="Export file preview:", width=450, height=400) + + def preview_output_button_callback(): + if det_data["meta"]["indices"] == "hkl": + ext = ".comm" + elif det_data["meta"]["indices"] == "real": + ext = ".incomm" + + with tempfile.TemporaryDirectory() as temp_dir: + temp_file = temp_dir + "/temp" + pyzebra.export_comm(det_data, temp_file) + + with open(f"{temp_file}{ext}") as f: + preview_output_textinput.value = f.read() + + preview_output_button = Button(label="Preview file", default_size=220) + preview_output_button.on_click(preview_output_button_callback) + + def export_results(det_data): + if det_data["meta"]["indices"] == "hkl": + ext = ".comm" + elif det_data["meta"]["indices"] == "real": + ext = ".incomm" + + with tempfile.TemporaryDirectory() as temp_dir: + temp_file = temp_dir + "/temp" + pyzebra.export_comm(det_data, temp_file) + + with open(f"{temp_file}{ext}") as f: + output_content = f.read() + + return output_content, ext + + def save_button_callback(): + cont, ext = export_results(det_data) + js_data.data.update(cont=[cont], ext=[ext]) + + save_button = Button(label="Download file", button_type="success", default_size=220) + save_button.on_click(save_button_callback) + save_button.js_on_click(CustomJS(args={"js_data": js_data}, code=javaScript)) + + findpeak_controls = column( + row(peak_pos_textinput, column(Spacer(height=19), smooth_toggle)), + row(peak_int_ratio_spinner, peak_prominence_spinner), + row(window_size_spinner, poly_order_spinner), + row(peakfind_button, peakfind_all_button), + ) + + div_1 = Div(text="Guess:") + div_2 = Div(text="Vary:") + div_3 = Div(text="Min:") + div_4 = Div(text="Max:") + div_5 = Div(text="Gauss Centre:", margin=[5, 5, -5, 5]) + div_6 = Div(text="Gauss Sigma:", margin=[5, 5, -5, 5]) + div_7 = Div(text="Gauss Ampl.:", margin=[5, 5, -5, 5]) + div_8 = Div(text="Slope:", margin=[5, 5, -5, 5]) + div_9 = Div(text="Offset:", margin=[5, 5, -5, 5]) + fitpeak_controls = row( + column( + Spacer(height=36), + div_1, + Spacer(height=12), + div_2, + Spacer(height=12), + div_3, + Spacer(height=12), + div_4, + ), + column(div_5, centre_guess, centre_vary, centre_min, centre_max), + column(div_6, sigma_guess, sigma_vary, sigma_min, sigma_max), + column(div_7, ampl_guess, ampl_vary, ampl_min, ampl_max), + column(div_8, slope_guess, slope_vary, slope_min, slope_max), + column(div_9, offset_guess, offset_vary, offset_min, offset_max), + Spacer(width=20), + column( + row(integ_from, integ_to), + row(fitparam_reset_button, area_method_radiobutton), + row(fit_button, fit_all_button), + ), + ) + + export_layout = column(preview_output_textinput, row(preview_output_button, save_button)) + + upload_div = Div(text="Or upload .ccl file:") + tab_layout = column( + row(proposal_textinput, ccl_file_select), + row(column(Spacer(height=5), upload_div), upload_button), + row(scan_table, plot, Spacer(width=30), fit_output_textinput, export_layout), + row(findpeak_controls, Spacer(width=30), fitpeak_controls), + ) + + return Panel(child=tab_layout, title="ccl integrate") diff --git a/pyzebra/app/panel_anatric.py b/pyzebra/app/panel_hdf_anatric.py similarity index 99% rename from pyzebra/app/panel_anatric.py rename to pyzebra/app/panel_hdf_anatric.py index 77a019c..5023161 100644 --- a/pyzebra/app/panel_anatric.py +++ b/pyzebra/app/panel_hdf_anatric.py @@ -406,4 +406,4 @@ def create(): curdoc().add_periodic_callback(update_config, 1000) - return Panel(child=tab_layout, title="Anatric") + return Panel(child=tab_layout, title="hdf anatric") diff --git a/pyzebra/app/panel_data_viewer.py b/pyzebra/app/panel_hdf_viewer.py similarity index 96% rename from pyzebra/app/panel_data_viewer.py rename to pyzebra/app/panel_hdf_viewer.py index 92f6c32..2ff09d9 100644 --- a/pyzebra/app/panel_data_viewer.py +++ b/pyzebra/app/panel_hdf_viewer.py @@ -381,7 +381,7 @@ def create(): overview_plot_x_image_glyph.color_mapper = LinearColorMapper(palette=cmap_dict[new]) overview_plot_y_image_glyph.color_mapper = LinearColorMapper(palette=cmap_dict[new]) - colormap = Select(title="Colormap:", options=list(cmap_dict.keys())) + colormap = Select(title="Colormap:", options=list(cmap_dict.keys()), default_size=145) colormap.on_change("value", colormap_callback) colormap.value = "plasma" @@ -400,7 +400,7 @@ def create(): update_image() - auto_toggle = Toggle(label="Auto Range", active=True, button_type="default") + auto_toggle = Toggle(label="Auto Range", active=True, button_type="default", default_size=145) auto_toggle.on_click(auto_toggle_callback) # ---- colormap display max value @@ -414,6 +414,7 @@ def create(): value=1, step=STEP, disabled=auto_toggle.active, + default_size=145, ) display_max_spinner.on_change("value", display_max_spinner_callback) @@ -428,6 +429,7 @@ def create(): value=0, step=STEP, disabled=auto_toggle.active, + default_size=145, ) display_min_spinner.on_change("value", display_min_spinner_callback) @@ -470,13 +472,22 @@ def create(): temperature_spinner = Spinner(title="Temperature:", format="0.00", width=145, disabled=True) # Final layout - layout_image = column( - gridplot([[proj_v, None], [plot, proj_h]], merge_tools=False), row(index_spinner) + layout_image = column(gridplot([[proj_v, None], [plot, proj_h]], merge_tools=False)) + colormap_layout = column( + row(colormap, column(Spacer(height=19), auto_toggle)), + row(display_max_spinner, display_min_spinner), ) - colormap_layout = column(colormap, auto_toggle, display_max_spinner, display_min_spinner) hkl_layout = column(radio_button_group, hkl_button) params_layout = row(magnetic_field_spinner, temperature_spinner) + layout_controls = row( + column(selection_button, selection_list), + Spacer(width=20), + column(frame_button_group, colormap_layout), + Spacer(width=20), + column(index_spinner, params_layout, hkl_layout), + ) + layout_overview = column( gridplot( [[overview_plot_x, overview_plot_y]], @@ -491,12 +502,12 @@ def create(): column( row(column(Spacer(height=5), upload_div), upload_button, filelist), layout_overview, - row(frame_button_group, selection_button, selection_list), + layout_controls, ), - column(roi_avg_plot, layout_image, row(colormap_layout, column(params_layout, hkl_layout))), + column(roi_avg_plot, layout_image), ) - return Panel(child=tab_layout, title="Data Viewer") + return Panel(child=tab_layout, title="hdf viewer") def calculate_hkl(det_data, index, setup_type="nb_bi"): diff --git a/pyzebra/app/save_load_dict.py b/pyzebra/app/save_load_dict.py deleted file mode 100644 index 5e31442..0000000 --- a/pyzebra/app/save_load_dict.py +++ /dev/null @@ -1,19 +0,0 @@ -import pickle - - -def save_dict(obj, name): - """ saves dictionary as pickle file in binary format - :arg obj - object to save - :arg name - name of the file - NOTE: path should be added later""" - with open(name + '.pkl', 'wb') as f: - pickle.dump(obj, f, pickle.HIGHEST_PROTOCOL) - - -def load_dict(name): - """load dictionary from picle file - :arg name - name of the file to load - NOTE: expect the file in the same folder, path should be added later - :return dictionary""" - with open(name + '.pkl', 'rb') as f: - return pickle.load(f) diff --git a/pyzebra/ccl_dict_operation.py b/pyzebra/ccl_dict_operation.py new file mode 100644 index 0000000..7d3b40d --- /dev/null +++ b/pyzebra/ccl_dict_operation.py @@ -0,0 +1,513 @@ +import numpy as np +import uncertainties as u + +from .fit2 import create_uncertanities + + +def add_dict(dict1, dict2): + """adds two dictionaries, meta of the new is saved as meata+original_filename and + measurements are shifted to continue with numbering of first dict + :arg dict1 : dictionarry to add to + :arg dict2 : dictionarry from which to take the measurements + :return dict1 : combined dictionary + Note: dict1 must be made from ccl, otherwise we would have to change the structure of loaded + dat file""" + max_measurement_dict1 = max([int(str(keys)[1:]) for keys in dict1["scan"]]) + if dict2["meta"]["data_type"] == ".ccl": + new_filenames = [ + "M" + str(x + max_measurement_dict1) + for x in [int(str(keys)[1:]) for keys in dict2["scan"]] + ] + new_meta_name = "meta" + str(dict2["meta"]["original_filename"]) + if new_meta_name not in dict1: + for keys, name in zip(dict2["scan"], new_filenames): + dict2["scan"][keys]["file_of_origin"] = str(dict2["meta"]["original_filename"]) + dict1["scan"][name] = dict2["scan"][keys] + + dict1[new_meta_name] = dict2["meta"] + + else: + raise KeyError( + str( + "The file %s has alredy been added to %s" + % (dict2["meta"]["original_filename"], dict1["meta"]["original_filename"]) + ) + ) + elif dict2["meta"]["data_type"] == ".dat": + d = {} + new_name = "M" + str(max_measurement_dict1 + 1) + hkl = dict2["meta"]["title"] + d["h_index"] = float(hkl.split()[-3]) + d["k_index"] = float(hkl.split()[-2]) + d["l_index"] = float(hkl.split()[-1]) + d["number_of_measurements"] = len(dict2["scan"]["NP"]) + d["om"] = dict2["scan"]["om"] + d["Counts"] = dict2["scan"]["Counts"] + d["monitor"] = dict2["scan"]["Monitor1"][0] + d["temperature"] = dict2["meta"]["temp"] + d["mag_field"] = dict2["meta"]["mf"] + d["omega_angle"] = dict2["meta"]["omega"] + dict1["scan"][new_name] = d + print(hkl.split()) + for keys in d: + print(keys) + + print("s") + + return dict1 + + +def auto(dict): + """takes just unique tuples from all tuples in dictionary returend by scan_dict + intendet for automatic merge if you doesent want to specify what scans to merge together + args: dict - dictionary from scan_dict function + :return dict - dict without repetitions""" + for keys in dict: + tuple_list = dict[keys] + new = list() + for i in range(len(tuple_list)): + if tuple_list[0][0] == tuple_list[i][0]: + new.append(tuple_list[i]) + dict[keys] = new + return dict + + +def scan_dict(dict): + """scans dictionary for duplicate hkl indexes + :arg dict : dictionary to scan + :return dictionary with matching scans, if there are none, the dict is empty + note: can be checked by "not d", true if empty + """ + + d = {} + for i in dict["scan"]: + for j in dict["scan"]: + if dict["scan"][str(i)] != dict["scan"][str(j)]: + itup = ( + dict["scan"][str(i)]["h_index"], + dict["scan"][str(i)]["k_index"], + dict["scan"][str(i)]["l_index"], + ) + jtup = ( + dict["scan"][str(j)]["h_index"], + dict["scan"][str(j)]["k_index"], + dict["scan"][str(j)]["l_index"], + ) + if itup != jtup: + pass + else: + + if str(itup) not in d: + d[str(itup)] = list() + d[str(itup)].append((i, j)) + else: + d[str(itup)].append((i, j)) + else: + continue + return d + + +def compare_hkl(dict1, dict2): + """Compares two dictionaries based on hkl indexes and return dictionary with str(h k l) as + key and tuple with keys to same scan in dict1 and dict2 + :arg dict1 : first dictionary + :arg dict2 : second dictionary + :return d : dict with matches + example of one key: '0.0 0.0 -1.0 : ('M1', 'M9')' meaning that 001 hkl scan is M1 in + first dict and M9 in second""" + d = {} + dupl = 0 + for keys in dict1["scan"]: + for key in dict2["scan"]: + if ( + dict1["scan"][str(keys)]["h_index"] == dict2["scan"][str(key)]["h_index"] + and dict1["scan"][str(keys)]["k_index"] == dict2["scan"][str(key)]["k_index"] + and dict1["scan"][str(keys)]["l_index"] == dict2["scan"][str(key)]["l_index"] + ): + + if ( + str( + ( + str(dict1["scan"][str(keys)]["h_index"]) + + " " + + str(dict1["scan"][str(keys)]["k_index"]) + + " " + + str(dict1["scan"][str(keys)]["l_index"]) + ) + ) + not in d + ): + d[ + str( + str(dict1["scan"][str(keys)]["h_index"]) + + " " + + str(dict1["scan"][str(keys)]["k_index"]) + + " " + + str(dict1["scan"][str(keys)]["l_index"]) + ) + ] = (str(keys), str(key)) + else: + dupl = dupl + 1 + d[ + str( + str(dict1["scan"][str(keys)]["h_index"]) + + " " + + str(dict1["scan"][str(keys)]["k_index"]) + + " " + + str(dict1["scan"][str(keys)]["l_index"]) + + "_dupl" + + str(dupl) + ) + ] = (str(keys), str(key)) + else: + continue + + return d + + +def create_tuples(x, y, y_err): + """creates tuples for sorting and merginng of the data + Counts need to be normalized to monitor before""" + t = list() + for i in range(len(x)): + tup = (x[i], y[i], y_err[i]) + t.append(tup) + return t + + +def normalize(dict, key, monitor): + """Normalizes the scan to monitor, checks if sigma exists, otherwise creates it + :arg dict : dictionary to from which to tkae the scan + :arg key : which scan to normalize from dict1 + :arg monitor : final monitor + :return counts - normalized counts + :return sigma - normalized sigma""" + + counts = np.array(dict["scan"][key]["Counts"]) + sigma = np.sqrt(counts) if "sigma" not in dict["scan"][key] else dict["scan"][key]["sigma"] + monitor_ratio = monitor / dict["scan"][key]["monitor"] + scaled_counts = counts * monitor_ratio + scaled_sigma = np.array(sigma) * monitor_ratio + + return scaled_counts, scaled_sigma + + +def merge(dict1, dict2, keys, auto=True, monitor=100000): + """merges the two tuples and sorts them, if om value is same, Counts value is average + averaging is propagated into sigma if dict1 == dict2, key[1] is deleted after merging + :arg dict1 : dictionary to which scan will be merged + :arg dict2 : dictionary from which scan will be merged + :arg keys : tuple with key to dict1 and dict2 + :arg auto : if true, when monitors are same, does not change it, if flase, takes monitor always + :arg monitor : final monitor after merging + note: dict1 and dict2 can be same dict + :return dict1 with merged scan""" + if auto: + if dict1["scan"][keys[0]]["monitor"] == dict2["scan"][keys[1]]["monitor"]: + monitor = dict1["scan"][keys[0]]["monitor"] + + # load om and Counts + x1, x2 = dict1["scan"][keys[0]]["om"], dict2["scan"][keys[1]]["om"] + cor_y1, y_err1 = normalize(dict1, keys[0], monitor=monitor) + cor_y2, y_err2 = normalize(dict2, keys[1], monitor=monitor) + # creates touples (om, Counts, sigma) for sorting and further processing + tuple_list = create_tuples(x1, cor_y1, y_err1) + create_tuples(x2, cor_y2, y_err2) + # Sort the list on om and add 0 0 0 tuple to the last position + sorted_t = sorted(tuple_list, key=lambda tup: tup[0]) + sorted_t.append((0, 0, 0)) + om, Counts, sigma = [], [], [] + seen = list() + for i in range(len(sorted_t) - 1): + if sorted_t[i][0] not in seen: + if sorted_t[i][0] != sorted_t[i + 1][0]: + om = np.append(om, sorted_t[i][0]) + Counts = np.append(Counts, sorted_t[i][1]) + sigma = np.append(sigma, sorted_t[i][2]) + else: + om = np.append(om, sorted_t[i][0]) + counts1, counts2 = sorted_t[i][1], sorted_t[i + 1][1] + sigma1, sigma2 = sorted_t[i][2], sorted_t[i + 1][2] + count_err1 = u.ufloat(counts1, sigma1) + count_err2 = u.ufloat(counts2, sigma2) + avg = (count_err1 + count_err2) / 2 + Counts = np.append(Counts, avg.n) + sigma = np.append(sigma, avg.s) + seen.append(sorted_t[i][0]) + else: + continue + + if dict1 == dict2: + del dict1["scan"][keys[1]] + + note = ( + f"This scan was merged with scan {keys[1]} from " + f'file {dict2["meta"]["original_filename"]} \n' + ) + if "notes" not in dict1["scan"][str(keys[0])]: + dict1["scan"][str(keys[0])]["notes"] = note + else: + dict1["scan"][str(keys[0])]["notes"] += note + + dict1["scan"][keys[0]]["om"] = om + dict1["scan"][keys[0]]["Counts"] = Counts + dict1["scan"][keys[0]]["sigma"] = sigma + dict1["scan"][keys[0]]["monitor"] = monitor + print("merging done") + return dict1 + + +def substract_measurement(dict1, dict2, keys, auto=True, monitor=100000): + """Substracts two scan (scan key2 from dict2 from measurent key1 in dict1), expects om to be same + :arg dict1 : dictionary to which scan will be merged + :arg dict2 : dictionary from which scan will be merged + :arg keys : tuple with key to dict1 and dict2 + :arg auto : if true, when monitors are same, does not change it, if flase, takes monitor always + :arg monitor : final monitor after merging + :returns d : dict1 with substracted Counts from dict2 and sigma that comes from the substraction""" + + if len(dict1["scan"][keys[0]]["om"]) != len(dict2["scan"][keys[1]]["om"]): + raise ValueError("Omegas have different lengths, cannot be substracted") + + if auto: + if dict1["scan"][keys[0]]["monitor"] == dict2["scan"][keys[1]]["monitor"]: + monitor = dict1["scan"][keys[0]]["monitor"] + + cor_y1, y_err1 = normalize(dict1, keys[0], monitor=monitor) + cor_y2, y_err2 = normalize(dict2, keys[1], monitor=monitor) + + dict1_count_err = create_uncertanities(cor_y1, y_err1) + dict2_count_err = create_uncertanities(cor_y2, y_err2) + + res = np.subtract(dict1_count_err, dict2_count_err) + + res_nom = [] + res_err = [] + for k in range(len(res)): + res_nom = np.append(res_nom, res[k].n) + res_err = np.append(res_err, res[k].s) + + if len([num for num in res_nom if num < 0]) >= 0.3 * len(res_nom): + print( + f"Warning! percentage of negative numbers in scan subsracted {keys[0]} is " + f"{len([num for num in res_nom if num < 0]) / len(res_nom)}" + ) + + dict1["scan"][str(keys[0])]["Counts"] = res_nom + dict1["scan"][str(keys[0])]["sigma"] = res_err + dict1["scan"][str(keys[0])]["monitor"] = monitor + note = ( + f'Scan {keys[1]} from file {dict2["meta"]["original_filename"]} ' + f"was substracted from this scan \n" + ) + if "notes" not in dict1["scan"][str(keys[0])]: + dict1["scan"][str(keys[0])]["notes"] = note + else: + dict1["scan"][str(keys[0])]["notes"] += note + return dict1 + + +def compare_dict(dict1, dict2): + """takes two ccl dictionaries and compare different values for each key + :arg dict1 : dictionary 1 (ccl) + :arg dict2 : dictionary 2 (ccl) + :returns warning : dictionary with keys from primary files (if they differ) with + information of how many scan differ and which ones differ + :returns report_string string comparing all different values respecively of measurements""" + + if dict1["meta"]["data_type"] != dict2["meta"]["data_type"]: + print("select two dicts") + return + S = [] + conflicts = {} + warnings = {} + + comp = compare_hkl(dict1, dict2) + d1 = scan_dict(dict1) + d2 = scan_dict(dict2) + if not d1: + S.append("There are no duplicates in %s (dict1) \n" % dict1["meta"]["original_filename"]) + else: + S.append( + "There are %d duplicates in %s (dict1) \n" + % (len(d1), dict1["meta"]["original_filename"]) + ) + warnings["Duplicates in dict1"] = list() + for keys in d1: + S.append("Measurements %s with hkl %s \n" % (d1[keys], keys)) + warnings["Duplicates in dict1"].append(d1[keys]) + if not d2: + S.append("There are no duplicates in %s (dict2) \n" % dict2["meta"]["original_filename"]) + else: + S.append( + "There are %d duplicates in %s (dict2) \n" + % (len(d2), dict2["meta"]["original_filename"]) + ) + warnings["Duplicates in dict2"] = list() + for keys in d2: + S.append("Measurements %s with hkl %s \n" % (d2[keys], keys)) + warnings["Duplicates in dict2"].append(d2[keys]) + + # compare meta + S.append("Different values in meta: \n") + different_meta = { + k: dict1["meta"][k] + for k in dict1["meta"] + if k in dict2["meta"] and dict1["meta"][k] != dict2["meta"][k] + } + exlude_meta_set = ["original_filename", "date", "title"] + for keys in different_meta: + if keys in exlude_meta_set: + continue + else: + if keys not in conflicts: + conflicts[keys] = 1 + else: + conflicts[keys] = conflicts[keys] + 1 + + S.append(" Different values in %s \n" % str(keys)) + S.append(" dict1: %s \n" % str(dict1["meta"][str(keys)])) + S.append(" dict2: %s \n" % str(dict2["meta"][str(keys)])) + + # compare Measurements + S.append( + "Number of measurements in %s = %s \n" + % (dict1["meta"]["original_filename"], len(dict1["scan"])) + ) + S.append( + "Number of measurements in %s = %s \n" + % (dict2["meta"]["original_filename"], len(dict2["scan"])) + ) + S.append("Different values in Measurements:\n") + select_set = ["om", "Counts", "sigma"] + exlude_set = ["time", "Counts", "date", "notes"] + for keys1 in comp: + for key2 in dict1["scan"][str(comp[str(keys1)][0])]: + if key2 in exlude_set: + continue + if key2 not in select_set: + try: + if ( + dict1["scan"][comp[str(keys1)][0]][str(key2)] + != dict2["scan"][str(comp[str(keys1)][1])][str(key2)] + ): + S.append( + "Scan value " + "%s" + ", with hkl %s differs in meausrements %s and %s \n" + % (key2, keys1, comp[str(keys1)][0], comp[str(keys1)][1]) + ) + S.append( + " dict1: %s \n" + % str(dict1["scan"][comp[str(keys1)][0]][str(key2)]) + ) + S.append( + " dict2: %s \n" + % str(dict2["scan"][comp[str(keys1)][1]][str(key2)]) + ) + if key2 not in conflicts: + conflicts[key2] = {} + conflicts[key2]["amount"] = 1 + conflicts[key2]["scan"] = str(comp[str(keys1)]) + else: + + conflicts[key2]["amount"] = conflicts[key2]["amount"] + 1 + conflicts[key2]["scan"] = ( + conflicts[key2]["scan"] + " " + (str(comp[str(keys1)])) + ) + except KeyError as e: + print("Missing keys, some files were probably merged or substracted") + print(e.args) + + else: + try: + comparison = list(dict1["scan"][comp[str(keys1)][0]][str(key2)]) == list( + dict2["scan"][comp[str(keys1)][1]][str(key2)] + ) + if len(list(dict1["scan"][comp[str(keys1)][0]][str(key2)])) != len( + list(dict2["scan"][comp[str(keys1)][1]][str(key2)]) + ): + if str("different length of %s" % key2) not in warnings: + warnings[str("different length of %s" % key2)] = list() + warnings[str("different length of %s" % key2)].append( + (str(comp[keys1][0]), str(comp[keys1][1])) + ) + else: + warnings[str("different length of %s" % key2)].append( + (str(comp[keys1][0]), str(comp[keys1][1])) + ) + if not comparison: + S.append( + "Scan value " + "%s" + " differs in scan %s and %s \n" + % (key2, comp[str(keys1)][0], comp[str(keys1)][1]) + ) + S.append( + " dict1: %s \n" + % str(list(dict1["scan"][comp[str(keys1)][0]][str(key2)])) + ) + S.append( + " dict2: %s \n" + % str(list(dict2["scan"][comp[str(keys1)][1]][str(key2)])) + ) + if key2 not in conflicts: + conflicts[key2] = {} + conflicts[key2]["amount"] = 1 + conflicts[key2]["scan"] = str(comp[str(keys1)]) + else: + conflicts[key2]["amount"] = conflicts[key2]["amount"] + 1 + conflicts[key2]["scan"] = ( + conflicts[key2]["scan"] + " " + (str(comp[str(keys1)])) + ) + except KeyError as e: + print("Missing keys, some files were probably merged or substracted") + print(e.args) + + for keys in conflicts: + try: + conflicts[str(keys)]["scan"] = conflicts[str(keys)]["scan"].split(" ") + except: + continue + report_string = "".join(S) + return warnings, conflicts, report_string + + +def guess_next(dict1, dict2, comp): + """iterates thorough the scans and tries to decide if the scans should be + substracted or merged""" + threshold = 0.05 + for keys in comp: + if ( + abs( + ( + dict1["scan"][str(comp[keys][0])]["temperature"] + - dict2["scan"][str(comp[keys][1])]["temperature"] + ) + / dict2["scan"][str(comp[keys][1])]["temperature"] + ) + < threshold + and abs( + ( + dict1["scan"][str(comp[keys][0])]["mag_field"] + - dict2["scan"][str(comp[keys][1])]["mag_field"] + ) + / dict2["scan"][str(comp[keys][1])]["mag_field"] + ) + < threshold + ): + comp[keys] = comp[keys] + tuple("m") + else: + comp[keys] = comp[keys] + tuple("s") + + return comp + + +def process_dict(dict1, dict2, comp): + """substracts or merges scans, guess_next function must run first """ + for keys in comp: + if comp[keys][2] == "s": + substract_measurement(dict1, dict2, comp[keys]) + elif comp[keys][2] == "m": + merge(dict1, dict2, comp[keys]) + + return dict1 diff --git a/pyzebra/ccl_findpeaks.py b/pyzebra/ccl_findpeaks.py new file mode 100644 index 0000000..6e7d898 --- /dev/null +++ b/pyzebra/ccl_findpeaks.py @@ -0,0 +1,75 @@ +import numpy as np +import scipy as sc +from scipy.interpolate import interp1d +from scipy.signal import savgol_filter + + +def ccl_findpeaks( + scan, int_threshold=0.8, prominence=50, smooth=False, window_size=7, poly_order=3 +): + + """function iterates through the dictionary created by load_cclv2 and locates peaks for each scan + args: scan - a single scan, + + int_threshold - fraction of threshold_intensity/max_intensity, must be positive num between 0 and 1 + i.e. will only detect peaks above 75% of max intensity + + prominence - defines a drop of values that must be between two peaks, must be positive number + i.e. if promimence is 20, it will detect two neigbouring peaks of 300 and 310 intesities, + if none of the itermediate values are lower that 290 + + smooth - if true, smooths data by savitzky golay filter, if false - no smoothing + + window_size - window size for savgol filter, must be odd positive integer + + poly_order = order of the polynomial used in savgol filter, must be positive integer smaller than + window_size returns: dictionary with following structure: + D{M34{ 'num_of_peaks': 1, #num of peaks + 'peak_indexes': [20], # index of peaks in omega array + 'peak_heights': [90.], # height of the peaks (if data vere smoothed + its the heigh of the peaks in smoothed data) + """ + if not 0 <= int_threshold <= 1: + int_threshold = 0.8 + print( + "Invalid value for int_threshold, select value between 0 and 1, new value set to:", + int_threshold, + ) + + if not isinstance(window_size, int) or (window_size % 2) == 0 or window_size <= 1: + window_size = 7 + print( + "Invalid value for window_size, select positive odd integer, new value set to!:", + window_size, + ) + + if not isinstance(poly_order, int) or window_size < poly_order: + poly_order = 3 + print( + "Invalid value for poly_order, select positive integer smaller than window_size, new value set to:", + poly_order, + ) + + if not isinstance(prominence, (int, float)) and prominence < 0: + prominence = 50 + print("Invalid value for prominence, select positive number, new value set to:", prominence) + + omega = scan["om"] + counts = np.array(scan["Counts"]) + if smooth: + itp = interp1d(omega, counts, kind="linear") + absintensity = [abs(number) for number in counts] + lowest_intensity = min(absintensity) + counts[counts < 0] = lowest_intensity + smooth_peaks = savgol_filter(itp(omega), window_size, poly_order) + + else: + smooth_peaks = counts + + peaks, properties = sc.signal.find_peaks( + smooth_peaks, height=int_threshold * max(smooth_peaks), prominence=prominence + ) + scan["num_of_peaks"] = len(peaks) + scan["peak_indexes"] = peaks + scan["peak_heights"] = properties["peak_heights"] + scan["smooth_peaks"] = smooth_peaks # smoothed curve diff --git a/pyzebra/comm_export.py b/pyzebra/comm_export.py new file mode 100644 index 0000000..e178bb0 --- /dev/null +++ b/pyzebra/comm_export.py @@ -0,0 +1,80 @@ +import numpy as np + + +def correction(value, lorentz=True, zebra_mode="--", ang1=0, ang2=0): + if lorentz is False: + return value + else: + if zebra_mode == "bi": + corr_value = np.abs(value * np.sin(ang1)) + return corr_value + elif zebra_mode == "nb": + corr_value = np.abs(value * np.sin(ang1) * np.cos(ang2)) + return corr_value + + +def export_comm(data, path, lorentz=False): + """exports data in the *.comm format + :param lorentz: perform Lorentz correction + :param path: path to file + name + :arg data - data to export, is dict after peak fitting + + """ + zebra_mode = data["meta"]["zebra_mode"] + align = ">" + if data["meta"]["indices"] == "hkl": + extension = ".comm" + padding = [6, 4, 10, 8] + elif data["meta"]["indices"] == "real": + extension = ".incomm" + padding = [4, 6, 10, 8] + + with open(str(path + extension), "w") as out_file: + for key, scan in data["scan"].items(): + if "fit" not in scan: + print("Scan skipped - no fit value for:", key) + continue + scan_number_str = f"{key:{align}{padding[0]}}" + h_str = f'{int(scan["h_index"]):{padding[1]}}' + k_str = f'{int(scan["k_index"]):{padding[1]}}' + l_str = f'{int(scan["l_index"]):{padding[1]}}' + if data["meta"]["area_method"] == "fit": + area = float(scan["fit"]["fit_area"].n) + sigma_str = ( + f'{"{:8.2f}".format(float(scan["fit"]["fit_area"].s)):{align}{padding[2]}}' + ) + elif data["meta"]["area_method"] == "integ": + area = float(scan["fit"]["int_area"].n) + sigma_str = ( + f'{"{:8.2f}".format(float(scan["fit"]["int_area"].s)):{align}{padding[2]}}' + ) + + if zebra_mode == "bi": + area = correction(area, lorentz, zebra_mode, scan["twotheta_angle"]) + int_str = f'{"{:8.2f}".format(area):{align}{padding[2]}}' + angle_str1 = f'{scan["twotheta_angle"]:{padding[3]}}' + angle_str2 = f'{scan["omega_angle"]:{padding[3]}}' + angle_str3 = f'{scan["chi_angle"]:{padding[3]}}' + angle_str4 = f'{scan["phi_angle"]:{padding[3]}}' + elif zebra_mode == "nb": + area = correction(area, lorentz, zebra_mode, scan["gamma_angle"], scan["nu_angle"]) + int_str = f'{"{:8.2f}".format(area):{align}{padding[2]}}' + angle_str1 = f'{scan["gamma_angle"]:{padding[3]}}' + angle_str2 = f'{scan["omega_angle"]:{padding[3]}}' + angle_str3 = f'{scan["nu_angle"]:{padding[3]}}' + angle_str4 = f'{scan["unkwn_angle"]:{padding[3]}}' + + line = ( + scan_number_str + + h_str + + l_str + + k_str + + int_str + + sigma_str + + angle_str1 + + angle_str2 + + angle_str3 + + angle_str4 + + "\n" + ) + out_file.write(line) diff --git a/pyzebra/fit2.py b/pyzebra/fit2.py new file mode 100644 index 0000000..c4132e6 --- /dev/null +++ b/pyzebra/fit2.py @@ -0,0 +1,227 @@ +import numpy as np +import uncertainties as u +from lmfit import Model, Parameters +from scipy.integrate import simps + + +def bin_data(array, binsize): + if isinstance(binsize, int) and 0 < binsize < len(array): + return [ + np.mean(array[binsize * i : binsize * i + binsize]) + for i in range(int(np.ceil(len(array) / binsize))) + ] + else: + print("Binsize need to be positive integer smaller than lenght of array") + return array + + +def find_nearest(array, value): + # find nearest value and return index + array = np.asarray(array) + idx = (np.abs(array - value)).argmin() + return idx + + +def create_uncertanities(y, y_err): + # create array with uncertanities for error propagation + combined = np.array([]) + for i in range(len(y)): + part = u.ufloat(y[i], y_err[i]) + combined = np.append(combined, part) + return combined + + +def fitccl( + scan, + guess, + vary, + constraints_min, + constraints_max, + numfit_min=None, + numfit_max=None, + binning=None, +): + """Made for fitting of ccl date where 1 peak is expected. Allows for combination of gaussian and linear model combination + :param scan: scan in the data dict (i.e. M123) + :param guess: initial guess for the fitting, if none, some values are added automatically in order (see below) + :param vary: True if parameter can vary during fitting, False if it to be fixed + :param numfit_min: minimal value on x axis for numerical integration - if none is centre of gaussian minus 3 sigma + :param numfit_max: maximal value on x axis for numerical integration - if none is centre of gaussian plus 3 sigma + :param constraints_min: min constranits value for fit + :param constraints_max: max constranits value for fit + :param binning : binning of the data + :return data dict with additional values + order for guess, vary, constraints_min, constraints_max: + [Gaussian centre, Gaussian sigma, Gaussian amplitude, background slope, background intercept] + examples: + guess = [None, None, 100, 0, None] + vary = [True, True, True, True, True] + constraints_min = [23, None, 50, 0, 0] + constraints_min = [80, None, 1000, 0, 100] + """ + if len(scan["peak_indexes"]) > 1: + # return in case of more than 1 peaks + print("More than 1 peak, scan skipped") + return + if binning is None or binning == 0 or binning == 1: + x = list(scan["om"]) + y = list(scan["Counts"]) + y_err = list(np.sqrt(y)) if scan.get("sigma", None) is None else list(scan["sigma"]) + print(scan["peak_indexes"]) + if not scan["peak_indexes"]: + centre = np.mean(x) + else: + centre = x[int(scan["peak_indexes"])] + else: + x = list(scan["om"]) + if not scan["peak_indexes"]: + centre = np.mean(x) + else: + centre = x[int(scan["peak_indexes"])] + x = bin_data(x, binning) + y = list(scan["Counts"]) + y_err = list(np.sqrt(y)) if scan.get("sigma", None) is None else list(scan["sigma"]) + combined = bin_data(create_uncertanities(y, y_err), binning) + y = [combined[i].n for i in range(len(combined))] + y_err = [combined[i].s for i in range(len(combined))] + + if len(scan["peak_indexes"]) == 0: + # Case for no peak, gaussian in centre, sigma as 20% of range + print("No peak") + peak_index = find_nearest(x, np.mean(x)) + guess[0] = centre if guess[0] is None else guess[0] + guess[1] = (x[-1] - x[0]) / 5 if guess[1] is None else guess[1] + guess[2] = 50 if guess[2] is None else guess[2] + guess[3] = 0 if guess[3] is None else guess[3] + guess[4] = np.mean(y) if guess[4] is None else guess[4] + constraints_min[2] = 0 + + elif len(scan["peak_indexes"]) == 1: + # case for one peak, takse into account users guesses + print("one peak") + peak_height = scan["peak_heights"] + guess[0] = centre if guess[0] is None else guess[0] + guess[1] = 0.1 if guess[1] is None else guess[1] + guess[2] = float(peak_height / 10) if guess[2] is None else float(guess[2]) + guess[3] = 0 if guess[3] is None else guess[3] + guess[4] = np.median(x) if guess[4] is None else guess[4] + constraints_min[0] = np.min(x) if constraints_min[0] is None else constraints_min[0] + constraints_max[0] = np.max(x) if constraints_max[0] is None else constraints_max[0] + + def gaussian(x, g_cen, g_width, g_amp): + """1-d gaussian: gaussian(x, amp, cen, wid)""" + return (g_amp / (np.sqrt(2 * np.pi) * g_width)) * np.exp( + -((x - g_cen) ** 2) / (2 * g_width ** 2) + ) + + def background(x, slope, intercept): + """background""" + return slope * (x - centre) + intercept + + mod = Model(gaussian) + Model(background) + params = Parameters() + params.add_many( + ("g_cen", guess[0], bool(vary[0]), np.min(x), np.max(x), None, None), + ("g_width", guess[1], bool(vary[1]), constraints_min[1], constraints_max[1], None, None), + ("g_amp", guess[2], bool(vary[2]), constraints_min[2], constraints_max[2], None, None), + ("slope", guess[3], bool(vary[3]), constraints_min[3], constraints_max[3], None, None), + ("intercept", guess[4], bool(vary[4]), constraints_min[4], constraints_max[4], None, None), + ) + # the weighted fit + try: + result = mod.fit( + y, params, weights=[np.abs(1 / val) for val in y_err], x=x, calc_covar=True, + ) + except ValueError: + return + + if result.params["g_amp"].stderr is None: + result.params["g_amp"].stderr = result.params["g_amp"].value + elif result.params["g_amp"].stderr > result.params["g_amp"].value: + result.params["g_amp"].stderr = result.params["g_amp"].value + + # u.ufloat to work with uncertanities + fit_area = u.ufloat(result.params["g_amp"].value, result.params["g_amp"].stderr) + comps = result.eval_components() + + if len(scan["peak_indexes"]) == 0: + # for case of no peak, there is no reason to integrate, therefore fit and int are equal + int_area = fit_area + + elif len(scan["peak_indexes"]) == 1: + gauss_3sigmamin = find_nearest( + x, result.params["g_cen"].value - 3 * result.params["g_width"].value + ) + gauss_3sigmamax = find_nearest( + x, result.params["g_cen"].value + 3 * result.params["g_width"].value + ) + numfit_min = gauss_3sigmamin if numfit_min is None else find_nearest(x, numfit_min) + numfit_max = gauss_3sigmamax if numfit_max is None else find_nearest(x, numfit_max) + + it = -1 + while abs(numfit_max - numfit_min) < 3: + # in the case the peak is very thin and numerical integration would be on zero omega + # difference, finds closes values + it = it + 1 + numfit_min = find_nearest( + x, + result.params["g_cen"].value - 3 * (1 + it / 10) * result.params["g_width"].value, + ) + numfit_max = find_nearest( + x, + result.params["g_cen"].value + 3 * (1 + it / 10) * result.params["g_width"].value, + ) + + if x[numfit_min] < np.min(x): + # makes sure that the values supplied by user lay in the omega range + # can be ommited for users who know what they're doing + numfit_min = gauss_3sigmamin + print("Minimal integration value outside of x range") + elif x[numfit_min] >= x[numfit_max]: + numfit_min = gauss_3sigmamin + print("Minimal integration value higher than maximal") + else: + pass + if x[numfit_max] > np.max(x): + numfit_max = gauss_3sigmamax + print("Maximal integration value outside of x range") + elif x[numfit_max] <= x[numfit_min]: + numfit_max = gauss_3sigmamax + print("Maximal integration value lower than minimal") + else: + pass + + count_errors = create_uncertanities(y, y_err) + # create error vector for numerical integration propagation + num_int_area = simps(count_errors[numfit_min:numfit_max], x[numfit_min:numfit_max]) + slope_err = u.ufloat(result.params["slope"].value, result.params["slope"].stderr) + # pulls the nominal and error values from fit (slope) + intercept_err = u.ufloat( + result.params["intercept"].value, result.params["intercept"].stderr + ) + # pulls the nominal and error values from fit (intercept) + + background_errors = np.array([]) + for j in range(len(x[numfit_min:numfit_max])): + # creates nominal and error vector for numerical integration of background + bg = slope_err * (x[j] - centre) + intercept_err + background_errors = np.append(background_errors, bg) + + num_int_background = simps(background_errors, x[numfit_min:numfit_max]) + int_area = num_int_area - num_int_background + + d = {} + for pars in result.params: + d[str(pars)] = (result.params[str(pars)].value, result.params[str(pars)].vary) + print(result.fit_report()) + + print((result.params["g_amp"].value - int_area.n) / result.params["g_amp"].value) + + d["ratio"] = (result.params["g_amp"].value - int_area.n) / result.params["g_amp"].value + d["int_area"] = int_area + d["fit_area"] = u.ufloat(result.params["g_amp"].value, result.params["g_amp"].stderr) + d["full_report"] = result.fit_report() + d["result"] = result + d["comps"] = comps + d["numfit"] = [numfit_min, numfit_max] + scan["fit"] = d diff --git a/pyzebra/h5.py b/pyzebra/h5.py index 14f2e4b..3286f0e 100644 --- a/pyzebra/h5.py +++ b/pyzebra/h5.py @@ -22,7 +22,7 @@ def parse_h5meta(file): for line in file: line = line.strip() if line.startswith("#begin "): - section = line[len("#begin "):] + section = line[len("#begin ") :] content[section] = [] elif line.startswith("#end"): @@ -64,20 +64,3 @@ def read_detector_data(filepath): det_data["temperature"] = h5f["/entry1/sample/temperature"][:] return det_data - - -def open_h5meta(filepath): - """Open h5meta file like *.cami - - Args: - filepath (str): File path of a h5meta file. - - Returns: - dict: A dictionary with h5 names and their detector data and angles. - """ - data = dict() - h5meta_content = read_h5meta(filepath) - for file in h5meta_content["filelist"]: - data[file] = read_detector_data(file) - - return data diff --git a/pyzebra/load_1D.py b/pyzebra/load_1D.py new file mode 100644 index 0000000..50f3603 --- /dev/null +++ b/pyzebra/load_1D.py @@ -0,0 +1,221 @@ +import os +import re +from collections import defaultdict +from decimal import Decimal + +import numpy as np + +META_VARS_STR = ( + "instrument", + "title", + "sample", + "user", + "ProposalID", + "original_filename", + "date", + "zebra_mode", + "proposal", + "proposal_user", + "proposal_title", + "proposal_email", + "detectorDistance", +) +META_VARS_FLOAT = ( + "omega", + "mf", + "2-theta", + "chi", + "phi", + "nu", + "temp", + "wavelenght", + "a", + "b", + "c", + "alpha", + "beta", + "gamma", + "cex1", + "cex2", + "mexz", + "moml", + "mcvl", + "momu", + "mcvu", + "snv", + "snh", + "snvm", + "snhm", + "s1vt", + "s1vb", + "s1hr", + "s1hl", + "s2vt", + "s2vb", + "s2hr", + "s2hl", +) +META_UB_MATRIX = ("ub1j", "ub2j", "ub3j") + +CCL_FIRST_LINE = ( + # the first element is `scan_number`, which we don't save to metadata + ("h_index", float), + ("k_index", float), + ("l_index", float), +) + +CCL_FIRST_LINE_BI = ( + *CCL_FIRST_LINE, + ("twotheta_angle", float), + ("omega_angle", float), + ("chi_angle", float), + ("phi_angle", float), +) + +CCL_FIRST_LINE_NB = ( + *CCL_FIRST_LINE, + ("gamma_angle", float), + ("omega_angle", float), + ("nu_angle", float), + ("unkwn_angle", float), +) + +CCL_SECOND_LINE = ( + ("number_of_measurements", int), + ("angle_step", float), + ("monitor", float), + ("temperature", float), + ("mag_field", float), + ("date", str), + ("time", str), + ("scan_type", str), +) + + +def load_1D(filepath): + """ + Loads *.ccl or *.dat file (Distinguishes them based on last 3 chars in string of filepath + to add more variables to read, extend the elif list + the file must include '#data' and number of points in right place to work properly + + :arg filepath + :returns det_variables + - dictionary of all detector/scan variables and dictinionary for every scan. + Names of these dictionaries are M + scan number. They include HKL indeces, angles, + monitors, stepsize and array of counts + """ + with open(filepath, "r") as infile: + _, ext = os.path.splitext(filepath) + det_variables = parse_1D(infile, data_type=ext) + + return det_variables + + +def parse_1D(fileobj, data_type): + # read metadata + metadata = {} + for line in fileobj: + if "=" in line: + variable, value = line.split("=") + variable = variable.strip() + if variable in META_VARS_FLOAT: + metadata[variable] = float(value) + elif variable in META_VARS_STR: + metadata[variable] = str(value)[:-1].strip() + elif variable in META_UB_MATRIX: + metadata[variable] = re.findall(r"[-+]?\d*\.\d+|\d+", str(value)) + + if "#data" in line: + # this is the end of metadata and the start of data section + break + + # read data + scan = {} + if data_type == ".ccl": + decimal = list() + + if metadata["zebra_mode"] == "bi": + ccl_first_line = CCL_FIRST_LINE_BI + elif metadata["zebra_mode"] == "nb": + ccl_first_line = CCL_FIRST_LINE_NB + ccl_second_line = CCL_SECOND_LINE + + for line in fileobj: + d = {} + + # first line + scan_number, *params = line.split() + for param, (param_name, param_type) in zip(params, ccl_first_line): + d[param_name] = param_type(param) + + decimal.append(bool(Decimal(d["h_index"]) % 1 == 0)) + decimal.append(bool(Decimal(d["k_index"]) % 1 == 0)) + decimal.append(bool(Decimal(d["l_index"]) % 1 == 0)) + + # second line + next_line = next(fileobj) + params = next_line.split() + for param, (param_name, param_type) in zip(params, ccl_second_line): + d[param_name] = param_type(param) + + d["om"] = np.linspace( + d["omega_angle"] - (d["number_of_measurements"] / 2) * d["angle_step"], + d["omega_angle"] + (d["number_of_measurements"] / 2) * d["angle_step"], + d["number_of_measurements"], + ) + + # subsequent lines with counts + counts = [] + while len(counts) < d["number_of_measurements"]: + counts.extend(map(int, next(fileobj).split())) + d["Counts"] = counts + + scan[int(scan_number)] = d + + if all(decimal): + metadata["indices"] = "hkl" + else: + metadata["indices"] = "real" + + elif data_type == ".dat": + # skip the first 2 rows, the third row contans the column names + next(fileobj) + next(fileobj) + col_names = next(fileobj).split() + data_cols = defaultdict(list) + + for line in fileobj: + if "END-OF-DATA" in line: + # this is the end of data + break + + for name, val in zip(col_names, line.split()): + data_cols[name].append(float(val)) + + try: + data_cols["h_index"] = float(metadata["title"].split()[-3]) + data_cols["k_index"] = float(metadata["title"].split()[-2]) + data_cols["l_index"] = float(metadata["title"].split()[-1]) + except (ValueError, IndexError): + print("seems hkl is not in title") + + data_cols["temperature"] = metadata["temp"] + data_cols["mag_field"] = metadata["mf"] + data_cols["omega_angle"] = metadata["omega"] + data_cols["number_of_measurements"] = len(data_cols["om"]) + data_cols["monitor"] = data_cols["Monitor1"][0] + data_cols["twotheta_angle"] = metadata["2-theta"] + data_cols["chi_angle"] = metadata["chi"] + data_cols["phi_angle"] = metadata["phi"] + data_cols["nu_angle"] = metadata["nu"] + + scan[1] = dict(data_cols) + + else: + print("Unknown file extention") + + # utility information + metadata["data_type"] = data_type + metadata["area_method"] = "fit" + + return {"meta": metadata, "scan": scan} diff --git a/pyzebra/param_study_moduls.py b/pyzebra/param_study_moduls.py new file mode 100644 index 0000000..2593039 --- /dev/null +++ b/pyzebra/param_study_moduls.py @@ -0,0 +1,202 @@ +from load_1D import load_1D +from ccl_dict_operation import add_dict +import pandas as pd +from mpl_toolkits.mplot3d import Axes3D # dont delete, otherwise waterfall wont work +import matplotlib.pyplot as plt +import matplotlib as mpl +import numpy as np +import pickle +import scipy.io as sio + + +def load_dats(filepath): + """reads the txt file, get headers and data + :arg filepath to txt file or list of filepaths to the files + :return ccl like dictionary""" + if isinstance(filepath, str): + data_type = "txt" + file_list = list() + with open(filepath, "r") as infile: + col_names = next(infile).split(",") + col_names = [col_names[i].rstrip() for i in range(len(col_names))] + for line in infile: + if "END" in line: + break + file_list.append(tuple(line.split(","))) + elif isinstance(filepath, list): + data_type = "list" + file_list = filepath + dict1 = {} + for i in range(len(file_list)): + if not dict1: + if data_type == "txt": + dict1 = load_1D(file_list[0][0]) + else: + dict1 = load_1D(file_list[0]) + else: + if data_type == "txt": + dict1 = add_dict(dict1, load_1D(file_list[i][0])) + else: + dict1 = add_dict(dict1, load_1D(file_list[i])) + dict1["scan"][i + 1]["params"] = {} + if data_type == "txt": + for x in range(len(col_names) - 1): + dict1["scan"][i + 1]["params"][col_names[x + 1]] = file_list[i][x + 1] + + return dict1 + + +def create_dataframe(dict1): + """Creates pandas dataframe from the dictionary + :arg ccl like dictionary + :return pandas dataframe""" + # create dictionary to which we pull only wanted items before transforming it to pd.dataframe + pull_dict = {} + pull_dict["filenames"] = list() + for key in dict1["scan"][1]["params"]: + pull_dict[key] = list() + pull_dict["temperature"] = list() + pull_dict["mag_field"] = list() + pull_dict["fit_area"] = list() + pull_dict["int_area"] = list() + pull_dict["om"] = list() + pull_dict["Counts"] = list() + + # populate the dict + for keys in dict1["scan"]: + if "file_of_origin" in dict1["scan"][keys]: + pull_dict["filenames"].append(dict1["scan"][keys]["file_of_origin"].split("/")[-1]) + else: + pull_dict["filenames"].append(dict1["meta"]["original_filename"].split("/")[-1]) + for key in dict1["scan"][keys]["params"]: + pull_dict[str(key)].append(float(dict1["scan"][keys]["params"][key])) + pull_dict["temperature"].append(dict1["scan"][keys]["temperature"]) + pull_dict["mag_field"].append(dict1["scan"][keys]["mag_field"]) + pull_dict["fit_area"].append(dict1["scan"][keys]["fit"]["fit_area"]) + pull_dict["int_area"].append(dict1["scan"][keys]["fit"]["int_area"]) + pull_dict["om"].append(dict1["scan"][keys]["om"]) + pull_dict["Counts"].append(dict1["scan"][keys]["Counts"]) + + return pd.DataFrame(data=pull_dict) + + +def sort_dataframe(dataframe, sorting_parameter): + """sorts the data frame and resets index""" + data = dataframe.sort_values(by=sorting_parameter) + data = data.reset_index(drop=True) + return data + + +def make_graph(data, sorting_parameter, style): + """Makes the graph from the data based on style and sorting parameter + :arg data : pandas dataframe with data after sorting + :arg sorting_parameter to pull the correct variable and name + :arg style of the graph - waterfall, scatter, heatmap + :return matplotlib figure""" + if style == "waterfall": + mpl.rcParams["legend.fontsize"] = 10 + fig = plt.figure() + ax = fig.gca(projection="3d") + for i in range(len(data)): + x = data["om"][i] + z = data["Counts"][i] + yy = [data[sorting_parameter][i]] * len(x) + ax.plot(x, yy, z, label=str("%s = %f" % (sorting_parameter, yy[i]))) + + ax.legend() + ax.set_xlabel("Omega") + ax.set_ylabel(sorting_parameter) + ax.set_zlabel("counts") + + elif style == "scatter": + fig = plt.figure() + plt.errorbar( + data[sorting_parameter], + [data["fit_area"][i].n for i in range(len(data["fit_area"]))], + [data["fit_area"][i].s for i in range(len(data["fit_area"]))], + capsize=5, + ecolor="green", + ) + plt.xlabel(str(sorting_parameter)) + plt.ylabel("Intesity") + + elif style == "heat": + new_om = list() + for i in range(len(data)): + new_om = np.append(new_om, np.around(data["om"][i], 2), axis=0) + unique_om = np.unique(new_om) + color_matrix = np.zeros(shape=(len(data), len(unique_om))) + for i in range(len(data)): + for j in range(len(data["om"][i])): + if np.around(data["om"][i][j], 2) in np.unique(new_om): + color_matrix[i, j] = data["Counts"][i][j] + else: + continue + + fig = plt.figure() + plt.pcolormesh(unique_om, data[sorting_parameter], color_matrix, shading="gouraud") + plt.xlabel("omega") + plt.ylabel(sorting_parameter) + plt.colorbar() + plt.clim(color_matrix.mean(), color_matrix.max()) + + return fig + + +def save_dict(obj, name): + """ saves dictionary as pickle file in binary format + :arg obj - object to save + :arg name - name of the file + NOTE: path should be added later""" + with open(name + ".pkl", "wb") as f: + pickle.dump(obj, f, pickle.HIGHEST_PROTOCOL) + + +def load_dict(name): + """load dictionary from picle file + :arg name - name of the file to load + NOTE: expect the file in the same folder, path should be added later + :return dictionary""" + with open(name + ".pkl", "rb") as f: + return pickle.load(f) + + +# pickle, mat, h5, txt, csv, json +def save_table(data, filetype, name, path=None): + print("Saving: ", filetype) + path = "" if path is None else path + if filetype == "pickle": + # to work with uncertanities, see uncertanity module + with open(path + name + ".pkl", "wb") as f: + pickle.dump(data, f, pickle.HIGHEST_PROTOCOL) + if filetype == "mat": + # matlab doesent allow some special character to be in var names, also cant start with + # numbers, in need, add some to the romove_character list + data["fit_area_nom"] = [data["fit_area"][i].n for i in range(len(data["fit_area"]))] + data["fit_area_err"] = [data["fit_area"][i].s for i in range(len(data["fit_area"]))] + data["int_area_nom"] = [data["int_area"][i].n for i in range(len(data["int_area"]))] + data["int_area_err"] = [data["int_area"][i].s for i in range(len(data["int_area"]))] + data = data.drop(columns=["fit_area", "int_area"]) + remove_characters = [" ", "[", "]", "{", "}", "(", ")"] + for character in remove_characters: + data.columns = [ + data.columns[i].replace(character, "") for i in range(len(data.columns)) + ] + sio.savemat((path + name + ".mat"), {name: col.values for name, col in data.items()}) + if filetype == "csv" or "txt": + data["fit_area_nom"] = [data["fit_area"][i].n for i in range(len(data["fit_area"]))] + data["fit_area_err"] = [data["fit_area"][i].s for i in range(len(data["fit_area"]))] + data["int_area_nom"] = [data["int_area"][i].n for i in range(len(data["int_area"]))] + data["int_area_err"] = [data["int_area"][i].s for i in range(len(data["int_area"]))] + data = data.drop(columns=["fit_area", "int_area", "om", "Counts"]) + if filetype == "csv": + data.to_csv(path + name + ".csv") + if filetype == "txt": + with open((path + name + ".txt"), "w") as outfile: + data.to_string(outfile) + if filetype == "h5": + hdf = pd.HDFStore((path + name + ".h5")) + hdf.put("data", data) + hdf.close() + if filetype == "json": + data.to_json((path + name + ".json")) diff --git a/pyzebra/xtal.py b/pyzebra/xtal.py index 817453d..4407937 100644 --- a/pyzebra/xtal.py +++ b/pyzebra/xtal.py @@ -1,12 +1,16 @@ import math import numpy as np -from matplotlib import pyplot as plt from numba import njit from scipy.optimize import curve_fit import pyzebra +try: + from matplotlib import pyplot as plt +except ImportError: + print("matplotlib is not available") + pi_r = 180 / np.pi