Implement utils module for reusable functions across steps in the pipelines

This commit is contained in:
2025-02-06 16:54:36 +01:00
parent 6ed9fe1eb1
commit ad95d11a0d

33
pipelines/steps/utils.py Normal file
View File

@ -0,0 +1,33 @@
import os
import json
def record_data_lineage(path_to_output_file, projectPath, metadata) -> int:
    """Record lineage metadata for an output file in a JSON sidecar file.

    The sidecar is named ``data_lineage_metadata.json`` and lives in the same
    directory as ``path_to_output_file``. It holds a single JSON object that
    maps each output file's path (relative to ``projectPath``, always written
    with forward slashes) to its metadata. An existing entry for the same
    output file is overwritten; entries for other files are preserved.

    Args:
        path_to_output_file: Path to the output file being described. Only
            the path is used; the file itself is never opened.
        projectPath: Directory against which the stored key is made relative.
        metadata: JSON-serializable value stored under the output file's key.

    Returns:
        Always ``0``.

    Raises:
        OSError: If the sidecar file cannot be read or written (for example,
            the output directory does not exist).
        TypeError: If ``metadata`` is not JSON-serializable.
    """
    path_to_output_dir, _ = os.path.split(path_to_output_file)
    # os.path.join keeps the sidecar next to the output file even when the
    # directory part is empty (bare file name); joining with '/' would turn
    # that case into an absolute path at the filesystem root.
    path_to_metadata_file = os.path.join(
        path_to_output_dir, 'data_lineage_metadata.json'
    )

    # Load whatever is already recorded. A missing or unparsable sidecar is
    # treated as "nothing recorded yet" rather than as an error.
    try:
        with open(path_to_metadata_file, 'r', encoding='utf-8') as metadata_file:
            json_dict = json.load(metadata_file)
    except (FileNotFoundError, json.JSONDecodeError):
        json_dict = {}

    # Valid JSON that is not an object (e.g. a list or a number) cannot hold
    # per-file entries, so it is discarded just like invalid JSON.
    if not isinstance(json_dict, dict):
        json_dict = {}

    # Store keys relative to the project root with '/' separators so that the
    # sidecar reads the same on every platform.
    relpath_to_output_file = os.path.relpath(
        path_to_output_file, start=projectPath
    ).replace(os.sep, '/')
    json_dict[relpath_to_output_file] = metadata

    # Write the updated mapping back to the sidecar.
    with open(path_to_metadata_file, 'w', encoding='utf-8') as metadata_file:
        json.dump(json_dict, metadata_file, indent=4)

    print(f"Metadata for calibrated data saved to {path_to_metadata_file}")
    return 0