Add generate_cpc_flags to pipelines/steps/generate_flags.py
@@ -151,7 +151,9 @@
     "#command = ['python', 'pipelines/steps/compute_automated_flags.py', path_to_data_file, dataset_name, path_to_config_file]\n",
     "#status = subprocess.run(command, capture_output=True, check=True)\n",
     "#print(status.stdout.decode())\n",
-    "generate_flags(path_to_data_file, 'diagnostics')\n"
+    "generate_flags(path_to_data_file, 'diagnostics')\n",
+    "\n",
+    "generate_flags(path_to_data_file, 'cpc')\n"
    ]
   },
   {
@@ -28,12 +28,28 @@ import dima.src.hdf5_ops as dataOps
 import pipelines.steps.utils as stepUtils
 import dima.utils.g5505_utils as utils
 import json
-from pipelines.steps.utils import load_project_yaml_files
+from pipelines.steps.utils import load_project_yaml_files, get_metadata
+from math import floor
 
-def compute_cpc_flags():
-    # TODO: ask rob where to find this information.
+def generate_cpc_flags(data_table, datetime_var: str = 'start_time'):
+    # TODO: ask Rob where to find this information.
+    required_variables = ['start_time', 'end_time', 'st_y', 'ed_y', 'p_int', 'T_int', 'conc', 'numflag']
+
+    # Optionally check if required variables exist in data_table
+    # Uncomment if you want to enforce the check:
+    # if not all(var in data_table.columns for var in required_variables):
+    #     raise ValueError("Some required variables are missing from the data_table.")
+
+    # Select only datetime and numflag columns
+    flags_table = data_table.loc[:, [datetime_var, 'numflag']].copy()
+    print(flags_table.head())
+
+    # Multiply numflag by 1000 and floor it to get an integer flag code
+    flags_table['numflag'] = flags_table['numflag'].apply(lambda x: int(floor(x * 1000)))
+    flags_table.rename(columns={'numflag':'numflag_cpc'}, inplace=True)
+
+    return flags_table
 
-    return 0
 
 #def compute_diagnostic_variable_flags(data_table, validity_thresholds_dict):
 def generate_diagnostic_flags(data_table, validity_thresholds_dict):
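Aside: a minimal sketch (toy values, not part of the commit) of the scaling in generate_cpc_flags. EBAS numflag values pack three-digit flag codes behind the decimal point (e.g. 0.999), so floor(x * 1000) recovers the leading code as an integer:

from math import floor
import pandas as pd

# Toy table standing in for the CPC data_table
toy = pd.DataFrame({'start_time': pd.date_range('2024-01-01', periods=3, freq='h'),
                    'numflag': [0.999, 0.456, 0.0]})
toy['numflag'] = toy['numflag'].apply(lambda x: int(floor(x * 1000)))
print(toy['numflag'].tolist())  # [999, 456, 0]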
@@ -101,6 +117,7 @@ def generate_diagnostic_flags(data_table, validity_thresholds_dict):
     #)
     return new_data_table
 
+from scipy.interpolate import interp1d
 
 def generate_species_flags(data_table : pd.DataFrame, calib_param_dict : dict, flagsFolderPath, datetime_var : str = 't_start_Buf'):
 
@@ -124,29 +141,79 @@ def generate_species_flags(data_table : pd.DataFrame, calib_param_dict : dict, f
     #print(manual_json_flags,csv_flags)
 
     if csv_flags:
-        flags_table = pd.read_csv(os.path.join(flagsFolderPath, csv_flags[0]))
 
+        # Loop over CSV files in the flags folder
+        for filename in csv_flags:
+            filePath = os.path.join(flagsFolderPath, filename)
+            fileMetadataDict = get_metadata(filePath)
+            flag_datetime_var = fileMetadataDict['datetime_var']
+
+            flags_table = pd.read_csv(filePath)
+
+            # Ensure datetime type
+            data_table[datetime_var] = pd.to_datetime(data_table[datetime_var])
+            flags_table[flag_datetime_var] = pd.to_datetime(flags_table[flag_datetime_var])
+
+            if 'numflag_any_diagnostic_flag' in flags_table.columns:
+                # Sort for interpolation
+                flags_table_sorted = flags_table.sort_values(flag_datetime_var)
 
-            # Define condition for renaming
-            required = lambda var: var != datetime_var and var in predefined_species
+                # Pre-convert datetimes to int64 for interpolation
+                flag_times_int = flags_table_sorted[flag_datetime_var].astype(np.int64)
+                data_times_int = data_table[datetime_var].astype(np.int64)
 
-            # Create renaming map using dictionary comprehension
-            renaming_map = {var: f'numflag_{var}' for var in data_table.columns if required(var)}
 
-            # Assign the same flag values across required columns using vectorized operation
-            variables = list(renaming_map.keys())
-            data_table[variables] = np.tile(
-                flags_table['numflag_any_diagnostic_flag'].values[:, None],
-                (1, len(variables))
-            )
-            #print(renaming_map)
+                # Nearest-neighbor interpolator
+                flag_interp_func = interp1d(
+                    flag_times_int,
+                    flags_table_sorted['numflag_any_diagnostic_flag'],
+                    kind='nearest',
+                    bounds_error=False,
+                    fill_value='extrapolate'
+                )
 
+                # Interpolated flags
+                interpolated_flags = flag_interp_func(data_times_int)
+
+                # Define which columns to flag
+                required = lambda var: var != datetime_var and var in predefined_species
+                renaming_map = {var: f'numflag_{var}' for var in data_table.columns if required(var)}
+                variables = list(renaming_map.keys())
+
+                # Assign flags to those columns
+                for var in variables:
+                    data_table[var] = interpolated_flags
+
+                data_table.rename(columns=renaming_map, inplace=True)
+
+            elif 'numflag_cpc' in flags_table.columns:
+                # Sort for interpolation
+                flags_table_sorted = flags_table.sort_values(flag_datetime_var)
+
+                # Pre-convert datetimes to int64 for interpolation
+                flag_times_int = flags_table_sorted[flag_datetime_var].values.astype(np.int64)
+                data_times_int = data_table[datetime_var].values.astype(np.int64)
+
+                # Nearest-neighbor interpolator
+                flag_interp_func = interp1d(
+                    flag_times_int,
+                    flags_table_sorted['numflag_cpc'],
+                    kind='nearest',
+                    bounds_error=False,
+                    fill_value='extrapolate'
+                )
+
+                # Interpolated flags
+                interpolated_cpc_flags = flag_interp_func(data_times_int)
+
+            else:
+                raise FileNotFoundError("Automated diagnostic flag .csv not found. Hint: Run pipelines/steps/generate_flags.py <campaignFile.h5> --flag-type diagnostics.")
+
+            numflag_columns = [col for col in data_table.columns if 'numflag_' in col]
+
+            data_table = reconcile_flags(data_table, interpolated_cpc_flags, 0, interpolated_cpc_flags.size, numflag_columns)
 
     #print(numflag_columns)
     for flag_filename in manual_json_flags:
         #print(flag_filename)
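Aside: the interp1d(kind='nearest') calls above align flag timestamps to data timestamps that rarely coincide, by giving each data row the flag of the closest flag row. A self-contained sketch with toy timestamps (names and values invented):

import numpy as np
import pandas as pd
from scipy.interpolate import interp1d

flag_times = pd.to_datetime(['2024-01-01 00:00', '2024-01-01 01:00'])
flag_vals = np.array([999, 456])
data_times = pd.to_datetime(['2024-01-01 00:10', '2024-01-01 00:50'])

# Nanosecond epoch ints make datetimes usable as interpolation coordinates
f = interp1d(flag_times.values.astype(np.int64), flag_vals,
             kind='nearest', bounds_error=False, fill_value='extrapolate')
print(f(data_times.values.astype(np.int64)))  # [999. 456.]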
@@ -196,27 +263,45 @@ with open(path_to_ebas_dict ,'r') as stream:
 
 # Vectorized function for getting the rank of a flag
 def get_rank(flag):
-    return flag_ranking.get(flag, np.nan) # Default rank 0 for unknown flags
+    return flag_ranking.get(flag, -10) # Default rank is -10 for unknown flags
 
 # Vectorized function for reconciling flags
 def reconcile_flags(data_table, flag_code, t1_idx, t2_idx, numflag_columns):
-    # Get the rank of the flag_code
-    flag_code_rank = get_rank(flag_code)
+    # Extract the relevant subtable
+    sub_table = data_table.loc[t1_idx:t2_idx, numflag_columns].copy()
 
-    # Extract the relevant subtable for efficiency
-    sub_table = data_table.loc[t1_idx:t2_idx, numflag_columns]
+    # Compute ranks of current values
+    current_ranks = np.vectorize(get_rank)(sub_table.values)
 
-    # Get the ranks for the current values in the subtable, using np.vectorize to avoid applymap
-    current_ranks = np.vectorize(get_rank)(sub_table.values) # Element-wise rank computation
+    # Handle flag_code: broadcast scalar or reshape array
+    if np.isscalar(flag_code):
+        flag_code_values = np.full(sub_table.shape, flag_code)
+        flag_code_ranks = np.full(sub_table.shape, get_rank(flag_code))
+    else:
+        # Convert to NumPy array and ensure correct shape
+        flag_code_array = np.asarray(flag_code)
+        if flag_code_array.ndim == 1:
+            # Assume it's one flag per row - broadcast across columns
+            flag_code_values = np.tile(flag_code_array[:, None], (1, len(numflag_columns)))
+            flag_code_ranks = np.vectorize(get_rank)(flag_code_values)
+        else:
+            # Full 2D matrix
+            flag_code_values = flag_code_array
+            flag_code_ranks = np.vectorize(get_rank)(flag_code_array)
 
-    # Compare ranks: If the rank of the flag_code is higher, it will override the current value
-    new_values = np.where(current_ranks < flag_code_rank, flag_code, sub_table.values)
+    # Validate shape match
+    if flag_code_values.shape != sub_table.shape:
+        raise ValueError(f"Shape mismatch: expected {sub_table.shape}, got {flag_code_values.shape}")
 
-    # Update the dataframe with the new values
+    # Reconcile values based on rank comparison
+    new_values = np.where(current_ranks < flag_code_ranks, flag_code_values, sub_table.values)
+
+    # Assign reconciled values back
     data_table.loc[t1_idx:t2_idx, numflag_columns] = new_values.astype(np.int64)
 
     return data_table
 
 
 # all_dat[VaporizerTemp_C >= heater_lower_lim & VaporizerTemp_C <= heater_upper_lim ,flag_heater_auto:="V"]
 # all_dat[ABsamp >= AB_lower_lim & ABsamp <= AB_upper_lim ,flag_AB_auto:="V"]
 # all_dat[FlowRate_ccs >= flow_lower_lim & FlowRate_ccs <= flow_upper_lim ,flag_flow_auto:="V"]
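Aside: a toy run of the rank comparison inside reconcile_flags (this flag_ranking is invented; the real one comes from the EBAS dictionary loaded above). A cell keeps its current flag unless the incoming flag outranks it:

import numpy as np

flag_ranking = {0: 0, 456: 1, 999: 2}  # hypothetical severity ranking
rank = np.vectorize(lambda f: flag_ranking.get(f, -10))

current = np.array([[0, 456],
                    [999, 0]])
incoming = np.full(current.shape, 456)  # incoming flag broadcast over the table

reconciled = np.where(rank(current) < rank(incoming), incoming, current)
print(reconciled)  # [[456 456]
                   #  [999 456]]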
@@ -248,6 +333,10 @@ def main(data_file, flag_type):
     keywords = [f'ACSM_{STATION_ABBR}_','_timeseries.txt/data_table']
     find_keyword = [all(keyword in item for keyword in keywords) for item in dataset_metadata_df['dataset_name']]
 
+    if flag_type == 'cpc':
+        keywords = ['cpc.particle_number_concentration.aerosol.', f'CH02L_TSI_3772_{STATION_ABBR}.CH02L_CPC.lev1.nas']
+        find_keyword = [all(keyword in item for keyword in keywords) for item in dataset_metadata_df['dataset_name']]
+
     # Specify source dataset to be extracted from input hdf5 data file
     columns = ['dataset_name','parent_file','parent_instrument']
     dataset_name, parent_file, parent_instrument = tuple(dataset_metadata_df.loc[find_keyword,col] for col in columns)
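Aside: the keyword lists select the source dataset by substring match, so a row of dataset_metadata_df is picked only if every keyword occurs in its dataset_name. A minimal sketch with invented names, assuming STATION_ABBR were 'PAY':

keywords = ['cpc.particle_number_concentration.aerosol.', 'CH02L_TSI_3772_PAY.CH02L_CPC.lev1.nas']
names = ['cpc.particle_number_concentration.aerosol.CH02L_TSI_3772_PAY.CH02L_CPC.lev1.nas/data_table',
         'ACSM_PAY_2024_timeseries.txt/data_table']
mask = [all(keyword in name for keyword in keywords) for name in names]
print(mask)  # [True, False]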
@@ -316,6 +405,9 @@ def main(data_file, flag_type):
     #calib_param_dict = load_parameters(flag_type)
     calib_param_dict = load_project_yaml_files(projectPath, "calibration_params.yaml")
     flags_table = generate_species_flags(data_table,calib_param_dict,path_to_output_folder,datetime_var)
+    if flag_type == 'cpc':
+        print(':D')
+        flags_table = generate_cpc_flags(data_table, datetime_var)
 
     metadata = {'actris_level' : 1,
                 'processing_script': processingScriptRelPath.replace(os.sep,'/'),
@@ -361,7 +453,7 @@ def get_flags_from_folder(flagsFolderPath):
         if folderitem.startswith('flag') and folderitem.endswith('.json'):
             manual_json_flags.append(folderitem)
         # Identify CSV flag files
-        elif folderitem.endswith('.csv'):
+        elif folderitem.endswith('.csv') and '_flags' in folderitem:
             csv_flags.append(folderitem)
 
     # Return the lists of manual flag JSON files and CSV flag files
@@ -375,8 +467,8 @@ if __name__ == '__main__':
     parser.add_argument(
         "--flag-type",
         required=True,
-        choices=["diagnostics", "species", "cpd"],
-        help="Specify the flag type. Must be one of: diagnostics, species, cpd"
+        choices=["diagnostics", "species", "cpc"],
+        help="Specify the flag type. Must be one of: diagnostics, species, cpc"
     )
     parser.add_argument('data_file', type=str, help="Path to the input HDF5 file containing the data table.")
     #parser.add_argument('dataset_name', type=str, help ='Relative path to data_table (i.e., dataset name) in HDF5 file')
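With the corrected choices, the CPC step can be invoked as in the hint emitted by generate_species_flags (the .h5 argument is a placeholder):

python pipelines/steps/generate_flags.py <campaignFile.h5> --flag-type cpc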