diff --git a/scripts/sp2xr_pipeline.py b/scripts/sp2xr_pipeline.py index cfdb83e..feaf128 100644 --- a/scripts/sp2xr_pipeline.py +++ b/scripts/sp2xr_pipeline.py @@ -8,6 +8,7 @@ from dask.distributed import Client import yaml import time import dask.dataframe as dd +from dask.dataframe.utils import make_meta import pandas as pd import numpy as np from typing import Tuple, List @@ -51,13 +52,16 @@ def parse_args(): return p.parse_args() -def make_cluster(cores=32, mem="64GB", wall="02:00:00", partition="daily"): +def make_cluster(cores=32, mem="64GB", wall="02:00:00", partition="daily", out_dir=""): cluster = SLURMCluster( cores=cores, processes=cores, memory=mem, walltime=wall, - job_extra_directives=[f"--partition={partition}"], + job_extra_directives=[ + f"--partition={partition}", + f"--output={out_dir}/slurm-%j.out", + ], ) client = Client(cluster) @@ -374,6 +378,7 @@ def resample_hk_partition(pdf: pd.DataFrame, dt="1s") -> pd.DataFrame: """ # Ensure local partition is sorted pdf = pdf.sort_index() + pdf.index = pdf.index.floor(dt) # --- resample mean values --- df_mean = ( @@ -395,6 +400,8 @@ def resample_hk_partition(pdf: pd.DataFrame, dt="1s") -> pd.DataFrame: # --- drop empty bins --- out = out[out["original_idx"] != 0].drop(columns="original_idx") + out = out.asfreq(dt).ffill(limit=1) + # --- add date/hour columns --- out["date"] = out.index.normalize() out["hour"] = out.index.hour.astype("int64") @@ -483,131 +490,6 @@ def make_bin_arrays( return lims, ctrs -''' -def histo_counts_partition( - pdf: pd.DataFrame, - *, - col: str, - dt_s: int, - bin_lims: np.ndarray, - bin_ctrs: np.ndarray, - flag_col: Optional[str] = None, - flag_val: Optional[int] = None, -) -> pd.DataFrame: - if flag_col is not None and flag_val is not None: - pdf = pdf.loc[pdf[flag_col] == flag_val] - - dt_str = f"{dt_s}s" - - counts_ser = ( - pdf[col] - .resample(dt_str) - .apply(lambda x: np.histogram(x.dropna(), bins=bin_lims)[0]) - ) - - # keep only non-empty intervals (temporary_col exists in PbP frame) - pdf["temporary_col"] = 1 - nonempty = pdf["temporary_col"].resample(dt_str).count() - counts_ser = counts_ser[nonempty != 0] - - """if counts_ser.empty: - counts_df = pd.DataFrame( - np.nan, - index=pd.DatetimeIndex([], name=counts_ser.index.name), - columns=[f"bin_{i}" for i in range(len(bin_ctrs))], - ) - else: - counts_df = counts_ser.apply( - lambda arr: pd.Series(arr, index=bin_ctrs, dtype="int64") - )"""""" - if counts_ser.empty: - # use the REAL bin centres as column names - counts_df = pd.DataFrame( - np.nan, - index=pd.DatetimeIndex([], name=counts_ser.index.name), - columns=bin_ctrs, # <-- not "bin_0", "bin_1" - dtype="float64", - ) - else: - counts_df = counts_ser.apply( - lambda arr: pd.Series(arr, index=bin_ctrs, dtype="int64") - ) - - counts_df["date"] = counts_df.index.normalize() - counts_df["hour"] = counts_df.index.hour.astype("int64") - return counts_df - - - -def all_histograms_partition( - pdf: pd.DataFrame, - *, - specs: List[Tuple[str, Optional[str], Optional[int], str, Tuple[np.ndarray, np.ndarray]]], - dt_s: int, -) -> pd.DataFrame: - frames = [] - for idx, (key, flag_col, flag_val, col_name, bin_tuple) in enumerate(specs): - lims, ctrs = bin_tuple - h = histo_counts_partition( - pdf, - col=col_name, - dt_s=dt_s, - bin_lims=lims, - bin_ctrs=ctrs, - flag_col=flag_col, - flag_val=flag_val, - ) - helpers = h[["date", "hour"]] if idx == 0 else None - hist = h.drop(columns=["date", "hour"]) - - # prefix only the histogram columns - #hist = hist.add_prefix(f"{key}_") - hist.columns = 
[f"{key}_bin_{c}" for c in hist.columns] - - frames.append(hist) - if helpers is not None: - frames.append(helpers) # insert once, at the very end - - return pd.concat(frames, axis=1) - -def counts_to_conc_partition( - pdf: pd.DataFrame, - *, - flow_col: str, - dt_s: int, - rho_eff: float = 1800, - bin_width_map: dict, -) -> pd.DataFrame: - flow = pdf[flow_col] - conc_frames = [] - mass_frames = [] - - for col in pdf.columns: - if "_bin_" not in col: - continue # skip helpers, prefixed cols handled below - - # For each histogram population (prefix before first underscore) - prefixes = sorted({c.split("_", 1)[0] for c in pdf.columns if c.startswith("bin_")}) - for pref in prefixes: - cols = [c for c in pdf.columns if c.startswith(f"{pref}_bin_")] - bins = np.array([float(c.split("_")[-1]) for c in cols]) - dNd = pdf[cols].div(flow, axis=0) / (dt_s / 60) - dNd.columns = [c.replace("bin_", f"dNd_{pref}_") for c in cols] - conc_frames.append(dNd) - - # ---- add mass spectra if this is BC-mass histogram ----------- - if pref != "scatt_only": # crude rule: scatt histogram → skip mass - dlog = bin_width_map[pref] # scalar or 1-D array - density = rho_eff - dMd = dNd.mul((np.pi / 6) * density * bins**3, axis=1) / dlog - dMd.columns = [c.replace("dNd", "dMd") for c in dNd.columns] - mass_frames.append(dMd) - - out = pd.concat(conc_frames + mass_frames + [pdf[["date", "hour"]]], axis=1) - return out -''' - - def dNdlogDp_to_dMdlogDp(N, Dp, rho=1800): """This is from functions.py from Rob""" """ @@ -699,7 +581,11 @@ def process_hist_and_dist_partition( ) flow_dt = df["Sample Flow Controller Read (vccm)"].resample(dt_str).mean() - df_resampled = df_resampled.to_frame(name="result") + # df_resampled = df_resampled.to_frame(name="result") + if isinstance(df_resampled, pd.Series): + df_resampled = df_resampled.to_frame(name="result") + else: # already a DataFrame + df_resampled.columns = ["result"] original_idx_df = df_filtered[["temporary_col"]].resample(dt_str).count() original_idx_df.columns = ["original_idx"] # ensure proper naming @@ -719,6 +605,8 @@ def process_hist_and_dist_partition( else: ddf_hist = df_resampled[list_col].apply(pd.Series) ddf_hist.index = df_resampled.index + columns = [f"{list_col}_{i}" for i in range(max_list_length)] + ddf_hist.columns = columns if calculate_conc: # Join with the sample flow controller data @@ -780,7 +668,11 @@ def main(): # 0. cluster ------------------------------------------------------- client = make_cluster( - cores=args.cores, mem=args.memory, wall=args.walltime, partition=args.partition + cores=args.cores, + mem=args.memory, + wall=args.walltime, + partition=args.partition, + out_dir="/data/user/bertoz_b/merlin6data/SP2XR_code/slurm_out/", ) # 1. 
calibration stage -------------------------------------------- @@ -834,15 +726,6 @@ def main(): np.nan, ) - """ddf_cal.to_parquet( - path=f"{output_dir}/pbp_calibrated", - partition_on=["date", "hour"], - engine="pyarrow", - write_index=True, - write_metadata_file=True, - overwrite=True, - )""" - ddf_cal["temporary_col"] = 1 meta_dt = build_dt_summary(ddf_cal._meta) # empty frame for metadata @@ -872,37 +755,49 @@ def main(): resample_hk_partition, dt=f"{args.dt}s", meta=meta ) - def add_flow_nearest(pdf, flow_pdf, tol="500ms"): + '''def add_flow_same_second(pdf, flow_pdf, tol="999ms"): """ pdf : particle partition (pandas) flow_pdf : *entire* flow table (pandas) tol : max time difference accepted (string, Timedelta, or None) """ return pd.merge_asof( - pdf, + pdf.sort_index(), flow_pdf, left_index=True, right_index=True, - direction="nearest", - tolerance=pd.Timedelta(tol) if tol else None, - ) + direction="backward", + tolerance=pd.Timedelta(tol), + )''' - pdf_flow = ddf_hk_dt["Sample Flow Controller Read (vccm)"].compute() + flow_1s = ddf_hk_dt["Sample Flow Controller Read (vccm)"].compute() + + def floor_index_to_sec(pdf: pd.DataFrame) -> pd.DataFrame: + """ + Replace the existing DatetimeIndex with its lower-second value, + without changing the index’s name or creating a new column. + """ + pdf.index = pdf.index.floor("s") # keeps the original .name + return pdf + + # meta stays identical because only the *values* of the index change + ddf_cal = ddf_cal.map_partitions(floor_index_to_sec, meta=ddf_cal._meta) meta_pbp_with_flow = ddf_cal._meta.copy() meta_pbp_with_flow["Sample Flow Controller Read (vccm)"] = ( np.float64() ) # tell Dask a new float col appears + """ddf_pbp_with_flow = ddf_cal.map_partitions( + add_flow_same_second, + flow_pdf=flow_1s, # ← broadcast + meta=meta_pbp_with_flow, + )""" ddf_pbp_with_flow = ddf_cal.map_partitions( - add_flow_nearest, - flow_pdf=pdf_flow, # ← broadcast + lambda part: part.join(flow_1s, how="left"), meta=meta_pbp_with_flow, ) - """ddf_pbp_with_flow = ddf_pbp_with_flow.assign( - **{"BC mass bin": pd.cut(ddf_pbp_with_flow["BC mass within range"], - bins=inc_mass_bin_lims, labels=mass_bin_labels)} - )""" + ddf_pbp_with_flow = ddf_pbp_with_flow.map_partitions( lambda df: df.assign( **{ @@ -943,16 +838,7 @@ def main(): write_metadata_file=True, overwrite=True, ) - """ - meta_core = build_core_meta(ddf_cal, ddf_hk, dt_s) - ddf_core = dd.merge(ddf_cal, ddf_hk, on="Time", how="left") \ - .map_partitions(aggregate_core, dt_s=dt_s, meta=meta_core) - # always write the ‘core’ table - ddf_core.to_parquet(f"{root_out}/core_{dt_s}s", - partition_on=["date"], - overwrite=True) - """ # 4. (optional) dt bulk conc -------------------------- do_conc = args.conc meta_conc = add_concentrations(ddf_pbp_hk_dt._meta, dt=dt_s) if do_conc else None @@ -968,78 +854,8 @@ def main(): overwrite=True, ) - """if do_hist: - ddf_hist.to_parquet(f"{root_out}/hists_{dt_s}s", - partition_on=["date"], overwrite=True) - """ - # 5. 
(optional) dt histograms -------------------------- do_hist = args.hist - """# --------------------------------------------------------------------- - # Which particle subset → which flag column - # --------------------------------------------------------------------- - # key: suffix for column names - # val: (flag column, flag value) (None, None) means "all particles" - histogram_specs = { - "all" : (None, None), - "thin" : ("cnts_thin", 1), - "thin_noScatt" : ("cnts_thin_noScatt", 1), - "thin_low_inc_ratio" : ("cnts_thin_low_inc_scatt_ratio", 1), - "thick" : ("cnts_thick", 1), - "thick_sat" : ("cnts_thick_sat", 1), - "thin_sat" : ("cnts_thin_sat", 1), - "ntl_sat" : ("cnts_ntl_sat", 1), - "ntl" : ("cnts_ntl", 1), - "ext_pos_tl" : ("cnts_extreme_positive_timelag", 1), - "thin_total" : ("cnts_thin_total", 1), - "thick_total" : ("cnts_thick_total", 1), - "unclassified" : ("cnts_unclassified", 1), - }""" - """bins_dict = { - "mass": make_bin_arrays( # BC-mass histogram bins - lims=np.logspace( - np.log10(cfg["inc_histo"]["min_mass"]), - np.log10(cfg["inc_histo"]["max_mass"]), - cfg["inc_histo"]["n_bins"] - ) - ), # -> (limits, centres) - - "scatt": make_bin_arrays( # scattering-diameter bins - lims=np.logspace( - np.log10(cfg["scatt_histo"]["min_D"]), - np.log10(cfg["scatt_histo"]["max_D"]), - cfg["scatt_histo"]["n_bins"] - ) - ), - - "timelag": make_bin_arrays( # time-lag bins (linear) - lims=np.linspace( - cfg["time_delay_histo"]["min"], - cfg["time_delay_histo"]["max"], - cfg["time_delay_histo"]["n_bins"] - ), - log=False - ), - } - histogram_spec_list = [ - # key, flag_col, flag_val, column_name, bin_tuple, do_mass - ("all", None, None, "BC mass within range", bins_dict["mass"]), - ("thin", "cnts_thin", 1, "BC mass within range", bins_dict["mass"]), - ("thin_noScatt","cnts_thin_noScatt", 1, "BC mass within range", bins_dict["mass"]), - ("thin_low_ratio","cnts_thin_low_inc_scatt_ratio",1, "BC mass within range", bins_dict["mass"]), - ("thick", "cnts_thick", 1, "BC mass within range", bins_dict["mass"]), - ("thick_sat", "cnts_thick_sat", 1, "BC mass within range", bins_dict["mass"]), - ("thin_sat", "cnts_thin_sat", 1, "BC mass within range", bins_dict["mass"]), - ("ntl_sat", "cnts_ntl_sat", 1, "BC mass within range", bins_dict["mass"]), - ("ntl", "cnts_ntl", 1, "BC mass within range", bins_dict["mass"]), - ("ext_pos_tl", "cnts_extreme_positive_timelag", 1, "BC mass within range", bins_dict["mass"]), - ("thin_total", "cnts_thin_total", 1, "BC mass within range", bins_dict["mass"]), - ("thick_total", "cnts_thick_total", 1, "BC mass within range", bins_dict["mass"]), - ("unclassified","cnts_unclassified", 1, "BC mass within range", bins_dict["mass"]), - - # scattering-only diameter spectrum - ("scatt_only", None, None, "Opt diam scatt only", bins_dict["scatt"]), - ]""" if do_hist: # --- Mass histogram configs @@ -1067,6 +883,83 @@ def main(): {"name": "unclassified", "flag_col": "cnts_unclassified", "flag_value": 1}, ] + # ========= 2. WRAPPER FUNCTION ========= + + def make_hist_meta( + *, + bin_ctrs: np.ndarray, + kind: str, # "mass", "scatt", "timelag" + flag_col: str | None, # e.g. "cnts_thin" or None + rho_eff=None, + BC_type=None, + dtype=np.float64, + ): + """ + Return an empty DataFrame whose column names AND dtypes are exactly + what `process_hist_and_dist_partition()` will emit, without ever + calling that function. 
+ """ + # ---- pick prefix exactly like the function ---------------------- + prefix = ( + "all" + if flag_col is None + else flag_col[5:] if flag_col.startswith("cnts_") else flag_col + ) + + # ---- translate mass bins if needed ------------------------------ + if kind == "mass" and rho_eff is not None and BC_type is not None: + # same conversion used inside the partition function + _, labels = BC_mass_to_diam(bin_ctrs, rho_eff=rho_eff, BC_type=BC_type) + n_prefix, m_prefix = "dNdlogDmev", "dMdlogDmev" + elif kind == "scatt": + labels = bin_ctrs + n_prefix, m_prefix = "dNdlogDsc", None + elif kind == "timelag": + labels = bin_ctrs + n_prefix, m_prefix = f"{prefix}_timelag", None + else: + raise ValueError(f"Unknown histogram kind '{kind}'") + + # ---- build column list ------------------------------------------ + cols = [f"{n_prefix}_{prefix}_{v:.2f}" for v in labels] + if kind == "mass" and m_prefix: + cols += [f"{m_prefix}_{prefix}_{v:.2f}" for v in labels] + + empty_df = pd.DataFrame( + {c: pd.Series(dtype=dtype) for c in cols}, + index=pd.DatetimeIndex([], name="calculated_time"), + ) + return make_meta(empty_df) + + # ========= 3. RUN MASS HISTOGRAMS ========= + + results = [] + for cfg_hist in hist_configs: + meta_hist = make_hist_meta( + bin_ctrs=inc_mass_bin_ctrs, + kind="mass", + flag_col=cfg_hist["flag_col"], + rho_eff=rho_eff, + BC_type=BC_type, + ) + ddf_out = ddf_pbp_with_flow.map_partitions( + process_hist_and_dist_partition, + col="BC mass within range", + flag_col=cfg_hist["flag_col"], + flag_value=cfg_hist["flag_value"], + bin_lims=inc_mass_bin_lims, + bin_ctrs=inc_mass_bin_ctrs, + dt_str="1s", + calculate_conc=True, + flow=None, + rho_eff=rho_eff, + BC_type=BC_type, + t=1, + meta=meta_hist, # <-- single line does the trick + ) + results.append(ddf_out) + + # ========= 4. RUN SCATTERING HISTOGRAM ========= # --- Scattering histogram configuration scatt_bin_lims = np.logspace( np.log10(cfg["scatt_histo"]["min_D"]), @@ -1074,17 +967,32 @@ def main(): cfg["scatt_histo"]["n_bins"], ) scatt_bin_ctrs = bin_lims_to_ctrs(scatt_bin_lims) - scatt_config = { - "name": "scatt", - "col": "Opt diam scatt only", - "flag_col": None, - "flag_value": None, - "bin_lims": scatt_bin_lims, - "bin_ctrs": scatt_bin_ctrs, - "rho_eff": None, - "BC_type": None, - } + meta_hist = make_hist_meta( + bin_ctrs=scatt_bin_ctrs, + kind="scatt", + flag_col=None, + rho_eff=None, + BC_type=None, + ) + ddf_scatt = ddf_pbp_with_flow.map_partitions( + process_hist_and_dist_partition, + col="Opt diam scatt only", + flag_col=None, + flag_value=None, + bin_lims=scatt_bin_lims, + bin_ctrs=scatt_bin_ctrs, + dt_str="1s", + calculate_conc=True, + flow=None, + rho_eff=None, + BC_type=None, + t=1, + meta=meta_hist, # <-- single line does the trick + ) + results.append(ddf_scatt) + # ========= 5. RUN TIMELAG HISTOGRAMS ========= + """ # --- Timelag histogram parameters timelag_bins_lims = np.linspace( cfg["time_delay_histo"]["min"], @@ -1093,86 +1001,6 @@ def main(): ) timelag_bin_ctrs = bin_lims_to_ctrs(timelag_bins_lims) - # ========= 2. 
WRAPPER FUNCTION ========= - - def run_histogram( - ddf, - name, - col, - flag_col, - flag_value, - bin_lims, - bin_ctrs, - dt_str, - flow=None, - rho_eff=None, - BC_type=None, - t=1, - ): - """Runs process_hist_and_dist_partition and returns a Dask DF with renamed columns.""" - - meta_cols = [f"{name}_bin{i}" for i in range(len(bin_ctrs))] - - hist_ddf = ddf.map_partitions( - process_hist_and_dist_partition, - col=col, - flag_col=flag_col, - flag_value=flag_value, - bin_lims=bin_lims, - bin_ctrs=bin_ctrs, - dt_str=dt_str, - calculate_conc=True, - flow=flow, - rho_eff=rho_eff, - BC_type=BC_type, - t=t, - meta=pd.DataFrame(columns=meta_cols), - ) - - hist_ddf.columns = meta_cols - return hist_ddf - - # ========= 3. RUN MASS HISTOGRAMS ========= - - results = [] - - for cfg in hist_configs: - ddf_out = run_histogram( - ddf_pbp_with_flow, - name=cfg["name"], - col="BC mass within range", - flag_col=cfg["flag_col"], - flag_value=cfg["flag_value"], - bin_lims=inc_mass_bin_lims, - bin_ctrs=inc_mass_bin_ctrs, - dt_str="1s", - flow=None, - rho_eff=rho_eff, - BC_type=BC_type, - t=1, - ) - results.append(ddf_out) - - # ========= 4. RUN SCATTERING HISTOGRAM ========= - - scatt_ddf = run_histogram( - ddf_pbp_with_flow, - name=scatt_config["name"], - col=scatt_config["col"], - flag_col=scatt_config["flag_col"], - flag_value=scatt_config["flag_value"], - bin_lims=scatt_config["bin_lims"], - bin_ctrs=scatt_config["bin_ctrs"], - dt_str="1s", - # flow=ddf_pbp_hk["Sample Flow Controller Read (vccm)"], - rho_eff=scatt_config["rho_eff"], - BC_type=scatt_config["BC_type"], - t=1, - ) - results.append(scatt_ddf) - - # ========= 5. RUN TIMELAG HISTOGRAMS ========= - mass_bins = ddf_pbp_with_flow["BC mass bin"].unique().compute() for idx, mass_bin in enumerate(mass_bins): ddf_bin = ddf_pbp_with_flow[ddf_pbp_with_flow["BC mass bin"] == mass_bin] @@ -1205,14 +1033,14 @@ def main(): tl_ddf.columns = meta_cols results.append(tl_ddf) - + """ # ========= 6. MERGE ALL HISTOGRAMS ========= merged_ddf = reduce(lambda left, right: left.join(right), results) # Add normalized date column # merged_ddf = merged_ddf.assign(date=merged_ddf.index.to_series().dt.normalize()) - index_as_dt = dd.to_datetime(merged_ddf.index) + index_as_dt = dd.to_datetime(merged_ddf.index.to_series()) merged_ddf["date"] = index_as_dt.map_partitions( lambda s: s.dt.normalize(), meta=("date", "datetime64[ns]") ) @@ -1224,42 +1052,6 @@ def main(): partition_on=["date"], overwrite=True, ) - """BC_all = ddf_pbp_with_flow.map_partitions( - process_hist_and_dist_partition, - col="BC mass within range", - flag_col=None, - flag_value=None, - bin_lims=inc_mass_bin_lims, - bin_ctrs=inc_mass_bin_ctrs, - dt_str='1s', - calculate_conc=True, - flow=None, - rho_eff=rho_eff, - BC_type=BC_type, - t=1, - ) - thin = ddf_pbp_with_flow.map_partitions( - process_hist_and_dist_partition, - col="BC mass within range", - flag_col="cnts_thin", - flag_value=1, - bin_lims=inc_mass_bin_lims, - bin_ctrs=inc_mass_bin_ctrs, - dt_str='1s', - calculate_conc=True, - rho_eff=rho_eff, - BC_type=BC_type, - t=1, - ) - time_index = dd.to_datetime(BC_all.index.to_series()) - BC_all["date"] = time_index.dt.normalize() # works on Series - merged_ddf = BC_all.join(thin) - # 8. Write parquet - merged_ddf.to_parquet( - f"{output_dir}/hists_{dt_s}s", - partition_on=["date"], - overwrite=True, - )""" client.close()
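
Note on the resample_hk_partition change: flooring the index to the resampling step and then re-expanding with asfreq(dt).ffill(limit=1) restores the regular time grid after the empty bins are dropped, and bridges gaps of at most one bin with the previous housekeeping reading; longer gaps stay NaN. A minimal pandas sketch of that behaviour on invented values:

import pandas as pd

idx = pd.to_datetime(
    ["2024-01-01 00:00:00", "2024-01-01 00:00:01", "2024-01-01 00:00:04"]
)
hk = pd.DataFrame({"T (C)": [20.0, 20.5, 21.0], "original_idx": [5, 0, 3]}, index=idx)

# drop bins that contained no raw samples, as the partition function does
out = hk[hk["original_idx"] != 0].drop(columns="original_idx")

# restore the regular 1 s grid and forward-fill gaps of at most one bin
out = out.asfreq("1s").ffill(limit=1)
print(out)
# 00:00:01 (dropped as empty) is refilled from 00:00:00; 00:00:02 and 00:00:03 stay NaN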
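
Note on the flow matching: the switch from merge_asof(direction="nearest") to floor_index_to_sec plus a plain left join assumes the housekeeping flow is already on a clean 1 s grid, so every particle inside a given second picks up that second's flow reading, and duplicate per-second timestamps are handled naturally by the join. A toy pandas illustration of the same pattern, with invented values and the flow column named as in the pipeline:

import pandas as pd

# 1 s housekeeping flow, i.e. what flow_1s looks like after resampling and compute()
flow_1s = pd.Series(
    [10.0, 10.2, 9.9],
    index=pd.date_range("2024-01-01 00:00:00", periods=3, freq="1s"),
    name="Sample Flow Controller Read (vccm)",
)

# particle-by-particle records with sub-second timestamps
pbp = pd.DataFrame(
    {"BC mass within range": [1.1, 2.2, 3.3]},
    index=pd.to_datetime(
        ["2024-01-01 00:00:00.120", "2024-01-01 00:00:00.870", "2024-01-01 00:00:02.400"]
    ),
)

# floor each particle timestamp to its containing second, then join the flow
pbp.index = pbp.index.floor("s")
pbp_with_flow = pbp.join(flow_1s, how="left")
print(pbp_with_flow)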
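
Note on the meta handling: make_hist_meta builds, up front, an empty frame whose columns and dtypes match what process_hist_and_dist_partition will emit, and make_meta turns it into the schema object that map_partitions needs, so the partition function never has to be called just to infer the output layout. A self-contained sketch of that pattern with a toy partition function and hypothetical column names:

import numpy as np
import pandas as pd
import dask.dataframe as dd
from dask.dataframe.utils import make_meta

BIN_CTRS = np.array([0.25, 0.50, 0.75])  # hypothetical bin centres

def per_partition_histogram(pdf: pd.DataFrame) -> pd.DataFrame:
    """Toy stand-in for process_hist_and_dist_partition: one histogram row per second."""
    lims = np.linspace(0.0, 1.0, len(BIN_CTRS) + 1)
    grouped = pdf["value"].groupby(pdf.index.floor("1s"))
    rows = {ts: np.histogram(vals.dropna(), bins=lims)[0] for ts, vals in grouped}
    out = pd.DataFrame.from_dict(rows, orient="index", dtype="float64")
    out.columns = [f"hist_{c:.2f}" for c in BIN_CTRS]
    out.index.name = "calculated_time"
    return out

# empty frame with exactly the column names/dtypes the function will produce
meta = make_meta(
    pd.DataFrame(
        {f"hist_{c:.2f}": pd.Series(dtype="float64") for c in BIN_CTRS},
        index=pd.DatetimeIndex([], name="calculated_time"),
    )
)

idx = pd.date_range("2024-01-01", periods=20, freq="250ms")
ddf = dd.from_pandas(pd.DataFrame({"value": np.random.rand(20)}, index=idx), npartitions=2)

hist = ddf.map_partitions(per_partition_histogram, meta=meta)
print(hist.compute())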
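
Note on the final merge: the per-histogram Dask frames are combined with pairwise index joins via reduce, and the date partition column is derived from the index; dd.to_datetime expects a Series, hence the .to_series() conversion. A compact sketch on two toy single-column frames (column names hypothetical):

from functools import reduce

import pandas as pd
import dask.dataframe as dd

idx = pd.date_range("2024-01-01 23:59:58", periods=4, freq="1s")
a = dd.from_pandas(
    pd.DataFrame({"dNdlogDmev_all_0.50": [1.0, 2.0, 3.0, 4.0]}, index=idx), npartitions=1
)
b = dd.from_pandas(
    pd.DataFrame({"dNdlogDsc_all_120.00": [5.0, 6.0, 7.0, 8.0]}, index=idx), npartitions=1
)
results = [a, b]

merged = reduce(lambda left, right: left.join(right), results)

# dd.to_datetime works on a Series, not on the index object itself
index_as_dt = dd.to_datetime(merged.index.to_series())
merged["date"] = index_as_dt.map_partitions(
    lambda s: s.dt.normalize(), meta=("date", "datetime64[ns]")
)
print(merged.compute())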