Add new file reader instruments/readers/structured_file_reader.py, and update registry.py and yaml
This commit is contained in:
@ -16,8 +16,9 @@ from instruments.readers.g5505_text_reader import read_txt_files_as_dict
|
||||
from instruments.readers.acsm_tofware_reader import read_acsm_files_as_dict
|
||||
from instruments.readers.acsm_flag_reader import read_jsonflag_as_dict
|
||||
from instruments.readers.nasa_ames_reader import read_nasa_ames_as_dict
|
||||
from instruments.readers.structured_file_reader import read_structured_file_as_dict
|
||||
|
||||
file_extensions = ['.ibw','.txt','.dat','.h5','.TXT','.csv','.pkl','.json','.yaml','.nas']
|
||||
# Supported file extensions (leading dot included, matching os.path.splitext output).
# Fix: the yml entry was added as 'yml' (no dot), inconsistent with every other
# entry and thus never matching a dot-prefixed extension.
file_extensions = ['.ibw','.txt','.dat','.h5','.TXT','.csv','.pkl','.json','.yaml','.yml','.nas']
|
||||
|
||||
# Define the instruments directory (modify this as needed or set to None)
|
||||
default_instruments_dir = None # or provide an absolute path
|
||||
@ -27,6 +28,9 @@ file_readers = {
|
||||
'txt': lambda a1: read_txt_files_as_dict(a1, instruments_dir=default_instruments_dir, work_with_copy=False),
|
||||
'dat': lambda a1: read_txt_files_as_dict(a1, instruments_dir=default_instruments_dir, work_with_copy=False),
|
||||
'csv': lambda a1: read_txt_files_as_dict(a1, instruments_dir=default_instruments_dir, work_with_copy=False),
|
||||
'yaml': lambda a1: read_structured_file_as_dict(a1),
|
||||
'yml': lambda a1: read_structured_file_as_dict(a1),
|
||||
'json': lambda a1: read_structured_file_as_dict(a1),
|
||||
'ACSM_TOFWARE_txt' : lambda x: read_acsm_files_as_dict(x, instruments_dir=default_instruments_dir, work_with_copy=False),
|
||||
'ACSM_TOFWARE_csv' : lambda x: read_acsm_files_as_dict(x, instruments_dir=default_instruments_dir, work_with_copy=False),
|
||||
'ACSM_TOFWARE_flags_json' : lambda x: read_jsonflag_as_dict(x),
|
||||
@ -52,7 +56,7 @@ def find_reader(instrument_folder, file_extension):
|
||||
registry = load_registry()
|
||||
|
||||
for entry in registry:
|
||||
if entry["instrumentFolderName"] == instrument_folder and entry["fileExtension"] == file_extension:
|
||||
if entry["instrumentFolderName"] == instrument_folder and (file_extension in entry["fileExtension"].split(sep=',')):
|
||||
return entry["fileReaderPath"], entry["InstrumentDictionaryPath"]
|
||||
|
||||
return None, None # Not found
|
||||
|
113
instruments/readers/structured_file_reader.py
Normal file
113
instruments/readers/structured_file_reader.py
Normal file
@ -0,0 +1,113 @@
|
||||
import sys
import os

# --- sys.path bootstrap --------------------------------------------------
# Resolve this file's absolute location so the DIMA project root can be put
# on sys.path; this lets absolute imports such as utils.g5505_utils work when
# the module is executed directly as a script.
try:
    thisFilePath = os.path.abspath(__file__)
except NameError:
    # __file__ is undefined in some interactive/embedded contexts; fall back
    # to the current working directory (project imports below may then fail).
    print("Error: __file__ is not available. Ensure the script is being run from a file.")
    print("[Notice] Path to DIMA package may not be resolved properly.")
    thisFilePath = os.getcwd()  # Use current directory or specify a default

# Joining three ".." onto the file path == two directories above this file's
# folder (instruments/readers/ -> project root).
dimaPath = os.path.normpath(os.path.join(thisFilePath, "..",'..','..')) # Move up to project root

if dimaPath not in sys.path: # Avoid duplicate entries
    sys.path.insert(0,dimaPath)

import pandas as pd
import json, yaml
import h5py
import argparse
import logging

# Project-local helpers (resolvable thanks to the sys.path bootstrap above).
import utils.g5505_utils as utils
|
||||
def read_structured_file_as_dict(path_to_file):
    """
    Read a JSON or YAML file and repackage it as a standardized file dictionary.

    The raw mapping is flattened with pandas.json_normalize, converted to a
    NumPy structured array via utils.convert_attrdict_to_np_structured_array,
    and wrapped in a dict with keys 'name', 'attributes_dict', and 'datasets'.

    Parameters
    ----------
    path_to_file : str
        Path to a .yaml/.yml or .json file.

    Returns
    -------
    dict
        {'name': <file name>, 'attributes_dict': {}, 'datasets': [<dataset>]}

    Raises
    ------
    ValueError
        If the file extension is not .yaml, .yml, or .json.
    Exception
        Any load/normalize/convert failure is logged and re-raised.
    """
    _, base_name = os.path.split(path_to_file)

    file_dict = {
        'name': base_name,
        'attributes_dict': {},
        'datasets': [],
    }

    # Load the raw mapping; the parser is chosen from the file extension.
    try:
        with open(path_to_file, 'r') as stream:
            if path_to_file.endswith(('.yaml', '.yml')):
                raw_data = yaml.safe_load(stream)
            elif path_to_file.endswith('.json'):
                raw_data = json.load(stream)
            else:
                raise ValueError(f"Unsupported file type: {path_to_file}")
    except Exception as exc:
        logging.error("Failed to load input file %s: %s", path_to_file, exc)
        raise

    # Flatten nested keys into dotted columns (one row per top-level record).
    try:
        flattened = pd.json_normalize(raw_data)
    except Exception as exc:
        logging.error("Failed to normalize data structure: %s", exc)
        raise

    # NOTE(review): to_dict(orient='records') yields a *list* of dicts, while the
    # helper's name suggests a single attribute dict — confirm against utils.
    try:
        structured_array = utils.convert_attrdict_to_np_structured_array(flattened.to_dict(orient='records'))
    except Exception as exc:
        logging.error("Failed to convert to structured array: %s", exc)
        raise

    file_dict['datasets'].append({
        'name': 'data_table',
        'data': structured_array,
        'shape': structured_array.shape,
        # NOTE(review): stores the Python class of the array, not array.dtype —
        # confirm this is what downstream HDF5 serialization expects.
        'dtype': type(structured_array)
    })

    return file_dict
|
||||
|
||||
if __name__ == "__main__":

    # Deferred project-local imports: only needed for CLI ingestion, and they
    # rely on the sys.path bootstrap at the top of this module.
    from src.hdf5_ops import save_file_dict_to_hdf5
    from utils.g5505_utils import created_at

    # CLI: ingest one structured (JSON/YAML) file into a group of an HDF5 file.
    parser = argparse.ArgumentParser(description="Data ingestion process to HDF5 files.")
    parser.add_argument('dst_file_path', type=str, help="Path to the target HDF5 file.")
    parser.add_argument('src_file_path', type=str, help="Relative path to source file to be saved to target HDF5 file.")
    parser.add_argument('dst_group_name', type=str, help="Group name '/instFolder/[category]/fileName' in the target HDF5 file.")
    args = parser.parse_args()

    hdf5_file_path = args.dst_file_path
    src_file_path = args.src_file_path
    dst_group_name = args.dst_group_name
    default_mode = 'r+'

    try:
        payload = read_structured_file_as_dict(src_file_path)

        # A missing target file forces creation mode instead of append.
        if not os.path.exists(hdf5_file_path):
            default_mode = 'w'
        print(f'Opening HDF5 file: {hdf5_file_path} in mode {default_mode}')

        with h5py.File(hdf5_file_path, mode=default_mode, track_order=True) as hdf5_file_obj:
            # Group creation is best-effort: a failure here is logged but the
            # data transfer below is still attempted (matches original flow).
            try:
                if dst_group_name not in hdf5_file_obj:
                    hdf5_file_obj.create_group(dst_group_name)
                    hdf5_file_obj[dst_group_name].attrs['creation_date'] = created_at().encode('utf-8')
                    print(f'Created new group: {dst_group_name}')
                else:
                    print(f'Group {dst_group_name} already exists. Proceeding with data transfer...')
            except Exception as inst:
                logging.error('Failed to create group %s in HDF5: %s', dst_group_name, inst)

            save_file_dict_to_hdf5(hdf5_file_obj, dst_group_name, payload)
            print(f'Completed saving file dict with keys: {payload.keys()}')

    except Exception as e:
        logging.error('File reader failed to process %s: %s', src_file_path, e)
        print(f'File reader failed to process {src_file_path}. See logs for details.')
|
@ -78,3 +78,8 @@ instruments:
|
||||
fileExtension: nas
|
||||
fileReaderPath: instruments/readers/nasa_ames_reader.py
|
||||
InstrumentDictionaryPath: instruments/dictionaries/EBAS.yaml
|
||||
|
||||
- instrumentFolderName: ACSM_TOFWARE
|
||||
fileExtension: yaml,yml,json
|
||||
fileReaderPath: instruments/readers/structured_file_reader.py
|
||||
InstrumentDictionaryPath: instruments/dictionaries/EBAS.yaml
|
||||
|
Reference in New Issue
Block a user