"""Read text exports from the 'smps' and 'gas' instrument folders.

The entry point, :func:`read_txt_files_as_dict`, splits a text file into a
free-form preamble (kept as a list of cleaned lines) and a delimited data
table, and returns both as a plain dictionary of attributes and datasets.
"""
import os

import h5py
import numpy as np
import pandas as pd
from igor2.binarywave import load as loadibw

import src.g5505_utils as utils


# Parsing settings per instrument, keyed by the token searched for in the file
# path. Insertion order matters: 'smps' is tested before 'gas'.
# 'table_header' is the leading part of the line that opens the data table.
_INSTRUMENT_CONFIGS = {
    'smps': {
        'table_header': 'Sample # Date Start Time Sample Temp (C) Sample Pressure (kPa)',
        'separator': '\t',
        'encoding': 'latin-1',
    },
    'gas': {
        'table_header': 'Date_Time HoribaNO HoribaNOy Thermo42C_NO Thermo42C_NOx APHA370 CH4',
        'separator': '\t',
        'encoding': 'utf-8',
    },
}


def _collapse_whitespace(text):
    """Return *text* with every whitespace run (tabs, newlines, ...) replaced by one space."""
    return ' '.join(text.split())


def _detect_instrument(filename):
    """Return the first instrument token ('smps' or 'gas') contained in *filename*."""
    # TODO: this may be prone to error if assumed folder structure is non compliant
    for instrument in _INSTRUMENT_CONFIGS:
        if instrument in filename:
            return instrument
    raise ValueError(
        'Cannot infer the instrument from {!r}: the path must contain either '
        '"smps" or "gas"'.format(filename)
    )


def _read_preamble_and_columns(file_path, table_header, separator, file_encoding):
    """Scan *file_path* up to the table header line.

    Returns a tuple ``(table_preamble, column_names, line_number)`` where
    ``table_preamble`` holds the whitespace-collapsed lines found before the
    table header, ``column_names`` are the header fields prefixed with their
    position (``'0_<name>'``, ``'1_<name>'``, ...) and ``line_number`` is the
    0-based physical line index of the header line.

    Raises ``ValueError`` when no line contains the table header.
    """
    # Compare on whitespace-collapsed text so the marker matches whether the
    # header fields are separated by tabs or by spaces.
    header_marker = _collapse_whitespace(table_header)
    table_preamble = []
    with open(file_path, 'r', encoding=file_encoding, errors='ignore') as f:
        for line_number, line in enumerate(f):
            cleaned_line = _collapse_whitespace(line)
            if header_marker in cleaned_line:
                # Drop the line terminator first; otherwise it would end up
                # inside the last column name.
                fields = line.rstrip('\r\n').split(separator)
                column_names = [str(i) + '_' + name for i, name in enumerate(fields)]
                return table_preamble, column_names, line_number
            # TODO: ideally we should use a multiline string but the yaml parser
            # is not recognizing \n as special character
            table_preamble.append(cleaned_line)

    raise ValueError('Invalid table header. The table header was not found and therefore table data cannot be extracted from txt or dat file.')


def _make_dataset(name, data):
    """Wrap the array *data* into the dataset dictionary layout used by this module."""
    return {
        'name': name,
        'data': data,
        'shape': data.shape,
        # The original layout stores the container class here, not data.dtype.
        'dtype': type(data),
    }


def _encoded_names_dataset(name, variable_names):
    """Build a dataset holding *variable_names* as UTF-8 bytes in a single row."""
    encoded = [item.encode("utf-8") for item in variable_names]
    return _make_dataset(name, np.array(encoded).reshape((1, len(encoded))))


def read_txt_files_as_dict(filename: str) -> dict:
    """Parse an 'smps' or 'gas' text file into a dictionary.

    The instrument is inferred from *filename*: the path must contain 'smps'
    or 'gas'. The file is read through ``utils.make_file_copy(filename)``
    (presumably the path of a working copy -- confirm against
    ``src.g5505_utils``).

    Returns a dictionary with the keys:

    * ``'name'``: base name of the path returned by ``utils.make_file_copy``.
    * ``'attributes_dict'``: ``{'table_preamble': [...]}``, the cleaned lines
      that precede the data table.
    * ``'datasets'``: list of dataset dictionaries (``name``, ``data``,
      ``shape``, ``dtype``). Numerical columns are stored as a structured
      array with one 'f4' field per column plus a row of encoded column
      names; non-numerical columns are stored as an array plus a row of
      encoded column names.

    Raises ``ValueError`` if the instrument cannot be inferred from
    *filename* or if the table header line is not found in the file.
    """
    # Decide the instrument once and reuse it below, so the parsing settings
    # and the timestamp handling can never disagree.
    instrument = _detect_instrument(filename)
    config = _INSTRUMENT_CONFIGS[instrument]
    separator = config['separator']
    file_encoding = config['encoding']

    tmp_file_path = utils.make_file_copy(filename)

    table_preamble, column_names, line_number = _read_preamble_and_columns(
        tmp_file_path, config['table_header'], separator, file_encoding
    )
    header_dict = {"table_preamble": table_preamble}

    # `skiprows` counts physical lines, whereas `header` ignores blank lines
    # when skip_blank_lines=True; skipping up to and including the header line
    # keeps the table start correct even if the preamble has blank lines.
    df = pd.read_csv(tmp_file_path,
                     delimiter=separator,
                     skiprows=line_number + 1,
                     header=None,
                     encoding=file_encoding,
                     names=column_names,
                     skip_blank_lines=True)

    df_numerical_attrs = df.select_dtypes(include='number')
    # Explicit copy: this frame is modified below.
    df_categorical_attrs = df.select_dtypes(exclude='number').copy()

    if instrument == 'smps':
        # Merge the date and start-time columns into a single timestamp column.
        df_categorical_attrs['timestamps'] = (
            df_categorical_attrs['1_Date'] + ' ' + df_categorical_attrs['2_Start Time']
        )
        df_categorical_attrs = df_categorical_attrs.drop(columns=['1_Date', '2_Start Time'])
    elif instrument == 'gas':
        df_categorical_attrs = df_categorical_attrs.rename(columns={'0_Date_Time': 'timestamps'})

    numerical_variables = list(df_numerical_attrs.columns)
    categorical_variables = list(df_categorical_attrs.columns)

    file_dict = {
        'name': os.path.basename(tmp_file_path),
        # TODO: review this header dictionary, it may not be the best way to represent header data
        'attributes_dict': header_dict,
        'datasets': [],
    }

    if numerical_variables:
        numerical_values = df_numerical_attrs.to_numpy()
        # Store the table as a structured array: one 'f4' field per column.
        records = [tuple(row) for row in numerical_values]
        record_dtype = [(column, 'f4') for column in numerical_variables]
        file_dict['datasets'].append(
            _make_dataset('numerical_variables', np.array(records, dtype=record_dtype))
        )
        file_dict['datasets'].append(
            _encoded_names_dataset('numerical_variable_names', numerical_variables)
        )

    if categorical_variables:
        file_dict['datasets'].append(
            _make_dataset('categorical_variables',
                          df_categorical_attrs.loc[:, categorical_variables].to_numpy())
        )
        # NOTE: the dataset name below is spelled 'categorial' in the original
        # layout; it is kept unchanged because consumers may look it up by name.
        file_dict['datasets'].append(
            _encoded_names_dataset('categorial_variable_names', categorical_variables)
        )

    return file_dict


def main(instrument_folder='gas'):
    """Parse one of the hard-coded example files and return the resulting dictionary.

    *instrument_folder* selects the example file ('smps' or 'gas'); the default
    matches the file the original script ended up reading.
    """
    # NOTE: 'M:\\gas\\20220705_000004_MSC_gases.txt' was flagged as a corrupted file.
    root_dir = '\\\\fs03\\Iron_Sulphate'
    example_files = {
        'smps': 'smps\\20220726\\20220726_num.TXT',
        'gas': 'gas\\20220726_101617_MSC_gases.txt',
    }
    filename_path = os.path.join(root_dir, example_files[instrument_folder])

    result = read_txt_files_as_dict(filename_path)

    print(':)')

    return result


if __name__ == '__main__':

    output_dict = main()

    # Summarise what was extracted: one line per dataset with its name and shape.
    for dataset in output_dict['datasets']:
        print(dataset['name'], dataset['shape'])