refactor: split out single-particle calibration into its own module

2025-08-04 18:43:25 +02:00
parent 74c5a77b48
commit 63359449d3
2 changed files with 284 additions and 0 deletions

scripts/apply_calibration.py Normal file (155 additions)

@@ -0,0 +1,155 @@
"""
Apply SP2-XR calibration to particle data.
Usage:
python scripts/apply_calibration.py \
--input path/to/input.parquet \
--config path/to/config.yaml \
--output path/to/output.parquet \
--mode add
"""
import argparse
import pandas as pd
import dask.dataframe as dd
from pathlib import Path
import yaml
import time
from dask.distributed import Client
from dask_jobqueue import SLURMCluster
from sp2xr.calibration import calibrate_particle_data
def parse_args():
parser = argparse.ArgumentParser(
description="Apply SP2-XR calibration to a dataset using Dask + SLURM"
)
parser.add_argument(
"--input", required=True, help="Input Parquet file or directory"
)
parser.add_argument(
"--config", required=True, help="YAML calibration configuration"
)
parser.add_argument(
"--output",
required=True,
help="Output directory for calibrated Parquet dataset",
)
parser.add_argument(
"--cores", type=int, default=32, help="Cores per job (default: 32)"
)
parser.add_argument(
"--memory", default="64GB", help="Memory per job (default: 64GB)"
)
parser.add_argument("--walltime", default="02:00:00", help="Walltime (default: 2h)")
parser.add_argument(
"--partition", default="daily", help="SLURM partition (default: daily)"
)
return parser.parse_args()
def apply_calibration(partition, config):
"""Apply calibration to a pandas partition."""
pdf = calibrate_particle_data(partition, config)
pdf["date"] = pdf.index.date.astype("datetime64[ns]")
pdf["hour"] = pdf.index.hour.astype("int64")
return pdf
def main():
args = parse_args()
output_dir = Path(args.output)
# --- Setup Dask + SLURM ---
cluster = SLURMCluster(
cores=args.cores,
processes=args.cores,
memory=args.memory,
walltime=args.walltime,
job_extra_directives=[f"--partition={args.partition}"],
)
cluster.scale(1)
client = Client(cluster)
print(f"Dask dashboard: {client.dashboard_link}")
# --- Load dataset lazily with Dask ---
df = dd.read_parquet(
args.input,
engine="pyarrow",
aggregate_files=True,
split_row_groups=False,
blocksize="32MB",
)
print(f"✅ Loaded dataset with {df.npartitions} partitions")
# --- Load calibration config ---
with open(args.config, "r") as f:
config = yaml.safe_load(f)
cfg_future = client.scatter(config, broadcast=True)
# --- Build a small empty dataframe with correct columns and dtypes ---
meta = df._meta.assign(
**{
"BC mass": pd.Series(dtype="float64"),
"Opt diam": pd.Series(dtype="float64"),
"date": pd.Series(dtype="datetime64[ns]"),
"hour": pd.Series(dtype="int64"),
}
)
# --- Apply calibration in parallel ---
calibrated_df = df.map_partitions(apply_calibration, config=cfg_future, meta=meta)
# --- Save calibrated dataset (partitioned by date/hour) ---
calibrated_df.to_parquet(
path=str(output_dir),
engine="pyarrow",
partition_on=["date", "hour"], # ✅ correct arg for Dask
overwrite=True, # Dask uses 'overwrite' instead of append=False
write_index=True,
write_metadata_file=True,
)
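    # With partition_on, the dataset is written as a hive-style directory tree,
    # roughly <output>/date=YYYY-MM-DD/hour=H/part.*.parquet (illustrative layout).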
print(
f"✅ Calibration applied successfully in {(time.time() - start_time)/60:.2f} minutes"
)
client.close()
cluster.close()
if __name__ == "__main__":
start_time = time.time()
main()
"""
def main():
args = parse_args()
output_dir = Path(args.output)
# Load dataset
df = dd.read_parquet(args.input, engine="pyarrow")
# Load calibration config
with open(args.config, "r") as f:
config = yaml.safe_load(f)
# Apply calibration
calibrated_df = calibrate_particle_data(df, config)
calibrated_df["date"] = df.index.dt.date.astype("date64[pyarrow]")
calibrated_df["hour"] = df.index.dt.hour.astype("i8")
# Save output
calibrated_df.to_parquet(
path=output_dir,
engine="pyarrow",
partition_on=["date", "hour"],
coerce_timestamps="us",
allow_truncated_timestamps=True,
write_index=True,
append=False,
)
print("✅ Calibration applied successfully!")
if __name__ == "__main__":
main()
"""

src/sp2xr/calibration.py Normal file (129 additions)

@@ -0,0 +1,129 @@
import numpy as np
def polynomial(x, *coefficients):
"""
    Evaluate a generic polynomial curve. The number of input coefficients
    defines the degree of the polynomial.
Parameters
----------
x : np array
x-array for the evaluation of the polynomial curve.
*coefficients : array of floats
Coefficients for the polynomial curve. First value is for order 0, and so on.
Returns
-------
result : np array
Evaluation of the polynomial curve at x-array.
"""
result = 0
for i, coef in enumerate(coefficients):
result += coef * (x**i)
return result
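# Example (illustrative values): polynomial(x, 1.0, 2.0, 0.5) evaluates
# 1.0 + 2.0 * x + 0.5 * x**2 elementwise when x is a NumPy array.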
def powerlaw(x, *coeff):
"""
Generic power-law calibration function.
Parameters:
        x (float or np.ndarray): input value(s)
        *coeff: coefficients; at least A and B are required, C is optional
Returns:
y (float): calibrated value
"""
if len(coeff) < 2:
raise ValueError("powerlaw requires at least two parameters: A and B")
A, B = coeff[0], coeff[1]
C = coeff[2] if len(coeff) > 2 else 0 # default offset C=0
return A * (x**B) + C
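# Example (illustrative values): powerlaw(x, 2.5, 0.18) evaluates
# 2.5 * x**0.18 + 0; passing a third coefficient adds it as the offset C.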
def apply_inc_calibration(df, curve_type=None, params=None):
"""
    Apply incandescence calibration to a pandas DataFrame (one Dask partition).
Parameters:
df (pd.DataFrame): particle dataframe
curve_type (str): 'polynomial', 'powerlaw', or None (use raw mass)
params (list or tuple): calibration parameters
Returns:
df (pd.DataFrame) with new column 'BC mass'
"""
inc = df["Incand relPeak"].to_numpy()
if curve_type == "polynomial":
# df["BC mass"] = df["Incand relPeak"].apply(lambda x: polynomial(x, *params))
bc_mass = polynomial(inc, *params)
elif curve_type == "powerlaw":
# df["BC mass"] = df["Incand relPeak"].apply(lambda x: powerlaw(x, params))
bc_mass = powerlaw(inc, *params)
else:
# df["BC mass"] = df["Incand Mass (fg)"]
bc_mass = df["Incand Mass (fg)"].to_numpy()
# Zero signal → NaN
# df.loc[df["Incand relPeak"] == 0, "BC mass"] = np.nan
# df["BC mass"] = np.where(inc == 0, np.nan, bc_mass) #df["BC mass"].mask(df["Incand relPeak"] == 0, np.nan)
bc_mass = np.where(inc == 0, np.nan, bc_mass)
df["BC mass"] = bc_mass
return df
def apply_scatt_calibration(df, curve_type=None, params=None):
"""
    Apply scattering calibration to a pandas DataFrame (one Dask partition).
Parameters:
df (pd.DataFrame)
curve_type (str): 'polynomial', 'powerlaw', or None (use raw size)
params (list or tuple): calibration parameters
Returns:
df (pd.DataFrame) with new column 'Opt diam'
"""
scatt = df["Scatter relPeak"].to_numpy()
if curve_type == "polynomial":
# df["Opt diam"] = df["Scatter relPeak"].apply(lambda x: polynomial(x, *params))
scatt_diam = polynomial(scatt, *params)
elif curve_type == "powerlaw":
# df["Opt diam"] = df["Scatter relPeak"].apply(lambda x: powerlaw(x, *params))
scatt_diam = powerlaw(scatt, *params)
else:
# df["Opt diam"] = df["Scatter Size (nm)"]
        scatt_diam = df["Scatter Size (nm)"].to_numpy()
# Zero signal → NaN
# df.loc[df["Scatter relPeak"] == 0, "Opt diam"] = np.nan
# df["Opt diam"] = df["Opt diam"].mask(df["Scatter relPeak"] == 0, np.nan)
scatt_diam = np.where(scatt == 0, np.nan, scatt_diam)
df["Opt diam"] = scatt_diam
return df
def calibrate_particle_data(df, config):
"""Wrapper to apply both incandescence and scattering calibration based on config."""
inc_conf = config.get("inc_calibration", {})
scatt_conf = config.get("scatt_calibration", {})
df = apply_inc_calibration(
df,
curve_type=inc_conf.get("curve_type"),
params=inc_conf.get("parameters", []),
)
df = apply_scatt_calibration(
df,
curve_type=scatt_conf.get("curve_type"),
params=scatt_conf.get("parameters", []),
)
return df
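if __name__ == "__main__":
    # Minimal self-check sketch (purely illustrative coefficients and signal
    # values): running this module directly prints the "BC mass" and
    # "Opt diam" columns produced by the calibration wrapper.
    import pandas as pd

    _demo_config = {
        "inc_calibration": {"curve_type": "polynomial", "parameters": [0.0, 1.0e-3]},
        "scatt_calibration": {"curve_type": "powerlaw", "parameters": [2.5, 0.18]},
    }
    _demo_df = pd.DataFrame(
        {"Incand relPeak": [0.0, 1200.0], "Scatter relPeak": [0.0, 800.0]}
    )
    print(calibrate_particle_data(_demo_df, _demo_config)[["BC mass", "Opt diam"]])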