diff --git a/pipelines/steps/prepare_ebas_submission.py b/pipelines/steps/prepare_ebas_submission.py
index a0201a7..7302b44 100644
--- a/pipelines/steps/prepare_ebas_submission.py
+++ b/pipelines/steps/prepare_ebas_submission.py
@@ -62,87 +62,67 @@ def join_tables(csv_files: list):
     return acum_df
 
-
+import argparse
+import os
+import pandas as pd
+from third_party.acsmProcessingSoftware.src import rawto012
+#from utils import load_project_yaml_files, metadata_dict_to_dataframe, join_tables  # already defined/imported earlier in this module
+def main(paths_to_processed_files: list, path_to_flags: str, month: int = None):
+    # Combine the processed data files into a single dataframe
-
-if __name__ == "__main__":
-
-    path1 = 'data/collection_JFJ_2024_LeilaS_2025-02-17_2025-02-17/ACSM_TOFWARE_processed/2024/ACSM_JFJ_2024_timeseries_calibrated.csv'
-    path2 = 'data/collection_JFJ_2024_LeilaS_2025-02-17_2025-02-17/ACSM_TOFWARE_processed/2024/ACSM_JFJ_2024_timeseries_calibrated_err.csv'
-    path3 = 'data/collection_JFJ_2024_LeilaS_2025-02-17_2025-02-17/ACSM_TOFWARE_processed/2024/ACSM_JFJ_2024_timeseries_calibration_factors.csv'
-    path4 = 'data/collection_JFJ_2024_LeilaS_2025-02-17_2025-02-17/ACSM_TOFWARE_flags/2024/ACSM_JFJ_2024_timeseries_flags.csv'
-
-    acum_df = join_tables([path1,path2,path3])
-
+    acum_df = join_tables(paths_to_processed_files)
     acsm_to_ebas = load_project_yaml_files(projectPath, "acsm_to_ebas.yaml")
 
-    # Select variables that are both in the acsm to ebas dict and acum_df
+    # Select variables that are both in the acsm_to_ebas dict and acum_df
     reduced_set_of_vars = [key for key in acum_df.columns if key in acsm_to_ebas['renaming_map'].keys()]
+    acum_df = acum_df.loc[:, reduced_set_of_vars].rename(columns=acsm_to_ebas['renaming_map'])
-    acum_df = acum_df.loc[:,reduced_set_of_vars].rename(columns=acsm_to_ebas['renaming_map'])
-    #print("Before renaming:", acum_df.columns)
-    #print("Renaming map keys:", acsm_to_ebas['renaming_map'].keys())
-
-
-
-
-    #print(reduced_set_of_vars)
-
-    flags_acum_df = join_tables([path4])
+    flags_acum_df = join_tables([path_to_flags])
     flags_acum_df = flags_acum_df.rename(columns=acsm_to_ebas['renaming_map'])
-
-
-
-
     # Ensure time columns are datetime
     acum_df['ACSM_time'] = pd.to_datetime(acum_df['ACSM_time'])
-    flags_acum_df['ACSM_time'] = pd.to_datetime(acum_df['ACSM_time'])
+    flags_acum_df['ACSM_time'] = pd.to_datetime(flags_acum_df['ACSM_time'])
+
+    # Apply month filter if specified
+    if month:
+        acum_df = acum_df[acum_df['ACSM_time'].dt.month == month]
+        flags_acum_df = flags_acum_df[flags_acum_df['ACSM_time'].dt.month == month]
 
     # Count the number of NaT (null) values
     num_nats = acum_df['ACSM_time'].isna().sum()
-
     # Get the total number of rows
     total_rows = len(acum_df)
-
     # Calculate the percentage of NaT values
     percentage_nats = (num_nats / total_rows) * 100
 
     print(f"Total rows: {total_rows}")
     print(f"NaT (missing) values: {num_nats}")
     print(f"Percentage of data loss: {percentage_nats:.2f}%")
 
-
     # Count the number of NaT (null) values
     num_nats = flags_acum_df['ACSM_time'].isna().sum()
-
     # Get the total number of rows
     total_rows = len(flags_acum_df)
-
     # Calculate the percentage of NaT values
     percentage_nats = (num_nats / total_rows) * 100
+
     print(f"Total rows: {total_rows}")
     print(f"NaT (missing) values: {num_nats}")
     print(f"Percentage of data loss: {percentage_nats:.2f}%")
-
     nat_acum = acum_df['ACSM_time'].isna()
     nat_flags = flags_acum_df['ACSM_time'].isna()
     valid_rows = ~(nat_acum | nat_flags)  # Compute valid rows in one step
 
-    # Define file paths
-    #path_to_detection_limits = os.path.normpath(os.path.join(projectPath, 'pipelines/params/limits_of_detection.yaml'))
-    #path_to_station_params = os.path.normpath(os.path.join(projectPath, 'pipelines/params/station_params.yaml'))
-    # Load YAML files
-    #detection_limits = load_yaml(path_to_detection_limits)
     detection_limits = load_project_yaml_files(projectPath, "limits_of_detection.yaml")
-    station_params = load_project_yaml_files(projectPath, "station_params.yaml") # load_yaml(path_to_station_params)
+    station_params = load_project_yaml_files(projectPath, "station_params.yaml")
 
     # Extract dictionaries from required keys
-    lod_dict = detection_limits.get('LOD', {}).get('variables',{}) # Extract "LOD" dictionary
-    jfj_dict = station_params.get('stations', {}).get('JFJ', {}) # Extract "JFJ" dictionary
+    lod_dict = detection_limits.get('LOD', {}).get('variables', {})
+    jfj_dict = station_params.get('stations', {}).get('JFJ', {})
 
     # Convert dictionaries to DataFrames using the existing function
-    lod_df = metadata_dict_to_dataframe(lod_dict, shape = (len(acum_df),len(lod_dict)))
-    jfj_df = metadata_dict_to_dataframe(jfj_dict, shape = (len(acum_df),len(jfj_dict)))
+    lod_df = metadata_dict_to_dataframe(lod_dict, shape=(len(acum_df), len(lod_dict)))
+    jfj_df = metadata_dict_to_dataframe(jfj_dict, shape=(len(acum_df), len(jfj_dict)))
 
     # Ensure indexes are properly aligned for merging
     acum_df = acum_df.reset_index()  # Convert index to a column for merging
@@ -153,22 +133,36 @@ if __name__ == "__main__":
 
     # Merge with JFJ DataFrame
     acum_df = acum_df.merge(jfj_df, left_index=True, right_index=True, how='left')
-    acum_df = acum_df.rename(columns=acsm_to_ebas['renaming_map'])
-
-    #reduced_set_of_vars = [key for key in reduced_set_of_vars if '' not in key]
-
-    acum_df.loc[valid_rows.to_numpy(),:].to_csv('data/JFJ_ACSM-017_2024.txt',sep='\t',index=None, date_format="%Y/%m/%d %H:%M:%S")
-    flags_acum_df.loc[valid_rows.to_numpy(),:].to_csv('data/JFJ_ACSM-017_FLAGS_2024.txt',sep='\t',index=None, date_format="%Y/%m/%d %H:%M:%S")
+    # Save results, keeping only rows with valid timestamps in both tables
+    output_dir = os.path.join(projectPath, 'data')
+    output_file1 = os.path.join(output_dir, 'JFJ_ACSM-017_2024.txt')
+    output_file2 = os.path.join(output_dir, 'JFJ_ACSM-017_FLAGS_2024.txt')
+    #output_file1 = os.path.join(output_dir, f'JFJ_ACSM-017_2024_month{month}.txt' if month else 'JFJ_ACSM-017_2024.txt')
+    #output_file2 = os.path.join(output_dir, f'JFJ_ACSM-017_FLAGS_2024_month{month}.txt' if month else 'JFJ_ACSM-017_FLAGS_2024.txt')
+    acum_df.loc[valid_rows.to_numpy(), :].to_csv(output_file1, sep='\t', index=False, date_format="%Y/%m/%d %H:%M:%S")
+    flags_acum_df.loc[valid_rows.to_numpy(), :].to_csv(output_file2, sep='\t', index=False, date_format="%Y/%m/%d %H:%M:%S")
 
-
-    from third_party.acsmProcessingSoftware.src import rawto012
-
+    # Run external processing application
     app = rawto012.Application()
-    infile = 'data/JFJ_ACSM-017_2024.txt'
-    acq_err_log = 'data/JFJ_ACSM-017_FLAGS_2024.txt'
-    outdir = 'data/'
+    infile = output_file1
+    acq_err_log = output_file2
+    outdir = output_dir
     app.process(infile, acq_err_log, outdir=outdir)
 
+if __name__ == '__main__':
+    parser = argparse.ArgumentParser(description="Process and calibrate ACSM data for the JFJ station.")
+    parser.add_argument('--acsm_paths', type=str, required=True, nargs=3, help="Paths to the ACSM timeseries calibrated CSV file, the error CSV file, and the calibration factors CSV file.")
+    parser.add_argument('--acsm_flags_path', type=str, required=True, help="Path to the ACSM flags CSV file.")
+    parser.add_argument('--month', type=int, choices=range(1, 13), help="Filter data for a specific month (1-12).")
+
+    args = parser.parse_args()
+
+    # Unpack parsed arguments
+    csv_files = args.acsm_paths  # the three processed CSV paths
+    flags_file = args.acsm_flags_path
+    month = args.month
+    main(csv_files, flags_file, month)
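For reference, the new CLI can be exercised as follows. This is an illustrative invocation only: the input paths are the previously hard-coded ones removed above, `--month 3` stands in for any month, and `--month` may be omitted to process the full year.

    python pipelines/steps/prepare_ebas_submission.py \
        --acsm_paths \
            data/collection_JFJ_2024_LeilaS_2025-02-17_2025-02-17/ACSM_TOFWARE_processed/2024/ACSM_JFJ_2024_timeseries_calibrated.csv \
            data/collection_JFJ_2024_LeilaS_2025-02-17_2025-02-17/ACSM_TOFWARE_processed/2024/ACSM_JFJ_2024_timeseries_calibrated_err.csv \
            data/collection_JFJ_2024_LeilaS_2025-02-17_2025-02-17/ACSM_TOFWARE_processed/2024/ACSM_JFJ_2024_timeseries_calibration_factors.csv \
        --acsm_flags_path data/collection_JFJ_2024_LeilaS_2025-02-17_2025-02-17/ACSM_TOFWARE_flags/2024/ACSM_JFJ_2024_timeseries_flags.csv \
        --month 3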