From 2911416431fa383fd6481df6922c296100594325 Mon Sep 17 00:00:00 2001 From: Florez Ospina Juan Felipe Date: Mon, 27 May 2024 18:15:08 +0200 Subject: [PATCH] =?UTF-8?q?Improved=20modularity=20of=20hdf5=5Ffile=20crea?= =?UTF-8?q?tion=20by=20creating=20a=20function=20that=20copies=20the=20int?= =?UTF-8?q?put=20directory=20file=20and=20applies=20directory,=20files,=20?= =?UTF-8?q?and=20extensions=20constraints=20before=20regular=20directory?= =?UTF-8?q?=20to=20hdf5=20transfer.=20See=20def=20copy=5Fdirectory?= =?UTF-8?q?=5Fwith=5Fcontraints(input=5Fdir=5Fpath,=20output=5Fdir=5Fpath,?= =?UTF-8?q?=20select=5Fdir=5Fkeywords,=20select=5Ffile=5Fkeywords,=20allow?= =?UTF-8?q?ed=5Ffile=5Fextensions):?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/hdf5_lib.py | 305 +++++++++++++++++++++++++----------------------- 1 file changed, 160 insertions(+), 145 deletions(-) diff --git a/src/hdf5_lib.py b/src/hdf5_lib.py index 705205b..fd6fce8 100644 --- a/src/hdf5_lib.py +++ b/src/hdf5_lib.py @@ -16,7 +16,8 @@ import src.g5505_file_reader as g5505f_reader import h5py import yaml - +import shutil +import logging # Define mapping from extension to their file reader ext_to_reader_dict = {'.ibw': g5505f_reader.read_xps_ibw_file_as_dict, @@ -228,17 +229,86 @@ def annotate_root_dir(filename,annotation_dict: dict): def is_valid_directory_path(dirpath,select_dir_keywords): activated_keywords = [] - for item in select_dir_keywords: - if len(item.split(os.sep))>1: - is_sublist = all([x in dirpath.split(os.sep) for x in item.split(os.sep)]) - activated_keywords.append(is_sublist) - else: - activated_keywords.append(item in dirpath) + if select_dir_keywords: + for item in select_dir_keywords: + if len(item.split(os.sep))>1: + is_sublist = all([x in dirpath.split(os.sep) for x in item.split(os.sep)]) + activated_keywords.append(is_sublist) + else: + activated_keywords.append(item in dirpath) + else: + activated_keywords.append(True) return 
any(activated_keywords) +def copy_directory_with_contraints(input_dir_path, output_dir_path, select_dir_keywords, select_file_keywords, allowed_file_extensions): + """ + Copies files from input_dir_path to output_dir_path based on specified constraints. + + Parameters: + input_dir_path (str): Path to the input directory. + output_dir_path (str): Path to the output directory. + select_dir_keywords (list): List of keywords for selecting directories. + select_file_keywords (list): List of keywords for selecting files. + allowed_file_extensions (list): List of allowed file extensions. + + Returns: + path_to_files_dict (dict): dictionary mapping directory paths to lists of copied file names satisfying the constraints. + """ + + date = utils.created_at() + log_dir='logs/' + utils.setup_logging(log_dir, f"copy_directory_with_contraints_{date}.log") + + def has_allowed_extension(filename): + return os.path.splitext(filename)[1] in allowed_file_extensions + + def file_is_selected(filename): + return any(keyword in filename for keyword in select_file_keywords) if select_file_keywords else True + + + # Collect paths of directories, which are directly connected to the root dir and match select_dir_keywords + paths = [] + if select_dir_keywords: + for item in os.listdir(input_dir_path): #Path(input_dir_path).iterdir(): + if any([item in keyword for keyword in select_dir_keywords]): + paths.append(os.path.join(input_dir_path,item)) + else: + paths.append(input_dir_path) #paths.append(Path(input_dir_path)) + + ROOT_DIR = input_dir_path + path_to_files_dict = {} # Dictionary to store directory-file pairs satisfying constraints + + for subpath in paths: + + for dirpath, _, filenames in os.walk(subpath,topdown=False): + + # Reduce filenames to those that are admissible + admissible_filenames = [filename for filename in filenames if has_allowed_extension(filename) and file_is_selected(filename)] + + if admissible_filenames: # Only create directory if there are files to copy + + + 
relative_dirpath = os.path.relpath(dirpath, ROOT_DIR) + target_dirpath = os.path.join(output_dir_path, relative_dirpath) + #path_to_files_dict[dirpath] = admissible_filenames + path_to_files_dict[target_dirpath] = admissible_filenames + os.makedirs(target_dirpath, exist_ok=True) + + for filename in admissible_filenames: + src_file_path = os.path.join(dirpath, filename) + dest_file_path = os.path.join(target_dirpath, filename) + try: + shutil.copy2(src_file_path, dest_file_path) + except Exception as e: + logging.error("Failed to copy %s: %s", src_file_path, e) + + return path_to_files_dict + + + def create_hdf5_file_from_filesystem_path(output_filename : str, input_file_system_path : str, select_dir_keywords = [], @@ -247,34 +317,32 @@ def create_hdf5_file_from_filesystem_path(output_filename : str, root_metadata_dict : dict = {}): """ - Creates an .h5 file with name "output_filename" that preserves the directory tree (or folder structure) of given a filesystem path. - When file and directory keywords are non-empty, the keywords enable filtering of directory paths and file paths that do not contain the specified keywords. - The data integration capabilities are limited by our file reader, which can only access data from a list of admissible file formats. - These however can be extended. - Directories are groups in the resultsing hdf5 file. - Files are formatted as composite object consisting of a group, file, and attributes. + Creates an .h5 file with name "output_filename" that preserves the directory tree (or folder structure) + of a given filesystem path. When file and directory keywords are non-empty, the keywords enable filtering + of directory paths and file paths that do not contain the specified keywords. + + The data integration capabilities are limited by our file reader, which can only access data from a list of + admissible file formats. These, however, can be extended. Directories are groups in the resulting HDF5 file. 
+ Files are formatted as composite objects consisting of a group, file, and attributes. Parameters: + output_filename (str): Name of the output HDF5 file. + input_file_system_path (str): Path to root directory, specified with forward slashes, e.g., path/to/root. + select_dir_keywords (list): List of string elements to consider or select only directory paths that contain + a word in 'select_dir_keywords'. When empty, all directory paths are considered + to be included in the HDF5 file group hierarchy. + select_file_keywords (list): List of string elements to consider or select only files that contain a word in + 'select_file_keywords'. When empty, all files are considered to be stored in the HDF5 file. + top_sub_dir_mask (bool): Mask for top-level subdirectories. + root_metadata_dict (dict): Metadata to include at the root level of the HDF5 file. - ofilename (str): - - input_file_system_path (str) : path to root directory, specified with forwards slashes, e.g., path/to/root - - select_dir_keywords (list): default value [], - list of string elements to consider or select only directory paths that contain a word in 'select_dir_keywords'. - When empty, all directory paths are considered to be included in the hdf5 file group hierarchy. - - select_file_keywords (list): default value [], - list of string elements to consider or select only files that contain a word in 'select_file_keywords'. - When empty, all files are considered to be stored in the hdf5 file. - Returns: - + str: Path to the created HDF5 file. 
""" - admissible_file_ext_list = list(ext_to_reader_dict.keys()) # list(config_file.select_file_readers(group_id).keys()) + allowed_file_extensions = list(ext_to_reader_dict.keys()) # list(config_file.select_file_readers(group_id).keys()) if '/' in input_file_system_path: input_file_system_path = input_file_system_path.replace('/',os.sep) @@ -284,136 +352,83 @@ def create_hdf5_file_from_filesystem_path(output_filename : str, for i, keyword in enumerate(select_dir_keywords): select_dir_keywords[i] = keyword.replace('/',os.sep) - # Visit each subdirectory from top to bottom, root directory defined by input_file_sytem_path to the lower - # level directories. - - # Constrain walkable paths on the specified directory tree by allowing walks that start from root - # through subdirectories specified by dir_keywords. This improves efficiency especially, in deep - # directory trees with many leaves. - paths = [] - if top_sub_dir_mask: - for item in os.listdir(input_file_system_path): - if any([item in keyword for keyword in select_dir_keywords]): - paths.append(os.path.join(input_file_system_path,item)) - else: - paths.append(input_file_system_path) + # Copy input_directory into the output_dir_path, and work with it from now on + output_dir_path = os.path.splitext(output_filename)[0].replace('/',os.sep) + path_to_filenames_dict = copy_directory_with_contraints(input_file_system_path, + output_dir_path, + select_dir_keywords, + select_file_keywords, + allowed_file_extensions) + # Set input_directory as copied input directory + root_dir = output_dir_path with h5py.File(output_filename, 'w') as h5file: - for item in paths: + for dirpath, filtered_filenames_list in path_to_filenames_dict.items(): + + # Check if dirpath is valid. TODO: This is perhaps redundant by design of path_to_filenames_dict. + if not is_valid_directory_path(dirpath,select_dir_keywords): + continue + # Check if filtered_filenames_list is nonempty. 
TODO: This is perhaps redundant by design of path_to_filenames_dict. + if not filtered_filenames_list: + continue - root_dir = input_file_system_path + group_name = dirpath.replace(os.sep,'/') + group_name = group_name.replace(root_dir.replace(os.sep,'/') + '/', '/') - # Create dictionary with directory-files pairs where files satisfy keyword and admisible type contraints - # It requires an extra pass over directory three and additional memory for dictionary, but it may be useful - # to speed up subsequent step and prune resulting directory tree. + # Flatten group name to one level + offset = sum([len(i.split(os.sep)) if i in dirpath else 0 for i in select_dir_keywords]) + tmp_list = group_name.split('/') + if len(tmp_list) > offset+1: + group_name = '/'.join([tmp_list[i] for i in range(offset+1)]) - # For each directory and/or subdirectory, keep files that satisfy file_keyword constraints, and store - # (directory_path, suitable files) relationships in a dictionary. - file_paths_dict = {} + # Group hierarchy is implicitly defined by the forward slashes + if not group_name in h5file.keys(): + h5file.create_group(group_name) + #h5file[group_name].attrs.create(name='filtered_file_list',data=convert_string_to_bytes(filtered_filename_list)) + #h5file[group_name].attrs.create(name='file_list',data=convert_string_to_bytes(filenames_list)) + else: + print(group_name,' was already created.') - #check_file_ext = lambda filename: any([ext in filename for ext in admissible_file_ext_list]) - check_file_ext = lambda filename: os.path.splitext(filename)[1] in admissible_file_ext_list + for filenumber, filename in enumerate(filtered_filenames_list): + + file_name, file_ext = os.path.splitext(filename) + #try: + if not 'h5' in filename: + #file_dict = config_file.select_file_readers(group_id)[file_ext](os.path.join(dirpath,filename)) + file_dict = ext_to_reader_dict[file_ext](os.path.join(dirpath,filename)) - for dirpath, _, filenames in os.walk(item,topdown=False): - 
file_paths_dict[dirpath] = [] - - # Keep files that have an admissible extension and store them in admissible_filenames list - admissible_filenames = [] - for fn in filenames: - if check_file_ext(fn): - admissible_filenames.append(fn) - - if select_file_keywords: # when select_file_keywords = [], all files are considered - #for filename in admissible_filenames: - for i in range(len(admissible_filenames) - 1, -1, -1): - filename = admissible_filenames[i] - - # Remove files that with filename, not adhering to file keyword constraints. - if not any(keyword in filename for keyword in select_file_keywords): - admissible_filenames.pop(i) - - file_paths_dict[dirpath] = admissible_filenames - else: - file_paths_dict[dirpath] = admissible_filenames - - for node_number, node in enumerate(os.walk(item, topdown=True)): - - dirpath, dirnames, filenames_list = node - - filtered_filename_list = file_paths_dict.get(dirpath,filenames_list.copy()) - - - # Skip subdirectories that do not contain a keyword in the parameter 'select_dir_keywords' when it is nonempty - if select_dir_keywords: - #if (dirpath.count(os.sep) > offset) and not any([item in dirpath for item in select_dir_keywords]): - #tail, dirname = os.path.split(dirpath) - #if not any([item in dirname for item in select_dir_keywords]): - #if not any([item in dirpath for item in select_dir_keywords]): - if not is_valid_directory_path(dirpath,select_dir_keywords): + if not file_dict: continue - group_name = dirpath.replace(os.sep,'/') - group_name = group_name.replace(root_dir.replace(os.sep,'/') + '/', '/') - - # flatten group name to one level - offset = sum([len(i.split(os.sep)) if i in dirpath else 0 for i in select_dir_keywords]) - tmp_list = group_name.split('/') - if len(tmp_list) > offset+1: - group_name = '/'.join([tmp_list[i] for i in range(offset+1)]) - - # Group hierarchy is implicitly defined by the forward slashes - if not group_name in h5file.keys(): - h5file.create_group(group_name) - 
h5file[group_name].attrs.create(name='filtered_file_list',data=filtered_filename_list) - h5file[group_name].attrs.create(name='file_list',data=filenames_list) - else: - print(group_name,' was already created.') - - - for filenumber, filename in enumerate(filtered_filename_list): + try: + # Create group and add their attributes + h5file[group_name].create_group(name=file_dict['name']) + # Add group attributes + h5file[group_name][file_dict['name']].attrs.update(file_dict['attributes_dict']) + + # Add datasets to just created group + for dataset in file_dict['datasets']: + h5file[group_name][file_dict['name']].create_dataset(name = dataset['name'], + data = dataset['data'], + #dtype = file_dict['dtype'], + shape = dataset['shape']) + + # Add dataset's attributes + attributes = dataset.get('attributes', {}) + h5file[group_name][file_dict['name']][dataset['name']].attrs.update(attributes) + except Exception as inst: + # TODO: log when a file could not be stored as a dataset + print(inst) - # Get file extension (or file type) - file_name, file_ext = os.path.splitext(filename) - - #print(filename) - - #try: - if not 'h5' in filename: - #file_dict = config_file.select_file_readers(group_id)[file_ext](os.path.join(dirpath,filename)) - file_dict = ext_to_reader_dict[file_ext](os.path.join(dirpath,filename)) - - if not file_dict: - continue - - try: - # Create group and add their attributes - h5file[group_name].create_group(name=file_dict['name']) - # Add group attributes - h5file[group_name][file_dict['name']].attrs.update(file_dict['attributes_dict']) - - # Add datasets to just created group - for dataset in file_dict['datasets']: - h5file[group_name][file_dict['name']].create_dataset(name = dataset['name'], - data = dataset['data'], - #dtype = file_dict['dtype'], - shape = dataset['shape']) - - # Add dataset's attributes - attributes = dataset.get('attributes', {}) - h5file[group_name][file_dict['name']][dataset['name']].attrs.update(attributes) - except Exception as 
inst: - # TODO: log when a file could not be stored as a dataset - print(inst) - - else: - ext_to_reader_dict[file_ext](source_file_path = os.path.join(dirpath,filename), - dest_file_obj = h5file, - dest_group_name = group_name +'/'+filename) - #print(filename,file_ext, ':)') + else: + ext_to_reader_dict[file_ext](source_file_path = os.path.join(dirpath,filename), + dest_file_obj = h5file, + dest_group_name = group_name +'/'+filename) + #print(filename,file_ext, ':)') + - - progressBar(filenumber,len(filtered_filename_list), 'Uploading files in ' + dirpath) + progressBar(filenumber,len(filtered_filenames_list), 'Uploading files in ' + dirpath) if len(root_metadata_dict.keys())>0: