"""Decorator utilities for stamping data-lineage metadata onto processing results."""
import sys
import os

# Resolve the project root so that project-local packages (e.g. `utils`) can
# be imported when this module is run or imported from inside the repository.
try:
    thisFilePath = os.path.abspath(__file__)
except NameError:
    # `__file__` is undefined in some interactive contexts; fall back to the
    # current working directory and warn that the path may be wrong.
    print("Error: __file__ is not available. Ensure the script is being run from a file.")
    print("[Notice] Path to DIMA package may not be resolved properly.")
    thisFilePath = os.getcwd()  # Use current directory or specify a default

# <root>/src/meta_ops.py/../..  ->  <root>
dimaPath = os.path.normpath(os.path.join(thisFilePath, "..", '..'))  # Move up to project root

if dimaPath not in sys.path:  # Avoid duplicate entries
    sys.path.append(dimaPath)


import h5py
import pandas as pd
import numpy as np
import logging
import datetime
import yaml
import json
import copy

import utils.g5505_utils as utils
#import src.hdf5_writer as hdf5_lib
import inspect
from functools import wraps


def _write_lineage_attrs(open_file, file_path, group_name, metadata):
    """Write `metadata` as attributes of `group_name` in an HDF5 file.

    Parameters
    ----------
    open_file : h5py.File or None
        Handle the caller passed to the decorated function, if any. It is
        reused only when it is still open and writable.
    file_path : str
        Path of the HDF5 file on disk. Nothing is written unless the path
        exists and ends with '.h5'.
    group_name : str
        Name of the object inside the file that receives the attributes.
        Nothing is written when it is not present in the file.
    metadata : dict
        Attribute names mapped to the values to store.
    """
    if not (os.path.exists(file_path) and file_path.endswith('.h5')):
        return

    def _assign(h5_file):
        if group_name in h5_file:
            group_attrs = h5_file[group_name].attrs
            for key, value in metadata.items():
                group_attrs[key] = value

    # A valid h5py.File is truthy and reports mode 'r' or 'r+'. When the
    # caller's handle is still open for writing, write through it rather
    # than opening a second handle on the same file.
    if open_file is not None and open_file and open_file.mode == 'r+':
        _assign(open_file)
        return

    with h5py.File(file_path, mode='r+', track_order=True) as fobj:
        _assign(fobj)


def record_data_lineage(data_level: int = 0):
    """Parameterized decorator to record data lineage information.

    The lineage record has three entries: 'data_level' (the value given
    here), 'processing_script' (path of the file defining the decorated
    function, relative to the project root `dimaPath`) and
    'processing_date' (the value returned by `utils.created_at()`).

    After the decorated function returns, the record is stored in one of
    two places:

    * If the result is a dict, the record is merged into
      `result['attributes_dict']`, which is created when missing.
    * Otherwise, if the call supplied truthy `dest_file_obj_or_path` and
      `dest_group_name` arguments, the record is written as attributes of
      that group in the HDF5 file, provided the file exists, its name ends
      with '.h5', and the group is present.

    Parameters
    ----------
    data_level : int
        User-defined integer stored under 'data_level'.

    Returns
    -------
    callable
        A decorator. The wrapped function returns the original result
        (mutated in place when it is a dict).
    """

    def decorator(function: callable):
        # Get relative path to the script where the function is defined.
        # Computed once, at decoration time.
        function_abs_path = inspect.getfile(function)
        functionFileRelativePath = os.path.relpath(function_abs_path, dimaPath)
        func_signature = inspect.signature(function)

        @wraps(function)
        def wrapper_func(*args, **kwargs):
            # Bind args/kwargs to the signature so the destination arguments
            # are found whether passed positionally, by keyword or by default.
            bound_args = func_signature.bind(*args, **kwargs)
            bound_args.apply_defaults()

            dest = bound_args.arguments.get('dest_file_obj_or_path')
            dest_group_name = bound_args.arguments.get('dest_group_name')

            # Resolve the destination to a plain string path *before* calling
            # the function, since the function may close a file handle.
            open_file = None
            dest_file_path = None
            if isinstance(dest, h5py.File):
                open_file = dest
                dest_file_path = dest.filename
            elif isinstance(dest, (str, bytes, os.PathLike)):
                # Accept pathlib.Path / bytes paths as well as str.
                dest_file_path = os.fsdecode(dest)

            # Call the original function
            result = function(*args, **kwargs)

            # Prepare lineage metadata
            data_lineage_metadata = {
                'data_level': data_level,
                'processing_script': functionFileRelativePath,
                'processing_date': utils.created_at(),
            }

            # Case 1: dict result -> inject metadata
            if isinstance(result, dict):
                result.setdefault('attributes_dict', {}).update(data_lineage_metadata)

            # Case 2: HDF5 group -> inject metadata safely
            elif dest_file_path and dest_group_name:
                _write_lineage_attrs(
                    open_file,
                    dest_file_path,
                    dest_group_name,
                    data_lineage_metadata,
                )

            return result

        return wrapper_func

    return decorator