243 lines
11 KiB
Python
243 lines
11 KiB
Python
import sys
|
|
import os
|
|
root_dir = os.path.abspath(os.curdir)
|
|
sys.path.append(root_dir)
|
|
|
|
import src.hdf5_lib as hdf5_lib
|
|
import utils.g5505_utils as utils
|
|
import yaml
|
|
|
|
import logging
|
|
from datetime import datetime
|
|
# Importing chain class from itertools
|
|
from itertools import chain
|
|
|
|
from instruments.readers import filereader_registry
|
|
|
|
allowed_file_extensions = filereader_registry.file_extensions
|
|
|
|
|
|
from datetime import datetime
|
|
|
|
def load_config_and_setup_logging(yaml_config_file_path, log_dir):
|
|
"""Load YAML configuration file, set up logging, and validate required keys and datetime_steps."""
|
|
|
|
# Define required keys
|
|
required_keys = [
|
|
'experiment', 'contact', 'input_file_directory', 'output_file_directory',
|
|
'instrument_datafolder', 'project', 'actris_level'
|
|
]
|
|
|
|
# Supported integration modes
|
|
supported_integration_modes = ['collection', 'single_experiment']
|
|
|
|
|
|
# Set up logging
|
|
date = utils.created_at().replace(":", "-")
|
|
utils.setup_logging(log_dir, f"integrate_data_sources_{date}.log")
|
|
|
|
# Load YAML configuration file
|
|
with open(yaml_config_file_path, 'r') as stream:
|
|
try:
|
|
config_dict = yaml.load(stream, Loader=yaml.FullLoader)
|
|
except yaml.YAMLError as exc:
|
|
logging.error("Error loading YAML file: %s", exc)
|
|
raise ValueError(f"Failed to load YAML file: {exc}")
|
|
|
|
# Check if required keys are present
|
|
missing_keys = [key for key in required_keys if key not in config_dict]
|
|
if missing_keys:
|
|
raise KeyError(f"Missing required keys in YAML configuration: {missing_keys}")
|
|
|
|
# Validate integration_mode
|
|
integration_mode = config_dict.get('integration_mode', 'collection') # Default to 'collection'
|
|
if integration_mode not in supported_integration_modes:
|
|
raise ValueError(f"Unsupported integration_mode '{integration_mode}'. Supported modes are {supported_integration_modes}")
|
|
|
|
|
|
# Get dataset start and end dates (optional)
|
|
dataset_startdate = config_dict.get('dataset_startdate')
|
|
dataset_enddate = config_dict.get('dataset_enddate')
|
|
|
|
# Validate datetime_steps format if it exists
|
|
if 'datetime_steps' in config_dict:
|
|
datetime_steps = config_dict['datetime_steps']
|
|
expected_format = '%Y-%m-%d %H-%M-%S'
|
|
|
|
# Check if datetime_steps is a list or a falsy value
|
|
if datetime_steps and not isinstance(datetime_steps, list):
|
|
raise TypeError(f"datetime_steps should be a list of strings or a falsy value (None, empty), but got {type(datetime_steps)}")
|
|
|
|
for step_idx, step in enumerate(datetime_steps):
|
|
try:
|
|
# Attempt to parse the datetime to ensure correct format
|
|
config_dict['datetime_steps'][step_idx] = datetime.strptime(step, expected_format)
|
|
except ValueError:
|
|
raise ValueError(f"Invalid datetime format for '{step}'. Expected format: {expected_format}")
|
|
|
|
# Set default dates from datetime_steps if not provided
|
|
if not dataset_startdate:
|
|
dataset_startdate = min(config_dict['datetime_steps']).strftime('%Y-%m-%d') # Earliest datetime step
|
|
|
|
if not dataset_enddate:
|
|
dataset_enddate = max(config_dict['datetime_steps']).strftime('%Y-%m-%d') # Latest datetime step
|
|
else:
|
|
# If datetime_steps is not present, set the integration mode to 'collection'
|
|
logging.info("datetime_steps missing, setting integration_mode to 'collection'.")
|
|
config_dict['integration_mode'] = 'collection'
|
|
|
|
# Set sensible defaults using the current date
|
|
current_date = datetime.now().strftime('%Y-%m-%d')
|
|
if not dataset_startdate:
|
|
dataset_startdate = current_date
|
|
if not dataset_enddate:
|
|
dataset_enddate = current_date
|
|
|
|
config_dict['expected_datetime_format'] = '%Y-%m-%d %H-%M-%S'
|
|
|
|
# Add dataset dates to config for use later
|
|
config_dict['dataset_startdate'] = dataset_startdate
|
|
config_dict['dataset_enddate'] = dataset_enddate
|
|
|
|
return config_dict
|
|
|
|
|
|
def generate_datetime_dict(datetime_steps):
|
|
""" Generate the datetime augment dictionary from datetime steps. """
|
|
datetime_augment_dict = {}
|
|
for datetime_step in datetime_steps:
|
|
#tmp = datetime.strptime(datetime_step, '%Y-%m-%d %H-%M-%S')
|
|
datetime_augment_dict[datetime_step] = [
|
|
datetime_step.strftime('%Y-%m-%d'), datetime_step.strftime('%Y_%m_%d'), datetime_step.strftime('%Y.%m.%d'), datetime_step.strftime('%Y%m%d')
|
|
]
|
|
return datetime_augment_dict
|
|
|
|
def integrate_data_sources(yaml_config_file_path, log_dir='logs/'):
|
|
|
|
""" Integrates data sources specified by the input configuration file into HDF5 files.
|
|
|
|
Parameters:
|
|
yaml_config_file_path (str): Path to the YAML configuration file.
|
|
log_dir (str): Directory to save the log file.
|
|
|
|
Returns:
|
|
list: List of Paths to the created HDF5 file(s).
|
|
"""
|
|
|
|
config_dict = load_config_and_setup_logging(yaml_config_file_path, log_dir)
|
|
|
|
exp_campaign_name = config_dict['experiment']
|
|
initials = config_dict['contact']
|
|
path_to_input_dir = config_dict['input_file_directory']
|
|
path_to_output_dir = config_dict['output_file_directory']
|
|
select_dir_keywords = config_dict['instrument_datafolder']
|
|
root_metadata_dict = {
|
|
'project' : config_dict['project'],
|
|
'experiment' : config_dict['experiment'],
|
|
'contact' : config_dict['contact'],
|
|
'actris_level': config_dict['actris_level']
|
|
}
|
|
|
|
# Get dataset start and end dates.
|
|
dataset_startdate = config_dict['dataset_startdate']
|
|
dataset_enddate = config_dict['dataset_enddate']
|
|
|
|
# Handle datetime_steps and integration mode
|
|
datetime_augment_dict = {}
|
|
select_file_keywords = []
|
|
|
|
if 'datetime_steps' in config_dict and config_dict['datetime_steps']:
|
|
# We augment each datetime step with various datetime formats to compensate for filenaming inconsistencies
|
|
datetime_augment_dict = generate_datetime_dict(config_dict['datetime_steps'])
|
|
|
|
# Create flattened datetime list of datetime object file keywords
|
|
select_file_keywords = list(chain.from_iterable(datetime_augment_dict.values()))
|
|
|
|
|
|
|
|
# Determine mode and process accordingly
|
|
output_filename_path = []
|
|
|
|
campaign_name_template = lambda name, date, initials : f"{name}_{date}_{initials}"
|
|
|
|
date_str = f'{dataset_startdate}_{dataset_enddate}'
|
|
# Create path to new rawdata subfolder and standardize it with forward-slashes as required by pipeline
|
|
path_to_rawdata_folder = os.path.join(path_to_output_dir, 'collection_' + campaign_name_template(exp_campaign_name, initials, date_str),"").replace(os.sep,'/')
|
|
|
|
if datetime_augment_dict:
|
|
|
|
# Single experiment mode
|
|
for datetime_step in datetime_augment_dict.keys():
|
|
|
|
date_str = datetime_step.strftime('%Y-%m-%d')
|
|
select_file_keywords = datetime_augment_dict[datetime_step]
|
|
|
|
single_campaign_name = campaign_name_template(exp_campaign_name, initials, date_str)
|
|
|
|
# Create path to new rawdata subfolder and standardize it with forward slashes as required by pipeline
|
|
path_to_rawdata_subfolder = os.path.join(path_to_rawdata_folder, single_campaign_name,"").replace(os.sep,'/')
|
|
|
|
# Run two step data integration pipeline on specified experimental campaign data collection
|
|
path_to_integrated_stepwise_hdf5_file = dima_pipeline(path_to_input_dir,
|
|
path_to_rawdata_subfolder ,
|
|
select_dir_keywords,
|
|
select_file_keywords,
|
|
root_metadata_dict)
|
|
|
|
output_filename_path.append(path_to_integrated_stepwise_hdf5_file)
|
|
|
|
if 'collection' in config_dict.get('integration_mode', 'single_experiment'):
|
|
#date_str = f'{dataset_startdate}_{dataset_enddate}'
|
|
path_to_filenames_dict = {}
|
|
path_to_filenames_dict[path_to_rawdata_folder] = []
|
|
for path in output_filename_path:
|
|
path_to_file, filename = os.path.split(path) # path_to_file should be same as path_to_rawdata_folder
|
|
path_to_filenames_dict[path_to_rawdata_folder].append(filename)
|
|
|
|
path_to_integrated_hdf5_file = hdf5_lib.create_hdf5_file_from_filesystem_path(path_to_rawdata_folder,
|
|
path_to_filenames_dict,
|
|
[],
|
|
root_metadata_dict)
|
|
|
|
output_filename_path.append(path_to_integrated_hdf5_file)
|
|
|
|
else: # collection
|
|
path_to_integrated_hdf5_file = dima_pipeline(path_to_input_dir,
|
|
path_to_rawdata_folder,
|
|
select_dir_keywords,
|
|
select_file_keywords,
|
|
root_metadata_dict)
|
|
|
|
|
|
output_filename_path.append(path_to_integrated_hdf5_file)
|
|
|
|
return output_filename_path
|
|
|
|
def dima_pipeline(path_to_input_dir, path_to_rawdata_folder, select_dir_keywords, select_file_keywords, root_metadata_dict = {}):
|
|
# Possibly replace below line with os.path.join(path_to_rawdata_folder,'..')
|
|
path_to_output_dir = os.path.join(path_to_rawdata_folder,'..')
|
|
path_to_output_dir = os.path.normpath(path_to_output_dir)
|
|
|
|
logging.info("Creating copy of the experimental campaign folder at: %s", path_to_output_dir)
|
|
|
|
# Step 1: Search through input directory and make a copy in the output directory that complies with naming constraints
|
|
path_to_input_dir_filenames_dict = utils.copy_directory_with_contraints(path_to_input_dir,
|
|
path_to_rawdata_folder,
|
|
select_dir_keywords,
|
|
select_file_keywords,
|
|
allowed_file_extensions)
|
|
logging.info("Finished creating a copy of the experimental campaign folder at: %s", path_to_output_dir)
|
|
|
|
# Step 2: Create HDF5 file
|
|
logging.info("Creating HDF5 file at: %s", path_to_output_dir)
|
|
|
|
path_to_integrated_hdf5_file = hdf5_lib.create_hdf5_file_from_filesystem_path(path_to_rawdata_folder,
|
|
path_to_input_dir_filenames_dict,
|
|
select_dir_keywords,
|
|
root_metadata_dict)
|
|
logging.info("Completed creation of HDF5 file %s for the specified experimental campaign at: %s",
|
|
path_to_integrated_hdf5_file, path_to_output_dir)
|
|
|
|
return path_to_integrated_hdf5_file
|
|
|