Files
acsmnode/notebooks/demo_acsm_pipeline.ipynb

376 lines
13 KiB
Plaintext

{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# ACSM Data Chain Workflow\n",
"\n",
"In this notebook, we will go through our **ACSM Data Chain**. This involves the following steps:\n",
"\n",
"1. Run the data integration pipeline to retrieve ACSM input data and prepare it for processing. \n",
"2. Perform QC/QA analysis. \n",
"3. (Optional) Conduct visual analysis for flag validation. \n",
"4. Prepare input data and QC/QA analysis results for submission to the EBAS database. \n",
"\n",
"## Import Libraries and Data Chain Steps\n",
"\n",
"* Execute (or Run) the cell below."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import sys\n",
"import os\n",
"# Set up project root directory\n",
"\n",
"\n",
"notebook_dir = os.getcwd() # Current working directory (assumes running from notebooks/)\n",
"project_path = os.path.normpath(os.path.join(notebook_dir, \"..\")) # Move up to project root\n",
"dima_path = os.path.normpath(os.path.join(project_path, \"dima\")) # Move up to project root\n",
"\n",
"if project_path not in sys.path: # Avoid duplicate entries\n",
" sys.path.append(project_path)\n",
"if dima_path not in sys.path:\n",
" sys.path.insert(0,dima_path)\n",
"#sys.path.append(os.path.join(root_dir,'dima','instruments'))\n",
"#sys.path.append(os.path.join(root_dir,'dima','src'))\n",
"#sys.path.append(os.path.join(root_dir,'dima','utils'))\n",
"\n",
"#import dima.visualization.hdf5_vis as hdf5_vis\n",
"#import dima.pipelines.data_integration as data_integration\n",
"import subprocess\n",
"\n",
"\n",
"for item in sys.path:\n",
" print(item)\n",
"\n",
"from dima.pipelines.data_integration import run_pipeline as get_campaign_data\n",
"from pipelines.steps.apply_calibration_factors import main as apply_calibration_factors\n",
"from pipelines.steps.generate_flags import main as generate_flags\n",
"from pipelines.steps.prepare_ebas_submission import main as prepare_ebas_submission \n",
"from pipelines.steps.update_actris_header import main as update_actris_header\n",
"from pipelines.steps.utils import load_project_yaml_files\n",
"from pipelines.steps.update_datachain_params import main as update_datachain_params\n",
"from pipelines.steps.drop_column_from_nas_file import main as drop_column_from_nas_file\n",
"from pipelines.steps.adjust_uncertainty_column_in_nas_file import main as adjust_uncertainty_column_in_nas_file\n",
"\n",
"campaign_descriptor = load_project_yaml_files(project_path, \"campaignDescriptor.yaml\")\n",
"YEAR = campaign_descriptor['year']\n",
"STATION_ABBR = campaign_descriptor['station_abbr']\n",
"\n",
"workflow_fname = f'workflow_acsm_data_{STATION_ABBR}_{YEAR}'\n",
"\n",
"print(workflow_fname)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Step 1: Retrieve Input Data from a Network Drive\n",
"\n",
"* Create a configuration file (i.e., a `.yaml` file) following the example provided in the input folder.\n",
"* Set up the input and output directory paths.\n",
"* Execute the cell (or Skip it and Execute next cell with manually defined **CAMPAIGN_DATA_FILE** and **APPEND_DATA_DIR**)."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"path_to_config_file = '../campaignDescriptor.yaml'\n",
"paths_to_hdf5_files = get_campaign_data(path_to_config_file)\n",
"# Select campaign data file and append directory\n",
"CAMPAIGN_DATA_FILE = paths_to_hdf5_files[0]\n",
"APPEND_DATA_DIR = os.path.splitext(CAMPAIGN_DATA_FILE)[0]\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Uncomment and define the following variables manually to reanalize previous data collections\n",
"#CAMPAIGN_DATA_FILE = '../data/collection_PAY_2024_2025-06-05_2025-06-05.h5'\n",
"#CAMPAIGN_DATA_FILE = '../data/collection_JFJ_2024_2025-06-06_2025-06-06.h5'\n",
"#APPEND_DATA_DIR = '../data/collection_JFJ_2024_2025-06-06_2025-06-06'\n",
"#APPEND_DATA_DIR = '../data/collection_PAY_2024_2025-05-26_2025-05-26'\n",
"#CAMPAIGN_DATA_FILE = '../data/collection_PAY_2024_2025-05-21_2025-05-21.h5'\n",
"#APPEND_DATA_DIR = '../data/collection_PAY_2024_2025-05-21_2025-05-21'"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Step 1.1: Update Data Chain Parameters with Input Data\n",
"* Ensure the data folder retreived from the network drive contains a suitably specified folder `ACSM_TOFWARE/<year>/params`."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"update_datachain_params(CAMPAIGN_DATA_FILE, 'ACSM_TOFWARE/2024', capture_renku_metadata=True, workflow_name=workflow_fname)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Step 2: Calibrate Input Campaign Data and Save Data Products\n",
"\n",
"* Make sure the variable `CAMPAIGN_DATA_FILE` is properly defined in previous step. Otherwise, set the variable manually as indicated below.\n",
"* Execute the cell."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Define manually path to data file by uncomenting the following line, and filling the path\n",
"\n",
"# CAMPAIGN_DATA_FILE = ../data/<enter here *.h5 filename of interest inside the data directory>\n",
"path_to_data_file = CAMPAIGN_DATA_FILE\n",
"path_to_calibration_file = '../pipelines/params/calibration_factors.yaml'\n",
"\n",
"apply_calibration_factors(path_to_data_file,path_to_calibration_file, capture_renku_metadata=True, workflow_name=workflow_fname)\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Step 3: Perform QC/QA Analysis\n",
"\n",
"* Generate automated flags based on validity thresholds for diagnostic channels.\n",
"* (Optional) Generate manual flags using the **Data Flagging App**, accessible at: \n",
" [http://localhost:8050/](http://localhost:8050/)\n",
"* Execute the cell."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"dataset_name = f'ACSM_TOFWARE/{YEAR}/ACSM_{STATION_ABBR}_{YEAR}_meta.txt/data_table'\n",
"path_to_config_file = 'pipelines/params/validity_thresholds.yaml'\n",
"#command = ['python', 'pipelines/steps/compute_automated_flags.py', path_to_data_file, dataset_name, path_to_config_file]\n",
"#status = subprocess.run(command, capture_output=True, check=True)\n",
"#print(status.stdout.decode())\n",
"path_to_data_file = CAMPAIGN_DATA_FILE\n",
"generate_flags(path_to_data_file, 'diagnostics', capture_renku_metadata=True, workflow_name=workflow_fname)\n",
"\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"\n",
"generate_flags(path_to_data_file, 'cpc', capture_renku_metadata=True, workflow_name=workflow_fname)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## (Optional) Step 3.1: Inspect Previously Generated Flags for Correctness\n",
"\n",
"* Perform flag validation using the Jupyter Notebook workflow available at: \n",
" [../notebooks/demo_visualize_diagnostic_flags_from_hdf5_file.ipynb](demo_visualize_diagnostic_flags_from_hdf5_file.ipynb)\n",
"* Follow the notebook steps to visually inspect previously generated flags."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Step 4: Apply Diagnostic and Manual Flags to Variables of Interest\n",
"\n",
"* Generate flags for species based on previously collected QC/QA flags.\n",
"* Execute the cell."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"#CAMPAIGN_DATA_FILE = '../data/collection_JFJ_2024_2025-04-08_2025-04-08.h5'\n",
"path_to_data_file = CAMPAIGN_DATA_FILE\n",
"dataset_name = f'ACSM_TOFWARE/{YEAR}/ACSM_{STATION_ABBR}_{YEAR}_meta.txt/data_table'\n",
"path_to_config_file = 'pipelines/params/validity_thresholds.yaml'\n",
"#command = ['python', 'pipelines/steps/compute_automated_flags.py', path_to_data_file, dataset_name, path_to_config_file]\n",
"#status = subprocess.run(command, capture_output=True, check=True)\n",
"#print(status.stdout.decode())\n",
"generate_flags(path_to_data_file, 'species', capture_renku_metadata=True, workflow_name=workflow_fname)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Step 5: Generate Campaign Data in EBAS Format\n",
"\n",
"* Gather and set paths to the required data products produced in the previous steps.\n",
"* Execute the cell."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import warnings\n",
"print(APPEND_DATA_DIR)\n",
"DATA_DIR = f\"{APPEND_DATA_DIR}/ACSM_TOFWARE_processed/{YEAR}\"\n",
"FLAGS_DIR = f\"{APPEND_DATA_DIR}/ACSM_TOFWARE_flags/{YEAR}\"\n",
"\n",
"PATH1 = f\"{DATA_DIR}/ACSM_{STATION_ABBR}_{YEAR}_timeseries_calibrated.csv\"\n",
"PATH2 = f\"{DATA_DIR}/ACSM_{STATION_ABBR}_{YEAR}_timeseries_calibrated_err.csv\"\n",
"PATH3 = f\"{DATA_DIR}/ACSM_{STATION_ABBR}_{YEAR}_timeseries_calibration_factors.csv\"\n",
"PATH4 = f\"{FLAGS_DIR}/ACSM_{STATION_ABBR}_{YEAR}_timeseries_flags.csv\"\n",
"\n",
"[print(p, os.path.exists(p)) for p in [PATH1,PATH2,PATH3,PATH4]]\n",
"update_actris_header('../campaignDescriptor.yaml')\n",
"\n",
"month = \"2-3\"\n",
"with warnings.catch_warnings():\n",
" warnings.simplefilter('ignore')\n",
" prepare_ebas_submission([PATH1, PATH2, PATH3], PATH4, month,capture_renku_metadata=True, workflow_name=workflow_fname)\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Step 5.1: Remove inletP column from a generated nas file\n",
"* Select a nas file from the data folder"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"#path_to_data_file = '../data/CH0001G.20240201010000.20250519140310.aerosol_mass_spectrometer.chemistry_ACSM.pm1_non_refractory.2mo.1h.CH02L_Aerodyne_ToF-ACSM_017.CH02L_Aerodyne_ToF-ACSM_JFJ.lev2.nas'\n",
"path_to_data_file = '../data/CH0001G.20240201010000.20250527075812.aerosol_mass_spectrometer.chemistry_ACSM.pm1_non_refractory.2mo.1h.CH02L_Aerodyne_ToF-ACSM_017.CH02L_Aerodyne_ToF-ACSM_JFJ.lev2.nas'\n",
"\n",
"drop_column_from_nas_file(path_to_data_file, column_to_remove='inletP')"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"#print(((0.5*0.9520)**2 + (0.5*0.0554)**2)**0.5)\n",
"#from math import sqrt\n",
"#print(':)',sqrt((0.5*0.9520)**2 + (0.5*0.0554)**2))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Step 5.2: Adjust uncertainty of selected column name / variable by adding a constant\n",
"* Select a nas file from the data folder"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"path_to_data_file = '../data/CH0001G.20240201010000.20250527075812.aerosol_mass_spectrometer.chemistry_ACSM.pm1_non_refractory.2mo.1h.CH02L_Aerodyne_ToF-ACSM_017.CH02L_Aerodyne_ToF-ACSM_JFJ.lev2.nas'\n",
"#adjust_uncertainty_column_in_nas_file(path_to_data_file, base_column_name='Org')\n",
"variables = ['Org', 'NO3', 'NH4', 'SO4', 'Chl']\n",
"adjust_uncertainty_column_in_nas_file(path_to_data_file, base_column_names=variables)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Step 6: Save Data Products to an HDF5 File\n",
"\n",
"* Gather and set paths to the required data products produced in the previous steps.\n",
"* Execute the cell.\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import dima.src.hdf5_ops as dataOps \n",
"#print(os.curdir)\n",
"\n",
"\n",
"dataManager = dataOps.HDF5DataOpsManager(CAMPAIGN_DATA_FILE)\n",
"print(dataManager.file_path)\n",
"print(APPEND_DATA_DIR)\n",
"dataManager.update_file(APPEND_DATA_DIR)\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"dataManager = dataOps.HDF5DataOpsManager(path_to_data_file)\n",
"dataManager.load_file_obj()\n",
"dataManager.extract_and_load_dataset_metadata()\n",
"df = dataManager.dataset_metadata_df\n",
"print(df.head(10))\n",
"dataManager.unload_file_obj()"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.11.10"
}
},
"nbformat": 4,
"nbformat_minor": 2
}