diff --git a/src/g5505_file_reader.py b/src/g5505_file_reader.py new file mode 100644 index 0000000..1b78ae1 --- /dev/null +++ b/src/g5505_file_reader.py @@ -0,0 +1,67 @@ +import numpy as np +import pandas as np +import matplotlib.pyplot as plt +import plotly.express as px +import plotly.graph_objects as go +from plotly.subplots import make_subplots +from igor2.binarywave import load as loadibw + +#import h5py +import os +import tempfile +import shutil + +def read_xps_ibw_file_as_dict(filename): + + """ Reads ibw files from multiphase chemistry group, which contain xps spectra and acquisition settings.""" + + file_obj = loadibw(filename) + + required_keys = ['wData','data_units','dimension_units','note'] + if sum([item in required_keys for item in file_obj['wave'].keys()]) < len(required_keys): + raise ValueError('This is not a valid xps ibw file. It does not satisfy minimum adimissibility criteria.') + + file_dict = {} + path_tail, path_head = os.path.split(filename) + file_dict['name'] = path_head + file_dict['data'] = file_obj['wave']['wData'] + file_dict['data_units'] = file_obj['wave']['data_units'] + file_dict['shape'] = file_dict['data'].shape + file_dict['dtype'] = type(file_dict['data']) + + file_dict['attributes_dict'] = {} + + # Convert notes of bytes class to string class and split string into a list of elements separated by '\r'. + notes_list = file_obj['wave']['note'].decode("utf-8").split('\r') + exclude_list = ['Excitation Energy'] + for item in notes_list: + if '=' in item: + key, value = tuple(item.split('=')) + # TODO: check if value can be converted into a numeric type. Now all values are string type + if not key in exclude_list: + file_dict['attributes_dict'][key] = value + + # TODO: talk to Thorsten to see if there is an easier way to access the below attributes + dimension_labels = file_obj['wave']['dimension_units'].decode("utf-8").split(']') + file_dict['attributes_dict']['dimension_units'] = [item+']' for item in dimension_labels[0:len(dimension_labels)-1]] + + + return file_dict + + + +def main(): + + inputfile_dir = '\\\\fs101\\5505\\People\\Juan\\TypicalBeamTime' + + file_dict = read_xps_ibw_file_as_dict(inputfile_dir+'\\SES\\0069069_N1s_495eV.ibw') + + for key in file_dict.keys(): + print(key,file_dict[key]) + + +if __name__ == '__main__': + + main() + + print(':)') \ No newline at end of file diff --git a/src/g5505_utils.py b/src/g5505_utils.py new file mode 100644 index 0000000..103f529 --- /dev/null +++ b/src/g5505_utils.py @@ -0,0 +1,51 @@ +import pandas as pd +import os + + +def is_callable_list(x : list): + return all([callable(item) for item in x]) + +def is_str_list(x : list): + return all([isinstance(item,str) for item in x]) + +def augment_with_filetype(df): + df['filetype'] = [os.path.splitext(item)[1][1::] for item in df['filename']] + #return [os.path.splitext(item)[1][1::] for item in df['filename']] + return df + +def augment_with_filenumber(df): + df['filenumber'] = [item[0:item.find('_')] for item in df['filename']] + #return [item[0:item.find('_')] for item in df['filename']] + return df + +def group_by_df_column(df, column_name: str): + """ + df (pandas.DataFrame): + column_name (str): column_name of df by which grouping operation will take place. + """ + + if not column_name in df.columns: + raise ValueError("column_name must be in the columns of df.") + + return df[column_name] + +def split_sample_col_into_sample_and_data_quality_cols(input_data: pd.DataFrame): + + sample_name = [] + sample_quality = [] + for item in input_data['sample']: + if item.find('(')!=-1: + #print(item) + sample_name.append(item[0:item.find('(')]) + sample_quality.append(item[item.find('(')+1:len(item)-1]) + else: + if item=='': + sample_name.append('Not yet annotated') + sample_quality.append('unevaluated') + else: + sample_name.append(item) + sample_quality.append('good data') + input_data['sample'] = sample_name + input_data['data_quality'] = sample_quality + + return input_data diff --git a/src/hdf5_lib.py b/src/hdf5_lib.py new file mode 100644 index 0000000..4b80bd6 --- /dev/null +++ b/src/hdf5_lib.py @@ -0,0 +1,535 @@ +import pandas as pd +import h5py +import os +#import sys +#from itertools import product +import numpy as np + +import matplotlib.pyplot as plt +import plotly.express as px +import plotly.graph_objects as go +from plotly.subplots import make_subplots + +import g5505_file_reader +import g5505_utils as utils +import smog_chamber_file_reader + + +def read_mtable_as_dataframe(filename): + + """ Reconstruct a Matlab Table encoded in a .h5 file as a Pandas DataFrame. The input .h5 file + contains as many groups as rows in the Matlab Table, and each group stores dataset-like variables in the Table as + Datasets while categorical and numerical variables in the table are represented as attributes of each group. + + Note: DataFrame is constructed columnwise to ensure homogenous data columns. + + Parameters: + + filename (str): .h5 file's name. It may include location-path information. + + Returns: + + output_dataframe (pd.DataFrame): Matlab's Table as a Pandas DataFrame + + """ + + #contructs dataframe by filling out entries columnwise. This way we can ensure homogenous data columns""" + + with h5py.File(filename,'r') as file: + + # Define group's attributes and datasets. This should hold + # for all groups. TODO: implement verification and noncompliance error if needed. + group_list = list(file.keys()) + group_attrs = list(file[group_list[0]].attrs.keys()) + # + column_attr_names = [item[item.find('_')+1::] for item in group_attrs] + column_attr_names_idx = [int(item[4:(item.find('_'))]) for item in group_attrs] + + group_datasets = list(file[group_list[0]].keys()) if not 'DS_EMPTY' in file[group_list[0]].keys() else [] + # + column_dataset_names = [file[group_list[0]][item].attrs['column_name'] for item in group_datasets] + column_dataset_names_idx = [int(item[2:]) for item in group_datasets] + + + # Define data_frame as group_attrs + group_datasets + #pd_series_index = group_attrs + group_datasets + pd_series_index = column_attr_names + column_dataset_names + + output_dataframe = pd.DataFrame(columns=pd_series_index,index=group_list) + + tmp_col = [] + + for meas_prop in group_attrs + group_datasets: + if meas_prop in group_attrs: + column_label = meas_prop[meas_prop.find('_')+1:] + # Create numerical or categorical column from group's attributes + tmp_col = [file[group_key].attrs[meas_prop][()][0] for group_key in group_list] + else: + # Create dataset column from group's datasets + column_label = file[group_list[0] + '/' + meas_prop].attrs['column_name'] + #tmp_col = [file[group_key + '/' + meas_prop][()][0] for group_key in group_list] + tmp_col = [file[group_key + '/' + meas_prop][()] for group_key in group_list] + + output_dataframe.loc[:,column_label] = tmp_col + + return output_dataframe + +def create_group_hierarchy(obj, df, columns): + + """ + Input: + obj (h5py.File or h5py.Group) + columns (list of strs): denote categorical columns in df to be used to define hdf5 file group hierarchy + """ + + if not columns: + return + + # Determine categories associated with first categorical column + unique_values = df[columns[0]].unique() + + if obj.name == '/': + obj.attrs.create('count',df.shape[0]) + + for group_name in unique_values: + + group = obj.require_group(group_name) + group.attrs.create('column_name', columns[0]) + + sub_df = df[df[columns[0]]==group_name] # same as df.loc[df[columns[0]]==group_name,:] + group.attrs.create('count',sub_df.shape[0]) + + # if group_name == 'MgO powder,H2O,HCl': + # print('Here:',sub_df.shape) + create_group_hierarchy(group, sub_df, columns[1::]) + +def is_nested_hierarchy(df) -> bool: + """receives a dataframe with categorical columns and checks whether rows form a nested group hierarchy. + That is, from bottom to top, subsequent hierarchical levels contain nested groups. The lower level groups belong to exactly one group in the higher level group. + """ + # TODO: generalize the code to check for deeper group hierachies. + def are_nested(df, col, col_nxt): + """ Checks whether low level LL groups can be separated in terms of high level HL groups. + That is, elements of low-level groups do not belong to more than one HL group.""" + + # Compute higher level group names/categories + memberships = df[col_nxt].unique().tolist() + + # Compute upper-level group memberships of low-level groups + col_avg_memberships = df.groupby(col).mean()[col_nxt].unique() + + # Check whether all low-level groups have an actual hlg membership. That is, their avg. hlg membership is in the hlg membership. + return all([col_avg_memberships[group_idx] in memberships for group_idx in range(len(col_avg_memberships))]) + + df_tmp = df.copy() + + # Create relabeling map + for column_name in df_tmp.columns: + category_index = pd.Series(np.arange(len(df_tmp[column_name].unique())), index=df_tmp[column_name].unique()) + df_tmp[column_name] = category_index[df_tmp[column_name].tolist()].tolist() + + df_tmp.plot() + + return all([are_nested(df_tmp,'level_'+str(i)+'_groups','level_'+str(i+1)+'_groups') for i in range(len(df_tmp.columns)-1)]) + +def get_attr_names(input_data): + + # TODO: extend this to file-system paths + if not isinstance(input_data,pd.DataFrame): + raise ValueError("input_data must be a pd.DataFrame") + + return input_data.columns + +def get_parent_child_relationships(file: h5py.File): + + nodes = ['/'] + parent = [''] + #values = [file.attrs['count']] + # TODO: maybe we should make this more general and not dependent on file_list attribute? + if 'file_list' in file.attrs.keys(): + values = [len(file.attrs['file_list'])] + else: + values = [1] + + def node_visitor(name,obj): + #if isinstance(obj,h5py.Group): + nodes.append(obj.name) + parent.append(obj.parent.name) + #nodes.append(os.path.split(obj.name)[1]) + #parent.append(os.path.split(obj.parent.name)[1]) + if isinstance(obj,h5py.Dataset) or not 'file_list' in obj.attrs.keys(): + values.append(1) + else: + values.append(len(obj.attrs['file_list'])) + file.visititems(node_visitor) + + return nodes, parent, values + + +def get_groups_at_a_level(file: h5py.File, level: str): + + groups = [] + def node_selector(name, obj): + if name.count('/') == level: + print(name) + groups.append(obj.name) + + file.visititems(node_selector) + #file.visititems() + return groups + +def format_group_names(names: list): + + formated_names = [] + for name in names: + idx = name.rfind('/') + if len(name) > 1: + formated_names.append(name[idx+1::]) + else: + formated_names.append(name) + + return pd.DataFrame(formated_names,columns=['formated_names'],index=names) + + + +def display_group_hierarchy_on_a_treemap(filename: str): + + with h5py.File(filename,'r') as file: + nodes, parents, values = get_parent_child_relationships(file) + + metadata_list = [] + metadata_dict={} + for key in file.attrs.keys(): + if 'metadata' in key: + metadata_dict[key[key.find('_')+1::]]= file.attrs[key] + metadata_list.append(key[key.find('_')+1::]+':'+file.attrs[key]) + metadata = '
'.join(['
'] + metadata_list) + + customdata_series = pd.Series(nodes) + customdata_series[0] = metadata + + fig = make_subplots(1, 1, specs=[[{"type": "domain"}]],) + fig.add_trace(go.Treemap( + labels=nodes, #formating_df['formated_names'][nodes], + parents=parents,#formating_df['formated_names'][parents], + values=values, + branchvalues='remainder', + customdata= customdata_series, + #marker=dict( + # colors=df_all_trees['color'], + # colorscale='RdBu', + # cmid=average_score), + #hovertemplate='%{label}
Number of files: %{value}
Success rate: %{color:.2f}', + hovertemplate='%{label}
Count: %{value}
Path: %{customdata}', + name='', + root_color="lightgrey" + )) + fig.update_layout(width = 800, height= 600, margin = dict(t=50, l=25, r=25, b=25)) + fig.show() + +def annotate_root_dir(filename,annotation_dict: dict): + with h5py.File(filename,'r+') as file: + for key in annotation_dict: + file.attrs.create('metadata_'+key, annotation_dict[key]) + + +import shutil + +def create_hdf5_file_from_filesystem_path(ofilename : str, input_file_system_path : str, select_dir_keywords = [], select_file_keywords =[]): + + """ + Creates an .h5 file with name ofilename that preserves the directory tree (or folder structure) of given a filesystem path and + a few file and directory keywords. The keywords enable filtering of directories and files that do not contain the specified keywords. + + In the .h5 file, only files that are admissible file formats will be stored in the form of datasets and attributes. + + Parameters: + + ofilename (str): + + input_file_system_path (str) : + + select_dir_keywords (list): default value [], + list of string elements to consider or select only directory paths that contain a word in 'select_dir_keywords'. + When empty, all directory paths are considered to be included in the hdf5 file group hierarchy. + + select_file_keywords (list): default value [], + list of string elements to consider or select only files that contain a word in 'select_file_keywords'. + When empty, all files are considered to be stored in the hdf5 file. + + Returns: + + + """ + + + with h5py.File(ofilename, 'w') as h5file: + + root_dir = '?##' + + # Visit each subdirectory from top to bottom, root directory defined by input_file_sytem_path to the lower + # level directories. + for node_number, node in enumerate(os.walk(input_file_system_path, topdown=True)): + + dirpath, dirnames, filenames_list = node + + if node_number == 0: + offset = dirpath.count(os.sep) + + # Filter out files with filenames not containing a keyword specified in the parameter 'select_file_keywords'. + # When select_file_keywords is an empty, i.e., [], do not apply any filter on the filenames. + if select_file_keywords: + filtered_filename_list = [] + for filename in filenames_list: + if any([date in filename for date in select_file_keywords]): + filtered_filename_list.append(filename) + else: + filtered_filename_list = filenames_list.copy() + + # Skip subdirectories that do not contain a keyword in the parameter 'select_dir_keywords' when it is nonempty + if select_dir_keywords: + if (dirpath.count(os.sep) > offset) and not any([item in dirpath for item in select_dir_keywords]): + continue + + # TODO: i think the below lines can be simplified, or based on the enumeration there is no need for conditionals + group_name = dirpath.replace(os.sep,'/') + if root_dir == '?##': + # Set root_dir to top directory path in input file system + root_dir = group_name + group_name = group_name.replace(root_dir,'/') + #h5file.attrs.create(name='count',data=len(filenames_list)) + h5file.attrs.create(name='file_list',data=filtered_filename_list) + else: + group_name = group_name.replace(root_dir+'/','/') + # Group hierarchy is implicitly defined by the forward slashes + h5file.create_group(group_name) + h5file[group_name].attrs.create(name='file_list',data=filtered_filename_list) + + + # TODO: for each "admissible" file in filenames, create an associated dataset in the corresponding group (subdirectory) + + tmp_dirpath = os.path.join(os.getcwd(), 'tmp') + + if not os.path.exists(tmp_dirpath): + os.mkdir(tmp_dirpath) + + for filename in filtered_filename_list: + + if 'ibw' in filename: + file_dict = g5505_file_reader.read_xps_ibw_file_as_dict(os.path.join(dirpath,filename)) + + h5file[group_name].create_dataset(name = file_dict['name'], + data = file_dict['data'], + #dtype = file_dict['dtype'], + shape = file_dict['shape']) + + #h5file[group_name][file_dict['name']].dims[0] = file_dict['dimension_units'] + + for key in file_dict['attributes_dict'].keys(): + h5file[group_name][file_dict['name']].attrs.create(name=key,data=file_dict['attributes_dict'][key]) + + if 'h5' in filename: + + # Create copy of original file to avoid possible file corruption and work with it. + backup_filename = 'backup_'+filename + # Path + + shutil.copy(os.path.join(dirpath,filename), os.path.join(tmp_dirpath,backup_filename)) + # Open backup h5 file and copy complet filesystem directory onto a group in h5file + with h5py.File(os.path.join(tmp_dirpath,backup_filename),'r') as src_file: + h5file.copy(source=src_file['/'],dest= group_name +'/'+filename) + + # TODO: generilize to multiphase chemistry text and dat files + # TODO: include header information from files as well + if ('txt' in filename or 'TXT' in filename) and any([item in os.path.join(dirpath,filename) for item in ['smps','gas']]): + if 'smps' in os.path.join(dirpath,filename): + file_dict = smog_chamber_file_reader.read_txt_files_as_dict(os.path.join(dirpath,filename),'smps') + elif 'gas' in os.path.join(dirpath,filename): + file_dict = smog_chamber_file_reader.read_txt_files_as_dict(os.path.join(dirpath,filename),'gas') + + # TODO: create datasets of compound data type to include variable/or column names and datetimestamps + h5file[group_name].create_group(filename) + h5file[group_name][filename].create_dataset(name = 'data', + data = file_dict['data'], + #dtype = file_dict['dtype'], + shape = file_dict['data'].shape) + + + h5file[group_name][filename].create_dataset(name = 'data_column_names', + data = np.array(file_dict['data_column_names']), + #dtype = file_dict['dtype'], + shape = np.array(file_dict['data_column_names']).shape) + + for key in file_dict['categ_data_dict'].keys(): + h5file[group_name][filename].create_dataset(name=key,data=file_dict['categ_data_dict'][key]) + + +def create_hdf5_file_from_dataframe(ofilename, input_data, approach : str, group_by_funcs : list, extract_attrs_func = None): + + """ Creates an hdf5 file with as many levels as indicated by len(group_by_funcs). + Top level denotes the root group/directory and bottom level denotes measurement level groups. + + Parameters: + input_data (pd.DataFrame | file-system path) : + group_by_funcs (list of callables or strs) : contains a list of callables or dataframe's column names that will be used + to partition or group files from top to bottom. + + Callables in the list must assign a categorical value to each file in a file list, internally represented as a DataFrame, + and they thus return a pd.Series of categorical values. + + On the other hand, strings in the list refer to the name of categorical columns in the input_data (when this is a DataFrame) + + Returns: + + """ + + # Check whether input_data is a valid file-system path or a DataFrame + is_valid_path = lambda x : os.path.exists(input_data) if isinstance(input_data,str) else False + + if is_valid_path(input_data): + + file_list = os.listdir(input_data) + + # Navigates file-system folders/directories from top to bottom. + #for dirpath, dirnames, filenames in os.walk(input_data,topdown=True): + + + #df = pd.DataFrame(file_list,columns=['filename']) + df = utils.augment_with_filetype(df) + + elif isinstance(input_data,pd.DataFrame): + df = input_data.copy() + else: + raise ValueError("input_data must be either a valid file-system path or a dataframe.") + + # + if utils.is_callable_list(group_by_funcs): + grouping_cols = [] + for i, func in enumerate(group_by_funcs): + grouping_cols.append('level_'+str(i)+'_groups') + df['level_'+str(i)+'_groups'] = func(df) + elif utils.is_str_list(group_by_funcs) and all([item in df.columns for item in group_by_funcs]): + grouping_cols = group_by_funcs + else: + raise ValueError("'group_by_funcs' must be a list of callables (or str) that takes input_data as input an returns a valid categorical output.") + + if approach == 'botton-up': + # TODO: implement botton-up approach + if is_nested_hierarchy(df.loc[:,grouping_cols]): + print('Do something') + else: + raise ValueError("group_by_funcs do not define a valid group hierarchy. Please reprocess the input_data or choose different grouping functions.") + + elif approach == 'top-down': + # Check the length of group_by_funcs list is at most 2 + #if len(group_by_funcs) > 2: + # # TODO: extend to more than 2 callable elements. + # raise ValueError("group_by_funcs can only contain at most two grouping elements.") + + with h5py.File(ofilename, 'w') as file: + + create_group_hierarchy(file, df, grouping_cols) + + file.attrs.create(name='depth', data=len(grouping_cols)-1) + + #join_path = lambda x,y: '/' + x + '/' + y + #for group_name in df[grouping_cols[0]].unique(): + # group_filter = df[grouping_cols[0]]==group_name + # for subgroup_name in df.loc[group_filter,grouping_cols[1]].unique(): + # # Create group subgroup folder structure implicitly. + # # Explicitly, grp = f.create_group(group_name), subgrp = grp.create_group(subgroup_name) + # print(join_path(group_name,subgroup_name)) + # f.create_group(join_path(group_name,subgroup_name)) + + # Get groups at the bottom of the hierarchy + #bottom_level_groups = get_groups_at_a_level(file, file.attrs['depth']) + + #nodes, parents, values = get_parent_child_relationships(file) + print(':)') + #fig = px.treemap(values=values,names=nodes, parents= parents) + #fig.update_traces(root_color="lightgrey") + #fig.update_layout(width = 800, height=600, margin = dict(t=50, l=25, r=25, b=25)) + #fig.show() + else: + raise ValueError("'approach' must take values in ['top-down','bottom-up']") + + + #for i, value in enumerate(df['level_'+str(0)+'_groups'].unique().tolist()): + + # 2. Validate group hierarchy, lower level groups must be embedded in higher level groups + + # 3. Create hdf5 file with groups defined by the 'file_group' column + # + # Add datasets to groups and the groups and the group's attributes + + #return 0 + + +def main_5505(): + + inputfile_dir = '\\\\fs101\\5505\\People\\Juan\\TypicalBeamTime' + + file_dict = g5505_file_reader.read_xps_ibw_file_as_dict(inputfile_dir+'\\SES\\0069069_N1s_495eV.ibw') + group_by_type = lambda x : utils.group_by_df_column(x,'filetype') + + select_dir_keywords = ['NEXAFS', 'Notes', 'Photos', 'Pressure', 'RGA', 'SES'] + create_hdf5_file_from_filesystem_path('test_sls_data.h5',inputfile_dir,select_dir_keywords,select_file_keywords=[]) + display_group_hierarchy_on_a_treemap('test_smog_chamber_v5.h5') + + #create_hdf5_file('test', inputfile_dir, 'Topdown', [group_by_type], extract_attrs_func = None) + +def main_smog_chamber(): + + inputfile_dir = '\\\\fs03\\Iron_Sulphate' + include_list = ['htof','ams', 'ptr', 'gas','smps'] + include_list = ['gas','smps\\20220726','htof\\2022.07.26','ptr\\2022.07.26','ams\\2022.07.26'] + select_date_list = ['20220726','2022.07.26'] + + create_hdf5_file_from_filesystem_path('test_smog_chamber_v5.h5',inputfile_dir,include_list,select_date_list) + display_group_hierarchy_on_a_treemap('test_smog_chamber_v5.h5') + +def main_mtable_h5_from_dataframe(): + # Read BeamTimeMetaData.h5, containing Thorsten's Matlab Table + input_data_df = read_mtable_as_dataframe('input_files\\BeamTimeMetaData.h5') + + # Preprocess Thorsten's input_data dataframe so that i can be used to create a newer .h5 file + # under certain grouping specificiations. + input_data_df = input_data_df.rename(columns = {'name':'filename'}) + input_data_df = utils.augment_with_filenumber(input_data_df) + input_data_df = utils.augment_with_filetype(input_data_df) + input_data_df = utils.split_sample_col_into_sample_and_data_quality_cols(input_data_df) + input_data_df['lastModifiedDatestr'] = input_data_df['lastModifiedDatestr'].astype('datetime64[s]') + + # Define grouping functions to be passed into create_hdf5_file function. These can also be set + # as strings refering to categorical columns in input_data_df. + + test_grouping_funcs = True + if test_grouping_funcs: + group_by_sample = lambda x : utils.group_by_df_column(x,'sample') + group_by_type = lambda x : utils.group_by_df_column(x,'filetype') + group_by_filenumber = lambda x : utils.group_by_df_column(x,'filenumber') + else: + group_by_sample = 'sample' + group_by_type = 'filetype' + group_by_filenumber = 'filenumber' + + create_hdf5_file_from_dataframe('test.h5',input_data_df, 'top-down', group_by_funcs = [group_by_sample, group_by_type, group_by_filenumber]) + + annotation_dict = {'Campaign name': 'SLS-Campaign-2023', + 'Users':'Thorsten, Luca, Zoe', + 'Startdate': str(input_data_df['lastModifiedDatestr'].min()), + 'Enddate': str(input_data_df['lastModifiedDatestr'].max()) + } + annotate_root_dir('test.h5',annotation_dict) + + display_group_hierarchy_on_a_treemap('test.h5') + + print(':)') + + +if __name__ == '__main__': + + main_mtable_h5_from_dataframe() + + print(':)') +