74 Commits
0.1.0 ... 0.2.1

SHA1 Message Date
cf6f7a8506 Updating for version 0.2.1 2020-12-09 16:46:53 +01:00
654d281c49 Organize plots in Tabs 2020-12-09 16:44:13 +01:00
950f76d4be Enable dat files discovery via proposal number 2020-12-01 18:34:37 +01:00
238f3e4fbc Use the same figure for main and overview plots 2020-12-01 18:12:07 +01:00
826363a0f5 Use add_dict instead of unified_merge for param study 2020-12-01 17:35:06 +01:00
4822121b3b Use color instead of an offset for overview plot 2020-12-01 17:34:57 +01:00
56609ad5ff Add basic systemd service files 2020-12-01 11:39:38 +01:00
a9b0a8a01d Use temp dir for saving debug.xml for anatric 2020-11-30 17:30:57 +01:00
aa6bcb6c6b Add experimental overview plot 2020-11-24 20:37:10 +01:00
216de442a5 Display filename in the scan table 2020-11-24 16:14:43 +01:00
9507339c2a Experiment with unified_merge 2020-11-24 13:40:31 +01:00
0c158db48f Add hkl-precision select
For #19
2020-11-24 11:01:55 +01:00
9bd959e656 Updating for version 0.2.0 2020-11-23 16:56:26 +01:00
65b28fffc6 Fix deps 2020-11-23 16:56:19 +01:00
0d8a30b995 Fix real indices display and export 2020-11-23 16:45:39 +01:00
c602a3df2e Adapt param study to work with a set of dat files 2020-11-23 16:07:31 +01:00
b19b70caae Add scan_number to dat-file metadata 2020-11-23 10:41:11 +01:00
b08f3c27db Use unified_merge for datasets merging 2020-11-18 13:40:38 +01:00
e15f9c9c3e Clarify terms nb and bi in labels 2020-11-18 13:24:19 +01:00
b62573fa09 Add 'param study' tab based on 'ccl integrate' tab 2020-11-18 09:48:50 +01:00
f7f016cf1c First draft of new merge function
This is the first attempt at the new merge function. I didn't want to rewrite the previous one before we agree on this, since I think there will be some changes, so I would like to discuss it first. Since we agreed not to do it as before (i.e. first scan everything and then merge or add), I've tried to make these functions recursive. I haven't tested it much; I would like us to agree on whether this is a good way to write it.
2020-11-17 15:25:09 +01:00
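As a rough illustration of the recursive approach described above (a sketch only, not the actual pyzebra unified_merge/add_dict):

# Sketch: recursively merge one dataset dict into another, assuming the
# dict-of-dicts layout (e.g. {"meta": {...}, "scan": {...}}) seen elsewhere
# in this diff; names here are illustrative.
def merge_recursive(target, source):
    """Merge `source` into `target` in place, recursing into nested dicts."""
    for key, value in source.items():
        if isinstance(target.get(key), dict) and isinstance(value, dict):
            merge_recursive(target[key], value)
        else:
            target[key] = value
    return target

# Scans from a second file end up alongside those of the first.
ds1 = {"meta": {"title": "a"}, "scan": {"1": {"om": [1, 2]}}}
ds2 = {"scan": {"2": {"om": [3, 4]}}}
merge_recursive(ds1, ds2)
assert sorted(ds1["scan"]) == ["1", "2"]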
8c8715b041 Fix #19 2020-11-12 20:35:33 +01:00
008761e661 Update ccl_io.py
Added the correction for area_s as well, as requested by Romain
2020-11-10 18:39:25 +01:00
4343d6e2b6 Refactor area method calculation 2020-11-10 16:20:04 +01:00
11e1a6b60c Avoid num_of_peaks intermediate 2020-11-10 15:32:13 +01:00
8be637a7f3 Pin bokeh/2.3 2020-11-10 08:55:48 +01:00
4fbfe21e99 Replace fitparam widgets with DataTable solution 2020-11-09 15:50:50 +01:00
b5b77d165a Make sure to put tags only on master branch 2020-11-09 11:18:04 +01:00
0e176cb2f3 Deploy only on tags 2020-11-07 00:18:42 +01:00
2ba0964e07 Replace travis-ci with github actions 2020-11-06 23:41:52 +01:00
b31f359ee7 Updating for version 0.1.3 2020-11-06 15:04:19 +01:00
63150a4b19 Build only one noarch package with python >=3.6 2020-11-06 15:03:48 +01:00
8d779b11f6 No need for tag message in release script 2020-11-06 13:26:41 +01:00
b2d603b3c5 Add matplotlib and pandas as deps 2020-11-06 11:44:28 +01:00
2ddb0a668a Switch build to noarch package 2020-11-06 10:29:43 +01:00
de81f2fd9f Move cli.py into app folder 2020-11-06 10:28:43 +01:00
b2fc2d604a Updating for version 0.1.2 2020-11-05 17:27:42 +01:00
78096efcef Simplify ccl_io functions 2020-11-05 17:23:07 +01:00
5b0f97959e Simplify lorentz correction calculation 2020-11-05 15:48:02 +01:00
8fb1c5f247 Fix lorentz correction 2020-11-05 15:00:07 +01:00
58641ab94f Fix #17 2020-11-05 14:55:25 +01:00
a6bcb8ffa1 Strip proposal number string 2020-11-02 16:28:17 +01:00
7b6e6bf396 Update prints in fitting functions 2020-11-02 16:20:02 +01:00
45f295fcf8 Add pyzebra handler
* allow user to specify anatric path
2020-11-02 15:41:15 +01:00
3c58fd2102 Add Lorentz Correction toggle 2020-11-02 13:54:38 +01:00
abbaded278 Allow direct edit of export flag in scan_table 2020-11-02 12:07:24 +01:00
fbe992c901 Deduplicate database initialization code 2020-11-02 10:50:07 +01:00
dec282d1b7 Simplify peakfind and fit params handling 2020-11-02 10:31:28 +01:00
4429823629 Simplify scan selection updates 2020-11-02 10:15:47 +01:00
80fddb514a Avoid selection of multiple indices 2020-11-02 09:40:48 +01:00
cfe9832c1e Allow projection colormap values to be floats 2020-10-30 16:23:00 +01:00
fd942672df Rename number_of_measurements -> n_points 2020-10-30 15:58:47 +01:00
60cb733ca7 Remove ccl_dict_operations.py 2020-10-30 15:58:47 +01:00
7c2ecef56d Fix #16
Added a line that creates an empty list of peak positions when the peak finder was not run prior to the fit, so the fit behaves as in the no-peak scenario
2020-10-30 15:53:25 +01:00
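In spirit, the added guard looks like this (a sketch, not the exact line):

import numpy as np

scan = {"om": [1.0, 2.0, 3.0], "Counts": [5, 6, 5]}  # peak finder never ran

# Default to an empty peak list so the fit takes the no-peak branch
# instead of failing on a missing key.
peak_indexes = scan.setdefault("peak_indexes", np.array([], dtype=np.int64))
if len(peak_indexes) == 0:
    print("no peaks found, fitting background only")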
468f33e606 Update ccl_io.py
Added a try/except for mag_field, since some of the data don't have this value and the script fails.
2020-10-30 14:10:01 +01:00
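Schematically (the metadata key and line format are assumed here for illustration):

metadata = {}
line = "mag_field = "  # a metadata line with its value missing

try:
    metadata["mag_field"] = float(line.split("=")[1])
except (IndexError, ValueError):
    # some files simply do not carry this value
    metadata["mag_field"] = None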
dbc643aba9 Update param_study_moduls.py
Updated the create_dataframe function and added a function called variables, which tries to decide which variables to plot in a parametric study and in q scans. It works well for the primary variable (usually om) and reduces the secondary variables (slice variable, temperature, magnetic field, ...) to a few candidates from which one has to be picked. In one param-study set it identified all parameters correctly; in a q scan the temperature varied as well as the H index, so technically both could be used, but only one makes sense and the user will have to pick it.
2020-10-30 11:45:24 +01:00
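The selection logic can be pictured like this (an illustrative sketch, not the actual variables function; the key names are assumptions):

def candidate_variables(scans, keys=("temperature", "mag_field", "h_index")):
    """Return the metadata keys whose values change across the scans."""
    candidates = []
    for key in keys:
        if len({scan.get(key) for scan in scans}) > 1:
            candidates.append(key)
    return candidates

scans = [
    {"temperature": 1.5, "mag_field": 0.0, "h_index": 1},
    {"temperature": 2.0, "mag_field": 0.0, "h_index": 1},
]
print(candidate_variables(scans))  # ['temperature'], a single clear candidate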
0856705024 Update ccl_findpeaks.py
Added one more parameter, "variable", so we can use the function for scans other than omega. This will be necessary in the param study and hopefully should not influence the ccl integration.
2020-10-30 11:30:37 +01:00
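Conceptually, the new argument only swaps the x axis; a sketch using scipy (a declared dependency), with a much simplified signature compared to the real ccl_findpeaks:

from scipy.signal import find_peaks

def findpeaks(scan, variable="om", prominence=50):
    x = scan[variable]  # previously hard-coded to scan["om"]
    peaks, _ = find_peaks(scan["Counts"], prominence=prominence)
    return [x[i] for i in peaks]

scan = {"om": [0.0, 0.1, 0.2, 0.3, 0.4], "Counts": [10, 12, 300, 11, 9]}
print(findpeaks(scan))  # [0.2]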
ce608f1b49 Allow manual data removal from export 2020-10-27 14:48:58 +01:00
3eaf54eda3 Combine comm_export and load_1D modules 2020-10-27 13:24:31 +01:00
a496267a9d Simplify check for hkl/real indices 2020-10-27 11:55:56 +01:00
1a3ebfbcbd Keep scan_number as a part of scan dict 2020-10-27 11:22:06 +01:00
7bcb23c1bd Temporary bug fix
The rest is in the email.
2020-10-26 21:05:27 +01:00
b28fe39bbb Introduce experimental merge of 2 datasets 2020-10-26 16:47:28 +01:00
42c6e6b921 Fix imports and indentation 2020-10-26 16:37:03 +01:00
dba2dc6149 Add bin size widget 2020-10-26 15:54:49 +01:00
a0c9b0162b Add pan/zoom tools 2020-10-26 15:22:52 +01:00
00b0c2d708 Updating for version 0.1.1 2020-10-23 16:58:14 +02:00
4ae8890bb8 Merge branch 'det1d' 2020-10-23 16:56:25 +02:00
430ffc2caa Generalized fitting function
This is a first idea of how the function could work. Usage should be the same as for the previous one, but we need to find a way to pass parameters to the function. There is a new parameter called variable, which selects the x coordinate, since "om" might not be the only axis here. The function does not change the initial dictionary yet, but the process will be the same as in the first one. It is still not clear how the peaks should be reported, especially in the case of two overlapping peaks (the same goes for numerical integration), but the process will be similar to fitvol2. The function can be used, but it is posted here to open a discussion and find the best way of passing the parameters.
2020-10-23 16:45:04 +02:00
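A minimal sketch of that idea using lmfit (a declared dependency); the parameter names follow lmfit's built-in models rather than fitccl's:

import numpy as np
from lmfit.models import GaussianModel, LinearModel

def fit_scan(scan, variable="om"):
    x = np.asarray(scan[variable])  # the "variable" argument picks the x axis
    y = np.asarray(scan["Counts"])
    model = GaussianModel(prefix="g_") + LinearModel()
    params = model.make_params(
        g_center=x[y.argmax()], g_sigma=0.1, g_amplitude=y.max(), slope=0, intercept=y.min()
    )
    return model.fit(y, params, x=x)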
aee5c82925 Add colormap controls to proj plots 2020-10-23 15:58:06 +02:00
7e16ea0fea Make magnetic field and temperature optional 2020-10-23 15:35:15 +02:00
6ff1b2b54f Update param_study_moduls.py
Renamed meas to scan
2020-10-23 15:19:08 +02:00
6099df650b Update param_study_moduls.py
Updated the parametric study module with merging, adding, etc.
2020-10-23 10:23:46 +02:00
0347566aeb Update comm_export.py
Fixed the order of hkls
2020-10-22 15:16:40 +02:00
28 changed files with 2072 additions and 978 deletions

.github/workflows/deployment.yaml (new file)

@ -0,0 +1,25 @@
name: Deployment

on:
  push:
    tags:
      - '*'

jobs:
  publish-conda-package:
    runs-on: ubuntu-latest

    steps:
      - uses: actions/checkout@v2

      - name: Prepare
        run: |
          $CONDA/bin/conda install --quiet --yes conda-build anaconda-client
          $CONDA/bin/conda config --append channels conda-forge
          $CONDA/bin/conda config --set anaconda_upload yes

      - name: Build and upload
        env:
          ANACONDA_TOKEN: ${{ secrets.ANACONDA_TOKEN }}
        run: |
          $CONDA/bin/conda build --token $ANACONDA_TOKEN conda-recipe

.travis.yml (deleted)

@ -1,33 +0,0 @@
language: python
python:
  - 3.6
  - 3.7
  - 3.8

# Build only tagged commits
if: tag IS present

before_install:
  - wget https://repo.continuum.io/miniconda/Miniconda3-latest-Linux-x86_64.sh -O miniconda.sh
  - bash miniconda.sh -b -p $HOME/miniconda
  - export PATH="$HOME/miniconda/bin:$PATH"
  - conda config --append channels conda-forge
  - conda config --set always_yes yes
  - conda config --set anaconda_upload no

install:
  - conda update -q conda
  - conda install -q python=$TRAVIS_PYTHON_VERSION conda-build anaconda-client

script:
  - conda build conda-recipe

deploy:
  provider: script
  script: anaconda -t $ANACONDA_TOKEN upload $HOME/miniconda/conda-bld/**/pyzebra-*.tar.bz2
  on:
    branch: master
    tags: true

notifications:
  email: false

.vscode/launch.json

@ -5,7 +5,7 @@
"name": "pyzebra",
"type": "python",
"request": "launch",
"program": "${workspaceFolder}/pyzebra/cli.py",
"program": "${workspaceFolder}/pyzebra/app/cli.py",
"console": "internalConsole",
"env": {},
},

conda-recipe/bld.bat (new file)

@ -0,0 +1,2 @@
"%PYTHON%" setup.py install --single-version-externally-managed --record=record.txt
if errorlevel 1 exit 1

conda-recipe/meta.yaml

@ -8,20 +8,23 @@ source:
  path: ..

build:
  noarch: python
  number: 0
  entry_points:
    - pyzebra = pyzebra.cli:main
    - pyzebra = pyzebra.app.cli:main

requirements:
  build:
    - python
    - python >=3.6
    - setuptools
  run:
    - python
    - python >=3.6
    - numpy
    - scipy
    - pandas
    - h5py
    - bokeh
    - bokeh =2.2
    - matplotlib
    - numba
    - lmfit
    - uncertainties

make_release.py (Normal file → Executable file)

@ -3,14 +3,19 @@
import argparse
import os
import re
import subprocess


def main():
    branch = subprocess.check_output("git rev-parse --abbrev-ref HEAD", shell=True).decode().strip()
    if branch != "master":
        print("Aborting, not on 'master' branch.")
        return

    filepath = "pyzebra/__init__.py"

    parser = argparse.ArgumentParser()
    parser.add_argument("level", type=str, choices=["patch", "minor", "major"])
    parser.add_argument("tag_msg", type=str, help="tag message")
    args = parser.parse_args()

    with open(filepath) as f:
@ -35,7 +40,7 @@ def main():
        f.write(re.sub(r'__version__ = "(.*?)"', f'__version__ = "{new_version}"', file_content))

    os.system(f"git commit {filepath} -m 'Updating for version {new_version}'")
    os.system(f"git tag -a {new_version} -m '{args.tag_msg}'")
    os.system(f"git tag -a {new_version} -m 'Release {new_version}'")


if __name__ == "__main__":

pyzebra/__init__.py

@ -1,10 +1,9 @@
import pyzebra.ccl_dict_operation
from pyzebra.anatric import *
from pyzebra.ccl_findpeaks import ccl_findpeaks
from pyzebra.comm_export import export_comm
from pyzebra.ccl_io import export_comm, load_1D, parse_1D
from pyzebra.fit2 import fitccl
from pyzebra.h5 import *
from pyzebra.load_1D import load_1D, parse_1D
from pyzebra.merge_function import add_dict, unified_merge
from pyzebra.xtal import *
__version__ = "0.1.0"
__version__ = "0.2.1"

pyzebra/anatric.py

@ -2,7 +2,6 @@ import subprocess
import xml.etree.ElementTree as ET

ANATRIC_PATH = "/afs/psi.ch/project/sinq/rhel7/bin/anatric"
DATA_FACTORY_IMPLEMENTATION = [
    "trics",
    "morph",
@ -24,8 +23,8 @@ REFLECTION_PRINTER_FORMATS = [
ALGORITHMS = ["adaptivemaxcog", "adaptivedynamic"]


def anatric(config_file):
    subprocess.run([ANATRIC_PATH, config_file], check=True)

def anatric(config_file, anatric_path="/afs/psi.ch/project/sinq/rhel7/bin/anatric"):
    subprocess.run([anatric_path, config_file], check=True)


class AnatricConfig:

pyzebra/app/app.py

@ -1,4 +1,3 @@
import argparse
import logging
import sys
from io import StringIO
@ -10,15 +9,10 @@ from bokeh.models import Tabs, TextAreaInput
import panel_ccl_integrate
import panel_hdf_anatric
import panel_hdf_viewer
import panel_param_study

parser = argparse.ArgumentParser(
    prog="pyzebra", formatter_class=argparse.ArgumentDefaultsHelpFormatter
)
args = parser.parse_args()

doc = curdoc()
doc.title = "pyzebra"

sys.stdout = StringIO()
stdout_textareainput = TextAreaInput(title="print output:", height=150)
@ -26,7 +20,7 @@ stdout_textareainput = TextAreaInput(title="print output:", height=150)
bokeh_stream = StringIO()
bokeh_handler = logging.StreamHandler(bokeh_stream)
bokeh_handler.setFormatter(logging.Formatter(logging.BASIC_FORMAT))
bokeh_logger = logging.getLogger('bokeh')
bokeh_logger = logging.getLogger("bokeh")
bokeh_logger.addHandler(bokeh_handler)
bokeh_log_textareainput = TextAreaInput(title="server output:", height=150)
@ -34,10 +28,11 @@ bokeh_log_textareainput = TextAreaInput(title="server output:", height=150)
tab_hdf_viewer = panel_hdf_viewer.create()
tab_hdf_anatric = panel_hdf_anatric.create()
tab_ccl_integrate = panel_ccl_integrate.create()
tab_param_study = panel_param_study.create()

doc.add_root(
    column(
        Tabs(tabs=[tab_hdf_viewer, tab_hdf_anatric, tab_ccl_integrate]),
        Tabs(tabs=[tab_hdf_viewer, tab_hdf_anatric, tab_ccl_integrate, tab_param_study]),
        row(stdout_textareainput, bokeh_log_textareainput, sizing_mode="scale_both"),
    )
)

pyzebra/app/cli.py (moved from pyzebra/cli.py)

@ -6,6 +6,8 @@ from bokeh.application.application import Application
from bokeh.application.handlers import ScriptHandler
from bokeh.server.server import Server

from pyzebra.app.handler import PyzebraHandler

logging.basicConfig(format="%(asctime)s %(message)s", level=logging.INFO)
logger = logging.getLogger(__name__)
@ -16,7 +18,7 @@ def main():
    This is a wrapper around a bokeh server that provides an interface to launch the application,
    bundled with the pyzebra package.
    """
    app_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "app", "app.py")
    app_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "app.py")

    parser = argparse.ArgumentParser(
        prog="pyzebra", formatter_class=argparse.ArgumentDefaultsHelpFormatter
@ -35,6 +37,13 @@ def main():
        help="hostname that can connect to the server websocket",
    )

    parser.add_argument(
        "--anatric-path",
        type=str,
        default="/afs/psi.ch/project/sinq/rhel7/bin/anatric",
        help="path to anatric executable",
    )

    parser.add_argument(
        "--args",
        nargs=argparse.REMAINDER,
@ -46,9 +55,10 @@ def main():
    logger.info(app_path)

    pyzebra_handler = PyzebraHandler(args.anatric_path)
    handler = ScriptHandler(filename=app_path, argv=args.args)
    server = Server(
        {"/": Application(handler)},
        {"/": Application(pyzebra_handler, handler)},
        port=args.port,
        allow_websocket_origin=args.allow_websocket_origin,
    )

pyzebra/app/handler.py (new file)

@ -0,0 +1,30 @@
from bokeh.application.handlers import Handler


class PyzebraHandler(Handler):
    """Provides a mechanism for generic bokeh applications to build up new pyzebra documents."""

    def __init__(self, anatric_path):
        """Initialize a pyzebra handler for bokeh applications.

        Args:
            anatric_path (str): Path to the anatric executable.
        """
        super().__init__()  # no-op
        self.anatric_path = anatric_path

    def modify_document(self, doc):
        """Modify an application document with pyzebra specific features.

        Args:
            doc (Document): A bokeh Document to update in-place.

        Returns:
            Document
        """
        doc.title = "pyzebra"
        doc.anatric_path = self.anatric_path

        return doc
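Hypothetical usage from a panel module: since the handler stores anatric_path on the bokeh Document, a callback can pick it up at runtime (panel_hdf_anatric further below does exactly this):

from bokeh.io import curdoc

import pyzebra

def process(config_file):
    # read the path that PyzebraHandler attached to the current document
    pyzebra.anatric(config_file, anatric_path=curdoc().anatric_path)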

pyzebra/app/panel_ccl_integrate.py

@ -2,6 +2,8 @@ import base64
import io
import os
import tempfile
import types
from copy import deepcopy
import numpy as np
from bokeh.layouts import column, row
@ -9,18 +11,24 @@ from bokeh.models import (
Asterisk,
BasicTicker,
Button,
CheckboxEditor,
ColumnDataSource,
CustomJS,
DataRange1d,
DataTable,
Div,
Dropdown,
FileInput,
Grid,
Line,
LinearAxis,
MultiSelect,
NumberEditor,
Panel,
PanTool,
Plot,
RadioButtonGroup,
ResetTool,
Scatter,
Select,
Spacer,
@ -30,10 +38,12 @@ from bokeh.models import (
TextAreaInput,
TextInput,
Toggle,
WheelZoomTool,
Whisker,
)
import pyzebra
from pyzebra.ccl_io import AREA_METHODS
javaScript = """
@ -56,11 +66,12 @@ PROPOSAL_PATH = "/afs/psi.ch/project/sinqdata/2020/zebra/"
def create():
det_data = {}
fit_params = {}
peak_pos_textinput_lock = False
js_data = ColumnDataSource(data=dict(cont=[], ext=[]))
def proposal_textinput_callback(_attr, _old, new):
ccl_path = os.path.join(PROPOSAL_PATH, new)
ccl_path = os.path.join(PROPOSAL_PATH, new.strip())
ccl_file_list = []
for file in os.listdir(ccl_path):
if file.endswith(".ccl"):
@ -71,22 +82,31 @@ def create():
proposal_textinput = TextInput(title="Enter proposal number:", default_size=145)
proposal_textinput.on_change("value", proposal_textinput_callback)
def _init_datatable():
scan_list = list(det_data["scan"].keys())
hkl = [
f'{int(m["h_index"])} {int(m["k_index"])} {int(m["l_index"])}'
if det_data["meta"]["indices"] == "hkl"
else f'{m["h_index"]} {m["k_index"]} {m["l_index"]}'
for m in det_data["scan"].values()
]
scan_table_source.data.update(
scan=scan_list,
hkl=hkl,
peaks=[0] * len(scan_list),
fit=[0] * len(scan_list),
export=[True] * len(scan_list),
)
scan_table_source.selected.indices = []
scan_table_source.selected.indices = [0]
def ccl_file_select_callback(_attr, _old, new):
nonlocal det_data
with open(new) as file:
_, ext = os.path.splitext(new)
det_data = pyzebra.parse_1D(file, ext)
scan_list = list(det_data["scan"].keys())
hkl = [
f'{int(m["h_index"])} {int(m["k_index"])} {int(m["l_index"])}'
for m in det_data["scan"].values()
]
scan_table_source.data.update(
scan=scan_list, hkl=hkl, peaks=[0] * len(scan_list), fit=[0] * len(scan_list)
)
scan_table_source.selected.indices = []
scan_table_source.selected.indices = [0]
_init_datatable()
ccl_file_select = Select(title="Available .ccl files")
ccl_file_select.on_change("value", ccl_file_select_callback)
@ -97,36 +117,39 @@ def create():
_, ext = os.path.splitext(upload_button.filename)
det_data = pyzebra.parse_1D(file, ext)
scan_list = list(det_data["scan"].keys())
hkl = [
f'{int(m["h_index"])} {int(m["k_index"])} {int(m["l_index"])}'
for m in det_data["scan"].values()
]
scan_table_source.data.update(
scan=scan_list, hkl=hkl, peaks=[0] * len(scan_list), fit=[0] * len(scan_list)
)
scan_table_source.selected.indices = []
scan_table_source.selected.indices = [0]
_init_datatable()
upload_button = FileInput(accept=".ccl")
upload_button.on_change("value", upload_button_callback)
def append_upload_button_callback(_attr, _old, new):
nonlocal det_data
with io.StringIO(base64.b64decode(new).decode()) as file:
_, ext = os.path.splitext(append_upload_button.filename)
append_data = pyzebra.parse_1D(file, ext)
pyzebra.unified_merge(det_data, append_data)
_init_datatable()
append_upload_button = FileInput(accept=".ccl,.dat")
append_upload_button.on_change("value", append_upload_button_callback)
def _update_table():
num_of_peaks = [scan.get("num_of_peaks", 0) for scan in det_data["scan"].values()]
num_of_peaks = [len(scan.get("peak_indexes", [])) for scan in det_data["scan"].values()]
fit_ok = [(1 if "fit" in scan else 0) for scan in det_data["scan"].values()]
scan_table_source.data.update(peaks=num_of_peaks, fit=fit_ok)
def _update_plot(ind):
def _update_plot(scan):
nonlocal peak_pos_textinput_lock
peak_pos_textinput_lock = True
scan = det_data["scan"][ind]
y = scan["Counts"]
x = scan["om"]
plot_scatter_source.data.update(x=x, y=y, y_upper=y + np.sqrt(y), y_lower=y - np.sqrt(y))
num_of_peaks = scan.get("num_of_peaks")
num_of_peaks = len(scan.get("peak_indexes", []))
if num_of_peaks is not None and num_of_peaks > 0:
peak_indexes = scan["peak_indexes"]
if len(peak_indexes) == 1:
@ -145,18 +168,17 @@ def create():
fit = scan.get("fit")
if fit is not None:
x = scan["fit"]["x_fit"]
plot_gauss_source.data.update(x=x, y=scan["fit"]["comps"]["gaussian"])
plot_bkg_source.data.update(x=x, y=scan["fit"]["comps"]["background"])
params = fit["result"].params
fit_output_textinput.value = (
"%s \n"
"Gaussian: centre = %9.4f, sigma = %9.4f, area = %9.4f \n"
"background: slope = %9.4f, intercept = %9.4f \n"
"Int. area = %9.4f +/- %9.4f \n"
"fit area = %9.4f +/- %9.4f \n"
"ratio((fit-int)/fit) = %9.4f"
% (
ind,
params["g_cen"].value,
params["g_width"].value,
params["g_amp"].value,
@ -188,13 +210,7 @@ def create():
numfit_max_span.location = None
# Main plot
plot = Plot(
x_range=DataRange1d(),
y_range=DataRange1d(),
plot_height=400,
plot_width=700,
toolbar_location=None,
)
plot = Plot(x_range=DataRange1d(), y_range=DataRange1d(), plot_height=400, plot_width=700)
plot.add_layout(LinearAxis(axis_label="Counts"), place="left")
plot.add_layout(LinearAxis(axis_label="Omega"), place="below")
@ -226,12 +242,28 @@ def create():
numfit_max_span = Span(location=None, dimension="height", line_dash="dashed")
plot.add_layout(numfit_max_span)
# Scan select
def scan_table_callback(_attr, _old, new):
if new:
_update_plot(scan_table_source.data["scan"][new[-1]])
plot.add_tools(PanTool(), WheelZoomTool(), ResetTool())
plot.toolbar.logo = None
scan_table_source = ColumnDataSource(dict(scan=[], hkl=[], peaks=[], fit=[]))
# Scan select
def scan_table_select_callback(_attr, old, new):
if not new:
# skip empty selections
return
# Avoid selection of multiple indices (via Shift+Click or Ctrl+Click)
if len(new) > 1:
# drop selection to the previous one
scan_table_source.selected.indices = old
return
if len(old) > 1:
# skip unnecessary update caused by selection drop
return
_update_plot(det_data["scan"][scan_table_source.data["scan"][new[0]]])
scan_table_source = ColumnDataSource(dict(scan=[], hkl=[], peaks=[], fit=[], export=[]))
scan_table = DataTable(
source=scan_table_source,
columns=[
@ -239,25 +271,29 @@ def create():
TableColumn(field="hkl", title="hkl"),
TableColumn(field="peaks", title="Peaks"),
TableColumn(field="fit", title="Fit"),
TableColumn(field="export", title="Export", editor=CheckboxEditor()),
],
width=200,
width=250,
index_position=None,
editable=True,
)
scan_table_source.selected.on_change("indices", scan_table_callback)
scan_table_source.selected.on_change("indices", scan_table_select_callback)
def _get_selected_scan():
selected_index = scan_table_source.selected.indices[0]
selected_scan_id = scan_table_source.data["scan"][selected_index]
return det_data["scan"][selected_scan_id]
def peak_pos_textinput_callback(_attr, _old, new):
if new is not None and not peak_pos_textinput_lock:
sel_ind = scan_table_source.selected.indices[-1]
scan_name = scan_table_source.data["scan"][sel_ind]
scan = det_data["scan"][scan_name]
scan = _get_selected_scan()
scan["num_of_peaks"] = 1
peak_ind = (np.abs(scan["om"] - float(new))).argmin()
scan["peak_indexes"] = np.array([peak_ind], dtype=np.int64)
scan["peak_heights"] = np.array([scan["smooth_peaks"][peak_ind]])
_update_table()
_update_plot(scan_name)
_update_plot(scan)
peak_pos_textinput = TextInput(title="Peak position:", default_size=145)
peak_pos_textinput.on_change("value", peak_pos_textinput_callback)
@ -270,82 +306,115 @@ def create():
window_size_spinner = Spinner(title="Window size:", value=7, step=2, low=1, default_size=145)
poly_order_spinner = Spinner(title="Poly order:", value=3, low=0, default_size=145)
centre_guess = Spinner(default_size=100)
centre_vary = Toggle(default_size=100, active=True)
centre_min = Spinner(default_size=100)
centre_max = Spinner(default_size=100)
sigma_guess = Spinner(default_size=100)
sigma_vary = Toggle(default_size=100, active=True)
sigma_min = Spinner(default_size=100)
sigma_max = Spinner(default_size=100)
ampl_guess = Spinner(default_size=100)
ampl_vary = Toggle(default_size=100, active=True)
ampl_min = Spinner(default_size=100)
ampl_max = Spinner(default_size=100)
slope_guess = Spinner(default_size=100)
slope_vary = Toggle(default_size=100, active=True)
slope_min = Spinner(default_size=100)
slope_max = Spinner(default_size=100)
offset_guess = Spinner(default_size=100)
offset_vary = Toggle(default_size=100, active=True)
offset_min = Spinner(default_size=100)
offset_max = Spinner(default_size=100)
integ_from = Spinner(title="Integrate from:", default_size=145)
integ_to = Spinner(title="to:", default_size=145)
def fitparam_reset_button_callback():
centre_guess.value = None
centre_vary.active = True
centre_min.value = None
centre_max.value = None
sigma_guess.value = None
sigma_vary.active = True
sigma_min.value = None
sigma_max.value = None
ampl_guess.value = None
ampl_vary.active = True
ampl_min.value = None
ampl_max.value = None
slope_guess.value = None
slope_vary.active = True
slope_min.value = None
slope_max.value = None
offset_guess.value = None
offset_vary.active = True
offset_min.value = None
offset_max.value = None
integ_from.value = None
integ_to.value = None
...
fitparam_reset_button = Button(label="Reset to defaults", default_size=145)
fitparam_reset_button = Button(label="Reset to defaults", default_size=145, disabled=True)
fitparam_reset_button.on_click(fitparam_reset_button_callback)
def fitparams_add_dropdown_callback(click):
new_tag = str(fitparams_select.tags[0]) # bokeh requires (str, str) for MultiSelect options
fitparams_select.options.append((new_tag, click.item))
fit_params[new_tag] = fitparams_factory(click.item)
fitparams_select.tags[0] += 1
fitparams_add_dropdown = Dropdown(
label="Add fit function",
menu=[
("Background", "background"),
("Gauss", "gauss"),
("Voigt", "voigt"),
("Pseudo Voigt", "pseudovoigt"),
("Pseudo Voigt1", "pseudovoigt1"),
],
default_size=145,
disabled=True,
)
fitparams_add_dropdown.on_click(fitparams_add_dropdown_callback)
def fitparams_select_callback(_attr, old, new):
# Avoid selection of multiple indices (via Shift+Click or Ctrl+Click)
if len(new) > 1:
# drop selection to the previous one
fitparams_select.value = old
return
if len(old) > 1:
# skip unnecessary update caused by selection drop
return
if new:
fitparams_table_source.data.update(fit_params[new[0]])
else:
fitparams_table_source.data.update(dict(param=[], guess=[], vary=[], min=[], max=[]))
fitparams_select = MultiSelect(options=[], height=120, default_size=145)
fitparams_select.tags = [0]
fitparams_select.on_change("value", fitparams_select_callback)
def fitparams_remove_button_callback():
if fitparams_select.value:
sel_tag = fitparams_select.value[0]
del fit_params[sel_tag]
for elem in fitparams_select.options:
if elem[0] == sel_tag:
fitparams_select.options.remove(elem)
break
fitparams_select.value = []
fitparams_remove_button = Button(label="Remove fit function", default_size=145, disabled=True)
fitparams_remove_button.on_click(fitparams_remove_button_callback)
def fitparams_factory(function):
if function == "background":
params = ["slope", "offset"]
elif function == "gauss":
params = ["center", "sigma", "amplitude"]
elif function == "voigt":
params = ["center", "sigma", "amplitude", "gamma"]
elif function == "pseudovoigt":
params = ["center", "sigma", "amplitude", "fraction"]
elif function == "pseudovoigt1":
params = ["center", "g_sigma", "l_sigma", "amplitude", "fraction"]
else:
raise ValueError("Unknown fit function")
n = len(params)
fitparams = dict(
param=params, guess=[None] * n, vary=[True] * n, min=[None] * n, max=[None] * n,
)
return fitparams
fitparams_table_source = ColumnDataSource(dict(param=[], guess=[], vary=[], min=[], max=[]))
fitparams_table = DataTable(
source=fitparams_table_source,
columns=[
TableColumn(field="param", title="Parameter"),
TableColumn(field="guess", title="Guess", editor=NumberEditor()),
TableColumn(field="vary", title="Vary", editor=CheckboxEditor()),
TableColumn(field="min", title="Min", editor=NumberEditor()),
TableColumn(field="max", title="Max", editor=NumberEditor()),
],
height=200,
width=350,
index_position=None,
editable=True,
auto_edit=True,
)
# start with `background` and `gauss` fit functions added
fitparams_add_dropdown_callback(types.SimpleNamespace(item="background"))
fitparams_add_dropdown_callback(types.SimpleNamespace(item="gauss"))
fit_output_textinput = TextAreaInput(title="Fit results:", width=450, height=400)
def peakfind_all_button_callback():
for scan in det_data["scan"].values():
pyzebra.ccl_findpeaks(
scan,
int_threshold=peak_int_ratio_spinner.value,
prominence=peak_prominence_spinner.value,
smooth=smooth_toggle.active,
window_size=window_size_spinner.value,
poly_order=poly_order_spinner.value,
)
_update_table()
sel_ind = scan_table_source.selected.indices[-1]
_update_plot(scan_table_source.data["scan"][sel_ind])
peakfind_all_button = Button(label="Peak Find All", button_type="primary", default_size=145)
peakfind_all_button.on_click(peakfind_all_button_callback)
def peakfind_button_callback():
sel_ind = scan_table_source.selected.indices[-1]
scan = scan_table_source.data["scan"][sel_ind]
pyzebra.ccl_findpeaks(
det_data["scan"][scan],
def _get_peakfind_params():
return dict(
int_threshold=peak_int_ratio_spinner.value,
prominence=peak_prominence_spinner.value,
smooth=smooth_toggle.active,
@ -353,92 +422,53 @@ def create():
poly_order=poly_order_spinner.value,
)
def peakfind_all_button_callback():
peakfind_params = _get_peakfind_params()
for scan in det_data["scan"].values():
pyzebra.ccl_findpeaks(scan, **peakfind_params)
_update_table()
_update_plot(_get_selected_scan())
peakfind_all_button = Button(label="Peak Find All", button_type="primary", default_size=145)
peakfind_all_button.on_click(peakfind_all_button_callback)
def peakfind_button_callback():
scan = _get_selected_scan()
pyzebra.ccl_findpeaks(scan, **_get_peakfind_params())
_update_table()
_update_plot(scan)
peakfind_button = Button(label="Peak Find Current", default_size=145)
peakfind_button.on_click(peakfind_button_callback)
def fit_all_button_callback():
for scan in det_data["scan"].values():
pyzebra.fitccl(
scan,
guess=[
centre_guess.value,
sigma_guess.value,
ampl_guess.value,
slope_guess.value,
offset_guess.value,
],
vary=[
centre_vary.active,
sigma_vary.active,
ampl_vary.active,
slope_vary.active,
offset_vary.active,
],
constraints_min=[
centre_min.value,
sigma_min.value,
ampl_min.value,
slope_min.value,
offset_min.value,
],
constraints_max=[
centre_max.value,
sigma_max.value,
ampl_max.value,
slope_max.value,
offset_max.value,
],
numfit_min=integ_from.value,
numfit_max=integ_to.value,
)
def _get_fit_params():
return dict(
guess=fit_params["1"]["guess"] + fit_params["0"]["guess"],
vary=fit_params["1"]["vary"] + fit_params["0"]["vary"],
constraints_min=fit_params["1"]["min"] + fit_params["0"]["min"],
constraints_max=fit_params["1"]["max"] + fit_params["0"]["max"],
numfit_min=integ_from.value,
numfit_max=integ_to.value,
binning=bin_size_spinner.value,
)
sel_ind = scan_table_source.selected.indices[-1]
_update_plot(scan_table_source.data["scan"][sel_ind])
def fit_all_button_callback():
fit_params = _get_fit_params()
for scan in det_data["scan"].values():
# fit_params are updated inplace within `fitccl`
pyzebra.fitccl(scan, **deepcopy(fit_params))
_update_plot(_get_selected_scan())
_update_table()
fit_all_button = Button(label="Fit All", button_type="primary", default_size=145)
fit_all_button.on_click(fit_all_button_callback)
def fit_button_callback():
sel_ind = scan_table_source.selected.indices[-1]
scan = scan_table_source.data["scan"][sel_ind]
pyzebra.fitccl(
det_data["scan"][scan],
guess=[
centre_guess.value,
sigma_guess.value,
ampl_guess.value,
slope_guess.value,
offset_guess.value,
],
vary=[
centre_vary.active,
sigma_vary.active,
ampl_vary.active,
slope_vary.active,
offset_vary.active,
],
constraints_min=[
centre_min.value,
sigma_min.value,
ampl_min.value,
slope_min.value,
offset_min.value,
],
constraints_max=[
centre_max.value,
sigma_max.value,
ampl_max.value,
slope_max.value,
offset_max.value,
],
numfit_min=integ_from.value,
numfit_max=integ_to.value,
)
scan = _get_selected_scan()
pyzebra.fitccl(scan, **_get_fit_params())
_update_plot(scan)
_update_table()
@ -447,13 +477,17 @@ def create():
fit_button.on_click(fit_button_callback)
def area_method_radiobutton_callback(_attr, _old, new):
det_data["meta"]["area_method"] = ("fit", "integ")[new]
det_data["meta"]["area_method"] = AREA_METHODS[new]
area_method_radiobutton = RadioButtonGroup(
labels=["Fit", "Integral"], active=0, default_size=145
labels=["Fit area", "Int area"], active=0, default_size=145
)
area_method_radiobutton.on_change("active", area_method_radiobutton_callback)
bin_size_spinner = Spinner(title="Bin size:", value=1, low=1, step=1, default_size=145)
lorentz_toggle = Toggle(label="Lorentz Correction", default_size=145)
preview_output_textinput = TextAreaInput(title="Export file preview:", width=450, height=400)
def preview_output_button_callback():
@ -464,7 +498,16 @@ def create():
with tempfile.TemporaryDirectory() as temp_dir:
temp_file = temp_dir + "/temp"
pyzebra.export_comm(det_data, temp_file)
export_data = deepcopy(det_data)
for s, export in zip(scan_table_source.data["scan"], scan_table_source.data["export"]):
if not export:
del export_data["scan"][s]
pyzebra.export_comm(
export_data,
temp_file,
lorentz=lorentz_toggle.active,
hkl_precision=int(hkl_precision_select.value),
)
with open(f"{temp_file}{ext}") as f:
preview_output_textinput.value = f.read()
@ -472,6 +515,8 @@ def create():
preview_output_button = Button(label="Preview file", default_size=220)
preview_output_button.on_click(preview_output_button_callback)
hkl_precision_select = Select(options=["2", "3", "4"], value="2", default_size=220)
def export_results(det_data):
if det_data["meta"]["indices"] == "hkl":
ext = ".comm"
@ -480,7 +525,16 @@ def create():
with tempfile.TemporaryDirectory() as temp_dir:
temp_file = temp_dir + "/temp"
pyzebra.export_comm(det_data, temp_file)
export_data = deepcopy(det_data)
for s, export in zip(scan_table_source.data["scan"], scan_table_source.data["export"]):
if not export:
del export_data["scan"][s]
pyzebra.export_comm(
export_data,
temp_file,
lorentz=lorentz_toggle.active,
hkl_precision=int(hkl_precision_select.value),
)
with open(f"{temp_file}{ext}") as f:
output_content = f.read()
@ -502,45 +556,33 @@ def create():
row(peakfind_button, peakfind_all_button),
)
div_1 = Div(text="Guess:")
div_2 = Div(text="Vary:")
div_3 = Div(text="Min:")
div_4 = Div(text="Max:")
div_5 = Div(text="Gauss Centre:", margin=[5, 5, -5, 5])
div_6 = Div(text="Gauss Sigma:", margin=[5, 5, -5, 5])
div_7 = Div(text="Gauss Ampl.:", margin=[5, 5, -5, 5])
div_8 = Div(text="Slope:", margin=[5, 5, -5, 5])
div_9 = Div(text="Offset:", margin=[5, 5, -5, 5])
fitpeak_controls = row(
column(
Spacer(height=36),
div_1,
Spacer(height=12),
div_2,
Spacer(height=12),
div_3,
Spacer(height=12),
div_4,
),
column(div_5, centre_guess, centre_vary, centre_min, centre_max),
column(div_6, sigma_guess, sigma_vary, sigma_min, sigma_max),
column(div_7, ampl_guess, ampl_vary, ampl_min, ampl_max),
column(div_8, slope_guess, slope_vary, slope_min, slope_max),
column(div_9, offset_guess, offset_vary, offset_min, offset_max),
column(fitparams_add_dropdown, fitparams_select, fitparams_remove_button),
fitparams_table,
Spacer(width=20),
column(
row(integ_from, integ_to),
row(bin_size_spinner, column(Spacer(height=19), lorentz_toggle)),
row(fitparam_reset_button, area_method_radiobutton),
row(fit_button, fit_all_button),
),
)
export_layout = column(preview_output_textinput, row(preview_output_button, save_button))
export_layout = column(
preview_output_textinput,
row(column(preview_output_button, hkl_precision_select), save_button),
)
upload_div = Div(text="Or upload .ccl file:")
append_upload_div = Div(text="append extra .ccl/.dat files:")
tab_layout = column(
row(proposal_textinput, ccl_file_select),
row(column(Spacer(height=5), upload_div), upload_button),
row(
column(Spacer(height=5), upload_div),
upload_button,
column(Spacer(height=5), append_upload_div),
append_upload_button,
),
row(scan_table, plot, Spacer(width=30), fit_output_textinput, export_layout),
row(findpeak_controls, Spacer(width=30), fitpeak_controls),
)

pyzebra/app/panel_hdf_anatric.py

@ -21,6 +21,7 @@ from pyzebra.anatric import DATA_FACTORY_IMPLEMENTATION, REFLECTION_PRINTER_FORM
def create():
    doc = curdoc()
    config = pyzebra.AnatricConfig()

    def _load_config_file(file):
@ -345,7 +346,7 @@ def create():
        with tempfile.TemporaryDirectory() as temp_dir:
            temp_file = temp_dir + "/temp.xml"
            config.save_as(temp_file)
            pyzebra.anatric(temp_file)
            pyzebra.anatric(temp_file, anatric_path=doc.anatric_path)

        with open(config.logfile) as f_log:
            output_log.value = f_log.read()
@ -400,10 +401,12 @@ def create():
    )

    async def update_config():
        config.save_as("debug.xml")
        with open("debug.xml") as f_config:
            output_config.value = f_config.read()
        with tempfile.TemporaryDirectory() as temp_dir:
            temp_file = temp_dir + "/debug.xml"
            config.save_as(temp_file)
            with open(temp_file) as f_config:
                output_config.value = f_config.read()

    curdoc().add_periodic_callback(update_config, 1000)
    doc.add_periodic_callback(update_config, 1000)

    return Panel(child=tab_layout, title="hdf anatric")

pyzebra/app/panel_hdf_viewer.py

@ -74,8 +74,8 @@ def create():
image_source.data.update(image=[current_image])
if auto_toggle.active:
im_max = int(np.max(current_image))
im_min = int(np.min(current_image))
im_min = np.min(current_image)
im_max = np.max(current_image)
display_min_spinner.value = im_min
display_max_spinner.value = im_max
@ -83,8 +83,15 @@ def create():
image_glyph.color_mapper.low = im_min
image_glyph.color_mapper.high = im_max
magnetic_field_spinner.value = det_data["magnetic_field"][index]
temperature_spinner.value = det_data["temperature"][index]
if "magnetic_field" in det_data:
magnetic_field_spinner.value = det_data["magnetic_field"][index]
else:
magnetic_field_spinner.value = None
if "temperature" in det_data:
temperature_spinner.value = det_data["temperature"][index]
else:
temperature_spinner.value = None
gamma, nu = calculate_pol(det_data, index)
omega = np.ones((IMAGE_H, IMAGE_W)) * det_data["rot_angle"][index]
@ -99,6 +106,18 @@ def create():
overview_plot_x_image_source.data.update(image=[overview_x], dw=[n_x])
overview_plot_y_image_source.data.update(image=[overview_y], dw=[n_y])
if proj_auto_toggle.active:
im_min = min(np.min(overview_x), np.min(overview_y))
im_max = max(np.max(overview_x), np.max(overview_y))
proj_display_min_spinner.value = im_min
proj_display_max_spinner.value = im_max
overview_plot_x_image_glyph.color_mapper.low = im_min
overview_plot_y_image_glyph.color_mapper.low = im_min
overview_plot_x_image_glyph.color_mapper.high = im_max
overview_plot_y_image_glyph.color_mapper.high = im_max
if frame_button_group.active == 0: # Frame
overview_plot_x.axis[1].axis_label = "Frame"
overview_plot_y.axis[1].axis_label = "Frame"
@ -385,10 +404,9 @@ def create():
colormap.on_change("value", colormap_callback)
colormap.value = "plasma"
radio_button_group = RadioButtonGroup(labels=["nb", "nb_bi"], active=0)
radio_button_group = RadioButtonGroup(labels=["normal beam", "bisecting"], active=0)
STEP = 1
# ---- colormap auto toggle button
def auto_toggle_callback(state):
if state:
@ -400,7 +418,9 @@ def create():
update_image()
auto_toggle = Toggle(label="Auto Range", active=True, button_type="default", default_size=145)
auto_toggle = Toggle(
label="Main Auto Range", active=True, button_type="default", default_size=125
)
auto_toggle.on_click(auto_toggle_callback)
# ---- colormap display max value
@ -409,12 +429,12 @@ def create():
image_glyph.color_mapper.high = new_value
display_max_spinner = Spinner(
title="Maximal Display Value:",
title="Max Value:",
low=0 + STEP,
value=1,
step=STEP,
disabled=auto_toggle.active,
default_size=145,
default_size=80,
)
display_max_spinner.on_change("value", display_max_spinner_callback)
@ -424,19 +444,70 @@ def create():
image_glyph.color_mapper.low = new_value
display_min_spinner = Spinner(
title="Minimal Display Value:",
title="Min Value:",
low=0,
high=1 - STEP,
value=0,
step=STEP,
disabled=auto_toggle.active,
default_size=145,
default_size=80,
)
display_min_spinner.on_change("value", display_min_spinner_callback)
PROJ_STEP = 0.1
# ---- proj colormap auto toggle button
def proj_auto_toggle_callback(state):
if state:
proj_display_min_spinner.disabled = True
proj_display_max_spinner.disabled = True
else:
proj_display_min_spinner.disabled = False
proj_display_max_spinner.disabled = False
update_overview_plot()
proj_auto_toggle = Toggle(
label="Proj Auto Range", active=True, button_type="default", default_size=125
)
proj_auto_toggle.on_click(proj_auto_toggle_callback)
# ---- proj colormap display max value
def proj_display_max_spinner_callback(_attr, _old_value, new_value):
proj_display_min_spinner.high = new_value - PROJ_STEP
overview_plot_x_image_glyph.color_mapper.high = new_value
overview_plot_y_image_glyph.color_mapper.high = new_value
proj_display_max_spinner = Spinner(
title="Max Value:",
low=0 + PROJ_STEP,
value=1,
step=PROJ_STEP,
disabled=proj_auto_toggle.active,
default_size=80,
)
proj_display_max_spinner.on_change("value", proj_display_max_spinner_callback)
# ---- proj colormap display min value
def proj_display_min_spinner_callback(_attr, _old_value, new_value):
proj_display_max_spinner.low = new_value + PROJ_STEP
overview_plot_x_image_glyph.color_mapper.low = new_value
overview_plot_y_image_glyph.color_mapper.low = new_value
proj_display_min_spinner = Spinner(
title="Min Value:",
low=0,
high=1 - PROJ_STEP,
value=0,
step=PROJ_STEP,
disabled=proj_auto_toggle.active,
default_size=80,
)
proj_display_min_spinner.on_change("value", proj_display_min_spinner_callback)
def hkl_button_callback():
index = index_spinner.value
setup_type = "nb_bi" if radio_button_group.active else "nb"
h, k, l = calculate_hkl(det_data, index, setup_type)
geometry = "bi" if radio_button_group.active else "nb"
h, k, l = calculate_hkl(det_data, index, geometry)
image_source.data.update(h=[h], k=[k], l=[l])
hkl_button = Button(label="Calculate hkl (slow)")
@ -474,10 +545,16 @@ def create():
# Final layout
layout_image = column(gridplot([[proj_v, None], [plot, proj_h]], merge_tools=False))
colormap_layout = column(
row(colormap, column(Spacer(height=19), auto_toggle)),
row(display_max_spinner, display_min_spinner),
row(colormap),
row(column(Spacer(height=19), auto_toggle), display_max_spinner, display_min_spinner),
row(
column(Spacer(height=19), proj_auto_toggle),
proj_display_max_spinner,
proj_display_min_spinner,
),
)
hkl_layout = column(radio_button_group, hkl_button)
geometry_div = Div(text="Geometry:", margin=[5, 5, -5, 5])
hkl_layout = column(column(geometry_div, radio_button_group), hkl_button)
params_layout = row(magnetic_field_spinner, temperature_spinner)
layout_controls = row(
@ -510,7 +587,7 @@ def create():
return Panel(child=tab_layout, title="hdf viewer")
def calculate_hkl(det_data, index, setup_type="nb_bi"):
def calculate_hkl(det_data, index, geometry):
h = np.empty(shape=(IMAGE_H, IMAGE_W))
k = np.empty(shape=(IMAGE_H, IMAGE_W))
l = np.empty(shape=(IMAGE_H, IMAGE_W))
@ -522,14 +599,14 @@ def calculate_hkl(det_data, index, setup_type="nb_bi"):
nud = det_data["tlt_angle"]
ub = det_data["UB"]
if setup_type == "nb_bi":
if geometry == "bi":
ch = det_data["chi_angle"][index]
ph = det_data["phi_angle"][index]
elif setup_type == "nb":
elif geometry == "nb":
ch = 0
ph = 0
else:
raise ValueError(f"Unknown setup type '{setup_type}'")
raise ValueError(f"Unknown geometry type '{geometry}'")
for xi in np.arange(IMAGE_W):
for yi in np.arange(IMAGE_H):

pyzebra/app/panel_param_study.py (new file)

@ -0,0 +1,686 @@
import base64
import io
import itertools
import os
import tempfile
import types
from copy import deepcopy
import numpy as np
from bokeh.layouts import column, row
from bokeh.models import (
Asterisk,
BasicTicker,
Button,
CheckboxEditor,
ColumnDataSource,
CustomJS,
DataRange1d,
DataTable,
Div,
Dropdown,
FileInput,
Grid,
Line,
LinearAxis,
MultiLine,
MultiSelect,
NumberEditor,
Panel,
PanTool,
Plot,
RadioButtonGroup,
ResetTool,
Scatter,
Select,
Spacer,
Span,
Spinner,
TableColumn,
Tabs,
TextAreaInput,
TextInput,
Toggle,
WheelZoomTool,
Whisker,
)
from bokeh.palettes import Category10
import pyzebra
from pyzebra.ccl_io import AREA_METHODS
javaScript = """
setTimeout(function() {
const filename = 'output' + js_data.data['ext']
const blob = new Blob([js_data.data['cont']], {type: 'text/plain'})
const link = document.createElement('a');
document.body.appendChild(link);
const url = window.URL.createObjectURL(blob);
link.href = url;
link.download = filename;
link.click();
window.URL.revokeObjectURL(url);
document.body.removeChild(link);
}, 500);
"""
PROPOSAL_PATH = "/afs/psi.ch/project/sinqdata/2020/zebra/"
PLOT_TYPES = ("single scan", "overview")
def color_palette(n_colors):
palette = itertools.cycle(Category10[10])
return list(itertools.islice(palette, n_colors))
def create():
det_data = {}
fit_params = {}
peak_pos_textinput_lock = False
js_data = ColumnDataSource(data=dict(cont=[], ext=[]))
def proposal_textinput_callback(_attr, _old, new):
full_proposal_path = os.path.join(PROPOSAL_PATH, new.strip())
dat_file_list = []
for file in os.listdir(full_proposal_path):
if file.endswith(".dat"):
dat_file_list.append((os.path.join(full_proposal_path, file), file))
file_select.options = dat_file_list
file_select.value = dat_file_list[0][0]
proposal_textinput = TextInput(title="Enter proposal number:", default_size=145)
proposal_textinput.on_change("value", proposal_textinput_callback)
def _init_datatable():
scan_list = list(det_data["scan"].keys())
file_list = []
extra_meta = det_data.get("extra_meta", {})
for scan_id in scan_list:
if scan_id in extra_meta:
f_path = extra_meta[scan_id]["original_filename"]
else:
f_path = det_data["meta"]["original_filename"]
_, f_name = os.path.split(f_path)
file_list.append(f_name)
scan_table_source.data.update(
file=file_list,
scan=scan_list,
param=[""] * len(scan_list),
peaks=[0] * len(scan_list),
fit=[0] * len(scan_list),
export=[True] * len(scan_list),
)
scan_table_source.selected.indices = []
scan_table_source.selected.indices = [0]
def file_select_callback(_attr, _old, _new):
pass
file_select = Select(title="Available .dat files")
file_select.on_change("value", file_select_callback)
def file_open_button_callback():
nonlocal det_data
with open(file_select.value) as file:
_, ext = os.path.splitext(file_select.value)
det_data = pyzebra.parse_1D(file, ext)
_init_datatable()
file_open_button = Button(label="Open", default_size=100)
file_open_button.on_click(file_open_button_callback)
def file_append_button_callback():
with open(file_select.value) as file:
_, ext = os.path.splitext(file_select.value)
append_data = pyzebra.parse_1D(file, ext)
pyzebra.add_dict(det_data, append_data)
_init_datatable()
file_append_button = Button(label="Append", default_size=100)
file_append_button.on_click(file_append_button_callback)
def upload_button_callback(_attr, _old, new):
nonlocal det_data
det_data = {}
for f_str, f_name in zip(new, upload_button.filename):
with io.StringIO(base64.b64decode(f_str).decode()) as file:
_, ext = os.path.splitext(f_name)
if det_data:
append_data = pyzebra.parse_1D(file, ext)
pyzebra.add_dict(det_data, append_data)
else:
det_data = pyzebra.parse_1D(file, ext)
_init_datatable()
upload_button = FileInput(accept=".dat", multiple=True)
upload_button.on_change("value", upload_button_callback)
def append_upload_button_callback(_attr, _old, new):
for f_str, f_name in zip(new, append_upload_button.filename):
with io.StringIO(base64.b64decode(f_str).decode()) as file:
_, ext = os.path.splitext(f_name)
append_data = pyzebra.parse_1D(file, ext)
pyzebra.add_dict(det_data, append_data)
_init_datatable()
append_upload_button = FileInput(accept=".dat", multiple=True)
append_upload_button.on_change("value", append_upload_button_callback)
def _update_table():
num_of_peaks = [len(scan.get("peak_indexes", [])) for scan in det_data["scan"].values()]
fit_ok = [(1 if "fit" in scan else 0) for scan in det_data["scan"].values()]
scan_table_source.data.update(peaks=num_of_peaks, fit=fit_ok)
def _update_plot():
_update_single_scan_plot(_get_selected_scan())
_update_overview()
def _update_single_scan_plot(scan):
nonlocal peak_pos_textinput_lock
peak_pos_textinput_lock = True
y = scan["Counts"]
x = scan["om"]
plot_scatter_source.data.update(x=x, y=y, y_upper=y + np.sqrt(y), y_lower=y - np.sqrt(y))
num_of_peaks = len(scan.get("peak_indexes", []))
if num_of_peaks is not None and num_of_peaks > 0:
peak_indexes = scan["peak_indexes"]
if len(peak_indexes) == 1:
peak_pos_textinput.value = str(x[peak_indexes[0]])
else:
peak_pos_textinput.value = str([x[ind] for ind in peak_indexes])
plot_peak_source.data.update(x=x[peak_indexes], y=scan["peak_heights"])
plot_line_smooth_source.data.update(x=x, y=scan["smooth_peaks"])
else:
peak_pos_textinput.value = None
plot_peak_source.data.update(x=[], y=[])
plot_line_smooth_source.data.update(x=[], y=[])
peak_pos_textinput_lock = False
fit = scan.get("fit")
if fit is not None:
x = scan["fit"]["x_fit"]
plot_gauss_source.data.update(x=x, y=scan["fit"]["comps"]["gaussian"])
plot_bkg_source.data.update(x=x, y=scan["fit"]["comps"]["background"])
params = fit["result"].params
fit_output_textinput.value = (
"Gaussian: centre = %9.4f, sigma = %9.4f, area = %9.4f \n"
"background: slope = %9.4f, intercept = %9.4f \n"
"Int. area = %9.4f +/- %9.4f \n"
"fit area = %9.4f +/- %9.4f \n"
"ratio((fit-int)/fit) = %9.4f"
% (
params["g_cen"].value,
params["g_width"].value,
params["g_amp"].value,
params["slope"].value,
params["intercept"].value,
fit["int_area"].n,
fit["int_area"].s,
params["g_amp"].value,
params["g_amp"].stderr,
(params["g_amp"].value - fit["int_area"].n) / params["g_amp"].value,
)
)
numfit_min, numfit_max = fit["numfit"]
if numfit_min is None:
numfit_min_span.location = None
else:
numfit_min_span.location = x[numfit_min]
if numfit_max is None:
numfit_max_span.location = None
else:
numfit_max_span.location = x[numfit_max]
else:
plot_gauss_source.data.update(x=[], y=[])
plot_bkg_source.data.update(x=[], y=[])
fit_output_textinput.value = ""
numfit_min_span.location = None
numfit_max_span.location = None
def _update_overview():
xs = []
ys = []
param = []
for ind, p in enumerate(scan_table_source.data["param"]):
if p:
s = scan_table_source.data["scan"][ind]
xs.append(np.array(det_data["scan"][s]["om"]))
ys.append(np.array(det_data["scan"][s]["Counts"]))
param.append(float(p))
ov_plot_mline_source.data.update(xs=xs, ys=ys, param=param, color=color_palette(len(xs)))
# Main plot
plot = Plot(x_range=DataRange1d(), y_range=DataRange1d(), plot_height=400, plot_width=700)
plot.add_layout(LinearAxis(axis_label="Counts"), place="left")
plot.add_layout(LinearAxis(axis_label="Omega"), place="below")
plot.add_layout(Grid(dimension=0, ticker=BasicTicker()))
plot.add_layout(Grid(dimension=1, ticker=BasicTicker()))
plot_scatter_source = ColumnDataSource(dict(x=[0], y=[0], y_upper=[0], y_lower=[0]))
plot.add_glyph(
plot_scatter_source, Scatter(x="x", y="y", line_color="steelblue", name="single scan")
)
plot.add_layout(
Whisker(
source=plot_scatter_source,
base="x",
upper="y_upper",
lower="y_lower",
name="single scan",
)
)
plot_line_smooth_source = ColumnDataSource(dict(x=[0], y=[0]))
plot.add_glyph(
plot_line_smooth_source,
Line(x="x", y="y", line_color="steelblue", line_dash="dashed", name="single scan"),
)
plot_gauss_source = ColumnDataSource(dict(x=[0], y=[0]))
plot.add_glyph(
plot_gauss_source,
Line(x="x", y="y", line_color="red", line_dash="dashed", name="single scan"),
)
plot_bkg_source = ColumnDataSource(dict(x=[0], y=[0]))
plot.add_glyph(
plot_bkg_source,
Line(x="x", y="y", line_color="green", line_dash="dashed", name="single scan"),
)
plot_peak_source = ColumnDataSource(dict(x=[], y=[]))
plot.add_glyph(
plot_peak_source, Asterisk(x="x", y="y", size=10, line_color="red", name="single scan")
)
numfit_min_span = Span(
location=None, dimension="height", line_dash="dashed", name="single scan"
)
plot.add_layout(numfit_min_span)
numfit_max_span = Span(
location=None, dimension="height", line_dash="dashed", name="single scan"
)
plot.add_layout(numfit_max_span)
plot.add_tools(PanTool(), WheelZoomTool(), ResetTool())
plot.toolbar.logo = None
# Overview multilines plot
ov_plot = Plot(x_range=DataRange1d(), y_range=DataRange1d(), plot_height=400, plot_width=700)
ov_plot.add_layout(LinearAxis(axis_label="Counts"), place="left")
ov_plot.add_layout(LinearAxis(axis_label="Omega"), place="below")
ov_plot.add_layout(Grid(dimension=0, ticker=BasicTicker()))
ov_plot.add_layout(Grid(dimension=1, ticker=BasicTicker()))
ov_plot_mline_source = ColumnDataSource(dict(xs=[], ys=[], param=[], color=[]))
ov_plot.add_glyph(
ov_plot_mline_source, MultiLine(xs="xs", ys="ys", line_color="color", name="overview")
)
ov_plot.add_tools(PanTool(), WheelZoomTool(), ResetTool())
ov_plot.toolbar.logo = None
# Plot tabs
plots = Tabs(
tabs=[Panel(child=plot, title="single scan"), Panel(child=ov_plot, title="overview")]
)
# Scan select
def scan_table_select_callback(_attr, old, new):
if not new:
# skip empty selections
return
# Avoid selection of multiple indices (via Shift+Click or Ctrl+Click)
if len(new) > 1:
# drop selection to the previous one
scan_table_source.selected.indices = old
return
if len(old) > 1:
# skip unnecessary update caused by selection drop
return
_update_plot()
scan_table_source = ColumnDataSource(
dict(file=[], scan=[], param=[], peaks=[], fit=[], export=[])
)
scan_table = DataTable(
source=scan_table_source,
columns=[
TableColumn(field="file", title="file", width=150),
TableColumn(field="scan", title="scan", width=50),
TableColumn(field="param", title="param", width=50),
TableColumn(field="peaks", title="Peaks", width=50),
TableColumn(field="fit", title="Fit", width=50),
TableColumn(field="export", title="Export", editor=CheckboxEditor(), width=50),
],
width=400,
index_position=None,
editable=True,
fit_columns=False,
)
def scan_table_source_callback(_attr, _old, _new):
if scan_table_source.selected.indices:
_update_plot()
scan_table_source.selected.on_change("indices", scan_table_select_callback)
scan_table_source.on_change("data", scan_table_source_callback)
def _get_selected_scan():
selected_index = scan_table_source.selected.indices[0]
selected_scan_id = scan_table_source.data["scan"][selected_index]
return det_data["scan"][selected_scan_id]
def peak_pos_textinput_callback(_attr, _old, new):
if new is not None and not peak_pos_textinput_lock:
scan = _get_selected_scan()
peak_ind = (np.abs(scan["om"] - float(new))).argmin()
scan["peak_indexes"] = np.array([peak_ind], dtype=np.int64)
scan["peak_heights"] = np.array([scan["smooth_peaks"][peak_ind]])
_update_table()
_update_plot()
peak_pos_textinput = TextInput(title="Peak position:", default_size=145)
peak_pos_textinput.on_change("value", peak_pos_textinput_callback)
peak_int_ratio_spinner = Spinner(
title="Peak intensity ratio:", value=0.8, step=0.01, low=0, high=1, default_size=145
)
peak_prominence_spinner = Spinner(title="Peak prominence:", value=50, low=0, default_size=145)
smooth_toggle = Toggle(label="Smooth curve", default_size=145)
window_size_spinner = Spinner(title="Window size:", value=7, step=2, low=1, default_size=145)
poly_order_spinner = Spinner(title="Poly order:", value=3, low=0, default_size=145)
integ_from = Spinner(title="Integrate from:", default_size=145)
integ_to = Spinner(title="to:", default_size=145)
def fitparam_reset_button_callback():
...
fitparam_reset_button = Button(label="Reset to defaults", default_size=145, disabled=True)
fitparam_reset_button.on_click(fitparam_reset_button_callback)
def fitparams_add_dropdown_callback(click):
new_tag = str(fitparams_select.tags[0]) # bokeh requires (str, str) for MultiSelect options
fitparams_select.options.append((new_tag, click.item))
fit_params[new_tag] = fitparams_factory(click.item)
fitparams_select.tags[0] += 1
fitparams_add_dropdown = Dropdown(
label="Add fit function",
menu=[
("Background", "background"),
("Gauss", "gauss"),
("Voigt", "voigt"),
("Pseudo Voigt", "pseudovoigt"),
("Pseudo Voigt1", "pseudovoigt1"),
],
default_size=145,
)
fitparams_add_dropdown.on_click(fitparams_add_dropdown_callback)
def fitparams_select_callback(_attr, old, new):
# Avoid selection of multiple indices (via Shift+Click or Ctrl+Click)
if len(new) > 1:
# drop selection to the previous one
fitparams_select.value = old
return
if len(old) > 1:
# skip unnecessary update caused by selection drop
return
if new:
fitparams_table_source.data.update(fit_params[new[0]])
else:
fitparams_table_source.data.update(dict(param=[], guess=[], vary=[], min=[], max=[]))
fitparams_select = MultiSelect(options=[], height=120, default_size=145)
fitparams_select.tags = [0]
fitparams_select.on_change("value", fitparams_select_callback)
def fitparams_remove_button_callback():
if fitparams_select.value:
sel_tag = fitparams_select.value[0]
del fit_params[sel_tag]
for elem in fitparams_select.options:
if elem[0] == sel_tag:
fitparams_select.options.remove(elem)
break
fitparams_select.value = []
fitparams_remove_button = Button(label="Remove fit function", default_size=145)
fitparams_remove_button.on_click(fitparams_remove_button_callback)
def fitparams_factory(function):
if function == "background":
params = ["slope", "offset"]
elif function == "gauss":
params = ["center", "sigma", "amplitude"]
elif function == "voigt":
params = ["center", "sigma", "amplitude", "gamma"]
elif function == "pseudovoigt":
params = ["center", "sigma", "amplitude", "fraction"]
elif function == "pseudovoigt1":
params = ["center", "g_sigma", "l_sigma", "amplitude", "fraction"]
else:
raise ValueError("Unknown fit function")
n = len(params)
fitparams = dict(
param=params, guess=[None] * n, vary=[True] * n, min=[None] * n, max=[None] * n,
)
return fitparams
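# Illustrative example of the factory output: fitparams_factory("gauss") returns
# dict(param=["center", "sigma", "amplitude"], guess=[None]*3, vary=[True]*3,
# min=[None]*3, max=[None]*3), ready to back the editable fit-parameter table below.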
fitparams_table_source = ColumnDataSource(dict(param=[], guess=[], vary=[], min=[], max=[]))
fitparams_table = DataTable(
source=fitparams_table_source,
columns=[
TableColumn(field="param", title="Parameter"),
TableColumn(field="guess", title="Guess", editor=NumberEditor()),
TableColumn(field="vary", title="Vary", editor=CheckboxEditor()),
TableColumn(field="min", title="Min", editor=NumberEditor()),
TableColumn(field="max", title="Max", editor=NumberEditor()),
],
height=200,
width=350,
index_position=None,
editable=True,
auto_edit=True,
)
# start with `background` and `gauss` fit functions added
fitparams_add_dropdown_callback(types.SimpleNamespace(item="background"))
fitparams_add_dropdown_callback(types.SimpleNamespace(item="gauss"))
fit_output_textinput = TextAreaInput(title="Fit results:", width=450, height=400)
def _get_peakfind_params():
return dict(
int_threshold=peak_int_ratio_spinner.value,
prominence=peak_prominence_spinner.value,
smooth=smooth_toggle.active,
window_size=window_size_spinner.value,
poly_order=poly_order_spinner.value,
)
def peakfind_all_button_callback():
peakfind_params = _get_peakfind_params()
for scan in det_data["scan"].values():
pyzebra.ccl_findpeaks(scan, **peakfind_params)
_update_table()
_update_plot()
peakfind_all_button = Button(label="Peak Find All", button_type="primary", default_size=145)
peakfind_all_button.on_click(peakfind_all_button_callback)
def peakfind_button_callback():
scan = _get_selected_scan()
pyzebra.ccl_findpeaks(scan, **_get_peakfind_params())
_update_table()
_update_plot()
peakfind_button = Button(label="Peak Find Current", default_size=145)
peakfind_button.on_click(peakfind_button_callback)
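# Note: tags "0" and "1" below refer to the two default fit functions added at
# startup (tag "0" = background, tag "1" = gauss); concatenating the gauss
# parameters before the background ones appears to build the 5-element lists
# [center, sigma, amplitude, slope, offset] that pyzebra.fitccl expects.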
def _get_fit_params():
return dict(
guess=fit_params["1"]["guess"] + fit_params["0"]["guess"],
vary=fit_params["1"]["vary"] + fit_params["0"]["vary"],
constraints_min=fit_params["1"]["min"] + fit_params["0"]["min"],
constraints_max=fit_params["1"]["max"] + fit_params["0"]["max"],
numfit_min=integ_from.value,
numfit_max=integ_to.value,
binning=bin_size_spinner.value,
)
def fit_all_button_callback():
fit_params = _get_fit_params()
for scan in det_data["scan"].values():
# fit_params are updated inplace within `fitccl`
pyzebra.fitccl(scan, **deepcopy(fit_params))
_update_plot()
_update_table()
fit_all_button = Button(label="Fit All", button_type="primary", default_size=145)
fit_all_button.on_click(fit_all_button_callback)
def fit_button_callback():
scan = _get_selected_scan()
pyzebra.fitccl(scan, **_get_fit_params())
_update_plot()
_update_table()
fit_button = Button(label="Fit Current", default_size=145)
fit_button.on_click(fit_button_callback)
def area_method_radiobutton_callback(_attr, _old, new):
det_data["meta"]["area_method"] = AREA_METHODS[new]
area_method_radiobutton = RadioButtonGroup(
labels=["Fit area", "Int area"], active=0, default_size=145,
)
area_method_radiobutton.on_change("active", area_method_radiobutton_callback)
bin_size_spinner = Spinner(title="Bin size:", value=1, low=1, step=1, default_size=145)
lorentz_toggle = Toggle(label="Lorentz Correction", default_size=145)
preview_output_textinput = TextAreaInput(title="Export file preview:", width=450, height=400)
def preview_output_button_callback():
# reuse export_results to avoid duplicating the export logic
cont, _ = export_results(det_data)
preview_output_textinput.value = cont
preview_output_button = Button(label="Preview file", default_size=220)
preview_output_button.on_click(preview_output_button_callback)
def export_results(det_data):
if det_data["meta"]["indices"] == "hkl":
ext = ".comm"
else:  # det_data["meta"]["indices"] == "real"
ext = ".incomm"
with tempfile.TemporaryDirectory() as temp_dir:
temp_file = temp_dir + "/temp"
export_data = deepcopy(det_data)
for s, export in zip(scan_table_source.data["scan"], scan_table_source.data["export"]):
if not export:
del export_data["scan"][s]
pyzebra.export_comm(export_data, temp_file, lorentz=lorentz_toggle.active)
with open(f"{temp_file}{ext}") as f:
output_content = f.read()
return output_content, ext
def save_button_callback():
cont, ext = export_results(det_data)
js_data.data.update(cont=[cont], ext=[ext])
save_button = Button(label="Download file", button_type="success", default_size=220)
save_button.on_click(save_button_callback)
save_button.js_on_click(CustomJS(args={"js_data": js_data}, code=javaScript))
findpeak_controls = column(
row(peak_pos_textinput, column(Spacer(height=19), smooth_toggle)),
row(peak_int_ratio_spinner, peak_prominence_spinner),
row(window_size_spinner, poly_order_spinner),
row(peakfind_button, peakfind_all_button),
)
fitpeak_controls = row(
column(fitparams_add_dropdown, fitparams_select, fitparams_remove_button),
fitparams_table,
Spacer(width=20),
column(
row(integ_from, integ_to),
row(bin_size_spinner, column(Spacer(height=19), lorentz_toggle)),
row(fitparam_reset_button, area_method_radiobutton),
row(fit_button, fit_all_button),
),
)
export_layout = column(preview_output_textinput, row(preview_output_button, save_button))
upload_div = Div(text="Or upload .dat files:")
append_upload_div = Div(text="append extra .dat files:")
tab_layout = column(
row(
proposal_textinput,
file_select,
column(Spacer(height=19), row(file_open_button, file_append_button)),
),
row(
column(Spacer(height=5), upload_div),
upload_button,
column(Spacer(height=5), append_upload_div),
append_upload_button,
),
row(scan_table, plots, Spacer(width=30), fit_output_textinput, export_layout),
row(findpeak_controls, Spacer(width=30), fitpeak_controls),
)
return Panel(child=tab_layout, title="param study")

View File

@ -1,513 +0,0 @@
import numpy as np
import uncertainties as u
from .fit2 import create_uncertanities
def add_dict(dict1, dict2):
"""adds two dictionaries, meta of the new is saved as meata+original_filename and
measurements are shifted to continue with numbering of first dict
:arg dict1 : dictionarry to add to
:arg dict2 : dictionarry from which to take the measurements
:return dict1 : combined dictionary
Note: dict1 must be made from ccl, otherwise we would have to change the structure of loaded
dat file"""
max_measurement_dict1 = max([int(str(keys)[1:]) for keys in dict1["scan"]])
if dict2["meta"]["data_type"] == ".ccl":
new_filenames = [
"M" + str(x + max_measurement_dict1)
for x in [int(str(keys)[1:]) for keys in dict2["scan"]]
]
new_meta_name = "meta" + str(dict2["meta"]["original_filename"])
if new_meta_name not in dict1:
for keys, name in zip(dict2["scan"], new_filenames):
dict2["scan"][keys]["file_of_origin"] = str(dict2["meta"]["original_filename"])
dict1["scan"][name] = dict2["scan"][keys]
dict1[new_meta_name] = dict2["meta"]
else:
raise KeyError(
str(
"The file %s has alredy been added to %s"
% (dict2["meta"]["original_filename"], dict1["meta"]["original_filename"])
)
)
elif dict2["meta"]["data_type"] == ".dat":
d = {}
new_name = "M" + str(max_measurement_dict1 + 1)
hkl = dict2["meta"]["title"]
d["h_index"] = float(hkl.split()[-3])
d["k_index"] = float(hkl.split()[-2])
d["l_index"] = float(hkl.split()[-1])
d["number_of_measurements"] = len(dict2["scan"]["NP"])
d["om"] = dict2["scan"]["om"]
d["Counts"] = dict2["scan"]["Counts"]
d["monitor"] = dict2["scan"]["Monitor1"][0]
d["temperature"] = dict2["meta"]["temp"]
d["mag_field"] = dict2["meta"]["mf"]
d["omega_angle"] = dict2["meta"]["omega"]
dict1["scan"][new_name] = d
print(hkl.split())
for keys in d:
print(keys)
print("s")
return dict1
def auto(dict):
"""takes just unique tuples from all tuples in dictionary returend by scan_dict
intendet for automatic merge if you doesent want to specify what scans to merge together
args: dict - dictionary from scan_dict function
:return dict - dict without repetitions"""
for keys in dict:
tuple_list = dict[keys]
new = list()
for i in range(len(tuple_list)):
if tuple_list[0][0] == tuple_list[i][0]:
new.append(tuple_list[i])
dict[keys] = new
return dict
def scan_dict(dict):
"""scans dictionary for duplicate hkl indexes
:arg dict : dictionary to scan
:return dictionary with matching scans, if there are none, the dict is empty
note: can be checked by "not d", true if empty
"""
d = {}
for i in dict["scan"]:
for j in dict["scan"]:
if dict["scan"][str(i)] != dict["scan"][str(j)]:
itup = (
dict["scan"][str(i)]["h_index"],
dict["scan"][str(i)]["k_index"],
dict["scan"][str(i)]["l_index"],
)
jtup = (
dict["scan"][str(j)]["h_index"],
dict["scan"][str(j)]["k_index"],
dict["scan"][str(j)]["l_index"],
)
if itup != jtup:
pass
else:
if str(itup) not in d:
d[str(itup)] = list()
d[str(itup)].append((i, j))
else:
d[str(itup)].append((i, j))
else:
continue
return d
def compare_hkl(dict1, dict2):
"""Compares two dictionaries based on hkl indexes and return dictionary with str(h k l) as
key and tuple with keys to same scan in dict1 and dict2
:arg dict1 : first dictionary
:arg dict2 : second dictionary
:return d : dict with matches
example of one key: '0.0 0.0 -1.0 : ('M1', 'M9')' meaning that 001 hkl scan is M1 in
first dict and M9 in second"""
d = {}
dupl = 0
for keys in dict1["scan"]:
for key in dict2["scan"]:
if (
dict1["scan"][str(keys)]["h_index"] == dict2["scan"][str(key)]["h_index"]
and dict1["scan"][str(keys)]["k_index"] == dict2["scan"][str(key)]["k_index"]
and dict1["scan"][str(keys)]["l_index"] == dict2["scan"][str(key)]["l_index"]
):
if (
str(
(
str(dict1["scan"][str(keys)]["h_index"])
+ " "
+ str(dict1["scan"][str(keys)]["k_index"])
+ " "
+ str(dict1["scan"][str(keys)]["l_index"])
)
)
not in d
):
d[
str(
str(dict1["scan"][str(keys)]["h_index"])
+ " "
+ str(dict1["scan"][str(keys)]["k_index"])
+ " "
+ str(dict1["scan"][str(keys)]["l_index"])
)
] = (str(keys), str(key))
else:
dupl = dupl + 1
d[
str(
str(dict1["scan"][str(keys)]["h_index"])
+ " "
+ str(dict1["scan"][str(keys)]["k_index"])
+ " "
+ str(dict1["scan"][str(keys)]["l_index"])
+ "_dupl"
+ str(dupl)
)
] = (str(keys), str(key))
else:
continue
return d
def create_tuples(x, y, y_err):
"""creates tuples for sorting and merginng of the data
Counts need to be normalized to monitor before"""
t = list()
for i in range(len(x)):
tup = (x[i], y[i], y_err[i])
t.append(tup)
return t
def normalize(dict, key, monitor):
"""Normalizes the scan to monitor, checks if sigma exists, otherwise creates it
:arg dict : dictionary to from which to tkae the scan
:arg key : which scan to normalize from dict1
:arg monitor : final monitor
:return counts - normalized counts
:return sigma - normalized sigma"""
counts = np.array(dict["scan"][key]["Counts"])
sigma = np.sqrt(counts) if "sigma" not in dict["scan"][key] else dict["scan"][key]["sigma"]
monitor_ratio = monitor / dict["scan"][key]["monitor"]
scaled_counts = counts * monitor_ratio
scaled_sigma = np.array(sigma) * monitor_ratio
return scaled_counts, scaled_sigma
def merge(dict1, dict2, keys, auto=True, monitor=100000):
"""merges the two tuples and sorts them, if om value is same, Counts value is average
averaging is propagated into sigma if dict1 == dict2, key[1] is deleted after merging
:arg dict1 : dictionary to which scan will be merged
:arg dict2 : dictionary from which scan will be merged
:arg keys : tuple with key to dict1 and dict2
:arg auto : if true, when monitors are same, does not change it, if flase, takes monitor always
:arg monitor : final monitor after merging
note: dict1 and dict2 can be same dict
:return dict1 with merged scan"""
if auto:
if dict1["scan"][keys[0]]["monitor"] == dict2["scan"][keys[1]]["monitor"]:
monitor = dict1["scan"][keys[0]]["monitor"]
# load om and Counts
x1, x2 = dict1["scan"][keys[0]]["om"], dict2["scan"][keys[1]]["om"]
cor_y1, y_err1 = normalize(dict1, keys[0], monitor=monitor)
cor_y2, y_err2 = normalize(dict2, keys[1], monitor=monitor)
# creates tuples (om, Counts, sigma) for sorting and further processing
tuple_list = create_tuples(x1, cor_y1, y_err1) + create_tuples(x2, cor_y2, y_err2)
# Sort the list on om and add 0 0 0 tuple to the last position
sorted_t = sorted(tuple_list, key=lambda tup: tup[0])
sorted_t.append((0, 0, 0))
om, Counts, sigma = [], [], []
seen = list()
for i in range(len(sorted_t) - 1):
if sorted_t[i][0] not in seen:
if sorted_t[i][0] != sorted_t[i + 1][0]:
om = np.append(om, sorted_t[i][0])
Counts = np.append(Counts, sorted_t[i][1])
sigma = np.append(sigma, sorted_t[i][2])
else:
om = np.append(om, sorted_t[i][0])
counts1, counts2 = sorted_t[i][1], sorted_t[i + 1][1]
sigma1, sigma2 = sorted_t[i][2], sorted_t[i + 1][2]
count_err1 = u.ufloat(counts1, sigma1)
count_err2 = u.ufloat(counts2, sigma2)
avg = (count_err1 + count_err2) / 2
Counts = np.append(Counts, avg.n)
sigma = np.append(sigma, avg.s)
seen.append(sorted_t[i][0])
else:
continue
if dict1 == dict2:
del dict1["scan"][keys[1]]
note = (
f"This scan was merged with scan {keys[1]} from "
f'file {dict2["meta"]["original_filename"]} \n'
)
if "notes" not in dict1["scan"][str(keys[0])]:
dict1["scan"][str(keys[0])]["notes"] = note
else:
dict1["scan"][str(keys[0])]["notes"] += note
dict1["scan"][keys[0]]["om"] = om
dict1["scan"][keys[0]]["Counts"] = Counts
dict1["scan"][keys[0]]["sigma"] = sigma
dict1["scan"][keys[0]]["monitor"] = monitor
print("merging done")
return dict1
def substract_measurement(dict1, dict2, keys, auto=True, monitor=100000):
"""Substracts two scan (scan key2 from dict2 from measurent key1 in dict1), expects om to be same
:arg dict1 : dictionary to which scan will be merged
:arg dict2 : dictionary from which scan will be merged
:arg keys : tuple with key to dict1 and dict2
:arg auto : if true, when monitors are same, does not change it, if flase, takes monitor always
:arg monitor : final monitor after merging
:returns d : dict1 with substracted Counts from dict2 and sigma that comes from the substraction"""
if len(dict1["scan"][keys[0]]["om"]) != len(dict2["scan"][keys[1]]["om"]):
raise ValueError("Omegas have different lengths, cannot be substracted")
if auto:
if dict1["scan"][keys[0]]["monitor"] == dict2["scan"][keys[1]]["monitor"]:
monitor = dict1["scan"][keys[0]]["monitor"]
cor_y1, y_err1 = normalize(dict1, keys[0], monitor=monitor)
cor_y2, y_err2 = normalize(dict2, keys[1], monitor=monitor)
dict1_count_err = create_uncertanities(cor_y1, y_err1)
dict2_count_err = create_uncertanities(cor_y2, y_err2)
res = np.subtract(dict1_count_err, dict2_count_err)
res_nom = []
res_err = []
for k in range(len(res)):
res_nom = np.append(res_nom, res[k].n)
res_err = np.append(res_err, res[k].s)
if len([num for num in res_nom if num < 0]) >= 0.3 * len(res_nom):
print(
f"Warning! the fraction of negative numbers in subtracted scan {keys[0]} is "
f"{len([num for num in res_nom if num < 0]) / len(res_nom)}"
)
dict1["scan"][str(keys[0])]["Counts"] = res_nom
dict1["scan"][str(keys[0])]["sigma"] = res_err
dict1["scan"][str(keys[0])]["monitor"] = monitor
note = (
f'Scan {keys[1]} from file {dict2["meta"]["original_filename"]} '
f"was substracted from this scan \n"
)
if "notes" not in dict1["scan"][str(keys[0])]:
dict1["scan"][str(keys[0])]["notes"] = note
else:
dict1["scan"][str(keys[0])]["notes"] += note
return dict1
def compare_dict(dict1, dict2):
"""takes two ccl dictionaries and compare different values for each key
:arg dict1 : dictionary 1 (ccl)
:arg dict2 : dictionary 2 (ccl)
:returns warning : dictionary with keys from primary files (if they differ) with
information of how many scan differ and which ones differ
:returns report_string string comparing all different values respecively of measurements"""
if dict1["meta"]["data_type"] != dict2["meta"]["data_type"]:
print("select two dicts")
return
S = []
conflicts = {}
warnings = {}
comp = compare_hkl(dict1, dict2)
d1 = scan_dict(dict1)
d2 = scan_dict(dict2)
if not d1:
S.append("There are no duplicates in %s (dict1) \n" % dict1["meta"]["original_filename"])
else:
S.append(
"There are %d duplicates in %s (dict1) \n"
% (len(d1), dict1["meta"]["original_filename"])
)
warnings["Duplicates in dict1"] = list()
for keys in d1:
S.append("Measurements %s with hkl %s \n" % (d1[keys], keys))
warnings["Duplicates in dict1"].append(d1[keys])
if not d2:
S.append("There are no duplicates in %s (dict2) \n" % dict2["meta"]["original_filename"])
else:
S.append(
"There are %d duplicates in %s (dict2) \n"
% (len(d2), dict2["meta"]["original_filename"])
)
warnings["Duplicates in dict2"] = list()
for keys in d2:
S.append("Measurements %s with hkl %s \n" % (d2[keys], keys))
warnings["Duplicates in dict2"].append(d2[keys])
# compare meta
S.append("Different values in meta: \n")
different_meta = {
k: dict1["meta"][k]
for k in dict1["meta"]
if k in dict2["meta"] and dict1["meta"][k] != dict2["meta"][k]
}
exlude_meta_set = ["original_filename", "date", "title"]
for keys in different_meta:
if keys in exlude_meta_set:
continue
else:
if keys not in conflicts:
conflicts[keys] = 1
else:
conflicts[keys] = conflicts[keys] + 1
S.append(" Different values in %s \n" % str(keys))
S.append(" dict1: %s \n" % str(dict1["meta"][str(keys)]))
S.append(" dict2: %s \n" % str(dict2["meta"][str(keys)]))
# compare Measurements
S.append(
"Number of measurements in %s = %s \n"
% (dict1["meta"]["original_filename"], len(dict1["scan"]))
)
S.append(
"Number of measurements in %s = %s \n"
% (dict2["meta"]["original_filename"], len(dict2["scan"]))
)
S.append("Different values in Measurements:\n")
select_set = ["om", "Counts", "sigma"]
exlude_set = ["time", "Counts", "date", "notes"]
for keys1 in comp:
for key2 in dict1["scan"][str(comp[str(keys1)][0])]:
if key2 in exlude_set:
continue
if key2 not in select_set:
try:
if (
dict1["scan"][comp[str(keys1)][0]][str(key2)]
!= dict2["scan"][str(comp[str(keys1)][1])][str(key2)]
):
S.append(
"Scan value "
"%s"
", with hkl %s differs in meausrements %s and %s \n"
% (key2, keys1, comp[str(keys1)][0], comp[str(keys1)][1])
)
S.append(
" dict1: %s \n"
% str(dict1["scan"][comp[str(keys1)][0]][str(key2)])
)
S.append(
" dict2: %s \n"
% str(dict2["scan"][comp[str(keys1)][1]][str(key2)])
)
if key2 not in conflicts:
conflicts[key2] = {}
conflicts[key2]["amount"] = 1
conflicts[key2]["scan"] = str(comp[str(keys1)])
else:
conflicts[key2]["amount"] = conflicts[key2]["amount"] + 1
conflicts[key2]["scan"] = (
conflicts[key2]["scan"] + " " + (str(comp[str(keys1)]))
)
except KeyError as e:
print("Missing keys, some files were probably merged or substracted")
print(e.args)
else:
try:
comparison = list(dict1["scan"][comp[str(keys1)][0]][str(key2)]) == list(
dict2["scan"][comp[str(keys1)][1]][str(key2)]
)
if len(list(dict1["scan"][comp[str(keys1)][0]][str(key2)])) != len(
list(dict2["scan"][comp[str(keys1)][1]][str(key2)])
):
if str("different length of %s" % key2) not in warnings:
warnings[str("different length of %s" % key2)] = list()
warnings[str("different length of %s" % key2)].append(
(str(comp[keys1][0]), str(comp[keys1][1]))
)
else:
warnings[str("different length of %s" % key2)].append(
(str(comp[keys1][0]), str(comp[keys1][1]))
)
if not comparison:
S.append(
"Scan value "
"%s"
" differs in scan %s and %s \n"
% (key2, comp[str(keys1)][0], comp[str(keys1)][1])
)
S.append(
" dict1: %s \n"
% str(list(dict1["scan"][comp[str(keys1)][0]][str(key2)]))
)
S.append(
" dict2: %s \n"
% str(list(dict2["scan"][comp[str(keys1)][1]][str(key2)]))
)
if key2 not in conflicts:
conflicts[key2] = {}
conflicts[key2]["amount"] = 1
conflicts[key2]["scan"] = str(comp[str(keys1)])
else:
conflicts[key2]["amount"] = conflicts[key2]["amount"] + 1
conflicts[key2]["scan"] = (
conflicts[key2]["scan"] + " " + (str(comp[str(keys1)]))
)
except KeyError as e:
print("Missing keys, some files were probably merged or substracted")
print(e.args)
for keys in conflicts:
try:
conflicts[str(keys)]["scan"] = conflicts[str(keys)]["scan"].split(" ")
except TypeError:
# meta conflicts are stored as plain counters, not dicts with a "scan" entry
continue
report_string = "".join(S)
return warnings, conflicts, report_string
def guess_next(dict1, dict2, comp):
"""iterates thorough the scans and tries to decide if the scans should be
substracted or merged"""
threshold = 0.05
for keys in comp:
if (
abs(
(
dict1["scan"][str(comp[keys][0])]["temperature"]
- dict2["scan"][str(comp[keys][1])]["temperature"]
)
/ dict2["scan"][str(comp[keys][1])]["temperature"]
)
< threshold
and abs(
(
dict1["scan"][str(comp[keys][0])]["mag_field"]
- dict2["scan"][str(comp[keys][1])]["mag_field"]
)
/ dict2["scan"][str(comp[keys][1])]["mag_field"]
)
< threshold
):
comp[keys] = comp[keys] + tuple("m")
else:
comp[keys] = comp[keys] + tuple("s")
return comp
def process_dict(dict1, dict2, comp):
"""substracts or merges scans, guess_next function must run first """
for keys in comp:
if comp[keys][2] == "s":
substract_measurement(dict1, dict2, comp[keys])
elif comp[keys][2] == "m":
merge(dict1, dict2, comp[keys])
return dict1

View File

@ -5,7 +5,13 @@ from scipy.signal import savgol_filter
def ccl_findpeaks(
scan, int_threshold=0.8, prominence=50, smooth=False, window_size=7, poly_order=3
scan,
int_threshold=0.8,
prominence=50,
smooth=False,
window_size=7,
poly_order=3,
variable="om",
):
"""function iterates through the dictionary created by load_cclv2 and locates peaks for each scan
@ -23,11 +29,6 @@ def ccl_findpeaks(
window_size - window size for savgol filter, must be odd positive integer
poly_order = order of the polynomial used in savgol filter, must be positive integer smaller than
window_size
returns: dictionary with the following structure:
D{M34{ 'num_of_peaks': 1, # number of peaks
'peak_indexes': [20], # indices of peaks in the omega array
'peak_heights': [90.], # heights of the peaks (if the data were smoothed,
the heights of the peaks in the smoothed data)
"""
if not 0 <= int_threshold <= 1:
int_threshold = 0.8
@ -54,7 +55,7 @@ def ccl_findpeaks(
prominence = 50
print("Invalid value for prominence, select positive number, new value set to:", prominence)
omega = scan["om"]
omega = scan[variable]
counts = np.array(scan["Counts"])
if smooth:
itp = interp1d(omega, counts, kind="linear")
@ -69,7 +70,6 @@ def ccl_findpeaks(
peaks, properties = sc.signal.find_peaks(
smooth_peaks, height=int_threshold * max(smooth_peaks), prominence=prominence
)
scan["num_of_peaks"] = len(peaks)
scan["peak_indexes"] = peaks
scan["peak_heights"] = properties["peak_heights"]
scan["smooth_peaks"] = smooth_peaks # smoothed curve

View File

@ -1,7 +1,6 @@
import os
import re
from collections import defaultdict
from decimal import Decimal
import numpy as np
@ -20,6 +19,7 @@ META_VARS_STR = (
"proposal_email",
"detectorDistance",
)
META_VARS_FLOAT = (
"omega",
"mf",
@ -55,33 +55,33 @@ META_VARS_FLOAT = (
"s2hr",
"s2hl",
)
META_UB_MATRIX = ("ub1j", "ub2j", "ub3j")
CCL_FIRST_LINE = (
# the first element is `scan_number`, which we don't save to metadata
("scan_number", int),
("h_index", float),
("k_index", float),
("l_index", float),
)
CCL_FIRST_LINE_BI = (
*CCL_FIRST_LINE,
("twotheta_angle", float),
("omega_angle", float),
("chi_angle", float),
("phi_angle", float),
)
CCL_FIRST_LINE_NB = (
*CCL_FIRST_LINE,
("gamma_angle", float),
("omega_angle", float),
("nu_angle", float),
("unkwn_angle", float),
)
CCL_ANGLES = {
"bi": (
("twotheta_angle", float),
("omega_angle", float),
("chi_angle", float),
("phi_angle", float),
),
"nb": (
("gamma_angle", float),
("omega_angle", float),
("nu_angle", float),
("unkwn_angle", float),
),
}
CCL_SECOND_LINE = (
("number_of_measurements", int),
("n_points", int),
("angle_step", float),
("monitor", float),
("temperature", float),
@ -91,6 +91,8 @@ CCL_SECOND_LINE = (
("scan_type", str),
)
AREA_METHODS = ("fit_area", "int_area")
def load_1D(filepath):
"""
@ -132,50 +134,34 @@ def parse_1D(fileobj, data_type):
# read data
scan = {}
if data_type == ".ccl":
decimal = list()
if metadata["zebra_mode"] == "bi":
ccl_first_line = CCL_FIRST_LINE_BI
elif metadata["zebra_mode"] == "nb":
ccl_first_line = CCL_FIRST_LINE_NB
ccl_first_line = (*CCL_FIRST_LINE, *CCL_ANGLES[metadata["zebra_mode"]])
ccl_second_line = CCL_SECOND_LINE
for line in fileobj:
d = {}
# first line
scan_number, *params = line.split()
for param, (param_name, param_type) in zip(params, ccl_first_line):
for param, (param_name, param_type) in zip(line.split(), ccl_first_line):
d[param_name] = param_type(param)
decimal.append(bool(Decimal(d["h_index"]) % 1 == 0))
decimal.append(bool(Decimal(d["k_index"]) % 1 == 0))
decimal.append(bool(Decimal(d["l_index"]) % 1 == 0))
# second line
next_line = next(fileobj)
params = next_line.split()
for param, (param_name, param_type) in zip(params, ccl_second_line):
for param, (param_name, param_type) in zip(next_line.split(), ccl_second_line):
d[param_name] = param_type(param)
d["om"] = np.linspace(
d["omega_angle"] - (d["number_of_measurements"] / 2) * d["angle_step"],
d["omega_angle"] + (d["number_of_measurements"] / 2) * d["angle_step"],
d["number_of_measurements"],
d["omega_angle"] - (d["n_points"] / 2) * d["angle_step"],
d["omega_angle"] + (d["n_points"] / 2) * d["angle_step"],
d["n_points"],
)
# subsequent lines with counts
counts = []
while len(counts) < d["number_of_measurements"]:
while len(counts) < d["n_points"]:
counts.extend(map(int, next(fileobj).split()))
d["Counts"] = counts
scan[int(scan_number)] = d
if all(decimal):
metadata["indices"] = "hkl"
else:
metadata["indices"] = "real"
scan[d["scan_number"]] = d
elif data_type == ".dat":
# skip the first 2 rows, the third row contains the column names
@ -199,23 +185,91 @@ def parse_1D(fileobj, data_type):
except (ValueError, IndexError):
print("seems hkl is not in title")
data_cols["om"] = np.array(data_cols["om"])
data_cols["temperature"] = metadata["temp"]
data_cols["mag_field"] = metadata["mf"]
try:
data_cols["mag_field"] = metadata["mf"]
except KeyError:
print("Mag_field not present in dat file")
data_cols["omega_angle"] = metadata["omega"]
data_cols["number_of_measurements"] = len(data_cols["om"])
data_cols["n_points"] = len(data_cols["om"])
data_cols["monitor"] = data_cols["Monitor1"][0]
data_cols["twotheta_angle"] = metadata["2-theta"]
data_cols["chi_angle"] = metadata["chi"]
data_cols["phi_angle"] = metadata["phi"]
data_cols["nu_angle"] = metadata["nu"]
scan[1] = dict(data_cols)
data_cols["scan_number"] = 1
scan[data_cols["scan_number"]] = dict(data_cols)
else:
print("Unknown file extention")
# utility information
if all(
s["h_index"].is_integer() and s["k_index"].is_integer() and s["l_index"].is_integer()
for s in scan.values()
):
metadata["indices"] = "hkl"
else:
metadata["indices"] = "real"
metadata["data_type"] = data_type
metadata["area_method"] = "fit"
metadata["area_method"] = AREA_METHODS[0]
return {"meta": metadata, "scan": scan}
def export_comm(data, path, lorentz=False, hkl_precision=2):
"""exports data in the *.comm format
:param lorentz: perform Lorentz correction
:param path: path to file + name
:arg data - data to export, is dict after peak fitting
"""
zebra_mode = data["meta"]["zebra_mode"]
if data["meta"]["indices"] == "hkl":
extension = ".comm"
else: # data["meta"]["indices"] == "real":
extension = ".incomm"
with open(str(path + extension), "w") as out_file:
for key, scan in data["scan"].items():
if "fit" not in scan:
print("Scan skipped - no fit value for:", key)
continue
scan_str = f"{key:6}"
h, k, l = scan["h_index"], scan["k_index"], scan["l_index"]
if data["meta"]["indices"] == "hkl":
hkl_str = f"{int(h):6}{int(k):6}{int(l):6}"
else: # data["meta"]["indices"] == "real"
hkl_str = f"{h:8.{hkl_precision}f}{k:8.{hkl_precision}f}{l:8.{hkl_precision}f}"
area_method = data["meta"]["area_method"]
area_n = scan["fit"][area_method].n
area_s = scan["fit"][area_method].s
# apply lorentz correction to area
if lorentz:
if zebra_mode == "bi":
twotheta_angle = np.deg2rad(scan["twotheta_angle"])
corr_factor = np.sin(twotheta_angle)
else: # zebra_mode == "nb":
gamma_angle = np.deg2rad(scan["gamma_angle"])
nu_angle = np.deg2rad(scan["nu_angle"])
corr_factor = np.sin(gamma_angle) * np.cos(nu_angle)
area_n = np.abs(area_n * corr_factor)
area_s = np.abs(area_s * corr_factor)
area_str = f"{area_n:10.2f}{area_s:10.2f}"
ang_str = ""
for angle, _ in CCL_ANGLES[zebra_mode]:
ang_str = ang_str + f"{scan[angle]:8}"
out_file.write(scan_str + hkl_str + area_str + ang_str + "\n")
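# A usage sketch (illustrative; the path is hypothetical): the extension is
# derived from data["meta"]["indices"], so only the base path is passed in.
#
#     export_comm(data, "/tmp/experiment_01", lorentz=True, hkl_precision=2)
#     # -> writes /tmp/experiment_01.comm (hkl indices) or /tmp/experiment_01.incomm (real)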

View File

@ -1,80 +0,0 @@
import numpy as np
def correction(value, lorentz=True, zebra_mode="--", ang1=0, ang2=0):
if lorentz is False:
return value
else:
if zebra_mode == "bi":
corr_value = np.abs(value * np.sin(ang1))
return corr_value
elif zebra_mode == "nb":
corr_value = np.abs(value * np.sin(ang1) * np.cos(ang2))
return corr_value
def export_comm(data, path, lorentz=False):
"""exports data in the *.comm format
:param lorentz: perform Lorentz correction
:param path: path to file + name
:arg data - data to export, is dict after peak fitting
"""
zebra_mode = data["meta"]["zebra_mode"]
align = ">"
if data["meta"]["indices"] == "hkl":
extension = ".comm"
padding = [6, 4, 10, 8]
elif data["meta"]["indices"] == "real":
extension = ".incomm"
padding = [4, 6, 10, 8]
with open(str(path + extension), "w") as out_file:
for key, scan in data["scan"].items():
if "fit" not in scan:
print("Scan skipped - no fit value for:", key)
continue
scan_number_str = f"{key:{align}{padding[0]}}"
h_str = f'{int(scan["h_index"]):{padding[1]}}'
k_str = f'{int(scan["k_index"]):{padding[1]}}'
l_str = f'{int(scan["l_index"]):{padding[1]}}'
if data["meta"]["area_method"] == "fit":
area = float(scan["fit"]["fit_area"].n)
sigma_str = (
f'{"{:8.2f}".format(float(scan["fit"]["fit_area"].s)):{align}{padding[2]}}'
)
elif data["meta"]["area_method"] == "integ":
area = float(scan["fit"]["int_area"].n)
sigma_str = (
f'{"{:8.2f}".format(float(scan["fit"]["int_area"].s)):{align}{padding[2]}}'
)
if zebra_mode == "bi":
area = correction(area, lorentz, zebra_mode, scan["twotheta_angle"])
int_str = f'{"{:8.2f}".format(area):{align}{padding[2]}}'
angle_str1 = f'{scan["twotheta_angle"]:{padding[3]}}'
angle_str2 = f'{scan["omega_angle"]:{padding[3]}}'
angle_str3 = f'{scan["chi_angle"]:{padding[3]}}'
angle_str4 = f'{scan["phi_angle"]:{padding[3]}}'
elif zebra_mode == "nb":
area = correction(area, lorentz, zebra_mode, scan["gamma_angle"], scan["nu_angle"])
int_str = f'{"{:8.2f}".format(area):{align}{padding[2]}}'
angle_str1 = f'{scan["gamma_angle"]:{padding[3]}}'
angle_str2 = f'{scan["omega_angle"]:{padding[3]}}'
angle_str3 = f'{scan["nu_angle"]:{padding[3]}}'
angle_str4 = f'{scan["unkwn_angle"]:{padding[3]}}'
line = (
scan_number_str
+ h_str
+ l_str
+ k_str
+ int_str
+ sigma_str
+ angle_str1
+ angle_str2
+ angle_str3
+ angle_str4
+ "\n"
)
out_file.write(line)

View File

@ -59,15 +59,17 @@ def fitccl(
constraints_min = [23, None, 50, 0, 0]
constraints_max = [80, None, 1000, 0, 100]
"""
if "peak_indexes" not in scan:
scan["peak_indexes"] = []
if len(scan["peak_indexes"]) > 1:
# return in case of more than one peak
print("More than 1 peak, scan skipped")
return
if binning is None or binning == 0 or binning == 1:
x = list(scan["om"])
y = list(scan["Counts"])
y_err = list(np.sqrt(y)) if scan.get("sigma", None) is None else list(scan["sigma"])
print(scan["peak_indexes"])
if not scan["peak_indexes"]:
centre = np.mean(x)
else:
@ -87,7 +89,6 @@ def fitccl(
if len(scan["peak_indexes"]) == 0:
# Case for no peak, gaussian in centre, sigma as 20% of range
print("No peak")
peak_index = find_nearest(x, np.mean(x))
guess[0] = centre if guess[0] is None else guess[0]
guess[1] = (x[-1] - x[0]) / 5 if guess[1] is None else guess[1]
@ -98,7 +99,6 @@ def fitccl(
elif len(scan["peak_indexes"]) == 1:
# case for one peak; takes the user's guesses into account
print("one peak")
peak_height = scan["peak_heights"]
guess[0] = centre if guess[0] is None else guess[0]
guess[1] = 0.1 if guess[1] is None else guess[1]
@ -128,11 +128,11 @@ def fitccl(
("intercept", guess[4], bool(vary[4]), constraints_min[4], constraints_max[4], None, None),
)
# the weighted fit
weights = [np.abs(1 / val) if val != 0 else 1 for val in y_err]
try:
result = mod.fit(
y, params, weights=[np.abs(1 / val) for val in y_err], x=x, calc_covar=True,
)
result = mod.fit(y, params, weights=weights, x=x, calc_covar=True)
except ValueError:
print(f"Couldn't fit scan {scan['scan_number']}")
return
if result.params["g_amp"].stderr is None:
@ -213,9 +213,9 @@ def fitccl(
d = {}
for pars in result.params:
d[str(pars)] = (result.params[str(pars)].value, result.params[str(pars)].vary)
print(result.fit_report())
print((result.params["g_amp"].value - int_area.n) / result.params["g_amp"].value)
print("Scan", scan["scan_number"])
print(result.fit_report())
d["ratio"] = (result.params["g_amp"].value - int_area.n) / result.params["g_amp"].value
d["int_area"] = int_area
@ -224,4 +224,5 @@ def fitccl(
d["result"] = result
d["comps"] = comps
d["numfit"] = [numfit_min, numfit_max]
d["x_fit"] = x
scan["fit"] = d

pyzebra/fitvol3.py Normal file
View File

@ -0,0 +1,167 @@
import numpy as np
from lmfit import Model, Parameters
from scipy.integrate import simps
import matplotlib.pyplot as plt
import uncertainties as u
from lmfit.models import GaussianModel
from lmfit.models import VoigtModel
from lmfit.models import PseudoVoigtModel
def bin_data(array, binsize):
if isinstance(binsize, int) and 0 < binsize < len(array):
return [
np.mean(array[binsize * i : binsize * i + binsize])
for i in range(int(np.ceil(len(array) / binsize)))
]
else:
print("Binsize need to be positive integer smaller than lenght of array")
return array
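# Worked example (illustrative): bin_data([1, 2, 3, 4, 5], 2) averages pairs and
# keeps the odd tail -> [1.5, 3.5, 5.0]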
def create_uncertanities(y, y_err):
# create an array with uncertainties for error propagation
combined = np.array([])
for i in range(len(y)):
part = u.ufloat(y[i], y_err[i])
combined = np.append(combined, part)
return combined
def find_nearest(array, value):
# find nearest value and return index
array = np.asarray(array)
idx = (np.abs(array - value)).argmin()
return idx
# predefined peak positions
# peaks = [6.2, 8.1, 9.9, 11.5]
peaks = [23.5, 24.5]
# peaks = [24]
def fitccl(scan, variable="om", peak_type="gauss", binning=None):
x = list(scan[variable])
y = list(scan["Counts"])
peak_centre = np.mean(x)
if binning is None or binning == 0 or binning == 1:
x = list(scan[variable])
y = list(scan["Counts"])
y_err = list(np.sqrt(y)) if scan.get("sigma", None) is None else list(scan["sigma"])
if not scan["peak_indexes"]:
peak_centre = np.mean(x)
else:
peak_centre = x[int(scan["peak_indexes"][0])]
else:
x = list(scan[variable])
if not scan["peak_indexes"]:
peak_centre = np.mean(x)
else:
peak_centre = x[int(scan["peak_indexes"][0])]
x = bin_data(x, binning)
y = list(scan["Counts"])
y_err = list(np.sqrt(y)) if scan.get("sigma", None) is None else list(scan["sigma"])
combined = bin_data(create_uncertanities(y, y_err), binning)
y = [combined[i].n for i in range(len(combined))]
y_err = [combined[i].s for i in range(len(combined))]
def background(x, slope, intercept):
"""background"""
return slope * (x - peak_centre) + intercept
def gaussian(x, center, g_sigma, amplitude):
"""1-d gaussian: gaussian(x, amp, cen, wid)"""
return (amplitude / (np.sqrt(2.0 * np.pi) * g_sigma)) * np.exp(
-((x - center) ** 2) / (2 * g_sigma ** 2)
)
def lorentzian(x, center, l_sigma, amplitude):
"""1d lorentzian"""
return (amplitude / (1 + ((x - center) / l_sigma) ** 2)) / (np.pi * l_sigma)
def pseudoVoigt1(x, center, g_sigma, amplitude, l_sigma, fraction):
"""PseudoVoight peak with different widths of lorenzian and gaussian"""
return (1 - fraction) * gaussian(x, center, g_sigma, amplitude) + fraction * (
lorentzian(x, center, l_sigma, amplitude)
)
mod = Model(background)
params = Parameters()
params.add_many(
("slope", 0, True, None, None, None, None), ("intercept", 0, False, None, None, None, None)
)
for i in range(len(peaks)):
if peak_type == "gauss":
mod = mod + GaussianModel(prefix="p%d_" % (i + 1))
params.add(str("p%d_" % (i + 1) + "amplitude"), 20, True, 0, None, None)
params.add(str("p%d_" % (i + 1) + "center"), peaks[i], True, None, None, None)
params.add(str("p%d_" % (i + 1) + "sigma"), 0.2, True, 0, 5, None)
elif peak_type == "voigt":
mod = mod + VoigtModel(prefix="p%d_" % (i + 1))
params.add(str("p%d_" % (i + 1) + "amplitude"), 20, True, 0, None, None)
params.add(str("p%d_" % (i + 1) + "center"), peaks[i], True, None, None, None)
params.add(str("p%d_" % (i + 1) + "sigma"), 0.2, True, 0, 3, None)
params.add(str("p%d_" % (i + 1) + "gamma"), 0.2, True, 0, 5, None)
elif peak_type == "pseudovoigt":
mod = mod + PseudoVoigtModel(prefix="p%d_" % (i + 1))
params.add(str("p%d_" % (i + 1) + "amplitude"), 20, True, 0, None, None)
params.add(str("p%d_" % (i + 1) + "center"), peaks[i], True, None, None, None)
params.add(str("p%d_" % (i + 1) + "sigma"), 0.2, True, 0, 5, None)
params.add(str("p%d_" % (i + 1) + "fraction"), 0.5, True, -5, 5, None)
elif peak_type == "pseudovoigt1":
mod = mod + Model(pseudoVoigt1, prefix="p%d_" % (i + 1))
params.add(str("p%d_" % (i + 1) + "amplitude"), 20, True, 0, None, None)
params.add(str("p%d_" % (i + 1) + "center"), peaks[i], True, None, None, None)
params.add(str("p%d_" % (i + 1) + "g_sigma"), 0.2, True, 0, 5, None)
params.add(str("p%d_" % (i + 1) + "l_sigma"), 0.2, True, 0, 5, None)
params.add(str("p%d_" % (i + 1) + "fraction"), 0.5, True, 0, 1, None)
# add parameters
result = mod.fit(
y, params, weights=[np.abs(1 / y_err[i]) for i in range(len(y_err))], x=x, calc_covar=True
)
comps = result.eval_components()
reportstring = list()
for keys in result.params:
if result.params[keys].value is not None:
str2 = np.around(result.params[keys].value, 3)
else:
str2 = 0
if result.params[keys].stderr is not None:
str3 = np.around(result.params[keys].stderr, 3)
else:
str3 = 0
reportstring.append("%s = %2.3f +/- %2.3f" % (keys, str2, str3))
reportstring = "\n".join(reportstring)
plt.figure(figsize=(20, 10))
plt.plot(x, result.best_fit, "k-", label="Best fit")
plt.plot(x, y, "b-", label="Original data")
plt.plot(x, comps["background"], "g--", label="Line component")
for i in range(len(peaks)):
plt.plot(
x,
comps[str("p%d_" % (i + 1))],
"r--",
)
plt.fill_between(x, comps[str("p%d_" % (i + 1))], alpha=0.4, label=str("p%d_" % (i + 1)))
plt.legend()
plt.text(
np.min(x),
np.max(y),
reportstring,
fontsize=9,
verticalalignment="top",
)
plt.title(str(peak_type))
plt.xlabel("Omega [deg]")
plt.ylabel("Counts [a.u.]")
plt.show()
print(result.fit_report())
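# A minimal usage sketch (illustrative; it assumes the scan already carries
# "peak_indexes", e.g. from ccl_findpeaks, and that the module-level `peaks`
# list matches the expected peak positions):
#
#     scan = {"om": list(np.linspace(22, 26, 81)),
#             "Counts": list(np.random.poisson(50, 81)),
#             "peak_indexes": []}
#     fitccl(scan, variable="om", peak_type="gauss", binning=2)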

View File

@ -60,7 +60,12 @@ def read_detector_data(filepath):
det_data["chi_angle"] = h5f["/entry1/sample/chi"][:] # ch
det_data["phi_angle"] = h5f["/entry1/sample/phi"][:] # ph
det_data["UB"] = h5f["/entry1/sample/UB"][:].reshape(3, 3)
det_data["magnetic_field"] = h5f["/entry1/sample/magnetic_field"][:]
det_data["temperature"] = h5f["/entry1/sample/temperature"][:]
# optional parameters
if "/entry1/sample/magnetic_field" in h5f:
det_data["magnetic_field"] = h5f["/entry1/sample/magnetic_field"][:]
if "/entry1/sample/temperature" in h5f:
det_data["temperature"] = h5f["/entry1/sample/temperature"][:]
return det_data

pyzebra/merge_function.py Normal file
View File

@ -0,0 +1,302 @@
import numpy as np
import uncertainties as u
def create_tuples(x, y, y_err):
"""creates tuples for sorting and merginng of the data
Counts need to be normalized to monitor before"""
t = list()
for i in range(len(x)):
tup = (x[i], y[i], y_err[i])
t.append(tup)
return t
def normalize(scan, monitor):
"""Normalizes the measurement to monitor, checks if sigma exists, otherwise creates it
:arg dict : dictionary to from which to tkae the scan
:arg key : which scan to normalize from dict1
:arg monitor : final monitor
:return counts - normalized counts
:return sigma - normalized sigma"""
counts = np.array(scan["Counts"])
sigma = np.sqrt(counts) if "sigma" not in scan else scan["sigma"]
monitor_ratio = monitor / scan["monitor"]
scaled_counts = counts * monitor_ratio
scaled_sigma = np.array(sigma) * monitor_ratio
return scaled_counts, scaled_sigma
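# Worked example (illustrative): Counts=[100] with monitor=50000, normalized to
# monitor=100000, is scaled by 2 -> counts=[200], sigma=[2*sqrt(100)]=[20]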
def merge(scan1, scan2, keep=True, monitor=100000):
"""merges the two tuples and sorts them, if om value is same, Counts value is average
averaging is propagated into sigma if dict1 == dict2, key[1] is deleted after merging
:arg dict1 : dictionary to which measurement will be merged
:arg dict2 : dictionary from which measurement will be merged
:arg scand_dict_result : result of scan_dict after auto function
:arg keep : if true, when monitors are same, does not change it, if flase, takes monitor
always
:arg monitor : final monitor after merging
note: dict1 and dict2 can be same dict
:return dict1 with merged scan"""
if keep:
if scan1["monitor"] == scan2["monitor"]:
monitor = scan1["monitor"]
# load om and Counts
x1, x2 = scan1["om"], scan2["om"]
cor_y1, y_err1 = normalize(scan1, monitor=monitor)
cor_y2, y_err2 = normalize(scan2, monitor=monitor)
# creates tuples (om, Counts, sigma) for sorting and further processing
tuple_list = create_tuples(x1, cor_y1, y_err1) + create_tuples(x2, cor_y2, y_err2)
# Sort the list on om and add 0 0 0 tuple to the last position
sorted_t = sorted(tuple_list, key=lambda tup: tup[0])
sorted_t.append((0, 0, 0))
om, Counts, sigma = [], [], []
seen = list()
for i in range(len(sorted_t) - 1):
if sorted_t[i][0] not in seen:
if sorted_t[i][0] != sorted_t[i + 1][0]:
om = np.append(om, sorted_t[i][0])
Counts = np.append(Counts, sorted_t[i][1])
sigma = np.append(sigma, sorted_t[i][2])
else:
om = np.append(om, sorted_t[i][0])
counts1, counts2 = sorted_t[i][1], sorted_t[i + 1][1]
sigma1, sigma2 = sorted_t[i][2], sorted_t[i + 1][2]
count_err1 = u.ufloat(counts1, sigma1)
count_err2 = u.ufloat(counts2, sigma2)
avg = (count_err1 + count_err2) / 2
Counts = np.append(Counts, avg.n)
sigma = np.append(sigma, avg.s)
seen.append(sorted_t[i][0])
else:
continue
scan1["om"] = om
scan1["Counts"] = Counts
scan1["sigma"] = sigma
scan1["monitor"] = monitor
print("merging done")
def check_UB(dict1, dict2, precision=0.01):
truth_list = list()
for i in ["ub1j", "ub2j", "ub3j"]:
for j in range(3):
if abs(abs(float(dict1["meta"][i][j])) - abs(float(dict2["meta"][i][j]))) < precision:
truth_list.append(True)
else:
truth_list.append(False)
return all(truth_list)
def check_zebramode(dict1, dict2):
return dict1["meta"]["zebra_mode"] == dict2["meta"]["zebra_mode"]
def check_angles(scan1, scan2, angles, precision):
truth_list = list()
for item in angles:
if abs(abs(scan1[item]) - abs(scan2[item])) <= precision[item]:
truth_list.append(True)
else:
truth_list.append(False)
return all(truth_list)
def check_temp_mag(scan1, scan2):
temp_diff = 1
mag_diff = 0.001
truth_list = list()
try:
if abs(abs(scan1["mag_field"]) - abs(scan2["mag_field"])) <= mag_diff:
truth_list.append(True)
else:
truth_list.append(False)
except KeyError:
print("mag_field missing")
try:
if abs(abs(scan1["temperature"]) - abs(scan2["temperature"])) <= temp_diff:
truth_list.append(True)
else:
truth_list.append(False)
except KeyError:
print("temperature missing")
return all(truth_list)
def merge_dups(dictionary, angles):
precision = {
"twotheta_angle": 0.1,
"chi_angle": 0.1,
"nu_angle": 0.1,
"phi_angle": 0.05,
"omega_angle": 0.05,
"gamma_angle": 0.05,
}
for i in list(dictionary["scan"]):
for j in list(dictionary["scan"]):
if i == j:
continue
else:
# print(i, j)
if check_angles(dictionary["scan"][i], dictionary["scan"][j], angles, precision):
merge(dictionary["scan"][i], dictionary["scan"][j])
print("merged %d with %d" % (i, j))
del dictionary["scan"][j]
merge_dups(dictionary, angles)
break
else:
continue
break
def add_scan(dict1, dict2, scan_to_add):
max_scan = np.max(list(dict1["scan"]))
dict1["scan"][max_scan + 1] = dict2["scan"][scan_to_add]
if dict1.get("extra_meta") is None:
dict1["extra_meta"] = {}
dict1["extra_meta"][max_scan + 1] = dict2["meta"]
del dict2["scan"][scan_to_add]
def process(dict1, dict2, angles, precision):
# stop when the second dict is empty
# print(dict2["scan"])
if dict2["scan"]:
print("doing something")
# check UB matrixes
if check_UB(dict1, dict2):
# iterate over second dict and check for matches
for i in list(dict2["scan"]):
for j in list(dict1["scan"]):
if check_angles(dict1["scan"][j], dict2["scan"][i], angles, precision):
# angles good, see the mag and temp
if check_temp_mag(dict1["scan"][j], dict2["scan"][i]):
merge(dict1["scan"][j], dict2["scan"][i])
print("merged")
del dict2["scan"][i]
process(dict1, dict2, angles, precision)
break
else:
add_scan(dict1, dict2, i)
print("scan added r")
process(dict1, dict2, angles, precision)
break
else:
add_scan(dict1, dict2, i)
print("scan added l")
process(dict1, dict2, angles, precision)
break
else:
continue
break
else:
# TODO: there is no interactive prompt here yet; the datasets are added unconditionally
print("UBs are different, adding the datasets anyway")
dict1 = add_dict(dict1, dict2)
return
"""
1. check for bisecting or normal beam geometry in data files; select stt, om, chi, phi for bisecting; select stt, om, nu for normal beam
2. in the ccl files, check for identical stt, chi and nu within 0.1 degree, and, at the same time, for identical om and phi within 0.05 degree;
3. in the dat files, check for identical stt, chi and nu within 0.1 degree, and, at the same time,
for identical phi within 0.05 degree, and, at the same time, for identical om within 5 degree."""
def unified_merge(dict1, dict2):
if not check_zebramode(dict1, dict2):
print("You are trying to add two files with different zebra mdoe")
return
# decide angles
if dict1["meta"]["zebra_mode"] == "bi":
angles = ["twotheta_angle", "omega_angle", "chi_angle", "phi_angle"]
elif dict1["meta"]["zebra_mode"] == "nb":
angles = ["gamma_angle", "omega_angle", "nu_angle"]
# precision of angles to check
precision = {
"twotheta_angle": 0.1,
"chi_angle": 0.1,
"nu_angle": 0.1,
"phi_angle": 0.05,
"omega_angle": 5,
"gamma_angle": 0.05,
}
if (dict1["meta"]["data_type"] == "ccl") and (dict2["meta"]["data_type"] == "ccl"):
precision["omega_angle"] = 0.05
# check for duplicates in original files
for d in dict1, dict2:
# no duplicates in dats
if d["meta"]["data_type"] == ".dat":
continue
else:
merge_dups(d, angles)
process(dict1, dict2, angles, precision)
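# A minimal usage sketch (illustrative; file paths are hypothetical and load_1D
# is assumed to come from this package's ccl_io module). Both datasets must
# share the same zebra mode; duplicates within each ccl file are merged first,
# then the two dictionaries are combined in place:
#
#     dict1 = load_1D("/path/first.ccl")
#     dict2 = load_1D("/path/second.ccl")
#     unified_merge(dict1, dict2)  # dict1 now holds the merged/added scans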
def add_dict(dict1, dict2):
"""adds two dictionaries, meta of the new is saved as meata+original_filename and
measurements are shifted to continue with numbering of first dict
:arg dict1 : dictionarry to add to
:arg dict2 : dictionarry from which to take the measurements
:return dict1 : combined dictionary
Note: dict1 must be made from ccl, otherwise we would have to change the structure of loaded
dat file"""
try:
if dict1["meta"]["zebra_mode"] != dict2["meta"]["zebra_mode"]:
print("You are trying to add scans measured with different zebra modes")
return
# this is for the qscan case
except KeyError:
print("Zebra mode not specified")
max_measurement_dict1 = max(dict1["scan"])
new_filenames = np.arange(
max_measurement_dict1 + 1, max_measurement_dict1 + 1 + len(dict2["scan"])
)
if dict1.get("extra_meta") is None:
dict1["extra_meta"] = {}
new_meta_name = "meta" + str(dict2["meta"]["original_filename"])
if new_meta_name not in dict1:
for keys, name in zip(dict2["scan"], new_filenames):
dict2["scan"][keys]["file_of_origin"] = str(dict2["meta"]["original_filename"])
dict1["scan"][name] = dict2["scan"][keys]
dict1["extra_meta"][name] = dict2["meta"]
dict1[new_meta_name] = dict2["meta"]
else:
raise KeyError(
str(
"The file %s has alredy been added to %s"
% (dict2["meta"]["original_filename"], dict1["meta"]["original_filename"])
)
)
return dict1

View File

@ -1,12 +1,24 @@
from load_1D import load_1D
from ccl_dict_operation import add_dict
import pandas as pd
from mpl_toolkits.mplot3d import Axes3D # dont delete, otherwise waterfall wont work
import matplotlib.pyplot as plt
import matplotlib as mpl
import numpy as np
import pickle
import matplotlib as mpl
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import scipy.io as sio
import uncertainties as u
from mpl_toolkits.mplot3d import Axes3D # dont delete, otherwise waterfall wont work
import collections
from .ccl_io import load_1D
def create_tuples(x, y, y_err):
"""creates tuples for sorting and merginng of the data
Counts need to be normalized to monitor before"""
t = list()
for i in range(len(x)):
tup = (x[i], y[i], y_err[i])
t.append(tup)
return t
def load_dats(filepath):
@ -37,45 +49,45 @@ def load_dats(filepath):
if data_type == "txt":
dict1 = add_dict(dict1, load_1D(file_list[i][0]))
else:
dict1 = add_dict(dict1, load_1D(file_list[i]))
dict1["scan"][i + 1]["params"] = {}
if data_type == "txt":
for x in range(len(col_names) - 1):
dict1["scan"][i + 1]["params"][col_names[x + 1]] = file_list[i][x + 1]
dict1["scan"][i + 1]["params"][col_names[x + 1]] = float(file_list[i][x + 1])
return dict1
def create_dataframe(dict1):
def create_dataframe(dict1, variables):
"""Creates pandas dataframe from the dictionary
:arg ccl like dictionary
:return pandas dataframe"""
# create dictionary to which we pull only wanted items before transforming it to pd.dataframe
pull_dict = {}
pull_dict["filenames"] = list()
for key in dict1["scan"][1]["params"]:
pull_dict[key] = list()
pull_dict["temperature"] = list()
pull_dict["mag_field"] = list()
for keys in variables:
for item in variables[keys]:
pull_dict[item] = list()
pull_dict["fit_area"] = list()
pull_dict["int_area"] = list()
pull_dict["om"] = list()
pull_dict["Counts"] = list()
for keys in pull_dict:
print(keys)
# populate the dict
for keys in dict1["scan"]:
if "file_of_origin" in dict1["scan"][keys]:
pull_dict["filenames"].append(dict1["scan"][keys]["file_of_origin"].split("/")[-1])
else:
pull_dict["filenames"].append(dict1["meta"]["original_filename"].split("/")[-1])
for key in dict1["scan"][keys]["params"]:
pull_dict[str(key)].append(float(dict1["scan"][keys]["params"][key]))
pull_dict["temperature"].append(dict1["scan"][keys]["temperature"])
pull_dict["mag_field"].append(dict1["scan"][keys]["mag_field"])
pull_dict["fit_area"].append(dict1["scan"][keys]["fit"]["fit_area"])
pull_dict["int_area"].append(dict1["scan"][keys]["fit"]["int_area"])
pull_dict["om"].append(dict1["scan"][keys]["om"])
pull_dict["Counts"].append(dict1["scan"][keys]["Counts"])
for key in variables:
for i in variables[key]:
pull_dict[i].append(_finditem(dict1["scan"][keys], i))
return pd.DataFrame(data=pull_dict)
@ -144,7 +156,7 @@ def make_graph(data, sorting_parameter, style):
def save_dict(obj, name):
""" saves dictionary as pickle file in binary format
"""saves dictionary as pickle file in binary format
:arg obj - object to save
:arg name - name of the file
NOTE: path should be added later"""
@ -200,3 +212,277 @@ def save_table(data, filetype, name, path=None):
hdf.close()
if filetype == "json":
data.to_json((path + name + ".json"))
def normalize(scan, monitor):
"""Normalizes the measurement to monitor, checks if sigma exists, otherwise creates it
:arg dict : dictionary to from which to tkae the scan
:arg key : which scan to normalize from dict1
:arg monitor : final monitor
:return counts - normalized counts
:return sigma - normalized sigma"""
counts = np.array(scan["Counts"])
sigma = np.sqrt(counts) if "sigma" not in scan else scan["sigma"]
monitor_ratio = monitor / scan["monitor"]
scaled_counts = counts * monitor_ratio
scaled_sigma = np.array(sigma) * monitor_ratio
return scaled_counts, scaled_sigma
def merge(scan1, scan2, keep=True, monitor=100000):
"""merges the two tuples and sorts them, if om value is same, Counts value is average
averaging is propagated into sigma if dict1 == dict2, key[1] is deleted after merging
:arg dict1 : dictionary to which measurement will be merged
:arg dict2 : dictionary from which measurement will be merged
:arg scand_dict_result : result of scan_dict after auto function
:arg keep : if true, when monitors are same, does not change it, if flase, takes monitor
always
:arg monitor : final monitor after merging
note: dict1 and dict2 can be same dict
:return dict1 with merged scan"""
if keep:
if scan1["monitor"] == scan2["monitor"]:
monitor = scan1["monitor"]
# load om and Counts
x1, x2 = scan1["om"], scan2["om"]
cor_y1, y_err1 = normalize(scan1, monitor=monitor)
cor_y2, y_err2 = normalize(scan2, monitor=monitor)
# creates tuples (om, Counts, sigma) for sorting and further processing
tuple_list = create_tuples(x1, cor_y1, y_err1) + create_tuples(x2, cor_y2, y_err2)
# Sort the list on om and add 0 0 0 tuple to the last position
sorted_t = sorted(tuple_list, key=lambda tup: tup[0])
sorted_t.append((0, 0, 0))
om, Counts, sigma = [], [], []
seen = list()
for i in range(len(sorted_t) - 1):
if sorted_t[i][0] not in seen:
if sorted_t[i][0] != sorted_t[i + 1][0]:
om = np.append(om, sorted_t[i][0])
Counts = np.append(Counts, sorted_t[i][1])
sigma = np.append(sigma, sorted_t[i][2])
else:
om = np.append(om, sorted_t[i][0])
counts1, counts2 = sorted_t[i][1], sorted_t[i + 1][1]
sigma1, sigma2 = sorted_t[i][2], sorted_t[i + 1][2]
count_err1 = u.ufloat(counts1, sigma1)
count_err2 = u.ufloat(counts2, sigma2)
avg = (count_err1 + count_err2) / 2
Counts = np.append(Counts, avg.n)
sigma = np.append(sigma, avg.s)
seen.append(sorted_t[i][0])
else:
continue
scan1["om"] = om
scan1["Counts"] = Counts
scan1["sigma"] = sigma
scan1["monitor"] = monitor
print("merging done")
def add_dict(dict1, dict2):
"""adds two dictionaries, meta of the new is saved as meata+original_filename and
measurements are shifted to continue with numbering of first dict
:arg dict1 : dictionarry to add to
:arg dict2 : dictionarry from which to take the measurements
:return dict1 : combined dictionary
Note: dict1 must be made from ccl, otherwise we would have to change the structure of loaded
dat file"""
try:
if dict1["meta"]["zebra_mode"] != dict2["meta"]["zebra_mode"]:
print("You are trying to add scans measured with different zebra modes")
return
# this is for the qscan case
except KeyError:
print("Zebra mode not specified")
max_measurement_dict1 = max(dict1["scan"])
new_filenames = np.arange(
max_measurement_dict1 + 1, max_measurement_dict1 + 1 + len(dict2["scan"])
)
new_meta_name = "meta" + str(dict2["meta"]["original_filename"])
if new_meta_name not in dict1:
for keys, name in zip(dict2["scan"], new_filenames):
dict2["scan"][keys]["file_of_origin"] = str(dict2["meta"]["original_filename"])
dict1["scan"][name] = dict2["scan"][keys]
dict1[new_meta_name] = dict2["meta"]
else:
raise KeyError(
str(
"The file %s has alredy been added to %s"
% (dict2["meta"]["original_filename"], dict1["meta"]["original_filename"])
)
)
return dict1
def auto(dict):
"""takes just unique tuples from all tuples in dictionary returend by scan_dict
intendet for automatic merge if you doesent want to specify what scans to merge together
args: dict - dictionary from scan_dict function
:return dict - dict without repetitions"""
for keys in dict:
tuple_list = dict[keys]
new = list()
for i in range(len(tuple_list)):
if tuple_list[0][0] == tuple_list[i][0]:
new.append(tuple_list[i])
dict[keys] = new
return dict
def scan_dict(dict, precision=0.5):
"""scans dictionary for duplicate angles indexes
:arg dict : dictionary to scan
:arg precision : in deg, sometimes angles are zero so its easier this way, instead of
checking zero division
:return dictionary with matching scans, if there are none, the dict is empty
note: can be checked by "not d", true if empty
"""
if dict["meta"]["zebra_mode"] == "bi":
angles = ["twotheta_angle", "omega_angle", "chi_angle", "phi_angle"]
elif dict["meta"]["zebra_mode"] == "nb":
angles = ["gamma_angle", "omega_angle", "nu_angle"]
else:
print("Unknown zebra mode")
return
d = {}
for i in dict["scan"]:
for j in dict["scan"]:
if dict["scan"][i] != dict["scan"][j]:
itup = list()
for k in angles:
itup.append(abs(abs(dict["scan"][i][k]) - abs(dict["scan"][j][k])))
if all(i <= precision for i in itup):
print(itup)
print([dict["scan"][i][k] for k in angles])
print([dict["scan"][j][k] for k in angles])
if str([np.around(dict["scan"][i][k], 0) for k in angles]) not in d:
d[str([np.around(dict["scan"][i][k], 0) for k in angles])] = list()
d[str([np.around(dict["scan"][i][k], 0) for k in angles])].append((i, j))
else:
d[str([np.around(dict["scan"][i][k], 0) for k in angles])].append((i, j))
else:
pass
else:
continue
return d
def _finditem(obj, key):
if key in obj:
return obj[key]
for k, v in obj.items():
if isinstance(v, dict):
item = _finditem(v, key)
if item is not None:
return item
def most_common(lst):
return max(set(lst), key=lst.count)
def variables(dictionary):
    """Function to guess which variables will be used in the param study.
    I call the primary variable the array-like one, usually omega,
    and the secondary the slicing variable, different for each scan, for example temperature"""
    # find all variables that are in all scans
    stdev_precision = 0.05
    all_vars = list()
    for keys in dictionary["scan"]:
        all_vars.append([key for key in dictionary["scan"][keys] if key != "params"])
        if dictionary["scan"][keys]["params"]:
            all_vars.append([key for key in dictionary["scan"][keys]["params"]])
    all_vars = [i for sublist in all_vars for i in sublist]
    # get the ones that are in all scans
    b = collections.Counter(all_vars)
    inall = [key for key in b if b[key] == len(dictionary["scan"])]
    # delete those that are obviously wrong
    wrong = [
        "NP",
        "Counts",
        "Monitor1",
        "Monitor2",
        "Monitor3",
        "h_index",
        "l_index",
        "k_index",
        "n_points",
        "monitor",
        "Time",
        "omega_angle",
        "twotheta_angle",
        "chi_angle",
        "phi_angle",
        "nu_angle",
    ]
    inall_red = [i for i in inall if i not in wrong]
    # check for the primary variable: it needs to be a list, and we don't expect
    # the primary variable to be a parameter (i.e. in scan["params"])
    primary_candidates = list()
    for key in dictionary["scan"]:
        for i in inall_red:
            if isinstance(_finditem(dictionary["scan"][key], i), list):
                if np.std(_finditem(dictionary["scan"][key], i)) > stdev_precision:
                    primary_candidates.append(i)
    # check which of the primary candidates are in every scan
    primary_candidates = collections.Counter(primary_candidates)
    second_round_primary_candidates = [
        key for key in primary_candidates if primary_candidates[key] == len(dictionary["scan"])
    ]
    if len(second_round_primary_candidates) == 1:
        print("We've got a primary winner!", second_round_primary_candidates)
    else:
        print("Still not sure about the primary :(", second_round_primary_candidates)
    # check for the secondary variable: we expect a float/int or a non-changing array;
    # the primary candidates don't need to be checked again
    secondary_candidates = [i for i in inall_red if i not in second_round_primary_candidates]
    # print("secondary candidates", secondary_candidates)
    # select arrays, floats and ints
    second_round_secondary_candidates = list()
    for key in dictionary["scan"]:
        for i in secondary_candidates:
            value = _finditem(dictionary["scan"][key], i)
            if isinstance(value, (float, int)):
                second_round_secondary_candidates.append(i)
            elif isinstance(value, list):
                if np.std(value) < stdev_precision:
                    second_round_secondary_candidates.append(i)
    second_round_secondary_candidates = collections.Counter(second_round_secondary_candidates)
    second_round_secondary_candidates = [
        key
        for key in second_round_secondary_candidates
        if second_round_secondary_candidates[key] == len(dictionary["scan"])
    ]
    # print("secondary candidates after second round", second_round_secondary_candidates)
    # now we check if they vary between the scans
    third_round_sec_candidates = list()
    for i in second_round_secondary_candidates:
        check_array = list()
        for keys in dictionary["scan"]:
            check_array.append(np.average(_finditem(dictionary["scan"][keys], i)))
        # print(i, check_array, np.std(check_array))
        if np.std(check_array) > stdev_precision:
            third_round_sec_candidates.append(i)
    if len(third_round_sec_candidates) == 1:
        print("We've got a secondary winner!", third_round_sec_candidates)
    else:
        print("Still not sure about the secondary :(", third_round_sec_candidates)
    return {"primary": second_round_primary_candidates, "secondary": third_round_sec_candidates}

scripts/pyzebra-start.sh (new file)

@@ -0,0 +1,4 @@
source /home/pyzebra/miniconda3/etc/profile.d/conda.sh
conda activate prod
pyzebra --port=80 --allow-websocket-origin=pyzebra.psi.ch:80

scripts/pyzebra-test-start.sh (new file)

@@ -0,0 +1,4 @@
source /home/pyzebra/miniconda3/etc/profile.d/conda.sh
conda activate test
python ~/pyzebra/pyzebra/app/cli.py --allow-websocket-origin=pyzebra.psi.ch:5006

scripts/pyzebra-test.service (new file)

@@ -0,0 +1,11 @@
[Unit]
Description=pyzebra-test web server (runs on port 5006)

[Service]
Type=simple
User=pyzebra
ExecStart=/bin/bash /usr/local/sbin/pyzebra-test-start.sh
Restart=always

[Install]
WantedBy=multi-user.target

scripts/pyzebra.service (new file)

@@ -0,0 +1,10 @@
[Unit]
Description=pyzebra web server

[Service]
Type=simple
ExecStart=/bin/bash /usr/local/sbin/pyzebra-start.sh
Restart=always

[Install]
WantedBy=multi-user.target
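
A minimal deployment sketch (not part of the diff; the target paths are assumptions
read off the ExecStart lines above): copy the start scripts and unit files into
place, then enable the services with standard systemd commands:

    # copy the start scripts to the paths referenced by ExecStart
    sudo cp scripts/pyzebra-start.sh scripts/pyzebra-test-start.sh /usr/local/sbin/
    # install the unit files, reload systemd and start the services on boot
    sudo cp scripts/pyzebra.service scripts/pyzebra-test.service /etc/systemd/system/
    sudo systemctl daemon-reload
    sudo systemctl enable --now pyzebra.service pyzebra-test.service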