import datetime
import logging
import os
import re
import shutil
import sys

import numpy as np
import pandas as pd

# h5py is only needed by convert_string_to_bytes(); keep the remaining helpers
# importable on systems where it is not installed.
try:
    import h5py
except ImportError:
    h5py = None


def setup_logging(log_dir, log_filename):
    """Attach a file handler to the root logger.

    Parameters:
    log_dir (str): Directory to save the log file (created if missing).
    log_filename (str): Name of the log file.

    The root logger level is set to INFO. Calling this function again with the
    same target file does not attach a second handler, so messages are not
    duplicated in that file.
    """
    # Ensure the log directory exists
    os.makedirs(log_dir, exist_ok=True)
    log_path = os.path.join(log_dir, log_filename)

    logger = logging.getLogger()
    logger.setLevel(logging.INFO)

    # FileHandler stores the absolute path in ``baseFilename``; use it to
    # detect a handler that already writes to the requested file.
    target_path = os.path.abspath(log_path)
    for handler in logger.handlers:
        if isinstance(handler, logging.FileHandler) and handler.baseFilename == target_path:
            return

    file_handler = logging.FileHandler(log_path)
    file_handler.setFormatter(
        logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    )
    logger.addHandler(file_handler)


def is_callable_list(x: list):
    """Return True if every element of x is callable (True for an empty list)."""
    return all(callable(item) for item in x)


def is_str_list(x: list):
    """Return True if every element of x is a str (True for an empty list)."""
    return all(isinstance(item, str) for item in x)


def augment_with_filetype(df):
    """Add a 'filetype' column holding each filename's extension without the dot.

    The column is derived from df['filename']. df is modified in place and
    also returned.
    """
    df['filetype'] = [os.path.splitext(item)[1][1:] for item in df['filename']]
    return df


def augment_with_filenumber(df):
    """Add a 'filenumber' column holding the part of each filename before the first '_'.

    The column is derived from df['filename']. A filename without an
    underscore is kept whole (the previous slicing silently dropped its last
    character). df is modified in place and also returned.
    """
    df['filenumber'] = [item.partition('_')[0] for item in df['filename']]
    return df


def group_by_df_column(df, column_name: str):
    """Return the column of df named column_name after validating that it exists.

    df (pandas.DataFrame): input table.
    column_name (str): name of the column to select.

    Note that no grouping is performed here; the selected column is returned
    as is.

    Raises
    ------
    ValueError: if column_name is not a column of df.
    """
    if column_name not in df.columns:
        raise ValueError("column_name must be in the columns of df.")

    return df[column_name]


def _split_sample_annotation(item):
    """Split one 'sample' entry into a (sample name, data quality) pair."""
    paren_start = item.find('(')
    if paren_start != -1:
        # "name(quality)": the last character is dropped on the assumption
        # that it is the closing parenthesis.
        return item[0:paren_start], item[paren_start + 1:len(item) - 1]
    if item == '':
        return 'Not yet annotated', 'unevaluated'
    return item, 'good data'


def split_sample_col_into_sample_and_data_quality_cols(input_data: pd.DataFrame):
    """Split the 'sample' column into 'sample' and 'data_quality' columns.

    Entries of the form "name(quality)" are split at the first '('. Empty
    entries become ('Not yet annotated', 'unevaluated'), and entries without
    parentheses keep their text and get the quality 'good data'.

    input_data is modified in place and also returned.
    """
    sample_name = []
    sample_quality = []
    for item in input_data['sample']:
        name, quality = _split_sample_annotation(item)
        sample_name.append(name)
        sample_quality.append(quality)

    input_data['sample'] = sample_name
    input_data['data_quality'] = sample_quality

    return input_data


def make_file_copy(source_file_path, output_folder_name: str = 'tmp_files'):
    """Copy a file into a folder below the current working directory.

    Parameters
    ----------
    source_file_path (str): path of the file to copy.
    output_folder_name (str): folder (relative to the current working
        directory) receiving the copy; created if missing.

    Returns
    -------
    tmp_file_path (str): path of the copy, which keeps the source file name.
    """
    filename = os.path.basename(source_file_path)

    root_dir = os.path.abspath(os.curdir)
    tmp_dirpath = os.path.join(root_dir, output_folder_name)
    # makedirs(exist_ok=True) avoids the exists()/mkdir() race and also works
    # for nested folder names.
    os.makedirs(tmp_dirpath, exist_ok=True)

    tmp_file_path = os.path.join(tmp_dirpath, filename)
    shutil.copy(source_file_path, tmp_file_path)

    return tmp_file_path


def created_at():
    """Return a file-name-friendly local timestamp with its UTC offset.

    Format: 'YYYY-MM-DD_HH-MM-SS_UTC-OFST_<%z offset>'.
    """
    # astimezone() on a naive datetime attaches the local system time zone
    now_tz_aware = datetime.datetime.now().astimezone()
    tz = now_tz_aware.strftime('%z')
    # Hyphens instead of colons in the time part keep the timestamp file name friendly
    return now_tz_aware.strftime('%Y-%m-%d_%H-%M-%S') + '_UTC-OFST_' + tz


def dataframe_to_np_structured_array(df: pd.DataFrame):
    """Convert a DataFrame into a numpy structured array.

    String columns become fixed-length byte strings ('S<n>', n being the
    longest string of the column), integer columns 'i4' and float columns
    'f4'.

    Raises
    ------
    ValueError: if a column has any other dtype.
    """
    # Define the dtype for the structured array, ensuring compatibility with h5py
    dtype = []
    for col in df.columns:
        col_dtype = df[col].dtype
        if pd.api.types.is_string_dtype(col_dtype):
            # str.len().max() is a float when the column has missing values
            # and NaN when it has no rows; neither yields a valid 'S<n>' code
            # unless it is normalised to an int first.
            max_len = df[col].str.len().max()
            width = 1 if pd.isna(max_len) else int(max_len)
            # NOTE(review): values are handed to numpy as str, which
            # presumably only works for ASCII text, and missing values are
            # stored as their text representation - confirm against callers.
            dtype.append((col, f'S{width}'))
        elif pd.api.types.is_integer_dtype(col_dtype):
            dtype.append((col, 'i4'))  # Assuming 32-bit integer
        elif pd.api.types.is_float_dtype(col_dtype):
            dtype.append((col, 'f4'))  # Assuming 32-bit float
        else:
            raise ValueError(f"Unsupported dtype: {col_dtype}")

    # Convert the DataFrame to a structured array
    structured_array = np.array(list(df.itertuples(index=False, name=None)), dtype=dtype)

    return structured_array


def convert_string_to_bytes(input_list: list):
    """Convert a list of strings into a numpy array with utf8-type entries.

    Parameters
    ----------
    input_list (list) : list of string objects

    Returns
    -------
    input_array_bytes (ndarray): array of fixed-length utf8-type entries.

    Raises
    ------
    ImportError: if h5py is not installed.
    """
    if h5py is None:
        raise ImportError("h5py is required by convert_string_to_bytes but is not installed.")

    # Encode with errors='ignore' to skip characters that cannot be encoded
    input_list_bytes = [item.encode('utf-8', errors='ignore') for item in input_list]
    # The fixed length is a number of bytes, so it is measured on the encoded
    # data; counting characters truncated non-ASCII strings.
    max_length = max((len(item) for item in input_list_bytes), default=0)

    return np.array(input_list_bytes, dtype=h5py.string_dtype('utf-8', max_length))


def infer_units(column_name):
    """Search column_name for a unit written in brackets or parentheses.

    Returns the re.Match for the first '[...]' group, otherwise for the first
    '(...)' group, or None when neither is present.
    """
    # TODO: complete or remove
    match = re.search(r'\[.+\]', column_name)
    if match:
        return match
    return re.search(r'\(.+\)', column_name)


def progressBar(count_value, total, suffix=''):
    """Write a 100-character progress bar for count_value out of total to stdout.

    The line ends with a carriage return so that the next call overwrites it.
    A total of 0 is displayed as a complete bar instead of raising
    ZeroDivisionError.
    """
    bar_length = 100
    if total:
        filled_up_length = int(round(bar_length * count_value / float(total)))
        percentage = round(100.0 * count_value / float(total), 1)
    else:
        filled_up_length = bar_length
        percentage = 100.0
    bar = '=' * filled_up_length + '-' * (bar_length - filled_up_length)
    sys.stdout.write('[%s] %s%s ...%s\r' % (bar, percentage, '%', suffix))
    sys.stdout.flush()


def copy_directory_with_contraints(input_dir_path, output_dir_path, select_dir_keywords, select_file_keywords, allowed_file_extensions):
    """
    Copies files from input_dir_path to output_dir_path based on specified constraints.

    A log file is written to 'logs/' (relative to the current working
    directory); files that fail to copy are logged and skipped.

    Parameters
    ----------
    input_dir_path (str): Path to the input directory.
    output_dir_path (str): Path to the output directory.
    select_dir_keywords (list): List of keywords for selecting directories.
        If empty, the whole input directory is traversed.
    select_file_keywords (list): List of keywords for selecting files.
        If empty, files are not filtered by name.
    allowed_file_extensions (list): List of allowed file extensions,
        including the leading dot.

    Returns
    -------
    path_to_files_dict (dict): dictionary mapping target directory paths to
        lists of file names satisfying the constraints.
    """
    date = created_at()
    log_dir = 'logs/'
    setup_logging(log_dir, f"copy_directory_with_contraints_{date}.log")

    def has_allowed_extension(filename):
        return os.path.splitext(filename)[1] in allowed_file_extensions

    def file_is_selected(filename):
        if not select_file_keywords:
            return True
        return any(keyword in filename for keyword in select_file_keywords)

    # Collect paths of directories, which are directly connected to the root dir and match select_dir_keywords
    paths = []
    if select_dir_keywords:
        for item in os.listdir(input_dir_path):
            # NOTE(review): this tests whether the entry name is contained in
            # a keyword, whereas file_is_selected() tests whether a keyword is
            # contained in the file name - confirm this is intended.
            if any(item in keyword for keyword in select_dir_keywords):
                paths.append(os.path.join(input_dir_path, item))
    else:
        paths.append(input_dir_path)

    ROOT_DIR = input_dir_path
    path_to_files_dict = {}  # Dictionary to store directory-file pairs satisfying constraints

    for subpath in paths:
        for dirpath, _, filenames in os.walk(subpath, topdown=False):
            # Reduce filenames to those that are admissible
            admissible_filenames = [
                filename for filename in filenames
                if has_allowed_extension(filename) and file_is_selected(filename)
            ]

            # Only create directory if there are files to copy
            if not admissible_filenames:
                continue

            relative_dirpath = os.path.relpath(dirpath, ROOT_DIR)
            target_dirpath = os.path.join(output_dir_path, relative_dirpath)
            path_to_files_dict[target_dirpath] = admissible_filenames
            os.makedirs(target_dirpath, exist_ok=True)

            for filename in admissible_filenames:
                src_file_path = os.path.join(dirpath, filename)
                dest_file_path = os.path.join(target_dirpath, filename)
                try:
                    shutil.copy2(src_file_path, dest_file_path)
                except Exception as e:
                    # Best effort: record the failure and continue with the remaining files
                    logging.error("Failed to copy %s: %s", src_file_path, e)

    return path_to_files_dict