From d6bb20ae7dcd9bc81e6b259b25e32ef8efa8aa36 Mon Sep 17 00:00:00 2001
From: Juan Felipe Flórez Ospina
Date: Sat, 28 Jun 2025 14:45:06 +0200
Subject: [PATCH] Update pipelines/data_integration.py with new functionality.

copy_subtree_and_create_hdf5 now checks whether a copy of the src directory
already exists, to avoid replacing files, and verifies that there is enough
free storage space before starting the data transfer and the subsequent
ingestion into HDF5.
---
 pipelines/data_integration.py | 49 ++++++++++++++++++++++++++++++-----
 1 file changed, 43 insertions(+), 6 deletions(-)

diff --git a/pipelines/data_integration.py b/pipelines/data_integration.py
index 66a5fca..2cfb013 100644
--- a/pipelines/data_integration.py
+++ b/pipelines/data_integration.py
@@ -18,8 +18,8 @@ if dimaPath not in sys.path: # Avoid duplicate entries
 import yaml
 import logging
 from datetime import datetime
-# Importing chain class from itertools
-from itertools import chain
+import shutil
+
 
 # Import DIMA modules
 try:
@@ -196,11 +196,48 @@ def copy_subtree_and_create_hdf5(src, dst, select_dir_keywords, select_file_keyw
     """Helper function to copy directory with constraints and create HDF5."""
     src = src.replace(os.sep,'/')
     dst = dst.replace(os.sep,'/')
-
-    logging.info("Creating constrained copy of the experimental campaign folder %s at: %s", src, dst)
-    path_to_files_dict = utils.copy_directory_with_contraints(src, dst, select_dir_keywords, select_file_keywords, allowed_file_extensions)
-    logging.info("Finished creating a copy of the experimental campaign folder tree at: %s", dst)
+
+    # Dry run to see what needs copying
+    logging.info("Checking copy status for %s", src)
+    # Return paths to files that are expected in the dst directory
+    path_to_expected_files = utils.copy_directory_with_contraints(src, dst, select_dir_keywords,
+                                                                  select_file_keywords, allowed_file_extensions,
+                                                                  dry_run=True)
+
+    # Check existence and collect sizes
+    all_exist = True
+    total_size = 0
+
+    for dir_path, filenames in path_to_expected_files.items():
+        for filename in filenames:
+            dst_file_path = os.path.join(dir_path, filename)
+
+            if not os.path.exists(dst_file_path):
+                all_exist = False
+                # Get size from source file
+                src_file_path = os.path.join(src, os.path.relpath(dst_file_path, dst))
+
+                if os.path.exists(src_file_path):
+                    total_size += os.path.getsize(src_file_path)
+
+    if all_exist:
+        logging.info(f"All files already exist at {dst}, skipping copy.")
+        print(f"[Notice] All files already exist at {dst}, skipping copy.")
+        path_to_files_dict = path_to_expected_files
+    else:
+        # Check available space for missing files only
+        dst_free = shutil.disk_usage(".").free  # checks the free space in the current dir
+        if total_size > dst_free:
+            raise Exception(f"Insufficient space: need {total_size/1e9:.6f}GB, have {dst_free/1e9:.6f}GB")
+        else:
+            print(f"Campaign folder size: {total_size/1e9:.6f}GB")
+            print(f"Free space: {dst_free/1e9:.6f}GB")
+
+        logging.info(f"Creating constrained copy of the experimental campaign folder {src} at: {dst}")
+        path_to_files_dict = utils.copy_directory_with_contraints(src, dst, select_dir_keywords, select_file_keywords, allowed_file_extensions)
+        logging.info("Finished creating a copy of the experimental campaign folder tree at: %s", dst)
 
     logging.info("Creating HDF5 file at: %s", dst)
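
Note (not part of the patch): the space check above probes the current working
directory via shutil.disk_usage("."), which, as its comment states, only reflects
the destination volume when dst lives under the working directory. Below is a
minimal standalone sketch of the same pre-flight idea as described in the commit
message: sum the sizes of files still missing at dst, then compare against the
free space of the destination volume before copying. The function name
preflight_check and the `expected` mapping (directory -> list of filenames, as
returned by the dry run) are hypothetical illustrations, not part of the pipeline.

import os
import shutil

def preflight_check(src: str, dst: str, expected: dict) -> int:
    """Return total bytes still to copy; raise if the destination volume lacks space."""
    missing_bytes = 0
    for dir_path, filenames in expected.items():
        for filename in filenames:
            dst_file = os.path.join(dir_path, filename)
            if os.path.exists(dst_file):
                continue  # already copied, nothing to transfer
            src_file = os.path.join(src, os.path.relpath(dst_file, dst))
            if os.path.exists(src_file):
                missing_bytes += os.path.getsize(src_file)

    # Probe the destination volume itself (fall back to its parent, then cwd,
    # if dst does not exist yet) instead of assuming the current directory.
    probe = dst if os.path.exists(dst) else (os.path.dirname(dst.rstrip("/")) or ".")
    free = shutil.disk_usage(probe).free
    if missing_bytes > free:
        raise RuntimeError(
            f"Insufficient space: need {missing_bytes/1e9:.2f} GB, have {free/1e9:.2f} GB"
        )
    return missing_bytes

Such a helper could, for example, be called with the dry-run result
(preflight_check(src, dst, path_to_expected_files)) right before the real copy.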