# Python payload of a git patch against pipelines/steps/utils.py, reformatted.
#   Commit : 78340464aa18369358ca43b5b3bca0a52118a202
#   Author : Florez Ospina Juan Felipe
#   Date   : Fri, 7 Mar 2025 16:45:33 +0100
#   Subject: Add functions: generate_error_dataframe() with missing values,
#            metadata_dict_to_dataframe(), and load_project_yaml_files() to
#            easily access data from yaml files in the project.
#
# NOTE(review): record_data_lineage() and get_metadata() appear in the patch
# only as unchanged context lines (their bodies are not part of it), so they
# are not reproduced here.

import json
import os

import numpy as np
import pandas as pd

# Single source of truth for the YAML files this module may load, mapped to
# their location relative to the project root.
_PROJECT_YAML_RELPATHS = {
    "acsm_to_ebas.yaml": "pipelines/dictionaries/acsm_to_ebas.yaml",
    "calibration_params.yaml": "pipelines/params/calibration_params.yaml",
    "limits_of_detection.yaml": "pipelines/params/limits_of_detection.yaml",
    "station_params.yaml": "pipelines/params/station_params.yaml",
    "validity_thresholds.yaml": "pipelines/params/validity_thresholds.yaml",
}


def generate_missing_value_code(max_val, num_decimals):
    """
    Generate a missing value code consisting of all 9s.

    Parameters
    ----------
    max_val : number
        Largest expected valid value in the column. Values that are not
        greater than zero (including NaN) fall back to an order of 2.
    num_decimals : int
        Number of decimal places to preserve. When positive, a float code is
        returned; otherwise an integer code is returned.

    Returns
    -------
    float or int
        For ``num_decimals > 0``: a float whose integer part has
        ``order + num_decimals`` nines and whose fractional part has
        ``num_decimals`` nines, where ``order = floor(log10(max_val)) + 2``.
        Otherwise: an int made of ``order`` nines (at least one).
    """
    if max_val > 0:
        order = int(np.floor(np.log10(max_val))) + 2
    else:
        order = 2

    if num_decimals > 0:
        # NOTE(review): the integer part is widened by `num_decimals` digits,
        # unlike the integer branch below. Kept as-is because callers may
        # depend on the emitted codes - confirm this is intended. Very long
        # codes may also exceed float precision and get rounded.
        integer_part = '9' * (order + num_decimals)
        fractional_part = '9' * num_decimals
        return float(f"{integer_part}.{fractional_part}")

    # For 0 < max_val < 0.1 the order is <= 0; int('') would raise, so always
    # emit at least one digit.
    return int('9' * max(order, 1))


def _mentions_correct(column_label) -> bool:
    """Return True when 'correct' is contained in the column label."""
    try:
        return 'correct' in column_label
    except TypeError:
        # Labels that do not support `in` (e.g. integers) cannot match.
        return False


def generate_error_dataframe(df: pd.DataFrame, datetime_var, num_decimals: int = 4):
    """
    Generate an error DataFrame by filling numeric 'correct' columns with a missing value code.

    Parameters
    ----------
    df : pd.DataFrame
        Input DataFrame containing numerical columns.
    datetime_var : str
        Name of the datetime column to retain.
    num_decimals : int, optional
        Number of decimals passed to `generate_missing_value_code` for every
        column. Defaults to 4.

    Returns
    -------
    pd.DataFrame
        One ``<col>_err`` column per numeric column whose name contains
        'correct', each filled with that column's missing value code, followed
        by the `datetime_var` column copied from `df`. The result has one row
        per row of `df` and a fresh default index.

    Raises
    ------
    ValueError
        If `datetime_var` is not a column of `df`.
    """
    # Validate up front so no work is done on an unusable input.
    if datetime_var not in df.columns:
        raise ValueError(f"Column '{datetime_var}' not found in DataFrame")

    numeric_columns = df.select_dtypes(include=np.number).columns
    correct_cols = [col for col in numeric_columns if _mentions_correct(col)]

    missing_codes = [
        generate_missing_value_code(df[col].max(skipna=True), num_decimals)
        for col in correct_cols
    ]

    # Repeat the row of codes once per input row.
    err_matrix = np.tile(np.array(missing_codes), (len(df), 1))
    df_err = pd.DataFrame(
        data=err_matrix,
        columns=[f"{col}_err" for col in correct_cols],
    )

    # Positional copy: `.values` drops the original index on purpose so the
    # datetimes line up with the fresh default index of `df_err`.
    df_err[datetime_var] = df[datetime_var].values

    return df_err


def metadata_dict_to_dataframe(metadata: dict, shape: tuple):
    """
    Convert a metadata dictionary into a repeated data table.

    Parameters
    ----------
    metadata : dict
        Dictionary containing metadata where keys are column names and values are repeated across rows.
    shape : tuple
        Shape of the output DataFrame (rows, columns). The number of columns must match the length of `metadata`.

    Returns
    -------
    pd.DataFrame
        DataFrame with one column per metadata key (in dictionary order) and
        each value repeated `rows` times. Every column keeps the type of its
        own value; values are not coerced to a common type.

    Raises
    ------
    ValueError
        If the number of columns in `shape` differs from ``len(metadata)`` or
        the number of rows is negative.
    """
    rows, cols = shape

    if cols != len(metadata):
        raise ValueError(f"Shape mismatch: {cols} columns expected, but metadata has {len(metadata)} keys.")
    if rows < 0:
        raise ValueError(f"Number of rows must be non-negative, got {rows}.")

    # Build column by column: routing the values through one NumPy array would
    # coerce them to a single dtype (e.g. numbers become strings next to text).
    repeated = {key: [value] * rows for key, value in metadata.items()}

    # The explicit index keeps the requested row count even without columns.
    return pd.DataFrame(repeated, index=pd.RangeIndex(rows))


def load_project_yaml_files(projectPath: str, filename: str):
    """
    Load one of the project's known YAML files.

    Parameters
    ----------
    projectPath : str
        Path to the project root; the file's relative path is joined onto it.
    filename : str
        Name of the YAML file. Must be one of 'acsm_to_ebas.yaml',
        'calibration_params.yaml', 'limits_of_detection.yaml',
        'station_params.yaml' or 'validity_thresholds.yaml'.

    Returns
    -------
    dict
        Parsed content of the YAML file. An empty dict is returned when the
        file is empty, or when it cannot be read or parsed (in which case the
        error is printed).

    Raises
    ------
    ValueError
        If `filename` is not one of the allowed file names.
    """
    relpath = _PROJECT_YAML_RELPATHS.get(filename)
    if relpath is None:
        allowed_filenames = list(_PROJECT_YAML_RELPATHS)
        raise ValueError(f'Invalid filename : {filename}. The filename should be selected from the following list {allowed_filenames}.')

    # Imported here because only this function needs PyYAML; the rest of the
    # module stays importable without it.
    import yaml

    dict_file = os.path.normpath(os.path.join(projectPath, relpath))

    try:
        with open(dict_file, 'r', encoding='utf-8') as stream:
            # NOTE(review): FullLoader is kept for compatibility with the
            # existing files; prefer yaml.safe_load if none of them rely on
            # non-standard tags.
            loaded = yaml.load(stream, Loader=yaml.FullLoader)
    except (OSError, ValueError, yaml.YAMLError) as e:
        # Best effort by design: report the problem and return an empty dict.
        print(f'Error loading {dict_file}: {e}')
        return {}

    # An empty YAML document parses to None; keep the return type consistent.
    return {} if loaded is None else loaded