diff --git a/pipelines/data_integration.py b/pipelines/data_integration.py
index 318b84d..4521d42 100644
--- a/pipelines/data_integration.py
+++ b/pipelines/data_integration.py
@@ -38,12 +38,19 @@ def _generate_datetime_dict(datetime_steps):
     """ Generate the datetime augment dictionary from datetime steps. """
     datetime_augment_dict = {}
     for datetime_step in datetime_steps:
-        #tmp = datetime.strptime(datetime_step, '%Y-%m-%d %H-%M-%S')
         datetime_augment_dict[datetime_step] = [
-            datetime_step.strftime('%Y-%m-%d'), datetime_step.strftime('%Y_%m_%d'), datetime_step.strftime('%Y.%m.%d'), datetime_step.strftime('%Y%m%d')
+            datetime_step.strftime('%Y-%m-%d'), datetime_step.strftime('%Y_%m_%d'),
+            datetime_step.strftime('%Y.%m.%d'), datetime_step.strftime('%Y%m%d')
         ]
     return datetime_augment_dict
 
+def _generate_output_path_fragment(filename_prefix, integration_mode, dataset_startdate, dataset_enddate, index=None):
+    """Generate a consistent directory or file name fragment based on the integration mode."""
+    if integration_mode == 'collection':
+        return f'collection_{index}_{filename_prefix}_{dataset_enddate}'
+    else:
+        return f'{filename_prefix}_{dataset_enddate}'
+
 def load_config_and_setup_logging(yaml_config_file_path, log_dir):
     """Load YAML configuration file, set up logging, and validate required keys and datetime_steps."""
 
@@ -189,17 +196,6 @@ def copy_subtree_and_create_hdf5(src, dst, select_dir_keywords, select_file_keywords
 
 def run_pipeline(path_to_config_yamlFile, log_dir='logs/'):
-
-    """Integrates data sources specified by the input configuration file into HDF5 files.
-
-    Parameters:
-        yaml_config_file_path (str): Path to the YAML configuration file.
-        log_dir (str): Directory to save the log file.
-
-    Returns:
-        list: List of Paths to the created HDF5 file(s).
-    """
-
     config_dict = load_config_and_setup_logging(path_to_config_yamlFile, log_dir)
 
     path_to_input_dir = config_dict['input_file_directory']
@@ -213,61 +209,59 @@ def run_pipeline(path_to_config_yamlFile, log_dir='logs/'):
     dataset_startdate = config_dict['dataset_startdate']
     dataset_enddate = config_dict['dataset_enddate']
 
-    # Determine mode and process accordingly
-    output_filename_path = []
-    campaign_name_template = lambda filename_prefix, suffix: '_'.join([filename_prefix, suffix])
-    date_str = f'{dataset_startdate}_{dataset_enddate}'
+    integration_mode = config_dict.get('integration_mode', 'single_experiment')
+    filename_prefix = config_dict['filename_prefix']
+
+    output_filename_path = []
+
+    # Determine top-level campaign folder path
+    top_level_foldername = _generate_output_path_fragment(
+        filename_prefix, integration_mode, dataset_startdate, dataset_enddate, index=1
+    )
 
-    # Create path to new raw datafolder and standardize with forward slashes
     path_to_rawdata_folder = os.path.join(
-        path_to_output_dir, 'collection_' + campaign_name_template(config_dict['filename_prefix'], date_str), "").replace(os.sep, '/')
+        path_to_output_dir, top_level_foldername, ""
+    ).replace(os.sep, '/')
 
     # Process individual datetime steps if available, regardless of mode
-    if config_dict.get('datetime_steps_dict', {}):
-        # Single experiment mode
+    if config_dict.get('datetime_steps_dict', {}):
         for datetime_step, file_keywords in config_dict['datetime_steps_dict'].items():
-            date_str = datetime_step.strftime('%Y-%m-%d')
-            single_campaign_name = campaign_name_template(config_dict['filename_prefix'], date_str)
-            path_to_rawdata_subfolder = os.path.join(path_to_rawdata_folder, single_campaign_name, "")
+            single_date_str = datetime_step.strftime('%Y%m%d')
+            subfolder_name = f"experimental_step_{single_date_str}"
+            path_to_rawdata_subfolder = os.path.join(path_to_rawdata_folder, subfolder_name, "")
             path_to_integrated_stepwise_hdf5_file = copy_subtree_and_create_hdf5(
-                path_to_input_dir, path_to_rawdata_subfolder, select_dir_keywords,
-                file_keywords, allowed_file_extensions, root_metadata_dict)
+                path_to_input_dir, path_to_rawdata_subfolder, select_dir_keywords,
+                file_keywords, allowed_file_extensions, root_metadata_dict)
             output_filename_path.append(path_to_integrated_stepwise_hdf5_file)
 
-    # Collection mode processing if specified
-    if 'collection' in config_dict.get('integration_mode', 'single_experiment'):
+    # Collection mode post-processing
+    if integration_mode == 'collection':
         path_to_filenames_dict = {path_to_rawdata_folder: [os.path.basename(path) for path in output_filename_path]} if output_filename_path else {}
-        #hdf5_path = hdf5_lib.create_hdf5_file_from_filesystem_path_new(path_to_rawdata_folder, path_to_filenames_dict, [], root_metadata_dict)
-        hdf5_path = hdf5_lib.create_hdf5_file_from_filesystem_path(path_to_rawdata_folder, path_to_filenames_dict, [], root_metadata_dict)
+        hdf5_path = hdf5_lib.create_hdf5_file_from_filesystem_path(
+            path_to_rawdata_folder, path_to_filenames_dict, [], root_metadata_dict
+        )
        output_filename_path.append(hdf5_path)
     else:
         path_to_integrated_stepwise_hdf5_file = copy_subtree_and_create_hdf5(
-            path_to_input_dir, path_to_rawdata_folder, select_dir_keywords, [],
-            allowed_file_extensions, root_metadata_dict)
+            path_to_input_dir, path_to_rawdata_folder, select_dir_keywords, [],
+            allowed_file_extensions, root_metadata_dict)
         output_filename_path.append(path_to_integrated_stepwise_hdf5_file)
 
     return output_filename_path
 
-
 if __name__ == "__main__":
-
     if len(sys.argv) < 2:
         print("Usage: python data_integration.py <function_name>")
         sys.exit(1)
-    # Extract the function name from the command line arguments
     function_name = sys.argv[1]
 
-    # Handle function execution based on the provided function name
     if function_name == 'run':
-
         if len(sys.argv) != 3:
             print("Usage: python data_integration.py run <path_to_config_yamlFile>")
             sys.exit(1)
-        # Extract path to configuration file, specifying the data integration task
         path_to_config_yamlFile = sys.argv[2]
         run_pipeline(path_to_config_yamlFile)
-
-
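Reviewer note, not part of the patch: the sketch below reproduces the new _generate_output_path_fragment helper verbatim and prints the folder-name fragments it yields in 'collection' mode versus the default single-experiment mode. The prefix and date strings are invented example values, not taken from any real configuration.

    # Illustration only: copy of the helper introduced in this patch, called
    # with made-up inputs to show the two naming conventions side by side.
    def _generate_output_path_fragment(filename_prefix, integration_mode,
                                        dataset_startdate, dataset_enddate, index=None):
        if integration_mode == 'collection':
            return f'collection_{index}_{filename_prefix}_{dataset_enddate}'
        else:
            return f'{filename_prefix}_{dataset_enddate}'

    # Collection mode: one numbered top-level folder for the whole campaign.
    print(_generate_output_path_fragment('smog_chamber', 'collection',
                                         '2022-01-01', '2022-03-31', index=1))
    # -> collection_1_smog_chamber_2022-03-31

    # Default single_experiment mode: prefix plus campaign end date.
    print(_generate_output_path_fragment('smog_chamber', 'single_experiment',
                                         '2022-01-01', '2022-03-31'))
    # -> smog_chamber_2022-03-31

As written, the helper accepts dataset_startdate but only dataset_enddate appears in either fragment.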