diff --git a/src/hdf5_lib_part2.py b/src/hdf5_lib_part2.py deleted file mode 100644 index 5682e79..0000000 --- a/src/hdf5_lib_part2.py +++ /dev/null @@ -1,403 +0,0 @@ -import sys -import os -root_dir = os.path.abspath(os.curdir) -sys.path.append(root_dir) - -import pandas as pd -import numpy as np -import h5py -import logging - -import utils.g5505_utils as utils -import instruments.readers.filereader_registry as filereader_registry -import src.hdf5_ops as hdf5_ops - - -def create_group_hierarchy(obj, df, columns): - - """ - Input: - obj (h5py.File or h5py.Group) - columns (list of strs): denote categorical columns in df to be used to define hdf5 file group hierarchy - """ - - if not columns: - return - - # Determine categories associated with first categorical column - unique_values = df[columns[0]].unique() - - if obj.name == '/': - obj.attrs.create('count',df.shape[0]) - obj.attrs.create('file_list',df['filename'].tolist()) - - for group_name in unique_values: - - group = obj.require_group(group_name) - group.attrs.create('column_name', columns[0]) - - sub_df = df[df[columns[0]]==group_name] # same as df.loc[df[columns[0]]==group_name,:] - group.attrs.create('count',sub_df.shape[0]) - group.attrs.create('file_list',sub_df['filename'].tolist()) - - # if group_name == 'MgO powder,H2O,HCl': - # print('Here:',sub_df.shape) - create_group_hierarchy(group, sub_df, columns[1::]) - -def is_nested_hierarchy(df) -> bool: - """receives a dataframe with categorical columns and checks whether rows form a nested group hierarchy. - That is, from bottom to top, subsequent hierarchical levels contain nested groups. The lower level groups belong to exactly one group in the higher level group. - """ - # TODO: generalize the code to check for deeper group hierachies. - def are_nested(df, col, col_nxt): - """ Checks whether low level LL groups can be separated in terms of high level HL groups. - That is, elements of low-level groups do not belong to more than one HL group.""" - - # Compute higher level group names/categories - memberships = df[col_nxt].unique().tolist() - - # Compute upper-level group memberships of low-level groups - col_avg_memberships = df.groupby(col).mean()[col_nxt].unique() - - # Check whether all low-level groups have an actual hlg membership. That is, their avg. hlg membership is in the hlg membership. - return all([col_avg_memberships[group_idx] in memberships for group_idx in range(len(col_avg_memberships))]) - - df_tmp = df.copy() - - # Create relabeling map - for column_name in df_tmp.columns: - category_index = pd.Series(np.arange(len(df_tmp[column_name].unique())), index=df_tmp[column_name].unique()) - df_tmp[column_name] = category_index[df_tmp[column_name].tolist()].tolist() - - df_tmp.plot() - - return all([are_nested(df_tmp,'level_'+str(i)+'_groups','level_'+str(i+1)+'_groups') for i in range(len(df_tmp.columns)-1)]) - - - -def transfer_file_dict_to_hdf5(h5file, group_name, file_dict): - """ - Transfers data from a file_dict to an HDF5 file. - - Parameters - ---------- - h5file : h5py.File - HDF5 file object where the data will be written. - group_name : str - Name of the HDF5 group where data will be stored. - file_dict : dict - Dictionary containing file data to be transferred. Required structure: - { - 'name': str, - 'attributes_dict': dict, - 'datasets': [ - { - 'name': str, - 'data': array-like, - 'shape': tuple, - 'attributes': dict (optional) - }, - ... - ] - } - - Returns - ------- - None - """ - - if not file_dict: - return - - try: - # Create group and add their attributes - group = h5file[group_name].create_group(name=file_dict['name']) - # Add group attributes - group.attrs.update(file_dict['attributes_dict']) - - # Add datasets to the just created group - for dataset in file_dict['datasets']: - dataset_obj = group.create_dataset( - name=dataset['name'], - data=dataset['data'], - shape=dataset['shape'] - ) - - # Add dataset's attributes - attributes = dataset.get('attributes', {}) - dataset_obj.attrs.update(attributes) - except Exception as inst: - print(inst) - logging.error('Failed to transfer data into HDF5: %s', inst) - -def copy_file_in_group(source_file_path, dest_file_obj : h5py.File, dest_group_name, work_with_copy : bool = True): - # Create copy of original file to avoid possible file corruption and work with it. - - if work_with_copy: - tmp_file_path = utils.make_file_copy(source_file_path) - else: - tmp_file_path = source_file_path - - # Open backup h5 file and copy complet filesystem directory onto a group in h5file - with h5py.File(tmp_file_path,'r') as src_file: - dest_file_obj.copy(source= src_file['/'], dest= dest_group_name) - - if 'tmp_files' in tmp_file_path: - os.remove(tmp_file_path) - -def create_hdf5_file_from_filesystem_path(path_to_input_directory: str, - path_to_filenames_dict: dict = None, - select_dir_keywords : list = [], - root_metadata_dict : dict = {}): - - """ - Creates an .h5 file with name "output_filename" that preserves the directory tree (or folder structure) - of a given filesystem path. - - The data integration capabilities are limited by our file reader, which can only access data from a list of - admissible file formats. These, however, can be extended. Directories are groups in the resulting HDF5 file. - Files are formatted as composite objects consisting of a group, file, and attributes. - - Parameters - ---------- - output_filename : str - Name of the output HDF5 file. - path_to_input_directory : str - Path to root directory, specified with forward slashes, e.g., path/to/root. - - path_to_filenames_dict : dict, optional - A pre-processed dictionary where keys are directory paths on the input directory's tree and values are lists of files. - If provided, 'input_file_system_path' is ignored. - - select_dir_keywords : list - List of string elements to consider or select only directory paths that contain - a word in 'select_dir_keywords'. When empty, all directory paths are considered - to be included in the HDF5 file group hierarchy. - root_metadata_dict : dict - Metadata to include at the root level of the HDF5 file. - - Returns - ------- - output_filename : str - Path to the created HDF5 file. - """ - - - - if not '/' in path_to_input_directory: - raise ValueError('path_to_input_directory needs to be specified using forward slashes "/".' ) - - #path_to_output_directory = os.path.join(path_to_input_directory,'..') - path_to_input_directory = os.path.normpath(path_to_input_directory).strip(os.sep) - - - for i, keyword in enumerate(select_dir_keywords): - select_dir_keywords[i] = keyword.replace('/',os.sep) - - if not path_to_filenames_dict: - # On dry_run=True, returns path to files dictionary of the output directory without making a actual copy of the input directory - path_to_output_directory = os.path.join(path_to_input_directory,'..') - path_to_filenames_dict = utils.copy_directory_with_contraints(path_to_input_directory, - path_to_output_directory, - dry_run=True) - # Set input_directory as copied input directory - root_dir = path_to_input_directory - path_to_output_file = path_to_input_directory.rstrip(os.path.sep) + '.h5' - - with h5py.File(path_to_output_file, mode='w', track_order=True) as h5file: - - number_of_dirs = len(path_to_filenames_dict.keys()) - dir_number = 1 - for dirpath, filtered_filenames_list in path_to_filenames_dict.items(): - - start_message = f'Starting to transfer files in directory: {dirpath}' - end_message = f'\nCompleted transferring files in directory: {dirpath}' - # Print and log the start message - print(start_message) - logging.info(start_message) - - # Check if filtered_filenames_list is nonempty. TODO: This is perhaps redundant by design of path_to_filenames_dict. - if not filtered_filenames_list: - continue - - group_name = dirpath.replace(os.sep,'/') - group_name = group_name.replace(root_dir.replace(os.sep,'/') + '/', '/') - - # Flatten group name to one level - if select_dir_keywords: - offset = sum([len(i.split(os.sep)) if i in dirpath else 0 for i in select_dir_keywords]) - else: - offset = 1 - tmp_list = group_name.split('/') - if len(tmp_list) > offset+1: - group_name = '/'.join([tmp_list[i] for i in range(offset+1)]) - - # Group hierarchy is implicitly defined by the forward slashes - if not group_name in h5file.keys(): - h5file.create_group(group_name) - #h5file[group_name].attrs.create(name='filtered_file_list',data=convert_string_to_bytes(filtered_filename_list)) - #h5file[group_name].attrs.create(name='file_list',data=convert_string_to_bytes(filenames_list)) - else: - print(group_name,' was already created.') - - for filenumber, filename in enumerate(filtered_filenames_list): - - #file_ext = os.path.splitext(filename)[1] - #try: - - # hdf5 path to filename group - dest_group_name = f'{group_name}/{filename}' - - if not 'h5' in filename: - #file_dict = config_file.select_file_readers(group_id)[file_ext](os.path.join(dirpath,filename)) - #file_dict = ext_to_reader_dict[file_ext](os.path.join(dirpath,filename)) - file_dict = filereader_registry.select_file_reader(dest_group_name)(os.path.join(dirpath,filename)) - - transfer_file_dict_to_hdf5(h5file, group_name, file_dict) - - else: - source_file_path = os.path.join(dirpath,filename) - dest_file_obj = h5file - #group_name +'/'+filename - #ext_to_reader_dict[file_ext](source_file_path, dest_file_obj, dest_group_name) - #g5505f_reader.select_file_reader(dest_group_name)(source_file_path, dest_file_obj, dest_group_name) - copy_file_in_group(source_file_path, dest_file_obj, dest_group_name, False) - - # Update the progress bar and log the end message - utils.progressBar(dir_number, number_of_dirs, end_message) - logging.info(end_message) - dir_number = dir_number + 1 - - - - if len(root_metadata_dict.keys())>0: - for key, value in root_metadata_dict.items(): - #if key in h5file.attrs: - # del h5file.attrs[key] - h5file.attrs.create(key, value) - - - #output_yml_filename_path = hdf5_vis.take_yml_snapshot_of_hdf5_file(output_filename) - - return path_to_output_file #, output_yml_filename_path - -def save_processed_dataframe_to_hdf5(df, annotator, output_filename): # src_hdf5_path, script_date, script_name): - """ - Save processed dataframe columns with annotations to an HDF5 file. - - Parameters: - df (pd.DataFrame): DataFrame containing processed time series. - annotator (): Annotator object with get_metadata method. - output_filename (str): Path to the source HDF5 file. - """ - # Convert datetime columns to string - datetime_cols = df.select_dtypes(include=['datetime64']).columns - - if list(datetime_cols): - df[datetime_cols] = df[datetime_cols].map(str) - - # Convert dataframe to structured array - icad_data_table = utils.convert_dataframe_to_np_structured_array(df) - - # Get metadata - metadata_dict = annotator.get_metadata() - - # Prepare project level attributes to be added at the root level - - project_level_attributes = metadata_dict['metadata']['project'] - - # Prepare high-level attributes - high_level_attributes = { - 'parent_files': metadata_dict['parent_files'], - **metadata_dict['metadata']['sample'], - **metadata_dict['metadata']['environment'], - **metadata_dict['metadata']['instruments'] - } - - # Prepare data level attributes - data_level_attributes = metadata_dict['metadata']['datasets'] - - for key, value in data_level_attributes.items(): - if isinstance(value,dict): - data_level_attributes[key] = utils.convert_attrdict_to_np_structured_array(value) - - - # Prepare file dictionary - file_dict = { - 'name': project_level_attributes['processing_file'], - 'attributes_dict': high_level_attributes, - 'datasets': [{ - 'name': "data_table", - 'data': icad_data_table, - 'shape': icad_data_table.shape, - 'attributes': data_level_attributes - }] - } - - # Check if the file exists - if os.path.exists(output_filename): - mode = "a" - print(f"File {output_filename} exists. Opening in append mode.") - else: - mode = "w" - print(f"File {output_filename} does not exist. Creating a new file.") - - - # Write to HDF5 - with h5py.File(output_filename, mode) as h5file: - # Add project level attributes at the root/top level - h5file.attrs.update(project_level_attributes) - transfer_file_dict_to_hdf5(h5file, '/', file_dict) - -def main_mtable_h5_from_dataframe(): - - #import os - ROOT_DIR = os.path.abspath(os.curdir) - # Read BeamTimeMetaData.h5, containing Thorsten's Matlab Table - input_data_df = read_mtable_as_dataframe(os.path.join(ROOT_DIR,'input_files\\BeamTimeMetaData.h5')) - - # Preprocess Thorsten's input_data dataframe so that i can be used to create a newer .h5 file - # under certain grouping specificiations. - input_data_df = input_data_df.rename(columns = {'name':'filename'}) - input_data_df = utils.augment_with_filenumber(input_data_df) - input_data_df = utils.augment_with_filetype(input_data_df) - input_data_df = utils.split_sample_col_into_sample_and_data_quality_cols(input_data_df) - input_data_df['lastModifiedDatestr'] = input_data_df['lastModifiedDatestr'].astype('datetime64[s]') - - # Define grouping functions to be passed into create_hdf5_file function. These can also be set - # as strings refering to categorical columns in input_data_df. - - test_grouping_funcs = True - if test_grouping_funcs: - group_by_sample = lambda x : utils.group_by_df_column(x,'sample') - group_by_type = lambda x : utils.group_by_df_column(x,'filetype') - #group_by_filenumber = lambda x : utils.group_by_df_column(x,'filenumber') - else: - group_by_sample = 'sample' - group_by_type = 'filetype' - group_by_filenumber = 'filenumber' - - output_filename_path = os.path.join('output_files','thorsten_file_list.h5') - - create_hdf5_file_from_dataframe(output_filename_path,input_data_df, 'top-down', group_by_funcs = [group_by_sample, group_by_type]) - #create_hdf5_file_from_dataframe('test.h5',input_data_df, 'top-down', group_by_funcs = [group_by_sample, group_by_type, group_by_filenumber]) - - annotation_dict = {'1-Campaign name': '**SLS-Campaign-2023**', - '2-Users':'Thorsten, Luca, Zoe', - '3-Startdate': str(input_data_df['lastModifiedDatestr'].min()), - '4-Enddate': str(input_data_df['lastModifiedDatestr'].max()) - } - hdf5_ops.annotate_root_dir(output_filename_path, annotation_dict) - - #display_group_hierarchy_on_a_treemap(output_filename_path) - - print(':)') - -if __name__ == '__main__': - - #main() - main_mtable_h5_from_dataframe() - #main_5505() - - print(':)') -