Update all file readers with a command-line interface so each can be run as a subprocess. Also add registry.yaml to decouple the code from user-defined instrument adaptations and extensions.

2025-02-24 17:27:12 +01:00
parent 02e926e003
commit e5fdc6fa31
7 changed files with 495 additions and 85 deletions
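With the CLI in place, an orchestrating process can run any reader in its own interpreter. A minimal sketch of such a call; the script path, file names, and group name below are illustrative, not taken from this commit:

import subprocess
import sys

# Every reader accepts the same three positional arguments (see the argparse blocks below):
# dst_file_path, src_file_path, dst_group_name.
result = subprocess.run(
    [sys.executable, 'instruments/readers/example_reader.py',  # hypothetical reader script
     'output/collection.h5',                       # dst_file_path: target HDF5 file
     'raw/example_data.txt',                       # src_file_path: source file to ingest
     '/ExampleInstrument/raw/example_data.txt'],   # dst_group_name in the HDF5 file
    capture_output=True, text=True,
)
print(result.stdout)
if result.returncode != 0:
    print(result.stderr)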

View File

@@ -0,0 +1,101 @@
import sys
import os

try:
    thisFilePath = os.path.abspath(__file__)
except NameError:
    print("Error: __file__ is not available. Ensure the script is being run from a file.")
    print("[Notice] Path to DIMA package may not be resolved properly.")
    thisFilePath = os.getcwd()  # Use current directory or specify a default

dimaPath = os.path.normpath(os.path.join(thisFilePath, "..", "..", ".."))  # Move up to project root

if dimaPath not in sys.path:  # Avoid duplicate entries
    sys.path.insert(0, dimaPath)

import pandas as pd
import collections
import json
import h5py
import argparse
import logging

import utils.g5505_utils as utils
def read_jsonflag_as_dict(path_to_file):

    file_dict = {}
    path_tail, path_head = os.path.split(path_to_file)

    file_dict['name'] = path_head
    # TODO: review this header dictionary; it may not be the best way to represent header data
    file_dict['attributes_dict'] = {}
    file_dict['datasets'] = []

    try:
        with open(path_to_file, 'r') as stream:
            flag = json.load(stream)
    except (FileNotFoundError, json.JSONDecodeError) as exc:
        print(exc)
        return file_dict  # Return early; 'flag' is undefined if loading failed

    dataset = {}
    dataset['name'] = 'data_table'
    dataset['data'] = utils.convert_attrdict_to_np_structured_array(flag)
    dataset['shape'] = dataset['data'].shape
    dataset['dtype'] = type(dataset['data'])

    file_dict['datasets'].append(dataset)

    return file_dict
if __name__ == "__main__":

    from src.hdf5_ops import save_file_dict_to_hdf5
    from utils.g5505_utils import created_at

    # Set up argument parsing
    parser = argparse.ArgumentParser(description="Data ingestion process to HDF5 files.")
    parser.add_argument('dst_file_path', type=str, help="Path to the target HDF5 file.")
    parser.add_argument('src_file_path', type=str, help="Relative path to the source file to be saved into the target HDF5 file.")
    parser.add_argument('dst_group_name', type=str, help="Group name '/instFolder/[category]/fileName' in the target HDF5 file.")

    args = parser.parse_args()

    hdf5_file_path = args.dst_file_path
    src_file_path = args.src_file_path
    dst_group_name = args.dst_group_name
    default_mode = 'r+'

    try:
        # Read source file and return an internal dictionary representation
        idr_dict = read_jsonflag_as_dict(src_file_path)

        if not os.path.exists(hdf5_file_path):
            default_mode = 'w'

        print(f'Opening HDF5 file: {hdf5_file_path} in mode {default_mode}')

        with h5py.File(hdf5_file_path, mode=default_mode, track_order=True) as hdf5_file_obj:
            try:
                # Create group if it does not exist
                if dst_group_name not in hdf5_file_obj:
                    hdf5_file_obj.create_group(dst_group_name)
                    hdf5_file_obj[dst_group_name].attrs['creation_date'] = created_at().encode('utf-8')
                    print(f'Created new group: {dst_group_name}')
                else:
                    print(f'Group {dst_group_name} already exists. Proceeding with data transfer...')
            except Exception as inst:
                logging.error('Failed to create group %s in HDF5: %s', dst_group_name, inst)

            # Save dictionary to HDF5
            save_file_dict_to_hdf5(hdf5_file_obj, dst_group_name, idr_dict)
            print(f'Completed saving file dict with keys: {idr_dict.keys()}')

    except Exception as e:
        logging.error('File reader failed to process %s: %s', src_file_path, e)
        print(f'File reader failed to process {src_file_path}. See logs for details.')
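Calling read_jsonflag_as_dict directly (rather than through the CLI) returns the internal dictionary representation. A minimal usage sketch, with a hypothetical JSON path:

idr = read_jsonflag_as_dict('flags/example_flags.json')
print(idr['name'])                  # base name of the source file
for ds in idr['datasets']:
    print(ds['name'], ds['shape'])  # 'data_table' and the structured array's shape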

View File

@@ -1,15 +1,26 @@
import sys
import os

try:
    thisFilePath = os.path.abspath(__file__)
except NameError:
    print("Error: __file__ is not available. Ensure the script is being run from a file.")
    print("[Notice] Path to DIMA package may not be resolved properly.")
    thisFilePath = os.getcwd()  # Use current directory or specify a default

dimaPath = os.path.normpath(os.path.join(thisFilePath, "..", "..", ".."))  # Move up to project root

if dimaPath not in sys.path:  # Avoid duplicate entries
    sys.path.insert(0, dimaPath)

import pandas as pd
import collections
import yaml
import h5py
import argparse
import logging

import utils.g5505_utils as utils
@@ -223,4 +234,55 @@ def read_acsm_files_as_dict(filename: str, instruments_dir: str = None, work_wit
    except:
        return {}

    return file_dict
if __name__ == "__main__":

    from src.hdf5_ops import save_file_dict_to_hdf5
    from utils.g5505_utils import created_at

    # Set up argument parsing
    parser = argparse.ArgumentParser(description="Data ingestion process to HDF5 files.")
    parser.add_argument('dst_file_path', type=str, help="Path to the target HDF5 file.")
    parser.add_argument('src_file_path', type=str, help="Relative path to the source file to be saved into the target HDF5 file.")
    parser.add_argument('dst_group_name', type=str, help="Group name '/instFolder/[category]/fileName' in the target HDF5 file.")

    args = parser.parse_args()

    hdf5_file_path = args.dst_file_path
    src_file_path = args.src_file_path
    dst_group_name = args.dst_group_name
    default_mode = 'r+'

    try:
        # Read source file and return an internal dictionary representation
        idr_dict = read_acsm_files_as_dict(src_file_path)

        if not os.path.exists(hdf5_file_path):
            default_mode = 'w'

        print(f'Opening HDF5 file: {hdf5_file_path} in mode {default_mode}')

        with h5py.File(hdf5_file_path, mode=default_mode, track_order=True) as hdf5_file_obj:
            try:
                # Create group if it does not exist
                if dst_group_name not in hdf5_file_obj:
                    hdf5_file_obj.create_group(dst_group_name)
                    hdf5_file_obj[dst_group_name].attrs['creation_date'] = created_at().encode('utf-8')
                    print(f'Created new group: {dst_group_name}')
                else:
                    print(f'Group {dst_group_name} already exists. Proceeding with data transfer...')
            except Exception as inst:
                logging.error('Failed to create group %s in HDF5: %s', dst_group_name, inst)

            # Save dictionary to HDF5
            save_file_dict_to_hdf5(hdf5_file_obj, dst_group_name, idr_dict)
            print(f'Completed saving file dict with keys: {idr_dict.keys()}')

    except Exception as e:
        logging.error('File reader failed to process %s: %s', src_file_path, e)
        print(f'File reader failed to process {src_file_path}. See logs for details.')

View File

@@ -1,42 +0,0 @@
import os
import json

#root_dir = os.path.abspath(os.curdir)
#sys.path.append(root_dir)
#print(__file__)

#from instruments.readers import set_dima_path as configpath
#configpath.set_dima_path()

try:
    from dima.utils import g5505_utils as utils
except ModuleNotFoundError:
    import utils.g5505_utils as utils


def read_jsonflag_as_dict(path_to_file):

    file_dict = {}
    path_tail, path_head = os.path.split(path_to_file)

    file_dict['name'] = path_head
    # TODO: review this header dictionary, it may not be the best way to represent header data
    file_dict['attributes_dict'] = {}
    file_dict['datasets'] = []

    try:
        with open(path_to_file, 'r') as stream:
            flag = json.load(stream)#, Loader=json.FullLoader)
    except (FileNotFoundError, json.JSONDecodeError) as exc:
        print(exc)

    dataset = {}
    dataset['name'] = 'data_table'#_numerical_variables'
    dataset['data'] = g5505_utils.convert_attrdict_to_np_structured_array(flag) #df_numerical_attrs.to_numpy()
    dataset['shape'] = dataset['data'].shape
    dataset['dtype'] = type(dataset['data'])

    file_dict['datasets'].append(dataset)

    return file_dict

View File

@@ -1,19 +1,40 @@
import sys
import os

try:
    thisFilePath = os.path.abspath(__file__)
except NameError:
    print("Error: __file__ is not available. Ensure the script is being run from a file.")
    print("[Notice] Path to DIMA package may not be resolved properly.")
    thisFilePath = os.getcwd()  # Use current directory or specify a default

dimaPath = os.path.normpath(os.path.join(thisFilePath, "..", "..", ".."))  # Move up to project root

if dimaPath not in sys.path:  # Avoid duplicate entries
    sys.path.insert(0, dimaPath)

import pandas as pd
import collections
import yaml
import h5py
import argparse
import logging

import utils.g5505_utils as utils
def read_txt_files_as_dict(filename: str, instruments_dir: str = None, work_with_copy: bool = True):

    filename = os.path.normpath(filename)

    # If instruments_dir is not provided, use the default path relative to the module directory
    if not instruments_dir:
        # Assuming the instruments folder is one level up from the source module directory
@@ -23,6 +44,8 @@ def read_txt_files_as_dict(filename: str, instruments_dir: str = None, work_with
    # Normalize the path (resolves any '..' in the path)
    instrument_configs_path = os.path.abspath(os.path.join(instruments_dir, 'readers', 'config_text_reader.yaml'))

    print(instrument_configs_path)

    with open(instrument_configs_path, 'r') as stream:
        try:
            config_dict = yaml.load(stream, Loader=yaml.FullLoader)
@@ -44,7 +67,9 @@ def read_txt_files_as_dict(filename: str, instruments_dir: str = None, work_with
    description_dict = {}

    for instFolder in config_dict.keys():

        if instFolder in filename.split(os.sep):

            file_encoding = config_dict[instFolder].get('file_encoding', file_encoding)
            separator = config_dict[instFolder].get('separator', separator)
            table_header = config_dict[instFolder].get('table_header', table_header)
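For reference, one instrument entry in config_text_reader.yaml parses into a mapping shaped roughly as below. The keys mirror the .get(...) lookups above; the folder name and values are hypothetical:

# Hypothetical result of yaml.load for one instrument folder (schema inferred from the lookups above).
config_dict = {
    'ExampleInstrument': {
        'file_encoding': 'utf-8',
        'separator': '\\t',                # stored escaped; the code replaces '\\t' with a real tab
        'table_header': 'Date Time Temp_C',
    },
}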
@@ -76,6 +101,7 @@ def read_txt_files_as_dict(filename: str, instruments_dir: str = None, work_with
    #with open(tmp_filename, 'rb', encoding=file_encoding, errors='ignore') as f:

    if not isinstance(table_header, list):
        table_header = [table_header]
        file_encoding = [file_encoding]
        separator = [separator]
@@ -87,14 +113,17 @@ def read_txt_files_as_dict(filename: str, instruments_dir: str = None, work_with
    with open(tmp_filename, 'rb') as f:
        for line_number, line in enumerate(f):
            decoded_line = line.decode(file_encoding[0])  # Decode once per line; assume one encoding for all candidate headers

            for tb_idx, tb in enumerate(table_header):
                print(tb)
                if tb in decoded_line:
                    break

            if tb in decoded_line:
                list_of_substrings = decoded_line.split(separator[tb_idx].replace('\\t', '\t'))

                # Count occurrences of each substring
                substring_counts = collections.Counter(list_of_substrings)
@@ -109,9 +138,11 @@ def read_txt_files_as_dict(filename: str, instruments_dir: str = None, work_with
                #print(line_number, len(column_names), '\n')
                break
            else:
                print('Table header was not detected.')
                # Subdivide the line into words and join them with single spaces.
                # I assume this yields a cleaner line free of stray separator characters (\t, \r) and extra spaces.
                list_of_substrings = decoded_line.split()

                # TODO: ideally we should use a multiline string, but the YAML parser does not recognize \n as a special character
                #line = ' '.join(list_of_substrings + ['\n'])
                #line = ' '.join(list_of_substrings)
@@ -119,8 +150,13 @@ def read_txt_files_as_dict(filename: str, instruments_dir: str = None, work_with
    # TODO: it does not work when the separator is None :(. Fix for RGA.
    try:
        print(column_names)
        if not 'infer' in table_header:
            df = pd.read_csv(tmp_filename,
                             delimiter=separator[tb_idx].replace('\\t', '\t'),
                             header=line_number,
@@ -138,7 +174,7 @@ def read_txt_files_as_dict(filename: str, instruments_dir: str = None, work_with
        df_numerical_attrs = df.select_dtypes(include='number')
        df_categorical_attrs = df.select_dtypes(exclude='number')
        numerical_variables = [item for item in df_numerical_attrs.columns]

        # Consolidate the separate 'date' and 'time' columns specified in text_data_source.yaml into a single timestamp column
        if timestamp_variables:
            #df_categorical_attrs['timestamps'] = [' '.join(df_categorical_attrs.loc[i, timestamp_variables].to_numpy()) for i in df.index]
@@ -148,7 +184,7 @@ def read_txt_files_as_dict(filename: str, instruments_dir: str = None, work_with
            #df_categorical_attrs['timestamps'] = df_categorical_attrs[timestamp_variables].astype(str).agg(' '.join, axis=1)
            timestamps_name = ' '.join(timestamp_variables)
            df_categorical_attrs[timestamps_name] = df_categorical_attrs[timestamp_variables].astype(str).agg(' '.join, axis=1)

            valid_indices = []
            if datetime_format:
                df_categorical_attrs[timestamps_name] = pd.to_datetime(df_categorical_attrs[timestamps_name], format=datetime_format, errors='coerce')
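To make the consolidation concrete, here are the same pandas calls run on toy data; the column names and datetime format are hypothetical:

import pandas as pd

df_categorical_attrs = pd.DataFrame({'Date': ['2025-02-24'], 'Time': ['17:27:12']})
timestamp_variables = ['Date', 'Time']
timestamps_name = ' '.join(timestamp_variables)  # 'Date Time'
df_categorical_attrs[timestamps_name] = (
    df_categorical_attrs[timestamp_variables].astype(str).agg(' '.join, axis=1))
df_categorical_attrs[timestamps_name] = pd.to_datetime(
    df_categorical_attrs[timestamps_name], format='%Y-%m-%d %H:%M:%S', errors='coerce')
print(df_categorical_attrs[timestamps_name].iloc[0])  # 2025-02-24 17:27:12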
@@ -249,7 +285,59 @@ def read_txt_files_as_dict(filename: str, instruments_dir: str = None, work_with
        # if timestamps_name in categorical_variables:
        #     dataset['attributes'] = {timestamps_name: utils.parse_attribute({'unit': 'YYYY-MM-DD HH:MM:SS.ffffff'})}
        #     file_dict['datasets'].append(dataset)

    except Exception as e:
        print(e)
        return {}

    return file_dict
if __name__ == "__main__":

    from src.hdf5_ops import save_file_dict_to_hdf5
    from utils.g5505_utils import created_at

    # Set up argument parsing
    parser = argparse.ArgumentParser(description="Data ingestion process to HDF5 files.")
    parser.add_argument('dst_file_path', type=str, help="Path to the target HDF5 file.")
    parser.add_argument('src_file_path', type=str, help="Relative path to the source file to be saved into the target HDF5 file.")
    parser.add_argument('dst_group_name', type=str, help="Group name '/instFolder/[category]/fileName' in the target HDF5 file.")

    args = parser.parse_args()

    hdf5_file_path = args.dst_file_path
    src_file_path = args.src_file_path
    dst_group_name = args.dst_group_name
    default_mode = 'r+'

    try:
        # Read source file and return an internal dictionary representation
        idr_dict = read_txt_files_as_dict(src_file_path)

        if not os.path.exists(hdf5_file_path):
            default_mode = 'w'

        print(f'Opening HDF5 file: {hdf5_file_path} in mode {default_mode}')

        with h5py.File(hdf5_file_path, mode=default_mode, track_order=True) as hdf5_file_obj:
            try:
                # Create group if it does not exist
                if dst_group_name not in hdf5_file_obj:
                    hdf5_file_obj.create_group(dst_group_name)
                    hdf5_file_obj[dst_group_name].attrs['creation_date'] = created_at().encode('utf-8')
                    print(f'Created new group: {dst_group_name}')
                else:
                    print(f'Group {dst_group_name} already exists. Proceeding with data transfer...')
            except Exception as inst:
                logging.error('Failed to create group %s in HDF5: %s', dst_group_name, inst)

            # Save dictionary to HDF5
            save_file_dict_to_hdf5(hdf5_file_obj, dst_group_name, idr_dict)
            print(f'Completed saving file dict with keys: {idr_dict.keys()}')

    except Exception as e:
        logging.error('File reader failed to process %s: %s', src_file_path, e)
        print(f'File reader failed to process {src_file_path}. See logs for details.')

View File

@@ -1,5 +1,10 @@
import os
import sys
import h5py
from igor2.binarywave import load as loadibw
import logging
import argparse


def read_xps_ibw_file_as_dict(filename):
    """
@@ -76,4 +81,66 @@ def read_xps_ibw_file_as_dict(filename):
    file_dict['datasets'].append(dataset)

    return file_dict
if __name__ == "__main__":

    try:
        thisFilePath = os.path.abspath(__file__)
    except NameError:
        print("Error: __file__ is not available. Ensure the script is being run from a file.")
        print("[Notice] Path to DIMA package may not be resolved properly.")
        thisFilePath = os.getcwd()  # Use current directory or specify a default

    dimaPath = os.path.normpath(os.path.join(thisFilePath, "..", "..", ".."))  # Move up to project root

    if dimaPath not in sys.path:  # Avoid duplicate entries
        sys.path.insert(0, dimaPath)

    from src.hdf5_ops import save_file_dict_to_hdf5
    from utils.g5505_utils import created_at

    # Set up argument parsing
    parser = argparse.ArgumentParser(description="Data ingestion process to HDF5 files.")
    parser.add_argument('dst_file_path', type=str, help="Path to the target HDF5 file.")
    parser.add_argument('src_file_path', type=str, help="Relative path to the source file to be saved into the target HDF5 file.")
    parser.add_argument('dst_group_name', type=str, help="Group name '/instFolder/[category]/fileName' in the target HDF5 file.")

    args = parser.parse_args()

    hdf5_file_path = args.dst_file_path
    src_file_path = args.src_file_path
    dst_group_name = args.dst_group_name
    default_mode = 'r+'

    try:
        # Read source file and return an internal dictionary representation
        idr_dict = read_xps_ibw_file_as_dict(src_file_path)

        if not os.path.exists(hdf5_file_path):
            default_mode = 'w'

        print(f'Opening HDF5 file: {hdf5_file_path} in mode {default_mode}')

        with h5py.File(hdf5_file_path, mode=default_mode, track_order=True) as hdf5_file_obj:
            try:
                # Create group if it does not exist
                if dst_group_name not in hdf5_file_obj:
                    hdf5_file_obj.create_group(dst_group_name)
                    hdf5_file_obj[dst_group_name].attrs['creation_date'] = created_at().encode('utf-8')
                    print(f'Created new group: {dst_group_name}')
                else:
                    print(f'Group {dst_group_name} already exists. Proceeding with data transfer...')
            except Exception as inst:
                logging.error('Failed to create group %s in HDF5: %s', dst_group_name, inst)

            # Save dictionary to HDF5
            save_file_dict_to_hdf5(hdf5_file_obj, dst_group_name, idr_dict)
            print(f'Completed saving file dict with keys: {idr_dict.keys()}')

    except Exception as e:
        logging.error('File reader failed to process %s: %s', src_file_path, e)
        print(f'File reader failed to process {src_file_path}. See logs for details.')