import sys import os root_dir = os.path.abspath(os.curdir) sys.path.append(root_dir) import numpy as np import pandas as pd import collections from igor2.binarywave import load as loadibw import utils.g5505_utils as utils #import src.metadata_review_lib as metadata #from src.metadata_review_lib import parse_attribute import yaml import h5py ROOT_DIR = os.path.abspath(os.curdir) file_extensions = ['.ibw','.txt','.dat','.h5','.TXT','.csv'] # Define the instruments directory (modify this as needed or set to None) default_instruments_dir = None # or provide an absolute path file_readers = { 'ibw': lambda a1: read_xps_ibw_file_as_dict(a1), 'txt': lambda a1: read_txt_files_as_dict(a1, instruments_dir=default_instruments_dir, work_with_copy=False), 'TXT': lambda a1: read_txt_files_as_dict(a1, instruments_dir=default_instruments_dir, work_with_copy=False), 'dat': lambda a1: read_txt_files_as_dict(a1, instruments_dir=default_instruments_dir, work_with_copy=False), #'h5': lambda a1, a2, a3: copy_file_in_group(a1, a2, a3, work_with_copy=False), 'ACSM_TOFWARE_txt': lambda a1: read_txt_files_as_dict(a1, instruments_dir=default_instruments_dir, work_with_copy=False), 'ACSM_TOFWARE_csv': lambda a1: read_txt_files_as_dict(a1, instruments_dir=default_instruments_dir, work_with_copy=False) } def read_xps_ibw_file_as_dict(filename): """ Reads IBW files from the Multiphase Chemistry Group, which contain XPS spectra and acquisition settings, and formats the data into a dictionary with the structure {datasets: list of datasets}. Each dataset in the list has the following structure: { 'name': 'name', 'data': data_array, 'data_units': 'units', 'shape': data_shape, 'dtype': data_type } Parameters ---------- filename : str The IBW filename from the Multiphase Chemistry Group beamline. Returns ------- file_dict : dict A dictionary containing the datasets from the IBW file. Raises ------ ValueError If the input IBW file is not a valid IBW file. """ file_obj = loadibw(filename) required_keys = ['wData','data_units','dimension_units','note'] if sum([item in required_keys for item in file_obj['wave'].keys()]) < len(required_keys): raise ValueError('This is not a valid xps ibw file. It does not satisfy minimum adimissibility criteria.') file_dict = {} path_tail, path_head = os.path.split(filename) # Group name and attributes file_dict['name'] = path_head file_dict['attributes_dict'] = {} # Convert notes of bytes class to string class and split string into a list of elements separated by '\r'. notes_list = file_obj['wave']['note'].decode("utf-8").split('\r') exclude_list = ['Excitation Energy'] for item in notes_list: if '=' in item: key, value = tuple(item.split('=')) # TODO: check if value can be converted into a numeric type. Now all values are string type if not key in exclude_list: file_dict['attributes_dict'][key] = value # TODO: talk to Thorsten to see if there is an easier way to access the below attributes dimension_labels = file_obj['wave']['dimension_units'].decode("utf-8").split(']') file_dict['attributes_dict']['dimension_units'] = [item+']' for item in dimension_labels[0:len(dimension_labels)-1]] # Datasets and their attributes file_dict['datasets'] = [] dataset = {} dataset['name'] = 'spectrum' dataset['data'] = file_obj['wave']['wData'] dataset['data_units'] = file_obj['wave']['data_units'] dataset['shape'] = dataset['data'].shape dataset['dtype'] = type(dataset['data']) # TODO: include energy axis dataset file_dict['datasets'].append(dataset) return file_dict def read_txt_files_as_dict(filename: str, instruments_dir: str = None, work_with_copy: bool = True): # If instruments_dir is not provided, use the default path relative to the module directory if not instruments_dir: # Assuming the instruments folder is one level up from the source module directory #module_dir = os.path.dirname(__file__) #instruments_dir = os.path.join(module_dir, '..', 'instruments') instruments_dir = os.path.join(root_dir,'instruments') # Normalize the path (resolves any '..' in the path) instrument_configs_path = os.path.abspath(os.path.join(instruments_dir,'text_data_sources.yaml')) with open(instrument_configs_path,'r') as stream: try: config_dict = yaml.load(stream, Loader=yaml.FullLoader) except yaml.YAMLError as exc: print(exc) # Verify if file can be read by available intrument configurations. #if not any(key in filename.replace(os.sep,'/') for key in config_dict.keys()): # return {} #TODO: this may be prone to error if assumed folder structure is non compliant file_encoding = config_dict['default']['file_encoding'] #'utf-8' separator = config_dict['default']['separator'] table_header = config_dict['default']['table_header'] for key in config_dict.keys(): if key.replace('/',os.sep) in filename: file_encoding = config_dict[key].get('file_encoding',file_encoding) separator = config_dict[key].get('separator',separator) table_header = config_dict[key].get('table_header',table_header) timestamp_variables = config_dict[key].get('timestamp',[]) datetime_format = config_dict[key].get('datetime_format',[]) description_dict = {} link_to_description = config_dict[key].get('link_to_description', '').replace('/', os.sep) if link_to_description: path = os.path.join(instruments_dir, link_to_description) try: with open(path, 'r') as stream: description_dict = yaml.load(stream, Loader=yaml.FullLoader) except (FileNotFoundError, yaml.YAMLError) as exc: print(exc) #if 'None' in table_header: # return {} # Read header as a dictionary and detect where data table starts header_dict = {} data_start = False # Work with copy of the file for safety if work_with_copy: tmp_filename = utils.make_file_copy(source_file_path=filename) else: tmp_filename = filename #with open(tmp_filename,'rb',encoding=file_encoding,errors='ignore') as f: if not isinstance(table_header, list): table_header = [table_header] file_encoding = [file_encoding] separator = [separator] with open(tmp_filename,'rb') as f: table_preamble = [] for line_number, line in enumerate(f): for tb_idx, tb in enumerate(table_header): if tb in line.decode(file_encoding[tb_idx]): break if tb in line.decode(file_encoding[tb_idx]): list_of_substrings = line.decode(file_encoding[tb_idx]).split(separator[tb_idx].replace('\\t','\t')) # Count occurrences of each substring substring_counts = collections.Counter(list_of_substrings) data_start = True # Generate column names with appended index only for repeated substrings column_names = [f"{i}_{name.strip()}" if substring_counts[name] > 1 else name.strip() for i, name in enumerate(list_of_substrings)] #column_names = [str(i)+'_'+name.strip() for i, name in enumerate(list_of_substrings)] #column_names = [] #for i, name in enumerate(list_of_substrings): # column_names.append(str(i)+'_'+name) #print(line_number, len(column_names ),'\n') break # Subdivide line into words, and join them by single space. # I asumme this can produce a cleaner line that contains no weird separator characters \t \r or extra spaces and so on. list_of_substrings = line.decode(file_encoding[tb_idx]).split() # TODO: ideally we should use a multilinear string but the yalm parser is not recognizing \n as special character #line = ' '.join(list_of_substrings+['\n']) #line = ' '.join(list_of_substrings) table_preamble.append(' '.join([item for item in list_of_substrings]))# += new_line # TODO: it does not work with separator as none :(. fix for RGA try: df = pd.read_csv(tmp_filename, delimiter = separator[tb_idx].replace('\\t','\t'), header=line_number, #encoding='latin-1', encoding = file_encoding[tb_idx], names=column_names, skip_blank_lines=True) df_numerical_attrs = df.select_dtypes(include ='number') df_categorical_attrs = df.select_dtypes(exclude='number') numerical_variables = [item for item in df_numerical_attrs.columns] # Consolidate into single timestamp column the separate columns 'date' 'time' specified in text_data_source.yaml if timestamp_variables: #df_categorical_attrs['timestamps'] = [' '.join(df_categorical_attrs.loc[i,timestamp_variables].to_numpy()) for i in df.index] #df_categorical_attrs['timestamps'] = [ df_categorical_attrs.loc[i,'0_Date']+' '+df_categorical_attrs.loc[i,'1_Time'] for i in df.index] #df_categorical_attrs['timestamps'] = df_categorical_attrs[timestamp_variables].astype(str).agg(' '.join, axis=1) timestamps_name = ' '.join(timestamp_variables) df_categorical_attrs[ timestamps_name] = df_categorical_attrs[timestamp_variables].astype(str).agg(' '.join, axis=1) valid_indices = [] if datetime_format: df_categorical_attrs[ timestamps_name] = pd.to_datetime(df_categorical_attrs[ timestamps_name],format=datetime_format,errors='coerce') valid_indices = df_categorical_attrs.dropna(subset=[timestamps_name]).index df_categorical_attrs = df_categorical_attrs.loc[valid_indices,:] df_numerical_attrs = df_numerical_attrs.loc[valid_indices,:] df_categorical_attrs[timestamps_name] = df_categorical_attrs[timestamps_name].dt.strftime(config_dict['default']['desired_format']) startdate = df_categorical_attrs[timestamps_name].min() enddate = df_categorical_attrs[timestamps_name].max() df_categorical_attrs[timestamps_name] = df_categorical_attrs[timestamps_name].astype(str) #header_dict.update({'stastrrtdate':startdate,'enddate':enddate}) header_dict['startdate']= str(startdate) header_dict['enddate']=str(enddate) if len(timestamp_variables) > 1: df_categorical_attrs = df_categorical_attrs.drop(columns = timestamp_variables) #df_categorical_attrs.reindex(drop=True) #df_numerical_attrs.reindex(drop=True) categorical_variables = [item for item in df_categorical_attrs.columns] #### #elif 'RGA' in filename: # df_categorical_attrs = df_categorical_attrs.rename(columns={'0_Time(s)' : 'timestamps'}) ### file_dict = {} path_tail, path_head = os.path.split(tmp_filename) file_dict['name'] = path_head # TODO: review this header dictionary, it may not be the best way to represent header data file_dict['attributes_dict'] = header_dict file_dict['datasets'] = [] #### df = pd.concat((df_categorical_attrs,df_numerical_attrs),axis=1) #if numerical_variables: dataset = {} dataset['name'] = 'data_table'#_numerical_variables' dataset['data'] = utils.dataframe_to_np_structured_array(df) #df_numerical_attrs.to_numpy() dataset['shape'] = dataset['data'].shape dataset['dtype'] = type(dataset['data']) #dataset['data_units'] = file_obj['wave']['data_units'] # # Create attribute descriptions based on description_dict dataset['attributes'] = {} # Annotate column headers if description_dict is non empty if description_dict: for column_name in df.columns: column_attr_dict = description_dict['table_header'].get(column_name, {'note':'there was no description available. Review instrument files.'}) dataset['attributes'].update({column_name: utils.parse_attribute(column_attr_dict)}) #try: # dataset['attributes'] = description_dict['table_header'].copy() # for key in description_dict['table_header'].keys(): # if not key in numerical_variables: # dataset['attributes'].pop(key) # delete key # else: # dataset['attributes'][key] = utils.parse_attribute(dataset['attributes'][key]) # if timestamps_name in categorical_variables: # dataset['attributes'][timestamps_name] = utils.parse_attribute({'unit':'YYYY-MM-DD HH:MM:SS.ffffff'}) #except ValueError as err: # print(err) # Represent string values as fixed length strings in the HDF5 file, which need # to be decoded as string when we read them. It provides better control than variable strings, # at the expense of flexibility. # https://docs.h5py.org/en/stable/strings.html if table_preamble: #header_dict["table_preamble"] = utils.convert_string_to_bytes(table_preamble) tp_dataset = {} tp_dataset['name'] = "table_preamble" tp_dataset['data'] = utils.convert_string_to_bytes(table_preamble) tp_dataset['shape'] = tp_dataset['data'].shape tp_dataset['dtype'] = type(tp_dataset['data']) tp_dataset['attributes'] = {} file_dict['datasets'].append(tp_dataset) file_dict['datasets'].append(dataset) #if categorical_variables: # dataset = {} # dataset['name'] = 'table_categorical_variables' # dataset['data'] = dataframe_to_np_structured_array(df_categorical_attrs) #df_categorical_attrs.loc[:,categorical_variables].to_numpy() # dataset['shape'] = dataset['data'].shape # dataset['dtype'] = type(dataset['data']) # if timestamps_name in categorical_variables: # dataset['attributes'] = {timestamps_name: utils.parse_attribute({'unit':'YYYY-MM-DD HH:MM:SS.ffffff'})} # file_dict['datasets'].append(dataset) except: return {} return file_dict def compute_filereader_key_from_path(hdf5_file_path): """Constructs the key 'instrumentname_ext' based on hdf5_file_path, structured as /instrumentname/to/filename.ext, which access the file reader that should be used to read such a file. Parameters ---------- hdf5_file_path : str _description_ Returns ------- _type_ _description_ """ parts = hdf5_file_path.strip('/').split('/') # Extract the filename and its extension filename, file_extension = os.path.splitext(parts[-1]) # Extract the first directory directly under the root directory '/' in the hdf5 file subfolder_name = parts[0] if len(parts) > 1 else "" # Remove leading dot from the file extension file_extension = file_extension.lstrip('.') # Construct the resulting string full_string = f"{subfolder_name}_{file_extension}" return full_string, file_extension def select_file_reader(path): full_string, extension = compute_filereader_key_from_path(path) # First, try to match the full string if full_string in file_readers: return file_readers[full_string] # If no match, try to match the reader using only the extension if extension in file_readers: return file_readers[extension] # Default case if no reader is found return None def main(): root_dir = '//fs101/5505/People/Juan/TypicalBeamTime/' file_path = os.path.join(root_dir,'SES/0069069_N1s_495eV.ibw') file_reader = select_file_reader(file_path.replace(root_dir,'/')) #file_dict = read_xps_ibw_file_as_dict(inputfile_dir+'\\SES\\0069069_N1s_495eV.ibw') if file_reader: print(file_reader(file_path)) else: print("No suitable file reader found.") if __name__ == '__main__': main() print(':)')