Moved take_yml_snapshot_of_hdf5_file func and associated helper functions from hdf5_vis.py into hdf5_ops.py
This commit is contained in:
153
src/hdf5_vis.py
153
src/hdf5_vis.py
@ -67,156 +67,3 @@ def display_group_hierarchy_on_a_treemap(filename: str):
|
|||||||
#pio.write_image(fig,file_name + ".png",width=800,height=600,format='png')
|
#pio.write_image(fig,file_name + ".png",width=800,height=600,format='png')
|
||||||
|
|
||||||
#
|
#
|
||||||
def to_serializable_dtype(value):
    """Transform value's dtype into a YAML/JSON compatible dtype.

    Parameters
    ----------
    value : np.generic | np.ndarray | Any
        Value read from an HDF5 attribute. NumPy scalars become native
        Python types; plain arrays become lists of native types (collapsed
        to a single scalar when the array has exactly one element);
        structured arrays become dicts keyed by field name. Any other
        value is returned unchanged.

    Returns
    -------
    Any
        A YAML/JSON compatible value, or ``np.nan`` when no compatible
        conversion exists or the conversion raised.
    """
    try:
        if isinstance(value, np.generic):
            if np.issubdtype(value.dtype, np.bytes_):
                value = value.decode('utf-8')
            # np.str_ replaces np.unicode_, which was removed in NumPy 2.0;
            # with np.unicode_ this lookup raised AttributeError and every
            # non-bytes scalar silently fell through to NaN.
            elif np.issubdtype(value.dtype, np.str_):
                value = str(value)
            elif np.issubdtype(value.dtype, np.number):
                value = float(value)
            else:
                print('Yaml-compatible data-type was not found. Value has been set to NaN.')
                value = np.nan
        elif isinstance(value, np.ndarray):
            # Handling structured array types (with fields)
            if value.dtype.names:
                value = {field: to_serializable_dtype(value[field]) for field in value.dtype.names}
            else:
                # Handling regular NumPy array types; a single-element array
                # is collapsed to its scalar value.
                if np.issubdtype(value.dtype, np.bytes_):
                    value = [item.decode('utf-8') for item in value] if len(value) > 1 else value[0].decode('utf-8')
                elif np.issubdtype(value.dtype, np.str_):
                    value = [str(item) for item in value] if len(value) > 1 else str(value[0])
                elif np.issubdtype(value.dtype, np.integer):
                    value = [int(item) for item in value] if len(value) > 1 else int(value[0])
                elif np.issubdtype(value.dtype, np.floating):
                    value = [float(item) for item in value] if len(value) > 1 else float(value[0])
                else:
                    print('Yaml-compatible data-type was not found. Value has been set to NaN.')
                    value = np.nan

    except Exception as e:
        print(f'Error converting value: {e}. Value has been set to NaN.')
        value = np.nan

    return value
|
|
||||||
def is_structured_array(attr_val):
    """Return True when *attr_val* is a NumPy structured array (has named fields)."""
    if not isinstance(attr_val, np.ndarray):
        return False
    return attr_val.dtype.names is not None
|
|
||||||
def construct_attributes_dict(attrs_obj):
    """Build a serializable dict from an HDF5 attributes mapping.

    Keys in the internal bookkeeping lists ('file_list',
    'filtered_file_list') are recorded as empty dicts. Structured-array
    values are serialized directly; every other value is wrapped in a
    {"rename_as": key, "value": ...} entry.
    """
    excluded = ('file_list', 'filtered_file_list')
    attr_dict = {}
    for key, value in attrs_obj.items():
        if key in excluded:
            attr_dict[key] = {}
        elif is_structured_array(value):
            attr_dict[key] = to_serializable_dtype(value)
        else:
            attr_dict[key] = {"rename_as": key,
                              "value": to_serializable_dtype(value)}
    return attr_dict
|
|
||||||
def print_metadata(name, obj, folder_depth, yaml_dict):
    """Record an HDF5 group's or dataset's attributes into *yaml_dict*.

    Objects deeper than *folder_depth* path segments are skipped. A group
    adds a top-level entry keyed by its full path; a dataset is attached
    to its parent group's "datasets" mapping.
    """
    # TODO: should we enable deeper folders ?
    path_parts = obj.name.split('/')
    if len(path_parts) > folder_depth:
        return

    leaf_name = path_parts[-1]

    if isinstance(obj, h5py.Group):
        yaml_dict[obj.name] = {
            "name": leaf_name,
            "attributes": construct_attributes_dict(obj.attrs),
            "datasets": {},
        }
    elif isinstance(obj, h5py.Dataset):
        # Datasets hang off the entry their parent group created above.
        parent_path = '/'.join(path_parts[:-1])
        yaml_dict[parent_path]["datasets"][leaf_name] = {
            "rename_as": leaf_name,
            "attributes": construct_attributes_dict(obj.attrs),
        }
|
|
||||||
def take_yml_snapshot_of_hdf5_file(input_filename_path, folder_depth: int = 4):
    """Write a YAML snapshot of an HDF5 file's group/dataset metadata.

    Parameters
    ----------
    input_filename_path : str
        Path to the HDF5 file to snapshot.
    folder_depth : int, optional
        Maximum hierarchy depth ('/'-separated path segments) to include.
        Defaults to 4.

    Returns
    -------
    str
        Path of the YAML file that was written (the input path with its
        extension replaced by ``.yaml``).
    """
    yaml_dict = {}

    # Only the stem is needed for the output name; the extension is unused.
    output_filename_tail, _ = os.path.splitext(input_filename_path)

    with h5py.File(input_filename_path, 'r') as f:
        # Record the root group first, then every nested group/dataset.
        yaml_dict[f.name] = {
            "name": f.name,
            "attributes": construct_attributes_dict(f.attrs),
            "datasets": {},
        }
        f.visititems(lambda name, obj: print_metadata(name, obj, folder_depth, yaml_dict))

    output_path = output_filename_tail + ".yaml"
    with open(output_path, "w") as yaml_file:
        yaml_file.write(yaml.dump(yaml_dict, sort_keys=False))

    return output_path
Reference in New Issue
Block a user