Clean up old parts in the code and re-arrange the processing order

2025-08-13 16:08:37 +02:00
parent 5f3d25817c
commit 1038b18187
2 changed files with 53 additions and 95 deletions

View File

@@ -9,6 +9,7 @@ from sp2xr.helpers import (
parse_args,
load_and_resolve_config,
initialize_cluster,
floor_index_to_dt,
)
from sp2xr.apply_calib import calibrate_single_particle
from sp2xr.resample_pbp_hk import build_dt_summary, resample_hk_partition
@@ -26,43 +27,10 @@ def main():
client = initialize_cluster(run_config)
"""args = parse_args()
base = load_yaml_cfg(args.config)
base = apply_sets(base, args.set)
# resolved values (CLI > config > built-in defaults)
input_pbp = choose(args.input_pbp, base, "paths.input_pbp")
input_hk = choose(args.input_hk, base, "paths.input_hk")
output = choose(args.output, base, "paths.output")
instr_cfg = choose(args.instr_config, base, "paths.instrument_config")
dt = choose(args.dt, base, "workflow.dt", 1)
do_conc = args.conc or get(base, "workflow.conc", False)
do_hist = args.hist or get(base, "workflow.hist", False)
rho_eff = choose(args.BC_rho, base, "bc.rho_eff", 1.8)
BC_type = choose(args.BC_type, base, "bc.type", "constant_effective_density")
cores = choose(args.cores, base, "cluster.cores", 8)
memory = choose(args.memory, base, "cluster.memory", "64GB")
walltime = choose(args.walltime, base, "cluster.walltime", "00:59:00")
part = choose(args.partition, base, "cluster.partition", "hourly")
log_dir = get(base, "cluster.log_dir", "./slurm_out")
# optional: persist fully-resolved config for provenance
os.makedirs(output, exist_ok=True)
with open(os.path.join(output, "config.resolved.yaml"), "w") as f:
yaml.safe_dump(base, f, sort_keys=False)
# 0. cluster -------------------------------------------------------
client = make_cluster(
cores=cores, mem=memory, wall=walltime, partition=part, out_dir=log_dir
)
"""
# 1. calibration stage --------------------------------------------
# 0. calibration stage --------------------------------------------
instr_config = yaml.safe_load(open(run_config["instr_cfg"]))
# cfg_future = client.scatter(cfg, broadcast=True)
# 1. Bins
inc_mass_bin_lims = np.logspace(
np.log10(run_config["histo"]["inc"]["min_mass"]),
np.log10(run_config["histo"]["inc"]["max_mass"]),
@@ -70,19 +38,23 @@ def main():
)
inc_mass_bin_ctrs = bin_lims_to_ctrs(inc_mass_bin_lims)
ddf_raw = dd.read_parquet(run_config["input_pbp"], engine="pyarrow")
ddf_cal = calibrate_single_particle(ddf_raw, instr_config, run_config)
meta_dt = build_dt_summary(ddf_cal._meta) # empty frame for metadata
ddf_pbp_dt = ddf_cal.map_partitions(
build_dt_summary, dt_s=run_config["dt"], meta=meta_dt
scatt_bin_lims = np.logspace(
np.log10(run_config["histo"]["scatt"]["min_D"]),
np.log10(run_config["histo"]["scatt"]["max_D"]),
run_config["histo"]["scatt"]["n_bins"],
)
scatt_bin_ctrs = bin_lims_to_ctrs(scatt_bin_lims)
# 2. load house-keeping once --------------------------------------
timelag_bins_lims = np.linspace(
run_config["histo"]["timelag"]["min"],
run_config["histo"]["timelag"]["max"],
run_config["histo"]["timelag"]["n_bins"],
)
timelag_bin_ctrs = bin_lims_to_ctrs(timelag_bins_lims)
# 2. HK processing --------------------------------------
ddf_hk = dd.read_parquet(run_config["input_hk"], engine="pyarrow")
# Create a meta (empty dataframe with correct schema)
meta = pd.DataFrame(
{
"Sample Flow Controller Read (sccm)": pd.Series(dtype="float64"),
@@ -92,29 +64,23 @@ def main():
},
index=pd.DatetimeIndex([]),
)
# Map partition-wise
ddf_hk_dt = ddf_hk.map_partitions(
resample_hk_partition, dt=f"{run_config['dt']}s", meta=meta
)
flow_dt = ddf_hk_dt["Sample Flow Controller Read (vccm)"].compute()
def floor_index_to_dt(pdf: pd.DataFrame) -> pd.DataFrame:
"""
Replace the existing DatetimeIndex with its value floored to the dt grid,
without changing the index's name or creating a new column.
"""
pdf.index = pdf.index.floor(f"{run_config['dt']}s") # keeps the original .name
return pdf
# 3. PBP processing --------------------------------------
ddf_raw = dd.read_parquet(run_config["input_pbp"], engine="pyarrow")
# meta stays identical because only the *values* of the index change
ddf_cal = ddf_cal.map_partitions(floor_index_to_dt, meta=ddf_cal._meta)
ddf_cal = calibrate_single_particle(ddf_raw, instr_config, run_config)
ddf_cal = ddf_cal.map_partitions(
floor_index_to_dt, run_config=run_config, meta=ddf_cal._meta
)
meta_pbp_with_flow = ddf_cal._meta.copy()
meta_pbp_with_flow["Sample Flow Controller Read (vccm)"] = (
np.float64()
) # tell Dask a new float col appears
meta_pbp_with_flow["Sample Flow Controller Read (vccm)"] = np.float64()
ddf_pbp_with_flow = ddf_cal.map_partitions(
lambda part: part.join(flow_dt, how="left"),
@@ -133,7 +99,7 @@ def main():
)
ddf_pbp_with_flow.to_parquet(
path=f"{run_config['output']}/pbp_calibrated",
path=f"{run_config['output']}/pbp_{run_config['dt']}s_calibrated",
partition_on=["date", "hour"],
engine="pyarrow",
write_index=True,
@@ -141,7 +107,11 @@ def main():
overwrite=True,
)
# 3. aggregation core ---------------------------------------------
# 4. Aggregate PBP ---------------------------------------------
ddf_pbp_dt = ddf_cal.map_partitions(
build_dt_summary, dt_s=run_config["dt"], meta=build_dt_summary(ddf_cal._meta)
)
ddf_pbp_hk_dt = dd.merge(
ddf_pbp_dt, ddf_hk_dt, how="left", left_index=True, right_index=True
)
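A minimal sketch of the map_partitions/meta pattern used above (toy data and a hypothetical per_second_counts helper, not the project's real schema): Dask only needs an empty frame with the output's columns, dtypes and index type to build the task graph lazily, which is why build_dt_summary(ddf_cal._meta) and the hand-built house-keeping meta are passed as meta=.

import dask.dataframe as dd
import pandas as pd

def per_second_counts(pdf: pd.DataFrame) -> pd.DataFrame:
    # count rows per 1 s window, analogous in spirit to build_dt_summary
    return pdf.resample("1s").size().to_frame("counts")

pdf = pd.DataFrame(
    {"signal": range(6)},
    index=pd.date_range("2025-08-13 12:00:00", periods=6, freq="250ms"),
)
ddf = dd.from_pandas(pdf, npartitions=2)

meta = per_second_counts(pdf.iloc[:0])  # empty frame with the output schema
summary = ddf.map_partitions(per_second_counts, meta=meta)
print(summary.compute())  # note: each partition is resampled independently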
@@ -163,14 +133,9 @@ def main():
)
# 4. (optional) dt bulk conc --------------------------
meta_conc = (
add_concentrations(ddf_pbp_hk_dt._meta, dt=run_config["dt"])
if run_config["do_conc"]
else None
)
if run_config["do_conc"]:
print("ok")
meta_conc = add_concentrations(ddf_pbp_hk_dt._meta, dt=run_config["dt"])
ddf_conc = ddf_pbp_hk_dt.map_partitions(
add_concentrations, dt=run_config["dt"], meta=meta_conc
)
@@ -183,8 +148,7 @@ def main():
# 5. (optional) dt histograms --------------------------
if run_config["do_hist"]:
# ========= 3. RUN MASS HISTOGRAMS =========
# --- Mass histogram configs
# --- Mass histogram
hist_configs = [
{"name": "BC_all", "flag_col": None, "flag_value": None},
{"name": "thin", "flag_col": "cnts_thin", "flag_value": 1},
@@ -232,18 +196,11 @@ def main():
rho_eff=run_config["rho_eff"],
BC_type=run_config["BC_type"],
t=1,
meta=meta_hist, # <-- single line does the trick
meta=meta_hist,
)
results.append(ddf_out)
# ========= 4. RUN SCATTERING HISTOGRAM =========
# --- Scattering histogram configuration
scatt_bin_lims = np.logspace(
np.log10(run_config["histo"]["scatt"]["min_D"]),
np.log10(run_config["histo"]["scatt"]["max_D"]),
run_config["histo"]["scatt"]["n_bins"],
)
scatt_bin_ctrs = bin_lims_to_ctrs(scatt_bin_lims)
# --- Scattering histogram
meta_hist = make_hist_meta(
bin_ctrs=scatt_bin_ctrs,
@@ -265,19 +222,11 @@ def main():
rho_eff=None,
BC_type=None,
t=1,
meta=meta_hist, # <-- single line does the trick
meta=meta_hist,
)
results.append(ddf_scatt)
# ========= 5. RUN TIMELAG HISTOGRAMS =========
# --- Timelag histogram parameters
timelag_bins_lims = np.linspace(
run_config["histo"]["timelag"]["min"],
run_config["histo"]["timelag"]["max"],
run_config["histo"]["timelag"]["n_bins"],
)
timelag_bin_ctrs = bin_lims_to_ctrs(timelag_bins_lims)
# --- Timelag histogram
mass_bins = ddf_pbp_with_flow["BC mass bin"].unique().compute()
for idx, mass_bin in enumerate(mass_bins[:1]):
@@ -303,26 +252,25 @@ def main():
bin_ctrs=timelag_bin_ctrs,
dt=run_config["dt"],
calculate_conc=True,
flow=None, # ddf_pbp_hk["Sample Flow Controller Read (vccm)"],
flow=None,
rho_eff=None,
BC_type=None,
t=1,
name_prefix=name_prefix, # <-- force naming
meta=meta_hist, # pd.DataFrame(columns=meta_cols)
name_prefix=name_prefix,
meta=meta_hist,
)
results.append(tl_ddf)
# ========= 6. MERGE ALL HISTOGRAMS =========
# --- Merge all hists
merged_ddf = dd.concat(results, axis=1, interleave_partitions=True)
# Add normalized date column
index_as_dt = dd.to_datetime(merged_ddf.index.to_series())
merged_ddf["date"] = index_as_dt.map_partitions(
lambda s: s.dt.normalize(), meta=("date", "datetime64[ns]")
)
# ========= 7. SAVE TO PARQUET =========
# --- Save hists to parquet
merged_ddf.to_parquet(
f"{run_config['output']}/hists_{run_config['dt']}s",

View File

@@ -2,12 +2,22 @@ import os
import re
import yaml
import argparse
import pandas as pd
from pathlib import Path
from dask_jobqueue import SLURMCluster
from dask.distributed import Client
from typing import Optional, List
def floor_index_to_dt(pdf: pd.DataFrame, run_config: dict) -> pd.DataFrame:
"""
Replace the existing DatetimeIndex with its value floored to the dt grid,
without changing the index's name or creating a new column.
"""
pdf.index = pdf.index.floor(f"{run_config['dt']}s") # keeps the original .name
return pdf
def load_and_resolve_config(args):
base = load_yaml_cfg(args.config)
base = apply_sets(base, args.set)
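For reference, a standalone sketch of the relocated floor_index_to_dt helper (timestamps and column name are made up; in the main script it is applied per partition via map_partitions so the per-particle index lines up with the resampled house-keeping data before the index-based left join):

import pandas as pd
from sp2xr.helpers import floor_index_to_dt  # matches the import added to the main script

pdf = pd.DataFrame(
    {"BC mass (fg)": [0.8, 1.2, 2.5]},
    index=pd.DatetimeIndex(
        ["2025-08-13 12:00:00.120", "2025-08-13 12:00:00.870", "2025-08-13 12:00:01.340"]
    ),
)

# floor the sub-second timestamps onto the dt grid, keeping the index name
floored = floor_index_to_dt(pdf, run_config={"dt": 1})
print(floored.index)  # 12:00:00, 12:00:00, 12:00:01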