import sys
import os

root_dir = os.path.abspath(os.curdir)
sys.path.append(root_dir)

import yaml
import logging
from datetime import datetime
from itertools import chain

import src.hdf5_lib as hdf5_lib
import utils.g5505_utils as utils
from instruments.readers import filereader_registry

allowed_file_extensions = filereader_registry.file_extensions


def load_config_and_setup_logging(yaml_config_file_path, log_dir):
    """Load the YAML configuration file, set up logging, and validate required keys and datetime_steps."""

    # Define required keys
    required_keys = [
        'experiment', 'contact', 'input_file_directory', 'output_file_directory',
        'instrument_datafolder', 'project', 'actris_level'
    ]

    # Supported integration modes
    supported_integration_modes = ['collection', 'single_experiment']

    # Set up logging
    date = utils.created_at().replace(":", "-")
    utils.setup_logging(log_dir, f"integrate_data_sources_{date}.log")

    # Load YAML configuration file
    with open(yaml_config_file_path, 'r') as stream:
        try:
            config_dict = yaml.load(stream, Loader=yaml.FullLoader)
        except yaml.YAMLError as exc:
            logging.error("Error loading YAML file: %s", exc)
            raise ValueError(f"Failed to load YAML file: {exc}")

    # Check that all required keys are present
    missing_keys = [key for key in required_keys if key not in config_dict]
    if missing_keys:
        raise KeyError(f"Missing required keys in YAML configuration: {missing_keys}")

    # Validate integration_mode
    integration_mode = config_dict.get('integration_mode', 'collection')  # Default to 'collection'
    if integration_mode not in supported_integration_modes:
        raise ValueError(f"Unsupported integration_mode '{integration_mode}'. Supported modes are {supported_integration_modes}")

    # Get dataset start and end dates (optional)
    dataset_startdate = config_dict.get('dataset_startdate')
    dataset_enddate = config_dict.get('dataset_enddate')

    # Validate datetime_steps format if it exists
    if 'datetime_steps' in config_dict:
        datetime_steps = config_dict['datetime_steps']
        expected_format = '%Y-%m-%d %H-%M-%S'

        # datetime_steps must be a list of strings or a falsy value (None, empty)
        if datetime_steps and not isinstance(datetime_steps, list):
            raise TypeError(f"datetime_steps should be a list of strings or a falsy value (None, empty), but got {type(datetime_steps)}")

        for step_idx, step in enumerate(datetime_steps):
            try:
                # Attempt to parse the datetime to ensure the format is correct
                config_dict['datetime_steps'][step_idx] = datetime.strptime(step, expected_format)
            except ValueError:
                raise ValueError(f"Invalid datetime format for '{step}'. Expected format: {expected_format}")
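
# Illustrative example of a YAML configuration accepted by load_config_and_setup_logging.
# The key names mirror required_keys above; all values (paths, names, dates) are made-up
# placeholders, not part of this repository.
#
#   project: 'my_project'
#   experiment: 'my_campaign'
#   contact: 'AB'
#   actris_level: 0
#   input_file_directory: 'path/to/raw/data'
#   output_file_directory: 'path/to/output'
#   instrument_datafolder: ['instrument_1', 'instrument_2']
#   integration_mode: 'single_experiment'      # optional, defaults to 'collection'
#   datetime_steps: ['2023-05-17 12-00-00']    # optional, must match '%Y-%m-%d %H-%M-%S'
#   filename_format: 'experiment, contact'     # optional, comma-separated existing keys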
        # Set default dates from datetime_steps if not provided
        if not dataset_startdate:
            dataset_startdate = min(config_dict['datetime_steps']).strftime('%Y-%m-%d')  # Earliest datetime step
        if not dataset_enddate:
            dataset_enddate = max(config_dict['datetime_steps']).strftime('%Y-%m-%d')  # Latest datetime step
    else:
        # If datetime_steps is not present, set the integration mode to 'collection'
        logging.info("datetime_steps missing, setting integration_mode to 'collection'.")
        config_dict['integration_mode'] = 'collection'

        # Set sensible defaults using the current date
        current_date = datetime.now().strftime('%Y-%m-%d')
        if not dataset_startdate:
            dataset_startdate = current_date
        if not dataset_enddate:
            dataset_enddate = current_date

    config_dict['expected_datetime_format'] = '%Y-%m-%d %H-%M-%S'

    # Add dataset dates to config for later use
    config_dict['dataset_startdate'] = dataset_startdate
    config_dict['dataset_enddate'] = dataset_enddate

    # Validate filename_format if defined
    if 'filename_format' in config_dict:
        if not isinstance(config_dict['filename_format'], str):
            raise ValueError("Specified 'filename_format' must be a string")

        # Split the string and check that each key exists in config_dict
        keys = [key.strip() for key in config_dict['filename_format'].split(',')]
        missing_keys = [key for key in keys if key not in config_dict]

        if not missing_keys:
            config_dict['filename_format'] = ','.join(keys)
        else:
            config_dict['filename_format'] = None
            print('"filename_format" should contain comma-separated keys that match existing keys in the YAML config file.')
            print('Setting "filename_format" as None')
    else:
        config_dict['filename_format'] = None

    return config_dict


def generate_datetime_dict(datetime_steps):
    """Generate the datetime augment dictionary from datetime steps."""
    datetime_augment_dict = {}
    for datetime_step in datetime_steps:
        # Map each datetime step to the date spellings it may take in raw file names
        datetime_augment_dict[datetime_step] = [
            datetime_step.strftime('%Y-%m-%d'),
            datetime_step.strftime('%Y_%m_%d'),
            datetime_step.strftime('%Y.%m.%d'),
            datetime_step.strftime('%Y%m%d')
        ]
    return datetime_augment_dict
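
# Illustrative example (values assumed): a single datetime step expands into the date
# spellings that may appear in raw file names, e.g.
#   generate_datetime_dict([datetime(2023, 5, 17, 12, 0, 0)])
#   -> {datetime(2023, 5, 17, 12, 0, 0): ['2023-05-17', '2023_05_17', '2023.05.17', '20230517']}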

def integrate_data_sources(yaml_config_file_path, log_dir='logs/'):
    """
    Integrate the data sources specified by the input configuration file into HDF5 files.

    Parameters:
        yaml_config_file_path (str): Path to the YAML configuration file.
        log_dir (str): Directory in which to save the log file.

    Returns:
        list: Paths to the created HDF5 file(s).
    """

    config_dict = load_config_and_setup_logging(yaml_config_file_path, log_dir)

    exp_campaign_name = config_dict['experiment']
    initials = config_dict['contact']
    path_to_input_dir = config_dict['input_file_directory']
    path_to_output_dir = config_dict['output_file_directory']
    select_dir_keywords = config_dict['instrument_datafolder']

    root_metadata_dict = {
        'project': config_dict['project'],
        'experiment': config_dict['experiment'],
        'contact': config_dict['contact'],
        'actris_level': config_dict['actris_level']
    }

    # Get dataset start and end dates
    dataset_startdate = config_dict['dataset_startdate']
    dataset_enddate = config_dict['dataset_enddate']

    # Handle datetime_steps and integration mode
    datetime_augment_dict = {}
    select_file_keywords = []
    if 'datetime_steps' in config_dict and config_dict['datetime_steps']:
        # Augment each datetime step with several date formats to compensate for file-naming inconsistencies
        datetime_augment_dict = generate_datetime_dict(config_dict['datetime_steps'])
        # Create a flattened list of date-string file keywords
        select_file_keywords = list(chain.from_iterable(datetime_augment_dict.values()))

    if not config_dict['filename_format']:  # filename_format is None or empty
        filename_parts = [exp_campaign_name, initials]
    else:
        filename_parts = [config_dict[key] for key in config_dict['filename_format'].split(',')]

    # Determine mode and process accordingly
    output_filename_path = []
    campaign_name_template = lambda filename_parts, suffix: '_'.join(filename_parts + [suffix])
    date_str = f'{dataset_startdate}_{dataset_enddate}'

    # Create the path to the new rawdata folder and standardize it with forward slashes as required by the pipeline
    path_to_rawdata_folder = os.path.join(
        path_to_output_dir, 'collection_' + campaign_name_template(filename_parts, date_str), "").replace(os.sep, '/')

    if datetime_augment_dict:
        # Single-experiment mode: run the pipeline once per datetime step
        for datetime_step in datetime_augment_dict.keys():
            date_str = datetime_step.strftime('%Y-%m-%d')
            select_file_keywords = datetime_augment_dict[datetime_step]
            single_campaign_name = campaign_name_template(filename_parts, date_str)

            # Create the path to the new rawdata subfolder and standardize it with forward slashes as required by the pipeline
            path_to_rawdata_subfolder = os.path.join(path_to_rawdata_folder, single_campaign_name, "").replace(os.sep, '/')

            # Run the two-step data integration pipeline on the specified experimental campaign data collection
            path_to_integrated_stepwise_hdf5_file = dima_pipeline(path_to_input_dir, path_to_rawdata_subfolder,
                                                                  select_dir_keywords, select_file_keywords,
                                                                  root_metadata_dict)
            output_filename_path.append(path_to_integrated_stepwise_hdf5_file)

        if config_dict.get('integration_mode', 'single_experiment') == 'collection':
            # Combine the per-step HDF5 files into a single collection-level HDF5 file
            path_to_filenames_dict = {path_to_rawdata_folder: []}
            for path in output_filename_path:
                path_to_file, filename = os.path.split(path)
                # path_to_file should be the same as path_to_rawdata_folder
                path_to_filenames_dict[path_to_rawdata_folder].append(filename)

            path_to_integrated_hdf5_file = hdf5_lib.create_hdf5_file_from_filesystem_path(
                path_to_rawdata_folder, path_to_filenames_dict, [], root_metadata_dict)
            output_filename_path.append(path_to_integrated_hdf5_file)
    else:
        # Collection mode: no datetime steps, so process the whole campaign folder in one pass
        path_to_integrated_hdf5_file = dima_pipeline(path_to_input_dir, path_to_rawdata_folder,
                                                     select_dir_keywords, select_file_keywords,
                                                     root_metadata_dict)
        output_filename_path.append(path_to_integrated_hdf5_file)

    return output_filename_path
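
# Illustrative example of the folder naming used above (all values are assumptions):
# with filename_parts = ['my_campaign', 'AB'], dataset_startdate = '2023-05-17' and
# dataset_enddate = '2023-05-19', the collection folder becomes
#   <output_file_directory>/collection_my_campaign_AB_2023-05-17_2023-05-19/
# and, in single-experiment mode, each datetime step gets a subfolder such as
#   <output_file_directory>/collection_my_campaign_AB_2023-05-17_2023-05-19/my_campaign_AB_2023-05-17/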

def dima_pipeline(path_to_input_dir, path_to_rawdata_folder, select_dir_keywords, select_file_keywords, root_metadata_dict=None):
    """Run the two-step data integration pipeline: copy the campaign folder, then build an HDF5 file from it."""

    if root_metadata_dict is None:
        root_metadata_dict = {}

    # The output directory is the parent of the rawdata folder
    path_to_output_dir = os.path.join(path_to_rawdata_folder, '..')
    path_to_output_dir = os.path.normpath(path_to_output_dir)

    logging.info("Creating copy of the experimental campaign folder at: %s", path_to_output_dir)

    # Step 1: Search through the input directory and make a copy in the output directory that complies with naming constraints
    path_to_input_dir_filenames_dict = utils.copy_directory_with_contraints(path_to_input_dir,
                                                                            path_to_rawdata_folder,
                                                                            select_dir_keywords,
                                                                            select_file_keywords,
                                                                            allowed_file_extensions)
    logging.info("Finished creating a copy of the experimental campaign folder at: %s", path_to_output_dir)

    # Step 2: Create the HDF5 file
    logging.info("Creating HDF5 file at: %s", path_to_output_dir)
    path_to_integrated_hdf5_file = hdf5_lib.create_hdf5_file_from_filesystem_path(path_to_rawdata_folder,
                                                                                  path_to_input_dir_filenames_dict,
                                                                                  select_dir_keywords,
                                                                                  root_metadata_dict)
    logging.info("Completed creation of HDF5 file %s for the specified experimental campaign at: %s",
                 path_to_integrated_hdf5_file, path_to_output_dir)

    return path_to_integrated_hdf5_file
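
# Minimal usage sketch for running this module as a script. The fallback config path is an
# assumption for illustration only; the module can equally be imported and called directly.
if __name__ == '__main__':
    config_path = sys.argv[1] if len(sys.argv) > 1 else 'input_files/config.yaml'
    created_hdf5_files = integrate_data_sources(config_path, log_dir='logs/')
    print('Created HDF5 file(s):')
    for hdf5_path in created_hdf5_files:
        print(f'  {hdf5_path}')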