Major code refactoring and simplifications to enhance modularity. Included a command line interface.

This commit is contained in:
2024-11-01 09:52:41 +01:00
parent 510683a50d
commit 8d17bf267c

View File

@ -3,21 +3,29 @@ import os
root_dir = os.path.abspath(os.curdir)
sys.path.append(root_dir)
import src.hdf5_writer as hdf5_lib
import utils.g5505_utils as utils
import yaml
import yaml
import logging
from datetime import datetime
# Importing chain class from itertools
from itertools import chain
# Import DIMA modules
import src.hdf5_writer as hdf5_lib
import utils.g5505_utils as utils
from instruments.readers import filereader_registry
allowed_file_extensions = filereader_registry.file_extensions
from datetime import datetime
def _generate_datetime_dict(datetime_steps):
""" Generate the datetime augment dictionary from datetime steps. """
datetime_augment_dict = {}
for datetime_step in datetime_steps:
#tmp = datetime.strptime(datetime_step, '%Y-%m-%d %H-%M-%S')
datetime_augment_dict[datetime_step] = [
datetime_step.strftime('%Y-%m-%d'), datetime_step.strftime('%Y_%m_%d'), datetime_step.strftime('%Y.%m.%d'), datetime_step.strftime('%Y%m%d')
]
return datetime_augment_dict
def load_config_and_setup_logging(yaml_config_file_path, log_dir):
"""Load YAML configuration file, set up logging, and validate required keys and datetime_steps."""
@ -50,15 +58,13 @@ def load_config_and_setup_logging(yaml_config_file_path, log_dir):
raise KeyError(f"Missing required keys in YAML configuration: {missing_keys}")
# Validate integration_mode
integration_mode = config_dict.get('integration_mode', 'collection') # Default to 'collection'
integration_mode = config_dict.get('integration_mode', 'N/A') # Default to 'collection'
if integration_mode not in supported_integration_modes:
raise ValueError(f"Unsupported integration_mode '{integration_mode}'. Supported modes are {supported_integration_modes}")
raise RuntimeWarning(
f"Unsupported integration_mode '{integration_mode}'. Supported modes are {supported_integration_modes}. Setting '{integration_mode}' to 'single_experiment'."
)
# Get dataset start and end dates (optional)
dataset_startdate = config_dict.get('dataset_startdate')
dataset_enddate = config_dict.get('dataset_enddate')
# Validate datetime_steps format if it exists
if 'datetime_steps' in config_dict:
datetime_steps = config_dict['datetime_steps']
@ -74,34 +80,15 @@ def load_config_and_setup_logging(yaml_config_file_path, log_dir):
config_dict['datetime_steps'][step_idx] = datetime.strptime(step, expected_format)
except ValueError:
raise ValueError(f"Invalid datetime format for '{step}'. Expected format: {expected_format}")
# Set default dates from datetime_steps if not provided
if not dataset_startdate:
dataset_startdate = min(config_dict['datetime_steps']).strftime('%Y-%m-%d') # Earliest datetime step
if not dataset_enddate:
dataset_enddate = max(config_dict['datetime_steps']).strftime('%Y-%m-%d') # Latest datetime step
# Augment datatime_steps list as a dictionary. This to speed up single-experiment file generation
config_dict['datetime_steps_dict'] = _generate_datetime_dict(datetime_steps)
else:
# If datetime_steps is not present, set the integration mode to 'collection'
logging.info("datetime_steps missing, setting integration_mode to 'collection'.")
config_dict['integration_mode'] = 'collection'
# Set sensible defaults using the current date
current_date = datetime.now().strftime('%Y-%m-%d')
if not dataset_startdate:
dataset_startdate = current_date
if not dataset_enddate:
dataset_enddate = current_date
config_dict['expected_datetime_format'] = '%Y-%m-%d %H-%M-%S'
# Add dataset dates to config for use later
config_dict['dataset_startdate'] = dataset_startdate
config_dict['dataset_enddate'] = dataset_enddate
# Validate filename_format if defined
if 'filename_format' in config_dict:
if not isinstance(config_dict['filename_format'], str):
raise ValueError(f'"Specified filename_format needs to be of String type" ')
@ -120,22 +107,50 @@ def load_config_and_setup_logging(yaml_config_file_path, log_dir):
else:
config_dict['filename_format'] = None
# Compute complementary metadata elements
# Create output filename prefix
if not config_dict['filename_format']: # default behavior
config_dict['filename_prefix'] = '_'.join([config_dict[key] for key in ['experiment', 'contact']])
else:
config_dict['filename_prefix'] = '_'.join([config_dict[key] for key in config_dict['filename_format'].split(sep=',')])
# Set default dates from datetime_steps if not provided
current_date = datetime.now().strftime('%Y-%m-%d')
dates = config_dict.get('datetime_steps',[])
if not config_dict.get('dataset_startdate'):
config_dict['dataset_startdate'] = min(config_dict['datetime_steps']).strftime('%Y-%m-%d') if dates else current_date # Earliest datetime step
if not config_dict.get('dataset_enddate'):
config_dict['dataset_enddate'] = max(config_dict['datetime_steps']).strftime('%Y-%m-%d') if dates else current_date # Latest datetime step
config_dict['expected_datetime_format'] = '%Y-%m-%d %H-%M-%S'
return config_dict
def generate_datetime_dict(datetime_steps):
""" Generate the datetime augment dictionary from datetime steps. """
datetime_augment_dict = {}
for datetime_step in datetime_steps:
#tmp = datetime.strptime(datetime_step, '%Y-%m-%d %H-%M-%S')
datetime_augment_dict[datetime_step] = [
datetime_step.strftime('%Y-%m-%d'), datetime_step.strftime('%Y_%m_%d'), datetime_step.strftime('%Y.%m.%d'), datetime_step.strftime('%Y%m%d')
]
return datetime_augment_dict
def copy_subtree_and_create_hdf5(src, dst, select_dir_keywords, select_file_keywords, allowed_file_extensions, root_metadata_dict):
def integrate_data_sources(yaml_config_file_path, log_dir='logs/'):
"""Helper function to copy directory with constraints and create HDF5."""
src = src.replace(os.sep,'/')
dst = dst.replace(os.sep,'/')
logging.info("Creating constrained copy of the experimental campaign folder %s at: %s", src, dst)
""" Integrates data sources specified by the input configuration file into HDF5 files.
path_to_files_dict = utils.copy_directory_with_contraints(src, dst, select_dir_keywords, select_file_keywords, allowed_file_extensions)
logging.info("Finished creating a copy of the experimental campaign folder tree at: %s", dst)
logging.info("Creating HDF5 file at: %s", dst)
hdf5_path = hdf5_lib.create_hdf5_file_from_filesystem_path(dst, path_to_files_dict, select_dir_keywords, root_metadata_dict)
logging.info("Completed creation of HDF5 file %s at: %s", hdf5_path, dst)
return hdf5_path
def run_pipeline(path_to_config_yamlFile, log_dir='logs/'):
"""Integrates data sources specified by the input configuration file into HDF5 files.
Parameters:
yaml_config_file_path (str): Path to the YAML configuration file.
@ -145,124 +160,73 @@ def integrate_data_sources(yaml_config_file_path, log_dir='logs/'):
list: List of Paths to the created HDF5 file(s).
"""
config_dict = load_config_and_setup_logging(yaml_config_file_path, log_dir)
config_dict = load_config_and_setup_logging(path_to_config_yamlFile, log_dir)
exp_campaign_name = config_dict['experiment']
initials = config_dict['contact']
path_to_input_dir = config_dict['input_file_directory']
path_to_output_dir = config_dict['output_file_directory']
select_dir_keywords = config_dict['instrument_datafolder']
root_metadata_dict = {
'project' : config_dict['project'],
'experiment' : config_dict['experiment'],
'contact' : config_dict['contact'],
'actris_level': config_dict['actris_level']
}
# Get dataset start and end dates.
# Define root folder metadata dictionary
root_metadata_dict = {key : config_dict[key] for key in ['project', 'experiment', 'contact', 'actris_level']}
# Get dataset start and end dates
dataset_startdate = config_dict['dataset_startdate']
dataset_enddate = config_dict['dataset_enddate']
# Handle datetime_steps and integration mode
datetime_augment_dict = {}
select_file_keywords = []
if 'datetime_steps' in config_dict and config_dict['datetime_steps']:
# We augment each datetime step with various datetime formats to compensate for filenaming inconsistencies
datetime_augment_dict = generate_datetime_dict(config_dict['datetime_steps'])
# Create flattened datetime list of datetime object file keywords
select_file_keywords = list(chain.from_iterable(datetime_augment_dict.values()))
if not config_dict['filename_format']: # if config_dict['filename_format'] == None
filename_parts = [exp_campaign_name,initials]
else:
filename_parts = [config_dict[key] for key in config_dict['filename_format'].split(',')]
# Determine mode and process accordingly
output_filename_path = []
campaign_name_template = lambda name, date, initials : f"{name}_{date}_{initials}"
campaign_name_template = lambda filename_parts, suffix : '_'.join(filename_parts+[suffix])
campaign_name_template = lambda filename_prefix, suffix: '_'.join([filename_prefix, suffix])
date_str = f'{dataset_startdate}_{dataset_enddate}'
# Create path to new rawdata subfolder and standardize it with forward-slashes as required by pipeline
path_to_rawdata_folder = os.path.join(path_to_output_dir, 'collection_' + campaign_name_template(filename_parts, date_str),"").replace(os.sep,'/')
if datetime_augment_dict:
# Create path to new raw datafolder and standardize with forward slashes
path_to_rawdata_folder = os.path.join(
path_to_output_dir, 'collection_' + campaign_name_template(config_dict['filename_prefix'], date_str), "").replace(os.sep, '/')
# Process individual datetime steps if available, regardless of mode
if config_dict.get('datetime_steps_dict', {}):
# Single experiment mode
for datetime_step in datetime_augment_dict.keys():
for datetime_step, file_keywords in config_dict['datetime_steps_dict'].items():
date_str = datetime_step.strftime('%Y-%m-%d')
select_file_keywords = datetime_augment_dict[datetime_step]
single_campaign_name = campaign_name_template(config_dict['filename_prefix'], date_str)
path_to_rawdata_subfolder = os.path.join(path_to_rawdata_folder, single_campaign_name, "")
single_campaign_name = campaign_name_template(filename_parts, date_str) #campaign_name_template(exp_campaign_name, initials, date_str)
# Create path to new rawdata subfolder and standardize it with forward slashes as required by pipeline
path_to_rawdata_subfolder = os.path.join(path_to_rawdata_folder, single_campaign_name,"").replace(os.sep,'/')
# Run two step data integration pipeline on specified experimental campaign data collection
path_to_integrated_stepwise_hdf5_file = dima_pipeline(path_to_input_dir,
path_to_rawdata_subfolder ,
select_dir_keywords,
select_file_keywords,
root_metadata_dict)
path_to_integrated_stepwise_hdf5_file = copy_subtree_and_create_hdf5(
path_to_input_dir, path_to_rawdata_subfolder, select_dir_keywords,
file_keywords, allowed_file_extensions, root_metadata_dict)
output_filename_path.append(path_to_integrated_stepwise_hdf5_file)
# Collection mode processing if specified
if 'collection' in config_dict.get('integration_mode', 'single_experiment'):
#date_str = f'{dataset_startdate}_{dataset_enddate}'
path_to_filenames_dict = {}
path_to_filenames_dict[path_to_rawdata_folder] = []
for path in output_filename_path:
path_to_file, filename = os.path.split(path) # path_to_file should be same as path_to_rawdata_folder
path_to_filenames_dict[path_to_rawdata_folder].append(filename)
path_to_filenames_dict = {path_to_rawdata_folder: [os.path.basename(path) for path in output_filename_path]} if output_filename_path else {}
hdf5_path = hdf5_lib.create_hdf5_file_from_filesystem_path(path_to_rawdata_folder, path_to_filenames_dict, [], root_metadata_dict)
output_filename_path.append(hdf5_path)
else:
path_to_integrated_stepwise_hdf5_file = copy_subtree_and_create_hdf5(
path_to_input_dir, path_to_rawdata_folder, select_dir_keywords, [],
allowed_file_extensions, root_metadata_dict)
output_filename_path.append(path_to_integrated_stepwise_hdf5_file)
path_to_integrated_hdf5_file = hdf5_lib.create_hdf5_file_from_filesystem_path(path_to_rawdata_folder,
path_to_filenames_dict,
[],
root_metadata_dict)
output_filename_path.append(path_to_integrated_hdf5_file)
else: # collection
path_to_integrated_hdf5_file = dima_pipeline(path_to_input_dir,
path_to_rawdata_folder,
select_dir_keywords,
select_file_keywords,
root_metadata_dict)
output_filename_path.append(path_to_integrated_hdf5_file)
return output_filename_path
def dima_pipeline(path_to_input_dir, path_to_rawdata_folder, select_dir_keywords, select_file_keywords, root_metadata_dict = {}):
# Possibly replace below line with os.path.join(path_to_rawdata_folder,'..')
path_to_output_dir = os.path.join(path_to_rawdata_folder,'..')
path_to_output_dir = os.path.normpath(path_to_output_dir)
logging.info("Creating copy of the experimental campaign folder at: %s", path_to_output_dir)
# Step 1: Search through input directory and make a copy in the output directory that complies with naming constraints
path_to_input_dir_filenames_dict = utils.copy_directory_with_contraints(path_to_input_dir,
path_to_rawdata_folder,
select_dir_keywords,
select_file_keywords,
allowed_file_extensions)
logging.info("Finished creating a copy of the experimental campaign folder at: %s", path_to_output_dir)
if __name__ == "__main__":
# Step 2: Create HDF5 file
logging.info("Creating HDF5 file at: %s", path_to_output_dir)
if len(sys.argv) < 2:
print("Usage: python data_integration.py <function_name> <function_args>")
sys.exit(1)
# Extract the function name from the command line arguments
function_name = sys.argv[1]
# Handle function execution based on the provided function name
if function_name == 'run':
if len(sys.argv) != 2:
print("Usage: python data_integration.py run <path_to_config_yamlFile>")
sys.exit(1)
# Extract path to configuration file, specifying the data integration task
path_to_config_yamlFile = sys.argv[2]
run_pipeline(path_to_config_yamlFile)
path_to_integrated_hdf5_file = hdf5_lib.create_hdf5_file_from_filesystem_path(path_to_rawdata_folder,
path_to_input_dir_filenames_dict,
select_dir_keywords,
root_metadata_dict)
logging.info("Completed creation of HDF5 file %s for the specified experimental campaign at: %s",
path_to_integrated_hdf5_file, path_to_output_dir)
return path_to_integrated_hdf5_file