Simplified code and corrected buggy if statement. Included input verification steps and OS path normalization.

This commit is contained in:
2024-03-19 11:11:05 +01:00
parent afa89df143
commit b886066133

View File

@ -92,14 +92,16 @@ def create_group_hierarchy(obj, df, columns):
if obj.name == '/':
obj.attrs.create('count',df.shape[0])
obj.attrs.create('file_list',df['filename'].tolist())
for group_name in unique_values:
group = obj.require_group(group_name)
group.attrs.create('column_name', columns[0])
group.attrs.create('column_name', columns[0])
sub_df = df[df[columns[0]]==group_name] # same as df.loc[df[columns[0]]==group_name,:]
group.attrs.create('count',sub_df.shape[0])
group.attrs.create('file_list',sub_df['filename'].tolist())
# if group_name == 'MgO powder,H2O,HCl':
# print('Here:',sub_df.shape)
@ -239,7 +241,10 @@ def annotate_root_dir(filename,annotation_dict: dict):
import shutil
def create_hdf5_file_from_filesystem_path(ofilename : str, input_file_system_path : str, select_dir_keywords = [], select_file_keywords =[]):
def create_hdf5_file_from_filesystem_path(ofilename : str, input_file_system_path :
str, select_dir_keywords = [],
select_file_keywords =[],
top_sub_dir_mask : bool = True):
"""
Creates an .h5 file with name ofilename that preserves the directory tree (or folder structure) of given a filesystem path and
@ -251,7 +256,7 @@ def create_hdf5_file_from_filesystem_path(ofilename : str, input_file_system_pat
ofilename (str):
input_file_system_path (str) :
input_file_system_path (str) : path to root directory, specified with forwards slashes, e.g., path/to/root
select_dir_keywords (list): default value [],
list of string elements to consider or select only directory paths that contain a word in 'select_dir_keywords'.
@ -266,93 +271,111 @@ def create_hdf5_file_from_filesystem_path(ofilename : str, input_file_system_pat
"""
# Ensure OS compliant paths and keywords
if '/' in input_file_system_path:
input_file_system_path = input_file_system_path.replace('/',os.sep)
else:
raise ValueError('input_file_system_path needs to be specified using forward slashes "/".' )
for i, keyword in enumerate(select_dir_keywords):
select_dir_keywords[i] = keyword.replace('/',os.sep)
with h5py.File(ofilename, 'w') as h5file:
root_dir = '?##'
# Visit each subdirectory from top to bottom, root directory defined by input_file_sytem_path to the lower
# level directories.
for node_number, node in enumerate(os.walk(input_file_system_path, topdown=True)):
dirpath, dirnames, filenames_list = node
# Constrain walkable paths on the specified directory tree by allowing walks that start from root
# through subdirectories specified by dir_keywords. This improves efficiency especially, in deep
# directory trees with many leaves.
paths = []
if top_sub_dir_mask:
for item in os.listdir(input_file_system_path):
if any([item in keyword for keyword in select_dir_keywords]):
paths.append(os.path.join(input_file_system_path,item))
else:
paths.append(input_file_system_path)
if node_number == 0:
offset = dirpath.count(os.sep)
# Filter out files with filenames not containing a keyword specified in the parameter 'select_file_keywords'.
# When select_file_keywords is an empty, i.e., [], do not apply any filter on the filenames.
for item in paths:
root_dir = input_file_system_path
for node_number, node in enumerate(os.walk(item, topdown=True)):
dirpath, dirnames, filenames_list = node
#if node_number == 0:
# offset = dirpath.count(os.sep)
filtered_filename_list = []
if select_file_keywords:
for filename in filenames_list:
if any([keyword in filename for keyword in select_file_keywords]):
filtered_filename_list.append(filename)
else:
filtered_filename_list = filenames_list.copy()
# Filter out files with filenames not containing a keyword specified in the parameter 'select_file_keywords'.
# When select_file_keywords is an empty, i.e., [], do not apply any filter on the filenames.
filtered_filename_list = []
if select_file_keywords:
for filename in filenames_list:
if any([keyword in filename for keyword in select_file_keywords]):
filtered_filename_list.append(filename)
else:
filtered_filename_list = filenames_list.copy()
admissible_file_ext_list = list(config_file.ext_to_reader_dict.keys())
admissible_file_ext_list = list(config_file.ext_to_reader_dict.keys())
for filename in filtered_filename_list.copy():
if not any([ext in filename for ext in admissible_file_ext_list]):
filtered_filename_list.remove(filename)
for filename in filtered_filename_list.copy():
if not any([ext in filename for ext in admissible_file_ext_list]):
filtered_filename_list.remove(filename)
# Skip subdirectories that do not contain a keyword in the parameter 'select_dir_keywords' when it is nonempty
if select_dir_keywords:
if (dirpath.count(os.sep) > offset) and not any([item in dirpath for item in select_dir_keywords]):
continue
# Skip subdirectories that do not contain a keyword in the parameter 'select_dir_keywords' when it is nonempty
if select_dir_keywords:
#if (dirpath.count(os.sep) > offset) and not any([item in dirpath for item in select_dir_keywords]):
if not any([item in dirpath for item in select_dir_keywords]):
continue
# TODO: i think the below lines can be simplified, or based on the enumeration there is no need for conditionals
group_name = dirpath.replace(os.sep,'/')
if root_dir == '?##':
# Set root_dir to top directory path in input file system
root_dir = group_name
group_name = group_name.replace(root_dir,'/')
h5file.attrs.create(name='filtered_file_list',data=filtered_filename_list)
h5file.attrs.create(name='file_list',data=filenames_list)
else:
group_name = group_name.replace(root_dir+'/','/')
group_name = dirpath.replace(os.sep,'/')
group_name = group_name.replace(root_dir.replace(os.sep,'/') + '/', '/')
# Group hierarchy is implicitly defined by the forward slashes
h5file.create_group(group_name)
h5file[group_name].attrs.create(name='filtered_file_list',data=filtered_filename_list)
h5file[group_name].attrs.create(name='file_list',data=filenames_list)
# TODO: for each "admissible" file in filenames, create an associated dataset in the corresponding group (subdirectory)
for filename in filtered_filename_list:
# TODO: for each "admissible" file in filenames, create an associated dataset in the corresponding group (subdirectory)
# Get file extension (or file type)
file_name, file_ext = os.path.splitext(filename)
#try:
if not 'h5' in filename:
file_dict = config_file.ext_to_reader_dict[file_ext](os.path.join(dirpath,filename))
if not file_dict:
continue
# file_dict = file_obj
# Create group and add their attributes
h5file[group_name].create_group(name=file_dict['name'])
for key in file_dict['attributes_dict'].keys():
h5file[group_name][file_dict['name']].attrs.create(name=key,data=file_dict['attributes_dict'][key])
# Add datasets to just created group
for dataset in file_dict['datasets']:
h5file[group_name][file_dict['name']].create_dataset(name = dataset['name'],
data = dataset['data'],
#dtype = file_dict['dtype'],
shape = dataset['shape'])
for filename in filtered_filename_list:
else:
config_file.ext_to_reader_dict[file_ext](source_file_path = os.path.join(dirpath,filename),
dest_file_obj = h5file,
dest_group_name = group_name +'/'+filename)
print(file_ext, ':)')
# Get file extension (or file type)
file_name, file_ext = os.path.splitext(filename)
#try:
if not 'h5' in filename:
file_dict = config_file.ext_to_reader_dict[file_ext](os.path.join(dirpath,filename))
if not file_dict:
continue
# file_dict = file_obj
# Create group and add their attributes
h5file[group_name].create_group(name=file_dict['name'])
for key in file_dict['attributes_dict'].keys():
h5file[group_name][file_dict['name']].attrs.create(name=key,data=file_dict['attributes_dict'][key])
# Add datasets to just created group
for dataset in file_dict['datasets']:
h5file[group_name][file_dict['name']].create_dataset(name = dataset['name'],
data = dataset['data'],
#dtype = file_dict['dtype'],
shape = dataset['shape'])
else:
config_file.ext_to_reader_dict[file_ext](source_file_path = os.path.join(dirpath,filename),
dest_file_obj = h5file,
dest_group_name = group_name +'/'+filename)
print(file_ext, ':)')
@ -489,22 +512,25 @@ def main_mtable_h5_from_dataframe():
if test_grouping_funcs:
group_by_sample = lambda x : utils.group_by_df_column(x,'sample')
group_by_type = lambda x : utils.group_by_df_column(x,'filetype')
group_by_filenumber = lambda x : utils.group_by_df_column(x,'filenumber')
#group_by_filenumber = lambda x : utils.group_by_df_column(x,'filenumber')
else:
group_by_sample = 'sample'
group_by_type = 'filetype'
group_by_filenumber = 'filenumber'
create_hdf5_file_from_dataframe('test.h5',input_data_df, 'top-down', group_by_funcs = [group_by_sample, group_by_type, group_by_filenumber])
output_filename_path = os.path.join(config_file.outputfile_dir,'thorsten_file_list.h5')
annotation_dict = {'Campaign name': 'SLS-Campaign-2023',
'Users':'Thorsten, Luca, Zoe',
'Startdate': str(input_data_df['lastModifiedDatestr'].min()),
'Enddate': str(input_data_df['lastModifiedDatestr'].max())
create_hdf5_file_from_dataframe(output_filename_path,input_data_df, 'top-down', group_by_funcs = [group_by_sample, group_by_type])
#create_hdf5_file_from_dataframe('test.h5',input_data_df, 'top-down', group_by_funcs = [group_by_sample, group_by_type, group_by_filenumber])
annotation_dict = {'1-Campaign name': '**SLS-Campaign-2023**',
'2-Users':'Thorsten, Luca, Zoe',
'3-Startdate': str(input_data_df['lastModifiedDatestr'].min()),
'4-Enddate': str(input_data_df['lastModifiedDatestr'].max())
}
annotate_root_dir('test.h5',annotation_dict)
annotate_root_dir(output_filename_path, annotation_dict)
display_group_hierarchy_on_a_treemap('test.h5')
display_group_hierarchy_on_a_treemap(output_filename_path)
print(':)')