Refactor instruments/readers/g5505_text_reader.py, some code abstracted as functions to improve readabilitity.

This commit is contained in:
2025-06-19 20:40:14 +02:00
parent f555f7f199
commit b96c04fc01

View File

@ -19,19 +19,94 @@ import yaml
import h5py
import argparse
import logging
# Import project modules
#root_dir = os.path.abspath(os.curdir)
#sys.path.append(root_dir)
#try:
# from dima.utils import g5505_utils as utils
#except ModuleNotFoundError:
# import utils.g5505_utils as utils
# import src.hdf5_ops as hdf5_ops
import warnings
import utils.g5505_utils as utils
def detect_table_header_line(filepath, table_header_list, encoding_list, separator_list, verbose=False):
"""
Detects the table header line in the file and returns:
- header_line_idx (int)
- column_names (List[str])
- tb_idx used
- preamble_lines (List[str])
Returns (-1, [], None, []) if not found.
"""
preamble_lines = []
header_line_idx = -1
column_names = []
tb_idx = None
with open(filepath, 'rb') as f:
for line_number, line in enumerate(f):
decoded_line = line.decode(encoding_list[0]) # assume consistent encoding initially
for idx, tb in enumerate(table_header_list):
if tb in decoded_line:
tb_idx = idx
list_of_substrings = decoded_line.split(separator_list[idx].replace('\\t', '\t'))
counts = collections.Counter(list_of_substrings)
column_names = [f"{i}_{name.strip()}" if counts[name] > 1 else name.strip()
for i, name in enumerate(list_of_substrings)]
header_line_idx = line_number
if verbose:
print(f"[Detected header] Line {line_number}: {column_names}")
return header_line_idx, column_names, tb_idx, preamble_lines
preamble_lines.append(' '.join(decoded_line.split()))
warnings.warn("Table header was not detected using known patterns. Will attempt inference mode.")
return -1, [], None, preamble_lines
def load_file_reader_parameters(filename: str, instruments_dir: str) -> tuple:
"""
Load file reader configuration parameters based on the file and instrument directory.
Returns:
- config_dict: Full configuration dictionary
- file_encoding
- separator
- table_header
- timestamp_variables
- datetime_format
- description_dict
"""
config_path = os.path.abspath(os.path.join(instruments_dir, 'readers', 'config_text_reader.yaml'))
try:
with open(config_path, 'r') as stream:
config_dict = yaml.load(stream, Loader=yaml.FullLoader)
except yaml.YAMLError as exc:
print(f"[YAML Load Error] {exc}")
return {}, '', '', '', [], [], {}
# Defaults
file_encoding = config_dict.get('default', {}).get('file_encoding', 'utf-8')
separator = config_dict.get('default', {}).get('separator', ',')
table_header = config_dict.get('default', {}).get('table_header', 'infer')
timestamp_variables = []
datetime_format = []
description_dict = {}
for instFolder in config_dict.keys():
if instFolder in filename.split(os.sep):
file_encoding = config_dict[instFolder].get('file_encoding', file_encoding)
separator = config_dict[instFolder].get('separator', separator)
table_header = config_dict[instFolder].get('table_header', table_header)
timestamp_variables = config_dict[instFolder].get('timestamp', [])
datetime_format = config_dict[instFolder].get('datetime_format', [])
link_to_description = config_dict[instFolder].get('link_to_description', '').replace('/', os.sep)
if link_to_description:
path = os.path.join(instruments_dir, link_to_description)
try:
with open(path, 'r') as stream:
description_dict = yaml.load(stream, Loader=yaml.FullLoader)
except (FileNotFoundError, yaml.YAMLError) as exc:
print(f"[Description Load Error] {exc}")
return (config_dict, file_encoding, separator, table_header,
timestamp_variables, datetime_format, description_dict)
def read_txt_files_as_dict(filename: str, instruments_dir: str = None, work_with_copy: bool = True):
filename = os.path.normpath(filename)
@ -41,56 +116,16 @@ def read_txt_files_as_dict(filename: str, instruments_dir: str = None, work_with
module_dir = os.path.dirname(__file__)
instruments_dir = os.path.join(module_dir, '..')
# Normalize the path (resolves any '..' in the path)
instrument_configs_path = os.path.abspath(os.path.join(instruments_dir,'readers','config_text_reader.yaml'))
print(instrument_configs_path)
with open(instrument_configs_path,'r') as stream:
try:
config_dict = yaml.load(stream, Loader=yaml.FullLoader)
except yaml.YAMLError as exc:
print(exc)
# Verify if file can be read by available intrument configurations.
#if not any(key in filename.replace(os.sep,'/') for key in config_dict.keys()):
# return {}
#TODO: this may be prone to error if assumed folder structure is non compliant
file_encoding = config_dict['default']['file_encoding'] #'utf-8'
separator = config_dict['default']['separator']
table_header = config_dict['default']['table_header']
timestamp_variables = []
datetime_format = []
tb_idx = 0
column_names = ''
description_dict = {}
for instFolder in config_dict.keys():
if instFolder in filename.split(os.sep):
file_encoding = config_dict[instFolder].get('file_encoding',file_encoding)
separator = config_dict[instFolder].get('separator',separator)
table_header = config_dict[instFolder].get('table_header',table_header)
timestamp_variables = config_dict[instFolder].get('timestamp',[])
datetime_format = config_dict[instFolder].get('datetime_format',[])
link_to_description = config_dict[instFolder].get('link_to_description', '').replace('/', os.sep)
if link_to_description:
path = os.path.join(instruments_dir, link_to_description)
try:
with open(path, 'r') as stream:
description_dict = yaml.load(stream, Loader=yaml.FullLoader)
except (FileNotFoundError, yaml.YAMLError) as exc:
print(exc)
#if 'None' in table_header:
# return {}
(config_dict,
file_encoding,
separator,
table_header,
timestamp_variables,
datetime_format,
description_dict) = load_file_reader_parameters(filename, instruments_dir)
# Read header as a dictionary and detect where data table starts
header_dict = {}
header_dict = {'actris_level': 0, 'processing_date':utils.created_at(), 'processing_script' : os.path.relpath(thisFilePath,dimaPath)}
data_start = False
# Work with copy of the file for safety
if work_with_copy:
@ -109,58 +144,20 @@ def read_txt_files_as_dict(filename: str, instruments_dir: str = None, work_with
table_preamble = []
line_number = 0
if 'infer' not in table_header:
header_line_idx, column_names, tb_idx, table_preamble = detect_table_header_line(
tmp_filename, table_header, file_encoding, separator)
with open(tmp_filename,'rb') as f:
for line_number, line in enumerate(f):
decoded_line = line.decode(file_encoding[tb_idx])
for tb_idx, tb in enumerate(table_header):
print(tb)
if tb in decoded_line:
break
if tb in decoded_line:
list_of_substrings = decoded_line.split(separator[tb_idx].replace('\\t','\t'))
# Count occurrences of each substring
substring_counts = collections.Counter(list_of_substrings)
data_start = True
# Generate column names with appended index only for repeated substrings
column_names = [f"{i}_{name.strip()}" if substring_counts[name] > 1 else name.strip() for i, name in enumerate(list_of_substrings)]
#column_names = [str(i)+'_'+name.strip() for i, name in enumerate(list_of_substrings)]
#column_names = []
#for i, name in enumerate(list_of_substrings):
# column_names.append(str(i)+'_'+name)
#print(line_number, len(column_names ),'\n')
break
else:
print('Table header was not detected.')
# Subdivide line into words, and join them by single space.
# I asumme this can produce a cleaner line that contains no weird separator characters \t \r or extra spaces and so on.
list_of_substrings = decoded_line.split()
# TODO: ideally we should use a multilinear string but the yalm parser is not recognizing \n as special character
#line = ' '.join(list_of_substrings+['\n'])
#line = ' '.join(list_of_substrings)
table_preamble.append(' '.join([item for item in list_of_substrings]))# += new_line
if header_line_idx == -1:
table_header = ['infer'] # fallback to pandas' inference
# TODO: it does not work with separator as none :(. fix for RGA
try:
print(column_names)
if not 'infer' in table_header:
#print(table_header)
#print(file_encoding[tb_idx])
df = pd.read_csv(tmp_filename,
delimiter = separator[tb_idx].replace('\\t','\t'),
header=line_number,
#encoding='latin-1',
header=header_line_idx,
encoding = file_encoding[tb_idx],
names=column_names,
skip_blank_lines=True)