From 196e2378387df568aebbc67c7e0c8e05925dd5c3 Mon Sep 17 00:00:00 2001 From: simon_l Date: Wed, 28 May 2025 08:48:38 +0200 Subject: [PATCH 1/2] Update pipelines/steps/utils.py. Changes uncertainty_estimate --- pipelines/steps/utils.py | 464 +++++++++++++++++++-------------------- 1 file changed, 232 insertions(+), 232 deletions(-) diff --git a/pipelines/steps/utils.py b/pipelines/steps/utils.py index 95c8f78..57038d9 100644 --- a/pipelines/steps/utils.py +++ b/pipelines/steps/utils.py @@ -1,233 +1,233 @@ -import os -import json, yaml -import numpy as np -import pandas as pd - -def record_data_lineage(path_to_output_file, projectPath, metadata): - - path_to_output_dir, output_file = os.path.split(path_to_output_file) - - path_to_metadata_file = '/'.join([path_to_output_dir,'data_lineage_metadata.json']) - # Ensure the file exists - if not os.path.exists(path_to_metadata_file): - with open(path_to_metadata_file, 'w') as f: - json.dump({}, f) # Initialize empty JSON - - # Read the existing JSON - with open(path_to_metadata_file, 'r') as metadata_file: - try: - json_dict = json.load(metadata_file) - except json.JSONDecodeError: - json_dict = {} # Start fresh if file is invalid - - # Compute relative output file path and update the JSON object - #grelpath_to_output_file = os.path.relpath(path_to_output_file, start=projectPath).replace(os.sep, '/') - json_dict[output_file] = metadata - - # Write updated JSON back to the file - with open(path_to_metadata_file, 'w') as metadata_file: - json.dump(json_dict, metadata_file, indent=4) - - - print(f"Metadata for calibrated data saved to {path_to_metadata_file}") - - return 0 - -def get_metadata(path_to_file): - - path, filename = os.path.split(path_to_file) - - path_to_metadata = None - for item in os.listdir(path): - if 'metadata.json' in item: - path_to_metadata = os.path.normpath(os.path.join(path,item)) - metadata = {} - if path_to_file: - with open(path_to_metadata,'r') as stream: - metadata = json.load(stream) - - metadata = metadata.get(filename,{}) - - return metadata - -import numpy as np - -import numpy as np - -def generate_missing_value_code(max_val, num_decimals): - """ - Generate the largest all-9s missing value that can be represented exactly by float. - Caps total digits to 16 to avoid rounding. - - Args: - max_val (float): Largest expected valid value in the column. - num_decimals (int): Number of decimal places to preserve. - - Returns: - float: The missing value code. - """ - MAX_SIGNIFICANT_DIGITS = 16 - - # Calculate order of magnitude (roughly digits before decimal) - order = int(np.floor(np.log10(max_val))) + 2 if max_val > 0 else 2 - - # Cap total digits at 16 to avoid float rounding - total_digits = order + num_decimals - if total_digits > MAX_SIGNIFICANT_DIGITS: - # Reduce integer digits first to keep decimals if possible - int_digits = max(MAX_SIGNIFICANT_DIGITS - num_decimals, 1) - dec_digits = min(num_decimals, MAX_SIGNIFICANT_DIGITS - int_digits) - else: - int_digits = order - dec_digits = num_decimals - - # Construct the missing code string - if dec_digits > 0: - int_part = '9' * int_digits - dec_part = '9' * dec_digits - missing_code_str = f"{int_part}.{dec_part}" - else: - missing_code_str = '9' * int_digits - - missing_code = float(missing_code_str) - - return missing_code - -import math -import numpy as np - -def compute_uncertainty_estimate(x, x_err): - """ - Computes uncertainty estimate: sqrt((0.5 * x_err)^2 + (0.5 * x)^2) - for scalar inputs. Prints errors if inputs are invalid. 
- """ - try: - x = float(x) - x_err = float(x_err) - - if math.isnan(x) or math.isnan(x_err): - print(f"Warning: One or both inputs are NaN -> x: {x}, x_err: {x_err}") - return np.nan - - return math.sqrt((0.5 * x_err)**2 + (0.5 * x)**2) - - except (ValueError, TypeError) as e: - print(f"Error computing uncertainty for x: {x}, x_err: {x_err} -> {e}") - return np.nan - - - -def generate_error_dataframe(df: pd.DataFrame, datetime_var): - """ - Generates an error DataFrame by filling numeric 'correct' columns with a missing value code. - - Parameters - ---------- - df : pd.DataFrame - Input DataFrame containing numerical columns. - datetime_var : str - Name of the datetime column to retain. - - Returns - ------- - pd.DataFrame - DataFrame with error values based on missing value codes. - """ - df_numeric = df.select_dtypes(include=np.number) - - err_df_columns = [] - err_df_values = [] - - # Correct way to filter columns containing 'correct' - correct_cols = [col for col in df_numeric.columns if 'correct' in col] - - for col in correct_cols: - missing_value_code = generate_missing_value_code(df[col].max(skipna=True), 4) - err_df_values.append(missing_value_code) - err_df_columns.append(f"{col}_err") - - # Fix np.matmul usage and reshape err_df_values correctly - err_matrix = np.tile(np.array(err_df_values), (len(df),1)) # np.ones((len(df), len(err_df_values))) * np.array(err_df_values) - - df_err = pd.DataFrame(data=err_matrix, columns=err_df_columns) - - # Ensure datetime_var exists in df before assignment - if datetime_var in df.columns: - df_err[datetime_var] = df[datetime_var].values - else: - raise ValueError(f"Column '{datetime_var}' not found in DataFrame") - - return df_err - -import numpy as np -import pandas as pd - -def metadata_dict_to_dataframe(metadata: dict, shape: tuple): - """ - Converts a metadata dictionary into a repeated data table. - - Parameters - ---------- - metadata : dict - Dictionary containing metadata where keys are column names and values are repeated across rows. - shape : tuple - Shape of the output DataFrame (rows, columns). The number of columns must match the length of `metadata`. - - Returns - ------- - pd.DataFrame - DataFrame with metadata values repeated according to the specified shape. - """ - # Ensure shape is valid (rows, columns) - rows, cols = shape - - if cols != len(metadata): - raise ValueError(f"Shape mismatch: {cols} columns expected, but metadata has {len(metadata)} keys.") - - # Extract metadata values and reshape them properly - values = np.array(list(metadata.values())).reshape((1,cols)) - - # Tile the values to match the desired shape - data_table = np.tile(values, (rows, 1)) - - # Create DataFrame with correct column names - df = pd.DataFrame(data=data_table, columns=list(metadata.keys())) - - return df - -def resolve_project_path(): - try: - thisFilePath = os.path.abspath(__file__) - except NameError: - thisFilePath = os.getcwd() - return os.path.normpath(os.path.join(thisFilePath, "..", "..", "..")) - -def load_project_yaml_files(projectPath : str, filename : str): - - allowed_filenames = ['acsm_to_ebas.yaml', 'calibration_params.yaml', 'calibration_factors.yaml', 'limits_of_detection.yaml', 'station_params.yaml', 'validity_thresholds.yaml', 'campaignDescriptor.yaml'] - - if not filename in allowed_filenames: - raise ValueError(f'Invalid filename : {filename}. 
The filename should be selected from the following list {allowed_filenames}.') - - filename_to_relpath = {"acsm_to_ebas.yaml":"pipelines/dictionaries/acsm_to_ebas.yaml", - "calibration_params.yaml":"pipelines/params/calibration_params.yaml", - "calibration_factors.yaml" : "pipelines/params/calibration_factors.yaml", - "limits_of_detection.yaml":"pipelines/params/limits_of_detection.yaml", - "station_params.yaml":"pipelines/params/station_params.yaml", - "validity_thresholds.yaml":"pipelines/params/validity_thresholds.yaml", - "campaignDescriptor.yaml":"campaignDescriptor.yaml"} - - # Implicit input - if filename_to_relpath.get(filename,None): - dict_file = os.path.normpath(os.path.join(projectPath,filename_to_relpath[filename])) - - output_dict = {} - try: - with open(dict_file, 'r') as stream: - output_dict = yaml.load(stream, Loader=yaml.FullLoader) - except Exception as e: - - print(f'Error loading {dict_file}: {e}') - return {} - +import os +import json, yaml +import numpy as np +import pandas as pd + +def record_data_lineage(path_to_output_file, projectPath, metadata): + + path_to_output_dir, output_file = os.path.split(path_to_output_file) + + path_to_metadata_file = '/'.join([path_to_output_dir,'data_lineage_metadata.json']) + # Ensure the file exists + if not os.path.exists(path_to_metadata_file): + with open(path_to_metadata_file, 'w') as f: + json.dump({}, f) # Initialize empty JSON + + # Read the existing JSON + with open(path_to_metadata_file, 'r') as metadata_file: + try: + json_dict = json.load(metadata_file) + except json.JSONDecodeError: + json_dict = {} # Start fresh if file is invalid + + # Compute relative output file path and update the JSON object + #grelpath_to_output_file = os.path.relpath(path_to_output_file, start=projectPath).replace(os.sep, '/') + json_dict[output_file] = metadata + + # Write updated JSON back to the file + with open(path_to_metadata_file, 'w') as metadata_file: + json.dump(json_dict, metadata_file, indent=4) + + + print(f"Metadata for calibrated data saved to {path_to_metadata_file}") + + return 0 + +def get_metadata(path_to_file): + + path, filename = os.path.split(path_to_file) + + path_to_metadata = None + for item in os.listdir(path): + if 'metadata.json' in item: + path_to_metadata = os.path.normpath(os.path.join(path,item)) + metadata = {} + if path_to_file: + with open(path_to_metadata,'r') as stream: + metadata = json.load(stream) + + metadata = metadata.get(filename,{}) + + return metadata + +import numpy as np + +import numpy as np + +def generate_missing_value_code(max_val, num_decimals): + """ + Generate the largest all-9s missing value that can be represented exactly by float. + Caps total digits to 16 to avoid rounding. + + Args: + max_val (float): Largest expected valid value in the column. + num_decimals (int): Number of decimal places to preserve. + + Returns: + float: The missing value code. 
+ """ + MAX_SIGNIFICANT_DIGITS = 16 + + # Calculate order of magnitude (roughly digits before decimal) + order = int(np.floor(np.log10(max_val))) + 2 if max_val > 0 else 2 + + # Cap total digits at 16 to avoid float rounding + total_digits = order + num_decimals + if total_digits > MAX_SIGNIFICANT_DIGITS: + # Reduce integer digits first to keep decimals if possible + int_digits = max(MAX_SIGNIFICANT_DIGITS - num_decimals, 1) + dec_digits = min(num_decimals, MAX_SIGNIFICANT_DIGITS - int_digits) + else: + int_digits = order + dec_digits = num_decimals + + # Construct the missing code string + if dec_digits > 0: + int_part = '9' * int_digits + dec_part = '9' * dec_digits + missing_code_str = f"{int_part}.{dec_part}" + else: + missing_code_str = '9' * int_digits + + missing_code = float(missing_code_str) + + return missing_code + +import math +import numpy as np + +def compute_uncertainty_estimate(x, x_err): + """ + Computes uncertainty estimate: sqrt((x_err)^2 + (0.5 * x)^2) + for scalar inputs. Prints errors if inputs are invalid. + """ + try: + x = float(x) + x_err = float(x_err) + + if math.isnan(x) or math.isnan(x_err): + print(f"Warning: One or both inputs are NaN -> x: {x}, x_err: {x_err}") + return np.nan + + return math.sqrt((x_err)**2 + (0.5 * x)**2) + + except (ValueError, TypeError) as e: + print(f"Error computing uncertainty for x: {x}, x_err: {x_err} -> {e}") + return np.nan + + + +def generate_error_dataframe(df: pd.DataFrame, datetime_var): + """ + Generates an error DataFrame by filling numeric 'correct' columns with a missing value code. + + Parameters + ---------- + df : pd.DataFrame + Input DataFrame containing numerical columns. + datetime_var : str + Name of the datetime column to retain. + + Returns + ------- + pd.DataFrame + DataFrame with error values based on missing value codes. + """ + df_numeric = df.select_dtypes(include=np.number) + + err_df_columns = [] + err_df_values = [] + + # Correct way to filter columns containing 'correct' + correct_cols = [col for col in df_numeric.columns if 'correct' in col] + + for col in correct_cols: + missing_value_code = generate_missing_value_code(df[col].max(skipna=True), 4) + err_df_values.append(missing_value_code) + err_df_columns.append(f"{col}_err") + + # Fix np.matmul usage and reshape err_df_values correctly + err_matrix = np.tile(np.array(err_df_values), (len(df),1)) # np.ones((len(df), len(err_df_values))) * np.array(err_df_values) + + df_err = pd.DataFrame(data=err_matrix, columns=err_df_columns) + + # Ensure datetime_var exists in df before assignment + if datetime_var in df.columns: + df_err[datetime_var] = df[datetime_var].values + else: + raise ValueError(f"Column '{datetime_var}' not found in DataFrame") + + return df_err + +import numpy as np +import pandas as pd + +def metadata_dict_to_dataframe(metadata: dict, shape: tuple): + """ + Converts a metadata dictionary into a repeated data table. + + Parameters + ---------- + metadata : dict + Dictionary containing metadata where keys are column names and values are repeated across rows. + shape : tuple + Shape of the output DataFrame (rows, columns). The number of columns must match the length of `metadata`. + + Returns + ------- + pd.DataFrame + DataFrame with metadata values repeated according to the specified shape. 
+ """ + # Ensure shape is valid (rows, columns) + rows, cols = shape + + if cols != len(metadata): + raise ValueError(f"Shape mismatch: {cols} columns expected, but metadata has {len(metadata)} keys.") + + # Extract metadata values and reshape them properly + values = np.array(list(metadata.values())).reshape((1,cols)) + + # Tile the values to match the desired shape + data_table = np.tile(values, (rows, 1)) + + # Create DataFrame with correct column names + df = pd.DataFrame(data=data_table, columns=list(metadata.keys())) + + return df + +def resolve_project_path(): + try: + thisFilePath = os.path.abspath(__file__) + except NameError: + thisFilePath = os.getcwd() + return os.path.normpath(os.path.join(thisFilePath, "..", "..", "..")) + +def load_project_yaml_files(projectPath : str, filename : str): + + allowed_filenames = ['acsm_to_ebas.yaml', 'calibration_params.yaml', 'calibration_factors.yaml', 'limits_of_detection.yaml', 'station_params.yaml', 'validity_thresholds.yaml', 'campaignDescriptor.yaml'] + + if not filename in allowed_filenames: + raise ValueError(f'Invalid filename : {filename}. The filename should be selected from the following list {allowed_filenames}.') + + filename_to_relpath = {"acsm_to_ebas.yaml":"pipelines/dictionaries/acsm_to_ebas.yaml", + "calibration_params.yaml":"pipelines/params/calibration_params.yaml", + "calibration_factors.yaml" : "pipelines/params/calibration_factors.yaml", + "limits_of_detection.yaml":"pipelines/params/limits_of_detection.yaml", + "station_params.yaml":"pipelines/params/station_params.yaml", + "validity_thresholds.yaml":"pipelines/params/validity_thresholds.yaml", + "campaignDescriptor.yaml":"campaignDescriptor.yaml"} + + # Implicit input + if filename_to_relpath.get(filename,None): + dict_file = os.path.normpath(os.path.join(projectPath,filename_to_relpath[filename])) + + output_dict = {} + try: + with open(dict_file, 'r') as stream: + output_dict = yaml.load(stream, Loader=yaml.FullLoader) + except Exception as e: + + print(f'Error loading {dict_file}: {e}') + return {} + return output_dict \ No newline at end of file From 3a23db1a123fd7facc11fa4d7abd6fbc3e89183f Mon Sep 17 00:00:00 2001 From: simon_l Date: Wed, 28 May 2025 11:48:26 +0200 Subject: [PATCH 2/2] Update TODO.md. Added some todos and deleted one that is done now. --- TODO.md | 92 +++++++++++++++++++++++++++++---------------------------- 1 file changed, 47 insertions(+), 45 deletions(-) diff --git a/TODO.md b/TODO.md index ce05f0a..b05aa97 100644 --- a/TODO.md +++ b/TODO.md @@ -1,45 +1,47 @@ -# TODO -* [from Leïla] Correct error when flags are loaded in the flagging app -* [from Leïla] End the file at end of year (or filter only current year) -* [from Leïla] Change "9999.999" to "9999999.9999" in header -* [from Leïla] Update Detection limit values in L2 header: take the ones (1h) from limits_of_detection.yaml -* [from Leïla] For PAY, calculate error as 50% of concentration -* [from Leïla] Correct errors (uncertainties) that they can't be lower than 0.0001 -* [from Leïla] Change flow rate values to 10% of flow rate ref -* [New] Create data flows to validate and replace existing data chain params. data/campaignData/param/ -> pipelines/params and campaignDescriptor.yaml -> acsm data converter. -* [New] DIMA. Add dictionaries to explain variables at different levels. -* [New] DIMA. Modify data integration pipeline to not add current datetime in filename when not specified. -* [New] DIMA. Set data/ folder as default and only possible output folder -* [New] DIMA. 
Review DataManager File Update. Generated CSV files are not transfering properly.
-* [New] DIMA. Ensure code snippets that open and close HDF5 files do so securly and do not leave the file locked
-* [New] DIMA. Revise default csv file reader and enhance it to infer datetime column and format.
-* [New] Learn how to run docker-compose.yaml
-* [New] EBAS_SUBMISSION_STEP. Extend time filter to ranges, create a merge data frame function, and construct file name and output dir dynammically. It is currently hardcoded.
-* [New] Finish command line interface for visualize_datatable_vars and add modes, --flags, --dataset, and save to figures folder in repo
-* Implement flagging-app specific data operations such as:
-  1. [New item] When verify flags from checklist is active, enable delete-flag button to delete flag associated with active cell on table.
-  2. [New item] When verify and ready to trasnfer items on checklist are active, enable record-flags button to record verified flags into the HDF5 file.
-  3. [New item] When all checklist items active, enable apply button to apply flags to the time series data and save it to the HDF5 file.
-  1. ~~Define data manager obj with apply flags behavior.~~
-  2. Define metadata answering who did the flagging and quality assurance tests?
-  3. Update intruments/dictionaries/ACSM_TOFWARE_flags.yaml and instruments/readers/flag_reader.py to describe metadata elements based on dictionary.
-  4. ~~Update DIMA data integration pipeline to allowed user-defined file naming template~~
-  5. ~~Design and implement flag visualization feature: click flag on table and display on figure shaded region when feature is enabled~~
-  6. Implement schema validator on yaml/json representation of hdf5 metadata
-  7. Implement updates to 'actris level' and 'processing_script' after operation applied to data/file?
-
-* ~~When `Create Flag` is clicked, modify the title to indicate that we are in flagging mode and ROIs can be drawn by dragging.~~
-
-* ~~Update `Commit Flag` logic:~~
-  ~~3. Update recorded flags directory, and add provenance information to each flag (which instrument and channel belongs to).~~
-
-* Record collected flag information initially in a YAML or JSON file. Is this faster than writing directly to the HDF5 file?
-
-* Should we actively transfer collected flags by clicking a button? after commit button is pressed, each flag is now stored in an independent json file
-
-* Enable some form of chunk storage and visualization from the HDF5 file. Iterate over chunks for faster display versus access time.
-  1. Do I need to modify DIMA?
-  2. What is a good chunk size?
-  3. What Dash component can we use to iterate over the chunks?
-
-![Screenshot](figures/flagging_app_screenshot.JPG)
+# TODO
+* [from Leïla] Data Flagging App. Correct error when flags are loaded in the app.
+* [from Leïla] Before Step 5. The file should contain only data of the current year.
+* [from Leïla] Ebas converter. Change "9999.999" to "9999999.9999" in header
+* [from Leïla] Ebas converter. Update Detection limit values in L2 header: take the ones (1h) from limits_of_detection.yaml
+* [from Leïla] Ebas converter. Correct errors (uncertainties) so that they can't be lower than 0.0001
+* [from Leïla] Flag using validity threshold. Change flow rate values to 10% of flow rate ref.
+* [from Leïla] In Step 1. Creation of a new collection should be an option and not automatic.
+* [from Leïla] The data chain (except Step 5) should also include Org_mx and Ord_err. To discuss together.
+* [from Leïla] Step 4.1. 
Add a step to verify ACSM data with external instruments (MPSS, eBC). To discuss together.
+* [New] Create data flows to validate and replace existing data chain params. data/campaignData/param/ -> pipelines/params and campaignDescriptor.yaml -> acsm data converter.
+* [New] DIMA. Add dictionaries to explain variables at different levels.
+* [New] DIMA. Modify data integration pipeline to not add current datetime in filename when not specified.
+* [New] DIMA. Set data/ folder as default and only possible output folder
+* [New] DIMA. Review DataManager File Update. Generated CSV files are not transferring properly.
+* [New] DIMA. Ensure code snippets that open and close HDF5 files do so securely and do not leave the file locked
+* [New] DIMA. Revise default csv file reader and enhance it to infer datetime column and format.
+* [New] Learn how to run docker-compose.yaml
+* [New] EBAS_SUBMISSION_STEP. Extend time filter to ranges, create a merge data frame function, and construct file name and output dir dynamically. It is currently hardcoded.
+* [New] Finish command line interface for visualize_datatable_vars and add modes, --flags, --dataset, and save to figures folder in repo
+* Implement flagging-app specific data operations such as:
+  1. [New item] When verify flags from checklist is active, enable delete-flag button to delete flag associated with active cell on table.
+  2. [New item] When verify and ready to transfer items on checklist are active, enable record-flags button to record verified flags into the HDF5 file.
+  3. [New item] When all checklist items are active, enable apply button to apply flags to the time series data and save it to the HDF5 file.
+  1. ~~Define data manager obj with apply flags behavior.~~
+  2. Define metadata answering who did the flagging and quality assurance tests?
+  3. Update instruments/dictionaries/ACSM_TOFWARE_flags.yaml and instruments/readers/flag_reader.py to describe metadata elements based on dictionary.
+  4. ~~Update DIMA data integration pipeline to allow user-defined file naming template~~
+  5. ~~Design and implement flag visualization feature: click flag on table and display on figure shaded region when feature is enabled~~
+  6. Implement schema validator on yaml/json representation of hdf5 metadata
+  7. Implement updates to 'actris level' and 'processing_script' after operation applied to data/file?
+
+* ~~When `Create Flag` is clicked, modify the title to indicate that we are in flagging mode and ROIs can be drawn by dragging.~~
+
+* ~~Update `Commit Flag` logic:~~
+  ~~3. Update recorded flags directory, and add provenance information to each flag (which instrument and channel it belongs to).~~
+
+* Record collected flag information initially in a YAML or JSON file. Is this faster than writing directly to the HDF5 file?
+
+* Should we actively transfer collected flags by clicking a button? After the commit button is pressed, each flag is now stored in an independent JSON file.
+
+* Enable some form of chunk storage and visualization from the HDF5 file. Iterate over chunks for faster display versus access time.
+  1. Do I need to modify DIMA?
+  2. What is a good chunk size?
+  3. What Dash component can we use to iterate over the chunks?
+
+![Screenshot](figures/flagging_app_screenshot.JPG)
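
Note on patch 1: the only functional change in `pipelines/steps/utils.py` is that `compute_uncertainty_estimate` no longer halves `x_err`, i.e. it now returns sqrt(x_err^2 + (0.5 * x)^2) instead of sqrt((0.5 * x_err)^2 + (0.5 * x)^2). Below is a minimal, self-contained sketch (plain Python with NumPy) that mirrors the patched function and compares it against the previous formula; the sample values are illustrative only and not taken from any campaign data.

```python
# Minimal sketch (not part of the patches): re-implements the updated
# compute_uncertainty_estimate from pipelines/steps/utils.py to show the
# effect of dropping the 0.5 factor on x_err. Sample values are illustrative.
import math
import numpy as np

def compute_uncertainty_estimate(x, x_err):
    """Return sqrt(x_err**2 + (0.5 * x)**2); NaN for invalid or NaN inputs."""
    try:
        x = float(x)
        x_err = float(x_err)
        if math.isnan(x) or math.isnan(x_err):
            print(f"Warning: One or both inputs are NaN -> x: {x}, x_err: {x_err}")
            return np.nan
        return math.sqrt(x_err**2 + (0.5 * x)**2)
    except (ValueError, TypeError) as e:
        print(f"Error computing uncertainty for x: {x}, x_err: {x_err} -> {e}")
        return np.nan

if __name__ == "__main__":
    x, x_err = 2.0, 0.4                                   # hypothetical concentration and error
    old = math.sqrt((0.5 * x_err) ** 2 + (0.5 * x) ** 2)  # previous formula
    new = compute_uncertainty_estimate(x, x_err)          # patched formula
    print(f"old: {old:.4f}, new: {new:.4f}")              # old: 1.0198, new: 1.0770
```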