Add functions for 2D detector data merging

This commit is contained in:
usov_i 2022-02-14 11:04:16 +01:00
parent ac6f67cc53
commit e8ce57b56b

View File

@ -68,7 +68,10 @@ def _parameters_match(scan1, scan2):
if max(r1_start - r2_end, r2_start - r1_end) > max_range_gap:
return False
elif np.max(np.abs(scan1[param] - scan2[param])) > PARAM_PRECISIONS[param]:
elif (
np.max(np.abs(np.median(scan1[param]) - np.median(scan2[param])))
> PARAM_PRECISIONS[param]
):
return False
return True
@ -85,7 +88,10 @@ def merge_datasets(dataset_into, dataset_from):
for scan_into in dataset_into:
for ind, scan_from in enumerate(dataset_from):
if _parameters_match(scan_into, scan_from) and not merged[ind]:
merge_scans(scan_into, scan_from)
if scan_into["counts"].ndim == 3:
merge_h5_scans(scan_into, scan_from)
else: # scan_into["counts"].ndim == 1
merge_scans(scan_into, scan_from)
merged[ind] = True
for scan_from in dataset_from:
@ -147,6 +153,63 @@ def merge_scans(scan_into, scan_from):
print(f'Merging scans: {scan_into["idx"]} ({fname1}) <-- {scan_from["idx"]} ({fname2})')
def merge_h5_scans(scan_into, scan_from):
if "init_scan" not in scan_into:
scan_into["init_scan"] = scan_into.copy()
if "merged_scans" not in scan_into:
scan_into["merged_scans"] = []
for scan in scan_into["merged_scans"]:
if scan_from is scan:
print("Already merged scan")
return
scan_into["merged_scans"].append(scan_from)
scan_motor = scan_into["scan_motor"] # the same as scan_from["scan_motor"]
pos_all = scan_into["init_scan"][scan_motor]
val_all = scan_into["init_scan"]["counts"]
err_all = scan_into["init_scan"]["counts_err"] ** 2
for scan in scan_into["merged_scans"]:
pos_all = np.append(pos_all, scan[scan_motor])
val_all = np.concatenate((val_all, scan["counts"]))
err_all = np.concatenate((err_all, scan["counts_err"] ** 2))
sort_index = np.argsort(pos_all)
pos_all = pos_all[sort_index]
val_all = val_all[sort_index]
err_all = err_all[sort_index]
pos_tmp = pos_all[:1]
val_tmp = val_all[:1]
err_tmp = err_all[:1]
num_tmp = np.array([1])
for pos, val, err in zip(pos_all[1:], val_all[1:], err_all[1:]):
if pos - pos_tmp[-1] < MOTOR_POS_PRECISION:
# the repeated motor position
val_tmp[-1] += val
err_tmp[-1] += err
num_tmp[-1] += 1
else:
# a new motor position
pos_tmp = np.append(pos_tmp, pos)
val_tmp = np.concatenate((val_tmp, val[None, :]))
err_tmp = np.concatenate((err_tmp, err[None, :]))
num_tmp = np.append(num_tmp, 1)
scan_into[scan_motor] = pos_tmp
scan_into["counts"] = val_tmp / num_tmp[:, None, None]
scan_into["counts_err"] = np.sqrt(err_tmp) / num_tmp[:, None, None]
scan_from["export"] = False
fname1 = os.path.basename(scan_into["original_filename"])
fname2 = os.path.basename(scan_from["original_filename"])
print(f'Merging scans: {scan_into["idx"]} ({fname1}) <-- {scan_from["idx"]} ({fname2})')
def restore_scan(scan):
if "merged_scans" in scan:
for merged_scan in scan["merged_scans"]: