Simplify output dir and file naming
@@ -38,12 +38,19 @@ def _generate_datetime_dict(datetime_steps):
     """ Generate the datetime augment dictionary from datetime steps. """
     datetime_augment_dict = {}
     for datetime_step in datetime_steps:
-        #tmp = datetime.strptime(datetime_step, '%Y-%m-%d %H-%M-%S')
         datetime_augment_dict[datetime_step] = [
-            datetime_step.strftime('%Y-%m-%d'), datetime_step.strftime('%Y_%m_%d'), datetime_step.strftime('%Y.%m.%d'), datetime_step.strftime('%Y%m%d')
+            datetime_step.strftime('%Y-%m-%d'), datetime_step.strftime('%Y_%m_%d'),
+            datetime_step.strftime('%Y.%m.%d'), datetime_step.strftime('%Y%m%d')
         ]
     return datetime_augment_dict
 
+
+def _generate_output_path_fragment(filename_prefix, integration_mode, dataset_startdate, dataset_enddate, index=None):
+    """Generate consistent directory or file name fragment based on mode."""
+    if integration_mode == 'collection':
+        return f'collection_{index}_{filename_prefix}_{dataset_enddate}'
+    else:
+        return f'{filename_prefix}_{dataset_enddate}'
+
 def load_config_and_setup_logging(yaml_config_file_path, log_dir):
     """Load YAML configuration file, set up logging, and validate required keys and datetime_steps."""
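The new _generate_output_path_fragment helper centralizes the naming scheme. A minimal sketch of its behavior, assuming the module imports as data_integration and using a made-up prefix and dates:

    from data_integration import _generate_output_path_fragment

    # Collection mode embeds an index and the prefix in the folder name.
    _generate_output_path_fragment('mycampaign', 'collection',
                                   '2023-01-01', '2023-06-30', index=1)
    # -> 'collection_1_mycampaign_2023-06-30'

    # Every other mode falls back to prefix plus end date; note that
    # dataset_startdate is accepted but currently unused by the helper.
    _generate_output_path_fragment('mycampaign', 'single_experiment',
                                   '2023-01-01', '2023-06-30')
    # -> 'mycampaign_2023-06-30'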
@@ -189,17 +196,6 @@ def copy_subtree_and_create_hdf5(src, dst, select_dir_keywords, select_file_keyw
 
 
 def run_pipeline(path_to_config_yamlFile, log_dir='logs/'):
-
-    """Integrates data sources specified by the input configuration file into HDF5 files.
-
-    Parameters:
-    yaml_config_file_path (str): Path to the YAML configuration file.
-    log_dir (str): Directory to save the log file.
-
-    Returns:
-    list: List of Paths to the created HDF5 file(s).
-    """
-
     config_dict = load_config_and_setup_logging(path_to_config_yamlFile, log_dir)
 
     path_to_input_dir = config_dict['input_file_directory']
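The removed docstring's contract still holds: run_pipeline takes the YAML config path plus an optional log directory and returns a list of paths to the created HDF5 file(s). A hypothetical call, with an illustrative filename:

    output_files = run_pipeline('campaign_config.yaml', log_dir='logs/')
    for hdf5_file in output_files:
        print(hdf5_file)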
@@ -213,61 +209,59 @@ def run_pipeline(path_to_config_yamlFile, log_dir='logs/'):
     dataset_startdate = config_dict['dataset_startdate']
     dataset_enddate = config_dict['dataset_enddate']
 
-    # Determine mode and process accordingly
-    output_filename_path = []
-    campaign_name_template = lambda filename_prefix, suffix: '_'.join([filename_prefix, suffix])
-    date_str = f'{dataset_startdate}_{dataset_enddate}'
+    integration_mode = config_dict.get('integration_mode', 'single_experiment')
+    filename_prefix = config_dict['filename_prefix']
+    output_filename_path = []
 
+    # Determine top-level campaign folder path
+    top_level_foldername = _generate_output_path_fragment(
+        filename_prefix, integration_mode, dataset_startdate, dataset_enddate, index=1
+    )
 
-    # Create path to new raw datafolder and standardize with forward slashes
     path_to_rawdata_folder = os.path.join(
-        path_to_output_dir, 'collection_' + campaign_name_template(config_dict['filename_prefix'], date_str), "").replace(os.sep, '/')
+        path_to_output_dir, top_level_foldername, ""
+    ).replace(os.sep, '/')
 
     # Process individual datetime steps if available, regardless of mode
     if config_dict.get('datetime_steps_dict', {}):
-        # Single experiment mode
         for datetime_step, file_keywords in config_dict['datetime_steps_dict'].items():
-            date_str = datetime_step.strftime('%Y-%m-%d')
-            single_campaign_name = campaign_name_template(config_dict['filename_prefix'], date_str)
-            path_to_rawdata_subfolder = os.path.join(path_to_rawdata_folder, single_campaign_name, "")
+            single_date_str = datetime_step.strftime('%Y%m%d')
+            subfolder_name = f"experimental_step_{single_date_str}"
+            path_to_rawdata_subfolder = os.path.join(path_to_rawdata_folder, subfolder_name, "")
 
             path_to_integrated_stepwise_hdf5_file = copy_subtree_and_create_hdf5(
                 path_to_input_dir, path_to_rawdata_subfolder, select_dir_keywords,
                 file_keywords, allowed_file_extensions, root_metadata_dict)
 
             output_filename_path.append(path_to_integrated_stepwise_hdf5_file)
 
-        # Collection mode processing if specified
-        if 'collection' in config_dict.get('integration_mode', 'single_experiment'):
+        # Collection mode post-processing
+        if integration_mode == 'collection':
             path_to_filenames_dict = {path_to_rawdata_folder: [os.path.basename(path) for path in output_filename_path]} if output_filename_path else {}
-            #hdf5_path = hdf5_lib.create_hdf5_file_from_filesystem_path_new(path_to_rawdata_folder, path_to_filenames_dict, [], root_metadata_dict)
-            hdf5_path = hdf5_lib.create_hdf5_file_from_filesystem_path(path_to_rawdata_folder, path_to_filenames_dict, [], root_metadata_dict)
+            hdf5_path = hdf5_lib.create_hdf5_file_from_filesystem_path(
+                path_to_rawdata_folder, path_to_filenames_dict, [], root_metadata_dict
+            )
             output_filename_path.append(hdf5_path)
     else:
         path_to_integrated_stepwise_hdf5_file = copy_subtree_and_create_hdf5(
             path_to_input_dir, path_to_rawdata_folder, select_dir_keywords, [],
             allowed_file_extensions, root_metadata_dict)
         output_filename_path.append(path_to_integrated_stepwise_hdf5_file)
 
     return output_filename_path
 
 
 if __name__ == "__main__":
 
     if len(sys.argv) < 2:
         print("Usage: python data_integration.py <function_name> <function_args>")
         sys.exit(1)
 
-    # Extract the function name from the command line arguments
     function_name = sys.argv[1]
 
-    # Handle function execution based on the provided function name
     if function_name == 'run':
 
         if len(sys.argv) != 3:
             print("Usage: python data_integration.py run <path_to_config_yamlFile>")
             sys.exit(1)
-        # Extract path to configuration file, specifying the data integration task
         path_to_config_yamlFile = sys.argv[2]
         run_pipeline(path_to_config_yamlFile)
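Taken together, the keys that run_pipeline reads imply a config_dict shaped roughly like the sketch below. The key names come from the code above; every value is an illustrative assumption, and keys consumed elsewhere (the output directory, keyword lists, extensions, and root metadata) are resolved in load_config_and_setup_logging, outside this diff:

    config_dict = {
        'input_file_directory': '/data/raw',   # illustrative path
        'dataset_startdate': '2023-01-01',
        'dataset_enddate': '2023-06-30',
        'filename_prefix': 'mycampaign',
        'integration_mode': 'collection',      # or 'single_experiment'
        'datetime_steps_dict': {},             # datetime step -> list of file keywords
    }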
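As the usage strings in the __main__ block indicate, the pipeline is launched from the command line; the YAML filename here is hypothetical:

    python data_integration.py run campaign_config.yaml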