Moved src/g5505_utils.py to utils/g5505_utils.py

This commit is contained in:
2024-08-23 07:27:39 +02:00
parent d7fc38abd9
commit 1112a214e9

257
utils/g5505_utils.py Normal file
View File

@ -0,0 +1,257 @@
import pandas as pd
import os
import sys
import shutil
import datetime
import logging
import numpy as np
import h5py
import re
def setup_logging(log_dir, log_filename):
"""Sets up logging to a specified directory and file.
Parameters:
log_dir (str): Directory to save the log file.
log_filename (str): Name of the log file.
"""
# Ensure the log directory exists
os.makedirs(log_dir, exist_ok=True)
# Create a logger instance
logger = logging.getLogger()
logger.setLevel(logging.INFO)
# Create a file handler
log_path = os.path.join(log_dir, log_filename)
file_handler = logging.FileHandler(log_path)
# Create a formatter and set it for the handler
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
file_handler.setFormatter(formatter)
# Add the handler to the logger
logger.addHandler(file_handler)
def is_callable_list(x : list):
return all([callable(item) for item in x])
def is_str_list(x : list):
return all([isinstance(item,str) for item in x])
def augment_with_filetype(df):
df['filetype'] = [os.path.splitext(item)[1][1::] for item in df['filename']]
#return [os.path.splitext(item)[1][1::] for item in df['filename']]
return df
def augment_with_filenumber(df):
df['filenumber'] = [item[0:item.find('_')] for item in df['filename']]
#return [item[0:item.find('_')] for item in df['filename']]
return df
def group_by_df_column(df, column_name: str):
"""
df (pandas.DataFrame):
column_name (str): column_name of df by which grouping operation will take place.
"""
if not column_name in df.columns:
raise ValueError("column_name must be in the columns of df.")
return df[column_name]
def split_sample_col_into_sample_and_data_quality_cols(input_data: pd.DataFrame):
sample_name = []
sample_quality = []
for item in input_data['sample']:
if item.find('(')!=-1:
#print(item)
sample_name.append(item[0:item.find('(')])
sample_quality.append(item[item.find('(')+1:len(item)-1])
else:
if item=='':
sample_name.append('Not yet annotated')
sample_quality.append('unevaluated')
else:
sample_name.append(item)
sample_quality.append('good data')
input_data['sample'] = sample_name
input_data['data_quality'] = sample_quality
return input_data
def make_file_copy(source_file_path, output_folder_name : str = 'tmp_files'):
pathtail, filename = os.path.split(source_file_path)
#backup_filename = 'backup_'+ filename
backup_filename = filename
# Path
ROOT_DIR = os.path.abspath(os.curdir)
tmp_dirpath = os.path.join(ROOT_DIR,output_folder_name)
if not os.path.exists(tmp_dirpath):
os.mkdir(tmp_dirpath)
tmp_file_path = os.path.join(tmp_dirpath,backup_filename)
shutil.copy(source_file_path, tmp_file_path)
return tmp_file_path
def created_at():
now = datetime.datetime.now()
# Populate now object with time zone information obtained from the local system
now_tz_aware = now.astimezone()
tz = now_tz_aware.strftime('%z')
# Replace colons in the time part of the timestamp with hyphens to make it file name friendly
created_at = now_tz_aware.strftime('%Y-%m-%d_%H-%M-%S') + '_UTC-OFST_' + tz
return created_at
def dataframe_to_np_structured_array(df: pd.DataFrame):
# Define the dtype for the structured array, ensuring compatibility with h5py
dtype = []
for col in df.columns:
col_dtype = df[col].dtype
if pd.api.types.is_string_dtype(col_dtype):
# Convert string dtype to fixed-length strings
max_len = df[col].str.len().max()
dtype.append((col, f'S{max_len}'))
elif pd.api.types.is_integer_dtype(col_dtype):
dtype.append((col, 'i4')) # Assuming 32-bit integer
elif pd.api.types.is_float_dtype(col_dtype):
dtype.append((col, 'f4')) # Assuming 32-bit float
else:
raise ValueError(f"Unsupported dtype: {col_dtype}")
# Convert the DataFrame to a structured array
structured_array = np.array(list(df.itertuples(index=False, name=None)), dtype=dtype)
return structured_array
def convert_string_to_bytes(input_list: list):
"""Convert a list of strings into a numpy array with utf8-type entries.
Parameters
----------
input_list (list) : list of string objects
Returns
-------
input_array_bytes (ndarray): array of ut8-type entries.
"""
utf8_type = lambda max_length: h5py.string_dtype('utf-8', max_length)
if input_list:
max_length = max(len(item) for item in input_list)
# Convert the strings to bytes with utf-8 encoding, specifying errors='ignore' to skip characters that cannot be encoded
input_list_bytes = [item.encode('utf-8', errors='ignore') for item in input_list]
input_array_bytes = np.array(input_list_bytes,dtype=utf8_type(max_length))
else:
input_array_bytes = np.array([],dtype=utf8_type(0))
return input_array_bytes
def infer_units(column_name):
# TODO: complete or remove
match = re.search('\[.+\]')
if match:
return match
else:
match = re.search('\(.+\)')
return match
def parse_attribute(attr_value : dict):
"Parse a dictionary attribute into an equivalent numpy structured array, which compatible with compound HDF5 type"
dtype = []
values_list = []
max_length = max(len(str(attr_value[key])) for key in attr_value.keys())
for key in attr_value.keys():
if (not key=='rename_as'):
dtype.append((key,f'S{max_length}'))
values_list.append(attr_value[key])
if values_list:
new_attr_value = np.array([tuple(values_list)],dtype=dtype)
else:
new_attr_value = 'missing'
return new_attr_value
def progressBar(count_value, total, suffix=''):
bar_length = 100
filled_up_Length = int(round(bar_length* count_value / float(total)))
percentage = round(100.0 * count_value/float(total),1)
bar = '=' * filled_up_Length + '-' * (bar_length - filled_up_Length)
sys.stdout.write('[%s] %s%s ...%s\r' %(bar, percentage, '%', suffix))
sys.stdout.flush()
def copy_directory_with_contraints(input_dir_path, output_dir_path, select_dir_keywords, select_file_keywords, allowed_file_extensions):
"""
Copies files from input_dir_path to output_dir_path based on specified constraints.
Parameters
----------
input_dir_path (str): Path to the input directory.
output_dir_path (str): Path to the output directory.
select_dir_keywords (list): List of keywords for selecting directories.
select_file_keywords (list): List of keywords for selecting files.
allowed_file_extensions (list): List of allowed file extensions.
Returns
-------
path_to_files_dict (dict): dictionary mapping directory paths to lists of copied file names satisfying the constraints.
"""
date = created_at()
log_dir='logs/'
setup_logging(log_dir, f"copy_directory_with_contraints_{date}.log")
def has_allowed_extension(filename):
return os.path.splitext(filename)[1] in allowed_file_extensions
def file_is_selected(filename):
return any(keyword in filename for keyword in select_file_keywords) if select_file_keywords else True
# Collect paths of directories, which are directly connected to the root dir and match select_dir_keywords
paths = []
if select_dir_keywords:
for item in os.listdir(input_dir_path): #Path(input_dir_path).iterdir():
if any([item in keyword for keyword in select_dir_keywords]):
paths.append(os.path.join(input_dir_path,item))
else:
paths.append(input_dir_path) #paths.append(Path(input_dir_path))
ROOT_DIR = input_dir_path
path_to_files_dict = {} # Dictionary to store directory-file pairs satisfying constraints
for subpath in paths:
for dirpath, _, filenames in os.walk(subpath,topdown=False):
# Reduce filenames to those that are admissible
admissible_filenames = [filename for filename in filenames if has_allowed_extension(filename) and file_is_selected(filename)]
if admissible_filenames: # Only create directory if there are files to copy
relative_dirpath = os.path.relpath(dirpath, ROOT_DIR)
target_dirpath = os.path.join(output_dir_path, relative_dirpath)
#path_to_files_dict[dirpath] = admissible_filenames
path_to_files_dict[target_dirpath] = admissible_filenames
os.makedirs(target_dirpath, exist_ok=True)
for filename in admissible_filenames:
src_file_path = os.path.join(dirpath, filename)
dest_file_path = os.path.join(target_dirpath, filename)
try:
shutil.copy2(src_file_path, dest_file_path)
except Exception as e:
logging.error("Failed to copy %s: %s", src_file_path, e)
return path_to_files_dict