src/hdf5_data_extraction.py -> src/hdf5_ops.py

This commit is contained in:
2024-09-13 14:55:12 +02:00
parent e8e2473ebe
commit b42482069c

198
src/hdf5_ops.py Normal file
View File

@@ -0,0 +1,198 @@
import sys
import os
root_dir = os.path.abspath(os.curdir)
sys.path.append(root_dir)
import h5py
import pandas as pd
import numpy as np
import utils.g5505_utils as utils
import logging
import datetime
class HDF5DataOpsManager():
def __init__(self, file_path, mode = 'r+') -> None:
if mode in ['r','r+']:
self.mode = mode
self.file_path = file_path
self.file_obj = None
self._open_file()
self.list_of_datasets = []
# Define private methods
def _open_file(self):
if self.file_obj is None:
self.file_obj = h5py.File(self.file_path, self.mode)
def _collect_dataset_names(self, name, obj, list_of_datasets):
if isinstance(obj, h5py.Dataset):
list_of_datasets.append(name)
# Define public methods
def close_file(self):
if self.file_obj:
self.file_obj.flush() # Ensure all data is written to disk
self.file_obj.close()
self.file_obj = None
def retrieve_dataframe_of_dataset_names(self):
list_of_datasets = []
self.file_obj.visititems(lambda name, obj: self._collect_dataset_names(name, obj, list_of_datasets))
dataset_df = pd.DataFrame({'dataset_name': list_of_datasets})
dataset_df['parent_instrument'] = dataset_df['dataset_name'].apply(lambda x: x.split('/')[-3])
dataset_df['parent_file'] = dataset_df['dataset_name'].apply(lambda x: x.split('/')[-2])
return dataset_df
def read_dataset_as_dataframe(self,dataset_name):
"""
returns a copy of the dataset content in the form of dataframe when possible or numpy array
"""
if self.file_obj is None:
self.open_file()
dataset_obj = self.file_obj[dataset_name]
# Read dataset content from dataset obj
data = dataset_obj[...]
# The above statement can be understood as follows:
# data = np.empty(shape=dataset_obj.shape,
# dtype=dataset_obj.dtype)
# dataset_obj.read_direct(data)
try:
return pd.DataFrame(data)
except ValueError as exp:
logging.error(f"Failed to convert dataset '{dataset_name}' to DataFrame: {exp}")
return data # 'data' is a NumPy array here
def append_dataset(self,dataset_dict, group_name):
# Parse value into HDF5 admissible type
for key in dataset_dict['attributes'].keys():
value = dataset_dict['attributes'][key]
dataset_dict['attributes'][key] = utils.parse_attribute(value)
#name = dataset_dict['name']
#data = dataset_dict['data']
#dtype = dataset_dict['dtype']
self.file_obj[group_name].create_dataset(dataset_dict['name'], data=dataset_dict['data'])
self.file_obj[group_name][dataset_dict['name']].attrs.update(dataset_dict['attributes'])
def append_annotations(self, obj_name, annotation_dict):
""" appends annotations in the form of a dictionary to the obj (group or dataset) spefified by obj_name"""
obj = self.file_obj[obj_name]
# Verify if attributes to append are all new
if any(new_attr_key in obj.attrs.keys() for new_attr_key in annotation_dict.keys()):
self.close_file()
raise ValueError("Make sure the provided key, value pairs are not existing metadata elements or attributes. To modify or delete existing attributes use .modify_annotation() or .delete_annotation()")
for new_attr_key in annotation_dict.keys():
value = annotation_dict[new_attr_key]
if isinstance(value, dict):
annotation_dict[new_attr_key] = utils.parse_attribute(annotation_dict[new_attr_key])
obj.attrs.update(annotation_dict)
def get_metadata(self, obj_path):
""" Get file attributes from object at path = obj_path. For example,
obj_path = '/' will get root level attributes or metadata.
"""
try:
# Access the attributes for the object at the given path
metadata_dict = self.file_obj[obj_path].attrs
except KeyError:
# Handle the case where the path doesn't exist
logging.error(f'Invalid object path: {obj_path}')
metadata_dict = {}
return metadata_dict
def reformat_datetime_column(self, dataset_name, column_name, src_format, desired_format='%Y-%m-%d %H:%M:%S.%f'):
# Access the dataset
dataset = self.file_obj[dataset_name]
# Read the column data into a pandas Series and decode bytes to strings
dt_column_data = pd.Series(dataset[column_name][:]).apply(lambda x: x.decode() )
# Convert to datetime using the source format
dt_column_data = pd.to_datetime(dt_column_data, format=src_format, errors = 'coerce')
# Reformat datetime objects to the desired format as strings
dt_column_data = dt_column_data.dt.strftime(desired_format)
# Encode the strings back to bytes
#encoded_data = dt_column_data.apply(lambda x: x.encode() if not pd.isnull(x) else 'N/A').to_numpy()
# Update the dataset in place
#dataset[column_name][:] = encoded_data
# Convert byte strings to datetime objects
#timestamps = [datetime.datetime.strptime(a.decode(), src_format).strftime(desired_format) for a in dt_column_data]
#datetime.strptime('31/01/22 23:59:59.999999',
# '%d/%m/%y %H:%M:%S.%f')
#pd.to_datetime(
# np.array([a.decode() for a in dt_column_data]),
# format=src_format,
# errors='coerce'
#)
# Standardize the datetime format
#standardized_time = datetime.strftime(desired_format)
# Convert to byte strings to store back in the HDF5 dataset
#standardized_time_bytes = np.array([s.encode() for s in timestamps])
# Update the column in the dataset (in-place update)
# TODO: make this a more secure operation
#dataset[column_name][:] = standardized_time_bytes
#return np.array(timestamps)
return dt_column_data.to_numpy()
def read_dataset_from_hdf5file(hdf5_file_path, dataset_path):
# Open the HDF5 file
with h5py.File(hdf5_file_path, 'r') as hdf:
# Load the dataset
dataset = hdf[dataset_path]
data = np.empty(dataset.shape, dtype=dataset.dtype)
dataset.read_direct(data)
df = pd.DataFrame(data)
for col_name in df.select_dtypes(exclude='number'):
df[col_name] = df[col_name].str.decode('utf-8') #apply(lambda x: x.decode('utf-8') if isinstance(x,bytes) else x)
## Extract metadata (attributes) and convert to a dictionary
#metadata = hdf5_vis.construct_attributes_dict(hdf[dataset_name].attrs)
## Create a one-row DataFrame with the metadata
#metadata_df = pd.DataFrame.from_dict(data, orient='columns')
return df
def list_datasets_in_hdf5file(hdf5_file_path):
def get_datasets(name, obj, list_of_datasets):
if isinstance(obj,h5py.Dataset):
list_of_datasets.append(name)
#print(f'Adding dataset: {name}') #tail: {head} head: {tail}')
with h5py.File(hdf5_file_path,'r') as file:
list_of_datasets = []
file.visititems(lambda name, obj: get_datasets(name, obj, list_of_datasets))
dataset_df = pd.DataFrame({'dataset_name':list_of_datasets})
dataset_df['parent_instrument'] = dataset_df['dataset_name'].apply(lambda x: x.split('/')[-3])
dataset_df['parent_file'] = dataset_df['dataset_name'].apply(lambda x: x.split('/')[-2])
return dataset_df