Synch with remote repo
@@ -1,265 +1,265 @@
import sys
import os
import re

try:
    thisFilePath = os.path.abspath(__file__)
except NameError:
    print("Error: __file__ is not available. Ensure the script is being run from a file.")
    print("[Notice] Path to DIMA package may not be resolved properly.")
    thisFilePath = os.getcwd()  # Use current directory or specify a default

dimaPath = os.path.normpath(os.path.join(thisFilePath, "..", '..'))  # Move up to project root

if dimaPath not in sys.path:  # Avoid duplicate entries
    sys.path.append(dimaPath)


import yaml
import logging
from datetime import datetime
# Importing chain class from itertools
from itertools import chain

# Import DIMA modules
import src.hdf5_writer as hdf5_lib
import utils.g5505_utils as utils
from instruments.readers import filereader_registry

allowed_file_extensions = filereader_registry.file_extensions


def _generate_datetime_dict(datetime_steps):
    """Generate the datetime augment dictionary from datetime steps."""
    datetime_augment_dict = {}
    for datetime_step in datetime_steps:
        #tmp = datetime.strptime(datetime_step, '%Y-%m-%d %H-%M-%S')
        datetime_augment_dict[datetime_step] = [
            datetime_step.strftime('%Y-%m-%d'), datetime_step.strftime('%Y_%m_%d'),
            datetime_step.strftime('%Y.%m.%d'), datetime_step.strftime('%Y%m%d')
        ]
    return datetime_augment_dict
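
# Illustrative only (hypothetical date): for a single parsed step, the helper
# above maps the datetime object to the common date spellings used to match
# file and folder names, e.g.
#
#   _generate_datetime_dict([datetime(2022, 1, 31, 12, 0)])
#   # -> {datetime(2022, 1, 31, 12, 0): ['2022-01-31', '2022_01_31', '2022.01.31', '20220131']}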


def load_config_and_setup_logging(yaml_config_file_path, log_dir):
    """Load YAML configuration file, set up logging, and validate required keys and datetime_steps."""

    # Define required keys
    required_keys = [
        'experiment', 'contact', 'input_file_directory', 'output_file_directory',
        'instrument_datafolder', 'project', 'actris_level'
    ]

    # Supported integration modes
    supported_integration_modes = ['collection', 'single_experiment']

    # Set up logging
    date = utils.created_at("%Y_%m").replace(":", "-")
    utils.setup_logging(log_dir, f"integrate_data_sources_{date}.log")

    # Load YAML configuration file
    with open(yaml_config_file_path, 'r') as stream:
        try:
            config_dict = yaml.load(stream, Loader=yaml.FullLoader)
        except yaml.YAMLError as exc:
            logging.error("Error loading YAML file: %s", exc)
            raise ValueError(f"Failed to load YAML file: {exc}")

    # Check if required keys are present
    missing_keys = [key for key in required_keys if key not in config_dict]
    if missing_keys:
        raise KeyError(f"Missing required keys in YAML configuration: {missing_keys}")

    # Check that instrument_datafolder is a list with at least one item.
    if not (isinstance(config_dict['instrument_datafolder'], list) and len(config_dict['instrument_datafolder']) >= 1):
        raise ValueError('Invalid value for key "instrument_datafolder". Expected a list of strings with at least one item. '
                         'Each item represents a subfolder name in the input file directory, where the name '
                         'must match the format "<subfolder>[/<subfolder>]". '
                         'The first subfolder name is required, and the second is optional. '
                         'Examples of valid values: "level1", "level1/level2".')

    # Define the pattern for valid subfolder names: `subfolder` or `subfolder/subfolder`
    #valid_pattern = re.compile(r'^[^/]+(/[^/]+)?$')

    # Validate each subfolder name
    #for folder in config_dict['instrument_folder']:
    #    if not isinstance(folder, str) or not valid_pattern.match(folder):
    #        raise ValueError(
    #            'Invalid value for key "instrument_folder" in YAML file. '
    #            'Each item must be a string matching the format '
    #            '"<subfolder>[/<subfolder>]". The first subfolder name is required, and the second is optional. '
    #            'Examples of valid values: "level1", "level1/level2". '
    #            f'Invalid item: {folder}'
    #        )

    # Validate integration_mode ('N/A' flags a missing or unspecified value)
    integration_mode = config_dict.get('integration_mode', 'N/A')
    if integration_mode not in supported_integration_modes:
        raise RuntimeWarning(
            f"Unsupported integration_mode '{integration_mode}'. Supported modes are {supported_integration_modes}. Setting '{integration_mode}' to 'single_experiment'."
        )

    # Validate datetime_steps format if it exists
    if 'datetime_steps' in config_dict:
        datetime_steps = config_dict['datetime_steps']
        expected_format = '%Y-%m-%d %H-%M-%S'

        # Check if datetime_steps is a list or a falsy value
        if datetime_steps and not isinstance(datetime_steps, list):
            raise TypeError(f"datetime_steps should be a list of strings or a falsy value (None, empty), but got {type(datetime_steps)}")

        for step_idx, step in enumerate(datetime_steps):
            try:
                # Attempt to parse the datetime to ensure correct format
                config_dict['datetime_steps'][step_idx] = datetime.strptime(step, expected_format)
            except ValueError:
                raise ValueError(f"Invalid datetime format for '{step}'. Expected format: {expected_format}")

        # Augment datetime_steps into a dictionary. This is to speed up single-experiment file generation.
        config_dict['datetime_steps_dict'] = _generate_datetime_dict(datetime_steps)
    else:
        # If datetime_steps is not present, set the integration mode to 'collection'
        logging.info("datetime_steps missing, setting integration_mode to 'collection'.")
        config_dict['integration_mode'] = 'collection'

    # Validate filename_format if defined
    if 'filename_format' in config_dict:
        if not isinstance(config_dict['filename_format'], str):
            raise ValueError('Specified filename_format must be a string.')

        # Split the string and check if each key exists in config_dict
        keys = [key.strip() for key in config_dict['filename_format'].split(',')]
        missing_keys = [key for key in keys if key not in config_dict]

        # If any key is missing, fall back to the default prefix below
        # assert not missing_keys, f'Missing key(s) in config_dict: {", ".join(missing_keys)}'
        if not missing_keys:
            config_dict['filename_format'] = ','.join(keys)
        else:
            config_dict['filename_format'] = None
            print('"filename_format" should contain comma-separated keys that match existing keys in the YAML config file.')
            print('Setting "filename_format" as None')
    else:
        config_dict['filename_format'] = None

    # Compute complementary metadata elements

    # Create output filename prefix
    if not config_dict['filename_format']:  # default behavior
        config_dict['filename_prefix'] = '_'.join([config_dict[key] for key in ['experiment', 'contact']])
    else:
        config_dict['filename_prefix'] = '_'.join([config_dict[key] for key in config_dict['filename_format'].split(sep=',')])

    # Set default dates from datetime_steps if not provided
    current_date = datetime.now().strftime('%Y-%m-%d')
    dates = config_dict.get('datetime_steps', [])
    if not config_dict.get('dataset_startdate'):
        config_dict['dataset_startdate'] = min(config_dict['datetime_steps']).strftime('%Y-%m-%d') if dates else current_date  # Earliest datetime step

    if not config_dict.get('dataset_enddate'):
        config_dict['dataset_enddate'] = max(config_dict['datetime_steps']).strftime('%Y-%m-%d') if dates else current_date  # Latest datetime step

    config_dict['expected_datetime_format'] = '%Y-%m-%d %H-%M-%S'

    return config_dict
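
# Illustrative example (not part of the original module): a minimal YAML
# configuration that satisfies the keys validated by
# load_config_and_setup_logging(). All values below are hypothetical
# placeholders; only the key names, the supported integration modes, and the
# 'YYYY-MM-DD HH-MM-SS' datetime format come from the checks above.
#
#   experiment: smog_chamber_study          # hypothetical
#   contact: jdoe                           # hypothetical
#   project: example_project                # hypothetical
#   actris_level: 0
#   input_file_directory: ../input_data/campaign_2022
#   output_file_directory: ../output_files
#   instrument_datafolder:
#     - level1
#     - level1/level2
#   integration_mode: single_experiment
#   datetime_steps:
#     - '2022-01-31 12-00-00'
#   filename_format: experiment, contact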


def copy_subtree_and_create_hdf5(src, dst, select_dir_keywords, select_file_keywords, allowed_file_extensions, root_metadata_dict):
    """Helper function to copy directory with constraints and create HDF5."""
    src = src.replace(os.sep, '/')
    dst = dst.replace(os.sep, '/')

    logging.info("Creating constrained copy of the experimental campaign folder %s at: %s", src, dst)

    path_to_files_dict = utils.copy_directory_with_contraints(src, dst, select_dir_keywords, select_file_keywords, allowed_file_extensions)
    logging.info("Finished creating a copy of the experimental campaign folder tree at: %s", dst)

    logging.info("Creating HDF5 file at: %s", dst)
    hdf5_path = hdf5_lib.create_hdf5_file_from_filesystem_path(dst, path_to_files_dict, select_dir_keywords, root_metadata_dict)
    logging.info("Completed creation of HDF5 file %s at: %s", hdf5_path, dst)

    return hdf5_path


def run_pipeline(path_to_config_yamlFile, log_dir='logs/'):
    """Integrates data sources specified by the input configuration file into HDF5 files.

    Parameters:
        path_to_config_yamlFile (str): Path to the YAML configuration file.
        log_dir (str): Directory to save the log file.

    Returns:
        list: List of paths to the created HDF5 file(s).
    """
    config_dict = load_config_and_setup_logging(path_to_config_yamlFile, log_dir)

    path_to_input_dir = config_dict['input_file_directory']
    path_to_output_dir = config_dict['output_file_directory']
    select_dir_keywords = config_dict['instrument_datafolder']

    # Define root folder metadata dictionary
    root_metadata_dict = {key: config_dict[key] for key in ['project', 'experiment', 'contact', 'actris_level']}

    # Get dataset start and end dates
    dataset_startdate = config_dict['dataset_startdate']
    dataset_enddate = config_dict['dataset_enddate']

    # Determine mode and process accordingly
    output_filename_path = []
    campaign_name_template = lambda filename_prefix, suffix: '_'.join([filename_prefix, suffix])
    date_str = f'{dataset_startdate}_{dataset_enddate}'

    # Create path to new raw datafolder and standardize with forward slashes
    path_to_rawdata_folder = os.path.join(
        path_to_output_dir, 'collection_' + campaign_name_template(config_dict['filename_prefix'], date_str), "").replace(os.sep, '/')

    # Process individual datetime steps if available, regardless of mode
    if config_dict.get('datetime_steps_dict', {}):
        # Single experiment mode
        for datetime_step, file_keywords in config_dict['datetime_steps_dict'].items():
            date_str = datetime_step.strftime('%Y-%m-%d')
            single_campaign_name = campaign_name_template(config_dict['filename_prefix'], date_str)
            path_to_rawdata_subfolder = os.path.join(path_to_rawdata_folder, single_campaign_name, "")

            path_to_integrated_stepwise_hdf5_file = copy_subtree_and_create_hdf5(
                path_to_input_dir, path_to_rawdata_subfolder, select_dir_keywords,
                file_keywords, allowed_file_extensions, root_metadata_dict)

            output_filename_path.append(path_to_integrated_stepwise_hdf5_file)

        # Collection mode processing if specified
        if 'collection' in config_dict.get('integration_mode', 'single_experiment'):
            path_to_filenames_dict = {path_to_rawdata_folder: [os.path.basename(path) for path in output_filename_path]} if output_filename_path else {}
            hdf5_path = hdf5_lib.create_hdf5_file_from_filesystem_path(path_to_rawdata_folder, path_to_filenames_dict, [], root_metadata_dict)
            output_filename_path.append(hdf5_path)
    else:
        path_to_integrated_stepwise_hdf5_file = copy_subtree_and_create_hdf5(
            path_to_input_dir, path_to_rawdata_folder, select_dir_keywords, [],
            allowed_file_extensions, root_metadata_dict)
        output_filename_path.append(path_to_integrated_stepwise_hdf5_file)

    return output_filename_path
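
# Usage sketch (hypothetical config path): besides the CLI entry point below,
# the pipeline can be called programmatically; it returns one HDF5 path per
# datetime step and, in 'collection' mode, a final collection-level file.
#
#   hdf5_files = run_pipeline('input/data_integration_config.yaml')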


if __name__ == "__main__":

    if len(sys.argv) < 2:
        print("Usage: python data_integration.py <function_name> <function_args>")
        sys.exit(1)

    # Extract the function name from the command line arguments
    function_name = sys.argv[1]

    # Handle function execution based on the provided function name
    if function_name == 'run':

        if len(sys.argv) != 3:
            print("Usage: python data_integration.py run <path_to_config_yamlFile>")
            sys.exit(1)

        # Extract path to configuration file, specifying the data integration task
        path_to_config_yamlFile = sys.argv[2]
        run_pipeline(path_to_config_yamlFile)

@@ -1,179 +1,179 @@
import sys
import os

try:
    thisFilePath = os.path.abspath(__file__)
except NameError:
    print("Error: __file__ is not available. Ensure the script is being run from a file.")
    print("[Notice] Path to DIMA package may not be resolved properly.")
    thisFilePath = os.getcwd()  # Use current directory or specify a default

dimaPath = os.path.normpath(os.path.join(thisFilePath, "..", '..'))  # Move up to project root

if dimaPath not in sys.path:  # Avoid duplicate entries
    sys.path.append(dimaPath)

import h5py
import yaml
import src.hdf5_ops as hdf5_ops


def load_yaml(review_yaml_file):
    with open(review_yaml_file, 'r') as stream:
        try:
            return yaml.load(stream, Loader=yaml.FullLoader)
        except yaml.YAMLError as exc:
            print(exc)
            return None


def validate_yaml_dict(input_hdf5_file, yaml_dict):
    errors = []
    notes = []

    with h5py.File(input_hdf5_file, 'r') as hdf5_file:
        # 1. Check for valid object names
        for key in yaml_dict:
            if key not in hdf5_file:
                error_msg = f"Error: {key} is not a valid object's name in the HDF5 file."
                print(error_msg)
                errors.append(error_msg)

        # 2. Confirm metadata dict for each object is a dictionary
        for key, meta_dict in yaml_dict.items():
            if not isinstance(meta_dict, dict):
                error_msg = f"Error: Metadata for {key} should be a dictionary."
                print(error_msg)
                errors.append(error_msg)
            else:
                if 'attributes' not in meta_dict:
                    warning_msg = f"Warning: No 'attributes' in metadata dict for {key}."
                    print(warning_msg)
                    notes.append(warning_msg)

        # 3. Verify update, append, and delete operations are well specified
        for key, meta_dict in yaml_dict.items():
            attributes = meta_dict.get("attributes", {})

            for attr_name, attr_value in attributes.items():
                # Ensure the object exists before accessing attributes
                if key in hdf5_file:
                    hdf5_obj_attrs = hdf5_file[key].attrs  # Access object-specific attributes

                    if attr_name in hdf5_obj_attrs:
                        # Attribute exists: it can be updated or deleted
                        if isinstance(attr_value, dict) and "delete" in attr_value:
                            note_msg = f"Note: '{attr_name}' in {key} may be deleted if 'delete' is set as true."
                            print(note_msg)
                            notes.append(note_msg)
                        else:
                            note_msg = f"Note: '{attr_name}' in {key} will be updated."
                            print(note_msg)
                            notes.append(note_msg)
                    else:
                        # Attribute does not exist: it can be appended or flagged as an invalid delete
                        if isinstance(attr_value, dict) and "delete" in attr_value:
                            error_msg = f"Error: Cannot delete non-existent attribute '{attr_name}' in {key}."
                            print(error_msg)
                            errors.append(error_msg)
                        else:
                            note_msg = f"Note: '{attr_name}' in {key} will be appended."
                            print(note_msg)
                            notes.append(note_msg)
                else:
                    error_msg = f"Error: '{key}' is not a valid object in the HDF5 file."
                    print(error_msg)
                    errors.append(error_msg)

    return len(errors) == 0, errors, notes
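
# Usage sketch (hypothetical file names): validate a review file before applying it.
#
#   is_valid, errors, notes = validate_yaml_dict('collection.h5', load_yaml('review.yaml'))
#   if not is_valid:
#       print(errors)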


def update_hdf5_file_with_review(input_hdf5_file, review_yaml_file):
    """
    Updates, appends, or deletes metadata attributes in an HDF5 file based on a provided YAML review file.

    Parameters:
    -----------
    input_hdf5_file : str
        Path to the HDF5 file.

    review_yaml_file : str
        Path to a YAML file specifying objects and their attributes with operations. Example format:
        {
            "object_name": { "attributes": { "attr_name": { "value": attr_value,
                                                            "delete": true | false
                                                          }
                                           }
                           }
        }
    """
    yaml_dict = load_yaml(review_yaml_file)

    success, errors, notes = validate_yaml_dict(input_hdf5_file, yaml_dict)
    if not success:
        raise ValueError(f"Review yaml file {review_yaml_file} is invalid. Validation errors: {errors}")

    # Initialize HDF5 operations manager
    DataOpsAPI = hdf5_ops.HDF5DataOpsManager(input_hdf5_file)
    DataOpsAPI.load_file_obj()

    # Iterate over each object in the YAML dictionary
    for obj_name, attr_dict in yaml_dict.items():
        # Prepare dictionaries for append, update, and delete actions
        append_dict = {}
        update_dict = {}
        delete_dict = {}

        if obj_name not in DataOpsAPI.file_obj:
            continue  # Skip if the object does not exist

        # Iterate over each attribute in the current object
        for attr_name, attr_props in attr_dict['attributes'].items():
            if not isinstance(attr_props, dict):
                #attr_props = {'value': attr_props}
                # Check if the attribute exists (for updating)
                if attr_name in DataOpsAPI.file_obj[obj_name].attrs:
                    update_dict[attr_name] = attr_props
                # Otherwise, it's a new attribute to append
                else:
                    append_dict[attr_name] = attr_props
            else:
                # Check if the attribute is marked for deletion
                if attr_props.get('delete', False):
                    delete_dict[attr_name] = attr_props

        # Perform a single pass for all three operations
        if append_dict:
            DataOpsAPI.append_metadata(obj_name, append_dict)
        if update_dict:
            DataOpsAPI.update_metadata(obj_name, update_dict)
        if delete_dict:
            DataOpsAPI.delete_metadata(obj_name, delete_dict)

    # Close hdf5 file
    DataOpsAPI.unload_file_obj()
    # Regenerate yaml snapshot of updated HDF5 file
    output_yml_filename_path = hdf5_ops.serialize_metadata(input_hdf5_file)
    print(f'{output_yml_filename_path} was successfully regenerated from the updated version of {input_hdf5_file}')
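
# Illustrative review YAML (hypothetical object path and attribute names),
# matching the structure handled above: top-level keys are HDF5 object paths,
# each with an 'attributes' mapping whose entries either carry a plain value
# (appended or updated) or a dict with 'delete: true' (removed).
#
#   /instrument_group/2022:        # hypothetical HDF5 group path
#     attributes:
#       comment: reviewed by data owner
#       obsolete_flag:
#         delete: true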


def count(hdf5_obj, yml_dict):
    print(hdf5_obj.name)
    if isinstance(hdf5_obj, h5py.Group) and len(hdf5_obj.name.split('/')) <= 4:
        obj_review = yml_dict[hdf5_obj.name]
        additions = [not (item in hdf5_obj.attrs.keys()) for item in obj_review['attributes'].keys()]
        count_additions = sum(additions)
        deletions = [not (item in obj_review['attributes'].keys()) for item in hdf5_obj.attrs.keys()]
        count_deletions = sum(deletions)
        print('additions', count_additions, 'deletions', count_deletions)


if __name__ == "__main__":

    if len(sys.argv) < 4:
        print("Usage: python metadata_revision.py update <path/to/target_file.hdf5> <path/to/metadata_review_file.yaml>")
        sys.exit(1)

    if sys.argv[1] == 'update':
        input_hdf5_file = sys.argv[2]
        review_yaml_file = sys.argv[3]
        update_hdf5_file_with_review(input_hdf5_file, review_yaml_file)
        #run(sys.argv[2])