Abstracted reusable steps in integration_sources as dima_pipeline.py and added functionality to make a collection of hdf5 files, where each represents an single experiment of campaign.

This commit is contained in:
2024-09-25 15:23:23 +02:00
parent 1e1499c28a
commit df2f7b3de6

View File

@ -123,10 +123,7 @@ def integrate_data_sources(yaml_config_file_path, log_dir='logs/'):
Returns:
list: List of Paths to the created HDF5 file(s).
"""
def output_filename(name, date, initials):
return f"{name}_{date}_{initials}.h5"
config_dict = load_config_and_setup_logging(yaml_config_file_path, log_dir)
exp_campaign_name = config_dict['experiment']
@ -161,53 +158,85 @@ def integrate_data_sources(yaml_config_file_path, log_dir='logs/'):
# Determine mode and process accordingly
output_filename_path = []
if 'single_experiment' in config_dict.get('integration_mode', 'collection') and datetime_augment_dict:
# Single experiment mode
for datetime_step in datetime_augment_dict.keys():
campaign_name_template = lambda name, date, initials : f"{name}_{date}_{initials}"
date_str = f'{dataset_startdate}_{dataset_enddate}'
# Create path to new rawdata subfolder and standardize it with forward-slashes as required by pipeline
path_to_rawdata_folder = os.path.join(path_to_output_dir, 'collection_' + campaign_name_template(exp_campaign_name, initials, date_str),"").replace(os.sep,'/')
if datetime_augment_dict:
# Single experiment mode
for datetime_step in datetime_augment_dict.keys():
date_str = datetime_step.strftime('%Y-%m-%d')
select_file_keywords = datetime_augment_dict[datetime_step]
filename = output_filename(exp_campaign_name, initials, date_str)
path_to_rawdata_folder = os.path.splitext(os.path.join(path_to_output_dir, filename))[0]
single_campaign_name = campaign_name_template(exp_campaign_name, initials, date_str)
# Step 1: Search through input directory and make a copy in the output directory that complies with naming constraints
path_to_input_dir_filenames_dict = utils.copy_directory_with_contraints(path_to_input_dir,
path_to_rawdata_folder,
# Create path to new rawdata subfolder and standardize it with forward slashes as required by pipeline
path_to_rawdata_subfolder = os.path.join(path_to_rawdata_folder, single_campaign_name,"").replace(os.sep,'/')
# Run two step data integration pipeline on specified experimental campaign data collection
path_to_integrated_stepwise_hdf5_file = dima_pipeline(path_to_input_dir,
path_to_rawdata_subfolder ,
select_dir_keywords,
select_file_keywords,
allowed_file_extensions)
root_metadata_dict)
# Step 2: Create HDF5 file
output_filename_path.append(path_to_integrated_stepwise_hdf5_file)
path_to_integrated__stepwise_hdf5_file = hdf5_lib.create_hdf5_file_from_filesystem_path(path_to_rawdata_folder,
path_to_input_dir_filenames_dict,
select_dir_keywords,
root_metadata_dict)
output_filename_path.append(path_to_integrated__stepwise_hdf5_file)
if 'collection' in config_dict.get('integration_mode', 'single_experiment'):
#date_str = f'{dataset_startdate}_{dataset_enddate}'
path_to_filenames_dict = {}
path_to_filenames_dict[path_to_rawdata_folder] = []
for path in output_filename_path:
path_to_file, filename = os.path.split(path) # path_to_file should be same as path_to_rawdata_folder
path_to_filenames_dict[path_to_rawdata_folder].append(filename)
path_to_integrated_hdf5_file = hdf5_lib.create_hdf5_file_from_filesystem_path(path_to_rawdata_folder,
path_to_filenames_dict,
[],
root_metadata_dict)
else: # collection
output_filename_path.append(path_to_integrated_hdf5_file)
# Collection mode or no datetime_steps
else: # collection
path_to_integrated_hdf5_file = dima_pipeline(path_to_input_dir,
path_to_rawdata_folder,
select_dir_keywords,
select_file_keywords,
root_metadata_dict)
date_str = f'{dataset_startdate}_{dataset_enddate}'
filename = output_filename(exp_campaign_name, initials, date_str)
path_to_rawdata_folder = os.path.splitext(os.path.join(path_to_output_dir, filename))[0]
logging.info("Creating copy of specified experimental campaign folder at: %s", path_to_output_dir)
# Step 1: Search through input directory and make a copy in the output directory that complies with naming constraints
path_to_input_dir_filenames_dict = utils.copy_directory_with_contraints(path_to_input_dir,
path_to_rawdata_folder,
select_dir_keywords,
select_file_keywords,
allowed_file_extensions)
# Step 2: Create HDF5 file
logging.info("Creating HDF5 file at: %s", path_to_output_dir)
path_to_integrated_hdf5_file = hdf5_lib.create_hdf5_file_from_filesystem_path(path_to_rawdata_folder,
path_to_input_dir_filenames_dict,
select_dir_keywords,
root_metadata_dict)
output_filename_path.append(path_to_integrated_hdf5_file )
output_filename_path.append(path_to_integrated_hdf5_file)
return output_filename_path
return output_filename_path
def dima_pipeline(path_to_input_dir, path_to_rawdata_folder, select_dir_keywords, select_file_keywords, root_metadata_dict = {}):
# Possibly replace below line with os.path.join(path_to_rawdata_folder,'..')
path_to_output_dir = os.path.join(path_to_rawdata_folder,'..')
path_to_output_dir = os.path.normpath(path_to_output_dir)
logging.info("Creating copy of the experimental campaign folder at: %s", path_to_output_dir)
# Step 1: Search through input directory and make a copy in the output directory that complies with naming constraints
path_to_input_dir_filenames_dict = utils.copy_directory_with_contraints(path_to_input_dir,
path_to_rawdata_folder,
select_dir_keywords,
select_file_keywords,
allowed_file_extensions)
logging.info("Finished creating a copy of the experimental campaign folder at: %s", path_to_output_dir)
# Step 2: Create HDF5 file
logging.info("Creating HDF5 file at: %s", path_to_output_dir)
path_to_integrated_hdf5_file = hdf5_lib.create_hdf5_file_from_filesystem_path(path_to_rawdata_folder,
path_to_input_dir_filenames_dict,
select_dir_keywords,
root_metadata_dict)
logging.info("Completed creation of HDF5 file %s for the specified experimental campaign at: %s",
path_to_integrated_hdf5_file, path_to_output_dir)
return path_to_integrated_hdf5_file