Implement binary flags for visualization purposes, and fix bug with cpc flags.

This commit is contained in:
2025-05-16 14:22:17 +02:00
parent 58d7b425aa
commit cc1b445d38
3 changed files with 105 additions and 51 deletions

View File

@ -31,6 +31,53 @@ import json
from pipelines.steps.utils import load_project_yaml_files, get_metadata
from math import floor
path_to_ebas_dict = os.path.normpath(os.path.join(projectPath,'app/flags/ebas_dict.yaml'))
with open(path_to_ebas_dict ,'r') as stream:
ebas_dict = yaml.safe_load(stream)
flags_dict = ebas_dict['flags']
flag_ranking = ebas_dict['flag_ranking']
# Vectorized function for getting the rank of a flag
def get_rank(flag):
return flag_ranking.get(flag, -10) # Default rank is NaN for unknown flags
# Vectorized function for reconciling flags
def reconcile_flags(data_table, flag_code, t1_idx, t2_idx, numflag_columns):
# Extract the relevant subtable
sub_table = data_table.loc[t1_idx:t2_idx, numflag_columns].copy()
# Compute ranks of current values
current_ranks = np.vectorize(get_rank)(sub_table.values)
# Handle flag_code: broadcast scalar or reshape array
if np.isscalar(flag_code):
flag_code_values = np.full(sub_table.shape, flag_code)
flag_code_ranks = np.full(sub_table.shape, get_rank(flag_code))
else:
# Convert to NumPy array and ensure correct shape
flag_code_array = np.asarray(flag_code)
if flag_code_array.ndim == 1:
# Assume it's one flag per row — broadcast across columns
flag_code_values = np.tile(flag_code_array[:, None], (1, len(numflag_columns)))
flag_code_ranks = np.vectorize(get_rank)(flag_code_values)
else:
# Full 2D matrix
flag_code_values = flag_code_array
flag_code_ranks = np.vectorize(get_rank)(flag_code_array)
# Validate shape match
if flag_code_values.shape != sub_table.shape:
raise ValueError(f"Shape mismatch: expected {sub_table.shape}, got {flag_code_values.shape}")
# Reconcile values based on rank comparison
new_values = np.where(current_ranks < flag_code_ranks, flag_code_values, sub_table.values)
# Assign reconciled values back
data_table.loc[t1_idx:t2_idx, numflag_columns] = new_values.astype(np.int64)
return data_table
def generate_cpc_flags(data_table, datetime_var: str = 'start_time'):
# TODO: ask Rob where to find this information.
required_variables = ['start_time', 'end_time', 'st_y', 'ed_y', 'p_int', 'T_int', 'conc', 'numflag']
@ -45,9 +92,14 @@ def generate_cpc_flags(data_table, datetime_var: str = 'start_time'):
print(flags_table.head())
# Multiply numflag by 100 and floor it
flags_table['numflag'] = flags_table['numflag'].apply(lambda x: int(floor(x * 1000)))
flags_table['numflag'] = flags_table['numflag'].apply(lambda x: floor(x * 1000))
flags_table.rename(columns={'numflag':'numflag_cpc'}, inplace=True)
default_value = flags_dict[999]
flags_table['flag_conc'] = flags_table['numflag_cpc'].copy().values
flags_table['flag_conc'] = flags_table['flag_conc'].apply(lambda x : flags_dict.get(x,default_value)['validity']=='I')
return flags_table
@ -252,54 +304,28 @@ def generate_species_flags(data_table : pd.DataFrame, calib_param_dict : dict, f
return data_table.loc[:,[datetime_var] + numflag_columns]
# Binarize flags for streamlined visualization of invalid and valid regions
binary_flag_columns = []
default_code = 999 # EBAS missing measurement unspecified reason
default_value = flags_dict[default_code] # fallback definition for unknown flags
# Convert them to integer type (handling NaNs if needed)
data_table[numflag_columns] = data_table[numflag_columns].fillna(default_code).astype(int)
for numflag_var in numflag_columns:
flag_var = numflag_var.replace('numflag_', 'flag_')
binary_flag_columns.append(flag_var)
# Apply validity check: True if flag is 'I' (invalid)
data_table[flag_var] = data_table[numflag_var].apply(
lambda x: flags_dict.get(x, default_value).get('validity') == 'I'
)
path_to_ebas_dict = os.path.normpath(os.path.join(projectPath,'app/flags/ebas_dict.yaml'))
with open(path_to_ebas_dict ,'r') as stream:
ebas_dict = yaml.safe_load(stream)
flag_ranking = ebas_dict['flag_ranking']
return data_table.loc[:, [datetime_var] + numflag_columns + binary_flag_columns]
# Vectorized function for getting the rank of a flag
def get_rank(flag):
return flag_ranking.get(flag, -10) # Default rank is NaN for unknown flags
# Vectorized function for reconciling flags
def reconcile_flags(data_table, flag_code, t1_idx, t2_idx, numflag_columns):
# Extract the relevant subtable
sub_table = data_table.loc[t1_idx:t2_idx, numflag_columns].copy()
# Compute ranks of current values
current_ranks = np.vectorize(get_rank)(sub_table.values)
# Handle flag_code: broadcast scalar or reshape array
if np.isscalar(flag_code):
flag_code_values = np.full(sub_table.shape, flag_code)
flag_code_ranks = np.full(sub_table.shape, get_rank(flag_code))
else:
# Convert to NumPy array and ensure correct shape
flag_code_array = np.asarray(flag_code)
if flag_code_array.ndim == 1:
# Assume it's one flag per row — broadcast across columns
flag_code_values = np.tile(flag_code_array[:, None], (1, len(numflag_columns)))
flag_code_ranks = np.vectorize(get_rank)(flag_code_values)
else:
# Full 2D matrix
flag_code_values = flag_code_array
flag_code_ranks = np.vectorize(get_rank)(flag_code_array)
# Validate shape match
if flag_code_values.shape != sub_table.shape:
raise ValueError(f"Shape mismatch: expected {sub_table.shape}, got {flag_code_values.shape}")
# Reconcile values based on rank comparison
new_values = np.where(current_ranks < flag_code_ranks, flag_code_values, sub_table.values)
# Assign reconciled values back
data_table.loc[t1_idx:t2_idx, numflag_columns] = new_values.astype(np.int64)
return data_table
# all_dat[VaporizerTemp_C >= heater_lower_lim & VaporizerTemp_C <= heater_upper_lim ,flag_heater_auto:="V"]