# Source code for src.hdf5_ops

import sys
import os

# Locate this module on disk so the DIMA project root can be added to sys.path.
# When __file__ is not defined, the current working directory is used as a
# stand-in, in which case the derived root may not be the actual package root.
if "__file__" in globals():
    thisFilePath = os.path.abspath(__file__)
else:
    print("Error: __file__ is not available. Ensure the script is being run from a file.")
    print("[Notice] Path to DIMA package may not be resolved properly.")
    thisFilePath = os.getcwd()  # Use current directory or specify a default

# Two levels up from this module's path is taken as the project root.
dimaPath = os.path.normpath(os.path.join(thisFilePath, os.pardir, os.pardir))

# Register the project root once; repeated imports must not duplicate the entry.
if dimaPath not in sys.path:
    sys.path.append(dimaPath)


import h5py
import pandas as pd
import numpy as np

import utils.g5505_utils as utils
import src.hdf5_writer as hdf5_lib
import logging
import datetime

import h5py

import yaml
import json
import copy

class HDF5DataOpsManager():
    """
    A class to handle HDF5 fundamental middle level file operations to power data updates, metadata revision, and data analysis
    with hdf5 files encoding multi-instrument experimental campaign data.

    The file is not opened on construction: call load_file_obj() before using any method
    that reads or modifies the file, and unload_file_obj() when done.

    Parameters:
    -----------
        file_path : str
            path/to/hdf5file.
        mode : str
            'r' or 'r+' read or read/write mode only when file exists

    Raises:
    -------
        ValueError
            If mode is neither 'r' nor 'r+'.
    """

    def __init__(self, file_path, mode = 'r+') -> None:
        # Reject unsupported modes up front; otherwise the instance would be
        # created without any attributes and fail later with AttributeError.
        if mode not in ('r', 'r+'):
            raise ValueError(f"Invalid mode '{mode}'. Supported modes are 'r' and 'r+'.")

        self.mode = mode
        self.file_path = file_path
        self.file_obj = None  # h5py file handle, set by load_file_obj()
        self.dataset_metadata_df = None  # filled by extract_and_load_dataset_metadata()

    # Define private methods

    def _require_loaded(self, purpose):
        """Raise RuntimeError if the file object has not been loaded yet."""
        if self.file_obj is None:
            raise RuntimeError(
                "File object is not loaded. Please load the HDF5 file using the 'load_file_obj' "
                f"method before attempting to {purpose}."
            )

    # Define public methods

    def load_file_obj(self):
        """Open the HDF5 file with the configured mode, unless a file object is already held."""
        if self.file_obj is None:
            self.file_obj = h5py.File(self.file_path, self.mode)

    def unload_file_obj(self):
        """Flush and close the file object (if any) and reset it to None."""
        if self.file_obj is None:
            return
        # flush()/close() are only attempted on a handle that evaluates as true,
        # as before, but the reference is always cleared so that load_file_obj()
        # can reopen the file afterwards.
        if self.file_obj:
            self.file_obj.flush()  # Ensure all data is written to disk
            self.file_obj.close()
        self.file_obj = None

    def extract_and_load_dataset_metadata(self):
        """
        Collects the names of all datasets in the file and stores them in self.dataset_metadata_df.

        The resulting dataframe has the columns 'dataset_name', 'parent_instrument' and
        'parent_file', where the last two are the third-to-last and second-to-last components
        of the '/'-separated dataset name. They are None for datasets whose name has too few
        components.

        On any unexpected error the file object is unloaded and a message is printed.

        Raises:
        -------
            RuntimeError
                If the file object has not been loaded.
        """
        self._require_loaded("extract datasets")

        def _component_from_end(dataset_name, position):
            # position counts from the end of the path (1 = last component).
            parts = dataset_name.split('/')
            return parts[-position] if len(parts) >= position else None

        try:
            list_of_datasets = []

            def _collect_dataset(name, obj):
                # Must return None so that visititems keeps traversing.
                if isinstance(obj, h5py.Dataset):
                    list_of_datasets.append(name)

            self.file_obj.visititems(_collect_dataset)

            dataset_metadata_df = pd.DataFrame({'dataset_name': list_of_datasets})
            dataset_metadata_df['parent_instrument'] = dataset_metadata_df['dataset_name'].apply(
                lambda x: _component_from_end(x, 3))
            dataset_metadata_df['parent_file'] = dataset_metadata_df['dataset_name'].apply(
                lambda x: _component_from_end(x, 2))

            self.dataset_metadata_df = dataset_metadata_df
        except Exception as e:
            self.unload_file_obj()
            print(f"An unexpected error occurred: {e}. File object will be unloaded.")

    def extract_dataset_as_dataframe(self, dataset_name):
        """
        returns a copy of the dataset content in the form of dataframe when possible or numpy array

        If the conversion to dataframe raises ValueError, the error is logged and the NumPy array
        is returned instead. On any other conversion error the file object is unloaded, a message
        is printed and None is returned.

        Raises:
        -------
            RuntimeError
                If the file object has not been loaded.
        """
        self._require_loaded("extract datasets")

        dataset_obj = self.file_obj[dataset_name]
        # Read dataset content from dataset obj
        data = dataset_obj[...]
        # The above statement can be understood as follows:
        # data = np.empty(shape=dataset_obj.shape,
        #                 dtype=dataset_obj.dtype)
        # dataset_obj.read_direct(data)

        try:
            return pd.DataFrame(data)
        except ValueError as e:
            logging.error(f"Failed to convert dataset '{dataset_name}' to DataFrame: {e}. Instead, dataset will be returned as Numpy array.")
            return data  # 'data' is a NumPy array here
        except Exception as e:
            self.unload_file_obj()
            print(f"An unexpected error occurred: {e}. Returning None and unloading file object")
            return None

    # Define metadata revision methods: append(), update(), delete(), and rename().

    def append_metadata(self, obj_name, annotation_dict):
        """
        Appends metadata attributes to the specified object (obj_name) based on the provided annotation_dict.

        This method ensures that the provided metadata attributes do not overwrite any existing ones. If any
        attribute already exists, nothing is written. The function supports storing scalar values (int, float,
        str) and compound values such as dictionaries that are converted into NumPy structured arrays before
        being added to the metadata.

        Any error raised while appending (including the attribute-already-exists ValueError) is not
        propagated: the file object is unloaded and a message is printed instead.

        Parameters:
        -----------
        obj_name: str
            Path to the target object (dataset or group) within the HDF5 file.

        annotation_dict: dict
            A dictionary where the keys represent new attribute names (strings), and the values can be:
                - Scalars: int, float, or str.
                - Compound values (dictionaries) for more complex metadata, which are converted to NumPy structured arrays.

        Example:
        ----------
            annotation_dict = {
                "relative_humidity": {
                    "value": 65,
                    "units": "percentage",
                    "range": "[0,100]",
                    "definition": "amount of water vapor present ..."
                }
            }

        Raises:
        -------
            RuntimeError
                If the file object has not been loaded.
        """
        self._require_loaded("modify it")

        # Create a copy of annotation_dict to avoid modifying the original
        annotation_dict_copy = copy.deepcopy(annotation_dict)

        try:
            obj = self.file_obj[obj_name]

            # Check if any attribute already exists
            if any(key in obj.attrs for key in annotation_dict_copy):
                raise ValueError(
                    "Make sure the provided (key, value) pairs are not existing metadata elements or attributes. "
                    "To modify or delete existing attributes use .update_metadata() or .delete_metadata()"
                )

            # Convert dictionaries to NumPy structured arrays for complex attributes
            for key, value in annotation_dict_copy.items():
                if isinstance(value, dict):
                    annotation_dict_copy[key] = utils.convert_attrdict_to_np_structured_array(value)

            # Update the object's attributes with the new metadata
            obj.attrs.update(annotation_dict_copy)

        except Exception as e:
            self.unload_file_obj()
            print(f"An unexpected error occurred: {e}. The file object has been properly closed.")

    def update_metadata(self, obj_name, annotation_dict):
        """
        Updates the value of existing metadata attributes of the specified object (obj_name) based on the provided annotation_dict.

        The function disregards non-existing attributes (a warning is printed for each) and suggests to use
        the append_metadata() method to include those in the metadata.

        Any error raised while updating is not propagated: the file object is unloaded and a message
        is printed instead.

        Parameters:
        -----------
        obj_name : str
            Path to the target object (dataset or group) within the HDF5 file.

        annotation_dict: dict
            A dictionary where the keys represent existing attribute names (strings), and the values can be:
                - Scalars: int, float, or str.
                - Compound values (dictionaries) for more complex metadata, which are converted to NumPy structured arrays.

        Example:
        ----------
            annotation_dict = {
                "relative_humidity": {
                    "value": 65,
                    "units": "percentage",
                    "range": "[0,100]",
                    "definition": "amount of water vapor present ..."
                }
            }

        Raises:
        -------
            RuntimeError
                If the file object has not been loaded.
        """
        self._require_loaded("modify it")

        update_dict = {}

        try:
            obj = self.file_obj[obj_name]
            for key, value in annotation_dict.items():
                if key not in obj.attrs:
                    # Optionally, log or warn about non-existing keys being ignored.
                    print(f"Warning: Key '{key}' does not exist and will be ignored.")
                    continue
                if isinstance(value, dict):
                    update_dict[key] = utils.convert_attrdict_to_np_structured_array(value)
                else:
                    update_dict[key] = value

            obj.attrs.update(update_dict)

        except Exception as e:
            self.unload_file_obj()
            print(f"An unexpected error occurred: {e}. The file object has been properly closed.")

    def delete_metadata(self, obj_name, annotation_dict):
        """
        Deletes metadata attributes of the specified object (obj_name) based on the provided annotation_dict.

        Parameters:
        -----------
        obj_name: str
            Path to the target object (dataset or group) within the HDF5 file.

        annotation_dict: dict
            Dictionary where keys represent attribute names, and values should be dictionaries containing
            {"delete": True} to mark them for deletion.

        Example:
        --------
            annotation_dict = {"attr_to_be_deleted": {"delete": True}}

        Behavior:
        ---------
            - Deletes the specified attributes from the object's metadata if marked for deletion.
            - Issues a warning if the attribute is not found or not marked for deletion.
            - Any error raised while deleting is not propagated: the file object is unloaded and a
              message is printed instead.

        Raises:
        -------
            RuntimeError
                If the file object has not been loaded.
        """
        self._require_loaded("modify it")

        try:
            obj = self.file_obj[obj_name]
            for attr_key, value in annotation_dict.items():
                if attr_key not in obj.attrs:
                    msg = f"Warning: Key '{attr_key}' does not exist in metadata."
                    print(msg)
                elif isinstance(value, dict) and value.get('delete', False):
                    del obj.attrs[attr_key]
                else:
                    msg = f"Warning: Value for key '{attr_key}' is not marked for deletion or is invalid."
                    print(msg)

        except Exception as e:
            self.unload_file_obj()
            print(f"An unexpected error occurred: {e}. The file object has been properly closed.")

    def rename_metadata(self, obj_name, renaming_map):
        """
        Renames metadata attributes of the specified object (obj_name) based on the provided renaming_map.

        Attributes that do not exist are skipped with a message, and so are entries whose new name
        equals the current one (renaming an attribute onto itself would otherwise delete it).

        NOTE: unlike the other metadata revision methods, this method always unloads the file object
        before returning, whether or not the renaming succeeded.

        Parameters:
        -----------
        obj_name: str
            Path to the target object (dataset or group) within the HDF5 file.

        renaming_map: dict
            A dictionary where keys are current attribute names (strings), and values are the new
            attribute names (strings or byte strings) to rename to.

        Example:
        --------
            renaming_map = {
                "old_attr_name": "new_attr_name",
                "old_attr_2": "new_attr_2"
            }

        Raises:
        -------
            RuntimeError
                If the file object has not been loaded.
        """
        self._require_loaded("modify it")

        try:
            obj = self.file_obj[obj_name]
            # Iterate over the renaming_map to process renaming
            for old_attr, new_attr in renaming_map.items():
                if old_attr not in obj.attrs:
                    # Skip if the old attribute doesn't exist
                    msg = f"Skipping: Attribute '{old_attr}' does not exist."
                    print(msg)  # Optionally, replace with warnings.warn(msg)
                    continue

                # Byte-string names are decoded only to compare against the current name.
                new_attr_as_str = new_attr.decode() if isinstance(new_attr, bytes) else new_attr
                if new_attr_as_str == old_attr:
                    # Creating and then deleting under the same name would drop the attribute.
                    msg = f"Skipping: Attribute '{old_attr}' already has the requested name."
                    print(msg)
                    continue

                # Get the old attribute's value
                attr_value = obj.attrs[old_attr]
                # Create a new attribute with the new name
                obj.attrs.create(new_attr, data=attr_value)
                # Delete the old attribute
                del obj.attrs[old_attr]

        except Exception as e:
            self.unload_file_obj()
            print(
                f"An unexpected error occurred: {e}. The file object has been properly closed. "
                "Please ensure that 'obj_name' exists in the file, and that the keys in 'renaming_map' are valid attributes of the object."
            )

        self.unload_file_obj()

    def get_metadata(self, obj_path):
        """
        Get file attributes from object at path = obj_path. For example,
        obj_path = '/' will get root level attributes or metadata.

        Returns the attributes (`.attrs`) of the object, or an empty dict when obj_path
        does not exist in the file (the invalid path is logged as an error).

        Raises:
        -------
            RuntimeError
                If the file object has not been loaded.
        """
        self._require_loaded("read metadata")

        try:
            # Access the attributes for the object at the given path
            metadata_dict = self.file_obj[obj_path].attrs
        except KeyError:
            # Handle the case where the path doesn't exist
            logging.error(f'Invalid object path: {obj_path}')
            metadata_dict = {}

        return metadata_dict

    def reformat_datetime_column(self, dataset_name, column_name, src_format, desired_format='%Y-%m-%d %H:%M:%S.%f'):
        """
        Returns the values of a datetime column of a dataset re-expressed in desired_format.

        The dataset stored in the file is not modified; only the reformatted copy is returned.

        Parameters:
        -----------
        dataset_name : str
            Path to the dataset within the HDF5 file.
        column_name : str
            Name of the dataset column (field) holding the datetime values.
        src_format : str
            strftime/strptime format of the stored values.
        desired_format : str
            strftime format of the returned values.

        Returns:
        --------
        numpy array with the reformatted datetime strings. Values that cannot be parsed with
        src_format are coerced to missing values rather than raising.

        Raises:
        -------
            RuntimeError
                If the file object has not been loaded.
        """
        self._require_loaded("extract datasets")

        # Access the dataset
        dataset = self.file_obj[dataset_name]

        # Read the column data into a pandas Series and decode bytes to strings
        # (values that are already strings are passed through unchanged)
        dt_column_data = pd.Series(dataset[column_name][:]).apply(
            lambda x: x.decode() if isinstance(x, bytes) else x)

        # Convert to datetime using the source format
        dt_column_data = pd.to_datetime(dt_column_data, format=src_format, errors = 'coerce')

        # Reformat datetime objects to the desired format as strings
        dt_column_data = dt_column_data.dt.strftime(desired_format)

        # TODO: update the column in the dataset in place once this can be made a secure operation

        return dt_column_data.to_numpy()

    # Define data append operations: append_dataset(), and update_file()

    def append_dataset(self, dataset_dict, group_name):
        """
        Creates a dataset inside the group group_name, creating the group if it does not exist.

        A newly created group gets a 'creation_date' attribute, and the group's 'last_update_date'
        attribute is set after the dataset has been added. The provided dataset_dict is not modified.

        Parameters:
        -----------
        dataset_dict : dict
            Dictionary with the keys 'name' (dataset name), 'data' (dataset content) and
            'attributes' (dict of dataset attributes; dictionary values are converted to NumPy
            structured arrays).
        group_name : str
            Path to the target group within the HDF5 file.

        Raises:
        -------
            RuntimeError
                If the file object has not been loaded.
        """
        self._require_loaded("modify it")

        # Parse values into HDF5 admissible types, building a new dict so that
        # the caller's 'attributes' dictionary is left untouched.
        attributes = {
            key: utils.convert_attrdict_to_np_structured_array(value) if isinstance(value, dict) else value
            for key, value in dataset_dict['attributes'].items()
        }

        if group_name not in self.file_obj:
            self.file_obj.create_group(group_name, track_order=True)
            self.file_obj[group_name].attrs['creation_date'] = utils.created_at().encode("utf-8")

        self.file_obj[group_name].create_dataset(dataset_dict['name'], data=dataset_dict['data'])
        self.file_obj[group_name][dataset_dict['name']].attrs.update(attributes)
        self.file_obj[group_name].attrs['last_update_date'] = utils.created_at().encode("utf-8")

    def update_file(self, path_to_append_dir):
        """
        Updates the HDF5 file with the content of the directory path_to_append_dir.

        The directory must be located next to the HDF5 file and be named like the file without
        its extension. The file object is unloaded (if loaded) before the update is delegated to
        hdf5_lib.create_hdf5_file_from_filesystem_path() in 'r+' mode.

        Parameters:
        -----------
        path_to_append_dir : str
            Path to the directory whose content is appended. A trailing path separator is accepted.

        Raises:
        -------
            ValueError
                If the directory is not next to the HDF5 file or does not share its name.
            FileNotFoundError
                If the delegated call raises FileNotFoundError.
            OSError
                If the delegated call raises any other OSError.
        """
        # Split the reference file path and the append directory path into directories and filenames.
        # Paths are normalized first so that e.g. a trailing separator does not yield an empty name.
        ref_dir, ref_filename = os.path.split(os.path.normpath(self.file_path))
        ref_stem, _ = os.path.splitext(ref_filename)
        append_parent, append_name = os.path.split(os.path.normpath(path_to_append_dir))

        # Ensure the append directory is in the same directory as the reference file and has the same name (without extension)
        if not (ref_dir == append_parent and ref_stem == append_name):
            raise ValueError("The append directory must be in the same directory as the reference HDF5 file and have the same name without the extension.")

        # Close the file if it's already open
        if self.file_obj is not None:
            self.unload_file_obj()

        # Attempt to open the file in 'r+' mode for appending
        try:
            hdf5_lib.create_hdf5_file_from_filesystem_path(path_to_append_dir, mode='r+')
        except FileNotFoundError as e:
            raise FileNotFoundError(f"Reference HDF5 file '{self.file_path}' not found.") from e
        except OSError as e:
            raise OSError(f"Error opening HDF5 file: {e}") from e
def get_parent_child_relationships(file: h5py.File):
    """
    Builds parallel lists describing the upper levels of the hierarchy of an HDF5 file.

    The root is always the first entry. Every visited object whose name (relative to the
    root) contains at most two '/' separators contributes one entry to each list.

    Parameters:
    -----------
    file : h5py.File
        Open HDF5 file to inspect.

    Returns:
    --------
    nodes : list
        Names (obj.name) of the collected objects, starting with '/'.
    parent : list
        Name of the parent of each node ('' for the root).
    values : list
        For the root, the number of its members; 1 for datasets; for any other object, its
        number of members, or 0 if that cannot be determined.
    """
    nodes = ['/']
    parent = ['']
    values = [len(file.keys())]

    def node_visitor(name, obj):
        # Only the first three levels below the root are collected.
        if name.count('/') > 2:
            return None

        nodes.append(obj.name)
        parent.append(obj.parent.name)

        if isinstance(obj, h5py.Dataset):
            values.append(1)
        else:
            logging.debug("Visiting %s", obj.name)
            try:
                values.append(len(obj.keys()))
            except Exception:
                # Objects without countable members contribute 0; unlike a bare
                # except, this does not swallow KeyboardInterrupt or SystemExit.
                values.append(0)
        # Returning None keeps visititems traversing.
        return None

    file.visititems(node_visitor)

    return nodes, parent, values
def __print_metadata__(name, obj, folder_depth, yaml_dict):
    """
    Records the attributes of an HDF5 group or dataset in yaml_dict, keyed by the object's name.

    Parameters:
    -----------
    name (str):
        Name of the HDF5 object being inspected (not used; obj.name is used instead).
    obj (h5py.Group or h5py.Dataset):
        The HDF5 object (Group or Dataset). Objects of any other type are ignored.
    folder_depth (int):
        Objects are recorded only if obj.name split on '/' has at most this many components.
    yaml_dict (dict):
        Dictionary to populate with metadata. Each entry has the form
        {"name": <last path component>, "attributes": <serializable attribute dict>}.
    """
    path_parts = obj.name.split('/')

    # Ignore objects located deeper than the requested folder depth
    if len(path_parts) > folder_depth:
        return

    # Only groups and datasets carry metadata of interest
    if not isinstance(obj, (h5py.Group, h5py.Dataset)):
        return

    # The last path component is empty for the root, in which case the full name is used
    short_name = path_parts[-1] or obj.name

    # Convert attributes to a YAML/JSON serializable format
    serializable_attrs = {}
    for attr_key, attr_value in obj.attrs.items():
        serializable_attrs[attr_key] = utils.to_serializable_dtype(attr_value)

    yaml_dict[obj.name] = {"name": short_name, "attributes": serializable_attrs}
def serialize_metadata(input_filename_path, folder_depth: int = 4, output_format: str = 'yaml') -> str:
    """
    Serialize metadata from an HDF5 file into YAML or JSON format.

    The output file is written next to the input file, with the input file's
    extension replaced by the chosen output format.

    Parameters
    ----------
    input_filename_path : str
        The path to the input HDF5 file.
    folder_depth : int, optional
        The folder depth to control how much of the HDF5 file hierarchy is traversed (default is 4).
    output_format : str, optional
        The format to serialize the output, either 'yaml' or 'json' (default is 'yaml').

    Returns
    -------
    str
        The output file path where the serialized metadata is stored (either .yaml or .json).

    Raises
    ------
    ValueError
        If output_format is neither 'yaml' nor 'json'.
    """
    if output_format not in ('yaml', 'json'):
        raise ValueError("Unsupported format. Please choose either 'yaml' or 'json'.")

    # Metadata of every inspected object, keyed by object name
    collected_metadata = {}

    # Input path without its extension; used to derive the output path
    base_path = os.path.splitext(input_filename_path)[0]

    def _record(name, obj):
        __print_metadata__(name, obj, folder_depth, collected_metadata)

    with h5py.File(input_filename_path, 'r') as h5_file:
        # The root is not visited by visititems, so it is recorded explicitly first
        __print_metadata__(h5_file.name, h5_file, folder_depth, collected_metadata)
        h5_file.visititems(_record)

    output_file_path = base_path + '.' + output_format

    with open(output_file_path, 'w') as output_file:
        if output_format == 'json':
            serialized = json.dumps(collected_metadata, indent=4, sort_keys=False)
        else:
            serialized = yaml.dump(collected_metadata, sort_keys=False)
        output_file.write(serialized)

    return output_file_path
def get_groups_at_a_level(file: h5py.File, level: int):
    """
    Returns the names of the objects located at a given depth of an HDF5 file.

    Parameters:
    -----------
    file : h5py.File
        Open HDF5 file to inspect.
    level : int
        Number of '/' separators in the object's name relative to the root, e.g. 0 selects
        the direct members of the root.

    Returns:
    --------
    list
        Names (obj.name) of the visited objects at the requested level.
        NOTE(review): no type check is applied, so datasets at that level are included
        as well - confirm whether only groups are expected by callers.
    """
    groups = []

    def node_selector(name, obj):
        # The comparison is against an integer count, hence level must be an int.
        if name.count('/') == level:
            logging.debug("Selected %s", name)
            groups.append(obj.name)

    file.visititems(node_selector)

    return groups
def read_mtable_as_dataframe(filename):
    """
    Reconstruct a MATLAB Table encoded in a .h5 file as a Pandas DataFrame.

    This function reads a .h5 file containing a MATLAB Table and reconstructs it as a Pandas DataFrame.
    The input .h5 file contains one group per row of the MATLAB Table. Each group stores the table's
    dataset-like variables as Datasets, while categorical and numerical variables are represented as
    attributes of the respective group.

    To ensure homogeneity of data columns, the DataFrame is constructed column-wise. The first group
    of the file defines the attributes and datasets expected in every group.

    Parameters
    ----------
    filename : str
        The name of the .h5 file. This may include the file's location and path information.

    Returns
    -------
    pd.DataFrame
        The MATLAB Table reconstructed as a Pandas DataFrame, indexed by group name. An empty
        DataFrame is returned when the file contains no groups.
    """
    with h5py.File(filename, 'r') as file:

        group_list = list(file.keys())
        if not group_list:
            # No rows to reconstruct
            return pd.DataFrame()

        # Define group's attributes and datasets. This should hold
        # for all groups. TODO: implement verification and noncompliance error if needed.
        first_group = file[group_list[0]]

        group_attrs = list(first_group.attrs.keys())
        # Column label of an attribute is the part of its name after the first '_'
        column_attr_names = [item[item.find('_') + 1:] for item in group_attrs]

        # A 'DS_EMPTY' member marks a group without dataset-like variables
        group_datasets = list(first_group.keys()) if 'DS_EMPTY' not in first_group.keys() else []
        column_dataset_names = [first_group[item].attrs['column_name'] for item in group_datasets]

        # Define data_frame as group_attrs + group_datasets
        pd_series_index = column_attr_names + column_dataset_names

        output_dataframe = pd.DataFrame(columns=pd_series_index, index=group_list)

        attr_names = set(group_attrs)
        for meas_prop in group_attrs + group_datasets:
            if meas_prop in attr_names:
                column_label = meas_prop[meas_prop.find('_') + 1:]
                # Create numerical or categorical column from group's attributes
                column_values = [file[group_key].attrs[meas_prop][()][0] for group_key in group_list]
            else:
                # Create dataset column from group's datasets
                column_label = file[group_list[0] + '/' + meas_prop].attrs['column_name']
                column_values = [file[group_key + '/' + meas_prop][()] for group_key in group_list]

            output_dataframe.loc[:, column_label] = column_values

    return output_dataframe
def _main(argv):
    """
    Command-line entry point: serializes the metadata of an HDF5 file.

    Expected arguments: serialize <path/to/target_file.hdf5> <folder_depth> <format>.

    Returns the process exit status: 0 on success, 1 on wrong usage (too few arguments or an
    unknown command) or when the serialization fails.
    """
    if len(argv) < 5 or argv[1] != 'serialize':
        # An unknown command is reported like a wrong number of arguments
        # instead of exiting silently.
        print("Usage: python hdf5_ops.py serialize <path/to/target_file.hdf5> <folder_depth : int = 2> <format=json|yaml>")
        return 1

    input_hdf5_file = argv[2]
    file_format = argv[4]

    try:
        # A non-integer folder depth is reported like any other serialization error
        folder_depth = int(argv[3])
        # Call the serialize_metadata function and capture the output path
        path_to_file = serialize_metadata(input_hdf5_file,
                                          folder_depth = folder_depth,
                                          output_format=file_format)
        print(f"Metadata serialized to {path_to_file}")
    except Exception as e:
        print(f"An error occurred during serialization: {e}")
        return 1

    return 0


if __name__ == "__main__":
    sys.exit(_main(sys.argv))