import sys
import os

try:
    thisFilePath = os.path.abspath(__file__)
except NameError:
    print("Error: __file__ is not available. Ensure the script is being run from a file.")
    print("[Notice] Path to DIMA package may not be resolved properly.")
    thisFilePath = os.getcwd()  # Use current directory or specify a default

dimaPath = os.path.normpath(os.path.join(thisFilePath, "..", ".."))  # Move up to project root

if dimaPath not in sys.path:  # Avoid duplicate entries
    sys.path.append(dimaPath)


import h5py
import pandas as pd
import numpy as np
import logging
import datetime
import yaml
import json
import copy

import utils.g5505_utils as utils
import src.hdf5_writer as hdf5_lib

class HDF5DataOpsManager():

    """
    A class for fundamental mid-level HDF5 file operations, powering data updates, metadata revision,
    and data analysis on HDF5 files that encode multi-instrument experimental campaign data.

    Parameters:
    -----------
    file_path : str
        Path to the HDF5 file.
    mode : str
        'r' (read) or 'r+' (read/write); the file must already exist.
    """

    def __init__(self, file_path, mode='r+') -> None:

        # Class attributes
        if mode in ['r', 'r+']:
            self.mode = mode
            self.file_path = file_path
            self.file_obj = None
            #self._open_file()
            self.dataset_metadata_df = None

    # Define private methods

    # Define public methods

    def load_file_obj(self):
        if self.file_obj is None:
            self.file_obj = h5py.File(self.file_path, self.mode)

    def unload_file_obj(self):
        if self.file_obj:
            self.file_obj.flush()  # Ensure all data is written to disk
            self.file_obj.close()
            self.file_obj = None
            self.dataset_metadata_df = None  # maybe replace by del self.dataset_metadata_df to explicitly clear the reference as well as the memory

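    # Example (a minimal usage sketch; the file path below is hypothetical):
    #
    #   dataOpsObj = HDF5DataOpsManager('data/collection_data.h5', mode='r+')
    #   dataOpsObj.load_file_obj()
    #   try:
    #       ...  # read or revise data/metadata here
    #   finally:
    #       dataOpsObj.unload_file_obj()
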
    def extract_and_load_dataset_metadata(self):

        def __get_datasets(name, obj, list_of_datasets):
            if isinstance(obj, h5py.Dataset):
                list_of_datasets.append(name)
                #print(f'Adding dataset: {name}') #tail: {head} head: {tail}')

        if self.file_obj is None:
            raise RuntimeError("File object is not loaded. Please load the HDF5 file using the 'load_file_obj' method before attempting to extract datasets.")

        try:
            list_of_datasets = []

            self.file_obj.visititems(lambda name, obj: __get_datasets(name, obj, list_of_datasets))

            # Derive each dataset's parent instrument (all path segments above the parent file)
            # and parent file (the segment immediately above the dataset).
            dataset_metadata_df = pd.DataFrame({'dataset_name': list_of_datasets})
            dataset_metadata_df['parent_instrument'] = dataset_metadata_df['dataset_name'].apply(lambda x: '/'.join(x.split('/')[:-2]))
            dataset_metadata_df['parent_file'] = dataset_metadata_df['dataset_name'].apply(lambda x: x.split('/')[-2])

            self.dataset_metadata_df = dataset_metadata_df

        except Exception as e:
            self.unload_file_obj()
            print(f"An unexpected error occurred: {e}. File object will be unloaded.")

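    # Example (sketch): after calling extract_and_load_dataset_metadata(), the table of datasets
    # can be inspected directly. Column names come from the method above; the file path is hypothetical.
    #
    #   dataOpsObj = HDF5DataOpsManager('data/collection_data.h5')
    #   dataOpsObj.load_file_obj()
    #   dataOpsObj.extract_and_load_dataset_metadata()
    #   print(dataOpsObj.dataset_metadata_df[['dataset_name', 'parent_instrument', 'parent_file']])
    #   dataOpsObj.unload_file_obj()
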
    def infer_datetime_variable(self, dataset_name):
        """
        Infers which column of the dataset encodes datetimes, based on the dataset's metadata.
        Returns a (column name, datetime format) tuple, or (None, None) if no datetime column is found.
        """
        if self.file_obj is None:
            raise RuntimeError("File object is not loaded. Please load the HDF5 file using the 'load_file_obj' method before attempting to extract datasets.")

        metadata_dict = self.get_metadata(dataset_name)

        datetime_var = None
        datetime_format = None
        for key in metadata_dict.keys():  # by construction, keys correspond to column/variable names
            if not utils.is_structured_array(metadata_dict[key]):
                continue
            if 'data_type' in metadata_dict[key].dtype.names:
                if metadata_dict[key][0]['data_type'].decode() == 'datetime':
                    datetime_var = key
                    datetime_format = metadata_dict[key]['datetime_format'][0].decode()
                    return datetime_var, datetime_format

        return None, None

    def extract_dataset_as_dataframe(self, dataset_name):
        """
        Returns a copy of the dataset content as a pandas DataFrame when possible, otherwise as a NumPy array.
        """
        if self.file_obj is None:
            raise RuntimeError("File object is not loaded. Please load the HDF5 file using the 'load_file_obj' method before attempting to extract datasets.")

        dataset_obj = self.file_obj[dataset_name]

        datetime_var, datetime_format = self.infer_datetime_variable(dataset_name)

        # Read dataset content into memory
        data = dataset_obj[...]
        # The above statement is equivalent to:
        # data = np.empty(shape=dataset_obj.shape,
        #                 dtype=dataset_obj.dtype)
        # dataset_obj.read_direct(data)

        try:
            data = pd.DataFrame(data)

            # Decode and parse the datetime column, if one was inferred from the metadata
            if datetime_var in data.columns:
                data[datetime_var] = data[datetime_var].apply(lambda x: x.decode())
                data[datetime_var] = pd.to_datetime(data[datetime_var], format=datetime_format, errors='coerce')

            return data
        except ValueError as e:
            logging.error(f"Failed to convert dataset '{dataset_name}' to DataFrame: {e}. Instead, dataset will be returned as Numpy array.")
            return data  # 'data' is a NumPy array here
        except Exception as e:
            self.unload_file_obj()
            print(f"An unexpected error occurred: {e}. Returning None and unloading file object")
            return None

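    # Example (sketch): extract a tabular dataset as a DataFrame, assuming a loaded manager
    # instance `dataOpsObj` (see the sketch further above). The dataset path is hypothetical
    # and depends on how the campaign file was built.
    #
    #   dataOpsObj.load_file_obj()
    #   df = dataOpsObj.extract_dataset_as_dataframe('instrument_x/file_1.txt/data_table')
    #   dataOpsObj.unload_file_obj()
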
    # Define metadata revision methods: append(), update(), delete(), and rename().

    def append_metadata(self, obj_name, annotation_dict):
        """
        Appends metadata attributes to the specified object (obj_name) based on the provided annotation_dict.

        This method ensures that the provided metadata attributes do not overwrite any existing ones. If an attribute already exists,
        a ValueError is raised. The function supports storing scalar values (int, float, str) and compound values such as dictionaries
        that are converted into NumPy structured arrays before being added to the metadata.

        Parameters:
        -----------
        obj_name: str
            Path to the target object (dataset or group) within the HDF5 file.

        annotation_dict: dict
            A dictionary where the keys represent new attribute names (strings), and the values can be:
            - Scalars: int, float, or str.
            - Compound values (dictionaries) for more complex metadata, which are converted to NumPy structured arrays.

        Example:
        --------
        annotation_dict = {
            "relative_humidity": {
                "value": 65,
                "units": "percentage",
                "range": "[0,100]",
                "definition": "amount of water vapor present ..."
            }
        }
        """

        if self.file_obj is None:
            raise RuntimeError("File object is not loaded. Please load the HDF5 file using the 'load_file_obj' method before attempting to modify it.")

        # Create a copy of annotation_dict to avoid modifying the original
        annotation_dict_copy = copy.deepcopy(annotation_dict)

        try:
            obj = self.file_obj[obj_name]

            # Check if any attribute already exists
            if any(key in obj.attrs for key in annotation_dict_copy.keys()):
                raise ValueError("Make sure the provided (key, value) pairs are not existing metadata elements or attributes. To modify or delete existing attributes use .update_metadata() or .delete_metadata()")

            # Process the dictionary values and convert them to structured arrays if needed
            for key, value in annotation_dict_copy.items():
                if isinstance(value, dict):
                    # Convert dictionaries to NumPy structured arrays for complex attributes
                    annotation_dict_copy[key] = utils.convert_attrdict_to_np_structured_array(value)

            # Update the object's attributes with the new metadata
            obj.attrs.update(annotation_dict_copy)

        except Exception as e:
            self.unload_file_obj()
            print(f"An unexpected error occurred: {e}. The file object has been properly closed.")

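    # Example (sketch): append a new attribute to a group or dataset. The object path is
    # hypothetical; the annotation structure follows the docstring above.
    #
    #   dataOpsObj.load_file_obj()
    #   dataOpsObj.append_metadata('instrument_x/file_1.txt',
    #                              {'relative_humidity': {'value': 65, 'units': 'percentage'}})
    #   dataOpsObj.unload_file_obj()
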
    def update_metadata(self, obj_name, annotation_dict):
        """
        Updates the value of existing metadata attributes of the specified object (obj_name) based on the provided annotation_dict.

        Non-existing attributes are disregarded with a warning; use the append_metadata() method to add them instead.

        Parameters:
        -----------
        obj_name : str
            Path to the target object (dataset or group) within the HDF5 file.

        annotation_dict: dict
            A dictionary where the keys represent existing attribute names (strings), and the values can be:
            - Scalars: int, float, or str.
            - Compound values (dictionaries) for more complex metadata, which are converted to NumPy structured arrays.

        Example:
        --------
        annotation_dict = {
            "relative_humidity": {
                "value": 65,
                "units": "percentage",
                "range": "[0,100]",
                "definition": "amount of water vapor present ..."
            }
        }
        """

        if self.file_obj is None:
            raise RuntimeError("File object is not loaded. Please load the HDF5 file using the 'load_file_obj' method before attempting to modify it.")

        update_dict = {}

        try:
            obj = self.file_obj[obj_name]
            for key, value in annotation_dict.items():
                if key in obj.attrs:
                    if isinstance(value, dict):
                        update_dict[key] = utils.convert_attrdict_to_np_structured_array(value)
                    else:
                        update_dict[key] = value
                else:
                    # Optionally, log or warn about non-existing keys being ignored.
                    print(f"Warning: Key '{key}' does not exist and will be ignored.")

            obj.attrs.update(update_dict)

        except Exception as e:
            self.unload_file_obj()
            print(f"An unexpected error occurred: {e}. The file object has been properly closed.")

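    # Example (sketch): revise an existing attribute in place; keys not present in the object's
    # metadata are skipped with a warning, as described above. Names are hypothetical.
    #
    #   dataOpsObj.update_metadata('instrument_x/file_1.txt',
    #                              {'relative_humidity': {'value': 70, 'units': 'percentage'}})
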
    def delete_metadata(self, obj_name, annotation_dict):
        """
        Deletes metadata attributes of the specified object (obj_name) based on the provided annotation_dict.

        Parameters:
        -----------
        obj_name: str
            Path to the target object (dataset or group) within the HDF5 file.

        annotation_dict: dict
            Dictionary where keys represent attribute names, and values should be dictionaries containing
            {"delete": True} to mark them for deletion.

        Example:
        --------
        annotation_dict = {"attr_to_be_deleted": {"delete": True}}

        Behavior:
        ---------
        - Deletes the specified attributes from the object's metadata if marked for deletion.
        - Issues a warning if the attribute is not found or not marked for deletion.
        """

        if self.file_obj is None:
            raise RuntimeError("File object is not loaded. Please load the HDF5 file using the 'load_file_obj' method before attempting to modify it.")

        try:
            obj = self.file_obj[obj_name]
            for attr_key, value in annotation_dict.items():
                if attr_key in obj.attrs:
                    if isinstance(value, dict) and value.get('delete', False):
                        del obj.attrs[attr_key]
                    else:
                        print(f"Warning: Value for key '{attr_key}' is not marked for deletion or is invalid.")
                else:
                    print(f"Warning: Key '{attr_key}' does not exist in metadata.")

        except Exception as e:
            self.unload_file_obj()
            print(f"An unexpected error occurred: {e}. The file object has been properly closed.")

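    # Example (sketch): remove an attribute by explicitly marking it for deletion, matching the
    # docstring above. The attribute name is hypothetical.
    #
    #   dataOpsObj.delete_metadata('instrument_x/file_1.txt', {'obsolete_attr': {'delete': True}})
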
    def rename_metadata(self, obj_name, renaming_map):
        """
        Renames metadata attributes of the specified object (obj_name) based on the provided renaming_map.

        Parameters:
        -----------
        obj_name: str
            Path to the target object (dataset or group) within the HDF5 file.

        renaming_map: dict
            A dictionary where keys are current attribute names (strings), and values are the new attribute names (strings or byte strings) to rename to.

        Example:
        --------
        renaming_map = {
            "old_attr_name": "new_attr_name",
            "old_attr_2": "new_attr_2"
        }
        """

        if self.file_obj is None:
            raise RuntimeError("File object is not loaded. Please load the HDF5 file using the 'load_file_obj' method before attempting to modify it.")

        try:
            obj = self.file_obj[obj_name]
            # Iterate over the renaming_map to process renaming
            for old_attr, new_attr in renaming_map.items():
                if old_attr in obj.attrs:
                    # Get the old attribute's value
                    attr_value = obj.attrs[old_attr]

                    # Create a new attribute with the new name
                    obj.attrs.create(new_attr, data=attr_value)

                    # Delete the old attribute
                    del obj.attrs[old_attr]
                else:
                    # Skip if the old attribute doesn't exist
                    msg = f"Skipping: Attribute '{old_attr}' does not exist."
                    print(msg)  # Optionally, replace with warnings.warn(msg)
        except Exception as e:
            self.unload_file_obj()
            print(
                f"An unexpected error occurred: {e}. The file object has been properly closed. "
                "Please ensure that 'obj_name' exists in the file, and that the keys in 'renaming_map' are valid attributes of the object."
            )

        # Note: unlike the other metadata revision methods, this one releases the file object when done.
        self.unload_file_obj()

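    # Example (sketch): rename an attribute. Note that the file object is released afterwards,
    # so reload it before performing further operations. Attribute names are hypothetical.
    #
    #   dataOpsObj.load_file_obj()
    #   dataOpsObj.rename_metadata('instrument_x/file_1.txt', {'old_attr_name': 'new_attr_name'})
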
    def get_metadata(self, obj_path):
        """ Get attributes from the object at path = obj_path. For example,
            obj_path = '/' will get root level attributes or metadata.
        """
        try:
            # Access the attributes for the object at the given path
            metadata_dict = self.file_obj[obj_path].attrs
        except KeyError:
            # Handle the case where the path doesn't exist
            logging.error(f'Invalid object path: {obj_path}')
            metadata_dict = {}

        return metadata_dict

    def reformat_datetime_column(self, dataset_name, column_name, src_format, desired_format='%Y-%m-%d %H:%M:%S.%f'):
        # Access the dataset
        dataset = self.file_obj[dataset_name]

        # Read the column data into a pandas Series and decode bytes to strings
        dt_column_data = pd.Series(dataset[column_name][:]).apply(lambda x: x.decode())

        # Convert to datetime using the source format
        dt_column_data = pd.to_datetime(dt_column_data, format=src_format, errors='coerce')

        # Reformat datetime objects to the desired format as strings
        dt_column_data = dt_column_data.dt.strftime(desired_format)

        # The reformatted values are returned, not written back to the file. A possible in-place
        # update would encode the strings back to bytes and assign them to the column, e.g.:
        #   encoded_data = dt_column_data.apply(lambda x: x.encode() if not pd.isnull(x) else 'N/A').to_numpy()
        #   dataset[column_name][:] = encoded_data
        # TODO: make the in-place update a more secure operation before enabling it.

        return dt_column_data.to_numpy()

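    # Example (sketch): normalize a byte-encoded timestamp column to the default output format.
    # The dataset path, column name, and source format are hypothetical.
    #
    #   timestamps = dataOpsObj.reformat_datetime_column('instrument_x/file_1.txt/data_table',
    #                                                    'timestamps',
    #                                                    src_format='%d.%m.%Y %H:%M:%S')
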
    # Define data append operations: append_dataset(), and update_file()

    def append_dataset(self, dataset_dict, group_name):

        # Parse values into HDF5-admissible types
        for key in dataset_dict['attributes'].keys():
            value = dataset_dict['attributes'][key]
            if isinstance(value, dict):
                dataset_dict['attributes'][key] = utils.convert_attrdict_to_np_structured_array(value)

        if not group_name in self.file_obj:
            self.file_obj.create_group(group_name, track_order=True)
            self.file_obj[group_name].attrs['creation_date'] = utils.created_at().encode("utf-8")

        self.file_obj[group_name].create_dataset(dataset_dict['name'], data=dataset_dict['data'])
        self.file_obj[group_name][dataset_dict['name']].attrs.update(dataset_dict['attributes'])
        self.file_obj[group_name].attrs['last_update_date'] = utils.created_at().encode("utf-8")

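    # Example (sketch): append a small dataset with attributes to a (possibly new) group,
    # assuming a loaded manager instance. The dictionary layout mirrors what append_dataset()
    # expects; names and values are hypothetical.
    #
    #   dataset_dict = {'name': 'quality_flags',
    #                   'data': np.zeros(10, dtype=int),
    #                   'attributes': {'description': 'per-sample QC flag'}}
    #   dataOpsObj.load_file_obj()
    #   dataOpsObj.append_dataset(dataset_dict, group_name='instrument_x')
    #   dataOpsObj.unload_file_obj()
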
    def update_file(self, path_to_append_dir):
        # Split the reference file path and the append directory path into directories and filenames
        ref_tail, ref_head = os.path.split(self.file_path)
        ref_head_filename, head_ext = os.path.splitext(ref_head)
        tail, head = os.path.split(path_to_append_dir)

        # Ensure the append directory is in the same directory as the reference file and has the same name (without extension)
        if not (ref_tail == tail and ref_head_filename == head):
            raise ValueError("The append directory must be in the same directory as the reference HDF5 file and have the same name without the extension.")

        # Close the file if it's already open
        if self.file_obj is not None:
            self.unload_file_obj()

        # Append content from the filesystem directory, opening the HDF5 file in 'r+' mode
        try:
            hdf5_lib.create_hdf5_file_from_filesystem_path(path_to_append_dir, mode='r+')
        except FileNotFoundError:
            raise FileNotFoundError(f"Reference HDF5 file '{self.file_path}' not found.")
        except OSError as e:
            raise OSError(f"Error opening HDF5 file: {e}")


def get_parent_child_relationships(file: h5py.File):

    nodes = ['/']
    parent = ['']
    #values = [file.attrs['count']]
    # TODO: maybe we should make this more general and not dependent on file_list attribute?
    #if 'file_list' in file.attrs.keys():
    #    values = [len(file.attrs['file_list'])]
    #else:
    #    values = [1]
    values = [len(file.keys())]

    def node_visitor(name, obj):
        if name.count('/') <= 2:
            nodes.append(obj.name)
            parent.append(obj.parent.name)
            #nodes.append(os.path.split(obj.name)[1])
            #parent.append(os.path.split(obj.parent.name)[1])

            if isinstance(obj, h5py.Dataset):  # or not 'file_list' in obj.attrs.keys():
                values.append(1)
            else:
                print(obj.name)
                try:
                    values.append(len(obj.keys()))
                except Exception:
                    values.append(0)

    file.visititems(node_visitor)

    return nodes, parent, values


def __print_metadata__(name, obj, folder_depth, yaml_dict):
    """
    Extracts metadata from HDF5 groups and datasets and organizes them into a dictionary with a compact representation.

    Parameters:
    -----------
    name (str): Name of the HDF5 object being inspected.
    obj (h5py.Group or h5py.Dataset): The HDF5 object (Group or Dataset).
    folder_depth (int): Maximum depth of folders to explore.
    yaml_dict (dict): Dictionary to populate with metadata.
    """
    # Process only objects within the specified folder depth
    if len(obj.name.split('/')) <= folder_depth:  # and ".h5" not in obj.name:
        name_to_list = obj.name.split('/')
        name_head = name_to_list[-1] if not name_to_list[-1] == '' else obj.name

        if isinstance(obj, h5py.Group):  # Handle groups
            # Convert attributes to a YAML/JSON serializable format
            attr_dict = {key: utils.to_serializable_dtype(val) for key, val in obj.attrs.items()}

            # Initialize the group dictionary
            group_dict = {"name": name_head, "attributes": attr_dict}

            # A compact summary of group members could be added here; kept as a sketch:
            #subgroups = [member_name for member_name in obj if isinstance(obj[member_name], h5py.Group)]
            #datasets = [member_name for member_name in obj if isinstance(obj[member_name], h5py.Dataset)]
            #group_dict["content_summary"] = {
            #    "group_count": len(subgroups),
            #    "group_preview": subgroups[:3] + (["..."] if len(subgroups) > 3 else []),
            #    "dataset_count": len(datasets),
            #    "dataset_preview": datasets[:3] + (["..."] if len(datasets) > 3 else [])
            #}

            yaml_dict[obj.name] = group_dict

        elif isinstance(obj, h5py.Dataset):  # Handle datasets
            # Convert attributes to a YAML/JSON serializable format
            attr_dict = {key: utils.to_serializable_dtype(val) for key, val in obj.attrs.items()}

            dataset_dict = {"name": name_head, "attributes": attr_dict}

            yaml_dict[obj.name] = dataset_dict


def serialize_metadata(input_filename_path, folder_depth: int = 4, output_format: str = 'yaml') -> str:
    """
    Serialize metadata from an HDF5 file into YAML or JSON format.

    Parameters
    ----------
    input_filename_path : str
        The path to the input HDF5 file.
    folder_depth : int, optional
        The folder depth to control how much of the HDF5 file hierarchy is traversed (default is 4).
    output_format : str, optional
        The format to serialize the output, either 'yaml' or 'json' (default is 'yaml').

    Returns
    -------
    str
        The output file path where the serialized metadata is stored (either .yaml or .json).
    """

    # Choose the appropriate output format (YAML or JSON)
    if output_format not in ['yaml', 'json']:
        raise ValueError("Unsupported format. Please choose either 'yaml' or 'json'.")

    # Initialize dictionary to store YAML/JSON data
    yaml_dict = {}

    # Split input file path to get the output file's base name
    output_filename_tail, ext = os.path.splitext(input_filename_path)

    # Open the HDF5 file and extract metadata
    with h5py.File(input_filename_path, 'r') as f:
        # Handle the root group explicitly (visititems does not visit it), then traverse the hierarchy
        __print_metadata__(f.name, f, folder_depth, yaml_dict)
        f.visititems(lambda name, obj: __print_metadata__(name, obj, folder_depth, yaml_dict))

    # Serialize and write the data
    output_file_path = output_filename_tail + '.' + output_format
    with open(output_file_path, 'w') as output_file:
        if output_format == 'json':
            json_output = json.dumps(yaml_dict, indent=4, sort_keys=False)
            output_file.write(json_output)
        elif output_format == 'yaml':
            yaml_output = yaml.dump(yaml_dict, sort_keys=False)
            output_file.write(yaml_output)

    return output_file_path

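# Example (sketch): serialize a campaign file's metadata to YAML, either from Python or via this
# module's CLI entry point (see the __main__ block below). The file path is hypothetical.
#
#   output_path = serialize_metadata('data/collection_data.h5', folder_depth=4, output_format='yaml')
#
#   # or from the command line:
#   # python src/hdf5_ops.py serialize data/collection_data.h5 4 yaml
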
def get_groups_at_a_level(file: h5py.File, level: int):

    groups = []

    def node_selector(name, obj):
        if name.count('/') == level:
            print(name)
            groups.append(obj.name)

    file.visititems(node_selector)

    return groups

def read_mtable_as_dataframe(filename):
    """
    Reconstruct a MATLAB Table encoded in a .h5 file as a Pandas DataFrame.

    This function reads a .h5 file containing a MATLAB Table and reconstructs it as a Pandas DataFrame.
    The input .h5 file contains one group per row of the MATLAB Table. Each group stores the table's
    dataset-like variables as Datasets, while categorical and numerical variables are represented as
    attributes of the respective group.

    To ensure homogeneity of data columns, the DataFrame is constructed column-wise.

    Parameters
    ----------
    filename : str
        The name of the .h5 file. This may include the file's location and path information.

    Returns
    -------
    pd.DataFrame
        The MATLAB Table reconstructed as a Pandas DataFrame.
    """

    # Constructs the dataframe by filling out entries column-wise. This way we can ensure homogeneous data columns.
    with h5py.File(filename, 'r') as file:

        # Define the group's attributes and datasets. This should hold
        # for all groups. TODO: implement verification and noncompliance error if needed.
        group_list = list(file.keys())
        group_attrs = list(file[group_list[0]].attrs.keys())
        # Attribute names are expected to carry a column index prefix (e.g. 'attr<idx>_<label>'); strip it for column labels.
        column_attr_names = [item[item.find('_') + 1::] for item in group_attrs]
        column_attr_names_idx = [int(item[4:(item.find('_'))]) for item in group_attrs]

        group_datasets = list(file[group_list[0]].keys()) if not 'DS_EMPTY' in file[group_list[0]].keys() else []
        # Dataset names are expected to look like 'DS<idx>'; the 'column_name' attribute holds the label.
        column_dataset_names = [file[group_list[0]][item].attrs['column_name'] for item in group_datasets]
        column_dataset_names_idx = [int(item[2:]) for item in group_datasets]

        # Define data_frame columns as group_attrs + group_datasets
        #pd_series_index = group_attrs + group_datasets
        pd_series_index = column_attr_names + column_dataset_names

        output_dataframe = pd.DataFrame(columns=pd_series_index, index=group_list)

        tmp_col = []

        for meas_prop in group_attrs + group_datasets:
            if meas_prop in group_attrs:
                column_label = meas_prop[meas_prop.find('_') + 1:]
                # Create numerical or categorical column from group's attributes
                tmp_col = [file[group_key].attrs[meas_prop][()][0] for group_key in group_list]
            else:
                # Create dataset column from group's datasets
                column_label = file[group_list[0] + '/' + meas_prop].attrs['column_name']
                #tmp_col = [file[group_key + '/' + meas_prop][()][0] for group_key in group_list]
                tmp_col = [file[group_key + '/' + meas_prop][()] for group_key in group_list]

            output_dataframe.loc[:, column_label] = tmp_col

    return output_dataframe

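# Example (sketch): load a MATLAB table that was exported to HDF5 with one group per table row.
# The file name is hypothetical.
#
#   table_df = read_mtable_as_dataframe('data/matlab_table_export.h5')
#   print(table_df.head())
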
if __name__ == "__main__":
    if len(sys.argv) < 5:
        print("Usage: python hdf5_ops.py serialize <path/to/target_file.hdf5> <folder_depth : int> <format : json|yaml>")
        sys.exit(1)

    if sys.argv[1] == 'serialize':
        input_hdf5_file = sys.argv[2]
        folder_depth = int(sys.argv[3])
        file_format = sys.argv[4]

        try:
            # Call the serialize_metadata function and capture the output path
            path_to_file = serialize_metadata(input_hdf5_file,
                                              folder_depth=folder_depth,
                                              output_format=file_format)
            print(f"Metadata serialized to {path_to_file}")
        except Exception as e:
            print(f"An error occurred during serialization: {e}")
            sys.exit(1)


def save_file_dict_to_hdf5(h5file, group_name, file_dict):
    """
    Transfers data from a file_dict to an HDF5 file.

    Parameters
    ----------
    h5file : h5py.File
        HDF5 file object where the data will be written.
    group_name : str
        Name of the HDF5 group where data will be stored.
    file_dict : dict
        Dictionary containing file data to be transferred. Required structure:
        {
            'name': str,
            'attributes_dict': dict,
            'datasets': [
                {
                    'name': str,
                    'data': array-like,
                    'shape': tuple,
                    'attributes': dict (optional)
                },
                ...
            ]
        }

    Returns
    -------
    int
        0 on success, -1 if the transfer failed. Returns None (and writes nothing) when file_dict is empty.
    """

    if not file_dict:
        return

    try:
        # Create the group and add its attributes
        filename = file_dict['name']

        # Base filename to use as group name
        base_filename = file_dict['name']
        candidate_name = base_filename
        replicate_index = 0

        # Check for an existing group and find a free name
        parent_group = h5file.require_group(group_name)
        while candidate_name in parent_group:
            replicate_index += 1
            candidate_name = f"{base_filename}_{replicate_index}"

        group = h5file[group_name].create_group(name=candidate_name)
        # Add group attributes
        group.attrs.update(file_dict['attributes_dict'])

        # Annotate replicate if renamed
        if replicate_index > 0:
            group.attrs['replicate_of'] = base_filename
            group.attrs['replicate_info'] = (
                f"Renamed due to existing group with same name. "
                f"This is replicate #{replicate_index}."
            )

        # Add datasets to the just created group
        for dataset in file_dict['datasets']:
            dataset_obj = group.create_dataset(
                name=dataset['name'],
                data=dataset['data'],
                shape=dataset['shape']
            )

            # Add the dataset's attributes
            attributes = dataset.get('attributes', {})
            dataset_obj.attrs.update(attributes)
            group.attrs['last_update_date'] = utils.created_at().encode('utf-8')

        stdout = f'Completed transfer for /{group_name}/{filename}'
        print(stdout)

    except Exception as inst:
        logging.error('Failed to transfer data into HDF5: %s', inst)
        return -1

    return 0
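
# Example (sketch): transfer a minimal in-memory file description into an open HDF5 file.
# The structure mirrors the docstring above; names and values are hypothetical.
#
#   file_dict = {'name': 'file_1.txt',
#                'attributes_dict': {'instrument': 'instrument_x'},
#                'datasets': [{'name': 'data_table',
#                              'data': np.arange(5),
#                              'shape': (5,),
#                              'attributes': {'units': 'counts'}}]}
#   with h5py.File('data/collection_data.h5', 'a') as h5file:
#       save_file_dict_to_hdf5(h5file, group_name='instrument_x', file_dict=file_dict)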