Renamed the main functions for clarity and transferred a few helper functions to the g5505_utils.py module.

This commit is contained in:
2024-02-13 16:40:52 +01:00
parent d62327ba25
commit 03dcc62f9a


@ -11,6 +11,9 @@ import plotly.graph_objects as go
from plotly.subplots import make_subplots
import g5505_file_reader
+ import g5505_utils as utils
+ import smog_chamber_group_reader
def read_mtable_as_dataframe(filename):
@ -100,12 +103,6 @@ def create_group_hierarchy(obj, df, columns):
# print('Here:',sub_df.shape)
create_group_hierarchy(group, sub_df, columns[1::])
- def is_callable_list(x : list):
-     return all([callable(item) for item in x])
- def is_str_list(x : list):
-     return all([isinstance(item,str) for item in x])
def is_nested_hierarchy(df) -> bool:
"""receives a dataframe with categorical columns and checks whether rows form a nested group hierarchy.
That is, from bottom to top, subsequent hierarchical levels contain nested groups. The lower level groups belong to exactly one group in the higher level group.
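For intuition, a minimal sketch of inputs this check is meant to accept or reject, assuming the 'level_i_groups' column naming used further down in this file:

import pandas as pd

# Nested: each level-1 group ('a1', 'a2', 'b1') belongs to exactly one level-0 group.
nested_df = pd.DataFrame({'level_0_groups': ['A', 'A', 'B'],
                          'level_1_groups': ['a1', 'a2', 'b1']})
# Not nested: group 'x' appears under both 'A' and 'B'.
non_nested_df = pd.DataFrame({'level_0_groups': ['A', 'B'],
                              'level_1_groups': ['x', 'x']})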
@ -148,7 +145,11 @@ def get_parent_child_relationships(file: h5py.File):
nodes = ['/']
parent = ['']
#values = [file.attrs['count']]
- values = [len(file.attrs['file_list'])]
+ # TODO: maybe we should make this more general and not dependent on file_list attribute?
+ if 'file_list' in file.attrs.keys():
+     values = [len(file.attrs['file_list'])]
+ else:
+     values = [1]
def node_visitor(name,obj):
#if isinstance(obj,h5py.Group):
@ -156,11 +157,10 @@ def get_parent_child_relationships(file: h5py.File):
parent.append(obj.parent.name)
#nodes.append(os.path.split(obj.name)[1])
#parent.append(os.path.split(obj.parent.name)[1])
- if isinstance(obj,h5py.Dataset):
+ if isinstance(obj,h5py.Dataset) or not 'file_list' in obj.attrs.keys():
values.append(1)
else:
values.append(len(obj.attrs['file_list']))
file.visititems(node_visitor)
return nodes, parent, values
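The three parallel lists line up one-to-one with plotly's treemap inputs; a minimal sketch of how they could be displayed, assuming this is roughly what display_group_hierarchy_on_a_treemap does:

import h5py
import plotly.graph_objects as go

# 'test_sls_data.h5' is the file produced by main_5505 further down.
with h5py.File('test_sls_data.h5', 'r') as file:
    nodes, parent, values = get_parent_child_relationships(file)
fig = go.Figure(go.Treemap(labels=nodes, parents=parent, values=values))
fig.show()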
@ -233,48 +233,87 @@ def annotate_root_dir(filename,annotation_dict: dict):
file.attrs.create('metadata_'+key, annotation_dict[key])
+ import shutil
- def create_hdf5_file_from_filesystem_path(ofilename,input_file_system_path, include_list = []):
+ def create_hdf5_file_from_filesystem_path(ofilename : str, input_file_system_path : str, select_dir_keywords = [], select_file_keywords =[]):
"""
Creates an .h5 file named ofilename that preserves the directory tree (or folder structure) of a given filesystem path,
subject to a few file and directory keywords. The keywords filter out directories and files that do not contain them.
In the .h5 file, only files with admissible file formats are stored as datasets and attributes.
Parameters:
ofilename (str): name of the output .h5 file.
input_file_system_path (str): root of the directory tree to be mirrored in the .h5 file.
- include_list (optional)(list): list of string elements, which keeps all directory_paths containing any of the words (string values) in include_list.
- When left empty, all directory paths are considered to be included in the hdf5 file group hierarchy.
+ select_dir_keywords (list): default value [];
+ list of string elements used to select only directory paths that contain a word in 'select_dir_keywords'.
+ When empty, all directory paths are included in the hdf5 file group hierarchy.
+ select_file_keywords (list): default value [];
+ list of string elements used to select only files whose names contain a word in 'select_file_keywords'.
+ When empty, all files are stored in the hdf5 file.
Returns:
"""
with h5py.File(ofilename, 'w') as h5file:
root_dir = '?##'
- # loops over (or visits each) subdirectories from root directory defined by input_file_sytem_path to the lower
- #level subfolders
- for node_number, node in enumerate(os.walk(input_file_system_path,topdown=True)):
+ # Visit each subdirectory top-down, from the root directory defined by input_file_system_path down to the
+ # lowest-level directories.
+ for node_number, node in enumerate(os.walk(input_file_system_path, topdown=True)):
dirpath, dirnames, filenames_list = node
- # if include_list is nonempty, filter out any directory path that does not contain the key words in include_list.
- # TODO: explain better in fuction documentation
- if (node_number > 0) and (len(include_list) > 0):
-     if not any([item in dirpath for item in include_list]):
-         continue
+ if node_number == 0:
+     offset = dirpath.count(os.sep)
+ # Filter out files whose names do not contain a keyword specified in the parameter 'select_file_keywords'.
+ # When select_file_keywords is empty, i.e., [], do not apply any filter on the filenames.
+ if select_file_keywords:
+     filtered_filename_list = []
+     for filename in filenames_list:
+         if any([keyword in filename for keyword in select_file_keywords]):
+             filtered_filename_list.append(filename)
+ else:
+     filtered_filename_list = filenames_list.copy()
+ # Skip subdirectories that do not contain a keyword in the parameter 'select_dir_keywords' when it is nonempty
+ if select_dir_keywords:
+     if (dirpath.count(os.sep) > offset) and not any([item in dirpath for item in select_dir_keywords]):
+         continue
# TODO: I think the lines below can be simplified; given the enumeration, there may be no need for the conditionals
group_name = dirpath.replace(os.sep,'/')
if root_dir == '?##':
# Set root_dir to top directory path in input file system
root_dir = group_name
group_name = group_name.replace(root_dir,'/')
#h5file.attrs.create(name='count',data=len(filenames_list))
- h5file.attrs.create(name='file_list',data=filenames_list)
+ h5file.attrs.create(name='file_list',data=filtered_filename_list)
else:
group_name = group_name.replace(root_dir+'/','/')
# Group hierarchy is implicitly defined by the forward slashes
h5file.create_group(group_name)
- h5file[group_name].attrs.create(name='file_list',data=filenames_list)
+ h5file[group_name].attrs.create(name='file_list',data=filtered_filename_list)
# TODO: for each "admissible" file in filenames, create an associated dataset in the corresponding group (subdirectory)
- for filename in filenames_list:
+ tmp_dirpath = os.path.join(os.getcwd(), 'tmp')
+ if not os.path.exists(tmp_dirpath):
+     os.mkdir(tmp_dirpath)
+ for filename in filtered_filename_list:
if 'ibw' in filename:
file_dict = g5505_file_reader.read_xps_ibw_file_as_dict(os.path.join(dirpath,filename))
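Putting the two new keyword filters together, a usage sketch; the directory path and keywords are taken from main_5505 below, and the file keyword '0069069' is illustrative:

inputfile_dir = '\\\\fs101\\5505\\People\\Juan\\TypicalBeamTime'
# Keep only subdirectories whose paths mention NEXAFS or SES, and within them
# only files whose names contain '0069069'.
create_hdf5_file_from_filesystem_path('test_sls_data.h5',
                                      inputfile_dir,
                                      select_dir_keywords=['NEXAFS', 'SES'],
                                      select_file_keywords=['0069069'])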
@ -289,8 +328,43 @@ def create_hdf5_file_from_filesystem_path(ofilename,input_file_system_path, incl
for key in file_dict['attributes_dict'].keys():
h5file[group_name][file_dict['name']].attrs.create(name=key,data=file_dict['attributes_dict'][key])
if 'h5' in filename:
- def create_hdf5_file(ofilename, input_data, approach : str, group_by_funcs : list, extract_attrs_func = None):
+ # Create a copy of the original file to avoid possible file corruption, and work with the copy.
+ backup_filename = 'backup_'+filename
+ shutil.copy(os.path.join(dirpath,filename), os.path.join(tmp_dirpath,backup_filename))
+ # Open the backup h5 file and copy its complete filesystem hierarchy onto a group in h5file
+ with h5py.File(os.path.join(tmp_dirpath,backup_filename),'r') as src_file:
+     h5file.copy(source=src_file['/'],dest= group_name +'/'+filename)
+ # TODO: generalize to multiphase chemistry txt and dat files
+ # TODO: include header information from the files as well
+ if ('txt' in filename or 'TXT' in filename) and any([item in os.path.join(dirpath,filename) for item in ['smps','gas']]):
+     if 'smps' in os.path.join(dirpath,filename):
+         file_dict = smog_chamber_group_reader.read_smog_chamber_txt_files_as_dict(os.path.join(dirpath,filename),'smps')
+     elif 'gas' in os.path.join(dirpath,filename):
+         file_dict = smog_chamber_group_reader.read_smog_chamber_txt_files_as_dict(os.path.join(dirpath,filename),'gas')
+     # TODO: create datasets of compound data type to include variable/column names and datetime stamps
+     h5file[group_name].create_group(filename)
+     h5file[group_name][filename].create_dataset(name = 'data',
+                                                 data = file_dict['data'],
+                                                 #dtype = file_dict['dtype'],
+                                                 shape = file_dict['data'].shape)
+     h5file[group_name][filename].create_dataset(name = 'data_column_names',
+                                                 data = np.array(file_dict['data_column_names']),
+                                                 shape = np.array(file_dict['data_column_names']).shape)
+     for key in file_dict['categ_data_dict'].keys():
+         h5file[group_name][filename].create_dataset(name=key,data=file_dict['categ_data_dict'][key])
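One way the compound-data-type TODO above could be addressed with h5py, using a numpy structured dtype so that column names and timestamps travel with the data; a sketch under an assumed column layout, not what this commit implements:

import numpy as np
import h5py

# Hypothetical two-column table: a fixed-length timestamp string plus a measured value.
dt = np.dtype([('timestamp', 'S19'), ('value', 'f8')])
table = np.array([(b'2022-07-26T10:00:00', 1.5),
                  (b'2022-07-26T10:01:00', 1.7)], dtype=dt)
with h5py.File('compound_example.h5', 'w') as f:
    f.create_dataset('data', data=table)  # field names are stored in the dataset's dtype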
+ def create_hdf5_file_from_dataframe(ofilename, input_data, approach : str, group_by_funcs : list, extract_attrs_func = None):
""" Creates an hdf5 file with as many levels as indicated by len(group_by_funcs).
Top level denotes the root group/directory and bottom level denotes measurement level groups.
@ -321,7 +395,7 @@ def create_hdf5_file(ofilename, input_data, approach : str, group_by_funcs : lis
#df = pd.DataFrame(file_list,columns=['filename'])
- df = augment_with_filetype(df)
+ df = utils.augment_with_filetype(df)
elif isinstance(input_data,pd.DataFrame):
df = input_data.copy()
@ -329,12 +403,12 @@ def create_hdf5_file(ofilename, input_data, approach : str, group_by_funcs : lis
raise ValueError("input_data must be either a valid file-system path or a dataframe.")
#
- if is_callable_list(group_by_funcs):
+ if utils.is_callable_list(group_by_funcs):
grouping_cols = []
for i, func in enumerate(group_by_funcs):
grouping_cols.append('level_'+str(i)+'_groups')
df['level_'+str(i)+'_groups'] = func(df)
- elif is_str_list(group_by_funcs) and all([item in df.columns for item in group_by_funcs]):
+ elif utils.is_str_list(group_by_funcs) and all([item in df.columns for item in group_by_funcs]):
grouping_cols = group_by_funcs
else:
raise ValueError("'group_by_funcs' must be a list of callables (or str) that takes input_data as input an returns a valid categorical output.")
@ -391,79 +465,39 @@ def create_hdf5_file(ofilename, input_data, approach : str, group_by_funcs : lis
#return 0
- def augment_with_filetype(df):
-     df['filetype'] = [os.path.splitext(item)[1][1::] for item in df['filename']]
-     #return [os.path.splitext(item)[1][1::] for item in df['filename']]
-     return df
- def augment_with_filenumber(df):
-     df['filenumber'] = [item[0:item.find('_')] for item in df['filename']]
-     #return [item[0:item.find('_')] for item in df['filename']]
-     return df
- def group_by_df_column(df, column_name: str):
-     """
-     df (pandas.DataFrame):
-     column_name (str): column_name of df by which grouping operation will take place.
-     """
-     if not column_name in df.columns:
-         raise ValueError("column_name must be in the columns of df.")
-     return df[column_name]
- def split_sample_col_into_sample_and_data_quality_cols(input_data: pd.DataFrame):
-     sample_name = []
-     sample_quality = []
-     for item in input_data['sample']:
-         if item.find('(')!=-1:
-             #print(item)
-             sample_name.append(item[0:item.find('(')])
-             sample_quality.append(item[item.find('(')+1:len(item)-1])
-         else:
-             if item=='':
-                 sample_name.append('Not yet annotated')
-                 sample_quality.append('unevaluated')
-             else:
-                 sample_name.append(item)
-                 sample_quality.append('good data')
-     input_data['sample'] = sample_name
-     input_data['data_quality'] = sample_quality
-     return input_data
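The behaviour of the removed split helper, now expected to live in g5505_utils, illustrated on hypothetical sample strings:

import pandas as pd
import g5505_utils as utils

df = pd.DataFrame({'sample': ['Cu-foil(contaminated)', 'Au-ref', '']})
df = utils.split_sample_col_into_sample_and_data_quality_cols(df)
# df['sample']       -> ['Cu-foil', 'Au-ref', 'Not yet annotated']
# df['data_quality'] -> ['contaminated', 'good data', 'unevaluated']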
- def main1():
+ def main_5505():
inputfile_dir = '\\\\fs101\\5505\\People\\Juan\\TypicalBeamTime'
#ibw_file = loadibw(inputfile_dir+'\\SES\\0069069_N1s_495eV.ibw')
file_dict = g5505_file_reader.read_xps_ibw_file_as_dict(inputfile_dir+'\\SES\\0069069_N1s_495eV.ibw')
- group_by_type = lambda x : group_by_df_column(x,'filetype')
+ group_by_type = lambda x : utils.group_by_df_column(x,'filetype')
- include_list = ['NEXAFS', 'Notes', 'Photos', 'Pressure', 'RGA', 'SES']
+ select_dir_keywords = ['NEXAFS', 'Notes', 'Photos', 'Pressure', 'RGA', 'SES']
+ create_hdf5_file_from_filesystem_path('test_sls_data.h5',inputfile_dir,select_dir_keywords,select_file_keywords=[])
+ display_group_hierarchy_on_a_treemap('test_sls_data.h5')
#inputfile_dir = '\\\\fs03\\Iron_Sulphate'
#include_list = ['htof','ams', 'ptr', 'gas','smps']
- create_hdf5_file_from_filesystem_path('test3.h5',inputfile_dir,include_list)
- display_group_hierarchy_on_a_treemap('test3.h5')
#create_hdf5_file('test', inputfile_dir, 'Topdown', [group_by_type], extract_attrs_func = None)
- def main2():
+ def main_smog_chamber():
inputfile_dir = '\\\\fs03\\Iron_Sulphate'
- include_list = ['htof','ams', 'ptr', 'gas','smps']
+ include_list = ['gas','smps\\20220726','htof\\2022.07.26','ptr\\2022.07.26','ams\\2022.07.26']
+ select_date_list = ['20220726','2022.07.26']
+ create_hdf5_file_from_filesystem_path('test_smog_chamber_v5.h5',inputfile_dir,include_list,select_date_list)
display_group_hierarchy_on_a_treemap('test_smog_chamber_v5.h5')
def main_mtable_h5_from_dataframe():
# Read BeamTimeMetaData.h5, containing Thorsten's Matlab Table
input_data_df = read_mtable_as_dataframe('input_files\\BeamTimeMetaData.h5')
# Preprocess Thorsten's input_data dataframe so that it can be used to create a newer .h5 file
# under certain grouping specifications.
input_data_df = input_data_df.rename(columns = {'name':'filename'})
- input_data_df = augment_with_filenumber(input_data_df)
- input_data_df = augment_with_filetype(input_data_df)
- input_data_df = split_sample_col_into_sample_and_data_quality_cols(input_data_df)
+ input_data_df = utils.augment_with_filenumber(input_data_df)
+ input_data_df = utils.augment_with_filetype(input_data_df)
+ input_data_df = utils.split_sample_col_into_sample_and_data_quality_cols(input_data_df)
input_data_df['lastModifiedDatestr'] = input_data_df['lastModifiedDatestr'].astype('datetime64[s]')
# Define grouping functions to be passed into create_hdf5_file function. These can also be set
@ -471,15 +505,15 @@ def main2():
test_grouping_funcs = True
if test_grouping_funcs:
- group_by_sample = lambda x : group_by_df_column(x,'sample')
- group_by_type = lambda x : group_by_df_column(x,'filetype')
- group_by_filenumber = lambda x : group_by_df_column(x,'filenumber')
+ group_by_sample = lambda x : utils.group_by_df_column(x,'sample')
+ group_by_type = lambda x : utils.group_by_df_column(x,'filetype')
+ group_by_filenumber = lambda x : utils.group_by_df_column(x,'filenumber')
else:
group_by_sample = 'sample'
group_by_type = 'filetype'
group_by_filenumber = 'filenumber'
- create_hdf5_file('test.h5',input_data_df, 'top-down', group_by_funcs = [group_by_sample, group_by_type, group_by_filenumber])
+ create_hdf5_file_from_dataframe('test.h5',input_data_df, 'top-down', group_by_funcs = [group_by_sample, group_by_type, group_by_filenumber])
annotation_dict = {'Campaign name': 'SLS-Campaign-2023',
'Users':'Thorsten, Luca, Zoe',
@ -495,7 +529,7 @@ def main2():
if __name__ == '__main__':
- main1()
+ main_mtable_h5_from_dataframe()
print(':)')