From 4ff6072056ec6ab27604e76699d8f807ad3804fe Mon Sep 17 00:00:00 2001 From: Alexander Steppke Date: Wed, 16 Aug 2023 18:05:43 +0200 Subject: [PATCH] added metadata collection routine and tests --- src/cristallina/utils.py | 337 +++++++++++++++++++++++++++------------ tests/test_utils.py | 12 +- 2 files changed, 242 insertions(+), 107 deletions(-) diff --git a/src/cristallina/utils.py b/src/cristallina/utils.py index 34b10ab..9117545 100644 --- a/src/cristallina/utils.py +++ b/src/cristallina/utils.py @@ -1,43 +1,119 @@ import yaml import os +import json +from pathlib import Path +from collections import defaultdict + import logging logger = logging.getLogger() import numpy as np +import pandas as pd + import sfdata import sfdata.errors from sfdata import SFDataFiles, sfdatafile, SFScanInfo, SFProcFile from xraydb import material_mu from joblib import Parallel, delayed, cpu_count + +def collect_runs_metadata(pgroup_data_dir: str | os.PathLike): + """ + Generates metadata overview for all runs in a given p-group data directory. + + Not all available metadata is included, we skip e.g. lists of recorded channels. + + Returns: Pandas dataframe with the individual acquisitions and the respective metadata. + """ + + measurements = defaultdict(list) + + raw_dir = Path(pgroup_data_dir) / "raw" + + run_paths = sorted([run for run in Path(raw_dir).glob("run[0-9][0-9][0-9][0-9]")]) + + for run_path in run_paths: + # overall data in scan.json + with open(run_path / "meta/scan.json", "r") as file: + scan_meta = json.load(file) + + # individual metadata in acqXXXX.json + num_acq = len(scan_meta["scan_files"]) + acq_jsons = sorted([acq for acq in Path(run_path).glob("meta/acq[0-9][0-9][0-9][0-9].json")]) + + for i, acq in enumerate(acq_jsons): + with open(acq) as file: + acq_meta = json.load(file) + + for parameter in [ + "rate_multiplicator", + "user_tag", + "request_time", + "unique_acquisition_run_number", + "start_pulseid", + "stop_pulseid", + "client_name", + ]: + try: + measurements[parameter].append(acq_meta[parameter]) + except KeyError: + logger.info(f'No parameter {parameter} in metadata found, skipping.') + measurements[parameter] = None + + for scan_parameter in ["scan_readbacks", "scan_step_info", "scan_values", "scan_readbacks_raw"]: + # in most scans we are only scanning one parameter at a time, so convert to single object here instead of list + meta_parameter = ( + scan_meta[scan_parameter][i][0] + if len(scan_meta[scan_parameter][i]) == 1 + else scan_meta[scan_parameter][i] + ) + measurements[scan_parameter].append(meta_parameter) + + # generate file pattern to cover all *.h5 of acquisition (this assumes we have some PVDATA which should almost always be the case). + measurements["files"].append(str(scan_meta["scan_files"][i][0]).replace("PVDATA", "*")) + + df = pd.DataFrame(measurements) + + return df + + def scan_info(run_number, base_path=None, small_data=True): - """Returns SFScanInfo object for a given run number. + """Returns SFScanInfo object for a given run number. If there is are small data channels, they will be added (small_data=False to suppress their loading). """ if base_path == None: base_path = heuristic_extract_base_path() - + scan = SFScanInfo(f"{base_path}run{run_number:04}/meta/scan.json") - + try: if small_data: for i in range(len(scan.readbacks)): - sd_path_for_step=heuristic_extract_smalldata_path()+'run'+str(run_number).zfill(4)+'/acq'+str(i+1).zfill(4)+'.smalldata*.h5' - scan.info['scan_files'][i].append(sd_path_for_step) + sd_path_for_step = ( + heuristic_extract_smalldata_path() + + "run" + + str(run_number).zfill(4) + + "/acq" + + str(i + 1).zfill(4) + + ".smalldata*.h5" + ) + scan.info["scan_files"][i].append(sd_path_for_step) except KeyError: - logger.warning("Failed to load smalldata.") + logger.warning("Failed to load smalldata.") return scan -def get_scan_from_run_number_or_scan(run_number_or_scan,small_data=True): + +def get_scan_from_run_number_or_scan(run_number_or_scan, small_data=True): """Returns SFScanInfo object from run number or SFScanInfo object (then just passes that to output)""" if type(run_number_or_scan) == SFScanInfo: scan = run_number_or_scan else: - scan = scan_info(run_number_or_scan,small_data=small_data) + scan = scan_info(run_number_or_scan, small_data=small_data) return scan -def get_run_number_from_run_number_or_scan(run_number_or_scan,small_data=True): + +def get_run_number_from_run_number_or_scan(run_number_or_scan, small_data=True): """Returns run number from run number or SFScanInfo object""" if type(run_number_or_scan) == int: rn = run_number_or_scan @@ -47,22 +123,27 @@ def get_run_number_from_run_number_or_scan(run_number_or_scan,small_data=True): raise ValueError("Input must be an int or SFScanInfo object") return rn -def channel_names(run_number,verbose=False): + +def channel_names(run_number, verbose=False): """Prints channel names for a given run_number or scan object""" if type(run_number) == SFScanInfo: scan = run_number else: scan = scan_info(run_number) - + channel_list = list(scan[0].keys()) - + if verbose: print(channel_list) - + return channel_list + def print_run_info( - run_number=42, print_channels=True, extra_verbose=False, base_path=None, + run_number=42, + print_channels=True, + extra_verbose=False, + base_path=None, ): """Prints overview of run information. @@ -109,72 +190,95 @@ def print_run_info( def number_of_steps(scan_number_or_scan): """Returns number of steps for a scan object or run number""" scan = get_scan_from_run_number_or_scan(scan_number_or_scan) - return len(scan.info['scan_readbacks']) + return len(scan.info["scan_readbacks"]) -def process_run(run_number, rois,detector='JF16T03V01', calculate =None, only_shots=slice(None), n_jobs=cpu_count()-2): + +def process_run( + run_number, rois, detector="JF16T03V01", calculate=None, only_shots=slice(None), n_jobs=cpu_count() - 2 +): """Process rois for a given detector. Save the results small data in the res/small_data/run... By default only sum of rois is calculated, [mean,std,img] can be added to the "calculate" optional parameter. - """ + """ rn = get_run_number_from_run_number_or_scan(run_number) - + # Load scan object with SFScanInfo - scan = scan_info(rn,small_data=False) - + scan = scan_info(rn, small_data=False) + # Make the small data folder if it doesn't exist - if not os.path.exists( heuristic_extract_smalldata_path() ): - os.mkdir( heuristic_extract_smalldata_path() ) + if not os.path.exists(heuristic_extract_smalldata_path()): + os.mkdir(heuristic_extract_smalldata_path()) # Set the path for later small data saving - path_with_run_folder = heuristic_extract_smalldata_path()+'run'+str(run_number).zfill(4) + path_with_run_folder = heuristic_extract_smalldata_path() + "run" + str(run_number).zfill(4) # Make the small data run folder if it doesn't exist - if not os.path.exists( path_with_run_folder ): - os.mkdir( path_with_run_folder ) - + if not os.path.exists(path_with_run_folder): + os.mkdir(path_with_run_folder) + # Add scan info into the small_data_object - #for key in scan.info.keys(): + # for key in scan.info.keys(): # d[key]= info.info[key] - + # Check if there is only one roi. If yes, make a list of it so it can be iterated over. if isinstance(rois, ROI): rois = [rois] def process_step(i): - scan = scan_info(run_number,small_data=False) - + scan = scan_info(run_number, small_data=False) + step = scan[i] with step as data: with SFProcFile(f"{path_with_run_folder}/acq{str(i+1).zfill(4)}.smalldata.h5", mode="w") as sd: - # Calculate everything related to JF_rois for roi in rois: - bottom, top, left, right = roi.bottom, roi.top, roi.left, roi.right # Pulse ids for saving the new channels det_pids = data[detector].pids - sd[roi.name] = det_pids[only_shots], data[detector][only_shots, bottom:top,left:right].sum(axis=(1, 2)) + sd[roi.name] = det_pids[only_shots], data[detector][only_shots, bottom:top, left:right].sum( + axis=(1, 2) + ) if calculate: - if 'means' in calculate: - sd[roi.name+"_means"] = (det_pids[only_shots], data[detector][only_shots, bottom:top,left:right].mean(axis=(1, 2))) - if 'std' in calculate: - sd[roi.name+"_std"] = (det_pids[only_shots], data[detector][only_shots, bottom:top,left:right].std(axis=(1, 2))) - if 'img' in calculate: - sd[f'{roi.name}_img'] = (det_pids[only_shots], data[detector][only_shots, bottom:top,left:right].data) - + if "means" in calculate: + sd[roi.name + "_means"] = ( + det_pids[only_shots], + data[detector][only_shots, bottom:top, left:right].mean(axis=(1, 2)), + ) + if "std" in calculate: + sd[roi.name + "_std"] = ( + det_pids[only_shots], + data[detector][only_shots, bottom:top, left:right].std(axis=(1, 2)), + ) + if "img" in calculate: + sd[f"{roi.name}_img"] = ( + det_pids[only_shots], + data[detector][only_shots, bottom:top, left:right].data, + ) + # Currently meta files can't be read by SFData, this will be modified by Sven and then we can use it. For now saving in roi_info - #sd.meta[roi.name+"_info"] = f"roi {roi.name}: {left},{right}; {bottom},{top} (left, right, bottom, top)" + # sd.meta[roi.name+"_info"] = f"roi {roi.name}: {left},{right}; {bottom},{top} (left, right, bottom, top)" # These channels have only one dataset per step of the scan, so we take the first pulseID - sd[roi.name + "_info"] =([det_pids[0]], [f"roi {roi.name}: {left},{right}; {bottom},{top} (left, right, bottom, top)"]) - sd[roi.name + "_mean_img"] = ([det_pids[0]], [data[detector][only_shots, bottom:top,left:right].mean(axis=(0))] ) - sd[roi.name + "_step_sum"] = ([det_pids[0]], [data[detector][only_shots, bottom:top,left:right].sum()] ) - Parallel(n_jobs=n_jobs,verbose=10)(delayed(process_step)(i) for i in range(len(scan))) + sd[roi.name + "_info"] = ( + [det_pids[0]], + [f"roi {roi.name}: {left},{right}; {bottom},{top} (left, right, bottom, top)"], + ) + sd[roi.name + "_mean_img"] = ( + [det_pids[0]], + [data[detector][only_shots, bottom:top, left:right].mean(axis=(0))], + ) + sd[roi.name + "_step_sum"] = ( + [det_pids[0]], + [data[detector][only_shots, bottom:top, left:right].sum()], + ) + + Parallel(n_jobs=n_jobs, verbose=10)(delayed(process_step)(i) for i in range(len(scan))) + class ROI: """Definition of region of interest (ROI) in image coordinates. - Example: ROI(left=10, right=20, bottom=100, top=200). + Example: ROI(left=10, right=20, bottom=100, top=200). Directions assume that lower left corner of image is at (x=0, y=0). """ @@ -191,9 +295,13 @@ class ROI: height: int = None, name: str = None, ): - if None not in (left, right, bottom, top): - self.left, self.right, self.bottom, self.top, = ( + ( + self.left, + self.right, + self.bottom, + self.top, + ) = ( left, right, bottom, @@ -203,12 +311,12 @@ class ROI: self.from_centers_widths(center_x, center_y, width, height) else: raise ValueError("No valid ROI definition.") - + # Check that ROI has a name or generate default if name is None: logger.warning(f"No ROI name given, generating: {self.__repr__()}") name = self.__repr__() - + self.name = name def from_centers_widths(self, center_x, center_y, width, height): @@ -221,7 +329,7 @@ class ROI: @property def rows(self): return slice(self.bottom, self.top) - + @property def LeftRightBottomTop(self): return [self.left, self.right, self.bottom, self.top] @@ -229,85 +337,92 @@ class ROI: @property def cols(self): return slice(self.left, self.right) - + @property def width(self): return self.right - self.left - + @property def height(self): return self.top - self.bottom - + def __repr__(self): return f"ROI(bottom={self.bottom},top={self.top},left={self.left},right={self.right})" def __eq__(self, other): # we disregard the name for comparisons - return (self.left, self.right, self.bottom, self.top) == (other.left, other.right, other.bottom, other.top) - - def __ne__(self, other): - return not self == other + return (self.left, self.right, self.bottom, self.top) == (other.left, other.right, other.bottom, other.top) + + def __ne__(self, other): + return not self == other + - ######################## Setting up paths ######################## + def heuristic_extract_pgroup(path=None): - """ The function tries to guess the current p-group from the - current working directory (default) or the contents of - the given path. + """The function tries to guess the current p-group from the + current working directory (default) or the contents of + the given path. """ path = path or os.getcwd() if "/p" in path: - - # Find the last /p in the path - p_index = path.rfind('/p') - + # Find the last /p in the path + p_index = path.rfind("/p") + # Cut the string and look at the next five letters after the last /p - p_number = path[p_index+2:p_index+7] + p_number = path[p_index + 2 : p_index + 7] if not p_number.isdigit(): raise KeyError("Automatic p-group extraction from the current working directory didn't work.") - + else: raise KeyError("Automatic p-group extraction from the current working directory didn't work.") - - return p_number + + return p_number + def heuristic_extract_base_path(): - """ The function tries to guess the full path where the raw data is saved.""" + """The function tries to guess the full path where the raw data is saved.""" p_number = heuristic_extract_pgroup() base_path = f"/sf/cristallina/data/p{p_number}/raw/" - return base_path + return base_path + def heuristic_extract_smalldata_path(): - """ The function tries to guess the full path where the small data is saved.""" + """The function tries to guess the full path where the small data is saved.""" p_number = heuristic_extract_pgroup() small_data_path = f"/das/work/p{str(p_number)[0:2]}/p{p_number}/smalldata/" - return small_data_path + return small_data_path + ######################## Little useful functions ######################## + def find_nearest(array, value): - '''Finds an index in an array with a value that is nearest to given number''' + """Finds an index in an array with a value that is nearest to given number""" array = np.asarray(array) idx = (np.abs(array - value)).argmin() return idx -def find_two_nearest(time_array,percentage): - '''Finds indeces of the two values that are the nearest to the given value in an array''' + +def find_two_nearest(time_array, percentage): + """Finds indeces of the two values that are the nearest to the given value in an array""" array = np.asarray(time_array) - value = (np.max(array)-np.min(array))*percentage+np.min(array) + value = (np.max(array) - np.min(array)) * percentage + np.min(array) idx = (np.abs(array - value)).argmin() - indices = np.sort([np.argsort(np.abs(array-value))[0], np.argsort(np.abs(array-value))[1]]) - return indices + indices = np.sort([np.argsort(np.abs(array - value))[0], np.argsort(np.abs(array - value))[1]]) + return indices + def gauss(x, H, A, x0, sigma): """Returns gauss function value""" - return H + A * np.exp(-(x - x0) ** 2 / (2 * sigma ** 2)) + return H + A * np.exp(-((x - x0) ** 2) / (2 * sigma**2)) + def gauss_fit(x, y, fit_details=None, plot=None): - '''Returns [baseline_offset, Amplitude, center, sigma, FWHM]''' + """Returns [baseline_offset, Amplitude, center, sigma, FWHM]""" # Initial guesses mean = sum(x * y) / sum(y) @@ -318,34 +433,35 @@ def gauss_fit(x, y, fit_details=None, plot=None): popt, pcov = curve_fit(gauss, x, y, p0=[min(y), max(y), mean, sigma]) # Add FWHM to the ouptuts - popt = np.append(popt,2.35482 * popt[3]) + popt = np.append(popt, 2.35482 * popt[3]) # Print results - if fit_details : - print('The baseline offset is', popt[0]) - print('The center is', popt[2]) - print('The sigma of the fit is', popt[3]) - print('The maximum intensity is', popt[0] + popt[1]) - print('The Amplitude is', popt[1]) - print('The FWHM is', 2.35482 * popt[3]) + if fit_details: + print("The baseline offset is", popt[0]) + print("The center is", popt[2]) + print("The sigma of the fit is", popt[3]) + print("The maximum intensity is", popt[0] + popt[1]) + print("The Amplitude is", popt[1]) + print("The FWHM is", 2.35482 * popt[3]) # Show plot if plot: plt.figure() - plt.plot(x, y, '.k', label='data') - plt.plot(x, gauss(x, *gauss_fit(x, y)[0:4]), '--r', label='fit') + plt.plot(x, y, ".k", label="data") + plt.plot(x, gauss(x, *gauss_fit(x, y)[0:4]), "--r", label="fit") plt.legend() - plt.title('Gaussian fit') - plt.xlabel('X') - plt.ylabel('Y') + plt.title("Gaussian fit") + plt.xlabel("X") + plt.ylabel("Y") plt.show() return popt -def xray_transmission(energy,thickness,material='Si',density=[]): - '''Calculate x-ray tranmission for given energy, thickness and material. Default material is Si. Add material=element as a string for another material. Density as optional parameter''' - + +def xray_transmission(energy, thickness, material="Si", density=[]): + """Calculate x-ray tranmission for given energy, thickness and material. Default material is Si. Add material=element as a string for another material. Density as optional parameter""" + mu = material_mu(material, energy) if density == []: @@ -353,37 +469,46 @@ def xray_transmission(energy,thickness,material='Si',density=[]): else: mu_array = material_mu(formula, energy, density=density) - trans = np.exp(-0.1*(thickness*1000)*mu_array) # 0.1 is beccause mu is in 1/cm, thickness converted to mm from m + trans = np.exp( + -0.1 * (thickness * 1000) * mu_array + ) # 0.1 is beccause mu is in 1/cm, thickness converted to mm from m + + return trans - return trans ######################## Unit conversions ######################## + def joules_to_eV(joules): """Just a unit conversion""" eV = joules * 6.241509e18 return eV + def eV_to_joules(eV): """Just a unit conversion""" joules = eV * 1.602176565e-19 - return joules - + return joules + + def photon_energy_from_wavelength(wavelength): - '''Returns photon energy in eV from wavelength in meters. Source https://www.kmlabs.com/en/wavelength-to-photon-energy-calculator''' - Eph = 1239.8 / (wavelength*1e9) + """Returns photon energy in eV from wavelength in meters. Source https://www.kmlabs.com/en/wavelength-to-photon-energy-calculator""" + Eph = 1239.8 / (wavelength * 1e9) return Eph + def wavelength_from_photon_energy(Eph): - '''Returns wavelength in meters from photon energy in eV. Source https://www.kmlabs.com/en/wavelength-to-photon-energy-calculator''' - wavelength = 1239.8 / (Eph*1e9) + """Returns wavelength in meters from photon energy in eV. Source https://www.kmlabs.com/en/wavelength-to-photon-energy-calculator""" + wavelength = 1239.8 / (Eph * 1e9) return wavelength + def sigma_to_FWHM(sigma): """Gaussian sigma to FWHM""" FWHM = sigma * 2.355 return FWHM + def FWHM_to_sigma(FWHM): """FWHM to gaussian sigma""" sigma = FWHM / 2.355 diff --git a/tests/test_utils.py b/tests/test_utils.py index 68fc955..d138888 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -1,6 +1,8 @@ import pytest import os import numpy as np +import cristallina.utils as utils + from cristallina.utils import ROI, print_run_info, heuristic_extract_pgroup, gauss, find_nearest, xray_transmission @@ -10,13 +12,21 @@ def test_print(capsys): assert "17259343145" in captured.out assert "scan_parameters" in captured.out - + +def test_collect_metadata(): + test_pgroup_dir = "tests/data/p20841" + df = utils.collect_runs_metadata(test_pgroup_dir) + + assert df.iloc[1]["user_tag"] == "PMS, Magnet at 78K, 400V excitation" + assert df.iloc[1]["start_pulseid"] == 17358560870 + def test_ROI(): """API Tests""" r = ROI(left=1, right=2, top=4, bottom=2) assert r.width == 1 assert r.height == 2 + assert r.name is not None def test_extract_pgroup():