diff --git a/pipelines/metadata_revision.py b/pipelines/metadata_revision.py
index 5311cb5..f5b2c5c 100644
--- a/pipelines/metadata_revision.py
+++ b/pipelines/metadata_revision.py
@@ -202,10 +202,10 @@ def update_hdf5_attributes(input_hdf5_file, yaml_dict):
             if attr_value.get('delete'): # delete when True
                 hdf5_obj.attrs.__delitem__(attr_name)
             elif not (attr_value.get('rename_as') == attr_name): # update when true
-                hdf5_obj.attrs[attr_value.get('rename_as')] = hdf5_obj.attrs[attr_name] # parse_attribute(attr_value)
+                hdf5_obj.attrs[attr_value.get('rename_as')] = hdf5_obj.attrs[attr_name] # convert_attrdict_to_np_structured_array(attr_value)
                 hdf5_obj.attrs.__delitem__(attr_name)
             else: # add a new attribute
-                hdf5_obj.attrs.update({attr_name : utils.parse_attribute(attr_value)})
+                hdf5_obj.attrs.update({attr_name : utils.convert_attrdict_to_np_structured_array(attr_value)})

     with h5py.File(input_hdf5_file, 'r+') as f:
         for key in yaml_dict.keys():
diff --git a/src/hdf5_lib.py b/src/hdf5_lib.py
index 152b100..c7e8cc9 100644
--- a/src/hdf5_lib.py
+++ b/src/hdf5_lib.py
@@ -151,23 +151,6 @@ def annotate_root_dir(filename,annotation_dict: dict):
         #    file.attrs.create('metadata_'+key, annotation_dict[key])


-def is_valid_directory_path(dirpath,select_dir_keywords):
-
-    activated_keywords = []
-    if select_dir_keywords:
-        for item in select_dir_keywords:
-            if len(item.split(os.sep))>1:
-                is_sublist = all([x in dirpath.split(os.sep) for x in item.split(os.sep)])
-                activated_keywords.append(is_sublist)
-            else:
-                activated_keywords.append(item in dirpath)
-    else:
-        activated_keywords.append(True)
-
-    return any(activated_keywords)
-
-
 def transfer_file_dict_to_hdf5(h5file, group_name, file_dict):
     """
     Transfers data from a file_dict to an HDF5 file.
@@ -413,7 +396,7 @@ def save_processed_dataframe_to_hdf5(df, annotator, output_filename): # src_hdf5
     for key, value in data_level_attributes.items():
         if isinstance(value,dict):
-            data_level_attributes[key] = utils.parse_attribute(value)
+            data_level_attributes[key] = utils.convert_attrdict_to_np_structured_array(value)


     # Prepare file dictionary
diff --git a/src/hdf5_lib_part2.py b/src/hdf5_lib_part2.py
new file mode 100644
index 0000000..e825daa
--- /dev/null
+++ b/src/hdf5_lib_part2.py
@@ -0,0 +1,573 @@
+import sys
+import os
+root_dir = os.path.abspath(os.curdir)
+sys.path.append(root_dir)
+
+import pandas as pd
+import numpy as np
+import h5py
+import logging
+
+import utils.g5505_utils as utils
+import instruments.readers.filereader_registry as filereader_registry
+
+def read_mtable_as_dataframe(filename):
+
+    """ Reconstruct a Matlab Table encoded in a .h5 file as a Pandas DataFrame. The input .h5 file
+    contains as many groups as rows in the Matlab Table, and each group stores dataset-like variables of the Table as
+    datasets, while categorical and numerical variables of the Table are represented as attributes of each group.
+
+    Note: the DataFrame is constructed columnwise to ensure homogeneous data columns.
+
+    Parameters:
+
+    filename (str): .h5 file's name. It may include location-path information.
+
+    Returns:
+
+    output_dataframe (pd.DataFrame): Matlab's Table as a Pandas DataFrame
+
+    """
+
+    # Construct the dataframe by filling out entries columnwise. This way we can ensure homogeneous data columns.
+
+    with h5py.File(filename,'r') as file:
+
+        # Define group's attributes and datasets. This should hold
+        # for all groups. TODO: implement verification and noncompliance error if needed.
+ group_list = list(file.keys()) + group_attrs = list(file[group_list[0]].attrs.keys()) + # + column_attr_names = [item[item.find('_')+1::] for item in group_attrs] + column_attr_names_idx = [int(item[4:(item.find('_'))]) for item in group_attrs] + + group_datasets = list(file[group_list[0]].keys()) if not 'DS_EMPTY' in file[group_list[0]].keys() else [] + # + column_dataset_names = [file[group_list[0]][item].attrs['column_name'] for item in group_datasets] + column_dataset_names_idx = [int(item[2:]) for item in group_datasets] + + + # Define data_frame as group_attrs + group_datasets + #pd_series_index = group_attrs + group_datasets + pd_series_index = column_attr_names + column_dataset_names + + output_dataframe = pd.DataFrame(columns=pd_series_index,index=group_list) + + tmp_col = [] + + for meas_prop in group_attrs + group_datasets: + if meas_prop in group_attrs: + column_label = meas_prop[meas_prop.find('_')+1:] + # Create numerical or categorical column from group's attributes + tmp_col = [file[group_key].attrs[meas_prop][()][0] for group_key in group_list] + else: + # Create dataset column from group's datasets + column_label = file[group_list[0] + '/' + meas_prop].attrs['column_name'] + #tmp_col = [file[group_key + '/' + meas_prop][()][0] for group_key in group_list] + tmp_col = [file[group_key + '/' + meas_prop][()] for group_key in group_list] + + output_dataframe.loc[:,column_label] = tmp_col + + return output_dataframe + +def create_group_hierarchy(obj, df, columns): + + """ + Input: + obj (h5py.File or h5py.Group) + columns (list of strs): denote categorical columns in df to be used to define hdf5 file group hierarchy + """ + + if not columns: + return + + # Determine categories associated with first categorical column + unique_values = df[columns[0]].unique() + + if obj.name == '/': + obj.attrs.create('count',df.shape[0]) + obj.attrs.create('file_list',df['filename'].tolist()) + + for group_name in unique_values: + + group = obj.require_group(group_name) + group.attrs.create('column_name', columns[0]) + + sub_df = df[df[columns[0]]==group_name] # same as df.loc[df[columns[0]]==group_name,:] + group.attrs.create('count',sub_df.shape[0]) + group.attrs.create('file_list',sub_df['filename'].tolist()) + + # if group_name == 'MgO powder,H2O,HCl': + # print('Here:',sub_df.shape) + create_group_hierarchy(group, sub_df, columns[1::]) + +def is_nested_hierarchy(df) -> bool: + """receives a dataframe with categorical columns and checks whether rows form a nested group hierarchy. + That is, from bottom to top, subsequent hierarchical levels contain nested groups. The lower level groups belong to exactly one group in the higher level group. + """ + # TODO: generalize the code to check for deeper group hierachies. + def are_nested(df, col, col_nxt): + """ Checks whether low level LL groups can be separated in terms of high level HL groups. + That is, elements of low-level groups do not belong to more than one HL group.""" + + # Compute higher level group names/categories + memberships = df[col_nxt].unique().tolist() + + # Compute upper-level group memberships of low-level groups + col_avg_memberships = df.groupby(col).mean()[col_nxt].unique() + + # Check whether all low-level groups have an actual hlg membership. That is, their avg. hlg membership is in the hlg membership. 
+ return all([col_avg_memberships[group_idx] in memberships for group_idx in range(len(col_avg_memberships))]) + + df_tmp = df.copy() + + # Create relabeling map + for column_name in df_tmp.columns: + category_index = pd.Series(np.arange(len(df_tmp[column_name].unique())), index=df_tmp[column_name].unique()) + df_tmp[column_name] = category_index[df_tmp[column_name].tolist()].tolist() + + df_tmp.plot() + + return all([are_nested(df_tmp,'level_'+str(i)+'_groups','level_'+str(i+1)+'_groups') for i in range(len(df_tmp.columns)-1)]) + + +def get_groups_at_a_level(file: h5py.File, level: str): + + groups = [] + def node_selector(name, obj): + if name.count('/') == level: + print(name) + groups.append(obj.name) + + file.visititems(node_selector) + #file.visititems() + return groups + + +def annotate_root_dir(filename,annotation_dict: dict): + with h5py.File(filename,'r+') as file: + file.attrs.update(annotation_dict) + #for key in annotation_dict: + # file.attrs.create('metadata_'+key, annotation_dict[key]) + + +def transfer_file_dict_to_hdf5(h5file, group_name, file_dict): + """ + Transfers data from a file_dict to an HDF5 file. + + Parameters + ---------- + h5file : h5py.File + HDF5 file object where the data will be written. + group_name : str + Name of the HDF5 group where data will be stored. + file_dict : dict + Dictionary containing file data to be transferred. Required structure: + { + 'name': str, + 'attributes_dict': dict, + 'datasets': [ + { + 'name': str, + 'data': array-like, + 'shape': tuple, + 'attributes': dict (optional) + }, + ... + ] + } + + Returns + ------- + None + """ + + if not file_dict: + return + + try: + # Create group and add their attributes + group = h5file[group_name].create_group(name=file_dict['name']) + # Add group attributes + group.attrs.update(file_dict['attributes_dict']) + + # Add datasets to the just created group + for dataset in file_dict['datasets']: + dataset_obj = group.create_dataset( + name=dataset['name'], + data=dataset['data'], + shape=dataset['shape'] + ) + + # Add dataset's attributes + attributes = dataset.get('attributes', {}) + dataset_obj.attrs.update(attributes) + except Exception as inst: + print(inst) + logging.error('Failed to transfer data into HDF5: %s', inst) + +def copy_file_in_group(source_file_path, dest_file_obj : h5py.File, dest_group_name, work_with_copy : bool = True): + # Create copy of original file to avoid possible file corruption and work with it. + + if work_with_copy: + tmp_file_path = utils.make_file_copy(source_file_path) + else: + tmp_file_path = source_file_path + + # Open backup h5 file and copy complet filesystem directory onto a group in h5file + with h5py.File(tmp_file_path,'r') as src_file: + dest_file_obj.copy(source= src_file['/'], dest= dest_group_name) + + if 'tmp_files' in tmp_file_path: + os.remove(tmp_file_path) + +def create_hdf5_file_from_filesystem_path(path_to_input_directory: str, + path_to_filenames_dict: dict = None, + select_dir_keywords : list = [], + root_metadata_dict : dict = {}): + + """ + Creates an .h5 file with name "output_filename" that preserves the directory tree (or folder structure) + of a given filesystem path. + + The data integration capabilities are limited by our file reader, which can only access data from a list of + admissible file formats. These, however, can be extended. Directories are groups in the resulting HDF5 file. + Files are formatted as composite objects consisting of a group, file, and attributes. 
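+
+    For illustration only (hypothetical paths): an input tree containing
+    'data/RGA/scan_001.txt' and 'data/Pressure/log.dat', with 'data' as the root,
+    would be mapped to HDF5 groups '/RGA' and '/Pressure', each holding one
+    file-derived group with the corresponding datasets and attributes.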
+    Parameters
+    ----------
+    path_to_input_directory : str
+        Path to the root directory, specified with forward slashes, e.g., path/to/root.
+
+    path_to_filenames_dict : dict, optional
+        A pre-processed dictionary where keys are directory paths of the input directory's tree and values are lists of files.
+        If provided, the directory tree under 'path_to_input_directory' is not traversed again.
+
+    select_dir_keywords : list
+        List of keywords used to select directory paths: only paths that contain
+        one of the keywords are considered. When empty, all directory paths are
+        included in the HDF5 file group hierarchy.
+    root_metadata_dict : dict
+        Metadata to include at the root level of the HDF5 file.
+
+    Returns
+    -------
+    path_to_output_file : str
+        Path to the created HDF5 file.
+    """
+
+
+
+    if not '/' in path_to_input_directory:
+        raise ValueError('path_to_input_directory needs to be specified using forward slashes "/".' )
+
+    #path_to_output_directory = os.path.join(path_to_input_directory,'..')
+    path_to_input_directory = os.path.normpath(path_to_input_directory).strip(os.sep)
+
+
+    for i, keyword in enumerate(select_dir_keywords):
+        select_dir_keywords[i] = keyword.replace('/',os.sep)
+
+    if not path_to_filenames_dict:
+        # With dry_run=True, returns the path-to-files dictionary of the output directory without making an actual copy of the input directory
+        path_to_output_directory = os.path.join(path_to_input_directory,'..')
+        path_to_filenames_dict = utils.copy_directory_with_contraints(path_to_input_directory,
+                                                                      path_to_output_directory,
+                                                                      dry_run=True)
+    # Set input_directory as copied input directory
+    root_dir = path_to_input_directory
+    path_to_output_file = path_to_input_directory.rstrip(os.path.sep) + '.h5'
+
+    with h5py.File(path_to_output_file, mode='w', track_order=True) as h5file:
+
+        number_of_dirs = len(path_to_filenames_dict.keys())
+        dir_number = 1
+        for dirpath, filtered_filenames_list in path_to_filenames_dict.items():
+
+            start_message = f'Starting to transfer files in directory: {dirpath}'
+            end_message = f'\nCompleted transferring files in directory: {dirpath}'
+            # Print and log the start message
+            print(start_message)
+            logging.info(start_message)
+
+            # Check if filtered_filenames_list is nonempty. TODO: This is perhaps redundant by design of path_to_filenames_dict.
+            if not filtered_filenames_list:
+                continue
+
+            group_name = dirpath.replace(os.sep,'/')
+            group_name = group_name.replace(root_dir.replace(os.sep,'/') + '/', '/')
+
+            # Flatten group name to one level
+            if select_dir_keywords:
+                offset = sum([len(i.split(os.sep)) if i in dirpath else 0 for i in select_dir_keywords])
+            else:
+                offset = 1
+            tmp_list = group_name.split('/')
+            if len(tmp_list) > offset+1:
+                group_name = '/'.join([tmp_list[i] for i in range(offset+1)])
+
+            # Group hierarchy is implicitly defined by the forward slashes
+            if not group_name in h5file.keys():
+                h5file.create_group(group_name)
+                #h5file[group_name].attrs.create(name='filtered_file_list',data=convert_string_to_bytes(filtered_filename_list))
+                #h5file[group_name].attrs.create(name='file_list',data=convert_string_to_bytes(filenames_list))
+            else:
+                print(group_name,' was already created.')
+
+            for filenumber, filename in enumerate(filtered_filenames_list):
+
+                #file_ext = os.path.splitext(filename)[1]
+                #try:
+
+                # hdf5 path to filename group
+                dest_group_name = f'{group_name}/{filename}'
+
+                if not 'h5' in filename:
+                    #file_dict = config_file.select_file_readers(group_id)[file_ext](os.path.join(dirpath,filename))
+                    #file_dict = ext_to_reader_dict[file_ext](os.path.join(dirpath,filename))
+                    file_dict = filereader_registry.select_file_reader(dest_group_name)(os.path.join(dirpath,filename))
+
+                    transfer_file_dict_to_hdf5(h5file, group_name, file_dict)
+
+                else:
+                    source_file_path = os.path.join(dirpath,filename)
+                    dest_file_obj = h5file
+                    #group_name +'/'+filename
+                    #ext_to_reader_dict[file_ext](source_file_path, dest_file_obj, dest_group_name)
+                    #g5505f_reader.select_file_reader(dest_group_name)(source_file_path, dest_file_obj, dest_group_name)
+                    copy_file_in_group(source_file_path, dest_file_obj, dest_group_name, False)
+
+            # Update the progress bar and log the end message
+            utils.progressBar(dir_number, number_of_dirs, end_message)
+            logging.info(end_message)
+            dir_number = dir_number + 1
+
+
+
+        if len(root_metadata_dict.keys())>0:
+            for key, value in root_metadata_dict.items():
+                #if key in h5file.attrs:
+                #    del h5file.attrs[key]
+                h5file.attrs.create(key, value)
+            #annotate_root_dir(output_filename,root_metadata_dict)
+
+
+    #output_yml_filename_path = hdf5_vis.take_yml_snapshot_of_hdf5_file(output_filename)
+
+    return path_to_output_file #, output_yml_filename_path
+
+def save_processed_dataframe_to_hdf5(df, annotator, output_filename): # src_hdf5_path, script_date, script_name):
+    """
+    Save processed dataframe columns with annotations to an HDF5 file.
+
+    Parameters:
+    df (pd.DataFrame): DataFrame containing processed time series.
+    annotator (object): Annotator instance providing a get_metadata() method.
+    output_filename (str): Path to the output HDF5 file.
+ """ + # Convert datetime columns to string + datetime_cols = df.select_dtypes(include=['datetime64']).columns + + if list(datetime_cols): + df[datetime_cols] = df[datetime_cols].map(str) + + # Convert dataframe to structured array + icad_data_table = utils.convert_dataframe_to_np_structured_array(df) + + # Get metadata + metadata_dict = annotator.get_metadata() + + # Prepare project level attributes to be added at the root level + + project_level_attributes = metadata_dict['metadata']['project'] + + # Prepare high-level attributes + high_level_attributes = { + 'parent_files': metadata_dict['parent_files'], + **metadata_dict['metadata']['sample'], + **metadata_dict['metadata']['environment'], + **metadata_dict['metadata']['instruments'] + } + + # Prepare data level attributes + data_level_attributes = metadata_dict['metadata']['datasets'] + + for key, value in data_level_attributes.items(): + if isinstance(value,dict): + data_level_attributes[key] = utils.convert_attrdict_to_np_structured_array(value) + + + # Prepare file dictionary + file_dict = { + 'name': project_level_attributes['processing_file'], + 'attributes_dict': high_level_attributes, + 'datasets': [{ + 'name': "data_table", + 'data': icad_data_table, + 'shape': icad_data_table.shape, + 'attributes': data_level_attributes + }] + } + + # Check if the file exists + if os.path.exists(output_filename): + mode = "a" + print(f"File {output_filename} exists. Opening in append mode.") + else: + mode = "w" + print(f"File {output_filename} does not exist. Creating a new file.") + + + # Write to HDF5 + with h5py.File(output_filename, mode) as h5file: + # Add project level attributes at the root/top level + h5file.attrs.update(project_level_attributes) + transfer_file_dict_to_hdf5(h5file, '/', file_dict) + + +def create_hdf5_file_from_dataframe(ofilename, input_data, approach : str, group_by_funcs : list, extract_attrs_func = None): + + """ Creates an hdf5 file with as many levels as indicated by len(group_by_funcs). + Top level denotes the root group/directory and bottom level denotes measurement level groups. + + Parameters: + input_data (pd.DataFrame) : + group_by_funcs (list of callables or strs) : contains a list of callables or dataframe's column names that will be used + to partition or group files from top to bottom. + + Callables in the list must assign a categorical value to each file in a file list, internally represented as a DataFrame, + and they thus return a pd.Series of categorical values. + + On the other hand, strings in the list refer to the name of categorical columns in the input_data (when this is a DataFrame) + + Returns: + + """ + + # Check whether input_data is a valid file-system path or a DataFrame + is_valid_path = lambda x : os.path.exists(input_data) if isinstance(input_data,str) else False + + if is_valid_path(input_data): + + file_list = os.listdir(input_data) + + # Navigates file-system folders/directories from top to bottom. 
+        #for dirpath, dirnames, filenames in os.walk(input_data,topdown=True):
+
+
+        df = pd.DataFrame(file_list,columns=['filename'])
+        df = utils.augment_with_filetype(df)
+
+    elif isinstance(input_data,pd.DataFrame):
+        df = input_data.copy()
+    else:
+        raise ValueError("input_data must be either a valid file-system path or a dataframe.")
+
+    # Create group columns to form paths
+    if utils.is_callable_list(group_by_funcs):
+        grouping_cols = []
+        for i, func in enumerate(group_by_funcs):
+            grouping_cols.append('level_'+str(i)+'_groups')
+            df['level_'+str(i)+'_groups'] = func(df)
+    elif utils.is_str_list(group_by_funcs) and all([item in df.columns for item in group_by_funcs]):
+        grouping_cols = group_by_funcs
+    else:
+        raise ValueError("'group_by_funcs' must be a list of callables (or str) that take input_data as input and return a valid categorical output.")
+
+    # Concatenate group columns to form paths
+    df['group_path'] = df[grouping_cols].apply(lambda row: '/'.join(row.values.astype(str)), axis=1)
+
+    if approach == 'bottom-up':
+        # TODO: implement bottom-up approach
+        if is_nested_hierarchy(df.loc[:,grouping_cols]):
+            print('Do something')
+        else:
+            raise ValueError("group_by_funcs do not define a valid group hierarchy. Please reprocess the input_data or choose different grouping functions.")
+
+    elif approach == 'top-down':
+        # Check that the length of the group_by_funcs list is at most 2
+        #if len(group_by_funcs) > 2:
+        #    # TODO: extend to more than 2 callable elements.
+        #    raise ValueError("group_by_funcs can only contain at most two grouping elements.")
+
+        with h5py.File(ofilename, 'w') as file:
+
+            # Create groups based on concatenated paths
+            for path in df['group_path'].unique():
+                file.create_group(path)
+                # TODO: incorporate remaining cols (i.e., excluding the group columns) as either metadata or datasets
+
+            #create_group_hierarchy(file, df, grouping_cols)
+
+            file.attrs.create(name='depth', data=len(grouping_cols)-1)
+
+        print(':)')
+
+    else:
+        raise ValueError("'approach' must take values in ['top-down','bottom-up']")
+
+
+    #for i, value in enumerate(df['level_'+str(0)+'_groups'].unique().tolist()):
+
+    # 2. Validate group hierarchy, lower level groups must be embedded in higher level groups
+
+    # 3. Create hdf5 file with groups defined by the 'file_group' column
+    #
+    #    Add datasets to groups and set the groups' attributes
+
+    #return 0
+
+def main_mtable_h5_from_dataframe():
+
+    #import os
+    ROOT_DIR = os.path.abspath(os.curdir)
+    # Read BeamTimeMetaData.h5, containing Thorsten's Matlab Table
+    input_data_df = read_mtable_as_dataframe(os.path.join(ROOT_DIR,'input_files\\BeamTimeMetaData.h5'))
+
+    # Preprocess Thorsten's input_data dataframe so that it can be used to create a newer .h5 file
+    # under certain grouping specifications.
+    input_data_df = input_data_df.rename(columns = {'name':'filename'})
+    input_data_df = utils.augment_with_filenumber(input_data_df)
+    input_data_df = utils.augment_with_filetype(input_data_df)
+    input_data_df = utils.split_sample_col_into_sample_and_data_quality_cols(input_data_df)
+    input_data_df['lastModifiedDatestr'] = input_data_df['lastModifiedDatestr'].astype('datetime64[s]')
+
+    # Define grouping functions to be passed into the create_hdf5_file_from_dataframe function. These can also be set
+    # as strings referring to categorical columns in input_data_df.
+ + test_grouping_funcs = True + if test_grouping_funcs: + group_by_sample = lambda x : utils.group_by_df_column(x,'sample') + group_by_type = lambda x : utils.group_by_df_column(x,'filetype') + #group_by_filenumber = lambda x : utils.group_by_df_column(x,'filenumber') + else: + group_by_sample = 'sample' + group_by_type = 'filetype' + group_by_filenumber = 'filenumber' + + output_filename_path = os.path.join('output_files','thorsten_file_list.h5') + + create_hdf5_file_from_dataframe(output_filename_path,input_data_df, 'top-down', group_by_funcs = [group_by_sample, group_by_type]) + #create_hdf5_file_from_dataframe('test.h5',input_data_df, 'top-down', group_by_funcs = [group_by_sample, group_by_type, group_by_filenumber]) + + annotation_dict = {'1-Campaign name': '**SLS-Campaign-2023**', + '2-Users':'Thorsten, Luca, Zoe', + '3-Startdate': str(input_data_df['lastModifiedDatestr'].min()), + '4-Enddate': str(input_data_df['lastModifiedDatestr'].max()) + } + annotate_root_dir(output_filename_path, annotation_dict) + + #display_group_hierarchy_on_a_treemap(output_filename_path) + + print(':)') + +if __name__ == '__main__': + + #main() + main_mtable_h5_from_dataframe() + #main_5505() + + print(':)') + diff --git a/src/hdf5_ops.py b/src/hdf5_ops.py index 5f08c29..03fd7aa 100644 --- a/src/hdf5_ops.py +++ b/src/hdf5_ops.py @@ -76,7 +76,7 @@ class HDF5DataOpsManager(): # Parse value into HDF5 admissible type for key in dataset_dict['attributes'].keys(): value = dataset_dict['attributes'][key] - dataset_dict['attributes'][key] = utils.parse_attribute(value) + dataset_dict['attributes'][key] = utils.convert_attrdict_to_np_structured_array(value) #name = dataset_dict['name'] #data = dataset_dict['data'] @@ -98,7 +98,7 @@ class HDF5DataOpsManager(): for new_attr_key in annotation_dict.keys(): value = annotation_dict[new_attr_key] if isinstance(value, dict): - annotation_dict[new_attr_key] = utils.parse_attribute(annotation_dict[new_attr_key]) + annotation_dict[new_attr_key] = utils.convert_attrdict_to_np_structured_array(annotation_dict[new_attr_key]) obj.attrs.update(annotation_dict) def get_metadata(self, obj_path): @@ -231,60 +231,6 @@ def get_parent_child_relationships(file: h5py.File): return nodes, parent, values -def to_serializable_dtype(value): - - """Transform value's dtype into YAML/JSON compatible dtype - - Parameters - ---------- - value : _type_ - _description_ - - Returns - ------- - _type_ - _description_ - """ - try: - if isinstance(value, np.generic): - if np.issubdtype(value.dtype, np.bytes_): - value = value.decode('utf-8') - elif np.issubdtype(value.dtype, np.unicode_): - value = str(value) - elif np.issubdtype(value.dtype, np.number): - value = float(value) - else: - print('Yaml-compatible data-type was not found. 
Value has been set to NaN.') - value = np.nan - elif isinstance(value, np.ndarray): - # Handling structured array types (with fields) - if value.dtype.names: - value = {field: to_serializable_dtype(value[field]) for field in value.dtype.names} - else: - # Handling regular array NumPy types - if np.issubdtype(value.dtype, np.bytes_): - value = [item.decode('utf-8') for item in value] if len(value) > 1 else value[0].decode('utf-8') - elif np.issubdtype(value.dtype, np.unicode_): - value = [str(item) for item in value] if len(value) > 1 else str(value[0]) - elif np.issubdtype(value.dtype, np.integer): - value = [int(item) for item in value] if len(value) > 1 else int(value[0]) - elif np.issubdtype(value.dtype, np.floating): - value = [float(item) for item in value] if len(value) > 1 else float(value[0]) - else: - print('Yaml-compatible data-type was not found. Value has been set to NaN.') - value = np.nan - - except Exception as e: - print(f'Error converting value: {e}. Value has been set to NaN.') - value = np.nan - - return value - -def is_structured_array(attr_val): - if isinstance(attr_val,np.ndarray): - return True if attr_val.dtype.names is not None else False - else: - return False def construct_attributes_dict(attrs_obj): @@ -293,13 +239,13 @@ def construct_attributes_dict(attrs_obj): attr_dict[key] = {} if not key in ['file_list','filtered_file_list']: - if is_structured_array(value): + if utils.is_structured_array(value): #for subattr in value.dtype.names: #attr_dict[key][subattr] = make_dtype_yaml_compatible(value[subattr]) - attr_dict[key] = to_serializable_dtype(value) + attr_dict[key] = utils.to_serializable_dtype(value) else: attr_dict[key] = {"rename_as" : key, - "value" : to_serializable_dtype(value) + "value" : utils.to_serializable_dtype(value) } #if isinstance(value,str): diff --git a/utils/g5505_utils.py b/utils/g5505_utils.py index 49f577d..dafe78e 100644 --- a/utils/g5505_utils.py +++ b/utils/g5505_utils.py @@ -109,7 +109,7 @@ def created_at(): created_at = now_tz_aware.strftime('%Y-%m-%d_%H-%M-%S') + '_UTC-OFST_' + tz return created_at -def dataframe_to_np_structured_array(df: pd.DataFrame): +def convert_dataframe_to_np_structured_array(df: pd.DataFrame): # Define the dtype for the structured array, ensuring compatibility with h5py dtype = [] @@ -153,6 +153,47 @@ def convert_string_to_bytes(input_list: list): return input_array_bytes +def convert_attrdict_to_np_structured_array(attr_value: dict): + """ + Converts a dictionary of attributes into a numpy structured array for HDF5 + compound type compatibility. + + Each dictionary key is mapped to a field in the structured array, with the + data type (S) determined by the longest string representation of the values. + If the dictionary is empty, the function returns 'missing'. + + Parameters + ---------- + attr_value : dict + Dictionary containing the attributes to be converted. Example: + attr_value = { + 'name': 'Temperature', + 'unit': 'Celsius', + 'value': 23.5, + 'timestamp': '2023-09-26 10:00' + } + + Returns + ------- + new_attr_value : ndarray or str + Numpy structured array with UTF-8 encoded fields. Returns 'missing' if + the input dictionary is empty. 
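+
+    Examples
+    --------
+    Illustrative sketch only; the byte-string field width follows the longest
+    value representation in the input dictionary:
+
+    >>> convert_attrdict_to_np_structured_array({'value': 23.5, 'unit': 'Celsius'})
+    array([(b'23.5', b'Celsius')], dtype=[('value', 'S7'), ('unit', 'S7')])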
+ """ + dtype = [] + values_list = [] + max_length = max(len(str(attr_value[key])) for key in attr_value.keys()) + for key in attr_value.keys(): + if key != 'rename_as': + dtype.append((key, f'S{max_length}')) + values_list.append(attr_value[key]) + if values_list: + new_attr_value = np.array([tuple(values_list)], dtype=dtype) + else: + new_attr_value = 'missing' + + return new_attr_value + + def infer_units(column_name): # TODO: complete or remove @@ -165,23 +206,6 @@ def infer_units(column_name): return match -def parse_attribute(attr_value : dict): - "Parse a dictionary attribute into an equivalent numpy structured array, which compatible with compound HDF5 type" - dtype = [] - values_list = [] - max_length = max(len(str(attr_value[key])) for key in attr_value.keys()) - for key in attr_value.keys(): - if (not key=='rename_as'): - dtype.append((key,f'S{max_length}')) - values_list.append(attr_value[key]) - - if values_list: - new_attr_value = np.array([tuple(values_list)],dtype=dtype) - else: - new_attr_value = 'missing' - - return new_attr_value - def progressBar(count_value, total, suffix=''): bar_length = 100 filled_up_Length = int(round(bar_length* count_value / float(total))) @@ -270,4 +294,59 @@ def copy_directory_with_contraints(input_dir_path, output_dir_path, except Exception as e: logging.error("Failed to copy %s: %s", src_file_path, e) - return path_to_files_dict \ No newline at end of file + return path_to_files_dict + +def to_serializable_dtype(value): + + """Transform value's dtype into YAML/JSON compatible dtype + + Parameters + ---------- + value : _type_ + _description_ + + Returns + ------- + _type_ + _description_ + """ + try: + if isinstance(value, np.generic): + if np.issubdtype(value.dtype, np.bytes_): + value = value.decode('utf-8') + elif np.issubdtype(value.dtype, np.unicode_): + value = str(value) + elif np.issubdtype(value.dtype, np.number): + value = float(value) + else: + print('Yaml-compatible data-type was not found. Value has been set to NaN.') + value = np.nan + elif isinstance(value, np.ndarray): + # Handling structured array types (with fields) + if value.dtype.names: + value = {field: to_serializable_dtype(value[field]) for field in value.dtype.names} + else: + # Handling regular array NumPy types + if np.issubdtype(value.dtype, np.bytes_): + value = [item.decode('utf-8') for item in value] if len(value) > 1 else value[0].decode('utf-8') + elif np.issubdtype(value.dtype, np.unicode_): + value = [str(item) for item in value] if len(value) > 1 else str(value[0]) + elif np.issubdtype(value.dtype, np.integer): + value = [int(item) for item in value] if len(value) > 1 else int(value[0]) + elif np.issubdtype(value.dtype, np.floating): + value = [float(item) for item in value] if len(value) > 1 else float(value[0]) + else: + print('Yaml-compatible data-type was not found. Value has been set to NaN.') + value = np.nan + + except Exception as e: + print(f'Error converting value: {e}. Value has been set to NaN.') + value = np.nan + + return value + +def is_structured_array(attr_val): + if isinstance(attr_val,np.ndarray): + return True if attr_val.dtype.names is not None else False + else: + return False \ No newline at end of file