Update param_study_moduls.py
Updated parametric study module with merging, adding etc...
This commit is contained in:
parent
0347566aeb
commit
6099df650b
@ -1,5 +1,4 @@
|
||||
from load_1D import load_1D
|
||||
from ccl_dict_operation import add_dict
|
||||
import pandas as pd
|
||||
from mpl_toolkits.mplot3d import Axes3D # dont delete, otherwise waterfall wont work
|
||||
import matplotlib.pyplot as plt
|
||||
@ -7,6 +6,17 @@ import matplotlib as mpl
|
||||
import numpy as np
|
||||
import pickle
|
||||
import scipy.io as sio
|
||||
import uncertainties as u
|
||||
|
||||
|
||||
def create_tuples(x, y, y_err):
|
||||
"""creates tuples for sorting and merginng of the data
|
||||
Counts need to be normalized to monitor before"""
|
||||
t = list()
|
||||
for i in range(len(x)):
|
||||
tup = (x[i], y[i], y_err[i])
|
||||
t.append(tup)
|
||||
return t
|
||||
|
||||
|
||||
def load_dats(filepath):
|
||||
@ -38,10 +48,10 @@ def load_dats(filepath):
|
||||
dict1 = add_dict(dict1, load_1D(file_list[i][0]))
|
||||
else:
|
||||
dict1 = add_dict(dict1, load_1D(file_list[i]))
|
||||
dict1["scan"][i + 1]["params"] = {}
|
||||
dict1["meas"][i + 1]["params"] = {}
|
||||
if data_type == "txt":
|
||||
for x in range(len(col_names) - 1):
|
||||
dict1["scan"][i + 1]["params"][col_names[x + 1]] = file_list[i][x + 1]
|
||||
dict1["meas"][i + 1]["params"][col_names[x + 1]] = file_list[i][x + 1]
|
||||
|
||||
return dict1
|
||||
|
||||
@ -53,7 +63,7 @@ def create_dataframe(dict1):
|
||||
# create dictionary to which we pull only wanted items before transforming it to pd.dataframe
|
||||
pull_dict = {}
|
||||
pull_dict["filenames"] = list()
|
||||
for key in dict1["scan"][1]["params"]:
|
||||
for key in dict1["meas"][1]["params"]:
|
||||
pull_dict[key] = list()
|
||||
pull_dict["temperature"] = list()
|
||||
pull_dict["mag_field"] = list()
|
||||
@ -63,19 +73,19 @@ def create_dataframe(dict1):
|
||||
pull_dict["Counts"] = list()
|
||||
|
||||
# populate the dict
|
||||
for keys in dict1["scan"]:
|
||||
if "file_of_origin" in dict1["scan"][keys]:
|
||||
pull_dict["filenames"].append(dict1["scan"][keys]["file_of_origin"].split("/")[-1])
|
||||
for keys in dict1["meas"]:
|
||||
if "file_of_origin" in dict1["meas"][keys]:
|
||||
pull_dict["filenames"].append(dict1["meas"][keys]["file_of_origin"].split("/")[-1])
|
||||
else:
|
||||
pull_dict["filenames"].append(dict1["meta"]["original_filename"].split("/")[-1])
|
||||
for key in dict1["scan"][keys]["params"]:
|
||||
pull_dict[str(key)].append(float(dict1["scan"][keys]["params"][key]))
|
||||
pull_dict["temperature"].append(dict1["scan"][keys]["temperature"])
|
||||
pull_dict["mag_field"].append(dict1["scan"][keys]["mag_field"])
|
||||
pull_dict["fit_area"].append(dict1["scan"][keys]["fit"]["fit_area"])
|
||||
pull_dict["int_area"].append(dict1["scan"][keys]["fit"]["int_area"])
|
||||
pull_dict["om"].append(dict1["scan"][keys]["om"])
|
||||
pull_dict["Counts"].append(dict1["scan"][keys]["Counts"])
|
||||
for key in dict1["meas"][keys]["params"]:
|
||||
pull_dict[str(key)].append(float(dict1["meas"][keys]["params"][key]))
|
||||
pull_dict["temperature"].append(dict1["meas"][keys]["temperature"])
|
||||
pull_dict["mag_field"].append(dict1["meas"][keys]["mag_field"])
|
||||
pull_dict["fit_area"].append(dict1["meas"][keys]["fit"]["fit_area"])
|
||||
pull_dict["int_area"].append(dict1["meas"][keys]["fit"]["int_area"])
|
||||
pull_dict["om"].append(dict1["meas"][keys]["om"])
|
||||
pull_dict["Counts"].append(dict1["meas"][keys]["Counts"])
|
||||
|
||||
return pd.DataFrame(data=pull_dict)
|
||||
|
||||
@ -144,7 +154,7 @@ def make_graph(data, sorting_parameter, style):
|
||||
|
||||
|
||||
def save_dict(obj, name):
|
||||
""" saves dictionary as pickle file in binary format
|
||||
"""saves dictionary as pickle file in binary format
|
||||
:arg obj - object to save
|
||||
:arg name - name of the file
|
||||
NOTE: path should be added later"""
|
||||
@ -200,3 +210,172 @@ def save_table(data, filetype, name, path=None):
|
||||
hdf.close()
|
||||
if filetype == "json":
|
||||
data.to_json((path + name + ".json"))
|
||||
|
||||
def normalize(dict, key, monitor):
|
||||
"""Normalizes the measurement to monitor, checks if sigma exists, otherwise creates it
|
||||
:arg dict : dictionary to from which to tkae the scan
|
||||
:arg key : which scan to normalize from dict1
|
||||
:arg monitor : final monitor
|
||||
:return counts - normalized counts
|
||||
:return sigma - normalized sigma"""
|
||||
|
||||
counts = np.array(dict["meas"][key]["Counts"])
|
||||
sigma = np.sqrt(counts) if "sigma" not in dict["meas"][key] else dict["meas"][key]["sigma"]
|
||||
monitor_ratio = monitor / dict["meas"][key]["monitor"]
|
||||
scaled_counts = counts * monitor_ratio
|
||||
scaled_sigma = np.array(sigma) * monitor_ratio
|
||||
|
||||
return scaled_counts, scaled_sigma
|
||||
|
||||
def merge(dict1, dict2, scand_dict_result, auto=True, monitor=100000):
|
||||
"""merges the two tuples and sorts them, if om value is same, Counts value is average
|
||||
averaging is propagated into sigma if dict1 == dict2, key[1] is deleted after merging
|
||||
:arg dict1 : dictionary to which measurement will be merged
|
||||
:arg dict2 : dictionary from which measurement will be merged
|
||||
:arg keys : tuple with key to dict1 and dict2
|
||||
:arg auto : if true, when monitors are same, does not change it, if flase, takes monitor always
|
||||
:arg monitor : final monitor after merging
|
||||
note: dict1 and dict2 can be same dict
|
||||
:return dict1 with merged scan"""
|
||||
for keys in scand_dict_result:
|
||||
for j in range(len(scand_dict_result[keys])):
|
||||
first, second = scand_dict_result[keys][j][0], scand_dict_result[keys][j][1]
|
||||
print(first, second)
|
||||
if auto:
|
||||
if dict1["meas"][first]["monitor"] == dict2["meas"][second]["monitor"]:
|
||||
monitor = dict1["meas"][first]["monitor"]
|
||||
|
||||
# load om and Counts
|
||||
x1, x2 = dict1["meas"][first]["om"], dict2["meas"][second]["om"]
|
||||
cor_y1, y_err1 = normalize(dict1, first, monitor=monitor)
|
||||
cor_y2, y_err2 = normalize(dict2, second, monitor=monitor)
|
||||
# creates touples (om, Counts, sigma) for sorting and further processing
|
||||
tuple_list = create_tuples(x1, cor_y1, y_err1) + create_tuples(x2, cor_y2, y_err2)
|
||||
# Sort the list on om and add 0 0 0 tuple to the last position
|
||||
sorted_t = sorted(tuple_list, key=lambda tup: tup[0])
|
||||
sorted_t.append((0, 0, 0))
|
||||
om, Counts, sigma = [], [], []
|
||||
seen = list()
|
||||
for i in range(len(sorted_t) - 1):
|
||||
if sorted_t[i][0] not in seen:
|
||||
if sorted_t[i][0] != sorted_t[i + 1][0]:
|
||||
om = np.append(om, sorted_t[i][0])
|
||||
Counts = np.append(Counts, sorted_t[i][1])
|
||||
sigma = np.append(sigma, sorted_t[i][2])
|
||||
else:
|
||||
om = np.append(om, sorted_t[i][0])
|
||||
counts1, counts2 = sorted_t[i][1], sorted_t[i + 1][1]
|
||||
sigma1, sigma2 = sorted_t[i][2], sorted_t[i + 1][2]
|
||||
count_err1 = u.ufloat(counts1, sigma1)
|
||||
count_err2 = u.ufloat(counts2, sigma2)
|
||||
avg = (count_err1 + count_err2) / 2
|
||||
Counts = np.append(Counts, avg.n)
|
||||
sigma = np.append(sigma, avg.s)
|
||||
seen.append(sorted_t[i][0])
|
||||
else:
|
||||
continue
|
||||
|
||||
if dict1 == dict2:
|
||||
del dict1["meas"][second]
|
||||
|
||||
note = (
|
||||
f"This measurement was merged with measurement {second} from "
|
||||
f'file {dict2["meta"]["original_filename"]} \n'
|
||||
)
|
||||
if "notes" not in dict1["meas"][first]:
|
||||
dict1["meas"][first]["notes"] = note
|
||||
else:
|
||||
dict1["meas"][first]["notes"] += note
|
||||
|
||||
dict1["meas"][first]["om"] = om
|
||||
dict1["meas"][first]["Counts"] = Counts
|
||||
dict1["meas"][first]["sigma"] = sigma
|
||||
dict1["meas"][first]["monitor"] = monitor
|
||||
print("merging done")
|
||||
return dict1
|
||||
|
||||
|
||||
def add_dict(dict1, dict2):
|
||||
"""adds two dictionaries, meta of the new is saved as meata+original_filename and
|
||||
measurements are shifted to continue with numbering of first dict
|
||||
:arg dict1 : dictionarry to add to
|
||||
:arg dict2 : dictionarry from which to take the measurements
|
||||
:return dict1 : combined dictionary
|
||||
Note: dict1 must be made from ccl, otherwise we would have to change the structure of loaded
|
||||
dat file"""
|
||||
if dict1["meta"]["zebra_mode"] != dict2["meta"]["zebra_mode"]:
|
||||
print("You are trying to add scans measured with different zebra modes")
|
||||
return
|
||||
max_measurement_dict1 = max([keys for keys in dict1["meas"]])
|
||||
new_filenames = np.arange(
|
||||
max_measurement_dict1 + 1, max_measurement_dict1 + 1 + len(dict2["meas"])
|
||||
)
|
||||
new_meta_name = "meta" + str(dict2["meta"]["original_filename"])
|
||||
if new_meta_name not in dict1:
|
||||
for keys, name in zip(dict2["meas"], new_filenames):
|
||||
dict2["meas"][keys]["file_of_origin"] = str(dict2["meta"]["original_filename"])
|
||||
dict1["meas"][name] = dict2["meas"][keys]
|
||||
|
||||
dict1[new_meta_name] = dict2["meta"]
|
||||
else:
|
||||
raise KeyError(
|
||||
str(
|
||||
"The file %s has alredy been added to %s"
|
||||
% (dict2["meta"]["original_filename"], dict1["meta"]["original_filename"])
|
||||
)
|
||||
)
|
||||
return dict1
|
||||
|
||||
|
||||
def auto(dict):
|
||||
"""takes just unique tuples from all tuples in dictionary returend by scan_dict
|
||||
intendet for automatic merge if you doesent want to specify what scans to merge together
|
||||
args: dict - dictionary from scan_dict function
|
||||
:return dict - dict without repetitions"""
|
||||
for keys in dict:
|
||||
tuple_list = dict[keys]
|
||||
new = list()
|
||||
for i in range(len(tuple_list)):
|
||||
if tuple_list[0][0] == tuple_list[i][0]:
|
||||
new.append(tuple_list[i])
|
||||
dict[keys] = new
|
||||
return dict
|
||||
|
||||
|
||||
def scan_dict(dict, precision=0.5):
|
||||
"""scans dictionary for duplicate angles indexes
|
||||
:arg dict : dictionary to scan
|
||||
:arg precision : in deg, sometimes angles are zero so its easier this way, instead of
|
||||
checking zero division
|
||||
:return dictionary with matching scans, if there are none, the dict is empty
|
||||
note: can be checked by "not d", true if empty
|
||||
"""
|
||||
if dict["meta"]["zebra_mode"] == "bi":
|
||||
angles = ["twotheta_angle", "omega_angle", "chi_angle", "phi_angle"]
|
||||
elif dict["meta"]["zebra_mode"] == "nb":
|
||||
angles = ["gamma_angle", "omega_angle", "nu_angle"]
|
||||
else:
|
||||
print("Unknown zebra mode")
|
||||
return
|
||||
|
||||
d = {}
|
||||
for i in dict["meas"]:
|
||||
for j in dict["meas"]:
|
||||
if dict["meas"][i] != dict["meas"][j]:
|
||||
itup = list()
|
||||
for k in angles:
|
||||
itup.append(abs(abs(dict["meas"][i][k]) - abs(dict["meas"][j][k])))
|
||||
|
||||
if all(i <= precision for i in itup):
|
||||
if str([np.around(dict["meas"][i][k], 1) for k in angles]) not in d:
|
||||
d[str([np.around(dict["meas"][i][k], 1) for k in angles])] = list()
|
||||
d[str([np.around(dict["meas"][i][k], 1) for k in angles])].append((i, j))
|
||||
else:
|
||||
d[str([np.around(dict["meas"][i][k], 1) for k in angles])].append((i, j))
|
||||
|
||||
else:
|
||||
pass
|
||||
|
||||
else:
|
||||
continue
|
||||
return d
|
||||
|
Loading…
x
Reference in New Issue
Block a user