From cc1b445d38d3ad12f634e4b609f0ab72da459d4f Mon Sep 17 00:00:00 2001
From: Florez Ospina Juan Felipe
Date: Fri, 16 May 2025 14:22:17 +0200
Subject: [PATCH] Implement binary flags for visualization purposes, and fix
 bug with cpc flags.

---
 notebooks/demo_acsm_pipeline.ipynb            |  11 +-
 ...lize_diagnostic_flags_from_hdf5_file.ipynb |  31 ++++-
 pipelines/steps/generate_flags.py             | 114 +++++++++++-------
 3 files changed, 105 insertions(+), 51 deletions(-)

diff --git a/notebooks/demo_acsm_pipeline.ipynb b/notebooks/demo_acsm_pipeline.ipynb
index cb7959e..cb8ee39 100644
--- a/notebooks/demo_acsm_pipeline.ipynb
+++ b/notebooks/demo_acsm_pipeline.ipynb
@@ -152,8 +152,17 @@
     "#status = subprocess.run(command, capture_output=True, check=True)\n",
     "#print(status.stdout.decode())\n",
     "generate_flags(path_to_data_file, 'diagnostics')\n",
+    "\n"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
     "\n",
-    "generate_flags(path_to_data_file, 'cpc')\n"
+    "generate_flags(path_to_data_file, 'cpc')"
    ]
   },
   {
diff --git a/notebooks/demo_visualize_diagnostic_flags_from_hdf5_file.ipynb b/notebooks/demo_visualize_diagnostic_flags_from_hdf5_file.ipynb
index cc1a73d..563a2fc 100644
--- a/notebooks/demo_visualize_diagnostic_flags_from_hdf5_file.ipynb
+++ b/notebooks/demo_visualize_diagnostic_flags_from_hdf5_file.ipynb
@@ -46,7 +46,7 @@
    "source": [
     "import dima.src.hdf5_ops as dataOps\n",
     "\n",
-    "CAMPAIGN_DATA_FILE = \"../data/collection_JFJ_2024_2025-04-11_2025-04-11.h5\"\n",
+    "CAMPAIGN_DATA_FILE = \"../data/collection_JFJ_2024_2025-05-16_2025-05-16.h5\"\n",
     "APPEND_DIR = os.path.splitext(CAMPAIGN_DATA_FILE)[0]\n",
     "\n",
     "path_to_data_file = CAMPAIGN_DATA_FILE\n",
@@ -79,7 +79,7 @@
     "import pandas as pd\n",
     "\n",
     "# Specify diagnostic variables and the associated flags \n",
-    "dataset_idx = 0\n",
+    "dataset_idx = 1\n",
     "dataset_name = dataset_metadata_df['dataset_name'][dataset_idx]\n",
     "parent_instrument = dataset_metadata_df['parent_instrument'][dataset_idx]\n",
     "\n",
@@ -133,17 +133,36 @@
     "import pipelines.steps.visualize_datatable_vars as vis\n",
     "\n",
     "\n",
+    "variable_sets = {\n",
+    "    \"diagnostic\": {\n",
+    "        \"variables\": [\n",
+    "            'VaporizerTemp_C', 'HeaterBias_V', 'FlowRefWave', 'FlowRate_mb', 'FlowRate_ccs',\n",
+    "            'FilamentEmission_mA', 'Detector_V', 'AnalogInput06_V', 'ABRefWave', 'ABsamp', 'ABCorrFact'\n",
+    "        ],\n",
+    "        \"time_var\": \"t_base\"\n",
+    "    },\n",
+    "    \"cpc\": {\n",
+    "        \"variables\": [\"conc\"],\n",
+    "        \"time_var\": \"end_time\"\n",
+    "    },\n",
+    "    \"species\": {\n",
+    "        \"variables\": ['Chl_11000', 'NH4_11000', 'SO4_11000', 'NO3_11000', 'Org_11000'],\n",
+    "        \"time_var\": \"t_start_Buf\"\n",
+    "    }\n",
+    "}\n",
     "\n",
-    "diagnostic_variables = ['VaporizerTemp_C', 'HeaterBias_V', 'FlowRefWave', 'FlowRate_mb', 'FlowRate_ccs', 'FilamentEmission_mA', 'Detector_V',\n",
-    "                        'AnalogInput06_V', 'ABRefWave', 'ABsamp', 'ABCorrFact']\n",
+    "# Choose one: \"diagnostic\", \"cpc\", or \"species\"\n",
+    "selected_set = \"diagnostic\"\n",
+    "\n",
+    "variables = variable_sets[selected_set][\"variables\"]\n",
+    "time_var = variable_sets[selected_set][\"time_var\"]\n",
     "\n",
     "\n",
-    "time_var = 't_base'\n",
     "figs = vis.visualize_table_variables(path_to_data_file, \n",
     "                                     dataset_name, \n",
     "                                     flags_dataset_name,\n",
     "                                     x_var = time_var,\n",
-    "                                     y_vars = diagnostic_variables)\n",
+    "                                     y_vars = variables)\n",
     "\n",
     "\n",
     "\n"
diff --git a/pipelines/steps/generate_flags.py b/pipelines/steps/generate_flags.py
index 81b8c79..9dd4e5e 100644
--- a/pipelines/steps/generate_flags.py
+++ b/pipelines/steps/generate_flags.py
@@ -31,6 +31,53 @@
 import json
 from pipelines.steps.utils import load_project_yaml_files, get_metadata
 from math import floor
+path_to_ebas_dict = os.path.normpath(os.path.join(projectPath,'app/flags/ebas_dict.yaml'))
+with open(path_to_ebas_dict ,'r') as stream:
+    ebas_dict = yaml.safe_load(stream)
+    flags_dict = ebas_dict['flags']
+    flag_ranking = ebas_dict['flag_ranking']
+
+
+# Vectorized function for getting the rank of a flag
+def get_rank(flag):
+    return flag_ranking.get(flag, -10)  # Default rank is -10 for unknown flags
+
+# Vectorized function for reconciling flags
+def reconcile_flags(data_table, flag_code, t1_idx, t2_idx, numflag_columns):
+    # Extract the relevant subtable
+    sub_table = data_table.loc[t1_idx:t2_idx, numflag_columns].copy()
+
+    # Compute ranks of current values
+    current_ranks = np.vectorize(get_rank)(sub_table.values)
+
+    # Handle flag_code: broadcast scalar or reshape array
+    if np.isscalar(flag_code):
+        flag_code_values = np.full(sub_table.shape, flag_code)
+        flag_code_ranks = np.full(sub_table.shape, get_rank(flag_code))
+    else:
+        # Convert to NumPy array and ensure correct shape
+        flag_code_array = np.asarray(flag_code)
+        if flag_code_array.ndim == 1:
+            # Assume it's one flag per row — broadcast across columns
+            flag_code_values = np.tile(flag_code_array[:, None], (1, len(numflag_columns)))
+            flag_code_ranks = np.vectorize(get_rank)(flag_code_values)
+        else:
+            # Full 2D matrix
+            flag_code_values = flag_code_array
+            flag_code_ranks = np.vectorize(get_rank)(flag_code_array)
+
+    # Validate shape match
+    if flag_code_values.shape != sub_table.shape:
+        raise ValueError(f"Shape mismatch: expected {sub_table.shape}, got {flag_code_values.shape}")
+
+    # Reconcile values based on rank comparison
+    new_values = np.where(current_ranks < flag_code_ranks, flag_code_values, sub_table.values)
+
+    # Assign reconciled values back
+    data_table.loc[t1_idx:t2_idx, numflag_columns] = new_values.astype(np.int64)
+
+    return data_table
+
 def generate_cpc_flags(data_table, datetime_var: str = 'start_time'):
     # TODO: ask Rob where to find this information.
     required_variables = ['start_time', 'end_time', 'st_y', 'ed_y', 'p_int', 'T_int', 'conc', 'numflag']
@@ -45,9 +92,14 @@ def generate_cpc_flags(data_table, datetime_var: str = 'start_time'):
     print(flags_table.head())
 
     # Multiply numflag by 100 and floor it
-    flags_table['numflag'] = flags_table['numflag'].apply(lambda x: int(floor(x * 1000)))
+    flags_table['numflag'] = flags_table['numflag'].apply(lambda x: floor(x * 1000))
+    flags_table.rename(columns={'numflag':'numflag_cpc'}, inplace=True)
+    default_value = flags_dict[999]
+    flags_table['flag_conc'] = flags_table['numflag_cpc'].copy().values
+    flags_table['flag_conc'] = flags_table['flag_conc'].apply(lambda x : flags_dict.get(x,default_value)['validity']=='I')
+
     return flags_table
 
 
@@ -252,54 +304,28 @@ def generate_species_flags(data_table : pd.DataFrame, calib_param_dict : dict, f
 
 
-    return data_table.loc[:,[datetime_var] + numflag_columns]
+    # Binarize flags for streamlined visualization of invalid and valid regions
+    binary_flag_columns = []
+    default_code = 999 # EBAS missing measurement unspecified reason
+    default_value = flags_dict[default_code] # fallback definition for unknown flags
+
+    # Convert them to integer type (handling NaNs if needed)
+    data_table[numflag_columns] = data_table[numflag_columns].fillna(default_code).astype(int)
+
+    for numflag_var in numflag_columns:
+        flag_var = numflag_var.replace('numflag_', 'flag_')
+        binary_flag_columns.append(flag_var)
+
+        # Apply validity check: True if flag is 'I' (invalid)
+        data_table[flag_var] = data_table[numflag_var].apply(
+            lambda x: flags_dict.get(x, default_value).get('validity') == 'I'
+        )
+
+    return data_table.loc[:, [datetime_var] + numflag_columns + binary_flag_columns]
 
 
-path_to_ebas_dict = os.path.normpath(os.path.join(projectPath,'app/flags/ebas_dict.yaml'))
-with open(path_to_ebas_dict ,'r') as stream:
-    ebas_dict = yaml.safe_load(stream)
-    flag_ranking = ebas_dict['flag_ranking']
-
-
-# Vectorized function for getting the rank of a flag
-def get_rank(flag):
-    return flag_ranking.get(flag, -10) # Default rank is NaN for unknown flags
-
-# Vectorized function for reconciling flags
-def reconcile_flags(data_table, flag_code, t1_idx, t2_idx, numflag_columns):
-    # Extract the relevant subtable
-    sub_table = data_table.loc[t1_idx:t2_idx, numflag_columns].copy()
-
-    # Compute ranks of current values
-    current_ranks = np.vectorize(get_rank)(sub_table.values)
-
-    # Handle flag_code: broadcast scalar or reshape array
-    if np.isscalar(flag_code):
-        flag_code_values = np.full(sub_table.shape, flag_code)
-        flag_code_ranks = np.full(sub_table.shape, get_rank(flag_code))
-    else:
-        # Convert to NumPy array and ensure correct shape
-        flag_code_array = np.asarray(flag_code)
-        if flag_code_array.ndim == 1:
-            # Assume it's one flag per row — broadcast across columns
-            flag_code_values = np.tile(flag_code_array[:, None], (1, len(numflag_columns)))
-            flag_code_ranks = np.vectorize(get_rank)(flag_code_values)
-        else:
-            # Full 2D matrix
-            flag_code_values = flag_code_array
-            flag_code_ranks = np.vectorize(get_rank)(flag_code_array)
-
-    # Validate shape match
-    if flag_code_values.shape != sub_table.shape:
-        raise ValueError(f"Shape mismatch: expected {sub_table.shape}, got {flag_code_values.shape}")
-
-    # Reconcile values based on rank comparison
-    new_values = np.where(current_ranks < flag_code_ranks, flag_code_values, sub_table.values)
-
-    # Assign reconciled values back
-    data_table.loc[t1_idx:t2_idx, numflag_columns] = new_values.astype(np.int64)
-
-    return data_table
 
 # all_dat[VaporizerTemp_C >= heater_lower_lim & VaporizerTemp_C <= heater_upper_lim ,flag_heater_auto:="V"]
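
Editor's note, not part of the patch: reconcile_flags(), now defined at module scope near the top of generate_flags.py, resolves competing flag codes by rank, keeping whichever code ranks higher. The sketch below illustrates that rule on a toy table; the flag_ranking mapping here is invented for the example, while the real ranking and flag definitions are loaded from app/flags/ebas_dict.yaml.

import numpy as np
import pandas as pd

# Assumed ranking for the example; the real mapping comes from ebas_dict['flag_ranking']
flag_ranking = {0: 0, 456: 2, 999: 3}

def get_rank(flag):
    return flag_ranking.get(flag, -10)  # unknown codes get a low default rank

numflags = pd.DataFrame({'numflag_SO4': [0, 0, 456],
                         'numflag_NO3': [0, 999, 0]})

incoming = 456  # scalar flag code applied over the whole time window
ranks = np.vectorize(get_rank)(numflags.values)
# Keep whichever flag ranks higher, mirroring the np.where comparison in reconcile_flags
numflags.loc[:, :] = np.where(ranks < get_rank(incoming), incoming, numflags.values)
print(numflags)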
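
Editor's note, not part of the patch: the new boolean flag_<var> columns (flag_conc for the CPC table, flag_* for the species table) mark a sample as True when its EBAS numflag resolves to validity 'I'. A minimal sketch of how they could be consumed for plotting, with hypothetical data and a two-entry stand-in for the EBAS flag dictionary that the pipeline actually loads from app/flags/ebas_dict.yaml:

import pandas as pd
import matplotlib.pyplot as plt

# Two-entry stand-in for the EBAS flag dictionary (assumption for this example)
flags_dict = {0: {'validity': 'V'}, 999: {'validity': 'I'}}
default_value = flags_dict[999]

df = pd.DataFrame({
    't': pd.date_range('2024-01-01', periods=6, freq='h'),
    'conc': [310.0, 305.2, 1.5, 2.0, 298.7, 301.1],
    'numflag_cpc': [0, 0, 999, 999, 0, 0],
})
# Same binarization rule as the patch: True where the flag marks the point invalid
df['flag_conc'] = df['numflag_cpc'].apply(lambda x: flags_dict.get(x, default_value)['validity'] == 'I')

fig, ax = plt.subplots()
ax.plot(df['t'], df['conc'], marker='o', label='conc')
ax.plot(df.loc[df['flag_conc'], 't'], df.loc[df['flag_conc'], 'conc'], 'rx', label='flagged invalid')
ax.legend()
plt.show()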