import sys, os

try:
    thisFilePath = os.path.abspath(__file__)
    print(thisFilePath)
except NameError:
    print("[Notice] The __file__ attribute is unavailable in this environment (e.g., Jupyter or IDLE).")
    print("When using a terminal, make sure the working directory is set to the script's location to prevent path issues (for the DIMA submodule).")
    #print("Otherwise, path to submodule DIMA may not be resolved properly.")
    thisFilePath = os.getcwd()  # Use current directory or specify a default

import numpy as np
import pandas as pd
import argparse

import yaml, json

projectPath = os.path.normpath(os.path.join(thisFilePath, "..", "..", '..'))  # Move up to project root
dimaPath = os.path.normpath('/'.join([projectPath, 'dima']))
#print('Project path:', projectPath)
#print('DIMA path:', dimaPath)

# Set up project root directory
sys.path.insert(0, projectPath)
sys.path.insert(0, dimaPath)

import dima.src.hdf5_ops as dataOps
import pipelines.steps.utils as stepUtils
import dima.utils.g5505_utils as utils
from pipelines.steps.utils import load_project_yaml_files, get_metadata
from math import floor

path_to_ebas_dict = os.path.normpath(os.path.join(projectPath, 'app/flags/ebas_dict.yaml'))
with open(path_to_ebas_dict, 'r') as stream:
    ebas_dict = yaml.safe_load(stream)

flags_dict = ebas_dict['flags']
flag_ranking = ebas_dict['flag_ranking']
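
# Illustrative sketch (assumed, not the actual file): app/flags/ebas_dict.yaml is expected to
# provide a 'flags' mapping from numeric EBAS codes to definitions carrying at least a
# 'validity' field ('I' marks an invalid measurement; other values are treated as valid), and a
# 'flag_ranking' mapping from codes to numeric ranks used when flags are reconciled. The codes,
# validity labels, and ranks below are hypothetical:
#
#   flags:
#     0:   {validity: 'V'}
#     456: {validity: 'I'}
#     999: {validity: 'I'}   # missing measurement, unspecified reason
#   flag_ranking:
#     0: 0
#     456: 1
#     999: 2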


# Vectorized function for getting the rank of a flag
def get_rank(flag):
    return flag_ranking.get(flag, -10)  # Default rank is -10 for unknown flags


# Vectorized function for reconciling flags
def reconcile_flags(data_table, flag_code, t1_idx, t2_idx, numflag_columns):
    # Extract the relevant subtable
    sub_table = data_table.loc[t1_idx:t2_idx, numflag_columns].copy()

    # Compute ranks of current values
    current_ranks = np.vectorize(get_rank)(sub_table.values)

    # Handle flag_code: broadcast scalar or reshape array
    if np.isscalar(flag_code):
        flag_code_values = np.full(sub_table.shape, flag_code)
        flag_code_ranks = np.full(sub_table.shape, get_rank(flag_code))
    else:
        # Convert to NumPy array and ensure correct shape
        flag_code_array = np.asarray(flag_code)
        if flag_code_array.ndim == 1:
            # Assume it's one flag per row, broadcast across columns
            flag_code_values = np.tile(flag_code_array[:, None], (1, len(numflag_columns)))
            flag_code_ranks = np.vectorize(get_rank)(flag_code_values)
        else:
            # Full 2D matrix
            flag_code_values = flag_code_array
            flag_code_ranks = np.vectorize(get_rank)(flag_code_array)

    # Validate shape match
    if flag_code_values.shape != sub_table.shape:
        raise ValueError(f"Shape mismatch: expected {sub_table.shape}, got {flag_code_values.shape}")

    # Reconcile values based on rank comparison
    new_values = np.where(current_ranks < flag_code_ranks, flag_code_values, sub_table.values)

    # Assign reconciled values back
    data_table.loc[t1_idx:t2_idx, numflag_columns] = new_values.astype(np.int64)

    return data_table
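
# Illustrative sketch (not executed): reconcile_flags keeps whichever flag has the higher rank
# in flag_ranking. Assuming rank(999) exceeds the ranks of 0 and 456 in ebas_dict.yaml,
# applying the scalar code 999 over rows 0..1 of a hypothetical table would give:
#
#   df = pd.DataFrame({'numflag_conc': [0, 456, 0]})
#   df = reconcile_flags(df, flag_code=999, t1_idx=0, t2_idx=1, numflag_columns=['numflag_conc'])
#   # rows 0 and 1 become 999 wherever 999 outranks the existing code; row 2 is untouched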


def generate_cpc_flags(data_table, datetime_var: str = 'start_time'):
    # TODO: ask Rob where to find this information.
    required_variables = ['start_time', 'end_time', 'st_y', 'ed_y', 'p_int', 'T_int', 'conc', 'numflag']

    # Optionally check if required variables exist in data_table
    # Uncomment if you want to enforce the check:
    # if not all(var in data_table.columns for var in required_variables):
    #     raise ValueError("Some required variables are missing from the data_table.")

    # Select only datetime and numflag columns
    flags_table = data_table.loc[:, [datetime_var, 'numflag']].copy()
    print(flags_table.head())

    # Multiply numflag by 1000 and floor it
    flags_table['numflag'] = flags_table['numflag'].apply(lambda x: floor(x * 1000))

    flags_table.rename(columns={'numflag': 'numflag_cpc'}, inplace=True)

    default_value = flags_dict[999]
    flags_table['flag_conc'] = flags_table['numflag_cpc'].copy().values
    flags_table['flag_conc'] = flags_table['flag_conc'].apply(lambda x: flags_dict.get(x, default_value)['validity'] == 'I')

    return flags_table
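
# Illustrative sketch (not executed): generate_cpc_flags turns the numflag column of the CPC
# level-1 file (stored as a decimal fraction in the source, e.g. 0.999) into an integer EBAS
# code and a binary invalid indicator: numflag 0.999 -> numflag_cpc 999 via floor(0.999 * 1000),
# and flag_conc is True when that code carries validity 'I' in ebas_dict.yaml.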


#def compute_diagnostic_variable_flags(data_table, validity_thresholds_dict):
def generate_diagnostic_flags(data_table, validity_thresholds_dict):
    """
    Create indicator variables that check whether a particular diagnostic variable is within
    pre-specified/acceptable limits, which are defined by `validity_thresholds_dict`.

    Parameters:
        data_table (pd.DataFrame): The input data table with the diagnostic variables to check.
        validity_thresholds_dict (dict): Dictionary mapping diagnostic variables to their limits, e.g.,
            {
                'ABsamp': {
                    'lower_lim': {'value': 20000, 'description': "not specified yet"},
                    'upper_lim': {'value': 500000, 'description': "not specified yet"}
                }
            }

    Returns:
        pd.DataFrame: A new data table containing the `t_base` timestamp and the indicator
        variables (binary `flag_*` columns and numbered `numflag_*` EBAS columns), plus
        per-row aggregates across all diagnostic flags.
    """

    # Define binary to EBAS flag code map.
    # Specify labeling function to create numbered EBAS flags. It maps a column indicator,
    # marking a time interval in which a particular flagging event occurred.
    binary_to_ebas_code = {False: 0, True: 456}

    # Initialize a dictionary to store indicator variables
    indicator_variables = {}

    indicator_variables['t_base'] = data_table['t_base']

    # Loop through the column names in the data table
    for diagnostic_variable in data_table.columns:
        #print(diagnostic_variable)

        # Skip if the diagnostic variable has no specified validity thresholds
        if diagnostic_variable not in validity_thresholds_dict['validity_thresholds']['variables']:
            print(f'Unspecified validity thresholds for variable {diagnostic_variable}. If needed, update pipelines/params/validity_thresholds.yaml accordingly.')
            continue

        # Get lower and upper limits for diagnostic_variable from the validity thresholds dict
        variable_ranges = validity_thresholds_dict['validity_thresholds']['variables'][diagnostic_variable]
        lower_lim = variable_ranges['lower_lim']
        upper_lim = variable_ranges['upper_lim']

        # Create an indicator variable for the current diagnostic variable
        tmp = data_table[diagnostic_variable]
        indicator_variables['flag_' + diagnostic_variable] = np.logical_not(((tmp >= lower_lim) & (tmp <= upper_lim)).to_numpy())
        indicator_variables['numflag_' + diagnostic_variable] = np.array(
            [binary_to_ebas_code[entry] for entry in indicator_variables['flag_' + diagnostic_variable]],
            dtype=np.int64
        )

    # Add indicator variables to the new data table
    new_data_table = pd.DataFrame(indicator_variables)

    aggr_func = lambda x: max(x.values)
    new_data_table['numflag_any_diagnostic_flag'] = new_data_table.loc[:, ['numflag_' in col for col in new_data_table.columns]].aggregate(aggr_func, axis='columns')

    aggr_func = lambda x: np.nan if x.isna().all() else any(x.dropna().values)
    new_data_table['flag_any_diagnostic_flag'] = new_data_table.loc[:, ['flag_' in col for col in new_data_table.columns]].aggregate(aggr_func, axis='columns')
    #new_data_table['flag_any_diagnostic_flag'] = new_data_table.apply(lambda x : any(np.logical_not(x.values)), axis='columns')
    #new_data_table['flag_any_diagnostic'] = new_data_table.apply(
    #    lambda x: np.nan if x.isna().all() else any(x.dropna().values), axis='columns'
    #)
    return new_data_table
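
# Illustrative sketch (not executed), assuming the loaded validity_thresholds.yaml resolves to
# plain numeric limits per variable (the variable name and limits below are hypothetical):
#
#   thresholds = {'validity_thresholds': {'variables': {
#       'VaporizerTemp_C': {'lower_lim': 580, 'upper_lim': 620}}}}
#   diag = pd.DataFrame({'t_base': pd.date_range('2024-01-01', periods=3, freq='min'),
#                        'VaporizerTemp_C': [600, 550, 610]})
#   flags = generate_diagnostic_flags(diag, thresholds)
#   # -> columns: t_base, flag_VaporizerTemp_C, numflag_VaporizerTemp_C (0 or 456),
#   #    numflag_any_diagnostic_flag, flag_any_diagnostic_flag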

from scipy.interpolate import interp1d


def generate_species_flags(data_table: pd.DataFrame, calib_param_dict: dict, flagsFolderPath, datetime_var: str = 't_start_Buf'):
    """Generate flags for columns in data_table based on the flag files found in flagsFolderPath.

    Returns
    -------
    pd.DataFrame
        Table restricted to the datetime column, the numeric EBAS flag columns ('numflag_*'),
        and their binarized counterparts ('flag_*', True where the flag marks invalid data).
    """

    print('Retrieving species to be flagged ...')
    predefined_species = calib_param_dict.get('variables', {}).get('species', [])
    print(f'Species to be flagged are: {predefined_species}. If needed, update pipelines/params/calibration_params.yaml')

    if not predefined_species:
        raise RuntimeError("Undefined species. Input argument 'calib_param_dict' must contain a 'variables' : {'species' : ['example1',...,'examplen']} ")

    manual_json_flags, csv_flags = get_flags_from_folder(flagsFolderPath)
    #print(manual_json_flags, csv_flags)
    interpolated_cpc_flags = []
    if csv_flags:

        # Loop over CSV files in the flags folder
        for filename in csv_flags:
            filePath = os.path.join(flagsFolderPath, filename)
            fileMetadataDict = get_metadata(filePath)
            flag_datetime_var = fileMetadataDict['datetime_var']

            flags_table = pd.read_csv(filePath)

            # Ensure datetime type
            data_table[datetime_var] = pd.to_datetime(data_table[datetime_var])
            flags_table[flag_datetime_var] = pd.to_datetime(flags_table[flag_datetime_var])

            if 'numflag_any_diagnostic_flag' in flags_table.columns:
                # Sort for interpolation
                flags_table_sorted = flags_table.sort_values(flag_datetime_var)

                # Pre-convert datetimes to int64 for interpolation
                flag_times_int = flags_table_sorted[flag_datetime_var].astype(np.int64)
                data_times_int = data_table[datetime_var].astype(np.int64)

                # Nearest-neighbor interpolator
                flag_interp_func = interp1d(
                    flag_times_int,
                    flags_table_sorted['numflag_any_diagnostic_flag'],
                    kind='nearest',
                    bounds_error=False,
                    fill_value='extrapolate'
                )

                # Interpolated flags
                interpolated_flags = flag_interp_func(data_times_int)

                # Define which columns to flag
                required = lambda var: var != datetime_var and var in predefined_species
                renaming_map = {var: f'numflag_{var}' for var in data_table.columns if required(var)}
                variables = list(renaming_map.keys())

                # Assign flags to those columns
                for var in variables:
                    data_table[var] = interpolated_flags

                data_table.rename(columns=renaming_map, inplace=True)

            elif 'numflag_cpc' in flags_table.columns:
                # Sort for interpolation
                flags_table_sorted = flags_table.sort_values(flag_datetime_var)

                # Pre-convert datetimes to int64 for interpolation
                flag_times_int = flags_table_sorted[flag_datetime_var].values.astype(np.int64)
                data_times_int = data_table[datetime_var].values.astype(np.int64)

                # Nearest-neighbor interpolator
                flag_interp_func = interp1d(
                    flag_times_int,
                    flags_table_sorted['numflag_cpc'],
                    kind='nearest',
                    bounds_error=False,
                    fill_value='extrapolate'
                )

                # Interpolated flags
                interpolated_cpc_flags = flag_interp_func(data_times_int)

    else:
        raise FileNotFoundError("Automated diagnostic flag .csv not found. Hint: Run pipelines/steps/generate_flags.py <campaignFile.h5> --flag-type diagnostics.")

    numflag_columns = [col for col in data_table.columns if 'numflag_' in col]

    if len(interpolated_cpc_flags) > 0:
        data_table = reconcile_flags(data_table, interpolated_cpc_flags, 0, interpolated_cpc_flags.size, numflag_columns)

    #print(numflag_columns)
    for flag_filename in manual_json_flags:
        #print(flag_filename)
        parts = os.path.splitext(flag_filename)[0].split('_')
        varname = '_'.join(parts[2:])  # Extract variable name from filename

        #if f'flag_{varname}' in data_table.columns:
        try:
            # Load manually generated flag
            with open(os.path.join(flagsFolderPath, flag_filename), 'r') as stream:
                flag_dict = json.load(stream)

            t1 = pd.to_datetime(flag_dict.get('startdate'))
            t2 = pd.to_datetime(flag_dict.get('enddate'))
            flag_code = flag_dict.get('flag_code', np.nan)  # Default to NaN if missing

            if pd.isnull(t1) or pd.isnull(t2):
                continue  # Skip if invalid timestamps

            if not data_table[datetime_var].is_monotonic_increasing:
                data_table.sort_values(by=datetime_var, inplace=True)
                data_table.reset_index(drop=True, inplace=True)

            t1_idx = int(abs(data_table[datetime_var] - t1).argmin())
            t2_idx = int(abs(data_table[datetime_var] - t2).argmin())
            #print(flag_code)
            #for col in data_table.columns:
            #    if 'numflag_' in col:
            #        print(col)

            data_table = reconcile_flags(data_table, flag_code, t1_idx, t2_idx, numflag_columns)
        except (KeyError, ValueError, FileNotFoundError) as e:
            print(f"Error processing {flag_filename}: {e}")
            continue

    # Binarize flags for streamlined visualization of invalid and valid regions
    binary_flag_columns = []
    default_code = 999  # EBAS missing measurement, unspecified reason
    default_value = flags_dict[default_code]  # fallback definition for unknown flags

    # Convert them to integer type (handling NaNs if needed)
    data_table[numflag_columns] = data_table[numflag_columns].fillna(default_code).astype(int)

    for numflag_var in numflag_columns:
        flag_var = numflag_var.replace('numflag_', 'flag_')
        binary_flag_columns.append(flag_var)

        # Apply validity check: True if flag is 'I' (invalid)
        data_table[flag_var] = data_table[numflag_var].apply(
            lambda x: flags_dict.get(x, default_value).get('validity') == 'I'
        )

    return data_table.loc[:, [datetime_var] + numflag_columns + binary_flag_columns]
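
# Illustrative sketch (not executed): shape of a manual flag JSON consumed by
# generate_species_flags. The keys follow the code above ('startdate', 'enddate', 'flag_code');
# the file name pattern and the values are hypothetical:
#
#   <flagsFolderPath>/flag_001_<variablename>.json
#   {
#       "startdate": "2024-03-01 00:00:00",
#       "enddate": "2024-03-01 06:00:00",
#       "flag_code": 999
#   }
#
# The interval is snapped to the nearest timestamps in data_table and the code is applied to
# every 'numflag_*' column via reconcile_flags, so a higher-ranked manual code overrides the
# interpolated diagnostic/CPC flags, never the other way around.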


# Reference (apparently from an earlier R/data.table implementation of these validity checks):
# all_dat[VaporizerTemp_C >= heater_lower_lim & VaporizerTemp_C <= heater_upper_lim ,flag_heater_auto:="V"]
# all_dat[ABsamp >= AB_lower_lim & ABsamp <= AB_upper_lim ,flag_AB_auto:="V"]
# all_dat[FlowRate_ccs >= flow_lower_lim & FlowRate_ccs <= flow_upper_lim ,flag_flow_auto:="V"]
# all_dat[FilamentEmission_mA >= filament_lower_lim & FilamentEmission_mA <= filament_upper_lim ,flag_filament_auto:="V"]


def main(data_file, flag_type, capture_renku_metadata=False, workflow_name='generate_flags_workflow'):

    inputs = []
    outputs = []
    parameters = []

    try:
        # Load data and locate relevant dataset
        dataManager = dataOps.HDF5DataOpsManager(data_file)
        dataManager.load_file_obj()

        base_name = '/ACSM_TOFWARE'
        if '/ACSM_TOFWARE' not in dataManager.file_obj:
            dataManager.unload_file_obj()
            print(f'Invalid data file: {data_file}. Missing instrument folder ACSM_TOFWARE.')
            raise ImportError(f'Instrument folder "/ACSM_TOFWARE" not found in data_file : {data_file}')

        dataManager.extract_and_load_dataset_metadata()
        dataset_metadata_df = dataManager.dataset_metadata_df.copy()
        STATION_ABBR = load_project_yaml_files(projectPath, 'campaignDescriptor.yaml')['station_abbr']

        # Find dataset associated with flag_type
        if flag_type == 'diagnostics':
            keywords = [f'ACSM_{STATION_ABBR}_', '_meta.txt/data_table']
        elif flag_type == 'species':
            keywords = [f'ACSM_{STATION_ABBR}_', '_timeseries.txt/data_table']
        elif flag_type == 'cpc':
            keywords = ['cpc.particle_number_concentration.aerosol.', f'CH02L_TSI_3772_{STATION_ABBR}.CH02L_CPC.lev1.nas']
        else:
            raise ValueError(f"Unsupported flag_type: {flag_type}")

        find_keyword = [all(keyword in item for keyword in keywords) for item in dataset_metadata_df['dataset_name']]
        columns = ['dataset_name', 'parent_file', 'parent_instrument']
        dataset_name, parent_file, parent_instrument = tuple(dataset_metadata_df.loc[find_keyword, col] for col in columns)

        if not (dataset_name.size == 1):
            raise ValueError(f'{flag_type} file is not uniquely identifiable: {parent_file}')
        else:
            dataset_name = dataset_name.values[0]
            parent_file = parent_file.values[0]
            parent_instrument = parent_instrument.values[0]

        # Extract data and timestamp
        data_table = dataManager.extract_dataset_as_dataframe(dataset_name)
        datetime_var, datetime_var_format = dataManager.infer_datetime_variable(dataset_name)
        dataManager.unload_file_obj()

        # Report missing timestamps
        num_nats = data_table[datetime_var].isna().sum()
        total_rows = len(data_table)
        percentage_nats = (num_nats / total_rows) * 100
        print(f"Total rows: {total_rows}")
        print(f"NaT (missing) values: {num_nats}")
        print(f"Percentage of data loss: {percentage_nats:.4f}%")

    except Exception as e:
        print(f"Error loading input files: {e}")
        return 1

    print('Starting flag generation.')
    try:
        path_to_output_dir, ext = os.path.splitext(data_file)
        suffix = 'flags'

        # Parse folder/category from instrument
        parts = parent_instrument.split('/')
        instFolder = parts[0]
        category = parts[1] if len(parts) >= 2 else ''

        path_to_output_folder = os.path.splitext('/'.join([path_to_output_dir, f'{instFolder}_{suffix}', category]))[0]
        processingScriptRelPath = os.path.relpath(thisFilePath, start=projectPath)

        if not os.path.exists(path_to_output_folder):
            os.makedirs(path_to_output_folder)

        print('Processing script:', processingScriptRelPath)
        print('Output directory:', path_to_output_folder)

        # Flagging logic
        if flag_type == 'diagnostics':
            validity_thresholds_dict = load_project_yaml_files(projectPath, "validity_thresholds.yaml")
            flags_table = generate_diagnostic_flags(data_table, validity_thresholds_dict)
        elif flag_type == 'species':
            calib_param_dict = load_project_yaml_files(projectPath, "calibration_params.yaml")
            flags_table = generate_species_flags(data_table, calib_param_dict, path_to_output_folder, datetime_var)
        elif flag_type == 'cpc':
            flags_table = generate_cpc_flags(data_table, datetime_var)

        # Metadata for lineage
        metadata = {
            'actris_level': 1,
            'processing_script': processingScriptRelPath.replace(os.sep, '/'),
            'processing_date': utils.created_at(),
            'flag_type': flag_type,
            'datetime_var': datetime_var
        }

        filename, ext = os.path.splitext(parent_file)
        path_to_flags_file = '/'.join([path_to_output_folder, f'{filename}_flags.csv'])

        # Save output and record lineage
        flags_table.to_csv(path_to_flags_file, index=False)
        status = stepUtils.record_data_lineage(path_to_flags_file, projectPath, metadata)

        print(f"Flags saved to {path_to_flags_file}")
        print(f"Data lineage saved to {path_to_output_folder}")

    except Exception as e:
        print(f"Error during flag generation: {e}")
        return 1

    # --------------------- Renku Metadata Collection ----------------------------
    if capture_renku_metadata:
        from workflows.utils import RenkuWorkflowBuilder

        inputs.append(("script_py", {'path': os.path.relpath(thisFilePath, start=projectPath)}))
        inputs.append(("data_file", {'path': os.path.relpath(data_file, start=projectPath)}))

        # Parameter
        parameters.append(("flag_type", {'value': flag_type}))

        # Add implicit YAML config
        if flag_type == 'diagnostics':
            inputs.append(("validity_thresholds_yaml", {
                'path': os.path.relpath(os.path.join(projectPath, "pipelines/params/validity_thresholds.yaml"), start=projectPath),
                'implicit': True
            }))

        elif flag_type == 'species':
            inputs.append(("calibration_params_yaml", {
                'path': os.path.relpath(os.path.join(projectPath, "pipelines/params/calibration_params.yaml"), start=projectPath),
                'implicit': True
            }))

            # Add CSV and JSON flags from flags folder as implicit inputs
            flag_index = 0
            for fname in os.listdir(path_to_output_folder):
                full_path = os.path.join(path_to_output_folder, fname)

                # Skip the output file to avoid circular dependency
                if os.path.abspath(full_path) == os.path.abspath(path_to_flags_file):
                    continue

                rel_flag_path = os.path.relpath(full_path, start=projectPath)

                if fname.endswith('.csv') or (fname.endswith('.json') and 'metadata' not in fname):
                    inputs.append((f"flag_in_{flag_index}", {
                        'description': 'manual flag by domain expert' if fname.endswith('.json') else 'automated or cpc flag',
                        'path': rel_flag_path,
                        'implicit': True
                    }))
                    flag_index += 1

        #elif flag_type == 'cpc':
        #    # CPC may require logic like species if any dependencies are found
        #    for fname in os.listdir(path_to_output_folder):
        #        rel_flag_path = os.path.relpath(os.path.join(path_to_output_folder, fname), start=projectPath)
        #        if fname.endswith('.nas') and ('cpc' in fname):
        #            inputs.append((f"flag_{fname}", {
        #                'path': rel_flag_path,
        #                'implicit': True
        #            }))

        # Output
        outputs.append(("flags_csv", {
            'path': os.path.relpath(path_to_flags_file, start=projectPath),
            'implicit': True
        }))

        # Define workflow step
        workflowfile_builder = RenkuWorkflowBuilder(name=workflow_name)
        workflowfile_builder.add_step(
            step_name=f"generate_flags_{flag_type}",
            base_command="python",
            inputs=inputs,
            outputs=outputs,
            parameters=parameters
        )
        workflowfile_builder.save_to_file(os.path.join(projectPath, 'workflows'))

    return 0
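
# Illustrative sketch (not executed) of how main() lays out its outputs, using hypothetical
# campaign file and instrument names; the actual layout depends on the HDF5 content:
#
#   main('data/collection_JFJ_2024.h5', 'diagnostics')
#   # With parent_instrument 'ACSM_TOFWARE/ACSM_JFJ_2024', flags are written to
#   #   data/collection_JFJ_2024/ACSM_TOFWARE_flags/ACSM_JFJ_2024/<parent_file>_flags.csv
#   # and lineage metadata is recorded alongside via stepUtils.record_data_lineage.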


def get_flags_from_folder(flagsFolderPath):
    # Get current state of flags folder, which will condition the species flagging
    manual_json_flags = []
    csv_flags = []

    # Loop through all files in the flags folder
    for folderitem in os.listdir(flagsFolderPath):

        # Skip system-level metadata JSON file
        if all([folderitem.endswith('.json'), 'metadata' in folderitem]):
            continue

        # Identify manual flag JSON files with 'flag_<number>_<variablename>.json' format
        if folderitem.startswith('flag') and folderitem.endswith('.json'):
            manual_json_flags.append(folderitem)
        # Identify CSV flag files
        elif folderitem.endswith('.csv') and '_flags' in folderitem:
            csv_flags.append(folderitem)

    # Return the lists of manual flag JSON files and CSV flag files
    return manual_json_flags, csv_flags
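
# Illustrative sketch (not executed): given a hypothetical flags folder containing
#
#   flag_001_Org.json               -> returned in manual_json_flags
#   ACSM_JFJ_2024_meta_flags.csv    -> returned in csv_flags ('_flags' in name, '.csv' suffix)
#   data_lineage_metadata.json      -> skipped ('metadata' in a .json name)
#   notes.txt                       -> ignored (matches neither pattern)
#
# get_flags_from_folder would return (['flag_001_Org.json'], ['ACSM_JFJ_2024_meta_flags.csv']).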


if __name__ == '__main__':

    # Set up argument parsing
    parser = argparse.ArgumentParser(description="Generate flags for diagnostics, species, and CPC variables.")

    parser.add_argument(
        "--flag-type",
        required=True,
        choices=["diagnostics", "species", "cpc"],
        help="Specify the flag type. Must be one of: diagnostics, species, cpc"
    )
    parser.add_argument('data_file', type=str, help="Path to the input HDF5 file containing the data table.")
    #parser.add_argument('dataset_name', type=str, help='Relative path to data_table (i.e., dataset name) in HDF5 file')
    #parser.add_argument('validity_thresholds_file', type=str, help="Path to the input YAML file containing calibration factors.")
    #parser.add_argument('output_file', type=str, help="Path to save the output calibrated data as a CSV file.")

    args = parser.parse_args()

    flag_type = args.flag_type
    data_file = args.data_file

    main(data_file, flag_type)
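
# Example invocation (paths are placeholders; adjust to your campaign file). The species step
# reads the automated diagnostic flag CSV from the flags folder, so run diagnostics first:
#
#   python pipelines/steps/generate_flags.py data/<campaignFile>.h5 --flag-type diagnostics
#   python pipelines/steps/generate_flags.py data/<campaignFile>.h5 --flag-type species
#   python pipelines/steps/generate_flags.py data/<campaignFile>.h5 --flag-type cpc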