Moved is_structured_array() and to_serializable_dtype() to utils, ranamed a few functions and propagated changes to dependent modules.
This commit is contained in:
@ -202,10 +202,10 @@ def update_hdf5_attributes(input_hdf5_file, yaml_dict):
|
||||
if attr_value.get('delete'): # delete when True
|
||||
hdf5_obj.attrs.__delitem__(attr_name)
|
||||
elif not (attr_value.get('rename_as') == attr_name): # update when true
|
||||
hdf5_obj.attrs[attr_value.get('rename_as')] = hdf5_obj.attrs[attr_name] # parse_attribute(attr_value)
|
||||
hdf5_obj.attrs[attr_value.get('rename_as')] = hdf5_obj.attrs[attr_name] # convert_attrdict_to_np_structured_array(attr_value)
|
||||
hdf5_obj.attrs.__delitem__(attr_name)
|
||||
else: # add a new attribute
|
||||
hdf5_obj.attrs.update({attr_name : utils.parse_attribute(attr_value)})
|
||||
hdf5_obj.attrs.update({attr_name : utils.convert_attrdict_to_np_structured_array(attr_value)})
|
||||
|
||||
with h5py.File(input_hdf5_file, 'r+') as f:
|
||||
for key in yaml_dict.keys():
|
||||
|
@ -151,23 +151,6 @@ def annotate_root_dir(filename,annotation_dict: dict):
|
||||
# file.attrs.create('metadata_'+key, annotation_dict[key])
|
||||
|
||||
|
||||
def is_valid_directory_path(dirpath,select_dir_keywords):
|
||||
|
||||
activated_keywords = []
|
||||
if select_dir_keywords:
|
||||
for item in select_dir_keywords:
|
||||
if len(item.split(os.sep))>1:
|
||||
is_sublist = all([x in dirpath.split(os.sep) for x in item.split(os.sep)])
|
||||
activated_keywords.append(is_sublist)
|
||||
else:
|
||||
activated_keywords.append(item in dirpath)
|
||||
else:
|
||||
activated_keywords.append(True)
|
||||
|
||||
return any(activated_keywords)
|
||||
|
||||
|
||||
|
||||
def transfer_file_dict_to_hdf5(h5file, group_name, file_dict):
|
||||
"""
|
||||
Transfers data from a file_dict to an HDF5 file.
|
||||
@ -413,7 +396,7 @@ def save_processed_dataframe_to_hdf5(df, annotator, output_filename): # src_hdf5
|
||||
|
||||
for key, value in data_level_attributes.items():
|
||||
if isinstance(value,dict):
|
||||
data_level_attributes[key] = utils.parse_attribute(value)
|
||||
data_level_attributes[key] = utils.convert_dict_to_np_structured_array(value)
|
||||
|
||||
|
||||
# Prepare file dictionary
|
||||
|
573
src/hdf5_lib_part2.py
Normal file
573
src/hdf5_lib_part2.py
Normal file
@ -0,0 +1,573 @@
|
||||
import sys
|
||||
import os
|
||||
root_dir = os.path.abspath(os.curdir)
|
||||
sys.path.append(root_dir)
|
||||
|
||||
import pandas as pd
|
||||
import numpy as np
|
||||
import h5py
|
||||
import logging
|
||||
|
||||
import utils.g5505_utils as utils
|
||||
import instruments.readers.filereader_registry as filereader_registry
|
||||
|
||||
def read_mtable_as_dataframe(filename):
|
||||
|
||||
""" Reconstruct a Matlab Table encoded in a .h5 file as a Pandas DataFrame. The input .h5 file
|
||||
contains as many groups as rows in the Matlab Table, and each group stores dataset-like variables in the Table as
|
||||
Datasets while categorical and numerical variables in the table are represented as attributes of each group.
|
||||
|
||||
Note: DataFrame is constructed columnwise to ensure homogenous data columns.
|
||||
|
||||
Parameters:
|
||||
|
||||
filename (str): .h5 file's name. It may include location-path information.
|
||||
|
||||
Returns:
|
||||
|
||||
output_dataframe (pd.DataFrame): Matlab's Table as a Pandas DataFrame
|
||||
|
||||
"""
|
||||
|
||||
#contructs dataframe by filling out entries columnwise. This way we can ensure homogenous data columns"""
|
||||
|
||||
with h5py.File(filename,'r') as file:
|
||||
|
||||
# Define group's attributes and datasets. This should hold
|
||||
# for all groups. TODO: implement verification and noncompliance error if needed.
|
||||
group_list = list(file.keys())
|
||||
group_attrs = list(file[group_list[0]].attrs.keys())
|
||||
#
|
||||
column_attr_names = [item[item.find('_')+1::] for item in group_attrs]
|
||||
column_attr_names_idx = [int(item[4:(item.find('_'))]) for item in group_attrs]
|
||||
|
||||
group_datasets = list(file[group_list[0]].keys()) if not 'DS_EMPTY' in file[group_list[0]].keys() else []
|
||||
#
|
||||
column_dataset_names = [file[group_list[0]][item].attrs['column_name'] for item in group_datasets]
|
||||
column_dataset_names_idx = [int(item[2:]) for item in group_datasets]
|
||||
|
||||
|
||||
# Define data_frame as group_attrs + group_datasets
|
||||
#pd_series_index = group_attrs + group_datasets
|
||||
pd_series_index = column_attr_names + column_dataset_names
|
||||
|
||||
output_dataframe = pd.DataFrame(columns=pd_series_index,index=group_list)
|
||||
|
||||
tmp_col = []
|
||||
|
||||
for meas_prop in group_attrs + group_datasets:
|
||||
if meas_prop in group_attrs:
|
||||
column_label = meas_prop[meas_prop.find('_')+1:]
|
||||
# Create numerical or categorical column from group's attributes
|
||||
tmp_col = [file[group_key].attrs[meas_prop][()][0] for group_key in group_list]
|
||||
else:
|
||||
# Create dataset column from group's datasets
|
||||
column_label = file[group_list[0] + '/' + meas_prop].attrs['column_name']
|
||||
#tmp_col = [file[group_key + '/' + meas_prop][()][0] for group_key in group_list]
|
||||
tmp_col = [file[group_key + '/' + meas_prop][()] for group_key in group_list]
|
||||
|
||||
output_dataframe.loc[:,column_label] = tmp_col
|
||||
|
||||
return output_dataframe
|
||||
|
||||
def create_group_hierarchy(obj, df, columns):
|
||||
|
||||
"""
|
||||
Input:
|
||||
obj (h5py.File or h5py.Group)
|
||||
columns (list of strs): denote categorical columns in df to be used to define hdf5 file group hierarchy
|
||||
"""
|
||||
|
||||
if not columns:
|
||||
return
|
||||
|
||||
# Determine categories associated with first categorical column
|
||||
unique_values = df[columns[0]].unique()
|
||||
|
||||
if obj.name == '/':
|
||||
obj.attrs.create('count',df.shape[0])
|
||||
obj.attrs.create('file_list',df['filename'].tolist())
|
||||
|
||||
for group_name in unique_values:
|
||||
|
||||
group = obj.require_group(group_name)
|
||||
group.attrs.create('column_name', columns[0])
|
||||
|
||||
sub_df = df[df[columns[0]]==group_name] # same as df.loc[df[columns[0]]==group_name,:]
|
||||
group.attrs.create('count',sub_df.shape[0])
|
||||
group.attrs.create('file_list',sub_df['filename'].tolist())
|
||||
|
||||
# if group_name == 'MgO powder,H2O,HCl':
|
||||
# print('Here:',sub_df.shape)
|
||||
create_group_hierarchy(group, sub_df, columns[1::])
|
||||
|
||||
def is_nested_hierarchy(df) -> bool:
|
||||
"""receives a dataframe with categorical columns and checks whether rows form a nested group hierarchy.
|
||||
That is, from bottom to top, subsequent hierarchical levels contain nested groups. The lower level groups belong to exactly one group in the higher level group.
|
||||
"""
|
||||
# TODO: generalize the code to check for deeper group hierachies.
|
||||
def are_nested(df, col, col_nxt):
|
||||
""" Checks whether low level LL groups can be separated in terms of high level HL groups.
|
||||
That is, elements of low-level groups do not belong to more than one HL group."""
|
||||
|
||||
# Compute higher level group names/categories
|
||||
memberships = df[col_nxt].unique().tolist()
|
||||
|
||||
# Compute upper-level group memberships of low-level groups
|
||||
col_avg_memberships = df.groupby(col).mean()[col_nxt].unique()
|
||||
|
||||
# Check whether all low-level groups have an actual hlg membership. That is, their avg. hlg membership is in the hlg membership.
|
||||
return all([col_avg_memberships[group_idx] in memberships for group_idx in range(len(col_avg_memberships))])
|
||||
|
||||
df_tmp = df.copy()
|
||||
|
||||
# Create relabeling map
|
||||
for column_name in df_tmp.columns:
|
||||
category_index = pd.Series(np.arange(len(df_tmp[column_name].unique())), index=df_tmp[column_name].unique())
|
||||
df_tmp[column_name] = category_index[df_tmp[column_name].tolist()].tolist()
|
||||
|
||||
df_tmp.plot()
|
||||
|
||||
return all([are_nested(df_tmp,'level_'+str(i)+'_groups','level_'+str(i+1)+'_groups') for i in range(len(df_tmp.columns)-1)])
|
||||
|
||||
|
||||
def get_groups_at_a_level(file: h5py.File, level: str):
|
||||
|
||||
groups = []
|
||||
def node_selector(name, obj):
|
||||
if name.count('/') == level:
|
||||
print(name)
|
||||
groups.append(obj.name)
|
||||
|
||||
file.visititems(node_selector)
|
||||
#file.visititems()
|
||||
return groups
|
||||
|
||||
|
||||
def annotate_root_dir(filename,annotation_dict: dict):
|
||||
with h5py.File(filename,'r+') as file:
|
||||
file.attrs.update(annotation_dict)
|
||||
#for key in annotation_dict:
|
||||
# file.attrs.create('metadata_'+key, annotation_dict[key])
|
||||
|
||||
|
||||
def transfer_file_dict_to_hdf5(h5file, group_name, file_dict):
|
||||
"""
|
||||
Transfers data from a file_dict to an HDF5 file.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
h5file : h5py.File
|
||||
HDF5 file object where the data will be written.
|
||||
group_name : str
|
||||
Name of the HDF5 group where data will be stored.
|
||||
file_dict : dict
|
||||
Dictionary containing file data to be transferred. Required structure:
|
||||
{
|
||||
'name': str,
|
||||
'attributes_dict': dict,
|
||||
'datasets': [
|
||||
{
|
||||
'name': str,
|
||||
'data': array-like,
|
||||
'shape': tuple,
|
||||
'attributes': dict (optional)
|
||||
},
|
||||
...
|
||||
]
|
||||
}
|
||||
|
||||
Returns
|
||||
-------
|
||||
None
|
||||
"""
|
||||
|
||||
if not file_dict:
|
||||
return
|
||||
|
||||
try:
|
||||
# Create group and add their attributes
|
||||
group = h5file[group_name].create_group(name=file_dict['name'])
|
||||
# Add group attributes
|
||||
group.attrs.update(file_dict['attributes_dict'])
|
||||
|
||||
# Add datasets to the just created group
|
||||
for dataset in file_dict['datasets']:
|
||||
dataset_obj = group.create_dataset(
|
||||
name=dataset['name'],
|
||||
data=dataset['data'],
|
||||
shape=dataset['shape']
|
||||
)
|
||||
|
||||
# Add dataset's attributes
|
||||
attributes = dataset.get('attributes', {})
|
||||
dataset_obj.attrs.update(attributes)
|
||||
except Exception as inst:
|
||||
print(inst)
|
||||
logging.error('Failed to transfer data into HDF5: %s', inst)
|
||||
|
||||
def copy_file_in_group(source_file_path, dest_file_obj : h5py.File, dest_group_name, work_with_copy : bool = True):
|
||||
# Create copy of original file to avoid possible file corruption and work with it.
|
||||
|
||||
if work_with_copy:
|
||||
tmp_file_path = utils.make_file_copy(source_file_path)
|
||||
else:
|
||||
tmp_file_path = source_file_path
|
||||
|
||||
# Open backup h5 file and copy complet filesystem directory onto a group in h5file
|
||||
with h5py.File(tmp_file_path,'r') as src_file:
|
||||
dest_file_obj.copy(source= src_file['/'], dest= dest_group_name)
|
||||
|
||||
if 'tmp_files' in tmp_file_path:
|
||||
os.remove(tmp_file_path)
|
||||
|
||||
def create_hdf5_file_from_filesystem_path(path_to_input_directory: str,
|
||||
path_to_filenames_dict: dict = None,
|
||||
select_dir_keywords : list = [],
|
||||
root_metadata_dict : dict = {}):
|
||||
|
||||
"""
|
||||
Creates an .h5 file with name "output_filename" that preserves the directory tree (or folder structure)
|
||||
of a given filesystem path.
|
||||
|
||||
The data integration capabilities are limited by our file reader, which can only access data from a list of
|
||||
admissible file formats. These, however, can be extended. Directories are groups in the resulting HDF5 file.
|
||||
Files are formatted as composite objects consisting of a group, file, and attributes.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
output_filename : str
|
||||
Name of the output HDF5 file.
|
||||
path_to_input_directory : str
|
||||
Path to root directory, specified with forward slashes, e.g., path/to/root.
|
||||
|
||||
path_to_filenames_dict : dict, optional
|
||||
A pre-processed dictionary where keys are directory paths on the input directory's tree and values are lists of files.
|
||||
If provided, 'input_file_system_path' is ignored.
|
||||
|
||||
select_dir_keywords : list
|
||||
List of string elements to consider or select only directory paths that contain
|
||||
a word in 'select_dir_keywords'. When empty, all directory paths are considered
|
||||
to be included in the HDF5 file group hierarchy.
|
||||
root_metadata_dict : dict
|
||||
Metadata to include at the root level of the HDF5 file.
|
||||
|
||||
Returns
|
||||
-------
|
||||
output_filename : str
|
||||
Path to the created HDF5 file.
|
||||
"""
|
||||
|
||||
|
||||
|
||||
if not '/' in path_to_input_directory:
|
||||
raise ValueError('path_to_input_directory needs to be specified using forward slashes "/".' )
|
||||
|
||||
#path_to_output_directory = os.path.join(path_to_input_directory,'..')
|
||||
path_to_input_directory = os.path.normpath(path_to_input_directory).strip(os.sep)
|
||||
|
||||
|
||||
for i, keyword in enumerate(select_dir_keywords):
|
||||
select_dir_keywords[i] = keyword.replace('/',os.sep)
|
||||
|
||||
if not path_to_filenames_dict:
|
||||
# On dry_run=True, returns path to files dictionary of the output directory without making a actual copy of the input directory
|
||||
path_to_output_directory = os.path.join(path_to_input_directory,'..')
|
||||
path_to_filenames_dict = utils.copy_directory_with_contraints(path_to_input_directory,
|
||||
path_to_output_directory,
|
||||
dry_run=True)
|
||||
# Set input_directory as copied input directory
|
||||
root_dir = path_to_input_directory
|
||||
path_to_output_file = path_to_input_directory.rstrip(os.path.sep) + '.h5'
|
||||
|
||||
with h5py.File(path_to_output_file, mode='w', track_order=True) as h5file:
|
||||
|
||||
number_of_dirs = len(path_to_filenames_dict.keys())
|
||||
dir_number = 1
|
||||
for dirpath, filtered_filenames_list in path_to_filenames_dict.items():
|
||||
|
||||
start_message = f'Starting to transfer files in directory: {dirpath}'
|
||||
end_message = f'\nCompleted transferring files in directory: {dirpath}'
|
||||
# Print and log the start message
|
||||
print(start_message)
|
||||
logging.info(start_message)
|
||||
|
||||
# Check if filtered_filenames_list is nonempty. TODO: This is perhaps redundant by design of path_to_filenames_dict.
|
||||
if not filtered_filenames_list:
|
||||
continue
|
||||
|
||||
group_name = dirpath.replace(os.sep,'/')
|
||||
group_name = group_name.replace(root_dir.replace(os.sep,'/') + '/', '/')
|
||||
|
||||
# Flatten group name to one level
|
||||
if select_dir_keywords:
|
||||
offset = sum([len(i.split(os.sep)) if i in dirpath else 0 for i in select_dir_keywords])
|
||||
else:
|
||||
offset = 1
|
||||
tmp_list = group_name.split('/')
|
||||
if len(tmp_list) > offset+1:
|
||||
group_name = '/'.join([tmp_list[i] for i in range(offset+1)])
|
||||
|
||||
# Group hierarchy is implicitly defined by the forward slashes
|
||||
if not group_name in h5file.keys():
|
||||
h5file.create_group(group_name)
|
||||
#h5file[group_name].attrs.create(name='filtered_file_list',data=convert_string_to_bytes(filtered_filename_list))
|
||||
#h5file[group_name].attrs.create(name='file_list',data=convert_string_to_bytes(filenames_list))
|
||||
else:
|
||||
print(group_name,' was already created.')
|
||||
|
||||
for filenumber, filename in enumerate(filtered_filenames_list):
|
||||
|
||||
#file_ext = os.path.splitext(filename)[1]
|
||||
#try:
|
||||
|
||||
# hdf5 path to filename group
|
||||
dest_group_name = f'{group_name}/{filename}'
|
||||
|
||||
if not 'h5' in filename:
|
||||
#file_dict = config_file.select_file_readers(group_id)[file_ext](os.path.join(dirpath,filename))
|
||||
#file_dict = ext_to_reader_dict[file_ext](os.path.join(dirpath,filename))
|
||||
file_dict = filereader_registry.select_file_reader(dest_group_name)(os.path.join(dirpath,filename))
|
||||
|
||||
transfer_file_dict_to_hdf5(h5file, group_name, file_dict)
|
||||
|
||||
else:
|
||||
source_file_path = os.path.join(dirpath,filename)
|
||||
dest_file_obj = h5file
|
||||
#group_name +'/'+filename
|
||||
#ext_to_reader_dict[file_ext](source_file_path, dest_file_obj, dest_group_name)
|
||||
#g5505f_reader.select_file_reader(dest_group_name)(source_file_path, dest_file_obj, dest_group_name)
|
||||
copy_file_in_group(source_file_path, dest_file_obj, dest_group_name, False)
|
||||
|
||||
# Update the progress bar and log the end message
|
||||
utils.progressBar(dir_number, number_of_dirs, end_message)
|
||||
logging.info(end_message)
|
||||
dir_number = dir_number + 1
|
||||
|
||||
|
||||
|
||||
if len(root_metadata_dict.keys())>0:
|
||||
for key, value in root_metadata_dict.items():
|
||||
#if key in h5file.attrs:
|
||||
# del h5file.attrs[key]
|
||||
h5file.attrs.create(key, value)
|
||||
#annotate_root_dir(output_filename,root_metadata_dict)
|
||||
|
||||
|
||||
#output_yml_filename_path = hdf5_vis.take_yml_snapshot_of_hdf5_file(output_filename)
|
||||
|
||||
return path_to_output_file #, output_yml_filename_path
|
||||
|
||||
def save_processed_dataframe_to_hdf5(df, annotator, output_filename): # src_hdf5_path, script_date, script_name):
|
||||
"""
|
||||
Save processed dataframe columns with annotations to an HDF5 file.
|
||||
|
||||
Parameters:
|
||||
df (pd.DataFrame): DataFrame containing processed time series.
|
||||
annotator (): Annotator object with get_metadata method.
|
||||
output_filename (str): Path to the source HDF5 file.
|
||||
"""
|
||||
# Convert datetime columns to string
|
||||
datetime_cols = df.select_dtypes(include=['datetime64']).columns
|
||||
|
||||
if list(datetime_cols):
|
||||
df[datetime_cols] = df[datetime_cols].map(str)
|
||||
|
||||
# Convert dataframe to structured array
|
||||
icad_data_table = utils.convert_dataframe_to_np_structured_array(df)
|
||||
|
||||
# Get metadata
|
||||
metadata_dict = annotator.get_metadata()
|
||||
|
||||
# Prepare project level attributes to be added at the root level
|
||||
|
||||
project_level_attributes = metadata_dict['metadata']['project']
|
||||
|
||||
# Prepare high-level attributes
|
||||
high_level_attributes = {
|
||||
'parent_files': metadata_dict['parent_files'],
|
||||
**metadata_dict['metadata']['sample'],
|
||||
**metadata_dict['metadata']['environment'],
|
||||
**metadata_dict['metadata']['instruments']
|
||||
}
|
||||
|
||||
# Prepare data level attributes
|
||||
data_level_attributes = metadata_dict['metadata']['datasets']
|
||||
|
||||
for key, value in data_level_attributes.items():
|
||||
if isinstance(value,dict):
|
||||
data_level_attributes[key] = utils.convert_attrdict_to_np_structured_array(value)
|
||||
|
||||
|
||||
# Prepare file dictionary
|
||||
file_dict = {
|
||||
'name': project_level_attributes['processing_file'],
|
||||
'attributes_dict': high_level_attributes,
|
||||
'datasets': [{
|
||||
'name': "data_table",
|
||||
'data': icad_data_table,
|
||||
'shape': icad_data_table.shape,
|
||||
'attributes': data_level_attributes
|
||||
}]
|
||||
}
|
||||
|
||||
# Check if the file exists
|
||||
if os.path.exists(output_filename):
|
||||
mode = "a"
|
||||
print(f"File {output_filename} exists. Opening in append mode.")
|
||||
else:
|
||||
mode = "w"
|
||||
print(f"File {output_filename} does not exist. Creating a new file.")
|
||||
|
||||
|
||||
# Write to HDF5
|
||||
with h5py.File(output_filename, mode) as h5file:
|
||||
# Add project level attributes at the root/top level
|
||||
h5file.attrs.update(project_level_attributes)
|
||||
transfer_file_dict_to_hdf5(h5file, '/', file_dict)
|
||||
|
||||
|
||||
def create_hdf5_file_from_dataframe(ofilename, input_data, approach : str, group_by_funcs : list, extract_attrs_func = None):
|
||||
|
||||
""" Creates an hdf5 file with as many levels as indicated by len(group_by_funcs).
|
||||
Top level denotes the root group/directory and bottom level denotes measurement level groups.
|
||||
|
||||
Parameters:
|
||||
input_data (pd.DataFrame) :
|
||||
group_by_funcs (list of callables or strs) : contains a list of callables or dataframe's column names that will be used
|
||||
to partition or group files from top to bottom.
|
||||
|
||||
Callables in the list must assign a categorical value to each file in a file list, internally represented as a DataFrame,
|
||||
and they thus return a pd.Series of categorical values.
|
||||
|
||||
On the other hand, strings in the list refer to the name of categorical columns in the input_data (when this is a DataFrame)
|
||||
|
||||
Returns:
|
||||
|
||||
"""
|
||||
|
||||
# Check whether input_data is a valid file-system path or a DataFrame
|
||||
is_valid_path = lambda x : os.path.exists(input_data) if isinstance(input_data,str) else False
|
||||
|
||||
if is_valid_path(input_data):
|
||||
|
||||
file_list = os.listdir(input_data)
|
||||
|
||||
# Navigates file-system folders/directories from top to bottom.
|
||||
#for dirpath, dirnames, filenames in os.walk(input_data,topdown=True):
|
||||
|
||||
|
||||
#df = pd.DataFrame(file_list,columns=['filename'])
|
||||
df = utils.augment_with_filetype(df)
|
||||
|
||||
elif isinstance(input_data,pd.DataFrame):
|
||||
df = input_data.copy()
|
||||
else:
|
||||
raise ValueError("input_data must be either a valid file-system path or a dataframe.")
|
||||
|
||||
# Create group columns to form paths
|
||||
if utils.is_callable_list(group_by_funcs):
|
||||
grouping_cols = []
|
||||
for i, func in enumerate(group_by_funcs):
|
||||
grouping_cols.append('level_'+str(i)+'_groups')
|
||||
df['level_'+str(i)+'_groups'] = func(df)
|
||||
elif utils.is_str_list(group_by_funcs) and all([item in df.columns for item in group_by_funcs]):
|
||||
grouping_cols = group_by_funcs
|
||||
else:
|
||||
raise ValueError("'group_by_funcs' must be a list of callables (or str) that takes input_data as input an returns a valid categorical output.")
|
||||
|
||||
# Concatenate group columns to form paths
|
||||
df['group_path'] = df[grouping_cols].apply(lambda row: '/'.join(row.values.astype(str)), axis=1)
|
||||
|
||||
if approach == 'botton-up':
|
||||
# TODO: implement botton-up approach
|
||||
if is_nested_hierarchy(df.loc[:,grouping_cols]):
|
||||
print('Do something')
|
||||
else:
|
||||
raise ValueError("group_by_funcs do not define a valid group hierarchy. Please reprocess the input_data or choose different grouping functions.")
|
||||
|
||||
elif approach == 'top-down':
|
||||
# Check the length of group_by_funcs list is at most 2
|
||||
#if len(group_by_funcs) > 2:
|
||||
# # TODO: extend to more than 2 callable elements.
|
||||
# raise ValueError("group_by_funcs can only contain at most two grouping elements.")
|
||||
|
||||
with h5py.File(ofilename, 'w') as file:
|
||||
|
||||
# Create groups based on concatenated paths
|
||||
for path in df['group_path'].unique():
|
||||
file.create_group(path)
|
||||
# TODO: incorporate remaining cols (i.e., excluding the group columns) as either metadata or datasets
|
||||
|
||||
#create_group_hierarchy(file, df, grouping_cols)
|
||||
|
||||
file.attrs.create(name='depth', data=len(grouping_cols)-1)
|
||||
|
||||
print(':)')
|
||||
|
||||
else:
|
||||
raise ValueError("'approach' must take values in ['top-down','bottom-up']")
|
||||
|
||||
|
||||
#for i, value in enumerate(df['level_'+str(0)+'_groups'].unique().tolist()):
|
||||
|
||||
# 2. Validate group hierarchy, lower level groups must be embedded in higher level groups
|
||||
|
||||
# 3. Create hdf5 file with groups defined by the 'file_group' column
|
||||
#
|
||||
# Add datasets to groups and the groups and the group's attributes
|
||||
|
||||
#return 0
|
||||
|
||||
def main_mtable_h5_from_dataframe():
|
||||
|
||||
#import os
|
||||
ROOT_DIR = os.path.abspath(os.curdir)
|
||||
# Read BeamTimeMetaData.h5, containing Thorsten's Matlab Table
|
||||
input_data_df = read_mtable_as_dataframe(os.path.join(ROOT_DIR,'input_files\\BeamTimeMetaData.h5'))
|
||||
|
||||
# Preprocess Thorsten's input_data dataframe so that i can be used to create a newer .h5 file
|
||||
# under certain grouping specificiations.
|
||||
input_data_df = input_data_df.rename(columns = {'name':'filename'})
|
||||
input_data_df = utils.augment_with_filenumber(input_data_df)
|
||||
input_data_df = utils.augment_with_filetype(input_data_df)
|
||||
input_data_df = utils.split_sample_col_into_sample_and_data_quality_cols(input_data_df)
|
||||
input_data_df['lastModifiedDatestr'] = input_data_df['lastModifiedDatestr'].astype('datetime64[s]')
|
||||
|
||||
# Define grouping functions to be passed into create_hdf5_file function. These can also be set
|
||||
# as strings refering to categorical columns in input_data_df.
|
||||
|
||||
test_grouping_funcs = True
|
||||
if test_grouping_funcs:
|
||||
group_by_sample = lambda x : utils.group_by_df_column(x,'sample')
|
||||
group_by_type = lambda x : utils.group_by_df_column(x,'filetype')
|
||||
#group_by_filenumber = lambda x : utils.group_by_df_column(x,'filenumber')
|
||||
else:
|
||||
group_by_sample = 'sample'
|
||||
group_by_type = 'filetype'
|
||||
group_by_filenumber = 'filenumber'
|
||||
|
||||
output_filename_path = os.path.join('output_files','thorsten_file_list.h5')
|
||||
|
||||
create_hdf5_file_from_dataframe(output_filename_path,input_data_df, 'top-down', group_by_funcs = [group_by_sample, group_by_type])
|
||||
#create_hdf5_file_from_dataframe('test.h5',input_data_df, 'top-down', group_by_funcs = [group_by_sample, group_by_type, group_by_filenumber])
|
||||
|
||||
annotation_dict = {'1-Campaign name': '**SLS-Campaign-2023**',
|
||||
'2-Users':'Thorsten, Luca, Zoe',
|
||||
'3-Startdate': str(input_data_df['lastModifiedDatestr'].min()),
|
||||
'4-Enddate': str(input_data_df['lastModifiedDatestr'].max())
|
||||
}
|
||||
annotate_root_dir(output_filename_path, annotation_dict)
|
||||
|
||||
#display_group_hierarchy_on_a_treemap(output_filename_path)
|
||||
|
||||
print(':)')
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
||||
#main()
|
||||
main_mtable_h5_from_dataframe()
|
||||
#main_5505()
|
||||
|
||||
print(':)')
|
||||
|
@ -76,7 +76,7 @@ class HDF5DataOpsManager():
|
||||
# Parse value into HDF5 admissible type
|
||||
for key in dataset_dict['attributes'].keys():
|
||||
value = dataset_dict['attributes'][key]
|
||||
dataset_dict['attributes'][key] = utils.parse_attribute(value)
|
||||
dataset_dict['attributes'][key] = utils.convert_attrdict_to_np_structured_array(value)
|
||||
|
||||
#name = dataset_dict['name']
|
||||
#data = dataset_dict['data']
|
||||
@ -98,7 +98,7 @@ class HDF5DataOpsManager():
|
||||
for new_attr_key in annotation_dict.keys():
|
||||
value = annotation_dict[new_attr_key]
|
||||
if isinstance(value, dict):
|
||||
annotation_dict[new_attr_key] = utils.parse_attribute(annotation_dict[new_attr_key])
|
||||
annotation_dict[new_attr_key] = utils.convert_attrdict_to_np_structured_array(annotation_dict[new_attr_key])
|
||||
obj.attrs.update(annotation_dict)
|
||||
|
||||
def get_metadata(self, obj_path):
|
||||
@ -231,60 +231,6 @@ def get_parent_child_relationships(file: h5py.File):
|
||||
return nodes, parent, values
|
||||
|
||||
|
||||
def to_serializable_dtype(value):
|
||||
|
||||
"""Transform value's dtype into YAML/JSON compatible dtype
|
||||
|
||||
Parameters
|
||||
----------
|
||||
value : _type_
|
||||
_description_
|
||||
|
||||
Returns
|
||||
-------
|
||||
_type_
|
||||
_description_
|
||||
"""
|
||||
try:
|
||||
if isinstance(value, np.generic):
|
||||
if np.issubdtype(value.dtype, np.bytes_):
|
||||
value = value.decode('utf-8')
|
||||
elif np.issubdtype(value.dtype, np.unicode_):
|
||||
value = str(value)
|
||||
elif np.issubdtype(value.dtype, np.number):
|
||||
value = float(value)
|
||||
else:
|
||||
print('Yaml-compatible data-type was not found. Value has been set to NaN.')
|
||||
value = np.nan
|
||||
elif isinstance(value, np.ndarray):
|
||||
# Handling structured array types (with fields)
|
||||
if value.dtype.names:
|
||||
value = {field: to_serializable_dtype(value[field]) for field in value.dtype.names}
|
||||
else:
|
||||
# Handling regular array NumPy types
|
||||
if np.issubdtype(value.dtype, np.bytes_):
|
||||
value = [item.decode('utf-8') for item in value] if len(value) > 1 else value[0].decode('utf-8')
|
||||
elif np.issubdtype(value.dtype, np.unicode_):
|
||||
value = [str(item) for item in value] if len(value) > 1 else str(value[0])
|
||||
elif np.issubdtype(value.dtype, np.integer):
|
||||
value = [int(item) for item in value] if len(value) > 1 else int(value[0])
|
||||
elif np.issubdtype(value.dtype, np.floating):
|
||||
value = [float(item) for item in value] if len(value) > 1 else float(value[0])
|
||||
else:
|
||||
print('Yaml-compatible data-type was not found. Value has been set to NaN.')
|
||||
value = np.nan
|
||||
|
||||
except Exception as e:
|
||||
print(f'Error converting value: {e}. Value has been set to NaN.')
|
||||
value = np.nan
|
||||
|
||||
return value
|
||||
|
||||
def is_structured_array(attr_val):
|
||||
if isinstance(attr_val,np.ndarray):
|
||||
return True if attr_val.dtype.names is not None else False
|
||||
else:
|
||||
return False
|
||||
|
||||
def construct_attributes_dict(attrs_obj):
|
||||
|
||||
@ -293,13 +239,13 @@ def construct_attributes_dict(attrs_obj):
|
||||
attr_dict[key] = {}
|
||||
if not key in ['file_list','filtered_file_list']:
|
||||
|
||||
if is_structured_array(value):
|
||||
if utils.is_structured_array(value):
|
||||
#for subattr in value.dtype.names:
|
||||
#attr_dict[key][subattr] = make_dtype_yaml_compatible(value[subattr])
|
||||
attr_dict[key] = to_serializable_dtype(value)
|
||||
attr_dict[key] = utils.to_serializable_dtype(value)
|
||||
else:
|
||||
attr_dict[key] = {"rename_as" : key,
|
||||
"value" : to_serializable_dtype(value)
|
||||
"value" : utils.to_serializable_dtype(value)
|
||||
}
|
||||
|
||||
#if isinstance(value,str):
|
||||
|
@ -109,7 +109,7 @@ def created_at():
|
||||
created_at = now_tz_aware.strftime('%Y-%m-%d_%H-%M-%S') + '_UTC-OFST_' + tz
|
||||
return created_at
|
||||
|
||||
def dataframe_to_np_structured_array(df: pd.DataFrame):
|
||||
def convert_dataframe_to_np_structured_array(df: pd.DataFrame):
|
||||
|
||||
# Define the dtype for the structured array, ensuring compatibility with h5py
|
||||
dtype = []
|
||||
@ -153,6 +153,47 @@ def convert_string_to_bytes(input_list: list):
|
||||
|
||||
return input_array_bytes
|
||||
|
||||
def convert_attrdict_to_np_structured_array(attr_value: dict):
|
||||
"""
|
||||
Converts a dictionary of attributes into a numpy structured array for HDF5
|
||||
compound type compatibility.
|
||||
|
||||
Each dictionary key is mapped to a field in the structured array, with the
|
||||
data type (S) determined by the longest string representation of the values.
|
||||
If the dictionary is empty, the function returns 'missing'.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
attr_value : dict
|
||||
Dictionary containing the attributes to be converted. Example:
|
||||
attr_value = {
|
||||
'name': 'Temperature',
|
||||
'unit': 'Celsius',
|
||||
'value': 23.5,
|
||||
'timestamp': '2023-09-26 10:00'
|
||||
}
|
||||
|
||||
Returns
|
||||
-------
|
||||
new_attr_value : ndarray or str
|
||||
Numpy structured array with UTF-8 encoded fields. Returns 'missing' if
|
||||
the input dictionary is empty.
|
||||
"""
|
||||
dtype = []
|
||||
values_list = []
|
||||
max_length = max(len(str(attr_value[key])) for key in attr_value.keys())
|
||||
for key in attr_value.keys():
|
||||
if key != 'rename_as':
|
||||
dtype.append((key, f'S{max_length}'))
|
||||
values_list.append(attr_value[key])
|
||||
if values_list:
|
||||
new_attr_value = np.array([tuple(values_list)], dtype=dtype)
|
||||
else:
|
||||
new_attr_value = 'missing'
|
||||
|
||||
return new_attr_value
|
||||
|
||||
|
||||
def infer_units(column_name):
|
||||
# TODO: complete or remove
|
||||
|
||||
@ -165,23 +206,6 @@ def infer_units(column_name):
|
||||
|
||||
return match
|
||||
|
||||
def parse_attribute(attr_value : dict):
|
||||
"Parse a dictionary attribute into an equivalent numpy structured array, which compatible with compound HDF5 type"
|
||||
dtype = []
|
||||
values_list = []
|
||||
max_length = max(len(str(attr_value[key])) for key in attr_value.keys())
|
||||
for key in attr_value.keys():
|
||||
if (not key=='rename_as'):
|
||||
dtype.append((key,f'S{max_length}'))
|
||||
values_list.append(attr_value[key])
|
||||
|
||||
if values_list:
|
||||
new_attr_value = np.array([tuple(values_list)],dtype=dtype)
|
||||
else:
|
||||
new_attr_value = 'missing'
|
||||
|
||||
return new_attr_value
|
||||
|
||||
def progressBar(count_value, total, suffix=''):
|
||||
bar_length = 100
|
||||
filled_up_Length = int(round(bar_length* count_value / float(total)))
|
||||
@ -270,4 +294,59 @@ def copy_directory_with_contraints(input_dir_path, output_dir_path,
|
||||
except Exception as e:
|
||||
logging.error("Failed to copy %s: %s", src_file_path, e)
|
||||
|
||||
return path_to_files_dict
|
||||
return path_to_files_dict
|
||||
|
||||
def to_serializable_dtype(value):
|
||||
|
||||
"""Transform value's dtype into YAML/JSON compatible dtype
|
||||
|
||||
Parameters
|
||||
----------
|
||||
value : _type_
|
||||
_description_
|
||||
|
||||
Returns
|
||||
-------
|
||||
_type_
|
||||
_description_
|
||||
"""
|
||||
try:
|
||||
if isinstance(value, np.generic):
|
||||
if np.issubdtype(value.dtype, np.bytes_):
|
||||
value = value.decode('utf-8')
|
||||
elif np.issubdtype(value.dtype, np.unicode_):
|
||||
value = str(value)
|
||||
elif np.issubdtype(value.dtype, np.number):
|
||||
value = float(value)
|
||||
else:
|
||||
print('Yaml-compatible data-type was not found. Value has been set to NaN.')
|
||||
value = np.nan
|
||||
elif isinstance(value, np.ndarray):
|
||||
# Handling structured array types (with fields)
|
||||
if value.dtype.names:
|
||||
value = {field: to_serializable_dtype(value[field]) for field in value.dtype.names}
|
||||
else:
|
||||
# Handling regular array NumPy types
|
||||
if np.issubdtype(value.dtype, np.bytes_):
|
||||
value = [item.decode('utf-8') for item in value] if len(value) > 1 else value[0].decode('utf-8')
|
||||
elif np.issubdtype(value.dtype, np.unicode_):
|
||||
value = [str(item) for item in value] if len(value) > 1 else str(value[0])
|
||||
elif np.issubdtype(value.dtype, np.integer):
|
||||
value = [int(item) for item in value] if len(value) > 1 else int(value[0])
|
||||
elif np.issubdtype(value.dtype, np.floating):
|
||||
value = [float(item) for item in value] if len(value) > 1 else float(value[0])
|
||||
else:
|
||||
print('Yaml-compatible data-type was not found. Value has been set to NaN.')
|
||||
value = np.nan
|
||||
|
||||
except Exception as e:
|
||||
print(f'Error converting value: {e}. Value has been set to NaN.')
|
||||
value = np.nan
|
||||
|
||||
return value
|
||||
|
||||
def is_structured_array(attr_val):
|
||||
if isinstance(attr_val,np.ndarray):
|
||||
return True if attr_val.dtype.names is not None else False
|
||||
else:
|
||||
return False
|
Reference in New Issue
Block a user