diff --git a/pipelines/data_integration.py b/pipelines/data_integration.py
index f0c6e5e..a27b611 100644
--- a/pipelines/data_integration.py
+++ b/pipelines/data_integration.py
@@ -99,6 +99,27 @@ def load_config_and_setup_logging(yaml_config_file_path, log_dir):
     config_dict['dataset_startdate'] = dataset_startdate
     config_dict['dataset_enddate'] = dataset_enddate
 
+    # Validate filename_format if defined
+    if 'filename_format' in config_dict:
+
+        if not isinstance(config_dict['filename_format'], str):
+            raise ValueError('"filename_format" must be a string of comma-separated keys')
+
+        # Split the string and check that each key exists in config_dict
+        keys = [key.strip() for key in config_dict['filename_format'].split(',')]
+        missing_keys = [key for key in keys if key not in config_dict]
+
+        # If every referenced key exists, store the normalized format string;
+        # otherwise warn and fall back to the default naming scheme.
+        if not missing_keys:
+            config_dict['filename_format'] = ','.join(keys)
+        else:
+            config_dict['filename_format'] = None
+            print('"filename_format" should contain comma-separated keys that match existing keys in the YAML config file.')
+            print('Setting "filename_format" to None')
+    else:
+        config_dict['filename_format'] = None
+
     return config_dict
 
 
@@ -154,15 +175,19 @@ def integrate_data_sources(yaml_config_file_path, log_dir='logs/'):
 
         select_file_keywords = list(chain.from_iterable(datetime_augment_dict.values()))
 
+    if not config_dict['filename_format']:  # covers both None and empty string
+        filename_parts = [exp_campaign_name, initials]  # default naming scheme
+    else:
+        filename_parts = [config_dict[key] for key in config_dict['filename_format'].split(',')]
 
 
     # Determine mode and process accordingly
     output_filename_path = []
-    campaign_name_template = lambda name, date, initials : f"{name}_{date}_{initials}"
+    campaign_name_template = lambda filename_parts, suffix : '_'.join(filename_parts + [suffix])
     date_str = f'{dataset_startdate}_{dataset_enddate}'
 
     # Create path to new rawdata subfolder and standardize it with forward-slashes as required by pipeline
-    path_to_rawdata_folder = os.path.join(path_to_output_dir, 'collection_' + campaign_name_template(exp_campaign_name, initials, date_str),"").replace(os.sep,'/')
+    path_to_rawdata_folder = os.path.join(path_to_output_dir, 'collection_' + campaign_name_template(filename_parts, date_str),"").replace(os.sep,'/')
 
     if datetime_augment_dict:
 
@@ -172,7 +197,7 @@ def integrate_data_sources(yaml_config_file_path, log_dir='logs/'):
             date_str = datetime_step.strftime('%Y-%m-%d')
             select_file_keywords = datetime_augment_dict[datetime_step]
 
-            single_campaign_name = campaign_name_template(exp_campaign_name, initials, date_str)
+            single_campaign_name = campaign_name_template(filename_parts, date_str)
 
             # Create path to new rawdata subfolder and standardize it with forward slashes as required by pipeline
             path_to_rawdata_subfolder = os.path.join(path_to_rawdata_folder, single_campaign_name,"").replace(os.sep,'/')
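
For reviewers, a minimal, self-contained sketch of the behavior this PR introduces. The config keys and values below (`experiment`, `user_initials`, the fallback strings) are hypothetical stand-ins, not keys defined by the pipeline; the real keys come from whatever the project's YAML config file defines.

```python
# Illustrative sketch only: mirrors the validation and naming logic in this PR.
config_dict = {
    'experiment': 'campaignX',      # hypothetical example key/value
    'user_initials': 'NG',          # hypothetical example key/value
    'filename_format': 'experiment, user_initials',
}

# Validation: normalize the comma-separated key list; fall back to None
# if any referenced key is missing from the config.
keys = [key.strip() for key in config_dict['filename_format'].split(',')]
missing_keys = [key for key in keys if key not in config_dict]
config_dict['filename_format'] = ','.join(keys) if not missing_keys else None

# Naming: resolve the configured parts and join them with a date-range suffix.
if not config_dict['filename_format']:
    filename_parts = ['default_name', 'XX']  # placeholder fallback values
else:
    filename_parts = [config_dict[key] for key in config_dict['filename_format'].split(',')]

campaign_name_template = lambda parts, suffix: '_'.join(parts + [suffix])
print('collection_' + campaign_name_template(filename_parts, '2024-01-01_2024-01-31'))
# -> collection_campaignX_NG_2024-01-01_2024-01-31
```

Note that because the fallback check uses `if not config_dict['filename_format']`, an empty string in the YAML file is treated the same as an absent or invalid `filename_format`.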