24 Commits
0.1.2 ... 0.2.0

SHA1 Message Date
9bd959e656 Updating for version 0.2.0 2020-11-23 16:56:26 +01:00
65b28fffc6 Fix deps 2020-11-23 16:56:19 +01:00
0d8a30b995 Fix real indices display and export 2020-11-23 16:45:39 +01:00
c602a3df2e Adapt param study to work with a set of dat files 2020-11-23 16:07:31 +01:00
b19b70caae Add scan_number to dat-file metadata 2020-11-23 10:41:11 +01:00
b08f3c27db Use unified_merge for datasets merging 2020-11-18 13:40:38 +01:00
e15f9c9c3e Clarify terms nb and bi in labels 2020-11-18 13:24:19 +01:00
b62573fa09 Add 'param study' tab based on 'ccl integrate' tab 2020-11-18 09:48:50 +01:00
f7f016cf1c First draft of new merge function
This is a first shot at the new merge function. I didn't want to rewrite the previous one before we agree on this, since I think there will be some changes, so I would like to discuss it first. Since we agreed not to do it as before (i.e. first scan everything and then merge or add), I've tried to make these functions recursive. I haven't tested it much; I would like us to agree on whether this is a good way to write it.
2020-11-17 15:25:09 +01:00
8c8715b041 Fix #19 2020-11-12 20:35:33 +01:00
008761e661 Update ccl_io.py
Added correction for area_s as well, as requested by Romain
2020-11-10 18:39:25 +01:00
4343d6e2b6 Refactor area method calculation 2020-11-10 16:20:04 +01:00
11e1a6b60c Avoid num_of_peaks intermediate 2020-11-10 15:32:13 +01:00
8be637a7f3 Pin bokeh/2.3 2020-11-10 08:55:48 +01:00
4fbfe21e99 Replace fitparam widgets with DataTable solution 2020-11-09 15:50:50 +01:00
b5b77d165a Make sure to put tags only on master branch 2020-11-09 11:18:04 +01:00
0e176cb2f3 Deploy only on tags 2020-11-07 00:18:42 +01:00
2ba0964e07 Replace travis-ci with github actions 2020-11-06 23:41:52 +01:00
b31f359ee7 Updating for version 0.1.3 2020-11-06 15:04:19 +01:00
63150a4b19 Build only one noarch package with python >=3.6 2020-11-06 15:03:48 +01:00
8d779b11f6 No need for tag message in release script 2020-11-06 13:26:41 +01:00
b2d603b3c5 Add matplotlib and pandas as deps 2020-11-06 11:44:28 +01:00
2ddb0a668a Switch build to noarch package 2020-11-06 10:29:43 +01:00
de81f2fd9f Move cli.py into app folder 2020-11-06 10:28:43 +01:00
15 changed files with 1066 additions and 180 deletions

25
.github/workflows/deployment.yaml vendored Normal file

@@ -0,0 +1,25 @@
name: Deployment
on:
  push:
    tags:
      - '*'
jobs:
  publish-conda-package:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v2
      - name: Prepare
        run: |
          $CONDA/bin/conda install --quiet --yes conda-build anaconda-client
          $CONDA/bin/conda config --append channels conda-forge
          $CONDA/bin/conda config --set anaconda_upload yes
      - name: Build and upload
        env:
          ANACONDA_TOKEN: ${{ secrets.ANACONDA_TOKEN }}
        run: |
          $CONDA/bin/conda build --token $ANACONDA_TOKEN conda-recipe

.travis.yml Deleted file

@@ -1,33 +0,0 @@
language: python
python:
  - 3.6
  - 3.7
  - 3.8

# Build only tagged commits
if: tag IS present

before_install:
  - wget https://repo.continuum.io/miniconda/Miniconda3-latest-Linux-x86_64.sh -O miniconda.sh
  - bash miniconda.sh -b -p $HOME/miniconda
  - export PATH="$HOME/miniconda/bin:$PATH"
  - conda config --append channels conda-forge
  - conda config --set always_yes yes
  - conda config --set anaconda_upload no

install:
  - conda update -q conda
  - conda install -q python=$TRAVIS_PYTHON_VERSION conda-build anaconda-client

script:
  - conda build conda-recipe

deploy:
  provider: script
  script: anaconda -t $ANACONDA_TOKEN upload $HOME/miniconda/conda-bld/**/pyzebra-*.tar.bz2
  on:
    branch: master
    tags: true

notifications:
  email: false

2
.vscode/launch.json vendored

@@ -5,7 +5,7 @@
        "name": "pyzebra",
        "type": "python",
        "request": "launch",
        "program": "${workspaceFolder}/pyzebra/cli.py",
        "program": "${workspaceFolder}/pyzebra/app/cli.py",
        "console": "internalConsole",
        "env": {},
    },

2
conda-recipe/bld.bat Normal file

@@ -0,0 +1,2 @@
"%PYTHON%" setup.py install --single-version-externally-managed --record=record.txt
if errorlevel 1 exit 1

conda-recipe/meta.yaml

@@ -8,20 +8,23 @@ source:
  path: ..

build:
  noarch: python
  number: 0
  entry_points:
    - pyzebra = pyzebra.cli:main
    - pyzebra = pyzebra.app.cli:main

requirements:
  build:
    - python
    - python >=3.6
    - setuptools
  run:
    - python
    - python >=3.6
    - numpy
    - scipy
    - pandas
    - h5py
    - bokeh
    - bokeh =2.2
    - matplotlib
    - numba
    - lmfit
    - uncertainties

9
make_release.py Normal file → Executable file

@@ -3,14 +3,19 @@
import argparse
import os
import re
import subprocess


def main():
    branch = subprocess.check_output("git rev-parse --abbrev-ref HEAD", shell=True).decode().strip()
    if branch != "master":
        print("Aborting, not on 'master' branch.")
        return

    filepath = "pyzebra/__init__.py"

    parser = argparse.ArgumentParser()
    parser.add_argument("level", type=str, choices=["patch", "minor", "major"])
    parser.add_argument("tag_msg", type=str, help="tag message")
    args = parser.parse_args()

    with open(filepath) as f:
@@ -35,7 +40,7 @@ def main():
        f.write(re.sub(r'__version__ = "(.*?)"', f'__version__ = "{new_version}"', file_content))

    os.system(f"git commit {filepath} -m 'Updating for version {new_version}'")
    os.system(f"git tag -a {new_version} -m '{args.tag_msg}'")
    os.system(f"git tag -a {new_version} -m 'Release {new_version}'")


if __name__ == "__main__":
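Note: the version bump in make_release.py boils down to one regex rewrite of pyzebra/__init__.py. A minimal standalone sketch of that step, assuming the elided part of main() bumps levels in the usual semver way (the file content below is inlined for illustration; the real script reads and writes the file):

import re

# Inlined stand-in for the contents of pyzebra/__init__.py.
file_content = '__version__ = "0.1.2"'

# Parse the current version and bump the "minor" level, as for this release.
major, minor, patch = map(int, re.search(r'__version__ = "(.*?)"', file_content).group(1).split("."))
new_version = f"{major}.{minor + 1}.0"  # -> "0.2.0"

# The same substitution make_release.py writes back before committing and tagging.
print(re.sub(r'__version__ = "(.*?)"', f'__version__ = "{new_version}"', file_content))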

pyzebra/__init__.py

@@ -3,7 +3,7 @@ from pyzebra.ccl_findpeaks import ccl_findpeaks
from pyzebra.fit2 import fitccl
from pyzebra.h5 import *
from pyzebra.ccl_io import load_1D, parse_1D, export_comm
from pyzebra.param_study_moduls import add_dict, auto, merge, scan_dict
from pyzebra.merge_function import unified_merge
from pyzebra.xtal import *
__version__ = "0.1.2"
__version__ = "0.2.0"

pyzebra/app/app.py

@@ -9,6 +9,7 @@ from bokeh.models import Tabs, TextAreaInput
import panel_ccl_integrate
import panel_hdf_anatric
import panel_hdf_viewer
import panel_param_study

doc = curdoc()
@@ -27,10 +28,11 @@ bokeh_log_textareainput = TextAreaInput(title="server output:", height=150)
tab_hdf_viewer = panel_hdf_viewer.create()
tab_hdf_anatric = panel_hdf_anatric.create()
tab_ccl_integrate = panel_ccl_integrate.create()
tab_param_study = panel_param_study.create()

doc.add_root(
    column(
        Tabs(tabs=[tab_hdf_viewer, tab_hdf_anatric, tab_ccl_integrate]),
        Tabs(tabs=[tab_hdf_viewer, tab_hdf_anatric, tab_ccl_integrate, tab_param_study]),
        row(stdout_textareainput, bokeh_log_textareainput, sizing_mode="scale_both"),
    )
)

pyzebra/app/cli.py

@@ -18,7 +18,7 @@ def main():
    This is a wrapper around a bokeh server that provides an interface to launch the application,
    bundled with the pyzebra package.
    """
    app_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "app", "app.py")
    app_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "app.py")

    parser = argparse.ArgumentParser(
        prog="pyzebra", formatter_class=argparse.ArgumentDefaultsHelpFormatter

pyzebra/app/panel_ccl_integrate.py

@@ -2,6 +2,7 @@ import base64
import io
import os
import tempfile
import types
from copy import deepcopy
import numpy as np
@@ -16,10 +17,13 @@ from bokeh.models import (
DataRange1d,
DataTable,
Div,
Dropdown,
FileInput,
Grid,
Line,
LinearAxis,
MultiSelect,
NumberEditor,
Panel,
PanTool,
Plot,
@@ -39,6 +43,7 @@ from bokeh.models import (
)
import pyzebra
from pyzebra.ccl_io import AREA_METHODS
javaScript = """
@@ -61,6 +66,7 @@ PROPOSAL_PATH = "/afs/psi.ch/project/sinqdata/2020/zebra/"
def create():
det_data = {}
fit_params = {}
peak_pos_textinput_lock = False
js_data = ColumnDataSource(data=dict(cont=[], ext=[]))
@@ -80,6 +86,8 @@ def create():
scan_list = list(det_data["scan"].keys())
hkl = [
f'{int(m["h_index"])} {int(m["k_index"])} {int(m["l_index"])}'
if det_data["meta"]["indices"] == "hkl"
else f'{m["h_index"]} {m["k_index"]} {m["l_index"]}'
for m in det_data["scan"].values()
]
scan_table_source.data.update(
@@ -120,9 +128,7 @@ def create():
_, ext = os.path.splitext(append_upload_button.filename)
append_data = pyzebra.parse_1D(file, ext)
added = pyzebra.add_dict(det_data, append_data)
scan_result = pyzebra.auto(pyzebra.scan_dict(added))
det_data = pyzebra.merge(added, added, scan_result)
pyzebra.unified_merge(det_data, append_data)
_init_datatable()
@@ -130,7 +136,7 @@ def create():
append_upload_button.on_change("value", append_upload_button_callback)
def _update_table():
num_of_peaks = [scan.get("num_of_peaks", 0) for scan in det_data["scan"].values()]
num_of_peaks = [len(scan.get("peak_indexes", [])) for scan in det_data["scan"].values()]
fit_ok = [(1 if "fit" in scan else 0) for scan in det_data["scan"].values()]
scan_table_source.data.update(peaks=num_of_peaks, fit=fit_ok)
@@ -143,7 +149,7 @@ def create():
plot_scatter_source.data.update(x=x, y=y, y_upper=y + np.sqrt(y), y_lower=y - np.sqrt(y))
num_of_peaks = scan.get("num_of_peaks")
num_of_peaks = len(scan.get("peak_indexes", []))
if num_of_peaks is not None and num_of_peaks > 0:
peak_indexes = scan["peak_indexes"]
if len(peak_indexes) == 1:
@@ -283,7 +289,6 @@ def create():
if new is not None and not peak_pos_textinput_lock:
scan = _get_selected_scan()
scan["num_of_peaks"] = 1
peak_ind = (np.abs(scan["om"] - float(new))).argmin()
scan["peak_indexes"] = np.array([peak_ind], dtype=np.int64)
scan["peak_heights"] = np.array([scan["smooth_peaks"][peak_ind]])
@@ -301,56 +306,111 @@ def create():
window_size_spinner = Spinner(title="Window size:", value=7, step=2, low=1, default_size=145)
poly_order_spinner = Spinner(title="Poly order:", value=3, low=0, default_size=145)
centre_guess = Spinner(default_size=100)
centre_vary = Toggle(default_size=100, active=True)
centre_min = Spinner(default_size=100)
centre_max = Spinner(default_size=100)
sigma_guess = Spinner(default_size=100)
sigma_vary = Toggle(default_size=100, active=True)
sigma_min = Spinner(default_size=100)
sigma_max = Spinner(default_size=100)
ampl_guess = Spinner(default_size=100)
ampl_vary = Toggle(default_size=100, active=True)
ampl_min = Spinner(default_size=100)
ampl_max = Spinner(default_size=100)
slope_guess = Spinner(default_size=100)
slope_vary = Toggle(default_size=100, active=True)
slope_min = Spinner(default_size=100)
slope_max = Spinner(default_size=100)
offset_guess = Spinner(default_size=100)
offset_vary = Toggle(default_size=100, active=True)
offset_min = Spinner(default_size=100)
offset_max = Spinner(default_size=100)
integ_from = Spinner(title="Integrate from:", default_size=145)
integ_to = Spinner(title="to:", default_size=145)
def fitparam_reset_button_callback():
centre_guess.value = None
centre_vary.active = True
centre_min.value = None
centre_max.value = None
sigma_guess.value = None
sigma_vary.active = True
sigma_min.value = None
sigma_max.value = None
ampl_guess.value = None
ampl_vary.active = True
ampl_min.value = None
ampl_max.value = None
slope_guess.value = None
slope_vary.active = True
slope_min.value = None
slope_max.value = None
offset_guess.value = None
offset_vary.active = True
offset_min.value = None
offset_max.value = None
integ_from.value = None
integ_to.value = None
...
fitparam_reset_button = Button(label="Reset to defaults", default_size=145)
fitparam_reset_button = Button(label="Reset to defaults", default_size=145, disabled=True)
fitparam_reset_button.on_click(fitparam_reset_button_callback)
def fitparams_add_dropdown_callback(click):
new_tag = str(fitparams_select.tags[0]) # bokeh requires (str, str) for MultiSelect options
fitparams_select.options.append((new_tag, click.item))
fit_params[new_tag] = fitparams_factory(click.item)
fitparams_select.tags[0] += 1
fitparams_add_dropdown = Dropdown(
label="Add fit function",
menu=[
("Background", "background"),
("Gauss", "gauss"),
("Voigt", "voigt"),
("Pseudo Voigt", "pseudovoigt"),
("Pseudo Voigt1", "pseudovoigt1"),
],
default_size=145,
disabled=True,
)
fitparams_add_dropdown.on_click(fitparams_add_dropdown_callback)
def fitparams_select_callback(_attr, old, new):
# Avoid selection of multiple indices (via Shift+Click or Ctrl+Click)
if len(new) > 1:
# drop selection to the previous one
fitparams_select.value = old
return
if len(old) > 1:
# skip unnecessary update caused by selection drop
return
if new:
fitparams_table_source.data.update(fit_params[new[0]])
else:
fitparams_table_source.data.update(dict(param=[], guess=[], vary=[], min=[], max=[]))
fitparams_select = MultiSelect(options=[], height=120, default_size=145)
fitparams_select.tags = [0]
fitparams_select.on_change("value", fitparams_select_callback)
def fitparams_remove_button_callback():
if fitparams_select.value:
sel_tag = fitparams_select.value[0]
del fit_params[sel_tag]
for elem in fitparams_select.options:
if elem[0] == sel_tag:
fitparams_select.options.remove(elem)
break
fitparams_select.value = []
fitparams_remove_button = Button(label="Remove fit function", default_size=145, disabled=True)
fitparams_remove_button.on_click(fitparams_remove_button_callback)
def fitparams_factory(function):
if function == "background":
params = ["slope", "offset"]
elif function == "gauss":
params = ["center", "sigma", "amplitude"]
elif function == "voigt":
params = ["center", "sigma", "amplitude", "gamma"]
elif function == "pseudovoigt":
params = ["center", "sigma", "amplitude", "fraction"]
elif function == "pseudovoigt1":
params = ["center", "g_sigma", "l_sigma", "amplitude", "fraction"]
else:
raise ValueError("Unknown fit function")
n = len(params)
fitparams = dict(
param=params, guess=[None] * n, vary=[True] * n, min=[None] * n, max=[None] * n,
)
return fitparams
fitparams_table_source = ColumnDataSource(dict(param=[], guess=[], vary=[], min=[], max=[]))
fitparams_table = DataTable(
source=fitparams_table_source,
columns=[
TableColumn(field="param", title="Parameter"),
TableColumn(field="guess", title="Guess", editor=NumberEditor()),
TableColumn(field="vary", title="Vary", editor=CheckboxEditor()),
TableColumn(field="min", title="Min", editor=NumberEditor()),
TableColumn(field="max", title="Max", editor=NumberEditor()),
],
height=200,
width=350,
index_position=None,
editable=True,
auto_edit=True,
)
# start with `background` and `gauss` fit functions added
fitparams_add_dropdown_callback(types.SimpleNamespace(item="background"))
fitparams_add_dropdown_callback(types.SimpleNamespace(item="gauss"))
fit_output_textinput = TextAreaInput(title="Fit results:", width=450, height=400)
def _get_peakfind_params():
@@ -385,34 +445,10 @@ def create():
def _get_fit_params():
return dict(
guess=[
centre_guess.value,
sigma_guess.value,
ampl_guess.value,
slope_guess.value,
offset_guess.value,
],
vary=[
centre_vary.active,
sigma_vary.active,
ampl_vary.active,
slope_vary.active,
offset_vary.active,
],
constraints_min=[
centre_min.value,
sigma_min.value,
ampl_min.value,
slope_min.value,
offset_min.value,
],
constraints_max=[
centre_max.value,
sigma_max.value,
ampl_max.value,
slope_max.value,
offset_max.value,
],
guess=fit_params["1"]["guess"] + fit_params["0"]["guess"],
vary=fit_params["1"]["vary"] + fit_params["0"]["vary"],
constraints_min=fit_params["1"]["min"] + fit_params["0"]["min"],
constraints_max=fit_params["1"]["max"] + fit_params["0"]["max"],
numfit_min=integ_from.value,
numfit_max=integ_to.value,
binning=bin_size_spinner.value,
@@ -441,10 +477,10 @@ def create():
fit_button.on_click(fit_button_callback)
def area_method_radiobutton_callback(_attr, _old, new):
det_data["meta"]["area_method"] = ("fit", "integ")[new]
det_data["meta"]["area_method"] = AREA_METHODS[new]
area_method_radiobutton = RadioButtonGroup(
labels=["Fit", "Integral"], active=0, default_size=145
labels=["Fit area", "Int area"], active=0, default_size=145
)
area_method_radiobutton.on_change("active", area_method_radiobutton_callback)
@@ -508,31 +544,9 @@ def create():
row(peakfind_button, peakfind_all_button),
)
div_1 = Div(text="Guess:")
div_2 = Div(text="Vary:")
div_3 = Div(text="Min:")
div_4 = Div(text="Max:")
div_5 = Div(text="Gauss Centre:", margin=[5, 5, -5, 5])
div_6 = Div(text="Gauss Sigma:", margin=[5, 5, -5, 5])
div_7 = Div(text="Gauss Ampl.:", margin=[5, 5, -5, 5])
div_8 = Div(text="Slope:", margin=[5, 5, -5, 5])
div_9 = Div(text="Offset:", margin=[5, 5, -5, 5])
fitpeak_controls = row(
column(
Spacer(height=36),
div_1,
Spacer(height=12),
div_2,
Spacer(height=12),
div_3,
Spacer(height=12),
div_4,
),
column(div_5, centre_guess, centre_vary, centre_min, centre_max),
column(div_6, sigma_guess, sigma_vary, sigma_min, sigma_max),
column(div_7, ampl_guess, ampl_vary, ampl_min, ampl_max),
column(div_8, slope_guess, slope_vary, slope_min, slope_max),
column(div_9, offset_guess, offset_vary, offset_min, offset_max),
column(fitparams_add_dropdown, fitparams_select, fitparams_remove_button),
fitparams_table,
Spacer(width=20),
column(
row(integ_from, integ_to),
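Note: for reference, a standalone sketch of the fitparams_factory contract introduced above (a reduced copy, lifted out of create() so it runs on its own). It also shows why _get_fit_params() concatenates tag "1" (gauss) before tag "0" (background): the two startup calls to fitparams_add_dropdown_callback assign tags in insertion order.

def fitparams_factory(function):
    # Reduced copy of the factory above: one row per fit parameter,
    # backing the editable DataTable columns.
    if function == "background":
        params = ["slope", "offset"]
    elif function == "gauss":
        params = ["center", "sigma", "amplitude"]
    else:
        raise ValueError("Unknown fit function")
    n = len(params)
    return dict(param=params, guess=[None] * n, vary=[True] * n, min=[None] * n, max=[None] * n)

# Tags are assigned in insertion order: "0" = background, "1" = gauss.
fit_params = {"0": fitparams_factory("background"), "1": fitparams_factory("gauss")}
guess = fit_params["1"]["guess"] + fit_params["0"]["guess"]  # gauss params first, then background
print(guess)  # [None, None, None, None, None]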

pyzebra/app/panel_hdf_viewer.py

@@ -404,7 +404,7 @@ def create():
    colormap.on_change("value", colormap_callback)
    colormap.value = "plasma"

    radio_button_group = RadioButtonGroup(labels=["nb", "nb_bi"], active=0)
    radio_button_group = RadioButtonGroup(labels=["normal beam", "bisecting"], active=0)

    STEP = 1
    # ---- colormap auto toggle button
@@ -506,8 +506,8 @@ def create():
    def hkl_button_callback():
        index = index_spinner.value
        setup_type = "nb_bi" if radio_button_group.active else "nb"
        h, k, l = calculate_hkl(det_data, index, setup_type)
        geometry = "bi" if radio_button_group.active else "nb"
        h, k, l = calculate_hkl(det_data, index, geometry)
        image_source.data.update(h=[h], k=[k], l=[l])

    hkl_button = Button(label="Calculate hkl (slow)")
@@ -553,7 +553,8 @@ def create():
            proj_display_min_spinner,
        ),
    )
    hkl_layout = column(radio_button_group, hkl_button)
    geometry_div = Div(text="Geometry:", margin=[5, 5, -5, 5])
    hkl_layout = column(column(geometry_div, radio_button_group), hkl_button)
    params_layout = row(magnetic_field_spinner, temperature_spinner)

    layout_controls = row(
@@ -586,7 +587,7 @@ def create():
    return Panel(child=tab_layout, title="hdf viewer")


def calculate_hkl(det_data, index, setup_type="nb_bi"):
def calculate_hkl(det_data, index, geometry):
    h = np.empty(shape=(IMAGE_H, IMAGE_W))
    k = np.empty(shape=(IMAGE_H, IMAGE_W))
    l = np.empty(shape=(IMAGE_H, IMAGE_W))
@@ -598,14 +599,14 @@ def calculate_hkl(det_data, index, setup_type="nb_bi"):
    nud = det_data["tlt_angle"]
    ub = det_data["UB"]

    if setup_type == "nb_bi":
    if geometry == "bi":
        ch = det_data["chi_angle"][index]
        ph = det_data["phi_angle"][index]
    elif setup_type == "nb":
    elif geometry == "nb":
        ch = 0
        ph = 0
    else:
        raise ValueError(f"Unknown setup type '{setup_type}'")
        raise ValueError(f"Unknown geometry type '{geometry}'")

    for xi in np.arange(IMAGE_W):
        for yi in np.arange(IMAGE_H):
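Note: the geometry rename above ("nb_bi"/"nb" to "bi"/"nb") only changes how chi and phi are chosen inside calculate_hkl. A hedged sketch of just that dispatch, with a hypothetical helper name and a minimal det_data stand-in:

import numpy as np

def chi_phi(det_data, index, geometry):
    # "bi" (bisecting): take the recorded chi/phi of the given frame;
    # "nb" (normal beam): chi and phi are fixed at zero.
    if geometry == "bi":
        return det_data["chi_angle"][index], det_data["phi_angle"][index]
    elif geometry == "nb":
        return 0, 0
    raise ValueError(f"Unknown geometry type '{geometry}'")

det_data = {"chi_angle": np.array([10.0]), "phi_angle": np.array([20.0])}
print(chi_phi(det_data, 0, "bi"))  # (10.0, 20.0)
print(chi_phi(det_data, 0, "nb"))  # (0, 0)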

pyzebra/app/panel_param_study.py Normal file

@@ -0,0 +1,574 @@
import base64
import io
import os
import tempfile
import types
from copy import deepcopy
import numpy as np
from bokeh.layouts import column, row
from bokeh.models import (
Asterisk,
BasicTicker,
Button,
CheckboxEditor,
ColumnDataSource,
CustomJS,
DataRange1d,
DataTable,
Div,
Dropdown,
FileInput,
Grid,
Line,
LinearAxis,
MultiSelect,
NumberEditor,
Panel,
PanTool,
Plot,
RadioButtonGroup,
ResetTool,
Scatter,
Select,
Spacer,
Span,
Spinner,
TableColumn,
TextAreaInput,
TextInput,
Toggle,
WheelZoomTool,
Whisker,
)
import pyzebra
from pyzebra.ccl_io import AREA_METHODS
javaScript = """
setTimeout(function() {
const filename = 'output' + js_data.data['ext']
const blob = new Blob([js_data.data['cont']], {type: 'text/plain'})
const link = document.createElement('a');
document.body.appendChild(link);
const url = window.URL.createObjectURL(blob);
link.href = url;
link.download = filename;
link.click();
window.URL.revokeObjectURL(url);
document.body.removeChild(link);
}, 500);
"""
PROPOSAL_PATH = "/afs/psi.ch/project/sinqdata/2020/zebra/"
def create():
det_data = {}
fit_params = {}
peak_pos_textinput_lock = False
js_data = ColumnDataSource(data=dict(cont=[], ext=[]))
def proposal_textinput_callback(_attr, _old, new):
ccl_path = os.path.join(PROPOSAL_PATH, new.strip())
ccl_file_list = []
for file in os.listdir(ccl_path):
if file.endswith(".ccl"):
ccl_file_list.append((os.path.join(ccl_path, file), file))
file_select.options = ccl_file_list
file_select.value = ccl_file_list[0][0]
proposal_textinput = TextInput(title="Enter proposal number:", default_size=145, disabled=True)
proposal_textinput.on_change("value", proposal_textinput_callback)
def _init_datatable():
scan_table_source.data.update(
file=list(det_data.keys()),
param=[""] * len(det_data),
peaks=[0] * len(det_data),
fit=[0] * len(det_data),
export=[True] * len(det_data),
)
scan_table_source.selected.indices = []
scan_table_source.selected.indices = [0]
def file_select_callback(_attr, _old, new):
nonlocal det_data
with open(new) as file:
_, ext = os.path.splitext(new)
det_data = pyzebra.parse_1D(file, ext)
_init_datatable()
file_select = Select(title="Available .dat files", disabled=True)
file_select.on_change("value", file_select_callback)
def upload_button_callback(_attr, _old, new):
nonlocal det_data
det_data = {}
for f_str, f_name in zip(new, upload_button.filename):
with io.StringIO(base64.b64decode(f_str).decode()) as file:
_, ext = os.path.splitext(f_name)
det_data[f_name] = pyzebra.parse_1D(file, ext)
_init_datatable()
upload_button = FileInput(accept=".dat", multiple=True)
upload_button.on_change("value", upload_button_callback)
def append_upload_button_callback(_attr, _old, new):
nonlocal det_data
for f_str, f_name in zip(new, append_upload_button.filename):
with io.StringIO(base64.b64decode(f_str).decode()) as file:
_, ext = os.path.splitext(f_name)
det_data[f_name] = pyzebra.parse_1D(file, ext)
_init_datatable()
append_upload_button = FileInput(accept=".dat", multiple=True)
append_upload_button.on_change("value", append_upload_button_callback)
def _update_table():
num_of_peaks = [len(scan["scan"][1].get("peak_indexes", [])) for scan in det_data.values()]
fit_ok = [(1 if "fit" in scan["scan"][1] else 0) for scan in det_data.values()]
scan_table_source.data.update(peaks=num_of_peaks, fit=fit_ok)
def _update_plot(scan):
nonlocal peak_pos_textinput_lock
peak_pos_textinput_lock = True
y = scan["Counts"]
x = scan["om"]
plot_scatter_source.data.update(x=x, y=y, y_upper=y + np.sqrt(y), y_lower=y - np.sqrt(y))
num_of_peaks = len(scan.get("peak_indexes", []))
if num_of_peaks is not None and num_of_peaks > 0:
peak_indexes = scan["peak_indexes"]
if len(peak_indexes) == 1:
peak_pos_textinput.value = str(x[peak_indexes[0]])
else:
peak_pos_textinput.value = str([x[ind] for ind in peak_indexes])
plot_peak_source.data.update(x=x[peak_indexes], y=scan["peak_heights"])
plot_line_smooth_source.data.update(x=x, y=scan["smooth_peaks"])
else:
peak_pos_textinput.value = None
plot_peak_source.data.update(x=[], y=[])
plot_line_smooth_source.data.update(x=[], y=[])
peak_pos_textinput_lock = False
fit = scan.get("fit")
if fit is not None:
x = scan["fit"]["x_fit"]
plot_gauss_source.data.update(x=x, y=scan["fit"]["comps"]["gaussian"])
plot_bkg_source.data.update(x=x, y=scan["fit"]["comps"]["background"])
params = fit["result"].params
fit_output_textinput.value = (
"Gaussian: centre = %9.4f, sigma = %9.4f, area = %9.4f \n"
"background: slope = %9.4f, intercept = %9.4f \n"
"Int. area = %9.4f +/- %9.4f \n"
"fit area = %9.4f +/- %9.4f \n"
"ratio((fit-int)/fit) = %9.4f"
% (
params["g_cen"].value,
params["g_width"].value,
params["g_amp"].value,
params["slope"].value,
params["intercept"].value,
fit["int_area"].n,
fit["int_area"].s,
params["g_amp"].value,
params["g_amp"].stderr,
(params["g_amp"].value - fit["int_area"].n) / params["g_amp"].value,
)
)
numfit_min, numfit_max = fit["numfit"]
if numfit_min is None:
numfit_min_span.location = None
else:
numfit_min_span.location = x[numfit_min]
if numfit_max is None:
numfit_max_span.location = None
else:
numfit_max_span.location = x[numfit_max]
else:
plot_gauss_source.data.update(x=[], y=[])
plot_bkg_source.data.update(x=[], y=[])
fit_output_textinput.value = ""
numfit_min_span.location = None
numfit_max_span.location = None
# Main plot
plot = Plot(x_range=DataRange1d(), y_range=DataRange1d(), plot_height=400, plot_width=700)
plot.add_layout(LinearAxis(axis_label="Counts"), place="left")
plot.add_layout(LinearAxis(axis_label="Omega"), place="below")
plot.add_layout(Grid(dimension=0, ticker=BasicTicker()))
plot.add_layout(Grid(dimension=1, ticker=BasicTicker()))
plot_scatter_source = ColumnDataSource(dict(x=[0], y=[0], y_upper=[0], y_lower=[0]))
plot.add_glyph(plot_scatter_source, Scatter(x="x", y="y", line_color="steelblue"))
plot.add_layout(Whisker(source=plot_scatter_source, base="x", upper="y_upper", lower="y_lower"))
plot_line_smooth_source = ColumnDataSource(dict(x=[0], y=[0]))
plot.add_glyph(
plot_line_smooth_source, Line(x="x", y="y", line_color="steelblue", line_dash="dashed")
)
plot_gauss_source = ColumnDataSource(dict(x=[0], y=[0]))
plot.add_glyph(plot_gauss_source, Line(x="x", y="y", line_color="red", line_dash="dashed"))
plot_bkg_source = ColumnDataSource(dict(x=[0], y=[0]))
plot.add_glyph(plot_bkg_source, Line(x="x", y="y", line_color="green", line_dash="dashed"))
plot_peak_source = ColumnDataSource(dict(x=[], y=[]))
plot.add_glyph(plot_peak_source, Asterisk(x="x", y="y", size=10, line_color="red"))
numfit_min_span = Span(location=None, dimension="height", line_dash="dashed")
plot.add_layout(numfit_min_span)
numfit_max_span = Span(location=None, dimension="height", line_dash="dashed")
plot.add_layout(numfit_max_span)
plot.add_tools(PanTool(), WheelZoomTool(), ResetTool())
plot.toolbar.logo = None
# Scan select
def scan_table_select_callback(_attr, old, new):
if not new:
# skip empty selections
return
# Avoid selection of multiple indices (via Shift+Click or Ctrl+Click)
if len(new) > 1:
# drop selection to the previous one
scan_table_source.selected.indices = old
return
if len(old) > 1:
# skip unnecessary update caused by selection drop
return
f_name = scan_table_source.data["file"][new[0]]
_update_plot(det_data[f_name]["scan"][1])
scan_table_source = ColumnDataSource(dict(file=[], param=[], peaks=[], fit=[], export=[]))
scan_table = DataTable(
source=scan_table_source,
columns=[
TableColumn(field="file", title="file", width=150),
TableColumn(field="param", title="param", width=50),
TableColumn(field="peaks", title="Peaks", width=50),
TableColumn(field="fit", title="Fit", width=50),
TableColumn(field="export", title="Export", editor=CheckboxEditor(), width=50),
],
width=350,
index_position=None,
editable=True,
fit_columns=False,
)
scan_table_source.selected.on_change("indices", scan_table_select_callback)
def _get_selected_scan():
selected_index = scan_table_source.selected.indices[0]
selected_file_name = scan_table_source.data["file"][selected_index]
return det_data[selected_file_name]["scan"][1]
def peak_pos_textinput_callback(_attr, _old, new):
if new is not None and not peak_pos_textinput_lock:
scan = _get_selected_scan()
peak_ind = (np.abs(scan["om"] - float(new))).argmin()
scan["peak_indexes"] = np.array([peak_ind], dtype=np.int64)
scan["peak_heights"] = np.array([scan["smooth_peaks"][peak_ind]])
_update_table()
_update_plot(scan)
peak_pos_textinput = TextInput(title="Peak position:", default_size=145)
peak_pos_textinput.on_change("value", peak_pos_textinput_callback)
peak_int_ratio_spinner = Spinner(
title="Peak intensity ratio:", value=0.8, step=0.01, low=0, high=1, default_size=145
)
peak_prominence_spinner = Spinner(title="Peak prominence:", value=50, low=0, default_size=145)
smooth_toggle = Toggle(label="Smooth curve", default_size=145)
window_size_spinner = Spinner(title="Window size:", value=7, step=2, low=1, default_size=145)
poly_order_spinner = Spinner(title="Poly order:", value=3, low=0, default_size=145)
integ_from = Spinner(title="Integrate from:", default_size=145)
integ_to = Spinner(title="to:", default_size=145)
def fitparam_reset_button_callback():
...
fitparam_reset_button = Button(label="Reset to defaults", default_size=145, disabled=True)
fitparam_reset_button.on_click(fitparam_reset_button_callback)
def fitparams_add_dropdown_callback(click):
new_tag = str(fitparams_select.tags[0]) # bokeh requires (str, str) for MultiSelect options
fitparams_select.options.append((new_tag, click.item))
fit_params[new_tag] = fitparams_factory(click.item)
fitparams_select.tags[0] += 1
fitparams_add_dropdown = Dropdown(
label="Add fit function",
menu=[
("Background", "background"),
("Gauss", "gauss"),
("Voigt", "voigt"),
("Pseudo Voigt", "pseudovoigt"),
("Pseudo Voigt1", "pseudovoigt1"),
],
default_size=145,
)
fitparams_add_dropdown.on_click(fitparams_add_dropdown_callback)
def fitparams_select_callback(_attr, old, new):
# Avoid selection of multiple indices (via Shift+Click or Ctrl+Click)
if len(new) > 1:
# drop selection to the previous one
fitparams_select.value = old
return
if len(old) > 1:
# skip unnecessary update caused by selection drop
return
if new:
fitparams_table_source.data.update(fit_params[new[0]])
else:
fitparams_table_source.data.update(dict(param=[], guess=[], vary=[], min=[], max=[]))
fitparams_select = MultiSelect(options=[], height=120, default_size=145)
fitparams_select.tags = [0]
fitparams_select.on_change("value", fitparams_select_callback)
def fitparams_remove_button_callback():
if fitparams_select.value:
sel_tag = fitparams_select.value[0]
del fit_params[sel_tag]
for elem in fitparams_select.options:
if elem[0] == sel_tag:
fitparams_select.options.remove(elem)
break
fitparams_select.value = []
fitparams_remove_button = Button(label="Remove fit function", default_size=145)
fitparams_remove_button.on_click(fitparams_remove_button_callback)
def fitparams_factory(function):
if function == "background":
params = ["slope", "offset"]
elif function == "gauss":
params = ["center", "sigma", "amplitude"]
elif function == "voigt":
params = ["center", "sigma", "amplitude", "gamma"]
elif function == "pseudovoigt":
params = ["center", "sigma", "amplitude", "fraction"]
elif function == "pseudovoigt1":
params = ["center", "g_sigma", "l_sigma", "amplitude", "fraction"]
else:
raise ValueError("Unknown fit function")
n = len(params)
fitparams = dict(
param=params, guess=[None] * n, vary=[True] * n, min=[None] * n, max=[None] * n,
)
return fitparams
fitparams_table_source = ColumnDataSource(dict(param=[], guess=[], vary=[], min=[], max=[]))
fitparams_table = DataTable(
source=fitparams_table_source,
columns=[
TableColumn(field="param", title="Parameter"),
TableColumn(field="guess", title="Guess", editor=NumberEditor()),
TableColumn(field="vary", title="Vary", editor=CheckboxEditor()),
TableColumn(field="min", title="Min", editor=NumberEditor()),
TableColumn(field="max", title="Max", editor=NumberEditor()),
],
height=200,
width=350,
index_position=None,
editable=True,
auto_edit=True,
)
# start with `background` and `gauss` fit functions added
fitparams_add_dropdown_callback(types.SimpleNamespace(item="background"))
fitparams_add_dropdown_callback(types.SimpleNamespace(item="gauss"))
fit_output_textinput = TextAreaInput(title="Fit results:", width=450, height=400)
def _get_peakfind_params():
return dict(
int_threshold=peak_int_ratio_spinner.value,
prominence=peak_prominence_spinner.value,
smooth=smooth_toggle.active,
window_size=window_size_spinner.value,
poly_order=poly_order_spinner.value,
)
def peakfind_all_button_callback():
peakfind_params = _get_peakfind_params()
for dat_file in det_data.values():
pyzebra.ccl_findpeaks(dat_file["scan"][1], **peakfind_params)
_update_table()
_update_plot(_get_selected_scan())
peakfind_all_button = Button(label="Peak Find All", button_type="primary", default_size=145)
peakfind_all_button.on_click(peakfind_all_button_callback)
def peakfind_button_callback():
scan = _get_selected_scan()
pyzebra.ccl_findpeaks(scan, **_get_peakfind_params())
_update_table()
_update_plot(scan)
peakfind_button = Button(label="Peak Find Current", default_size=145)
peakfind_button.on_click(peakfind_button_callback)
def _get_fit_params():
return dict(
guess=fit_params["1"]["guess"] + fit_params["0"]["guess"],
vary=fit_params["1"]["vary"] + fit_params["0"]["vary"],
constraints_min=fit_params["1"]["min"] + fit_params["0"]["min"],
constraints_max=fit_params["1"]["max"] + fit_params["0"]["max"],
numfit_min=integ_from.value,
numfit_max=integ_to.value,
binning=bin_size_spinner.value,
)
def fit_all_button_callback():
fit_params = _get_fit_params()
for dat_file in det_data.values():
# fit_params are updated inplace within `fitccl`
pyzebra.fitccl(dat_file["scan"][1], **deepcopy(fit_params))
_update_plot(_get_selected_scan())
_update_table()
fit_all_button = Button(label="Fit All", button_type="primary", default_size=145)
fit_all_button.on_click(fit_all_button_callback)
def fit_button_callback():
scan = _get_selected_scan()
pyzebra.fitccl(scan, **_get_fit_params())
_update_plot(scan)
_update_table()
fit_button = Button(label="Fit Current", default_size=145)
fit_button.on_click(fit_button_callback)
def area_method_radiobutton_callback(_attr, _old, new):
det_data["meta"]["area_method"] = AREA_METHODS[new]
area_method_radiobutton = RadioButtonGroup(
labels=["Fit area", "Int area"], active=0, default_size=145, disabled=True
)
area_method_radiobutton.on_change("active", area_method_radiobutton_callback)
bin_size_spinner = Spinner(title="Bin size:", value=1, low=1, step=1, default_size=145)
lorentz_toggle = Toggle(label="Lorentz Correction", default_size=145, disabled=True)
preview_output_textinput = TextAreaInput(
title="Export file preview:", width=450, height=400, disabled=True
)
def preview_output_button_callback():
if det_data["meta"]["indices"] == "hkl":
ext = ".comm"
elif det_data["meta"]["indices"] == "real":
ext = ".incomm"
with tempfile.TemporaryDirectory() as temp_dir:
temp_file = temp_dir + "/temp"
export_data = deepcopy(det_data)
for s, export in zip(scan_table_source.data["scan"], scan_table_source.data["export"]):
if not export:
del export_data["scan"][s]
pyzebra.export_comm(export_data, temp_file, lorentz=lorentz_toggle.active)
with open(f"{temp_file}{ext}") as f:
preview_output_textinput.value = f.read()
preview_output_button = Button(label="Preview file", default_size=220, disabled=True)
preview_output_button.on_click(preview_output_button_callback)
def export_results(det_data):
if det_data["meta"]["indices"] == "hkl":
ext = ".comm"
elif det_data["meta"]["indices"] == "real":
ext = ".incomm"
with tempfile.TemporaryDirectory() as temp_dir:
temp_file = temp_dir + "/temp"
export_data = deepcopy(det_data)
for s, export in zip(scan_table_source.data["scan"], scan_table_source.data["export"]):
if not export:
del export_data["scan"][s]
pyzebra.export_comm(export_data, temp_file, lorentz=lorentz_toggle.active)
with open(f"{temp_file}{ext}") as f:
output_content = f.read()
return output_content, ext
def save_button_callback():
cont, ext = export_results(det_data)
js_data.data.update(cont=[cont], ext=[ext])
save_button = Button(
label="Download file", button_type="success", default_size=220, disabled=True
)
save_button.on_click(save_button_callback)
save_button.js_on_click(CustomJS(args={"js_data": js_data}, code=javaScript))
findpeak_controls = column(
row(peak_pos_textinput, column(Spacer(height=19), smooth_toggle)),
row(peak_int_ratio_spinner, peak_prominence_spinner),
row(window_size_spinner, poly_order_spinner),
row(peakfind_button, peakfind_all_button),
)
fitpeak_controls = row(
column(fitparams_add_dropdown, fitparams_select, fitparams_remove_button),
fitparams_table,
Spacer(width=20),
column(
row(integ_from, integ_to),
row(bin_size_spinner, column(Spacer(height=19), lorentz_toggle)),
row(fitparam_reset_button, area_method_radiobutton),
row(fit_button, fit_all_button),
),
)
export_layout = column(preview_output_textinput, row(preview_output_button, save_button))
upload_div = Div(text="Or upload .dat files:")
append_upload_div = Div(text="append extra .dat files:")
tab_layout = column(
row(proposal_textinput, file_select),
row(
column(Spacer(height=5), upload_div),
upload_button,
column(Spacer(height=5), append_upload_div),
append_upload_button,
),
row(scan_table, plot, Spacer(width=30), fit_output_textinput, export_layout),
row(findpeak_controls, Spacer(width=30), fitpeak_controls),
)
return Panel(child=tab_layout, title="param study")
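Note: a sketch of the data layout this tab assumes, inferred from the callbacks above: one parse_1D result per uploaded .dat file, keyed by file name, each holding a single scan under key 1 (parse_1D now sets scan_number = 1 for dat files). The file names here are hypothetical.

det_data = {
    "scan_01.dat": {"meta": {"data_type": "dat"}, "scan": {1: {"om": [], "Counts": []}}},
    "scan_02.dat": {"meta": {"data_type": "dat"}, "scan": {1: {"om": [], "Counts": []}}},
}

# What _get_selected_scan() returns for the row selected in the DataTable:
selected_file_name = "scan_01.dat"
scan = det_data[selected_file_name]["scan"][1]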

pyzebra/ccl_findpeaks.py

@@ -29,11 +29,6 @@ def ccl_findpeaks(
    window_size - window size for savgol filter, must be odd positive integer
    poly_order = order of the polynomial used in savgol filter, must be positive integer smaller than
    window_size returns: dictionary with following structure:
    D{M34{ 'num_of_peaks': 1, #num of peaks
    'peak_indexes': [20], # index of peaks in omega array
    'peak_heights': [90.], # height of the peaks (if data vere smoothed
    its the heigh of the peaks in smoothed data)
    """
    if not 0 <= int_threshold <= 1:
        int_threshold = 0.8
@@ -75,7 +70,6 @@ def ccl_findpeaks(
    peaks, properties = sc.signal.find_peaks(
        smooth_peaks, height=int_threshold * max(smooth_peaks), prominence=prominence
    )
    scan["num_of_peaks"] = len(peaks)
    scan["peak_indexes"] = peaks
    scan["peak_heights"] = properties["peak_heights"]
    scan["smooth_peaks"] = smooth_peaks  # smoothed curve

pyzebra/ccl_io.py

@@ -55,6 +55,7 @@ META_VARS_FLOAT = (
    "s2hr",
    "s2hl",
)

META_UB_MATRIX = ("ub1j", "ub2j", "ub3j")

CCL_FIRST_LINE = (
@@ -90,6 +91,8 @@ CCL_SECOND_LINE = (
    ("scan_type", str),
)

AREA_METHODS = ("fit_area", "int_area")


def load_1D(filepath):
    """
@@ -182,6 +185,8 @@ def parse_1D(fileobj, data_type):
        except (ValueError, IndexError):
            print("seems hkl is not in title")

        data_cols["om"] = np.array(data_cols["om"])

        data_cols["temperature"] = metadata["temp"]
        try:
            data_cols["mag_field"] = metadata["mf"]
@@ -196,7 +201,8 @@ def parse_1D(fileobj, data_type):
        data_cols["phi_angle"] = metadata["phi"]
        data_cols["nu_angle"] = metadata["nu"]

        scan[1] = dict(data_cols)
        data_cols["scan_number"] = 1
        scan[data_cols["scan_number"]] = dict(data_cols)

    else:
        print("Unknown file extention")
@@ -211,7 +217,7 @@ def parse_1D(fileobj, data_type):
            metadata["indices"] = "real"

    metadata["data_type"] = data_type
    metadata["area_method"] = "fit"
    metadata["area_method"] = AREA_METHODS[0]

    return {"meta": metadata, "scan": scan}
@@ -226,10 +232,8 @@ def export_comm(data, path, lorentz=False):
    zebra_mode = data["meta"]["zebra_mode"]
    if data["meta"]["indices"] == "hkl":
        extension = ".comm"
        padding = [6, 4]
    elif data["meta"]["indices"] == "real":
    else:  # data["meta"]["indices"] == "real":
        extension = ".incomm"
        padding = [4, 6]

    with open(str(path + extension), "w") as out_file:
        for key, scan in data["scan"].items():
@@ -237,34 +241,35 @@ def export_comm(data, path, lorentz=False):
                print("Scan skipped - no fit value for:", key)
                continue

            scan_str = f"{key:>{padding[0]}}"
            h_str = f'{int(scan["h_index"]):{padding[1]}}'
            k_str = f'{int(scan["k_index"]):{padding[1]}}'
            l_str = f'{int(scan["l_index"]):{padding[1]}}'
            scan_str = f"{key:6}"

            if data["meta"]["area_method"] == "fit":
                area = scan["fit"]["fit_area"].n
                sigma_str = f'{scan["fit"]["fit_area"].s:>10.2f}'
            elif data["meta"]["area_method"] == "integ":
                area = scan["fit"]["int_area"].n
                sigma_str = f'{scan["fit"]["int_area"].s:>10.2f}'

            h, k, l = scan["h_index"], scan["k_index"], scan["l_index"]
            if data["meta"]["indices"] == "hkl":
                hkl_str = f"{int(h):6}{int(k):6}{int(l):6}"
            else:  # data["meta"]["indices"] == "real"
                hkl_str = f"{h:8.4g}{k:8.4g}{l:8.4g}"

            area_method = data["meta"]["area_method"]
            area_n = scan["fit"][area_method].n
            area_s = scan["fit"][area_method].s

            # apply lorentz correction to area
            if lorentz:
                if zebra_mode == "bi":
                    twotheta_angle = np.deg2rad(scan["twotheta_angle"])
                    corr_factor = np.sin(twotheta_angle)
                elif zebra_mode == "nb":
                else:  # zebra_mode == "nb":
                    gamma_angle = np.deg2rad(scan["gamma_angle"])
                    nu_angle = np.deg2rad(scan["nu_angle"])
                    corr_factor = np.sin(gamma_angle) * np.cos(nu_angle)
                area = np.abs(area * corr_factor)
                area_n = np.abs(area_n * corr_factor)
                area_s = np.abs(area_s * corr_factor)

            area_str = f"{area:>10.2f}"
            area_str = f"{area_n:10.2f}{area_s:10.2f}"
            ang_str = ""
            for angle, _ in CCL_ANGLES[zebra_mode]:
                ang_str = ang_str + f"{scan[angle]:8}"

            out_file.write(scan_str + h_str + k_str + l_str + area_str + sigma_str + ang_str + "\n")
            out_file.write(scan_str + hkl_str + area_str + ang_str + "\n")
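Note: a worked example of the new fixed-width export line for hkl indices: scan number in 6 characters, each integer index in 6 characters (real indices use 8.4g instead), then fitted area and its sigma in 10.2f each. The values here are made up.

key = 1
h, k, l = 2, 0, -1
area_n, area_s = 123.4567, 8.91

scan_str = f"{key:6}"
hkl_str = f"{int(h):6}{int(k):6}{int(l):6}"
area_str = f"{area_n:10.2f}{area_s:10.2f}"
print(scan_str + hkl_str + area_str)
# "     1     2     0    -1    123.46      8.91"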

294
pyzebra/merge_function.py Normal file

@@ -0,0 +1,294 @@
import numpy as np
import uncertainties as u


def create_tuples(x, y, y_err):
    """Creates tuples for sorting and merging of the data.
    Counts need to be normalized to the monitor beforehand."""
    t = list()
    for i in range(len(x)):
        tup = (x[i], y[i], y_err[i])
        t.append(tup)
    return t


def normalize(scan, monitor):
    """Normalizes the measurement to the monitor; checks if sigma exists, otherwise creates it
    :arg scan : scan to normalize
    :arg monitor : final monitor
    :return counts - normalized counts
    :return sigma - normalized sigma"""
    counts = np.array(scan["Counts"])
    sigma = np.sqrt(counts) if "sigma" not in scan else scan["sigma"]
    monitor_ratio = monitor / scan["monitor"]
    scaled_counts = counts * monitor_ratio
    scaled_sigma = np.array(sigma) * monitor_ratio

    return scaled_counts, scaled_sigma


def merge(scan1, scan2, keep=True, monitor=100000):
    """Merges the two scans and sorts the points; if an om value is the same, the Counts
    value is the average, and the averaging is propagated into sigma.
    :arg scan1 : scan into which the measurement will be merged
    :arg scan2 : scan from which the measurement will be merged
    :arg keep : if True, when the monitors are the same, the monitor is not changed; if False,
        the final monitor is always used
    :arg monitor : final monitor after merging
    note: scan1 and scan2 can be the same scan
    :return scan1 with the merged data"""
    if keep:
        if scan1["monitor"] == scan2["monitor"]:
            monitor = scan1["monitor"]

    # load om and Counts
    x1, x2 = scan1["om"], scan2["om"]
    cor_y1, y_err1 = normalize(scan1, monitor=monitor)
    cor_y2, y_err2 = normalize(scan2, monitor=monitor)
    # create tuples (om, Counts, sigma) for sorting and further processing
    tuple_list = create_tuples(x1, cor_y1, y_err1) + create_tuples(x2, cor_y2, y_err2)
    # sort the list on om and add a (0, 0, 0) tuple to the last position
    sorted_t = sorted(tuple_list, key=lambda tup: tup[0])
    sorted_t.append((0, 0, 0))
    om, Counts, sigma = [], [], []
    seen = list()
    for i in range(len(sorted_t) - 1):
        if sorted_t[i][0] not in seen:
            if sorted_t[i][0] != sorted_t[i + 1][0]:
                om = np.append(om, sorted_t[i][0])
                Counts = np.append(Counts, sorted_t[i][1])
                sigma = np.append(sigma, sorted_t[i][2])
            else:
                om = np.append(om, sorted_t[i][0])
                counts1, counts2 = sorted_t[i][1], sorted_t[i + 1][1]
                sigma1, sigma2 = sorted_t[i][2], sorted_t[i + 1][2]
                count_err1 = u.ufloat(counts1, sigma1)
                count_err2 = u.ufloat(counts2, sigma2)
                avg = (count_err1 + count_err2) / 2
                Counts = np.append(Counts, avg.n)
                sigma = np.append(sigma, avg.s)
                seen.append(sorted_t[i][0])
        else:
            continue

    scan1["om"] = om
    scan1["Counts"] = Counts
    scan1["sigma"] = sigma
    scan1["monitor"] = monitor
    print("merging done")


def check_UB(dict1, dict2, precision=0.01):
    truth_list = list()
    for i in ["ub1j", "ub2j", "ub3j"]:
        for j in range(3):
            if abs(abs(float(dict1["meta"][i][j])) - abs(float(dict2["meta"][i][j]))) < precision:
                truth_list.append(True)
            else:
                truth_list.append(False)

    # print(truth_list)
    if all(truth_list):
        return True
    else:
        return False


def check_zebramode(dict1, dict2):
    if dict1["meta"]["zebra_mode"] == dict2["meta"]["zebra_mode"]:
        return True
    else:
        return False


def check_angles(scan1, scan2, angles, precision):
    truth_list = list()
    for item in angles:
        if abs(abs(scan1[item]) - abs(scan2[item])) <= precision[item]:
            truth_list.append(True)
        else:
            truth_list.append(False)

    if all(truth_list):
        return True
    else:
        return False


def check_temp_mag(scan1, scan2):
    temp_diff = 1
    mag_diff = 0.001
    truth_list = list()
    try:
        if abs(abs(scan1["mag_field"]) - abs(scan2["mag_field"])) <= mag_diff:
            truth_list.append(True)
        else:
            truth_list.append(False)
    except KeyError:
        print("mag_field missing")

    try:
        if abs(abs(scan1["temperature"]) - abs(scan2["temperature"])) <= temp_diff:
            truth_list.append(True)
        else:
            truth_list.append(False)
    except KeyError:
        print("temperature missing")

    if all(truth_list):
        return True
    else:
        return False


def merge_dups(dictionary, angles):
    precision = {
        "twotheta_angle": 0.1,
        "chi_angle": 0.1,
        "nu_angle": 0.1,
        "phi_angle": 0.05,
        "omega_angle": 0.05,
        "gamma_angle": 0.05,
    }

    for i in list(dictionary["scan"]):
        for j in list(dictionary["scan"]):
            if i == j:
                continue
            else:
                # print(i, j)
                if check_angles(dictionary["scan"][i], dictionary["scan"][j], angles, precision):
                    merge(dictionary["scan"][i], dictionary["scan"][j])
                    print("merged %d with %d" % (i, j))

                    del dictionary["scan"][j]
                    merge_dups(dictionary, angles)
                    break
        else:
            continue
        break


def add_scan(dict1, dict2, scan_to_add):
    max_scan = np.max(list(dict1["scan"]))
    dict1["scan"][max_scan + 1] = dict2["scan"][scan_to_add]
    del dict2["scan"][scan_to_add]


def process(dict1, dict2, angles, precision):
    # stop when the second dict is empty
    # print(dict2["scan"])
    if dict2["scan"]:
        print("doing something")
        # check UB matrices
        if check_UB(dict1, dict2):
            # iterate over the second dict and check for matches
            for i in list(dict2["scan"]):
                for j in list(dict1["scan"]):
                    if check_angles(dict1["scan"][j], dict2["scan"][i], angles, precision):
                        # angles are good, check the mag and temp
                        if check_temp_mag(dict1["scan"][j], dict2["scan"][i]):
                            merge(dict1["scan"][j], dict2["scan"][i])
                            print("merged")
                            del dict2["scan"][i]
                            process(dict1, dict2, angles, precision)
                            break
                        else:
                            add_scan(dict1, dict2, i)
                            print("scan added r")
                            process(dict1, dict2, angles, precision)
                            break
                    else:
                        add_scan(dict1, dict2, i)
                        print("scan added l")
                        process(dict1, dict2, angles, precision)
                        break
                else:
                    continue
                break
        else:
            # ask the user if they really want to add
            print("UBs are different, do you really wish to add datasets? Y/N")
            dict1 = add_dict(dict1, dict2)

    return


"""
1. check for bisecting or normal beam geometry in data files; select stt, om, chi, phi for
bisecting; select stt, om, nu for normal beam
2. in the ccl files, check for identical stt, chi and nu within 0.1 degree, and, at the same
time, for identical om and phi within 0.05 degree;
3. in the dat files, check for identical stt, chi and nu within 0.1 degree, and, at the same
time, for identical phi within 0.05 degree, and, at the same time, for identical om within
5 degrees."""


def unified_merge(dict1, dict2):
    if not check_zebramode(dict1, dict2):
        print("You are trying to add two files with different zebra modes")
        return

    # decide angles
    if dict1["meta"]["zebra_mode"] == "bi":
        angles = ["twotheta_angle", "omega_angle", "chi_angle", "phi_angle"]
    elif dict1["meta"]["zebra_mode"] == "nb":
        angles = ["gamma_angle", "omega_angle", "nu_angle"]

    # precision of angles to check
    precision = {
        "twotheta_angle": 0.1,
        "chi_angle": 0.1,
        "nu_angle": 0.1,
        "phi_angle": 0.05,
        "omega_angle": 5,
        "gamma_angle": 0.05,
    }
    if (dict1["meta"]["data_type"] == "ccl") and (dict2["meta"]["data_type"] == "ccl"):
        precision["omega_angle"] = 0.05

    # check for duplicates in the original files
    for d in dict1, dict2:
        # no duplicates in dats
        if d["meta"]["data_type"] == "dat":
            continue
        else:
            merge_dups(d, angles)

    process(dict1, dict2, angles, precision)


def add_dict(dict1, dict2):
    """Adds two dictionaries; meta of the second is saved as meta+original_filename and
    measurements are shifted to continue the numbering of the first dict
    :arg dict1 : dictionary to add to
    :arg dict2 : dictionary from which to take the measurements
    :return dict1 : combined dictionary
    Note: dict1 must be made from a ccl file, otherwise we would have to change the structure
    of the loaded dat file"""
    try:
        if dict1["meta"]["zebra_mode"] != dict2["meta"]["zebra_mode"]:
            print("You are trying to add scans measured with different zebra modes")
            return
    # this is for the qscan case
    except KeyError:
        print("Zebra mode not specified")

    max_measurement_dict1 = max([keys for keys in dict1["scan"]])
    new_filenames = np.arange(
        max_measurement_dict1 + 1, max_measurement_dict1 + 1 + len(dict2["scan"])
    )

    new_meta_name = "meta" + str(dict2["meta"]["original_filename"])
    if new_meta_name not in dict1:
        for keys, name in zip(dict2["scan"], new_filenames):
            dict2["scan"][keys]["file_of_origin"] = str(dict2["meta"]["original_filename"])
            dict1["scan"][name] = dict2["scan"][keys]

        dict1[new_meta_name] = dict2["meta"]
    else:
        raise KeyError(
            str(
                "The file %s has already been added to %s"
                % (dict2["meta"]["original_filename"], dict1["meta"]["original_filename"])
            )
        )
    return dict1
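Note: a hedged usage sketch of the new merge entry point, mirroring the append callback in the ccl integrate panel (pyzebra.unified_merge(det_data, append_data)); the file names are hypothetical.

import pyzebra

with open("proposal/first.ccl") as f:
    det_data = pyzebra.parse_1D(f, ".ccl")

with open("proposal/extra.dat") as f:
    append_data = pyzebra.parse_1D(f, ".dat")

# Deduplicates scans within each ccl file first (merge_dups), then merges or
# adds append_data's scans into det_data in place (process), matching scans by
# the geometry-appropriate angles, then temperature and magnetic field.
pyzebra.unified_merge(det_data, append_data)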