Update all file readers with command line interface so we can run them as a subprocess. Added also registry.yaml to decouple code from user-based instrument adaptations or extensions.
This commit is contained in:
@ -1,15 +1,20 @@
|
||||
import os
|
||||
import sys
|
||||
import subprocess
|
||||
import yaml
|
||||
#root_dir = os.path.abspath(os.curdir)
|
||||
#sys.path.append(root_dir)
|
||||
|
||||
try:
|
||||
from dima.instruments.readers.xps_ibw_reader import read_xps_ibw_file_as_dict
|
||||
from dima.instruments.readers.g5505_text_reader import read_txt_files_as_dict
|
||||
#try:
|
||||
# from dima.instruments.readers.xps_ibw_reader import read_xps_ibw_file_as_dict
|
||||
# from dima.instruments.readers.g5505_text_reader import read_txt_files_as_dict
|
||||
|
||||
except ModuleNotFoundError:
|
||||
#except ModuleNotFoundError as e:
|
||||
# print(e)
|
||||
from instruments.readers.xps_ibw_reader import read_xps_ibw_file_as_dict
|
||||
from instruments.readers.g5505_text_reader import read_txt_files_as_dict
|
||||
from instruments.readers.acsm_tofware_reader import read_acsm_files_as_dict
|
||||
from instruments.readers.acsm_flag_reader import read_jsonflag_as_dict
|
||||
|
||||
file_extensions = ['.ibw','.txt','.dat','.h5','.TXT','.csv','.pkl','.json','.yaml']
|
||||
|
||||
@ -19,23 +24,36 @@ default_instruments_dir = None # or provide an absolute path
|
||||
file_readers = {
|
||||
'ibw': lambda a1: read_xps_ibw_file_as_dict(a1),
|
||||
'txt': lambda a1: read_txt_files_as_dict(a1, instruments_dir=default_instruments_dir, work_with_copy=False),
|
||||
# 'TXT': lambda a1: read_txt_files_as_dict(a1, instruments_dir=default_instruments_dir, work_with_copy=False),
|
||||
'dat': lambda a1: read_txt_files_as_dict(a1, instruments_dir=default_instruments_dir, work_with_copy=False),
|
||||
'csv': lambda a1: read_txt_files_as_dict(a1, instruments_dir=default_instruments_dir, work_with_copy=False)
|
||||
}
|
||||
'csv': lambda a1: read_txt_files_as_dict(a1, instruments_dir=default_instruments_dir, work_with_copy=False),
|
||||
'ACSM_TOFWARE_txt' : lambda x: read_acsm_files_as_dict(x, instruments_dir=default_instruments_dir, work_with_copy=False),
|
||||
'ACSM_TOFWARE_csv' : lambda x: read_acsm_files_as_dict(x, instruments_dir=default_instruments_dir, work_with_copy=False),
|
||||
'ACSM_TOFWARE_flags_json' : lambda x: read_jsonflag_as_dict(x)}
|
||||
|
||||
# Add new "instrument reader (Data flagging app data)"
|
||||
REGISTRY_FILE = "registry.yaml" #os.path.join(os.path.dirname(__file__), "registry.yaml")
|
||||
|
||||
from instruments.readers.acsm_tofware_reader import read_acsm_files_as_dict
|
||||
file_extensions.append('.txt')
|
||||
file_readers.update({'ACSM_TOFWARE_txt' : lambda x: read_acsm_files_as_dict(x, instruments_dir=default_instruments_dir, work_with_copy=False)})
|
||||
def load_registry():
|
||||
|
||||
file_extensions.append('.csv')
|
||||
file_readers.update({'ACSM_TOFWARE_csv' : lambda x: read_acsm_files_as_dict(x, instruments_dir=default_instruments_dir, work_with_copy=False)})
|
||||
module_dir = os.path.dirname(__file__)
|
||||
instruments_dir = os.path.join(module_dir, '..')
|
||||
|
||||
from instruments.readers.flag_reader import read_jsonflag_as_dict
|
||||
file_extensions.append('.json')
|
||||
file_readers.update({'ACSM_TOFWARE_flags_json' : lambda x: read_jsonflag_as_dict(x)})
|
||||
# Normalize the path (resolves any '..' in the path)
|
||||
registry_path = os.path.abspath(os.path.join(module_dir,REGISTRY_FILE))
|
||||
|
||||
with open(registry_path, "r") as file:
|
||||
return yaml.safe_load(file)["instruments"]
|
||||
|
||||
def find_reader(instrument_folder, file_extension):
|
||||
|
||||
|
||||
|
||||
registry = load_registry()
|
||||
|
||||
for entry in registry:
|
||||
if entry["instrumentFolderName"] == instrument_folder and entry["fileExtension"] == file_extension:
|
||||
return entry["fileReaderPath"], entry["InstrumentDictionaryPath"]
|
||||
|
||||
return None, None # Not found
|
||||
|
||||
def compute_filereader_key_from_path(hdf5_file_path):
|
||||
"""Constructs the key 'instrumentname_ext' based on hdf5_file_path, structured as
|
||||
@ -82,3 +100,44 @@ def select_file_reader(path):
|
||||
|
||||
# Default case if no reader is found
|
||||
return lambda x : None
|
||||
|
||||
|
||||
|
||||
def run_reader(hdf5_file_path, src_file_path, dst_group_name):
|
||||
try:
|
||||
thisFilePath = os.path.abspath(__file__)
|
||||
except NameError:
|
||||
print("Error: __file__ is not available. Ensure the script is being run from a file.")
|
||||
print("[Notice] Path to DIMA package may not be resolved properly.")
|
||||
thisFilePath = os.getcwd() # Use current directory or specify a default
|
||||
|
||||
projectPath = os.path.normpath(os.path.join(thisFilePath, "..",'..')) # Move up to project root
|
||||
|
||||
#
|
||||
full_string, file_extension = compute_filereader_key_from_path(dst_group_name)
|
||||
full_string_parts = full_string.split("_")
|
||||
full_string_parts.remove(file_extension)
|
||||
instrument_folder = '_'.join(full_string_parts)
|
||||
|
||||
|
||||
|
||||
reader_path, dict_path = find_reader(instrument_folder, file_extension)
|
||||
|
||||
|
||||
if reader_path:
|
||||
reader_path = os.path.normpath(os.path.join(projectPath, reader_path))
|
||||
if not os.path.exists(reader_path):
|
||||
raise FileNotFoundError(f"File reader {reader_path} not found for key {full_string}. Verify the reader is properly referenced in registry.yaml.")
|
||||
else:
|
||||
print(f'Attempting to run {reader_path}')
|
||||
|
||||
|
||||
command = ["python", reader_path, hdf5_file_path, src_file_path, instrument_folder]
|
||||
#if dict_path:
|
||||
# args.append(dict_path)
|
||||
print(f"Running: {command}")
|
||||
output = subprocess.run(command, capture_output=True)#, check=True)
|
||||
print('Subprocess output',output.stdout)
|
||||
else:
|
||||
print(f'There is no file reader available to process files in {instrument_folder}.')
|
||||
#logging.info(instFoldermsdEnd )
|
||||
|
101
instruments/readers/acsm_flag_reader.py
Normal file
101
instruments/readers/acsm_flag_reader.py
Normal file
@ -0,0 +1,101 @@
|
||||
import sys
|
||||
import os
|
||||
|
||||
try:
|
||||
thisFilePath = os.path.abspath(__file__)
|
||||
except NameError:
|
||||
print("Error: __file__ is not available. Ensure the script is being run from a file.")
|
||||
print("[Notice] Path to DIMA package may not be resolved properly.")
|
||||
thisFilePath = os.getcwd() # Use current directory or specify a default
|
||||
|
||||
dimaPath = os.path.normpath(os.path.join(thisFilePath, "..",'..','..')) # Move up to project root
|
||||
|
||||
if dimaPath not in sys.path: # Avoid duplicate entries
|
||||
sys.path.insert(0,dimaPath)
|
||||
|
||||
import pandas as pd
|
||||
import collections
|
||||
import json
|
||||
import h5py
|
||||
import argparse
|
||||
import logging
|
||||
|
||||
import utils.g5505_utils as utils
|
||||
|
||||
|
||||
|
||||
def read_jsonflag_as_dict(path_to_file):
|
||||
|
||||
|
||||
file_dict = {}
|
||||
path_tail, path_head = os.path.split(path_to_file)
|
||||
|
||||
file_dict['name'] = path_head
|
||||
# TODO: review this header dictionary, it may not be the best way to represent header data
|
||||
file_dict['attributes_dict'] = {}
|
||||
file_dict['datasets'] = []
|
||||
|
||||
try:
|
||||
with open(path_to_file, 'r') as stream:
|
||||
flag = json.load(stream)#, Loader=json.FullLoader)
|
||||
except (FileNotFoundError, json.JSONDecodeError) as exc:
|
||||
print(exc)
|
||||
|
||||
dataset = {}
|
||||
dataset['name'] = 'data_table'#_numerical_variables'
|
||||
dataset['data'] = utils.convert_attrdict_to_np_structured_array(flag) #df_numerical_attrs.to_numpy()
|
||||
dataset['shape'] = dataset['data'].shape
|
||||
dataset['dtype'] = type(dataset['data'])
|
||||
|
||||
file_dict['datasets'].append(dataset)
|
||||
|
||||
return file_dict
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
from src.hdf5_ops import save_file_dict_to_hdf5
|
||||
from utils.g5505_utils import created_at
|
||||
|
||||
# Set up argument parsing
|
||||
parser = argparse.ArgumentParser(description="Data ingestion process to HDF5 files.")
|
||||
parser.add_argument('dst_file_path', type=str, help="Path to the target HDF5 file.")
|
||||
parser.add_argument('src_file_path', type=str, help="Relative path to source file to be saved to target HDF5 file.")
|
||||
parser.add_argument('dst_group_name', type=str, help="Group name '/instFolder/[category]/fileName' in the target HDF5 file.")
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
hdf5_file_path = args.dst_file_path
|
||||
src_file_path = args.src_file_path
|
||||
dst_group_name = args.dst_group_name
|
||||
default_mode = 'r+'
|
||||
|
||||
try:
|
||||
# Read source file and return an internal dictionary representation
|
||||
idr_dict = read_jsonflag_as_dict(src_file_path)
|
||||
|
||||
if not os.path.exists(hdf5_file_path):
|
||||
default_mode = 'w'
|
||||
|
||||
print(f'Opening HDF5 file: {hdf5_file_path} in mode {default_mode}')
|
||||
|
||||
with h5py.File(hdf5_file_path, mode=default_mode, track_order=True) as hdf5_file_obj:
|
||||
try:
|
||||
# Create group if it does not exist
|
||||
if dst_group_name not in hdf5_file_obj:
|
||||
hdf5_file_obj.create_group(dst_group_name)
|
||||
hdf5_file_obj[dst_group_name].attrs['creation_date'] = created_at().encode('utf-8')
|
||||
print(f'Created new group: {dst_group_name}')
|
||||
else:
|
||||
print(f'Group {dst_group_name} already exists. Proceeding with data transfer...')
|
||||
|
||||
except Exception as inst:
|
||||
logging.error('Failed to create group %s in HDF5: %s', dst_group_name, inst)
|
||||
|
||||
# Save dictionary to HDF5
|
||||
save_file_dict_to_hdf5(hdf5_file_obj, dst_group_name, idr_dict)
|
||||
print(f'Completed saving file dict with keys: {idr_dict.keys()}')
|
||||
|
||||
except Exception as e:
|
||||
logging.error('File reader failed to process %s: %s', src_file_path, e)
|
||||
print(f'File reader failed to process {src_file_path}. See logs for details.')
|
||||
|
@ -1,14 +1,25 @@
|
||||
import sys
|
||||
import os
|
||||
|
||||
try:
|
||||
thisFilePath = os.path.abspath(__file__)
|
||||
except NameError:
|
||||
print("Error: __file__ is not available. Ensure the script is being run from a file.")
|
||||
print("[Notice] Path to DIMA package may not be resolved properly.")
|
||||
thisFilePath = os.getcwd() # Use current directory or specify a default
|
||||
|
||||
dimaPath = os.path.normpath(os.path.join(thisFilePath, "..",'..','..')) # Move up to project root
|
||||
|
||||
if dimaPath not in sys.path: # Avoid duplicate entries
|
||||
sys.path.insert(0,dimaPath)
|
||||
|
||||
import pandas as pd
|
||||
import collections
|
||||
import yaml
|
||||
import h5py
|
||||
import argparse
|
||||
import logging
|
||||
|
||||
#root_dir = os.path.abspath(os.curdir)
|
||||
#sys.path.append(root_dir)
|
||||
try:
|
||||
from dima.utils import g5505_utils as utils
|
||||
except ModuleNotFoundError:
|
||||
import utils.g5505_utils as utils
|
||||
|
||||
|
||||
@ -224,3 +235,54 @@ def read_acsm_files_as_dict(filename: str, instruments_dir: str = None, work_wit
|
||||
return {}
|
||||
|
||||
return file_dict
|
||||
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
from src.hdf5_ops import save_file_dict_to_hdf5
|
||||
from utils.g5505_utils import created_at
|
||||
|
||||
# Set up argument parsing
|
||||
parser = argparse.ArgumentParser(description="Data ingestion process to HDF5 files.")
|
||||
parser.add_argument('dst_file_path', type=str, help="Path to the target HDF5 file.")
|
||||
parser.add_argument('src_file_path', type=str, help="Relative path to source file to be saved to target HDF5 file.")
|
||||
parser.add_argument('dst_group_name', type=str, help="Group name '/instFolder/[category]/fileName' in the target HDF5 file.")
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
hdf5_file_path = args.dst_file_path
|
||||
src_file_path = args.src_file_path
|
||||
dst_group_name = args.dst_group_name
|
||||
default_mode = 'r+'
|
||||
|
||||
try:
|
||||
# Read source file and return an internal dictionary representation
|
||||
idr_dict = read_acsm_files_as_dict(src_file_path)
|
||||
|
||||
if not os.path.exists(hdf5_file_path):
|
||||
default_mode = 'w'
|
||||
|
||||
print(f'Opening HDF5 file: {hdf5_file_path} in mode {default_mode}')
|
||||
|
||||
with h5py.File(hdf5_file_path, mode=default_mode, track_order=True) as hdf5_file_obj:
|
||||
try:
|
||||
# Create group if it does not exist
|
||||
if dst_group_name not in hdf5_file_obj:
|
||||
hdf5_file_obj.create_group(dst_group_name)
|
||||
hdf5_file_obj[dst_group_name].attrs['creation_date'] = created_at().encode('utf-8')
|
||||
print(f'Created new group: {dst_group_name}')
|
||||
else:
|
||||
print(f'Group {dst_group_name} already exists. Proceeding with data transfer...')
|
||||
|
||||
except Exception as inst:
|
||||
logging.error('Failed to create group %s in HDF5: %s', dst_group_name, inst)
|
||||
|
||||
# Save dictionary to HDF5
|
||||
save_file_dict_to_hdf5(hdf5_file_obj, dst_group_name, idr_dict)
|
||||
print(f'Completed saving file dict with keys: {idr_dict.keys()}')
|
||||
|
||||
except Exception as e:
|
||||
logging.error('File reader failed to process %s: %s', src_file_path, e)
|
||||
print(f'File reader failed to process {src_file_path}. See logs for details.')
|
||||
|
||||
|
@ -1,42 +0,0 @@
|
||||
import os
|
||||
import json
|
||||
|
||||
#root_dir = os.path.abspath(os.curdir)
|
||||
#sys.path.append(root_dir)
|
||||
#print(__file__)
|
||||
|
||||
#from instruments.readers import set_dima_path as configpath
|
||||
#configpath.set_dima_path()
|
||||
|
||||
try:
|
||||
from dima.utils import g5505_utils as utils
|
||||
except ModuleNotFoundError:
|
||||
import utils.g5505_utils as utils
|
||||
|
||||
|
||||
def read_jsonflag_as_dict(path_to_file):
|
||||
|
||||
|
||||
file_dict = {}
|
||||
path_tail, path_head = os.path.split(path_to_file)
|
||||
|
||||
file_dict['name'] = path_head
|
||||
# TODO: review this header dictionary, it may not be the best way to represent header data
|
||||
file_dict['attributes_dict'] = {}
|
||||
file_dict['datasets'] = []
|
||||
|
||||
try:
|
||||
with open(path_to_file, 'r') as stream:
|
||||
flag = json.load(stream)#, Loader=json.FullLoader)
|
||||
except (FileNotFoundError, json.JSONDecodeError) as exc:
|
||||
print(exc)
|
||||
|
||||
dataset = {}
|
||||
dataset['name'] = 'data_table'#_numerical_variables'
|
||||
dataset['data'] = g5505_utils.convert_attrdict_to_np_structured_array(flag) #df_numerical_attrs.to_numpy()
|
||||
dataset['shape'] = dataset['data'].shape
|
||||
dataset['dtype'] = type(dataset['data'])
|
||||
|
||||
file_dict['datasets'].append(dataset)
|
||||
|
||||
return file_dict
|
@ -1,19 +1,40 @@
|
||||
import sys
|
||||
import os
|
||||
|
||||
try:
|
||||
thisFilePath = os.path.abspath(__file__)
|
||||
except NameError:
|
||||
print("Error: __file__ is not available. Ensure the script is being run from a file.")
|
||||
print("[Notice] Path to DIMA package may not be resolved properly.")
|
||||
thisFilePath = os.getcwd() # Use current directory or specify a default
|
||||
|
||||
dimaPath = os.path.normpath(os.path.join(thisFilePath, "..",'..','..')) # Move up to project root
|
||||
|
||||
if dimaPath not in sys.path: # Avoid duplicate entries
|
||||
sys.path.insert(0,dimaPath)
|
||||
|
||||
import pandas as pd
|
||||
import collections
|
||||
import yaml
|
||||
|
||||
import h5py
|
||||
import argparse
|
||||
import logging
|
||||
# Import project modules
|
||||
root_dir = os.path.abspath(os.curdir)
|
||||
sys.path.append(root_dir)
|
||||
#root_dir = os.path.abspath(os.curdir)
|
||||
#sys.path.append(root_dir)
|
||||
|
||||
|
||||
#try:
|
||||
# from dima.utils import g5505_utils as utils
|
||||
#except ModuleNotFoundError:
|
||||
# import utils.g5505_utils as utils
|
||||
# import src.hdf5_ops as hdf5_ops
|
||||
import utils.g5505_utils as utils
|
||||
|
||||
|
||||
|
||||
|
||||
def read_txt_files_as_dict(filename: str, instruments_dir: str = None, work_with_copy: bool = True):
|
||||
|
||||
filename = os.path.normpath(filename)
|
||||
# If instruments_dir is not provided, use the default path relative to the module directory
|
||||
if not instruments_dir:
|
||||
# Assuming the instruments folder is one level up from the source module directory
|
||||
@ -23,6 +44,8 @@ def read_txt_files_as_dict(filename: str, instruments_dir: str = None, work_with
|
||||
# Normalize the path (resolves any '..' in the path)
|
||||
instrument_configs_path = os.path.abspath(os.path.join(instruments_dir,'readers','config_text_reader.yaml'))
|
||||
|
||||
print(instrument_configs_path)
|
||||
|
||||
with open(instrument_configs_path,'r') as stream:
|
||||
try:
|
||||
config_dict = yaml.load(stream, Loader=yaml.FullLoader)
|
||||
@ -44,7 +67,9 @@ def read_txt_files_as_dict(filename: str, instruments_dir: str = None, work_with
|
||||
description_dict = {}
|
||||
|
||||
for instFolder in config_dict.keys():
|
||||
|
||||
if instFolder in filename.split(os.sep):
|
||||
|
||||
file_encoding = config_dict[instFolder].get('file_encoding',file_encoding)
|
||||
separator = config_dict[instFolder].get('separator',separator)
|
||||
table_header = config_dict[instFolder].get('table_header',table_header)
|
||||
@ -76,6 +101,7 @@ def read_txt_files_as_dict(filename: str, instruments_dir: str = None, work_with
|
||||
#with open(tmp_filename,'rb',encoding=file_encoding,errors='ignore') as f:
|
||||
|
||||
if not isinstance(table_header, list):
|
||||
|
||||
table_header = [table_header]
|
||||
file_encoding = [file_encoding]
|
||||
separator = [separator]
|
||||
@ -87,14 +113,17 @@ def read_txt_files_as_dict(filename: str, instruments_dir: str = None, work_with
|
||||
with open(tmp_filename,'rb') as f:
|
||||
|
||||
for line_number, line in enumerate(f):
|
||||
decoded_line = line.decode(file_encoding[tb_idx])
|
||||
|
||||
|
||||
for tb_idx, tb in enumerate(table_header):
|
||||
if tb in line.decode(file_encoding[tb_idx]):
|
||||
print(tb)
|
||||
if tb in decoded_line:
|
||||
break
|
||||
|
||||
if tb in line.decode(file_encoding[tb_idx]):
|
||||
list_of_substrings = line.decode(file_encoding[tb_idx]).split(separator[tb_idx].replace('\\t','\t'))
|
||||
if tb in decoded_line:
|
||||
|
||||
list_of_substrings = decoded_line.split(separator[tb_idx].replace('\\t','\t'))
|
||||
|
||||
# Count occurrences of each substring
|
||||
substring_counts = collections.Counter(list_of_substrings)
|
||||
@ -109,9 +138,11 @@ def read_txt_files_as_dict(filename: str, instruments_dir: str = None, work_with
|
||||
|
||||
#print(line_number, len(column_names ),'\n')
|
||||
break
|
||||
else:
|
||||
print('Table header was not detected.')
|
||||
# Subdivide line into words, and join them by single space.
|
||||
# I asumme this can produce a cleaner line that contains no weird separator characters \t \r or extra spaces and so on.
|
||||
list_of_substrings = line.decode(file_encoding[tb_idx]).split()
|
||||
list_of_substrings = decoded_line.split()
|
||||
# TODO: ideally we should use a multilinear string but the yalm parser is not recognizing \n as special character
|
||||
#line = ' '.join(list_of_substrings+['\n'])
|
||||
#line = ' '.join(list_of_substrings)
|
||||
@ -119,8 +150,13 @@ def read_txt_files_as_dict(filename: str, instruments_dir: str = None, work_with
|
||||
|
||||
|
||||
# TODO: it does not work with separator as none :(. fix for RGA
|
||||
|
||||
try:
|
||||
print(column_names)
|
||||
if not 'infer' in table_header:
|
||||
#print(table_header)
|
||||
#print(file_encoding[tb_idx])
|
||||
|
||||
df = pd.read_csv(tmp_filename,
|
||||
delimiter = separator[tb_idx].replace('\\t','\t'),
|
||||
header=line_number,
|
||||
@ -249,7 +285,59 @@ def read_txt_files_as_dict(filename: str, instruments_dir: str = None, work_with
|
||||
# if timestamps_name in categorical_variables:
|
||||
# dataset['attributes'] = {timestamps_name: utils.parse_attribute({'unit':'YYYY-MM-DD HH:MM:SS.ffffff'})}
|
||||
# file_dict['datasets'].append(dataset)
|
||||
except:
|
||||
except Exception as e:
|
||||
print(e)
|
||||
return {}
|
||||
|
||||
return file_dict
|
||||
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
from src.hdf5_ops import save_file_dict_to_hdf5
|
||||
from utils.g5505_utils import created_at
|
||||
|
||||
# Set up argument parsing
|
||||
parser = argparse.ArgumentParser(description="Data ingestion process to HDF5 files.")
|
||||
parser.add_argument('dst_file_path', type=str, help="Path to the target HDF5 file.")
|
||||
parser.add_argument('src_file_path', type=str, help="Relative path to source file to be saved to target HDF5 file.")
|
||||
parser.add_argument('dst_group_name', type=str, help="Group name '/instFolder/[category]/fileName' in the target HDF5 file.")
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
hdf5_file_path = args.dst_file_path
|
||||
src_file_path = args.src_file_path
|
||||
dst_group_name = args.dst_group_name
|
||||
default_mode = 'r+'
|
||||
|
||||
try:
|
||||
# Read source file and return an internal dictionary representation
|
||||
idr_dict = read_txt_files_as_dict(src_file_path)
|
||||
|
||||
if not os.path.exists(hdf5_file_path):
|
||||
default_mode = 'w'
|
||||
|
||||
print(f'Opening HDF5 file: {hdf5_file_path} in mode {default_mode}')
|
||||
|
||||
with h5py.File(hdf5_file_path, mode=default_mode, track_order=True) as hdf5_file_obj:
|
||||
try:
|
||||
# Create group if it does not exist
|
||||
if dst_group_name not in hdf5_file_obj:
|
||||
hdf5_file_obj.create_group(dst_group_name)
|
||||
hdf5_file_obj[dst_group_name].attrs['creation_date'] = created_at().encode('utf-8')
|
||||
print(f'Created new group: {dst_group_name}')
|
||||
else:
|
||||
print(f'Group {dst_group_name} already exists. Proceeding with data transfer...')
|
||||
|
||||
except Exception as inst:
|
||||
logging.error('Failed to create group %s in HDF5: %s', dst_group_name, inst)
|
||||
|
||||
# Save dictionary to HDF5
|
||||
save_file_dict_to_hdf5(hdf5_file_obj, dst_group_name, idr_dict)
|
||||
print(f'Completed saving file dict with keys: {idr_dict.keys()}')
|
||||
|
||||
except Exception as e:
|
||||
logging.error('File reader failed to process %s: %s', src_file_path, e)
|
||||
print(f'File reader failed to process {src_file_path}. See logs for details.')
|
||||
|
||||
|
@ -1,5 +1,10 @@
|
||||
import os
|
||||
import sys
|
||||
import h5py
|
||||
|
||||
from igor2.binarywave import load as loadibw
|
||||
import logging
|
||||
import argparse
|
||||
|
||||
def read_xps_ibw_file_as_dict(filename):
|
||||
"""
|
||||
@ -77,3 +82,65 @@ def read_xps_ibw_file_as_dict(filename):
|
||||
|
||||
|
||||
return file_dict
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
|
||||
try:
|
||||
thisFilePath = os.path.abspath(__file__)
|
||||
except NameError:
|
||||
print("Error: __file__ is not available. Ensure the script is being run from a file.")
|
||||
print("[Notice] Path to DIMA package may not be resolved properly.")
|
||||
thisFilePath = os.getcwd() # Use current directory or specify a default
|
||||
|
||||
dimaPath = os.path.normpath(os.path.join(thisFilePath, "..",'..','..')) # Move up to project root
|
||||
|
||||
if dimaPath not in sys.path: # Avoid duplicate entries
|
||||
sys.path.insert(0,dimaPath)
|
||||
|
||||
from src.hdf5_ops import save_file_dict_to_hdf5
|
||||
from utils.g5505_utils import created_at
|
||||
|
||||
# Set up argument parsing
|
||||
parser = argparse.ArgumentParser(description="Data ingestion process to HDF5 files.")
|
||||
parser.add_argument('dst_file_path', type=str, help="Path to the target HDF5 file.")
|
||||
parser.add_argument('src_file_path', type=str, help="Relative path to source file to be saved to target HDF5 file.")
|
||||
parser.add_argument('dst_group_name', type=str, help="Group name '/instFolder/[category]/fileName' in the target HDF5 file.")
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
hdf5_file_path = args.dst_file_path
|
||||
src_file_path = args.src_file_path
|
||||
dst_group_name = args.dst_group_name
|
||||
default_mode = 'r+'
|
||||
|
||||
try:
|
||||
# Read source file and return an internal dictionary representation
|
||||
idr_dict = read_xps_ibw_file_as_dict(src_file_path)
|
||||
|
||||
if not os.path.exists(hdf5_file_path):
|
||||
default_mode = 'w'
|
||||
|
||||
print(f'Opening HDF5 file: {hdf5_file_path} in mode {default_mode}')
|
||||
|
||||
with h5py.File(hdf5_file_path, mode=default_mode, track_order=True) as hdf5_file_obj:
|
||||
try:
|
||||
# Create group if it does not exist
|
||||
if dst_group_name not in hdf5_file_obj:
|
||||
hdf5_file_obj.create_group(dst_group_name)
|
||||
hdf5_file_obj[dst_group_name].attrs['creation_date'] = created_at().encode('utf-8')
|
||||
print(f'Created new group: {dst_group_name}')
|
||||
else:
|
||||
print(f'Group {dst_group_name} already exists. Proceeding with data transfer...')
|
||||
|
||||
except Exception as inst:
|
||||
logging.error('Failed to create group %s in HDF5: %s', dst_group_name, inst)
|
||||
|
||||
# Save dictionary to HDF5
|
||||
save_file_dict_to_hdf5(hdf5_file_obj, dst_group_name, idr_dict)
|
||||
print(f'Completed saving file dict with keys: {idr_dict.keys()}')
|
||||
|
||||
except Exception as e:
|
||||
logging.error('File reader failed to process %s: %s', src_file_path, e)
|
||||
print(f'File reader failed to process {src_file_path}. See logs for details.')
|
||||
|
||||
|
75
instruments/registry.yaml
Normal file
75
instruments/registry.yaml
Normal file
@ -0,0 +1,75 @@
|
||||
instruments:
|
||||
- instrumentFolderName: default
|
||||
fileExtension: csv
|
||||
fileReaderPath: instruments/readers/g5505_text_reader.py
|
||||
InstrumentDictionaryPath: null
|
||||
|
||||
- instrumentFolderName: NEXAFS
|
||||
fileExtension: h5
|
||||
fileReaderPath: null
|
||||
InstrumentDictionaryPath: null
|
||||
|
||||
- instrumentFolderName: SES
|
||||
fileExtension: ibw
|
||||
fileReaderPath: instruments/readers/xps_ibw_reader.py
|
||||
InstrumentDictionaryPath: null
|
||||
|
||||
- instrumentFolderName: RGA
|
||||
fileExtension: txt
|
||||
fileReaderPath: instruments/readers/g5505_text_reader.py
|
||||
InstrumentDictionaryPath: instruments/dictionaries/RGA.yaml
|
||||
|
||||
- instrumentFolderName: Pressure
|
||||
fileExtension: dat
|
||||
fileReaderPath: instruments/readers/g5505_text_reader.py
|
||||
InstrumentDictionaryPath: instruments/dictionaries/Pressure.yaml
|
||||
|
||||
- instrumentFolderName: Humidity_Sensors
|
||||
fileExtension: dat
|
||||
fileReaderPath: instruments/readers/g5505_text_reader.py
|
||||
InstrumentDictionaryPath: instruments/dictionaries/Humidity_Sensors.yaml
|
||||
|
||||
- instrumentFolderName: ICAD
|
||||
fileExtension: dat
|
||||
fileReaderPath: instruments/readers/g5505_text_reader.py
|
||||
InstrumentDictionaryPath: instruments/dictionaries/ICAD.yaml
|
||||
|
||||
- instrumentFolderName: Lopap
|
||||
fileExtension: dat
|
||||
fileReaderPath: instruments/readers/g5505_text_reader.py
|
||||
InstrumentDictionaryPath: instruments/dictionaries/Lopap.yaml
|
||||
|
||||
- instrumentFolderName: T200_NOx
|
||||
fileExtension: dat
|
||||
fileReaderPath: instruments/readers/g5505_text_reader.py
|
||||
InstrumentDictionaryPath: instruments/dictionaries/T200_NOx.yaml
|
||||
|
||||
- instrumentFolderName: T360U_CO2
|
||||
fileExtension: dat
|
||||
fileReaderPath: instruments/readers/g5505_text_reader.py
|
||||
InstrumentDictionaryPath: instruments/dictionaries/T360U_CO2.yaml
|
||||
|
||||
- instrumentFolderName: htof
|
||||
fileExtension: h5
|
||||
fileReaderPath: null
|
||||
InstrumentDictionaryPath: null
|
||||
|
||||
- instrumentFolderName: smps
|
||||
fileExtension: txt
|
||||
fileReaderPath: instruments/readers/g5505_text_reader.py
|
||||
InstrumentDictionaryPath: instruments/dictionaries/smps.yaml
|
||||
|
||||
- instrumentFolderName: gas
|
||||
fileExtension: txt
|
||||
fileReaderPath: instruments/readers/g5505_text_reader.py
|
||||
InstrumentDictionaryPath: instruments/dictionaries/gas.yaml
|
||||
|
||||
- instrumentFolderName: ACSM_TOFWARE
|
||||
fileExtension: txt
|
||||
fileReaderPath: instruments/readers/acsm_tofware_reader.py
|
||||
InstrumentDictionaryPath: instruments/dictionaries/ACSM_TOFWARE.yaml
|
||||
|
||||
- instrumentFolderName: ACSM_TOFWARE
|
||||
fileExtension: csv
|
||||
fileReaderPath: instruments/readers/acsm_tofware_reader.py
|
||||
InstrumentDictionaryPath: instruments/dictionaries/ACSM_TOFWARE.yaml
|
Reference in New Issue
Block a user