Added a new file reader and dictionary pair for NASA Ames files. This is a first version that may change.
9
instruments/dictionaries/EBAS.yaml
Normal file
@@ -0,0 +1,9 @@
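# Term descriptions for NASA Ames (EBAS) table-header variables; loaded by
# instruments/readers/nasa_ames_reader.py to annotate data-table columns.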
table_header:
  start_time:
    description: Start time of the buffer (includes milliseconds)
    datetime_format: "%Y-%m-%d %H-%M-%S"
    data_type: 'datetime'
  end_time:
    description: End time of the buffer (includes milliseconds)
    datetime_format: "%Y-%m-%d %H-%M-%S"
    data_type: 'datetime'
171
instruments/readers/nasa_ames_reader.py
Normal file
@@ -0,0 +1,171 @@
import sys
import os

try:
    thisFilePath = os.path.abspath(__file__)
except NameError:
    print("Error: __file__ is not available. Ensure the script is being run from a file.")
    print("[Notice] Path to DIMA package may not be resolved properly.")
    thisFilePath = os.getcwd()  # Use current directory or specify a default

dimaPath = os.path.normpath(os.path.join(thisFilePath, "..", "..", ".."))  # Move up to project root

if dimaPath not in sys.path:  # Avoid duplicate entries
    sys.path.insert(0, dimaPath)

import pandas as pd
from datetime import datetime, timedelta
import yaml
import h5py
import logging
import argparse

import utils.g5505_utils as utils


def read_nasa_ames_as_dict(filename, instruments_dir: str = None, work_with_copy: bool = True):
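    """
    Read a NASA Ames file into an internal dictionary representation.

    Column descriptions are looked up in instruments/dictionaries/EBAS.yaml
    (or under `instruments_dir` if given). Returns a dict with keys 'name',
    'attributes_dict' (header metadata) and 'datasets' (a single structured
    data table), or an empty dict if parsing fails.
    """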
    # If instruments_dir is not provided, use the default path relative to the module directory
    if not instruments_dir:
        # Assuming the instruments folder is one level up from the source module directory
        module_dir = os.path.dirname(__file__)
        instruments_dir = os.path.join(module_dir, '..')

    # Normalize the path (resolves any '..' in the path)
    instrument_configs_path = os.path.abspath(os.path.join(instruments_dir, 'dictionaries', 'EBAS.yaml'))

    config_dict = {}  # Fall back to an empty config if the YAML cannot be parsed
    with open(instrument_configs_path, 'r') as stream:
        try:
            config_dict = yaml.load(stream, Loader=yaml.FullLoader)
        except yaml.YAMLError as exc:
            print(exc)

    # Get dictionary of terms to describe header variables from the NASA Ames file
    description_dict = config_dict.get('table_header', {})
    # Read all lines once
    with open(filename, 'r') as file:
        lines = file.readlines()
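    # Header layout assumed by this reader (standard NASA Ames exchange format):
    #   line 1      -> number of header lines (first token)
    #   line 7      -> start date as "YYYY MM DD ..."
    #   line 10     -> number of dependent variables
    #   lines 13..  -> one descriptive name per dependent variable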
    # Extract header length from the first line
    header_length = int(lines[0].split()[0])
    file_header = lines[:header_length]

    # Extract start date from line 7
    date_header = lines[6].split()
    start_date_str = f"{date_header[0]}-{date_header[1]}-{date_header[2]}"
    start_date = datetime.strptime(start_date_str, "%Y-%m-%d")

    # Extract number of dependent variables from line 10
    num_dep_vars = int(lines[9].split()[0])

    # Get variable names: start_time + vars from lines 13 to 13+num_dep_vars-1 (zero-indexed: 12 to 12+num_dep_vars)
    vars_list = ["start_time"] + [lines[i].strip() for i in range(12, 12 + num_dep_vars)]

    # Get the last line of the header (data column names)
    dat_head_line = lines[header_length - 1]
    dat_head = [x for x in dat_head_line.split() if x]
    try:
        # Read the data using pandas, skipping the header
        df = pd.read_csv(filename,
                         sep=r"\s+",
                         header=header_length - 1,
                         skip_blank_lines=True)
        # Compute actual datetime from start_time and (if present) end_time
        df['start_time'] = df['start_time'].apply(lambda x: start_date + timedelta(days=x))
        if 'end_time' in df.columns:
            df['end_time'] = df['end_time'].apply(lambda x: start_date + timedelta(days=x))
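        # Note: start_time/end_time columns are assumed to hold fractional days
        # elapsed since the start date given on header line 7.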

        # Create header metadata dictionary
        header_metadata_dict = {
            'header_length': header_length,
            'start_date': start_date_str,
            'num_dep_vars': num_dep_vars,
            'variable_names': vars_list,
            'raw_header': file_header
        }

        file_dict = {}
        path_head, path_tail = os.path.split(filename)

        file_dict['name'] = path_tail
        # TODO: review this header dictionary, it may not be the best way to represent header data
        file_dict['attributes_dict'] = header_metadata_dict
        file_dict['datasets'] = []
        dataset = {}
        dataset['name'] = 'data_table'
        dataset['data'] = utils.convert_dataframe_to_np_structured_array(df)
        dataset['shape'] = dataset['data'].shape
        dataset['dtype'] = dataset['data'].dtype
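        # The whole table is stored as one NumPy structured array
        # (via utils.convert_dataframe_to_np_structured_array), so column names
        # and per-column dtypes travel together into HDF5.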

        # Create attribute descriptions based on description_dict
        dataset['attributes'] = {}

        # Annotate column headers if description_dict is non-empty
        if description_dict:
            for column_name in df.columns:
                column_attr_dict = description_dict.get(
                    column_name,
                    {'note': 'there was no description available. Review instrument files.'})
                dataset['attributes'].update({column_name: utils.convert_attrdict_to_np_structured_array(column_attr_dict)})

        file_dict['datasets'].append(dataset)

    except Exception as exc:
        logging.error('Failed to parse %s as a NASA Ames file: %s', filename, exc)
        return {}

    return file_dict
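
# Example usage (hypothetical file path):
#   file_dict = read_nasa_ames_as_dict('data/sample.nas')
#   if file_dict:
#       print(file_dict['attributes_dict']['start_date'])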


if __name__ == "__main__":

    from src.hdf5_ops import save_file_dict_to_hdf5
    from utils.g5505_utils import created_at

    # Set up argument parsing
    parser = argparse.ArgumentParser(description="Data ingestion process to HDF5 files.")
    parser.add_argument('dst_file_path', type=str, help="Path to the target HDF5 file.")
    parser.add_argument('src_file_path', type=str, help="Relative path to source file to be saved to target HDF5 file.")
    parser.add_argument('dst_group_name', type=str, help="Group name '/instFolder/[category]/fileName' in the target HDF5 file.")

    args = parser.parse_args()

    hdf5_file_path = args.dst_file_path
    src_file_path = args.src_file_path
    dst_group_name = args.dst_group_name
    default_mode = 'r+'
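    # Example invocation (hypothetical paths):
    #   python instruments/readers/nasa_ames_reader.py output.h5 data/sample.nas /instFolder/EBAS/sample.nas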

    try:
        # Read source file and return an internal dictionary representation
        idr_dict = read_nasa_ames_as_dict(src_file_path)

        if not os.path.exists(hdf5_file_path):
            default_mode = 'w'

        print(f'Opening HDF5 file: {hdf5_file_path} in mode {default_mode}')

        with h5py.File(hdf5_file_path, mode=default_mode, track_order=True) as hdf5_file_obj:
            try:
                # Create group if it does not exist
                if dst_group_name not in hdf5_file_obj:
                    hdf5_file_obj.create_group(dst_group_name)
                    hdf5_file_obj[dst_group_name].attrs['creation_date'] = created_at().encode('utf-8')
                    print(f'Created new group: {dst_group_name}')
                else:
                    print(f'Group {dst_group_name} already exists. Proceeding with data transfer...')

            except Exception as inst:
                logging.error('Failed to create group %s in HDF5: %s', dst_group_name, inst)

            # Save dictionary to HDF5
            save_file_dict_to_hdf5(hdf5_file_obj, dst_group_name, idr_dict)
            print(f'Completed saving file dict with keys: {idr_dict.keys()}')

    except Exception as e:
        logging.error('File reader failed to process %s: %s', src_file_path, e)
        print(f'File reader failed to process {src_file_path}. See logs for details.')