From df2f7b3de610005dd4321490fbe55923ed0538e9 Mon Sep 17 00:00:00 2001
From: Florez Ospina Juan Felipe
Date: Wed, 25 Sep 2024 15:23:23 +0200
Subject: [PATCH] Abstracted reusable steps in integrate_data_sources as
 dima_pipeline() and added functionality to make a collection of HDF5 files,
 where each file represents a single experiment of the campaign.

---
 pipelines/data_integration.py | 109 +++++++++++++++++++++-------------
 1 file changed, 69 insertions(+), 40 deletions(-)

diff --git a/pipelines/data_integration.py b/pipelines/data_integration.py
index c813c1a..35ccc75 100644
--- a/pipelines/data_integration.py
+++ b/pipelines/data_integration.py
@@ -123,10 +123,7 @@ def integrate_data_sources(yaml_config_file_path, log_dir='logs/'):
     Returns:
         list: List of Paths to the created HDF5 file(s).
     """
-
-    def output_filename(name, date, initials):
-        return f"{name}_{date}_{initials}.h5"
-
+
     config_dict = load_config_and_setup_logging(yaml_config_file_path, log_dir)
 
     exp_campaign_name = config_dict['experiment']
@@ -161,53 +158,85 @@ def integrate_data_sources(yaml_config_file_path, log_dir='logs/'):
 
     # Determine mode and process accordingly
     output_filename_path = []
-    if 'single_experiment' in config_dict.get('integration_mode', 'collection') and datetime_augment_dict:
-
-        # Single experiment mode
-        for datetime_step in datetime_augment_dict.keys():
+    # Campaign names follow the convention <experiment>_<initials>_<date>
+    campaign_name_template = lambda name, initials, date: f"{name}_{initials}_{date}"
+
+    date_str = f'{dataset_startdate}_{dataset_enddate}'
+    # Create path to new rawdata folder and standardize it with forward slashes as required by pipeline
+    path_to_rawdata_folder = os.path.join(path_to_output_dir,
+                                          'collection_' + campaign_name_template(exp_campaign_name, initials, date_str),
+                                          "").replace(os.sep, '/')
+
+    if datetime_augment_dict:
+
+        # Single experiment mode
+        for datetime_step in datetime_augment_dict.keys():
             date_str = datetime_step.strftime('%Y-%m-%d')
             select_file_keywords = datetime_augment_dict[datetime_step]
-            filename = output_filename(exp_campaign_name, initials, date_str)
-            path_to_rawdata_folder = os.path.splitext(os.path.join(path_to_output_dir, filename))[0]
+            single_campaign_name = campaign_name_template(exp_campaign_name, initials, date_str)
 
-            # Step 1: Search through input directory and make a copy in the output directory that complies with naming constraints
-            path_to_input_dir_filenames_dict = utils.copy_directory_with_contraints(path_to_input_dir,
-                                                                                    path_to_rawdata_folder,
-                                                                                    select_dir_keywords,
-                                                                                    select_file_keywords,
-                                                                                    allowed_file_extensions)
-
-            # Step 2: Create HDF5 file
-
-            path_to_integrated__stepwise_hdf5_file = hdf5_lib.create_hdf5_file_from_filesystem_path(path_to_rawdata_folder,
-                                                                                                    path_to_input_dir_filenames_dict,
-                                                                                                    select_dir_keywords,
-                                                                                                    root_metadata_dict)
-            output_filename_path.append(path_to_integrated__stepwise_hdf5_file)
+            # Create path to new rawdata subfolder and standardize it with forward slashes as required by pipeline
+            path_to_rawdata_subfolder = os.path.join(path_to_rawdata_folder, single_campaign_name, "").replace(os.sep, '/')
+
+            # Run two-step data integration pipeline on the specified experimental campaign data collection
+            path_to_integrated_stepwise_hdf5_file = dima_pipeline(path_to_input_dir,
+                                                                  path_to_rawdata_subfolder,
+                                                                  select_dir_keywords,
+                                                                  select_file_keywords,
+                                                                  allowed_file_extensions,
+                                                                  root_metadata_dict)
+
+            output_filename_path.append(path_to_integrated_stepwise_hdf5_file)
+
+        if 'collection' in config_dict.get('integration_mode', 'single_experiment'):
+            # Bundle the per-experiment HDF5 files into one collection file
+            path_to_filenames_dict = {}
+            path_to_filenames_dict[path_to_rawdata_folder] = []
+            for path in output_filename_path:
+                path_to_file, filename = os.path.split(path)  # path_to_file should be the same as path_to_rawdata_folder
+                path_to_filenames_dict[path_to_rawdata_folder].append(filename)
+
+            path_to_integrated_hdf5_file = hdf5_lib.create_hdf5_file_from_filesystem_path(path_to_rawdata_folder,
+                                                                                          path_to_filenames_dict,
+                                                                                          [],
+                                                                                          root_metadata_dict)
 
-        else: # collection
+            output_filename_path.append(path_to_integrated_hdf5_file)
 
-        # Collection mode or no datetime_steps
+    else: # Collection mode (no datetime steps)
+        path_to_integrated_hdf5_file = dima_pipeline(path_to_input_dir,
+                                                     path_to_rawdata_folder,
+                                                     select_dir_keywords,
+                                                     select_file_keywords,
+                                                     allowed_file_extensions,
+                                                     root_metadata_dict)
 
-        date_str = f'{dataset_startdate}_{dataset_enddate}'
-        filename = output_filename(exp_campaign_name, initials, date_str)
-        path_to_rawdata_folder = os.path.splitext(os.path.join(path_to_output_dir, filename))[0]
-        logging.info("Creating copy of specified experimental campaign folder at: %s", path_to_output_dir)
-        # Step 1: Search through input directory and make a copy in the output directory that complies with naming constraints
-        path_to_input_dir_filenames_dict = utils.copy_directory_with_contraints(path_to_input_dir,
-                                                                                path_to_rawdata_folder,
-                                                                                select_dir_keywords,
-                                                                                select_file_keywords,
-                                                                                allowed_file_extensions)
-        # Step 2: Create HDF5 file
-        logging.info("Creating HDF5 file at: %s", path_to_output_dir)
-        path_to_integrated_hdf5_file = hdf5_lib.create_hdf5_file_from_filesystem_path(path_to_rawdata_folder,
-                                                                                      path_to_input_dir_filenames_dict,
-                                                                                      select_dir_keywords,
-                                                                                      root_metadata_dict)
-        output_filename_path.append(path_to_integrated_hdf5_file )
+        output_filename_path.append(path_to_integrated_hdf5_file)
 
-    return output_filename_path
\ No newline at end of file
+    return output_filename_path
+
+def dima_pipeline(path_to_input_dir, path_to_rawdata_folder, select_dir_keywords, select_file_keywords, allowed_file_extensions, root_metadata_dict=None):
+    # Avoid a mutable default argument
+    root_metadata_dict = root_metadata_dict if root_metadata_dict is not None else {}
+
+    # Derive the output directory as the parent of the rawdata folder
+    path_to_output_dir = os.path.join(path_to_rawdata_folder, '..')
+    path_to_output_dir = os.path.normpath(path_to_output_dir)
+
+    logging.info("Creating copy of the experimental campaign folder at: %s", path_to_output_dir)
+
+    # Step 1: Search through input directory and make a copy in the output directory that complies with naming constraints
+    path_to_input_dir_filenames_dict = utils.copy_directory_with_contraints(path_to_input_dir,
+                                                                            path_to_rawdata_folder,
+                                                                            select_dir_keywords,
+                                                                            select_file_keywords,
+                                                                            allowed_file_extensions)
+    logging.info("Finished creating a copy of the experimental campaign folder at: %s", path_to_output_dir)
+
+    # Step 2: Create HDF5 file
+    logging.info("Creating HDF5 file at: %s", path_to_output_dir)
+
+    path_to_integrated_hdf5_file = hdf5_lib.create_hdf5_file_from_filesystem_path(path_to_rawdata_folder,
+                                                                                  path_to_input_dir_filenames_dict,
+                                                                                  select_dir_keywords,
+                                                                                  root_metadata_dict)
+    logging.info("Completed creation of HDF5 file %s for the specified experimental campaign at: %s",
+                 path_to_integrated_hdf5_file, path_to_output_dir)
+
+    return path_to_integrated_hdf5_file
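
Usage sketch (not part of the diff): a minimal illustration of how the two
entry points might be driven after this patch, assuming `pipelines` is
importable as a package. The YAML path, keyword lists, extension whitelist,
and folder paths below are hypothetical placeholders, not values taken from
the patch.

    from pipelines.data_integration import integrate_data_sources, dima_pipeline

    # Config-driven run: returns one HDF5 file per experiment date and,
    # in 'collection' mode, a final HDF5 file bundling all of them.
    hdf5_paths = integrate_data_sources('config/campaign.yaml', log_dir='logs/')

    # Direct call of the extracted two-step helper on a single folder:
    # Step 1 copies a constrained snapshot of the input tree,
    # Step 2 serializes that snapshot into one HDF5 file.
    hdf5_path = dima_pipeline(path_to_input_dir='data/raw_campaign',
                              path_to_rawdata_folder='output/collection_demo/exp_2024-09-25/',
                              select_dir_keywords=[],
                              select_file_keywords=[],
                              allowed_file_extensions=['.txt', '.dat', '.h5'],
                              root_metadata_dict={})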