Refactor: working version of sp2xr_pipeline but not yet polished

This commit is contained in:
2025-08-08 11:54:32 +02:00
parent 6fa6fabf03
commit 7e736803c1

View File

@@ -8,6 +8,7 @@ from dask.distributed import Client
import yaml
import time
import dask.dataframe as dd
from dask.dataframe.utils import make_meta
import pandas as pd
import numpy as np
from typing import Tuple, List
@@ -51,13 +52,16 @@ def parse_args():
return p.parse_args()
def make_cluster(cores=32, mem="64GB", wall="02:00:00", partition="daily"):
def make_cluster(cores=32, mem="64GB", wall="02:00:00", partition="daily", out_dir=""):
cluster = SLURMCluster(
cores=cores,
processes=cores,
memory=mem,
walltime=wall,
job_extra_directives=[f"--partition={partition}"],
job_extra_directives=[
f"--partition={partition}",
f"--output={out_dir}/slurm-%j.out",
],
)
client = Client(cluster)
@@ -374,6 +378,7 @@ def resample_hk_partition(pdf: pd.DataFrame, dt="1s") -> pd.DataFrame:
"""
# Ensure local partition is sorted
pdf = pdf.sort_index()
pdf.index = pdf.index.floor(dt)
# --- resample mean values ---
df_mean = (
@@ -395,6 +400,8 @@ def resample_hk_partition(pdf: pd.DataFrame, dt="1s") -> pd.DataFrame:
# --- drop empty bins ---
out = out[out["original_idx"] != 0].drop(columns="original_idx")
out = out.asfreq(dt).ffill(limit=1)
# --- add date/hour columns ---
out["date"] = out.index.normalize()
out["hour"] = out.index.hour.astype("int64")
@@ -483,131 +490,6 @@ def make_bin_arrays(
return lims, ctrs
'''
def histo_counts_partition(
pdf: pd.DataFrame,
*,
col: str,
dt_s: int,
bin_lims: np.ndarray,
bin_ctrs: np.ndarray,
flag_col: Optional[str] = None,
flag_val: Optional[int] = None,
) -> pd.DataFrame:
if flag_col is not None and flag_val is not None:
pdf = pdf.loc[pdf[flag_col] == flag_val]
dt_str = f"{dt_s}s"
counts_ser = (
pdf[col]
.resample(dt_str)
.apply(lambda x: np.histogram(x.dropna(), bins=bin_lims)[0])
)
# keep only non-empty intervals (temporary_col exists in PbP frame)
pdf["temporary_col"] = 1
nonempty = pdf["temporary_col"].resample(dt_str).count()
counts_ser = counts_ser[nonempty != 0]
"""if counts_ser.empty:
counts_df = pd.DataFrame(
np.nan,
index=pd.DatetimeIndex([], name=counts_ser.index.name),
columns=[f"bin_{i}" for i in range(len(bin_ctrs))],
)
else:
counts_df = counts_ser.apply(
lambda arr: pd.Series(arr, index=bin_ctrs, dtype="int64")
)""""""
if counts_ser.empty:
# use the REAL bin centres as column names
counts_df = pd.DataFrame(
np.nan,
index=pd.DatetimeIndex([], name=counts_ser.index.name),
columns=bin_ctrs, # <-- not "bin_0", "bin_1"
dtype="float64",
)
else:
counts_df = counts_ser.apply(
lambda arr: pd.Series(arr, index=bin_ctrs, dtype="int64")
)
counts_df["date"] = counts_df.index.normalize()
counts_df["hour"] = counts_df.index.hour.astype("int64")
return counts_df
def all_histograms_partition(
pdf: pd.DataFrame,
*,
specs: List[Tuple[str, Optional[str], Optional[int], str, Tuple[np.ndarray, np.ndarray]]],
dt_s: int,
) -> pd.DataFrame:
frames = []
for idx, (key, flag_col, flag_val, col_name, bin_tuple) in enumerate(specs):
lims, ctrs = bin_tuple
h = histo_counts_partition(
pdf,
col=col_name,
dt_s=dt_s,
bin_lims=lims,
bin_ctrs=ctrs,
flag_col=flag_col,
flag_val=flag_val,
)
helpers = h[["date", "hour"]] if idx == 0 else None
hist = h.drop(columns=["date", "hour"])
# prefix only the histogram columns
#hist = hist.add_prefix(f"{key}_")
hist.columns = [f"{key}_bin_{c}" for c in hist.columns]
frames.append(hist)
if helpers is not None:
frames.append(helpers) # insert once, at the very end
return pd.concat(frames, axis=1)
def counts_to_conc_partition(
pdf: pd.DataFrame,
*,
flow_col: str,
dt_s: int,
rho_eff: float = 1800,
bin_width_map: dict,
) -> pd.DataFrame:
flow = pdf[flow_col]
conc_frames = []
mass_frames = []
for col in pdf.columns:
if "_bin_" not in col:
continue # skip helpers, prefixed cols handled below
# For each histogram population (prefix before first underscore)
prefixes = sorted({c.split("_", 1)[0] for c in pdf.columns if c.startswith("bin_")})
for pref in prefixes:
cols = [c for c in pdf.columns if c.startswith(f"{pref}_bin_")]
bins = np.array([float(c.split("_")[-1]) for c in cols])
dNd = pdf[cols].div(flow, axis=0) / (dt_s / 60)
dNd.columns = [c.replace("bin_", f"dNd_{pref}_") for c in cols]
conc_frames.append(dNd)
# ---- add mass spectra if this is BC-mass histogram -----------
if pref != "scatt_only": # crude rule: scatt histogram → skip mass
dlog = bin_width_map[pref] # scalar or 1-D array
density = rho_eff
dMd = dNd.mul((np.pi / 6) * density * bins**3, axis=1) / dlog
dMd.columns = [c.replace("dNd", "dMd") for c in dNd.columns]
mass_frames.append(dMd)
out = pd.concat(conc_frames + mass_frames + [pdf[["date", "hour"]]], axis=1)
return out
'''
def dNdlogDp_to_dMdlogDp(N, Dp, rho=1800):
"""This is from functions.py from Rob"""
"""
@@ -699,7 +581,11 @@ def process_hist_and_dist_partition(
)
flow_dt = df["Sample Flow Controller Read (vccm)"].resample(dt_str).mean()
df_resampled = df_resampled.to_frame(name="result")
# df_resampled = df_resampled.to_frame(name="result")
if isinstance(df_resampled, pd.Series):
df_resampled = df_resampled.to_frame(name="result")
else: # already a DataFrame
df_resampled.columns = ["result"]
original_idx_df = df_filtered[["temporary_col"]].resample(dt_str).count()
original_idx_df.columns = ["original_idx"] # ensure proper naming
@@ -719,6 +605,8 @@ def process_hist_and_dist_partition(
else:
ddf_hist = df_resampled[list_col].apply(pd.Series)
ddf_hist.index = df_resampled.index
columns = [f"{list_col}_{i}" for i in range(max_list_length)]
ddf_hist.columns = columns
if calculate_conc:
# Join with the sample flow controller data
@@ -780,7 +668,11 @@ def main():
# 0. cluster -------------------------------------------------------
client = make_cluster(
cores=args.cores, mem=args.memory, wall=args.walltime, partition=args.partition
cores=args.cores,
mem=args.memory,
wall=args.walltime,
partition=args.partition,
out_dir="/data/user/bertoz_b/merlin6data/SP2XR_code/slurm_out/",
)
# 1. calibration stage --------------------------------------------
@@ -834,15 +726,6 @@ def main():
np.nan,
)
"""ddf_cal.to_parquet(
path=f"{output_dir}/pbp_calibrated",
partition_on=["date", "hour"],
engine="pyarrow",
write_index=True,
write_metadata_file=True,
overwrite=True,
)"""
ddf_cal["temporary_col"] = 1
meta_dt = build_dt_summary(ddf_cal._meta) # empty frame for metadata
@@ -872,37 +755,49 @@ def main():
resample_hk_partition, dt=f"{args.dt}s", meta=meta
)
def add_flow_nearest(pdf, flow_pdf, tol="500ms"):
'''def add_flow_same_second(pdf, flow_pdf, tol="999ms"):
"""
pdf : particle partition (pandas)
flow_pdf : *entire* flow table (pandas)
tol : max time difference accepted (string, Timedelta, or None)
"""
return pd.merge_asof(
pdf,
pdf.sort_index(),
flow_pdf,
left_index=True,
right_index=True,
direction="nearest",
tolerance=pd.Timedelta(tol) if tol else None,
)
direction="backward",
tolerance=pd.Timedelta(tol),
)'''
pdf_flow = ddf_hk_dt["Sample Flow Controller Read (vccm)"].compute()
flow_1s = ddf_hk_dt["Sample Flow Controller Read (vccm)"].compute()
def floor_index_to_sec(pdf: pd.DataFrame) -> pd.DataFrame:
"""
Floor the existing DatetimeIndex to the whole second,
without changing the index's name or creating a new column.
"""
pdf.index = pdf.index.floor("s") # keeps the original .name
return pdf
# meta stays identical because only the *values* of the index change
ddf_cal = ddf_cal.map_partitions(floor_index_to_sec, meta=ddf_cal._meta)
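A minimal sketch (hypothetical timestamps, index name assumed to be "Time") of why the existing `_meta` can be reused here: `DatetimeIndex.floor("s")` only rewrites the index values, so the index name and dtype, and therefore the schema Dask tracks, stay the same.
import pandas as pd
idx = pd.DatetimeIndex(["2025-01-01 10:00:00.412", "2025-01-01 10:00:01.907"], name="Time")
pdf = pd.DataFrame({"BC mass within range": [1.2, 3.4]}, index=idx)
floored = floor_index_to_sec(pdf.copy())
assert floored.index.name == "Time"       # name preserved
assert floored.index.dtype == idx.dtype   # dtype preserved -> same meta
# values become 10:00:00 and 10:00:01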
meta_pbp_with_flow = ddf_cal._meta.copy()
meta_pbp_with_flow["Sample Flow Controller Read (vccm)"] = (
np.float64()
) # tell Dask a new float col appears
"""ddf_pbp_with_flow = ddf_cal.map_partitions(
add_flow_same_second,
flow_pdf=flow_1s, # ← broadcast
meta=meta_pbp_with_flow,
)"""
ddf_pbp_with_flow = ddf_cal.map_partitions(
add_flow_nearest,
flow_pdf=pdf_flow, # ← broadcast
lambda part: part.join(flow_1s, how="left"),
meta=meta_pbp_with_flow,
)
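A hedged pandas-level sketch of what the lambda above does per partition once both sides share a floored 1 s index: a left join that attaches the computed, broadcast 1 Hz flow series to every particle row in that second (timestamps below are hypothetical).
import pandas as pd
flow_1s = pd.Series(
    [120.0, 121.0],
    index=pd.to_datetime(["2025-01-01 10:00:00", "2025-01-01 10:00:01"]),
    name="Sample Flow Controller Read (vccm)",
)
part = pd.DataFrame(
    {"BC mass within range": [0.5, 0.7, 0.9]},
    index=pd.to_datetime(["2025-01-01 10:00:00", "2025-01-01 10:00:00", "2025-01-01 10:00:01"]),
)
# every particle in a given second receives that second's flow reading;
# seconds without a flow reading get NaN
part_with_flow = part.join(flow_1s, how="left")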
"""ddf_pbp_with_flow = ddf_pbp_with_flow.assign(
**{"BC mass bin": pd.cut(ddf_pbp_with_flow["BC mass within range"],
bins=inc_mass_bin_lims, labels=mass_bin_labels)}
)"""
ddf_pbp_with_flow = ddf_pbp_with_flow.map_partitions(
lambda df: df.assign(
**{
@@ -943,16 +838,7 @@ def main():
write_metadata_file=True,
overwrite=True,
)
"""
meta_core = build_core_meta(ddf_cal, ddf_hk, dt_s)
ddf_core = dd.merge(ddf_cal, ddf_hk, on="Time", how="left") \
.map_partitions(aggregate_core, dt_s=dt_s, meta=meta_core)
# always write the core table
ddf_core.to_parquet(f"{root_out}/core_{dt_s}s",
partition_on=["date"],
overwrite=True)
"""
# 4. (optional) dt bulk conc --------------------------
do_conc = args.conc
meta_conc = add_concentrations(ddf_pbp_hk_dt._meta, dt=dt_s) if do_conc else None
@@ -968,78 +854,8 @@ def main():
overwrite=True,
)
"""if do_hist:
ddf_hist.to_parquet(f"{root_out}/hists_{dt_s}s",
partition_on=["date"], overwrite=True)
"""
# 5. (optional) dt histograms --------------------------
do_hist = args.hist
"""# ---------------------------------------------------------------------
# Which particle subset → which flag column
# ---------------------------------------------------------------------
# key: suffix for column names
# val: (flag column, flag value) (None, None) means "all particles"
histogram_specs = {
"all" : (None, None),
"thin" : ("cnts_thin", 1),
"thin_noScatt" : ("cnts_thin_noScatt", 1),
"thin_low_inc_ratio" : ("cnts_thin_low_inc_scatt_ratio", 1),
"thick" : ("cnts_thick", 1),
"thick_sat" : ("cnts_thick_sat", 1),
"thin_sat" : ("cnts_thin_sat", 1),
"ntl_sat" : ("cnts_ntl_sat", 1),
"ntl" : ("cnts_ntl", 1),
"ext_pos_tl" : ("cnts_extreme_positive_timelag", 1),
"thin_total" : ("cnts_thin_total", 1),
"thick_total" : ("cnts_thick_total", 1),
"unclassified" : ("cnts_unclassified", 1),
}"""
"""bins_dict = {
"mass": make_bin_arrays( # BC-mass histogram bins
lims=np.logspace(
np.log10(cfg["inc_histo"]["min_mass"]),
np.log10(cfg["inc_histo"]["max_mass"]),
cfg["inc_histo"]["n_bins"]
)
), # -> (limits, centres)
"scatt": make_bin_arrays( # scattering-diameter bins
lims=np.logspace(
np.log10(cfg["scatt_histo"]["min_D"]),
np.log10(cfg["scatt_histo"]["max_D"]),
cfg["scatt_histo"]["n_bins"]
)
),
"timelag": make_bin_arrays( # time-lag bins (linear)
lims=np.linspace(
cfg["time_delay_histo"]["min"],
cfg["time_delay_histo"]["max"],
cfg["time_delay_histo"]["n_bins"]
),
log=False
),
}
histogram_spec_list = [
# key, flag_col, flag_val, column_name, bin_tuple, do_mass
("all", None, None, "BC mass within range", bins_dict["mass"]),
("thin", "cnts_thin", 1, "BC mass within range", bins_dict["mass"]),
("thin_noScatt","cnts_thin_noScatt", 1, "BC mass within range", bins_dict["mass"]),
("thin_low_ratio","cnts_thin_low_inc_scatt_ratio",1, "BC mass within range", bins_dict["mass"]),
("thick", "cnts_thick", 1, "BC mass within range", bins_dict["mass"]),
("thick_sat", "cnts_thick_sat", 1, "BC mass within range", bins_dict["mass"]),
("thin_sat", "cnts_thin_sat", 1, "BC mass within range", bins_dict["mass"]),
("ntl_sat", "cnts_ntl_sat", 1, "BC mass within range", bins_dict["mass"]),
("ntl", "cnts_ntl", 1, "BC mass within range", bins_dict["mass"]),
("ext_pos_tl", "cnts_extreme_positive_timelag", 1, "BC mass within range", bins_dict["mass"]),
("thin_total", "cnts_thin_total", 1, "BC mass within range", bins_dict["mass"]),
("thick_total", "cnts_thick_total", 1, "BC mass within range", bins_dict["mass"]),
("unclassified","cnts_unclassified", 1, "BC mass within range", bins_dict["mass"]),
# scattering-only diameter spectrum
("scatt_only", None, None, "Opt diam scatt only", bins_dict["scatt"]),
]"""
if do_hist:
# --- Mass histogram configs
@@ -1067,6 +883,83 @@ def main():
{"name": "unclassified", "flag_col": "cnts_unclassified", "flag_value": 1},
]
# ========= 2. WRAPPER FUNCTION =========
def make_hist_meta(
*,
bin_ctrs: np.ndarray,
kind: str, # "mass", "scatt", "timelag"
flag_col: str | None, # e.g. "cnts_thin" or None
rho_eff=None,
BC_type=None,
dtype=np.float64,
):
"""
Return an empty DataFrame whose column names AND dtypes are exactly
what `process_hist_and_dist_partition()` will emit, without ever
calling that function.
"""
# ---- pick prefix exactly like the function ----------------------
prefix = (
"all"
if flag_col is None
else flag_col[5:] if flag_col.startswith("cnts_") else flag_col
)
# ---- translate mass bins if needed ------------------------------
if kind == "mass" and rho_eff is not None and BC_type is not None:
# same conversion used inside the partition function
_, labels = BC_mass_to_diam(bin_ctrs, rho_eff=rho_eff, BC_type=BC_type)
n_prefix, m_prefix = "dNdlogDmev", "dMdlogDmev"
elif kind == "scatt":
labels = bin_ctrs
n_prefix, m_prefix = "dNdlogDsc", None
elif kind == "timelag":
labels = bin_ctrs
n_prefix, m_prefix = f"{prefix}_timelag", None
else:
raise ValueError(f"Unknown histogram kind '{kind}'")
# ---- build column list ------------------------------------------
cols = [f"{n_prefix}_{prefix}_{v:.2f}" for v in labels]
if kind == "mass" and m_prefix:
cols += [f"{m_prefix}_{prefix}_{v:.2f}" for v in labels]
empty_df = pd.DataFrame(
{c: pd.Series(dtype=dtype) for c in cols},
index=pd.DatetimeIndex([], name="calculated_time"),
)
return make_meta(empty_df)
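A small hedged example of calling the helper defined above for a three-centre scattering histogram (centre values hypothetical): it returns an empty, correctly typed frame that map_partitions can take as meta without executing the partition function.
import numpy as np
demo_meta = make_hist_meta(
    bin_ctrs=np.array([100.0, 150.0, 225.0]),  # hypothetical diameter centres
    kind="scatt",
    flag_col=None,
)
# -> zero rows, columns ["dNdlogDsc_all_100.00", "dNdlogDsc_all_150.00", "dNdlogDsc_all_225.00"],
#    all float64, DatetimeIndex named "calculated_time"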
# ========= 3. RUN MASS HISTOGRAMS =========
results = []
for cfg_hist in hist_configs:
meta_hist = make_hist_meta(
bin_ctrs=inc_mass_bin_ctrs,
kind="mass",
flag_col=cfg_hist["flag_col"],
rho_eff=rho_eff,
BC_type=BC_type,
)
ddf_out = ddf_pbp_with_flow.map_partitions(
process_hist_and_dist_partition,
col="BC mass within range",
flag_col=cfg_hist["flag_col"],
flag_value=cfg_hist["flag_value"],
bin_lims=inc_mass_bin_lims,
bin_ctrs=inc_mass_bin_ctrs,
dt_str="1s",
calculate_conc=True,
flow=None,
rho_eff=rho_eff,
BC_type=BC_type,
t=1,
meta=meta_hist,  # pre-built meta: Dask skips calling the function on an empty frame for schema inference
)
results.append(ddf_out)
# ========= 4. RUN SCATTERING HISTOGRAM =========
# --- Scattering histogram configuration
scatt_bin_lims = np.logspace(
np.log10(cfg["scatt_histo"]["min_D"]),
@@ -1074,17 +967,32 @@ def main():
cfg["scatt_histo"]["n_bins"],
)
scatt_bin_ctrs = bin_lims_to_ctrs(scatt_bin_lims)
scatt_config = {
"name": "scatt",
"col": "Opt diam scatt only",
"flag_col": None,
"flag_value": None,
"bin_lims": scatt_bin_lims,
"bin_ctrs": scatt_bin_ctrs,
"rho_eff": None,
"BC_type": None,
}
meta_hist = make_hist_meta(
bin_ctrs=scatt_bin_ctrs,
kind="scatt",
flag_col=None,
rho_eff=None,
BC_type=None,
)
ddf_scatt = ddf_pbp_with_flow.map_partitions(
process_hist_and_dist_partition,
col="Opt diam scatt only",
flag_col=None,
flag_value=None,
bin_lims=scatt_bin_lims,
bin_ctrs=scatt_bin_ctrs,
dt_str="1s",
calculate_conc=True,
flow=None,
rho_eff=None,
BC_type=None,
t=1,
meta=meta_hist,  # pre-built meta: Dask skips calling the function on an empty frame for schema inference
)
results.append(ddf_scatt)
# ========= 5. RUN TIMELAG HISTOGRAMS =========
"""
# --- Timelag histogram parameters
timelag_bins_lims = np.linspace(
cfg["time_delay_histo"]["min"],
@@ -1093,86 +1001,6 @@ def main():
)
timelag_bin_ctrs = bin_lims_to_ctrs(timelag_bins_lims)
# ========= 2. WRAPPER FUNCTION =========
def run_histogram(
ddf,
name,
col,
flag_col,
flag_value,
bin_lims,
bin_ctrs,
dt_str,
flow=None,
rho_eff=None,
BC_type=None,
t=1,
):
"""Runs process_hist_and_dist_partition and returns a Dask DF with renamed columns."""
meta_cols = [f"{name}_bin{i}" for i in range(len(bin_ctrs))]
hist_ddf = ddf.map_partitions(
process_hist_and_dist_partition,
col=col,
flag_col=flag_col,
flag_value=flag_value,
bin_lims=bin_lims,
bin_ctrs=bin_ctrs,
dt_str=dt_str,
calculate_conc=True,
flow=flow,
rho_eff=rho_eff,
BC_type=BC_type,
t=t,
meta=pd.DataFrame(columns=meta_cols),
)
hist_ddf.columns = meta_cols
return hist_ddf
# ========= 3. RUN MASS HISTOGRAMS =========
results = []
for cfg in hist_configs:
ddf_out = run_histogram(
ddf_pbp_with_flow,
name=cfg["name"],
col="BC mass within range",
flag_col=cfg["flag_col"],
flag_value=cfg["flag_value"],
bin_lims=inc_mass_bin_lims,
bin_ctrs=inc_mass_bin_ctrs,
dt_str="1s",
flow=None,
rho_eff=rho_eff,
BC_type=BC_type,
t=1,
)
results.append(ddf_out)
# ========= 4. RUN SCATTERING HISTOGRAM =========
scatt_ddf = run_histogram(
ddf_pbp_with_flow,
name=scatt_config["name"],
col=scatt_config["col"],
flag_col=scatt_config["flag_col"],
flag_value=scatt_config["flag_value"],
bin_lims=scatt_config["bin_lims"],
bin_ctrs=scatt_config["bin_ctrs"],
dt_str="1s",
# flow=ddf_pbp_hk["Sample Flow Controller Read (vccm)"],
rho_eff=scatt_config["rho_eff"],
BC_type=scatt_config["BC_type"],
t=1,
)
results.append(scatt_ddf)
# ========= 5. RUN TIMELAG HISTOGRAMS =========
mass_bins = ddf_pbp_with_flow["BC mass bin"].unique().compute()
for idx, mass_bin in enumerate(mass_bins):
ddf_bin = ddf_pbp_with_flow[ddf_pbp_with_flow["BC mass bin"] == mass_bin]
@@ -1205,14 +1033,14 @@ def main():
tl_ddf.columns = meta_cols
results.append(tl_ddf)
"""
# ========= 6. MERGE ALL HISTOGRAMS =========
merged_ddf = reduce(lambda left, right: left.join(right), results)
# Add normalized date column
# merged_ddf = merged_ddf.assign(date=merged_ddf.index.to_series().dt.normalize())
index_as_dt = dd.to_datetime(merged_ddf.index)
index_as_dt = dd.to_datetime(merged_ddf.index.to_series())
merged_ddf["date"] = index_as_dt.map_partitions(
lambda s: s.dt.normalize(), meta=("date", "datetime64[ns]")
)
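A brief hedged sketch (hypothetical timestamps) of what the per-partition normalize step produces: the midnight-floored date that to_parquet then uses for partition_on=["date"], i.e. one directory per calendar day.
import pandas as pd
s = pd.Series(pd.to_datetime(["2025-01-01 10:00:07", "2025-01-02 23:59:59"]))
dates = s.dt.normalize()
# -> 2025-01-01 00:00:00 and 2025-01-02 00:00:00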
@@ -1224,42 +1052,6 @@ def main():
partition_on=["date"],
overwrite=True,
)
"""BC_all = ddf_pbp_with_flow.map_partitions(
process_hist_and_dist_partition,
col="BC mass within range",
flag_col=None,
flag_value=None,
bin_lims=inc_mass_bin_lims,
bin_ctrs=inc_mass_bin_ctrs,
dt_str='1s',
calculate_conc=True,
flow=None,
rho_eff=rho_eff,
BC_type=BC_type,
t=1,
)
thin = ddf_pbp_with_flow.map_partitions(
process_hist_and_dist_partition,
col="BC mass within range",
flag_col="cnts_thin",
flag_value=1,
bin_lims=inc_mass_bin_lims,
bin_ctrs=inc_mass_bin_ctrs,
dt_str='1s',
calculate_conc=True,
rho_eff=rho_eff,
BC_type=BC_type,
t=1,
)
time_index = dd.to_datetime(BC_all.index.to_series())
BC_all["date"] = time_index.dt.normalize() # works on Series
merged_ddf = BC_all.join(thin)
# 8. Write parquet
merged_ddf.to_parquet(
f"{output_dir}/hists_{dt_s}s",
partition_on=["date"],
overwrite=True,
)"""
client.close()