diff --git a/pipelines/steps/apply_calibration_factors.py b/pipelines/steps/apply_calibration_factors.py index 820d0da..0d3e756 100644 --- a/pipelines/steps/apply_calibration_factors.py +++ b/pipelines/steps/apply_calibration_factors.py @@ -36,7 +36,7 @@ import yaml import dima.src.hdf5_ops as dataOps import dima.utils.g5505_utils as utils import pipelines.steps.utils as stepUtils - +from pipelines.steps.utils import generate_error_dataframe def compute_calibration_factors(data_table, datetime_var_name, calibration_params, calibration_factors): """ @@ -300,27 +300,39 @@ if __name__ == '__main__': print(f'Output directory : {path_to_output_folder}') # Apply calibration factors to input data_table and generate data lineage metadata - calibration_factor_table, calibrated_table = apply_calibration_factors(data_table, datetime_var, args.calibration_file) #calibration_factors) - - metadata = {'actris_level' : 1, - 'processing_script': processingScriptRelPath.replace(os.sep,'/'), - 'processing_date' : utils.created_at(), + calibration_factor_table, calibrated_table = apply_calibration_factors(data_table, datetime_var, args.calibration_file) #calibration_factors) + calibrated_table_err = generate_error_dataframe(calibrated_table, datetime_var) + + # Define suffix to dataframe map to streamline dataframe export + suffix_to_dataframe_dict = {'calibrated.csv': calibrated_table, + 'calibrated_err.csv': calibrated_table_err, + 'calibration_factors.csv': calibration_factor_table} + + metadata = {'actris_level': 1, + 'processing_script': processingScriptRelPath.replace(os.sep, '/'), + 'processing_date': utils.created_at(), 'datetime_var': datetime_var} - + # Save output tables to csv file and save/or update data lineage record filename, ext = os.path.splitext(parent_file) - path_to_calibrated_file = '/'.join([path_to_output_folder, f'{filename}_calibrated.csv']) - path_to_calibration_factors_file = '/'.join([path_to_output_folder, f'{filename}_calibration_factors.csv']) - - calibrated_table.to_csv(path_to_calibrated_file, index=False) - calibration_factor_table.to_csv(path_to_calibration_factors_file, index=False) - - status = stepUtils.record_data_lineage(path_to_calibrated_file, projectPath, metadata) - status = stepUtils.record_data_lineage(path_to_calibration_factors_file, projectPath, metadata) + if not ext: + filename += '.csv' # Handle edge case for missing file extension + + for suffix, data_table in suffix_to_dataframe_dict.items(): + # Form path to output file + path_to_output_file = os.path.join(path_to_output_folder, f'{filename}_{suffix}') + + try: + data_table.to_csv(path_to_output_file, index=False) + print(f"Saved {filename}_{suffix} to {path_to_output_folder}") + except Exception as e: + print(f"Failed to save {path_to_output_file} due to: {e}") + continue + + # Record data lineage + metadata['suffix'] = suffix + status = stepUtils.record_data_lineage(path_to_output_file, projectPath, metadata) - print('Calibration factors saved to', path_to_calibration_factors_file) - print(f"Calibrated data saved to {path_to_calibrated_file}") - print(f"Data lineage saved to {path_to_output_dir}") except Exception as e: print(f"Error during calibration: {e}")