First draft of new merge function

This is the first shot at the new merge function. I didnt wanna rewrite the previous before we agree on this, since I think there will be some changes. Therefore I would like to discuss this first. Since we agreed not to do is as previously, ergo first scan everything and then merge or add, I've tried to do these function recursive. Haven't tested it much, I would like to agree if this would be a good way on how to write it.
This commit is contained in:
JakHolzer 2020-11-17 15:25:09 +01:00 committed by GitHub
parent 8c8715b041
commit f7f016cf1c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

294
pyzebra/merge_function.py Normal file
View File

@ -0,0 +1,294 @@
import numpy as np
import uncertainties as u
def create_tuples(x, y, y_err):
"""creates tuples for sorting and merginng of the data
Counts need to be normalized to monitor before"""
t = list()
for i in range(len(x)):
tup = (x[i], y[i], y_err[i])
t.append(tup)
return t
def normalize(scan, monitor):
"""Normalizes the measurement to monitor, checks if sigma exists, otherwise creates it
:arg dict : dictionary to from which to tkae the scan
:arg key : which scan to normalize from dict1
:arg monitor : final monitor
:return counts - normalized counts
:return sigma - normalized sigma"""
counts = np.array(scan["Counts"])
sigma = np.sqrt(counts) if "sigma" not in scan else scan["sigma"]
monitor_ratio = monitor / scan["monitor"]
scaled_counts = counts * monitor_ratio
scaled_sigma = np.array(sigma) * monitor_ratio
return scaled_counts, scaled_sigma
def merge(scan1, scan2, keep=True, monitor=100000):
"""merges the two tuples and sorts them, if om value is same, Counts value is average
averaging is propagated into sigma if dict1 == dict2, key[1] is deleted after merging
:arg dict1 : dictionary to which measurement will be merged
:arg dict2 : dictionary from which measurement will be merged
:arg scand_dict_result : result of scan_dict after auto function
:arg keep : if true, when monitors are same, does not change it, if flase, takes monitor
always
:arg monitor : final monitor after merging
note: dict1 and dict2 can be same dict
:return dict1 with merged scan"""
if keep:
if scan1["monitor"] == scan2["monitor"]:
monitor = scan1["monitor"]
# load om and Counts
x1, x2 = scan1["om"], scan2["om"]
cor_y1, y_err1 = normalize(scan1, monitor=monitor)
cor_y2, y_err2 = normalize(scan2, monitor=monitor)
# creates touples (om, Counts, sigma) for sorting and further processing
tuple_list = create_tuples(x1, cor_y1, y_err1) + create_tuples(x2, cor_y2, y_err2)
# Sort the list on om and add 0 0 0 tuple to the last position
sorted_t = sorted(tuple_list, key=lambda tup: tup[0])
sorted_t.append((0, 0, 0))
om, Counts, sigma = [], [], []
seen = list()
for i in range(len(sorted_t) - 1):
if sorted_t[i][0] not in seen:
if sorted_t[i][0] != sorted_t[i + 1][0]:
om = np.append(om, sorted_t[i][0])
Counts = np.append(Counts, sorted_t[i][1])
sigma = np.append(sigma, sorted_t[i][2])
else:
om = np.append(om, sorted_t[i][0])
counts1, counts2 = sorted_t[i][1], sorted_t[i + 1][1]
sigma1, sigma2 = sorted_t[i][2], sorted_t[i + 1][2]
count_err1 = u.ufloat(counts1, sigma1)
count_err2 = u.ufloat(counts2, sigma2)
avg = (count_err1 + count_err2) / 2
Counts = np.append(Counts, avg.n)
sigma = np.append(sigma, avg.s)
seen.append(sorted_t[i][0])
else:
continue
scan1["om"] = om
scan1["Counts"] = Counts
scan1["sigma"] = sigma
scan1["monitor"] = monitor
print("merging done")
def check_UB(dict1, dict2, precision=0.01):
truth_list = list()
for i in ["ub1j", "ub2j", "ub3j"]:
for j in range(3):
if abs(abs(float(dict1["meta"][i][j])) - abs(float(dict2["meta"][i][j]))) < precision:
truth_list.append(True)
else:
truth_list.append(False)
# print(truth_list)
if all(truth_list):
return True
else:
return False
def check_zebramode(dict1, dict2):
if dict1["meta"]["zebra_mode"] == dict2["meta"]["zebra_mode"]:
return True
else:
return False
def check_angles(scan1, scan2, angles, precision):
truth_list = list()
for item in angles:
if abs(abs(scan1[item]) - abs(scan2[item])) <= precision[item]:
truth_list.append(True)
else:
truth_list.append(False)
if all(truth_list):
return True
else:
return False
def check_temp_mag(scan1, scan2):
temp_diff = 1
mag_diff = 0.001
truth_list = list()
try:
if abs(abs(scan1["mag_field"]) - abs(scan2["mag_field"])) <= mag_diff:
truth_list.append(True)
else:
truth_list.append(False)
except KeyError:
print("mag_field missing")
try:
if abs(abs(scan1["temperature"]) - abs(scan2["temperature"])) <= temp_diff:
truth_list.append(True)
else:
truth_list.append(False)
except KeyError:
print("temperature missing")
if all(truth_list):
return True
else:
return False
def merge_dups(dictionary, angles):
precision = {
"twotheta_angle": 0.1,
"chi_angle": 0.1,
"nu_angle": 0.1,
"phi_angle": 0.05,
"omega_angle": 0.05,
"gamma_angle": 0.05,
}
for i in list(dictionary["scan"]):
for j in list(dictionary["scan"]):
if i == j:
continue
else:
# print(i, j)
if check_angles(dictionary["scan"][i], dictionary["scan"][j], angles, precision):
merge(dictionary["scan"][i], dictionary["scan"][j])
print("merged %d with %d" % (i, j))
del dictionary["scan"][j]
merge_dups(dictionary, angles)
break
else:
continue
break
def add_scan(dict1, dict2, scan_to_add):
max_scan = np.max(list(dict1["scan"]))
dict1["scan"][max_scan + 1] = dict2["scan"][scan_to_add]
del dict2["scan"][scan_to_add]
def process(dict1, dict2, angles, precision):
# stop when the second dict is empty
# print(dict2["scan"])
if dict2["scan"]:
print("doing something")
# check UB matrixes
if check_UB(dict1, dict2):
# iterate over second dict and check for matches
for i in list(dict2["scan"]):
for j in list(dict1["scan"]):
if check_angles(dict1["scan"][j], dict2["scan"][i], angles, precision):
# angles good, see the mag and temp
if check_temp_mag(dict1["scan"][j], dict2["scan"][i]):
merge(dict1["scan"][j], dict2["scan"][i])
print("merged")
del dict2["scan"][i]
process(dict1, dict2, angles, precision)
break
else:
add_scan(dict1, dict2, i)
print("scan added r")
process(dict1, dict2, angles, precision)
break
else:
add_scan(dict1, dict2, i)
print("scan added l")
process(dict1, dict2, angles, precision)
break
else:
continue
break
else:
# ask user if he really wants to add
print("UBs are different, do you really wish to add datasets? Y/N")
dict1 = add_dict(dict1, dict2)
return
"""
1. check for bisecting or normal beam geometry in data files; select stt, om, chi, phi for bisecting; select stt, om, nu for normal beam
2. in the ccl files, check for identical stt, chi and nu within 0.1 degree, and, at the same time, for identical om and phi within 0.05 degree;
3. in the dat files, check for identical stt, chi and nu within 0.1 degree, and, at the same time,
for identical phi within 0.05 degree, and, at the same time, for identical om within 5 degree."""
def unified_merge(dict1, dict2):
if not check_zebramode(dict1, dict2):
print("You are trying to add two files with different zebra mdoe")
return
# decide angles
if dict1["meta"]["zebra_mode"] == "bi":
angles = ["twotheta_angle", "omega_angle", "chi_angle", "phi_angle"]
elif dict1["meta"]["zebra_mode"] == "nb":
angles = ["gamma_angle", "omega_angle", "nu_angle"]
# precision of angles to check
precision = {
"twotheta_angle": 0.1,
"chi_angle": 0.1,
"nu_angle": 0.1,
"phi_angle": 0.05,
"omega_angle": 5,
"gamma_angle": 0.05,
}
if (dict1["meta"]["data_type"] == "ccl") and (dict2["meta"]["data_type"] == "ccl"):
precision["omega_angle"] = 0.05
# check for duplicates in original files
for d in dict1, dict2:
# no duplicates in dats
if d["meta"]["data_type"] == "dat":
continue
else:
merge_dups(d, angles)
process(dict1, dict2, angles, precision)
def add_dict(dict1, dict2):
"""adds two dictionaries, meta of the new is saved as meata+original_filename and
measurements are shifted to continue with numbering of first dict
:arg dict1 : dictionarry to add to
:arg dict2 : dictionarry from which to take the measurements
:return dict1 : combined dictionary
Note: dict1 must be made from ccl, otherwise we would have to change the structure of loaded
dat file"""
try:
if dict1["meta"]["zebra_mode"] != dict2["meta"]["zebra_mode"]:
print("You are trying to add scans measured with different zebra modes")
return
# this is for the qscan case
except KeyError:
print("Zebra mode not specified")
max_measurement_dict1 = max([keys for keys in dict1["scan"]])
new_filenames = np.arange(
max_measurement_dict1 + 1, max_measurement_dict1 + 1 + len(dict2["scan"])
)
new_meta_name = "meta" + str(dict2["meta"]["original_filename"])
if new_meta_name not in dict1:
for keys, name in zip(dict2["scan"], new_filenames):
dict2["scan"][keys]["file_of_origin"] = str(dict2["meta"]["original_filename"])
dict1["scan"][name] = dict2["scan"][keys]
dict1[new_meta_name] = dict2["meta"]
else:
raise KeyError(
str(
"The file %s has alredy been added to %s"
% (dict2["meta"]["original_filename"], dict1["meta"]["original_filename"])
)
)
return dict1