From 2dd033bcb380e5250e5e9e39530ad02a37c09507 Mon Sep 17 00:00:00 2001
From: Florez Ospina Juan Felipe
Date: Tue, 17 Sep 2024 15:28:11 +0200
Subject: [PATCH] Refactor code into functions that parse and validate the
 YAML config file and that perform the specified data integration task using
 a pipeline-like software structure.

---
 src/data_integration_lib.py | 223 +++++++++++++++++++++++++++---------
 1 file changed, 167 insertions(+), 56 deletions(-)

diff --git a/src/data_integration_lib.py b/src/data_integration_lib.py
index 7231783..c813c1a 100644
--- a/src/data_integration_lib.py
+++ b/src/data_integration_lib.py
@@ -9,8 +9,108 @@ import yaml
 import logging
 from datetime import datetime
+# chain flattens an iterable of lists into a single iterable
+from itertools import chain
+
+from instruments.readers import filereader_registry
+
+allowed_file_extensions = filereader_registry.file_extensions
+
+
+def load_config_and_setup_logging(yaml_config_file_path, log_dir):
+    """Load YAML configuration file, set up logging, and validate required keys and datetime_steps."""
+
+    # Define required keys
+    required_keys = [
+        'experiment', 'contact', 'input_file_directory', 'output_file_directory',
+        'instrument_datafolder', 'project', 'actris_level'
+    ]
+
+    # Supported integration modes
+    supported_integration_modes = ['collection', 'single_experiment']
+
+
+    # Set up logging
+    date = utils.created_at()
+    utils.setup_logging(log_dir, f"integrate_data_sources_{date}.log")
+
+    # Load YAML configuration file
+    with open(yaml_config_file_path, 'r') as stream:
+        try:
+            config_dict = yaml.load(stream, Loader=yaml.FullLoader)
+        except yaml.YAMLError as exc:
+            logging.error("Error loading YAML file: %s", exc)
+            raise ValueError(f"Failed to load YAML file: {exc}")
+
+    # Check that all required keys are present
+    missing_keys = [key for key in required_keys if key not in config_dict]
+    if missing_keys:
+        raise KeyError(f"Missing required keys in YAML configuration: {missing_keys}")
+
+    # Validate integration_mode
+    integration_mode = config_dict.get('integration_mode', 'collection')  # Default to 'collection'
+    if integration_mode not in supported_integration_modes:
+        raise ValueError(f"Unsupported integration_mode '{integration_mode}'. Supported modes are {supported_integration_modes}")
+
+
+    # Get dataset start and end dates (optional)
+    dataset_startdate = config_dict.get('dataset_startdate')
+    dataset_enddate = config_dict.get('dataset_enddate')
+
+    # Validate datetime_steps format if a non-empty value was provided
+    if config_dict.get('datetime_steps'):
+        datetime_steps = config_dict['datetime_steps']
+        expected_format = '%Y-%m-%d %H-%M-%S'
+
+        # datetime_steps must be a list of strings when it is truthy
+        if not isinstance(datetime_steps, list):
+            raise TypeError(f"datetime_steps should be a list of strings or a falsy value (None, empty), but got {type(datetime_steps)}")
+
+        for step_idx, step in enumerate(datetime_steps):
+            try:
+                # Attempt to parse the datetime to ensure correct format
+                config_dict['datetime_steps'][step_idx] = datetime.strptime(step, expected_format)
+            except ValueError:
+                raise ValueError(f"Invalid datetime format for '{step}'.
Expected format: {expected_format}")
+
+        # Set default dates from datetime_steps if not provided
+        if not dataset_startdate:
+            dataset_startdate = min(config_dict['datetime_steps']).strftime('%Y-%m-%d')  # Earliest datetime step
+
+        if not dataset_enddate:
+            dataset_enddate = max(config_dict['datetime_steps']).strftime('%Y-%m-%d')  # Latest datetime step
+    else:
+        # If datetime_steps is missing or empty, fall back to 'collection' mode
+        logging.info("datetime_steps missing or empty, setting integration_mode to 'collection'.")
+        config_dict['integration_mode'] = 'collection'
+
+        # Set sensible defaults using the current date
+        current_date = datetime.now().strftime('%Y-%m-%d')
+        if not dataset_startdate:
+            dataset_startdate = current_date
+        if not dataset_enddate:
+            dataset_enddate = current_date
+
+    config_dict['expected_datetime_format'] = '%Y-%m-%d %H-%M-%S'
+
+    # Add dataset dates to config for use later
+    config_dict['dataset_startdate'] = dataset_startdate
+    config_dict['dataset_enddate'] = dataset_enddate
+
+    return config_dict
+
+
+def generate_datetime_dict(datetime_steps):
+    """Generate the datetime augment dictionary from datetime steps."""
+    datetime_augment_dict = {}
+    for datetime_step in datetime_steps:
+        # Each step is a datetime object; map it to its common filename spellings
+        datetime_augment_dict[datetime_step] = [
+            datetime_step.strftime('%Y-%m-%d'), datetime_step.strftime('%Y_%m_%d'), datetime_step.strftime('%Y.%m.%d'), datetime_step.strftime('%Y%m%d')
+        ]
+    return datetime_augment_dict
 
 def integrate_data_sources(yaml_config_file_path, log_dir='logs/'):
@@ -21,26 +121,18 @@ def integrate_data_sources(yaml_config_file_path, log_dir='logs/'):
     log_dir (str): Directory to save the log file.
 
     Returns:
-        str: Path (or list of Paths) to the created HDF5 file(s).
+        list: List of Paths to the created HDF5 file(s).
     """
-    date = utils.created_at()
-    utils.setup_logging(log_dir, f"integrate_data_sources_{date}.log")
-
-    with open(yaml_config_file_path,'r') as stream:
-        try:
-            config_dict = yaml.load(stream, Loader=yaml.FullLoader)
-        except yaml.YAMLError as exc:
-            logging.error("Error loading YAML file: %s", exc)
-            raise
-
     def output_filename(name, date, initials):
         return f"{name}_{date}_{initials}.h5"
+
+    config_dict = load_config_and_setup_logging(yaml_config_file_path, log_dir)
 
     exp_campaign_name = config_dict['experiment']
     initials = config_dict['contact']
-    input_file_dir = config_dict['input_file_directory']
-    output_dir = config_dict['output_file_directory']
+    path_to_input_dir = config_dict['input_file_directory']
+    path_to_output_dir = config_dict['output_file_directory']
     select_dir_keywords = config_dict['instrument_datafolder']
 
     root_metadata_dict = {
     'project' : config_dict['project'],
@@ -49,54 +141,73 @@ def integrate_data_sources(yaml_config_file_path, log_dir='logs/'):
     'actris_level': config_dict['actris_level']
     }
 
-    def create_hdf5_file(date_str, select_file_keywords,root_metadata):
-        filename = output_filename(exp_campaign_name, date_str, initials)
-        output_path = os.path.join(output_dir, filename)
-        logging.info("Creating HDF5 file at: %s", output_path)
-
-        return hdf5_lib.create_hdf5_file_from_filesystem_path(
-            output_path, input_file_dir, select_dir_keywords, select_file_keywords, root_metadata_dict=root_metadata
-        )
+    # Get dataset start and end dates.
+    dataset_startdate = config_dict['dataset_startdate']
+    dataset_enddate = config_dict['dataset_enddate']
+
+    # Handle datetime_steps and integration mode
+    datetime_augment_dict = {}
+    select_file_keywords = []
 
-    if config_dict.get('datetime_steps'):
+    if 'datetime_steps' in config_dict and config_dict['datetime_steps']:
+        # Augment each datetime step with several date spellings to compensate for file naming inconsistencies
+        datetime_augment_dict = generate_datetime_dict(config_dict['datetime_steps'])
 
-        datetime_augment_dict = {}
-        for datetime_step in config_dict['datetime_steps']:
-            tmp = datetime.strptime(datetime_step,'%Y-%m-%d %H-%M-%S') #convert(datetime_step)
-            datetime_augment_dict[tmp] = [tmp.strftime('%Y-%m-%d'),tmp.strftime('%Y_%m_%d'),tmp.strftime('%Y.%m.%d'),tmp.strftime('%Y%m%d')]
-            print(tmp)
+        # Flatten the augmented date spellings into a single list of file keywords
+        select_file_keywords = list(chain.from_iterable(datetime_augment_dict.values()))
 
-        if 'single_experiment' in config_dict['integration_mode']:
-            output_filename_path = []
-            for datetime_step in datetime_augment_dict.keys():
-                date_str = datetime_step.strftime('%Y-%m-%d')
-                select_file_keywords = datetime_augment_dict[datetime_step]
+
+
+    # Determine mode and process accordingly
+    output_filename_path = []
 
-                root_metadata_dict.update({'dataset_startdate': date_str,
-                                           'dataset_enddate': date_str})
-                dt_step_output_filename_path= create_hdf5_file(date_str, select_file_keywords, root_metadata_dict)
-                output_filename_path.append(dt_step_output_filename_path)
-
-        elif 'collection' in config_dict['integration_mode']:
-            select_file_keywords = []
-            for datetime_step in datetime_augment_dict.keys():
-                select_file_keywords = select_file_keywords + datetime_augment_dict[datetime_step]
+    if config_dict.get('integration_mode', 'collection') == 'single_experiment' and datetime_augment_dict:
+
+        # Single experiment mode: one HDF5 file per datetime step
+        for datetime_step in datetime_augment_dict.keys():
 
-            config_dict['dataset_startdate'] = min(datetime_augment_dict.keys())
-            config_dict['dataset_enddate'] = max(datetime_augment_dict.keys())
-            startdate = config_dict['dataset_startdate'].strftime('%Y-%m-%d')
-            enddate = config_dict['dataset_enddate'].strftime('%Y-%m-%d')
-            root_metadata_dict.update({'dataset_startdate': startdate,
-                                       'dataset_enddate': enddate})
+            date_str = datetime_step.strftime('%Y-%m-%d')
+            select_file_keywords = datetime_augment_dict[datetime_step]
 
-            date_str = f'{startdate}_{enddate}'
-            output_filename_path = create_hdf5_file(date_str, select_file_keywords, root_metadata_dict)
-    else:
-        startdate = config_dict['dataset_startdate']
-        enddate = config_dict['dataset_enddate']
-        root_metadata_dict.update({'dataset_startdate': startdate,
-                                   'dataset_enddate': enddate})
-        date_str = f'{startdate}_{enddate}'
-        output_filename_path = create_hdf5_file(date_str, select_file_keywords = [], root_metadata = root_metadata_dict)
+            filename = output_filename(exp_campaign_name, date_str, initials)
+            path_to_rawdata_folder = os.path.splitext(os.path.join(path_to_output_dir, filename))[0]
+
+            # Step 1: Search through the input directory and make a copy in the output directory that complies with naming constraints
+            path_to_input_dir_filenames_dict = utils.copy_directory_with_contraints(path_to_input_dir,
+                                                                                    path_to_rawdata_folder,
+                                                                                    select_dir_keywords,
+                                                                                    select_file_keywords,
+                                                                                    allowed_file_extensions)
+
+            # Step 2: Create the HDF5 file
+
+            path_to_integrated_stepwise_hdf5_file = hdf5_lib.create_hdf5_file_from_filesystem_path(path_to_rawdata_folder,
+                                                                                                   path_to_input_dir_filenames_dict,
+                                                                                                   select_dir_keywords,
+                                                                                                   root_metadata_dict)
+            output_filename_path.append(path_to_integrated_stepwise_hdf5_file)
+
+    else: # collection
+
+        # Collection mode, or no datetime_steps provided
+
+        date_str = f'{dataset_startdate}_{dataset_enddate}'
+        filename = output_filename(exp_campaign_name, date_str, initials)
+        path_to_rawdata_folder = os.path.splitext(os.path.join(path_to_output_dir, filename))[0]
+
+        logging.info("Creating copy of specified experimental campaign folder at: %s", path_to_rawdata_folder)
+        # Step 1: Search through the input directory and make a copy in the output directory that complies with naming constraints
+        path_to_input_dir_filenames_dict = utils.copy_directory_with_contraints(path_to_input_dir,
+                                                                                path_to_rawdata_folder,
+                                                                                select_dir_keywords,
+                                                                                select_file_keywords,
+                                                                                allowed_file_extensions)
+        # Step 2: Create the HDF5 file
+        logging.info("Creating HDF5 file at: %s", path_to_output_dir)
+        path_to_integrated_hdf5_file = hdf5_lib.create_hdf5_file_from_filesystem_path(path_to_rawdata_folder,
+                                                                                      path_to_input_dir_filenames_dict,
+                                                                                      select_dir_keywords,
+                                                                                      root_metadata_dict)
+        output_filename_path.append(path_to_integrated_hdf5_file)
 
     return output_filename_path
\ No newline at end of file
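
Reviewer note: a minimal sketch of a YAML configuration that load_config_and_setup_logging would accept. The keys mirror required_keys plus the optional fields handled above; all values (experiment name, initials, paths, folder keywords) are hypothetical placeholders, and instrument_datafolder is assumed here to be a list of folder keywords:

    # campaign_descriptor.yaml (hypothetical example)
    experiment: kinetic_flowtube_study
    contact: JF
    input_file_directory: ../data/raw
    output_file_directory: ../data/processed
    instrument_datafolder:
      - htof
      - smps
    project: aerosol-campaign
    actris_level: 1
    integration_mode: single_experiment   # optional; defaults to 'collection'
    datetime_steps:                       # optional; entries must match '%Y-%m-%d %H-%M-%S'
      - '2022-01-31 12-00-00'
      - '2022-02-01 12-00-00'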
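
To make the keyword augmentation concrete: after validation each datetime step is a datetime object, and generate_datetime_dict maps it to the four filename spellings produced by the strftime calls in the patch. A quick illustration (the date is arbitrary):

    from datetime import datetime

    step = datetime(2022, 1, 31, 12, 0, 0)
    # generate_datetime_dict([step]) returns:
    # {step: ['2022-01-31', '2022_01_31', '2022.01.31', '20220131']}
    # chain.from_iterable(...) then flattens these per-step lists into the
    # select_file_keywords list used to filter input files by name.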
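
Finally, a usage sketch, assuming the module is importable as src.data_integration_lib and the example config above is saved alongside it; in 'single_experiment' mode the returned list holds one HDF5 path per datetime step, while 'collection' mode yields a single-element list:

    from src.data_integration_lib import integrate_data_sources

    # Hypothetical invocation; the config filename and log_dir are placeholders.
    hdf5_paths = integrate_data_sources('campaign_descriptor.yaml', log_dir='logs/')
    for path in hdf5_paths:
        print(path)  # e.g. <output_file_directory>/kinetic_flowtube_study_2022-01-31_JF.h5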