WIP: Completed flag generation for species; the command line interface is now clear, but it still does not consider potentially existing flags. Review TODO.md: for efficiency we need to simplify flag assignment and reconciliation for the species flag type.

This commit is contained in:
2025-02-26 15:11:12 +01:00
parent 600c60be36
commit 33f29a8080
2 changed files with 221 additions and 111 deletions

View File

@ -66,6 +66,13 @@ def generate_diagnostic_flags(data_table):
print(f"Error accessing validation thresholds at: {validity_thersholds_file}")
return 1
# Define binary to ebas flag code map
# Define the labeling map used to create numbered EBAS flags. It maps each boolean column indicator,
# which marks the time intervals in which a particular flagging event occurred, to an EBAS flag code.
binary_to_ebas_code = {False : 0, True : 456}
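# Illustrative example (not part of the pipeline): with the map above, an out-of-range indicator
# of True becomes EBAS code 456 and an in-range indicator of False becomes 0, e.g.
#   [binary_to_ebas_code[v] for v in [False, True, False]]  ->  [0, 456, 0]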
# Initialize a dictionary to store indicator variables
indicator_variables = {}
@ -86,17 +93,25 @@ def generate_diagnostic_flags(data_table):
# Create an indicator variable for the current diagnostic variable
tmp = data_table[diagnostic_variable]
indicator_variables['flag_'+diagnostic_variable] = ((tmp >= lower_lim) & (tmp <= upper_lim)).to_numpy()
indicator_variables['flag_'+diagnostic_variable] = np.logical_not(((tmp >= lower_lim) & (tmp <= upper_lim)).to_numpy())
indicator_variables['numflag_'+diagnostic_variable] = np.array([binary_to_ebas_code[entry] for entry in indicator_variables['flag_'+diagnostic_variable]], dtype=np.int64)
# Add indicator variables to the new data table
new_data_table = pd.DataFrame(indicator_variables)
new_data_table['flag_any_diagnostic_flag'] = new_data_table.apply(lambda x : any(np.logical_not(x.values)), axis='columns')
aggr_func = lambda x : max(x.values)
new_data_table['numflag_any_diagnostic_flag'] = new_data_table.loc[:,['numflag_' in col for col in new_data_table.columns]].aggregate(aggr_func,axis='columns')
aggr_func = lambda x : np.nan if x.isna().all() else any(x.dropna().values)
new_data_table['flag_any_diagnostic_flag'] = new_data_table.loc[:,['flag_' in col for col in new_data_table.columns]].aggregate(aggr_func, axis='columns')
#new_data_table['flag_any_diagnostic_flag'] = new_data_table.apply(lambda x : any(np.logical_not(x.values)), axis='columns')
#new_data_table['flag_any_diagnostic'] = new_data_table.apply(
# lambda x: np.nan if x.isna().all() else any(x.dropna().values), axis='columns'
#)
return new_data_table
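# Usage sketch (illustrative): for a data_table whose diagnostic columns are covered by the
# validity thresholds YAML, the returned table holds one boolean flag_<var> and one integer
# numflag_<var> column per diagnostic variable, plus the aggregated columns
# flag_any_diagnostic_flag and numflag_any_diagnostic_flag, e.g. (column name hypothetical):
#   flags = generate_diagnostic_flags(data_table)
#   flags[['flag_VaporizerTemp_C', 'numflag_VaporizerTemp_C', 'numflag_any_diagnostic_flag']]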
def generate_species_flags(data_table : pd.DataFrame , flags_table : pd.DataFrame ):
# TODO: abstract some of the code in the command line main
def generate_species_flags(data_table : pd.DataFrame):
"""Generate flags for columns in data_table based on flags_table
@ -106,10 +121,139 @@ def generate_species_flags(data_table : pd.DataFrame , flags_table : pd.DataFram
_description_
"""
# Get the names of the species to flag based on diagnostics and manual flags
path_to_calib_params = os.path.normpath(os.path.join(projectPath,'pipelines/params/calibration_params.yaml'))
if not os.path.exists(path_to_calib_params):
raise FileNotFoundError(f'Calibration params file not found: {path_to_calib_params}')
with open(path_to_calib_params,'r') as stream:
calib_param_dict = yaml.safe_load(stream)
predefined_species = calib_param_dict['variables']['species']
print('Predefined_species:', predefined_species)
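# Illustrative sketch of the calibration_params.yaml structure assumed by the lookup above
# (key names from the code; the species entries are hypothetical placeholders):
#   variables:
#     species:
#       - <species_column_name_1>
#       - <species_column_name_2>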
# Save output tables to csv file and save or update the data lineage record
filename, ext = os.path.splitext(parent_file)
path_to_flags_file = '/'.join([path_to_output_folder, f'{filename}_flags.csv'])
variables_set = set(data_table.columns)
print(variables_set)
manual_json_flags = []
csv_flags = []
# Inspect flags folder
for filename in os.listdir(path_to_output_folder):
if all([filename.endswith('.json'), 'metadata' not in filename, 'flag' in filename]):
manual_json_flags.append(filename)
elif filename.endswith('.csv'):
csv_flags.append(filename)
if csv_flags:
flags_table = pd.read_csv(os.path.join(path_to_output_folder, csv_flags[0]))
if 'numflag_any_diagnostic_flag' in flags_table.columns:
#renaming_map = {var: f'flag_{var}' for var in data_table.columns}
#data_table[renaming_map.keys()] = flags_table['flag_any_diagnostic_flag'].values
#data_table.rename(columns=renaming_map, inplace=True)
renaming_map = {}
for var in data_table.columns:
#print(var)
if (not datetime_var == var) and (var in predefined_species):
renaming_map[var] = f'numflag_{var}'
print(f'numflag_{var}')
data_table[var] = pd.Series(flags_table['numflag_any_diagnostic_flag'].values)
print(renaming_map)
data_table.rename(columns=renaming_map, inplace=True)
else:
raise FileNotFoundError("Automated diagnostic flag .csv not found. Hint: Run pipelines/step/generate_flags.py <campaignFile.h5> --flag-type diagnostics.")
numflag_columns = [col for col in data_table.columns if 'numflag_' in col]
print(numflag_columns)
for flag_filename in manual_json_flags:
#print(flag_filename)
parts = os.path.splitext(flag_filename)[0].split('_')
varname = '_'.join(parts[2:]) # Extract variable name from filename
#if f'flag_{varname}' in data_table.columns:
try:
# Load manually generated flag
with open(os.path.join(path_to_output_folder, flag_filename), 'r') as stream:
flag_dict = json.load(stream)
t1 = pd.to_datetime(flag_dict.get('startdate'))
t2 = pd.to_datetime(flag_dict.get('enddate'))
flag_code = flag_dict.get('flag_code', np.nan) # Default to NaN if missing
if pd.isnull(t1) or pd.isnull(t2):
continue # Skip if invalid timestamps
if not data_table[datetime_var].is_monotonic_increasing:
data_table.sort_values(by=datetime_var, inplace=True)
data_table.reset_index(drop=True, inplace=True)
t1_idx = int(abs(data_table[datetime_var] - t1).argmin())
t2_idx = int(abs(data_table[datetime_var] - t2).argmin())
#print(flag_code)
#for col in data_table.columns:
# if 'numflag_' in col:
# print(col)
data_table = reconcile_flags(data_table, flag_code, t1_idx, t2_idx, numflag_columns)
# Apply the ranking logic efficiently
#data_table.loc[t1_idx:t2_idx, numflag_columns] = data_table.loc[t1_idx:t2_idx, numflag_columns].map(
# lambda x: reconcile_flags(x, flag_code)
#)
#if 456 <= flag_code <= 800:
# data_table.loc[t1_idx:t2_idx, numflag_columns] = data_table.loc[t1_idx:t2_idx, numflag_columns].applymap(lambda x: max(x, flag_code))
#else:
# data_table.loc[t1_idx:t2_idx, numflag_columns] = flag_code
except (KeyError, ValueError, FileNotFoundError) as e:
print(f"Error processing {flag_filename}: {e}")
continue
return 0
return data_table.loc[:,numflag_columns]
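# Usage sketch (illustrative): the function now takes only the data table; the diagnostic
# flags CSV and the manual JSON flags are discovered in path_to_output_folder, and only the
# numflag_ columns are returned:
#   species_flags = generate_species_flags(data_table)   # columns: numflag_<species>, ...
#   species_flags.to_csv(path_to_flags_file, index=False)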
with open('app/flags/ebas_dict.yaml','r') as stream:
ebas_dict = yaml.safe_load(stream)
flag_ranking = ebas_dict['flag_ranking']
# Scalar helper returning the rank of a flag (applied element-wise below with np.vectorize)
def get_rank(flag):
return flag_ranking.get(flag, 0) # Default rank 0 for unknown flags
# Vectorized function for reconciling flags
def reconcile_flags(data_table, flag_code, t1_idx, t2_idx, numflag_columns):
# Get the rank of the flag_code
flag_code_rank = get_rank(flag_code)
# Extract the relevant subtable for efficiency
sub_table = data_table.loc[t1_idx:t2_idx, numflag_columns]
# Get the ranks for the current values in the subtable, using np.vectorize to avoid applymap
current_ranks = np.vectorize(get_rank)(sub_table.values) # Element-wise rank computation
# Compare ranks: If the rank of the flag_code is higher, it will override the current value
new_values = np.where(current_ranks < flag_code_rank, flag_code, sub_table.values)
# Update the dataframe with the new values
data_table.loc[t1_idx:t2_idx, numflag_columns] = new_values
return data_table
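# Worked example (illustrative, with hypothetical column name, flag code and ranking): if the
# incoming manual flag_code outranks the existing numflag values in rows t1_idx..t2_idx, it
# overrides them; otherwise the existing values are kept. Assuming get_rank(660) > get_rank(456)
# and get_rank(660) > get_rank(0):
#   before: data_table.loc[t1_idx:t2_idx, 'numflag_SO4'] -> [456, 456, 0]
#   after reconcile_flags(data_table, 660, t1_idx, t2_idx, ['numflag_SO4']) -> [660, 660, 660]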
# all_dat[VaporizerTemp_C >= heater_lower_lim & VaporizerTemp_C <= heater_upper_lim ,flag_heater_auto:="V"]
# all_dat[ABsamp >= AB_lower_lim & ABsamp <= AB_upper_lim ,flag_AB_auto:="V"]
@ -128,31 +272,68 @@ if __name__ == '__main__':
help="Specify the flag type. Must be one of: diagnostics, species, cpd"
)
parser.add_argument('data_file', type=str, help="Path to the input HDF5 file containing the data table.")
parser.add_argument('dataset_name', type=str, help ='Relative path to data_table (i.e., dataset name) in HDF5 file')
#parser.add_argument('dataset_name', type=str, help ='Relative path to data_table (i.e., dataset name) in HDF5 file')
#parser.add_argument('validity_thersholds_file', type=str, help="Path to the input YAML file containing calibration factors.")
#parser.add_argument('output_file', type=str, help="Path to save the output calibrated data as a CSV file.")
args = parser.parse_args()
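# Example invocation (illustrative; the script path is taken from the hint printed in
# generate_species_flags when the diagnostic flags CSV is missing):
#   python pipelines/step/generate_flags.py <campaignFile.h5> --flag-type diagnostics
#   python pipelines/step/generate_flags.py <campaignFile.h5> --flag-type species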
flag_type = args.flag_type
data_file = args.data_file
# Load input data and calibration factors
try:
#data_table = pd.read_json(args.data_file)
print(args.data_file)
dataManager = dataOps.HDF5DataOpsManager(args.data_file)
dataManager.load_file_obj()
dataset_name = '/'+args.dataset_name
data_table = dataManager.extract_dataset_as_dataframe('/'+args.dataset_name)
datetime_var, datetime_var_format = dataManager.infer_datetime_variable('/'+args.dataset_name)
base_name = '/ACSM_TOFWARE'
if '/ACSM_TOFWARE' not in dataManager.file_obj:
dataManager.unload_file_obj()
print(f'Invalid data file: {data_file}. Missing instrument folder ACSM_TOFWARE.')
raise ImportError(f'Instrument folder "/ACSM_TOFWARE" not found in data_file : {data_file}')
dataManager.extract_and_load_dataset_metadata()
dataset_metadata_df = dataManager.dataset_metadata_df.copy()
print(dataset_metadata_df.head())
dataset_name_idx = dataset_metadata_df.index[(dataset_metadata_df['dataset_name']==args.dataset_name).to_numpy()]
data_table_metadata = dataset_metadata_df.loc[dataset_name_idx,:]
parent_instrument = data_table_metadata.loc[dataset_name_idx,'parent_instrument'].values[0]
parent_file = data_table_metadata.loc[dataset_name_idx,'parent_file'].values[0]
# Find dataset associated with diagnostic channels
if flag_type == 'diagnostics':
keywords = ['ACSM_JFJ_','_meta.txt/data_table']
find_keyword = [all(keyword in item for keyword in keywords) for item in dataset_metadata_df['dataset_name']]
if flag_type == 'species':
keywords = ['ACSM_JFJ_','_timeseries.txt/data_table']
find_keyword = [all(keyword in item for keyword in keywords) for item in dataset_metadata_df['dataset_name']]
#dataset_name = dataset_metadata_df['dataset_name'][find_keyword]
#parent_file = dataset_metadata_df.loc[find_keyword,'parent_file']
#parent_flag_file = '_'.join([os.path.splitext(parent_file),'flags.csv'])
#parent_instrument = dataset_metadata_df.loc[find_keyword,'parent_instrument']
dataset_name = dataset_metadata_df['dataset_name'][find_keyword]
parent_file = dataset_metadata_df.loc[find_keyword,'parent_file']
#parent_flag_file = '_'.join([os.path.splitext(parent_file)[0],'flags.csv']) # Expected name
parent_instrument = dataset_metadata_df.loc[find_keyword,'parent_instrument']
print(':)')
if not (dataset_name.size == 1):
raise ValueError(f'{flag_type} file is not uniquely identifiable: {parent_file}')
else:
dataset_name = dataset_name.values[0]
parent_file = parent_file.values[0]
parent_instrument = parent_instrument.values[0]
data_table = dataManager.extract_dataset_as_dataframe(dataset_name)
datetime_var, datetime_var_format = dataManager.infer_datetime_variable(dataset_name)
#dataManager.extract_and_load_dataset_metadata()
#dataset_metadata_df = dataManager.dataset_metadata_df.copy()
#print(dataset_metadata_df.head())
#dataset_name_idx = dataset_metadata_df.index[(dataset_metadata_df['dataset_name']==args.dataset_name).to_numpy()]
#data_table_metadata = dataset_metadata_df.loc[dataset_name_idx,:]
#parent_instrument = data_table_metadata.loc[dataset_name_idx,'parent_instrument'].values[0]
#parent_file = data_table_metadata.loc[dataset_name_idx,'parent_file'].values[0]
dataManager.unload_file_obj()
@ -191,104 +372,33 @@ if __name__ == '__main__':
# Compute diagnostic flags based on validity thresholds defined in configuration_file_dict
if flag_type == 'diagnostics':
flags_table = generate_diagnostic_flags(data_table)
flags_table = generate_diagnostic_flags(data_table)
metadata = {'actris_level' : 1,
'processing_script': processingScriptRelPath.replace(os.sep,'/'),
'processing_date' : utils.created_at(),
'flag_type' : flag_type
}
# Save output tables to csv file and save/or update data lineage record
filename, ext = os.path.splitext(parent_file)
path_to_flags_file = '/'.join([path_to_output_folder, f'{filename}_flags.csv'])
#path_to_calibration_factors_file = '/'.join([path_to_output_folder, f'{filename}_calibration_factors.csv'])
flags_table.to_csv(path_to_flags_file, index=False)
status = stepUtils.record_data_lineage(path_to_flags_file, projectPath, metadata)
if flag_type == 'species':
flags_table = generate_species_flags(data_table)
metadata = {'actris_level' : 1,
'processing_script': processingScriptRelPath.replace(os.sep,'/'),
'processing_date' : utils.created_at(),
'flag_type' : flag_type
}
# Save output tables to csv file and save/or update data lineage record
filename, ext = os.path.splitext(parent_file)
path_to_flags_file = '/'.join([path_to_output_folder, f'{filename}_flags.csv'])
#path_to_calibration_factors_file = '/'.join([path_to_output_folder, f'{filename}_calibration_factors.csv'])
flags_table.to_csv(path_to_flags_file, index=False)
status = stepUtils.record_data_lineage(path_to_flags_file, projectPath, metadata)
print(f"Flags saved to {path_to_flags_file}")
print(f"Data lineage saved to {path_to_output_dir}")
if flag_type == 'species':
# Save output tables to csv file and save/or update data lineage record
filename, ext = os.path.splitext(parent_file)
path_to_flags_file = '/'.join([path_to_output_folder, f'{filename}_flags.csv'])
variables_set = set(data_table.columns)
manual_json_flags = []
csv_flags = []
# Inspect flags folder
for filename in os.listdir(path_to_output_folder):
if any(var in filename and filename.endswith('.json') for var in variables_set):
manual_json_flags.append(filename)
elif filename.endswith('.csv'):
csv_flags.append(filename)
if len(csv_flags) == 1:
flags_table = pd.read_csv(os.path.join(path_to_output_folder, csv_flags[0]))
if 'flag_any_diagnostic_flag' in flags_table.columns:
#renaming_map = {var: f'flag_{var}' for var in data_table.columns}
#data_table[renaming_map.keys()] = flags_table['flag_any_diagnostic_flag'].values
#data_table.rename(columns=renaming_map, inplace=True)
renaming_map = {}
for var in data_table.columns:
if not datetime_var == var:
renaming_map[var] = f'flag_{var}'
data_table[var] = pd.Series(flags_table['flag_any_diagnostic_flag'].values)
print(renaming_map)
data_table.rename(columns=renaming_map, inplace=True)
print(csv_flags)
# TODO: validate the code below. I suspect we need to change the flag creation strategy.
# First, aggregate the manual num flags into one (perhaps using the median) and then create a flag for each species,
# taking into account the existing species flags (read the csv file and update it if needed).
for flag_filename in manual_json_flags:
parts = os.path.splitext(flag_filename)[0].split('_')
varname = '_'.join(parts[2:]) # Extract variable name from filename
print(varname)
if f'flag_{varname}' in data_table.columns:
try:
with open(os.path.join(path_to_output_folder, flag_filename), 'r') as stream:
flag_dict = json.load(stream)
t1 = pd.to_datetime(flag_dict.get('startdate'))
t2 = pd.to_datetime(flag_dict.get('enddate'))
flag_code = flag_dict.get('flag_code', np.nan) # Default to NaN if missing
if pd.isnull(t1) or pd.isnull(t2):
continue # Skip if invalid timestamps
if not data_table[datetime_var].is_monotonic_increasing:
data_table.sort_values(by=datetime_var, inplace=True)
data_table.reset_index(drop=True, inplace=True)
t1_idx = abs(data_table[datetime_var] - t1).argmin()
t2_idx = abs(data_table[datetime_var] - t2).argmin()
data_table.loc[t1_idx:t2_idx, f'flag_{varname}'] = flag_code
except (KeyError, ValueError, FileNotFoundError) as e:
print(f"Error processing {flag_filename}: {e}")
continue
data_table.to_csv(path_to_flags_file, index=False)
print(f"Flags saved to {path_to_flags_file}")
print(f"Data lineage saved to {path_to_output_dir}")
#flags_table.to_csv(path_to_flags_file, index=False)
# Read json and assign numeric flag to column
except Exception as e:
print(f"Error during calibration: {e}")
exit(1)

View File

@ -49,8 +49,8 @@ def visualize_table_variables(data_file_path, dataset_name, flags_dataset_name,
var_flag_name = f"flag_{var}"
if var_flag_name in flags_df.columns:
# Identify valid and invalid indices
ind_valid = flags_df[var_flag_name].to_numpy()
ind_invalid = np.logical_not(ind_valid)
ind_invalid = flags_df[var_flag_name].to_numpy()
# ind_valid = np.logical_not(ind_valid)
# Detect start and end indices of invalid regions
# Find transition points in invalid regions
invalid_starts = np.diff(np.concatenate(([False], ind_invalid, [False]))).nonzero()[0][::2]
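# Worked example (illustrative) of the transition detection above: padding ind_invalid with
# False on both sides and differencing marks every change between valid and invalid, so the
# even-positioned transitions are run starts and the odd-positioned ones are exclusive run ends:
#   ind_invalid = np.array([False, True, True, False, True])
#   np.diff(np.concatenate(([False], ind_invalid, [False]))).nonzero()[0]  # -> [1, 3, 4, 5]
#   invalid_starts = ...[::2]                                              # -> [1, 4]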