diff --git a/src/hdf5_ops.py b/src/hdf5_ops.py
index 9b8575d..4c4bc92 100644
--- a/src/hdf5_ops.py
+++ b/src/hdf5_ops.py
@@ -26,12 +26,12 @@
 import yaml
 import json
 import copy
-try:
-    from dima.utils import g5505_utils as utils
-    from dima.src import hdf5_writer as hdf5_lib
-except ModuleNotFoundError:
-    import utils.g5505_utils as utils
-    import src.hdf5_writer as hdf5_lib
+#try:
+#    from dima.utils import g5505_utils as utils
+#    from dima.src import hdf5_writer as hdf5_lib
+#except ModuleNotFoundError:
+import utils.g5505_utils as utils
+import src.hdf5_writer as hdf5_lib
 
 class HDF5DataOpsManager():
@@ -706,3 +706,67 @@
 if __name__ == "__main__":
 
     #run(sys.argv[2])
+
+def save_file_dict_to_hdf5(h5file, group_name, file_dict):
+    """
+    Transfers data from a file_dict to an HDF5 file.
+
+    Parameters
+    ----------
+    h5file : h5py.File
+        HDF5 file object where the data will be written.
+    group_name : str
+        Name of the HDF5 group where data will be stored.
+    file_dict : dict
+        Dictionary containing file data to be transferred. Required structure:
+        {
+            'name': str,
+            'attributes_dict': dict,
+            'datasets': [
+                {
+                    'name': str,
+                    'data': array-like,
+                    'shape': tuple,
+                    'attributes': dict (optional)
+                },
+                ...
+            ]
+        }
+
+    Returns
+    -------
+    int
+        0 if the transfer succeeded, -1 otherwise.
+    """
+
+    if not file_dict:
+        return 0
+
+    try:
+        # Create the group and add its attributes
+        filename = file_dict['name']
+        group = h5file[group_name].create_group(name=filename)
+        # Add group attributes
+        group.attrs.update(file_dict['attributes_dict'])
+
+        # Add datasets to the newly created group
+        for dataset in file_dict['datasets']:
+            dataset_obj = group.create_dataset(
+                name=dataset['name'],
+                data=dataset['data'],
+                shape=dataset['shape']
+            )
+
+            # Add the dataset's attributes
+            attributes = dataset.get('attributes', {})
+            dataset_obj.attrs.update(attributes)
+
+        group.attrs['last_update_date'] = utils.created_at().encode('utf-8')
+
+        stdout = f'Completed transfer for /{group_name}/{filename}'
+        print(stdout)
+
+    except Exception as inst:
+        logging.error('Failed to transfer data into HDF5: %s', inst)
+        return -1
+
+    return 0
diff --git a/src/hdf5_writer.py b/src/hdf5_writer.py
index c8f1475..3113bea 100644
--- a/src/hdf5_writer.py
+++ b/src/hdf5_writer.py
@@ -8,93 +8,38 @@
 import numpy as np
 import h5py
 import logging
-try:
-    from dima.utils import g5505_utils as utils
-    from dima.instruments.readers import filereader_registry as filereader_registry
-except ModuleNotFoundError:
-    import utils.g5505_utils as utils
-    import instruments.readers.filereader_registry as filereader_registry
+#try:
+#    from dima.utils import g5505_utils as utils
+#    from dima.src import hdf5_ops
+#    from dima.instruments import filereader_registry as filereader_registry
+#except ModuleNotFoundError:
+import utils.g5505_utils as utils
+import src.hdf5_ops as hdf5_ops
+import instruments.filereader_registry as filereader_registry
 
-def __transfer_file_dict_to_hdf5(h5file, group_name, file_dict):
-    """
-    Transfers data from a file_dict to an HDF5 file.
-    Parameters
-    ----------
-    h5file : h5py.File
-        HDF5 file object where the data will be written.
-    group_name : str
-        Name of the HDF5 group where data will be stored.
-    file_dict : dict
-        Dictionary containing file data to be transferred. Required structure:
-        {
-            'name': str,
-            'attributes_dict': dict,
-            'datasets': [
-                {
-                    'name': str,
-                    'data': array-like,
-                    'shape': tuple,
-                    'attributes': dict (optional)
-                },
-                ...
-            ]
-        }
-    Returns
-    -------
-    None
-    """
+def __copy_file_in_group(path_to_output_file, source_file_path, dest_group_name, work_with_copy : bool = True):
-    if not file_dict:
-        return
-
-    try:
-        # Create group and add their attributes
-        filename = file_dict['name']
-        group = h5file[group_name].create_group(name=filename)
-        # Add group attributes
-        group.attrs.update(file_dict['attributes_dict'])
-
-        # Add datasets to the just created group
-        for dataset in file_dict['datasets']:
-            dataset_obj = group.create_dataset(
-                name=dataset['name'],
-                data=dataset['data'],
-                shape=dataset['shape']
-            )
-
-            # Add dataset's attributes
-            attributes = dataset.get('attributes', {})
-            dataset_obj.attrs.update(attributes)
-        group.attrs['last_update_date'] = utils.created_at().encode('utf-8')
-
-        stdout = f'Completed transfer for /{group_name}/{filename}'
-
-    except Exception as inst:
-        stdout = inst
-        logging.error('Failed to transfer data into HDF5: %s', inst)
-
-    return stdout
-
-def __copy_file_in_group(source_file_path, dest_file_obj : h5py.File, dest_group_name, work_with_copy : bool = True):
     # Create copy of original file to avoid possible file corruption and work with it.
+    with h5py.File(path_to_output_file, mode='r+', track_order=True) as dest_file_obj:
 
-    if work_with_copy:
-        tmp_file_path = utils.make_file_copy(source_file_path)
-    else:
-        tmp_file_path = source_file_path
+        if work_with_copy:
+            tmp_file_path = utils.make_file_copy(source_file_path)
+        else:
+            tmp_file_path = source_file_path
 
-    # Open backup h5 file and copy complet filesystem directory onto a group in h5file
-    with h5py.File(tmp_file_path,'r') as src_file:
-        dest_file_obj.copy(source= src_file['/'], dest= dest_group_name)
+        # Open the backup h5 file and copy the complete filesystem directory onto a group in h5file
+        with h5py.File(tmp_file_path,'r') as src_file:
+            dest_file_obj.copy(source= src_file['/'], dest= dest_group_name)
 
-    if 'tmp_files' in tmp_file_path:
-        os.remove(tmp_file_path)
+        if 'tmp_files' in tmp_file_path:
+            os.remove(tmp_file_path)
 
-    stdout = f'Completed transfer for /{dest_group_name}'
+        stdout = f'Completed transfer for /{dest_group_name}'
+    return stdout
 
 def create_hdf5_file_from_filesystem_path(path_to_input_directory: str,
@@ -220,16 +165,16 @@
 
                     # hdf5 path to filename group
                     dest_group_name = f'{group_name}/{filename}'
+                    source_file_path = os.path.join(dirpath,filename)
 
                     if not 'h5' in filename:
                         #file_dict = config_file.select_file_readers(group_id)[file_ext](os.path.join(dirpath,filename))
                         #file_dict = ext_to_reader_dict[file_ext](os.path.join(dirpath,filename))
-                        file_dict = filereader_registry.select_file_reader(dest_group_name)(os.path.join(dirpath,filename))
+                        file_dict = filereader_registry.select_file_reader(dest_group_name)(source_file_path)
 
-                        stdout = __transfer_file_dict_to_hdf5(h5file, group_name, file_dict)
+                        stdout = hdf5_ops.save_file_dict_to_hdf5(h5file, group_name, file_dict)
 
                     else:
-                        source_file_path = os.path.join(dirpath,filename)
                         dest_file_obj = h5file
                         #group_name +'/'+filename
                         #ext_to_reader_dict[file_ext](source_file_path, dest_file_obj, dest_group_name)
@@ -258,6 +203,186 @@
 
     return path_to_output_file #, output_yml_filename_path
 
+def create_hdf5_file_from_filesystem_path_new(path_to_input_directory: str,
+                                              path_to_filenames_dict: dict = None,
+                                              select_dir_keywords : list = [],
+                                              root_metadata_dict : dict = {}, mode = 'w'):
+
+    """
+    Creates an .h5 file that preserves the directory tree (or folder structure)
+    of a given filesystem path.
+
+    The data integration capabilities are limited by our file readers, which can only access data from a list of
+    admissible file formats. These, however, can be extended. Directories are mapped to groups in the resulting
+    HDF5 file. Files are formatted as composite objects consisting of a group, file, and attributes.
+
+    Parameters
+    ----------
+    path_to_input_directory : str
+        Path to the root directory, specified with forward slashes, e.g., path/to/root. The output HDF5
+        file is named after this directory, with an .h5 suffix.
+    path_to_filenames_dict : dict, optional
+        A pre-processed dictionary where keys are directory paths on the input directory's tree and values
+        are lists of files. If provided, the directory tree of 'path_to_input_directory' is not walked again.
+    select_dir_keywords : list
+        List of strings used to select only directory paths that contain a word in 'select_dir_keywords'.
+        When empty, all directory paths are included in the HDF5 file group hierarchy.
+    root_metadata_dict : dict
+        Metadata to include at the root level of the HDF5 file.
+    mode : str
+        'w' creates the file, truncating it if it exists; 'r+' opens an existing file for read/write.
+        By default, mode = 'w'.
+
+    Returns
+    -------
+    path_to_output_file : str
+        Path to the created HDF5 file.
+    """
+
+    if mode not in ['w','r+']:
+        raise ValueError('Parameter mode must take values in ["w","r+"]')
+
+    if '/' not in path_to_input_directory:
+        raise ValueError('path_to_input_directory needs to be specified using forward slashes "/".')
+
+    #path_to_output_directory = os.path.join(path_to_input_directory,'..')
+    path_to_input_directory = os.path.normpath(path_to_input_directory).rstrip(os.sep)
+
+    for i, keyword in enumerate(select_dir_keywords):
+        select_dir_keywords[i] = keyword.replace('/',os.sep)
+
+    if not path_to_filenames_dict:
+        # With dry_run=True, this returns the files dictionary of the input directory without making an actual copy of it.
+        # Therefore, there won't be a copying conflict even though the input and output directories are set to the same path.
+        path_to_filenames_dict = utils.copy_directory_with_contraints(input_dir_path=path_to_input_directory,
+                                                                      output_dir_path=path_to_input_directory,
+                                                                      dry_run=True)
+    # Set input_directory as copied input directory
+    root_dir = path_to_input_directory
+    path_to_output_file = path_to_input_directory.rstrip(os.path.sep) + '.h5'
+
+    start_message = f'\n[Start] Data integration :\nSource: {path_to_input_directory}\nDestination: {path_to_output_file}\n'
+
+    print(start_message)
+    logging.info(start_message)
+
+    # Check if the .h5 file already exists
+    if os.path.exists(path_to_output_file) and mode in ['w']:
+        message = (
+            f"[Notice] The file '{path_to_output_file}' already exists and will not be overwritten.\n"
+            "If you wish to replace it, please delete the existing file first and rerun the program."
+        )
+        print(message)
+        logging.error(message)
+    else:
+        with h5py.File(path_to_output_file, mode=mode, track_order=True) as h5file:
+            print('Created file')
+
+        number_of_dirs = len(path_to_filenames_dict.keys())
+        dir_number = 1
+        for dirpath, filtered_filenames_list in path_to_filenames_dict.items():
+
+            # Skip empty file lists. TODO: This is perhaps redundant by design of path_to_filenames_dict.
+            if not filtered_filenames_list:
+                continue
+
+            group_name = dirpath.replace(os.sep,'/')
+            group_name = group_name.replace(root_dir.replace(os.sep,'/') + '/', '/')
+
+            # Flatten group name to two levels
+            if select_dir_keywords:
+                offset = sum([len(i.split(os.sep)) if i in dirpath else 0 for i in select_dir_keywords])
+            else:
+                offset = 2
+            tmp_list = group_name.split('/')
+            if len(tmp_list) > offset+1:
+                group_name = '/'.join([tmp_list[i] for i in range(offset+1)])
+
+            # try:
+            #     # Create group called "group_name". Hierarchy of nested groups can be implicitly defined by the forward slashes
+            #     if not group_name in h5file.keys():
+            #         h5file.create_group(group_name)
+            #         h5file[group_name].attrs['creation_date'] = utils.created_at().encode('utf-8')
+            #         #h5file[group_name].attrs.create(name='filtered_file_list',data=convert_string_to_bytes(filtered_filename_list))
+            #         #h5file[group_name].attrs.create(name='file_list',data=convert_string_to_bytes(filenames_list))
+            #     #else:
+            #     #print(group_name,' was already created.')
+            #     instFoldermsgStart = f'Starting data transfer from instFolder: {group_name}'
+            #     print(instFoldermsgStart)
+
+            # except Exception as inst:
+            #     stdout = inst
+            #     logging.error('Failed to create group %s into HDF5: %s', group_name, inst)
+
+            for filenumber, filename in enumerate(filtered_filenames_list):
+
+                #file_ext = os.path.splitext(filename)[1]
+                #try:
+
+                # hdf5 path to filename group
+                dest_group_name = f'{group_name}/{filename}'
+                source_file_path = os.path.join(dirpath,filename)
+
+                if not 'h5' in filename:
+                    #file_dict = config_file.select_file_readers(group_id)[file_ext](os.path.join(dirpath,filename))
+                    #file_dict = ext_to_reader_dict[file_ext](os.path.join(dirpath,filename))
+
+                    # TODO: Run save_file_dict_to_hdf5 from reader.py using a command line interface
+                    #file_dict = filereader_registry.select_file_reader(dest_group_name)(os.path.join(dirpath,filename))
+                    #stdout = hdf5_ops.save_file_dict_to_hdf5(h5file, group_name, file_dict)
+
+                    filereader_registry.run_reader(path_to_output_file, source_file_path, dest_group_name)
+
+                else:
+                    #try:
+                    #    # Create group if it does not exist
+                    #    if dest_group_name not in dest_file_obj:
+                    #        dest_file_obj.create_group(dest_group_name)
+                    #        dest_file_obj[dest_group_name].attrs['creation_date'] = utils.created_at().encode('utf-8')
+                    #        print(f'Created new group: {dest_group_name}')
+                    #    else:
+                    #        print(f'Group {dest_group_name} already exists. Proceeding with data transfer...')
+
+                    #except Exception as inst:
+                    #    logging.error('Failed to create group %s in HDF5: %s', dest_group_name, inst)
+
+                    #group_name +'/'+filename
+                    #ext_to_reader_dict[file_ext](source_file_path, dest_file_obj, dest_group_name)
+                    #g5505f_reader.select_file_reader(dest_group_name)(source_file_path, dest_file_obj, dest_group_name)
+                    stdout = __copy_file_in_group(path_to_output_file, source_file_path, dest_group_name, False)
+
+            # Update the progress bar and log the per-directory end message
+            instFoldermsdEnd = f'\nCompleted data transfer for instFolder: {group_name}\n'
+            utils.progressBar(dir_number, number_of_dirs, instFoldermsdEnd)
+            logging.info(instFoldermsdEnd)
+            dir_number = dir_number + 1
+
+    print('[End] Data integration')
+    logging.info('[End] Data integration')
+
+    if len(root_metadata_dict.keys()) > 0:
+        with h5py.File(path_to_output_file, mode='r+', track_order=True) as h5file:
+            for key, value in root_metadata_dict.items():
+                #if key in h5file.attrs:
+                #    del h5file.attrs[key]
+                h5file.attrs.create(key, value)
+        #annotate_root_dir(output_filename,root_metadata_dict)
+
+    #output_yml_filename_path = hdf5_vis.take_yml_snapshot_of_hdf5_file(output_filename)
+
+    return path_to_output_file #, output_yml_filename_path
+
 def create_hdf5_file_from_dataframe(ofilename, input_data, group_by_funcs: list, approach: str = None, extract_attrs_func=None):
     """
     Creates an HDF5 file with hierarchical groups based on the specified grouping functions or columns.
@@ -400,6 +525,6 @@ def save_processed_dataframe_to_hdf5(df, annotator, output_filename): # src_hdf5
 
     with h5py.File(output_filename, mode) as h5file:
         # Add project level attributes at the root/top level
        h5file.attrs.update(project_level_attributes)
-        __transfer_file_dict_to_hdf5(h5file, '/', file_dict)
+        hdf5_ops.save_file_dict_to_hdf5(h5file, '/', file_dict)
 
 #if __name__ == '__main__':
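
---

For reviewers, a minimal usage sketch of the relocated save_file_dict_to_hdf5. The demo file name, attribute values, and output path below are hypothetical; only the function, its '/' group usage (mirroring save_processed_dataframe_to_hdf5), and the file_dict layout come from this patch. It assumes the script runs from the repository root so the utils/src imports used here resolve:

    import numpy as np
    import h5py

    import src.hdf5_ops as hdf5_ops  # as imported in hdf5_writer.py above

    # file_dict mirrors the structure documented in save_file_dict_to_hdf5;
    # names and values are made up for illustration.
    file_dict = {
        'name': 'example_table.txt',
        'attributes_dict': {'instrument': 'demo_sensor'},
        'datasets': [
            {
                'name': 'data_table',
                'data': np.arange(12).reshape(3, 4),
                'shape': (3, 4),
                'attributes': {'units': 'counts'}  # optional per the docstring
            }
        ]
    }

    with h5py.File('demo_output.h5', 'w', track_order=True) as h5file:
        # The parent group lookup h5file[group_name] requires an existing group;
        # '/' always exists, matching the call in save_processed_dataframe_to_hdf5.
        status = hdf5_ops.save_file_dict_to_hdf5(h5file, '/', file_dict)

    assert status == 0  # 0 on success, -1 on failure

Note the design shift this sketch leans on: run_reader and the reworked __copy_file_in_group now receive path_to_output_file (a path) rather than an open h5py.File handle, so each transfer step opens the output file itself in 'r+' mode instead of sharing one handle held open across the whole directory walk.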