273 lines
9.6 KiB
Python
273 lines
9.6 KiB
Python
import ast
|
|
import math
|
|
import os
|
|
import subprocess
|
|
import tempfile
|
|
|
|
import numpy as np
|
|
from bokeh.layouts import column, row
|
|
from bokeh.models import (
|
|
Button,
|
|
ColumnDataSource,
|
|
DataTable,
|
|
Panel,
|
|
Spinner,
|
|
TableColumn,
|
|
TextAreaInput,
|
|
TextInput,
|
|
)
|
|
from scipy.optimize import curve_fit
|
|
|
|
import pyzebra
|
|
|
|
|
|
def create():
|
|
path_prefix_textinput = TextInput(title="Path prefix:", value="")
|
|
selection_list = TextAreaInput(title="ROIs:", rows=7)
|
|
lattice_const_textinput = TextInput(
|
|
title="Lattice constants:", value="8.3211,8.3211,8.3211,90.00,90.00,90.00"
|
|
)
|
|
max_res_spinner = Spinner(title="max-res:", value=2, step=0.01, width=145)
|
|
seed_pool_size_spinner = Spinner(title="seed-pool-size:", value=5, step=0.01, width=145)
|
|
seed_len_tol_spinner = Spinner(title="seed-len-tol:", value=0.02, step=0.01, width=145)
|
|
seed_angle_tol_spinner = Spinner(title="seed-angle-tol:", value=1, step=0.01, width=145)
|
|
eval_hkl_tol_spinner = Spinner(title="eval-hkl-tol:", value=0.15, step=0.01, width=145)
|
|
|
|
diff_vec = []
|
|
ub_matrices = []
|
|
|
|
def process_button_callback():
|
|
# drop table selection to clear result fields
|
|
results_table_source.selected.indices = []
|
|
|
|
nonlocal diff_vec
|
|
with tempfile.TemporaryDirectory() as temp_dir:
|
|
temp_peak_list_dir = os.path.join(temp_dir, "peak_list")
|
|
os.mkdir(temp_peak_list_dir)
|
|
temp_event_file = os.path.join(temp_peak_list_dir, "event-0.txt")
|
|
temp_hkl_file = os.path.join(temp_dir, "hkl.h5")
|
|
roi_dict = ast.literal_eval(selection_list.value)
|
|
|
|
comp_proc = subprocess.run(
|
|
[
|
|
"mpiexec",
|
|
"-n",
|
|
"2",
|
|
"python",
|
|
"spind/gen_hkl_table.py",
|
|
lattice_const_textinput.value,
|
|
"--max-res",
|
|
str(max_res_spinner.value),
|
|
"-o",
|
|
temp_hkl_file,
|
|
],
|
|
check=True,
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.STDOUT,
|
|
text=True,
|
|
)
|
|
print(" ".join(comp_proc.args))
|
|
print(comp_proc.stdout)
|
|
|
|
diff_vec = prepare_event_file(temp_event_file, roi_dict, path_prefix_textinput.value)
|
|
|
|
print(f"Content of {temp_event_file}:")
|
|
with open(temp_event_file) as f:
|
|
print(f.read())
|
|
|
|
comp_proc = subprocess.run(
|
|
[
|
|
"mpiexec",
|
|
"-n",
|
|
"2",
|
|
"python",
|
|
"spind/SPIND.py",
|
|
temp_peak_list_dir,
|
|
temp_hkl_file,
|
|
"-o",
|
|
temp_dir,
|
|
"--seed-pool-size",
|
|
str(seed_pool_size_spinner.value),
|
|
"--seed-len-tol",
|
|
str(seed_len_tol_spinner.value),
|
|
"--seed-angle-tol",
|
|
str(seed_angle_tol_spinner.value),
|
|
"--eval-hkl-tol",
|
|
str(eval_hkl_tol_spinner.value),
|
|
],
|
|
check=True,
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.STDOUT,
|
|
text=True,
|
|
)
|
|
print(" ".join(comp_proc.args))
|
|
print(comp_proc.stdout)
|
|
|
|
spind_out_file = os.path.join(temp_dir, "spind.txt")
|
|
spind_res = dict(
|
|
label=[], crystal_id=[], match_rate=[], matched_peaks=[], column_5=[], ub_matrix=[],
|
|
)
|
|
try:
|
|
with open(spind_out_file) as f_out:
|
|
for line in f_out:
|
|
c1, c2, c3, c4, c5, *c_rest = line.split()
|
|
spind_res["label"].append(c1)
|
|
spind_res["crystal_id"].append(c2)
|
|
spind_res["match_rate"].append(c3)
|
|
spind_res["matched_peaks"].append(c4)
|
|
spind_res["column_5"].append(c5)
|
|
|
|
# last digits are spind UB matrix
|
|
vals = list(map(float, c_rest))
|
|
ub_matrix_spind = np.transpose(np.array(vals).reshape(3, 3))
|
|
ub_matrix = np.linalg.inv(ub_matrix_spind)
|
|
ub_matrices.append(ub_matrix)
|
|
spind_res["ub_matrix"].append(str(ub_matrix_spind * 1e-10))
|
|
|
|
print(f"Content of {spind_out_file}:")
|
|
with open(spind_out_file) as f:
|
|
print(f.read())
|
|
|
|
except FileNotFoundError:
|
|
print("No results from spind")
|
|
|
|
results_table_source.data.update(spind_res)
|
|
|
|
process_button = Button(label="Process", button_type="primary")
|
|
process_button.on_click(process_button_callback)
|
|
|
|
ub_matrix_textareainput = TextAreaInput(title="UB matrix:", rows=7, width=400)
|
|
hkl_textareainput = TextAreaInput(title="hkl values:", rows=7, width=400)
|
|
|
|
def results_table_select_callback(_attr, old, new):
|
|
if new:
|
|
ind = new[0]
|
|
ub_matrix = ub_matrices[ind]
|
|
res = ""
|
|
for vec in diff_vec:
|
|
res += f"{ub_matrix @ vec}\n"
|
|
ub_matrix_textareainput.value = str(ub_matrix * 1e10)
|
|
hkl_textareainput.value = res
|
|
else:
|
|
ub_matrix_textareainput.value = ""
|
|
hkl_textareainput.value = ""
|
|
|
|
results_table_source = ColumnDataSource(
|
|
dict(label=[], crystal_id=[], match_rate=[], matched_peaks=[], column_5=[], ub_matrix=[])
|
|
)
|
|
results_table = DataTable(
|
|
source=results_table_source,
|
|
columns=[
|
|
TableColumn(field="label", title="Label", width=50),
|
|
TableColumn(field="crystal_id", title="Crystal ID", width=100),
|
|
TableColumn(field="match_rate", title="Match Rate", width=100),
|
|
TableColumn(field="matched_peaks", title="Matched Peaks", width=100),
|
|
TableColumn(field="column_5", title="", width=100),
|
|
TableColumn(field="ub_matrix", title="UB Matrix", width=700),
|
|
],
|
|
height=300,
|
|
width=1200,
|
|
autosize_mode="none",
|
|
index_position=None,
|
|
)
|
|
|
|
results_table_source.selected.on_change("indices", results_table_select_callback)
|
|
|
|
tab_layout = row(
|
|
column(
|
|
path_prefix_textinput,
|
|
selection_list,
|
|
lattice_const_textinput,
|
|
row(max_res_spinner, seed_pool_size_spinner),
|
|
row(seed_len_tol_spinner, seed_angle_tol_spinner),
|
|
row(eval_hkl_tol_spinner),
|
|
process_button,
|
|
),
|
|
column(results_table, row(ub_matrix_textareainput, hkl_textareainput)),
|
|
)
|
|
|
|
return Panel(child=tab_layout, title="spind")
|
|
|
|
|
|
def gauss(x, *p):
|
|
"""Defines Gaussian function
|
|
Args:
|
|
A - amplitude, mu - position of the center, sigma - width
|
|
Returns:
|
|
Gaussian function
|
|
"""
|
|
A, mu, sigma = p
|
|
return A * np.exp(-((x - mu) ** 2) / (2.0 * sigma ** 2))
|
|
|
|
|
|
def prepare_event_file(export_filename, roi_dict, path_prefix=""):
|
|
diff_vec = []
|
|
p0 = [1.0, 0.0, 1.0]
|
|
maxfev = 100000
|
|
with open(export_filename, "w") as f:
|
|
for file, rois in roi_dict.items():
|
|
dat = pyzebra.read_detector_data(path_prefix + file + ".hdf")
|
|
|
|
wave = dat["wave"]
|
|
ddist = dat["ddist"]
|
|
|
|
gamma = dat["gamma"][0]
|
|
omega = dat["omega"][0]
|
|
nu = dat["nu"][0]
|
|
chi = dat["chi"][0]
|
|
phi = dat["phi"][0]
|
|
|
|
scan_motor = dat["scan_motor"]
|
|
var_angle = dat[scan_motor]
|
|
|
|
for roi in rois:
|
|
x0, xN, y0, yN, fr0, frN = roi
|
|
data_roi = dat["data"][fr0:frN, y0:yN, x0:xN]
|
|
|
|
cnts = np.sum(data_roi, axis=(1, 2))
|
|
coeff, _ = curve_fit(gauss, range(len(cnts)), cnts, p0=p0, maxfev=maxfev)
|
|
|
|
m = cnts.mean()
|
|
sd = cnts.std()
|
|
snr_cnts = np.where(sd == 0, 0, m / sd)
|
|
|
|
frC = fr0 + coeff[1]
|
|
var_F = var_angle[math.floor(frC)]
|
|
var_C = var_angle[math.ceil(frC)]
|
|
frStep = frC - math.floor(frC)
|
|
var_step = var_C - var_F
|
|
var_p = var_F + var_step * frStep
|
|
|
|
if scan_motor == "gamma":
|
|
gamma = var_p
|
|
elif scan_motor == "omega":
|
|
omega = var_p
|
|
elif scan_motor == "nu":
|
|
nu = var_p
|
|
elif scan_motor == "chi":
|
|
chi = var_p
|
|
elif scan_motor == "phi":
|
|
phi = var_p
|
|
|
|
intensity = coeff[1] * abs(coeff[2] * var_step) * math.sqrt(2) * math.sqrt(np.pi)
|
|
|
|
projX = np.sum(data_roi, axis=(0, 1))
|
|
coeff, _ = curve_fit(gauss, range(len(projX)), projX, p0=p0, maxfev=maxfev)
|
|
x_pos = x0 + coeff[1]
|
|
|
|
projY = np.sum(data_roi, axis=(0, 2))
|
|
coeff, _ = curve_fit(gauss, range(len(projY)), projY, p0=p0, maxfev=maxfev)
|
|
y_pos = y0 + coeff[1]
|
|
|
|
ga, nu = pyzebra.det2pol(ddist, gamma, nu, x_pos, y_pos)
|
|
diff_vector = pyzebra.z1frmd(wave, ga, omega, chi, phi, nu)
|
|
d_spacing = float(pyzebra.dandth(wave, diff_vector)[0])
|
|
diff_vector = diff_vector.flatten() * 1e10
|
|
dv1, dv2, dv3 = diff_vector
|
|
|
|
diff_vec.append(diff_vector)
|
|
|
|
f.write(f"{x_pos} {y_pos} {intensity} {snr_cnts} {dv1} {dv2} {dv3} {d_spacing}\n")
|
|
|
|
return diff_vec
|