Files
dima/src/hdf5_ops.py

382 lines
14 KiB
Python

import sys
import os
root_dir = os.path.abspath(os.curdir)
sys.path.append(root_dir)
import h5py
import pandas as pd
import numpy as np
import utils.g5505_utils as utils
import logging
import datetime
import yaml
class HDF5DataOpsManager():
def __init__(self, file_path, mode = 'r+') -> None:
if mode in ['r','r+']:
self.mode = mode
self.file_path = file_path
self.file_obj = None
self._open_file()
self.list_of_datasets = []
# Define private methods
def _open_file(self):
if self.file_obj is None:
self.file_obj = h5py.File(self.file_path, self.mode)
def _collect_dataset_names(self, name, obj, list_of_datasets):
if isinstance(obj, h5py.Dataset):
list_of_datasets.append(name)
# Define public methods
def close_file(self):
if self.file_obj:
self.file_obj.flush() # Ensure all data is written to disk
self.file_obj.close()
self.file_obj = None
def retrieve_dataframe_of_dataset_names(self):
list_of_datasets = []
self.file_obj.visititems(lambda name, obj: self._collect_dataset_names(name, obj, list_of_datasets))
dataset_df = pd.DataFrame({'dataset_name': list_of_datasets})
dataset_df['parent_instrument'] = dataset_df['dataset_name'].apply(lambda x: x.split('/')[-3])
dataset_df['parent_file'] = dataset_df['dataset_name'].apply(lambda x: x.split('/')[-2])
return dataset_df
def read_dataset_as_dataframe(self,dataset_name):
"""
returns a copy of the dataset content in the form of dataframe when possible or numpy array
"""
if self.file_obj is None:
self.open_file()
dataset_obj = self.file_obj[dataset_name]
# Read dataset content from dataset obj
data = dataset_obj[...]
# The above statement can be understood as follows:
# data = np.empty(shape=dataset_obj.shape,
# dtype=dataset_obj.dtype)
# dataset_obj.read_direct(data)
try:
return pd.DataFrame(data)
except ValueError as exp:
logging.error(f"Failed to convert dataset '{dataset_name}' to DataFrame: {exp}")
return data # 'data' is a NumPy array here
def append_dataset(self,dataset_dict, group_name):
# Parse value into HDF5 admissible type
for key in dataset_dict['attributes'].keys():
value = dataset_dict['attributes'][key]
dataset_dict['attributes'][key] = utils.parse_attribute(value)
#name = dataset_dict['name']
#data = dataset_dict['data']
#dtype = dataset_dict['dtype']
self.file_obj[group_name].create_dataset(dataset_dict['name'], data=dataset_dict['data'])
self.file_obj[group_name][dataset_dict['name']].attrs.update(dataset_dict['attributes'])
def append_annotations(self, obj_name, annotation_dict):
""" appends annotations in the form of a dictionary to the obj (group or dataset) spefified by obj_name"""
obj = self.file_obj[obj_name]
# Verify if attributes to append are all new
if any(new_attr_key in obj.attrs.keys() for new_attr_key in annotation_dict.keys()):
self.close_file()
raise ValueError("Make sure the provided key, value pairs are not existing metadata elements or attributes. To modify or delete existing attributes use .modify_annotation() or .delete_annotation()")
for new_attr_key in annotation_dict.keys():
value = annotation_dict[new_attr_key]
if isinstance(value, dict):
annotation_dict[new_attr_key] = utils.parse_attribute(annotation_dict[new_attr_key])
obj.attrs.update(annotation_dict)
def get_metadata(self, obj_path):
""" Get file attributes from object at path = obj_path. For example,
obj_path = '/' will get root level attributes or metadata.
"""
try:
# Access the attributes for the object at the given path
metadata_dict = self.file_obj[obj_path].attrs
except KeyError:
# Handle the case where the path doesn't exist
logging.error(f'Invalid object path: {obj_path}')
metadata_dict = {}
return metadata_dict
def reformat_datetime_column(self, dataset_name, column_name, src_format, desired_format='%Y-%m-%d %H:%M:%S.%f'):
# Access the dataset
dataset = self.file_obj[dataset_name]
# Read the column data into a pandas Series and decode bytes to strings
dt_column_data = pd.Series(dataset[column_name][:]).apply(lambda x: x.decode() )
# Convert to datetime using the source format
dt_column_data = pd.to_datetime(dt_column_data, format=src_format, errors = 'coerce')
# Reformat datetime objects to the desired format as strings
dt_column_data = dt_column_data.dt.strftime(desired_format)
# Encode the strings back to bytes
#encoded_data = dt_column_data.apply(lambda x: x.encode() if not pd.isnull(x) else 'N/A').to_numpy()
# Update the dataset in place
#dataset[column_name][:] = encoded_data
# Convert byte strings to datetime objects
#timestamps = [datetime.datetime.strptime(a.decode(), src_format).strftime(desired_format) for a in dt_column_data]
#datetime.strptime('31/01/22 23:59:59.999999',
# '%d/%m/%y %H:%M:%S.%f')
#pd.to_datetime(
# np.array([a.decode() for a in dt_column_data]),
# format=src_format,
# errors='coerce'
#)
# Standardize the datetime format
#standardized_time = datetime.strftime(desired_format)
# Convert to byte strings to store back in the HDF5 dataset
#standardized_time_bytes = np.array([s.encode() for s in timestamps])
# Update the column in the dataset (in-place update)
# TODO: make this a more secure operation
#dataset[column_name][:] = standardized_time_bytes
#return np.array(timestamps)
return dt_column_data.to_numpy()
def read_dataset_from_hdf5file(hdf5_file_path, dataset_path):
# Open the HDF5 file
with h5py.File(hdf5_file_path, 'r') as hdf:
# Load the dataset
dataset = hdf[dataset_path]
data = np.empty(dataset.shape, dtype=dataset.dtype)
dataset.read_direct(data)
df = pd.DataFrame(data)
for col_name in df.select_dtypes(exclude='number'):
df[col_name] = df[col_name].str.decode('utf-8') #apply(lambda x: x.decode('utf-8') if isinstance(x,bytes) else x)
## Extract metadata (attributes) and convert to a dictionary
#metadata = hdf5_vis.construct_attributes_dict(hdf[dataset_name].attrs)
## Create a one-row DataFrame with the metadata
#metadata_df = pd.DataFrame.from_dict(data, orient='columns')
return df
def list_datasets_in_hdf5file(hdf5_file_path):
def get_datasets(name, obj, list_of_datasets):
if isinstance(obj,h5py.Dataset):
list_of_datasets.append(name)
#print(f'Adding dataset: {name}') #tail: {head} head: {tail}')
with h5py.File(hdf5_file_path,'r') as file:
list_of_datasets = []
file.visititems(lambda name, obj: get_datasets(name, obj, list_of_datasets))
dataset_df = pd.DataFrame({'dataset_name':list_of_datasets})
dataset_df['parent_instrument'] = dataset_df['dataset_name'].apply(lambda x: x.split('/')[-3])
dataset_df['parent_file'] = dataset_df['dataset_name'].apply(lambda x: x.split('/')[-2])
return dataset_df
def get_parent_child_relationships(file: h5py.File):
nodes = ['/']
parent = ['']
#values = [file.attrs['count']]
# TODO: maybe we should make this more general and not dependent on file_list attribute?
#if 'file_list' in file.attrs.keys():
# values = [len(file.attrs['file_list'])]
#else:
# values = [1]
values = [len(file.keys())]
def node_visitor(name,obj):
if name.count('/') <=2:
nodes.append(obj.name)
parent.append(obj.parent.name)
#nodes.append(os.path.split(obj.name)[1])
#parent.append(os.path.split(obj.parent.name)[1])
if isinstance(obj,h5py.Dataset):# or not 'file_list' in obj.attrs.keys():
values.append(1)
else:
print(obj.name)
try:
values.append(len(obj.keys()))
except:
values.append(0)
file.visititems(node_visitor)
return nodes, parent, values
def to_serializable_dtype(value):
"""Transform value's dtype into YAML/JSON compatible dtype
Parameters
----------
value : _type_
_description_
Returns
-------
_type_
_description_
"""
try:
if isinstance(value, np.generic):
if np.issubdtype(value.dtype, np.bytes_):
value = value.decode('utf-8')
elif np.issubdtype(value.dtype, np.unicode_):
value = str(value)
elif np.issubdtype(value.dtype, np.number):
value = float(value)
else:
print('Yaml-compatible data-type was not found. Value has been set to NaN.')
value = np.nan
elif isinstance(value, np.ndarray):
# Handling structured array types (with fields)
if value.dtype.names:
value = {field: to_serializable_dtype(value[field]) for field in value.dtype.names}
else:
# Handling regular array NumPy types
if np.issubdtype(value.dtype, np.bytes_):
value = [item.decode('utf-8') for item in value] if len(value) > 1 else value[0].decode('utf-8')
elif np.issubdtype(value.dtype, np.unicode_):
value = [str(item) for item in value] if len(value) > 1 else str(value[0])
elif np.issubdtype(value.dtype, np.integer):
value = [int(item) for item in value] if len(value) > 1 else int(value[0])
elif np.issubdtype(value.dtype, np.floating):
value = [float(item) for item in value] if len(value) > 1 else float(value[0])
else:
print('Yaml-compatible data-type was not found. Value has been set to NaN.')
value = np.nan
except Exception as e:
print(f'Error converting value: {e}. Value has been set to NaN.')
value = np.nan
return value
def is_structured_array(attr_val):
if isinstance(attr_val,np.ndarray):
return True if attr_val.dtype.names is not None else False
else:
return False
def construct_attributes_dict(attrs_obj):
attr_dict = {}
for key, value in attrs_obj.items():
attr_dict[key] = {}
if not key in ['file_list','filtered_file_list']:
if is_structured_array(value):
#for subattr in value.dtype.names:
#attr_dict[key][subattr] = make_dtype_yaml_compatible(value[subattr])
attr_dict[key] = to_serializable_dtype(value)
else:
attr_dict[key] = {"rename_as" : key,
"value" : to_serializable_dtype(value)
}
#if isinstance(value,str):
# value.replace('\\','\\\\')
return attr_dict
def print_metadata(name, obj, folder_depth, yaml_dict):
# TODO: should we enable deeper folders ?
if len(obj.name.split('/')) <= folder_depth:
name_to_list = obj.name.split('/')
name_head = name_to_list[-1]
if isinstance(obj,h5py.Group):
#print('name:', obj.name)
#print('attributes:', dict(obj.attrs))
#attr_dict = {}
group_dict = {}
attr_dict = construct_attributes_dict(obj.attrs)
#for key, value in obj.attrs.items():
#print (key, value.dtype)
# if key == 'Layout':
# print(value)
# if not key in ['file_list','filtered_file_list']:
# value = make_dtype_yaml_compatible(value)
# attr_dict[key] = {'rename_as' : key,
# 'value' : value
# }
#group_dict[obj.name] = {'name': obj.name, 'attributes': attr_dict}
group_dict = {"name": name_head, "attributes": attr_dict, "datasets":{}}
#group_dict[obj.name]["name"] = obj.name
#group_dict[obj.name]["attributes"] = attr_dict
#group_dict[obj.name]["datasets"] = {}
#print(name)
yaml_dict[obj.name] = group_dict
elif isinstance(obj, h5py.Dataset):
parent_name = '/'.join(name_to_list[:-1])
yaml_dict[parent_name]["datasets"][name_head] = {"rename_as": name_head ,"attributes": construct_attributes_dict(obj.attrs)}
#print(yaml.dump(group_dict,sort_keys=False))
#elif len(obj.name.split('/')) == 3:
# print(yaml.dump())
def to_yaml(input_filename_path,folder_depth: int = 4):
yaml_dict = {}
output_filename_tail, ext = os.path.splitext(input_filename_path)
with h5py.File(input_filename_path,'r') as f:
attrs_dict = construct_attributes_dict(f.attrs)
yaml_dict[f.name] = {"name": f.name, "attributes": attrs_dict, "datasets":{}}
f.visititems(lambda name, obj: print_metadata(name,obj,folder_depth,yaml_dict))
#with open(output_filename_tail+".json","w") as yaml_file:
# json_obj = json.dumps(yaml_dict,indent=4,sort_keys=False,)
# yaml_file.write(json_obj)
with open(output_filename_tail+".yaml","w") as yaml_file:
yaml_output = yaml.dump(yaml_dict,sort_keys=False)
#for key in yaml_dict:
# yaml_output = yaml.dump(yaml_dict[key],sort_keys=False)
yaml_file.write(yaml_output )
return output_filename_tail+".yaml"