diff --git a/pipelines/steps/apply_calibration_factors.py b/pipelines/steps/apply_calibration_factors.py index 3d5bb7f..7e10a9a 100644 --- a/pipelines/steps/apply_calibration_factors.py +++ b/pipelines/steps/apply_calibration_factors.py @@ -3,33 +3,40 @@ import sys, os try: thisFilePath = os.path.abspath(__file__) - print(thisFilePath) + print('File path:',thisFilePath) except NameError: print("[Notice] The __file__ attribute is unavailable in this environment (e.g., Jupyter or IDLE).") print("When using a terminal, make sure the working directory is set to the script's location to prevent path issues (for the DIMA submodule)") #print("Otherwise, path to submodule DIMA may not be resolved properly.") thisFilePath = os.getcwd() # Use current directory or specify a default -dimaPath = os.path.normpath(os.path.join(thisFilePath, "..", "..",'..')) # Move up to project root -projectPath = os.path.normpath(os.path.join(dimaPath,'..')) -print(dimaPath) +projectPath = os.path.normpath(os.path.join(thisFilePath, "..", "..",'..')) # Move up to project root +#print('Project path:', projectPath) +dimaPath = os.path.normpath('/'.join([projectPath,'dima'])) +#print('DIMA path:', dimaPath) + + +# Set up project root directory +sys.path.insert(0,projectPath) +sys.path.insert(0,dimaPath) + + +#import importlib.util +#print("Checking if projectPath exists:", os.path.exists(projectPath)) +#if os.path.exists(projectPath): +# print("Contents of dimaPath:", os.listdir(projectPath)) +#print("Checking if Python can find 'dima':", importlib.util.find_spec("dima")) + import numpy as np import pandas as pd from math import prod # To replace multiplyall import argparse import yaml -# Set up project root directory -#root_dir = os.path.abspath(os.curdir) -#sys.path.append(root_dir) -sys.path.append(dimaPath) - import dima.src.hdf5_ops as dataOps +import dima.utils.g5505_utils as utils import pipelines.steps.utils as stepUtils -import numpy as np -import pandas as pd -from math import prod def compute_calibration_factors(data_table, datetime_var_name, calibration_params, calibration_factors): """ @@ -190,7 +197,7 @@ if __name__ == '__main__': dataManager = dataOps.HDF5DataOpsManager(args.data_file) dataManager.load_file_obj() dataset_name = '/'+args.dataset_name - data_table = dataManager.extract_dataset_as_dataframe('/'+args.dataset_name) + data_table = dataManager.extract_dataset_as_dataframe(dataset_name) datetime_var, datetime_format = dataManager.infer_datetime_variable(dataset_name) #data_table['t_start_Buf'] = data_table['t_start_Buf'].apply(lambda x : x.decode()) @@ -223,36 +230,47 @@ if __name__ == '__main__': # Perform calibration try: + # Define output directory of apply_calibration_factors() step + suffix = 'processed' + if len(parent_instrument.split('/')) >= 2: + instFolder = parent_instrument.split('/')[0] + category = parent_instrument.split('/')[1] + else: + instFolder = parent_instrument.split('/')[0] + category = '' + + path_to_output_folder, ext = os.path.splitext('/'.join([path_to_output_dir,f'{instFolder}_{suffix}',category])) processingScriptRelPath = os.path.relpath(thisFilePath,start=projectPath) - print(processingScriptRelPath) + if not os.path.exists(path_to_output_folder): + os.makedirs(path_to_output_folder) - metadata = {'actris_level' : 1, 'processing_script': processingScriptRelPath.replace(os.sep,'/')} + print(f'Processing script : {processingScriptRelPath}') + print(f'Output directory : {path_to_output_folder}') - path_to_output_file, ext = os.path.splitext('/'.join([path_to_output_dir,parent_instrument,parent_file])) - path_to_calibrated_file = ''.join([path_to_output_file, '_calibrated.csv']) - - path_to_calibration_factors_file = ''.join([path_to_output_file, '_calibration_factors.csv']) - - - - #path_tail, path_head = os.path.split(path_to_calibrated_file) - #path_to_metadata_file = '/'.join([path_tail, 'data_lineage_metadata.json']) - - print('Path to output file :', path_to_calibrated_file) - import dima.utils.g5505_utils as utils - import json + # Apply calibration factors to input data_table and generate data lineage metadata calibration_factor_table, calibrated_table = apply_calibration_factors(data_table, datetime_var, args.calibration_file) #calibration_factors) - metadata['processing_date'] = utils.created_at() + + metadata = {'actris_level' : 1, + 'processing_script': processingScriptRelPath.replace(os.sep,'/'), + 'processing_date' : utils.created_at()} + + # Save output tables to csv file and save/or update data lineage record + filename, ext = os.path.splitext(parent_file) + path_to_calibrated_file = '/'.join([path_to_output_folder, f'{filename}_calibrated.csv']) + path_to_calibration_factors_file = '/'.join([path_to_output_folder, f'{filename}_calibration_factors.csv']) + calibrated_table.to_csv(path_to_calibrated_file, index=False) calibration_factor_table.to_csv(path_to_calibration_factors_file, index=False) status = stepUtils.record_data_lineage(path_to_calibrated_file, projectPath, metadata) status = stepUtils.record_data_lineage(path_to_calibration_factors_file, projectPath, metadata) - + print('Calibration factors saved to', path_to_calibration_factors_file) print(f"Calibrated data saved to {path_to_calibrated_file}") + print(f"Data lineage saved to {path_to_output_dir}") + except Exception as e: print(f"Error during calibration: {e}") exit(1) \ No newline at end of file diff --git a/pipelines/steps/compute_automated_flags.py b/pipelines/steps/compute_automated_flags.py index 46bd163..4d3fbd6 100644 --- a/pipelines/steps/compute_automated_flags.py +++ b/pipelines/steps/compute_automated_flags.py @@ -93,7 +93,7 @@ if __name__ == '__main__': parser = argparse.ArgumentParser(description="Calibrate species data using calibration factors.") parser.add_argument('data_file', type=str, help="Path to the input HDF5 file containing the data table.") parser.add_argument('dataset_name', type=str, help ='Relative path to data_table (i.e., dataset name) in HDF5 file') - parser.add_argument('calibration_file', type=str, help="Path to the input YAML file containing calibration factors.") + parser.add_argument('validity_thersholds_file', type=str, help="Path to the input YAML file containing calibration factors.") #parser.add_argument('output_file', type=str, help="Path to save the output calibrated data as a CSV file.") args = parser.parse_args() @@ -119,10 +119,10 @@ if __name__ == '__main__': parent_file = data_table_metadata.loc[dataset_name_idx,'parent_file'].values[0] dataManager.unload_file_obj() - print(args.calibration_file) + print(args.validity_thersholds_file) - with open(args.calibration_file, 'r') as stream: - calibration_factors = yaml.load(stream, Loader=yaml.FullLoader) + with open(args.validity_thersholds_file, 'r') as stream: + validity_thersholds_dict = yaml.load(stream, Loader=yaml.FullLoader) except Exception as e: print(f"Error loading input files: {e}") exit(1) @@ -135,32 +135,45 @@ if __name__ == '__main__': # Perform calibration try: + # Define output directory of apply_calibration_factors() step + suffix = 'flags' + if len(parent_instrument.split('/')) >= 2: + instFolder = parent_instrument.split('/')[0] + category = parent_instrument.split('/')[1] + else: + instFolder = parent_instrument.split('/')[0] + category = '' + path_to_output_folder, ext = os.path.splitext('/'.join([path_to_output_dir,f'{instFolder}_{suffix}',category])) processingScriptRelPath = os.path.relpath(thisFilePath,start=projectPath) - print(processingScriptRelPath) + if not os.path.exists(path_to_output_folder): + os.makedirs(path_to_output_folder) - metadata = {'actris_level' : 1, 'processing_script': processingScriptRelPath.replace(os.sep,'/')} + print('Processing script %s:', processingScriptRelPath) + print('Output directory: %s', path_to_output_folder) - path_to_output_file, ext = os.path.splitext('/'.join([path_to_output_dir,parent_instrument,parent_file])) - path_to_calibrated_file = ''.join([path_to_output_file, '_flags.csv']) + # Compute diagnostic flags based on validity thresholds defined in configuration_file_dict + + flags_table = compute_diagnostic_variable_flags(data_table, validity_thersholds_dict) + metadata = {'actris_level' : 1, + 'processing_script': processingScriptRelPath.replace(os.sep,'/'), + 'processing_date' : utils.created_at() + } - path_tail, path_head = os.path.split(path_to_calibrated_file) - path_to_metadata_file = '/'.join([path_tail, 'data_lineage_metadata.json']) + # Save output tables to csv file and save/or update data lineage record + filename, ext = os.path.splitext(parent_file) + path_to_flags_file = '/'.join([path_to_output_folder, f'{filename}_flags.csv']) + #path_to_calibration_factors_file = '/'.join([path_to_output_folder, f'{filename}_calibration_factors.csv']) - print('Path to output file :', path_to_calibrated_file) + flags_table.to_csv(path_to_flags_file, index=False) + + status = stepUtils.record_data_lineage(path_to_flags_file, projectPath, metadata) - print(calibration_factors.keys()) - calibrated_table = compute_diagnostic_variable_flags(data_table, calibration_factors) - metadata['processing_date'] = utils.created_at() - calibrated_table.to_csv(path_to_calibrated_file, index=False) + print(f"Flags saved to {path_to_flags_file}") + print(f"Data lineage saved to {path_to_output_dir}") - status = stepUtils.record_data_lineage(path_to_calibrated_file, projectPath, metadata) - - - - print(f"Calibrated data saved to {path_to_calibrated_file}") - print(f"Metadata for calibrated data saved to {path_to_metadata_file}") except Exception as e: print(f"Error during calibration: {e}") - exit(1) \ No newline at end of file + exit(1) +