import sys
import os

try:
    thisFilePath = os.path.abspath(__file__)
except NameError:
    print("Error: __file__ is not available. Ensure the script is being run from a file.")
    print("[Notice] Path to DIMA package may not be resolved properly.")
    thisFilePath = os.getcwd()  # Use current directory or specify a default

dimaPath = os.path.normpath(os.path.join(thisFilePath, "..", "..", ".."))  # Move up to project root

if dimaPath not in sys.path:  # Avoid duplicate entries
    sys.path.insert(0, dimaPath)
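
# Adding the project root to sys.path lets the intra-package imports below
# (utils.g5505_utils and, in the __main__ block, src.hdf5_ops) resolve when this
# script is run directly rather than through the DIMA package.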

import logging
import argparse

import h5py
from igor2.binarywave import load as loadibw

import utils.g5505_utils as utils


def read_xps_ibw_file_as_dict(filename):
    """
    Read an IBW file from the Multiphase Chemistry Group, containing an XPS spectrum
    and its acquisition settings, and format the content as a dictionary with the keys
    'name', 'attributes_dict', and 'datasets'. Each dataset in the 'datasets' list has
    the following structure:

    {
        'name': 'name',
        'data': data_array,
        'data_units': 'units',
        'shape': data_shape,
        'dtype': data_type
    }

    Parameters
    ----------
    filename : str
        The IBW filename from the Multiphase Chemistry Group beamline.

    Returns
    -------
    file_dict : dict
        A dictionary containing the group name, group attributes, and the datasets
        extracted from the IBW file.

    Raises
    ------
    ValueError
        If the input file does not contain the wave entries required of an XPS IBW file.
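
    Examples
    --------
    Illustrative sketch only; the file name below is hypothetical.

    >>> file_dict = read_xps_ibw_file_as_dict('xps_scan.ibw')
    >>> file_dict['datasets'][0]['name']
    'spectrum'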
    """

    file_obj = loadibw(filename)
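    # loadibw returns a nested dictionary; its 'wave' entry holds the raw data
    # array ('wData') together with the acquisition metadata used below.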

    required_keys = ['wData', 'data_units', 'dimension_units', 'note']
    if not all(key in file_obj['wave'] for key in required_keys):
        raise ValueError('This is not a valid XPS IBW file. It does not satisfy the minimum admissibility criteria.')

    file_dict = {}
    path_tail, path_head = os.path.split(filename)

    # Group name and attributes
    file_dict['name'] = path_head
    file_dict['attributes_dict'] = {'actris_level': 0,
                                    'processing_date': utils.created_at(),
                                    'processing_script': os.path.relpath(thisFilePath, dimaPath)}

    # Convert the note from bytes to str and split it into a list of elements separated by '\r'.
    notes_list = file_obj['wave']['note'].decode("utf-8").split('\r')
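    # Each element is expected to be a 'key=value' acquisition setting,
    # e.g. 'Excitation Energy=400' (value illustrative).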
    exclude_list = ['Excitation Energy']
    for item in notes_list:
        if '=' in item:
            # Split only on the first '=' so that values containing '=' do not break unpacking.
            key, value = item.split('=', 1)
            # TODO: check if value can be converted into a numeric type. Now all values are string type
            if key not in exclude_list:
                file_dict['attributes_dict'][key] = value

    # TODO: talk to Thorsten to see if there is an easier way to access the below attributes
    # Split on ']' and re-append it to recover the bracketed label of each dimension.
    dimension_labels = file_obj['wave']['dimension_units'].decode("utf-8").split(']')
    file_dict['attributes_dict']['dimension_units'] = [item + ']' for item in dimension_labels[:-1]]

    # Datasets and their attributes
    file_dict['datasets'] = []

    dataset = {}
    dataset['name'] = 'spectrum'
    dataset['data'] = file_obj['wave']['wData']
    dataset['data_units'] = file_obj['wave']['data_units']
    dataset['shape'] = dataset['data'].shape
    dataset['dtype'] = type(dataset['data'])
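    # Note: 'dtype' records the Python container type of the data array; the numeric
    # dtype, if needed downstream, is available as dataset['data'].dtype.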

    # TODO: include energy axis dataset
    file_dict['datasets'].append(dataset)

    return file_dict


if __name__ == "__main__":

    from src.hdf5_ops import save_file_dict_to_hdf5
    from utils.g5505_utils import created_at

    # Set up argument parsing
    parser = argparse.ArgumentParser(description="Data ingestion process to HDF5 files.")
    parser.add_argument('dst_file_path', type=str, help="Path to the target HDF5 file.")
    parser.add_argument('src_file_path', type=str, help="Relative path to the source file to be saved into the target HDF5 file.")
    parser.add_argument('dst_group_name', type=str, help="Group name '/instFolder/[category]/fileName' in the target HDF5 file.")
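
    # Example invocation (script and file names are hypothetical):
    #   python xps_ibw_reader.py collection.h5 data/xps_scan.ibw instFolder/xps/xps_scan.ibw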

    args = parser.parse_args()

    hdf5_file_path = args.dst_file_path
    src_file_path = args.src_file_path
    dst_group_name = args.dst_group_name
    default_mode = 'r+'

    try:
        # Read the source file and return an internal dictionary representation
        idr_dict = read_xps_ibw_file_as_dict(src_file_path)
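
        # h5py's 'r+' mode requires an existing file; switch to 'w' (create) when
        # the target HDF5 file does not exist yet.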
        if not os.path.exists(hdf5_file_path):
            default_mode = 'w'

        print(f'Opening HDF5 file: {hdf5_file_path} in mode {default_mode}')
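
        # track_order=True preserves the creation order of groups and attributes in the HDF5 file.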
        with h5py.File(hdf5_file_path, mode=default_mode, track_order=True) as hdf5_file_obj:
            try:
                # Create group if it does not exist
                if dst_group_name not in hdf5_file_obj:
                    hdf5_file_obj.create_group(dst_group_name)
                    hdf5_file_obj[dst_group_name].attrs['creation_date'] = created_at().encode('utf-8')
                    print(f'Created new group: {dst_group_name}')
                else:
                    print(f'Group {dst_group_name} already exists. Proceeding with data transfer...')

            except Exception as inst:
                logging.error('Failed to create group %s in HDF5: %s', dst_group_name, inst)

            # Save dictionary to HDF5
            save_file_dict_to_hdf5(hdf5_file_obj, dst_group_name, idr_dict)
            print(f'Completed saving file dict with keys: {idr_dict.keys()}')

    except Exception as e:
        logging.error('File reader failed to process %s: %s', src_file_path, e)
        print(f'File reader failed to process {src_file_path}. See logs for details.')