import sys
import os

root_dir = os.path.abspath(os.curdir)
sys.path.append(root_dir)

import h5py
import pandas as pd
import numpy as np

import utils.g5505_utils as utils
import src.hdf5_lib as hdf5_lib

import logging
import datetime

import yaml
import json
import copy


class HDF5DataOpsManager():

    """
    A class to handle HDF5 file operations.

    Parameters:
    -----------
    file_path : str
        Path to the HDF5 file.
    mode : str
        'r' or 'r+', read or read/write mode; only applicable when the file exists.
    """

    def __init__(self, file_path, mode='r+') -> None:

        # Class attributes
        if mode in ['r', 'r+']:
            self.mode = mode
        self.file_path = file_path
        self.file_obj = None
        #self._open_file()
        self.dataset_metadata_df = None

    # Define private methods

    # Define public methods

    def load_file_obj(self):
        if self.file_obj is None:
            self.file_obj = h5py.File(self.file_path, self.mode)

    def unload_file_obj(self):
        if self.file_obj:
            self.file_obj.flush()  # Ensure all data is written to disk
            self.file_obj.close()
            self.file_obj = None

    def extract_and_load_dataset_metadata(self):

        def __get_datasets(name, obj, list_of_datasets):
            if isinstance(obj, h5py.Dataset):
                list_of_datasets.append(name)
                #print(f'Adding dataset: {name}')

        if self.file_obj is None:
            raise RuntimeError("File object is not loaded. Please load the HDF5 file using the 'load_file_obj' method before attempting to extract datasets.")

        try:
            list_of_datasets = []
            self.file_obj.visititems(lambda name, obj: __get_datasets(name, obj, list_of_datasets))

            dataset_metadata_df = pd.DataFrame({'dataset_name': list_of_datasets})
            dataset_metadata_df['parent_instrument'] = dataset_metadata_df['dataset_name'].apply(lambda x: x.split('/')[-3])
            dataset_metadata_df['parent_file'] = dataset_metadata_df['dataset_name'].apply(lambda x: x.split('/')[-2])

            self.dataset_metadata_df = dataset_metadata_df

        except Exception as e:
            self.unload_file_obj()
            print(f"An unexpected error occurred: {e}. File object will be unloaded.")

    def extract_dataset_as_dataframe(self, dataset_name):
        """
        Returns a copy of the dataset content as a pandas DataFrame when possible, or as a NumPy array otherwise.
        """
        if self.file_obj is None:
            raise RuntimeError("File object is not loaded. Please load the HDF5 file using the 'load_file_obj' method before attempting to extract datasets.")

        dataset_obj = self.file_obj[dataset_name]
        # Read dataset content from dataset obj
        data = dataset_obj[...]
        # The above statement can be understood as follows:
        # data = np.empty(shape=dataset_obj.shape, dtype=dataset_obj.dtype)
        # dataset_obj.read_direct(data)

        try:
            return pd.DataFrame(data)
        except ValueError as e:
            logging.error(f"Failed to convert dataset '{dataset_name}' to DataFrame: {e}. Instead, dataset will be returned as a NumPy array.")
            return data  # 'data' is a NumPy array here
        except Exception as e:
            self.unload_file_obj()
            print(f"An unexpected error occurred: {e}. Returning None and unloading file object.")
            return None
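    # A minimal usage sketch of the read workflow above. The file path and the
    # dataset key below are hypothetical placeholders, not part of this module.
    #
    #   dataOpsObj = HDF5DataOpsManager('path/to/collection.h5', mode='r+')
    #   dataOpsObj.load_file_obj()
    #   dataOpsObj.extract_and_load_dataset_metadata()
    #   print(dataOpsObj.dataset_metadata_df)   # one row per dataset path in the file
    #   df = dataOpsObj.extract_dataset_as_dataframe('group/instrument/file/table')
    #   dataOpsObj.unload_file_obj()            # flush and close when done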
    # Define metadata revision methods: append(), update(), delete(), and rename().

    def append_metadata(self, obj_name, annotation_dict):
        """
        Appends metadata attributes to the specified object (obj_name) based on the provided annotation_dict.

        This method ensures that the provided metadata attributes do not overwrite any existing ones. If an attribute
        already exists, a ValueError is raised. The function supports storing scalar values (int, float, str) and
        compound values such as dictionaries that are converted into NumPy structured arrays before being added to
        the metadata.

        Parameters:
        -----------
        obj_name: str
            Path to the target object (dataset or group) within the HDF5 file.

        annotation_dict: dict
            A dictionary where the keys represent new attribute names (strings), and the values can be:
            - Scalars: int, float, or str.
            - Compound values (dictionaries) for more complex metadata, which are converted to NumPy structured
              arrays. See the example of a compound value below.

        Example:
        --------
        annotation_dict = {
            "relative_humidity": {
                "value": 65,
                "units": "percentage",
                "range": "[0,100]",
                "definition": "amount of water vapor present ..."
            }
        }
        """

        if self.file_obj is None:
            raise RuntimeError("File object is not loaded. Please load the HDF5 file using the 'load_file_obj' method before attempting to modify it.")

        # Create a copy of annotation_dict to avoid modifying the original
        annotation_dict_copy = copy.deepcopy(annotation_dict)

        try:
            obj = self.file_obj[obj_name]

            # Check if any attribute already exists
            if any(key in obj.attrs for key in annotation_dict_copy.keys()):
                raise ValueError("Make sure the provided (key, value) pairs are not existing metadata elements or attributes. To modify or delete existing attributes use .update_metadata() or .delete_metadata()")

            # Process the dictionary values and convert them to structured arrays if needed
            for key, value in annotation_dict_copy.items():
                if isinstance(value, dict):
                    # Convert dictionaries to NumPy structured arrays for complex attributes
                    annotation_dict_copy[key] = utils.convert_attrdict_to_np_structured_array(value)

            # Update the object's attributes with the new metadata
            obj.attrs.update(annotation_dict_copy)

        except Exception as e:
            self.unload_file_obj()
            print(f"An unexpected error occurred: {e}. The file object has been properly closed.")

    def update_metadata(self, obj_name, annotation_dict):
        """
        Updates the values of existing metadata attributes of the specified object (obj_name) based on the provided
        annotation_dict. Non-existing attributes are disregarded; use the append_metadata() method to add those to
        the metadata.

        Parameters:
        -----------
        obj_name : str
            Path to the target object (dataset or group) within the HDF5 file.

        annotation_dict: dict
            A dictionary where the keys represent existing attribute names (strings), and the values can be:
            - Scalars: int, float, or str.
            - Compound values (dictionaries) for more complex metadata, which are converted to NumPy structured
              arrays. See the example of a compound value below.

        Example:
        --------
        annotation_dict = {
            "relative_humidity": {
                "value": 65,
                "units": "percentage",
                "range": "[0,100]",
                "definition": "amount of water vapor present ..."
            }
        }
        """

        if self.file_obj is None:
            raise RuntimeError("File object is not loaded. Please load the HDF5 file using the 'load_file_obj' method before attempting to modify it.")

        update_dict = {}

        try:
            obj = self.file_obj[obj_name]
            for key, value in annotation_dict.items():
                if key in obj.attrs:
                    if isinstance(value, dict):
                        update_dict[key] = utils.convert_attrdict_to_np_structured_array(value)
                    else:
                        update_dict[key] = value
                else:
                    # Optionally, log or warn about non-existing keys being ignored.
                    print(f"Warning: Key '{key}' does not exist and will be ignored.")

            obj.attrs.update(update_dict)

        except Exception as e:
            self.unload_file_obj()
            print(f"An unexpected error occurred: {e}. The file object has been properly closed.")
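    # A short sketch of the annotation workflow above, continuing the hypothetical
    # dataOpsObj from the earlier sketch. The object path and attribute names are
    # placeholders, not part of this module.
    #
    #   dataOpsObj.load_file_obj()
    #   dataOpsObj.append_metadata('/some_group', {'operator': 'jdoe',
    #                                              'relative_humidity': {'value': 65,
    #                                                                    'units': 'percentage',
    #                                                                    'range': '[0,100]',
    #                                                                    'definition': 'amount of water vapor present ...'}})
    #   dataOpsObj.update_metadata('/some_group', {'operator': 'asmith'})  # only existing keys are updated
    #   dataOpsObj.unload_file_obj()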
    def delete_metadata(self, obj_name, annotation_dict):
        """
        Deletes metadata attributes of the specified object (obj_name) based on the provided annotation_dict.

        Parameters:
        -----------
        obj_name: str
            Path to the target object (dataset or group) within the HDF5 file.

        annotation_dict: dict
            Dictionary where keys represent attribute names, and values should be dictionaries containing
            {"delete": True} to mark them for deletion.

        Example:
        --------
        annotation_dict = {"attr_to_be_deleted": {"delete": True}}

        Behavior:
        ---------
        - Deletes the specified attributes from the object's metadata if marked for deletion.
        - Issues a warning if the attribute is not found or not marked for deletion.
        """

        if self.file_obj is None:
            raise RuntimeError("File object is not loaded. Please load the HDF5 file using the 'load_file_obj' method before attempting to modify it.")

        try:
            obj = self.file_obj[obj_name]
            for attr_key, value in annotation_dict.items():
                if attr_key in obj.attrs:
                    if isinstance(value, dict) and value.get('delete', False):
                        del obj.attrs[attr_key]
                    else:
                        msg = f"Warning: Value for key '{attr_key}' is not marked for deletion or is invalid."
                        print(msg)
                else:
                    msg = f"Warning: Key '{attr_key}' does not exist in metadata."
                    print(msg)

        except Exception as e:
            self.unload_file_obj()
            print(f"An unexpected error occurred: {e}. The file object has been properly closed.")

    def rename_metadata(self, obj_name, renaming_map):
        """
        Renames metadata attributes of the specified object (obj_name) based on the provided renaming_map.

        Parameters:
        -----------
        obj_name: str
            Path to the target object (dataset or group) within the HDF5 file.

        renaming_map: dict
            A dictionary where keys are current attribute names (strings), and values are the new attribute names
            (strings or byte strings) to rename to.

        Example:
        --------
        renaming_map = {
            "old_attr_name": "new_attr_name",
            "old_attr_2": "new_attr_2"
        }
        """

        if self.file_obj is None:
            raise RuntimeError("File object is not loaded. Please load the HDF5 file using the 'load_file_obj' method before attempting to modify it.")

        try:
            obj = self.file_obj[obj_name]
            # Iterate over the renaming_map to process renaming
            for old_attr, new_attr in renaming_map.items():
                if old_attr in obj.attrs:
                    # Get the old attribute's value
                    attr_value = obj.attrs[old_attr]

                    # Create a new attribute with the new name
                    obj.attrs.create(new_attr, data=attr_value)

                    # Delete the old attribute
                    del obj.attrs[old_attr]
                else:
                    # Skip if the old attribute doesn't exist
                    msg = f"Skipping: Attribute '{old_attr}' does not exist."
                    print(msg)  # Optionally, replace with warnings.warn(msg)

        except Exception as e:
            self.unload_file_obj()
            print(
                f"An unexpected error occurred: {e}. The file object has been properly closed. "
                "Please ensure that 'obj_name' exists in the file, and that the keys in 'renaming_map' are valid attributes of the object."
            )
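    # Sketch of the remaining revision methods, again using the hypothetical
    # dataOpsObj; '/some_group' and the attribute names are placeholders.
    #
    #   dataOpsObj.delete_metadata('/some_group', {'obsolete_attr': {'delete': True}})
    #   dataOpsObj.rename_metadata('/some_group', {'operator': 'responsible_person'})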
""" try: # Access the attributes for the object at the given path metadata_dict = self.file_obj[obj_path].attrs except KeyError: # Handle the case where the path doesn't exist logging.error(f'Invalid object path: {obj_path}') metadata_dict = {} return metadata_dict def reformat_datetime_column(self, dataset_name, column_name, src_format, desired_format='%Y-%m-%d %H:%M:%S.%f'): # Access the dataset dataset = self.file_obj[dataset_name] # Read the column data into a pandas Series and decode bytes to strings dt_column_data = pd.Series(dataset[column_name][:]).apply(lambda x: x.decode() ) # Convert to datetime using the source format dt_column_data = pd.to_datetime(dt_column_data, format=src_format, errors = 'coerce') # Reformat datetime objects to the desired format as strings dt_column_data = dt_column_data.dt.strftime(desired_format) # Encode the strings back to bytes #encoded_data = dt_column_data.apply(lambda x: x.encode() if not pd.isnull(x) else 'N/A').to_numpy() # Update the dataset in place #dataset[column_name][:] = encoded_data # Convert byte strings to datetime objects #timestamps = [datetime.datetime.strptime(a.decode(), src_format).strftime(desired_format) for a in dt_column_data] #datetime.strptime('31/01/22 23:59:59.999999', # '%d/%m/%y %H:%M:%S.%f') #pd.to_datetime( # np.array([a.decode() for a in dt_column_data]), # format=src_format, # errors='coerce' #) # Standardize the datetime format #standardized_time = datetime.strftime(desired_format) # Convert to byte strings to store back in the HDF5 dataset #standardized_time_bytes = np.array([s.encode() for s in timestamps]) # Update the column in the dataset (in-place update) # TODO: make this a more secure operation #dataset[column_name][:] = standardized_time_bytes #return np.array(timestamps) return dt_column_data.to_numpy() # Define data append operations: append_dataset(), and update_file() def append_dataset(self,dataset_dict, group_name): # Parse value into HDF5 admissible type for key in dataset_dict['attributes'].keys(): value = dataset_dict['attributes'][key] if isinstance(key, dict): dataset_dict['attributes'][key] = utils.convert_attrdict_to_np_structured_array(value) if not group_name in self.file_obj: self.file_obj.create_group(group_name, track_order=True) self.file_obj[group_name].attrs['creation_date'] = utils.created_at().encode("utf-8") self.file_obj[group_name].create_dataset(dataset_dict['name'], data=dataset_dict['data']) self.file_obj[group_name][dataset_dict['name']].attrs.update(dataset_dict['attributes']) self.file_obj[group_name].attrs['last_update_date'] = utils.created_at().encode("utf-8") def update_file(self, path_to_append_dir): # Split the reference file path and the append directory path into directories and filenames ref_tail, ref_head = os.path.split(self.file_path) ref_head_filename, head_ext = os.path.splitext(ref_head) tail, head = os.path.split(path_to_append_dir) # Ensure the append directory is in the same directory as the reference file and has the same name (without extension) if not (ref_tail == tail and ref_head_filename == head): raise ValueError("The append directory must be in the same directory as the reference HDF5 file and have the same name without the extension.") # Close the file if it's already open if self.file_obj is not None: self.unload_file_obj() # Attempt to open the file in 'r+' mode for appending try: hdf5_lib.create_hdf5_file_from_filesystem_path(path_to_append_dir, mode='r+') except FileNotFoundError: raise FileNotFoundError(f"Reference HDF5 file '{self.file_path}' 
not found.") except OSError as e: raise OSError(f"Error opening HDF5 file: {e}") def read_dataset_from_hdf5file(hdf5_file_path, dataset_path): # Open the HDF5 file with h5py.File(hdf5_file_path, 'r') as hdf: # Load the dataset dataset = hdf[dataset_path] data = np.empty(dataset.shape, dtype=dataset.dtype) dataset.read_direct(data) df = pd.DataFrame(data) for col_name in df.select_dtypes(exclude='number'): df[col_name] = df[col_name].str.decode('utf-8') #apply(lambda x: x.decode('utf-8') if isinstance(x,bytes) else x) ## Extract metadata (attributes) and convert to a dictionary #metadata = hdf5_vis.construct_attributes_dict(hdf[dataset_name].attrs) ## Create a one-row DataFrame with the metadata #metadata_df = pd.DataFrame.from_dict(data, orient='columns') return df def get_parent_child_relationships(file: h5py.File): nodes = ['/'] parent = [''] #values = [file.attrs['count']] # TODO: maybe we should make this more general and not dependent on file_list attribute? #if 'file_list' in file.attrs.keys(): # values = [len(file.attrs['file_list'])] #else: # values = [1] values = [len(file.keys())] def node_visitor(name,obj): if name.count('/') <=2: nodes.append(obj.name) parent.append(obj.parent.name) #nodes.append(os.path.split(obj.name)[1]) #parent.append(os.path.split(obj.parent.name)[1]) if isinstance(obj,h5py.Dataset):# or not 'file_list' in obj.attrs.keys(): values.append(1) else: print(obj.name) try: values.append(len(obj.keys())) except: values.append(0) file.visititems(node_visitor) return nodes, parent, values def __print_metadata__(name, obj, folder_depth, yaml_dict): # TODO: should we enable deeper folders ? if len(obj.name.split('/')) <= folder_depth: name_to_list = obj.name.split('/') name_head = name_to_list[-1] if isinstance(obj,h5py.Group): #print('name:', obj.name) #print('attributes:', dict(obj.attrs)) #attr_dict = {} group_dict = {} # Convert attribute dict to a YAML/JSON serializable dict attr_dict = {key: utils.to_serializable_dtype(val) for key, val in obj.attrs.items()} #for key, value in obj.attrs.items(): #print (key, value.dtype) # if key == 'Layout': # print(value) # if not key in ['file_list','filtered_file_list']: # value = make_dtype_yaml_compatible(value) # attr_dict[key] = {'rename_as' : key, # 'value' : value # } #group_dict[obj.name] = {'name': obj.name, 'attributes': attr_dict} group_dict = {"name": name_head, "attributes": attr_dict, "datasets":{}} #group_dict[obj.name]["name"] = obj.name #group_dict[obj.name]["attributes"] = attr_dict #group_dict[obj.name]["datasets"] = {} #print(name) yaml_dict[obj.name] = group_dict elif isinstance(obj, h5py.Dataset): # Convert attribute dict to a YAML/JSON serializable dict attr_dict = {key: utils.to_serializable_dtype(val) for key, val in obj.attrs.items()} parent_name = '/'.join(name_to_list[:-1]) yaml_dict[parent_name]["datasets"][name_head] = {"rename_as": name_head ,"attributes": attr_dict} #print(yaml.dump(group_dict,sort_keys=False)) #elif len(obj.name.split('/')) == 3: # print(yaml.dump()) def serialize_metadata(input_filename_path, folder_depth: int = 4, output_format: str = 'yaml') -> str: """ Serialize metadata from an HDF5 file into YAML or JSON format. Parameters ---------- input_filename_path : str The path to the input HDF5 file. folder_depth : int, optional The folder depth to control how much of the HDF5 file hierarchy is traversed (default is 4). output_format : str, optional The format to serialize the output, either 'yaml' or 'json' (default is 'yaml'). 
def serialize_metadata(input_filename_path, folder_depth: int = 4, output_format: str = 'yaml') -> str:
    """
    Serialize metadata from an HDF5 file into YAML or JSON format.

    Parameters
    ----------
    input_filename_path : str
        The path to the input HDF5 file.
    folder_depth : int, optional
        The folder depth to control how much of the HDF5 file hierarchy is traversed (default is 4).
    output_format : str, optional
        The format to serialize the output, either 'yaml' or 'json' (default is 'yaml').

    Returns
    -------
    str
        The output file path where the serialized metadata is stored (either .yaml or .json).
    """

    # Choose the appropriate output format (YAML or JSON)
    if output_format not in ['yaml', 'json']:
        raise ValueError("Unsupported format. Please choose either 'yaml' or 'json'.")

    # Initialize dictionary to store YAML/JSON data
    yaml_dict = {}

    # Split input file path to get the output file's base name
    output_filename_tail, ext = os.path.splitext(input_filename_path)

    # Open the HDF5 file and extract metadata
    with h5py.File(input_filename_path, 'r') as f:
        # Convert attribute dict to a YAML/JSON serializable dict
        attrs_dict = {key: utils.to_serializable_dtype(val) for key, val in f.attrs.items()}
        yaml_dict[f.name] = {
            "name": f.name,
            "attributes": attrs_dict,
            "datasets": {}
        }
        # Traverse the HDF5 file hierarchy and add groups and datasets
        f.visititems(lambda name, obj: __print_metadata__(name, obj, folder_depth, yaml_dict))

    # Serialize and write the data
    output_file_path = output_filename_tail + '.' + output_format
    with open(output_file_path, 'w') as output_file:
        if output_format == 'json':
            json_output = json.dumps(yaml_dict, indent=4, sort_keys=False)
            output_file.write(json_output)
        elif output_format == 'yaml':
            yaml_output = yaml.dump(yaml_dict, sort_keys=False)
            output_file.write(yaml_output)

    return output_file_path


def get_groups_at_a_level(file: h5py.File, level: int):

    groups = []

    def node_selector(name, obj):
        if name.count('/') == level:
            print(name)
            groups.append(obj.name)

    file.visititems(node_selector)

    return groups
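# Sketch of the metadata serialization entry point; the input path is a hypothetical
# placeholder. serialize_metadata() writes 'path/to/collection.yaml' (or .json) next
# to the input file and returns that path. get_groups_at_a_level() selects objects
# whose relative path contains exactly `level` '/' separators.
#
#   output_path = serialize_metadata('path/to/collection.h5', folder_depth=4, output_format='yaml')
#
#   with h5py.File('path/to/collection.h5', 'r') as f:
#       selected = get_groups_at_a_level(f, level=2)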