From 0a0b4ac41d711141e01215b36178523dcc76a631 Mon Sep 17 00:00:00 2001
From: Florez Ospina Juan Felipe
Date: Wed, 10 Jul 2024 09:19:30 +0200
Subject: [PATCH] Moved a few functions from ...reader.py and hdf5_lib.py
 into ..utils.py, and refactored accordingly.

---
 src/g5505_file_reader.py |  57 +++++++++++--------
 src/g5505_utils.py       | 115 ++++++++++++++++++++++++++++++++++++++-
 src/hdf5_lib.py          |  76 +-------------------------
 3 files changed, 151 insertions(+), 97 deletions(-)

diff --git a/src/g5505_file_reader.py b/src/g5505_file_reader.py
index fed0601..5a718cc 100644
--- a/src/g5505_file_reader.py
+++ b/src/g5505_file_reader.py
@@ -2,10 +2,11 @@
 import os
 import numpy as np
 import pandas as pd
+import collections
 from igor2.binarywave import load as loadibw
 
 import src.g5505_utils as utils
-import src.metadata_review_lib as metadata
+#import src.metadata_review_lib as metadata
 #from src.metadata_review_lib import parse_attribute
 
 import yaml
@@ -14,8 +15,35 @@ import h5py
 ROOT_DIR = os.path.abspath(os.curdir)
 
 def read_xps_ibw_file_as_dict(filename):
+    """
+    Reads IBW files from the Multiphase Chemistry Group, which contain XPS spectra and acquisition settings,
+    and formats the data into a dictionary with the structure {datasets: list of datasets}. Each dataset in the
+    list has the following structure:
+
+    {
+        'name': 'name',
+        'data': data_array,
+        'data_units': 'units',
+        'shape': data_shape,
+        'dtype': data_type
+    }
+
+    Parameters
+    ----------
+    filename : str
+        The IBW filename from the Multiphase Chemistry Group beamline.
+
+    Returns
+    -------
+    file_dict : dict
+        A dictionary containing the datasets from the IBW file.
+
+    Raises
+    ------
+    ValueError
+        If the input IBW file is not a valid IBW file.
+    """
 
-    """ Reads ibw files from multiphase chemistry group, which contain xps spectra and acquisition settings."""
 
     file_obj = loadibw(filename)
 
@@ -77,21 +105,6 @@ def copy_file_in_group(source_file_path, dest_file_obj : h5py.File, dest_group_n
         if 'tmp_files' in tmp_file_path:
             os.remove(tmp_file_path)
 
-import re
-
-def infer_units(column_name):
-
-    match = re.search('\[.+\]')
-
-    if match:
-        return match
-    else:
-        match = re.search('\(.+\)')
-
-    return match
-
-from collections import Counter
-
 def read_txt_files_as_dict(filename : str , work_with_copy : bool = True ):
 
     # Get the directory of the current module
@@ -152,7 +165,7 @@ def read_txt_files_as_dict(filename : str , work_with_copy : bool = True ):
                 list_of_substrings = line.decode(file_encoding).split(separator)
 
                 # Count occurrences of each substring
-                substring_counts = Counter(list_of_substrings)
+                substring_counts = collections.Counter(list_of_substrings)
                 data_start = True
                 # Generate column names with appended index only for repeated substrings
                 column_names = [f"{i}_{name.strip()}" if substring_counts[name] > 1 else name.strip() for i, name in enumerate(list_of_substrings)]
@@ -178,7 +191,7 @@ def read_txt_files_as_dict(filename : str , work_with_copy : bool = True ):
             # https://docs.h5py.org/en/stable/strings.html
 
             if table_preamble:
-                header_dict["table_preamble"] = metadata.convert_string_to_bytes(table_preamble)
+                header_dict["table_preamble"] = utils.convert_string_to_bytes(table_preamble)
 
 
@@ -260,9 +273,9 @@ def read_txt_files_as_dict(filename : str , work_with_copy : bool = True ):
                     if not key in numerical_variables:
                         dataset['attributes'].pop(key) # delete key
                     else:
-                        dataset['attributes'][key] = metadata.parse_attribute(dataset['attributes'][key])
+                        dataset['attributes'][key] = utils.parse_attribute(dataset['attributes'][key])
                 if timestamps_name in categorical_variables:
-                    dataset['attributes'][timestamps_name] = metadata.parse_attribute({'unit':'YYYY-MM-DD HH:MM:SS.ffffff'})
+                    dataset['attributes'][timestamps_name] = utils.parse_attribute({'unit':'YYYY-MM-DD HH:MM:SS.ffffff'})
 
     except ValueError as err:
         print(err)
@@ -276,7 +289,7 @@ def read_txt_files_as_dict(filename : str , work_with_copy : bool = True ):
    #     dataset['shape'] = dataset['data'].shape
    #     dataset['dtype'] = type(dataset['data'])
    #     if timestamps_name in categorical_variables:
-   #         dataset['attributes'] = {timestamps_name: metadata.parse_attribute({'unit':'YYYY-MM-DD HH:MM:SS.ffffff'})}
+   #         dataset['attributes'] = {timestamps_name: utils.parse_attribute({'unit':'YYYY-MM-DD HH:MM:SS.ffffff'})}
 
    #     file_dict['datasets'].append(dataset)
 
diff --git a/src/g5505_utils.py b/src/g5505_utils.py
index b9e61c3..401c3d0 100644
--- a/src/g5505_utils.py
+++ b/src/g5505_utils.py
@@ -1,9 +1,13 @@
 import pandas as pd
 import os
+import sys
 import shutil
 import datetime
 import logging
 import numpy as np
+import h5py
+import re
+
 
 def setup_logging(log_dir, log_filename):
     """Sets up logging to a specified directory and file.
@@ -129,4 +133,113 @@ def dataframe_to_np_structured_array(df: pd.DataFrame):
 
     # Convert the DataFrame to a structured array
     structured_array = np.array(list(df.itertuples(index=False, name=None)), dtype=dtype)
-    return structured_array
\ No newline at end of file
+    return structured_array
+
+def convert_string_to_bytes(input_list: list):
+    """Convert a list of strings into a numpy array with utf8-type entries.
+
+    Parameters
+    ----------
+    input_list (list) : list of string objects
+
+    Returns
+    -------
+    input_array_bytes (ndarray): array of utf8-type entries.
+    """
+    utf8_type = lambda max_length: h5py.string_dtype('utf-8', max_length)
+    if input_list:
+        max_length = max(len(item) for item in input_list)
+        # Convert the strings to bytes with utf-8 encoding, specifying errors='ignore' to skip characters that cannot be encoded
+        input_list_bytes = [item.encode('utf-8', errors='ignore') for item in input_list]
+        input_array_bytes = np.array(input_list_bytes,dtype=utf8_type(max_length))
+    else:
+        input_array_bytes = np.array([],dtype=utf8_type(0))
+
+    return input_array_bytes
+
+def infer_units(column_name):
+    # TODO: complete or remove
+
+    match = re.search(r'\[.+\]', column_name)
+
+    if match:
+        return match
+    else:
+        match = re.search(r'\(.+\)', column_name)
+
+    return match
+
+
+def progressBar(count_value, total, suffix=''):
+    bar_length = 100
+    filled_up_Length = int(round(bar_length* count_value / float(total)))
+    percentage = round(100.0 * count_value/float(total),1)
+    bar = '=' * filled_up_Length + '-' * (bar_length - filled_up_Length)
+    sys.stdout.write('[%s] %s%s ...%s\r' %(bar, percentage, '%', suffix))
+    sys.stdout.flush()
+
+def copy_directory_with_contraints(input_dir_path, output_dir_path, select_dir_keywords, select_file_keywords, allowed_file_extensions):
+    """
+    Copies files from input_dir_path to output_dir_path based on specified constraints.
+
+    Parameters
+    ----------
+    input_dir_path (str): Path to the input directory.
+    output_dir_path (str): Path to the output directory.
+    select_dir_keywords (list): List of keywords for selecting directories.
+    select_file_keywords (list): List of keywords for selecting files.
+    allowed_file_extensions (list): List of allowed file extensions.
+
+    Returns
+    -------
+    path_to_files_dict (dict): Dictionary mapping directory paths to lists of copied file names satisfying the constraints.
+ """ + + date = created_at() + log_dir='logs/' + setup_logging(log_dir, f"copy_directory_with_contraints_{date}.log") + + def has_allowed_extension(filename): + return os.path.splitext(filename)[1] in allowed_file_extensions + + def file_is_selected(filename): + return any(keyword in filename for keyword in select_file_keywords) if select_file_keywords else True + + + # Collect paths of directories, which are directly connected to the root dir and match select_dir_keywords + paths = [] + if select_dir_keywords: + for item in os.listdir(input_dir_path): #Path(input_dir_path).iterdir(): + if any([item in keyword for keyword in select_dir_keywords]): + paths.append(os.path.join(input_dir_path,item)) + else: + paths.append(input_dir_path) #paths.append(Path(input_dir_path)) + + ROOT_DIR = input_dir_path + path_to_files_dict = {} # Dictionary to store directory-file pairs satisfying constraints + + for subpath in paths: + + for dirpath, _, filenames in os.walk(subpath,topdown=False): + + # Reduce filenames to those that are admissible + admissible_filenames = [filename for filename in filenames if has_allowed_extension(filename) and file_is_selected(filename)] + + if admissible_filenames: # Only create directory if there are files to copy + + + relative_dirpath = os.path.relpath(dirpath, ROOT_DIR) + target_dirpath = os.path.join(output_dir_path, relative_dirpath) + #path_to_files_dict[dirpath] = admissible_filenames + path_to_files_dict[target_dirpath] = admissible_filenames + os.makedirs(target_dirpath, exist_ok=True) + + for filename in admissible_filenames: + src_file_path = os.path.join(dirpath, filename) + dest_file_path = os.path.join(target_dirpath, filename) + try: + shutil.copy2(src_file_path, dest_file_path) + except Exception as e: + logging.error("Failed to copy %s: %s", src_file_path, e) + + return path_to_files_dict \ No newline at end of file diff --git a/src/hdf5_lib.py b/src/hdf5_lib.py index 931169c..3d7b5de 100644 --- a/src/hdf5_lib.py +++ b/src/hdf5_lib.py @@ -26,14 +26,6 @@ ext_to_reader_dict = {'.ibw': g5505f_reader.read_xps_ibw_file_as_dict, '.dat': lambda a1: g5505f_reader.read_txt_files_as_dict(a1,False), '.h5': lambda a1,a2,a3: g5505f_reader.copy_file_in_group(a1,a2,a3,False)} -def progressBar(count_value, total, suffix=''): - bar_length = 100 - filled_up_Length = int(round(bar_length* count_value / float(total))) - percentage = round(100.0 * count_value/float(total),1) - bar = '=' * filled_up_Length + '-' * (bar_length - filled_up_Length) - sys.stdout.write('[%s] %s%s ...%s\r' %(bar, percentage, '%', suffix)) - sys.stdout.flush() - def read_mtable_as_dataframe(filename): """ Reconstruct a Matlab Table encoded in a .h5 file as a Pandas DataFrame. The input .h5 file @@ -224,70 +216,6 @@ def is_valid_directory_path(dirpath,select_dir_keywords): -def copy_directory_with_contraints(input_dir_path, output_dir_path, select_dir_keywords, select_file_keywords, allowed_file_extensions): - """ - Copies files from input_dir_path to output_dir_path based on specified constraints. - - Parameters: - input_dir_path (str): Path to the input directory. - output_dir_path (str): Path to the output directory. - select_dir_keywords (list): List of keywords for selecting directories. - select_file_keywords (list): List of keywords for selecting files. - allowed_file_extensions (list): List of allowed file extensions. - - Returns: - path_to_files_dict (dict): dictionary mapping directory paths to lists of copied file names satisfying the constraints. 
- """ - - date = utils.created_at() - log_dir='logs/' - utils.setup_logging(log_dir, f"copy_directory_with_contraints_{date}.log") - - def has_allowed_extension(filename): - return os.path.splitext(filename)[1] in allowed_file_extensions - - def file_is_selected(filename): - return any(keyword in filename for keyword in select_file_keywords) if select_file_keywords else True - - - # Collect paths of directories, which are directly connected to the root dir and match select_dir_keywords - paths = [] - if select_dir_keywords: - for item in os.listdir(input_dir_path): #Path(input_dir_path).iterdir(): - if any([item in keyword for keyword in select_dir_keywords]): - paths.append(os.path.join(input_dir_path,item)) - else: - paths.append(input_dir_path) #paths.append(Path(input_dir_path)) - - ROOT_DIR = input_dir_path - path_to_files_dict = {} # Dictionary to store directory-file pairs satisfying constraints - - for subpath in paths: - - for dirpath, _, filenames in os.walk(subpath,topdown=False): - - # Reduce filenames to those that are admissible - admissible_filenames = [filename for filename in filenames if has_allowed_extension(filename) and file_is_selected(filename)] - - if admissible_filenames: # Only create directory if there are files to copy - - - relative_dirpath = os.path.relpath(dirpath, ROOT_DIR) - target_dirpath = os.path.join(output_dir_path, relative_dirpath) - #path_to_files_dict[dirpath] = admissible_filenames - path_to_files_dict[target_dirpath] = admissible_filenames - os.makedirs(target_dirpath, exist_ok=True) - - for filename in admissible_filenames: - src_file_path = os.path.join(dirpath, filename) - dest_file_path = os.path.join(target_dirpath, filename) - try: - shutil.copy2(src_file_path, dest_file_path) - except Exception as e: - logging.error("Failed to copy %s: %s", src_file_path, e) - - return path_to_files_dict - def transfer_file_dict_to_hdf5(h5file, group_name, file_dict): """ Transfers data from a file_dict to an HDF5 file. @@ -381,7 +309,7 @@ def create_hdf5_file_from_filesystem_path(output_filename : str, # Copy input_directory into the output_dir_path, and work with it from now on output_dir_path = os.path.splitext(output_filename)[0].replace('/',os.sep) - path_to_filenames_dict = copy_directory_with_contraints(input_file_system_path, + path_to_filenames_dict = utils.copy_directory_with_contraints(input_file_system_path, output_dir_path, select_dir_keywords, select_file_keywords, @@ -442,7 +370,7 @@ def create_hdf5_file_from_filesystem_path(output_filename : str, ext_to_reader_dict[file_ext](source_file_path, dest_file_obj, dest_group_name) # Update the progress bar and log the end message - progressBar(dir_number, number_of_dirs, end_message) + utils.progressBar(dir_number, number_of_dirs, end_message) logging.info(end_message) dir_number = dir_number + 1