dima/src/data_integration_lib.py

import logging
import os
from datetime import datetime

import yaml

import src.hdf5_lib as hdf5_lib
import src.g5505_utils as utils


def integrate_data_sources(yaml_config_file_path, log_dir='logs/'):
    """Integrate the data sources specified by the input YAML configuration file into HDF5 files.

    Parameters:
        yaml_config_file_path (str): Path to the YAML configuration file.
        log_dir (str): Directory in which to save the log file.

    Returns:
        str: Path to the created HDF5 file.
    """
    date = utils.created_at()
    utils.setup_logging(log_dir, f"integrate_data_sources_{date}.log")

    with open(yaml_config_file_path, 'r') as stream:
        try:
            # safe_load suffices here: the configuration holds plain scalars and lists.
            config_dict = yaml.safe_load(stream)
        except yaml.YAMLError as exc:
            logging.error("Error loading YAML file: %s", exc)
            raise

    def output_filename(name, date_str, initials):
        # The second parameter is renamed from `datetime` to `date_str` so it
        # does not shadow the imported datetime class.
        return '_'.join([name, date_str, initials]) + '.h5'

    exp_campaign_name = config_dict['experiment_name']
    initials = config_dict['user_name']
    input_file_dir = config_dict['input_file_directory']
    output_dir = config_dict['output_file_directory']
    select_dir_keywords = config_dict['instrument_datafolder']
    root_metadata = {
        'project_name': config_dict['project_name'],
        'experiment_name': config_dict['experiment_name'],
        'user_name': config_dict['user_name'],
    }

    output_filename_path = None
    if config_dict.get('datetime_steps'):
        # Map each datetime step to the date-string variants used to match file names.
        datetime_augment_dict = {}
        for datetime_step in config_dict['datetime_steps']:
            tmp = datetime.strptime(datetime_step, '%Y-%m-%d %H-%M-%S')
            datetime_augment_dict[tmp] = [tmp.strftime('%Y-%m-%d'), tmp.strftime('%Y_%m_%d'),
                                          tmp.strftime('%Y.%m.%d'), tmp.strftime('%Y%m%d')]
            logging.debug(tmp)
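        # Illustrative example with a hypothetical step value: '2022-01-31 13-00-00'
        # yields the keywords ['2022-01-31', '2022_01_31', '2022.01.31', '20220131'].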
        if 'experimental_step' in config_dict['integration_mode']:
            # One HDF5 file per datetime step; only the path of the last file created is returned.
            for datetime_step in datetime_augment_dict.keys():
                select_file_keywords = datetime_augment_dict[datetime_step]
                output_filename_step = output_filename(exp_campaign_name, datetime_step.strftime('%Y-%m-%d'), initials)
                output_filename_step = os.path.join(output_dir, output_filename_step)
                logging.info('Creating %s', output_filename_step)
                output_filename_path = hdf5_lib.create_hdf5_file_from_filesystem_path(output_filename_step,
                                                                                      input_file_dir,
                                                                                      select_dir_keywords,
                                                                                      select_file_keywords,
                                                                                      root_metadata_dict=root_metadata)
        elif 'collection' in config_dict['integration_mode']:
            # One HDF5 file covering all datetime steps; its name spans the earliest and latest step dates.
            select_file_keywords = []
            for datetime_step in datetime_augment_dict.keys():
                select_file_keywords = select_file_keywords + datetime_augment_dict[datetime_step]
            min_datetime = min(datetime_augment_dict.keys())
            max_datetime = max(datetime_augment_dict.keys())
            output_filename_step = output_filename(exp_campaign_name,
                                                   min_datetime.strftime('%Y-%m-%d') + '_' + max_datetime.strftime('%Y-%m-%d'),
                                                   initials)
            output_filename_step = os.path.join(output_dir, output_filename_step)
            output_filename_path = hdf5_lib.create_hdf5_file_from_filesystem_path(output_filename_step,
                                                                                  input_file_dir,
                                                                                  select_dir_keywords,
                                                                                  select_file_keywords,
                                                                                  root_metadata_dict=root_metadata)
    else:
        # No datetime steps: integrate everything into a single HDF5 file named after the experiment date.
        output_filename_step = output_filename(exp_campaign_name, config_dict['experiment_date'], initials)
        output_filename_step = os.path.join(output_dir, output_filename_step)
        output_filename_path = hdf5_lib.create_hdf5_file_from_filesystem_path(output_filename_step,
                                                                              input_file_dir,
                                                                              select_dir_keywords,
                                                                              select_file_keywords=[],
                                                                              root_metadata_dict=root_metadata)
"""for datetime_step in config_dict['datetime_steps']:
tmp = datetime.strptime(datetime_step,'%Y-%m-%d %H-%M-%S') #convert(datetime_step)
root_metadata['creation_date'] = datetime_step
print(tmp)
select_file_keywords = [tmp.strftime('%Y-%m-%d'),tmp.strftime('%Y_%m_%d'),tmp.strftime('%Y.%m.%d'),tmp.strftime('%Y%m%d')]
print(select_file_keywords)
output_filename_step = output_filename(exp_campaign_name,tmp.strftime('%Y-%m-%d_%H-%M-%S'),initials)
output_filename_step = os.path.join(output_dir,output_filename_step)
print(output_filename_step)
output_filename_path, output_yml_filename_path = hdf5_lib.create_hdf5_file_from_filesystem_path(output_filename_step,
input_file_dir,
select_dir_keywords,
select_file_keywords,
root_metadata_dict = root_metadata)"""

    return output_filename_path
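

# Minimal usage sketch. The configuration path below is hypothetical and must
# point to a YAML file with the keys shown in the example near the top of
# integrate_data_sources.
if __name__ == '__main__':
    hdf5_path = integrate_data_sources('input_files/data_integr_config_file.yaml')
    print(f'Created HDF5 file at: {hdf5_path}')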