import sys
import os
import re

try:
    thisFilePath = os.path.abspath(__file__)
except NameError:
    print("Error: __file__ is not available. Ensure the script is being run from a file.")
    print("[Notice] Path to DIMA package may not be resolved properly.")
    thisFilePath = os.getcwd()  # Use current directory or specify a default

dimaPath = os.path.normpath(os.path.join(thisFilePath, "..", '..'))  # Move up to project root

if dimaPath not in sys.path:  # Avoid duplicate entries
    sys.path.append(dimaPath)

import yaml
import logging
from datetime import datetime
# Importing chain class from itertools
from itertools import chain
import shutil

# Import DIMA modules
try:
    from dima.src import hdf5_writer as hdf5_lib
    from dima.utils import g5505_utils as utils
    from dima.instruments import filereader_registry
except ModuleNotFoundError:
    print("[Notice] 'dima' package not found on sys.path; falling back to local imports.")
    import src.hdf5_writer as hdf5_lib
    import utils.g5505_utils as utils
    from instruments import filereader_registry

allowed_file_extensions = filereader_registry.file_extensions


def _generate_datetime_dict(datetime_steps):
    """Generate the datetime augment dictionary from datetime steps."""
    datetime_augment_dict = {}
    for datetime_step in datetime_steps:
        datetime_augment_dict[datetime_step] = [
            datetime_step.strftime('%Y-%m-%d'),
            datetime_step.strftime('%Y_%m_%d'),
            datetime_step.strftime('%Y.%m.%d'),
            datetime_step.strftime('%Y%m%d')
        ]
    return datetime_augment_dict
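
# Illustrative example (hypothetical input, not part of the pipeline): for a parsed step
# datetime(2023, 5, 17, 13, 0, 0), the entry above maps to
# ['2023-05-17', '2023_05_17', '2023.05.17', '20230517']; these variants are later passed as
# filename keywords when copying the files belonging to a single experimental step.
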

def _generate_output_path_fragment(filename_prefix, integration_mode, dataset_startdate, dataset_enddate, index=None):
    """Generate consistent directory or file name fragment based on mode."""
    if integration_mode == 'collection':
        return f'collection_{index}_{filename_prefix}_{dataset_enddate}'
    else:
        return f'{filename_prefix}_{dataset_enddate}'


def load_config_and_setup_logging(yaml_config_file_path, log_dir):
    """Load YAML configuration file, set up logging, and validate required keys and datetime_steps."""
    # Define required keys
    required_keys = [
        'experiment', 'contact', 'input_file_directory', 'output_file_directory',
        'instrument_datafolder', 'project', 'actris_level'
    ]

    # Supported integration modes
    supported_integration_modes = ['collection', 'single_experiment']

    # Set up logging
    date = utils.created_at("%Y_%m").replace(":", "-")
    utils.setup_logging(log_dir, f"integrate_data_sources_{date}.log")

    # Load YAML configuration file
    with open(yaml_config_file_path, 'r') as stream:
        try:
            config_dict = yaml.load(stream, Loader=yaml.FullLoader)
        except yaml.YAMLError as exc:
            logging.error("Error loading YAML file: %s", exc)
            raise ValueError(f"Failed to load YAML file: {exc}")

    # Check if required keys are present
    missing_keys = [key for key in required_keys if key not in config_dict]
    if missing_keys:
        raise KeyError(f"Missing required keys in YAML configuration: {missing_keys}")

    # Look for all placeholders like ${VAR_NAME} in the input directory
    input_dir = config_dict['input_file_directory']
    placeholders = re.findall(r'\$\{([^}^{]+)\}', input_dir)

    success = utils.load_env_from_root()
    print(f'Success : {success}')

    for var in placeholders:
        env_value = os.environ.get(var)
        if env_value is None:
            raise ValueError(f"Environment variable '{var}' is not set but used in the config.")
        input_dir = input_dir.replace(f"${{{var}}}", env_value)

    config_dict['input_file_directory'] = input_dir
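
    # Illustrative example (hypothetical values): if the YAML file sets
    # input_file_directory: '${DATA_ROOT}/campaign' and the environment defines
    # DATA_ROOT=/data/projects, the loop above rewrites the entry to '/data/projects/campaign'.
    # An unset environment variable referenced in the path raises a ValueError.
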
    # Check that instrument_datafolder is a list with at least one entry
    if not (isinstance(config_dict['instrument_datafolder'], list) and len(config_dict['instrument_datafolder']) >= 1):
        raise ValueError(
            'Invalid value for key "instrument_datafolder". '
            'Expected a list of strings with at least one item. '
            'Each item represents a subfolder name in the input file directory, where the name '
            'must match the format "<subfolder>[/<subfolder>]". '
            'The first subfolder name is required, and the second is optional. '
            'Examples of valid values: "level1", "level1/level2".'
        )

    # Define the pattern for valid subfolder names: `subfolder` or `subfolder/subfolder`
    #valid_pattern = re.compile(r'^[^/]+(/[^/]+)?$')

    # Validate each subfolder name
    #for folder in config_dict['instrument_folder']:
    #    if not isinstance(folder, str) or not valid_pattern.match(folder):
    #        raise ValueError(
    #            'Invalid value for key "instrument_folder" in YAML file. '
    #            'Each item must be a string matching the format '
    #            '"<subfolder>[/<subfolder>]". The first subfolder name is required, and the second is optional. '
    #            'Examples of valid values: "level1", "level1/level2". '
    #            f'Invalid item: {folder}'
    #        )

    # Validate integration_mode
    integration_mode = config_dict.get('integration_mode', 'N/A')
    if integration_mode not in supported_integration_modes:
        raise RuntimeWarning(
            f"Unsupported integration_mode '{integration_mode}'. Supported modes are {supported_integration_modes}."
        )

    # Validate datetime_steps format if it exists
    if 'datetime_steps' in config_dict:
        datetime_steps = config_dict['datetime_steps']
        expected_format = '%Y-%m-%d %H-%M-%S'

        # Check if datetime_steps is a list or a falsy value
        if datetime_steps and not isinstance(datetime_steps, list):
            raise TypeError(f"datetime_steps should be a list of strings or a falsy value (None, empty), but got {type(datetime_steps)}")

        for step_idx, step in enumerate(datetime_steps):
            try:
                # Attempt to parse the datetime to ensure correct format
                config_dict['datetime_steps'][step_idx] = datetime.strptime(step, expected_format)
            except ValueError:
                raise ValueError(f"Invalid datetime format for '{step}'. Expected format: {expected_format}")

        # Augment the datetime_steps list as a dictionary. This speeds up single-experiment file generation.
        config_dict['datetime_steps_dict'] = _generate_datetime_dict(datetime_steps)
    else:
        # If datetime_steps is not present, set the integration mode to 'collection'
        logging.info("datetime_steps missing, setting integration_mode to 'collection'.")
        config_dict['integration_mode'] = 'collection'

    # Validate filename_format if defined
    if 'filename_format' in config_dict:
        if not isinstance(config_dict['filename_format'], str):
            raise ValueError('Specified filename_format must be a string.')

        # Split the string and check if each key exists in config_dict
        keys = [key.strip() for key in config_dict['filename_format'].split(',')]
        missing_keys = [key for key in keys if key not in config_dict]

        # If any key is missing, fall back to the default filename format
        # assert not missing_keys, f'Missing key(s) in config_dict: {", ".join(missing_keys)}'
        if not missing_keys:
            config_dict['filename_format'] = ','.join(keys)
        else:
            config_dict['filename_format'] = None
            print('"filename_format" should contain comma-separated keys that match existing keys in the YAML config file.')
            print('Setting "filename_format" as None')
    else:
        config_dict['filename_format'] = None

    # Compute complementary metadata elements

    # Create output filename prefix
    if not config_dict['filename_format']:  # default behavior
        config_dict['filename_prefix'] = '_'.join([config_dict[key] for key in ['experiment', 'contact']])
    else:
        config_dict['filename_prefix'] = '_'.join([config_dict[key] for key in config_dict['filename_format'].split(sep=',')])

    # Set default dates from datetime_steps if not provided
    current_date = datetime.now().strftime('%Y-%m-%d')
    dates = config_dict.get('datetime_steps', [])
    if not config_dict.get('dataset_startdate'):
        config_dict['dataset_startdate'] = min(dates).strftime('%Y-%m-%d') if dates else current_date  # Earliest datetime step
    if not config_dict.get('dataset_enddate'):
        config_dict['dataset_enddate'] = max(dates).strftime('%Y-%m-%d') if dates else current_date  # Latest datetime step

    config_dict['expected_datetime_format'] = '%Y-%m-%d %H-%M-%S'

    return config_dict
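
# Illustrative (hypothetical values, not from the repository): a minimal YAML configuration
# with the keys required by load_config_and_setup_logging. Optional keys such as
# 'integration_mode', 'datetime_steps' and 'filename_format' are validated above when present.
#
#   project: 'my_project'
#   experiment: 'lab_campaign'
#   contact: 'jdoe'
#   actris_level: '1'
#   input_file_directory: '${DATA_ROOT}/raw'
#   output_file_directory: 'output/'
#   instrument_datafolder:
#     - 'level1'
#   integration_mode: 'single_experiment'
#   datetime_steps: ['2023-05-17 13-00-00']
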

def copy_subtree_and_create_hdf5(src, dst, select_dir_keywords, select_file_keywords, allowed_file_extensions, root_metadata_dict):
    """Helper function to copy directory with constraints and create HDF5."""
    src = src.replace(os.sep, '/')
    dst = dst.replace(os.sep, '/')

    # Dry run to see what needs copying
    logging.info("Checking copy status for %s", src)
    # Return paths to files that are expected in the dst directory
    path_to_expected_files = utils.copy_directory_with_contraints(
        src, dst, select_dir_keywords, select_file_keywords, allowed_file_extensions, dry_run=True)

    # Check existence and collect sizes
    all_exist = True
    total_size = 0
    for dir_path, filenames in path_to_expected_files.items():
        for filename in filenames:
            dst_file_path = os.path.join(dir_path, filename)
            if not os.path.exists(dst_file_path):
                all_exist = False
                # Get size from source file
                src_file_path = os.path.join(src, os.path.relpath(dst_file_path, dst))
                if os.path.exists(src_file_path):
                    #print(os.path.getsize(src_file_path))
                    total_size += os.path.getsize(src_file_path)

    if all_exist:
        logging.info(f"All files already exist at {dst}, skipping copy.")
        print(f"[Notice] All files already exist at {dst}, skipping copy.")
        path_to_files_dict = path_to_expected_files
    else:
        # Check available space for missing files only
        dst_free = shutil.disk_usage(".").free  # checks the free space in the current dir
        if total_size > dst_free:
            raise Exception(f"Insufficient space: need {total_size/1e9:.6f}GB, have {dst_free/1e9:.6f}GB")
        else:
            print(f"Campaign folder size: {total_size/1e9:.6f}GB")
            print(f"Free space: {dst_free/1e9:.6f}GB")

        logging.info(f"Creating constrained copy of the experimental campaign folder {src} at: {dst}")
        path_to_files_dict = utils.copy_directory_with_contraints(
            src, dst, select_dir_keywords, select_file_keywords, allowed_file_extensions)
        logging.info("Finished creating a copy of the experimental campaign folder tree at: %s", dst)

    logging.info("Creating HDF5 file at: %s", dst)
    #hdf5_path = hdf5_lib.create_hdf5_file_from_filesystem_path_new(dst, path_to_files_dict, select_dir_keywords, root_metadata_dict)
    hdf5_path = hdf5_lib.create_hdf5_file_from_filesystem_path(dst, path_to_files_dict, select_dir_keywords, root_metadata_dict)
    logging.info("Completed creation of HDF5 file %s at: %s", hdf5_path, dst)

    return hdf5_path


def run_pipeline(path_to_config_yamlFile, log_dir='logs/'):
    config_dict = load_config_and_setup_logging(path_to_config_yamlFile, log_dir)

    path_to_input_dir = config_dict['input_file_directory']
    path_to_output_dir = config_dict['output_file_directory']
    select_dir_keywords = config_dict['instrument_datafolder']

    # Define root folder metadata dictionary
    root_metadata_dict = {key: config_dict[key] for key in ['project', 'experiment', 'contact', 'actris_level']}

    # Get dataset start and end dates
    dataset_startdate = config_dict['dataset_startdate']
    dataset_enddate = config_dict['dataset_enddate']

    integration_mode = config_dict.get('integration_mode', 'single_experiment')
    filename_prefix = config_dict['filename_prefix']

    output_filename_path = []

    # Determine top-level campaign folder path
    top_level_foldername = _generate_output_path_fragment(
        filename_prefix, integration_mode, dataset_startdate, dataset_enddate, index=1
    )
    path_to_rawdata_folder = os.path.join(
        path_to_output_dir, top_level_foldername, ""
    ).replace(os.sep, '/')

    # Process individual datetime steps if available, regardless of mode
    if config_dict.get('datetime_steps_dict', {}):
        for datetime_step, file_keywords in config_dict['datetime_steps_dict'].items():
            single_date_str = datetime_step.strftime('%Y%m%d')
            subfolder_name = f"experimental_step_{single_date_str}"
            path_to_rawdata_subfolder = os.path.join(path_to_rawdata_folder, subfolder_name, "")

            path_to_integrated_stepwise_hdf5_file = copy_subtree_and_create_hdf5(
                path_to_input_dir, path_to_rawdata_subfolder, select_dir_keywords, file_keywords,
                allowed_file_extensions, root_metadata_dict)

            output_filename_path.append(path_to_integrated_stepwise_hdf5_file)

        # Collection mode post-processing
        if integration_mode == 'collection':
            path_to_filenames_dict = {path_to_rawdata_folder: [os.path.basename(path) for path in output_filename_path]} if output_filename_path else {}
            hdf5_path = hdf5_lib.create_hdf5_file_from_filesystem_path(
                path_to_rawdata_folder, path_to_filenames_dict, [], root_metadata_dict
            )
            output_filename_path.append(hdf5_path)
    else:
        path_to_integrated_stepwise_hdf5_file = copy_subtree_and_create_hdf5(
            path_to_input_dir, path_to_rawdata_folder, select_dir_keywords, [],
            allowed_file_extensions, root_metadata_dict)
        output_filename_path.append(path_to_integrated_stepwise_hdf5_file)

    return output_filename_path
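
# Typical usage (illustrative; the exact script path depends on where this module lives in the project):
#
#   python data_integration.py run path/to/config.yaml
#
# or, from Python:
#
#   from data_integration import run_pipeline
#   hdf5_files = run_pipeline('path/to/config.yaml')  # returns a list of generated HDF5 file paths
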

if __name__ == "__main__":
    if len(sys.argv) < 2:
        print("Usage: python data_integration.py <function_name>")
        sys.exit(1)

    function_name = sys.argv[1]

    if function_name == 'run':
        if len(sys.argv) != 3:
            print("Usage: python data_integration.py run <path_to_config_yaml_file>")
            sys.exit(1)
        path_to_config_yamlFile = sys.argv[2]
        run_pipeline(path_to_config_yamlFile)