Update src/hdf5_writer.py to consider data lineage metadata during the data ingestion process
@@ -100,6 +100,20 @@ def create_hdf5_file_from_filesystem_path(path_to_input_directory: str,
         print(message)
         logging.error(message)
     else:
+        # Step 1: Preprocess all metadata.json files into a lookup dict
+        all_metadata_dict = {}
+
+        for dirpath, filenames in path_to_filenames_dict.items():
+            metadata_file = next((f for f in filenames if f.endswith('metadata.json')), None)
+            if metadata_file:
+                metadata_path = os.path.join(dirpath, metadata_file)
+                try:
+                    with open(metadata_path, 'r') as metafile:
+                        all_metadata_dict[dirpath] = json.load(metafile)
+                except json.JSONDecodeError:
+                    logging.warning(f"Invalid JSON in metadata file: {metadata_path}")
+                    all_metadata_dict[dirpath] = {}
+
         with h5py.File(path_to_output_file, mode=mode, track_order=True) as h5file:

             number_of_dirs = len(path_to_filenames_dict.keys())
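For readers following along, the Step 1 block above can be read as a standalone helper. A minimal, runnable sketch of the same logic (the function name build_metadata_lookup is illustrative, not part of the patch):

import json
import logging
import os

def build_metadata_lookup(path_to_filenames_dict: dict) -> dict:
    """Map each directory to the parsed contents of its *metadata.json file.

    Directories without a metadata file are absent from the result; files
    with invalid JSON map to an empty dict, mirroring the hunk above.
    """
    all_metadata_dict = {}
    for dirpath, filenames in path_to_filenames_dict.items():
        metadata_file = next((f for f in filenames if f.endswith('metadata.json')), None)
        if metadata_file:
            metadata_path = os.path.join(dirpath, metadata_file)
            try:
                with open(metadata_path, 'r') as metafile:
                    all_metadata_dict[dirpath] = json.load(metafile)
            except json.JSONDecodeError:
                logging.warning(f"Invalid JSON in metadata file: {metadata_path}")
                all_metadata_dict[dirpath] = {}
    return all_metadata_dict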
@@ -138,21 +152,14 @@ def create_hdf5_file_from_filesystem_path(path_to_input_directory: str,
                 stdout = inst
                 logging.error('Failed to create group %s into HDF5: %s', group_name, inst)

-            if 'data_lineage_metadata.json' in filtered_filenames_list:
-                idx = filtered_filenames_list.index('data_lineage_metadata.json')
-                data_lineage_file = filtered_filenames_list[idx]
-                try:
-                    with open('/'.join([dirpath,data_lineage_file]),'r') as dlf:
-                        data_lineage_dict = json.load(dlf)
-                    filtered_filenames_list.pop(idx)
-                except json.JSONDecodeError:
-                    data_lineage_dict = {} # Start fresh if file is invalid
-            else:
-                data_lineage_dict = {}
+            # Step 3: During ingestion, attach metadata per file
+            metadata_dict = all_metadata_dict.get(dirpath, {})

             for filenumber, filename in enumerate(filtered_filenames_list):

+                # Skip any file that itself ends in metadata.json
+                if filename.endswith('metadata.json'):
+                    continue
+
                 # hdf5 path to filename group
                 dest_group_name = f'{group_name}/{filename}'
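Note that the endswith('metadata.json') guard matches both metadata.json and data_lineage_metadata.json, which is why the old index/pop bookkeeping could be dropped. A quick illustration with made-up filenames:

filenames = ['scan1.dat', 'data_lineage_metadata.json', 'results.csv', 'metadata.json']

# Same effect as the per-file guard in the loop above: anything whose name
# ends in 'metadata.json' is excluded from ingestion.
ingestible = [f for f in filenames if not f.endswith('metadata.json')]
assert ingestible == ['scan1.dat', 'results.csv']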
@@ -163,6 +170,10 @@ def create_hdf5_file_from_filesystem_path(path_to_input_directory: str,
                     #file_dict = ext_to_reader_dict[file_ext](os.path.join(dirpath,filename))
                     file_dict = filereader_registry.select_file_reader(dest_group_name)(source_file_path)

+                    # Attach per-file metadata if available
+                    if filename in metadata_dict:
+                        file_dict.get("attributes_dict",{}).update(metadata_dict[filename])
+
                     stdout = hdf5_ops.save_file_dict_to_hdf5(dest_file_obj, group_name, file_dict)

                 else:
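One caveat on the added update line: if file_dict ever lacks an 'attributes_dict' key, file_dict.get("attributes_dict", {}) returns a throwaway dict and the merged metadata is silently lost. Whether that case occurs depends on what the file readers return, which this diff does not show; if it can, setdefault makes the merge stick. A small runnable sketch with hypothetical values:

metadata_dict = {'scan1.dat': {'instrument': 'X-23'}}  # hypothetical lineage entry
file_dict = {'datasets': []}                           # no 'attributes_dict' key

filename = 'scan1.dat'
if filename in metadata_dict:
    # .get('attributes_dict', {}) would hand back a temporary dict here and
    # the update would vanish; setdefault stores the dict in file_dict first,
    # then returns it, so the merged metadata actually lands in file_dict.
    file_dict.setdefault('attributes_dict', {}).update(metadata_dict[filename])

assert file_dict['attributes_dict'] == {'instrument': 'X-23'}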
@@ -270,6 +281,21 @@ def create_hdf5_file_from_filesystem_path_new(path_to_input_directory: str,
         print(message)
         logging.error(message)
     else:
+
+        # Step 1: Preprocess all metadata.json files into a lookup dict
+        all_metadata_dict = {}
+
+        for dirpath, filenames in path_to_filenames_dict.items():
+            metadata_file = next((f for f in filenames if f.endswith('metadata.json')), None)
+            if metadata_file:
+                metadata_path = os.path.join(dirpath, metadata_file)
+                try:
+                    with open(metadata_path, 'r') as metafile:
+                        all_metadata_dict[dirpath] = json.load(metafile)
+                except json.JSONDecodeError:
+                    logging.warning(f"Invalid JSON in metadata file: {metadata_path}")
+                    all_metadata_dict[dirpath] = {}
+
         with h5py.File(path_to_output_file, mode=mode, track_order=True) as h5file:
             print('Created file')
@@ -309,7 +335,14 @@ def create_hdf5_file_from_filesystem_path_new(path_to_input_directory: str,
             # stdout = inst
             # logging.error('Failed to create group %s into HDF5: %s', group_name, inst)

+            # Step 3: During ingestion, attach metadata per file
+            # TODO: pass this metadata dict to run_file_reader, line 363
+            metadata_dict = all_metadata_dict.get(dirpath, {})
+
             for filenumber, filename in enumerate(filtered_filenames_list):

+                if filename.endswith('metadata.json'):
+                    continue
+
                 #file_ext = os.path.splitext(filename)[1]
                 #try:
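The TODO above leaves the wiring to run_file_reader open. A hypothetical sketch of what that could look like; the stub below is an assumption, since run_file_reader's real signature is not part of this diff:

# Hypothetical stub: the project's run_file_reader is not shown here, so this
# signature is an assumption used only to illustrate the TODO, not the API.
def run_file_reader(dest_group_name, source_file_path, file_metadata=None):
    print(f'reading {source_file_path} -> {dest_group_name}, '
          f'metadata: {file_metadata or {}}')

metadata_dict = {'scan1.dat': {'operator': 'jdoe'}}  # hypothetical entry

for filename in ['scan1.dat', 'metadata.json']:
    if filename.endswith('metadata.json'):
        continue
    run_file_reader(f'/group/{filename}', f'/data/{filename}',
                    file_metadata=metadata_dict.get(filename, {}))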