Improved modularity of hdf5_file creation by creating a function that copies the input directory and applies directory, file, and extension constraints before the regular directory-to-hdf5 transfer. See `def copy_directory_with_contraints(input_dir_path, output_dir_path, select_dir_keywords, select_file_keywords, allowed_file_extensions)`:

This commit is contained in:
2024-05-27 18:15:08 +02:00
parent 24a2d5d37e
commit 2911416431

View File

@ -16,7 +16,8 @@ import src.g5505_file_reader as g5505f_reader
import h5py
import yaml
import shutil
import logging
# Define mapping from extension to their file reader
ext_to_reader_dict = {'.ibw': g5505f_reader.read_xps_ibw_file_as_dict,
@ -228,17 +229,86 @@ def annotate_root_dir(filename,annotation_dict: dict):
def is_valid_directory_path(dirpath, select_dir_keywords):
    """
    Check whether a directory path satisfies the directory keyword constraints.

    Parameters:
        dirpath (str): Directory path to test.
        select_dir_keywords (list): Keywords that activate a path. A keyword
            containing os.sep matches only if every one of its components
            appears among dirpath's components; a single-word keyword matches
            as a plain substring of dirpath. An empty list accepts every path.

    Returns:
        bool: True if dirpath matches at least one keyword, or if the keyword
        list is empty; False otherwise.
    """
    # The original text carried two interleaved copies of this loop (diff
    # residue); only the guarded version is kept so the loop runs once.
    activated_keywords = []
    if select_dir_keywords:
        for item in select_dir_keywords:
            if len(item.split(os.sep)) > 1:
                # Multi-component keyword: every component must appear in dirpath.
                is_sublist = all(x in dirpath.split(os.sep) for x in item.split(os.sep))
                activated_keywords.append(is_sublist)
            else:
                # Single-word keyword: plain substring match.
                activated_keywords.append(item in dirpath)
    else:
        # No keywords means no filtering: every directory path is valid.
        activated_keywords.append(True)
    return any(activated_keywords)
def copy_directory_with_contraints(input_dir_path, output_dir_path, select_dir_keywords, select_file_keywords, allowed_file_extensions):
    """
    Copy files from input_dir_path to output_dir_path, preserving the relative
    directory layout, subject to directory, filename-keyword, and extension
    constraints.

    Parameters:
        input_dir_path (str): Path to the input directory.
        output_dir_path (str): Path to the output directory.
        select_dir_keywords (list): Keywords for selecting top-level
            subdirectories to walk; when empty, the whole input tree is walked.
        select_file_keywords (list): Keywords a filename must contain to be
            copied; when empty, all filenames are accepted.
        allowed_file_extensions (list): File extensions eligible for copying.

    Returns:
        dict: Maps each created target directory path to the list of file
        names copied into it. Directories with no admissible files are
        neither created nor listed.
    """
    date = utils.created_at()
    log_dir = 'logs/'
    utils.setup_logging(log_dir, f"copy_directory_with_contraints_{date}.log")

    def _is_admissible(fname):
        # A file qualifies when its extension is allowed and, if filename
        # keywords were given, at least one keyword occurs in its name.
        if os.path.splitext(fname)[1] not in allowed_file_extensions:
            return False
        if not select_file_keywords:
            return True
        return any(keyword in fname for keyword in select_file_keywords)

    # Restrict the walk to top-level entries matching a directory keyword;
    # with no keywords, walk the entire input tree from its root.
    if select_dir_keywords:
        walk_roots = [os.path.join(input_dir_path, entry)
                      for entry in os.listdir(input_dir_path)
                      if any(entry in keyword for keyword in select_dir_keywords)]
    else:
        walk_roots = [input_dir_path]

    path_to_files_dict = {}  # target directory path -> copied file names
    for root in walk_roots:
        for dirpath, _, filenames in os.walk(root, topdown=False):
            selected = [fname for fname in filenames if _is_admissible(fname)]
            if not selected:
                # Skip creating empty directories in the output tree.
                continue
            relative = os.path.relpath(dirpath, input_dir_path)
            target_dirpath = os.path.join(output_dir_path, relative)
            path_to_files_dict[target_dirpath] = selected
            os.makedirs(target_dirpath, exist_ok=True)
            for fname in selected:
                source_path = os.path.join(dirpath, fname)
                destination_path = os.path.join(target_dirpath, fname)
                try:
                    shutil.copy2(source_path, destination_path)
                except Exception as e:
                    logging.error("Failed to copy %s: %s", source_path, e)
    return path_to_files_dict
def create_hdf5_file_from_filesystem_path(output_filename : str,
input_file_system_path : str,
select_dir_keywords = [],
@ -247,34 +317,32 @@ def create_hdf5_file_from_filesystem_path(output_filename : str,
root_metadata_dict : dict = {}):
"""
Creates an .h5 file with name "output_filename" that preserves the directory tree (or folder structure) of given a filesystem path.
When file and directory keywords are non-empty, the keywords enable filtering of directory paths and file paths that do not contain the specified keywords.
The data integration capabilities are limited by our file reader, which can only access data from a list of admissible file formats.
These however can be extended.
Directories are groups in the resultsing hdf5 file.
Files are formatted as composite object consisting of a group, file, and attributes.
Creates an .h5 file with name "output_filename" that preserves the directory tree (or folder structure)
of a given filesystem path. When file and directory keywords are non-empty, the keywords enable filtering
of directory paths and file paths that do not contain the specified keywords.
The data integration capabilities are limited by our file reader, which can only access data from a list of
admissible file formats. These, however, can be extended. Directories are groups in the resulting HDF5 file.
Files are formatted as composite objects consisting of a group, file, and attributes.
Parameters:
output_filename (str): Name of the output HDF5 file.
input_file_system_path (str): Path to root directory, specified with forward slashes, e.g., path/to/root.
select_dir_keywords (list): List of string elements to consider or select only directory paths that contain
a word in 'select_dir_keywords'. When empty, all directory paths are considered
to be included in the HDF5 file group hierarchy.
select_file_keywords (list): List of string elements to consider or select only files that contain a word in
'select_file_keywords'. When empty, all files are considered to be stored in the HDF5 file.
top_sub_dir_mask (bool): Mask for top-level subdirectories.
root_metadata_dict (dict): Metadata to include at the root level of the HDF5 file.
ofilename (str):
input_file_system_path (str) : path to root directory, specified with forwards slashes, e.g., path/to/root
select_dir_keywords (list): default value [],
list of string elements to consider or select only directory paths that contain a word in 'select_dir_keywords'.
When empty, all directory paths are considered to be included in the hdf5 file group hierarchy.
select_file_keywords (list): default value [],
list of string elements to consider or select only files that contain a word in 'select_file_keywords'.
When empty, all files are considered to be stored in the hdf5 file.
Returns:
str: Path to the created HDF5 file.
"""
admissible_file_ext_list = list(ext_to_reader_dict.keys()) # list(config_file.select_file_readers(group_id).keys())
allowed_file_extensions = list(ext_to_reader_dict.keys()) # list(config_file.select_file_readers(group_id).keys())
if '/' in input_file_system_path:
input_file_system_path = input_file_system_path.replace('/',os.sep)
@ -284,136 +352,83 @@ def create_hdf5_file_from_filesystem_path(output_filename : str,
for i, keyword in enumerate(select_dir_keywords):
select_dir_keywords[i] = keyword.replace('/',os.sep)
# Visit each subdirectory from top to bottom, root directory defined by input_file_sytem_path to the lower
# level directories.
# Constrain walkable paths on the specified directory tree by allowing walks that start from root
# through subdirectories specified by dir_keywords. This improves efficiency especially, in deep
# directory trees with many leaves.
paths = []
if top_sub_dir_mask:
for item in os.listdir(input_file_system_path):
if any([item in keyword for keyword in select_dir_keywords]):
paths.append(os.path.join(input_file_system_path,item))
else:
paths.append(input_file_system_path)
# Copy input_directory into the output_dir_path, and work with it from now on
output_dir_path = os.path.splitext(output_filename)[0].replace('/',os.sep)
path_to_filenames_dict = copy_directory_with_contraints(input_file_system_path,
output_dir_path,
select_dir_keywords,
select_file_keywords,
allowed_file_extensions)
# Set input_directory as copied input directory
root_dir = output_dir_path
with h5py.File(output_filename, 'w') as h5file:
for item in paths:
for dirpath, filtered_filenames_list in path_to_filenames_dict.items():
# Check if dirpath is valid. TODO: This is perhaps redundant by design of path_to_filenames_dict.
if not is_valid_directory_path(dirpath,select_dir_keywords):
continue
# Check if filtered_filenames_list is nonempty. TODO: This is perhaps redundant by design of path_to_filenames_dict.
if not filtered_filenames_list:
continue
root_dir = input_file_system_path
group_name = dirpath.replace(os.sep,'/')
group_name = group_name.replace(root_dir.replace(os.sep,'/') + '/', '/')
# Create dictionary with directory-files pairs where files satisfy keyword and admisible type contraints
# It requires an extra pass over directory three and additional memory for dictionary, but it may be useful
# to speed up subsequent step and prune resulting directory tree.
# Flatten group name to one level
offset = sum([len(i.split(os.sep)) if i in dirpath else 0 for i in select_dir_keywords])
tmp_list = group_name.split('/')
if len(tmp_list) > offset+1:
group_name = '/'.join([tmp_list[i] for i in range(offset+1)])
# For each directory and/or subdirectory, keep files that satisfy file_keyword constraints, and store
# (directory_path, suitable files) relationships in a dictionary.
file_paths_dict = {}
# Group hierarchy is implicitly defined by the forward slashes
if not group_name in h5file.keys():
h5file.create_group(group_name)
#h5file[group_name].attrs.create(name='filtered_file_list',data=convert_string_to_bytes(filtered_filename_list))
#h5file[group_name].attrs.create(name='file_list',data=convert_string_to_bytes(filenames_list))
else:
print(group_name,' was already created.')
#check_file_ext = lambda filename: any([ext in filename for ext in admissible_file_ext_list])
check_file_ext = lambda filename: os.path.splitext(filename)[1] in admissible_file_ext_list
for filenumber, filename in enumerate(filtered_filenames_list):
file_name, file_ext = os.path.splitext(filename)
#try:
if not 'h5' in filename:
#file_dict = config_file.select_file_readers(group_id)[file_ext](os.path.join(dirpath,filename))
file_dict = ext_to_reader_dict[file_ext](os.path.join(dirpath,filename))
for dirpath, _, filenames in os.walk(item,topdown=False):
file_paths_dict[dirpath] = []
# Keep files that have an admissible extension and store them in admissible_filenames list
admissible_filenames = []
for fn in filenames:
if check_file_ext(fn):
admissible_filenames.append(fn)
if select_file_keywords: # when select_file_keywords = [], all files are considered
#for filename in admissible_filenames:
for i in range(len(admissible_filenames) - 1, -1, -1):
filename = admissible_filenames[i]
# Remove files that with filename, not adhering to file keyword constraints.
if not any(keyword in filename for keyword in select_file_keywords):
admissible_filenames.pop(i)
file_paths_dict[dirpath] = admissible_filenames
else:
file_paths_dict[dirpath] = admissible_filenames
for node_number, node in enumerate(os.walk(item, topdown=True)):
dirpath, dirnames, filenames_list = node
filtered_filename_list = file_paths_dict.get(dirpath,filenames_list.copy())
# Skip subdirectories that do not contain a keyword in the parameter 'select_dir_keywords' when it is nonempty
if select_dir_keywords:
#if (dirpath.count(os.sep) > offset) and not any([item in dirpath for item in select_dir_keywords]):
#tail, dirname = os.path.split(dirpath)
#if not any([item in dirname for item in select_dir_keywords]):
#if not any([item in dirpath for item in select_dir_keywords]):
if not is_valid_directory_path(dirpath,select_dir_keywords):
if not file_dict:
continue
group_name = dirpath.replace(os.sep,'/')
group_name = group_name.replace(root_dir.replace(os.sep,'/') + '/', '/')
# flatten group name to one level
offset = sum([len(i.split(os.sep)) if i in dirpath else 0 for i in select_dir_keywords])
tmp_list = group_name.split('/')
if len(tmp_list) > offset+1:
group_name = '/'.join([tmp_list[i] for i in range(offset+1)])
# Group hierarchy is implicitly defined by the forward slashes
if not group_name in h5file.keys():
h5file.create_group(group_name)
h5file[group_name].attrs.create(name='filtered_file_list',data=filtered_filename_list)
h5file[group_name].attrs.create(name='file_list',data=filenames_list)
else:
print(group_name,' was already created.')
for filenumber, filename in enumerate(filtered_filename_list):
try:
# Create group and add their attributes
h5file[group_name].create_group(name=file_dict['name'])
# Add group attributes
h5file[group_name][file_dict['name']].attrs.update(file_dict['attributes_dict'])
# Add datasets to just created group
for dataset in file_dict['datasets']:
h5file[group_name][file_dict['name']].create_dataset(name = dataset['name'],
data = dataset['data'],
#dtype = file_dict['dtype'],
shape = dataset['shape'])
# Add dataset's attributes
attributes = dataset.get('attributes', {})
h5file[group_name][file_dict['name']][dataset['name']].attrs.update(attributes)
except Exception as inst:
# TODO: log when a file could not be stored as a dataset
print(inst)
# Get file extension (or file type)
file_name, file_ext = os.path.splitext(filename)
#print(filename)
#try:
if not 'h5' in filename:
#file_dict = config_file.select_file_readers(group_id)[file_ext](os.path.join(dirpath,filename))
file_dict = ext_to_reader_dict[file_ext](os.path.join(dirpath,filename))
if not file_dict:
continue
try:
# Create group and add their attributes
h5file[group_name].create_group(name=file_dict['name'])
# Add group attributes
h5file[group_name][file_dict['name']].attrs.update(file_dict['attributes_dict'])
# Add datasets to just created group
for dataset in file_dict['datasets']:
h5file[group_name][file_dict['name']].create_dataset(name = dataset['name'],
data = dataset['data'],
#dtype = file_dict['dtype'],
shape = dataset['shape'])
# Add dataset's attributes
attributes = dataset.get('attributes', {})
h5file[group_name][file_dict['name']][dataset['name']].attrs.update(attributes)
except Exception as inst:
# TODO: log when a file could not be stored as a dataset
print(inst)
else:
ext_to_reader_dict[file_ext](source_file_path = os.path.join(dirpath,filename),
dest_file_obj = h5file,
dest_group_name = group_name +'/'+filename)
#print(filename,file_ext, ':)')
else:
ext_to_reader_dict[file_ext](source_file_path = os.path.join(dirpath,filename),
dest_file_obj = h5file,
dest_group_name = group_name +'/'+filename)
#print(filename,file_ext, ':)')
progressBar(filenumber,len(filtered_filename_list), 'Uploading files in ' + dirpath)
progressBar(filenumber,len(filtered_filenames_list), 'Uploading files in ' + dirpath)
if len(root_metadata_dict.keys())>0: