# SP2XR/scripts/sp2xr_apply_calibration.py
"""
Apply SP2-XR calibration to particle data.
Usage:
python scripts/apply_calibration.py \
--input path/to/input.parquet \
--config path/to/config.yaml \
--output path/to/output.parquet \
--mode add
"""

import argparse
import time
from pathlib import Path

import dask.dataframe as dd
import pandas as pd
import yaml
from dask.distributed import Client
from dask_jobqueue import SLURMCluster

from sp2xr.calibration import calibrate_particle_data


def parse_args():
    parser = argparse.ArgumentParser(
        description="Apply SP2-XR calibration to a dataset using Dask + SLURM"
    )
    parser.add_argument(
        "--input", required=True, help="Input Parquet file or directory"
    )
    parser.add_argument(
        "--config", required=True, help="YAML calibration configuration"
    )
    parser.add_argument(
        "--output",
        required=True,
        help="Output directory for calibrated Parquet dataset",
    )
    parser.add_argument(
        "--cores", type=int, default=32, help="Cores per job (default: 32)"
    )
    parser.add_argument(
        "--memory", default="64GB", help="Memory per job (default: 64GB)"
    )
    parser.add_argument("--walltime", default="02:00:00", help="Walltime (default: 2h)")
    parser.add_argument(
        "--partition", default="daily", help="SLURM partition (default: daily)"
    )
    return parser.parse_args()


def apply_calibration(partition, config):
    """Apply calibration to a single pandas partition and add date/hour partition keys."""
    pdf = calibrate_particle_data(partition, config)
    # Partition keys for the date/hour output layout; dtypes must match `meta` in main().
    pdf["date"] = pdf.index.date.astype("datetime64[ns]")
    pdf["hour"] = pdf.index.hour.astype("int64")
    return pdf
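

# To sanity-check the calibration locally before launching a SLURM run, one could
# apply `apply_calibration` to a single pandas partition (a sketch, assuming the
# input Parquet has a DatetimeIndex and that `calibrate_particle_data` accepts a
# pandas DataFrame plus the config dict, as it is used via map_partitions below;
# the paths are placeholders):
#
#   import pandas as pd, yaml
#   pdf = pd.read_parquet("path/to/one_file.parquet")
#   config = yaml.safe_load(open("path/to/config.yaml"))
#   print(apply_calibration(pdf, config).head())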


def main():
    args = parse_args()
    output_dir = Path(args.output)
    start_time = time.time()

    # --- Setup Dask + SLURM ---
    cluster = SLURMCluster(
        cores=args.cores,
        processes=args.cores,
        memory=args.memory,
        walltime=args.walltime,
        job_extra_directives=[f"--partition={args.partition}"],
    )
    cluster.scale(1)  # one SLURM job -> args.cores worker processes
    client = Client(cluster)
    print(f"Dask dashboard: {client.dashboard_link}")

    # --- Load dataset lazily with Dask ---
    df = dd.read_parquet(
        args.input,
        engine="pyarrow",
        aggregate_files=True,
        split_row_groups=False,
        blocksize="32MB",
    )
    print(f"✅ Loaded dataset with {df.npartitions} partitions")

    # --- Load calibration config and broadcast it to the workers ---
    with open(args.config, "r") as f:
        config = yaml.safe_load(f)
    cfg_future = client.scatter(config, broadcast=True)

    # --- Build a small empty dataframe with the expected output columns and dtypes ---
    # Dask uses this `meta` to infer the output schema of map_partitions without
    # eagerly running apply_calibration.
    meta = df._meta.assign(
        **{
            "BC mass": pd.Series(dtype="float64"),
            "Opt diam": pd.Series(dtype="float64"),
            "date": pd.Series(dtype="datetime64[ns]"),
            "hour": pd.Series(dtype="int64"),
        }
    )

    # --- Apply calibration in parallel (date/hour columns are added per partition) ---
    calibrated_df = df.map_partitions(apply_calibration, config=cfg_future, meta=meta)

    # --- Save calibrated dataset, partitioned by date/hour ---
    calibrated_df.to_parquet(
        path=str(output_dir),
        engine="pyarrow",
        partition_on=["date", "hour"],
        overwrite=True,  # replace any existing dataset at this path
        write_index=True,
        write_metadata_file=True,
    )
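
    # Because the output is partitioned on date/hour, downstream readers can prune
    # partitions at load time, e.g. (a sketch; the filter values are placeholders):
    #   dd.read_parquet(str(output_dir), filters=[("hour", "==", 12)])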

    print(
        f"✅ Calibration applied successfully in {(time.time() - start_time) / 60:.2f} minutes"
    )
    client.close()
    cluster.close()


if __name__ == "__main__":
    main()