Moved get_metadata() from pipelines/steps/prepare_ebas_submission.py to utils.py

This commit is contained in:
2025-03-03 18:55:46 +01:00
parent def67a0562
commit 6eccbb5018
2 changed files with 88 additions and 19 deletions

View File

@ -1,9 +1,22 @@
import sys, os
# Locate the project root relative to this script so that project-relative
# resources (e.g. the DIMA submodule) can be resolved.
try:
    thisFilePath = os.path.abspath(__file__)
    print(thisFilePath)
    # __file__ points at the script itself; its directory is the anchor.
    scriptDir = os.path.dirname(thisFilePath)
except NameError:
    print("[Notice] The __file__ attribute is unavailable in this environment (e.g., Jupyter or IDLE).")
    print("When using a terminal, make sure the working directory is set to the script's location to prevent path issues (for the DIMA submodule)")
    # No __file__: fall back to the working directory, which the notice above
    # asks the user to set to the script's location. It is already a
    # directory, so it is used as the anchor directly.
    thisFilePath = os.getcwd()
    scriptDir = thisFilePath
# The original climbed three levels from the script *file* path, i.e. two
# levels from the script's directory. Anchoring on the directory in both
# branches keeps the fallback from overshooting the project root by one level.
projectPath = os.path.normpath(os.path.join(scriptDir, "..", ".."))
import argparse import argparse
import os
import pandas as pd import pandas as pd
import json import json, yaml
import os
import pandas as pd import pandas as pd
from utils import get_metadata
def join_tables(csv_files: list): def join_tables(csv_files: list):
""" """
@ -43,22 +56,22 @@ def join_tables(csv_files: list):
return acum_df return acum_df
def get_metadata(path_to_file): def load_acsm_to_ebas_dict():
path, filename = os.path.split(path_to_file) # Implicit input
dict_file = os.path.normpath(os.path.join(projectPath,"pipelines/dictionaries/acsm_to_ebas.yaml"))
path_to_metadata = None output_dict = {}
for item in os.listdir(path): try:
if 'metadata.json' in item: with open(dict_file, 'r') as stream:
path_to_metadata = os.path.normpath(os.path.join(path,item)) output_dict = yaml.load(stream, Loader=yaml.FullLoader)
metadata = {} except Exception as e:
if path_to_file:
with open(path_to_metadata,'r') as stream:
metadata = json.load(stream)
metadata = metadata.get(filename,{}) print(f'Error loading {dict_file}: {e}')
return {}
return output_dict
return metadata
if __name__ == "__main__": if __name__ == "__main__":
@ -68,8 +81,47 @@ if __name__ == "__main__":
acum_df = join_tables([path1,path2]) acum_df = join_tables([path1,path2])
acum_df.to_csv('data/all_table.txt',sep='\t',index=None) acsm_to_ebas = load_acsm_to_ebas_dict()
#print("Before renaming:", acum_df.columns)
#print("Renaming map keys:", acsm_to_ebas['renaming_map'].keys())
acum_df = acum_df.rename(columns=acsm_to_ebas['renaming_map'])
acum_df['ACSM_time'] = pd.to_datetime(acum_df['ACSM_time'])
reduced_set_of_vars = [key for key in acum_df.columns if 'factor' not in key]
print(reduced_set_of_vars)
acum_df.loc[:,reduced_set_of_vars].to_csv('data/JFJ_ACSM-017_2024.txt',sep='\t',index=None, date_format="%Y/%m/%d %H:%M:%S")
# Count the number of NaT (null) values
num_nats = acum_df['ACSM_time'].isna().sum()
# Get the total number of rows
total_rows = len(acum_df)
# Calculate the percentage of NaT values
percentage_nats = (num_nats / total_rows) * 100
print(f"Total rows: {total_rows}")
print(f"NaT (missing) values: {num_nats}")
print(f"Percentage of data loss: {percentage_nats:.2f}%")
acum_df = join_tables([path3]) acum_df = join_tables([path3])
acum_df = acum_df.rename(columns=acsm_to_ebas['renaming_map'])
acum_df['ACSM_time'] = pd.to_datetime(acum_df['ACSM_time'])
acum_df.to_csv('data/all_table_flags.txt',sep='\t',index=None)
# Count the number of NaT (null) values
num_nats = acum_df['ACSM_time'].isna().sum()
# Get the total number of rows
total_rows = len(acum_df)
# Calculate the percentage of NaT values
percentage_nats = (num_nats / total_rows) * 100
print(f"Total rows: {total_rows}")
print(f"NaT (missing) values: {num_nats}")
print(f"Percentage of data loss: {percentage_nats:.2f}%")
acum_df.to_csv('data/JFJ_ACSM-017_FLAGS_2024.txt',sep='\t',index=None, date_format="%Y/%m/%d %H:%M:%S")

View File

@ -31,3 +31,20 @@ def record_data_lineage(path_to_output_file, projectPath, metadata):
print(f"Metadata for calibrated data saved to {path_to_metadata_file}") print(f"Metadata for calibrated data saved to {path_to_metadata_file}")
return 0 return 0
def get_metadata(path_to_file):
    """Return the metadata entry recorded for a file.

    The directory containing ``path_to_file`` is scanned for an entry whose
    name contains ``'metadata.json'``. That file is loaded as JSON and the
    value stored under the base name of ``path_to_file`` is returned.

    Parameters
    ----------
    path_to_file : str
        Path to the file whose metadata is requested. A bare file name is
        looked up in the current working directory.

    Returns
    -------
    dict
        The entry keyed by the file's base name, or an empty dict when no
        metadata file exists in the directory or it has no such key.
    """
    path, filename = os.path.split(path_to_file)
    # A bare file name yields an empty directory part, which os.listdir
    # rejects; search the current directory in that case.
    search_dir = path or os.curdir

    path_to_metadata = None
    for item in os.listdir(search_dir):
        # If several entries match, the last one listed wins (as before).
        if 'metadata.json' in item:
            path_to_metadata = os.path.normpath(os.path.join(search_dir, item))

    # Guard on the metadata path, not on the input path: without a metadata
    # file there is nothing to open.
    if path_to_metadata is None:
        return {}

    with open(path_to_metadata, 'r') as stream:
        metadata = json.load(stream)

    return metadata.get(filename, {})