Compare commits
4 Commits
edabcb57f8
...
7b8a266057
Author | SHA1 | Date | |
---|---|---|---|
7b8a266057 | |||
98ce166e2a | |||
33d1f20d38 | |||
63e32403ba |
@ -2,7 +2,7 @@
|
||||
input_file_directory: '//fs03/Iron_Sulphate'
|
||||
|
||||
# Path to directory where raw data is copied and converted to HDF5 format for local analysis.
|
||||
output_file_directory: 'output_files/'
|
||||
output_file_directory: '../output_files/'
|
||||
|
||||
# Project metadata for data lineage and provenance
|
||||
project: 'Fe SOA project'
|
||||
|
@ -2,7 +2,7 @@
|
||||
input_file_directory: '//fs101/5505/People/Juan/TypicalBeamTime'
|
||||
|
||||
# Path to directory where raw data is copied and converted to HDF5 format for local analysis.
|
||||
output_file_directory: 'output_files/'
|
||||
output_file_directory: '../output_files/'
|
||||
|
||||
# Project metadata for data lineage and provenance
|
||||
project: 'Beamtime May 2024, Ice Napp'
|
||||
|
@ -16,8 +16,9 @@ from instruments.readers.g5505_text_reader import read_txt_files_as_dict
|
||||
from instruments.readers.acsm_tofware_reader import read_acsm_files_as_dict
|
||||
from instruments.readers.acsm_flag_reader import read_jsonflag_as_dict
|
||||
from instruments.readers.nasa_ames_reader import read_nasa_ames_as_dict
|
||||
from instruments.readers.structured_file_reader import read_structured_file_as_dict
|
||||
|
||||
file_extensions = ['.ibw','.txt','.dat','.h5','.TXT','.csv','.pkl','.json','.yaml','.nas']
|
||||
file_extensions = ['.ibw','.txt','.dat','.h5','.TXT','.csv','.pkl','.json','.yaml','yml','.nas']
|
||||
|
||||
# Define the instruments directory (modify this as needed or set to None)
|
||||
default_instruments_dir = None # or provide an absolute path
|
||||
@ -27,6 +28,9 @@ file_readers = {
|
||||
'txt': lambda a1: read_txt_files_as_dict(a1, instruments_dir=default_instruments_dir, work_with_copy=False),
|
||||
'dat': lambda a1: read_txt_files_as_dict(a1, instruments_dir=default_instruments_dir, work_with_copy=False),
|
||||
'csv': lambda a1: read_txt_files_as_dict(a1, instruments_dir=default_instruments_dir, work_with_copy=False),
|
||||
'yaml': lambda a1: read_structured_file_as_dict(a1),
|
||||
'yml': lambda a1: read_structured_file_as_dict(a1),
|
||||
'json': lambda a1: read_structured_file_as_dict(a1),
|
||||
'ACSM_TOFWARE_txt' : lambda x: read_acsm_files_as_dict(x, instruments_dir=default_instruments_dir, work_with_copy=False),
|
||||
'ACSM_TOFWARE_csv' : lambda x: read_acsm_files_as_dict(x, instruments_dir=default_instruments_dir, work_with_copy=False),
|
||||
'ACSM_TOFWARE_flags_json' : lambda x: read_jsonflag_as_dict(x),
|
||||
@ -52,7 +56,7 @@ def find_reader(instrument_folder, file_extension):
|
||||
registry = load_registry()
|
||||
|
||||
for entry in registry:
|
||||
if entry["instrumentFolderName"] == instrument_folder and entry["fileExtension"] == file_extension:
|
||||
if entry["instrumentFolderName"] == instrument_folder and (file_extension in entry["fileExtension"].split(sep=',')):
|
||||
return entry["fileReaderPath"], entry["InstrumentDictionaryPath"]
|
||||
|
||||
return None, None # Not found
|
||||
|
115
instruments/readers/structured_file_reader.py
Normal file
115
instruments/readers/structured_file_reader.py
Normal file
@ -0,0 +1,115 @@
|
||||
import sys
|
||||
import os
|
||||
|
||||
try:
|
||||
thisFilePath = os.path.abspath(__file__)
|
||||
except NameError:
|
||||
print("Error: __file__ is not available. Ensure the script is being run from a file.")
|
||||
print("[Notice] Path to DIMA package may not be resolved properly.")
|
||||
thisFilePath = os.getcwd() # Use current directory or specify a default
|
||||
|
||||
dimaPath = os.path.normpath(os.path.join(thisFilePath, "..",'..','..')) # Move up to project root
|
||||
|
||||
if dimaPath not in sys.path: # Avoid duplicate entries
|
||||
sys.path.insert(0,dimaPath)
|
||||
|
||||
import pandas as pd
|
||||
import json, yaml
|
||||
import h5py
|
||||
import argparse
|
||||
import logging
|
||||
|
||||
import utils.g5505_utils as utils
|
||||
|
||||
def read_structured_file_as_dict(path_to_file):
|
||||
"""
|
||||
Reads a JSON or YAML file, flattens nested structures using pandas.json_normalize,
|
||||
converts to a NumPy structured array via utils.convert_attrdict_to_np_structured_array,
|
||||
and returns a standardized dictionary.
|
||||
"""
|
||||
|
||||
file_dict = {}
|
||||
_, path_head = os.path.split(path_to_file)
|
||||
|
||||
file_dict['name'] = path_head
|
||||
file_dict['attributes_dict'] = {}
|
||||
file_dict['datasets'] = []
|
||||
|
||||
try:
|
||||
with open(path_to_file, 'r') as stream:
|
||||
if path_to_file.endswith(('.yaml', '.yml')):
|
||||
raw_data = yaml.safe_load(stream)
|
||||
elif path_to_file.endswith('.json'):
|
||||
raw_data = json.load(stream)
|
||||
else:
|
||||
raise ValueError(f"Unsupported file type: {path_to_file}")
|
||||
except Exception as exc:
|
||||
logging.error("Failed to load input file %s: %s", path_to_file, exc)
|
||||
raise
|
||||
|
||||
try:
|
||||
df = pd.json_normalize(raw_data)
|
||||
except Exception as exc:
|
||||
logging.error("Failed to normalize data structure: %s", exc)
|
||||
raise
|
||||
|
||||
for item_idx, item in enumerate(df.to_dict(orient='records')):
|
||||
try:
|
||||
structured_array = utils.convert_attrdict_to_np_structured_array(item)
|
||||
except Exception as exc:
|
||||
logging.error("Failed to convert to structured array: %s", exc)
|
||||
raise
|
||||
|
||||
dataset = {
|
||||
'name': f'data_table_{item_idx}',
|
||||
'data': structured_array,
|
||||
'shape': structured_array.shape,
|
||||
'dtype': type(structured_array)
|
||||
}
|
||||
|
||||
file_dict['datasets'].append(dataset)
|
||||
|
||||
return file_dict
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
from src.hdf5_ops import save_file_dict_to_hdf5
|
||||
from utils.g5505_utils import created_at
|
||||
|
||||
parser = argparse.ArgumentParser(description="Data ingestion process to HDF5 files.")
|
||||
parser.add_argument('dst_file_path', type=str, help="Path to the target HDF5 file.")
|
||||
parser.add_argument('src_file_path', type=str, help="Relative path to source file to be saved to target HDF5 file.")
|
||||
parser.add_argument('dst_group_name', type=str, help="Group name '/instFolder/[category]/fileName' in the target HDF5 file.")
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
hdf5_file_path = args.dst_file_path
|
||||
src_file_path = args.src_file_path
|
||||
dst_group_name = args.dst_group_name
|
||||
default_mode = 'r+'
|
||||
|
||||
try:
|
||||
idr_dict = read_structured_file_as_dict(src_file_path)
|
||||
|
||||
if not os.path.exists(hdf5_file_path):
|
||||
default_mode = 'w'
|
||||
|
||||
print(f'Opening HDF5 file: {hdf5_file_path} in mode {default_mode}')
|
||||
|
||||
with h5py.File(hdf5_file_path, mode=default_mode, track_order=True) as hdf5_file_obj:
|
||||
try:
|
||||
if dst_group_name not in hdf5_file_obj:
|
||||
hdf5_file_obj.create_group(dst_group_name)
|
||||
hdf5_file_obj[dst_group_name].attrs['creation_date'] = created_at().encode('utf-8')
|
||||
print(f'Created new group: {dst_group_name}')
|
||||
else:
|
||||
print(f'Group {dst_group_name} already exists. Proceeding with data transfer...')
|
||||
except Exception as inst:
|
||||
logging.error('Failed to create group %s in HDF5: %s', dst_group_name, inst)
|
||||
|
||||
save_file_dict_to_hdf5(hdf5_file_obj, dst_group_name, idr_dict)
|
||||
print(f'Completed saving file dict with keys: {idr_dict.keys()}')
|
||||
|
||||
except Exception as e:
|
||||
logging.error('File reader failed to process %s: %s', src_file_path, e)
|
||||
print(f'File reader failed to process {src_file_path}. See logs for details.')
|
@ -78,3 +78,8 @@ instruments:
|
||||
fileExtension: nas
|
||||
fileReaderPath: instruments/readers/nasa_ames_reader.py
|
||||
InstrumentDictionaryPath: instruments/dictionaries/EBAS.yaml
|
||||
|
||||
- instrumentFolderName: ACSM_TOFWARE
|
||||
fileExtension: yaml,yml,json
|
||||
fileReaderPath: instruments/readers/read_structured_file_as_dict.py
|
||||
InstrumentDictionaryPath: instruments/dictionaries/EBAS.yaml
|
||||
|
@ -100,6 +100,20 @@ def create_hdf5_file_from_filesystem_path(path_to_input_directory: str,
|
||||
print(message)
|
||||
logging.error(message)
|
||||
else:
|
||||
# Step 1: Preprocess all metadata.json files into a lookup dict
|
||||
all_metadata_dict = {}
|
||||
|
||||
for dirpath, filenames in path_to_filenames_dict.items():
|
||||
metadata_file = next((f for f in filenames if f.endswith('metadata.json')), None)
|
||||
if metadata_file:
|
||||
metadata_path = os.path.join(dirpath, metadata_file)
|
||||
try:
|
||||
with open(metadata_path, 'r') as metafile:
|
||||
all_metadata_dict[dirpath] = json.load(metafile)
|
||||
except json.JSONDecodeError:
|
||||
logging.warning(f"Invalid JSON in metadata file: {metadata_path}")
|
||||
all_metadata_dict[dirpath] = {}
|
||||
|
||||
with h5py.File(path_to_output_file, mode=mode, track_order=True) as h5file:
|
||||
|
||||
number_of_dirs = len(path_to_filenames_dict.keys())
|
||||
@ -138,21 +152,14 @@ def create_hdf5_file_from_filesystem_path(path_to_input_directory: str,
|
||||
stdout = inst
|
||||
logging.error('Failed to create group %s into HDF5: %s', group_name, inst)
|
||||
|
||||
if 'data_lineage_metadata.json' in filtered_filenames_list:
|
||||
idx = filtered_filenames_list.index('data_lineage_metadata.json')
|
||||
data_lineage_file = filtered_filenames_list[idx]
|
||||
try:
|
||||
with open('/'.join([dirpath,data_lineage_file]),'r') as dlf:
|
||||
data_lineage_dict = json.load(dlf)
|
||||
filtered_filenames_list.pop(idx)
|
||||
except json.JSONDecodeError:
|
||||
data_lineage_dict = {} # Start fresh if file is invalid
|
||||
|
||||
else:
|
||||
data_lineage_dict = {}
|
||||
|
||||
# Step 3: During ingestion, attach metadata per file
|
||||
metadata_dict = all_metadata_dict.get(dirpath, {})
|
||||
|
||||
for filenumber, filename in enumerate(filtered_filenames_list):
|
||||
|
||||
# Skip any file that itself ends in metadata.json
|
||||
if filename.endswith('metadata.json'):
|
||||
continue
|
||||
|
||||
# hdf5 path to filename group
|
||||
dest_group_name = f'{group_name}/{filename}'
|
||||
@ -163,6 +170,10 @@ def create_hdf5_file_from_filesystem_path(path_to_input_directory: str,
|
||||
#file_dict = ext_to_reader_dict[file_ext](os.path.join(dirpath,filename))
|
||||
file_dict = filereader_registry.select_file_reader(dest_group_name)(source_file_path)
|
||||
|
||||
# Attach per-file metadata if available
|
||||
if filename in metadata_dict:
|
||||
file_dict.get("attributes_dict",{}).update(metadata_dict[filename])
|
||||
|
||||
stdout = hdf5_ops.save_file_dict_to_hdf5(dest_file_obj, group_name, file_dict)
|
||||
|
||||
else:
|
||||
@ -270,6 +281,21 @@ def create_hdf5_file_from_filesystem_path_new(path_to_input_directory: str,
|
||||
print(message)
|
||||
logging.error(message)
|
||||
else:
|
||||
|
||||
# Step 1: Preprocess all metadata.json files into a lookup dict
|
||||
all_metadata_dict = {}
|
||||
|
||||
for dirpath, filenames in path_to_filenames_dict.items():
|
||||
metadata_file = next((f for f in filenames if f.endswith('metadata.json')), None)
|
||||
if metadata_file:
|
||||
metadata_path = os.path.join(dirpath, metadata_file)
|
||||
try:
|
||||
with open(metadata_path, 'r') as metafile:
|
||||
all_metadata_dict[dirpath] = json.load(metafile)
|
||||
except json.JSONDecodeError:
|
||||
logging.warning(f"Invalid JSON in metadata file: {metadata_path}")
|
||||
all_metadata_dict[dirpath] = {}
|
||||
|
||||
with h5py.File(path_to_output_file, mode=mode, track_order=True) as h5file:
|
||||
print('Created file')
|
||||
|
||||
@ -309,7 +335,14 @@ def create_hdf5_file_from_filesystem_path_new(path_to_input_directory: str,
|
||||
# stdout = inst
|
||||
# logging.error('Failed to create group %s into HDF5: %s', group_name, inst)
|
||||
|
||||
# Step 3: During ingestion, attach metadata per file
|
||||
# TODO: pass this metadata fict to run_file_reader line 363
|
||||
metadata_dict = all_metadata_dict.get(dirpath, {})
|
||||
|
||||
for filenumber, filename in enumerate(filtered_filenames_list):
|
||||
|
||||
if filename.endswith('metadata.json'):
|
||||
continue
|
||||
|
||||
#file_ext = os.path.splitext(filename)[1]
|
||||
#try:
|
||||
|
Reference in New Issue
Block a user