import sys
import os

try:
    thisFilePath = os.path.abspath(__file__)
except NameError:
    print("Error: __file__ is not available. Ensure the script is being run from a file.")
    print("[Notice] Path to DIMA package may not be resolved properly.")
    thisFilePath = os.getcwd()  # Use current directory or specify a default

dimaPath = os.path.normpath(os.path.join(thisFilePath, "..", "..", ".."))  # Move up to project root

if dimaPath not in sys.path:  # Avoid duplicate entries
    sys.path.insert(0, dimaPath)

import pandas as pd
import shutil
import datetime
import logging
import numpy as np
import h5py
import re
import yaml
from pathlib import Path

def setup_logging(log_dir, log_filename):
    """Sets up logging to a specified directory and file.

    Parameters:
        log_dir (str): Directory to save the log file.
        log_filename (str): Name of the log file.
    """
    # Ensure the log directory exists
    os.makedirs(log_dir, exist_ok=True)

    # Create a logger instance
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)

    # Create a file handler
    log_path = os.path.join(log_dir, log_filename)
    file_handler = logging.FileHandler(log_path)

    # Create a formatter and set it for the handler
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    file_handler.setFormatter(formatter)

    # Add the handler to the logger
    logger.addHandler(file_handler)

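# Illustrative usage sketch (not executed on import); the directory and file name below are
# hypothetical values chosen for the example:
#   setup_logging('logs/', 'ingestion_2024_01.log')
#   logging.info('Pipeline started')  # records are appended to logs/ingestion_2024_01.log
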
def is_callable_list(x: list):
    return all(callable(item) for item in x)

def is_str_list(x: list):
    return all(isinstance(item, str) for item in x)

def augment_with_filetype(df):
    df['filetype'] = [os.path.splitext(item)[1][1::] for item in df['filename']]
    return df

def augment_with_filenumber(df):
    df['filenumber'] = [item[0:item.find('_')] for item in df['filename']]
    return df

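# Illustrative usage sketch (not executed on import); the filenames below are hypothetical.
# Both helpers expect a 'filename' column and add 'filetype' / 'filenumber' columns in place:
#   df = pd.DataFrame({'filename': ['0001_scan.txt', '0002_scan.h5']})
#   df = augment_with_filenumber(augment_with_filetype(df))
#   # df['filetype']   -> ['txt', 'h5']
#   # df['filenumber'] -> ['0001', '0002']
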
def group_by_df_column(df, column_name: str):
    """
    df (pandas.DataFrame): DataFrame containing the column to group by.
    column_name (str): Name of the df column by which the grouping operation will take place.
    """

    if column_name not in df.columns:
        raise ValueError("column_name must be in the columns of df.")

    return df[column_name]

def split_sample_col_into_sample_and_data_quality_cols(input_data: pd.DataFrame):

    sample_name = []
    sample_quality = []
    for item in input_data['sample']:
        if item.find('(') != -1:
            sample_name.append(item[0:item.find('(')])
            sample_quality.append(item[item.find('(') + 1:len(item) - 1])
        else:
            if item == '':
                sample_name.append('Not yet annotated')
                sample_quality.append('unevaluated')
            else:
                sample_name.append(item)
                sample_quality.append('good data')
    input_data['sample'] = sample_name
    input_data['data_quality'] = sample_quality

    return input_data

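# Illustrative usage sketch (not executed on import); the sample annotations are hypothetical.
# Entries of the form 'name(quality)' are split, empty entries are marked as unevaluated,
# and plain names default to 'good data':
#   df = pd.DataFrame({'sample': ['quartz(noisy)', 'calcite', '']})
#   df = split_sample_col_into_sample_and_data_quality_cols(df)
#   # df['sample']       -> ['quartz', 'calcite', 'Not yet annotated']
#   # df['data_quality'] -> ['noisy', 'good data', 'unevaluated']
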
def make_file_copy(source_file_path, output_folder_name: str = 'tmp_files'):

    pathtail, filename = os.path.split(source_file_path)
    backup_filename = filename

    # Resolve the output directory relative to the current working directory
    ROOT_DIR = os.path.abspath(os.curdir)

    tmp_dirpath = os.path.join(ROOT_DIR, output_folder_name)
    if not os.path.exists(tmp_dirpath):
        os.mkdir(tmp_dirpath)

    tmp_file_path = os.path.join(tmp_dirpath, backup_filename)
    shutil.copy(source_file_path, tmp_file_path)

    return tmp_file_path

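# Illustrative usage sketch (not executed on import); the source path is hypothetical.
# The copy is placed in <current working directory>/tmp_files/ by default:
#   backup_path = make_file_copy('data/raw/experiment_01.h5')
#   # backup_path -> '<cwd>/tmp_files/experiment_01.h5'
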
def created_at(datetime_format='%Y-%m-%d %H:%M:%S'):
    now = datetime.datetime.now()
    # Populate the now object with time zone information obtained from the local system
    now_tz_aware = now.astimezone()
    tz = now_tz_aware.strftime('%z')
    # Format the timestamp; the UTC offset suffix is currently disabled
    created_at = now_tz_aware.strftime(datetime_format)  # + '_UTC-OFST_' + tz
    return created_at

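# Illustrative usage sketch (not executed on import). Callers that need a file-name-friendly
# timestamp typically replace the colons afterwards, e.g.:
#   created_at('%Y-%m-%d %H:%M:%S').replace(':', '-')  # e.g. '2024-01-31 12-05-07'
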
def sanitize_dataframe(df: pd.DataFrame) -> pd.DataFrame:
    # Handle datetime columns (convert to string in 'yyyy-mm-dd HH-MM-SS' format)
    datetime_cols = df.select_dtypes(include=['datetime']).columns
    for col in datetime_cols:
        # Convert datetime to string in the specified format, handling NaT
        df[col] = df[col].dt.strftime('%Y-%m-%d %H-%M-%S')

    # Handle object columns with mixed types
    otype_cols = df.select_dtypes(include='O')
    for col in otype_cols:
        col_data = df[col]

        # Check if all elements in the column are strings
        if col_data.apply(lambda x: isinstance(x, str)).all():
            df[col] = df[col].astype(str)
        else:
            # If the column contains mixed types, attempt to convert to numeric, coercing errors to NaN
            df[col] = pd.to_numeric(col_data, errors='coerce')

        # Handle NaN values differently based on dtype
        if pd.api.types.is_string_dtype(df[col]):
            # Replace NaN in string columns with empty string
            df[col] = df[col].fillna('')
        elif pd.api.types.is_numeric_dtype(df[col]):
            # For numeric columns, keep NaN as it is,
            # but cast integer columns to float so they can hold NaN
            if pd.api.types.is_integer_dtype(df[col]):
                df[col] = df[col].astype(float)  # Cast to float to allow NaN
            else:
                df[col] = df[col].fillna(np.nan)  # Keep NaN in float columns

    return df

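# Illustrative usage sketch (not executed on import); the column values are hypothetical.
# Datetimes become strings, mixed-type object columns are coerced to numeric (NaN on failure),
# and integer columns containing NaN end up as float:
#   df = pd.DataFrame({'when': pd.to_datetime(['2024-01-31']), 'mixed': ['1.5', 2, None]})
#   clean = sanitize_dataframe(df)
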
def convert_dataframe_to_np_structured_array(df: pd.DataFrame):

    df = sanitize_dataframe(df)
    # Define the dtype for the structured array, ensuring compatibility with h5py
    dtype = []
    for col in df.columns:

        col_data = df[col]
        col_dtype = col_data.dtype

        try:
            if pd.api.types.is_string_dtype(col_dtype):
                # Convert string dtype to fixed-length byte strings sized to the longest value
                max_len = int(col_data.str.len().max()) if not col_data.isnull().all() else 0
                dtype.append((col, f'S{max(max_len, 1)}'))
            elif pd.api.types.is_integer_dtype(col_dtype):
                dtype.append((col, 'i4'))  # Assuming 32-bit integer
            elif pd.api.types.is_float_dtype(col_dtype):
                dtype.append((col, 'f4'))  # Assuming 32-bit float
            elif pd.api.types.is_bool_dtype(col_dtype):
                dtype.append((col, bool))
            else:
                # Handle unsupported data types
                print(f"Unsupported dtype found in column '{col}': {col_data.dtype}")
                raise ValueError(f"Unsupported data type: {col_data.dtype}")

        except Exception as e:
            # Log a more detailed error message
            print(f"Error processing column '{col}': {e}")
            raise

    # Convert the DataFrame to a structured array
    structured_array = np.array(list(df.itertuples(index=False, name=None)), dtype=dtype)

    return structured_array

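# Illustrative usage sketch (not executed on import); the columns are hypothetical.
# Strings become fixed-length byte fields, integers 'i4', floats 'f4':
#   df = pd.DataFrame({'station': ['PSI', 'JFJ'], 'temp': [3.2, -7.1]})
#   arr = convert_dataframe_to_np_structured_array(df)
#   # arr.dtype -> dtype([('station', 'S3'), ('temp', '<f4')])
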
def convert_string_to_bytes(input_list: list):
    """Convert a list of strings into a numpy array with utf8-type entries.

    Parameters
    ----------
    input_list (list) : list of string objects

    Returns
    -------
    input_array_bytes (ndarray): array of utf8-type entries.
    """
    utf8_type = lambda max_length: h5py.string_dtype('utf-8', max_length)
    if input_list:
        max_length = max(len(item) for item in input_list)
        # Convert the strings to bytes with utf-8 encoding, specifying errors='ignore' to skip characters that cannot be encoded
        input_list_bytes = [item.encode('utf-8', errors='ignore') for item in input_list]
        input_array_bytes = np.array(input_list_bytes, dtype=utf8_type(max_length))
    else:
        input_array_bytes = np.array([], dtype=utf8_type(0))

    return input_array_bytes

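# Illustrative usage sketch (not executed on import); the strings are hypothetical:
#   arr = convert_string_to_bytes(['NO2', 'O3'])
#   # arr holds b'NO2', b'O3' using an h5py fixed-length UTF-8 string dtype (length 3 here)
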
def convert_attrdict_to_np_structured_array(attr_value: dict):
    """
    Converts a dictionary of attributes into a NumPy structured array with byte-encoded fields.
    Handles UTF-8 encoding to avoid UnicodeEncodeError with non-ASCII characters.

    Parameters
    ----------
    attr_value : dict
        Dictionary with scalar values (int, float, str).

    Returns
    -------
    new_attr_value : ndarray
        1-row structured array with fixed-size byte fields (dtype='S').
    """
    if not isinstance(attr_value, dict):
        raise ValueError(f"Input must be a dictionary, got {type(attr_value)}")

    if not attr_value:
        return np.array(['missing'], dtype=[('value', 'S16')])  # placeholder

    dtype = []
    values_list = []

    max_str_len = max(len(str(v)) for v in attr_value.values())
    byte_len = max_str_len * 4  # UTF-8 worst-case

    for key, val in attr_value.items():
        if key == 'rename_as':
            continue
        if isinstance(val, (int, float, str)):
            dtype.append((key, f'S{byte_len}'))
            try:
                encoded_val = str(val).encode('utf-8')  # explicit UTF-8
                values_list.append(encoded_val)
            except UnicodeEncodeError as e:
                logging.error(f"Failed to encode {key}={val}: {e}")
                raise
        else:
            logging.warning(f"Skipping unsupported type for key {key}: {type(val)}")

    if values_list:
        return np.array([tuple(values_list)], dtype=dtype)
    else:
        return np.array(['missing'], dtype=[('value', 'S16')])

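# Illustrative usage sketch (not executed on import); the attribute dictionary is hypothetical.
# Scalar values are byte-encoded into a single-row structured array; a 'rename_as' key is skipped:
#   attrs = {'units': 'µg/m³', 'detection_limit': 0.05}
#   arr = convert_attrdict_to_np_structured_array(attrs)
#   # arr.dtype.names -> ('units', 'detection_limit'); arr has exactly one row
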
def infer_units(column_name):
    # TODO: complete or remove

    match = re.search(r'\[.+\]', column_name)

    if match:
        return match
    else:
        match = re.search(r'\(.+\)', column_name)

    return match

def progressBar(count_value, total, suffix=''):
    bar_length = 100
    filled_up_Length = int(round(bar_length * count_value / float(total)))
    percentage = round(100.0 * count_value / float(total), 1)
    bar = '=' * filled_up_Length + '-' * (bar_length - filled_up_Length)
    sys.stdout.write('[%s] %s%s ...%s\r' % (bar, percentage, '%', suffix))
    sys.stdout.flush()

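# Illustrative usage sketch (not executed on import):
#   for i in range(101):
#       progressBar(i, 100, suffix='copying files')
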
def copy_directory_with_contraints(input_dir_path, output_dir_path,
                                   select_dir_keywords=None,
                                   select_file_keywords=None,
                                   allowed_file_extensions=None,
                                   dry_run=False):
    """
    Copies files from input_dir_path to output_dir_path based on specified constraints.

    Parameters
    ----------
    input_dir_path (str): Path to the input directory.
    output_dir_path (str): Path to the output directory.
    select_dir_keywords (list): optional, List of keywords for selecting directories.
    select_file_keywords (list): optional, List of keywords for selecting files.
    allowed_file_extensions (list): optional, List of allowed file extensions.
    dry_run (bool): optional, If True, collect the directory-file mapping without copying anything.

    Returns
    -------
    path_to_files_dict (dict): dictionary mapping directory paths to lists of copied file names satisfying the constraints.
    """

    # Unconstrained default behavior: no filters; make sure variables are lists even when defined as None in the function signature
    select_dir_keywords = select_dir_keywords or []
    select_file_keywords = select_file_keywords or []
    allowed_file_extensions = allowed_file_extensions or []

    # Normalize paths and keywords to be consistently specified with the OS-specific separator
    input_dir_path = os.path.normpath(input_dir_path)
    output_dir_path = os.path.normpath(output_dir_path)
    select_dir_keywords = [keyword.replace('/', os.sep) for keyword in select_dir_keywords]

    # Load path-exclusion keywords from the package-level YAML configuration
    try:
        with open(os.path.join(dimaPath, 'dima/utils/exclude_path_keywords.yaml'), 'r') as stream:
            exclude_path_dict = yaml.safe_load(stream)
            if isinstance(exclude_path_dict, dict):
                exclude_path_keywords = exclude_path_dict.get('exclude_paths', {}).get('containing', [])
                if not all(isinstance(keyword, str) for keyword in exclude_path_keywords):
                    exclude_path_keywords = []
            else:
                exclude_path_keywords = []
    except (FileNotFoundError, yaml.YAMLError) as e:
        print(f"Warning. Unable to load YAML file: {e}")
        exclude_path_keywords = []

    date = created_at('%Y_%m').replace(":", "-")
    log_dir = 'logs/'
    setup_logging(log_dir, f"copy_directory_with_contraints_{date}.log")

    # Define helper functions. They return True by default when the filtering lists are either None or []
    def has_allowed_extension(filename):
        return not allowed_file_extensions or os.path.splitext(filename)[1] in allowed_file_extensions

    def file_is_selected(filename):
        return not select_file_keywords or any(keyword in filename for keyword in select_file_keywords)

    # Collect paths of directories that are directly connected to the root dir and match select_dir_keywords
    paths = []
    if select_dir_keywords:
        for item in os.listdir(input_dir_path):
            if any(item in keyword for keyword in select_dir_keywords):
                paths.append(os.path.join(input_dir_path, item))
    else:
        paths.append(input_dir_path)

    path_to_files_dict = {}  # Dictionary to store directory-file pairs satisfying constraints

    for subpath in paths:

        for dirpath, _, filenames in os.walk(subpath, topdown=False):

            # Exclude any dirpath containing a keyword in exclude_path_keywords
            if any(excluded in dirpath for excluded in exclude_path_keywords):
                continue

            # Ensure composite keywords, e.g. <keyword>/<keyword>, are contained in the path
            if select_dir_keywords and not any(keyword in dirpath for keyword in select_dir_keywords):
                continue

            # Reduce filenames to those that are admissible
            admissible_filenames = [
                filename for filename in filenames
                if file_is_selected(filename) and has_allowed_extension(filename)
            ]

            if admissible_filenames:  # Only create directory if there are files to copy

                relative_dirpath = os.path.relpath(dirpath, input_dir_path)
                target_dirpath = os.path.join(output_dir_path, relative_dirpath)
                path_to_files_dict[target_dirpath] = admissible_filenames

                if not dry_run:

                    # Perform the actual copying
                    os.makedirs(target_dirpath, exist_ok=True)

                    for filename in admissible_filenames:
                        src_file_path = os.path.join(dirpath, filename)
                        dest_file_path = os.path.join(target_dirpath, filename)
                        try:
                            shutil.copy2(src_file_path, dest_file_path)
                        except Exception as e:
                            logging.error("Failed to copy %s: %s", src_file_path, e)

    return path_to_files_dict

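# Illustrative usage sketch (not executed on import); all paths and keywords are hypothetical.
# A dry run only reports what would be copied, without creating directories or files:
#   mapping = copy_directory_with_contraints('data/raw', 'data/staged',
#                                            select_dir_keywords=['2024'],
#                                            allowed_file_extensions=['.txt', '.h5'],
#                                            dry_run=True)
#   # mapping maps each target directory to the list of file names that would be copied there
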
def to_serializable_dtype(value):
    """Transform value's dtype into a YAML/JSON compatible dtype

    Parameters
    ----------
    value : np.generic or np.ndarray
        NumPy scalar or array to convert.

    Returns
    -------
    str, float, list, or dict
        Python-native representation of the input value; np.nan if no compatible dtype is found.
    """
    try:
        if isinstance(value, np.generic):
            if np.issubdtype(value.dtype, np.bytes_):
                value = value.decode('utf-8')
            elif np.issubdtype(value.dtype, np.str_):
                value = str(value)
            elif np.issubdtype(value.dtype, np.number):
                value = float(value)
            else:
                print('Yaml-compatible data-type was not found. Value has been set to NaN.')
                value = np.nan
        elif isinstance(value, np.ndarray):
            # Handle structured array types (with fields)
            if value.dtype.names:
                value = {field: to_serializable_dtype(value[field]) for field in value.dtype.names}
            else:
                # Handle regular NumPy arrays, assuming a uniform dtype across array elements
                # TODO: evaluate a more general way to check for individual dtypes
                if isinstance(value[0], bytes):
                    # Decode bytes
                    value = [item.decode('utf-8') for item in value] if len(value) > 1 else value[0].decode('utf-8')
                elif isinstance(value[0], str):
                    # Already a string type
                    value = [str(item) for item in value] if len(value) > 1 else str(value[0])
                elif isinstance(value[0], int):
                    # Integer type
                    value = [int(item) for item in value] if len(value) > 1 else int(value[0])
                elif isinstance(value[0], float):
                    # Floating type
                    value = [float(item) for item in value] if len(value) > 1 else float(value[0])
                else:
                    print('Yaml-compatible data-type was not found. Value has been set to NaN.')
                    print("Debug: value.dtype is", value.dtype)
                    value = np.nan

    except Exception as e:
        print(f'Error converting value: {e}. Value has been set to NaN.')
        value = np.nan

    return value

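# Illustrative usage sketch (not executed on import); the values are hypothetical:
#   to_serializable_dtype(np.float32(2.5))          # -> 2.5 (Python float)
#   to_serializable_dtype(np.array([b'a', b'b']))   # -> ['a', 'b'] (decoded strings)
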
def is_structured_array(attr_val):
    if isinstance(attr_val, np.ndarray):
        return attr_val.dtype.names is not None
    else:
        return False

def find_env_file(start_path=None):
    """
    Find .env file by walking up the directory tree.
    Looks for .env in the current dir, then parent dirs up to the filesystem root.

    Args:
        start_path: Starting directory (defaults to current working directory)

    Returns:
        Path to .env file or None if not found
    """
    if start_path is None:
        start_path = os.getcwd()

    current_path = Path(start_path).resolve()

    # Walk up the directory tree
    for path in [current_path] + list(current_path.parents):
        env_file = path / '.env'
        if env_file.exists():
            return str(env_file)

    return None

def load_env_from_root():
    """Load environment variables from .env file found in project root or parent."""
    env_file = find_env_file()

    if env_file:
        try:
            from dotenv import load_dotenv
            load_dotenv(env_file, override=True)  # override existing values
            print(f"Loaded .env from: {env_file}")
            return True
        except ImportError:
            # Fallback: parse the .env file manually when python-dotenv is not installed
            with open(env_file, 'r') as f:
                for line in f:
                    line = line.strip()
                    if line and not line.startswith('#') and '=' in line:
                        key, value = line.split('=', 1)
                        os.environ[key.strip()] = value.strip()
            print(f"Manually loaded .env from: {env_file}")
            return True
    else:
        print("No .env file found in project hierarchy")
        return False

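# Illustrative usage sketch (not executed on import). The nearest .env file found by
# find_env_file() is loaded, via python-dotenv when available or via the manual fallback:
#   if load_env_from_root():
#       api_token = os.environ.get('DIMA_API_TOKEN')  # hypothetical variable name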