mirror of
https://gitea.psi.ch/APOG/acsm-fairifier.git
synced 2025-07-08 17:08:03 +02:00
Reimplement apply_calibration_factors() in terms of load_calibration_file() and compute_calibration_factors() to reduce complexity and improve modularity.
This commit is contained in:
@ -26,51 +26,146 @@ sys.path.append(dimaPath)
|
|||||||
|
|
||||||
import dima.src.hdf5_ops as dataOps
|
import dima.src.hdf5_ops as dataOps
|
||||||
|
|
||||||
|
import numpy as np
|
||||||
|
import pandas as pd
|
||||||
|
from math import prod
|
||||||
|
|
||||||
|
def compute_calibration_factors(data_table, datetime_var_name, calibration_params, calibration_factors):
|
||||||
|
"""
|
||||||
|
Computes calibration factor values for variables (excluding the time variable) in data_table.
|
||||||
|
|
||||||
|
Parameters
|
||||||
|
----------
|
||||||
|
data_table : pd.DataFrame
|
||||||
|
Data table containing time-series data.
|
||||||
|
datetime_var_name : str
|
||||||
|
Name of the datetime column in data_table.
|
||||||
|
calibration_params : dict
|
||||||
|
Dictionary containing calibration interval details.
|
||||||
|
calibration_factors : dict
|
||||||
|
Dictionary specifying numerator and denominator variables for calibration.
|
||||||
|
|
||||||
|
Returns
|
||||||
|
-------
|
||||||
|
pd.DataFrame
|
||||||
|
DataFrame containing computed calibration factors for each variable.
|
||||||
|
"""
|
||||||
|
|
||||||
|
calibration_factors_dict = {}
|
||||||
|
for variable_name in calibration_factors['variables']:
|
||||||
|
#tmp = np.empty(shape=data_table[datetime_var_name].to_numpy().shape)
|
||||||
|
|
||||||
|
print(variable_name)
|
||||||
|
|
||||||
|
tmp = np.full(shape=data_table[datetime_var_name].shape, fill_value=np.nan)
|
||||||
|
|
||||||
|
|
||||||
def apply_calibration_factors(data_table, calibration_factors):
|
for interval_idx, interval_params in calibration_params['calibration_intervals'].items(): # Fixed typo
|
||||||
|
t1 = pd.to_datetime(interval_params['start_datetime'], format = "%Y-%m-%d %H-%M-%S")
|
||||||
|
t2 = pd.to_datetime(interval_params['end_datetime'], format = "%Y-%m-%d %H-%M-%S")
|
||||||
|
|
||||||
|
t1_idx = abs(data_table[datetime_var_name] - t1).argmin()
|
||||||
|
t2_idx = abs(data_table[datetime_var_name] - t2).argmin()
|
||||||
|
|
||||||
|
if t1_idx <= t2_idx:
|
||||||
|
numerator = prod(interval_params[key] for key in calibration_factors['variables'][variable_name]['num'])
|
||||||
|
denominator = prod(interval_params[key] for key in calibration_factors['variables'][variable_name]['den'])
|
||||||
|
|
||||||
|
if denominator == 0:
|
||||||
|
raise ZeroDivisionError(f"Denominator is zero for '{variable_name}' in interval {t1} - {t2}")
|
||||||
|
|
||||||
|
tmp[t1_idx:t2_idx] = numerator / denominator
|
||||||
|
else:
|
||||||
|
raise ValueError(f"Invalid calibration interval: start_datetime {t1} must be before end_datetime {t2}")
|
||||||
|
|
||||||
|
calibration_factors_dict[variable_name] = tmp
|
||||||
|
|
||||||
|
return pd.DataFrame(data=calibration_factors_dict)
|
||||||
|
|
||||||
|
|
||||||
|
def load_calibration_file(calibration_file):
|
||||||
|
|
||||||
|
# START YAML FILE VALIDATION
|
||||||
|
# TODO : create a separate validation function
|
||||||
|
with open(calibration_file, 'r') as stream:
|
||||||
|
calibration_factors = yaml.load(stream, Loader=yaml.FullLoader)
|
||||||
|
|
||||||
|
# Get path to file where calibrations params are defined
|
||||||
|
path_to_calib_params_file = calibration_factors.get("calibration_params", {}).get('path_to_file')
|
||||||
|
|
||||||
|
# Validate
|
||||||
|
if not path_to_calib_params_file:
|
||||||
|
raise ValueError(f'Invalid yaml file. {calibration_file} must contain "calibration_params" with a valid "path_to_file".')
|
||||||
|
|
||||||
|
if not os.path.exists(path_to_calib_params_file):
|
||||||
|
raise FileNotFoundError(f'Calibration parameters file not found: {path_to_calib_params_file}')
|
||||||
|
|
||||||
|
with open(path_to_calib_params_file, 'r') as stream:
|
||||||
|
calibration_params = yaml.load(stream, Loader=yaml.FullLoader)
|
||||||
|
|
||||||
|
#calibration_params = calibration_params['calibration_intervals']['interval_1']
|
||||||
|
#for key in calibration_params['calibration_intervals']['interval_1']:
|
||||||
|
# if not key in['start_datetime','end_datetime']:
|
||||||
|
# calibration_params[key] = calibration_params['calibration_intervals']['interval_1'][key]
|
||||||
|
|
||||||
|
# Get variable to calibration factors dictionary
|
||||||
|
#calibration_factors = calibration_dict.get('variables',{})
|
||||||
|
#calibration_factors['calibration_params'].update(calibration_params)
|
||||||
|
# TODO: perform a validation step before computing factors
|
||||||
|
### END YAML FILE VALIDATION
|
||||||
|
|
||||||
|
return calibration_params, calibration_factors
|
||||||
|
|
||||||
|
def apply_calibration_factors(data_table, datetime_var_name, calibration_file):
|
||||||
"""
|
"""
|
||||||
Calibrates the species data in the given data table using a calibration factor.
|
Calibrates the species data in the given data table using a calibration factor.
|
||||||
|
|
||||||
Parameters:
|
Parameters
|
||||||
|
----------
|
||||||
data_table (pd.DataFrame): The input data table with variables to calibrate.
|
data_table (pd.DataFrame): The input data table with variables to calibrate.
|
||||||
calibration_factor (dict): Dictionary containing 'standard' calibration factors
|
calibration_file (string): Calibration YAML file with a dictionary containing calibration factors for each variable in
|
||||||
with 'num' and 'den' values as dictionaries of multipliers.
|
the data_table, where factors are specified in terms of their
|
||||||
|
'num' and 'den' values as dictionaries of multipliers.
|
||||||
|
|
||||||
Returns:
|
Returns
|
||||||
|
-------
|
||||||
pd.DataFrame: A new data table with calibrated variables.
|
pd.DataFrame: A new data table with calibrated variables.
|
||||||
"""
|
"""
|
||||||
# Make a copy of the input table to avoid modifying the original
|
# Make a copy of the input table to avoid modifying the original
|
||||||
new_data_table = data_table.copy()
|
new_data_table = data_table.copy()
|
||||||
|
|
||||||
|
calibration_params, calibration_factors = load_calibration_file(calibration_file)
|
||||||
|
|
||||||
|
|
||||||
|
calibration_factor_table = compute_calibration_factors(new_data_table,
|
||||||
|
datetime_var_name,
|
||||||
|
calibration_params,
|
||||||
|
calibration_factors)
|
||||||
|
|
||||||
# Initialize a dictionary to rename variables
|
# Initialize a dictionary to rename variables
|
||||||
variable_rename_dict = {}
|
variable_rename_dict = {}
|
||||||
|
|
||||||
# Loop through the column names in the data table
|
# Loop through the column names in the data table
|
||||||
for variable_name in new_data_table.select_dtypes(include=["number"]).columns:
|
for variable in new_data_table.select_dtypes(include=["number"]).columns:
|
||||||
|
|
||||||
if variable_name in calibration_factors['variables'].keys(): # use standard calibration factor
|
if variable in calibration_factors['variables'].keys(): # use standard calibration factor
|
||||||
|
|
||||||
#print(variable_name)
|
|
||||||
# Extract numerator and denominator values
|
|
||||||
numerator = prod(calibration_factors['variables'][variable_name]['num'])
|
|
||||||
denominator = prod(calibration_factors['variables'][variable_name]['den'])
|
|
||||||
|
|
||||||
# Apply calibration to each variable
|
# Apply calibration to each variable
|
||||||
new_data_table[variable_name] = new_data_table[variable_name].mul((numerator / denominator))
|
new_data_table[variable] = new_data_table[variable].mul(calibration_factor_table[variable])
|
||||||
|
|
||||||
# Add renaming entry
|
# Add renaming entry
|
||||||
variable_rename_dict[variable_name] = f"{variable_name}_correct"
|
variable_rename_dict[variable] = f"{variable}_correct"
|
||||||
|
|
||||||
else: # use specifies dependent calibration factor
|
else: # use specifies dependent calibration factor
|
||||||
print(f'There is no calibration factors for variable {variable_name}. The variable will remain the same.')
|
print(f'There is no calibration factors for variable {variable}. The variable will remain the same.')
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
# Rename the columns in the new data table
|
# Rename the columns in the new data table
|
||||||
new_data_table.rename(columns=variable_rename_dict, inplace=True)
|
new_data_table.rename(columns=variable_rename_dict, inplace=True)
|
||||||
|
|
||||||
return new_data_table
|
return calibration_factor_table, new_data_table
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
def record_data_lineage(path_to_output_file, metadata):
|
def record_data_lineage(path_to_output_file, metadata):
|
||||||
|
|
||||||
@ -109,7 +204,7 @@ if __name__ == '__main__':
|
|||||||
parser.add_argument('data_file', type=str, help="Path to the input HDF5 file containing the data table.")
|
parser.add_argument('data_file', type=str, help="Path to the input HDF5 file containing the data table.")
|
||||||
parser.add_argument('dataset_name', type=str, help ='Relative path to data_table (i.e., dataset name) in HDF5 file')
|
parser.add_argument('dataset_name', type=str, help ='Relative path to data_table (i.e., dataset name) in HDF5 file')
|
||||||
parser.add_argument('calibration_file', type=str, help="Path to the input YAML file containing calibration factors.")
|
parser.add_argument('calibration_file', type=str, help="Path to the input YAML file containing calibration factors.")
|
||||||
parser.add_argument('output_file', type=str, help="Path to save the output calibrated data as a CSV file.")
|
#parser.add_argument('output_file', type=str, help="Path to save the output calibrated data as a CSV file.")
|
||||||
|
|
||||||
args = parser.parse_args()
|
args = parser.parse_args()
|
||||||
|
|
||||||
@ -123,6 +218,9 @@ if __name__ == '__main__':
|
|||||||
dataManager.load_file_obj()
|
dataManager.load_file_obj()
|
||||||
dataset_name = '/'+args.dataset_name
|
dataset_name = '/'+args.dataset_name
|
||||||
data_table = dataManager.extract_dataset_as_dataframe('/'+args.dataset_name)
|
data_table = dataManager.extract_dataset_as_dataframe('/'+args.dataset_name)
|
||||||
|
datetime_var, datetime_format = dataManager.infer_datetime_variable(dataset_name)
|
||||||
|
|
||||||
|
#data_table['t_start_Buf'] = data_table['t_start_Buf'].apply(lambda x : x.decode())
|
||||||
|
|
||||||
dataManager.extract_and_load_dataset_metadata()
|
dataManager.extract_and_load_dataset_metadata()
|
||||||
dataset_metadata_df = dataManager.dataset_metadata_df.copy()
|
dataset_metadata_df = dataManager.dataset_metadata_df.copy()
|
||||||
@ -133,11 +231,13 @@ if __name__ == '__main__':
|
|||||||
parent_instrument = data_table_metadata.loc[dataset_name_idx,'parent_instrument'].values[0]
|
parent_instrument = data_table_metadata.loc[dataset_name_idx,'parent_instrument'].values[0]
|
||||||
parent_file = data_table_metadata.loc[dataset_name_idx,'parent_file'].values[0]
|
parent_file = data_table_metadata.loc[dataset_name_idx,'parent_file'].values[0]
|
||||||
|
|
||||||
|
print(parent_file)
|
||||||
|
|
||||||
dataManager.unload_file_obj()
|
dataManager.unload_file_obj()
|
||||||
print(args.calibration_file)
|
print(args.calibration_file)
|
||||||
|
|
||||||
with open(args.calibration_file, 'r') as stream:
|
if not any(item in args.calibration_file for item in ['.yaml','.yml']):
|
||||||
calibration_factors = yaml.load(stream, Loader=yaml.FullLoader)
|
raise TypeError(f"Invalid file type. Calibration file {args.calibration_file} needs to be a valid yaml file.")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"Error loading input files: {e}")
|
print(f"Error loading input files: {e}")
|
||||||
exit(1)
|
exit(1)
|
||||||
@ -160,17 +260,23 @@ if __name__ == '__main__':
|
|||||||
path_to_output_file, ext = os.path.splitext('/'.join([path_to_output_dir,parent_instrument,parent_file]))
|
path_to_output_file, ext = os.path.splitext('/'.join([path_to_output_dir,parent_instrument,parent_file]))
|
||||||
path_to_calibrated_file = ''.join([path_to_output_file, '_calibrated.csv'])
|
path_to_calibrated_file = ''.join([path_to_output_file, '_calibrated.csv'])
|
||||||
|
|
||||||
|
path_to_calibration_factors_file = ''.join([path_to_output_file, '_calibration_factors.csv'])
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
#path_tail, path_head = os.path.split(path_to_calibrated_file)
|
#path_tail, path_head = os.path.split(path_to_calibrated_file)
|
||||||
#path_to_metadata_file = '/'.join([path_tail, 'data_lineage_metadata.json'])
|
#path_to_metadata_file = '/'.join([path_tail, 'data_lineage_metadata.json'])
|
||||||
|
|
||||||
print('Path to output file :', path_to_calibrated_file)
|
print('Path to output file :', path_to_calibrated_file)
|
||||||
import dima.utils.g5505_utils as utils
|
import dima.utils.g5505_utils as utils
|
||||||
import json
|
import json
|
||||||
calibrated_table = apply_calibration_factors(data_table, calibration_factors)
|
calibration_factor_table, calibrated_table = apply_calibration_factors(data_table, datetime_var, args.calibration_file) #calibration_factors)
|
||||||
metadata['processing_date'] = utils.created_at()
|
metadata['processing_date'] = utils.created_at()
|
||||||
calibrated_table.to_csv(path_to_calibrated_file, index=False)
|
calibrated_table.to_csv(path_to_calibrated_file, index=False)
|
||||||
|
calibration_factor_table.to_csv(path_to_calibration_factors_file, index=False)
|
||||||
|
|
||||||
status = record_data_lineage(path_to_calibrated_file, metadata)
|
status = record_data_lineage(path_to_calibrated_file, metadata)
|
||||||
|
status = record_data_lineage(path_to_calibration_factors_file, metadata)
|
||||||
|
|
||||||
|
|
||||||
print(f"Calibrated data saved to {path_to_calibrated_file}")
|
print(f"Calibrated data saved to {path_to_calibrated_file}")
|
||||||
|
Reference in New Issue
Block a user