# dima/instruments/readers/nasa_ames_reader.py
"""Reader for NASA Ames (EBAS) formatted files, producing DIMA's internal dict representation."""
import sys
import os

# Resolve the DIMA project root and put it on sys.path so that absolute
# imports such as `utils.g5505_utils` work when this file is run directly.
try:
    thisFilePath = os.path.abspath(__file__)
except NameError:
    # __file__ is undefined in some interactive contexts (e.g. exec/REPL).
    print("Error: __file__ is not available. Ensure the script is being run from a file.")
    print("[Notice] Path to DIMA package may not be resolved properly.")
    thisFilePath = os.getcwd()  # Use current directory or specify a default

dimaPath = os.path.normpath(os.path.join(thisFilePath, "..", '..', '..'))  # Move up to project root

if dimaPath not in sys.path:  # Avoid duplicate entries
    sys.path.insert(0, dimaPath)

import pandas as pd
from datetime import datetime, timedelta
import yaml
import h5py
import logging
import argparse

# Project-local helpers; resolvable only after the sys.path insertion above.
import utils.g5505_utils as utils
def split_header(header_lines):
    """Split a NASA Ames file header into three consecutive parts.

    Parameters
    ----------
    header_lines : list of bytes or str
        Raw header lines (typically ``bytes`` from ``file.readlines()``).

    Returns
    -------
    tuple of (list, list, list)
        ``part1``: everything before the variable-description marker line;
        ``part2``: variable descriptions, up to (not including) the first
        line containing a ``:`` (assumed start of key-value metadata);
        ``part3``: the key-value metadata lines to the end of the header.
        Elements are returned unmodified (same type as the input).

    Raises
    ------
    Exception
        If the expected variable-description marker line is not found.
    """
    # Decoded/stripped copy used for all text matching; originals are returned as-is.
    decoded = []
    for line in header_lines:
        if isinstance(line, bytes):
            decoded.append(line.decode('utf-8', errors='ignore').strip())
        else:
            decoded.append(line.strip())

    # The variable-description section starts at this exact (stripped) line.
    var_desc_marker = "Days from the file reference point (start_time)"
    try:
        var_start_idx = decoded.index(var_desc_marker)
    except ValueError:
        raise Exception("Expected variable description marker not found.")

    # Part 1: everything before the variable descriptions.
    part1 = header_lines[:var_start_idx]

    # Part 2: variable descriptions — until the first key-value line (contains ':').
    part2 = []
    part3 = []
    in_part3 = False
    # BUGFIX: test ':' on the decoded copy, not via line.decode() — the original
    # crashed with AttributeError when header_lines contained str instead of bytes.
    for raw, text in zip(header_lines[var_start_idx:], decoded[var_start_idx:]):
        if not in_part3 and ':' in text:
            in_part3 = True  # We assume this is where key-value pairs begin
        if in_part3:
            part3.append(raw)
        else:
            part2.append(raw)
    return part1, part2, part3
def extract_var_descriptions(part2):
    """Extract the per-variable description lines from header part 2.

    ``part2[1]`` holds the number of variables and ``part2[2]`` the scale
    factors; the descriptions occupy the ``nvars`` lines starting at index 4.
    The scale-factor line is sanity-checked by requiring its values to sum
    to ``nvars`` (i.e. all factors equal to 1).

    Raises
    ------
    RuntimeError
        If the scale-factor sum disagrees with the declared variable count.
    """
    count_text = part2[1].decode(encoding='utf-8')
    scale_text = part2[2].decode(encoding='utf-8')
    nvars = int(count_text.strip())
    scale_sum = sum(float(tok) for tok in scale_text.strip().split())
    if scale_sum != nvars:
        raise RuntimeError(f'Inconsistent lines. Check lines {count_text} and {scale_text}')
    return [part2[idx] for idx in range(4, 4 + nvars)]
def read_nasa_ames_as_dict(filename, instruments_dir: str = None, work_with_copy: bool = True):
    """Parse a NASA Ames (EBAS) file into DIMA's internal dictionary representation.

    Parameters
    ----------
    filename : str
        Path to the NASA Ames source file.
    instruments_dir : str, optional
        Folder containing ``dictionaries/EBAS.yaml``; defaults to the parent
        of this module's directory.
    work_with_copy : bool, optional
        Currently unused; kept for interface compatibility with sibling readers.

    Returns
    -------
    dict
        ``{'name', 'attributes_dict', 'datasets'}`` on success, or ``{}`` if
        parsing the data table fails (the failure is logged).
    """
    # If instruments_dir is not provided, use the default path relative to the module directory
    if not instruments_dir:
        # Assuming the instruments folder is one level up from the source module directory
        module_dir = os.path.dirname(__file__)
        instruments_dir = os.path.join(module_dir, '..')

    # Normalize the path (resolves any '..' in the path)
    instrument_configs_path = os.path.abspath(os.path.join(instruments_dir, 'dictionaries', 'EBAS.yaml'))

    with open(instrument_configs_path, 'r') as stream:
        try:
            config_dict = yaml.load(stream, Loader=yaml.FullLoader)
        except yaml.YAMLError as exc:
            print(exc)
            # BUGFIX: config_dict was unbound here, causing a NameError below.
            config_dict = {}

    # Dictionary of terms describing header variables from the NASA Ames file
    description_dict = config_dict.get('table_header', {})

    # Read all lines once
    with open(filename, 'rb') as file:
        lines = file.readlines()

    # Extract header length from the first line
    header_length = int(lines[0].split()[0])
    file_header = lines[:header_length]

    # Split header in three parts: preamble, variable descriptions, metadata pairs
    part1, part2, part3 = split_header(file_header)
    var_descriptions = extract_var_descriptions(part2)

    # The last header line names the data-table columns.
    # (pop() drops the last element; .remove() would have dropped the first equal one.)
    table_header = part3.pop()

    # Locate 'Startdate:' to anchor the relative day offsets in the data table.
    start_str = None
    start_date = None
    for line in part3:
        text = line.decode(encoding="utf-8")
        if 'Startdate:' in text:
            attribute_name, attribute_value = text.split(':', 1)
            print(attribute_name, attribute_value)
            start_str = attribute_value.strip()
            start_date = datetime.strptime(start_str, "%Y%m%d%H%M%S")

    vars_list = table_header.decode(encoding="utf-8").strip().split()

    try:
        if start_date is None:
            # Previously this surfaced as a NameError swallowed by a bare except.
            raise ValueError('Startdate: attribute not found in file header.')

        # Read the data table, skipping the header (raw string avoids the
        # invalid "\s" escape-sequence warning of the original).
        df = pd.read_csv(filename,
                         sep=r"\s+",
                         header=header_length - 1,
                         skip_blank_lines=True)

        df['start_time'] = pd.to_numeric(df['start_time'].astype(str).str.strip(), errors='coerce')
        df['end_time'] = pd.to_numeric(df['end_time'].astype(str).str.strip(), errors='coerce')

        # Compute actual datetimes from the day offsets relative to start_date
        df['start_time'] = df['start_time'].apply(
            lambda x: start_date + timedelta(days=x) if pd.notna(x) else pd.NaT
        )
        if 'end_time' in df.columns:
            df['end_time'] = df['end_time'].apply(
                lambda x: start_date + timedelta(days=x) if pd.notna(x) else pd.NaT
            )

        # Create header metadata dictionary
        header_metadata_dict = {
            'header_length': header_length,
            'start_date': start_str,
            'variable_names': vars_list,
            'variable_descriptions': var_descriptions,
            'raw_header_part1': part1,
            'raw_header_part2': part2,
            'raw_header_part3': part3
        }

        file_dict = {}
        path_tail, path_head = os.path.split(filename)
        file_dict['name'] = path_head
        # TODO: review this header dictionary, it may not be the best way to represent header data
        file_dict['attributes_dict'] = header_metadata_dict
        file_dict['datasets'] = []

        dataset = {}
        dataset['name'] = 'data_table'
        dataset['data'] = utils.convert_dataframe_to_np_structured_array(df)
        dataset['shape'] = dataset['data'].shape
        # NOTE(review): stores the container type (numpy.ndarray), not the array's
        # dtype — kept as-is for downstream compatibility; confirm before changing.
        dataset['dtype'] = type(dataset['data'])

        # Annotate column headers if description_dict is non-empty
        dataset['attributes'] = {}
        if description_dict:
            for column_name in df.columns:
                column_attr_dict = description_dict.get(
                    column_name,
                    {'note': 'there was no description available. Review instrument files.'})
                dataset['attributes'][column_name] = utils.convert_attrdict_to_np_structured_array(column_attr_dict)

        file_dict['datasets'].append(dataset)
    except Exception as exc:
        # The original bare `except: return {}` hid every failure; keep the
        # empty-dict contract for callers but log the cause.
        logging.error('Failed to build file dict for %s: %s', filename, exc)
        return {}

    return file_dict
if __name__ == "__main__":
from src.hdf5_ops import save_file_dict_to_hdf5
from utils.g5505_utils import created_at
# Set up argument parsing
parser = argparse.ArgumentParser(description="Data ingestion process to HDF5 files.")
parser.add_argument('dst_file_path', type=str, help="Path to the target HDF5 file.")
parser.add_argument('src_file_path', type=str, help="Relative path to source file to be saved to target HDF5 file.")
parser.add_argument('dst_group_name', type=str, help="Group name '/instFolder/[category]/fileName' in the target HDF5 file.")
args = parser.parse_args()
hdf5_file_path = args.dst_file_path
src_file_path = args.src_file_path
dst_group_name = args.dst_group_name
default_mode = 'r+'
try:
# Read source file and return an internal dictionary representation
idr_dict = read_nasa_ames_as_dict(src_file_path)
if not os.path.exists(hdf5_file_path):
default_mode = 'w'
print(f'Opening HDF5 file: {hdf5_file_path} in mode {default_mode}')
with h5py.File(hdf5_file_path, mode=default_mode, track_order=True) as hdf5_file_obj:
try:
# Create group if it does not exist
if dst_group_name not in hdf5_file_obj:
hdf5_file_obj.create_group(dst_group_name)
hdf5_file_obj[dst_group_name].attrs['creation_date'] = created_at().encode('utf-8')
print(f'Created new group: {dst_group_name}')
else:
print(f'Group {dst_group_name} already exists. Proceeding with data transfer...')
except Exception as inst:
logging.error('Failed to create group %s in HDF5: %s', dst_group_name, inst)
# Save dictionary to HDF5
save_file_dict_to_hdf5(hdf5_file_obj, dst_group_name, idr_dict)
print(f'Completed saving file dict with keys: {idr_dict.keys()}')
except Exception as e:
logging.error('File reader failed to process %s: %s', src_file_path, e)
print(f'File reader failed to process {src_file_path}. See logs for details.')