Refactor: implemented input run_config.yaml file

2025-08-13 14:09:58 +02:00
parent fa4cb9e7d4
commit 6f87b4cc79
3 changed files with 266 additions and 140 deletions
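With this change the pipeline can be driven entirely from a run-configuration YAML, while explicit CLI flags and --set overrides still take precedence over the file and over the built-in defaults. A minimal sketch of the new resolution order, assuming the helpers shown below are importable from sp2xr.helpers (as the pipeline's own imports suggest) and using this commit's tests/run_config.yaml; the entry-point script name in the comments is hypothetical:

# Equivalent invocations after this refactor (script name assumed):
#   python pipeline.py --config tests/run_config.yaml
#   python pipeline.py --config tests/run_config.yaml --set cluster.cores=16 --set workflow.hist=false
from sp2xr.helpers import load_yaml_cfg, apply_sets, choose, get  # assumed module path

base = load_yaml_cfg("tests/run_config.yaml")
base = apply_sets(base, ["cluster.cores=16"])   # same effect as --set cluster.cores=16
print(choose(None, base, "cluster.cores", 8))   # 16 -> no CLI flag, so the YAML/--set value wins
print(choose(32, base, "cluster.cores", 8))     # 32 -> an explicit CLI flag always wins
print(get(base, "workflow.conc", False))        # True, taken from run_config.yaml
print(get(base, "cluster.account", "none"))     # "none": a missing key (made up here) falls back to the default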


@@ -1,14 +1,15 @@
from __future__ import annotations
import argparse
from pathlib import Path
from dask_jobqueue import SLURMCluster
from dask.distributed import Client
import yaml
import time
import dask.dataframe as dd
import pandas as pd
import numpy as np
from sp2xr.helpers import (
parse_args,
load_and_resolve_config,
initialize_cluster,
)
from sp2xr.calibration import calibrate_particle_data
from sp2xr.flag_single_particle_data import define_flags, add_thin_thick_flags
from sp2xr.resample_pbp_hk import build_dt_summary, resample_hk_partition
@@ -20,79 +21,47 @@ from sp2xr.distribution import (
from sp2xr.concentrations import add_concentrations
def parse_args():
p = argparse.ArgumentParser(
description="SP2-XR end-to-end pipeline (calibration → 1-s summary → histograms)"
)
# required paths
p.add_argument("--input_pbp", required=True, help="root dir with raw PbP Parquet")
p.add_argument(
"--input_hk", required=True, help="root dir with house-keeping Parquet"
)
p.add_argument("--output", required=True, help="root dir where results are written")
p.add_argument("--config", required=True, help="YAML with calibration coefficients")
# optional workflow switches
p.add_argument("--conc", action="store_true", help="write bulk concentration table")
p.add_argument(
"--hist", action="store_true", help="write size/mass/time-lag histograms"
)
p.add_argument(
"--dt", type=int, default=1, help="aggregation interval in seconds (default: 1)"
)
# cluster / resource knobs (all optional)
p.add_argument(
"--cores", type=int, default=8, help="CPU cores per SLURM job (default: 8)"
)
p.add_argument("--memory", default="64GB", help="RAM per job (default: 64GB)")
p.add_argument(
"--walltime", default="00:59:00", help="SLURM wall-time (default: 1h)"
)
p.add_argument(
"--partition", default="hourly", help="SLURM partition (default: hourly)"
)
return p.parse_args()
def make_cluster(cores=32, mem="64GB", wall="00:59:00", partition="hourly", out_dir=""):
cluster = SLURMCluster(
cores=cores,
processes=cores,
memory=mem,
walltime=wall,
job_extra_directives=[
f"--partition={partition}",
f"--output={out_dir}/slurm-%j.out",
],
)
client = Client(cluster)
cluster.scale(1)
client.wait_for_workers(1, timeout=600)
print(f"Dask dashboard: {client.dashboard_link}")
return client
def main():
args = parse_args()
output_dir = Path(args.output)
config = load_and_resolve_config(args)
dt_s = args.dt
client = initialize_cluster(config)
"""args = parse_args()
base = load_yaml_cfg(args.config)
base = apply_sets(base, args.set)
# resolved values (CLI > config > built-in defaults)
input_pbp = choose(args.input_pbp, base, "paths.input_pbp")
input_hk = choose(args.input_hk, base, "paths.input_hk")
output = choose(args.output, base, "paths.output")
instr_cfg = choose(args.instr_config, base, "paths.instrument_config")
dt = choose(args.dt, base, "workflow.dt", 1)
do_conc = args.conc or get(base, "workflow.conc", False)
do_hist = args.hist or get(base, "workflow.hist", False)
rho_eff = choose(args.BC_rho, base, "bc.rho_eff", 1.8)
BC_type = choose(args.BC_type, base, "bc.type", "constant_effective_density")
cores = choose(args.cores, base, "cluster.cores", 8)
memory = choose(args.memory, base, "cluster.memory", "64GB")
walltime = choose(args.walltime, base, "cluster.walltime", "00:59:00")
part = choose(args.partition, base, "cluster.partition", "hourly")
log_dir = get(base, "cluster.log_dir", "./slurm_out")
# optional: persist fully-resolved config for provenance
os.makedirs(output, exist_ok=True)
with open(os.path.join(output, "config.resolved.yaml"), "w") as f:
yaml.safe_dump(base, f, sort_keys=False)
# 0. cluster -------------------------------------------------------
client = make_cluster(
cores=args.cores,
mem=args.memory,
wall=args.walltime,
partition=args.partition,
out_dir="/data/user/bertoz_b/merlin6data/SP2XR_code/slurm_out/",
cores=cores, mem=memory, wall=walltime, partition=part, out_dir=log_dir
)
"""
# 1. calibration stage --------------------------------------------
cfg = yaml.safe_load(open(args.config))
cfg = yaml.safe_load(open(config["instr_cfg"]))
cfg_future = client.scatter(cfg, broadcast=True)
inc_mass_bin_lims = np.logspace(
@@ -101,18 +70,12 @@ def main():
cfg["inc_histo"]["n_bins"],
)
inc_mass_bin_ctrs = bin_lims_to_ctrs(inc_mass_bin_lims)
# mass_bin_labels = np.round(inc_mass_bin_ctrs, 2)
rho_eff = 1800
BC_type = "constant_effective_density"
ddf_raw = dd.read_parquet(
args.input_pbp, engine="pyarrow", aggregate_files=True, blocksize="32MB"
)
ddf_raw = dd.read_parquet(config["input_pbp"], engine="pyarrow")
meta_cal = ddf_raw._meta.copy()
meta_cal["BC mass"] = pd.Series([], dtype="float64")
meta_cal["Opt diam"] = pd.Series([], dtype="float64")
# meta_cal["ratio_inc_scatt"] = pd.Series([], dtype="float64")
meta_cal["time_lag"] = pd.Series([], dtype="int8")
meta_cal["date"] = pd.Series([], dtype="datetime64[ns]")
meta_cal["hour"] = pd.Series([], dtype="int8")
@@ -146,15 +109,12 @@ def main():
meta_dt = build_dt_summary(ddf_cal._meta) # empty frame for metadata
ddf_pbp_dt = ddf_cal.map_partitions(build_dt_summary, dt_s=args.dt, meta=meta_dt)
ddf_pbp_dt = ddf_cal.map_partitions(
build_dt_summary, dt_s=config["dt"], meta=meta_dt
)
# 2. load house-keeping once --------------------------------------
ddf_hk = dd.read_parquet(
args.input_hk,
engine="pyarrow",
# aggregate_files=True,
# blocksize="32MB"
)
ddf_hk = dd.read_parquet(config["input_hk"], engine="pyarrow")
# Create a meta (empty dataframe with correct schema)
meta = pd.DataFrame(
{
@@ -168,24 +128,9 @@ def main():
# Map partition-wise
ddf_hk_dt = ddf_hk.map_partitions(
resample_hk_partition, dt=f"{args.dt}s", meta=meta
resample_hk_partition, dt=f"{config['dt']}s", meta=meta
)
'''def add_flow_same_second(pdf, flow_pdf, tol="999ms"):
"""
pdf : particle partition (pandas)
flow_pdf : *entire* flow table (pandas)
tol : max time difference accepted (string, Timedelta, or None)
"""
return pd.merge_asof(
pdf.sort_index(),
flow_pdf,
left_index=True,
right_index=True,
direction="backward",
tolerance=pd.Timedelta(tol),
)'''
flow_dt = ddf_hk_dt["Sample Flow Controller Read (vccm)"].compute()
def floor_index_to_dt(pdf: pd.DataFrame) -> pd.DataFrame:
@@ -193,7 +138,7 @@ def main():
Replace the existing DatetimeIndex with its value floored to the dt-second boundary,
without changing the index's name or creating a new column.
"""
pdf.index = pdf.index.floor(f"{args.dt}s") # keeps the original .name
pdf.index = pdf.index.floor(f"{config['dt']}s") # keeps the original .name
return pdf
# meta stays identical because only the *values* of the index change
@@ -204,11 +149,6 @@ def main():
np.float64()
) # tell Dask a new float col appears
"""ddf_pbp_with_flow = ddf_cal.map_partitions(
add_flow_same_second,
flow_pdf=flow_1s, # ← broadcast
meta=meta_pbp_with_flow,
)"""
ddf_pbp_with_flow = ddf_cal.map_partitions(
lambda part: part.join(flow_dt, how="left"),
meta=meta_pbp_with_flow,
@@ -226,7 +166,7 @@ def main():
)
ddf_pbp_with_flow.to_parquet(
path=f"{output_dir}/pbp_calibrated",
path=f"{config['output']}/pbp_calibrated",
partition_on=["date", "hour"],
engine="pyarrow",
write_index=True,
@@ -247,7 +187,7 @@ def main():
ddf_pbp_hk_dt = ddf_pbp_hk_dt.drop(columns=["date_x", "hour_x", "date_y", "hour_y"])
ddf_pbp_hk_dt.to_parquet(
path=f"{output_dir}/combined_pbp_hk_{dt_s}s",
path=f"{config['output']}/combined_pbp_hk_{config['dt']}s",
partition_on=["date", "hour"],
engine="pyarrow",
write_index=True,
@@ -256,24 +196,26 @@ def main():
)
# 4. (optional) dt bulk conc --------------------------
do_conc = args.conc
meta_conc = add_concentrations(ddf_pbp_hk_dt._meta, dt=dt_s) if do_conc else None
meta_conc = (
add_concentrations(ddf_pbp_hk_dt._meta, dt=config["dt"])
if config["do_conc"]
else None
)
if do_conc:
if config["do_conc"]:
print("ok")
ddf_conc = ddf_pbp_hk_dt.map_partitions(
add_concentrations, dt=dt_s, meta=meta_conc
add_concentrations, dt=config["dt"], meta=meta_conc
)
ddf_conc.to_parquet(
f"{output_dir}/conc_{dt_s}s",
f"{config['output']}/conc_{config['dt']}s",
partition_on=["date"],
overwrite=True,
)
# 5. (optional) dt histograms --------------------------
do_hist = args.hist
if do_hist:
if config["do_hist"]:
# ========= 3. RUN MASS HISTOGRAMS =========
# --- Mass histogram configs
hist_configs = [
@@ -307,8 +249,8 @@ def main():
bin_ctrs=inc_mass_bin_ctrs,
kind="mass",
flag_col=cfg_hist["flag_col"],
rho_eff=rho_eff,
BC_type=BC_type,
rho_eff=config["rho_eff"],
BC_type=config["BC_type"],
)
ddf_out = ddf_pbp_with_flow.map_partitions(
process_hist_and_dist_partition,
@@ -317,11 +259,11 @@ def main():
flag_value=cfg_hist["flag_value"],
bin_lims=inc_mass_bin_lims,
bin_ctrs=inc_mass_bin_ctrs,
dt=dt_s,
dt=config["dt"],
calculate_conc=True,
flow=None,
rho_eff=rho_eff,
BC_type=BC_type,
rho_eff=config["rho_eff"],
BC_type=config["BC_type"],
t=1,
meta=meta_hist, # <-- single line does the trick
)
@@ -350,7 +292,7 @@ def main():
flag_value=None,
bin_lims=scatt_bin_lims,
bin_ctrs=scatt_bin_ctrs,
dt=dt_s,
dt=config["dt"],
calculate_conc=True,
flow=None,
rho_eff=None,
@@ -376,9 +318,6 @@ def main():
name_prefix = f"dNdlogDmev_{inc_mass_bin_ctrs[idx]:.2f}_timelag"
# meta_cols = [f"{name_prefix}_{i:.2f}" for i in timelag_bin_ctrs]
# meta_float = pd.DataFrame({c: pd.Series([], dtype="float64") for c in meta_cols})
meta_hist = make_hist_meta(
bin_ctrs=timelag_bin_ctrs,
kind="timelag",
@@ -388,16 +327,6 @@ def main():
BC_type=None,
)
# meta_cols = [f"dNdlogDmev_{inc_mass_bin_ctrs[idx]:.2f}_particles_for_tl_dist_{i:.2f}" for i in timelag_bin_ctrs]
"""bin_center = (
inc_mass_bin_ctrs[int(mass_bin)]
if mass_bin < len(inc_mass_bin_ctrs)
else mass_bin
)
meta_cols = [
f"dNdlogDmev_{bin_center:.2f}_particles_for_tl_dist_{i:.2f}" for i in timelag_bin_ctrs
]"""
tl_ddf = ddf_bin.map_partitions(
process_hist_and_dist_partition,
col="time_lag",
@@ -405,7 +334,7 @@ def main():
flag_value=1,
bin_lims=timelag_bins_lims,
bin_ctrs=timelag_bin_ctrs,
dt=dt_s,
dt=config["dt"],
calculate_conc=True,
flow=None, # ddf_pbp_hk["Sample Flow Controller Read (vccm)"],
rho_eff=None,
@@ -415,17 +344,12 @@ def main():
meta=meta_hist, # pd.DataFrame(columns=meta_cols)
)
# tl_ddf.columns = meta_cols
results.append(tl_ddf)
# ========= 6. MERGE ALL HISTOGRAMS =========
# merged_ddf = reduce(lambda left, right: left.join(right), results)
merged_ddf = dd.concat(results, axis=1, interleave_partitions=True)
# merged_ddf = merged_ddf.rename_axis(index="time")
# Add normalized date column
# merged_ddf = merged_ddf.assign(date=merged_ddf.index.to_series().dt.normalize())
index_as_dt = dd.to_datetime(merged_ddf.index.to_series())
merged_ddf["date"] = index_as_dt.map_partitions(
lambda s: s.dt.normalize(), meta=("date", "datetime64[ns]")
@@ -434,7 +358,7 @@ def main():
# ========= 7. SAVE TO PARQUET =========
merged_ddf.to_parquet(
f"{output_dir}/hists_{dt_s}s",
f"{config['output']}/hists_{config['dt']}s",
partition_on=["date"],
overwrite=True,
)


@@ -1,10 +1,177 @@
import os
import re
import yaml
import argparse
from pathlib import Path
from dask_jobqueue import SLURMCluster
from dask.distributed import Client
from typing import Optional, List
def load_and_resolve_config(args):
base = load_yaml_cfg(args.config)
base = apply_sets(base, args.set)
config = {
"input_pbp": choose(args.input_pbp, base, "paths.input_pbp"),
"input_hk": choose(args.input_hk, base, "paths.input_hk"),
"output": choose(args.output, base, "paths.output"),
"instr_cfg": choose(args.instr_config, base, "paths.instrument_config"),
"dt": choose(args.dt, base, "workflow.dt", 1),
"do_conc": args.conc or get(base, "workflow.conc", False),
"do_hist": args.hist or get(base, "workflow.hist", False),
"rho_eff": choose(args.BC_rho, base, "bc.rho_eff", 1.8),
"BC_type": choose(args.BC_type, base, "bc.type", "constant_effective_density"),
"cluster": {
"cores": choose(args.cores, base, "cluster.cores", 8),
"memory": choose(args.memory, base, "cluster.memory", "64GB"),
"walltime": choose(args.walltime, base, "cluster.walltime", "00:59:00"),
"partition": choose(args.partition, base, "cluster.partition", "hourly"),
"log_dir": get(base, "cluster.log_dir", "./slurm_out"),
},
"base": base,
}
os.makedirs(config["output"], exist_ok=True)
with open(os.path.join(config["output"], "config.resolved.yaml"), "w") as f:
yaml.safe_dump(base, f, sort_keys=False)
return config
def initialize_cluster(config):
return make_cluster(
cores=config["cluster"]["cores"],
mem=config["cluster"]["memory"],
wall=config["cluster"]["walltime"],
partition=config["cluster"]["partition"],
out_dir=config["cluster"]["log_dir"],
)
def make_cluster(cores=32, mem="64GB", wall="00:59:00", partition="hourly", out_dir=""):
cluster = SLURMCluster(
cores=cores,
processes=cores,
memory=mem,
walltime=wall,
job_extra_directives=[
f"--partition={partition}",
f"--output={out_dir}/slurm-%j.out",
],
)
client = Client(cluster)
cluster.scale(1)
client.wait_for_workers(1, timeout=600)
print(f"Dask dashboard: {client.dashboard_link}")
return client
def _expand(p):
return None if p is None else os.path.expandvars(os.path.expanduser(p))
def load_yaml_cfg(path):
with open(path) as f:
cfg = yaml.safe_load(f) or {}
# expand common paths
for k in ("paths",):
if k in cfg:
for kk, vv in list(cfg[k].items()):
cfg[k][kk] = _expand(vv)
return cfg
def parse_args():
p = argparse.ArgumentParser(
description="SP2-XR end-to-end pipeline (calibration → 1-s summary → histograms)"
)
p.add_argument("--config", required=True, help="General YAML config")
# required paths
p.add_argument("--input_pbp", required=False, help="root dir with raw PbP Parquet")
p.add_argument(
"--input_hk", required=False, help="root dir with house-keeping Parquet"
)
p.add_argument(
"--output", required=False, help="root dir where results are written"
)
p.add_argument(
"--instr_config", required=False, help="YAML with calibration coefficients"
)
# optional workflow switches
p.add_argument("--conc", action="store_true", help="write bulk concentration table")
p.add_argument(
"--hist", action="store_true", help="write size/mass/time-lag histograms"
)
p.add_argument(
"--dt", type=int, default=1, help="aggregation interval in seconds (default: 1)"
)
# BC properties
p.add_argument(
"--BC_rho", type=float, default=1.8, help="BC density (default: 1.8)"
)
p.add_argument(
"--BC_type",
type=str,
default="constant_effective_density",
help="BC type (default: constant_effective_density)",
)
# cluster / resource knobs (all optional)
p.add_argument(
"--cores", type=int, default=8, help="CPU cores per SLURM job (default: 8)"
)
p.add_argument("--memory", default="64GB", help="RAM per job (default: 64GB)")
p.add_argument(
"--walltime", default="00:59:00", help="SLURM wall-time (default: 1h)"
)
p.add_argument(
"--partition", default="hourly", help="SLURM partition (default: hourly)"
)
p.add_argument("--set", action="append", default=[], help="dot.notation overrides")
return p.parse_args()
def apply_sets(cfg, sets):
def coerce(v):
if v.lower() in ("true", "false"):
return v.lower() == "true"
try:
if "." in v:
return float(v)
return int(v)
except (ValueError, TypeError):
return v
for item in sets:
key, _, val = item.partition("=")
if not _:
raise ValueError(f"--set needs key=value, got: {item}")
parts = key.split(".")
cur = cfg
for p in parts[:-1]:
cur = cur.setdefault(p, {})
cur[parts[-1]] = coerce(val)
return cfg
def get(cfg, dotted, default=None):
cur = cfg
for part in dotted.split("."):
if not isinstance(cur, dict) or part not in cur:
return default
cur = cur[part]
return cur
def choose(cli_val, cfg, dotted, default=None):
return cli_val if cli_val is not None else get(cfg, dotted, default)
def find_matching_hk_file(pbp_path: str) -> Optional[str]:
"""
From a PbP filename, return a path to the corresponding hk file (.csv or .zip), if found.
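The --set key=value overrides handled by apply_sets above use dot-notation and light type coercion: "true"/"false" become booleans, values containing a dot become floats, other numeric strings become ints, and anything else stays a string; intermediate sections are created on the fly. A small usage sketch, again assuming apply_sets and get are exposed from sp2xr.helpers:

from sp2xr.helpers import apply_sets, get  # assumed module path

cfg = {"workflow": {"dt": 1, "hist": False}}
cfg = apply_sets(cfg, ["workflow.dt=10", "workflow.hist=true", "bc.rho_eff=1.8"])
print(get(cfg, "workflow.dt"))    # 10   (int)
print(get(cfg, "workflow.hist"))  # True (bool)
print(get(cfg, "bc.rho_eff"))     # 1.8  (float); the "bc" section did not exist and was created
print(get(cfg, "bc.type", "constant_effective_density"))  # default returned for a key that was never set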

tests/run_config.yaml (new file)

@@ -0,0 +1,35 @@
paths:
input_pbp: tests/data/pbp_files_test
input_hk: tests/data/hk_files_test
output: tests/data3
instrument_config: tests/data/instrument_config.yaml
workflow:
conc: true
hist: true
dt: 1 # seconds
cluster:
cores: 8
memory: 64GB
walltime: "00:59:00"
partition: hourly
log_dir: ./slurm_out
bc:
rho_eff: 1800
type: constant_effective_density
histo:
inc:
min_mass: 1e-18
max_mass: 1e-12
n_bins: 40
scatt:
min_D: 100
max_D: 1000
n_bins: 30
timelag:
min: -50
max: 100
n_bins: 150
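For provenance, load_and_resolve_config writes the configuration it actually used next to the results (for this test config that would be tests/data3/config.resolved.yaml). Note that the dump is the base YAML after --set overrides and path expansion; values passed through plain CLI flags such as --cores are resolved into the returned dict but are not written back. A minimal sketch of what gets persisted when no overrides are given:

import yaml

# reproduce the provenance dump by hand (no --set overrides, and the relative
# paths above contain no ~ or $VARS, so expansion changes nothing)
with open("tests/run_config.yaml") as f:
    base = yaml.safe_load(f) or {}
print(yaml.safe_dump(base, sort_keys=False))  # same content that lands in <output>/config.resolved.yaml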