68 Commits
0.6.4 ... 0.7.0

Author SHA1 Message Date
b8cf76220c Updating for version 0.7.0 2022-05-23 09:16:20 +02:00
20e43ecce9 Remove temp and mf from parameters match 2022-05-20 14:56:55 +02:00
bcb6f7bd3b Add a workaround for single motor positions 2022-05-20 14:20:01 +02:00
c3b198f63a Adapt 'temp' to new h5 file format 2022-05-20 12:48:54 +02:00
f044e739a0 Adapt counts to new h5 file format 2022-05-06 16:57:06 +02:00
6f83292d69 Temporary resolve issues with new format metadata 2022-05-06 15:54:12 +02:00
7075e0f42b Assume scan_motor to be "om" if it's not present 2022-05-06 15:29:45 +02:00
1d43c86cff Fix UB matrix read 2022-05-06 15:29:09 +02:00
cde2aa1182 Fix letter case for UB matrix metadata 2022-05-06 14:46:37 +02:00
df9607ceb9 Adapt 'wave' to new h5 file format 2022-05-06 14:20:26 +02:00
5c5be065be Adapt 'monitor' and 'omega' to new h5 file format 2022-05-06 12:27:31 +02:00
279dee3935 Temporary fix for new h5 files
Datasets have changed from a single number to arrays
2022-05-06 12:14:27 +02:00
4ebb343afb Support new UB matrix format in ccl files 2022-05-06 11:19:25 +02:00
2810cf3d2b Fix legend when res_flag is True 2022-05-05 19:04:14 +02:00
585d4b3f54 Code refactoring 2022-05-04 20:01:53 +02:00
72fd54d268 Add legend items for propagation vectors 2022-05-04 17:20:24 +02:00
181c359ef9 Add legend items for file entries 2022-05-04 17:10:18 +02:00
00607d94c7 Add a widget for orthogonal cut value 2022-05-03 17:27:45 +02:00
a192648cc4 Add ccl_prepare plotting functionality
* Based on Camilla's notebook
2022-05-03 17:18:53 +02:00
42adda235b Update ccl_prepare plot widgets 2022-04-30 20:12:24 +02:00
e30035350b Generalize search for proposals
Avoid hardcoding specific years in proposal paths
2022-04-28 16:37:54 +02:00
89fffed5d1 Add 2022 data folder 2022-04-28 15:04:18 +02:00
fae90cbeae Initial version of (m)hkl sorting 2022-04-26 16:37:20 +02:00
7b70d30578 Use uploaded cfl filename as a default for outputs 2022-04-21 18:15:33 +02:00
b26d7005c5 Optimize layout 2022-04-21 17:50:54 +02:00
90021428ee Add support for lattiCE and multiple kvect values 2022-04-21 17:50:50 +02:00
bf97af1949 Treat 2theta as gamma in geom files 2022-04-21 12:00:50 +02:00
098314e30d Performance optimization
Use lists as intermediate data structure to avoid lots of numpy array
allocations
2022-04-20 13:15:48 +02:00
49ff319230 Use TextInput for wavelength widget 2022-04-14 16:14:27 +02:00
9b1e396bdb Disabled currently no-op buttons 2022-04-13 10:15:44 +02:00
d72c1d478e Add download option for hkl/mhkl files 2022-04-13 10:12:30 +02:00
fa73271076 Display created hkl and mhkl lists 2022-04-13 09:16:14 +02:00
4d2a8ecad2 Run sxtal_refgen on GO press 2022-04-12 20:12:11 +02:00
6f1fe1a3d8 Correctly strip lines in read_cfl_file 2022-04-12 19:45:45 +02:00
8d4f4a9b50 Add export_cfl_file function 2022-04-12 19:45:00 +02:00
c94f784373 Keep params as nonlocal var 2022-04-12 18:44:23 +02:00
de2a10b507 Keep ang_lims as nonlocal var 2022-04-12 17:01:21 +02:00
e2220760f0 Save geometry in ang_lims 2022-04-12 16:09:22 +02:00
39b2af60ca Allow export of geom file based on template 2022-04-12 15:11:42 +02:00
94df4869d4 Add ability to load cif data 2022-04-12 12:05:42 +02:00
4131eb31e7 Activate cfl file upload 2022-04-11 16:06:19 +02:00
4ee1f6fb70 Ignore comments in geom and cfl files 2022-04-11 16:04:10 +02:00
fbce5de7b9 Handle default cfl file 2022-04-11 15:16:37 +02:00
636badfba8 Handle different number of angles for geoms 2022-04-11 14:15:45 +02:00
77d6f42e0d Activate initially the first geom selection 2022-04-11 12:29:01 +02:00
166259d669 Merge min/max widgets of ang limits 2022-04-11 12:13:21 +02:00
14d65c9590 Update default geometry files 2022-04-11 11:33:35 +02:00
48b001180f Add default values for ranges 2022-04-07 19:48:06 +02:00
21332b3edc Calculate ub matrix via Sxtal_Refgen 2022-04-07 19:13:04 +02:00
421fc726dd Handle wavelength-related widgets 2022-04-07 17:05:09 +02:00
eb3cc99aeb Implement angular limit updates from geom files 2022-04-06 17:51:41 +02:00
d36686177d Add optional --sxtal-refgen-path cli argument 2022-04-06 17:38:46 +02:00
7e2c6ad21f Add basic functions to handle geom files 2022-04-06 17:38:14 +02:00
03ed97e063 Add panel_ccl_prepare
Basic layout only
2022-03-11 17:05:45 +01:00
234fb1f3b6 Update hdf_view with 2D detector data merging 2022-02-14 11:04:56 +01:00
e8ce57b56b Add functions for 2D detector data merging 2022-02-14 11:04:16 +01:00
ac6f67cc53 Reduce scan motor position precision for merging 2022-02-08 15:23:03 +01:00
3234a544de Cast counts to float64 in h5 data
For correct calculation of count_err
2022-02-03 17:15:42 +01:00
4bd6c6760e Fix parse_h5meta 2022-02-03 17:12:48 +01:00
b401d2f459 Fix param_study for an unconverged fit
For #49
2022-02-02 10:42:44 +01:00
6b8d15234b Fix check for reversed ranges upon scan merging 2022-01-28 15:06:28 +01:00
5cfa5c176d Add counts_err, idx and scan_motors to hdf data 2022-01-28 13:47:57 +01:00
bbe7b7d305 Rename data into counts for hdf5 zebra data 2022-01-28 10:08:27 +01:00
4ba08366df Assign twotheta to polar_angle 2022-01-28 09:55:43 +01:00
51c78ad06b Read monitor value in hdf 2022-01-27 17:45:01 +01:00
bcd594fa7e Utility renames and hdf_param_study simplification 2022-01-27 16:37:55 +01:00
10bcabd7f9 Updating for version 0.6.5 2022-01-21 10:25:01 +01:00
bee8263184 Use dist2 in cami for detector distance 2022-01-21 10:24:30 +01:00
15 changed files with 1911 additions and 377 deletions

View File

@ -4,5 +4,6 @@ from pyzebra.ccl_process import *
from pyzebra.h5 import * from pyzebra.h5 import *
from pyzebra.utils import * from pyzebra.utils import *
from pyzebra.xtal import * from pyzebra.xtal import *
from pyzebra.sxtal_refgen import *
__version__ = "0.6.4" __version__ = "0.7.0"

View File

@ -14,6 +14,7 @@ import panel_hdf_param_study
import panel_hdf_viewer import panel_hdf_viewer
import panel_param_study import panel_param_study
import panel_spind import panel_spind
import panel_ccl_prepare
doc = curdoc() doc = curdoc()
@ -35,14 +36,18 @@ proposal_textinput.on_change("value_input", proposal_textinput_callback)
doc.proposal_textinput = proposal_textinput doc.proposal_textinput = proposal_textinput
def apply_button_callback(): def apply_button_callback():
try: proposal = proposal_textinput.value.strip()
proposal_path = pyzebra.find_proposal_path(proposal_textinput.value) if proposal:
except ValueError as e: try:
print(e) proposal_path = pyzebra.find_proposal_path(proposal)
return except ValueError as e:
print(e)
return
apply_button.disabled = True
else:
proposal_path = ""
proposal_textinput.name = proposal_path proposal_textinput.name = proposal_path
apply_button.disabled = True
apply_button = Button(label="Apply", button_type="primary") apply_button = Button(label="Apply", button_type="primary")
apply_button.on_click(apply_button_callback) apply_button.on_click(apply_button_callback)
@ -55,6 +60,7 @@ doc.add_root(
Panel(child=column(proposal_textinput, apply_button), title="user config"), Panel(child=column(proposal_textinput, apply_button), title="user config"),
panel_hdf_viewer.create(), panel_hdf_viewer.create(),
panel_hdf_anatric.create(), panel_hdf_anatric.create(),
panel_ccl_prepare.create(),
panel_ccl_integrate.create(), panel_ccl_integrate.create(),
panel_ccl_compare.create(), panel_ccl_compare.create(),
panel_param_study.create(), panel_param_study.create(),

View File

@ -6,7 +6,7 @@ from bokeh.application.application import Application
from bokeh.application.handlers import ScriptHandler from bokeh.application.handlers import ScriptHandler
from bokeh.server.server import Server from bokeh.server.server import Server
from pyzebra.anatric import ANATRIC_PATH from pyzebra import ANATRIC_PATH, SXTAL_REFGEN_PATH
from pyzebra.app.handler import PyzebraHandler from pyzebra.app.handler import PyzebraHandler
logging.basicConfig(format="%(asctime)s %(message)s", level=logging.INFO) logging.basicConfig(format="%(asctime)s %(message)s", level=logging.INFO)
@ -42,6 +42,13 @@ def main():
"--anatric-path", type=str, default=ANATRIC_PATH, help="path to anatric executable", "--anatric-path", type=str, default=ANATRIC_PATH, help="path to anatric executable",
) )
parser.add_argument(
"--sxtal-refgen-path",
type=str,
default=SXTAL_REFGEN_PATH,
help="path to Sxtal_Refgen executable",
)
parser.add_argument( parser.add_argument(
"--spind-path", type=str, default=None, help="path to spind scripts folder", "--spind-path", type=str, default=None, help="path to spind scripts folder",
) )

View File

@ -72,8 +72,8 @@ for (let i = 0; i < js_data.data['fname'].length; i++) {
def create(): def create():
doc = curdoc() doc = curdoc()
det_data1 = [] dataset1 = []
det_data2 = [] dataset2 = []
fit_params = {} fit_params = {}
js_data = ColumnDataSource(data=dict(content=["", ""], fname=["", ""], ext=["", ""])) js_data = ColumnDataSource(data=dict(content=["", ""], fname=["", ""], ext=["", ""]))
@ -99,17 +99,17 @@ def create():
proposal_textinput.on_change("name", proposal_textinput_callback) proposal_textinput.on_change("name", proposal_textinput_callback)
def _init_datatable(): def _init_datatable():
# det_data2 should have the same metadata to det_data1 # dataset2 should have the same metadata as dataset1
scan_list = [s["idx"] for s in det_data1] scan_list = [s["idx"] for s in dataset1]
hkl = [f'{s["h"]} {s["k"]} {s["l"]}' for s in det_data1] hkl = [f'{s["h"]} {s["k"]} {s["l"]}' for s in dataset1]
export = [s["export"] for s in det_data1] export = [s["export"] for s in dataset1]
twotheta = [np.median(s["twotheta"]) if "twotheta" in s else None for s in det_data1] twotheta = [np.median(s["twotheta"]) if "twotheta" in s else None for s in dataset1]
gamma = [np.median(s["gamma"]) if "gamma" in s else None for s in det_data1] gamma = [np.median(s["gamma"]) if "gamma" in s else None for s in dataset1]
omega = [np.median(s["omega"]) if "omega" in s else None for s in det_data1] omega = [np.median(s["omega"]) if "omega" in s else None for s in dataset1]
chi = [np.median(s["chi"]) if "chi" in s else None for s in det_data1] chi = [np.median(s["chi"]) if "chi" in s else None for s in dataset1]
phi = [np.median(s["phi"]) if "phi" in s else None for s in det_data1] phi = [np.median(s["phi"]) if "phi" in s else None for s in dataset1]
nu = [np.median(s["nu"]) if "nu" in s else None for s in det_data1] nu = [np.median(s["nu"]) if "nu" in s else None for s in dataset1]
scan_table_source.data.update( scan_table_source.data.update(
scan=scan_list, scan=scan_list,
@ -163,9 +163,9 @@ def create():
new_data1 = new_data1[:min_len] new_data1 = new_data1[:min_len]
new_data2 = new_data2[:min_len] new_data2 = new_data2[:min_len]
nonlocal det_data1, det_data2 nonlocal dataset1, dataset2
det_data1 = new_data1 dataset1 = new_data1
det_data2 = new_data2 dataset2 = new_data2
_init_datatable() _init_datatable()
file_open_button = Button(label="Open New", width=100, disabled=True) file_open_button = Button(label="Open New", width=100, disabled=True)
@ -201,9 +201,9 @@ def create():
new_data1 = new_data1[:min_len] new_data1 = new_data1[:min_len]
new_data2 = new_data2[:min_len] new_data2 = new_data2[:min_len]
nonlocal det_data1, det_data2 nonlocal dataset1, dataset2
det_data1 = new_data1 dataset1 = new_data1
det_data2 = new_data2 dataset2 = new_data2
_init_datatable() _init_datatable()
upload_div = Div(text="or upload 2 .ccl files:", margin=(5, 5, 0, 5)) upload_div = Div(text="or upload 2 .ccl files:", margin=(5, 5, 0, 5))
@ -213,17 +213,17 @@ def create():
upload_button.on_change("filename", upload_button_callback) upload_button.on_change("filename", upload_button_callback)
def monitor_spinner_callback(_attr, old, new): def monitor_spinner_callback(_attr, old, new):
if det_data1 and det_data2: if dataset1 and dataset2:
pyzebra.normalize_dataset(det_data1, new) pyzebra.normalize_dataset(dataset1, new)
pyzebra.normalize_dataset(det_data2, new) pyzebra.normalize_dataset(dataset2, new)
_update_plot() _update_plot()
monitor_spinner = Spinner(title="Monitor:", mode="int", value=100_000, low=1, width=145) monitor_spinner = Spinner(title="Monitor:", mode="int", value=100_000, low=1, width=145)
monitor_spinner.on_change("value", monitor_spinner_callback) monitor_spinner.on_change("value", monitor_spinner_callback)
def _update_table(): def _update_table():
fit_ok = [(1 if "fit" in scan else 0) for scan in det_data1] fit_ok = [(1 if "fit" in scan else 0) for scan in dataset1]
export = [scan["export"] for scan in det_data1] export = [scan["export"] for scan in dataset1]
scan_table_source.data.update(fit=fit_ok, export=export) scan_table_source.data.update(fit=fit_ok, export=export)
def _update_plot(): def _update_plot():
@ -382,7 +382,7 @@ def create():
def scan_table_source_callback(_attr, _old, new): def scan_table_source_callback(_attr, _old, new):
# unfortunately, we don't know if the change comes from data update or user input # unfortunately, we don't know if the change comes from data update or user input
# also `old` and `new` are the same for non-scalars # also `old` and `new` are the same for non-scalars
for scan1, scan2, export in zip(det_data1, det_data2, new["export"]): for scan1, scan2, export in zip(dataset1, dataset2, new["export"]):
scan1["export"] = export scan1["export"] = export
scan2["export"] = export scan2["export"] = export
_update_preview() _update_preview()
@ -426,14 +426,14 @@ def create():
def _get_selected_scan(): def _get_selected_scan():
ind = scan_table_source.selected.indices[0] ind = scan_table_source.selected.indices[0]
return det_data1[ind], det_data2[ind] return dataset1[ind], dataset2[ind]
merge_from_select = Select(title="scan:", width=145) merge_from_select = Select(title="scan:", width=145)
def merge_button_callback(): def merge_button_callback():
scan_into1, scan_into2 = _get_selected_scan() scan_into1, scan_into2 = _get_selected_scan()
scan_from1 = det_data1[int(merge_from_select.value)] scan_from1 = dataset1[int(merge_from_select.value)]
scan_from2 = det_data2[int(merge_from_select.value)] scan_from2 = dataset2[int(merge_from_select.value)]
if scan_into1 is scan_from1: if scan_into1 is scan_from1:
print("WARNING: Selected scans for merging are identical") print("WARNING: Selected scans for merging are identical")
@ -577,7 +577,7 @@ def create():
fit_output_textinput = TextAreaInput(title="Fit results:", width=750, height=200) fit_output_textinput = TextAreaInput(title="Fit results:", width=750, height=200)
def proc_all_button_callback(): def proc_all_button_callback():
for scan in [*det_data1, *det_data2]: for scan in [*dataset1, *dataset2]:
if scan["export"]: if scan["export"]:
pyzebra.fit_scan( pyzebra.fit_scan(
scan, fit_params, fit_from=fit_from_spinner.value, fit_to=fit_to_spinner.value scan, fit_params, fit_from=fit_from_spinner.value, fit_to=fit_to_spinner.value
@ -628,7 +628,7 @@ def create():
temp_file = temp_dir + "/temp" temp_file = temp_dir + "/temp"
export_data1 = [] export_data1 = []
export_data2 = [] export_data2 = []
for scan1, scan2 in zip(det_data1, det_data2): for scan1, scan2 in zip(dataset1, dataset2):
if scan1["export"]: if scan1["export"]:
export_data1.append(scan1) export_data1.append(scan1)
export_data2.append(scan2) export_data2.append(scan2)

View File

@ -72,7 +72,7 @@ for (let i = 0; i < js_data.data['fname'].length; i++) {
def create(): def create():
doc = curdoc() doc = curdoc()
det_data = [] dataset = []
fit_params = {} fit_params = {}
js_data = ColumnDataSource(data=dict(content=["", ""], fname=["", ""], ext=["", ""])) js_data = ColumnDataSource(data=dict(content=["", ""], fname=["", ""], ext=["", ""]))
@ -100,16 +100,16 @@ def create():
proposal_textinput.on_change("name", proposal_textinput_callback) proposal_textinput.on_change("name", proposal_textinput_callback)
def _init_datatable(): def _init_datatable():
scan_list = [s["idx"] for s in det_data] scan_list = [s["idx"] for s in dataset]
hkl = [f'{s["h"]} {s["k"]} {s["l"]}' for s in det_data] hkl = [f'{s["h"]} {s["k"]} {s["l"]}' for s in dataset]
export = [s["export"] for s in det_data] export = [s["export"] for s in dataset]
twotheta = [np.median(s["twotheta"]) if "twotheta" in s else None for s in det_data] twotheta = [np.median(s["twotheta"]) if "twotheta" in s else None for s in dataset]
gamma = [np.median(s["gamma"]) if "gamma" in s else None for s in det_data] gamma = [np.median(s["gamma"]) if "gamma" in s else None for s in dataset]
omega = [np.median(s["omega"]) if "omega" in s else None for s in det_data] omega = [np.median(s["omega"]) if "omega" in s else None for s in dataset]
chi = [np.median(s["chi"]) if "chi" in s else None for s in det_data] chi = [np.median(s["chi"]) if "chi" in s else None for s in dataset]
phi = [np.median(s["phi"]) if "phi" in s else None for s in det_data] phi = [np.median(s["phi"]) if "phi" in s else None for s in dataset]
nu = [np.median(s["nu"]) if "nu" in s else None for s in det_data] nu = [np.median(s["nu"]) if "nu" in s else None for s in dataset]
scan_table_source.data.update( scan_table_source.data.update(
scan=scan_list, scan=scan_list,
@ -133,7 +133,7 @@ def create():
file_select = MultiSelect(title="Available .ccl/.dat files:", width=210, height=250) file_select = MultiSelect(title="Available .ccl/.dat files:", width=210, height=250)
def file_open_button_callback(): def file_open_button_callback():
nonlocal det_data nonlocal dataset
new_data = [] new_data = []
for f_path in file_select.value: for f_path in file_select.value:
with open(f_path) as file: with open(f_path) as file:
@ -155,7 +155,7 @@ def create():
pyzebra.merge_datasets(new_data, file_data) pyzebra.merge_datasets(new_data, file_data)
if new_data: if new_data:
det_data = new_data dataset = new_data
_init_datatable() _init_datatable()
append_upload_button.disabled = False append_upload_button.disabled = False
@ -175,7 +175,7 @@ def create():
continue continue
pyzebra.normalize_dataset(file_data, monitor_spinner.value) pyzebra.normalize_dataset(file_data, monitor_spinner.value)
pyzebra.merge_datasets(det_data, file_data) pyzebra.merge_datasets(dataset, file_data)
if file_data: if file_data:
_init_datatable() _init_datatable()
@ -184,7 +184,7 @@ def create():
file_append_button.on_click(file_append_button_callback) file_append_button.on_click(file_append_button_callback)
def upload_button_callback(_attr, _old, _new): def upload_button_callback(_attr, _old, _new):
nonlocal det_data nonlocal dataset
new_data = [] new_data = []
for f_str, f_name in zip(upload_button.value, upload_button.filename): for f_str, f_name in zip(upload_button.value, upload_button.filename):
with io.StringIO(base64.b64decode(f_str).decode()) as file: with io.StringIO(base64.b64decode(f_str).decode()) as file:
@ -205,7 +205,7 @@ def create():
pyzebra.merge_datasets(new_data, file_data) pyzebra.merge_datasets(new_data, file_data)
if new_data: if new_data:
det_data = new_data dataset = new_data
_init_datatable() _init_datatable()
append_upload_button.disabled = False append_upload_button.disabled = False
@ -227,7 +227,7 @@ def create():
continue continue
pyzebra.normalize_dataset(file_data, monitor_spinner.value) pyzebra.normalize_dataset(file_data, monitor_spinner.value)
pyzebra.merge_datasets(det_data, file_data) pyzebra.merge_datasets(dataset, file_data)
if file_data: if file_data:
_init_datatable() _init_datatable()
@ -239,16 +239,16 @@ def create():
append_upload_button.on_change("filename", append_upload_button_callback) append_upload_button.on_change("filename", append_upload_button_callback)
def monitor_spinner_callback(_attr, old, new): def monitor_spinner_callback(_attr, old, new):
if det_data: if dataset:
pyzebra.normalize_dataset(det_data, new) pyzebra.normalize_dataset(dataset, new)
_update_plot() _update_plot()
monitor_spinner = Spinner(title="Monitor:", mode="int", value=100_000, low=1, width=145) monitor_spinner = Spinner(title="Monitor:", mode="int", value=100_000, low=1, width=145)
monitor_spinner.on_change("value", monitor_spinner_callback) monitor_spinner.on_change("value", monitor_spinner_callback)
def _update_table(): def _update_table():
fit_ok = [(1 if "fit" in scan else 0) for scan in det_data] fit_ok = [(1 if "fit" in scan else 0) for scan in dataset]
export = [scan["export"] for scan in det_data] export = [scan["export"] for scan in dataset]
scan_table_source.data.update(fit=fit_ok, export=export) scan_table_source.data.update(fit=fit_ok, export=export)
def _update_plot(): def _update_plot():
@ -368,7 +368,7 @@ def create():
def scan_table_source_callback(_attr, _old, new): def scan_table_source_callback(_attr, _old, new):
# unfortunately, we don't know if the change comes from data update or user input # unfortunately, we don't know if the change comes from data update or user input
# also `old` and `new` are the same for non-scalars # also `old` and `new` are the same for non-scalars
for scan, export in zip(det_data, new["export"]): for scan, export in zip(dataset, new["export"]):
scan["export"] = export scan["export"] = export
_update_preview() _update_preview()
@ -410,13 +410,13 @@ def create():
) )
def _get_selected_scan(): def _get_selected_scan():
return det_data[scan_table_source.selected.indices[0]] return dataset[scan_table_source.selected.indices[0]]
merge_from_select = Select(title="scan:", width=145) merge_from_select = Select(title="scan:", width=145)
def merge_button_callback(): def merge_button_callback():
scan_into = _get_selected_scan() scan_into = _get_selected_scan()
scan_from = det_data[int(merge_from_select.value)] scan_from = dataset[int(merge_from_select.value)]
if scan_into is scan_from: if scan_into is scan_from:
print("WARNING: Selected scans for merging are identical") print("WARNING: Selected scans for merging are identical")
@ -557,7 +557,7 @@ def create():
fit_output_textinput = TextAreaInput(title="Fit results:", width=750, height=200) fit_output_textinput = TextAreaInput(title="Fit results:", width=750, height=200)
def proc_all_button_callback(): def proc_all_button_callback():
for scan in det_data: for scan in dataset:
if scan["export"]: if scan["export"]:
pyzebra.fit_scan( pyzebra.fit_scan(
scan, fit_params, fit_from=fit_from_spinner.value, fit_to=fit_to_spinner.value scan, fit_params, fit_from=fit_from_spinner.value, fit_to=fit_to_spinner.value
@ -602,7 +602,7 @@ def create():
with tempfile.TemporaryDirectory() as temp_dir: with tempfile.TemporaryDirectory() as temp_dir:
temp_file = temp_dir + "/temp" temp_file = temp_dir + "/temp"
export_data = [] export_data = []
for scan in det_data: for scan in dataset:
if scan["export"]: if scan["export"]:
export_data.append(scan) export_data.append(scan)

View File

@ -0,0 +1,745 @@
import base64
import io
import os
import subprocess
import tempfile
import numpy as np
from bokeh.layouts import column, row
from bokeh.models import (
Arrow,
BoxZoomTool,
Button,
CheckboxGroup,
ColumnDataSource,
CustomJS,
Div,
Ellipse,
FileInput,
Legend,
LegendItem,
LinearAxis,
MultiLine,
MultiSelect,
NormalHead,
NumericInput,
Panel,
PanTool,
Plot,
RadioGroup,
Range1d,
ResetTool,
Scatter,
Select,
Spacer,
Spinner,
Text,
TextAreaInput,
TextInput,
WheelZoomTool,
)
from bokeh.palettes import Dark2
import pyzebra
javaScript = """
let j = 0;
for (let i = 0; i < js_data.data['fname'].length; i++) {
if (js_data.data['content'][i] === "") continue;
setTimeout(function() {
const blob = new Blob([js_data.data['content'][i]], {type: 'text/plain'})
const link = document.createElement('a');
document.body.appendChild(link);
const url = window.URL.createObjectURL(blob);
link.href = url;
link.download = js_data.data['fname'][i];
link.click();
window.URL.revokeObjectURL(url);
document.body.removeChild(link);
}, 100 * j)
j++;
}
"""
# Default angular chunk size (Δ, in degrees) per sort motor, used when a motor
# is selected as a sorting key for the generated (m)hkl lists.
ANG_CHUNK_DEFAULTS = {"2theta": 30, "gamma": 30, "omega": 30, "chi": 35, "phi": 35, "nu": 10}
# Motors available as sort keys for bisecting (BI) and normal-beam (NB) geometry.
SORT_OPT_BI = ["2theta", "chi", "phi", "omega"]
SORT_OPT_NB = ["gamma", "nu", "omega"]
def create():
ang_lims = None
cif_data = None
params = None
res_files = {}
js_data = ColumnDataSource(data=dict(content=[""], fname=[""]))
anglim_div = Div(text="Angular min/max limits:", margin=(5, 5, 0, 5))
sttgamma_ti = TextInput(title="stt/gamma", width=100)
omega_ti = TextInput(title="omega", width=100)
chinu_ti = TextInput(title="chi/nu", width=100)
phi_ti = TextInput(title="phi", width=100)
def _update_ang_lims(ang_lims):
    """Mirror the min/max values from *ang_lims* into the limit text inputs.

    Which widgets get filled depends on the geometry: normal beam ("nb")
    uses nu and leaves phi blank, bisecting ("bi") uses chi and phi.
    """

    def pair(motor):
        # First two entries of a motor's limits, rendered as "min max".
        return " ".join(ang_lims[motor][:2])

    sttgamma_ti.value = pair("gamma")
    omega_ti.value = pair("omega")
    if ang_lims["geom"] == "nb":
        chinu_ti.value = pair("nu")
        phi_ti.value = ""
    else:  # ang_lims["geom"] == "bi"
        chinu_ti.value = pair("chi")
        phi_ti.value = pair("phi")
def _update_params(params):
    """Reflect every recognized CFL/CIF parameter of *params* in its widget.

    Keys absent from *params* leave the corresponding widget untouched.
    """
    # Parameters whose value maps 1:1 onto a text widget.
    plain = (
        ("WAVE", wavelen_input),
        ("SPGR", cryst_space_group),
        ("CELL", cryst_cell),
        ("HLIM", ranges_hkl),
        ("SRANG", ranges_srang),
        ("lattiCE", magstruct_lattice),
        ("kvect", magstruct_kvec),
    )
    for key, widget in plain:
        if key in params:
            widget.value = params[key]

    # The UB matrix is stored as a sequence and shown space-separated.
    if "UBMAT" in params:
        ub_matrix.value = " ".join(params["UBMAT"])
def open_geom_callback(_attr, _old, new):
nonlocal ang_lims
with io.StringIO(base64.b64decode(new).decode()) as fileobj:
ang_lims = pyzebra.read_geom_file(fileobj)
_update_ang_lims(ang_lims)
open_geom_div = Div(text="Open GEOM:")
open_geom = FileInput(accept=".geom", width=200)
open_geom.on_change("value", open_geom_callback)
def open_cfl_callback(_attr, _old, new):
nonlocal params
with io.StringIO(base64.b64decode(new).decode()) as fileobj:
params = pyzebra.read_cfl_file(fileobj)
_update_params(params)
open_cfl_div = Div(text="Open CFL:")
open_cfl = FileInput(accept=".cfl", width=200)
open_cfl.on_change("value", open_cfl_callback)
def open_cif_callback(_attr, _old, new):
nonlocal cif_data
with io.StringIO(base64.b64decode(new).decode()) as fileobj:
cif_data = pyzebra.read_cif_file(fileobj)
_update_params(cif_data)
open_cif_div = Div(text="Open CIF:")
open_cif = FileInput(accept=".cif", width=200)
open_cif.on_change("value", open_cif_callback)
wavelen_div = Div(text="Wavelength:", margin=(5, 5, 0, 5))
wavelen_input = TextInput(title="value", width=70)
def wavelen_select_callback(_attr, _old, new):
    """Copy the chosen preset into the wavelength input; "" clears it."""
    wavelen_input.value = new if new else ""
wavelen_select = Select(
title="preset", options=["", "0.788", "1.178", "1.383", "2.305"], width=70
)
wavelen_select.on_change("value", wavelen_select_callback)
cryst_div = Div(text="Crystal structure:", margin=(5, 5, 0, 5))
cryst_space_group = TextInput(title="space group", width=100)
cryst_cell = TextInput(title="cell", width=250)
def ub_matrix_calc_callback():
    """Compute the UB matrix via Sxtal_Refgen from space group and cell."""
    ub = pyzebra.calc_ub_matrix(
        {"SPGR": cryst_space_group.value, "CELL": cryst_cell.value}
    )
    ub_matrix.value = " ".join(ub)
ub_matrix_calc = Button(label="UB matrix:", button_type="primary", width=100)
ub_matrix_calc.on_click(ub_matrix_calc_callback)
ub_matrix = TextInput(title="\u200B", width=600)
ranges_div = Div(text="Ranges:", margin=(5, 5, 0, 5))
ranges_hkl = TextInput(title="HKL", value="-25 25 -25 25 -25 25", width=250)
ranges_srang = TextInput(title="sin(​θ​)/λ", value="0.0 0.7", width=100)
magstruct_div = Div(text="Magnetic structure:", margin=(5, 5, 0, 5))
magstruct_lattice = TextInput(title="lattice", width=100)
magstruct_kvec = TextAreaInput(title="k vector", width=150)
def sorting0_callback(_attr, _old, new):
    # New 1st sort motor selected: reset its Δ chunk to the motor's default.
    sorting_0_dt.value = ANG_CHUNK_DEFAULTS[new]

def sorting1_callback(_attr, _old, new):
    # New 2nd sort motor selected: reset its Δ chunk to the motor's default.
    sorting_1_dt.value = ANG_CHUNK_DEFAULTS[new]

def sorting2_callback(_attr, _old, new):
    # New 3rd sort motor selected: reset its Δ chunk to the motor's default.
    sorting_2_dt.value = ANG_CHUNK_DEFAULTS[new]
sorting_0 = Select(title="1st", width=100)
sorting_0.on_change("value", sorting0_callback)
sorting_0_dt = NumericInput(title="Δ", width=70)
sorting_1 = Select(title="2nd", width=100)
sorting_1.on_change("value", sorting1_callback)
sorting_1_dt = NumericInput(title="Δ", width=70)
sorting_2 = Select(title="3rd", width=100)
sorting_2.on_change("value", sorting2_callback)
sorting_2_dt = NumericInput(title="Δ", width=70)
def geom_radiogroup_callback(_attr, _old, new):
    """Load the default geom/cfl files for the selected geometry.

    ``new == 0`` selects bisecting geometry, otherwise normal beam.
    Rebinds the enclosing-scope ``ang_lims`` and ``params`` and refreshes
    the corresponding widgets, then restricts the sorting options to the
    motors valid for that geometry.
    """
    nonlocal ang_lims, params
    if new == 0:
        geom_file = pyzebra.get_zebraBI_default_geom_file()
        sort_opt = SORT_OPT_BI
    else:
        geom_file = pyzebra.get_zebraNB_default_geom_file()
        sort_opt = SORT_OPT_NB
    cfl_file = pyzebra.get_zebra_default_cfl_file()

    # Populate the widgets from the default files.
    ang_lims = pyzebra.read_geom_file(geom_file)
    _update_ang_lims(ang_lims)
    params = pyzebra.read_cfl_file(cfl_file)
    _update_params(params)

    # Preselect the geometry's sort motors in their natural order.
    sorting_0.options = sorting_1.options = sorting_2.options = sort_opt
    sorting_0.value = sort_opt[0]
    sorting_1.value = sort_opt[1]
    sorting_2.value = sort_opt[2]
geom_radiogroup_div = Div(text="Geometry:", margin=(5, 5, 0, 5))
geom_radiogroup = RadioGroup(labels=["bisecting", "normal beam"], width=150)
geom_radiogroup.on_change("active", geom_radiogroup_callback)
geom_radiogroup.active = 0
def go_button_callback():
    """Run Sxtal_Refgen and collect the generated (m)hkl lists.

    Reads the current widget state back into ``ang_lims``/``params``, writes
    a .geom file and one .cfl file per k-vector into a temporary directory,
    runs Sxtal_Refgen on each .cfl and stores the produced hkl/mhkl lists
    (raw and sorted) in ``res_files``, then refreshes ``created_lists``.
    """
    # Push possibly user-edited angular limits back into ang_lims.
    ang_lims["gamma"][0], ang_lims["gamma"][1] = sttgamma_ti.value.strip().split()
    ang_lims["omega"][0], ang_lims["omega"][1] = omega_ti.value.strip().split()
    if ang_lims["geom"] == "nb":
        ang_lims["nu"][0], ang_lims["nu"][1] = chinu_ti.value.strip().split()
    else:  # ang_lims["geom"] == "bi"
        ang_lims["chi"][0], ang_lims["chi"][1] = chinu_ti.value.strip().split()
        ang_lims["phi"][0], ang_lims["phi"][1] = phi_ti.value.strip().split()

    if cif_data:
        params.update(cif_data)

    params["WAVE"] = wavelen_input.value
    params["SPGR"] = cryst_space_group.value
    params["CELL"] = cryst_cell.value
    params["UBMAT"] = ub_matrix.value.split()
    params["HLIM"] = ranges_hkl.value
    params["SRANG"] = ranges_srang.value
    params["lattiCE"] = magstruct_lattice.value
    kvects = magstruct_kvec.value.split("\n")

    with tempfile.TemporaryDirectory() as temp_dir:
        geom_path = os.path.join(temp_dir, "zebra.geom")
        if open_geom.value:
            geom_template = io.StringIO(base64.b64decode(open_geom.value).decode())
        else:
            geom_template = None
        pyzebra.export_geom_file(geom_path, ang_lims, geom_template)

        print(f"Content of {geom_path}:")
        with open(geom_path) as f:
            print(f.read())

        priority = [sorting_0.value, sorting_1.value, sorting_2.value]
        chunks = [sorting_0_dt.value, sorting_1_dt.value, sorting_2_dt.value]
        if geom_radiogroup.active == 0:
            sort_hkl_file = pyzebra.sort_hkl_file_bi
            priority.extend(set(SORT_OPT_BI) - set(priority))
        else:
            sort_hkl_file = pyzebra.sort_hkl_file_nb

        # run sxtal_refgen for each kvect provided
        for i, kvect in enumerate(kvects, start=1):
            params["kvect"] = kvect
            if open_cfl.filename:
                base_fname = f"{os.path.splitext(open_cfl.filename)[0]}_{i}"
            else:
                base_fname = f"zebra_{i}"

            cfl_path = os.path.join(temp_dir, base_fname + ".cfl")
            if open_cfl.value:
                cfl_template = io.StringIO(base64.b64decode(open_cfl.value).decode())
            else:
                cfl_template = None
            pyzebra.export_cfl_file(cfl_path, params, cfl_template)

            print(f"Content of {cfl_path}:")
            with open(cfl_path) as f:
                print(f.read())

            comp_proc = subprocess.run(
                [pyzebra.SXTAL_REFGEN_PATH, cfl_path],
                cwd=temp_dir,
                check=True,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                text=True,
            )
            print(" ".join(comp_proc.args))
            print(comp_proc.stdout)

            if i == 1:  # all hkl files are identical, so keep only one
                hkl_fname = base_fname + ".hkl"
                hkl_fpath = os.path.join(temp_dir, hkl_fname)
                with open(hkl_fpath) as f:
                    res_files[hkl_fname] = f.read()

                hkl_fname_sorted = base_fname + "_sorted.hkl"
                hkl_fpath_sorted = os.path.join(temp_dir, hkl_fname_sorted)
                sort_hkl_file(hkl_fpath, hkl_fpath_sorted, priority, chunks)
                with open(hkl_fpath_sorted) as f:
                    res_files[hkl_fname_sorted] = f.read()

            mhkl_fname = base_fname + ".mhkl"
            mhkl_fpath = os.path.join(temp_dir, mhkl_fname)
            with open(mhkl_fpath) as f:
                res_files[mhkl_fname] = f.read()

            mhkl_fname_sorted = base_fname + "_sorted.mhkl"
            # Fix: the sorted-mhkl path was previously built from
            # hkl_fname_sorted, clobbering the sorted hkl file on disk.
            mhkl_fpath_sorted = os.path.join(temp_dir, mhkl_fname_sorted)
            sort_hkl_file(mhkl_fpath, mhkl_fpath_sorted, priority, chunks)
            with open(mhkl_fpath_sorted) as f:
                res_files[mhkl_fname_sorted] = f.read()

    created_lists.options = list(res_files)
go_button = Button(label="GO", button_type="primary", width=50)
go_button.on_click(go_button_callback)
def created_lists_callback(_attr, _old, new):
    """Preview the selected generated list and stage it for client-side download.

    `new` is the list of selected filenames (keys of res_files).
    """
    if not new:
        # skip empty selections (e.g. when the selection is cleared) —
        # otherwise new[0] raises IndexError
        return
    sel_file = new[0]
    file_text = res_files[sel_file]
    preview_lists.value = file_text
    js_data.data.update(content=[file_text], fname=[sel_file])
# List of files produced by the last "GO" run (filled by go_button_callback).
created_lists = MultiSelect(title="Created lists:", width=200, height=150)
created_lists.on_change("value", created_lists_callback)

# Text preview of the currently selected generated list.
preview_lists = TextAreaInput(title="Preview selected list:", width=600, height=150)

# Download happens client-side: the CustomJS reads content/fname from js_data.
download_file = Button(label="Download file", button_type="success", width=200)
download_file.js_on_click(CustomJS(args={"js_data": js_data}, code=javaScript))
plot_list = Button(label="Plot selected list", button_type="primary", width=200, disabled=True)

# --- measured-data plotting section ---
measured_data_div = Div(text="Measured data:")
measured_data = FileInput(accept=".ccl", multiple=True, width=200)

# Extent of the background grid drawn in plot_file_callback (plot units).
min_grid_x = -10
max_grid_x = 10
min_grid_y = -5
max_grid_y = 5
cmap = Dark2[8]  # one color per propagation vector
# one marker shape per uploaded file (when "files (symbols)" is active)
syms = ["circle", "inverted_triangle", "square", "diamond", "star", "triangle"]
# Define resolution function
def _res_fun(stt, wave, res_mult):
expr = np.tan(stt / 2 * np.pi / 180)
fwhm = np.sqrt(0.4639 * expr ** 2 - 0.4452 * expr + 0.1506) * res_mult # res in deg
return fwhm
def plot_file_callback():
    """Project the uploaded .ccl scans onto the selected hkl plane and redraw.

    Parses every uploaded file, converts each scan to cartesian reciprocal
    coordinates via the metric matrix M, keeps scans whose midpoint lies
    within `delta` of the requested cut, and updates the grid, scan-line,
    ellipse and legend data sources of `plot`.
    """
    # Plane definition from the widgets: normal direction, cut position and
    # tolerance, and the two in-plane axes (each an "h k l" string).
    orth_dir = list(map(float, hkl_normal.value.split()))
    cut_tol = hkl_delta.value
    cut_or = hkl_cut.value
    x_dir = list(map(float, hkl_in_plane_x.value.split()))
    y_dir = list(map(float, hkl_in_plane_y.value.split()))

    # NOTE(review): reshape(3, 3) requires exactly three k vectors in the
    # text area — any other count raises here; confirm that is intended.
    k = np.array(k_vectors.value.split()).astype(float).reshape(3, 3)
    tol_k = 0.1  # max |fractional hkl - k| for a scan to match a k vector

    # Plotting options
    grid_flag = 1
    grid_minor_flag = 1
    grid_div = 2  # Number of minor division lines per unit

    # different symbols based on file number
    file_flag = 0 in disting_opt_cb.active
    # scale marker size according to intensity
    intensity_flag = 1 in disting_opt_cb.active
    # use color to mark different propagation vectors
    prop_legend_flag = 2 in disting_opt_cb.active
    # use resolution ellipsis
    res_flag = disting_opt_rb.active
    # multiplier for resolution function (in case of samples with large mosaicity)
    res_mult = res_mult_ni.value

    md_fnames = measured_data.filename
    md_fdata = measured_data.value

    # Load first data file, read angles and define matrices to perform
    # conversion to cartesian coordinates and back
    with io.StringIO(base64.b64decode(md_fdata[0]).decode()) as file:
        _, ext = os.path.splitext(md_fnames[0])
        try:
            file_data = pyzebra.parse_1D(file, ext)
        except:
            print(f"Error loading {md_fnames[0]}")
            return

    # Lattice angles of the first file (deg -> rad); assumed valid for all files.
    alpha = file_data[0]["alpha_cell"] * np.pi / 180.0
    beta = file_data[0]["beta_cell"] * np.pi / 180.0
    gamma = file_data[0]["gamma_cell"] * np.pi / 180.0

    # reciprocal angle parameters
    alpha_star = np.arccos(
        (np.cos(beta) * np.cos(gamma) - np.cos(alpha)) / (np.sin(beta) * np.sin(gamma))
    )
    beta_star = np.arccos(
        (np.cos(alpha) * np.cos(gamma) - np.cos(beta)) / (np.sin(alpha) * np.sin(gamma))
    )
    gamma_star = np.arccos(
        (np.cos(alpha) * np.cos(beta) - np.cos(gamma)) / (np.sin(alpha) * np.sin(beta))
    )

    # conversion matrix:
    M = np.array(
        [
            [1, np.cos(gamma_star), np.cos(beta_star)],
            [0, np.sin(gamma_star), -np.sin(beta_star) * np.cos(alpha)],
            [0, 0, np.sin(beta_star) * np.sin(alpha)],
        ]
    )
    M_inv = np.linalg.inv(M)  # NOTE(review): currently unused in this function

    # Calculate in-plane y-direction
    x_c = M @ x_dir
    y_c = M @ y_dir
    o_c = M @ orth_dir

    # Normalize all directions
    y_c = y_c / np.linalg.norm(y_c)
    x_c = x_c / np.linalg.norm(x_c)
    o_c = o_c / np.linalg.norm(o_c)

    # Read all data
    hkl_coord = []
    intensity_vec = []
    k_flag_vec = []
    file_flag_vec = []
    res_vec_x = []
    res_vec_y = []
    res_N = 10  # number of sample points along each scan line

    for j in range(len(md_fnames)):
        with io.StringIO(base64.b64decode(md_fdata[j]).decode()) as file:
            _, ext = os.path.splitext(md_fnames[j])
            try:
                file_data = pyzebra.parse_1D(file, ext)
            except:
                print(f"Error loading {md_fnames[j]}")
                return

        # Loop through all data
        for scan in file_data:
            om = scan["omega"]
            gammad = scan["twotheta"]
            chi = scan["chi"]
            phi = scan["phi"]
            nud = 0  # 1d detector
            ub = scan["ub"]
            ddist = float(scan["detectorDistance"])
            counts = scan["counts"]
            mon = scan["monitor"]

            # Determine wavelength from mcvl value (is wavelength stored anywhere???)
            mcvl = scan["mcvl"]
            if mcvl == 2.2:
                wave = 1.178
            elif mcvl == 7.0:
                wave = 1.383
            else:
                wave = 2.3

            # Calculate resolution in degrees
            res = _res_fun(gammad, wave, res_mult)

            # convert to resolution in hkl along scan line
            ang2hkl_1d = pyzebra.ang2hkl_1d  # hoisted attribute lookup
            res_x = []
            res_y = []
            for _om in np.linspace(om[0], om[-1], num=res_N):
                expr1 = ang2hkl_1d(wave, ddist, gammad, _om + res / 2, chi, phi, nud, ub)
                expr2 = ang2hkl_1d(wave, ddist, gammad, _om - res / 2, chi, phi, nud, ub)
                hkl_temp = M @ (np.abs(expr1 - expr2) / 2)
                res_x.append(hkl_temp[0])
                res_y.append(hkl_temp[1])

            # Get first and final hkl
            hkl1 = ang2hkl_1d(wave, ddist, gammad, om[0], chi, phi, nud, ub)
            hkl2 = ang2hkl_1d(wave, ddist, gammad, om[-1], chi, phi, nud, ub)

            # Get hkl at best intensity
            hkl_m = ang2hkl_1d(wave, ddist, gammad, om[np.argmax(counts)], chi, phi, nud, ub)

            # Estimate intensity for marker size scaling: subtract a linear
            # background through the scan endpoints and sum the rest.
            y1 = counts[0]
            y2 = counts[-1]
            x1 = om[0]
            x2 = om[-1]
            a = (y1 - y2) / (x1 - x2)
            b = y1 - a * x1
            intensity_exp = np.sum(counts - (a * om + b))
            c = int(intensity_exp / mon * 10000)

            # Recognize k_flag_vec: match the fractional part of hkl_m against
            # each k vector; index len(k) means "no match" (nuclear reflection).
            min_hkl_m = np.minimum(1 - hkl_m % 1, hkl_m % 1)
            for j2, _k in enumerate(k):
                if all(np.abs(min_hkl_m - _k) < tol_k):
                    k_flag_vec.append(j2)
                    break
            else:
                k_flag_vec.append(len(k))

            # Save data
            hkl_coord.append([hkl1, hkl2, hkl_m])
            intensity_vec.append(c)
            file_flag_vec.append(j)
            res_vec_x.append(res_x)
            res_vec_y.append(res_y)

    # Fixed default view window; user can still pan/zoom afterwards.
    plot.x_range.start = plot.x_range.reset_start = -2
    plot.x_range.end = plot.x_range.reset_end = 5
    plot.y_range.start = plot.y_range.reset_start = -4
    plot.y_range.end = plot.y_range.reset_end = 3.5

    # Major/minor grid of the projected reciprocal plane.
    xs, ys = [], []
    xs_minor, ys_minor = [], []
    if grid_flag:
        for yy in np.arange(min_grid_y, max_grid_y, 1):
            hkl1 = M @ [0, yy, 0]
            # NOTE(review): x-extent of these horizontal lines uses the
            # *_grid_y limits — confirm whether *_grid_x was intended.
            xs.append([min_grid_y, max_grid_y])
            ys.append([hkl1[1], hkl1[1]])

        for xx in np.arange(min_grid_x, max_grid_x, 1):
            # NOTE(review): the second (k) coordinate spans min/max_grid_x —
            # confirm whether min/max_grid_y was intended here.
            hkl1 = M @ [xx, min_grid_x, 0]
            hkl2 = M @ [xx, max_grid_x, 0]
            xs.append([hkl1[0], hkl2[0]])
            ys.append([hkl1[1], hkl2[1]])

    if grid_minor_flag:
        # Same construction as above with a finer step (see NOTEs above).
        for yy in np.arange(min_grid_y, max_grid_y, 1 / grid_div):
            hkl1 = M @ [0, yy, 0]
            xs_minor.append([min_grid_y, max_grid_y])
            ys_minor.append([hkl1[1], hkl1[1]])

        for xx in np.arange(min_grid_x, max_grid_x, 1 / grid_div):
            hkl1 = M @ [xx, min_grid_x, 0]
            hkl2 = M @ [xx, max_grid_x, 0]
            xs_minor.append([hkl1[0], hkl2[0]])
            ys_minor.append([hkl1[1], hkl2[1]])

    grid_source.data.update(xs=xs, ys=ys)
    minor_grid_source.data.update(xs=xs_minor, ys=ys_minor)

    # Accumulators for the two display modes (ellipses vs scan lines).
    el_x, el_y, el_w, el_h, el_c = [], [], [], [], []
    scan_xs, scan_ys, scan_x, scan_y = [], [], [], []
    scan_m, scan_s, scan_c, scan_l = [], [], [], []
    for j in range(len(hkl_coord)):
        # Get middle hkl from list
        hklm = M @ hkl_coord[j][2]

        # Decide if point is in the cut
        proj = np.dot(hklm, o_c)
        if abs(proj - cut_or) >= cut_tol:
            continue

        hkl1 = M @ hkl_coord[j][0]
        hkl2 = M @ hkl_coord[j][1]

        if intensity_flag:
            markersize = max(1, int(intensity_vec[j] / max(intensity_vec) * 20))
        else:
            markersize = 4

        if file_flag:
            plot_symbol = syms[file_flag_vec[j]]
        else:
            plot_symbol = "circle"

        if prop_legend_flag:
            col_value = cmap[k_flag_vec[j]]
        else:
            col_value = "black"

        if res_flag:
            # Generate series of ellipses along scan line
            el_x.extend(np.linspace(hkl1[0], hkl2[0], num=res_N))
            el_y.extend(np.linspace(hkl1[1], hkl2[1], num=res_N))
            el_w.extend(np.array(res_vec_x[j]) * 2)
            el_h.extend(np.array(res_vec_y[j]) * 2)
            el_c.extend([col_value] * res_N)
        else:
            # Plot scan line
            scan_xs.append([hkl1[0], hkl2[0]])
            scan_ys.append([hkl1[1], hkl2[1]])

            # Plot middle point of scan
            scan_x.append(hklm[0])
            scan_y.append(hklm[1])
            scan_m.append(plot_symbol)
            scan_s.append(markersize)
            # Color and legend label
            scan_c.append(col_value)
            scan_l.append(md_fnames[file_flag_vec[j]])

    ellipse_source.data.update(x=el_x, y=el_y, w=el_w, h=el_h, c=el_c)
    scan_source.data.update(
        xs=scan_xs, ys=scan_ys, x=scan_x, y=scan_y, m=scan_m, s=scan_s, c=scan_c, l=scan_l,
    )

    # Orientation arrows for the in-plane axes, labelled "h" / "k".
    arrow1.visible = True
    arrow1.x_end = x_c[0]
    arrow1.y_end = x_c[1]
    arrow2.visible = True
    arrow2.x_end = y_c[0]
    arrow2.y_end = y_c[1]
    kvect_source.data.update(
        text_x=[x_c[0] / 2, y_c[0] / 2 - 0.1],
        text_y=[x_c[1] - 0.1, y_c[1] / 2],
        text=["h", "k"],
    )

    # Legend items for different file entries (symbol)
    legend_items = []
    if not res_flag and file_flag:
        labels, inds = np.unique(scan_source.data["l"], return_index=True)
        for label, ind in zip(labels, inds):
            legend_items.append(LegendItem(label=label, renderers=[scatter], index=ind))

    # Legend items for propagation vector (color)
    if prop_legend_flag:
        if res_flag:
            source, render = ellipse_source, ellipse
        else:
            source, render = scan_source, mline

        labels, inds = np.unique(source.data["c"], return_index=True)
        for label, ind in zip(labels, inds):
            # NOTE(review): k_flag_vec may contain len(k) for unmatched
            # reflections, so cmap.index(label) can be 3 and k[3] would raise
            # IndexError — confirm unmatched reflections cannot reach here.
            label = f"k={k[cmap.index(label)]}"
            legend_items.append(LegendItem(label=label, renderers=[render], index=ind))

    plot.legend.items = legend_items
# Triggers plot_file_callback on the uploaded measured data.
plot_file = Button(label="Plot selected file(s)", button_type="primary", width=200)
plot_file.on_click(plot_file_callback)

# Main reciprocal-space plot; its ranges are set inside plot_file_callback.
plot = Plot(x_range=Range1d(), y_range=Range1d(), plot_height=450, plot_width=600)
plot.add_tools(PanTool(), WheelZoomTool(), BoxZoomTool(), ResetTool())
plot.toolbar.logo = None

plot.add_layout(LinearAxis(), place="left")
plot.add_layout(LinearAxis(), place="below")

# Arrows marking the in-plane axis directions (shown after plotting).
arrow1 = Arrow(x_start=0, y_start=0, x_end=0, y_end=0, end=NormalHead(size=10), visible=False)
plot.add_layout(arrow1)
arrow2 = Arrow(x_start=0, y_start=0, x_end=0, y_end=0, end=NormalHead(size=10), visible=False)
plot.add_layout(arrow2)

# "h"/"k" text labels for the direction arrows.
kvect_source = ColumnDataSource(dict(text_x=[], text_y=[], text=[]))
plot.add_glyph(kvect_source, Text(x="text_x", y="text_y", text="text"))

# Major (solid) and minor (dotted) grid of the projected plane.
grid_source = ColumnDataSource(dict(xs=[], ys=[]))
minor_grid_source = ColumnDataSource(dict(xs=[], ys=[]))
plot.add_glyph(grid_source, MultiLine(xs="xs", ys="ys", line_color="gray"))
plot.add_glyph(
    minor_grid_source, MultiLine(xs="xs", ys="ys", line_color="gray", line_dash="dotted")
)

# Resolution ellipses (shown in "resolution ellipsoid" mode).
ellipse_source = ColumnDataSource(dict(x=[], y=[], w=[], h=[], c=[]))
ellipse = plot.add_glyph(
    ellipse_source, Ellipse(x="x", y="y", width="w", height="h", fill_color="c", line_color="c")
)

# Scan lines and their midpoint markers (shown in "scan direction" mode).
scan_source = ColumnDataSource(dict(xs=[], ys=[], x=[], y=[], m=[], s=[], c=[], l=[]))
mline = plot.add_glyph(scan_source, MultiLine(xs="xs", ys="ys", line_color="c"))
scatter = plot.add_glyph(
    scan_source, Scatter(x="x", y="y", marker="m", size="s", fill_color="c", line_color="c")
)

plot.add_layout(Legend(items=[], location="top_left", click_policy="hide"))

# --- hkl plane controls ---
hkl_div = Div(text="HKL:", margin=(5, 5, 0, 5))
hkl_normal = TextInput(title="normal", value="0 0 1", width=70)
hkl_cut = Spinner(title="cut", value=0, step=0.1, width=70)
hkl_delta = NumericInput(title="delta", value=0.1, mode="float", width=70)
hkl_in_plane_x = TextInput(title="in-plane X", value="1 0 0", width=70)
hkl_in_plane_y = TextInput(title="in-plane Y", value="0 1 0", width=70)

# --- display options read by plot_file_callback ---
disting_opt_div = Div(text="Distinguish options:", margin=(5, 5, 0, 5))
disting_opt_cb = CheckboxGroup(
    labels=["files (symbols)", "intensities (size)", "k vectors nucl/magn (colors)"],
    active=[0, 1, 2],
    width=200,
)
disting_opt_rb = RadioGroup(
    labels=["scan direction", "resolution ellipsoid"], active=0, width=200
)

# Propagation vectors, one "h k l" triple per line (three lines expected).
k_vectors = TextAreaInput(
    title="k vectors:", value="0.0 0.0 0.0\n0.5 0.0 0.0\n0.5 0.5 0.0", width=150,
)
res_mult_ni = NumericInput(title="Resolution mult:", value=10, mode="int", width=100)
# --- assemble the tab layout ---
fileinput_layout = row(open_cfl_div, open_cfl, open_cif_div, open_cif, open_geom_div, open_geom)
geom_layout = column(geom_radiogroup_div, geom_radiogroup)
wavelen_layout = column(wavelen_div, row(wavelen_select, wavelen_input))
anglim_layout = column(anglim_div, row(sttgamma_ti, omega_ti, chinu_ti, phi_ti))
cryst_layout = column(cryst_div, row(cryst_space_group, cryst_cell))
ubmat_layout = row(column(Spacer(height=18), ub_matrix_calc), ub_matrix)
ranges_layout = column(ranges_div, row(ranges_hkl, ranges_srang))
magstruct_layout = column(magstruct_div, row(magstruct_lattice, magstruct_kvec))
sorting_layout = row(
    sorting_0,
    sorting_0_dt,
    Spacer(width=30),
    sorting_1,
    sorting_1_dt,
    Spacer(width=30),
    sorting_2,
    sorting_2_dt,
)

# Left column: Sxtal_refgen inputs plus generated-list outputs.
column1_layout = column(
    fileinput_layout,
    Spacer(height=10),
    row(geom_layout, wavelen_layout, Spacer(width=50), anglim_layout),
    cryst_layout,
    ubmat_layout,
    row(ranges_layout, Spacer(width=50), magstruct_layout),
    row(sorting_layout, Spacer(width=30), column(Spacer(height=18), go_button)),
    row(created_lists, preview_lists),
    row(download_file, plot_list),
)

hkl_layout = column(
    hkl_div,
    row(hkl_normal, hkl_cut, hkl_delta, Spacer(width=10), hkl_in_plane_x, hkl_in_plane_y),
)
disting_layout = column(disting_opt_div, row(disting_opt_cb, disting_opt_rb))

# Right column: measured-data upload and the reciprocal-space plot.
column2_layout = column(
    row(measured_data_div, measured_data, plot_file),
    plot,
    row(hkl_layout, k_vectors),
    row(disting_layout, res_mult_ni),
)
tab_layout = row(column1_layout, column2_layout)

return Panel(child=tab_layout, title="ccl prepare")

View File

@ -48,8 +48,7 @@ IMAGE_PLOT_H = int(IMAGE_H * 2) + 27
def create(): def create():
doc = curdoc() doc = curdoc()
zebra_data = [] dataset = []
det_data = {}
cami_meta = {} cami_meta = {}
num_formatter = NumberFormatter(format="0.00", nan_format="") num_formatter = NumberFormatter(format="0.00", nan_format="")
@ -108,15 +107,15 @@ def create():
def _init_datatable(): def _init_datatable():
file_list = [] file_list = []
for scan in zebra_data: for scan in dataset:
file_list.append(os.path.basename(scan["original_filename"])) file_list.append(os.path.basename(scan["original_filename"]))
scan_table_source.data.update( scan_table_source.data.update(
file=file_list, file=file_list,
param=[None] * len(zebra_data), param=[None] * len(dataset),
frame=[None] * len(zebra_data), frame=[None] * len(dataset),
x_pos=[None] * len(zebra_data), x_pos=[None] * len(dataset),
y_pos=[None] * len(zebra_data), y_pos=[None] * len(dataset),
) )
scan_table_source.selected.indices = [] scan_table_source.selected.indices = []
scan_table_source.selected.indices = [0] scan_table_source.selected.indices = [0]
@ -127,7 +126,7 @@ def create():
frame = [] frame = []
x_pos = [] x_pos = []
y_pos = [] y_pos = []
for scan in zebra_data: for scan in dataset:
if "fit" in scan: if "fit" in scan:
framei = scan["fit"]["frame"] framei = scan["fit"]["frame"]
x_posi = scan["fit"]["x_pos"] x_posi = scan["fit"]["x_pos"]
@ -150,13 +149,13 @@ def create():
print("Could not read data from the file.") print("Could not read data from the file.")
return return
zebra_data.extend(new_data) dataset.extend(new_data)
_init_datatable() _init_datatable()
def file_open_button_callback(): def file_open_button_callback():
nonlocal zebra_data nonlocal dataset
zebra_data = [] dataset = []
_file_open() _file_open()
file_open_button = Button(label="Open New", width=100) file_open_button = Button(label="Open New", width=100)
@ -170,8 +169,6 @@ def create():
# Scan select # Scan select
def scan_table_select_callback(_attr, old, new): def scan_table_select_callback(_attr, old, new):
nonlocal det_data
if not new: if not new:
# skip empty selections # skip empty selections
return return
@ -186,21 +183,21 @@ def create():
# skip unnecessary update caused by selection drop # skip unnecessary update caused by selection drop
return return
det_data = zebra_data[new[0]] scan = dataset[new[0]]
zebra_mode = det_data["zebra_mode"] zebra_mode = scan["zebra_mode"]
if zebra_mode == "nb": if zebra_mode == "nb":
metadata_table_source.data.update(geom=["normal beam"]) metadata_table_source.data.update(geom=["normal beam"])
else: # zebra_mode == "bi" else: # zebra_mode == "bi"
metadata_table_source.data.update(geom=["bisecting"]) metadata_table_source.data.update(geom=["bisecting"])
if "mf" in det_data: if "mf" in scan:
metadata_table_source.data.update(mf=[det_data["mf"][0]]) metadata_table_source.data.update(mf=[scan["mf"][0]])
else: else:
metadata_table_source.data.update(mf=[None]) metadata_table_source.data.update(mf=[None])
if "temp" in det_data: if "temp" in scan:
metadata_table_source.data.update(temp=[det_data["temp"][0]]) metadata_table_source.data.update(temp=[scan["temp"][0]])
else: else:
metadata_table_source.data.update(temp=[None]) metadata_table_source.data.update(temp=[None])
@ -240,12 +237,15 @@ def create():
autosize_mode="none", autosize_mode="none",
) )
def _get_selected_scan():
return dataset[scan_table_source.selected.indices[0]]
def param_select_callback(_attr, _old, new): def param_select_callback(_attr, _old, new):
if new == "user defined": if new == "user defined":
param = [None] * len(zebra_data) param = [None] * len(dataset)
else: else:
# TODO: which value to take? # TODO: which value to take?
param = [scan[new][0] for scan in zebra_data] param = [scan[new][0] for scan in dataset]
scan_table_source.data["param"] = param scan_table_source.data["param"] = param
_update_param_plot() _update_param_plot()
@ -259,10 +259,11 @@ def create():
param_select.on_change("value", param_select_callback) param_select.on_change("value", param_select_callback)
def update_overview_plot(): def update_overview_plot():
h5_data = det_data["data"] scan = _get_selected_scan()
n_im, n_y, n_x = h5_data.shape counts = scan["counts"]
overview_x = np.mean(h5_data, axis=1) n_im, n_y, n_x = counts.shape
overview_y = np.mean(h5_data, axis=2) overview_x = np.mean(counts, axis=1)
overview_y = np.mean(counts, axis=2)
# normalize for simpler colormapping # normalize for simpler colormapping
overview_max_val = max(np.max(overview_x), np.max(overview_y)) overview_max_val = max(np.max(overview_x), np.max(overview_y))
@ -290,10 +291,10 @@ def create():
frame_range.reset_end = n_im frame_range.reset_end = n_im
frame_range.bounds = (0, n_im) frame_range.bounds = (0, n_im)
scan_motor = det_data["scan_motor"] scan_motor = scan["scan_motor"]
overview_plot_y.axis[1].axis_label = f"Scanning motor, {scan_motor}" overview_plot_y.axis[1].axis_label = f"Scanning motor, {scan_motor}"
var = det_data[scan_motor] var = scan[scan_motor]
var_start = var[0] var_start = var[0]
var_end = var[-1] + (var[-1] - var[0]) / (n_im - 1) var_end = var[-1] + (var[-1] - var[0]) / (n_im - 1)
@ -470,7 +471,7 @@ def create():
x = [] x = []
y = [] y = []
fit_param = fit_param_select.value fit_param = fit_param_select.value
for s, p in zip(zebra_data, scan_table_source.data["param"]): for s, p in zip(dataset, scan_table_source.data["param"]):
if "fit" in s and fit_param: if "fit" in s and fit_param:
x.append(p) x.append(p)
y.append(s["fit"][fit_param]) y.append(s["fit"][fit_param])
@ -498,7 +499,7 @@ def create():
fit_param_select.on_change("value", fit_param_select_callback) fit_param_select.on_change("value", fit_param_select_callback)
def proc_all_button_callback(): def proc_all_button_callback():
for scan in zebra_data: for scan in dataset:
pyzebra.fit_event( pyzebra.fit_event(
scan, scan,
int(np.floor(frame_range.start)), int(np.floor(frame_range.start)),
@ -511,7 +512,7 @@ def create():
_update_table() _update_table()
for scan in zebra_data: for scan in dataset:
if "fit" in scan: if "fit" in scan:
options = list(scan["fit"].keys()) options = list(scan["fit"].keys())
fit_param_select.options = options fit_param_select.options = options
@ -524,8 +525,9 @@ def create():
proc_all_button.on_click(proc_all_button_callback) proc_all_button.on_click(proc_all_button_callback)
def proc_button_callback(): def proc_button_callback():
scan = _get_selected_scan()
pyzebra.fit_event( pyzebra.fit_event(
det_data, scan,
int(np.floor(frame_range.start)), int(np.floor(frame_range.start)),
int(np.ceil(frame_range.end)), int(np.ceil(frame_range.end)),
int(np.floor(det_y_range.start)), int(np.floor(det_y_range.start)),
@ -536,7 +538,7 @@ def create():
_update_table() _update_table()
for scan in zebra_data: for scan in dataset:
if "fit" in scan: if "fit" in scan:
options = list(scan["fit"].keys()) options = list(scan["fit"].keys())
fit_param_select.options = options fit_param_select.options = options

View File

@ -11,6 +11,7 @@ from bokeh.models import (
BoxEditTool, BoxEditTool,
BoxZoomTool, BoxZoomTool,
Button, Button,
CellEditor,
CheckboxGroup, CheckboxGroup,
ColumnDataSource, ColumnDataSource,
DataRange1d, DataRange1d,
@ -52,7 +53,7 @@ IMAGE_PLOT_H = int(IMAGE_H * 2) + 27
def create(): def create():
doc = curdoc() doc = curdoc()
det_data = {} dataset = []
cami_meta = {} cami_meta = {}
num_formatter = NumberFormatter(format="0.00", nan_format="") num_formatter = NumberFormatter(format="0.00", nan_format="")
@ -107,15 +108,16 @@ def create():
upload_cami_button = FileInput(accept=".cami", width=200) upload_cami_button = FileInput(accept=".cami", width=200)
upload_cami_button.on_change("value", upload_cami_button_callback) upload_cami_button.on_change("value", upload_cami_button_callback)
def _file_open(file, cami_meta): def upload_hdf_button_callback(_attr, _old, new):
nonlocal det_data nonlocal dataset
try: try:
det_data = pyzebra.read_detector_data(file, cami_meta) scan = pyzebra.read_detector_data(io.BytesIO(base64.b64decode(new)), None)
except KeyError: except KeyError:
print("Could not read data from the file.") print("Could not read data from the file.")
return return
last_im_index = det_data["data"].shape[0] - 1 dataset = [scan]
last_im_index = scan["counts"].shape[0] - 1
index_spinner.value = 0 index_spinner.value = 0
index_spinner.high = last_im_index index_spinner.high = last_im_index
@ -125,39 +127,207 @@ def create():
index_slider.disabled = False index_slider.disabled = False
index_slider.end = last_im_index index_slider.end = last_im_index
zebra_mode = det_data["zebra_mode"] zebra_mode = scan["zebra_mode"]
if zebra_mode == "nb": if zebra_mode == "nb":
metadata_table_source.data.update(geom=["normal beam"]) metadata_table_source.data.update(geom=["normal beam"])
else: # zebra_mode == "bi" else: # zebra_mode == "bi"
metadata_table_source.data.update(geom=["bisecting"]) metadata_table_source.data.update(geom=["bisecting"])
update_image(0) _init_datatable()
update_overview_plot()
def upload_hdf_button_callback(_attr, _old, new):
_file_open(io.BytesIO(base64.b64decode(new)), None)
upload_hdf_div = Div(text="or upload .hdf file:", margin=(5, 5, 0, 5)) upload_hdf_div = Div(text="or upload .hdf file:", margin=(5, 5, 0, 5))
upload_hdf_button = FileInput(accept=".hdf", width=200) upload_hdf_button = FileInput(accept=".hdf", width=200)
upload_hdf_button.on_change("value", upload_hdf_button_callback) upload_hdf_button.on_change("value", upload_hdf_button_callback)
def file_open_button_callback(): def file_open_button_callback():
if not file_select.value: nonlocal dataset
return new_data = []
cm = cami_meta if data_source.value == "cami file" else None
for f_path in file_select.value:
f_name = os.path.basename(f_path)
try:
file_data = [pyzebra.read_detector_data(f_path, cm)]
except:
print(f"Error loading {f_name}")
continue
if data_source.value == "proposal number": pyzebra.normalize_dataset(file_data, monitor_spinner.value)
_file_open(file_select.value[0], None)
else: if not new_data: # first file
_file_open(file_select.value[0], cami_meta) new_data = file_data
else:
pyzebra.merge_datasets(new_data, file_data)
if new_data:
dataset = new_data
_init_datatable()
file_open_button = Button(label="Open New", width=100) file_open_button = Button(label="Open New", width=100)
file_open_button.on_click(file_open_button_callback) file_open_button.on_click(file_open_button_callback)
def update_image(index=None): def file_append_button_callback():
file_data = []
for f_path in file_select.value:
f_name = os.path.basename(f_path)
try:
file_data = [pyzebra.read_detector_data(f_path, None)]
except:
print(f"Error loading {f_name}")
continue
pyzebra.normalize_dataset(file_data, monitor_spinner.value)
pyzebra.merge_datasets(dataset, file_data)
if file_data:
_init_datatable()
file_append_button = Button(label="Append", width=100)
file_append_button.on_click(file_append_button_callback)
def _init_datatable():
scan_list = [s["idx"] for s in dataset]
export = [s["export"] for s in dataset]
twotheta = [np.median(s["twotheta"]) if "twotheta" in s else None for s in dataset]
gamma = [np.median(s["gamma"]) if "gamma" in s else None for s in dataset]
omega = [np.median(s["omega"]) if "omega" in s else None for s in dataset]
chi = [np.median(s["chi"]) if "chi" in s else None for s in dataset]
phi = [np.median(s["phi"]) if "phi" in s else None for s in dataset]
nu = [np.median(s["nu"]) if "nu" in s else None for s in dataset]
scan_table_source.data.update(
scan=scan_list,
fit=[0] * len(scan_list),
export=export,
twotheta=twotheta,
gamma=gamma,
omega=omega,
chi=chi,
phi=phi,
nu=nu,
)
scan_table_source.selected.indices = []
scan_table_source.selected.indices = [0]
merge_options = [(str(i), f"{i} ({idx})") for i, idx in enumerate(scan_list)]
merge_from_select.options = merge_options
merge_from_select.value = merge_options[0][0]
def scan_table_select_callback(_attr, old, new):
if not new:
# skip empty selections
return
# Avoid selection of multiple indicies (via Shift+Click or Ctrl+Click)
if len(new) > 1:
# drop selection to the previous one
scan_table_source.selected.indices = old
return
if len(old) > 1:
# skip unnecessary update caused by selection drop
return
scan = _get_selected_scan()
last_im_index = scan["counts"].shape[0] - 1
index_spinner.value = 0
index_spinner.high = last_im_index
if last_im_index == 0:
index_slider.disabled = True
else:
index_slider.disabled = False
index_slider.end = last_im_index
zebra_mode = scan["zebra_mode"]
if zebra_mode == "nb":
metadata_table_source.data.update(geom=["normal beam"])
else: # zebra_mode == "bi"
metadata_table_source.data.update(geom=["bisecting"])
_update_image()
_update_overview_plot()
def scan_table_source_callback(_attr, _old, new):
# unfortunately, we don't know if the change comes from data update or user input
# also `old` and `new` are the same for non-scalars
for scan, export in zip(dataset, new["export"]):
scan["export"] = export
scan_table_source = ColumnDataSource(
dict(scan=[], fit=[], export=[], twotheta=[], gamma=[], omega=[], chi=[], phi=[], nu=[],)
)
scan_table_source.on_change("data", scan_table_source_callback)
scan_table_source.selected.on_change("indices", scan_table_select_callback)
scan_table = DataTable(
source=scan_table_source,
columns=[
TableColumn(field="scan", title="Scan", editor=CellEditor(), width=50),
TableColumn(field="fit", title="Fit", editor=CellEditor(), width=50),
TableColumn(field="export", title="Export", editor=CellEditor(), width=50),
TableColumn(field="twotheta", title="2theta", editor=CellEditor(), width=50),
TableColumn(field="gamma", title="gamma", editor=CellEditor(), width=50),
TableColumn(field="omega", title="omega", editor=CellEditor(), width=50),
TableColumn(field="chi", title="chi", editor=CellEditor(), width=50),
TableColumn(field="phi", title="phi", editor=CellEditor(), width=50),
TableColumn(field="nu", title="nu", editor=CellEditor(), width=50),
],
width=310, # +60 because of the index column, but excluding twotheta onwards
height=350,
autosize_mode="none",
editable=True,
)
def _get_selected_scan():
return dataset[scan_table_source.selected.indices[0]]
def _update_table():
export = [scan["export"] for scan in dataset]
scan_table_source.data.update(export=export)
def monitor_spinner_callback(_attr, old, new):
if dataset:
pyzebra.normalize_dataset(dataset, new)
_update_image()
_update_overview_plot()
monitor_spinner = Spinner(title="Monitor:", mode="int", value=100_000, low=1, width=145)
monitor_spinner.on_change("value", monitor_spinner_callback)
merge_from_select = Select(title="scan:", width=145)
def merge_button_callback():
scan_into = _get_selected_scan()
scan_from = dataset[int(merge_from_select.value)]
if scan_into is scan_from:
print("WARNING: Selected scans for merging are identical")
return
pyzebra.merge_h5_scans(scan_into, scan_from)
_update_table()
_update_image()
_update_overview_plot()
merge_button = Button(label="Merge into current", width=145)
merge_button.on_click(merge_button_callback)
def restore_button_callback():
pyzebra.restore_scan(_get_selected_scan())
_update_table()
_update_image()
_update_overview_plot()
restore_button = Button(label="Restore scan", width=145)
restore_button.on_click(restore_button_callback)
def _update_image(index=None):
if index is None: if index is None:
index = index_spinner.value index = index_spinner.value
current_image = det_data["data"][index] scan = _get_selected_scan()
current_image = scan["counts"][index]
proj_v_line_source.data.update( proj_v_line_source.data.update(
x=np.arange(0, IMAGE_W) + 0.5, y=np.mean(current_image, axis=0) x=np.arange(0, IMAGE_W) + 0.5, y=np.mean(current_image, axis=0)
) )
@ -180,24 +350,24 @@ def create():
image_glyph.color_mapper.low = im_min image_glyph.color_mapper.low = im_min
image_glyph.color_mapper.high = im_max image_glyph.color_mapper.high = im_max
if "mf" in det_data: if "mf" in scan:
metadata_table_source.data.update(mf=[det_data["mf"][index]]) metadata_table_source.data.update(mf=[scan["mf"][index]])
else: else:
metadata_table_source.data.update(mf=[None]) metadata_table_source.data.update(mf=[None])
if "temp" in det_data: if "temp" in scan:
metadata_table_source.data.update(temp=[det_data["temp"][index]]) metadata_table_source.data.update(temp=[scan["temp"][index]])
else: else:
metadata_table_source.data.update(temp=[None]) metadata_table_source.data.update(temp=[None])
gamma, nu = calculate_pol(det_data, index) gamma, nu = calculate_pol(scan, index)
omega = np.ones((IMAGE_H, IMAGE_W)) * det_data["omega"][index] omega = np.ones((IMAGE_H, IMAGE_W)) * scan["omega"][index]
image_source.data.update(gamma=[gamma], nu=[nu], omega=[omega]) image_source.data.update(gamma=[gamma], nu=[nu], omega=[omega])
# update detector center angles # update detector center angles
det_c_x = int(IMAGE_W / 2) det_c_x = int(IMAGE_W / 2)
det_c_y = int(IMAGE_H / 2) det_c_y = int(IMAGE_H / 2)
if det_data["zebra_mode"] == "nb": if scan["zebra_mode"] == "nb":
gamma_c = gamma[det_c_y, det_c_x] gamma_c = gamma[det_c_y, det_c_x]
nu_c = nu[det_c_y, det_c_x] nu_c = nu[det_c_y, det_c_x]
omega_c = omega[det_c_y, det_c_x] omega_c = omega[det_c_y, det_c_x]
@ -205,13 +375,13 @@ def create():
phi_c = None phi_c = None
else: # zebra_mode == "bi" else: # zebra_mode == "bi"
wave = det_data["wave"] wave = scan["wave"]
ddist = det_data["ddist"] ddist = scan["ddist"]
gammad = det_data["gamma"][index] gammad = scan["gamma"][index]
om = det_data["omega"][index] om = scan["omega"][index]
ch = det_data["chi"][index] ch = scan["chi"][index]
ph = det_data["phi"][index] ph = scan["phi"][index]
nud = det_data["nu"] nud = scan["nu"]
nu_c = 0 nu_c = 0
chi_c, phi_c, gamma_c, omega_c = pyzebra.ang_proc( chi_c, phi_c, gamma_c, omega_c = pyzebra.ang_proc(
@ -222,11 +392,12 @@ def create():
gamma=[gamma_c], nu=[nu_c], omega=[omega_c], chi=[chi_c], phi=[phi_c], gamma=[gamma_c], nu=[nu_c], omega=[omega_c], chi=[chi_c], phi=[phi_c],
) )
def update_overview_plot(): def _update_overview_plot():
h5_data = det_data["data"] scan = _get_selected_scan()
n_im, n_y, n_x = h5_data.shape counts = scan["counts"]
overview_x = np.mean(h5_data, axis=1) n_im, n_y, n_x = counts.shape
overview_y = np.mean(h5_data, axis=2) overview_x = np.mean(counts, axis=1)
overview_y = np.mean(counts, axis=2)
# normalize for simpler colormapping # normalize for simpler colormapping
overview_max_val = max(np.max(overview_x), np.max(overview_y)) overview_max_val = max(np.max(overview_x), np.max(overview_y))
@ -254,10 +425,10 @@ def create():
frame_range.reset_end = n_im frame_range.reset_end = n_im
frame_range.bounds = (0, n_im) frame_range.bounds = (0, n_im)
scan_motor = det_data["scan_motor"] scan_motor = scan["scan_motor"]
overview_plot_y.axis[1].axis_label = f"Scanning motor, {scan_motor}" overview_plot_y.axis[1].axis_label = f"Scanning motor, {scan_motor}"
var = det_data[scan_motor] var = scan[scan_motor]
var_start = var[0] var_start = var[0]
var_end = var[-1] + (var[-1] - var[0]) / (n_im - 1) if n_im != 1 else var_start + 1 var_end = var[-1] + (var[-1] - var[0]) / (n_im - 1) if n_im != 1 else var_start + 1
@ -288,28 +459,10 @@ def create():
nu_range.reset_end = nu_end nu_range.reset_end = nu_end
nu_range.bounds = (min(nu_start, nu_end), max(nu_start, nu_end)) nu_range.bounds = (min(nu_start, nu_end), max(nu_start, nu_end))
def file_select_callback(_attr, old, new):
if not new:
# skip empty selections
return
# Avoid selection of multiple indicies (via Shift+Click or Ctrl+Click)
if len(new) > 1:
# drop selection to the previous one
file_select.value = old
return
if len(old) > 1:
# skip unnecessary update caused by selection drop
return
file_open_button_callback()
file_select = MultiSelect(title="Available .hdf files:", width=210, height=250) file_select = MultiSelect(title="Available .hdf files:", width=210, height=250)
file_select.on_change("value", file_select_callback)
def index_callback(_attr, _old, new): def index_callback(_attr, _old, new):
update_image(new) _update_image(new)
index_slider = Slider(value=0, start=0, end=1, show_value=False, width=400) index_slider = Slider(value=0, start=0, end=1, show_value=False, width=400)
@ -374,9 +527,10 @@ def create():
# calculate hkl-indices of first mouse entry # calculate hkl-indices of first mouse entry
def mouse_enter_callback(_event): def mouse_enter_callback(_event):
if det_data and np.array_equal(image_source.data["h"][0], np.zeros((1, 1))): if dataset and np.array_equal(image_source.data["h"][0], np.zeros((1, 1))):
scan = _get_selected_scan()
index = index_spinner.value index = index_spinner.value
h, k, l = calculate_hkl(det_data, index) h, k, l = calculate_hkl(scan, index)
image_source.data.update(h=[h], k=[k], l=[l]) image_source.data.update(h=[h], k=[k], l=[l])
plot.on_event(MouseEnter, mouse_enter_callback) plot.on_event(MouseEnter, mouse_enter_callback)
@ -438,13 +592,14 @@ def create():
def box_edit_callback(_attr, _old, new): def box_edit_callback(_attr, _old, new):
if new["x"]: if new["x"]:
h5_data = det_data["data"] scan = _get_selected_scan()
x_val = np.arange(h5_data.shape[0]) counts = scan["counts"]
x_val = np.arange(counts.shape[0])
left = int(np.floor(new["x"][0])) left = int(np.floor(new["x"][0]))
right = int(np.ceil(new["x"][0] + new["width"][0])) right = int(np.ceil(new["x"][0] + new["width"][0]))
bottom = int(np.floor(new["y"][0])) bottom = int(np.floor(new["y"][0]))
top = int(np.ceil(new["y"][0] + new["height"][0])) top = int(np.ceil(new["y"][0] + new["height"][0]))
y_val = np.sum(h5_data[:, bottom:top, left:right], axis=(1, 2)) y_val = np.sum(counts[:, bottom:top, left:right], axis=(1, 2))
else: else:
x_val = [] x_val = []
y_val = [] y_val = []
@ -600,7 +755,7 @@ def create():
display_min_spinner.disabled = False display_min_spinner.disabled = False
display_max_spinner.disabled = False display_max_spinner.disabled = False
update_image() _update_image()
main_auto_checkbox = CheckboxGroup( main_auto_checkbox = CheckboxGroup(
labels=["Frame Intensity Range"], active=[0], width=145, margin=[10, 5, 0, 5] labels=["Frame Intensity Range"], active=[0], width=145, margin=[10, 5, 0, 5]
@ -646,7 +801,7 @@ def create():
proj_display_min_spinner.disabled = False proj_display_min_spinner.disabled = False
proj_display_max_spinner.disabled = False proj_display_max_spinner.disabled = False
update_overview_plot() _update_overview_plot()
proj_auto_checkbox = CheckboxGroup( proj_auto_checkbox = CheckboxGroup(
labels=["Projections Intensity Range"], active=[0], width=145, margin=[10, 5, 0, 5] labels=["Projections Intensity Range"], active=[0], width=145, margin=[10, 5, 0, 5]
@ -738,8 +893,9 @@ def create():
) )
def add_event_button_callback(): def add_event_button_callback():
scan = _get_selected_scan()
pyzebra.fit_event( pyzebra.fit_event(
det_data, scan,
int(np.floor(frame_range.start)), int(np.floor(frame_range.start)),
int(np.ceil(frame_range.end)), int(np.ceil(frame_range.end)),
int(np.floor(det_y_range.start)), int(np.floor(det_y_range.start)),
@ -748,21 +904,21 @@ def create():
int(np.ceil(det_x_range.end)), int(np.ceil(det_x_range.end)),
) )
wave = det_data["wave"] wave = scan["wave"]
ddist = det_data["ddist"] ddist = scan["ddist"]
cell = det_data["cell"] cell = scan["cell"]
gamma = det_data["gamma"][0] gamma = scan["gamma"][0]
omega = det_data["omega"][0] omega = scan["omega"][0]
nu = det_data["nu"][0] nu = scan["nu"][0]
chi = det_data["chi"][0] chi = scan["chi"][0]
phi = det_data["phi"][0] phi = scan["phi"][0]
scan_motor = det_data["scan_motor"] scan_motor = scan["scan_motor"]
var_angle = det_data[scan_motor] var_angle = scan[scan_motor]
snr_cnts = det_data["fit"]["snr"] snr_cnts = scan["fit"]["snr"]
frC = det_data["fit"]["frame"] frC = scan["fit"]["frame"]
var_F = var_angle[int(np.floor(frC))] var_F = var_angle[int(np.floor(frC))]
var_C = var_angle[int(np.ceil(frC))] var_C = var_angle[int(np.ceil(frC))]
@ -781,11 +937,11 @@ def create():
elif scan_motor == "phi": elif scan_motor == "phi":
phi = var_p phi = var_p
intensity = det_data["fit"]["intensity"] intensity = scan["fit"]["intensity"]
x_pos = det_data["fit"]["x_pos"] x_pos = scan["fit"]["x_pos"]
y_pos = det_data["fit"]["y_pos"] y_pos = scan["fit"]["y_pos"]
if det_data["zebra_mode"] == "nb": if scan["zebra_mode"] == "nb":
chi = None chi = None
phi = None phi = None
@ -848,7 +1004,7 @@ def create():
upload_hdf_div, upload_hdf_div,
upload_hdf_button, upload_hdf_button,
file_select, file_select,
file_open_button, row(file_open_button, file_append_button),
) )
layout_image = column(gridplot([[proj_v, None], [plot, proj_h]], merge_tools=False)) layout_image = column(gridplot([[proj_v, None], [plot, proj_h]], merge_tools=False))
@ -874,31 +1030,37 @@ def create():
), ),
) )
scan_layout = column(
scan_table,
row(monitor_spinner, column(Spacer(height=19), restore_button)),
row(column(Spacer(height=19), merge_button), merge_from_select),
)
tab_layout = row( tab_layout = row(
column(import_layout, colormap_layout), column(import_layout, colormap_layout),
column(layout_overview, layout_controls), column(row(scan_layout, layout_overview), layout_controls),
column(roi_avg_plot, layout_image), column(roi_avg_plot, layout_image),
) )
return Panel(child=tab_layout, title="hdf viewer") return Panel(child=tab_layout, title="hdf viewer")
def calculate_hkl(det_data, index): def calculate_hkl(scan, index):
h = np.empty(shape=(IMAGE_H, IMAGE_W)) h = np.empty(shape=(IMAGE_H, IMAGE_W))
k = np.empty(shape=(IMAGE_H, IMAGE_W)) k = np.empty(shape=(IMAGE_H, IMAGE_W))
l = np.empty(shape=(IMAGE_H, IMAGE_W)) l = np.empty(shape=(IMAGE_H, IMAGE_W))
wave = det_data["wave"] wave = scan["wave"]
ddist = det_data["ddist"] ddist = scan["ddist"]
gammad = det_data["gamma"][index] gammad = scan["gamma"][index]
om = det_data["omega"][index] om = scan["omega"][index]
nud = det_data["nu"] nud = scan["nu"]
ub = det_data["ub"] ub = scan["ub"]
geometry = det_data["zebra_mode"] geometry = scan["zebra_mode"]
if geometry == "bi": if geometry == "bi":
chi = det_data["chi"][index] chi = scan["chi"][index]
phi = det_data["phi"][index] phi = scan["phi"][index]
elif geometry == "nb": elif geometry == "nb":
chi = 0 chi = 0
phi = 0 phi = 0
@ -914,10 +1076,10 @@ def calculate_hkl(det_data, index):
return h, k, l return h, k, l
def calculate_pol(det_data, index): def calculate_pol(scan, index):
ddist = det_data["ddist"] ddist = scan["ddist"]
gammad = det_data["gamma"][index] gammad = scan["gamma"][index]
nud = det_data["nu"] nud = scan["nu"]
yi, xi = np.ogrid[:IMAGE_H, :IMAGE_W] yi, xi = np.ogrid[:IMAGE_H, :IMAGE_W]
gamma, nu = pyzebra.det2pol(ddist, gammad, nud, xi, yi) gamma, nu = pyzebra.det2pol(ddist, gammad, nud, xi, yi)

View File

@ -83,7 +83,7 @@ def color_palette(n_colors):
def create(): def create():
doc = curdoc() doc = curdoc()
det_data = [] dataset = []
fit_params = {} fit_params = {}
js_data = ColumnDataSource(data=dict(content=[""], fname=[""], ext=[""])) js_data = ColumnDataSource(data=dict(content=[""], fname=[""], ext=[""]))
@ -111,15 +111,15 @@ def create():
proposal_textinput.on_change("name", proposal_textinput_callback) proposal_textinput.on_change("name", proposal_textinput_callback)
def _init_datatable(): def _init_datatable():
scan_list = [s["idx"] for s in det_data] scan_list = [s["idx"] for s in dataset]
export = [s["export"] for s in det_data] export = [s["export"] for s in dataset]
if param_select.value == "user defined": if param_select.value == "user defined":
param = [None] * len(det_data) param = [None] * len(dataset)
else: else:
param = [scan[param_select.value] for scan in det_data] param = [scan[param_select.value] for scan in dataset]
file_list = [] file_list = []
for scan in det_data: for scan in dataset:
file_list.append(os.path.basename(scan["original_filename"])) file_list.append(os.path.basename(scan["original_filename"]))
scan_table_source.data.update( scan_table_source.data.update(
@ -128,8 +128,8 @@ def create():
scan_table_source.selected.indices = [] scan_table_source.selected.indices = []
scan_table_source.selected.indices = [0] scan_table_source.selected.indices = [0]
scan_motor_select.options = det_data[0]["scan_motors"] scan_motor_select.options = dataset[0]["scan_motors"]
scan_motor_select.value = det_data[0]["scan_motor"] scan_motor_select.value = dataset[0]["scan_motor"]
merge_options = [(str(i), f"{i} ({idx})") for i, idx in enumerate(scan_list)] merge_options = [(str(i), f"{i} ({idx})") for i, idx in enumerate(scan_list)]
merge_from_select.options = merge_options merge_from_select.options = merge_options
@ -138,7 +138,7 @@ def create():
file_select = MultiSelect(title="Available .ccl/.dat files:", width=210, height=250) file_select = MultiSelect(title="Available .ccl/.dat files:", width=210, height=250)
def file_open_button_callback(): def file_open_button_callback():
nonlocal det_data nonlocal dataset
new_data = [] new_data = []
for f_path in file_select.value: for f_path in file_select.value:
with open(f_path) as file: with open(f_path) as file:
@ -160,7 +160,7 @@ def create():
pyzebra.merge_datasets(new_data, file_data) pyzebra.merge_datasets(new_data, file_data)
if new_data: if new_data:
det_data = new_data dataset = new_data
_init_datatable() _init_datatable()
append_upload_button.disabled = False append_upload_button.disabled = False
@ -180,7 +180,7 @@ def create():
continue continue
pyzebra.normalize_dataset(file_data, monitor_spinner.value) pyzebra.normalize_dataset(file_data, monitor_spinner.value)
pyzebra.merge_datasets(det_data, file_data) pyzebra.merge_datasets(dataset, file_data)
if file_data: if file_data:
_init_datatable() _init_datatable()
@ -189,7 +189,7 @@ def create():
file_append_button.on_click(file_append_button_callback) file_append_button.on_click(file_append_button_callback)
def upload_button_callback(_attr, _old, _new): def upload_button_callback(_attr, _old, _new):
nonlocal det_data nonlocal dataset
new_data = [] new_data = []
for f_str, f_name in zip(upload_button.value, upload_button.filename): for f_str, f_name in zip(upload_button.value, upload_button.filename):
with io.StringIO(base64.b64decode(f_str).decode()) as file: with io.StringIO(base64.b64decode(f_str).decode()) as file:
@ -210,7 +210,7 @@ def create():
pyzebra.merge_datasets(new_data, file_data) pyzebra.merge_datasets(new_data, file_data)
if new_data: if new_data:
det_data = new_data dataset = new_data
_init_datatable() _init_datatable()
append_upload_button.disabled = False append_upload_button.disabled = False
@ -232,7 +232,7 @@ def create():
continue continue
pyzebra.normalize_dataset(file_data, monitor_spinner.value) pyzebra.normalize_dataset(file_data, monitor_spinner.value)
pyzebra.merge_datasets(det_data, file_data) pyzebra.merge_datasets(dataset, file_data)
if file_data: if file_data:
_init_datatable() _init_datatable()
@ -244,8 +244,8 @@ def create():
append_upload_button.on_change("filename", append_upload_button_callback) append_upload_button.on_change("filename", append_upload_button_callback)
def monitor_spinner_callback(_attr, _old, new): def monitor_spinner_callback(_attr, _old, new):
if det_data: if dataset:
pyzebra.normalize_dataset(det_data, new) pyzebra.normalize_dataset(dataset, new)
_update_single_scan_plot() _update_single_scan_plot()
_update_overview() _update_overview()
@ -253,8 +253,8 @@ def create():
monitor_spinner.on_change("value", monitor_spinner_callback) monitor_spinner.on_change("value", monitor_spinner_callback)
def scan_motor_select_callback(_attr, _old, new): def scan_motor_select_callback(_attr, _old, new):
if det_data: if dataset:
for scan in det_data: for scan in dataset:
scan["scan_motor"] = new scan["scan_motor"] = new
_update_single_scan_plot() _update_single_scan_plot()
_update_overview() _update_overview()
@ -263,12 +263,12 @@ def create():
scan_motor_select.on_change("value", scan_motor_select_callback) scan_motor_select.on_change("value", scan_motor_select_callback)
def _update_table(): def _update_table():
fit_ok = [(1 if "fit" in scan else 0) for scan in det_data] fit_ok = [(1 if "fit" in scan else 0) for scan in dataset]
export = [scan["export"] for scan in det_data] export = [scan["export"] for scan in dataset]
if param_select.value == "user defined": if param_select.value == "user defined":
param = [None] * len(det_data) param = [None] * len(dataset)
else: else:
param = [scan[param_select.value] for scan in det_data] param = [scan[param_select.value] for scan in dataset]
scan_table_source.data.update(fit=fit_ok, export=export, param=param) scan_table_source.data.update(fit=fit_ok, export=export, param=param)
@ -322,7 +322,7 @@ def create():
par = [] par = []
for s, p in enumerate(scan_table_source.data["param"]): for s, p in enumerate(scan_table_source.data["param"]):
if p is not None: if p is not None:
scan = det_data[s] scan = dataset[s]
scan_motor = scan["scan_motor"] scan_motor = scan["scan_motor"]
xs.append(scan[scan_motor]) xs.append(scan[scan_motor])
x.extend(scan[scan_motor]) x.extend(scan[scan_motor])
@ -331,8 +331,8 @@ def create():
param.append(float(p)) param.append(float(p))
par.extend(scan["counts"]) par.extend(scan["counts"])
if det_data: if dataset:
scan_motor = det_data[0]["scan_motor"] scan_motor = dataset[0]["scan_motor"]
ov_plot.axis[0].axis_label = scan_motor ov_plot.axis[0].axis_label = scan_motor
ov_param_plot.axis[0].axis_label = scan_motor ov_param_plot.axis[0].axis_label = scan_motor
@ -371,11 +371,13 @@ def create():
y_lower = [] y_lower = []
y_upper = [] y_upper = []
fit_param = fit_param_select.value fit_param = fit_param_select.value
for s, p in zip(det_data, scan_table_source.data["param"]): for s, p in zip(dataset, scan_table_source.data["param"]):
if "fit" in s and fit_param: if "fit" in s and fit_param:
x.append(p) x.append(p)
param_fit_val = s["fit"].params[fit_param].value param_fit_val = s["fit"].params[fit_param].value
param_fit_std = s["fit"].params[fit_param].stderr param_fit_std = s["fit"].params[fit_param].stderr
if param_fit_std is None:
param_fit_std = 0
y.append(param_fit_val) y.append(param_fit_val)
y_lower.append(param_fit_val - param_fit_std) y_lower.append(param_fit_val - param_fit_std)
y_upper.append(param_fit_val + param_fit_std) y_upper.append(param_fit_val + param_fit_std)
@ -534,7 +536,7 @@ def create():
def scan_table_source_callback(_attr, _old, new): def scan_table_source_callback(_attr, _old, new):
# unfortunately, we don't know if the change comes from data update or user input # unfortunately, we don't know if the change comes from data update or user input
# also `old` and `new` are the same for non-scalars # also `old` and `new` are the same for non-scalars
for scan, export in zip(det_data, new["export"]): for scan, export in zip(dataset, new["export"]):
scan["export"] = export scan["export"] = export
_update_overview() _update_overview()
_update_param_plot() _update_param_plot()
@ -563,7 +565,7 @@ def create():
def merge_button_callback(): def merge_button_callback():
scan_into = _get_selected_scan() scan_into = _get_selected_scan()
scan_from = det_data[int(merge_from_select.value)] scan_from = dataset[int(merge_from_select.value)]
if scan_into is scan_from: if scan_into is scan_from:
print("WARNING: Selected scans for merging are identical") print("WARNING: Selected scans for merging are identical")
@ -587,7 +589,7 @@ def create():
restore_button.on_click(restore_button_callback) restore_button.on_click(restore_button_callback)
def _get_selected_scan(): def _get_selected_scan():
return det_data[scan_table_source.selected.indices[0]] return dataset[scan_table_source.selected.indices[0]]
def param_select_callback(_attr, _old, _new): def param_select_callback(_attr, _old, _new):
_update_table() _update_table()
@ -720,7 +722,7 @@ def create():
fit_output_textinput = TextAreaInput(title="Fit results:", width=750, height=200) fit_output_textinput = TextAreaInput(title="Fit results:", width=750, height=200)
def proc_all_button_callback(): def proc_all_button_callback():
for scan in det_data: for scan in dataset:
if scan["export"]: if scan["export"]:
pyzebra.fit_scan( pyzebra.fit_scan(
scan, fit_params, fit_from=fit_from_spinner.value, fit_to=fit_to_spinner.value scan, fit_params, fit_from=fit_from_spinner.value, fit_to=fit_to_spinner.value
@ -735,7 +737,7 @@ def create():
_update_overview() _update_overview()
_update_table() _update_table()
for scan in det_data: for scan in dataset:
if "fit" in scan: if "fit" in scan:
options = list(scan["fit"].params.keys()) options = list(scan["fit"].params.keys())
fit_param_select.options = options fit_param_select.options = options
@ -760,7 +762,7 @@ def create():
_update_overview() _update_overview()
_update_table() _update_table()
for scan in det_data: for scan in dataset:
if "fit" in scan: if "fit" in scan:
options = list(scan["fit"].params.keys()) options = list(scan["fit"].params.keys())
fit_param_select.options = options fit_param_select.options = options
@ -782,7 +784,7 @@ def create():
temp_file = temp_dir + "/temp" temp_file = temp_dir + "/temp"
export_data = [] export_data = []
param_data = [] param_data = []
for scan, param in zip(det_data, scan_table_source.data["param"]): for scan, param in zip(dataset, scan_table_source.data["param"]):
if scan["export"] and param: if scan["export"] and param:
export_data.append(scan) export_data.append(scan)
param_data.append(param) param_data.append(param)

View File

@ -1,5 +1,6 @@
import os import os
import re import re
from ast import literal_eval
from collections import defaultdict from collections import defaultdict
import numpy as np import numpy as np
@ -56,7 +57,7 @@ META_VARS_FLOAT = (
"s2hl", "s2hl",
) )
META_UB_MATRIX = ("ub1j", "ub2j", "ub3j") META_UB_MATRIX = ("ub1j", "ub2j", "ub3j", "UB")
CCL_FIRST_LINE = (("idx", int), ("h", float), ("k", float), ("l", float)) CCL_FIRST_LINE = (("idx", int), ("h", float), ("k", float), ("l", float))
@ -93,9 +94,9 @@ def load_1D(filepath):
""" """
with open(filepath, "r") as infile: with open(filepath, "r") as infile:
_, ext = os.path.splitext(filepath) _, ext = os.path.splitext(filepath)
det_variables = parse_1D(infile, data_type=ext) dataset = parse_1D(infile, data_type=ext)
return det_variables return dataset
def parse_1D(fileobj, data_type): def parse_1D(fileobj, data_type):
@ -108,21 +109,29 @@ def parse_1D(fileobj, data_type):
variable = variable.strip() variable = variable.strip()
value = value.strip() value = value.strip()
if variable in META_VARS_STR: try:
metadata[variable] = value if variable in META_VARS_STR:
metadata[variable] = value
elif variable in META_VARS_FLOAT: elif variable in META_VARS_FLOAT:
if variable == "2-theta": # fix that angle name not to be an expression if variable == "2-theta": # fix that angle name not to be an expression
variable = "twotheta" variable = "twotheta"
if variable in ("a", "b", "c", "alpha", "beta", "gamma"): if variable in ("a", "b", "c", "alpha", "beta", "gamma"):
variable += "_cell" variable += "_cell"
metadata[variable] = float(value) metadata[variable] = float(value)
elif variable in META_UB_MATRIX: elif variable in META_UB_MATRIX:
if "ub" not in metadata: if variable == "UB":
metadata["ub"] = np.zeros((3, 3)) metadata["ub"] = np.array(literal_eval(value)).reshape(3, 3)
row = int(variable[-2]) - 1 else:
metadata["ub"][row, :] = list(map(float, value.split())) if "ub" not in metadata:
metadata["ub"] = np.zeros((3, 3))
row = int(variable[-2]) - 1
metadata["ub"][row, :] = list(map(float, value.split()))
except Exception:
print(f"Error reading {variable} with value '{value}'")
metadata[variable] = 0
if "#data" in line: if "#data" in line:
# this is the end of metadata and the start of data section # this is the end of metadata and the start of data section
@ -133,7 +142,7 @@ def parse_1D(fileobj, data_type):
metadata["zebra_mode"] = "nb" metadata["zebra_mode"] = "nb"
# read data # read data
scan = [] dataset = []
if data_type == ".ccl": if data_type == ".ccl":
ccl_first_line = CCL_FIRST_LINE + CCL_ANGLES[metadata["zebra_mode"]] ccl_first_line = CCL_FIRST_LINE + CCL_ANGLES[metadata["zebra_mode"]]
ccl_second_line = CCL_SECOND_LINE ccl_second_line = CCL_SECOND_LINE
@ -143,47 +152,52 @@ def parse_1D(fileobj, data_type):
if not line or line.isspace(): if not line or line.isspace():
continue continue
s = {} scan = {}
s["export"] = True scan["export"] = True
# first line # first line
for param, (param_name, param_type) in zip(line.split(), ccl_first_line): for param, (param_name, param_type) in zip(line.split(), ccl_first_line):
s[param_name] = param_type(param) scan[param_name] = param_type(param)
# second line # second line
next_line = next(fileobj) next_line = next(fileobj)
for param, (param_name, param_type) in zip(next_line.split(), ccl_second_line): for param, (param_name, param_type) in zip(next_line.split(), ccl_second_line):
s[param_name] = param_type(param) scan[param_name] = param_type(param)
if s["scan_motor"] != "om": if "scan_motor" not in scan:
scan["scan_motor"] = "om"
if scan["scan_motor"] != "om":
raise Exception("Unsupported variable name in ccl file.") raise Exception("Unsupported variable name in ccl file.")
# "om" -> "omega" # "om" -> "omega"
s["scan_motor"] = "omega" scan["scan_motor"] = "omega"
s["scan_motors"] = ["omega", ] scan["scan_motors"] = ["omega", ]
# overwrite metadata, because it only refers to the scan center # overwrite metadata, because it only refers to the scan center
half_dist = (s["n_points"] - 1) / 2 * s["angle_step"] half_dist = (scan["n_points"] - 1) / 2 * scan["angle_step"]
s["omega"] = np.linspace(s["omega"] - half_dist, s["omega"] + half_dist, s["n_points"]) scan["omega"] = np.linspace(
scan["omega"] - half_dist, scan["omega"] + half_dist, scan["n_points"]
)
# subsequent lines with counts # subsequent lines with counts
counts = [] counts = []
while len(counts) < s["n_points"]: while len(counts) < scan["n_points"]:
counts.extend(map(float, next(fileobj).split())) counts.extend(map(float, next(fileobj).split()))
s["counts"] = np.array(counts) scan["counts"] = np.array(counts)
s["counts_err"] = np.sqrt(np.maximum(s["counts"], 1)) scan["counts_err"] = np.sqrt(np.maximum(scan["counts"], 1))
if s["h"].is_integer() and s["k"].is_integer() and s["l"].is_integer(): if scan["h"].is_integer() and scan["k"].is_integer() and scan["l"].is_integer():
s["h"], s["k"], s["l"] = map(int, (s["h"], s["k"], s["l"])) scan["h"], scan["k"], scan["l"] = map(int, (scan["h"], scan["k"], scan["l"]))
scan.append({**metadata, **s}) dataset.append({**metadata, **scan})
elif data_type == ".dat": elif data_type == ".dat":
# TODO: this might need to be adapted in the future, when "gamma" will be added to dat files # TODO: this might need to be adapted in the future, when "gamma" will be added to dat files
if metadata["zebra_mode"] == "nb": if metadata["zebra_mode"] == "nb":
metadata["gamma"] = metadata["twotheta"] metadata["gamma"] = metadata["twotheta"]
s = defaultdict(list) scan = defaultdict(list)
s["export"] = True scan["export"] = True
match = re.search("Scanning Variables: (.*), Steps: (.*)", next(fileobj)) match = re.search("Scanning Variables: (.*), Steps: (.*)", next(fileobj))
motors = [motor.lower() for motor in match.group(1).split(", ")] motors = [motor.lower() for motor in match.group(1).split(", ")]
@ -192,8 +206,8 @@ def parse_1D(fileobj, data_type):
match = re.search("(.*) Points, Mode: (.*), Preset (.*)", next(fileobj)) match = re.search("(.*) Points, Mode: (.*), Preset (.*)", next(fileobj))
if match.group(2) != "Monitor": if match.group(2) != "Monitor":
raise Exception("Unknown mode in dat file.") raise Exception("Unknown mode in dat file.")
s["n_points"] = int(match.group(1)) scan["n_points"] = int(match.group(1))
s["monitor"] = float(match.group(3)) scan["monitor"] = float(match.group(3))
col_names = list(map(str.lower, next(fileobj).split())) col_names = list(map(str.lower, next(fileobj).split()))
@ -203,56 +217,56 @@ def parse_1D(fileobj, data_type):
break break
for name, val in zip(col_names, line.split()): for name, val in zip(col_names, line.split()):
s[name].append(float(val)) scan[name].append(float(val))
for name in col_names: for name in col_names:
s[name] = np.array(s[name]) scan[name] = np.array(scan[name])
s["counts_err"] = np.sqrt(np.maximum(s["counts"], 1)) scan["counts_err"] = np.sqrt(np.maximum(scan["counts"], 1))
s["scan_motors"] = [] scan["scan_motors"] = []
for motor, step in zip(motors, steps): for motor, step in zip(motors, steps):
if step == 0: if step == 0:
# it's not a scan motor, so keep only the median value # it's not a scan motor, so keep only the median value
s[motor] = np.median(s[motor]) scan[motor] = np.median(scan[motor])
else: else:
s["scan_motors"].append(motor) scan["scan_motors"].append(motor)
# "om" -> "omega" # "om" -> "omega"
if "om" in s["scan_motors"]: if "om" in scan["scan_motors"]:
s["scan_motors"][s["scan_motors"].index("om")] = "omega" scan["scan_motors"][scan["scan_motors"].index("om")] = "omega"
s["omega"] = s["om"] scan["omega"] = scan["om"]
del s["om"] del scan["om"]
# "tt" -> "temp" # "tt" -> "temp"
if "tt" in s["scan_motors"]: if "tt" in scan["scan_motors"]:
s["scan_motors"][s["scan_motors"].index("tt")] = "temp" scan["scan_motors"][scan["scan_motors"].index("tt")] = "temp"
s["temp"] = s["tt"] scan["temp"] = scan["tt"]
del s["tt"] del scan["tt"]
# "mf" stays "mf" # "mf" stays "mf"
# "phi" stays "phi" # "phi" stays "phi"
s["scan_motor"] = s["scan_motors"][0] scan["scan_motor"] = scan["scan_motors"][0]
if "h" not in s: if "h" not in scan:
s["h"] = s["k"] = s["l"] = float("nan") scan["h"] = scan["k"] = scan["l"] = float("nan")
for param in ("mf", "temp"): for param in ("mf", "temp"):
if param not in metadata: if param not in metadata:
s[param] = 0 scan[param] = 0
s["idx"] = 1 scan["idx"] = 1
scan.append({**metadata, **s}) dataset.append({**metadata, **scan})
else: else:
print("Unknown file extention") print("Unknown file extention")
return scan return dataset
def export_1D(data, path, export_target, hkl_precision=2): def export_1D(dataset, path, export_target, hkl_precision=2):
"""Exports data in the .comm/.incomm format for fullprof or .col/.incol format for jana. """Exports data in the .comm/.incomm format for fullprof or .col/.incol format for jana.
Scans with integer/real hkl values are saved in .comm/.incomm or .col/.incol files Scans with integer/real hkl values are saved in .comm/.incomm or .col/.incol files
@ -262,11 +276,11 @@ def export_1D(data, path, export_target, hkl_precision=2):
if export_target not in EXPORT_TARGETS: if export_target not in EXPORT_TARGETS:
raise ValueError(f"Unknown export target: {export_target}.") raise ValueError(f"Unknown export target: {export_target}.")
zebra_mode = data[0]["zebra_mode"] zebra_mode = dataset[0]["zebra_mode"]
exts = EXPORT_TARGETS[export_target] exts = EXPORT_TARGETS[export_target]
file_content = {ext: [] for ext in exts} file_content = {ext: [] for ext in exts}
for scan in data: for scan in dataset:
if "fit" not in scan: if "fit" not in scan:
continue continue
@ -306,7 +320,7 @@ def export_1D(data, path, export_target, hkl_precision=2):
out_file.writelines(content) out_file.writelines(content)
def export_ccl_compare(data1, data2, path, export_target, hkl_precision=2): def export_ccl_compare(dataset1, dataset2, path, export_target, hkl_precision=2):
"""Exports compare data in the .comm/.incomm format for fullprof or .col/.incol format for jana. """Exports compare data in the .comm/.incomm format for fullprof or .col/.incol format for jana.
Scans with integer/real hkl values are saved in .comm/.incomm or .col/.incol files Scans with integer/real hkl values are saved in .comm/.incomm or .col/.incol files
@ -316,11 +330,11 @@ def export_ccl_compare(data1, data2, path, export_target, hkl_precision=2):
if export_target not in EXPORT_TARGETS: if export_target not in EXPORT_TARGETS:
raise ValueError(f"Unknown export target: {export_target}.") raise ValueError(f"Unknown export target: {export_target}.")
zebra_mode = data1[0]["zebra_mode"] zebra_mode = dataset1[0]["zebra_mode"]
exts = EXPORT_TARGETS[export_target] exts = EXPORT_TARGETS[export_target]
file_content = {ext: [] for ext in exts} file_content = {ext: [] for ext in exts}
for scan1, scan2 in zip(data1, data2): for scan1, scan2 in zip(dataset1, dataset2):
if "fit" not in scan1: if "fit" not in scan1:
continue continue
@ -363,9 +377,9 @@ def export_ccl_compare(data1, data2, path, export_target, hkl_precision=2):
out_file.writelines(content) out_file.writelines(content)
def export_param_study(data, param_data, path): def export_param_study(dataset, param_data, path):
file_content = [] file_content = []
for scan, param in zip(data, param_data): for scan, param in zip(dataset, param_data):
if "fit" not in scan: if "fit" not in scan:
continue continue
@ -380,7 +394,11 @@ def export_param_study(data, param_data, path):
fit_str = "" fit_str = ""
for fit_param in scan["fit"].params.values(): for fit_param in scan["fit"].params.values():
fit_str = fit_str + f"{fit_param.value:<20.2f}" + f"{fit_param.stderr:<20.2f}" fit_param_val = fit_param.value
fit_param_std = fit_param.stderr
if fit_param_std is None:
fit_param_std = 0
fit_str = fit_str + f"{fit_param_val:<20.2f}" + f"{fit_param_std:<20.2f}"
_, fname_str = os.path.split(scan["original_filename"]) _, fname_str = os.path.split(scan["original_filename"])

View File

@ -1,7 +1,7 @@
import os import os
import numpy as np import numpy as np
from lmfit.models import Gaussian2dModel, GaussianModel, LinearModel, PseudoVoigtModel, VoigtModel from lmfit.models import GaussianModel, LinearModel, PseudoVoigtModel, VoigtModel
from scipy.integrate import simpson, trapezoid from scipy.integrate import simpson, trapezoid
from .ccl_io import CCL_ANGLES from .ccl_io import CCL_ANGLES
@ -22,6 +22,8 @@ MAX_RANGE_GAP = {
"omega": 0.5, "omega": 0.5,
} }
MOTOR_POS_PRECISION = 0.01
AREA_METHODS = ("fit_area", "int_area") AREA_METHODS = ("fit_area", "int_area")
@ -47,21 +49,29 @@ def _parameters_match(scan1, scan2):
if zebra_mode != scan2["zebra_mode"]: if zebra_mode != scan2["zebra_mode"]:
return False return False
for param in ("ub", "temp", "mf", *(vars[0] for vars in CCL_ANGLES[zebra_mode])): for param in ("ub", *(vars[0] for vars in CCL_ANGLES[zebra_mode])):
if param.startswith("skip"): if param.startswith("skip"):
# ignore skip parameters, like the last angle in 'nb' zebra mode # ignore skip parameters, like the last angle in 'nb' zebra mode
continue continue
if param == scan1["scan_motor"] == scan2["scan_motor"]: if param == scan1["scan_motor"] == scan2["scan_motor"]:
# check if ranges of variable parameter overlap # check if ranges of variable parameter overlap
range1 = scan1[param] r1_start, r1_end = scan1[param][0], scan1[param][-1]
range2 = scan2[param] r2_start, r2_end = scan2[param][0], scan2[param][-1]
# support reversed ranges
if r1_start > r1_end:
r1_start, r1_end = r1_end, r1_start
if r2_start > r2_end:
r2_start, r2_end = r2_end, r2_start
# maximum gap between ranges of the scanning parameter (default 0) # maximum gap between ranges of the scanning parameter (default 0)
max_range_gap = MAX_RANGE_GAP.get(param, 0) max_range_gap = MAX_RANGE_GAP.get(param, 0)
if max(range1[0] - range2[-1], range2[0] - range1[-1]) > max_range_gap: if max(r1_start - r2_end, r2_start - r1_end) > max_range_gap:
return False return False
elif np.max(np.abs(scan1[param] - scan2[param])) > PARAM_PRECISIONS[param]: elif (
np.max(np.abs(np.median(scan1[param]) - np.median(scan2[param])))
> PARAM_PRECISIONS[param]
):
return False return False
return True return True
@ -78,7 +88,10 @@ def merge_datasets(dataset_into, dataset_from):
for scan_into in dataset_into: for scan_into in dataset_into:
for ind, scan_from in enumerate(dataset_from): for ind, scan_from in enumerate(dataset_from):
if _parameters_match(scan_into, scan_from) and not merged[ind]: if _parameters_match(scan_into, scan_from) and not merged[ind]:
merge_scans(scan_into, scan_from) if scan_into["counts"].ndim == 3:
merge_h5_scans(scan_into, scan_from)
else: # scan_into["counts"].ndim == 1
merge_scans(scan_into, scan_from)
merged[ind] = True merged[ind] = True
for scan_from in dataset_from: for scan_from in dataset_from:
@ -117,7 +130,7 @@ def merge_scans(scan_into, scan_from):
err_tmp = err_all[:1] err_tmp = err_all[:1]
num_tmp = np.array([1]) num_tmp = np.array([1])
for pos, val, err in zip(pos_all[1:], val_all[1:], err_all[1:]): for pos, val, err in zip(pos_all[1:], val_all[1:], err_all[1:]):
if pos - pos_tmp[-1] < 0.0005: if pos - pos_tmp[-1] < MOTOR_POS_PRECISION:
# the repeated motor position # the repeated motor position
val_tmp[-1] += val val_tmp[-1] += val
err_tmp[-1] += err err_tmp[-1] += err
@ -140,6 +153,70 @@ def merge_scans(scan_into, scan_from):
print(f'Merging scans: {scan_into["idx"]} ({fname1}) <-- {scan_from["idx"]} ({fname2})') print(f'Merging scans: {scan_into["idx"]} ({fname1}) <-- {scan_from["idx"]} ({fname2})')
def merge_h5_scans(scan_into, scan_from):
    """Merge an area-detector (h5) scan into another scan with matching parameters.

    `scan_into["counts"]` is a 3-D array (frames, y, x) — the 3-D case is selected
    by the caller `merge_datasets`. Counts at (nearly) repeated scan-motor
    positions are summed and later averaged; errors are combined in quadrature.
    `scan_into` is modified in place and `scan_from` is flagged as non-exportable.
    Merging the same scan object twice is a no-op.
    """
    # keep a pristine copy of the original data so every merge starts from
    # the unmerged state instead of compounding earlier averaging
    if "init_scan" not in scan_into:
        scan_into["init_scan"] = scan_into.copy()

    if "merged_scans" not in scan_into:
        scan_into["merged_scans"] = []

    # identity check guards against merging the same scan object twice
    for scan in scan_into["merged_scans"]:
        if scan_from is scan:
            print("Already merged scan")
            return

    scan_into["merged_scans"].append(scan_from)

    scan_motor = scan_into["scan_motor"]  # the same as scan_from["scan_motor"]

    # collect positions, counts and squared errors from the initial scan and
    # all scans merged so far (squared, so errors add in quadrature)
    pos_all = [scan_into["init_scan"][scan_motor]]
    val_all = [scan_into["init_scan"]["counts"]]
    err_all = [scan_into["init_scan"]["counts_err"] ** 2]
    for scan in scan_into["merged_scans"]:
        pos_all.append(scan[scan_motor])
        val_all.append(scan["counts"])
        err_all.append(scan["counts_err"] ** 2)

    pos_all = np.concatenate(pos_all)
    val_all = np.concatenate(val_all)
    err_all = np.concatenate(err_all)

    # sort all frames by motor position so near-duplicates become adjacent
    sort_index = np.argsort(pos_all)
    pos_all = pos_all[sort_index]
    val_all = val_all[sort_index]
    err_all = err_all[sort_index]

    # accumulate frames whose positions differ by less than MOTOR_POS_PRECISION;
    # num_tmp counts how many frames were summed at each unique position
    pos_tmp = [pos_all[0]]
    val_tmp = [val_all[:1]]
    err_tmp = [err_all[:1]]
    num_tmp = [1]
    for pos, val, err in zip(pos_all[1:], val_all[1:], err_all[1:]):
        if pos - pos_tmp[-1] < MOTOR_POS_PRECISION:
            # the repeated motor position
            val_tmp[-1] += val
            err_tmp[-1] += err
            num_tmp[-1] += 1
        else:
            # a new motor position
            pos_tmp.append(pos)
            val_tmp.append(val[None, :])
            err_tmp.append(err[None, :])
            num_tmp.append(1)

    pos_tmp = np.array(pos_tmp)
    val_tmp = np.concatenate(val_tmp)
    err_tmp = np.concatenate(err_tmp)
    num_tmp = np.array(num_tmp)

    # average summed counts per position; sqrt converts summed squared errors
    # back to standard errors before averaging
    scan_into[scan_motor] = pos_tmp
    scan_into["counts"] = val_tmp / num_tmp[:, None, None]
    scan_into["counts_err"] = np.sqrt(err_tmp) / num_tmp[:, None, None]

    scan_from["export"] = False

    fname1 = os.path.basename(scan_into["original_filename"])
    fname2 = os.path.basename(scan_from["original_filename"])
    print(f'Merging scans: {scan_into["idx"]} ({fname1}) <-- {scan_from["idx"]} ({fname2})')
def restore_scan(scan): def restore_scan(scan):
if "merged_scans" in scan: if "merged_scans" in scan:
for merged_scan in scan["merged_scans"]: for merged_scan in scan["merged_scans"]:
@ -259,31 +336,3 @@ def get_area(scan, area_method, lorentz):
area_s = np.abs(area_s * corr_factor) area_s = np.abs(area_s * corr_factor)
scan["area"] = (area_v, area_s) scan["area"] = (area_v, area_s)
def fit_event(scan, fr_from, fr_to, y_from, y_to, x_from, x_to):
data_roi = scan["data"][fr_from:fr_to, y_from:y_to, x_from:x_to]
model = GaussianModel()
fr = np.arange(fr_from, fr_to)
counts_per_fr = np.sum(data_roi, axis=(1, 2))
params = model.guess(counts_per_fr, fr)
result = model.fit(counts_per_fr, x=fr, params=params)
frC = result.params["center"].value
intensity = result.params["height"].value
counts_std = counts_per_fr.std()
counts_mean = counts_per_fr.mean()
snr = 0 if counts_std == 0 else counts_mean / counts_std
model = Gaussian2dModel()
xs, ys = np.meshgrid(np.arange(x_from, x_to), np.arange(y_from, y_to))
xs = xs.flatten()
ys = ys.flatten()
counts = np.sum(data_roi, axis=0).flatten()
params = model.guess(counts, xs, ys)
result = model.fit(counts, x=xs, y=ys, params=params)
xC = result.params["centerx"].value
yC = result.params["centery"].value
scan["fit"] = {"frame": frC, "x_pos": xC, "y_pos": yC, "intensity": intensity, "snr": snr}

View File

@ -1,10 +1,10 @@
import h5py import h5py
import numpy as np import numpy as np
from lmfit.models import Gaussian2dModel, GaussianModel
META_MATRIX = ("UB", )
META_MATRIX = ("UB") META_CELL = ("cell", )
META_CELL = ("cell") META_STR = ("name", )
META_STR = ("name")
def read_h5meta(filepath): def read_h5meta(filepath):
"""Open and parse content of a h5meta file. """Open and parse content of a h5meta file.
@ -68,75 +68,127 @@ def read_detector_data(filepath, cami_meta=None):
ndarray: A 3D array of data, omega, gamma, nu. ndarray: A 3D array of data, omega, gamma, nu.
""" """
with h5py.File(filepath, "r") as h5f: with h5py.File(filepath, "r") as h5f:
data = h5f["/entry1/area_detector2/data"][:] counts = h5f["/entry1/area_detector2/data"][:].astype(np.float64)
# reshape data to a correct shape (2006 issue) n, cols, rows = counts.shape
n, cols, rows = data.shape if "/entry1/experiment_identifier" in h5f: # old format
data = data.reshape(n, rows, cols) # reshape images (counts) to a correct shape (2006 issue)
counts = counts.reshape(n, rows, cols)
else:
counts = counts.swapaxes(1, 2)
det_data = {"data": data} scan = {"counts": counts, "counts_err": np.sqrt(np.maximum(counts, 1))}
det_data["original_filename"] = filepath scan["original_filename"] = filepath
scan["export"] = True
if "/entry1/zebra_mode" in h5f: if "/entry1/zebra_mode" in h5f:
det_data["zebra_mode"] = h5f["/entry1/zebra_mode"][0].decode() scan["zebra_mode"] = h5f["/entry1/zebra_mode"][0].decode()
else: else:
det_data["zebra_mode"] = "nb" scan["zebra_mode"] = "nb"
# overwrite zebra_mode from cami # overwrite zebra_mode from cami
if cami_meta is not None: if cami_meta is not None:
if "zebra_mode" in cami_meta: if "zebra_mode" in cami_meta:
det_data["zebra_mode"] = cami_meta["zebra_mode"][0] scan["zebra_mode"] = cami_meta["zebra_mode"][0]
# om, sometimes ph if "/entry1/control/Monitor" in h5f:
if det_data["zebra_mode"] == "nb": scan["monitor"] = h5f["/entry1/control/Monitor"][0]
det_data["omega"] = h5f["/entry1/area_detector2/rotation_angle"][:] else: # old path
else: # bi scan["monitor"] = h5f["/entry1/control/data"][0]
det_data["omega"] = h5f["/entry1/sample/rotation_angle"][:]
det_data["gamma"] = h5f["/entry1/ZEBRA/area_detector2/polar_angle"][:] # gammad scan["idx"] = 1
det_data["nu"] = h5f["/entry1/ZEBRA/area_detector2/tilt_angle"][:] # nud
det_data["ddist"] = h5f["/entry1/ZEBRA/area_detector2/distance"][:] if "/entry1/sample/rotation_angle" in h5f:
det_data["wave"] = h5f["/entry1/ZEBRA/monochromator/wavelength"][:] scan["omega"] = h5f["/entry1/sample/rotation_angle"][:]
det_data["chi"] = h5f["/entry1/sample/chi"][:] # ch else:
det_data["phi"] = h5f["/entry1/sample/phi"][:] # ph scan["omega"] = h5f["/entry1/area_detector2/rotation_angle"][:]
det_data["ub"] = h5f["/entry1/sample/UB"][:].reshape(3, 3) if len(scan["omega"]) == 1:
det_data["name"] = h5f["/entry1/sample/name"][0].decode() scan["omega"] = np.ones(n) * scan["omega"]
det_data["cell"] = h5f["/entry1/sample/cell"][:]
scan["gamma"] = h5f["/entry1/ZEBRA/area_detector2/polar_angle"][:]
scan["twotheta"] = h5f["/entry1/ZEBRA/area_detector2/polar_angle"][:]
if len(scan["gamma"]) == 1:
scan["gamma"] = np.ones(n) * scan["gamma"]
scan["twotheta"] = np.ones(n) * scan["twotheta"]
scan["nu"] = h5f["/entry1/ZEBRA/area_detector2/tilt_angle"][:1]
scan["ddist"] = h5f["/entry1/ZEBRA/area_detector2/distance"][:1]
scan["wave"] = h5f["/entry1/ZEBRA/monochromator/wavelength"][:1]
scan["chi"] = h5f["/entry1/sample/chi"][:]
if len(scan["chi"]) == 1:
scan["chi"] = np.ones(n) * scan["chi"]
scan["phi"] = h5f["/entry1/sample/phi"][:]
if len(scan["phi"]) == 1:
scan["phi"] = np.ones(n) * scan["phi"]
scan["ub"] = h5f["/entry1/sample/UB"][:].reshape(3, 3)
scan["name"] = h5f["/entry1/sample/name"][0].decode()
scan["cell"] = h5f["/entry1/sample/cell"][:]
if n == 1: if n == 1:
# a default motor for a single frame file # a default motor for a single frame file
det_data["scan_motor"] = "omega" scan["scan_motor"] = "omega"
else: else:
for var in ("omega", "gamma", "nu", "chi", "phi"): for var in ("omega", "gamma", "nu", "chi", "phi"):
if abs(det_data[var][0] - det_data[var][-1]) > 0.1: if abs(scan[var][0] - scan[var][-1]) > 0.1:
det_data["scan_motor"] = var scan["scan_motor"] = var
break break
else: else:
raise ValueError("No angles that vary") raise ValueError("No angles that vary")
scan["scan_motors"] = [scan["scan_motor"], ]
# optional parameters # optional parameters
if "/entry1/sample/magnetic_field" in h5f: if "/entry1/sample/magnetic_field" in h5f:
det_data["mf"] = h5f["/entry1/sample/magnetic_field"][:] scan["mf"] = h5f["/entry1/sample/magnetic_field"][:]
if "/entry1/sample/temperature" in h5f: if "/entry1/sample/temperature" in h5f:
det_data["temp"] = h5f["/entry1/sample/temperature"][:] scan["temp"] = h5f["/entry1/sample/temperature"][:]
elif "/entry1/sample/Ts/value" in h5f:
scan["temp"] = h5f["/entry1/sample/Ts/value"][:]
# overwrite metadata from .cami # overwrite metadata from .cami
if cami_meta is not None: if cami_meta is not None:
if "crystal" in cami_meta: if "crystal" in cami_meta:
cami_meta_crystal = cami_meta["crystal"] cami_meta_crystal = cami_meta["crystal"]
if "name" in cami_meta_crystal: if "name" in cami_meta_crystal:
det_data["name"] = cami_meta_crystal["name"] scan["name"] = cami_meta_crystal["name"]
if "UB" in cami_meta_crystal: if "UB" in cami_meta_crystal:
det_data["ub"] = cami_meta_crystal["UB"] scan["ub"] = cami_meta_crystal["UB"]
if "cell" in cami_meta_crystal: if "cell" in cami_meta_crystal:
det_data["cell"] = cami_meta_crystal["cell"] scan["cell"] = cami_meta_crystal["cell"]
if "lambda" in cami_meta_crystal: if "lambda" in cami_meta_crystal:
det_data["wave"] = cami_meta_crystal["lambda"] scan["wave"] = cami_meta_crystal["lambda"]
if "detector parameters" in cami_meta: if "detector parameters" in cami_meta:
cami_meta_detparam = cami_meta["detector parameters"] cami_meta_detparam = cami_meta["detector parameters"]
if "dist1" in cami_meta_detparam: if "dist2" in cami_meta_detparam:
det_data["ddist"] = cami_meta_detparam["dist1"] scan["ddist"] = cami_meta_detparam["dist2"]
return det_data return scan
def fit_event(scan, fr_from, fr_to, y_from, y_to, x_from, x_to):
    """Fit a single detector event inside a region of interest.

    A 1D Gaussian over the per-frame summed counts gives the frame center and
    peak intensity; a 2D Gaussian over the frame-summed image gives the x/y
    center. The result is stored in ``scan["fit"]`` as a dict with keys
    "frame", "x_pos", "y_pos", "intensity" and "snr".
    """
    roi = scan["counts"][fr_from:fr_to, y_from:y_to, x_from:x_to]

    # frame axis: 1D Gaussian on counts summed over the image plane
    frames = np.arange(fr_from, fr_to)
    frame_counts = np.sum(roi, axis=(1, 2))
    gauss1d = GaussianModel()
    res_frame = gauss1d.fit(frame_counts, x=frames, params=gauss1d.guess(frame_counts, frames))

    # signal-to-noise estimate from the per-frame count statistics
    std = frame_counts.std()
    snr = 0 if std == 0 else frame_counts.mean() / std

    # image plane: 2D Gaussian on counts summed over frames
    gauss2d = Gaussian2dModel()
    grid_x, grid_y = np.meshgrid(np.arange(x_from, x_to), np.arange(y_from, y_to))
    grid_x = grid_x.flatten()
    grid_y = grid_y.flatten()
    image_counts = np.sum(roi, axis=0).flatten()
    res_img = gauss2d.fit(
        image_counts, x=grid_x, y=grid_y, params=gauss2d.guess(image_counts, grid_x, grid_y)
    )

    scan["fit"] = {
        "frame": res_frame.params["center"].value,
        "x_pos": res_img.params["centerx"].value,
        "y_pos": res_img.params["centery"].value,
        "intensity": res_frame.params["height"].value,
        "snr": snr,
    }

483
pyzebra/sxtal_refgen.py Normal file
View File

@ -0,0 +1,483 @@
import io
import os
import subprocess
import tempfile
from math import ceil, floor
import numpy as np
# Path of the external Sxtal_Refgen binary used by calc_ub_matrix
SXTAL_REFGEN_PATH = "/afs/psi.ch/project/sinq/rhel7/bin/Sxtal_Refgen"

# Default .geom template for the bisecting ("bi") zebra geometry (GEOM 2)
_zebraBI_default_geom = """GEOM 2 Bissecting - HiCHI
BLFR z-up
DIST_UNITS mm
ANGL_UNITS deg
DET_TYPE Point ipsd 1
DIST_DET 488
DIM_XY 1.0 1.0 1 1
GAPS_DET 0 0
SETTING 1 0 0 0 1 0 0 0 1
NUM_ANG 4
ANG_LIMITS Min Max Offset
Gamma 0.0 128.0 0.00
Omega 0.0 64.0 0.00
Chi 80.0 211.0 0.00
Phi 0.0 360.0 0.00
DET_OFF 0 0 0
"""

# Default .geom template for the normal-beam ("nb") zebra geometry (GEOM 3)
_zebraNB_default_geom = """GEOM 3 Normal Beam
BLFR z-up
DIST_UNITS mm
ANGL_UNITS deg
DET_TYPE Point ipsd 1
DIST_DET 448
DIM_XY 1.0 1.0 1 1
GAPS_DET 0 0
SETTING 1 0 0 0 1 0 0 0 1
NUM_ANG 3
ANG_LIMITS Min Max Offset
Gamma 0.0 128.0 0.00
Omega -180.0 180.0 0.00
Nu -15.0 15.0 0.00
DET_OFF 0 0 0
"""

# Default .cfl template used by export_cfl_file when no template is provided
_zebra_default_cfl = """TITLE mymaterial
SPGR P 63 2 2
CELL 5.73 5.73 11.89 90 90 120
WAVE 1.383
UBMAT
0.000000 0.000000 0.084104
0.000000 0.174520 -0.000000
0.201518 0.100759 0.000000
INSTR zebra.geom
ORDER 1 2 3
ANGOR gamma
HLIM -25 25 -25 25 -25 25
SRANG 0.0 0.7
Mag_Structure
lattiCE P 1
kvect 0.0 0.0 0.0
magcent
symm x,y,z
msym u,v,w, 0.0
End_Mag_Structure
"""
def get_zebraBI_default_geom_file():
    """Return the default bisecting-geometry .geom template as a file-like object."""
    return io.StringIO(_zebraBI_default_geom)
def get_zebraNB_default_geom_file():
    """Return the default normal-beam .geom template as a file-like object."""
    return io.StringIO(_zebraNB_default_geom)
def get_zebra_default_cfl_file():
    """Return the default .cfl template as a file-like object."""
    return io.StringIO(_zebra_default_cfl)
def read_geom_file(fileobj):
    """Parse a Sxtal_Refgen .geom file.

    Returns a dict with key "geom" ("bi" for GEOM 2, otherwise "nb") plus one
    entry per row of the ANG_LIMITS table, mapping the lower-cased angle name
    to [min, max, offset] as strings. A "2theta" angle is stored under "gamma".
    """
    limits = {}
    for raw in fileobj:
        raw = raw.split("!", 1)[0]  # drop comments that start with '!'
        if raw.startswith("GEOM"):
            _, value = raw.split(maxsplit=1)
            limits["geom"] = "bi" if value.startswith("2") else "nb"
        elif raw.startswith("ANG_LIMITS"):
            # the angular-limits table follows until a blank line (or EOF)
            for table_line in fileobj:
                if not table_line or table_line.isspace():
                    break
                name, lo, hi, offset = table_line.split()
                limits[name.lower()] = [lo, hi, offset]

    if "2theta" in limits:  # treat 2theta as gamma
        limits["gamma"] = limits.pop("2theta")

    return limits
def export_geom_file(path, ang_lims, template=None):
    """Write a Sxtal_Refgen .geom file with angular limits substituted in.

    Args:
        path: Output file path.
        ang_lims: Dict as produced by `read_geom_file`: key "geom" selects the
            default template ("bi" or "nb"), and lower-cased angle names map
            to [min, max, offset] values.
        template: Optional file-like template; when given it overrides the
            default template, but "geom" still selects the number of table
            rows to rewrite (4 for "bi", 3 for "nb").
    """
    if ang_lims["geom"] == "bi":
        template_file = get_zebraBI_default_geom_file()
        n_ang = 4
    else:  # ang_lims["geom"] == "nb"
        template_file = get_zebraNB_default_geom_file()
        n_ang = 3

    if template is not None:
        template_file = template

    with open(path, "w") as out_file:
        for line in template_file:
            out_file.write(line)

            if line.startswith("ANG_LIMITS"):
                # rewrite the next n_ang table rows with values from ang_lims
                for _ in range(n_ang):
                    next_line = next(template_file)
                    ang, _, _, _ = next_line.split()

                    if ang == "2theta":  # treat 2theta as gamma
                        ang = "Gamma"

                    vals = ang_lims[ang.lower()]
                    out_file.write(f"{'':<8}{ang:<10}{vals[0]:<10}{vals[1]:<10}{vals[2]:<10}\n")
def calc_ub_matrix(params):
    """Calculate a UB matrix by running the external Sxtal_Refgen program.

    Args:
        params: Mapping of cfl keyword -> value; written line-by-line into a
            temporary .cfl input file for Sxtal_Refgen.

    Returns:
        list: The 9 UB matrix entries (as strings), read row-by-row from the
        .sfa file that Sxtal_Refgen generates next to the input.

    Raises:
        subprocess.CalledProcessError: If Sxtal_Refgen exits with an error
            (check=True).
    """
    with tempfile.TemporaryDirectory() as temp_dir:
        cfl_file = os.path.join(temp_dir, "ub_matrix.cfl")

        with open(cfl_file, "w") as fileobj:
            for key, value in params.items():
                fileobj.write(f"{key} {value}\n")

        comp_proc = subprocess.run(
            [SXTAL_REFGEN_PATH, cfl_file],
            cwd=temp_dir,
            check=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            text=True,
        )
        # echo the command and its combined stdout/stderr for traceability
        print(" ".join(comp_proc.args))
        print(comp_proc.stdout)

        sfa_file = os.path.join(temp_dir, "ub_matrix.sfa")
        ub_matrix = []
        with open(sfa_file, "r") as fileobj:
            for line in fileobj:
                if "BL_M" in line:  # next 3 lines contain the matrix
                    for _ in range(3):
                        next_line = next(fileobj)
                        # keep the first 3 whitespace-separated values per row
                        *vals, _ = next_line.split(maxsplit=3)
                        ub_matrix.extend(vals)

        return ub_matrix
def read_cfl_file(fileobj):
    """Parse a .cfl file into a dict of known parameters.

    Unrecognized lines are ignored and parameters absent from the file stay
    None. The UBMAT value is returned as a flat list of 9 string entries (3
    per row); every other value is the raw string following the keyword.
    """
    result = dict.fromkeys(("SPGR", "CELL", "WAVE", "UBMAT", "HLIM", "SRANG", "lattiCE", "kvect"))
    keywords = tuple(result)

    for raw in fileobj:
        stripped = raw.strip()
        stripped = stripped.split("!", 1)[0]  # drop comments that start with '!'

        if not stripped.startswith(keywords):
            continue

        if stripped.startswith("UBMAT"):  # next 3 lines contain the matrix
            matrix = []
            for _ in range(3):
                matrix.extend(next(fileobj).strip().split(maxsplit=2))
            result["UBMAT"] = matrix
        else:
            key, value = stripped.split(maxsplit=1)
            result[key] = value

    return result
def read_cif_file(fileobj):
    """Parse a CIF file into cfl-style parameters.

    Extracts the space group (SPGR), the unit cell (CELL, set only when all
    six cell values were found) and a list of ATOM entries assembled from the
    columns of the `_atom_site_*` loop.

    NOTE(review): assumes the atom loop declares every column listed in
    `atom_param_pos`; a missing column leaves its position as None, which
    would fail at `vals[ind]` — confirm against real input files.
    """
    params = {"SPGR": None, "CELL": None, "ATOM": []}

    cell_params = {
        "_cell_length_a": None,
        "_cell_length_b": None,
        "_cell_length_c": None,
        "_cell_angle_alpha": None,
        "_cell_angle_beta": None,
        "_cell_angle_gamma": None,
    }
    cell_param_names = tuple(cell_params)

    # column index of each atom field within a data row of the atom loop;
    # "_atom_site_label" is assumed to be the first declared field (index 0)
    atom_param_pos = {
        "_atom_site_label": 0,
        "_atom_site_type_symbol": None,
        "_atom_site_fract_x": None,
        "_atom_site_fract_y": None,
        "_atom_site_fract_z": None,
        "_atom_site_U_iso_or_equiv": None,
        "_atom_site_occupancy": None,
    }
    atom_param_names = tuple(atom_param_pos)

    for line in fileobj:
        line = line.strip()
        if line.startswith("_space_group_name_H-M_alt"):
            _, val = line.split(maxsplit=1)
            params["SPGR"] = val.strip("'")
        elif line.startswith(cell_param_names):
            param, val = line.split(maxsplit=1)
            cell_params[param] = val
        elif line.startswith("_atom_site_label"):  # assume this is the start of atom data
            for ind, line in enumerate(fileobj, start=1):
                line = line.strip()

                # read fields
                if line.startswith("_atom_site"):
                    if line.startswith(atom_param_names):
                        atom_param_pos[line] = ind
                    continue

                # read data till an empty line
                if not line:
                    break
                vals = line.split()
                params["ATOM"].append(" ".join([vals[ind] for ind in atom_param_pos.values()]))

    # CELL only when the file provided all six cell parameters
    if None not in cell_params.values():
        params["CELL"] = " ".join(cell_params.values())

    return params
def export_cfl_file(path, params, template=None):
    """Write a .cfl file to *path*, substituting *params* values into a template.

    Template lines that start with a key of *params* are replaced by
    "KEY value". Special cases: the three lines after "UBMAT" are replaced by
    the 9 matrix values of params["UBMAT"]; a run of "ATOM" lines is replaced
    by the entries of params["ATOM"]; any "INSTR" line is rewritten to the
    default instrument file name. ATOM entries not placed via the template are
    appended at the end of the file.

    NOTE(review): the first non-ATOM template line terminating a run of ATOM
    lines is consumed and not written out — confirm this is intended.
    """
    keys = tuple(params)
    source = get_zebra_default_cfl_file() if template is None else template

    wrote_atoms = False
    with open(path, "w") as dst:
        for line in source:
            if line.startswith(keys):
                if line.startswith("UBMAT"):  # matrix values follow on 3 separate lines
                    dst.write(line)
                    for row in range(3):
                        next(source)
                        dst.write(" ".join(params["UBMAT"][3 * row : 3 * row + 3]) + "\n")
                elif line.startswith("ATOM"):
                    if "ATOM" in params:
                        # replace the whole run of ATOM lines with params values
                        while line.startswith("ATOM"):
                            line = next(source)
                        for entry in params["ATOM"]:
                            dst.write(f"ATOM {entry}\n")
                        wrote_atoms = True
                else:
                    name, _ = line.split(maxsplit=1)
                    dst.write(f"{name} {params[name]}\n")
            elif line.startswith("INSTR"):
                # always point to the default geometry file name
                dst.write("INSTR zebra.geom\n")
            else:
                dst.write(line)

        # append ATOM data if it's present and the template did not contain it
        if "ATOM" in params and not wrote_atoms:
            dst.write("\n")
            for entry in params["ATOM"]:
                dst.write(f"ATOM {entry}\n")
def sort_hkl_file_bi(file_in, file_out, priority, chunks):
    """Sort reflections of a bisecting-geometry .hkl file for efficient measurement.

    Reflection lines (after a 3-line header) are grouped into angular chunks
    by the first three priority angles, alternating the sweep direction
    between consecutive chunks (boustrophedon ordering to reduce motor
    travel), and finally sorted by the fourth-priority angle within each
    chunk.

    Args:
        file_in: Input .hkl path; columns 4-7 hold 2theta, omega, chi, phi.
        file_out: Output path; the 3 header lines are copied unchanged.
        priority: Sequence of 4 angle names, highest priority first.
        chunks: Chunk sizes (degrees) for the first three priorities.

    NOTE(review): the first-level dict uses key "2theta" while the second/
    third-level dicts use "stt" — a priority list with "2theta" anywhere but
    position 0 (or "stt" at position 0) raises KeyError; confirm the expected
    naming of priority entries.
    NOTE(review): np.genfromtxt returns a 1-D array for a single-reflection
    file, which would break the `data[:, 4]` slicing — confirm inputs always
    contain more than one reflection.
    """
    with open(file_in) as fileobj:
        file_in_data = fileobj.readlines()

    # numeric angle columns, re-read from the same file
    data = np.genfromtxt(file_in, skip_header=3)

    stt = data[:, 4]
    omega = data[:, 5]
    chi = data[:, 6]
    phi = data[:, 7]

    lines = file_in_data[3:]
    lines_update = []

    angles = {"2theta": stt, "omega": omega, "chi": chi, "phi": phi}

    # Reverse flag
    to_reverse = False
    to_reverse_p2 = False
    to_reverse_p3 = False

    # Get indices within first priority
    ang_p1 = angles[priority[0]]
    begin_p1 = floor(min(ang_p1))
    end_p1 = ceil(max(ang_p1))
    delta_p1 = chunks[0]
    for p1 in range(begin_p1, end_p1, delta_p1):
        # reflections whose first-priority angle falls into [p1, p1 + delta_p1)
        ind_p1 = [j for j, x in enumerate(ang_p1) if p1 <= x and x < p1 + delta_p1]
        stt_new = [stt[x] for x in ind_p1]
        omega_new = [omega[x] for x in ind_p1]
        chi_new = [chi[x] for x in ind_p1]
        phi_new = [phi[x] for x in ind_p1]
        lines_new = [lines[x] for x in ind_p1]

        angles_p2 = {"stt": stt_new, "omega": omega_new, "chi": chi_new, "phi": phi_new}

        # Get indices for second priority
        ang_p2 = angles_p2[priority[1]]
        if len(ang_p2) > 0 and to_reverse_p2:
            # sweep the second-priority chunks downwards
            begin_p2 = ceil(max(ang_p2))
            end_p2 = floor(min(ang_p2))
            delta_p2 = -chunks[1]
        elif len(ang_p2) > 0 and not to_reverse_p2:
            end_p2 = ceil(max(ang_p2))
            begin_p2 = floor(min(ang_p2))
            delta_p2 = chunks[1]
        else:
            # empty chunk -> empty range below
            end_p2 = 0
            begin_p2 = 0
            delta_p2 = 1
        # alternate sweep direction for the next first-priority chunk
        to_reverse_p2 = not to_reverse_p2

        for p2 in range(begin_p2, end_p2, delta_p2):
            min_p2 = min([p2, p2 + delta_p2])
            max_p2 = max([p2, p2 + delta_p2])
            ind_p2 = [j for j, x in enumerate(ang_p2) if min_p2 <= x and x < max_p2]
            stt_new2 = [stt_new[x] for x in ind_p2]
            omega_new2 = [omega_new[x] for x in ind_p2]
            chi_new2 = [chi_new[x] for x in ind_p2]
            phi_new2 = [phi_new[x] for x in ind_p2]
            lines_new2 = [lines_new[x] for x in ind_p2]

            angles_p3 = {"stt": stt_new2, "omega": omega_new2, "chi": chi_new2, "phi": phi_new2}

            # Get indices for third priority
            ang_p3 = angles_p3[priority[2]]
            if len(ang_p3) > 0 and to_reverse_p3:
                begin_p3 = ceil(max(ang_p3)) + chunks[2]
                end_p3 = floor(min(ang_p3)) - chunks[2]
                delta_p3 = -chunks[2]
            elif len(ang_p3) > 0 and not to_reverse_p3:
                end_p3 = ceil(max(ang_p3)) + chunks[2]
                begin_p3 = floor(min(ang_p3)) - chunks[2]
                delta_p3 = chunks[2]
            else:
                end_p3 = 0
                begin_p3 = 0
                delta_p3 = 1
            to_reverse_p3 = not to_reverse_p3

            for p3 in range(begin_p3, end_p3, delta_p3):
                min_p3 = min([p3, p3 + delta_p3])
                max_p3 = max([p3, p3 + delta_p3])
                ind_p3 = [j for j, x in enumerate(ang_p3) if min_p3 <= x and x < max_p3]

                # final ordering within the chunk by the fourth-priority angle,
                # alternating ascending/descending between consecutive chunks
                angle_new3 = [angles_p3[priority[3]][x] for x in ind_p3]
                ind_final = [x for _, x in sorted(zip(angle_new3, ind_p3), reverse=to_reverse)]
                to_reverse = not to_reverse

                for i in ind_final:
                    lines_update.append(lines_new2[i])

    with open(file_out, "w") as fileobj:
        # copy the 3 header lines, then the reordered reflection lines
        for _ in range(3):
            fileobj.write(file_in_data.pop(0))
        fileobj.writelines(lines_update)
def sort_hkl_file_nb(file_in, file_out, priority, chunks):
    """Sort reflections of a normal-beam .hkl file for efficient measurement.

    Reflection lines (after a 3-line header) are grouped into angular chunks
    by the first two priority angles, alternating the sweep direction between
    consecutive chunks (boustrophedon ordering to reduce motor travel), and
    finally sorted by the third-priority angle within each chunk.

    Args:
        file_in: Input .hkl path; columns 4-6 hold gamma, omega, nu.
        file_out: Output path; the 3 header lines are copied unchanged.
        priority: Sequence of 3 angle names ("gamma", "omega", "nu"),
            highest priority first.
        chunks: Chunk sizes (degrees) for the first two priorities.

    NOTE(review): np.genfromtxt returns a 1-D array for a single-reflection
    file, which would break the `data[:, 4]` slicing — confirm inputs always
    contain more than one reflection.
    """
    with open(file_in) as fileobj:
        file_in_data = fileobj.readlines()

    # numeric angle columns, re-read from the same file
    data = np.genfromtxt(file_in, skip_header=3)

    gamma = data[:, 4]
    omega = data[:, 5]
    nu = data[:, 6]

    lines = file_in_data[3:]
    lines_update = []

    angles = {"gamma": gamma, "omega": omega, "nu": nu}

    to_reverse = False
    to_reverse_p2 = False

    # Get indices within first priority
    ang_p1 = angles[priority[0]]
    begin_p1 = floor(min(ang_p1))
    end_p1 = ceil(max(ang_p1))
    delta_p1 = chunks[0]
    for p1 in range(begin_p1, end_p1, delta_p1):
        # reflections whose first-priority angle falls into [p1, p1 + delta_p1)
        ind_p1 = [j for j, x in enumerate(ang_p1) if p1 <= x and x < p1 + delta_p1]

        # Get angles from within nu range
        lines_new = [lines[x] for x in ind_p1]
        gamma_new = [gamma[x] for x in ind_p1]
        omega_new = [omega[x] for x in ind_p1]
        nu_new = [nu[x] for x in ind_p1]

        angles_p2 = {"gamma": gamma_new, "omega": omega_new, "nu": nu_new}

        # Get indices for second priority
        ang_p2 = angles_p2[priority[1]]
        if len(gamma_new) > 0 and to_reverse_p2:
            # sweep the second-priority chunks downwards
            begin_p2 = ceil(max(ang_p2))
            end_p2 = floor(min(ang_p2))
            delta_p2 = -chunks[1]
        elif len(gamma_new) > 0 and not to_reverse_p2:
            end_p2 = ceil(max(ang_p2))
            begin_p2 = floor(min(ang_p2))
            delta_p2 = chunks[1]
        else:
            # empty chunk -> empty range below
            end_p2 = 0
            begin_p2 = 0
            delta_p2 = 1
        # alternate sweep direction for the next first-priority chunk
        to_reverse_p2 = not to_reverse_p2

        for p2 in range(begin_p2, end_p2, delta_p2):
            min_p2 = min([p2, p2 + delta_p2])
            max_p2 = max([p2, p2 + delta_p2])
            ind_p2 = [j for j, x in enumerate(ang_p2) if min_p2 <= x and x < max_p2]

            # final ordering within the chunk by the third-priority angle,
            # alternating ascending/descending between consecutive chunks
            angle_new2 = [angles_p2[priority[2]][x] for x in ind_p2]
            ind_final = [x for _, x in sorted(zip(angle_new2, ind_p2), reverse=to_reverse)]
            to_reverse = not to_reverse

            for i in ind_final:
                lines_update.append(lines_new[i])

    with open(file_out, "w") as fileobj:
        # copy the 3 header lines, then the reordered reflection lines
        for _ in range(3):
            fileobj.write(file_in_data.pop(0))
        fileobj.writelines(lines_update)

View File

@ -1,20 +1,17 @@
import os import os
ZEBRA_PROPOSALS_PATHS = [ SINQ_PATH = "/afs/psi.ch/project/sinqdata"
f"/afs/psi.ch/project/sinqdata/{year}/zebra/" for year in (2016, 2017, 2018, 2020, 2021) ZEBRA_PROPOSALS_PATH = os.path.join(SINQ_PATH, "{year}/zebra/{proposal}")
]
def find_proposal_path(proposal): def find_proposal_path(proposal):
proposal = proposal.strip() for entry in os.scandir(SINQ_PATH):
if proposal: if entry.is_dir() and len(entry.name) == 4 and entry.name.isdigit():
for zebra_proposals_path in ZEBRA_PROPOSALS_PATHS: proposal_path = ZEBRA_PROPOSALS_PATH.format(year=entry.name, proposal=proposal)
proposal_path = os.path.join(zebra_proposals_path, proposal)
if os.path.isdir(proposal_path): if os.path.isdir(proposal_path):
# found it # found it
break break
else:
raise ValueError(f"Can not find data for proposal '{proposal}'.")
else: else:
proposal_path = "" raise ValueError(f"Can not find data for proposal '{proposal}'.")
return proposal_path return proposal_path

View File

@ -372,6 +372,16 @@ def ang2hkl(wave, ddist, gammad, om, ch, ph, nud, ub, x, y):
return hkl return hkl
def ang2hkl_1d(wave, ddist, ga, om, ch, ph, nu, ub):
    """Calculate hkl-indices of a reflection from its position (angles) at the 1d-detector.

    NOTE(review): `ddist` is not used by the calculation; presumably kept for
    signature parity with `ang2hkl` — confirm.
    """
    ub_inv = np.linalg.inv(ub)
    return ub_inv @ z1frmd(wave, ga, om, ch, ph, nu)
def ang_proc(wave, ddist, gammad, om, ch, ph, nud, x, y): def ang_proc(wave, ddist, gammad, om, ch, ph, nud, x, y):
"""Utility function to calculate ch, ph, ga, om """Utility function to calculate ch, ph, ga, om
""" """