mirror of
https://gitea.psi.ch/APOG/acsmnode.git
synced 2025-06-24 21:21:08 +02:00
WIP: finished last commit, and tested the whole step. Now we need to implement the command line interface.
This commit is contained in:
@ -12,11 +12,16 @@ except NameError:
|
||||
|
||||
projectPath = os.path.normpath(os.path.join(thisFilePath, "..", "..",'..')) # Move up to project root
|
||||
|
||||
if projectPath not in sys.path:
|
||||
sys.path.insert(0,projectPath)
|
||||
|
||||
import argparse
|
||||
import pandas as pd
|
||||
import json, yaml
|
||||
import pandas as pd
|
||||
import numpy as np
|
||||
from utils import get_metadata
|
||||
from utils import metadata_dict_to_dataframe
|
||||
from pipelines.steps.utils import load_project_yaml_files
|
||||
|
||||
def join_tables(csv_files: list):
|
||||
"""
|
||||
@ -58,44 +63,39 @@ def join_tables(csv_files: list):
|
||||
return acum_df
|
||||
|
||||
|
||||
def load_acsm_to_ebas_dict():
|
||||
|
||||
# Implicit input
|
||||
dict_file = os.path.normpath(os.path.join(projectPath,"pipelines/dictionaries/acsm_to_ebas.yaml"))
|
||||
|
||||
output_dict = {}
|
||||
try:
|
||||
with open(dict_file, 'r') as stream:
|
||||
output_dict = yaml.load(stream, Loader=yaml.FullLoader)
|
||||
except Exception as e:
|
||||
|
||||
print(f'Error loading {dict_file}: {e}')
|
||||
return {}
|
||||
|
||||
return output_dict
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
path1 = 'data/collection_JFJ_2024_LeilaS_2025-02-17_2025-02-17/ACSM_TOFWARE_processed/2024/ACSM_JFJ_2024_timeseries_calibrated.csv'
|
||||
path2 = 'data/collection_JFJ_2024_LeilaS_2025-02-17_2025-02-17/ACSM_TOFWARE_processed/2024/ACSM_JFJ_2024_timeseries_calibration_factors.csv'
|
||||
path3 = 'data/collection_JFJ_2024_LeilaS_2025-02-17_2025-02-17/ACSM_TOFWARE_flags/2024/ACSM_JFJ_2024_timeseries_flags.csv'
|
||||
path2 = 'data/collection_JFJ_2024_LeilaS_2025-02-17_2025-02-17/ACSM_TOFWARE_processed/2024/ACSM_JFJ_2024_timeseries_calibrated_err.csv'
|
||||
path3 = 'data/collection_JFJ_2024_LeilaS_2025-02-17_2025-02-17/ACSM_TOFWARE_processed/2024/ACSM_JFJ_2024_timeseries_calibration_factors.csv'
|
||||
path4 = 'data/collection_JFJ_2024_LeilaS_2025-02-17_2025-02-17/ACSM_TOFWARE_flags/2024/ACSM_JFJ_2024_timeseries_flags.csv'
|
||||
|
||||
acum_df = join_tables([path1,path2])
|
||||
acum_df = join_tables([path1,path2,path3])
|
||||
|
||||
acsm_to_ebas = load_acsm_to_ebas_dict()
|
||||
acsm_to_ebas = load_project_yaml_files(projectPath, "acsm_to_ebas.yaml")
|
||||
|
||||
# Select variables that are both in the acsm to ebas dict and acum_df
|
||||
reduced_set_of_vars = [key for key in acum_df.columns if key in acsm_to_ebas['renaming_map'].keys()]
|
||||
|
||||
acum_df = acum_df.loc[:,reduced_set_of_vars].rename(columns=acsm_to_ebas['renaming_map'])
|
||||
#print("Before renaming:", acum_df.columns)
|
||||
#print("Renaming map keys:", acsm_to_ebas['renaming_map'].keys())
|
||||
|
||||
acum_df = acum_df.rename(columns=acsm_to_ebas['renaming_map'])
|
||||
|
||||
|
||||
|
||||
reduced_set_of_vars = [key for key in acum_df.columns if 'factor' not in key]
|
||||
#print(reduced_set_of_vars)
|
||||
|
||||
flags_acum_df = join_tables([path3])
|
||||
flags_acum_df = join_tables([path4])
|
||||
flags_acum_df = flags_acum_df.rename(columns=acsm_to_ebas['renaming_map'])
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
# Ensure time columns are datetime
|
||||
acum_df['ACSM_time'] = pd.to_datetime(acum_df['ACSM_time'])
|
||||
flags_acum_df['ACSM_time'] = pd.to_datetime(acum_df['ACSM_time'])
|
||||
@ -127,32 +127,48 @@ if __name__ == "__main__":
|
||||
|
||||
valid_rows = ~(nat_acum | nat_flags) # Compute valid rows in one step
|
||||
|
||||
# Define file paths
|
||||
#path_to_detection_limits = os.path.normpath(os.path.join(projectPath, 'pipelines/params/limits_of_detection.yaml'))
|
||||
#path_to_station_params = os.path.normpath(os.path.join(projectPath, 'pipelines/params/station_params.yaml'))
|
||||
|
||||
acum_df.loc[valid_rows.to_numpy(),reduced_set_of_vars].to_csv('data/JFJ_ACSM-017_2024.txt',sep='\t',index=None, date_format="%Y/%m/%d %H:%M:%S")
|
||||
# Load YAML files
|
||||
#detection_limits = load_yaml(path_to_detection_limits)
|
||||
detection_limits = load_project_yaml_files(projectPath, "limits_of_detection.yaml")
|
||||
station_params = load_project_yaml_files(projectPath, "station_params.yaml") # load_yaml(path_to_station_params)
|
||||
|
||||
# Extract dictionaries from required keys
|
||||
lod_dict = detection_limits.get('LOD', {}).get('variables',{}) # Extract "LOD" dictionary
|
||||
jfj_dict = station_params.get('stations', {}).get('JFJ', {}) # Extract "JFJ" dictionary
|
||||
|
||||
# Convert dictionaries to DataFrames using the existing function
|
||||
lod_df = metadata_dict_to_dataframe(lod_dict, shape = (len(acum_df),len(lod_dict)))
|
||||
jfj_df = metadata_dict_to_dataframe(jfj_dict, shape = (len(acum_df),len(jfj_dict)))
|
||||
|
||||
# Ensure indexes are properly aligned for merging
|
||||
acum_df = acum_df.reset_index() # Convert index to a column for merging
|
||||
|
||||
# Merge with LOD DataFrame
|
||||
acum_df = acum_df.merge(lod_df, left_index=True, right_index=True, how='left')
|
||||
|
||||
# Merge with JFJ DataFrame
|
||||
acum_df = acum_df.merge(jfj_df, left_index=True, right_index=True, how='left')
|
||||
|
||||
|
||||
acum_df = acum_df.rename(columns=acsm_to_ebas['renaming_map'])
|
||||
|
||||
|
||||
#reduced_set_of_vars = [key for key in reduced_set_of_vars if '' not in key]
|
||||
acum_df.loc[valid_rows.to_numpy(),:].to_csv('data/JFJ_ACSM-017_2024.txt',sep='\t',index=None, date_format="%Y/%m/%d %H:%M:%S")
|
||||
flags_acum_df.loc[valid_rows.to_numpy(),:].to_csv('data/JFJ_ACSM-017_FLAGS_2024.txt',sep='\t',index=None, date_format="%Y/%m/%d %H:%M:%S")
|
||||
|
||||
|
||||
|
||||
|
||||
#acum_df['ACSM_time'] = pd.to_datetime(acum_df['ACSM_time'])
|
||||
#flags_acum_df['ACSM_time'] = pd.to_datetime(flags_acum_df['ACSM_time'])
|
||||
from third_party.acsmProcessingSoftware.src import rawto012
|
||||
|
||||
# Set datetime as index
|
||||
#acum_df.set_index('ACSM_time', inplace=True)
|
||||
#flags_acum_df.set_index('ACSM_time', inplace=True)
|
||||
app = rawto012.Application()
|
||||
infile = 'data/JFJ_ACSM-017_2024.txt'
|
||||
acq_err_log = 'data/JFJ_ACSM-017_FLAGS_2024.txt'
|
||||
outdir = 'data/'
|
||||
app.process(infile, acq_err_log, outdir=outdir)
|
||||
|
||||
#nat_acum = acum_df['ACSM_time'].isna()
|
||||
#nat_flags = flags_acum_df['ACSM_time'].isna()
|
||||
|
||||
#valid_rows = ~(nat_acum | nat_flags) # Compute valid rows in one step
|
||||
|
||||
#acum_df_filtered = acum_df.loc[valid_rows.to_numpy(),:]
|
||||
#flags_acum_df_filtered = flags_acum_df[valid_rows.to_numpy(),:]
|
||||
|
||||
# Step 4: Apply the valid mask to both dataframes
|
||||
#acum_df_filtered = acum_df[valid_rows]
|
||||
#flags_acum_df_filtered = flags_acum_df[valid_rows]
|
||||
|
||||
# Display results
|
||||
#print(acum_df_filtered)
|
||||
#print(flags_acum_df_filtered)
|
Reference in New Issue
Block a user