# acsm-fairifier/pipelines/steps/generate_flags.py
import sys, os

try:
    thisFilePath = os.path.abspath(__file__)
    print(thisFilePath)
except NameError:
    print("[Notice] The __file__ attribute is unavailable in this environment (e.g., Jupyter or IDLE).")
    print("When using a terminal, make sure the working directory is set to the script's location to prevent path issues (for the DIMA submodule).")
    #print("Otherwise, path to submodule DIMA may not be resolved properly.")
    thisFilePath = os.getcwd()  # Use current directory or specify a default
import numpy as np
import pandas as pd
import argparse
import yaml, json

projectPath = os.path.normpath(os.path.join(thisFilePath, "..", "..", ".."))  # Move up to project root
#print('Project path:', projectPath)
dimaPath = os.path.normpath('/'.join([projectPath, 'dima']))
#print('DIMA path:', dimaPath)

# Set up project root directory
sys.path.insert(0, projectPath)
sys.path.insert(0, dimaPath)

import dima.src.hdf5_ops as dataOps
import pipelines.steps.utils as stepUtils
import dima.utils.g5505_utils as utils
def compute_cpc_flags():
    # TODO: ask Rob where to find this information.
    return 0
def load_parameters(flag_type):
    # Implicit input
    if flag_type == 'diagnostics':
        flag_type_file = os.path.normpath(os.path.join(projectPath, 'pipelines/params/validity_thresholds.yaml'))
        error_message = f"Error accessing validation thresholds at: {flag_type_file}"
    elif flag_type == 'species':
        flag_type_file = os.path.normpath(os.path.join(projectPath, 'pipelines/params/calibration_params.yaml'))
        error_message = f"Error accessing calibration parameters at: {flag_type_file}"
    else:
        print(f"Unsupported flag type: {flag_type}. Expected 'diagnostics' or 'species'.")
        return {}

    output_dict = {}
    try:
        with open(flag_type_file, 'r') as stream:
            output_dict = yaml.load(stream, Loader=yaml.FullLoader)
    except Exception as e:
        print(error_message)
        return {}

    # Get names of the species to flag based on diagnostics and manual flags
    #path_to_calib_params = os.path.normpath(os.path.join(projectPath,'pipelines/params/calibration_params.yaml'))
    #if not os.path.exists(path_to_calib_params):
    #    raise FileNotFoundError(f'Calibration params file: {path_to_calib_params}')
    #with open(path_to_calib_params,'r') as stream:
    #    calib_param_dict = yaml.safe_load(stream)

    return output_dict
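# The structure that the output of load_parameters() is assumed to have, inferred from how
# the dictionaries are consumed by the flag-generation functions below (the actual YAML
# files may contain additional fields; the variable and species names are placeholders):
#   'diagnostics' -> {'validity_thresholds': {'variables': {'<diag_var>': {'lower_lim': ..., 'upper_lim': ...}}}}
#   'species'     -> {'variables': {'species': ['<species_1>', ..., '<species_n>']}}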
#def compute_diagnostic_variable_flags(data_table, validity_thresholds_dict):
def generate_diagnostic_flags(data_table, validity_thresholds_dict):
    """
    Create indicator variables that check whether a particular diagnostic variable is within
    pre-specified/acceptable limits, which are defined by `validity_thresholds_dict`.

    Parameters:
        data_table (pd.DataFrame): The input data table with the diagnostic variables to check.
        validity_thresholds_dict (dict): Dictionary mapping diagnostic variables to their limits, e.g.,
            {
                'ABsamp': {
                    'lower_lim': {'value': 20000, 'description': "not specified yet"},
                    'upper_lim': {'value': 500000, 'description': "not specified yet"}
                }
            }

    Returns:
        pd.DataFrame: A new data table containing the datetime column (`t_base`) and the binary
        (`flag_*`) and numeric EBAS (`numflag_*`) indicator variables derived from the
        diagnostic variables.
    """
    # Define binary to EBAS flag code map.
    # Specify labeling function to create numbered EBAS flags. It maps a column indicator,
    # marking a time interval in which a particular flagging event occurred.
    binary_to_ebas_code = {False: 0, True: 456}

    # Initialize a dictionary to store indicator variables
    indicator_variables = {}
    indicator_variables['t_base'] = data_table['t_base']

    # Loop through the column names in the data table
    for diagnostic_variable in data_table.columns:
        print(diagnostic_variable)

        # Skip if the diagnostic variable has no defined limits
        if diagnostic_variable not in validity_thresholds_dict['validity_thresholds']['variables']:
            print(f'Diagnostic variable {diagnostic_variable} has no defined limits in the validity thresholds dictionary.')
            continue

        # Get lower and upper limits for diagnostic_variable from the variable limits dict
        variable_ranges = validity_thresholds_dict['validity_thresholds']['variables'][diagnostic_variable]
        lower_lim = variable_ranges['lower_lim']
        upper_lim = variable_ranges['upper_lim']

        # Create an indicator variable for the current diagnostic variable
        tmp = data_table[diagnostic_variable]
        indicator_variables['flag_' + diagnostic_variable] = np.logical_not(((tmp >= lower_lim) & (tmp <= upper_lim)).to_numpy())
        indicator_variables['numflag_' + diagnostic_variable] = np.array(
            [binary_to_ebas_code[entry] for entry in indicator_variables['flag_' + diagnostic_variable]],
            dtype=np.int64)

    # Add indicator variables to the new data table
    new_data_table = pd.DataFrame(indicator_variables)

    aggr_func = lambda x: max(x.values)
    new_data_table['numflag_any_diagnostic_flag'] = new_data_table.loc[:, ['numflag_' in col for col in new_data_table.columns]].aggregate(aggr_func, axis='columns')

    aggr_func = lambda x: np.nan if x.isna().all() else any(x.dropna().values)
    new_data_table['flag_any_diagnostic_flag'] = new_data_table.loc[:, ['flag_' in col for col in new_data_table.columns]].aggregate(aggr_func, axis='columns')
    #new_data_table['flag_any_diagnostic_flag'] = new_data_table.apply(lambda x : any(np.logical_not(x.values)), axis='columns')
    #new_data_table['flag_any_diagnostic'] = new_data_table.apply(
    #    lambda x: np.nan if x.isna().all() else any(x.dropna().values), axis='columns'
    #)
    return new_data_table
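# --- Illustrative sketch (not called by the pipeline) --------------------------------
# A minimal, self-contained example of calling generate_diagnostic_flags(). The threshold
# values and the scalar lower_lim/upper_lim layout are assumptions for illustration
# (consistent with how the limits are compared above), not taken from validity_thresholds.yaml.
def _example_generate_diagnostic_flags():
    example_thresholds = {
        'validity_thresholds': {
            'variables': {
                'ABsamp': {'lower_lim': 20000, 'upper_lim': 500000},  # hypothetical limits
            }
        }
    }
    example_table = pd.DataFrame({
        't_base': pd.date_range('2024-01-01', periods=3, freq='h'),
        'ABsamp': [15000, 250000, 600000],
    })
    # Rows 0 and 2 fall outside [20000, 500000]: flag_ABsamp is True there and
    # numflag_ABsamp becomes the EBAS code 456; the middle row stays 0.
    return generate_diagnostic_flags(example_table, example_thresholds)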
# TODO: abstract some of the code in the command line main
def generate_species_flags(data_table: pd.DataFrame, calib_param_dict: dict):
    """Generate numeric (EBAS) flag columns for the species columns in data_table,
    based on the automated diagnostics flags (.csv) and the manual flag records (.json)
    found in the output folder.

    Note: relies on the module-level names `datetime_var`, `parent_file`, and
    `path_to_output_folder`, which are defined in the command-line main block below.

    Returns
    -------
    pd.DataFrame
        A data table containing the datetime column and one `numflag_<species>` column
        per flagged species.
    """
    predefined_species = calib_param_dict.get('variables', {}).get('species', [])
    if not predefined_species:
        raise RuntimeError("Undefined species. Input argument 'calib_param_dict' must contain a 'variables' : {'species' : ['example1', ..., 'examplen']} entry.")

    print('Predefined species:', predefined_species)

    # Expected path of the flags .csv associated with the parent file
    filename, ext = os.path.splitext(parent_file)
    path_to_flags_file = '/'.join([path_to_output_folder, f'{filename}_flags.csv'])

    variables_set = set(data_table.columns)
    print(variables_set)

    manual_json_flags = []
    csv_flags = []
    # Inspect flags folder
    for filename in os.listdir(path_to_output_folder):
        if all([filename.endswith('.json'), 'metadata' not in filename, 'flag' in filename]):
            manual_json_flags.append(filename)
        elif filename.endswith('.csv'):
            csv_flags.append(filename)

    if csv_flags:
        flags_table = pd.read_csv(os.path.join(path_to_output_folder, csv_flags[0]))
        if 'numflag_any_diagnostic_flag' in flags_table.columns:
            #renaming_map = {var: f'flag_{var}' for var in data_table.columns}
            #data_table[renaming_map.keys()] = flags_table['flag_any_diagnostic_flag'].values
            #data_table.rename(columns=renaming_map, inplace=True)
            renaming_map = {}
            for var in data_table.columns:
                #print(var)
                if (not datetime_var == var) and (var in predefined_species):
                    renaming_map[var] = f'numflag_{var}'
                    print(f'numflag_{var}')
                    data_table[var] = pd.Series(flags_table['numflag_any_diagnostic_flag'].values, dtype=np.int64)
            print(renaming_map)
            data_table.rename(columns=renaming_map, inplace=True)
    else:
        raise FileNotFoundError("Automated diagnostic flags .csv not found. Hint: run pipelines/steps/generate_flags.py <campaignFile.h5> --flag-type diagnostics first.")

    numflag_columns = [col for col in data_table.columns if 'numflag_' in col]
    print(numflag_columns)

    for flag_filename in manual_json_flags:
        #print(flag_filename)
        parts = os.path.splitext(flag_filename)[0].split('_')
        varname = '_'.join(parts[2:])  # Extract variable name from filename
        #if f'flag_{varname}' in data_table.columns:
        try:
            # Load manually generated flag
            with open(os.path.join(path_to_output_folder, flag_filename), 'r') as stream:
                flag_dict = json.load(stream)

            t1 = pd.to_datetime(flag_dict.get('startdate'))
            t2 = pd.to_datetime(flag_dict.get('enddate'))
            flag_code = flag_dict.get('flag_code', np.nan)  # Default to NaN if missing

            if pd.isnull(t1) or pd.isnull(t2):
                continue  # Skip if invalid timestamps

            if not data_table[datetime_var].is_monotonic_increasing:
                data_table.sort_values(by=datetime_var, inplace=True)
                data_table.reset_index(drop=True, inplace=True)

            t1_idx = int(abs(data_table[datetime_var] - t1).argmin())
            t2_idx = int(abs(data_table[datetime_var] - t2).argmin())

            #print(flag_code)
            #for col in data_table.columns:
            #    if 'numflag_' in col:
            #        print(col)
            data_table = reconcile_flags(data_table, flag_code, t1_idx, t2_idx, numflag_columns)
            # Apply the ranking logic efficiently
            #data_table.loc[t1_idx:t2_idx, numflag_columns] = data_table.loc[t1_idx:t2_idx, numflag_columns].map(
            #    lambda x: reconcile_flags(x, flag_code)
            #)
            #if 456 <= flag_code <= 800:
            #    data_table.loc[t1_idx:t2_idx, numflag_columns] = data_table.loc[t1_idx:t2_idx, numflag_columns].applymap(lambda x: max(x, flag_code))
            #else:
            #    data_table.loc[t1_idx:t2_idx, numflag_columns] = flag_code
        except (KeyError, ValueError, FileNotFoundError) as e:
            print(f"Error processing {flag_filename}: {e}")
            continue

    return data_table.loc[:, [datetime_var] + numflag_columns]
with open('app/flags/ebas_dict.yaml', 'r') as stream:
    ebas_dict = yaml.safe_load(stream)
flag_ranking = ebas_dict['flag_ranking']

# Vectorized function for getting the rank of a flag
def get_rank(flag):
    return flag_ranking.get(flag, 0)  # Default rank 0 for unknown flags

# Vectorized function for reconciling flags
def reconcile_flags(data_table, flag_code, t1_idx, t2_idx, numflag_columns):
    # Get the rank of the flag_code
    flag_code_rank = get_rank(flag_code)

    # Extract the relevant subtable for efficiency
    sub_table = data_table.loc[t1_idx:t2_idx, numflag_columns]

    # Get the ranks for the current values in the subtable, using np.vectorize to avoid applymap
    current_ranks = np.vectorize(get_rank)(sub_table.values)  # Element-wise rank computation

    # Compare ranks: if the rank of the flag_code is higher, it overrides the current value
    new_values = np.where(current_ranks < flag_code_rank, flag_code, sub_table.values)

    # Update the dataframe with the new values
    data_table.loc[t1_idx:t2_idx, numflag_columns] = new_values.astype(np.int64)

    return data_table
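# --- Illustrative sketch (not called by the pipeline) --------------------------------
# A minimal example of the rank-based reconciliation above: a manual flag code only
# overwrites existing numflag values whose rank in flag_ranking is lower. The flag codes,
# the assumed ranking, and the column name below are hypothetical placeholders.
def _example_reconcile_flags():
    example_table = pd.DataFrame({'numflag_example_var': np.array([0, 456, 999], dtype=np.int64)})
    # Supposing flag_ranking ranks the manual code 659 above 0 and 456 but below 999,
    # rows 0 and 1 would be overwritten with 659 and row 2 would keep its value.
    return reconcile_flags(example_table, flag_code=659, t1_idx=0, t2_idx=2,
                           numflag_columns=['numflag_example_var'])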
# all_dat[VaporizerTemp_C >= heater_lower_lim & VaporizerTemp_C <= heater_upper_lim ,flag_heater_auto:="V"]
# all_dat[ABsamp >= AB_lower_lim & ABsamp <= AB_upper_lim ,flag_AB_auto:="V"]
# all_dat[FlowRate_ccs >= flow_lower_lim & FlowRate_ccs <= flow_upper_lim ,flag_flow_auto:="V"]
# all_dat[FilamentEmission_mA >= filament_lower_lim & FilamentEmission_mA <= filament_upper_lim ,flag_filament_auto:="V"]
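# Command-line usage (the HDF5 file must contain the /ACSM_TOFWARE instrument folder,
# see the checks in the main block below):
#   python pipelines/steps/generate_flags.py <campaignFile.h5> --flag-type diagnostics
#   python pipelines/steps/generate_flags.py <campaignFile.h5> --flag-type species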
if __name__ == '__main__':

    # Set up argument parsing
    parser = argparse.ArgumentParser(description="Generate diagnostic or species flags for an HDF5 campaign file.")

    parser.add_argument(
        "--flag-type",
        required=True,
        choices=["diagnostics", "species", "cpd"],
        help="Specify the flag type. Must be one of: diagnostics, species, cpd"
    )
    parser.add_argument('data_file', type=str, help="Path to the input HDF5 file containing the data table.")
    #parser.add_argument('dataset_name', type=str, help='Relative path to data_table (i.e., dataset name) in HDF5 file')
    #parser.add_argument('validity_thresholds_file', type=str, help="Path to the input YAML file containing calibration factors.")
    #parser.add_argument('output_file', type=str, help="Path to save the output calibrated data as a CSV file.")

    args = parser.parse_args()

    flag_type = args.flag_type
    data_file = args.data_file

    # Load input data and dataset metadata
    try:
        dataManager = dataOps.HDF5DataOpsManager(args.data_file)
        dataManager.load_file_obj()

        base_name = '/ACSM_TOFWARE'
        if '/ACSM_TOFWARE' not in dataManager.file_obj:
            dataManager.unload_file_obj()
            print(f'Invalid data file: {data_file}. Missing instrument folder ACSM_TOFWARE.')
            raise ImportError(f'Instrument folder "/ACSM_TOFWARE" not found in data_file: {data_file}')

        dataManager.extract_and_load_dataset_metadata()
        dataset_metadata_df = dataManager.dataset_metadata_df.copy()

        # Find the dataset associated with the diagnostic channels or the species time series.
        # Note: the 'cpd' choice is accepted by the parser but not handled here yet.
        if flag_type == 'diagnostics':
            keywords = ['ACSM_JFJ_', '_meta.txt/data_table']
            find_keyword = [all(keyword in item for keyword in keywords) for item in dataset_metadata_df['dataset_name']]
        if flag_type == 'species':
            keywords = ['ACSM_JFJ_', '_timeseries.txt/data_table']
            find_keyword = [all(keyword in item for keyword in keywords) for item in dataset_metadata_df['dataset_name']]

        #dataset_name = dataset_metadata_df['dataset_name'][find_keyword]
        #parent_file = dataset_metadata_df.loc[find_keyword,'parent_file']
        #parent_flag_file = '_'.join([os.path.splitext(parent_file),'flags.csv'])
        #parent_instrument = dataset_metadata_df.loc[find_keyword,'parent_instrument']

        dataset_name = dataset_metadata_df['dataset_name'][find_keyword]
        parent_file = dataset_metadata_df.loc[find_keyword, 'parent_file']
        #parent_flag_file = '_'.join([os.path.splitext(parent_file)[0],'flags.csv'])  # Expected name
        parent_instrument = dataset_metadata_df.loc[find_keyword, 'parent_instrument']

        print(':)')

        if not (dataset_name.size == 1):
            raise ValueError(f'{flag_type} file is not uniquely identifiable: {parent_file}')
        else:
            dataset_name = dataset_name.values[0]
            parent_file = parent_file.values[0]
            parent_instrument = parent_instrument.values[0]

        data_table = dataManager.extract_dataset_as_dataframe(dataset_name)
        datetime_var, datetime_var_format = dataManager.infer_datetime_variable(dataset_name)

        #dataManager.extract_and_load_dataset_metadata()
        #dataset_metadata_df = dataManager.dataset_metadata_df.copy()
        #print(dataset_metadata_df.head())
        #dataset_name_idx = dataset_metadata_df.index[(dataset_metadata_df['dataset_name']==args.dataset_name).to_numpy()]
        #data_table_metadata = dataset_metadata_df.loc[dataset_name_idx,:]
        #parent_instrument = data_table_metadata.loc[dataset_name_idx,'parent_instrument'].values[0]
        #parent_file = data_table_metadata.loc[dataset_name_idx,'parent_file'].values[0]

        dataManager.unload_file_obj()

    except Exception as e:
        print(f"Error loading input files: {e}")
        exit(1)
    path_to_output_dir, ext = os.path.splitext(args.data_file)

    print('Path to output directory:', path_to_output_dir)

    # Perform flag generation
    flag_type = args.flag_type
    try:
        # Define output directory of the flag generation step
        suffix = 'flags'
        if len(parent_instrument.split('/')) >= 2:
            instFolder = parent_instrument.split('/')[0]
            category = parent_instrument.split('/')[1]
        else:
            instFolder = parent_instrument.split('/')[0]
            category = ''

        path_to_output_folder, ext = os.path.splitext('/'.join([path_to_output_dir, f'{instFolder}_{suffix}', category]))
        processingScriptRelPath = os.path.relpath(thisFilePath, start=projectPath)

        if not os.path.exists(path_to_output_folder):
            os.makedirs(path_to_output_folder)

        print('Processing script:', processingScriptRelPath)
        print('Output directory:', path_to_output_folder)

        # Compute diagnostic flags based on the validity thresholds defined in the parameters file
        if flag_type == 'diagnostics':
            validity_thresholds_dict = load_parameters(flag_type)
            flags_table = generate_diagnostic_flags(data_table, validity_thresholds_dict)
        if flag_type == 'species':
            calib_param_dict = load_parameters(flag_type)
            flags_table = generate_species_flags(data_table, calib_param_dict)

        metadata = {'actris_level': 1,
                    'processing_script': processingScriptRelPath.replace(os.sep, '/'),
                    'processing_date': utils.created_at(),
                    'flag_type': flag_type,
                    'datetime_var': datetime_var}

        # Save output table to a csv file and save/update the data lineage record
        filename, ext = os.path.splitext(parent_file)
        path_to_flags_file = '/'.join([path_to_output_folder, f'{filename}_flags.csv'])
        #path_to_calibration_factors_file = '/'.join([path_to_output_folder, f'{filename}_calibration_factors.csv'])
        flags_table.to_csv(path_to_flags_file, index=False)
        status = stepUtils.record_data_lineage(path_to_flags_file, projectPath, metadata)

        print(f"Flags saved to {path_to_flags_file}")
        print(f"Data lineage saved to {path_to_output_dir}")
        #flags_table.to_csv(path_to_flags_file, index=False)

        # Read json and assign numeric flag to column
    except Exception as e:
        print(f"Error during flag generation: {e}")
        exit(1)