diff --git a/pipelines/steps/apply_calibration_factors.py b/pipelines/steps/apply_calibration_factors.py
index 5eb5797..820d0da 100644
--- a/pipelines/steps/apply_calibration_factors.py
+++ b/pipelines/steps/apply_calibration_factors.py
@@ -267,7 +267,15 @@ if __name__ == '__main__':
     print('Path to output directory :', path_to_output_dir)
 
-
+    # Count the number of NaT (null) values
+    num_nats = data_table[datetime_var].isna().sum()
+    # Get the total number of rows
+    total_rows = len(data_table)
+    # Calculate the percentage of NaT values
+    percentage_nats = (num_nats / total_rows) * 100
+    print(f"Total rows: {total_rows}")
+    print(f"NaT (missing) values: {num_nats}")
+    print(f"Percentage of data loss: {percentage_nats:.4f}%")
 
     # Perform calibration
diff --git a/pipelines/steps/generate_flags.py b/pipelines/steps/generate_flags.py
index 6f17bd0..19d231f 100644
--- a/pipelines/steps/generate_flags.py
+++ b/pipelines/steps/generate_flags.py
@@ -251,7 +251,7 @@ with open('app/flags/ebas_dict.yaml','r') as stream:
 
 # Vectorized function for getting the rank of a flag
 def get_rank(flag):
-    return flag_ranking.get(flag, 0) # Default rank 0 for unknown flags
+    return flag_ranking.get(flag, np.nan) # Default to NaN for unknown flags
 
 # Vectorized function for reconciling flags
 def reconcile_flags(data_table, flag_code, t1_idx, t2_idx, numflag_columns):
@@ -280,7 +280,7 @@ def reconcile_flags(data_table, flag_code, t1_idx, t2_idx, numflag_columns):
 if __name__ == '__main__':
 
     # Set up argument parsing
-    parser = argparse.ArgumentParser(description="Calibrate species data using calibration factors.")
+    parser = argparse.ArgumentParser(description="Generate flags for diagnostics and species variables.")
 
     parser.add_argument(
         "--flag-type",
@@ -343,14 +343,15 @@ if __name__ == '__main__':
     data_table = dataManager.extract_dataset_as_dataframe(dataset_name)
     datetime_var, datetime_var_format = dataManager.infer_datetime_variable(dataset_name)
 
-    #dataManager.extract_and_load_dataset_metadata()
-    #dataset_metadata_df = dataManager.dataset_metadata_df.copy()
-    #print(dataset_metadata_df.head())
-
-    #dataset_name_idx = dataset_metadata_df.index[(dataset_metadata_df['dataset_name']==args.dataset_name).to_numpy()]
-    #data_table_metadata = dataset_metadata_df.loc[dataset_name_idx,:]
-    #parent_instrument = data_table_metadata.loc[dataset_name_idx,'parent_instrument'].values[0]
-    #parent_file = data_table_metadata.loc[dataset_name_idx,'parent_file'].values[0]
+    # Count the number of NaT (null) values
+    num_nats = data_table[datetime_var].isna().sum()
+    # Get the total number of rows
+    total_rows = len(data_table)
+    # Calculate the percentage of NaT values
+    percentage_nats = (num_nats / total_rows) * 100
+    print(f"Total rows: {total_rows}")
+    print(f"NaT (missing) values: {num_nats}")
+    print(f"Percentage of data loss: {percentage_nats:.4f}%")
 
     dataManager.unload_file_obj()
diff --git a/pipelines/steps/prepare_ebas_submission.py b/pipelines/steps/prepare_ebas_submission.py
index 41cc71d..0d57ad2 100644
--- a/pipelines/steps/prepare_ebas_submission.py
+++ b/pipelines/steps/prepare_ebas_submission.py
@@ -39,7 +39,8 @@ def join_tables(csv_files: list):
         raise RuntimeError("Parameter csv_files contains either an unreachable/broken path or a non-CSV file.")
 
     acum_df = pd.read_csv(csv_files[0])
-    left_datetime_var = get_metadata(csv_files[0]).get('datetime_var', None)
+    left_datetime_var = get_metadata(csv_files[0]).get('datetime_var', None)
+    acum_df = acum_df.drop_duplicates(subset=[left_datetime_var])
     if left_datetime_var is None:
         raise ValueError(f"Missing datetime_var metadata in {csv_files[0]}")
@@ -51,7 +52,8 @@ def join_tables(csv_files: list):
         if right_datetime_var is None:
             raise ValueError(f"Missing datetime_var metadata in {csv_files[idx]}")
 
-        acum_df = acum_df.merge(append_df, left_on=left_datetime_var, right_on=right_datetime_var, how='inner')
+        append_df = append_df.drop_duplicates(subset=[right_datetime_var])
+        acum_df = acum_df.merge(append_df, left_on=left_datetime_var, right_on=right_datetime_var, how='left')
 
     return acum_df
@@ -87,36 +89,21 @@ if __name__ == "__main__":
     #print("Renaming map keys:", acsm_to_ebas['renaming_map'].keys())
     acum_df = acum_df.rename(columns=acsm_to_ebas['renaming_map'])
-    acum_df['ACSM_time'] = pd.to_datetime(acum_df['ACSM_time'])
     reduced_set_of_vars = [key for key in acum_df.columns if 'factor' not in key]
-    print(reduced_set_of_vars)
-    acum_df.loc[:,reduced_set_of_vars].to_csv('data/JFJ_ACSM-017_2024.txt',sep='\t',index=None, date_format="%Y/%m/%d %H:%M:%S")
+    #print(reduced_set_of_vars)
+
+    flags_acum_df = join_tables([path3])
+    flags_acum_df = flags_acum_df.rename(columns=acsm_to_ebas['renaming_map'])
 
-    # Count the number of NaT (null) values
-    num_nats = acum_df['ACSM_time'].isna().sum()
-
-    # Get the total number of rows
-    total_rows = len(acum_df)
-
-    # Calculate the percentage of NaT values
-    percentage_nats = (num_nats / total_rows) * 100
-
-    print(f"Total rows: {total_rows}")
-    print(f"NaT (missing) values: {num_nats}")
-    print(f"Percentage of data loss: {percentage_nats:.2f}%")
-
-    acum_df = join_tables([path3])
-    acum_df = acum_df.rename(columns=acsm_to_ebas['renaming_map'])
+    # Ensure time columns are datetime
     acum_df['ACSM_time'] = pd.to_datetime(acum_df['ACSM_time'])
-
+    flags_acum_df['ACSM_time'] = pd.to_datetime(flags_acum_df['ACSM_time'])
 
     # Count the number of NaT (null) values
     num_nats = acum_df['ACSM_time'].isna().sum()
-
     # Get the total number of rows
     total_rows = len(acum_df)
-
     # Calculate the percentage of NaT values
     percentage_nats = (num_nats / total_rows) * 100
@@ -124,4 +111,48 @@ if __name__ == "__main__":
     print(f"NaT (missing) values: {num_nats}")
     print(f"Percentage of data loss: {percentage_nats:.2f}%")
 
-    acum_df.to_csv('data/JFJ_ACSM-017_FLAGS_2024.txt',sep='\t',index=None, date_format="%Y/%m/%d %H:%M:%S")
\ No newline at end of file
+    # Count the number of NaT (null) values
+    num_nats = flags_acum_df['ACSM_time'].isna().sum()
+    # Get the total number of rows
+    total_rows = len(flags_acum_df)
+    # Calculate the percentage of NaT values
+    percentage_nats = (num_nats / total_rows) * 100
+    print(f"Total rows: {total_rows}")
+    print(f"NaT (missing) values: {num_nats}")
+    print(f"Percentage of data loss: {percentage_nats:.2f}%")
+
+
+    nat_acum = acum_df['ACSM_time'].isna()
+    nat_flags = flags_acum_df['ACSM_time'].isna()
+
+    valid_rows = ~(nat_acum | nat_flags)  # Compute valid rows in one step
+
+
+    acum_df.loc[valid_rows.to_numpy(),reduced_set_of_vars].to_csv('data/JFJ_ACSM-017_2024.txt',sep='\t',index=None, date_format="%Y/%m/%d %H:%M:%S")
+    flags_acum_df.loc[valid_rows.to_numpy(),:].to_csv('data/JFJ_ACSM-017_FLAGS_2024.txt',sep='\t',index=None, date_format="%Y/%m/%d %H:%M:%S")
+
+
+
+
+    #acum_df['ACSM_time'] = pd.to_datetime(acum_df['ACSM_time'])
+    #flags_acum_df['ACSM_time'] = pd.to_datetime(flags_acum_df['ACSM_time'])
+
+    # Set datetime as index
+    #acum_df.set_index('ACSM_time', inplace=True)
+    #flags_acum_df.set_index('ACSM_time', inplace=True)
+
+    #nat_acum = acum_df['ACSM_time'].isna()
+    #nat_flags = flags_acum_df['ACSM_time'].isna()
+
+    #valid_rows = ~(nat_acum | nat_flags)  # Compute valid rows in one step
+
+    #acum_df_filtered = acum_df.loc[valid_rows.to_numpy(),:]
+    #flags_acum_df_filtered = flags_acum_df[valid_rows.to_numpy(),:]
+
+    # Step 4: Apply the valid mask to both dataframes
+    #acum_df_filtered = acum_df[valid_rows]
+    #flags_acum_df_filtered = flags_acum_df[valid_rows]
+
+    # Display results
+    #print(acum_df_filtered)
+    #print(flags_acum_df_filtered)
\ No newline at end of file