diff --git a/scripts/sp2xr_pipeline.py b/scripts/sp2xr_pipeline.py
index 0ae42b3..3c3b5f7 100644
--- a/scripts/sp2xr_pipeline.py
+++ b/scripts/sp2xr_pipeline.py
@@ -9,6 +9,7 @@ from sp2xr.helpers import (
     parse_args,
     load_and_resolve_config,
     initialize_cluster,
+    floor_index_to_dt,
 )
 from sp2xr.apply_calib import calibrate_single_particle
 from sp2xr.resample_pbp_hk import build_dt_summary, resample_hk_partition
@@ -26,43 +27,10 @@ def main():
 
     client = initialize_cluster(run_config)
 
-    """args = parse_args()
-    base = load_yaml_cfg(args.config)
-    base = apply_sets(base, args.set)
-
-    # resolved values (CLI > config > built-in defaults)
-    input_pbp = choose(args.input_pbp, base, "paths.input_pbp")
-    input_hk = choose(args.input_hk, base, "paths.input_hk")
-    output = choose(args.output, base, "paths.output")
-    instr_cfg = choose(args.instr_config, base, "paths.instrument_config")
-
-    dt = choose(args.dt, base, "workflow.dt", 1)
-    do_conc = args.conc or get(base, "workflow.conc", False)
-    do_hist = args.hist or get(base, "workflow.hist", False)
-
-    rho_eff = choose(args.BC_rho, base, "bc.rho_eff", 1.8)
-    BC_type = choose(args.BC_type, base, "bc.type", "constant_effective_density")
-
-    cores = choose(args.cores, base, "cluster.cores", 8)
-    memory = choose(args.memory, base, "cluster.memory", "64GB")
-    walltime = choose(args.walltime, base, "cluster.walltime", "00:59:00")
-    part = choose(args.partition, base, "cluster.partition", "hourly")
-    log_dir = get(base, "cluster.log_dir", "./slurm_out")
-
-    # optional: persist fully-resolved config for provenance
-    os.makedirs(output, exist_ok=True)
-    with open(os.path.join(output, "config.resolved.yaml"), "w") as f:
-        yaml.safe_dump(base, f, sort_keys=False)
-
-    # 0. cluster -------------------------------------------------------
-    client = make_cluster(
-        cores=cores, mem=memory, wall=walltime, partition=part, out_dir=log_dir
-    )
-    """
-    # 1. calibration stage --------------------------------------------
+    # 0. calibration stage --------------------------------------------
     instr_config = yaml.safe_load(open(run_config["instr_cfg"]))
 
-    # cfg_future = client.scatter(cfg, broadcast=True)
+    # 1. Bins
     inc_mass_bin_lims = np.logspace(
         np.log10(run_config["histo"]["inc"]["min_mass"]),
         np.log10(run_config["histo"]["inc"]["max_mass"]),
@@ -70,19 +38,23 @@ def main():
     )
     inc_mass_bin_ctrs = bin_lims_to_ctrs(inc_mass_bin_lims)
 
-    ddf_raw = dd.read_parquet(run_config["input_pbp"], engine="pyarrow")
-
-    ddf_cal = calibrate_single_particle(ddf_raw, instr_config, run_config)
-
-    meta_dt = build_dt_summary(ddf_cal._meta)  # empty frame for metadata
-
-    ddf_pbp_dt = ddf_cal.map_partitions(
-        build_dt_summary, dt_s=run_config["dt"], meta=meta_dt
+    scatt_bin_lims = np.logspace(
+        np.log10(run_config["histo"]["scatt"]["min_D"]),
+        np.log10(run_config["histo"]["scatt"]["max_D"]),
+        run_config["histo"]["scatt"]["n_bins"],
     )
+    scatt_bin_ctrs = bin_lims_to_ctrs(scatt_bin_lims)
 
-    # 2. load house-keeping once --------------------------------------
+    timelag_bins_lims = np.linspace(
+        run_config["histo"]["timelag"]["min"],
+        run_config["histo"]["timelag"]["max"],
+        run_config["histo"]["timelag"]["n_bins"],
+    )
+    timelag_bin_ctrs = bin_lims_to_ctrs(timelag_bins_lims)
+
+    # 2. HK processing --------------------------------------
     ddf_hk = dd.read_parquet(run_config["input_hk"], engine="pyarrow")
-    # Create a meta (empty dataframe with correct schema)
+
     meta = pd.DataFrame(
         {
             "Sample Flow Controller Read (sccm)": pd.Series(dtype="float64"),
@@ -92,29 +64,23 @@ def main():
         },
         index=pd.DatetimeIndex([]),
     )
-
-    # Map partition-wise
     ddf_hk_dt = ddf_hk.map_partitions(
         resample_hk_partition, dt=f"{run_config['dt']}s", meta=meta
     )
 
     flow_dt = ddf_hk_dt["Sample Flow Controller Read (vccm)"].compute()
 
-    def floor_index_to_dt(pdf: pd.DataFrame) -> pd.DataFrame:
-        """
-        Replace the existing DatetimeIndex with its lower-second value,
-        without changing the index’s name or creating a new column.
-        """
-        pdf.index = pdf.index.floor(f"{run_config['dt']}s")  # keeps the original .name
-        return pdf
+    # 3. PBP processing --------------------------------------
+    ddf_raw = dd.read_parquet(run_config["input_pbp"], engine="pyarrow")
 
-    # meta stays identical because only the *values* of the index change
-    ddf_cal = ddf_cal.map_partitions(floor_index_to_dt, meta=ddf_cal._meta)
+    ddf_cal = calibrate_single_particle(ddf_raw, instr_config, run_config)
+
+    ddf_cal = ddf_cal.map_partitions(
+        floor_index_to_dt, run_config=run_config, meta=ddf_cal._meta
+    )
 
     meta_pbp_with_flow = ddf_cal._meta.copy()
-    meta_pbp_with_flow["Sample Flow Controller Read (vccm)"] = (
-        np.float64()
-    )  # tell Dask a new float col appears
+    meta_pbp_with_flow["Sample Flow Controller Read (vccm)"] = np.float64()
 
     ddf_pbp_with_flow = ddf_cal.map_partitions(
         lambda part: part.join(flow_dt, how="left"),
@@ -133,7 +99,7 @@ def main():
     )
 
     ddf_pbp_with_flow.to_parquet(
-        path=f"{run_config['output']}/pbp_calibrated",
+        path=f"{run_config['output']}/pbp_{run_config['dt']}s_calibrated",
         partition_on=["date", "hour"],
         engine="pyarrow",
         write_index=True,
@@ -141,7 +107,11 @@ def main():
         overwrite=True,
     )
 
-    # 3. aggregation core ---------------------------------------------
+    # 4. Aggregate PBP ---------------------------------------------
+    ddf_pbp_dt = ddf_cal.map_partitions(
+        build_dt_summary, dt_s=run_config["dt"], meta=build_dt_summary(ddf_cal._meta)
+    )
+
     ddf_pbp_hk_dt = dd.merge(
         ddf_pbp_dt, ddf_hk_dt, how="left", left_index=True, right_index=True
     )
@@ -163,14 +133,9 @@ def main():
     )
 
     # 4. (optional) dt bulk conc --------------------------
-    meta_conc = (
-        add_concentrations(ddf_pbp_hk_dt._meta, dt=run_config["dt"])
-        if run_config["do_conc"]
-        else None
-    )
-
-    if run_config["do_conc"]:
-        print("ok")
+    meta_conc = add_concentrations(ddf_pbp_hk_dt._meta, dt=run_config["dt"])
+
     ddf_conc = ddf_pbp_hk_dt.map_partitions(
         add_concentrations, dt=run_config["dt"], meta=meta_conc
     )
@@ -183,8 +148,7 @@ def main():
 
     # 5. (optional) dt histograms --------------------------
     if run_config["do_hist"]:
-        # ========= 3. RUN MASS HISTOGRAMS =========
-        # --- Mass histogram configs
+        # --- Mass histogram
         hist_configs = [
             {"name": "BC_all", "flag_col": None, "flag_value": None},
             {"name": "thin", "flag_col": "cnts_thin", "flag_value": 1},
@@ -232,18 +196,11 @@ def main():
                 rho_eff=run_config["rho_eff"],
                 BC_type=run_config["BC_type"],
                 t=1,
-                meta=meta_hist,  # <-- single line does the trick
+                meta=meta_hist,
             )
             results.append(ddf_out)
 
-        # ========= 4. RUN SCATTERING HISTOGRAM =========
-        # --- Scattering histogram configuration
-        scatt_bin_lims = np.logspace(
-            np.log10(run_config["histo"]["scatt"]["min_D"]),
-            np.log10(run_config["histo"]["scatt"]["max_D"]),
-            run_config["histo"]["scatt"]["n_bins"],
-        )
-        scatt_bin_ctrs = bin_lims_to_ctrs(scatt_bin_lims)
+        # --- Scattering histogram
 
         meta_hist = make_hist_meta(
             bin_ctrs=scatt_bin_ctrs,
@@ -265,19 +222,11 @@ def main():
             rho_eff=None,
             BC_type=None,
             t=1,
-            meta=meta_hist,  # <-- single line does the trick
+            meta=meta_hist,
         )
         results.append(ddf_scatt)
 
-        # ========= 5. RUN TIMELAG HISTOGRAMS =========
-
-        # --- Timelag histogram parameters
-        timelag_bins_lims = np.linspace(
-            run_config["histo"]["timelag"]["min"],
-            run_config["histo"]["timelag"]["max"],
-            run_config["histo"]["timelag"]["n_bins"],
-        )
-        timelag_bin_ctrs = bin_lims_to_ctrs(timelag_bins_lims)
+        # --- Timelag histogram
         mass_bins = ddf_pbp_with_flow["BC mass bin"].unique().compute()
 
         for idx, mass_bin in enumerate(mass_bins[:1]):
@@ -303,26 +252,25 @@ def main():
                 bin_ctrs=timelag_bin_ctrs,
                 dt=run_config["dt"],
                 calculate_conc=True,
-                flow=None,  # ddf_pbp_hk["Sample Flow Controller Read (vccm)"],
+                flow=None,
                 rho_eff=None,
                 BC_type=None,
                 t=1,
-                name_prefix=name_prefix,  # <-- force naming
-                meta=meta_hist,  # pd.DataFrame(columns=meta_cols)
+                name_prefix=name_prefix,
+                meta=meta_hist,
             )
             results.append(tl_ddf)
 
-        # ========= 6. MERGE ALL HISTOGRAMS =========
+        # --- Merge all hists
 
         merged_ddf = dd.concat(results, axis=1, interleave_partitions=True)
 
-        # Add normalized date column
         index_as_dt = dd.to_datetime(merged_ddf.index.to_series())
         merged_ddf["date"] = index_as_dt.map_partitions(
             lambda s: s.dt.normalize(), meta=("date", "datetime64[ns]")
        )
 
-        # ========= 7. SAVE TO PARQUET =========
+        # --- Save hists to parquet
 
         merged_ddf.to_parquet(
             f"{run_config['output']}/hists_{run_config['dt']}s",
diff --git a/src/sp2xr/helpers.py b/src/sp2xr/helpers.py
index 8e8704c..a8bdfa4 100644
--- a/src/sp2xr/helpers.py
+++ b/src/sp2xr/helpers.py
@@ -2,12 +2,22 @@ import os
 import re
 import yaml
 import argparse
+import pandas as pd
 from pathlib import Path
 from dask_jobqueue import SLURMCluster
 from dask.distributed import Client
 from typing import Optional, List
 
 
+def floor_index_to_dt(pdf: pd.DataFrame, run_config: dict) -> pd.DataFrame:
+    """
+    Replace the existing DatetimeIndex with its lower-second value,
+    without changing the index’s name or creating a new column.
+    """
+    pdf.index = pdf.index.floor(f"{run_config['dt']}s")  # keeps the original .name
+    return pdf
+
+
 def load_and_resolve_config(args):
     base = load_yaml_cfg(args.config)
     base = apply_sets(base, args.set)
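
Illustrative note, not part of the patch: the hunks above move floor_index_to_dt into sp2xr.helpers and pass run_config explicitly, so the per-particle index is snapped onto the same dt-second grid that resample_hk_partition produces before the flow column is joined in. The standalone pandas sketch below shows only that alignment step, with made-up data and an assumed minimal run_config = {"dt": 1}; "Sample Flow Controller Read (vccm)" mirrors the diff, while the "BC mass (fg)" column and the sample timestamps are hypothetical stand-ins.

import pandas as pd

def floor_index_to_dt(pdf: pd.DataFrame, run_config: dict) -> pd.DataFrame:
    # Same logic as the relocated helper: floor the DatetimeIndex to the dt grid.
    pdf.index = pdf.index.floor(f"{run_config['dt']}s")
    return pdf

run_config = {"dt": 1}  # assumed minimal config for this sketch

# Per-particle rows with sub-second timestamps (stand-in for the calibrated PBP data).
pbp = pd.DataFrame(
    {"BC mass (fg)": [0.8, 1.1, 0.9]},
    index=pd.DatetimeIndex(
        ["2024-06-01 12:00:00.200", "2024-06-01 12:00:00.700", "2024-06-01 12:00:01.300"]
    ),
)

# Flow already resampled to the 1 s grid (stand-in for flow_dt in the pipeline).
flow_dt = pd.Series(
    [120.0, 121.0],
    index=pd.DatetimeIndex(["2024-06-01 12:00:00", "2024-06-01 12:00:01"]),
    name="Sample Flow Controller Read (vccm)",
)

# Floor, then left-join: every particle row picks up the flow of its dt interval,
# mirroring the per-partition part.join(flow_dt, how="left") call in the pipeline.
pbp_aligned = floor_index_to_dt(pbp, run_config=run_config)
print(pbp_aligned.join(flow_dt, how="left"))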