Refactored code into functions to parse and validate the YAML config file and to perform the specified data integration task using a pipeline-like software structure.
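For reference, a minimal sketch of the configuration and call this refactor expects. Only the key names and the datetime format come from the validation code in this diff; the paths and values are hypothetical, and integrate_data_sources is assumed to be importable from the refactored module:

    import yaml

    # Hypothetical config accepted by load_config_and_setup_logging()
    config = {
        # Required keys
        'experiment': 'smog_chamber_study',
        'contact': 'JD',
        'input_file_directory': '/data/raw/campaign',
        'output_file_directory': '/data/integrated',
        'instrument_datafolder': ['gas', 'smps'],
        'project': 'example_project',
        'actris_level': 1,
        # Optional keys
        'integration_mode': 'single_experiment',  # defaults to 'collection'
        'datetime_steps': ['2023-05-01 09-00-00', '2023-05-02 09-00-00'],  # '%Y-%m-%d %H-%M-%S'
    }
    with open('campaign_config.yaml', 'w') as f:
        yaml.dump(config, f)

    # Returns a list of paths to the created HDF5 file(s)
    hdf5_paths = integrate_data_sources('campaign_config.yaml', log_dir='logs/')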
@@ -9,8 +9,108 @@ import yaml

import logging
from datetime import datetime
# Importing chain class from itertools
from itertools import chain

from instruments.readers import filereader_registry

allowed_file_extensions = filereader_registry.file_extensions


def load_config_and_setup_logging(yaml_config_file_path, log_dir):
    """Load YAML configuration file, set up logging, and validate required keys and datetime_steps."""

    # Define required keys
    required_keys = [
        'experiment', 'contact', 'input_file_directory', 'output_file_directory',
        'instrument_datafolder', 'project', 'actris_level'
    ]

    # Supported integration modes
    supported_integration_modes = ['collection', 'single_experiment']

    # Set up logging
    date = utils.created_at()
    utils.setup_logging(log_dir, f"integrate_data_sources_{date}.log")

    # Load YAML configuration file
    with open(yaml_config_file_path, 'r') as stream:
        try:
            config_dict = yaml.load(stream, Loader=yaml.FullLoader)
        except yaml.YAMLError as exc:
            logging.error("Error loading YAML file: %s", exc)
            raise ValueError(f"Failed to load YAML file: {exc}")

    # Check if required keys are present
    missing_keys = [key for key in required_keys if key not in config_dict]
    if missing_keys:
        raise KeyError(f"Missing required keys in YAML configuration: {missing_keys}")

    # Validate integration_mode
    integration_mode = config_dict.get('integration_mode', 'collection')  # Default to 'collection'
    if integration_mode not in supported_integration_modes:
        raise ValueError(f"Unsupported integration_mode '{integration_mode}'. Supported modes are {supported_integration_modes}")

    # Get dataset start and end dates (optional)
    dataset_startdate = config_dict.get('dataset_startdate')
    dataset_enddate = config_dict.get('dataset_enddate')

    # Validate datetime_steps format if it exists
    if 'datetime_steps' in config_dict:
        datetime_steps = config_dict['datetime_steps']
        expected_format = '%Y-%m-%d %H-%M-%S'

        # Check if datetime_steps is a list or a falsy value
        if datetime_steps and not isinstance(datetime_steps, list):
            raise TypeError(f"datetime_steps should be a list of strings or a falsy value (None, empty), but got {type(datetime_steps)}")

        for step_idx, step in enumerate(datetime_steps or []):
            try:
                # Attempt to parse the datetime to ensure correct format
                config_dict['datetime_steps'][step_idx] = datetime.strptime(step, expected_format)
            except ValueError:
                raise ValueError(f"Invalid datetime format for '{step}'. Expected format: {expected_format}")

        # Set default dates from datetime_steps if not provided
        if datetime_steps and not dataset_startdate:
            dataset_startdate = min(config_dict['datetime_steps']).strftime('%Y-%m-%d')  # Earliest datetime step

        if datetime_steps and not dataset_enddate:
            dataset_enddate = max(config_dict['datetime_steps']).strftime('%Y-%m-%d')  # Latest datetime step
    else:
        # If datetime_steps is not present, set the integration mode to 'collection'
        logging.info("datetime_steps missing, setting integration_mode to 'collection'.")
        config_dict['integration_mode'] = 'collection'

        # Set sensible defaults using the current date
        current_date = datetime.now().strftime('%Y-%m-%d')
        if not dataset_startdate:
            dataset_startdate = current_date
        if not dataset_enddate:
            dataset_enddate = current_date

    config_dict['expected_datetime_format'] = '%Y-%m-%d %H-%M-%S'

    # Add dataset dates to config for use later
    config_dict['dataset_startdate'] = dataset_startdate
    config_dict['dataset_enddate'] = dataset_enddate

    return config_dict

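# A minimal sketch of the normalization performed by load_config_and_setup_logging() above
# (the config file name and values are hypothetical):
#
#   cfg = load_config_and_setup_logging('campaign_config.yaml', log_dir='logs/')
#   # Each 'datetime_steps' string such as '2023-05-01 09-00-00' is replaced by a datetime object,
#   # and missing 'dataset_startdate'/'dataset_enddate' default to the earliest/latest step
#   # ('2023-05-01') or, when no steps are given, to the current date.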

def generate_datetime_dict(datetime_steps):
    """Generate the datetime augment dictionary from datetime steps."""
    datetime_augment_dict = {}
    for datetime_step in datetime_steps:
        datetime_augment_dict[datetime_step] = [
            datetime_step.strftime('%Y-%m-%d'), datetime_step.strftime('%Y_%m_%d'),
            datetime_step.strftime('%Y.%m.%d'), datetime_step.strftime('%Y%m%d')
        ]
    return datetime_augment_dict

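# For example (hypothetical value): a step datetime(2023, 5, 1, 9, 0) is mapped to the file-name
# date variants later used as file selection keywords:
#
#   generate_datetime_dict([datetime(2023, 5, 1, 9, 0)])
#   # -> {datetime(2023, 5, 1, 9, 0): ['2023-05-01', '2023_05_01', '2023.05.01', '20230501']}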

def integrate_data_sources(yaml_config_file_path, log_dir='logs/'):

@@ -21,26 +121,18 @@ def integrate_data_sources(yaml_config_file_path, log_dir='logs/'):
        log_dir (str): Directory to save the log file.

    Returns:
        str: Path (or list of Paths) to the created HDF5 file(s).
        list: List of Paths to the created HDF5 file(s).
    """

    date = utils.created_at()
    utils.setup_logging(log_dir, f"integrate_data_sources_{date}.log")

    with open(yaml_config_file_path, 'r') as stream:
        try:
            config_dict = yaml.load(stream, Loader=yaml.FullLoader)
        except yaml.YAMLError as exc:
            logging.error("Error loading YAML file: %s", exc)
            raise

    def output_filename(name, date, initials):
        return f"{name}_{date}_{initials}.h5"

    config_dict = load_config_and_setup_logging(yaml_config_file_path, log_dir)

    exp_campaign_name = config_dict['experiment']
    initials = config_dict['contact']
    input_file_dir = config_dict['input_file_directory']
    output_dir = config_dict['output_file_directory']
    path_to_input_dir = config_dict['input_file_directory']
    path_to_output_dir = config_dict['output_file_directory']
    select_dir_keywords = config_dict['instrument_datafolder']
    root_metadata_dict = {
        'project' : config_dict['project'],

@@ -49,54 +141,73 @@ def integrate_data_sources(yaml_config_file_path, log_dir='logs/'):
        'actris_level': config_dict['actris_level']
    }

    def create_hdf5_file(date_str, select_file_keywords, root_metadata):
        filename = output_filename(exp_campaign_name, date_str, initials)
        output_path = os.path.join(output_dir, filename)
        logging.info("Creating HDF5 file at: %s", output_path)

        return hdf5_lib.create_hdf5_file_from_filesystem_path(
            output_path, input_file_dir, select_dir_keywords, select_file_keywords, root_metadata_dict=root_metadata
        )

    # Get dataset start and end dates.
    dataset_startdate = config_dict['dataset_startdate']
    dataset_enddate = config_dict['dataset_enddate']

    # Handle datetime_steps and integration mode
    datetime_augment_dict = {}
    select_file_keywords = []

    if config_dict.get('datetime_steps'):
    if 'datetime_steps' in config_dict and config_dict['datetime_steps']:
        # We augment each datetime step with various datetime formats to compensate for file-naming inconsistencies
        datetime_augment_dict = generate_datetime_dict(config_dict['datetime_steps'])

        datetime_augment_dict = {}
        for datetime_step in config_dict['datetime_steps']:
            tmp = datetime.strptime(datetime_step, '%Y-%m-%d %H-%M-%S')
            datetime_augment_dict[tmp] = [tmp.strftime('%Y-%m-%d'), tmp.strftime('%Y_%m_%d'), tmp.strftime('%Y.%m.%d'), tmp.strftime('%Y%m%d')]
            print(tmp)
        # Create a flattened list of datetime-derived file keywords
        select_file_keywords = list(chain.from_iterable(datetime_augment_dict.values()))

        if 'single_experiment' in config_dict['integration_mode']:
            output_filename_path = []
            for datetime_step in datetime_augment_dict.keys():
                date_str = datetime_step.strftime('%Y-%m-%d')
                select_file_keywords = datetime_augment_dict[datetime_step]

    # Determine mode and process accordingly
    output_filename_path = []

                root_metadata_dict.update({'dataset_startdate': date_str,
                                           'dataset_enddate': date_str})
                dt_step_output_filename_path = create_hdf5_file(date_str, select_file_keywords, root_metadata_dict)
                output_filename_path.append(dt_step_output_filename_path)

        elif 'collection' in config_dict['integration_mode']:
            select_file_keywords = []
            for datetime_step in datetime_augment_dict.keys():
                select_file_keywords = select_file_keywords + datetime_augment_dict[datetime_step]
    if 'single_experiment' in config_dict.get('integration_mode', 'collection') and datetime_augment_dict:

        # Single experiment mode
        for datetime_step in datetime_augment_dict.keys():

            config_dict['dataset_startdate'] = min(datetime_augment_dict.keys())
            config_dict['dataset_enddate'] = max(datetime_augment_dict.keys())
            startdate = config_dict['dataset_startdate'].strftime('%Y-%m-%d')
            enddate = config_dict['dataset_enddate'].strftime('%Y-%m-%d')
            root_metadata_dict.update({'dataset_startdate': startdate,
                                       'dataset_enddate': enddate})
            date_str = datetime_step.strftime('%Y-%m-%d')
            select_file_keywords = datetime_augment_dict[datetime_step]

            date_str = f'{startdate}_{enddate}'
            output_filename_path = create_hdf5_file(date_str, select_file_keywords, root_metadata_dict)
    else:
        startdate = config_dict['dataset_startdate']
        enddate = config_dict['dataset_enddate']
        root_metadata_dict.update({'dataset_startdate': startdate,
                                   'dataset_enddate': enddate})
        date_str = f'{startdate}_{enddate}'
        output_filename_path = create_hdf5_file(date_str, select_file_keywords=[], root_metadata=root_metadata_dict)
            filename = output_filename(exp_campaign_name, initials, date_str)
            path_to_rawdata_folder = os.path.splitext(os.path.join(path_to_output_dir, filename))[0]

            # Step 1: Search through input directory and make a copy in the output directory that complies with naming constraints
            path_to_input_dir_filenames_dict = utils.copy_directory_with_contraints(path_to_input_dir,
                                                                                    path_to_rawdata_folder,
                                                                                    select_dir_keywords,
                                                                                    select_file_keywords,
                                                                                    allowed_file_extensions)

            # Step 2: Create HDF5 file
            path_to_integrated__stepwise_hdf5_file = hdf5_lib.create_hdf5_file_from_filesystem_path(path_to_rawdata_folder,
                                                                                                    path_to_input_dir_filenames_dict,
                                                                                                    select_dir_keywords,
                                                                                                    root_metadata_dict)
            output_filename_path.append(path_to_integrated__stepwise_hdf5_file)

    else:  # collection

        # Collection mode or no datetime_steps
        date_str = f'{dataset_startdate}_{dataset_enddate}'
        filename = output_filename(exp_campaign_name, initials, date_str)
        path_to_rawdata_folder = os.path.splitext(os.path.join(path_to_output_dir, filename))[0]

        logging.info("Creating copy of specified experimental campaign folder at: %s", path_to_output_dir)
        # Step 1: Search through input directory and make a copy in the output directory that complies with naming constraints
        path_to_input_dir_filenames_dict = utils.copy_directory_with_contraints(path_to_input_dir,
                                                                                path_to_rawdata_folder,
                                                                                select_dir_keywords,
                                                                                select_file_keywords,
                                                                                allowed_file_extensions)
        # Step 2: Create HDF5 file
        logging.info("Creating HDF5 file at: %s", path_to_output_dir)
        path_to_integrated_hdf5_file = hdf5_lib.create_hdf5_file_from_filesystem_path(path_to_rawdata_folder,
                                                                                      path_to_input_dir_filenames_dict,
                                                                                      select_dir_keywords,
                                                                                      root_metadata_dict)
        output_filename_path.append(path_to_integrated_hdf5_file)

    return output_filename_path
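
# Both branches above follow the same two-step pipeline: utils.copy_directory_with_contraints() filters
# and copies the selected campaign data, then hdf5_lib.create_hdf5_file_from_filesystem_path() builds the
# HDF5 file from that copy. A minimal usage sketch (the config path is hypothetical):
#
#   hdf5_paths = integrate_data_sources('campaign_config.yaml', log_dir='logs/')
#   for path in hdf5_paths:
#       logging.info("Integrated HDF5 file created at: %s", path)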