diff --git a/scripts/sp2xr_pipeline.py b/scripts/sp2xr_pipeline.py new file mode 100644 index 0000000..cfdb83e --- /dev/null +++ b/scripts/sp2xr_pipeline.py @@ -0,0 +1,1269 @@ +from __future__ import annotations + +import argparse +from functools import reduce +from pathlib import Path +from dask_jobqueue import SLURMCluster +from dask.distributed import Client +import yaml +import time +import dask.dataframe as dd +import pandas as pd +import numpy as np +from typing import Tuple, List +from sp2xr.calibration import calibrate_particle_data, BC_mass_to_diam + + +def parse_args(): + p = argparse.ArgumentParser( + description="SP2-XR end-to-end pipeline (calibration → 1-s summary → histograms)" + ) + + # required paths + p.add_argument("--input_pbp", required=True, help="root dir with raw PbP Parquet") + p.add_argument( + "--input_hk", required=True, help="root dir with house-keeping Parquet" + ) + p.add_argument("--output", required=True, help="root dir where results are written") + p.add_argument("--config", required=True, help="YAML with calibration coefficients") + + # optional workflow switches + p.add_argument("--conc", action="store_true", help="write bulk concentration table") + p.add_argument( + "--hist", action="store_true", help="write size/mass/time-lag histograms" + ) + p.add_argument( + "--dt", type=int, default=1, help="aggregation interval in seconds (default: 1)" + ) + + # cluster / resource knobs (all optional) + p.add_argument( + "--cores", type=int, default=8, help="CPU cores per SLURM job (default: 8)" + ) + p.add_argument("--memory", default="64GB", help="RAM per job (default: 64GB)") + p.add_argument( + "--walltime", default="00:59:00", help="SLURM wall-time (default: 1h)" + ) + p.add_argument( + "--partition", default="hourly", help="SLURM partition (default: hourly)" + ) + + return p.parse_args() + + +def make_cluster(cores=32, mem="64GB", wall="02:00:00", partition="daily"): + cluster = SLURMCluster( + cores=cores, + processes=cores, + memory=mem, + walltime=wall, + job_extra_directives=[f"--partition={partition}"], + ) + + client = Client(cluster) + cluster.scale(1) + client.wait_for_workers(1, timeout=600) + print(f"Dask dashboard: {client.dashboard_link}") + return client + + +def apply_calibration(partition, config): + """Apply calibration to a pandas partition.""" + pdf = calibrate_particle_data(partition, config) + pdf["date"] = partition.index.date.astype("datetime64[ns]") + pdf["hour"] = partition.index.hour.astype("int8") + return pdf + + +def define_flags(pdf_pbp, config): + flag_inc_transit_time = ( + pdf_pbp["Incand Transit Time"] + >= config["instrument_parameters"]["IncTransitMin"] + ) & ( + pdf_pbp["Incand Transit Time"] + <= config["instrument_parameters"]["IncTransitMax"] + ) + flag_inc_fwhm = ( + pdf_pbp["Incand FWHM"] >= config["instrument_parameters"]["IncFWHMMin"] + ) & (pdf_pbp["Incand FWHM"] <= config["instrument_parameters"]["IncFWHMMax"]) + flag_inc = flag_inc_transit_time & flag_inc_fwhm + pdf_pbp["flag_valid_inc_signal"] = flag_inc + pdf_pbp["flag_inc_not_sat"] = ( + pdf_pbp["Incand relPeak"] < config["Signal_saturation"]["IncSatPoint"] + ) + flag_inc_in_range = ( + flag_inc + & (pdf_pbp["BC mass"] >= config["inc_histo"]["min_mass"]) + & (pdf_pbp["BC mass"] <= config["inc_histo"]["max_mass"]) + ) + pdf_pbp["flag_valid_inc_signal_in_range"] = flag_inc_in_range + + flag_scatt_transit_time = ( + pdf_pbp["Scatter Transit Time"] + >= config["instrument_parameters"]["ScattTransitMin"] + ) & ( + pdf_pbp["Scatter Transit Time"] + <= 
config["instrument_parameters"]["ScattTransitMax"] + ) + flag_scatt_fwhm = ( + pdf_pbp["Scatter FWHM"] >= config["instrument_parameters"]["ScattFWHMMin"] + ) & (pdf_pbp["Scatter FWHM"] <= config["instrument_parameters"]["ScattFWHMMax"]) + flag_scatt = flag_scatt_transit_time & flag_scatt_fwhm + pdf_pbp["flag_valid_scatt_signal"] = flag_scatt + pdf_pbp["flag_scatt_not_sat"] = ( + pdf_pbp["Scatter relPeak"] < config["Signal_saturation"]["ScattSatPoint"] + ) + flag_scatt_in_range = ( + flag_scatt + & (pdf_pbp["Opt diam"] >= config["scatt_histo"]["min_D"]) + & (pdf_pbp["Opt diam"] <= config["scatt_histo"]["max_D"]) + ) + pdf_pbp["flag_valid_scatt_signal_in_range"] = flag_scatt_in_range + + pdf_pbp["flag_negative_timelag"] = ( + pdf_pbp["time_lag"] < config["time_delay_histo"]["min"] + ) + pdf_pbp["flag_extreme_positive_timelag"] = ( + pdf_pbp["time_lag"] >= config["time_delay_histo"]["max"] + ) + pdf_pbp["flag_valid_timelag_thin"] = ( + pdf_pbp["time_lag"] < config["thick_coating"]["threshold"] + ) & (pdf_pbp["time_lag"] >= config["time_delay_histo"]["min"]) + pdf_pbp["flag_valid_timelag_thick"] = ( + pdf_pbp["time_lag"] >= config["thick_coating"]["threshold"] + ) & (pdf_pbp["time_lag"] < config["time_delay_histo"]["max"]) + + pdf_pbp["flag_low_ratio_inc_scatt"] = pdf_pbp["ratio_inc_scatt"] < 1.1 + + pdf_pbp["Opt diam scatt only"] = pdf_pbp["Opt diam"].where( + flag_scatt & ~flag_inc, np.nan + ) + + return pdf_pbp + + +FLAG_COLS = [ + "flag_valid_scatt_signal", + "flag_scatt_not_sat", + "flag_valid_inc_signal_in_range", + "flag_valid_timelag_thin", + "flag_valid_timelag_thick", + "flag_negative_timelag", + "flag_extreme_positive_timelag", + "flag_low_ratio_inc_scatt", +] + + +def add_thin_thick_flags(pdf: pd.DataFrame) -> pd.DataFrame: + """Add cnts_* indicator + total columns (vectorised).""" + + # Safety check: ensure all flags exist + missing = [c for c in FLAG_COLS if c not in pdf.columns] + if missing: + raise KeyError(f"Missing flag columns: {missing}") + + f = pdf # shorthand + + # initialise empty int8 columns + zeros = np.zeros(len(pdf), dtype="int8") + newcols = { + "cnts_thin": zeros, + "cnts_thin_noScatt": zeros, + "cnts_thick": zeros, + "cnts_thick_sat": zeros, + "cnts_thin_sat": zeros, + "cnts_ntl_sat": zeros, + "cnts_ntl": zeros, + "cnts_extreme_positive_timelag": zeros, + "cnts_thin_low_inc_scatt_ratio": zeros, + "cnts_particles_for_tl_dist": zeros, + } + pdf = pdf.assign(**newcols) + + # ------------------------------------------------------------ + # vectorised assignments + # ------------------------------------------------------------ + pdf["cnts_thin_noScatt"] = ( + (~f.flag_valid_scatt_signal) & f.flag_valid_inc_signal_in_range + ).astype("int8") + + pdf["cnts_thin"] = ( + f.flag_valid_scatt_signal + & f.flag_scatt_not_sat + & f.flag_valid_inc_signal_in_range + & f.flag_valid_timelag_thin + & ~f.flag_low_ratio_inc_scatt + ).astype("int8") + + pdf["cnts_thin_low_inc_scatt_ratio"] = ( + f.flag_valid_scatt_signal + & f.flag_scatt_not_sat + & f.flag_valid_inc_signal_in_range + & f.flag_valid_timelag_thin + & f.flag_low_ratio_inc_scatt + ).astype("int8") + + pdf["cnts_thick"] = ( + f.flag_valid_scatt_signal + & f.flag_scatt_not_sat + & f.flag_valid_inc_signal_in_range + & f.flag_valid_timelag_thick + & ~f.flag_extreme_positive_timelag + ).astype("int8") + + pdf["cnts_thick_sat"] = ( + f.flag_valid_scatt_signal + & (~f.flag_scatt_not_sat) + & f.flag_valid_inc_signal_in_range + & f.flag_valid_timelag_thick + & ~f.flag_extreme_positive_timelag + ).astype("int8") + + 
pdf["cnts_thin_sat"] = ( + f.flag_valid_scatt_signal + & (~f.flag_scatt_not_sat) + & f.flag_valid_inc_signal_in_range + & f.flag_valid_timelag_thin + ).astype("int8") + + pdf["cnts_ntl_sat"] = ( + f.flag_valid_scatt_signal + & (~f.flag_scatt_not_sat) + & f.flag_valid_inc_signal_in_range + & f.flag_negative_timelag + ).astype("int8") + + pdf["cnts_ntl"] = ( + f.flag_valid_scatt_signal + & f.flag_scatt_not_sat + & f.flag_valid_inc_signal_in_range + & f.flag_negative_timelag + ).astype("int8") + + pdf["cnts_extreme_positive_timelag"] = ( + f.flag_valid_scatt_signal + & f.flag_valid_inc_signal_in_range + & f.flag_extreme_positive_timelag + ).astype("int8") + + pdf["cnts_particles_for_tl_dist"] = ( + f.flag_valid_scatt_signal + & f.flag_scatt_not_sat + & f.flag_valid_inc_signal_in_range + & ( + (f.flag_valid_timelag_thin & ~f.flag_low_ratio_inc_scatt) + | (f.flag_valid_timelag_thick & ~f.flag_extreme_positive_timelag) + ) + ).astype("int8") + + # totals + pdf["cnts_thin_total"] = (pdf["cnts_thin"] + pdf["cnts_thin_noScatt"]).astype( + "int16" + ) + + pdf["cnts_thick_total"] = ( + pdf["cnts_thick"] + + pdf["cnts_thick_sat"] + + pdf["cnts_ntl_sat"] + + pdf["cnts_ntl"] + + pdf["cnts_thin_sat"] + ).astype("int16") + + pdf["cnts_unclassified"] = ( + pdf["cnts_extreme_positive_timelag"] + pdf["cnts_thin_low_inc_scatt_ratio"] + ).astype("int16") + + return pdf + + +_SUM_COLS: List[str] = [ + "Dropped Records", + "Incand Mass (fg)", + "BC mass", + "BC mass within range", + "cnts_thin", + "cnts_thin_noScatt", + "cnts_thick", + "cnts_thick_sat", + "cnts_thin_sat", + "cnts_ntl_sat", + "cnts_ntl", + "cnts_extreme_positive_timelag", + "cnts_thin_low_inc_scatt_ratio", + "cnts_thin_total", + "cnts_thick_total", + "cnts_unclassified", +] + +_COUNT_SRC: List[str] = [ + "Incand Mass (fg)", + "BC mass", + "BC mass within range", + "Scatter Size (nm)", + "Opt diam scatt only", + "Opt diam scatt only within range", + "temporary_col", +] + +_COUNT_DST: List[str] = [ + "BC numb from file", + "BC numb", + "BC numb within range", + "scatter numb from file", + "Scatt numb", + "Scatt numb within range", + "original_idx", +] + + +def build_dt_summary(pdf: pd.DataFrame, dt_s: int = 1) -> pd.DataFrame: + """ + Resample single-particle data to `dt_s`-second bins and return a summary. + + Parameters + ---------- + pdf : pandas.DataFrame + Partition containing particle-level rows. Must have a DateTimeIndex. + dt_s : int, default 1 + Resampling interval in seconds. + + Returns + ------- + pandas.DataFrame + One row per `dt_s`, with summed counters, counts, and means. + Columns `date` and `hour` are added for downstream partitioning. 
+ """ + dt_str = f"{dt_s}s" + + # --- SUM columns -------------------------------------------------- + df_sum = pdf[_SUM_COLS].resample(dt_str).sum() + + # --- COUNT columns ------------------------------------------------ + df_cnt = ( + pdf[_COUNT_SRC] + .resample(dt_str) + .count() + .rename(columns=dict(zip(_COUNT_SRC, _COUNT_DST))) + ) + + # --- MEAN columns ------------------------------------------------- + df_mean = ( + pdf[["Secs_2GB"]] + .resample(dt_str) + .mean() + .rename(columns={"Secs_2GB": "Secs_2GB_mean"}) + ) + + # --- merge + post-process ---------------------------------------- + out = pd.concat([df_sum, df_cnt, df_mean], axis=1) + + # drop empty bins (original_idx == 0) + out = out[out["original_idx"] != 0].drop(columns="original_idx") + + # add date / hour helper cols + out["date"] = out.index.normalize() + out["hour"] = out.index.hour.astype("int64") + + return out + + +def resample_hk_partition(pdf: pd.DataFrame, dt="1s") -> pd.DataFrame: + """ + Partition-wise resampling of HK data to `dt` bins. + Keeps only non-empty intervals (original_idx != 0). + """ + # Ensure local partition is sorted + pdf = pdf.sort_index() + + # --- resample mean values --- + df_mean = ( + pdf[ + ["Sample Flow Controller Read (sccm)", "Sample Flow Controller Read (vccm)"] + ] + .resample(dt) + .mean() + ) + + # --- resample count for 'original_idx' --- + pdf["temporary_col"] = 1 + df_count = pdf[["temporary_col"]].resample(dt).count() + df_count.rename(columns={"temporary_col": "original_idx"}, inplace=True) + + # --- merge results --- + out = df_mean.join(df_count) + + # --- drop empty bins --- + out = out[out["original_idx"] != 0].drop(columns="original_idx") + + # --- add date/hour columns --- + out["date"] = out.index.normalize() + out["hour"] = out.index.hour.astype("int64") + + return out + + +def add_concentrations(pdf: pd.DataFrame, dt: int = 1) -> pd.DataFrame: + """ + Add BC / scattering mass- and number-concentration columns. + + Parameters + ---------- + pdf : pandas.DataFrame + 1-second (or dt-second) summary partition with the columns generated + by your PbP + HK merge. + dt : int, default 1 + Length of each time bin in seconds. + + Returns + ------- + pandas.DataFrame + Same object with 12 new concentration columns. 
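+
+    Notes
+    -----
+    Assuming `BC mass` is reported in fg and the flow readings in cm3 min-1,
+    `sec_per_bin = dt / 60` converts the bin length to minutes, so
+    `flow * sec_per_bin` is the sampled volume in cm3; the factor 1e-9
+    converts fg to ug and 1e-6 converts cm3 to m3, which yields mass
+    concentrations in ug m-3 and number concentrations in cm-3.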
+ """ + # avoid repeated division + flow_sccm = pdf["Sample Flow Controller Read (sccm)"] + flow_vccm = pdf["Sample Flow Controller Read (vccm)"] + sec_per_bin = dt / 60 # convert seconds to minutes + + # --- BC mass ------------------------------------------------------ + pdf["BC_massConc_std"] = pdf["BC mass"] * 1e-9 / (flow_sccm * sec_per_bin * 1e-6) + pdf["BC_massConc_vol"] = pdf["BC mass"] * 1e-9 / (flow_vccm * sec_per_bin * 1e-6) + + # --- BC number ---------------------------------------------------- + pdf["BC_numConc_std"] = pdf["BC numb"] / (flow_sccm * sec_per_bin) + pdf["BC_numConc_vol"] = pdf["BC numb"] / (flow_vccm * sec_per_bin) + + # --- BC within range --------------------------------------------- + pdf["BC_massConc_within_range_std"] = ( + pdf["BC mass within range"] * 1e-9 / (flow_sccm * sec_per_bin * 1e-6) + ) + pdf["BC_massConc_within_range_vol"] = ( + pdf["BC mass within range"] * 1e-9 / (flow_vccm * sec_per_bin * 1e-6) + ) + pdf["BC_numConc_within_range_std"] = pdf["BC numb within range"] / ( + flow_sccm * sec_per_bin + ) + pdf["BC_numConc_within_range_vol"] = pdf["BC numb within range"] / ( + flow_vccm * sec_per_bin + ) + + # --- scattering number ------------------------------------------- + pdf["S_numConc_std"] = pdf["Scatt numb"] / (flow_sccm * sec_per_bin) + pdf["S_numConc_vol"] = pdf["Scatt numb"] / (flow_vccm * sec_per_bin) + + pdf["S_numConc_within_range_std"] = pdf["Scatt numb within range"] / ( + flow_sccm * sec_per_bin + ) + pdf["S_numConc_within_range_vol"] = pdf["Scatt numb within range"] / ( + flow_vccm * sec_per_bin + ) + + return pdf + + +def make_bin_arrays( + *, + lims: np.ndarray | None = None, + ctrs: np.ndarray | None = None, + n: int = 50, + log: bool = True, +) -> Tuple[np.ndarray, np.ndarray]: + if lims is None and ctrs is None: + lims = ( + np.logspace(np.log10(0.3), np.log10(400), n) + if log + else np.linspace(0.3, 400, n) + ) + elif lims is None: + lims = np.append( + ctrs[0] - (ctrs[1] - ctrs[0]) / 2, + (ctrs[:-1] + ctrs[1:]) / 2, + ) + lims = np.append(lims, ctrs[-1] + (ctrs[-1] - ctrs[-2]) / 2) + ctrs = (lims[:-1] + lims[1:]) / 2 + return lims, ctrs + + +''' +def histo_counts_partition( + pdf: pd.DataFrame, + *, + col: str, + dt_s: int, + bin_lims: np.ndarray, + bin_ctrs: np.ndarray, + flag_col: Optional[str] = None, + flag_val: Optional[int] = None, +) -> pd.DataFrame: + if flag_col is not None and flag_val is not None: + pdf = pdf.loc[pdf[flag_col] == flag_val] + + dt_str = f"{dt_s}s" + + counts_ser = ( + pdf[col] + .resample(dt_str) + .apply(lambda x: np.histogram(x.dropna(), bins=bin_lims)[0]) + ) + + # keep only non-empty intervals (temporary_col exists in PbP frame) + pdf["temporary_col"] = 1 + nonempty = pdf["temporary_col"].resample(dt_str).count() + counts_ser = counts_ser[nonempty != 0] + + """if counts_ser.empty: + counts_df = pd.DataFrame( + np.nan, + index=pd.DatetimeIndex([], name=counts_ser.index.name), + columns=[f"bin_{i}" for i in range(len(bin_ctrs))], + ) + else: + counts_df = counts_ser.apply( + lambda arr: pd.Series(arr, index=bin_ctrs, dtype="int64") + )"""""" + if counts_ser.empty: + # use the REAL bin centres as column names + counts_df = pd.DataFrame( + np.nan, + index=pd.DatetimeIndex([], name=counts_ser.index.name), + columns=bin_ctrs, # <-- not "bin_0", "bin_1" + dtype="float64", + ) + else: + counts_df = counts_ser.apply( + lambda arr: pd.Series(arr, index=bin_ctrs, dtype="int64") + ) + + counts_df["date"] = counts_df.index.normalize() + counts_df["hour"] = counts_df.index.hour.astype("int64") + return 
counts_df + + + +def all_histograms_partition( + pdf: pd.DataFrame, + *, + specs: List[Tuple[str, Optional[str], Optional[int], str, Tuple[np.ndarray, np.ndarray]]], + dt_s: int, +) -> pd.DataFrame: + frames = [] + for idx, (key, flag_col, flag_val, col_name, bin_tuple) in enumerate(specs): + lims, ctrs = bin_tuple + h = histo_counts_partition( + pdf, + col=col_name, + dt_s=dt_s, + bin_lims=lims, + bin_ctrs=ctrs, + flag_col=flag_col, + flag_val=flag_val, + ) + helpers = h[["date", "hour"]] if idx == 0 else None + hist = h.drop(columns=["date", "hour"]) + + # prefix only the histogram columns + #hist = hist.add_prefix(f"{key}_") + hist.columns = [f"{key}_bin_{c}" for c in hist.columns] + + frames.append(hist) + if helpers is not None: + frames.append(helpers) # insert once, at the very end + + return pd.concat(frames, axis=1) + +def counts_to_conc_partition( + pdf: pd.DataFrame, + *, + flow_col: str, + dt_s: int, + rho_eff: float = 1800, + bin_width_map: dict, +) -> pd.DataFrame: + flow = pdf[flow_col] + conc_frames = [] + mass_frames = [] + + for col in pdf.columns: + if "_bin_" not in col: + continue # skip helpers, prefixed cols handled below + + # For each histogram population (prefix before first underscore) + prefixes = sorted({c.split("_", 1)[0] for c in pdf.columns if c.startswith("bin_")}) + for pref in prefixes: + cols = [c for c in pdf.columns if c.startswith(f"{pref}_bin_")] + bins = np.array([float(c.split("_")[-1]) for c in cols]) + dNd = pdf[cols].div(flow, axis=0) / (dt_s / 60) + dNd.columns = [c.replace("bin_", f"dNd_{pref}_") for c in cols] + conc_frames.append(dNd) + + # ---- add mass spectra if this is BC-mass histogram ----------- + if pref != "scatt_only": # crude rule: scatt histogram → skip mass + dlog = bin_width_map[pref] # scalar or 1-D array + density = rho_eff + dMd = dNd.mul((np.pi / 6) * density * bins**3, axis=1) / dlog + dMd.columns = [c.replace("dNd", "dMd") for c in dNd.columns] + mass_frames.append(dMd) + + out = pd.concat(conc_frames + mass_frames + [pdf[["date", "hour"]]], axis=1) + return out +''' + + +def dNdlogDp_to_dMdlogDp(N, Dp, rho=1800): + """This is from functions.py from Rob""" + """ + Given a normalized number size distribution [dN/dlogDp, cm^-3] defined over the diameters given by Dp [nm] + return the corresponding the mass size distribution [dM/dlogDp, ug/m3] + + Optional density input given in units of kg/m3 + """ + # M = N*((np.pi*(Dp*1e-9)**3)/6)*rho*1e18 + M = N * np.pi / 6 * rho * (Dp**3) * 1e-12 + return M + + +def bin_ctrs_to_lims(x_ctrs): + """This is from functions.py from Rob""" + """ + Take an array of bin centers and return an array 1 element larger of bin limits + """ + delta = np.diff(x_ctrs) / 2 + x_lims = np.append(x_ctrs[0] - delta[0], x_ctrs[:-1] + delta) + x_lims = np.append(x_lims, x_ctrs[-1] + delta[-1]) + return x_lims + + +def bin_lims_to_ctrs(x_lims): + """ + Take an array of bin limits and return an array 1 element smaller of bin centers + """ + x_ctrs = (x_lims[1:] + x_lims[:-1]) / 2 + return x_ctrs + + +def get_dlogp(Dp): + """This is from functions.py from Rob""" + """ + Given an array of diameters Dp calculate an array of the log of the difference dlogdp + """ + dlogdp = np.log10(bin_ctrs_to_lims(Dp)[1:]) - np.log10(bin_ctrs_to_lims(Dp)[:-1]) + return dlogdp + + +def counts2numConc(numb, flow, t=1): + """This is from functions.py from Rob""" + """ + Calculate number concentration from counts (cnts), flow rate in [ccm] (Q) and sampling time in [secs] (t) + Return numConc with units of [cm^-3] + """ + conc = 
numb.divide(flow * (t / 60), axis=0) + return conc + + +def calculate_histogram(series, bin_lims=np.logspace(np.log10(0.3), np.log10(400), 50)): + series = series.dropna() + if series.empty: + counts = np.full(len(bin_lims) - 1, np.nan, dtype=float) + else: + counts, _ = np.histogram(series, bins=bin_lims) + return counts + + +def process_hist_and_dist_partition( + df, + col, + flag_col, + flag_value, + bin_lims, + bin_ctrs, + dt_str, + calculate_conc=True, + flow=None, + rho_eff=1800, + BC_type="", + t=1, +): + # Filter the dataframe per partition + if flag_col and flag_value is not None: + df_filtered = df.loc[df[flag_col] == flag_value] + else: + df_filtered = df + + # Adjust flag_col + if flag_col is not None and "cnts" in flag_col: + flag_col = flag_col[5:] + + df_resampled = ( + df_filtered[[col]] + .resample(dt_str) + .apply(lambda x: calculate_histogram(x[col], bin_lims=bin_lims)) + ) + flow_dt = df["Sample Flow Controller Read (vccm)"].resample(dt_str).mean() + + df_resampled = df_resampled.to_frame(name="result") + + original_idx_df = df_filtered[["temporary_col"]].resample(dt_str).count() + original_idx_df.columns = ["original_idx"] # ensure proper naming + + df_resampled = df_resampled.join(original_idx_df, how="left") + if "original_idx" in df_resampled.columns: + df_resampled = df_resampled[df_resampled["original_idx"] != 0] + df_resampled = df_resampled.drop("original_idx", axis=1) + + # Initialize DataFrame based on whether the result is empty or not + list_col = "result" + max_list_length = len(bin_ctrs) + + if df_resampled.empty: + columns = [f"{list_col}_{i}" for i in range(max_list_length)] + ddf_hist = pd.DataFrame(np.nan, columns=columns, index=flow.index) + else: + ddf_hist = df_resampled[list_col].apply(pd.Series) + ddf_hist.index = df_resampled.index + + if calculate_conc: + # Join with the sample flow controller data + inc_hist_flow = ddf_hist.join(flow_dt) + + if rho_eff is not None and BC_type is not None: + # Calculate diameters and densities + density, Dmev = BC_mass_to_diam(bin_ctrs, rho_eff=rho_eff, BC_type=BC_type) + + # Calculate number concentration + dNdlogDmev = counts2numConc( + inc_hist_flow.iloc[:, :-1], inc_hist_flow.iloc[:, -1], t=t + ) / get_dlogp(Dmev) + if flag_col is None: + dNdlogDmev.columns = [f"dNdlogDmev_all_{i:.2f}" for i in Dmev] + else: + dNdlogDmev.columns = [f"dNdlogDmev_{flag_col}_{i:.2f}" for i in Dmev] + + # Calculate mass concentration + dMdlogDmev = dNdlogDp_to_dMdlogDp(dNdlogDmev, Dmev, rho=density) + if flag_col is None: + dMdlogDmev.columns = [f"dMdlogDmev_all_{i:.2f}" for i in Dmev] + else: + dMdlogDmev.columns = [f"dMdlogDmev_{flag_col}_{i:.2f}" for i in Dmev] + + else: + + # Calculate number concentration + dNdlogDmev = counts2numConc( + inc_hist_flow.iloc[:, :-1], inc_hist_flow.iloc[:, -1], t=t + ) / get_dlogp(bin_ctrs) + if flag_col is None: + dNdlogDmev.columns = [f"dNdlogDsc_all_{i:.2f}" for i in bin_ctrs] + else: + dNdlogDmev.columns = [f"dNdlogDsc_{flag_col}_{i:.2f}" for i in bin_ctrs] + + dMdlogDmev = None + # return dNdlogDmev, dMdlogDmev + + # else: + # return ddf_hist, None + + if calculate_conc: + # Concatenate results into a single dataframe + if dMdlogDmev is not None: + result_df = pd.concat([dNdlogDmev, dMdlogDmev], axis=1) + else: + result_df = dNdlogDmev + else: + result_df = ddf_hist + return result_df + + +def main(): + args = parse_args() + output_dir = Path(args.output) + + dt_s = args.dt + + # 0. 
cluster ------------------------------------------------------- + client = make_cluster( + cores=args.cores, mem=args.memory, wall=args.walltime, partition=args.partition + ) + + # 1. calibration stage -------------------------------------------- + cfg = yaml.safe_load(open(args.config)) + cfg_future = client.scatter(cfg, broadcast=True) + + inc_mass_bin_lims = np.logspace( + np.log10(cfg["inc_histo"]["min_mass"]), + np.log10(cfg["inc_histo"]["max_mass"]), + cfg["inc_histo"]["n_bins"], + ) + inc_mass_bin_ctrs = bin_lims_to_ctrs(inc_mass_bin_lims) + # mass_bin_labels = np.round(inc_mass_bin_ctrs, 2) + rho_eff = 1800 + BC_type = "constant_effective_density" + + ddf_raw = dd.read_parquet( + args.input_pbp, engine="pyarrow", aggregate_files=True, blocksize="32MB" + ) + + meta_cal = ddf_raw._meta.copy() + meta_cal["BC mass"] = pd.Series([], dtype="float64") + meta_cal["Opt diam"] = pd.Series([], dtype="float64") + # meta_cal["ratio_inc_scatt"] = pd.Series([], dtype="float64") + meta_cal["time_lag"] = pd.Series([], dtype="int8") + meta_cal["date"] = pd.Series([], dtype="datetime64[ns]") + meta_cal["hour"] = pd.Series([], dtype="int8") + + ddf_cal = ddf_raw.map_partitions( + apply_calibration, config=cfg_future, meta=meta_cal + ) + ddf_cal["time_lag"] = ddf_cal["Incand Peak Time"] - ddf_cal["Scatter Peak Time"] + ddf_cal["ratio_inc_scatt"] = np.log10(ddf_cal["Incand relPeak"]) / np.log10( + ddf_cal["Scatter relPeak"] + ) + + ddf_cal = define_flags(ddf_cal, cfg) + + meta_cnts = add_thin_thick_flags(ddf_cal._meta) # builds empty columns for meta + ddf_cal = ddf_cal.map_partitions(add_thin_thick_flags, meta=meta_cnts) + + # Create two new columns where the particles out of range are changed to np.nan (!! do not change them to 0, otherwise below where we resample and count, the 0s would be counted being a not null value!!) + ddf_cal["BC mass within range"] = ddf_cal["BC mass"].where( + ddf_cal["flag_valid_inc_signal_in_range"], np.nan + ) + ddf_cal["Opt diam scatt only within range"] = ddf_cal["Opt diam scatt only"].where( + ( + ddf_cal["flag_valid_scatt_signal_in_range"] + & ~ddf_cal["flag_valid_inc_signal"] + ), + np.nan, + ) + + """ddf_cal.to_parquet( + path=f"{output_dir}/pbp_calibrated", + partition_on=["date", "hour"], + engine="pyarrow", + write_index=True, + write_metadata_file=True, + overwrite=True, + )""" + + ddf_cal["temporary_col"] = 1 + + meta_dt = build_dt_summary(ddf_cal._meta) # empty frame for metadata + + ddf_pbp_dt = ddf_cal.map_partitions(build_dt_summary, dt_s=args.dt, meta=meta_dt) + + # 2. 
load house-keeping once -------------------------------------- + ddf_hk = dd.read_parquet( + args.input_hk, + engine="pyarrow", + # aggregate_files=True, + # blocksize="32MB" + ) + # Create a meta (empty dataframe with correct schema) + meta = pd.DataFrame( + { + "Sample Flow Controller Read (sccm)": pd.Series(dtype="float64"), + "Sample Flow Controller Read (vccm)": pd.Series(dtype="float64"), + "date": pd.Series(dtype="datetime64[ns]"), + "hour": pd.Series(dtype="int64"), + }, + index=pd.DatetimeIndex([]), + ) + + # Map partition-wise + ddf_hk_dt = ddf_hk.map_partitions( + resample_hk_partition, dt=f"{args.dt}s", meta=meta + ) + + def add_flow_nearest(pdf, flow_pdf, tol="500ms"): + """ + pdf : particle partition (pandas) + flow_pdf : *entire* flow table (pandas) + tol : max time difference accepted (string, Timedelta, or None) + """ + return pd.merge_asof( + pdf, + flow_pdf, + left_index=True, + right_index=True, + direction="nearest", + tolerance=pd.Timedelta(tol) if tol else None, + ) + + pdf_flow = ddf_hk_dt["Sample Flow Controller Read (vccm)"].compute() + + meta_pbp_with_flow = ddf_cal._meta.copy() + meta_pbp_with_flow["Sample Flow Controller Read (vccm)"] = ( + np.float64() + ) # tell Dask a new float col appears + + ddf_pbp_with_flow = ddf_cal.map_partitions( + add_flow_nearest, + flow_pdf=pdf_flow, # ← broadcast + meta=meta_pbp_with_flow, + ) + """ddf_pbp_with_flow = ddf_pbp_with_flow.assign( + **{"BC mass bin": pd.cut(ddf_pbp_with_flow["BC mass within range"], + bins=inc_mass_bin_lims, labels=mass_bin_labels)} + )""" + ddf_pbp_with_flow = ddf_pbp_with_flow.map_partitions( + lambda df: df.assign( + **{ + "BC mass bin": pd.cut( + df["BC mass within range"], bins=inc_mass_bin_lims, labels=False + ) + } + ), + meta=ddf_pbp_with_flow._meta.assign(**{"BC mass bin": 0}), + ) + + ddf_pbp_with_flow.to_parquet( + path=f"{output_dir}/pbp_calibrated", + partition_on=["date", "hour"], + engine="pyarrow", + write_index=True, + write_metadata_file=True, + overwrite=True, + ) + + # 3. aggregation core --------------------------------------------- + ddf_pbp_hk_dt = dd.merge( + ddf_pbp_dt, ddf_hk_dt, how="left", left_index=True, right_index=True + ) + time_index = dd.to_datetime(ddf_pbp_hk_dt.index.to_series()) + + ddf_pbp_hk_dt["date"] = time_index.dt.normalize() # works on Series + ddf_pbp_hk_dt["hour"] = time_index.dt.hour.astype("int64") + + # Optionally drop the old columns + ddf_pbp_hk_dt = ddf_pbp_hk_dt.drop(columns=["date_x", "hour_x", "date_y", "hour_y"]) + + ddf_pbp_hk_dt.to_parquet( + path=f"{output_dir}/combined_pbp_hk_{dt_s}s", + partition_on=["date", "hour"], + engine="pyarrow", + write_index=True, + write_metadata_file=True, + overwrite=True, + ) + """ + meta_core = build_core_meta(ddf_cal, ddf_hk, dt_s) + ddf_core = dd.merge(ddf_cal, ddf_hk, on="Time", how="left") \ + .map_partitions(aggregate_core, dt_s=dt_s, meta=meta_core) + + # always write the ‘core’ table + ddf_core.to_parquet(f"{root_out}/core_{dt_s}s", + partition_on=["date"], + overwrite=True) + """ + # 4. (optional) dt bulk conc -------------------------- + do_conc = args.conc + meta_conc = add_concentrations(ddf_pbp_hk_dt._meta, dt=dt_s) if do_conc else None + + if do_conc: + print("ok") + ddf_conc = ddf_pbp_hk_dt.map_partitions( + add_concentrations, dt=dt_s, meta=meta_conc + ) + ddf_conc.to_parquet( + f"{output_dir}/conc_{dt_s}s", + partition_on=["date"], + overwrite=True, + ) + + """if do_hist: + ddf_hist.to_parquet(f"{root_out}/hists_{dt_s}s", + partition_on=["date"], overwrite=True) + """ + + # 5. 
(optional) dt histograms -------------------------- + do_hist = args.hist + """# --------------------------------------------------------------------- + # Which particle subset → which flag column + # --------------------------------------------------------------------- + # key: suffix for column names + # val: (flag column, flag value) (None, None) means "all particles" + histogram_specs = { + "all" : (None, None), + "thin" : ("cnts_thin", 1), + "thin_noScatt" : ("cnts_thin_noScatt", 1), + "thin_low_inc_ratio" : ("cnts_thin_low_inc_scatt_ratio", 1), + "thick" : ("cnts_thick", 1), + "thick_sat" : ("cnts_thick_sat", 1), + "thin_sat" : ("cnts_thin_sat", 1), + "ntl_sat" : ("cnts_ntl_sat", 1), + "ntl" : ("cnts_ntl", 1), + "ext_pos_tl" : ("cnts_extreme_positive_timelag", 1), + "thin_total" : ("cnts_thin_total", 1), + "thick_total" : ("cnts_thick_total", 1), + "unclassified" : ("cnts_unclassified", 1), + }""" + """bins_dict = { + "mass": make_bin_arrays( # BC-mass histogram bins + lims=np.logspace( + np.log10(cfg["inc_histo"]["min_mass"]), + np.log10(cfg["inc_histo"]["max_mass"]), + cfg["inc_histo"]["n_bins"] + ) + ), # -> (limits, centres) + + "scatt": make_bin_arrays( # scattering-diameter bins + lims=np.logspace( + np.log10(cfg["scatt_histo"]["min_D"]), + np.log10(cfg["scatt_histo"]["max_D"]), + cfg["scatt_histo"]["n_bins"] + ) + ), + + "timelag": make_bin_arrays( # time-lag bins (linear) + lims=np.linspace( + cfg["time_delay_histo"]["min"], + cfg["time_delay_histo"]["max"], + cfg["time_delay_histo"]["n_bins"] + ), + log=False + ), + } + histogram_spec_list = [ + # key, flag_col, flag_val, column_name, bin_tuple, do_mass + ("all", None, None, "BC mass within range", bins_dict["mass"]), + ("thin", "cnts_thin", 1, "BC mass within range", bins_dict["mass"]), + ("thin_noScatt","cnts_thin_noScatt", 1, "BC mass within range", bins_dict["mass"]), + ("thin_low_ratio","cnts_thin_low_inc_scatt_ratio",1, "BC mass within range", bins_dict["mass"]), + ("thick", "cnts_thick", 1, "BC mass within range", bins_dict["mass"]), + ("thick_sat", "cnts_thick_sat", 1, "BC mass within range", bins_dict["mass"]), + ("thin_sat", "cnts_thin_sat", 1, "BC mass within range", bins_dict["mass"]), + ("ntl_sat", "cnts_ntl_sat", 1, "BC mass within range", bins_dict["mass"]), + ("ntl", "cnts_ntl", 1, "BC mass within range", bins_dict["mass"]), + ("ext_pos_tl", "cnts_extreme_positive_timelag", 1, "BC mass within range", bins_dict["mass"]), + ("thin_total", "cnts_thin_total", 1, "BC mass within range", bins_dict["mass"]), + ("thick_total", "cnts_thick_total", 1, "BC mass within range", bins_dict["mass"]), + ("unclassified","cnts_unclassified", 1, "BC mass within range", bins_dict["mass"]), + + # scattering-only diameter spectrum + ("scatt_only", None, None, "Opt diam scatt only", bins_dict["scatt"]), + ]""" + + if do_hist: + # --- Mass histogram configs + hist_configs = [ + {"name": "BC_all", "flag_col": None, "flag_value": None}, + {"name": "thin", "flag_col": "cnts_thin", "flag_value": 1}, + {"name": "thin_noScatt", "flag_col": "cnts_thin_noScatt", "flag_value": 1}, + {"name": "thick", "flag_col": "cnts_thick", "flag_value": 1}, + {"name": "thick_sat", "flag_col": "cnts_thick_sat", "flag_value": 1}, + {"name": "thin_sat", "flag_col": "cnts_thin_sat", "flag_value": 1}, + {"name": "ntl_sat", "flag_col": "cnts_ntl_sat", "flag_value": 1}, + {"name": "ntl", "flag_col": "cnts_ntl", "flag_value": 1}, + { + "name": "extreme_positive_timelag", + "flag_col": "cnts_extreme_positive_timelag", + "flag_value": 1, + }, + { + "name": 
"thin_low_inc_scatt_ratio", + "flag_col": "cnts_thin_low_inc_scatt_ratio", + "flag_value": 1, + }, + {"name": "thin_total", "flag_col": "cnts_thin_total", "flag_value": 1}, + {"name": "thick_total", "flag_col": "cnts_thick_total", "flag_value": 1}, + {"name": "unclassified", "flag_col": "cnts_unclassified", "flag_value": 1}, + ] + + # --- Scattering histogram configuration + scatt_bin_lims = np.logspace( + np.log10(cfg["scatt_histo"]["min_D"]), + np.log10(cfg["scatt_histo"]["max_D"]), + cfg["scatt_histo"]["n_bins"], + ) + scatt_bin_ctrs = bin_lims_to_ctrs(scatt_bin_lims) + scatt_config = { + "name": "scatt", + "col": "Opt diam scatt only", + "flag_col": None, + "flag_value": None, + "bin_lims": scatt_bin_lims, + "bin_ctrs": scatt_bin_ctrs, + "rho_eff": None, + "BC_type": None, + } + + # --- Timelag histogram parameters + timelag_bins_lims = np.linspace( + cfg["time_delay_histo"]["min"], + cfg["time_delay_histo"]["max"], + cfg["time_delay_histo"]["n_bins"], + ) + timelag_bin_ctrs = bin_lims_to_ctrs(timelag_bins_lims) + + # ========= 2. WRAPPER FUNCTION ========= + + def run_histogram( + ddf, + name, + col, + flag_col, + flag_value, + bin_lims, + bin_ctrs, + dt_str, + flow=None, + rho_eff=None, + BC_type=None, + t=1, + ): + """Runs process_hist_and_dist_partition and returns a Dask DF with renamed columns.""" + + meta_cols = [f"{name}_bin{i}" for i in range(len(bin_ctrs))] + + hist_ddf = ddf.map_partitions( + process_hist_and_dist_partition, + col=col, + flag_col=flag_col, + flag_value=flag_value, + bin_lims=bin_lims, + bin_ctrs=bin_ctrs, + dt_str=dt_str, + calculate_conc=True, + flow=flow, + rho_eff=rho_eff, + BC_type=BC_type, + t=t, + meta=pd.DataFrame(columns=meta_cols), + ) + + hist_ddf.columns = meta_cols + return hist_ddf + + # ========= 3. RUN MASS HISTOGRAMS ========= + + results = [] + + for cfg in hist_configs: + ddf_out = run_histogram( + ddf_pbp_with_flow, + name=cfg["name"], + col="BC mass within range", + flag_col=cfg["flag_col"], + flag_value=cfg["flag_value"], + bin_lims=inc_mass_bin_lims, + bin_ctrs=inc_mass_bin_ctrs, + dt_str="1s", + flow=None, + rho_eff=rho_eff, + BC_type=BC_type, + t=1, + ) + results.append(ddf_out) + + # ========= 4. RUN SCATTERING HISTOGRAM ========= + + scatt_ddf = run_histogram( + ddf_pbp_with_flow, + name=scatt_config["name"], + col=scatt_config["col"], + flag_col=scatt_config["flag_col"], + flag_value=scatt_config["flag_value"], + bin_lims=scatt_config["bin_lims"], + bin_ctrs=scatt_config["bin_ctrs"], + dt_str="1s", + # flow=ddf_pbp_hk["Sample Flow Controller Read (vccm)"], + rho_eff=scatt_config["rho_eff"], + BC_type=scatt_config["BC_type"], + t=1, + ) + results.append(scatt_ddf) + + # ========= 5. 
RUN TIMELAG HISTOGRAMS ========= + + mass_bins = ddf_pbp_with_flow["BC mass bin"].unique().compute() + for idx, mass_bin in enumerate(mass_bins): + ddf_bin = ddf_pbp_with_flow[ddf_pbp_with_flow["BC mass bin"] == mass_bin] + + # meta_cols = [f"dNdlogDmev_{inc_mass_bin_ctrs[idx]:.2f}_timelag_{i:.1f}" for i in timelag_bin_ctrs] + bin_center = ( + inc_mass_bin_ctrs[int(mass_bin)] + if mass_bin < len(inc_mass_bin_ctrs) + else mass_bin + ) + meta_cols = [ + f"dNdlogDmev_{bin_center:.2f}_timelag_{i:.1f}" for i in timelag_bin_ctrs + ] + + tl_ddf = ddf_bin.map_partitions( + process_hist_and_dist_partition, + col="time_lag_new", + flag_col="cnts_particles_for_tl_dist", + flag_value=1, + bin_lims=timelag_bins_lims, + bin_ctrs=timelag_bin_ctrs, + dt_str="1s", + calculate_conc=True, + # flow=ddf_pbp_hk["Sample Flow Controller Read (vccm)"], + rho_eff=None, + BC_type=None, + t=1, + meta=pd.DataFrame(columns=meta_cols), + ) + + tl_ddf.columns = meta_cols + results.append(tl_ddf) + + # ========= 6. MERGE ALL HISTOGRAMS ========= + + merged_ddf = reduce(lambda left, right: left.join(right), results) + + # Add normalized date column + # merged_ddf = merged_ddf.assign(date=merged_ddf.index.to_series().dt.normalize()) + index_as_dt = dd.to_datetime(merged_ddf.index) + merged_ddf["date"] = index_as_dt.map_partitions( + lambda s: s.dt.normalize(), meta=("date", "datetime64[ns]") + ) + + # ========= 7. SAVE TO PARQUET ========= + + merged_ddf.to_parquet( + f"{output_dir}/hists_{dt_s}s", + partition_on=["date"], + overwrite=True, + ) + """BC_all = ddf_pbp_with_flow.map_partitions( + process_hist_and_dist_partition, + col="BC mass within range", + flag_col=None, + flag_value=None, + bin_lims=inc_mass_bin_lims, + bin_ctrs=inc_mass_bin_ctrs, + dt_str='1s', + calculate_conc=True, + flow=None, + rho_eff=rho_eff, + BC_type=BC_type, + t=1, + ) + thin = ddf_pbp_with_flow.map_partitions( + process_hist_and_dist_partition, + col="BC mass within range", + flag_col="cnts_thin", + flag_value=1, + bin_lims=inc_mass_bin_lims, + bin_ctrs=inc_mass_bin_ctrs, + dt_str='1s', + calculate_conc=True, + rho_eff=rho_eff, + BC_type=BC_type, + t=1, + ) + time_index = dd.to_datetime(BC_all.index.to_series()) + BC_all["date"] = time_index.dt.normalize() # works on Series + merged_ddf = BC_all.join(thin) + # 8. Write parquet + merged_ddf.to_parquet( + f"{output_dir}/hists_{dt_s}s", + partition_on=["date"], + overwrite=True, + )""" + + client.close() + + +if __name__ == "__main__": + start_time = time.time() + main()
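+    print(f"Total runtime: {time.time() - start_time:.1f} s")  # report wall-clock time
+
+
+# ---------------------------------------------------------------------------
+# Reference only: a minimal sketch of the calibration YAML expected through
+# --config, inferred from the keys read in this script. Values are
+# placeholders; calibrate_particle_data() may require additional
+# calibration-coefficient keys that are not listed here.
+#
+#   instrument_parameters:
+#     IncTransitMin: <number>
+#     IncTransitMax: <number>
+#     IncFWHMMin: <number>
+#     IncFWHMMax: <number>
+#     ScattTransitMin: <number>
+#     ScattTransitMax: <number>
+#     ScattFWHMMin: <number>
+#     ScattFWHMMax: <number>
+#   Signal_saturation:
+#     IncSatPoint: <number>
+#     ScattSatPoint: <number>
+#   inc_histo:
+#     min_mass: <fg>
+#     max_mass: <fg>
+#     n_bins: <int>
+#   scatt_histo:
+#     min_D: <nm>
+#     max_D: <nm>
+#     n_bins: <int>
+#   time_delay_histo:
+#     min: <number>
+#     max: <number>
+#     n_bins: <int>
+#   thick_coating:
+#     threshold: <number>
+#
+# Example invocation (paths are placeholders):
+#   python scripts/sp2xr_pipeline.py \
+#       --input_pbp /path/to/pbp --input_hk /path/to/hk \
+#       --output /path/to/out --config sp2xr_config.yaml \
+#       --conc --hist --dt 1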