diff --git a/src/sp2xr/concentrations.py b/src/sp2xr/concentrations.py
new file mode 100644
index 0000000..cd81249
--- /dev/null
+++ b/src/sp2xr/concentrations.py
@@ -0,0 +1,59 @@
+import pandas as pd
+
+
+def add_concentrations(pdf: pd.DataFrame, dt: int = 1) -> pd.DataFrame:
+    """
+    Add BC / scattering mass- and number-concentration columns.
+
+    Parameters
+    ----------
+    pdf : pandas.DataFrame
+        1-second (or dt-second) summary partition with the columns generated
+        by the PbP + HK merge.
+    dt : int, default 1
+        Length of each time bin in seconds.
+
+    Returns
+    -------
+    pandas.DataFrame
+        Same object with 12 new concentration columns.
+    """
+    # cache the flow series to avoid repeated column lookups
+    flow_sccm = pdf["Sample Flow Controller Read (sccm)"]
+    flow_vccm = pdf["Sample Flow Controller Read (vccm)"]
+    min_per_bin = dt / 60  # bin length in minutes (flows are in cm^3 per minute)
+
+    # --- BC mass ------------------------------------------------------
+    # fg -> ug (1e-9) over cm^3 -> m^3 (1e-6) gives ug m^-3
+    pdf["BC_massConc_std"] = pdf["BC mass"] * 1e-9 / (flow_sccm * min_per_bin * 1e-6)
+    pdf["BC_massConc_vol"] = pdf["BC mass"] * 1e-9 / (flow_vccm * min_per_bin * 1e-6)
+
+    # --- BC number ----------------------------------------------------
+    pdf["BC_numConc_std"] = pdf["BC numb"] / (flow_sccm * min_per_bin)
+    pdf["BC_numConc_vol"] = pdf["BC numb"] / (flow_vccm * min_per_bin)
+
+    # --- BC within range ----------------------------------------------
+    pdf["BC_massConc_within_range_std"] = (
+        pdf["BC mass within range"] * 1e-9 / (flow_sccm * min_per_bin * 1e-6)
+    )
+    pdf["BC_massConc_within_range_vol"] = (
+        pdf["BC mass within range"] * 1e-9 / (flow_vccm * min_per_bin * 1e-6)
+    )
+    pdf["BC_numConc_within_range_std"] = pdf["BC numb within range"] / (
+        flow_sccm * min_per_bin
+    )
+    pdf["BC_numConc_within_range_vol"] = pdf["BC numb within range"] / (
+        flow_vccm * min_per_bin
+    )
+
+    # --- scattering number --------------------------------------------
+    pdf["S_numConc_std"] = pdf["Scatt numb"] / (flow_sccm * min_per_bin)
+    pdf["S_numConc_vol"] = pdf["Scatt numb"] / (flow_vccm * min_per_bin)
+
+    pdf["S_numConc_within_range_std"] = pdf["Scatt numb within range"] / (
+        flow_sccm * min_per_bin
+    )
+    pdf["S_numConc_within_range_vol"] = pdf["Scatt numb within range"] / (
+        flow_vccm * min_per_bin
+    )
+
+    return pdf
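
Usage sketch for `add_concentrations` (not part of the patch): the frame below is synthetic — only the column names come from the module above, every value is made up.

```python
import pandas as pd
from sp2xr.concentrations import add_concentrations

# Synthetic 1 s summary partition; values are illustrative only.
idx = pd.date_range("2024-01-01 00:00:00", periods=3, freq="1s")
pdf = pd.DataFrame(
    {
        "Sample Flow Controller Read (sccm)": [120.0, 120.0, 120.0],
        "Sample Flow Controller Read (vccm)": [118.0, 119.0, 118.5],
        "BC mass": [50.0, 42.0, 61.0],               # fg per bin
        "BC mass within range": [48.0, 40.0, 59.0],  # fg per bin
        "BC numb": [30, 25, 37],
        "BC numb within range": [28, 24, 35],
        "Scatt numb": [110, 95, 130],
        "Scatt numb within range": [100, 90, 120],
    },
    index=idx,
)

pdf = add_concentrations(pdf, dt=1)
# 50 fg in 120 sccm * (1/60) min = 2 cm^3 of air:
# 50e-9 ug / 2e-6 m^3 -> 0.025 ug m^-3
print(pdf["BC_massConc_std"].iloc[0])  # ~0.025
```
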
diff --git a/src/sp2xr/distribution.py b/src/sp2xr/distribution.py
new file mode 100644
index 0000000..f648ff2
--- /dev/null
+++ b/src/sp2xr/distribution.py
@@ -0,0 +1,197 @@
+import numpy as np
+import pandas as pd
+from typing import Tuple
+from .calibration import BC_mass_to_diam
+
+
+def make_bin_arrays(
+    *,
+    lims: np.ndarray | None = None,
+    ctrs: np.ndarray | None = None,
+    n: int = 50,
+    log: bool = True,
+) -> Tuple[np.ndarray, np.ndarray]:
+    """
+    Build consistent bin-limit and bin-center arrays from whichever of the
+    two is given; with neither, default to `n` limits spanning 0.3-400
+    (log-spaced when `log` is True).
+    """
+    if lims is None and ctrs is None:
+        lims = (
+            np.logspace(np.log10(0.3), np.log10(400), n)
+            if log
+            else np.linspace(0.3, 400, n)
+        )
+    elif lims is None:
+        # derive limits as midpoints between centers, extrapolating the edges
+        lims = np.append(
+            ctrs[0] - (ctrs[1] - ctrs[0]) / 2,
+            (ctrs[:-1] + ctrs[1:]) / 2,
+        )
+        lims = np.append(lims, ctrs[-1] + (ctrs[-1] - ctrs[-2]) / 2)
+    if ctrs is None:
+        # only derive centers when the caller did not supply them
+        ctrs = (lims[:-1] + lims[1:]) / 2
+    return lims, ctrs
+
+
+def dNdlogDp_to_dMdlogDp(N, Dp, rho=1800):
+    """
+    Given a normalized number size distribution (dN/dlogDp, cm^-3) defined
+    over the diameters Dp (nm), return the corresponding mass size
+    distribution (dM/dlogDp, ug/m^3).
+
+    Optional density input given in units of kg/m^3.
+    Adapted from functions.py by Rob.
+    """
+    # nm^3 -> m^3 (1e-27), cm^-3 -> m^-3 (1e6) and kg -> ug (1e9)
+    # combine into the single factor 1e-12
+    M = N * np.pi / 6 * rho * (Dp**3) * 1e-12
+    return M
+
+
+def bin_ctrs_to_lims(x_ctrs):
+    """
+    Take an array of bin centers and return an array one element larger of
+    bin limits. Adapted from functions.py by Rob.
+    """
+    delta = np.diff(x_ctrs) / 2
+    x_lims = np.append(x_ctrs[0] - delta[0], x_ctrs[:-1] + delta)
+    x_lims = np.append(x_lims, x_ctrs[-1] + delta[-1])
+    return x_lims
+
+
+def bin_lims_to_ctrs(x_lims):
+    """
+    Take an array of bin limits and return an array one element smaller of
+    bin centers.
+    """
+    x_ctrs = (x_lims[1:] + x_lims[:-1]) / 2
+    return x_ctrs
+
+
+def get_dlogp(Dp):
+    """
+    Given an array of diameters Dp, return the width of each bin in log10
+    space (dlogDp). Adapted from functions.py by Rob.
+    """
+    dlogdp = np.log10(bin_ctrs_to_lims(Dp)[1:]) - np.log10(bin_ctrs_to_lims(Dp)[:-1])
+    return dlogdp
+
+
+def counts2numConc(numb, flow, t=1):
+    """
+    Calculate number concentration from counts (numb), flow rate in ccm
+    (flow) and sampling time in seconds (t).
+    Return numConc with units of cm^-3. Adapted from functions.py by Rob.
+    """
+    conc = numb.divide(flow * (t / 60), axis=0)
+    return conc
+
+
+def calculate_histogram(series, bin_lims=np.logspace(np.log10(0.3), np.log10(400), 50)):
+    series = series.dropna()
+    if series.empty:
+        counts = np.full(len(bin_lims) - 1, np.nan, dtype=float)
+    else:
+        counts, _ = np.histogram(series, bins=bin_lims)
+    return counts
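+
+
+# Worked example (illustrative numbers, not instrument output): for
+# log-spaced centers the helpers above round-trip as expected --
+#   ctrs = np.array([10.0, 100.0, 1000.0])
+#   bin_ctrs_to_lims(ctrs)          # [-35, 55, 550, 1450]
+#   get_dlogp(ctrs)                 # log10 width of each of the 3 bins
+# and a single bin with dN/dlogDp = 10 cm^-3 at Dp = 100 nm gives
+#   dNdlogDp_to_dMdlogDp(10, 100)   # ~9.4e-3 ug m^-3 at rho = 1800 kg m^-3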
+def process_hist_and_dist_partition(
+    df,
+    col,
+    flag_col,
+    flag_value,
+    bin_lims,
+    bin_ctrs,
+    dt_str,
+    calculate_conc=True,
+    flow=None,
+    rho_eff=1800,
+    BC_type="",
+    t=1,
+):
+    # Keep only the particles matching the requested flag value
+    if flag_col and flag_value is not None:
+        df_filtered = df.loc[df[flag_col] == flag_value]
+    else:
+        df_filtered = df
+
+    # Strip the "cnts_" prefix so output columns carry the bare flag name
+    if flag_col is not None and "cnts" in flag_col:
+        flag_col = flag_col[5:]
+
+    df_resampled = (
+        df_filtered[[col]]
+        .resample(dt_str)
+        .apply(lambda x: calculate_histogram(x[col], bin_lims=bin_lims))
+    )
+    flow_dt = df["Sample Flow Controller Read (vccm)"].resample(dt_str).mean()
+
+    if isinstance(df_resampled, pd.Series):
+        df_resampled = df_resampled.to_frame(name="result")
+    else:  # already a DataFrame
+        df_resampled.columns = ["result"]
+
+    original_idx_df = df_filtered[["temporary_col"]].resample(dt_str).count()
+    original_idx_df.columns = ["original_idx"]  # ensure proper naming
+
+    # drop time bins that contained no particles
+    df_resampled = df_resampled.join(original_idx_df, how="left")
+    if "original_idx" in df_resampled.columns:
+        df_resampled = df_resampled[df_resampled["original_idx"] != 0]
+        df_resampled = df_resampled.drop("original_idx", axis=1)
+
+    # Expand the per-bin count arrays into one column per size bin
+    list_col = "result"
+    max_list_length = len(bin_ctrs)
+
+    if df_resampled.empty:
+        columns = [f"{list_col}_{i}" for i in range(max_list_length)]
+        ddf_hist = pd.DataFrame(np.nan, columns=columns, index=flow_dt.index)
+    else:
+        ddf_hist = df_resampled[list_col].apply(pd.Series)
+        ddf_hist.index = df_resampled.index
+        columns = [f"{list_col}_{i}" for i in range(max_list_length)]
+        ddf_hist.columns = columns
+
+    if not calculate_conc:
+        return ddf_hist
+
+    # Join with the sample flow controller data
+    inc_hist_flow = ddf_hist.join(flow_dt)
+
+    if rho_eff is not None and BC_type is not None:
+        # Calculate mass-equivalent diameters and densities for the mass bins
+        density, Dmev = BC_mass_to_diam(bin_ctrs, rho_eff=rho_eff, BC_type=BC_type)
+
+        # Calculate number concentration
+        dNdlogDmev = counts2numConc(
+            inc_hist_flow.iloc[:, :-1], inc_hist_flow.iloc[:, -1], t=t
+        ) / get_dlogp(Dmev)
+        if flag_col is None:
+            dNdlogDmev.columns = [f"dNdlogDmev_all_{i:.2f}" for i in Dmev]
+        else:
+            dNdlogDmev.columns = [f"dNdlogDmev_{flag_col}_{i:.2f}" for i in Dmev]
+
+        # Calculate mass concentration
+        dMdlogDmev = dNdlogDp_to_dMdlogDp(dNdlogDmev, Dmev, rho=density)
+        if flag_col is None:
+            dMdlogDmev.columns = [f"dMdlogDmev_all_{i:.2f}" for i in Dmev]
+        else:
+            dMdlogDmev.columns = [f"dMdlogDmev_{flag_col}_{i:.2f}" for i in Dmev]
+
+        # Concatenate number and mass distributions into a single dataframe
+        return pd.concat([dNdlogDmev, dMdlogDmev], axis=1)
+
+    # scattering branch: bins are already optical diameters
+    dNdlogDsc = counts2numConc(
+        inc_hist_flow.iloc[:, :-1], inc_hist_flow.iloc[:, -1], t=t
+    ) / get_dlogp(bin_ctrs)
+    if flag_col is None:
+        dNdlogDsc.columns = [f"dNdlogDsc_all_{i:.2f}" for i in bin_ctrs]
+    else:
+        dNdlogDsc.columns = [f"dNdlogDsc_{flag_col}_{i:.2f}" for i in bin_ctrs]
+
+    return dNdlogDsc
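
A sketch of how `process_hist_and_dist_partition` might be called for the incandescence channel (not part of the patch): the frame and bin count are synthetic, the column names come from this PR, and the `BC_type` value is a guess — it depends on what `sp2xr.calibration.BC_mass_to_diam` accepts.

```python
import pandas as pd
from sp2xr.distribution import make_bin_arrays, process_hist_and_dist_partition

# Synthetic particle-by-particle partition; values are illustrative only.
idx = pd.to_datetime(["2024-01-01 00:00:00.1", "2024-01-01 00:00:00.7",
                      "2024-01-01 00:00:01.2"])
df = pd.DataFrame(
    {
        "BC mass": [1.2, 3.4, 0.8],  # fg
        "cnts_thick": [1, 0, 1],
        "temporary_col": 1,
        "Sample Flow Controller Read (vccm)": 120.0,
    },
    index=idx,
)

bin_lims, bin_ctrs = make_bin_arrays(n=10)  # default 0.3-400, log-spaced
dist = process_hist_and_dist_partition(
    df, col="BC mass", flag_col="cnts_thick", flag_value=1,
    bin_lims=bin_lims, bin_ctrs=bin_ctrs, dt_str="1s",
    rho_eff=1800, BC_type="constant_density",  # BC_type value is hypothetical
)
# -> one dNdlogDmev_thick_* and one dMdlogDmev_thick_* column per mass bin
```
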
diff --git a/src/sp2xr/flag_single_particle_data.py b/src/sp2xr/flag_single_particle_data.py
new file mode 100644
index 0000000..c670323
--- /dev/null
+++ b/src/sp2xr/flag_single_particle_data.py
@@ -0,0 +1,203 @@
+import numpy as np
+import pandas as pd
+
+
+def define_flags(pdf_pbp, config):
+    flag_inc_transit_time = (
+        pdf_pbp["Incand Transit Time"]
+        >= config["instrument_parameters"]["IncTransitMin"]
+    ) & (
+        pdf_pbp["Incand Transit Time"]
+        <= config["instrument_parameters"]["IncTransitMax"]
+    )
+    flag_inc_fwhm = (
+        pdf_pbp["Incand FWHM"] >= config["instrument_parameters"]["IncFWHMMin"]
+    ) & (pdf_pbp["Incand FWHM"] <= config["instrument_parameters"]["IncFWHMMax"])
+    flag_inc = flag_inc_transit_time & flag_inc_fwhm
+    pdf_pbp["flag_valid_inc_signal"] = flag_inc
+    pdf_pbp["flag_inc_not_sat"] = (
+        pdf_pbp["Incand relPeak"] < config["Signal_saturation"]["IncSatPoint"]
+    )
+    flag_inc_in_range = (
+        flag_inc
+        & (pdf_pbp["BC mass"] >= config["inc_histo"]["min_mass"])
+        & (pdf_pbp["BC mass"] <= config["inc_histo"]["max_mass"])
+    )
+    pdf_pbp["flag_valid_inc_signal_in_range"] = flag_inc_in_range
+
+    flag_scatt_transit_time = (
+        pdf_pbp["Scatter Transit Time"]
+        >= config["instrument_parameters"]["ScattTransitMin"]
+    ) & (
+        pdf_pbp["Scatter Transit Time"]
+        <= config["instrument_parameters"]["ScattTransitMax"]
+    )
+    flag_scatt_fwhm = (
+        pdf_pbp["Scatter FWHM"] >= config["instrument_parameters"]["ScattFWHMMin"]
+    ) & (pdf_pbp["Scatter FWHM"] <= config["instrument_parameters"]["ScattFWHMMax"])
+    flag_scatt = flag_scatt_transit_time & flag_scatt_fwhm
+    pdf_pbp["flag_valid_scatt_signal"] = flag_scatt
+    pdf_pbp["flag_scatt_not_sat"] = (
+        pdf_pbp["Scatter relPeak"] < config["Signal_saturation"]["ScattSatPoint"]
+    )
+    flag_scatt_in_range = (
+        flag_scatt
+        & (pdf_pbp["Opt diam"] >= config["scatt_histo"]["min_D"])
+        & (pdf_pbp["Opt diam"] <= config["scatt_histo"]["max_D"])
+    )
+    pdf_pbp["flag_valid_scatt_signal_in_range"] = flag_scatt_in_range
+
+    pdf_pbp["flag_negative_timelag"] = (
+        pdf_pbp["time_lag"] < config["time_delay_histo"]["min"]
+    )
+    pdf_pbp["flag_extreme_positive_timelag"] = (
+        pdf_pbp["time_lag"] >= config["time_delay_histo"]["max"]
+    )
+    pdf_pbp["flag_valid_timelag_thin"] = (
+        pdf_pbp["time_lag"] < config["thick_coating"]["threshold"]
+    ) & (pdf_pbp["time_lag"] >= config["time_delay_histo"]["min"])
+    pdf_pbp["flag_valid_timelag_thick"] = (
+        pdf_pbp["time_lag"] >= config["thick_coating"]["threshold"]
+    ) & (pdf_pbp["time_lag"] < config["time_delay_histo"]["max"])
+
+    # incandescence/scattering peak ratio below 1.1 marks suspect particles
+    pdf_pbp["flag_low_ratio_inc_scatt"] = pdf_pbp["ratio_inc_scatt"] < 1.1
+
+    pdf_pbp["Opt diam scatt only"] = pdf_pbp["Opt diam"].where(
+        flag_scatt & ~flag_inc, np.nan
+    )
+
+    return pdf_pbp
+
+
+FLAG_COLS = [
+    "flag_valid_scatt_signal",
+    "flag_scatt_not_sat",
+    "flag_valid_inc_signal_in_range",
+    "flag_valid_timelag_thin",
+    "flag_valid_timelag_thick",
+    "flag_negative_timelag",
+    "flag_extreme_positive_timelag",
+    "flag_low_ratio_inc_scatt",
+]
+
+
+def add_thin_thick_flags(pdf: pd.DataFrame) -> pd.DataFrame:
+    """Add cnts_* indicator + total columns (vectorised)."""
+
+    # Safety check: ensure all flags exist
+    missing = [c for c in FLAG_COLS if c not in pdf.columns]
+    if missing:
+        raise KeyError(f"Missing flag columns: {missing}")
+
+    f = pdf  # shorthand for reading the flag columns
+
+    # initialise empty int8 columns; assign also returns a copy,
+    # so the caller's frame is left untouched
+    zeros = np.zeros(len(pdf), dtype="int8")
+    newcols = {
+        "cnts_thin": zeros,
+        "cnts_thin_noScatt": zeros,
+        "cnts_thick": zeros,
+        "cnts_thick_sat": zeros,
+        "cnts_thin_sat": zeros,
+        "cnts_ntl_sat": zeros,
+        "cnts_ntl": zeros,
+        "cnts_extreme_positive_timelag": zeros,
+        "cnts_thin_low_inc_scatt_ratio": zeros,
+        "cnts_particles_for_tl_dist": zeros,
+    }
+    pdf = pdf.assign(**newcols)
+
+    # ------------------------------------------------------------
+    # vectorised assignments
+    # ------------------------------------------------------------
+    pdf["cnts_thin_noScatt"] = (
+        (~f.flag_valid_scatt_signal) & f.flag_valid_inc_signal_in_range
+    ).astype("int8")
+
+    pdf["cnts_thin"] = (
+        f.flag_valid_scatt_signal
+        & f.flag_scatt_not_sat
+        & f.flag_valid_inc_signal_in_range
+        & f.flag_valid_timelag_thin
+        & ~f.flag_low_ratio_inc_scatt
+    ).astype("int8")
+
+    pdf["cnts_thin_low_inc_scatt_ratio"] = (
+        f.flag_valid_scatt_signal
+        & f.flag_scatt_not_sat
+        & f.flag_valid_inc_signal_in_range
+        & f.flag_valid_timelag_thin
+        & f.flag_low_ratio_inc_scatt
+    ).astype("int8")
+
+    pdf["cnts_thick"] = (
+        f.flag_valid_scatt_signal
+        & f.flag_scatt_not_sat
+        & f.flag_valid_inc_signal_in_range
+        & f.flag_valid_timelag_thick
+        & ~f.flag_extreme_positive_timelag
+    ).astype("int8")
+
+    pdf["cnts_thick_sat"] = (
+        f.flag_valid_scatt_signal
+        & (~f.flag_scatt_not_sat)
+        & f.flag_valid_inc_signal_in_range
+        & f.flag_valid_timelag_thick
+        & ~f.flag_extreme_positive_timelag
+    ).astype("int8")
+
+    pdf["cnts_thin_sat"] = (
+        f.flag_valid_scatt_signal
+        & (~f.flag_scatt_not_sat)
+        & f.flag_valid_inc_signal_in_range
+        & f.flag_valid_timelag_thin
+    ).astype("int8")
+
+    pdf["cnts_ntl_sat"] = (
+        f.flag_valid_scatt_signal
+        & (~f.flag_scatt_not_sat)
+        & f.flag_valid_inc_signal_in_range
+        & f.flag_negative_timelag
+    ).astype("int8")
+
+    pdf["cnts_ntl"] = (
+        f.flag_valid_scatt_signal
+        & f.flag_scatt_not_sat
+        & f.flag_valid_inc_signal_in_range
+        & f.flag_negative_timelag
+    ).astype("int8")
+
+    pdf["cnts_extreme_positive_timelag"] = (
+        f.flag_valid_scatt_signal
+        & f.flag_valid_inc_signal_in_range
+        & f.flag_extreme_positive_timelag
+    ).astype("int8")
+
+    pdf["cnts_particles_for_tl_dist"] = (
+        f.flag_valid_scatt_signal
+        & f.flag_scatt_not_sat
+        & f.flag_valid_inc_signal_in_range
+        & (
+            (f.flag_valid_timelag_thin & ~f.flag_low_ratio_inc_scatt)
+            | (f.flag_valid_timelag_thick & ~f.flag_extreme_positive_timelag)
+        )
+    ).astype("int8")
+
+    # totals
+    pdf["cnts_thin_total"] = (pdf["cnts_thin"] + pdf["cnts_thin_noScatt"]).astype(
+        "int16"
+    )
+
+    pdf["cnts_thick_total"] = (
+        pdf["cnts_thick"]
+        + pdf["cnts_thick_sat"]
+        + pdf["cnts_ntl_sat"]
+        + pdf["cnts_ntl"]
+        + pdf["cnts_thin_sat"]
+    ).astype("int16")
+
+    pdf["cnts_unclassified"] = (
+        pdf["cnts_extreme_positive_timelag"] + pdf["cnts_thin_low_inc_scatt_ratio"]
+    ).astype("int16")
+
+    return pdf
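
A minimal end-to-end sketch for the flagging step (not part of the patch): the config keys mirror exactly the lookups in `define_flags`, but every numeric threshold below is a placeholder, and the two-particle frame is synthetic.

```python
import pandas as pd
from sp2xr.flag_single_particle_data import define_flags, add_thin_thick_flags

config = {  # placeholder thresholds; real values come from the campaign config
    "instrument_parameters": {
        "IncTransitMin": 2, "IncTransitMax": 60,
        "IncFWHMMin": 2, "IncFWHMMax": 60,
        "ScattTransitMin": 2, "ScattTransitMax": 60,
        "ScattFWHMMin": 2, "ScattFWHMMax": 60,
    },
    "Signal_saturation": {"IncSatPoint": 1e6, "ScattSatPoint": 1e6},
    "inc_histo": {"min_mass": 0.3, "max_mass": 400},
    "scatt_histo": {"min_D": 100, "max_D": 500},
    "time_delay_histo": {"min": -10, "max": 180},
    "thick_coating": {"threshold": 20},
}

idx = pd.date_range("2024-01-01", periods=2, freq="100ms")
pdf_pbp = pd.DataFrame(
    {
        "Incand Transit Time": [10, 70], "Incand FWHM": [8, 9],
        "Incand relPeak": [4e5, 5e5], "BC mass": [2.5, 1.0],
        "Scatter Transit Time": [12, 11], "Scatter FWHM": [9, 10],
        "Scatter relPeak": [3e5, 2e5], "Opt diam": [220.0, 240.0],
        "time_lag": [5, 30], "ratio_inc_scatt": [2.0, 0.9],
    },
    index=idx,
)

pdf_pbp = add_thin_thick_flags(define_flags(pdf_pbp, config))
# particle 0 counts as thin; particle 1 fails the incandescence transit window
print(pdf_pbp[["cnts_thin", "cnts_thick"]].sum())
```
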
diff --git a/src/sp2xr/resample_pbp_hk.py b/src/sp2xr/resample_pbp_hk.py
new file mode 100644
index 0000000..485623b
--- /dev/null
+++ b/src/sp2xr/resample_pbp_hk.py
@@ -0,0 +1,131 @@
+from typing import List
+import pandas as pd
+
+
+_SUM_COLS: List[str] = [
+    "Dropped Records",
+    "Incand Mass (fg)",
+    "BC mass",
+    "BC mass within range",
+    "cnts_thin",
+    "cnts_thin_noScatt",
+    "cnts_thick",
+    "cnts_thick_sat",
+    "cnts_thin_sat",
+    "cnts_ntl_sat",
+    "cnts_ntl",
+    "cnts_extreme_positive_timelag",
+    "cnts_thin_low_inc_scatt_ratio",
+    "cnts_thin_total",
+    "cnts_thick_total",
+    "cnts_unclassified",
+]
+
+_COUNT_SRC: List[str] = [
+    "Incand Mass (fg)",
+    "BC mass",
+    "BC mass within range",
+    "Scatter Size (nm)",
+    "Opt diam scatt only",
+    "Opt diam scatt only within range",
+    "temporary_col",
+]
+
+_COUNT_DST: List[str] = [
+    "BC numb from file",
+    "BC numb",
+    "BC numb within range",
+    "scatter numb from file",
+    "Scatt numb",
+    "Scatt numb within range",
+    "original_idx",
+]
+
+
+def build_dt_summary(pdf: pd.DataFrame, dt_s: int = 1) -> pd.DataFrame:
+    """
+    Resample single-particle data to `dt_s`-second bins and return a summary.
+
+    Parameters
+    ----------
+    pdf : pandas.DataFrame
+        Partition containing particle-level rows. Must have a DateTimeIndex.
+    dt_s : int, default 1
+        Resampling interval in seconds.
+
+    Returns
+    -------
+    pandas.DataFrame
+        One row per `dt_s`, with summed counters, counts, and means.
+        Columns `date` and `hour` are added for downstream partitioning.
+    """
+    dt_str = f"{dt_s}s"
+
+    # --- SUM columns --------------------------------------------------
+    df_sum = pdf[_SUM_COLS].resample(dt_str).sum()
+
+    # --- COUNT columns ------------------------------------------------
+    df_cnt = (
+        pdf[_COUNT_SRC]
+        .resample(dt_str)
+        .count()
+        .rename(columns=dict(zip(_COUNT_SRC, _COUNT_DST)))
+    )
+
+    # --- MEAN columns ---------------------------------------------------
+    df_mean = (
+        pdf[["Secs_2GB"]]
+        .resample(dt_str)
+        .mean()
+        .rename(columns={"Secs_2GB": "Secs_2GB_mean"})
+    )
+
+    # --- merge + post-process -----------------------------------------
+    out = pd.concat([df_sum, df_cnt, df_mean], axis=1)
+
+    # drop empty bins (original_idx == 0)
+    out = out[out["original_idx"] != 0].drop(columns="original_idx")
+
+    # add date / hour helper cols
+    out["date"] = out.index.normalize()
+    out["hour"] = out.index.hour.astype("int64")
+
+    return out
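+
+
+# Example (sketch, synthetic numbers): if two "thin" particles fall into the
+# same second, build_dt_summary(pdf, dt_s=1) collapses them to one row with
+#   "BC numb" = 2          (count of non-NaN "BC mass" values)
+#   "cnts_thin" = 2        (summed 0/1 indicator)
+#   "Secs_2GB_mean"        the per-bin average
+# plus "date"/"hour" taken from the bin timestamp; seconds with no particles
+# are dropped entirely.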
+
+
+def resample_hk_partition(pdf: pd.DataFrame, dt="1s") -> pd.DataFrame:
+    """
+    Partition-wise resampling of HK data to `dt` bins.
+    Keeps only non-empty intervals (original_idx != 0), then restores a
+    regular grid and forward-fills single-bin gaps.
+    """
+    # Ensure local partition is sorted (sort_index also returns a copy,
+    # so the mutations below do not touch the caller's frame)
+    pdf = pdf.sort_index()
+    pdf.index = pdf.index.floor(dt)
+
+    # --- resample mean values ---
+    df_mean = (
+        pdf[
+            ["Sample Flow Controller Read (sccm)", "Sample Flow Controller Read (vccm)"]
+        ]
+        .resample(dt)
+        .mean()
+    )
+
+    # --- resample count for 'original_idx' ---
+    pdf["temporary_col"] = 1
+    df_count = pdf[["temporary_col"]].resample(dt).count()
+    df_count.rename(columns={"temporary_col": "original_idx"}, inplace=True)
+
+    # --- merge results ---
+    out = df_mean.join(df_count)
+
+    # --- drop empty bins ---
+    out = out[out["original_idx"] != 0].drop(columns="original_idx")
+
+    # restore a regular grid, then bridge at most one missing bin
+    out = out.asfreq(dt).ffill(limit=1)
+
+    # --- add date/hour columns ---
+    out["date"] = out.index.normalize()
+    out["hour"] = out.index.hour.astype("int64")
+
+    return out
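
For orientation, a sketch of how the four modules in this PR might chain together (not part of the patch): the function names are from the diffs above, while the parquet paths are placeholders, `config` is the placeholder skeleton from the flagging example, and the inputs are assumed to already carry the remaining PbP columns the summary expects (e.g. "Opt diam scatt only within range", "Secs_2GB").

```python
import pandas as pd

from sp2xr.flag_single_particle_data import define_flags, add_thin_thick_flags
from sp2xr.resample_pbp_hk import build_dt_summary, resample_hk_partition
from sp2xr.concentrations import add_concentrations

pbp = pd.read_parquet("pbp_partition.parquet")  # placeholder path
hk = pd.read_parquet("hk_partition.parquet")    # placeholder path
config = {...}  # threshold dict as in the define_flags example above

# flag particles, then collapse to a 1 s summary joined with the HK flows
pbp["temporary_col"] = 1
pbp = add_thin_thick_flags(define_flags(pbp, config))
summary = build_dt_summary(pbp, dt_s=1).join(resample_hk_partition(hk, dt="1s"))

# finally attach the 12 concentration columns
summary = add_concentrations(summary, dt=1)
```
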