import sys
import os

try:
    thisFilePath = os.path.abspath(__file__)
except NameError:
    print("Error: __file__ is not available. Ensure the script is being run from a file.")
    print("[Notice] Path to DIMA package may not be resolved properly.")
    thisFilePath = os.getcwd()  # Use current directory or specify a default

dimaPath = os.path.normpath(os.path.join(thisFilePath, "..", "..", ".."))  # Move up to project root

if dimaPath not in sys.path:  # Avoid duplicate entries
    sys.path.insert(0, dimaPath)

import pandas as pd
import collections
import yaml
import h5py
import argparse
import logging

import utils.g5505_utils as utils


def read_acsm_files_as_dict(filename: str, instruments_dir: str = None, work_with_copy: bool = True):
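    """
    Read an ACSM TOFWARE text file into a dictionary representation suitable for HDF5 serialization.

    Separators, encodings, header detection and timestamp handling are driven by the instrument
    configuration file ``dictionaries/ACSM_TOFWARE.yaml`` located under ``instruments_dir``.

    Parameters
    ----------
    filename : str
        Path to the source text file.
    instruments_dir : str, optional
        Directory containing the ``dictionaries`` folder with instrument configurations.
        Defaults to the parent directory of this module.
    work_with_copy : bool, optional
        If True (default), operate on a temporary copy of the source file.

    Returns
    -------
    dict
        Dictionary with keys ``'name'``, ``'attributes_dict'`` and ``'datasets'``,
        or an empty dict if the file could not be parsed.
    """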
    # If instruments_dir is not provided, use the default path relative to the module directory
    if not instruments_dir:
        # Assuming the instruments folder is one level up from the source module directory
        module_dir = os.path.dirname(__file__)
        instruments_dir = os.path.join(module_dir, '..')

    # Normalize the path (resolves any '..' in the path)
    instrument_configs_path = os.path.abspath(os.path.join(instruments_dir, 'dictionaries', 'ACSM_TOFWARE.yaml'))

    with open(instrument_configs_path, 'r') as stream:
        try:
            config_dict = yaml.load(stream, Loader=yaml.FullLoader)
        except yaml.YAMLError as exc:
            print(exc)

    # Verify whether the file can be read by the available instrument configurations.
    #if not any(key in filename.replace(os.sep,'/') for key in config_dict.keys()):
    #    return {}

    # TODO: this may be prone to error if the assumed folder structure is not compliant

    description_dict = config_dict.get('table_header', {})

    file_encoding = config_dict['config_text_reader'].get('file_encoding', 'utf-8')
    separator = config_dict['config_text_reader'].get('separator', None)
    table_header = config_dict['config_text_reader'].get('table_header', None)
    timestamp_variables = config_dict['config_text_reader'].get('timestamp', [])
    datetime_format = config_dict['config_text_reader'].get('datetime_format', [])
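    # Illustrative sketch of the configuration keys this reader expects in ACSM_TOFWARE.yaml.
    # The concrete values below are hypothetical, not taken from the actual file:
    #
    #   config_text_reader:
    #     file_encoding: 'utf-8'
    #     separator: '\t'
    #     table_header: 'acsm_utc_time'
    #     timestamp: ['acsm_utc_time']
    #     datetime_format: '%d.%m.%Y %H:%M:%S'
    #   default:
    #     desired_format: '%Y-%m-%d %H:%M:%S.%f'
    #   table_header:
    #     some_column_name:
    #       description: '...'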

    # Read the header as a dictionary and detect where the data table starts
    header_dict = {}
    data_start = False
    # Work with a copy of the file for safety
    if work_with_copy:
        tmp_filename = utils.make_file_copy(source_file_path=filename)
    else:
        tmp_filename = filename

    if not isinstance(table_header, list):
        table_header = [table_header]
        file_encoding = [file_encoding]
        separator = [separator]

    with open(tmp_filename, 'rb') as f:
        table_preamble = []
        for line_number, line in enumerate(f):

            for tb_idx, tb in enumerate(table_header):
                if tb in line.decode(file_encoding[tb_idx]):
                    break

            if tb in line.decode(file_encoding[tb_idx]):
                list_of_substrings = line.decode(file_encoding[tb_idx]).split(separator[tb_idx].replace('\\t', '\t'))

                # Count occurrences of each substring
                substring_counts = collections.Counter(list_of_substrings)
                data_start = True
                # Generate column names with an appended index only for repeated substrings
                column_names = [f"{i}_{name.strip()}" if substring_counts[name] > 1 else name.strip() for i, name in enumerate(list_of_substrings)]

                #column_names = [str(i)+'_'+name.strip() for i, name in enumerate(list_of_substrings)]
                #column_names = []
                #for i, name in enumerate(list_of_substrings):
                #    column_names.append(str(i)+'_'+name)

                #print(line_number, len(column_names), '\n')
                break

            # Subdivide the line into words and join them with a single space.
            # I assume this produces a cleaner line without stray separator characters (\t, \r) or extra spaces.
            list_of_substrings = line.decode(file_encoding[tb_idx]).split()
            # TODO: ideally we should use a multiline string, but the YAML parser does not recognize \n as a special character
            #line = ' '.join(list_of_substrings+['\n'])
            #line = ' '.join(list_of_substrings)
            table_preamble.append(' '.join([item for item in list_of_substrings]))  # += new_line
    # TODO: this does not work when the separator is None. Fix for RGA.
    try:
        df = pd.read_csv(tmp_filename,
                         delimiter=separator[tb_idx].replace('\\t', '\t'),
                         header=line_number,
                         #encoding='latin-1',
                         encoding=file_encoding[tb_idx],
                         names=column_names,
                         skip_blank_lines=True)

        df_numerical_attrs = df.select_dtypes(include='number')
        df_categorical_attrs = df.select_dtypes(exclude='number')
        numerical_variables = [item for item in df_numerical_attrs.columns]

        # Consolidate the separate 'date' and 'time' columns specified in text_data_source.yaml into a single timestamp column
        if timestamp_variables:
            #df_categorical_attrs['timestamps'] = [' '.join(df_categorical_attrs.loc[i,timestamp_variables].to_numpy()) for i in df.index]
            #df_categorical_attrs['timestamps'] = [ df_categorical_attrs.loc[i,'0_Date']+' '+df_categorical_attrs.loc[i,'1_Time'] for i in df.index]

            #df_categorical_attrs['timestamps'] = df_categorical_attrs[timestamp_variables].astype(str).agg(' '.join, axis=1)
            timestamps_name = ' '.join(timestamp_variables)
            df_categorical_attrs[timestamps_name] = df_categorical_attrs[timestamp_variables].astype(str).agg(' '.join, axis=1)

            valid_indices = []
            if datetime_format:
                df_categorical_attrs[timestamps_name] = pd.to_datetime(df_categorical_attrs[timestamps_name], format=datetime_format, errors='coerce')
                valid_indices = df_categorical_attrs.dropna(subset=[timestamps_name]).index
                df_categorical_attrs = df_categorical_attrs.loc[valid_indices, :]
                df_numerical_attrs = df_numerical_attrs.loc[valid_indices, :]

                df_categorical_attrs[timestamps_name] = df_categorical_attrs[timestamps_name].dt.strftime(config_dict['default']['desired_format'])
                startdate = df_categorical_attrs[timestamps_name].min()
                enddate = df_categorical_attrs[timestamps_name].max()

                df_categorical_attrs[timestamps_name] = df_categorical_attrs[timestamps_name].astype(str)
                #header_dict.update({'startdate':startdate,'enddate':enddate})
                header_dict['startdate'] = str(startdate)
                header_dict['enddate'] = str(enddate)

            if len(timestamp_variables) > 1:
                df_categorical_attrs = df_categorical_attrs.drop(columns=timestamp_variables)

            #df_categorical_attrs.reindex(drop=True)
            #df_numerical_attrs.reindex(drop=True)

        categorical_variables = [item for item in df_categorical_attrs.columns]
        ####
        #elif 'RGA' in filename:
        #    df_categorical_attrs = df_categorical_attrs.rename(columns={'0_Time(s)' : 'timestamps'})
        ###

        file_dict = {}
        path_tail, path_head = os.path.split(tmp_filename)

        file_dict['name'] = path_head
        # TODO: review this header dictionary; it may not be the best way to represent header data
        file_dict['attributes_dict'] = header_dict
        file_dict['datasets'] = []
        ####

        df = pd.concat((df_categorical_attrs, df_numerical_attrs), axis=1)

        #if numerical_variables:
        dataset = {}
        dataset['name'] = 'data_table'  #_numerical_variables'
        dataset['data'] = utils.convert_dataframe_to_np_structured_array(df)  #df_numerical_attrs.to_numpy()
        dataset['shape'] = dataset['data'].shape
        dataset['dtype'] = type(dataset['data'])
        #dataset['data_units'] = file_obj['wave']['data_units']

        # Create attribute descriptions based on description_dict
        dataset['attributes'] = {}

        # Annotate column headers if description_dict is non-empty
        if description_dict:
            for column_name in df.columns:
                column_attr_dict = description_dict.get(column_name,
                                                        {'note': 'there was no description available. Review instrument files.'})
                dataset['attributes'].update({column_name: utils.convert_attrdict_to_np_structured_array(column_attr_dict)})

        #try:
        #    dataset['attributes'] = description_dict['table_header'].copy()
        #    for key in description_dict['table_header'].keys():
        #        if not key in numerical_variables:
        #            dataset['attributes'].pop(key)  # delete key
        #        else:
        #            dataset['attributes'][key] = utils.parse_attribute(dataset['attributes'][key])
        #    if timestamps_name in categorical_variables:
        #        dataset['attributes'][timestamps_name] = utils.parse_attribute({'unit':'YYYY-MM-DD HH:MM:SS.ffffff'})
        #except ValueError as err:
        #    print(err)

        # Represent string values as fixed-length strings in the HDF5 file, which need
        # to be decoded when read back. This provides better control than variable-length strings,
        # at the expense of flexibility.
        # https://docs.h5py.org/en/stable/strings.html

        if table_preamble:
            #header_dict["table_preamble"] = utils.convert_string_to_bytes(table_preamble)
            tp_dataset = {}
            tp_dataset['name'] = "table_preamble"
            tp_dataset['data'] = utils.convert_string_to_bytes(table_preamble)
            tp_dataset['shape'] = tp_dataset['data'].shape
            tp_dataset['dtype'] = type(tp_dataset['data'])
            tp_dataset['attributes'] = {}
            file_dict['datasets'].append(tp_dataset)

        file_dict['datasets'].append(dataset)

        #if categorical_variables:
        #    dataset = {}
        #    dataset['name'] = 'table_categorical_variables'
        #    dataset['data'] = dataframe_to_np_structured_array(df_categorical_attrs)  #df_categorical_attrs.loc[:,categorical_variables].to_numpy()
        #    dataset['shape'] = dataset['data'].shape
        #    dataset['dtype'] = type(dataset['data'])
        #    if timestamps_name in categorical_variables:
        #        dataset['attributes'] = {timestamps_name: utils.parse_attribute({'unit':'YYYY-MM-DD HH:MM:SS.ffffff'})}
        #    file_dict['datasets'].append(dataset)

    except Exception:
        return {}

    return file_dict
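
# Minimal usage sketch (illustrative only; the input path below is hypothetical):
#
#   file_dict = read_acsm_files_as_dict('path/to/ACSM_TOFWARE_export.txt')
#   for ds in file_dict.get('datasets', []):
#       print(ds['name'], ds['shape'])
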
if __name__ == "__main__":

    from src.hdf5_ops import save_file_dict_to_hdf5
    from utils.g5505_utils import created_at

    # Set up argument parsing
    parser = argparse.ArgumentParser(description="Data ingestion process to HDF5 files.")
    parser.add_argument('dst_file_path', type=str, help="Path to the target HDF5 file.")
    parser.add_argument('src_file_path', type=str, help="Relative path to source file to be saved to target HDF5 file.")
    parser.add_argument('dst_group_name', type=str, help="Group name '/instFolder/[category]/fileName' in the target HDF5 file.")

    args = parser.parse_args()
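    # Example invocation (illustrative; the script and file names below are hypothetical):
    #   python instruments/readers/acsm_tofware_reader.py output/collection.h5 data/acsm_export.txt ACSM_TOFWARE/L1/acsm_export.txt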

    hdf5_file_path = args.dst_file_path
    src_file_path = args.src_file_path
    dst_group_name = args.dst_group_name
    default_mode = 'r+'

    try:
        # Read source file and return an internal dictionary representation
        idr_dict = read_acsm_files_as_dict(src_file_path)

        if not os.path.exists(hdf5_file_path):
            default_mode = 'w'

        print(f'Opening HDF5 file: {hdf5_file_path} in mode {default_mode}')

        with h5py.File(hdf5_file_path, mode=default_mode, track_order=True) as hdf5_file_obj:
            try:
                # Create group if it does not exist
                if dst_group_name not in hdf5_file_obj:
                    hdf5_file_obj.create_group(dst_group_name)
                    hdf5_file_obj[dst_group_name].attrs['creation_date'] = created_at().encode('utf-8')
                    print(f'Created new group: {dst_group_name}')
                else:
                    print(f'Group {dst_group_name} already exists. Proceeding with data transfer...')

            except Exception as inst:
                logging.error('Failed to create group %s in HDF5: %s', dst_group_name, inst)

            # Save dictionary to HDF5
            save_file_dict_to_hdf5(hdf5_file_obj, dst_group_name, idr_dict)
            print(f'Completed saving file dict with keys: {idr_dict.keys()}')

    except Exception as e:
        logging.error('File reader failed to process %s: %s', src_file_path, e)
        print(f'File reader failed to process {src_file_path}. See logs for details.')