# acsmnode/pipelines/steps/prepare_ebas_submission.py

import sys, os

try:
    thisFilePath = os.path.abspath(__file__)
    print(thisFilePath)
except NameError:
    print("[Notice] The __file__ attribute is unavailable in this environment (e.g., Jupyter or IDLE).")
    print("When using a terminal, make sure the working directory is set to the script's location to prevent path issues (for the DIMA submodule).")
    thisFilePath = os.getcwd()  # Fall back to the current working directory

projectPath = os.path.normpath(os.path.join(thisFilePath, "..", "..", ".."))  # Move up to the project root

if projectPath not in sys.path:
    sys.path.insert(0, projectPath)
import argparse

import json, yaml
import numpy as np
import pandas as pd

from pipelines.steps.utils import (
    get_metadata,
    load_project_yaml_files,
    metadata_dict_to_dataframe,
)
from third_party.acsmProcessingSoftware.src import rawto012
def join_tables(csv_files: list):
    """
    Join multiple CSV files on their metadata-defined datetime column.

    Parameters
    ----------
    csv_files : list
        List of paths to CSV files.

    Returns
    -------
    pd.DataFrame
        Merged DataFrame.
    """
    if not all(isinstance(item, str) for item in csv_files):
        raise TypeError(f"Invalid parameter. csv_files contains non-str items: {[item for item in csv_files if not isinstance(item, str)]}")
    if not all(os.path.exists(item) and item.endswith('.csv') for item in csv_files):
        raise RuntimeError("Parameter csv_files contains either an unreachable/broken path or a non-CSV file.")

    acum_df = pd.read_csv(csv_files[0])
    left_datetime_var = get_metadata(csv_files[0]).get('datetime_var', None)
    if left_datetime_var is None:
        raise ValueError(f"Missing datetime_var metadata in {csv_files[0]}")
    acum_df = acum_df.drop_duplicates(subset=[left_datetime_var])

    for idx in range(1, len(csv_files)):
        append_df = pd.read_csv(csv_files[idx])
        right_datetime_var = get_metadata(csv_files[idx]).get('datetime_var', None)
        if right_datetime_var is None:
            raise ValueError(f"Missing datetime_var metadata in {csv_files[idx]}")
        append_df = append_df.drop_duplicates(subset=[right_datetime_var])
        acum_df = acum_df.merge(append_df, left_on=left_datetime_var, right_on=right_datetime_var, how='left')

    return acum_df
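# Example usage (illustrative; these file names are hypothetical, and each CSV
# is assumed to carry metadata defining its 'datetime_var'):
#
#   merged = join_tables(['timeseries_calibrated.csv', 'errors.csv'])
#
# The first file anchors the merge; every subsequent file is left-joined on
# its own metadata-defined datetime column.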
def validate_required_field(dct, key):
    value = dct.get(key, None)
    if not value:
        raise ValueError(f'[ERROR] Required field "{key}" is missing or empty in campaignDescriptor.yaml')
    return value
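# Example (illustrative):
#
#   validate_required_field({'year': 2024}, 'year')  # -> 2024
#   validate_required_field({}, 'year')              # raises ValueError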
def parse_months(month_str: str) -> list:
    """
    Convert a string like '1,3,5-7' into a sorted list of month integers in [1, 12].

    Raises ValueError if any value is malformed or out of range.
    """
    months = set()
    for part in month_str.split(','):
        part = part.strip()
        if '-' in part:
            try:
                start, end = map(int, part.split('-'))
            except ValueError:
                raise ValueError(f"Invalid range format: '{part}'")
            if not (1 <= start <= 12 and 1 <= end <= 12):
                raise ValueError(f"Month range {start}-{end} out of bounds (1-12)")
            months.update(range(start, end + 1))
        else:
            try:
                val = int(part)
            except ValueError:
                raise ValueError(f"Invalid month value: '{part}'")
            if not 1 <= val <= 12:
                raise ValueError(f"Month {val} is out of bounds (1-12)")
            months.add(val)
    return sorted(months)
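# Examples (illustrative):
#
#   parse_months('1,3,5-7')  # -> [1, 3, 5, 6, 7]
#   parse_months('0,13')     # raises ValueError (out of bounds)
#   parse_months('a-b')      # raises ValueError (invalid range format)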
def main(paths_to_processed_files: list, path_to_flags: str, month: str = None):
    # Merge the processed CSV files on their shared datetime column
    acum_df = join_tables(paths_to_processed_files)
    acsm_to_ebas = load_project_yaml_files(projectPath, "acsm_to_ebas.yaml")

    # Select variables that appear both in the acsm_to_ebas renaming map and in acum_df
    reduced_set_of_vars = [key for key in acum_df.columns if key in acsm_to_ebas['renaming_map']]
    acum_df = acum_df.loc[:, reduced_set_of_vars].rename(columns=acsm_to_ebas['renaming_map'])

    flags_acum_df = join_tables([path_to_flags])
    flags_acum_df = flags_acum_df.rename(columns=acsm_to_ebas['renaming_map'])

    # Ensure time columns are datetime
    acum_df['ACSM_time'] = pd.to_datetime(acum_df['ACSM_time'])
    flags_acum_df['ACSM_time'] = pd.to_datetime(flags_acum_df['ACSM_time'])
    # Apply month filtering if specified
    if month:
        try:
            month_list = parse_months(month)
        except Exception as e:
            raise ValueError(f"[ERROR] Could not parse month input '{month}': {e}")
        acum_df = acum_df[acum_df['ACSM_time'].dt.month.isin(month_list)]
        flags_acum_df = flags_acum_df[flags_acum_df['ACSM_time'].dt.month.isin(month_list)]
    # Report NaT (missing) timestamps in each table, guarding against empty tables
    def report_missing_times(df: pd.DataFrame, label: str) -> None:
        num_nats = df['ACSM_time'].isna().sum()
        total_rows = len(df)
        percentage_nats = (num_nats / total_rows) * 100 if total_rows else 0.0
        print(f"[{label}] Total rows: {total_rows}")
        print(f"[{label}] NaT (missing) values: {num_nats}")
        print(f"[{label}] Percentage of data loss: {percentage_nats:.2f}%")

    report_missing_times(acum_df, 'data')
    report_missing_times(flags_acum_df, 'flags')

    nat_acum = acum_df['ACSM_time'].isna()
    nat_flags = flags_acum_df['ACSM_time'].isna()
    valid_rows = ~(nat_acum | nat_flags)  # Rows with a valid timestamp in both tables (currently informational only)
    # Load YAML files
    detection_limits = load_project_yaml_files(projectPath, "limits_of_detection.yaml")
    station_params = load_project_yaml_files(projectPath, "station_params.yaml")

    # Extract dictionaries from the required keys
    lod_dict = detection_limits.get('LOD', {}).get('variables', {})
    jfj_dict = station_params.get('stations', {}).get('JFJ', {})

    # Broadcast the metadata dictionaries into DataFrames with one row per observation
    lod_df = metadata_dict_to_dataframe(lod_dict, shape=(len(acum_df), len(lod_dict)))
    jfj_df = metadata_dict_to_dataframe(jfj_dict, shape=(len(acum_df), len(jfj_dict)))

    # Reset the index so the positional merges below align row-for-row
    acum_df = acum_df.reset_index()

    # Merge with the LOD and JFJ station-parameter DataFrames
    acum_df = acum_df.merge(lod_df, left_index=True, right_index=True, how='left')
    acum_df = acum_df.merge(jfj_df, left_index=True, right_index=True, how='left')
    acum_df = acum_df.rename(columns=acsm_to_ebas['renaming_map'])
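    # Illustrative sketch of the broadcast above (assuming metadata_dict_to_dataframe
    # repeats each scalar value down the rows, one column per key; the keys shown
    # here are hypothetical):
    #
    #   metadata_dict_to_dataframe({'SO4_LOD': 0.02, 'NO3_LOD': 0.01}, shape=(3, 2))
    #   ->   SO4_LOD  NO3_LOD
    #   0       0.02     0.01
    #   1       0.02     0.01
    #   2       0.02     0.01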
    # Load the campaign descriptor
    campaignDescriptorDict = load_project_yaml_files(projectPath, 'campaignDescriptor.yaml')

    # Validate required fields
    STATION_ABBR = validate_required_field(campaignDescriptorDict, 'station_abbr')
    instrument_name = validate_required_field(campaignDescriptorDict, 'instrument_name')
    year = validate_required_field(campaignDescriptorDict, 'year')

    # Build output paths
    output_dir = os.path.join(projectPath, 'data')
    os.makedirs(output_dir, exist_ok=True)
    output_file1 = os.path.join(output_dir, f'{STATION_ABBR}_{instrument_name}_{year}.txt')
    output_file2 = os.path.join(output_dir, f'{STATION_ABBR}_{instrument_name}_FLAGS_{year}.txt')

    # Optional EBAS column reordering (currently disabled):
    # acum_df = acum_df[[col for col in acsm_to_ebas['column_order'] if col in acum_df.columns]]
    # flags_acum_df = flags_acum_df[[col for col in acsm_to_ebas['flags_column_order'] if col in flags_acum_df.columns]]

    acum_df.to_csv(output_file1, sep='\t', index=False, date_format="%Y/%m/%d %H:%M:%S")
    flags_acum_df.to_csv(output_file2, sep='\t', index=False, date_format="%Y/%m/%d %H:%M:%S")
    # Run the external processing application (third_party rawto012)
    app = rawto012.Application()
    infile = output_file1
    acq_err_log = output_file2
    outdir = output_dir
    app.process(infile, acq_err_log, outdir=outdir)
if __name__ == '__main__':
    parser = argparse.ArgumentParser(description="Process and calibrate ACSM data for the JFJ station.")
    parser.add_argument(
        '--acsm_paths',
        type=str,
        required=True,
        nargs=3,
        help="Paths to the calibrated ACSM timeseries CSV file, the error CSV file, and the calibration factors CSV file."
    )
    parser.add_argument(
        '--acsm_flags_path',
        type=str,
        required=True,
        help="Path to the ACSM flags CSV file."
    )
    parser.add_argument(
        '--month',
        type=str,
        help="Filter data for specific months using comma-separated values and ranges, e.g. '1,3,5-7'."
    )
    args = parser.parse_args()

    # Load data
    csv_files = args.acsm_paths  # list of three file paths
    flags_file = args.acsm_flags_path
    month = args.month

    main(csv_files, flags_file, month)
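
# Example invocation (illustrative paths; adjust to the campaign's data layout):
#
#   python pipelines/steps/prepare_ebas_submission.py \
#       --acsm_paths data/timeseries_calibrated.csv data/errors.csv data/calibration_factors.csv \
#       --acsm_flags_path data/flags.csv \
#       --month '1,3,5-7'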