Mirror of https://gitea.psi.ch/APOG/acsmnode.git, synced 2025-06-29 04:40:48 +02:00
Update pipelines/steps/prepare_ebas_submission.py. Removed hard-coded paths and build the output name from campaign descriptor metadata. Also, month ranges can now be specified.
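With this change, the EBAS output file names are built from campaignDescriptor.yaml instead of being hard coded. A minimal sketch of the descriptor fields this step now reads is shown below; the key names (station, instrument_name, year) are taken from the code in this diff, while the values are illustrative only:

    station: JFJ
    instrument_name: ACSM-017
    year: 2024

With such values the step writes JFJ_ACSM-017_2024.txt and JFJ_ACSM-017_FLAGS_2024.txt to the project's data/ directory, matching the names that were previously hard coded.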
@@ -1,172 +1,233 @@
import sys, os

try:
    thisFilePath = os.path.abspath(__file__)
    print(thisFilePath)
except NameError:
    print("[Notice] The __file__ attribute is unavailable in this environment (e.g., Jupyter or IDLE).")
    print("When using a terminal, make sure the working directory is set to the script's location to prevent path issues (for the DIMA submodule)")
    #print("Otherwise, path to submodule DIMA may not be resolved properly.")
    thisFilePath = os.getcwd() # Use current directory or specify a default


projectPath = os.path.normpath(os.path.join(thisFilePath, "..", "..",'..')) # Move up to project root

if projectPath not in sys.path:
    sys.path.insert(0,projectPath)

import argparse
import pandas as pd
import json, yaml
import numpy as np
from pipelines.steps.utils import get_metadata
from pipelines.steps.utils import metadata_dict_to_dataframe
from pipelines.steps.utils import load_project_yaml_files


def join_tables(csv_files: list):
    """
    Joins multiple CSV files based on their metadata-defined datetime column.

    Parameters
    ----------
    csv_files : list
        List of paths to CSV files.

    Returns
    -------
    pd.DataFrame
        Merged DataFrame.
    """
    if not all(isinstance(item, str) for item in csv_files):
        raise TypeError(f"Invalid parameter. csv_files contain non-str items: {[item for item in csv_files if not isinstance(item, str)]}")

    if not all(os.path.exists(item) and item.endswith('.csv') for item in csv_files):
        raise RuntimeError("Parameter csv_files contains either an unreachable/broken path or a non-CSV file.")

    acum_df = pd.read_csv(csv_files[0])
    left_datetime_var = get_metadata(csv_files[0]).get('datetime_var', None)
    acum_df = acum_df.drop_duplicates(subset=[left_datetime_var])

    if left_datetime_var is None:
        raise ValueError(f"Missing datetime_var metadata in {csv_files[0]}")

    for idx in range(1, len(csv_files)):
        append_df = pd.read_csv(csv_files[idx])
        right_datetime_var = get_metadata(csv_files[idx]).get('datetime_var', None)

        if right_datetime_var is None:
            raise ValueError(f"Missing datetime_var metadata in {csv_files[idx]}")

        append_df = append_df.drop_duplicates(subset=[right_datetime_var])
        acum_df = acum_df.merge(append_df, left_on=left_datetime_var, right_on=right_datetime_var, how='left')

    return acum_df

#import argparse
#import os
#import pandas as pd
from third_party.acsmProcessingSoftware.src import rawto012
#from utils import load_project_yaml_files, metadata_dict_to_dataframe, join_tables # Adjust imports based on actual file locations

-def main(paths_to_processed_files : list, path_to_flags : str, month : int = None):
-    # Set up argument parsing
-
-    acum_df = join_tables(paths_to_processed_files)
-    acsm_to_ebas = load_project_yaml_files(projectPath, "acsm_to_ebas.yaml")
-
-    # Select variables that are both in the acsm_to_ebas dict and acum_df
-    reduced_set_of_vars = [key for key in acum_df.columns if key in acsm_to_ebas['renaming_map'].keys()]
-    acum_df = acum_df.loc[:, reduced_set_of_vars].rename(columns=acsm_to_ebas['renaming_map'])
-
-    flags_acum_df = join_tables([path_to_flags])
-    flags_acum_df = flags_acum_df.rename(columns=acsm_to_ebas['renaming_map'])
-
-    # Ensure time columns are datetime
-    acum_df['ACSM_time'] = pd.to_datetime(acum_df['ACSM_time'])
-    flags_acum_df['ACSM_time'] = pd.to_datetime(flags_acum_df['ACSM_time'])
-
-    # Apply month filter if specified
-    if month:
-        acum_df = acum_df[acum_df['ACSM_time'].dt.month == month]
-        flags_acum_df = flags_acum_df[flags_acum_df['ACSM_time'].dt.month == month]
-
-    # Count the number of NaT (null) values
-    num_nats = acum_df['ACSM_time'].isna().sum()
-    total_rows = len(acum_df)
-    percentage_nats = (num_nats / total_rows) * 100
-
-    print(f"Total rows: {total_rows}")
-    print(f"NaT (missing) values: {num_nats}")
-    print(f"Percentage of data loss: {percentage_nats:.2f}%")
-
-    num_nats = flags_acum_df['ACSM_time'].isna().sum()
-    total_rows = len(flags_acum_df)
-    percentage_nats = (num_nats / total_rows) * 100
-
-    print(f"Total rows: {total_rows}")
-    print(f"NaT (missing) values: {num_nats}")
-    print(f"Percentage of data loss: {percentage_nats:.2f}%")
-
-    nat_acum = acum_df['ACSM_time'].isna()
-    nat_flags = flags_acum_df['ACSM_time'].isna()
-
-    valid_rows = ~(nat_acum | nat_flags) # Compute valid rows in one step
-
-    # Load YAML files
-    detection_limits = load_project_yaml_files(projectPath, "limits_of_detection.yaml")
-    station_params = load_project_yaml_files(projectPath, "station_params.yaml")
-
-    # Extract dictionaries from required keys
-    lod_dict = detection_limits.get('LOD', {}).get('variables', {})
-    jfj_dict = station_params.get('stations', {}).get('JFJ', {})
-
-    # Convert dictionaries to DataFrames using the existing function
-    lod_df = metadata_dict_to_dataframe(lod_dict, shape=(len(acum_df), len(lod_dict)))
-    jfj_df = metadata_dict_to_dataframe(jfj_dict, shape=(len(acum_df), len(jfj_dict)))
-
-    # Ensure indexes are properly aligned for merging
-    acum_df = acum_df.reset_index() # Convert index to a column for merging
-
-    # Merge with LOD DataFrame
-    acum_df = acum_df.merge(lod_df, left_index=True, right_index=True, how='left')
-
-    # Merge with JFJ DataFrame
-    acum_df = acum_df.merge(jfj_df, left_index=True, right_index=True, how='left')
-
-    acum_df = acum_df.rename(columns=acsm_to_ebas['renaming_map'])
-
-    # Save results
-    output_dir = os.path.join(projectPath,'data')
-    output_file1 = os.path.join(output_dir, 'JFJ_ACSM-017_2024.txt')
-    output_file2 = os.path.join(output_dir, 'JFJ_ACSM-017_FLAGS_2024.txt')
-    #output_file1 = os.path.join(output_dir, f'JFJ_ACSM-017_2024_month{args.month}.txt' if args.month else 'JFJ_ACSM-017_2024.txt')
-    #output_file2 = os.path.join(output_dir, f'JFJ_ACSM-017_FLAGS_2024_month{args.month}.txt' if args.month else 'JFJ_ACSM-017_FLAGS_2024.txt')
-
-    #acum_df = acum_df[[col for col in acsm_to_ebas['column_order'] if col in acum_df.columns]]
-    #flags_acum_df = flags_acum_df[[col for col in acsm_to_ebas['flags_column_order'] if col in flags_acum_df.columns]]
-
-    acum_df.loc[:, :].to_csv(output_file1, sep='\t', index=None, date_format="%Y/%m/%d %H:%M:%S")
-    flags_acum_df.loc[:, :].to_csv(output_file2, sep='\t', index=None, date_format="%Y/%m/%d %H:%M:%S")
-
-    # Run external processing application
-    app = rawto012.Application()
-    infile = output_file1
-    acq_err_log = output_file2
-    outdir = output_dir
-    app.process(infile, acq_err_log, outdir=outdir)
-
-
-if __name__ == '__main__':
-
-    parser = argparse.ArgumentParser(description="Process and calibrate ACSM data for JFJ station.")
-    parser.add_argument('--acsm_paths', type=str, required=True, nargs=3, help="Paths to the ACSM timeseries calibrated CSV file, the error CSV file, and the calibration factors CSV file.")
-    parser.add_argument('--acsm_flags_path', type=str, required=True, help="Path to the ACSM flags CSV file.")
-    parser.add_argument('--month', type=int, choices=range(1, 13), help="Filter data for a specific month (1-12).")
-
-    args = parser.parse_args()
-
-    # Load data
-    csv_files = args.acsm_paths # list of filenames
-    flags_file = args.acsm_flags_path
-    month = args.month
-    main(csv_files, flags_file, month)
+def validate_required_field(dct, key):
+    value = dct.get(key, None)
+    if not value:
+        raise ValueError(f'[ERROR] Required field "{key}" is missing or empty in campaignDescriptor.yaml')
+    return value
+
+def parse_months(month_str: str) -> list:
+    """
+    Convert a string like '1,3,5-7' into a list of valid month integers [1–12].
+    Raises ValueError if any value is out of range.
+    """
+    months = set()
+    for part in month_str.split(','):
+        part = part.strip()
+        if '-' in part:
+            try:
+                start, end = map(int, part.split('-'))
+                if not (1 <= start <= 12 and 1 <= end <= 12):
+                    raise ValueError(f"Month range {start}-{end} out of bounds (1–12)")
+                months.update(range(start, end + 1))
+            except Exception:
+                raise ValueError(f"Invalid range format: '{part}'")
+        else:
+            try:
+                val = int(part)
+                if not 1 <= val <= 12:
+                    raise ValueError(f"Month {val} is out of bounds (1–12)")
+                months.add(val)
+            except ValueError:
+                raise ValueError(f"Invalid month value: '{part}'")
+    return sorted(months)
+
+
+def main(paths_to_processed_files : list, path_to_flags : str, month : int = None):
+    # Set up argument parsing
+
+    acum_df = join_tables(paths_to_processed_files)
+    acsm_to_ebas = load_project_yaml_files(projectPath, "acsm_to_ebas.yaml")
+
+    # Select variables that are both in the acsm_to_ebas dict and acum_df
+    reduced_set_of_vars = [key for key in acum_df.columns if key in acsm_to_ebas['renaming_map'].keys()]
+    acum_df = acum_df.loc[:, reduced_set_of_vars].rename(columns=acsm_to_ebas['renaming_map'])
+
+    flags_acum_df = join_tables([path_to_flags])
+    flags_acum_df = flags_acum_df.rename(columns=acsm_to_ebas['renaming_map'])
+
+    # Ensure time columns are datetime
+    acum_df['ACSM_time'] = pd.to_datetime(acum_df['ACSM_time'])
+    flags_acum_df['ACSM_time'] = pd.to_datetime(flags_acum_df['ACSM_time'])
+
+    # Apply month filter if specified
+    #if month:
+    #    acum_df = acum_df[acum_df['ACSM_time'].dt.month == month]
+    #    flags_acum_df = flags_acum_df[flags_acum_df['ACSM_time'].dt.month == month]
+
+    # Apply month filtering if specified
+    if month:
+        try:
+            month_list = parse_months(month)
+        except Exception as e:
+            raise ValueError(f"[ERROR] Could not parse month input '{month}': {e}")
+
+        acum_df = acum_df[acum_df['ACSM_time'].dt.month.isin(month_list)]
+        flags_acum_df = flags_acum_df[flags_acum_df['ACSM_time'].dt.month.isin(month_list)]
+
+    # Count the number of NaT (null) values
+    num_nats = acum_df['ACSM_time'].isna().sum()
+    total_rows = len(acum_df)
+    percentage_nats = (num_nats / total_rows) * 100
+
+    print(f"Total rows: {total_rows}")
+    print(f"NaT (missing) values: {num_nats}")
+    print(f"Percentage of data loss: {percentage_nats:.2f}%")
+
+    num_nats = flags_acum_df['ACSM_time'].isna().sum()
+    total_rows = len(flags_acum_df)
+    percentage_nats = (num_nats / total_rows) * 100
+
+    print(f"Total rows: {total_rows}")
+    print(f"NaT (missing) values: {num_nats}")
+    print(f"Percentage of data loss: {percentage_nats:.2f}%")
+
+    nat_acum = acum_df['ACSM_time'].isna()
+    nat_flags = flags_acum_df['ACSM_time'].isna()
+
+    valid_rows = ~(nat_acum | nat_flags) # Compute valid rows in one step
+
+    # Load YAML files
+    detection_limits = load_project_yaml_files(projectPath, "limits_of_detection.yaml")
+    station_params = load_project_yaml_files(projectPath, "station_params.yaml")
+
+    # Extract dictionaries from required keys
+    lod_dict = detection_limits.get('LOD', {}).get('variables', {})
+    jfj_dict = station_params.get('stations', {}).get('JFJ', {})
+
+    # Convert dictionaries to DataFrames using the existing function
+    lod_df = metadata_dict_to_dataframe(lod_dict, shape=(len(acum_df), len(lod_dict)))
+    jfj_df = metadata_dict_to_dataframe(jfj_dict, shape=(len(acum_df), len(jfj_dict)))
+
+    # Ensure indexes are properly aligned for merging
+    acum_df = acum_df.reset_index() # Convert index to a column for merging
+
+    # Merge with LOD DataFrame
+    acum_df = acum_df.merge(lod_df, left_index=True, right_index=True, how='left')
+
+    # Merge with JFJ DataFrame
+    acum_df = acum_df.merge(jfj_df, left_index=True, right_index=True, how='left')
+
+    acum_df = acum_df.rename(columns=acsm_to_ebas['renaming_map'])
+
+
+    # Load descriptor
+    campaignDescriptorDict = load_project_yaml_files(projectPath, 'campaignDescriptor.yaml')
+
+    # Validate required fields
+    station = validate_required_field(campaignDescriptorDict, 'station')
+    instrument_name = validate_required_field(campaignDescriptorDict, 'instrument_name')
+    year = validate_required_field(campaignDescriptorDict, 'year')
+
+    # Build output paths
+    output_dir = os.path.join(projectPath, 'data')
+    os.makedirs(output_dir, exist_ok=True)
+
+    output_file1 = os.path.join(output_dir, f'{station}_{instrument_name}_{year}.txt')
+    output_file2 = os.path.join(output_dir, f'{station}_{instrument_name}_FLAGS_{year}.txt')
+
+    #output_file1 = os.path.join(output_dir, f'JFJ_ACSM-017_2024_month{args.month}.txt' if args.month else 'JFJ_ACSM-017_2024.txt')
+    #output_file2 = os.path.join(output_dir, f'JFJ_ACSM-017_FLAGS_2024_month{args.month}.txt' if args.month else 'JFJ_ACSM-017_FLAGS_2024.txt')
+
+    #acum_df = acum_df[[col for col in acsm_to_ebas['column_order'] if col in acum_df.columns]]
+    #flags_acum_df = flags_acum_df[[col for col in acsm_to_ebas['flags_column_order'] if col in flags_acum_df.columns]]
+
+    acum_df.loc[:, :].to_csv(output_file1, sep='\t', index=None, date_format="%Y/%m/%d %H:%M:%S")
+    flags_acum_df.loc[:, :].to_csv(output_file2, sep='\t', index=None, date_format="%Y/%m/%d %H:%M:%S")
+
+    # Run external processing application
+    app = rawto012.Application()
+    infile = output_file1
+    acq_err_log = output_file2
+    outdir = output_dir
+    app.process(infile, acq_err_log, outdir=outdir)
+
+
+if __name__ == '__main__':
+
+    parser = argparse.ArgumentParser(description="Process and calibrate ACSM data for JFJ station.")
+    parser.add_argument('--acsm_paths', type=str, required=True, nargs=3, help="Paths to the ACSM timeseries calibrated CSV file, the error CSV file, and the calibration factors CSV file.")
+    parser.add_argument('--acsm_flags_path', type=str, required=True, help="Path to the ACSM flags CSV file.")
+    parser.add_argument(
+        '--month',
+        type=str,
+        help="Filter data for specific months using comma-separated values and ranges. Ex: '1,3,5-7'"
+    )
+
+    args = parser.parse_args()
+
+    # Load data
+    csv_files = args.acsm_paths # list of filenames
+    flags_file = args.acsm_flags_path
+    month = args.month
+    main(csv_files, flags_file, month)
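For reference, a short usage sketch of the new month handling: the parse_months behaviour and the --month flag follow the code in this commit, the import assumes the project root is on sys.path, and the file paths in the commented invocation are placeholders rather than real repository data.

    # parse_months() accepts single months, comma-separated lists, and ranges
    from pipelines.steps.prepare_ebas_submission import parse_months
    print(parse_months('1,3,5-7'))  # -> [1, 3, 5, 6, 7]

    # Example invocation (placeholder paths):
    #   python pipelines/steps/prepare_ebas_submission.py \
    #       --acsm_paths timeseries.csv errors.csv cal_factors.csv \
    #       --acsm_flags_path flags.csv \
    #       --month '5-7'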