Mirror of https://gitea.psi.ch/APOG/acsmnode.git (synced 2025-06-24 21:21:08 +02:00)
Refactored the command line interface and generate_species_flags, and created helper functions to improve performance and code clarity.
@@ -101,8 +101,8 @@ def generate_diagnostic_flags(data_table, validity_thresholds_dict):
     #)
     return new_data_table

 # TODO: abstract some of the code in the command line main
-def generate_species_flags(data_table : pd.DataFrame, calib_param_dict : dict):
+def generate_species_flags(data_table : pd.DataFrame, calib_param_dict : dict, flagsFolderPath, datetime_var : str = 't_start_Buf'):

     """Generate flags for columns in data_table based on flags_table
@@ -121,37 +121,30 @@ def generate_species_flags(data_table : pd.DataFrame, calib_param_dict : dict):
     print('Predefined_species:', predefined_species)

-    # Save output tables to csv file and save/or update data lineage record
-    filename, ext = os.path.splitext(parent_file)
-    path_to_flags_file = '/'.join([path_to_output_folder, f'{filename}_flags.csv'])
-
-    variables_set = set(data_table.columns)
-    print(variables_set)
-
-    manual_json_flags = []
-    csv_flags = []
-
-    # Inspect flags folder
-    for filename in os.listdir(path_to_output_folder):
-        if all([filename.endswith('.json'), 'metadata' not in filename, 'flag' in filename]):
-            manual_json_flags.append(filename)
-        elif filename.endswith('.csv'):
-            csv_flags.append(filename)
+    manual_json_flags, csv_flags = get_flags_from_folder(flagsFolderPath)

     print(manual_json_flags,csv_flags)

     if csv_flags:
-        flags_table = pd.read_csv(os.path.join(path_to_output_folder, csv_flags[0]))
+        flags_table = pd.read_csv(os.path.join(flagsFolderPath, csv_flags[0]))
         if 'numflag_any_diagnostic_flag' in flags_table.columns:
-            #renaming_map = {var: f'flag_{var}' for var in data_table.columns}
-            #data_table[renaming_map.keys()] = flags_table['flag_any_diagnostic_flag'].values
-            #data_table.rename(columns=renaming_map, inplace=True)
-
-            renaming_map = {}
-            for var in data_table.columns:
-                #print(var)
-                if (not datetime_var == var) and (var in predefined_species):
-                    renaming_map[var] = f'numflag_{var}'
-                    print(f'numflag_{var}')
-                    data_table[var] = pd.Series(flags_table['numflag_any_diagnostic_flag'].values,dtype=np.int64)
+            # Define condition for renaming
+            required = lambda var: var != datetime_var and var in predefined_species
+
+            # Create renaming map using dictionary comprehension
+            renaming_map = {var: f'numflag_{var}' for var in data_table.columns if required(var)}
+
+            # Assign the same flag values across required columns using vectorized operation
+            variables = list(renaming_map.keys())
+            data_table[variables] = np.tile(
+                flags_table['numflag_any_diagnostic_flag'].values[:, None],
+                (1, len(variables))
+            )
             print(renaming_map)
             data_table.rename(columns=renaming_map, inplace=True)
         else:
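For context, the vectorized assignment above replaces a per-column Python loop: np.tile repeats the single diagnostic-flag column once per species column so all of them can be written in one operation. A minimal standalone sketch (column names and values invented for illustration):

import numpy as np
import pandas as pd

# Toy inputs standing in for the ACSM tables.
data_table = pd.DataFrame({
    't_start_Buf': pd.date_range('2024-01-01', periods=3, freq='h'),
    'Chl': [0.10, 0.20, 0.30],
    'NO3': [1.00, 1.10, 1.20],
})
flags_table = pd.DataFrame({'numflag_any_diagnostic_flag': [0, 2, 0]})
predefined_species = ['Chl', 'NO3']
datetime_var = 't_start_Buf'

required = lambda var: var != datetime_var and var in predefined_species
renaming_map = {var: f'numflag_{var}' for var in data_table.columns if required(var)}

variables = list(renaming_map.keys())
# The (n, 1) flag column is tiled to (n, len(variables)): one write, no loop.
data_table[variables] = np.tile(
    flags_table['numflag_any_diagnostic_flag'].values[:, None],
    (1, len(variables))
)
data_table.rename(columns=renaming_map, inplace=True)
print(data_table.columns.tolist())  # ['t_start_Buf', 'numflag_Chl', 'numflag_NO3']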
@@ -169,7 +162,7 @@ def generate_species_flags(data_table : pd.DataFrame, calib_param_dict : dict):
             #if f'flag_{varname}' in data_table.columns:
             try:
                 # Load manually generated flag
-                with open(os.path.join(path_to_output_folder, flag_filename), 'r') as stream:
+                with open(os.path.join(flagsFolderPath, flag_filename), 'r') as stream:
                     flag_dict = json.load(stream)

                 t1 = pd.to_datetime(flag_dict.get('startdate'))
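The manual flags loaded here are JSON files. Based on how the code uses them (startdate parsed into t1, with t2 and flag_code appearing later in this function), a file plausibly looks like the sketch below; field names other than 'startdate' are assumptions, not confirmed by this page.

import json
import pandas as pd

# Hypothetical manual flag file content; only 'startdate' is visible in
# this diff, 'enddate' and 'flag_code' are inferred from later usage.
raw = '{"startdate": "2024-03-01 00:00:00", "enddate": "2024-03-01 06:00:00", "flag_code": 456}'
flag_dict = json.loads(raw)

t1 = pd.to_datetime(flag_dict.get('startdate'))
t2 = pd.to_datetime(flag_dict.get('enddate'))
print(t1, t2, flag_dict.get('flag_code'))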
@@ -192,18 +185,6 @@ def generate_species_flags(data_table : pd.DataFrame, calib_param_dict : dict):
                 # print(col)

                 data_table = reconcile_flags(data_table, flag_code, t1_idx, t2_idx, numflag_columns)

-                # Apply the ranking logic efficiently
-                #data_table.loc[t1_idx:t2_idx, numflag_columns] = data_table.loc[t1_idx:t2_idx, numflag_columns].map(
-                #    lambda x: reconcile_flags(x, flag_code)
-                #)
-
-
-                #if 456 <= flag_code <= 800:
-                #    data_table.loc[t1_idx:t2_idx, numflag_columns] = data_table.loc[t1_idx:t2_idx, numflag_columns].applymap(lambda x: max(x, flag_code))
-                #else:
-                #    data_table.loc[t1_idx:t2_idx, numflag_columns] = flag_code
-
             except (KeyError, ValueError, FileNotFoundError) as e:
                 print(f"Error processing {flag_filename}: {e}")
                 continue
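The reconcile_flags call replaces the commented-out ranking logic. Going by those comments (codes in the 456-800 band only upgrade an existing flag, anything else overwrites), a vectorized sketch of that rule could look as follows; the actual reconcile_flags in the repository may differ.

import numpy as np
import pandas as pd

def reconcile_flags_sketch(data_table, flag_code, t1_idx, t2_idx, numflag_columns):
    # Slice the time window once; .loc slicing is inclusive on both ends.
    window = data_table.loc[t1_idx:t2_idx, numflag_columns]
    if 456 <= flag_code <= 800:
        # Ranked codes: keep whichever flag is larger, element-wise.
        data_table.loc[t1_idx:t2_idx, numflag_columns] = np.maximum(window, flag_code)
    else:
        # Other codes overwrite the window unconditionally.
        data_table.loc[t1_idx:t2_idx, numflag_columns] = flag_code
    return data_table

df = pd.DataFrame({'numflag_Chl': [0, 500, 999]})
print(reconcile_flags_sketch(df, 456, 0, 2, ['numflag_Chl']))
# numflag_Chl becomes 456, 500, 999: higher existing codes survive.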
@@ -247,28 +228,8 @@ def reconcile_flags(data_table, flag_code, t1_idx, t2_idx, numflag_columns):
 # all_dat[FlowRate_ccs >= flow_lower_lim & FlowRate_ccs <= flow_upper_lim ,flag_flow_auto:="V"]
 # all_dat[FilamentEmission_mA >= filament_lower_lim & FilamentEmission_mA <= filament_upper_lim ,flag_filament_auto:="V"]

-if __name__ == '__main__':
-
-    # Set up argument parsing
-    parser = argparse.ArgumentParser(description="Generate flags for diagnostics and species variables.")
-
-    parser.add_argument(
-        "--flag-type",
-        required=True,
-        choices=["diagnostics", "species", "cpd"],
-        help="Specify the flag type. Must be one of: diagnostics, species, cpd"
-    )
-    parser.add_argument('data_file', type=str, help="Path to the input HDF5 file containing the data table.")
-    #parser.add_argument('dataset_name', type=str, help ='Relative path to data_table (i.e., dataset name) in HDF5 file')
-    #parser.add_argument('validity_thersholds_file', type=str, help="Path to the input YAML file containing calibration factors.")
-    #parser.add_argument('output_file', type=str, help="Path to save the output calibrated data as a CSV file.")
-
-    args = parser.parse_args()
-
-    flag_type = args.flag_type
-    data_file = args.data_file
-
-    # Load input data and calibration factors
+def main(data_file, flag_type):
+    # Open data file and load dataset associated with flag_type : either diagnostics or species
     try:
         dataManager = dataOps.HDF5DataOpsManager(args.data_file)
         dataManager.load_file_obj()
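This hunk is the core of the CLI refactor: argument parsing moves out of module scope and the work becomes a callable main(data_file, flag_type); the parser itself reappears under the __main__ guard in the last hunk below. (As shown, the try block still references args.data_file rather than the new data_file parameter.) The resulting pattern, reduced to its skeleton:

import argparse

def main(data_file, flag_type):
    # Real work lives here, so it can be imported and tested without a CLI.
    print(f'processing {data_file} as {flag_type}')

if __name__ == '__main__':
    parser = argparse.ArgumentParser(description="Generate flags.")
    parser.add_argument("--flag-type", required=True,
                        choices=["diagnostics", "species", "cpd"])
    parser.add_argument('data_file', type=str)
    args = parser.parse_args()
    main(args.data_file, args.flag_type)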
@@ -292,16 +253,10 @@ if __name__ == '__main__':
         if flag_type == 'species':
             keywords = ['ACSM_JFJ_','_timeseries.txt/data_table']
             find_keyword = [all(keyword in item for keyword in keywords) for item in dataset_metadata_df['dataset_name']]
-            #dataset_name = dataset_metadata_df['dataset_name'][find_keyword]
-            #parent_file = dataset_metadata_df.loc[find_keyword,'parent_file']
-            #parent_flag_file = '_'.join([os.path.splitext(parent_file),'flags.csv'])
-            #parent_instrument = dataset_metadata_df.loc[find_keyword,'parent_instrument']
-
-            dataset_name = dataset_metadata_df['dataset_name'][find_keyword]
-            parent_file = dataset_metadata_df.loc[find_keyword,'parent_file']
-            #parent_flag_file = '_'.join([os.path.splitext(parent_file)[0],'flags.csv']) # Expected name
-            parent_instrument = dataset_metadata_df.loc[find_keyword,'parent_instrument']
-
+            # Specify source dataset to be extracted from input hdf5 data file
+            columns = ['dataset_name','parent_file','parent_instrument']
+            dataset_name, parent_file, parent_instrument = tuple(dataset_metadata_df.loc[find_keyword,col] for col in columns)
+            print(':)')
         if not (dataset_name.size == 1):
             raise ValueError(f'{flag_type} file is not uniquely identifiable: {parent_file}')
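The replacement block collapses the repeated .loc lookups into one comprehension over column names. A toy illustration of the keyword-mask selection it relies on (metadata values invented):

import pandas as pd

dataset_metadata_df = pd.DataFrame({
    'dataset_name': ['ACSM_JFJ_2024_timeseries.txt/data_table', 'other/data_table'],
    'parent_file': ['ACSM_JFJ_2024_timeseries.txt', 'other.txt'],
    'parent_instrument': ['ACSM_TOFWARE/JFJ', 'other'],
})

keywords = ['ACSM_JFJ_', '_timeseries.txt/data_table']
find_keyword = [all(k in item for k in keywords)
                for item in dataset_metadata_df['dataset_name']]

columns = ['dataset_name', 'parent_file', 'parent_instrument']
dataset_name, parent_file, parent_instrument = tuple(
    dataset_metadata_df.loc[find_keyword, col] for col in columns)

# Each result is a one-row Series; size == 1 is the uniqueness check below.
print(dataset_name.size)  # 1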
@@ -313,6 +268,7 @@ if __name__ == '__main__':
         data_table = dataManager.extract_dataset_as_dataframe(dataset_name)
+        datetime_var, datetime_var_format = dataManager.infer_datetime_variable(dataset_name)


         # Count the number of NaT (null) values
         num_nats = data_table[datetime_var].isna().sum()
         # Get the total number of rows
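The NaT count flags rows whose timestamps failed to parse; in miniature:

import pandas as pd

# 'bad timestamp' and None both become NaT with errors='coerce'.
s = pd.to_datetime(pd.Series(['2024-01-01 00:00', 'bad timestamp', None]),
                   format='%Y-%m-%d %H:%M', errors='coerce')
num_nats = s.isna().sum()
print(num_nats, 'of', len(s), 'rows are NaT')  # 2 of 3 rows are NaT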
@@ -325,20 +281,18 @@ if __name__ == '__main__':
         dataManager.unload_file_obj()

-    except Exception as e:
+    except Exception as e:
         print(f"Error loading input files: {e}")
         exit(1)

-    path_to_output_dir, ext = os.path.splitext(args.data_file)
-
-    print('Path to output directory :', path_to_output_dir)
+    finally:
+        dataManager.unload_file_obj()


-    # Perform calibration
-
-    flag_type = args.flag_type
+    print('Starting flag generation.')
     try:
+        path_to_output_dir, ext = os.path.splitext(data_file)
+        print('Path to output directory :', path_to_output_dir)
         # Define output directory of apply_calibration_factors() step
         suffix = 'flags'
         if len(parent_instrument.split('/')) >= 2:
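The output directory is now derived inside the try block from the data_file argument rather than from args; the derivation itself is just os.path.splitext on the HDF5 path (example path invented):

import os

data_file = 'data/collection_JFJ_2024.h5'  # hypothetical input path
path_to_output_dir, ext = os.path.splitext(data_file)
print(path_to_output_dir)  # data/collection_JFJ_2024
print(ext)                 # .h5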
@@ -367,7 +321,7 @@ if __name__ == '__main__':
         if flag_type == 'species':
             #calib_param_dict = load_parameters(flag_type)
             calib_param_dict = load_project_yaml_files(projectPath, "calibration_params.yaml")
-            flags_table = generate_species_flags(data_table,calib_param_dict)
+            flags_table = generate_species_flags(data_table,calib_param_dict,path_to_output_folder,datetime_var)

             metadata = {'actris_level' : 1,
                         'processing_script': processingScriptRelPath.replace(os.sep,'/'),
@@ -396,3 +350,48 @@ if __name__ == '__main__':
         print(f"Error during calibration: {e}")
         exit(1)

+
+def get_flags_from_folder(flagsFolderPath):
+    # Get current state of flags folder, which will condition the species flagging
+    manual_json_flags = []
+    csv_flags = []
+
+    # Loop through all files in the flags folder
+    for folderitem in os.listdir(flagsFolderPath):
+
+        # Skip system-level metadata JSON file
+        if all([folderitem.endswith('.json'), 'metadata' in folderitem]):
+            continue
+
+        # Identify manual flag JSON files with 'flag_<number>_<variablename>.json' format
+        if folderitem.startswith('flag') and folderitem.endswith('.json'):
+            manual_json_flags.append(folderitem)
+        # Identify CSV flag files
+        elif folderitem.endswith('.csv'):
+            csv_flags.append(folderitem)
+
+    # Return the lists of manual flag JSON files and CSV flag files
+    return manual_json_flags, csv_flags
+
+if __name__ == '__main__':
+
+    # Set up argument parsing
+    parser = argparse.ArgumentParser(description="Generate flags for diagnostics and species variables.")
+
+    parser.add_argument(
+        "--flag-type",
+        required=True,
+        choices=["diagnostics", "species", "cpd"],
+        help="Specify the flag type. Must be one of: diagnostics, species, cpd"
+    )
+    parser.add_argument('data_file', type=str, help="Path to the input HDF5 file containing the data table.")
+    #parser.add_argument('dataset_name', type=str, help ='Relative path to data_table (i.e., dataset name) in HDF5 file')
+    #parser.add_argument('validity_thersholds_file', type=str, help="Path to the input YAML file containing calibration factors.")
+    #parser.add_argument('output_file', type=str, help="Path to save the output calibrated data as a CSV file.")
+
+    args = parser.parse_args()
+
+    flag_type = args.flag_type
+    data_file = args.data_file
+
+    main(data_file, flag_type)
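The new get_flags_from_folder centralizes the folder scan that generate_species_flags previously did inline. A quick smoke test of its selection rules (file names invented; assumes the function above is in scope), after which the script would be invoked along the lines of python <script>.py --flag-type species <data_file>.h5, the script name not being shown on this page:

import os
import tempfile

with tempfile.TemporaryDirectory() as d:
    for name in ['flag_001_Chl.json',                   # manual flag JSON
                 'data_lineage_metadata.json',          # metadata, skipped
                 'ACSM_JFJ_2024_timeseries_flags.csv',  # CSV flag table
                 'notes.txt']:                          # ignored
        open(os.path.join(d, name), 'w').close()

    # Expected (listing order may vary):
    # (['flag_001_Chl.json'], ['ACSM_JFJ_2024_timeseries_flags.csv'])
    print(get_flags_from_folder(d))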