diff --git a/pipelines/steps/generate_flags.py b/pipelines/steps/generate_flags.py index 9d32b70..68ec1f8 100644 --- a/pipelines/steps/generate_flags.py +++ b/pipelines/steps/generate_flags.py @@ -66,6 +66,13 @@ def generate_diagnostic_flags(data_table): print(f"Error accessing validation thresholds at: {validity_thersholds_file}") return 1 + + # Define binary to ebas flag code map + # Specify labeling function to create numbered EBAS flags. It maps a column indicator, + # marking a time interval in which a particular flagging event occurred. + binary_to_ebas_code = {False : 0, True : 456} + + # Initialize a dictionary to store indicator variables indicator_variables = {} @@ -86,17 +93,25 @@ def generate_diagnostic_flags(data_table): # Create an indicator variable for the current diagnostic variable tmp = data_table[diagnostic_variable] - indicator_variables['flag_'+diagnostic_variable] = ((tmp >= lower_lim) & (tmp <= upper_lim)).to_numpy() + indicator_variables['flag_'+diagnostic_variable] = np.logical_not(((tmp >= lower_lim) & (tmp <= upper_lim)).to_numpy()) + indicator_variables['numflag_'+diagnostic_variable] = np.array([binary_to_ebas_code[entry] for entry in indicator_variables['flag_'+diagnostic_variable]], dtype=np.int64) # Add indicator variables to the new data table new_data_table = pd.DataFrame(indicator_variables) - new_data_table['flag_any_diagnostic_flag'] = new_data_table.apply(lambda x : any(np.logical_not(x.values)), axis='columns') + + aggr_func = lambda x : max(x.values) + new_data_table['numflag_any_diagnostic_flag'] = new_data_table.loc[:,['numflag_' in col for col in new_data_table.columns]].aggregate(aggr_func,axis='columns') + + aggr_func = lambda x : np.nan if x.isna().all() else any(x.dropna().values) + new_data_table['flag_any_diagnostic_flag'] = new_data_table.loc[:,['flag_' in col for col in new_data_table.columns]].aggregate(aggr_func, axis='columns') + #new_data_table['flag_any_diagnostic_flag'] = new_data_table.apply(lambda x : any(np.logical_not(x.values)), axis='columns') #new_data_table['flag_any_diagnostic'] = new_data_table.apply( # lambda x: np.nan if x.isna().all() else any(x.dropna().values), axis='columns' #) return new_data_table -def generate_species_flags(data_table : pd.DataFrame , flags_table : pd.DataFrame ): +# TODO: abstract some of the code in the command line main +def generate_species_flags(data_table : pd.DataFrame): """Generate flags for columns in data_table based on flags_table @@ -106,10 +121,139 @@ def generate_species_flags(data_table : pd.DataFrame , flags_table : pd.DataFram _description_ """ + # Get name of the specifies to flag based on diagnostics and manual flags + path_to_calib_params = os.path.normpath(os.path.join(projectPath,'pipelines/params/calibration_params.yaml')) + + if not os.path.exists(path_to_calib_params): + raise FileNotFoundError(f'Calibration params file:{path_to_calib_params}') + + with open(path_to_calib_params,'r') as stream: + calib_param_dict = yaml.safe_load(stream) + + predefined_species = calib_param_dict['variables']['species'] + + print('Predefined_species:', predefined_species) + + # Save output tables to csv file and save/or update data lineage record + filename, ext = os.path.splitext(parent_file) + path_to_flags_file = '/'.join([path_to_output_folder, f'{filename}_flags.csv']) + + variables_set = set(data_table.columns) + print(variables_set) + + manual_json_flags = [] + csv_flags = [] + + # Inspect flags folder + for filename in os.listdir(path_to_output_folder): + if all([filename.endswith('.json'), 'metadata' not in filename, 'flag' in filename]): + manual_json_flags.append(filename) + elif filename.endswith('.csv'): + csv_flags.append(filename) + + if csv_flags: + flags_table = pd.read_csv(os.path.join(path_to_output_folder, csv_flags[0])) + if 'numflag_any_diagnostic_flag' in flags_table.columns: + #renaming_map = {var: f'flag_{var}' for var in data_table.columns} + #data_table[renaming_map.keys()] = flags_table['flag_any_diagnostic_flag'].values + #data_table.rename(columns=renaming_map, inplace=True) + + renaming_map = {} + for var in data_table.columns: + #print(var) + if (not datetime_var == var) and (var in predefined_species): + renaming_map[var] = f'numflag_{var}' + print(f'numflag_{var}') + data_table[var] = pd.Series(flags_table['numflag_any_diagnostic_flag'].values) + print(renaming_map) + data_table.rename(columns=renaming_map, inplace=True) + else: + raise FileNotFoundError("Automated diagnostic flag .csv not found. Hint: Run pipelines/step/generate_flags.py --flag-type diagnostics.") + + + numflag_columns = [col for col in data_table.columns if 'numflag_' in col] + + print(numflag_columns) + for flag_filename in manual_json_flags: + #print(flag_filename) + parts = os.path.splitext(flag_filename)[0].split('_') + varname = '_'.join(parts[2:]) # Extract variable name from filename + + #if f'flag_{varname}' in data_table.columns: + try: + # Load manually generate flag + with open(os.path.join(path_to_output_folder, flag_filename), 'r') as stream: + flag_dict = json.load(stream) + + t1 = pd.to_datetime(flag_dict.get('startdate')) + t2 = pd.to_datetime(flag_dict.get('enddate')) + flag_code = flag_dict.get('flag_code', np.nan) # Default to NaN if missing + + if pd.isnull(t1) or pd.isnull(t2): + continue # Skip if invalid timestamps + + if not data_table[datetime_var].is_monotonic_increasing: + data_table.sort_values(by=datetime_var, inplace=True) + data_table.reset_index(drop=True, inplace=True) + + t1_idx = int(abs(data_table[datetime_var] - t1).argmin()) + t2_idx = int(abs(data_table[datetime_var] - t2).argmin()) + #print(flag_code) + #for col in data_table.columns: + + # if 'numflag_' in col: + # print(col) + + data_table = reconcile_flags(data_table, flag_code, t1_idx, t2_idx, numflag_columns) + + # Apply the ranking logic efficiently + #data_table.loc[t1_idx:t2_idx, numflag_columns] = data_table.loc[t1_idx:t2_idx, numflag_columns].map( + # lambda x: reconcile_flags(x, flag_code) + #) + + + #if 456 <= flag_code <= 800: + # data_table.loc[t1_idx:t2_idx, numflag_columns] = data_table.loc[t1_idx:t2_idx, numflag_columns].applymap(lambda x: max(x, flag_code)) + #else: + # data_table.loc[t1_idx:t2_idx, numflag_columns] = flag_code + + except (KeyError, ValueError, FileNotFoundError) as e: + print(f"Error processing {flag_filename}: {e}") + continue - return 0 + return data_table.loc[:,numflag_columns] + + + +with open('app/flags/ebas_dict.yaml','r') as stream: + ebas_dict = yaml.safe_load(stream) + flag_ranking = ebas_dict['flag_ranking'] + + +# Vectorized function for getting the rank of a flag +def get_rank(flag): + return flag_ranking.get(flag, 0) # Default rank 0 for unknown flags + +# Vectorized function for reconciling flags +def reconcile_flags(data_table, flag_code, t1_idx, t2_idx, numflag_columns): + # Get the rank of the flag_code + flag_code_rank = get_rank(flag_code) + + # Extract the relevant subtable for efficiency + sub_table = data_table.loc[t1_idx:t2_idx, numflag_columns] + + # Get the ranks for the current values in the subtable, using np.vectorize to avoid applymap + current_ranks = np.vectorize(get_rank)(sub_table.values) # Element-wise rank computation + + # Compare ranks: If the rank of the flag_code is higher, it will override the current value + new_values = np.where(current_ranks < flag_code_rank, flag_code, sub_table.values) + + # Update the dataframe with the new values + data_table.loc[t1_idx:t2_idx, numflag_columns] = new_values + + return data_table # all_dat[VaporizerTemp_C >= heater_lower_lim & VaporizerTemp_C <= heater_upper_lim ,flag_heater_auto:="V"] # all_dat[ABsamp >= AB_lower_lim & ABsamp <= AB_upper_lim ,flag_AB_auto:="V"] @@ -128,31 +272,68 @@ if __name__ == '__main__': help="Specify the flag type. Must be one of: diagnostics, species, cpd" ) parser.add_argument('data_file', type=str, help="Path to the input HDF5 file containing the data table.") - parser.add_argument('dataset_name', type=str, help ='Relative path to data_table (i.e., dataset name) in HDF5 file') + #parser.add_argument('dataset_name', type=str, help ='Relative path to data_table (i.e., dataset name) in HDF5 file') #parser.add_argument('validity_thersholds_file', type=str, help="Path to the input YAML file containing calibration factors.") #parser.add_argument('output_file', type=str, help="Path to save the output calibrated data as a CSV file.") args = parser.parse_args() + flag_type = args.flag_type + data_file = args.data_file + # Load input data and calibration factors try: - #data_table = pd.read_json(args.data_file) - - print(args.data_file) - dataManager = dataOps.HDF5DataOpsManager(args.data_file) dataManager.load_file_obj() - dataset_name = '/'+args.dataset_name - data_table = dataManager.extract_dataset_as_dataframe('/'+args.dataset_name) - datetime_var, datetime_var_format = dataManager.infer_datetime_variable('/'+args.dataset_name) + + base_name = '/ACSM_TOFWARE' + if '/ACSM_TOFWARE' not in dataManager.file_obj: + dataManager.unload_file_obj() + print(f'Invalid data file: {data_file}. Missing instrument folder ACSM_TOFWARE.') + raise ImportError(f'Instrument folder "/ACSM_TOFWARE" not found in data_file : {data_file}') + + + dataManager.extract_and_load_dataset_metadata() dataset_metadata_df = dataManager.dataset_metadata_df.copy() - print(dataset_metadata_df.head()) - dataset_name_idx = dataset_metadata_df.index[(dataset_metadata_df['dataset_name']==args.dataset_name).to_numpy()] - data_table_metadata = dataset_metadata_df.loc[dataset_name_idx,:] - parent_instrument = data_table_metadata.loc[dataset_name_idx,'parent_instrument'].values[0] - parent_file = data_table_metadata.loc[dataset_name_idx,'parent_file'].values[0] + # Find dataset associated with diagnostic channels + if flag_type == 'diagnostics': + keywords = ['ACSM_JFJ_','_meta.txt/data_table'] + find_keyword = [all(keyword in item for keyword in keywords) for item in dataset_metadata_df['dataset_name']] + + if flag_type == 'species': + keywords = ['ACSM_JFJ_','_timeseries.txt/data_table'] + find_keyword = [all(keyword in item for keyword in keywords) for item in dataset_metadata_df['dataset_name']] + #dataset_name = dataset_metadata_df['dataset_name'][find_keyword] + #parent_file = dataset_metadata_df.loc[find_keyword,'parent_file'] + #parent_flag_file = '_'.join([os.path.splitext(parent_file),'flags.csv']) + #parent_instrument = dataset_metadata_df.loc[find_keyword,'parent_instrument'] + + dataset_name = dataset_metadata_df['dataset_name'][find_keyword] + parent_file = dataset_metadata_df.loc[find_keyword,'parent_file'] + #parent_flag_file = '_'.join([os.path.splitext(parent_file)[0],'flags.csv']) # Expected name + parent_instrument = dataset_metadata_df.loc[find_keyword,'parent_instrument'] + + print(':)') + if not (dataset_name.size == 1): + raise ValueError(f'{flag_type} file is not uniquely identifiable: {parent_file}') + else: + dataset_name = dataset_name.values[0] + parent_file = parent_file.values[0] + parent_instrument = parent_instrument.values[0] + + data_table = dataManager.extract_dataset_as_dataframe(dataset_name) + datetime_var, datetime_var_format = dataManager.infer_datetime_variable(dataset_name) + + #dataManager.extract_and_load_dataset_metadata() + #dataset_metadata_df = dataManager.dataset_metadata_df.copy() + #print(dataset_metadata_df.head()) + + #dataset_name_idx = dataset_metadata_df.index[(dataset_metadata_df['dataset_name']==args.dataset_name).to_numpy()] + #data_table_metadata = dataset_metadata_df.loc[dataset_name_idx,:] + #parent_instrument = data_table_metadata.loc[dataset_name_idx,'parent_instrument'].values[0] + #parent_file = data_table_metadata.loc[dataset_name_idx,'parent_file'].values[0] dataManager.unload_file_obj() @@ -191,104 +372,33 @@ if __name__ == '__main__': # Compute diagnostic flags based on validity thresholds defined in configuration_file_dict if flag_type == 'diagnostics': + flags_table = generate_diagnostic_flags(data_table) - flags_table = generate_diagnostic_flags(data_table) - metadata = {'actris_level' : 1, - 'processing_script': processingScriptRelPath.replace(os.sep,'/'), - 'processing_date' : utils.created_at(), - 'flag_type' : flag_type - } - - # Save output tables to csv file and save/or update data lineage record - filename, ext = os.path.splitext(parent_file) - path_to_flags_file = '/'.join([path_to_output_folder, f'{filename}_flags.csv']) - #path_to_calibration_factors_file = '/'.join([path_to_output_folder, f'{filename}_calibration_factors.csv']) - - flags_table.to_csv(path_to_flags_file, index=False) - - status = stepUtils.record_data_lineage(path_to_flags_file, projectPath, metadata) + if flag_type == 'species': + flags_table = generate_species_flags(data_table) + + metadata = {'actris_level' : 1, + 'processing_script': processingScriptRelPath.replace(os.sep,'/'), + 'processing_date' : utils.created_at(), + 'flag_type' : flag_type + } + + # Save output tables to csv file and save/or update data lineage record + filename, ext = os.path.splitext(parent_file) + path_to_flags_file = '/'.join([path_to_output_folder, f'{filename}_flags.csv']) + #path_to_calibration_factors_file = '/'.join([path_to_output_folder, f'{filename}_calibration_factors.csv']) + + flags_table.to_csv(path_to_flags_file, index=False) + status = stepUtils.record_data_lineage(path_to_flags_file, projectPath, metadata) - print(f"Flags saved to {path_to_flags_file}") - print(f"Data lineage saved to {path_to_output_dir}") - - if flag_type == 'species': - - # Save output tables to csv file and save/or update data lineage record - filename, ext = os.path.splitext(parent_file) - path_to_flags_file = '/'.join([path_to_output_folder, f'{filename}_flags.csv']) - - variables_set = set(data_table.columns) - - manual_json_flags = [] - csv_flags = [] - - # Inspect flags folder - for filename in os.listdir(path_to_output_folder): - if any(var in filename and filename.endswith('.json') for var in variables_set): - manual_json_flags.append(filename) - elif filename.endswith('.csv'): - csv_flags.append(filename) - - if len(csv_flags) == 1: - flags_table = pd.read_csv(os.path.join(path_to_output_folder, csv_flags[0])) - - if 'flag_any_diagnostic_flag' in flags_table.columns: - #renaming_map = {var: f'flag_{var}' for var in data_table.columns} - #data_table[renaming_map.keys()] = flags_table['flag_any_diagnostic_flag'].values - #data_table.rename(columns=renaming_map, inplace=True) - - renaming_map = {} - for var in data_table.columns: - if not datetime_var == var: - renaming_map[var] = f'flag_{var}' - data_table[var] = pd.Series(flags_table['flag_any_diagnostic_flag'].values) - print(renaming_map) - data_table.rename(columns=renaming_map, inplace=True) - print(csv_flags) - # TODO: validate the code below. I suspect we need to change the flag creation strategy. - # First, aggregate manual num flags as one using median maybe and then create a flag for each specie - # taking into account the existing specifies flags (read csv file and update it if needed) - for flag_filename in manual_json_flags: - parts = os.path.splitext(flag_filename)[0].split('_') - varname = '_'.join(parts[2:]) # Extract variable name from filename - print(varname) - if f'flag_{varname}' in data_table.columns: - try: - with open(os.path.join(path_to_output_folder, flag_filename), 'r') as stream: - flag_dict = json.load(stream) - - t1 = pd.to_datetime(flag_dict.get('startdate')) - t2 = pd.to_datetime(flag_dict.get('enddate')) - flag_code = flag_dict.get('flag_code', np.nan) # Default to NaN if missing - - if pd.isnull(t1) or pd.isnull(t2): - continue # Skip if invalid timestamps - - if not data_table[datetime_var].is_monotonic_increasing: - data_table.sort_values(by=datetime_var, inplace=True) - data_table.reset_index(drop=True, inplace=True) - - t1_idx = abs(data_table[datetime_var] - t1).argmin() - t2_idx = abs(data_table[datetime_var] - t2).argmin() - - data_table.loc[t1_idx:t2_idx, f'flag_{varname}'] = flag_code - - except (KeyError, ValueError, FileNotFoundError) as e: - print(f"Error processing {flag_filename}: {e}") - continue - - data_table.to_csv(path_to_flags_file, index=False) + print(f"Flags saved to {path_to_flags_file}") + print(f"Data lineage saved to {path_to_output_dir}") + + #flags_table.to_csv(path_to_flags_file, index=False) # Read json and assign numeric flag to column - - - - - - - except Exception as e: print(f"Error during calibration: {e}") exit(1) diff --git a/pipelines/steps/visualize_datatable_vars.py b/pipelines/steps/visualize_datatable_vars.py index fd2c2ae..069efd5 100644 --- a/pipelines/steps/visualize_datatable_vars.py +++ b/pipelines/steps/visualize_datatable_vars.py @@ -49,8 +49,8 @@ def visualize_table_variables(data_file_path, dataset_name, flags_dataset_name, var_flag_name = f"flag_{var}" if var_flag_name in flags_df.columns: # Identify valid and invalid indices - ind_valid = flags_df[var_flag_name].to_numpy() - ind_invalid = np.logical_not(ind_valid) + ind_invalid = flags_df[var_flag_name].to_numpy() + # ind_valid = np.logical_not(ind_valid) # Detect start and end indices of invalid regions # Find transition points in invalid regions invalid_starts = np.diff(np.concatenate(([False], ind_invalid, [False]))).nonzero()[0][::2]