Mirror of https://gitea.psi.ch/APOG/acsmnode.git (synced 2025-06-29 12:50:48 +02:00)
WIP: Redesigned command-line functionality. The --flag-type species option is half-baked :). It runs without error, but the second part needs validation.
@@ -34,7 +34,8 @@ def compute_cpc_flags():
     return 0
 
-def compute_diagnostic_variable_flags(data_table, validity_thresholds_dict):
+#def compute_diagnostic_variable_flags(data_table, validity_thresholds_dict):
+def generate_diagnostic_flags(data_table):
     """
     Create indicator variables that check whether a particular diagnostic variable is within
     pre-specified/acceptable limits, which are defined by `variable_limits`.
@@ -54,6 +55,17 @@ def compute_diagnostic_variable_flags(data_table, validity_thresholds_dict):
     and additional indicator variables, representing flags.
     """
 
+    # Implicit input
+    validity_thersholds_file = 'pipelines/params/validity_thresholds.yaml'
+
+    validity_thresholds_dict = {}
+    try:
+        with open(validity_thersholds_file, 'r') as stream:
+            validity_thresholds_dict = yaml.load(stream, Loader=yaml.FullLoader)
+    except Exception as e:
+        print(f"Error accessing validation thresholds at: {validity_thersholds_file}")
+        return 1
+
     # Initialize a dictionary to store indicator variables
     indicator_variables = {}
 
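Note: generate_diagnostic_flags() now reads its thresholds from a fixed YAML path instead of a CLI argument. A minimal sketch of how one indicator column is presumably derived from a threshold entry (the variable name comes from the reference comments further down; the numeric limits and the YAML layout are illustrative assumptions, not the repo's actual schema):

    import pandas as pd

    # Hypothetical layout of pipelines/params/validity_thresholds.yaml (assumed):
    #   validity_thresholds:
    #     VaporizerTemp_C: {lower_lim: 595.0, upper_lim: 605.0}

    def within_limits(series: pd.Series, lower_lim: float, upper_lim: float) -> pd.Series:
        # True where the diagnostic sits inside its acceptable range
        return (series >= lower_lim) & (series <= upper_lim)

    # e.g. indicator_variables['flag_VaporizerTemp_C'] = within_limits(
    #     data_table['VaporizerTemp_C'], 595.0, 605.0)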
@@ -78,9 +90,26 @@ def compute_diagnostic_variable_flags(data_table, validity_thresholds_dict):
 
     # Add indicator variables to the new data table
     new_data_table = pd.DataFrame(indicator_variables)
+    new_data_table['flag_any_diagnostic_flag'] = new_data_table.apply(lambda x : any(np.logical_not(x.values)), axis='columns')
+    #new_data_table['flag_any_diagnostic'] = new_data_table.apply(
+    #    lambda x: np.nan if x.isna().all() else any(x.dropna().values), axis='columns'
+    #)
     return new_data_table
 
+def generate_species_flags(data_table : pd.DataFrame, flags_table : pd.DataFrame):
+    """Generate flags for columns in data_table based on flags_table
+
+    Returns
+    -------
+    _type_
+        _description_
+    """
+
+    return 0
+
 # all_dat[VaporizerTemp_C >= heater_lower_lim & VaporizerTemp_C <= heater_upper_lim ,flag_heater_auto:="V"]
 # all_dat[ABsamp >= AB_lower_lim & ABsamp <= AB_upper_lim ,flag_AB_auto:="V"]
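The new flag_any_diagnostic_flag column marks a row whenever at least one indicator is False, i.e. when some diagnostic is out of bounds. A self-contained illustration of that aggregation:

    import numpy as np
    import pandas as pd

    demo = pd.DataFrame({'flag_a': [True, True, False],
                         'flag_b': [True, False, True]})
    # A row is flagged when any per-variable indicator is False
    demo['flag_any_diagnostic_flag'] = demo.apply(
        lambda x: any(np.logical_not(x.values)), axis='columns')
    print(demo['flag_any_diagnostic_flag'].tolist())  # [False, True, True]

np.logical_not is not NaN-aware, which is presumably what the commented-out flag_any_diagnostic variant is meant to address.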
@@ -91,9 +120,16 @@ if __name__ == '__main__':
 
     # Set up argument parsing
     parser = argparse.ArgumentParser(description="Calibrate species data using calibration factors.")
+
+    parser.add_argument(
+        "--flag-type",
+        required=True,
+        choices=["diagnostics", "species", "cpd"],
+        help="Specify the flag type. Must be one of: diagnostics, species, cpd"
+    )
     parser.add_argument('data_file', type=str, help="Path to the input HDF5 file containing the data table.")
     parser.add_argument('dataset_name', type=str, help='Relative path to data_table (i.e., dataset name) in HDF5 file')
-    parser.add_argument('validity_thersholds_file', type=str, help="Path to the input YAML file containing calibration factors.")
+    #parser.add_argument('validity_thersholds_file', type=str, help="Path to the input YAML file containing calibration factors.")
     #parser.add_argument('output_file', type=str, help="Path to save the output calibrated data as a CSV file.")
 
     args = parser.parse_args()
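With --flag-type now required, an invocation of this step presumably looks like the line below; the script name and both positional values are placeholders, since neither appears in the diff:

    python <this_script>.py --flag-type diagnostics <path/to/data.h5> <dataset/name>

The parser description still reads "Calibrate species data using calibration factors.", apparently a leftover from the calibration step this script was adapted from.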
@@ -108,7 +144,7 @@ if __name__ == '__main__':
         dataManager.load_file_obj()
         dataset_name = '/'+args.dataset_name
         data_table = dataManager.extract_dataset_as_dataframe('/'+args.dataset_name)
+        datetime_var, datetime_var_format = dataManager.infer_datetime_variable('/'+args.dataset_name)
         dataManager.extract_and_load_dataset_metadata()
         dataset_metadata_df = dataManager.dataset_metadata_df.copy()
         print(dataset_metadata_df.head())
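infer_datetime_variable() belongs to the repo's data manager and its actual implementation is not shown in this diff. Purely as an illustrative stand-in, such a helper could scan for the first column that parses cleanly as datetimes:

    import pandas as pd

    def infer_datetime_variable_stub(df: pd.DataFrame):
        # Illustration only: the real method also returns a format value
        # (datetime_var_format), which this sketch leaves as None.
        for col in df.select_dtypes(include=['object']).columns:
            parsed = pd.to_datetime(df[col], errors='coerce')
            if len(parsed) and parsed.notna().all():
                return col, None
        return None, None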
@@ -119,10 +155,7 @@ if __name__ == '__main__':
         parent_file = data_table_metadata.loc[dataset_name_idx,'parent_file'].values[0]
 
         dataManager.unload_file_obj()
-        print(args.validity_thersholds_file)
-
-        with open(args.validity_thersholds_file, 'r') as stream:
-            validity_thersholds_dict = yaml.load(stream, Loader=yaml.FullLoader)
     except Exception as e:
         print(f"Error loading input files: {e}")
         exit(1)
@@ -134,6 +167,8 @@ if __name__ == '__main__':
 
     # Perform calibration
 
+    flag_type = args.flag_type
+
     try:
         # Define output directory of apply_calibration_factors() step
         suffix = 'flags'
@@ -150,28 +185,109 @@ if __name__ == '__main__':
         if not os.path.exists(path_to_output_folder):
             os.makedirs(path_to_output_folder)
 
-        print('Processing script %s:', processingScriptRelPath)
-        print('Output directory: %s', path_to_output_folder)
+        print('Processing script:', processingScriptRelPath)
+        print('Output directory:', path_to_output_folder)
 
         # Compute diagnostic flags based on validity thresholds defined in configuration_file_dict
-        flags_table = compute_diagnostic_variable_flags(data_table, validity_thersholds_dict)
-        metadata = {'actris_level' : 1,
-                    'processing_script': processingScriptRelPath.replace(os.sep,'/'),
-                    'processing_date' : utils.created_at()
-                    }
+        if flag_type == 'diagnostics':
+            flags_table = generate_diagnostic_flags(data_table)
+            metadata = {'actris_level' : 1,
+                        'processing_script': processingScriptRelPath.replace(os.sep,'/'),
+                        'processing_date' : utils.created_at(),
+                        'flag_type' : flag_type
+                        }
+
+            # Save output tables to csv file and save/or update data lineage record
+            filename, ext = os.path.splitext(parent_file)
+            path_to_flags_file = '/'.join([path_to_output_folder, f'{filename}_flags.csv'])
+            #path_to_calibration_factors_file = '/'.join([path_to_output_folder, f'{filename}_calibration_factors.csv'])
+
+            flags_table.to_csv(path_to_flags_file, index=False)
+            status = stepUtils.record_data_lineage(path_to_flags_file, projectPath, metadata)
+
+            print(f"Flags saved to {path_to_flags_file}")
+            print(f"Data lineage saved to {path_to_output_folder}")
+
+        if flag_type == 'species':
+
+            # Save output tables to csv file and save/or update data lineage record
+            filename, ext = os.path.splitext(parent_file)
+            path_to_flags_file = '/'.join([path_to_output_folder, f'{filename}_flags.csv'])
+
+            variables_set = set(data_table.columns)
+
+            manual_json_flags = []
+            csv_flags = []
+
+            # Inspect flags folder
+            for filename in os.listdir(path_to_output_folder):
+                if any(var in filename and filename.endswith('.json') for var in variables_set):
+                    manual_json_flags.append(filename)
+                elif filename.endswith('.csv'):
+                    csv_flags.append(filename)
+
+            if len(csv_flags) == 1:
+                flags_table = pd.read_csv(os.path.join(path_to_output_folder, csv_flags[0]))
+
+                if 'flag_any_diagnostic_flag' in flags_table.columns:
+                    #renaming_map = {var: f'flag_{var}' for var in data_table.columns}
+                    #data_table[renaming_map.keys()] = flags_table['flag_any_diagnostic_flag'].values
+                    #data_table.rename(columns=renaming_map, inplace=True)
+
+                    renaming_map = {}
+                    for var in data_table.columns:
+                        if not datetime_var == var:
+                            renaming_map[var] = f'flag_{var}'
+                            data_table[var] = pd.Series(flags_table['flag_any_diagnostic_flag'].values)
+                    print(renaming_map)
+                    data_table.rename(columns=renaming_map, inplace=True)
+            print(csv_flags)
+
+            # TODO: validate the code below. I suspect we need to change the flag creation strategy.
+            # First, aggregate the manual numeric flags into one (using the median, perhaps), and then create a flag for each species,
+            # taking the existing species flags into account (read the csv file and update it if needed).
+            for flag_filename in manual_json_flags:
+                parts = os.path.splitext(flag_filename)[0].split('_')
+                varname = '_'.join(parts[2:])  # Extract variable name from filename
+                print(varname)
+                if f'flag_{varname}' in data_table.columns:
+                    try:
+                        with open(os.path.join(path_to_output_folder, flag_filename), 'r') as stream:
+                            flag_dict = json.load(stream)
+
+                        t1 = pd.to_datetime(flag_dict.get('startdate'))
+                        t2 = pd.to_datetime(flag_dict.get('enddate'))
+                        flag_code = flag_dict.get('flag_code', np.nan)  # Default to NaN if missing
+
+                        if pd.isnull(t1) or pd.isnull(t2):
+                            continue  # Skip if invalid timestamps
+
+                        if not data_table[datetime_var].is_monotonic_increasing:
+                            data_table.sort_values(by=datetime_var, inplace=True)
+                            data_table.reset_index(drop=True, inplace=True)
+
+                        t1_idx = abs(data_table[datetime_var] - t1).argmin()
+                        t2_idx = abs(data_table[datetime_var] - t2).argmin()
+
+                        data_table.loc[t1_idx:t2_idx, f'flag_{varname}'] = flag_code
+
+                    except (KeyError, ValueError, FileNotFoundError) as e:
+                        print(f"Error processing {flag_filename}: {e}")
+                        continue
+
+            data_table.to_csv(path_to_flags_file, index=False)
+
+            # Read json and assign numeric flag to column
+
-        # Save output tables to csv file and save/or update data lineage record
-        filename, ext = os.path.splitext(parent_file)
-        path_to_flags_file = '/'.join([path_to_output_folder, f'{filename}_flags.csv'])
-        #path_to_calibration_factors_file = '/'.join([path_to_output_folder, f'{filename}_calibration_factors.csv'])
-
-        flags_table.to_csv(path_to_flags_file, index=False)
-
-        status = stepUtils.record_data_lineage(path_to_flags_file, projectPath, metadata)
-
-        print(f"Flags saved to {path_to_flags_file}")
-        print(f"Data lineage saved to {path_to_output_dir}")
-
     except Exception as e:
         print(f"Error during calibration: {e}")
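In the species branch, each manual JSON flag file is read for 'startdate', 'enddate' and 'flag_code', and both timestamps are mapped to the nearest rows before the flag code is written over that window. A minimal sketch of the nearest-timestamp lookup; the example file contents and all values are invented, only the key names come from the code above:

    import pandas as pd

    # Assumed shape of a manual flag file (values invented):
    # {"startdate": "2024-03-01 00:00:00",
    #  "enddate":   "2024-03-01 06:00:00",
    #  "flag_code": 456}

    times = pd.Series(pd.date_range('2024-03-01', periods=12, freq='h'))
    t1_idx = (times - pd.Timestamp('2024-03-01 02:10')).abs().argmin()
    t2_idx = (times - pd.Timestamp('2024-03-01 05:40')).abs().argmin()
    print(t1_idx, t2_idx)  # 2 6, so rows 2..6 inclusive would receive the flag code

The sort_values/reset_index step beforehand matters: argmin() returns positions, while .loc[t1_idx:t2_idx] slices by label, and the two only coincide on a freshly reset RangeIndex.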