diff --git a/g5505_utils.py b/g5505_utils.py deleted file mode 100644 index 103f529..0000000 --- a/g5505_utils.py +++ /dev/null @@ -1,51 +0,0 @@ -import pandas as pd -import os - - -def is_callable_list(x : list): - return all([callable(item) for item in x]) - -def is_str_list(x : list): - return all([isinstance(item,str) for item in x]) - -def augment_with_filetype(df): - df['filetype'] = [os.path.splitext(item)[1][1::] for item in df['filename']] - #return [os.path.splitext(item)[1][1::] for item in df['filename']] - return df - -def augment_with_filenumber(df): - df['filenumber'] = [item[0:item.find('_')] for item in df['filename']] - #return [item[0:item.find('_')] for item in df['filename']] - return df - -def group_by_df_column(df, column_name: str): - """ - df (pandas.DataFrame): - column_name (str): column_name of df by which grouping operation will take place. - """ - - if not column_name in df.columns: - raise ValueError("column_name must be in the columns of df.") - - return df[column_name] - -def split_sample_col_into_sample_and_data_quality_cols(input_data: pd.DataFrame): - - sample_name = [] - sample_quality = [] - for item in input_data['sample']: - if item.find('(')!=-1: - #print(item) - sample_name.append(item[0:item.find('(')]) - sample_quality.append(item[item.find('(')+1:len(item)-1]) - else: - if item=='': - sample_name.append('Not yet annotated') - sample_quality.append('unevaluated') - else: - sample_name.append(item) - sample_quality.append('good data') - input_data['sample'] = sample_name - input_data['data_quality'] = sample_quality - - return input_data diff --git a/hdf5_lib.py b/hdf5_lib.py deleted file mode 100644 index 1e83346..0000000 --- a/hdf5_lib.py +++ /dev/null @@ -1,535 +0,0 @@ -import pandas as pd -import h5py -import os -#import sys -#from itertools import product -import numpy as np - -import matplotlib.pyplot as plt -import plotly.express as px -import plotly.graph_objects as go -from plotly.subplots import make_subplots - -import g5505_file_reader -import g5505_utils as utils -import smog_chamber_group_reader - - -def read_mtable_as_dataframe(filename): - - """ Reconstruct a Matlab Table encoded in a .h5 file as a Pandas DataFrame. The input .h5 file - contains as many groups as rows in the Matlab Table, and each group stores dataset-like variables in the Table as - Datasets while categorical and numerical variables in the table are represented as attributes of each group. - - Note: DataFrame is constructed columnwise to ensure homogenous data columns. - - Parameters: - - filename (str): .h5 file's name. It may include location-path information. - - Returns: - - output_dataframe (pd.DataFrame): Matlab's Table as a Pandas DataFrame - - """ - - #contructs dataframe by filling out entries columnwise. This way we can ensure homogenous data columns""" - - with h5py.File(filename,'r') as file: - - # Define group's attributes and datasets. This should hold - # for all groups. TODO: implement verification and noncompliance error if needed. - group_list = list(file.keys()) - group_attrs = list(file[group_list[0]].attrs.keys()) - # - column_attr_names = [item[item.find('_')+1::] for item in group_attrs] - column_attr_names_idx = [int(item[4:(item.find('_'))]) for item in group_attrs] - - group_datasets = list(file[group_list[0]].keys()) if not 'DS_EMPTY' in file[group_list[0]].keys() else [] - # - column_dataset_names = [file[group_list[0]][item].attrs['column_name'] for item in group_datasets] - column_dataset_names_idx = [int(item[2:]) for item in group_datasets] - - - # Define data_frame as group_attrs + group_datasets - #pd_series_index = group_attrs + group_datasets - pd_series_index = column_attr_names + column_dataset_names - - output_dataframe = pd.DataFrame(columns=pd_series_index,index=group_list) - - tmp_col = [] - - for meas_prop in group_attrs + group_datasets: - if meas_prop in group_attrs: - column_label = meas_prop[meas_prop.find('_')+1:] - # Create numerical or categorical column from group's attributes - tmp_col = [file[group_key].attrs[meas_prop][()][0] for group_key in group_list] - else: - # Create dataset column from group's datasets - column_label = file[group_list[0] + '/' + meas_prop].attrs['column_name'] - #tmp_col = [file[group_key + '/' + meas_prop][()][0] for group_key in group_list] - tmp_col = [file[group_key + '/' + meas_prop][()] for group_key in group_list] - - output_dataframe.loc[:,column_label] = tmp_col - - return output_dataframe - -def create_group_hierarchy(obj, df, columns): - - """ - Input: - obj (h5py.File or h5py.Group) - columns (list of strs): denote categorical columns in df to be used to define hdf5 file group hierarchy - """ - - if not columns: - return - - # Determine categories associated with first categorical column - unique_values = df[columns[0]].unique() - - if obj.name == '/': - obj.attrs.create('count',df.shape[0]) - - for group_name in unique_values: - - group = obj.require_group(group_name) - group.attrs.create('column_name', columns[0]) - - sub_df = df[df[columns[0]]==group_name] # same as df.loc[df[columns[0]]==group_name,:] - group.attrs.create('count',sub_df.shape[0]) - - # if group_name == 'MgO powder,H2O,HCl': - # print('Here:',sub_df.shape) - create_group_hierarchy(group, sub_df, columns[1::]) - -def is_nested_hierarchy(df) -> bool: - """receives a dataframe with categorical columns and checks whether rows form a nested group hierarchy. - That is, from bottom to top, subsequent hierarchical levels contain nested groups. The lower level groups belong to exactly one group in the higher level group. - """ - # TODO: generalize the code to check for deeper group hierachies. - def are_nested(df, col, col_nxt): - """ Checks whether low level LL groups can be separated in terms of high level HL groups. - That is, elements of low-level groups do not belong to more than one HL group.""" - - # Compute higher level group names/categories - memberships = df[col_nxt].unique().tolist() - - # Compute upper-level group memberships of low-level groups - col_avg_memberships = df.groupby(col).mean()[col_nxt].unique() - - # Check whether all low-level groups have an actual hlg membership. That is, their avg. hlg membership is in the hlg membership. - return all([col_avg_memberships[group_idx] in memberships for group_idx in range(len(col_avg_memberships))]) - - df_tmp = df.copy() - - # Create relabeling map - for column_name in df_tmp.columns: - category_index = pd.Series(np.arange(len(df_tmp[column_name].unique())), index=df_tmp[column_name].unique()) - df_tmp[column_name] = category_index[df_tmp[column_name].tolist()].tolist() - - df_tmp.plot() - - return all([are_nested(df_tmp,'level_'+str(i)+'_groups','level_'+str(i+1)+'_groups') for i in range(len(df_tmp.columns)-1)]) - -def get_attr_names(input_data): - - # TODO: extend this to file-system paths - if not isinstance(input_data,pd.DataFrame): - raise ValueError("input_data must be a pd.DataFrame") - - return input_data.columns - -def get_parent_child_relationships(file: h5py.File): - - nodes = ['/'] - parent = [''] - #values = [file.attrs['count']] - # TODO: maybe we should make this more general and not dependent on file_list attribute? - if 'file_list' in file.attrs.keys(): - values = [len(file.attrs['file_list'])] - else: - values = [1] - - def node_visitor(name,obj): - #if isinstance(obj,h5py.Group): - nodes.append(obj.name) - parent.append(obj.parent.name) - #nodes.append(os.path.split(obj.name)[1]) - #parent.append(os.path.split(obj.parent.name)[1]) - if isinstance(obj,h5py.Dataset) or not 'file_list' in obj.attrs.keys(): - values.append(1) - else: - values.append(len(obj.attrs['file_list'])) - file.visititems(node_visitor) - - return nodes, parent, values - - -def get_groups_at_a_level(file: h5py.File, level: str): - - groups = [] - def node_selector(name, obj): - if name.count('/') == level: - print(name) - groups.append(obj.name) - - file.visititems(node_selector) - #file.visititems() - return groups - -def format_group_names(names: list): - - formated_names = [] - for name in names: - idx = name.rfind('/') - if len(name) > 1: - formated_names.append(name[idx+1::]) - else: - formated_names.append(name) - - return pd.DataFrame(formated_names,columns=['formated_names'],index=names) - - - -def display_group_hierarchy_on_a_treemap(filename: str): - - with h5py.File(filename,'r') as file: - nodes, parents, values = get_parent_child_relationships(file) - - metadata_list = [] - metadata_dict={} - for key in file.attrs.keys(): - if 'metadata' in key: - metadata_dict[key[key.find('_')+1::]]= file.attrs[key] - metadata_list.append(key[key.find('_')+1::]+':'+file.attrs[key]) - metadata = '
'.join(['
'] + metadata_list) - - customdata_series = pd.Series(nodes) - customdata_series[0] = metadata - - fig = make_subplots(1, 1, specs=[[{"type": "domain"}]],) - fig.add_trace(go.Treemap( - labels=nodes, #formating_df['formated_names'][nodes], - parents=parents,#formating_df['formated_names'][parents], - values=values, - branchvalues='remainder', - customdata= customdata_series, - #marker=dict( - # colors=df_all_trees['color'], - # colorscale='RdBu', - # cmid=average_score), - #hovertemplate='%{label}
Number of files: %{value}
Success rate: %{color:.2f}', - hovertemplate='%{label}
Count: %{value}
Path: %{customdata}', - name='', - root_color="lightgrey" - )) - fig.update_layout(width = 800, height= 600, margin = dict(t=50, l=25, r=25, b=25)) - fig.show() - -def annotate_root_dir(filename,annotation_dict: dict): - with h5py.File(filename,'r+') as file: - for key in annotation_dict: - file.attrs.create('metadata_'+key, annotation_dict[key]) - - -import shutil - -def create_hdf5_file_from_filesystem_path(ofilename : str, input_file_system_path : str, select_dir_keywords = [], select_file_keywords =[]): - - """ - Creates an .h5 file with name ofilename that preserves the directory tree (or folder structure) of given a filesystem path and - a few file and directory keywords. The keywords enable filtering of directories and files that do not contain the specified keywords. - - In the .h5 file, only files that are admissible file formats will be stored in the form of datasets and attributes. - - Parameters: - - ofilename (str): - - input_file_system_path (str) : - - select_dir_keywords (list): default value [], - list of string elements to consider or select only directory paths that contain a word in 'select_dir_keywords'. - When empty, all directory paths are considered to be included in the hdf5 file group hierarchy. - - select_file_keywords (list): default value [], - list of string elements to consider or select only files that contain a word in 'select_file_keywords'. - When empty, all files are considered to be stored in the hdf5 file. - - Returns: - - - """ - - - with h5py.File(ofilename, 'w') as h5file: - - root_dir = '?##' - - # Visit each subdirectory from top to bottom, root directory defined by input_file_sytem_path to the lower - # level directories. - for node_number, node in enumerate(os.walk(input_file_system_path, topdown=True)): - - dirpath, dirnames, filenames_list = node - - if node_number == 0: - offset = dirpath.count(os.sep) - - # Filter out files with filenames not containing a keyword specified in the parameter 'select_file_keywords'. - # When select_file_keywords is an empty, i.e., [], do not apply any filter on the filenames. - if select_file_keywords: - filtered_filename_list = [] - for filename in filenames_list: - if any([date in filename for date in select_file_keywords]): - filtered_filename_list.append(filename) - else: - filtered_filename_list = filenames_list.copy() - - # Skip subdirectories that do not contain a keyword in the parameter 'select_dir_keywords' when it is nonempty - if select_dir_keywords: - if (dirpath.count(os.sep) > offset) and not any([item in dirpath for item in select_dir_keywords]): - continue - - # TODO: i think the below lines can be simplified, or based on the enumeration there is no need for conditionals - group_name = dirpath.replace(os.sep,'/') - if root_dir == '?##': - # Set root_dir to top directory path in input file system - root_dir = group_name - group_name = group_name.replace(root_dir,'/') - #h5file.attrs.create(name='count',data=len(filenames_list)) - h5file.attrs.create(name='file_list',data=filtered_filename_list) - else: - group_name = group_name.replace(root_dir+'/','/') - # Group hierarchy is implicitly defined by the forward slashes - h5file.create_group(group_name) - h5file[group_name].attrs.create(name='file_list',data=filtered_filename_list) - - - # TODO: for each "admissible" file in filenames, create an associated dataset in the corresponding group (subdirectory) - - tmp_dirpath = os.path.join(os.getcwd(), 'tmp') - - if not os.path.exists(tmp_dirpath): - os.mkdir(tmp_dirpath) - - for filename in filtered_filename_list: - - if 'ibw' in filename: - file_dict = g5505_file_reader.read_xps_ibw_file_as_dict(os.path.join(dirpath,filename)) - - h5file[group_name].create_dataset(name = file_dict['name'], - data = file_dict['data'], - #dtype = file_dict['dtype'], - shape = file_dict['shape']) - - #h5file[group_name][file_dict['name']].dims[0] = file_dict['dimension_units'] - - for key in file_dict['attributes_dict'].keys(): - h5file[group_name][file_dict['name']].attrs.create(name=key,data=file_dict['attributes_dict'][key]) - - if 'h5' in filename: - - # Create copy of original file to avoid possible file corruption and work with it. - backup_filename = 'backup_'+filename - # Path - - shutil.copy(os.path.join(dirpath,filename), os.path.join(tmp_dirpath,backup_filename)) - # Open backup h5 file and copy complet filesystem directory onto a group in h5file - with h5py.File(os.path.join(tmp_dirpath,backup_filename),'r') as src_file: - h5file.copy(source=src_file['/'],dest= group_name +'/'+filename) - - # TODO: generilize to multiphase chemistry text and dat files - # TODO: include header information from files as well - if ('txt' in filename or 'TXT' in filename) and any([item in os.path.join(dirpath,filename) for item in ['smps','gas']]): - if 'smps' in os.path.join(dirpath,filename): - file_dict = smog_chamber_group_reader.read_smog_chamber_txt_files_as_dict(os.path.join(dirpath,filename),'smps') - elif 'gas' in os.path.join(dirpath,filename): - file_dict = smog_chamber_group_reader.read_smog_chamber_txt_files_as_dict(os.path.join(dirpath,filename),'gas') - - # TODO: create datasets of compound data type to include variable/or column names and datetimestamps - h5file[group_name].create_group(filename) - h5file[group_name][filename].create_dataset(name = 'data', - data = file_dict['data'], - #dtype = file_dict['dtype'], - shape = file_dict['data'].shape) - - - h5file[group_name][filename].create_dataset(name = 'data_column_names', - data = np.array(file_dict['data_column_names']), - #dtype = file_dict['dtype'], - shape = np.array(file_dict['data_column_names']).shape) - - for key in file_dict['categ_data_dict'].keys(): - h5file[group_name][filename].create_dataset(name=key,data=file_dict['categ_data_dict'][key]) - - -def create_hdf5_file_from_dataframe(ofilename, input_data, approach : str, group_by_funcs : list, extract_attrs_func = None): - - """ Creates an hdf5 file with as many levels as indicated by len(group_by_funcs). - Top level denotes the root group/directory and bottom level denotes measurement level groups. - - Parameters: - input_data (pd.DataFrame | file-system path) : - group_by_funcs (list of callables or strs) : contains a list of callables or dataframe's column names that will be used - to partition or group files from top to bottom. - - Callables in the list must assign a categorical value to each file in a file list, internally represented as a DataFrame, - and they thus return a pd.Series of categorical values. - - On the other hand, strings in the list refer to the name of categorical columns in the input_data (when this is a DataFrame) - - Returns: - - """ - - # Check whether input_data is a valid file-system path or a DataFrame - is_valid_path = lambda x : os.path.exists(input_data) if isinstance(input_data,str) else False - - if is_valid_path(input_data): - - file_list = os.listdir(input_data) - - # Navigates file-system folders/directories from top to bottom. - #for dirpath, dirnames, filenames in os.walk(input_data,topdown=True): - - - #df = pd.DataFrame(file_list,columns=['filename']) - df = utils.augment_with_filetype(df) - - elif isinstance(input_data,pd.DataFrame): - df = input_data.copy() - else: - raise ValueError("input_data must be either a valid file-system path or a dataframe.") - - # - if utils.is_callable_list(group_by_funcs): - grouping_cols = [] - for i, func in enumerate(group_by_funcs): - grouping_cols.append('level_'+str(i)+'_groups') - df['level_'+str(i)+'_groups'] = func(df) - elif utils.is_str_list(group_by_funcs) and all([item in df.columns for item in group_by_funcs]): - grouping_cols = group_by_funcs - else: - raise ValueError("'group_by_funcs' must be a list of callables (or str) that takes input_data as input an returns a valid categorical output.") - - if approach == 'botton-up': - # TODO: implement botton-up approach - if is_nested_hierarchy(df.loc[:,grouping_cols]): - print('Do something') - else: - raise ValueError("group_by_funcs do not define a valid group hierarchy. Please reprocess the input_data or choose different grouping functions.") - - elif approach == 'top-down': - # Check the length of group_by_funcs list is at most 2 - #if len(group_by_funcs) > 2: - # # TODO: extend to more than 2 callable elements. - # raise ValueError("group_by_funcs can only contain at most two grouping elements.") - - with h5py.File(ofilename, 'w') as file: - - create_group_hierarchy(file, df, grouping_cols) - - file.attrs.create(name='depth', data=len(grouping_cols)-1) - - #join_path = lambda x,y: '/' + x + '/' + y - #for group_name in df[grouping_cols[0]].unique(): - # group_filter = df[grouping_cols[0]]==group_name - # for subgroup_name in df.loc[group_filter,grouping_cols[1]].unique(): - # # Create group subgroup folder structure implicitly. - # # Explicitly, grp = f.create_group(group_name), subgrp = grp.create_group(subgroup_name) - # print(join_path(group_name,subgroup_name)) - # f.create_group(join_path(group_name,subgroup_name)) - - # Get groups at the bottom of the hierarchy - #bottom_level_groups = get_groups_at_a_level(file, file.attrs['depth']) - - #nodes, parents, values = get_parent_child_relationships(file) - print(':)') - #fig = px.treemap(values=values,names=nodes, parents= parents) - #fig.update_traces(root_color="lightgrey") - #fig.update_layout(width = 800, height=600, margin = dict(t=50, l=25, r=25, b=25)) - #fig.show() - else: - raise ValueError("'approach' must take values in ['top-down','bottom-up']") - - - #for i, value in enumerate(df['level_'+str(0)+'_groups'].unique().tolist()): - - # 2. Validate group hierarchy, lower level groups must be embedded in higher level groups - - # 3. Create hdf5 file with groups defined by the 'file_group' column - # - # Add datasets to groups and the groups and the group's attributes - - #return 0 - - -def main_5505(): - - inputfile_dir = '\\\\fs101\\5505\\People\\Juan\\TypicalBeamTime' - - file_dict = g5505_file_reader.read_xps_ibw_file_as_dict(inputfile_dir+'\\SES\\0069069_N1s_495eV.ibw') - group_by_type = lambda x : utils.group_by_df_column(x,'filetype') - - select_dir_keywords = ['NEXAFS', 'Notes', 'Photos', 'Pressure', 'RGA', 'SES'] - create_hdf5_file_from_filesystem_path('test_sls_data.h5',inputfile_dir,select_dir_keywords,select_file_keywords=[]) - display_group_hierarchy_on_a_treemap('test_smog_chamber_v5.h5') - - #create_hdf5_file('test', inputfile_dir, 'Topdown', [group_by_type], extract_attrs_func = None) - -def main_smog_chamber(): - - inputfile_dir = '\\\\fs03\\Iron_Sulphate' - include_list = ['htof','ams', 'ptr', 'gas','smps'] - include_list = ['gas','smps\\20220726','htof\\2022.07.26','ptr\\2022.07.26','ams\\2022.07.26'] - select_date_list = ['20220726','2022.07.26'] - - create_hdf5_file_from_filesystem_path('test_smog_chamber_v5.h5',inputfile_dir,include_list,select_date_list) - display_group_hierarchy_on_a_treemap('test_smog_chamber_v5.h5') - -def main_mtable_h5_from_dataframe(): - # Read BeamTimeMetaData.h5, containing Thorsten's Matlab Table - input_data_df = read_mtable_as_dataframe('input_files\\BeamTimeMetaData.h5') - - # Preprocess Thorsten's input_data dataframe so that i can be used to create a newer .h5 file - # under certain grouping specificiations. - input_data_df = input_data_df.rename(columns = {'name':'filename'}) - input_data_df = utils.augment_with_filenumber(input_data_df) - input_data_df = utils.augment_with_filetype(input_data_df) - input_data_df = utils.split_sample_col_into_sample_and_data_quality_cols(input_data_df) - input_data_df['lastModifiedDatestr'] = input_data_df['lastModifiedDatestr'].astype('datetime64[s]') - - # Define grouping functions to be passed into create_hdf5_file function. These can also be set - # as strings refering to categorical columns in input_data_df. - - test_grouping_funcs = True - if test_grouping_funcs: - group_by_sample = lambda x : utils.group_by_df_column(x,'sample') - group_by_type = lambda x : utils.group_by_df_column(x,'filetype') - group_by_filenumber = lambda x : utils.group_by_df_column(x,'filenumber') - else: - group_by_sample = 'sample' - group_by_type = 'filetype' - group_by_filenumber = 'filenumber' - - create_hdf5_file_from_dataframe('test.h5',input_data_df, 'top-down', group_by_funcs = [group_by_sample, group_by_type, group_by_filenumber]) - - annotation_dict = {'Campaign name': 'SLS-Campaign-2023', - 'Users':'Thorsten, Luca, Zoe', - 'Startdate': str(input_data_df['lastModifiedDatestr'].min()), - 'Enddate': str(input_data_df['lastModifiedDatestr'].max()) - } - annotate_root_dir('test.h5',annotation_dict) - - display_group_hierarchy_on_a_treemap('test.h5') - - print(':)') - - -if __name__ == '__main__': - - main_mtable_h5_from_dataframe() - - print(':)') -