Removed smogchamber reader because its funtionality is now integrated into g5505_file_reader.py.

This commit is contained in:
2024-07-09 16:13:01 +02:00
parent afc6c93823
commit 0c74c52e09

View File

@ -1,176 +0,0 @@
import os
import numpy as np
import pandas as pd
from igor2.binarywave import load as loadibw
import src.g5505_utils as utils
import h5py
#def read_txt_files_as_dict(filename : str ,instrument_folder : str):
def read_txt_files_as_dict(filename : str ):
#if instrument_folder == 'smps':
# Infer from filename whether txt file comes from smps or gas folder
#TODO: this may be prone to error if assumed folder structure is non compliant
if 'smps' in filename:
table_of_header = 'Sample # Date Start Time Sample Temp (C) Sample Pressure (kPa)'
separator = '\t'
file_encoding = 'latin-1'
elif 'gas' in filename:
table_of_header = 'Date_Time HoribaNO HoribaNOy Thermo42C_NO Thermo42C_NOx APHA370 CH4'
separator = '\t'
file_encoding = 'utf-8'
else:
raise ValueError('intrument_folder must be set as a either "smps" or "gas"')
tmp_file_path = utils.make_file_copy(filename)
# Read header as a dictionary and detect where data table starts
header_dict = {}
data_start = False
with open(tmp_file_path,'r', encoding=file_encoding, errors='ignore') as f:
#file_encoding = f.encoding
#table_preamble = ""
table_preamble = []
line_number = 0
for line_number, line in enumerate(f):
#print(line_number,line)
if table_of_header in line:
list_of_substrings = line.split(separator)
data_start = True
column_names = []
for i, name in enumerate(list_of_substrings):
column_names.append(str(i)+'_'+name)
#print(line_number, len(column_names ))
break
# Subdivide line into words, and join them by single space.
# I asumme this can produce a cleaner line that contains no weird separator characters \t \r or extra spaces and so on.
list_of_substrings = line.split()
# TODO: ideally we should use a multilinear string but the yalm parser is not recognizing \n as special character
#line = ' '.join(list_of_substrings+['\n'])
line = ' '.join(list_of_substrings)
table_preamble.append(line)# += new_line
header_dict["table_preamble"] = table_preamble #.replace('\n','\\n').replace('\t','\\t')
if not data_start:
raise ValueError('Invalid table header. The table header was not found and therefore table data cannot be extracted from txt or dat file.')
df = pd.read_csv(tmp_file_path,
delimiter = separator,
header=line_number,
#encoding='latin-1',
encoding= file_encoding,
names=column_names,
skip_blank_lines=True)
df_numerical_attrs = df.select_dtypes(include ='number')
df_categorical_attrs = df.select_dtypes(exclude='number')
if 'smps' in tmp_file_path:
df_categorical_attrs['timestamps'] = [ df_categorical_attrs.loc[i,'1_Date']+' '+df_categorical_attrs.loc[i,'2_Start Time'] for i in df.index]
df_categorical_attrs = df_categorical_attrs.drop(columns=['1_Date','2_Start Time'])
elif 'gas' in tmp_file_path:
df_categorical_attrs = df_categorical_attrs.rename(columns={'0_Date_Time' : 'timestamps'})
#data_column_names = [item.encode("utf-8") for item in df_numerical_attrs.columns]
numerical_variables = [item for item in df_numerical_attrs.columns]
categorical_variables = [item for item in df_categorical_attrs.columns]
###
file_dict = {}
path_tail, path_head = os.path.split(tmp_file_path)
file_dict['name'] = path_head
# TODO: review this header dictionary, it may not be the best way to represent header data
file_dict['attributes_dict'] = header_dict
file_dict['datasets'] = []
####
if numerical_variables:
dataset = {}
dataset['name'] = 'numerical_variables'
dataset['data'] = df_numerical_attrs.to_numpy()
dataset['shape'] = dataset['data'].shape
dataset['dtype'] = type(dataset['data'])
#dataset['data_units'] = file_obj['wave']['data_units']
file_dict['datasets'].append(dataset)
rows,cols = dataset['shape']
# This lines were added to test the structured array functionality
tmp = [tuple(dataset['data'][i,:]) for i in range(dataset['shape'][0])]
dtype_tmp = [(numerical_variables[i],'f4') for i in range(dataset['shape'][1])]
data = np.array(tmp, dtype=dtype_tmp)
dataset['data'] = data
dataset['shape'] = dataset['data'].shape
dataset = {}
numerical_variables= [item.encode("utf-8") for item in numerical_variables]
dataset['name'] = 'numerical_variable_names'
dataset['data'] = np.array(numerical_variables).reshape((1,cols))
dataset['shape'] = dataset['data'].shape
dataset['dtype'] = type(dataset['data'])
file_dict['datasets'].append(dataset)
#if 'timestamps' in categorical_variables:
# dataset = {}
# dataset['name'] = 'timestamps'
# dataset['data'] = df_categorical_attrs['timestamps'].to_numpy().reshape((rows,1))
# dataset['shape'] = dataset['data'].shape
# dataset['dtype'] = type(dataset['data'])
# file_dict['datasets'].append(dataset)
# categorical_variables.remove('timestamps')
if categorical_variables:
dataset = {}
dataset['name'] = 'categorical_variables'
dataset['data'] = df_categorical_attrs.loc[:,categorical_variables].to_numpy()
dataset['shape'] = dataset['data'].shape
dataset['dtype'] = type(dataset['data'])
file_dict['datasets'].append(dataset)
dataset = {}
categorical_variables = [item.encode("utf-8") for item in categorical_variables]
dataset['name'] = 'categorial_variable_names'
dataset['data'] = np.array(categorical_variables).reshape((1,len(categorical_variables)))
dataset['shape'] = dataset['data'].shape
dataset['dtype'] = type(dataset['data'])
file_dict['datasets'].append(dataset)
return file_dict
def main():
#filename = 'M:\\gas\\20220705_000004_MSC_gases.txt' corrupted file
#filename = 'M:\\gas\\20220726_101617_MSC_gases.txt'
root_dir = '\\\\fs03\\Iron_Sulphate'
instrument_folder = 'smps'
filename_path = os.path.join(root_dir,'smps\\20220726\\20220726_num.TXT')
instrument_folder = 'gas'
filename_path = os.path.join(root_dir,'gas\\20220726_101617_MSC_gases.txt')
result = read_txt_files_as_dict(filename_path,instrument_folder)
print(':)')
return result
if __name__ == '__main__':
output_dict = main()
print(output_dict['num_data_df'].columns, len(output_dict['num_data_df'].columns),'\n')
print(output_dict['num_data_df'].columns, len(output_dict['categ_data_df'].columns))