diff --git a/pipelines/steps/__init__.py b/pipelines/steps/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/pipelines/steps/apply_calibration_factors.py b/pipelines/steps/apply_calibration_factors.py index bd2d319..3d5bb7f 100644 --- a/pipelines/steps/apply_calibration_factors.py +++ b/pipelines/steps/apply_calibration_factors.py @@ -25,6 +25,7 @@ import yaml sys.path.append(dimaPath) import dima.src.hdf5_ops as dataOps +import pipelines.steps.utils as stepUtils import numpy as np import pandas as pd @@ -167,35 +168,7 @@ def apply_calibration_factors(data_table, datetime_var_name, calibration_file): -def record_data_lineage(path_to_output_file, metadata): - path_to_output_dir, output_file = os.path.split(path_to_output_file) - - path_to_metadata_file = '/'.join([path_to_output_dir,'data_lineage_metadata.json']) - # Ensure the file exists - if not os.path.exists(path_to_metadata_file): - with open(path_to_metadata_file, 'w') as f: - json.dump({}, f) # Initialize empty JSON - - # Read the existing JSON - with open(path_to_metadata_file, 'r') as metadata_file: - try: - json_dict = json.load(metadata_file) - except json.JSONDecodeError: - json_dict = {} # Start fresh if file is invalid - - # Compute relative output file path and update the JSON object - relpath_to_output_file = os.path.relpath(path_to_output_file, start=projectPath).replace(os.sep, '/') - json_dict[relpath_to_output_file] = metadata - - # Write updated JSON back to the file - with open(path_to_metadata_file, 'w') as metadata_file: - json.dump(json_dict, metadata_file, indent=4) - - - print(f"Metadata for calibrated data saved to {path_to_metadata_file}") - - return 0 if __name__ == '__main__': @@ -275,8 +248,8 @@ if __name__ == '__main__': calibrated_table.to_csv(path_to_calibrated_file, index=False) calibration_factor_table.to_csv(path_to_calibration_factors_file, index=False) - status = record_data_lineage(path_to_calibrated_file, metadata) - status = record_data_lineage(path_to_calibration_factors_file, metadata) + status = stepUtils.record_data_lineage(path_to_calibrated_file, projectPath, metadata) + status = stepUtils.record_data_lineage(path_to_calibration_factors_file, projectPath, metadata) print(f"Calibrated data saved to {path_to_calibrated_file}") diff --git a/pipelines/steps/apply_diagnostic_flags.py b/pipelines/steps/apply_diagnostic_flags.py index 782993a..99750dd 100644 --- a/pipelines/steps/apply_diagnostic_flags.py +++ b/pipelines/steps/apply_diagnostic_flags.py @@ -20,7 +20,9 @@ projectPath = os.path.normpath(os.path.join(dimaPath,'..')) print(dimaPath) sys.path.append(dimaPath) import dima.src.hdf5_ops as dataOps - +import pipelines.steps.utils as stepUtils +import dima.utils.g5505_utils as utils +import json def compute_diagnostic_variable_flags(data_table, validity_thresholds_dict): """ @@ -135,32 +137,15 @@ if __name__ == '__main__': path_to_metadata_file = '/'.join([path_tail, 'data_lineage_metadata.json']) print('Path to output file :', path_to_calibrated_file) - import dima.utils.g5505_utils as utils - import json + print(calibration_factors.keys()) calibrated_table = compute_diagnostic_variable_flags(data_table, calibration_factors) metadata['processing_date'] = utils.created_at() calibrated_table.to_csv(path_to_calibrated_file, index=False) - # Ensure the file exists - if not os.path.exists(path_to_metadata_file): - with open(path_to_metadata_file, 'w') as f: - json.dump({}, f) # Initialize empty JSON + status = stepUtils.record_data_lineage(path_to_calibrated_file, projectPath, metadata) - # Read the existing JSON - with open(path_to_metadata_file, 'r') as metadata_file: - try: - json_dict = json.load(metadata_file) - except json.JSONDecodeError: - json_dict = {} # Start fresh if file is invalid - # Update the JSON object - outputfileRelPath = os.path.relpath(path_to_calibrated_file, start=projectPath).replace(os.sep, '/') - json_dict[outputfileRelPath] = metadata - - # Write updated JSON back to the file - with open(path_to_metadata_file, 'w') as metadata_file: - json.dump(json_dict, metadata_file, indent=4) print(f"Calibrated data saved to {path_to_calibrated_file}") print(f"Metadata for calibrated data saved to {path_to_metadata_file}")