Implement `record_data_lineage` (in src/meta_ops.py) as a parameterized decorator. This simplifies provenance tracking for newly added file readers.

This commit is contained in:
2025-09-19 19:01:08 +02:00
parent 8daa57c396
commit e96ecfa951

84
src/meta_ops.py Normal file
View File

@@ -0,0 +1,84 @@
import sys
import os
# Resolve where this module lives so the DIMA package root can be made importable.
try:
    thisFilePath = os.path.abspath(__file__)
except NameError:
    # __file__ is undefined (e.g. interactive session): fall back to the
    # current working directory; the root is then resolved relative to it.
    print("Error: __file__ is not available. Ensure the script is being run from a file.")
    print("[Notice] Path to DIMA package may not be resolved properly.")
    thisFilePath = os.getcwd()

# Climb two path components from thisFilePath to reach the project root.
_twoLevelsUp = os.path.join(thisFilePath, os.pardir, os.pardir)
dimaPath = os.path.normpath(_twoLevelsUp)

# Register the root once only, so repeated imports do not duplicate the entry.
if dimaPath not in sys.path:
    sys.path.append(dimaPath)
import h5py
import pandas as pd
import numpy as np
import logging
import datetime
import yaml
import json
import copy
import utils.g5505_utils as utils
#import src.hdf5_writer as hdf5_lib
import inspect
from functools import wraps
def _write_lineage_attrs(file_path, group_name, lineage_metadata):
    """Write `lineage_metadata` as attributes of a group in an existing '.h5' file.

    Parameters
    ----------
    file_path : str, bytes or os.PathLike
        Location of the HDF5 file to update.
    group_name : str
        Name of the group whose attributes receive the metadata.
    lineage_metadata : dict
        Key/value pairs stored one by one as group attributes.

    Returns
    -------
    bool
        True when the attributes were written; False when nothing was written
        because the path could not be interpreted, the file does not exist,
        the file name does not end in '.h5', or the group is not in the file.
    """
    try:
        # Normalize str / bytes / os.PathLike to str; a plain `.endswith('.h5')`
        # would fail on e.g. pathlib.Path or bytes paths.
        file_path = os.fsdecode(file_path)
    except TypeError:
        return False

    if not (file_path.endswith('.h5') and os.path.exists(file_path)):
        return False

    with h5py.File(file_path, mode='r+', track_order=True) as fobj:
        if group_name not in fobj:
            return False
        group_attrs = fobj[group_name].attrs
        for key, value in lineage_metadata.items():
            group_attrs[key] = value
    return True


def record_data_lineage(data_level: int = 0):
    """Parameterized decorator that records data lineage information.

    After the decorated function returns, a metadata dict with the keys
    'data_level', 'processing_script' and 'processing_date' is attached to
    its output:

    * If the function returns a dict, the metadata is merged into
      ``result['attributes_dict']`` (created when missing or None).
    * Otherwise, if the call received truthy ``dest_file_obj_or_path`` and
      ``dest_group_name`` arguments, the metadata is written as attributes of
      that group in the existing '.h5' file. When that is not possible
      (file missing, not '.h5', group absent) a warning is logged and the
      result is returned unchanged.

    Parameters
    ----------
    data_level : int, optional
        User-defined integer stored under 'data_level'. Defaults to 0.

    Returns
    -------
    callable
        A decorator that wraps a function and returns its result unmodified
        apart from the dict injection described above.
    """
    def decorator(function: callable):
        # Path of the defining script, expressed relative to the project root.
        function_abs_path = inspect.getfile(function)
        try:
            functionFileRelativePath = os.path.relpath(function_abs_path, dimaPath)
        except ValueError:
            # On Windows, relpath raises when both paths are on different
            # drives; keep the unmodified path rather than failing at import.
            functionFileRelativePath = function_abs_path

        func_signature = inspect.signature(function)

        @wraps(function)
        def wrapper_func(*args, **kwargs):
            # Bind the call to the signature so both destination arguments are
            # found whether they were passed positionally, by keyword or by default.
            bound_args = func_signature.bind(*args, **kwargs)
            bound_args.apply_defaults()
            dest_file_path = bound_args.arguments.get('dest_file_obj_or_path')
            dest_group_name = bound_args.arguments.get('dest_group_name')

            # Capture the file name before the call, while a passed-in
            # h5py.File object can still be queried.
            if isinstance(dest_file_path, h5py.File):
                dest_file_path = dest_file_path.filename

            result = function(*args, **kwargs)

            data_lineage_metadata = {
                'data_level': data_level,
                'processing_script': functionFileRelativePath,
                'processing_date': utils.created_at(),
            }

            # Case 1: dict result -> inject metadata into its attributes dict.
            if isinstance(result, dict):
                attributes = result.get('attributes_dict')
                if attributes is None:
                    attributes = result['attributes_dict'] = {}
                attributes.update(data_lineage_metadata)
            # Case 2: destination file + group given -> write group attributes.
            elif dest_file_path and dest_group_name:
                written = _write_lineage_attrs(
                    dest_file_path, dest_group_name, data_lineage_metadata
                )
                if not written:
                    logging.getLogger(__name__).warning(
                        "Data lineage not recorded for %s: group %r not found "
                        "in an existing .h5 file at %r.",
                        function.__name__, dest_group_name, dest_file_path,
                    )

            return result

        return wrapper_func

    return decorator