Files
cristallina_analysis_package/src/cristallina/utils.py
2023-04-03 15:54:57 +02:00

384 lines
14 KiB
Python

import yaml
import os
import logging
logger = logging.getLogger()
import numpy as np
from sfdata import SFDataFiles, sfdatafile, SFScanInfo, SFProcFile
from xraydb import material_mu
from joblib import Parallel, delayed, cpu_count
def scan_info(run_number, base_path=None, small_data=True):
"""Returns SFScanInfo object for a given run number.
If there is are small data channels, they will be added (small_data=False to suppress their loading).
"""
if base_path == None:
base_path = heuristic_extract_base_path()
scan = SFScanInfo(f"{base_path}run{run_number:04}/meta/scan.json")
try:
if small_data:
for i in range(len(scan.readbacks)):
sd_path_for_step=heuristic_extract_smalldata_path()+'run'+str(run_number).zfill(4)+'/acq'+str(i+1).zfill(4)+'.smalldata.h5'
scan.info['scan_files'][i].append(sd_path_for_step)
except KeyError:
logger.warning("Failed to load smalldata.")
return scan
def get_scan_from_run_number_or_scan(run_number_or_scan,small_data=True):
"""Returns SFScanInfo object from run number or SFScanInfo object (then just passes that to output)"""
if type(run_number_or_scan) == SFScanInfo:
scan = run_number_or_scan
else:
scan = scan_info(run_number_or_scan,small_data=small_data)
return scan
def get_run_number_from_run_number_or_scan(run_number_or_scan,small_data=True):
"""Returns run number from run number or SFScanInfo object"""
if type(run_number_or_scan) == int:
rn = run_number_or_scan
elif type(run_number_or_scan) == SFScanInfo:
rn = int(str(run_number_or_scan.fs)[-19:-15])
else:
raise ValueError("Input must be an int or SFScanInfo object")
return rn
def channel_names(run_number,verbose=False):
"""Prints channel names for a given run_number or scan object"""
if type(run_number) == SFScanInfo:
scan = run_number
else:
scan = scan_info(run_number)
channel_list = list(scan[0].keys())
if verbose:
print(channel_list)
return channel_list
def print_run_info(
run_number=42, print_channels=True, extra_verbose=False, base_path=None,
):
"""Prints overview of run information.
Extra verbose output contains all files and pids.
"""
scan = scan_info(run_number, base_path=base_path)
short = {}
for key, value in scan.info.items():
if isinstance(value, list):
short[key] = value[:2]
short[key].append("...")
short[key].append(value[-1])
else:
short[key] = value
if extra_verbose:
print(yaml.dump(scan.info, sort_keys=False, default_flow_style=False))
else:
print(yaml.dump(short, sort_keys=False, default_flow_style=False))
print(f"Number of steps: {len(scan.info['scan_readbacks'])}")
total_size = 0
for files_in_step in scan.info["scan_files"]:
for file in files_in_step:
try:
total_size += os.path.getsize(file)
except (FileNotFoundError, PermissionError) as e:
logger.warning(f"Cannot access {file}.")
print(f"Total file size: {total_size/(1024*1024*1024):.1f} GB\n")
for step in scan:
ch = step.channels
print("\n".join([str(c) for c in ch]))
# print only channels for first step
break
def number_of_steps(scan_number_or_scan):
"""Returns number of steps for a scan object or run number"""
scan = get_scan_from_run_number_or_scan(scan_number_or_scan)
return len(scan.info['scan_readbacks'])
def process_run(run_number, rois,detector='JF16T03V01', calculate =None, only_shots=slice(None), n_jobs=cpu_count()-2):
"""Process rois for a given detector. Save the results small data in the res/small_data/run...
By default only sum of rois is calculated, [mean,std,img] can be added to the "calculate" optional parameter.
"""
rn = get_run_number_from_run_number_or_scan(run_number)
# Load scan object with SFScanInfo
scan = scan_info(rn,small_data=False)
# Make the small data folder if it doesn't exist
if not os.path.exists( heuristic_extract_smalldata_path() ):
os.mkdir( heuristic_extract_smalldata_path() )
# Set the path for later small data saving
path_with_run_folder = heuristic_extract_smalldata_path()+'run'+str(run_number).zfill(4)
# Make the small data run folder if it doesn't exist
if not os.path.exists( path_with_run_folder ):
os.mkdir( path_with_run_folder )
# Add scan info into the small_data_object
#for key in scan.info.keys():
# d[key]= info.info[key]
# Check if there is only one roi. If yes, make a list of it so it can be iterated over.
if isinstance(rois, ROI):
rois = [rois]
def process_step(i):
scan = scan_info(run_number,small_data=False)
step = scan[i]
with step as data:
with SFProcFile(f"{path_with_run_folder}/acq{str(i+1).zfill(4)}.smalldata.h5", mode="w") as sd:
# Calculate everything related to JF_rois
for roi in rois:
bottom, top, left, right = roi.bottom, roi.top, roi.left, roi.right
# Pulse ids for saving the new channels
det_pids = data[detector].pids
sd[roi.name] = det_pids[only_shots], data[detector][only_shots, bottom:top,left:right].sum(axis=(1, 2))
if calculate:
if 'means' in calculate:
sd[roi.name+"_means"] = (det_pids[only_shots], data[detector][only_shots, bottom:top,left:right].mean(axis=(1, 2)))
if 'std' in calculate:
sd[roi.name+"_std"] = (det_pids[only_shots], data[detector][only_shots, bottom:top,left:right].std(axis=(1, 2)))
if 'img' in calculate:
sd[f'{roi.name}_img'] = (det_pids[only_shots], data[detector][only_shots, bottom:top,left:right].data)
# Currently meta files can't be read by SFData, this will be modified by Sven and then we can use it. For now saving in roi_info
#sd.meta[roi.name+"_info"] = f"roi {roi.name}: {left},{right}; {bottom},{top} (left, right, bottom, top)"
# These channels have only one dataset per step of the scan, so we take the first pulseID
sd[roi.name + "_info"] =([det_pids[0]], [f"roi {roi.name}: {left},{right}; {bottom},{top} (left, right, bottom, top)"])
sd[roi.name + "_mean_img"] = ([det_pids[0]], [data[detector][only_shots, bottom:top,left:right].mean(axis=(0))] )
sd[roi.name + "_step_sum"] = ([det_pids[0]], [data[detector][only_shots, bottom:top,left:right].sum()] )
Parallel(n_jobs=n_jobs,verbose=10)(delayed(process_step)(i) for i in range(len(scan)))
class ROI:
"""Definition of region of interest (ROI) in image coordinates.
Example: ROI(left=10, right=20, bottom=100, top=200).
Directions assume that lower left corner of image is at (x=0, y=0).
"""
def __init__(
self,
left: int = None,
right: int = None,
top: int = None,
bottom: int = None,
center_x: int = None,
center_y: int = None,
width: int = None,
height: int = None,
name: str = None,
):
if None not in (left, right, bottom, top):
self.left, self.right, self.bottom, self.top, = (
left,
right,
bottom,
top,
)
elif None not in (center_x, center_y, width, height):
self.from_centers_widths(center_x, center_y, width, height)
else:
raise ValueError("No valid ROI definition.")
# Check that ROI has a name or generate default
if name is None:
logger.warning(f"No ROI name given, generating: {self.__repr__()}")
name = self.__repr__()
self.name = name
def from_centers_widths(self, center_x, center_y, width, height):
self.left = center_x - width // 2
self.right = center_x + width // 2
self.top = center_y + height // 2
self.bottom = center_y - height // 2
@property
def rows(self):
return slice(self.bottom, self.top)
@property
def LeftRightBottomTop(self):
return [self.left, self.right, self.bottom, self.top]
@property
def cols(self):
return slice(self.left, self.right)
@property
def width(self):
return self.right - self.left
@property
def height(self):
return self.top - self.bottom
def __repr__(self):
return f"ROI(bottom={self.bottom},top={self.top},left={self.left},right={self.right})"
def __eq__(self, other):
# we disregard the name for comparisons
return (self.left, self.right, self.bottom, self.top) == (other.left, other.right, other.bottom, other.top)
def __ne__(self, other):
return not self == other
######################## Setting up paths ########################
def heuristic_extract_pgroup(path=None):
""" The function tries to guess the current p-group from the
current working directory (default) or the contents of
the given path.
"""
path = path or os.getcwd()
if "/p" in path:
# Find the last /p in the path
p_index = path.rfind('/p')
# Cut the string and look at the next five letters after the last /p
p_number = path[p_index+2:p_index+7]
if not p_number.isdigit():
raise KeyError("Automatic p-group extraction from the current working directory didn't work.")
else:
raise KeyError("Automatic p-group extraction from the current working directory didn't work.")
return p_number
def heuristic_extract_base_path():
""" The function tries to guess the full path where the raw data is saved."""
p_number = heuristic_extract_pgroup()
base_path = f"/sf/cristallina/data/p{p_number}/raw/"
return base_path
def heuristic_extract_smalldata_path():
""" The function tries to guess the full path where the small data is saved."""
p_number = heuristic_extract_pgroup()
small_data_path = f"/das/work/p{str(p_number)[0:2]}/p{p_number}/smalldata/"
return small_data_path
######################## Little useful functions ########################
def find_nearest(array, value):
'''Finds an index in an array with a value that is nearest to given number'''
array = np.asarray(array)
idx = (np.abs(array - value)).argmin()
return idx
def find_two_nearest(time_array,percentage):
'''Finds indeces of the two values that are the nearest to the given value in an array'''
array = np.asarray(time_array)
value = (np.max(array)-np.min(array))*percentage+np.min(array)
idx = (np.abs(array - value)).argmin()
indices = np.sort([np.argsort(np.abs(array-value))[0], np.argsort(np.abs(array-value))[1]])
return indices
def gauss(x, H, A, x0, sigma):
"""Returns gauss function value"""
return H + A * np.exp(-(x - x0) ** 2 / (2 * sigma ** 2))
def gauss_fit(x, y, fit_details=None, plot=None):
'''Returns [baseline_offset, Amplitude, center, sigma, FWHM]'''
# Initial guesses
mean = sum(x * y) / sum(y)
sigma = np.sqrt(sum(y * (x - mean) ** 2) / sum(y))
FWHM = 2.35482 * sigma
# Fit
popt, pcov = curve_fit(gauss, x, y, p0=[min(y), max(y), mean, sigma])
# Add FWHM to the ouptuts
popt = np.append(popt,2.35482 * popt[3])
# Print results
if fit_details :
print('The baseline offset is', popt[0])
print('The center is', popt[2])
print('The sigma of the fit is', popt[3])
print('The maximum intensity is', popt[0] + popt[1])
print('The Amplitude is', popt[1])
print('The FWHM is', 2.35482 * popt[3])
# Show plot
if plot:
plt.figure()
plt.plot(x, y, '.k', label='data')
plt.plot(x, gauss(x, *gauss_fit(x, y)[0:4]), '--r', label='fit')
plt.legend()
plt.title('Gaussian fit')
plt.xlabel('X')
plt.ylabel('Y')
plt.show()
return popt
def xray_transmission(energy,thickness,material='Si',density=[]):
'''Calculate x-ray tranmission for given energy, thickness and material. Default material is Si. Add material=element as a string for another material. Density as optional parameter'''
mu = material_mu(material, energy)
if density == []:
mu_array = material_mu(material, energy)
else:
mu_array = material_mu(formula, energy, density=density)
trans = np.exp(-0.1*(thickness*1000)*mu_array) # 0.1 is beccause mu is in 1/cm, thickness converted to mm from m
return trans
######################## Unit conversions ########################
def joules_to_eV(joules):
"""Just a unit conversion"""
eV = joules * 6.241509e18
return eV
def eV_to_joules(eV):
"""Just a unit conversion"""
joules = eV * 1.602176565e-19
return joules
def photon_energy_from_wavelength(wavelength):
'''Returns photon energy in eV from wavelength in meters. Source https://www.kmlabs.com/en/wavelength-to-photon-energy-calculator'''
Eph = 1239.8 / (wavelength*1e9)
return Eph
def wavelength_from_photon_energy(Eph):
'''Returns wavelength in meters from photon energy in eV. Source https://www.kmlabs.com/en/wavelength-to-photon-energy-calculator'''
wavelength = 1239.8 / (Eph*1e9)
return wavelength
def sigma_to_FWHM(sigma):
"""Gaussian sigma to FWHM"""
FWHM = sigma * 2.355
return FWHM
def FWHM_to_sigma(FWHM):
"""FWHM to gaussian sigma"""
sigma = FWHM / 2.355
return sigma