diff --git a/pipelines/steps/apply_calibration_factors.py b/pipelines/steps/apply_calibration_factors.py index 1c653ab..bd2d319 100644 --- a/pipelines/steps/apply_calibration_factors.py +++ b/pipelines/steps/apply_calibration_factors.py @@ -26,51 +26,146 @@ sys.path.append(dimaPath) import dima.src.hdf5_ops as dataOps +import numpy as np +import pandas as pd +from math import prod + +def compute_calibration_factors(data_table, datetime_var_name, calibration_params, calibration_factors): + """ + Computes calibration factor values for variables (excluding the time variable) in data_table. + + Parameters + ---------- + data_table : pd.DataFrame + Data table containing time-series data. + datetime_var_name : str + Name of the datetime column in data_table. + calibration_params : dict + Dictionary containing calibration interval details. + calibration_factors : dict + Dictionary specifying numerator and denominator variables for calibration. + + Returns + ------- + pd.DataFrame + DataFrame containing computed calibration factors for each variable. + """ + + calibration_factors_dict = {} + for variable_name in calibration_factors['variables']: + #tmp = np.empty(shape=data_table[datetime_var_name].to_numpy().shape) + + print(variable_name) + + tmp = np.full(shape=data_table[datetime_var_name].shape, fill_value=np.nan) + + + for interval_idx, interval_params in calibration_params['calibration_intervals'].items(): # Fixed typo + t1 = pd.to_datetime(interval_params['start_datetime'], format = "%Y-%m-%d %H-%M-%S") + t2 = pd.to_datetime(interval_params['end_datetime'], format = "%Y-%m-%d %H-%M-%S") + + t1_idx = abs(data_table[datetime_var_name] - t1).argmin() + t2_idx = abs(data_table[datetime_var_name] - t2).argmin() + + if t1_idx <= t2_idx: + numerator = prod(interval_params[key] for key in calibration_factors['variables'][variable_name]['num']) + denominator = prod(interval_params[key] for key in calibration_factors['variables'][variable_name]['den']) + + if denominator == 0: + raise ZeroDivisionError(f"Denominator is zero for '{variable_name}' in interval {t1} - {t2}") + + tmp[t1_idx:t2_idx] = numerator / denominator + else: + raise ValueError(f"Invalid calibration interval: start_datetime {t1} must be before end_datetime {t2}") + + calibration_factors_dict[variable_name] = tmp + + return pd.DataFrame(data=calibration_factors_dict) -def apply_calibration_factors(data_table, calibration_factors): +def load_calibration_file(calibration_file): + + # START YAML FILE VALIDATION + # TODO : create a separate validation function + with open(calibration_file, 'r') as stream: + calibration_factors = yaml.load(stream, Loader=yaml.FullLoader) + + # Get path to file where calibrations params are defined + path_to_calib_params_file = calibration_factors.get("calibration_params", {}).get('path_to_file') + + # Validate + if not path_to_calib_params_file: + raise ValueError(f'Invalid yaml file. {calibration_file} must contain "calibration_params" with a valid "path_to_file".') + + if not os.path.exists(path_to_calib_params_file): + raise FileNotFoundError(f'Calibration parameters file not found: {path_to_calib_params_file}') + + with open(path_to_calib_params_file, 'r') as stream: + calibration_params = yaml.load(stream, Loader=yaml.FullLoader) + + #calibration_params = calibration_params['calibration_intervals']['interval_1'] + #for key in calibration_params['calibration_intervals']['interval_1']: + # if not key in['start_datetime','end_datetime']: + # calibration_params[key] = calibration_params['calibration_intervals']['interval_1'][key] + + # Get variable to calibration factors dictionary + #calibration_factors = calibration_dict.get('variables',{}) + #calibration_factors['calibration_params'].update(calibration_params) + # TODO: perform a validation step before computing factors + ### END YAML FILE VALIDATION + + return calibration_params, calibration_factors + +def apply_calibration_factors(data_table, datetime_var_name, calibration_file): """ Calibrates the species data in the given data table using a calibration factor. - Parameters: + Parameters + ---------- data_table (pd.DataFrame): The input data table with variables to calibrate. - calibration_factor (dict): Dictionary containing 'standard' calibration factors - with 'num' and 'den' values as dictionaries of multipliers. + calibration_file (string): Calibration YAML file with a dictionary containing calibration factors for each variable in + the data_table, where factors are specified in terms of their + 'num' and 'den' values as dictionaries of multipliers. - Returns: + Returns + ------- pd.DataFrame: A new data table with calibrated variables. """ # Make a copy of the input table to avoid modifying the original new_data_table = data_table.copy() + calibration_params, calibration_factors = load_calibration_file(calibration_file) + + + calibration_factor_table = compute_calibration_factors(new_data_table, + datetime_var_name, + calibration_params, + calibration_factors) + # Initialize a dictionary to rename variables variable_rename_dict = {} - # Loop through the column names in the data table - for variable_name in new_data_table.select_dtypes(include=["number"]).columns: + for variable in new_data_table.select_dtypes(include=["number"]).columns: - if variable_name in calibration_factors['variables'].keys(): # use standard calibration factor - - #print(variable_name) - # Extract numerator and denominator values - numerator = prod(calibration_factors['variables'][variable_name]['num']) - denominator = prod(calibration_factors['variables'][variable_name]['den']) + if variable in calibration_factors['variables'].keys(): # use standard calibration factor # Apply calibration to each variable - new_data_table[variable_name] = new_data_table[variable_name].mul((numerator / denominator)) + new_data_table[variable] = new_data_table[variable].mul(calibration_factor_table[variable]) # Add renaming entry - variable_rename_dict[variable_name] = f"{variable_name}_correct" + variable_rename_dict[variable] = f"{variable}_correct" else: # use specifies dependent calibration factor - print(f'There is no calibration factors for variable {variable_name}. The variable will remain the same.') + print(f'There is no calibration factors for variable {variable}. The variable will remain the same.') # Rename the columns in the new data table new_data_table.rename(columns=variable_rename_dict, inplace=True) - return new_data_table + return calibration_factor_table, new_data_table + + def record_data_lineage(path_to_output_file, metadata): @@ -109,7 +204,7 @@ if __name__ == '__main__': parser.add_argument('data_file', type=str, help="Path to the input HDF5 file containing the data table.") parser.add_argument('dataset_name', type=str, help ='Relative path to data_table (i.e., dataset name) in HDF5 file') parser.add_argument('calibration_file', type=str, help="Path to the input YAML file containing calibration factors.") - parser.add_argument('output_file', type=str, help="Path to save the output calibrated data as a CSV file.") + #parser.add_argument('output_file', type=str, help="Path to save the output calibrated data as a CSV file.") args = parser.parse_args() @@ -123,6 +218,9 @@ if __name__ == '__main__': dataManager.load_file_obj() dataset_name = '/'+args.dataset_name data_table = dataManager.extract_dataset_as_dataframe('/'+args.dataset_name) + datetime_var, datetime_format = dataManager.infer_datetime_variable(dataset_name) + + #data_table['t_start_Buf'] = data_table['t_start_Buf'].apply(lambda x : x.decode()) dataManager.extract_and_load_dataset_metadata() dataset_metadata_df = dataManager.dataset_metadata_df.copy() @@ -132,12 +230,14 @@ if __name__ == '__main__': data_table_metadata = dataset_metadata_df.loc[dataset_name_idx,:] parent_instrument = data_table_metadata.loc[dataset_name_idx,'parent_instrument'].values[0] parent_file = data_table_metadata.loc[dataset_name_idx,'parent_file'].values[0] + + print(parent_file) dataManager.unload_file_obj() print(args.calibration_file) - with open(args.calibration_file, 'r') as stream: - calibration_factors = yaml.load(stream, Loader=yaml.FullLoader) + if not any(item in args.calibration_file for item in ['.yaml','.yml']): + raise TypeError(f"Invalid file type. Calibration file {args.calibration_file} needs to be a valid yaml file.") except Exception as e: print(f"Error loading input files: {e}") exit(1) @@ -159,6 +259,10 @@ if __name__ == '__main__': path_to_output_file, ext = os.path.splitext('/'.join([path_to_output_dir,parent_instrument,parent_file])) path_to_calibrated_file = ''.join([path_to_output_file, '_calibrated.csv']) + + path_to_calibration_factors_file = ''.join([path_to_output_file, '_calibration_factors.csv']) + + #path_tail, path_head = os.path.split(path_to_calibrated_file) #path_to_metadata_file = '/'.join([path_tail, 'data_lineage_metadata.json']) @@ -166,11 +270,13 @@ if __name__ == '__main__': print('Path to output file :', path_to_calibrated_file) import dima.utils.g5505_utils as utils import json - calibrated_table = apply_calibration_factors(data_table, calibration_factors) + calibration_factor_table, calibrated_table = apply_calibration_factors(data_table, datetime_var, args.calibration_file) #calibration_factors) metadata['processing_date'] = utils.created_at() calibrated_table.to_csv(path_to_calibrated_file, index=False) + calibration_factor_table.to_csv(path_to_calibration_factors_file, index=False) status = record_data_lineage(path_to_calibrated_file, metadata) + status = record_data_lineage(path_to_calibration_factors_file, metadata) print(f"Calibrated data saved to {path_to_calibrated_file}")