Update pipelines/steps/utils.py. Changes uncertainty_estimate

2025-05-28 08:48:38 +02:00
parent ddfa1273af
commit 196e237838


@@ -1,233 +1,233 @@
import os
import json, yaml
import math
import numpy as np
import pandas as pd
def record_data_lineage(path_to_output_file, projectPath, metadata):

    path_to_output_dir, output_file = os.path.split(path_to_output_file)
    path_to_metadata_file = '/'.join([path_to_output_dir, 'data_lineage_metadata.json'])

    # Ensure the metadata file exists
    if not os.path.exists(path_to_metadata_file):
        with open(path_to_metadata_file, 'w') as f:
            json.dump({}, f)  # Initialize empty JSON

    # Read the existing JSON
    with open(path_to_metadata_file, 'r') as metadata_file:
        try:
            json_dict = json.load(metadata_file)
        except json.JSONDecodeError:
            json_dict = {}  # Start fresh if file is invalid

    # Record this output file's metadata under its file name
    json_dict[output_file] = metadata

    # Write updated JSON back to the file
    with open(path_to_metadata_file, 'w') as metadata_file:
        json.dump(json_dict, metadata_file, indent=4)

    print(f"Metadata for calibrated data saved to {path_to_metadata_file}")

    return 0
def get_metadata(path_to_file):

    path, filename = os.path.split(path_to_file)
    path_to_metadata = None
    for item in os.listdir(path):
        if 'metadata.json' in item:
            path_to_metadata = os.path.normpath(os.path.join(path, item))

    metadata = {}
    # Only read the metadata file if a sibling '*metadata.json' was actually found
    if path_to_metadata:
        with open(path_to_metadata, 'r') as stream:
            metadata = json.load(stream)
        metadata = metadata.get(filename, {})

    return metadata
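# Illustrative note (not part of the original file): get_metadata is the read-side
# counterpart of record_data_lineage above; it scans the data file's directory for a
# sibling whose name contains 'metadata.json' and returns the entry stored under the
# data file's own name, or {} if no such file or entry exists.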
def generate_missing_value_code(max_val, num_decimals):
    """
    Generate the largest all-9s missing value code that can be represented exactly by float.
    Caps total digits to 16 to avoid rounding.

    Args:
        max_val (float): Largest expected valid value in the column.
        num_decimals (int): Number of decimal places to preserve.

    Returns:
        float: The missing value code.
    """
    MAX_SIGNIFICANT_DIGITS = 16

    # Calculate order of magnitude (roughly digits before decimal)
    order = int(np.floor(np.log10(max_val))) + 2 if max_val > 0 else 2

    # Cap total digits at 16 to avoid float rounding
    total_digits = order + num_decimals
    if total_digits > MAX_SIGNIFICANT_DIGITS:
        # Reduce integer digits first to keep decimals if possible
        int_digits = max(MAX_SIGNIFICANT_DIGITS - num_decimals, 1)
        dec_digits = min(num_decimals, MAX_SIGNIFICANT_DIGITS - int_digits)
    else:
        int_digits = order
        dec_digits = num_decimals

    # Construct the missing code string
    if dec_digits > 0:
        int_part = '9' * int_digits
        dec_part = '9' * dec_digits
        missing_code_str = f"{int_part}.{dec_part}"
    else:
        missing_code_str = '9' * int_digits

    missing_code = float(missing_code_str)
    return missing_code
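# Illustrative note (not part of the original file): for max_val = 1234.5 and
# num_decimals = 4, order works out to 5, so
#     >>> generate_missing_value_code(1234.5, 4)
#     99999.9999
# i.e. a code that sorts above every valid value while staying within the
# 16 significant digit cap.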
def compute_uncertainty_estimate(x, x_err):
    """
-    Computes uncertainty estimate: sqrt((0.5 * x_err)^2 + (0.5 * x)^2)
+    Computes uncertainty estimate: sqrt((x_err)^2 + (0.5 * x)^2)
    for scalar inputs. Prints errors if inputs are invalid.
    """
    try:
        x = float(x)
        x_err = float(x_err)
        if math.isnan(x) or math.isnan(x_err):
            print(f"Warning: One or both inputs are NaN -> x: {x}, x_err: {x_err}")
            return np.nan
-        return math.sqrt((0.5 * x_err)**2 + (0.5 * x)**2)
+        return math.sqrt((x_err)**2 + (0.5 * x)**2)
    except (ValueError, TypeError) as e:
        print(f"Error computing uncertainty for x: {x}, x_err: {x_err} -> {e}")
        return np.nan
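# Worked example of the revised estimate (illustrative note, not part of the file):
# for x = 3.0 and x_err = 2.0 the new formula gives
#     sqrt(2.0**2 + (0.5 * 3.0)**2) = sqrt(6.25) = 2.5,
# whereas the previous weighting, sqrt((0.5 * 2.0)**2 + (0.5 * 3.0)**2), gave about 1.80;
# the reported measurement error now enters at full weight.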
def generate_error_dataframe(df: pd.DataFrame, datetime_var):
    """
    Generates an error DataFrame by filling numeric 'correct' columns with a missing value code.

    Parameters
    ----------
    df : pd.DataFrame
        Input DataFrame containing numerical columns.
    datetime_var : str
        Name of the datetime column to retain.

    Returns
    -------
    pd.DataFrame
        DataFrame with error values based on missing value codes.
    """
    df_numeric = df.select_dtypes(include=np.number)

    err_df_columns = []
    err_df_values = []

    # Select numeric columns whose names contain 'correct'
    correct_cols = [col for col in df_numeric.columns if 'correct' in col]

    for col in correct_cols:
        missing_value_code = generate_missing_value_code(df[col].max(skipna=True), 4)
        err_df_values.append(missing_value_code)
        err_df_columns.append(f"{col}_err")

    # Broadcast the per-column missing value codes across all rows
    err_matrix = np.tile(np.array(err_df_values), (len(df), 1))
    df_err = pd.DataFrame(data=err_matrix, columns=err_df_columns)

    # Ensure datetime_var exists in df before assignment
    if datetime_var in df.columns:
        df_err[datetime_var] = df[datetime_var].values
    else:
        raise ValueError(f"Column '{datetime_var}' not found in DataFrame")

    return df_err
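# Illustrative note (not part of the original file): for a hypothetical frame with
# columns ['time', 'NH4_correct'] where NH4_correct peaks at 12.3, the loop above
# produces a single error column 'NH4_correct_err' filled with the missing value
# code 999.9999 in every row, alongside a copy of the 'time' column.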
def metadata_dict_to_dataframe(metadata: dict, shape: tuple):
    """
    Converts a metadata dictionary into a repeated data table.

    Parameters
    ----------
    metadata : dict
        Dictionary containing metadata where keys are column names and values are repeated across rows.
    shape : tuple
        Shape of the output DataFrame (rows, columns). The number of columns must match the length of `metadata`.

    Returns
    -------
    pd.DataFrame
        DataFrame with metadata values repeated according to the specified shape.
    """
    # Ensure shape is valid (rows, columns)
    rows, cols = shape
    if cols != len(metadata):
        raise ValueError(f"Shape mismatch: {cols} columns expected, but metadata has {len(metadata)} keys.")

    # Extract metadata values and reshape them properly
    values = np.array(list(metadata.values())).reshape((1, cols))

    # Tile the values to match the desired shape
    data_table = np.tile(values, (rows, 1))

    # Create DataFrame with correct column names
    df = pd.DataFrame(data=data_table, columns=list(metadata.keys()))

    return df
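# Illustrative note (not part of the original file): metadata_dict_to_dataframe(
#     {'station': 'JFJ', 'inlet': 'PM1'}, shape=(3, 2))
# yields a 3-row DataFrame whose 'station' and 'inlet' columns repeat those two
# (hypothetical) values on every row; a mismatched column count raises ValueError.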
def resolve_project_path():
    try:
        thisFilePath = os.path.abspath(__file__)
    except NameError:
        thisFilePath = os.getcwd()
    return os.path.normpath(os.path.join(thisFilePath, "..", "..", ".."))
def load_project_yaml_files(projectPath : str, filename : str):

    allowed_filenames = ['acsm_to_ebas.yaml', 'calibration_params.yaml', 'calibration_factors.yaml',
                         'limits_of_detection.yaml', 'station_params.yaml', 'validity_thresholds.yaml',
                         'campaignDescriptor.yaml']

    if filename not in allowed_filenames:
        raise ValueError(f'Invalid filename : {filename}. The filename should be selected from the following list {allowed_filenames}.')

    filename_to_relpath = {"acsm_to_ebas.yaml": "pipelines/dictionaries/acsm_to_ebas.yaml",
                           "calibration_params.yaml": "pipelines/params/calibration_params.yaml",
                           "calibration_factors.yaml": "pipelines/params/calibration_factors.yaml",
                           "limits_of_detection.yaml": "pipelines/params/limits_of_detection.yaml",
                           "station_params.yaml": "pipelines/params/station_params.yaml",
                           "validity_thresholds.yaml": "pipelines/params/validity_thresholds.yaml",
                           "campaignDescriptor.yaml": "campaignDescriptor.yaml"}

    # Implicit input
    output_dict = {}
    if filename_to_relpath.get(filename, None):
        dict_file = os.path.normpath(os.path.join(projectPath, filename_to_relpath[filename]))
        try:
            with open(dict_file, 'r') as stream:
                output_dict = yaml.load(stream, Loader=yaml.FullLoader)
        except Exception as e:
            print(f'Error loading {dict_file}: {e}')
            return {}

    return output_dict
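
A minimal usage sketch of the helpers touched here, assuming the module is importable as pipelines.steps.utils from a project checkout; the parameter file name and the numeric inputs are illustrative only.

    from pipelines.steps.utils import resolve_project_path, load_project_yaml_files, compute_uncertainty_estimate

    projectPath = resolve_project_path()

    # Load one of the allowed parameter files; an unreadable file yields {} and a printed error.
    calibration_params = load_project_yaml_files(projectPath, 'calibration_params.yaml')

    # Revised estimate: the reported error enters at full weight, the value at half weight.
    sigma = compute_uncertainty_estimate(3.0, 2.0)  # sqrt(2.0**2 + (0.5 * 3.0)**2) == 2.5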