diff --git a/src/hdf5_ops.py b/src/hdf5_ops.py
index 133f0f3..883fc07 100644
--- a/src/hdf5_ops.py
+++ b/src/hdf5_ops.py
@@ -10,6 +10,7 @@ import numpy as np
 import utils.g5505_utils as utils
 import logging
 import datetime
+import yaml
 
 class HDF5DataOpsManager():
     def __init__(self, file_path, mode = 'r+') -> None:
@@ -227,4 +228,154 @@ def get_parent_child_relationships(file: h5py.File):
 
     file.visititems(node_visitor)
 
-    return nodes, parent, values
\ No newline at end of file
+    return nodes, parent, values
+
+
+def to_serializable_dtype(value):
+
+    """Transform value's dtype into a YAML/JSON-compatible type.
+
+    Parameters
+    ----------
+    value : np.generic or np.ndarray
+        NumPy scalar or array to convert.
+
+    Returns
+    -------
+    str, float, list, dict, or np.nan
+        Python-native equivalent of `value`, or np.nan if no compatible
+        conversion was found or the conversion failed.
+    """
+    try:
+        if isinstance(value, np.generic):
+            if np.issubdtype(value.dtype, np.bytes_):
+                value = value.decode('utf-8')
+            elif np.issubdtype(value.dtype, np.str_):
+                value = str(value)
+            elif np.issubdtype(value.dtype, np.number):
+                value = float(value)
+            else:
+                print('YAML-compatible data type was not found. Value has been set to NaN.')
+                value = np.nan
+        elif isinstance(value, np.ndarray):
+            # Handle structured array types (with named fields): one dict entry per field.
+            if value.dtype.names:
+                value = {field: to_serializable_dtype(value[field]) for field in value.dtype.names}
+            else:
+                # Handle regular NumPy arrays; single-element arrays collapse to a scalar.
+                if np.issubdtype(value.dtype, np.bytes_):
+                    value = [item.decode('utf-8') for item in value] if len(value) > 1 else value[0].decode('utf-8')
+                elif np.issubdtype(value.dtype, np.str_):
+                    value = [str(item) for item in value] if len(value) > 1 else str(value[0])
+                elif np.issubdtype(value.dtype, np.integer):
+                    value = [int(item) for item in value] if len(value) > 1 else int(value[0])
+                elif np.issubdtype(value.dtype, np.floating):
+                    value = [float(item) for item in value] if len(value) > 1 else float(value[0])
+                else:
+                    print('YAML-compatible data type was not found. Value has been set to NaN.')
+                    value = np.nan
+
+    except Exception as e:
+        print(f'Error converting value: {e}. Value has been set to NaN.')
+        value = np.nan
+
+    return value
+
+def is_structured_array(attr_val):
+    # Structured arrays expose their field names through dtype.names.
+    return isinstance(attr_val, np.ndarray) and attr_val.dtype.names is not None
+
+def construct_attributes_dict(attrs_obj):
+
+    attr_dict = {}
+    for key, value in attrs_obj.items():
+        # Keys in the skip list are kept in the output but left empty.
+        attr_dict[key] = {}
+        if key not in ['file_list', 'filtered_file_list']:
+
+            if is_structured_array(value):
+                attr_dict[key] = to_serializable_dtype(value)
+            else:
+                attr_dict[key] = {"rename_as": key,
+                                  "value": to_serializable_dtype(value)
+                                  }
+
+    return attr_dict
+
+def print_metadata(name, obj, folder_depth, yaml_dict):
+
+    # TODO: should we enable deeper folders?
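+    # Note: obj.name is an absolute path such as '/group/subgroup/dataset', so
+    # splitting on '/' yields ['', 'group', 'subgroup', 'dataset']. The default
+    # folder_depth of 4 therefore covers objects up to three levels below the root group.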
+    if len(obj.name.split('/')) <= folder_depth:
+        name_to_list = obj.name.split('/')
+        name_head = name_to_list[-1]
+
+        if isinstance(obj, h5py.Group):
+            attr_dict = construct_attributes_dict(obj.attrs)
+            group_dict = {"name": name_head, "attributes": attr_dict, "datasets": {}}
+            yaml_dict[obj.name] = group_dict
+        elif isinstance(obj, h5py.Dataset):
+            # Datasets attached directly to the root group split into ['', '<name>'],
+            # so an empty parent name is mapped back to the root key '/'.
+            parent_name = '/'.join(name_to_list[:-1]) or '/'
+            yaml_dict[parent_name]["datasets"][name_head] = {"rename_as": name_head,
+                                                             "attributes": construct_attributes_dict(obj.attrs)}
+
+def take_yml_snapshot_of_hdf5_file(input_filename_path, folder_depth: int = 4):
+
+    yaml_dict = {}
+
+    output_filename_tail, ext = os.path.splitext(input_filename_path)
+
+    with h5py.File(input_filename_path, 'r') as f:
+
+        attrs_dict = construct_attributes_dict(f.attrs)
+        yaml_dict[f.name] = {"name": f.name, "attributes": attrs_dict, "datasets": {}}
+        f.visititems(lambda name, obj: print_metadata(name, obj, folder_depth, yaml_dict))
+
+    with open(output_filename_tail + ".yaml", "w") as yaml_file:
+        yaml_output = yaml.dump(yaml_dict, sort_keys=False)
+        yaml_file.write(yaml_output)
+
+    return output_filename_tail + ".yaml"
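
Usage sketch (not part of the diff above): a minimal, hypothetical example of
exercising take_yml_snapshot_of_hdf5_file. The file name 'example.h5' and the
attribute values are illustrative only, and the import path assumes the module
resolves as src.hdf5_ops with its own dependencies importable.

    import h5py
    import numpy as np
    from src.hdf5_ops import take_yml_snapshot_of_hdf5_file

    # Build a small HDF5 file with one group, one dataset, and a few attributes.
    with h5py.File('example.h5', 'w') as f:
        f.attrs['project'] = 'demo'
        grp = f.create_group('measurements')
        grp.attrs['temperature'] = np.float64(295.15)
        dset = grp.create_dataset('signal', data=np.arange(5, dtype=np.float64))
        dset.attrs['calibration'] = np.array([0.98, 1.02, 1.01])

    # Writes 'example.yaml' next to the input file and returns its path;
    # numeric NumPy attributes are converted to plain floats/lists on the way.
    yaml_path = take_yml_snapshot_of_hdf5_file('example.h5')
    print(yaml_path)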