Moved hdf5_file_path to file reader mapping and extension definitions to g5505_file_reader_module.py. Created functions to compute the file_reader key from the path to a file in the HDF5 file and to select the reader based on that key. This should enable more modular file reader selection.

This commit is contained in:
2024-08-07 16:21:22 +02:00
parent 3430627494
commit 4e669b3eee

View File

@@ -14,6 +14,16 @@ import h5py
# Directory the process was started from; used as the project root.
ROOT_DIR = os.path.abspath(os.curdir)
# File extensions this module knows how to ingest.
file_extensions = ['.ibw','.txt','.dat','.h5','.TXT','.csv']
# Dispatch table mapping either a bare extension or an 'instrumentname_ext'
# key (see compute_filereader_key_from_path) to the reader callable for it.
# NOTE(review): entries have different arities — the 'h5' reader takes three
# arguments while all others take one; callers must know which key they use.
file_readers = {'ibw': lambda a1: read_xps_ibw_file_as_dict(a1),
'txt': lambda a1: read_txt_files_as_dict(a1,False),
'TXT': lambda a1: read_txt_files_as_dict(a1,False),
'dat': lambda a1: read_txt_files_as_dict(a1,False),
'h5': lambda a1,a2,a3: copy_file_in_group(a1,a2,a3,False),
'ACSM_TOFWARE_txt': lambda a1: read_txt_files_as_dict(a1,False),
'ACSM_TOFWARE_csv': lambda a1: read_txt_files_as_dict(a1,False)}
def read_xps_ibw_file_as_dict(filename):
"""
Reads IBW files from the Multiphase Chemistry Group, which contain XPS spectra and acquisition settings,
@@ -119,8 +129,8 @@ def read_txt_files_as_dict(filename : str , work_with_copy : bool = True ):
except yaml.YAMLError as exc:
print(exc)
# Verify if file can be read by available instrument configurations.
if not any(key in filename.replace(os.sep,'/') for key in config_dict.keys()):
return {}
#if not any(key in filename.replace(os.sep,'/') for key in config_dict.keys()):
# return {}
#TODO: this may be prone to error if assumed folder structure is non compliant
@@ -131,20 +141,21 @@ def read_txt_files_as_dict(filename : str , work_with_copy : bool = True ):
for key in config_dict.keys():
if key.replace('/',os.sep) in filename:
file_encoding = config_dict[key].get('file_encoding',file_encoding)
separator = config_dict[key].get('separator',separator).replace('\\t','\t')
separator = config_dict[key].get('separator',separator)
table_header = config_dict[key].get('table_header',table_header)
timestamp_variables = config_dict[key].get('timestamp',[])
datetime_format = config_dict[key].get('datetime_format',[])
description_dict = {}
#link_to_description = config_dict[key].get('link_to_description',[]).replace('/',os.sep)
link_to_description = os.path.join(module_dir,config_dict[key].get('link_to_description',[]).replace('/',os.sep))
with open(link_to_description,'r') as stream:
link_to_description = config_dict[key].get('link_to_description', '').replace('/', os.sep)
if link_to_description:
path = os.path.join(module_dir, link_to_description)
try:
description_dict = yaml.load(stream, Loader=yaml.FullLoader)
except yaml.YAMLError as exc:
with open(path, 'r') as stream:
description_dict = yaml.load(stream, Loader=yaml.FullLoader)
except (FileNotFoundError, yaml.YAMLError) as exc:
print(exc)
break
#if 'None' in table_header:
# return {}
@@ -158,12 +169,23 @@ def read_txt_files_as_dict(filename : str , work_with_copy : bool = True ):
tmp_filename = filename
#with open(tmp_filename,'rb',encoding=file_encoding,errors='ignore') as f:
if not isinstance(table_header, list):
table_header = [table_header]
file_encoding = [file_encoding]
separator = [separator]
with open(tmp_filename,'rb') as f:
table_preamble = []
for line_number, line in enumerate(f):
for line_number, line in enumerate(f):
if table_header in line.decode(file_encoding):
list_of_substrings = line.decode(file_encoding).split(separator)
for tb_idx, tb in enumerate(table_header):
if tb in line.decode(file_encoding[tb_idx]):
break
if tb in line.decode(file_encoding[tb_idx]):
list_of_substrings = line.decode(file_encoding[tb_idx]).split(separator[tb_idx].replace('\\t','\t'))
# Count occurrences of each substring
substring_counts = collections.Counter(list_of_substrings)
@@ -180,7 +202,7 @@ def read_txt_files_as_dict(filename : str , work_with_copy : bool = True ):
break
# Subdivide line into words, and join them by single space.
# I assume this can produce a cleaner line that contains no weird separator characters \t \r or extra spaces and so on.
list_of_substrings = line.decode(file_encoding).split()
list_of_substrings = line.decode(file_encoding[tb_idx]).split()
# TODO: ideally we should use a multiline string but the yaml parser is not recognizing \n as special character
#line = ' '.join(list_of_substrings+['\n'])
#line = ' '.join(list_of_substrings)
@@ -190,10 +212,10 @@ def read_txt_files_as_dict(filename : str , work_with_copy : bool = True ):
# TODO: it does not work with separator as none :(. fix for RGA
try:
df = pd.read_csv(tmp_filename,
delimiter = separator,
delimiter = separator[tb_idx].replace('\\t','\t'),
header=line_number,
#encoding='latin-1',
encoding = file_encoding,
encoding = file_encoding[tb_idx],
names=column_names,
skip_blank_lines=True)
@@ -264,10 +286,12 @@ def read_txt_files_as_dict(filename : str , work_with_copy : bool = True ):
# Create attribute descriptions based on description_dict
dataset['attributes'] = {}
for column_name in df.columns:
column_attr_dict = description_dict['table_header'].get(column_name,
{'note':'there was no description available. Review instrument files.'})
dataset['attributes'].update({column_name: utils.parse_attribute(column_attr_dict)})
# Annotate column headers if description_dict is non empty
if description_dict:
for column_name in df.columns:
column_attr_dict = description_dict['table_header'].get(column_name,
{'note':'there was no description available. Review instrument files.'})
dataset['attributes'].update({column_name: utils.parse_attribute(column_attr_dict)})
#try:
# dataset['attributes'] = description_dict['table_header'].copy()
@@ -314,14 +338,64 @@ def read_txt_files_as_dict(filename : str , work_with_copy : bool = True ):
return file_dict
def compute_filereader_key_from_path(hdf5_file_path):
    """Derive the file-reader lookup key for a path stored inside the HDF5 file.

    Paths are expected to look like ``/instrumentname/to/filename.ext``. The
    key is ``'<instrumentname>_<ext>'``, which identifies the reader that
    should be used for that file (see ``select_file_reader``).

    Parameters
    ----------
    hdf5_file_path : str
        Slash-separated path of a file inside the HDF5 hierarchy,
        e.g. ``'/ACSM_TOFWARE/2023/data.txt'``.

    Returns
    -------
    tuple of (str, str)
        ``(full_key, extension)`` where ``full_key`` is
        ``'<instrumentname>_<ext>'`` (the instrument part is empty when the
        path has no parent folder) and ``extension`` is the file extension
        without its leading dot.
    """
    parts = hdf5_file_path.strip('/').split('/')

    # The last component is the filename; split off its extension and
    # remove the leading dot so it matches the keys in file_readers.
    filename, file_extension = os.path.splitext(parts[-1])
    file_extension = file_extension.lstrip('.')

    # The first directory directly under the HDF5 root names the
    # instrument; a bare filename (no parent folder) yields an empty name.
    subfolder_name = parts[0] if len(parts) > 1 else ""

    full_string = f"{subfolder_name}_{file_extension}"
    return full_string, file_extension
def select_file_reader(path):
    """Return the reader callable registered for *path*, or ``None``.

    Resolution order: the instrument-qualified key
    (``'<instrumentname>_<ext>'``) is tried first, then the bare file
    extension; ``None`` is returned when neither is registered.
    """
    full_string, extension = compute_filereader_key_from_path(path)
    # Try the most specific key first, then fall back to the extension.
    for candidate in (full_string, extension):
        if candidate in file_readers:
            return file_readers[candidate]
    # No reader registered for this path.
    return None
def main():
# NOTE(review): this span is a rendered diff — it appears to interleave
# removed lines (the old read_xps_ibw_file_as_dict demo) with the added
# select_file_reader demo, and indentation was lost in extraction.
# UNC share path in backslash form (used by the old demo call below).
inputfile_dir = '\\\\fs101\\5505\\People\\Juan\\TypicalBeamTime'
# Same share in forward-slash form, used to build and strip file_path.
root_dir = '//fs101/5505/People/Juan/TypicalBeamTime/'
file_path = os.path.join(root_dir,'SES/0069069_N1s_495eV.ibw')
file_dict = read_xps_ibw_file_as_dict(inputfile_dir+'\\SES\\0069069_N1s_495eV.ibw')
# Strip the share prefix so the path looks like '/instrument/.../file.ext'
# before computing the reader key.
file_reader = select_file_reader(file_path.replace(root_dir,'/'))
for key in file_dict.keys():
print(key,file_dict[key])
#file_dict = read_xps_ibw_file_as_dict(inputfile_dir+'\\SES\\0069069_N1s_495eV.ibw')
if file_reader:
print(file_reader(file_path))
else:
print("No suitable file reader found.")
if __name__ == '__main__':