dima/utils/g5505_utils.py
import sys
import os

try:
    thisFilePath = os.path.abspath(__file__)
except NameError:
    print("Error: __file__ is not available. Ensure the script is being run from a file.")
    print("[Notice] Path to DIMA package may not be resolved properly.")
    thisFilePath = os.getcwd()  # Use current directory or specify a default

dimaPath = os.path.normpath(os.path.join(thisFilePath, "..", "..", ".."))  # Move up to project root

if dimaPath not in sys.path:  # Avoid duplicate entries
    sys.path.insert(0, dimaPath)

import shutil
import datetime
import logging
import re
from pathlib import Path

import pandas as pd
import numpy as np
import h5py
import yaml
def setup_logging(log_dir, log_filename):
    """Sets up logging to a specified directory and file.

    Parameters:
        log_dir (str): Directory to save the log file.
        log_filename (str): Name of the log file.
    """
    # Ensure the log directory exists
    os.makedirs(log_dir, exist_ok=True)

    # Create a logger instance
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)

    # Create a file handler
    log_path = os.path.join(log_dir, log_filename)
    file_handler = logging.FileHandler(log_path)

    # Create a formatter and set it for the handler
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    file_handler.setFormatter(formatter)

    # Add the handler to the logger
    logger.addHandler(file_handler)
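# Example (illustrative; the directory and file name below are made-up values):
#   setup_logging('logs', 'ingestion.log')
#   logging.info('Pipeline started')   # written to logs/ingestion.log by the root logger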
def is_callable_list(x: list):
    return all([callable(item) for item in x])

def is_str_list(x: list):
    return all([isinstance(item, str) for item in x])

def augment_with_filetype(df):
    df['filetype'] = [os.path.splitext(item)[1][1::] for item in df['filename']]
    return df

def augment_with_filenumber(df):
    df['filenumber'] = [item[0:item.find('_')] for item in df['filename']]
    return df

def group_by_df_column(df, column_name: str):
    """
    df (pandas.DataFrame):
    column_name (str): column_name of df by which grouping operation will take place.
    """
    if not column_name in df.columns:
        raise ValueError("column_name must be in the columns of df.")
    return df[column_name]
def split_sample_col_into_sample_and_data_quality_cols(input_data: pd.DataFrame):
    sample_name = []
    sample_quality = []
    for item in input_data['sample']:
        if item.find('(') != -1:
            sample_name.append(item[0:item.find('(')])
            sample_quality.append(item[item.find('(')+1:len(item)-1])
        else:
            if item == '':
                sample_name.append('Not yet annotated')
                sample_quality.append('unevaluated')
            else:
                sample_name.append(item)
                sample_quality.append('good data')
    input_data['sample'] = sample_name
    input_data['data_quality'] = sample_quality

    return input_data
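# Example (illustrative; the sample annotations below are made-up values). A parenthesised
# suffix is read as a data-quality flag, an empty entry as not yet annotated:
#   df = pd.DataFrame({'sample': ['quartz (noisy)', '', 'calcite']})
#   df = split_sample_col_into_sample_and_data_quality_cols(df)
#   # df['sample']       -> ['quartz ', 'Not yet annotated', 'calcite']
#   # df['data_quality'] -> ['noisy', 'unevaluated', 'good data']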
def make_file_copy(source_file_path, output_folder_name: str = 'tmp_files'):
    pathtail, filename = os.path.split(source_file_path)
    #backup_filename = 'backup_'+ filename
    backup_filename = filename

    # Path
    ROOT_DIR = os.path.abspath(os.curdir)
    tmp_dirpath = os.path.join(ROOT_DIR, output_folder_name)
    if not os.path.exists(tmp_dirpath):
        os.mkdir(tmp_dirpath)

    tmp_file_path = os.path.join(tmp_dirpath, backup_filename)
    shutil.copy(source_file_path, tmp_file_path)

    return tmp_file_path
def created_at(datetime_format='%Y-%m-%d %H:%M:%S'):
    now = datetime.datetime.now()
    # Populate now object with time zone information obtained from the local system
    now_tz_aware = now.astimezone()
    tz = now_tz_aware.strftime('%z')
    # Replace colons in the time part of the timestamp with hyphens to make it file name friendly
    created_at = now_tz_aware.strftime(datetime_format)  # + '_UTC-OFST_' + tz
    return created_at
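# Example (illustrative): the default format keeps colons, so callers that need a
# file-name friendly stamp replace them, as done in copy_directory_with_contraints below:
#   created_at()                              # e.g., '2024-05-17 14:03:22'
#   created_at('%Y_%m').replace(":", "-")     # e.g., '2024_05'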
def sanitize_dataframe(df: pd.DataFrame) -> pd.DataFrame:
    # Handle datetime columns (convert to string in 'yyyy-mm-dd hh-mm-ss' format)
    datetime_cols = df.select_dtypes(include=['datetime']).columns
    for col in datetime_cols:
        # Convert datetime to string in the specified format, handling NaT
        df[col] = df[col].dt.strftime('%Y-%m-%d %H-%M-%S')

    # Handle object columns with mixed types
    otype_cols = df.select_dtypes(include='O')
    for col in otype_cols:
        col_data = df[col]

        # Check if all elements in the column are strings
        if col_data.apply(lambda x: isinstance(x, str)).all():
            df[col] = df[col].astype(str)
        else:
            # If the column contains mixed types, attempt to convert to numeric, coercing errors to NaN
            df[col] = pd.to_numeric(col_data, errors='coerce')

        # Handle NaN values differently based on dtype
        if pd.api.types.is_string_dtype(df[col]):
            # Replace NaN in string columns with empty string
            df[col] = df[col].fillna('')
        elif pd.api.types.is_numeric_dtype(df[col]):
            # For numeric columns, keep NaN as is,
            # but if an integer column has NaN, cast it to float
            if pd.api.types.is_integer_dtype(df[col]):
                df[col] = df[col].astype(float)  # Cast to float to allow NaN
            else:
                df[col] = df[col].fillna(np.nan)  # Keep NaN in float columns

    return df
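# Example (illustrative; the column values below are made-up):
#   df = pd.DataFrame({'t': pd.to_datetime(['2024-01-01 12:00:00', '2024-01-02 08:30:00']),
#                      'x': ['7', None]})
#   df = sanitize_dataframe(df)
#   # 't' becomes strings such as '2024-01-01 12-00-00'; the mixed-type 'x' is
#   # coerced to numeric, giving [7.0, nan].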
def convert_dataframe_to_np_structured_array(df: pd.DataFrame):
    df = sanitize_dataframe(df)

    # Define the dtype for the structured array, ensuring compatibility with h5py
    dtype = []
    for col in df.columns:
        col_data = df[col]
        col_dtype = col_data.dtype
        try:
            if pd.api.types.is_string_dtype(col_dtype):
                # Convert string dtype to fixed-length strings
                max_len = col_data.str.len().max() if not col_data.isnull().all() else 0
                dtype.append((col, f'S{max_len}'))
            elif pd.api.types.is_integer_dtype(col_dtype):
                dtype.append((col, 'i4'))  # Assuming 32-bit integer
            elif pd.api.types.is_float_dtype(col_dtype):
                dtype.append((col, 'f4'))  # Assuming 32-bit float
            elif pd.api.types.is_bool_dtype(col_dtype):
                dtype.append((col, bool))
            else:
                # Handle unsupported data types
                print(f"Unsupported dtype found in column '{col}': {col_data.dtype}")
                raise ValueError(f"Unsupported data type: {col_data.dtype}")
        except Exception as e:
            # Log more detailed error message
            print(f"Error processing column '{col}': {e}")
            raise

    # Convert the DataFrame to a structured array
    structured_array = np.array(list(df.itertuples(index=False, name=None)), dtype=dtype)

    return structured_array
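# Example (illustrative sketch; 'example.h5' and the DataFrame below are made-up):
#   df = pd.DataFrame({'name': ['a', 'b'], 'value': [0.1, 0.2]})
#   arr = convert_dataframe_to_np_structured_array(df)
#   with h5py.File('example.h5', 'w') as f:
#       f.create_dataset('table', data=arr)   # one compound dataset, one field per column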
def convert_string_to_bytes(input_list: list):
    """Convert a list of strings into a numpy array with utf8-type entries.

    Parameters
    ----------
    input_list (list) : list of string objects

    Returns
    -------
    input_array_bytes (ndarray): array of utf8-type entries.
    """
    utf8_type = lambda max_length: h5py.string_dtype('utf-8', max_length)
    if input_list:
        max_length = max(len(item) for item in input_list)
        # Convert the strings to bytes with utf-8 encoding, specifying errors='ignore' to skip characters that cannot be encoded
        input_list_bytes = [item.encode('utf-8', errors='ignore') for item in input_list]
        input_array_bytes = np.array(input_list_bytes, dtype=utf8_type(max_length))
    else:
        input_array_bytes = np.array([], dtype=utf8_type(0))

    return input_array_bytes
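# Example (illustrative; the strings below are made-up values):
#   arr = convert_string_to_bytes(['NO3', 'SO4'])
#   # -> fixed-length, UTF-8 encoded byte entries (b'NO3', b'SO4') suitable for h5py attributes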
def convert_attrdict_to_np_structured_array(attr_value: dict):
    """
    Converts a dictionary of attributes into a NumPy structured array with byte-encoded fields.
    Handles UTF-8 encoding to avoid UnicodeEncodeError with non-ASCII characters.

    Parameters
    ----------
    attr_value : dict
        Dictionary with scalar values (int, float, str).

    Returns
    -------
    new_attr_value : ndarray
        1-row structured array with fixed-size byte fields (dtype='S').
    """
    if not isinstance(attr_value, dict):
        raise ValueError(f"Input must be a dictionary, got {type(attr_value)}")

    if not attr_value:
        return np.array(['missing'], dtype=[('value', 'S16')])  # placeholder

    dtype = []
    values_list = []

    max_str_len = max(len(str(v)) for v in attr_value.values())
    byte_len = max_str_len * 4  # UTF-8 worst-case

    for key, val in attr_value.items():
        if key == 'rename_as':
            continue
        if isinstance(val, (int, float, str)):
            dtype.append((key, f'S{byte_len}'))
            try:
                encoded_val = str(val).encode('utf-8')  # explicit UTF-8
                values_list.append(encoded_val)
            except UnicodeEncodeError as e:
                logging.error(f"Failed to encode {key}={val}: {e}")
                raise
        else:
            logging.warning(f"Skipping unsupported type for key {key}: {type(val)}")

    if values_list:
        return np.array([tuple(values_list)], dtype=dtype)
    else:
        return np.array(['missing'], dtype=[('value', 'S16')])
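# Example (illustrative; the attribute values below are made-up):
#   convert_attrdict_to_np_structured_array({'value': 0.23, 'units': 'm/s'})
#   # -> one-row structured array with byte fields, e.g. (b'0.23', b'm/s'),
#   #    dtype=[('value', 'S16'), ('units', 'S16')]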
def infer_units(column_name):
    # TODO: complete or remove
    match = re.search(r'\[.+\]', column_name)
    if match:
        return match
    else:
        match = re.search(r'\(.+\)', column_name)
    return match
def progressBar(count_value, total, suffix=''):
    bar_length = 100
    filled_up_Length = int(round(bar_length * count_value / float(total)))
    percentage = round(100.0 * count_value / float(total), 1)
    bar = '=' * filled_up_Length + '-' * (bar_length - filled_up_Length)
    sys.stdout.write('[%s] %s%s ...%s\r' % (bar, percentage, '%', suffix))
    sys.stdout.flush()
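# Example (illustrative):
#   for i in range(1, 101):
#       progressBar(i, 100, suffix='processing files')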
def copy_directory_with_contraints(input_dir_path, output_dir_path,
                                   select_dir_keywords=None,
                                   select_file_keywords=None,
                                   allowed_file_extensions=None,
                                   dry_run=False):
    """
    Copies files from input_dir_path to output_dir_path based on specified constraints.

    Parameters
    ----------
    input_dir_path (str): Path to the input directory.
    output_dir_path (str): Path to the output directory.
    select_dir_keywords (list): optional, List of keywords for selecting directories.
    select_file_keywords (list): optional, List of keywords for selecting files.
    allowed_file_extensions (list): optional, List of allowed file extensions.
    dry_run (bool): optional, If True, only collect and return the file mapping without copying files.

    Returns
    -------
    path_to_files_dict (dict): dictionary mapping directory paths to lists of copied file names satisfying the constraints.
    """

    # Unconstrained default behavior: No filters, make sure variables are lists even when defined as None in function signature
    select_dir_keywords = select_dir_keywords or []
    select_file_keywords = select_file_keywords or []
    allowed_file_extensions = allowed_file_extensions or []

    # Normalize paths and keywords to be consistently specified with os specific separator
    input_dir_path = os.path.normpath(input_dir_path)
    output_dir_path = os.path.normpath(output_dir_path)
    select_dir_keywords = [keyword.replace('/', os.sep) for keyword in select_dir_keywords]

    try:
        with open(os.path.join(dimaPath, 'dima/utils/exclude_path_keywords.yaml'), 'r') as stream:
            exclude_path_dict = yaml.safe_load(stream)
            if isinstance(exclude_path_dict, dict):
                exclude_path_keywords = exclude_path_dict.get('exclude_paths', {}).get('containing', [])
                if not all(isinstance(keyword, str) for keyword in exclude_path_keywords):
                    exclude_path_keywords = []
            else:
                exclude_path_keywords = []
    except (FileNotFoundError, yaml.YAMLError) as e:
        print(f"Warning. Unable to load YAML file: {e}")
        exclude_path_keywords = []

    date = created_at('%Y_%m').replace(":", "-")
    log_dir = 'logs/'
    setup_logging(log_dir, f"copy_directory_with_contraints_{date}.log")

    # Define helper functions. Return by default true when filtering lists are either None or []
    def has_allowed_extension(filename):
        return not allowed_file_extensions or os.path.splitext(filename)[1] in allowed_file_extensions

    def file_is_selected(filename):
        return not select_file_keywords or any(keyword in filename for keyword in select_file_keywords)

    # Collect paths of directories, which are directly connected to the root dir and match select_dir_keywords
    paths = []
    if select_dir_keywords:
        for item in os.listdir(input_dir_path):
            if any([item in keyword for keyword in select_dir_keywords]):
                paths.append(os.path.join(input_dir_path, item))
    else:
        paths.append(input_dir_path)

    path_to_files_dict = {}  # Dictionary to store directory-file pairs satisfying constraints

    for subpath in paths:
        for dirpath, _, filenames in os.walk(subpath, topdown=False):

            # Exclude any dirpath containing a keyword in exclude_path_keywords
            if any(excluded in dirpath for excluded in exclude_path_keywords):
                continue

            # Ensure composite keywords e.g., <keyword>/<keyword> are contained in the path
            if select_dir_keywords and not any([keyword in dirpath for keyword in select_dir_keywords]):
                continue

            # Reduce filenames to those that are admissible
            admissible_filenames = [
                filename for filename in filenames
                if file_is_selected(filename) and has_allowed_extension(filename)
            ]

            if admissible_filenames:  # Only create directory if there are files to copy
                relative_dirpath = os.path.relpath(dirpath, input_dir_path)
                target_dirpath = os.path.join(output_dir_path, relative_dirpath)
                path_to_files_dict[target_dirpath] = admissible_filenames

                if not dry_run:
                    # Perform the actual copying
                    os.makedirs(target_dirpath, exist_ok=True)
                    for filename in admissible_filenames:
                        src_file_path = os.path.join(dirpath, filename)
                        dest_file_path = os.path.join(target_dirpath, filename)
                        try:
                            shutil.copy2(src_file_path, dest_file_path)
                        except Exception as e:
                            logging.error("Failed to copy %s: %s", src_file_path, e)

    return path_to_files_dict
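# Example (illustrative sketch; the paths and keywords below are made-up values):
#   mapping = copy_directory_with_contraints('data/raw', 'data/staging',
#                                            select_dir_keywords=['2024'],
#                                            allowed_file_extensions=['.txt', '.yaml'],
#                                            dry_run=True)
#   # Inspect 'mapping' (target dir -> admissible file names), then rerun with
#   # dry_run=False to perform the copy.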
def to_serializable_dtype(value):
    """Transform value's dtype into YAML/JSON compatible dtype

    Parameters
    ----------
    value : np.generic or np.ndarray
        NumPy scalar or array, e.g., as read from an HDF5 attribute.

    Returns
    -------
    str, float, int, list, dict, or np.nan
        YAML/JSON serializable representation of the input, or np.nan when no
        compatible conversion is found.
    """
    try:
        if isinstance(value, np.generic):
            if np.issubdtype(value.dtype, np.bytes_):
                value = value.decode('utf-8')
            elif np.issubdtype(value.dtype, np.str_):
                value = str(value)
            elif np.issubdtype(value.dtype, np.number):
                value = float(value)
            else:
                print('Yaml-compatible data-type was not found. Value has been set to NaN.')
                value = np.nan
        elif isinstance(value, np.ndarray):
            # Handling structured array types (with fields)
            if value.dtype.names:
                value = {field: to_serializable_dtype(value[field]) for field in value.dtype.names}
            else:
                # Handling regular NumPy arrays with the assumption of uniform dtype across array elements
                # TODO: evaluate a more general way to check for individual dtypes
                if isinstance(value[0], bytes):
                    # Decode bytes
                    value = [item.decode('utf-8') for item in value] if len(value) > 1 else value[0].decode('utf-8')
                elif isinstance(value[0], str):
                    # Already a string type
                    value = [str(item) for item in value] if len(value) > 1 else str(value[0])
                elif isinstance(value[0], int):
                    # Integer type
                    value = [int(item) for item in value] if len(value) > 1 else int(value[0])
                elif isinstance(value[0], float):
                    # Floating type
                    value = [float(item) for item in value] if len(value) > 1 else float(value[0])
                else:
                    print('Yaml-compatible data-type was not found. Value has been set to NaN.')
                    print("Debug: value.dtype is", value.dtype)
                    value = np.nan
    except Exception as e:
        print(f'Error converting value: {e}. Value has been set to NaN.')
        value = np.nan

    return value
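# Example (illustrative; the values below are made-up):
#   to_serializable_dtype(np.float32(2.5))           # -> 2.5 (Python float)
#   to_serializable_dtype(np.array([b'a', b'b']))    # -> ['a', 'b']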
def is_structured_array(attr_val):
    if isinstance(attr_val, np.ndarray):
        return True if attr_val.dtype.names is not None else False
    else:
        return False
def find_env_file(start_path=None):
    """
    Find .env file by walking up the directory tree.
    Looks for .env in current dir, then parent dirs up to filesystem root.

    Args:
        start_path: Starting directory (defaults to current working directory)

    Returns:
        Path to .env file or None if not found
    """
    if start_path is None:
        start_path = os.getcwd()

    current_path = Path(start_path).resolve()

    # Walk up the directory tree
    for path in [current_path] + list(current_path.parents):
        env_file = path / '.env'
        if env_file.exists():
            return str(env_file)

    return None
def load_env_from_root():
    """Load environment variables from .env file found in project root or parent."""
    env_file = find_env_file()
    if env_file:
        try:
            from dotenv import load_dotenv
            load_dotenv(env_file, override=True)  # override existing values
            print(f"Loaded .env from: {env_file}")
            return True
        except ImportError:
            with open(env_file, 'r') as f:
                for line in f:
                    line = line.strip()
                    if line and not line.startswith('#') and '=' in line:
                        key, value = line.split('=', 1)
                        os.environ[key.strip()] = value.strip()
            print(f"Manually loaded .env from: {env_file}")
            return True
    else:
        print("No .env file found in project hierarchy")
        return False
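# Example (illustrative; the variable name below is made-up):
#   if load_env_from_root():
#       token = os.environ.get('MY_SERVICE_TOKEN')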