From b96c04fc012ede2f3bffe0777274d84ab4485cc1 Mon Sep 17 00:00:00 2001 From: Florez Ospina Juan Felipe Date: Thu, 19 Jun 2025 20:40:14 +0200 Subject: [PATCH] Refactor instruments/readers/g5505_text_reader.py, some code abstracted as functions to improve readability. --- instruments/readers/g5505_text_reader.py | 201 +++++++++++------------ 1 file changed, 99 insertions(+), 102 deletions(-) diff --git a/instruments/readers/g5505_text_reader.py b/instruments/readers/g5505_text_reader.py index 262959d..57735f9 100644 --- a/instruments/readers/g5505_text_reader.py +++ b/instruments/readers/g5505_text_reader.py @@ -19,19 +19,94 @@ import yaml import h5py import argparse import logging -# Import project modules -#root_dir = os.path.abspath(os.curdir) -#sys.path.append(root_dir) - - -#try: -# from dima.utils import g5505_utils as utils -#except ModuleNotFoundError: -# import utils.g5505_utils as utils -# import src.hdf5_ops as hdf5_ops +import warnings import utils.g5505_utils as utils +def detect_table_header_line(filepath, table_header_list, encoding_list, separator_list, verbose=False): + """ + Detects the table header line in the file and returns: + - header_line_idx (int) + - column_names (List[str]) + - tb_idx used + - preamble_lines (List[str]) + Returns (-1, [], None, []) if not found. 
+ """ + preamble_lines = [] + header_line_idx = -1 + column_names = [] + tb_idx = None + + with open(filepath, 'rb') as f: + for line_number, line in enumerate(f): + decoded_line = line.decode(encoding_list[0]) # assume consistent encoding initially + for idx, tb in enumerate(table_header_list): + if tb in decoded_line: + tb_idx = idx + list_of_substrings = decoded_line.split(separator_list[idx].replace('\\t', '\t')) + counts = collections.Counter(list_of_substrings) + column_names = [f"{i}_{name.strip()}" if counts[name] > 1 else name.strip() + for i, name in enumerate(list_of_substrings)] + header_line_idx = line_number + if verbose: + print(f"[Detected header] Line {line_number}: {column_names}") + return header_line_idx, column_names, tb_idx, preamble_lines + preamble_lines.append(' '.join(decoded_line.split())) + + warnings.warn("Table header was not detected using known patterns. Will attempt inference mode.") + return -1, [], None, preamble_lines + +def load_file_reader_parameters(filename: str, instruments_dir: str) -> tuple: + """ + Load file reader configuration parameters based on the file and instrument directory. 
+ + Returns: + - config_dict: Full configuration dictionary + - file_encoding + - separator + - table_header + - timestamp_variables + - datetime_format + - description_dict + """ + config_path = os.path.abspath(os.path.join(instruments_dir, 'readers', 'config_text_reader.yaml')) + + try: + with open(config_path, 'r') as stream: + config_dict = yaml.load(stream, Loader=yaml.FullLoader) + except yaml.YAMLError as exc: + print(f"[YAML Load Error] {exc}") + return {}, '', '', '', [], [], {} + + # Defaults + file_encoding = config_dict.get('default', {}).get('file_encoding', 'utf-8') + separator = config_dict.get('default', {}).get('separator', ',') + table_header = config_dict.get('default', {}).get('table_header', 'infer') + timestamp_variables = [] + datetime_format = [] + description_dict = {} + + for instFolder in config_dict.keys(): + if instFolder in filename.split(os.sep): + file_encoding = config_dict[instFolder].get('file_encoding', file_encoding) + separator = config_dict[instFolder].get('separator', separator) + table_header = config_dict[instFolder].get('table_header', table_header) + timestamp_variables = config_dict[instFolder].get('timestamp', []) + datetime_format = config_dict[instFolder].get('datetime_format', []) + + link_to_description = config_dict[instFolder].get('link_to_description', '').replace('/', os.sep) + if link_to_description: + path = os.path.join(instruments_dir, link_to_description) + try: + with open(path, 'r') as stream: + description_dict = yaml.load(stream, Loader=yaml.FullLoader) + except (FileNotFoundError, yaml.YAMLError) as exc: + print(f"[Description Load Error] {exc}") + + return (config_dict, file_encoding, separator, table_header, + timestamp_variables, datetime_format, description_dict) + + def read_txt_files_as_dict(filename: str, instruments_dir: str = None, work_with_copy: bool = True): filename = os.path.normpath(filename) @@ -41,56 +116,16 @@ def read_txt_files_as_dict(filename: str, instruments_dir: str = None, 
work_with module_dir = os.path.dirname(__file__) instruments_dir = os.path.join(module_dir, '..') - # Normalize the path (resolves any '..' in the path) - instrument_configs_path = os.path.abspath(os.path.join(instruments_dir,'readers','config_text_reader.yaml')) - - print(instrument_configs_path) - - with open(instrument_configs_path,'r') as stream: - try: - config_dict = yaml.load(stream, Loader=yaml.FullLoader) - except yaml.YAMLError as exc: - print(exc) - # Verify if file can be read by available intrument configurations. - #if not any(key in filename.replace(os.sep,'/') for key in config_dict.keys()): - # return {} - - - #TODO: this may be prone to error if assumed folder structure is non compliant - file_encoding = config_dict['default']['file_encoding'] #'utf-8' - separator = config_dict['default']['separator'] - table_header = config_dict['default']['table_header'] - timestamp_variables = [] - datetime_format = [] - tb_idx = 0 - column_names = '' - description_dict = {} - - for instFolder in config_dict.keys(): - - if instFolder in filename.split(os.sep): - - file_encoding = config_dict[instFolder].get('file_encoding',file_encoding) - separator = config_dict[instFolder].get('separator',separator) - table_header = config_dict[instFolder].get('table_header',table_header) - timestamp_variables = config_dict[instFolder].get('timestamp',[]) - datetime_format = config_dict[instFolder].get('datetime_format',[]) - - - link_to_description = config_dict[instFolder].get('link_to_description', '').replace('/', os.sep) - - if link_to_description: - path = os.path.join(instruments_dir, link_to_description) - try: - with open(path, 'r') as stream: - description_dict = yaml.load(stream, Loader=yaml.FullLoader) - except (FileNotFoundError, yaml.YAMLError) as exc: - print(exc) - #if 'None' in table_header: - # return {} + (config_dict, + file_encoding, + separator, + table_header, + timestamp_variables, + datetime_format, + description_dict) = 
load_file_reader_parameters(filename, instruments_dir) # Read header as a dictionary and detect where data table starts - header_dict = {} + header_dict = {'actris_level': 0, 'processing_date':utils.created_at(), 'processing_script' : os.path.relpath(thisFilePath,dimaPath)} data_start = False # Work with copy of the file for safety if work_with_copy: @@ -109,58 +144,20 @@ def read_txt_files_as_dict(filename: str, instruments_dir: str = None, work_with table_preamble = [] line_number = 0 if 'infer' not in table_header: + header_line_idx, column_names, tb_idx, table_preamble = detect_table_header_line( + tmp_filename, table_header, file_encoding, separator) - with open(tmp_filename,'rb') as f: - - for line_number, line in enumerate(f): - decoded_line = line.decode(file_encoding[tb_idx]) - - - for tb_idx, tb in enumerate(table_header): - print(tb) - if tb in decoded_line: - break - - if tb in decoded_line: - - list_of_substrings = decoded_line.split(separator[tb_idx].replace('\\t','\t')) - - # Count occurrences of each substring - substring_counts = collections.Counter(list_of_substrings) - data_start = True - # Generate column names with appended index only for repeated substrings - column_names = [f"{i}_{name.strip()}" if substring_counts[name] > 1 else name.strip() for i, name in enumerate(list_of_substrings)] - - #column_names = [str(i)+'_'+name.strip() for i, name in enumerate(list_of_substrings)] - #column_names = [] - #for i, name in enumerate(list_of_substrings): - # column_names.append(str(i)+'_'+name) - - #print(line_number, len(column_names ),'\n') - break - else: - print('Table header was not detected.') - # Subdivide line into words, and join them by single space. - # I asumme this can produce a cleaner line that contains no weird separator characters \t \r or extra spaces and so on. 
- list_of_substrings = decoded_line.split() - # TODO: ideally we should use a multilinear string but the yalm parser is not recognizing \n as special character - #line = ' '.join(list_of_substrings+['\n']) - #line = ' '.join(list_of_substrings) - table_preamble.append(' '.join([item for item in list_of_substrings]))# += new_line + if header_line_idx == -1: + table_header = ['infer'] # fallback to pandas' inference # TODO: it does not work with separator as none :(. fix for RGA try: - print(column_names) - if not 'infer' in table_header: - #print(table_header) - #print(file_encoding[tb_idx]) - + if not 'infer' in table_header: df = pd.read_csv(tmp_filename, delimiter = separator[tb_idx].replace('\\t','\t'), - header=line_number, - #encoding='latin-1', + header=header_line_idx, encoding = file_encoding[tb_idx], names=column_names, skip_blank_lines=True)