import sys import os try: thisFilePath = os.path.abspath(__file__) except NameError: print("Error: __file__ is not available. Ensure the script is being run from a file.") print("[Notice] Path to DIMA package may not be resolved properly.") thisFilePath = os.getcwd() # Use current directory or specify a default dimaPath = os.path.normpath(os.path.join(thisFilePath, "..",'..','..')) # Move up to project root if dimaPath not in sys.path: # Avoid duplicate entries sys.path.insert(0,dimaPath) import pandas as pd import collections import yaml import h5py import argparse import logging import warnings import utils.g5505_utils as utils def read_txt_files_as_dict(filename: str, instruments_dir: str = None, work_with_copy: bool = True): filename = os.path.normpath(filename) # If instruments_dir is not provided, use the default path relative to the module directory if not instruments_dir: # Assuming the instruments folder is one level up from the source module directory module_dir = os.path.dirname(__file__) instruments_dir = os.path.join(module_dir, '..') #(config_dict, #file_encoding, #separator, #table_header, #timestamp_variables, #datetime_format, #description_dict) = load_file_reader_parameters(filename, instruments_dir) format_variants, description_dict = load_file_reader_parameters(filename, instruments_dir) # Read header as a dictionary and detect where data table starts header_dict = {'actris_level': 0, 'processing_date':utils.created_at(), 'processing_script' : os.path.relpath(thisFilePath,dimaPath)} data_start = False # Work with copy of the file for safety if work_with_copy: tmp_filename = utils.make_file_copy(source_file_path=filename) else: tmp_filename = filename # Run header detection header_line_number, column_names, fmt_dict, table_preamble = detect_table_header_line(tmp_filename, format_variants) # Unpack validated format info table_header = fmt_dict['table_header'] separator = fmt_dict['separator'] file_encoding = fmt_dict['file_encoding'] timestamp_variables = fmt_dict.get('timestamp', []) datetime_format = fmt_dict.get('datetime_format', None) desired_datetime_fmt = fmt_dict.get('desired_datetime_format', None) # Ensure separator is valid if not isinstance(separator, str) or not separator.strip(): raise ValueError(f"Invalid separator found in format: {repr(separator)}") # Load DataFrame try: if 'infer' not in table_header: df = pd.read_csv(tmp_filename, delimiter=separator, header=header_line_number, encoding=file_encoding, names=column_names, skip_blank_lines=True) else: df = pd.read_csv(tmp_filename, delimiter=separator, header=header_line_number, encoding=file_encoding, skip_blank_lines=True) df_numerical_attrs = df.select_dtypes(include ='number') df_categorical_attrs = df.select_dtypes(exclude='number') numerical_variables = [item for item in df_numerical_attrs.columns] # Consolidate into single timestamp column the separate columns 'date' 'time' specified in text_data_source.yaml if timestamp_variables: if not all(col in df_categorical_attrs.columns for col in timestamp_variables): raise ValueError(f"Invalid timestamp columns: {[col for col in timestamp_variables if col not in df_categorical_attrs.columns]}.") #df_categorical_attrs['timestamps'] = [' '.join(df_categorical_attrs.loc[i,timestamp_variables].to_numpy()) for i in df.index] #df_categorical_attrs['timestamps'] = [ df_categorical_attrs.loc[i,'0_Date']+' '+df_categorical_attrs.loc[i,'1_Time'] for i in df.index] #df_categorical_attrs['timestamps'] = df_categorical_attrs[timestamp_variables].astype(str).agg(' '.join, axis=1) timestamps_name = ' '.join(timestamp_variables) df_categorical_attrs[ timestamps_name] = df_categorical_attrs[timestamp_variables].astype(str).agg(' '.join, axis=1) valid_indices = [] if datetime_format: df_categorical_attrs[ timestamps_name] = pd.to_datetime(df_categorical_attrs[ timestamps_name],format=datetime_format,errors='coerce') valid_indices = df_categorical_attrs.dropna(subset=[timestamps_name]).index df_categorical_attrs = df_categorical_attrs.loc[valid_indices,:] df_numerical_attrs = df_numerical_attrs.loc[valid_indices,:] df_categorical_attrs[timestamps_name] = df_categorical_attrs[timestamps_name].dt.strftime(desired_datetime_fmt) startdate = df_categorical_attrs[timestamps_name].min() enddate = df_categorical_attrs[timestamps_name].max() df_categorical_attrs[timestamps_name] = df_categorical_attrs[timestamps_name].astype(str) #header_dict.update({'stastrrtdate':startdate,'enddate':enddate}) header_dict['startdate']= str(startdate) header_dict['enddate']=str(enddate) if len(timestamp_variables) > 1: df_categorical_attrs = df_categorical_attrs.drop(columns = timestamp_variables) #### #elif 'RGA' in filename: # df_categorical_attrs = df_categorical_attrs.rename(columns={'0_Time(s)' : 'timestamps'}) ### file_dict = {} path_tail, path_head = os.path.split(tmp_filename) file_dict['name'] = path_head # TODO: review this header dictionary, it may not be the best way to represent header data file_dict['attributes_dict'] = header_dict file_dict['datasets'] = [] #### df = pd.concat((df_categorical_attrs,df_numerical_attrs),axis=1) #if numerical_variables: dataset = {} dataset['name'] = 'data_table'#_numerical_variables' dataset['data'] = utils.convert_dataframe_to_np_structured_array(df) #df_numerical_attrs.to_numpy() dataset['shape'] = dataset['data'].shape dataset['dtype'] = type(dataset['data']) #dataset['data_units'] = file_obj['wave']['data_units'] # # Create attribute descriptions based on description_dict dataset['attributes'] = {} # Annotate column headers if description_dict is non empty if description_dict: for column_name in df.columns: column_attr_dict = description_dict['table_header'].get(column_name, {'note':'there was no description available. Review instrument files.'}) dataset['attributes'].update({column_name: utils.convert_attrdict_to_np_structured_array(column_attr_dict)}) #try: # dataset['attributes'] = description_dict['table_header'].copy() # for key in description_dict['table_header'].keys(): # if not key in numerical_variables: # dataset['attributes'].pop(key) # delete key # else: # dataset['attributes'][key] = utils.parse_attribute(dataset['attributes'][key]) # if timestamps_name in categorical_variables: # dataset['attributes'][timestamps_name] = utils.parse_attribute({'unit':'YYYY-MM-DD HH:MM:SS.ffffff'}) #except ValueError as err: # print(err) # Represent string values as fixed length strings in the HDF5 file, which need # to be decoded as string when we read them. It provides better control than variable strings, # at the expense of flexibility. # https://docs.h5py.org/en/stable/strings.html if table_preamble: #header_dict["table_preamble"] = utils.convert_string_to_bytes(table_preamble) tp_dataset = {} tp_dataset['name'] = "table_preamble" tp_dataset['data'] = utils.convert_string_to_bytes(table_preamble) tp_dataset['shape'] = tp_dataset['data'].shape tp_dataset['dtype'] = type(tp_dataset['data']) tp_dataset['attributes'] = {} file_dict['datasets'].append(tp_dataset) file_dict['datasets'].append(dataset) #if categorical_variables: # dataset = {} # dataset['name'] = 'table_categorical_variables' # dataset['data'] = dataframe_to_np_structured_array(df_categorical_attrs) #df_categorical_attrs.loc[:,categorical_variables].to_numpy() # dataset['shape'] = dataset['data'].shape # dataset['dtype'] = type(dataset['data']) # if timestamps_name in categorical_variables: # dataset['attributes'] = {timestamps_name: utils.parse_attribute({'unit':'YYYY-MM-DD HH:MM:SS.ffffff'})} # file_dict['datasets'].append(dataset) #except Exception as e: except Exception as e: #raise RuntimeError(f"Failed to read file with detected format: {e}") print(e) return {} return file_dict ## Supporting functions def detect_table_header_line(filepath, format_variants, verbose=False): """ Tries multiple format variants to detect the table header line in the file. Args: filepath (str): Path to file. format_variants (List[Dict]): Each must contain: - 'file_encoding' (str) - 'separator' (str) - 'table_header' (str or list of str) verbose (bool): If True, prints debug info. Returns: Tuple: - header_line_idx (int) - column_names (List[str]) - matched_format (Dict[str, Any]) # full format dict (validated) - preamble_lines (List[str]) """ import collections import warnings for idx, fmt in enumerate(format_variants): # Validate format dict if 'file_encoding' not in fmt or not isinstance(fmt['file_encoding'], str): raise ValueError(f"[Format {idx}] 'file_encoding' must be a string.") if 'separator' not in fmt or not isinstance(fmt['separator'], str): raise ValueError(f"[Format {idx}] 'separator' must be a string.") if 'table_header' not in fmt or not isinstance(fmt['table_header'], (str, list)): raise ValueError(f"[Format {idx}] 'table_header' must be a string or list of strings.") encoding = fmt['file_encoding'] separator = fmt['separator'] header_patterns = fmt['table_header'] if isinstance(header_patterns, str): header_patterns = [header_patterns] preamble_lines = [] try: with open(filepath, 'rb') as f: for line_number, line in enumerate(f): try: decoded_line = line.decode(encoding) except UnicodeDecodeError: break # Try next format for pattern in header_patterns: if pattern in decoded_line: substrings = decoded_line.split(separator.replace('\\t', '\t')) counts = collections.Counter(substrings) column_names = [ f"{i}_{name.strip()}" if counts[name] > 1 else name.strip() for i, name in enumerate(substrings) ] if verbose: print(f"[Detected header] Line {line_number}: {column_names}") return line_number, column_names, fmt, preamble_lines preamble_lines.append(' '.join(decoded_line.split())) except Exception as e: if verbose: print(f"[Format {idx}] Attempt failed: {e}") continue warnings.warn("Table header was not detected using known patterns. Will attempt inference mode.") # Return fallback format with 'infer' but retain encoding/separator from first variant fallback_fmt = { 'file_encoding': 'utf-8', 'separator': ',', 'table_header': ['infer'] } return -1, [], fallback_fmt, [] def load_file_reader_parameters(filename: str, instruments_dir: str) -> tuple: """ Load file reader configuration parameters based on the file and instrument directory. Returns: - format_variants: List of dicts with keys: 'file_encoding', 'separator', 'table_header', 'timestamp', 'datetime_format', 'desired_datetime_format' - description_dict: Dict loaded from instrument's description YAML """ config_path = os.path.abspath(os.path.join(instruments_dir, 'readers', 'config_text_reader.yaml')) if not os.path.exists(config_path): config_path = os.path.join(dimaPath,'instruments','readers', 'config_text_reader.yaml') try: with open(config_path, 'r') as stream: config_dict = yaml.load(stream, Loader=yaml.FullLoader) except yaml.YAMLError as exc: print(f"[YAML Load Error] {exc}") return {}, [], {} default_config = config_dict.get('default', {}) default_format = { 'file_encoding': default_config.get('file_encoding', 'utf-8'), 'separator': default_config.get('separator', ',').replace('\\t','\t'), 'table_header': default_config.get('table_header', 'infer'), 'timestamp': [], 'datetime_format': default_config.get('datetime_format', '%Y-%m-%d %H:%M:%S.%f'), 'desired_datetime_format' : default_config.get('desired_format', '%Y-%m-%d %H:%M:%S.%f') } format_variants = [] description_dict = {} # Match instrument key by folder name in file path filename = os.path.normpath(filename) for instFolder in config_dict.keys(): if instFolder in filename.split(os.sep): inst_config = config_dict[instFolder] # New style: has 'formats' block if 'formats' in inst_config: for fmt in inst_config['formats']: format_variants.append({ 'file_encoding': fmt.get('file_encoding', default_format['file_encoding']), 'separator': fmt.get('separator', default_format['separator']), 'table_header': fmt.get('table_header', default_format['table_header']), 'timestamp': fmt.get('timestamp', []), 'datetime_format': fmt.get('datetime_format', default_format['desired_datetime_format']), 'desired_datetime_format' :default_format['desired_datetime_format'] }) else: # Old style: flat format format_variants.append({ 'file_encoding': inst_config.get('file_encoding', default_format['file_encoding']), 'separator': inst_config.get('separator', default_format['separator']), 'table_header': inst_config.get('table_header', default_format['table_header']), 'timestamp': inst_config.get('timestamp', []), 'datetime_format': inst_config.get('datetime_format', default_format['desired_datetime_format']), 'desired_datetime_format' : default_format['desired_datetime_format'] }) # Description loading link_to_description = inst_config.get('link_to_description', '').replace('/', os.sep) if link_to_description: desc_path = os.path.join(instruments_dir, link_to_description) try: with open(desc_path, 'r') as desc_stream: description_dict = yaml.load(desc_stream, Loader=yaml.FullLoader) except (FileNotFoundError, yaml.YAMLError) as exc: print(f"[Description Load Error] {exc}") break # Stop after first match # Always return config_dict + list of formats + description return format_variants, description_dict if __name__ == "__main__": from src.hdf5_ops import save_file_dict_to_hdf5 from utils.g5505_utils import created_at # Set up argument parsing parser = argparse.ArgumentParser(description="Data ingestion process to HDF5 files.") parser.add_argument('dst_file_path', type=str, help="Path to the target HDF5 file.") parser.add_argument('src_file_path', type=str, help="Relative path to source file to be saved to target HDF5 file.") parser.add_argument('dst_group_name', type=str, help="Group name '/instFolder/[category]/fileName' in the target HDF5 file.") args = parser.parse_args() hdf5_file_path = args.dst_file_path src_file_path = args.src_file_path dst_group_name = args.dst_group_name default_mode = 'r+' try: # Read source file and return an internal dictionary representation idr_dict = read_txt_files_as_dict(src_file_path) if not os.path.exists(hdf5_file_path): default_mode = 'w' print(f'Opening HDF5 file: {hdf5_file_path} in mode {default_mode}') with h5py.File(hdf5_file_path, mode=default_mode, track_order=True) as hdf5_file_obj: try: # Create group if it does not exist if dst_group_name not in hdf5_file_obj: hdf5_file_obj.create_group(dst_group_name) hdf5_file_obj[dst_group_name].attrs['creation_date'] = created_at().encode('utf-8') print(f'Created new group: {dst_group_name}') else: print(f'Group {dst_group_name} already exists. Proceeding with data transfer...') except Exception as inst: logging.error('Failed to create group %s in HDF5: %s', dst_group_name, inst) # Save dictionary to HDF5 save_file_dict_to_hdf5(hdf5_file_obj, dst_group_name, idr_dict) print(f'Completed saving file dict with keys: {idr_dict.keys()}') except Exception as e: logging.error('File reader failed to process %s: %s', src_file_path, e) print(f'File reader failed to process {src_file_path}. See logs for details.')