+import yaml
+import os
+import logging
+logger = logging.getLogger()
+
+import numpy as np
+from sfdata import SFDataFiles, sfdatafile, SFScanInfo, SFProcFile
+from xraydb import material_mu
+from joblib import Parallel, delayed, cpu_count
+
+[docs]def scan_info(run_number, base_path=None, small_data=True):
+
"""Returns SFScanInfo object for a given run number.
+
If there is are small data channels, they will be added (small_data=False to suppress their loading).
+
"""
+
if base_path == None:
+
base_path = heuristic_extract_base_path()
+
+
scan = SFScanInfo(f"{base_path}run{run_number:04}/meta/scan.json")
+
+
if small_data:
+
for i in range(len(scan.readbacks)):
+
sd_path_for_step=heuristic_extract_smalldata_path()+'run'+str(run_number).zfill(4)+'/acq'+str(i+1).zfill(4)+'.smalldata.h5'
+
scan.info['scan_files'][i].append(sd_path_for_step)
+
+
return scan
+
+[docs]def channel_names(run_number,verbose=False):
+
"""Prints channel names for a given run_number or scan object"""
+
if type(run_number) == SFScanInfo:
+
scan = run_number
+
else:
+
scan = scan_info(run_number)
+
+
channel_list = list(scan[0].keys())
+
+
if verbose:
+
print(channel_list)
+
+
return channel_list
+
+[docs]def print_run_info(
+
run_number=42, print_channels=True, extra_verbose=False, base_path=None,
+
):
+
"""Prints overview of run information.
+
+
Extra verbose output contains all files and pids.
+
"""
+
scan = scan_info(run_number, base_path=base_path)
+
short = {}
+
for key, value in scan.info.items():
+
if isinstance(value, list):
+
short[key] = value[:2]
+
short[key].append("...")
+
short[key].append(value[-1])
+
else:
+
short[key] = value
+
+
if extra_verbose:
+
print(yaml.dump(scan.info, sort_keys=False, default_flow_style=False))
+
else:
+
print(yaml.dump(short, sort_keys=False, default_flow_style=False))
+
+
print(f"Number of steps: {len(scan.info['scan_readbacks'])}")
+
+
total_size = 0
+
for files_in_step in scan.info["scan_files"]:
+
for file in files_in_step:
+
try:
+
total_size += os.path.getsize(file)
+
except FileNotFoundError as e:
+
pass
+
+
print(f"Total file size: {total_size/(1024*1024*1024):.1f} GB\n")
+
+
for step in scan:
+
ch = step.channels
+
print("\n".join([str(c) for c in ch]))
+
# print only channels for first step
+
break
+
+
+[docs]def process_run(run_number, rois,detector='JF16T03V01', calculate =None, only_shots=slice(None), n_jobs=cpu_count()):
+
"""Process rois for a given detector. Save the results small data in the res/small_data/run...
+
By default only sum of rois is calculated, [mean,std,img] can be added to the "calculate" optional parameter.
+
"""
+
# Load scan object with SFScanInfo
+
scan = scan_info(run_number,small_data=False)
+
+
# Set the path for later small data saving
+
path_with_run_folder = heuristic_extract_smalldata_path()+'run'+str(run_number).zfill(4)
+
+
# Make the small data run folder if it doesn't exist
+
if not os.path.exists( path_with_run_folder ):
+
os.mkdir( path_with_run_folder )
+
+
# Add scan info into the small_data_object
+
#for key in scan.info.keys():
+
# d[key]= info.info[key]
+
+
# Check if there is only one roi. If yes, make a list of it so it can be iterated over.
+
if isinstance(rois, ROI):
+
rois = [rois]
+
+
def process_step(i):
+
scan = scan_info(run_number,small_data=False)
+
+
step = scan[i]
+
with step as data:
+
with SFProcFile(f"{path_with_run_folder}/acq{str(i+1).zfill(4)}.smalldata.h5", mode="w") as sd:
+
+
# Calculate everything related to JF_rois
+
for roi in rois:
+
+
bottom, top, left, right = roi.bottom, roi.top, roi.left, roi.right
+
+
# Pulse ids for saving the new channels
+
det_pids = data[detector].pids
+
sd[roi.name] = det_pids[only_shots], data[detector][only_shots, bottom:top,left:right].sum(axis=(1, 2))
+
if calculate:
+
if 'mean' in calculate:
+
sd[roi.name+"_mean"] = (det_pids[only_shots], data[detector][only_shots, bottom:top,left:right].mean(axis=(1, 2)))
+
if 'std' in calculate:
+
sd[roi.name+"_std"] = (det_pids[only_shots], data[detector][only_shots, bottom:top,left:right].std(axis=(1, 2)))
+
if 'img' in calculate:
+
sd[f'{roi.name}_img'] = (det_pids[only_shots], data[detector][only_shots, bottom:top,left:right].data)
+
+
# Currently meta files can't be read by SFData, this will be modified by Sven and then we can use it. For now saving in roi_info
+
#sd.meta[roi.name+"_info"] = f"roi {roi.name}: {left},{right}; {bottom},{top} (left, right, bottom, top)"
+
+
# These channels have only one dataset per step of the scan, so we take the first pulseID
+
sd[roi.name + "_info"] =([det_pids[0]], [f"roi {roi.name}: {left},{right}; {bottom},{top} (left, right, bottom, top)"])
+
sd[roi.name + "_mean_img"] = ([det_pids[0]], [data[detector][:, bottom:top,left:right].mean(axis=(0))] )
+
+
Parallel(n_jobs=n_jobs,verbose=10)(delayed(process_step)(i) for i in range(len(scan)))
+
+[docs]class ROI:
+
"""Definition of region of interest (ROI) in image coordinates.
+
+
Example: ROI(left=10, right=20, bottom=100, top=200).
+
+
Directions assume that lower left corner of image is at (x=0, y=0).
+
"""
+
+
def __init__(
+
self,
+
left: int = None,
+
right: int = None,
+
top: int = None,
+
bottom: int = None,
+
center_x: int = None,
+
center_y: int = None,
+
width: int = None,
+
height: int = None,
+
name: str = None,
+
):
+
+
if None not in (left, right, bottom, top):
+
self.left, self.right, self.bottom, self.top, = (
+
left,
+
right,
+
bottom,
+
top,
+
)
+
elif None not in (center_x, center_y, width, height):
+
self.from_centers_widths(center_x, center_y, width, height)
+
else:
+
raise ValueError("No valid ROI definition.")
+
+
# Check that ROI has a name or generate default
+
if name is None:
+
logger.warning(f"No ROI name given, generating: {self.__repr__()}")
+
name = self.__repr__()
+
+
self.name = name
+
+
[docs] def from_centers_widths(self, center_x, center_y, width, height):
+
self.left = center_x - width // 2
+
self.right = center_x + width // 2
+
+
self.top = center_y + height // 2
+
self.bottom = center_y - height // 2
+
+
@property
+
def rows(self):
+
return slice(self.bottom, self.top)
+
+
@property
+
def LeftRightBottomTop(self):
+
return [self.left, self.right, self.bottom, self.top]
+
+
@property
+
def cols(self):
+
return slice(self.left, self.right)
+
+
@property
+
def width(self):
+
return self.right - self.left
+
+
@property
+
def height(self):
+
return self.top - self.bottom
+
+
def __repr__(self):
+
return f"ROI(bottom={self.bottom},top={self.top},left={self.left},right={self.right})"
+
+
def __eq__(self, other):
+
# we disregard the name for comparisons
+
return (self.left, self.right, self.bottom, self.top) == (other.left, other.right, other.bottom, other.top)
+
+
def __ne__(self, other):
+
return not self == other
+
+
+######################## Setting up paths ########################
+
+
+
+
+
+
+
+######################## Little useful functions ########################
+
+[docs]def find_nearest(array, value):
+
'''Finds an index in an array with a value that is nearest to given number'''
+
array = np.asarray(array)
+
idx = (np.abs(array - value)).argmin()
+
return idx
+
+[docs]def find_two_nearest(time_array,percentage):
+
'''Finds indeces of the two values that are the nearest to the given value in an array'''
+
array = np.asarray(time_array)
+
value = (np.max(array)-np.min(array))*percentage+np.min(array)
+
idx = (np.abs(array - value)).argmin()
+
indices = np.sort([np.argsort(np.abs(array-value))[0], np.argsort(np.abs(array-value))[1]])
+
return indices
+
+[docs]def gauss(x, H, A, x0, sigma):
+
"""Returns gauss function value"""
+
return H + A * np.exp(-(x - x0) ** 2 / (2 * sigma ** 2))
+
+[docs]def gauss_fit(x, y, fit_details=None, plot=None):
+
'''Returns [baseline_offset, Amplitude, center, sigma, FWHM]'''
+
+
# Initial guesses
+
mean = sum(x * y) / sum(y)
+
sigma = np.sqrt(sum(y * (x - mean) ** 2) / sum(y))
+
FWHM = 2.35482 * sigma
+
+
# Fit
+
popt, pcov = curve_fit(gauss, x, y, p0=[min(y), max(y), mean, sigma])
+
+
# Add FWHM to the ouptuts
+
popt = np.append(popt,2.35482 * popt[3])
+
+
# Print results
+
if fit_details :
+
print('The baseline offset is', popt[0])
+
print('The center is', popt[2])
+
print('The sigma of the fit is', popt[3])
+
print('The maximum intensity is', popt[0] + popt[1])
+
print('The Amplitude is', popt[1])
+
print('The FWHM is', 2.35482 * popt[3])
+
+
# Show plot
+
if plot:
+
plt.figure()
+
plt.plot(x, y, '.k', label='data')
+
plt.plot(x, gauss(x, *gauss_fit(x, y)[0:4]), '--r', label='fit')
+
+
plt.legend()
+
plt.title('Gaussian fit')
+
plt.xlabel('X')
+
plt.ylabel('Y')
+
plt.show()
+
+
return popt
+
+[docs]def xray_transmission(energy,thickness,material='Si',density=[]):
+
'''Calculate x-ray tranmission for given energy, thickness and material. Default material is Si. Add material=element as a string for another material. Density as optional parameter'''
+
+
mu = material_mu(material, energy)
+
+
if density == []:
+
mu_array = material_mu(material, energy)
+
else:
+
mu_array = material_mu(formula, energy, density=density)
+
+
trans = np.exp(-0.1*(thickness*1000)*mu_array) # 0.1 is beccause mu is in 1/cm, thickness converted to mm from m
+
+
return trans
+
+######################## Unit conversions ########################
+
+[docs]def joules_to_eV(joules):
+
"""Just a unit conversion"""
+
eV = joules * 6.241509e18
+
return eV
+
+[docs]def eV_to_joules(eV):
+
"""Just a unit conversion"""
+
joules = eV * 1.602176565e-19
+
return joules
+
+[docs]def photon_energy_from_wavelength(wavelength):
+
'''Returns photon energy in eV from wavelength in meters. Source https://www.kmlabs.com/en/wavelength-to-photon-energy-calculator'''
+
Eph = 1239.8 / (wavelength*1e9)
+
return Eph
+
+[docs]def wavelength_from_photon_energy(Eph):
+
'''Returns wavelength in meters from photon energy in eV. Source https://www.kmlabs.com/en/wavelength-to-photon-energy-calculator'''
+
wavelength = 1239.8 / (Eph*1e9)
+
return wavelength
+
+[docs]def sigma_to_FWHM(sigma):
+
"""Gaussian sigma to FWHM"""
+
FWHM = sigma * 2.355
+
return FWHM
+
+[docs]def FWHM_to_sigma(FWHM):
+
"""FWHM to gaussian sigma"""
+
sigma = FWHM / 2.355
+
return sigma
+