pyzebra/pyzebra/ccl_process.py
2023-11-29 14:10:35 +01:00

342 lines
11 KiB
Python

import logging
import os
import numpy as np
from lmfit.models import GaussianModel, LinearModel, PseudoVoigtModel, VoigtModel
from scipy.integrate import simpson, trapezoid
from pyzebra import CCL_ANGLES
logger = logging.getLogger(__name__)
PARAM_PRECISIONS = {
"twotheta": 0.1,
"chi": 0.1,
"nu": 0.1,
"phi": 0.05,
"omega": 0.05,
"gamma": 0.05,
"temp": 1,
"mf": 0.001,
"ub": 0.01,
}
MAX_RANGE_GAP = {"omega": 0.5}
MOTOR_POS_PRECISION = 0.01
AREA_METHODS = ("fit_area", "int_area")
def normalize_dataset(dataset, monitor=100_000):
for scan in dataset:
monitor_ratio = monitor / scan["monitor"]
scan["counts"] *= monitor_ratio
scan["counts_err"] *= monitor_ratio
scan["monitor"] = monitor
def merge_duplicates(dataset, log=logger):
merged = np.zeros(len(dataset), dtype=bool)
for ind_into, scan_into in enumerate(dataset):
for ind_from, scan_from in enumerate(dataset[ind_into + 1 :], start=ind_into + 1):
if _parameters_match(scan_into, scan_from) and not merged[ind_from]:
merge_scans(scan_into, scan_from, log=log)
merged[ind_from] = True
def _parameters_match(scan1, scan2):
zebra_mode = scan1["zebra_mode"]
if zebra_mode != scan2["zebra_mode"]:
return False
for param in ("ub", *(vars[0] for vars in CCL_ANGLES[zebra_mode])):
if param.startswith("skip"):
# ignore skip parameters, like the last angle in 'nb' zebra mode
continue
if param == scan1["scan_motor"] == scan2["scan_motor"]:
# check if ranges of variable parameter overlap
r1_start, r1_end = scan1[param][0], scan1[param][-1]
r2_start, r2_end = scan2[param][0], scan2[param][-1]
# support reversed ranges
if r1_start > r1_end:
r1_start, r1_end = r1_end, r1_start
if r2_start > r2_end:
r2_start, r2_end = r2_end, r2_start
# maximum gap between ranges of the scanning parameter (default 0)
max_range_gap = MAX_RANGE_GAP.get(param, 0)
if max(r1_start - r2_end, r2_start - r1_end) > max_range_gap:
return False
elif (
np.max(np.abs(np.median(scan1[param]) - np.median(scan2[param])))
> PARAM_PRECISIONS[param]
):
return False
return True
def merge_datasets(dataset_into, dataset_from, log=logger):
scan_motors_into = dataset_into[0]["scan_motors"]
scan_motors_from = dataset_from[0]["scan_motors"]
if scan_motors_into != scan_motors_from:
log.warning(
f"Scan motors mismatch between datasets: {scan_motors_into} vs {scan_motors_from}"
)
return
merged = np.zeros(len(dataset_from), dtype=bool)
for scan_into in dataset_into:
for ind, scan_from in enumerate(dataset_from):
if _parameters_match(scan_into, scan_from) and not merged[ind]:
if scan_into["counts"].ndim == 3:
merge_h5_scans(scan_into, scan_from, log=log)
else: # scan_into["counts"].ndim == 1
merge_scans(scan_into, scan_from, log=log)
merged[ind] = True
for scan_from in dataset_from:
dataset_into.append(scan_from)
def merge_scans(scan_into, scan_from, log=logger):
if "init_scan" not in scan_into:
scan_into["init_scan"] = scan_into.copy()
if "merged_scans" not in scan_into:
scan_into["merged_scans"] = []
if scan_from in scan_into["merged_scans"]:
return
scan_into["merged_scans"].append(scan_from)
scan_motor = scan_into["scan_motor"] # the same as scan_from["scan_motor"]
pos_all = np.array([])
val_all = np.array([])
err_all = np.array([])
for scan in [scan_into["init_scan"], *scan_into["merged_scans"]]:
pos_all = np.append(pos_all, scan[scan_motor])
val_all = np.append(val_all, scan["counts"])
err_all = np.append(err_all, scan["counts_err"] ** 2)
sort_index = np.argsort(pos_all)
pos_all = pos_all[sort_index]
val_all = val_all[sort_index]
err_all = err_all[sort_index]
pos_tmp = pos_all[:1]
val_tmp = val_all[:1]
err_tmp = err_all[:1]
num_tmp = np.array([1])
for pos, val, err in zip(pos_all[1:], val_all[1:], err_all[1:]):
if pos - pos_tmp[-1] < MOTOR_POS_PRECISION:
# the repeated motor position
val_tmp[-1] += val
err_tmp[-1] += err
num_tmp[-1] += 1
else:
# a new motor position
pos_tmp = np.append(pos_tmp, pos)
val_tmp = np.append(val_tmp, val)
err_tmp = np.append(err_tmp, err)
num_tmp = np.append(num_tmp, 1)
scan_into[scan_motor] = pos_tmp
scan_into["counts"] = val_tmp / num_tmp
scan_into["counts_err"] = np.sqrt(err_tmp) / num_tmp
scan_from["export"] = False
fname1 = os.path.basename(scan_into["original_filename"])
fname2 = os.path.basename(scan_from["original_filename"])
log.info(f'Merging scans: {scan_into["idx"]} ({fname1}) <-- {scan_from["idx"]} ({fname2})')
def merge_h5_scans(scan_into, scan_from, log=logger):
if "init_scan" not in scan_into:
scan_into["init_scan"] = scan_into.copy()
if "merged_scans" not in scan_into:
scan_into["merged_scans"] = []
for scan in scan_into["merged_scans"]:
if scan_from is scan:
log.warning("Already merged scan")
return
scan_into["merged_scans"].append(scan_from)
scan_motor = scan_into["scan_motor"] # the same as scan_from["scan_motor"]
pos_all = [scan_into["init_scan"][scan_motor]]
val_all = [scan_into["init_scan"]["counts"]]
err_all = [scan_into["init_scan"]["counts_err"] ** 2]
for scan in scan_into["merged_scans"]:
pos_all.append(scan[scan_motor])
val_all.append(scan["counts"])
err_all.append(scan["counts_err"] ** 2)
pos_all = np.concatenate(pos_all)
val_all = np.concatenate(val_all)
err_all = np.concatenate(err_all)
sort_index = np.argsort(pos_all)
pos_all = pos_all[sort_index]
val_all = val_all[sort_index]
err_all = err_all[sort_index]
pos_tmp = [pos_all[0]]
val_tmp = [val_all[:1]]
err_tmp = [err_all[:1]]
num_tmp = [1]
for pos, val, err in zip(pos_all[1:], val_all[1:], err_all[1:]):
if pos - pos_tmp[-1] < MOTOR_POS_PRECISION:
# the repeated motor position
val_tmp[-1] += val
err_tmp[-1] += err
num_tmp[-1] += 1
else:
# a new motor position
pos_tmp.append(pos)
val_tmp.append(val[None, :])
err_tmp.append(err[None, :])
num_tmp.append(1)
pos_tmp = np.array(pos_tmp)
val_tmp = np.concatenate(val_tmp)
err_tmp = np.concatenate(err_tmp)
num_tmp = np.array(num_tmp)
scan_into[scan_motor] = pos_tmp
scan_into["counts"] = val_tmp / num_tmp[:, None, None]
scan_into["counts_err"] = np.sqrt(err_tmp) / num_tmp[:, None, None]
scan_from["export"] = False
fname1 = os.path.basename(scan_into["original_filename"])
fname2 = os.path.basename(scan_from["original_filename"])
log.info(f'Merging scans: {scan_into["idx"]} ({fname1}) <-- {scan_from["idx"]} ({fname2})')
def restore_scan(scan):
if "merged_scans" in scan:
for merged_scan in scan["merged_scans"]:
merged_scan["export"] = True
if "init_scan" in scan:
tmp = scan["init_scan"]
scan.clear()
scan.update(tmp)
# force scan export to True, otherwise in the sequence of incorrectly merged scans
# a <- b <- c the scan b will be restored with scan["export"] = False if restoring executed
# in the same order, i.e. restore a -> restore b
scan["export"] = True
def fit_scan(scan, model_dict, fit_from=None, fit_to=None, log=logger):
if fit_from is None:
fit_from = -np.inf
if fit_to is None:
fit_to = np.inf
y_fit = scan["counts"]
y_err = scan["counts_err"]
x_fit = scan[scan["scan_motor"]]
# apply fitting range
fit_ind = (fit_from <= x_fit) & (x_fit <= fit_to)
if not np.any(fit_ind):
log.warning(f"No data in fit range for scan {scan['idx']}")
return
y_fit = y_fit[fit_ind]
y_err = y_err[fit_ind]
x_fit = x_fit[fit_ind]
model = None
for model_index, (model_name, model_param) in enumerate(model_dict.items()):
model_name, _ = model_name.split("-")
prefix = f"f{model_index}_"
if model_name == "linear":
_model = LinearModel(prefix=prefix)
elif model_name == "gaussian":
_model = GaussianModel(prefix=prefix)
elif model_name == "voigt":
_model = VoigtModel(prefix=prefix)
elif model_name == "pvoigt":
_model = PseudoVoigtModel(prefix=prefix)
else:
raise ValueError(f"Unknown model name: '{model_name}'")
_init_guess = _model.guess(y_fit, x=x_fit)
for param_index, param_name in enumerate(model_param["param"]):
param_hints = {}
for hint_name in ("value", "vary", "min", "max"):
tmp = model_param[hint_name][param_index]
if tmp is None:
param_hints[hint_name] = getattr(_init_guess[prefix + param_name], hint_name)
else:
param_hints[hint_name] = tmp
if "center" in param_name:
if np.isneginf(param_hints["min"]):
param_hints["min"] = np.min(x_fit)
if np.isposinf(param_hints["max"]):
param_hints["max"] = np.max(x_fit)
if "sigma" in param_name:
if np.isposinf(param_hints["max"]):
param_hints["max"] = np.max(x_fit) - np.min(x_fit)
_model.set_param_hint(param_name, **param_hints)
if model is None:
model = _model
else:
model += _model
scan["fit"] = model.fit(y_fit, x=x_fit, weights=1 / y_err)
def get_area(scan, area_method, lorentz):
if "fit" not in scan:
return
if area_method not in AREA_METHODS:
raise ValueError(f"Unknown area method: {area_method}.")
if area_method == "fit_area":
area_v = 0
area_s = 0
for name, param in scan["fit"].params.items():
if "amplitude" in name:
area_v += np.nan if param.value is None else param.value
area_s += np.nan if param.stderr is None else param.stderr
else: # area_method == "int_area"
y_val = scan["counts"]
x_val = scan[scan["scan_motor"]]
y_bkg = scan["fit"].eval_components(x=x_val)["f0_"]
area_v = simpson(y_val, x=x_val) - trapezoid(y_bkg, x=x_val)
area_s = np.sqrt(area_v)
if lorentz:
# lorentz correction to area
if scan["zebra_mode"] == "bi":
twotheta = np.deg2rad(scan["twotheta"])
corr_factor = np.sin(twotheta)
else: # zebra_mode == "nb":
gamma = np.deg2rad(scan["gamma"])
nu = np.deg2rad(scan["nu"])
corr_factor = np.sin(gamma) * np.cos(nu)
area_v = np.abs(area_v * corr_factor)
area_s = np.abs(area_s * corr_factor)
scan["area"] = (area_v, area_s)