# dima/src/g5505_file_reader.py
import os
import re

import numpy as np
import pandas as pd
import h5py
import yaml
from igor2.binarywave import load as loadibw

import src.g5505_utils as utils
import src.metadata_review_lib as metadata

ROOT_DIR = os.path.abspath(os.curdir)


def read_xps_ibw_file_as_dict(filename):
    """Read ibw files from the multiphase chemistry group, which contain XPS spectra and acquisition settings."""
    file_obj = loadibw(filename)

    required_keys = ['wData', 'data_units', 'dimension_units', 'note']
    if not all(key in file_obj['wave'].keys() for key in required_keys):
        raise ValueError('This is not a valid XPS ibw file. It does not satisfy the minimum admissibility criteria.')

    file_dict = {}
    path_tail, path_head = os.path.split(filename)

    # Group name and attributes
    file_dict['name'] = path_head
    file_dict['attributes_dict'] = {}

    # Convert the note field from bytes to str and split it into a list of elements separated by '\r'.
    notes_list = file_obj['wave']['note'].decode("utf-8").split('\r')
    exclude_list = ['Excitation Energy']
    for item in notes_list:
        if '=' in item:
            key, value = item.split('=', 1)
            # TODO: check whether value can be converted into a numeric type. For now all values are strings.
            if key not in exclude_list:
                file_dict['attributes_dict'][key] = value

    # TODO: talk to Thorsten to see if there is an easier way to access the attributes below.
    dimension_labels = file_obj['wave']['dimension_units'].decode("utf-8").split(']')
    file_dict['attributes_dict']['dimension_units'] = [item + ']' for item in dimension_labels[0:len(dimension_labels) - 1]]

    # Datasets and their attributes
    file_dict['datasets'] = []

    dataset = {}
    dataset['name'] = 'spectrum'
    dataset['data'] = file_obj['wave']['wData']
    dataset['data_units'] = file_obj['wave']['data_units']
    dataset['shape'] = dataset['data'].shape
    dataset['dtype'] = type(dataset['data'])

    # TODO: include energy axis dataset
    file_dict['datasets'].append(dataset)

    return file_dict
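
# A minimal usage sketch (illustrative values; the actual keys in 'attributes_dict'
# depend entirely on the note field of the given ibw file):
#
#   file_dict = read_xps_ibw_file_as_dict('0069069_N1s_495eV.ibw')
#   # file_dict['name']            -> '0069069_N1s_495eV.ibw'
#   # file_dict['attributes_dict'] -> note fields plus 'dimension_units' (list of unit strings)
#   # file_dict['datasets'][0]     -> {'name': 'spectrum', 'data': <ndarray>, ...}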


def copy_file_in_group(source_file_path, dest_file_obj: h5py.File, dest_group_name):
    # Create a copy of the original file to avoid possible file corruption and work with that.
    tmp_file_path = utils.make_file_copy(source_file_path)

    # Open the backup h5 file and copy its complete filesystem directory onto a group in the h5 file.
    with h5py.File(tmp_file_path, 'r') as src_file:
        dest_file_obj.copy(source=src_file['/'], dest=dest_group_name)

    if 'tmp_files' in tmp_file_path:
        os.remove(tmp_file_path)
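
# Usage sketch (hypothetical paths; assumes the destination HDF5 file is open for writing):
#
#   with h5py.File('collection.h5', 'a') as dest:
#       copy_file_in_group('measurements/run01.h5', dest, 'run01')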


def infer_units(column_name):
    # Infer units from a column name, e.g. 'Temperature [K]' or 'Pressure (mbar)'.
    match = re.search(r'\[.+\]', column_name)
    if not match:
        match = re.search(r'\(.+\)', column_name)
    return match
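
# Illustrative behavior (returns a re.Match object, or None when no units are found):
#
#   infer_units('Temperature [K]').group()   # -> '[K]'
#   infer_units('Pressure (mbar)').group()   # -> '(mbar)'
#   infer_units('Comment')                   # -> None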


def dataframe_to_np_structured_array(df: pd.DataFrame):
    # Define the dtype for the structured array, ensuring compatibility with h5py.
    dtype = []
    for col in df.columns:
        col_dtype = df[col].dtype
        if pd.api.types.is_string_dtype(col_dtype):
            # Convert string columns to fixed-length strings sized to the longest value.
            max_len = int(df[col].str.len().max())
            dtype.append((col, f'S{max_len}'))
        elif pd.api.types.is_integer_dtype(col_dtype):
            dtype.append((col, 'i4'))  # Assuming 32-bit integer
        elif pd.api.types.is_float_dtype(col_dtype):
            dtype.append((col, 'f4'))  # Assuming 32-bit float
        else:
            raise ValueError(f"Unsupported dtype: {col_dtype}")

    # Convert the DataFrame to a structured array.
    structured_array = np.array(list(df.itertuples(index=False, name=None)), dtype=dtype)
    return structured_array
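
# A minimal sketch of the conversion on synthetic data:
#
#   df = pd.DataFrame({'value': [1.5, 2.5], 'label': ['ok', 'fail']})
#   arr = dataframe_to_np_structured_array(df)
#   # arr.dtype -> dtype([('value', '<f4'), ('label', 'S4')])
#   # arr[0]    -> (1.5, b'ok')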


def read_txt_files_as_dict(filename: str):
    with open('src/text_data_sources.yaml', 'r') as stream:
        try:
            config_dict = yaml.load(stream, Loader=yaml.FullLoader)
        except yaml.YAMLError as exc:
            print(exc)

    # Verify that the file can be read by one of the available instrument configurations.
    if not any(key in filename for key in config_dict.keys()):
        return {}

    # TODO: this may be prone to error if the assumed folder structure is non-compliant.
    file_encoding = config_dict['default']['file_encoding']  # 'utf-8'
    separator = config_dict['default']['separator']
    table_header = config_dict['default']['table_header']
    timestamp_variables = []
    description_dict = {}

    for key in config_dict.keys():
        if key.replace('/', os.sep) in filename:
            file_encoding = config_dict[key].get('file_encoding', file_encoding)
            separator = config_dict[key].get('separator', separator).replace('\\t', '\t')
            table_header = config_dict[key].get('table_header', table_header)
            timestamp_variables = config_dict[key].get('timestamp', [])

            link_to_description = config_dict[key].get('link_to_description', '').replace('/', os.sep)
            if link_to_description:
                with open(link_to_description, 'r') as stream:
                    try:
                        description_dict = yaml.load(stream, Loader=yaml.FullLoader)
                    except yaml.YAMLError as exc:
                        print(exc)
            break

    # Read the header as a dictionary and detect where the data table starts.
    header_dict = {}
    data_start = False

    # Work with a copy of the file for safety.
    tmp_filename = utils.make_file_copy(source_file_path=filename)

    with open(tmp_filename, 'rb') as f:
        table_preamble = []
        for line_number, line in enumerate(f):
            if table_header in line.decode(file_encoding):
                list_of_substrings = line.decode(file_encoding).split(separator)
                data_start = True
                column_names = [str(i) + '_' + name for i, name in enumerate(list_of_substrings)]
                break

            # Subdivide the line into words and join them by a single space.
            # I assume this produces a cleaner line without stray separator characters (\t, \r, extra spaces).
            list_of_substrings = line.decode(file_encoding).split()
            # TODO: ideally we should use a multiline string, but the yaml parser does not recognize \n as a special character.
            table_preamble.append(' '.join(list_of_substrings))

    # Represent string values as fixed-length strings in the HDF5 file, which need
    # to be decoded as strings when read back. This provides better control than
    # variable-length strings, at the expense of flexibility.
    # https://docs.h5py.org/en/stable/strings.html
    if table_preamble:
        header_dict["table_preamble"] = metadata.convert_string_to_bytes(table_preamble)

    # TODO: it does not work with separator as None :(. Fix for RGA.
    try:
        df = pd.read_csv(tmp_filename,
                         delimiter=separator,
                         header=line_number,
                         encoding=file_encoding,
                         names=column_names,
                         skip_blank_lines=True)

        df_numerical_attrs = df.select_dtypes(include='number')
        df_categorical_attrs = df.select_dtypes(exclude='number').copy()
        numerical_variables = [item for item in df_numerical_attrs.columns]

        # Consolidate the separate 'date' and 'time' columns specified in text_data_sources.yaml
        # into a single timestamp column.
        if timestamp_variables:
            df_categorical_attrs['timestamps'] = df_categorical_attrs[timestamp_variables].astype(str).agg(' '.join, axis=1)
            df_categorical_attrs = df_categorical_attrs.drop(columns=timestamp_variables)

        categorical_variables = [item for item in df_categorical_attrs.columns]

        file_dict = {}
        path_tail, path_head = os.path.split(tmp_filename)

        file_dict['name'] = path_head
        # TODO: review this header dictionary; it may not be the best way to represent header data.
        file_dict['attributes_dict'] = header_dict
        file_dict['datasets'] = []

        if numerical_variables:
            dataset = {}
            dataset['name'] = 'table_numerical_variables'
            dataset['data'] = dataframe_to_np_structured_array(df_numerical_attrs)
            dataset['shape'] = dataset['data'].shape
            dataset['dtype'] = type(dataset['data'])
            file_dict['datasets'].append(dataset)

            try:
                dataset['attributes'] = description_dict['table_header'].copy()
                for key in description_dict['table_header'].keys():
                    if key not in numerical_variables:
                        dataset['attributes'].pop(key)  # Drop attributes that do not describe a numerical column.
                    else:
                        dataset['attributes'][key] = metadata.parse_attribute(dataset['attributes'][key])
            except (KeyError, ValueError) as err:
                print(err)

        if categorical_variables:
            dataset = {}
            dataset['name'] = 'table_categorical_variables'
            dataset['data'] = dataframe_to_np_structured_array(df_categorical_attrs)
            dataset['shape'] = dataset['data'].shape
            dataset['dtype'] = type(dataset['data'])
            file_dict['datasets'].append(dataset)
    except Exception:
        return {}

    return file_dict
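
# Usage sketch (hypothetical path; the instrument key in the path must match an
# entry in src/text_data_sources.yaml for the file to be parsed):
#
#   txt_dict = read_txt_files_as_dict('RGA/2022_01_31_RGA_scan.txt')
#   # txt_dict['attributes_dict']['table_preamble'] -> header lines as fixed-length bytes
#   # txt_dict['datasets'] -> up to two tables: numerical and categorical variables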


def main():
    inputfile_dir = '\\\\fs101\\5505\\People\\Juan\\TypicalBeamTime'
    file_dict = read_xps_ibw_file_as_dict(inputfile_dir + '\\SES\\0069069_N1s_495eV.ibw')

    for key in file_dict.keys():
        print(key, file_dict[key])


if __name__ == '__main__':
    main()
    print(':)')