Update pipelines/data_integration.py with new functionality. The copy_subtree_and_create_hdf5 function now checks whether a copy of the src directory already exists (to avoid overwriting files) and verifies that there is enough free storage space before initiating the data transfer and the subsequent ingestion into HDF5.

This commit is contained in:
Juan Felipe Flórez Ospina
2025-06-28 14:45:06 +02:00
parent 0115745433
commit d6bb20ae7d

View File

@@ -18,8 +18,8 @@ if dimaPath not in sys.path: # Avoid duplicate entries
import yaml import yaml
import logging import logging
from datetime import datetime from datetime import datetime
# Importing chain class from itertools import shutil
from itertools import chain
# Import DIMA modules # Import DIMA modules
try: try:
@@ -196,11 +196,48 @@ def copy_subtree_and_create_hdf5(src, dst, select_dir_keywords, select_file_keyw
"""Helper function to copy directory with constraints and create HDF5.""" """Helper function to copy directory with constraints and create HDF5."""
src = src.replace(os.sep,'/') src = src.replace(os.sep,'/')
dst = dst.replace(os.sep,'/') dst = dst.replace(os.sep,'/')
logging.info("Creating constrained copy of the experimental campaign folder %s at: %s", src, dst)
path_to_files_dict = utils.copy_directory_with_contraints(src, dst, select_dir_keywords, select_file_keywords, allowed_file_extensions)
logging.info("Finished creating a copy of the experimental campaign folder tree at: %s", dst) # Dry run to see what needs copying
logging.info("Checking copy status for %s", src)
# Return path to files that are expected in the dst directory
path_to_expected_files = utils.copy_directory_with_contraints(src, dst, select_dir_keywords,
select_file_keywords, allowed_file_extensions,
dry_run=True)
# Check existence and collect sizes
all_exist = True
total_size = 0
for dir_path, filenames in path_to_expected_files.items():
for filename in filenames:
dst_file_path = os.path.join(dir_path, filename)
if not os.path.exists(dst_file_path):
all_exist = False
# Get size from source file
src_file_path = os.path.join(src, os.path.relpath(dst_file_path, dst))
if os.path.exists(src_file_path):
#print(os.path.getsize(src_file_path))
total_size += os.path.getsize(src_file_path)
if all_exist:
logging.info(f"All files already exist at {dst}, skipping copy.")
print(f"[Notice] All files already exist at {dst}, skipping copy.")
path_to_files_dict = path_to_expected_files
else:
# Check available space for missing files only
dst_free = shutil.disk_usage(".").free # checks the free space in the current dir
if total_size > dst_free:
raise Exception(f"Insufficient space: need {total_size/1e9:.6f}GB, have {dst_free/1e9:.6f}GB")
else:
print(f"Campaign folder size: {total_size/1e9:.6f}GB")
print(f"Free space: {dst_free/1e9:.6f}GB")
logging.info(f"Creating constrained copy of the experimental campaign folder {src} at: {dst}")#, src, dst)
path_to_files_dict = utils.copy_directory_with_contraints(src, dst, select_dir_keywords, select_file_keywords, allowed_file_extensions)
logging.info("Finished creating a copy of the experimental campaign folder tree at: %s", dst)
logging.info("Creating HDF5 file at: %s", dst) logging.info("Creating HDF5 file at: %s", dst)