diff --git a/notebooks/demo_acsm_pipeline.ipynb b/notebooks/demo_acsm_pipeline.ipynb
index 54945c5..cb7959e 100644
--- a/notebooks/demo_acsm_pipeline.ipynb
+++ b/notebooks/demo_acsm_pipeline.ipynb
@@ -151,7 +151,9 @@
     "#command = ['python', 'pipelines/steps/compute_automated_flags.py', path_to_data_file, dataset_name, path_to_config_file]\n",
     "#status = subprocess.run(command, capture_output=True, check=True)\n",
     "#print(status.stdout.decode())\n",
-    "generate_flags(path_to_data_file, 'diagnostics')\n"
+    "generate_flags(path_to_data_file, 'diagnostics')\n",
+    "\n",
+    "generate_flags(path_to_data_file, 'cpc')\n"
    ]
   },
   {
diff --git a/pipelines/steps/generate_flags.py b/pipelines/steps/generate_flags.py
index 3751601..81b8c79 100644
--- a/pipelines/steps/generate_flags.py
+++ b/pipelines/steps/generate_flags.py
@@ -28,12 +28,28 @@ import dima.src.hdf5_ops as dataOps
 import pipelines.steps.utils as stepUtils
 import dima.utils.g5505_utils as utils
 import json
-from pipelines.steps.utils import load_project_yaml_files
+from pipelines.steps.utils import load_project_yaml_files, get_metadata
+from math import floor
 
-def compute_cpc_flags():
-    # TODO: ask rob where to find this information.
+def generate_cpc_flags(data_table, datetime_var: str = 'start_time'):
+    # TODO: ask Rob where to find this information.
+    required_variables = ['start_time', 'end_time', 'st_y', 'ed_y', 'p_int', 'T_int', 'conc', 'numflag']
+
+    # Optionally check if required variables exist in data_table
+    # Uncomment if you want to enforce the check:
+    # if not all(var in data_table.columns for var in required_variables):
+    #     raise ValueError("Some required variables are missing from the data_table.")
+
+    # Select only datetime and numflag columns
+    flags_table = data_table.loc[:, [datetime_var, 'numflag']].copy()
+    print(flags_table.head())
+
+    # Scale numflag by 1000 and floor it to obtain an integer flag code
+    flags_table['numflag'] = flags_table['numflag'].apply(lambda x: int(floor(x * 1000)))
+    flags_table.rename(columns={'numflag':'numflag_cpc'}, inplace=True)
+
+    return flags_table
 
-    return 0
 
 #def compute_diagnostic_variable_flags(data_table, validity_thresholds_dict):
 def generate_diagnostic_flags(data_table, validity_thresholds_dict):
@@ -101,6 +117,7 @@ def generate_diagnostic_flags(data_table, validity_thresholds_dict):
     #)
     return new_data_table
 
+from scipy.interpolate import interp1d
 
 def generate_species_flags(data_table : pd.DataFrame, calib_param_dict : dict, flagsFolderPath, datetime_var : str = 't_start_Buf'):
 
@@ -124,29 +141,79 @@
     #print(manual_json_flags,csv_flags)
 
     if csv_flags:
-        flags_table = pd.read_csv(os.path.join(flagsFolderPath, csv_flags[0]))
-        if 'numflag_any_diagnostic_flag' in flags_table.columns:
-            # Define condition for renaming
-            required = lambda var: var != datetime_var and var in predefined_species
+        # Loop over CSV files in the flags folder
+        for filename in csv_flags:
+            filePath = os.path.join(flagsFolderPath, filename)
+            fileMetadataDict = get_metadata(filePath)
+            flag_datetime_var = fileMetadataDict['datetime_var']
 
-            # Create renaming map using dictionary comprehension
-            renaming_map = {var: f'numflag_{var}' for var in data_table.columns if required(var)}
+            flags_table = pd.read_csv(filePath)
+
+            # Ensure datetime type
+            data_table[datetime_var] = pd.to_datetime(data_table[datetime_var])
+            flags_table[flag_datetime_var] = pd.to_datetime(flags_table[flag_datetime_var])
+
+            if 'numflag_any_diagnostic_flag' in flags_table.columns:
+                # Sort for interpolation
+                flags_table_sorted = flags_table.sort_values(flag_datetime_var)
+
+                # Pre-convert datetimes to int64 for interpolation
+                flag_times_int = flags_table_sorted[flag_datetime_var].astype(np.int64)
+                data_times_int = data_table[datetime_var].astype(np.int64)
+
+                # Nearest-neighbor interpolator
+                flag_interp_func = interp1d(
+                    flag_times_int,
+                    flags_table_sorted['numflag_any_diagnostic_flag'],
+                    kind='nearest',
+                    bounds_error=False,
+                    fill_value='extrapolate'
+                )
+
+                # Interpolated flags
+                interpolated_flags = flag_interp_func(data_times_int)
+
+                # Define which columns to flag
+                required = lambda var: var != datetime_var and var in predefined_species
+                renaming_map = {var: f'numflag_{var}' for var in data_table.columns if required(var)}
+                variables = list(renaming_map.keys())
+
+                # Assign flags to those columns
+                for var in variables:
+                    data_table[var] = interpolated_flags
+
+                data_table.rename(columns=renaming_map, inplace=True)
+
+            elif 'numflag_cpc' in flags_table.columns:
+                # Sort for interpolation
+                flags_table_sorted = flags_table.sort_values(flag_datetime_var)
+
+                # Pre-convert datetimes to int64 for interpolation
+                flag_times_int = flags_table_sorted[flag_datetime_var].values.astype(np.int64)
+                data_times_int = data_table[datetime_var].values.astype(np.int64)
+
+                # Nearest-neighbor interpolator
+                flag_interp_func = interp1d(
+                    flag_times_int,
+                    flags_table_sorted['numflag_cpc'],
+                    kind='nearest',
+                    bounds_error=False,
+                    fill_value='extrapolate'
+                )
+
+                # Interpolated flags
+                interpolated_cpc_flags = flag_interp_func(data_times_int)
+
-            # Assign the same flag values across required columns using vectorized operation
-            variables = list(renaming_map.keys())
-            data_table[variables] = np.tile(
-                flags_table['numflag_any_diagnostic_flag'].values[:, None],
-                (1, len(variables))
-            )
-            #print(renaming_map)
-            data_table.rename(columns=renaming_map, inplace=True)
 
     else:
         raise FileNotFoundError("Automated diagnostic flag .csv not found. Hint: Run pipelines/steps/generate_flags.py --flag-type diagnostics.")
 
     numflag_columns = [col for col in data_table.columns if 'numflag_' in col]
+    # Fold the interpolated CPC flags into the species flag columns, but only if a
+    # CPC flags table was actually found above (.loc slicing is end-inclusive)
+    if 'interpolated_cpc_flags' in locals():
+        data_table = reconcile_flags(data_table, interpolated_cpc_flags, 0, len(data_table) - 1, numflag_columns)
+
     #print(numflag_columns)
     for flag_filename in manual_json_flags:
         #print(flag_filename)
@@ -196,27 +263,45 @@ with open(path_to_ebas_dict ,'r') as stream:
 
 # Vectorized function for getting the rank of a flag
 def get_rank(flag):
-    return flag_ranking.get(flag, np.nan)  # Default rank 0 for unknown flags
+    return flag_ranking.get(flag, -10)  # Default rank -10 for unknown flags, so they never override a known flag
 
 # Vectorized function for reconciling flags
 def reconcile_flags(data_table, flag_code, t1_idx, t2_idx, numflag_columns):
 
-    # Get the rank of the flag_code
-    flag_code_rank = get_rank(flag_code)
+    # Extract the relevant subtable
+    sub_table = data_table.loc[t1_idx:t2_idx, numflag_columns].copy()
 
-    # Extract the relevant subtable for efficiency
-    sub_table = data_table.loc[t1_idx:t2_idx, numflag_columns]
+    # Compute ranks of current values
+    current_ranks = np.vectorize(get_rank)(sub_table.values)
 
-    # Get the ranks for the current values in the subtable, using np.vectorize to avoid applymap
-    current_ranks = np.vectorize(get_rank)(sub_table.values)  # Element-wise rank computation
+    # Handle flag_code: broadcast scalar or reshape array
+    if np.isscalar(flag_code):
+        flag_code_values = np.full(sub_table.shape, flag_code)
+        flag_code_ranks = np.full(sub_table.shape, get_rank(flag_code))
+    else:
+        # Convert to NumPy array and ensure correct shape
+        flag_code_array = np.asarray(flag_code)
+        if flag_code_array.ndim == 1:
+            # Assume it's one flag per row; broadcast across columns
+            flag_code_values = np.tile(flag_code_array[:, None], (1, len(numflag_columns)))
+            flag_code_ranks = np.vectorize(get_rank)(flag_code_values)
+        else:
+            # Full 2D matrix
+            flag_code_values = flag_code_array
+            flag_code_ranks = np.vectorize(get_rank)(flag_code_array)
 
-    # Compare ranks: If the rank of the flag_code is higher, it will override the current value
-    new_values = np.where(current_ranks < flag_code_rank, flag_code, sub_table.values)
+    # Validate shape match
+    if flag_code_values.shape != sub_table.shape:
+        raise ValueError(f"Shape mismatch: expected {sub_table.shape}, got {flag_code_values.shape}")
 
-    # Update the dataframe with the new values
+    # Reconcile values based on rank comparison
+    new_values = np.where(current_ranks < flag_code_ranks, flag_code_values, sub_table.values)
+
+    # Assign reconciled values back
     data_table.loc[t1_idx:t2_idx, numflag_columns] = new_values.astype(np.int64)
 
     return data_table
+
 # all_dat[VaporizerTemp_C >= heater_lower_lim & VaporizerTemp_C <= heater_upper_lim ,flag_heater_auto:="V"]
 # all_dat[ABsamp >= AB_lower_lim & ABsamp <= AB_upper_lim ,flag_AB_auto:="V"]
 # all_dat[FlowRate_ccs >= flow_lower_lim & FlowRate_ccs <= flow_upper_lim ,flag_flow_auto:="V"]
@@ -248,6 +333,10 @@ def main(data_file, flag_type):
     keywords = [f'ACSM_{STATION_ABBR}_','_timeseries.txt/data_table']
     find_keyword = [all(keyword in item for keyword in keywords) for item in dataset_metadata_df['dataset_name']]
 
+    if flag_type == 'cpc':
+        keywords = ['cpc.particle_number_concentration.aerosol.', f'CH02L_TSI_3772_{STATION_ABBR}.CH02L_CPC.lev1.nas']
+        find_keyword = [all(keyword in item for keyword in keywords) for item in dataset_metadata_df['dataset_name']]
+
     # Specify source dataset to be extracted from input hdf5 data file
     columns = ['dataset_name','parent_file','parent_instrument']
     dataset_name, parent_file, parent_instrument = tuple(dataset_metadata_df.loc[find_keyword,col] for col in columns)
@@ -316,6 +405,9 @@ def main(data_file, flag_type):
     #calib_param_dict = load_parameters(flag_type)
     calib_param_dict = load_project_yaml_files(projectPath, "calibration_params.yaml")
     flags_table = generate_species_flags(data_table,calib_param_dict,path_to_output_folder,datetime_var)
+    if flag_type == 'cpc':
+        print('Generating CPC flags.')
+        flags_table = generate_cpc_flags(data_table, datetime_var)
 
     metadata = {'actris_level' : 1,
                 'processing_script': processingScriptRelPath.replace(os.sep,'/'),
@@ -361,7 +453,7 @@ def main(data_file, flag_type):
         if folderitem.startswith('flag') and folderitem.endswith('.json'):
             manual_json_flags.append(folderitem)
         # Identify CSV flag files
-        elif folderitem.endswith('.csv'):
+        elif folderitem.endswith('.csv') and '_flags' in folderitem:
             csv_flags.append(folderitem)
 
     # Return the lists of manual flag JSON files and CSV flag files
@@ -375,8 +467,8 @@ if __name__ == '__main__':
     parser.add_argument(
         "--flag-type",
         required=True,
-        choices=["diagnostics", "species", "cpd"],
-        help="Specify the flag type. Must be one of: diagnostics, species, cpd"
+        choices=["diagnostics", "species", "cpc"],
+        help="Specify the flag type. Must be one of: diagnostics, species, cpc"
    )
     parser.add_argument('data_file', type=str, help="Path to the input HDF5 file containing the data table.")
     #parser.add_argument('dataset_name', type=str, help ='Relative path to data_table (i.e., dataset name) in HDF5 file')
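
Usage note (not part of the patch): with the --flag-type choices corrected from "cpd" to "cpc", the CPC path can be exercised either from the demo notebook, as in the cell changed at the top of this diff, or from the command line. The HDF5 path below is a placeholder:

    python pipelines/steps/generate_flags.py --flag-type cpc path/to/collection_of_experiments.h5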
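The flag alignment added to generate_species_flags relies on scipy.interpolate.interp1d with kind='nearest' over int64-encoded timestamps. A minimal, self-contained sketch of that mapping, using made-up timestamps and flag values rather than pipeline data:

    import numpy as np
    import pandas as pd
    from scipy.interpolate import interp1d

    # Flags reported on an hourly grid (made-up values)
    flags_table = pd.DataFrame({
        'start_time': pd.to_datetime(['2024-01-01 00:00', '2024-01-01 01:00', '2024-01-01 02:00']),
        'numflag_cpc': [0, 999, 0],
    })
    # Data timestamps that do not line up with the flag grid
    data_table = pd.DataFrame({
        't_start_Buf': pd.to_datetime(['2024-01-01 00:10', '2024-01-01 00:50', '2024-01-01 01:40']),
    })

    flags_sorted = flags_table.sort_values('start_time')
    flag_times_int = flags_sorted['start_time'].values.astype(np.int64)   # ns since epoch
    data_times_int = data_table['t_start_Buf'].values.astype(np.int64)

    # Nearest-neighbor mapping of flags onto the data timestamps
    interp = interp1d(flag_times_int, flags_sorted['numflag_cpc'],
                      kind='nearest', bounds_error=False, fill_value='extrapolate')
    print(interp(data_times_int))   # [  0. 999.   0.]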
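The generalized reconcile_flags() keeps whichever flag has the higher rank, element-wise. The ranking below is a hypothetical stand-in (the real one is loaded from the project's EBAS flags dictionary) used only to illustrate the override rule:

    import numpy as np
    import pandas as pd

    # Assumed ordering: higher rank means more severe; not the project's actual table
    flag_ranking = {0: 0, 559: 1, 999: 2}

    def get_rank(flag):
        return flag_ranking.get(flag, -10)  # unknown flags rank lowest and never override

    data_table = pd.DataFrame({'numflag_conc': [0, 999, 0, 559]})
    incoming = np.array([559, 559, 0, 999])          # e.g. interpolated CPC flags, one per row

    current = data_table[['numflag_conc']].to_numpy()
    incoming_2d = np.tile(incoming[:, None], (1, current.shape[1]))

    # Override only where the incoming flag outranks the current one
    override = np.vectorize(get_rank)(current) < np.vectorize(get_rank)(incoming_2d)
    data_table[['numflag_conc']] = np.where(override, incoming_2d, current)

    print(data_table['numflag_conc'].tolist())       # [559, 999, 0, 999]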