diff --git a/instruments/dictionaries/EBAS.yaml b/instruments/dictionaries/EBAS.yaml new file mode 100644 index 0000000..c17ec81 --- /dev/null +++ b/instruments/dictionaries/EBAS.yaml @@ -0,0 +1,9 @@ +table_header : + start_time: + description: Start time of the buffer (includes milliseconds) + datetime_format: "%Y-%m-%d %H-%M-%S" + data_type: 'datetime' + end_time: + description: Start time of the buffer (includes milliseconds) + datetime_format: "%Y-%m-%d %H-%M-%S" + data_type: 'datetime' \ No newline at end of file diff --git a/instruments/readers/nasa_ames_reader.py b/instruments/readers/nasa_ames_reader.py new file mode 100644 index 0000000..2850b83 --- /dev/null +++ b/instruments/readers/nasa_ames_reader.py @@ -0,0 +1,171 @@ +import sys +import os + +try: + thisFilePath = os.path.abspath(__file__) +except NameError: + print("Error: __file__ is not available. Ensure the script is being run from a file.") + print("[Notice] Path to DIMA package may not be resolved properly.") + thisFilePath = os.getcwd() # Use current directory or specify a default + +dimaPath = os.path.normpath(os.path.join(thisFilePath, "..",'..','..')) # Move up to project root + +if dimaPath not in sys.path: # Avoid duplicate entries + sys.path.insert(0,dimaPath) + +import pandas as pd +from datetime import datetime, timedelta +import yaml +import h5py +import logging +import argparse + + +import utils.g5505_utils as utils + +def read_nasa_ames_as_dict(filename, instruments_dir: str = None, work_with_copy: bool = True): + + # If instruments_dir is not provided, use the default path relative to the module directory + if not instruments_dir: + # Assuming the instruments folder is one level up from the source module directory + module_dir = os.path.dirname(__file__) + instruments_dir = os.path.join(module_dir, '..') + + # Normalize the path (resolves any '..' in the path) + instrument_configs_path = os.path.abspath(os.path.join(instruments_dir,'dictionaries','EBAS.yaml')) + + with open(instrument_configs_path,'r') as stream: + try: + config_dict = yaml.load(stream, Loader=yaml.FullLoader) + except yaml.YAMLError as exc: + print(exc) + # Get dictonary of terms to describe header variables from nasa ames file + description_dict = config_dict.get('table_header',{}) + + # Read all lines once + with open(filename, 'r') as file: + lines = file.readlines() + + # Extract header length from the first line + header_length = int(lines[0].split()[0]) + file_header = lines[:header_length] + + # Extract start date from line 7 + date_header = lines[6].split() + start_date_str = f"{date_header[0]}-{date_header[1]}-{date_header[2]}" + start_date = datetime.strptime(start_date_str, "%Y-%m-%d") + + # Extract number of dependent variables from line 10 + num_dep_vars = int(lines[9].split()[0]) + + # Get variable names: start_time + vars from lines 13 to 13+num_dep_vars-1 (zero-indexed: 12 to 12+num_dep_vars) + vars_list = ["start_time"] + [lines[i].strip() for i in range(12, 12 + num_dep_vars)] + + # Get the last line of the header (data column names) + dat_head_line = lines[header_length - 1] + dat_head = [x for x in dat_head_line.split() if x] + + try: + # Read the data using pandas, skipping the header + df = pd.read_csv(filename, + sep="\s+", + header=header_length - 1, + skip_blank_lines=True) + # Compute actual datetime from start_time and (if present) end_time + df['start_time'] = df['start_time'].apply(lambda x: start_date + timedelta(days=x)) + if 'end_time' in df.columns: + df['end_time'] = df['end_time'].apply(lambda x: start_date + timedelta(days=x)) + + # Create header metadata dictionary + header_metadata_dict = { + 'header_length': header_length, + 'start_date': start_date_str, + 'num_dep_vars': num_dep_vars, + 'variable_names': vars_list, + 'raw_header': file_header + } + + + file_dict = {} + path_tail, path_head = os.path.split(filename) + + file_dict['name'] = path_head + # TODO: review this header dictionary, it may not be the best way to represent header data + file_dict['attributes_dict'] = header_metadata_dict + file_dict['datasets'] = [] + #### + + import utils.g5505_utils as utils + #if numerical_variables: + dataset = {} + dataset['name'] = 'data_table'#_numerical_variables' + dataset['data'] = utils.convert_dataframe_to_np_structured_array(df) #df_numerical_attrs.to_numpy() + dataset['shape'] = dataset['data'].shape + dataset['dtype'] = type(dataset['data']) + + # Create attribute descriptions based on description_dict + dataset['attributes'] = {} + + # Annotate column headers if description_dict is non empty + if description_dict: + for column_name in df.columns: + column_attr_dict = description_dict.get(column_name, + {'note':'there was no description available. Review instrument files.'}) + dataset['attributes'].update({column_name: utils.convert_attrdict_to_np_structured_array(column_attr_dict)}) + + file_dict['datasets'].append(dataset) + + except: + return {} + + return file_dict + #return header_metadata, df + + +if __name__ == "__main__": + + from src.hdf5_ops import save_file_dict_to_hdf5 + from utils.g5505_utils import created_at + + # Set up argument parsing + parser = argparse.ArgumentParser(description="Data ingestion process to HDF5 files.") + parser.add_argument('dst_file_path', type=str, help="Path to the target HDF5 file.") + parser.add_argument('src_file_path', type=str, help="Relative path to source file to be saved to target HDF5 file.") + parser.add_argument('dst_group_name', type=str, help="Group name '/instFolder/[category]/fileName' in the target HDF5 file.") + + args = parser.parse_args() + + hdf5_file_path = args.dst_file_path + src_file_path = args.src_file_path + dst_group_name = args.dst_group_name + default_mode = 'r+' + + try: + # Read source file and return an internal dictionary representation + idr_dict = read_nasa_ames_as_dict(src_file_path) + + if not os.path.exists(hdf5_file_path): + default_mode = 'w' + + print(f'Opening HDF5 file: {hdf5_file_path} in mode {default_mode}') + + with h5py.File(hdf5_file_path, mode=default_mode, track_order=True) as hdf5_file_obj: + try: + # Create group if it does not exist + if dst_group_name not in hdf5_file_obj: + hdf5_file_obj.create_group(dst_group_name) + hdf5_file_obj[dst_group_name].attrs['creation_date'] = created_at().encode('utf-8') + print(f'Created new group: {dst_group_name}') + else: + print(f'Group {dst_group_name} already exists. Proceeding with data transfer...') + + except Exception as inst: + logging.error('Failed to create group %s in HDF5: %s', dst_group_name, inst) + + # Save dictionary to HDF5 + save_file_dict_to_hdf5(hdf5_file_obj, dst_group_name, idr_dict) + print(f'Completed saving file dict with keys: {idr_dict.keys()}') + + except Exception as e: + logging.error('File reader failed to process %s: %s', src_file_path, e) + print(f'File reader failed to process {src_file_path}. See logs for details.')