Create new version v1.1.0. The data integration pipeline now checks free disk space before copying and skips the data transfer when the destination files already exist.
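
In short, before copying anything the pipeline now dry-runs the copy to list the files expected at the destination, skips the transfer when they are all already there, and otherwise verifies that the free space reported by `shutil.disk_usage` can hold the missing files. Below is a minimal sketch of that decision logic, separate from the pipeline code in the diff; the `should_copy` helper and the flat list of destination paths are illustrative assumptions, not the project's API.

```python
import os
import shutil

def should_copy(expected_dst_files, src_root, dst_root):
    """Return True if a transfer is needed and the destination has room for it.

    Illustrative sketch only: the real pipeline works on a dict returned by its
    dry-run helper rather than a flat list of destination paths.
    """
    missing = [p for p in expected_dst_files if not os.path.exists(p)]
    if not missing:
        return False  # every expected file already exists: skip the transfer

    # Size the missing files on the source side, mirroring the diff below
    needed = 0
    for dst_path in missing:
        src_path = os.path.join(src_root, os.path.relpath(dst_path, dst_root))
        if os.path.exists(src_path):
            needed += os.path.getsize(src_path)

    free = shutil.disk_usage(dst_root).free
    if needed > free:
        raise RuntimeError(f"Insufficient space: need {needed/1e9:.2f} GB, have {free/1e9:.2f} GB")
    return True
```

Note that the committed code measures free space with `shutil.disk_usage(".")`, i.e. the current working directory, which only reflects the destination when both live on the same filesystem.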

2025-06-28 16:21:26 +02:00
parent 0115745433
commit c40d138563
4 changed files with 735 additions and 27 deletions


@@ -20,7 +20,7 @@ import logging
from datetime import datetime
# Importing chain class from itertools
from itertools import chain
import shutil
# Import DIMA modules
try:
    from dima.src import hdf5_writer as hdf5_lib
@@ -196,11 +196,48 @@ def copy_subtree_and_create_hdf5(src, dst, select_dir_keywords, select_file_keyw
"""Helper function to copy directory with constraints and create HDF5."""
src = src.replace(os.sep,'/')
dst = dst.replace(os.sep,'/')
logging.info("Creating constrained copy of the experimental campaign folder %s at: %s", src, dst)
path_to_files_dict = utils.copy_directory_with_contraints(src, dst, select_dir_keywords, select_file_keywords, allowed_file_extensions)
logging.info("Finished creating a copy of the experimental campaign folder tree at: %s", dst)
# Dry run to see what needs copying
logging.info("Checking copy status for %s", src)
# Return path to files that are expected in the dst directory
path_to_expected_files = utils.copy_directory_with_contraints(src, dst, select_dir_keywords,
select_file_keywords, allowed_file_extensions,
dry_run=True)
# Check existence and collect sizes
all_exist = True
total_size = 0
for dir_path, filenames in path_to_expected_files.items():
for filename in filenames:
dst_file_path = os.path.join(dir_path, filename)
if not os.path.exists(dst_file_path):
all_exist = False
# Get size from source file
src_file_path = os.path.join(src, os.path.relpath(dst_file_path, dst))
if os.path.exists(src_file_path):
#print(os.path.getsize(src_file_path))
total_size += os.path.getsize(src_file_path)
if all_exist:
logging.info(f"All files already exist at {dst}, skipping copy.")
print(f"[Notice] All files already exist at {dst}, skipping copy.")
path_to_files_dict = path_to_expected_files
else:
# Check available space for missing files only
dst_free = shutil.disk_usage(".").free # checks the free space in the current dir
if total_size > dst_free:
raise Exception(f"Insufficient space: need {total_size/1e9:.6f}GB, have {dst_free/1e9:.6f}GB")
else:
print(f"Campaign folder size: {total_size/1e9:.6f}GB")
print(f"Free space: {dst_free/1e9:.6f}GB")
logging.info(f"Creating constrained copy of the experimental campaign folder {src} at: {dst}")#, src, dst)
path_to_files_dict = utils.copy_directory_with_contraints(src, dst, select_dir_keywords, select_file_keywords, allowed_file_extensions)
logging.info("Finished creating a copy of the experimental campaign folder tree at: %s", dst)
logging.info("Creating HDF5 file at: %s", dst)
@@ -211,6 +248,8 @@ def copy_subtree_and_create_hdf5(src, dst, select_dir_keywords, select_file_keyw
    return hdf5_path

def run_pipeline(path_to_config_yamlFile, log_dir='logs/'):

    config_dict = load_config_and_setup_logging(path_to_config_yamlFile, log_dir)
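
The new branch relies on `utils.copy_directory_with_contraints(..., dry_run=True)` returning, without writing anything, a mapping of destination directories to the filenames a real run would place there; that is the shape iterated as `path_to_expected_files.items()` above. The helper itself is changed in one of the other files of this commit and is not shown here, so the following is only a sketch of that assumed contract (filtering simplified, directory-keyword selection omitted).

```python
import os
import shutil

def copy_directory_with_contraints_sketch(src, dst, select_file_keywords,
                                          allowed_file_extensions, dry_run=False):
    """Illustrative stand-in for the utils helper; not the real implementation.

    Returns {destination_directory: [filename, ...]} for every file that passes
    the extension/keyword filters. With dry_run=True nothing is created or copied.
    """
    expected = {}
    for root, _dirs, files in os.walk(src):
        dst_dir = os.path.normpath(os.path.join(dst, os.path.relpath(root, src)))
        selected = [
            f for f in files
            if os.path.splitext(f)[1] in allowed_file_extensions
            and (not select_file_keywords or any(k in f for k in select_file_keywords))
        ]
        if not selected:
            continue
        expected[dst_dir] = selected
        if not dry_run:
            os.makedirs(dst_dir, exist_ok=True)
            for f in selected:
                shutil.copy2(os.path.join(root, f), os.path.join(dst_dir, f))
    return expected
```

Under a contract like this, the existence check in the diff operates purely on destination paths, and the second (real) call is only made when something is actually missing.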