49 Commits
0.5.1 ... 0.6.5

Author SHA1 Message Date
10bcabd7f9 Updating for version 0.6.5 2022-01-21 10:25:01 +01:00
bee8263184 Use dist2 in cami for detector distance 2022-01-21 10:24:30 +01:00
98a76ebf63 Updating for version 0.6.4 2022-01-03 17:34:23 +01:00
12ba0b291b Update overview map plotting 2022-01-03 17:33:37 +01:00
a719f10f4f Replace interp2d with griddata 2022-01-03 16:12:48 +01:00
aaff6032c8 Allow single frame in hdf_viewer 2022-01-03 10:25:19 +01:00
3fb3fe573b Handle single frame h5 files 2021-12-23 20:05:58 +01:00
1687337f26 Report in user output if reading h5 file fails 2021-12-23 17:41:15 +01:00
53ceac21aa Suppress RuntimeWarning in interp2d 2021-12-23 15:37:05 +01:00
741a09819c Fix unpacking from enumerate 2021-12-20 13:10:05 +01:00
3c619713d5 Updating for version 0.6.3 2021-12-07 09:49:09 +01:00
3d5a4ed6aa Fix error calculation for 0-count data points
Fix #47
2021-12-01 12:18:24 +01:00
b2129805dc Updating for version 0.6.2 2021-11-22 16:00:44 +01:00
92765b5665 Fix error calculation when merging data
Fix #46
2021-11-22 14:23:50 +01:00
328b71e058 Updating for version 0.6.1 2021-11-19 16:20:19 +01:00
11ab8485bc Avoid crush on failed interp2d
There is often not enough data for 2d interpolation at intermediate data
analysis steps
2021-11-16 18:55:30 +01:00
4734b3e50f Do not export data without a specified parameter 2021-11-15 16:32:16 +01:00
dfeeed284b Updating for version 0.6.0 2021-11-15 09:21:26 +01:00
9adf83ec74 Minor visual tweaks 2021-11-12 17:04:13 +01:00
a299449209 Add ccl_compare panel
Fix #41
2021-11-12 16:47:01 +01:00
45a81aa632 Set chi and phi to None for peaks in nb geometry
Fix #44
2021-11-10 15:59:54 +01:00
3926e8de39 Add gamma column
Fix #43
2021-11-10 14:44:10 +01:00
d2e2a2c7fd Do not reset Parameter on opening new data
Fix #45
2021-11-09 17:49:08 +01:00
3934dcdd07 Update overview plot on parameter change
For #45
2021-11-09 17:12:15 +01:00
4c8037af5c Convenience fix for restoring chain-merged scans 2021-11-09 16:39:12 +01:00
e29b4e7da8 Add Restore and Merge buttons to param study
For #45
2021-11-09 16:38:16 +01:00
7189ee8196 Fix gamma and nu axes
For #44
2021-11-09 15:04:23 +01:00
be8417856a Open hdf file on file selection
For #44
2021-11-09 11:00:47 +01:00
8ba062064a Rename column (twotheta -> 2theta)
For #43
2021-11-08 16:39:03 +01:00
6557b2f3a4 Average counts per scan position upon scan merging
Fix #42
2021-11-08 16:04:03 +01:00
7dcd20198f Do not set area_v to nan if the fit is not good
For #42
2021-11-08 13:52:08 +01:00
13a6ff285a Check for scan motors upon dataset merging 2021-10-22 16:11:24 +02:00
09b6e4fdcf Skip unreadable files
For #41
2021-10-22 15:36:42 +02:00
e7780a2405 Add gamma and nu axes
For #41
2021-10-20 15:27:47 +02:00
e8b85bcea3 Add extra angle columns for scan center
For #41
2021-10-20 10:39:03 +02:00
2482746f14 Fix for FileInput props not updating simultaneously 2021-10-20 09:28:03 +02:00
3986b8173f Use scan_motor instead of "omega" 2021-10-20 09:15:01 +02:00
16966b6e3e Fix export flag change
For #41
2021-10-20 09:12:04 +02:00
e9d3fcc41a Fix lmfit >=1.0.2
Needed for two-dimensional Gaussian model
2021-10-19 18:18:38 +02:00
506d70a913 Add Open New button to panel_hdf_viewer
For #41
2021-10-19 18:07:20 +02:00
fc4e9c12cf Calculate angles in the detector center
For #41
2021-10-19 17:33:19 +02:00
c5faa0a55a Update button labels
For #41
2021-10-19 14:51:35 +02:00
c9922bb0cb Reuse fit_event in panel_hdf_viewer 2021-10-19 14:50:56 +02:00
813270d6f8 Refactor fit_event 2021-10-19 10:59:06 +02:00
cf2f8435e7 Allow debug of external libs 2021-10-18 13:28:12 +02:00
380abfb102 Add support for direct hdf file upload
Fix #38
2021-10-08 16:42:56 +02:00
c8502a3b93 Updating for version 0.5.2 2021-10-05 22:04:36 +02:00
b84fc632aa Restrict DataTable column editing
Setting editor to CellEditor makes the column read-only for user even
if "editable=True" for the entire DataTable
2021-10-05 16:34:15 +02:00
3acd57adb9 Abort fit if there is no data in range 2021-10-05 16:34:05 +02:00
13 changed files with 1388 additions and 343 deletions

1
.vscode/launch.json vendored
View File

@ -8,6 +8,7 @@
"program": "${workspaceFolder}/pyzebra/app/cli.py",
"console": "internalConsole",
"env": {},
"justMyCode": false,
},
]
}

View File

@ -24,7 +24,7 @@ requirements:
- h5py
- bokeh =2.4
- numba
- lmfit
- lmfit >=1.0.2
about:

View File

@ -5,4 +5,4 @@ from pyzebra.h5 import *
from pyzebra.utils import *
from pyzebra.xtal import *
__version__ = "0.5.1"
__version__ = "0.6.5"

View File

@ -8,6 +8,7 @@ from bokeh.layouts import column, row
from bokeh.models import Button, Panel, Tabs, TextAreaInput, TextInput
import panel_ccl_integrate
import panel_ccl_compare
import panel_hdf_anatric
import panel_hdf_param_study
import panel_hdf_viewer
@ -55,6 +56,7 @@ doc.add_root(
panel_hdf_viewer.create(),
panel_hdf_anatric.create(),
panel_ccl_integrate.create(),
panel_ccl_compare.create(),
panel_param_study.create(),
panel_hdf_param_study.create(),
panel_spind.create(),

View File

@ -0,0 +1,718 @@
import base64
import io
import os
import tempfile
import types
import numpy as np
from bokeh.io import curdoc
from bokeh.layouts import column, row
from bokeh.models import (
BasicTicker,
Button,
CellEditor,
CheckboxEditor,
CheckboxGroup,
ColumnDataSource,
CustomJS,
DataRange1d,
DataTable,
Div,
Dropdown,
FileInput,
Grid,
Legend,
Line,
LinearAxis,
MultiLine,
MultiSelect,
NumberEditor,
Panel,
PanTool,
Plot,
RadioGroup,
ResetTool,
Scatter,
Select,
Spacer,
Span,
Spinner,
TableColumn,
TextAreaInput,
WheelZoomTool,
Whisker,
)
import pyzebra
from pyzebra.ccl_io import EXPORT_TARGETS
from pyzebra.ccl_process import AREA_METHODS
javaScript = """
let j = 0;
for (let i = 0; i < js_data.data['fname'].length; i++) {
if (js_data.data['content'][i] === "") continue;
setTimeout(function() {
const blob = new Blob([js_data.data['content'][i]], {type: 'text/plain'})
const link = document.createElement('a');
document.body.appendChild(link);
const url = window.URL.createObjectURL(blob);
link.href = url;
link.download = js_data.data['fname'][i] + js_data.data['ext'][i];
link.click();
window.URL.revokeObjectURL(url);
document.body.removeChild(link);
}, 100 * j)
j++;
}
"""
def create():
doc = curdoc()
det_data1 = []
det_data2 = []
fit_params = {}
js_data = ColumnDataSource(data=dict(content=["", ""], fname=["", ""], ext=["", ""]))
def file_select_update_for_proposal():
proposal_path = proposal_textinput.name
if proposal_path:
file_list = []
for file in os.listdir(proposal_path):
if file.endswith((".ccl")):
file_list.append((os.path.join(proposal_path, file), file))
file_select.options = file_list
file_open_button.disabled = False
else:
file_select.options = []
file_open_button.disabled = True
doc.add_periodic_callback(file_select_update_for_proposal, 5000)
def proposal_textinput_callback(_attr, _old, _new):
file_select_update_for_proposal()
proposal_textinput = doc.proposal_textinput
proposal_textinput.on_change("name", proposal_textinput_callback)
def _init_datatable():
# det_data2 should have the same metadata to det_data1
scan_list = [s["idx"] for s in det_data1]
hkl = [f'{s["h"]} {s["k"]} {s["l"]}' for s in det_data1]
export = [s["export"] for s in det_data1]
twotheta = [np.median(s["twotheta"]) if "twotheta" in s else None for s in det_data1]
gamma = [np.median(s["gamma"]) if "gamma" in s else None for s in det_data1]
omega = [np.median(s["omega"]) if "omega" in s else None for s in det_data1]
chi = [np.median(s["chi"]) if "chi" in s else None for s in det_data1]
phi = [np.median(s["phi"]) if "phi" in s else None for s in det_data1]
nu = [np.median(s["nu"]) if "nu" in s else None for s in det_data1]
scan_table_source.data.update(
scan=scan_list,
hkl=hkl,
fit=[0] * len(scan_list),
export=export,
twotheta=twotheta,
gamma=gamma,
omega=omega,
chi=chi,
phi=phi,
nu=nu,
)
scan_table_source.selected.indices = []
scan_table_source.selected.indices = [0]
merge_options = [(str(i), f"{i} ({idx})") for i, idx in enumerate(scan_list)]
merge_from_select.options = merge_options
merge_from_select.value = merge_options[0][0]
file_select = MultiSelect(title="Select 2 .ccl files:", width=210, height=250)
def file_open_button_callback():
if len(file_select.value) != 2:
print("WARNING: Select exactly 2 .ccl files.")
return
new_data1 = []
new_data2 = []
for ind, f_path in enumerate(file_select.value):
with open(f_path) as file:
f_name = os.path.basename(f_path)
base, ext = os.path.splitext(f_name)
try:
file_data = pyzebra.parse_1D(file, ext)
except:
print(f"Error loading {f_name}")
return
pyzebra.normalize_dataset(file_data, monitor_spinner.value)
pyzebra.merge_duplicates(file_data)
if ind == 0:
js_data.data.update(fname=[base, base])
new_data1 = file_data
else: # ind = 1
new_data2 = file_data
# ignore extra scans at the end of the longest of the two files
min_len = min(len(new_data1), len(new_data2))
new_data1 = new_data1[:min_len]
new_data2 = new_data2[:min_len]
nonlocal det_data1, det_data2
det_data1 = new_data1
det_data2 = new_data2
_init_datatable()
file_open_button = Button(label="Open New", width=100, disabled=True)
file_open_button.on_click(file_open_button_callback)
def upload_button_callback(_attr, _old, _new):
if len(upload_button.filename) != 2:
print("WARNING: Upload exactly 2 .ccl files.")
return
new_data1 = []
new_data2 = []
for ind, (f_str, f_name) in enumerate(zip(upload_button.value, upload_button.filename)):
with io.StringIO(base64.b64decode(f_str).decode()) as file:
base, ext = os.path.splitext(f_name)
try:
file_data = pyzebra.parse_1D(file, ext)
except:
print(f"Error loading {f_name}")
return
pyzebra.normalize_dataset(file_data, monitor_spinner.value)
pyzebra.merge_duplicates(file_data)
if ind == 0:
js_data.data.update(fname=[base, base])
new_data1 = file_data
else: # ind = 1
new_data2 = file_data
# ignore extra scans at the end of the longest of the two files
min_len = min(len(new_data1), len(new_data2))
new_data1 = new_data1[:min_len]
new_data2 = new_data2[:min_len]
nonlocal det_data1, det_data2
det_data1 = new_data1
det_data2 = new_data2
_init_datatable()
upload_div = Div(text="or upload 2 .ccl files:", margin=(5, 5, 0, 5))
upload_button = FileInput(accept=".ccl", multiple=True, width=200)
# for on_change("value", ...) or on_change("filename", ...),
# see https://github.com/bokeh/bokeh/issues/11461
upload_button.on_change("filename", upload_button_callback)
def monitor_spinner_callback(_attr, old, new):
if det_data1 and det_data2:
pyzebra.normalize_dataset(det_data1, new)
pyzebra.normalize_dataset(det_data2, new)
_update_plot()
monitor_spinner = Spinner(title="Monitor:", mode="int", value=100_000, low=1, width=145)
monitor_spinner.on_change("value", monitor_spinner_callback)
def _update_table():
fit_ok = [(1 if "fit" in scan else 0) for scan in det_data1]
export = [scan["export"] for scan in det_data1]
scan_table_source.data.update(fit=fit_ok, export=export)
def _update_plot():
plot_scatter_source = [plot_scatter1_source, plot_scatter2_source]
plot_fit_source = [plot_fit1_source, plot_fit2_source]
plot_bkg_source = [plot_bkg1_source, plot_bkg2_source]
plot_peak_source = [plot_peak1_source, plot_peak2_source]
fit_output = ""
for ind, scan in enumerate(_get_selected_scan()):
scatter_source = plot_scatter_source[ind]
fit_source = plot_fit_source[ind]
bkg_source = plot_bkg_source[ind]
peak_source = plot_peak_source[ind]
scan_motor = scan["scan_motor"]
y = scan["counts"]
y_err = scan["counts_err"]
x = scan[scan_motor]
plot.axis[0].axis_label = scan_motor
scatter_source.data.update(x=x, y=y, y_upper=y + y_err, y_lower=y - y_err)
fit = scan.get("fit")
if fit is not None:
x_fit = np.linspace(x[0], x[-1], 100)
fit_source.data.update(x=x_fit, y=fit.eval(x=x_fit))
x_bkg = []
y_bkg = []
xs_peak = []
ys_peak = []
comps = fit.eval_components(x=x_fit)
for i, model in enumerate(fit_params):
if "linear" in model:
x_bkg = x_fit
y_bkg = comps[f"f{i}_"]
elif any(val in model for val in ("gaussian", "voigt", "pvoigt")):
xs_peak.append(x_fit)
ys_peak.append(comps[f"f{i}_"])
bkg_source.data.update(x=x_bkg, y=y_bkg)
peak_source.data.update(xs=xs_peak, ys=ys_peak)
if fit_output:
fit_output = fit_output + "\n\n"
fit_output = fit_output + fit.fit_report()
else:
fit_source.data.update(x=[], y=[])
bkg_source.data.update(x=[], y=[])
peak_source.data.update(xs=[], ys=[])
fit_output_textinput.value = fit_output
# Main plot
plot = Plot(
x_range=DataRange1d(),
y_range=DataRange1d(only_visible=True),
plot_height=470,
plot_width=700,
)
plot.add_layout(LinearAxis(axis_label="Counts"), place="left")
plot.add_layout(LinearAxis(axis_label="Scan motor"), place="below")
plot.add_layout(Grid(dimension=0, ticker=BasicTicker()))
plot.add_layout(Grid(dimension=1, ticker=BasicTicker()))
plot_scatter1_source = ColumnDataSource(dict(x=[0], y=[0], y_upper=[0], y_lower=[0]))
plot_scatter1 = plot.add_glyph(
plot_scatter1_source, Scatter(x="x", y="y", line_color="steelblue", fill_color="steelblue")
)
plot.add_layout(
Whisker(source=plot_scatter1_source, base="x", upper="y_upper", lower="y_lower")
)
plot_scatter2_source = ColumnDataSource(dict(x=[0], y=[0], y_upper=[0], y_lower=[0]))
plot_scatter2 = plot.add_glyph(
plot_scatter2_source, Scatter(x="x", y="y", line_color="firebrick", fill_color="firebrick")
)
plot.add_layout(
Whisker(source=plot_scatter2_source, base="x", upper="y_upper", lower="y_lower")
)
plot_fit1_source = ColumnDataSource(dict(x=[0], y=[0]))
plot_fit1 = plot.add_glyph(plot_fit1_source, Line(x="x", y="y"))
plot_fit2_source = ColumnDataSource(dict(x=[0], y=[0]))
plot_fit2 = plot.add_glyph(plot_fit2_source, Line(x="x", y="y"))
plot_bkg1_source = ColumnDataSource(dict(x=[0], y=[0]))
plot_bkg1 = plot.add_glyph(
plot_bkg1_source, Line(x="x", y="y", line_color="steelblue", line_dash="dashed")
)
plot_bkg2_source = ColumnDataSource(dict(x=[0], y=[0]))
plot_bkg2 = plot.add_glyph(
plot_bkg2_source, Line(x="x", y="y", line_color="firebrick", line_dash="dashed")
)
plot_peak1_source = ColumnDataSource(dict(xs=[[0]], ys=[[0]]))
plot_peak1 = plot.add_glyph(
plot_peak1_source, MultiLine(xs="xs", ys="ys", line_color="steelblue", line_dash="dashed")
)
plot_peak2_source = ColumnDataSource(dict(xs=[[0]], ys=[[0]]))
plot_peak2 = plot.add_glyph(
plot_peak2_source, MultiLine(xs="xs", ys="ys", line_color="firebrick", line_dash="dashed")
)
fit_from_span = Span(location=None, dimension="height", line_dash="dashed")
plot.add_layout(fit_from_span)
fit_to_span = Span(location=None, dimension="height", line_dash="dashed")
plot.add_layout(fit_to_span)
plot.add_layout(
Legend(
items=[
("data 1", [plot_scatter1]),
("data 2", [plot_scatter2]),
("best fit 1", [plot_fit1]),
("best fit 2", [plot_fit2]),
("peak 1", [plot_peak1]),
("peak 2", [plot_peak2]),
("linear 1", [plot_bkg1]),
("linear 2", [plot_bkg2]),
],
location="top_left",
click_policy="hide",
)
)
plot.add_tools(PanTool(), WheelZoomTool(), ResetTool())
plot.toolbar.logo = None
# Scan select
def scan_table_select_callback(_attr, old, new):
if not new:
# skip empty selections
return
# Avoid selection of multiple indicies (via Shift+Click or Ctrl+Click)
if len(new) > 1:
# drop selection to the previous one
scan_table_source.selected.indices = old
return
if len(old) > 1:
# skip unnecessary update caused by selection drop
return
_update_plot()
def scan_table_source_callback(_attr, _old, new):
# unfortunately, we don't know if the change comes from data update or user input
# also `old` and `new` are the same for non-scalars
for scan1, scan2, export in zip(det_data1, det_data2, new["export"]):
scan1["export"] = export
scan2["export"] = export
_update_preview()
scan_table_source = ColumnDataSource(
dict(
scan=[],
hkl=[],
fit=[],
export=[],
twotheta=[],
gamma=[],
omega=[],
chi=[],
phi=[],
nu=[],
)
)
scan_table_source.on_change("data", scan_table_source_callback)
scan_table_source.selected.on_change("indices", scan_table_select_callback)
scan_table = DataTable(
source=scan_table_source,
columns=[
TableColumn(field="scan", title="Scan", editor=CellEditor(), width=50),
TableColumn(field="hkl", title="hkl", editor=CellEditor(), width=100),
TableColumn(field="fit", title="Fit", editor=CellEditor(), width=50),
TableColumn(field="export", title="Export", editor=CheckboxEditor(), width=50),
TableColumn(field="twotheta", title="2theta", editor=CellEditor(), width=50),
TableColumn(field="gamma", title="gamma", editor=CellEditor(), width=50),
TableColumn(field="omega", title="omega", editor=CellEditor(), width=50),
TableColumn(field="chi", title="chi", editor=CellEditor(), width=50),
TableColumn(field="phi", title="phi", editor=CellEditor(), width=50),
TableColumn(field="nu", title="nu", editor=CellEditor(), width=50),
],
width=310, # +60 because of the index column, but excluding twotheta onwards
height=350,
autosize_mode="none",
editable=True,
)
def _get_selected_scan():
ind = scan_table_source.selected.indices[0]
return det_data1[ind], det_data2[ind]
merge_from_select = Select(title="scan:", width=145)
def merge_button_callback():
scan_into1, scan_into2 = _get_selected_scan()
scan_from1 = det_data1[int(merge_from_select.value)]
scan_from2 = det_data2[int(merge_from_select.value)]
if scan_into1 is scan_from1:
print("WARNING: Selected scans for merging are identical")
return
pyzebra.merge_scans(scan_into1, scan_from1)
pyzebra.merge_scans(scan_into2, scan_from2)
_update_table()
_update_plot()
merge_button = Button(label="Merge into current", width=145)
merge_button.on_click(merge_button_callback)
def restore_button_callback():
scan1, scan2 = _get_selected_scan()
pyzebra.restore_scan(scan1)
pyzebra.restore_scan(scan2)
_update_table()
_update_plot()
restore_button = Button(label="Restore scan", width=145)
restore_button.on_click(restore_button_callback)
def fit_from_spinner_callback(_attr, _old, new):
fit_from_span.location = new
fit_from_spinner = Spinner(title="Fit from:", width=145)
fit_from_spinner.on_change("value", fit_from_spinner_callback)
def fit_to_spinner_callback(_attr, _old, new):
fit_to_span.location = new
fit_to_spinner = Spinner(title="to:", width=145)
fit_to_spinner.on_change("value", fit_to_spinner_callback)
def fitparams_add_dropdown_callback(click):
# bokeh requires (str, str) for MultiSelect options
new_tag = f"{click.item}-{fitparams_select.tags[0]}"
fitparams_select.options.append((new_tag, click.item))
fit_params[new_tag] = fitparams_factory(click.item)
fitparams_select.tags[0] += 1
fitparams_add_dropdown = Dropdown(
label="Add fit function",
menu=[
("Linear", "linear"),
("Gaussian", "gaussian"),
("Voigt", "voigt"),
("Pseudo Voigt", "pvoigt"),
# ("Pseudo Voigt1", "pseudovoigt1"),
],
width=145,
)
fitparams_add_dropdown.on_click(fitparams_add_dropdown_callback)
def fitparams_select_callback(_attr, old, new):
# Avoid selection of multiple indicies (via Shift+Click or Ctrl+Click)
if len(new) > 1:
# drop selection to the previous one
fitparams_select.value = old
return
if len(old) > 1:
# skip unnecessary update caused by selection drop
return
if new:
fitparams_table_source.data.update(fit_params[new[0]])
else:
fitparams_table_source.data.update(dict(param=[], value=[], vary=[], min=[], max=[]))
fitparams_select = MultiSelect(options=[], height=120, width=145)
fitparams_select.tags = [0]
fitparams_select.on_change("value", fitparams_select_callback)
def fitparams_remove_button_callback():
if fitparams_select.value:
sel_tag = fitparams_select.value[0]
del fit_params[sel_tag]
for elem in fitparams_select.options:
if elem[0] == sel_tag:
fitparams_select.options.remove(elem)
break
fitparams_select.value = []
fitparams_remove_button = Button(label="Remove fit function", width=145)
fitparams_remove_button.on_click(fitparams_remove_button_callback)
def fitparams_factory(function):
if function == "linear":
params = ["slope", "intercept"]
elif function == "gaussian":
params = ["amplitude", "center", "sigma"]
elif function == "voigt":
params = ["amplitude", "center", "sigma", "gamma"]
elif function == "pvoigt":
params = ["amplitude", "center", "sigma", "fraction"]
elif function == "pseudovoigt1":
params = ["amplitude", "center", "g_sigma", "l_sigma", "fraction"]
else:
raise ValueError("Unknown fit function")
n = len(params)
fitparams = dict(
param=params, value=[None] * n, vary=[True] * n, min=[None] * n, max=[None] * n,
)
if function == "linear":
fitparams["value"] = [0, 1]
fitparams["vary"] = [False, True]
fitparams["min"] = [None, 0]
elif function == "gaussian":
fitparams["min"] = [0, None, None]
return fitparams
fitparams_table_source = ColumnDataSource(dict(param=[], value=[], vary=[], min=[], max=[]))
fitparams_table = DataTable(
source=fitparams_table_source,
columns=[
TableColumn(field="param", title="Parameter", editor=CellEditor()),
TableColumn(field="value", title="Value", editor=NumberEditor()),
TableColumn(field="vary", title="Vary", editor=CheckboxEditor()),
TableColumn(field="min", title="Min", editor=NumberEditor()),
TableColumn(field="max", title="Max", editor=NumberEditor()),
],
height=200,
width=350,
index_position=None,
editable=True,
auto_edit=True,
)
# start with `background` and `gauss` fit functions added
fitparams_add_dropdown_callback(types.SimpleNamespace(item="linear"))
fitparams_add_dropdown_callback(types.SimpleNamespace(item="gaussian"))
fitparams_select.value = ["gaussian-1"] # add selection to gauss
fit_output_textinput = TextAreaInput(title="Fit results:", width=750, height=200)
def proc_all_button_callback():
for scan in [*det_data1, *det_data2]:
if scan["export"]:
pyzebra.fit_scan(
scan, fit_params, fit_from=fit_from_spinner.value, fit_to=fit_to_spinner.value
)
pyzebra.get_area(
scan,
area_method=AREA_METHODS[area_method_radiobutton.active],
lorentz=lorentz_checkbox.active,
)
_update_plot()
_update_table()
proc_all_button = Button(label="Process All", button_type="primary", width=145)
proc_all_button.on_click(proc_all_button_callback)
def proc_button_callback():
for scan in _get_selected_scan():
pyzebra.fit_scan(
scan, fit_params, fit_from=fit_from_spinner.value, fit_to=fit_to_spinner.value
)
pyzebra.get_area(
scan,
area_method=AREA_METHODS[area_method_radiobutton.active],
lorentz=lorentz_checkbox.active,
)
_update_plot()
_update_table()
proc_button = Button(label="Process Current", width=145)
proc_button.on_click(proc_button_callback)
area_method_div = Div(text="Intensity:", margin=(5, 5, 0, 5))
area_method_radiobutton = RadioGroup(labels=["Function", "Area"], active=0, width=145)
intensity_diff_div = Div(text="Intensity difference:", margin=(5, 5, 0, 5))
intensity_diff_radiobutton = RadioGroup(
labels=["file1 - file2", "file2 - file1"], active=0, width=145
)
lorentz_checkbox = CheckboxGroup(labels=["Lorentz Correction"], width=145, margin=(13, 5, 5, 5))
export_preview_textinput = TextAreaInput(title="Export file(s) preview:", width=500, height=400)
def _update_preview():
with tempfile.TemporaryDirectory() as temp_dir:
temp_file = temp_dir + "/temp"
export_data1 = []
export_data2 = []
for scan1, scan2 in zip(det_data1, det_data2):
if scan1["export"]:
export_data1.append(scan1)
export_data2.append(scan2)
if intensity_diff_radiobutton.active:
export_data1, export_data2 = export_data2, export_data1
pyzebra.export_ccl_compare(
export_data1,
export_data2,
temp_file,
export_target_select.value,
hkl_precision=int(hkl_precision_select.value),
)
exported_content = ""
file_content = []
for ext in EXPORT_TARGETS[export_target_select.value]:
fname = temp_file + ext
if os.path.isfile(fname):
with open(fname) as f:
content = f.read()
exported_content += f"{ext} file:\n" + content
else:
content = ""
file_content.append(content)
js_data.data.update(content=file_content)
export_preview_textinput.value = exported_content
def export_target_select_callback(_attr, _old, new):
js_data.data.update(ext=EXPORT_TARGETS[new])
_update_preview()
export_target_select = Select(
title="Export target:", options=list(EXPORT_TARGETS.keys()), value="fullprof", width=80
)
export_target_select.on_change("value", export_target_select_callback)
js_data.data.update(ext=EXPORT_TARGETS[export_target_select.value])
def hkl_precision_select_callback(_attr, _old, _new):
_update_preview()
hkl_precision_select = Select(
title="hkl precision:", options=["2", "3", "4"], value="2", width=80
)
hkl_precision_select.on_change("value", hkl_precision_select_callback)
save_button = Button(label="Download File(s)", button_type="success", width=200)
save_button.js_on_click(CustomJS(args={"js_data": js_data}, code=javaScript))
fitpeak_controls = row(
column(fitparams_add_dropdown, fitparams_select, fitparams_remove_button),
fitparams_table,
Spacer(width=20),
column(
fit_from_spinner,
lorentz_checkbox,
area_method_div,
area_method_radiobutton,
intensity_diff_div,
intensity_diff_radiobutton,
),
column(fit_to_spinner, proc_button, proc_all_button),
)
scan_layout = column(
scan_table,
row(monitor_spinner, column(Spacer(height=19), restore_button)),
row(column(Spacer(height=19), merge_button), merge_from_select),
)
import_layout = column(file_select, file_open_button, upload_div, upload_button)
export_layout = column(
export_preview_textinput,
row(
export_target_select, hkl_precision_select, column(Spacer(height=19), row(save_button))
),
)
tab_layout = column(
row(import_layout, scan_layout, plot, Spacer(width=30), export_layout),
row(fitpeak_controls, fit_output_textinput),
)
return Panel(child=tab_layout, title="ccl compare")

View File

@ -10,6 +10,7 @@ from bokeh.layouts import column, row
from bokeh.models import (
BasicTicker,
Button,
CellEditor,
CheckboxEditor,
CheckboxGroup,
ColumnDataSource,
@ -71,7 +72,7 @@ for (let i = 0; i < js_data.data['fname'].length; i++) {
def create():
doc = curdoc()
det_data = {}
det_data = []
fit_params = {}
js_data = ColumnDataSource(data=dict(content=["", ""], fname=["", ""], ext=["", ""]))
@ -101,9 +102,26 @@ def create():
def _init_datatable():
scan_list = [s["idx"] for s in det_data]
hkl = [f'{s["h"]} {s["k"]} {s["l"]}' for s in det_data]
export = [s.get("active", True) for s in det_data]
export = [s["export"] for s in det_data]
twotheta = [np.median(s["twotheta"]) if "twotheta" in s else None for s in det_data]
gamma = [np.median(s["gamma"]) if "gamma" in s else None for s in det_data]
omega = [np.median(s["omega"]) if "omega" in s else None for s in det_data]
chi = [np.median(s["chi"]) if "chi" in s else None for s in det_data]
phi = [np.median(s["phi"]) if "phi" in s else None for s in det_data]
nu = [np.median(s["nu"]) if "nu" in s else None for s in det_data]
scan_table_source.data.update(
scan=scan_list, hkl=hkl, fit=[0] * len(scan_list), export=export,
scan=scan_list,
hkl=hkl,
fit=[0] * len(scan_list),
export=export,
twotheta=twotheta,
gamma=gamma,
omega=omega,
chi=chi,
phi=phi,
nu=nu,
)
scan_table_source.selected.indices = []
scan_table_source.selected.indices = [0]
@ -116,78 +134,109 @@ def create():
def file_open_button_callback():
nonlocal det_data
for f_ind, f_path in enumerate(file_select.value):
new_data = []
for f_path in file_select.value:
with open(f_path) as file:
base, ext = os.path.splitext(os.path.basename(f_path))
file_data = pyzebra.parse_1D(file, ext)
f_name = os.path.basename(f_path)
base, ext = os.path.splitext(f_name)
try:
file_data = pyzebra.parse_1D(file, ext)
except:
print(f"Error loading {f_name}")
continue
pyzebra.normalize_dataset(file_data, monitor_spinner.value)
if f_ind == 0: # first file
det_data = file_data
pyzebra.merge_duplicates(det_data)
if not new_data: # first file
new_data = file_data
pyzebra.merge_duplicates(new_data)
js_data.data.update(fname=[base, base])
else:
pyzebra.merge_datasets(det_data, file_data)
pyzebra.merge_datasets(new_data, file_data)
_init_datatable()
append_upload_button.disabled = False
if new_data:
det_data = new_data
_init_datatable()
append_upload_button.disabled = False
file_open_button = Button(label="Open New", width=100, disabled=True)
file_open_button.on_click(file_open_button_callback)
def file_append_button_callback():
file_data = []
for f_path in file_select.value:
with open(f_path) as file:
_, ext = os.path.splitext(f_path)
file_data = pyzebra.parse_1D(file, ext)
f_name = os.path.basename(f_path)
_, ext = os.path.splitext(f_name)
try:
file_data = pyzebra.parse_1D(file, ext)
except:
print(f"Error loading {f_name}")
continue
pyzebra.normalize_dataset(file_data, monitor_spinner.value)
pyzebra.merge_datasets(det_data, file_data)
_init_datatable()
if file_data:
_init_datatable()
file_append_button = Button(label="Append", width=100, disabled=True)
file_append_button.on_click(file_append_button_callback)
def upload_button_callback(_attr, _old, new):
def upload_button_callback(_attr, _old, _new):
nonlocal det_data
det_data = []
for f_str, f_name in zip(new, upload_button.filename):
new_data = []
for f_str, f_name in zip(upload_button.value, upload_button.filename):
with io.StringIO(base64.b64decode(f_str).decode()) as file:
base, ext = os.path.splitext(f_name)
file_data = pyzebra.parse_1D(file, ext)
try:
file_data = pyzebra.parse_1D(file, ext)
except:
print(f"Error loading {f_name}")
continue
pyzebra.normalize_dataset(file_data, monitor_spinner.value)
if not det_data: # first file
det_data = file_data
pyzebra.merge_duplicates(det_data)
if not new_data: # first file
new_data = file_data
pyzebra.merge_duplicates(new_data)
js_data.data.update(fname=[base, base])
else:
pyzebra.merge_datasets(det_data, file_data)
pyzebra.merge_datasets(new_data, file_data)
_init_datatable()
append_upload_button.disabled = False
if new_data:
det_data = new_data
_init_datatable()
append_upload_button.disabled = False
upload_div = Div(text="or upload new .ccl/.dat files:", margin=(5, 5, 0, 5))
upload_button = FileInput(accept=".ccl,.dat", multiple=True, width=200)
upload_button.on_change("value", upload_button_callback)
# for on_change("value", ...) or on_change("filename", ...),
# see https://github.com/bokeh/bokeh/issues/11461
upload_button.on_change("filename", upload_button_callback)
def append_upload_button_callback(_attr, _old, new):
for f_str, f_name in zip(new, append_upload_button.filename):
def append_upload_button_callback(_attr, _old, _new):
file_data = []
for f_str, f_name in zip(append_upload_button.value, append_upload_button.filename):
with io.StringIO(base64.b64decode(f_str).decode()) as file:
_, ext = os.path.splitext(f_name)
file_data = pyzebra.parse_1D(file, ext)
try:
file_data = pyzebra.parse_1D(file, ext)
except:
print(f"Error loading {f_name}")
continue
pyzebra.normalize_dataset(file_data, monitor_spinner.value)
pyzebra.merge_datasets(det_data, file_data)
_init_datatable()
if file_data:
_init_datatable()
append_upload_div = Div(text="append extra files:", margin=(5, 5, 0, 5))
append_upload_button = FileInput(accept=".ccl,.dat", multiple=True, width=200, disabled=True)
append_upload_button.on_change("value", append_upload_button_callback)
# for on_change("value", ...) or on_change("filename", ...),
# see https://github.com/bokeh/bokeh/issues/11461
append_upload_button.on_change("filename", append_upload_button_callback)
def monitor_spinner_callback(_attr, old, new):
if det_data:
@ -197,9 +246,9 @@ def create():
monitor_spinner = Spinner(title="Monitor:", mode="int", value=100_000, low=1, width=145)
monitor_spinner.on_change("value", monitor_spinner_callback)
def _update_datatable():
def _update_table():
fit_ok = [(1 if "fit" in scan else 0) for scan in det_data]
export = [scan.get("active", True) for scan in det_data]
export = [scan["export"] for scan in det_data]
scan_table_source.data.update(fit=fit_ok, export=export)
def _update_plot():
@ -259,7 +308,7 @@ def create():
plot_scatter_source = ColumnDataSource(dict(x=[0], y=[0], y_upper=[0], y_lower=[0]))
plot_scatter = plot.add_glyph(
plot_scatter_source, Scatter(x="x", y="y", line_color="steelblue")
plot_scatter_source, Scatter(x="x", y="y", line_color="steelblue", fill_color="steelblue")
)
plot.add_layout(Whisker(source=plot_scatter_source, base="x", upper="y_upper", lower="y_lower"))
@ -316,22 +365,45 @@ def create():
_update_plot()
def scan_table_source_callback(_attr, _old, _new):
def scan_table_source_callback(_attr, _old, new):
# unfortunately, we don't know if the change comes from data update or user input
# also `old` and `new` are the same for non-scalars
for scan, export in zip(det_data, new["export"]):
scan["export"] = export
_update_preview()
scan_table_source = ColumnDataSource(dict(scan=[], hkl=[], fit=[], export=[]))
scan_table_source = ColumnDataSource(
dict(
scan=[],
hkl=[],
fit=[],
export=[],
twotheta=[],
gamma=[],
omega=[],
chi=[],
phi=[],
nu=[],
)
)
scan_table_source.on_change("data", scan_table_source_callback)
scan_table_source.selected.on_change("indices", scan_table_select_callback)
scan_table = DataTable(
source=scan_table_source,
columns=[
TableColumn(field="scan", title="Scan", width=50),
TableColumn(field="hkl", title="hkl", width=100),
TableColumn(field="fit", title="Fit", width=50),
TableColumn(field="scan", title="Scan", editor=CellEditor(), width=50),
TableColumn(field="hkl", title="hkl", editor=CellEditor(), width=100),
TableColumn(field="fit", title="Fit", editor=CellEditor(), width=50),
TableColumn(field="export", title="Export", editor=CheckboxEditor(), width=50),
TableColumn(field="twotheta", title="2theta", editor=CellEditor(), width=50),
TableColumn(field="gamma", title="gamma", editor=CellEditor(), width=50),
TableColumn(field="omega", title="omega", editor=CellEditor(), width=50),
TableColumn(field="chi", title="chi", editor=CellEditor(), width=50),
TableColumn(field="phi", title="phi", editor=CellEditor(), width=50),
TableColumn(field="nu", title="nu", editor=CellEditor(), width=50),
],
width=310, # +60 because of the index column
width=310, # +60 because of the index column, but excluding twotheta onwards
height=350,
autosize_mode="none",
editable=True,
@ -351,7 +423,7 @@ def create():
return
pyzebra.merge_scans(scan_into, scan_from)
_update_datatable()
_update_table()
_update_plot()
merge_button = Button(label="Merge into current", width=145)
@ -359,7 +431,7 @@ def create():
def restore_button_callback():
pyzebra.restore_scan(_get_selected_scan())
_update_datatable()
_update_table()
_update_plot()
restore_button = Button(label="Restore scan", width=145)
@ -464,7 +536,7 @@ def create():
fitparams_table = DataTable(
source=fitparams_table_source,
columns=[
TableColumn(field="param", title="Parameter"),
TableColumn(field="param", title="Parameter", editor=CellEditor()),
TableColumn(field="value", title="Value", editor=NumberEditor()),
TableColumn(field="vary", title="Vary", editor=CheckboxEditor()),
TableColumn(field="min", title="Min", editor=NumberEditor()),
@ -485,8 +557,8 @@ def create():
fit_output_textinput = TextAreaInput(title="Fit results:", width=750, height=200)
def proc_all_button_callback():
for scan, export in zip(det_data, scan_table_source.data["export"]):
if export:
for scan in det_data:
if scan["export"]:
pyzebra.fit_scan(
scan, fit_params, fit_from=fit_from_spinner.value, fit_to=fit_to_spinner.value
)
@ -497,7 +569,7 @@ def create():
)
_update_plot()
_update_datatable()
_update_table()
proc_all_button = Button(label="Process All", button_type="primary", width=145)
proc_all_button.on_click(proc_all_button_callback)
@ -514,7 +586,7 @@ def create():
)
_update_plot()
_update_datatable()
_update_table()
proc_button = Button(label="Process Current", width=145)
proc_button.on_click(proc_button_callback)
@ -530,9 +602,9 @@ def create():
with tempfile.TemporaryDirectory() as temp_dir:
temp_file = temp_dir + "/temp"
export_data = []
for s, export in zip(det_data, scan_table_source.data["export"]):
if export:
export_data.append(s)
for scan in det_data:
if scan["export"]:
export_data.append(scan)
pyzebra.export_1D(
export_data,

View File

@ -1,6 +1,5 @@
import base64
import io
import math
import os
import numpy as np
@ -10,6 +9,7 @@ from bokeh.models import (
BasicTicker,
BoxZoomTool,
Button,
CellEditor,
CheckboxGroup,
ColumnDataSource,
DataRange1d,
@ -37,7 +37,6 @@ from bokeh.models import (
WheelZoomTool,
)
from bokeh.palettes import Cividis256, Greys256, Plasma256 # pylint: disable=E0611
from scipy.optimize import curve_fit
import pyzebra
@ -142,22 +141,29 @@ def create():
scan_table_source.data.update(frame=frame, x_pos=x_pos, y_pos=y_pos)
def _file_open():
new_data = []
for f_name in file_select.value:
try:
new_data.append(pyzebra.read_detector_data(f_name))
except KeyError:
print("Could not read data from the file.")
return
zebra_data.extend(new_data)
_init_datatable()
def file_open_button_callback():
nonlocal zebra_data
zebra_data = []
for f_name in file_select.value:
zebra_data.append(pyzebra.read_detector_data(f_name))
_init_datatable()
_file_open()
file_open_button = Button(label="Open New", width=100)
file_open_button.on_click(file_open_button_callback)
def file_append_button_callback():
for f_name in file_select.value:
zebra_data.append(pyzebra.read_detector_data(f_name))
_init_datatable()
_file_open()
file_append_button = Button(label="Append", width=100)
file_append_button.on_click(file_append_button_callback)
@ -210,7 +216,7 @@ def create():
scan_table = DataTable(
source=scan_table_source,
columns=[
TableColumn(field="file", title="file", width=150),
TableColumn(field="file", title="file", editor=CellEditor(), width=150),
TableColumn(
field="param",
title="param",
@ -218,9 +224,15 @@ def create():
editor=NumberEditor(),
width=50,
),
TableColumn(field="frame", title="Frame", formatter=num_formatter, width=70),
TableColumn(field="x_pos", title="X", formatter=num_formatter, width=70),
TableColumn(field="y_pos", title="Y", formatter=num_formatter, width=70),
TableColumn(
field="frame", title="Frame", formatter=num_formatter, editor=CellEditor(), width=70
),
TableColumn(
field="x_pos", title="X", formatter=num_formatter, editor=CellEditor(), width=70
),
TableColumn(
field="y_pos", title="Y", formatter=num_formatter, editor=CellEditor(), width=70
),
],
width=470, # +60 because of the index column
height=420,
@ -440,68 +452,6 @@ def create():
)
proj_display_min_spinner.on_change("value", proj_display_min_spinner_callback)
def fit_event(scan):
p0 = [1.0, 0.0, 1.0]
maxfev = 100000
# wave = scan["wave"]
# ddist = scan["ddist"]
# cell = scan["cell"]
# gamma = scan["gamma"][0]
# omega = scan["omega"][0]
# nu = scan["nu"][0]
# chi = scan["chi"][0]
# phi = scan["phi"][0]
scan_motor = scan["scan_motor"]
var_angle = scan[scan_motor]
x0 = int(np.floor(det_x_range.start))
xN = int(np.ceil(det_x_range.end))
y0 = int(np.floor(det_y_range.start))
yN = int(np.ceil(det_y_range.end))
fr0 = int(np.floor(frame_range.start))
frN = int(np.ceil(frame_range.end))
data_roi = scan["data"][fr0:frN, y0:yN, x0:xN]
cnts = np.sum(data_roi, axis=(1, 2))
coeff, _ = curve_fit(gauss, range(len(cnts)), cnts, p0=p0, maxfev=maxfev)
# m = cnts.mean()
# sd = cnts.std()
# snr_cnts = np.where(sd == 0, 0, m / sd)
frC = fr0 + coeff[1]
var_F = var_angle[math.floor(frC)]
var_C = var_angle[math.ceil(frC)]
# frStep = frC - math.floor(frC)
var_step = var_C - var_F
# var_p = var_F + var_step * frStep
# if scan_motor == "gamma":
# gamma = var_p
# elif scan_motor == "omega":
# omega = var_p
# elif scan_motor == "nu":
# nu = var_p
# elif scan_motor == "chi":
# chi = var_p
# elif scan_motor == "phi":
# phi = var_p
intensity = coeff[1] * abs(coeff[2] * var_step) * math.sqrt(2) * math.sqrt(np.pi)
projX = np.sum(data_roi, axis=(0, 1))
coeff, _ = curve_fit(gauss, range(len(projX)), projX, p0=p0, maxfev=maxfev)
x_pos = x0 + coeff[1]
projY = np.sum(data_roi, axis=(0, 2))
coeff, _ = curve_fit(gauss, range(len(projY)), projY, p0=p0, maxfev=maxfev)
y_pos = y0 + coeff[1]
scan["fit"] = {"frame": frC, "x_pos": x_pos, "y_pos": y_pos, "intensity": intensity}
metadata_table_source = ColumnDataSource(dict(geom=[""], temp=[None], mf=[None]))
metadata_table = DataTable(
source=metadata_table_source,
@ -549,7 +499,15 @@ def create():
def proc_all_button_callback():
for scan in zebra_data:
fit_event(scan)
pyzebra.fit_event(
scan,
int(np.floor(frame_range.start)),
int(np.ceil(frame_range.end)),
int(np.floor(det_y_range.start)),
int(np.ceil(det_y_range.end)),
int(np.floor(det_x_range.start)),
int(np.ceil(det_x_range.end)),
)
_update_table()
@ -566,7 +524,15 @@ def create():
proc_all_button.on_click(proc_all_button_callback)
def proc_button_callback():
fit_event(det_data)
pyzebra.fit_event(
det_data,
int(np.floor(frame_range.start)),
int(np.ceil(frame_range.end)),
int(np.floor(det_y_range.start)),
int(np.ceil(det_y_range.end)),
int(np.floor(det_x_range.start)),
int(np.ceil(det_x_range.end)),
)
_update_table()
@ -621,14 +587,3 @@ def create():
tab_layout = column(row(import_layout, scan_layout, plots))
return Panel(child=tab_layout, title="hdf param study")
def gauss(x, *p):
"""Defines Gaussian function
Args:
A - amplitude, mu - position of the center, sigma - width
Returns:
Gaussian function
"""
A, mu, sigma = p
return A * np.exp(-((x - mu) ** 2) / (2.0 * sigma ** 2))

View File

@ -1,6 +1,5 @@
import base64
import io
import math
import os
import numpy as np
@ -37,11 +36,11 @@ from bokeh.models import (
Spacer,
Spinner,
TableColumn,
Tabs,
Title,
WheelZoomTool,
)
from bokeh.palettes import Cividis256, Greys256, Plasma256 # pylint: disable=E0611
from scipy.optimize import curve_fit
import pyzebra
@ -97,16 +96,62 @@ def create():
proposal_textinput = doc.proposal_textinput
proposal_textinput.on_change("name", proposal_textinput_callback)
def upload_button_callback(_attr, _old, new):
def upload_cami_button_callback(_attr, _old, new):
nonlocal cami_meta
with io.StringIO(base64.b64decode(new).decode()) as file:
cami_meta = pyzebra.parse_h5meta(file)
data_source.value = "cami file"
file_select_update()
upload_div = Div(text="or upload .cami file:", margin=(5, 5, 0, 5))
upload_button = FileInput(accept=".cami", width=200)
upload_button.on_change("value", upload_button_callback)
upload_cami_div = Div(text="or upload .cami file:", margin=(5, 5, 0, 5))
upload_cami_button = FileInput(accept=".cami", width=200)
upload_cami_button.on_change("value", upload_cami_button_callback)
def _file_open(file, cami_meta):
nonlocal det_data
try:
det_data = pyzebra.read_detector_data(file, cami_meta)
except KeyError:
print("Could not read data from the file.")
return
last_im_index = det_data["data"].shape[0] - 1
index_spinner.value = 0
index_spinner.high = last_im_index
if last_im_index == 0:
index_slider.disabled = True
else:
index_slider.disabled = False
index_slider.end = last_im_index
zebra_mode = det_data["zebra_mode"]
if zebra_mode == "nb":
metadata_table_source.data.update(geom=["normal beam"])
else: # zebra_mode == "bi"
metadata_table_source.data.update(geom=["bisecting"])
update_image(0)
update_overview_plot()
def upload_hdf_button_callback(_attr, _old, new):
_file_open(io.BytesIO(base64.b64decode(new)), None)
upload_hdf_div = Div(text="or upload .hdf file:", margin=(5, 5, 0, 5))
upload_hdf_button = FileInput(accept=".hdf", width=200)
upload_hdf_button.on_change("value", upload_hdf_button_callback)
def file_open_button_callback():
if not file_select.value:
return
if data_source.value == "proposal number":
_file_open(file_select.value[0], None)
else:
_file_open(file_select.value[0], cami_meta)
file_open_button = Button(label="Open New", width=100)
file_open_button.on_click(file_open_button_callback)
def update_image(index=None):
if index is None:
@ -149,6 +194,34 @@ def create():
omega = np.ones((IMAGE_H, IMAGE_W)) * det_data["omega"][index]
image_source.data.update(gamma=[gamma], nu=[nu], omega=[omega])
# update detector center angles
det_c_x = int(IMAGE_W / 2)
det_c_y = int(IMAGE_H / 2)
if det_data["zebra_mode"] == "nb":
gamma_c = gamma[det_c_y, det_c_x]
nu_c = nu[det_c_y, det_c_x]
omega_c = omega[det_c_y, det_c_x]
chi_c = None
phi_c = None
else: # zebra_mode == "bi"
wave = det_data["wave"]
ddist = det_data["ddist"]
gammad = det_data["gamma"][index]
om = det_data["omega"][index]
ch = det_data["chi"][index]
ph = det_data["phi"][index]
nud = det_data["nu"]
nu_c = 0
chi_c, phi_c, gamma_c, omega_c = pyzebra.ang_proc(
wave, ddist, gammad, om, ch, ph, nud, det_c_x, det_c_y
)
detcenter_table_source.data.update(
gamma=[gamma_c], nu=[nu_c], omega=[omega_c], chi=[chi_c], phi=[phi_c],
)
def update_overview_plot():
h5_data = det_data["data"]
n_im, n_y, n_x = h5_data.shape
@ -186,7 +259,7 @@ def create():
var = det_data[scan_motor]
var_start = var[0]
var_end = var[-1] + (var[-1] - var[0]) / (n_im - 1)
var_end = var[-1] + (var[-1] - var[0]) / (n_im - 1) if n_im != 1 else var_start + 1
scanning_motor_range.start = var_start
scanning_motor_range.end = var_end
@ -195,8 +268,27 @@ def create():
# handle both, ascending and descending sequences
scanning_motor_range.bounds = (min(var_start, var_end), max(var_start, var_end))
gamma = image_source.data["gamma"][0]
gamma_start = gamma[0, 0]
gamma_end = gamma[0, -1]
gamma_range.start = gamma_start
gamma_range.end = gamma_end
gamma_range.reset_start = gamma_start
gamma_range.reset_end = gamma_end
gamma_range.bounds = (min(gamma_start, gamma_end), max(gamma_start, gamma_end))
nu = image_source.data["nu"][0]
nu_start = nu[0, 0]
nu_end = nu[-1, 0]
nu_range.start = nu_start
nu_range.end = nu_end
nu_range.reset_start = nu_start
nu_range.reset_end = nu_end
nu_range.bounds = (min(nu_start, nu_end), max(nu_start, nu_end))
def file_select_callback(_attr, old, new):
nonlocal det_data
if not new:
# skip empty selections
return
@ -211,20 +303,7 @@ def create():
# skip unnecessary update caused by selection drop
return
det_data = pyzebra.read_detector_data(new[0], cami_meta)
index_spinner.value = 0
index_spinner.high = det_data["data"].shape[0] - 1
index_slider.end = det_data["data"].shape[0] - 1
zebra_mode = det_data["zebra_mode"]
if zebra_mode == "nb":
metadata_table_source.data.update(geom=["normal beam"])
else: # zebra_mode == "bi"
metadata_table_source.data.update(geom=["bisecting"])
update_image(0)
update_overview_plot()
file_open_button_callback()
file_select = MultiSelect(title="Available .hdf files:", width=210, height=250)
file_select.on_change("value", file_select_callback)
@ -385,12 +464,14 @@ def create():
scanning_motor_range = Range1d(0, 1, bounds=(0, 1))
det_x_range = Range1d(0, IMAGE_W, bounds=(0, IMAGE_W))
gamma_range = Range1d(0, 1, bounds=(0, 1))
overview_plot_x = Plot(
title=Title(text="Projections on X-axis"),
x_range=det_x_range,
y_range=frame_range,
extra_x_ranges={"gamma": gamma_range},
extra_y_ranges={"scanning_motor": scanning_motor_range},
plot_height=400,
plot_height=450,
plot_width=IMAGE_PLOT_W - 3,
)
@ -404,6 +485,9 @@ def create():
# ---- axes
overview_plot_x.add_layout(LinearAxis(axis_label="Coordinate X, pix"), place="below")
overview_plot_x.add_layout(
LinearAxis(x_range_name="gamma", axis_label="Gamma, deg"), place="above"
)
overview_plot_x.add_layout(
LinearAxis(axis_label="Frame", major_label_orientation="vertical"), place="left"
)
@ -423,12 +507,14 @@ def create():
)
det_y_range = Range1d(0, IMAGE_H, bounds=(0, IMAGE_H))
nu_range = Range1d(0, 1, bounds=(0, 1))
overview_plot_y = Plot(
title=Title(text="Projections on Y-axis"),
x_range=det_y_range,
y_range=frame_range,
extra_x_ranges={"nu": nu_range},
extra_y_ranges={"scanning_motor": scanning_motor_range},
plot_height=400,
plot_height=450,
plot_width=IMAGE_PLOT_H + 22,
)
@ -442,6 +528,7 @@ def create():
# ---- axes
overview_plot_y.add_layout(LinearAxis(axis_label="Coordinate Y, pix"), place="below")
overview_plot_y.add_layout(LinearAxis(x_range_name="nu", axis_label="Nu, deg"), place="above")
overview_plot_y.add_layout(
LinearAxis(
y_range_name="scanning_motor",
@ -634,9 +721,32 @@ def create():
index_position=None,
)
detcenter_table_source = ColumnDataSource(dict(gamma=[], omega=[], chi=[], phi=[], nu=[]))
detcenter_table = DataTable(
source=detcenter_table_source,
columns=[
TableColumn(field="gamma", title="Gamma", formatter=num_formatter, width=70),
TableColumn(field="omega", title="Omega", formatter=num_formatter, width=70),
TableColumn(field="chi", title="Chi", formatter=num_formatter, width=70),
TableColumn(field="phi", title="Phi", formatter=num_formatter, width=70),
TableColumn(field="nu", title="Nu", formatter=num_formatter, width=70),
],
height=150,
width=350,
autosize_mode="none",
index_position=None,
)
def add_event_button_callback():
p0 = [1.0, 0.0, 1.0]
maxfev = 100000
pyzebra.fit_event(
det_data,
int(np.floor(frame_range.start)),
int(np.ceil(frame_range.end)),
int(np.floor(det_y_range.start)),
int(np.ceil(det_y_range.end)),
int(np.floor(det_x_range.start)),
int(np.ceil(det_x_range.end)),
)
wave = det_data["wave"]
ddist = det_data["ddist"]
@ -651,25 +761,12 @@ def create():
scan_motor = det_data["scan_motor"]
var_angle = det_data[scan_motor]
x0 = int(np.floor(det_x_range.start))
xN = int(np.ceil(det_x_range.end))
y0 = int(np.floor(det_y_range.start))
yN = int(np.ceil(det_y_range.end))
fr0 = int(np.floor(frame_range.start))
frN = int(np.ceil(frame_range.end))
data_roi = det_data["data"][fr0:frN, y0:yN, x0:xN]
snr_cnts = det_data["fit"]["snr"]
frC = det_data["fit"]["frame"]
cnts = np.sum(data_roi, axis=(1, 2))
coeff, _ = curve_fit(gauss, range(len(cnts)), cnts, p0=p0, maxfev=maxfev)
m = cnts.mean()
sd = cnts.std()
snr_cnts = np.where(sd == 0, 0, m / sd)
frC = fr0 + coeff[1]
var_F = var_angle[math.floor(frC)]
var_C = var_angle[math.ceil(frC)]
frStep = frC - math.floor(frC)
var_F = var_angle[int(np.floor(frC))]
var_C = var_angle[int(np.ceil(frC))]
frStep = frC - np.floor(frC)
var_step = var_C - var_F
var_p = var_F + var_step * frStep
@ -684,15 +781,13 @@ def create():
elif scan_motor == "phi":
phi = var_p
intensity = coeff[1] * abs(coeff[2] * var_step) * math.sqrt(2) * math.sqrt(np.pi)
intensity = det_data["fit"]["intensity"]
x_pos = det_data["fit"]["x_pos"]
y_pos = det_data["fit"]["y_pos"]
projX = np.sum(data_roi, axis=(0, 1))
coeff, _ = curve_fit(gauss, range(len(projX)), projX, p0=p0, maxfev=maxfev)
x_pos = x0 + coeff[1]
projY = np.sum(data_roi, axis=(0, 2))
coeff, _ = curve_fit(gauss, range(len(projY)), projY, p0=p0, maxfev=maxfev)
y_pos = y0 + coeff[1]
if det_data["zebra_mode"] == "nb":
chi = None
phi = None
events_data["wave"].append(wave)
events_data["ddist"].append(ddist)
@ -710,7 +805,7 @@ def create():
events_table_source.data = events_data
add_event_button = Button(label="Add spind event", width=145)
add_event_button = Button(label="Add peak center", width=145)
add_event_button.on_click(add_event_button_callback)
def remove_event_button_callback():
@ -721,7 +816,7 @@ def create():
events_table_source.data = events_data
remove_event_button = Button(label="Remove spind event", width=145)
remove_event_button = Button(label="Remove peak center", width=145)
remove_event_button.on_click(remove_event_button_callback)
metadata_table_source = ColumnDataSource(dict(geom=[""], temp=[None], mf=[None]))
@ -739,7 +834,23 @@ def create():
)
# Final layout
import_layout = column(data_source, upload_div, upload_button, file_select)
peak_tables = Tabs(
tabs=[
Panel(child=events_table, title="Actual peak center"),
Panel(child=detcenter_table, title="Peak in the detector center"),
]
)
import_layout = column(
data_source,
upload_cami_div,
upload_cami_button,
upload_hdf_div,
upload_hdf_button,
file_select,
file_open_button,
)
layout_image = column(gridplot([[proj_v, None], [plot, proj_h]], merge_tools=False))
colormap_layout = column(
colormap,
@ -751,7 +862,7 @@ def create():
layout_controls = column(
row(metadata_table, index_spinner, column(Spacer(height=25), index_slider)),
row(column(add_event_button, remove_event_button), events_table),
row(column(add_event_button, remove_event_button), peak_tables),
)
layout_overview = column(
@ -772,17 +883,6 @@ def create():
return Panel(child=tab_layout, title="hdf viewer")
def gauss(x, *p):
"""Defines Gaussian function
Args:
A - amplitude, mu - position of the center, sigma - width
Returns:
Gaussian function
"""
A, mu, sigma = p
return A * np.exp(-((x - mu) ** 2) / (2.0 * sigma ** 2))
def calculate_hkl(det_data, index):
h = np.empty(shape=(IMAGE_H, IMAGE_W))
k = np.empty(shape=(IMAGE_H, IMAGE_W))
@ -815,15 +915,10 @@ def calculate_hkl(det_data, index):
def calculate_pol(det_data, index):
gamma = np.empty(shape=(IMAGE_H, IMAGE_W))
nu = np.empty(shape=(IMAGE_H, IMAGE_W))
ddist = det_data["ddist"]
gammad = det_data["gamma"][index]
nud = det_data["nu"]
for xi in np.arange(IMAGE_W):
for yi in np.arange(IMAGE_H):
gamma[yi, xi], nu[yi, xi] = pyzebra.det2pol(ddist, gammad, nud, xi, yi)
yi, xi = np.ogrid[:IMAGE_H, :IMAGE_W]
gamma, nu = pyzebra.det2pol(ddist, gammad, nud, xi, yi)
return gamma, nu

View File

@ -11,6 +11,7 @@ from bokeh.layouts import column, row
from bokeh.models import (
BasicTicker,
Button,
CellEditor,
CheckboxEditor,
CheckboxGroup,
ColumnDataSource,
@ -26,6 +27,7 @@ from bokeh.models import (
Legend,
Line,
LinearAxis,
LinearColorMapper,
MultiLine,
MultiSelect,
NumberEditor,
@ -33,6 +35,7 @@ from bokeh.models import (
PanTool,
Plot,
RadioGroup,
Range1d,
ResetTool,
Scatter,
Select,
@ -45,8 +48,7 @@ from bokeh.models import (
WheelZoomTool,
Whisker,
)
from bokeh.palettes import Category10, Turbo256
from bokeh.transform import linear_cmap
from bokeh.palettes import Category10, Plasma256
from scipy import interpolate
import pyzebra
@ -110,105 +112,142 @@ def create():
def _init_datatable():
scan_list = [s["idx"] for s in det_data]
export = [s["export"] for s in det_data]
if param_select.value == "user defined":
param = [None] * len(det_data)
else:
param = [scan[param_select.value] for scan in det_data]
file_list = []
for scan in det_data:
file_list.append(os.path.basename(scan["original_filename"]))
scan_table_source.data.update(
file=file_list,
scan=scan_list,
param=[None] * len(scan_list),
fit=[0] * len(scan_list),
export=[True] * len(scan_list),
file=file_list, scan=scan_list, param=param, fit=[0] * len(scan_list), export=export,
)
scan_table_source.selected.indices = []
scan_table_source.selected.indices = [0]
scan_motor_select.options = det_data[0]["scan_motors"]
scan_motor_select.value = det_data[0]["scan_motor"]
param_select.value = "user defined"
merge_options = [(str(i), f"{i} ({idx})") for i, idx in enumerate(scan_list)]
merge_from_select.options = merge_options
merge_from_select.value = merge_options[0][0]
file_select = MultiSelect(title="Available .ccl/.dat files:", width=210, height=250)
def file_open_button_callback():
nonlocal det_data
for f_ind, f_path in enumerate(file_select.value):
new_data = []
for f_path in file_select.value:
with open(f_path) as file:
base, ext = os.path.splitext(os.path.basename(f_path))
file_data = pyzebra.parse_1D(file, ext)
f_name = os.path.basename(f_path)
base, ext = os.path.splitext(f_name)
try:
file_data = pyzebra.parse_1D(file, ext)
except:
print(f"Error loading {f_name}")
continue
pyzebra.normalize_dataset(file_data, monitor_spinner.value)
if f_ind == 0: # first file
det_data = file_data
pyzebra.merge_duplicates(det_data)
if not new_data: # first file
new_data = file_data
pyzebra.merge_duplicates(new_data)
js_data.data.update(fname=[base])
else:
pyzebra.merge_datasets(det_data, file_data)
pyzebra.merge_datasets(new_data, file_data)
_init_datatable()
append_upload_button.disabled = False
if new_data:
det_data = new_data
_init_datatable()
append_upload_button.disabled = False
file_open_button = Button(label="Open New", width=100, disabled=True)
file_open_button.on_click(file_open_button_callback)
def file_append_button_callback():
file_data = []
for f_path in file_select.value:
with open(f_path) as file:
_, ext = os.path.splitext(f_path)
file_data = pyzebra.parse_1D(file, ext)
f_name = os.path.basename(f_path)
_, ext = os.path.splitext(f_name)
try:
file_data = pyzebra.parse_1D(file, ext)
except:
print(f"Error loading {f_name}")
continue
pyzebra.normalize_dataset(file_data, monitor_spinner.value)
pyzebra.merge_datasets(det_data, file_data)
_init_datatable()
if file_data:
_init_datatable()
file_append_button = Button(label="Append", width=100, disabled=True)
file_append_button.on_click(file_append_button_callback)
def upload_button_callback(_attr, _old, new):
def upload_button_callback(_attr, _old, _new):
nonlocal det_data
det_data = []
for f_str, f_name in zip(new, upload_button.filename):
new_data = []
for f_str, f_name in zip(upload_button.value, upload_button.filename):
with io.StringIO(base64.b64decode(f_str).decode()) as file:
base, ext = os.path.splitext(f_name)
file_data = pyzebra.parse_1D(file, ext)
try:
file_data = pyzebra.parse_1D(file, ext)
except:
print(f"Error loading {f_name}")
continue
pyzebra.normalize_dataset(file_data, monitor_spinner.value)
if not det_data: # first file
det_data = file_data
pyzebra.merge_duplicates(det_data)
if not new_data: # first file
new_data = file_data
pyzebra.merge_duplicates(new_data)
js_data.data.update(fname=[base])
else:
pyzebra.merge_datasets(det_data, file_data)
pyzebra.merge_datasets(new_data, file_data)
_init_datatable()
append_upload_button.disabled = False
if new_data:
det_data = new_data
_init_datatable()
append_upload_button.disabled = False
upload_div = Div(text="or upload new .ccl/.dat files:", margin=(5, 5, 0, 5))
upload_button = FileInput(accept=".ccl,.dat", multiple=True, width=200)
upload_button.on_change("value", upload_button_callback)
# for on_change("value", ...) or on_change("filename", ...),
# see https://github.com/bokeh/bokeh/issues/11461
upload_button.on_change("filename", upload_button_callback)
def append_upload_button_callback(_attr, _old, new):
for f_str, f_name in zip(new, append_upload_button.filename):
def append_upload_button_callback(_attr, _old, _new):
file_data = []
for f_str, f_name in zip(append_upload_button.value, append_upload_button.filename):
with io.StringIO(base64.b64decode(f_str).decode()) as file:
_, ext = os.path.splitext(f_name)
file_data = pyzebra.parse_1D(file, ext)
try:
file_data = pyzebra.parse_1D(file, ext)
except:
print(f"Error loading {f_name}")
continue
pyzebra.normalize_dataset(file_data, monitor_spinner.value)
pyzebra.merge_datasets(det_data, file_data)
_init_datatable()
if file_data:
_init_datatable()
append_upload_div = Div(text="append extra files:", margin=(5, 5, 0, 5))
append_upload_button = FileInput(accept=".ccl,.dat", multiple=True, width=200, disabled=True)
append_upload_button.on_change("value", append_upload_button_callback)
# for on_change("value", ...) or on_change("filename", ...),
# see https://github.com/bokeh/bokeh/issues/11461
append_upload_button.on_change("filename", append_upload_button_callback)
def monitor_spinner_callback(_attr, _old, new):
if det_data:
pyzebra.normalize_dataset(det_data, new)
_update_plot()
_update_single_scan_plot()
_update_overview()
monitor_spinner = Spinner(title="Monitor:", mode="int", value=100_000, low=1, width=145)
monitor_spinner.on_change("value", monitor_spinner_callback)
@ -217,18 +256,21 @@ def create():
if det_data:
for scan in det_data:
scan["scan_motor"] = new
_update_plot()
_update_single_scan_plot()
_update_overview()
scan_motor_select = Select(title="Scan motor:", options=[], width=145)
scan_motor_select.on_change("value", scan_motor_select_callback)
def _update_table():
fit_ok = [(1 if "fit" in scan else 0) for scan in det_data]
scan_table_source.data.update(fit=fit_ok)
export = [scan["export"] for scan in det_data]
if param_select.value == "user defined":
param = [None] * len(det_data)
else:
param = [scan[param_select.value] for scan in det_data]
def _update_plot():
_update_single_scan_plot()
_update_overview()
scan_table_source.data.update(fit=fit_ok, export=export, param=param)
def _update_single_scan_plot():
scan = _get_selected_scan()
@ -296,23 +338,30 @@ def create():
ov_plot_mline_source.data.update(xs=xs, ys=ys, param=param, color=color_palette(len(xs)))
if y:
mapper["transform"].low = np.min([np.min(y) for y in ys])
mapper["transform"].high = np.max([np.max(y) for y in ys])
ov_param_plot_scatter_source.data.update(x=x, y=y, param=par)
ov_param_plot_scatter_source.data.update(x=x, y=y)
if y:
interp_f = interpolate.interp2d(x, y, par)
x1, x2 = min(x), max(x)
y1, y2 = min(y), max(y)
image = interp_f(
np.linspace(x1, x2, ov_param_plot.inner_width // 10),
np.linspace(y1, y2, ov_param_plot.inner_height // 10),
assume_sorted=True,
grid_x, grid_y = np.meshgrid(
np.linspace(x1, x2, ov_param_plot.inner_width),
np.linspace(y1, y2, ov_param_plot.inner_height),
)
image = interpolate.griddata((x, y), par, (grid_x, grid_y))
ov_param_plot_image_source.data.update(
image=[image], x=[x1], y=[y1], dw=[x2 - x1], dh=[y2 - y1]
)
x_range = ov_param_plot.x_range
x_range.start, x_range.end = x1, x2
x_range.reset_start, x_range.reset_end = x1, x2
x_range.bounds = (x1, x2)
y_range = ov_param_plot.y_range
y_range.start, y_range.end = y1, y2
y_range.reset_start, y_range.reset_end = y1, y2
y_range.bounds = (y1, y2)
else:
ov_param_plot_image_source.data.update(image=[], x=[], y=[], dw=[], dh=[])
@ -349,7 +398,7 @@ def create():
plot_scatter_source = ColumnDataSource(dict(x=[0], y=[0], y_upper=[0], y_lower=[0]))
plot_scatter = plot.add_glyph(
plot_scatter_source, Scatter(x="x", y="y", line_color="steelblue")
plot_scatter_source, Scatter(x="x", y="y", line_color="steelblue", fill_color="steelblue")
)
plot.add_layout(Whisker(source=plot_scatter_source, base="x", upper="y_upper", lower="y_lower"))
@ -407,9 +456,7 @@ def create():
ov_plot.toolbar.logo = None
# Overview perams plot
ov_param_plot = Plot(
x_range=DataRange1d(), y_range=DataRange1d(), plot_height=450, plot_width=700
)
ov_param_plot = Plot(x_range=Range1d(), y_range=Range1d(), plot_height=450, plot_width=700)
ov_param_plot.add_layout(LinearAxis(axis_label="Param"), place="left")
ov_param_plot.add_layout(LinearAxis(axis_label="Scan motor"), place="below")
@ -417,16 +464,16 @@ def create():
ov_param_plot.add_layout(Grid(dimension=0, ticker=BasicTicker()))
ov_param_plot.add_layout(Grid(dimension=1, ticker=BasicTicker()))
color_mapper = LinearColorMapper(palette=Plasma256)
ov_param_plot_image_source = ColumnDataSource(dict(image=[], x=[], y=[], dw=[], dh=[]))
ov_param_plot.add_glyph(
ov_param_plot_image_source, Image(image="image", x="x", y="y", dw="dw", dh="dh")
ov_param_plot_image_source,
Image(image="image", x="x", y="y", dw="dw", dh="dh", color_mapper=color_mapper),
)
ov_param_plot_scatter_source = ColumnDataSource(dict(x=[], y=[], param=[]))
mapper = linear_cmap(field_name="param", palette=Turbo256, low=0, high=50)
ov_param_plot_scatter_source = ColumnDataSource(dict(x=[], y=[]))
ov_param_plot.add_glyph(
ov_param_plot_scatter_source,
Scatter(x="x", y="y", line_color=mapper, fill_color=mapper, size=10),
ov_param_plot_scatter_source, Scatter(x="x", y="y", marker="dot", size=15),
)
ov_param_plot.add_tools(PanTool(), WheelZoomTool(), ResetTool())
@ -482,9 +529,15 @@ def create():
# skip unnecessary update caused by selection drop
return
_update_plot()
_update_single_scan_plot()
def scan_table_source_callback(_attr, _old, _new):
def scan_table_source_callback(_attr, _old, new):
# unfortunately, we don't know if the change comes from data update or user input
# also `old` and `new` are the same for non-scalars
for scan, export in zip(det_data, new["export"]):
scan["export"] = export
_update_overview()
_update_param_plot()
_update_preview()
scan_table_source = ColumnDataSource(dict(file=[], scan=[], param=[], fit=[], export=[]))
@ -494,28 +547,50 @@ def create():
scan_table = DataTable(
source=scan_table_source,
columns=[
TableColumn(field="file", title="file", width=150),
TableColumn(field="scan", title="scan", width=50),
TableColumn(field="file", title="file", editor=CellEditor(), width=150),
TableColumn(field="scan", title="scan", editor=CellEditor(), width=50),
TableColumn(field="param", title="param", editor=NumberEditor(), width=50),
TableColumn(field="fit", title="Fit", width=50),
TableColumn(field="fit", title="Fit", editor=CellEditor(), width=50),
TableColumn(field="export", title="Export", editor=CheckboxEditor(), width=50),
],
width=410, # +60 because of the index column
height=350,
editable=True,
autosize_mode="none",
)
merge_from_select = Select(title="scan:", width=145)
def merge_button_callback():
scan_into = _get_selected_scan()
scan_from = det_data[int(merge_from_select.value)]
if scan_into is scan_from:
print("WARNING: Selected scans for merging are identical")
return
pyzebra.merge_scans(scan_into, scan_from)
_update_table()
_update_single_scan_plot()
_update_overview()
merge_button = Button(label="Merge into current", width=145)
merge_button.on_click(merge_button_callback)
def restore_button_callback():
pyzebra.restore_scan(_get_selected_scan())
_update_table()
_update_single_scan_plot()
_update_overview()
restore_button = Button(label="Restore scan", width=145)
restore_button.on_click(restore_button_callback)
def _get_selected_scan():
return det_data[scan_table_source.selected.indices[0]]
def param_select_callback(_attr, _old, new):
if new == "user defined":
param = [None] * len(det_data)
else:
param = [scan[new] for scan in det_data]
scan_table_source.data["param"] = param
_update_param_plot()
def param_select_callback(_attr, _old, _new):
_update_table()
param_select = Select(
title="Parameter:",
@ -624,7 +699,7 @@ def create():
fitparams_table = DataTable(
source=fitparams_table_source,
columns=[
TableColumn(field="param", title="Parameter"),
TableColumn(field="param", title="Parameter", editor=CellEditor()),
TableColumn(field="value", title="Value", editor=NumberEditor()),
TableColumn(field="vary", title="Vary", editor=CheckboxEditor()),
TableColumn(field="min", title="Min", editor=NumberEditor()),
@ -645,8 +720,8 @@ def create():
fit_output_textinput = TextAreaInput(title="Fit results:", width=750, height=200)
def proc_all_button_callback():
for scan, export in zip(det_data, scan_table_source.data["export"]):
if export:
for scan in det_data:
if scan["export"]:
pyzebra.fit_scan(
scan, fit_params, fit_from=fit_from_spinner.value, fit_to=fit_to_spinner.value
)
@ -656,7 +731,8 @@ def create():
lorentz=lorentz_checkbox.active,
)
_update_plot()
_update_single_scan_plot()
_update_overview()
_update_table()
for scan in det_data:
@ -665,7 +741,6 @@ def create():
fit_param_select.options = options
fit_param_select.value = options[0]
break
_update_param_plot()
proc_all_button = Button(label="Process All", button_type="primary", width=145)
proc_all_button.on_click(proc_all_button_callback)
@ -681,7 +756,8 @@ def create():
lorentz=lorentz_checkbox.active,
)
_update_plot()
_update_single_scan_plot()
_update_overview()
_update_table()
for scan in det_data:
@ -690,7 +766,6 @@ def create():
fit_param_select.options = options
fit_param_select.value = options[0]
break
_update_param_plot()
proc_button = Button(label="Process Current", width=145)
proc_button.on_click(proc_button_callback)
@ -707,12 +782,10 @@ def create():
temp_file = temp_dir + "/temp"
export_data = []
param_data = []
for s, p, export in zip(
det_data, scan_table_source.data["param"], scan_table_source.data["export"]
):
if export:
export_data.append(s)
param_data.append(p)
for scan, param in zip(det_data, scan_table_source.data["param"]):
if scan["export"] and param:
export_data.append(scan)
param_data.append(param)
pyzebra.export_param_study(export_data, param_data, temp_file)
@ -742,7 +815,11 @@ def create():
column(fit_to_spinner, proc_button, proc_all_button),
)
scan_layout = column(scan_table, row(monitor_spinner, scan_motor_select, param_select))
scan_layout = column(
scan_table,
row(monitor_spinner, scan_motor_select, param_select),
row(column(Spacer(height=19), row(restore_button, merge_button)), merge_from_select),
)
import_layout = column(
file_select,

View File

@ -144,6 +144,7 @@ def parse_1D(fileobj, data_type):
continue
s = {}
s["export"] = True
# first line
for param, (param_name, param_type) in zip(line.split(), ccl_first_line):
@ -169,7 +170,7 @@ def parse_1D(fileobj, data_type):
while len(counts) < s["n_points"]:
counts.extend(map(float, next(fileobj).split()))
s["counts"] = np.array(counts)
s["counts_err"] = np.sqrt(s["counts"])
s["counts_err"] = np.sqrt(np.maximum(s["counts"], 1))
if s["h"].is_integer() and s["k"].is_integer() and s["l"].is_integer():
s["h"], s["k"], s["l"] = map(int, (s["h"], s["k"], s["l"]))
@ -182,6 +183,7 @@ def parse_1D(fileobj, data_type):
metadata["gamma"] = metadata["twotheta"]
s = defaultdict(list)
s["export"] = True
match = re.search("Scanning Variables: (.*), Steps: (.*)", next(fileobj))
motors = [motor.lower() for motor in match.group(1).split(", ")]
@ -190,6 +192,7 @@ def parse_1D(fileobj, data_type):
match = re.search("(.*) Points, Mode: (.*), Preset (.*)", next(fileobj))
if match.group(2) != "Monitor":
raise Exception("Unknown mode in dat file.")
s["n_points"] = int(match.group(1))
s["monitor"] = float(match.group(3))
col_names = list(map(str.lower, next(fileobj).split()))
@ -205,7 +208,7 @@ def parse_1D(fileobj, data_type):
for name in col_names:
s[name] = np.array(s[name])
s["counts_err"] = np.sqrt(s["counts"])
s["counts_err"] = np.sqrt(np.maximum(s["counts"], 1))
s["scan_motors"] = []
for motor, step in zip(motors, steps):
@ -303,6 +306,63 @@ def export_1D(data, path, export_target, hkl_precision=2):
out_file.writelines(content)
def export_ccl_compare(data1, data2, path, export_target, hkl_precision=2):
"""Exports compare data in the .comm/.incomm format for fullprof or .col/.incol format for jana.
Scans with integer/real hkl values are saved in .comm/.incomm or .col/.incol files
correspondingly. If no scans are present for a particular output format, that file won't be
created.
"""
if export_target not in EXPORT_TARGETS:
raise ValueError(f"Unknown export target: {export_target}.")
zebra_mode = data1[0]["zebra_mode"]
exts = EXPORT_TARGETS[export_target]
file_content = {ext: [] for ext in exts}
for scan1, scan2 in zip(data1, data2):
if "fit" not in scan1:
continue
idx_str = f"{scan1['idx']:6}"
h, k, l = scan1["h"], scan1["k"], scan1["l"]
hkl_are_integers = isinstance(h, int) # if True, other indices are of type 'int' too
if hkl_are_integers:
hkl_str = f"{h:4}{k:4}{l:4}"
else:
hkl_str = f"{h:8.{hkl_precision}f}{k:8.{hkl_precision}f}{l:8.{hkl_precision}f}"
area_n1, area_s1 = scan1["area"]
area_n2, area_s2 = scan2["area"]
area_n = area_n1 - area_n2
area_s = np.sqrt(area_s1 ** 2 + area_s2 ** 2)
area_str = f"{area_n:10.2f}{area_s:10.2f}"
ang_str = ""
for angle, _ in CCL_ANGLES[zebra_mode]:
if angle == scan1["scan_motor"]:
angle_center = (np.min(scan1[angle]) + np.max(scan1[angle])) / 2
else:
angle_center = scan1[angle]
if angle == "twotheta" and export_target == "jana":
angle_center /= 2
ang_str = ang_str + f"{angle_center:8g}"
if export_target == "jana":
ang_str = ang_str + f"{scan1['temp']:8}" + f"{scan1['monitor']:8}"
ref = file_content[exts[0]] if hkl_are_integers else file_content[exts[1]]
ref.append(idx_str + hkl_str + area_str + ang_str + "\n")
for ext, content in file_content.items():
if content:
with open(path + ext, "w") as out_file:
out_file.writelines(content)
def export_param_study(data, param_data, path):
file_content = []
for scan, param in zip(data, param_data):

View File

@ -1,7 +1,7 @@
import os
import numpy as np
from lmfit.models import GaussianModel, LinearModel, PseudoVoigtModel, VoigtModel
from lmfit.models import Gaussian2dModel, GaussianModel, LinearModel, PseudoVoigtModel, VoigtModel
from scipy.integrate import simpson, trapezoid
from .ccl_io import CCL_ANGLES
@ -68,6 +68,12 @@ def _parameters_match(scan1, scan2):
def merge_datasets(dataset_into, dataset_from):
scan_motors_into = dataset_into[0]["scan_motors"]
scan_motors_from = dataset_from[0]["scan_motors"]
if scan_motors_into != scan_motors_from:
print(f"Scan motors mismatch between datasets: {scan_motors_into} vs {scan_motors_from}")
return
merged = np.zeros(len(dataset_from), dtype=np.bool)
for scan_into in dataset_into:
for ind, scan_from in enumerate(dataset_from):
@ -80,7 +86,6 @@ def merge_datasets(dataset_into, dataset_from):
def merge_scans(scan_into, scan_from):
# TODO: does it need to be "scan_motor" instead of omega for a generalized solution?
if "init_scan" not in scan_into:
scan_into["init_scan"] = scan_into.copy()
@ -92,32 +97,43 @@ def merge_scans(scan_into, scan_from):
scan_into["merged_scans"].append(scan_from)
if (
scan_into["omega"].shape == scan_from["omega"].shape
and np.max(np.abs(scan_into["omega"] - scan_from["omega"])) < 0.0005
):
counts_tmp = 0
counts_err_tmp = 0
scan_motor = scan_into["scan_motor"] # the same as scan_from["scan_motor"]
for scan in [scan_into["init_scan"], *scan_into["merged_scans"]]:
counts_tmp += scan["counts"]
counts_err_tmp += scan["counts_err"] ** 2
pos_all = np.array([])
val_all = np.array([])
err_all = np.array([])
for scan in [scan_into["init_scan"], *scan_into["merged_scans"]]:
pos_all = np.append(pos_all, scan[scan_motor])
val_all = np.append(val_all, scan["counts"])
err_all = np.append(err_all, scan["counts_err"] ** 2)
scan_into["counts"] = counts_tmp / (1 + len(scan_into["merged_scans"]))
scan_into["counts_err"] = np.sqrt(counts_err_tmp)
sort_index = np.argsort(pos_all)
pos_all = pos_all[sort_index]
val_all = val_all[sort_index]
err_all = err_all[sort_index]
else:
omega = np.concatenate((scan_into["omega"], scan_from["omega"]))
counts = np.concatenate((scan_into["counts"], scan_from["counts"]))
counts_err = np.concatenate((scan_into["counts_err"], scan_from["counts_err"]))
pos_tmp = pos_all[:1]
val_tmp = val_all[:1]
err_tmp = err_all[:1]
num_tmp = np.array([1])
for pos, val, err in zip(pos_all[1:], val_all[1:], err_all[1:]):
if pos - pos_tmp[-1] < 0.0005:
# the repeated motor position
val_tmp[-1] += val
err_tmp[-1] += err
num_tmp[-1] += 1
else:
# a new motor position
pos_tmp = np.append(pos_tmp, pos)
val_tmp = np.append(val_tmp, val)
err_tmp = np.append(err_tmp, err)
num_tmp = np.append(num_tmp, 1)
index = np.argsort(omega)
scan_into[scan_motor] = pos_tmp
scan_into["counts"] = val_tmp / num_tmp
scan_into["counts_err"] = np.sqrt(err_tmp) / num_tmp
scan_into["omega"] = omega[index]
scan_into["counts"] = counts[index]
scan_into["counts_err"] = counts_err[index]
scan_from["active"] = False
scan_from["export"] = False
fname1 = os.path.basename(scan_into["original_filename"])
fname2 = os.path.basename(scan_from["original_filename"])
@ -127,12 +143,16 @@ def merge_scans(scan_into, scan_from):
def restore_scan(scan):
if "merged_scans" in scan:
for merged_scan in scan["merged_scans"]:
merged_scan["active"] = True
merged_scan["export"] = True
if "init_scan" in scan:
tmp = scan["init_scan"]
scan.clear()
scan.update(tmp)
# force scan export to True, otherwise in the sequence of incorrectly merged scans
# a <- b <- c the scan b will be restored with scan["export"] = False if restoring executed
# in the same order, i.e. restore a -> restore b
scan["export"] = True
def fit_scan(scan, model_dict, fit_from=None, fit_to=None):
@ -147,6 +167,10 @@ def fit_scan(scan, model_dict, fit_from=None, fit_to=None):
# apply fitting range
fit_ind = (fit_from <= x_fit) & (x_fit <= fit_to)
if not np.any(fit_ind):
print(f"No data in fit range for scan {scan['idx']}")
return
y_fit = y_fit[fit_ind]
y_err = y_err[fit_ind]
x_fit = x_fit[fit_ind]
@ -196,11 +220,13 @@ def fit_scan(scan, model_dict, fit_from=None, fit_to=None):
else:
model += _model
weights = [1 / y_err if y_err != 0 else 1 for y_err in y_err]
scan["fit"] = model.fit(y_fit, x=x_fit, weights=weights)
scan["fit"] = model.fit(y_fit, x=x_fit, weights=1 / y_err)
def get_area(scan, area_method, lorentz):
if "fit" not in scan:
return
if area_method not in AREA_METHODS:
raise ValueError(f"Unknown area method: {area_method}.")
@ -209,12 +235,8 @@ def get_area(scan, area_method, lorentz):
area_s = 0
for name, param in scan["fit"].params.items():
if "amplitude" in name:
if param.stderr is None:
area_v = np.nan
area_s = np.nan
else:
area_v += param.value
area_s += param.stderr
area_v += np.nan if param.value is None else param.value
area_s += np.nan if param.stderr is None else param.stderr
else: # area_method == "int_area"
y_val = scan["counts"]
@ -237,3 +259,31 @@ def get_area(scan, area_method, lorentz):
area_s = np.abs(area_s * corr_factor)
scan["area"] = (area_v, area_s)
def fit_event(scan, fr_from, fr_to, y_from, y_to, x_from, x_to):
data_roi = scan["data"][fr_from:fr_to, y_from:y_to, x_from:x_to]
model = GaussianModel()
fr = np.arange(fr_from, fr_to)
counts_per_fr = np.sum(data_roi, axis=(1, 2))
params = model.guess(counts_per_fr, fr)
result = model.fit(counts_per_fr, x=fr, params=params)
frC = result.params["center"].value
intensity = result.params["height"].value
counts_std = counts_per_fr.std()
counts_mean = counts_per_fr.mean()
snr = 0 if counts_std == 0 else counts_mean / counts_std
model = Gaussian2dModel()
xs, ys = np.meshgrid(np.arange(x_from, x_to), np.arange(y_from, y_to))
xs = xs.flatten()
ys = ys.flatten()
counts = np.sum(data_roi, axis=0).flatten()
params = model.guess(counts, xs, ys)
result = model.fit(counts, x=xs, y=ys, params=params)
xC = result.params["centerx"].value
yC = result.params["centery"].value
scan["fit"] = {"frame": frC, "x_pos": xC, "y_pos": yC, "intensity": intensity, "snr": snr}

View File

@ -103,12 +103,16 @@ def read_detector_data(filepath, cami_meta=None):
det_data["name"] = h5f["/entry1/sample/name"][0].decode()
det_data["cell"] = h5f["/entry1/sample/cell"][:]
for var in ("omega", "gamma", "nu", "chi", "phi"):
if abs(det_data[var][0] - det_data[var][-1]) > 0.1:
det_data["scan_motor"] = var
break
if n == 1:
# a default motor for a single frame file
det_data["scan_motor"] = "omega"
else:
raise ValueError("No angles that vary")
for var in ("omega", "gamma", "nu", "chi", "phi"):
if abs(det_data[var][0] - det_data[var][-1]) > 0.1:
det_data["scan_motor"] = var
break
else:
raise ValueError("No angles that vary")
# optional parameters
if "/entry1/sample/magnetic_field" in h5f:
@ -132,7 +136,7 @@ def read_detector_data(filepath, cami_meta=None):
if "detector parameters" in cami_meta:
cami_meta_detparam = cami_meta["detector parameters"]
if "dist1" in cami_meta_detparam:
det_data["ddist"] = cami_meta_detparam["dist1"]
if "dist2" in cami_meta_detparam:
det_data["ddist"] = cami_meta_detparam["dist2"]
return det_data

View File

@ -372,6 +372,17 @@ def ang2hkl(wave, ddist, gammad, om, ch, ph, nud, ub, x, y):
return hkl
def ang_proc(wave, ddist, gammad, om, ch, ph, nud, x, y):
"""Utility function to calculate ch, ph, ga, om
"""
ga, nu = det2pol(ddist, gammad, nud, x, y)
z1 = z1frmd(wave, ga, om, ch, ph, nu)
ch2, ph2 = eqchph(z1)
ch, ph, ga, om = fixdnu(wave, z1, ch2, ph2, nu)
return ch, ph, ga, om
def gauss(x, *p):
"""Defines Gaussian function