"""Apply calibration factors to a data table stored in an HDF5 file.

When run as a script, the module extracts a dataset from an HDF5 file,
multiplies the variables listed in a calibration YAML file by their
calibration factors, writes the calibrated table and the factor table to CSV
files, and records data lineage metadata for both outputs.
"""
import os
import sys

try:
    thisFilePath = os.path.abspath(__file__)
    print('File path:',thisFilePath)
except NameError:
    print("[Notice] The __file__ attribute is unavailable in this environment (e.g., Jupyter or IDLE).")
    print("When using a terminal, make sure the working directory is set to the script's location to prevent path issues (for the DIMA submodule)")
    thisFilePath = os.getcwd()  # Use current directory or specify a default

# Move up to project root
projectPath = os.path.normpath(os.path.join(thisFilePath, "..", "..", '..'))
dimaPath = os.path.normpath('/'.join([projectPath, 'dima']))

# Make the project root and the DIMA submodule importable. This must happen
# before the project-local imports below.
sys.path.insert(0, projectPath)
sys.path.insert(0, dimaPath)

import numpy as np
import pandas as pd
from math import prod  # To replace multiplyall
import argparse
import yaml

import dima.src.hdf5_ops as dataOps
import dima.utils.g5505_utils as utils
import pipelines.steps.utils as stepUtils


def compute_calibration_factors(data_table, datetime_var_name, calibration_params, calibration_factors):
    """
    Computes calibration factor values for variables (excluding the time variable) in data_table.

    For every variable listed under ``calibration_factors['variables']`` and
    every interval under ``calibration_params['calibration_intervals']``, the
    factor is the product of the interval parameters named in the variable's
    'num' list divided by the product of those named in its 'den' list.
    Samples not covered by any interval keep a factor of NaN.

    Parameters
    ----------
    data_table : pd.DataFrame
        Data table containing time-series data.
    datetime_var_name : str
        Name of the datetime column in data_table. The column must support
        subtraction of a ``pd.Timestamp``.
    calibration_params : dict
        Dictionary containing calibration interval details. Each interval
        provides 'start_datetime' and 'end_datetime' in the format
        "%Y-%m-%d %H-%M-%S" plus the parameters referenced by the factors.
    calibration_factors : dict
        Dictionary specifying numerator and denominator variables for calibration.

    Returns
    -------
    pd.DataFrame
        DataFrame containing computed calibration factors for each variable,
        one row per row of data_table.

    Raises
    ------
    ZeroDivisionError
        If the denominator of a factor evaluates to zero.
    ValueError
        If the sample nearest to an interval's start lies after the sample
        nearest to its end.
    """
    timestamps = data_table[datetime_var_name]
    datetime_format = "%Y-%m-%d %H-%M-%S"

    calibration_factors_dict = {}
    for variable_name, factor_spec in calibration_factors['variables'].items():
        print(variable_name)

        # Start from NaN so samples outside every interval are recognizable.
        tmp = np.full(shape=timestamps.shape, fill_value=np.nan)

        for interval_params in calibration_params['calibration_intervals'].values():
            t1 = pd.to_datetime(interval_params['start_datetime'], format=datetime_format)
            t2 = pd.to_datetime(interval_params['end_datetime'], format=datetime_format)

            # Positions of the samples closest to the interval boundaries.
            t1_idx = abs(timestamps - t1).argmin()
            t2_idx = abs(timestamps - t2).argmin()

            if t1_idx > t2_idx:
                raise ValueError(f"Invalid calibration interval: start_datetime {t1} must be before end_datetime {t2}")

            numerator = prod(interval_params[key] for key in factor_spec['num'])
            denominator = prod(interval_params[key] for key in factor_spec['den'])

            if denominator == 0:
                raise ZeroDivisionError(f"Denominator is zero for '{variable_name}' in interval {t1} - {t2}")

            # NOTE(review): the slice is half-open, so the sample nearest to
            # end_datetime is not assigned by this interval. Confirm this is
            # the intended boundary handling.
            tmp[t1_idx:t2_idx] = numerator / denominator

        calibration_factors_dict[variable_name] = tmp

    return pd.DataFrame(data=calibration_factors_dict)


def load_calibration_file(calibration_file):
    """
    Loads a calibration YAML file and the calibration parameters file it points to.

    Parameters
    ----------
    calibration_file : str
        Path to the YAML file defining the calibration factors. It must
        contain a "calibration_params" mapping with a "path_to_file" entry.

    Returns
    -------
    tuple
        ``(calibration_params, calibration_factors)``: the parsed content of
        the parameters file and of calibration_file, in that order.

    Raises
    ------
    ValueError
        If calibration_file does not define "calibration_params" with a
        non-empty "path_to_file" (this includes an empty YAML file).
    FileNotFoundError
        If the referenced calibration parameters file does not exist.
    """
    # TODO : create a separate validation function
    # NOTE(review): yaml.load with FullLoader should only be used on trusted
    # files; consider yaml.safe_load if calibration files can come from
    # external sources.
    with open(calibration_file, 'r') as stream:
        calibration_factors = yaml.load(stream, Loader=yaml.FullLoader)

    # Get path to file where calibrations params are defined. An empty file
    # parses to None and a null section parses to None as well, so guard both
    # levels instead of failing with an AttributeError.
    params_section = calibration_factors.get("calibration_params") if isinstance(calibration_factors, dict) else None
    path_to_calib_params_file = params_section.get('path_to_file') if isinstance(params_section, dict) else None

    # Validate
    if not path_to_calib_params_file:
        raise ValueError(f'Invalid yaml file. {calibration_file} must contain "calibration_params" with a valid "path_to_file".')

    if not os.path.exists(path_to_calib_params_file):
        raise FileNotFoundError(f'Calibration parameters file not found: {path_to_calib_params_file}')

    with open(path_to_calib_params_file, 'r') as stream:
        calibration_params = yaml.load(stream, Loader=yaml.FullLoader)

    # TODO: perform a validation step before computing factors

    return calibration_params, calibration_factors


def apply_calibration_factors(data_table, datetime_var_name, calibration_file):
    """
    Calibrates the species data in the given data table using a calibration factor.

    Parameters
    ----------
    data_table : pd.DataFrame
        The input data table with variables to calibrate. It is not modified.
    datetime_var_name : str
        Name of the datetime column in data_table.
    calibration_file : str
        Calibration YAML file with a dictionary containing calibration factors
        for each variable in the data_table, where factors are specified in
        terms of their 'num' and 'den' values as lists of multipliers.

    Returns
    -------
    tuple of pd.DataFrame
        ``(calibration_factor_table, new_data_table)``: the table of computed
        calibration factors and a copy of data_table in which every calibrated
        numeric column is multiplied by its factor and renamed to
        ``<variable>_correct``.
    """
    # Make a copy of the input table to avoid modifying the original
    new_data_table = data_table.copy()

    calibration_params, calibration_factors = load_calibration_file(calibration_file)

    calibration_factor_table = compute_calibration_factors(new_data_table,
                                                           datetime_var_name,
                                                           calibration_params,
                                                           calibration_factors)

    calibrated_variables = calibration_factors['variables']

    # Maps calibrated column names to their new names
    variable_rename_dict = {}

    # Only numeric columns can be calibrated
    for variable in new_data_table.select_dtypes(include=["number"]).columns:
        if variable in calibrated_variables:
            # The factor table is built row-by-row from data_table but has its
            # own default index. Multiply by position so that a data_table
            # with a non-default index is not misaligned.
            factors = calibration_factor_table[variable].to_numpy()
            new_data_table[variable] = new_data_table[variable].mul(factors)
            variable_rename_dict[variable] = f"{variable}_correct"
        else:
            print(f'There is no calibration factors for variable {variable}. The variable will remain the same.')

    # Rename the columns in the new data table
    new_data_table.rename(columns=variable_rename_dict, inplace=True)

    return calibration_factor_table, new_data_table


if __name__ == '__main__':

    # Set up argument parsing
    parser = argparse.ArgumentParser(description="Calibrate species data using calibration factors.")
    parser.add_argument('data_file', type=str, help="Path to the input HDF5 file containing the data table.")
    parser.add_argument('dataset_name', type=str, help ='Relative path to data_table (i.e., dataset name) in HDF5 file')
    parser.add_argument('calibration_file', type=str, help="Path to the input YAML file containing calibration factors.")

    args = parser.parse_args()

    # Load input data and calibration factors
    try:
        print(args.data_file)

        dataManager = dataOps.HDF5DataOpsManager(args.data_file)
        dataManager.load_file_obj()
        try:
            dataset_name = '/'+args.dataset_name
            data_table = dataManager.extract_dataset_as_dataframe(dataset_name)
            datetime_var, datetime_format = dataManager.infer_datetime_variable(dataset_name)

            dataManager.extract_and_load_dataset_metadata()
            dataset_metadata_df = dataManager.dataset_metadata_df.copy()
            print(dataset_metadata_df.head())

            dataset_name_idx = dataset_metadata_df.index[(dataset_metadata_df['dataset_name']==args.dataset_name).to_numpy()]
            if len(dataset_name_idx) == 0:
                raise ValueError(f"Dataset '{args.dataset_name}' not found in the metadata of {args.data_file}.")

            data_table_metadata = dataset_metadata_df.loc[dataset_name_idx,:]
            parent_instrument = data_table_metadata.loc[dataset_name_idx,'parent_instrument'].values[0]
            parent_file = data_table_metadata.loc[dataset_name_idx,'parent_file'].values[0]
            print(parent_file)
        finally:
            # Release the HDF5 file object even if extraction fails.
            dataManager.unload_file_obj()

        print(args.calibration_file)

        # Check the actual file extension rather than a substring of the path.
        if os.path.splitext(args.calibration_file)[1].lower() not in ('.yaml', '.yml'):
            raise TypeError(f"Invalid file type. Calibration file {args.calibration_file} needs to be a valid yaml file.")
    except Exception as e:
        print(f"Error loading input files: {e}")
        sys.exit(1)

    path_to_output_dir = os.path.splitext(args.data_file)[0]
    print('Path to output directory :', path_to_output_dir)

    # Perform calibration
    try:
        # Define output directory of apply_calibration_factors() step
        suffix = 'processed'
        instrument_parts = parent_instrument.split('/')
        instFolder = instrument_parts[0]
        category = instrument_parts[1] if len(instrument_parts) >= 2 else ''

        path_to_output_folder, ext = os.path.splitext('/'.join([path_to_output_dir,f'{instFolder}_{suffix}',category]))
        processingScriptRelPath = os.path.relpath(thisFilePath,start=projectPath)

        os.makedirs(path_to_output_folder, exist_ok=True)

        print(f'Processing script : {processingScriptRelPath}')
        print(f'Output directory : {path_to_output_folder}')

        # Apply calibration factors to input data_table and generate data lineage metadata
        calibration_factor_table, calibrated_table = apply_calibration_factors(data_table, datetime_var, args.calibration_file)

        metadata = {'actris_level' : 1,
                    'processing_script': processingScriptRelPath.replace(os.sep,'/'),
                    'processing_date' : utils.created_at()}

        # Save output tables to csv file and save/or update data lineage record
        filename, ext = os.path.splitext(parent_file)
        path_to_calibrated_file = '/'.join([path_to_output_folder, f'{filename}_calibrated.csv'])
        path_to_calibration_factors_file = '/'.join([path_to_output_folder, f'{filename}_calibration_factors.csv'])

        calibrated_table.to_csv(path_to_calibrated_file, index=False)
        calibration_factor_table.to_csv(path_to_calibration_factors_file, index=False)

        status = stepUtils.record_data_lineage(path_to_calibrated_file, projectPath, metadata)
        status = stepUtils.record_data_lineage(path_to_calibration_factors_file, projectPath, metadata)

        print('Calibration factors saved to', path_to_calibration_factors_file)
        print(f"Calibrated data saved to {path_to_calibrated_file}")
        print(f"Data lineage saved to {path_to_output_dir}")
    except Exception as e:
        print(f"Error during calibration: {e}")
        sys.exit(1)