From d866c8f9f952d77e96f201b97442451dbffd88c9 Mon Sep 17 00:00:00 2001
From: Florez Ospina Juan Felipe
Date: Fri, 23 Aug 2024 16:06:44 +0200
Subject: [PATCH] Split instruments/readers/g5505_file_reader.py into a
 filereader_registry.py and independent file readers.

This improves instrument modularity and makes it easier to add new
instrument readers.
---
 instruments/readers/filereader_registry.py |  68 ++++++
 instruments/readers/g5505_text_reader.py   | 240 +++++++++++++++++++++
 instruments/readers/xps_ibw_reader.py      |  82 +++++++
 3 files changed, 390 insertions(+)
 create mode 100644 instruments/readers/filereader_registry.py
 create mode 100644 instruments/readers/g5505_text_reader.py
 create mode 100644 instruments/readers/xps_ibw_reader.py

diff --git a/instruments/readers/filereader_registry.py b/instruments/readers/filereader_registry.py
new file mode 100644
index 0000000..2d893fb
--- /dev/null
+++ b/instruments/readers/filereader_registry.py
@@ -0,0 +1,68 @@
+import sys
+import os
+root_dir = os.path.abspath(os.curdir)
+sys.path.append(root_dir)
+
+from instruments.readers.xps_ibw_reader import read_xps_ibw_file_as_dict
+from instruments.readers.g5505_text_reader import read_txt_files_as_dict
+
+
+file_extensions = ['.ibw', '.txt', '.dat', '.h5', '.TXT', '.csv']
+
+# Define the instruments directory (modify this as needed or set to None)
+default_instruments_dir = None  # or provide an absolute path
+
+file_readers = {
+    'ibw': lambda path: read_xps_ibw_file_as_dict(path),
+    'txt': lambda path: read_txt_files_as_dict(path, instruments_dir=default_instruments_dir, work_with_copy=False),
+    'TXT': lambda path: read_txt_files_as_dict(path, instruments_dir=default_instruments_dir, work_with_copy=False),
+    'dat': lambda path: read_txt_files_as_dict(path, instruments_dir=default_instruments_dir, work_with_copy=False),
+    #'h5': lambda a1, a2, a3: copy_file_in_group(a1, a2, a3, work_with_copy=False),
+    'ACSM_TOFWARE_txt': lambda path: read_txt_files_as_dict(path, instruments_dir=default_instruments_dir, work_with_copy=False),
+    'ACSM_TOFWARE_csv': lambda path: read_txt_files_as_dict(path, instruments_dir=default_instruments_dir, work_with_copy=False)
+}
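+
+# A sketch of how an additional instrument reader could be registered
+# (hypothetical module and function names; a new reader is assumed to follow
+# the same signature as the readers above, taking a file path and returning a
+# file_dict):
+#
+#   from instruments.readers.new_instrument_reader import read_new_instrument_file_as_dict
+#   file_readers['NEWINSTRUMENT_dat'] = lambda path: read_new_instrument_file_as_dict(path)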
+
+def compute_filereader_key_from_path(hdf5_file_path):
+    """Construct the key 'instrumentname_ext' from an HDF5 path of the form
+    /instrumentname/to/filename.ext. The key is used to look up the file
+    reader that should be used to read such a file.
+
+    Parameters
+    ----------
+    hdf5_file_path : str
+        Path to the file inside the HDF5 file, structured as
+        '/instrumentname/to/filename.ext'.
+
+    Returns
+    -------
+    tuple of (str, str)
+        The reader key 'instrumentname_ext' and the file extension without
+        the leading dot.
+    """
+
+    parts = hdf5_file_path.strip('/').split('/')
+
+    # Extract the filename and its extension
+    filename, file_extension = os.path.splitext(parts[-1])
+
+    # Extract the first directory directly under the root directory '/' in the hdf5 file
+    subfolder_name = parts[0] if len(parts) > 1 else ""
+
+    # Remove leading dot from the file extension
+    file_extension = file_extension.lstrip('.')
+
+    # Construct the resulting key
+    full_string = f"{subfolder_name}_{file_extension}"
+
+    return full_string, file_extension
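+
+# Worked example (path layout as described in the docstring):
+#
+#   compute_filereader_key_from_path('/ACSM_TOFWARE/2024/data.txt')
+#   returns ('ACSM_TOFWARE_txt', 'txt'), which matches a key in file_readers.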
+
+def select_file_reader(path):
+    full_string, extension = compute_filereader_key_from_path(path)
+
+    # First, try to match the full string
+    if full_string in file_readers:
+        return file_readers[full_string]
+
+    # If no match, try to match the reader using only the extension
+    if extension in file_readers:
+        return file_readers[extension]
+
+    # Default case if no reader is found
+    return None
\ No newline at end of file
diff --git a/instruments/readers/g5505_text_reader.py b/instruments/readers/g5505_text_reader.py
new file mode 100644
index 0000000..62c558c
--- /dev/null
+++ b/instruments/readers/g5505_text_reader.py
@@ -0,0 +1,240 @@
+import sys
+import os
+root_dir = os.path.abspath(os.curdir)
+sys.path.append(root_dir)
+
+import pandas as pd
+import collections
+
+import utils.g5505_utils as utils
+
+import yaml
+
+
+def read_txt_files_as_dict(filename: str, instruments_dir: str = None, work_with_copy: bool = True):
+    # If instruments_dir is not provided, use the default path relative to the module directory,
+    # assuming the instruments folder is one level up from the source module directory
+    if not instruments_dir:
+        instruments_dir = os.path.join(root_dir, 'instruments')
+
+    # Normalize the path (resolves any '..' in the path)
+    instrument_configs_path = os.path.abspath(os.path.join(instruments_dir, 'readers/config_text_reader.yaml'))
+
+    with open(instrument_configs_path, 'r') as stream:
+        try:
+            config_dict = yaml.load(stream, Loader=yaml.FullLoader)
+        except yaml.YAMLError as exc:
+            print(exc)
+
+    # Verify whether the file can be read by the available instrument configurations.
+    #if not any(key in filename.replace(os.sep,'/') for key in config_dict.keys()):
+    #    return {}
+
+    # TODO: this may be error prone if the assumed folder structure is not complied with
+    file_encoding = config_dict['default']['file_encoding']  # 'utf-8'
+    separator = config_dict['default']['separator']
+    table_header = config_dict['default']['table_header']
+
+    # Initialize per-instrument settings; they stay empty if no key matches the filename
+    timestamp_variables = []
+    datetime_format = []
+    description_dict = {}
+
+    for key in config_dict.keys():
+        if key.replace('/', os.sep) in filename:
+            file_encoding = config_dict[key].get('file_encoding', file_encoding)
+            separator = config_dict[key].get('separator', separator)
+            table_header = config_dict[key].get('table_header', table_header)
+            timestamp_variables = config_dict[key].get('timestamp', [])
+            datetime_format = config_dict[key].get('datetime_format', [])
+
+            link_to_description = config_dict[key].get('link_to_description', '').replace('/', os.sep)
+
+            if link_to_description:
+                path = os.path.join(instruments_dir, link_to_description)
+                try:
+                    with open(path, 'r') as stream:
+                        description_dict = yaml.load(stream, Loader=yaml.FullLoader)
+                except (FileNotFoundError, yaml.YAMLError) as exc:
+                    print(exc)
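+
+    # For reference, a sketch of the config_text_reader.yaml layout assumed by the
+    # lookups above (placeholder values, not a verbatim copy of the actual file):
+    #
+    #   default:
+    #     file_encoding: 'utf-8'
+    #     separator: ','
+    #     table_header: 'line that marks the start of the data table'
+    #     desired_format: '%Y-%m-%d %H:%M:%S.%f'
+    #   SOME_INSTRUMENT:
+    #     file_encoding: 'utf-8'
+    #     separator: '\t'
+    #     table_header: 'instrument-specific header line'
+    #     timestamp: ['Date', 'Time']
+    #     datetime_format: '%d.%m.%Y %H:%M:%S'
+    #     link_to_description: 'SOME_INSTRUMENT/SOME_INSTRUMENT_description.yaml'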
+
+    # Read header as a dictionary and detect where the data table starts
+    header_dict = {}
+    data_start = False
+    # Work with a copy of the file for safety
+    if work_with_copy:
+        tmp_filename = utils.make_file_copy(source_file_path=filename)
+    else:
+        tmp_filename = filename
+
+    if not isinstance(table_header, list):
+        table_header = [table_header]
+        file_encoding = [file_encoding]
+        separator = [separator]
+
+    with open(tmp_filename, 'rb') as f:
+        table_preamble = []
+        for line_number, line in enumerate(f):
+
+            for tb_idx, tb in enumerate(table_header):
+                if tb in line.decode(file_encoding[tb_idx]):
+                    break
+
+            if tb in line.decode(file_encoding[tb_idx]):
+                list_of_substrings = line.decode(file_encoding[tb_idx]).split(separator[tb_idx].replace('\\t', '\t'))
+
+                # Count occurrences of each substring
+                substring_counts = collections.Counter(list_of_substrings)
+                data_start = True
+                # Generate column names with an appended index only for repeated substrings
+                column_names = [f"{i}_{name.strip()}" if substring_counts[name] > 1 else name.strip() for i, name in enumerate(list_of_substrings)]
+
+                break
+            # Subdivide the line into words and join them by a single space.
+            # I assume this produces a cleaner line that contains no stray separator
+            # characters such as \t or \r and no extra spaces.
+            list_of_substrings = line.decode(file_encoding[tb_idx]).split()
+            # TODO: ideally we should use a multiline string, but the yaml parser does not recognize \n as a special character
+            table_preamble.append(' '.join(list_of_substrings))
+
+    # TODO: reading does not work when the separator is None; fix for RGA files
+    try:
+        df = pd.read_csv(tmp_filename,
+                         delimiter=separator[tb_idx].replace('\\t', '\t'),
+                         header=line_number,
+                         encoding=file_encoding[tb_idx],
+                         names=column_names,
+                         skip_blank_lines=True)
+
+        df_numerical_attrs = df.select_dtypes(include='number')
+        df_categorical_attrs = df.select_dtypes(exclude='number')
+        numerical_variables = [item for item in df_numerical_attrs.columns]
+
+        # Consolidate the separate columns (e.g. 'date', 'time') specified in the
+        # instrument configuration into a single timestamp column
+        if timestamp_variables:
+            timestamps_name = ' '.join(timestamp_variables)
+            df_categorical_attrs[timestamps_name] = df_categorical_attrs[timestamp_variables].astype(str).agg(' '.join, axis=1)
+
+            valid_indices = []
+            if datetime_format:
+                df_categorical_attrs[timestamps_name] = pd.to_datetime(df_categorical_attrs[timestamps_name], format=datetime_format, errors='coerce')
+                valid_indices = df_categorical_attrs.dropna(subset=[timestamps_name]).index
+                df_categorical_attrs = df_categorical_attrs.loc[valid_indices, :]
+                df_numerical_attrs = df_numerical_attrs.loc[valid_indices, :]
+
+                df_categorical_attrs[timestamps_name] = df_categorical_attrs[timestamps_name].dt.strftime(config_dict['default']['desired_format'])
+                startdate = df_categorical_attrs[timestamps_name].min()
+                enddate = df_categorical_attrs[timestamps_name].max()
+
+                df_categorical_attrs[timestamps_name] = df_categorical_attrs[timestamps_name].astype(str)
+                header_dict['startdate'] = str(startdate)
+                header_dict['enddate'] = str(enddate)
+
+            if len(timestamp_variables) > 1:
+                df_categorical_attrs = df_categorical_attrs.drop(columns=timestamp_variables)
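+
+        # Example of the consolidation above (hypothetical column names): with
+        # timestamp_variables = ['Date', 'Time'], the two columns are joined row
+        # by row into a single 'Date Time' column (e.g. '23.08.2024' and
+        # '16:06:44' become '23.08.2024 16:06:44'), which is then parsed with
+        # datetime_format, reformatted to desired_format, and stored as strings.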
+
+        categorical_variables = [item for item in df_categorical_attrs.columns]
+
+        file_dict = {}
+        path_tail, path_head = os.path.split(tmp_filename)
+
+        file_dict['name'] = path_head
+        # TODO: review this header dictionary; it may not be the best way to represent header data
+        file_dict['attributes_dict'] = header_dict
+        file_dict['datasets'] = []
+
+        df = pd.concat((df_categorical_attrs, df_numerical_attrs), axis=1)
+
+        dataset = {}
+        dataset['name'] = 'data_table'
+        dataset['data'] = utils.dataframe_to_np_structured_array(df)
+        dataset['shape'] = dataset['data'].shape
+        dataset['dtype'] = type(dataset['data'])
+
+        # Create attribute descriptions based on description_dict
+        dataset['attributes'] = {}
+
+        # Annotate column headers if description_dict is non-empty
+        if description_dict:
+            for column_name in df.columns:
+                column_attr_dict = description_dict['table_header'].get(
+                    column_name,
+                    {'note': 'There was no description available. Review instrument files.'})
+                dataset['attributes'].update({column_name: utils.parse_attribute(column_attr_dict)})
+
+        # Represent string values as fixed-length strings in the HDF5 file; they need
+        # to be decoded back to str when read. This provides better control than
+        # variable-length strings, at the expense of flexibility.
+        # https://docs.h5py.org/en/stable/strings.html
+
+        if table_preamble:
+            tp_dataset = {}
+            tp_dataset['name'] = "table_preamble"
+            tp_dataset['data'] = utils.convert_string_to_bytes(table_preamble)
+            tp_dataset['shape'] = tp_dataset['data'].shape
+            tp_dataset['dtype'] = type(tp_dataset['data'])
+            tp_dataset['attributes'] = {}
+            file_dict['datasets'].append(tp_dataset)
+
+        file_dict['datasets'].append(dataset)
+
+    except Exception as exc:
+        print(exc)
+        return {}
+
+    return file_dict
\ No newline at end of file
diff --git a/instruments/readers/xps_ibw_reader.py b/instruments/readers/xps_ibw_reader.py
new file mode 100644
index 0000000..846fd11
--- /dev/null
+++ b/instruments/readers/xps_ibw_reader.py
@@ -0,0 +1,82 @@
+import os
+import numpy as np
+import pandas as pd
+import collections
+from igor2.binarywave import load as loadibw
+
+def read_xps_ibw_file_as_dict(filename):
+    """
+    Reads IBW files from the Multiphase Chemistry Group, which contain XPS spectra and acquisition settings,
+    and formats the data into a dictionary with the structure {datasets: list of datasets}. Each dataset in the
+    list has the following structure:
+
+    {
+        'name': 'name',
+        'data': data_array,
+        'data_units': 'units',
+        'shape': data_shape,
+        'dtype': data_type
+    }
+
+    Parameters
+    ----------
+    filename : str
+        The IBW filename from the Multiphase Chemistry Group beamline.
+
+    Returns
+    -------
+    file_dict : dict
+        A dictionary containing the datasets from the IBW file.
+
+    Raises
+    ------
+    ValueError
+        If the input file is not a valid IBW file.
+
+    """
+
+    file_obj = loadibw(filename)
+
+    required_keys = ['wData', 'data_units', 'dimension_units', 'note']
+    if not all(key in file_obj['wave'].keys() for key in required_keys):
+        raise ValueError('This is not a valid XPS IBW file: it does not satisfy the minimum admissibility criteria.')
+
+    file_dict = {}
+    path_tail, path_head = os.path.split(filename)
+
+    # Group name and attributes
+    file_dict['name'] = path_head
+    file_dict['attributes_dict'] = {}
+
+    # Decode the note from bytes to str and split it into a list of elements separated by '\r'.
+    notes_list = file_obj['wave']['note'].decode("utf-8").split('\r')
+    exclude_list = ['Excitation Energy']
+    for item in notes_list:
+        if '=' in item:
+            key, value = item.split('=', 1)
+            # TODO: check whether value can be converted into a numeric type; for now all values are strings
+            if key not in exclude_list:
+                file_dict['attributes_dict'][key] = value
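+
+    # Example of the note format assumed above (hypothetical entries): the decoded
+    # note is a '\r'-separated sequence of 'key=value' pairs, e.g.
+    #   'Pass Energy=50\rExcitation Energy=1486.6'
+    # so 'Pass Energy' is stored in attributes_dict while 'Excitation Energy' is excluded.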
+
+    # TODO: ask Thorsten whether there is an easier way to access the attributes below
+    dimension_labels = file_obj['wave']['dimension_units'].decode("utf-8").split(']')
+    file_dict['attributes_dict']['dimension_units'] = [item + ']' for item in dimension_labels[:-1]]
+
+    # Datasets and their attributes
+
+    file_dict['datasets'] = []
+
+    dataset = {}
+    dataset['name'] = 'spectrum'
+    dataset['data'] = file_obj['wave']['wData']
+    dataset['data_units'] = file_obj['wave']['data_units']
+    dataset['shape'] = dataset['data'].shape
+    dataset['dtype'] = type(dataset['data'])
+
+    # TODO: include an energy-axis dataset
+
+    file_dict['datasets'].append(dataset)
+
+    return file_dict
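+
+# Example usage (a sketch; the path below is hypothetical):
+#
+#   file_dict = read_xps_ibw_file_as_dict('data/XPS/sample.ibw')
+#   spectrum = file_dict['datasets'][0]
+#   print(spectrum['name'], spectrum['shape'], spectrum['data_units'])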