Moved a few functions from ...reader.py and hdf5_lib.py into ..utils.py, and refactored accordingly.

2024-07-10 09:19:30 +02:00
parent 0c74c52e09
commit 0a0b4ac41d
3 changed files with 151 additions and 97 deletions

View File

@@ -2,10 +2,11 @@ import os
 import numpy as np
 import pandas as pd
+import collections
 from igor2.binarywave import load as loadibw
 import src.g5505_utils as utils
-import src.metadata_review_lib as metadata
+#import src.metadata_review_lib as metadata
 #from src.metadata_review_lib import parse_attribute
 import yaml
@@ -14,8 +15,35 @@ import h5py

 ROOT_DIR = os.path.abspath(os.curdir)

 def read_xps_ibw_file_as_dict(filename):
-    """ Reads ibw files from multiphase chemistry group, which contain xps spectra and acquisition settings."""
+    """
+    Reads IBW files from the Multiphase Chemistry Group, which contain XPS spectra and acquisition settings,
+    and formats the data into a dictionary with the structure {datasets: list of datasets}. Each dataset in the
+    list has the following structure:
+
+    {
+        'name': 'name',
+        'data': data_array,
+        'data_units': 'units',
+        'shape': data_shape,
+        'dtype': data_type
+    }
+
+    Parameters
+    ----------
+    filename : str
+        The IBW filename from the Multiphase Chemistry Group beamline.
+
+    Returns
+    -------
+    file_dict : dict
+        A dictionary containing the datasets from the IBW file.
+
+    Raises
+    ------
+    ValueError
+        If the input IBW file is not a valid IBW file.
+    """

     file_obj = loadibw(filename)
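For reference, a minimal usage sketch of the return structure documented above (the filename is a hypothetical placeholder, not repository data):

file_dict = read_xps_ibw_file_as_dict('xps_scan.ibw')
for dataset in file_dict['datasets']:
    print(dataset['name'], dataset['shape'], dataset['dtype'], dataset['data_units'])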
@@ -77,21 +105,6 @@ def copy_file_in_group(source_file_path, dest_file_obj : h5py.File, dest_group_n
     if 'tmp_files' in tmp_file_path:
         os.remove(tmp_file_path)

-import re
-
-def infer_units(column_name):
-    match = re.search('\[.+\]')
-    if match:
-        return match
-    else:
-        match = re.search('\(.+\)')
-        return match
-
-from collections import Counter
-
 def read_txt_files_as_dict(filename : str , work_with_copy : bool = True ):
     # Get the directory of the current module
@@ -152,7 +165,7 @@ def read_txt_files_as_dict(filename : str , work_with_copy : bool = True ):
             list_of_substrings = line.decode(file_encoding).split(separator)

             # Count occurrences of each substring
-            substring_counts = Counter(list_of_substrings)
+            substring_counts = collections.Counter(list_of_substrings)
             data_start = True
             # Generate column names with appended index only for repeated substrings
             column_names = [f"{i}_{name.strip()}" if substring_counts[name] > 1 else name.strip() for i, name in enumerate(list_of_substrings)]
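To illustrate the renaming rule above, a small standalone sketch (input values are made up):

import collections
list_of_substrings = ['time', 'signal', 'signal', 'temp']
substring_counts = collections.Counter(list_of_substrings)
column_names = [f"{i}_{name.strip()}" if substring_counts[name] > 1 else name.strip()
                for i, name in enumerate(list_of_substrings)]
# column_names == ['time', '1_signal', '2_signal', 'temp']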
@@ -178,7 +191,7 @@ def read_txt_files_as_dict(filename : str , work_with_copy : bool = True ):
     # https://docs.h5py.org/en/stable/strings.html
     if table_preamble:
-        header_dict["table_preamble"] = metadata.convert_string_to_bytes(table_preamble)
+        header_dict["table_preamble"] = utils.convert_string_to_bytes(table_preamble)
@@ -260,9 +273,9 @@ def read_txt_files_as_dict(filename : str , work_with_copy : bool = True ):
                 if not key in numerical_variables:
                     dataset['attributes'].pop(key) # delete key
                 else:
-                    dataset['attributes'][key] = metadata.parse_attribute(dataset['attributes'][key])
+                    dataset['attributes'][key] = utils.parse_attribute(dataset['attributes'][key])
                 if timestamps_name in categorical_variables:
-                    dataset['attributes'][timestamps_name] = metadata.parse_attribute({'unit':'YYYY-MM-DD HH:MM:SS.ffffff'})
+                    dataset['attributes'][timestamps_name] = utils.parse_attribute({'unit':'YYYY-MM-DD HH:MM:SS.ffffff'})
             except ValueError as err:
                 print(err)
@@ -276,7 +289,7 @@ def read_txt_files_as_dict(filename : str , work_with_copy : bool = True ):
     # dataset['shape'] = dataset['data'].shape
     # dataset['dtype'] = type(dataset['data'])
     # if timestamps_name in categorical_variables:
-    #     dataset['attributes'] = {timestamps_name: metadata.parse_attribute({'unit':'YYYY-MM-DD HH:MM:SS.ffffff'})}
+    #     dataset['attributes'] = {timestamps_name: utils.parse_attribute({'unit':'YYYY-MM-DD HH:MM:SS.ffffff'})}
     # file_dict['datasets'].append(dataset)

View File

@@ -1,9 +1,13 @@
 import pandas as pd
 import os
+import sys
 import shutil
 import datetime
 import logging
 import numpy as np
+import h5py
+import re

 def setup_logging(log_dir, log_filename):
     """Sets up logging to a specified directory and file.
@@ -130,3 +134,112 @@ def dataframe_to_np_structured_array(df: pd.DataFrame):
     structured_array = np.array(list(df.itertuples(index=False, name=None)), dtype=dtype)
     return structured_array
+
+def convert_string_to_bytes(input_list: list):
+    """Convert a list of strings into a numpy array with utf8-type entries.
+
+    Parameters
+    ----------
+    input_list (list) : list of string objects
+
+    Returns
+    -------
+    input_array_bytes (ndarray): array of utf8-type entries.
+    """
+    utf8_type = lambda max_length: h5py.string_dtype('utf-8', max_length)
+    if input_list:
+        max_length = max(len(item) for item in input_list)
+        # Convert the strings to bytes with utf-8 encoding; errors='ignore' skips characters that cannot be encoded
+        input_list_bytes = [item.encode('utf-8', errors='ignore') for item in input_list]
+        input_array_bytes = np.array(input_list_bytes, dtype=utf8_type(max_length))
+    else:
+        input_array_bytes = np.array([], dtype=utf8_type(0))
+    return input_array_bytes
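A quick illustrative sketch of how the new helper behaves (values are made up):

arr = convert_string_to_bytes(['alpha', 'beta'])
# arr.dtype is an h5py UTF-8 string dtype sized to the longest input (5), and arr[0] == b'alpha'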
+def infer_units(column_name):
+    # TODO: complete or remove
+    match = re.search(r'\[.+\]', column_name)
+    if match:
+        return match
+    else:
+        match = re.search(r'\(.+\)', column_name)
+        return match
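The intended behavior, sketched on hypothetical column names:

infer_units('flow rate [ml/min]')   # match object spanning '[ml/min]'
infer_units('temperature (K)')      # falls through to the parenthesis pattern, matches '(K)'
infer_units('counts')               # None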
+def progressBar(count_value, total, suffix=''):
+    bar_length = 100
+    filled_up_Length = int(round(bar_length * count_value / float(total)))
+    percentage = round(100.0 * count_value / float(total), 1)
+    bar = '=' * filled_up_Length + '-' * (bar_length - filled_up_Length)
+    sys.stdout.write('[%s] %s%s ...%s\r' % (bar, percentage, '%', suffix))
+    sys.stdout.flush()
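An illustrative call pattern (loop bounds are arbitrary):

import time
for i in range(1, 11):
    progressBar(i, 10, 'copying files')
    time.sleep(0.1)
print()  # move past the carriage-return line when done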
+def copy_directory_with_contraints(input_dir_path, output_dir_path, select_dir_keywords, select_file_keywords, allowed_file_extensions):
+    """
+    Copies files from input_dir_path to output_dir_path based on specified constraints.
+
+    Parameters
+    ----------
+    input_dir_path (str): Path to the input directory.
+    output_dir_path (str): Path to the output directory.
+    select_dir_keywords (list): List of keywords for selecting directories.
+    select_file_keywords (list): List of keywords for selecting files.
+    allowed_file_extensions (list): List of allowed file extensions.
+
+    Returns
+    -------
+    path_to_files_dict (dict): Dictionary mapping directory paths to lists of copied file names satisfying the constraints.
+    """
+    date = created_at()
+    log_dir = 'logs/'
+    setup_logging(log_dir, f"copy_directory_with_contraints_{date}.log")
+
+    def has_allowed_extension(filename):
+        return os.path.splitext(filename)[1] in allowed_file_extensions
+
+    def file_is_selected(filename):
+        return any(keyword in filename for keyword in select_file_keywords) if select_file_keywords else True
+
+    # Collect paths of directories that are directly connected to the root dir and match select_dir_keywords
+    paths = []
+    if select_dir_keywords:
+        for item in os.listdir(input_dir_path): #Path(input_dir_path).iterdir():
+            if any([item in keyword for keyword in select_dir_keywords]):
+                paths.append(os.path.join(input_dir_path, item))
+    else:
+        paths.append(input_dir_path) #paths.append(Path(input_dir_path))
+
+    ROOT_DIR = input_dir_path
+    path_to_files_dict = {} # Dictionary to store directory-file pairs satisfying constraints
+
+    for subpath in paths:
+        for dirpath, _, filenames in os.walk(subpath, topdown=False):
+            # Reduce filenames to those that are admissible
+            admissible_filenames = [filename for filename in filenames if has_allowed_extension(filename) and file_is_selected(filename)]
+
+            if admissible_filenames: # Only create directory if there are files to copy
+                relative_dirpath = os.path.relpath(dirpath, ROOT_DIR)
+                target_dirpath = os.path.join(output_dir_path, relative_dirpath)
+                #path_to_files_dict[dirpath] = admissible_filenames
+                path_to_files_dict[target_dirpath] = admissible_filenames
+                os.makedirs(target_dirpath, exist_ok=True)
+
+                for filename in admissible_filenames:
+                    src_file_path = os.path.join(dirpath, filename)
+                    dest_file_path = os.path.join(target_dirpath, filename)
+                    try:
+                        shutil.copy2(src_file_path, dest_file_path)
+                    except Exception as e:
+                        logging.error("Failed to copy %s: %s", src_file_path, e)
+
+    return path_to_files_dict
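A hypothetical call, for illustration only (all paths and keywords are made up):

copied = copy_directory_with_contraints(
    input_dir_path='raw_data',
    output_dir_path='staging',
    select_dir_keywords=['experiment'],
    select_file_keywords=['2024'],
    allowed_file_extensions=['.txt', '.dat'])
for target_dir, files in copied.items():
    print(target_dir, len(files))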

View File

@@ -26,14 +26,6 @@ ext_to_reader_dict = {'.ibw': g5505f_reader.read_xps_ibw_file_as_dict,
     '.dat': lambda a1: g5505f_reader.read_txt_files_as_dict(a1,False),
     '.h5': lambda a1,a2,a3: g5505f_reader.copy_file_in_group(a1,a2,a3,False)}

-def progressBar(count_value, total, suffix=''):
-    bar_length = 100
-    filled_up_Length = int(round(bar_length * count_value / float(total)))
-    percentage = round(100.0 * count_value / float(total), 1)
-    bar = '=' * filled_up_Length + '-' * (bar_length - filled_up_Length)
-    sys.stdout.write('[%s] %s%s ...%s\r' % (bar, percentage, '%', suffix))
-    sys.stdout.flush()

 def read_mtable_as_dataframe(filename):
     """ Reconstruct a Matlab Table encoded in a .h5 file as a Pandas DataFrame. The input .h5 file
@@ -224,70 +216,6 @@ def is_valid_directory_path(dirpath,select_dir_keywords):

-def copy_directory_with_contraints(input_dir_path, output_dir_path, select_dir_keywords, select_file_keywords, allowed_file_extensions):
-    """
-    Copies files from input_dir_path to output_dir_path based on specified constraints.
-
-    Parameters:
-        input_dir_path (str): Path to the input directory.
-        output_dir_path (str): Path to the output directory.
-        select_dir_keywords (list): List of keywords for selecting directories.
-        select_file_keywords (list): List of keywords for selecting files.
-        allowed_file_extensions (list): List of allowed file extensions.
-
-    Returns:
-        path_to_files_dict (dict): dictionary mapping directory paths to lists of copied file names satisfying the constraints.
-    """
-    date = utils.created_at()
-    log_dir = 'logs/'
-    utils.setup_logging(log_dir, f"copy_directory_with_contraints_{date}.log")
-
-    def has_allowed_extension(filename):
-        return os.path.splitext(filename)[1] in allowed_file_extensions
-
-    def file_is_selected(filename):
-        return any(keyword in filename for keyword in select_file_keywords) if select_file_keywords else True
-
-    # Collect paths of directories, which are directly connected to the root dir and match select_dir_keywords
-    paths = []
-    if select_dir_keywords:
-        for item in os.listdir(input_dir_path): #Path(input_dir_path).iterdir():
-            if any([item in keyword for keyword in select_dir_keywords]):
-                paths.append(os.path.join(input_dir_path, item))
-    else:
-        paths.append(input_dir_path) #paths.append(Path(input_dir_path))
-
-    ROOT_DIR = input_dir_path
-    path_to_files_dict = {} # Dictionary to store directory-file pairs satisfying constraints
-
-    for subpath in paths:
-        for dirpath, _, filenames in os.walk(subpath, topdown=False):
-            # Reduce filenames to those that are admissible
-            admissible_filenames = [filename for filename in filenames if has_allowed_extension(filename) and file_is_selected(filename)]
-
-            if admissible_filenames: # Only create directory if there are files to copy
-                relative_dirpath = os.path.relpath(dirpath, ROOT_DIR)
-                target_dirpath = os.path.join(output_dir_path, relative_dirpath)
-                #path_to_files_dict[dirpath] = admissible_filenames
-                path_to_files_dict[target_dirpath] = admissible_filenames
-                os.makedirs(target_dirpath, exist_ok=True)
-
-                for filename in admissible_filenames:
-                    src_file_path = os.path.join(dirpath, filename)
-                    dest_file_path = os.path.join(target_dirpath, filename)
-                    try:
-                        shutil.copy2(src_file_path, dest_file_path)
-                    except Exception as e:
-                        logging.error("Failed to copy %s: %s", src_file_path, e)
-
-    return path_to_files_dict

 def transfer_file_dict_to_hdf5(h5file, group_name, file_dict):
     """
     Transfers data from a file_dict to an HDF5 file.
@@ -381,7 +309,7 @@ def create_hdf5_file_from_filesystem_path(output_filename : str,
     # Copy input_directory into the output_dir_path, and work with it from now on
     output_dir_path = os.path.splitext(output_filename)[0].replace('/',os.sep)
-    path_to_filenames_dict = copy_directory_with_contraints(input_file_system_path,
+    path_to_filenames_dict = utils.copy_directory_with_contraints(input_file_system_path,
                                                             output_dir_path,
                                                             select_dir_keywords,
                                                             select_file_keywords,
@@ -442,7 +370,7 @@ def create_hdf5_file_from_filesystem_path(output_filename : str,
                 ext_to_reader_dict[file_ext](source_file_path, dest_file_obj, dest_group_name)

             # Update the progress bar and log the end message
-            progressBar(dir_number, number_of_dirs, end_message)
+            utils.progressBar(dir_number, number_of_dirs, end_message)
             logging.info(end_message)
             dir_number = dir_number + 1