From 8d17bf267ce356f61feaa1d27e51b4e69b9b41ca Mon Sep 17 00:00:00 2001
From: Florez Ospina Juan Felipe
Date: Fri, 1 Nov 2024 09:52:41 +0100
Subject: [PATCH] Major code refactoring and simplifications to enhance
 modularity. Included a command line interface.

---
 pipelines/data_integration.py | 242 +++++++++++++++-------------------
 1 file changed, 103 insertions(+), 139 deletions(-)

diff --git a/pipelines/data_integration.py b/pipelines/data_integration.py
index 635598c..5b9b529 100644
--- a/pipelines/data_integration.py
+++ b/pipelines/data_integration.py
@@ -3,21 +3,29 @@ import os
 root_dir = os.path.abspath(os.curdir)
 sys.path.append(root_dir)
 
-import src.hdf5_writer as hdf5_lib
-import utils.g5505_utils as utils
-import yaml
-
+
+import yaml
 import logging
 from datetime import datetime
 # Importing chain class from itertools
 from itertools import chain
 
+# Import DIMA modules
+import src.hdf5_writer as hdf5_lib
+import utils.g5505_utils as utils
 from instruments.readers import filereader_registry
 
 allowed_file_extensions = filereader_registry.file_extensions
-
-from datetime import datetime
 
+def _generate_datetime_dict(datetime_steps):
+    """Generate the datetime augment dictionary from datetime steps."""
+    datetime_augment_dict = {}
+    for datetime_step in datetime_steps:
+        # Augment each datetime step with common filename date formats
+        datetime_augment_dict[datetime_step] = [
+            datetime_step.strftime('%Y-%m-%d'), datetime_step.strftime('%Y_%m_%d'),
+            datetime_step.strftime('%Y.%m.%d'), datetime_step.strftime('%Y%m%d')
+        ]
+    return datetime_augment_dict
 
 def load_config_and_setup_logging(yaml_config_file_path, log_dir):
     """Load YAML configuration file, set up logging, and validate required keys and datetime_steps."""
@@ -50,15 +58,13 @@ def load_config_and_setup_logging(yaml_config_file_path, log_dir):
         raise KeyError(f"Missing required keys in YAML configuration: {missing_keys}")
 
     # Validate integration_mode
-    integration_mode = config_dict.get('integration_mode', 'collection') # Default to 'collection'
+    integration_mode = config_dict.get('integration_mode', 'single_experiment') # Default to 'single_experiment'
     if integration_mode not in supported_integration_modes:
-        raise ValueError(f"Unsupported integration_mode '{integration_mode}'. Supported modes are {supported_integration_modes}")
+        logging.warning(
+            f"Unsupported integration_mode '{integration_mode}'. Supported modes are {supported_integration_modes}. Setting integration_mode to 'single_experiment'."
+        )
+        integration_mode = 'single_experiment'
+    config_dict['integration_mode'] = integration_mode
 
-    # Get dataset start and end dates (optional)
-    dataset_startdate = config_dict.get('dataset_startdate')
-    dataset_enddate = config_dict.get('dataset_enddate')
-
     # Validate datetime_steps format if it exists
     if 'datetime_steps' in config_dict:
         datetime_steps = config_dict['datetime_steps']
@@ -74,34 +80,15 @@ def load_config_and_setup_logging(yaml_config_file_path, log_dir):
                 config_dict['datetime_steps'][step_idx] = datetime.strptime(step, expected_format)
             except ValueError:
                 raise ValueError(f"Invalid datetime format for '{step}'. Expected format: {expected_format}")
-
-        # Set default dates from datetime_steps if not provided
-        if not dataset_startdate:
-            dataset_startdate = min(config_dict['datetime_steps']).strftime('%Y-%m-%d') # Earliest datetime step
-
-        if not dataset_enddate:
-            dataset_enddate = max(config_dict['datetime_steps']).strftime('%Y-%m-%d') # Latest datetime step
+        # Augment datetime_steps list as a dictionary to speed up single-experiment file generation
+        config_dict['datetime_steps_dict'] = _generate_datetime_dict(datetime_steps)
     else:
         # If datetime_steps is not present, set the integration mode to 'collection'
         logging.info("datetime_steps missing, setting integration_mode to 'collection'.")
         config_dict['integration_mode'] = 'collection'
 
-        # Set sensible defaults using the current date
-        current_date = datetime.now().strftime('%Y-%m-%d')
-        if not dataset_startdate:
-            dataset_startdate = current_date
-        if not dataset_enddate:
-            dataset_enddate = current_date
-
-    config_dict['expected_datetime_format'] = '%Y-%m-%d %H-%M-%S'
-
-    # Add dataset dates to config for use later
-    config_dict['dataset_startdate'] = dataset_startdate
-    config_dict['dataset_enddate'] = dataset_enddate
-
     # Validate filename_format if defined
     if 'filename_format' in config_dict:
-
         if not isinstance(config_dict['filename_format'], str):
             raise ValueError('Specified filename_format needs to be of String type')
 
@@ -120,22 +107,50 @@ def load_config_and_setup_logging(yaml_config_file_path, log_dir):
     else:
         config_dict['filename_format'] = None
 
+    # Compute complementary metadata elements
+
+    # Create output filename prefix
+    if not config_dict['filename_format']: # default behavior
+        config_dict['filename_prefix'] = '_'.join([config_dict[key] for key in ['experiment', 'contact']])
+    else:
+        config_dict['filename_prefix'] = '_'.join([config_dict[key] for key in config_dict['filename_format'].split(sep=',')])
+
+    # Set default dates from datetime_steps if not provided
+    current_date = datetime.now().strftime('%Y-%m-%d')
+    dates = config_dict.get('datetime_steps', [])
+    if not config_dict.get('dataset_startdate'):
+        config_dict['dataset_startdate'] = min(dates).strftime('%Y-%m-%d') if dates else current_date # Earliest datetime step
+
+    if not config_dict.get('dataset_enddate'):
+        config_dict['dataset_enddate'] = max(dates).strftime('%Y-%m-%d') if dates else current_date # Latest datetime step
+
+    config_dict['expected_datetime_format'] = '%Y-%m-%d %H-%M-%S'
+
     return config_dict
 
-def generate_datetime_dict(datetime_steps):
-    """ Generate the datetime augment dictionary from datetime steps. """
-    datetime_augment_dict = {}
-    for datetime_step in datetime_steps:
-        #tmp = datetime.strptime(datetime_step, '%Y-%m-%d %H-%M-%S')
-        datetime_augment_dict[datetime_step] = [
-            datetime_step.strftime('%Y-%m-%d'), datetime_step.strftime('%Y_%m_%d'), datetime_step.strftime('%Y.%m.%d'), datetime_step.strftime('%Y%m%d')
-        ]
-    return datetime_augment_dict
+def copy_subtree_and_create_hdf5(src, dst, select_dir_keywords, select_file_keywords, allowed_file_extensions, root_metadata_dict):
 
-def integrate_data_sources(yaml_config_file_path, log_dir='logs/'):
+    """Helper function to copy directory with constraints and create HDF5."""
+    src = src.replace(os.sep, '/')
+    dst = dst.replace(os.sep, '/')
+
+    logging.info("Creating constrained copy of the experimental campaign folder %s at: %s", src, dst)
 
-    """ Integrates data sources specified by the input configuration file into HDF5 files.
+    path_to_files_dict = utils.copy_directory_with_contraints(src, dst, select_dir_keywords, select_file_keywords, allowed_file_extensions)
+    logging.info("Finished creating a copy of the experimental campaign folder tree at: %s", dst)
+
+    logging.info("Creating HDF5 file at: %s", dst)
+    hdf5_path = hdf5_lib.create_hdf5_file_from_filesystem_path(dst, path_to_files_dict, select_dir_keywords, root_metadata_dict)
+    logging.info("Completed creation of HDF5 file %s at: %s", hdf5_path, dst)
+
+    return hdf5_path
+
+
+def run_pipeline(path_to_config_yamlFile, log_dir='logs/'):
+
+    """Integrates data sources specified by the input configuration file into HDF5 files.
 
     Parameters:
-        yaml_config_file_path (str): Path to the YAML configuration file.
+        path_to_config_yamlFile (str): Path to the YAML configuration file.
         log_dir (str): Directory where log files will be stored.
 
@@ -145,124 +160,73 @@ def integrate_data_sources(yaml_config_file_path, log_dir='logs/'):
     Returns:
         list: List of paths to the created HDF5 file(s).
     """
 
-    config_dict = load_config_and_setup_logging(yaml_config_file_path, log_dir)
+    config_dict = load_config_and_setup_logging(path_to_config_yamlFile, log_dir)
 
-    exp_campaign_name = config_dict['experiment']
-    initials = config_dict['contact']
     path_to_input_dir = config_dict['input_file_directory']
     path_to_output_dir = config_dict['output_file_directory']
     select_dir_keywords = config_dict['instrument_datafolder']
 
-    root_metadata_dict = {
-        'project' : config_dict['project'],
-        'experiment' : config_dict['experiment'],
-        'contact' : config_dict['contact'],
-        'actris_level': config_dict['actris_level']
-    }
-    # Get dataset start and end dates.
+    # Define root folder metadata dictionary
+    root_metadata_dict = {key : config_dict[key] for key in ['project', 'experiment', 'contact', 'actris_level']}
+
+    # Get dataset start and end dates
     dataset_startdate = config_dict['dataset_startdate']
     dataset_enddate = config_dict['dataset_enddate']
 
-    # Handle datetime_steps and integration mode
-    datetime_augment_dict = {}
-    select_file_keywords = []
-
-    if 'datetime_steps' in config_dict and config_dict['datetime_steps']:
-        # We augment each datetime step with various datetime formats to compensate for filenaming inconsistencies
-        datetime_augment_dict = generate_datetime_dict(config_dict['datetime_steps'])
-
-        # Create flattened datetime list of datetime object file keywords
-        select_file_keywords = list(chain.from_iterable(datetime_augment_dict.values()))
-
-
-    if not config_dict['filename_format']: # if config_dict['filename_format'] == None
-        filename_parts = [exp_campaign_name, initials]
-    else:
-        filename_parts = [config_dict[key] for key in config_dict['filename_format'].split(',')]
-
     # Determine mode and process accordingly
     output_filename_path = []
-
-    campaign_name_template = lambda name, date, initials : f"{name}_{date}_{initials}"
-    campaign_name_template = lambda filename_parts, suffix : '_'.join(filename_parts+[suffix])
-
+    campaign_name_template = lambda filename_prefix, suffix: '_'.join([filename_prefix, suffix])
     date_str = f'{dataset_startdate}_{dataset_enddate}'
 
-    # Create path to new rawdata subfolder and standardize it with forward-slashes as required by pipeline
-    path_to_rawdata_folder = os.path.join(path_to_output_dir, 'collection_' + campaign_name_template(filename_parts, date_str), "").replace(os.sep, '/')
-    if datetime_augment_dict:
-
+    # Create path to new raw datafolder and standardize with forward slashes
+    path_to_rawdata_folder = os.path.join(
+        path_to_output_dir, 'collection_' + campaign_name_template(config_dict['filename_prefix'], date_str), "").replace(os.sep, '/')
+
+    # Process individual datetime steps, if available, regardless of mode
+    if config_dict.get('datetime_steps_dict', {}):
         # Single experiment mode
-        for datetime_step in datetime_augment_dict.keys():
-
+        for datetime_step, file_keywords in config_dict['datetime_steps_dict'].items():
             date_str = datetime_step.strftime('%Y-%m-%d')
-            select_file_keywords = datetime_augment_dict[datetime_step]
+            single_campaign_name = campaign_name_template(config_dict['filename_prefix'], date_str)
+            path_to_rawdata_subfolder = os.path.join(path_to_rawdata_folder, single_campaign_name, "")
 
-            single_campaign_name = campaign_name_template(filename_parts, date_str) #campaign_name_template(exp_campaign_name, initials, date_str)
-
-            # Create path to new rawdata subfolder and standardize it with forward slashes as required by pipeline
-            path_to_rawdata_subfolder = os.path.join(path_to_rawdata_folder, single_campaign_name, "").replace(os.sep, '/')
-
-            # Run two step data integration pipeline on specified experimental campaign data collection
-            path_to_integrated_stepwise_hdf5_file = dima_pipeline(path_to_input_dir,
-                                                                  path_to_rawdata_subfolder,
-                                                                  select_dir_keywords,
-                                                                  select_file_keywords,
-                                                                  root_metadata_dict)
+            path_to_integrated_stepwise_hdf5_file = copy_subtree_and_create_hdf5(
+                path_to_input_dir, path_to_rawdata_subfolder, select_dir_keywords,
+                file_keywords, allowed_file_extensions, root_metadata_dict)
 
             output_filename_path.append(path_to_integrated_stepwise_hdf5_file)
 
+        # Collection mode processing if specified
         if 'collection' in config_dict.get('integration_mode', 'single_experiment'):
-            #date_str = f'{dataset_startdate}_{dataset_enddate}'
-            path_to_filenames_dict = {}
-            path_to_filenames_dict[path_to_rawdata_folder] = []
-            for path in output_filename_path:
-                path_to_file, filename = os.path.split(path) # path_to_file should be same as path_to_rawdata_folder
-                path_to_filenames_dict[path_to_rawdata_folder].append(filename)
+            path_to_filenames_dict = {path_to_rawdata_folder: [os.path.basename(path) for path in output_filename_path]} if output_filename_path else {}
+            hdf5_path = hdf5_lib.create_hdf5_file_from_filesystem_path(path_to_rawdata_folder, path_to_filenames_dict, [], root_metadata_dict)
+            output_filename_path.append(hdf5_path)
+    else:
+        path_to_integrated_stepwise_hdf5_file = copy_subtree_and_create_hdf5(
+            path_to_input_dir, path_to_rawdata_folder, select_dir_keywords, [],
+            allowed_file_extensions, root_metadata_dict)
+        output_filename_path.append(path_to_integrated_stepwise_hdf5_file)
 
-            path_to_integrated_hdf5_file = hdf5_lib.create_hdf5_file_from_filesystem_path(path_to_rawdata_folder,
-                                                                                          path_to_filenames_dict,
-                                                                                          [],
-                                                                                          root_metadata_dict)
-
-            output_filename_path.append(path_to_integrated_hdf5_file)
-
-        else: # collection
-            path_to_integrated_hdf5_file = dima_pipeline(path_to_input_dir,
-                                                         path_to_rawdata_folder,
-                                                         select_dir_keywords,
-                                                         select_file_keywords,
-                                                         root_metadata_dict)
-
-
-            output_filename_path.append(path_to_integrated_hdf5_file)
-
     return output_filename_path
 
-def dima_pipeline(path_to_input_dir, path_to_rawdata_folder, select_dir_keywords, select_file_keywords, root_metadata_dict = {}):
-    # Possibly replace below line with os.path.join(path_to_rawdata_folder,'..')
-    path_to_output_dir = os.path.join(path_to_rawdata_folder, '..')
-    path_to_output_dir = os.path.normpath(path_to_output_dir)
-
-    logging.info("Creating copy of the experimental campaign folder at: %s", path_to_output_dir)
-    # Step 1: Search through input directory and make a copy in the output directory that complies with naming constraints
-    path_to_input_dir_filenames_dict = utils.copy_directory_with_contraints(path_to_input_dir,
-                                                                            path_to_rawdata_folder,
-                                                                            select_dir_keywords,
-                                                                            select_file_keywords,
-                                                                            allowed_file_extensions)
-    logging.info("Finished creating a copy of the experimental campaign folder at: %s", path_to_output_dir)
+if __name__ == "__main__":
 
-    # Step 2: Create HDF5 file
-    logging.info("Creating HDF5 file at: %s", path_to_output_dir)
+    if len(sys.argv) < 2:
+        print("Usage: python data_integration.py <function_name>")
+        sys.exit(1)
+
+    # Extract the function name from the command line arguments
+    function_name = sys.argv[1]
+
+    # Handle function execution based on the provided function name
+    if function_name == 'run':
+
+        if len(sys.argv) != 3:
+            print("Usage: python data_integration.py run <path_to_config_yamlFile>")
+            sys.exit(1)
+        # Extract path to configuration file, specifying the data integration task
+        path_to_config_yamlFile = sys.argv[2]
+        run_pipeline(path_to_config_yamlFile)
 
-    path_to_integrated_hdf5_file = hdf5_lib.create_hdf5_file_from_filesystem_path(path_to_rawdata_folder,
-                                                                                  path_to_input_dir_filenames_dict,
-                                                                                  select_dir_keywords,
-                                                                                  root_metadata_dict)
-    logging.info("Completed creation of HDF5 file %s for the specified experimental campaign at: %s",
-                 path_to_integrated_hdf5_file, path_to_output_dir)
-
-    return path_to_integrated_hdf5_file
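
---

Usage notes:

For reference, a minimal YAML configuration sketch for run_pipeline follows. The key names are the ones read and validated by load_config_and_setup_logging in the patch above; all values, including the instrument folder names, are hypothetical placeholders and should be adapted to your DIMA setup.

    # config.yaml (hypothetical example)
    project: MyProject
    experiment: MyCampaign
    contact: JD
    actris_level: 1
    input_file_directory: data/input
    output_file_directory: data/output
    instrument_datafolder:                 # select_dir_keywords; folder names are placeholders
      - Lopap
      - ICAD
    integration_mode: single_experiment    # or 'collection'
    datetime_steps:                        # optional; expected format '%Y-%m-%d %H-%M-%S'
      - 2024-11-01 09-00-00
      - 2024-11-02 09-00-00
    # dataset_startdate / dataset_enddate are optional; they default to the min/max
    # of datetime_steps, or to the current date when no steps are given.
    # filename_format: experiment,contact  # optional comma-separated config keys used as filename prefix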
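With the new command line interface, the pipeline can then be invoked as:

    python pipelines/data_integration.py run path/to/config.yaml

run_pipeline loads and validates the configuration, creates a constrained copy of the input directory tree under output_file_directory, and returns the list of created HDF5 file paths: one file per datetime step in single-experiment mode, plus a collection-level file when integration_mode is 'collection'.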