mirror of
https://gitea.psi.ch/APOG/acsmnode.git
synced 2025-06-25 16:55:44 +02:00
Update to account for yaml file attribute renamings.
This commit is contained in:
@@ -1,167 +1,169 @@
import sys, os

try:
    thisFilePath = os.path.abspath(__file__)
    print(thisFilePath)
except NameError:
    print("[Notice] The __file__ attribute is unavailable in this environment (e.g., Jupyter or IDLE).")
    print("When using a terminal, make sure the working directory is set to the script's location to prevent path issues (for the DIMA submodule)")
    #print("Otherwise, path to submodule DIMA may not be resolved properly.")
    thisFilePath = os.getcwd()  # Use current directory or specify a default

import numpy as np
import pandas as pd
import argparse

import yaml, json

dimaPath = os.path.normpath(os.path.join(thisFilePath, "..", "..", '..'))  # Move up to project root
projectPath = os.path.normpath(os.path.join(dimaPath, '..'))
print(dimaPath)
sys.path.append(dimaPath)
import dima.src.hdf5_ops as dataOps

def create_flags_for_diagnostic_vars(data_table, variable_limits):
    """
    Create indicator variables that check whether a particular diagnostic variable is within
    pre-specified/acceptable limits, which are defined by `variable_limits`.

    Parameters:
        data_table (pd.DataFrame): The input data table with the diagnostic variables to evaluate.
        variable_limits (dict): Dictionary mapping diagnostic variables to their limits, e.g.,
            {
                'ABsamp': {
                    'lower_lim': {'value': 20000, 'description': "not specified yet"},
                    'upper_lim': {'value': 500000, 'description': "not specified yet"}
                }
            }

    Returns:
        pd.DataFrame: A new data table of indicator variables (flags), one boolean column
        'flag_<variable>' per diagnostic variable with defined limits.
    """

    # Initialize a dictionary to store indicator variables
    indicator_variables = {}

    # Loop through the column names in the data table
    for diagnostic_variable in data_table.columns:

        # Skip if the diagnostic variable is not in variable_limits
        if diagnostic_variable not in variable_limits:
            print(f'Diagnostic variable {diagnostic_variable} has no defined limits in {variable_limits}.')
            continue

        # Get lower and upper limits for diagnostic_variable from the variable limits dict
        lower_lim = variable_limits[diagnostic_variable]['lower_lim']['value']
        upper_lim = variable_limits[diagnostic_variable]['upper_lim']['value']

        # Create an indicator variable for the current diagnostic variable
        tmp = data_table[diagnostic_variable]
        indicator_variables['flag_' + diagnostic_variable] = ((tmp >= lower_lim) & (tmp <= upper_lim)).to_numpy()

    # Add indicator variables to the new data table
    new_data_table = pd.DataFrame(indicator_variables)

    return new_data_table


# all_dat[VaporizerTemp_C >= heater_lower_lim & VaporizerTemp_C <= heater_upper_lim ,flag_heater_auto:="V"]
# all_dat[ABsamp >= AB_lower_lim & ABsamp <= AB_upper_lim ,flag_AB_auto:="V"]
# all_dat[FlowRate_ccs >= flow_lower_lim & FlowRate_ccs <= flow_upper_lim ,flag_flow_auto:="V"]
# all_dat[FilamentEmission_mA >= filament_lower_lim & FilamentEmission_mA <= filament_upper_lim ,flag_filament_auto:="V"]

if __name__ == '__main__':

    # Set up argument parsing
    parser = argparse.ArgumentParser(description="Calibrate species data using calibration factors.")
    parser.add_argument('data_file', type=str, help="Path to the input HDF5 file containing the data table.")
    parser.add_argument('dataset_name', type=str, help='Relative path to data_table (i.e., dataset name) in HDF5 file')
    parser.add_argument('calibration_file', type=str, help="Path to the input YAML file containing calibration factors.")
    #parser.add_argument('output_file', type=str, help="Path to save the output calibrated data as a CSV file.")

    args = parser.parse_args()

    # Load input data and calibration factors
    try:
        #data_table = pd.read_json(args.data_file)

        print(args.data_file)

        dataManager = dataOps.HDF5DataOpsManager(args.data_file)
        dataManager.load_file_obj()
        dataset_name = '/' + args.dataset_name
        data_table = dataManager.extract_dataset_as_dataframe('/' + args.dataset_name)

        dataManager.extract_and_load_dataset_metadata()
        dataset_metadata_df = dataManager.dataset_metadata_df.copy()
        print(dataset_metadata_df.head())

        dataset_name_idx = dataset_metadata_df.index[(dataset_metadata_df['dataset_name'] == args.dataset_name).to_numpy()]
        data_table_metadata = dataset_metadata_df.loc[dataset_name_idx, :]
        parent_instrument = data_table_metadata.loc[dataset_name_idx, 'parent_instrument'].values[0]
        parent_file = data_table_metadata.loc[dataset_name_idx, 'parent_file'].values[0]

        dataManager.unload_file_obj()
        print(args.calibration_file)

        with open(args.calibration_file, 'r') as stream:
            calibration_factors = yaml.load(stream, Loader=yaml.FullLoader)
    except Exception as e:
        print(f"Error loading input files: {e}")
        exit(1)

    path_to_output_dir, ext = os.path.splitext(args.data_file)

    print('Path to output directory :', path_to_output_dir)


    # Perform calibration
    try:

        processingScriptRelPath = os.path.relpath(thisFilePath, start=projectPath)

        print(processingScriptRelPath)

        metadata = {'actris_level': 1, 'processing_script': processingScriptRelPath.replace(os.sep, '/')}

        path_to_output_file, ext = os.path.splitext('/'.join([path_to_output_dir, parent_instrument, parent_file]))
        path_to_calibrated_file = ''.join([path_to_output_file, '_flags.csv'])

        path_tail, path_head = os.path.split(path_to_calibrated_file)
        path_to_metadata_file = '/'.join([path_tail, 'data_lineage_metadata.json'])

        print('Path to output file :', path_to_calibrated_file)
        import dima.utils.g5505_utils as utils
        import json
        calibrated_table = create_flags_for_diagnostic_vars(data_table, calibration_factors)
        metadata['processing_date'] = utils.created_at()
        calibrated_table.to_csv(path_to_calibrated_file, index=False)

        # Ensure the file exists
        if not os.path.exists(path_to_metadata_file):
            with open(path_to_metadata_file, 'w') as f:
                json.dump({}, f)  # Initialize empty JSON

        # Read the existing JSON
        with open(path_to_metadata_file, 'r') as metadata_file:
            try:
                json_dict = json.load(metadata_file)
            except json.JSONDecodeError:
                json_dict = {}  # Start fresh if file is invalid

        # Update the JSON object
        outputfileRelPath = os.path.relpath(path_to_calibrated_file, start=projectPath).replace(os.sep, '/')
        json_dict[outputfileRelPath] = metadata

        # Write updated JSON back to the file
        with open(path_to_metadata_file, 'w') as metadata_file:
            json.dump(json_dict, metadata_file, indent=4)

        print(f"Calibrated data saved to {path_to_calibrated_file}")
        print(f"Metadata for calibrated data saved to {path_to_metadata_file}")
    except Exception as e:
        print(f"Error during calibration: {e}")

import sys, os

try:
    thisFilePath = os.path.abspath(__file__)
    print(thisFilePath)
except NameError:
    print("[Notice] The __file__ attribute is unavailable in this environment (e.g., Jupyter or IDLE).")
    print("When using a terminal, make sure the working directory is set to the script's location to prevent path issues (for the DIMA submodule)")
    #print("Otherwise, path to submodule DIMA may not be resolved properly.")
    thisFilePath = os.getcwd()  # Use current directory or specify a default

import numpy as np
import pandas as pd
import argparse

import yaml, json

dimaPath = os.path.normpath(os.path.join(thisFilePath, "..", "..", '..'))  # Move up to project root
projectPath = os.path.normpath(os.path.join(dimaPath, '..'))
print(dimaPath)
sys.path.append(dimaPath)
import dima.src.hdf5_ops as dataOps

def create_flags_for_diagnostic_vars(data_table, validity_thresholds_dict):
    """
    Create indicator variables that check whether a particular diagnostic variable is within
    pre-specified/acceptable limits, which are defined by `validity_thresholds_dict`.

    Parameters:
        data_table (pd.DataFrame): The input data table with the diagnostic variables to evaluate.
        validity_thresholds_dict (dict): Dictionary mapping diagnostic variables to their limits, e.g.,
            {
                'validity_thresholds': {
                    'variables': {
                        'ABsamp': {
                            'lower_lim': 20000,
                            'upper_lim': 500000
                        }
                    }
                }
            }

    Returns:
        pd.DataFrame: A new data table of indicator variables (flags), one boolean column
        'flag_<variable>' per diagnostic variable with defined limits.
    """

    # Initialize a dictionary to store indicator variables
    indicator_variables = {}

    # Loop through the column names in the data table
    for diagnostic_variable in data_table.columns:
        print(diagnostic_variable)
        # Skip if the diagnostic variable has no limits defined in validity_thresholds_dict
        if diagnostic_variable not in validity_thresholds_dict['validity_thresholds']['variables']:
            print(f'Diagnostic variable {diagnostic_variable} has no defined limits in {validity_thresholds_dict}.')
            continue

        # Get lower and upper limits for diagnostic_variable from the validity thresholds dict
        variable_ranges = validity_thresholds_dict['validity_thresholds']['variables'][diagnostic_variable]
        lower_lim = variable_ranges['lower_lim']
        upper_lim = variable_ranges['upper_lim']

        # Create an indicator variable for the current diagnostic variable
        tmp = data_table[diagnostic_variable]
        indicator_variables['flag_' + diagnostic_variable] = ((tmp >= lower_lim) & (tmp <= upper_lim)).to_numpy()

    # Add indicator variables to the new data table
    new_data_table = pd.DataFrame(indicator_variables)

    return new_data_table
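
# Illustrative sketch (not part of the original script): how the function above could be
# exercised with a hand-built thresholds dict. The variable name 'ABsamp' and the numeric
# limits are assumptions taken from the docstring example.
#
#   example_thresholds = {
#       'validity_thresholds': {
#           'variables': {
#               'ABsamp': {'lower_lim': 20000, 'upper_lim': 500000}
#           }
#       }
#   }
#   example_table = pd.DataFrame({'ABsamp': [15000, 250000, 600000]})
#   flags = create_flags_for_diagnostic_vars(example_table, example_thresholds)
#   # flags['flag_ABsamp'] -> [False, True, False]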

# all_dat[VaporizerTemp_C >= heater_lower_lim & VaporizerTemp_C <= heater_upper_lim ,flag_heater_auto:="V"]
# all_dat[ABsamp >= AB_lower_lim & ABsamp <= AB_upper_lim ,flag_AB_auto:="V"]
# all_dat[FlowRate_ccs >= flow_lower_lim & FlowRate_ccs <= flow_upper_lim ,flag_flow_auto:="V"]
# all_dat[FilamentEmission_mA >= filament_lower_lim & FilamentEmission_mA <= filament_upper_lim ,flag_filament_auto:="V"]

if __name__ == '__main__':

    # Set up argument parsing
    parser = argparse.ArgumentParser(description="Calibrate species data using calibration factors.")
    parser.add_argument('data_file', type=str, help="Path to the input HDF5 file containing the data table.")
    parser.add_argument('dataset_name', type=str, help='Relative path to data_table (i.e., dataset name) in HDF5 file')
    parser.add_argument('calibration_file', type=str, help="Path to the input YAML file containing calibration factors.")
    #parser.add_argument('output_file', type=str, help="Path to save the output calibrated data as a CSV file.")

    args = parser.parse_args()
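
    # Illustrative invocation (hypothetical script name and paths, shown only to document the
    # expected positional arguments; actual names depend on the repository layout):
    #   python create_flags.py data/collection.h5 ACSM_TOFWARE/2024/data_table pipelines/params/validity_thresholds.yaml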

    # Load input data and calibration factors
    try:
        #data_table = pd.read_json(args.data_file)

        print(args.data_file)

        dataManager = dataOps.HDF5DataOpsManager(args.data_file)
        dataManager.load_file_obj()
        dataset_name = '/' + args.dataset_name
        data_table = dataManager.extract_dataset_as_dataframe('/' + args.dataset_name)

        dataManager.extract_and_load_dataset_metadata()
        dataset_metadata_df = dataManager.dataset_metadata_df.copy()
        print(dataset_metadata_df.head())

        dataset_name_idx = dataset_metadata_df.index[(dataset_metadata_df['dataset_name'] == args.dataset_name).to_numpy()]
        data_table_metadata = dataset_metadata_df.loc[dataset_name_idx, :]
        parent_instrument = data_table_metadata.loc[dataset_name_idx, 'parent_instrument'].values[0]
        parent_file = data_table_metadata.loc[dataset_name_idx, 'parent_file'].values[0]

        dataManager.unload_file_obj()
        print(args.calibration_file)

        with open(args.calibration_file, 'r') as stream:
            calibration_factors = yaml.load(stream, Loader=yaml.FullLoader)
    except Exception as e:
        print(f"Error loading input files: {e}")
        exit(1)
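
    # For reference, a YAML file compatible with the access pattern used in
    # create_flags_for_diagnostic_vars would look roughly like the sketch below
    # (variable name and numbers are assumptions based on the docstring example):
    #
    #   validity_thresholds:
    #     variables:
    #       ABsamp:
    #         lower_lim: 20000
    #         upper_lim: 500000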

    path_to_output_dir, ext = os.path.splitext(args.data_file)

    print('Path to output directory :', path_to_output_dir)


    # Perform calibration
    try:

        processingScriptRelPath = os.path.relpath(thisFilePath, start=projectPath)

        print(processingScriptRelPath)

        metadata = {'actris_level': 1, 'processing_script': processingScriptRelPath.replace(os.sep, '/')}

        path_to_output_file, ext = os.path.splitext('/'.join([path_to_output_dir, parent_instrument, parent_file]))
        path_to_calibrated_file = ''.join([path_to_output_file, '_flags.csv'])

        path_tail, path_head = os.path.split(path_to_calibrated_file)
        path_to_metadata_file = '/'.join([path_tail, 'data_lineage_metadata.json'])

        print('Path to output file :', path_to_calibrated_file)
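
        # Worked example of the path construction above (hypothetical names): with
        # data_file='data/collection.h5', parent_instrument='ACSM_TOFWARE' and
        # parent_file='diagnostics.txt', the flags would be written to
        # 'data/collection/ACSM_TOFWARE/diagnostics_flags.csv' and the lineage record to
        # 'data/collection/ACSM_TOFWARE/data_lineage_metadata.json'.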
        import dima.utils.g5505_utils as utils
        import json
        print(calibration_factors.keys())
        calibrated_table = create_flags_for_diagnostic_vars(data_table, calibration_factors)
        metadata['processing_date'] = utils.created_at()
        calibrated_table.to_csv(path_to_calibrated_file, index=False)

        # Ensure the file exists
        if not os.path.exists(path_to_metadata_file):
            with open(path_to_metadata_file, 'w') as f:
                json.dump({}, f)  # Initialize empty JSON

        # Read the existing JSON
        with open(path_to_metadata_file, 'r') as metadata_file:
            try:
                json_dict = json.load(metadata_file)
            except json.JSONDecodeError:
                json_dict = {}  # Start fresh if file is invalid

        # Update the JSON object
        outputfileRelPath = os.path.relpath(path_to_calibrated_file, start=projectPath).replace(os.sep, '/')
        json_dict[outputfileRelPath] = metadata

        # Write updated JSON back to the file
        with open(path_to_metadata_file, 'w') as metadata_file:
            json.dump(json_dict, metadata_file, indent=4)
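
        # The lineage file then holds one entry per output file, roughly of this shape
        # (illustrative placeholders; actual path and values depend on the run):
        #   {
        #       "data/collection/ACSM_TOFWARE/diagnostics_flags.csv": {
        #           "actris_level": 1,
        #           "processing_script": "<relative path to this script>",
        #           "processing_date": "<timestamp from utils.created_at()>"
        #       }
        #   }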

        print(f"Calibrated data saved to {path_to_calibrated_file}")
        print(f"Metadata for calibrated data saved to {path_to_metadata_file}")
    except Exception as e:
        print(f"Error during calibration: {e}")
        exit(1)