import sys import os try: thisFilePath = os.path.abspath(__file__) except NameError: print("Error: __file__ is not available. Ensure the script is being run from a file.") print("[Notice] Path to DIMA package may not be resolved properly.") thisFilePath = os.getcwd() # Use current directory or specify a default dimaPath = os.path.normpath(os.path.join(thisFilePath, "..",'..','..')) # Move up to project root if dimaPath not in sys.path: # Avoid duplicate entries sys.path.insert(0,dimaPath) import pandas as pd import os import sys import shutil import datetime import logging import numpy as np import h5py import re import yaml def setup_logging(log_dir, log_filename): """Sets up logging to a specified directory and file. Parameters: log_dir (str): Directory to save the log file. log_filename (str): Name of the log file. """ # Ensure the log directory exists os.makedirs(log_dir, exist_ok=True) # Create a logger instance logger = logging.getLogger() logger.setLevel(logging.INFO) # Create a file handler log_path = os.path.join(log_dir, log_filename) file_handler = logging.FileHandler(log_path) # Create a formatter and set it for the handler formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') file_handler.setFormatter(formatter) # Add the handler to the logger logger.addHandler(file_handler) def is_callable_list(x : list): return all([callable(item) for item in x]) def is_str_list(x : list): return all([isinstance(item,str) for item in x]) def augment_with_filetype(df): df['filetype'] = [os.path.splitext(item)[1][1::] for item in df['filename']] #return [os.path.splitext(item)[1][1::] for item in df['filename']] return df def augment_with_filenumber(df): df['filenumber'] = [item[0:item.find('_')] for item in df['filename']] #return [item[0:item.find('_')] for item in df['filename']] return df def group_by_df_column(df, column_name: str): """ df (pandas.DataFrame): column_name (str): column_name of df by which grouping operation will take place. """ if not column_name in df.columns: raise ValueError("column_name must be in the columns of df.") return df[column_name] def split_sample_col_into_sample_and_data_quality_cols(input_data: pd.DataFrame): sample_name = [] sample_quality = [] for item in input_data['sample']: if item.find('(')!=-1: #print(item) sample_name.append(item[0:item.find('(')]) sample_quality.append(item[item.find('(')+1:len(item)-1]) else: if item=='': sample_name.append('Not yet annotated') sample_quality.append('unevaluated') else: sample_name.append(item) sample_quality.append('good data') input_data['sample'] = sample_name input_data['data_quality'] = sample_quality return input_data def make_file_copy(source_file_path, output_folder_name : str = 'tmp_files'): pathtail, filename = os.path.split(source_file_path) #backup_filename = 'backup_'+ filename backup_filename = filename # Path ROOT_DIR = os.path.abspath(os.curdir) tmp_dirpath = os.path.join(ROOT_DIR,output_folder_name) if not os.path.exists(tmp_dirpath): os.mkdir(tmp_dirpath) tmp_file_path = os.path.join(tmp_dirpath,backup_filename) shutil.copy(source_file_path, tmp_file_path) return tmp_file_path def created_at(datetime_format = '%Y-%m-%d %H:%M:%S'): now = datetime.datetime.now() # Populate now object with time zone information obtained from the local system now_tz_aware = now.astimezone() tz = now_tz_aware.strftime('%z') # Replace colons in the time part of the timestamp with hyphens to make it file name friendly created_at = now_tz_aware.strftime(datetime_format) #+ '_UTC-OFST_' + tz return created_at def sanitize_dataframe(df: pd.DataFrame) -> pd.DataFrame: # Handle datetime columns (convert to string in 'yyyy-mm-dd hh:mm:ss' format) datetime_cols = df.select_dtypes(include=['datetime']).columns for col in datetime_cols: # Convert datetime to string in the specified format, handling NaT df[col] = df[col].dt.strftime('%Y-%m-%d %H-%M-%S') # Handle object columns with mixed types otype_cols = df.select_dtypes(include='O') for col in otype_cols: col_data = df[col] # Check if all elements in the column are strings if col_data.apply(lambda x: isinstance(x, str)).all(): df[col] = df[col].astype(str) else: # If the column contains mixed types, attempt to convert to numeric, coercing errors to NaN df[col] = pd.to_numeric(col_data, errors='coerce') # Handle NaN values differently based on dtype if pd.api.types.is_string_dtype(df[col]): # Replace NaN in string columns with empty string df[col] = df[col].fillna('') # Replace NaN with empty string elif pd.api.types.is_numeric_dtype(df[col]): # For numeric columns, we want to keep NaN as it is # But if integer column has NaN, consider casting to float if pd.api.types.is_integer_dtype(df[col]): df[col] = df[col].astype(float) # Cast to float to allow NaN else: df[col] = df[col].fillna(np.nan) # Keep NaN in float columns return df def convert_dataframe_to_np_structured_array(df: pd.DataFrame): df = sanitize_dataframe(df) # Define the dtype for the structured array, ensuring compatibility with h5py dtype = [] for col in df.columns: col_data = df[col] col_dtype = col_data.dtype try: if pd.api.types.is_string_dtype(col_dtype): # Convert string dtype to fixed-length strings max_len = col_data.str.len().max() if not col_data.isnull().all() else 0 dtype.append((col, f'S{max_len}')) elif pd.api.types.is_integer_dtype(col_dtype): dtype.append((col, 'i4')) # Assuming 32-bit integer elif pd.api.types.is_float_dtype(col_dtype): dtype.append((col, 'f4')) # Assuming 32-bit float elif pd.api.types.is_bool_dtype(col_dtype): dtype.append((col,bool)) else: # Handle unsupported data types print(f"Unsupported dtype found in column '{col}': {col_data.dtype}") raise ValueError(f"Unsupported data type: {col_data.dtype}") except Exception as e: # Log more detailed error message print(f"Error processing column '{col}': {e}") raise # Convert the DataFrame to a structured array structured_array = np.array(list(df.itertuples(index=False, name=None)), dtype=dtype) return structured_array def convert_string_to_bytes(input_list: list): """Convert a list of strings into a numpy array with utf8-type entries. Parameters ---------- input_list (list) : list of string objects Returns ------- input_array_bytes (ndarray): array of ut8-type entries. """ utf8_type = lambda max_length: h5py.string_dtype('utf-8', max_length) if input_list: max_length = max(len(item) for item in input_list) # Convert the strings to bytes with utf-8 encoding, specifying errors='ignore' to skip characters that cannot be encoded input_list_bytes = [item.encode('utf-8', errors='ignore') for item in input_list] input_array_bytes = np.array(input_list_bytes,dtype=utf8_type(max_length)) else: input_array_bytes = np.array([],dtype=utf8_type(0)) return input_array_bytes def convert_attrdict_to_np_structured_array(attr_value: dict): """ Converts a dictionary of attributes into a NumPy structured array with byte-encoded fields. Handles UTF-8 encoding to avoid UnicodeEncodeError with non-ASCII characters. Parameters ---------- attr_value : dict Dictionary with scalar values (int, float, str). Returns ------- new_attr_value : ndarray 1-row structured array with fixed-size byte fields (dtype='S'). """ if not isinstance(attr_value, dict): raise ValueError(f"Input must be a dictionary, got {type(attr_value)}") if not attr_value: return np.array(['missing'], dtype=[('value', 'S16')]) # placeholder dtype = [] values_list = [] max_str_len = max(len(str(v)) for v in attr_value.values()) byte_len = max_str_len * 4 # UTF-8 worst-case for key, val in attr_value.items(): if key == 'rename_as': continue if isinstance(val, (int, float, str)): dtype.append((key, f'S{byte_len}')) try: encoded_val = str(val).encode('utf-8') # explicit UTF-8 values_list.append(encoded_val) except UnicodeEncodeError as e: logging.error(f"Failed to encode {key}={val}: {e}") raise else: logging.warning(f"Skipping unsupported type for key {key}: {type(val)}") if values_list: return np.array([tuple(values_list)], dtype=dtype) else: return np.array(['missing'], dtype=[('value', 'S16')]) def infer_units(column_name): # TODO: complete or remove match = re.search('\[.+\]') if match: return match else: match = re.search('\(.+\)') return match def progressBar(count_value, total, suffix=''): bar_length = 100 filled_up_Length = int(round(bar_length* count_value / float(total))) percentage = round(100.0 * count_value/float(total),1) bar = '=' * filled_up_Length + '-' * (bar_length - filled_up_Length) sys.stdout.write('[%s] %s%s ...%s\r' %(bar, percentage, '%', suffix)) sys.stdout.flush() def copy_directory_with_contraints(input_dir_path, output_dir_path, select_dir_keywords = None, select_file_keywords = None, allowed_file_extensions = None, dry_run = False): """ Copies files from input_dir_path to output_dir_path based on specified constraints. Parameters ---------- input_dir_path (str): Path to the input directory. output_dir_path (str): Path to the output directory. select_dir_keywords (list): optional, List of keywords for selecting directories. select_file_keywords (list): optional, List of keywords for selecting files. allowed_file_extensions (list): optional, List of allowed file extensions. Returns ------- path_to_files_dict (dict): dictionary mapping directory paths to lists of copied file names satisfying the constraints. """ # Unconstrained default behavior: No filters, make sure variable are lists even when defined as None in function signature select_dir_keywords = select_dir_keywords or [] select_file_keywords = select_file_keywords or [] allowed_file_extensions = allowed_file_extensions or [] # Normalize paths and keywords to be consistently specified with os specific separator input_dir_path = os.path.normpath(input_dir_path) output_dir_path = os.path.normpath(output_dir_path) select_dir_keywords = [keyword.replace('/',os.sep) for keyword in select_dir_keywords] try: with open(os.path.join(dimaPath, 'dima/utils/exclude_path_keywords.yaml'), 'r') as stream: exclude_path_dict = yaml.safe_load(stream) if isinstance(exclude_path_dict, dict): exclude_path_keywords = exclude_path_dict.get('exclude_paths',{}).get('containing', []) if not all(isinstance(keyword, str) for keyword in exclude_path_keywords): exclude_path_keywords = [] else: exclude_path_keywords = [] except (FileNotFoundError, yaml.YAMLError) as e: print(f"Warning. Unable to load YAML file: {e}") exclude_path_keywords = [] date = created_at('%Y_%m').replace(":", "-") log_dir='logs/' setup_logging(log_dir, f"copy_directory_with_contraints_{date}.log") # Define helper functions. Return by default true when filtering lists are either None or [] def has_allowed_extension(filename): return not allowed_file_extensions or os.path.splitext(filename)[1] in allowed_file_extensions def file_is_selected(filename): return not select_file_keywords or any(keyword in filename for keyword in select_file_keywords) # Exclude path keywords # Collect paths of directories, which are directly connected to the root dir and match select_dir_keywords paths = [] if select_dir_keywords: for item in os.listdir(input_dir_path): #Path(input_dir_path).iterdir(): if any([item in keyword for keyword in select_dir_keywords]): paths.append(os.path.join(input_dir_path,item)) else: paths.append(input_dir_path) #paths.append(Path(input_dir_path)) path_to_files_dict = {} # Dictionary to store directory-file pairs satisfying constraints for subpath in paths: for dirpath, _, filenames in os.walk(subpath,topdown=False): # Exclude any dirpath containing a keyword in exclude_path_keywords if any(excluded in dirpath for excluded in exclude_path_keywords): continue # Ensure composite keywords e.g., / are contained in the path if select_dir_keywords and not any([keyword in dirpath for keyword in select_dir_keywords]): continue # Reduce filenames to those that are admissible admissible_filenames = [ filename for filename in filenames if file_is_selected(filename) and has_allowed_extension(filename) ] if admissible_filenames: # Only create directory if there are files to copy relative_dirpath = os.path.relpath(dirpath, input_dir_path) target_dirpath = os.path.join(output_dir_path, relative_dirpath) path_to_files_dict[target_dirpath] = admissible_filenames if not dry_run: # Perform the actual copying os.makedirs(target_dirpath, exist_ok=True) for filename in admissible_filenames: src_file_path = os.path.join(dirpath, filename) dest_file_path = os.path.join(target_dirpath, filename) try: shutil.copy2(src_file_path, dest_file_path) except Exception as e: logging.error("Failed to copy %s: %s", src_file_path, e) return path_to_files_dict def to_serializable_dtype(value): """Transform value's dtype into YAML/JSON compatible dtype Parameters ---------- value : _type_ _description_ Returns ------- _type_ _description_ """ try: if isinstance(value, np.generic): if np.issubdtype(value.dtype, np.bytes_): value = value.decode('utf-8') elif np.issubdtype(value.dtype, np.unicode_): value = str(value) elif np.issubdtype(value.dtype, np.number): value = float(value) else: print('Yaml-compatible data-type was not found. Value has been set to NaN.') value = np.nan elif isinstance(value, np.ndarray): # Handling structured array types (with fields) if value.dtype.names: value = {field: to_serializable_dtype(value[field]) for field in value.dtype.names} else: # Handling regular array NumPy types with assumption of unform dtype accross array elements # TODO: evaluate a more general way to check for individual dtypes if isinstance(value[0], bytes): # Decode bytes value = [item.decode('utf-8') for item in value] if len(value) > 1 else value[0].decode('utf-8') elif isinstance(value[0], str): # Already a string type value = [str(item) for item in value] if len(value) > 1 else str(value[0]) elif isinstance(value[0], int): # Integer type value = [int(item) for item in value] if len(value) > 1 else int(value[0]) elif isinstance(value[0], float): # Floating type value = [float(item) for item in value] if len(value) > 1 else float(value[0]) else: print('Yaml-compatible data-type was not found. Value has been set to NaN.') print("Debug: value.dtype is", value.dtype) value = np.nan except Exception as e: print(f'Error converting value: {e}. Value has been set to NaN.') value = np.nan return value def is_structured_array(attr_val): if isinstance(attr_val,np.ndarray): return True if attr_val.dtype.names is not None else False else: return False import os from pathlib import Path def find_env_file(start_path=None): """ Find .env file by walking up the directory tree. Looks for .env in current dir, then parent dirs up to filesystem root. Args: start_path: Starting directory (defaults to current working directory) Returns: Path to .env file or None if not found """ if start_path is None: start_path = os.getcwd() current_path = Path(start_path).resolve() # Walk up the directory tree for path in [current_path] + list(current_path.parents): env_file = path / '.env' if env_file.exists(): return str(env_file) return None import os def load_env_from_root(): """Load environment variables from .env file found in project root or parent.""" env_file = find_env_file() if env_file: try: from dotenv import load_dotenv load_dotenv(env_file, override=True) # override existing values print(f"Loaded .env from: {env_file}") return True except ImportError: with open(env_file, 'r') as f: for line in f: line = line.strip() if line and not line.startswith('#') and '=' in line: key, value = line.split('=', 1) os.environ[key.strip()] = value.strip() print(f"Manually loaded .env from: {env_file}") return True else: print("No .env file found in project hierarchy") return False