From f7f016cf1c0c2334d833a69b59958972e3716c79 Mon Sep 17 00:00:00 2001 From: JakHolzer <53743814+JakHolzer@users.noreply.github.com> Date: Tue, 17 Nov 2020 15:25:09 +0100 Subject: [PATCH] First draft of new merge function This is the first shot at the new merge function. I didnt wanna rewrite the previous before we agree on this, since I think there will be some changes. Therefore I would like to discuss this first. Since we agreed not to do is as previously, ergo first scan everything and then merge or add, I've tried to do these function recursive. Haven't tested it much, I would like to agree if this would be a good way on how to write it. --- pyzebra/merge_function.py | 294 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 294 insertions(+) create mode 100644 pyzebra/merge_function.py diff --git a/pyzebra/merge_function.py b/pyzebra/merge_function.py new file mode 100644 index 0000000..5cad2b5 --- /dev/null +++ b/pyzebra/merge_function.py @@ -0,0 +1,294 @@ +import numpy as np +import uncertainties as u + + +def create_tuples(x, y, y_err): + """creates tuples for sorting and merginng of the data + Counts need to be normalized to monitor before""" + t = list() + for i in range(len(x)): + tup = (x[i], y[i], y_err[i]) + t.append(tup) + return t + + +def normalize(scan, monitor): + """Normalizes the measurement to monitor, checks if sigma exists, otherwise creates it + :arg dict : dictionary to from which to tkae the scan + :arg key : which scan to normalize from dict1 + :arg monitor : final monitor + :return counts - normalized counts + :return sigma - normalized sigma""" + + counts = np.array(scan["Counts"]) + sigma = np.sqrt(counts) if "sigma" not in scan else scan["sigma"] + monitor_ratio = monitor / scan["monitor"] + scaled_counts = counts * monitor_ratio + scaled_sigma = np.array(sigma) * monitor_ratio + + return scaled_counts, scaled_sigma + + +def merge(scan1, scan2, keep=True, monitor=100000): + """merges the two tuples and sorts them, if om value is same, Counts value is average + averaging is propagated into sigma if dict1 == dict2, key[1] is deleted after merging + :arg dict1 : dictionary to which measurement will be merged + :arg dict2 : dictionary from which measurement will be merged + :arg scand_dict_result : result of scan_dict after auto function + :arg keep : if true, when monitors are same, does not change it, if flase, takes monitor + always + :arg monitor : final monitor after merging + note: dict1 and dict2 can be same dict + :return dict1 with merged scan""" + + if keep: + if scan1["monitor"] == scan2["monitor"]: + monitor = scan1["monitor"] + + # load om and Counts + x1, x2 = scan1["om"], scan2["om"] + cor_y1, y_err1 = normalize(scan1, monitor=monitor) + cor_y2, y_err2 = normalize(scan2, monitor=monitor) + # creates touples (om, Counts, sigma) for sorting and further processing + tuple_list = create_tuples(x1, cor_y1, y_err1) + create_tuples(x2, cor_y2, y_err2) + # Sort the list on om and add 0 0 0 tuple to the last position + sorted_t = sorted(tuple_list, key=lambda tup: tup[0]) + sorted_t.append((0, 0, 0)) + om, Counts, sigma = [], [], [] + seen = list() + for i in range(len(sorted_t) - 1): + if sorted_t[i][0] not in seen: + if sorted_t[i][0] != sorted_t[i + 1][0]: + om = np.append(om, sorted_t[i][0]) + Counts = np.append(Counts, sorted_t[i][1]) + sigma = np.append(sigma, sorted_t[i][2]) + else: + om = np.append(om, sorted_t[i][0]) + counts1, counts2 = sorted_t[i][1], sorted_t[i + 1][1] + sigma1, sigma2 = sorted_t[i][2], sorted_t[i + 1][2] + count_err1 = u.ufloat(counts1, sigma1) + count_err2 = u.ufloat(counts2, sigma2) + avg = (count_err1 + count_err2) / 2 + Counts = np.append(Counts, avg.n) + sigma = np.append(sigma, avg.s) + seen.append(sorted_t[i][0]) + else: + continue + scan1["om"] = om + scan1["Counts"] = Counts + scan1["sigma"] = sigma + scan1["monitor"] = monitor + print("merging done") + + +def check_UB(dict1, dict2, precision=0.01): + truth_list = list() + for i in ["ub1j", "ub2j", "ub3j"]: + for j in range(3): + if abs(abs(float(dict1["meta"][i][j])) - abs(float(dict2["meta"][i][j]))) < precision: + + truth_list.append(True) + else: + truth_list.append(False) + + # print(truth_list) + if all(truth_list): + return True + else: + return False + + +def check_zebramode(dict1, dict2): + if dict1["meta"]["zebra_mode"] == dict2["meta"]["zebra_mode"]: + return True + else: + return False + + +def check_angles(scan1, scan2, angles, precision): + truth_list = list() + for item in angles: + if abs(abs(scan1[item]) - abs(scan2[item])) <= precision[item]: + truth_list.append(True) + else: + truth_list.append(False) + if all(truth_list): + return True + else: + return False + + +def check_temp_mag(scan1, scan2): + temp_diff = 1 + mag_diff = 0.001 + truth_list = list() + try: + if abs(abs(scan1["mag_field"]) - abs(scan2["mag_field"])) <= mag_diff: + truth_list.append(True) + else: + truth_list.append(False) + except KeyError: + print("mag_field missing") + + try: + if abs(abs(scan1["temperature"]) - abs(scan2["temperature"])) <= temp_diff: + truth_list.append(True) + else: + truth_list.append(False) + except KeyError: + print("temperature missing") + + if all(truth_list): + return True + else: + return False + + +def merge_dups(dictionary, angles): + precision = { + "twotheta_angle": 0.1, + "chi_angle": 0.1, + "nu_angle": 0.1, + "phi_angle": 0.05, + "omega_angle": 0.05, + "gamma_angle": 0.05, + } + + for i in list(dictionary["scan"]): + for j in list(dictionary["scan"]): + if i == j: + continue + else: + # print(i, j) + if check_angles(dictionary["scan"][i], dictionary["scan"][j], angles, precision): + merge(dictionary["scan"][i], dictionary["scan"][j]) + print("merged %d with %d" % (i, j)) + + del dictionary["scan"][j] + merge_dups(dictionary, angles) + break + else: + continue + break + + +def add_scan(dict1, dict2, scan_to_add): + max_scan = np.max(list(dict1["scan"])) + dict1["scan"][max_scan + 1] = dict2["scan"][scan_to_add] + del dict2["scan"][scan_to_add] + + +def process(dict1, dict2, angles, precision): + # stop when the second dict is empty + # print(dict2["scan"]) + if dict2["scan"]: + print("doing something") + # check UB matrixes + if check_UB(dict1, dict2): + # iterate over second dict and check for matches + for i in list(dict2["scan"]): + for j in list(dict1["scan"]): + if check_angles(dict1["scan"][j], dict2["scan"][i], angles, precision): + # angles good, see the mag and temp + if check_temp_mag(dict1["scan"][j], dict2["scan"][i]): + merge(dict1["scan"][j], dict2["scan"][i]) + print("merged") + del dict2["scan"][i] + process(dict1, dict2, angles, precision) + break + else: + add_scan(dict1, dict2, i) + print("scan added r") + process(dict1, dict2, angles, precision) + break + else: + add_scan(dict1, dict2, i) + print("scan added l") + process(dict1, dict2, angles, precision) + break + else: + continue + break + + else: + # ask user if he really wants to add + print("UBs are different, do you really wish to add datasets? Y/N") + dict1 = add_dict(dict1, dict2) + return + + +""" + 1. check for bisecting or normal beam geometry in data files; select stt, om, chi, phi for bisecting; select stt, om, nu for normal beam + 2. in the ccl files, check for identical stt, chi and nu within 0.1 degree, and, at the same time, for identical om and phi within 0.05 degree; + 3. in the dat files, check for identical stt, chi and nu within 0.1 degree, and, at the same time, + for identical phi within 0.05 degree, and, at the same time, for identical om within 5 degree.""" + + +def unified_merge(dict1, dict2): + if not check_zebramode(dict1, dict2): + print("You are trying to add two files with different zebra mdoe") + return + + # decide angles + if dict1["meta"]["zebra_mode"] == "bi": + angles = ["twotheta_angle", "omega_angle", "chi_angle", "phi_angle"] + elif dict1["meta"]["zebra_mode"] == "nb": + angles = ["gamma_angle", "omega_angle", "nu_angle"] + + # precision of angles to check + precision = { + "twotheta_angle": 0.1, + "chi_angle": 0.1, + "nu_angle": 0.1, + "phi_angle": 0.05, + "omega_angle": 5, + "gamma_angle": 0.05, + } + if (dict1["meta"]["data_type"] == "ccl") and (dict2["meta"]["data_type"] == "ccl"): + precision["omega_angle"] = 0.05 + + # check for duplicates in original files + for d in dict1, dict2: + # no duplicates in dats + if d["meta"]["data_type"] == "dat": + continue + else: + merge_dups(d, angles) + + process(dict1, dict2, angles, precision) + + +def add_dict(dict1, dict2): + """adds two dictionaries, meta of the new is saved as meata+original_filename and + measurements are shifted to continue with numbering of first dict + :arg dict1 : dictionarry to add to + :arg dict2 : dictionarry from which to take the measurements + :return dict1 : combined dictionary + Note: dict1 must be made from ccl, otherwise we would have to change the structure of loaded + dat file""" + try: + if dict1["meta"]["zebra_mode"] != dict2["meta"]["zebra_mode"]: + print("You are trying to add scans measured with different zebra modes") + return + # this is for the qscan case + except KeyError: + print("Zebra mode not specified") + max_measurement_dict1 = max([keys for keys in dict1["scan"]]) + new_filenames = np.arange( + max_measurement_dict1 + 1, max_measurement_dict1 + 1 + len(dict2["scan"]) + ) + new_meta_name = "meta" + str(dict2["meta"]["original_filename"]) + if new_meta_name not in dict1: + for keys, name in zip(dict2["scan"], new_filenames): + dict2["scan"][keys]["file_of_origin"] = str(dict2["meta"]["original_filename"]) + dict1["scan"][name] = dict2["scan"][keys] + + dict1[new_meta_name] = dict2["meta"] + else: + raise KeyError( + str( + "The file %s has alredy been added to %s" + % (dict2["meta"]["original_filename"], dict1["meta"]["original_filename"]) + ) + ) + return dict1