Refactor command line interface to run a main function on the parsed arguments.

2025-03-14 13:36:52 +01:00
parent 8cdd8a0771
commit 39a9ab07a2

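The diff below applies the usual split between argument parsing and processing logic: parse_args() stays under if __name__ == '__main__': and only forwards the parsed paths to a new main(data_file, calibration_file) function. A minimal sketch of that pattern (simplified; help texts and validation omitted, names illustrative):

    import argparse

    def main(data_file, calibration_file):
        # Processing logic lives here and can be called directly with file paths,
        # e.g. from another script or a test, without touching the command line.
        ...

    if __name__ == '__main__':
        # The entry point only parses arguments and forwards them to main().
        parser = argparse.ArgumentParser(description="Calibrate species data using calibration factors.")
        parser.add_argument('data_file', type=str)
        parser.add_argument('calibration_file', type=str)
        args = parser.parse_args()
        main(args.data_file, args.calibration_file)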

@@ -125,6 +125,7 @@ def load_calibration_file(calibration_factors_file):
     # Get path to file where calibrations params are defined
     path_to_calib_params_file = calibration_factors.get("calibration_params", {}).get('path_to_file')
+    path_to_calib_params_file = os.path.normpath(os.path.join(projectPath,path_to_calib_params_file))
     # Validate
     if not path_to_calib_params_file:
@@ -199,81 +200,50 @@ def apply_calibration_factors(data_table, datetime_var_name, calibration_factors):
     return calibration_factor_table, new_data_table

-if __name__ == '__main__':
-    # Set up argument parsing
-    parser = argparse.ArgumentParser(description="Calibrate species data using calibration factors.")
-    parser.add_argument('data_file', type=str, help="Path to the input HDF5 file containing the data table.")
-    #parser.add_argument('dataset_name', type=str, help ='Relative path to data_table (i.e., dataset name) in HDF5 file')
-    parser.add_argument('calibration_file', type=str, help="Path to the input YAML file containing calibration factors.")
-    #parser.add_argument('output_file', type=str, help="Path to save the output calibrated data as a CSV file.")
-    args = parser.parse_args()
-    if not any(item in args.calibration_file for item in ['.yaml','.yml']):
-        raise TypeError(f"Invalid file type. Calibration file {args.calibration_file} needs to be a valid yaml file.")
+def main(data_file, calibration_file):
+    """Main function for processing the data with calibration."""
     # Load input data and calibration factors
     try:
-        #data_table = pd.read_json(args.data_file)
-        print(f'Openning data file : {args.data_file} using src.hdf5_ops.HDF5DataManager().')
-        dataManager = dataOps.HDF5DataOpsManager(args.data_file)
+        print(f"Opening data file: {data_file} using src.hdf5_ops.HDF5DataOpsManager().")
+        dataManager = dataOps.HDF5DataOpsManager(data_file)
         dataManager.load_file_obj()
         dataManager.extract_and_load_dataset_metadata()
         dataset_metadata_df = dataManager.dataset_metadata_df.copy()
-        keywords = ['ACSM_TOFWARE/','ACSM_JFJ_','_timeseries.txt/data_table']
+        keywords = ['ACSM_TOFWARE/', 'ACSM_JFJ_', '_timeseries.txt/data_table']
         find_keyword = [all(keyword in item for keyword in keywords) for item in dataset_metadata_df['dataset_name']]
-        if sum(find_keyword)!=1:
+        if sum(find_keyword) != 1:
             input_file_name = '<year>'.join(keywords)
             raise RuntimeError(f'Input file {input_file_name} was neither found nor uniquely identified.')
         dataset_name = dataset_metadata_df['dataset_name'][find_keyword].values[0]
-        parent_file = dataset_metadata_df.loc[find_keyword,'parent_file'].values[0]
-        parent_instrument = dataset_metadata_df.loc[find_keyword,'parent_instrument'].values[0]
-        #dataset_name = '/'+args.dataset_name
+        parent_file = dataset_metadata_df.loc[find_keyword, 'parent_file'].values[0]
+        parent_instrument = dataset_metadata_df.loc[find_keyword, 'parent_instrument'].values[0]
         data_table = dataManager.extract_dataset_as_dataframe(dataset_name)
         datetime_var, datetime_format = dataManager.infer_datetime_variable(dataset_name)
         print(dataset_metadata_df.head())
         print(parent_file)
-        print(args.calibration_file)
+        print(calibration_file)
     except Exception as e:
         print(f"Error loading input files: {e}")
-        exit(1)
     finally:
         dataManager.unload_file_obj()
-        print(f'Closing data file : {args.data_file} to unlock the file.')
+        print(f'Closing data file: {data_file} to unlock the file.')
-    path_to_output_dir, ext = os.path.splitext(args.data_file)
-    print('Path to output directory :', path_to_output_dir)
-    # Count the number of NaT (null) values
+    # Count NaT values and calculate percentage
     num_nats = data_table[datetime_var].isna().sum()
-    # Get the total number of rows
     total_rows = len(data_table)
-    # Calculate the percentage of NaT values
     percentage_nats = (num_nats / total_rows) * 100
     print(f"Total rows: {total_rows}")
     print(f"NaT (missing) values: {num_nats}")
     print(f"Percentage of data loss: {percentage_nats:.4f}%")
     # Perform calibration
     try:
-        # Define output directory of apply_calibration_factors() step
         suffix = 'processed'
         if len(parent_instrument.split('/')) >= 2:
             instFolder = parent_instrument.split('/')[0]
@@ -282,50 +252,66 @@ if __name__ == '__main__':
             instFolder = parent_instrument.split('/')[0]
             category = ''
-        path_to_output_folder, ext = os.path.splitext('/'.join([path_to_output_dir,f'{instFolder}_{suffix}',category]))
-        processingScriptRelPath = os.path.relpath(thisFilePath,start=projectPath)
+        path_to_output_dir, _ = os.path.splitext(data_file)
+        path_to_output_folder = os.path.join(path_to_output_dir, f"{instFolder}_{suffix}", category)
         if not os.path.exists(path_to_output_folder):
             os.makedirs(path_to_output_folder)
-        print(f'Processing script : {processingScriptRelPath}')
-        print(f'Output directory : {path_to_output_folder}')
+        print(f"Output directory: {path_to_output_folder}")
         # Apply calibration factors to input data_table and generate data lineage metadata
-        calibration_factor_table, calibrated_table = apply_calibration_factors(data_table, datetime_var, args.calibration_file) #calibration_factors)
+        calibration_factor_table, calibrated_table = apply_calibration_factors(data_table, datetime_var, calibration_file)
         calibrated_table_err = generate_error_dataframe(calibrated_table, datetime_var)
-        # Define suffix to dataframe map to streamline dataframe export
-        suffix_to_dataframe_dict = {'calibrated.csv': calibrated_table,
-                                    'calibrated_err.csv': calibrated_table_err,
-                                    'calibration_factors.csv': calibration_factor_table}
+        suffix_to_dataframe_dict = {
+            'calibrated.csv': calibrated_table,
+            'calibrated_err.csv': calibrated_table_err,
+            'calibration_factors.csv': calibration_factor_table
+        }
-        metadata = {'actris_level': 1,
-                    'processing_script': processingScriptRelPath.replace(os.sep, '/'),
-                    'processing_date': utils.created_at(),
-                    'datetime_var': datetime_var}
+        metadata = {
+            'actris_level': 1,
+            'processing_script': os.path.relpath(__file__, start=os.getcwd()),
+            'processing_date': utils.created_at(),
+            'datetime_var': datetime_var
+        }
-        # Save output tables to csv file and save/or update data lineage record
-        filename, ext = os.path.splitext(parent_file)
-        if not ext:
-            filename += '.csv' # Handle edge case for missing file extension
+        # Save output tables to CSV and record data lineage
+        filename, _ = os.path.splitext(parent_file)
+        if not _:
+            filename += '.csv'
         for suffix, data_table in suffix_to_dataframe_dict.items():
-            # Form path to output file
             path_to_output_file = os.path.join(path_to_output_folder, f'{filename}_{suffix}')
             try:
                 data_table.to_csv(path_to_output_file, index=False)
                 print(f"Saved {filename}_{suffix} to {path_to_output_folder}")
             except Exception as e:
                 print(f"Failed to save {path_to_output_file} due to: {e}")
                 continue
             # Record data lineage
             metadata['suffix'] = suffix
-            status = stepUtils.record_data_lineage(path_to_output_file, projectPath, metadata)
+            stepUtils.record_data_lineage(path_to_output_file, os.getcwd(), metadata)
     except Exception as e:
         print(f"Error during calibration: {e}")
         exit(1)
+
+if __name__ == '__main__':
+    # Set up argument parsing
+    parser = argparse.ArgumentParser(description="Calibrate species data using calibration factors.")
+    parser.add_argument('data_file', type=str, help="Path to the input HDF5 file containing the data table.")
+    #parser.add_argument('dataset_name', type=str, help ='Relative path to data_table (i.e., dataset name) in HDF5 file')
+    parser.add_argument('calibration_file', type=str, help="Path to the input YAML file containing calibration factors.")
+    #parser.add_argument('output_file', type=str, help="Path to save the output calibrated data as a CSV file.")
+    args = parser.parse_args()
+    if not any(item in args.calibration_file for item in ['.yaml', '.yml']):
+        raise TypeError(f"Invalid file type. Calibration file {args.calibration_file} needs to be a valid YAML file.")
+    main(args.data_file, args.calibration_file)
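
With the entry point reduced to argument parsing, the calibration step can be driven either from the shell or programmatically. A hedged usage example (the module name calibrate_with_factors and the file paths are placeholders, not names taken from the repository):

    # Command line (paths are illustrative):
    #   python calibrate_with_factors.py path/to/data.h5 path/to/calibration_factors.yaml

    # Programmatic use, e.g. from a pipeline driver or a test:
    from calibrate_with_factors import main  # hypothetical module name

    main('path/to/data.h5', 'path/to/calibration_factors.yaml')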