From fa4cb9e7d4fb0216fbbabc42612abbabadf39015 Mon Sep 17 00:00:00 2001 From: Barbara Bertozzi Date: Tue, 12 Aug 2025 15:53:48 +0200 Subject: [PATCH] Refactor: moved some functions to respective modules and implemented the timelag hists --- scripts/sp2xr_apply_calibration.py | 6 +- scripts/sp2xr_pipeline.py | 716 ++--------------------------- src/sp2xr/calibration.py | 11 +- src/sp2xr/distribution.py | 174 ++++++- 4 files changed, 237 insertions(+), 670 deletions(-) diff --git a/scripts/sp2xr_apply_calibration.py b/scripts/sp2xr_apply_calibration.py index c502d49..c3bd32b 100644 --- a/scripts/sp2xr_apply_calibration.py +++ b/scripts/sp2xr_apply_calibration.py @@ -51,8 +51,8 @@ def parse_args(): def apply_calibration(partition, config): """Apply calibration to a pandas partition.""" pdf = calibrate_particle_data(partition, config) - pdf["date"] = pdf.index.date.astype("datetime64[ns]") - pdf["hour"] = pdf.index.hour.astype("int64") + # pdf["date"] = pdf.index.date.astype("datetime64[ns]") + # pdf["hour"] = pdf.index.hour.astype("int64") return pdf @@ -99,6 +99,8 @@ def main(): # --- Apply calibration in parallel --- calibrated_df = df.map_partitions(apply_calibration, config=cfg_future, meta=meta) + calibrated_df["date"] = df.index.dt.date.astype("date64[pyarrow]") + calibrated_df["hour"] = df.index.dt.hour.astype("i8") # --- Save calibrated dataset (partitioned by date/hour) --- calibrated_df.to_parquet( diff --git a/scripts/sp2xr_pipeline.py b/scripts/sp2xr_pipeline.py index feaf128..d479f66 100644 --- a/scripts/sp2xr_pipeline.py +++ b/scripts/sp2xr_pipeline.py @@ -1,18 +1,23 @@ from __future__ import annotations import argparse -from functools import reduce from pathlib import Path from dask_jobqueue import SLURMCluster from dask.distributed import Client import yaml import time import dask.dataframe as dd -from dask.dataframe.utils import make_meta import pandas as pd import numpy as np -from typing import Tuple, List -from sp2xr.calibration import calibrate_particle_data, BC_mass_to_diam +from sp2xr.calibration import calibrate_particle_data +from sp2xr.flag_single_particle_data import define_flags, add_thin_thick_flags +from sp2xr.resample_pbp_hk import build_dt_summary, resample_hk_partition +from sp2xr.distribution import ( + bin_lims_to_ctrs, + process_hist_and_dist_partition, + make_hist_meta, +) +from sp2xr.concentrations import add_concentrations def parse_args(): @@ -52,7 +57,7 @@ def parse_args(): return p.parse_args() -def make_cluster(cores=32, mem="64GB", wall="02:00:00", partition="daily", out_dir=""): +def make_cluster(cores=32, mem="64GB", wall="00:59:00", partition="hourly", out_dir=""): cluster = SLURMCluster( cores=cores, processes=cores, @@ -71,595 +76,6 @@ def make_cluster(cores=32, mem="64GB", wall="02:00:00", partition="daily", out_d return client -def apply_calibration(partition, config): - """Apply calibration to a pandas partition.""" - pdf = calibrate_particle_data(partition, config) - pdf["date"] = partition.index.date.astype("datetime64[ns]") - pdf["hour"] = partition.index.hour.astype("int8") - return pdf - - -def define_flags(pdf_pbp, config): - flag_inc_transit_time = ( - pdf_pbp["Incand Transit Time"] - >= config["instrument_parameters"]["IncTransitMin"] - ) & ( - pdf_pbp["Incand Transit Time"] - <= config["instrument_parameters"]["IncTransitMax"] - ) - flag_inc_fwhm = ( - pdf_pbp["Incand FWHM"] >= config["instrument_parameters"]["IncFWHMMin"] - ) & (pdf_pbp["Incand FWHM"] <= config["instrument_parameters"]["IncFWHMMax"]) - flag_inc = 
flag_inc_transit_time & flag_inc_fwhm - pdf_pbp["flag_valid_inc_signal"] = flag_inc - pdf_pbp["flag_inc_not_sat"] = ( - pdf_pbp["Incand relPeak"] < config["Signal_saturation"]["IncSatPoint"] - ) - flag_inc_in_range = ( - flag_inc - & (pdf_pbp["BC mass"] >= config["inc_histo"]["min_mass"]) - & (pdf_pbp["BC mass"] <= config["inc_histo"]["max_mass"]) - ) - pdf_pbp["flag_valid_inc_signal_in_range"] = flag_inc_in_range - - flag_scatt_transit_time = ( - pdf_pbp["Scatter Transit Time"] - >= config["instrument_parameters"]["ScattTransitMin"] - ) & ( - pdf_pbp["Scatter Transit Time"] - <= config["instrument_parameters"]["ScattTransitMax"] - ) - flag_scatt_fwhm = ( - pdf_pbp["Scatter FWHM"] >= config["instrument_parameters"]["ScattFWHMMin"] - ) & (pdf_pbp["Scatter FWHM"] <= config["instrument_parameters"]["ScattFWHMMax"]) - flag_scatt = flag_scatt_transit_time & flag_scatt_fwhm - pdf_pbp["flag_valid_scatt_signal"] = flag_scatt - pdf_pbp["flag_scatt_not_sat"] = ( - pdf_pbp["Scatter relPeak"] < config["Signal_saturation"]["ScattSatPoint"] - ) - flag_scatt_in_range = ( - flag_scatt - & (pdf_pbp["Opt diam"] >= config["scatt_histo"]["min_D"]) - & (pdf_pbp["Opt diam"] <= config["scatt_histo"]["max_D"]) - ) - pdf_pbp["flag_valid_scatt_signal_in_range"] = flag_scatt_in_range - - pdf_pbp["flag_negative_timelag"] = ( - pdf_pbp["time_lag"] < config["time_delay_histo"]["min"] - ) - pdf_pbp["flag_extreme_positive_timelag"] = ( - pdf_pbp["time_lag"] >= config["time_delay_histo"]["max"] - ) - pdf_pbp["flag_valid_timelag_thin"] = ( - pdf_pbp["time_lag"] < config["thick_coating"]["threshold"] - ) & (pdf_pbp["time_lag"] >= config["time_delay_histo"]["min"]) - pdf_pbp["flag_valid_timelag_thick"] = ( - pdf_pbp["time_lag"] >= config["thick_coating"]["threshold"] - ) & (pdf_pbp["time_lag"] < config["time_delay_histo"]["max"]) - - pdf_pbp["flag_low_ratio_inc_scatt"] = pdf_pbp["ratio_inc_scatt"] < 1.1 - - pdf_pbp["Opt diam scatt only"] = pdf_pbp["Opt diam"].where( - flag_scatt & ~flag_inc, np.nan - ) - - return pdf_pbp - - -FLAG_COLS = [ - "flag_valid_scatt_signal", - "flag_scatt_not_sat", - "flag_valid_inc_signal_in_range", - "flag_valid_timelag_thin", - "flag_valid_timelag_thick", - "flag_negative_timelag", - "flag_extreme_positive_timelag", - "flag_low_ratio_inc_scatt", -] - - -def add_thin_thick_flags(pdf: pd.DataFrame) -> pd.DataFrame: - """Add cnts_* indicator + total columns (vectorised).""" - - # Safety check: ensure all flags exist - missing = [c for c in FLAG_COLS if c not in pdf.columns] - if missing: - raise KeyError(f"Missing flag columns: {missing}") - - f = pdf # shorthand - - # initialise empty int8 columns - zeros = np.zeros(len(pdf), dtype="int8") - newcols = { - "cnts_thin": zeros, - "cnts_thin_noScatt": zeros, - "cnts_thick": zeros, - "cnts_thick_sat": zeros, - "cnts_thin_sat": zeros, - "cnts_ntl_sat": zeros, - "cnts_ntl": zeros, - "cnts_extreme_positive_timelag": zeros, - "cnts_thin_low_inc_scatt_ratio": zeros, - "cnts_particles_for_tl_dist": zeros, - } - pdf = pdf.assign(**newcols) - - # ------------------------------------------------------------ - # vectorised assignments - # ------------------------------------------------------------ - pdf["cnts_thin_noScatt"] = ( - (~f.flag_valid_scatt_signal) & f.flag_valid_inc_signal_in_range - ).astype("int8") - - pdf["cnts_thin"] = ( - f.flag_valid_scatt_signal - & f.flag_scatt_not_sat - & f.flag_valid_inc_signal_in_range - & f.flag_valid_timelag_thin - & ~f.flag_low_ratio_inc_scatt - ).astype("int8") - - pdf["cnts_thin_low_inc_scatt_ratio"] = ( - 
f.flag_valid_scatt_signal - & f.flag_scatt_not_sat - & f.flag_valid_inc_signal_in_range - & f.flag_valid_timelag_thin - & f.flag_low_ratio_inc_scatt - ).astype("int8") - - pdf["cnts_thick"] = ( - f.flag_valid_scatt_signal - & f.flag_scatt_not_sat - & f.flag_valid_inc_signal_in_range - & f.flag_valid_timelag_thick - & ~f.flag_extreme_positive_timelag - ).astype("int8") - - pdf["cnts_thick_sat"] = ( - f.flag_valid_scatt_signal - & (~f.flag_scatt_not_sat) - & f.flag_valid_inc_signal_in_range - & f.flag_valid_timelag_thick - & ~f.flag_extreme_positive_timelag - ).astype("int8") - - pdf["cnts_thin_sat"] = ( - f.flag_valid_scatt_signal - & (~f.flag_scatt_not_sat) - & f.flag_valid_inc_signal_in_range - & f.flag_valid_timelag_thin - ).astype("int8") - - pdf["cnts_ntl_sat"] = ( - f.flag_valid_scatt_signal - & (~f.flag_scatt_not_sat) - & f.flag_valid_inc_signal_in_range - & f.flag_negative_timelag - ).astype("int8") - - pdf["cnts_ntl"] = ( - f.flag_valid_scatt_signal - & f.flag_scatt_not_sat - & f.flag_valid_inc_signal_in_range - & f.flag_negative_timelag - ).astype("int8") - - pdf["cnts_extreme_positive_timelag"] = ( - f.flag_valid_scatt_signal - & f.flag_valid_inc_signal_in_range - & f.flag_extreme_positive_timelag - ).astype("int8") - - pdf["cnts_particles_for_tl_dist"] = ( - f.flag_valid_scatt_signal - & f.flag_scatt_not_sat - & f.flag_valid_inc_signal_in_range - & ( - (f.flag_valid_timelag_thin & ~f.flag_low_ratio_inc_scatt) - | (f.flag_valid_timelag_thick & ~f.flag_extreme_positive_timelag) - ) - ).astype("int8") - - # totals - pdf["cnts_thin_total"] = (pdf["cnts_thin"] + pdf["cnts_thin_noScatt"]).astype( - "int16" - ) - - pdf["cnts_thick_total"] = ( - pdf["cnts_thick"] - + pdf["cnts_thick_sat"] - + pdf["cnts_ntl_sat"] - + pdf["cnts_ntl"] - + pdf["cnts_thin_sat"] - ).astype("int16") - - pdf["cnts_unclassified"] = ( - pdf["cnts_extreme_positive_timelag"] + pdf["cnts_thin_low_inc_scatt_ratio"] - ).astype("int16") - - return pdf - - -_SUM_COLS: List[str] = [ - "Dropped Records", - "Incand Mass (fg)", - "BC mass", - "BC mass within range", - "cnts_thin", - "cnts_thin_noScatt", - "cnts_thick", - "cnts_thick_sat", - "cnts_thin_sat", - "cnts_ntl_sat", - "cnts_ntl", - "cnts_extreme_positive_timelag", - "cnts_thin_low_inc_scatt_ratio", - "cnts_thin_total", - "cnts_thick_total", - "cnts_unclassified", -] - -_COUNT_SRC: List[str] = [ - "Incand Mass (fg)", - "BC mass", - "BC mass within range", - "Scatter Size (nm)", - "Opt diam scatt only", - "Opt diam scatt only within range", - "temporary_col", -] - -_COUNT_DST: List[str] = [ - "BC numb from file", - "BC numb", - "BC numb within range", - "scatter numb from file", - "Scatt numb", - "Scatt numb within range", - "original_idx", -] - - -def build_dt_summary(pdf: pd.DataFrame, dt_s: int = 1) -> pd.DataFrame: - """ - Resample single-particle data to `dt_s`-second bins and return a summary. - - Parameters - ---------- - pdf : pandas.DataFrame - Partition containing particle-level rows. Must have a DateTimeIndex. - dt_s : int, default 1 - Resampling interval in seconds. - - Returns - ------- - pandas.DataFrame - One row per `dt_s`, with summed counters, counts, and means. - Columns `date` and `hour` are added for downstream partitioning. 
- """ - dt_str = f"{dt_s}s" - - # --- SUM columns -------------------------------------------------- - df_sum = pdf[_SUM_COLS].resample(dt_str).sum() - - # --- COUNT columns ------------------------------------------------ - df_cnt = ( - pdf[_COUNT_SRC] - .resample(dt_str) - .count() - .rename(columns=dict(zip(_COUNT_SRC, _COUNT_DST))) - ) - - # --- MEAN columns ------------------------------------------------- - df_mean = ( - pdf[["Secs_2GB"]] - .resample(dt_str) - .mean() - .rename(columns={"Secs_2GB": "Secs_2GB_mean"}) - ) - - # --- merge + post-process ---------------------------------------- - out = pd.concat([df_sum, df_cnt, df_mean], axis=1) - - # drop empty bins (original_idx == 0) - out = out[out["original_idx"] != 0].drop(columns="original_idx") - - # add date / hour helper cols - out["date"] = out.index.normalize() - out["hour"] = out.index.hour.astype("int64") - - return out - - -def resample_hk_partition(pdf: pd.DataFrame, dt="1s") -> pd.DataFrame: - """ - Partition-wise resampling of HK data to `dt` bins. - Keeps only non-empty intervals (original_idx != 0). - """ - # Ensure local partition is sorted - pdf = pdf.sort_index() - pdf.index = pdf.index.floor(dt) - - # --- resample mean values --- - df_mean = ( - pdf[ - ["Sample Flow Controller Read (sccm)", "Sample Flow Controller Read (vccm)"] - ] - .resample(dt) - .mean() - ) - - # --- resample count for 'original_idx' --- - pdf["temporary_col"] = 1 - df_count = pdf[["temporary_col"]].resample(dt).count() - df_count.rename(columns={"temporary_col": "original_idx"}, inplace=True) - - # --- merge results --- - out = df_mean.join(df_count) - - # --- drop empty bins --- - out = out[out["original_idx"] != 0].drop(columns="original_idx") - - out = out.asfreq(dt).ffill(limit=1) - - # --- add date/hour columns --- - out["date"] = out.index.normalize() - out["hour"] = out.index.hour.astype("int64") - - return out - - -def add_concentrations(pdf: pd.DataFrame, dt: int = 1) -> pd.DataFrame: - """ - Add BC / scattering mass- and number-concentration columns. - - Parameters - ---------- - pdf : pandas.DataFrame - 1-second (or dt-second) summary partition with the columns generated - by your PbP + HK merge. - dt : int, default 1 - Length of each time bin in seconds. - - Returns - ------- - pandas.DataFrame - Same object with 12 new concentration columns. 
- """ - # avoid repeated division - flow_sccm = pdf["Sample Flow Controller Read (sccm)"] - flow_vccm = pdf["Sample Flow Controller Read (vccm)"] - sec_per_bin = dt / 60 # convert seconds to minutes - - # --- BC mass ------------------------------------------------------ - pdf["BC_massConc_std"] = pdf["BC mass"] * 1e-9 / (flow_sccm * sec_per_bin * 1e-6) - pdf["BC_massConc_vol"] = pdf["BC mass"] * 1e-9 / (flow_vccm * sec_per_bin * 1e-6) - - # --- BC number ---------------------------------------------------- - pdf["BC_numConc_std"] = pdf["BC numb"] / (flow_sccm * sec_per_bin) - pdf["BC_numConc_vol"] = pdf["BC numb"] / (flow_vccm * sec_per_bin) - - # --- BC within range --------------------------------------------- - pdf["BC_massConc_within_range_std"] = ( - pdf["BC mass within range"] * 1e-9 / (flow_sccm * sec_per_bin * 1e-6) - ) - pdf["BC_massConc_within_range_vol"] = ( - pdf["BC mass within range"] * 1e-9 / (flow_vccm * sec_per_bin * 1e-6) - ) - pdf["BC_numConc_within_range_std"] = pdf["BC numb within range"] / ( - flow_sccm * sec_per_bin - ) - pdf["BC_numConc_within_range_vol"] = pdf["BC numb within range"] / ( - flow_vccm * sec_per_bin - ) - - # --- scattering number ------------------------------------------- - pdf["S_numConc_std"] = pdf["Scatt numb"] / (flow_sccm * sec_per_bin) - pdf["S_numConc_vol"] = pdf["Scatt numb"] / (flow_vccm * sec_per_bin) - - pdf["S_numConc_within_range_std"] = pdf["Scatt numb within range"] / ( - flow_sccm * sec_per_bin - ) - pdf["S_numConc_within_range_vol"] = pdf["Scatt numb within range"] / ( - flow_vccm * sec_per_bin - ) - - return pdf - - -def make_bin_arrays( - *, - lims: np.ndarray | None = None, - ctrs: np.ndarray | None = None, - n: int = 50, - log: bool = True, -) -> Tuple[np.ndarray, np.ndarray]: - if lims is None and ctrs is None: - lims = ( - np.logspace(np.log10(0.3), np.log10(400), n) - if log - else np.linspace(0.3, 400, n) - ) - elif lims is None: - lims = np.append( - ctrs[0] - (ctrs[1] - ctrs[0]) / 2, - (ctrs[:-1] + ctrs[1:]) / 2, - ) - lims = np.append(lims, ctrs[-1] + (ctrs[-1] - ctrs[-2]) / 2) - ctrs = (lims[:-1] + lims[1:]) / 2 - return lims, ctrs - - -def dNdlogDp_to_dMdlogDp(N, Dp, rho=1800): - """This is from functions.py from Rob""" - """ - Given a normalized number size distribution [dN/dlogDp, cm^-3] defined over the diameters given by Dp [nm] - return the corresponding the mass size distribution [dM/dlogDp, ug/m3] - - Optional density input given in units of kg/m3 - """ - # M = N*((np.pi*(Dp*1e-9)**3)/6)*rho*1e18 - M = N * np.pi / 6 * rho * (Dp**3) * 1e-12 - return M - - -def bin_ctrs_to_lims(x_ctrs): - """This is from functions.py from Rob""" - """ - Take an array of bin centers and return an array 1 element larger of bin limits - """ - delta = np.diff(x_ctrs) / 2 - x_lims = np.append(x_ctrs[0] - delta[0], x_ctrs[:-1] + delta) - x_lims = np.append(x_lims, x_ctrs[-1] + delta[-1]) - return x_lims - - -def bin_lims_to_ctrs(x_lims): - """ - Take an array of bin limits and return an array 1 element smaller of bin centers - """ - x_ctrs = (x_lims[1:] + x_lims[:-1]) / 2 - return x_ctrs - - -def get_dlogp(Dp): - """This is from functions.py from Rob""" - """ - Given an array of diameters Dp calculate an array of the log of the difference dlogdp - """ - dlogdp = np.log10(bin_ctrs_to_lims(Dp)[1:]) - np.log10(bin_ctrs_to_lims(Dp)[:-1]) - return dlogdp - - -def counts2numConc(numb, flow, t=1): - """This is from functions.py from Rob""" - """ - Calculate number concentration from counts (cnts), flow rate in [ccm] (Q) and sampling time in 
[secs] (t) - Return numConc with units of [cm^-3] - """ - conc = numb.divide(flow * (t / 60), axis=0) - return conc - - -def calculate_histogram(series, bin_lims=np.logspace(np.log10(0.3), np.log10(400), 50)): - series = series.dropna() - if series.empty: - counts = np.full(len(bin_lims) - 1, np.nan, dtype=float) - else: - counts, _ = np.histogram(series, bins=bin_lims) - return counts - - -def process_hist_and_dist_partition( - df, - col, - flag_col, - flag_value, - bin_lims, - bin_ctrs, - dt_str, - calculate_conc=True, - flow=None, - rho_eff=1800, - BC_type="", - t=1, -): - # Filter the dataframe per partition - if flag_col and flag_value is not None: - df_filtered = df.loc[df[flag_col] == flag_value] - else: - df_filtered = df - - # Adjust flag_col - if flag_col is not None and "cnts" in flag_col: - flag_col = flag_col[5:] - - df_resampled = ( - df_filtered[[col]] - .resample(dt_str) - .apply(lambda x: calculate_histogram(x[col], bin_lims=bin_lims)) - ) - flow_dt = df["Sample Flow Controller Read (vccm)"].resample(dt_str).mean() - - # df_resampled = df_resampled.to_frame(name="result") - if isinstance(df_resampled, pd.Series): - df_resampled = df_resampled.to_frame(name="result") - else: # already a DataFrame - df_resampled.columns = ["result"] - - original_idx_df = df_filtered[["temporary_col"]].resample(dt_str).count() - original_idx_df.columns = ["original_idx"] # ensure proper naming - - df_resampled = df_resampled.join(original_idx_df, how="left") - if "original_idx" in df_resampled.columns: - df_resampled = df_resampled[df_resampled["original_idx"] != 0] - df_resampled = df_resampled.drop("original_idx", axis=1) - - # Initialize DataFrame based on whether the result is empty or not - list_col = "result" - max_list_length = len(bin_ctrs) - - if df_resampled.empty: - columns = [f"{list_col}_{i}" for i in range(max_list_length)] - ddf_hist = pd.DataFrame(np.nan, columns=columns, index=flow.index) - else: - ddf_hist = df_resampled[list_col].apply(pd.Series) - ddf_hist.index = df_resampled.index - columns = [f"{list_col}_{i}" for i in range(max_list_length)] - ddf_hist.columns = columns - - if calculate_conc: - # Join with the sample flow controller data - inc_hist_flow = ddf_hist.join(flow_dt) - - if rho_eff is not None and BC_type is not None: - # Calculate diameters and densities - density, Dmev = BC_mass_to_diam(bin_ctrs, rho_eff=rho_eff, BC_type=BC_type) - - # Calculate number concentration - dNdlogDmev = counts2numConc( - inc_hist_flow.iloc[:, :-1], inc_hist_flow.iloc[:, -1], t=t - ) / get_dlogp(Dmev) - if flag_col is None: - dNdlogDmev.columns = [f"dNdlogDmev_all_{i:.2f}" for i in Dmev] - else: - dNdlogDmev.columns = [f"dNdlogDmev_{flag_col}_{i:.2f}" for i in Dmev] - - # Calculate mass concentration - dMdlogDmev = dNdlogDp_to_dMdlogDp(dNdlogDmev, Dmev, rho=density) - if flag_col is None: - dMdlogDmev.columns = [f"dMdlogDmev_all_{i:.2f}" for i in Dmev] - else: - dMdlogDmev.columns = [f"dMdlogDmev_{flag_col}_{i:.2f}" for i in Dmev] - - else: - - # Calculate number concentration - dNdlogDmev = counts2numConc( - inc_hist_flow.iloc[:, :-1], inc_hist_flow.iloc[:, -1], t=t - ) / get_dlogp(bin_ctrs) - if flag_col is None: - dNdlogDmev.columns = [f"dNdlogDsc_all_{i:.2f}" for i in bin_ctrs] - else: - dNdlogDmev.columns = [f"dNdlogDsc_{flag_col}_{i:.2f}" for i in bin_ctrs] - - dMdlogDmev = None - # return dNdlogDmev, dMdlogDmev - - # else: - # return ddf_hist, None - - if calculate_conc: - # Concatenate results into a single dataframe - if dMdlogDmev is not None: - result_df = 
pd.concat([dNdlogDmev, dMdlogDmev], axis=1) - else: - result_df = dNdlogDmev - else: - result_df = ddf_hist - return result_df - - def main(): args = parse_args() output_dir = Path(args.output) @@ -702,7 +118,7 @@ def main(): meta_cal["hour"] = pd.Series([], dtype="int8") ddf_cal = ddf_raw.map_partitions( - apply_calibration, config=cfg_future, meta=meta_cal + calibrate_particle_data, config=cfg_future, meta=meta_cal ) ddf_cal["time_lag"] = ddf_cal["Incand Peak Time"] - ddf_cal["Scatter Peak Time"] ddf_cal["ratio_inc_scatt"] = np.log10(ddf_cal["Incand relPeak"]) / np.log10( @@ -770,18 +186,18 @@ def main(): tolerance=pd.Timedelta(tol), )''' - flow_1s = ddf_hk_dt["Sample Flow Controller Read (vccm)"].compute() + flow_dt = ddf_hk_dt["Sample Flow Controller Read (vccm)"].compute() - def floor_index_to_sec(pdf: pd.DataFrame) -> pd.DataFrame: + def floor_index_to_dt(pdf: pd.DataFrame) -> pd.DataFrame: """ Replace the existing DatetimeIndex with its lower-second value, without changing the index’s name or creating a new column. """ - pdf.index = pdf.index.floor("s") # keeps the original .name + pdf.index = pdf.index.floor(f"{args.dt}s") # keeps the original .name return pdf # meta stays identical because only the *values* of the index change - ddf_cal = ddf_cal.map_partitions(floor_index_to_sec, meta=ddf_cal._meta) + ddf_cal = ddf_cal.map_partitions(floor_index_to_dt, meta=ddf_cal._meta) meta_pbp_with_flow = ddf_cal._meta.copy() meta_pbp_with_flow["Sample Flow Controller Read (vccm)"] = ( @@ -794,7 +210,7 @@ def main(): meta=meta_pbp_with_flow, )""" ddf_pbp_with_flow = ddf_cal.map_partitions( - lambda part: part.join(flow_1s, how="left"), + lambda part: part.join(flow_dt, how="left"), meta=meta_pbp_with_flow, ) @@ -858,6 +274,7 @@ def main(): do_hist = args.hist if do_hist: + # ========= 3. RUN MASS HISTOGRAMS ========= # --- Mass histogram configs hist_configs = [ {"name": "BC_all", "flag_col": None, "flag_value": None}, @@ -883,57 +300,8 @@ def main(): {"name": "unclassified", "flag_col": "cnts_unclassified", "flag_value": 1}, ] - # ========= 2. WRAPPER FUNCTION ========= - - def make_hist_meta( - *, - bin_ctrs: np.ndarray, - kind: str, # "mass", "scatt", "timelag" - flag_col: str | None, # e.g. "cnts_thin" or None - rho_eff=None, - BC_type=None, - dtype=np.float64, - ): - """ - Return an empty DataFrame whose column names AND dtypes are exactly - what `process_hist_and_dist_partition()` will emit, without ever - calling that function. 
- """ - # ---- pick prefix exactly like the function ---------------------- - prefix = ( - "all" - if flag_col is None - else flag_col[5:] if flag_col.startswith("cnts_") else flag_col - ) - - # ---- translate mass bins if needed ------------------------------ - if kind == "mass" and rho_eff is not None and BC_type is not None: - # same conversion used inside the partition function - _, labels = BC_mass_to_diam(bin_ctrs, rho_eff=rho_eff, BC_type=BC_type) - n_prefix, m_prefix = "dNdlogDmev", "dMdlogDmev" - elif kind == "scatt": - labels = bin_ctrs - n_prefix, m_prefix = "dNdlogDsc", None - elif kind == "timelag": - labels = bin_ctrs - n_prefix, m_prefix = f"{prefix}_timelag", None - else: - raise ValueError(f"Unknown histogram kind '{kind}'") - - # ---- build column list ------------------------------------------ - cols = [f"{n_prefix}_{prefix}_{v:.2f}" for v in labels] - if kind == "mass" and m_prefix: - cols += [f"{m_prefix}_{prefix}_{v:.2f}" for v in labels] - - empty_df = pd.DataFrame( - {c: pd.Series(dtype=dtype) for c in cols}, - index=pd.DatetimeIndex([], name="calculated_time"), - ) - return make_meta(empty_df) - - # ========= 3. RUN MASS HISTOGRAMS ========= - results = [] + for cfg_hist in hist_configs: meta_hist = make_hist_meta( bin_ctrs=inc_mass_bin_ctrs, @@ -949,7 +317,7 @@ def main(): flag_value=cfg_hist["flag_value"], bin_lims=inc_mass_bin_lims, bin_ctrs=inc_mass_bin_ctrs, - dt_str="1s", + dt=dt_s, calculate_conc=True, flow=None, rho_eff=rho_eff, @@ -982,7 +350,7 @@ def main(): flag_value=None, bin_lims=scatt_bin_lims, bin_ctrs=scatt_bin_ctrs, - dt_str="1s", + dt=dt_s, calculate_conc=True, flow=None, rho_eff=None, @@ -991,8 +359,9 @@ def main(): meta=meta_hist, # <-- single line does the trick ) results.append(ddf_scatt) + # ========= 5. 
RUN TIMELAG HISTOGRAMS ========= - """ + # --- Timelag histogram parameters timelag_bins_lims = np.linspace( cfg["time_delay_histo"]["min"], @@ -1002,41 +371,58 @@ def main(): timelag_bin_ctrs = bin_lims_to_ctrs(timelag_bins_lims) mass_bins = ddf_pbp_with_flow["BC mass bin"].unique().compute() - for idx, mass_bin in enumerate(mass_bins): + for idx, mass_bin in enumerate(mass_bins[:1]): ddf_bin = ddf_pbp_with_flow[ddf_pbp_with_flow["BC mass bin"] == mass_bin] - # meta_cols = [f"dNdlogDmev_{inc_mass_bin_ctrs[idx]:.2f}_timelag_{i:.1f}" for i in timelag_bin_ctrs] - bin_center = ( + name_prefix = f"dNdlogDmev_{inc_mass_bin_ctrs[idx]:.2f}_timelag" + + # meta_cols = [f"{name_prefix}_{i:.2f}" for i in timelag_bin_ctrs] + # meta_float = pd.DataFrame({c: pd.Series([], dtype="float64") for c in meta_cols}) + + meta_hist = make_hist_meta( + bin_ctrs=timelag_bin_ctrs, + kind="timelag", + flag_col="cnts_particles_for_tl_dist", + name_prefix=name_prefix, + rho_eff=None, + BC_type=None, + ) + + # meta_cols = [f"dNdlogDmev_{inc_mass_bin_ctrs[idx]:.2f}_particles_for_tl_dist_{i:.2f}" for i in timelag_bin_ctrs] + """bin_center = ( inc_mass_bin_ctrs[int(mass_bin)] if mass_bin < len(inc_mass_bin_ctrs) else mass_bin ) meta_cols = [ - f"dNdlogDmev_{bin_center:.2f}_timelag_{i:.1f}" for i in timelag_bin_ctrs - ] + f"dNdlogDmev_{bin_center:.2f}_particles_for_tl_dist_{i:.2f}" for i in timelag_bin_ctrs + ]""" tl_ddf = ddf_bin.map_partitions( process_hist_and_dist_partition, - col="time_lag_new", + col="time_lag", flag_col="cnts_particles_for_tl_dist", flag_value=1, bin_lims=timelag_bins_lims, bin_ctrs=timelag_bin_ctrs, - dt_str="1s", + dt=dt_s, calculate_conc=True, - # flow=ddf_pbp_hk["Sample Flow Controller Read (vccm)"], + flow=None, # ddf_pbp_hk["Sample Flow Controller Read (vccm)"], rho_eff=None, BC_type=None, t=1, - meta=pd.DataFrame(columns=meta_cols), + name_prefix=name_prefix, # <-- force naming + meta=meta_hist, # pd.DataFrame(columns=meta_cols) ) - tl_ddf.columns = meta_cols + # tl_ddf.columns = meta_cols results.append(tl_ddf) - """ + # ========= 6. MERGE ALL HISTOGRAMS ========= - merged_ddf = reduce(lambda left, right: left.join(right), results) + # merged_ddf = reduce(lambda left, right: left.join(right), results) + merged_ddf = dd.concat(results, axis=1, interleave_partitions=True) + # merged_ddf = merged_ddf.rename_axis(index="time") # Add normalized date column # merged_ddf = merged_ddf.assign(date=merged_ddf.index.to_series().dt.normalize()) diff --git a/src/sp2xr/calibration.py b/src/sp2xr/calibration.py index 8bc9718..66539cc 100644 --- a/src/sp2xr/calibration.py +++ b/src/sp2xr/calibration.py @@ -2,6 +2,14 @@ import numpy as np from numpy.polynomial import Polynomial +def apply_calibration(partition, config): + """Apply calibration to a pandas partition.""" + pdf = calibrate_particle_data(partition, config) + # pdf["date"] = partition.index.date.astype("datetime64[ns]") + # pdf["hour"] = partition.index.hour.astype("int8") + return pdf + + def polynomial(x, *coefficients): """ Function to generate a generic polynomial curve. 
The number of input coefficicnets define the degree of the polynomial curve @@ -126,7 +134,8 @@ def calibrate_particle_data(df, config): curve_type=scatt_conf.get("curve_type"), params=scatt_conf.get("parameters", []), ) - + df["date"] = df.index.date.astype("datetime64[ns]") + df["hour"] = df.index.hour.astype("int8") return df diff --git a/src/sp2xr/distribution.py b/src/sp2xr/distribution.py index f648ff2..f1ef593 100644 --- a/src/sp2xr/distribution.py +++ b/src/sp2xr/distribution.py @@ -1,6 +1,9 @@ +from __future__ import annotations import numpy as np import pandas as pd from typing import Tuple +from typing import Optional +from dask.dataframe.utils import make_meta from .calibration import BC_mass_to_diam @@ -87,20 +90,21 @@ def calculate_histogram(series, bin_lims=np.logspace(np.log10(0.3), np.log10(400 return counts -def process_hist_and_dist_partition( +"""def process_hist_and_dist_partition( df, col, flag_col, flag_value, bin_lims, bin_ctrs, - dt_str, + dt, calculate_conc=True, flow=None, rho_eff=1800, BC_type="", t=1, ): + dt_str = f"{dt}s" # Filter the dataframe per partition if flag_col and flag_value is not None: df_filtered = df.loc[df[flag_col] == flag_value] @@ -195,3 +199,169 @@ def process_hist_and_dist_partition( else: result_df = ddf_hist return result_df +""" + + +def process_hist_and_dist_partition( + df: pd.DataFrame, + col: str, + flag_col: Optional[str], + flag_value: Optional[int], + bin_lims: np.ndarray, + bin_ctrs: np.ndarray, + dt, + calculate_conc: bool = True, + flow=None, # kept for API compatibility; not used + rho_eff: Optional[float] = 1800, + BC_type: Optional[str] = "", + t: float = 1.0, + name_prefix: Optional[str] = None, # <-- NEW: force exact output name prefix +): + # normalize dt -> "Xs" + dt_str = f"{dt}s" if isinstance(dt, (int, float)) else str(dt) + + # filter by flag (when provided) + if flag_col and flag_value is not None: + df_filtered = df.loc[df[flag_col] == flag_value] + else: + df_filtered = df + + # adjust flag_col "cnts_*" -> "*" + if flag_col is not None and flag_col.startswith("cnts_"): + flag_core = flag_col[5:] + else: + flag_core = flag_col + + # histogram of 'col' per time bin + df_resampled = ( + df_filtered[[col]] + .resample(dt_str) + .apply(lambda x: calculate_histogram(x[col], bin_lims=bin_lims)) + ) + + # flow per time bin (from this partition’s index) + flow_dt = df["Sample Flow Controller Read (vccm)"].resample(dt_str).mean() + + # ensure df_resampled is a single-column DataFrame named "result" + if isinstance(df_resampled, pd.Series): + df_resampled = df_resampled.to_frame(name="result") + else: + df_resampled.columns = ["result"] + + # drop time bins with zero original rows (matches your prior logic) + original_idx_df = df_filtered[["temporary_col"]].resample(dt_str).count() + original_idx_df.columns = ["original_idx"] + df_resampled = df_resampled.join(original_idx_df, how="left") + if "original_idx" in df_resampled.columns: + df_resampled = df_resampled[df_resampled["original_idx"] != 0] + df_resampled = df_resampled.drop(columns=["original_idx"]) + + # expand list-of-counts -> wide matrix (one col per bin) + max_list_length = len(bin_ctrs) + if df_resampled.empty: + # IMPORTANT: use the resampled time index so schema matches later joins + idx = flow_dt.index + columns = [f"result_{i}" for i in range(max_list_length)] + ddf_hist = pd.DataFrame(np.nan, columns=columns, index=idx) + else: + ddf_hist = df_resampled["result"].apply(pd.Series) + ddf_hist.index = df_resampled.index + ddf_hist.columns = [f"result_{i}" for 
i in range(max_list_length)] + + if not calculate_conc: + return ddf_hist + + # join flow + inc_hist_flow = ddf_hist.join(flow_dt) + + # compute number concentration (per-log-bin) + # last column is flow; left part are counts per bin + dNd = counts2numConc(inc_hist_flow.iloc[:, :-1], inc_hist_flow.iloc[:, -1], t=t) + + # choose the abscissa used for dlog (Dmev for BC, or bin_ctrs otherwise) + if rho_eff is not None and BC_type is not None: + density, Dmev = BC_mass_to_diam(bin_ctrs, rho_eff=rho_eff, BC_type=BC_type) + dNdlog = dNd / get_dlogp(Dmev) + # names + if name_prefix is not None: + dNdlog.columns = [f"{name_prefix}_{b:.2f}" for b in Dmev] + else: + base = "dNdlogDmev" + tag = "all" if flag_core is None else flag_core + dNdlog.columns = [f"{base}_{tag}_{b:.2f}" for b in Dmev] + + # mass concentration + dMdlog = dNdlogDp_to_dMdlogDp(dNdlog, Dmev, rho=density) + if name_prefix is not None: + # mirror the naming with a mass prefix + mass_prefix = name_prefix.replace("dNdlog", "dMdlog") + dMdlog.columns = [f"{mass_prefix}_{b:.2f}" for b in Dmev] + else: + base = "dMdlogDmev" + tag = "all" if flag_core is None else flag_core + dMdlog.columns = [f"{base}_{tag}_{b:.2f}" for b in Dmev] + + return pd.concat([dNdlog, dMdlog], axis=1) + + else: + # e.g., timelag/scattering case (no BC diameter conversion) + dNdlog = dNd / get_dlogp(bin_ctrs) + if name_prefix is not None: + dNdlog.columns = [f"{name_prefix}_{b:.2f}" for b in bin_ctrs] + else: + base = "dNdlogDsc" + tag = "all" if flag_core is None else flag_core + dNdlog.columns = [f"{base}_{tag}_{b:.2f}" for b in bin_ctrs] + return dNdlog + + +def make_hist_meta( + *, + bin_ctrs: np.ndarray, + kind: str, # "mass", "scatt", "timelag" + flag_col: str | None, # e.g. "cnts_thin" or None + name_prefix=None, + rho_eff=None, + BC_type=None, + dtype=np.float64, +): + """ + Return an empty DataFrame whose column names AND dtypes are exactly + what `process_hist_and_dist_partition()` will emit, without ever + calling that function. + """ + # ---- pick prefix exactly like the function ---------------------- + prefix = ( + "all_" + if flag_col is None + else f"{flag_col[5:]}_" if flag_col.startswith("cnts_") else f"{flag_col}_" + ) + + # ---- translate mass bins if needed ------------------------------ + if kind == "mass" and rho_eff is not None and BC_type is not None: + # same conversion used inside the partition function + _, labels = BC_mass_to_diam(bin_ctrs, rho_eff=rho_eff, BC_type=BC_type) + n_prefix, m_prefix = "dNdlogDmev", "dMdlogDmev" + elif kind == "scatt": + labels = bin_ctrs + n_prefix, m_prefix = "dNdlogDsc", None + elif kind == "timelag": + labels = bin_ctrs + if name_prefix is None: + n_prefix, m_prefix = f"{prefix}", None + else: + prefix = "" + n_prefix, m_prefix = f"{name_prefix}", None + else: + raise ValueError(f"Unknown histogram kind '{kind}'") + + # ---- build column list ------------------------------------------ + cols = [f"{n_prefix}_{prefix}{v:.2f}" for v in labels] + if kind == "mass" and m_prefix: + cols += [f"{m_prefix}_{prefix}{v:.2f}" for v in labels] + + empty_df = pd.DataFrame( + {c: pd.Series(dtype=dtype) for c in cols}, + index=pd.DatetimeIndex([], name="calculated_time"), + ) + return make_meta(empty_df)
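
Notes
-----

The sketches below are illustrative Python, separate from the diff; toy values and stand-in helper names are marked as such.

The patch moves the `date`/`hour` partition keys out of the per-script `apply_calibration` wrappers and into `calibrate_particle_data` (and, in `sp2xr_apply_calibration.py`, onto the Dask frame directly), so every writer uses the same `partition_on=["date", "hour"]` layout. A minimal sketch of that layout, assuming a DatetimeIndex (`add_keys` is a stand-in for the per-partition logic; `normalize()` plays the role of the `index.date` round-trip):

import pandas as pd
import dask.dataframe as dd

idx = pd.date_range("2025-08-12 15:00", periods=4, freq="30min")
ddf = dd.from_pandas(pd.DataFrame({"BC mass": [1.0, 2.0, 3.0, 4.0]}, index=idx),
                     npartitions=2)

def add_keys(part: pd.DataFrame) -> pd.DataFrame:
    # Derive hive-partition keys from each partition's DatetimeIndex.
    part = part.copy()
    part["date"] = part.index.normalize()
    part["hour"] = part.index.hour.astype("int64")
    return part

ddf = ddf.map_partitions(add_keys)
# Writes calibrated/date=2025-08-12/hour=15/... directories:
# ddf.to_parquet("calibrated/", partition_on=["date", "hour"])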
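`define_flags` and `add_thin_thick_flags` now live in `sp2xr.flag_single_particle_data`. The time-lag part of the classification assigns each particle to exactly one of four classes (negative, thin, thick, extreme positive); a self-contained check of that invariant, with hypothetical thresholds standing in for `cfg["time_delay_histo"]` and `cfg["thick_coating"]["threshold"]`:

import pandas as pd

TL_MIN, TL_THICK, TL_MAX = -10.0, 2.0, 50.0  # hypothetical config values

tl = pd.Series([-15.0, -3.0, 0.5, 7.0, 80.0], name="time_lag")
flags = pd.DataFrame({
    "flag_negative_timelag":         tl < TL_MIN,
    "flag_valid_timelag_thin":       (tl >= TL_MIN) & (tl < TL_THICK),
    "flag_valid_timelag_thick":      (tl >= TL_THICK) & (tl < TL_MAX),
    "flag_extreme_positive_timelag": tl >= TL_MAX,
})
# The four classes are mutually exclusive and exhaustive:
assert (flags.sum(axis=1) == 1).all()

The `cnts_*` indicator columns are AND-combinations of these with the scattering/incandescence validity and saturation flags, which is why `cnts_particles_for_tl_dist` reduces to (thin and not low inc/scatt ratio) or (thick and not extreme positive).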
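The heart of `process_hist_and_dist_partition` is: resample to dt-second bins, histogram one column per bin, then expand the per-bin count vectors into a wide `result_i` matrix. A pandas-only sketch of that step; iterating the resampler sidesteps the Series-vs-DataFrame return shape the real function has to special-case:

import numpy as np
import pandas as pd

def calculate_histogram(series, bin_lims):
    # Same contract as sp2xr.distribution.calculate_histogram:
    # NaN vector for empty bins, raw counts otherwise.
    series = series.dropna()
    if series.empty:
        return np.full(len(bin_lims) - 1, np.nan)
    return np.histogram(series, bins=bin_lims)[0]

idx = pd.to_datetime(["2025-08-12 15:00:00.2",
                      "2025-08-12 15:00:00.7",
                      "2025-08-12 15:00:01.4"])
pdf = pd.DataFrame({"BC mass": [0.8, 3.5, 12.0]}, index=idx)
bin_lims = np.logspace(np.log10(0.3), np.log10(400), 5)

rows = {ts: calculate_histogram(s, bin_lims)
        for ts, s in pdf["BC mass"].resample("1s")}
wide = pd.DataFrame.from_dict(rows, orient="index")
wide.columns = [f"result_{i}" for i in range(len(bin_lims) - 1)]
# one row per second, one column per size bin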
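The conversion chain `counts2numConc` → divide by `get_dlogp` → `dNdlogDp_to_dMdlogDp` carries the unit conventions flow [ccm], t [s], Dp [nm], rho [kg m^-3]. A worked numeric example with toy bins and flow:

import numpy as np
import pandas as pd

Dp = np.array([100.0, 200.0, 400.0])                  # bin centers, nm
lims = np.concatenate(([Dp[0] - 50.0],                # as in bin_ctrs_to_lims
                       (Dp[:-1] + Dp[1:]) / 2,
                       [Dp[-1] + 100.0]))
dlogdp = np.diff(np.log10(lims))                      # as in get_dlogp

t_index = pd.DatetimeIndex(["2025-08-12 15:00:00"])
counts = pd.DataFrame([[12.0, 30.0, 5.0]], columns=Dp, index=t_index)
flow = pd.Series([120.0], index=t_index)              # vccm
t = 1.0                                               # seconds per bin

numConc = counts.divide(flow * (t / 60.0), axis=0)    # counts2numConc -> cm^-3
dNdlogDp = numConc / dlogdp                           # per log-decade

rho = 1800.0                                          # kg m^-3
dMdlogDp = dNdlogDp * np.pi / 6 * rho * Dp**3 * 1e-12  # dNdlogDp_to_dMdlogDp -> ug m^-3

At 120 vccm and t = 1 s each bin samples 2 cm^3 of air, so 12 counts become 6 cm^-3 before the dlogDp normalisation.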
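Every `map_partitions` call in the histogram stage passes an explicit `meta`, now produced by `make_hist_meta`; without it Dask infers the output schema by running the partition function on a dummy frame, which fails when the column set depends on the data. The general shape of the pattern, with a toy emitter standing in for `process_hist_and_dist_partition`:

import numpy as np
import pandas as pd
import dask.dataframe as dd
from dask.dataframe.utils import make_meta

ctrs = np.array([0.5, 1.5, 2.5])
cols = [f"dNdlogDmev_thin_{c:.2f}" for c in ctrs]  # the f"{prefix}_{ctr:.2f}" scheme

def emit_hist(part: pd.DataFrame) -> pd.DataFrame:
    # Stand-in for process_hist_and_dist_partition: fixed float columns.
    return pd.DataFrame(0.0, index=part.index, columns=cols)

meta = make_meta(pd.DataFrame({c: pd.Series(dtype="float64") for c in cols},
                              index=pd.DatetimeIndex([], name="calculated_time")))

idx = pd.date_range("2025-08-12", periods=4, freq="s")
ddf = dd.from_pandas(pd.DataFrame({"v": 1.0}, index=idx), npartitions=2)
hist = ddf.map_partitions(emit_hist, meta=meta)

The new `name_prefix` argument exists for the per-mass-bin time-lag loop, where the column stem (`dNdlogDmev_{mass_bin_ctr:.2f}_timelag`) cannot be derived from `flag_col` alone; `make_hist_meta` mirrors it so the declared meta and the emitted columns stay in lockstep.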
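Finally, the merge step switches from a chained `reduce(lambda left, right: left.join(right), results)` to a single `dd.concat(results, axis=1, interleave_partitions=True)`, aligning all result frames on the shared time index in one graph instead of N-1 sequential joins. A toy illustration with mismatched partitioning:

import pandas as pd
import dask.dataframe as dd

idx = pd.date_range("2025-08-12", periods=6, freq="s")
a = dd.from_pandas(pd.DataFrame({"x": range(6)}, index=idx), npartitions=2)
b = dd.from_pandas(pd.DataFrame({"y": range(6)}, index=idx), npartitions=3)

# axis=1 concat aligns columns on the index; interleave_partitions lets
# Dask reconcile the two different division schemes.
merged = dd.concat([a, b], axis=1, interleave_partitions=True)
assert list(merged.columns) == ["x", "y"]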