diff --git a/pyzebra/param_study_moduls.py b/pyzebra/param_study_moduls.py index 2593039..928fb12 100644 --- a/pyzebra/param_study_moduls.py +++ b/pyzebra/param_study_moduls.py @@ -1,5 +1,4 @@ from load_1D import load_1D -from ccl_dict_operation import add_dict import pandas as pd from mpl_toolkits.mplot3d import Axes3D # dont delete, otherwise waterfall wont work import matplotlib.pyplot as plt @@ -7,6 +6,17 @@ import matplotlib as mpl import numpy as np import pickle import scipy.io as sio +import uncertainties as u + + +def create_tuples(x, y, y_err): + """creates tuples for sorting and merginng of the data + Counts need to be normalized to monitor before""" + t = list() + for i in range(len(x)): + tup = (x[i], y[i], y_err[i]) + t.append(tup) + return t def load_dats(filepath): @@ -38,10 +48,10 @@ def load_dats(filepath): dict1 = add_dict(dict1, load_1D(file_list[i][0])) else: dict1 = add_dict(dict1, load_1D(file_list[i])) - dict1["scan"][i + 1]["params"] = {} + dict1["meas"][i + 1]["params"] = {} if data_type == "txt": for x in range(len(col_names) - 1): - dict1["scan"][i + 1]["params"][col_names[x + 1]] = file_list[i][x + 1] + dict1["meas"][i + 1]["params"][col_names[x + 1]] = file_list[i][x + 1] return dict1 @@ -53,7 +63,7 @@ def create_dataframe(dict1): # create dictionary to which we pull only wanted items before transforming it to pd.dataframe pull_dict = {} pull_dict["filenames"] = list() - for key in dict1["scan"][1]["params"]: + for key in dict1["meas"][1]["params"]: pull_dict[key] = list() pull_dict["temperature"] = list() pull_dict["mag_field"] = list() @@ -63,19 +73,19 @@ def create_dataframe(dict1): pull_dict["Counts"] = list() # populate the dict - for keys in dict1["scan"]: - if "file_of_origin" in dict1["scan"][keys]: - pull_dict["filenames"].append(dict1["scan"][keys]["file_of_origin"].split("/")[-1]) + for keys in dict1["meas"]: + if "file_of_origin" in dict1["meas"][keys]: + pull_dict["filenames"].append(dict1["meas"][keys]["file_of_origin"].split("/")[-1]) else: pull_dict["filenames"].append(dict1["meta"]["original_filename"].split("/")[-1]) - for key in dict1["scan"][keys]["params"]: - pull_dict[str(key)].append(float(dict1["scan"][keys]["params"][key])) - pull_dict["temperature"].append(dict1["scan"][keys]["temperature"]) - pull_dict["mag_field"].append(dict1["scan"][keys]["mag_field"]) - pull_dict["fit_area"].append(dict1["scan"][keys]["fit"]["fit_area"]) - pull_dict["int_area"].append(dict1["scan"][keys]["fit"]["int_area"]) - pull_dict["om"].append(dict1["scan"][keys]["om"]) - pull_dict["Counts"].append(dict1["scan"][keys]["Counts"]) + for key in dict1["meas"][keys]["params"]: + pull_dict[str(key)].append(float(dict1["meas"][keys]["params"][key])) + pull_dict["temperature"].append(dict1["meas"][keys]["temperature"]) + pull_dict["mag_field"].append(dict1["meas"][keys]["mag_field"]) + pull_dict["fit_area"].append(dict1["meas"][keys]["fit"]["fit_area"]) + pull_dict["int_area"].append(dict1["meas"][keys]["fit"]["int_area"]) + pull_dict["om"].append(dict1["meas"][keys]["om"]) + pull_dict["Counts"].append(dict1["meas"][keys]["Counts"]) return pd.DataFrame(data=pull_dict) @@ -144,7 +154,7 @@ def make_graph(data, sorting_parameter, style): def save_dict(obj, name): - """ saves dictionary as pickle file in binary format + """saves dictionary as pickle file in binary format :arg obj - object to save :arg name - name of the file NOTE: path should be added later""" @@ -200,3 +210,172 @@ def save_table(data, filetype, name, path=None): hdf.close() if filetype == "json": data.to_json((path + name + ".json")) + + def normalize(dict, key, monitor): + """Normalizes the measurement to monitor, checks if sigma exists, otherwise creates it + :arg dict : dictionary to from which to tkae the scan + :arg key : which scan to normalize from dict1 + :arg monitor : final monitor + :return counts - normalized counts + :return sigma - normalized sigma""" + + counts = np.array(dict["meas"][key]["Counts"]) + sigma = np.sqrt(counts) if "sigma" not in dict["meas"][key] else dict["meas"][key]["sigma"] + monitor_ratio = monitor / dict["meas"][key]["monitor"] + scaled_counts = counts * monitor_ratio + scaled_sigma = np.array(sigma) * monitor_ratio + + return scaled_counts, scaled_sigma + + def merge(dict1, dict2, scand_dict_result, auto=True, monitor=100000): + """merges the two tuples and sorts them, if om value is same, Counts value is average + averaging is propagated into sigma if dict1 == dict2, key[1] is deleted after merging + :arg dict1 : dictionary to which measurement will be merged + :arg dict2 : dictionary from which measurement will be merged + :arg keys : tuple with key to dict1 and dict2 + :arg auto : if true, when monitors are same, does not change it, if flase, takes monitor always + :arg monitor : final monitor after merging + note: dict1 and dict2 can be same dict + :return dict1 with merged scan""" + for keys in scand_dict_result: + for j in range(len(scand_dict_result[keys])): + first, second = scand_dict_result[keys][j][0], scand_dict_result[keys][j][1] + print(first, second) + if auto: + if dict1["meas"][first]["monitor"] == dict2["meas"][second]["monitor"]: + monitor = dict1["meas"][first]["monitor"] + + # load om and Counts + x1, x2 = dict1["meas"][first]["om"], dict2["meas"][second]["om"] + cor_y1, y_err1 = normalize(dict1, first, monitor=monitor) + cor_y2, y_err2 = normalize(dict2, second, monitor=monitor) + # creates touples (om, Counts, sigma) for sorting and further processing + tuple_list = create_tuples(x1, cor_y1, y_err1) + create_tuples(x2, cor_y2, y_err2) + # Sort the list on om and add 0 0 0 tuple to the last position + sorted_t = sorted(tuple_list, key=lambda tup: tup[0]) + sorted_t.append((0, 0, 0)) + om, Counts, sigma = [], [], [] + seen = list() + for i in range(len(sorted_t) - 1): + if sorted_t[i][0] not in seen: + if sorted_t[i][0] != sorted_t[i + 1][0]: + om = np.append(om, sorted_t[i][0]) + Counts = np.append(Counts, sorted_t[i][1]) + sigma = np.append(sigma, sorted_t[i][2]) + else: + om = np.append(om, sorted_t[i][0]) + counts1, counts2 = sorted_t[i][1], sorted_t[i + 1][1] + sigma1, sigma2 = sorted_t[i][2], sorted_t[i + 1][2] + count_err1 = u.ufloat(counts1, sigma1) + count_err2 = u.ufloat(counts2, sigma2) + avg = (count_err1 + count_err2) / 2 + Counts = np.append(Counts, avg.n) + sigma = np.append(sigma, avg.s) + seen.append(sorted_t[i][0]) + else: + continue + + if dict1 == dict2: + del dict1["meas"][second] + + note = ( + f"This measurement was merged with measurement {second} from " + f'file {dict2["meta"]["original_filename"]} \n' + ) + if "notes" not in dict1["meas"][first]: + dict1["meas"][first]["notes"] = note + else: + dict1["meas"][first]["notes"] += note + + dict1["meas"][first]["om"] = om + dict1["meas"][first]["Counts"] = Counts + dict1["meas"][first]["sigma"] = sigma + dict1["meas"][first]["monitor"] = monitor + print("merging done") + return dict1 + + +def add_dict(dict1, dict2): + """adds two dictionaries, meta of the new is saved as meata+original_filename and + measurements are shifted to continue with numbering of first dict + :arg dict1 : dictionarry to add to + :arg dict2 : dictionarry from which to take the measurements + :return dict1 : combined dictionary + Note: dict1 must be made from ccl, otherwise we would have to change the structure of loaded + dat file""" + if dict1["meta"]["zebra_mode"] != dict2["meta"]["zebra_mode"]: + print("You are trying to add scans measured with different zebra modes") + return + max_measurement_dict1 = max([keys for keys in dict1["meas"]]) + new_filenames = np.arange( + max_measurement_dict1 + 1, max_measurement_dict1 + 1 + len(dict2["meas"]) + ) + new_meta_name = "meta" + str(dict2["meta"]["original_filename"]) + if new_meta_name not in dict1: + for keys, name in zip(dict2["meas"], new_filenames): + dict2["meas"][keys]["file_of_origin"] = str(dict2["meta"]["original_filename"]) + dict1["meas"][name] = dict2["meas"][keys] + + dict1[new_meta_name] = dict2["meta"] + else: + raise KeyError( + str( + "The file %s has alredy been added to %s" + % (dict2["meta"]["original_filename"], dict1["meta"]["original_filename"]) + ) + ) + return dict1 + + +def auto(dict): + """takes just unique tuples from all tuples in dictionary returend by scan_dict + intendet for automatic merge if you doesent want to specify what scans to merge together + args: dict - dictionary from scan_dict function + :return dict - dict without repetitions""" + for keys in dict: + tuple_list = dict[keys] + new = list() + for i in range(len(tuple_list)): + if tuple_list[0][0] == tuple_list[i][0]: + new.append(tuple_list[i]) + dict[keys] = new + return dict + + +def scan_dict(dict, precision=0.5): + """scans dictionary for duplicate angles indexes + :arg dict : dictionary to scan + :arg precision : in deg, sometimes angles are zero so its easier this way, instead of + checking zero division + :return dictionary with matching scans, if there are none, the dict is empty + note: can be checked by "not d", true if empty + """ + if dict["meta"]["zebra_mode"] == "bi": + angles = ["twotheta_angle", "omega_angle", "chi_angle", "phi_angle"] + elif dict["meta"]["zebra_mode"] == "nb": + angles = ["gamma_angle", "omega_angle", "nu_angle"] + else: + print("Unknown zebra mode") + return + + d = {} + for i in dict["meas"]: + for j in dict["meas"]: + if dict["meas"][i] != dict["meas"][j]: + itup = list() + for k in angles: + itup.append(abs(abs(dict["meas"][i][k]) - abs(dict["meas"][j][k]))) + + if all(i <= precision for i in itup): + if str([np.around(dict["meas"][i][k], 1) for k in angles]) not in d: + d[str([np.around(dict["meas"][i][k], 1) for k in angles])] = list() + d[str([np.around(dict["meas"][i][k], 1) for k in angles])].append((i, j)) + else: + d[str([np.around(dict["meas"][i][k], 1) for k in angles])].append((i, j)) + + else: + pass + + else: + continue + return d