Compare commits

...

4 Commits

6 changed files with 174 additions and 17 deletions

View File

@@ -2,7 +2,7 @@
input_file_directory: '//fs03/Iron_Sulphate'
# Path to directory where raw data is copied and converted to HDF5 format for local analysis.
output_file_directory: 'output_files/'
output_file_directory: '../output_files/'
# Project metadata for data lineage and provenance
project: 'Fe SOA project'

View File

@@ -2,7 +2,7 @@
input_file_directory: '//fs101/5505/People/Juan/TypicalBeamTime'
# Path to directory where raw data is copied and converted to HDF5 format for local analysis.
output_file_directory: 'output_files/'
output_file_directory: '../output_files/'
# Project metadata for data lineage and provenance
project: 'Beamtime May 2024, Ice Napp'

View File

@@ -16,8 +16,9 @@ from instruments.readers.g5505_text_reader import read_txt_files_as_dict
from instruments.readers.acsm_tofware_reader import read_acsm_files_as_dict
from instruments.readers.acsm_flag_reader import read_jsonflag_as_dict
from instruments.readers.nasa_ames_reader import read_nasa_ames_as_dict
from instruments.readers.structured_file_reader import read_structured_file_as_dict
file_extensions = ['.ibw','.txt','.dat','.h5','.TXT','.csv','.pkl','.json','.yaml','.nas']
file_extensions = ['.ibw','.txt','.dat','.h5','.TXT','.csv','.pkl','.json','.yaml','.yml','.nas']
# Define the instruments directory (modify this as needed or set to None)
default_instruments_dir = None # or provide an absolute path
@@ -27,6 +28,9 @@ file_readers = {
    'txt': lambda a1: read_txt_files_as_dict(a1, instruments_dir=default_instruments_dir, work_with_copy=False),
    'dat': lambda a1: read_txt_files_as_dict(a1, instruments_dir=default_instruments_dir, work_with_copy=False),
    'csv': lambda a1: read_txt_files_as_dict(a1, instruments_dir=default_instruments_dir, work_with_copy=False),
    'yaml': lambda a1: read_structured_file_as_dict(a1),
    'yml': lambda a1: read_structured_file_as_dict(a1),
    'json': lambda a1: read_structured_file_as_dict(a1),
    'ACSM_TOFWARE_txt' : lambda x: read_acsm_files_as_dict(x, instruments_dir=default_instruments_dir, work_with_copy=False),
    'ACSM_TOFWARE_csv' : lambda x: read_acsm_files_as_dict(x, instruments_dir=default_instruments_dir, work_with_copy=False),
    'ACSM_TOFWARE_flags_json' : lambda x: read_jsonflag_as_dict(x),
@@ -52,7 +56,7 @@ def find_reader(instrument_folder, file_extension):
    registry = load_registry()
    for entry in registry:
        if entry["instrumentFolderName"] == instrument_folder and entry["fileExtension"] == file_extension:
        if entry["instrumentFolderName"] == instrument_folder and (file_extension in entry["fileExtension"].split(sep=',')):
            return entry["fileReaderPath"], entry["InstrumentDictionaryPath"]
    return None, None # Not found
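
Note: with this change, a registry entry's fileExtension field may hold a comma-separated list, and find_reader matches any single extension contained in that list. A minimal sketch of the expected matching behaviour, using the ACSM_TOFWARE entry added further below (the entry dict here is only illustrative, not part of the change set):

entry = {
    "instrumentFolderName": "ACSM_TOFWARE",
    "fileExtension": "yaml,yml,json",
    "fileReaderPath": "instruments/readers/read_structured_file_as_dict.py",
    "InstrumentDictionaryPath": "instruments/dictionaries/EBAS.yaml",
}

# The updated condition accepts any extension contained in the comma-separated field
assert "yml" in entry["fileExtension"].split(sep=',')
assert "nas" not in entry["fileExtension"].split(sep=',')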

View File

@@ -0,0 +1,115 @@
import sys
import os
try:
    thisFilePath = os.path.abspath(__file__)
except NameError:
    print("Error: __file__ is not available. Ensure the script is being run from a file.")
    print("[Notice] Path to DIMA package may not be resolved properly.")
    thisFilePath = os.getcwd() # Use current directory or specify a default

dimaPath = os.path.normpath(os.path.join(thisFilePath, "..",'..','..')) # Move up to project root

if dimaPath not in sys.path: # Avoid duplicate entries
    sys.path.insert(0,dimaPath)

import pandas as pd
import json, yaml
import h5py
import argparse
import logging

import utils.g5505_utils as utils

def read_structured_file_as_dict(path_to_file):
    """
    Reads a JSON or YAML file, flattens nested structures using pandas.json_normalize,
    converts to a NumPy structured array via utils.convert_attrdict_to_np_structured_array,
    and returns a standardized dictionary.
    """
    file_dict = {}
    _, path_head = os.path.split(path_to_file)

    file_dict['name'] = path_head
    file_dict['attributes_dict'] = {}
    file_dict['datasets'] = []

    try:
        with open(path_to_file, 'r') as stream:
            if path_to_file.endswith(('.yaml', '.yml')):
                raw_data = yaml.safe_load(stream)
            elif path_to_file.endswith('.json'):
                raw_data = json.load(stream)
            else:
                raise ValueError(f"Unsupported file type: {path_to_file}")
    except Exception as exc:
        logging.error("Failed to load input file %s: %s", path_to_file, exc)
        raise

    try:
        df = pd.json_normalize(raw_data)
    except Exception as exc:
        logging.error("Failed to normalize data structure: %s", exc)
        raise

    for item_idx, item in enumerate(df.to_dict(orient='records')):
        try:
            structured_array = utils.convert_attrdict_to_np_structured_array(item)
        except Exception as exc:
            logging.error("Failed to convert to structured array: %s", exc)
            raise

        dataset = {
            'name': f'data_table_{item_idx}',
            'data': structured_array,
            'shape': structured_array.shape,
            'dtype': type(structured_array)
        }
        file_dict['datasets'].append(dataset)

    return file_dict

if __name__ == "__main__":

    from src.hdf5_ops import save_file_dict_to_hdf5
    from utils.g5505_utils import created_at

    parser = argparse.ArgumentParser(description="Data ingestion process to HDF5 files.")
    parser.add_argument('dst_file_path', type=str, help="Path to the target HDF5 file.")
    parser.add_argument('src_file_path', type=str, help="Relative path to source file to be saved to target HDF5 file.")
    parser.add_argument('dst_group_name', type=str, help="Group name '/instFolder/[category]/fileName' in the target HDF5 file.")
    args = parser.parse_args()

    hdf5_file_path = args.dst_file_path
    src_file_path = args.src_file_path
    dst_group_name = args.dst_group_name
    default_mode = 'r+'

    try:
        idr_dict = read_structured_file_as_dict(src_file_path)

        if not os.path.exists(hdf5_file_path):
            default_mode = 'w'

        print(f'Opening HDF5 file: {hdf5_file_path} in mode {default_mode}')

        with h5py.File(hdf5_file_path, mode=default_mode, track_order=True) as hdf5_file_obj:
            try:
                if dst_group_name not in hdf5_file_obj:
                    hdf5_file_obj.create_group(dst_group_name)
                    hdf5_file_obj[dst_group_name].attrs['creation_date'] = created_at().encode('utf-8')
                    print(f'Created new group: {dst_group_name}')
                else:
                    print(f'Group {dst_group_name} already exists. Proceeding with data transfer...')
            except Exception as inst:
                logging.error('Failed to create group %s in HDF5: %s', dst_group_name, inst)

            save_file_dict_to_hdf5(hdf5_file_obj, dst_group_name, idr_dict)
            print(f'Completed saving file dict with keys: {idr_dict.keys()}')

    except Exception as e:
        logging.error('File reader failed to process %s: %s', src_file_path, e)
        print(f'File reader failed to process {src_file_path}. See logs for details.')
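
A short usage sketch for the new reader, assuming the module path used in the filereader_registry import above; the example file name and its contents are made up for illustration:

import yaml
from instruments.readers.structured_file_reader import read_structured_file_as_dict

# Write a tiny YAML file, then read it back as the standardized dict
with open('example_params.yaml', 'w') as f:
    yaml.safe_dump({'project': 'Fe SOA project', 'output_file_directory': '../output_files/'}, f)

file_dict = read_structured_file_as_dict('example_params.yaml')
print(file_dict['name'])                 # example_params.yaml
print(file_dict['datasets'][0]['name'])  # data_table_0

When invoked as a script, the module instead expects dst_file_path, src_file_path and dst_group_name arguments and appends the resulting dict to the target HDF5 file.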

View File

@@ -78,3 +78,8 @@ instruments:
    fileExtension: nas
    fileReaderPath: instruments/readers/nasa_ames_reader.py
    InstrumentDictionaryPath: instruments/dictionaries/EBAS.yaml
  - instrumentFolderName: ACSM_TOFWARE
    fileExtension: yaml,yml,json
    fileReaderPath: instruments/readers/read_structured_file_as_dict.py
    InstrumentDictionaryPath: instruments/dictionaries/EBAS.yaml

View File

@@ -100,6 +100,20 @@ def create_hdf5_file_from_filesystem_path(path_to_input_directory: str,
        print(message)
        logging.error(message)
    else:

        # Step 1: Preprocess all metadata.json files into a lookup dict
        all_metadata_dict = {}
        for dirpath, filenames in path_to_filenames_dict.items():
            metadata_file = next((f for f in filenames if f.endswith('metadata.json')), None)
            if metadata_file:
                metadata_path = os.path.join(dirpath, metadata_file)
                try:
                    with open(metadata_path, 'r') as metafile:
                        all_metadata_dict[dirpath] = json.load(metafile)
                except json.JSONDecodeError:
                    logging.warning(f"Invalid JSON in metadata file: {metadata_path}")
                    all_metadata_dict[dirpath] = {}

        with h5py.File(path_to_output_file, mode=mode, track_order=True) as h5file:

            number_of_dirs = len(path_to_filenames_dict.keys())
@@ -138,21 +152,14 @@
                    stdout = inst
                    logging.error('Failed to create group %s into HDF5: %s', group_name, inst)

                if 'data_lineage_metadata.json' in filtered_filenames_list:
                    idx = filtered_filenames_list.index('data_lineage_metadata.json')
                    data_lineage_file = filtered_filenames_list[idx]
                    try:
                        with open('/'.join([dirpath,data_lineage_file]),'r') as dlf:
                            data_lineage_dict = json.load(dlf)
                        filtered_filenames_list.pop(idx)
                    except json.JSONDecodeError:
                        data_lineage_dict = {} # Start fresh if file is invalid
                else:
                    data_lineage_dict = {}

                # Step 3: During ingestion, attach metadata per file
                metadata_dict = all_metadata_dict.get(dirpath, {})

                for filenumber, filename in enumerate(filtered_filenames_list):

                    # Skip any file that itself ends in metadata.json
                    if filename.endswith('metadata.json'):
                        continue

                    # hdf5 path to filename group
                    dest_group_name = f'{group_name}/{filename}'
@@ -163,6 +170,10 @@ def create_hdf5_file_from_filesystem_path(path_to_input_directory: str,
                        #file_dict = ext_to_reader_dict[file_ext](os.path.join(dirpath,filename))
                        file_dict = filereader_registry.select_file_reader(dest_group_name)(source_file_path)

                        # Attach per-file metadata if available
                        if filename in metadata_dict:
                            file_dict.get("attributes_dict",{}).update(metadata_dict[filename])

                        stdout = hdf5_ops.save_file_dict_to_hdf5(dest_file_obj, group_name, file_dict)
                    else:
@@ -270,6 +281,21 @@ def create_hdf5_file_from_filesystem_path_new(path_to_input_directory: str,
        print(message)
        logging.error(message)
    else:

        # Step 1: Preprocess all metadata.json files into a lookup dict
        all_metadata_dict = {}
        for dirpath, filenames in path_to_filenames_dict.items():
            metadata_file = next((f for f in filenames if f.endswith('metadata.json')), None)
            if metadata_file:
                metadata_path = os.path.join(dirpath, metadata_file)
                try:
                    with open(metadata_path, 'r') as metafile:
                        all_metadata_dict[dirpath] = json.load(metafile)
                except json.JSONDecodeError:
                    logging.warning(f"Invalid JSON in metadata file: {metadata_path}")
                    all_metadata_dict[dirpath] = {}

        with h5py.File(path_to_output_file, mode=mode, track_order=True) as h5file:

            print('Created file')
@@ -309,7 +335,14 @@
                # stdout = inst
                # logging.error('Failed to create group %s into HDF5: %s', group_name, inst)

                # Step 3: During ingestion, attach metadata per file
                # TODO: pass this metadata dict to run_file_reader line 363
                metadata_dict = all_metadata_dict.get(dirpath, {})

                for filenumber, filename in enumerate(filtered_filenames_list):

                    if filename.endswith('metadata.json'):
                        continue

                    #file_ext = os.path.splitext(filename)[1]
                    #try:
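
For reference, the metadata preprocessing above assumes each directory may contain a file ending in metadata.json, structured as a JSON object keyed by file name; matching entries are merged into that file's attributes_dict during ingestion. A minimal sketch of that merge step (the example file name and attribute keys are illustrative only):

# Illustrative contents of <dirpath>/metadata.json after json.load
metadata_dict = {
    "example_scan.txt": {"project": "Fe SOA project", "operator": "Juan"},
}

# Reader output for one file, before metadata is attached
file_dict = {"name": "example_scan.txt", "attributes_dict": {}, "datasets": []}
filename = "example_scan.txt"

# Mirrors the per-file attach step: merge directory-level metadata into the reader output
if filename in metadata_dict:
    file_dict.get("attributes_dict", {}).update(metadata_dict[filename])

print(file_dict["attributes_dict"])  # {'project': 'Fe SOA project', 'operator': 'Juan'}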