diff --git a/src/g5505_file_reader.py b/src/g5505_file_reader.py
index d5065c6..07ad51e 100644
--- a/src/g5505_file_reader.py
+++ b/src/g5505_file_reader.py
@@ -14,6 +14,16 @@ import h5py
 
 ROOT_DIR = os.path.abspath(os.curdir)
 
+file_extensions = ['.ibw','.txt','.dat','.h5','.TXT','.csv']
+
+file_readers = {'ibw': lambda a1: read_xps_ibw_file_as_dict(a1),
+                'txt': lambda a1: read_txt_files_as_dict(a1,False),
+                'TXT': lambda a1: read_txt_files_as_dict(a1,False),
+                'dat': lambda a1: read_txt_files_as_dict(a1,False),
+                'h5': lambda a1,a2,a3: copy_file_in_group(a1,a2,a3,False),
+                'ACSM_TOFWARE_txt': lambda a1: read_txt_files_as_dict(a1,False),
+                'ACSM_TOFWARE_csv': lambda a1: read_txt_files_as_dict(a1,False)}
+
 def read_xps_ibw_file_as_dict(filename):
     """
     Reads IBW files from the Multiphase Chemistry Group, which contain XPS spectra and acquisition settings,
@@ -119,8 +129,8 @@ def read_txt_files_as_dict(filename : str , work_with_copy : bool = True ):
         except yaml.YAMLError as exc:
             print(exc)
     # Verify if file can be read by available intrument configurations.
-    if not any(key in filename.replace(os.sep,'/') for key in config_dict.keys()):
-        return {}
+    #if not any(key in filename.replace(os.sep,'/') for key in config_dict.keys()):
+    #    return {}
 
     #TODO: this may be prone to error if assumed folder structure is non compliant
 
@@ -131,20 +141,21 @@ def read_txt_files_as_dict(filename : str , work_with_copy : bool = True ):
     for key in config_dict.keys():
         if key.replace('/',os.sep) in filename:
             file_encoding = config_dict[key].get('file_encoding',file_encoding)
-            separator = config_dict[key].get('separator',separator).replace('\\t','\t')
+            separator = config_dict[key].get('separator',separator)
             table_header = config_dict[key].get('table_header',table_header)
             timestamp_variables = config_dict[key].get('timestamp',[])
             datetime_format = config_dict[key].get('datetime_format',[])
 
             description_dict = {}
-            #link_to_description = config_dict[key].get('link_to_description',[]).replace('/',os.sep)
-            link_to_description = os.path.join(module_dir,config_dict[key].get('link_to_description',[]).replace('/',os.sep))
-            with open(link_to_description,'r') as stream:
+            link_to_description = config_dict[key].get('link_to_description', '').replace('/', os.sep)
+
+            if link_to_description:
+                path = os.path.join(module_dir, link_to_description)
                 try:
-                    description_dict = yaml.load(stream, Loader=yaml.FullLoader)
-                except yaml.YAMLError as exc:
+                    with open(path, 'r') as stream:
+                        description_dict = yaml.load(stream, Loader=yaml.FullLoader)
+                except (FileNotFoundError, yaml.YAMLError) as exc:
                     print(exc)
-            break
 
     #if 'None' in table_header:
     #    return {}
@@ -158,12 +169,23 @@ def read_txt_files_as_dict(filename : str , work_with_copy : bool = True ):
         tmp_filename = filename
 
     #with open(tmp_filename,'rb',encoding=file_encoding,errors='ignore') as f:
+
+    if not isinstance(table_header, list):
+        table_header = [table_header]
+        file_encoding = [file_encoding]
+        separator = [separator]
+
     with open(tmp_filename,'rb') as f:
         table_preamble = []
-        for line_number, line in enumerate(f):
+        for line_number, line in enumerate(f):
+
-            if table_header in line.decode(file_encoding):
-                list_of_substrings = line.decode(file_encoding).split(separator)
+            for tb_idx, tb in enumerate(table_header):
+                if tb in line.decode(file_encoding[tb_idx]):
+                    break
+
+            if tb in line.decode(file_encoding[tb_idx]):
+                list_of_substrings = line.decode(file_encoding[tb_idx]).split(separator[tb_idx].replace('\\t','\t'))
 
                 # Count occurrences of each substring
                 substring_counts = collections.Counter(list_of_substrings)
 
@@ -180,7 +202,7 @@ def read_txt_files_as_dict(filename : str , work_with_copy : bool = True ):
                 break
             # Subdivide line into words, and join them by single space.
             # I asumme this can produce a cleaner line that contains no weird separator characters \t \r or extra spaces and so on.
-            list_of_substrings = line.decode(file_encoding).split()
+            list_of_substrings = line.decode(file_encoding[tb_idx]).split()
             # TODO: ideally we should use a multilinear string but the yalm parser is not recognizing \n as special character
             #line = ' '.join(list_of_substrings+['\n'])
             #line = ' '.join(list_of_substrings)
@@ -190,10 +212,10 @@ def read_txt_files_as_dict(filename : str , work_with_copy : bool = True ):
     # TODO: it does not work with separator as none :(. fix for RGA
     try:
         df = pd.read_csv(tmp_filename,
-                         delimiter = separator,
+                         delimiter = separator[tb_idx].replace('\\t','\t'),
                          header=line_number,
                          #encoding='latin-1',
-                         encoding = file_encoding,
+                         encoding = file_encoding[tb_idx],
                          names=column_names,
                          skip_blank_lines=True)
 
@@ -264,10 +286,12 @@ def read_txt_files_as_dict(filename : str , work_with_copy : bool = True ):
             # Create attribute descriptions based on description_dict
             dataset['attributes'] = {}
 
-            for column_name in df.columns:
-                column_attr_dict = description_dict['table_header'].get(column_name,
-                                                                        {'note':'there was no description available. Review instrument files.'})
-                dataset['attributes'].update({column_name: utils.parse_attribute(column_attr_dict)})
+            # Annotate column headers if description_dict is non empty
+            if description_dict:
+                for column_name in df.columns:
+                    column_attr_dict = description_dict['table_header'].get(column_name,
+                                                                            {'note':'there was no description available. Review instrument files.'})
+                    dataset['attributes'].update({column_name: utils.parse_attribute(column_attr_dict)})
 
             #try:
             #    dataset['attributes'] = description_dict['table_header'].copy()
@@ -314,14 +338,64 @@ def read_txt_files_as_dict(filename : str , work_with_copy : bool = True ):
 
     return file_dict
 
+def compute_filereader_key_from_path(hdf5_file_path):
+    """Constructs the key 'instrumentname_ext' based on hdf5_file_path, structured as
+    /instrumentname/to/filename.ext, which access the file reader that should be used to read such a file.
+
+    Parameters
+    ----------
+    hdf5_file_path : str
+        _description_
+
+    Returns
+    -------
+    _type_
+        _description_
+    """
+
+    parts = hdf5_file_path.strip('/').split('/')
+
+    # Extract the filename and its extension
+    filename, file_extension = os.path.splitext(parts[-1])
+
+    # Extract the first directory directly under the root directory '/' in the hdf5 file
+    subfolder_name = parts[0] if len(parts) > 1 else ""
+
+    # Remove leading dot from the file extension
+    file_extension = file_extension.lstrip('.')
+
+    # Construct the resulting string
+    full_string = f"{subfolder_name}_{file_extension}"
+
+    return full_string, file_extension
+
+def select_file_reader(path):
+    full_string, extension = compute_filereader_key_from_path(path)
+
+    # First, try to match the full string
+    if full_string in file_readers:
+        return file_readers[full_string]
+
+    # If no match, try to match the reader using only the extension
+    if extension in file_readers:
+        return file_readers[extension]
+
+    # Default case if no reader is found
+    return None
+
 def main():
 
-    inputfile_dir = '\\\\fs101\\5505\\People\\Juan\\TypicalBeamTime'
+    root_dir = '//fs101/5505/People/Juan/TypicalBeamTime/'
+    file_path = os.path.join(root_dir,'SES/0069069_N1s_495eV.ibw')
 
-    file_dict = read_xps_ibw_file_as_dict(inputfile_dir+'\\SES\\0069069_N1s_495eV.ibw')
+    file_reader = select_file_reader(file_path.replace(root_dir,'/'))
 
-    for key in file_dict.keys():
-        print(key,file_dict[key])
+    #file_dict = read_xps_ibw_file_as_dict(inputfile_dir+'\\SES\\0069069_N1s_495eV.ibw')
+
+    if file_reader:
+        print(file_reader(file_path))
+    else:
+        print("No suitable file reader found.")
 
 if __name__ == '__main__':
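
Usage sketch (not part of the patch): a minimal illustration of how the new dispatch added above is meant to resolve a reader from a path inside the HDF5 hierarchy. It assumes src/ is on sys.path so that g5505_file_reader is importable; the ACSM_TOFWARE path is hypothetical and only exercises the instrument-specific key, while the SES .ibw path mirrors the one used in main() and falls back to the plain extension key.

    from g5505_file_reader import compute_filereader_key_from_path, select_file_reader

    # '/ACSM_TOFWARE/2023/data.txt' yields the key 'ACSM_TOFWARE_txt', which is matched directly.
    # '/SES/0069069_N1s_495eV.ibw' yields 'SES_ibw', which is absent, so the 'ibw' extension reader is used.
    for hdf5_path in ['/ACSM_TOFWARE/2023/data.txt', '/SES/0069069_N1s_495eV.ibw']:
        key, ext = compute_filereader_key_from_path(hdf5_path)
        reader = select_file_reader(hdf5_path)
        print(f"{hdf5_path} -> key={key!r}, ext={ext!r}, reader={'found' if reader else 'none'}")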