import sys import os root_dir = os.path.abspath(os.curdir) sys.path.append(root_dir) import h5py import pandas as pd import numpy as np import utils.g5505_utils as utils import src.hdf5_lib as hdf5_lib import logging import datetime import os import h5py import yaml import json import copy class HDF5DataOpsManager(): """ A class to handle HDF5 file operations. Parameters: path_to_file : str path/to/hdf5file. mode : str 'r' or 'r+' read or read/write mode only when file exists """ def __init__(self, file_path, mode = 'r+') -> None: # Class attributes if mode in ['r','r+']: self.mode = mode self.file_path = file_path self.file_obj = None #self._open_file() self.dataset_metadata_df = None # Define private methods # Define public methods def open_file(self): if self.file_obj is None: self.file_obj = h5py.File(self.file_path, self.mode) def close_file(self): if self.file_obj: self.file_obj.flush() # Ensure all data is written to disk self.file_obj.close() self.file_obj = None def load_dataset_metadata(self): def __get_datasets(name, obj, list_of_datasets): if isinstance(obj,h5py.Dataset): list_of_datasets.append(name) #print(f'Adding dataset: {name}') #tail: {head} head: {tail}') list_of_datasets = [] with h5py.File(self.file_path,'r') as file: list_of_datasets = [] file.visititems(lambda name, obj: __get_datasets(name, obj, list_of_datasets)) dataset_metadata_df = pd.DataFrame({'dataset_name': list_of_datasets}) dataset_metadata_df['parent_instrument'] = dataset_metadata_df['dataset_name'].apply(lambda x: x.split('/')[-3]) dataset_metadata_df['parent_file'] = dataset_metadata_df['dataset_name'].apply(lambda x: x.split('/')[-2]) self.dataset_metadata_df = dataset_metadata_df def read_dataset_as_dataframe(self,dataset_name): """ returns a copy of the dataset content in the form of dataframe when possible or numpy array """ if self.file_obj is None: self.open_file() dataset_obj = self.file_obj[dataset_name] # Read dataset content from dataset obj data = dataset_obj[...] # The above statement can be understood as follows: # data = np.empty(shape=dataset_obj.shape, # dtype=dataset_obj.dtype) # dataset_obj.read_direct(data) try: return pd.DataFrame(data) except ValueError as exp: logging.error(f"Failed to convert dataset '{dataset_name}' to DataFrame: {exp}. Instead, dataset will be returned as Numpy array.") return data # 'data' is a NumPy array here def append_dataset(self,dataset_dict, group_name): # Parse value into HDF5 admissible type for key in dataset_dict['attributes'].keys(): value = dataset_dict['attributes'][key] dataset_dict['attributes'][key] = utils.convert_attrdict_to_np_structured_array(value) #name = dataset_dict['name'] #data = dataset_dict['data'] #dtype = dataset_dict['dtype'] self.file_obj[group_name].create_dataset(dataset_dict['name'], data=dataset_dict['data']) self.file_obj[group_name][dataset_dict['name']].attrs.update(dataset_dict['attributes']) def append_metadata(self, obj_name, annotation_dict): """ Appends metadata attributes to the specified object (obj_name) based on the provided annotation_dict. This method ensures that the provided metadata attributes do not overwrite any existing ones. If an attribute already exists, a ValueError is raised. The function supports storing scalar values (int, float, str) and compound values such as dictionaries that are converted into NumPy structured arrays before being added to the metadata. Parameters: ----------- obj_name: str Path to the target object (dataset or group) within the HDF5 file. annotation_dict: dict A dictionary where the keys represent new attribute names (strings), and the values can be: - Scalars: int, float, or str. - Compound values (dictionaries) for more complex metadata, which are converted to NumPy structured arrays. Example of a compound value: Example: ---------- annotation_dict = { "relative_humidity": { "value": 65, "units": "percentage", "range": "[0,100]", "definition": "amount of water vapor present ..." } } """ if self.file_obj is None: self.open_file() # Create a copy of annotation_dict to avoid modifying the original annotation_dict_copy = copy.deepcopy(annotation_dict) #with h5py.File(self.file_path, mode='r+') as file_obj: obj = self.file_obj[obj_name] # Check if any attribute already exists if any(key in obj.attrs for key in annotation_dict_copy.keys()): raise ValueError("Make sure the provided (key, value) pairs are not existing metadata elements or attributes. To modify or delete existing attributes use .modify_annotation() or .delete_annotation()") # Process the dictionary values and convert them to structured arrays if needed for key, value in annotation_dict_copy.items(): if isinstance(value, dict): # Convert dictionaries to NumPy structured arrays for complex attributes annotation_dict_copy[key] = utils.convert_attrdict_to_np_structured_array(value) # Update the object's attributes with the new metadata obj.attrs.update(annotation_dict_copy) def update_metadata(self, obj_name, annotation_dict): """ Updates the value of existing metadata attributes of the specified object (obj_name) based on the provided annotation_dict. The function disregards non-existing attributes and suggests to use the append_metadata() method to include those in the metadata. Parameters: ----------- obj_name : str Path to the target object (dataset or group) within the HDF5 file. annotation_dict: dict A dictionary where the keys represent existing attribute names (strings), and the values can be: - Scalars: int, float, or str. - Compound values (dictionaries) for more complex metadata, which are converted to NumPy structured arrays. Example of a compound value: Example: ---------- annotation_dict = { "relative_humidity": { "value": 65, "units": "percentage", "range": "[0,100]", "definition": "amount of water vapor present ..." } } """ if self.file_obj is None: self.open_file() update_dict = {} #with h5py.File(self.file_path, mode='r+') as file_obj: obj = self.file_obj[obj_name] for key, value in annotation_dict.items(): if key in obj.attrs: if isinstance(value, dict): update_dict[key] = utils.convert_attrdict_to_np_structured_array(value) else: update_dict[key] = value else: # Optionally, log or warn about non-existing keys being ignored. print(f"Warning: Key '{key}' does not exist and will be ignored.") obj.attrs.update(update_dict) def delete_metadata(self, obj_name, annotation_dict): """ Deletes metadata attributes of the specified object (obj_name) based on the provided annotation_dict. Parameters: ----------- obj_name: str Path to the target object (dataset or group) within the HDF5 file. annotation_dict: dict Dictionary where keys represent attribute names, and values should be dictionaries containing {"delete": True} to mark them for deletion. Example: -------- annotation_dict = {"attr_to_be_deleted": {"delete": True}} Behavior: --------- - Deletes the specified attributes from the object's metadata if marked for deletion. - Issues a warning if the attribute is not found or not marked for deletion. """ if self.file_obj is None: self.open_file() #with h5py.File(self.file_path, mode='r+') as file_obj: obj = self.file_obj[obj_name] for attr_key, value in annotation_dict.items(): if attr_key in obj.attrs: if isinstance(value, dict) and value.get('delete', False): obj.attrs.__delitem__(attr_key) else: msg = f"Warning: Value for key '{attr_key}' is not marked for deletion or is invalid." print(msg) else: msg = f"Warning: Key '{attr_key}' does not exist in metadata." print(msg) def rename_metadata(self, obj_name, renaming_map): """ Renames metadata attributes of the specified object (obj_name) based on the provided renaming_map. Parameters: ----------- obj_name: str Path to the target object (dataset or group) within the HDF5 file. renaming_map: dict A dictionary where keys are current attribute names (strings), and values are the new attribute names (strings or byte strings) to rename to. Example: -------- renaming_map = { "old_attr_name": "new_attr_name", "old_attr_2": "new_attr_2" } """ #with h5py.File(self.file_path, mode='r+') as file_obj: if self.file_obj is None: self.open_file() obj = self.file_obj[obj_name] # Iterate over the renaming_map to process renaming for old_attr, new_attr in renaming_map.items(): if old_attr in obj.attrs: # Get the old attribute's value attr_value = obj.attrs[old_attr] # Create a new attribute with the new name obj.attrs.create(new_attr, data=attr_value) # Delete the old attribute obj.attrs.__delitem__(old_attr) else: # Skip if the old attribute doesn't exist msg = f"Skipping: Attribute '{old_attr}' does not exist." print(msg) # Optionally, replace with warnings.warn(msg) self.close_file() def get_metadata(self, obj_path): """ Get file attributes from object at path = obj_path. For example, obj_path = '/' will get root level attributes or metadata. """ try: # Access the attributes for the object at the given path metadata_dict = self.file_obj[obj_path].attrs except KeyError: # Handle the case where the path doesn't exist logging.error(f'Invalid object path: {obj_path}') metadata_dict = {} return metadata_dict def reformat_datetime_column(self, dataset_name, column_name, src_format, desired_format='%Y-%m-%d %H:%M:%S.%f'): # Access the dataset dataset = self.file_obj[dataset_name] # Read the column data into a pandas Series and decode bytes to strings dt_column_data = pd.Series(dataset[column_name][:]).apply(lambda x: x.decode() ) # Convert to datetime using the source format dt_column_data = pd.to_datetime(dt_column_data, format=src_format, errors = 'coerce') # Reformat datetime objects to the desired format as strings dt_column_data = dt_column_data.dt.strftime(desired_format) # Encode the strings back to bytes #encoded_data = dt_column_data.apply(lambda x: x.encode() if not pd.isnull(x) else 'N/A').to_numpy() # Update the dataset in place #dataset[column_name][:] = encoded_data # Convert byte strings to datetime objects #timestamps = [datetime.datetime.strptime(a.decode(), src_format).strftime(desired_format) for a in dt_column_data] #datetime.strptime('31/01/22 23:59:59.999999', # '%d/%m/%y %H:%M:%S.%f') #pd.to_datetime( # np.array([a.decode() for a in dt_column_data]), # format=src_format, # errors='coerce' #) # Standardize the datetime format #standardized_time = datetime.strftime(desired_format) # Convert to byte strings to store back in the HDF5 dataset #standardized_time_bytes = np.array([s.encode() for s in timestamps]) # Update the column in the dataset (in-place update) # TODO: make this a more secure operation #dataset[column_name][:] = standardized_time_bytes #return np.array(timestamps) return dt_column_data.to_numpy() def update_file(self, path_to_append_dir): # Split the reference file path and the append directory path into directories and filenames ref_tail, ref_head = os.path.split(self.file_path) ref_head_filename, head_ext = os.path.splitext(ref_head) tail, head = os.path.split(path_to_append_dir) # Ensure the append directory is in the same directory as the reference file and has the same name (without extension) if not (ref_tail == tail and ref_head_filename == head): raise ValueError("The append directory must be in the same directory as the reference HDF5 file and have the same name without the extension.") # Close the file if it's already open if self.file_obj is not None: self.close_file() # Attempt to open the file in 'r+' mode for appending try: hdf5_lib.create_hdf5_file_from_filesystem_path(path_to_append_dir, mode='r+') except FileNotFoundError: raise FileNotFoundError(f"Reference HDF5 file '{self.file_path}' not found.") except OSError as e: raise OSError(f"Error opening HDF5 file: {e}") def read_dataset_from_hdf5file(hdf5_file_path, dataset_path): # Open the HDF5 file with h5py.File(hdf5_file_path, 'r') as hdf: # Load the dataset dataset = hdf[dataset_path] data = np.empty(dataset.shape, dtype=dataset.dtype) dataset.read_direct(data) df = pd.DataFrame(data) for col_name in df.select_dtypes(exclude='number'): df[col_name] = df[col_name].str.decode('utf-8') #apply(lambda x: x.decode('utf-8') if isinstance(x,bytes) else x) ## Extract metadata (attributes) and convert to a dictionary #metadata = hdf5_vis.construct_attributes_dict(hdf[dataset_name].attrs) ## Create a one-row DataFrame with the metadata #metadata_df = pd.DataFrame.from_dict(data, orient='columns') return df def get_parent_child_relationships(file: h5py.File): nodes = ['/'] parent = [''] #values = [file.attrs['count']] # TODO: maybe we should make this more general and not dependent on file_list attribute? #if 'file_list' in file.attrs.keys(): # values = [len(file.attrs['file_list'])] #else: # values = [1] values = [len(file.keys())] def node_visitor(name,obj): if name.count('/') <=2: nodes.append(obj.name) parent.append(obj.parent.name) #nodes.append(os.path.split(obj.name)[1]) #parent.append(os.path.split(obj.parent.name)[1]) if isinstance(obj,h5py.Dataset):# or not 'file_list' in obj.attrs.keys(): values.append(1) else: print(obj.name) try: values.append(len(obj.keys())) except: values.append(0) file.visititems(node_visitor) return nodes, parent, values def __print_metadata__(name, obj, folder_depth, yaml_dict): # TODO: should we enable deeper folders ? if len(obj.name.split('/')) <= folder_depth: name_to_list = obj.name.split('/') name_head = name_to_list[-1] if isinstance(obj,h5py.Group): #print('name:', obj.name) #print('attributes:', dict(obj.attrs)) #attr_dict = {} group_dict = {} # Convert attribute dict to a YAML/JSON serializable dict attr_dict = {key: utils.to_serializable_dtype(val) for key, val in obj.attrs.items()} #for key, value in obj.attrs.items(): #print (key, value.dtype) # if key == 'Layout': # print(value) # if not key in ['file_list','filtered_file_list']: # value = make_dtype_yaml_compatible(value) # attr_dict[key] = {'rename_as' : key, # 'value' : value # } #group_dict[obj.name] = {'name': obj.name, 'attributes': attr_dict} group_dict = {"name": name_head, "attributes": attr_dict, "datasets":{}} #group_dict[obj.name]["name"] = obj.name #group_dict[obj.name]["attributes"] = attr_dict #group_dict[obj.name]["datasets"] = {} #print(name) yaml_dict[obj.name] = group_dict elif isinstance(obj, h5py.Dataset): # Convert attribute dict to a YAML/JSON serializable dict attr_dict = {key: utils.to_serializable_dtype(val) for key, val in obj.attrs.items()} parent_name = '/'.join(name_to_list[:-1]) yaml_dict[parent_name]["datasets"][name_head] = {"rename_as": name_head ,"attributes": attr_dict} #print(yaml.dump(group_dict,sort_keys=False)) #elif len(obj.name.split('/')) == 3: # print(yaml.dump()) def serialize_metadata(input_filename_path, folder_depth: int = 4, output_format: str = 'yaml') -> str: """ Serialize metadata from an HDF5 file into YAML or JSON format. Parameters ---------- input_filename_path : str The path to the input HDF5 file. folder_depth : int, optional The folder depth to control how much of the HDF5 file hierarchy is traversed (default is 4). output_format : str, optional The format to serialize the output, either 'yaml' or 'json' (default is 'yaml'). Returns ------- str The output file path where the serialized metadata is stored (either .yaml or .json). """ # Choose the appropriate output format (YAML or JSON) if output_format not in ['yaml', 'json']: raise ValueError("Unsupported format. Please choose either 'yaml' or 'json'.") # Initialize dictionary to store YAML/JSON data yaml_dict = {} # Split input file path to get the output file's base name output_filename_tail, ext = os.path.splitext(input_filename_path) # Open the HDF5 file and extract metadata with h5py.File(input_filename_path, 'r') as f: # Convert attribute dict to a YAML/JSON serializable dict attrs_dict = {key: utils.to_serializable_dtype(val) for key, val in f.attrs.items()} yaml_dict[f.name] = { "name": f.name, "attributes": attrs_dict, "datasets": {} } # Traverse HDF5 file hierarchy and add datasets f.visititems(lambda name, obj: __print_metadata__(name, obj, folder_depth, yaml_dict)) # Serialize and write the data output_file_path = output_filename_tail + '.' + output_format with open(output_file_path, 'w') as output_file: if output_format == 'json': json_output = json.dumps(yaml_dict, indent=4, sort_keys=False) output_file.write(json_output) elif output_format == 'yaml': yaml_output = yaml.dump(yaml_dict, sort_keys=False) output_file.write(yaml_output) return output_file_path def get_groups_at_a_level(file: h5py.File, level: str): groups = [] def node_selector(name, obj): if name.count('/') == level: print(name) groups.append(obj.name) file.visititems(node_selector) #file.visititems() return groups