From 63359449d39f6634569393326e797e9fd64eabb5 Mon Sep 17 00:00:00 2001
From: Barbara Bertozzi
Date: Mon, 4 Aug 2025 18:43:25 +0200
Subject: [PATCH] refactor: split out single-particle calibration into its own module

---
 scripts/sp2xr_apply_calibration.py | 155 +++++++++++++++++++++++++++++
 src/sp2xr/calibration.py           | 129 ++++++++++++++++++++++++
 2 files changed, 284 insertions(+)
 create mode 100644 scripts/sp2xr_apply_calibration.py
 create mode 100644 src/sp2xr/calibration.py

diff --git a/scripts/sp2xr_apply_calibration.py b/scripts/sp2xr_apply_calibration.py
new file mode 100644
index 0000000..c502d49
--- /dev/null
+++ b/scripts/sp2xr_apply_calibration.py
@@ -0,0 +1,155 @@
+"""
+Apply SP2-XR calibration to particle data.
+
+Usage:
+    python scripts/sp2xr_apply_calibration.py \
+        --input path/to/input.parquet \
+        --config path/to/config.yaml \
+        --output path/to/output.parquet
+
+SLURM resources can be tuned with --cores, --memory, --walltime and
+--partition.
+"""
+
+import argparse
+import time
+from pathlib import Path
+
+import dask.dataframe as dd
+import pandas as pd
+import yaml
+from dask.distributed import Client
+from dask_jobqueue import SLURMCluster
+
+from sp2xr.calibration import calibrate_particle_data
+
+
+def parse_args():
+    parser = argparse.ArgumentParser(
+        description="Apply SP2-XR calibration to a dataset using Dask + SLURM"
+    )
+    parser.add_argument(
+        "--input", required=True, help="Input Parquet file or directory"
+    )
+    parser.add_argument(
+        "--config", required=True, help="YAML calibration configuration"
+    )
+    parser.add_argument(
+        "--output",
+        required=True,
+        help="Output directory for calibrated Parquet dataset",
+    )
+    parser.add_argument(
+        "--cores", type=int, default=32, help="Cores per job (default: 32)"
+    )
+    parser.add_argument(
+        "--memory", default="64GB", help="Memory per job (default: 64GB)"
+    )
+    parser.add_argument("--walltime", default="02:00:00", help="Walltime (default: 2h)")
+    parser.add_argument(
+        "--partition", default="daily", help="SLURM partition (default: daily)"
+    )
+    return parser.parse_args()
+
+
+def apply_calibration(partition, config):
+    """Apply calibration to a single pandas partition (assumes a DatetimeIndex)."""
+    pdf = calibrate_particle_data(partition, config)
+    pdf["date"] = pdf.index.normalize()  # midnight timestamps, datetime64[ns]
+    pdf["hour"] = pdf.index.hour.astype("int64")
+    return pdf
+
+
+def main():
+    start_time = time.time()
+    args = parse_args()
+    output_dir = Path(args.output)
+
+    # --- Set up Dask + SLURM (one single-threaded process per core) ---
+    cluster = SLURMCluster(
+        cores=args.cores,
+        processes=args.cores,
+        memory=args.memory,
+        walltime=args.walltime,
+        job_extra_directives=[f"--partition={args.partition}"],
+    )
+    cluster.scale(1)
+    client = Client(cluster)
+    print(f"Dask dashboard: {client.dashboard_link}")
+
+    # --- Load dataset lazily with Dask ---
+    df = dd.read_parquet(
+        args.input,
+        engine="pyarrow",
+        aggregate_files=True,
+        split_row_groups=False,
+        blocksize="32MB",
+    )
+    print(f"✅ Loaded dataset with {df.npartitions} partitions")
+
+    # --- Load calibration config and broadcast it to all workers ---
+    with open(args.config, "r") as f:
+        config = yaml.safe_load(f)
+    cfg_future = client.scatter(config, broadcast=True)
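+    # The YAML is expected to provide the keys read by
+    # sp2xr.calibration.calibrate_particle_data; for example (the
+    # coefficient values below are placeholders, not a real instrument
+    # calibration):
+    #
+    #   inc_calibration:
+    #     curve_type: powerlaw        # 'polynomial', 'powerlaw', or omit for raw mass
+    #     parameters: [1.0e-4, 1.0]
+    #   scatt_calibration:
+    #     curve_type: polynomial
+    #     parameters: [50.0, 2.0e-2]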
+
+    # --- Build a small empty dataframe with the output columns and dtypes ---
+    meta = df._meta.assign(
+        **{
+            "BC mass": pd.Series(dtype="float64"),
+            "Opt diam": pd.Series(dtype="float64"),
+            "date": pd.Series(dtype="datetime64[ns]"),
+            "hour": pd.Series(dtype="int64"),
+        }
+    )
+
+    # --- Apply calibration in parallel ---
+    calibrated_df = df.map_partitions(apply_calibration, config=cfg_future, meta=meta)
+
+    # --- Save calibrated dataset (partitioned by date/hour) ---
+    calibrated_df.to_parquet(
+        path=str(output_dir),
+        engine="pyarrow",
+        partition_on=["date", "hour"],
+        overwrite=True,  # replace any existing dataset at this path
+        write_index=True,
+        write_metadata_file=True,
+    )
+
+    print(
+        f"✅ Calibration applied successfully in {(time.time() - start_time)/60:.2f} minutes"
+    )
+    client.close()
+    cluster.close()
+
+
+if __name__ == "__main__":
+    main()

diff --git a/src/sp2xr/calibration.py b/src/sp2xr/calibration.py
new file mode 100644
index 0000000..7f22e0f
--- /dev/null
+++ b/src/sp2xr/calibration.py
@@ -0,0 +1,129 @@
+import numpy as np
+
+
+def polynomial(x, *coefficients):
+    """
+    Evaluate a generic polynomial curve. The number of coefficients sets the
+    degree: n coefficients give a polynomial of degree n - 1.
+
+    Parameters
+    ----------
+    x : np.ndarray
+        x-values at which to evaluate the polynomial curve.
+    *coefficients : float
+        Coefficients of the polynomial curve, lowest order first (the first
+        value is the order-0 term, and so on).
+
+    Returns
+    -------
+    result : np.ndarray
+        Polynomial curve evaluated at x.
+    """
+    result = 0
+    for i, coef in enumerate(coefficients):
+        result += coef * (x**i)
+    return result
+
+
+def powerlaw(x, *coeff):
+    """
+    Generic power-law calibration function, y = A * x**B + C.
+
+    Parameters:
+        x (float or np.ndarray): input value(s)
+        *coeff: coefficients; at least A and B are required, C is optional
+
+    Returns:
+        y (float or np.ndarray): calibrated value(s)
+    """
+    if len(coeff) < 2:
+        raise ValueError("powerlaw requires at least two parameters: A and B")
+
+    A, B = coeff[0], coeff[1]
+    C = coeff[2] if len(coeff) > 2 else 0  # default offset C = 0
+    return A * (x**B) + C
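+
+
+# Worked examples (illustrative numbers only, not instrument calibration
+# coefficients):
+#   polynomial(2.0, 1.0, 3.0)    -> 1.0 + 3.0 * 2.0    = 7.0
+#   powerlaw(4.0, 0.5, 2.0)      -> 0.5 * 4.0**2       = 8.0
+#   powerlaw(4.0, 0.5, 2.0, 1.0) -> 0.5 * 4.0**2 + 1.0 = 9.0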
+
+
+def apply_inc_calibration(df, curve_type=None, params=None):
+    """
+    Apply the incandescence calibration to a pandas partition.
+
+    Parameters:
+        df (pd.DataFrame): particle dataframe
+        curve_type (str): 'polynomial', 'powerlaw', or None (use the raw
+            'Incand Mass (fg)' column)
+        params (list or tuple): calibration parameters
+
+    Returns:
+        df (pd.DataFrame) with new column 'BC mass'
+    """
+    inc = df["Incand relPeak"].to_numpy()
+
+    if curve_type == "polynomial":
+        bc_mass = polynomial(inc, *params)
+    elif curve_type == "powerlaw":
+        bc_mass = powerlaw(inc, *params)
+    else:
+        bc_mass = df["Incand Mass (fg)"].to_numpy()
+
+    # A zero incandescence signal carries no information -> NaN
+    bc_mass = np.where(inc == 0, np.nan, bc_mass)
+
+    df["BC mass"] = bc_mass
+    return df
+
+
+def apply_scatt_calibration(df, curve_type=None, params=None):
+    """
+    Apply the scattering calibration to a pandas partition.
+
+    Parameters:
+        df (pd.DataFrame): particle dataframe
+        curve_type (str): 'polynomial', 'powerlaw', or None (use the raw
+            'Scatter Size (nm)' column)
+        params (list or tuple): calibration parameters
+
+    Returns:
+        df (pd.DataFrame) with new column 'Opt diam'
+    """
+    scatt = df["Scatter relPeak"].to_numpy()
+
+    if curve_type == "polynomial":
+        scatt_diam = polynomial(scatt, *params)
+    elif curve_type == "powerlaw":
+        scatt_diam = powerlaw(scatt, *params)
+    else:
+        scatt_diam = df["Scatter Size (nm)"].to_numpy()
+
+    # A zero scattering signal carries no information -> NaN
+    scatt_diam = np.where(scatt == 0, np.nan, scatt_diam)
+
+    df["Opt diam"] = scatt_diam
+    return df
+
+
+def calibrate_particle_data(df, config):
+    """Apply both incandescence and scattering calibration based on config."""
+    inc_conf = config.get("inc_calibration", {})
+    scatt_conf = config.get("scatt_calibration", {})
+
+    df = apply_inc_calibration(
+        df,
+        curve_type=inc_conf.get("curve_type"),
+        params=inc_conf.get("parameters", []),
+    )
+
+    df = apply_scatt_calibration(
+        df,
+        curve_type=scatt_conf.get("curve_type"),
+        params=scatt_conf.get("parameters", []),
+    )
+
+    return df
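+
+
+if __name__ == "__main__":
+    # Minimal smoke test with made-up coefficients (illustrative only; real
+    # calibrations come from the YAML config consumed by the script).
+    import pandas as pd
+
+    demo = pd.DataFrame(
+        {
+            "Incand relPeak": [0.0, 1.0e4, 2.0e4],
+            "Incand Mass (fg)": [0.0, 1.2, 2.4],
+            "Scatter relPeak": [0.0, 5.0e3, 1.0e4],
+            "Scatter Size (nm)": [0.0, 150.0, 210.0],
+        }
+    )
+    demo_config = {
+        "inc_calibration": {"curve_type": "powerlaw", "parameters": [1.0e-4, 1.0]},
+        "scatt_calibration": {"curve_type": "polynomial", "parameters": [50.0, 2.0e-2]},
+    }
+    out = calibrate_particle_data(demo, demo_config)
+    # Zero signals are masked to NaN; the rest follow the fitted curves.
+    print(out[["BC mass", "Opt diam"]])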