added metadata collection routine and tests
@@ -1,43 +1,119 @@
import os
import json
import logging
from pathlib import Path
from collections import defaultdict

import yaml
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from scipy.optimize import curve_fit

import sfdata
import sfdata.errors
from sfdata import SFDataFiles, sfdatafile, SFScanInfo, SFProcFile
from xraydb import material_mu
from joblib import Parallel, delayed, cpu_count

logger = logging.getLogger(__name__)


def collect_runs_metadata(pgroup_data_dir: str | os.PathLike):
    """
    Generates a metadata overview for all runs in a given p-group data directory.

    Not all available metadata is included; e.g. lists of recorded channels are skipped.

    Returns: pandas DataFrame with the individual acquisitions and their respective metadata.
    """
    measurements = defaultdict(list)

    raw_dir = Path(pgroup_data_dir) / "raw"
    run_paths = sorted(raw_dir.glob("run[0-9][0-9][0-9][0-9]"))

    for run_path in run_paths:
        # overall data in scan.json
        with open(run_path / "meta/scan.json", "r") as file:
            scan_meta = json.load(file)

        # individual metadata in acqXXXX.json
        num_acq = len(scan_meta["scan_files"])
        acq_jsons = sorted(run_path.glob("meta/acq[0-9][0-9][0-9][0-9].json"))

        for i, acq in enumerate(acq_jsons):
            with open(acq) as file:
                acq_meta = json.load(file)

            for parameter in [
                "rate_multiplicator",
                "user_tag",
                "request_time",
                "unique_acquisition_run_number",
                "start_pulseid",
                "stop_pulseid",
                "client_name",
            ]:
                try:
                    measurements[parameter].append(acq_meta[parameter])
                except KeyError:
                    logger.info(f"No parameter {parameter} found in metadata, skipping.")
                    measurements[parameter].append(None)

            for scan_parameter in ["scan_readbacks", "scan_step_info", "scan_values", "scan_readbacks_raw"]:
                # most scans vary only one parameter at a time, so unwrap single-element lists here
                meta_parameter = (
                    scan_meta[scan_parameter][i][0]
                    if len(scan_meta[scan_parameter][i]) == 1
                    else scan_meta[scan_parameter][i]
                )
                measurements[scan_parameter].append(meta_parameter)

            # generate file pattern to cover all *.h5 of the acquisition
            # (this assumes we have some PVDATA, which should almost always be the case)
            measurements["files"].append(str(scan_meta["scan_files"][i][0]).replace("PVDATA", "*"))

    df = pd.DataFrame(measurements)
    return df


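# A minimal usage sketch for collect_runs_metadata (the p-group directory below is
# hypothetical; any directory containing raw/runXXXX folders with meta/*.json works):
#
#     df = collect_runs_metadata("/sf/cristallina/data/p12345")
#     df[["user_tag", "start_pulseid", "files"]].head()

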
def scan_info(run_number, base_path=None, small_data=True):
    """Returns SFScanInfo object for a given run number.

    If there are small data channels, they will be added (small_data=False to suppress their loading).
    """
    if base_path is None:
        base_path = heuristic_extract_base_path()

    scan = SFScanInfo(f"{base_path}run{run_number:04}/meta/scan.json")

    try:
        if small_data:
            for i in range(len(scan.readbacks)):
                sd_path_for_step = (
                    heuristic_extract_smalldata_path()
                    + "run"
                    + str(run_number).zfill(4)
                    + "/acq"
                    + str(i + 1).zfill(4)
                    + ".smalldata*.h5"
                )
                scan.info["scan_files"][i].append(sd_path_for_step)
    except KeyError:
        logger.warning("Failed to load smalldata.")

    return scan


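# Usage sketch (run number illustrative): load a scan and iterate over its steps,
# where each step gives access to the data files of that scan step.
#
#     scan = scan_info(42)   # base path guessed via heuristic_extract_base_path()
#     print(len(scan.readbacks))

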
def get_scan_from_run_number_or_scan(run_number_or_scan, small_data=True):
    """Returns SFScanInfo object from a run number or an SFScanInfo object (which is passed through)"""
    if isinstance(run_number_or_scan, SFScanInfo):
        scan = run_number_or_scan
    else:
        scan = scan_info(run_number_or_scan, small_data=small_data)
    return scan


def get_run_number_from_run_number_or_scan(run_number_or_scan, small_data=True):
    """Returns run number from a run number or an SFScanInfo object"""
    if isinstance(run_number_or_scan, int):
        rn = run_number_or_scan
@@ -47,22 +123,27 @@ def get_run_number_from_run_number_or_scan(run_number_or_scan,small_data=True):
        raise ValueError("Input must be an int or SFScanInfo object")
    return rn


def channel_names(run_number, verbose=False):
    """Returns channel names for a given run_number or scan object, optionally printing them"""
    if isinstance(run_number, SFScanInfo):
        scan = run_number
    else:
        scan = scan_info(run_number)

    channel_list = list(scan[0].keys())

    if verbose:
        print(channel_list)

    return channel_list


def print_run_info(
    run_number=42,
    print_channels=True,
    extra_verbose=False,
    base_path=None,
):
    """Prints overview of run information.

@@ -109,72 +190,95 @@ def print_run_info(
def number_of_steps(scan_number_or_scan):
    """Returns number of steps for a scan object or run number"""
    scan = get_scan_from_run_number_or_scan(scan_number_or_scan)
    return len(scan.info["scan_readbacks"])


def process_run(
    run_number, rois, detector="JF16T03V01", calculate=None, only_shots=slice(None), n_jobs=cpu_count() - 2
):
    """Process rois for a given detector and save the resulting small data in res/small_data/run...

    By default only the sum of each roi is calculated; ["means", "std", "img"] can be added via the "calculate" optional parameter.
    """
    rn = get_run_number_from_run_number_or_scan(run_number)

    # Load scan object with SFScanInfo
    scan = scan_info(rn, small_data=False)

    # Make the small data folder if it doesn't exist
    if not os.path.exists(heuristic_extract_smalldata_path()):
        os.mkdir(heuristic_extract_smalldata_path())

    # Set the path for later small data saving
    path_with_run_folder = heuristic_extract_smalldata_path() + "run" + str(rn).zfill(4)

    # Make the small data run folder if it doesn't exist
    if not os.path.exists(path_with_run_folder):
        os.mkdir(path_with_run_folder)

    # Add scan info into the small_data_object
    # for key in scan.info.keys():
    #     d[key] = info.info[key]

    # Check if there is only one roi. If yes, make a list of it so it can be iterated over.
    if isinstance(rois, ROI):
        rois = [rois]

    def process_step(i):
        scan = scan_info(rn, small_data=False)
        step = scan[i]
        with step as data:
            with SFProcFile(f"{path_with_run_folder}/acq{str(i + 1).zfill(4)}.smalldata.h5", mode="w") as sd:
                # Calculate everything related to JF_rois
                for roi in rois:
                    bottom, top, left, right = roi.bottom, roi.top, roi.left, roi.right

                    # Pulse ids for saving the new channels
                    det_pids = data[detector].pids
                    sd[roi.name] = det_pids[only_shots], data[detector][only_shots, bottom:top, left:right].sum(
                        axis=(1, 2)
                    )
                    if calculate:
                        if "means" in calculate:
                            sd[roi.name + "_means"] = (
                                det_pids[only_shots],
                                data[detector][only_shots, bottom:top, left:right].mean(axis=(1, 2)),
                            )
                        if "std" in calculate:
                            sd[roi.name + "_std"] = (
                                det_pids[only_shots],
                                data[detector][only_shots, bottom:top, left:right].std(axis=(1, 2)),
                            )
                        if "img" in calculate:
                            sd[f"{roi.name}_img"] = (
                                det_pids[only_shots],
                                data[detector][only_shots, bottom:top, left:right].data,
                            )

                    # Currently meta files can't be read by SFData, this will be modified by Sven and then we can use it. For now saving in roi_info
                    # sd.meta[roi.name + "_info"] = f"roi {roi.name}: {left},{right}; {bottom},{top} (left, right, bottom, top)"

                    # These channels have only one dataset per step of the scan, so we take the first pulseID
                    sd[roi.name + "_info"] = (
                        [det_pids[0]],
                        [f"roi {roi.name}: {left},{right}; {bottom},{top} (left, right, bottom, top)"],
                    )
                    sd[roi.name + "_mean_img"] = (
                        [det_pids[0]],
                        [data[detector][only_shots, bottom:top, left:right].mean(axis=0)],
                    )
                    sd[roi.name + "_step_sum"] = (
                        [det_pids[0]],
                        [data[detector][only_shots, bottom:top, left:right].sum()],
                    )

    Parallel(n_jobs=n_jobs, verbose=10)(delayed(process_step)(i) for i in range(len(scan)))


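# Usage sketch (run number and ROI coordinates are illustrative):
#
#     signal = ROI(left=100, right=200, bottom=50, top=150, name="signal")
#     process_run(207, signal, calculate=["means", "std"])
#
# This writes one acqXXXX.smalldata.h5 per scan step into the guessed smalldata folder.

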
class ROI:
    """Definition of region of interest (ROI) in image coordinates.

    Example: ROI(left=10, right=20, bottom=100, top=200).

    Directions assume that the lower left corner of the image is at (x=0, y=0).
    """
@@ -191,9 +295,13 @@ class ROI:
        height: int = None,
        name: str = None,
    ):
        if None not in (left, right, bottom, top):
            (
                self.left,
                self.right,
                self.bottom,
                self.top,
            ) = (
                left,
                right,
                bottom,
@@ -203,12 +311,12 @@ class ROI:
            self.from_centers_widths(center_x, center_y, width, height)
        else:
            raise ValueError("No valid ROI definition.")

        # Check that ROI has a name or generate default
        if name is None:
            logger.warning(f"No ROI name given, generating: {self.__repr__()}")
            name = self.__repr__()

        self.name = name

    def from_centers_widths(self, center_x, center_y, width, height):
@@ -221,7 +329,7 @@ class ROI:
    @property
    def rows(self):
        return slice(self.bottom, self.top)

    @property
    def LeftRightBottomTop(self):
        return [self.left, self.right, self.bottom, self.top]
@@ -229,85 +337,92 @@ class ROI:
    @property
    def cols(self):
        return slice(self.left, self.right)

    @property
    def width(self):
        return self.right - self.left

    @property
    def height(self):
        return self.top - self.bottom

    def __repr__(self):
        return f"ROI(bottom={self.bottom},top={self.top},left={self.left},right={self.right})"

    def __eq__(self, other):
        # we disregard the name for comparisons
        return (self.left, self.right, self.bottom, self.top) == (other.left, other.right, other.bottom, other.top)

    def __ne__(self, other):
        return not self == other
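# Sketch: the rows/cols properties can slice a detector image directly.
#
#     img = np.zeros((512, 1024))
#     roi = ROI(left=10, right=20, bottom=100, top=200)
#     patch = img[roi.rows, roi.cols]   # shape (100, 10)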


######################## Setting up paths ########################


def heuristic_extract_pgroup(path=None):
    """The function tries to guess the current p-group from the
    current working directory (default) or the contents of
    the given path.
    """
    path = path or os.getcwd()

    if "/p" in path:
        # Find the last /p in the path
        p_index = path.rfind("/p")

        # Cut the string and look at the next five characters after the last /p
        p_number = path[p_index + 2 : p_index + 7]

        if not p_number.isdigit():
            raise KeyError("Automatic p-group extraction from the current working directory didn't work.")

    else:
        raise KeyError("Automatic p-group extraction from the current working directory didn't work.")

    return p_number


def heuristic_extract_base_path():
    """The function tries to guess the full path where the raw data is saved."""
    p_number = heuristic_extract_pgroup()
    base_path = f"/sf/cristallina/data/p{p_number}/raw/"
    return base_path


def heuristic_extract_smalldata_path():
    """The function tries to guess the full path where the small data is saved."""
    p_number = heuristic_extract_pgroup()
    small_data_path = f"/das/work/p{str(p_number)[0:2]}/p{p_number}/smalldata/"
    return small_data_path


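# Sketch of the heuristics (p-group number hypothetical), assuming the usual
# Cristallina directory layout:
#
#     # with cwd == "/sf/cristallina/data/p20841/work":
#     heuristic_extract_pgroup()          # -> "20841"
#     heuristic_extract_base_path()       # -> "/sf/cristallina/data/p20841/raw/"
#     heuristic_extract_smalldata_path()  # -> "/das/work/p20/p20841/smalldata/"

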
######################## Little useful functions ########################


def find_nearest(array, value):
    """Finds the index in the array whose value is nearest to the given value"""
    array = np.asarray(array)
    idx = (np.abs(array - value)).argmin()
    return idx


def find_two_nearest(time_array, percentage):
    """Finds the indices of the two array values nearest to the given fractional position of the array's range"""
    array = np.asarray(time_array)
    value = (np.max(array) - np.min(array)) * percentage + np.min(array)
    order = np.argsort(np.abs(array - value))
    indices = np.sort([order[0], order[1]])
    return indices


def gauss(x, H, A, x0, sigma):
    """Returns the value of a Gaussian with offset H, amplitude A, center x0 and width sigma"""
    return H + A * np.exp(-((x - x0) ** 2) / (2 * sigma**2))


def gauss_fit(x, y, fit_details=None, plot=None):
    """Returns [baseline_offset, Amplitude, center, sigma, FWHM]"""

    # Initial guesses
    mean = sum(x * y) / sum(y)
@@ -318,34 +433,35 @@ def gauss_fit(x, y, fit_details=None, plot=None):
    popt, pcov = curve_fit(gauss, x, y, p0=[min(y), max(y), mean, sigma])

    # Add FWHM to the outputs
    popt = np.append(popt, 2.35482 * popt[3])

    # Print results
    if fit_details:
        print("The baseline offset is", popt[0])
        print("The center is", popt[2])
        print("The sigma of the fit is", popt[3])
        print("The maximum intensity is", popt[0] + popt[1])
        print("The Amplitude is", popt[1])
        print("The FWHM is", 2.35482 * popt[3])

    # Show plot
    if plot:
        plt.figure()
        plt.plot(x, y, ".k", label="data")
        plt.plot(x, gauss(x, *popt[0:4]), "--r", label="fit")

        plt.legend()
        plt.title("Gaussian fit")
        plt.xlabel("X")
        plt.ylabel("Y")
        plt.show()

    return popt


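# Quick self-check for gauss_fit on synthetic data (values illustrative):
#
#     x = np.linspace(-5, 5, 101)
#     y = gauss(x, H=1.0, A=3.0, x0=0.5, sigma=0.8)
#     offset, amplitude, center, sigma, fwhm = gauss_fit(x, y)

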
def xray_transmission(energy, thickness, material="Si", density=None):
    """Calculate x-ray transmission for given energy, thickness and material. Default material is Si; pass material=<element string> for another material. Density is an optional parameter."""

    mu = material_mu(material, energy)

    if density is None:
@@ -353,37 +469,46 @@ def xray_transmission(energy,thickness,material='Si',density=[]):
    else:
        mu_array = material_mu(material, energy, density=density)

    # 0.1 because mu is in 1/cm and the thickness is converted from m to mm
    trans = np.exp(-0.1 * (thickness * 1000) * mu_array)

    return trans


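# Usage sketch (energy in eV, thickness in meters; numbers illustrative):
#
#     t = xray_transmission(energy=9000, thickness=300e-6, material="Si")

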
######################## Unit conversions ########################


def joules_to_eV(joules):
    """Just a unit conversion"""
    eV = joules * 6.241509e18
    return eV


def eV_to_joules(eV):
    """Just a unit conversion"""
    joules = eV * 1.602176565e-19
    return joules


def photon_energy_from_wavelength(wavelength):
    """Returns photon energy in eV from wavelength in meters. Source https://www.kmlabs.com/en/wavelength-to-photon-energy-calculator"""
    Eph = 1239.8 / (wavelength * 1e9)
    return Eph


def wavelength_from_photon_energy(Eph):
    """Returns wavelength in meters from photon energy in eV. Source https://www.kmlabs.com/en/wavelength-to-photon-energy-calculator"""
    wavelength = 1239.8 / (Eph * 1e9)
    return wavelength


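# Quick check: a 1 Angstrom (1e-10 m) photon is ~12.4 keV.
#
#     photon_energy_from_wavelength(1e-10)   # -> 12398.0 eV

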
def sigma_to_FWHM(sigma):
    """Gaussian sigma to FWHM"""
    FWHM = sigma * 2.355
    return FWHM


def FWHM_to_sigma(FWHM):
    """FWHM to gaussian sigma"""
    sigma = FWHM / 2.355
    return sigma


@@ -1,6 +1,8 @@
import pytest
import os
import numpy as np
import cristallina.utils as utils

from cristallina.utils import ROI, print_run_info, heuristic_extract_pgroup, gauss, find_nearest, xray_transmission

@@ -10,13 +12,21 @@ def test_print(capsys):
    assert "17259343145" in captured.out
    assert "scan_parameters" in captured.out


def test_collect_metadata():
    test_pgroup_dir = "tests/data/p20841"
    df = utils.collect_runs_metadata(test_pgroup_dir)

    assert df.iloc[1]["user_tag"] == "PMS, Magnet at 78K, 400V excitation"
    assert df.iloc[1]["start_pulseid"] == 17358560870


def test_ROI():
    """API Tests"""
    r = ROI(left=1, right=2, top=4, bottom=2)

    assert r.width == 1
    assert r.height == 2
    assert r.name is not None


def test_extract_pgroup():