diff --git a/scripts/sp2xr_pipeline.py b/scripts/sp2xr_pipeline.py
index d479f66..d659768 100644
--- a/scripts/sp2xr_pipeline.py
+++ b/scripts/sp2xr_pipeline.py
@@ -1,14 +1,15 @@
 from __future__ import annotations
 
-import argparse
-from pathlib import Path
-from dask_jobqueue import SLURMCluster
-from dask.distributed import Client
 import yaml
 import time
 import dask.dataframe as dd
 import pandas as pd
 import numpy as np
+from sp2xr.helpers import (
+    parse_args,
+    load_and_resolve_config,
+    initialize_cluster,
+)
 from sp2xr.calibration import calibrate_particle_data
 from sp2xr.flag_single_particle_data import define_flags, add_thin_thick_flags
 from sp2xr.resample_pbp_hk import build_dt_summary, resample_hk_partition
@@ -20,79 +21,47 @@ from sp2xr.distribution import (
 from sp2xr.concentrations import add_concentrations
 
 
-def parse_args():
-    p = argparse.ArgumentParser(
-        description="SP2-XR end-to-end pipeline (calibration → 1-s summary → histograms)"
-    )
-
-    # required paths
-    p.add_argument("--input_pbp", required=True, help="root dir with raw PbP Parquet")
-    p.add_argument(
-        "--input_hk", required=True, help="root dir with house-keeping Parquet"
-    )
-    p.add_argument("--output", required=True, help="root dir where results are written")
-    p.add_argument("--config", required=True, help="YAML with calibration coefficients")
-
-    # optional workflow switches
-    p.add_argument("--conc", action="store_true", help="write bulk concentration table")
-    p.add_argument(
-        "--hist", action="store_true", help="write size/mass/time-lag histograms"
-    )
-    p.add_argument(
-        "--dt", type=int, default=1, help="aggregation interval in seconds (default: 1)"
-    )
-
-    # cluster / resource knobs (all optional)
-    p.add_argument(
-        "--cores", type=int, default=8, help="CPU cores per SLURM job (default: 8)"
-    )
-    p.add_argument("--memory", default="64GB", help="RAM per job (default: 64GB)")
-    p.add_argument(
-        "--walltime", default="00:59:00", help="SLURM wall-time (default: 1h)"
-    )
-    p.add_argument(
-        "--partition", default="hourly", help="SLURM partition (default: hourly)"
-    )
-
-    return p.parse_args()
-
-
-def make_cluster(cores=32, mem="64GB", wall="00:59:00", partition="hourly", out_dir=""):
-    cluster = SLURMCluster(
-        cores=cores,
-        processes=cores,
-        memory=mem,
-        walltime=wall,
-        job_extra_directives=[
-            f"--partition={partition}",
-            f"--output={out_dir}/slurm-%j.out",
-        ],
-    )
-
-    client = Client(cluster)
-    cluster.scale(1)
-    client.wait_for_workers(1, timeout=600)
-    print(f"Dask dashboard: {client.dashboard_link}")
-    return client
-
-
 def main():
     args = parse_args()
-    output_dir = Path(args.output)
+    config = load_and_resolve_config(args)
 
-    dt_s = args.dt
+    client = initialize_cluster(config)
+    """args = parse_args()
+    base = load_yaml_cfg(args.config)
+    base = apply_sets(base, args.set)
+
+    # resolved values (CLI > config > built-in defaults)
+    input_pbp = choose(args.input_pbp, base, "paths.input_pbp")
+    input_hk = choose(args.input_hk, base, "paths.input_hk")
+    output = choose(args.output, base, "paths.output")
+    instr_cfg = choose(args.instr_config, base, "paths.instrument_config")
+
+    dt = choose(args.dt, base, "workflow.dt", 1)
+    do_conc = args.conc or get(base, "workflow.conc", False)
+    do_hist = args.hist or get(base, "workflow.hist", False)
+
+    rho_eff = choose(args.BC_rho, base, "bc.rho_eff", 1.8)
+    BC_type = choose(args.BC_type, base, "bc.type", "constant_effective_density")
+
+    cores = choose(args.cores, base, "cluster.cores", 8)
+    memory = choose(args.memory, base, "cluster.memory", "64GB")
+    walltime = choose(args.walltime, base, "cluster.walltime", "00:59:00")
+    part = choose(args.partition, base, "cluster.partition", "hourly")
+    log_dir = get(base, "cluster.log_dir", "./slurm_out")
+
+    # optional: persist fully-resolved config for provenance
+    os.makedirs(output, exist_ok=True)
+    with open(os.path.join(output, "config.resolved.yaml"), "w") as f:
+        yaml.safe_dump(base, f, sort_keys=False)
+
     # 0. cluster -------------------------------------------------------
     client = make_cluster(
-        cores=args.cores,
-        mem=args.memory,
-        wall=args.walltime,
-        partition=args.partition,
-        out_dir="/data/user/bertoz_b/merlin6data/SP2XR_code/slurm_out/",
+        cores=cores, mem=memory, wall=walltime, partition=part, out_dir=log_dir
     )
-
+    """
     # 1. calibration stage --------------------------------------------
-    cfg = yaml.safe_load(open(args.config))
+    cfg = yaml.safe_load(open(config["instr_cfg"]))
     cfg_future = client.scatter(cfg, broadcast=True)
 
     inc_mass_bin_lims = np.logspace(
@@ -101,18 +70,12 @@ def main():
         cfg["inc_histo"]["n_bins"],
     )
     inc_mass_bin_ctrs = bin_lims_to_ctrs(inc_mass_bin_lims)
-    # mass_bin_labels = np.round(inc_mass_bin_ctrs, 2)
-    rho_eff = 1800
-    BC_type = "constant_effective_density"
-
-    ddf_raw = dd.read_parquet(
-        args.input_pbp, engine="pyarrow", aggregate_files=True, blocksize="32MB"
-    )
+    ddf_raw = dd.read_parquet(config["input_pbp"], engine="pyarrow")
 
     meta_cal = ddf_raw._meta.copy()
     meta_cal["BC mass"] = pd.Series([], dtype="float64")
     meta_cal["Opt diam"] = pd.Series([], dtype="float64")
-    # meta_cal["ratio_inc_scatt"] = pd.Series([], dtype="float64")
     meta_cal["time_lag"] = pd.Series([], dtype="int8")
     meta_cal["date"] = pd.Series([], dtype="datetime64[ns]")
     meta_cal["hour"] = pd.Series([], dtype="int8")
@@ -146,15 +109,12 @@ def main():
 
     meta_dt = build_dt_summary(ddf_cal._meta)  # empty frame for metadata
-    ddf_pbp_dt = ddf_cal.map_partitions(build_dt_summary, dt_s=args.dt, meta=meta_dt)
+    ddf_pbp_dt = ddf_cal.map_partitions(
+        build_dt_summary, dt_s=config["dt"], meta=meta_dt
+    )
 
     # 2. load house-keeping once --------------------------------------
-    ddf_hk = dd.read_parquet(
-        args.input_hk,
-        engine="pyarrow",
-        # aggregate_files=True,
-        # blocksize="32MB"
-    )
+    ddf_hk = dd.read_parquet(config["input_hk"], engine="pyarrow")
 
     # Create a meta (empty dataframe with correct schema)
     meta = pd.DataFrame(
         {
@@ -168,24 +128,9 @@ def main():
 
     # Map partition-wise
     ddf_hk_dt = ddf_hk.map_partitions(
-        resample_hk_partition, dt=f"{args.dt}s", meta=meta
+        resample_hk_partition, dt=f"{config['dt']}s", meta=meta
     )
-    '''def add_flow_same_second(pdf, flow_pdf, tol="999ms"):
-        """
-        pdf      : particle partition (pandas)
-        flow_pdf : *entire* flow table (pandas)
-        tol      : max time difference accepted (string, Timedelta, or None)
-        """
-        return pd.merge_asof(
-            pdf.sort_index(),
-            flow_pdf,
-            left_index=True,
-            right_index=True,
-            direction="backward",
-            tolerance=pd.Timedelta(tol),
-        )'''
-
     flow_dt = ddf_hk_dt["Sample Flow Controller Read (vccm)"].compute()
 
     def floor_index_to_dt(pdf: pd.DataFrame) -> pd.DataFrame:
@@ -193,7 +138,7 @@ def main():
         """
         Replace the existing DatetimeIndex with its lower-second value, without changing the index’s name or creating a new column.
""" - pdf.index = pdf.index.floor(f"{args.dt}s") # keeps the original .name + pdf.index = pdf.index.floor(f"{config['dt']}s") # keeps the original .name return pdf # meta stays identical because only the *values* of the index change @@ -204,11 +149,6 @@ def main(): np.float64() ) # tell Dask a new float col appears - """ddf_pbp_with_flow = ddf_cal.map_partitions( - add_flow_same_second, - flow_pdf=flow_1s, # ← broadcast - meta=meta_pbp_with_flow, - )""" ddf_pbp_with_flow = ddf_cal.map_partitions( lambda part: part.join(flow_dt, how="left"), meta=meta_pbp_with_flow, @@ -226,7 +166,7 @@ def main(): ) ddf_pbp_with_flow.to_parquet( - path=f"{output_dir}/pbp_calibrated", + path=f"{config['output']}/pbp_calibrated", partition_on=["date", "hour"], engine="pyarrow", write_index=True, @@ -247,7 +187,7 @@ def main(): ddf_pbp_hk_dt = ddf_pbp_hk_dt.drop(columns=["date_x", "hour_x", "date_y", "hour_y"]) ddf_pbp_hk_dt.to_parquet( - path=f"{output_dir}/combined_pbp_hk_{dt_s}s", + path=f"{config['output']}/combined_pbp_hk_{config['dt']}s", partition_on=["date", "hour"], engine="pyarrow", write_index=True, @@ -256,24 +196,26 @@ def main(): ) # 4. (optional) dt bulk conc -------------------------- - do_conc = args.conc - meta_conc = add_concentrations(ddf_pbp_hk_dt._meta, dt=dt_s) if do_conc else None + meta_conc = ( + add_concentrations(ddf_pbp_hk_dt._meta, dt=config["dt"]) + if config["do_conc"] + else None + ) - if do_conc: + if config["do_conc"]: print("ok") ddf_conc = ddf_pbp_hk_dt.map_partitions( - add_concentrations, dt=dt_s, meta=meta_conc + add_concentrations, dt=config["dt"], meta=meta_conc ) ddf_conc.to_parquet( - f"{output_dir}/conc_{dt_s}s", + f"{config['output']}/conc_{config['dt']}s", partition_on=["date"], overwrite=True, ) # 5. (optional) dt histograms -------------------------- - do_hist = args.hist - if do_hist: + if config["do_hist"]: # ========= 3. 
         # ========= 3. RUN MASS HISTOGRAMS =========
         # --- Mass histogram configs
         hist_configs = [
@@ -307,8 +249,8 @@ def main():
                 bin_ctrs=inc_mass_bin_ctrs,
                 kind="mass",
                 flag_col=cfg_hist["flag_col"],
-                rho_eff=rho_eff,
-                BC_type=BC_type,
+                rho_eff=config["rho_eff"],
+                BC_type=config["BC_type"],
             )
             ddf_out = ddf_pbp_with_flow.map_partitions(
                 process_hist_and_dist_partition,
@@ -317,11 +259,11 @@ def main():
                 flag_value=cfg_hist["flag_value"],
                 bin_lims=inc_mass_bin_lims,
                 bin_ctrs=inc_mass_bin_ctrs,
-                dt=dt_s,
+                dt=config["dt"],
                 calculate_conc=True,
                 flow=None,
-                rho_eff=rho_eff,
-                BC_type=BC_type,
+                rho_eff=config["rho_eff"],
+                BC_type=config["BC_type"],
                 t=1,
                 meta=meta_hist,  # <-- single line does the trick
             )
@@ -350,7 +292,7 @@ def main():
             flag_value=None,
             bin_lims=scatt_bin_lims,
             bin_ctrs=scatt_bin_ctrs,
-            dt=dt_s,
+            dt=config["dt"],
             calculate_conc=True,
             flow=None,
             rho_eff=None,
@@ -376,9 +318,6 @@ def main():
 
             name_prefix = f"dNdlogDmev_{inc_mass_bin_ctrs[idx]:.2f}_timelag"
 
-            # meta_cols = [f"{name_prefix}_{i:.2f}" for i in timelag_bin_ctrs]
-            # meta_float = pd.DataFrame({c: pd.Series([], dtype="float64") for c in meta_cols})
-
             meta_hist = make_hist_meta(
                 bin_ctrs=timelag_bin_ctrs,
                 kind="timelag",
@@ -388,16 +327,6 @@ def main():
                 BC_type=None,
             )
 
-            # meta_cols = [f"dNdlogDmev_{inc_mass_bin_ctrs[idx]:.2f}_particles_for_tl_dist_{i:.2f}" for i in timelag_bin_ctrs]
-            """bin_center = (
-                inc_mass_bin_ctrs[int(mass_bin)]
-                if mass_bin < len(inc_mass_bin_ctrs)
-                else mass_bin
-            )
-            meta_cols = [
-                f"dNdlogDmev_{bin_center:.2f}_particles_for_tl_dist_{i:.2f}" for i in timelag_bin_ctrs
-            ]"""
-
             tl_ddf = ddf_bin.map_partitions(
                 process_hist_and_dist_partition,
                 col="time_lag",
@@ -405,7 +334,7 @@ def main():
                 flag_value=1,
                 bin_lims=timelag_bins_lims,
                 bin_ctrs=timelag_bin_ctrs,
-                dt=dt_s,
+                dt=config["dt"],
                 calculate_conc=True,
                 flow=None,  # ddf_pbp_hk["Sample Flow Controller Read (vccm)"],
                 rho_eff=None,
@@ -415,17 +344,12 @@ def main():
                 meta=meta_hist,  # pd.DataFrame(columns=meta_cols)
             )
 
-            # tl_ddf.columns = meta_cols
             results.append(tl_ddf)
 
         # ========= 6. MERGE ALL HISTOGRAMS =========
-
-        # merged_ddf = reduce(lambda left, right: left.join(right), results)
         merged_ddf = dd.concat(results, axis=1, interleave_partitions=True)
 
-        # merged_ddf = merged_ddf.rename_axis(index="time")
         # Add normalized date column
-        # merged_ddf = merged_ddf.assign(date=merged_ddf.index.to_series().dt.normalize())
         index_as_dt = dd.to_datetime(merged_ddf.index.to_series())
         merged_ddf["date"] = index_as_dt.map_partitions(
             lambda s: s.dt.normalize(), meta=("date", "datetime64[ns]")
         )
 
         # ========= 7. SAVE TO PARQUET =========
         merged_ddf.to_parquet(
-            f"{output_dir}/hists_{dt_s}s",
+            f"{config['output']}/hists_{config['dt']}s",
             partition_on=["date"],
             overwrite=True,
         )
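Note: with this change scripts/sp2xr_pipeline.py no longer hard-codes paths, BC properties, or SLURM settings; they are resolved once by load_and_resolve_config() (added to src/sp2xr/helpers.py below), and the loaded YAML, including any --set overrides, is persisted to <output>/config.resolved.yaml for provenance (values passed only as individual CLI flags are not merged back into that file). A minimal sketch of how a later session could reload that record; the output path comes from tests/run_config.yaml at the end of this diff, and the snippet is illustrative, not part of the change:

    import yaml

    # Reload the provenance record written by load_and_resolve_config()
    with open("tests/data3/config.resolved.yaml") as f:
        resolved = yaml.safe_load(f)

    # e.g. the SLURM partition and aggregation interval recorded for that run
    print(resolved["cluster"]["partition"], resolved["workflow"]["dt"])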
action="store_true", help="write size/mass/time-lag histograms" + ) + p.add_argument( + "--dt", type=int, default=1, help="aggregation interval in seconds (default: 1)" + ) + + # BC proeprties + p.add_argument( + "--BC_rho", type=float, default=1.8, help="BC density (default: 1.8)" + ) + p.add_argument( + "--BC_type", + type=str, + default="constant_effective_density", + help="BC type (default: constant_effective_density)", + ) + + # cluster / resource knobs (all optional) + p.add_argument( + "--cores", type=int, default=8, help="CPU cores per SLURM job (default: 8)" + ) + p.add_argument("--memory", default="64GB", help="RAM per job (default: 64GB)") + p.add_argument( + "--walltime", default="00:59:00", help="SLURM wall-time (default: 1h)" + ) + p.add_argument( + "--partition", default="hourly", help="SLURM partition (default: hourly)" + ) + p.add_argument("--set", action="append", default=[], help="dot.notation overrides") + return p.parse_args() + + +def apply_sets(cfg, sets): + def coerce(v): + if v.lower() in ("true", "false"): + return v.lower() == "true" + try: + if "." in v: + return float(v) + return int(v) + except (ValueError, TypeError): + return v + + for item in sets: + key, _, val = item.partition("=") + if not _: + raise ValueError(f"--set needs key=value, got: {item}") + parts = key.split(".") + cur = cfg + for p in parts[:-1]: + cur = cur.setdefault(p, {}) + cur[parts[-1]] = coerce(val) + return cfg + + +def get(cfg, dotted, default=None): + cur = cfg + for part in dotted.split("."): + if not isinstance(cur, dict) or part not in cur: + return default + cur = cur[part] + return cur + + +def choose(cli_val, cfg, dotted, default=None): + return cli_val if cli_val is not None else get(cfg, dotted, default) + + def find_matching_hk_file(pbp_path: str) -> Optional[str]: """ From a PbP filename, return a path to the corresponding hk file (.csv or .zip), if found. diff --git a/tests/run_config.yaml b/tests/run_config.yaml new file mode 100644 index 0000000..62cc40a --- /dev/null +++ b/tests/run_config.yaml @@ -0,0 +1,35 @@ +paths: + input_pbp: tests/data/pbp_files_test + input_hk: tests/data/hk_files_test + output: tests/data3 + instrument_config: tests/data/instrument_config.yaml + +workflow: + conc: true + hist: true + dt: 1 # seconds + +cluster: + cores: 8 + memory: 64GB + walltime: "00:59:00" + partition: hourly + log_dir: ./slurm_out + +bc: + rho_eff: 1800 + type: constant_effective_density + +histo: + inc: + min_mass: 1e-18 + max_mass: 1e-12 + n_bins: 40 + scatt: + min_D: 100 + max_D: 1000 + n_bins: 30 + timelag: + min: -50 + max: 100 + n_bins: 150 \ No newline at end of file