Files
dima/hdf5_lib.py

389 lines
15 KiB
Python

import pandas as pd
import h5py
import os
#import sys
#from itertools import product
import numpy as np
import matplotlib.pyplot as plt
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots
def read_hdf5_as_dataframe(filename):
    """Reconstruct a Matlab Table encoded in a .h5 file as a Pandas DataFrame.

    The input .h5 file contains as many groups as rows in the Matlab Table. Each
    group stores dataset-like variables of the Table as datasets, while categorical
    and numerical variables of the Table are represented as attributes of the group.

    Note: the DataFrame is constructed columnwise to ensure homogeneous data columns.
    The set of attributes and datasets is read from the first group only and is
    assumed to hold for all groups.
    TODO: implement verification and noncompliance error if needed.

    Parameters:
        filename (str): .h5 file's name. It may include location-path information.

    Returns:
        output_dataframe (pd.DataFrame): Matlab's Table as a Pandas DataFrame, indexed
        by group name. An empty DataFrame is returned when the file has no groups.
    """
    with h5py.File(filename, 'r') as file:
        group_list = list(file.keys())
        if not group_list:
            # No rows to reconstruct; indexing group_list[0] below would raise IndexError.
            return pd.DataFrame()

        first_group = file[group_list[0]]
        group_attrs = list(first_group.attrs.keys())
        # A group holding a 'DS_EMPTY' member is treated as having no dataset columns.
        group_datasets = [] if 'DS_EMPTY' in first_group.keys() else list(first_group.keys())

        # The column label of an attribute is whatever follows its first underscore.
        column_attr_names = [item[item.find('_') + 1:] for item in group_attrs]
        # The column label of a dataset is stored in its 'column_name' attribute.
        column_dataset_names = [first_group[item].attrs['column_name'] for item in group_datasets]

        output_dataframe = pd.DataFrame(
            columns=column_attr_names + column_dataset_names,
            index=group_list,
        )

        # Numerical or categorical columns come from the groups' attributes.
        for meas_prop, column_label in zip(group_attrs, column_attr_names):
            output_dataframe.loc[:, column_label] = [
                file[group_key].attrs[meas_prop][()][0] for group_key in group_list
            ]

        # Dataset columns come from the groups' datasets, one full array per row.
        for meas_prop, column_label in zip(group_datasets, column_dataset_names):
            output_dataframe.loc[:, column_label] = [
                file[group_key + '/' + meas_prop][()] for group_key in group_list
            ]

    return output_dataframe
def is_callable_list(x : list):
    """Return True when every element of x is callable (True for an empty list)."""
    for candidate in x:
        if not callable(candidate):
            return False
    return True
def is_str_list(x : list):
    """Return True when every element of x is a str (True for an empty list)."""
    for candidate in x:
        if not isinstance(candidate, str):
            return False
    return True
def is_nested_hierarchy(df) -> bool:
    """Check whether the rows of a categorical dataframe form a nested group hierarchy.

    Columns are read from left to right as consecutive hierarchy levels, lower
    level first. The hierarchy is nested when every group of a column belongs to
    exactly one group of the next column, i.e. no lower-level group is split
    across several higher-level groups.

    Columns are addressed by position, so any column names are accepted.

    Parameters:
        df (pd.DataFrame): dataframe whose columns hold the group labels of each level.

    Returns:
        bool: True when every pair of consecutive columns is nested. A dataframe
        with fewer than two columns is trivially nested.
    """
    columns = list(df.columns)
    for lower_level, higher_level in zip(columns, columns[1:]):
        # Count the distinct higher-level groups each lower-level group appears in.
        # Counting labels (rather than averaging integer codes) cannot be fooled by
        # a group whose codes happen to average to another valid code.
        parents_per_group = df.groupby(lower_level)[higher_level].nunique()
        if (parents_per_group > 1).any():
            return False
    return True
def get_attr_names(input_data):
    """Return the column labels of input_data.

    Raises:
        ValueError: if input_data is not a pd.DataFrame.
    """
    # TODO: extend this to file-system paths
    if isinstance(input_data, pd.DataFrame):
        return input_data.columns
    raise ValueError("input_data must be a pd.DataFrame")
def create_group_hierarchy(obj, df, columns):
    """Recursively create nested HDF5 groups from the categorical columns of df.

    One group is created under obj for each unique value of the first column.
    Each group records the column it came from ('column_name') and the number of
    rows of df that fall in it ('count'). The remaining columns are then applied
    to the matching rows to build the next level.

    Input:
        obj (h5py.File or h5py.Group): parent under which groups are created
        df (pd.DataFrame): rows to be partitioned
        columns (list of strs): denote categorical columns in df to be used to define hdf5 file group hierarchy
    """
    if columns:
        current_column = columns[0]
        remaining_columns = columns[1::]
        for category in df[current_column].unique():
            node = obj.require_group(category)
            node.attrs.create('column_name', current_column)
            # Rows belonging to this category; they define the next level down.
            members = df.loc[df[current_column] == category, :]
            node.attrs.create('count', members.shape[0])
            create_group_hierarchy(node, members, remaining_columns)
def get_parent_child_relationships(file: h5py.File):
    """Collect name, parent name and 'count' attribute of every group visited in file.

    Returns:
        tuple of three lists of equal length, in visiting order: group names,
        names of their parents, and values of their 'count' attribute.
    """
    records = []

    def collect_group(name, obj):
        # Only groups are recorded; other visited objects are ignored.
        if not isinstance(obj, h5py.Group):
            return None
        records.append((obj.name, obj.parent.name, obj.attrs['count']))
        return None  # returning None lets visititems continue the traversal

    file.visititems(collect_group)

    nodes = [record[0] for record in records]
    parent = [record[1] for record in records]
    values = [record[2] for record in records]
    return nodes, parent, values
def get_groups_at_a_level(file: h5py.File, level: int):
    """Return the names of the groups found at a given depth below file.

    The depth of a visited object is the number of '/' separators in the name
    passed to the visitor by visititems.
    NOTE(review): h5py documents these names as relative to the visited group,
    which would put its direct children at level 0 - confirm.

    Parameters:
        file (h5py.File): open file (or group) to traverse.
        level (int): depth to select. Integer-like values (e.g. a numpy integer
            read back from an attribute) are accepted.

    Returns:
        list of str: the `.name` of every matching group, in visiting order.
    """
    # Normalise so the comparison with an int count cannot silently never match.
    level = int(level)
    groups = []

    def node_selector(name, obj):
        # Only groups are collected; datasets at the same depth are skipped.
        if isinstance(obj, h5py.Group) and name.count('/') == level:
            groups.append(obj.name)

    file.visititems(node_selector)
    return groups
def format_group_names(names: list):
    """Map each group path to the part after its last '/'.

    Names of length 0 or 1 (such as the root '/') are kept unchanged.

    Returns:
        pd.DataFrame: single column 'formated_names', indexed by the original names.
    """
    short_names = [
        path.rsplit('/', 1)[-1] if len(path) > 1 else path
        for path in names
    ]
    return pd.DataFrame(short_names, columns=['formated_names'], index=names)
def display_group_hierarchy_on_treemap(filename: str):
    """Open an HDF5 file read-only and show its group hierarchy as a plotly treemap.

    Each tile is labelled with a group's name and sized by the value returned
    for it by get_parent_child_relationships. Hovering shows label, count and path.
    """
    with h5py.File(filename, 'r') as file:
        nodes, parents, values = get_parent_child_relationships(file)

        treemap_trace = go.Treemap(
            labels=nodes,
            parents=parents,
            values=values,
            branchvalues='total',
            customdata=pd.Series(nodes),
            hovertemplate='<b>%{label} </b> <br> Count: %{value} <br> Path: %{customdata}',
            name='',
        )

        fig = make_subplots(1, 1, specs=[[{"type": "domain"}]],)
        fig.add_trace(treemap_trace)
        fig.update_layout(
            width=800,
            height=600,
            margin=dict(t=50, l=25, r=25, b=25),
        )
        fig.show()
def create_hdf5_file(ofilename, input_data, approach : str, group_by_funcs : list, extract_attrs_func = None):
    """Create an hdf5 file with as many levels as indicated by len(group_by_funcs).

    Top level denotes the root group/directory and bottom level denotes measurement level groups.

    Parameters:
        ofilename (str): name of the hdf5 file to create (opened in 'w' mode).
        input_data (pd.DataFrame | file-system path): table to be grouped, or a
            directory path whose listing becomes a single 'filename' column.
        approach (str): 'top-down' or 'bottom-up'. The historical misspelling
            'botton-up' is still accepted. Only 'top-down' writes a file; the
            bottom-up branch only validates the hierarchy (TODO: implement).
        group_by_funcs (list of callables or strs): contains a list of callables or dataframe's
            column names that will be used to partition or group files from top to bottom.
            Callables in the list must assign a categorical value to each file in a file list,
            internally represented as a DataFrame, and they thus return a pd.Series of
            categorical values. Strings in the list refer to the name of categorical columns
            in the input_data (when this is a DataFrame).
        extract_attrs_func: currently unused.

    Returns:
        int: 0 on completion.

    Raises:
        ValueError: if input_data, group_by_funcs or approach is not valid, or if
            the bottom-up approach is requested for a non-nested hierarchy.
    """
    # Check whether input_data is a valid file-system path or a DataFrame
    if isinstance(input_data, str) and os.path.exists(input_data):
        # `columns` must be list-like; a bare string makes the constructor raise.
        df = pd.DataFrame(os.listdir(input_data), columns=['filename'])
    elif isinstance(input_data, pd.DataFrame):
        df = input_data.copy()
    else:
        raise ValueError("input_data must be either a valid file-system path or a dataframe.")

    if is_callable_list(group_by_funcs):
        # One generated column per callable, named after its level in the hierarchy.
        grouping_cols = []
        for i, func in enumerate(group_by_funcs):
            level_column = 'level_' + str(i) + '_groups'
            grouping_cols.append(level_column)
            df[level_column] = func(df)
    elif is_str_list(group_by_funcs) and all(item in df.columns for item in group_by_funcs):
        grouping_cols = group_by_funcs
    else:
        raise ValueError("'group_by_funcs' must be a list of callables (or str) that takes input_data as input an returns a valid categorical output.")

    if approach in ('bottom-up', 'botton-up'):
        # TODO: implement bottom-up approach
        if is_nested_hierarchy(df.loc[:, grouping_cols]):
            print('Do something')
        else:
            raise ValueError("group_by_funcs do not define a valid group hierarchy. Please reprocess the input_data or choose different grouping functions.")
    elif approach == 'top-down':
        with h5py.File(ofilename, 'w') as file:
            create_group_hierarchy(file, df, grouping_cols)
            # Index of the deepest level, stored for later look-ups of bottom-level groups.
            file.attrs.create(name='depth', data=len(grouping_cols) - 1)
            # TODO: add datasets and attributes to the bottom-level groups.
    else:
        raise ValueError("'approach' must take values in ['top-down','bottom-up']")

    return 0
def augment_with_filetype(df):
    """Add a 'filetype' column holding each filename's extension without the dot.

    The dataframe is modified in place and also returned.
    """
    extensions = []
    for filename in df['filename']:
        _, dotted_extension = os.path.splitext(filename)
        extensions.append(dotted_extension[1:])
    df['filetype'] = extensions
    return df
def augment_with_filenumber(df):
    """Add a 'filenumber' column holding the part of each filename before its first '_'.

    A filename without an underscore is used whole, instead of losing its last
    character to a failed `find`. The dataframe is modified in place and also returned.
    """
    # partition returns the whole string as the head when the separator is absent.
    df['filenumber'] = [item.partition('_')[0] for item in df['filename']]
    return df
def group_by_df_column(df, column_name: str):
    """Return the column of df to be used as grouping labels.

    df (pandas.DataFrame):
    column_name (str): column_name of df by which grouping operation will take place.

    Raises ValueError when column_name is not a column of df.
    """
    if column_name in df.columns:
        return df[column_name]
    raise ValueError("column_name must be in the columns of df.")
def main():
    """Build 'test.h5' from the beam-time metadata table and display its group hierarchy.

    Reads the metadata table, keeps the rows of type 'ibw', splits the 'sample'
    annotation into a sample name and a data-quality label, groups the rows by
    sample and then by file number, and shows the result as a treemap.
    """
    # input data frame; os.path.join keeps the path valid on every platform
    input_data = read_hdf5_as_dataframe(os.path.join('input_files', 'BeamTimeMetaData.h5'))

    # Rename column 'name' with 'filename'. The augment_* helpers read filenames from the column 'filename'.
    input_data = input_data.rename(columns={'name': 'filename'})

    # Add columns with file numbers and filetypes to input_data
    input_data = augment_with_filenumber(input_data)
    input_data = augment_with_filetype(input_data)
    print(input_data['filetype'].unique())

    # Reduce input_data to files of ibw type. Copy so that the column assignments
    # below act on an independent frame rather than on a slice of the original.
    input_data = input_data.loc[input_data['filetype'] == 'ibw', :].copy()

    sample_name = []
    sample_quality = []
    for item in input_data['sample']:
        paren_idx = item.find('(')
        if paren_idx != -1:
            # Annotated as 'name(quality)'; the final character is dropped,
            # presumably the closing parenthesis - verify against the data.
            print(item)
            sample_name.append(item[0:paren_idx])
            sample_quality.append(item[paren_idx + 1:len(item) - 1])
        elif item == '':
            sample_name.append('Not yet annotated')
            sample_quality.append('unevaluated')
        else:
            sample_name.append(item)
            sample_quality.append('good data')
    input_data['sample'] = sample_name
    input_data['data_quality'] = sample_quality

    group_by_sample = lambda x: group_by_df_column(x, 'sample')
    group_by_filenumber = lambda x: group_by_df_column(x, 'filenumber')

    create_hdf5_file('test.h5', input_data, 'top-down', group_by_funcs=[group_by_sample, group_by_filenumber])
    display_group_hierarchy_on_treemap('test.h5')
    print(':)')
# Run the demo pipeline only when this file is executed as a script, not when it is imported.
if __name__ == '__main__':
    main()