"""
|
|
Apply SP2-XR calibration to particle data.
|
|
|
|
Usage:
|
|
python scripts/apply_calibration.py \
|
|
--input path/to/input.parquet \
|
|
--config path/to/config.yaml \
|
|
--output path/to/output.parquet \
|
|
--mode add
|
|
"""
|
|
|
|
import argparse
import time
from pathlib import Path

import dask.dataframe as dd
import pandas as pd
import yaml
from dask.distributed import Client
from dask_jobqueue import SLURMCluster

from sp2xr.calibration import calibrate_particle_data


def parse_args():
    parser = argparse.ArgumentParser(
        description="Apply SP2-XR calibration to a dataset using Dask + SLURM"
    )
    parser.add_argument(
        "--input", required=True, help="Input Parquet file or directory"
    )
    parser.add_argument(
        "--config", required=True, help="YAML calibration configuration"
    )
    parser.add_argument(
        "--output",
        required=True,
        help="Output directory for calibrated Parquet dataset",
    )
    parser.add_argument(
        "--cores", type=int, default=32, help="Cores per job (default: 32)"
    )
    parser.add_argument(
        "--memory", default="64GB", help="Memory per job (default: 64GB)"
    )
    parser.add_argument("--walltime", default="02:00:00", help="Walltime (default: 2h)")
    parser.add_argument(
        "--partition", default="daily", help="SLURM partition (default: daily)"
    )
    return parser.parse_args()


def apply_calibration(partition, config):
    """Apply calibration to a pandas partition."""
    pdf = calibrate_particle_data(partition, config)
    # Add the partitioning columns declared in `meta`; the partition index is
    # expected to be a DatetimeIndex of particle timestamps.
    pdf["date"] = pdf.index.date.astype("datetime64[ns]")
    pdf["hour"] = pdf.index.hour.astype("int64")
    return pdf


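# A minimal local sanity check, not used by the SLURM pipeline: it assumes the
# input Parquet file is small enough for plain pandas and that the YAML config
# is in the format calibrate_particle_data expects. Useful for eyeballing one
# chunk of calibrated output before launching cluster jobs.
def preview_calibration(input_path, config_path, n_rows=1000):
    """Calibrate the first `n_rows` of a Parquet file with plain pandas."""
    sample = pd.read_parquet(input_path).head(n_rows)
    with open(config_path, "r") as f:
        cfg = yaml.safe_load(f)
    return apply_calibration(sample, cfg)

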
def main():
    args = parse_args()
    output_dir = Path(args.output)

    # --- Setup Dask + SLURM ---
    cluster = SLURMCluster(
        cores=args.cores,
        processes=args.cores,
        memory=args.memory,
        walltime=args.walltime,
        job_extra_directives=[f"--partition={args.partition}"],
    )
    cluster.scale(1)
    client = Client(cluster)
    print(f"Dask dashboard: {client.dashboard_link}")

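    # Sizing note: with processes equal to cores, the single SLURM job requested
    # by cluster.scale(1) runs one single-threaded worker process per core; pass
    # a larger value (or jobs=N) to scale() to request more jobs.
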
    # --- Load dataset lazily with Dask ---
    df = dd.read_parquet(
        args.input,
        engine="pyarrow",
        aggregate_files=True,
        split_row_groups=False,
        blocksize="32MB",
    )
    print(f"✅ Loaded dataset with {df.npartitions} partitions")

    # --- Load calibration config ---
    with open(args.config, "r") as f:
        config = yaml.safe_load(f)
    cfg_future = client.scatter(config, broadcast=True)
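    # broadcast=True pre-copies the config dict to every worker, so each
    # map_partitions task below reuses that copy instead of re-serialising the
    # config into every task.
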
    # --- Build a small empty dataframe with correct columns and dtypes ---
    meta = df._meta.assign(
        **{
            "BC mass": pd.Series(dtype="float64"),
            "Opt diam": pd.Series(dtype="float64"),
            "date": pd.Series(dtype="datetime64[ns]"),
            "hour": pd.Series(dtype="int64"),
        }
    )

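    # Dask uses `meta` to tell map_partitions the output schema (columns and
    # dtypes) without executing the calibration; it is assumed to match what
    # apply_calibration() returns, including the added date/hour columns.
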
    # --- Apply calibration in parallel ---
    calibrated_df = df.map_partitions(apply_calibration, config=cfg_future, meta=meta)
    # The date and hour partitioning columns are added inside apply_calibration,
    # so each partition's output already matches `meta`.

    # --- Save calibrated dataset (partitioned by date/hour) ---
    calibrated_df.to_parquet(
        path=str(output_dir),
        engine="pyarrow",
        partition_on=["date", "hour"],  # hive-style partitioning by date and hour
        overwrite=True,  # replace any existing dataset at the output path
        write_index=True,
        write_metadata_file=True,
    )
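    # Illustration only: the partitioned output can later be read back
    # selectively, e.g. dd.read_parquet(str(output_dir), filters=[("hour", "==", 12)]).
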
    print(
        f"✅ Calibration applied successfully in {(time.time() - start_time) / 60:.2f} minutes"
    )
    client.close()
    cluster.close()


if __name__ == "__main__":
|
|
start_time = time.time()
|
|
main()
|