diff --git a/pipelines/steps/apply_calibration_factors.py b/pipelines/steps/apply_calibration_factors.py
index eb7cd7e..b8e95dc 100644
--- a/pipelines/steps/apply_calibration_factors.py
+++ b/pipelines/steps/apply_calibration_factors.py
@@ -125,6 +125,8 @@ def load_calibration_file(calibration_factors_file):
 
     # Get path to file where calibrations params are defined
     path_to_calib_params_file = calibration_factors.get("calibration_params", {}).get('path_to_file')
+    if path_to_calib_params_file:
+        path_to_calib_params_file = os.path.normpath(os.path.join(projectPath, path_to_calib_params_file))
 
     # Validate
     if not path_to_calib_params_file:
@@ -199,79 +200,49 @@ def apply_calibration_factors(data_table, datetime_var_name, calibration_factors
 
     return calibration_factor_table, new_data_table
 
-
-
-
-if __name__ == '__main__':
-
-    # Set up argument parsing
-    parser = argparse.ArgumentParser(description="Calibrate species data using calibration factors.")
-    parser.add_argument('data_file', type=str, help="Path to the input HDF5 file containing the data table.")
-    #parser.add_argument('dataset_name', type=str, help ='Relative path to data_table (i.e., dataset name) in HDF5 file')
-    parser.add_argument('calibration_file', type=str, help="Path to the input YAML file containing calibration factors.")
-    #parser.add_argument('output_file', type=str, help="Path to save the output calibrated data as a CSV file.")
-
-    args = parser.parse_args()
-
-
-    if not any(item in args.calibration_file for item in ['.yaml','.yml']):
-        raise TypeError(f"Invalid file type. Calibration file {args.calibration_file} needs to be a valid yaml file.")
-
+def main(data_file, calibration_file):
+    """Main function for processing the data with calibration."""
     # Load input data and calibration factors
     try:
-        #data_table = pd.read_json(args.data_file)
-
-        print(f'Openning data file : {args.data_file} using src.hdf5_ops.HDF5DataManager().')
-
-        dataManager = dataOps.HDF5DataOpsManager(args.data_file)
+        print(f"Opening data file: {data_file} using src.hdf5_ops.HDF5DataOpsManager().")
+        dataManager = dataOps.HDF5DataOpsManager(data_file)
         dataManager.load_file_obj()
         dataManager.extract_and_load_dataset_metadata()
         dataset_metadata_df = dataManager.dataset_metadata_df.copy()
-
-        keywords = ['ACSM_TOFWARE/','ACSM_JFJ_','_timeseries.txt/data_table']
+        keywords = ['ACSM_TOFWARE/', 'ACSM_JFJ_', '_timeseries.txt/data_table']
         find_keyword = [all(keyword in item for keyword in keywords) for item in dataset_metadata_df['dataset_name']]
-        if sum(find_keyword)!=1:
+        if sum(find_keyword) != 1:
             input_file_name = ''.join(keywords)
             raise RuntimeError(f'Input file {input_file_name} was neither found nor uniquely identified.')
 
         dataset_name = dataset_metadata_df['dataset_name'][find_keyword].values[0]
-        parent_file = dataset_metadata_df.loc[find_keyword,'parent_file'].values[0]
-        parent_instrument = dataset_metadata_df.loc[find_keyword,'parent_instrument'].values[0]
+        parent_file = dataset_metadata_df.loc[find_keyword, 'parent_file'].values[0]
+        parent_instrument = dataset_metadata_df.loc[find_keyword, 'parent_instrument'].values[0]
 
-        #dataset_name = '/'+args.dataset_name
         data_table = dataManager.extract_dataset_as_dataframe(dataset_name)
         datetime_var, datetime_format = dataManager.infer_datetime_variable(dataset_name)
 
         print(dataset_metadata_df.head())
-        print(parent_file)
-        print(args.calibration_file)
+        print(parent_file)
+        print(calibration_file)
+
     except Exception as e:
         print(f"Error loading input files: {e}")
-        exit(1)
-    finally:
-        dataManager.unload_file_obj()
-        print(f'Closing data file : {args.data_file} to unlock the file.')
-
+        return
+    finally:
+        dataManager.unload_file_obj()
+        print(f'Closing data file: {data_file} to unlock the file.')
 
-    path_to_output_dir, ext = os.path.splitext(args.data_file)
-
-    print('Path to output directory :', path_to_output_dir)
-
-    # Count the number of NaT (null) values
+    # Count NaT values and calculate percentage
    num_nats = data_table[datetime_var].isna().sum()
-    # Get the total number of rows
    total_rows = len(data_table)
-    # Calculate the percentage of NaT values
    percentage_nats = (num_nats / total_rows) * 100
 
     print(f"Total rows: {total_rows}")
     print(f"NaT (missing) values: {num_nats}")
     print(f"Percentage of data loss: {percentage_nats:.4f}%")
 
-    # Perform calibration
     try:
-        # Define output directory of apply_calibration_factors() step
-
         suffix = 'processed'
         if len(parent_instrument.split('/')) >= 2:
             instFolder = parent_instrument.split('/')[0]
@@ -282,48 +252,60 @@ if __name__ == '__main__':
             instFolder = parent_instrument.split('/')[0]
             category = ''
 
-        path_to_output_folder, ext = os.path.splitext('/'.join([path_to_output_dir,f'{instFolder}_{suffix}',category]))
-        processingScriptRelPath = os.path.relpath(thisFilePath,start=projectPath)
-
+        path_to_output_dir, _ = os.path.splitext(data_file)
+        path_to_output_folder = os.path.join(path_to_output_dir, f"{instFolder}_{suffix}", category)
         if not os.path.exists(path_to_output_folder):
             os.makedirs(path_to_output_folder)
 
-        print(f'Processing script : {processingScriptRelPath}')
-        print(f'Output directory : {path_to_output_folder}')
-
+        print(f"Output directory: {path_to_output_folder}")
+
         # Apply calibration factors to input data_table and generate data lineage metadata
-        calibration_factor_table, calibrated_table = apply_calibration_factors(data_table, datetime_var, args.calibration_file) #calibration_factors)
-        calibrated_table_err = generate_error_dataframe(calibrated_table, datetime_var)
+        calibration_factor_table, calibrated_table = apply_calibration_factors(data_table, datetime_var, calibration_file)
+        calibrated_table_err = generate_error_dataframe(calibrated_table, datetime_var)
 
-        # Define suffix to dataframe map to streamline dataframe export
-        suffix_to_dataframe_dict = {'calibrated.csv': calibrated_table,
-                                    'calibrated_err.csv': calibrated_table_err,
-                                    'calibration_factors.csv': calibration_factor_table}
+        suffix_to_dataframe_dict = {
+            'calibrated.csv': calibrated_table,
+            'calibrated_err.csv': calibrated_table_err,
+            'calibration_factors.csv': calibration_factor_table
+        }
 
-        metadata = {'actris_level': 1,
-                    'processing_script': processingScriptRelPath.replace(os.sep, '/'),
-                    'processing_date': utils.created_at(),
-                    'datetime_var': datetime_var}
+        metadata = {
+            'actris_level': 1,
+            'processing_script': os.path.relpath(__file__, start=os.getcwd()).replace(os.sep, '/'),
+            'processing_date': utils.created_at(),
+            'datetime_var': datetime_var
+        }
 
-        # Save output tables to csv file and save/or update data lineage
-        filename, ext = os.path.splitext(parent_file)
-        if not ext:
-            filename += '.csv' # Handle edge case for missing file extension
+        # Save output tables to CSV and record data lineage
+        filename, ext = os.path.splitext(parent_file)
+        if not ext:
+            filename += '.csv'  # handle edge case of a missing file extension
 
         for suffix, data_table in suffix_to_dataframe_dict.items():
-            # Form path to output file
             path_to_output_file = os.path.join(path_to_output_folder, f'{filename}_{suffix}')
-
             try:
                 data_table.to_csv(path_to_output_file, index=False)
                 print(f"Saved {filename}_{suffix} to {path_to_output_folder}")
             except Exception as e:
                 print(f"Failed to save {path_to_output_file} due to: {e}")
                 continue
-
+            # Record data lineage
             metadata['suffix'] = suffix
-            status = stepUtils.record_data_lineage(path_to_output_file, projectPath, metadata)
-
+            stepUtils.record_data_lineage(path_to_output_file, os.getcwd(), metadata)
     except Exception as e:
         print(f"Error during calibration: {e}")
-        exit(1)
\ No newline at end of file
+        exit(1)
+
+if __name__ == '__main__':
+    # Set up argument parsing
+    parser = argparse.ArgumentParser(description="Calibrate species data using calibration factors.")
+    parser.add_argument('data_file', type=str, help="Path to the input HDF5 file containing the data table.")
+    #parser.add_argument('dataset_name', type=str, help ='Relative path to data_table (i.e., dataset name) in HDF5 file')
+    parser.add_argument('calibration_file', type=str, help="Path to the input YAML file containing calibration factors.")
+    #parser.add_argument('output_file', type=str, help="Path to save the output calibrated data as a CSV file.")
+    args = parser.parse_args()
+
+    if os.path.splitext(args.calibration_file)[1].lower() not in ('.yaml', '.yml'):
+        raise TypeError(f"Invalid file type. Calibration file {args.calibration_file} needs to be a valid YAML file.")
+
+    main(args.data_file, args.calibration_file)
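
Note on the load_calibration_file hunk: the added lines resolve the YAML's relative 'path_to_file' against the project root instead of the current working directory. A minimal sketch of the resolution rule, assuming a hypothetical projectPath and a hypothetical 'path_to_file' value:

    import os

    projectPath = '/home/user/acsm-pipeline'                  # hypothetical project root
    rel_path = 'pipelines/params/calibration_params.yaml'     # hypothetical 'path_to_file' value

    # os.path.join leaves an absolute 'path_to_file' untouched, so absolute
    # paths in the YAML still win; normpath collapses '.' and '..' segments.
    resolved = os.path.normpath(os.path.join(projectPath, rel_path))
    print(resolved)  # /home/user/acsm-pipeline/pipelines/params/calibration_params.yaml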
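
Note on dataset selection in main(): a dataset qualifies only if its name contains all three keywords, and the step aborts unless exactly one dataset qualifies. A standalone sketch of that selection rule, with hypothetical dataset names:

    # Hypothetical names as they might appear in dataset_metadata_df['dataset_name']
    names = [
        'ACSM_TOFWARE/ACSM_JFJ_2024_timeseries.txt/data_table',
        'ACSM_TOFWARE/ACSM_JFJ_2024_peaks.txt/data_table',
    ]
    keywords = ['ACSM_TOFWARE/', 'ACSM_JFJ_', '_timeseries.txt/data_table']

    # A name matches only if it contains every keyword; exactly one match is required.
    matches = [all(k in name for k in keywords) for name in names]
    assert sum(matches) == 1
    dataset_name = [n for n, m in zip(names, matches) if m][0]
    print(dataset_name)  # ACSM_TOFWARE/ACSM_JFJ_2024_timeseries.txt/data_table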
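
Note on the new entry point: with the script body factored into main(data_file, calibration_file), the step can be driven from an orchestrator or a test without spawning a subprocess. A usage sketch, assuming the repository root is on sys.path so the module is importable as a package; both input paths are hypothetical:

    from pipelines.steps.apply_calibration_factors import main

    # Outputs land next to the data file, under
    # <data_file stem>/<instrument>_processed/<category>/.
    main('data/collections/jfj_2024.h5',
         'pipelines/params/calibration_params.yaml')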