Created src folder and transferred previously deleted Python scripts into it.

2024-02-15 09:52:15 +01:00
parent 4433f47505
commit dfdfea2b71
3 changed files with 653 additions and 0 deletions

67  src/g5505_file_reader.py  Normal file

@@ -0,0 +1,67 @@
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots
from igor2.binarywave import load as loadibw
#import h5py
import os
import tempfile
import shutil
def read_xps_ibw_file_as_dict(filename):
""" Reads ibw files from multiphase chemistry group, which contain xps spectra and acquisition settings."""
file_obj = loadibw(filename)
    required_keys = ['wData','data_units','dimension_units','note']
    # Verify that the wave record exposes all required keys.
    if not all(key in file_obj['wave'].keys() for key in required_keys):
        raise ValueError('This is not a valid XPS ibw file. It does not satisfy minimum admissibility criteria.')
file_dict = {}
path_tail, path_head = os.path.split(filename)
file_dict['name'] = path_head
file_dict['data'] = file_obj['wave']['wData']
file_dict['data_units'] = file_obj['wave']['data_units']
file_dict['shape'] = file_dict['data'].shape
    file_dict['dtype'] = file_dict['data'].dtype
file_dict['attributes_dict'] = {}
# Convert notes of bytes class to string class and split string into a list of elements separated by '\r'.
notes_list = file_obj['wave']['note'].decode("utf-8").split('\r')
exclude_list = ['Excitation Energy']
for item in notes_list:
if '=' in item:
            # Split on the first '=' only, in case a value itself contains '='.
            key, value = item.split('=', 1)
            # TODO: check if value can be converted into a numeric type. Now all values are string type
            if key not in exclude_list:
file_dict['attributes_dict'][key] = value
# TODO: talk to Thorsten to see if there is an easier way to access the below attributes
dimension_labels = file_obj['wave']['dimension_units'].decode("utf-8").split(']')
    file_dict['attributes_dict']['dimension_units'] = [item+']' for item in dimension_labels[:-1]]
return file_dict
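# A minimal sketch of the note parsing above, assuming a hypothetical note payload
# (actual keys depend on the acquisition software):
#   notes_list = 'Pass Energy=50\rExcitation Energy=495'.split('\r')
# would populate attributes_dict as {'Pass Energy': '50'}; values remain strings,
# and 'Excitation Energy' is dropped via exclude_list.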
def main():
inputfile_dir = '\\\\fs101\\5505\\People\\Juan\\TypicalBeamTime'
file_dict = read_xps_ibw_file_as_dict(inputfile_dir+'\\SES\\0069069_N1s_495eV.ibw')
for key in file_dict.keys():
print(key,file_dict[key])
if __name__ == '__main__':
main()
print(':)')

51  src/g5505_utils.py  Normal file

@@ -0,0 +1,51 @@
import pandas as pd
import os
def is_callable_list(x : list):
return all([callable(item) for item in x])
def is_str_list(x : list):
return all([isinstance(item,str) for item in x])
def augment_with_filetype(df):
df['filetype'] = [os.path.splitext(item)[1][1::] for item in df['filename']]
#return [os.path.splitext(item)[1][1::] for item in df['filename']]
return df
def augment_with_filenumber(df):
df['filenumber'] = [item[0:item.find('_')] for item in df['filename']]
#return [item[0:item.find('_')] for item in df['filename']]
return df
def group_by_df_column(df, column_name: str):
    """
    df (pandas.DataFrame): table of file records to be grouped.
    column_name (str): column of df by which the grouping operation will take place.
    """
    if column_name not in df.columns:
        raise ValueError("column_name must be in the columns of df.")
    return df[column_name]
def split_sample_col_into_sample_and_data_quality_cols(input_data: pd.DataFrame):
sample_name = []
sample_quality = []
    for item in input_data['sample']:
        if item.find('(') != -1:
            sample_name.append(item[0:item.find('(')])
            sample_quality.append(item[item.find('(')+1:len(item)-1])
        elif item == '':
            sample_name.append('Not yet annotated')
            sample_quality.append('unevaluated')
        else:
            sample_name.append(item)
            sample_quality.append('good data')
input_data['sample'] = sample_name
input_data['data_quality'] = sample_quality
return input_data
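# A usage sketch (hypothetical sample labels). A '(...)' suffix is read as a
# data-quality annotation:
#   df = pd.DataFrame({'sample': ['MgO(noisy)', '', 'NaCl']})
#   df = split_sample_col_into_sample_and_data_quality_cols(df)
#   df['sample']       -> ['MgO', 'Not yet annotated', 'NaCl']
#   df['data_quality'] -> ['noisy', 'unevaluated', 'good data']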

535  src/hdf5_lib.py  Normal file

@@ -0,0 +1,535 @@
import os
import shutil
import numpy as np
import pandas as pd
import h5py
import matplotlib.pyplot as plt
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import g5505_file_reader
import g5505_utils as utils
import smog_chamber_file_reader
def read_mtable_as_dataframe(filename):
""" Reconstruct a Matlab Table encoded in a .h5 file as a Pandas DataFrame. The input .h5 file
contains as many groups as rows in the Matlab Table, and each group stores dataset-like variables in the Table as
Datasets while categorical and numerical variables in the table are represented as attributes of each group.
    Note: DataFrame is constructed columnwise to ensure homogeneous data columns.
Parameters:
filename (str): .h5 file's name. It may include location-path information.
Returns:
output_dataframe (pd.DataFrame): Matlab's Table as a Pandas DataFrame
"""
    # Construct the dataframe by filling out entries columnwise to ensure homogeneous data columns.
with h5py.File(filename,'r') as file:
# Define group's attributes and datasets. This should hold
# for all groups. TODO: implement verification and noncompliance error if needed.
group_list = list(file.keys())
group_attrs = list(file[group_list[0]].attrs.keys())
        # Parse column names and their ordering index from the group attribute names.
        column_attr_names = [item[item.find('_')+1::] for item in group_attrs]
        column_attr_names_idx = [int(item[4:(item.find('_'))]) for item in group_attrs]
group_datasets = list(file[group_list[0]].keys()) if not 'DS_EMPTY' in file[group_list[0]].keys() else []
        # Parse column names and their ordering index from the group's datasets.
        column_dataset_names = [file[group_list[0]][item].attrs['column_name'] for item in group_datasets]
        column_dataset_names_idx = [int(item[2:]) for item in group_datasets]
# Define data_frame as group_attrs + group_datasets
#pd_series_index = group_attrs + group_datasets
pd_series_index = column_attr_names + column_dataset_names
output_dataframe = pd.DataFrame(columns=pd_series_index,index=group_list)
tmp_col = []
for meas_prop in group_attrs + group_datasets:
if meas_prop in group_attrs:
column_label = meas_prop[meas_prop.find('_')+1:]
# Create numerical or categorical column from group's attributes
tmp_col = [file[group_key].attrs[meas_prop][()][0] for group_key in group_list]
else:
# Create dataset column from group's datasets
column_label = file[group_list[0] + '/' + meas_prop].attrs['column_name']
#tmp_col = [file[group_key + '/' + meas_prop][()][0] for group_key in group_list]
tmp_col = [file[group_key + '/' + meas_prop][()] for group_key in group_list]
output_dataframe.loc[:,column_label] = tmp_col
return output_dataframe
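# Sketch of the expected input layout, inferred from the reader above (names are
# illustrative, not prescriptive):
#   /row_group_1                 <- one HDF5 group per Matlab Table row
#       attrs: attr1_sample, attr2_quality, ...  <- '<prefix><idx>_<column_name>'
#       DS01, DS02, ...          <- dataset-like columns, each carrying a
#                                   'column_name' attribute ('DS_EMPTY' marks none)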
def create_group_hierarchy(obj, df, columns):
"""
Input:
obj (h5py.File or h5py.Group)
columns (list of strs): denote categorical columns in df to be used to define hdf5 file group hierarchy
"""
if not columns:
return
# Determine categories associated with first categorical column
unique_values = df[columns[0]].unique()
if obj.name == '/':
obj.attrs.create('count',df.shape[0])
for group_name in unique_values:
group = obj.require_group(group_name)
group.attrs.create('column_name', columns[0])
sub_df = df[df[columns[0]]==group_name] # same as df.loc[df[columns[0]]==group_name,:]
group.attrs.create('count',sub_df.shape[0])
create_group_hierarchy(group, sub_df, columns[1::])
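# A sketch of the recursion above under hypothetical inputs: with
# columns = ['sample', 'filetype'], samples {'MgO', 'NaCl'} and filetypes
# {'ibw', 'txt'} (assuming every sample has files of both types), the resulting
# hierarchy is /MgO/ibw, /MgO/txt, /NaCl/ibw, /NaCl/txt, where every group
# carries 'column_name' and 'count' attributes.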
def is_nested_hierarchy(df) -> bool:
"""receives a dataframe with categorical columns and checks whether rows form a nested group hierarchy.
That is, from bottom to top, subsequent hierarchical levels contain nested groups. The lower level groups belong to exactly one group in the higher level group.
"""
# TODO: generalize the code to check for deeper group hierachies.
def are_nested(df, col, col_nxt):
""" Checks whether low level LL groups can be separated in terms of high level HL groups.
That is, elements of low-level groups do not belong to more than one HL group."""
# Compute higher level group names/categories
memberships = df[col_nxt].unique().tolist()
# Compute upper-level group memberships of low-level groups
col_avg_memberships = df.groupby(col).mean()[col_nxt].unique()
        # Check whether every low-level group has a valid high-level membership, i.e., its average HL membership is itself one of the HL categories.
        return all(avg in memberships for avg in col_avg_memberships)
df_tmp = df.copy()
# Create relabeling map
for column_name in df_tmp.columns:
category_index = pd.Series(np.arange(len(df_tmp[column_name].unique())), index=df_tmp[column_name].unique())
df_tmp[column_name] = category_index[df_tmp[column_name].tolist()].tolist()
    #df_tmp.plot()  # Debug visualization; left disabled to avoid plotting side effects during the check.
return all([are_nested(df_tmp,'level_'+str(i)+'_groups','level_'+str(i+1)+'_groups') for i in range(len(df_tmp.columns)-1)])
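# Example of the nestedness test under hypothetical categories: rows with
#   level_0_groups = [a, a, b, b], level_1_groups = [X, X, Y, Y]
# are nested (each level-0 group maps to exactly one level-1 group), whereas
#   level_1_groups = [X, Y, X, Y]
# is not: the per-group mean of the integer relabels falls between category
# codes and fails the membership check in are_nested.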
def get_attr_names(input_data):
# TODO: extend this to file-system paths
if not isinstance(input_data,pd.DataFrame):
raise ValueError("input_data must be a pd.DataFrame")
return input_data.columns
def get_parent_child_relationships(file: h5py.File):
nodes = ['/']
parent = ['']
#values = [file.attrs['count']]
# TODO: maybe we should make this more general and not dependent on file_list attribute?
if 'file_list' in file.attrs.keys():
values = [len(file.attrs['file_list'])]
else:
values = [1]
def node_visitor(name,obj):
#if isinstance(obj,h5py.Group):
nodes.append(obj.name)
parent.append(obj.parent.name)
#nodes.append(os.path.split(obj.name)[1])
#parent.append(os.path.split(obj.parent.name)[1])
if isinstance(obj,h5py.Dataset) or not 'file_list' in obj.attrs.keys():
values.append(1)
else:
values.append(len(obj.attrs['file_list']))
file.visititems(node_visitor)
return nodes, parent, values
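# Output sketch for a file containing groups /A and /A/B (hypothetical): nodes
# would be ['/', '/A', '/A/B'], parent ['', '/', '/A'], and values the
# corresponding 'file_list' lengths (or 1 for datasets and unannotated nodes),
# ready to feed the treemap below.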
def get_groups_at_a_level(file: h5py.File, level: int):
groups = []
def node_selector(name, obj):
if name.count('/') == level:
print(name)
groups.append(obj.name)
file.visititems(node_selector)
#file.visititems()
return groups
def format_group_names(names: list):
    formatted_names = []
    for name in names:
        idx = name.rfind('/')
        if len(name) > 1:
            formatted_names.append(name[idx+1::])
        else:
            formatted_names.append(name)
    return pd.DataFrame(formatted_names, columns=['formatted_names'], index=names)
def display_group_hierarchy_on_a_treemap(filename: str):
with h5py.File(filename,'r') as file:
nodes, parents, values = get_parent_child_relationships(file)
metadata_list = []
metadata_dict={}
for key in file.attrs.keys():
if 'metadata' in key:
metadata_dict[key[key.find('_')+1::]]= file.attrs[key]
metadata_list.append(key[key.find('_')+1::]+':'+file.attrs[key])
metadata = '<br>'.join(['<br>'] + metadata_list)
customdata_series = pd.Series(nodes)
customdata_series[0] = metadata
fig = make_subplots(1, 1, specs=[[{"type": "domain"}]],)
fig.add_trace(go.Treemap(
labels=nodes, #formating_df['formated_names'][nodes],
parents=parents,#formating_df['formated_names'][parents],
values=values,
branchvalues='remainder',
customdata= customdata_series,
#marker=dict(
# colors=df_all_trees['color'],
# colorscale='RdBu',
# cmid=average_score),
#hovertemplate='<b>%{label} </b> <br> Number of files: %{value}<br> Success rate: %{color:.2f}',
hovertemplate='<b>%{label} </b> <br> Count: %{value} <br> Path: %{customdata}',
name='',
root_color="lightgrey"
))
fig.update_layout(width = 800, height= 600, margin = dict(t=50, l=25, r=25, b=25))
fig.show()
def annotate_root_dir(filename,annotation_dict: dict):
with h5py.File(filename,'r+') as file:
for key in annotation_dict:
file.attrs.create('metadata_'+key, annotation_dict[key])
def create_hdf5_file_from_filesystem_path(ofilename : str, input_file_system_path : str, select_dir_keywords = [], select_file_keywords =[]):
"""
Creates an .h5 file with name ofilename that preserves the directory tree (or folder structure) of given a filesystem path and
a few file and directory keywords. The keywords enable filtering of directories and files that do not contain the specified keywords.
In the .h5 file, only files that are admissible file formats will be stored in the form of datasets and attributes.
Parameters:
ofilename (str):
input_file_system_path (str) :
select_dir_keywords (list): default value [],
list of string elements to consider or select only directory paths that contain a word in 'select_dir_keywords'.
When empty, all directory paths are considered to be included in the hdf5 file group hierarchy.
select_file_keywords (list): default value [],
list of string elements to consider or select only files that contain a word in 'select_file_keywords'.
When empty, all files are considered to be stored in the hdf5 file.
Returns:
"""
with h5py.File(ofilename, 'w') as h5file:
        root_dir = '?##'  # Sentinel value, replaced by the top directory path on the first iteration.
        # Visit each subdirectory from top to bottom, from the root directory defined by
        # input_file_system_path down to the lower-level directories.
for node_number, node in enumerate(os.walk(input_file_system_path, topdown=True)):
dirpath, dirnames, filenames_list = node
if node_number == 0:
offset = dirpath.count(os.sep)
            # Filter out files whose names do not contain a keyword specified in the parameter 'select_file_keywords'.
            # When select_file_keywords is empty, i.e., [], no filename filter is applied.
if select_file_keywords:
filtered_filename_list = []
                for filename in filenames_list:
                    if any(keyword in filename for keyword in select_file_keywords):
filtered_filename_list.append(filename)
else:
filtered_filename_list = filenames_list.copy()
# Skip subdirectories that do not contain a keyword in the parameter 'select_dir_keywords' when it is nonempty
if select_dir_keywords:
if (dirpath.count(os.sep) > offset) and not any([item in dirpath for item in select_dir_keywords]):
continue
            # TODO: the lines below can likely be simplified; given the enumeration, some of the conditionals may be unnecessary.
group_name = dirpath.replace(os.sep,'/')
if root_dir == '?##':
# Set root_dir to top directory path in input file system
root_dir = group_name
group_name = group_name.replace(root_dir,'/')
#h5file.attrs.create(name='count',data=len(filenames_list))
h5file.attrs.create(name='file_list',data=filtered_filename_list)
else:
group_name = group_name.replace(root_dir+'/','/')
# Group hierarchy is implicitly defined by the forward slashes
h5file.create_group(group_name)
h5file[group_name].attrs.create(name='file_list',data=filtered_filename_list)
# TODO: for each "admissible" file in filenames, create an associated dataset in the corresponding group (subdirectory)
tmp_dirpath = os.path.join(os.getcwd(), 'tmp')
if not os.path.exists(tmp_dirpath):
os.mkdir(tmp_dirpath)
for filename in filtered_filename_list:
                if filename.endswith('.ibw'):
file_dict = g5505_file_reader.read_xps_ibw_file_as_dict(os.path.join(dirpath,filename))
h5file[group_name].create_dataset(name = file_dict['name'],
data = file_dict['data'],
#dtype = file_dict['dtype'],
shape = file_dict['shape'])
#h5file[group_name][file_dict['name']].dims[0] = file_dict['dimension_units']
for key in file_dict['attributes_dict'].keys():
h5file[group_name][file_dict['name']].attrs.create(name=key,data=file_dict['attributes_dict'][key])
                if filename.endswith('.h5'):
                    # Create a copy of the original file to avoid possible file corruption, and work with the copy.
                    backup_filename = 'backup_'+filename
                    shutil.copy(os.path.join(dirpath,filename), os.path.join(tmp_dirpath,backup_filename))
                    # Open the backup h5 file and copy its complete file hierarchy onto a group in h5file.
with h5py.File(os.path.join(tmp_dirpath,backup_filename),'r') as src_file:
h5file.copy(source=src_file['/'],dest= group_name +'/'+filename)
                # TODO: generalize to multiphase chemistry text and dat files
                # TODO: include header information from files as well
                if filename.lower().endswith('.txt') and any(item in os.path.join(dirpath,filename) for item in ['smps','gas']):
if 'smps' in os.path.join(dirpath,filename):
file_dict = smog_chamber_file_reader.read_txt_files_as_dict(os.path.join(dirpath,filename),'smps')
elif 'gas' in os.path.join(dirpath,filename):
file_dict = smog_chamber_file_reader.read_txt_files_as_dict(os.path.join(dirpath,filename),'gas')
                    # TODO: create datasets of compound data type to include variable (column) names and datetime stamps
h5file[group_name].create_group(filename)
h5file[group_name][filename].create_dataset(name = 'data',
data = file_dict['data'],
#dtype = file_dict['dtype'],
shape = file_dict['data'].shape)
h5file[group_name][filename].create_dataset(name = 'data_column_names',
data = np.array(file_dict['data_column_names']),
#dtype = file_dict['dtype'],
shape = np.array(file_dict['data_column_names']).shape)
for key in file_dict['categ_data_dict'].keys():
h5file[group_name][filename].create_dataset(name=key,data=file_dict['categ_data_dict'][key])
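# A usage sketch for the walker above, with hypothetical paths: given
#   input_file_system_path = 'C:\\data' containing 'C:\\data\\SES\\spectrum.ibw',
# the call
#   create_hdf5_file_from_filesystem_path('out.h5', 'C:\\data', ['SES'], [])
# yields a group '/SES' whose 'file_list' attribute names the .ibw file and
# which holds one dataset per admissible file in that directory.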
def create_hdf5_file_from_dataframe(ofilename, input_data, approach : str, group_by_funcs : list, extract_attrs_func = None):
""" Creates an hdf5 file with as many levels as indicated by len(group_by_funcs).
Top level denotes the root group/directory and bottom level denotes measurement level groups.
Parameters:
input_data (pd.DataFrame | file-system path) :
group_by_funcs (list of callables or strs) : contains a list of callables or dataframe's column names that will be used
to partition or group files from top to bottom.
Callables in the list must assign a categorical value to each file in a file list, internally represented as a DataFrame,
and they thus return a pd.Series of categorical values.
On the other hand, strings in the list refer to the name of categorical columns in the input_data (when this is a DataFrame)
Returns:
"""
    # Check whether input_data is a valid file-system path or a DataFrame
    is_valid_path = lambda x: os.path.exists(x) if isinstance(x, str) else False
    if is_valid_path(input_data):
        file_list = os.listdir(input_data)
        # Navigates file-system folders/directories from top to bottom.
        #for dirpath, dirnames, filenames in os.walk(input_data,topdown=True):
        df = pd.DataFrame(file_list, columns=['filename'])
        df = utils.augment_with_filetype(df)
elif isinstance(input_data,pd.DataFrame):
df = input_data.copy()
else:
raise ValueError("input_data must be either a valid file-system path or a dataframe.")
#
if utils.is_callable_list(group_by_funcs):
grouping_cols = []
for i, func in enumerate(group_by_funcs):
grouping_cols.append('level_'+str(i)+'_groups')
df['level_'+str(i)+'_groups'] = func(df)
elif utils.is_str_list(group_by_funcs) and all([item in df.columns for item in group_by_funcs]):
grouping_cols = group_by_funcs
else:
raise ValueError("'group_by_funcs' must be a list of callables (or str) that takes input_data as input an returns a valid categorical output.")
    if approach == 'bottom-up':
        # TODO: implement bottom-up approach
if is_nested_hierarchy(df.loc[:,grouping_cols]):
print('Do something')
else:
raise ValueError("group_by_funcs do not define a valid group hierarchy. Please reprocess the input_data or choose different grouping functions.")
elif approach == 'top-down':
# Check the length of group_by_funcs list is at most 2
#if len(group_by_funcs) > 2:
# # TODO: extend to more than 2 callable elements.
# raise ValueError("group_by_funcs can only contain at most two grouping elements.")
with h5py.File(ofilename, 'w') as file:
create_group_hierarchy(file, df, grouping_cols)
file.attrs.create(name='depth', data=len(grouping_cols)-1)
#join_path = lambda x,y: '/' + x + '/' + y
#for group_name in df[grouping_cols[0]].unique():
# group_filter = df[grouping_cols[0]]==group_name
# for subgroup_name in df.loc[group_filter,grouping_cols[1]].unique():
# # Create group subgroup folder structure implicitly.
# # Explicitly, grp = f.create_group(group_name), subgrp = grp.create_group(subgroup_name)
# print(join_path(group_name,subgroup_name))
# f.create_group(join_path(group_name,subgroup_name))
# Get groups at the bottom of the hierarchy
#bottom_level_groups = get_groups_at_a_level(file, file.attrs['depth'])
#nodes, parents, values = get_parent_child_relationships(file)
print(':)')
#fig = px.treemap(values=values,names=nodes, parents= parents)
#fig.update_traces(root_color="lightgrey")
#fig.update_layout(width = 800, height=600, margin = dict(t=50, l=25, r=25, b=25))
#fig.show()
else:
raise ValueError("'approach' must take values in ['top-down','bottom-up']")
#for i, value in enumerate(df['level_'+str(0)+'_groups'].unique().tolist()):
# 2. Validate group hierarchy, lower level groups must be embedded in higher level groups
# 3. Create hdf5 file with groups defined by the 'file_group' column
#
# Add datasets to groups and the groups and the group's attributes
#return 0
def main_5505():
inputfile_dir = '\\\\fs101\\5505\\People\\Juan\\TypicalBeamTime'
file_dict = g5505_file_reader.read_xps_ibw_file_as_dict(inputfile_dir+'\\SES\\0069069_N1s_495eV.ibw')
group_by_type = lambda x : utils.group_by_df_column(x,'filetype')
select_dir_keywords = ['NEXAFS', 'Notes', 'Photos', 'Pressure', 'RGA', 'SES']
create_hdf5_file_from_filesystem_path('test_sls_data.h5',inputfile_dir,select_dir_keywords,select_file_keywords=[])
    display_group_hierarchy_on_a_treemap('test_sls_data.h5')
#create_hdf5_file('test', inputfile_dir, 'Topdown', [group_by_type], extract_attrs_func = None)
def main_smog_chamber():
inputfile_dir = '\\\\fs03\\Iron_Sulphate'
    #include_list = ['htof','ams', 'ptr', 'gas','smps']  # Superseded by the date-scoped list below.
    include_list = ['gas','smps\\20220726','htof\\2022.07.26','ptr\\2022.07.26','ams\\2022.07.26']
select_date_list = ['20220726','2022.07.26']
create_hdf5_file_from_filesystem_path('test_smog_chamber_v5.h5',inputfile_dir,include_list,select_date_list)
display_group_hierarchy_on_a_treemap('test_smog_chamber_v5.h5')
def main_mtable_h5_from_dataframe():
# Read BeamTimeMetaData.h5, containing Thorsten's Matlab Table
input_data_df = read_mtable_as_dataframe('input_files\\BeamTimeMetaData.h5')
    # Preprocess Thorsten's input_data dataframe so that it can be used to create a newer .h5 file
    # under certain grouping specifications.
input_data_df = input_data_df.rename(columns = {'name':'filename'})
input_data_df = utils.augment_with_filenumber(input_data_df)
input_data_df = utils.augment_with_filetype(input_data_df)
input_data_df = utils.split_sample_col_into_sample_and_data_quality_cols(input_data_df)
input_data_df['lastModifiedDatestr'] = input_data_df['lastModifiedDatestr'].astype('datetime64[s]')
    # Define grouping functions to be passed into the create_hdf5_file function. These can also be set
    # as strings referring to categorical columns in input_data_df.
test_grouping_funcs = True
if test_grouping_funcs:
group_by_sample = lambda x : utils.group_by_df_column(x,'sample')
group_by_type = lambda x : utils.group_by_df_column(x,'filetype')
group_by_filenumber = lambda x : utils.group_by_df_column(x,'filenumber')
else:
group_by_sample = 'sample'
group_by_type = 'filetype'
group_by_filenumber = 'filenumber'
create_hdf5_file_from_dataframe('test.h5',input_data_df, 'top-down', group_by_funcs = [group_by_sample, group_by_type, group_by_filenumber])
annotation_dict = {'Campaign name': 'SLS-Campaign-2023',
'Users':'Thorsten, Luca, Zoe',
'Startdate': str(input_data_df['lastModifiedDatestr'].min()),
'Enddate': str(input_data_df['lastModifiedDatestr'].max())
}
annotate_root_dir('test.h5',annotation_dict)
display_group_hierarchy_on_a_treemap('test.h5')
print(':)')
if __name__ == '__main__':
main_mtable_h5_from_dataframe()
print(':)')