Implemented hdf5_file_reader.py and updated register.yaml and hdf5_writer.py. This replaces the previous function __copy_file_in_group().
This commit is contained in:
79
instruments/readers/hdf5_file_reader.py
Normal file
79
instruments/readers/hdf5_file_reader.py
Normal file
@ -0,0 +1,79 @@
|
|||||||
|
import sys
|
||||||
|
import os
|
||||||
|
|
||||||
|
# Resolve where this module lives so the project root can be made importable.
try:
    thisFilePath = os.path.abspath(__file__)
except NameError:
    # __file__ is undefined in some interactive contexts; fall back to the
    # working directory and warn that the resolved root may be wrong.
    print("Error: __file__ is not available. Ensure the script is being run from a file.")
    print("[Notice] Path to DIMA package may not be resolved properly.")
    thisFilePath = os.getcwd()

# Climb three path components from thisFilePath to reach the project root.
dimaPath = os.path.normpath(
    os.path.join(thisFilePath, os.pardir, os.pardir, os.pardir)
)

# Put the root at the front of the import path, but only once.
if dimaPath not in sys.path:
    sys.path.insert(0, dimaPath)
|
||||||
|
|
||||||
|
import pandas as pd
|
||||||
|
import collections
|
||||||
|
import yaml
|
||||||
|
import h5py
|
||||||
|
import argparse
|
||||||
|
import logging
|
||||||
|
import utils.g5505_utils as utils
|
||||||
|
import src.hdf5_ops as hdf5_ops
|
||||||
|
import instruments.filereader_registry as filereader_registry
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
def hdf5_file_reader(dest_file_obj_or_path, src_file_path=None, dest_group_name=None, work_with_copy: bool = True):
    """
    Copy the root group of a source HDF5 file into a group of a destination HDF5 file.

    Parameters
    ----------
    dest_file_obj_or_path : h5py.File or str
        Destination file. An already open ``h5py.File`` is used as is and is
        left open for the caller. Anything else is treated as a path, opened
        in ``'r+'`` mode with ``track_order=True`` and closed before returning.
    src_file_path : str
        Path to the source HDF5 file. Required despite the ``None`` default.
    dest_group_name : str
        Destination name the source root group is copied to. Required despite
        the ``None`` default.
    work_with_copy : bool, optional
        When True (default), the source is read from the copy returned by
        ``utils.make_file_copy`` instead of from the original file.

    Returns
    -------
    int
        0 when the transfer completed.

    Raises
    ------
    ValueError
        If ``src_file_path`` or ``dest_group_name`` is None.
    """
    # Fail early, before any file is opened or copied.
    if src_file_path is None or dest_group_name is None:
        raise ValueError("Both src_file_path and dest_group_name must be provided.")

    # Only close the destination if this function opened it.
    owns_dest_file = not isinstance(dest_file_obj_or_path, h5py.File)
    if owns_dest_file:
        dest_file_obj = h5py.File(dest_file_obj_or_path, mode='r+', track_order=True)
    else:
        dest_file_obj = dest_file_obj_or_path

    # Stays None unless a working copy was actually made, so the cleanup
    # below can never delete the caller's original source file.
    tmp_src_file_path = None
    try:
        if work_with_copy:
            tmp_src_file_path = utils.make_file_copy(src_file_path)
            read_path = tmp_src_file_path
        else:
            read_path = src_file_path

        # Open source HDF5 file and copy its whole hierarchy.
        with h5py.File(read_path, 'r') as src_file:
            dest_file_obj.copy(source=src_file['/'], dest=dest_group_name)

        print(f'Completed transfer for /{dest_group_name}')

    finally:
        try:
            # Remove the working copy on failure as well as on success.
            # NOTE(review): presumably utils.make_file_copy places copies under
            # a 'tmp_files' directory - confirm; the check is kept from the
            # original so nothing outside such a path is ever deleted.
            if (tmp_src_file_path is not None
                    and 'tmp_files' in tmp_src_file_path
                    and os.path.exists(tmp_src_file_path)):
                os.remove(tmp_src_file_path)
        finally:
            if owns_dest_file:
                dest_file_obj.close()

    return 0
|
||||||
|
|
||||||
|
if __name__ == "__main__":

    # Command-line entry point: copy one source HDF5 file into a group of a
    # target HDF5 file. All three arguments are positional and required.
    parser = argparse.ArgumentParser(description="Data ingestion process to HDF5 files.")
    parser.add_argument('dst_file_path', type=str, help="Path to the target HDF5 file.")
    parser.add_argument('src_file_path', type=str, help="Path to source HDF5 file to be saved to target HDF5 file.")
    parser.add_argument('dst_group_name', type=str, help="Group name '/instFolder/[category]/fileName' in the target HDF5 file.")

    args = parser.parse_args()

    dst_file_path = args.dst_file_path
    src_file_path = args.src_file_path
    dst_group_name = args.dst_group_name

    status = hdf5_file_reader(dst_file_path, src_file_path, dst_group_name)

    # f-string so the actual status value is printed, not the literal placeholder.
    print(f'Return status: {status}')
|
@ -6,7 +6,7 @@ instruments:
|
|||||||
|
|
||||||
- instrumentFolderName: NEXAFS
|
- instrumentFolderName: NEXAFS
|
||||||
fileExtension: h5
|
fileExtension: h5
|
||||||
fileReaderPath: null
|
fileReaderPath: instruments/readers/hdf5_file_reader.py
|
||||||
InstrumentDictionaryPath: null
|
InstrumentDictionaryPath: null
|
||||||
|
|
||||||
- instrumentFolderName: SES
|
- instrumentFolderName: SES
|
||||||
|
@ -22,27 +22,6 @@ import instruments.filereader_registry as filereader_registry
|
|||||||
|
|
||||||
|
|
||||||
|
|
||||||
def __copy_file_in_group(path_to_output_file, source_file_path, dest_group_name, work_with_copy : bool = True):
|
|
||||||
|
|
||||||
# Create copy of original file to avoid possible file corruption and work with it.
|
|
||||||
with h5py.File(path_to_output_file, mode='r+', track_order=True) as dest_file_obj:
|
|
||||||
|
|
||||||
if work_with_copy:
|
|
||||||
tmp_file_path = utils.make_file_copy(source_file_path)
|
|
||||||
else:
|
|
||||||
tmp_file_path = source_file_path
|
|
||||||
|
|
||||||
# Open backup h5 file and copy complet filesystem directory onto a group in h5file
|
|
||||||
with h5py.File(tmp_file_path,'r') as src_file:
|
|
||||||
dest_file_obj.copy(source= src_file['/'], dest= dest_group_name)
|
|
||||||
|
|
||||||
if 'tmp_files' in tmp_file_path:
|
|
||||||
os.remove(tmp_file_path)
|
|
||||||
|
|
||||||
stdout = f'Completed transfer for /{dest_group_name}'
|
|
||||||
|
|
||||||
return stdout
|
|
||||||
|
|
||||||
def create_hdf5_file_from_filesystem_path(path_to_input_directory: str,
|
def create_hdf5_file_from_filesystem_path(path_to_input_directory: str,
|
||||||
path_to_filenames_dict: dict = None,
|
path_to_filenames_dict: dict = None,
|
||||||
select_dir_keywords : list = [],
|
select_dir_keywords : list = [],
|
||||||
@ -178,20 +157,18 @@ def create_hdf5_file_from_filesystem_path(path_to_input_directory: str,
|
|||||||
# hdf5 path to filename group
|
# hdf5 path to filename group
|
||||||
dest_group_name = f'{group_name}/{filename}'
|
dest_group_name = f'{group_name}/{filename}'
|
||||||
source_file_path = os.path.join(dirpath,filename)
|
source_file_path = os.path.join(dirpath,filename)
|
||||||
|
dest_file_obj = h5file
|
||||||
if not 'h5' in filename:
|
if not 'h5' in filename:
|
||||||
#file_dict = config_file.select_file_readers(group_id)[file_ext](os.path.join(dirpath,filename))
|
#file_dict = config_file.select_file_readers(group_id)[file_ext](os.path.join(dirpath,filename))
|
||||||
#file_dict = ext_to_reader_dict[file_ext](os.path.join(dirpath,filename))
|
#file_dict = ext_to_reader_dict[file_ext](os.path.join(dirpath,filename))
|
||||||
file_dict = filereader_registry.select_file_reader(dest_group_name)(source_file_path)
|
file_dict = filereader_registry.select_file_reader(dest_group_name)(source_file_path)
|
||||||
|
|
||||||
stdout = hdf5_ops.save_file_dict_to_hdf5(h5file, group_name, file_dict)
|
stdout = hdf5_ops.save_file_dict_to_hdf5(dest_file_obj, group_name, file_dict)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
dest_file_obj = h5file
|
from instruments.readers.hdf5_file_reader import hdf5_file_reader
|
||||||
#group_name +'/'+filename
|
stdout = hdf5_file_reader(dest_file_obj, source_file_path, dest_group_name)
|
||||||
#ext_to_reader_dict[file_ext](source_file_path, dest_file_obj, dest_group_name)
|
#stdout = __copy_file_in_group(source_file_path, dest_file_obj, dest_group_name, False)
|
||||||
#g5505f_reader.select_file_reader(dest_group_name)(source_file_path, dest_file_obj, dest_group_name)
|
|
||||||
stdout = __copy_file_in_group(source_file_path, dest_file_obj, dest_group_name, False)
|
|
||||||
|
|
||||||
# Update the progress bar and log the end message
|
# Update the progress bar and log the end message
|
||||||
instFoldermsdEnd = f'\nCompleted data transfer for instFolder: {group_name}\n'
|
instFoldermsdEnd = f'\nCompleted data transfer for instFolder: {group_name}\n'
|
||||||
@ -353,7 +330,7 @@ def create_hdf5_file_from_filesystem_path_new(path_to_input_directory: str,
|
|||||||
filereader_registry.run_reader(path_to_output_file, source_file_path, dest_group_name)
|
filereader_registry.run_reader(path_to_output_file, source_file_path, dest_group_name)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
|
from instruments.readers.hdf5_file_reader import hdf5_file_reader
|
||||||
#try:
|
#try:
|
||||||
# # Create group if it does not exist
|
# # Create group if it does not exist
|
||||||
# if dest_group_name not in dest_file_obj:
|
# if dest_group_name not in dest_file_obj:
|
||||||
@ -370,7 +347,7 @@ def create_hdf5_file_from_filesystem_path_new(path_to_input_directory: str,
|
|||||||
#group_name +'/'+filename
|
#group_name +'/'+filename
|
||||||
#ext_to_reader_dict[file_ext](source_file_path, dest_file_obj, dest_group_name)
|
#ext_to_reader_dict[file_ext](source_file_path, dest_file_obj, dest_group_name)
|
||||||
#g5505f_reader.select_file_reader(dest_group_name)(source_file_path, dest_file_obj, dest_group_name)
|
#g5505f_reader.select_file_reader(dest_group_name)(source_file_path, dest_file_obj, dest_group_name)
|
||||||
stdout = __copy_file_in_group(path_to_output_file, source_file_path, dest_group_name, False)
|
stdout = hdf5_file_reader(path_to_output_file, source_file_path, dest_group_name, False)
|
||||||
|
|
||||||
# Update the progress bar and log the end message
|
# Update the progress bar and log the end message
|
||||||
instFoldermsdEnd = f'\nCompleted data transfer for instFolder: {group_name}\n'
|
instFoldermsdEnd = f'\nCompleted data transfer for instFolder: {group_name}\n'
|
||||||
|
Reference in New Issue
Block a user