Restructured a bit to include the default case of copying an input directory without any constraints. Also added a dry_run input argument that returns a path-to-files dict representation of the output directory without making an actual copy. Useful when the input directory is already safe to work with directly.
This commit is contained in:
@ -190,7 +190,11 @@ def progressBar(count_value, total, suffix=''):
|
||||
sys.stdout.write('[%s] %s%s ...%s\r' %(bar, percentage, '%', suffix))
|
||||
sys.stdout.flush()
|
||||
|
||||
def copy_directory_with_contraints(input_dir_path, output_dir_path, select_dir_keywords, select_file_keywords, allowed_file_extensions):
|
||||
def copy_directory_with_contraints(input_dir_path, output_dir_path,
|
||||
select_dir_keywords = None,
|
||||
select_file_keywords = None,
|
||||
allowed_file_extensions = None,
|
||||
dry_run = False):
|
||||
"""
|
||||
Copies files from input_dir_path to output_dir_path based on specified constraints.
|
||||
|
||||
@ -198,24 +202,30 @@ def copy_directory_with_contraints(input_dir_path, output_dir_path, select_dir_k
|
||||
----------
|
||||
input_dir_path (str): Path to the input directory.
|
||||
output_dir_path (str): Path to the output directory.
|
||||
select_dir_keywords (list): List of keywords for selecting directories.
|
||||
select_file_keywords (list): List of keywords for selecting files.
|
||||
allowed_file_extensions (list): List of allowed file extensions.
|
||||
select_dir_keywords (list): optional, List of keywords for selecting directories.
|
||||
select_file_keywords (list): optional, List of keywords for selecting files.
|
||||
allowed_file_extensions (list): optional, List of allowed file extensions.
|
||||
|
||||
Returns
|
||||
-------
|
||||
path_to_files_dict (dict): dictionary mapping directory paths to lists of copied file names satisfying the constraints.
|
||||
"""
|
||||
|
||||
# Unconstrained default behavior: no filters. Make sure the variables are lists even when defined as None in the function signature
|
||||
select_dir_keywords = select_dir_keywords or []
|
||||
select_file_keywords = select_file_keywords or []
|
||||
allowed_file_extensions = allowed_file_extensions or []
|
||||
|
||||
date = created_at()
|
||||
log_dir='logs/'
|
||||
setup_logging(log_dir, f"copy_directory_with_contraints_{date}.log")
|
||||
|
||||
# Define helper functions. Each returns True by default when its filtering list is None or []
|
||||
def has_allowed_extension(filename):
|
||||
return os.path.splitext(filename)[1] in allowed_file_extensions
|
||||
|
||||
return not allowed_file_extensions or os.path.splitext(filename)[1] in allowed_file_extensions
|
||||
|
||||
def file_is_selected(filename):
|
||||
return any(keyword in filename for keyword in select_file_keywords) if select_file_keywords else True
|
||||
return not select_file_keywords or any(keyword in filename for keyword in select_file_keywords)
|
||||
|
||||
|
||||
# Collect paths of directories, which are directly connected to the root dir and match select_dir_keywords
|
||||
@ -227,7 +237,7 @@ def copy_directory_with_contraints(input_dir_path, output_dir_path, select_dir_k
|
||||
else:
|
||||
paths.append(input_dir_path) #paths.append(Path(input_dir_path))
|
||||
|
||||
ROOT_DIR = input_dir_path
|
||||
|
||||
path_to_files_dict = {} # Dictionary to store directory-file pairs satisfying constraints
|
||||
|
||||
for subpath in paths:
|
||||
@ -235,23 +245,29 @@ def copy_directory_with_contraints(input_dir_path, output_dir_path, select_dir_k
|
||||
for dirpath, _, filenames in os.walk(subpath,topdown=False):
|
||||
|
||||
# Reduce filenames to those that are admissible
|
||||
admissible_filenames = [filename for filename in filenames if has_allowed_extension(filename) and file_is_selected(filename)]
|
||||
admissible_filenames = [
|
||||
filename for filename in filenames
|
||||
if file_is_selected(filename) and has_allowed_extension(filename)
|
||||
]
|
||||
|
||||
if admissible_filenames: # Only create directory if there are files to copy
|
||||
|
||||
|
||||
relative_dirpath = os.path.relpath(dirpath, ROOT_DIR)
|
||||
relative_dirpath = os.path.relpath(dirpath, input_dir_path)
|
||||
target_dirpath = os.path.join(output_dir_path, relative_dirpath)
|
||||
#path_to_files_dict[dirpath] = admissible_filenames
|
||||
path_to_files_dict[target_dirpath] = admissible_filenames
|
||||
os.makedirs(target_dirpath, exist_ok=True)
|
||||
|
||||
for filename in admissible_filenames:
|
||||
src_file_path = os.path.join(dirpath, filename)
|
||||
dest_file_path = os.path.join(target_dirpath, filename)
|
||||
try:
|
||||
shutil.copy2(src_file_path, dest_file_path)
|
||||
except Exception as e:
|
||||
logging.error("Failed to copy %s: %s", src_file_path, e)
|
||||
|
||||
if not dry_run:
|
||||
|
||||
# Perform the actual copying
|
||||
|
||||
os.makedirs(target_dirpath, exist_ok=True)
|
||||
|
||||
for filename in admissible_filenames:
|
||||
src_file_path = os.path.join(dirpath, filename)
|
||||
dest_file_path = os.path.join(target_dirpath, filename)
|
||||
try:
|
||||
shutil.copy2(src_file_path, dest_file_path)
|
||||
except Exception as e:
|
||||
logging.error("Failed to copy %s: %s", src_file_path, e)
|
||||
|
||||
return path_to_files_dict
|
Reference in New Issue
Block a user