From 9789d312f9c0ab3e6f76e64d3677e3cbef9d66b3 Mon Sep 17 00:00:00 2001 From: Florez Ospina Juan Felipe Date: Fri, 23 Aug 2024 16:09:04 +0200 Subject: [PATCH] Removed and splitted into instruments/readers/filereader_registry.py instruments/readers/g5505_text_reader.py instruments/readers/xps_ibw_reader.py --- instruments/readers/g5505_file_reader.py | 403 ----------------------- 1 file changed, 403 deletions(-) delete mode 100644 instruments/readers/g5505_file_reader.py diff --git a/instruments/readers/g5505_file_reader.py b/instruments/readers/g5505_file_reader.py deleted file mode 100644 index ee26d5c..0000000 --- a/instruments/readers/g5505_file_reader.py +++ /dev/null @@ -1,403 +0,0 @@ -import sys -import os -root_dir = os.path.abspath(os.curdir) -sys.path.append(root_dir) - -import numpy as np -import pandas as pd -import collections -from igor2.binarywave import load as loadibw - -import utils.g5505_utils as utils -#import src.metadata_review_lib as metadata -#from src.metadata_review_lib import parse_attribute - -import yaml -import h5py - -ROOT_DIR = os.path.abspath(os.curdir) - -file_extensions = ['.ibw','.txt','.dat','.h5','.TXT','.csv'] - -# Define the instruments directory (modify this as needed or set to None) -default_instruments_dir = None # or provide an absolute path - -file_readers = { - 'ibw': lambda a1: read_xps_ibw_file_as_dict(a1), - 'txt': lambda a1: read_txt_files_as_dict(a1, instruments_dir=default_instruments_dir, work_with_copy=False), - 'TXT': lambda a1: read_txt_files_as_dict(a1, instruments_dir=default_instruments_dir, work_with_copy=False), - 'dat': lambda a1: read_txt_files_as_dict(a1, instruments_dir=default_instruments_dir, work_with_copy=False), - #'h5': lambda a1, a2, a3: copy_file_in_group(a1, a2, a3, work_with_copy=False), - 'ACSM_TOFWARE_txt': lambda a1: read_txt_files_as_dict(a1, instruments_dir=default_instruments_dir, work_with_copy=False), - 'ACSM_TOFWARE_csv': lambda a1: read_txt_files_as_dict(a1, instruments_dir=default_instruments_dir, work_with_copy=False) -} - - -def read_xps_ibw_file_as_dict(filename): - """ - Reads IBW files from the Multiphase Chemistry Group, which contain XPS spectra and acquisition settings, - and formats the data into a dictionary with the structure {datasets: list of datasets}. Each dataset in the - list has the following structure: - - { - 'name': 'name', - 'data': data_array, - 'data_units': 'units', - 'shape': data_shape, - 'dtype': data_type - } - - Parameters - ---------- - filename : str - The IBW filename from the Multiphase Chemistry Group beamline. - - Returns - ------- - file_dict : dict - A dictionary containing the datasets from the IBW file. - - Raises - ------ - ValueError - If the input IBW file is not a valid IBW file. - - """ - - - file_obj = loadibw(filename) - - required_keys = ['wData','data_units','dimension_units','note'] - if sum([item in required_keys for item in file_obj['wave'].keys()]) < len(required_keys): - raise ValueError('This is not a valid xps ibw file. It does not satisfy minimum adimissibility criteria.') - - file_dict = {} - path_tail, path_head = os.path.split(filename) - - # Group name and attributes - file_dict['name'] = path_head - file_dict['attributes_dict'] = {} - - # Convert notes of bytes class to string class and split string into a list of elements separated by '\r'. - notes_list = file_obj['wave']['note'].decode("utf-8").split('\r') - exclude_list = ['Excitation Energy'] - for item in notes_list: - if '=' in item: - key, value = tuple(item.split('=')) - # TODO: check if value can be converted into a numeric type. Now all values are string type - if not key in exclude_list: - file_dict['attributes_dict'][key] = value - - # TODO: talk to Thorsten to see if there is an easier way to access the below attributes - dimension_labels = file_obj['wave']['dimension_units'].decode("utf-8").split(']') - file_dict['attributes_dict']['dimension_units'] = [item+']' for item in dimension_labels[0:len(dimension_labels)-1]] - - # Datasets and their attributes - - file_dict['datasets'] = [] - - dataset = {} - dataset['name'] = 'spectrum' - dataset['data'] = file_obj['wave']['wData'] - dataset['data_units'] = file_obj['wave']['data_units'] - dataset['shape'] = dataset['data'].shape - dataset['dtype'] = type(dataset['data']) - - # TODO: include energy axis dataset - - file_dict['datasets'].append(dataset) - - - return file_dict - -def read_txt_files_as_dict(filename: str, instruments_dir: str = None, work_with_copy: bool = True): - # If instruments_dir is not provided, use the default path relative to the module directory - if not instruments_dir: - # Assuming the instruments folder is one level up from the source module directory - #module_dir = os.path.dirname(__file__) - #instruments_dir = os.path.join(module_dir, '..', 'instruments') - instruments_dir = os.path.join(root_dir,'instruments') - - # Normalize the path (resolves any '..' in the path) - instrument_configs_path = os.path.abspath(os.path.join(instruments_dir,'text_data_sources.yaml')) - - with open(instrument_configs_path,'r') as stream: - try: - config_dict = yaml.load(stream, Loader=yaml.FullLoader) - except yaml.YAMLError as exc: - print(exc) - # Verify if file can be read by available intrument configurations. - #if not any(key in filename.replace(os.sep,'/') for key in config_dict.keys()): - # return {} - - - #TODO: this may be prone to error if assumed folder structure is non compliant - file_encoding = config_dict['default']['file_encoding'] #'utf-8' - separator = config_dict['default']['separator'] - table_header = config_dict['default']['table_header'] - - for key in config_dict.keys(): - if key.replace('/',os.sep) in filename: - file_encoding = config_dict[key].get('file_encoding',file_encoding) - separator = config_dict[key].get('separator',separator) - table_header = config_dict[key].get('table_header',table_header) - timestamp_variables = config_dict[key].get('timestamp',[]) - datetime_format = config_dict[key].get('datetime_format',[]) - - description_dict = {} - link_to_description = config_dict[key].get('link_to_description', '').replace('/', os.sep) - - if link_to_description: - path = os.path.join(instruments_dir, link_to_description) - try: - with open(path, 'r') as stream: - description_dict = yaml.load(stream, Loader=yaml.FullLoader) - except (FileNotFoundError, yaml.YAMLError) as exc: - print(exc) - #if 'None' in table_header: - # return {} - - # Read header as a dictionary and detect where data table starts - header_dict = {} - data_start = False - # Work with copy of the file for safety - if work_with_copy: - tmp_filename = utils.make_file_copy(source_file_path=filename) - else: - tmp_filename = filename - - #with open(tmp_filename,'rb',encoding=file_encoding,errors='ignore') as f: - - if not isinstance(table_header, list): - table_header = [table_header] - file_encoding = [file_encoding] - separator = [separator] - - with open(tmp_filename,'rb') as f: - table_preamble = [] - for line_number, line in enumerate(f): - - - for tb_idx, tb in enumerate(table_header): - if tb in line.decode(file_encoding[tb_idx]): - break - - if tb in line.decode(file_encoding[tb_idx]): - list_of_substrings = line.decode(file_encoding[tb_idx]).split(separator[tb_idx].replace('\\t','\t')) - - # Count occurrences of each substring - substring_counts = collections.Counter(list_of_substrings) - data_start = True - # Generate column names with appended index only for repeated substrings - column_names = [f"{i}_{name.strip()}" if substring_counts[name] > 1 else name.strip() for i, name in enumerate(list_of_substrings)] - - #column_names = [str(i)+'_'+name.strip() for i, name in enumerate(list_of_substrings)] - #column_names = [] - #for i, name in enumerate(list_of_substrings): - # column_names.append(str(i)+'_'+name) - - #print(line_number, len(column_names ),'\n') - break - # Subdivide line into words, and join them by single space. - # I asumme this can produce a cleaner line that contains no weird separator characters \t \r or extra spaces and so on. - list_of_substrings = line.decode(file_encoding[tb_idx]).split() - # TODO: ideally we should use a multilinear string but the yalm parser is not recognizing \n as special character - #line = ' '.join(list_of_substrings+['\n']) - #line = ' '.join(list_of_substrings) - table_preamble.append(' '.join([item for item in list_of_substrings]))# += new_line - - - # TODO: it does not work with separator as none :(. fix for RGA - try: - df = pd.read_csv(tmp_filename, - delimiter = separator[tb_idx].replace('\\t','\t'), - header=line_number, - #encoding='latin-1', - encoding = file_encoding[tb_idx], - names=column_names, - skip_blank_lines=True) - - df_numerical_attrs = df.select_dtypes(include ='number') - df_categorical_attrs = df.select_dtypes(exclude='number') - numerical_variables = [item for item in df_numerical_attrs.columns] - - # Consolidate into single timestamp column the separate columns 'date' 'time' specified in text_data_source.yaml - if timestamp_variables: - #df_categorical_attrs['timestamps'] = [' '.join(df_categorical_attrs.loc[i,timestamp_variables].to_numpy()) for i in df.index] - #df_categorical_attrs['timestamps'] = [ df_categorical_attrs.loc[i,'0_Date']+' '+df_categorical_attrs.loc[i,'1_Time'] for i in df.index] - - - #df_categorical_attrs['timestamps'] = df_categorical_attrs[timestamp_variables].astype(str).agg(' '.join, axis=1) - timestamps_name = ' '.join(timestamp_variables) - df_categorical_attrs[ timestamps_name] = df_categorical_attrs[timestamp_variables].astype(str).agg(' '.join, axis=1) - - valid_indices = [] - if datetime_format: - df_categorical_attrs[ timestamps_name] = pd.to_datetime(df_categorical_attrs[ timestamps_name],format=datetime_format,errors='coerce') - valid_indices = df_categorical_attrs.dropna(subset=[timestamps_name]).index - df_categorical_attrs = df_categorical_attrs.loc[valid_indices,:] - df_numerical_attrs = df_numerical_attrs.loc[valid_indices,:] - - df_categorical_attrs[timestamps_name] = df_categorical_attrs[timestamps_name].dt.strftime(config_dict['default']['desired_format']) - startdate = df_categorical_attrs[timestamps_name].min() - enddate = df_categorical_attrs[timestamps_name].max() - - df_categorical_attrs[timestamps_name] = df_categorical_attrs[timestamps_name].astype(str) - #header_dict.update({'stastrrtdate':startdate,'enddate':enddate}) - header_dict['startdate']= str(startdate) - header_dict['enddate']=str(enddate) - - if len(timestamp_variables) > 1: - df_categorical_attrs = df_categorical_attrs.drop(columns = timestamp_variables) - - - #df_categorical_attrs.reindex(drop=True) - #df_numerical_attrs.reindex(drop=True) - - - - categorical_variables = [item for item in df_categorical_attrs.columns] - #### - #elif 'RGA' in filename: - # df_categorical_attrs = df_categorical_attrs.rename(columns={'0_Time(s)' : 'timestamps'}) - - ### - file_dict = {} - path_tail, path_head = os.path.split(tmp_filename) - - file_dict['name'] = path_head - # TODO: review this header dictionary, it may not be the best way to represent header data - file_dict['attributes_dict'] = header_dict - file_dict['datasets'] = [] - #### - - df = pd.concat((df_categorical_attrs,df_numerical_attrs),axis=1) - - #if numerical_variables: - dataset = {} - dataset['name'] = 'data_table'#_numerical_variables' - dataset['data'] = utils.dataframe_to_np_structured_array(df) #df_numerical_attrs.to_numpy() - dataset['shape'] = dataset['data'].shape - dataset['dtype'] = type(dataset['data']) - #dataset['data_units'] = file_obj['wave']['data_units'] - # - # Create attribute descriptions based on description_dict - dataset['attributes'] = {} - - # Annotate column headers if description_dict is non empty - if description_dict: - for column_name in df.columns: - column_attr_dict = description_dict['table_header'].get(column_name, - {'note':'there was no description available. Review instrument files.'}) - dataset['attributes'].update({column_name: utils.parse_attribute(column_attr_dict)}) - - #try: - # dataset['attributes'] = description_dict['table_header'].copy() - # for key in description_dict['table_header'].keys(): - # if not key in numerical_variables: - # dataset['attributes'].pop(key) # delete key - # else: - # dataset['attributes'][key] = utils.parse_attribute(dataset['attributes'][key]) - # if timestamps_name in categorical_variables: - # dataset['attributes'][timestamps_name] = utils.parse_attribute({'unit':'YYYY-MM-DD HH:MM:SS.ffffff'}) - #except ValueError as err: - # print(err) - - # Represent string values as fixed length strings in the HDF5 file, which need - # to be decoded as string when we read them. It provides better control than variable strings, - # at the expense of flexibility. - # https://docs.h5py.org/en/stable/strings.html - - - if table_preamble: - #header_dict["table_preamble"] = utils.convert_string_to_bytes(table_preamble) - tp_dataset = {} - tp_dataset['name'] = "table_preamble" - tp_dataset['data'] = utils.convert_string_to_bytes(table_preamble) - tp_dataset['shape'] = tp_dataset['data'].shape - tp_dataset['dtype'] = type(tp_dataset['data']) - tp_dataset['attributes'] = {} - file_dict['datasets'].append(tp_dataset) - - file_dict['datasets'].append(dataset) - - - #if categorical_variables: - # dataset = {} - # dataset['name'] = 'table_categorical_variables' - # dataset['data'] = dataframe_to_np_structured_array(df_categorical_attrs) #df_categorical_attrs.loc[:,categorical_variables].to_numpy() - # dataset['shape'] = dataset['data'].shape - # dataset['dtype'] = type(dataset['data']) - # if timestamps_name in categorical_variables: - # dataset['attributes'] = {timestamps_name: utils.parse_attribute({'unit':'YYYY-MM-DD HH:MM:SS.ffffff'})} - # file_dict['datasets'].append(dataset) - except: - return {} - - return file_dict - -def compute_filereader_key_from_path(hdf5_file_path): - """Constructs the key 'instrumentname_ext' based on hdf5_file_path, structured as - /instrumentname/to/filename.ext, which access the file reader that should be used to read such a file. - - Parameters - ---------- - hdf5_file_path : str - _description_ - - Returns - ------- - _type_ - _description_ - """ - - parts = hdf5_file_path.strip('/').split('/') - - # Extract the filename and its extension - filename, file_extension = os.path.splitext(parts[-1]) - - # Extract the first directory directly under the root directory '/' in the hdf5 file - subfolder_name = parts[0] if len(parts) > 1 else "" - - # Remove leading dot from the file extension - file_extension = file_extension.lstrip('.') - - # Construct the resulting string - full_string = f"{subfolder_name}_{file_extension}" - - return full_string, file_extension - -def select_file_reader(path): - full_string, extension = compute_filereader_key_from_path(path) - - # First, try to match the full string - if full_string in file_readers: - return file_readers[full_string] - - # If no match, try to match the reader using only the extension - if extension in file_readers: - return file_readers[extension] - - # Default case if no reader is found - return None - -def main(): - - root_dir = '//fs101/5505/People/Juan/TypicalBeamTime/' - file_path = os.path.join(root_dir,'SES/0069069_N1s_495eV.ibw') - - file_reader = select_file_reader(file_path.replace(root_dir,'/')) - - #file_dict = read_xps_ibw_file_as_dict(inputfile_dir+'\\SES\\0069069_N1s_495eV.ibw') - - if file_reader: - print(file_reader(file_path)) - else: - print("No suitable file reader found.") - - -if __name__ == '__main__': - - main() - - print(':)') \ No newline at end of file