Files
dima/src/smog_chamber_file_reader.py

165 lines
6.2 KiB
Python

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import os
import g5505_utils as utils
#def read_txt_files_as_dict(filename : str ,instrument_folder : str):
def read_txt_files_as_dict(filename : str ):
#if instrument_folder == 'smps':
# Infer from filename whether txt file comes from smps or gas folder
#TODO: this may be prone to error if assumed folder structure is non compliant
if 'smps' in filename:
table_of_header = 'Sample # Date Start Time Sample Temp (C) Sample Pressure (kPa)'
separator = '\t'
elif 'gas' in filename:
table_of_header = 'Date_Time HoribaNO HoribaNOy Thermo42C_NO Thermo42C_NOx APHA370 CH4'
separator = '\t'
else:
raise ValueError('intrument_folder must be set as a either "smps" or "gas"')
tmp_file_path = utils.make_file_copy(filename)
# Read header as a dictionary and detect where data table starts
header_dict = {}
data_start = False
with open(tmp_file_path,'r') as f:
file_encoding = f.encoding
table_preamble = ""
for line_number, line in enumerate(f):
list_of_substrings = line.split(separator)
if not (line == '\n'):
#table_preamble += line.strip() #+ "\n"
table_preamble += line
if table_of_header in line:
data_start = True
column_names = []
for i, name in enumerate(list_of_substrings):
column_names.append(str(i)+'_'+name)
print(line_number, len(column_names ))
break
header_dict["table_preamble"] = table_preamble
if not data_start:
raise ValueError('Invalid table header. The table header was not found and therefore table data cannot be extracted from txt or dat file.')
df = pd.read_csv(tmp_file_path,
delimiter = separator,
header=line_number,
#encoding='latin-1',
encoding= file_encoding,
names=column_names,
skip_blank_lines=True)
df_numerical_attrs = df.select_dtypes(include ='number')
df_categorical_attrs = df.select_dtypes(exclude='number')
if 'smps' in tmp_file_path:
df_categorical_attrs['timestamps'] = [ df_categorical_attrs.loc[i,'1_Date']+' '+df_categorical_attrs.loc[i,'2_Start Time'] for i in df.index]
df_categorical_attrs = df_categorical_attrs.drop(columns=['1_Date','2_Start Time'])
elif 'gas' in tmp_file_path:
df_categorical_attrs = df_categorical_attrs.rename(columns={'0_Date_Time' : 'timestamps'})
#data_column_names = [item.encode("utf-8") for item in df_numerical_attrs.columns]
numerical_variables = [item for item in df_numerical_attrs.columns]
categorical_variables = [item for item in df_categorical_attrs.columns]
###
file_dict = {}
path_tail, path_head = os.path.split(tmp_file_path)
file_dict['name'] = path_head
# TODO: review this header dictionary, it may not be the best way to represent header data
file_dict['attributes_dict'] = header_dict
file_dict['datasets'] = []
####
if numerical_variables:
dataset = {}
dataset['name'] = 'numerical_variables'
dataset['data'] = df_numerical_attrs.to_numpy()
dataset['shape'] = dataset['data'].shape
dataset['dtype'] = type(dataset['data'])
#dataset['data_units'] = file_obj['wave']['data_units']
file_dict['datasets'].append(dataset)
rows,cols = dataset['shape']
# This lines were added to test the structured array functionality
tmp = [tuple(dataset['data'][i,:]) for i in range(dataset['shape'][0])]
dtype_tmp = [(numerical_variables[i],'f4') for i in range(dataset['shape'][1])]
data = np.array(tmp, dtype=dtype_tmp)
dataset['data'] = data
dataset['shape'] = dataset['data'].shape
dataset = {}
numerical_variables= [item.encode("utf-8") for item in numerical_variables]
dataset['name'] = 'numerical_variable_names'
dataset['data'] = np.array(numerical_variables).reshape((1,cols))
dataset['shape'] = dataset['data'].shape
dataset['dtype'] = type(dataset['data'])
file_dict['datasets'].append(dataset)
if 'timestamps' in categorical_variables:
dataset = {}
dataset['name'] = 'timestamps'
dataset['data'] = df_categorical_attrs['timestamps'].to_numpy().reshape((rows,1))
dataset['shape'] = dataset['data'].shape
dataset['dtype'] = type(dataset['data'])
file_dict['datasets'].append(dataset)
categorical_variables.remove('timestamps')
if categorical_variables:
dataset = {}
dataset['name'] = 'categorical_variables'
dataset['data'] = df_categorical_attrs.loc[:,categorical_variables].to_numpy()
dataset['shape'] = dataset['data'].shape
dataset['dtype'] = type(dataset['data'])
file_dict['datasets'].append(dataset)
dataset = {}
categorical_variables = [item.encode("utf-8") for item in categorical_variables]
dataset['name'] = 'categorial_variable_names'
dataset['data'] = np.array(categorical_variables).reshape((1,len(categorical_variables)))
dataset['shape'] = dataset['data'].shape
dataset['dtype'] = type(dataset['data'])
file_dict['datasets'].append(dataset)
return file_dict
def main():
#filename = 'M:\\gas\\20220705_000004_MSC_gases.txt' corrupted file
#filename = 'M:\\gas\\20220726_101617_MSC_gases.txt'
root_dir = '\\\\fs03\\Iron_Sulphate'
instrument_folder = 'smps'
filename_path = os.path.join(root_dir,'smps\\20220726\\20220726_num.TXT')
instrument_folder = 'gas'
filename_path = os.path.join(root_dir,'gas\\20220726_101617_MSC_gases.txt')
result = read_txt_files_as_dict(filename_path,instrument_folder)
print(':)')
return result
if __name__ == '__main__':
output_dict = main()
print(output_dict['num_data_df'].columns, len(output_dict['num_data_df'].columns),'\n')
print(output_dict['num_data_df'].columns, len(output_dict['categ_data_df'].columns))