Implemented create_hdf5_file_from_filesystem_path_new() using the new instrument readers' command-line interface and subprocesses. This lets collaborators extend file-reading capabilities without changing filereader_registry.py: they only add files under the instrument folders and register them in registry.yaml.
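The dispatch works roughly as sketched below. Only run_reader's call signature is taken from the diff; the registry.yaml schema, the file paths, and the reader CLI argument order are illustrative assumptions.

import logging
import subprocess
import sys
import yaml

def run_reader(path_to_output_file, source_file_path, dest_group_name):
    # Assumed registry.yaml schema: a list of entries mapping an instrument
    # folder name to the reader script that handles its files.
    with open('instruments/readers/registry.yaml') as f:
        registry = yaml.safe_load(f)
    for entry in registry.get('instruments', []):
        if entry['instrumentFolderName'] in dest_group_name:
            # Each reader runs in its own subprocess, so collaborators can add
            # readers (and their dependencies) without touching this module.
            subprocess.run([sys.executable, entry['fileReaderPath'],
                            path_to_output_file, source_file_path, dest_group_name],
                           check=True)
            return
    logging.warning('No file reader registered for %s', dest_group_name)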
@@ -26,10 +26,10 @@ import yaml
import json
import copy

try:
    from dima.utils import g5505_utils as utils
    from dima.src import hdf5_writer as hdf5_lib
except ModuleNotFoundError:
    #try:
    #    from dima.utils import g5505_utils as utils
    #    from dima.src import hdf5_writer as hdf5_lib
    #except ModuleNotFoundError:
    import utils.g5505_utils as utils
    import src.hdf5_writer as hdf5_lib
@@ -706,3 +706,67 @@ if __name__ == "__main__":

    #run(sys.argv[2])


def save_file_dict_to_hdf5(h5file, group_name, file_dict):
    """
    Transfers data from a file_dict to an HDF5 file.

    Parameters
    ----------
    h5file : h5py.File
        HDF5 file object where the data will be written.
    group_name : str
        Name of the HDF5 group where data will be stored.
    file_dict : dict
        Dictionary containing file data to be transferred. Required structure:
            {
                'name': str,
                'attributes_dict': dict,
                'datasets': [
                    {
                        'name': str,
                        'data': array-like,
                        'shape': tuple,
                        'attributes': dict (optional)
                    },
                    ...
                ]
            }

    Returns
    -------
    int or None
        0 on success, -1 on failure, None if file_dict is empty.
    """

    if not file_dict:
        return

    try:
        # Create group and add its attributes
        filename = file_dict['name']
        group = h5file[group_name].create_group(name=filename)
        # Add group attributes
        group.attrs.update(file_dict['attributes_dict'])

        # Add datasets to the just-created group
        for dataset in file_dict['datasets']:
            dataset_obj = group.create_dataset(
                name=dataset['name'],
                data=dataset['data'],
                shape=dataset['shape']
            )

            # Add dataset's attributes
            attributes = dataset.get('attributes', {})
            dataset_obj.attrs.update(attributes)
        group.attrs['last_update_date'] = utils.created_at().encode('utf-8')

        stdout = f'Completed transfer for /{group_name}/{filename}'
        print(stdout)

    except Exception as inst:
        logging.error('Failed to transfer data into HDF5: %s', inst)
        return -1

    return 0

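For reference, a minimal usage sketch of the new save_file_dict_to_hdf5(), assuming the file_dict structure documented above; the file names, group name, and values are illustrative:

import h5py
import numpy as np

file_dict = {
    'name': 'example.txt',
    'attributes_dict': {'instrument': 'example_sensor'},
    'datasets': [
        {'name': 'data_table', 'data': np.arange(6).reshape(2, 3), 'shape': (2, 3)}
    ]
}

with h5py.File('collection.h5', 'w') as h5file:
    h5file.create_group('instFolder')  # the parent group must already exist
    save_file_dict_to_hdf5(h5file, 'instFolder', file_dict)
    # Result: /instFolder/example.txt/data_table with the given attributes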
@@ -8,79 +8,23 @@ import numpy as np
import h5py
import logging

try:
    from dima.utils import g5505_utils as utils
    from dima.instruments.readers import filereader_registry as filereader_registry
except ModuleNotFoundError:
    #try:
    #    from dima.utils import g5505_utils as utils
    #    from dima.src import hdf5_ops
    #    from dima.instruments import filereader_registry as filereader_registry
    #except ModuleNotFoundError:
    import utils.g5505_utils as utils
    import instruments.readers.filereader_registry as filereader_registry
    import src.hdf5_ops as hdf5_ops
    import instruments.filereader_registry as filereader_registry

def __transfer_file_dict_to_hdf5(h5file, group_name, file_dict):
    """
    Transfers data from a file_dict to an HDF5 file.

    Parameters
    ----------
    h5file : h5py.File
        HDF5 file object where the data will be written.
    group_name : str
        Name of the HDF5 group where data will be stored.
    file_dict : dict
        Dictionary containing file data to be transferred. Required structure:
            {
                'name': str,
                'attributes_dict': dict,
                'datasets': [
                    {
                        'name': str,
                        'data': array-like,
                        'shape': tuple,
                        'attributes': dict (optional)
                    },
                    ...
                ]
            }

    Returns
    -------
    None
    """

    if not file_dict:
        return

    try:
        # Create group and add their attributes
        filename = file_dict['name']
        group = h5file[group_name].create_group(name=filename)
        # Add group attributes
        group.attrs.update(file_dict['attributes_dict'])

        # Add datasets to the just created group
        for dataset in file_dict['datasets']:
            dataset_obj = group.create_dataset(
                name=dataset['name'],
                data=dataset['data'],
                shape=dataset['shape']
            )

            # Add dataset's attributes
            attributes = dataset.get('attributes', {})
            dataset_obj.attrs.update(attributes)
        group.attrs['last_update_date'] = utils.created_at().encode('utf-8')

        stdout = f'Completed transfer for /{group_name}/{filename}'

    except Exception as inst:
        stdout = inst
        logging.error('Failed to transfer data into HDF5: %s', inst)

    return stdout

def __copy_file_in_group(source_file_path, dest_file_obj : h5py.File, dest_group_name, work_with_copy : bool = True):
def __copy_file_in_group(path_to_output_file, source_file_path, dest_group_name, work_with_copy : bool = True):
    # Create copy of original file to avoid possible file corruption and work with it.
    with h5py.File(path_to_output_file, mode='r+', track_order=True) as dest_file_obj:

        if work_with_copy:
            tmp_file_path = utils.make_file_copy(source_file_path)

@@ -95,6 +39,7 @@ def __copy_file_in_group(source_file_path, dest_file_obj : h5py.File, dest_group
            os.remove(tmp_file_path)

        stdout = f'Completed transfer for /{dest_group_name}'

    return stdout

def create_hdf5_file_from_filesystem_path(path_to_input_directory: str,
@@ -220,16 +165,16 @@ def create_hdf5_file_from_filesystem_path(path_to_input_directory: str,

                # hdf5 path to filename group
                dest_group_name = f'{group_name}/{filename}'
                source_file_path = os.path.join(dirpath,filename)

                if not 'h5' in filename:
                    #file_dict = config_file.select_file_readers(group_id)[file_ext](os.path.join(dirpath,filename))
                    #file_dict = ext_to_reader_dict[file_ext](os.path.join(dirpath,filename))
                    file_dict = filereader_registry.select_file_reader(dest_group_name)(os.path.join(dirpath,filename))
                    file_dict = filereader_registry.select_file_reader(dest_group_name)(source_file_path)

                    stdout = __transfer_file_dict_to_hdf5(h5file, group_name, file_dict)
                    stdout = hdf5_ops.save_file_dict_to_hdf5(h5file, group_name, file_dict)

                else:
                    source_file_path = os.path.join(dirpath,filename)
                    dest_file_obj = h5file
                    #group_name +'/'+filename
                    #ext_to_reader_dict[file_ext](source_file_path, dest_file_obj, dest_group_name)
@@ -258,6 +203,186 @@ def create_hdf5_file_from_filesystem_path(path_to_input_directory: str,

    return path_to_output_file #, output_yml_filename_path

def create_hdf5_file_from_filesystem_path_new(path_to_input_directory: str,
                                              path_to_filenames_dict: dict = None,
                                              select_dir_keywords : list = [],
                                              root_metadata_dict : dict = {}, mode = 'w'):

    """
    Creates an .h5 file that preserves the directory tree (or folder structure) of a given filesystem
    path; the output file is named after the input directory.

    The data integration capabilities are limited by our file readers, which can only access data from a list of
    admissible file formats. These, however, can be extended. Directories become groups in the resulting HDF5 file,
    and files are represented as composite objects consisting of a group, datasets, and attributes.

    Parameters
    ----------
    path_to_input_directory : str
        Path to the root directory, specified with forward slashes, e.g., path/to/root.

    path_to_filenames_dict : dict, optional
        A pre-processed dictionary where keys are directory paths on the input directory's tree and values are lists of files.
        If provided, the directory tree under 'path_to_input_directory' is not traversed again.

    select_dir_keywords : list
        List of strings used to select only directory paths that contain one of the keywords.
        When empty, all directory paths are included in the HDF5 file group hierarchy.

    root_metadata_dict : dict
        Metadata to include at the root level of the HDF5 file.

    mode : str
        'w' creates the file, truncating it if it exists; 'r+' opens an existing file for read/write. By default, mode = 'w'.

    Returns
    -------
    path_to_output_file : str
        Path to the created HDF5 file.
    """

    if not mode in ['w','r+']:
        raise ValueError('Parameter mode must take values in ["w","r+"]')

    if not '/' in path_to_input_directory:
        raise ValueError('path_to_input_directory needs to be specified using forward slashes "/".')

    #path_to_output_directory = os.path.join(path_to_input_directory,'..')
    path_to_input_directory = os.path.normpath(path_to_input_directory).rstrip(os.sep)

    for i, keyword in enumerate(select_dir_keywords):
        select_dir_keywords[i] = keyword.replace('/',os.sep)

    if not path_to_filenames_dict:
        # With dry_run=True, this returns the path-to-files dictionary of the output directory without making an actual copy of the input directory.
        # Therefore, there won't be a copying conflict when the input and output directories are the same.
        path_to_filenames_dict = utils.copy_directory_with_contraints(input_dir_path=path_to_input_directory,
                                                                      output_dir_path=path_to_input_directory,
                                                                      dry_run=True)
    # Set input_directory as copied input directory
    root_dir = path_to_input_directory
    path_to_output_file = path_to_input_directory.rstrip(os.path.sep) + '.h5'

    start_message = f'\n[Start] Data integration :\nSource: {path_to_input_directory}\nDestination: {path_to_output_file}\n'

    print(start_message)
    logging.info(start_message)

    # Check if the .h5 file already exists
    if os.path.exists(path_to_output_file) and mode in ['w']:
        message = (
            f"[Notice] The file '{path_to_output_file}' already exists and will not be overwritten.\n"
            "If you wish to replace it, please delete the existing file first and rerun the program."
        )
        print(message)
        logging.error(message)
    else:
        with h5py.File(path_to_output_file, mode=mode, track_order=True) as h5file:
            print('Created file')

        number_of_dirs = len(path_to_filenames_dict.keys())
        dir_number = 1
        for dirpath, filtered_filenames_list in path_to_filenames_dict.items():

            # Check if filtered_filenames_list is nonempty. TODO: This is perhaps redundant by design of path_to_filenames_dict.
            if not filtered_filenames_list:
                continue

            group_name = dirpath.replace(os.sep,'/')
            group_name = group_name.replace(root_dir.replace(os.sep,'/') + '/', '/')

            # Flatten group name to two levels
            if select_dir_keywords:
                offset = sum([len(i.split(os.sep)) if i in dirpath else 0 for i in select_dir_keywords])
            else:
                offset = 2
            tmp_list = group_name.split('/')
            if len(tmp_list) > offset+1:
                group_name = '/'.join([tmp_list[i] for i in range(offset+1)])
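            # Illustrative walk-through (not part of the commit): with select_dir_keywords = ['instFolder/subA'],
            # a dirpath ending in 'instFolder/subA/2024/03' gives offset = 2, so
            # group_name = '/instFolder/subA/2024/03' (five components after splitting on '/')
            # is truncated to the first offset+1 = 3 components: '/instFolder/subA'.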

            # try:
            #     # Create group called "group_name". Hierarchy of nested groups can be implicitly defined by the forward slashes
            #     if not group_name in h5file.keys():
            #         h5file.create_group(group_name)
            #         h5file[group_name].attrs['creation_date'] = utils.created_at().encode('utf-8')
            #         #h5file[group_name].attrs.create(name='filtered_file_list',data=convert_string_to_bytes(filtered_filename_list))
            #         #h5file[group_name].attrs.create(name='file_list',data=convert_string_to_bytes(filenames_list))
            #     #else:
            #         #print(group_name,' was already created.')
            #     instFoldermsgStart = f'Starting data transfer from instFolder: {group_name}'
            #     print(instFoldermsgStart)

            # except Exception as inst:
            #     stdout = inst
            #     logging.error('Failed to create group %s into HDF5: %s', group_name, inst)

            for filenumber, filename in enumerate(filtered_filenames_list):

                #file_ext = os.path.splitext(filename)[1]
                #try:

                # hdf5 path to filename group
                dest_group_name = f'{group_name}/{filename}'
                source_file_path = os.path.join(dirpath,filename)

                if not 'h5' in filename:
                    #file_dict = config_file.select_file_readers(group_id)[file_ext](os.path.join(dirpath,filename))
                    #file_dict = ext_to_reader_dict[file_ext](os.path.join(dirpath,filename))

                    # TODO: Run save_file_dict_to_hdf5 from reader.py using command line interface
                    #file_dict = filereader_registry.select_file_reader(dest_group_name)(os.path.join(dirpath,filename))

                    #stdout = hdf5_ops.save_file_dict_to_hdf5(h5file, group_name, file_dict)

                    filereader_registry.run_reader(path_to_output_file, source_file_path, dest_group_name)

                else:

                    #try:
                    #    # Create group if it does not exist
                    #    if dest_group_name not in dest_file_obj:
                    #        dest_file_obj.create_group(dest_group_name)
                    #        dest_file_obj[dest_group_name].attrs['creation_date'] = utils.created_at().encode('utf-8')
                    #        print(f'Created new group: {dest_group_name}')
                    #    else:
                    #        print(f'Group {dest_group_name} already exists. Proceeding with data transfer...')

                    #except Exception as inst:
                    #    logging.error('Failed to create group %s in HDF5: %s', dest_group_name, inst)

                    #group_name +'/'+filename
                    #ext_to_reader_dict[file_ext](source_file_path, dest_file_obj, dest_group_name)
                    #g5505f_reader.select_file_reader(dest_group_name)(source_file_path, dest_file_obj, dest_group_name)
                    stdout = __copy_file_in_group(path_to_output_file, source_file_path, dest_group_name, False)

            # Print and log the end message, and update the progress bar
            instFoldermsdEnd = f'\nCompleted data transfer for instFolder: {group_name}\n'
            utils.progressBar(dir_number, number_of_dirs, instFoldermsdEnd)
            logging.info(instFoldermsdEnd)
            dir_number = dir_number + 1

        print('[End] Data integration')
        logging.info('[End] Data integration')

    if len(root_metadata_dict.keys())>0:
        with h5py.File(path_to_output_file, mode='r+', track_order=True) as h5file:
            for key, value in root_metadata_dict.items():
                #if key in h5file.attrs:
                #    del h5file.attrs[key]
                h5file.attrs.create(key, value)
        #annotate_root_dir(output_filename,root_metadata_dict)

    #output_yml_filename_path = hdf5_vis.take_yml_snapshot_of_hdf5_file(output_filename)

    return path_to_output_file #, output_yml_filename_path

def create_hdf5_file_from_dataframe(ofilename, input_data, group_by_funcs: list, approach: str = None, extract_attrs_func=None):
    """
    Creates an HDF5 file with hierarchical groups based on the specified grouping functions or columns.

@@ -400,6 +525,6 @@ def save_processed_dataframe_to_hdf5(df, annotator, output_filename): # src_hdf5
    with h5py.File(output_filename, mode) as h5file:
        # Add project level attributes at the root/top level
        h5file.attrs.update(project_level_attributes)
        __transfer_file_dict_to_hdf5(h5file, '/', file_dict)
        hdf5_ops.save_file_dict_to_hdf5(h5file, '/', file_dict)

#if __name__ == '__main__':
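The TODO in create_hdf5_file_from_filesystem_path_new() points at the reader-side half of this contract: each instrument reader exposes a small CLI that parses the source file into a file_dict and writes it into the shared HDF5 file. A minimal sketch of such an entry point, assuming the argument order used by run_reader in this commit; parse_file() is a hypothetical instrument-specific helper:

import sys
import h5py

try:
    from dima.src import hdf5_ops
except ModuleNotFoundError:
    import src.hdf5_ops as hdf5_ops

def parse_file(source_file_path):
    # Hypothetical parser: must return a file_dict with the structure
    # expected by save_file_dict_to_hdf5().
    raise NotImplementedError

if __name__ == '__main__':
    # Argument order assumed from run_reader(path_to_output_file, source_file_path, dest_group_name)
    path_to_output_file, source_file_path, dest_group_name = sys.argv[1:4]
    file_dict = parse_file(source_file_path)
    # dest_group_name has the form '<group>/<filename>'; keep only the group part
    group_name = dest_group_name.rsplit('/', 1)[0]
    with h5py.File(path_to_output_file, mode='r+', track_order=True) as h5file:
        hdf5_ops.save_file_dict_to_hdf5(h5file, group_name, file_dict)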