Mirror of https://gitea.psi.ch/APOG/acsmnode.git (synced 2025-06-24 21:21:08 +02:00)
Implement new step pipelines/steps/adjust_uncertainty_column_in_nas_file.py.
pipelines/steps/adjust_uncertainty_column_in_nas_file.py (new file, 126 lines)
@@ -0,0 +1,126 @@
import sys, os
import re

try:
    thisFilePath = os.path.abspath(__file__)
    print(thisFilePath)
except NameError:
    print("[Notice] The __file__ attribute is unavailable in this environment (e.g., Jupyter or IDLE).")
    print("When using a terminal, make sure the working directory is set to the script's location to prevent path issues (for the DIMA submodule)")
    #print("Otherwise, path to submodule DIMA may not be resolved properly.")
    thisFilePath = os.getcwd()  # Use current directory or specify a default

projectPath = os.path.normpath(os.path.join(thisFilePath, "..", "..", ".."))  # Move up to project root

if projectPath not in sys.path:
    sys.path.insert(0, projectPath)

import numpy as np
import pandas as pd
from dima.instruments.readers.nasa_ames_reader import read_nasa_ames_as_dict
from pipelines.steps.utils import compute_uncertainty_estimate

def main(path_to_data_file, base_column_name):
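    """
    Recompute the 'err_<base_column_name>' uncertainty column of a NASA Ames
    (.nas) file in place, combining each value with its reported error via
    compute_uncertainty_estimate while preserving the original field widths
    and decimal precision.
    """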

    if not path_to_data_file.endswith('.nas'):
        raise RuntimeError(f'Invalid file extension. The input file {path_to_data_file} must be a .nas file.')

    # Read and extract data
    idr_dict = read_nasa_ames_as_dict(path_to_data_file)
    header_metadata_dict = idr_dict['attributes_dict']

    # Locate data table
    dataset = None
    for d in idr_dict['datasets']:
        if d['name'] == 'data_table':
            dataset = d
    if dataset is None:
        raise ValueError("Dataset named 'data_table' not found.")

    data_table = dataset['data']  # structured numpy array
    df = pd.DataFrame(data_table)

    if base_column_name not in df.columns:
        raise ValueError(f"Base column '{base_column_name}' not found in dataset.")

    err_column = f"err_{base_column_name}"
    if err_column not in df.columns:
        raise ValueError(f"Column '{err_column}' not found in dataset.")

    # Index of the uncertainty column among the table's fields
    err_index = data_table.dtype.names.index(err_column)
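
    # Note: the data section is edited at the raw-byte level (rather than being
    # rewritten through pandas) so that the fixed-width column layout of the
    # original file is preserved exactly.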

    # Read original lines from file
    with open(path_to_data_file, 'rb') as file:
        raw_lines = file.readlines()

    header_length = header_metadata_dict['header_length']
    data_table_lines = []

    # Iterate through data table lines
    cnt = 0
    for line_idx in range(len(raw_lines)):
        if line_idx >= header_length - 1:
            line = raw_lines[line_idx]
            fields = list(re.finditer(rb'\S+', line))

            if err_index < len(fields):
                match = fields[err_index]
                original_bytes = match.group()
                original_str = original_bytes.decode('utf-8')

                # Skip the column header row (missing-value fields are handled below)
                clean_original_str = original_str.strip().replace('.', '')
                if err_column in original_str:
                    data_table_lines.append(line)
                    continue

                # Decimal precision of the original field
                decimals = len(original_str.split('.')[1]) if '.' in original_str else 0
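
                # A field whose digits are all 9s (e.g. 9999.99) is a
                # missing-value code and is left unchanged below.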

                try:
                    original_err = float(original_str)
                    if not (clean_original_str and all(c == '9' for c in clean_original_str)):
                        additional_term = df.loc[cnt, base_column_name]
                        updated_value = compute_uncertainty_estimate(additional_term, original_err)
                    else:  # original value is a missing-value code; keep it as is
                        updated_value = original_err
                except Exception as e:
                    raise RuntimeError(f"Error calculating updated value on line {line_idx}: {e}")

                # Preserve width and precision
                start, end = match.span()
                width = end - start
                formatted_str = f"{updated_value:.{decimals}f}"

                if len(formatted_str) > width:
                    print(f"Warning: formatted value '{formatted_str}' is wider than the original field width {width} at line {line_idx}; column alignment may shift.")

                formatted_bytes = formatted_str.rjust(width).encode('utf-8')
                new_line = line[:start] + formatted_bytes + line[end:]
                data_table_lines.append(new_line)
                cnt += 1
            else:
                # Keep lines that lack the expected field count unchanged
                data_table_lines.append(line)

    # Reconstruct the file
    processed_lines = (
        header_metadata_dict['raw_header_part1'] +
        header_metadata_dict['raw_header_part2'] +
        header_metadata_dict['raw_header_part3'] +
        data_table_lines
    )

    # Write updated content to file
    with open(path_to_data_file, 'wb') as f:
        for line in processed_lines:
            decoded = line.decode('utf-8').rstrip('\n')
            f.write((decoded + '\n').encode('utf-8'))


if __name__ == '__main__':
    path_to_data_file = os.path.normpath(os.path.join(
        'data', 'CH0002G.20240201010000.20250521123253.aerosol_mass_spectrometer.chemistry_ACSM.pm1_non_refractory.7w.1h.CH02L_Aerodyne_ToF-ACSM_092.CH02L_Aerodyne_ToF-ACSM_PAY.lev2.nas'
    ))
    main(path_to_data_file, base_column_name='Org')
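
As committed, the step hardcodes its input file and targets the 'Org' column. A minimal sketch of reusing it for another species (the path and column name below are illustrative placeholders; the file must contain matching <name> and err_<name> columns):

from pipelines.steps.adjust_uncertainty_column_in_nas_file import main

# 'data/example.lev2.nas' and 'SO4' are hypothetical; substitute a real
# EBAS-style .nas file and one of its measured columns.
main('data/example.lev2.nas', base_column_name='SO4')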

pipelines/steps/utils.py
@@ -93,7 +93,8 @@ def generate_missing_value_code(max_val, num_decimals):
    return missing_code

def compute_uncertainty_estimate(x, x_err):
    return ((0.5*x_err)**2 + (0.5*x)**2)**0.5

def generate_error_dataframe(df: pd.DataFrame, datetime_var):
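
For reference, compute_uncertainty_estimate (added above) combines half the measured value and half its reported error in quadrature, i.e. 0.5*sqrt(x**2 + x_err**2). A quick check with illustrative numbers:

from pipelines.steps.utils import compute_uncertainty_estimate

# sqrt((0.5*0.2)**2 + (0.5*1.0)**2) = sqrt(0.01 + 0.25) ≈ 0.5099
print(compute_uncertainty_estimate(x=1.0, x_err=0.2))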