Merge pull request 'WIP.new_changes_leila' (#1) from new_changes_leila into main

Reviewed-on: APOG/acsmnode#1
2025-06-04 09:35:31 +02:00
2 changed files with 279 additions and 277 deletions

TODO.md (92 changed lines)

@@ -1,45 +1,47 @@
# TODO

- * [from Leïla] Correct error when flags are loaded in the flagging app
- * [from Leïla] End the file at end of year (or filter only current year)
- * [from Leïla] Change "9999.999" to "9999999.9999" in header
- * [from Leïla] Update Detection limit values in L2 header: take the ones (1h) from limits_of_detection.yaml
- * [from Leïla] For PAY, calculate error as 50% of concentration
- * [from Leïla] Correct errors (uncertainties) that they can't be lower than 0.0001
- * [from Leïla] Change flow rate values to 10% of flow rate ref
+ * [from Leïla] Data Flagging App. Correct error when flags are loaded in the app.
+ * [from Leïla] Before Step 5. The file should contain only data of the current year.
+ * [from Leïla] Ebas converter. Change "9999.999" to "9999999.9999" in the header.
+ * [from Leïla] Ebas converter. Update detection limit values in the L2 header: take the 1h values from limits_of_detection.yaml.
+ * [from Leïla] Ebas converter. Correct errors (uncertainties) so that they cannot be lower than 0.0001.
+ * [from Leïla] Flag using validity threshold. Change flow rate values to 10% of flow rate ref.
+ * [from Leïla] In Step 1. Creation of a new collection should be an option and not automatic.
+ * [from Leïla] The data chain (except Step 5) should also include Org_mx and Ord_err. To discuss together.
+ * [from Leïla] Step 4.1. Add a step to verify ACSM data with external instruments (MPSS, eBC). To discuss together.
* [New] Create data flows to validate and replace existing data chain params. data/campaignData/param/ -> pipelines/params and campaignDescriptor.yaml -> acsm data converter.
* [New] DIMA. Add dictionaries to explain variables at different levels.
* [New] DIMA. Modify the data integration pipeline to not add the current datetime to the filename when not specified.
* [New] DIMA. Set the data/ folder as the default and only possible output folder.
* [New] DIMA. Review DataManager file update. Generated CSV files are not transferring properly.
* [New] DIMA. Ensure code snippets that open and close HDF5 files do so securely and do not leave the file locked.
* [New] DIMA. Revise the default CSV file reader and enhance it to infer the datetime column and format.
* [New] Learn how to run docker-compose.yaml.
* [New] EBAS_SUBMISSION_STEP. Extend the time filter to ranges, create a merge-dataframe function, and construct the file name and output dir dynamically. They are currently hardcoded.
* [New] Finish the command line interface for visualize_datatable_vars: add modes, --flags, --dataset, and saving to the figures folder in the repo.
* Implement flagging-app-specific data operations such as:
  1. [New item] When "verify flags" on the checklist is active, enable the delete-flag button to delete the flag associated with the active cell on the table.
  2. [New item] When the "verify" and "ready to transfer" checklist items are active, enable the record-flags button to record verified flags into the HDF5 file.
  3. [New item] When all checklist items are active, enable the apply button to apply flags to the time series data and save it to the HDF5 file.
  1. ~~Define data manager obj with apply flags behavior.~~
  2. Define metadata answering who did the flagging and quality assurance tests.
  3. Update instruments/dictionaries/ACSM_TOFWARE_flags.yaml and instruments/readers/flag_reader.py to describe metadata elements based on the dictionary.
  4. ~~Update DIMA data integration pipeline to allow a user-defined file naming template~~
  5. ~~Design and implement flag visualization feature: click a flag on the table and display a shaded region on the figure when the feature is enabled~~
  6. Implement a schema validator on the YAML/JSON representation of HDF5 metadata.
  7. Implement updates to 'actris level' and 'processing_script' after an operation is applied to the data/file?
* ~~When `Create Flag` is clicked, modify the title to indicate that we are in flagging mode and ROIs can be drawn by dragging.~~
* ~~Update `Commit Flag` logic:~~
  ~~3. Update the recorded flags directory, and add provenance information to each flag (which instrument and channel it belongs to).~~
* Record collected flag information initially in a YAML or JSON file. Is this faster than writing directly to the HDF5 file?
* Should we actively transfer collected flags by clicking a button? After the commit button is pressed, each flag is now stored in an independent JSON file.
* Enable some form of chunk storage and visualization from the HDF5 file. Iterate over chunks for faster display versus access time.
  1. Do I need to modify DIMA?
  2. What is a good chunk size?
  3. What Dash component can we use to iterate over the chunks?

![Screenshot](figures/flagging_app_screenshot.JPG)
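Two of the items above (the 0.0001 floor on uncertainties and the 10% flow-rate validity threshold) could translate into checks along these lines; this is an editor's sketch with hypothetical helper names, assuming pandas Series inputs:

    import pandas as pd

    def apply_uncertainty_floor(err: pd.Series, floor: float = 0.0001) -> pd.Series:
        # Clip uncertainties from below so none falls under the floor value
        return err.clip(lower=floor)

    def flow_rate_validity_flag(flow: pd.Series, flow_ref: float, tol: float = 0.10) -> pd.Series:
        # True where the measured flow rate lies within +/- 10% of the reference flow rate
        return (flow - flow_ref).abs() <= tol * flow_ref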

Second changed file (name not shown)

@@ -1,233 +1,233 @@
import os
import json, yaml
import math
import numpy as np
import pandas as pd
def record_data_lineage(path_to_output_file, projectPath, metadata):

    path_to_output_dir, output_file = os.path.split(path_to_output_file)

    path_to_metadata_file = '/'.join([path_to_output_dir, 'data_lineage_metadata.json'])

    # Ensure the metadata file exists
    if not os.path.exists(path_to_metadata_file):
        with open(path_to_metadata_file, 'w') as f:
            json.dump({}, f)  # Initialize empty JSON

    # Read the existing JSON
    with open(path_to_metadata_file, 'r') as metadata_file:
        try:
            json_dict = json.load(metadata_file)
        except json.JSONDecodeError:
            json_dict = {}  # Start fresh if file is invalid

    # Compute relative output file path and update the JSON object
    #grelpath_to_output_file = os.path.relpath(path_to_output_file, start=projectPath).replace(os.sep, '/')
    json_dict[output_file] = metadata

    # Write updated JSON back to the file
    with open(path_to_metadata_file, 'w') as metadata_file:
        json.dump(json_dict, metadata_file, indent=4)

    print(f"Metadata for calibrated data saved to {path_to_metadata_file}")

    return 0
def get_metadata(path_to_file):
    path, filename = os.path.split(path_to_file)

    path_to_metadata = None
    for item in os.listdir(path):
        if 'metadata.json' in item:
            path_to_metadata = os.path.normpath(os.path.join(path, item))

    metadata = {}
    # Only attempt to read if a metadata file was actually found in the directory
    if path_to_metadata:
        with open(path_to_metadata, 'r') as stream:
            metadata = json.load(stream)
        metadata = metadata.get(filename, {})

    return metadata
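As a quick illustration of how these two helpers fit together (editor's sketch; the paths and metadata values are hypothetical): record_data_lineage appends an entry keyed by the output file name to data_lineage_metadata.json next to the output file, and get_metadata reads it back.

    metadata = {'processing_script': 'pipelines/steps/compute_uncertainties.py'}  # hypothetical entry
    # assumes data/output/ already exists
    record_data_lineage('data/output/acsm_timeseries.csv', projectPath='.', metadata=metadata)
    entry = get_metadata('data/output/acsm_timeseries.csv')  # -> {'processing_script': ...}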
def generate_missing_value_code(max_val, num_decimals):
    """
    Generate the largest all-9s missing value that can be represented exactly by float.
    Caps total digits to 16 to avoid rounding.

    Args:
        max_val (float): Largest expected valid value in the column.
        num_decimals (int): Number of decimal places to preserve.

    Returns:
        float: The missing value code.
    """
    MAX_SIGNIFICANT_DIGITS = 16

    # Calculate order of magnitude (roughly digits before decimal)
    order = int(np.floor(np.log10(max_val))) + 2 if max_val > 0 else 2

    # Cap total digits at 16 to avoid float rounding
    total_digits = order + num_decimals
    if total_digits > MAX_SIGNIFICANT_DIGITS:
        # Reduce integer digits first to keep decimals if possible
        int_digits = max(MAX_SIGNIFICANT_DIGITS - num_decimals, 1)
        dec_digits = min(num_decimals, MAX_SIGNIFICANT_DIGITS - int_digits)
    else:
        int_digits = order
        dec_digits = num_decimals

    # Construct the missing code string
    if dec_digits > 0:
        int_part = '9' * int_digits
        dec_part = '9' * dec_digits
        missing_code_str = f"{int_part}.{dec_part}"
    else:
        missing_code_str = '9' * int_digits

    missing_code = float(missing_code_str)

    return missing_code
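A few illustrative calls (editor-chosen values, assuming the function above is in scope) showing how the integer part tracks the order of magnitude of max_val:

    generate_missing_value_code(850.0, 4)  # -> 9999.9999  (4 integer digits, 4 decimals)
    generate_missing_value_code(0.5, 4)    # -> 9.9999     (one integer digit, since max_val < 1)
    generate_missing_value_code(850.0, 0)  # -> 9999.0     (no decimal part requested)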
import math import math
import numpy as np import numpy as np
def compute_uncertainty_estimate(x, x_err): def compute_uncertainty_estimate(x, x_err):
""" """
Computes uncertainty estimate: sqrt((0.5 * x_err)^2 + (0.5 * x)^2) Computes uncertainty estimate: sqrt((x_err)^2 + (0.5 * x)^2)
for scalar inputs. Prints errors if inputs are invalid. for scalar inputs. Prints errors if inputs are invalid.
""" """
try: try:
x = float(x) x = float(x)
x_err = float(x_err) x_err = float(x_err)
if math.isnan(x) or math.isnan(x_err): if math.isnan(x) or math.isnan(x_err):
print(f"Warning: One or both inputs are NaN -> x: {x}, x_err: {x_err}") print(f"Warning: One or both inputs are NaN -> x: {x}, x_err: {x_err}")
return np.nan return np.nan
return math.sqrt((0.5 * x_err)**2 + (0.5 * x)**2) return math.sqrt((x_err)**2 + (0.5 * x)**2)
except (ValueError, TypeError) as e: except (ValueError, TypeError) as e:
print(f"Error computing uncertainty for x: {x}, x_err: {x_err} -> {e}") print(f"Error computing uncertainty for x: {x}, x_err: {x_err} -> {e}")
return np.nan return np.nan
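A worked example with editor-chosen numbers, using the updated formula sqrt(x_err^2 + (0.5*x)^2):

    compute_uncertainty_estimate(2.0, 0.3)           # sqrt(0.3**2 + (0.5 * 2.0)**2) = sqrt(1.09) ~ 1.044
    compute_uncertainty_estimate(float('nan'), 0.3)  # prints a warning and returns nan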
def generate_error_dataframe(df: pd.DataFrame, datetime_var):
    """
    Generates an error DataFrame by filling numeric 'correct' columns with a missing value code.

    Parameters
    ----------
    df : pd.DataFrame
        Input DataFrame containing numerical columns.
    datetime_var : str
        Name of the datetime column to retain.

    Returns
    -------
    pd.DataFrame
        DataFrame with error values based on missing value codes.
    """
    df_numeric = df.select_dtypes(include=np.number)

    err_df_columns = []
    err_df_values = []

    # Select numeric columns whose names contain 'correct'
    correct_cols = [col for col in df_numeric.columns if 'correct' in col]

    for col in correct_cols:
        missing_value_code = generate_missing_value_code(df[col].max(skipna=True), 4)
        err_df_values.append(missing_value_code)
        err_df_columns.append(f"{col}_err")

    # Repeat the per-column missing value codes for every row
    err_matrix = np.tile(np.array(err_df_values), (len(df), 1))
    df_err = pd.DataFrame(data=err_matrix, columns=err_df_columns)

    # Ensure datetime_var exists in df before assignment
    if datetime_var in df.columns:
        df_err[datetime_var] = df[datetime_var].values
    else:
        raise ValueError(f"Column '{datetime_var}' not found in DataFrame")

    return df_err
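A minimal usage sketch (column names are hypothetical; assumes pandas and the functions above are available):

    df = pd.DataFrame({
        'acsm_utc_time': pd.to_datetime(['2024-01-01 00:00', '2024-01-01 01:00', '2024-01-01 02:00']),
        'SO4_correct': [0.8, 1.2, 0.9],
        'NO3_correct': [0.3, 0.4, 0.5],
    })
    df_err = generate_error_dataframe(df, datetime_var='acsm_utc_time')
    # df_err has columns ['SO4_correct_err', 'NO3_correct_err', 'acsm_utc_time'];
    # the *_err columns are filled with the per-column missing value codes
    # (99.9999 for SO4_correct and 9.9999 for NO3_correct, given the maxima above).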
import numpy as np import numpy as np
import pandas as pd import pandas as pd
def metadata_dict_to_dataframe(metadata: dict, shape: tuple): def metadata_dict_to_dataframe(metadata: dict, shape: tuple):
""" """
Converts a metadata dictionary into a repeated data table. Converts a metadata dictionary into a repeated data table.
Parameters Parameters
---------- ----------
metadata : dict metadata : dict
Dictionary containing metadata where keys are column names and values are repeated across rows. Dictionary containing metadata where keys are column names and values are repeated across rows.
shape : tuple shape : tuple
Shape of the output DataFrame (rows, columns). The number of columns must match the length of `metadata`. Shape of the output DataFrame (rows, columns). The number of columns must match the length of `metadata`.
Returns Returns
------- -------
pd.DataFrame pd.DataFrame
DataFrame with metadata values repeated according to the specified shape. DataFrame with metadata values repeated according to the specified shape.
""" """
# Ensure shape is valid (rows, columns) # Ensure shape is valid (rows, columns)
rows, cols = shape rows, cols = shape
if cols != len(metadata): if cols != len(metadata):
raise ValueError(f"Shape mismatch: {cols} columns expected, but metadata has {len(metadata)} keys.") raise ValueError(f"Shape mismatch: {cols} columns expected, but metadata has {len(metadata)} keys.")
# Extract metadata values and reshape them properly # Extract metadata values and reshape them properly
values = np.array(list(metadata.values())).reshape((1,cols)) values = np.array(list(metadata.values())).reshape((1,cols))
# Tile the values to match the desired shape # Tile the values to match the desired shape
data_table = np.tile(values, (rows, 1)) data_table = np.tile(values, (rows, 1))
# Create DataFrame with correct column names # Create DataFrame with correct column names
df = pd.DataFrame(data=data_table, columns=list(metadata.keys())) df = pd.DataFrame(data=data_table, columns=list(metadata.keys()))
return df return df
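For example (station and instrument values are illustrative):

    meta = {'station': 'PAY', 'instrument': 'ACSM'}
    metadata_dict_to_dataframe(meta, shape=(3, 2))
    # -> 3x2 DataFrame with columns ['station', 'instrument'], each row repeating the values;
    #    shape=(3, 3) would raise ValueError because the column count must equal len(meta).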
def resolve_project_path():
    try:
        thisFilePath = os.path.abspath(__file__)
    except NameError:
        thisFilePath = os.getcwd()
    return os.path.normpath(os.path.join(thisFilePath, "..", "..", ".."))
def load_project_yaml_files(projectPath: str, filename: str):

    allowed_filenames = ['acsm_to_ebas.yaml', 'calibration_params.yaml', 'calibration_factors.yaml',
                         'limits_of_detection.yaml', 'station_params.yaml', 'validity_thresholds.yaml',
                         'campaignDescriptor.yaml']

    if filename not in allowed_filenames:
        raise ValueError(f'Invalid filename : {filename}. The filename should be selected from the following list {allowed_filenames}.')

    filename_to_relpath = {"acsm_to_ebas.yaml": "pipelines/dictionaries/acsm_to_ebas.yaml",
                           "calibration_params.yaml": "pipelines/params/calibration_params.yaml",
                           "calibration_factors.yaml": "pipelines/params/calibration_factors.yaml",
                           "limits_of_detection.yaml": "pipelines/params/limits_of_detection.yaml",
                           "station_params.yaml": "pipelines/params/station_params.yaml",
                           "validity_thresholds.yaml": "pipelines/params/validity_thresholds.yaml",
                           "campaignDescriptor.yaml": "campaignDescriptor.yaml"}

    # Implicit input
    if filename_to_relpath.get(filename, None):
        dict_file = os.path.normpath(os.path.join(projectPath, filename_to_relpath[filename]))

    output_dict = {}
    try:
        with open(dict_file, 'r') as stream:
            output_dict = yaml.load(stream, Loader=yaml.FullLoader)
    except Exception as e:
        print(f'Error loading {dict_file}: {e}')
        return {}

    return output_dict
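A short usage sketch for the two project-level helpers above (illustrative only; it assumes the module sits at the depth the "..", "..", ".." join implies):

    projectPath = resolve_project_path()
    thresholds = load_project_yaml_files(projectPath, 'validity_thresholds.yaml')
    # A filename outside the allowed list raises ValueError:
    # load_project_yaml_files(projectPath, 'unknown.yaml')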