# File: dima/pipelines/data_integration.py

import sys
import os
import re

# Locate this file so that a directory above it can be added to the import path.
try:
    thisFilePath = os.path.abspath(__file__)
except NameError:
    # __file__ is not defined in this execution context; fall back to the working directory.
    print("Error: __file__ is not available. Ensure the script is being run from a file.")
    print("[Notice] Path to DIMA package may not be resolved properly.")
    # NOTE(review): two levels above the working directory is not necessarily the
    # project root -- confirm this fallback is acceptable.
    thisFilePath = os.getcwd() # Use current directory or specify a default

# Strip the last two path components (the file name and its parent folder).
dimaPath = os.path.normpath(os.path.join(thisFilePath, "..",'..')) # Move up to project root

if dimaPath not in sys.path: # Avoid duplicate entries
    sys.path.append(dimaPath)

import yaml
import logging
from datetime import datetime
# Importing chain class from itertools
from itertools import chain

# Import DIMA modules
try:
    # Preferred form: absolute imports through the `dima` package.
    from dima.src import hdf5_writer as hdf5_lib
    from dima.utils import g5505_utils as utils
    from dima.instruments import filereader_registry
except ModuleNotFoundError:
    # Fallback form: import the same modules relative to dimaPath, which was
    # appended to sys.path above.
    print(':)')
    import src.hdf5_writer as hdf5_lib
    import utils.g5505_utils as utils
    from instruments import filereader_registry

# File extensions exposed by the file reader registry; run_pipeline passes them
# to the directory copy step.
allowed_file_extensions = filereader_registry.file_extensions
def _generate_datetime_dict(datetime_steps):
    """Map each datetime step to its calendar date rendered in four layouts.

    For every step the value is a list holding the date formatted as
    ``YYYY-MM-DD``, ``YYYY_MM_DD``, ``YYYY.MM.DD`` and ``YYYYMMDD``, in that
    order.

    Args:
        datetime_steps: Iterable of objects providing ``strftime`` (used as
            the dictionary keys, so they must be hashable).

    Returns:
        dict: ``{step: [four date strings]}``; empty when no steps are given.
    """
    date_layouts = ('%Y-%m-%d', '%Y_%m_%d', '%Y.%m.%d', '%Y%m%d')
    return {
        step: [step.strftime(layout) for layout in date_layouts]
        for step in datetime_steps
    }
def _generate_output_path_fragment(filename_prefix, integration_mode, dataset_startdate, dataset_enddate, index=None):
    """Build the name fragment shared by output directories and files.

    Args:
        filename_prefix: Prefix placed in front of the end date.
        integration_mode: ``'collection'`` selects the collection layout; any
            other value selects the plain layout.
        dataset_startdate: Accepted for interface symmetry; not used in the
            generated fragment.
        dataset_enddate: Date text appended at the end of the fragment.
        index: Collection index, only interpolated in collection mode.

    Returns:
        str: ``collection_<index>_<prefix>_<enddate>`` in collection mode,
        otherwise ``<prefix>_<enddate>``.
    """
    if integration_mode != 'collection':
        return f'{filename_prefix}_{dataset_enddate}'
    return f'collection_{index}_{filename_prefix}_{dataset_enddate}'
def _expand_env_placeholders(path_template):
    """Replace each ``${VAR_NAME}`` placeholder in *path_template* with its environment value.

    Raises:
        ValueError: If a referenced environment variable is not set.
    """
    expanded = path_template
    for var in re.findall(r'\$\{([^}^{]+)\}', path_template):
        env_value = os.environ.get(var)
        if env_value is None:
            raise ValueError(f"Environment variable '{var}' is not set but used in the config.")
        expanded = expanded.replace(f"${{{var}}}", env_value)
    return expanded


def _parse_datetime_steps(datetime_steps, expected_format):
    """Validate the ``datetime_steps`` entry and return it as a list of ``datetime`` objects.

    A falsy value (``None``, empty list) is accepted and yields an empty list.

    Raises:
        TypeError: If a non-empty value is not a list.
        ValueError: If an item cannot be parsed with *expected_format*.
    """
    if not datetime_steps:
        return []
    if not isinstance(datetime_steps, list):
        raise TypeError(f"datetime_steps should be a list of strings or a falsy value (None, empty), but got {type(datetime_steps)}")

    parsed_steps = []
    for step in datetime_steps:
        try:
            parsed_steps.append(datetime.strptime(step, expected_format))
        except (TypeError, ValueError) as exc:
            # TypeError covers non-string items (e.g. values YAML already converted).
            raise ValueError(f"Invalid datetime format for '{step}'. Expected format: {expected_format}") from exc
    return parsed_steps


def _normalize_filename_format(config_dict):
    """Return the cleaned comma-separated ``filename_format``, or ``None`` if absent or unusable.

    Raises:
        ValueError: If ``filename_format`` is present but not a string.
    """
    if 'filename_format' not in config_dict:
        return None

    filename_format = config_dict['filename_format']
    if not isinstance(filename_format, str):
        raise ValueError('"Specified filename_format needs to be of String type" ')

    # Every comma-separated entry must name an existing key of the configuration.
    keys = [key.strip() for key in filename_format.split(',')]
    if any(key not in config_dict for key in keys):
        print('"filename_format" should contain comma-separated keys that match existing keys in the YAML config file.')
        print('Setting "filename_format" as None')
        return None
    return ','.join(keys)


def load_config_and_setup_logging(yaml_config_file_path, log_dir):
    """Load the YAML configuration, set up logging, and validate/complete the configuration.

    Besides validating the required keys, this expands ``${VAR}`` placeholders
    in ``input_file_directory`` and adds or normalizes the following entries:
    ``integration_mode``, ``datetime_steps`` (parsed to ``datetime``),
    ``datetime_steps_dict``, ``filename_format``, ``filename_prefix``,
    ``dataset_startdate``, ``dataset_enddate`` and ``expected_datetime_format``.

    Args:
        yaml_config_file_path: Path to the YAML configuration file.
        log_dir: Directory handed to ``utils.setup_logging``.

    Returns:
        dict: The validated and augmented configuration.

    Raises:
        ValueError: On YAML syntax errors, a non-mapping document, an unset
            environment variable, an invalid ``instrument_datafolder``, an
            invalid datetime step, or a non-string ``filename_format``.
        KeyError: If required keys are missing.
        TypeError: If ``datetime_steps`` is non-empty but not a list.
    """
    # Function-scope import keeps this block self-contained.
    import warnings

    required_keys = [
        'experiment', 'contact', 'input_file_directory', 'output_file_directory',
        'instrument_datafolder', 'project', 'actris_level'
    ]
    supported_integration_modes = ['collection', 'single_experiment']
    expected_datetime_format = '%Y-%m-%d %H-%M-%S'

    # Set up logging
    date = utils.created_at("%Y_%m").replace(":", "-")
    utils.setup_logging(log_dir, f"integrate_data_sources_{date}.log")

    # Load YAML configuration file
    with open(yaml_config_file_path, 'r') as stream:
        try:
            # NOTE(security): FullLoader resolves more than plain data types. If
            # configuration files can come from untrusted sources, consider
            # switching to yaml.safe_load.
            config_dict = yaml.load(stream, Loader=yaml.FullLoader)
        except yaml.YAMLError as exc:
            logging.error("Error loading YAML file: %s", exc)
            raise ValueError(f"Failed to load YAML file: {exc}") from exc

    # An empty file loads as None; fail with a clear message instead of a TypeError below.
    if not isinstance(config_dict, dict):
        raise ValueError(
            f"YAML configuration must be a mapping of keys to values, but got {type(config_dict)}"
        )

    # Check if required keys are present
    missing_keys = [key for key in required_keys if key not in config_dict]
    if missing_keys:
        raise KeyError(f"Missing required keys in YAML configuration: {missing_keys}")

    # Resolve placeholders like ${VAR_NAME} in the input directory
    success = utils.load_env_from_root()
    print(f'Success : {success}')
    config_dict['input_file_directory'] = _expand_env_placeholders(config_dict['input_file_directory'])

    # instrument_datafolder must be a list holding at least one subfolder name.
    instrument_datafolder = config_dict['instrument_datafolder']
    if not isinstance(instrument_datafolder, list) or not instrument_datafolder:
        raise ValueError('Invalid value for key "instrument_datafolder". Expected a list of strings with at least one item. '
                         'Each item represents a subfolder name in the input file directory, where the name '
                         'must match the format "<subfolder>[/<subfolder>]". '
                         'The first subfolder name is required, and the second is optional. '
                         'Examples of valid values: "level1", "level1/level2".')

    # Validate integration_mode; a missing or unsupported value falls back to
    # 'single_experiment', as the warning text announces.
    integration_mode = config_dict.get('integration_mode', 'N/A')
    if integration_mode not in supported_integration_modes:
        message = (
            f"Unsupported integration_mode '{integration_mode}'. Supported modes are "
            f"{supported_integration_modes}. Setting integration_mode to 'single_experiment'."
        )
        logging.warning(message)
        warnings.warn(message, RuntimeWarning, stacklevel=2)
        config_dict['integration_mode'] = 'single_experiment'

    # Validate datetime_steps format if it exists
    if 'datetime_steps' in config_dict:
        config_dict['datetime_steps'] = _parse_datetime_steps(
            config_dict['datetime_steps'], expected_datetime_format
        )
        # Augment datetime_steps as a dictionary. This speeds up single-experiment file generation
        config_dict['datetime_steps_dict'] = _generate_datetime_dict(config_dict['datetime_steps'])
    else:
        # If datetime_steps is not present, set the integration mode to 'collection'
        logging.info("datetime_steps missing, setting integration_mode to 'collection'.")
        config_dict['integration_mode'] = 'collection'

    # Validate filename_format if defined
    config_dict['filename_format'] = _normalize_filename_format(config_dict)

    # Create output filename prefix; defaults to "<experiment>_<contact>".
    if config_dict['filename_format']:
        prefix_keys = config_dict['filename_format'].split(sep=',')
    else:
        prefix_keys = ['experiment', 'contact']
    # str() keeps non-string YAML values (numbers, dates) from breaking the join.
    config_dict['filename_prefix'] = '_'.join(str(config_dict[key]) for key in prefix_keys)

    # Set default dates from datetime_steps if not provided
    current_date = datetime.now().strftime('%Y-%m-%d')
    dates = config_dict.get('datetime_steps') or []
    if not config_dict.get('dataset_startdate'):
        # Earliest datetime step
        config_dict['dataset_startdate'] = min(dates).strftime('%Y-%m-%d') if dates else current_date
    if not config_dict.get('dataset_enddate'):
        # Latest datetime step
        config_dict['dataset_enddate'] = max(dates).strftime('%Y-%m-%d') if dates else current_date

    config_dict['expected_datetime_format'] = expected_datetime_format
    return config_dict
def copy_subtree_and_create_hdf5(src, dst, select_dir_keywords, select_file_keywords, allowed_file_extensions, root_metadata_dict):
    """Copy a constrained subset of *src* into *dst*, then build an HDF5 file from the copy.

    Args:
        src: Source directory path.
        dst: Destination directory path for the constrained copy.
        select_dir_keywords: Directory keywords forwarded to the copy step and
            to the HDF5 writer.
        select_file_keywords: File keywords forwarded to the copy step.
        allowed_file_extensions: File extensions forwarded to the copy step.
        root_metadata_dict: Metadata forwarded to the HDF5 writer.

    Returns:
        The value returned by ``hdf5_lib.create_hdf5_file_from_filesystem_path``
        (logged as the HDF5 file path).
    """
    # Both paths are handled with forward slashes, independent of the platform separator.
    src, dst = src.replace(os.sep, '/'), dst.replace(os.sep, '/')

    logging.info("Creating constrained copy of the experimental campaign folder %s at: %s", src, dst)
    copied_paths = utils.copy_directory_with_contraints(
        src, dst, select_dir_keywords, select_file_keywords, allowed_file_extensions
    )
    logging.info("Finished creating a copy of the experimental campaign folder tree at: %s", dst)

    logging.info("Creating HDF5 file at: %s", dst)
    #hdf5_path = hdf5_lib.create_hdf5_file_from_filesystem_path_new(dst, path_to_files_dict, select_dir_keywords, root_metadata_dict)
    hdf5_path = hdf5_lib.create_hdf5_file_from_filesystem_path(
        dst, copied_paths, select_dir_keywords, root_metadata_dict
    )
    logging.info("Completed creation of HDF5 file %s at: %s", hdf5_path, dst)

    return hdf5_path
def run_pipeline(path_to_config_yamlFile, log_dir='logs/'):
    """Run the data integration pipeline described by a YAML configuration file.

    When the configuration provides datetime steps, one constrained copy and
    one HDF5 file are created per step inside the top-level output folder; in
    ``'collection'`` mode an additional HDF5 file is then created for the
    top-level folder. Without datetime steps, the input directory is copied
    once into the top-level folder and a single HDF5 file is created.

    Args:
        path_to_config_yamlFile: Path to the YAML configuration file.
        log_dir: Directory used for log files (default ``'logs/'``).

    Returns:
        list: The HDF5 paths returned by the writer, in creation order.
    """
    config_dict = load_config_and_setup_logging(path_to_config_yamlFile, log_dir)

    path_to_input_dir = config_dict['input_file_directory']
    path_to_output_dir = config_dict['output_file_directory']
    select_dir_keywords = config_dict['instrument_datafolder']

    # Define root folder metadata dictionary
    root_metadata_dict = {key: config_dict[key] for key in ['project', 'experiment', 'contact', 'actris_level']}

    # Get dataset start and end dates
    dataset_startdate = config_dict['dataset_startdate']
    dataset_enddate = config_dict['dataset_enddate']

    integration_mode = config_dict.get('integration_mode', 'single_experiment')
    filename_prefix = config_dict['filename_prefix']

    output_filename_path = []

    # Determine top-level campaign folder path (trailing separator kept via the empty last component)
    top_level_foldername = _generate_output_path_fragment(
        filename_prefix, integration_mode, dataset_startdate, dataset_enddate, index=1
    )
    path_to_rawdata_folder = os.path.join(
        path_to_output_dir, top_level_foldername, ""
    ).replace(os.sep, '/')

    # Process individual datetime steps if available, regardless of mode
    if config_dict.get('datetime_steps_dict', {}):
        for datetime_step, file_keywords in config_dict['datetime_steps_dict'].items():
            # One subfolder per step, named after the step's date.
            single_date_str = datetime_step.strftime('%Y%m%d')
            subfolder_name = f"experimental_step_{single_date_str}"
            path_to_rawdata_subfolder = os.path.join(path_to_rawdata_folder, subfolder_name, "")

            path_to_integrated_stepwise_hdf5_file = copy_subtree_and_create_hdf5(
                path_to_input_dir, path_to_rawdata_subfolder, select_dir_keywords,
                file_keywords, allowed_file_extensions, root_metadata_dict)

            output_filename_path.append(path_to_integrated_stepwise_hdf5_file)

        # Collection mode post-processing: build one HDF5 file for the
        # top-level folder from the per-step files created above.
        if integration_mode == 'collection':
            path_to_filenames_dict = (
                {path_to_rawdata_folder: [os.path.basename(path) for path in output_filename_path]}
                if output_filename_path else {}
            )
            hdf5_path = hdf5_lib.create_hdf5_file_from_filesystem_path(
                path_to_rawdata_folder, path_to_filenames_dict, [], root_metadata_dict
            )
            output_filename_path.append(hdf5_path)
    else:
        # NOTE(review): this fallback is taken to belong to the datetime-steps
        # check above (no steps -> copy the whole input tree once) -- confirm.
        path_to_integrated_stepwise_hdf5_file = copy_subtree_and_create_hdf5(
            path_to_input_dir, path_to_rawdata_folder, select_dir_keywords, [],
            allowed_file_extensions, root_metadata_dict)
        output_filename_path.append(path_to_integrated_stepwise_hdf5_file)

    return output_filename_path
# Command-line entry point: `python data_integration.py run <path_to_config_yamlFile>`.
if __name__ == "__main__":
    if len(sys.argv) < 2:
        print("Usage: python data_integration.py <function_name> <function_args>")
        sys.exit(1)

    function_name = sys.argv[1]

    if function_name == 'run':
        if len(sys.argv) != 3:
            print("Usage: python data_integration.py run <path_to_config_yamlFile>")
            sys.exit(1)
        path_to_config_yamlFile = sys.argv[2]
        run_pipeline(path_to_config_yamlFile)
    else:
        # Report unsupported commands instead of exiting silently with status 0.
        print(f"Unknown function name '{function_name}'.")
        print("Usage: python data_integration.py run <path_to_config_yamlFile>")
        sys.exit(1)