# dima/instruments/readers/g5505_text_reader.py
import sys
import os

try:
    thisFilePath = os.path.abspath(__file__)
except NameError:
    print("Error: __file__ is not available. Ensure the script is being run from a file.")
    print("[Notice] Path to DIMA package may not be resolved properly.")
    thisFilePath = os.getcwd()  # Use current directory or specify a default

dimaPath = os.path.normpath(os.path.join(thisFilePath, "..", "..", ".."))  # Move up to project root
if dimaPath not in sys.path:  # Avoid duplicate entries
    sys.path.insert(0, dimaPath)

import pandas as pd
import collections
import yaml
import h5py
import argparse
import logging
import warnings

import utils.g5505_utils as utils
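
# Repository layout assumed by the path bootstrapping and imports above
# (inferred from the references in this module, not verified against the repo):
#
#   <dimaPath>/
#     instruments/
#       readers/
#         g5505_text_reader.py      <- this module
#         config_text_reader.yaml   <- reader configuration
#     utils/g5505_utils.py
#     src/hdf5_ops.py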
def read_txt_files_as_dict(filename: str, instruments_dir: str = None, work_with_copy: bool = True):

    filename = os.path.normpath(filename)

    # If instruments_dir is not provided, use the default path relative to the module directory
    if not instruments_dir:
        # Assuming the instruments folder is one level up from the source module directory
        module_dir = os.path.dirname(__file__)
        instruments_dir = os.path.join(module_dir, '..')
    format_variants, description_dict = load_file_reader_parameters(filename, instruments_dir)

    # Read the header as a dictionary and detect where the data table starts
    header_dict = {'actris_level': 0,
                   'processing_date': utils.created_at(),
                   'processing_script': os.path.relpath(thisFilePath, dimaPath)}
    # Work with a copy of the file for safety
    if work_with_copy:
        tmp_filename = utils.make_file_copy(source_file_path=filename)
    else:
        tmp_filename = filename

    # Run header detection
    header_line_number, column_names, fmt_dict, table_preamble = detect_table_header_line(tmp_filename, format_variants)

    # Unpack validated format info
    table_header = fmt_dict['table_header']
    separator = fmt_dict['separator']
    file_encoding = fmt_dict['file_encoding']
    timestamp_variables = fmt_dict.get('timestamp', [])
    datetime_format = fmt_dict.get('datetime_format', None)
    desired_datetime_fmt = fmt_dict['desired_datetime_format']

    # Ensure the separator is valid
    if not isinstance(separator, str) or not separator.strip():
        raise ValueError(f"Invalid separator found in format: {repr(separator)}")
    # Load DataFrame
    try:
        if 'infer' not in table_header:
            df = pd.read_csv(tmp_filename,
                             delimiter=separator,
                             header=header_line_number,
                             encoding=file_encoding,
                             names=column_names,
                             skip_blank_lines=True)
        else:
            # Header detection fell back to inference mode; let pandas infer the
            # header instead of passing the sentinel line number -1, which is not
            # a valid 'header' argument.
            df = pd.read_csv(tmp_filename,
                             delimiter=separator,
                             header='infer',
                             encoding=file_encoding,
                             skip_blank_lines=True)
        df_numerical_attrs = df.select_dtypes(include='number')
        df_categorical_attrs = df.select_dtypes(exclude='number')
        numerical_variables = list(df_numerical_attrs.columns)

        # Consolidate the separate date/time columns specified in the instrument
        # configuration (e.g. text_data_source.yaml) into a single timestamp column
        if timestamp_variables:
            if not all(col in df_categorical_attrs.columns for col in timestamp_variables):
                raise ValueError(f"Invalid timestamp columns: {[col for col in timestamp_variables if col not in df_categorical_attrs.columns]}.")
            timestamps_name = ' '.join(timestamp_variables)
            df_categorical_attrs[timestamps_name] = df_categorical_attrs[timestamp_variables].astype(str).agg(' '.join, axis=1)

            if datetime_format:
                df_categorical_attrs[timestamps_name] = pd.to_datetime(df_categorical_attrs[timestamps_name], format=datetime_format, errors='coerce')
                # Keep only rows whose timestamps parsed successfully
                valid_indices = df_categorical_attrs.dropna(subset=[timestamps_name]).index
                df_categorical_attrs = df_categorical_attrs.loc[valid_indices, :]
                df_numerical_attrs = df_numerical_attrs.loc[valid_indices, :]

                df_categorical_attrs[timestamps_name] = df_categorical_attrs[timestamps_name].dt.strftime(desired_datetime_fmt)
                startdate = df_categorical_attrs[timestamps_name].min()
                enddate = df_categorical_attrs[timestamps_name].max()

                df_categorical_attrs[timestamps_name] = df_categorical_attrs[timestamps_name].astype(str)
                header_dict['startdate'] = str(startdate)
                header_dict['enddate'] = str(enddate)

            if len(timestamp_variables) > 1:
                df_categorical_attrs = df_categorical_attrs.drop(columns=timestamp_variables)
        file_dict = {}
        path_tail, path_head = os.path.split(tmp_filename)

        file_dict['name'] = path_head
        # TODO: review this header dictionary, it may not be the best way to represent header data
        file_dict['attributes_dict'] = header_dict
        file_dict['datasets'] = []

        df = pd.concat((df_categorical_attrs, df_numerical_attrs), axis=1)
        dataset = {}
        dataset['name'] = 'data_table'
        dataset['data'] = utils.convert_dataframe_to_np_structured_array(df)
        dataset['shape'] = dataset['data'].shape
        dataset['dtype'] = type(dataset['data'])

        # Create attribute descriptions based on description_dict
        dataset['attributes'] = {}

        # Annotate column headers if description_dict is non-empty
        if description_dict:
            for column_name in df.columns:
                column_attr_dict = description_dict['table_header'].get(
                    column_name,
                    {'note': 'there was no description available. Review instrument files.'})
                dataset['attributes'].update({column_name: utils.convert_attrdict_to_np_structured_array(column_attr_dict)})
        # Represent string values as fixed-length strings in the HDF5 file, which need
        # to be decoded as strings when read back. This provides better control than
        # variable-length strings, at the expense of flexibility.
        # https://docs.h5py.org/en/stable/strings.html
        if table_preamble:
            tp_dataset = {}
            tp_dataset['name'] = "table_preamble"
            tp_dataset['data'] = utils.convert_string_to_bytes(table_preamble)
            tp_dataset['shape'] = tp_dataset['data'].shape
            tp_dataset['dtype'] = type(tp_dataset['data'])
            tp_dataset['attributes'] = {}
            file_dict['datasets'].append(tp_dataset)

        file_dict['datasets'].append(dataset)
    except Exception as e:
        # Fail soft: report the error and return an empty dict so callers can skip this file
        print(e)
        return {}

    return file_dict
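
# Usage sketch (hypothetical paths; assumes the instrument folder name appears in
# the file path so that load_file_reader_parameters can match a config entry):
#
#   file_dict = read_txt_files_as_dict('data/SOME_INSTRUMENT/measurements.txt')
#   if file_dict:
#       print(file_dict['name'])                    # source file name
#       print(file_dict['attributes_dict'].keys())  # e.g. actris_level, startdate, enddate
#       for ds in file_dict['datasets']:            # 'table_preamble' and/or 'data_table'
#           print(ds['name'], ds['shape'])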
## Supporting functions
def detect_table_header_line(filepath, format_variants, verbose=False):
    """
    Tries multiple format variants to detect the table header line in the file.

    Args:
        filepath (str): Path to the file.
        format_variants (List[Dict]): Each dict must contain:
            - 'file_encoding' (str)
            - 'separator' (str)
            - 'table_header' (str or list of str)
        verbose (bool): If True, prints debug info.

    Returns:
        Tuple:
            - header_line_idx (int): Line number of the detected header, or -1 on fallback.
            - column_names (List[str])
            - matched_format (Dict[str, Any]): Full format dict (validated).
            - preamble_lines (List[str])
    """
    for idx, fmt in enumerate(format_variants):
        # Validate the format dict
        if 'file_encoding' not in fmt or not isinstance(fmt['file_encoding'], str):
            raise ValueError(f"[Format {idx}] 'file_encoding' must be a string.")
        if 'separator' not in fmt or not isinstance(fmt['separator'], str):
            raise ValueError(f"[Format {idx}] 'separator' must be a string.")
        if 'table_header' not in fmt or not isinstance(fmt['table_header'], (str, list)):
            raise ValueError(f"[Format {idx}] 'table_header' must be a string or list of strings.")

        encoding = fmt['file_encoding']
        separator = fmt['separator']
        header_patterns = fmt['table_header']
        if isinstance(header_patterns, str):
            header_patterns = [header_patterns]

        preamble_lines = []
        try:
            with open(filepath, 'rb') as f:
                for line_number, line in enumerate(f):
                    try:
                        decoded_line = line.decode(encoding)
                    except UnicodeDecodeError:
                        break  # Try next format

                    for pattern in header_patterns:
                        if pattern in decoded_line:
                            substrings = decoded_line.split(separator.replace('\\t', '\t'))
                            counts = collections.Counter(substrings)
                            # Disambiguate duplicated column names by prefixing the column index
                            column_names = [
                                f"{i}_{name.strip()}" if counts[name] > 1 else name.strip()
                                for i, name in enumerate(substrings)
                            ]
                            if verbose:
                                print(f"[Detected header] Line {line_number}: {column_names}")
                            return line_number, column_names, fmt, preamble_lines

                    preamble_lines.append(' '.join(decoded_line.split()))
        except Exception as e:
            if verbose:
                print(f"[Format {idx}] Attempt failed: {e}")
            continue
    warnings.warn("Table header was not detected using known patterns. Will attempt inference mode.")

    # Fall back to inference mode. Include every key the caller unpacks
    # ('desired_datetime_format' in particular), so the fallback cannot raise a KeyError.
    fallback_fmt = {
        'file_encoding': 'utf-8',
        'separator': ',',
        'table_header': ['infer'],
        'timestamp': [],
        'datetime_format': None,
        'desired_datetime_format': '%Y-%m-%d %H:%M:%S.%f'
    }
    return -1, [], fallback_fmt, []
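
# Example call (hypothetical file and format variant; 'table_header' must be a
# substring of the real header line for detection to succeed):
#
#   fmt = {'file_encoding': 'utf-8', 'separator': ',', 'table_header': 'Date,Time'}
#   line_no, columns, matched_fmt, preamble = detect_table_header_line('sample.txt', [fmt], verbose=True)
#   # line_no == -1 signals fallback to pandas' header inference in the caller.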
def load_file_reader_parameters(filename: str, instruments_dir: str) -> tuple:
    """
    Load file reader configuration parameters based on the file and instrument directory.

    Returns:
        - format_variants: List of dicts with keys:
          'file_encoding', 'separator', 'table_header', 'timestamp', 'datetime_format', 'desired_datetime_format'
        - description_dict: Dict loaded from the instrument's description YAML
    """
    config_path = os.path.abspath(os.path.join(instruments_dir, 'readers', 'config_text_reader.yaml'))
    if not os.path.exists(config_path):
        config_path = os.path.join(dimaPath, 'instruments', 'readers', 'config_text_reader.yaml')

    try:
        with open(config_path, 'r') as stream:
            config_dict = yaml.load(stream, Loader=yaml.FullLoader)
    except yaml.YAMLError as exc:
        print(f"[YAML Load Error] {exc}")
        return [], {}
    default_config = config_dict.get('default', {})
    default_format = {
        'file_encoding': default_config.get('file_encoding', 'utf-8'),
        'separator': default_config.get('separator', ',').replace('\\t', '\t'),
        'table_header': default_config.get('table_header', 'infer'),
        'timestamp': [],
        'datetime_format': default_config.get('datetime_format', '%Y-%m-%d %H:%M:%S.%f'),
        'desired_datetime_format': default_config.get('desired_format', '%Y-%m-%d %H:%M:%S.%f')
    }
    format_variants = []
    description_dict = {}

    # Match the instrument key by folder name in the file path
    filename = os.path.normpath(filename)
    for instFolder in config_dict.keys():
        if instFolder in filename.split(os.sep):
            inst_config = config_dict[instFolder]

            # New style: has a 'formats' block
            if 'formats' in inst_config:
                for fmt in inst_config['formats']:
                    format_variants.append({
                        'file_encoding': fmt.get('file_encoding', default_format['file_encoding']),
                        'separator': fmt.get('separator', default_format['separator']),
                        'table_header': fmt.get('table_header', default_format['table_header']),
                        'timestamp': fmt.get('timestamp', []),
                        'datetime_format': fmt.get('datetime_format', default_format['datetime_format']),
                        'desired_datetime_format': default_format['desired_datetime_format']
                    })
            else:
                # Old style: flat format
                format_variants.append({
                    'file_encoding': inst_config.get('file_encoding', default_format['file_encoding']),
                    'separator': inst_config.get('separator', default_format['separator']),
                    'table_header': inst_config.get('table_header', default_format['table_header']),
                    'timestamp': inst_config.get('timestamp', []),
                    'datetime_format': inst_config.get('datetime_format', default_format['datetime_format']),
                    'desired_datetime_format': default_format['desired_datetime_format']
                })

            # Description loading
            link_to_description = inst_config.get('link_to_description', '').replace('/', os.sep)
            if link_to_description:
                desc_path = os.path.join(instruments_dir, link_to_description)
                try:
                    with open(desc_path, 'r') as desc_stream:
                        description_dict = yaml.load(desc_stream, Loader=yaml.FullLoader)
                except (FileNotFoundError, yaml.YAMLError) as exc:
                    print(f"[Description Load Error] {exc}")

            break  # Stop after the first match

    # Return the list of format variants plus the column description dictionary
    return format_variants, description_dict
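
# Sketch of the config_text_reader.yaml layout this loader understands. The keys
# mirror what the code above reads; the instrument name and values are
# illustrative, not taken from the real config:
#
#   default:
#     file_encoding: utf-8
#     separator: ','
#     table_header: infer
#     datetime_format: '%Y-%m-%d %H:%M:%S.%f'
#     desired_format: '%Y-%m-%d %H:%M:%S.%f'
#
#   SOME_INSTRUMENT:              # matched against folder names in the file path
#     formats:                    # new style: list of format variants
#       - file_encoding: latin-1
#         separator: '\t'
#         table_header: 'Date	Time'
#         timestamp: ['Date', 'Time']
#         datetime_format: '%d.%m.%Y %H:%M:%S'
#     link_to_description: dictionaries/SOME_INSTRUMENT.yaml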
if __name__ == "__main__":

    from src.hdf5_ops import save_file_dict_to_hdf5
    from utils.g5505_utils import created_at

    # Set up argument parsing
    parser = argparse.ArgumentParser(description="Data ingestion process to HDF5 files.")
    parser.add_argument('dst_file_path', type=str, help="Path to the target HDF5 file.")
    parser.add_argument('src_file_path', type=str, help="Relative path to source file to be saved to target HDF5 file.")
    parser.add_argument('dst_group_name', type=str, help="Group name '/instFolder/[category]/fileName' in the target HDF5 file.")

    args = parser.parse_args()

    hdf5_file_path = args.dst_file_path
    src_file_path = args.src_file_path
    dst_group_name = args.dst_group_name
    default_mode = 'r+'

    try:
        # Read source file and return an internal dictionary representation
        idr_dict = read_txt_files_as_dict(src_file_path)

        if not os.path.exists(hdf5_file_path):
            default_mode = 'w'

        print(f'Opening HDF5 file: {hdf5_file_path} in mode {default_mode}')

        with h5py.File(hdf5_file_path, mode=default_mode, track_order=True) as hdf5_file_obj:
            try:
                # Create group if it does not exist
                if dst_group_name not in hdf5_file_obj:
                    hdf5_file_obj.create_group(dst_group_name)
                    hdf5_file_obj[dst_group_name].attrs['creation_date'] = created_at().encode('utf-8')
                    print(f'Created new group: {dst_group_name}')
                else:
                    print(f'Group {dst_group_name} already exists. Proceeding with data transfer...')
            except Exception as inst:
                logging.error('Failed to create group %s in HDF5: %s', dst_group_name, inst)

            # Save dictionary to HDF5
            save_file_dict_to_hdf5(hdf5_file_obj, dst_group_name, idr_dict)
            print(f'Completed saving file dict with keys: {idr_dict.keys()}')

    except Exception as e:
        logging.error('File reader failed to process %s: %s', src_file_path, e)
        print(f'File reader failed to process {src_file_path}. See logs for details.')
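
# Example invocation (hypothetical paths; argument order is dst_file_path,
# src_file_path, dst_group_name as defined by the parser above):
#
#   python dima/instruments/readers/g5505_text_reader.py output.h5 \
#       data/SOME_INSTRUMENT/measurements.txt /SOME_INSTRUMENT/measurements.txt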